##// END OF EJS Templates
Update ParamReader to support different HDF5 files, fix Read/Write Madrigal files
jespinoza -
r1254:6b25d3b79646
parent child
Show More
@@ -627,48 +627,3 class AMISRReader(ProcessingUnit):
627 self.isConfig = True
627 self.isConfig = True
628
628
629 self.getData()
629 self.getData()
630
631 class Writer(Operation):
632 '''
633 classdocs
634 '''
635
636 def __init__(self):
637 '''
638 Constructor
639 '''
640 self.dataOut = None
641
642 self.isConfig = False
643
644 def setup(self, dataIn, path, blocksPerFile, set=0, ext=None):
645 '''
646 In this method we should set all initial parameters.
647
648 Input:
649 dataIn : Input data will also be outputa data
650
651 '''
652 self.dataOut = dataIn
653
654
655
656
657
658 self.isConfig = True
659
660 return
661
662 def run(self, dataIn, **kwargs):
663 '''
664 This method will be called many times so here you should put all your code
665
666 Inputs:
667
668 dataIn : object with the data
669
670 '''
671
672 if not self.isConfig:
673 self.setup(dataIn, **kwargs)
674 No newline at end of file
@@ -15,7 +15,7 import numpy
15 import h5py
15 import h5py
16
16
17 import schainpy.admin
17 import schainpy.admin
18 from schainpy.model.io.jroIO_base import LOCALTIME, JRODataReader, JRODataWriter
18 from schainpy.model.io.jroIO_base import LOCALTIME, Reader
19 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
19 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
20 from schainpy.model.data.jrodata import Parameters
20 from schainpy.model.data.jrodata import Parameters
21 from schainpy.utils import log
21 from schainpy.utils import log
@@ -81,7 +81,7 def load_json(obj):
81 return iterable
81 return iterable
82
82
83 @MPDecorator
83 @MPDecorator
84 class MADReader(JRODataReader, ProcessingUnit):
84 class MADReader(Reader, ProcessingUnit):
85
85
86 def __init__(self):
86 def __init__(self):
87
87
@@ -91,93 +91,78 class MADReader(JRODataReader, ProcessingUnit):
91 self.counter_records = 0
91 self.counter_records = 0
92 self.nrecords = None
92 self.nrecords = None
93 self.flagNoMoreFiles = 0
93 self.flagNoMoreFiles = 0
94 self.isConfig = False
95 self.filename = None
94 self.filename = None
96 self.intervals = set()
95 self.intervals = set()
96 self.datatime = datetime.datetime(1900,1,1)
97 self.format = None
98 self.filefmt = "***%Y%m%d*******"
97
99
98 def setup(self,
100 def setup(self, **kwargs):
99 path=None,
100 startDate=None,
101 endDate=None,
102 format=None,
103 startTime=datetime.time(0, 0, 0),
104 endTime=datetime.time(23, 59, 59),
105 **kwargs):
106
101
107 self.path = path
102 self.set_kwargs(**kwargs)
108 self.startDate = startDate
103 self.oneDDict = load_json(self.oneDDict)
109 self.endDate = endDate
104 self.twoDDict = load_json(self.twoDDict)
110 self.startTime = startTime
105 self.ind2DList = load_json(self.ind2DList)
111 self.endTime = endTime
106 self.independentParam = self.ind2DList[0]
112 self.datatime = datetime.datetime(1900,1,1)
113 self.oneDDict = load_json(kwargs.get('oneDDict',
114 "{\"GDLATR\":\"lat\", \"GDLONR\":\"lon\"}"))
115 self.twoDDict = load_json(kwargs.get('twoDDict',
116 "{\"GDALT\": \"heightList\"}"))
117 self.independentParam = 'GDALT'
118
107
119 if self.path is None:
108 if self.path is None:
120 raise ValueError('The path is not valid')
109 raise ValueError('The path is not valid')
121
110
122 if format is None:
111 self.open_file = open
112 self.open_mode = 'rb'
113
114 if self.format is None:
123 raise ValueError('The format is not valid choose simple or hdf5')
115 raise ValueError('The format is not valid choose simple or hdf5')
124 elif format.lower() in ('simple', 'txt'):
116 elif self.format.lower() in ('simple', 'txt'):
125 self.ext = '.txt'
117 self.ext = '.txt'
126 elif format.lower() in ('cedar',):
118 elif self.format.lower() in ('cedar',):
127 self.ext = '.001'
119 self.ext = '.001'
128 else:
120 else:
129 self.ext = '.hdf5'
121 self.ext = '.hdf5'
122 self.open_file = h5py.File
123 self.open_mode = 'r'
130
124
131 self.search_files(self.path)
125 if self.online:
132 self.fileId = 0
126 log.log("Searching files in online mode...", self.name)
133
134 if not self.fileList:
135 raise Warning('There is no files matching these date in the folder: {}. \n Check startDate and endDate'.format(path))
136
137 self.setNextFile()
138
139 def search_files(self, path):
140 '''
141 Searching for madrigal files in path
142 Creating a list of files to procces included in [startDate,endDate]
143
144 Input:
145 path - Path to find files
146 '''
147
148 log.log('Searching files {} in {} '.format(self.ext, path), 'MADReader')
149 fileList0 = glob.glob1(path, '*{}'.format(self.ext))
150 fileList0.sort()
151
152 self.fileList = []
153 self.dateFileList = []
154
155 startDate = self.startDate - datetime.timedelta(1)
156 endDate = self.endDate + datetime.timedelta(1)
157
158 for thisFile in fileList0:
159 year = thisFile[3:7]
160 if not year.isdigit():
161 continue
162
163 month = thisFile[7:9]
164 if not month.isdigit():
165 continue
166
127
167 day = thisFile[9:11]
128 for nTries in range(self.nTries):
168 if not day.isdigit():
129 fullpath = self.searchFilesOnLine(self.path, self.startDate,
169 continue
130 self.endDate, self.expLabel, self.ext, self.walk,
131 self.filefmt, self.folderfmt)
170
132
171 year, month, day = int(year), int(month), int(day)
133 try:
172 dateFile = datetime.date(year, month, day)
134 fullpath = next(fullpath)
135 except:
136 fullpath = None
137
138 if fullpath:
139 break
173
140
174 if (startDate > dateFile) or (endDate < dateFile):
141 log.warning(
175 continue
142 'Waiting {} sec for a valid file in {}: try {} ...'.format(
143 self.delay, self.path, nTries + 1),
144 self.name)
145 time.sleep(self.delay)
146
147 if not(fullpath):
148 raise schainpy.admin.SchainError(
149 'There isn\'t any valid file in {}'.format(self.path))
150
151 else:
152 log.log("Searching files in {}".format(self.path), self.name)
153 self.filenameList = self.searchFilesOffLine(self.path, self.startDate,
154 self.endDate, self.expLabel, self.ext, self.walk, self.filefmt, self.folderfmt)
155
156 self.setNextFile()
176
157
177 self.fileList.append(thisFile)
158 def readFirstHeader(self):
178 self.dateFileList.append(dateFile)
159 '''Read header and data'''
179
160
180 return
161 self.parseHeader()
162 self.parseData()
163 self.blockIndex = 0
164
165 return
181
166
182 def parseHeader(self):
167 def parseHeader(self):
183 '''
168 '''
@@ -237,51 +222,13 class MADReader(JRODataReader, ProcessingUnit):
237 self.times = numpy.unique(self.data['Table Layout']['ut1_unix'])
222 self.times = numpy.unique(self.data['Table Layout']['ut1_unix'])
238 self.counter_records = int(self.data['Table Layout']['recno'][0])
223 self.counter_records = int(self.data['Table Layout']['recno'][0])
239 self.nrecords = int(self.data['Table Layout']['recno'][-1])
224 self.nrecords = int(self.data['Table Layout']['recno'][-1])
240
241 def setNextFile(self):
242 '''
243 '''
244
245 file_id = self.fileId
246
247 if file_id == len(self.fileList):
248 log.success('No more files', 'MADReader')
249 self.flagNoMoreFiles = 1
250 return 0
251
252 log.success(
253 'Opening: {}'.format(self.fileList[file_id]),
254 'MADReader'
255 )
256
257 filename = os.path.join(self.path, self.fileList[file_id])
258
259 if self.filename is not None:
260 self.fp.close()
261
262 self.filename = filename
263 self.filedate = self.dateFileList[file_id]
264
265 if self.ext=='.hdf5':
266 self.fp = h5py.File(self.filename, 'r')
267 else:
268 self.fp = open(self.filename, 'rb')
269
270 self.parseHeader()
271 self.parseData()
272 self.sizeOfFile = os.path.getsize(self.filename)
273 self.flagIsNewFile = 0
274 self.fileId += 1
275
276 return 1
277
225
278 def readNextBlock(self):
226 def readNextBlock(self):
279
227
280 while True:
228 while True:
281 self.flagDiscontinuousBlock = 0
229 self.flagDiscontinuousBlock = 0
282 if self.flagIsNewFile:
230 if self.counter_records == self.nrecords:
283 if not self.setNextFile():
231 self.setNextFile()
284 return 0
285
232
286 self.readBlock()
233 self.readBlock()
287
234
@@ -321,7 +268,6 class MADReader(JRODataReader, ProcessingUnit):
321 dum.append(self.data[self.counter_records])
268 dum.append(self.data[self.counter_records])
322 self.counter_records += 1
269 self.counter_records += 1
323 if self.counter_records == self.nrecords:
270 if self.counter_records == self.nrecords:
324 self.flagIsNewFile = True
325 break
271 break
326 continue
272 continue
327 self.intervals.add((datatime-self.datatime).seconds)
273 self.intervals.add((datatime-self.datatime).seconds)
@@ -334,9 +280,7 class MADReader(JRODataReader, ProcessingUnit):
334 if datatime.date()>self.datatime.date():
280 if datatime.date()>self.datatime.date():
335 self.flagDiscontinuousBlock = 1
281 self.flagDiscontinuousBlock = 1
336 self.datatime = datatime
282 self.datatime = datatime
337 self.counter_records += 1
283 self.counter_records += 1
338 if self.counter_records == self.nrecords:
339 self.flagIsNewFile = True
340
284
341 self.buffer = numpy.array(dum)
285 self.buffer = numpy.array(dum)
342 return
286 return
@@ -390,10 +334,6 class MADReader(JRODataReader, ProcessingUnit):
390 '''
334 '''
391 Storing data from databuffer to dataOut object
335 Storing data from databuffer to dataOut object
392 '''
336 '''
393 if self.flagNoMoreFiles:
394 self.dataOut.flagNoData = True
395 raise schainpy.admin.SchainError('No file left to process')
396 return 0
397
337
398 if not self.readNextBlock():
338 if not self.readNextBlock():
399 self.dataOut.flagNoData = True
339 self.dataOut.flagNoData = True
@@ -403,10 +343,49 class MADReader(JRODataReader, ProcessingUnit):
403
343
404 return 1
344 return 1
405
345
346 def run(self, **kwargs):
347
348 if not(self.isConfig):
349 self.setup(**kwargs)
350 self.isConfig = True
351
352 self.getData()
353
354 return
355
406 @MPDecorator
356 @MPDecorator
407 class MADWriter(Operation):
357 class MADWriter(Operation):
408
358 '''Writing module for Madrigal files
409 missing = -32767
359
360 type: external
361
362 Inputs:
363 path path where files will be created
364 oneDDict json of one-dimensional parameters in record where keys
365 are Madrigal codes (integers or mnemonics) and values the corresponding
366 dataOut attribute e.g: {
367 'gdlatr': 'lat',
368 'gdlonr': 'lon',
369 'gdlat2':'lat',
370 'glon2':'lon'}
371 ind2DList list of independent spatial two-dimensional parameters e.g:
372 ['heigthList']
373 twoDDict json of two-dimensional parameters in record where keys
374 are Madrigal codes (integers or mnemonics) and values the corresponding
375 dataOut attribute if multidimensional array specify as tupple
376 ('attr', pos) e.g: {
377 'gdalt': 'heightList',
378 'vn1p2': ('data_output', 0),
379 'vn2p2': ('data_output', 1),
380 'vn3': ('data_output', 2),
381 'snl': ('data_SNR', 'db')
382 }
383 metadata json of madrigal metadata (kinst, kindat, catalog and header)
384 format hdf5, cedar
385 blocks number of blocks per file'''
386
387 __attrs__ = ['path', 'oneDDict', 'ind2DList', 'twoDDict','metadata', 'format', 'blocks']
388 missing = -32767
410
389
411 def __init__(self):
390 def __init__(self):
412
391
@@ -416,41 +395,18 class MADWriter(Operation):
416 self.path = None
395 self.path = None
417 self.fp = None
396 self.fp = None
418
397
419 def run(self, dataOut, path, oneDDict, independentParam='[]', twoDDict='{}',
398 def run(self, dataOut, path, oneDDict, ind2DList='[]', twoDDict='{}',
420 metadata='{}', format='cedar', **kwargs):
399 metadata='{}', format='cedar', **kwargs):
421 '''
400
422 Inputs:
423 path - path where files will be created
424 oneDDict - json of one-dimensional parameters in record where keys
425 are Madrigal codes (integers or mnemonics) and values the corresponding
426 dataOut attribute e.g: {
427 'gdlatr': 'lat',
428 'gdlonr': 'lon',
429 'gdlat2':'lat',
430 'glon2':'lon'}
431 independentParam - list of independent spatial two-dimensional parameters e.g:
432 ['heigthList']
433 twoDDict - json of two-dimensional parameters in record where keys
434 are Madrigal codes (integers or mnemonics) and values the corresponding
435 dataOut attribute if multidimensional array specify as tupple
436 ('attr', pos) e.g: {
437 'gdalt': 'heightList',
438 'vn1p2': ('data_output', 0),
439 'vn2p2': ('data_output', 1),
440 'vn3': ('data_output', 2),
441 'snl': ('data_SNR', 'db')
442 }
443 metadata - json of madrigal metadata (kinst, kindat, catalog and header)
444 '''
445 if not self.isConfig:
401 if not self.isConfig:
446 self.setup(path, oneDDict, independentParam, twoDDict, metadata, format, **kwargs)
402 self.setup(path, oneDDict, ind2DList, twoDDict, metadata, format, **kwargs)
447 self.isConfig = True
403 self.isConfig = True
448
404
449 self.dataOut = dataOut
405 self.dataOut = dataOut
450 self.putData()
406 self.putData()
451 return 1
407 return 1
452
408
453 def setup(self, path, oneDDict, independentParam, twoDDict, metadata, format, **kwargs):
409 def setup(self, path, oneDDict, ind2DList, twoDDict, metadata, format, **kwargs):
454 '''
410 '''
455 Configure Operation
411 Configure Operation
456 '''
412 '''
@@ -460,7 +416,7 class MADWriter(Operation):
460 self.counter = 0
416 self.counter = 0
461 self.oneDDict = load_json(oneDDict)
417 self.oneDDict = load_json(oneDDict)
462 self.twoDDict = load_json(twoDDict)
418 self.twoDDict = load_json(twoDDict)
463 self.independentParam = load_json(independentParam)
419 self.ind2DList = load_json(ind2DList)
464 meta = load_json(metadata)
420 meta = load_json(metadata)
465 self.kinst = meta.get('kinst')
421 self.kinst = meta.get('kinst')
466 self.kindat = meta.get('kindat')
422 self.kindat = meta.get('kindat')
@@ -471,7 +427,7 class MADWriter(Operation):
471 self.extra_args = {}
427 self.extra_args = {}
472 elif format == 'hdf5':
428 elif format == 'hdf5':
473 self.ext = '.hdf5'
429 self.ext = '.hdf5'
474 self.extra_args = {'independentParam': self.independentParam}
430 self.extra_args = {'ind2DList': self.ind2DList}
475
431
476 self.keys = [k.lower() for k in self.twoDDict]
432 self.keys = [k.lower() for k in self.twoDDict]
477 if 'range' in self.keys:
433 if 'range' in self.keys:
@@ -504,6 +460,8 class MADWriter(Operation):
504 log.success(
460 log.success(
505 'Creating file: {}'.format(self.fullname),
461 'Creating file: {}'.format(self.fullname),
506 'MADWriter')
462 'MADWriter')
463 if not os.path.exists(self.path):
464 os.makedirs(self.path)
507 self.fp = madrigal.cedar.MadrigalCedarFile(self.fullname, True)
465 self.fp = madrigal.cedar.MadrigalCedarFile(self.fullname, True)
508 except ValueError as e:
466 except ValueError as e:
509 log.error(
467 log.error(
@@ -546,11 +504,11 class MADWriter(Operation):
546 tmp = 10*numpy.log10(SNRavg)
504 tmp = 10*numpy.log10(SNRavg)
547 else:
505 else:
548 tmp = getattr(self.dataOut, value)
506 tmp = getattr(self.dataOut, value)
549 out[key] = tmp.flatten()
507 out[key] = tmp.flatten()[:len(heights)]
550 elif isinstance(value, (tuple, list)):
508 elif isinstance(value, (tuple, list)):
551 attr, x = value
509 attr, x = value
552 data = getattr(self.dataOut, attr)
510 data = getattr(self.dataOut, attr)
553 out[key] = data[int(x)]
511 out[key] = data[int(x)][:len(heights)]
554
512
555 a = numpy.array([out[k] for k in self.keys])
513 a = numpy.array([out[k] for k in self.keys])
556 nrows = numpy.array([numpy.isnan(a[:, x]).all() for x in range(len(heights))])
514 nrows = numpy.array([numpy.isnan(a[:, x]).all() for x in range(len(heights))])
@@ -966,218 +966,82 class ParamWriter(Operation):
966
966
967
967
968 @MPDecorator
968 @MPDecorator
969 class ParameterReader(JRODataReader,ProcessingUnit):
969 class ParameterReader(Reader, ProcessingUnit):
970 '''
970 '''
971 Reads HDF5 format files
971 Reads HDF5 format files
972 '''
972 '''
973
973
974 ext = ".hdf5"
975 optchar = "D"
976 timezone = None
977 startTime = None
978 endTime = None
979 fileIndex = None
980 blockList = None #List to blocks to be read from the file
981 blocksPerFile = None #Number of blocks to be read
982 blockIndex = None
983 path = None
984 #List of Files
985 filenameList = None
986 datetimeList = None
987 #Hdf5 File
988 listMetaname = None
989 listMeta = None
990 listDataname = None
991 listData = None
992 listShapes = None
993 fp = None
994 #dataOut reconstruction
995 dataOut = None
996
997 def __init__(self):
974 def __init__(self):
998 ProcessingUnit.__init__(self)
975 ProcessingUnit.__init__(self)
999 self.dataOut = Parameters()
976 self.dataOut = Parameters()
1000 return
977 self.ext = ".hdf5"
978 self.optchar = "D"
979 self.timezone = "lt"
980 self.listMetaname = []
981 self.listMeta = []
982 self.listDataname = []
983 self.listData = []
984 self.listShapes = []
985 self.open_file = h5py.File
986 self.open_mode = 'r'
987 self.metadata = False
988 self.filefmt = "*%Y%j***"
989 self.folderfmt = "*%Y%j"
1001
990
1002 def setup(self, **kwargs):
991 def setup(self, **kwargs):
1003
992
1004 path = kwargs['path']
993 self.set_kwargs(**kwargs)
1005 startDate = kwargs['startDate']
994 if not self.ext.startswith('.'):
1006 endDate = kwargs['endDate']
995 self.ext = '.{}'.format(self.ext)
1007 startTime = kwargs['startTime']
1008 endTime = kwargs['endTime']
1009 walk = kwargs['walk']
1010 if 'ext' in kwargs:
1011 ext = kwargs['ext']
1012 else:
1013 ext = '.hdf5'
1014 if 'timezone' in kwargs:
1015 self.timezone = kwargs['timezone']
1016 else:
1017 self.timezone = 'lt'
1018
1019 print("[Reading] Searching files in offline mode ...")
1020 pathList, filenameList = self.searchFilesOffLine(path, startDate=startDate, endDate=endDate,
1021 startTime=startTime, endTime=endTime,
1022 ext=ext, walk=walk)
1023
996
1024 if not(filenameList):
997 if self.online:
1025 print("There is no files into the folder: %s"%(path))
998 log.log("Searching files in online mode...", self.name)
1026 sys.exit(-1)
1027
1028 self.fileIndex = -1
1029 self.startTime = startTime
1030 self.endTime = endTime
1031 self.__readMetadata()
1032 self.__setNextFileOffline()
1033
1034 return
1035
999
1036 def searchFilesOffLine(self, path, startDate=None, endDate=None, startTime=datetime.time(0,0,0), endTime=datetime.time(23,59,59), ext='.hdf5', walk=True):
1000 for nTries in range(self.nTries):
1001 fullpath = self.searchFilesOnLine(self.path, self.startDate,
1002 self.endDate, self.expLabel, self.ext, self.walk,
1003 self.filefmt, self.folderfmt)
1037
1004
1038 expLabel = ''
1005 try:
1039 self.filenameList = []
1006 fullpath = next(fullpath)
1040 self.datetimeList = []
1007 except:
1041 pathList = []
1008 fullpath = None
1042 dateList, pathList = self.findDatafiles(path, startDate, endDate, expLabel, ext, walk, include_path=True)
1009
1043
1010 if fullpath:
1044 if dateList == []:
1011 break
1045 print("[Reading] No *%s files in %s from %s to %s)"%(ext, path,
1012
1046 datetime.datetime.combine(startDate,startTime).ctime(),
1013 log.warning(
1047 datetime.datetime.combine(endDate,endTime).ctime()))
1014 'Waiting {} sec for a valid file in {}: try {} ...'.format(
1048
1015 self.delay, self.path, nTries + 1),
1049 return None, None
1016 self.name)
1050
1017 time.sleep(self.delay)
1051 if len(dateList) > 1:
1018
1052 print("[Reading] %d days were found in date range: %s - %s" %(len(dateList), startDate, endDate))
1019 if not(fullpath):
1020 raise schainpy.admin.SchainError(
1021 'There isn\'t any valid file in {}'.format(self.path))
1022
1023 pathname, filename = os.path.split(fullpath)
1024 self.year = int(filename[1:5])
1025 self.doy = int(filename[5:8])
1026 self.set = int(filename[8:11]) - 1
1053 else:
1027 else:
1054 print("[Reading] data was found for the date %s" %(dateList[0]))
1028 log.log("Searching files in {}".format(self.path), self.name)
1055
1029 self.filenameList = self.searchFilesOffLine(self.path, self.startDate,
1056 filenameList = []
1030 self.endDate, self.expLabel, self.ext, self.walk, self.filefmt, self.folderfmt)
1057 datetimeList = []
1031
1058
1032 self.setNextFile()
1059 for thisPath in pathList:
1060
1061 fileList = glob.glob1(thisPath, "*%s" %ext)
1062 fileList.sort()
1063
1064 for file in fileList:
1065
1066 filename = os.path.join(thisPath,file)
1067
1068 if not isFileInDateRange(filename, startDate, endDate):
1069 continue
1070
1071 thisDatetime = self.__isFileInTimeRange(filename, startDate, endDate, startTime, endTime)
1072
1073 if not(thisDatetime):
1074 continue
1075
1076 filenameList.append(filename)
1077 datetimeList.append(thisDatetime)
1078
1079 if not(filenameList):
1080 print("[Reading] Any file was found int time range %s - %s" %(datetime.datetime.combine(startDate,startTime).ctime(), datetime.datetime.combine(endDate,endTime).ctime()))
1081 return None, None
1082
1083 print("[Reading] %d file(s) was(were) found in time range: %s - %s" %(len(filenameList), startTime, endTime))
1084 print()
1085
1086 self.filenameList = filenameList
1087 self.datetimeList = datetimeList
1088
1089 return pathList, filenameList
1090
1091 def __isFileInTimeRange(self,filename, startDate, endDate, startTime, endTime):
1092
1093 """
1094 Retorna 1 si el archivo de datos se encuentra dentro del rango de horas especificado.
1095
1096 Inputs:
1097 filename : nombre completo del archivo de datos en formato Jicamarca (.r)
1098 startDate : fecha inicial del rango seleccionado en formato datetime.date
1099 endDate : fecha final del rango seleccionado en formato datetime.date
1100 startTime : tiempo inicial del rango seleccionado en formato datetime.time
1101 endTime : tiempo final del rango seleccionado en formato datetime.time
1102
1103 Return:
1104 Boolean : Retorna True si el archivo de datos contiene datos en el rango de
1105 fecha especificado, de lo contrario retorna False.
1106
1107 Excepciones:
1108 Si el archivo no existe o no puede ser abierto
1109 Si la cabecera no puede ser leida.
1110
1111 """
1112
1113 try:
1114 fp = h5py.File(filename, 'r')
1115 grp1 = fp['Data']
1116
1117 except IOError:
1118 traceback.print_exc()
1119 raise IOError("The file %s can't be opened" %(filename))
1120 #In case has utctime attribute
1121 grp2 = grp1['utctime']
1122 thisUtcTime = grp2.value[0]
1123
1124 fp.close()
1125
1126 if self.timezone == 'lt':
1127 thisUtcTime -= 5*3600
1128
1129 thisDatetime = datetime.datetime.fromtimestamp(thisUtcTime + 5*3600)
1130 thisDate = thisDatetime.date()
1131 thisTime = thisDatetime.time()
1132
1133 startUtcTime = (datetime.datetime.combine(thisDate,startTime)- datetime.datetime(1970, 1, 1)).total_seconds()
1134 endUtcTime = (datetime.datetime.combine(thisDate,endTime)- datetime.datetime(1970, 1, 1)).total_seconds()
1135
1136 #General case
1137 # o>>>>>>>>>>>>>><<<<<<<<<<<<<<o
1138 #-----------o----------------------------o-----------
1139 # startTime endTime
1140
1141 if endTime >= startTime:
1142 thisUtcLog = numpy.logical_and(thisUtcTime > startUtcTime, thisUtcTime < endUtcTime)
1143 if numpy.any(thisUtcLog): #If there is one block between the hours mentioned
1144 return thisDatetime
1145 return None
1146
1147 #If endTime < startTime then endTime belongs to the next day
1148 #<<<<<<<<<<<o o>>>>>>>>>>>
1149 #-----------o----------------------------o-----------
1150 # endTime startTime
1151
1152 if (thisDate == startDate) and numpy.all(thisUtcTime < startUtcTime):
1153 return None
1154
1155 if (thisDate == endDate) and numpy.all(thisUtcTime > endUtcTime):
1156 return None
1157
1158 if numpy.all(thisUtcTime < startUtcTime) and numpy.all(thisUtcTime > endUtcTime):
1159 return None
1160
1161 return thisDatetime
1162
1163 def __setNextFileOffline(self):
1164
1165 self.fileIndex += 1
1166 idFile = self.fileIndex
1167
1168 if not(idFile < len(self.filenameList)):
1169 raise schainpy.admin.SchainError('No more files')
1170
1033
1171 filename = self.filenameList[idFile]
1034 return
1172 self.fp = h5py.File(filename, 'r')
1173 self.filename = filename
1174
1035
1175 print("Setting the file: %s"%self.filename)
1036 def readFirstHeader(self):
1037 '''Read metadata and data'''
1176
1038
1177 self.__setBlockList()
1039 self.__readMetadata()
1178 self.__readData()
1040 self.__readData()
1041 self.__setBlockList()
1179 self.blockIndex = 0
1042 self.blockIndex = 0
1180 return 1
1043
1044 return
1181
1045
1182 def __setBlockList(self):
1046 def __setBlockList(self):
1183 '''
1047 '''
@@ -1190,12 +1054,13 class ParameterReader(JRODataReader,ProcessingUnit):
1190 self.blocksPerFile
1054 self.blocksPerFile
1191
1055
1192 '''
1056 '''
1193 fp = self.fp
1057
1194 startTime = self.startTime
1058 startTime = self.startTime
1195 endTime = self.endTime
1059 endTime = self.endTime
1196
1060
1197 grp = fp['Data']
1061 index = self.listDataname.index('utctime')
1198 thisUtcTime = grp['utctime'].value
1062 thisUtcTime = self.listData[index]
1063 self.interval = numpy.min(thisUtcTime[1:] - thisUtcTime[:-1])
1199
1064
1200 if self.timezone == 'lt':
1065 if self.timezone == 'lt':
1201 thisUtcTime -= 5*3600
1066 thisUtcTime -= 5*3600
@@ -1219,51 +1084,78 class ParameterReader(JRODataReader,ProcessingUnit):
1219 Reads Metadata
1084 Reads Metadata
1220 '''
1085 '''
1221
1086
1222 filename = self.filenameList[0]
1223 fp = h5py.File(filename, 'r')
1224 gp = fp['Metadata']
1225 listMetaname = []
1087 listMetaname = []
1226 listMetadata = []
1088 listMetadata = []
1227
1089 if 'Metadata' in self.fp:
1228 for item in list(gp.items()):
1090 gp = self.fp['Metadata']
1229 name = item[0]
1091 for item in list(gp.items()):
1230
1092 name = item[0]
1231 if name=='variables':
1093
1232 table = gp[name][:]
1094 if name=='variables':
1233 listShapes = {}
1095 table = gp[name][:]
1234 for shapes in table:
1096 listShapes = {}
1235 listShapes[shapes[0].decode()] = numpy.array([shapes[1]])
1097 for shapes in table:
1236 else:
1098 listShapes[shapes[0].decode()] = numpy.array([shapes[1]])
1237 data = gp[name].value
1099 else:
1238 listMetaname.append(name)
1100 data = gp[name].value
1239 listMetadata.append(data)
1101 listMetaname.append(name)
1102 listMetadata.append(data)
1103 elif self.metadata:
1104 metadata = json.loads(self.metadata)
1105 listShapes = {}
1106 for tup in metadata:
1107 name, values, dim = tup
1108 if dim == -1:
1109 listMetaname.append(name)
1110 listMetadata.append(self.fp[values].value)
1111 else:
1112 listShapes[name] = numpy.array([dim])
1113 else:
1114 raise IOError('Missing Metadata group in file or metadata info')
1240
1115
1241 self.listShapes = listShapes
1116 self.listShapes = listShapes
1242 self.listMetaname = listMetaname
1117 self.listMetaname = listMetaname
1243 self.listMeta = listMetadata
1118 self.listMeta = listMetadata
1244
1119
1245 fp.close()
1246 return
1120 return
1247
1121
1248 def __readData(self):
1122 def __readData(self):
1249
1123
1250 grp = self.fp['Data']
1251 listdataname = []
1124 listdataname = []
1252 listdata = []
1125 listdata = []
1253
1126
1254 for item in list(grp.items()):
1127 if 'Data' in self.fp:
1255 name = item[0]
1128 grp = self.fp['Data']
1256 listdataname.append(name)
1129 for item in list(grp.items()):
1257 dim = self.listShapes[name][0]
1130 name = item[0]
1258 if dim == 0:
1131 listdataname.append(name)
1259 array = grp[name].value
1132 dim = self.listShapes[name][0]
1260 else:
1133 if dim == 0:
1261 array = []
1134 array = grp[name].value
1262 for i in range(dim):
1135 else:
1263 array.append(grp[name]['table{:02d}'.format(i)].value)
1136 array = []
1264 array = numpy.array(array)
1137 for i in range(dim):
1265
1138 array.append(grp[name]['table{:02d}'.format(i)].value)
1266 listdata.append(array)
1139 array = numpy.array(array)
1140
1141 listdata.append(array)
1142 elif self.metadata:
1143 metadata = json.loads(self.metadata)
1144 for tup in metadata:
1145 name, values, dim = tup
1146 listdataname.append(name)
1147 if dim == -1:
1148 continue
1149 elif dim == 0:
1150 array = self.fp[values].value
1151 else:
1152 array = []
1153 for var in values:
1154 array.append(self.fp[var].value)
1155 array = numpy.array(array)
1156 listdata.append(array)
1157 else:
1158 raise IOError('Missing Data group in file or metadata info')
1267
1159
1268 self.listDataname = listdataname
1160 self.listDataname = listdataname
1269 self.listData = listdata
1161 self.listData = listdata
@@ -1281,6 +1173,7 class ParameterReader(JRODataReader,ProcessingUnit):
1281 else:
1173 else:
1282 setattr(self.dataOut, self.listDataname[j], self.listData[j][:,self.blockIndex])
1174 setattr(self.dataOut, self.listDataname[j], self.listData[j][:,self.blockIndex])
1283
1175
1176 self.dataOut.paramInterval = self.interval
1284 self.dataOut.flagNoData = False
1177 self.dataOut.flagNoData = False
1285 self.blockIndex += 1
1178 self.blockIndex += 1
1286
1179
@@ -1293,9 +1186,7 class ParameterReader(JRODataReader,ProcessingUnit):
1293 self.isConfig = True
1186 self.isConfig = True
1294
1187
1295 if self.blockIndex == self.blocksPerFile:
1188 if self.blockIndex == self.blocksPerFile:
1296 if not(self.__setNextFileOffline()):
1189 self.setNextFile()
1297 self.dataOut.flagNoData = True
1298 return 0
1299
1190
1300 self.getData()
1191 self.getData()
1301
1192
@@ -42,6 +42,8 class ProcessingUnit(object):
42
42
43
43
44 """
44 """
45 proc_type = 'processing'
46 __attrs__ = []
45
47
46 def __init__(self):
48 def __init__(self):
47
49
@@ -128,6 +130,8 class Operation(object):
128 Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer)
130 Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer)
129
131
130 """
132 """
133 proc_type = 'operation'
134 __attrs__ = []
131
135
132 def __init__(self):
136 def __init__(self):
133
137
@@ -178,54 +182,31 class Operation(object):
178 class InputQueue(Thread):
182 class InputQueue(Thread):
179
183
180 '''
184 '''
181
182 Class to hold input data for Proccessing Units and external Operations,
185 Class to hold input data for Proccessing Units and external Operations,
183
184 '''
186 '''
185
187
186
187
188 def __init__(self, project_id, inputId):
188 def __init__(self, project_id, inputId):
189
189
190 Thread.__init__(self)
190 Thread.__init__(self)
191
192 self.queue = Queue()
191 self.queue = Queue()
193
194 self.project_id = project_id
192 self.project_id = project_id
195
196 self.inputId = inputId
193 self.inputId = inputId
197
194
198
199
200 def run(self):
195 def run(self):
201
196
202
203
204 c = zmq.Context()
197 c = zmq.Context()
205
206 self.receiver = c.socket(zmq.SUB)
198 self.receiver = c.socket(zmq.SUB)
207
208 self.receiver.connect(
199 self.receiver.connect(
209
210 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
200 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
211
212 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
201 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
213
202
214
215
216 while True:
203 while True:
217
218 self.queue.put(self.receiver.recv_multipart()[1])
204 self.queue.put(self.receiver.recv_multipart()[1])
219
205
220
221
222 def get(self):
206 def get(self):
223
207
224
225
226 return pickle.loads(self.queue.get())
208 return pickle.loads(self.queue.get())
227
209
228
229
210
230 def MPDecorator(BaseClass):
211 def MPDecorator(BaseClass):
231 """
212 """
@@ -248,6 +229,7 def MPDecorator(BaseClass):
248 self.i = 0
229 self.i = 0
249 self.t = time.time()
230 self.t = time.time()
250 self.name = BaseClass.__name__
231 self.name = BaseClass.__name__
232 self.__doc__ = BaseClass.__doc__
251
233
252 if 'plot' in self.name.lower() and not self.name.endswith('_'):
234 if 'plot' in self.name.lower() and not self.name.endswith('_'):
253 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
235 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
General Comments 0
You need to be logged in to leave comments. Login now