##// END OF EJS Templates
fix error of selection time in HDFReader
Alexander Valdez -
r1691:5adc7ae610db
parent child
Show More
@@ -1,628 +1,638
1 1 import os
2 2 import time
3 3 import datetime
4 4
5 5 import numpy
6 6 import h5py
7 7
8 8 import schainpy.admin
9 9 from schainpy.model.data.jrodata import *
10 10 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
11 11 from schainpy.model.io.jroIO_base import *
12 12 from schainpy.utils import log
13 13
14 14
15 15 class HDFReader(Reader, ProcessingUnit):
16 16 """Processing unit to read HDF5 format files
17 17
18 18 This unit reads HDF5 files created with the `HDFWriter` operation. Such files contain
19 19 by default two groups, Data and Metadata, and all their variables are loaded as `dataOut`
20 20 attributes.
21 21 It is possible to read any HDF5 file by giving its structure in the `description`
22 22 parameter; you can also add extra values to the metadata with the `extras` parameter.
23 23
24 24 Parameters:
25 25 -----------
26 26 path : str
27 27 Path where files are located.
28 28 startDate : date
29 29 Start date of the files
30 30 endDate : date
31 31 End date of the files
32 32 startTime : time
33 33 Start time of the files
34 34 endTime : time
35 35 End time of the files
36 36 description : dict, optional
37 37 Dictionary with the description of the HDF5 file
38 38 extras : dict, optional
39 39 Dictionary with extra metadata to be added to `dataOut`
40 40
41 41 Examples
42 42 --------
43 43
44 44 desc = {
45 45 'Data': {
46 46 'data_output': ['u', 'v', 'w'],
47 47 'utctime': 'timestamps',
48 48 } ,
49 49 'Metadata': {
50 50 'heightList': 'heights'
51 51 }
52 52 }
53 53
54 54 desc = {
55 55 'Data': {
56 56 'data_output': 'winds',
57 57 'utctime': 'timestamps'
58 58 },
59 59 'Metadata': {
60 60 'heightList': 'heights'
61 61 }
62 62 }
63 63
64 64 extras = {
65 65 'timeZone': 300
66 66 }
67 67
68 68 reader = project.addReadUnit(
69 69 name='HDFReader',
70 70 path='/path/to/files',
71 71 startDate='2019/01/01',
72 72 endDate='2019/01/31',
73 73 startTime='00:00:00',
74 74 endTime='23:59:59',
75 75 # description=json.dumps(desc),
76 76 # extras=json.dumps(extras),
77 77 )
78 78
79 ATTENTION:
80 To work in local time, add the attribute
81
82 utcoffset='-18000'
83 as the last parameter of the reader; the offset is added to the file's `utctime` when selecting blocks by time.
84
79 85 """
80 86
81 87 __attrs__ = ['path', 'startDate', 'endDate', 'startTime', 'endTime', 'description', 'extras']
82 88
def __init__(self):
    """Initialise the processing unit and the reader's default settings."""
    ProcessingUnit.__init__(self)

    # Output object and the per-file containers filled while reading.
    self.dataOut = Parameters()
    self.meta = {}
    self.data = {}

    # How files are located and named.
    self.ext = ".hdf5"
    self.optchar = "D"
    self.filefmt = "*%Y%j***"
    self.folderfmt = "*%Y%j"

    # How files are opened.
    self.open_file = h5py.File
    self.open_mode = 'r'

    # Optional user-supplied file layout and extra metadata.
    self.description = {}
    self.extras = {}

    # Offset added to the file's `utctime` when selecting blocks by time.
    self.utcoffset = 0
