From 7e36d2e90f1d3422fb8cb671ab3d09dded148272 2018-07-06 22:50:32 From: Juan C. Espinoza Date: 2018-07-06 22:50:32 Subject: [PATCH] Merge branch 'v3.0-devel' of http://jro-dev.igp.gob.pe/rhodecode/schain into v3.0-devel --- diff --git a/schainpy/controller.py b/schainpy/controller.py index 93bd843..750fe56 100644 --- a/schainpy/controller.py +++ b/schainpy/controller.py @@ -1,23 +1,36 @@ ''' +Updated on January , 2018, for multiprocessing purposes +Author: Sergio Cortez Created on September , 2012 -@author: ''' - +from platform import python_version import sys import ast import datetime import traceback import math import time +import zmq from multiprocessing import Process, cpu_count from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring from xml.dom import minidom -import schainpy + from schainpy.admin import Alarm, SchainWarning -from schainpy.model import * + +### Temporary imports!!! +# from schainpy.model import * +from schainpy.model.io import * +from schainpy.model.graphics import * +from schainpy.model.proc.jroproc_base import * +from schainpy.model.proc.bltrproc_parameters import * +from schainpy.model.proc.jroproc_spectra import * +from schainpy.model.proc.jroproc_voltage import * +from schainpy.model.proc.jroproc_parameters import * +from schainpy.model.utils.jroutils_publish import * from schainpy.utils import log +### DTYPES = { 'Voltage': '.r', @@ -77,7 +90,6 @@ def MPProject(project, n=cpu_count()): time.sleep(3) - class ParameterConf(): id = None @@ -267,7 +279,6 @@ class ParameterConf(): print('Parameter[%s]: name = %s, value = %s, format = %s' % (self.id, self.name, self.value, self.format)) - class OperationConf(): id = None @@ -284,12 +295,15 @@ class OperationConf(): self.id = '0' self.name = None self.priority = None - self.type = 'self' + self.topic = None def __getNewId(self): return int(self.id) * 10 + len(self.parmConfObjList) + 1 + def getId(self): + return self.id + def updateId(self, new_id): self.id = str(new_id) @@ -361,7 +375,6 @@ class OperationConf(): self.name = name self.type = type self.priority = priority - self.parmConfObjList = [] def removeParameters(self): @@ -443,27 +456,19 @@ class OperationConf(): for parmConfObj in self.parmConfObjList: parmConfObj.printattr() - def createObject(self, plotter_queue=None): - - if self.type == 'self': - raise ValueError('This operation type cannot be created') + def createObject(self): - if self.type == 'plotter': - if not plotter_queue: - raise ValueError('plotter_queue is not defined. Use:\nmyProject = Project()\nmyProject.setPlotterQueue(plotter_queue)') - - opObj = Plotter(self.name, plotter_queue) + className = eval(self.name) + kwargs = self.getKwargs() - if self.type == 'external' or self.type == 'other': + opObj = className(self.id, **kwargs) - className = eval(self.name) - kwargs = self.getKwargs() + opObj.start() - opObj = className(**kwargs) + print(' Operation created') return opObj - class ProcUnitConf(): id = None @@ -484,7 +489,7 @@ class ProcUnitConf(): self.id = None self.datatype = None self.name = None - self.inputId = None + self.inputId = None self.opConfObjList = [] @@ -507,14 +512,14 @@ class ProcUnitConf(): return self.id - def updateId(self, new_id, parentId=parentId): - + def updateId(self, new_id, parentId=parentId): + ''' new_id = int(parentId) * 10 + (int(self.id) % 10) new_inputId = int(parentId) * 10 + (int(self.inputId) % 10) # If this proc unit has not inputs - if self.inputId == '0': - new_inputId = 0 + #if self.inputId == '0': + #new_inputId = 0 n = 1 for opConfObj in self.opConfObjList: @@ -526,8 +531,9 @@ class ProcUnitConf(): self.parentId = str(parentId) self.id = str(new_id) - self.inputId = str(new_inputId) - + #self.inputId = str(new_inputId) + ''' + n = 1 def getInputId(self): return self.inputId @@ -560,11 +566,17 @@ class ProcUnitConf(): return self.procUnitObj def setup(self, id, name, datatype, inputId, parentId=None): - + ''' + id sera el topico a publicar + inputId sera el topico a subscribirse + ''' + # Compatible with old signal chain version if datatype == None and name == None: raise ValueError('datatype or name should be defined') + #Definir una condicion para inputId cuando sea 0 + if name == None: if 'Proc' in datatype: name = datatype @@ -577,12 +589,11 @@ class ProcUnitConf(): self.id = str(id) self.name = name self.datatype = datatype - self.inputId = inputId + self.inputId = inputId self.parentId = parentId - self.opConfObjList = [] - self.addOperation(name='run', optype='self') + self.addOperation(name='run', optype='self') def removeOperations(self): @@ -602,10 +613,16 @@ class ProcUnitConf(): return opObj - def addOperation(self, name, optype='self'): + def addOperation(self, name, optype = 'self'): + ''' + Actualizacion - > proceso comunicacion + En el caso de optype='self', elminar. DEfinir comuncacion IPC -> Topic + definir el tipoc de socket o comunicacion ipc++ + + ''' id = self.__getNewId() - priority = self.__getPriority() + priority = self.__getPriority() # Sin mucho sentido, pero puede usarse opConfObj = OperationConf() opConfObj.setup(id, name=name, priority=priority, type=optype) @@ -668,11 +685,15 @@ class ProcUnitConf(): return kwargs - def createObjects(self, plotter_queue=None): + def createObjects(self, dictUnits): + ''' + Instancia de unidades de procesamiento. + ''' className = eval(self.name) kwargs = self.getKwargs() - procUnitObj = className(**kwargs) + procUnitObj = className(self.id, self.inputId, dictUnits, **kwargs) # necesitan saber su id y su entrada por fines de ipc + for opConfObj in self.opConfObjList: @@ -682,21 +703,25 @@ class ProcUnitConf(): procUnitObj.addOperationKwargs( opConfObj.id, **opConfObj.getKwargs()) continue - - opObj = opConfObj.createObject(plotter_queue) - - self.opObjDict[opConfObj.id] = opObj - - procUnitObj.addOperation(opObj, opConfObj.id) + print("Creating operation process:", opConfObj.name, "for", self.name) + opObj = opConfObj.createObject() + + + #self.opObjDict[opConfObj.id] = opObj.name + + procUnitObj.addOperation(opConfObj.name, opConfObj.id) + + procUnitObj.start() self.procUnitObj = procUnitObj + return procUnitObj def run(self): - - is_ok = False - + + is_ok = True + """ for opConfObj in self.opConfObjList: kwargs = {} @@ -711,9 +736,11 @@ class ProcUnitConf(): opId=opConfObj.id) is_ok = is_ok or sts - + + """ return is_ok - + + def close(self): for opConfObj in self.opConfObjList: @@ -752,11 +779,20 @@ class ReadUnitConf(ProcUnitConf): def getElementName(self): - return self.ELEMENTNAME - + return self.ELEMENTNAME + def setup(self, id, name, datatype, path='', startDate='', endDate='', startTime='', endTime='', parentId=None, server=None, **kwargs): + + ''' + *****el id del proceso sera el Topico + + Adicion de {topic}, si no esta presente -> error + kwargs deben ser trasmitidos en la instanciacion + + ''' + # Compatible with old signal chain version if datatype == None and name == None: raise ValueError('datatype or name should be defined') @@ -814,9 +850,9 @@ class ReadUnitConf(ProcUnitConf): self.opConfObjList = [] - def addRunOperation(self, **kwargs): + def addRunOperation(self, **kwargs): - opObj = self.addOperation(name='run', optype='self') + opObj = self.addOperation(name='run', optype='self') if self.server is None: opObj.addParameter( @@ -892,7 +928,6 @@ class ReadUnitConf(ProcUnitConf): class Project(Process): id = None - # name = None description = None filename = None @@ -900,16 +935,15 @@ class Project(Process): ELEMENTNAME = 'Project' - plotterQueue = None + - def __init__(self, plotter_queue=None): + def __init__(self): Process.__init__(self) - self.id = None + self.id = None self.description = None self.email = None self.alarm = None - self.plotterQueue = plotter_queue self.procUnitConfObjDict = {} def __getNewId(self): @@ -958,13 +992,15 @@ class Project(Process): def setup(self, id, name='', description='', email=None, alarm=[]): - print() + print(' ') print('*' * 60) - print(' Starting SIGNAL CHAIN PROCESSING v%s ' % schainpy.__version__) + print('* Starting SIGNAL CHAIN PROCESSING (Multiprocessing) v%s *' % schainpy.__version__) print('*' * 60) - print() + print("* Python " + python_version() + " *") + print('*' * 19) + print(' ') self.id = str(id) - self.description = description + self.description = description self.email = email self.alarm = alarm @@ -981,6 +1017,14 @@ class Project(Process): def addReadUnit(self, id=None, datatype=None, name=None, **kwargs): + ''' + Actualizacion: + Se agrego un nuevo argumento: topic -relativo a la forma de comunicar los procesos simultaneos + + * El id del proceso sera el topico al que se deben subscribir los procUnits para recibir la informacion(data) + + ''' + if id is None: idReadUnit = self.__getNewId() else: @@ -991,16 +1035,26 @@ class Project(Process): parentId=self.id, **kwargs) self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj - + return readUnitConfObj def addProcUnit(self, inputId='0', datatype=None, name=None): - idProcUnit = self.__getNewId() + ''' + Actualizacion: + Se agrego dos nuevos argumentos: topic_read (lee data de otro procUnit) y topic_write(escribe o envia data a otro procUnit) + Deberia reemplazar a "inputId" + + ** A fin de mantener el inputID, este sera la representaacion del topicoal que deben subscribirse. El ID propio de la intancia + (proceso) sera el topico de la publicacion, todo sera asignado de manera dinamica. + + ''' + + idProcUnit = self.__getNewId() #Topico para subscripcion procUnitConfObj = ProcUnitConf() - procUnitConfObj.setup(idProcUnit, name, datatype, - inputId, parentId=self.id) + procUnitConfObj.setup(idProcUnit, name, datatype, inputId, #topic_read, topic_write, + parentId=self.id) self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj @@ -1156,29 +1210,11 @@ class Project(Process): def createObjects(self): for procUnitConfObj in list(self.procUnitConfObjDict.values()): - procUnitConfObj.createObjects(self.plotterQueue) - - def __connect(self, objIN, thisObj): - - thisObj.setInput(objIN.getOutputObj()) - - def connectObjects(self): - - for thisPUConfObj in list(self.procUnitConfObjDict.values()): + print("Creating process:", procUnitConfObj.name) + procUnitConfObj.createObjects(self.procUnitConfObjDict) + - inputId = thisPUConfObj.getInputId() - - if int(inputId) == 0: - continue - - # Get input object - puConfINObj = self.procUnitConfObjDict[inputId] - puObjIN = puConfINObj.getProcUnitObj() - - # Get current object - thisPUObj = thisPUConfObj.getProcUnitObj() - - self.__connect(puObjIN, thisPUObj) + print('All processes were created') def __handleError(self, procUnitConfObj, modes=None, stdout=True): @@ -1193,7 +1229,7 @@ class Project(Process): err = traceback.format_exception(sys.exc_info()[0], sys.exc_info()[1], sys.exc_info()[2]) - + log.error('{}'.format(err[-1]), procUnitConfObj.name) message = ''.join(err) @@ -1268,75 +1304,30 @@ class Project(Process): self.filename = filename - def setPlotterQueue(self, plotter_queue): - - raise NotImplementedError('Use schainpy.controller_api.ControllerThread instead Project class') - - def getPlotterQueue(self): - - raise NotImplementedError('Use schainpy.controller_api.ControllerThread instead Project class') - - def useExternalPlotter(self): + def setProxyCom(self): + + ctx = zmq.Context() + if not os.path.exists('/tmp/socketTmp'): os.mkdir('/tmp/socketTmp') + xsub = ctx.socket(zmq.XSUB) + xsub.bind('ipc:///tmp/socketTmp/a') + xpub = ctx.socket(zmq.XPUB) + xpub.bind('ipc:///tmp/socketTmp/b') + + print("Controller Ready: Processes and proxy created") + zmq.proxy(xsub, xpub) - raise NotImplementedError('Use schainpy.controller_api.ControllerThread instead Project class') + def run(self): log.success('Starting {}'.format(self.name), tag='') self.start_time = time.time() - self.createObjects() - self.connectObjects() - - keyList = list(self.procUnitConfObjDict.keys()) - keyList.sort() - - err = None - - while(True): - - is_ok = False + self.createObjects() + self.setProxyCom() - for procKey in keyList: - - procUnitConfObj = self.procUnitConfObjDict[procKey] - - try: - sts = procUnitConfObj.run() - is_ok = is_ok or sts - except SchainWarning: - err = self.__handleError(procUnitConfObj, modes=[2, 3], stdout=False) - is_ok = False - break - except KeyboardInterrupt: - is_ok = False - break - except ValueError as e: - time.sleep(0.5) - err = self.__handleError(procUnitConfObj) - is_ok = False - break - except: - time.sleep(0.5) - err = self.__handleError(procUnitConfObj) - is_ok = False - break - - # If every process unit finished so end process - if not(is_ok): - break - - if not self.runController(): - break + # Iniciar todos los procesos .start(), monitoreo de procesos. ELiminar lo de abajo # Closing every process - for procKey in keyList: - procUnitConfObj = self.procUnitConfObjDict[procKey] - procUnitConfObj.close() - - if err is not None: - err.start() - # err.join() - log.success('{} finished (time: {}s)'.format( self.name, time.time()-self.start_time)) \ No newline at end of file diff --git a/schainpy/model/graphics/figure.py b/schainpy/model/graphics/figure.py index 931e126..041d89c 100644 --- a/schainpy/model/graphics/figure.py +++ b/schainpy/model/graphics/figure.py @@ -3,7 +3,8 @@ import numpy import time, datetime from schainpy.model.graphics import mpldriver -from schainpy.model.proc.jroproc_base import Operation +from schainpy.model.proc.jroproc_base import MPDecorator, Operation + def isTimeInHourRange(datatime, xmin, xmax): @@ -62,9 +63,9 @@ class Figure(Operation): created = False parameters = {} - def __init__(self, **kwargs): + def __init__(self):#, **kwargs): - Operation.__init__(self, **kwargs) + Operation.__init__(self)#, **kwargs) def __del__(self): diff --git a/schainpy/model/graphics/jroplot_spectra.py b/schainpy/model/graphics/jroplot_spectra.py index 4eeb92b..24f8702 100644 --- a/schainpy/model/graphics/jroplot_spectra.py +++ b/schainpy/model/graphics/jroplot_spectra.py @@ -9,8 +9,11 @@ import numpy from .figure import Figure, isRealtime, isTimeInHourRange from .plotting_codes import * +from schainpy.model.proc.jroproc_base import MPDecorator +from schainpy.utils import log +@MPDecorator class SpectraPlot(Figure): isConfig = None @@ -20,11 +23,10 @@ class SpectraPlot(Figure): HEIGHTPROF = None PREFIX = 'spc' - def __init__(self, **kwargs): - Figure.__init__(self, **kwargs) + def __init__(self):#, **kwargs): + Figure.__init__(self)#, **kwargs) self.isConfig = False self.__nsubplots = 1 - self.WIDTH = 250 self.HEIGHT = 250 self.WIDTHPROF = 120 @@ -104,6 +106,9 @@ class SpectraPlot(Figure): zmin : None, zmax : None """ + if dataOut.flagNoData: + return dataOut + if realtime: if not(isRealtime(utcdatatime = dataOut.utctime)): print('Skipping this plot function') @@ -219,6 +224,8 @@ class SpectraPlot(Figure): wr_period=wr_period, thisDatetime=thisDatetime) + return dataOut +@MPDecorator class CrossSpectraPlot(Figure): isConfig = None @@ -230,8 +237,8 @@ class CrossSpectraPlot(Figure): HEIGHTPROF = None PREFIX = 'cspc' - def __init__(self, **kwargs): - Figure.__init__(self, **kwargs) + def __init__(self):#, **kwargs): + Figure.__init__(self)#, **kwargs) self.isConfig = False self.__nsubplots = 4 self.counter_imagwr = 0 @@ -301,6 +308,9 @@ class CrossSpectraPlot(Figure): zmax : None """ + if dataOut.flagNoData: + return dataOut + if pairsList == None: pairsIndexList = dataOut.pairsIndexList else: @@ -440,7 +450,9 @@ class CrossSpectraPlot(Figure): wr_period=wr_period, thisDatetime=thisDatetime) + return dataOut +@MPDecorator class RTIPlot(Figure): __isConfig = None @@ -450,9 +462,9 @@ class RTIPlot(Figure): HEIGHTPROF = None PREFIX = 'rti' - def __init__(self, **kwargs): + def __init__(self):#, **kwargs): - Figure.__init__(self, **kwargs) + Figure.__init__(self)#, **kwargs) self.timerange = None self.isConfig = False self.__nsubplots = 1 @@ -540,6 +552,8 @@ class RTIPlot(Figure): zmin : None, zmax : None """ + if dataOut.flagNoData: + return dataOut #colormap = kwargs.get('colormap', 'jet') if HEIGHT is not None: @@ -650,7 +664,9 @@ class RTIPlot(Figure): wr_period=wr_period, thisDatetime=thisDatetime, update_figfile=update_figfile) + return dataOut +@MPDecorator class CoherenceMap(Figure): isConfig = None __nsubplots = None @@ -659,8 +675,8 @@ class CoherenceMap(Figure): HEIGHTPROF = None PREFIX = 'cmap' - def __init__(self, **kwargs): - Figure.__init__(self, **kwargs) + def __init__(self):#, **kwargs): + Figure.__init__(self)#, **kwargs) self.timerange = 2*60*60 self.isConfig = False self.__nsubplots = 1 @@ -723,6 +739,10 @@ class CoherenceMap(Figure): server=None, folder=None, username=None, password=None, ftp_wei=0, exp_code=0, sub_exp_code=0, plot_pos=0): + + if dataOut.flagNoData: + return dataOut + if not isTimeInHourRange(dataOut.datatime, xmin, xmax): return @@ -855,6 +875,9 @@ class CoherenceMap(Figure): thisDatetime=thisDatetime, update_figfile=update_figfile) + return dataOut + +@MPDecorator class PowerProfilePlot(Figure): isConfig = None @@ -864,8 +887,8 @@ class PowerProfilePlot(Figure): HEIGHTPROF = None PREFIX = 'spcprofile' - def __init__(self, **kwargs): - Figure.__init__(self, **kwargs) + def __init__(self):#, **kwargs): + Figure.__init__(self)#, **kwargs) self.isConfig = False self.__nsubplots = 1 @@ -907,6 +930,9 @@ class PowerProfilePlot(Figure): ftp=False, wr_period=1, server=None, folder=None, username=None, password=None): + if dataOut.flagNoData: + return dataOut + if channelList == None: channelIndexList = dataOut.channelIndexList @@ -978,7 +1004,10 @@ class PowerProfilePlot(Figure): ftp=ftp, wr_period=wr_period, thisDatetime=thisDatetime) + + return dataOut +@MPDecorator class SpectraCutPlot(Figure): isConfig = None @@ -988,8 +1017,8 @@ class SpectraCutPlot(Figure): HEIGHTPROF = None PREFIX = 'spc_cut' - def __init__(self, **kwargs): - Figure.__init__(self, **kwargs) + def __init__(self):#, **kwargs): + Figure.__init__(self)#, **kwargs) self.isConfig = False self.__nsubplots = 1 @@ -1032,6 +1061,8 @@ class SpectraCutPlot(Figure): folder=None, username=None, password=None, xaxis="frequency"): + if dataOut.flagNoData: + return dataOut if channelList == None: channelIndexList = dataOut.channelIndexList @@ -1111,6 +1142,9 @@ class SpectraCutPlot(Figure): wr_period=wr_period, thisDatetime=thisDatetime) + return dataOut + +@MPDecorator class Noise(Figure): isConfig = None @@ -1119,8 +1153,8 @@ class Noise(Figure): PREFIX = 'noise' - def __init__(self, **kwargs): - Figure.__init__(self, **kwargs) + def __init__(self):#, **kwargs): + Figure.__init__(self)#, **kwargs) self.timerange = 24*60*60 self.isConfig = False self.__nsubplots = 1 @@ -1209,6 +1243,9 @@ class Noise(Figure): server=None, folder=None, username=None, password=None, ftp_wei=0, exp_code=0, sub_exp_code=0, plot_pos=0): + if dataOut.flagNoData: + return dataOut + if not isTimeInHourRange(dataOut.datatime, xmin, xmax): return @@ -1312,6 +1349,9 @@ class Noise(Figure): if save: self.save_data(self.filename_noise, noisedB, thisDatetime) + return dataOut + +@MPDecorator class BeaconPhase(Figure): __isConfig = None @@ -1319,8 +1359,8 @@ class BeaconPhase(Figure): PREFIX = 'beacon_phase' - def __init__(self, **kwargs): - Figure.__init__(self, **kwargs) + def __init__(self):#, **kwargs): + Figure.__init__(self)#, **kwargs) self.timerange = 24*60*60 self.isConfig = False self.__nsubplots = 1 @@ -1399,6 +1439,9 @@ class BeaconPhase(Figure): server=None, folder=None, username=None, password=None, ftp_wei=0, exp_code=0, sub_exp_code=0, plot_pos=0): + if dataOut.flagNoData: + return dataOut + if not isTimeInHourRange(dataOut.datatime, xmin, xmax): return @@ -1539,4 +1582,6 @@ class BeaconPhase(Figure): ftp=ftp, wr_period=wr_period, thisDatetime=thisDatetime, - update_figfile=update_figfile) \ No newline at end of file + update_figfile=update_figfile) + + return dataOut #Yong \ No newline at end of file diff --git a/schainpy/model/graphics/jroplot_voltage.py b/schainpy/model/graphics/jroplot_voltage.py index e5754e2..0a1218b 100644 --- a/schainpy/model/graphics/jroplot_voltage.py +++ b/schainpy/model/graphics/jroplot_voltage.py @@ -6,15 +6,18 @@ Created on Jul 9, 2014 import os import datetime import numpy - +from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator #YONG +from schainpy.utils import log from .figure import Figure + +@MPDecorator class Scope(Figure): isConfig = None - def __init__(self, **kwargs): - Figure.__init__(self, **kwargs) + def __init__(self):#, **kwargs): #YONG + Figure.__init__(self)#, **kwargs) self.isConfig = False self.WIDTH = 300 self.HEIGHT = 200 @@ -127,6 +130,8 @@ class Scope(Figure): ymin : None, ymax : None, """ + if dataOut.flagNoData: + return dataOut if channelList == None: channelIndexList = dataOut.channelIndexList @@ -222,4 +227,6 @@ class Scope(Figure): save=save, ftp=ftp, wr_period=wr_period, - thisDatetime=thisDatetime) \ No newline at end of file + thisDatetime=thisDatetime) + + return dataOut \ No newline at end of file diff --git a/schainpy/model/io/jroIO_base.py b/schainpy/model/io/jroIO_base.py index 9d8012d..835a7bd 100644 --- a/schainpy/model/io/jroIO_base.py +++ b/schainpy/model/io/jroIO_base.py @@ -20,7 +20,6 @@ try: except: from time import sleep -import schainpy.admin from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader from schainpy.model.data.jroheaderIO import get_dtype_index, get_numpy_dtype, get_procflag_dtype, get_dtype_width from schainpy.utils import log @@ -885,6 +884,7 @@ class JRODataReader(JRODataIO): self.flagIsNewFile = 0 self.fp = None self.flagNoMoreFiles = 1 +# print '[Reading] No more files to read' return fileOk_flag @@ -897,8 +897,8 @@ class JRODataReader(JRODataIO): else: newFile = self.__setNextFileOffline() - if not(newFile): - raise schainpy.admin.SchainWarning('No more files to read') + if not(newFile): + raise(schainpy.admin.SchainWarning('No more files to read')) return 0 if self.verbose: @@ -1052,7 +1052,7 @@ class JRODataReader(JRODataIO): # Skip block out of startTime and endTime while True: if not(self.__setNewBlock()): - raise schainpy + raise(schainpy.admin.SchainWarning('No more files')) return 0 if not(self.readBlock()): @@ -1320,11 +1320,11 @@ class JRODataReader(JRODataIO): if fullpath: break - print('[Reading] Waiting %0.2f sec for an valid file in %s: try %02d ...' % (delay, path, nTries + 1)) - sleep(delay) + print('[Reading] Waiting %0.2f sec for an valid file in %s: try %02d ...' % (self.delay, path, nTries + 1)) + sleep(self.delay) - if not(fullpath): - raise schainpy.admin.SchainWarning('There isn\'t any valid file in {}'.format(path)) + if not(fullpath): + raise(schainpy.admin.SchainWarning('There isn\'t any valid file in {}'.format(path))) return self.year = year diff --git a/schainpy/model/io/jroIO_spectra.py b/schainpy/model/io/jroIO_spectra.py index 14f04c8..b6fc5cf 100644 --- a/schainpy/model/io/jroIO_spectra.py +++ b/schainpy/model/io/jroIO_spectra.py @@ -6,10 +6,11 @@ Created on Jul 2, 2014 import numpy from schainpy.model.io.jroIO_base import LOCALTIME, JRODataReader, JRODataWriter -from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation +from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader from schainpy.model.data.jrodata import Spectra +@MPDecorator class SpectraReader(JRODataReader, ProcessingUnit): """ Esta clase permite leer datos de espectros desde archivos procesados (.pdata). La lectura @@ -69,7 +70,7 @@ class SpectraReader(JRODataReader, ProcessingUnit): rdPairList = [] - def __init__(self, **kwargs): + def __init__(self):#, **kwargs): """ Inicializador de la clase SpectraReader para la lectura de datos de espectros. @@ -88,7 +89,7 @@ class SpectraReader(JRODataReader, ProcessingUnit): """ #Eliminar de la base la herencia - ProcessingUnit.__init__(self, **kwargs) + ProcessingUnit.__init__(self)#, **kwargs) # self.isConfig = False diff --git a/schainpy/model/io/jroIO_voltage.py b/schainpy/model/io/jroIO_voltage.py index 4485896..afdfc5f 100644 --- a/schainpy/model/io/jroIO_voltage.py +++ b/schainpy/model/io/jroIO_voltage.py @@ -7,7 +7,7 @@ Created on Jul 2, 2014 import numpy from .jroIO_base import LOCALTIME, JRODataReader, JRODataWriter -from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation +from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader from schainpy.model.data.jrodata import Voltage import zmq @@ -15,7 +15,7 @@ import tempfile from io import StringIO # from _sha import blocksize - +@MPDecorator class VoltageReader(JRODataReader, ProcessingUnit): """ Esta clase permite leer datos de voltage desde archivos en formato rawdata (.r). La lectura @@ -62,7 +62,7 @@ class VoltageReader(JRODataReader, ProcessingUnit): optchar = "D" dataOut = None - def __init__(self, **kwargs): + def __init__(self):#, **kwargs): """ Inicializador de la clase VoltageReader para la lectura de datos de voltage. @@ -81,7 +81,7 @@ class VoltageReader(JRODataReader, ProcessingUnit): None """ - ProcessingUnit.__init__(self, **kwargs) + ProcessingUnit.__init__(self)#, **kwargs) self.isConfig = False @@ -761,4 +761,5 @@ class VoltageWriter(JRODataWriter, Operation): self.processingHeaderObj.processFlags = self.getProcessFlags() - self.setBasicHeader() \ No newline at end of file + self.setBasicHeader() + \ No newline at end of file diff --git a/schainpy/model/proc/jroproc_base.py b/schainpy/model/proc/jroproc_base.py index f78f4f8..eb9d967 100644 --- a/schainpy/model/proc/jroproc_base.py +++ b/schainpy/model/proc/jroproc_base.py @@ -1,41 +1,39 @@ ''' - -$Author: murco $ -$Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $ +Updated for multiprocessing +Author : Sergio Cortez +Jan 2018 +Abstract: + Base class for processing units and operations. A decorator provides multiprocessing features and interconnect the processes created. + The argument (kwargs) sent from the controller is parsed and filtered via the decorator for each processing unit or operation instantiated. + The decorator handle also the methods inside the processing unit to be called from the main script (not as operations) (OPERATION -> type ='self'). + +Based on: + $Author: murco $ + $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $ ''' +from platform import python_version import inspect -from fuzzywuzzy import process - -def checkKwargs(method, kwargs): - currentKwargs = kwargs - choices = inspect.getargspec(method).args - try: - choices.remove('self') - except Exception as e: - pass +import zmq +import time +import pickle +import os +from multiprocessing import Process - try: - choices.remove('dataOut') - except Exception as e: - pass +from schainpy.utils import log - for kwarg in kwargs: - fuzz = process.extractOne(kwarg, choices) - if fuzz is None: - continue - if fuzz[1] < 100: - raise Exception('\x1b[0;32;40mDid you mean {} instead of {} in {}? \x1b[0m'. - format(fuzz[0], kwarg, method.__self__.__class__.__name__)) class ProcessingUnit(object): """ - Esta es la clase base para el procesamiento de datos. + Update - Jan 2018 - MULTIPROCESSING + All the "call" methods present in the previous base were removed. + The majority of operations are independant processes, thus + the decorator is in charge of communicate the operation processes + with the proccessing unit via IPC. - Contiene el metodo "call" para llamar operaciones. Las operaciones pueden ser: - - Metodos internos (callMethod) - - Objetos del tipo Operation (callObject). Antes de ser llamados, estos objetos - tienen que ser agreagados con el metodo "add". + The constructor does not receive any argument. The remaining methods + are related with the operations to execute. + """ # objeto de datos de entrada (Voltage, Spectra o Correlation) @@ -43,33 +41,25 @@ class ProcessingUnit(object): dataInList = [] # objeto de datos de entrada (Voltage, Spectra o Correlation) + + id = None + inputId = None + dataOut = None + dictProcs = None + operations2RunDict = None isConfig = False - - def __init__(self, *args, **kwargs): + def __init__(self): self.dataIn = None - self.dataInList = [] - self.dataOut = None - self.operations2RunDict = {} - self.operationKwargs = {} - self.isConfig = False - self.args = args - self.kwargs = kwargs - - if not hasattr(self, 'name'): - self.name = self.__class__.__name__ - - checkKwargs(self.run, kwargs) - def getAllowedArgs(self): if hasattr(self, '__attrs__'): return self.__attrs__ @@ -81,27 +71,30 @@ class ProcessingUnit(object): ''' self.operationKwargs[objId] = kwargs - - + def addOperation(self, opObj, objId): """ - Agrega un objeto del tipo "Operation" (opObj) a la lista de objetos "self.objectList" y retorna el - identificador asociado a este objeto. + This method is used in the controller, and update the dictionary containing the operations to execute. The dict + posses the id of the operation process (IPC purposes) - Input: + Agrega un objeto del tipo "Operation" (opObj) a la lista de objetos "self.objectList" y retorna el + identificador asociado a este objeto. - object : objeto de la clase "Operation" + Input: - Return: + object : objeto de la clase "Operation" + + Return: - objId : identificador del objeto, necesario para ejecutar la operacion + objId : identificador del objeto, necesario para comunicar con master(procUnit) """ self.operations2RunDict[objId] = opObj return objId + def getOperationObj(self, objId): if objId not in list(self.operations2RunDict.keys()): @@ -120,241 +113,426 @@ class ProcessingUnit(object): **kwargs : Diccionario de argumentos de la funcion a ejecutar """ - raise NotImplementedError - - def callMethod(self, name, opId): - - """ - Ejecuta el metodo con el nombre "name" y con argumentos **kwargs de la propia clase. - - Input: - name : nombre del metodo a ejecutar - - **kwargs : diccionario con los nombres y valores de la funcion a ejecutar. - - """ + raise NotImplementedError - #Checking the inputs - if name == 'run': - - if not self.checkInputs(): - self.dataOut.flagNoData = True - return False - else: - #Si no es un metodo RUN la entrada es la misma dataOut (interna) - if self.dataOut is not None and self.dataOut.isEmpty(): - return False + def setup(self): - #Getting the pointer to method - methodToCall = getattr(self, name) + raise NotImplementedError - #Executing the self method + def run(self): - if hasattr(self, 'mp'): - if name=='run': - if self.mp is False: - self.mp = True - self.start() - else: - self.operationKwargs[opId]['parent'] = self.kwargs - methodToCall(**self.operationKwargs[opId]) - else: - if name=='run': - methodToCall(**self.kwargs) - else: - methodToCall(**self.operationKwargs[opId]) + raise NotImplementedError - if self.dataOut is None: - return False + def close(self): + #Close every thread, queue or any other object here is it is neccesary. + return + +class Operation(object): - if self.dataOut.isEmpty(): - return False + """ + Update - Jan 2018 - MULTIPROCESSING - return True + Most of the methods remained the same. The decorator parse the arguments and executed the run() method for each process. + The constructor doe snot receive any argument, neither the baseclass. - def callObject(self, objId): - """ - Ejecuta la operacion asociada al identificador del objeto "objId" + Clase base para definir las operaciones adicionales que se pueden agregar a la clase ProcessingUnit + y necesiten acumular informacion previa de los datos a procesar. De preferencia usar un buffer de + acumulacion dentro de esta clase - Input: + Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer) - objId : identificador del objeto a ejecutar + """ + id = None + __buffer = None + dest = None + isConfig = False + readyFlag = None - **kwargs : diccionario con los nombres y valores de la funcion a ejecutar. + def __init__(self): - Return: + self.buffer = None + self.dest = None + self.isConfig = False + self.readyFlag = False - None - """ + if not hasattr(self, 'name'): + self.name = self.__class__.__name__ + + def getAllowedArgs(self): + if hasattr(self, '__attrs__'): + return self.__attrs__ + else: + return inspect.getargspec(self.run).args - if self.dataOut is not None and self.dataOut.isEmpty(): - return False + def setup(self): - externalProcObj = self.operations2RunDict[objId] + self.isConfig = True - if hasattr(externalProcObj, 'mp'): - if externalProcObj.mp is False: - externalProcObj.kwargs['parent'] = self.kwargs - self.operationKwargs[objId] = externalProcObj.kwargs - externalProcObj.mp = True - externalProcObj.start() - else: - externalProcObj.run(self.dataOut, **externalProcObj.kwargs) - self.operationKwargs[objId] = externalProcObj.kwargs + raise NotImplementedError - return True + def run(self, dataIn, **kwargs): - def call(self, opType, opName=None, opId=None): """ - Return True si ejecuta la operacion interna nombrada "opName" o la operacion externa - identificada con el id "opId"; con los argumentos "**kwargs". - - False si la operacion no se ha ejecutado. + Realiza las operaciones necesarias sobre la dataIn.data y actualiza los + atributos del objeto dataIn. Input: - opType : Puede ser "self" o "external" + dataIn : objeto del tipo JROData - Depende del tipo de operacion para llamar a:callMethod or callObject: + Return: - 1. If opType = "self": Llama a un metodo propio de esta clase: + None - name_method = getattr(self, name) - name_method(**kwargs) + Affected: + __buffer : buffer de recepcion de datos. + """ + if not self.isConfig: + self.setup(**kwargs) - 2. If opType = "other" o"external": Llama al metodo "run()" de una instancia de la - clase "Operation" o de un derivado de ella: + raise NotImplementedError - instanceName = self.operationList[opId] - instanceName.run(**kwargs) + def close(self): - opName : Si la operacion es interna (opType = 'self'), entonces el "opName" sera - usada para llamar a un metodo interno de la clase Processing + pass - opId : Si la operacion es externa (opType = 'other' o 'external), entonces el - "opId" sera usada para llamar al metodo "run" de la clase Operation - registrada anteriormente con ese Id - Exception: - Este objeto de tipo Operation debe de haber sido agregado antes con el metodo: - "addOperation" e identificado con el valor "opId" = el id de la operacion. - De lo contrario retornara un error del tipo ValueError +######### Decorator ######### - """ - if opType == 'self': +def MPDecorator(BaseClass): + + """ + "Multiprocessing class decorator" - if not opName: - raise ValueError("opName parameter should be defined") + This function add multiprocessing features to the base class. Also, + it handle the communication beetween processes (readers, procUnits and operations). + Receive the arguments at the moment of instantiation. According to that, discriminates if it + is a procUnit or an operation + """ + + class MPClass(BaseClass, Process): + + "This is the overwritten class" + operations2RunDict = None + socket_l = None + socket_p = None + socketOP = None + socket_router = None + dictProcs = None + typeProc = None + def __init__(self, *args, **kwargs): + super(MPClass, self).__init__() + Process.__init__(self) + + + self.operationKwargs = {} + self.args = args + + + self.operations2RunDict = {} + self.kwargs = kwargs + + # The number of arguments (args) determine the type of process + + if len(self.args) is 3: + self.typeProc = "ProcUnit" + self.id = args[0] #topico de publicacion + self.inputId = args[1] #topico de subcripcion + self.dictProcs = args[2] #diccionario de procesos globales + else: + self.id = args[0] + self.typeProc = "Operation" + + def addOperationKwargs(self, objId, **kwargs): + + self.operationKwargs[objId] = kwargs - sts = self.callMethod(opName, opId) + def getAllowedArgs(self): - elif opType == 'other' or opType == 'external' or opType == 'plotter': + if hasattr(self, '__attrs__'): + return self.__attrs__ + else: + return inspect.getargspec(self.run).args + + + def sockListening(self, topic): + + """ + This function create a socket to receive objects. + The 'topic' argument is related to the publisher process from which the self process is + listening (data). + In the case were the self process is listening to a Reader (proc Unit), + special conditions are introduced to maximize parallelism. + """ + + cont = zmq.Context() + zmq_socket = cont.socket(zmq.SUB) + if not os.path.exists('/tmp/socketTmp'): + os.mkdir('/tmp/socketTmp') + + if 'Reader' in self.dictProcs[self.inputId].name: + zmq_socket.connect('ipc:///tmp/socketTmp/b') + + else: + zmq_socket.connect('ipc:///tmp/socketTmp/%s' % self.inputId) + + #log.error('RECEIVING FROM {} {}'.format(self.inputId, str(topic).encode())) + zmq_socket.setsockopt(zmq.SUBSCRIBE, str(topic).encode()) #yong - if not opId: - raise ValueError("opId parameter should be defined") + return zmq_socket - if opId not in list(self.operations2RunDict.keys()): - raise ValueError("Any operation with id=%s has been added" %str(opId)) - sts = self.callObject(opId) + def listenProc(self, sock): - else: - raise ValueError("opType should be 'self', 'external' or 'plotter'; and not '%s'" %opType) + """ + This function listen to a ipc addres until a message is recovered. To serialize the + data (object), pickle has been use. + The 'sock' argument is the socket previously connect to an ipc address and with a topic subscription. + """ + + a = sock.recv_multipart() + a = pickle.loads(a[1]) + return a - return sts + def sockPublishing(self): - def setInput(self, dataIn): + """ + This function create a socket for publishing purposes. + Depending on the process type from where is created, it binds or connect + to special IPC addresses. + """ + time.sleep(4) #yong + context = zmq.Context() + zmq_socket = context.socket(zmq.PUB) + if not os.path.exists('/tmp/socketTmp'): os.mkdir('/tmp/socketTmp') + if 'Reader' in self.dictProcs[self.id].name: + zmq_socket.connect('ipc:///tmp/socketTmp/a') + else: + zmq_socket.bind('ipc:///tmp/socketTmp/%s' % self.id) - self.dataIn = dataIn - self.dataInList.append(dataIn) + return zmq_socket - def getOutputObj(self): + def publishProc(self, sock, data): - return self.dataOut + """ + This function publish a python object (data) under a specific topic in a socket (sock). + Usually, the topic is the self id of the process. + """ - def checkInputs(self): + sock.send_multipart([str(self.id).encode(), pickle.dumps(data)]) #yong + + return True - for thisDataIn in self.dataInList: + def sockOp(self): - if thisDataIn.isEmpty(): - return False + """ + This function create a socket for communication purposes with operation processes. + """ - return True + cont = zmq.Context() + zmq_socket = cont.socket(zmq.DEALER) + + if python_version()[0] == '2': + zmq_socket.setsockopt(zmq.IDENTITY, self.id) + if python_version()[0] == '3': + zmq_socket.setsockopt_string(zmq.IDENTITY, self.id) - def setup(self): - raise NotImplementedError + return zmq_socket - def run(self): - raise NotImplementedError + def execOp(self, socket, opId, dataObj): - def close(self): - #Close every thread, queue or any other object here is it is neccesary. - return + """ + This function 'execute' an operation main routine by establishing a + connection with it and sending a python object (dataOut). + """ + if not os.path.exists('/tmp/socketTmp'): os.mkdir('/tmp/socketTmp') + socket.connect('ipc:///tmp/socketTmp/%s' %opId) + + + socket.send(pickle.dumps(dataObj)) #yong + + argument = socket.recv_multipart()[0] + + argument = pickle.loads(argument) + + return argument + + def sockIO(self): -class Operation(object): + """ + Socket defined for an operation process. It is able to recover the object sent from another process as well as a + identifier of who sent it. + """ - """ - Clase base para definir las operaciones adicionales que se pueden agregar a la clase ProcessingUnit - y necesiten acumular informacion previa de los datos a procesar. De preferencia usar un buffer de - acumulacion dentro de esta clase + cont = zmq.Context() + if not os.path.exists('/tmp/socketTmp'): os.mkdir('/tmp/socketTmp') + socket = cont.socket(zmq.ROUTER) + socket.bind('ipc:///tmp/socketTmp/%s' % self.id) - Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer) + return socket - """ + def funIOrec(self, socket): - __buffer = None - isConfig = False + """ + Operation method, recover the id of the process who sent a python object. + The 'socket' argument is the socket binded to a specific process ipc. + """ - def __init__(self, **kwargs): + #id_proc = socket.recv() + + #dataObj = socket.recv_pyobj() + + dataObj = socket.recv_multipart() + + dataObj[1] = pickle.loads(dataObj[1]) + return dataObj[0], dataObj[1] + + def funIOsen(self, socket, data, dest): + + """ + Operation method, send a python object to a specific destination. + The 'dest' argument is the id of a proccesinf unit. + """ + + socket.send_multipart([dest, pickle.dumps(data)]) #yong - self.__buffer = None - self.isConfig = False - self.kwargs = kwargs - if not hasattr(self, 'name'): - self.name = self.__class__.__name__ - checkKwargs(self.run, kwargs) + return True - def getAllowedArgs(self): - if hasattr(self, '__attrs__'): - return self.__attrs__ - else: - return inspect.getargspec(self.run).args - def setup(self): + def runReader(self): - self.isConfig = True + # time.sleep(3) + while True: + + BaseClass.run(self, **self.kwargs) - raise NotImplementedError - def run(self, dataIn, **kwargs): + keyList = list(self.operations2RunDict.keys()) + keyList.sort() + + for key in keyList: + self.socketOP = self.sockOp() + self.dataOut = self.execOp(self.socketOP, key, self.dataOut) - """ - Realiza las operaciones necesarias sobre la dataIn.data y actualiza los - atributos del objeto dataIn. + + if self.flagNoMoreFiles: #Usar un objeto con flags para saber si termino el proc o hubo un error + self.publishProc(self.socket_p, "Finish") + break - Input: - - dataIn : objeto del tipo JROData + if self.dataOut.flagNoData: + continue + + #print("Publishing data...") + self.publishProc(self.socket_p, self.dataOut) + # time.sleep(2) + + + print("%s done" %BaseClass.__name__) + return 0 + + def runProc(self): - Return: + # All the procUnits with kwargs that require a setup initialization must be defined here. - None + if self.setupReq: + BaseClass.setup(self, **self.kwargs) - Affected: - __buffer : buffer de recepcion de datos. + while True: + self.dataIn = self.listenProc(self.socket_l) + #print("%s received data" %BaseClass.__name__) + + if self.dataIn == "Finish": + break + + m_arg = list(self.kwargs.keys()) + num_arg = list(range(1,int(BaseClass.run.__code__.co_argcount))) + + run_arg = {} + + for var in num_arg: + if BaseClass.run.__code__.co_varnames[var] in m_arg: + run_arg[BaseClass.run.__code__.co_varnames[var]] = self.kwargs[BaseClass.run.__code__.co_varnames[var]] + + #BaseClass.run(self, **self.kwargs) + BaseClass.run(self, **run_arg) + + ## Iterar sobre una serie de data que podrias aplicarse + + for m_name in BaseClass.METHODS: + + met_arg = {} + + for arg in m_arg: + if arg in BaseClass.METHODS[m_name]: + for att in BaseClass.METHODS[m_name]: + met_arg[att] = self.kwargs[att] + + method = getattr(BaseClass, m_name) + method(self, **met_arg) + break + + if self.dataOut.flagNoData: + continue + + keyList = list(self.operations2RunDict.keys()) + keyList.sort() + + for key in keyList: + + self.socketOP = self.sockOp() + self.dataOut = self.execOp(self.socketOP, key, self.dataOut) + + + self.publishProc(self.socket_p, self.dataOut) + + + print("%s done" %BaseClass.__name__) + + return 0 + + def runOp(self): + + while True: + + [self.dest ,self.buffer] = self.funIOrec(self.socket_router) + + self.buffer = BaseClass.run(self, self.buffer, **self.kwargs) + + self.funIOsen(self.socket_router, self.buffer, self.dest) + + print("%s done" %BaseClass.__name__) + return 0 + + + def run(self): + + if self.typeProc is "ProcUnit": + + self.socket_p = self.sockPublishing() + + if 'Reader' not in self.dictProcs[self.id].name: + self.socket_l = self.sockListening(self.inputId) + self.runProc() + + else: + + self.runReader() + + elif self.typeProc is "Operation": + + self.socket_router = self.sockIO() + + self.runOp() - """ - if not self.isConfig: - self.setup(**kwargs) - - raise NotImplementedError - - def close(self): + else: + raise ValueError("Unknown type") - pass \ No newline at end of file + return 0 + + return MPClass \ No newline at end of file diff --git a/schainpy/model/proc/jroproc_spectra.py b/schainpy/model/proc/jroproc_spectra.py index bbede48..c02f770 100644 --- a/schainpy/model/proc/jroproc_spectra.py +++ b/schainpy/model/proc/jroproc_spectra.py @@ -2,16 +2,24 @@ import itertools import numpy -from .jroproc_base import ProcessingUnit, Operation +from schainpy.model.proc.jroproc_base import ProcessingUnit, MPDecorator, Operation from schainpy.model.data.jrodata import Spectra from schainpy.model.data.jrodata import hildebrand_sekhon -from schainpy.utils import log #yong +from schainpy.utils import log +@MPDecorator class SpectraProc(ProcessingUnit): - def __init__(self, **kwargs): + METHODS = {'selectHeights' : ['minHei', 'maxHei'], + 'selectChannels' : 'channelList', + 'selectChannelsByIndex': 'channelIndexList', + 'getBeaconSignal' : ['tauindex', 'channelindex', 'hei_ref'], + 'selectHeightsByIndex' : ['minIndex', 'maxIndex'] + } - ProcessingUnit.__init__(self, **kwargs) + def __init__(self):#, **kwargs): + + ProcessingUnit.__init__(self)#, **kwargs) self.buffer = None self.firstdatatime = None @@ -19,6 +27,7 @@ class SpectraProc(ProcessingUnit): self.dataOut = Spectra() self.id_min = None self.id_max = None + self.setupReq = False #Agregar a todas las unidades de proc def __updateSpecFromVoltage(self): @@ -134,7 +143,7 @@ class SpectraProc(ProcessingUnit): if self.dataOut.data_cspc is not None: #desplaza a la derecha en el eje 2 determinadas posiciones self.dataOut.data_cspc = numpy.roll(self.dataOut.data_cspc, shift, axis=1) - + return True if self.dataIn.type == "Voltage": @@ -774,7 +783,7 @@ class SpectraProc(ProcessingUnit): return 1 - +@MPDecorator class IncohInt(Operation): __profIndex = 0 @@ -795,9 +804,11 @@ class IncohInt(Operation): n = None - def __init__(self, **kwargs): + def __init__(self):#, **kwargs): + + Operation.__init__(self)#, **kwargs) + - Operation.__init__(self, **kwargs) # self.isConfig = False def setup(self, n=None, timeInterval=None, overlapping=False): @@ -930,7 +941,7 @@ class IncohInt(Operation): def run(self, dataOut, n=None, timeInterval=None, overlapping=False): if n == 1: return - + dataOut.flagNoData = True if not self.isConfig: @@ -950,4 +961,6 @@ class IncohInt(Operation): dataOut.nIncohInt *= self.n dataOut.utctime = avgdatatime - dataOut.flagNoData = False \ No newline at end of file + dataOut.flagNoData = False + + return dataOut \ No newline at end of file diff --git a/schainpy/model/proc/jroproc_voltage.py b/schainpy/model/proc/jroproc_voltage.py index ef624c8..dc3cfff 100644 --- a/schainpy/model/proc/jroproc_voltage.py +++ b/schainpy/model/proc/jroproc_voltage.py @@ -3,24 +3,28 @@ import numpy from scipy import interpolate #TODO #from schainpy import cSchain -from .jroproc_base import ProcessingUnit, Operation +from schainpy.model.proc.jroproc_base import ProcessingUnit, MPDecorator, Operation from schainpy.model.data.jrodata import Voltage -from time import time from schainpy.utils import log +from time import time +@MPDecorator class VoltageProc(ProcessingUnit): + + METHODS = {} #yong + def __init__(self):#, **kwargs): #yong - def __init__(self, **kwargs): - - ProcessingUnit.__init__(self, **kwargs) + ProcessingUnit.__init__(self)#, **kwargs) # self.objectDict = {} self.dataOut = Voltage() self.flip = 1 + self.setupReq = False #yong def run(self): + if self.dataIn.type == 'AMISR': self.__updateObjFromAmisrInput() @@ -317,7 +321,7 @@ class VoltageProc(ProcessingUnit): self.dataOut.data[:,:,botLim:topLim+1] = ynew # import collections - +@MPDecorator class CohInt(Operation): isConfig = False @@ -333,9 +337,9 @@ class CohInt(Operation): __dataToPutStride = False n = None - def __init__(self, **kwargs): + def __init__(self):#, **kwargs): - Operation.__init__(self, **kwargs) + Operation.__init__(self)#, **kwargs) # self.isConfig = False @@ -549,6 +553,7 @@ class CohInt(Operation): return avgdata, avgdatatime def run(self, dataOut, n=None, timeInterval=None, stride=None, overlapping=False, byblock=False, **kwargs): + if not self.isConfig: self.setup(n=n, stride=stride, timeInterval=timeInterval, overlapping=overlapping, byblock=byblock, **kwargs) self.isConfig = True @@ -577,7 +582,8 @@ class CohInt(Operation): # raise # dataOut.timeInterval = dataOut.ippSeconds * dataOut.nCohInt dataOut.flagNoData = False - + return dataOut +@MPDecorator class Decoder(Operation): isConfig = False @@ -588,15 +594,15 @@ class Decoder(Operation): nCode = None nBaud = None - def __init__(self, **kwargs): + def __init__(self):#, **kwargs): - Operation.__init__(self, **kwargs) + Operation.__init__(self)#, **kwargs) self.times = None self.osamp = None # self.__setValues = False - self.isConfig = False - + # self.isConfig = False + self.setupReq = False def setup(self, code, osamp, dataOut): self.__profIndex = 0 @@ -763,22 +769,22 @@ class Decoder(Operation): if self.__profIndex == self.nCode-1: self.__profIndex = 0 - return 1 + return dataOut self.__profIndex += 1 - return 1 + return dataOut # dataOut.flagDeflipData = True #asumo q la data no esta sin flip - +@MPDecorator class ProfileConcat(Operation): isConfig = False buffer = None - def __init__(self, **kwargs): + def __init__(self):#, **kwargs): - Operation.__init__(self, **kwargs) + Operation.__init__(self)#, **kwargs) self.profileIndex = 0 def reset(self): @@ -820,16 +826,17 @@ class ProfileConcat(Operation): xf = dataOut.heightList[0] + dataOut.nHeights * deltaHeight * m dataOut.heightList = numpy.arange(dataOut.heightList[0], xf, deltaHeight) dataOut.ippSeconds *= m - + return dataOut +@MPDecorator class ProfileSelector(Operation): profileIndex = None # Tamanho total de los perfiles nProfiles = None - def __init__(self, **kwargs): + def __init__(self):#, **kwargs): - Operation.__init__(self, **kwargs) + Operation.__init__(self)#, **kwargs) self.profileIndex = 0 def incProfileIndex(self): @@ -979,13 +986,14 @@ class ProfileSelector(Operation): raise ValueError("ProfileSelector needs profileList, profileRangeList or rangeList parameter") - return False - + #return False + return dataOut +@MPDecorator class Reshaper(Operation): - def __init__(self, **kwargs): + def __init__(self):#, **kwargs): - Operation.__init__(self, **kwargs) + Operation.__init__(self)#, **kwargs) self.__buffer = None self.__nitems = 0 @@ -1084,11 +1092,13 @@ class Reshaper(Operation): dataOut.ippSeconds /= self.__nTxs + return dataOut +@MPDecorator class SplitProfiles(Operation): - def __init__(self, **kwargs): + def __init__(self):#, **kwargs): - Operation.__init__(self, **kwargs) + Operation.__init__(self)#, **kwargs) def run(self, dataOut, n): @@ -1102,8 +1112,9 @@ class SplitProfiles(Operation): if shape[2] % n != 0: raise ValueError("Could not split the data, n=%d has to be multiple of %d" %(n, shape[2])) - + new_shape = shape[0], shape[1]*n, int(shape[2]/n) + dataOut.data = numpy.reshape(dataOut.data, new_shape) dataOut.flagNoData = False @@ -1123,10 +1134,12 @@ class SplitProfiles(Operation): dataOut.ippSeconds /= n + return dataOut +@MPDecorator class CombineProfiles(Operation): - def __init__(self, **kwargs): + def __init__(self):#, **kwargs): - Operation.__init__(self, **kwargs) + Operation.__init__(self)#, **kwargs) self.__remData = None self.__profileIndex = 0 @@ -1184,6 +1197,7 @@ class CombineProfiles(Operation): dataOut.ippSeconds *= n + return dataOut # import collections # from scipy.stats import mode # @@ -1318,4 +1332,4 @@ class CombineProfiles(Operation): # # self.__startIndex += self.__newNSamples # -# return \ No newline at end of file +# return