import bisect
import functools
import flowws
from flowws import Argument as Arg
import gtar
gtar.widths.setdefault('color', 4)
def index_sort_key(x):
"""Sorting key to use for getar frame indices"""
return (len(x), x)
def find_le(a, x):
"""Find rightmost value less than or equal to x"""
i = bisect.bisect_right(a, x)
if i:
return a[i-1]
raise ValueError
[docs]@flowws.add_stage_arguments
class GTAR(flowws.Stage):
"""Emit the contents of a :std:doc:`libgetar<libgetar:index>`-format file into the scope.
The GTAR module outputs the records found in a getar-format file
directly into the scope. It provides a notion of frames in a
trajectory using the discretely-varying record with the most
indices as the basis.
"""
ARGS = [
Arg('filename', '-i', str, required=True,
help='Getar-format filename to open'),
Arg('frame', '-f', int, 0,
help='Frame to load'),
Arg('loop_frames', type=bool, default=False,
help='If True, loop the workflow over frames found in the trajectory file, beginning at the given frame'),
Arg('group', '-g', str, '',
help='GTAR group to restrict results to'),
]
def __init__(self, *args, **kwargs):
self._cached_record_frames = {}
self._gtar_traj = self._gtar_file = None
self._looping = False
super().__init__(*args, **kwargs)
@functools.lru_cache(maxsize=1)
def _get_traj(self, filename, storage):
for handle in (self._gtar_traj, self._gtar_file):
if handle is not None:
handle.close()
try:
gtar_file = gtar_traj = None
gtar_file = storage.open(
self.arguments['filename'], 'rb', on_filesystem=True)
gtar_traj = gtar.GTAR(gtar_file.name, 'r')
self._gtar_file, self._gtar_traj = gtar_file, gtar_traj
gtar_file = gtar_traj = None
finally:
for handle in (gtar_traj, gtar_file):
if handle is not None:
handle.close()
return self._gtar_traj
def run(self, scope, storage):
"""Load records found in a getar file into the scope."""
scope['filename'] = self.arguments['filename']
scope['frame'] = self.arguments['frame']
scope['cache_key'] = scope['filename'], scope['frame']
gtar_traj = self._get_traj(self.arguments['filename'], storage)
self._cache_record_frames(gtar_traj, scope, storage, self.arguments['group'])
if self.arguments['loop_frames'] and not self._looping:
self._looping, self.arguments['loop_frames'] = True, False
try:
# loop over all frames except the last one
frame_max = len(self._cached_frame_indices)
frames = range(self.arguments['frame'], frame_max - 1)
for frame in frames:
self.arguments['frame'] = frame
scope['workflow'].run()
finally:
self._looping, self.arguments['loop_frames'] = False, True
# now run the last frame
self.arguments['frame'] = max(0, len(self._cached_frame_indices) - 1)
recs = self._set_record_frames()
for rec in recs:
callback = functools.partial(gtar_traj.getRecord, rec, rec.getIndex())
scope.set_call(rec.getName(), callback)
def _cache_record_frames(self, traj, scope, storage, group):
self._cached_record_frames = {}
for rec in traj.getRecordTypes():
# ignore per-bond records and so on, for now
if rec.getGroup() != group:
continue
self._cached_record_frames[rec] = list(map(
index_sort_key, traj.queryFrames(rec)))
if rec.getBehavior() == gtar.Behavior.Constant:
callback = functools.partial(traj.getRecord, rec, rec.getIndex())
scope.set_call(rec.getName(), callback)
try:
(_, self._cached_frame_indices) = max(
(len(indices), indices) for (rec, indices) in
self._cached_record_frames.items() if
rec.getBehavior() == gtar.Behavior.Discrete)
except ValueError: # empty sequence
self._cached_frame_indices = []
def _set_record_frames(self):
frame = self.arguments['frame']
if not self._cached_frame_indices:
return []
index_to_find = index_sort_key(self._cached_frame_indices[self.arguments['frame']][1])
self.arg_specifications['frame'].valid_values = flowws.Range(
0, len(self._cached_frame_indices), (True, False))
for (rec, indices) in self._cached_record_frames.items():
try:
index = find_le(indices, index_to_find)[1]
rec.setIndex(index)
except ValueError:
pass
return self._cached_record_frames.keys()