''' Serverqueue where jobs can be submitted. Jobs will be calculated on the spot or passed on to the OAR cluster if so specified in the configfile. Jobs can be submitted in a json dictionary. The keyword 'command' and 'configfilename' supply a string with the command and the path to the configfile. Everything else is assumed to be an override in the configfile. If an override cannot be parsed the job will start anyway without the override. The processingqueue cannot be interrupted. ''' import socket import threading import SocketServer import time import sys import binoculars.util, binoculars.main import traceback import re import json import os import Queue class ProcessTCPHandler(SocketServer.BaseRequestHandler): def handle(self): input = self.request.recv(1024) if input.startswith('test'): print 'Recieved test request' self.request.sendall('Connection succesful') else: try: job = json.loads(input) parsed, result = parse_job(job) if parsed: print 'Recieved command: {0}. Job is added to queue.\nNumber of jobs left in queue: {1}'.format(job['command'], self.server.q.qsize()) response = 'Job added to queue' self.server.q.put(job) else: response = result except: print 'Could not parse the job: {0}'.format(input) print traceback.format_exc() response = 'Error: Job could not be added to queue' finally: self.request.sendall(response) def parse_job(job): try: overrides = [] for key in job.keys(): if not key in ['command', 'configfilename']: section_key, value = job[key].split('=') section, key = section_key.split(':') overrides.append((section, key, value)) return True, overrides except: message = 'Error parsing the configuration options. {0}'.format(job) return False, message def process(run_event, ip, port, q): while run_event.is_set(): if q.empty(): time.sleep(1) else: job = q.get() # assume everything in the list is an override except for command and configfilename command = str(job['command']) configfilename = job['configfilename'] overrides = parse_job(job)[1]# [1] are the succesfully parsed jobs print 'Start processing: {0}'.format(command) try: configobj = binoculars.util.ConfigFile.fromtxtfile(configfilename, overrides = overrides) if binoculars.util.parse_bool(configobj.dispatcher['send_to_gui']): configobj.dispatcher['host'] = ip configobj.dispatcher['port'] = port binoculars.main.Main.from_object(configobj, [command]) print 'Succesfully finished processing: {0}.'.format(command) except Exception as exc: errorfilename = 'error_{0}.txt'.format(command) print 'An error occured for scan {0}. For more information see {1}'.format(command, errorfilename) with open(errorfilename, 'w') as fp: traceback.print_exc(file = fp) finally: print 'Number of jobs left in queue: {0}'.format(q.qsize()) if __name__ == '__main__': if len(sys.argv) > 1: ip = sys.argv[1] port = sys.argv[2] else: ip = None port = None q = Queue.Queue() binoculars.util.register_python_executable(os.path.join(os.path.dirname(__file__), 'binoculars.py')) HOST, PORT = socket.gethostbyname(socket.gethostname()), 0 run_event = threading.Event() run_event.set() process_thread = threading.Thread(target=process, args = (run_event, ip, port, q)) process_thread.start() server = SocketServer.TCPServer((HOST, PORT), ProcessTCPHandler) server.q = q ip, port = server.server_address print 'Process server started running at ip {0} and port {1}. Interrupt server with Ctrl-C'.format(ip, port) try: server.serve_forever() except KeyboardInterrupt: run_event.clear() process_thread.join()