Source code for flowws_analysis.ViewQt


import argparse
import contextlib
import functools
import hashlib
import importlib
import json
import logging
import os
import signal
import threading
import traceback
import queue

import flowws
from flowws import Argument as Arg
import Qt
from Qt import QtCore, QtGui, QtWidgets

# Module-level logger; RerunThread.run reports workflow errors through it.
logger = logging.getLogger(__name__)

def _bool_checkbox_helper(f):
    """Adapt a bool-taking callback to a checkbox state-change signal.

    The returned function looks its argument up in a table that maps the
    integer states 2 and 0 (and, when the binding exposes them, the
    ``QtCore.Qt.CheckState`` members ``Checked`` and ``Unchecked``) to
    ``True`` and ``False``, then forwards the bool to *f*.

    :param f: callable accepting a single bool
    :returns: callable accepting a check-state value and returning
        whatever *f* returns
    :raises KeyError: (from the returned callable) for a state that is
        not in the table
    """
    state_to_bool = {2: True, 0: False}

    try:
        # fix for old versions of pyqt: only register the enum members
        # when the binding actually provides them
        check_state = QtCore.Qt.CheckState
        state_to_bool[check_state.Checked] = True
        state_to_bool[check_state.Unchecked] = False
    except AttributeError:
        pass

    def on_state_changed(value):
        checked = state_to_bool[value]
        return f(checked)

    return on_state_changed

