diff --git a/schainpy/model/graphics/jroplot_base.py b/schainpy/model/graphics/jroplot_base.py index 838615c..e5c31d6 100644 --- a/schainpy/model/graphics/jroplot_base.py +++ b/schainpy/model/graphics/jroplot_base.py @@ -3,9 +3,10 @@ import os import sys import zmq import time +import numpy import datetime from functools import wraps -import numpy +from threading import Thread import matplotlib if 'BACKEND' in os.environ: @@ -13,7 +14,7 @@ if 'BACKEND' in os.environ: elif 'linux' in sys.platform: matplotlib.use("TkAgg") elif 'darwin' in sys.platform: - matplotlib.use('TkAgg') + matplotlib.use('WxAgg') else: from schainpy.utils import log log.warning('Using default Backend="Agg"', 'INFO') @@ -40,7 +41,6 @@ CMAPS = [plt.get_cmap(s) for s in ('jro', 'jet', 'viridis', EARTH_RADIUS = 6.3710e3 - def ll2xy(lat1, lon1, lat2, lon2): p = 0.017453292519943295 @@ -138,140 +138,6 @@ def apply_throttle(value): return fnThrottled -@MPDecorator -class Plotter(ProcessingUnit): - ''' - Proccessing unit to handle plot operations - ''' - - def __init__(self): - - ProcessingUnit.__init__(self) - - def setup(self, **kwargs): - - self.connections = 0 - self.web_address = kwargs.get('web_server', False) - self.realtime = kwargs.get('realtime', False) - self.localtime = kwargs.get('localtime', True) - self.buffering = kwargs.get('buffering', True) - self.throttle = kwargs.get('throttle', 2) - self.exp_code = kwargs.get('exp_code', None) - self.set_ready = apply_throttle(self.throttle) - self.dates = [] - self.data = PlotterData( - self.plots, self.throttle, self.exp_code, self.buffering) - self.isConfig = True - - def ready(self): - ''' - Set dataOut ready - ''' - - self.data.ready = True - self.dataOut.data_plt = self.data - - def run(self, realtime=True, localtime=True, buffering=True, - throttle=2, exp_code=None, web_server=None): - - if not self.isConfig: - self.setup(realtime=realtime, localtime=localtime, - buffering=buffering, throttle=throttle, exp_code=exp_code, - web_server=web_server) - - if self.web_address: - log.success( - 'Sending to web: {}'.format(self.web_address), - self.name - ) - self.context = zmq.Context() - self.sender_web = self.context.socket(zmq.REQ) - self.sender_web.connect(self.web_address) - self.poll = zmq.Poller() - self.poll.register(self.sender_web, zmq.POLLIN) - time.sleep(1) - - # t = Thread(target=self.event_monitor, args=(monitor,)) - # t.start() - - self.dataOut = self.dataIn - self.data.ready = False - - if self.dataOut.flagNoData: - coerce = True - else: - coerce = False - - if self.dataOut.type == 'Parameters': - tm = self.dataOut.utctimeInit - else: - tm = self.dataOut.utctime - if self.dataOut.useLocalTime: - if not self.localtime: - tm += time.timezone - dt = datetime.datetime.fromtimestamp(tm).date() - else: - if self.localtime: - tm -= time.timezone - dt = datetime.datetime.utcfromtimestamp(tm).date() - if dt not in self.dates: - if self.data: - self.ready() - self.data.setup() - self.dates.append(dt) - - self.data.update(self.dataOut, tm) - - if False: # TODO check when publishers ends - self.connections -= 1 - if self.connections == 0 and dt in self.dates: - self.data.ended = True - self.ready() - time.sleep(1) - else: - if self.realtime: - self.ready() - if self.web_address: - retries = 5 - while True: - self.sender_web.send(self.data.jsonify()) - socks = dict(self.poll.poll(5000)) - if socks.get(self.sender_web) == zmq.POLLIN: - reply = self.sender_web.recv_string() - if reply == 'ok': - log.log("Response from server ok", self.name) - break - else: - log.warning( - "Malformed reply from server: {}".format(reply), self.name) - - else: - log.warning( - "No response from server, retrying...", self.name) - self.sender_web.setsockopt(zmq.LINGER, 0) - self.sender_web.close() - self.poll.unregister(self.sender_web) - retries -= 1 - if retries == 0: - log.error( - "Server seems to be offline, abandoning", self.name) - self.sender_web = self.context.socket(zmq.REQ) - self.sender_web.connect(self.web_address) - self.poll.register(self.sender_web, zmq.POLLIN) - time.sleep(1) - break - self.sender_web = self.context.socket(zmq.REQ) - self.sender_web.connect(self.web_address) - self.poll.register(self.sender_web, zmq.POLLIN) - time.sleep(1) - else: - self.set_ready(self.ready, coerce=coerce) - - return - - def close(self): - pass - @MPDecorator class Plot(Operation): @@ -280,7 +146,7 @@ class Plot(Operation): ''' CODE = 'Figure' - colormap = 'jro' + colormap = 'jet' bgcolor = 'white' __missing = 1E30 @@ -294,6 +160,8 @@ class Plot(Operation): Operation.__init__(self) self.isConfig = False self.isPlotConfig = False + self.save_counter = 1 + self.sender_counter = 1 def __fmtTime(self, x, pos): ''' @@ -312,6 +180,7 @@ class Plot(Operation): self.localtime = kwargs.pop('localtime', True) self.show = kwargs.get('show', True) self.save = kwargs.get('save', False) + self.save_period = kwargs.get('save_period', 2) self.ftp = kwargs.get('ftp', False) self.colormap = kwargs.get('colormap', self.colormap) self.colormap_coh = kwargs.get('colormap_coh', 'jet') @@ -353,9 +222,19 @@ class Plot(Operation): self.buffering = kwargs.get('buffering', True) self.throttle = kwargs.get('throttle', 2) self.exp_code = kwargs.get('exp_code', None) + self.plot_server = kwargs.get('plot_server', False) + self.sender_period = kwargs.get('sender_period', 2) self.__throttle_plot = apply_throttle(self.throttle) self.data = PlotterData( self.CODE, self.throttle, self.exp_code, self.buffering) + + if self.plot_server: + if not self.plot_server.startswith('tcp://'): + self.plot_server = 'tcp://{}'.format(self.plot_server) + log.success( + 'Sending to server: {}'.format(self.plot_server), + self.name + ) def __setup_plot(self): ''' @@ -538,20 +417,6 @@ class Plot(Operation): ax.figure.add_axes(nax) return nax - def setup(self): - ''' - This method should be implemented in the child class, the following - attributes should be set: - - self.nrows: number of rows - self.ncols: number of cols - self.nplots: number of plots (channels or pairs) - self.ylabel: label for Y axes - self.titles: list of axes title - - ''' - raise NotImplementedError - def fill_gaps(self, x_buffer, y_buffer, z_buffer): ''' Create a masked array for missing data @@ -722,14 +587,14 @@ class Plot(Operation): Main function to plot, format and save figures ''' - #try: - self.plot() - self.format() - #except Exception as e: - # log.warning('{} Plot could not be updated... check data'.format( - # self.CODE), self.name) - # log.error(str(e), '') - # return + try: + self.plot() + self.format() + except Exception as e: + log.warning('{} Plot could not be updated... check data'.format( + self.CODE), self.name) + log.error(str(e), '') + return for n, fig in enumerate(self.figures): if self.nrows == 0 or self.nplots == 0: @@ -744,30 +609,118 @@ class Plot(Operation): fig.canvas.draw() if self.save: + self.save_figure(n) + + if self.plot_server: + self.send_to_server() + # t = Thread(target=self.send_to_server) + # t.start() - if self.save_labels: - labels = self.save_labels - else: - labels = list(range(self.nrows)) + def save_figure(self, n): + ''' + ''' - if self.oneFigure: - label = '' - else: - label = '-{}'.format(labels[n]) - figname = os.path.join( - self.save, + if self.save_counter < self.save_period: + self.save_counter += 1 + return + + self.save_counter = 1 + + fig = self.figures[n] + + if self.save_labels: + labels = self.save_labels + else: + labels = list(range(self.nrows)) + + if self.oneFigure: + label = '' + else: + label = '-{}'.format(labels[n]) + figname = os.path.join( + self.save, + self.CODE, + '{}{}_{}.png'.format( + self.CODE, + label, + self.getDateTime(self.data.max_time).strftime( + '%Y%m%d_%H%M%S' + ), + ) + ) + log.log('Saving figure: {}'.format(figname), self.name) + if not os.path.isdir(os.path.dirname(figname)): + os.makedirs(os.path.dirname(figname)) + fig.savefig(figname) + + if self.realtime: + figname = os.path.join( + self.save, + '{}{}_{}.png'.format( self.CODE, - '{}{}_{}.png'.format( - self.CODE, - label, - self.getDateTime(self.data.max_time).strftime( - '%Y%m%d_%H%M%S'), + label, + self.getDateTime(self.data.min_time).strftime( + '%Y%m%d' + ), ) ) - log.log('Saving figure: {}'.format(figname), self.name) - if not os.path.isdir(os.path.dirname(figname)): - os.makedirs(os.path.dirname(figname)) - fig.savefig(figname) + fig.savefig(figname) + + def send_to_server(self): + ''' + ''' + + if self.sender_counter < self.sender_period: + self.sender_counter += 1 + + self.sender_counter = 1 + + retries = 2 + while True: + self.socket.send_string(self.data.jsonify()) + socks = dict(self.poll.poll(5000)) + if socks.get(self.socket) == zmq.POLLIN: + reply = self.socket.recv_string() + if reply == 'ok': + log.log("Response from server ok", self.name) + break + else: + log.warning( + "Malformed reply from server: {}".format(reply), self.name) + + else: + log.warning( + "No response from server, retrying...", self.name) + self.socket.setsockopt(zmq.LINGER, 0) + self.socket.close() + self.poll.unregister(self.socket) + retries -= 1 + if retries == 0: + log.error( + "Server seems to be offline, abandoning", self.name) + self.socket = self.context.socket(zmq.REQ) + self.socket.connect(self.plot_server) + self.poll.register(self.socket, zmq.POLLIN) + time.sleep(1) + break + self.socket = self.context.socket(zmq.REQ) + self.socket.connect(self.plot_server) + self.poll.register(self.socket, zmq.POLLIN) + time.sleep(0.5) + + def setup(self): + ''' + This method should be implemented in the child class, the following + attributes should be set: + + self.nrows: number of rows + self.ncols: number of cols + self.nplots: number of plots (channels or pairs) + self.ylabel: label for Y axes + self.titles: list of axes title + + ''' + raise NotImplementedError def plot(self): ''' @@ -786,6 +739,12 @@ class Plot(Operation): self.__setup(**kwargs) self.data.setup() self.isConfig = True + if self.plot_server: + self.context = zmq.Context() + self.socket = self.context.socket(zmq.REQ) + self.socket.connect(self.plot_server) + self.poll = zmq.Poller() + self.poll.register(self.socket, zmq.POLLIN) if dataOut.type == 'Parameters': tm = dataOut.utctimeInit diff --git a/schainpy/model/proc/jroproc_base.py b/schainpy/model/proc/jroproc_base.py index 3ab5da1..eed9f4b 100644 --- a/schainpy/model/proc/jroproc_base.py +++ b/schainpy/model/proc/jroproc_base.py @@ -12,11 +12,11 @@ Based on: $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $ ''' +import os import inspect import zmq import time import pickle -import os from multiprocessing import Process from zmq.utils.monitor import recv_monitor_message @@ -250,7 +250,7 @@ def MPDecorator(BaseClass): while True: BaseClass.run(self, **self.kwargs) - + for op, optype, opId, kwargs in self.operations: if optype == 'self' and not self.dataOut.flagNoData: op(**kwargs)