##// END OF EJS Templates
Update ParamReader to support different HDF5 files, fix Read/Write Madrigal files
jespinoza -
r1254:6b25d3b79646
parent child
Show More
@@ -627,48 +627,3 class AMISRReader(ProcessingUnit):
627 627 self.isConfig = True
628 628
629 629 self.getData()
630
631 class Writer(Operation):
632 '''
633 classdocs
634 '''
635
636 def __init__(self):
637 '''
638 Constructor
639 '''
640 self.dataOut = None
641
642 self.isConfig = False
643
644 def setup(self, dataIn, path, blocksPerFile, set=0, ext=None):
645 '''
646 In this method we should set all initial parameters.
647
648 Input:
649 dataIn : Input data will also be outputa data
650
651 '''
652 self.dataOut = dataIn
653
654
655
656
657
658 self.isConfig = True
659
660 return
661
662 def run(self, dataIn, **kwargs):
663 '''
664 This method will be called many times so here you should put all your code
665
666 Inputs:
667
668 dataIn : object with the data
669
670 '''
671
672 if not self.isConfig:
673 self.setup(dataIn, **kwargs)
674 No newline at end of file
@@ -15,7 +15,7 import numpy
15 15 import h5py
16 16
17 17 import schainpy.admin
18 from schainpy.model.io.jroIO_base import LOCALTIME, JRODataReader, JRODataWriter
18 from schainpy.model.io.jroIO_base import LOCALTIME, Reader
19 19 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
20 20 from schainpy.model.data.jrodata import Parameters
21 21 from schainpy.utils import log
@@ -81,7 +81,7 def load_json(obj):
81 81 return iterable
82 82
83 83 @MPDecorator
84 class MADReader(JRODataReader, ProcessingUnit):
84 class MADReader(Reader, ProcessingUnit):
85 85
86 86 def __init__(self):
87 87
@@ -91,91 +91,76 class MADReader(JRODataReader, ProcessingUnit):
91 91 self.counter_records = 0
92 92 self.nrecords = None
93 93 self.flagNoMoreFiles = 0
94 self.isConfig = False
95 94 self.filename = None
96 95 self.intervals = set()
96 self.datatime = datetime.datetime(1900,1,1)
97 self.format = None
98 self.filefmt = "***%Y%m%d*******"
97 99
98 def setup(self,
99 path=None,
100 startDate=None,
101 endDate=None,
102 format=None,
103 startTime=datetime.time(0, 0, 0),
104 endTime=datetime.time(23, 59, 59),
105 **kwargs):
100 def setup(self, **kwargs):
106 101
107 self.path = path
108 self.startDate = startDate
109 self.endDate = endDate
110 self.startTime = startTime
111 self.endTime = endTime
112 self.datatime = datetime.datetime(1900,1,1)
113 self.oneDDict = load_json(kwargs.get('oneDDict',
114 "{\"GDLATR\":\"lat\", \"GDLONR\":\"lon\"}"))
115 self.twoDDict = load_json(kwargs.get('twoDDict',
116 "{\"GDALT\": \"heightList\"}"))
117 self.independentParam = 'GDALT'
102 self.set_kwargs(**kwargs)
103 self.oneDDict = load_json(self.oneDDict)
104 self.twoDDict = load_json(self.twoDDict)
105 self.ind2DList = load_json(self.ind2DList)
106 self.independentParam = self.ind2DList[0]
118 107
119 108 if self.path is None:
120 109 raise ValueError('The path is not valid')
121 110
122 if format is None:
111 self.open_file = open
112 self.open_mode = 'rb'
113
114 if self.format is None:
123 115 raise ValueError('The format is not valid choose simple or hdf5')
124 elif format.lower() in ('simple', 'txt'):
116 elif self.format.lower() in ('simple', 'txt'):
125 117 self.ext = '.txt'
126 elif format.lower() in ('cedar',):
118 elif self.format.lower() in ('cedar',):
127 119 self.ext = '.001'
128 120 else:
129 121 self.ext = '.hdf5'
122 self.open_file = h5py.File
123 self.open_mode = 'r'
130 124
131 self.search_files(self.path)
132 self.fileId = 0
133
134 if not self.fileList:
135 raise Warning('There is no files matching these date in the folder: {}. \n Check startDate and endDate'.format(path))
136
137 self.setNextFile()
138
139 def search_files(self, path):
140 '''
141 Searching for madrigal files in path
142 Creating a list of files to procces included in [startDate,endDate]
143
144 Input:
145 path - Path to find files
146 '''
125 if self.online:
126 log.log("Searching files in online mode...", self.name)
147 127
148 log.log('Searching files {} in {} '.format(self.ext, path), 'MADReader')
149 fileList0 = glob.glob1(path, '*{}'.format(self.ext))
150 fileList0.sort()
128 for nTries in range(self.nTries):
129 fullpath = self.searchFilesOnLine(self.path, self.startDate,
130 self.endDate, self.expLabel, self.ext, self.walk,
131 self.filefmt, self.folderfmt)
151 132
152 self.fileList = []
153 self.dateFileList = []
133 try:
134 fullpath = next(fullpath)
135 except:
136 fullpath = None
154 137
155 startDate = self.startDate - datetime.timedelta(1)
156 endDate = self.endDate + datetime.timedelta(1)
138 if fullpath:
139 break
157 140
158 for thisFile in fileList0:
159 year = thisFile[3:7]
160 if not year.isdigit():
161 continue
141 log.warning(
142 'Waiting {} sec for a valid file in {}: try {} ...'.format(
143 self.delay, self.path, nTries + 1),
144 self.name)
145 time.sleep(self.delay)
162 146
163 month = thisFile[7:9]
164 if not month.isdigit():
165 continue
147 if not(fullpath):
148 raise schainpy.admin.SchainError(
149 'There isn\'t any valid file in {}'.format(self.path))
166 150
167 day = thisFile[9:11]
168 if not day.isdigit():
169 continue
151 else:
152 log.log("Searching files in {}".format(self.path), self.name)
153 self.filenameList = self.searchFilesOffLine(self.path, self.startDate,
154 self.endDate, self.expLabel, self.ext, self.walk, self.filefmt, self.folderfmt)
170 155
171 year, month, day = int(year), int(month), int(day)
172 dateFile = datetime.date(year, month, day)
156 self.setNextFile()
173 157
174 if (startDate > dateFile) or (endDate < dateFile):
175 continue
158 def readFirstHeader(self):
159 '''Read header and data'''
176 160
177 self.fileList.append(thisFile)
178 self.dateFileList.append(dateFile)
161 self.parseHeader()
162 self.parseData()
163 self.blockIndex = 0
179 164
180 165 return
181 166
@@ -238,50 +223,12 class MADReader(JRODataReader, ProcessingUnit):
238 223 self.counter_records = int(self.data['Table Layout']['recno'][0])
239 224 self.nrecords = int(self.data['Table Layout']['recno'][-1])
240 225
241 def setNextFile(self):
242 '''
243 '''
244
245 file_id = self.fileId
246
247 if file_id == len(self.fileList):
248 log.success('No more files', 'MADReader')
249 self.flagNoMoreFiles = 1
250 return 0
251
252 log.success(
253 'Opening: {}'.format(self.fileList[file_id]),
254 'MADReader'
255 )
256
257 filename = os.path.join(self.path, self.fileList[file_id])
258
259 if self.filename is not None:
260 self.fp.close()
261
262 self.filename = filename
263 self.filedate = self.dateFileList[file_id]
264
265 if self.ext=='.hdf5':
266 self.fp = h5py.File(self.filename, 'r')
267 else:
268 self.fp = open(self.filename, 'rb')
269
270 self.parseHeader()
271 self.parseData()
272 self.sizeOfFile = os.path.getsize(self.filename)
273 self.flagIsNewFile = 0
274 self.fileId += 1
275
276 return 1
277
278 226 def readNextBlock(self):
279 227
280 228 while True:
281 229 self.flagDiscontinuousBlock = 0
282 if self.flagIsNewFile:
283 if not self.setNextFile():
284 return 0
230 if self.counter_records == self.nrecords:
231 self.setNextFile()
285 232
286 233 self.readBlock()
287 234
@@ -321,7 +268,6 class MADReader(JRODataReader, ProcessingUnit):
321 268 dum.append(self.data[self.counter_records])
322 269 self.counter_records += 1
323 270 if self.counter_records == self.nrecords:
324 self.flagIsNewFile = True
325 271 break
326 272 continue
327 273 self.intervals.add((datatime-self.datatime).seconds)
@@ -335,8 +281,6 class MADReader(JRODataReader, ProcessingUnit):
335 281 self.flagDiscontinuousBlock = 1
336 282 self.datatime = datatime
337 283 self.counter_records += 1
338 if self.counter_records == self.nrecords:
339 self.flagIsNewFile = True
340 284
341 285 self.buffer = numpy.array(dum)
342 286 return
@@ -390,10 +334,6 class MADReader(JRODataReader, ProcessingUnit):
390 334 '''
391 335 Storing data from databuffer to dataOut object
392 336 '''
393 if self.flagNoMoreFiles:
394 self.dataOut.flagNoData = True
395 raise schainpy.admin.SchainError('No file left to process')
396 return 0
397 337
398 338 if not self.readNextBlock():
399 339 self.dataOut.flagNoData = True
@@ -403,34 +343,34 class MADReader(JRODataReader, ProcessingUnit):
403 343
404 344 return 1
405 345
406 @MPDecorator
407 class MADWriter(Operation):
346 def run(self, **kwargs):
408 347
409 missing = -32767
348 if not(self.isConfig):
349 self.setup(**kwargs)
350 self.isConfig = True
410 351
411 def __init__(self):
352 self.getData()
412 353
413 Operation.__init__(self)
414 self.dataOut = Parameters()
415 self.counter = 0
416 self.path = None
417 self.fp = None
354 return
355
356 @MPDecorator
357 class MADWriter(Operation):
358 '''Writing module for Madrigal files
359
360 type: external
418 361
419 def run(self, dataOut, path, oneDDict, independentParam='[]', twoDDict='{}',
420 metadata='{}', format='cedar', **kwargs):
421 '''
422 362 Inputs:
423 path - path where files will be created
424 oneDDict - json of one-dimensional parameters in record where keys
363 path path where files will be created
364 oneDDict json of one-dimensional parameters in record where keys
425 365 are Madrigal codes (integers or mnemonics) and values the corresponding
426 366 dataOut attribute e.g: {
427 367 'gdlatr': 'lat',
428 368 'gdlonr': 'lon',
429 369 'gdlat2':'lat',
430 370 'glon2':'lon'}
431 independentParam - list of independent spatial two-dimensional parameters e.g:
371 ind2DList list of independent spatial two-dimensional parameters e.g:
432 372 ['heigthList']
433 twoDDict - json of two-dimensional parameters in record where keys
373 twoDDict json of two-dimensional parameters in record where keys
434 374 are Madrigal codes (integers or mnemonics) and values the corresponding
435 375 dataOut attribute if multidimensional array specify as tupple
436 376 ('attr', pos) e.g: {
@@ -440,17 +380,33 class MADWriter(Operation):
440 380 'vn3': ('data_output', 2),
441 381 'snl': ('data_SNR', 'db')
442 382 }
443 metadata - json of madrigal metadata (kinst, kindat, catalog and header)
444 '''
383 metadata json of madrigal metadata (kinst, kindat, catalog and header)
384 format hdf5, cedar
385 blocks number of blocks per file'''
386
387 __attrs__ = ['path', 'oneDDict', 'ind2DList', 'twoDDict','metadata', 'format', 'blocks']
388 missing = -32767
389
390 def __init__(self):
391
392 Operation.__init__(self)
393 self.dataOut = Parameters()
394 self.counter = 0
395 self.path = None
396 self.fp = None
397
398 def run(self, dataOut, path, oneDDict, ind2DList='[]', twoDDict='{}',
399 metadata='{}', format='cedar', **kwargs):
400
445 401 if not self.isConfig:
446 self.setup(path, oneDDict, independentParam, twoDDict, metadata, format, **kwargs)
402 self.setup(path, oneDDict, ind2DList, twoDDict, metadata, format, **kwargs)
447 403 self.isConfig = True
448 404
449 405 self.dataOut = dataOut
450 406 self.putData()
451 407 return 1
452 408
453 def setup(self, path, oneDDict, independentParam, twoDDict, metadata, format, **kwargs):
409 def setup(self, path, oneDDict, ind2DList, twoDDict, metadata, format, **kwargs):
454 410 '''
455 411 Configure Operation
456 412 '''
@@ -460,7 +416,7 class MADWriter(Operation):
460 416 self.counter = 0
461 417 self.oneDDict = load_json(oneDDict)
462 418 self.twoDDict = load_json(twoDDict)
463 self.independentParam = load_json(independentParam)
419 self.ind2DList = load_json(ind2DList)
464 420 meta = load_json(metadata)
465 421 self.kinst = meta.get('kinst')
466 422 self.kindat = meta.get('kindat')
@@ -471,7 +427,7 class MADWriter(Operation):
471 427 self.extra_args = {}
472 428 elif format == 'hdf5':
473 429 self.ext = '.hdf5'
474 self.extra_args = {'independentParam': self.independentParam}
430 self.extra_args = {'ind2DList': self.ind2DList}
475 431
476 432 self.keys = [k.lower() for k in self.twoDDict]
477 433 if 'range' in self.keys:
@@ -504,6 +460,8 class MADWriter(Operation):
504 460 log.success(
505 461 'Creating file: {}'.format(self.fullname),
506 462 'MADWriter')
463 if not os.path.exists(self.path):
464 os.makedirs(self.path)
507 465 self.fp = madrigal.cedar.MadrigalCedarFile(self.fullname, True)
508 466 except ValueError as e:
509 467 log.error(
@@ -546,11 +504,11 class MADWriter(Operation):
546 504 tmp = 10*numpy.log10(SNRavg)
547 505 else:
548 506 tmp = getattr(self.dataOut, value)
549 out[key] = tmp.flatten()
507 out[key] = tmp.flatten()[:len(heights)]
550 508 elif isinstance(value, (tuple, list)):
551 509 attr, x = value
552 510 data = getattr(self.dataOut, attr)
553 out[key] = data[int(x)]
511 out[key] = data[int(x)][:len(heights)]
554 512
555 513 a = numpy.array([out[k] for k in self.keys])
556 514 nrows = numpy.array([numpy.isnan(a[:, x]).all() for x in range(len(heights))])
@@ -966,218 +966,82 class ParamWriter(Operation):
966 966
967 967
968 968 @MPDecorator
969 class ParameterReader(JRODataReader,ProcessingUnit):
969 class ParameterReader(Reader, ProcessingUnit):
970 970 '''
971 971 Reads HDF5 format files
972 972 '''
973 973
974 ext = ".hdf5"
975 optchar = "D"
976 timezone = None
977 startTime = None
978 endTime = None
979 fileIndex = None
980 blockList = None #List to blocks to be read from the file
981 blocksPerFile = None #Number of blocks to be read
982 blockIndex = None
983 path = None
984 #List of Files
985 filenameList = None
986 datetimeList = None
987 #Hdf5 File
988 listMetaname = None
989 listMeta = None
990 listDataname = None
991 listData = None
992 listShapes = None
993 fp = None
994 #dataOut reconstruction
995 dataOut = None
996
997 974 def __init__(self):
998 975 ProcessingUnit.__init__(self)
999 976 self.dataOut = Parameters()
1000 return
977 self.ext = ".hdf5"
978 self.optchar = "D"
979 self.timezone = "lt"
980 self.listMetaname = []
981 self.listMeta = []
982 self.listDataname = []
983 self.listData = []
984 self.listShapes = []
985 self.open_file = h5py.File
986 self.open_mode = 'r'
987 self.metadata = False
988 self.filefmt = "*%Y%j***"
989 self.folderfmt = "*%Y%j"
1001 990
1002 991 def setup(self, **kwargs):
1003 992
1004 path = kwargs['path']
1005 startDate = kwargs['startDate']
1006 endDate = kwargs['endDate']
1007 startTime = kwargs['startTime']
1008 endTime = kwargs['endTime']
1009 walk = kwargs['walk']
1010 if 'ext' in kwargs:
1011 ext = kwargs['ext']
1012 else:
1013 ext = '.hdf5'
1014 if 'timezone' in kwargs:
1015 self.timezone = kwargs['timezone']
1016 else:
1017 self.timezone = 'lt'
1018
1019 print("[Reading] Searching files in offline mode ...")
1020 pathList, filenameList = self.searchFilesOffLine(path, startDate=startDate, endDate=endDate,
1021 startTime=startTime, endTime=endTime,
1022 ext=ext, walk=walk)
1023
1024 if not(filenameList):
1025 print("There is no files into the folder: %s"%(path))
1026 sys.exit(-1)
1027
1028 self.fileIndex = -1
1029 self.startTime = startTime
1030 self.endTime = endTime
1031 self.__readMetadata()
1032 self.__setNextFileOffline()
1033
1034 return
1035
1036 def searchFilesOffLine(self, path, startDate=None, endDate=None, startTime=datetime.time(0,0,0), endTime=datetime.time(23,59,59), ext='.hdf5', walk=True):
1037
1038 expLabel = ''
1039 self.filenameList = []
1040 self.datetimeList = []
1041 pathList = []
1042 dateList, pathList = self.findDatafiles(path, startDate, endDate, expLabel, ext, walk, include_path=True)
1043
1044 if dateList == []:
1045 print("[Reading] No *%s files in %s from %s to %s)"%(ext, path,
1046 datetime.datetime.combine(startDate,startTime).ctime(),
1047 datetime.datetime.combine(endDate,endTime).ctime()))
1048
1049 return None, None
1050
1051 if len(dateList) > 1:
1052 print("[Reading] %d days were found in date range: %s - %s" %(len(dateList), startDate, endDate))
1053 else:
1054 print("[Reading] data was found for the date %s" %(dateList[0]))
1055
1056 filenameList = []
1057 datetimeList = []
1058
1059 for thisPath in pathList:
1060
1061 fileList = glob.glob1(thisPath, "*%s" %ext)
1062 fileList.sort()
1063
1064 for file in fileList:
1065
1066 filename = os.path.join(thisPath,file)
1067
1068 if not isFileInDateRange(filename, startDate, endDate):
1069 continue
1070
1071 thisDatetime = self.__isFileInTimeRange(filename, startDate, endDate, startTime, endTime)
1072
1073 if not(thisDatetime):
1074 continue
1075
1076 filenameList.append(filename)
1077 datetimeList.append(thisDatetime)
1078
1079 if not(filenameList):
1080 print("[Reading] Any file was found int time range %s - %s" %(datetime.datetime.combine(startDate,startTime).ctime(), datetime.datetime.combine(endDate,endTime).ctime()))
1081 return None, None
1082
1083 print("[Reading] %d file(s) was(were) found in time range: %s - %s" %(len(filenameList), startTime, endTime))
1084 print()
1085
1086 self.filenameList = filenameList
1087 self.datetimeList = datetimeList
1088
1089 return pathList, filenameList
1090
1091 def __isFileInTimeRange(self,filename, startDate, endDate, startTime, endTime):
1092
1093 """
1094 Retorna 1 si el archivo de datos se encuentra dentro del rango de horas especificado.
1095
1096 Inputs:
1097 filename : nombre completo del archivo de datos en formato Jicamarca (.r)
1098 startDate : fecha inicial del rango seleccionado en formato datetime.date
1099 endDate : fecha final del rango seleccionado en formato datetime.date
1100 startTime : tiempo inicial del rango seleccionado en formato datetime.time
1101 endTime : tiempo final del rango seleccionado en formato datetime.time
1102
1103 Return:
1104 Boolean : Retorna True si el archivo de datos contiene datos en el rango de
1105 fecha especificado, de lo contrario retorna False.
993 self.set_kwargs(**kwargs)
994 if not self.ext.startswith('.'):
995 self.ext = '.{}'.format(self.ext)
1106 996
1107 Excepciones:
1108 Si el archivo no existe o no puede ser abierto
1109 Si la cabecera no puede ser leida.
997 if self.online:
998 log.log("Searching files in online mode...", self.name)
1110 999
1111 """
1000 for nTries in range(self.nTries):
1001 fullpath = self.searchFilesOnLine(self.path, self.startDate,
1002 self.endDate, self.expLabel, self.ext, self.walk,
1003 self.filefmt, self.folderfmt)
1112 1004
1113 1005 try:
1114 fp = h5py.File(filename, 'r')
1115 grp1 = fp['Data']
1116
1117 except IOError:
1118 traceback.print_exc()
1119 raise IOError("The file %s can't be opened" %(filename))
1120 #In case has utctime attribute
1121 grp2 = grp1['utctime']
1122 thisUtcTime = grp2.value[0]
1123
1124 fp.close()
1125
1126 if self.timezone == 'lt':
1127 thisUtcTime -= 5*3600
1128
1129 thisDatetime = datetime.datetime.fromtimestamp(thisUtcTime + 5*3600)
1130 thisDate = thisDatetime.date()
1131 thisTime = thisDatetime.time()
1132
1133 startUtcTime = (datetime.datetime.combine(thisDate,startTime)- datetime.datetime(1970, 1, 1)).total_seconds()
1134 endUtcTime = (datetime.datetime.combine(thisDate,endTime)- datetime.datetime(1970, 1, 1)).total_seconds()
1135
1136 #General case
1137 # o>>>>>>>>>>>>>><<<<<<<<<<<<<<o
1138 #-----------o----------------------------o-----------
1139 # startTime endTime
1140
1141 if endTime >= startTime:
1142 thisUtcLog = numpy.logical_and(thisUtcTime > startUtcTime, thisUtcTime < endUtcTime)
1143 if numpy.any(thisUtcLog): #If there is one block between the hours mentioned
1144 return thisDatetime
1145 return None
1146
1147 #If endTime < startTime then endTime belongs to the next day
1148 #<<<<<<<<<<<o o>>>>>>>>>>>
1149 #-----------o----------------------------o-----------
1150 # endTime startTime
1151
1152 if (thisDate == startDate) and numpy.all(thisUtcTime < startUtcTime):
1153 return None
1154
1155 if (thisDate == endDate) and numpy.all(thisUtcTime > endUtcTime):
1156 return None
1157
1158 if numpy.all(thisUtcTime < startUtcTime) and numpy.all(thisUtcTime > endUtcTime):
1159 return None
1160
1161 return thisDatetime
1162
1163 def __setNextFileOffline(self):
1164
1165 self.fileIndex += 1
1166 idFile = self.fileIndex
1006 fullpath = next(fullpath)
1007 except:
1008 fullpath = None
1009
1010 if fullpath:
1011 break
1012
1013 log.warning(
1014 'Waiting {} sec for a valid file in {}: try {} ...'.format(
1015 self.delay, self.path, nTries + 1),
1016 self.name)
1017 time.sleep(self.delay)
1018
1019 if not(fullpath):
1020 raise schainpy.admin.SchainError(
1021 'There isn\'t any valid file in {}'.format(self.path))
1022
1023 pathname, filename = os.path.split(fullpath)
1024 self.year = int(filename[1:5])
1025 self.doy = int(filename[5:8])
1026 self.set = int(filename[8:11]) - 1
1027 else:
1028 log.log("Searching files in {}".format(self.path), self.name)
1029 self.filenameList = self.searchFilesOffLine(self.path, self.startDate,
1030 self.endDate, self.expLabel, self.ext, self.walk, self.filefmt, self.folderfmt)
1167 1031
1168 if not(idFile < len(self.filenameList)):
1169 raise schainpy.admin.SchainError('No more files')
1032 self.setNextFile()
1170 1033
1171 filename = self.filenameList[idFile]
1172 self.fp = h5py.File(filename, 'r')
1173 self.filename = filename
1034 return
1174 1035
1175 print("Setting the file: %s"%self.filename)
1036 def readFirstHeader(self):
1037 '''Read metadata and data'''
1176 1038
1177 self.__setBlockList()
1039 self.__readMetadata()
1178 1040 self.__readData()
1041 self.__setBlockList()
1179 1042 self.blockIndex = 0
1180 return 1
1043
1044 return
1181 1045
1182 1046 def __setBlockList(self):
1183 1047 '''
@@ -1190,12 +1054,13 class ParameterReader(JRODataReader,ProcessingUnit):
1190 1054 self.blocksPerFile
1191 1055
1192 1056 '''
1193 fp = self.fp
1057
1194 1058 startTime = self.startTime
1195 1059 endTime = self.endTime
1196 1060
1197 grp = fp['Data']
1198 thisUtcTime = grp['utctime'].value
1061 index = self.listDataname.index('utctime')
1062 thisUtcTime = self.listData[index]
1063 self.interval = numpy.min(thisUtcTime[1:] - thisUtcTime[:-1])
1199 1064
1200 1065 if self.timezone == 'lt':
1201 1066 thisUtcTime -= 5*3600
@@ -1219,12 +1084,10 class ParameterReader(JRODataReader,ProcessingUnit):
1219 1084 Reads Metadata
1220 1085 '''
1221 1086
1222 filename = self.filenameList[0]
1223 fp = h5py.File(filename, 'r')
1224 gp = fp['Metadata']
1225 1087 listMetaname = []
1226 1088 listMetadata = []
1227
1089 if 'Metadata' in self.fp:
1090 gp = self.fp['Metadata']
1228 1091 for item in list(gp.items()):
1229 1092 name = item[0]
1230 1093
@@ -1237,20 +1100,32 class ParameterReader(JRODataReader,ProcessingUnit):
1237 1100 data = gp[name].value
1238 1101 listMetaname.append(name)
1239 1102 listMetadata.append(data)
1103 elif self.metadata:
1104 metadata = json.loads(self.metadata)
1105 listShapes = {}
1106 for tup in metadata:
1107 name, values, dim = tup
1108 if dim == -1:
1109 listMetaname.append(name)
1110 listMetadata.append(self.fp[values].value)
1111 else:
1112 listShapes[name] = numpy.array([dim])
1113 else:
1114 raise IOError('Missing Metadata group in file or metadata info')
1240 1115
1241 1116 self.listShapes = listShapes
1242 1117 self.listMetaname = listMetaname
1243 1118 self.listMeta = listMetadata
1244 1119
1245 fp.close()
1246 1120 return
1247 1121
1248 1122 def __readData(self):
1249 1123
1250 grp = self.fp['Data']
1251 1124 listdataname = []
1252 1125 listdata = []
1253 1126
1127 if 'Data' in self.fp:
1128 grp = self.fp['Data']
1254 1129 for item in list(grp.items()):
1255 1130 name = item[0]
1256 1131 listdataname.append(name)
@@ -1264,6 +1139,23 class ParameterReader(JRODataReader,ProcessingUnit):
1264 1139 array = numpy.array(array)
1265 1140
1266 1141 listdata.append(array)
1142 elif self.metadata:
1143 metadata = json.loads(self.metadata)
1144 for tup in metadata:
1145 name, values, dim = tup
1146 listdataname.append(name)
1147 if dim == -1:
1148 continue
1149 elif dim == 0:
1150 array = self.fp[values].value
1151 else:
1152 array = []
1153 for var in values:
1154 array.append(self.fp[var].value)
1155 array = numpy.array(array)
1156 listdata.append(array)
1157 else:
1158 raise IOError('Missing Data group in file or metadata info')
1267 1159
1268 1160 self.listDataname = listdataname
1269 1161 self.listData = listdata
@@ -1281,6 +1173,7 class ParameterReader(JRODataReader,ProcessingUnit):
1281 1173 else:
1282 1174 setattr(self.dataOut, self.listDataname[j], self.listData[j][:,self.blockIndex])
1283 1175
1176 self.dataOut.paramInterval = self.interval
1284 1177 self.dataOut.flagNoData = False
1285 1178 self.blockIndex += 1
1286 1179
@@ -1293,9 +1186,7 class ParameterReader(JRODataReader,ProcessingUnit):
1293 1186 self.isConfig = True
1294 1187
1295 1188 if self.blockIndex == self.blocksPerFile:
1296 if not(self.__setNextFileOffline()):
1297 self.dataOut.flagNoData = True
1298 return 0
1189 self.setNextFile()
1299 1190
1300 1191 self.getData()
1301 1192
@@ -42,6 +42,8 class ProcessingUnit(object):
42 42
43 43
44 44 """
45 proc_type = 'processing'
46 __attrs__ = []
45 47
46 48 def __init__(self):
47 49
@@ -128,6 +130,8 class Operation(object):
128 130 Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer)
129 131
130 132 """
133 proc_type = 'operation'
134 __attrs__ = []
131 135
132 136 def __init__(self):
133 137
@@ -178,55 +182,32 class Operation(object):
178 182 class InputQueue(Thread):
179 183
180 184 '''
181
182 185 Class to hold input data for Proccessing Units and external Operations,
183
184 186 '''
185 187
186
187
188 188 def __init__(self, project_id, inputId):
189 189
190 190 Thread.__init__(self)
191
192 191 self.queue = Queue()
193
194 192 self.project_id = project_id
195
196 193 self.inputId = inputId
197 194
198
199
200 195 def run(self):
201 196
202
203
204 197 c = zmq.Context()
205
206 198 self.receiver = c.socket(zmq.SUB)
207
208 199 self.receiver.connect(
209
210 200 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
211
212 201 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
213 202
214
215
216 203 while True:
217
218 204 self.queue.put(self.receiver.recv_multipart()[1])
219 205
220
221
222 206 def get(self):
223 207
224
225
226 208 return pickle.loads(self.queue.get())
227 209
228 210
229
230 211 def MPDecorator(BaseClass):
231 212 """
232 213 Multiprocessing class decorator
@@ -248,6 +229,7 def MPDecorator(BaseClass):
248 229 self.i = 0
249 230 self.t = time.time()
250 231 self.name = BaseClass.__name__
232 self.__doc__ = BaseClass.__doc__
251 233
252 234 if 'plot' in self.name.lower() and not self.name.endswith('_'):
253 235 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
General Comments 0
You need to be logged in to leave comments. Login now