update jroIO_param class HDFWriter
Alexander Valdez -
r1701:dc9cba07d600
@@ -1,639 +1,650
1 1 import os
2 2 import time
3 3 import datetime
4 4
5 5 import numpy
6 6 import h5py
7 7
8 8 import schainpy.admin
9 9 from schainpy.model.data.jrodata import *
10 10 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
11 11 from schainpy.model.io.jroIO_base import *
12 12 from schainpy.utils import log
13 13
14 14
15 15 class HDFReader(Reader, ProcessingUnit):
16 16 """Processing unit to read HDF5 format files
17 17
18 18 This unit reads HDF5 files created with the `HDFWriter` operation. By default
19 19 the files contain two groups, Data and Metadata, and all their variables are
20 20 loaded as `dataOut` attributes.
21 21 It is possible to read any HDF5 file by giving its structure in the `description`
22 22 parameter; extra values can also be added to the metadata with the `extras` parameter.
23 23
24 24 Parameters:
25 25 -----------
26 26 path : str
27 27 Path where files are located.
28 28 startDate : date
29 29 Start date of the files
30 30 endDate : date
31 31 End date of the files
32 32 startTime : time
33 33 Start time of the files
34 34 endTime : time
35 35 End time of the files
36 36 description : dict, optional
37 37 Dictionary with the description of the HDF5 file
38 38 extras : dict, optional
39 39 Dictionary with extra metadata to be added to `dataOut`
40 40
41 41 Attention: be careful to add the `utcoffset` attribute in the last part of the reader parameters in order to work in local time without timing problems, e.g.:
42 42
43 43 -----------
44 44 utcoffset='-18000'
45 45
46 46
47 47 Examples
48 48 --------
49 49
50 50 desc = {
51 51 'Data': {
52 52 'data_output': ['u', 'v', 'w'],
53 53 'utctime': 'timestamps',
54 54 } ,
55 55 'Metadata': {
56 56 'heightList': 'heights'
57 57 }
58 58 }
59 59
60 60 desc = {
61 61 'Data': {
62 62 'data_output': 'winds',
63 63 'utctime': 'timestamps'
64 64 },
65 65 'Metadata': {
66 66 'heightList': 'heights'
67 67 }
68 68 }
69 69
70 70 extras = {
71 71 'timeZone': 300
72 72 }
73 73
74
75 74 reader = project.addReadUnit(
76 75 name='HDFReader',
77 76 path='/path/to/files',
78 77 startDate='2019/01/01',
79 78 endDate='2019/01/31',
80 79 startTime='00:00:00',
81 80 endTime='23:59:59',
82 81 utcoffset='-18000'
83 82 # description=json.dumps(desc),
84 83 # extras=json.dumps(extras),
85 84 )
86 85
87 86 """
88 87
89 88 __attrs__ = ['path', 'startDate', 'endDate', 'startTime', 'endTime', 'description', 'extras']
90 89
91 90 def __init__(self):
92 91 ProcessingUnit.__init__(self)
93 92 self.dataOut = Parameters()
94 93 self.ext = ".hdf5"
95 94 self.optchar = "D"
96 95 self.meta = {}
97 96 self.data = {}
98 97 self.open_file = h5py.File
99 98 self.open_mode = 'r'
100 99 self.description = {}
101 100 self.extras = {}
102 101 self.filefmt = "*%Y%j***"
103 102 self.folderfmt = "*%Y%j"
104 103 self.utcoffset = 0
105 104
106 105 def setup(self, **kwargs):
106
107 107 self.set_kwargs(**kwargs)
108 108 if not self.ext.startswith('.'):
109 109 self.ext = '.{}'.format(self.ext)
110 110
111 111 if self.online:
112 112 log.log("Searching files in online mode...", self.name)
113 113
114 114 for nTries in range(self.nTries):
115 115 fullpath = self.searchFilesOnLine(self.path, self.startDate,
116 116 self.endDate, self.expLabel, self.ext, self.walk,
117 117 self.filefmt, self.folderfmt)
118 118 try:
119 119 fullpath = next(fullpath)
120 120 except StopIteration:
121 121 fullpath = None
122 122
123 123 if fullpath:
124 124 break
125 125
126 126 log.warning(
127 127 'Waiting {} sec for a valid file in {}: try {} ...'.format(
128 128 self.delay, self.path, nTries + 1),
129 129 self.name)
130 130 time.sleep(self.delay)
131 131
132 132 if not(fullpath):
133 133 raise schainpy.admin.SchainError(
134 134 'There isn\'t any valid file in {}'.format(self.path))
135 135
136 136 pathname, filename = os.path.split(fullpath)
137 137 self.year = int(filename[1:5])
138 138 self.doy = int(filename[5:8])
139 139 self.set = int(filename[8:11]) - 1
140 140 else:
141 141 log.log("Searching files in {}".format(self.path), self.name)
142 142 self.filenameList = self.searchFilesOffLine(self.path, self.startDate,
143 143 self.endDate, self.expLabel, self.ext, self.walk, self.filefmt, self.folderfmt)
144 144
145 145 self.setNextFile()
146 146
147 147 return
148 148
149 149 def readFirstHeader(self):
150 150 '''Read metadata and data'''
151 151
152 152 self.__readMetadata()
153 153 self.__readData()
154 154 self.__setBlockList()
155 155
156 156 if 'type' in self.meta:
157 157 self.dataOut = eval(self.meta['type'])()
158 158
159 159 for attr in self.meta:
160 160 setattr(self.dataOut, attr, self.meta[attr])
161 161
162 162 self.blockIndex = 0
163 163
164 164 return
165 165
166 166 def __setBlockList(self):
167 167 '''
168 168 Selects the data within the times defined
169 169
170 170 self.fp
171 171 self.startTime
172 172 self.endTime
173 173 self.blockList
174 174 self.blocksPerFile
175 175
176 176 '''
177 177
178 178 startTime = self.startTime
179 179 endTime = self.endTime
180 180 thisUtcTime = self.data['utctime'] + self.utcoffset
181
182 181 self.interval = numpy.min(thisUtcTime[1:] - thisUtcTime[:-1])
183 182 thisDatetime = datetime.datetime.utcfromtimestamp(thisUtcTime[0])
184 183
185 184 thisDate = thisDatetime.date()
186 185 thisTime = thisDatetime.time()
187 186 startUtcTime = (datetime.datetime.combine(thisDate, startTime) - datetime.datetime(1970, 1, 1)).total_seconds()
188 187 endUtcTime = (datetime.datetime.combine(thisDate, endTime) - datetime.datetime(1970, 1, 1)).total_seconds()
189 188 ind = numpy.where(numpy.logical_and(thisUtcTime >= startUtcTime, thisUtcTime < endUtcTime))[0]
190 189
191 190 self.blockList = ind
192 191 self.blocksPerFile = len(ind)
193 192
194 193 if len(ind)==0:
195 194 print("[Reading] Block No. %d/%d -> %s [Skipping]" % (self.blockIndex,
196 195 self.blocksPerFile,
197 196 thisDatetime))
198 197 self.setNextFile()
199 198
200 199 return
201 200
202 201 def __readMetadata(self):
203 202 '''
204 203 Reads Metadata
205 204 '''
206 205
207 206 meta = {}
208 207
209 208 if self.description:
210 209 for key, value in self.description['Metadata'].items():
211 210 meta[key] = self.fp[value][()]
212 211 else:
213 212 grp = self.fp['Metadata']
214 213 for name in grp:
215 214 meta[name] = grp[name][()]
216 215
217 216 if self.extras:
218 217 for key, value in self.extras.items():
219 218 meta[key] = value
220 219 self.meta = meta
221 220
222 221 return
223 222
224 223 def __readData(self):
225 224
226 225 data = {}
227 226
228 227 if self.description:
229 228 for key, value in self.description['Data'].items():
230 229 if isinstance(value, str):
231 230 if isinstance(self.fp[value], h5py.Dataset):
232 231 data[key] = self.fp[value][()]
233 232 elif isinstance(self.fp[value], h5py.Group):
234 233 array = []
235 234 for ch in self.fp[value]:
236 235 array.append(self.fp[value][ch][()])
237 236 data[key] = numpy.array(array)
238 237 elif isinstance(value, list):
239 238 array = []
240 239 for ch in value:
241 240 array.append(self.fp[ch][()])
242 241 data[key] = numpy.array(array)
243 242 else:
244 243 grp = self.fp['Data']
245 244 for name in grp:
246 245 if isinstance(grp[name], h5py.Dataset):
247 246 array = grp[name][()]
248 247 elif isinstance(grp[name], h5py.Group):
249 248 array = []
250 249 for ch in grp[name]:
251 250 array.append(grp[name][ch][()])
252 251 array = numpy.array(array)
253 252 else:
254 253 log.warning('Unknown type: {}'.format(name))
255 254
256 255 if name in self.description:
257 256 key = self.description[name]
258 257 else:
259 258 key = name
260 259 data[key] = array
261 260
262 261 self.data = data
263 262 return
264 263
265 264 def getData(self):
266 265
267 266 for attr in self.data:
268 267 if self.data[attr].ndim == 1:
269 268 setattr(self.dataOut, attr, self.data[attr][self.blockIndex])
270 269 else:
271 270 setattr(self.dataOut, attr, self.data[attr][:, self.blockIndex])
272 271
273 272 self.dataOut.flagNoData = False
274 273 self.blockIndex += 1
275 274
276 275 log.log("Block No. {}/{} -> {}".format(
277 276 self.blockIndex,
278 277 self.blocksPerFile,
279 278 self.dataOut.datatime.ctime()), self.name)
280 279
281 280 return
282 281
283 282 def run(self, **kwargs):
284 283
285 284 if not(self.isConfig):
286 285 self.setup(**kwargs)
287 286 self.isConfig = True
288 287
289 288 if self.blockIndex == self.blocksPerFile:
290 289 self.setNextFile()
291 290
292 291 self.getData()
293 292
294 293 return
295 294
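The Data/Metadata layout that HDFReader consumes (and that HDFWriter below produces by default) can be inspected directly with h5py. A minimal sketch, assuming a file written without a custom description, with heightList in metadataList and utctime in dataList; the file path is only illustrative:

import h5py

# Default layout: one dataset per metadataList entry under Metadata,
# one time-dependent dataset (or channel group) per dataList entry under Data.
with h5py.File('/path/to/files/d2019001/D2019001000.hdf5', 'r') as fp:
    heights = fp['Metadata/heightList'][()]
    timestamps = fp['Data/utctime'][()]   # one value per written block
    print(heights.shape, timestamps.shape)
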
296 295 @MPDecorator
297 296 class HDFWriter(Operation):
298 297 """Operation to write HDF5 files.
299 298
300 299 The HDF5 file contains by default two groups, Data and Metadata, where
301 300 you can save any `dataOut` attribute specified by the `dataList` and `metadataList`
302 301 parameters; data attributes are normally time dependent whereas the metadata
303 302 attributes are not.
304 303 It is possible to customize the structure of the HDF5 file with the
305 304 optional `description` parameter; see the examples.
306 305
307 306 Parameters:
308 307 -----------
309 308 path : str
310 309 Path where files will be saved.
311 310 blocksPerFile : int
312 311 Number of blocks per file
313 312 metadataList : list
314 313 List of the dataOut attributes that will be saved as metadata
315 314 dataList : list
316 315 List of the dataOut attributes that will be saved as data
317 316 setType : bool
318 317 If True, the name of the files corresponds to the timestamp of the data
319 318 description : dict, optional
320 319 Dictionary with the desired description of the HDF5 file
321 320
322 321 Examples
323 322 --------
324 323
325 324 desc = {
326 325 'data_output': {'winds': ['z', 'w', 'v']},
327 326 'utctime': 'timestamps',
328 327 'heightList': 'heights'
329 328 }
330 329 desc = {
331 330 'data_output': ['z', 'w', 'v'],
332 331 'utctime': 'timestamps',
333 332 'heightList': 'heights'
334 333 }
335 334 desc = {
336 335 'Data': {
337 336 'data_output': 'winds',
338 337 'utctime': 'timestamps'
339 338 },
340 339 'Metadata': {
341 340 'heightList': 'heights'
342 341 }
343 342 }
344 343
345 344 writer = proc_unit.addOperation(name='HDFWriter')
346 345 writer.addParameter(name='path', value='/path/to/file')
347 346 writer.addParameter(name='blocksPerFile', value='32')
348 347 writer.addParameter(name='metadataList', value='heightList,timeZone')
349 348 writer.addParameter(name='dataList',value='data_output,utctime')
350 349 # writer.addParameter(name='description',value=json.dumps(desc))
351 350
352 351 """
353 352
354 353 ext = ".hdf5"
355 354 optchar = "D"
356 355 filename = None
357 356 path = None
358 357 setFile = None
359 358 fp = None
360 359 firsttime = True
361 360 #Configurations
362 361 blocksPerFile = None
363 362 blockIndex = None
364 363 dataOut = None
365 364 #Data Arrays
366 365 dataList = None
367 366 metadataList = None
368 367 currentDay = None
369 368 lastTime = None
370 369
371 370 def __init__(self):
372 371
373 372 Operation.__init__(self)
374 373 return
375 374
376 def setup(self, path=None, blocksPerFile=10, metadataList=None, dataList=None, setType=None, description=None):
375 def set_kwargs(self, **kwargs):
376
377 for key, value in kwargs.items():
378 setattr(self, key, value)
379
380 def set_kwargs_obj(self, obj, **kwargs):
381
382 for key, value in kwargs.items():
383 setattr(obj, key, value)
384
385 def setup(self, path=None, blocksPerFile=10, metadataList=None, dataList=None, setType=None, description=None, **kwargs):
377 386 self.path = path
378 387 self.blocksPerFile = blocksPerFile
379 388 self.metadataList = metadataList
380 389 self.dataList = [s.strip() for s in dataList]
381 390 self.setType = setType
382 391 self.description = description
392 self.set_kwargs(**kwargs)
383 393
384 394 if self.metadataList is None:
385 395 self.metadataList = self.dataOut.metadata_list
386 396
387 397 tableList = []
388 398 dsList = []
389 399
390 400 for i in range(len(self.dataList)):
391 401 dsDict = {}
392 402 if hasattr(self.dataOut, self.dataList[i]):
393 403 dataAux = getattr(self.dataOut, self.dataList[i])
394 404 dsDict['variable'] = self.dataList[i]
395 405 else:
396 406 log.warning('Attribute {} not found in dataOut'.format(self.dataList[i]), self.name)
397 407 continue
398 408
399 409 if dataAux is None:
400 410 continue
401 411 elif isinstance(dataAux, (int, float, numpy.integer, numpy.floating)):
402 412 dsDict['nDim'] = 0
403 413 else:
404 414 dsDict['nDim'] = len(dataAux.shape)
405 415 dsDict['shape'] = dataAux.shape
406 416 dsDict['dsNumber'] = dataAux.shape[0]
407 417 dsDict['dtype'] = dataAux.dtype
408 418
409 419 dsList.append(dsDict)
410 420
411 421 self.dsList = dsList
412 422 self.currentDay = self.dataOut.datatime.date()
413 423
414 424 def timeFlag(self):
415 425 currentTime = self.dataOut.utctime
416 426 timeTuple = time.localtime(currentTime)
417 427 dataDay = timeTuple.tm_yday
418 428
419 429 if self.lastTime is None:
420 430 self.lastTime = currentTime
421 431 self.currentDay = dataDay
422 432 return False
423 433
424 434 timeDiff = currentTime - self.lastTime
425 435
426 436 # If the day changed or the time gap between consecutive samples exceeds 3 hours, start a new file
427 437 if dataDay != self.currentDay:
428 438 self.currentDay = dataDay
429 439 return True
430 440 elif timeDiff > 3*60*60:
431 441 self.lastTime = currentTime
432 442 return True
433 443 else:
434 444 self.lastTime = currentTime
435 445 return False
436 446
437 447 def run(self, dataOut, path, blocksPerFile=10, metadataList=None,
438 dataList=[], setType=None, description={}):
448 dataList=[], setType=None, description={}, **kwargs):
439 449
440 450 self.dataOut = dataOut
451 self.set_kwargs_obj(self.dataOut, **kwargs)
441 452 if not(self.isConfig):
442 453 self.setup(path=path, blocksPerFile=blocksPerFile,
443 454 metadataList=metadataList, dataList=dataList,
444 setType=setType, description=description)
455 setType=setType, description=description, **kwargs)
445 456
446 457 self.isConfig = True
447 458 self.setNextFile()
448 459
449 460 self.putData()
450 461 return
451 462
452 463 def setNextFile(self):
453 464
454 465 ext = self.ext
455 466 path = self.path
456 467 setFile = self.setFile
457 468
458 469 timeTuple = time.localtime(self.dataOut.utctime)
459 470 subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year,timeTuple.tm_yday)
460 471 fullpath = os.path.join(path, subfolder)
461 472
462 473 if os.path.exists(fullpath):
463 474 filesList = os.listdir(fullpath)
464 475 filesList = [k for k in filesList if k.startswith(self.optchar)]
465 476 if len( filesList ) > 0:
466 477 filesList = sorted(filesList, key=str.lower)
467 478 filen = filesList[-1]
468 479 # the filename must have the following format
469 480 # 0 1234 567 89A BCDE (hex)
470 481 # x YYYY DDD SSS .ext
471 482 if isNumber(filen[8:11]):
472 483 setFile = int(filen[8:11]) # initialize the set counter to the set number of the last file
473 484 else:
474 485 setFile = -1
475 486 else:
476 487 setFile = -1 # initialize the set counter
477 488 else:
478 489 os.makedirs(fullpath)
479 490 setFile = -1 # initialize the set counter
480 491
481 492 if self.setType is None:
482 493 setFile += 1
483 494 file = '%s%4.4d%3.3d%03d%s' % (self.optchar,
484 495 timeTuple.tm_year,
485 496 timeTuple.tm_yday,
486 497 setFile,
487 498 ext )
488 499 else:
489 500 setFile = timeTuple.tm_hour*60+timeTuple.tm_min
490 501 file = '%s%4.4d%3.3d%04d%s' % (self.optchar,
491 502 timeTuple.tm_year,
492 503 timeTuple.tm_yday,
493 504 setFile,
494 505 ext )
495 506
496 507 self.filename = os.path.join( path, subfolder, file )
497 508
498 509 #Setting HDF5 File
499 510 self.fp = h5py.File(self.filename, 'w')
500 511 #write metadata
501 512 self.writeMetadata(self.fp)
502 513 #Write data
503 514 self.writeData(self.fp)
504 515
505 516 def getLabel(self, name, x=None):
506 517
507 518 if x is None:
508 519 if 'Data' in self.description:
509 520 data = self.description['Data']
510 521 if 'Metadata' in self.description:
511 522 data.update(self.description['Metadata'])
512 523 else:
513 524 data = self.description
514 525 if name in data:
515 526 if isinstance(data[name], str):
516 527 return data[name]
517 528 elif isinstance(data[name], list):
518 529 return None
519 530 elif isinstance(data[name], dict):
520 531 for key, value in data[name].items():
521 532 return key
522 533 return name
523 534 else:
524 535 if 'Metadata' in self.description:
525 536 meta = self.description['Metadata']
526 537 else:
527 538 meta = self.description
528 539 if name in meta:
529 540 if isinstance(meta[name], list):
530 541 return meta[name][x]
531 542 elif isinstance(meta[name], dict):
532 543 for key, value in meta[name].items():
533 544 return value[x]
534 545 if 'cspc' in name:
535 546 return 'pair{:02d}'.format(x)
536 547 else:
537 548 return 'channel{:02d}'.format(x)
538 549
539 550 def writeMetadata(self, fp):
540 551
541 552 if self.description:
542 553 if 'Metadata' in self.description:
543 554 grp = fp.create_group('Metadata')
544 555 else:
545 556 grp = fp
546 557 else:
547 558 grp = fp.create_group('Metadata')
548 559
549 560 for i in range(len(self.metadataList)):
550 561 if not hasattr(self.dataOut, self.metadataList[i]):
551 562 log.warning('Metadata: `{}` not found'.format(self.metadataList[i]), self.name)
552 563 continue
553 564 value = getattr(self.dataOut, self.metadataList[i])
554 565 if isinstance(value, bool):
555 566 if value is True:
556 567 value = 1
557 568 else:
558 569 value = 0
559 570 grp.create_dataset(self.getLabel(self.metadataList[i]), data=value)
560 571 return
561 572
562 573 def writeData(self, fp):
563 574
564 575 if self.description:
565 576 if 'Data' in self.description:
566 577 grp = fp.create_group('Data')
567 578 else:
568 579 grp = fp
569 580 else:
570 581 grp = fp.create_group('Data')
571 582
572 583 dtsets = []
573 584 data = []
574 585
575 586 for dsInfo in self.dsList:
576 587 if dsInfo['nDim'] == 0:
577 588 ds = grp.create_dataset(
578 589 self.getLabel(dsInfo['variable']),
579 590 (self.blocksPerFile, ),
580 591 chunks=True,
581 592 dtype=numpy.float64)
582 593 dtsets.append(ds)
583 594 data.append((dsInfo['variable'], -1))
584 595 else:
585 596 label = self.getLabel(dsInfo['variable'])
586 597 if label is not None:
587 598 sgrp = grp.create_group(label)
588 599 else:
589 600 sgrp = grp
590 601 for i in range(dsInfo['dsNumber']):
591 602 ds = sgrp.create_dataset(
592 603 self.getLabel(dsInfo['variable'], i),
593 604 (self.blocksPerFile, ) + dsInfo['shape'][1:],
594 605 chunks=True,
595 606 dtype=dsInfo['dtype'])
596 607 dtsets.append(ds)
597 608 data.append((dsInfo['variable'], i))
598 609 fp.flush()
599 610
600 611 log.log('Creating file: {}'.format(fp.filename), self.name)
601 612
602 613 self.ds = dtsets
603 614 self.data = data
604 615 self.firsttime = True
605 616 self.blockIndex = 0
606 617 return
607 618
608 619 def putData(self):
609 620
610 621 if (self.blockIndex == self.blocksPerFile) or self.timeFlag():
611 622 self.closeFile()
612 623 self.setNextFile()
613 624
614 625 for i, ds in enumerate(self.ds):
615 626 attr, ch = self.data[i]
616 627 if ch == -1:
617 628 ds[self.blockIndex] = getattr(self.dataOut, attr)
618 629 else:
619 630 ds[self.blockIndex] = getattr(self.dataOut, attr)[ch]
620 631
621 632 self.fp.flush()
622 633 self.blockIndex += 1
623 634 log.log('Block No. {}/{}'.format(self.blockIndex, self.blocksPerFile), self.name)
624 635
625 636 return
626 637
627 638 def closeFile(self):
628 639
629 640 if self.blockIndex != self.blocksPerFile:
630 641 for ds in self.ds:
631 642 ds.resize(self.blockIndex, axis=0)
632 643
633 644 if self.fp:
634 645 self.fp.flush()
635 646 self.fp.close()
636 647
637 648 def close(self):
638 649
639 650 self.closeFile()
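
The main change in this revision is the **kwargs passthrough added to HDFWriter.run() and setup(), together with the new set_kwargs()/set_kwargs_obj() helpers that copy extra keyword arguments onto the writer and onto dataOut. A minimal sketch of how an extra parameter could flow through that path, assuming parameters added with addParameter() are forwarded to run() as keyword arguments (as the docstring examples suggest); timeZone is used here only as an illustrative extra attribute:

writer = proc_unit.addOperation(name='HDFWriter')
writer.addParameter(name='path', value='/path/to/file')
writer.addParameter(name='blocksPerFile', value='32')
writer.addParameter(name='dataList', value='data_output,utctime')
writer.addParameter(name='metadataList', value='heightList,timeZone')
# Any parameter not named in run()'s signature arrives via **kwargs:
# run() calls set_kwargs_obj(self.dataOut, timeZone='300') so the value is
# attached to dataOut, and setup() stores it on the writer via set_kwargs().
writer.addParameter(name='timeZone', value='300')

Because dataOut then carries timeZone, it can be listed in metadataList and written into the Metadata group by writeMetadata().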