diff --git a/schainpy/CHANGELOG.md b/schainpy/CHANGELOG.md index 4f4d159..c54d3f0 100644 --- a/schainpy/CHANGELOG.md +++ b/schainpy/CHANGELOG.md @@ -1,5 +1,12 @@ ## CHANGELOG: +### 3.0 +* Python 3.x compatible +* New architecture with multiprocessing and IPC communication +* Add @MPDecorator for multiprocessing Units and Operations +* Added new type of operation `external` for non-locking operations +* New plotting architecture with buffering/throttle capabilities to speed up plots + ### 2.3 * Added support for Madrigal formats (reading/writing). * Added support for reading BLTR parameters (*.sswma). diff --git a/schainpy/admin.py b/schainpy/admin.py index 7dc2984..0bb49fa 100644 --- a/schainpy/admin.py +++ b/schainpy/admin.py @@ -24,7 +24,7 @@ from email.mime.multipart import MIMEMultipart import schainpy from schainpy.utils import log -from schainpy.model.graphics.jroplot_data import popup +from schainpy.model.graphics.jroplot_base import popup def get_path(): ''' diff --git a/schainpy/controller.py b/schainpy/controller.py index 8e39197..42ad19d 100644 --- a/schainpy/controller.py +++ b/schainpy/controller.py @@ -97,9 +97,7 @@ def wait(context): receiver = c.socket(zmq.SUB) receiver.connect('ipc:///tmp/schain_{}_pub'.format(self.id)) receiver.setsockopt(zmq.SUBSCRIBE, self.id.encode()) - log.error('startinggg') msg = receiver.recv_multipart()[1] - #log.error(msg) context.terminate() class ParameterConf(): @@ -1245,7 +1243,7 @@ class Project(Process): try: zmq.proxy(xpub, xsub) - except zmq.ContextTerminated: + except: # zmq.ContextTerminated: xpub.close() xsub.close() @@ -1260,6 +1258,6 @@ class Project(Process): # Iniciar todos los procesos .start(), monitoreo de procesos. ELiminar lo de abajo - log.success('{} finished (time: {}s)'.format( + log.success('{} Done (time: {}s)'.format( self.name, time.time()-self.start_time)) diff --git a/schainpy/model/data/jrodata.py b/schainpy/model/data/jrodata.py index daa22c1..39844e7 100644 --- a/schainpy/model/data/jrodata.py +++ b/schainpy/model/data/jrodata.py @@ -7,7 +7,9 @@ $Id: JROData.py 173 2012-11-20 15:06:21Z murco $ import copy import numpy import datetime +import json +from schainpy.utils import log from .jroheaderIO import SystemHeader, RadarControllerHeader @@ -79,7 +81,7 @@ def hildebrand_sekhon(data, navg): j = 0 cont = 1 - while((cont==1)and(j (rtest*sump**2)): j = j - 1 - sump = sump - sortdata[j] - sumq = sumq - sortdata[j]**2 + sump = sump - sortdata[j] + sumq = sumq - sortdata[j]**2 cont = 0 j += 1 - lnoise = sump /j + lnoise = sump / j return lnoise @@ -147,85 +149,49 @@ class JROData(GenericData): # m_ProcessingHeader = ProcessingHeader() systemHeaderObj = SystemHeader() - radarControllerHeaderObj = RadarControllerHeader() - # data = None - type = None - datatype = None # dtype but in string - # dtype = None - # nChannels = None - # nHeights = None - nProfiles = None - heightList = None - channelList = None - flagDiscontinuousBlock = False - useLocalTime = False - utctime = None - timeZone = None - dstFlag = None - errorCount = None - blocksize = None - # nCode = None -# # nBaud = None -# # code = None - flagDecodeData = False # asumo q la data no esta decodificada - flagDeflipData = False # asumo q la data no esta sin flip - flagShiftFFT = False - # ippSeconds = None - # timeInterval = None - nCohInt = None - # noise = None - windowOfFilter = 1 - # Speed of ligth C = 3e8 - frequency = 49.92e6 - realtime = False - beacon_heiIndexList = None - last_block = None - blocknow = None - azimuth = None - zenith = None - beam = Beam() - profileIndex = None + error = None + data = None + data_plt = None - error = (0, '') def __str__(self): @@ -395,53 +361,29 @@ class Voltage(JROData): ''' self.useLocalTime = True - self.radarControllerHeaderObj = RadarControllerHeader() - self.systemHeaderObj = SystemHeader() - self.type = "Voltage" - self.data = None - # self.dtype = None - # self.nChannels = 0 - # self.nHeights = 0 - self.nProfiles = None - - self.heightList = None - + self.heightList = Non self.channelList = None - # self.channelIndexList = None - self.flagNoData = True - self.flagDiscontinuousBlock = False - self.utctime = None - self.timeZone = None - self.dstFlag = None - self.errorCount = None - self.nCohInt = None - self.blocksize = None - self.flagDecodeData = False # asumo q la data no esta decodificada - self.flagDeflipData = False # asumo q la data no esta sin flip - self.flagShiftFFT = False - self.flagDataAsBlock = False # Asumo que la data es leida perfil a perfil - self.profileIndex = 0 def getNoisebyHildebrand(self, channel=None): @@ -505,93 +447,53 @@ class Spectra(JROData): # data spc es un numpy array de 2 dmensiones (canales, perfiles, alturas) data_spc = None - # data cspc es un numpy array de 2 dmensiones (canales, pares, alturas) data_cspc = None - # data dc es un numpy array de 2 dmensiones (canales, alturas) data_dc = None - # data power data_pwr = None - nFFTPoints = None - # nPairs = None - pairsList = None - nIncohInt = None - wavelength = None # Necesario para cacular el rango de velocidad desde la frecuencia - nCohInt = None # se requiere para determinar el valor de timeInterval - ippFactor = None - profileIndex = 0 - plotting = "spectra" - def __init__(self): ''' Constructor ''' self.useLocalTime = True - self.radarControllerHeaderObj = RadarControllerHeader() - self.systemHeaderObj = SystemHeader() - self.type = "Spectra" - # self.data = None - # self.dtype = None - # self.nChannels = 0 - # self.nHeights = 0 - self.nProfiles = None - self.heightList = None - self.channelList = None - # self.channelIndexList = None - self.pairsList = None - self.flagNoData = True - self.flagDiscontinuousBlock = False - self.utctime = None - self.nCohInt = None - self.nIncohInt = None - self.blocksize = None - self.nFFTPoints = None - self.wavelength = None - self.flagDecodeData = False # asumo q la data no esta decodificada - self.flagDeflipData = False # asumo q la data no esta sin flip - self.flagShiftFFT = False - self.ippFactor = 1 - #self.noise = None - self.beacon_heiIndexList = [] - self.noise_estimation = None def getNoisebyHildebrand(self, xmin_index=None, xmax_index=None, ymin_index=None, ymax_index=None): @@ -692,7 +594,8 @@ class Spectra(JROData): def getTimeInterval(self): - timeInterval = self.ippSeconds * self.nCohInt * self.nIncohInt * self.nProfiles * self.ippFactor + timeInterval = self.ippSeconds * self.nCohInt * \ + self.nIncohInt * self.nProfiles * self.ippFactor return timeInterval @@ -755,19 +658,12 @@ class Spectra(JROData): class SpectraHeis(Spectra): data_spc = None - data_cspc = None - data_dc = None - nFFTPoints = None - # nPairs = None - pairsList = None - nCohInt = None - nIncohInt = None def __init__(self): @@ -830,36 +726,21 @@ class SpectraHeis(Spectra): class Fits(JROData): heightList = None - channelList = None - flagNoData = True - flagDiscontinuousBlock = False - useLocalTime = False - utctime = None - timeZone = None - # ippSeconds = None - # timeInterval = None - nCohInt = None - nIncohInt = None - noise = None - windowOfFilter = 1 - # Speed of ligth C = 3e8 - frequency = 49.92e6 - realtime = False def __init__(self): @@ -978,33 +859,19 @@ class Fits(JROData): class Correlation(JROData): noise = None - SNR = None - #-------------------------------------------------- - mode = None - split = False - data_cf = None - lags = None - lagRange = None - pairsList = None - normFactor = None - #-------------------------------------------------- - # calculateVelocity = None - nLags = None - nPairs = None - nAvg = None def __init__(self): @@ -1068,7 +935,8 @@ class Correlation(JROData): ind_vel = numpy.array([-2, -1, 1, 2]) + freq_dc if ind_vel[0] < 0: - ind_vel[list(range(0, 1))] = ind_vel[list(range(0, 1))] + self.num_prof + ind_vel[list(range(0, 1))] = ind_vel[list( + range(0, 1))] + self.num_prof if mode == 1: jspectra[:, freq_dc, :] = ( @@ -1154,55 +1022,30 @@ class Correlation(JROData): class Parameters(Spectra): experimentInfo = None # Information about the experiment - # Information from previous data - inputUnit = None # Type of data to be processed - operation = None # Type of operation to parametrize - # normFactor = None #Normalization Factor - groupList = None # List of Pairs, Groups, etc - # Parameters - data_param = None # Parameters obtained - data_pre = None # Data Pre Parametrization - data_SNR = None # Signal to Noise Ratio - # heightRange = None #Heights - abscissaList = None # Abscissa, can be velocities, lags or time - # noise = None #Noise Potency - utctimeInit = None # Initial UTC time - paramInterval = None # Time interval to calculate Parameters in seconds - useLocalTime = True - # Fitting - data_error = None # Error of the estimation - constants = None - library = None - # Output signal - outputInterval = None # Time interval to calculate output signal in seconds - data_output = None # Out signal - nAvg = None - noise_estimation = None - GauSPC = None # Fit gaussian SPC def __init__(self): @@ -1248,4 +1091,252 @@ class Parameters(Spectra): return self.spc_noise timeInterval = property(getTimeInterval) - noise = property(getNoise, setValue, "I'm the 'Noise' property.") \ No newline at end of file + noise = property(getNoise, setValue, "I'm the 'Noise' property.") + + +class PlotterData(object): + ''' + Object to hold data to be plotted + ''' + + MAXNUMX = 100 + MAXNUMY = 100 + + def __init__(self, code, throttle_value, exp_code, buffering=True): + + self.throttle = throttle_value + self.exp_code = exp_code + self.buffering = buffering + self.ready = False + self.localtime = False + self.data = {} + self.meta = {} + self.__times = [] + self.__heights = [] + + if 'snr' in code: + self.plottypes = ['snr'] + elif code == 'spc': + self.plottypes = ['spc', 'noise', 'rti'] + elif code == 'rti': + self.plottypes = ['noise', 'rti'] + else: + self.plottypes = [code] + + for plot in self.plottypes: + self.data[plot] = {} + + def __str__(self): + dum = ['{}{}'.format(key, self.shape(key)) for key in self.data] + return 'Data[{}][{}]'.format(';'.join(dum), len(self.__times)) + + def __len__(self): + return len(self.__times) + + def __getitem__(self, key): + + if key not in self.data: + raise KeyError(log.error('Missing key: {}'.format(key))) + if 'spc' in key or not self.buffering: + ret = self.data[key] + else: + ret = numpy.array([self.data[key][x] for x in self.times]) + if ret.ndim > 1: + ret = numpy.swapaxes(ret, 0, 1) + return ret + + def __contains__(self, key): + return key in self.data + + def setup(self): + ''' + Configure object + ''' + + self.type = '' + self.ready = False + self.data = {} + self.__times = [] + self.__heights = [] + self.__all_heights = set() + for plot in self.plottypes: + if 'snr' in plot: + plot = 'snr' + self.data[plot] = {} + + if 'spc' in self.data or 'rti' in self.data: + self.data['noise'] = {} + if 'noise' not in self.plottypes: + self.plottypes.append('noise') + + def shape(self, key): + ''' + Get the shape of the one-element data for the given key + ''' + + if len(self.data[key]): + if 'spc' in key or not self.buffering: + return self.data[key].shape + return self.data[key][self.__times[0]].shape + return (0,) + + def update(self, dataOut, tm): + ''' + Update data object with new dataOut + ''' + + if tm in self.__times: + return + + self.type = dataOut.type + self.parameters = getattr(dataOut, 'parameters', []) + if hasattr(dataOut, 'pairsList'): + self.pairs = dataOut.pairsList + if hasattr(dataOut, 'meta'): + self.meta = dataOut.meta + self.channels = dataOut.channelList + self.interval = dataOut.getTimeInterval() + self.localtime = dataOut.useLocalTime + if 'spc' in self.plottypes or 'cspc' in self.plottypes: + self.xrange = (dataOut.getFreqRange(1)/1000., + dataOut.getAcfRange(1), dataOut.getVelRange(1)) + self.__heights.append(dataOut.heightList) + self.__all_heights.update(dataOut.heightList) + self.__times.append(tm) + + for plot in self.plottypes: + if plot == 'spc': + z = dataOut.data_spc/dataOut.normFactor + buffer = 10*numpy.log10(z) + if plot == 'cspc': + buffer = dataOut.data_cspc + if plot == 'noise': + buffer = 10*numpy.log10(dataOut.getNoise()/dataOut.normFactor) + if plot == 'rti': + buffer = dataOut.getPower() + if plot == 'snr_db': + buffer = dataOut.data_SNR + if plot == 'snr': + buffer = 10*numpy.log10(dataOut.data_SNR) + if plot == 'dop': + buffer = 10*numpy.log10(dataOut.data_DOP) + if plot == 'mean': + buffer = dataOut.data_MEAN + if plot == 'std': + buffer = dataOut.data_STD + if plot == 'coh': + buffer = dataOut.getCoherence() + if plot == 'phase': + buffer = dataOut.getCoherence(phase=True) + if plot == 'output': + buffer = dataOut.data_output + if plot == 'param': + buffer = dataOut.data_param + + if 'spc' in plot: + self.data[plot] = buffer + else: + if self.buffering: + self.data[plot][tm] = buffer + else: + self.data[plot] = buffer + + def normalize_heights(self): + ''' + Ensure same-dimension of the data for different heighList + ''' + + H = numpy.array(list(self.__all_heights)) + H.sort() + for key in self.data: + shape = self.shape(key)[:-1] + H.shape + for tm, obj in list(self.data[key].items()): + h = self.__heights[self.__times.index(tm)] + if H.size == h.size: + continue + index = numpy.where(numpy.in1d(H, h))[0] + dummy = numpy.zeros(shape) + numpy.nan + if len(shape) == 2: + dummy[:, index] = obj + else: + dummy[index] = obj + self.data[key][tm] = dummy + + self.__heights = [H for tm in self.__times] + + def jsonify(self, decimate=False): + ''' + Convert data to json + ''' + + data = {} + tm = self.times[-1] + dy = int(self.heights.size/self.MAXNUMY) + 1 + for key in self.data: + if key in ('spc', 'cspc') or not self.buffering: + dx = int(self.data[key].shape[1]/self.MAXNUMX) + 1 + data[key] = self.roundFloats( + self.data[key][::, ::dx, ::dy].tolist()) + else: + data[key] = self.roundFloats(self.data[key][tm].tolist()) + + ret = {'data': data} + ret['exp_code'] = self.exp_code + ret['time'] = float(tm) + ret['interval'] = float(self.interval) + ret['localtime'] = self.localtime + ret['yrange'] = self.roundFloats(self.heights[::dy].tolist()) + if 'spc' in self.data or 'cspc' in self.data: + ret['xrange'] = self.roundFloats(self.xrange[2][::dx].tolist()) + else: + ret['xrange'] = [] + if hasattr(self, 'pairs'): + ret['pairs'] = [(int(p[0]), int(p[1])) for p in self.pairs] + else: + ret['pairs'] = [] + + for key, value in list(self.meta.items()): + ret[key] = value + + return json.dumps(ret) + + @property + def times(self): + ''' + Return the list of times of the current data + ''' + + ret = numpy.array(self.__times) + ret.sort() + return ret + + @property + def min_time(self): + ''' + Return the minimun time value + ''' + + return self.times[0] + + @property + def max_time(self): + ''' + Return the maximun time value + ''' + + return self.times[-1] + + @property + def heights(self): + ''' + Return the list of heights of the current data + ''' + + return numpy.array(self.__heights[-1]) + + @staticmethod + def roundFloats(obj): + if isinstance(obj, list): + return list(map(PlotterData.roundFloats, obj)) + elif isinstance(obj, float): + return round(obj, 2) diff --git a/schainpy/model/graphics/__init__.py b/schainpy/model/graphics/__init__.py index 98b5033..439b190 100644 --- a/schainpy/model/graphics/__init__.py +++ b/schainpy/model/graphics/__init__.py @@ -4,4 +4,3 @@ from .jroplot_heispectra import * from .jroplot_correlation import * from .jroplot_parameters import * from .jroplot_data import * -from .jroplotter import * diff --git a/schainpy/model/graphics/jroplot_base.py b/schainpy/model/graphics/jroplot_base.py new file mode 100644 index 0000000..fd6c1ec --- /dev/null +++ b/schainpy/model/graphics/jroplot_base.py @@ -0,0 +1,803 @@ + +import os +import sys +import zmq +import time +import datetime +from functools import wraps +import numpy +import matplotlib + +if 'BACKEND' in os.environ: + matplotlib.use(os.environ['BACKEND']) +elif 'linux' in sys.platform: + matplotlib.use("TkAgg") +elif 'darwin' in sys.platform: + matplotlib.use('TkAgg') +else: + from schainpy.utils import log + log.warning('Using default Backend="Agg"', 'INFO') + matplotlib.use('Agg') + +import matplotlib.pyplot as plt +from matplotlib.patches import Polygon +from mpl_toolkits.axes_grid1 import make_axes_locatable +from matplotlib.ticker import FuncFormatter, LinearLocator, MultipleLocator + +from schainpy.model.data.jrodata import PlotterData +from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator +from schainpy.utils import log + +jet_values = matplotlib.pyplot.get_cmap('jet', 100)(numpy.arange(100))[10:90] +blu_values = matplotlib.pyplot.get_cmap( + 'seismic_r', 20)(numpy.arange(20))[10:15] +ncmap = matplotlib.colors.LinearSegmentedColormap.from_list( + 'jro', numpy.vstack((blu_values, jet_values))) +matplotlib.pyplot.register_cmap(cmap=ncmap) + +CMAPS = [plt.get_cmap(s) for s in ('jro', 'jet', 'viridis', + 'plasma', 'inferno', 'Greys', 'seismic', 'bwr', 'coolwarm')] + +EARTH_RADIUS = 6.3710e3 + + +def ll2xy(lat1, lon1, lat2, lon2): + + p = 0.017453292519943295 + a = 0.5 - numpy.cos((lat2 - lat1) * p)/2 + numpy.cos(lat1 * p) * \ + numpy.cos(lat2 * p) * (1 - numpy.cos((lon2 - lon1) * p)) / 2 + r = 12742 * numpy.arcsin(numpy.sqrt(a)) + theta = numpy.arctan2(numpy.sin((lon2-lon1)*p)*numpy.cos(lat2*p), numpy.cos(lat1*p) + * numpy.sin(lat2*p)-numpy.sin(lat1*p)*numpy.cos(lat2*p)*numpy.cos((lon2-lon1)*p)) + theta = -theta + numpy.pi/2 + return r*numpy.cos(theta), r*numpy.sin(theta) + + +def km2deg(km): + ''' + Convert distance in km to degrees + ''' + + return numpy.rad2deg(km/EARTH_RADIUS) + + +def figpause(interval): + backend = plt.rcParams['backend'] + if backend in matplotlib.rcsetup.interactive_bk: + figManager = matplotlib._pylab_helpers.Gcf.get_active() + if figManager is not None: + canvas = figManager.canvas + if canvas.figure.stale: + canvas.draw() + try: + canvas.start_event_loop(interval) + except: + pass + return + + +def popup(message): + ''' + ''' + + fig = plt.figure(figsize=(12, 8), facecolor='r') + text = '\n'.join([s.strip() for s in message.split(':')]) + fig.text(0.01, 0.5, text, ha='left', va='center', + size='20', weight='heavy', color='w') + fig.show() + figpause(1000) + + +class Throttle(object): + ''' + Decorator that prevents a function from being called more than once every + time period. + To create a function that cannot be called more than once a minute, but + will sleep until it can be called: + @Throttle(minutes=1) + def foo(): + pass + + for i in range(10): + foo() + print "This function has run %s times." % i + ''' + + def __init__(self, seconds=0, minutes=0, hours=0): + self.throttle_period = datetime.timedelta( + seconds=seconds, minutes=minutes, hours=hours + ) + + self.time_of_last_call = datetime.datetime.min + + def __call__(self, fn): + @wraps(fn) + def wrapper(*args, **kwargs): + coerce = kwargs.pop('coerce', None) + if coerce: + self.time_of_last_call = datetime.datetime.now() + return fn(*args, **kwargs) + else: + now = datetime.datetime.now() + time_since_last_call = now - self.time_of_last_call + time_left = self.throttle_period - time_since_last_call + + if time_left > datetime.timedelta(seconds=0): + return + + self.time_of_last_call = datetime.datetime.now() + return fn(*args, **kwargs) + + return wrapper + +def apply_throttle(value): + + @Throttle(seconds=value) + def fnThrottled(fn): + fn() + + return fnThrottled + +@MPDecorator +class Plotter(ProcessingUnit): + ''' + Proccessing unit to handle plot operations + ''' + + def __init__(self): + + ProcessingUnit.__init__(self) + + def setup(self, **kwargs): + + self.connections = 0 + self.web_address = kwargs.get('web_server', False) + self.realtime = kwargs.get('realtime', False) + self.localtime = kwargs.get('localtime', True) + self.buffering = kwargs.get('buffering', True) + self.throttle = kwargs.get('throttle', 2) + self.exp_code = kwargs.get('exp_code', None) + self.set_ready = apply_throttle(self.throttle) + self.dates = [] + self.data = PlotterData( + self.plots, self.throttle, self.exp_code, self.buffering) + self.isConfig = True + + def ready(self): + ''' + Set dataOut ready + ''' + + self.data.ready = True + self.dataOut.data_plt = self.data + + def run(self, realtime=True, localtime=True, buffering=True, + throttle=2, exp_code=None, web_server=None): + + if not self.isConfig: + self.setup(realtime=realtime, localtime=localtime, + buffering=buffering, throttle=throttle, exp_code=exp_code, + web_server=web_server) + + if self.web_address: + log.success( + 'Sending to web: {}'.format(self.web_address), + self.name + ) + self.context = zmq.Context() + self.sender_web = self.context.socket(zmq.REQ) + self.sender_web.connect(self.web_address) + self.poll = zmq.Poller() + self.poll.register(self.sender_web, zmq.POLLIN) + time.sleep(1) + + # t = Thread(target=self.event_monitor, args=(monitor,)) + # t.start() + + self.dataOut = self.dataIn + self.data.ready = False + + if self.dataOut.flagNoData: + coerce = True + else: + coerce = False + + if self.dataOut.type == 'Parameters': + tm = self.dataOut.utctimeInit + else: + tm = self.dataOut.utctime + if self.dataOut.useLocalTime: + if not self.localtime: + tm += time.timezone + dt = datetime.datetime.fromtimestamp(tm).date() + else: + if self.localtime: + tm -= time.timezone + dt = datetime.datetime.utcfromtimestamp(tm).date() + if dt not in self.dates: + if self.data: + self.ready() + self.data.setup() + self.dates.append(dt) + + self.data.update(self.dataOut, tm) + + if False: # TODO check when publishers ends + self.connections -= 1 + if self.connections == 0 and dt in self.dates: + self.data.ended = True + self.ready() + time.sleep(1) + else: + if self.realtime: + self.ready() + if self.web_address: + retries = 5 + while True: + self.sender_web.send(self.data.jsonify()) + socks = dict(self.poll.poll(5000)) + if socks.get(self.sender_web) == zmq.POLLIN: + reply = self.sender_web.recv_string() + if reply == 'ok': + log.log("Response from server ok", self.name) + break + else: + log.warning( + "Malformed reply from server: {}".format(reply), self.name) + + else: + log.warning( + "No response from server, retrying...", self.name) + self.sender_web.setsockopt(zmq.LINGER, 0) + self.sender_web.close() + self.poll.unregister(self.sender_web) + retries -= 1 + if retries == 0: + log.error( + "Server seems to be offline, abandoning", self.name) + self.sender_web = self.context.socket(zmq.REQ) + self.sender_web.connect(self.web_address) + self.poll.register(self.sender_web, zmq.POLLIN) + time.sleep(1) + break + self.sender_web = self.context.socket(zmq.REQ) + self.sender_web.connect(self.web_address) + self.poll.register(self.sender_web, zmq.POLLIN) + time.sleep(1) + else: + self.set_ready(self.ready, coerce=coerce) + + return + + def close(self): + pass + + +@MPDecorator +class Plot(Operation): + ''' + Base class for Schain plotting operations + ''' + + CODE = 'Figure' + colormap = 'jro' + bgcolor = 'white' + __missing = 1E30 + + __attrs__ = ['show', 'save', 'xmin', 'xmax', 'ymin', 'ymax', 'zmin', 'zmax', + 'zlimits', 'xlabel', 'ylabel', 'xaxis', 'cb_label', 'title', + 'colorbar', 'bgcolor', 'width', 'height', 'localtime', 'oneFigure', + 'showprofile', 'decimation', 'pause'] + + def __init__(self): + + Operation.__init__(self) + self.isConfig = False + self.isPlotConfig = False + + def __fmtTime(self, x, pos): + ''' + ''' + + return '{}'.format(self.getDateTime(x).strftime('%H:%M')) + + def __setup(self, **kwargs): + ''' + Initialize variables + ''' + + self.figures = [] + self.axes = [] + self.cb_axes = [] + self.localtime = kwargs.pop('localtime', True) + self.show = kwargs.get('show', True) + self.save = kwargs.get('save', False) + self.ftp = kwargs.get('ftp', False) + self.colormap = kwargs.get('colormap', self.colormap) + self.colormap_coh = kwargs.get('colormap_coh', 'jet') + self.colormap_phase = kwargs.get('colormap_phase', 'RdBu_r') + self.colormaps = kwargs.get('colormaps', None) + self.bgcolor = kwargs.get('bgcolor', self.bgcolor) + self.showprofile = kwargs.get('showprofile', False) + self.title = kwargs.get('wintitle', self.CODE.upper()) + self.cb_label = kwargs.get('cb_label', None) + self.cb_labels = kwargs.get('cb_labels', None) + self.labels = kwargs.get('labels', None) + self.xaxis = kwargs.get('xaxis', 'frequency') + self.zmin = kwargs.get('zmin', None) + self.zmax = kwargs.get('zmax', None) + self.zlimits = kwargs.get('zlimits', None) + self.xmin = kwargs.get('xmin', None) + self.xmax = kwargs.get('xmax', None) + self.xrange = kwargs.get('xrange', 12) + self.xscale = kwargs.get('xscale', None) + self.ymin = kwargs.get('ymin', None) + self.ymax = kwargs.get('ymax', None) + self.yscale = kwargs.get('yscale', None) + self.xlabel = kwargs.get('xlabel', None) + self.decimation = kwargs.get('decimation', None) + self.showSNR = kwargs.get('showSNR', False) + self.oneFigure = kwargs.get('oneFigure', True) + self.width = kwargs.get('width', None) + self.height = kwargs.get('height', None) + self.colorbar = kwargs.get('colorbar', True) + self.factors = kwargs.get('factors', [1, 1, 1, 1, 1, 1, 1, 1]) + self.channels = kwargs.get('channels', None) + self.titles = kwargs.get('titles', []) + self.polar = False + self.grid = kwargs.get('grid', False) + self.pause = kwargs.get('pause', False) + self.save_labels = kwargs.get('save_labels', None) + self.realtime = kwargs.get('realtime', True) + self.buffering = kwargs.get('buffering', True) + self.throttle = kwargs.get('throttle', 2) + self.exp_code = kwargs.get('exp_code', None) + self.__throttle_plot = apply_throttle(self.throttle) + self.data = PlotterData( + self.CODE, self.throttle, self.exp_code, self.buffering) + + def __setup_plot(self): + ''' + Common setup for all figures, here figures and axes are created + ''' + + self.setup() + + self.time_label = 'LT' if self.localtime else 'UTC' + if self.data.localtime: + self.getDateTime = datetime.datetime.fromtimestamp + else: + self.getDateTime = datetime.datetime.utcfromtimestamp + + if self.width is None: + self.width = 8 + + self.figures = [] + self.axes = [] + self.cb_axes = [] + self.pf_axes = [] + self.cmaps = [] + + size = '15%' if self.ncols == 1 else '30%' + pad = '4%' if self.ncols == 1 else '8%' + + if self.oneFigure: + if self.height is None: + self.height = 1.4 * self.nrows + 1 + fig = plt.figure(figsize=(self.width, self.height), + edgecolor='k', + facecolor='w') + self.figures.append(fig) + for n in range(self.nplots): + ax = fig.add_subplot(self.nrows, self.ncols, + n + 1, polar=self.polar) + ax.tick_params(labelsize=8) + ax.firsttime = True + ax.index = 0 + ax.press = None + self.axes.append(ax) + if self.showprofile: + cax = self.__add_axes(ax, size=size, pad=pad) + cax.tick_params(labelsize=8) + self.pf_axes.append(cax) + else: + if self.height is None: + self.height = 3 + for n in range(self.nplots): + fig = plt.figure(figsize=(self.width, self.height), + edgecolor='k', + facecolor='w') + ax = fig.add_subplot(1, 1, 1, polar=self.polar) + ax.tick_params(labelsize=8) + ax.firsttime = True + ax.index = 0 + ax.press = None + self.figures.append(fig) + self.axes.append(ax) + if self.showprofile: + cax = self.__add_axes(ax, size=size, pad=pad) + cax.tick_params(labelsize=8) + self.pf_axes.append(cax) + + for n in range(self.nrows): + if self.colormaps is not None: + cmap = plt.get_cmap(self.colormaps[n]) + else: + cmap = plt.get_cmap(self.colormap) + cmap.set_bad(self.bgcolor, 1.) + self.cmaps.append(cmap) + + for fig in self.figures: + fig.canvas.mpl_connect('key_press_event', self.OnKeyPress) + fig.canvas.mpl_connect('scroll_event', self.OnBtnScroll) + fig.canvas.mpl_connect('button_press_event', self.onBtnPress) + fig.canvas.mpl_connect('motion_notify_event', self.onMotion) + fig.canvas.mpl_connect('button_release_event', self.onBtnRelease) + if self.show: + fig.show() + + def OnKeyPress(self, event): + ''' + Event for pressing keys (up, down) change colormap + ''' + ax = event.inaxes + if ax in self.axes: + if event.key == 'down': + ax.index += 1 + elif event.key == 'up': + ax.index -= 1 + if ax.index < 0: + ax.index = len(CMAPS) - 1 + elif ax.index == len(CMAPS): + ax.index = 0 + cmap = CMAPS[ax.index] + ax.cbar.set_cmap(cmap) + ax.cbar.draw_all() + ax.plt.set_cmap(cmap) + ax.cbar.patch.figure.canvas.draw() + self.colormap = cmap.name + + def OnBtnScroll(self, event): + ''' + Event for scrolling, scale figure + ''' + cb_ax = event.inaxes + if cb_ax in [ax.cbar.ax for ax in self.axes if ax.cbar]: + ax = [ax for ax in self.axes if cb_ax == ax.cbar.ax][0] + pt = ax.cbar.ax.bbox.get_points()[:, 1] + nrm = ax.cbar.norm + vmin, vmax, p0, p1, pS = ( + nrm.vmin, nrm.vmax, pt[0], pt[1], event.y) + scale = 2 if event.step == 1 else 0.5 + point = vmin + (vmax - vmin) / (p1 - p0) * (pS - p0) + ax.cbar.norm.vmin = point - scale * (point - vmin) + ax.cbar.norm.vmax = point - scale * (point - vmax) + ax.plt.set_norm(ax.cbar.norm) + ax.cbar.draw_all() + ax.cbar.patch.figure.canvas.draw() + + def onBtnPress(self, event): + ''' + Event for mouse button press + ''' + cb_ax = event.inaxes + if cb_ax is None: + return + + if cb_ax in [ax.cbar.ax for ax in self.axes if ax.cbar]: + cb_ax.press = event.x, event.y + else: + cb_ax.press = None + + def onMotion(self, event): + ''' + Event for move inside colorbar + ''' + cb_ax = event.inaxes + if cb_ax is None: + return + if cb_ax not in [ax.cbar.ax for ax in self.axes if ax.cbar]: + return + if cb_ax.press is None: + return + + ax = [ax for ax in self.axes if cb_ax == ax.cbar.ax][0] + xprev, yprev = cb_ax.press + dx = event.x - xprev + dy = event.y - yprev + cb_ax.press = event.x, event.y + scale = ax.cbar.norm.vmax - ax.cbar.norm.vmin + perc = 0.03 + + if event.button == 1: + ax.cbar.norm.vmin -= (perc * scale) * numpy.sign(dy) + ax.cbar.norm.vmax -= (perc * scale) * numpy.sign(dy) + elif event.button == 3: + ax.cbar.norm.vmin -= (perc * scale) * numpy.sign(dy) + ax.cbar.norm.vmax += (perc * scale) * numpy.sign(dy) + + ax.cbar.draw_all() + ax.plt.set_norm(ax.cbar.norm) + ax.cbar.patch.figure.canvas.draw() + + def onBtnRelease(self, event): + ''' + Event for mouse button release + ''' + cb_ax = event.inaxes + if cb_ax is not None: + cb_ax.press = None + + def __add_axes(self, ax, size='30%', pad='8%'): + ''' + Add new axes to the given figure + ''' + divider = make_axes_locatable(ax) + nax = divider.new_horizontal(size=size, pad=pad) + ax.figure.add_axes(nax) + return nax + + def setup(self): + ''' + This method should be implemented in the child class, the following + attributes should be set: + + self.nrows: number of rows + self.ncols: number of cols + self.nplots: number of plots (channels or pairs) + self.ylabel: label for Y axes + self.titles: list of axes title + + ''' + raise NotImplementedError + + def fill_gaps(self, x_buffer, y_buffer, z_buffer): + ''' + Create a masked array for missing data + ''' + if x_buffer.shape[0] < 2: + return x_buffer, y_buffer, z_buffer + + deltas = x_buffer[1:] - x_buffer[0:-1] + x_median = numpy.median(deltas) + + index = numpy.where(deltas > 5 * x_median) + + if len(index[0]) != 0: + z_buffer[::, index[0], ::] = self.__missing + z_buffer = numpy.ma.masked_inside(z_buffer, + 0.99 * self.__missing, + 1.01 * self.__missing) + + return x_buffer, y_buffer, z_buffer + + def decimate(self): + + # dx = int(len(self.x)/self.__MAXNUMX) + 1 + dy = int(len(self.y) / self.decimation) + 1 + + # x = self.x[::dx] + x = self.x + y = self.y[::dy] + z = self.z[::, ::, ::dy] + + return x, y, z + + def format(self): + ''' + Set min and max values, labels, ticks and titles + ''' + + if self.xmin is None: + xmin = self.data.min_time + else: + if self.xaxis is 'time': + dt = self.getDateTime(self.data.min_time) + xmin = (dt.replace(hour=int(self.xmin), minute=0, second=0) - + datetime.datetime(1970, 1, 1)).total_seconds() + if self.data.localtime: + xmin += time.timezone + else: + xmin = self.xmin + + if self.xmax is None: + xmax = xmin + self.xrange * 60 * 60 + else: + if self.xaxis is 'time': + dt = self.getDateTime(self.data.max_time) + xmax = (dt.replace(hour=int(self.xmax), minute=59, second=59) - + datetime.datetime(1970, 1, 1) + datetime.timedelta(seconds=1)).total_seconds() + if self.data.localtime: + xmax += time.timezone + else: + xmax = self.xmax + + ymin = self.ymin if self.ymin else numpy.nanmin(self.y) + ymax = self.ymax if self.ymax else numpy.nanmax(self.y) + + Y = numpy.array([1, 2, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000]) + i = 1 if numpy.where( + abs(ymax-ymin) <= Y)[0][0] < 0 else numpy.where(abs(ymax-ymin) <= Y)[0][0] + ystep = Y[i] / 10. + + if self.xaxis is not 'time': + X = numpy.array([1, 2, 5, 10, 20, 50, 100, + 200, 500, 1000, 2000, 5000])/2. + i = 1 if numpy.where( + abs(xmax-xmin) <= X)[0][0] < 0 else numpy.where(abs(xmax-xmin) <= X)[0][0] + xstep = X[i] / 10. + + for n, ax in enumerate(self.axes): + if ax.firsttime: + ax.set_facecolor(self.bgcolor) + ax.yaxis.set_major_locator(MultipleLocator(ystep)) + if self.xscale: + ax.xaxis.set_major_formatter(FuncFormatter( + lambda x, pos: '{0:g}'.format(x*self.xscale))) + if self.xscale: + ax.yaxis.set_major_formatter(FuncFormatter( + lambda x, pos: '{0:g}'.format(x*self.yscale))) + if self.xaxis is 'time': + ax.xaxis.set_major_formatter(FuncFormatter(self.__fmtTime)) + ax.xaxis.set_major_locator(LinearLocator(9)) + else: + ax.xaxis.set_major_locator(MultipleLocator(xstep)) + if self.xlabel is not None: + ax.set_xlabel(self.xlabel) + ax.set_ylabel(self.ylabel) + ax.firsttime = False + if self.showprofile: + self.pf_axes[n].set_ylim(ymin, ymax) + self.pf_axes[n].set_xlim(self.zmin, self.zmax) + self.pf_axes[n].set_xlabel('dB') + self.pf_axes[n].grid(b=True, axis='x') + [tick.set_visible(False) + for tick in self.pf_axes[n].get_yticklabels()] + if self.colorbar: + ax.cbar = plt.colorbar( + ax.plt, ax=ax, fraction=0.05, pad=0.02, aspect=10) + ax.cbar.ax.tick_params(labelsize=8) + ax.cbar.ax.press = None + if self.cb_label: + ax.cbar.set_label(self.cb_label, size=8) + elif self.cb_labels: + ax.cbar.set_label(self.cb_labels[n], size=8) + else: + ax.cbar = None + if self.grid: + ax.grid(True) + + if not self.polar: + ax.set_xlim(xmin, xmax) + ax.set_ylim(ymin, ymax) + ax.set_title('{} {} {}'.format( + self.titles[n], + self.getDateTime(self.data.max_time).strftime( + '%Y-%m-%dT%H:%M:%S'), + self.time_label), + size=8) + else: + ax.set_title('{}'.format(self.titles[n]), size=8) + ax.set_ylim(0, 90) + ax.set_yticks(numpy.arange(0, 90, 20)) + ax.yaxis.labelpad = 40 + + def clear_figures(self): + ''' + Reset axes for redraw plots + ''' + + for ax in self.axes: + ax.clear() + ax.firsttime = True + if ax.cbar: + ax.cbar.remove() + + def __plot(self): + ''' + Main function to plot, format and save figures + ''' + + #try: + self.plot() + self.format() + #except Exception as e: + # log.warning('{} Plot could not be updated... check data'.format( + # self.CODE), self.name) + # log.error(str(e), '') + # return + + for n, fig in enumerate(self.figures): + if self.nrows == 0 or self.nplots == 0: + log.warning('No data', self.name) + fig.text(0.5, 0.5, 'No Data', fontsize='large', ha='center') + fig.canvas.manager.set_window_title(self.CODE) + continue + + fig.tight_layout() + fig.canvas.manager.set_window_title('{} - {}'.format(self.title, + self.getDateTime(self.data.max_time).strftime('%Y/%m/%d'))) + fig.canvas.draw() + + if self.save: + + if self.save_labels: + labels = self.save_labels + else: + labels = list(range(self.nrows)) + + if self.oneFigure: + label = '' + else: + label = '-{}'.format(labels[n]) + figname = os.path.join( + self.save, + self.CODE, + '{}{}_{}.png'.format( + self.CODE, + label, + self.getDateTime(self.data.max_time).strftime( + '%Y%m%d_%H%M%S'), + ) + ) + log.log('Saving figure: {}'.format(figname), self.name) + if not os.path.isdir(os.path.dirname(figname)): + os.makedirs(os.path.dirname(figname)) + fig.savefig(figname) + + def plot(self): + ''' + Must be defined in the child class + ''' + raise NotImplementedError + + def run(self, dataOut, **kwargs): + + if dataOut.flagNoData and not dataOut.error: + return dataOut + + if dataOut.error: + coerce = True + else: + coerce = False + + if self.isConfig is False: + self.__setup(**kwargs) + self.data.setup() + self.isConfig = True + + if dataOut.type == 'Parameters': + tm = dataOut.utctimeInit + else: + tm = dataOut.utctime + + if dataOut.useLocalTime: + if not self.localtime: + tm += time.timezone + else: + if self.localtime: + tm -= time.timezone + + if self.data and (tm - self.data.min_time) >= self.xrange*60*60: + self.__plot() + self.data.setup() + self.clear_figures() + + self.data.update(dataOut, tm) + + if self.isPlotConfig is False: + self.__setup_plot() + self.isPlotConfig = True + + if self.realtime: + self.__plot() + else: + self.__throttle_plot(self.__plot, coerce=coerce) + + figpause(0.001) + + def close(self): + + if self.data and self.pause: + figpause(10) + diff --git a/schainpy/model/graphics/jroplot_correlation.py b/schainpy/model/graphics/jroplot_correlation.py index 9dea381..37fdc5c 100644 --- a/schainpy/model/graphics/jroplot_correlation.py +++ b/schainpy/model/graphics/jroplot_correlation.py @@ -5,7 +5,7 @@ import copy from schainpy.model import * from .figure import Figure, isRealtime -class CorrelationPlot(Figure): +class CorrelationPlot_(Figure): isConfig = None __nsubplots = None diff --git a/schainpy/model/graphics/jroplot_data.py b/schainpy/model/graphics/jroplot_data.py index f5d5bd9..e85be46 100644 --- a/schainpy/model/graphics/jroplot_data.py +++ b/schainpy/model/graphics/jroplot_data.py @@ -1,41 +1,32 @@ +''' +New Plots Operations + +@author: juan.espinoza@jro.igp.gob.pe +''' + -import os import time -import glob import datetime -from multiprocessing import Process - -import zmq import numpy -import matplotlib -import matplotlib.pyplot as plt -from matplotlib.patches import Polygon -from mpl_toolkits.axes_grid1 import make_axes_locatable -from matplotlib.ticker import FuncFormatter, LinearLocator, MultipleLocator -from schainpy.model.proc.jroproc_base import Operation +from schainpy.model.graphics.jroplot_base import Plot, plt from schainpy.utils import log -jet_values = matplotlib.pyplot.get_cmap('jet', 100)(numpy.arange(100))[10:90] -blu_values = matplotlib.pyplot.get_cmap( - 'seismic_r', 20)(numpy.arange(20))[10:15] -ncmap = matplotlib.colors.LinearSegmentedColormap.from_list( - 'jro', numpy.vstack((blu_values, jet_values))) -matplotlib.pyplot.register_cmap(cmap=ncmap) - -CMAPS = [plt.get_cmap(s) for s in ('jro', 'jet', 'viridis', 'plasma', 'inferno', 'Greys', 'seismic', 'bwr', 'coolwarm')] - EARTH_RADIUS = 6.3710e3 + def ll2xy(lat1, lon1, lat2, lon2): p = 0.017453292519943295 - a = 0.5 - numpy.cos((lat2 - lat1) * p)/2 + numpy.cos(lat1 * p) * numpy.cos(lat2 * p) * (1 - numpy.cos((lon2 - lon1) * p)) / 2 + a = 0.5 - numpy.cos((lat2 - lat1) * p)/2 + numpy.cos(lat1 * p) * \ + numpy.cos(lat2 * p) * (1 - numpy.cos((lon2 - lon1) * p)) / 2 r = 12742 * numpy.arcsin(numpy.sqrt(a)) - theta = numpy.arctan2(numpy.sin((lon2-lon1)*p)*numpy.cos(lat2*p), numpy.cos(lat1*p)*numpy.sin(lat2*p)-numpy.sin(lat1*p)*numpy.cos(lat2*p)*numpy.cos((lon2-lon1)*p)) + theta = numpy.arctan2(numpy.sin((lon2-lon1)*p)*numpy.cos(lat2*p), numpy.cos(lat1*p) + * numpy.sin(lat2*p)-numpy.sin(lat1*p)*numpy.cos(lat2*p)*numpy.cos((lon2-lon1)*p)) theta = -theta + numpy.pi/2 return r*numpy.cos(theta), r*numpy.sin(theta) + def km2deg(km): ''' Convert distance in km to degrees @@ -43,536 +34,8 @@ def km2deg(km): return numpy.rad2deg(km/EARTH_RADIUS) -def figpause(interval): - backend = plt.rcParams['backend'] - if backend in matplotlib.rcsetup.interactive_bk: - figManager = matplotlib._pylab_helpers.Gcf.get_active() - if figManager is not None: - canvas = figManager.canvas - if canvas.figure.stale: - canvas.draw() - try: - canvas.start_event_loop(interval) - except: - pass - return - -def popup(message): - ''' - ''' - - fig = plt.figure(figsize=(12, 8), facecolor='r') - text = '\n'.join([s.strip() for s in message.split(':')]) - fig.text(0.01, 0.5, text, ha='left', va='center', size='20', weight='heavy', color='w') - fig.show() - figpause(1000) - - -class PlotData(Operation, Process): - ''' - Base class for Schain plotting operations - ''' - - CODE = 'Figure' - colormap = 'jro' - bgcolor = 'white' - CONFLATE = False - __missing = 1E30 - - __attrs__ = ['show', 'save', 'xmin', 'xmax', 'ymin', 'ymax', 'zmin', 'zmax', - 'zlimits', 'xlabel', 'ylabel', 'xaxis','cb_label', 'title', - 'colorbar', 'bgcolor', 'width', 'height', 'localtime', 'oneFigure', - 'showprofile', 'decimation', 'ftp'] - - def __init__(self, **kwargs): - - Operation.__init__(self, plot=True, **kwargs) - Process.__init__(self) - - self.kwargs['code'] = self.CODE - self.mp = False - self.data = None - self.isConfig = False - self.figures = [] - self.axes = [] - self.cb_axes = [] - self.localtime = kwargs.pop('localtime', True) - self.show = kwargs.get('show', True) - self.save = kwargs.get('save', False) - self.ftp = kwargs.get('ftp', False) - self.colormap = kwargs.get('colormap', self.colormap) - self.colormap_coh = kwargs.get('colormap_coh', 'jet') - self.colormap_phase = kwargs.get('colormap_phase', 'RdBu_r') - self.colormaps = kwargs.get('colormaps', None) - self.bgcolor = kwargs.get('bgcolor', self.bgcolor) - self.showprofile = kwargs.get('showprofile', False) - self.title = kwargs.get('wintitle', self.CODE.upper()) - self.cb_label = kwargs.get('cb_label', None) - self.cb_labels = kwargs.get('cb_labels', None) - self.labels = kwargs.get('labels', None) - self.xaxis = kwargs.get('xaxis', 'frequency') - self.zmin = kwargs.get('zmin', None) - self.zmax = kwargs.get('zmax', None) - self.zlimits = kwargs.get('zlimits', None) - self.xmin = kwargs.get('xmin', None) - self.xmax = kwargs.get('xmax', None) - self.xrange = kwargs.get('xrange', 24) - self.xscale = kwargs.get('xscale', None) - self.ymin = kwargs.get('ymin', None) - self.ymax = kwargs.get('ymax', None) - self.yscale = kwargs.get('yscale', None) - self.xlabel = kwargs.get('xlabel', None) - self.decimation = kwargs.get('decimation', None) - self.showSNR = kwargs.get('showSNR', False) - self.oneFigure = kwargs.get('oneFigure', True) - self.width = kwargs.get('width', None) - self.height = kwargs.get('height', None) - self.colorbar = kwargs.get('colorbar', True) - self.factors = kwargs.get('factors', [1, 1, 1, 1, 1, 1, 1, 1]) - self.channels = kwargs.get('channels', None) - self.titles = kwargs.get('titles', []) - self.polar = False - self.grid = kwargs.get('grid', False) - - def __fmtTime(self, x, pos): - ''' - ''' - - return '{}'.format(self.getDateTime(x).strftime('%H:%M')) - - def __setup(self): - ''' - Common setup for all figures, here figures and axes are created - ''' - - if self.CODE not in self.data: - raise ValueError(log.error('Missing data for {}'.format(self.CODE), - self.name)) - - self.setup() - - self.time_label = 'LT' if self.localtime else 'UTC' - if self.data.localtime: - self.getDateTime = datetime.datetime.fromtimestamp - else: - self.getDateTime = datetime.datetime.utcfromtimestamp - - if self.width is None: - self.width = 8 - - self.figures = [] - self.axes = [] - self.cb_axes = [] - self.pf_axes = [] - self.cmaps = [] - - size = '15%' if self.ncols == 1 else '30%' - pad = '4%' if self.ncols == 1 else '8%' - - if self.oneFigure: - if self.height is None: - self.height = 1.4 * self.nrows + 1 - fig = plt.figure(figsize=(self.width, self.height), - edgecolor='k', - facecolor='w') - self.figures.append(fig) - for n in range(self.nplots): - ax = fig.add_subplot(self.nrows, self.ncols, - n + 1, polar=self.polar) - ax.tick_params(labelsize=8) - ax.firsttime = True - ax.index = 0 - ax.press = None - self.axes.append(ax) - if self.showprofile: - cax = self.__add_axes(ax, size=size, pad=pad) - cax.tick_params(labelsize=8) - self.pf_axes.append(cax) - else: - if self.height is None: - self.height = 3 - for n in range(self.nplots): - fig = plt.figure(figsize=(self.width, self.height), - edgecolor='k', - facecolor='w') - ax = fig.add_subplot(1, 1, 1, polar=self.polar) - ax.tick_params(labelsize=8) - ax.firsttime = True - ax.index = 0 - ax.press = None - self.figures.append(fig) - self.axes.append(ax) - if self.showprofile: - cax = self.__add_axes(ax, size=size, pad=pad) - cax.tick_params(labelsize=8) - self.pf_axes.append(cax) - - for n in range(self.nrows): - if self.colormaps is not None: - cmap = plt.get_cmap(self.colormaps[n]) - else: - cmap = plt.get_cmap(self.colormap) - cmap.set_bad(self.bgcolor, 1.) - self.cmaps.append(cmap) - - for fig in self.figures: - fig.canvas.mpl_connect('key_press_event', self.OnKeyPress) - fig.canvas.mpl_connect('scroll_event', self.OnBtnScroll) - fig.canvas.mpl_connect('button_press_event', self.onBtnPress) - fig.canvas.mpl_connect('motion_notify_event', self.onMotion) - fig.canvas.mpl_connect('button_release_event', self.onBtnRelease) - if self.show: - fig.show() - - def OnKeyPress(self, event): - ''' - Event for pressing keys (up, down) change colormap - ''' - ax = event.inaxes - if ax in self.axes: - if event.key == 'down': - ax.index += 1 - elif event.key == 'up': - ax.index -= 1 - if ax.index < 0: - ax.index = len(CMAPS) - 1 - elif ax.index == len(CMAPS): - ax.index = 0 - cmap = CMAPS[ax.index] - ax.cbar.set_cmap(cmap) - ax.cbar.draw_all() - ax.plt.set_cmap(cmap) - ax.cbar.patch.figure.canvas.draw() - self.colormap = cmap.name - - def OnBtnScroll(self, event): - ''' - Event for scrolling, scale figure - ''' - cb_ax = event.inaxes - if cb_ax in [ax.cbar.ax for ax in self.axes if ax.cbar]: - ax = [ax for ax in self.axes if cb_ax == ax.cbar.ax][0] - pt = ax.cbar.ax.bbox.get_points()[:, 1] - nrm = ax.cbar.norm - vmin, vmax, p0, p1, pS = ( - nrm.vmin, nrm.vmax, pt[0], pt[1], event.y) - scale = 2 if event.step == 1 else 0.5 - point = vmin + (vmax - vmin) / (p1 - p0) * (pS - p0) - ax.cbar.norm.vmin = point - scale * (point - vmin) - ax.cbar.norm.vmax = point - scale * (point - vmax) - ax.plt.set_norm(ax.cbar.norm) - ax.cbar.draw_all() - ax.cbar.patch.figure.canvas.draw() - - def onBtnPress(self, event): - ''' - Event for mouse button press - ''' - cb_ax = event.inaxes - if cb_ax is None: - return - - if cb_ax in [ax.cbar.ax for ax in self.axes if ax.cbar]: - cb_ax.press = event.x, event.y - else: - cb_ax.press = None - - def onMotion(self, event): - ''' - Event for move inside colorbar - ''' - cb_ax = event.inaxes - if cb_ax is None: - return - if cb_ax not in [ax.cbar.ax for ax in self.axes if ax.cbar]: - return - if cb_ax.press is None: - return - - ax = [ax for ax in self.axes if cb_ax == ax.cbar.ax][0] - xprev, yprev = cb_ax.press - dx = event.x - xprev - dy = event.y - yprev - cb_ax.press = event.x, event.y - scale = ax.cbar.norm.vmax - ax.cbar.norm.vmin - perc = 0.03 - - if event.button == 1: - ax.cbar.norm.vmin -= (perc * scale) * numpy.sign(dy) - ax.cbar.norm.vmax -= (perc * scale) * numpy.sign(dy) - elif event.button == 3: - ax.cbar.norm.vmin -= (perc * scale) * numpy.sign(dy) - ax.cbar.norm.vmax += (perc * scale) * numpy.sign(dy) - - ax.cbar.draw_all() - ax.plt.set_norm(ax.cbar.norm) - ax.cbar.patch.figure.canvas.draw() - - def onBtnRelease(self, event): - ''' - Event for mouse button release - ''' - cb_ax = event.inaxes - if cb_ax is not None: - cb_ax.press = None - - def __add_axes(self, ax, size='30%', pad='8%'): - ''' - Add new axes to the given figure - ''' - divider = make_axes_locatable(ax) - nax = divider.new_horizontal(size=size, pad=pad) - ax.figure.add_axes(nax) - return nax - - self.setup() - - def setup(self): - ''' - This method should be implemented in the child class, the following - attributes should be set: - - self.nrows: number of rows - self.ncols: number of cols - self.nplots: number of plots (channels or pairs) - self.ylabel: label for Y axes - self.titles: list of axes title - - ''' - raise NotImplementedError - - def fill_gaps(self, x_buffer, y_buffer, z_buffer): - ''' - Create a masked array for missing data - ''' - if x_buffer.shape[0] < 2: - return x_buffer, y_buffer, z_buffer - - deltas = x_buffer[1:] - x_buffer[0:-1] - x_median = numpy.median(deltas) - - index = numpy.where(deltas > 5 * x_median) - - if len(index[0]) != 0: - z_buffer[::, index[0], ::] = self.__missing - z_buffer = numpy.ma.masked_inside(z_buffer, - 0.99 * self.__missing, - 1.01 * self.__missing) - - return x_buffer, y_buffer, z_buffer - - def decimate(self): - - # dx = int(len(self.x)/self.__MAXNUMX) + 1 - dy = int(len(self.y) / self.decimation) + 1 - - # x = self.x[::dx] - x = self.x - y = self.y[::dy] - z = self.z[::, ::, ::dy] - - return x, y, z - def format(self): - ''' - Set min and max values, labels, ticks and titles - ''' - - if self.xmin is None: - xmin = self.min_time - else: - if self.xaxis is 'time': - dt = self.getDateTime(self.min_time) - xmin = (dt.replace(hour=int(self.xmin), minute=0, second=0) - - datetime.datetime(1970, 1, 1)).total_seconds() - if self.data.localtime: - xmin += time.timezone - else: - xmin = self.xmin - - if self.xmax is None: - xmax = xmin + self.xrange * 60 * 60 - else: - if self.xaxis is 'time': - dt = self.getDateTime(self.max_time) - xmax = (dt.replace(hour=int(self.xmax), minute=59, second=59) - - datetime.datetime(1970, 1, 1) + datetime.timedelta(seconds=1)).total_seconds() - if self.data.localtime: - xmax += time.timezone - else: - xmax = self.xmax - - ymin = self.ymin if self.ymin else numpy.nanmin(self.y) - ymax = self.ymax if self.ymax else numpy.nanmax(self.y) - - Y = numpy.array([1, 2, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000]) - i = 1 if numpy.where(abs(ymax-ymin) <= Y)[0][0] < 0 else numpy.where(abs(ymax-ymin) <= Y)[0][0] - ystep = Y[i] / 10. - - if self.xaxis is not 'time': - X = numpy.array([1, 2, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000])/2. - i = 1 if numpy.where(abs(xmax-xmin) <= X)[0][0] < 0 else numpy.where(abs(xmax-xmin) <= X)[0][0] - xstep = X[i] / 10. - - for n, ax in enumerate(self.axes): - if ax.firsttime: - ax.set_facecolor(self.bgcolor) - ax.yaxis.set_major_locator(MultipleLocator(ystep)) - if self.xscale: - ax.xaxis.set_major_formatter(FuncFormatter(lambda x, pos: '{0:g}'.format(x*self.xscale))) - if self.xscale: - ax.yaxis.set_major_formatter(FuncFormatter(lambda x, pos: '{0:g}'.format(x*self.yscale))) - if self.xaxis is 'time': - ax.xaxis.set_major_formatter(FuncFormatter(self.__fmtTime)) - ax.xaxis.set_major_locator(LinearLocator(9)) - else: - ax.xaxis.set_major_locator(MultipleLocator(xstep)) - if self.xlabel is not None: - ax.set_xlabel(self.xlabel) - ax.set_ylabel(self.ylabel) - ax.firsttime = False - if self.showprofile: - self.pf_axes[n].set_ylim(ymin, ymax) - self.pf_axes[n].set_xlim(self.zmin, self.zmax) - self.pf_axes[n].set_xlabel('dB') - self.pf_axes[n].grid(b=True, axis='x') - [tick.set_visible(False) - for tick in self.pf_axes[n].get_yticklabels()] - if self.colorbar: - ax.cbar = plt.colorbar( - ax.plt, ax=ax, fraction=0.05, pad=0.02, aspect=10) - ax.cbar.ax.tick_params(labelsize=8) - ax.cbar.ax.press = None - if self.cb_label: - ax.cbar.set_label(self.cb_label, size=8) - elif self.cb_labels: - ax.cbar.set_label(self.cb_labels[n], size=8) - else: - ax.cbar = None - if self.grid: - ax.grid(True) - - if not self.polar: - ax.set_xlim(xmin, xmax) - ax.set_ylim(ymin, ymax) - ax.set_title('{} {} {}'.format( - self.titles[n], - self.getDateTime(self.max_time).strftime('%Y-%m-%dT%H:%M:%S'), - self.time_label), - size=8) - else: - ax.set_title('{}'.format(self.titles[n]), size=8) - ax.set_ylim(0, 90) - ax.set_yticks(numpy.arange(0, 90, 20)) - ax.yaxis.labelpad = 40 - - def __plot(self): - ''' - ''' - log.log('Plotting', self.name) - - try: - self.plot() - self.format() - except Exception as e: - log.warning('{} Plot could not be updated... check data'.format(self.CODE), self.name) - log.error(str(e), '') - return - - for n, fig in enumerate(self.figures): - if self.nrows == 0 or self.nplots == 0: - log.warning('No data', self.name) - fig.text(0.5, 0.5, 'No Data', fontsize='large', ha='center') - fig.canvas.manager.set_window_title(self.CODE) - continue - - fig.tight_layout() - fig.canvas.manager.set_window_title('{} - {}'.format(self.title, - self.getDateTime(self.max_time).strftime('%Y/%m/%d'))) - fig.canvas.draw() - - if self.save and (self.data.ended or not self.data.buffering): - - if self.save_labels: - labels = self.save_labels - else: - labels = list(range(self.nrows)) - - if self.oneFigure: - label = '' - else: - label = '-{}'.format(labels[n]) - figname = os.path.join( - self.save, - self.CODE, - '{}{}_{}.png'.format( - self.CODE, - label, - self.getDateTime(self.saveTime).strftime( - '%Y%m%d_%H%M%S'), - ) - ) - log.log('Saving figure: {}'.format(figname), self.name) - if not os.path.isdir(os.path.dirname(figname)): - os.makedirs(os.path.dirname(figname)) - fig.savefig(figname) - - def plot(self): - ''' - ''' - raise NotImplementedError - - def run(self): - - log.log('Starting', self.name) - - context = zmq.Context() - receiver = context.socket(zmq.SUB) - receiver.setsockopt(zmq.SUBSCRIBE, '') - receiver.setsockopt(zmq.CONFLATE, self.CONFLATE) - - if 'server' in self.kwargs['parent']: - receiver.connect( - 'ipc:///tmp/{}.plots'.format(self.kwargs['parent']['server'])) - else: - receiver.connect("ipc:///tmp/zmq.plots") - - while True: - try: - self.data = receiver.recv_pyobj(flags=zmq.NOBLOCK) - if self.data.localtime and self.localtime: - self.times = self.data.times - elif self.data.localtime and not self.localtime: - self.times = self.data.times + time.timezone - elif not self.data.localtime and self.localtime: - self.times = self.data.times - time.timezone - else: - self.times = self.data.times - - self.min_time = self.times[0] - self.max_time = self.times[-1] - - if self.isConfig is False: - self.__setup() - self.isConfig = True - - self.__plot() - - except zmq.Again as e: - if self.data and self.data.ended: - break - log.log('Waiting for data...') - if self.data: - figpause(self.data.throttle) - else: - time.sleep(2) - - def close(self): - if self.data: - self.__plot() - - -class PlotSpectraData(PlotData): +class SpectraPlot(Plot): ''' Plot for Spectra data ''' @@ -644,10 +107,9 @@ class PlotSpectraData(PlotData): ax.plt_mean.set_data(mean, y) self.titles.append('CH {}: {:3.2f}dB'.format(n, noise)) - self.saveTime = self.max_time -class PlotCrossSpectraData(PlotData): +class CrossSpectraPlot(Plot): CODE = 'cspc' zmin_coh = None @@ -741,10 +203,8 @@ class PlotCrossSpectraData(PlotData): ax.plt.set_array(phase.T.ravel()) self.titles.append('Phase CH{} * CH{}'.format(pair[0], pair[1])) - self.saveTime = self.max_time - -class PlotSpectraMeanData(PlotSpectraData): +class SpectraMeanPlot(SpectraPlot): ''' Plot for Spectra and Mean ''' @@ -752,7 +212,7 @@ class PlotSpectraMeanData(PlotSpectraData): colormap = 'jro' -class PlotRTIData(PlotData): +class RTIPlot(Plot): ''' Plot for RTI data ''' @@ -771,7 +231,7 @@ class PlotRTIData(PlotData): self.CODE.upper(), x) for x in range(self.nrows)] def plot(self): - self.x = self.times + self.x = self.data.times self.y = self.data.heights self.z = self.data[self.CODE] self.z = numpy.ma.masked_invalid(self.z) @@ -781,7 +241,7 @@ class PlotRTIData(PlotData): else: x, y, z = self.fill_gaps(*self.decimate()) - for n, ax in enumerate(self.axes): + for n, ax in enumerate(self.axes): self.zmin = self.zmin if self.zmin else numpy.min(self.z) self.zmax = self.zmax if self.zmax else numpy.max(self.z) if ax.firsttime: @@ -807,10 +267,8 @@ class PlotRTIData(PlotData): ax.plot_noise.set_data(numpy.repeat( self.data['noise'][n][-1], len(self.y)), self.y) - self.saveTime = self.min_time - -class PlotCOHData(PlotRTIData): +class CoherencePlot(RTIPlot): ''' Plot for Coherence data ''' @@ -833,7 +291,7 @@ class PlotCOHData(PlotRTIData): 'Phase Map Ch{} * Ch{}'.format(x[0], x[1]) for x in self.data.pairs] -class PlotPHASEData(PlotCOHData): +class PhasePlot(CoherencePlot): ''' Plot for Phase map data ''' @@ -842,7 +300,7 @@ class PlotPHASEData(PlotCOHData): colormap = 'seismic' -class PlotNoiseData(PlotData): +class NoisePlot(Plot): ''' Plot for noise ''' @@ -860,8 +318,8 @@ class PlotNoiseData(PlotData): def plot(self): - x = self.times - xmin = self.min_time + x = self.data.times + xmin = self.data.min_time xmax = xmin + self.xrange * 60 * 60 Y = self.data[self.CODE] @@ -877,10 +335,9 @@ class PlotNoiseData(PlotData): self.ymin = numpy.nanmin(Y) - 5 self.ymax = numpy.nanmax(Y) + 5 - self.saveTime = self.min_time -class PlotSNRData(PlotRTIData): +class SnrPlot(RTIPlot): ''' Plot for SNR Data ''' @@ -889,7 +346,7 @@ class PlotSNRData(PlotRTIData): colormap = 'jet' -class PlotDOPData(PlotRTIData): +class DopplerPlot(RTIPlot): ''' Plot for DOPPLER Data ''' @@ -898,7 +355,7 @@ class PlotDOPData(PlotRTIData): colormap = 'jet' -class PlotSkyMapData(PlotData): +class SkyMapPlot(Plot): ''' Plot for meteors detection data ''' @@ -938,16 +395,15 @@ class PlotSkyMapData(PlotData): else: ax.plot.set_data(x, y) - dt1 = self.getDateTime(self.min_time).strftime('%y/%m/%d %H:%M:%S') - dt2 = self.getDateTime(self.max_time).strftime('%y/%m/%d %H:%M:%S') + dt1 = self.getDateTime(self.data.min_time).strftime('%y/%m/%d %H:%M:%S') + dt2 = self.getDateTime(self.data.max_time).strftime('%y/%m/%d %H:%M:%S') title = 'Meteor Detection Sky Map\n %s - %s \n Number of events: %5.0f\n' % (dt1, dt2, len(x)) self.titles[0] = title - self.saveTime = self.max_time -class PlotParamData(PlotRTIData): +class ParametersPlot(RTIPlot): ''' Plot for data_param object ''' @@ -973,7 +429,7 @@ class PlotParamData(PlotRTIData): def plot(self): self.data.normalize_heights() - self.x = self.times + self.x = self.data.times self.y = self.data.heights if self.showSNR: self.z = numpy.concatenate( @@ -990,7 +446,7 @@ class PlotParamData(PlotRTIData): x, y, z = self.fill_gaps(*self.decimate()) for n, ax in enumerate(self.axes): - + self.zmax = self.zmax if self.zmax is not None else numpy.max( self.z[n]) self.zmin = self.zmin if self.zmin is not None else numpy.min( @@ -1015,10 +471,8 @@ class PlotParamData(PlotRTIData): cmap=self.cmaps[n] ) - self.saveTime = self.min_time - -class PlotOutputData(PlotParamData): +class OutputPlot(ParametersPlot): ''' Plot data_output object ''' @@ -1027,9 +481,9 @@ class PlotOutputData(PlotParamData): colormap = 'seismic' -class PlotPolarMapData(PlotData): +class PolarMapPlot(Plot): ''' - Plot for meteors detection data + Plot for weather radar ''' CODE = 'param' @@ -1058,20 +512,24 @@ class PlotPolarMapData(PlotData): self.cb_labels = self.data.meta['units'] self.lat = self.data.meta['latitude'] self.lon = self.data.meta['longitude'] - self.xmin, self.xmax = float(km2deg(self.xmin) + self.lon), float(km2deg(self.xmax) + self.lon) - self.ymin, self.ymax = float(km2deg(self.ymin) + self.lat), float(km2deg(self.ymax) + self.lat) + self.xmin, self.xmax = float( + km2deg(self.xmin) + self.lon), float(km2deg(self.xmax) + self.lon) + self.ymin, self.ymax = float( + km2deg(self.ymin) + self.lat), float(km2deg(self.ymax) + self.lat) # self.polar = True - def plot(self): - + def plot(self): + for n, ax in enumerate(self.axes): data = self.data['param'][self.channels[n]] - - zeniths = numpy.linspace(0, self.data.meta['max_range'], data.shape[1]) - if self.mode == 'E': + + zeniths = numpy.linspace( + 0, self.data.meta['max_range'], data.shape[1]) + if self.mode == 'E': azimuths = -numpy.radians(self.data.heights)+numpy.pi/2 r, theta = numpy.meshgrid(zeniths, azimuths) - x, y = r*numpy.cos(theta)*numpy.cos(numpy.radians(self.data.meta['elevation'])), r*numpy.sin(theta)*numpy.cos(numpy.radians(self.data.meta['elevation'])) + x, y = r*numpy.cos(theta)*numpy.cos(numpy.radians(self.data.meta['elevation'])), r*numpy.sin( + theta)*numpy.cos(numpy.radians(self.data.meta['elevation'])) x = km2deg(x) + self.lon y = km2deg(y) + self.lat else: @@ -1083,35 +541,36 @@ class PlotPolarMapData(PlotData): if ax.firsttime: if self.zlimits is not None: self.zmin, self.zmax = self.zlimits[n] - ax.plt = ax.pcolormesh(#r, theta, numpy.ma.array(data, mask=numpy.isnan(data)), - x, y, numpy.ma.array(data, mask=numpy.isnan(data)), - vmin=self.zmin, - vmax=self.zmax, - cmap=self.cmaps[n]) + ax.plt = ax.pcolormesh( # r, theta, numpy.ma.array(data, mask=numpy.isnan(data)), + x, y, numpy.ma.array(data, mask=numpy.isnan(data)), + vmin=self.zmin, + vmax=self.zmax, + cmap=self.cmaps[n]) else: if self.zlimits is not None: self.zmin, self.zmax = self.zlimits[n] ax.collections.remove(ax.collections[0]) - ax.plt = ax.pcolormesh(# r, theta, numpy.ma.array(data, mask=numpy.isnan(data)), - x, y, numpy.ma.array(data, mask=numpy.isnan(data)), - vmin=self.zmin, - vmax=self.zmax, - cmap=self.cmaps[n]) + ax.plt = ax.pcolormesh( # r, theta, numpy.ma.array(data, mask=numpy.isnan(data)), + x, y, numpy.ma.array(data, mask=numpy.isnan(data)), + vmin=self.zmin, + vmax=self.zmax, + cmap=self.cmaps[n]) if self.mode == 'A': continue - + # plot district names f = open('/data/workspace/schain_scripts/distrito.csv') for line in f: label, lon, lat = [s.strip() for s in line.split(',') if s] lat = float(lat) - lon = float(lon) + lon = float(lon) # ax.plot(lon, lat, '.b', ms=2) - ax.text(lon, lat, label.decode('utf8'), ha='center', va='bottom', size='8', color='black') - + ax.text(lon, lat, label.decode('utf8'), ha='center', + va='bottom', size='8', color='black') + # plot limites - limites =[] + limites = [] tmp = [] for line in open('/data/workspace/schain_scripts/lima.csv'): if '#' in line: @@ -1122,7 +581,8 @@ class PlotPolarMapData(PlotData): values = line.strip().split(',') tmp.append((float(values[0]), float(values[1]))) for points in limites: - ax.add_patch(Polygon(points, ec='k', fc='none', ls='--', lw=0.5)) + ax.add_patch( + Polygon(points, ec='k', fc='none', ls='--', lw=0.5)) # plot Cuencas for cuenca in ('rimac', 'lurin', 'mala', 'chillon', 'chilca', 'chancay-huaral'): @@ -1133,22 +593,21 @@ class PlotPolarMapData(PlotData): # plot grid for r in (15, 30, 45, 60): - ax.add_artist(plt.Circle((self.lon, self.lat), km2deg(r), color='0.6', fill=False, lw=0.2)) + ax.add_artist(plt.Circle((self.lon, self.lat), + km2deg(r), color='0.6', fill=False, lw=0.2)) ax.text( self.lon + (km2deg(r))*numpy.cos(60*numpy.pi/180), self.lat + (km2deg(r))*numpy.sin(60*numpy.pi/180), - '{}km'.format(r), + '{}km'.format(r), ha='center', va='bottom', size='8', color='0.6', weight='heavy') - + if self.mode == 'E': title = 'El={}$^\circ$'.format(self.data.meta['elevation']) label = 'E{:02d}'.format(int(self.data.meta['elevation'])) else: title = 'Az={}$^\circ$'.format(self.data.meta['azimuth']) label = 'A{:02d}'.format(int(self.data.meta['azimuth'])) - - self.save_labels = ['{}-{}'.format(lbl, label) for lbl in self.labels] - self.titles = ['{} {}'.format(self.data.parameters[x], title) for x in self.channels] - self.saveTime = self.max_time - \ No newline at end of file + self.save_labels = ['{}-{}'.format(lbl, label) for lbl in self.labels] + self.titles = ['{} {}'.format( + self.data.parameters[x], title) for x in self.channels] diff --git a/schainpy/model/graphics/jroplot_heispectra.py b/schainpy/model/graphics/jroplot_heispectra.py index bfc6e18..2bca309 100644 --- a/schainpy/model/graphics/jroplot_heispectra.py +++ b/schainpy/model/graphics/jroplot_heispectra.py @@ -10,7 +10,7 @@ import numpy from .figure import Figure, isRealtime from .plotting_codes import * -class SpectraHeisScope(Figure): +class SpectraHeisScope_(Figure): isConfig = None @@ -173,7 +173,7 @@ class SpectraHeisScope(Figure): wr_period=wr_period, thisDatetime=thisDatetime) -class RTIfromSpectraHeis(Figure): +class RTIfromSpectraHeis_(Figure): isConfig = None __nsubplots = None diff --git a/schainpy/model/graphics/jroplot_parameters.py b/schainpy/model/graphics/jroplot_parameters.py index f3259ac..d049627 100644 --- a/schainpy/model/graphics/jroplot_parameters.py +++ b/schainpy/model/graphics/jroplot_parameters.py @@ -7,7 +7,7 @@ from .plotting_codes import * from schainpy.model.proc.jroproc_base import MPDecorator from schainpy.utils import log -class FitGauPlot(Figure): +class FitGauPlot_(Figure): isConfig = None __nsubplots = None @@ -218,7 +218,7 @@ class FitGauPlot(Figure): -class MomentsPlot(Figure): +class MomentsPlot_(Figure): isConfig = None __nsubplots = None @@ -405,7 +405,7 @@ class MomentsPlot(Figure): thisDatetime=thisDatetime) -class SkyMapPlot(Figure): +class SkyMapPlot_(Figure): __isConfig = None __nsubplots = None @@ -561,7 +561,7 @@ class SkyMapPlot(Figure): -class WindProfilerPlot(Figure): +class WindProfilerPlot_(Figure): __isConfig = None __nsubplots = None @@ -774,7 +774,7 @@ class WindProfilerPlot(Figure): update_figfile = True @MPDecorator -class ParametersPlot(Figure): +class ParametersPlot_(Figure): __isConfig = None __nsubplots = None @@ -986,7 +986,7 @@ class ParametersPlot(Figure): return dataOut @MPDecorator -class Parameters1Plot(Figure): +class Parameters1Plot_(Figure): __isConfig = None __nsubplots = None @@ -1237,7 +1237,7 @@ class Parameters1Plot(Figure): update_figfile=False) return dataOut -class SpectralFittingPlot(Figure): +class SpectralFittingPlot_(Figure): __isConfig = None __nsubplots = None @@ -1415,7 +1415,7 @@ class SpectralFittingPlot(Figure): thisDatetime=thisDatetime) -class EWDriftsPlot(Figure): +class EWDriftsPlot_(Figure): __isConfig = None __nsubplots = None @@ -1621,7 +1621,7 @@ class EWDriftsPlot(Figure): -class PhasePlot(Figure): +class PhasePlot_(Figure): __isConfig = None __nsubplots = None @@ -1785,7 +1785,7 @@ class PhasePlot(Figure): -class NSMeteorDetection1Plot(Figure): +class NSMeteorDetection1Plot_(Figure): isConfig = None __nsubplots = None @@ -1969,7 +1969,7 @@ class NSMeteorDetection1Plot(Figure): thisDatetime=thisDatetime) -class NSMeteorDetection2Plot(Figure): +class NSMeteorDetection2Plot_(Figure): isConfig = None __nsubplots = None diff --git a/schainpy/model/graphics/jroplot_spectra.py b/schainpy/model/graphics/jroplot_spectra.py index d6ea2b6..47fd698 100644 --- a/schainpy/model/graphics/jroplot_spectra.py +++ b/schainpy/model/graphics/jroplot_spectra.py @@ -14,7 +14,7 @@ from schainpy.model.proc.jroproc_base import MPDecorator from schainpy.utils import log @MPDecorator -class SpectraPlot(Figure): +class SpectraPlot_(Figure): isConfig = None __nsubplots = None @@ -226,7 +226,7 @@ class SpectraPlot(Figure): return dataOut @MPDecorator -class CrossSpectraPlot(Figure): +class CrossSpectraPlot_(Figure): isConfig = None __nsubplots = None @@ -453,7 +453,7 @@ class CrossSpectraPlot(Figure): return dataOut @MPDecorator -class RTIPlot(Figure): +class RTIPlot_(Figure): __isConfig = None __nsubplots = None @@ -667,7 +667,7 @@ class RTIPlot(Figure): return dataOut @MPDecorator -class CoherenceMap(Figure): +class CoherenceMap_(Figure): isConfig = None __nsubplots = None @@ -878,7 +878,7 @@ class CoherenceMap(Figure): return dataOut @MPDecorator -class PowerProfilePlot(Figure): +class PowerProfilePlot_(Figure): isConfig = None __nsubplots = None @@ -1008,7 +1008,7 @@ class PowerProfilePlot(Figure): return dataOut @MPDecorator -class SpectraCutPlot(Figure): +class SpectraCutPlot_(Figure): isConfig = None __nsubplots = None @@ -1145,7 +1145,7 @@ class SpectraCutPlot(Figure): return dataOut @MPDecorator -class Noise(Figure): +class Noise_(Figure): isConfig = None __nsubplots = None @@ -1352,7 +1352,7 @@ class Noise(Figure): return dataOut @MPDecorator -class BeaconPhase(Figure): +class BeaconPhase_(Figure): __isConfig = None __nsubplots = None diff --git a/schainpy/model/graphics/jroplot_voltage.py b/schainpy/model/graphics/jroplot_voltage.py index 5544143..ae4a972 100644 --- a/schainpy/model/graphics/jroplot_voltage.py +++ b/schainpy/model/graphics/jroplot_voltage.py @@ -12,7 +12,7 @@ from .figure import Figure @MPDecorator -class Scope(Figure): +class Scope_(Figure): isConfig = None diff --git a/schainpy/model/graphics/jroplotter.py b/schainpy/model/graphics/jroplotter.py deleted file mode 100644 index d222d1c..0000000 --- a/schainpy/model/graphics/jroplotter.py +++ /dev/null @@ -1,240 +0,0 @@ -''' -Created on Jul 9, 2014 - -@author: roj-idl71 -''' -import os, sys -import datetime -import numpy -import traceback - -from time import sleep -from threading import Lock -# from threading import Thread - -import schainpy -import schainpy.admin - -from schainpy.model.proc.jroproc_base import Operation -from schainpy.model.serializer.data import obj2Dict, dict2Obj -from .jroplot_correlation import * -from .jroplot_heispectra import * -from .jroplot_parameters import * -from .jroplot_spectra import * -from .jroplot_voltage import * - - -class Plotter(Operation): - - isConfig = None - name = None - __queue = None - - def __init__(self, plotter_name, plotter_queue=None, **kwargs): - - Operation.__init__(self, **kwargs) - - self.isConfig = False - self.name = plotter_name - self.__queue = plotter_queue - - def getSubplots(self): - - nrow = self.nplots - ncol = 1 - return nrow, ncol - - def setup(self, **kwargs): - - print("Initializing ...") - - - def run(self, dataOut, id=None, **kwargs): - - """ - - Input: - dataOut : - id : - """ - - packDict = {} - - packDict['id'] = id - packDict['name'] = self.name - packDict['kwargs'] = kwargs - -# packDict['data'] = obj2Dict(dataOut) - packDict['data'] = dataOut - - self.__queue.put(packDict) - -# class PlotManager(Thread): -class PlotManager(): - - __err = False - __stop = False - __realtime = False - - controllerThreadObj = None - - plotterList = ['Scope', - 'SpectraPlot', 'RTIPlot', - 'SpectraCutPlot', - 'CrossSpectraPlot', 'CoherenceMap', - 'PowerProfilePlot', 'Noise', 'BeaconPhase', - 'CorrelationPlot', - 'SpectraHeisScope', 'RTIfromSpectraHeis'] - - def __init__(self, plotter_queue): - -# Thread.__init__(self) -# self.setDaemon(True) - - self.__queue = plotter_queue - self.__lock = Lock() - - self.plotInstanceDict = {} - - self.__err = False - self.__stop = False - self.__realtime = False - - def __handleError(self, name="", send_email=False): - - err = traceback.format_exception(sys.exc_info()[0], - sys.exc_info()[1], - sys.exc_info()[2]) - - print("***** Error occurred in PlotManager *****") - print("***** [%s]: %s" %(name, err[-1])) - - message = "\nError ocurred in %s:\n" %name - message += "".join(err) - - sys.stderr.write(message) - - if not send_email: - return - - import socket - - subject = "SChain v%s: Error running %s\n" %(schainpy.__version__, name) - - subtitle = "%s:\n" %(name) - subtitle += "Hostname: %s\n" %socket.gethostbyname(socket.gethostname()) - subtitle += "Working directory: %s\n" %os.path.abspath("./") - # subtitle += "Configuration file: %s\n" %self.filename - subtitle += "Time: %s\n" %str(datetime.datetime.now()) - - adminObj = schainpy.admin.SchainNotify() - adminObj.sendAlert(message=message, - subject=subject, - subtitle=subtitle) - - def run(self): - - if self.__queue.empty(): - return - - if self.__err: - serial_data = self.__queue.get() - self.__queue.task_done() - return - - self.__lock.acquire() - -# if self.__queue.full(): -# for i in range(int(self.__queue.qsize()/2)): -# serial_data = self.__queue.get() -# self.__queue.task_done() - - n = int(self.__queue.qsize()/3 + 1) - - for i in range(n): - - if self.__queue.empty(): - break - - serial_data = self.__queue.get() - self.__queue.task_done() - - plot_id = serial_data['id'] - plot_name = serial_data['name'] - kwargs = serial_data['kwargs'] -# dataDict = serial_data['data'] -# -# dataPlot = dict2Obj(dataDict) - - dataPlot = serial_data['data'] - - if plot_id not in list(self.plotInstanceDict.keys()): - className = eval(plot_name) - self.plotInstanceDict[plot_id] = className(**kwargs) - - plotter = self.plotInstanceDict[plot_id] - try: - plotter.run(dataPlot, plot_id, **kwargs) - except: - self.__err = True - self.__handleError(plot_name, send_email=True) - break - - self.__lock.release() - - def isEmpty(self): - - return self.__queue.empty() - - def stop(self): - - self.__lock.acquire() - - self.__stop = True - - self.__lock.release() - - def close(self): - - self.__lock.acquire() - - for plot_id in list(self.plotInstanceDict.keys()): - plotter = self.plotInstanceDict[plot_id] - plotter.close() - - self.__lock.release() - - def setController(self, controllerThreadObj): - - self.controllerThreadObj = controllerThreadObj - - def start(self): - - if not self.controllerThreadObj.isRunning(): - raise RuntimeError("controllerThreadObj has not been initialized. Use controllerThreadObj.start() before call this method") - - self.join() - - def join(self): - - #Execute plotter while controller is running - while self.controllerThreadObj.isRunning(): - self.run() - - self.controllerThreadObj.stop() - - #Wait until plotter queue is empty - while not self.isEmpty(): - self.run() - - self.close() - - def isErrorDetected(self): - - self.__lock.acquire() - - err = self.__err - - self.__lock.release() - - return err \ No newline at end of file diff --git a/schainpy/model/proc/jroproc_base.py b/schainpy/model/proc/jroproc_base.py index 6e2a4e9..5dda991 100644 --- a/schainpy/model/proc/jroproc_base.py +++ b/schainpy/model/proc/jroproc_base.py @@ -11,13 +11,15 @@ Based on: $Author: murco $ $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $ ''' -from platform import python_version + import inspect import zmq import time import pickle import os from multiprocessing import Process +from zmq.utils.monitor import recv_monitor_message + from schainpy.utils import log @@ -35,15 +37,6 @@ class ProcessingUnit(object): """ - - METHODS = {} - dataIn = None - dataInList = [] - id = None - inputId = None - dataOut = None - dictProcs = None - isConfig = False def __init__(self): @@ -51,15 +44,15 @@ class ProcessingUnit(object): self.dataOut = None self.isConfig = False self.operations = [] + self.plots = [] def getAllowedArgs(self): if hasattr(self, '__attrs__'): return self.__attrs__ else: return inspect.getargspec(self.run).args - - def addOperation(self, conf, operation): + def addOperation(self, conf, operation): """ This method is used in the controller, and update the dictionary containing the operations to execute. The dict posses the id of the operation process (IPC purposes) @@ -76,7 +69,11 @@ class ProcessingUnit(object): objId : identificador del objeto, necesario para comunicar con master(procUnit) """ - self.operations.append((operation, conf.type, conf.id, conf.getKwargs())) + self.operations.append( + (operation, conf.type, conf.id, conf.getKwargs())) + + if 'plot' in self.name.lower(): + self.plots.append(operation.CODE) def getOperationObj(self, objId): @@ -86,7 +83,6 @@ class ProcessingUnit(object): return self.operations[objId] def operation(self, **kwargs): - """ Operacion directa sobre la data (dataOut.data). Es necesario actualizar los valores de los atributos del objeto dataOut @@ -96,7 +92,7 @@ class ProcessingUnit(object): **kwargs : Diccionario de argumentos de la funcion a ejecutar """ - raise NotImplementedError + raise NotImplementedError def setup(self): @@ -107,9 +103,10 @@ class ProcessingUnit(object): raise NotImplementedError def close(self): - #Close every thread, queue or any other object here is it is neccesary. + return - + + class Operation(object): """ @@ -126,22 +123,15 @@ class Operation(object): Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer) """ - id = None - __buffer = None - dest = None - isConfig = False - readyFlag = None def __init__(self): - self.buffer = None - self.dest = None + self.id = None self.isConfig = False - self.readyFlag = False if not hasattr(self, 'name'): self.name = self.__class__.__name__ - + def getAllowedArgs(self): if hasattr(self, '__attrs__'): return self.__attrs__ @@ -154,9 +144,7 @@ class Operation(object): raise NotImplementedError - def run(self, dataIn, **kwargs): - """ Realiza las operaciones necesarias sobre la dataIn.data y actualiza los atributos del objeto dataIn. @@ -180,18 +168,17 @@ class Operation(object): def close(self): - pass + return def MPDecorator(BaseClass): - """ Multiprocessing class decorator This function add multiprocessing features to a BaseClass. Also, it handle the communication beetween processes (readers, procUnits and operations). """ - + class MPClass(BaseClass, Process): def __init__(self, *args, **kwargs): @@ -203,42 +190,38 @@ def MPDecorator(BaseClass): self.sender = None self.receiver = None self.name = BaseClass.__name__ + self.start_time = time.time() if len(self.args) is 3: self.typeProc = "ProcUnit" - self.id = args[0] - self.inputId = args[1] - self.project_id = args[2] - else: + self.id = args[0] + self.inputId = args[1] + self.project_id = args[2] + elif len(self.args) is 2: self.id = args[0] self.inputId = args[0] self.project_id = args[1] self.typeProc = "Operation" - def getAllowedArgs(self): - - if hasattr(self, '__attrs__'): - return self.__attrs__ - else: - return inspect.getargspec(BaseClass.run).args - def subscribe(self): ''' This function create a socket to receive objects from the topic `inputId`. ''' - + c = zmq.Context() self.receiver = c.socket(zmq.SUB) - self.receiver.connect('ipc:///tmp/schain/{}_pub'.format(self.project_id)) + self.receiver.connect( + 'ipc:///tmp/schain/{}_pub'.format(self.project_id)) self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode()) def listen(self): ''' This function waits for objects and deserialize using pickle ''' - - data = pickle.loads(self.receiver.recv_multipart()[1]) + + data = pickle.loads(self.receiver.recv_multipart()[1]) + return data def set_publisher(self): @@ -248,14 +231,14 @@ def MPDecorator(BaseClass): time.sleep(1) c = zmq.Context() - self.sender = c.socket(zmq.PUB) - self.sender.connect('ipc:///tmp/schain/{}_sub'.format(self.project_id)) + self.sender = c.socket(zmq.PUB) + self.sender.connect( + 'ipc:///tmp/schain/{}_sub'.format(self.project_id)) - def publish(self, data, id): + def publish(self, data, id): ''' This function publish an object, to a specific topic. ''' - self.sender.send_multipart([str(id).encode(), pickle.dumps(data)]) def runReader(self): @@ -263,28 +246,32 @@ def MPDecorator(BaseClass): Run fuction for read units ''' while True: - - BaseClass.run(self, **self.kwargs) - if self.dataOut.error[0] == -1: - log.error(self.dataOut.error[1]) - self.publish('end', self.id) - #self.sender.send_multipart([str(self.project_id).encode(), 'end'.encode()]) - break + BaseClass.run(self, **self.kwargs) - for op, optype, id, kwargs in self.operations: - if optype=='self': + for op, optype, opId, kwargs in self.operations: + if optype == 'self': op(**kwargs) - elif optype=='other': + elif optype == 'other': self.dataOut = op.run(self.dataOut, **self.kwargs) - elif optype=='external': + elif optype == 'external': self.publish(self.dataOut, opId) - if self.dataOut.flagNoData: + if self.dataOut.flagNoData and self.dataOut.error is None: continue - - self.publish(self.dataOut, self.id) - + + self.publish(self.dataOut, self.id) + + if self.dataOut.error: + if self.dataOut.error[0] == -1: + log.error(self.dataOut.error[1], self.name) + if self.dataOut.error[0] == 1: + log.success(self.dataOut.error[1], self.name) + # self.sender.send_multipart([str(self.project_id).encode(), 'end'.encode()]) + break + + time.sleep(1) + def runProc(self): ''' Run function for proccessing units @@ -293,49 +280,45 @@ def MPDecorator(BaseClass): while True: self.dataIn = self.listen() - if self.dataIn == 'end': - self.publish('end', self.id) - for op, optype, opId, kwargs in self.operations: - if optype == 'external': - self.publish('end', opId) - break - - if self.dataIn.flagNoData: + if self.dataIn.flagNoData and self.dataIn.error is None: continue BaseClass.run(self, **self.kwargs) for op, optype, opId, kwargs in self.operations: - if optype=='self': + if optype == 'self': op(**kwargs) - elif optype=='other': + elif optype == 'other': self.dataOut = op.run(self.dataOut, **kwargs) - elif optype=='external': + elif optype == 'external': self.publish(self.dataOut, opId) - if self.dataOut.flagNoData: - continue - self.publish(self.dataOut, self.id) + if self.dataIn.error: + break + + time.sleep(1) def runOp(self): ''' - Run function for operations + Run function for external operations (this operations just receive data + ex: plots, writers, publishers) ''' while True: dataOut = self.listen() - if dataOut == 'end': - break - BaseClass.run(self, dataOut, **self.kwargs) - + + if dataOut.error: + break + time.sleep(1) + def run(self): - + if self.typeProc is "ProcUnit": - + if self.inputId is not None: self.subscribe() self.set_publisher() @@ -346,22 +329,48 @@ def MPDecorator(BaseClass): self.runReader() elif self.typeProc is "Operation": - + self.subscribe() self.runOp() else: raise ValueError("Unknown type") - print("%s done" % BaseClass.__name__) self.close() + def event_monitor(self, monitor): + + events = {} + + for name in dir(zmq): + if name.startswith('EVENT_'): + value = getattr(zmq, name) + events[value] = name + + while monitor.poll(): + evt = recv_monitor_message(monitor) + if evt['event'] == 32: + self.connections += 1 + if evt['event'] == 512: + pass + + evt.update({'description': events[evt['event']]}) + + if evt['event'] == zmq.EVENT_MONITOR_STOPPED: + break + monitor.close() + print('event monitor thread done!') + def close(self): - + + BaseClass.close(self) + if self.sender: self.sender.close() - + if self.receiver: self.receiver.close() - return MPClass \ No newline at end of file + log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name) + + return MPClass diff --git a/schainpy/model/utils/jroutils_publish.py b/schainpy/model/utils/jroutils_publish.py index ea41b05..2013f89 100644 --- a/schainpy/model/utils/jroutils_publish.py +++ b/schainpy/model/utils/jroutils_publish.py @@ -7,11 +7,9 @@ import glob import time import json import numpy -import paho.mqtt.client as mqtt import zmq import datetime import ftplib -from zmq.utils.monitor import recv_monitor_message from functools import wraps from threading import Thread from multiprocessing import Process @@ -54,428 +52,52 @@ def get_plot_code(s): else: return 24 -def roundFloats(obj): - if isinstance(obj, list): - return list(map(roundFloats, obj)) - elif isinstance(obj, float): - return round(obj, 2) - def decimate(z, MAXNUMY): dy = int(len(z[0])/MAXNUMY) + 1 return z[::, ::dy] -class throttle(object): - ''' - Decorator that prevents a function from being called more than once every - time period. - To create a function that cannot be called more than once a minute, but - will sleep until it can be called: - @throttle(minutes=1) - def foo(): - pass - - for i in range(10): - foo() - print "This function has run %s times." % i - ''' - - def __init__(self, seconds=0, minutes=0, hours=0): - self.throttle_period = datetime.timedelta( - seconds=seconds, minutes=minutes, hours=hours - ) - - self.time_of_last_call = datetime.datetime.min - - def __call__(self, fn): - @wraps(fn) - def wrapper(*args, **kwargs): - coerce = kwargs.pop('coerce', None) - if coerce: - self.time_of_last_call = datetime.datetime.now() - return fn(*args, **kwargs) - else: - now = datetime.datetime.now() - time_since_last_call = now - self.time_of_last_call - time_left = self.throttle_period - time_since_last_call - - if time_left > datetime.timedelta(seconds=0): - return - - self.time_of_last_call = datetime.datetime.now() - return fn(*args, **kwargs) - - return wrapper - -class Data(object): - ''' - Object to hold data to be plotted - ''' - - def __init__(self, plottypes, throttle_value, exp_code, buffering=True): - self.plottypes = plottypes - self.throttle = throttle_value - self.exp_code = exp_code - self.buffering = buffering - self.ended = False - self.localtime = False - self.meta = {} - self.__times = [] - self.__heights = [] - - def __str__(self): - dum = ['{}{}'.format(key, self.shape(key)) for key in self.data] - return 'Data[{}][{}]'.format(';'.join(dum), len(self.__times)) - - def __len__(self): - return len(self.__times) - - def __getitem__(self, key): - if key not in self.data: - raise KeyError(log.error('Missing key: {}'.format(key))) - - if 'spc' in key or not self.buffering: - ret = self.data[key] - else: - ret = numpy.array([self.data[key][x] for x in self.times]) - if ret.ndim > 1: - ret = numpy.swapaxes(ret, 0, 1) - return ret - - def __contains__(self, key): - return key in self.data - - def setup(self): - ''' - Configure object - ''' - - self.type = '' - self.ended = False - self.data = {} - self.__times = [] - self.__heights = [] - self.__all_heights = set() - for plot in self.plottypes: - if 'snr' in plot: - plot = 'snr' - self.data[plot] = {} - - def shape(self, key): - ''' - Get the shape of the one-element data for the given key - ''' - - if len(self.data[key]): - if 'spc' in key or not self.buffering: - return self.data[key].shape - return self.data[key][self.__times[0]].shape - return (0,) - - def update(self, dataOut, tm): - ''' - Update data object with new dataOut - ''' - - if tm in self.__times: - return - - self.type = dataOut.type - self.parameters = getattr(dataOut, 'parameters', []) - if hasattr(dataOut, 'pairsList'): - self.pairs = dataOut.pairsList - if hasattr(dataOut, 'meta'): - self.meta = dataOut.meta - self.channels = dataOut.channelList - self.interval = dataOut.getTimeInterval() - self.localtime = dataOut.useLocalTime - if 'spc' in self.plottypes or 'cspc' in self.plottypes: - self.xrange = (dataOut.getFreqRange(1)/1000., dataOut.getAcfRange(1), dataOut.getVelRange(1)) - self.__heights.append(dataOut.heightList) - self.__all_heights.update(dataOut.heightList) - self.__times.append(tm) - - for plot in self.plottypes: - if plot == 'spc': - z = dataOut.data_spc/dataOut.normFactor - buffer = 10*numpy.log10(z) - if plot == 'cspc': - buffer = dataOut.data_cspc - if plot == 'noise': - buffer = 10*numpy.log10(dataOut.getNoise()/dataOut.normFactor) - if plot == 'rti': - buffer = dataOut.getPower() - if plot == 'snr_db': - buffer = dataOut.data_SNR - if plot == 'snr': - buffer = 10*numpy.log10(dataOut.data_SNR) - if plot == 'dop': - buffer = 10*numpy.log10(dataOut.data_DOP) - if plot == 'mean': - buffer = dataOut.data_MEAN - if plot == 'std': - buffer = dataOut.data_STD - if plot == 'coh': - buffer = dataOut.getCoherence() - if plot == 'phase': - buffer = dataOut.getCoherence(phase=True) - if plot == 'output': - buffer = dataOut.data_output - if plot == 'param': - buffer = dataOut.data_param - - if 'spc' in plot: - self.data[plot] = buffer - else: - if self.buffering: - self.data[plot][tm] = buffer - else: - self.data[plot] = buffer - - def normalize_heights(self): - ''' - Ensure same-dimension of the data for different heighList - ''' - - H = numpy.array(list(self.__all_heights)) - H.sort() - for key in self.data: - shape = self.shape(key)[:-1] + H.shape - for tm, obj in list(self.data[key].items()): - h = self.__heights[self.__times.index(tm)] - if H.size == h.size: - continue - index = numpy.where(numpy.in1d(H, h))[0] - dummy = numpy.zeros(shape) + numpy.nan - if len(shape) == 2: - dummy[:, index] = obj - else: - dummy[index] = obj - self.data[key][tm] = dummy - - self.__heights = [H for tm in self.__times] - - def jsonify(self, decimate=False): - ''' - Convert data to json - ''' - - data = {} - tm = self.times[-1] - dy = int(self.heights.size/MAXNUMY) + 1 - for key in self.data: - if key in ('spc', 'cspc') or not self.buffering: - dx = int(self.data[key].shape[1]/MAXNUMX) + 1 - data[key] = roundFloats(self.data[key][::, ::dx, ::dy].tolist()) - else: - data[key] = roundFloats(self.data[key][tm].tolist()) - - ret = {'data': data} - ret['exp_code'] = self.exp_code - ret['time'] = tm - ret['interval'] = self.interval - ret['localtime'] = self.localtime - ret['yrange'] = roundFloats(self.heights[::dy].tolist()) - if 'spc' in self.data or 'cspc' in self.data: - ret['xrange'] = roundFloats(self.xrange[2][::dx].tolist()) - else: - ret['xrange'] = [] - if hasattr(self, 'pairs'): - ret['pairs'] = self.pairs - else: - ret['pairs'] = [] - - for key, value in list(self.meta.items()): - ret[key] = value - - return json.dumps(ret) - - @property - def times(self): - ''' - Return the list of times of the current data - ''' - - ret = numpy.array(self.__times) - ret.sort() - return ret - - @property - def heights(self): - ''' - Return the list of heights of the current data - ''' - - return numpy.array(self.__heights[-1]) class PublishData(Operation): ''' Operation to send data over zmq. ''' - __attrs__ = ['host', 'port', 'delay', 'zeromq', 'mqtt', 'verbose'] + __attrs__ = ['host', 'port', 'delay', 'verbose'] def __init__(self, **kwargs): """Inicio.""" Operation.__init__(self, **kwargs) self.isConfig = False - self.client = None - self.zeromq = None - self.mqtt = None - def on_disconnect(self, client, userdata, rc): - if rc != 0: - log.warning('Unexpected disconnection.') - self.connect() - - def connect(self): - log.warning('trying to connect') - try: - self.client.connect( - host=self.host, - port=self.port, - keepalive=60*10, - bind_address='') - self.client.loop_start() - # self.client.publish( - # self.topic + 'SETUP', - # json.dumps(setup), - # retain=True - # ) - except: - log.error('MQTT Conection error.') - self.client = False - - def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, verbose=True, **kwargs): + def setup(self, server='zmq.pipe', delay=0, verbose=True, **kwargs): self.counter = 0 - self.topic = kwargs.get('topic', 'schain') self.delay = kwargs.get('delay', 0) - self.plottype = kwargs.get('plottype', 'spectra') - self.host = kwargs.get('host', "10.10.10.82") - self.port = kwargs.get('port', 3000) - self.clientId = clientId self.cnt = 0 - self.zeromq = zeromq - self.mqtt = kwargs.get('plottype', 0) - self.client = None self.verbose = verbose setup = [] - if mqtt is 1: - self.client = mqtt.Client( - client_id=self.clientId + self.topic + 'SCHAIN', - clean_session=True) - self.client.on_disconnect = self.on_disconnect - self.connect() - for plot in self.plottype: - setup.append({ - 'plot': plot, - 'topic': self.topic + plot, - 'title': getattr(self, plot + '_' + 'title', False), - 'xlabel': getattr(self, plot + '_' + 'xlabel', False), - 'ylabel': getattr(self, plot + '_' + 'ylabel', False), - 'xrange': getattr(self, plot + '_' + 'xrange', False), - 'yrange': getattr(self, plot + '_' + 'yrange', False), - 'zrange': getattr(self, plot + '_' + 'zrange', False), - }) - if zeromq is 1: - context = zmq.Context() - self.zmq_socket = context.socket(zmq.PUSH) - server = kwargs.get('server', 'zmq.pipe') - - if 'tcp://' in server: - address = server - else: - address = 'ipc:///tmp/%s' % server - - self.zmq_socket.connect(address) - time.sleep(1) + context = zmq.Context() + self.zmq_socket = context.socket(zmq.PUSH) + server = kwargs.get('server', 'zmq.pipe') + + if 'tcp://' in server: + address = server + else: + address = 'ipc:///tmp/%s' % server + + self.zmq_socket.connect(address) + time.sleep(1) def publish_data(self): self.dataOut.finished = False - if self.mqtt is 1: - yData = self.dataOut.heightList[:2].tolist() - if self.plottype == 'spectra': - data = getattr(self.dataOut, 'data_spc') - z = data/self.dataOut.normFactor - zdB = 10*numpy.log10(z) - xlen, ylen = zdB[0].shape - dx = int(xlen/MAXNUMX) + 1 - dy = int(ylen/MAXNUMY) + 1 - Z = [0 for i in self.dataOut.channelList] - for i in self.dataOut.channelList: - Z[i] = zdB[i][::dx, ::dy].tolist() - payload = { - 'timestamp': self.dataOut.utctime, - 'data': roundFloats(Z), - 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList], - 'interval': self.dataOut.getTimeInterval(), - 'type': self.plottype, - 'yData': yData - } - - elif self.plottype in ('rti', 'power'): - data = getattr(self.dataOut, 'data_spc') - z = data/self.dataOut.normFactor - avg = numpy.average(z, axis=1) - avgdB = 10*numpy.log10(avg) - xlen, ylen = z[0].shape - dy = numpy.floor(ylen/self.__MAXNUMY) + 1 - AVG = [0 for i in self.dataOut.channelList] - for i in self.dataOut.channelList: - AVG[i] = avgdB[i][::dy].tolist() - payload = { - 'timestamp': self.dataOut.utctime, - 'data': roundFloats(AVG), - 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList], - 'interval': self.dataOut.getTimeInterval(), - 'type': self.plottype, - 'yData': yData - } - elif self.plottype == 'noise': - noise = self.dataOut.getNoise()/self.dataOut.normFactor - noisedB = 10*numpy.log10(noise) - payload = { - 'timestamp': self.dataOut.utctime, - 'data': roundFloats(noisedB.reshape(-1, 1).tolist()), - 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList], - 'interval': self.dataOut.getTimeInterval(), - 'type': self.plottype, - 'yData': yData - } - elif self.plottype == 'snr': - data = getattr(self.dataOut, 'data_SNR') - avgdB = 10*numpy.log10(data) - - ylen = data[0].size - dy = numpy.floor(ylen/self.__MAXNUMY) + 1 - AVG = [0 for i in self.dataOut.channelList] - for i in self.dataOut.channelList: - AVG[i] = avgdB[i][::dy].tolist() - payload = { - 'timestamp': self.dataOut.utctime, - 'data': roundFloats(AVG), - 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList], - 'type': self.plottype, - 'yData': yData - } - else: - print("Tipo de grafico invalido") - payload = { - 'data': 'None', - 'timestamp': 'None', - 'type': None - } - - self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0) - - if self.zeromq is 1: - if self.verbose: - log.log( - 'Sending {} - {}'.format(self.dataOut.type, self.dataOut.datatime), - self.name - ) - self.zmq_socket.send_pyobj(self.dataOut) + + if self.verbose: + log.log( + 'Sending {} - {}'.format(self.dataOut.type, self.dataOut.datatime), + self.name + ) + self.zmq_socket.send_pyobj(self.dataOut) def run(self, dataOut, **kwargs): self.dataOut = dataOut @@ -487,15 +109,12 @@ class PublishData(Operation): time.sleep(self.delay) def close(self): - if self.zeromq is 1: - self.dataOut.finished = True - self.zmq_socket.send_pyobj(self.dataOut) - time.sleep(0.1) - self.zmq_socket.close() - if self.client: - self.client.loop_stop() - self.client.disconnect() - + + self.dataOut.finished = True + self.zmq_socket.send_pyobj(self.dataOut) + time.sleep(0.1) + self.zmq_socket.close() + class ReceiverData(ProcessingUnit): @@ -536,185 +155,6 @@ class ReceiverData(ProcessingUnit): 'Receiving') -class PlotterReceiver(ProcessingUnit, Process): - - throttle_value = 5 - __attrs__ = ['server', 'plottypes', 'realtime', 'localtime', 'throttle', - 'exp_code', 'web_server', 'buffering'] - - def __init__(self, **kwargs): - - ProcessingUnit.__init__(self, **kwargs) - Process.__init__(self) - self.mp = False - self.isConfig = False - self.isWebConfig = False - self.connections = 0 - server = kwargs.get('server', 'zmq.pipe') - web_server = kwargs.get('web_server', None) - if 'tcp://' in server: - address = server - else: - address = 'ipc:///tmp/%s' % server - self.address = address - self.web_address = web_server - self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')] - self.realtime = kwargs.get('realtime', False) - self.localtime = kwargs.get('localtime', True) - self.buffering = kwargs.get('buffering', True) - self.throttle_value = kwargs.get('throttle', 5) - self.exp_code = kwargs.get('exp_code', None) - self.sendData = self.initThrottle(self.throttle_value) - self.dates = [] - self.setup() - - def setup(self): - - self.data = Data(self.plottypes, self.throttle_value, self.exp_code, self.buffering) - self.isConfig = True - - def event_monitor(self, monitor): - - events = {} - - for name in dir(zmq): - if name.startswith('EVENT_'): - value = getattr(zmq, name) - events[value] = name - - while monitor.poll(): - evt = recv_monitor_message(monitor) - if evt['event'] == 32: - self.connections += 1 - if evt['event'] == 512: - pass - - evt.update({'description': events[evt['event']]}) - - if evt['event'] == zmq.EVENT_MONITOR_STOPPED: - break - monitor.close() - print('event monitor thread done!') - - def initThrottle(self, throttle_value): - - @throttle(seconds=throttle_value) - def sendDataThrottled(fn_sender, data): - fn_sender(data) - - return sendDataThrottled - - def send(self, data): - log.log('Sending {}'.format(data), self.name) - self.sender.send_pyobj(data) - - def run(self): - - log.log( - 'Starting from {}'.format(self.address), - self.name - ) - - self.context = zmq.Context() - self.receiver = self.context.socket(zmq.PULL) - self.receiver.bind(self.address) - monitor = self.receiver.get_monitor_socket() - self.sender = self.context.socket(zmq.PUB) - if self.web_address: - log.success( - 'Sending to web: {}'.format(self.web_address), - self.name - ) - self.sender_web = self.context.socket(zmq.REQ) - self.sender_web.connect(self.web_address) - self.poll = zmq.Poller() - self.poll.register(self.sender_web, zmq.POLLIN) - time.sleep(1) - - if 'server' in self.kwargs: - self.sender.bind("ipc:///tmp/{}.plots".format(self.kwargs['server'])) - else: - self.sender.bind("ipc:///tmp/zmq.plots") - - time.sleep(2) - - t = Thread(target=self.event_monitor, args=(monitor,)) - t.start() - - while True: - dataOut = self.receiver.recv_pyobj() - if not dataOut.flagNoData: - if dataOut.type == 'Parameters': - tm = dataOut.utctimeInit - else: - tm = dataOut.utctime - if dataOut.useLocalTime: - if not self.localtime: - tm += time.timezone - dt = datetime.datetime.fromtimestamp(tm).date() - else: - if self.localtime: - tm -= time.timezone - dt = datetime.datetime.utcfromtimestamp(tm).date() - coerce = False - if dt not in self.dates: - if self.data: - self.data.ended = True - self.send(self.data) - coerce = True - self.data.setup() - self.dates.append(dt) - - self.data.update(dataOut, tm) - - if dataOut.finished is True: - self.connections -= 1 - if self.connections == 0 and dt in self.dates: - self.data.ended = True - self.send(self.data) - # self.data.setup() - time.sleep(1) - break - else: - if self.realtime: - self.send(self.data) - if self.web_address: - retries = 5 - while True: - self.sender_web.send(self.data.jsonify()) - socks = dict(self.poll.poll(5000)) - if socks.get(self.sender_web) == zmq.POLLIN: - reply = self.sender_web.recv_string() - if reply == 'ok': - log.log("Response from server ok", self.name) - break - else: - log.warning("Malformed reply from server: {}".format(reply), self.name) - - else: - log.warning("No response from server, retrying...", self.name) - self.sender_web.setsockopt(zmq.LINGER, 0) - self.sender_web.close() - self.poll.unregister(self.sender_web) - retries -= 1 - if retries == 0: - log.error("Server seems to be offline, abandoning", self.name) - self.sender_web = self.context.socket(zmq.REQ) - self.sender_web.connect(self.web_address) - self.poll.register(self.sender_web, zmq.POLLIN) - time.sleep(1) - break - self.sender_web = self.context.socket(zmq.REQ) - self.sender_web.connect(self.web_address) - self.poll.register(self.sender_web, zmq.POLLIN) - time.sleep(1) - else: - self.sendData(self.send, self.data, coerce=coerce) - coerce = False - - return - - class SendToFTP(Operation, Process): '''