97 103
def setup(self, **kwargs):
    """Configure the reader and open the first file.

    Keyword arguments are forwarded to `set_kwargs`. In online mode the
    search is retried up to `self.nTries` times, sleeping `self.delay`
    seconds between attempts; in offline mode the list of matching files is
    stored in `self.filenameList`. Finally `setNextFile` is called.

    Raises:
        schainpy.admin.SchainError: online mode and no valid file was found.
    """
    self.set_kwargs(**kwargs)
    if not self.ext.startswith('.'):
        self.ext = '.{}'.format(self.ext)

    if self.online:
        log.log("Searching files in online mode...", self.name)

        # Stays None when nTries is 0, so the check below raises SchainError
        # instead of failing with an unbound local.
        fullpath = None

        for attempt in range(self.nTries):
            candidates = self.searchFilesOnLine(self.path, self.startDate,
                self.endDate, self.expLabel, self.ext, self.walk,
                self.filefmt, self.folderfmt)
            try:
                fullpath = next(candidates)
            except Exception:
                # Best effort: any search failure counts as "no file yet".
                # Unlike a bare `except`, Ctrl-C can still interrupt the wait.
                fullpath = None

            if fullpath:
                break

            log.warning(
                'Waiting {} sec for a valid file in {}: try {} ...'.format(
                    self.delay, self.path, attempt + 1),
                self.name)
            time.sleep(self.delay)

        if not fullpath:
            raise schainpy.admin.SchainError(
                'There isn\'t any valid file in {}'.format(self.path))

        # File names look like xYYYYDDDSSS.ext (year, day of year, set).
        filename = os.path.split(fullpath)[1]
        self.year = int(filename[1:5])
        self.doy = int(filename[5:8])
        self.set = int(filename[8:11]) - 1
    else:
        log.log("Searching files in {}".format(self.path), self.name)
        self.filenameList = self.searchFilesOffLine(self.path, self.startDate,
            self.endDate, self.expLabel, self.ext, self.walk, self.filefmt, self.folderfmt)

    self.setNextFile()

    return
141 146
def readFirstHeader(self):
    """Load metadata and data of the current file and reset the block counter.

    Every metadata entry is copied onto `self.dataOut` as an attribute.
    """
    self.__readMetadata()
    self.__readData()
    self.__setBlockList()

    # NOTE(review): `eval` runs on a value taken from the file's metadata (or
    # from `extras`); only read files from trusted sources.
    if 'type' in self.meta:
        self.dataOut = eval(self.meta['type'])()

    for attr, value in self.meta.items():
        setattr(self.dataOut, attr, value)

    self.blockIndex = 0

    return
158 163
def __setBlockList(self):
    '''
    Selects the blocks of the current file that fall within the configured
    time window.

    Reads:
        self.data['utctime'], self.utcoffset, self.startTime, self.endTime

    Sets:
        self.interval      : smallest spacing between consecutive timestamps
                             (0 when the file holds a single block)
        self.blockList     : indexes of the blocks inside [startTime, endTime)
        self.blocksPerFile : number of selected blocks

    When no block is selected the file is skipped and the next one is read.
    '''

    thisUtcTime = self.data['utctime'] + self.utcoffset

    # A one-block file has no consecutive pair; numpy.min would raise on the
    # empty difference.
    if len(thisUtcTime) > 1:
        self.interval = numpy.min(thisUtcTime[1:] - thisUtcTime[:-1])
    else:
        self.interval = 0

    # The window is evaluated on the date of the first block of the file.
    thisDatetime = datetime.datetime.utcfromtimestamp(thisUtcTime[0])
    thisDate = thisDatetime.date()
    epoch = datetime.datetime(1970, 1, 1)
    startUtcTime = (datetime.datetime.combine(thisDate, self.startTime) - epoch).total_seconds()
    endUtcTime = (datetime.datetime.combine(thisDate, self.endTime) - epoch).total_seconds()

    ind = numpy.where(numpy.logical_and(thisUtcTime >= startUtcTime, thisUtcTime < endUtcTime))[0]

    self.blockList = ind
    self.blocksPerFile = len(ind)

    if len(ind) == 0:
        # blockIndex is only assigned at the end of readFirstHeader, so it
        # may not exist yet for the first file.
        log.warning("[Reading] Block No. %d/%d -> %s [Skipping]" % (
            getattr(self, 'blockIndex', 0),
            self.blocksPerFile,
            thisDatetime), self.name)
        self.setNextFile()
        # NOTE(review): setNextFile is defined outside this file section; if
        # it already calls readFirstHeader, this call re-reads the same file
        # - confirm.
        self.readFirstHeader()

    return
