From eed97f9f9bf51a236451fd8261fde5e295b2945a 2018-01-24 19:49:19
From: jespinoza
Date: 2018-01-24 19:49:19
Subject: [PATCH] New Operation SendToFTP

---

diff --git a/schainpy/model/utils/jroutils_publish.py b/schainpy/model/utils/jroutils_publish.py
index 9ce28d9..f215738 100644
--- a/schainpy/model/utils/jroutils_publish.py
+++ b/schainpy/model/utils/jroutils_publish.py
@@ -2,12 +2,15 @@
 @author: Juan C. Espinoza
 '''
 
+import os
+import glob
 import time
 import json
 import numpy
 import paho.mqtt.client as mqtt
 import zmq
 import datetime
+import ftplib
 from zmq.utils.monitor import recv_monitor_message
 from functools import wraps
 from threading import Thread
@@ -17,8 +20,33 @@ from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
 from schainpy.model.data.jrodata import JROData
 from schainpy.utils import log
 
-MAXNUMX = 100
-MAXNUMY = 100
+MAXNUMX = 500
+MAXNUMY = 500
+
+PLOT_CODES = {
+    'rti': 0, #Range time intensity (RTI).
+    'spc': 1, #Spectra (and Cross-spectra) information.
+    'cspc': 2, #Cross-Correlation information.
+    'coh': 3, #Coherence map.
+    'base': 4, #Base lines graphic.
+    'row': 5, #Row Spectra.
+    'total' : 6, #Total Power.
+    'drift' : 7, #Drifts graphics.
+    'height' : 8, #Height profile.
+    'phase' : 9, #Signal Phase.
+    'power' : 16,
+    'noise' : 17,
+    'beacon' : 18,
+    #USED IN jroplot_parameters.py
+    'wind' : 22,
+    'skymap' : 23,
+    # 'MPHASE_CODE' : 24,
+    'moments' : 25,
+    'param' : 26,
+    'spc_fit' : 27,
+    'ew_drifts' : 28,
+    'reflectivity': 30
+}
 
 class PrettyFloat(float):
     def __repr__(self):
@@ -82,10 +110,11 @@ class Data(object):
     Object to hold data to be plotted
     '''
 
-    def __init__(self, plottypes, throttle_value, exp_code):
+    def __init__(self, plottypes, throttle_value, exp_code, buffering=True):
         self.plottypes = plottypes
         self.throttle = throttle_value
         self.exp_code = exp_code
+        self.buffering = buffering
         self.ended = False
         self.localtime = False
         self.__times = []
@@ -102,7 +131,7 @@ class Data(object):
         if key not in self.data:
             raise KeyError(log.error('Missing key: {}'.format(key)))
 
-        if 'spc' in key:
+        if 'spc' in key or not self.buffering:
             ret = self.data[key]
         else:
             ret = numpy.array([self.data[key][x] for x in self.times])
@@ -118,6 +147,7 @@ class Data(object):
         Configure object
         '''
 
+        self.type = ''
         self.ended = False
         self.data = {}
         self.__times = []
@@ -134,7 +164,7 @@ class Data(object):
         '''
 
         if len(self.data[key]):
-            if 'spc' in key:
+            if 'spc' in key or not self.buffering:
                 return self.data[key].shape
             return self.data[key][self.__times[0]].shape
         return (0,)
@@ -147,6 +177,7 @@ class Data(object):
         if tm in self.__times:
             return
 
+        self.type = dataOut.type
         self.parameters = getattr(dataOut, 'parameters', [])
         if hasattr(dataOut, 'pairsList'):
             self.pairs = dataOut.pairsList
@@ -166,27 +197,32 @@ class Data(object):
             if plot == 'cspc':
                 self.data[plot] = dataOut.data_cspc
             if plot == 'noise':
-                self.data[plot][tm] = 10*numpy.log10(dataOut.getNoise()/dataOut.normFactor)
+                buffer = 10*numpy.log10(dataOut.getNoise()/dataOut.normFactor)
             if plot == 'rti':
-                self.data[plot][tm] = dataOut.getPower()
+                buffer = dataOut.getPower()
             if plot == 'snr_db':
                 self.data['snr'][tm] = dataOut.data_SNR
             if plot == 'snr':
-                self.data[plot][tm] = 10*numpy.log10(dataOut.data_SNR)
+                buffer = 10*numpy.log10(dataOut.data_SNR)
             if plot == 'dop':
-                self.data[plot][tm] = 10*numpy.log10(dataOut.data_DOP)
+                buffer = 10*numpy.log10(dataOut.data_DOP)
             if plot == 'mean':
-                self.data[plot][tm] = dataOut.data_MEAN
+                buffer = dataOut.data_MEAN
             if plot == 'std':
-                self.data[plot][tm] = dataOut.data_STD
+                buffer = dataOut.data_STD
             if plot == 'coh':
-                self.data[plot][tm] = dataOut.getCoherence()
+                buffer = dataOut.getCoherence()
             if plot == 'phase':
