#!/usr/bin/env python
"""Tango device server that executes UFO task graphs in worker processes.

The ``Process`` device exposes a JSON attribute plus commands to start a
job, query its state and exit code, and -- for continuous jobs -- to feed
it further JSON payloads, wait for a run to finish, or stop it.
"""

import sys
import time
import json
import multiprocessing
import contextlib

from PyTango import AttrWriteType, DevState, DispLevel, Except, ErrSeverity
from PyTango.server import (Device, DeviceMeta, attribute, device_property,
                            command, server_run)


# Seconds between liveness checks while ``Process.Wait`` waits for a result.
_WAIT_POLL_INTERVAL = 0.1


class Command(object):
    """Message sent through the pipe to a ``ContinuousProcess`` worker.

    Attributes:
        payload: JSON string handed to the worker, or ``None``.
        stop: when true, the worker leaves its command loop after handling
            this message.
    """

    def __init__(self, stop=False, payload=None):
        self.payload = payload
        self.stop = stop


class UfoProcess(multiprocessing.Process):
    """Base class for worker processes; subclasses implement ``run_ufo``.

    Each instance owns a duplex pipe: the worker uses ``self.server`` and
    the device server talks to it through ``self.client``.
    """

    def __init__(self, json):
        super(UfoProcess, self).__init__()
        self.json = json
        self.server, self.client = multiprocessing.Pipe(duplex=True)

    def log(self, s):
        """Print ``s`` to stdout, prefixed with this process' PID."""
        print("Process-{}: {}".format(self.pid, s))

    def run(self):
        """Process entry point: call ``run_ufo`` and exit with 1 on GError."""
        # Imported here rather than at module level, so the binding is only
        # loaded when a worker actually runs.
        from gi.repository import GLib

        try:
            self.run_ufo()
        except GLib.GError as e:
            self.log("Error: {}".format(e.message))
            sys.exit(1)

    @contextlib.contextmanager
    def measure(self):
        """Context manager logging the wall-clock duration of its body.

        The "Finished" line is only logged when the body completes without
        raising.
        """
        self.log("Processing data ...")
        start = time.time()
        yield
        end = time.time()
        # Three decimals; the previous "{:03f}" spec was a no-op minimum
        # width and printed the default six decimals.
        self.log("Finished in {:.3f}s.".format(end - start))

    def run_ufo(self):
        """Do the actual work; must be overridden by subclasses."""
        raise NotImplementedError


class SingleProcess(UfoProcess):
    """Worker that loads the graph from its JSON string and runs it once."""

    def __init__(self, json):
        super(SingleProcess, self).__init__(json)

    def run_ufo(self):
        """Run the graph described by ``self.json``, then send 0 on the pipe."""
        from gi.repository import Ufo

        pm = Ufo.PluginManager()
        g = Ufo.TaskGraph()
        g.read_from_data(pm, self.json)

        with self.measure():
            sched = Ufo.Scheduler()
            sched.run(g)

        # Lets a pending ``Process.Wait`` return.
        self.server.send(0)


class ContinuousProcess(UfoProcess):
    """Worker that stays alive and runs its graph once per received payload.

    The graph is created from the first payload; later payloads only update
    properties of nodes that already exist in that graph.
    """

    def __init__(self, json):
        super(ContinuousProcess, self).__init__(json)
        self.job_queue = multiprocessing.Queue()
        # Maximum time in seconds to wait for the next command.
        self.timeout = 30 * 60

    def run_ufo(self):
        """Serve ``Command`` messages until a stop request or idle timeout."""
        from gi.repository import Ufo

        pm = Ufo.PluginManager()
        sched = Ufo.Scheduler()
        g = None
        done = False

        while not done:
            # Give up when no command arrives within ``self.timeout``.
            if not self.server.poll(self.timeout):
                break

            command = self.server.recv()
            done = command.stop

            # A command without payload (e.g. a bare stop request) neither
            # touches the graph nor triggers a run, so the scheduler is never
            # invoked before a graph exists.
            if not command.payload:
                continue

            if g is None:
                g = Ufo.TaskGraph()
                g.read_from_data(pm, command.payload)
            else:
                json_data = json.loads(command.payload)
                nodes = {n.get_identifier(): n for n in g.get_nodes()}

                for task in json_data['nodes']:
                    if 'properties' in task and task['name'] in nodes:
                        nodes[task['name']].set_properties(**task['properties'])

            with self.measure():
                sched.run(g)

            # Lets a pending ``Process.Wait`` return.
            self.server.send(0)


