Source code for ipc.broadcast

# --------------------------------------------------------------------------------------
# Copyright 2016, Benedikt J. Daurer, Filipe R.N.C. Maia, Max F. Hantke, Carl Nettelblad
# Hummingbird is distributed under the terms of the Simplified BSD License.
# -------------------------------------------------------------------------
"""Broadcasts the analysed data to be displayed in the interface."""
import numpy
import ipc
import logging
import hashlib

evt = None
data_conf = {}
sent_time = {}


[docs]def init_data(title, **kwds): """Configures the data broadcast named title. All the keyword=value pairs given will be set in the configuration dictionary for that broadcast, which are then available at the interface.""" if(title in data_conf.keys()): data_conf[title].update(kwds) else: logging.debug("Initializing source '%s.'" % title) data_conf[title] = kwds if(ipc.mpi.is_slave()): ipc.mpi.send('__data_conf__', data_conf)
def _check_type(title, data_y): """Make sure that the given broadcast already has the data_type set. If not set it appropriately.""" if(title not in data_conf): data_conf[title] = {} if('data_type' not in data_conf[title]): if(isinstance(data_y, numpy.ndarray)): # Special handling for data plots if(len(data_y.shape) == 1) or (len(data_y.shape) == 2 and data_y.shape[0] == 2): data_conf[title]['data_type'] = 'vector' # Images with more longer first dimension elif(len(data_y.shape) == 2): data_conf[title]['data_type'] = 'image' else: raise ValueError(("%dD data not supported; shape=%s" % (len(data_y.shape), data_y.shape))) else: data_conf[title]['data_type'] = 'scalar' if(ipc.mpi.is_slave()): ipc.mpi.send('__data_conf__', data_conf)
[docs]def new_data(title, data_y, mpi_reduce=False, **kwds): """Send a new data item, which will be appended to any existing values at the interface. If mpi_reduce is True data_y will be summed over all the slaves. All keywords pairs given will also be transmitted and available at the interface.""" global sent_time _check_type(title, data_y) event_id = evt.event_id() # If send_rate is given limit the send rate to it if 'send_rate' in kwds and kwds['send_rate'] is not None: send_rate = float(kwds['send_rate'])/ipc.mpi.nr_workers() cur_time = event_id if title in sent_time: send_probability = (cur_time-sent_time[title])*send_rate else: send_probability = 1 sent_time[title] = cur_time #print 'send_probability', send_probability if numpy.random.random() > send_probability: # do not send the data return if(ipc.mpi.is_slave()): if(mpi_reduce): ipc.mpi.send_reduce(title, 'new_data', data_y, event_id, **kwds) else: m = hashlib.md5() m.update(bytes(title)) if m.digest() in ipc.mpi.subscribed: ipc.mpi.send(title, [ipc.uuid, 'new_data', title, data_y, event_id, kwds]) else: logging.debug('%s not subscribed, not sending' % (title)) else: ipc.zmq().send(title, [ipc.uuid, 'new_data', title, data_y, event_id, kwds]) logging.debug("Sending data on source '%s'" % title)
[docs]def set_current_event(_evt): """Updates the current event, such that it can be accessed easily in analysis code""" ipc.broadcast.evt = _evt