Block limit, hour limit, time zone
joabAM
r1401:c018ae3094fa
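This changeset adds three options to HDFWriter that control when a new output file is started: typeTime ('ut' or 'lt', used to interpret timestamps), hourLimit (maximum gap, in hours, between consecutive blocks) and breakDays (start a new file when the day changes). The snippet below is a minimal, hypothetical controller-script sketch following the addParameter pattern shown in the HDFWriter docstring further down; proc_unit and the output path are placeholders, and whether string values are cast automatically depends on the controller setup.

writer = proc_unit.addOperation(name='HDFWriter')
writer.addParameter(name='path', value='/path/to/output')
writer.addParameter(name='blocksPerFile', value='32')
writer.addParameter(name='metadataList', value='heightList,timeZone')
writer.addParameter(name='dataList', value='data_output,utctime')
writer.addParameter(name='typeTime', value='lt')     # 'ut' (default) or 'lt' for local time
writer.addParameter(name='hourLimit', value='3')     # new file after a gap larger than 3 hours
writer.addParameter(name='breakDays', value='True')  # also roll over when the day of year changes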
@@ -1,660 +1,674 @@
1 1 import os
2 2 import time
3 3 import datetime
4 4
5 5 import numpy
6 6 import h5py
7 7
8 8 import schainpy.admin
9 9 from schainpy.model.data.jrodata import *
10 10 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
11 11 from schainpy.model.io.jroIO_base import *
12 12 from schainpy.utils import log
13 13
14 14
15 15 class HDFReader(Reader, ProcessingUnit):
16 16 """Processing unit to read HDF5 format files
17 17
18 18 This unit reads HDF5 files created with the `HDFWriter` operation. By default the
19 19 file contains two groups, Data and Metadata, and all variables are loaded as `dataOut`
20 20 attributes.
21 21 It is possible to read any HDF5 file by giving its structure in the `description`
22 22 parameter; you can also add extra values to the metadata with the `extras` parameter.
23 23
24 24 Parameters:
25 25 -----------
26 26 path : str
27 27 Path where files are located.
28 28 startDate : date
29 29 Start date of the files
30 30 endDate : date
31 31 End date of the files
32 32 startTime : time
33 33 Start time of the files
34 34 endTime : time
35 35 End time of the files
36 36 description : dict, optional
37 37 Dictionary with the description of the HDF5 file
38 38 extras : dict, optional
39 39 Dictionary with extra metadata to be added to `dataOut`
40 40
41 41 Examples
42 42 --------
43 43
44 44 desc = {
45 45 'Data': {
46 46 'data_output': ['u', 'v', 'w'],
47 47 'utctime': 'timestamps',
48 48 } ,
49 49 'Metadata': {
50 50 'heightList': 'heights'
51 51 }
52 52 }
53 53
54 54 desc = {
55 55 'Data': {
56 56 'data_output': 'winds',
57 57 'utctime': 'timestamps'
58 58 },
59 59 'Metadata': {
60 60 'heightList': 'heights'
61 61 }
62 62 }
63 63
64 64 extras = {
65 65 'timeZone': 300
66 66 }
67 67
68 68 reader = project.addReadUnit(
69 69 name='HDFReader',
70 70 path='/path/to/files',
71 71 startDate='2019/01/01',
72 72 endDate='2019/01/31',
73 73 startTime='00:00:00',
74 74 endTime='23:59:59',
75 75 # description=json.dumps(desc),
76 76 # extras=json.dumps(extras),
77 77 )
78 78
79 79 """
80 80
81 81 __attrs__ = ['path', 'startDate', 'endDate', 'startTime', 'endTime', 'description', 'extras']
82 82
83 83 def __init__(self):
84 84 ProcessingUnit.__init__(self)
85 85 self.dataOut = Parameters()
86 86 self.ext = ".hdf5"
87 87 self.optchar = "D"
88 88 self.meta = {}
89 89 self.data = {}
90 90 self.open_file = h5py.File
91 91 self.open_mode = 'r'
92 92 self.description = {}
93 93 self.extras = {}
94 94 self.filefmt = "*%Y%j***"
95 95 self.folderfmt = "*%Y%j"
96 96 self.utcoffset = 0
97 97
98 98 def setup(self, **kwargs):
99 99
100 100 self.set_kwargs(**kwargs)
101 101 if not self.ext.startswith('.'):
102 102 self.ext = '.{}'.format(self.ext)
103 103
104 104 if self.online:
105 105 log.log("Searching files in online mode...", self.name)
106 106
107 107 for nTries in range(self.nTries):
108 108 fullpath = self.searchFilesOnLine(self.path, self.startDate,
109 109 self.endDate, self.expLabel, self.ext, self.walk,
110 110 self.filefmt, self.folderfmt)
111 111 pathname, filename = os.path.split(fullpath)
112 112 #print(pathname,filename)
113 113 try:
114 114 fullpath = next(fullpath)
115 115
116 116 except:
117 117 fullpath = None
118 118
119 119 if fullpath:
120 120 break
121 121
122 122 log.warning(
123 123 'Waiting {} sec for a valid file in {}: try {} ...'.format(
124 124 self.delay, self.path, nTries + 1),
125 125 self.name)
126 126 time.sleep(self.delay)
127 127
128 128 if not(fullpath):
129 129 raise schainpy.admin.SchainError(
130 130 'There isn\'t any valid file in {}'.format(self.path))
131 131
132 132 pathname, filename = os.path.split(fullpath)
133 133 self.year = int(filename[1:5])
134 134 self.doy = int(filename[5:8])
135 135 self.set = int(filename[8:11]) - 1
136 136 else:
137 137 log.log("Searching files in {}".format(self.path), self.name)
138 138 self.filenameList = self.searchFilesOffLine(self.path, self.startDate,
139 139 self.endDate, self.expLabel, self.ext, self.walk, self.filefmt, self.folderfmt)
140 140
141 141 self.setNextFile()
142 142
143 143 return
144 144
145 145
146 146 def readFirstHeader(self):
147 147 '''Read metadata and data'''
148 148
149 149 self.__readMetadata()
150 150 self.__readData()
151 151 self.__setBlockList()
152 152
153 153 if 'type' in self.meta:
154 ##print("Creting dataOut...")
154 155 self.dataOut = eval(self.meta['type'])()
156 ##print(vars(self.dataOut))
155 157
156 158 for attr in self.meta:
157 #print("attr: ", attr)
159 ##print("attr: ", attr)
160 ##print(type(self.dataOut).__name__)
158 161 setattr(self.dataOut, attr, self.meta[attr])
159 162
160
161 163 self.blockIndex = 0
162 164
163 165 return
164 166
165 167 def __setBlockList(self):
166 168 '''
167 169 Selects the data within the times defined
168 170
169 171 self.fp
170 172 self.startTime
171 173 self.endTime
172 174 self.blockList
173 175 self.blocksPerFile
174 176
175 177 '''
176 178
177 179 startTime = self.startTime
178 180 endTime = self.endTime
179 181 thisUtcTime = self.data['utctime'] + self.utcoffset
180 182 self.interval = numpy.min(thisUtcTime[1:] - thisUtcTime[:-1])
181 183 thisDatetime = datetime.datetime.utcfromtimestamp(thisUtcTime[0])
182 184 self.startFileDatetime = thisDatetime
183 185 thisDate = thisDatetime.date()
184 186 thisTime = thisDatetime.time()
185 187
186 188 startUtcTime = (datetime.datetime.combine(thisDate, startTime) - datetime.datetime(1970, 1, 1)).total_seconds()
187 189 endUtcTime = (datetime.datetime.combine(thisDate, endTime) - datetime.datetime(1970, 1, 1)).total_seconds()
188 190
189 191 ind = numpy.where(numpy.logical_and(thisUtcTime >= startUtcTime, thisUtcTime < endUtcTime))[0]
190 192
191 193 self.blockList = ind
192 194 self.blocksPerFile = len(ind)
193 195 self.blocksPerFile = len(thisUtcTime) #uses all blocks in the file, overriding the filtered count above
194 196 return
195 197
196 198 def __readMetadata(self):
197 199 '''
198 200 Reads Metadata
199 201 '''
200 202
201 203 meta = {}
202 204
203 205 if self.description:
204 206 for key, value in self.description['Metadata'].items():
205 207 meta[key] = self.fp[value][()]
206 208 else:
207 209 grp = self.fp['Metadata']
208 210 for name in grp:
209 211 meta[name] = grp[name][()]
210 212
211 213 if self.extras:
212 214 for key, value in self.extras.items():
213 215 meta[key] = value
214 216 self.meta = meta
215 217
216 218 return
217 219
218 220
219 221
220 222 def checkForRealPath(self, nextFile, nextDay):
221 223
222 224 # print("check FRP")
223 225 # dt = self.startFileDatetime + datetime.timedelta(1)
224 226 # filename = '{}.{}{}'.format(self.path, dt.strftime('%Y%m%d'), self.ext)
225 227 # fullfilename = os.path.join(self.path, filename)
226 228 # print("check Path ",fullfilename,filename)
227 229 # if os.path.exists(fullfilename):
228 230 # return fullfilename, filename
229 231 # return None, filename
230 232 return None,None
231 233
232 234 def __readData(self):
233 235
234 236 data = {}
235 237
236 238 if self.description:
237 239 for key, value in self.description['Data'].items():
238 240 if isinstance(value, str):
239 241 if isinstance(self.fp[value], h5py.Dataset):
240 242 data[key] = self.fp[value][()]
241 243 elif isinstance(self.fp[value], h5py.Group):
242 244 array = []
243 245 for ch in self.fp[value]:
244 246 array.append(self.fp[value][ch][()])
245 247 data[key] = numpy.array(array)
246 248 elif isinstance(value, list):
247 249 array = []
248 250 for ch in value:
249 251 array.append(self.fp[ch][()])
250 252 data[key] = numpy.array(array)
251 253 else:
252 254 grp = self.fp['Data']
253 255 for name in grp:
254 256 if isinstance(grp[name], h5py.Dataset):
255 257 array = grp[name][()]
256 258 elif isinstance(grp[name], h5py.Group):
257 259 array = []
258 260 for ch in grp[name]:
259 261 array.append(grp[name][ch][()])
260 262 array = numpy.array(array)
261 263 else:
262 264 log.warning('Unknown type: {}'.format(name))
263 265
264 266 if name in self.description:
265 267 key = self.description[name]
266 268 else:
267 269 key = name
268 270 data[key] = array
269 271
270 272 self.data = data
271 273 return
272 274
273 275 def getData(self):
274 276 if not self.isDateTimeInRange(self.startFileDatetime, self.startDate, self.endDate, self.startTime, self.endTime):
275 277 self.dataOut.flagNoData = True
276 278 self.blockIndex = self.blocksPerFile
277 279 #self.dataOut.error = True TERMINATES THE PROGRAM, removed
278 280 return
279 281 for attr in self.data:
282 #print("attr ",attr)
280 283 if self.data[attr].ndim == 1:
281 284 setattr(self.dataOut, attr, self.data[attr][self.blockIndex])
282 285 else:
283 286 setattr(self.dataOut, attr, self.data[attr][:, self.blockIndex])
284 287
285 self.dataOut.flagNoData = False
288
286 289 self.blockIndex += 1
287 290
288 291 if self.blockIndex == 1:
289 292 log.log("Block No. {}/{} -> {}".format(
290 293 self.blockIndex,
291 294 self.blocksPerFile,
292 295 self.dataOut.datatime.ctime()), self.name)
293 296 else:
294 297 log.log("Block No. {}/{} ".format(
295 298 self.blockIndex,
296 299 self.blocksPerFile),self.name)
297 300
298
301 self.dataOut.flagNoData = False
302 self.dataOut.error = False
299 303 return
300 304
301 305 def run(self, **kwargs):
302 306
303 307 if not(self.isConfig):
304 308 self.setup(**kwargs)
305 309 self.isConfig = True
306 310
307 311 if self.blockIndex == self.blocksPerFile:
308 312 self.setNextFile()
309 313
310 314 self.getData()
311 315
312 316 return
313 317
314 318 @MPDecorator
315 319 class HDFWriter(Operation):
316 320 """Operation to write HDF5 files.
317 321
318 322 The HDF5 file contains by default two groups, Data and Metadata, where
319 323 you can save any `dataOut` attribute specified by the `dataList` and `metadataList`
320 324 parameters; data attributes are normally time dependent whereas the metadata
321 325 attributes are not.
322 326 It is possible to customize the structure of the HDF5 file with the
323 327 optional `description` parameter; see the examples.
324 328
325 329 Parameters:
326 330 -----------
327 331 path : str
328 332 Path where files will be saved.
329 333 blocksPerFile : int
330 334 Number of blocks per file
331 335 metadataList : list
332 336 List of the dataOut attributes that will be saved as metadata
333 337 dataList : list
334 338 List of the dataOut attributes that will be saved as data
335 339 setType : bool
336 340 If True the name of the files corresponds to the timestamp of the data
337 341 description : dict, optional
338 342 Dictionary with the desired description of the HDF5 file
339 343
340 344 Examples
341 345 --------
342 346
343 347 desc = {
344 348 'data_output': {'winds': ['z', 'w', 'v']},
345 349 'utctime': 'timestamps',
346 350 'heightList': 'heights'
347 351 }
348 352 desc = {
349 353 'data_output': ['z', 'w', 'v'],
350 354 'utctime': 'timestamps',
351 355 'heightList': 'heights'
352 356 }
353 357 desc = {
354 358 'Data': {
355 359 'data_output': 'winds',
356 360 'utctime': 'timestamps'
357 361 },
358 362 'Metadata': {
359 363 'heightList': 'heights'
360 364 }
361 365 }
362 366
363 367 writer = proc_unit.addOperation(name='HDFWriter')
364 368 writer.addParameter(name='path', value='/path/to/file')
365 369 writer.addParameter(name='blocksPerFile', value='32')
366 370 writer.addParameter(name='metadataList', value='heightList,timeZone')
367 371 writer.addParameter(name='dataList',value='data_output,utctime')
368 372 # writer.addParameter(name='description',value=json.dumps(desc))
369 373
370 374 """
371 375
372 376 ext = ".hdf5"
373 377 optchar = "D"
374 378 filename = None
375 379 path = None
376 380 setFile = None
377 381 fp = None
378 382 firsttime = True
379 383 #Configurations
380 384 blocksPerFile = None
381 385 blockIndex = None
382 386 dataOut = None
383 387 #Data Arrays
384 388 dataList = None
385 389 metadataList = None
386 390 currentDay = None
387 391 lastTime = None
392 typeTime = "ut"
393 hourLimit = 3
394 breakDays = True
388 395
389 396 def __init__(self):
390 397
391 398 Operation.__init__(self)
392 399 return
393 400
394 def setup(self, path=None, blocksPerFile=10, metadataList=None, dataList=None, setType=None, description=None):
401 def setup(self, path=None, blocksPerFile=10, metadataList=None, dataList=None, setType=None,
402 description=None, typeTime="ut", hourLimit=3, breakDays=True):
395 403 self.path = path
396 404 self.blocksPerFile = blocksPerFile
397 405 self.metadataList = metadataList
398 406 self.dataList = [s.strip() for s in dataList]
399 407 self.setType = setType
400 408 self.description = description
409 self.timeZone = typeTime #the typeTime argument ('ut' or 'lt') is stored as timeZone
410 self.hourLimit = hourLimit
411 self.breakDays = breakDays
401 412
402 413 if self.metadataList is None:
403 414 self.metadataList = self.dataOut.metadata_list
404 415
405 416 tableList = []
406 417 dsList = []
407 418
408 419 for i in range(len(self.dataList)):
409 420 dsDict = {}
410 421 if hasattr(self.dataOut, self.dataList[i]):
411 422 dataAux = getattr(self.dataOut, self.dataList[i])
412 423 dsDict['variable'] = self.dataList[i]
413 424 else:
414 425 log.warning('Attribute {} not found in dataOut'.format(self.dataList[i]), self.name)
415 426 continue
416 427
417 428 if dataAux is None:
418 429 continue
419 430 elif isinstance(dataAux, (int, float, numpy.integer, numpy.float)):
420 431 dsDict['nDim'] = 0
421 432 else:
422 433 dsDict['nDim'] = len(dataAux.shape)
423 434 dsDict['shape'] = dataAux.shape
424 435 dsDict['dsNumber'] = dataAux.shape[0]
425 436 dsDict['dtype'] = dataAux.dtype
426 437
427 438 dsList.append(dsDict)
428 439
429 440 self.dsList = dsList
430 441 self.currentDay = self.dataOut.datatime.date()
431 442
432 443 def timeFlag(self):
433 444 currentTime = self.dataOut.utctime
445 if self.timeZone == "lt":
434 446 timeTuple = time.localtime(currentTime)
447 elif self.timeZone == "ut":
448 timeTuple = time.gmtime(currentTime)
449
435 450 dataDay = timeTuple.tm_yday
436 451 #print("time UTC: ",currentTime, self.dataOut.datatime)
437 452 if self.lastTime is None:
438 453 self.lastTime = currentTime
439 454 self.currentDay = dataDay
440 455 return False
441 456
442 457 timeDiff = currentTime - self.lastTime
443 458
444 #If the day is different or if the gap between one sample and the next exceeds one hour
445 if dataDay != self.currentDay:
459 #If the day is different or if the gap between one sample and the next exceeds self.hourLimit
460 if (dataDay != self.currentDay) and self.breakDays:
446 461 self.currentDay = dataDay
447 462 return True
448 elif timeDiff > 3*60*60:
463 elif timeDiff > self.hourLimit*60*60:
449 464 self.lastTime = currentTime
450 465 return True
451 466 else:
452 467 self.lastTime = currentTime
453 468 return False
454 469
455 def run(self, dataOut, path, blocksPerFile=10, metadataList=None,
456 dataList=[], setType=None, description={}):
470 def run(self, dataOut, **kwargs):
457 471
458 472 self.dataOut = dataOut
459 473 if not(self.isConfig):
460 self.setup(path=path, blocksPerFile=blocksPerFile,
461 metadataList=metadataList, dataList=dataList,
462 setType=setType, description=description)
474 self.setup(**kwargs)
463 475
464 476 self.isConfig = True
465 477 self.setNextFile()
466 478
467 479 self.putData()
468 480 return
469 481
470 482 def setNextFile(self):
471 483
472 484 ext = self.ext
473 485 path = self.path
474 486 setFile = self.setFile
475
487 if self.timeZone == "lt":
488 timeTuple = time.localtime(self.dataOut.utctime)
489 elif self.timeZone == "ut":
476 490 timeTuple = time.gmtime(self.dataOut.utctime)
477 491 #print("path: ",timeTuple)
478 492 subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year,timeTuple.tm_yday)
479 493 fullpath = os.path.join(path, subfolder)
480 494
481 495 if os.path.exists(fullpath):
482 496 filesList = os.listdir(fullpath)
483 497 filesList = [k for k in filesList if k.startswith(self.optchar)]
484 498 if len( filesList ) > 0:
485 499 filesList = sorted(filesList, key=str.lower)
486 500 filen = filesList[-1]
487 501 # the filename should have the following format
488 502 # 0 1234 567 89A BCDE (hex)
489 503 # x YYYY DDD SSS .ext
490 504 if isNumber(filen[8:11]):
491 505 setFile = int(filen[8:11]) #initialize the set counter to the set of the last file
492 506 else:
493 507 setFile = -1
494 508 else:
495 509 setFile = -1 #initialize the set counter
496 510 else:
497 511 os.makedirs(fullpath)
498 512 setFile = -1 #initialize the set counter
499 513
500 514 if self.setType is None:
501 515 setFile += 1
502 516 file = '%s%4.4d%3.3d%03d%s' % (self.optchar,
503 517 timeTuple.tm_year,
504 518 timeTuple.tm_yday,
505 519 setFile,
506 520 ext )
507 521 else:
508 522 setFile = timeTuple.tm_hour*60+timeTuple.tm_min
509 523 file = '%s%4.4d%3.3d%04d%s' % (self.optchar,
510 524 timeTuple.tm_year,
511 525 timeTuple.tm_yday,
512 526 setFile,
513 527 ext )
514 528
515 529 self.filename = os.path.join( path, subfolder, file )
516 530
517 531 #Setting HDF5 File
518 532 self.fp = h5py.File(self.filename, 'w')
519 533 #write metadata
520 534 self.writeMetadata(self.fp)
521 535 #Write data
522 536 self.writeData(self.fp)
523 537
524 538 def getLabel(self, name, x=None):
525 539
526 540 if x is None:
527 541 if 'Data' in self.description:
528 542 data = self.description['Data']
529 543 if 'Metadata' in self.description:
530 544 data.update(self.description['Metadata'])
531 545 else:
532 546 data = self.description
533 547 if name in data:
534 548 if isinstance(data[name], str):
535 549 return data[name]
536 550 elif isinstance(data[name], list):
537 551 return None
538 552 elif isinstance(data[name], dict):
539 553 for key, value in data[name].items():
540 554 return key
541 555 return name
542 556 else:
543 557 if 'Metadata' in self.description:
544 558 meta = self.description['Metadata']
545 559 else:
546 560 meta = self.description
547 561 if name in meta:
548 562 if isinstance(meta[name], list):
549 563 return meta[name][x]
550 564 elif isinstance(meta[name], dict):
551 565 for key, value in meta[name].items():
552 566 return value[x]
553 567 if 'cspc' in name:
554 568 return 'pair{:02d}'.format(x)
555 569 else:
556 570 return 'channel{:02d}'.format(x)
557 571
558 572 def writeMetadata(self, fp):
559 573
560 574 if self.description:
561 575 if 'Metadata' in self.description:
562 576 grp = fp.create_group('Metadata')
563 577 else:
564 578 grp = fp
565 579 else:
566 580 grp = fp.create_group('Metadata')
567 581
568 582 for i in range(len(self.metadataList)):
569 583 if not hasattr(self.dataOut, self.metadataList[i]):
570 584 log.warning('Metadata: `{}` not found'.format(self.metadataList[i]), self.name)
571 585 continue
572 586 value = getattr(self.dataOut, self.metadataList[i])
573 587 if isinstance(value, bool):
574 588 if value is True:
575 589 value = 1
576 590 else:
577 591 value = 0
578 592 grp.create_dataset(self.getLabel(self.metadataList[i]), data=value)
579 593 return
580 594
581 595 def writeData(self, fp):
582 596
583 597 if self.description:
584 598 if 'Data' in self.description:
585 599 grp = fp.create_group('Data')
586 600 else:
587 601 grp = fp
588 602 else:
589 603 grp = fp.create_group('Data')
590 604
591 605 dtsets = []
592 606 data = []
593 607
594 608 for dsInfo in self.dsList:
595 609 if dsInfo['nDim'] == 0:
596 610 ds = grp.create_dataset(
597 611 self.getLabel(dsInfo['variable']),
598 612 (self.blocksPerFile, ),
599 613 chunks=True,
600 614 dtype=numpy.float64)
601 615 dtsets.append(ds)
602 616 data.append((dsInfo['variable'], -1))
603 617 else:
604 618 label = self.getLabel(dsInfo['variable'])
605 619 if label is not None:
606 620 sgrp = grp.create_group(label)
607 621 else:
608 622 sgrp = grp
609 623 for i in range(dsInfo['dsNumber']):
610 624 ds = sgrp.create_dataset(
611 625 self.getLabel(dsInfo['variable'], i),
612 626 (self.blocksPerFile, ) + dsInfo['shape'][1:],
613 627 chunks=True,
614 628 dtype=dsInfo['dtype'])
615 629 dtsets.append(ds)
616 630 data.append((dsInfo['variable'], i))
617 631 fp.flush()
618 632
619 633 log.log('Creating file: {}'.format(fp.filename), self.name)
620 634
621 635 self.ds = dtsets
622 636 self.data = data
623 637 self.firsttime = True
624 638 self.blockIndex = 0
625 639 return
626 640
627 641 def putData(self):
628 642
629 643 if (self.blockIndex == self.blocksPerFile) or self.timeFlag():
630 644 self.closeFile()
631 645 self.setNextFile()
632 646
633 647 for i, ds in enumerate(self.ds):
634 648 attr, ch = self.data[i]
635 649 if ch == -1:
636 650 ds[self.blockIndex] = getattr(self.dataOut, attr)
637 651 else:
638 652 ds[self.blockIndex] = getattr(self.dataOut, attr)[ch]
639 653
640 654 self.fp.flush()
641 655 self.blockIndex += 1
642 656 if self.blockIndex == 1:
643 657 log.log('Block No. {}/{} --> {}'.format(self.blockIndex, self.blocksPerFile,self.dataOut.datatime.ctime()), self.name)
644 658 else:
645 659 log.log('Block No. {}/{}'.format(self.blockIndex, self.blocksPerFile), self.name)
646 660 return
647 661
648 662 def closeFile(self):
649 663
650 664 if self.blockIndex != self.blocksPerFile:
651 665 for ds in self.ds:
652 666 ds.resize(self.blockIndex, axis=0)
653 667
654 668 if self.fp:
655 669 self.fp.flush()
656 670 self.fp.close()
657 671
658 672 def close(self):
659 673
660 674 self.closeFile()
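For reference, the rollover rule implemented by the new timeFlag method can be restated as a small standalone function. This is an illustrative sketch only (the function name and return convention are not part of the module): a new file starts when the day of year changes, if breakDays is enabled, or when the gap between consecutive samples exceeds hourLimit hours, with timestamps interpreted in UTC or local time according to typeTime.

import time

def needs_new_file(current_utc, last_utc, current_day,
                   hour_limit=3, break_days=True, type_time="ut"):
    # Interpret the timestamp in local time or UTC, as HDFWriter does
    tt = time.localtime(current_utc) if type_time == "lt" else time.gmtime(current_utc)
    data_day = tt.tm_yday                       # day of year of the current block
    if last_utc is None:                        # first block seen: no rollover
        return False, data_day
    if break_days and data_day != current_day:  # day changed -> new file
        return True, data_day
    if current_utc - last_utc > hour_limit * 3600:  # gap exceeds hourLimit hours -> new file
        return True, current_day
    return False, current_day

For example, with hour_limit=3 two consecutive blocks four hours apart trigger a new file even within the same day, while with break_days=True a block arriving just after midnight starts a new file regardless of the gap.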