-                self.data[plot][tm] = dataOut.getCoherence(phase=True)
+                buffer = dataOut.getCoherence(phase=True)
             if plot == 'output':
-                self.data[plot][tm] = dataOut.data_output
+                buffer = dataOut.data_output
             if plot == 'param':
-                self.data[plot][tm] = dataOut.data_param
+                buffer = dataOut.data_param
+
+            if self.buffering:
+                self.data[plot][tm] = buffer
+            else:
+                self.data[plot] = buffer
 
     def normalize_heights(self):
         '''
@@ -220,7 +256,7 @@ class Data(object):
         tm = self.times[-1]
 
         for key in self.data:
-            if key in ('spc', 'cspc'):
+            if key in ('spc', 'cspc') or not self.buffering:
                 dx = int(self.data[key].shape[1]/MAXNUMX) + 1
                 dy = int(self.data[key].shape[2]/MAXNUMY) + 1
                 data[key] = roundFloats(self.data[key][::, ::dx, ::dy].tolist())
@@ -235,6 +271,8 @@ class Data(object):
         ret['yrange'] = roundFloats(self.heights.tolist())
         if key in ('spc', 'cspc'):
             ret['xrange'] = roundFloats(self.xrange[2][::dx].tolist())
+        else:
+            ret['xrange'] = []
         if hasattr(self, 'pairs'):
             ret['pairs'] = self.pairs
         return json.dumps(ret)
@@ -489,7 +527,7 @@ class PlotterReceiver(ProcessingUnit, Process):
     throttle_value = 5
 
     __attrs__ = ['server', 'plottypes', 'realtime', 'localtime', 'throttle',
-                 'exp_code', 'web_server']
+                 'exp_code', 'web_server', 'buffering']
 
     def __init__(self, **kwargs):
 
@@ -510,6 +548,7 @@ class PlotterReceiver(ProcessingUnit, Process):
         self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
         self.realtime = kwargs.get('realtime', False)
         self.localtime = kwargs.get('localtime', True)
+        self.buffering = kwargs.get('buffering', True)
         self.throttle_value = kwargs.get('throttle', 5)
         self.exp_code = kwargs.get('exp_code', None)
         self.sendData = self.initThrottle(self.throttle_value)
@@ -518,8 +557,8 @@ class PlotterReceiver(ProcessingUnit, Process):
 
     def setup(self):
 
-        self.data = Data(self.plottypes, self.throttle_value, self.exp_code)
-        self.isConfig = True
+        self.data = Data(self.plottypes, self.throttle_value, self.exp_code, self.buffering)
+        self.isConfig = True
 
     def event_monitor(self, monitor):
 
@@ -553,12 +592,12 @@ class PlotterReceiver(ProcessingUnit, Process):
         return sendDataThrottled
 
     def send(self, data):
-        log.success('Sending {}'.format(data), self.name)
+        log.log('Sending {}'.format(data), self.name)
         self.sender.send_pyobj(data)
 
     def run(self):
 
-        log.success(
+        log.log(
             'Starting from {}'.format(self.address),
             self.name
         )
@@ -573,7 +612,7 @@ class PlotterReceiver(ProcessingUnit, Process):
                 'Sending to web: {}'.format(self.web_address),
                 self.name
             )
-            self.sender_web = self.context.socket(zmq.PUB)
+            self.sender_web = self.context.socket(zmq.PUSH)
             self.sender_web.connect(self.web_address)
             time.sleep(1)
 
@@ -623,9 +662,199 @@ class PlotterReceiver(ProcessingUnit, Process):
                     if self.realtime:
                         self.send(self.data)
                         if self.web_address:
-                            self.sender_web.send(self.data.jsonify())
+                            payload = self.data.jsonify()
+                            log.log('Sending to web... type:{}, size:{}'.format(dataOut.type, len(payload)), self.name)
+                            self.sender_web.send(payload)
                     else:
                         self.sendData(self.send, self.data, coerce=coerce)
                         coerce = False
 
         return
+
+
+class SendToFTP(Operation, Process):
+
+    '''
+    Operation to send data over FTP.
+
+    Process whose run() loop polls the local folders given in `patterns`
+    and uploads the last matching file of each one to the FTP server.
+    '''
+
+    __attrs__ = ['server', 'username', 'password', 'patterns', 'timeout']
+
+    def __init__(self, **kwargs):
+        '''
+        patterns = [(local1, remote1, ext, delay, exp_code, sub_exp_code), ...]
+        '''
+        Operation.__init__(self, **kwargs)
+        Process.__init__(self)
+        self.server = kwargs.get('server')
+        self.username = kwargs.get('username')
+        self.password = kwargs.get('password')
+        self.patterns = kwargs.get('patterns')
+        self.timeout = kwargs.get('timeout', 10)
+        self.times = [time.time() for p in self.patterns]
+        self.latest = ['' for p in self.patterns]
+        self.mp = False
+        self.ftp = None
+
+    def _disconnect(self):
+        '''
+        Drop the current connection, if any, and flag the operation as
+        not configured so that run() calls setup() again.
+        '''
+
+        if self.ftp is not None:
+            self.ftp.close()
+        self.ftp = None
+        self.isConfig = False
+
+    def setup(self):
+        '''
+        Connect and log in to the FTP server, setting isConfig on success.
+        '''
+
+        log.log('Connecting to ftp://{}'.format(self.server), self.name)
+        try:
+            self.ftp = ftplib.FTP(self.server, timeout=self.timeout)
+        except ftplib.all_errors:
+            log.error('Server connection fail: {}'.format(self.server), self.name)
+            self._disconnect()
+            return
+
+        try:
+            self.ftp.login(self.username, self.password)
+        except ftplib.all_errors:
+            log.error('The given username y/o password are incorrect', self.name)
+            self._disconnect()
+            return
+
+        log.success('Connection success', self.name)
+        self.isConfig = True
+
+    def check(self):
+        '''
+        Test the connection with a NOOP command and reconnect if it fails.
+        '''
+
+        try:
+            self.ftp.voidcmd("NOOP")
+        except ftplib.all_errors:
+            log.warning('Connection lost... trying to reconnect', self.name)
+            self._disconnect()
+            self.setup()
+
+    def find_files(self, path, ext):
+        '''
+        Return the last file name (sorted by name) in path ending with ext,
+        or None when there is no such file.
+        '''
+
+        files = sorted(glob.glob1(path, '*{}'.format(ext)))
+        if files:
+            return files[-1]
+        return None
+
+    def getftpname(self, filename, exp_code, sub_exp_code):
+        '''
+        Build the remote name of a plot: year, day of year, '00', experiment
+        code, sub-experiment code, plot code (see PLOT_CODES) and '00.png'.
+        '''
+
+        thisDatetime = datetime.datetime.strptime(filename.split('_')[1], '%Y%m%d')
+        YEAR_STR = '%4.4d'%thisDatetime.timetuple().tm_year
+        DOY_STR = '%3.3d'%thisDatetime.timetuple().tm_yday
+        exp_code = '%3.3d'%exp_code
+        sub_exp_code = '%2.2d'%sub_exp_code
+        plot_code = '%2.2d'% PLOT_CODES[filename.split('_')[0].split('-')[0]]
+        name = YEAR_STR + DOY_STR + '00' + exp_code + sub_exp_code + plot_code + '00.png'
+        return name
+
+    def upload(self, src, dst):
+        '''
+        Store local file src as dst on the server and chmod it to 755.
+
+        Returns True when the file was stored, False otherwise.
+        '''
+
+        log.log('Uploading {} '.format(src), self.name, nl=False)
+
+        command = 'STOR {}'.format(dst)
+
+        # the with block closes the file even when the transfer fails
+        with open(src, 'rb') as fp:
+            try:
+                self.ftp.storbinary(command, fp, blocksize=1024)
+            except ftplib.all_errors as e:
+                log.error('{}'.format(e), self.name)
+                self._disconnect()
+                return False
+
+        try:
+            self.ftp.sendcmd('SITE CHMOD 755 {}'.format(dst))
+        except ftplib.all_errors as e:
+            log.error('{}'.format(e), self.name)
+            self._disconnect()
+
+        log.success('OK', tag='')
+        return True
+
+    def send_files(self):
+        '''
+        Upload the last file of every pattern whose delay has elapsed.
+        '''
+
+        for x, pattern in enumerate(self.patterns):
+            local, remote, ext, delay, exp_code, sub_exp_code = pattern
+            if time.time()-self.times[x] < delay:
+                continue
+
+            srcname = self.find_files(local, ext)
+
+            if srcname is None or srcname == self.latest[x]:
+                continue
+
+            if 'png' in ext:
+                dstname = self.getftpname(srcname, exp_code, sub_exp_code)
+            else:
+                dstname = srcname
+
+            src = os.path.join(local, srcname)
+
+            # ignore files not modified in the last 30 minutes
+            if os.path.getmtime(src) < time.time() - 30*60:
+                continue
+
+            dst = os.path.join(remote, dstname)
+
+            if self.ftp is None:
+                continue
+
+            stored = self.upload(src, dst)
+            self.times[x] = time.time()
+            # mark the file as sent only if it was stored, so that a
+            # failed transfer is retried after the pattern delay
+            if stored:
+                self.latest[x] = srcname
+
+    def run(self):
+        '''
+        Main loop: (re)connect when needed, then check and send files.
+        '''
+
+        while True:
+            if not self.isConfig:
+                self.setup()
+            if self.ftp is not None:
+                self.check()
+                self.send_files()
+            time.sleep(2)
+
+    def close(self):
+        '''
+        Close the FTP connection and terminate the process.
+        '''
+
+        self._disconnect()
+        self.terminate()