##// END OF EJS Templates
HDFWriter Fixed
rflores -
r1710:906133bef78e
parent child
Show More
@@ -1,638 +1,640
1 1 import os
2 2 import time
3 3 import datetime
4 4
5 5 import numpy
6 6 import h5py
7 7
8 8 import schainpy.admin
9 9 from schainpy.model.data.jrodata import *
10 10 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
11 11 from schainpy.model.io.jroIO_base import *
12 12 from schainpy.utils import log
13 13
14 14
class HDFReader(Reader, ProcessingUnit):
    """Processing unit to read HDF5 format files.

    This unit reads HDF5 files created with the `HDFWriter` operation. Such
    files contain by default two groups, Data and Metadata; every variable
    found is loaded as a `dataOut` attribute.
    It is possible to read any HDF5 file by giving its structure in the
    `description` parameter; you can also add extra values to the metadata
    with the parameter `extras`.

    Parameters:
    -----------
    path : str
        Path where files are located.
    startDate : date
        Start date of the files
    endDate : list
        End date of the files
    startTime : time
        Start time of the files
    endTime : time
        End time of the files
    description : dict, optional
        Dictionary with the description of the HDF5 file
    extras : dict, optional
        Dictionary with extra metadata to be added to `dataOut`

    Examples
    --------

    desc = {
        'Data': {
            'data_output': ['u', 'v', 'w'],
            'utctime': 'timestamps',
        } ,
        'Metadata': {
            'heightList': 'heights'
        }
    }

    desc = {
        'Data': {
            'data_output': 'winds',
            'utctime': 'timestamps'
        },
        'Metadata': {
            'heightList': 'heights'
        }
    }

    extras = {
        'timeZone': 300
    }

    reader = project.addReadUnit(
        name='HDFReader',
        path='/path/to/files',
        startDate='2019/01/01',
        endDate='2019/01/31',
        startTime='00:00:00',
        endTime='23:59:59',
        # description=json.dumps(desc),
        # extras=json.dumps(extras),
        )

    """

    __attrs__ = ['path', 'startDate', 'endDate', 'startTime', 'endTime', 'description', 'extras']

    def __init__(self):

        ProcessingUnit.__init__(self)
        self.dataOut = Parameters()
        self.ext = ".hdf5"
        self.optchar = "D"
        self.meta = {}
        self.data = {}
        self.open_file = h5py.File
        self.open_mode = 'r'
        self.description = {}
        self.extras = {}
        self.filefmt = "*%Y%j***"
        self.folderfmt = "*%Y%j"
        # Seconds added to every timestamp read from file (default: none)
        self.utcoffset = 0

    def setup(self, **kwargs):
        """Validate parameters and locate the file(s) to read.

        In online mode it polls `path` up to `nTries` times waiting for a
        valid file to appear; offline it builds `filenameList` at once.

        Raises
        ------
        schainpy.admin.SchainError
            If no valid file is found in online mode after all retries.
        """
        self.set_kwargs(**kwargs)
        if not self.ext.startswith('.'):
            self.ext = '.{}'.format(self.ext)

        if self.online:
            log.log("Searching files in online mode...", self.name)

            for nTries in range(self.nTries):
                fullpath = self.searchFilesOnLine(self.path, self.startDate,
                    self.endDate, self.expLabel, self.ext, self.walk,
                    self.filefmt, self.folderfmt)
                try:
                    fullpath = next(fullpath)
                except Exception:
                    # Was a bare `except:`; narrowed so KeyboardInterrupt /
                    # SystemExit still propagate. Any search failure simply
                    # triggers another retry below.
                    fullpath = None

                if fullpath:
                    break

                log.warning(
                    'Waiting {} sec for a valid file in {}: try {} ...'.format(
                        self.delay, self.path, nTries + 1),
                    self.name)
                time.sleep(self.delay)

            if not fullpath:
                raise schainpy.admin.SchainError(
                    'There isn\'t any valid file in {}'.format(self.path))

            # Filename layout: X YYYY DDD SSS .ext (optchar, year, doy, set);
            # start one set before so setNextFile() picks this file first.
            pathname, filename = os.path.split(fullpath)
            self.year = int(filename[1:5])
            self.doy = int(filename[5:8])
            self.set = int(filename[8:11]) - 1
        else:
            log.log("Searching files in {}".format(self.path), self.name)
            self.filenameList = self.searchFilesOffLine(self.path, self.startDate,
                self.endDate, self.expLabel, self.ext, self.walk, self.filefmt, self.folderfmt)

        self.setNextFile()

        return

    def readFirstHeader(self):
        '''Read metadata and data, then select the blocks inside the time window.'''

        self.__readMetadata()
        self.__readData()
        self.__setBlockList()

        if 'type' in self.meta:
            # NOTE(review): eval() instantiates whatever class name the file's
            # metadata declares -- only open trusted files.
            self.dataOut = eval(self.meta['type'])()

        # Expose every metadata entry as a dataOut attribute
        for attr in self.meta:
            setattr(self.dataOut, attr, self.meta[attr])

        self.blockIndex = 0

        return

    def __setBlockList(self):
        '''
        Selects the data within the times defined

        self.fp
        self.startTime
        self.endTime
        self.blockList
        self.blocksPerFile

        '''

        startTime = self.startTime
        endTime = self.endTime
        # Shift the file's timestamps into the expected timezone
        thisUtcTime = self.data['utctime'] + self.utcoffset
        # Nominal sampling interval: smallest gap between consecutive blocks
        self.interval = numpy.min(thisUtcTime[1:] - thisUtcTime[:-1])
        thisDatetime = datetime.datetime.utcfromtimestamp(thisUtcTime[0])

        thisDate = thisDatetime.date()
        thisTime = thisDatetime.time()

        # Requested window expressed as seconds since the epoch, on the
        # same date as the first block of the file
        startUtcTime = (datetime.datetime.combine(thisDate, startTime) - datetime.datetime(1970, 1, 1)).total_seconds()
        endUtcTime = (datetime.datetime.combine(thisDate, endTime) - datetime.datetime(1970, 1, 1)).total_seconds()

        ind = numpy.where(numpy.logical_and(thisUtcTime >= startUtcTime, thisUtcTime < endUtcTime))[0]

        self.blockList = ind
        self.blocksPerFile = len(ind)
        return

    def __readMetadata(self):
        '''
        Reads Metadata
        '''

        meta = {}

        if self.description:
            # Custom layout: description keys are dataOut names, values are
            # HDF5 paths inside the file
            for key, value in self.description['Metadata'].items():
                meta[key] = self.fp[value][()]
        else:
            # Default layout: every dataset under the 'Metadata' group
            grp = self.fp['Metadata']
            for name in grp:
                meta[name] = grp[name][()]

        if self.extras:
            # User-supplied values override / extend what the file provides
            for key, value in self.extras.items():
                meta[key] = value
        self.meta = meta

        return

    def __readData(self):
        '''Read the Data group into self.data (one array per variable).'''

        data = {}

        if self.description:
            for key, value in self.description['Data'].items():
                if isinstance(value, str):
                    # Single path: either one dataset or a group of channels
                    if isinstance(self.fp[value], h5py.Dataset):
                        data[key] = self.fp[value][()]
                    elif isinstance(self.fp[value], h5py.Group):
                        array = []
                        for ch in self.fp[value]:
                            array.append(self.fp[value][ch][()])
                        data[key] = numpy.array(array)
                elif isinstance(value, list):
                    # List of paths: stack each one as a channel
                    array = []
                    for ch in value:
                        array.append(self.fp[ch][()])
                    data[key] = numpy.array(array)
        else:
            grp = self.fp['Data']
            for name in grp:
                if isinstance(grp[name], h5py.Dataset):
                    array = grp[name][()]
                elif isinstance(grp[name], h5py.Group):
                    # Group of per-channel datasets -> stack along axis 0
                    array = []
                    for ch in grp[name]:
                        array.append(grp[name][ch][()])
                    array = numpy.array(array)
                else:
                    log.warning('Unknown type: {}'.format(name))

                # Optional renaming through the description mapping
                if name in self.description:
                    key = self.description[name]
                else:
                    key = name
                data[key] = array

        self.data = data
        return

    def getData(self):
        """Copy the current block of every variable into dataOut and advance."""

        for attr in self.data:
            # 1-D arrays are per-block scalars; otherwise axis 1 is the block axis
            if self.data[attr].ndim == 1:
                setattr(self.dataOut, attr, self.data[attr][self.blockIndex])
            else:
                setattr(self.dataOut, attr, self.data[attr][:, self.blockIndex])

        self.dataOut.flagNoData = False
        self.blockIndex += 1

        log.log("Block No. {}/{} -> {}".format(
            self.blockIndex,
            self.blocksPerFile,
            self.dataOut.datatime.ctime()), self.name)

        return

    def run(self, **kwargs):
        """Entry point: configure on first call, then emit one block per call."""

        if not self.isConfig:
            self.setup(**kwargs)
            self.isConfig = True

        # Current file exhausted: move on to the next one
        if self.blockIndex == self.blocksPerFile:
            self.setNextFile()

        self.getData()

        return