190 200
def __readMetadata(self):
    '''
    Reads the metadata of the open file into `self.meta`.

    With a `description`, each entry of its 'Metadata' section maps an
    output name to the file path to read. Without one, every member of the
    file's 'Metadata' group is read under its own name. Entries of `extras`
    are added last and override values read from the file.
    '''

    if self.description:
        meta = {key: self.fp[value][()]
                for key, value in self.description['Metadata'].items()}
    else:
        grp = self.fp['Metadata']
        meta = {name: grp[name][()] for name in grp}

    if self.extras:
        meta.update(self.extras)

    self.meta = meta

    return
212 222
def __readData(self):
    '''
    Reads the data of the open file into `self.data`.

    With a `description`, each entry of its 'Data' section maps an output
    name either to one file path (a dataset, or a group whose members are
    stacked into one array) or to a list of dataset paths that are stacked.
    Without one, every member of the file's 'Data' group is read under its
    own name; members that are neither dataset nor group are skipped with a
    warning.
    '''

    data = {}

    if self.description:
        for key, value in self.description['Data'].items():
            if isinstance(value, str):
                node = self.fp[value]
                if isinstance(node, h5py.Dataset):
                    data[key] = node[()]
                elif isinstance(node, h5py.Group):
                    data[key] = numpy.array([node[ch][()] for ch in node])
            elif isinstance(value, list):
                data[key] = numpy.array([self.fp[ch][()] for ch in value])
    else:
        grp = self.fp['Data']
        for name in grp:
            node = grp[name]
            if isinstance(node, h5py.Dataset):
                array = node[()]
            elif isinstance(node, h5py.Group):
                array = numpy.array([node[ch][()] for ch in node])
            else:
                log.warning('Unknown type: {}'.format(name))
                # Nothing was read for this member; do not store a stale or
                # undefined array under its name.
                continue

            # This branch only runs without a description, so the member
            # name is used as the key.
            data[name] = array

    self.data = data
    return
253 263
def getData(self):
    """Copy the next selected block of every data variable onto `dataOut`.

    The block is taken from `self.blockList`, the indexes selected for the
    configured time window, not from the raw position in the file.
    One-dimensional variables are indexed on their only axis, others on
    their second axis.
    """

    # Map the running counter onto the blocks that passed the time selection.
    index = self.blockList[self.blockIndex]

    for attr, values in self.data.items():
        if values.ndim == 1:
            setattr(self.dataOut, attr, values[index])
        else:
            setattr(self.dataOut, attr, values[:, index])

    self.dataOut.flagNoData = False
    self.blockIndex += 1

    log.log("Block No. {}/{} -> {}".format(
        self.blockIndex,
        self.blocksPerFile,
        self.dataOut.datatime.ctime()), self.name)

    return
271 281
def run(self, **kwargs):
    """Deliver one block per call.

    The first call configures the unit with `kwargs`. When all blocks of
    the current file have been delivered the next file is opened first.
    """

    if not self.isConfig:
        self.setup(**kwargs)
        self.isConfig = True

    fileExhausted = self.blockIndex == self.blocksPerFile
    if fileExhausted:
        self.setNextFile()

    self.getData()

    return