class ViewQtWindow(QtWidgets.QMainWindow):
    """Main window that saves and restores its layout through QSettings.

    Layout data is stored under ``autosave/<key>/...`` for each of the two
    keys held in ``_settings_keys``. The keys are placeholders until
    `_setup_state` replaces them with hashes of the workflow contents.

    :param exit_event: event object whose ``set()`` is called when the
        window is closed
    """

    def __init__(self, exit_event, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._exit_event = exit_event
        # placeholder keys used until _setup_state() is called
        self._settings_keys = 'None', 'None'

    @staticmethod
    def _open_settings():
        """Return the QSettings store used for autosaved layouts."""
        return QtCore.QSettings('flowws-analysis', 'ViewQt')

    def _load_state(self):
        """Restore the layout saved under the first key that has one.

        Keys are tried in the order given by ``_settings_keys``; the
        first key with a stored window state wins and later keys are
        ignored. Nothing happens if no key has a stored state.
        """
        settings = self._open_settings()
        for key_id in self._settings_keys:
            prefix = 'autosave/{}'.format(key_id)
            state = settings.value(prefix + '/window_state', None)
            if state is None:
                continue

            self.restoreState(state)

            # The geometry entry is written together with the state, but
            # guard against a partially written store instead of handing
            # None to restoreGeometry().
            geom = settings.value(prefix + '/geometry')
            if geom is not None:
                self.restoreGeometry(geom)

            sub_windows = self.centralWidget().subWindowList()
            for (i, window) in enumerate(sub_windows):
                geom_key = '{}/{}/geometry'.format(prefix, i)
                window_geom = settings.value(geom_key, None)
                if window_geom is not None:
                    window.setGeometry(window_geom)

            return

    def _save_state(self):
        """Store the window state and all geometries under every key."""
        settings = self._open_settings()
        state = self.saveState()
        geom = self.saveGeometry()

        for key_id in self._settings_keys:
            prefix = 'autosave/{}'.format(key_id)
            settings.setValue(prefix + '/window_state', state)
            settings.setValue(prefix + '/geometry', geom)

            sub_windows = self.centralWidget().subWindowList()
            for (i, window) in enumerate(sub_windows):
                geom_key = '{}/{}/geometry'.format(prefix, i)
                settings.setValue(geom_key, window.geometry())

    def _setup_state(self, stages, visuals):
        """Compute the settings keys from the stage and visual type names.

        The first key hashes both the visual and the stage class names;
        the second hashes the visual class names only.

        :param stages: iterable of stage objects
        :param visuals: iterable of visual objects
        """
        stage_names = [type(stage).__name__ for stage in stages]
        vis_names = [type(vis).__name__ for vis in visuals]

        vis_hash = hashlib.sha1(b'flowws-analysis.ViewQt')
        vis_hash.update(b'vis_names')
        vis_hash.update(';'.join(vis_names).encode())

        # extend a copy so that vis_hash itself stays visuals-only
        full_hash = vis_hash.copy()
        full_hash.update(b'stage_names')
        full_hash.update(';'.join(stage_names).encode())

        self._settings_keys = (full_hash.hexdigest()[:32],
                               vis_hash.hexdigest()[:32])

    def closeEvent(self, event):
        """Save the layout and signal the exit event before closing."""
        self._save_state()
        self._exit_event.set()
        super().closeEvent(event)

class ViewQtApp(QtWidgets.QApplication):
    """Qt application showing workflow visuals and per-stage options.

    The application owns a `ViewQtWindow` whose central widget is an MDI
    area (one sub-window per visual) and a dock holding widgets that edit
    stage arguments. It exchanges data with the rest of the program only
    through the events and queues handed to the constructor.

    :param workflow: workflow object; its ``stages`` and ``to_JSON`` are used
    :param rerun_event: event set to request a new workflow run
    :param stage_event: event polled to rebuild the option widgets
    :param exit_event: event polled to close the main window
    :param visual_queue: queue delivering lists of visuals to display
    :param scope_queue: queue delivering ``(scope, storage)`` pairs
    :param display_controls: if false, the options dock starts closed
    """

    def __init__(self, workflow, rerun_event, stage_event, exit_event,
                 visual_queue, scope_queue, display_controls, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.workflow = workflow
        self.rerun_event = rerun_event
        self.stage_event = stage_event
        self.exit_event = exit_event
        self.visual_queue = visual_queue
        self.scope_queue = scope_queue

        self._visual_cache = {}
        self._currently_refreshing = False
        # Filled from scope_queue by _update_visuals(); initialised here so
        # that callbacks firing before the first delivery do not hit a
        # missing attribute.
        self._last_scope = None
        self._last_storage = None

        self._make_widgets(display_controls)
        self._make_menu()
        self._make_timers()

    @staticmethod
    def _drain_latest(source, default=None):
        """Empty *source* without blocking and return its last item.

        :param source: queue-like object providing ``get_nowait``
        :param default: value returned when the queue was already empty
        """
        latest = default
        try:
            while True:
                latest = source.get_nowait()
        except queue.Empty:  # skip to the most recent entry
            pass
        return latest

    def _check_close(self):
        """Close the main window once the exit event has been set."""
        if self.exit_event.is_set():
            self.main_window.close()

    def _make_config_widget(self, arg, stage):
        """Create an editor widget for one stage argument.

        :param arg: argument specification (``type``, ``name`` and
            ``valid_values`` are read)
        :param stage: stage whose ``arguments`` mapping is read
        :returns: a widget wired to `_rerun`, or None when ``arg.type`` is
            not int, float, str or bool
        """
        callback = functools.partial(self._rerun, arg, stage)
        result = None

        if arg.type == int:
            result = QtWidgets.QSpinBox()
            if arg.name in stage.arguments:
                val = stage.arguments[arg.name]
            else:
                val = 0

            if arg.valid_values is not None:
                range_ = arg.valid_values
                # exclusive bounds are tightened by one
                result.setMinimum(range_.min +
                                  (not range_.inclusive[0]))
                result.setMaximum(range_.max -
                                  (not range_.inclusive[1]))
            else:
                # Extend the lower bound for negative current values so
                # that setValue() below does not clamp them to zero.
                result.setMinimum(min(0, val*4))
                result.setMaximum(max(1024, val*4))
            result.setValue(val)
            result.valueChanged[int].connect(callback)
        elif arg.type == float:
            result = QtWidgets.QDoubleSpinBox()
            if arg.name in stage.arguments:
                val = stage.arguments[arg.name]
            else:
                val = 0

            if arg.valid_values is not None:
                range_ = arg.valid_values
            else:
                # Order the bounds so a negative current value yields
                # (4*val, 0) rather than an inverted range.
                (low, high) = sorted((0, val*4 if val else 8))
                range_ = flowws.Range(low, high, True)

            delta = range_.max - range_.min

            result.setDecimals(6)
            # exclusive bounds are pulled in by 1% of the range width
            result.setMinimum(range_.min +
                              1e-2*delta*(not range_.inclusive[0]))
            result.setMaximum(range_.max -
                              1e-2*delta*(not range_.inclusive[1]))
            result.setStepType(QtWidgets.QAbstractSpinBox.AdaptiveDecimalStepType)
            result.setSingleStep(5e-2*delta)
            result.setValue(val)
            result.valueChanged[float].connect(callback)
        elif arg.type == str:
            if arg.valid_values is not None:
                result = QtWidgets.QComboBox()
                result.addItems(arg.valid_values)
                if arg.name in stage.arguments:
                    result.setCurrentText(stage.arguments[arg.name])
                # currentTextChanged replaces the deprecated
                # currentIndexChanged(QString) overload
                result.currentTextChanged.connect(callback)
            else:
                result = QtWidgets.QLineEdit()
                if arg.name in stage.arguments:
                    result.setText(stage.arguments[arg.name])
                result.textChanged[str].connect(callback)
        elif arg.type == bool:
            result = QtWidgets.QCheckBox()
            if arg.name in stage.arguments:
                result.setChecked(stage.arguments[arg.name])
            result.stateChanged.connect(_bool_checkbox_helper(callback))

        return result

    def _run_gui_action(self, callback):
        """Call a stage-provided GUI action with the latest scope/storage.

        Does nothing while no ``(scope, storage)`` pair has been received.
        """
        if self._last_scope is None:
            return
        callback(self._last_scope, self._last_storage)

    def _make_config_widgets(self):
        """Build one group box of option widgets per stage.

        `ViewQt` stages are skipped, as are stages that end up with no
        rows. Stages may expose ``gui_actions``, an iterable of
        ``(label, callback)`` pairs, each of which becomes a push button.
        """
        widgets = []
        for stage in self.workflow.stages:
            if isinstance(stage, ViewQt):
                continue
            groupbox = QtWidgets.QGroupBox(type(stage).__name__)
            layout = QtWidgets.QFormLayout()
            groupbox.setLayout(layout)

            for arg in stage.arg_specification_list:
                widget = self._make_config_widget(arg, stage)
                if widget is None:
                    continue

                layout.addRow(arg.name, widget)

            for (label, callback) in getattr(stage, 'gui_actions', []):
                widget = QtWidgets.QPushButton(label)
                # bind the callback as a default to avoid late binding
                widget.clicked.connect(
                    lambda *args, c=callback, **kwargs: self._run_gui_action(c))
                layout.addWidget(widget)

            if layout.rowCount():
                widgets.append(groupbox)

        layout = QtWidgets.QVBoxLayout()
        for widget in widgets:
            layout.addWidget(widget)
        # NOTE(review): Qt documents that QWidget.setLayout() does not
        # replace a layout that is already installed, so the repeated calls
        # made via _update_stage_config may not refresh the dock -- confirm.
        self.config_widget.setLayout(layout)

    def _make_menu(self):
        """Create the File and View menus and install the menu bar."""
        self.menubar = QtWidgets.QMenuBar()

        # QMenu.addAction(text) already inserts the new action into the
        # menu, so the returned action only needs to be configured.
        self.file_menu = self.menubar.addMenu('&File')
        save_action = self.file_menu.addAction('&Save')
        save_action.setShortcut('Ctrl+S')
        save_action.triggered.connect(self._save_json)
        close_action = self.file_menu.addAction('&Close')
        close_action.setShortcut('Ctrl+W')
        close_action.triggered.connect(lambda *args: self.main_window.close())

        self.view_menu = self.menubar.addMenu('&View')
        toggle_options_action = self.view_menu.addAction('Toggle options')
        toggle_options_action.triggered.connect(self._toggle_options)
        self.view_menu.addSection('Windows')
        tile_action = self.view_menu.addAction('&Tile')
        tile_action.triggered.connect(
            lambda *args: self.mdi_area.tileSubWindows())
        cascade_action = self.view_menu.addAction('&Cascade')
        cascade_action.triggered.connect(
            lambda *args: self.mdi_area.cascadeSubWindows())
        refresh_action = self.view_menu.addAction('&Refresh')
        refresh_action.setShortcut('Ctrl+R')
        refresh_action.triggered.connect(self._refresh_windows)

        self.main_window.setMenuBar(self.menubar)

    def _make_timers(self):
        """Start the timers that poll the stage, visual and exit channels."""
        self.stage_timer = QtCore.QTimer(self)
        self.stage_timer.timeout.connect(self._update_stage_config)
        self.stage_timer.start(1)

        self.visual_timer = QtCore.QTimer(self)
        self.visual_timer.timeout.connect(self._update_visuals)
        self.visual_timer.start(1)

        self.close_timer = QtCore.QTimer(self)
        self.close_timer.timeout.connect(self._check_close)
        self.close_timer.start(1)

    def _make_visuals(self):
        """Display the most recently queued visuals and tile their windows."""
        visuals = self._drain_latest(self.visual_queue, [])

        for vis in visuals:
            self._update_visual(vis)

        self.mdi_area.tileSubWindows()
        self.main_window._setup_state(self.workflow.stages, visuals)

    def _make_widgets(self, display_controls):
        """Create the main window, MDI area and options dock, then show it.

        :param display_controls: if false, the options dock is closed
        """
        self.main_window = ViewQtWindow(self.exit_event)
        self.mdi_area = QtWidgets.QMdiArea(self.main_window)
        self.config_dock = QtWidgets.QDockWidget('Options', self.main_window)
        self.config_dock.setObjectName('config_dock')
        self.scroll_widget = QtWidgets.QScrollArea(self.config_dock)
        self.config_widget = QtWidgets.QFrame(self.scroll_widget)
        self.scroll_widget.setWidget(self.config_widget)
        self.scroll_widget.setWidgetResizable(True)
        self.config_dock.setWidget(self.scroll_widget)

        self.main_window.setCentralWidget(self.mdi_area)
        self.main_window.addDockWidget(QtCore.Qt.LeftDockWidgetArea, self.config_dock)

        if not display_controls:
            self.config_dock.close()

        self._make_config_widgets()
        self._make_visuals()
        self.main_window.show()
        # restore after the sub-windows exist so their geometry can be applied
        self.main_window._load_state()

    def _refresh_windows(self):
        """Discard all visual windows and request a fresh workflow run.

        The current layout is saved first; `_update_visuals` restores it
        after its next pass.
        """
        self.main_window._save_state()
        self._visual_cache.clear()
        self.mdi_area.closeAllSubWindows()
        self._currently_refreshing = True
        self.rerun_event.set()

    def _rerun(self, arg, stage, value, eval_first=False):
        """Store a new argument value on *stage* and request a rerun.

        :param arg: argument specification; only ``name`` is read
        :param stage: stage whose ``arguments`` mapping is updated
        :param value: new value for the argument
        :param eval_first: if true, *value* is passed through ``eval`` first
        """
        if eval_first:
            # SECURITY: eval() executes arbitrary code; only safe as long as
            # the value comes from the local user. No caller in this module
            # passes eval_first=True.
            value = eval(value)

        stage.arguments[arg.name] = value

        self.rerun_event.set()

    def _save_json(self):
        """Ask for a file name and write the workflow description as JSON.

        The directory of the chosen file is remembered in QSettings and
        offered as the starting directory next time.
        """
        settings = QtCore.QSettings('flowws-analysis', 'ViewQt')
        # default to '' so the dialog never receives None on first use
        dirname = settings.value('save_menu/last_directory', '')
        (fname, _) = QtWidgets.QFileDialog.getSaveFileName(
            self.main_window, 'Save Workflow', dirname, filter='*.json')

        if not fname:
            # dialog was cancelled
            return

        description = self.workflow.to_JSON()
        with open(fname, 'w') as output:
            json.dump(description, output, skipkeys=True)
        settings.setValue('save_menu/last_directory', os.path.dirname(fname))

    def _setup_mdi_subwindow(self, window):
        """Apply the standard window flags to an MDI sub-window and show it."""
        window.setWindowFlags(QtCore.Qt.CustomizeWindowHint |
                              QtCore.Qt.WindowMinMaxButtonsHint |
                              QtCore.Qt.WindowTitleHint)
        window.show()

    def _toggle_options(self):
        """Show the options dock if it is hidden, hide it otherwise."""
        self.config_dock.setVisible(not self.config_dock.isVisible())

    def _update_stage_config(self):
        """Rebuild the option widgets when the stage event has been set."""
        if not self.stage_event.is_set():
            return

        self.stage_event.clear()
        # TODO cache some of these things instead of constantly recreating
        self._make_config_widgets()

    def _update_visual(self, vis):
        """Draw *vis* into its (possibly new) sub-window.

        Objects with ``draw_matplotlib`` take precedence over objects with
        ``draw_plato``; anything else is ignored.
        """
        if hasattr(vis, 'draw_matplotlib'):
            self._update_matplotlib_visual(vis)
        elif hasattr(vis, 'draw_plato'):
            self._update_plato_visual(vis)

    def _update_matplotlib_visual(self, vis):
        """Redraw a matplotlib-based visual, creating its figure if needed."""
        from matplotlib.backends.backend_qt5agg import FigureCanvas
        from matplotlib.figure import Figure

        if vis not in self._visual_cache:
            fig = self._visual_cache[vis] = Figure(dpi=72)
            canvas = FigureCanvas(fig)
            window = self.mdi_area.addSubWindow(canvas)
            self._setup_mdi_subwindow(window)

        fig = self._visual_cache[vis]
        fig.clear()
        vis.draw_matplotlib(fig)
        fig.canvas.draw_idle()

    def _update_plato_visual(self, vis):
        """Redraw a plato-based visual, creating its vispy scene if needed."""
        import vispy
        import vispy.app
        vispy.app.use_app(Qt.__binding__)
        import plato.draw.vispy as draw
        basic_scene = vis.draw_plato()

        if vis not in self._visual_cache:
            canvas_kwargs = dict(config=dict(samples=4))
            scene = self._visual_cache[vis] = basic_scene.convert(
                draw, canvas_kwargs=canvas_kwargs, clip_scale=8)
            window = self.mdi_area.addSubWindow(scene._canvas.native)
            self._setup_mdi_subwindow(window)

        vispy_scene = self._visual_cache[vis]
        # Rebuild the primitives when their number or types changed;
        # otherwise update the existing ones in place.
        should_clear = len(vispy_scene) != len(basic_scene)
        should_clear |= any(not isinstance(a, type(b)) for (a, b) in
                            zip(vispy_scene, basic_scene))
        if should_clear:
            for prim in reversed(list(vispy_scene)):
                vispy_scene.remove_primitive(prim)
            for prim in basic_scene.convert(draw):
                vispy_scene.add_primitive(prim)
        else:
            for (src, dest) in zip(basic_scene, vispy_scene):
                dest.copy_from(src, True)

        # synchronise the enabled features with the freshly drawn scene
        for feature in (vispy_scene.enabled_features -
                        basic_scene.enabled_features):
            vispy_scene.disable(feature)
        for feature in basic_scene.enabled_features:
            config = basic_scene.get_feature_config(feature)
            vispy_scene.enable(feature, **config)

        try:
            vispy_scene.render()
        except AttributeError:
            # deliberately best-effort, as in the original code
            pass

    def _update_visuals(self):
        """Timer slot: display queued visuals and pick up the latest scope.

        After drawing, the visual cache is published in the scope under
        ``'visual_objects'`` and rotation linking is enabled for the
        visuals listed under ``'visual_link_rotation'``. If a refresh was
        requested, the saved window layout is restored.
        """
        visuals = self._drain_latest(self.visual_queue, [])

        for vis in visuals:
            self._update_visual(vis)

        if visuals:
            self.main_window._setup_state(self.workflow.stages, visuals)

        newest = self._drain_latest(self.scope_queue, None)
        if newest is not None:
            (self._last_scope, self._last_storage) = newest

        # Nothing to publish or link until a scope has been received.
        if self._last_scope is not None:
            self._last_scope['visual_objects'] = self._visual_cache

            linked_visuals = [self._visual_cache[v] for v in
                              self._last_scope.get('visual_link_rotation', [])]
            for visual in linked_visuals:
                visual.enable('link_rotation', targets=linked_visuals)

        if self._currently_refreshing:
            self._currently_refreshing = False
            self.main_window._load_state()

class RerunThread(threading.Thread):
    """Thread that re-executes the workflow whenever a rerun is requested.

    The thread must be constructed with an ``args`` keyword holding the
    tuple ``(scope, workflow, rerun_event, stage_event, exit_event,
    visual_queue, display_controls)``.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # keep a public handle on the argument tuple for run()
        self.args = kwargs['args']

    def run(self):
        """Serve rerun requests until the exit event is set.

        Each pass waits up to 10 ms for the rerun event. When it is set
        the event is cleared, the workflow is run, the stage event is set
        and the scope's ``'visuals'`` entry (or an empty list) is queued.
        Exceptions are logged with a three-frame traceback and the loop
        keeps going; KeyboardInterrupt sets the exit event instead.
        """
        (scope, workflow, rerun_event, stage_event, exit_event, visual_queue,
         display_controls) = self.args

        keep_running = True
        while keep_running:
            try:
                rerun_event.wait(1e-2)
                if rerun_event.is_set():
                    rerun_event.clear()
                    workflow.run()
                    stage_event.set()
                    visual_queue.put(scope.get('visuals', []))
            except KeyboardInterrupt:
                exit_event.set()
            except Exception:
                logger.error(traceback.format_exc(3))

            # checked after the body so at least one pass always happens
            keep_running = not exit_event.is_set()

def sigint_handler(exit_event, *args):
    """Signal handler that requests shutdown by setting *exit_event*.

    :param exit_event: event object to set
    :param args: extra positional arguments supplied by the caller
        (ignored)
    """
    # the extra arguments carry nothing this handler needs
    del args
    exit_event.set()

@flowws.add_stage_arguments
class ViewQt(flowws.Stage):
    """Provide an interactive view of the entire workflow using Qt.

    An interactive display window will be opened that displays visual
    results while allowing the arguments of all stages in the workflow
    to be modified.
    """
    ARGS = [
        Arg('controls', '-c', bool, True,
            help='Display controls'),
    ]

    def __init__(self, *args, **kwargs):
        self.workflow = None
        # holds the RerunThread once the GUI has been started
        self._running_threads = None
        self._rerun_event = threading.Event()
        self._stage_event = threading.Event()
        self._exit_event = threading.Event()
        self._visual_queue = queue.Queue()
        self._scope_queue = queue.Queue()
        super().__init__(*args, **kwargs)

    def run(self, scope, storage):
        """Displays parameters and outputs for the workflow in a Qt window.

        The first call installs a SIGINT handler, starts the rerun thread
        and blocks in the Qt event loop until the application exits.
        Later calls only queue the new ``(scope, storage)`` pair for the
        running application.

        :param scope: workflow scope; ``'workflow'`` is read,
            ``'rerun_callback'`` is set and ``'visuals'`` is read if present
        :param storage: storage object, forwarded to the GUI unchanged
        """
        self.workflow = scope['workflow']
        scope['rerun_callback'] = self.rerun
        self._scope_queue.put((scope, storage))

        if self._running_threads is None:
            our_sigint_handler = functools.partial(
                sigint_handler, self._exit_event)
            signal.signal(signal.SIGINT, our_sigint_handler)

            args = (scope, self.workflow, self._rerun_event,
                    self._stage_event, self._exit_event, self._visual_queue,
                    self.arguments['controls'])
            # seed the queue so the window has something to show at startup
            self._visual_queue.put(scope.get('visuals', []))
            self._running_threads = rerun_thread = RerunThread(args=args)
            rerun_thread.start()

            try:
                app = ViewQtApp(
                    self.workflow, self._rerun_event, self._stage_event,
                    self._exit_event, self._visual_queue, self._scope_queue,
                    self.arguments['controls'], [])
                app.exec_()
            finally:
                # The rerun thread only stops once the exit event is set.
                # Set it here as well so join() cannot block forever when
                # the event loop ends without the window's closeEvent, or
                # when building/running the application raises.
                self._exit_event.set()
                rerun_thread.join()

    def rerun(self):
        """Request that the workflow be run again."""
        self._rerun_event.set()