From 6b25d3b7964610e6224697407147deb5fd717b12 2019-09-30 16:43:29 From: jespinoza Date: 2019-09-30 16:43:29 Subject: [PATCH] Update ParamReader to support diferent HDF5 files, fix Read/Write Madrigal files --- diff --git a/schainpy/model/io/jroIO_kamisr.py b/schainpy/model/io/jroIO_kamisr.py index 61a662f..da7adb1 100644 --- a/schainpy/model/io/jroIO_kamisr.py +++ b/schainpy/model/io/jroIO_kamisr.py @@ -627,48 +627,3 @@ class AMISRReader(ProcessingUnit): self.isConfig = True self.getData() - -class Writer(Operation): - ''' - classdocs - ''' - - def __init__(self): - ''' - Constructor - ''' - self.dataOut = None - - self.isConfig = False - - def setup(self, dataIn, path, blocksPerFile, set=0, ext=None): - ''' - In this method we should set all initial parameters. - - Input: - dataIn : Input data will also be outputa data - - ''' - self.dataOut = dataIn - - - - - - self.isConfig = True - - return - - def run(self, dataIn, **kwargs): - ''' - This method will be called many times so here you should put all your code - - Inputs: - - dataIn : object with the data - - ''' - - if not self.isConfig: - self.setup(dataIn, **kwargs) - \ No newline at end of file diff --git a/schainpy/model/io/jroIO_madrigal.py b/schainpy/model/io/jroIO_madrigal.py index 3a3cd25..d99145c 100644 --- a/schainpy/model/io/jroIO_madrigal.py +++ b/schainpy/model/io/jroIO_madrigal.py @@ -15,7 +15,7 @@ import numpy import h5py import schainpy.admin -from schainpy.model.io.jroIO_base import LOCALTIME, JRODataReader, JRODataWriter +from schainpy.model.io.jroIO_base import LOCALTIME, Reader from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator from schainpy.model.data.jrodata import Parameters from schainpy.utils import log @@ -81,7 +81,7 @@ def load_json(obj): return iterable @MPDecorator -class MADReader(JRODataReader, ProcessingUnit): +class MADReader(Reader, ProcessingUnit): def __init__(self): @@ -91,93 +91,78 @@ class MADReader(JRODataReader, ProcessingUnit): self.counter_records = 0 self.nrecords = None self.flagNoMoreFiles = 0 - self.isConfig = False self.filename = None self.intervals = set() + self.datatime = datetime.datetime(1900,1,1) + self.format = None + self.filefmt = "***%Y%m%d*******" - def setup(self, - path=None, - startDate=None, - endDate=None, - format=None, - startTime=datetime.time(0, 0, 0), - endTime=datetime.time(23, 59, 59), - **kwargs): + def setup(self, **kwargs): - self.path = path - self.startDate = startDate - self.endDate = endDate - self.startTime = startTime - self.endTime = endTime - self.datatime = datetime.datetime(1900,1,1) - self.oneDDict = load_json(kwargs.get('oneDDict', - "{\"GDLATR\":\"lat\", \"GDLONR\":\"lon\"}")) - self.twoDDict = load_json(kwargs.get('twoDDict', - "{\"GDALT\": \"heightList\"}")) - self.independentParam = 'GDALT' + self.set_kwargs(**kwargs) + self.oneDDict = load_json(self.oneDDict) + self.twoDDict = load_json(self.twoDDict) + self.ind2DList = load_json(self.ind2DList) + self.independentParam = self.ind2DList[0] if self.path is None: raise ValueError('The path is not valid') - if format is None: + self.open_file = open + self.open_mode = 'rb' + + if self.format is None: raise ValueError('The format is not valid choose simple or hdf5') - elif format.lower() in ('simple', 'txt'): + elif self.format.lower() in ('simple', 'txt'): self.ext = '.txt' - elif format.lower() in ('cedar',): + elif self.format.lower() in ('cedar',): self.ext = '.001' else: self.ext = '.hdf5' + self.open_file = h5py.File + self.open_mode = 'r' - self.search_files(self.path) - self.fileId = 0 - - if not self.fileList: - raise Warning('There is no files matching these date in the folder: {}. \n Check startDate and endDate'.format(path)) - - self.setNextFile() - - def search_files(self, path): - ''' - Searching for madrigal files in path - Creating a list of files to procces included in [startDate,endDate] - - Input: - path - Path to find files - ''' - - log.log('Searching files {} in {} '.format(self.ext, path), 'MADReader') - fileList0 = glob.glob1(path, '*{}'.format(self.ext)) - fileList0.sort() - - self.fileList = [] - self.dateFileList = [] - - startDate = self.startDate - datetime.timedelta(1) - endDate = self.endDate + datetime.timedelta(1) - - for thisFile in fileList0: - year = thisFile[3:7] - if not year.isdigit(): - continue - - month = thisFile[7:9] - if not month.isdigit(): - continue + if self.online: + log.log("Searching files in online mode...", self.name) - day = thisFile[9:11] - if not day.isdigit(): - continue + for nTries in range(self.nTries): + fullpath = self.searchFilesOnLine(self.path, self.startDate, + self.endDate, self.expLabel, self.ext, self.walk, + self.filefmt, self.folderfmt) - year, month, day = int(year), int(month), int(day) - dateFile = datetime.date(year, month, day) + try: + fullpath = next(fullpath) + except: + fullpath = None + + if fullpath: + break - if (startDate > dateFile) or (endDate < dateFile): - continue + log.warning( + 'Waiting {} sec for a valid file in {}: try {} ...'.format( + self.delay, self.path, nTries + 1), + self.name) + time.sleep(self.delay) + + if not(fullpath): + raise schainpy.admin.SchainError( + 'There isn\'t any valid file in {}'.format(self.path)) + + else: + log.log("Searching files in {}".format(self.path), self.name) + self.filenameList = self.searchFilesOffLine(self.path, self.startDate, + self.endDate, self.expLabel, self.ext, self.walk, self.filefmt, self.folderfmt) + + self.setNextFile() - self.fileList.append(thisFile) - self.dateFileList.append(dateFile) + def readFirstHeader(self): + '''Read header and data''' - return + self.parseHeader() + self.parseData() + self.blockIndex = 0 + + return def parseHeader(self): ''' @@ -237,51 +222,13 @@ class MADReader(JRODataReader, ProcessingUnit): self.times = numpy.unique(self.data['Table Layout']['ut1_unix']) self.counter_records = int(self.data['Table Layout']['recno'][0]) self.nrecords = int(self.data['Table Layout']['recno'][-1]) - - def setNextFile(self): - ''' - ''' - - file_id = self.fileId - - if file_id == len(self.fileList): - log.success('No more files', 'MADReader') - self.flagNoMoreFiles = 1 - return 0 - - log.success( - 'Opening: {}'.format(self.fileList[file_id]), - 'MADReader' - ) - - filename = os.path.join(self.path, self.fileList[file_id]) - - if self.filename is not None: - self.fp.close() - - self.filename = filename - self.filedate = self.dateFileList[file_id] - - if self.ext=='.hdf5': - self.fp = h5py.File(self.filename, 'r') - else: - self.fp = open(self.filename, 'rb') - - self.parseHeader() - self.parseData() - self.sizeOfFile = os.path.getsize(self.filename) - self.flagIsNewFile = 0 - self.fileId += 1 - - return 1 def readNextBlock(self): while True: self.flagDiscontinuousBlock = 0 - if self.flagIsNewFile: - if not self.setNextFile(): - return 0 + if self.counter_records == self.nrecords: + self.setNextFile() self.readBlock() @@ -321,7 +268,6 @@ class MADReader(JRODataReader, ProcessingUnit): dum.append(self.data[self.counter_records]) self.counter_records += 1 if self.counter_records == self.nrecords: - self.flagIsNewFile = True break continue self.intervals.add((datatime-self.datatime).seconds) @@ -334,9 +280,7 @@ class MADReader(JRODataReader, ProcessingUnit): if datatime.date()>self.datatime.date(): self.flagDiscontinuousBlock = 1 self.datatime = datatime - self.counter_records += 1 - if self.counter_records == self.nrecords: - self.flagIsNewFile = True + self.counter_records += 1 self.buffer = numpy.array(dum) return @@ -390,10 +334,6 @@ class MADReader(JRODataReader, ProcessingUnit): ''' Storing data from databuffer to dataOut object ''' - if self.flagNoMoreFiles: - self.dataOut.flagNoData = True - raise schainpy.admin.SchainError('No file left to process') - return 0 if not self.readNextBlock(): self.dataOut.flagNoData = True @@ -403,10 +343,49 @@ class MADReader(JRODataReader, ProcessingUnit): return 1 + def run(self, **kwargs): + + if not(self.isConfig): + self.setup(**kwargs) + self.isConfig = True + + self.getData() + + return + @MPDecorator class MADWriter(Operation): - - missing = -32767 + '''Writing module for Madrigal files + +type: external + +Inputs: + path path where files will be created + oneDDict json of one-dimensional parameters in record where keys + are Madrigal codes (integers or mnemonics) and values the corresponding + dataOut attribute e.g: { + 'gdlatr': 'lat', + 'gdlonr': 'lon', + 'gdlat2':'lat', + 'glon2':'lon'} + ind2DList list of independent spatial two-dimensional parameters e.g: + ['heigthList'] + twoDDict json of two-dimensional parameters in record where keys + are Madrigal codes (integers or mnemonics) and values the corresponding + dataOut attribute if multidimensional array specify as tupple + ('attr', pos) e.g: { + 'gdalt': 'heightList', + 'vn1p2': ('data_output', 0), + 'vn2p2': ('data_output', 1), + 'vn3': ('data_output', 2), + 'snl': ('data_SNR', 'db') + } + metadata json of madrigal metadata (kinst, kindat, catalog and header) + format hdf5, cedar + blocks number of blocks per file''' + + __attrs__ = ['path', 'oneDDict', 'ind2DList', 'twoDDict','metadata', 'format', 'blocks'] + missing = -32767 def __init__(self): @@ -416,41 +395,18 @@ class MADWriter(Operation): self.path = None self.fp = None - def run(self, dataOut, path, oneDDict, independentParam='[]', twoDDict='{}', + def run(self, dataOut, path, oneDDict, ind2DList='[]', twoDDict='{}', metadata='{}', format='cedar', **kwargs): - ''' - Inputs: - path - path where files will be created - oneDDict - json of one-dimensional parameters in record where keys - are Madrigal codes (integers or mnemonics) and values the corresponding - dataOut attribute e.g: { - 'gdlatr': 'lat', - 'gdlonr': 'lon', - 'gdlat2':'lat', - 'glon2':'lon'} - independentParam - list of independent spatial two-dimensional parameters e.g: - ['heigthList'] - twoDDict - json of two-dimensional parameters in record where keys - are Madrigal codes (integers or mnemonics) and values the corresponding - dataOut attribute if multidimensional array specify as tupple - ('attr', pos) e.g: { - 'gdalt': 'heightList', - 'vn1p2': ('data_output', 0), - 'vn2p2': ('data_output', 1), - 'vn3': ('data_output', 2), - 'snl': ('data_SNR', 'db') - } - metadata - json of madrigal metadata (kinst, kindat, catalog and header) - ''' + if not self.isConfig: - self.setup(path, oneDDict, independentParam, twoDDict, metadata, format, **kwargs) + self.setup(path, oneDDict, ind2DList, twoDDict, metadata, format, **kwargs) self.isConfig = True self.dataOut = dataOut self.putData() return 1 - def setup(self, path, oneDDict, independentParam, twoDDict, metadata, format, **kwargs): + def setup(self, path, oneDDict, ind2DList, twoDDict, metadata, format, **kwargs): ''' Configure Operation ''' @@ -460,7 +416,7 @@ class MADWriter(Operation): self.counter = 0 self.oneDDict = load_json(oneDDict) self.twoDDict = load_json(twoDDict) - self.independentParam = load_json(independentParam) + self.ind2DList = load_json(ind2DList) meta = load_json(metadata) self.kinst = meta.get('kinst') self.kindat = meta.get('kindat') @@ -471,7 +427,7 @@ class MADWriter(Operation): self.extra_args = {} elif format == 'hdf5': self.ext = '.hdf5' - self.extra_args = {'independentParam': self.independentParam} + self.extra_args = {'ind2DList': self.ind2DList} self.keys = [k.lower() for k in self.twoDDict] if 'range' in self.keys: @@ -504,6 +460,8 @@ class MADWriter(Operation): log.success( 'Creating file: {}'.format(self.fullname), 'MADWriter') + if not os.path.exists(self.path): + os.makedirs(self.path) self.fp = madrigal.cedar.MadrigalCedarFile(self.fullname, True) except ValueError as e: log.error( @@ -546,11 +504,11 @@ class MADWriter(Operation): tmp = 10*numpy.log10(SNRavg) else: tmp = getattr(self.dataOut, value) - out[key] = tmp.flatten() + out[key] = tmp.flatten()[:len(heights)] elif isinstance(value, (tuple, list)): attr, x = value - data = getattr(self.dataOut, attr) - out[key] = data[int(x)] + data = getattr(self.dataOut, attr) + out[key] = data[int(x)][:len(heights)] a = numpy.array([out[k] for k in self.keys]) nrows = numpy.array([numpy.isnan(a[:, x]).all() for x in range(len(heights))]) diff --git a/schainpy/model/io/jroIO_param.py b/schainpy/model/io/jroIO_param.py index 631913b..cdb91da 100644 --- a/schainpy/model/io/jroIO_param.py +++ b/schainpy/model/io/jroIO_param.py @@ -966,218 +966,82 @@ class ParamWriter(Operation): @MPDecorator -class ParameterReader(JRODataReader,ProcessingUnit): +class ParameterReader(Reader, ProcessingUnit): ''' Reads HDF5 format files ''' - ext = ".hdf5" - optchar = "D" - timezone = None - startTime = None - endTime = None - fileIndex = None - blockList = None #List to blocks to be read from the file - blocksPerFile = None #Number of blocks to be read - blockIndex = None - path = None - #List of Files - filenameList = None - datetimeList = None - #Hdf5 File - listMetaname = None - listMeta = None - listDataname = None - listData = None - listShapes = None - fp = None - #dataOut reconstruction - dataOut = None - def __init__(self): ProcessingUnit.__init__(self) self.dataOut = Parameters() - return + self.ext = ".hdf5" + self.optchar = "D" + self.timezone = "lt" + self.listMetaname = [] + self.listMeta = [] + self.listDataname = [] + self.listData = [] + self.listShapes = [] + self.open_file = h5py.File + self.open_mode = 'r' + self.metadata = False + self.filefmt = "*%Y%j***" + self.folderfmt = "*%Y%j" def setup(self, **kwargs): - path = kwargs['path'] - startDate = kwargs['startDate'] - endDate = kwargs['endDate'] - startTime = kwargs['startTime'] - endTime = kwargs['endTime'] - walk = kwargs['walk'] - if 'ext' in kwargs: - ext = kwargs['ext'] - else: - ext = '.hdf5' - if 'timezone' in kwargs: - self.timezone = kwargs['timezone'] - else: - self.timezone = 'lt' - - print("[Reading] Searching files in offline mode ...") - pathList, filenameList = self.searchFilesOffLine(path, startDate=startDate, endDate=endDate, - startTime=startTime, endTime=endTime, - ext=ext, walk=walk) + self.set_kwargs(**kwargs) + if not self.ext.startswith('.'): + self.ext = '.{}'.format(self.ext) - if not(filenameList): - print("There is no files into the folder: %s"%(path)) - sys.exit(-1) - - self.fileIndex = -1 - self.startTime = startTime - self.endTime = endTime - self.__readMetadata() - self.__setNextFileOffline() - - return + if self.online: + log.log("Searching files in online mode...", self.name) - def searchFilesOffLine(self, path, startDate=None, endDate=None, startTime=datetime.time(0,0,0), endTime=datetime.time(23,59,59), ext='.hdf5', walk=True): + for nTries in range(self.nTries): + fullpath = self.searchFilesOnLine(self.path, self.startDate, + self.endDate, self.expLabel, self.ext, self.walk, + self.filefmt, self.folderfmt) - expLabel = '' - self.filenameList = [] - self.datetimeList = [] - pathList = [] - dateList, pathList = self.findDatafiles(path, startDate, endDate, expLabel, ext, walk, include_path=True) - - if dateList == []: - print("[Reading] No *%s files in %s from %s to %s)"%(ext, path, - datetime.datetime.combine(startDate,startTime).ctime(), - datetime.datetime.combine(endDate,endTime).ctime())) - - return None, None - - if len(dateList) > 1: - print("[Reading] %d days were found in date range: %s - %s" %(len(dateList), startDate, endDate)) + try: + fullpath = next(fullpath) + except: + fullpath = None + + if fullpath: + break + + log.warning( + 'Waiting {} sec for a valid file in {}: try {} ...'.format( + self.delay, self.path, nTries + 1), + self.name) + time.sleep(self.delay) + + if not(fullpath): + raise schainpy.admin.SchainError( + 'There isn\'t any valid file in {}'.format(self.path)) + + pathname, filename = os.path.split(fullpath) + self.year = int(filename[1:5]) + self.doy = int(filename[5:8]) + self.set = int(filename[8:11]) - 1 else: - print("[Reading] data was found for the date %s" %(dateList[0])) - - filenameList = [] - datetimeList = [] - - for thisPath in pathList: - - fileList = glob.glob1(thisPath, "*%s" %ext) - fileList.sort() - - for file in fileList: - - filename = os.path.join(thisPath,file) - - if not isFileInDateRange(filename, startDate, endDate): - continue - - thisDatetime = self.__isFileInTimeRange(filename, startDate, endDate, startTime, endTime) - - if not(thisDatetime): - continue - - filenameList.append(filename) - datetimeList.append(thisDatetime) - - if not(filenameList): - print("[Reading] Any file was found int time range %s - %s" %(datetime.datetime.combine(startDate,startTime).ctime(), datetime.datetime.combine(endDate,endTime).ctime())) - return None, None - - print("[Reading] %d file(s) was(were) found in time range: %s - %s" %(len(filenameList), startTime, endTime)) - print() - - self.filenameList = filenameList - self.datetimeList = datetimeList - - return pathList, filenameList - - def __isFileInTimeRange(self,filename, startDate, endDate, startTime, endTime): - - """ - Retorna 1 si el archivo de datos se encuentra dentro del rango de horas especificado. - - Inputs: - filename : nombre completo del archivo de datos en formato Jicamarca (.r) - startDate : fecha inicial del rango seleccionado en formato datetime.date - endDate : fecha final del rango seleccionado en formato datetime.date - startTime : tiempo inicial del rango seleccionado en formato datetime.time - endTime : tiempo final del rango seleccionado en formato datetime.time - - Return: - Boolean : Retorna True si el archivo de datos contiene datos en el rango de - fecha especificado, de lo contrario retorna False. - - Excepciones: - Si el archivo no existe o no puede ser abierto - Si la cabecera no puede ser leida. - - """ - - try: - fp = h5py.File(filename, 'r') - grp1 = fp['Data'] - - except IOError: - traceback.print_exc() - raise IOError("The file %s can't be opened" %(filename)) - #In case has utctime attribute - grp2 = grp1['utctime'] - thisUtcTime = grp2.value[0] - - fp.close() - - if self.timezone == 'lt': - thisUtcTime -= 5*3600 - - thisDatetime = datetime.datetime.fromtimestamp(thisUtcTime + 5*3600) - thisDate = thisDatetime.date() - thisTime = thisDatetime.time() - - startUtcTime = (datetime.datetime.combine(thisDate,startTime)- datetime.datetime(1970, 1, 1)).total_seconds() - endUtcTime = (datetime.datetime.combine(thisDate,endTime)- datetime.datetime(1970, 1, 1)).total_seconds() - - #General case - # o>>>>>>>>>>>>>><<<<<<<<<<<<<= startTime: - thisUtcLog = numpy.logical_and(thisUtcTime > startUtcTime, thisUtcTime < endUtcTime) - if numpy.any(thisUtcLog): #If there is one block between the hours mentioned - return thisDatetime - return None - - #If endTime < startTime then endTime belongs to the next day - #<<<<<<<<<<>>>>>>>>>> - #-----------o----------------------------o----------- - # endTime startTime - - if (thisDate == startDate) and numpy.all(thisUtcTime < startUtcTime): - return None - - if (thisDate == endDate) and numpy.all(thisUtcTime > endUtcTime): - return None - - if numpy.all(thisUtcTime < startUtcTime) and numpy.all(thisUtcTime > endUtcTime): - return None - - return thisDatetime - - def __setNextFileOffline(self): - - self.fileIndex += 1 - idFile = self.fileIndex - - if not(idFile < len(self.filenameList)): - raise schainpy.admin.SchainError('No more files') + log.log("Searching files in {}".format(self.path), self.name) + self.filenameList = self.searchFilesOffLine(self.path, self.startDate, + self.endDate, self.expLabel, self.ext, self.walk, self.filefmt, self.folderfmt) + + self.setNextFile() - filename = self.filenameList[idFile] - self.fp = h5py.File(filename, 'r') - self.filename = filename + return - print("Setting the file: %s"%self.filename) + def readFirstHeader(self): + '''Read metadata and data''' - self.__setBlockList() + self.__readMetadata() self.__readData() + self.__setBlockList() self.blockIndex = 0 - return 1 + + return def __setBlockList(self): ''' @@ -1190,12 +1054,13 @@ class ParameterReader(JRODataReader,ProcessingUnit): self.blocksPerFile ''' - fp = self.fp + startTime = self.startTime endTime = self.endTime - grp = fp['Data'] - thisUtcTime = grp['utctime'].value + index = self.listDataname.index('utctime') + thisUtcTime = self.listData[index] + self.interval = numpy.min(thisUtcTime[1:] - thisUtcTime[:-1]) if self.timezone == 'lt': thisUtcTime -= 5*3600 @@ -1219,51 +1084,78 @@ class ParameterReader(JRODataReader,ProcessingUnit): Reads Metadata ''' - filename = self.filenameList[0] - fp = h5py.File(filename, 'r') - gp = fp['Metadata'] listMetaname = [] listMetadata = [] - - for item in list(gp.items()): - name = item[0] - - if name=='variables': - table = gp[name][:] - listShapes = {} - for shapes in table: - listShapes[shapes[0].decode()] = numpy.array([shapes[1]]) - else: - data = gp[name].value - listMetaname.append(name) - listMetadata.append(data) + if 'Metadata' in self.fp: + gp = self.fp['Metadata'] + for item in list(gp.items()): + name = item[0] + + if name=='variables': + table = gp[name][:] + listShapes = {} + for shapes in table: + listShapes[shapes[0].decode()] = numpy.array([shapes[1]]) + else: + data = gp[name].value + listMetaname.append(name) + listMetadata.append(data) + elif self.metadata: + metadata = json.loads(self.metadata) + listShapes = {} + for tup in metadata: + name, values, dim = tup + if dim == -1: + listMetaname.append(name) + listMetadata.append(self.fp[values].value) + else: + listShapes[name] = numpy.array([dim]) + else: + raise IOError('Missing Metadata group in file or metadata info') self.listShapes = listShapes self.listMetaname = listMetaname - self.listMeta = listMetadata + self.listMeta = listMetadata - fp.close() return def __readData(self): - grp = self.fp['Data'] listdataname = [] listdata = [] - - for item in list(grp.items()): - name = item[0] - listdataname.append(name) - dim = self.listShapes[name][0] - if dim == 0: - array = grp[name].value - else: - array = [] - for i in range(dim): - array.append(grp[name]['table{:02d}'.format(i)].value) - array = numpy.array(array) - - listdata.append(array) + + if 'Data' in self.fp: + grp = self.fp['Data'] + for item in list(grp.items()): + name = item[0] + listdataname.append(name) + dim = self.listShapes[name][0] + if dim == 0: + array = grp[name].value + else: + array = [] + for i in range(dim): + array.append(grp[name]['table{:02d}'.format(i)].value) + array = numpy.array(array) + + listdata.append(array) + elif self.metadata: + metadata = json.loads(self.metadata) + for tup in metadata: + name, values, dim = tup + listdataname.append(name) + if dim == -1: + continue + elif dim == 0: + array = self.fp[values].value + else: + array = [] + for var in values: + array.append(self.fp[var].value) + array = numpy.array(array) + listdata.append(array) + else: + raise IOError('Missing Data group in file or metadata info') self.listDataname = listdataname self.listData = listdata @@ -1281,6 +1173,7 @@ class ParameterReader(JRODataReader,ProcessingUnit): else: setattr(self.dataOut, self.listDataname[j], self.listData[j][:,self.blockIndex]) + self.dataOut.paramInterval = self.interval self.dataOut.flagNoData = False self.blockIndex += 1 @@ -1293,9 +1186,7 @@ class ParameterReader(JRODataReader,ProcessingUnit): self.isConfig = True if self.blockIndex == self.blocksPerFile: - if not(self.__setNextFileOffline()): - self.dataOut.flagNoData = True - return 0 + self.setNextFile() self.getData() diff --git a/schainpy/model/proc/jroproc_base.py b/schainpy/model/proc/jroproc_base.py index cbbd721..156c926 100644 --- a/schainpy/model/proc/jroproc_base.py +++ b/schainpy/model/proc/jroproc_base.py @@ -42,6 +42,8 @@ class ProcessingUnit(object): """ + proc_type = 'processing' + __attrs__ = [] def __init__(self): @@ -128,6 +130,8 @@ class Operation(object): Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer) """ + proc_type = 'operation' + __attrs__ = [] def __init__(self): @@ -178,54 +182,31 @@ class Operation(object): class InputQueue(Thread): ''' - Class to hold input data for Proccessing Units and external Operations, - ''' - - def __init__(self, project_id, inputId): Thread.__init__(self) - self.queue = Queue() - self.project_id = project_id - self.inputId = inputId - - def run(self): - - c = zmq.Context() - self.receiver = c.socket(zmq.SUB) - self.receiver.connect( - 'ipc:///tmp/schain/{}_pub'.format(self.project_id)) - self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode()) - - - + while True: - self.queue.put(self.receiver.recv_multipart()[1]) - - def get(self): - - - + return pickle.loads(self.queue.get()) - - + def MPDecorator(BaseClass): """ @@ -248,6 +229,7 @@ def MPDecorator(BaseClass): self.i = 0 self.t = time.time() self.name = BaseClass.__name__ + self.__doc__ = BaseClass.__doc__ if 'plot' in self.name.lower() and not self.name.endswith('_'): self.name = '{}{}'.format(self.CODE.upper(), 'Plot')