Comment and recommendation on using utcoffset
Alexander Valdez -
r1692:93a8eb01fd3d
@@ -1,638 +1,640
1 1 import os
2 2 import time
3 3 import datetime
4 4
5 5 import numpy
6 6 import h5py
7 7
8 8 import schainpy.admin
9 9 from schainpy.model.data.jrodata import *
10 10 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
11 11 from schainpy.model.io.jroIO_base import *
12 12 from schainpy.utils import log
13 13
14 14
15 15 class HDFReader(Reader, ProcessingUnit):
16 16 """Processing unit to read HDF5 format files
17 17
18 18 This unit reads HDF5 files created with the `HDFWriter` operation. By default the
19 19 files contain two groups, Data and Metadata, and all their variables are loaded as
20 20 `dataOut` attributes.
21 21 It is also possible to read any HDF5 file by giving its structure in the `description`
22 22 parameter, and extra values can be added to the metadata with the `extras` parameter.
23 23
24 24 Parameters:
25 25 -----------
26 26 path : str
27 27 Path where files are located.
28 28 startDate : date
29 29 Start date of the files
30 30 endDate : date
31 31 End date of the files
32 32 startTime : time
33 33 Start time of the files
34 34 endTime : time
35 35 End time of the files
36 36 description : dict, optional
37 37 Dictionary with the description of the HDF5 file
38 38 extras : dict, optional
39 39 Dictionary with extra metadata to be added to `dataOut`
40
41 Attention: be careful to add the `utcoffset` attribute in the last part of the reader in order to work in local time without timing problems.
42
43 -----------
44 utcoffset='-18000'
45
40 46
41 47 Examples
42 48 --------
43 49
44 50 desc = {
45 51 'Data': {
46 52 'data_output': ['u', 'v', 'w'],
47 53 'utctime': 'timestamps',
48 54 } ,
49 55 'Metadata': {
50 56 'heightList': 'heights'
51 57 }
52 58 }
53 59
54 60 desc = {
55 61 'Data': {
56 62 'data_output': 'winds',
57 63 'utctime': 'timestamps'
58 64 },
59 65 'Metadata': {
60 66 'heightList': 'heights'
61 67 }
62 68 }
63 69
64 70 extras = {
65 71 'timeZone': 300
66 72 }
67 73
74
68 75 reader = project.addReadUnit(
69 76 name='HDFReader',
70 77 path='/path/to/files',
71 78 startDate='2019/01/01',
72 79 endDate='2019/01/31',
73 80 startTime='00:00:00',
74 81 endTime='23:59:59',
82 utcoffset='-18000'
75 83 # description=json.dumps(desc),
76 84 # extras=json.dumps(extras),
77 85 )
78 86
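Note: `utcoffset` appears to be an offset in seconds added to the file timestamps
(e.g. '-18000' corresponds to UTC-05:00), so that the `startTime`/`endTime`
filtering in `__setBlockList` is done in local time.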
79 ATTENTION:
80 Add attribute:
81
82 utcoffset='-18000'
83 in the last part of reader in order to work in Local Time
84
85 87 """
86 88
87 89 __attrs__ = ['path', 'startDate', 'endDate', 'startTime', 'endTime', 'description', 'extras']
88 90
89 91 def __init__(self):
90 92 ProcessingUnit.__init__(self)
91 93 self.dataOut = Parameters()
92 94 self.ext = ".hdf5"
93 95 self.optchar = "D"
94 96 self.meta = {}
95 97 self.data = {}
96 98 self.open_file = h5py.File
97 99 self.open_mode = 'r'
98 100 self.description = {}
99 101 self.extras = {}
100 102 self.filefmt = "*%Y%j***"
101 103 self.folderfmt = "*%Y%j"
102 104 self.utcoffset = 0 # offset in seconds added to the file timestamps (set via the `utcoffset` parameter)
103 105
104 106 def setup(self, **kwargs):
105 107 self.set_kwargs(**kwargs)
106 108 if not self.ext.startswith('.'):
107 109 self.ext = '.{}'.format(self.ext)
108 110
109 111 if self.online:
110 112 log.log("Searching files in online mode...", self.name)
111 113
112 114 for nTries in range(self.nTries):
113 115 fullpath = self.searchFilesOnLine(self.path, self.startDate,
114 116 self.endDate, self.expLabel, self.ext, self.walk,
115 117 self.filefmt, self.folderfmt)
116 118 try:
117 119 fullpath = next(fullpath)
118 120 except:
119 121 fullpath = None
120 122
121 123 if fullpath:
122 124 break
123 125
124 126 log.warning(
125 127 'Waiting {} sec for a valid file in {}: try {} ...'.format(
126 128 self.delay, self.path, nTries + 1),
127 129 self.name)
128 130 time.sleep(self.delay)
129 131
130 132 if not(fullpath):
131 133 raise schainpy.admin.SchainError(
132 134 'There isn\'t any valid file in {}'.format(self.path))
133 135
134 136 pathname, filename = os.path.split(fullpath)
135 137 self.year = int(filename[1:5])
136 138 self.doy = int(filename[5:8])
137 139 self.set = int(filename[8:11]) - 1
138 140 else:
139 141 log.log("Searching files in {}".format(self.path), self.name)
140 142 self.filenameList = self.searchFilesOffLine(self.path, self.startDate,
141 143 self.endDate, self.expLabel, self.ext, self.walk, self.filefmt, self.folderfmt)
142 144
143 145 self.setNextFile()
144 146
145 147 return
146 148
147 149 def readFirstHeader(self):
148 150 '''Read metadata and data'''
149 151
150 152 self.__readMetadata()
151 153 self.__readData()
152 154 self.__setBlockList()
153 155
154 156 if 'type' in self.meta:
155 157 self.dataOut = eval(self.meta['type'])()
156 158
157 159 for attr in self.meta:
158 160 setattr(self.dataOut, attr, self.meta[attr])
159 161
160 162 self.blockIndex = 0
161 163
162 164 return
163 165
164 166 def __setBlockList(self):
165 167 '''
166 168 Selects the data within the times defined
167 169
168 170 self.fp
169 171 self.startTime
170 172 self.endTime
171 173 self.blockList
172 174 self.blocksPerFile
173 175
174 176 '''
175 177
176 178 startTime = self.startTime
177 179 endTime = self.endTime
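# Shift the UTC timestamps by utcoffset (seconds) so that the startTime/endTime
# comparison below is done in local time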
178 180 thisUtcTime = self.data['utctime'] + self.utcoffset
179 181
180 182 self.interval = numpy.min(thisUtcTime[1:] - thisUtcTime[:-1])
181 183 thisDatetime = datetime.datetime.utcfromtimestamp(thisUtcTime[0])
182 184
183 185 thisDate = thisDatetime.date()
184 186 thisTime = thisDatetime.time()
185 187 startUtcTime = (datetime.datetime.combine(thisDate, startTime) - datetime.datetime(1970, 1, 1)).total_seconds()
186 188 endUtcTime = (datetime.datetime.combine(thisDate, endTime) - datetime.datetime(1970, 1, 1)).total_seconds()
187 189 ind = numpy.where(numpy.logical_and(thisUtcTime >= startUtcTime, thisUtcTime < endUtcTime))[0]
188 190
189 191 self.blockList = ind
190 192 self.blocksPerFile = len(ind)
191 193
192 194 if len(ind)==0:
193 195 print("[Reading] Block No. %d/%d -> %s [Skipping]" % (self.blockIndex,
194 196 self.blocksPerFile,
195 197 thisDatetime))
196 198 self.setNextFile()
197 199 self.readFirstHeader()
198 200
199 201 return
200 202
201 203 def __readMetadata(self):
202 204 '''
203 205 Reads Metadata
204 206 '''
205 207
206 208 meta = {}
207 209
208 210 if self.description:
209 211 for key, value in self.description['Metadata'].items():
210 212 meta[key] = self.fp[value][()]
211 213 else:
212 214 grp = self.fp['Metadata']
213 215 for name in grp:
214 216 meta[name] = grp[name][()]
215 217
216 218 if self.extras:
217 219 for key, value in self.extras.items():
218 220 meta[key] = value
219 221 self.meta = meta
220 222
221 223 return
222 224
223 225 def __readData(self):
224 226
225 227 data = {}
226 228
227 229 if self.description:
228 230 for key, value in self.description['Data'].items():
229 231 if isinstance(value, str):
230 232 if isinstance(self.fp[value], h5py.Dataset):
231 233 data[key] = self.fp[value][()]
232 234 elif isinstance(self.fp[value], h5py.Group):
233 235 array = []
234 236 for ch in self.fp[value]:
235 237 array.append(self.fp[value][ch][()])
236 238 data[key] = numpy.array(array)
237 239 elif isinstance(value, list):
238 240 array = []
239 241 for ch in value:
240 242 array.append(self.fp[ch][()])
241 243 data[key] = numpy.array(array)
242 244 else:
243 245 grp = self.fp['Data']
244 246 for name in grp:
245 247 if isinstance(grp[name], h5py.Dataset):
246 248 array = grp[name][()]
247 249 elif isinstance(grp[name], h5py.Group):
248 250 array = []
249 251 for ch in grp[name]:
250 252 array.append(grp[name][ch][()])
251 253 array = numpy.array(array)
252 254 else:
253 255 log.warning('Unknown type: {}'.format(name))
254 256
255 257 if name in self.description:
256 258 key = self.description[name]
257 259 else:
258 260 key = name
259 261 data[key] = array
260 262
261 263 self.data = data
262 264 return
263 265
264 266 def getData(self):
265 267
266 268 for attr in self.data:
267 269 if self.data[attr].ndim == 1:
268 270 setattr(self.dataOut, attr, self.data[attr][self.blockIndex])
269 271 else:
270 272 setattr(self.dataOut, attr, self.data[attr][:, self.blockIndex])
271 273
272 274 self.dataOut.flagNoData = False
273 275 self.blockIndex += 1
274 276
275 277 log.log("Block No. {}/{} -> {}".format(
276 278 self.blockIndex,
277 279 self.blocksPerFile,
278 280 self.dataOut.datatime.ctime()), self.name)
279 281
280 282 return
281 283
282 284 def run(self, **kwargs):
283 285
284 286 if not(self.isConfig):
285 287 self.setup(**kwargs)
286 288 self.isConfig = True
287 289
288 290 if self.blockIndex == self.blocksPerFile:
289 291 self.setNextFile()
290 292
291 293 self.getData()
292 294
293 295 return
294 296
295 297 @MPDecorator
296 298 class HDFWriter(Operation):
297 299 """Operation to write HDF5 files.
298 300
299 301 The HDF5 file contains by default two groups, Data and Metadata, where
300 302 you can save any `dataOut` attribute specified by the `dataList` and `metadataList`
301 303 parameters; data attributes are normally time dependent whereas the metadata
302 304 attributes are not.
303 305 It is possible to customize the structure of the HDF5 file with the
304 306 optional `description` parameter; see the examples.
305 307
306 308 Parameters:
307 309 -----------
308 310 path : str
309 311 Path where files will be saved.
310 312 blocksPerFile : int
311 313 Number of blocks per file
312 314 metadataList : list
313 315 List of the dataOut attributes that will be saved as metadata
314 316 dataList : list
315 317 List of the dataOut attributes that will be saved as data
316 318 setType : bool
317 319 If set, the set number in the file name is derived from the timestamp of the data
318 320 description : dict, optional
319 321 Dictionary with the desired description of the HDF5 file
320 322
321 323 Examples
322 324 --------
323 325
324 326 desc = {
325 327 'data_output': {'winds': ['z', 'w', 'v']},
326 328 'utctime': 'timestamps',
327 329 'heightList': 'heights'
328 330 }
329 331 desc = {
330 332 'data_output': ['z', 'w', 'v'],
331 333 'utctime': 'timestamps',
332 334 'heightList': 'heights'
333 335 }
334 336 desc = {
335 337 'Data': {
336 338 'data_output': 'winds',
337 339 'utctime': 'timestamps'
338 340 },
339 341 'Metadata': {
340 342 'heightList': 'heights'
341 343 }
342 344 }
343 345
344 346 writer = proc_unit.addOperation(name='HDFWriter')
345 347 writer.addParameter(name='path', value='/path/to/file')
346 348 writer.addParameter(name='blocksPerFile', value='32')
347 349 writer.addParameter(name='metadataList', value='heightList,timeZone')
348 350 writer.addParameter(name='dataList',value='data_output,utctime')
349 351 # writer.addParameter(name='description',value=json.dumps(desc))
350 352
351 353 """
352 354
353 355 ext = ".hdf5"
354 356 optchar = "D"
355 357 filename = None
356 358 path = None
357 359 setFile = None
358 360 fp = None
359 361 firsttime = True
360 362 #Configurations
361 363 blocksPerFile = None
362 364 blockIndex = None
363 365 dataOut = None
364 366 #Data Arrays
365 367 dataList = None
366 368 metadataList = None
367 369 currentDay = None
368 370 lastTime = None
369 371
370 372 def __init__(self):
371 373
372 374 Operation.__init__(self)
373 375 return
374 376
375 377 def setup(self, path=None, blocksPerFile=10, metadataList=None, dataList=None, setType=None, description=None):
376 378 self.path = path
377 379 self.blocksPerFile = blocksPerFile
378 380 self.metadataList = metadataList
379 381 self.dataList = [s.strip() for s in dataList]
380 382 self.setType = setType
381 383 self.description = description
382 384
383 385 if self.metadataList is None:
384 386 self.metadataList = self.dataOut.metadata_list
385 387
386 388 tableList = []
387 389 dsList = []
388 390
389 391 for i in range(len(self.dataList)):
390 392 dsDict = {}
391 393 if hasattr(self.dataOut, self.dataList[i]):
392 394 dataAux = getattr(self.dataOut, self.dataList[i])
393 395 dsDict['variable'] = self.dataList[i]
394 396 else:
395 397 log.warning('Attribute {} not found in dataOut'.format(self.dataList[i]), self.name)
396 398 continue
397 399
398 400 if dataAux is None:
399 401 continue
400 402 elif isinstance(dataAux, (int, float, numpy.integer, numpy.floating)):
401 403 dsDict['nDim'] = 0
402 404 else:
403 405 dsDict['nDim'] = len(dataAux.shape)
404 406 dsDict['shape'] = dataAux.shape
405 407 dsDict['dsNumber'] = dataAux.shape[0]
406 408 dsDict['dtype'] = dataAux.dtype
407 409
408 410 dsList.append(dsDict)
409 411
410 412 self.dsList = dsList
411 413 self.currentDay = self.dataOut.datatime.date()
412 414
413 415 def timeFlag(self):
414 416 currentTime = self.dataOut.utctime
415 417 timeTuple = time.localtime(currentTime)
416 418 dataDay = timeTuple.tm_yday
417 419
418 420 if self.lastTime is None:
419 421 self.lastTime = currentTime
420 422 self.currentDay = dataDay
421 423 return False
422 424
423 425 timeDiff = currentTime - self.lastTime
424 426
425 427 # If the day changed or the gap between consecutive samples exceeds the limit (3 hours)
426 428 if dataDay != self.currentDay:
427 429 self.currentDay = dataDay
428 430 return True
429 431 elif timeDiff > 3*60*60:
430 432 self.lastTime = currentTime
431 433 return True
432 434 else:
433 435 self.lastTime = currentTime
434 436 return False
435 437
436 438 def run(self, dataOut, path, blocksPerFile=10, metadataList=None,
437 439 dataList=[], setType=None, description={}):
438 440
439 441 self.dataOut = dataOut
440 442 if not(self.isConfig):
441 443 self.setup(path=path, blocksPerFile=blocksPerFile,
442 444 metadataList=metadataList, dataList=dataList,
443 445 setType=setType, description=description)
444 446
445 447 self.isConfig = True
446 448 self.setNextFile()
447 449
448 450 self.putData()
449 451 return
450 452
451 453 def setNextFile(self):
452 454
453 455 ext = self.ext
454 456 path = self.path
455 457 setFile = self.setFile
456 458
457 459 timeTuple = time.localtime(self.dataOut.utctime)
458 460 subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year,timeTuple.tm_yday)
459 461 fullpath = os.path.join(path, subfolder)
460 462
461 463 if os.path.exists(fullpath):
462 464 filesList = os.listdir(fullpath)
463 465 filesList = [k for k in filesList if k.startswith(self.optchar)]
464 466 if len( filesList ) > 0:
465 467 filesList = sorted(filesList, key=str.lower)
466 468 filen = filesList[-1]
467 469 # the filename must have the following format
468 470 # 0 1234 567 89A BCDE (hex)
469 471 # x YYYY DDD SSS .ext
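# e.g. D2019300001.hdf5 -> optchar 'D', year 2019, day-of-year 300, set 001 (hypothetical example)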
470 472 if isNumber(filen[8:11]):
471 473 setFile = int(filen[8:11]) # initialize the set counter from the last file's set number
472 474 else:
473 475 setFile = -1
474 476 else:
475 477 setFile = -1 # initialize the set counter
476 478 else:
477 479 os.makedirs(fullpath)
478 480 setFile = -1 # initialize the set counter
479 481
480 482 if self.setType is None:
481 483 setFile += 1
482 484 file = '%s%4.4d%3.3d%03d%s' % (self.optchar,
483 485 timeTuple.tm_year,
484 486 timeTuple.tm_yday,
485 487 setFile,
486 488 ext )
487 489 else:
488 490 setFile = timeTuple.tm_hour*60+timeTuple.tm_min
489 491 file = '%s%4.4d%3.3d%04d%s' % (self.optchar,
490 492 timeTuple.tm_year,
491 493 timeTuple.tm_yday,
492 494 setFile,
493 495 ext )
494 496
495 497 self.filename = os.path.join( path, subfolder, file )
496 498
497 499 #Setting HDF5 File
498 500 self.fp = h5py.File(self.filename, 'w')
499 501 #write metadata
500 502 self.writeMetadata(self.fp)
501 503 #Write data
502 504 self.writeData(self.fp)
503 505
504 506 def getLabel(self, name, x=None):
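# Resolve the HDF5 group/dataset label for a dataOut attribute from the description
# dict; when x is given, return the per-channel (or pair) label for index x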
505 507
506 508 if x is None:
507 509 if 'Data' in self.description:
508 510 data = self.description['Data']
509 511 if 'Metadata' in self.description:
510 512 data.update(self.description['Metadata'])
511 513 else:
512 514 data = self.description
513 515 if name in data:
514 516 if isinstance(data[name], str):
515 517 return data[name]
516 518 elif isinstance(data[name], list):
517 519 return None
518 520 elif isinstance(data[name], dict):
519 521 for key, value in data[name].items():
520 522 return key
521 523 return name
522 524 else:
523 525 if 'Metadata' in self.description:
524 526 meta = self.description['Metadata']
525 527 else:
526 528 meta = self.description
527 529 if name in meta:
528 530 if isinstance(meta[name], list):
529 531 return meta[name][x]
530 532 elif isinstance(meta[name], dict):
531 533 for key, value in meta[name].items():
532 534 return value[x]
533 535 if 'cspc' in name:
534 536 return 'pair{:02d}'.format(x)
535 537 else:
536 538 return 'channel{:02d}'.format(x)
537 539
538 540 def writeMetadata(self, fp):
539 541
540 542 if self.description:
541 543 if 'Metadata' in self.description:
542 544 grp = fp.create_group('Metadata')
543 545 else:
544 546 grp = fp
545 547 else:
546 548 grp = fp.create_group('Metadata')
547 549
548 550 for i in range(len(self.metadataList)):
549 551 if not hasattr(self.dataOut, self.metadataList[i]):
550 552 log.warning('Metadata: `{}` not found'.format(self.metadataList[i]), self.name)
551 553 continue
552 554 value = getattr(self.dataOut, self.metadataList[i])
553 555 if isinstance(value, bool):
554 556 if value is True:
555 557 value = 1
556 558 else:
557 559 value = 0
558 560 grp.create_dataset(self.getLabel(self.metadataList[i]), data=value)
559 561 return
560 562
561 563 def writeData(self, fp):
562 564
563 565 if self.description:
564 566 if 'Data' in self.description:
565 567 grp = fp.create_group('Data')
566 568 else:
567 569 grp = fp
568 570 else:
569 571 grp = fp.create_group('Data')
570 572
571 573 dtsets = []
572 574 data = []
573 575
574 576 for dsInfo in self.dsList:
575 577 if dsInfo['nDim'] == 0:
576 578 ds = grp.create_dataset(
577 579 self.getLabel(dsInfo['variable']),
578 580 (self.blocksPerFile, ),
579 581 chunks=True,
580 582 dtype=numpy.float64)
581 583 dtsets.append(ds)
582 584 data.append((dsInfo['variable'], -1))
583 585 else:
584 586 label = self.getLabel(dsInfo['variable'])
585 587 if label is not None:
586 588 sgrp = grp.create_group(label)
587 589 else:
588 590 sgrp = grp
589 591 for i in range(dsInfo['dsNumber']):
590 592 ds = sgrp.create_dataset(
591 593 self.getLabel(dsInfo['variable'], i),
592 594 (self.blocksPerFile, ) + dsInfo['shape'][1:],
593 595 chunks=True,
594 596 dtype=dsInfo['dtype'])
595 597 dtsets.append(ds)
596 598 data.append((dsInfo['variable'], i))
597 599 fp.flush()
598 600
599 601 log.log('Creating file: {}'.format(fp.filename), self.name)
600 602
601 603 self.ds = dtsets
602 604 self.data = data
603 605 self.firsttime = True
604 606 self.blockIndex = 0
605 607 return
606 608
607 609 def putData(self):
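# Write one block of every selected attribute to the open file, rolling over to a
# new file when the block count is reached or timeFlag() signals a day change/time gap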
608 610
609 611 if (self.blockIndex == self.blocksPerFile) or self.timeFlag():
610 612 self.closeFile()
611 613 self.setNextFile()
612 614
613 615 for i, ds in enumerate(self.ds):
614 616 attr, ch = self.data[i]
615 617 if ch == -1:
616 618 ds[self.blockIndex] = getattr(self.dataOut, attr)
617 619 else:
618 620 ds[self.blockIndex] = getattr(self.dataOut, attr)[ch]
619 621
620 622 self.fp.flush()
621 623 self.blockIndex += 1
622 624 log.log('Block No. {}/{}'.format(self.blockIndex, self.blocksPerFile), self.name)
623 625
624 626 return
625 627
626 628 def closeFile(self):
627 629
628 630 if self.blockIndex != self.blocksPerFile:
629 631 for ds in self.ds:
630 632 ds.resize(self.blockIndex, axis=0)
631 633
632 634 if self.fp:
633 635 self.fp.flush()
634 636 self.fp.close()
635 637
636 638 def close(self):
637 639
638 640 self.closeFile()