##// END OF EJS Templates
cambio a timeZone
joabAM -
r1402:2b5adcd8c887
parent child
Show More
@@ -1,674 +1,674
1 1 import os
2 2 import time
3 3 import datetime
4 4
5 5 import numpy
6 6 import h5py
7 7
8 8 import schainpy.admin
9 9 from schainpy.model.data.jrodata import *
10 10 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
11 11 from schainpy.model.io.jroIO_base import *
12 12 from schainpy.utils import log
13 13
14 14
15 15 class HDFReader(Reader, ProcessingUnit):
16 16 """Processing unit to read HDF5 format files
17 17
18 18 This unit reads HDF5 files created with `HDFWriter` operation contains
19 19 by default two groups Data and Metadata all variables would be saved as `dataOut`
20 20 attributes.
21 21 It is possible to read any HDF5 file by given the structure in the `description`
22 22 parameter, also you can add extra values to metadata with the parameter `extras`.
23 23
24 24 Parameters:
25 25 -----------
26 26 path : str
27 27 Path where files are located.
28 28 startDate : date
29 29 Start date of the files
30 30 endDate : list
31 31 End date of the files
32 32 startTime : time
33 33 Start time of the files
34 34 endTime : time
35 35 End time of the files
36 36 description : dict, optional
37 37 Dictionary with the description of the HDF5 file
38 38 extras : dict, optional
39 39 Dictionary with extra metadata to be be added to `dataOut`
40 40
41 41 Examples
42 42 --------
43 43
44 44 desc = {
45 45 'Data': {
46 46 'data_output': ['u', 'v', 'w'],
47 47 'utctime': 'timestamps',
48 48 } ,
49 49 'Metadata': {
50 50 'heightList': 'heights'
51 51 }
52 52 }
53 53
54 54 desc = {
55 55 'Data': {
56 56 'data_output': 'winds',
57 57 'utctime': 'timestamps'
58 58 },
59 59 'Metadata': {
60 60 'heightList': 'heights'
61 61 }
62 62 }
63 63
64 64 extras = {
65 65 'timeZone': 300
66 66 }
67 67
68 68 reader = project.addReadUnit(
69 69 name='HDFReader',
70 70 path='/path/to/files',
71 71 startDate='2019/01/01',
72 72 endDate='2019/01/31',
73 73 startTime='00:00:00',
74 74 endTime='23:59:59',
75 75 # description=json.dumps(desc),
76 76 # extras=json.dumps(extras),
77 77 )
78 78
79 79 """
80 80
81 81 __attrs__ = ['path', 'startDate', 'endDate', 'startTime', 'endTime', 'description', 'extras']
82 82
83 83 def __init__(self):
84 84 ProcessingUnit.__init__(self)
85 85 self.dataOut = Parameters()
86 86 self.ext = ".hdf5"
87 87 self.optchar = "D"
88 88 self.meta = {}
89 89 self.data = {}
90 90 self.open_file = h5py.File
91 91 self.open_mode = 'r'
92 92 self.description = {}
93 93 self.extras = {}
94 94 self.filefmt = "*%Y%j***"
95 95 self.folderfmt = "*%Y%j"
96 96 self.utcoffset = 0
97 97
98 98 def setup(self, **kwargs):
99 99
100 100 self.set_kwargs(**kwargs)
101 101 if not self.ext.startswith('.'):
102 102 self.ext = '.{}'.format(self.ext)
103 103
104 104 if self.online:
105 105 log.log("Searching files in online mode...", self.name)
106 106
107 107 for nTries in range(self.nTries):
108 108 fullpath = self.searchFilesOnLine(self.path, self.startDate,
109 109 self.endDate, self.expLabel, self.ext, self.walk,
110 110 self.filefmt, self.folderfmt)
111 111 pathname, filename = os.path.split(fullpath)
112 112 #print(pathname,filename)
113 113 try:
114 114 fullpath = next(fullpath)
115 115
116 116 except:
117 117 fullpath = None
118 118
119 119 if fullpath:
120 120 break
121 121
122 122 log.warning(
123 123 'Waiting {} sec for a valid file in {}: try {} ...'.format(
124 124 self.delay, self.path, nTries + 1),
125 125 self.name)
126 126 time.sleep(self.delay)
127 127
128 128 if not(fullpath):
129 129 raise schainpy.admin.SchainError(
130 130 'There isn\'t any valid file in {}'.format(self.path))
131 131
132 132 pathname, filename = os.path.split(fullpath)
133 133 self.year = int(filename[1:5])
134 134 self.doy = int(filename[5:8])
135 135 self.set = int(filename[8:11]) - 1
136 136 else:
137 137 log.log("Searching files in {}".format(self.path), self.name)
138 138 self.filenameList = self.searchFilesOffLine(self.path, self.startDate,
139 139 self.endDate, self.expLabel, self.ext, self.walk, self.filefmt, self.folderfmt)
140 140
141 141 self.setNextFile()
142 142
143 143 return
144 144
145 145
146 146 def readFirstHeader(self):
147 147 '''Read metadata and data'''
148 148
149 149 self.__readMetadata()
150 150 self.__readData()
151 151 self.__setBlockList()
152 152
153 153 if 'type' in self.meta:
154 154 ##print("Creting dataOut...")
155 155 self.dataOut = eval(self.meta['type'])()
156 156 ##print(vars(self.dataOut))
157 157
158 158 for attr in self.meta:
159 159 ##print("attr: ", attr)
160 160 ##print(type(self.dataOut).__name__)
161 161 setattr(self.dataOut, attr, self.meta[attr])
162 162
163 163 self.blockIndex = 0
164 164
165 165 return
166 166
167 167 def __setBlockList(self):
168 168 '''
169 169 Selects the data within the times defined
170 170
171 171 self.fp
172 172 self.startTime
173 173 self.endTime
174 174 self.blockList
175 175 self.blocksPerFile
176 176
177 177 '''
178 178
179 179 startTime = self.startTime
180 180 endTime = self.endTime
181 181 thisUtcTime = self.data['utctime'] + self.utcoffset
182 182 self.interval = numpy.min(thisUtcTime[1:] - thisUtcTime[:-1])
183 183 thisDatetime = datetime.datetime.utcfromtimestamp(thisUtcTime[0])
184 184 self.startFileDatetime = thisDatetime
185 185 thisDate = thisDatetime.date()
186 186 thisTime = thisDatetime.time()
187 187
188 188 startUtcTime = (datetime.datetime.combine(thisDate, startTime) - datetime.datetime(1970, 1, 1)).total_seconds()
189 189 endUtcTime = (datetime.datetime.combine(thisDate, endTime) - datetime.datetime(1970, 1, 1)).total_seconds()
190 190
191 191 ind = numpy.where(numpy.logical_and(thisUtcTime >= startUtcTime, thisUtcTime < endUtcTime))[0]
192 192
193 193 self.blockList = ind
194 194 self.blocksPerFile = len(ind)
195 195 self.blocksPerFile = len(thisUtcTime)
196 196 return
197 197
198 198 def __readMetadata(self):
199 199 '''
200 200 Reads Metadata
201 201 '''
202 202
203 203 meta = {}
204 204
205 205 if self.description:
206 206 for key, value in self.description['Metadata'].items():
207 207 meta[key] = self.fp[value][()]
208 208 else:
209 209 grp = self.fp['Metadata']
210 210 for name in grp:
211 211 meta[name] = grp[name][()]
212 212
213 213 if self.extras:
214 214 for key, value in self.extras.items():
215 215 meta[key] = value
216 216 self.meta = meta
217 217
218 218 return
219 219
220 220
221 221
222 222 def checkForRealPath(self, nextFile, nextDay):
223 223
224 224 # print("check FRP")
225 225 # dt = self.startFileDatetime + datetime.timedelta(1)
226 226 # filename = '{}.{}{}'.format(self.path, dt.strftime('%Y%m%d'), self.ext)
227 227 # fullfilename = os.path.join(self.path, filename)
228 228 # print("check Path ",fullfilename,filename)
229 229 # if os.path.exists(fullfilename):
230 230 # return fullfilename, filename
231 231 # return None, filename
232 232 return None,None
233 233
234 234 def __readData(self):
235 235
236 236 data = {}
237 237
238 238 if self.description:
239 239 for key, value in self.description['Data'].items():
240 240 if isinstance(value, str):
241 241 if isinstance(self.fp[value], h5py.Dataset):
242 242 data[key] = self.fp[value][()]
243 243 elif isinstance(self.fp[value], h5py.Group):
244 244 array = []
245 245 for ch in self.fp[value]:
246 246 array.append(self.fp[value][ch][()])
247 247 data[key] = numpy.array(array)
248 248 elif isinstance(value, list):
249 249 array = []
250 250 for ch in value:
251 251 array.append(self.fp[ch][()])
252 252 data[key] = numpy.array(array)
253 253 else:
254 254 grp = self.fp['Data']
255 255 for name in grp:
256 256 if isinstance(grp[name], h5py.Dataset):
257 257 array = grp[name][()]
258 258 elif isinstance(grp[name], h5py.Group):
259 259 array = []
260 260 for ch in grp[name]:
261 261 array.append(grp[name][ch][()])
262 262 array = numpy.array(array)
263 263 else:
264 264 log.warning('Unknown type: {}'.format(name))
265 265
266 266 if name in self.description:
267 267 key = self.description[name]
268 268 else:
269 269 key = name
270 270 data[key] = array
271 271
272 272 self.data = data
273 273 return
274 274
275 275 def getData(self):
276 276 if not self.isDateTimeInRange(self.startFileDatetime, self.startDate, self.endDate, self.startTime, self.endTime):
277 277 self.dataOut.flagNoData = True
278 278 self.blockIndex = self.blocksPerFile
279 279 #self.dataOut.error = True TERMINA EL PROGRAMA, removido
280 280 return
281 281 for attr in self.data:
282 282 #print("attr ",attr)
283 283 if self.data[attr].ndim == 1:
284 284 setattr(self.dataOut, attr, self.data[attr][self.blockIndex])
285 285 else:
286 286 setattr(self.dataOut, attr, self.data[attr][:, self.blockIndex])
287 287
288 288
289 289 self.blockIndex += 1
290 290
291 291 if self.blockIndex == 1:
292 292 log.log("Block No. {}/{} -> {}".format(
293 293 self.blockIndex,
294 294 self.blocksPerFile,
295 295 self.dataOut.datatime.ctime()), self.name)
296 296 else:
297 297 log.log("Block No. {}/{} ".format(
298 298 self.blockIndex,
299 299 self.blocksPerFile),self.name)
300 300
301 301 self.dataOut.flagNoData = False
302 302 self.dataOut.error = False
303 303 return
304 304
305 305 def run(self, **kwargs):
306 306
307 307 if not(self.isConfig):
308 308 self.setup(**kwargs)
309 309 self.isConfig = True
310 310
311 311 if self.blockIndex == self.blocksPerFile:
312 312 self.setNextFile()
313 313
314 314 self.getData()
315 315
316 316 return
317 317
318 318 @MPDecorator
319 319 class HDFWriter(Operation):
320 320 """Operation to write HDF5 files.
321 321
322 322 The HDF5 file contains by default two groups Data and Metadata where
323 323 you can save any `dataOut` attribute specified by `dataList` and `metadataList`
324 324 parameters, data attributes are normaly time dependent where the metadata
325 325 are not.
326 326 It is possible to customize the structure of the HDF5 file with the
327 327 optional description parameter see the examples.
328 328
329 329 Parameters:
330 330 -----------
331 331 path : str
332 332 Path where files will be saved.
333 333 blocksPerFile : int
334 334 Number of blocks per file
335 335 metadataList : list
336 336 List of the dataOut attributes that will be saved as metadata
337 337 dataList : int
338 338 List of the dataOut attributes that will be saved as data
339 339 setType : bool
340 340 If True the name of the files corresponds to the timestamp of the data
341 341 description : dict, optional
342 342 Dictionary with the desired description of the HDF5 file
343 343
344 344 Examples
345 345 --------
346 346
347 347 desc = {
348 348 'data_output': {'winds': ['z', 'w', 'v']},
349 349 'utctime': 'timestamps',
350 350 'heightList': 'heights'
351 351 }
352 352 desc = {
353 353 'data_output': ['z', 'w', 'v'],
354 354 'utctime': 'timestamps',
355 355 'heightList': 'heights'
356 356 }
357 357 desc = {
358 358 'Data': {
359 359 'data_output': 'winds',
360 360 'utctime': 'timestamps'
361 361 },
362 362 'Metadata': {
363 363 'heightList': 'heights'
364 364 }
365 365 }
366 366
367 367 writer = proc_unit.addOperation(name='HDFWriter')
368 368 writer.addParameter(name='path', value='/path/to/file')
369 369 writer.addParameter(name='blocksPerFile', value='32')
370 370 writer.addParameter(name='metadataList', value='heightList,timeZone')
371 371 writer.addParameter(name='dataList',value='data_output,utctime')
372 372 # writer.addParameter(name='description',value=json.dumps(desc))
373 373
374 374 """
375 375
376 376 ext = ".hdf5"
377 377 optchar = "D"
378 378 filename = None
379 379 path = None
380 380 setFile = None
381 381 fp = None
382 382 firsttime = True
383 383 #Configurations
384 384 blocksPerFile = None
385 385 blockIndex = None
386 386 dataOut = None
387 387 #Data Arrays
388 388 dataList = None
389 389 metadataList = None
390 390 currentDay = None
391 391 lastTime = None
392 typeTime = "ut"
392 timeZone = "ut"
393 393 hourLimit = 3
394 394 breakDays = True
395 395
396 396 def __init__(self):
397 397
398 398 Operation.__init__(self)
399 399 return
400 400
401 401 def setup(self, path=None, blocksPerFile=10, metadataList=None, dataList=None, setType=None,
402 description=None,typeTime = "ut",hourLimit = 3, breakDays=True):
402 description=None,timeZone = "ut",hourLimit = 3, breakDays=True):
403 403 self.path = path
404 404 self.blocksPerFile = blocksPerFile
405 405 self.metadataList = metadataList
406 406 self.dataList = [s.strip() for s in dataList]
407 407 self.setType = setType
408 408 self.description = description
409 self.timeZone = typeTime
409 self.timeZone = timeZone
410 410 self.hourLimit = hourLimit
411 411 self.breakDays = breakDays
412 412
413 413 if self.metadataList is None:
414 414 self.metadataList = self.dataOut.metadata_list
415 415
416 416 tableList = []
417 417 dsList = []
418 418
419 419 for i in range(len(self.dataList)):
420 420 dsDict = {}
421 421 if hasattr(self.dataOut, self.dataList[i]):
422 422 dataAux = getattr(self.dataOut, self.dataList[i])
423 423 dsDict['variable'] = self.dataList[i]
424 424 else:
425 425 log.warning('Attribute {} not found in dataOut', self.name)
426 426 continue
427 427
428 428 if dataAux is None:
429 429 continue
430 430 elif isinstance(dataAux, (int, float, numpy.integer, numpy.float)):
431 431 dsDict['nDim'] = 0
432 432 else:
433 433 dsDict['nDim'] = len(dataAux.shape)
434 434 dsDict['shape'] = dataAux.shape
435 435 dsDict['dsNumber'] = dataAux.shape[0]
436 436 dsDict['dtype'] = dataAux.dtype
437 437
438 438 dsList.append(dsDict)
439 439
440 440 self.dsList = dsList
441 441 self.currentDay = self.dataOut.datatime.date()
442 442
443 443 def timeFlag(self):
444 444 currentTime = self.dataOut.utctime
445 445 if self.timeZone == "lt":
446 446 timeTuple = time.localtime(currentTime)
447 447 elif self.timeZone == "ut":
448 448 timeTuple = time.gmtime(currentTime)
449 449
450 450 dataDay = timeTuple.tm_yday
451 451 #print("time UTC: ",currentTime, self.dataOut.datatime)
452 452 if self.lastTime is None:
453 453 self.lastTime = currentTime
454 454 self.currentDay = dataDay
455 455 return False
456 456
457 457 timeDiff = currentTime - self.lastTime
458 458
459 459 #Si el dia es diferente o si la diferencia entre un dato y otro supera self.hourLimit
460 460 if (dataDay != self.currentDay) and self.breakDays:
461 461 self.currentDay = dataDay
462 462 return True
463 463 elif timeDiff > self.hourLimit*60*60:
464 464 self.lastTime = currentTime
465 465 return True
466 466 else:
467 467 self.lastTime = currentTime
468 468 return False
469 469
470 470 def run(self, dataOut,**kwargs):
471 471
472 472 self.dataOut = dataOut
473 473 if not(self.isConfig):
474 474 self.setup(**kwargs)
475 475
476 476 self.isConfig = True
477 477 self.setNextFile()
478 478
479 479 self.putData()
480 480 return
481 481
482 482 def setNextFile(self):
483 483
484 484 ext = self.ext
485 485 path = self.path
486 486 setFile = self.setFile
487 487 if self.timeZone == "lt":
488 488 timeTuple = time.localtime(self.dataOut.utctime)
489 489 elif self.timeZone == "ut":
490 490 timeTuple = time.gmtime(self.dataOut.utctime)
491 491 #print("path: ",timeTuple)
492 492 subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year,timeTuple.tm_yday)
493 493 fullpath = os.path.join(path, subfolder)
494 494
495 495 if os.path.exists(fullpath):
496 496 filesList = os.listdir(fullpath)
497 497 filesList = [k for k in filesList if k.startswith(self.optchar)]
498 498 if len( filesList ) > 0:
499 499 filesList = sorted(filesList, key=str.lower)
500 500 filen = filesList[-1]
501 501 # el filename debera tener el siguiente formato
502 502 # 0 1234 567 89A BCDE (hex)
503 503 # x YYYY DDD SSS .ext
504 504 if isNumber(filen[8:11]):
505 505 setFile = int(filen[8:11]) #inicializo mi contador de seteo al seteo del ultimo file
506 506 else:
507 507 setFile = -1
508 508 else:
509 509 setFile = -1 #inicializo mi contador de seteo
510 510 else:
511 511 os.makedirs(fullpath)
512 512 setFile = -1 #inicializo mi contador de seteo
513 513
514 514 if self.setType is None:
515 515 setFile += 1
516 516 file = '%s%4.4d%3.3d%03d%s' % (self.optchar,
517 517 timeTuple.tm_year,
518 518 timeTuple.tm_yday,
519 519 setFile,
520 520 ext )
521 521 else:
522 522 setFile = timeTuple.tm_hour*60+timeTuple.tm_min
523 523 file = '%s%4.4d%3.3d%04d%s' % (self.optchar,
524 524 timeTuple.tm_year,
525 525 timeTuple.tm_yday,
526 526 setFile,
527 527 ext )
528 528
529 529 self.filename = os.path.join( path, subfolder, file )
530 530
531 531 #Setting HDF5 File
532 532 self.fp = h5py.File(self.filename, 'w')
533 533 #write metadata
534 534 self.writeMetadata(self.fp)
535 535 #Write data
536 536 self.writeData(self.fp)
537 537
538 538 def getLabel(self, name, x=None):
539 539
540 540 if x is None:
541 541 if 'Data' in self.description:
542 542 data = self.description['Data']
543 543 if 'Metadata' in self.description:
544 544 data.update(self.description['Metadata'])
545 545 else:
546 546 data = self.description
547 547 if name in data:
548 548 if isinstance(data[name], str):
549 549 return data[name]
550 550 elif isinstance(data[name], list):
551 551 return None
552 552 elif isinstance(data[name], dict):
553 553 for key, value in data[name].items():
554 554 return key
555 555 return name
556 556 else:
557 557 if 'Metadata' in self.description:
558 558 meta = self.description['Metadata']
559 559 else:
560 560 meta = self.description
561 561 if name in meta:
562 562 if isinstance(meta[name], list):
563 563 return meta[name][x]
564 564 elif isinstance(meta[name], dict):
565 565 for key, value in meta[name].items():
566 566 return value[x]
567 567 if 'cspc' in name:
568 568 return 'pair{:02d}'.format(x)
569 569 else:
570 570 return 'channel{:02d}'.format(x)
571 571
572 572 def writeMetadata(self, fp):
573 573
574 574 if self.description:
575 575 if 'Metadata' in self.description:
576 576 grp = fp.create_group('Metadata')
577 577 else:
578 578 grp = fp
579 579 else:
580 580 grp = fp.create_group('Metadata')
581 581
582 582 for i in range(len(self.metadataList)):
583 583 if not hasattr(self.dataOut, self.metadataList[i]):
584 584 log.warning('Metadata: `{}` not found'.format(self.metadataList[i]), self.name)
585 585 continue
586 586 value = getattr(self.dataOut, self.metadataList[i])
587 587 if isinstance(value, bool):
588 588 if value is True:
589 589 value = 1
590 590 else:
591 591 value = 0
592 592 grp.create_dataset(self.getLabel(self.metadataList[i]), data=value)
593 593 return
594 594
595 595 def writeData(self, fp):
596 596
597 597 if self.description:
598 598 if 'Data' in self.description:
599 599 grp = fp.create_group('Data')
600 600 else:
601 601 grp = fp
602 602 else:
603 603 grp = fp.create_group('Data')
604 604
605 605 dtsets = []
606 606 data = []
607 607
608 608 for dsInfo in self.dsList:
609 609 if dsInfo['nDim'] == 0:
610 610 ds = grp.create_dataset(
611 611 self.getLabel(dsInfo['variable']),
612 612 (self.blocksPerFile, ),
613 613 chunks=True,
614 614 dtype=numpy.float64)
615 615 dtsets.append(ds)
616 616 data.append((dsInfo['variable'], -1))
617 617 else:
618 618 label = self.getLabel(dsInfo['variable'])
619 619 if label is not None:
620 620 sgrp = grp.create_group(label)
621 621 else:
622 622 sgrp = grp
623 623 for i in range(dsInfo['dsNumber']):
624 624 ds = sgrp.create_dataset(
625 625 self.getLabel(dsInfo['variable'], i),
626 626 (self.blocksPerFile, ) + dsInfo['shape'][1:],
627 627 chunks=True,
628 628 dtype=dsInfo['dtype'])
629 629 dtsets.append(ds)
630 630 data.append((dsInfo['variable'], i))
631 631 fp.flush()
632 632
633 633 log.log('Creating file: {}'.format(fp.filename), self.name)
634 634
635 635 self.ds = dtsets
636 636 self.data = data
637 637 self.firsttime = True
638 638 self.blockIndex = 0
639 639 return
640 640
641 641 def putData(self):
642 642
643 643 if (self.blockIndex == self.blocksPerFile) or self.timeFlag():
644 644 self.closeFile()
645 645 self.setNextFile()
646 646
647 647 for i, ds in enumerate(self.ds):
648 648 attr, ch = self.data[i]
649 649 if ch == -1:
650 650 ds[self.blockIndex] = getattr(self.dataOut, attr)
651 651 else:
652 652 ds[self.blockIndex] = getattr(self.dataOut, attr)[ch]
653 653
654 654 self.fp.flush()
655 655 self.blockIndex += 1
656 656 if self.blockIndex == 1:
657 657 log.log('Block No. {}/{} --> {}'.format(self.blockIndex, self.blocksPerFile,self.dataOut.datatime.ctime()), self.name)
658 658 else:
659 659 log.log('Block No. {}/{}'.format(self.blockIndex, self.blocksPerFile), self.name)
660 660 return
661 661
662 662 def closeFile(self):
663 663
664 664 if self.blockIndex != self.blocksPerFile:
665 665 for ds in self.ds:
666 666 ds.resize(self.blockIndex, axis=0)
667 667
668 668 if self.fp:
669 669 self.fp.flush()
670 670 self.fp.close()
671 671
672 672 def close(self):
673 673
674 674 self.closeFile()
General Comments 0
You need to be logged in to leave comments. Login now