update: setNextFile internally calls the readFirstHeader function
Alexander Valdez -
r1695:17b25ecdbe24
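A minimal sketch of the call flow this commit relies on, assuming (as the commit message states) that setNextFile() ends by invoking readFirstHeader(). The class and print statement below are hypothetical, not the schainpy source; they only illustrate why the explicit readFirstHeader() call removed from __setBlockList() in the diff below is no longer needed.

    # Hypothetical illustration of the new call flow, not the schainpy source
    class ReaderSketch:

        def setNextFile(self):
            # ... open the next HDF5 file (details omitted) ...
            self.readFirstHeader()    # the header is now read here, once per file

        def readFirstHeader(self):
            print("metadata and data read for the newly opened file")

        def setBlockList(self):
            # if no block falls inside the requested time range, just advance to
            # the next file; readFirstHeader() no longer has to be called here
            self.setNextFile()

    ReaderSketch().setBlockList()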
@@ -1,640 +1,639
1 1 import os
2 2 import time
3 3 import datetime
4 4
5 5 import numpy
6 6 import h5py
7 7
8 8 import schainpy.admin
9 9 from schainpy.model.data.jrodata import *
10 10 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
11 11 from schainpy.model.io.jroIO_base import *
12 12 from schainpy.utils import log
13 13
14 14
15 15 class HDFReader(Reader, ProcessingUnit):
16 16 """Processing unit to read HDF5 format files
17 17
18 18     This unit reads HDF5 files created with the `HDFWriter` operation. By default
19 19     the files contain two groups, Data and Metadata, and every variable is loaded
20 20     as a `dataOut` attribute.
21 21     It is possible to read any HDF5 file by giving its structure in the `description`
22 22     parameter; you can also add extra values to the metadata with the `extras` parameter.
23 23
24 24 Parameters:
25 25 -----------
26 26 path : str
27 27 Path where files are located.
28 28 startDate : date
29 29 Start date of the files
30 30         endDate : date
31 31 End date of the files
32 32 startTime : time
33 33 Start time of the files
34 34 endTime : time
35 35 End time of the files
36 36 description : dict, optional
37 37 Dictionary with the description of the HDF5 file
38 38 extras : dict, optional
39 39             Dictionary with extra metadata to be added to `dataOut`
40 40
41 41     Attention: Be careful to add the `utcoffset` attribute in the last part of the reader so it works in local time without timing problems.
42 42
43 43 -----------
44 44 utcoffset='-18000'
45 45
46 46
47 47 Examples
48 48 --------
49 49
50 50 desc = {
51 51 'Data': {
52 52 'data_output': ['u', 'v', 'w'],
53 53 'utctime': 'timestamps',
54 54 } ,
55 55 'Metadata': {
56 56 'heightList': 'heights'
57 57 }
58 58 }
59 59
60 60 desc = {
61 61 'Data': {
62 62 'data_output': 'winds',
63 63 'utctime': 'timestamps'
64 64 },
65 65 'Metadata': {
66 66 'heightList': 'heights'
67 67 }
68 68 }
69 69
70 70 extras = {
71 71 'timeZone': 300
72 72 }
73 73
74 74
75 75 reader = project.addReadUnit(
76 76 name='HDFReader',
77 77 path='/path/to/files',
78 78 startDate='2019/01/01',
79 79 endDate='2019/01/31',
80 80 startTime='00:00:00',
81 81 endTime='23:59:59',
82 82 utcoffset='-18000'
83 83 # description=json.dumps(desc),
84 84 # extras=json.dumps(extras),
85 85 )
86 86
87 87 """
88 88
89 89 __attrs__ = ['path', 'startDate', 'endDate', 'startTime', 'endTime', 'description', 'extras']
90 90
91 91 def __init__(self):
92 92 ProcessingUnit.__init__(self)
93 93 self.dataOut = Parameters()
94 94 self.ext = ".hdf5"
95 95 self.optchar = "D"
96 96 self.meta = {}
97 97 self.data = {}
98 98 self.open_file = h5py.File
99 99 self.open_mode = 'r'
100 100 self.description = {}
101 101 self.extras = {}
102 102 self.filefmt = "*%Y%j***"
103 103 self.folderfmt = "*%Y%j"
104 104 self.utcoffset = 0
105 105
106 106 def setup(self, **kwargs):
107 107 self.set_kwargs(**kwargs)
108 108 if not self.ext.startswith('.'):
109 109 self.ext = '.{}'.format(self.ext)
110 110
111 111 if self.online:
112 112 log.log("Searching files in online mode...", self.name)
113 113
114 114 for nTries in range(self.nTries):
115 115 fullpath = self.searchFilesOnLine(self.path, self.startDate,
116 116 self.endDate, self.expLabel, self.ext, self.walk,
117 117 self.filefmt, self.folderfmt)
118 118 try:
119 119 fullpath = next(fullpath)
120 120 except:
121 121 fullpath = None
122 122
123 123 if fullpath:
124 124 break
125 125
126 126 log.warning(
127 127 'Waiting {} sec for a valid file in {}: try {} ...'.format(
128 128 self.delay, self.path, nTries + 1),
129 129 self.name)
130 130 time.sleep(self.delay)
131 131
132 132 if not(fullpath):
133 133 raise schainpy.admin.SchainError(
134 134 'There isn\'t any valid file in {}'.format(self.path))
135 135
136 136 pathname, filename = os.path.split(fullpath)
137 137 self.year = int(filename[1:5])
138 138 self.doy = int(filename[5:8])
139 139 self.set = int(filename[8:11]) - 1
140 140 else:
141 141 log.log("Searching files in {}".format(self.path), self.name)
142 142 self.filenameList = self.searchFilesOffLine(self.path, self.startDate,
143 143 self.endDate, self.expLabel, self.ext, self.walk, self.filefmt, self.folderfmt)
144 144
145 145 self.setNextFile()
146 146
147 147 return
148 148
149 149 def readFirstHeader(self):
150 150 '''Read metadata and data'''
151 151
152 152 self.__readMetadata()
153 153 self.__readData()
154 154 self.__setBlockList()
155 155
156 156 if 'type' in self.meta:
157 157 self.dataOut = eval(self.meta['type'])()
158 158
159 159 for attr in self.meta:
160 160 setattr(self.dataOut, attr, self.meta[attr])
161 161
162 162 self.blockIndex = 0
163 163
164 164 return
165 165
166 166 def __setBlockList(self):
167 167 '''
168 168 Selects the data within the times defined
169 169
170 170 self.fp
171 171 self.startTime
172 172 self.endTime
173 173 self.blockList
174 174 self.blocksPerFile
175 175
176 176 '''
177 177
178 178 startTime = self.startTime
179 179 endTime = self.endTime
180 180 thisUtcTime = self.data['utctime'] + self.utcoffset
181 181
182 182 self.interval = numpy.min(thisUtcTime[1:] - thisUtcTime[:-1])
183 183 thisDatetime = datetime.datetime.utcfromtimestamp(thisUtcTime[0])
184 184
185 185 thisDate = thisDatetime.date()
186 186 thisTime = thisDatetime.time()
187 187 startUtcTime = (datetime.datetime.combine(thisDate, startTime) - datetime.datetime(1970, 1, 1)).total_seconds()
188 188 endUtcTime = (datetime.datetime.combine(thisDate, endTime) - datetime.datetime(1970, 1, 1)).total_seconds()
189 189 ind = numpy.where(numpy.logical_and(thisUtcTime >= startUtcTime, thisUtcTime < endUtcTime))[0]
190 190
191 191 self.blockList = ind
192 192 self.blocksPerFile = len(ind)
193 193
194 194 if len(ind)==0:
195 195 print("[Reading] Block No. %d/%d -> %s [Skipping]" % (self.blockIndex,
196 196 self.blocksPerFile,
197 197 thisDatetime))
198 198 self.setNextFile()
199 #self.readFirstHeader()
200 199
201 200 return
202 201
203 202 def __readMetadata(self):
204 203 '''
205 204 Reads Metadata
206 205 '''
207 206
208 207 meta = {}
209 208
210 209 if self.description:
211 210 for key, value in self.description['Metadata'].items():
212 211 meta[key] = self.fp[value][()]
213 212 else:
214 213 grp = self.fp['Metadata']
215 214 for name in grp:
216 215 meta[name] = grp[name][()]
217 216
218 217 if self.extras:
219 218 for key, value in self.extras.items():
220 219 meta[key] = value
221 220 self.meta = meta
222 221
223 222 return
224 223
225 224 def __readData(self):
226 225
227 226 data = {}
228 227
229 228 if self.description:
230 229 for key, value in self.description['Data'].items():
231 230 if isinstance(value, str):
232 231 if isinstance(self.fp[value], h5py.Dataset):
233 232 data[key] = self.fp[value][()]
234 233 elif isinstance(self.fp[value], h5py.Group):
235 234 array = []
236 235 for ch in self.fp[value]:
237 236 array.append(self.fp[value][ch][()])
238 237 data[key] = numpy.array(array)
239 238 elif isinstance(value, list):
240 239 array = []
241 240 for ch in value:
242 241 array.append(self.fp[ch][()])
243 242 data[key] = numpy.array(array)
244 243 else:
245 244 grp = self.fp['Data']
246 245 for name in grp:
247 246 if isinstance(grp[name], h5py.Dataset):
248 247 array = grp[name][()]
249 248 elif isinstance(grp[name], h5py.Group):
250 249 array = []
251 250 for ch in grp[name]:
252 251 array.append(grp[name][ch][()])
253 252 array = numpy.array(array)
254 253 else:
255 254 log.warning('Unknown type: {}'.format(name))
256 255
257 256 if name in self.description:
258 257 key = self.description[name]
259 258 else:
260 259 key = name
261 260 data[key] = array
262 261
263 262 self.data = data
264 263 return
265 264
266 265 def getData(self):
267 266
268 267 for attr in self.data:
269 268 if self.data[attr].ndim == 1:
270 269 setattr(self.dataOut, attr, self.data[attr][self.blockIndex])
271 270 else:
272 271 setattr(self.dataOut, attr, self.data[attr][:, self.blockIndex])
273 272
274 273 self.dataOut.flagNoData = False
275 274 self.blockIndex += 1
276 275
277 276 log.log("Block No. {}/{} -> {}".format(
278 277 self.blockIndex,
279 278 self.blocksPerFile,
280 279 self.dataOut.datatime.ctime()), self.name)
281 280
282 281 return
283 282
284 283 def run(self, **kwargs):
285 284
286 285 if not(self.isConfig):
287 286 self.setup(**kwargs)
288 287 self.isConfig = True
289 288
290 289 if self.blockIndex == self.blocksPerFile:
291 290 self.setNextFile()
292 291
293 292 self.getData()
294 293
295 294 return
296 295
297 296 @MPDecorator
298 297 class HDFWriter(Operation):
299 298 """Operation to write HDF5 files.
300 299
301 300     By default the HDF5 file contains two groups, Data and Metadata, where you
302 301     can save any `dataOut` attribute specified by the `dataList` and `metadataList`
303 302     parameters; data attributes are normally time dependent whereas the metadata
304 303     attributes are not.
305 304     It is possible to customize the structure of the HDF5 file with the
306 305     optional `description` parameter; see the examples.
307 306
308 307 Parameters:
309 308 -----------
310 309 path : str
311 310 Path where files will be saved.
312 311 blocksPerFile : int
313 312 Number of blocks per file
314 313 metadataList : list
315 314 List of the dataOut attributes that will be saved as metadata
316 315         dataList : list
317 316 List of the dataOut attributes that will be saved as data
318 317         setType : bool
319 318             If True, the file names are derived from the timestamp of the data
320 319 description : dict, optional
321 320 Dictionary with the desired description of the HDF5 file
322 321
323 322 Examples
324 323 --------
325 324
326 325 desc = {
327 326 'data_output': {'winds': ['z', 'w', 'v']},
328 327 'utctime': 'timestamps',
329 328 'heightList': 'heights'
330 329 }
331 330 desc = {
332 331 'data_output': ['z', 'w', 'v'],
333 332 'utctime': 'timestamps',
334 333 'heightList': 'heights'
335 334 }
336 335 desc = {
337 336 'Data': {
338 337 'data_output': 'winds',
339 338 'utctime': 'timestamps'
340 339 },
341 340 'Metadata': {
342 341 'heightList': 'heights'
343 342 }
344 343 }
345 344
346 345 writer = proc_unit.addOperation(name='HDFWriter')
347 346 writer.addParameter(name='path', value='/path/to/file')
348 347 writer.addParameter(name='blocksPerFile', value='32')
349 348 writer.addParameter(name='metadataList', value='heightList,timeZone')
350 349 writer.addParameter(name='dataList',value='data_output,utctime')
351 350 # writer.addParameter(name='description',value=json.dumps(desc))
352 351
353 352 """
354 353
355 354 ext = ".hdf5"
356 355 optchar = "D"
357 356 filename = None
358 357 path = None
359 358 setFile = None
360 359 fp = None
361 360 firsttime = True
362 361 #Configurations
363 362 blocksPerFile = None
364 363 blockIndex = None
365 364 dataOut = None
366 365 #Data Arrays
367 366 dataList = None
368 367 metadataList = None
369 368 currentDay = None
370 369 lastTime = None
371 370
372 371 def __init__(self):
373 372
374 373 Operation.__init__(self)
375 374 return
376 375
377 376 def setup(self, path=None, blocksPerFile=10, metadataList=None, dataList=None, setType=None, description=None):
378 377 self.path = path
379 378 self.blocksPerFile = blocksPerFile
380 379 self.metadataList = metadataList
381 380 self.dataList = [s.strip() for s in dataList]
382 381 self.setType = setType
383 382 self.description = description
384 383
385 384 if self.metadataList is None:
386 385 self.metadataList = self.dataOut.metadata_list
387 386
388 387 tableList = []
389 388 dsList = []
390 389
391 390 for i in range(len(self.dataList)):
392 391 dsDict = {}
393 392 if hasattr(self.dataOut, self.dataList[i]):
394 393 dataAux = getattr(self.dataOut, self.dataList[i])
395 394 dsDict['variable'] = self.dataList[i]
396 395 else:
397 396                 log.warning('Attribute {} not found in dataOut'.format(self.dataList[i]), self.name)
398 397 continue
399 398
400 399 if dataAux is None:
401 400 continue
402 401             elif isinstance(dataAux, (int, float, numpy.integer, numpy.floating)):
403 402 dsDict['nDim'] = 0
404 403 else:
405 404 dsDict['nDim'] = len(dataAux.shape)
406 405 dsDict['shape'] = dataAux.shape
407 406 dsDict['dsNumber'] = dataAux.shape[0]
408 407 dsDict['dtype'] = dataAux.dtype
409 408
410 409 dsList.append(dsDict)
411 410
412 411 self.dsList = dsList
413 412 self.currentDay = self.dataOut.datatime.date()
414 413
415 414 def timeFlag(self):
416 415 currentTime = self.dataOut.utctime
417 416 timeTuple = time.localtime(currentTime)
418 417 dataDay = timeTuple.tm_yday
419 418
420 419 if self.lastTime is None:
421 420 self.lastTime = currentTime
422 421 self.currentDay = dataDay
423 422 return False
424 423
425 424 timeDiff = currentTime - self.lastTime
426 425
427 426         # If the day is different or if the time gap between samples exceeds the threshold (3 hours)
428 427 if dataDay != self.currentDay:
429 428 self.currentDay = dataDay
430 429 return True
431 430 elif timeDiff > 3*60*60:
432 431 self.lastTime = currentTime
433 432 return True
434 433 else:
435 434 self.lastTime = currentTime
436 435 return False
437 436
438 437 def run(self, dataOut, path, blocksPerFile=10, metadataList=None,
439 438 dataList=[], setType=None, description={}):
440 439
441 440 self.dataOut = dataOut
442 441 if not(self.isConfig):
443 442 self.setup(path=path, blocksPerFile=blocksPerFile,
444 443 metadataList=metadataList, dataList=dataList,
445 444 setType=setType, description=description)
446 445
447 446 self.isConfig = True
448 447 self.setNextFile()
449 448
450 449 self.putData()
451 450 return
452 451
453 452 def setNextFile(self):
454 453
455 454 ext = self.ext
456 455 path = self.path
457 456 setFile = self.setFile
458 457
459 458 timeTuple = time.localtime(self.dataOut.utctime)
460 459 subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year,timeTuple.tm_yday)
461 460 fullpath = os.path.join(path, subfolder)
462 461
463 462 if os.path.exists(fullpath):
464 463 filesList = os.listdir(fullpath)
465 464 filesList = [k for k in filesList if k.startswith(self.optchar)]
466 465 if len( filesList ) > 0:
467 466 filesList = sorted(filesList, key=str.lower)
468 467 filen = filesList[-1]
469 468                     # the filename should have the following format
470 469 # 0 1234 567 89A BCDE (hex)
471 470 # x YYYY DDD SSS .ext
472 471 if isNumber(filen[8:11]):
473 472                         setFile = int(filen[8:11]) # initialize the set counter to the set number of the last file
474 473 else:
475 474 setFile = -1
476 475 else:
477 476                     setFile = -1 # initialize the set counter
478 477 else:
479 478 os.makedirs(fullpath)
480 479                 setFile = -1 # initialize the set counter
481 480
482 481 if self.setType is None:
483 482 setFile += 1
484 483 file = '%s%4.4d%3.3d%03d%s' % (self.optchar,
485 484 timeTuple.tm_year,
486 485 timeTuple.tm_yday,
487 486 setFile,
488 487 ext )
489 488 else:
490 489 setFile = timeTuple.tm_hour*60+timeTuple.tm_min
491 490 file = '%s%4.4d%3.3d%04d%s' % (self.optchar,
492 491 timeTuple.tm_year,
493 492 timeTuple.tm_yday,
494 493 setFile,
495 494 ext )
496 495
497 496 self.filename = os.path.join( path, subfolder, file )
498 497
499 498 #Setting HDF5 File
500 499 self.fp = h5py.File(self.filename, 'w')
501 500 #write metadata
502 501 self.writeMetadata(self.fp)
503 502 #Write data
504 503 self.writeData(self.fp)
505 504
506 505 def getLabel(self, name, x=None):
507 506
508 507 if x is None:
509 508 if 'Data' in self.description:
510 509 data = self.description['Data']
511 510 if 'Metadata' in self.description:
512 511 data.update(self.description['Metadata'])
513 512 else:
514 513 data = self.description
515 514 if name in data:
516 515 if isinstance(data[name], str):
517 516 return data[name]
518 517 elif isinstance(data[name], list):
519 518 return None
520 519 elif isinstance(data[name], dict):
521 520 for key, value in data[name].items():
522 521 return key
523 522 return name
524 523 else:
525 524 if 'Metadata' in self.description:
526 525 meta = self.description['Metadata']
527 526 else:
528 527 meta = self.description
529 528 if name in meta:
530 529 if isinstance(meta[name], list):
531 530 return meta[name][x]
532 531 elif isinstance(meta[name], dict):
533 532 for key, value in meta[name].items():
534 533 return value[x]
535 534 if 'cspc' in name:
536 535 return 'pair{:02d}'.format(x)
537 536 else:
538 537 return 'channel{:02d}'.format(x)
539 538
540 539 def writeMetadata(self, fp):
541 540
542 541 if self.description:
543 542 if 'Metadata' in self.description:
544 543 grp = fp.create_group('Metadata')
545 544 else:
546 545 grp = fp
547 546 else:
548 547 grp = fp.create_group('Metadata')
549 548
550 549 for i in range(len(self.metadataList)):
551 550 if not hasattr(self.dataOut, self.metadataList[i]):
552 551 log.warning('Metadata: `{}` not found'.format(self.metadataList[i]), self.name)
553 552 continue
554 553 value = getattr(self.dataOut, self.metadataList[i])
555 554 if isinstance(value, bool):
556 555 if value is True:
557 556 value = 1
558 557 else:
559 558 value = 0
560 559 grp.create_dataset(self.getLabel(self.metadataList[i]), data=value)
561 560 return
562 561
563 562 def writeData(self, fp):
564 563
565 564 if self.description:
566 565 if 'Data' in self.description:
567 566 grp = fp.create_group('Data')
568 567 else:
569 568 grp = fp
570 569 else:
571 570 grp = fp.create_group('Data')
572 571
573 572 dtsets = []
574 573 data = []
575 574
576 575 for dsInfo in self.dsList:
577 576 if dsInfo['nDim'] == 0:
578 577 ds = grp.create_dataset(
579 578 self.getLabel(dsInfo['variable']),
580 579 (self.blocksPerFile, ),
581 580 chunks=True,
582 581 dtype=numpy.float64)
583 582 dtsets.append(ds)
584 583 data.append((dsInfo['variable'], -1))
585 584 else:
586 585 label = self.getLabel(dsInfo['variable'])
587 586 if label is not None:
588 587 sgrp = grp.create_group(label)
589 588 else:
590 589 sgrp = grp
591 590 for i in range(dsInfo['dsNumber']):
592 591 ds = sgrp.create_dataset(
593 592 self.getLabel(dsInfo['variable'], i),
594 593 (self.blocksPerFile, ) + dsInfo['shape'][1:],
595 594 chunks=True,
596 595 dtype=dsInfo['dtype'])
597 596 dtsets.append(ds)
598 597 data.append((dsInfo['variable'], i))
599 598 fp.flush()
600 599
601 600 log.log('Creating file: {}'.format(fp.filename), self.name)
602 601
603 602 self.ds = dtsets
604 603 self.data = data
605 604 self.firsttime = True
606 605 self.blockIndex = 0
607 606 return
608 607
609 608 def putData(self):
610 609
611 610 if (self.blockIndex == self.blocksPerFile) or self.timeFlag():
612 611 self.closeFile()
613 612 self.setNextFile()
614 613
615 614 for i, ds in enumerate(self.ds):
616 615 attr, ch = self.data[i]
617 616 if ch == -1:
618 617 ds[self.blockIndex] = getattr(self.dataOut, attr)
619 618 else:
620 619 ds[self.blockIndex] = getattr(self.dataOut, attr)[ch]
621 620
622 621 self.fp.flush()
623 622 self.blockIndex += 1
624 623 log.log('Block No. {}/{}'.format(self.blockIndex, self.blocksPerFile), self.name)
625 624
626 625 return
627 626
628 627 def closeFile(self):
629 628
630 629 if self.blockIndex != self.blocksPerFile:
631 630 for ds in self.ds:
632 631 ds.resize(self.blockIndex, axis=0)
633 632
634 633 if self.fp:
635 634 self.fp.flush()
636 635 self.fp.close()
637 636
638 637 def close(self):
639 638
640 639 self.closeFile()