282 282
@MPDecorator
class HDFWriter(Operation):
    """Operation to write HDF5 files.

    The HDF5 file contains by default two groups Data and Metadata where
    you can save any `dataOut` attribute specified by `dataList` and `metadataList`
    parameters, data attributes are normaly time dependent where the metadata
    are not.
    It is possible to customize the structure of the HDF5 file with the
    optional description parameter see the examples.

    Parameters:
    -----------
    path : str
        Path where files will be saved.
    blocksPerFile : int
        Number of blocks per file
    metadataList : list
        List of the dataOut attributes that will be saved as metadata
    dataList : int
        List of the dataOut attributes that will be saved as data
    setType : bool
        If True the name of the files corresponds to the timestamp of the data
    description : dict, optional
        Dictionary with the desired description of the HDF5 file
    uniqueChannel : bool, optional
        If True an extra leading dimension is added to each data array so
        single-channel data is not split into multiple channel datasets.

    Examples
    --------

    desc = {
        'data_output': {'winds': ['z', 'w', 'v']},
        'utctime': 'timestamps',
        'heightList': 'heights'
    }
    desc = {
        'data_output': ['z', 'w', 'v'],
        'utctime': 'timestamps',
        'heightList': 'heights'
    }
    desc = {
        'Data': {
            'data_output': 'winds',
            'utctime': 'timestamps'
        },
        'Metadata': {
            'heightList': 'heights'
        }
    }

    writer = proc_unit.addOperation(name='HDFWriter')
    writer.addParameter(name='path', value='/path/to/file')
    writer.addParameter(name='blocksPerFile', value='32')
    writer.addParameter(name='metadataList', value='heightList,timeZone')
    writer.addParameter(name='dataList',value='data_output,utctime')
    # writer.addParameter(name='description',value=json.dumps(desc))

    """

    ext = ".hdf5"
    optchar = "D"
    filename = None
    path = None
    setFile = None
    fp = None
    firsttime = True
    # Configurations
    blocksPerFile = None
    blockIndex = None
    dataOut = None
    # Data Arrays
    dataList = None
    metadataList = None
    currentDay = None
    lastTime = None

    def __init__(self):

        Operation.__init__(self)
        return

    def setup(self, path=None, blocksPerFile=10, metadataList=None, dataList=None, setType=None, description=None, uniqueChannel=False):
        """Store the configuration and inspect dataOut.

        Builds `self.dsList`: one dict per variable in `dataList` recording
        its dimensionality, shape, dtype and number of channel datasets.
        """
        self.path = path
        self.blocksPerFile = blocksPerFile
        self.metadataList = metadataList
        self.dataList = [s.strip() for s in dataList]
        self.setType = setType
        self.description = description
        self.uniqueChannel = uniqueChannel

        if self.metadataList is None:
            self.metadataList = self.dataOut.metadata_list

        dsList = []

        for variable in self.dataList:
            dsDict = {}
            if hasattr(self.dataOut, variable):
                dataAux = getattr(self.dataOut, variable)
                dsDict['variable'] = variable
            else:
                # BUG FIX: the placeholder was never filled
                # (was: log.warning('Attribute {} not found in dataOut', self.name))
                log.warning('Attribute {} not found in dataOut'.format(variable), self.name)
                continue

            if dataAux is None:
                continue
            # numpy.float was removed in NumPy 1.24; numpy.floating covers
            # every numpy float scalar type
            elif isinstance(dataAux, (int, float, numpy.integer, numpy.floating)):
                dsDict['nDim'] = 0
            else:
                if uniqueChannel:
                    # Creates extra dimension to avoid the creation of
                    # multiple channels; only the local copy is reshaped here,
                    # writeData()/putData() handle the dataOut attribute itself
                    dataAux = numpy.expand_dims(dataAux, axis=0)
                dsDict['nDim'] = len(dataAux.shape)
                dsDict['shape'] = dataAux.shape
                dsDict['dsNumber'] = dataAux.shape[0]
                dsDict['dtype'] = dataAux.dtype

            dsList.append(dsDict)

        self.dsList = dsList
        self.currentDay = self.dataOut.datatime.date()

    def timeFlag(self):
        """Return True when the current data should start a new file.

        A new file starts when the day-of-year changes or when more than
        3 hours elapsed since the previous block.
        """
        currentTime = self.dataOut.utctime
        timeTuple = time.localtime(currentTime)
        dataDay = timeTuple.tm_yday

        if self.lastTime is None:
            self.lastTime = currentTime
            self.currentDay = dataDay
            return False

        timeDiff = currentTime - self.lastTime

        # Rotate if the day changed, or if the gap between consecutive
        # blocks exceeds 3 hours
        if dataDay != self.currentDay:
            self.currentDay = dataDay
            # NOTE(review): lastTime is not refreshed on a day change, so the
            # next call may also trip the 3-hour branch -- confirm intended.
            return True
        elif timeDiff > 3*60*60:
            self.lastTime = currentTime
            return True
        else:
            self.lastTime = currentTime
            return False

    def run(self, dataOut, path, blocksPerFile=10, metadataList=None,
            dataList=[], setType=None, description={}, uniqueChannel=False):
        """Entry point: configure and open the first file once, then write
        one block per call."""

        self.dataOut = dataOut
        if not self.isConfig:
            self.setup(path=path, blocksPerFile=blocksPerFile,
                       metadataList=metadataList, dataList=dataList,
                       setType=setType, description=description, uniqueChannel=uniqueChannel)

            self.isConfig = True
            self.setNextFile()

        self.putData()

        return

    def setNextFile(self):
        """Open a new HDF5 file named after the current data time and create
        its metadata and (empty) data datasets."""

        ext = self.ext
        path = self.path
        setFile = self.setFile

        timeTuple = time.localtime(self.dataOut.utctime)
        subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year, timeTuple.tm_yday)
        fullpath = os.path.join(path, subfolder)

        if os.path.exists(fullpath):
            filesList = os.listdir(fullpath)
            filesList = [k for k in filesList if k.startswith(self.optchar)]
            if len(filesList) > 0:
                filesList = sorted(filesList, key=str.lower)
                filen = filesList[-1]
                # Expected filename layout (hex offsets):
                # 0 1234 567 89A BCDE
                # x YYYY DDD SSS .ext
                if isNumber(filen[8:11]):
                    # Continue the set counter from the last file found
                    setFile = int(filen[8:11])
                else:
                    setFile = -1
            else:
                setFile = -1  # initialize the set counter
        else:
            os.makedirs(fullpath)
            setFile = -1  # initialize the set counter

        if self.setType is None:
            # Sequential set number within the day
            setFile += 1
            file = '%s%4.4d%3.3d%03d%s' % (self.optchar,
                                           timeTuple.tm_year,
                                           timeTuple.tm_yday,
                                           setFile,
                                           ext)
        else:
            # Set number encodes the hour/minute of the data
            setFile = timeTuple.tm_hour*60+timeTuple.tm_min
            file = '%s%4.4d%3.3d%04d%s' % (self.optchar,
                                           timeTuple.tm_year,
                                           timeTuple.tm_yday,
                                           setFile,
                                           ext)

        self.filename = os.path.join(path, subfolder, file)

        # Setting HDF5 File
        self.fp = h5py.File(self.filename, 'w')
        # Write metadata
        self.writeMetadata(self.fp)
        # Write data
        self.writeData(self.fp)

    def getLabel(self, name, x=None):
        """Resolve the HDF5 name for a variable (x is None) or for one of
        its channels (x is the channel index)."""
        if x is None:
            if 'Data' in self.description:
                # BUG FIX: copy before merging -- the original update()
                # mutated self.description['Data'] in place
                data = dict(self.description['Data'])
                if 'Metadata' in self.description:
                    data.update(self.description['Metadata'])
            else:
                data = self.description
            if name in data:
                if isinstance(data[name], str):
                    return data[name]
                elif isinstance(data[name], list):
                    # A list maps channels individually: no group label
                    return None
                elif isinstance(data[name], dict):
                    # Use the (first) key of the dict as the group label
                    for key, value in data[name].items():
                        return key
            return name
        else:
            if 'Metadata' in self.description:
                meta = self.description['Metadata']
            else:
                meta = self.description
            if name in meta:
                if isinstance(meta[name], list):
                    return meta[name][x]
                elif isinstance(meta[name], dict):
                    for key, value in meta[name].items():
                        return value[x]
            # Default per-channel labels
            if 'cspc' in name:
                return 'pair{:02d}'.format(x)
            else:
                return 'channel{:02d}'.format(x)

    def writeMetadata(self, fp):
        """Create one metadata dataset per attribute in metadataList."""

        if self.description:
            if 'Metadata' in self.description:
                grp = fp.create_group('Metadata')
            else:
                grp = fp
        else:
            grp = fp.create_group('Metadata')

        for attr in self.metadataList:
            if not hasattr(self.dataOut, attr):
                log.warning('Metadata: `{}` not found'.format(attr), self.name)
                continue
            value = getattr(self.dataOut, attr)
            if isinstance(value, bool):
                # Keep the historical 0/1 encoding for booleans
                value = 1 if value else 0
            grp.create_dataset(self.getLabel(attr), data=value)
        return

    def writeData(self, fp):
        """Create the (empty) data datasets, sized blocksPerFile along the
        first axis, and reset the block counter."""

        if self.description:
            if 'Data' in self.description:
                grp = fp.create_group('Data')
            else:
                grp = fp
        else:
            grp = fp.create_group('Data')

        dtsets = []
        data = []

        for dsInfo in self.dsList:
            if dsInfo['nDim'] == 0:
                # Scalar per block -> 1-D time-series dataset
                ds = grp.create_dataset(
                    self.getLabel(dsInfo['variable']),
                    (self.blocksPerFile, ),
                    chunks=True,
                    dtype=numpy.float64)
                dtsets.append(ds)
                data.append((dsInfo['variable'], -1))
            else:
                label = self.getLabel(dsInfo['variable'])
                if label is not None:
                    sgrp = grp.create_group(label)
                else:
                    sgrp = grp
                if self.uniqueChannel:
                    # Creates extra dimension to avoid the creation of
                    # multiple channels (putData() strips it after each write)
                    setattr(self.dataOut, dsInfo['variable'],
                            numpy.expand_dims(getattr(self.dataOut, dsInfo['variable']), axis=0))
                for i in range(dsInfo['dsNumber']):
                    # One dataset per channel, first axis indexed by block
                    ds = sgrp.create_dataset(
                        self.getLabel(dsInfo['variable'], i),
                        (self.blocksPerFile, ) + dsInfo['shape'][1:],
                        chunks=True,
                        dtype=dsInfo['dtype'])
                    dtsets.append(ds)
                    data.append((dsInfo['variable'], i))

        fp.flush()

        log.log('Creating file: {}'.format(fp.filename), self.name)

        self.ds = dtsets
        self.data = data
        self.firsttime = True
        self.blockIndex = 0
        return

    def putData(self):
        """Write the current dataOut block; rotate the file first when it is
        full or when timeFlag() signals a new period."""

        if (self.blockIndex == self.blocksPerFile) or self.timeFlag():
            self.closeFile()
            self.setNextFile()

        for i, ds in enumerate(self.ds):
            attr, ch = self.data[i]
            if ch == -1:
                # Scalar variable: write directly at the block index
                ds[self.blockIndex] = getattr(self.dataOut, attr)
            else:
                if self.uniqueChannel and self.blockIndex != 0:
                    # Re-add the extra channel dimension (stripped below after
                    # every write); at block 0 writeData() already added it
                    setattr(self.dataOut, attr, numpy.expand_dims(getattr(self.dataOut, attr), axis=0))
                ds[self.blockIndex] = getattr(self.dataOut, attr)[ch]
                if self.uniqueChannel:
                    # Deletes extra dimension created to avoid multiple channels
                    setattr(self.dataOut, attr, getattr(self.dataOut, attr)[0])

        self.fp.flush()
        self.blockIndex += 1
        log.log('Block No. {}/{}'.format(self.blockIndex, self.blocksPerFile), self.name)

        return

    def closeFile(self):
        """Flush and close the current file, shrinking datasets when the
        file ended before reaching blocksPerFile."""

        if self.blockIndex != self.blocksPerFile:
            for ds in self.ds:
                ds.resize(self.blockIndex, axis=0)

        if self.fp:
            self.fp.flush()
            self.fp.close()

    def close(self):

        self.closeFile()
General Comments 0
You need to be logged in to leave comments. Login now