Source code for ipc.mpi

# --------------------------------------------------------------------------------------
# Copyright 2016, Benedikt J. Daurer, Filipe R.N.C. Maia, Max F. Hantke, Carl Nettelblad
# Hummingbird is distributed under the terms of the Simplified BSD License.
# -------------------------------------------------------------------------
"""Allows the backend and analysis to run in parallel using MPI."""
import ipc
import numpy
import numbers
import logging
import time

# Master-side reduction state: master_loop() stores the latest contribution
# for each reduction as reducedata[cmd][source_rank].
reducedata = {}
# master_loop() appends one True entry for every '__exit__' message it
# receives; its length is compared against nr_workers() to detect completion.
slavesdone = []

def is_master():
    """Tell whether this process acts as the dedicated master.

    Returns:
        True only when several processes are running and this one has MPI
        rank 0; False otherwise.
    """
    several_processes = size > 1
    return several_processes and rank == 0
def is_zmqserver():
    """Tell whether this process is the one hosting the zmq server.

    Returns:
        True if the process has MPI rank 0, False otherwise.
    """
    hosts_server = (rank == 0)
    return hosts_server
def nr_workers():
    """Return the number of available workers.

    With more than one process, one process is excluded from the count
    (``size - 1``); with a single process the count is ``size`` itself.
    """
    if size > 1:
        return size - 1
    return size
def get_source(sources):
    """Pick the data source assigned to the current process.

    Sources are handed out round-robin by MPI rank, i.e. the entry at index
    ``rank % len(sources)`` is returned.

    Args:
        sources: Indexable collection of data sources.

    Returns:
        The element of ``sources`` selected for this rank.
    """
    index = rank % len(sources)
    return sources[index]
try:
    # mpi4py is optional: when it is importable, set up the world
    # communicator plus a group/communicator containing all the slaves
    # (every rank from 1 upwards).
    from mpi4py import MPI

    comm = MPI.COMM_WORLD
    rank, size = comm.Get_rank(), comm.Get_size()
    slave_group = comm.Get_group().Incl(range(1, size))
    slaves_comm = comm.Create(slave_group)
    # Separate clone of the world communicator; checkreload() uses it for
    # the '__reload__' / '__subscribed__' messages.
    reload_comm = comm.Clone()

    # Message tags, all offset from the same base value (4353).
    MPI_TAG_INIT = 4353 + 1
    MPI_TAG_EXPAND = 4353 + 2
    MPI_TAG_READY = 4353 + 3
    MPI_TAG_CLOSE = 4353 + 4
except ImportError:
    # No MPI available: behave as a single process without communicators.
    rank, size = 0, 1
    comm = slaves_comm = reload_comm = None

# Replaced by checkreload() when a '__subscribed__' message arrives.
subscribed = set()
def slave_rank():
    """Return the rank of this process counted among the slaves.

    With more than one process this is ``rank - 1`` (so the process with
    MPI rank 0 gets -1); with a single process it is 0.
    """
    return rank - 1 if size > 1 else 0
def is_slave():
    """Tell whether this process is a slave.

    Returns:
        True if the MPI rank is greater than 0, False otherwise.
    """
    return 0 < rank
def is_main_slave():
    """Tell whether this process is the main slave.

    Returns:
        True if the MPI rank equals 1, False otherwise.
    """
    return 1 == rank
def is_worker():
    """Tell whether this process does worker duty.

    Returns:
        True if there is only one process, or if this process is a slave.
    """
    if size == 1:
        return True
    return is_slave()
def is_main_worker():
    """Tell whether this process is the main worker.

    Returns:
        True if there is only one process, or if this process is the main
        slave.
    """
    if size == 1:
        return True
    return is_main_slave()
def send(title, data):
    """Send a titled data item to the master node (rank 0).

    The message is transmitted as the two-element list ``[title, data]``.
    Nothing happens when no MPI communicator is available.

    Args:
        title: Identifier placed first in the message.
        data: Payload placed second in the message.
    """
    if comm is None:
        return
    message = [title, data]
    comm.send(message, 0)
