# --------------------------------------------------------------------------------------
# Copyright 2016, Benedikt J. Daurer, Filipe R.N.C. Maia, Max F. Hantke, Carl Nettelblad
# Hummingbird is distributed under the terms of the Simplified BSD License.
# -------------------------------------------------------------------------
"""Coordinates data reading, translation and analysis."""
import os
import logging
import imp
import ipc
import time
import signal
[docs]class Worker(object):
"""Coordinates data reading, translation and analysis.
This is the main class of the backend of Hummingbird. It uses a light source
dependent translator to read and translate the data into a common format. It
then runs whatever analysis algorithms are specified in the user provided
configuration file.
Args:
config_file (str): The configuration file to load.
"""
state = None
conf = None
def __init__(self, config_file, port):
# Save the port as global variable in ipc
ipc.port = port
if(config_file is None):
# Try to load an example configuration file
config_file = os.path.abspath(os.path.dirname(__file__)+
"/../../examples/basic/dummy.py")
logging.warning("No configuration file given! "
"Loading example configuration from %s",
(config_file))
if not os.path.isfile(config_file):
raise IOError('Could not find backend configuration file %s' % (config_file))
self._config_file = config_file
# self.backend_conf = imp.load_source('backend_conf', config_file)
signal.signal(signal.SIGUSR1, self.raise_interruption)
self.oldHandler = signal.signal(signal.SIGINT, self.ctrlcevent)
self.load_conf()
Worker.state['_config_file'] = config_file
#Worker.state['_config_dir'] = os.path.dirname(config_file)
if(not ipc.mpi.is_master()):
self.translator = init_translator(Worker.state)
print "MPI rank %d, pid %d" % (ipc.mpi.rank, os.getpid())
if (ipc.mpi.is_zmqserver()):
try:
os.environ["OMPI_COMM_WORLD_SIZE"]
pid = -1
try:
with open('.pid', 'r') as file:
pid = int(file.read())
except:
pass
if not check_pid(pid):
with open('.pid', 'w') as file: file.write(str(os.getppid()))
except KeyError:
with open('.pid', 'w') as file: file.write(str(os.getpid()))
self.reloadnow = False
print 'Starting backend...'
[docs] def raise_interruption(self, signum, stack):
self.reloadnow = True
[docs] def load_conf(self):
"""Load or reload the configuration file."""
Worker.conf = imp.load_source('backend_conf', self._config_file)
if(Worker.state is None):
Worker.state = Worker.conf.state
else:
# Only copy the keys that exist in the newly loaded state
for k in Worker.conf.state:
Worker.state[k] = Worker.conf.state[k]
[docs] def start(self):
"""Start the event loop."""
Worker.state['running'] = True
self.event_loop()
[docs] def ctrlcevent(self, whatSignal, stack):
self.reloadnow = True
signal.signal(signal.SIGINT, self.oldHandler)
[docs] def event_loop(self):
"""The event loop.
While ``state['running']`` is True, it will get events
from the translator and process them as fast as possible.
"""
while(True):
try:
while(Worker.state['running']) and not self.reloadnow:
self.reloadnow = self.reloadnow or ipc.mpi.checkreload()
if(ipc.mpi.is_master()):
is_exiting = ipc.mpi.master_loop()
if is_exiting:
return
else:
try:
evt = self.translator.next_event()
if evt is None:
return
except (RuntimeError) as e:
logging.warning("Some problem with %s (library used for translation), probably due to reloading the backend. (%s)" % (self.translator.library,e))
raise KeyboardInterrupt
ipc.set_current_event(evt)
try:
Worker.conf.onEvent(evt)
except (KeyError, TypeError) as exc:
logging.warning("Missing or wrong type of data, probably due to missing event data.", exc_info=True)
except (RuntimeError) as e:
logging.warning("Some problem with %s (library used for translation), probably due to reloading the backend." %self.translator.library, exc_info=True)
except StopIteration:
logging.warning("Stopping iteration.")
if 'end_of_run' in dir(Worker.conf):
Worker.conf.end_of_run()
ipc.mpi.slave_done()
return
except KeyboardInterrupt:
try:
print "Hit Ctrl+c again in the next second to quit..."
time.sleep(1)
self.reloadnow = True
signal.signal(signal.SIGINT, self.ctrlcevent)
except KeyboardInterrupt:
print "Exiting..."
break
if self.reloadnow:
self.reloadnow = False
print "Reloading configuration file."
self.load_conf()
try:
Worker.conf.close()
except:
pass
signal.signal(signal.SIGINT, self.oldHandler)
def init_translator(state):
"""Initialize the translator, depending on the state['Facility']."""
if('Facility' not in state):
raise ValueError("You need to set the 'Facility' in the configuration")
elif(state['Facility'].lower() == 'lcls'):
from backend.lcls import LCLSTranslator
return LCLSTranslator(state)
elif(state['Facility'].lower() == 'dummy'):
from backend.dummy import DummyTranslator
return DummyTranslator(state)
else:
raise ValueError('Facility %s not supported' % (state['Facility']))
def check_pid(pid):
""" Check For the existence of a unix pid. """
try:
os.kill(pid, 0)
except OSError:
return False
else:
return True