diff --git a/.gitignore b/.gitignore index fdad5c7..c31424d 100644 --- a/.gitignore +++ b/.gitignore @@ -96,3 +96,7 @@ ENV/ # mkdocs documentation /site + +# eclipse +.project +.pydevproject diff --git a/schainpy/__init__.py b/schainpy/__init__.py index a395f37..3b8fe16 100644 --- a/schainpy/__init__.py +++ b/schainpy/__init__.py @@ -4,4 +4,4 @@ Created on Feb 7, 2012 @author $Author$ @version $Id$ ''' -__version__ = "2.2.5" \ No newline at end of file +__version__ = "2.3" \ No newline at end of file diff --git a/schainpy/controller.py b/schainpy/controller.py index f67c982..1d4530a 100644 --- a/schainpy/controller.py +++ b/schainpy/controller.py @@ -438,7 +438,8 @@ class OperationConf(): def createObject(self, plotter_queue=None): - if self.type == 'self': + + if self.type == 'self': raise ValueError, "This operation type cannot be created" if self.type == 'plotter': @@ -449,10 +450,10 @@ class OperationConf(): opObj = Plotter(self.name, plotter_queue) if self.type == 'external' or self.type == 'other': - print self.name + className = eval(self.name) kwargs = self.getKwargs() - print kwargs + opObj = className(**kwargs) return opObj @@ -671,14 +672,18 @@ class ProcUnitConf(): kwargs = self.getKwargs() procUnitObj = className(**kwargs) - for opConfObj in self.opConfObjList: - - if opConfObj.type == 'self': + for opConfObj in self.opConfObjList: + + if opConfObj.type=='self' and self.name=='run': + continue + elif opConfObj.type=='self': + procUnitObj.addOperationKwargs(opConfObj.id, **opConfObj.getKwargs()) continue opObj = opConfObj.createObject(plotter_queue) self.opObjDict[opConfObj.id] = opObj + procUnitObj.addOperation(opObj, opConfObj.id) self.procUnitObj = procUnitObj diff --git a/schainpy/model/graphics/jroplot_data.py b/schainpy/model/graphics/jroplot_data.py index 3cfb669..6d3d898 100644 --- a/schainpy/model/graphics/jroplot_data.py +++ b/schainpy/model/graphics/jroplot_data.py @@ -29,8 +29,9 @@ class PlotData(Operation, Process): def __init__(self, **kwargs): - Operation.__init__(self, **kwargs) + Operation.__init__(self, plot=True, **kwargs) Process.__init__(self) + self.kwargs['code'] = self.CODE self.mp = False self.dataOut = None self.isConfig = False diff --git a/schainpy/model/proc/jroproc_base.py b/schainpy/model/proc/jroproc_base.py index 2362475..08e6beb 100644 --- a/schainpy/model/proc/jroproc_base.py +++ b/schainpy/model/proc/jroproc_base.py @@ -35,12 +35,20 @@ class ProcessingUnit(object): self.dataOut = None self.operations2RunDict = {} + self.operationKwargs = {} self.isConfig = False self.args = args self.kwargs = kwargs + def addOperationKwargs(self, objId, **kwargs): + ''' + ''' + + self.operationKwargs[objId] = kwargs + + def addOperation(self, opObj, objId): """ @@ -80,7 +88,7 @@ class ProcessingUnit(object): raise NotImplementedError - def callMethod(self, name, **kwargs): + def callMethod(self, name, opId): """ Ejecuta el metodo con el nombre "name" y con argumentos **kwargs de la propia clase. @@ -100,7 +108,7 @@ class ProcessingUnit(object): return False else: #Si no es un metodo RUN la entrada es la misma dataOut (interna) - if self.dataOut.isEmpty(): + if self.dataOut is not None and self.dataOut.isEmpty(): return False #Getting the pointer to method @@ -109,11 +117,17 @@ class ProcessingUnit(object): #Executing the self method if hasattr(self, 'mp'): - if self.mp is False: - self.mp = True - self.start() + if name=='run': + if self.mp is False: + self.mp = True + self.start() + else: + methodToCall(**self.operationKwargs[opId]) else: - methodToCall(**kwargs) + if name=='run': + methodToCall(**self.kwargs) + else: + methodToCall(**self.operationKwargs[opId]) if self.dataOut is None: return False @@ -146,10 +160,12 @@ class ProcessingUnit(object): if hasattr(externalProcObj, 'mp'): if externalProcObj.mp is False: + self.operationKwargs[objId] = externalProcObj.kwargs externalProcObj.mp = True externalProcObj.start() else: externalProcObj.run(self.dataOut, **externalProcObj.kwargs) + self.operationKwargs[objId] = externalProcObj.kwargs return True @@ -198,7 +214,7 @@ class ProcessingUnit(object): if not opName: raise ValueError, "opName parameter should be defined" - sts = self.callMethod(opName, **self.kwargs) + sts = self.callMethod(opName, opId) elif opType == 'other' or opType == 'external' or opType == 'plotter': diff --git a/schainpy/model/proc/jroproc_parameters.py b/schainpy/model/proc/jroproc_parameters.py index 484a8b6..c0ed146 100644 --- a/schainpy/model/proc/jroproc_parameters.py +++ b/schainpy/model/proc/jroproc_parameters.py @@ -518,9 +518,6 @@ class WindProfiler(Operation): n = None - def __init__(self): - Operation.__init__(self) - def __calculateCosDir(self, elev, azim): zen = (90 - elev)*numpy.pi/180 azim = azim*numpy.pi/180 @@ -1204,8 +1201,6 @@ class WindProfiler(Operation): class EWDriftsEstimation(Operation): - def __init__(self): - Operation.__init__(self) def __correctValues(self, heiRang, phi, velRadial, SNR): listPhi = phi.tolist() @@ -2032,7 +2027,7 @@ class SMDetection(Operation): timeLag = 45*10**-3 else: timeLag = 15*10**-3 - lag = numpy.ceil(timeLag/timeInterval) + lag = int(numpy.ceil(timeLag/timeInterval)) listMeteors1 = [] diff --git a/schainpy/model/utils/jroutils_publish.py b/schainpy/model/utils/jroutils_publish.py index 97983d9..5984794 100644 --- a/schainpy/model/utils/jroutils_publish.py +++ b/schainpy/model/utils/jroutils_publish.py @@ -29,6 +29,10 @@ def roundFloats(obj): elif isinstance(obj, float): return round(obj, 2) +def decimate(z): + # dx = int(len(self.x)/self.__MAXNUMX) + 1 + dy = int(len(z[0])/MAXNUMY) + 1 + return z[::, ::dy] class throttle(object): """Decorator that prevents a function from being called more than once every @@ -258,15 +262,23 @@ class ReceiverData(ProcessingUnit, Process): Process.__init__(self) self.mp = False self.isConfig = False + self.isWebConfig = False self.plottypes =[] self.connections = 0 server = kwargs.get('server', 'zmq.pipe') + plot_server = kwargs.get('plot_server', 'zmq.web') if 'tcp://' in server: address = server else: address = 'ipc:///tmp/%s' % server + if 'tcp://' in plot_server: + plot_address = plot_server + else: + plot_address = 'ipc:///tmp/%s' % plot_server + self.address = address + self.plot_address = plot_address self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')] self.realtime = kwargs.get('realtime', False) self.throttle_value = kwargs.get('throttle', 10) @@ -283,6 +295,7 @@ class ReceiverData(ProcessingUnit, Process): self.data['throttle'] = self.throttle_value self.data['ENDED'] = False self.isConfig = True + self.data_web = {} def event_monitor(self, monitor): @@ -318,7 +331,7 @@ class ReceiverData(ProcessingUnit, Process): return sendDataThrottled def send(self, data): - print '[sending] data=%s size=%s' % (data.keys(), len(data['times'])) + # print '[sending] data=%s size=%s' % (data.keys(), len(data['times'])) self.sender.send_pyobj(data) def update(self): @@ -327,7 +340,6 @@ class ReceiverData(ProcessingUnit, Process): self.data['times'].append(t) self.data['dataOut'] = self.dataOut for plottype in self.plottypes: - if plottype == 'spc': z = self.dataOut.data_spc/self.dataOut.normFactor self.data[plottype] = 10*numpy.log10(z) @@ -342,7 +354,9 @@ class ReceiverData(ProcessingUnit, Process): self.data[plottype][t] = self.dataOut.getCoherence() if plottype == 'phase': self.data[plottype][t] = self.dataOut.getCoherence(phase=True) - + if self.realtime: + self.data_web[plottype] = roundFloats(decimate(self.data[plottype][t]).tolist()) + self.data_web['time'] = t def run(self): print '[Starting] {} from {}'.format(self.name, self.address) @@ -352,7 +366,9 @@ class ReceiverData(ProcessingUnit, Process): self.receiver.bind(self.address) monitor = self.receiver.get_monitor_socket() self.sender = self.context.socket(zmq.PUB) - + if self.realtime: + self.sender_web = self.context.socket(zmq.PUB) + self.sender_web.bind(self.plot_address) self.sender.bind("ipc:///tmp/zmq.plots") t = Thread(target=self.event_monitor, args=(monitor,)) @@ -376,8 +392,29 @@ class ReceiverData(ProcessingUnit, Process): else: if self.realtime: self.send(self.data) + self.sender_web.send_string(json.dumps(self.data_web)) else: self.sendData(self.send, self.data) self.started = True return + + def sendToWeb(self): + + if not self.isWebConfig: + context = zmq.Context() + sender_web_config = context.socket(zmq.PUB) + if 'tcp://' in self.plot_address: + print self.plot_address + dum, address, port = self.plot_address.split(':') + conf_address = '{}:{}:{}'.format(dum, address, int(port)+1) + else: + conf_address = self.plot_address + '.config' + sender_web_config.bind(conf_address) + + for kwargs in self.operationKwargs.values(): + if 'plot' in kwargs: + sender_web_config.send_string(json.dumps(kwargs)) + print kwargs + self.isWebConfig = True + diff --git a/schainpy/scripts/PPD.py b/schainpy/scripts/PPD.py index 76cde49..c4c00a7 100644 --- a/schainpy/scripts/PPD.py +++ b/schainpy/scripts/PPD.py @@ -65,7 +65,7 @@ def fiber(cursor, skip, q, dt): # opObj13.addParameter(name='zeromq', value=1, format='int') # opObj13.addParameter(name='server', value="juanca", format='str') - # opObj12.addParameter(name='delay', value=1, format='int') + opObj12.addParameter(name='delay', value=1, format='int') # print "Escribiendo el archivo XML" @@ -81,6 +81,6 @@ def fiber(cursor, skip, q, dt): if __name__ == '__main__': parser = argparse.ArgumentParser(description='Set number of parallel processes') - parser.add_argument('--nProcess', default=2, type=int) + parser.add_argument('--nProcess', default=1, type=int) args = parser.parse_args() multiSchain(fiber, nProcess=args.nProcess, startDate='2015/09/26', endDate='2015/09/26') diff --git a/schainpy/scripts/receiver.py b/schainpy/scripts/receiver.py index 54d3409..b83a5d1 100644 --- a/schainpy/scripts/receiver.py +++ b/schainpy/scripts/receiver.py @@ -15,21 +15,22 @@ if __name__ == '__main__': controllerObj.setup(id='191', name='test01', description=desc) proc1 = controllerObj.addProcUnit(name='ReceiverData') - proc1.addParameter(name='realtime', value='0', format='bool') - proc1.addParameter(name='plottypes', value='rti,coh,phase', format='str') + proc1.addParameter(name='realtime', value='1', format='bool') + proc1.addParameter(name='plottypes', value='rti', format='str') proc1.addParameter(name='throttle', value='10', format='int') + ## TODO Agregar direccion de server de publicacion a graficos como variable - op1 = proc1.addOperation(name='PlotRTIData', optype='other') - op1.addParameter(name='wintitle', value='Julia 150Km', format='str') - op1.addParameter(name='save', value='/home/nanosat/Pictures', format='str') - - op2 = proc1.addOperation(name='PlotCOHData', optype='other') - op2.addParameter(name='wintitle', value='Julia 150Km', format='str') - op2.addParameter(name='save', value='/home/nanosat/Pictures', format='str') + # op1 = proc1.addOperation(name='PlotRTIData', optype='other') + # op1.addParameter(name='wintitle', value='Julia 150Km', format='str') + # op1.addParameter(name='save', value='/home/nanosat/Pictures', format='str') # - op6 = proc1.addOperation(name='PlotPHASEData', optype='other') - op6.addParameter(name='wintitle', value='Julia 150Km', format='str') - op6.addParameter(name='save', value='/home/nanosat/Pictures', format='str') + # op2 = proc1.addOperation(name='PlotCOHData', optype='other') + # op2.addParameter(name='wintitle', value='Julia 150Km', format='str') + # op2.addParameter(name='save', value='/home/nanosat/Pictures', format='str') + # # + # op6 = proc1.addOperation(name='PlotPHASEData', optype='other') + # op6.addParameter(name='wintitle', value='Julia 150Km', format='str') + # op6.addParameter(name='save', value='/home/nanosat/Pictures', format='str') # # proc2 = controllerObj.addProcUnit(name='ReceiverData') # proc2.addParameter(name='server', value='juanca', format='str') diff --git a/schainpy/scripts/schain.xml b/schainpy/scripts/schain.xml index a3a9eea..89b71e9 100644 --- a/schainpy/scripts/schain.xml +++ b/schainpy/scripts/schain.xml @@ -1 +1 @@ - \ No newline at end of file + \ No newline at end of file