class Process(Device):
    """Tango device that starts and supervises UFO worker processes."""

    __metaclass__ = DeviceMeta

    jobs = attribute(label="Jobs", dtype=[int],
                     display_level=DispLevel.OPERATOR,
                     access=AttrWriteType.READ,
                     doc="Job IDs of all running jobs")

    json = attribute(label="JSON", dtype=str,
                     display_level=DispLevel.OPERATOR,
                     access=AttrWriteType.READ_WRITE,
                     doc="JSON description")

    def init_device(self):
        """Reset the job table and JSON string and switch the state to ON."""
        Device.init_device(self)
        # Maps job ID (the worker's PID) to its UfoProcess object. Entries
        # are kept after a worker exits so ExitCode can still be queried.
        self._jobs = {}
        self._json = "{}"
        self.set_state(DevState.ON)

    def read_jobs(self):
        """Return the IDs of all jobs whose worker is still alive."""
        return [job_id for job_id, job in self._jobs.items() if job.is_alive()]

    def read_json(self):
        """Return the currently stored JSON string."""
        return self._json

    def write_json(self, json):
        """Store ``json`` for subsequent Run/RunContinuous/Continue calls."""
        self._json = json

    def get_job(self, job_id):
        """Return the worker registered as ``job_id``.

        Raises a Tango exception via ``Except.throw_exception`` when the ID
        is unknown.
        """
        if job_id not in self._jobs:
            Except.throw_exception("Unknown job ID", "Unknown job ID", "Here",
                                   ErrSeverity.ERR)
        return self._jobs[job_id]

    @command(dtype_out=int, doc_out="Job ID")
    def Run(self):
        """Start a one-shot job for the stored JSON and return its ID."""
        process = SingleProcess(self._json)
        process.start()
        self._jobs[process.pid] = process
        return process.pid

    @command(dtype_in=int, doc_in="Job ID", dtype_out=bool,
             doc_out="True if still running")
    def Running(self, job_id):
        """Report whether the worker for ``job_id`` is still alive."""
        return self.get_job(job_id).is_alive()

    @command(dtype_in=int, doc_in="Job ID", dtype_out=int,
             doc_out="Exit code of process")
    def ExitCode(self, job_id):
        """Return the worker's exit code, or -42 while it has none yet."""
        code = self.get_job(job_id).exitcode
        return -42 if code is None else code

    @command(dtype_out=int, doc_out="Job ID")
    def RunContinuous(self):
        """Start a continuous job and return its ID.

        The worker does nothing until ``Continue`` sends it a payload.
        """
        process = ContinuousProcess(self._json)
        process.start()
        self._jobs[process.pid] = process
        return process.pid

    @command(dtype_in=int, doc_in="Job ID")
    def Continue(self, job_id):
        """Send the stored JSON to the job as the payload for its next run."""
        job = self.get_job(job_id)
        job.client.send(Command(payload=self._json))

    @command(dtype_in=int, doc_in="Job ID")
    def Wait(self, job_id):
        """Block until the job reports a finished run.

        Raises a Tango exception when the worker has exited and no result
        is pending, instead of blocking forever.
        """
        job = self.get_job(job_id)

        # This process holds both pipe ends, so a plain recv() would never
        # see EOF after the worker died. Poll in short intervals and check
        # liveness in between.
        while not job.client.poll(_WAIT_POLL_INTERVAL):
            if job.is_alive():
                continue
            # The worker may have sent its result right before exiting,
            # i.e. between the poll above and the liveness check.
            if job.client.poll():
                break
            Except.throw_exception(
                "Job not running",
                "Job {} exited without a pending result".format(job_id),
                "Process.Wait", ErrSeverity.ERR)

        job.client.recv()

    @command(dtype_in=int, doc_in="Job ID")
    def Stop(self, job_id):
        """Ask a continuous job to leave its command loop."""
        job = self.get_job(job_id)
        job.client.send(Command(stop=True))


if __name__ == '__main__':
    server_run((Process,))