284 294
285 295 @MPDecorator
286 296 class HDFWriter(Operation):
287 297 """Operation to write HDF5 files.
288 298
289 299 The HDF5 file contains by default two groups Data and Metadata where
290 300 you can save any `dataOut` attribute specified by `dataList` and `metadataList`
291 301 parameters; data attributes are normally time dependent, whereas the metadata
292 302 are not.
293 303 It is possible to customize the structure of the HDF5 file with the
294 304 optional description parameter see the examples.
295 305
296 306 Parameters:
297 307 -----------
298 308 path : str
299 309 Path where files will be saved.
300 310 blocksPerFile : int
301 311 Number of blocks per file
302 312 metadataList : list
303 313 List of the dataOut attributes that will be saved as metadata
304 314 dataList : list
305 315 List of the dataOut attributes that will be saved as data
306 316 setType : bool
307 317 If True the name of the files corresponds to the timestamp of the data
308 318 description : dict, optional
309 319 Dictionary with the desired description of the HDF5 file
310 320
311 321 Examples
312 322 --------
313 323
314 324 desc = {
315 325 'data_output': {'winds': ['z', 'w', 'v']},
316 326 'utctime': 'timestamps',
317 327 'heightList': 'heights'
318 328 }
319 329 desc = {
320 330 'data_output': ['z', 'w', 'v'],
321 331 'utctime': 'timestamps',
322 332 'heightList': 'heights'
323 333 }
324 334 desc = {
325 335 'Data': {
326 336 'data_output': 'winds',
327 337 'utctime': 'timestamps'
328 338 },
329 339 'Metadata': {
330 340 'heightList': 'heights'
331 341 }
332 342 }
333 343
334 344 writer = proc_unit.addOperation(name='HDFWriter')
335 345 writer.addParameter(name='path', value='/path/to/file')
336 346 writer.addParameter(name='blocksPerFile', value='32')
337 347 writer.addParameter(name='metadataList', value='heightList,timeZone')
338 348 writer.addParameter(name='dataList',value='data_output,utctime')
339 349 # writer.addParameter(name='description',value=json.dumps(desc))
340 350
341 351 """
342 352
# File naming: extension and the leading character of every file name.
ext = ".hdf5"
optchar = "D"

# State of the file currently being written.
filename = None
path = None
setFile = None
fp = None
firsttime = True

# Configuration
blocksPerFile = None
blockIndex = None
dataOut = None

# Names of the dataOut attributes written as data / metadata.
dataList = None
metadataList = None

# Bookkeeping used to decide when a new file must be started.
currentDay = None
lastTime = None
359 369
def __init__(self):
    """Initialise the underlying `Operation`; settings arrive later via `setup`."""
    Operation.__init__(self)
364 374
365 375 def setup(self, path=None, blocksPerFile=10, metadataList=None, dataList=None, setType=None, description=None):
366 376 self.path = path
367 377 self.blocksPerFile = blocksPerFile
368 378 self.metadataList = metadataList
369 379 self.dataList = [s.strip() for s in dataList]
370 380 self.setType = setType
371 381 self.description = description
372 382
373 383 if self.metadataList is None:
374 384 self.metadataList = self.dataOut.metadata_list
375 385
376 386 tableList = []
377 387 dsList = []
378 388
379 389 for i in range(len(self.dataList)):
380 390 dsDict = {}
381 391 if hasattr(self.dataOut, self.dataList[i]):
382 392 dataAux = getattr(self.dataOut, self.dataList[i])
383 393 dsDict['variable'] = self.dataList[i]
384 394 else:
385 395 log.warning('Attribute {} not found in dataOut', self.name)
386 396 continue
387 397
388 398 if dataAux is None:
389 399 continue
390 400 elif isinstance(dataAux, (int, float, numpy.integer, numpy.float)):
391 401 dsDict['nDim'] = 0
392 402 else:
393 403 dsDict['nDim'] = len(dataAux.shape)
394 404 dsDict['shape'] = dataAux.shape
395 405 dsDict['dsNumber'] = dataAux.shape[0]
396 406 dsDict['dtype'] = dataAux.dtype
397 407
398 408 dsList.append(dsDict)
399 409
400 410 self.dsList = dsList
401 411 self.currentDay = self.dataOut.datatime.date()
402 412
def timeFlag(self):
    """Tell whether the current block must start a new file.

    Returns True when the local day of year of `dataOut.utctime` differs
    from the previous block's, or when more than three hours elapsed since
    the previous block. The first call only records the reference time and
    day and returns False.
    """
    currentTime = self.dataOut.utctime
    dataDay = time.localtime(currentTime).tm_yday

    if self.lastTime is None:
        self.lastTime = currentTime
        self.currentDay = dataDay
        return False

    timeDiff = currentTime - self.lastTime
    # Always refresh the reference time. If it is left untouched on a day
    # change, the next block is still compared against the old timestamp and
    # can trigger a second, spurious split.
    self.lastTime = currentTime

    # Split when the day changes or when the gap between blocks exceeds
    # three hours.
    if dataDay != self.currentDay:
        self.currentDay = dataDay
        return True

    return bool(timeDiff > 3*60*60)
