Source code for backend.worker

# --------------------------------------------------------------------------------------
# Copyright 2016, Benedikt J. Daurer, Filipe R.N.C. Maia, Max F. Hantke, Carl Nettelblad
# Hummingbird is distributed under the terms of the Simplified BSD License.
# -------------------------------------------------------------------------
"""Coordinates data reading, translation and analysis."""
import os
import logging
import imp
import ipc
import time
import signal

[docs]class Worker(object): """Coordinates data reading, translation and analysis. This is the main class of the backend of Hummingbird. It uses a light source dependent translator to read and translate the data into a common format. It then runs whatever analysis algorithms are specified in the user provided configuration file. Args: config_file (str): The configuration file to load. """ state = None conf = None def __init__(self, config_file, port): # Save the port as global variable in ipc ipc.port = port if(config_file is None): # Try to load an example configuration file config_file = os.path.abspath(os.path.dirname(__file__)+ "/../../examples/basic/dummy.py") logging.warning("No configuration file given! " "Loading example configuration from %s", (config_file)) if not os.path.isfile(config_file): raise IOError('Could not find backend configuration file %s' % (config_file)) self._config_file = config_file # self.backend_conf = imp.load_source('backend_conf', config_file) signal.signal(signal.SIGUSR1, self.raise_interruption) self.oldHandler = signal.signal(signal.SIGINT, self.ctrlcevent) self.load_conf() Worker.state['_config_file'] = config_file #Worker.state['_config_dir'] = os.path.dirname(config_file) if(not ipc.mpi.is_master()): self.translator = init_translator(Worker.state) print "MPI rank %d, pid %d" % (ipc.mpi.rank, os.getpid()) if (ipc.mpi.is_zmqserver()): try: os.environ["OMPI_COMM_WORLD_SIZE"] pid = -1 try: with open('.pid', 'r') as file: pid = int(file.read()) except: pass if not check_pid(pid): with open('.pid', 'w') as file: file.write(str(os.getppid())) except KeyError: with open('.pid', 'w') as file: file.write(str(os.getpid())) self.reloadnow = False print 'Starting backend...'
[docs] def raise_interruption(self, signum, stack): self.reloadnow = True
[docs] def load_conf(self): """Load or reload the configuration file.""" Worker.conf = imp.load_source('backend_conf', self._config_file) if(Worker.state is None): Worker.state = Worker.conf.state else: # Only copy the keys that exist in the newly loaded state for k in Worker.conf.state: Worker.state[k] = Worker.conf.state[k]
[docs] def start(self): """Start the event loop.""" Worker.state['running'] = True self.event_loop()
[docs] def ctrlcevent(self, whatSignal, stack): self.reloadnow = True signal.signal(signal.SIGINT, self.oldHandler)
[docs] def event_loop(self): """The event loop. While ``state['running']`` is True, it will get events from the translator and process them as fast as possible. """ while(True): try: while(Worker.state['running']) and not self.reloadnow: self.reloadnow = self.reloadnow or ipc.mpi.checkreload() if(ipc.mpi.is_master()): is_exiting = ipc.mpi.master_loop() if is_exiting: return else: try: evt = self.translator.next_event() if evt is None: return except (RuntimeError) as e: logging.warning("Some problem with %s (library used for translation), probably due to reloading the backend. (%s)" % (self.translator.library,e)) raise KeyboardInterrupt ipc.set_current_event(evt) try: Worker.conf.onEvent(evt) except (KeyError, TypeError) as exc: logging.warning("Missing or wrong type of data, probably due to missing event data.", exc_info=True) except (RuntimeError) as e: logging.warning("Some problem with %s (library used for translation), probably due to reloading the backend." %self.translator.library, exc_info=True) except StopIteration: logging.warning("Stopping iteration.") if 'end_of_run' in dir(Worker.conf): Worker.conf.end_of_run() ipc.mpi.slave_done() return except KeyboardInterrupt: try: print "Hit Ctrl+c again in the next second to quit..." time.sleep(1) self.reloadnow = True signal.signal(signal.SIGINT, self.ctrlcevent) except KeyboardInterrupt: print "Exiting..." break if self.reloadnow: self.reloadnow = False print "Reloading configuration file." self.load_conf() try: Worker.conf.close() except: pass signal.signal(signal.SIGINT, self.oldHandler)
def init_translator(state): """Initialize the translator, depending on the state['Facility'].""" if('Facility' not in state): raise ValueError("You need to set the 'Facility' in the configuration") elif(state['Facility'].lower() == 'lcls'): from backend.lcls import LCLSTranslator return LCLSTranslator(state) elif(state['Facility'].lower() == 'dummy'): from backend.dummy import DummyTranslator return DummyTranslator(state) else: raise ValueError('Facility %s not supported' % (state['Facility'])) def check_pid(pid): """ Check For the existence of a unix pid. """ try: os.kill(pid, 0) except OSError: return False else: return True