diff --git a/schainpy/controller.py b/schainpy/controller.py index 3c2f309..21bc640 100644 --- a/schainpy/controller.py +++ b/schainpy/controller.py @@ -8,6 +8,7 @@ import ast import datetime import traceback import math +import time from multiprocessing import Process, Queue, cpu_count import schainpy @@ -66,7 +67,8 @@ def multiSchain(child, nProcess=cpu_count(), startDate=None, endDate=None, by_da for process in processes: process.join() process.terminate() - #sys.exit() + + time.sleep(3) class ParameterConf(): diff --git a/schainpy/model/data/jrodata.py b/schainpy/model/data/jrodata.py index b862970..c58715e 100644 --- a/schainpy/model/data/jrodata.py +++ b/schainpy/model/data/jrodata.py @@ -65,7 +65,7 @@ def hildebrand_sekhon(data, navg): anoise : noise's level """ - sortdata = numpy.sort(data,axis=None) + sortdata = numpy.sort(data, axis=None) # lenOfData = len(sortdata) # nums_min = lenOfData*0.2 # diff --git a/schainpy/model/graphics/jroplot_data.py b/schainpy/model/graphics/jroplot_data.py index 72074c7..b01484e 100644 --- a/schainpy/model/graphics/jroplot_data.py +++ b/schainpy/model/graphics/jroplot_data.py @@ -24,9 +24,8 @@ class PlotData(Operation, Process): CODE = 'Figure' colormap = 'jro' - CONFLATE = True + CONFLATE = False __MAXNUMX = 80 - __MAXNUMY = 80 __missing = 1E30 def __init__(self, **kwargs): @@ -38,6 +37,7 @@ class PlotData(Operation, Process): self.dataOut = None self.isConfig = False self.figure = None + self.figure2 = None #JM modificatiom self.axes = [] self.localtime = kwargs.pop('localtime', True) self.show = kwargs.get('show', True) @@ -55,7 +55,9 @@ class PlotData(Operation, Process): self.xrange = kwargs.get('xrange', 24) self.ymin = kwargs.get('ymin', None) self.ymax = kwargs.get('ymax', None) + self.__MAXNUMY = kwargs.get('decimation', 80) self.throttle_value = 5 + self.times = [] def fill_gaps(self, x_buffer, y_buffer, z_buffer): @@ -94,19 +96,27 @@ class PlotData(Operation, Process): if self.show: print 'showing' self.figure.show() + self.figure2.show() self.plot() plt.tight_layout() self.figure.canvas.manager.set_window_title('{} {} - Date:{}'.format(self.title, self.CODE.upper(), datetime.datetime.fromtimestamp(self.max_time).strftime('%y/%m/%d %H:%M:%S'))) + self.figure2.canvas.manager.set_window_title('{} {} - Date:{}'.format(self.title, self.CODE.upper(), + datetime.datetime.fromtimestamp(self.max_time).strftime('%y/%m/%d %H:%M:%S'))) if self.save: figname = os.path.join(self.save, '{}_{}.png'.format(self.CODE, datetime.datetime.fromtimestamp(self.saveTime).strftime('%y%m%d_%H%M%S'))) print 'Saving figure: {}'.format(figname) self.figure.savefig(figname) + figname2 = os.path.join(self.save, '{}_{}2.png'.format(self.CODE, + datetime.datetime.fromtimestamp(self.saveTime).strftime('%y%m%d_%H%M%S'))) + print 'Saving figure: {}'.format(figname2) + self.figure2.savefig(figname2) self.figure.canvas.draw() + self.figure2.canvas.draw() def plot(self): @@ -121,11 +131,16 @@ class PlotData(Operation, Process): receiver.setsockopt(zmq.SUBSCRIBE, '') receiver.setsockopt(zmq.CONFLATE, self.CONFLATE) receiver.connect("ipc:///tmp/zmq.plots") - + seconds_passed = 0 while True: try: - self.data = receiver.recv_pyobj(flags=zmq.NOBLOCK) + self.data = receiver.recv_pyobj(flags=zmq.NOBLOCK)#flags=zmq.NOBLOCK + self.started = self.data['STARTED'] self.dataOut = self.data['dataOut'] + + if (len(self.times) < len(self.data['times']) and not self.started and self.data['ENDED']): + continue + self.times = self.data['times'] self.times.sort() self.throttle_value = self.data['throttle'] @@ -133,16 +148,25 @@ class PlotData(Operation, Process): self.max_time = self.times[-1] if self.isConfig is False: + print 'setting up' self.setup() self.isConfig = True - self.__plot() + self.__plot() if self.data['ENDED'] is True: + print '********GRAPHIC ENDED********' + self.ended = True self.isConfig = False + self.__plot() + elif seconds_passed >= self.data['throttle']: + print 'passed', seconds_passed + self.__plot() + seconds_passed = 0 except zmq.Again as e: print 'Waiting for data...' - plt.pause(self.throttle_value) + plt.pause(2) + seconds_passed += 2 def close(self): if self.dataOut: @@ -453,10 +477,29 @@ class PlotRTIData(PlotData): self.figure.clf() self.axes = [] - for n in range(self.nrows): - ax = self.figure.add_subplot(self.nrows, self.ncols, n+1) - ax.firsttime = True - self.axes.append(ax) + if self.figure2 is None: + self.figure2 = plt.figure(figsize=(self.width, self.height), + edgecolor='k', + facecolor='w') + else: + self.figure2.clf() + self.axes = [] + + ax = self.figure.add_subplot(1,1,1) + #ax = self.figure( n+1) + ax.firsttime = True + self.axes.append(ax) + + ax = self.figure2.add_subplot(1,1,1) + #ax = self.figure( n+1) + ax.firsttime = True + self.axes.append(ax) + # for n in range(self.nrows): + # ax = self.figure.add_subplot(self.nrows, self.ncols, n+1) + # #ax = self.figure( n+1) + # ax.firsttime = True + # self.axes.append(ax) + def plot(self): @@ -469,15 +512,14 @@ class PlotRTIData(PlotData): self.z = np.array(self.z) for n, ax in enumerate(self.axes): - x, y, z = self.fill_gaps(*self.decimate()) xmin = self.min_time xmax = xmin+self.xrange*60*60 + self.zmin = self.zmin if self.zmin else np.min(self.z) + self.zmax = self.zmax if self.zmax else np.max(self.z) if ax.firsttime: self.ymin = self.ymin if self.ymin else np.nanmin(self.y) self.ymax = self.ymax if self.ymax else np.nanmax(self.y) - self.zmin = self.zmin if self.zmin else np.nanmin(self.z) - self.zmax = self.zmax if self.zmax else np.nanmax(self.z) plot = ax.pcolormesh(x, y, z[n].T, vmin=self.zmin, vmax=self.zmax, @@ -485,7 +527,8 @@ class PlotRTIData(PlotData): ) divider = make_axes_locatable(ax) cax = divider.new_horizontal(size='2%', pad=0.05) - self.figure.add_axes(cax) + #self.figure.add_axes(cax) + #self.figure2.add_axes(cax) plt.colorbar(plot, cax) ax.set_ylim(self.ymin, self.ymax) diff --git a/schainpy/model/io/jroIO_base.py b/schainpy/model/io/jroIO_base.py index c753eb3..a9b38a1 100644 --- a/schainpy/model/io/jroIO_base.py +++ b/schainpy/model/io/jroIO_base.py @@ -888,7 +888,8 @@ class JRODataReader(JRODataIO): print '[Reading] No more files to read' return 0 - print '[Reading] Setting the file: %s' % self.filename + if self.verbose: + print '[Reading] Setting the file: %s' % self.filename self.__readFirstHeader() self.nReadBlocks = 0 @@ -1053,9 +1054,10 @@ class JRODataReader(JRODataIO): break - print "[Reading] Block No. %d/%d -> %s" %(self.nReadBlocks, - self.processingHeaderObj.dataBlocksPerFile, - self.dataOut.datatime.ctime()) + if self.verbose: + print "[Reading] Block No. %d/%d -> %s" %(self.nReadBlocks, + self.processingHeaderObj.dataBlocksPerFile, + self.dataOut.datatime.ctime()) return 1 def __readFirstHeader(self): @@ -1266,7 +1268,9 @@ class JRODataReader(JRODataIO): blocktime=None, queue=None, skip=None, - cursor=None): + cursor=None, + warnings=True, + verbose=True): if path == None: raise ValueError, "[Reading] The path is not valid" @@ -1338,6 +1342,9 @@ class JRODataReader(JRODataIO): self.selBlocksize = blocksize self.selBlocktime = blocktime + # Verbose----------- + self.verbose = verbose + self.warnings = warnings if not(self.setNextFile()): if (startDate!=None) and (endDate!=None): diff --git a/schainpy/model/proc/jroproc_base.py b/schainpy/model/proc/jroproc_base.py index bc43456..e52ceef 100644 --- a/schainpy/model/proc/jroproc_base.py +++ b/schainpy/model/proc/jroproc_base.py @@ -24,7 +24,7 @@ def checkKwargs(method, kwargs): if fuzz is None: continue if fuzz[1] < 100: - raise Exception('\x1b[2;30;43mDid you mean {} instead of {} in {}? \x1b[0m'. + raise Exception('\x1b[0;32;40mDid you mean {} instead of {} in {}? \x1b[0m'. format(fuzz[0], kwarg, method.__self__.__class__.__name__)) class ProcessingUnit(object): diff --git a/schainpy/model/utils/jroutils_publish.py b/schainpy/model/utils/jroutils_publish.py index b9a5d70..68a5907 100644 --- a/schainpy/model/utils/jroutils_publish.py +++ b/schainpy/model/utils/jroutils_publish.py @@ -7,7 +7,7 @@ import json import numpy import paho.mqtt.client as mqtt import zmq -import cPickle as pickle +from profilehooks import profile import datetime from zmq.utils.monitor import recv_monitor_message from functools import wraps @@ -29,7 +29,7 @@ def roundFloats(obj): elif isinstance(obj, float): return round(obj, 2) -def decimate(z): +def decimate(z, MAXNUMY): # dx = int(len(self.x)/self.__MAXNUMX) + 1 dy = int(len(z[0])/MAXNUMY) + 1 @@ -107,7 +107,7 @@ class PublishData(Operation): print "MQTT Conection error." self.client = False - def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, **kwargs): + def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, verbose=True, **kwargs): self.counter = 0 self.topic = kwargs.get('topic', 'schain') self.delay = kwargs.get('delay', 0) @@ -119,6 +119,8 @@ class PublishData(Operation): self.zeromq = zeromq self.mqtt = kwargs.get('plottype', 0) self.client = None + self.verbose = verbose + self.dataOut.firstdata = True setup = [] if mqtt is 1: self.client = mqtt.Client( @@ -150,6 +152,7 @@ class PublishData(Operation): self.zmq_socket.connect(address) time.sleep(1) + def publish_data(self): self.dataOut.finished = False if self.mqtt is 1: @@ -230,8 +233,11 @@ class PublishData(Operation): self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0) if self.zeromq is 1: - print '[Sending] {} - {}'.format(self.dataOut.type, self.dataOut.datatime) + if self.verbose: + print '[Sending] {} - {}'.format(self.dataOut.type, self.dataOut.datatime) self.zmq_socket.send_pyobj(self.dataOut) + self.dataOut.firstdata = False + def run(self, dataOut, **kwargs): self.dataOut = dataOut @@ -246,12 +252,11 @@ class PublishData(Operation): if self.zeromq is 1: self.dataOut.finished = True self.zmq_socket.send_pyobj(self.dataOut) - + self.zmq_socket.close() if self.client: self.client.loop_stop() self.client.disconnect() - class ReceiverData(ProcessingUnit, Process): throttle_value = 5 @@ -330,12 +335,13 @@ class ReceiverData(ProcessingUnit, Process): return sendDataThrottled + def send(self, data): # print '[sending] data=%s size=%s' % (data.keys(), len(data['times'])) self.sender.send_pyobj(data) - def update(self): + def update(self): t = self.dataOut.utctime if t in self.data['times']: @@ -397,7 +403,7 @@ class ReceiverData(ProcessingUnit, Process): self.sender_web.connect(self.plot_address) time.sleep(1) self.sender.bind("ipc:///tmp/zmq.plots") - + time.sleep(3) t = Thread(target=self.event_monitor, args=(monitor,)) t.start() @@ -408,6 +414,10 @@ class ReceiverData(ProcessingUnit, Process): self.update() + if self.dataOut.firstdata is True: + self.data['STARTED'] = True + + if self.dataOut.finished is True: self.send(self.data) self.connections -= 1 @@ -416,6 +426,7 @@ class ReceiverData(ProcessingUnit, Process): self.data['ENDED'] = True self.send(self.data) self.setup() + self.started = False else: if self.realtime: self.send(self.data) @@ -424,6 +435,7 @@ class ReceiverData(ProcessingUnit, Process): self.sendData(self.send, self.data) self.started = True + self.data['STARTED'] = False return def sendToWeb(self): diff --git a/schainpy/scripts/PPD.py b/schainpy/scripts/PPD.py index e7c1b5d..01cab7a 100644 --- a/schainpy/scripts/PPD.py +++ b/schainpy/scripts/PPD.py @@ -4,6 +4,7 @@ from schainpy.controller import Project, multiSchain desc = "HF_EXAMPLE" path='/home/ci-81/Documents/DATA/HFADATA/hfdata_2017/pdata/sp1_f0' +path = '/media/ci-81/Huancayo/DATA/hfradar_2016/pdata/sp1_f1' def fiber(cursor, skip, q, dt): controllerObj = Project() @@ -96,4 +97,4 @@ if __name__ == '__main__': parser = argparse.ArgumentParser(description='Set number of parallel processes') parser.add_argument('--nProcess', default=1, type=int) args = parser.parse_args() - multiSchain(fiber, nProcess=8, startDate='2017/02/10', endDate='2017/02/12') + multiSchain(fiber, nProcess=8, startDate='2016/04/23', endDate='2016/04/27') diff --git a/schainpy/scripts/julia_mp.py b/schainpy/scripts/julia_mp.py index 0743086..d812999 100644 --- a/schainpy/scripts/julia_mp.py +++ b/schainpy/scripts/julia_mp.py @@ -1,75 +1,97 @@ -#!/usr/bin/env python -''' -Created on Jul 7, 2014 +import argparse -@author: roj-idl71 -''' -import os, sys -from datetime import datetime, timedelta -import multiprocessing -from schainpy.controller import Project +from schainpy.controller import Project, multiSchain -def main(date): +desc = "HF_EXAMPLE" - controllerObj = Project() - - controllerObj.setup(id='191', name='test01', description='') - - readUnitConfObj = controllerObj.addReadUnit(datatype='Spectra', - path='/home/nanosat/data/zeus', - startDate=date, - endDate=date, - startTime='00:00:00', - endTime='23:59:59', - online=0, - walk=1, - expLabel='') - - procUnitConfObj1 = controllerObj.addProcUnit(datatype='Spectra', inputId=readUnitConfObj.getId()) - #opObj11 = procUnitConfObj1.addOperation(name='removeDC') - #opObj11.addParameter(name='mode', value='1', format='int') - - #opObj11 = procUnitConfObj1.addOperation(name='removeInterference') - - - opObj11 = procUnitConfObj1.addOperation(name='RTIPlot', optype='other') - opObj11.addParameter(name='id', value='10', format='int') - opObj11.addParameter(name='wintitle', value='150Km', format='str') - opObj11.addParameter(name='colormap', value='jro', format='str') - opObj11.addParameter(name='xaxis', value='time', format='str') - opObj11.addParameter(name='xmin', value='0', format='int') - opObj11.addParameter(name='xmax', value='23', format='int') - #opObj11.addParameter(name='ymin', value='100', format='int') - #opObj11.addParameter(name='ymax', value='150', format='int') - opObj11.addParameter(name='zmin', value='10', format='int') - opObj11.addParameter(name='zmax', value='35', format='int') +def fiber(cursor, skip, q, dt): + controllerObj = Project() - - - opObject12 = procUnitConfObj1.addOperation(name='PlotRTIData', optype='other') - opObject12.addParameter(name='id', value='12', format='int') - opObject12.addParameter(name='wintitle', value='150Km', format='str') - opObject12.addParameter(name='colormap', value='jro', format='str') - opObject12.addParameter(name='xaxis', value='time', format='str') - opObject12.addParameter(name='xmin', value='0', format='int') - opObject12.addParameter(name='xmax', value='23', format='int') - #opObject12.addParameter(name='ymin', value='100', format='int') - #opObject12.addParameter(name='ymax', value='150', format='int') - opObject12.addParameter(name='zmin', value='10', format='int') - opObject12.addParameter(name='zmax', value='35', format='int') - #opObject12.addParameter(name='pause', value='1', format='bool') - opObject12.addParameter(name='show', value='0', format='bool') - opObject12.addParameter(name='save', value='/tmp', format='str') - + controllerObj.setup(id='191', name='test01', description=desc) + + readUnitConfObj = controllerObj.addReadUnit(datatype='SpectraReader', + path='/home/nanosat/data/julia', + startDate=dt, + endDate=dt, + startTime="00:00:00", + endTime="23:59:59", + online=0, + #set=1426485881, + delay=10, + walk=1, + queue=q, + cursor=cursor, + skip=skip, + #timezone=-5*3600 + ) + + # #opObj11 = readUnitConfObj.addOperation(name='printNumberOfBlock') + # + procUnitConfObj2 = controllerObj.addProcUnit(datatype='Spectra', inputId=readUnitConfObj.getId()) + # procUnitConfObj2.addParameter(name='nipp', value='5', format='int') + + # procUnitConfObj3 = controllerObj.addProcUnit(datatype='ParametersProc', inputId=readUnitConfObj.getId()) + # opObj11 = procUnitConfObj3.addOperation(name='SpectralMoments', optype='other') + + # + # opObj11 = procUnitConfObj1.addOperation(name='SpectraPlot', optype='other') + # opObj11.addParameter(name='id', value='1000', format='int') + # opObj11.addParameter(name='wintitle', value='HF_Jicamarca_Spc', format='str') + # opObj11.addParameter(name='channelList', value='0', format='intlist') + # opObj11.addParameter(name='zmin', value='-120', format='float') + # opObj11.addParameter(name='zmax', value='-70', format='float') + # opObj11.addParameter(name='save', value='1', format='int') + # opObj11.addParameter(name='figpath', value=figpath, format='str') + + # opObj11 = procUnitConfObj3.addOperation(name='Parameters1Plot', optype='other') + # opObj11.addParameter(name='channelList', value='0', format='intList') + # + # opObj11.addParameter(name='id', value='2000', format='int') + # # opObj11.addParameter(name='colormap', value='0', format='bool') + # opObj11.addParameter(name='onlySNR', value='1', format='bool') + # opObj11.addParameter(name='DOP', value='0', format='bool') + # opObj11.addParameter(name='showSNR', value='1', format='bool') + # opObj11.addParameter(name='SNRthresh', value='0', format='int') + # opObj11.addParameter(name='SNRmin', value='-10', format='int') + # opObj11.addParameter(name='SNRmax', value='30', format='int') + + # opObj11.addParameter(name='showSNR', value='1', format='int') + # # opObj11.addParameter(name='channelList', value='0', format='intlist') + # # opObj11.addParameter(name='xmin', value='0', format='float') + # opObj11.addParameter(name='xmin', value='0', format='float') + # opObj11.addParameter(name='xmax', value='24', format='float') + + # opObj11.addParameter(name='zmin', value='-110', format='float') + # opObj11.addParameter(name='zmax', value='-70', format='float') + # opObj11.addParameter(name='save', value='0', format='int') + # # opObj11.addParameter(name='figpath', value='/tmp/', format='str') + # + opObj12 = procUnitConfObj2.addOperation(name='PublishData', optype='other') + opObj12.addParameter(name='zeromq', value=1, format='int') + # opObj12.addParameter(name='server', value='tcp://10.10.10.82:7000', format='str') + + + # opObj13 = procUnitConfObj3.addOperation(name='PublishData', optype='other') + # opObj13.addParameter(name='zeromq', value=1, format='int') + # opObj13.addParameter(name='server', value="juanca", format='str') + + # opObj12.addParameter(name='delay', value=1, format='int') + + + # print "Escribiendo el archivo XML" + # controllerObj.writeXml(filename) + # print "Leyendo el archivo XML" + # controllerObj.readXml(filename) + + + # timeit.timeit('controllerObj.run()', number=2) controllerObj.start() -if __name__=='__main__': - - dt = datetime(2017, 1, 12) - - dates = [(dt+timedelta(x)).strftime('%Y/%m/%d') for x in range(20)] - p = multiprocessing.Pool(4) - p.map(main, dates) +if __name__ == '__main__': + parser = argparse.ArgumentParser(description='Set number of parallel processes') + parser.add_argument('--nProcess', default=1, type=int) + args = parser.parse_args() + multiSchain(fiber, nProcess=args.nProcess, startDate='2016/08/19', endDate='2016/08/19') diff --git a/schainpy/scripts/receiver.py b/schainpy/scripts/receiver.py index 9d1c3c1..c0a4bf6 100644 --- a/schainpy/scripts/receiver.py +++ b/schainpy/scripts/receiver.py @@ -17,11 +17,14 @@ if __name__ == '__main__': proc1 = controllerObj.addProcUnit(name='ReceiverData') # proc1.addParameter(name='realtime', value='0', format='bool') #proc1.addParameter(name='plottypes', value='rti,coh,phase,snr,dop', format='str') - proc1.addParameter(name='plottypes', value='rti,coh,phase,snr', format='str') + #proc1.addParameter(name='plottypes', value='rti,coh,phase,snr', format='str') + proc1.addParameter(name='plottypes', value='dop', format='str') + proc1.addParameter(name='throttle', value='10', format='int') #proc1.addParameter(name='server', value='tcp://10.10.10.82:7000', format='str') ## TODO Agregar direccion de server de publicacion a graficos como variable + """ op1 = proc1.addOperation(name='PlotRTIData', optype='other') op1.addParameter(name='wintitle', value='HF System', format='str') op1.addParameter(name='save', value='/home/ci-81/Pictures', format='str') @@ -49,6 +52,7 @@ if __name__ == '__main__': # proc2.addParameter(name='server', value='juanca', format='str') # proc2.addParameter(name='plottypes', value='snr,dop', format='str') # + op3 = proc1.addOperation(name='PlotSNRData', optype='other') op3.addParameter(name='wintitle', value='HF System SNR0', format='str') op3.addParameter(name='save', value='/home/ci-81/Pictures', format='str') @@ -56,23 +60,19 @@ if __name__ == '__main__': op3.addParameter(name='zmin', value='-10', format='int') op3.addParameter(name='zmax', value='30', format='int') op3.addParameter(name='SNRthresh', value='0', format='float') + """ # + op5 = proc1.addOperation(name='PlotDOPData', optype='other') + op5.addParameter(name='wintitle', value='HF System DOP', format='str') + op5.addParameter(name='save', value='/home/ci-81/Pictures', format='str') + op5.addParameter(name='show', value='1', format='bool') + op5.addParameter(name='zmin', value='-120', format='float') + op5.addParameter(name='zmax', value='120', format='float') + op5.addParameter(name='colormap', value='RdBu_r', format='str') """ op4 = proc1.addOperation(name='PlotSNRData1', optype='other') op4.addParameter(name='wintitle', value='HF System SNR1', format='str') op4.addParameter(name='save', value='/home/ci-81/Pictures', format='str') op4.addParameter(name='show', value='0', format='bool') - - - op5 = proc1.addOperation(name='PlotDOPData', optype='other') - op5.addParameter(name='wintitle', value='HF System DOP', format='str') - op5.addParameter(name='save', value='/home/ci-81/Pictures', format='str') - op5.addParameter(name='show', value='0', format='bool') - op5.addParameter(name='colormap', value='jet', format='str') """ - - - - - controllerObj.start() diff --git a/schainpy/scripts/schain.xml b/schainpy/scripts/schain.xml index 6fd2140..fe952f2 100644 --- a/schainpy/scripts/schain.xml +++ b/schainpy/scripts/schain.xml @@ -1 +1 @@ - \ No newline at end of file + \ No newline at end of file diff --git a/setup.py b/setup.py index a0f14e8..3fe773f 100644 --- a/setup.py +++ b/setup.py @@ -44,5 +44,6 @@ setup(name="schainpy", "paramiko >= 2.1.2", "paho-mqtt >= 1.2", "zmq", + "fuzzywuzzy" ], - ) \ No newline at end of file + )