def checkreload():
    """Check whether a reload has been requested.

    On a process with a zmq server whose ``reloadmaster`` flag is set, the
    flag is cleared, ``['__reload__']`` is sent to every rank from 1 to
    ``size - 1`` over ``reload_comm`` (when it exists), and True is returned.

    On a slave, at most one pending message on ``reload_comm`` is consumed:
    ``'__reload__'`` yields True, while ``'__subscribed__'`` replaces the
    module-level ``subscribed`` value with the message payload.

    Returns:
        True if a reload was requested, False otherwise.
    """
    global subscribed
    if ipc.zmq() is not None:
        if ipc.zmq().reloadmaster == True:
            ipc.zmq().reloadmaster = False
            if reload_comm is not None:
                # range() instead of the Python-2-only xrange() so the loop
                # works on both Python 2 and Python 3.
                for dest in range(1, size):
                    reload_comm.send(['__reload__'], dest)
            return True
    if is_slave():
        # Non-blocking probe: only receive when a message is already waiting.
        if reload_comm.Iprobe():
            msg = reload_comm.recv()
            if msg[0] == '__reload__':
                logging.debug('Got reload')
                return True
            elif msg[0] == '__subscribed__':
                # Pass the payload as a logging argument; eager %-formatting
                # would raise TypeError if the payload were a tuple.
                logging.debug('Got subscribed %s', msg[1])
                subscribed = msg[1]
    return False
def master_loop():
    """Handle one incoming message on the master process.

    Blocks until a message arrives from any source and dispatches on its
    first element:

    * ``'__data_conf__'``: merge ``msg[1]`` into ``ipc.broadcast.data_conf``.
    * ``'__reduce__'``: message layout is
      ``['__reduce__', cmd, shape, data, getback]``. The data is stored as
      the sender's latest contribution for ``cmd``; when ``getback`` is true
      the sum of all stored contributions is sent back to the sender.
    * ``'__exit__'``: record that a slave is done; once as many slaves as
      ``nr_workers()`` have reported, MPI is finalized.
    * anything else: forwarded through the zmq server after overwriting
      ``msg[1][0]`` with ``ipc.uuid``.

    Returns:
        True once all workers have reported to be done, None otherwise.
    """
    status = MPI.Status()
    msg = comm.recv(None, MPI.ANY_SOURCE, status=status)
    if msg[0] == '__data_conf__':
        ipc.broadcast.data_conf.update(msg[1])
    elif msg[0] == '__reduce__':
        cmd = msg[1]
        shape = msg[2]
        if shape != ():
            # The accumulator must be built from the shape (msg[2]), not from
            # the reduction name (msg[1]).
            data_y = numpy.zeros(shape)
        else:
            data_y = 0
        incomingdata = msg[3]
        getback = msg[4]
        source = status.Get_source()
        # This indicates that we really should have an object for the state
        if cmd not in reducedata:
            reducedata[cmd] = {}
        reducedata[cmd][source] = incomingdata
        if getback:
            # Sum the latest contribution from every source seen so far.
            for contribution in reducedata[cmd].values():
                data_y = data_y + contribution
            comm.send(data_y, source)
    elif msg[0] == '__exit__':
        slavesdone.append(True)
        logging.warning("Slave with rank = %d reports to be done", msg[1])
        if len(slavesdone) == nr_workers():
            MPI.Finalize()
            return True
    else:
        # Inject a proper UUID
        msg[1][0] = ipc.uuid
        ipc.zmq().send(msg[0], msg[1])
def send_reduce(title, cmd, data_y, data_x, **kwds):
    """Reduce data and send it to the master.

    DO NOT USE: not currently used, maybe should be removed. The message
    built here has seven elements
    (``['__reduce__', title, cmd, shape, data_x, kwds, data_y]``), which is
    not the five-element layout that ``master_loop`` indexes for
    ``'__reduce__'`` messages.

    Args:
        title: Placed second in the message.
        cmd: Placed third in the message.
        data_y: Number or array; an empty tuple is sent as the shape for
            numbers, ``data_y.shape`` otherwise.
        data_x: Forwarded unchanged.
        **kwds: Forwarded unchanged as a dict.
    """
    # Parenthesised single-argument form prints the same text on Python 2 and
    # Python 3; the bare print statement is a syntax error on Python 3.
    print("DO NOT USE send_reduce")
    # Need an MPI barrier here on the slaves side, otherwise the main_slave
    # can block the master while other slaves are sending data.
    slaves_comm.Barrier()
    if is_main_slave():
        # Alert master for the reduction
        if isinstance(data_y, numbers.Number):
            shape = ()
        else:
            shape = data_y.shape
        comm.send(['__reduce__', title, cmd, shape, data_x, kwds, data_y], 0)