425 435
def run(self, dataOut, path, blocksPerFile=10, metadataList=None,
        dataList=None, setType=None, description=None):
    """Write the current `dataOut` block.

    The first call configures the writer through `setup` and opens the
    first file; every call then stores one block with `putData`.
    `dataList` and `description` default to an empty list and an empty dict,
    created per call so no container is shared between calls or instances.
    """

    self.dataOut = dataOut
    if not self.isConfig:
        self.setup(path=path, blocksPerFile=blocksPerFile,
                   metadataList=metadataList,
                   dataList=[] if dataList is None else dataList,
                   setType=setType,
                   description={} if description is None else description)

        self.isConfig = True
        self.setNextFile()

    self.putData()
    return
440 450
def setNextFile(self):
    """Create the next output file and lay out its metadata and datasets.

    Files are placed in a per-day subfolder of `self.path` named dYYYYDDD
    (local time of `dataOut.utctime`). The file name is
    <optchar>YYYYDDD<set><ext>; the set number either continues after the
    last file found in the subfolder (`setType` is None) or is the minute
    of the day.
    """

    ext = self.ext
    path = self.path

    timeTuple = time.localtime(self.dataOut.utctime)
    subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year, timeTuple.tm_yday)
    fullpath = os.path.join(path, subfolder)

    # Set counter of the last file already present, -1 when there is none.
    setFile = -1
    if os.path.exists(fullpath):
        candidates = sorted(
            (k for k in os.listdir(fullpath) if k.startswith(self.optchar)),
            key=str.lower)
        if candidates:
            lastName = candidates[-1]
            # The file name must have the following layout
            # 0 1234 567 89A BCDE (hex)
            # x YYYY DDD SSS .ext
            if isNumber(lastName[8:11]):
                setFile = int(lastName[8:11])
    else:
        os.makedirs(fullpath)

    if self.setType is None:
        setFile += 1
        nameFormat = '%s%4.4d%3.3d%03d%s'
    else:
        setFile = timeTuple.tm_hour*60+timeTuple.tm_min
        nameFormat = '%s%4.4d%3.3d%04d%s'

    newName = nameFormat % (self.optchar,
                            timeTuple.tm_year,
                            timeTuple.tm_yday,
                            setFile,
                            ext)

    self.filename = os.path.join(path, subfolder, newName)

    # Open the HDF5 file, then write the metadata and create the datasets.
    self.fp = h5py.File(self.filename, 'w')
    self.writeMetadata(self.fp)
    self.writeData(self.fp)
493 503
def getLabel(self, name, x=None):
    """Return the name used in the file for a variable or one of its channels.

    Labels come from `self.description`, which may be flat or split into
    'Data' and 'Metadata' sections; entries of both sections and flat
    entries are all searched (Metadata wins over Data, sections win over
    flat entries). The description itself is never modified.

    Parameters:
        name : dataOut attribute name
        x : channel index, or None for the variable itself

    Returns:
        With x None: the string label; None when the label is a list (the
        channels are then stored without a sub-group); the first key when
        the label is a dict; otherwise `name`.
        With x given: the x-th entry of the list label (or of the first
        value of a dict label); otherwise 'pairNN' when `name` contains
        'cspc' and 'channelNN' for anything else.
    """
    description = self.description or {}

    # Build the lookup on a copy so the caller's description stays intact.
    labels = {key: value for key, value in description.items()
              if key not in ('Data', 'Metadata')}
    for section in ('Data', 'Metadata'):
        labels.update(description.get(section) or {})

    label = labels.get(name)

    if x is None:
        if isinstance(label, str):
            return label
        if isinstance(label, list):
            return None
        if isinstance(label, dict):
            for key in label:
                return key
        return name

    if isinstance(label, list):
        return label[x]
    if isinstance(label, dict):
        for value in label.values():
            return value[x]
    if 'cspc' in name:
        return 'pair{:02d}'.format(x)
    return 'channel{:02d}'.format(x)
