##// END OF EJS Templates
Se comenta la línea 181: el atributo self.interval no se usa y genera error en los scripts que utilicen el Reader de HDF5 (jroIO_param.py). Se sugiere añadir en la unidad de lectura el atributo utcoffset=-18000, debido a que el reader trabaja en UTC; si se actualiza a esta versión, se puede hacer la selección de intervalos de tiempo en startTime y endTime.
Alexander Valdez -
r1720:2b23d7682e63
parent child
Show More
@@ -1,650 +1,650
1 1 import os
2 2 import time
3 3 import datetime
4 4
5 5 import numpy
6 6 import h5py
7 7
8 8 import schainpy.admin
9 9 from schainpy.model.data.jrodata import *
10 10 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
11 11 from schainpy.model.io.jroIO_base import *
12 12 from schainpy.utils import log
13 13
14 14
class HDFReader(Reader, ProcessingUnit):
    """Processing unit to read HDF5 format files.

    This unit reads HDF5 files created with the `HDFWriter` operation, which
    contain by default two groups, Data and Metadata; every variable found is
    exposed as a `dataOut` attribute.
    It is possible to read any HDF5 file by giving its structure in the
    `description` parameter; extra values can be added to the metadata with
    the `extras` parameter.

    Parameters:
    -----------
    path : str
        Path where files are located.
    startDate : date
        Start date of the files
    endDate : date
        End date of the files
    startTime : time
        Start time of the files
    endTime : time
        End time of the files
    description : dict, optional
        Dictionary with the description of the HDF5 file
    extras : dict, optional
        Dictionary with extra metadata to be added to `dataOut`
    utcoffset : int, optional
        Seconds added to the timestamps stored in the file. The reader works
        in UTC, so pass e.g. utcoffset=-18000 to select startTime/endTime
        intervals in local time (UTC-5).

    Attention: Be careful, add the attribute `utcoffset` in the last part of
    the reader in order to work in local time without time problems.

    -----------
    utcoffset='-18000'


    Examples
    --------

    desc = {
        'Data': {
            'data_output': ['u', 'v', 'w'],
            'utctime': 'timestamps',
        } ,
        'Metadata': {
            'heightList': 'heights'
        }
    }

    desc = {
        'Data': {
            'data_output': 'winds',
            'utctime': 'timestamps'
        },
        'Metadata': {
            'heightList': 'heights'
        }
    }

    extras = {
        'timeZone': 300
    }

    reader = project.addReadUnit(
        name='HDFReader',
        path='/path/to/files',
        startDate='2019/01/01',
        endDate='2019/01/31',
        startTime='00:00:00',
        endTime='23:59:59',
        utcoffset='-18000'
        # description=json.dumps(desc),
        # extras=json.dumps(extras),
        )

    """

    # 'utcoffset' added: it is a supported kwarg (see __init__/__setBlockList)
    __attrs__ = ['path', 'startDate', 'endDate', 'startTime', 'endTime',
                 'description', 'extras', 'utcoffset']

    def __init__(self):
        ProcessingUnit.__init__(self)
        self.dataOut = Parameters()
        self.ext = ".hdf5"
        self.optchar = "D"
        self.meta = {}
        self.data = {}
        self.open_file = h5py.File
        self.open_mode = 'r'
        self.description = {}
        self.extras = {}
        self.filefmt = "*%Y%j***"
        self.folderfmt = "*%Y%j"
        # Seconds added to the file timestamps; the reader works in UTC
        self.utcoffset = 0

    def setup(self, **kwargs):
        """Configure the reader and locate the first file to read.

        In online mode the search is retried `nTries` times with `delay`
        seconds between tries; offline mode builds the full file list at once.

        Raises
        ------
        schainpy.admin.SchainError
            When no valid file is found in online mode.
        """
        self.set_kwargs(**kwargs)
        if not self.ext.startswith('.'):
            self.ext = '.{}'.format(self.ext)

        if self.online:
            log.log("Searching files in online mode...", self.name)

            for nTries in range(self.nTries):
                fullpath = self.searchFilesOnLine(self.path, self.startDate,
                    self.endDate, self.expLabel, self.ext, self.walk,
                    self.filefmt, self.folderfmt)
                try:
                    fullpath = next(fullpath)
                except Exception:
                    # No file available yet (generator exhausted or search
                    # failed); retry after the delay below.  Narrowed from a
                    # bare `except:` so Ctrl-C still interrupts the wait loop.
                    fullpath = None

                if fullpath:
                    break

                log.warning(
                    'Waiting {} sec for a valid file in {}: try {} ...'.format(
                        self.delay, self.path, nTries + 1),
                    self.name)
                time.sleep(self.delay)

            if not fullpath:
                raise schainpy.admin.SchainError(
                    'There isn\'t any valid file in {}'.format(self.path))

            # Filename format: xYYYYDDDSSS.ext (optchar, year, doy, set)
            pathname, filename = os.path.split(fullpath)
            self.year = int(filename[1:5])
            self.doy = int(filename[5:8])
            self.set = int(filename[8:11]) - 1
        else:
            log.log("Searching files in {}".format(self.path), self.name)
            self.filenameList = self.searchFilesOffLine(self.path, self.startDate,
                self.endDate, self.expLabel, self.ext, self.walk, self.filefmt, self.folderfmt)

        self.setNextFile()

        return

    def readFirstHeader(self):
        '''Read metadata and data of the current file and reset the block index.'''

        self.__readMetadata()
        self.__readData()
        self.__setBlockList()

        # If the file declares an output type, instantiate it as dataOut
        if 'type' in self.meta:
            self.dataOut = eval(self.meta['type'])()

        for attr in self.meta:
            setattr(self.dataOut, attr, self.meta[attr])

        self.blockIndex = 0

        return

    def __setBlockList(self):
        '''
        Selects the data within the times defined

        self.fp
        self.startTime
        self.endTime
        self.blockList
        self.blocksPerFile

        '''

        startTime = self.startTime
        endTime = self.endTime
        # Timestamps are stored in UTC; utcoffset shifts them so that the
        # startTime/endTime selection can be expressed in local time.
        # (The old `self.interval` computation was removed: the attribute was
        # unused and raised an error in single-block files.)
        thisUtcTime = self.data['utctime'] + self.utcoffset
        thisDatetime = datetime.datetime.utcfromtimestamp(thisUtcTime[0])

        thisDate = thisDatetime.date()
        thisTime = thisDatetime.time()
        startUtcTime = (datetime.datetime.combine(thisDate, startTime) - datetime.datetime(1970, 1, 1)).total_seconds()
        endUtcTime = (datetime.datetime.combine(thisDate, endTime) - datetime.datetime(1970, 1, 1)).total_seconds()
        ind = numpy.where(numpy.logical_and(thisUtcTime >= startUtcTime, thisUtcTime < endUtcTime))[0]

        # NOTE(review): getData() indexes the raw arrays with a sequential
        # blockIndex, not with blockList; if the selected indices do not start
        # at 0 the blocks read may not match the selection — confirm intended.
        self.blockList = ind
        self.blocksPerFile = len(ind)

        if len(ind) == 0:
            print("[Reading] Block No. %d/%d -> %s [Skipping]" % (self.blockIndex,
                                                                  self.blocksPerFile,
                                                                  thisDatetime))
            self.setNextFile()

        return

    def __readMetadata(self):
        '''
        Reads Metadata

        Uses the `description` mapping when given, otherwise reads every
        dataset in the file's `Metadata` group; `extras` entries override.
        '''

        meta = {}

        if self.description:
            for key, value in self.description['Metadata'].items():
                meta[key] = self.fp[value][()]
        else:
            grp = self.fp['Metadata']
            for name in grp:
                meta[name] = grp[name][()]

        if self.extras:
            for key, value in self.extras.items():
                meta[key] = value
        self.meta = meta

        return

    def __readData(self):
        '''Read the Data group (or the datasets named in `description`) into
        `self.data`, stacking multi-channel groups into numpy arrays.'''

        data = {}

        if self.description:
            for key, value in self.description['Data'].items():
                if isinstance(value, str):
                    if isinstance(self.fp[value], h5py.Dataset):
                        data[key] = self.fp[value][()]
                    elif isinstance(self.fp[value], h5py.Group):
                        array = []
                        for ch in self.fp[value]:
                            array.append(self.fp[value][ch][()])
                        data[key] = numpy.array(array)
                elif isinstance(value, list):
                    array = []
                    for ch in value:
                        array.append(self.fp[ch][()])
                    data[key] = numpy.array(array)
        else:
            grp = self.fp['Data']
            for name in grp:
                if isinstance(grp[name], h5py.Dataset):
                    array = grp[name][()]
                elif isinstance(grp[name], h5py.Group):
                    array = []
                    for ch in grp[name]:
                        array.append(grp[name][ch][()])
                    array = numpy.array(array)
                else:
                    log.warning('Unknown type: {}'.format(name))

                if name in self.description:
                    key = self.description[name]
                else:
                    key = name
                data[key] = array

        self.data = data
        return

    def getData(self):
        '''Copy the current block of every data array into dataOut and
        advance the block index.'''

        for attr in self.data:
            if self.data[attr].ndim == 1:
                setattr(self.dataOut, attr, self.data[attr][self.blockIndex])
            else:
                setattr(self.dataOut, attr, self.data[attr][:, self.blockIndex])

        self.dataOut.flagNoData = False
        self.blockIndex += 1

        log.log("Block No. {}/{} -> {}".format(
            self.blockIndex,
            self.blocksPerFile,
            self.dataOut.datatime.ctime()), self.name)

        return

    def run(self, **kwargs):
        '''Entry point: configure on first call, roll to the next file when
        the current one is exhausted, then emit one block.'''

        if not self.isConfig:
            self.setup(**kwargs)
            self.isConfig = True

        if self.blockIndex == self.blocksPerFile:
            self.setNextFile()

        self.getData()

        return
