import collections
import functools
import json
import os
import flowws
from flowws import Argument as Arg
import garnett
import numpy as np
# Maps a filename extension (without the leading dot) to the garnett reader
# class used to parse files of that format.
GARNETT_READERS = {
    'cif': garnett.reader.CifFileReader,
    'dcd': garnett.reader.DCDFileReader,
    'gsd': garnett.reader.GSDHoomdFileReader,
    'pos': garnett.reader.PosFileReader,
    # the getar container formats all share a single reader
    'sqlite': garnett.reader.GetarFileReader,
    'tar': garnett.reader.GetarFileReader,
    'zip': garnett.reader.GetarFileReader,
}

# Extensions whose files are opened in text mode ('r'); files with any other
# extension are opened in binary mode ('rb').
GARNETT_TEXT_MODES = {'cif', 'pos'}
@flowws.add_stage_arguments
class Garnett(flowws.Stage):
    """Emit the contents of a :std:doc:`garnett<garnett:index>`-readable trajectory.

    The Garnett module outputs frames from a trajectory to be used for
    analysis and visualization.
    """

    ARGS = [
        Arg('filename', '-i', str, required=True,
            help='Filename to open'),
        Arg('frame', '-f', int, 0,
            help='Frame to load'),
        Arg('loop_frames', type=bool, default=False,
            help='If True, loop the workflow over frames found in the trajectory file, beginning at the given frame'),
    ]

    def __init__(self, *args, **kwargs):
        """Initialize per-instance trajectory state, then defer to flowws.Stage."""
        # File handle backing the most recently loaded trajectory (if any).
        self._garnett_file = None
        # NOTE(review): neither `_looping` nor the `loop_frames` argument is
        # consumed by any code visible in this class -- confirm whether the
        # looping logic lives elsewhere or is still to be implemented.
        self._looping = False
        # Single-entry, per-instance memo of the last trajectory loaded by
        # `_get_traj`, keyed on (filename, storage).
        self._traj_key = None
        self._traj = None
        super().__init__(*args, **kwargs)

    def _get_traj(self, filename, storage):
        """Return the garnett trajectory for `filename`, reusing the last one loaded.

        The most recent trajectory is memoized on this instance, so repeated
        calls with the same `filename` and `storage` do not reopen the file.
        Loading a different trajectory closes the file that backed the
        previous one.

        :param filename: Name of the trajectory file; its extension (matched
            case-insensitively) selects the reader from `GARNETT_READERS`.
        :param storage: Object whose `open(name, mode, on_filesystem=True)`
            method returns the file object handed to the reader.
        :returns: The trajectory object produced by the garnett reader.
        :raises KeyError: If the extension has no entry in `GARNETT_READERS`.
        """
        key = (filename, storage)
        if self._traj is not None and self._traj_key == key:
            return self._traj

        # Validate the request *before* touching the current trajectory, so
        # that an unsupported file leaves the memoized one fully usable.
        suffix = os.path.splitext(filename)[1][1:].lower()
        try:
            reader = GARNETT_READERS[suffix]
        except KeyError:
            raise KeyError(
                'Unsupported trajectory file extension {!r} for {!r}; '
                'expected one of: {}'.format(
                    suffix, filename, ', '.join(sorted(GARNETT_READERS)))
            ) from None
        mode = 'r' if suffix in GARNETT_TEXT_MODES else 'rb'

        # Forget the memoized trajectory before closing its backing file, so
        # a trajectory tied to a closed file can never be handed back.
        self._traj = None
        self._traj_key = None
        if self._garnett_file is not None:
            self._garnett_file.close()
            self._garnett_file = None

        handle = storage.open(filename, mode, on_filesystem=True)
        try:
            traj = reader().read(handle)
        except BaseException:
            # do not leak the handle when parsing fails
            handle.close()
            raise

        # The handle is deliberately kept open alongside the trajectory
        # (presumably because garnett reads frame data lazily -- TODO confirm);
        # it is closed the next time a different trajectory is loaded.
        self._garnett_file = handle
        self._traj_key = key
        self._traj = traj
        return traj

    def run(self, scope, storage):
        """Load one frame of a garnett-readable trajectory into the scope.

        Always sets the `filename`, `frame`, `cache_key`, `position`, `type`,
        `box` and `dimensions` scope entries; `orientation`, `diameter` and
        `type_shapes.json` are set only when the frame provides them.

        :param scope: Mapping that receives the loaded quantities.
        :param storage: Storage object used to open the trajectory file.
        """
        scope['filename'] = self.arguments['filename']
        scope['frame'] = self.arguments['frame']
        scope['cache_key'] = scope['filename'], scope['frame']

        garnett_traj = self._get_traj(self.arguments['filename'], storage)
        # Advertise the selectable frame indices now that the trajectory
        # length is known (presumably the half-open interval [0, len) --
        # verify against flowws.Range).
        self.arg_specifications['frame'].valid_values = flowws.Range(
            0, len(garnett_traj), (True, False))
        frame = garnett_traj[self.arguments['frame']]

        # account for changes in the garnett API around v0.7
        try:
            types = frame.typeid
            positions = frame.position
            # account for some types of frames, like those from CIF
            # files, not exposing orientations and raising an
            # AttributeError instead
            orientations = getattr(frame, 'orientation', None)
        except AttributeError:
            # Older API: per-particle type names instead of integer ids, and
            # pluralized attribute names. Type ids are assigned in sorted
            # order of the distinct type names.
            type_names = frame.types
            type_map = {name: index
                        for (index, name) in enumerate(sorted(set(type_names)))}
            types = np.array(
                [type_map[name] for name in type_names], dtype=np.uint32)
            positions = frame.positions
            orientations = getattr(frame, 'orientations', None)

        try:
            type_shapes = [shape.type_shape for shape in frame.shapedef.values()]
            scope['type_shapes.json'] = json.dumps(type_shapes)
        except AttributeError:  # no shapedefs
            pass

        try:
            scope['diameter'] = frame.diameter
        except AttributeError:  # no diameters
            pass

        scope['position'] = positions
        if orientations is not None:
            scope['orientation'] = orientations
        scope['type'] = types
        scope['box'] = frame.box.get_box_array()
        scope['dimensions'] = frame.box.dimensions