# --------------------------------------------------------------------------------------
# Copyright 2016, Benedikt J. Daurer, Filipe R.N.C. Maia, Max F. Hantke, Carl Nettelblad
# Hummingbird is distributed under the terms of the Simplified BSD License.
# -------------------------------------------------------------------------
"""A plotting module for correlations and maps"""
import numpy as np
import ipc
from scipy.sparse import lil_matrix
from backend import Record
_existingPlots = {}
# Private classes / helper functions
# ----------------------------------
class _MeanMap:
def __init__(self, name, xmin, xmax, ymin, ymax, step, localRadius, overviewStep, xlabel, ylabel, group=None):
# Initialize local map
self.localRadius = localRadius / float(step)
self.step = step
self.xrange = np.linspace(xmin, xmax, (xmax-xmin)/float(step)+1)
self.yrange = np.linspace(ymin, ymax, (ymax-ymin)/float(step)+1)
self.Nx = self.xrange.shape[0]
self.Ny = self.yrange.shape[0]
self.localXmin = self.xrange[self.Nx/2-self.localRadius]
self.localXmax = self.xrange[self.Nx/2+self.localRadius]
self.localYmin = self.yrange[self.Ny/2-self.localRadius]
self.localYmax = self.yrange[self.Ny/2+self.localRadius]
self.sparseSum = lil_matrix((self.Ny, self.Nx), dtype=np.float32)
self.sparseNorm = lil_matrix((self.Ny, self.Nx), dtype=np.float32)
self.localMap = np.zeros((2*self.localRadius+1, 2*self.localRadius+1))
# Initialize overview map
self.overviewXrange = np.linspace(xmin, xmax, (xmax-xmin)/float(overviewStep))
self.overviewYrange = np.linspace(ymin, ymax, (ymax-ymin)/float(overviewStep))
overviewNx = self.overviewXrange.shape[0]
overviewNy = self.overviewYrange.shape[0]
self.overviewMap = np.zeros((overviewNy, overviewNx))
# Initialize plots
self.counter = 0
ipc.broadcast.init_data(name+' -> Overview', data_type='image', history_length=1, flipy=True, \
xmin=xmin, xmax=xmax, ymin=ymin, ymax=ymax, xlabel=xlabel, ylabel=ylabel, group=group)
ipc.broadcast.init_data(name+' -> Local', data_type='image', history_length=1, flipy=True, \
xmin=self.localXmin, xmax=self.localXmax, \
ymin=self.localYmin, ymax=self.localYmax, xlabel=xlabel, ylabel=ylabel, group=group)
def append(self, X, Y, Z, N):
try:
N = N.data
except AttributeError:
pass
self.sparseSum[abs(self.yrange - Y.data).argmin(), abs(self.xrange - X.data).argmin()] += Z.data
self.sparseNorm[abs(self.yrange - Y.data).argmin(), abs(self.xrange - X.data).argmin()] += N
self.overviewMap[abs(self.overviewYrange - Y.data).argmin(), abs(self.overviewXrange - X.data).argmin()] += 1
self.counter += 1
def updateCenter(self, X, Y):
self.center = (abs(self.yrange - Y.data).argmin(), abs(self.xrange - X.data).argmin())
def updateLocalLimits(self):
self.localXmin = max(self.xrange[max(self.center[1]-self.localRadius, 0)], self.xrange.min())
self.localXmax = min(self.xrange[min(self.center[1]+self.localRadius, self.Nx-1)], self.xrange.max())
self.localYmin = max(self.yrange[max(self.center[0]-self.localRadius, 0)], self.yrange.min())
self.localYmax = min(self.yrange[min(self.center[0]+self.localRadius, self.Ny-1)], self.yrange.max())
def gatherSumsAndNorms(self):
if(ipc.mpi.slaves_comm):
sparseSums = ipc.mpi.slaves_comm.gather(self.sparseSum)
sparseNorms = ipc.mpi.slaves_comm.gather(self.sparseNorm)
if(ipc.mpi.is_main_slave()):
self.sparseSum = sparseSums[0]
self.sparseNorm = sparseNorms[0]
for i in sparseSums[1:]:
self.sparseSum += i
for n in sparseNorms[1:]:
self.sparseNorm += n
def gatherOverview(self):
ipc.mpi.sum(self.overviewMap)
def updateLocalMap(self):
r = int(self.localRadius)
c = self.center
self.localSum = self.sparseSum[c[0]-r: c[0]+r+1, c[1]-r:c[1]+r+1].toarray()
self.localNorm = self.sparseNorm[c[0]-r: c[0]+r+1, c[1]-r:c[1]+r+1].toarray()
visited = self.localNorm != 0
self.localMap[visited] = self.localSum[visited] / self.localNorm[visited]
def updateOverviewMap(self, X,Y):
current = (abs(self.overviewYrange - Y.data).argmin(), abs(self.overviewXrange - X.data).argmin())
visited = self.overviewMap[()] != 0
self.overviewMap[visited] = 1
self.overviewMap[current] = 2
# Public Plotting functions - Put new plotting functions here!
# ------------------------------------------------------------
#meanMaps = {}
[docs]def plotMeanMapDynamic(X, Y, Z, norm=1., msg='', update=100, xmin=0, xmax=100, ymin=0, ymax=100, step=10, \
localRadius=100, overviewStep=100, xlabel=None, ylabel=None, name=None, group=None):
"""Plotting the mean of parameter Z as a function of parameters X and Y.
(Using a buffer in the backend).
Args:
:X(Record): An event parameter e.g. Motor position in X
:Y(Record): An event parameter e.g. Motor position in Y
:Z(Record): An event parameter e.g. Intensity
Kwargs:
:norm(int): Z is normalized by a given value, e.g. gmd (default = 1)
:msg (str): A message to be displayed in the plot
:update (int): After how many new data points, an update is send to the frontend (default = 100)
:xmin (int): (default = 0)
:xmax (int): (default = 100)
:ymin (int): (default = 0)
:ymax (int): (default = 100)
:step (int): The resolution of the map in units of x,y (default = 10)
:xlabel (str): (default = X.name)
:ylabel (str): (default = Y.name)
:localRadius (int): The radius of a square neighborehood around the current position (X.data, Y.data) (default = 100)
:overviewStep (int): The resolution of the overiew map (default = 100)
"""
if name is None:
name = "%s(%s,%s)" %(Z.name, X.name, Y.name)
if (not name in _existingPlots):
if xlabel is None: xlabel = X.name
if ylabel is None: ylabel = Y.name
_existingPlots[name] = _MeanMap(name, xmin, xmax, ymin, ymax, step, localRadius, overviewStep, xlabel, ylabel, group=group)
m = existingPlots[name]
m.append(X, Y, Z, norm)
if(not m.counter % update):
m.gatherSumsAndNorms()
m.gatherOverview()
if(ipc.mpi.is_main_worker()):
m.updateCenter(X, Y)
m.updateLocalLimits()
m.updateLocalMap()
m.updateOverviewMap(X,Y)
print m.localXmin, m.localXmax, m.localYmin, m.localYmax
ipc.new_data(name+' -> Local', m.localMap, msg=msg, \
xmin=m.localXmin, xmax=m.localXmax, ymin=m.localYmin, ymax=m.localYmax)
ipc.new_data(name+' -> Overview', m.overviewMap)
correlations = {}
xArray = []
yArray = []
[docs]def plotCorrelation(X, Y, history=100, name=None, group=None):
"""Plotting the correlation of two parameters X and Y over time.
(Using a buffer in the backend).
Args:
:X(Record): An event parameter, e.g. hit rate
:Y(Record): An event parameter, e.g. some motor position
Kwargs:
:history(int): Buffer length
"""
if name is None:
name = "Corr(%s,%s)" %(X.name, Y.name)
if (not name in _existingPlots):
ipc.broadcast.init_data(name, history_length=100, group=group)
_existingPlots[name] = True
x,y = (X.data, Y.data)
xArray.append(x)
yArray.append(y)
correlation = x*y/(np.mean(xArray)*np.mean(yArray))
ipc.new_data(name, correlation)
[docs]def plotHeatmap(X, Y, xmin=0, xmax=1, xbins=10, ymin=0, ymax=1, ybins=10, name=None, group=None):
"""Plotting the heatmap of two parameters X and Y. Has been tested in MPI mode.
(Using a buffer in the backend).
Args:
:X(Record): An event parameter, e.g. hit rate
:Y(Record): An event parameter, e.g. some motor position
Kwargs:
:xmin(int): default = 0
:xmax(int): default = 1
:xbins(int): default = 10
:ymin(int): default = 0
:ymax(int): default = 1
:ybins(int): default = 10
"""
if name is None:
name = "Heatmap(%s,%s)" %(X.name, Y.name)
if not(name in _existingPlots):
# initiate (y, x) in 2D array to get correct orientation of image
_existingPlots[name] = np.zeros((ybins, xbins), dtype=int)
ipc.broadcast.init_data(name, data_type="image", group=group)
deltaX = (xmax - float(xmin))/xbins
deltaY = (ymax - float(ymin))/ybins
nx = np.ceil((X.data - xmin)/deltaX)
if (nx < 0):
nx = 0
elif (nx >= xbins):
nx = xbins - 1
ny = np.ceil((Y.data - ymin)/deltaY)
if (ny < 0):
ny = 0
elif (ny >= ybins):
ny = ybins - 1
# assign y to row and x to col in 2D array
_existingPlots[name][ny, nx] += 1
current_heatmap = np.copy(heatmaps[name])
ipc.mpi.sum(current_heatmap)
if ipc.mpi.is_main_worker():
ipc.new_data(name, current_heatmap[()])
[docs]def plotMeanMap(X,Y,Z, xmin=0, xmax=10, xbins=10, ymin=0, ymax=10, ybins=10, xlabel=None, ylabel=None, msg='', dynamic_extent=False, initial_reset=False, name=None, group=None):
"""Plotting the meanmap of Z as a function of two parameters X and Y.
(No buffer in the backend).
Args:
:X(Record,float): An event parameter, e.g. injector position in x
:Y(Record,float): An event parameter, e.g. injector position in y
:Z(Record,float): Some metric, e.g. hit rate, size, etc...
Kwargs:
:xmin(int): default = 0
:xmax(int): default = 10
:xbins(int): default = 10
:ymin(int): default = 0
:ymax(int): default = 10
:ybins(int): default = 10
:name(str): The key that appears in the interface (default = MeanMap(X.name, Y.name))
:xlabel(str):
:ylabel(str):
:msg(msg): Any message to be displayed in the plot
"""
if name is None:
name = "MeanMap(%s,%s,%s)" % (X.name, Y.name, Z.name)
if (not name in _existingPlots):
if xlabel is None: xlabel = X.name
if ylabel is None: ylabel = Y.name
ipc.broadcast.init_data(name, data_type='triple', history_length=1,
xmin=xmin, xmax=xmax, ymin=ymin, ymax=ymax,
xbins=xbins, ybins=ybins,
xlabel=xlabel, ylabel=ylabel, flipy=True,
dynamic_extent=dynamic_extent, initial_reset=initial_reset,
group=group)
_existingPlots[name] = True
x = X if not isinstance(X, Record) else X.data
y = Y if not isinstance(Y, Record) else Y.data
z = Z if not isinstance(Z, Record) else Z.data
ipc.new_data(name, np.array([x, y, z]), msg=msg)
[docs]def plotScatter(X,Y, name=None, history=100, xlabel=None, ylabel=None, group=None):
"""Plotting the scatter of two parameters X and Y.
(No buffer in the backend).
Args:
:X(Record): An event parameter, e.g. injector position in x
:Y(Record): An event parameter, e.g. injector position in y
Kwargs:
:name(str): The key that appears in the interface (default = MeanMap(X.name, Y.name))
:xlabel(str):
:ylabel(str):
"""
if name is None:
name = "Scatter(%s,%s)" %(X.name, Y.name)
if (not name in _existingPlots):
if xlabel is None: xlabel = X.name
if ylabel is None: ylabel = Y.name
ipc.broadcast.init_data(name, data_type='tuple', history_length=history,
xlabel=xlabel, ylabel=ylabel, group=group)
_existingPlots[name] = True
ipc.new_data(name, np.array([X.data, Y.data]))
[docs]def plotScatterBg(X,Y, name=None, history=100, xlabel=None, ylabel=None, bg_filename=None, bg_xmin=0., bg_xmax=1., bg_ymin=0., bg_ymax=0., bg_angle=0., group=None):
"""Plotting the scatter of two parameters X and Y.
"""
if name is None:
name = "ScatterBg(%s,%s)" %(X.name, Y.name)
if (not name in _existingPlots):
if xlabel is None: xlabel = X.name
if ylabel is None: ylabel = Y.name
ipc.broadcast.init_data(name, data_type='tuple', history_length=history,
xlabel=xlabel, ylabel=ylabel,
bg_filename=bg_filename,
bg_xmin=bg_xmin, bg_xmax=bg_xmax,
bg_ymin=bg_ymin, bg_ymax=bg_ymax,
bg_angle=bg_angle, group=group)
_existingPlots[name] = True
ipc.new_data(name, np.array([X.data, Y.data]))
[docs]def plotScatterColor(X,Y,Z, name=None, history=100, xlabel=None, ylabel=None, zlabel=None, vmin=None, vmax=None, group=None):
"""Plotting the scatter of two parameters X and Y and use Z for color.
(No buffer in the backend).
Args:
:X(Record): An event parameter, e.g. injector position in x
:Y(Record): An event parameter, e.g. injector position in y
:Z(Record): An event parameter, e.g. injector position in z
Kwargs:
:name(str): The key that appears in the interface (default = MeanMap(X.name, Y.name))
:xlabel(str):
:ylabel(str):
"""
if name is None:
name = "ScatterColor(%s,%s)" %(X.name, Y.name)
if (not name in _existingPlots):
if xlabel is None: xlabel = X.name
if ylabel is None: ylabel = Y.name
if zlabel is None: zlabel = Z.name
if vmin is None: vmin = 0
if vmax is None: vmax = 1
ipc.broadcast.init_data(name, data_type='triple', history_length=history,
xlabel=xlabel, ylabel=ylabel, zlabel=zlabel,
vmin=vmin, vmax=vmax, group=group)
_existingPlots[name] = True
ipc.new_data(name, np.array([X.data, Y.data, Z.data]))