294 294
@MPDecorator
class HDFWriter(Operation):
    """Operation to write HDF5 files.

    The HDF5 file contains by default two groups, Data and Metadata, where
    you can save any `dataOut` attribute specified by the `dataList` and
    `metadataList` parameters; data attributes are normally time dependent
    whereas the metadata are not.
    It is possible to customize the structure of the HDF5 file with the
    optional `description` parameter, see the examples.

    Parameters:
    -----------
    path : str
        Path where files will be saved.
    blocksPerFile : int
        Number of blocks per file
    metadataList : list
        List of the dataOut attributes that will be saved as metadata
    dataList : list
        List of the dataOut attributes that will be saved as data
    setType : bool
        If True the name of the files corresponds to the timestamp of the data
    description : dict, optional
        Dictionary with the desired description of the HDF5 file

    Examples
    --------

    desc = {
        'data_output': {'winds': ['z', 'w', 'v']},
        'utctime': 'timestamps',
        'heightList': 'heights'
    }
    desc = {
        'data_output': ['z', 'w', 'v'],
        'utctime': 'timestamps',
        'heightList': 'heights'
    }
    desc = {
        'Data': {
            'data_output': 'winds',
            'utctime': 'timestamps'
        },
        'Metadata': {
            'heightList': 'heights'
        }
    }

    writer = proc_unit.addOperation(name='HDFWriter')
    writer.addParameter(name='path', value='/path/to/file')
    writer.addParameter(name='blocksPerFile', value='32')
    writer.addParameter(name='metadataList', value='heightList,timeZone')
    writer.addParameter(name='dataList',value='data_output,utctime')
    # writer.addParameter(name='description',value=json.dumps(desc))

    """

    ext = ".hdf5"
    optchar = "D"
    filename = None
    path = None
    setFile = None
    fp = None
    firsttime = True
    # Configurations
    blocksPerFile = None
    blockIndex = None
    dataOut = None
    # Data Arrays
    dataList = None
    metadataList = None
    currentDay = None
    lastTime = None

    def __init__(self):

        Operation.__init__(self)
        return

    def set_kwargs(self, **kwargs):
        """Set every keyword argument as an attribute of this operation."""

        for key, value in kwargs.items():
            setattr(self, key, value)

    def set_kwargs_obj(self, obj, **kwargs):
        """Set every keyword argument as an attribute of `obj`."""

        for key, value in kwargs.items():
            setattr(obj, key, value)

    def setup(self, path=None, blocksPerFile=10, metadataList=None, dataList=None, setType=None, description=None, **kwargs):
        """Store the configuration and build the dataset descriptor list
        (`self.dsList`) from the dataOut attributes named in `dataList`."""
        self.path = path
        self.blocksPerFile = blocksPerFile
        self.metadataList = metadataList
        self.dataList = [s.strip() for s in dataList]
        self.setType = setType
        self.description = description
        self.set_kwargs(**kwargs)

        if self.metadataList is None:
            self.metadataList = self.dataOut.metadata_list

        dsList = []

        for i in range(len(self.dataList)):
            dsDict = {}
            if hasattr(self.dataOut, self.dataList[i]):
                dataAux = getattr(self.dataOut, self.dataList[i])
                dsDict['variable'] = self.dataList[i]
            else:
                # Fix: format the attribute name into the message; the
                # original passed the raw '{}' template and used self.name as
                # the only argument, so the missing attribute was never shown.
                log.warning('Attribute {} not found in dataOut'.format(
                    self.dataList[i]), self.name)
                continue

            if dataAux is None:
                continue
            elif isinstance(dataAux, (int, float, numpy.integer, numpy.floating)):
                # Fix: numpy.float was removed in NumPy 1.20+ (AttributeError
                # on modern NumPy); builtin float plus numpy.floating covers
                # the same scalar values.
                dsDict['nDim'] = 0
            else:
                dsDict['nDim'] = len(dataAux.shape)
                dsDict['shape'] = dataAux.shape
                dsDict['dsNumber'] = dataAux.shape[0]
                dsDict['dtype'] = dataAux.dtype

            dsList.append(dsDict)

        self.dsList = dsList
        self.currentDay = self.dataOut.datatime.date()

    def timeFlag(self):
        """Return True when a new file must be started: the day (of year)
        changed, or more than 3 hours elapsed since the last written block."""
        currentTime = self.dataOut.utctime
        timeTuple = time.localtime(currentTime)
        dataDay = timeTuple.tm_yday

        if self.lastTime is None:
            self.lastTime = currentTime
            self.currentDay = dataDay
            return False

        timeDiff = currentTime - self.lastTime

        # Si el dia es diferente o si la diferencia entre un dato y otro
        # supera la hora -> new file
        if dataDay != self.currentDay:
            self.currentDay = dataDay
            return True
        elif timeDiff > 3*60*60:
            self.lastTime = currentTime
            return True
        else:
            self.lastTime = currentTime
            return False

    def run(self, dataOut, path, blocksPerFile=10, metadataList=None,
            dataList=None, setType=None, description=None, **kwargs):
        """Entry point: configure and open the first file on the first call,
        then append the current block to the HDF5 file."""

        # Fix: avoid mutable default arguments ([]/{}) shared across calls;
        # None sentinels keep the effective defaults identical.
        if dataList is None:
            dataList = []
        if description is None:
            description = {}

        self.dataOut = dataOut
        self.set_kwargs_obj(self.dataOut, **kwargs)
        if not self.isConfig:
            self.setup(path=path, blocksPerFile=blocksPerFile,
                       metadataList=metadataList, dataList=dataList,
                       setType=setType, description=description, **kwargs)

            self.isConfig = True
            self.setNextFile()

        self.putData()
        return

    def setNextFile(self):
        """Open the next output file, continuing the per-day set counter,
        and write metadata plus the (empty) data layout."""

        ext = self.ext
        path = self.path
        setFile = self.setFile

        timeTuple = time.localtime(self.dataOut.utctime)
        subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year, timeTuple.tm_yday)
        fullpath = os.path.join(path, subfolder)

        if os.path.exists(fullpath):
            filesList = os.listdir(fullpath)
            filesList = [k for k in filesList if k.startswith(self.optchar)]
            if len(filesList) > 0:
                filesList = sorted(filesList, key=str.lower)
                filen = filesList[-1]
                # el filename debera tener el siguiente formato
                # 0 1234 567 89A BCDE (hex)
                # x YYYY DDD SSS .ext
                if isNumber(filen[8:11]):
                    setFile = int(filen[8:11])  # inicializo mi contador de seteo al seteo del ultimo file
                else:
                    setFile = -1
            else:
                setFile = -1  # inicializo mi contador de seteo
        else:
            os.makedirs(fullpath)
            setFile = -1  # inicializo mi contador de seteo

        if self.setType is None:
            setFile += 1
            file = '%s%4.4d%3.3d%03d%s' % (self.optchar,
                                           timeTuple.tm_year,
                                           timeTuple.tm_yday,
                                           setFile,
                                           ext)
        else:
            # setType: encode hour*60+minute of the data in the set field
            setFile = timeTuple.tm_hour*60+timeTuple.tm_min
            file = '%s%4.4d%3.3d%04d%s' % (self.optchar,
                                           timeTuple.tm_year,
                                           timeTuple.tm_yday,
                                           setFile,
                                           ext)

        self.filename = os.path.join(path, subfolder, file)

        # Setting HDF5 File
        self.fp = h5py.File(self.filename, 'w')
        # write metadata
        self.writeMetadata(self.fp)
        # Write data
        self.writeData(self.fp)

    def getLabel(self, name, x=None):
        """Return the HDF5 label for attribute `name` according to the
        `description` mapping; with channel index `x`, return the per-channel
        label (default: 'channelNN', or 'pairNN' for cross-spectra)."""

        if x is None:
            if 'Data' in self.description:
                data = self.description['Data']
                if 'Metadata' in self.description:
                    data.update(self.description['Metadata'])
            else:
                data = self.description
            if name in data:
                if isinstance(data[name], str):
                    return data[name]
                elif isinstance(data[name], list):
                    return None
                elif isinstance(data[name], dict):
                    # first key of the mapping is the group label
                    for key, value in data[name].items():
                        return key
            return name
        else:
            if 'Metadata' in self.description:
                meta = self.description['Metadata']
            else:
                meta = self.description
            if name in meta:
                if isinstance(meta[name], list):
                    return meta[name][x]
                elif isinstance(meta[name], dict):
                    for key, value in meta[name].items():
                        return value[x]
            if 'cspc' in name:
                return 'pair{:02d}'.format(x)
            else:
                return 'channel{:02d}'.format(x)

    def writeMetadata(self, fp):
        """Write every attribute named in `metadataList` into the Metadata
        group (booleans are stored as 0/1)."""

        if self.description:
            if 'Metadata' in self.description:
                grp = fp.create_group('Metadata')
            else:
                grp = fp
        else:
            grp = fp.create_group('Metadata')

        for i in range(len(self.metadataList)):
            if not hasattr(self.dataOut, self.metadataList[i]):
                log.warning('Metadata: `{}` not found'.format(self.metadataList[i]), self.name)
                continue
            value = getattr(self.dataOut, self.metadataList[i])
            if isinstance(value, bool):
                # h5py has no native bool scalar here; store as 0/1
                if value is True:
                    value = 1
                else:
                    value = 0
            grp.create_dataset(self.getLabel(self.metadataList[i]), data=value)
        return

    def writeData(self, fp):
        """Create the (blocksPerFile-long) datasets for every entry of
        `dsList` and remember them in `self.ds`/`self.data` for putData."""

        if self.description:
            if 'Data' in self.description:
                grp = fp.create_group('Data')
            else:
                grp = fp
        else:
            grp = fp.create_group('Data')

        dtsets = []
        data = []

        for dsInfo in self.dsList:
            if dsInfo['nDim'] == 0:
                # scalar per block -> 1-D dataset; channel index -1 marks it
                ds = grp.create_dataset(
                    self.getLabel(dsInfo['variable']),
                    (self.blocksPerFile,),
                    chunks=True,
                    dtype=numpy.float64)
                dtsets.append(ds)
                data.append((dsInfo['variable'], -1))
            else:
                label = self.getLabel(dsInfo['variable'])
                if label is not None:
                    sgrp = grp.create_group(label)
                else:
                    sgrp = grp
                # one dataset per channel (first axis of the array)
                for i in range(dsInfo['dsNumber']):
                    ds = sgrp.create_dataset(
                        self.getLabel(dsInfo['variable'], i),
                        (self.blocksPerFile,) + dsInfo['shape'][1:],
                        chunks=True,
                        dtype=dsInfo['dtype'])
                    dtsets.append(ds)
                    data.append((dsInfo['variable'], i))
        fp.flush()

        log.log('Creating file: {}'.format(fp.filename), self.name)

        self.ds = dtsets
        self.data = data
        self.firsttime = True
        self.blockIndex = 0
        return

    def putData(self):
        """Append the current dataOut block to every open dataset, rolling to
        a new file when the current one is full or timeFlag() fires."""

        if (self.blockIndex == self.blocksPerFile) or self.timeFlag():
            self.closeFile()
            self.setNextFile()

        for i, ds in enumerate(self.ds):
            attr, ch = self.data[i]
            if ch == -1:
                ds[self.blockIndex] = getattr(self.dataOut, attr)
            else:
                ds[self.blockIndex] = getattr(self.dataOut, attr)[ch]

        self.fp.flush()
        self.blockIndex += 1
        log.log('Block No. {}/{}'.format(self.blockIndex, self.blocksPerFile), self.name)

        return

    def closeFile(self):
        """Trim partially-filled datasets to the written length and close the
        current HDF5 file."""

        if self.blockIndex != self.blocksPerFile:
            for ds in self.ds:
                ds.resize(self.blockIndex, axis=0)

        if self.fp:
            self.fp.flush()
            self.fp.close()

    def close(self):

        self.closeFile()
General Comments 0
You need to be logged in to leave comments. Login now