527 537
def writeMetadata(self, fp):
    """Write every attribute of `metadataList` found on `dataOut` to the file.

    The datasets go into a 'Metadata' group, unless a description is given
    that has no 'Metadata' section, in which case they are written at the
    root of `fp`. Missing attributes are reported and skipped; booleans are
    stored as 1 or 0.
    """

    if self.description and 'Metadata' not in self.description:
        grp = fp
    else:
        grp = fp.create_group('Metadata')

    for attr in self.metadataList:
        if not hasattr(self.dataOut, attr):
            log.warning('Metadata: `{}` not found'.format(attr), self.name)
            continue
        value = getattr(self.dataOut, attr)
        if isinstance(value, bool):
            value = 1 if value is True else 0
        grp.create_dataset(self.getLabel(attr), data=value)
    return
550 560
def writeData(self, fp):
    """Create the (still empty) datasets for every entry of `self.dsList`.

    The datasets go into a 'Data' group, unless a description is given that
    has no 'Data' section, in which case they are created at the root of
    `fp`. Scalars get one float64 dataset of length `blocksPerFile`; arrays
    get one dataset per entry of their first axis, inside a sub-group when
    `getLabel` returns a name for the variable. The created datasets and
    their (variable, channel) pairs are kept in `self.ds` and `self.data`,
    and the block counter is reset.
    """

    if self.description and 'Data' not in self.description:
        grp = fp
    else:
        grp = fp.create_group('Data')

    dtsets = []
    data = []

    for dsInfo in self.dsList:
        variable = dsInfo['variable']

        if dsInfo['nDim'] == 0:
            ds = grp.create_dataset(
                self.getLabel(variable),
                (self.blocksPerFile, ),
                chunks=True,
                dtype=numpy.float64)
            dtsets.append(ds)
            # -1 marks a scalar: the whole attribute is stored per block.
            data.append((variable, -1))
            continue

        label = self.getLabel(variable)
        sgrp = grp if label is None else grp.create_group(label)
        for i in range(dsInfo['dsNumber']):
            ds = sgrp.create_dataset(
                self.getLabel(variable, i),
                (self.blocksPerFile, ) + dsInfo['shape'][1:],
                chunks=True,
                dtype=dsInfo['dtype'])
            dtsets.append(ds)
            data.append((variable, i))

    fp.flush()

    log.log('Creating file: {}'.format(fp.filename), self.name)

    self.ds = dtsets
    self.data = data
    self.firsttime = True
    self.blockIndex = 0
    return
596 606
def putData(self):
    """Store the current `dataOut` block in the open file.

    A new file is started first when the current one is full or when
    `timeFlag` reports a day change or a long gap.
    """

    if (self.blockIndex == self.blocksPerFile) or self.timeFlag():
        self.closeFile()
        self.setNextFile()

    for ds, (attr, ch) in zip(self.ds, self.data):
        value = getattr(self.dataOut, attr)
        # Channel -1 marks a scalar stored whole; otherwise store one channel.
        ds[self.blockIndex] = value if ch == -1 else value[ch]

    self.fp.flush()
    self.blockIndex += 1
    log.log('Block No. {}/{}'.format(self.blockIndex, self.blocksPerFile), self.name)

    return
615 625
def closeFile(self):
    """Trim, flush and close the current file, if one is open.

    When fewer than `blocksPerFile` blocks were written, every dataset is
    shrunk to the number of blocks actually stored. Calling this with no
    open file does nothing, so it is safe to call more than once.
    """

    if not self.fp:
        # Nothing open: the datasets (if any) belong to a closed file and
        # must not be touched.
        return

    if self.blockIndex != self.blocksPerFile:
        for ds in self.ds:
            ds.resize(self.blockIndex, axis=0)

    self.fp.flush()
    self.fp.close()
    # Forget the handle so a later close() is a no-op.
    self.fp = None
625 635
def close(self):
    """Finish writing by closing the current file through `closeFile`."""
    self.closeFile()
General Comments 0
You need to be logged in to leave comments. Login now