def sum(cmd, array):
    """Element-wise sum of ``array`` across all the slave processes.

    Every caller sends its contribution to the master (rank 0) as
    ``['__reduce__', cmd, shape, array, getback]``, where ``getback`` is true
    only on the main slave. The main slave then receives the summed result
    from the master and writes it into ``array`` in place, so the result is
    only available in the main_slave (rank 1). With a single process this is
    a no-op.

    Args:
        cmd: Name identifying the reduction on the master.
        array: Number or numpy array to contribute.

    Returns:
        None in all cases.
    """
    if size == 1:
        return
    wants_result = is_main_slave()
    # Numbers are announced with an empty shape tuple.
    shape = () if isinstance(array, numbers.Number) else array.shape
    comm.send(['__reduce__', cmd, shape, array, wants_result], 0)
    if not wants_result:
        return None
    databack = comm.recv(None, 0)
    # NOTE(review): both write-backs below need `array` to support item
    # assignment; a plain Python number passed in on the main slave would
    # fail here - confirm callers only pass numpy arrays.
    if isinstance(databack, numbers.Number):
        array[()] = databack
    else:
        array[:] = databack[:]
def max(array):
    """Element-wise maximum of a numpy array across all the slave processes.

    Delegates to ``_reduce`` with the MPI ``MAX`` operation; the result is
    written in place on the main_slave (rank 1) only.
    """
    _reduce(array, op="MAX")
def min(array):
    """Element-wise minimum of a numpy array across all the slave processes.

    Delegates to ``_reduce`` with the MPI ``MIN`` operation; the result is
    written in place on the main_slave (rank 1) only.
    """
    _reduce(array, op="MIN")
def prod(array):
    """Element-wise product of a numpy array across all the slave processes.

    Delegates to ``_reduce`` with the MPI ``PROD`` operation; the result is
    written in place on the main_slave (rank 1) only.
    """
    _reduce(array, op="PROD")
def logical_or(array):
    """Element-wise logical OR of a numpy array across all the slave processes.

    Delegates to ``_reduce`` with the MPI ``LOR`` operation; the result is
    written in place on the main_slave (rank 1) only.
    """
    _reduce(array, op="LOR")
def logical_and(array):
    """Element-wise logical AND of a numpy array across all the slave processes.

    Delegates to ``_reduce`` with the MPI ``LAND`` operation; the result is
    written in place on the main_slave (rank 1) only.
    """
    _reduce(array, op="LAND")
def _reduce(array, op):
    """Reduce a numpy array with the given MPI op across all the slave processes.

    Args:
        array: numpy ndarray to reduce. On the main slave it is both the
            contribution and the in-place destination of the result.
        op: Name of the attribute on the ``MPI`` module to use as the
            reduction operation (e.g. ``"MAX"``).

    Raises:
        TypeError: If ``array`` is not a numpy ndarray.
    """
    if not isinstance(array, numpy.ndarray):
        raise TypeError("argument must be a numpy ndarray")
    # Truthiness test kept exactly as in the original; it skips the reduction
    # when there is no slaves communicator (e.g. MPI is unavailable).
    # NOTE(review): presumably it also skips a null communicator on rank 0 -
    # confirm against mpi4py.
    if not slaves_comm:
        return
    mpi_op = getattr(MPI, op)
    if is_main_slave():
        slaves_comm.Reduce(MPI.IN_PLACE, array, op=mpi_op)
    else:
        slaves_comm.Reduce(array, None, op=mpi_op)
def slave_done():
    """Report to the master that this process is done.

    Sends an ``'__exit__'`` message carrying this process's MPI rank.
    """
    send(title='__exit__', data=rank)