From f1f11d1fc58680079c9aea2999afe5017b117889 2018-08-01 20:54:38 From: George Yong Date: 2018-08-01 20:54:38 Subject: [PATCH] Multiprocessing for heispectra (Fits) all working --- diff --git a/schainpy/controller.py b/schainpy/controller.py index 42ad19d..b3587bd 100644 --- a/schainpy/controller.py +++ b/schainpy/controller.py @@ -18,19 +18,9 @@ from xml.dom import minidom from schainpy.admin import Alarm, SchainWarning - -### Temporary imports!!! -# from schainpy.model import * -from schainpy.model.io import * -from schainpy.model.graphics import * -from schainpy.model.proc.jroproc_base import * -from schainpy.model.proc.bltrproc_parameters import * -from schainpy.model.proc.jroproc_spectra import * -from schainpy.model.proc.jroproc_voltage import * -from schainpy.model.proc.jroproc_parameters import * -from schainpy.model.utils.jroutils_publish import * +from schainpy.model import * from schainpy.utils import log -### + DTYPES = { 'Voltage': '.r', @@ -683,6 +673,7 @@ class ProcUnitConf(): ''' Instancia de unidades de procesamiento. ''' + className = eval(self.name) kwargs = self.getKwargs() procUnitObj = className(self.id, self.inputId, self.project_id, **kwargs) # necesitan saber su id y su entrada por fines de ipc diff --git a/schainpy/model/data/jrodata.py b/schainpy/model/data/jrodata.py index bebece8..2fc4394 100644 --- a/schainpy/model/data/jrodata.py +++ b/schainpy/model/data/jrodata.py @@ -368,7 +368,7 @@ class Voltage(JROData): # self.nChannels = 0 # self.nHeights = 0 self.nProfiles = None - self.heightList = Non + self.heightList = None self.channelList = None # self.channelIndexList = None self.flagNoData = True @@ -848,6 +848,12 @@ class Fits(JROData): return timeInterval + def get_ippSeconds(self): + ''' + ''' + return self.ipp_sec + + datatime = property(getDatatime, "I'm the 'datatime' property") nHeights = property(getNHeights, "I'm the 'nHeights' property.") nChannels = property(getNChannels, "I'm the 'nChannel' property.") @@ -857,7 +863,7 @@ class Fits(JROData): ltctime = property(getltctime, "I'm the 'ltctime' property") timeInterval = property(getTimeInterval, "I'm the 'timeInterval' property") - + ippSeconds = property(get_ippSeconds, '') class Correlation(JROData): diff --git a/schainpy/model/graphics/jroplot_heispectra.py b/schainpy/model/graphics/jroplot_heispectra.py index 2bca309..e69070c 100644 --- a/schainpy/model/graphics/jroplot_heispectra.py +++ b/schainpy/model/graphics/jroplot_heispectra.py @@ -9,7 +9,10 @@ import numpy from .figure import Figure, isRealtime from .plotting_codes import * +from schainpy.model.proc.jroproc_base import MPDecorator + +@MPDecorator class SpectraHeisScope_(Figure): @@ -20,9 +23,9 @@ class SpectraHeisScope_(Figure): HEIGHTPROF = None PREFIX = 'spc' - def __init__(self, **kwargs): + def __init__(self):#, **kwargs): - Figure.__init__(self, **kwargs) + Figure.__init__(self)#, **kwargs) self.isConfig = False self.__nsubplots = 1 @@ -96,6 +99,9 @@ class SpectraHeisScope_(Figure): ymax : None, """ + if dataOut.flagNoData: + return dataOut + if dataOut.realtime: if not(isRealtime(utcdatatime = dataOut.utctime)): print('Skipping this plot function') @@ -173,6 +179,9 @@ class SpectraHeisScope_(Figure): wr_period=wr_period, thisDatetime=thisDatetime) + return dataOut + +@MPDecorator class RTIfromSpectraHeis_(Figure): isConfig = None @@ -180,8 +189,8 @@ class RTIfromSpectraHeis_(Figure): PREFIX = 'rtinoise' - def __init__(self, **kwargs): - Figure.__init__(self, **kwargs) + def __init__(self):#, **kwargs): + Figure.__init__(self)#, **kwargs) self.timerange = 24*60*60 self.isConfig = False self.__nsubplots = 1 @@ -231,6 +240,10 @@ class RTIfromSpectraHeis_(Figure): server=None, folder=None, username=None, password=None, ftp_wei=0, exp_code=0, sub_exp_code=0, plot_pos=0): + if dataOut.flagNoData: + return dataOut + + if channelList == None: channelIndexList = dataOut.channelIndexList channelList = dataOut.channelList @@ -326,4 +339,7 @@ class RTIfromSpectraHeis_(Figure): ftp=ftp, wr_period=wr_period, thisDatetime=thisDatetime, - update_figfile=update_figfile) \ No newline at end of file + update_figfile=update_figfile) + + + return dataOut \ No newline at end of file diff --git a/schainpy/model/io/jroIO_heispectra.py b/schainpy/model/io/jroIO_heispectra.py index bbf8748..69b81a3 100644 --- a/schainpy/model/io/jroIO_heispectra.py +++ b/schainpy/model/io/jroIO_heispectra.py @@ -20,7 +20,9 @@ from xml.etree.ElementTree import ElementTree from .jroIO_base import isRadarFolder, isNumber from schainpy.model.data.jrodata import Fits -from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit +from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit, MPDecorator +from schainpy.utils import log + class PyFits(object): name=None @@ -281,7 +283,7 @@ class FitsWriter(Operation): self.isConfig = True self.putData() - +@MPDecorator class FitsReader(ProcessingUnit): # __TIMEZONE = time.timezone @@ -298,8 +300,8 @@ class FitsReader(ProcessingUnit): data = None data_header_dict = None - def __init__(self, **kwargs): - ProcessingUnit.__init__(self, **kwargs) + def __init__(self):#, **kwargs): + ProcessingUnit.__init__(self)#, **kwargs) self.isConfig = False self.ext = '.fits' self.setFile = 0 @@ -391,8 +393,7 @@ class FitsReader(ProcessingUnit): self.dataOut.nCohInt = self.nCohInt self.dataOut.nIncohInt = self.nIncohInt - - self.dataOut.ippSeconds = self.ippSeconds + self.dataOut.ipp_sec = self.ippSeconds def readHeader(self): headerObj = self.fitsObj[0] @@ -691,18 +692,17 @@ class FitsReader(ProcessingUnit): if self.flagNoMoreFiles: self.dataOut.flagNoData = True - print('Process finished') - return 0 + return (0, 'No more files') self.flagDiscontinuousBlock = 0 self.flagIsNewBlock = 0 if not(self.readNextBlock()): - return 0 + return (1, 'Error reading data') if self.data is None: self.dataOut.flagNoData = True - return 0 + return (0, 'No more data') self.dataOut.data = self.data self.dataOut.data_header = self.data_header_dict @@ -718,8 +718,7 @@ class FitsReader(ProcessingUnit): # self.dataOut.channelList = self.channelList # self.dataOut.heightList = self.heightList self.dataOut.flagNoData = False - - return self.dataOut.data + # return self.dataOut.data def run(self, **kwargs): @@ -729,6 +728,7 @@ class FitsReader(ProcessingUnit): self.getData() +@MPDecorator class SpectraHeisWriter(Operation): # set = None setFile = None @@ -736,8 +736,8 @@ class SpectraHeisWriter(Operation): doypath = None subfolder = None - def __init__(self, **kwargs): - Operation.__init__(self, **kwargs) + def __init__(self):#, **kwargs): + Operation.__init__(self)#, **kwargs) self.wrObj = PyFits() # self.dataOut = dataOut self.nTotalBlocks=0 @@ -845,4 +845,5 @@ class SpectraHeisWriter(Operation): self.setup(dataOut, **kwargs) self.isConfig = True - self.putData() \ No newline at end of file + self.putData() + return dataOut \ No newline at end of file diff --git a/schainpy/model/proc/jroproc_base.py b/schainpy/model/proc/jroproc_base.py index 5dda991..98e9c5a 100644 --- a/schainpy/model/proc/jroproc_base.py +++ b/schainpy/model/proc/jroproc_base.py @@ -214,14 +214,14 @@ def MPDecorator(BaseClass): self.receiver.connect( 'ipc:///tmp/schain/{}_pub'.format(self.project_id)) self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode()) - + def listen(self): ''' This function waits for objects and deserialize using pickle ''' - + data = pickle.loads(self.receiver.recv_multipart()[1]) - + return data def set_publisher(self): @@ -278,13 +278,16 @@ def MPDecorator(BaseClass): ''' while True: - self.dataIn = self.listen() + self.dataIn = self.listen() if self.dataIn.flagNoData and self.dataIn.error is None: continue - + BaseClass.run(self, **self.kwargs) + if self.dataOut.flagNoData: + continue + for op, optype, opId, kwargs in self.operations: if optype == 'self': op(**kwargs) @@ -304,7 +307,7 @@ def MPDecorator(BaseClass): Run function for external operations (this operations just receive data ex: plots, writers, publishers) ''' - + while True: dataOut = self.listen() @@ -316,11 +319,12 @@ def MPDecorator(BaseClass): time.sleep(1) def run(self): - if self.typeProc is "ProcUnit": if self.inputId is not None: + self.subscribe() + self.set_publisher() if 'Reader' not in BaseClass.__name__: diff --git a/schainpy/model/proc/jroproc_heispectra.py b/schainpy/model/proc/jroproc_heispectra.py index 23f6989..1e3a11e 100644 --- a/schainpy/model/proc/jroproc_heispectra.py +++ b/schainpy/model/proc/jroproc_heispectra.py @@ -1,13 +1,16 @@ import numpy -from .jroproc_base import ProcessingUnit, Operation +from .jroproc_base import ProcessingUnit, Operation, MPDecorator from schainpy.model.data.jrodata import SpectraHeis +from schainpy.utils import log + +@MPDecorator class SpectraHeisProc(ProcessingUnit): - def __init__(self, **kwargs): + def __init__(self):#, **kwargs): - ProcessingUnit.__init__(self, **kwargs) + ProcessingUnit.__init__(self)#, **kwargs) # self.buffer = None # self.firstdatatime = None @@ -86,7 +89,7 @@ class SpectraHeisProc(ProcessingUnit): if self.dataIn.type == "Fits": self.__updateObjFromFits() self.dataOut.flagNoData = False - return + return if self.dataIn.type == "SpectraHeis": self.dataOut.copy(self.dataIn) @@ -145,6 +148,7 @@ class SpectraHeisProc(ProcessingUnit): return 1 + class IncohInt4SpectraHeis(Operation): isConfig = False @@ -163,9 +167,9 @@ class IncohInt4SpectraHeis(Operation): n = None - def __init__(self, **kwargs): + def __init__(self):#, **kwargs): - Operation.__init__(self, **kwargs) + Operation.__init__(self)#, **kwargs) # self.isConfig = False def setup(self, n=None, timeInterval=None, overlapping=False): @@ -341,4 +345,6 @@ class IncohInt4SpectraHeis(Operation): dataOut.utctime = avgdatatime # dataOut.timeInterval = dataOut.ippSeconds * dataOut.nIncohInt # dataOut.timeInterval = self.__timeInterval*self.n - dataOut.flagNoData = False \ No newline at end of file + dataOut.flagNoData = False + + return dataOut \ No newline at end of file