##// END OF EJS Templates
Update ParamReader to support different HDF5 files, fix Read/Write Madrigal files
jespinoza -
r1254:6b25d3b79646
parent child
Show More
@@ -627,48 +627,3 class AMISRReader(ProcessingUnit):
627 627 self.isConfig = True
628 628
629 629 self.getData()
630
631 class Writer(Operation):
632 '''
633 classdocs
634 '''
635
636 def __init__(self):
637 '''
638 Constructor
639 '''
640 self.dataOut = None
641
642 self.isConfig = False
643
644 def setup(self, dataIn, path, blocksPerFile, set=0, ext=None):
645 '''
646 In this method we should set all initial parameters.
647
648 Input:
649 dataIn : Input data will also be outputa data
650
651 '''
652 self.dataOut = dataIn
653
654
655
656
657
658 self.isConfig = True
659
660 return
661
662 def run(self, dataIn, **kwargs):
663 '''
664 This method will be called many times so here you should put all your code
665
666 Inputs:
667
668 dataIn : object with the data
669
670 '''
671
672 if not self.isConfig:
673 self.setup(dataIn, **kwargs)
674 No newline at end of file
@@ -15,7 +15,7 import numpy
15 15 import h5py
16 16
17 17 import schainpy.admin
18 from schainpy.model.io.jroIO_base import LOCALTIME, JRODataReader, JRODataWriter
18 from schainpy.model.io.jroIO_base import LOCALTIME, Reader
19 19 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
20 20 from schainpy.model.data.jrodata import Parameters
21 21 from schainpy.utils import log
@@ -81,7 +81,7 def load_json(obj):
81 81 return iterable
82 82
83 83 @MPDecorator
84 class MADReader(JRODataReader, ProcessingUnit):
84 class MADReader(Reader, ProcessingUnit):
85 85
86 86 def __init__(self):
87 87
@@ -91,93 +91,78 class MADReader(JRODataReader, ProcessingUnit):
91 91 self.counter_records = 0
92 92 self.nrecords = None
93 93 self.flagNoMoreFiles = 0
94 self.isConfig = False
95 94 self.filename = None
96 95 self.intervals = set()
96 self.datatime = datetime.datetime(1900,1,1)
97 self.format = None
98 self.filefmt = "***%Y%m%d*******"
97 99
98 def setup(self,
99 path=None,
100 startDate=None,
101 endDate=None,
102 format=None,
103 startTime=datetime.time(0, 0, 0),
104 endTime=datetime.time(23, 59, 59),
105 **kwargs):
100 def setup(self, **kwargs):
106 101
107 self.path = path
108 self.startDate = startDate
109 self.endDate = endDate
110 self.startTime = startTime
111 self.endTime = endTime
112 self.datatime = datetime.datetime(1900,1,1)
113 self.oneDDict = load_json(kwargs.get('oneDDict',
114 "{\"GDLATR\":\"lat\", \"GDLONR\":\"lon\"}"))
115 self.twoDDict = load_json(kwargs.get('twoDDict',
116 "{\"GDALT\": \"heightList\"}"))
117 self.independentParam = 'GDALT'
102 self.set_kwargs(**kwargs)
103 self.oneDDict = load_json(self.oneDDict)
104 self.twoDDict = load_json(self.twoDDict)
105 self.ind2DList = load_json(self.ind2DList)
106 self.independentParam = self.ind2DList[0]
118 107
119 108 if self.path is None:
120 109 raise ValueError('The path is not valid')
121 110
122 if format is None:
111 self.open_file = open
112 self.open_mode = 'rb'
113
114 if self.format is None:
123 115 raise ValueError('The format is not valid choose simple or hdf5')
124 elif format.lower() in ('simple', 'txt'):
116 elif self.format.lower() in ('simple', 'txt'):
125 117 self.ext = '.txt'
126 elif format.lower() in ('cedar',):
118 elif self.format.lower() in ('cedar',):
127 119 self.ext = '.001'
128 120 else:
129 121 self.ext = '.hdf5'
122 self.open_file = h5py.File
123 self.open_mode = 'r'
130 124
131 self.search_files(self.path)
132 self.fileId = 0
133
134 if not self.fileList:
135 raise Warning('There is no files matching these date in the folder: {}. \n Check startDate and endDate'.format(path))
136
137 self.setNextFile()
138
139 def search_files(self, path):
140 '''
141 Searching for madrigal files in path
142 Creating a list of files to procces included in [startDate,endDate]
143
144 Input:
145 path - Path to find files
146 '''
147
148 log.log('Searching files {} in {} '.format(self.ext, path), 'MADReader')
149 fileList0 = glob.glob1(path, '*{}'.format(self.ext))
150 fileList0.sort()
151
152 self.fileList = []
153 self.dateFileList = []
154
155 startDate = self.startDate - datetime.timedelta(1)
156 endDate = self.endDate + datetime.timedelta(1)
157
158 for thisFile in fileList0:
159 year = thisFile[3:7]
160 if not year.isdigit():
161 continue
162
163 month = thisFile[7:9]
164 if not month.isdigit():
165 continue
125 if self.online:
126 log.log("Searching files in online mode...", self.name)
166 127
167 day = thisFile[9:11]
168 if not day.isdigit():
169 continue
128 for nTries in range(self.nTries):
129 fullpath = self.searchFilesOnLine(self.path, self.startDate,
130 self.endDate, self.expLabel, self.ext, self.walk,
131 self.filefmt, self.folderfmt)
170 132
171 year, month, day = int(year), int(month), int(day)
172 dateFile = datetime.date(year, month, day)
133 try:
134 fullpath = next(fullpath)
135 except:
136 fullpath = None
137
138 if fullpath:
139 break
173 140
174 if (startDate > dateFile) or (endDate < dateFile):
175 continue
141 log.warning(
142 'Waiting {} sec for a valid file in {}: try {} ...'.format(
143 self.delay, self.path, nTries + 1),
144 self.name)
145 time.sleep(self.delay)
146
147 if not(fullpath):
148 raise schainpy.admin.SchainError(
149 'There isn\'t any valid file in {}'.format(self.path))
150
151 else:
152 log.log("Searching files in {}".format(self.path), self.name)
153 self.filenameList = self.searchFilesOffLine(self.path, self.startDate,
154 self.endDate, self.expLabel, self.ext, self.walk, self.filefmt, self.folderfmt)
155
156 self.setNextFile()
176 157
177 self.fileList.append(thisFile)
178 self.dateFileList.append(dateFile)
158 def readFirstHeader(self):
159 '''Read header and data'''
179 160
180 return
161 self.parseHeader()
162 self.parseData()
163 self.blockIndex = 0
164
165 return
181 166
182 167 def parseHeader(self):
183 168 '''
@@ -237,51 +222,13 class MADReader(JRODataReader, ProcessingUnit):
237 222 self.times = numpy.unique(self.data['Table Layout']['ut1_unix'])
238 223 self.counter_records = int(self.data['Table Layout']['recno'][0])
239 224 self.nrecords = int(self.data['Table Layout']['recno'][-1])
240
241 def setNextFile(self):
242 '''
243 '''
244
245 file_id = self.fileId
246
247 if file_id == len(self.fileList):
248 log.success('No more files', 'MADReader')
249 self.flagNoMoreFiles = 1
250 return 0
251
252 log.success(
253 'Opening: {}'.format(self.fileList[file_id]),
254 'MADReader'
255 )
256
257 filename = os.path.join(self.path, self.fileList[file_id])
258
259 if self.filename is not None:
260 self.fp.close()
261
262 self.filename = filename
263 self.filedate = self.dateFileList[file_id]
264
265 if self.ext=='.hdf5':
266 self.fp = h5py.File(self.filename, 'r')
267 else:
268 self.fp = open(self.filename, 'rb')
269
270 self.parseHeader()
271 self.parseData()
272 self.sizeOfFile = os.path.getsize(self.filename)
273 self.flagIsNewFile = 0
274 self.fileId += 1
275
276 return 1
277 225
278 226 def readNextBlock(self):
279 227
280 228 while True:
281 229 self.flagDiscontinuousBlock = 0
282 if self.flagIsNewFile:
283 if not self.setNextFile():
284 return 0
230 if self.counter_records == self.nrecords:
231 self.setNextFile()
285 232
286 233 self.readBlock()
287 234
@@ -321,7 +268,6 class MADReader(JRODataReader, ProcessingUnit):
321 268 dum.append(self.data[self.counter_records])
322 269 self.counter_records += 1
323 270 if self.counter_records == self.nrecords:
324 self.flagIsNewFile = True
325 271 break
326 272 continue
327 273 self.intervals.add((datatime-self.datatime).seconds)
@@ -334,9 +280,7 class MADReader(JRODataReader, ProcessingUnit):
334 280 if datatime.date()>self.datatime.date():
335 281 self.flagDiscontinuousBlock = 1
336 282 self.datatime = datatime
337 self.counter_records += 1
338 if self.counter_records == self.nrecords:
339 self.flagIsNewFile = True
283 self.counter_records += 1
340 284
341 285 self.buffer = numpy.array(dum)
342 286 return
@@ -390,10 +334,6 class MADReader(JRODataReader, ProcessingUnit):
390 334 '''
391 335 Storing data from databuffer to dataOut object
392 336 '''
393 if self.flagNoMoreFiles:
394 self.dataOut.flagNoData = True
395 raise schainpy.admin.SchainError('No file left to process')
396 return 0
397 337
398 338 if not self.readNextBlock():
399 339 self.dataOut.flagNoData = True
@@ -403,10 +343,49 class MADReader(JRODataReader, ProcessingUnit):
403 343
404 344 return 1
405 345
346 def run(self, **kwargs):
347
348 if not(self.isConfig):
349 self.setup(**kwargs)
350 self.isConfig = True
351
352 self.getData()
353
354 return
355
406 356 @MPDecorator
407 357 class MADWriter(Operation):
408
409 missing = -32767
358 '''Writing module for Madrigal files
359
360 type: external
361
362 Inputs:
363 path path where files will be created
364 oneDDict json of one-dimensional parameters in record where keys
365 are Madrigal codes (integers or mnemonics) and values the corresponding
366 dataOut attribute e.g: {
367 'gdlatr': 'lat',
368 'gdlonr': 'lon',
369 'gdlat2':'lat',
370 'glon2':'lon'}
371 ind2DList list of independent spatial two-dimensional parameters e.g:
372 ['heightList']
373 twoDDict json of two-dimensional parameters in record where keys
374 are Madrigal codes (integers or mnemonics) and values the corresponding
375 dataOut attribute, if multidimensional array specify as tuple
376 ('attr', pos) e.g: {
377 'gdalt': 'heightList',
378 'vn1p2': ('data_output', 0),
379 'vn2p2': ('data_output', 1),
380 'vn3': ('data_output', 2),
381 'snl': ('data_SNR', 'db')
382 }
383 metadata json of madrigal metadata (kinst, kindat, catalog and header)
384 format hdf5, cedar
385 blocks number of blocks per file'''
386
387 __attrs__ = ['path', 'oneDDict', 'ind2DList', 'twoDDict','metadata', 'format', 'blocks']
388 missing = -32767
410 389
411 390 def __init__(self):
412 391
@@ -416,41 +395,18 class MADWriter(Operation):
416 395 self.path = None
417 396 self.fp = None
418 397
419 def run(self, dataOut, path, oneDDict, independentParam='[]', twoDDict='{}',
398 def run(self, dataOut, path, oneDDict, ind2DList='[]', twoDDict='{}',
420 399 metadata='{}', format='cedar', **kwargs):
421 '''
422 Inputs:
423 path - path where files will be created
424 oneDDict - json of one-dimensional parameters in record where keys
425 are Madrigal codes (integers or mnemonics) and values the corresponding
426 dataOut attribute e.g: {
427 'gdlatr': 'lat',
428 'gdlonr': 'lon',
429 'gdlat2':'lat',
430 'glon2':'lon'}
431 independentParam - list of independent spatial two-dimensional parameters e.g:
432 ['heigthList']
433 twoDDict - json of two-dimensional parameters in record where keys
434 are Madrigal codes (integers or mnemonics) and values the corresponding
435 dataOut attribute if multidimensional array specify as tupple
436 ('attr', pos) e.g: {
437 'gdalt': 'heightList',
438 'vn1p2': ('data_output', 0),
439 'vn2p2': ('data_output', 1),
440 'vn3': ('data_output', 2),
441 'snl': ('data_SNR', 'db')
442 }
443 metadata - json of madrigal metadata (kinst, kindat, catalog and header)
444 '''
400
445 401 if not self.isConfig:
446 self.setup(path, oneDDict, independentParam, twoDDict, metadata, format, **kwargs)
402 self.setup(path, oneDDict, ind2DList, twoDDict, metadata, format, **kwargs)
447 403 self.isConfig = True
448 404
449 405 self.dataOut = dataOut
450 406 self.putData()
451 407 return 1
452 408
453 def setup(self, path, oneDDict, independentParam, twoDDict, metadata, format, **kwargs):
409 def setup(self, path, oneDDict, ind2DList, twoDDict, metadata, format, **kwargs):
454 410 '''
455 411 Configure Operation
456 412 '''
@@ -460,7 +416,7 class MADWriter(Operation):
460 416 self.counter = 0
461 417 self.oneDDict = load_json(oneDDict)
462 418 self.twoDDict = load_json(twoDDict)
463 self.independentParam = load_json(independentParam)
419 self.ind2DList = load_json(ind2DList)
464 420 meta = load_json(metadata)
465 421 self.kinst = meta.get('kinst')
466 422 self.kindat = meta.get('kindat')
@@ -471,7 +427,7 class MADWriter(Operation):
471 427 self.extra_args = {}
472 428 elif format == 'hdf5':
473 429 self.ext = '.hdf5'
474 self.extra_args = {'independentParam': self.independentParam}
430 self.extra_args = {'ind2DList': self.ind2DList}
475 431
476 432 self.keys = [k.lower() for k in self.twoDDict]
477 433 if 'range' in self.keys:
@@ -504,6 +460,8 class MADWriter(Operation):
504 460 log.success(
505 461 'Creating file: {}'.format(self.fullname),
506 462 'MADWriter')
463 if not os.path.exists(self.path):
464 os.makedirs(self.path)
507 465 self.fp = madrigal.cedar.MadrigalCedarFile(self.fullname, True)
508 466 except ValueError as e:
509 467 log.error(
@@ -546,11 +504,11 class MADWriter(Operation):
546 504 tmp = 10*numpy.log10(SNRavg)
547 505 else:
548 506 tmp = getattr(self.dataOut, value)
549 out[key] = tmp.flatten()
507 out[key] = tmp.flatten()[:len(heights)]
550 508 elif isinstance(value, (tuple, list)):
551 509 attr, x = value
552 data = getattr(self.dataOut, attr)
553 out[key] = data[int(x)]
510 data = getattr(self.dataOut, attr)
511 out[key] = data[int(x)][:len(heights)]
554 512
555 513 a = numpy.array([out[k] for k in self.keys])
556 514 nrows = numpy.array([numpy.isnan(a[:, x]).all() for x in range(len(heights))])
@@ -966,218 +966,82 class ParamWriter(Operation):
966 966
967 967
968 968 @MPDecorator
969 class ParameterReader(JRODataReader,ProcessingUnit):
969 class ParameterReader(Reader, ProcessingUnit):
970 970 '''
971 971 Reads HDF5 format files
972 972 '''
973 973
974 ext = ".hdf5"
975 optchar = "D"
976 timezone = None
977 startTime = None
978 endTime = None
979 fileIndex = None
980 blockList = None #List to blocks to be read from the file
981 blocksPerFile = None #Number of blocks to be read
982 blockIndex = None
983 path = None
984 #List of Files
985 filenameList = None
986 datetimeList = None
987 #Hdf5 File
988 listMetaname = None
989 listMeta = None
990 listDataname = None
991 listData = None
992 listShapes = None
993 fp = None
994 #dataOut reconstruction
995 dataOut = None
996
997 974 def __init__(self):
998 975 ProcessingUnit.__init__(self)
999 976 self.dataOut = Parameters()
1000 return
977 self.ext = ".hdf5"
978 self.optchar = "D"
979 self.timezone = "lt"
980 self.listMetaname = []
981 self.listMeta = []
982 self.listDataname = []
983 self.listData = []
984 self.listShapes = []
985 self.open_file = h5py.File
986 self.open_mode = 'r'
987 self.metadata = False
988 self.filefmt = "*%Y%j***"
989 self.folderfmt = "*%Y%j"
1001 990
1002 991 def setup(self, **kwargs):
1003 992
1004 path = kwargs['path']
1005 startDate = kwargs['startDate']
1006 endDate = kwargs['endDate']
1007 startTime = kwargs['startTime']
1008 endTime = kwargs['endTime']
1009 walk = kwargs['walk']
1010 if 'ext' in kwargs:
1011 ext = kwargs['ext']
1012 else:
1013 ext = '.hdf5'
1014 if 'timezone' in kwargs:
1015 self.timezone = kwargs['timezone']
1016 else:
1017 self.timezone = 'lt'
1018
1019 print("[Reading] Searching files in offline mode ...")
1020 pathList, filenameList = self.searchFilesOffLine(path, startDate=startDate, endDate=endDate,
1021 startTime=startTime, endTime=endTime,
1022 ext=ext, walk=walk)
993 self.set_kwargs(**kwargs)
994 if not self.ext.startswith('.'):
995 self.ext = '.{}'.format(self.ext)
1023 996
1024 if not(filenameList):
1025 print("There is no files into the folder: %s"%(path))
1026 sys.exit(-1)
1027
1028 self.fileIndex = -1
1029 self.startTime = startTime
1030 self.endTime = endTime
1031 self.__readMetadata()
1032 self.__setNextFileOffline()
1033
1034 return
997 if self.online:
998 log.log("Searching files in online mode...", self.name)
1035 999
1036 def searchFilesOffLine(self, path, startDate=None, endDate=None, startTime=datetime.time(0,0,0), endTime=datetime.time(23,59,59), ext='.hdf5', walk=True):
1000 for nTries in range(self.nTries):
1001 fullpath = self.searchFilesOnLine(self.path, self.startDate,
1002 self.endDate, self.expLabel, self.ext, self.walk,
1003 self.filefmt, self.folderfmt)
1037 1004
1038 expLabel = ''
1039 self.filenameList = []
1040 self.datetimeList = []
1041 pathList = []
1042 dateList, pathList = self.findDatafiles(path, startDate, endDate, expLabel, ext, walk, include_path=True)
1043
1044 if dateList == []:
1045 print("[Reading] No *%s files in %s from %s to %s)"%(ext, path,
1046 datetime.datetime.combine(startDate,startTime).ctime(),
1047 datetime.datetime.combine(endDate,endTime).ctime()))
1048
1049 return None, None
1050
1051 if len(dateList) > 1:
1052 print("[Reading] %d days were found in date range: %s - %s" %(len(dateList), startDate, endDate))
1005 try:
1006 fullpath = next(fullpath)
1007 except:
1008 fullpath = None
1009
1010 if fullpath:
1011 break
1012
1013 log.warning(
1014 'Waiting {} sec for a valid file in {}: try {} ...'.format(
1015 self.delay, self.path, nTries + 1),
1016 self.name)
1017 time.sleep(self.delay)
1018
1019 if not(fullpath):
1020 raise schainpy.admin.SchainError(
1021 'There isn\'t any valid file in {}'.format(self.path))
1022
1023 pathname, filename = os.path.split(fullpath)
1024 self.year = int(filename[1:5])
1025 self.doy = int(filename[5:8])
1026 self.set = int(filename[8:11]) - 1
1053 1027 else:
1054 print("[Reading] data was found for the date %s" %(dateList[0]))
1055
1056 filenameList = []
1057 datetimeList = []
1058
1059 for thisPath in pathList:
1060
1061 fileList = glob.glob1(thisPath, "*%s" %ext)
1062 fileList.sort()
1063
1064 for file in fileList:
1065
1066 filename = os.path.join(thisPath,file)
1067
1068 if not isFileInDateRange(filename, startDate, endDate):
1069 continue
1070
1071 thisDatetime = self.__isFileInTimeRange(filename, startDate, endDate, startTime, endTime)
1072
1073 if not(thisDatetime):
1074 continue
1075
1076 filenameList.append(filename)
1077 datetimeList.append(thisDatetime)
1078
1079 if not(filenameList):
1080 print("[Reading] Any file was found int time range %s - %s" %(datetime.datetime.combine(startDate,startTime).ctime(), datetime.datetime.combine(endDate,endTime).ctime()))
1081 return None, None
1082
1083 print("[Reading] %d file(s) was(were) found in time range: %s - %s" %(len(filenameList), startTime, endTime))
1084 print()
1085
1086 self.filenameList = filenameList
1087 self.datetimeList = datetimeList
1088
1089 return pathList, filenameList
1090
1091 def __isFileInTimeRange(self,filename, startDate, endDate, startTime, endTime):
1092
1093 """
1094 Retorna 1 si el archivo de datos se encuentra dentro del rango de horas especificado.
1095
1096 Inputs:
1097 filename : nombre completo del archivo de datos en formato Jicamarca (.r)
1098 startDate : fecha inicial del rango seleccionado en formato datetime.date
1099 endDate : fecha final del rango seleccionado en formato datetime.date
1100 startTime : tiempo inicial del rango seleccionado en formato datetime.time
1101 endTime : tiempo final del rango seleccionado en formato datetime.time
1102
1103 Return:
1104 Boolean : Retorna True si el archivo de datos contiene datos en el rango de
1105 fecha especificado, de lo contrario retorna False.
1106
1107 Excepciones:
1108 Si el archivo no existe o no puede ser abierto
1109 Si la cabecera no puede ser leida.
1110
1111 """
1112
1113 try:
1114 fp = h5py.File(filename, 'r')
1115 grp1 = fp['Data']
1116
1117 except IOError:
1118 traceback.print_exc()
1119 raise IOError("The file %s can't be opened" %(filename))
1120 #In case has utctime attribute
1121 grp2 = grp1['utctime']
1122 thisUtcTime = grp2.value[0]
1123
1124 fp.close()
1125
1126 if self.timezone == 'lt':
1127 thisUtcTime -= 5*3600
1128
1129 thisDatetime = datetime.datetime.fromtimestamp(thisUtcTime + 5*3600)
1130 thisDate = thisDatetime.date()
1131 thisTime = thisDatetime.time()
1132
1133 startUtcTime = (datetime.datetime.combine(thisDate,startTime)- datetime.datetime(1970, 1, 1)).total_seconds()
1134 endUtcTime = (datetime.datetime.combine(thisDate,endTime)- datetime.datetime(1970, 1, 1)).total_seconds()
1135
1136 #General case
1137 # o>>>>>>>>>>>>>><<<<<<<<<<<<<<o
1138 #-----------o----------------------------o-----------
1139 # startTime endTime
1140
1141 if endTime >= startTime:
1142 thisUtcLog = numpy.logical_and(thisUtcTime > startUtcTime, thisUtcTime < endUtcTime)
1143 if numpy.any(thisUtcLog): #If there is one block between the hours mentioned
1144 return thisDatetime
1145 return None
1146
1147 #If endTime < startTime then endTime belongs to the next day
1148 #<<<<<<<<<<<o o>>>>>>>>>>>
1149 #-----------o----------------------------o-----------
1150 # endTime startTime
1151
1152 if (thisDate == startDate) and numpy.all(thisUtcTime < startUtcTime):
1153 return None
1154
1155 if (thisDate == endDate) and numpy.all(thisUtcTime > endUtcTime):
1156 return None
1157
1158 if numpy.all(thisUtcTime < startUtcTime) and numpy.all(thisUtcTime > endUtcTime):
1159 return None
1160
1161 return thisDatetime
1162
1163 def __setNextFileOffline(self):
1164
1165 self.fileIndex += 1
1166 idFile = self.fileIndex
1167
1168 if not(idFile < len(self.filenameList)):
1169 raise schainpy.admin.SchainError('No more files')
1028 log.log("Searching files in {}".format(self.path), self.name)
1029 self.filenameList = self.searchFilesOffLine(self.path, self.startDate,
1030 self.endDate, self.expLabel, self.ext, self.walk, self.filefmt, self.folderfmt)
1031
1032 self.setNextFile()
1170 1033
1171 filename = self.filenameList[idFile]
1172 self.fp = h5py.File(filename, 'r')
1173 self.filename = filename
1034 return
1174 1035
1175 print("Setting the file: %s"%self.filename)
1036 def readFirstHeader(self):
1037 '''Read metadata and data'''
1176 1038
1177 self.__setBlockList()
1039 self.__readMetadata()
1178 1040 self.__readData()
1041 self.__setBlockList()
1179 1042 self.blockIndex = 0
1180 return 1
1043
1044 return
1181 1045
1182 1046 def __setBlockList(self):
1183 1047 '''
@@ -1190,12 +1054,13 class ParameterReader(JRODataReader,ProcessingUnit):
1190 1054 self.blocksPerFile
1191 1055
1192 1056 '''
1193 fp = self.fp
1057
1194 1058 startTime = self.startTime
1195 1059 endTime = self.endTime
1196 1060
1197 grp = fp['Data']
1198 thisUtcTime = grp['utctime'].value
1061 index = self.listDataname.index('utctime')
1062 thisUtcTime = self.listData[index]
1063 self.interval = numpy.min(thisUtcTime[1:] - thisUtcTime[:-1])
1199 1064
1200 1065 if self.timezone == 'lt':
1201 1066 thisUtcTime -= 5*3600
@@ -1219,51 +1084,78 class ParameterReader(JRODataReader,ProcessingUnit):
1219 1084 Reads Metadata
1220 1085 '''
1221 1086
1222 filename = self.filenameList[0]
1223 fp = h5py.File(filename, 'r')
1224 gp = fp['Metadata']
1225 1087 listMetaname = []
1226 1088 listMetadata = []
1227
1228 for item in list(gp.items()):
1229 name = item[0]
1230
1231 if name=='variables':
1232 table = gp[name][:]
1233 listShapes = {}
1234 for shapes in table:
1235 listShapes[shapes[0].decode()] = numpy.array([shapes[1]])
1236 else:
1237 data = gp[name].value
1238 listMetaname.append(name)
1239 listMetadata.append(data)
1089 if 'Metadata' in self.fp:
1090 gp = self.fp['Metadata']
1091 for item in list(gp.items()):
1092 name = item[0]
1093
1094 if name=='variables':
1095 table = gp[name][:]
1096 listShapes = {}
1097 for shapes in table:
1098 listShapes[shapes[0].decode()] = numpy.array([shapes[1]])
1099 else:
1100 data = gp[name].value
1101 listMetaname.append(name)
1102 listMetadata.append(data)
1103 elif self.metadata:
1104 metadata = json.loads(self.metadata)
1105 listShapes = {}
1106 for tup in metadata:
1107 name, values, dim = tup
1108 if dim == -1:
1109 listMetaname.append(name)
1110 listMetadata.append(self.fp[values].value)
1111 else:
1112 listShapes[name] = numpy.array([dim])
1113 else:
1114 raise IOError('Missing Metadata group in file or metadata info')
1240 1115
1241 1116 self.listShapes = listShapes
1242 1117 self.listMetaname = listMetaname
1243 self.listMeta = listMetadata
1118 self.listMeta = listMetadata
1244 1119
1245 fp.close()
1246 1120 return
1247 1121
1248 1122 def __readData(self):
1249 1123
1250 grp = self.fp['Data']
1251 1124 listdataname = []
1252 1125 listdata = []
1253
1254 for item in list(grp.items()):
1255 name = item[0]
1256 listdataname.append(name)
1257 dim = self.listShapes[name][0]
1258 if dim == 0:
1259 array = grp[name].value
1260 else:
1261 array = []
1262 for i in range(dim):
1263 array.append(grp[name]['table{:02d}'.format(i)].value)
1264 array = numpy.array(array)
1265
1266 listdata.append(array)
1126
1127 if 'Data' in self.fp:
1128 grp = self.fp['Data']
1129 for item in list(grp.items()):
1130 name = item[0]
1131 listdataname.append(name)
1132 dim = self.listShapes[name][0]
1133 if dim == 0:
1134 array = grp[name].value
1135 else:
1136 array = []
1137 for i in range(dim):
1138 array.append(grp[name]['table{:02d}'.format(i)].value)
1139 array = numpy.array(array)
1140
1141 listdata.append(array)
1142 elif self.metadata:
1143 metadata = json.loads(self.metadata)
1144 for tup in metadata:
1145 name, values, dim = tup
1146 listdataname.append(name)
1147 if dim == -1:
1148 continue
1149 elif dim == 0:
1150 array = self.fp[values].value
1151 else:
1152 array = []
1153 for var in values:
1154 array.append(self.fp[var].value)
1155 array = numpy.array(array)
1156 listdata.append(array)
1157 else:
1158 raise IOError('Missing Data group in file or metadata info')
1267 1159
1268 1160 self.listDataname = listdataname
1269 1161 self.listData = listdata
@@ -1281,6 +1173,7 class ParameterReader(JRODataReader,ProcessingUnit):
1281 1173 else:
1282 1174 setattr(self.dataOut, self.listDataname[j], self.listData[j][:,self.blockIndex])
1283 1175
1176 self.dataOut.paramInterval = self.interval
1284 1177 self.dataOut.flagNoData = False
1285 1178 self.blockIndex += 1
1286 1179
@@ -1293,9 +1186,7 class ParameterReader(JRODataReader,ProcessingUnit):
1293 1186 self.isConfig = True
1294 1187
1295 1188 if self.blockIndex == self.blocksPerFile:
1296 if not(self.__setNextFileOffline()):
1297 self.dataOut.flagNoData = True
1298 return 0
1189 self.setNextFile()
1299 1190
1300 1191 self.getData()
1301 1192
@@ -42,6 +42,8 class ProcessingUnit(object):
42 42
43 43
44 44 """
45 proc_type = 'processing'
46 __attrs__ = []
45 47
46 48 def __init__(self):
47 49
@@ -128,6 +130,8 class Operation(object):
128 130 Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer)
129 131
130 132 """
133 proc_type = 'operation'
134 __attrs__ = []
131 135
132 136 def __init__(self):
133 137
@@ -178,54 +182,31 class Operation(object):
178 182 class InputQueue(Thread):
179 183
180 184 '''
181
182 185 Class to hold input data for Proccessing Units and external Operations,
183
184 186 '''
185 187
186
187
188 188 def __init__(self, project_id, inputId):
189 189
190 190 Thread.__init__(self)
191
192 191 self.queue = Queue()
193
194 192 self.project_id = project_id
195
196 193 self.inputId = inputId
197 194
198
199
200 195 def run(self):
201 196
202
203
204 197 c = zmq.Context()
205
206 198 self.receiver = c.socket(zmq.SUB)
207
208 199 self.receiver.connect(
209
210 200 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
211
212 201 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
213
214
215
202
216 203 while True:
217
218 204 self.queue.put(self.receiver.recv_multipart()[1])
219 205
220
221
222 206 def get(self):
223
224
225
207
226 208 return pickle.loads(self.queue.get())
227
228
209
229 210
230 211 def MPDecorator(BaseClass):
231 212 """
@@ -248,6 +229,7 def MPDecorator(BaseClass):
248 229 self.i = 0
249 230 self.t = time.time()
250 231 self.name = BaseClass.__name__
232 self.__doc__ = BaseClass.__doc__
251 233
252 234 if 'plot' in self.name.lower() and not self.name.endswith('_'):
253 235 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
General Comments 0
You need to be logged in to leave comments. Login now