##// END OF EJS Templates
update uniquechannel
avaldez -
r1784:57448bf73d50
parent child
Show More

The requested changes are too big and content was truncated. Show full diff

@@ -1,662 +1,662
1 1 import os
2 2 import time
3 3 import datetime
4 4
5 5 import numpy
6 6 import h5py
7 7
8 8 import schainpy.admin
9 9 from schainpy.model.data.jrodata import *
10 10 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
11 11 from schainpy.model.io.jroIO_base import *
12 12 from schainpy.utils import log
13 13
14 14
class HDFReader(Reader, ProcessingUnit):
    """Processing unit to read HDF5 format files.

    This unit reads HDF5 files created with the `HDFWriter` operation. Such
    files contain by default two groups, Data and Metadata; every variable
    found is exposed as a `dataOut` attribute.
    It is possible to read any HDF5 file by giving its structure in the
    `description` parameter; extra values can be added to the metadata with
    the `extras` parameter.

    Parameters:
    -----------
    path : str
        Path where files are located.
    startDate : date
        Start date of the files
    endDate : list
        End date of the files
    startTime : time
        Start time of the files
    endTime : time
        End time of the files
    description : dict, optional
        Dictionary with the description of the HDF5 file
    extras : dict, optional
        Dictionary with extra metadata to be added to `dataOut`

    Examples
    --------

    desc = {
        'Data': {
            'data_output': ['u', 'v', 'w'],
            'utctime': 'timestamps',
        } ,
        'Metadata': {
            'heightList': 'heights'
        }
    }

    desc = {
        'Data': {
            'data_output': 'winds',
            'utctime': 'timestamps'
        },
        'Metadata': {
            'heightList': 'heights'
        }
    }

    extras = {
        'timeZone': 300
    }

    reader = project.addReadUnit(
        name='HDFReader',
        path='/path/to/files',
        startDate='2019/01/01',
        endDate='2019/01/31',
        startTime='00:00:00',
        endTime='23:59:59',
        # description=json.dumps(desc),
        # extras=json.dumps(extras),
        )

    """

    __attrs__ = ['path', 'startDate', 'endDate', 'startTime', 'endTime', 'description', 'extras']

    def __init__(self):
        ProcessingUnit.__init__(self)
        self.dataOut = Parameters()
        self.ext = ".hdf5"
        self.optchar = "D"
        self.meta = {}
        self.data = {}
        self.open_file = h5py.File
        self.open_mode = 'r'
        self.description = {}
        self.extras = {}
        self.filefmt = "*%Y%j***"
        self.folderfmt = "*%Y%j"
        self.utcoffset = 0

    def setup(self, **kwargs):
        """Configure the reader and locate the first file.

        Online mode polls `searchFilesOnLine` up to `self.nTries` times,
        sleeping `self.delay` seconds between tries; offline mode builds
        `self.filenameList` in one shot. Raises SchainError when no valid
        file is found online.
        """
        self.set_kwargs(**kwargs)
        if not self.ext.startswith('.'):
            self.ext = '.{}'.format(self.ext)

        if self.online:
            log.log("Searching files in online mode...", self.name)

            for nTries in range(self.nTries):
                fullpath = self.searchFilesOnLine(self.path, self.startDate,
                    self.endDate, self.expLabel, self.ext, self.walk,
                    self.filefmt, self.folderfmt)
                try:
                    fullpath = next(fullpath)
                except StopIteration:
                    # No file matched on this try; wait and retry below.
                    # (Was a bare `except:` which also hid real errors.)
                    fullpath = None

                if fullpath:
                    break

                log.warning(
                    'Waiting {} sec for a valid file in {}: try {} ...'.format(
                        self.delay, self.path, nTries + 1),
                    self.name)
                time.sleep(self.delay)

            if not fullpath:
                raise schainpy.admin.SchainError(
                    'There isn\'t any valid file in {}'.format(self.path))

            pathname, filename = os.path.split(fullpath)
            # Filename layout: <optchar>YYYYDDDSSS<ext>; set is decremented so
            # the first call to setNextFile picks up this very file.
            self.year = int(filename[1:5])
            self.doy = int(filename[5:8])
            self.set = int(filename[8:11]) - 1
        else:
            log.log("Searching files in {}".format(self.path), self.name)
            self.filenameList = self.searchFilesOffLine(self.path, self.startDate,
                self.endDate, self.expLabel, self.ext, self.walk, self.filefmt, self.folderfmt)

        self.setNextFile()

        return

    def readFirstHeader(self):
        '''Read metadata and data from the freshly opened file.'''

        self.__readMetadata()
        self.__readData()
        self.__setBlockList()

        if 'type' in self.meta:
            # NOTE(review): eval() instantiates the class named in the file's
            # metadata -- only open trusted HDF5 files.
            self.dataOut = eval(self.meta['type'])()

        for attr in self.meta:
            setattr(self.dataOut, attr, self.meta[attr])

        self.blockIndex = 0

        return

    def __setBlockList(self):
        '''
        Selects the data within the times defined

        self.fp
        self.startTime
        self.endTime
        self.blockList
        self.blocksPerFile

        '''

        startTime = self.startTime
        endTime = self.endTime
        thisUtcTime = self.data['utctime'] + self.utcoffset
        if len(thisUtcTime) > 1:
            self.interval = numpy.min(thisUtcTime[1:] - thisUtcTime[:-1])
        else:
            # Single-block file: no consecutive timestamps to difference
            # (numpy.min on an empty array would raise).
            self.interval = 0
        thisDatetime = datetime.datetime.utcfromtimestamp(thisUtcTime[0])

        thisDate = thisDatetime.date()
        thisTime = thisDatetime.time()

        # Convert the requested start/end times (on this file's date) to epoch
        startUtcTime = (datetime.datetime.combine(thisDate, startTime) - datetime.datetime(1970, 1, 1)).total_seconds()
        endUtcTime = (datetime.datetime.combine(thisDate, endTime) - datetime.datetime(1970, 1, 1)).total_seconds()

        ind = numpy.where(numpy.logical_and(thisUtcTime >= startUtcTime, thisUtcTime < endUtcTime))[0]

        self.blockList = ind
        self.blocksPerFile = len(ind)
        # similar to master
        if len(ind) == 0:
            print("[Reading] Block No. %d/%d -> %s [Skipping]" % (self.blockIndex,
                                                                  self.blocksPerFile,
                                                                  thisDatetime))
            self.setNextFile()
        # similar to master
        return

    def __readMetadata(self):
        '''
        Reads Metadata
        '''

        meta = {}

        if self.description:
            # Custom layout: description maps dataOut attr -> dataset path
            for key, value in self.description['Metadata'].items():
                meta[key] = self.fp[value][()]
        else:
            # Default layout: everything under the 'Metadata' group
            grp = self.fp['Metadata']
            for name in grp:
                meta[name] = grp[name][()]

        if self.extras:
            for key, value in self.extras.items():
                meta[key] = value
        self.meta = meta

        return

    def __readData(self):
        '''Read the Data group (or the datasets named in description) into self.data.'''

        data = {}

        if self.description:
            for key, value in self.description['Data'].items():
                if isinstance(value, str):
                    if isinstance(self.fp[value], h5py.Dataset):
                        data[key] = self.fp[value][()]
                    elif isinstance(self.fp[value], h5py.Group):
                        # Group of per-channel datasets -> stacked 2D+ array
                        array = []
                        for ch in self.fp[value]:
                            array.append(self.fp[value][ch][()])
                        data[key] = numpy.array(array)
                elif isinstance(value, list):
                    array = []
                    for ch in value:
                        array.append(self.fp[ch][()])
                    data[key] = numpy.array(array)
        else:
            grp = self.fp['Data']
            for name in grp:
                if isinstance(grp[name], h5py.Dataset):
                    array = grp[name][()]
                elif isinstance(grp[name], h5py.Group):
                    array = []
                    for ch in grp[name]:
                        array.append(grp[name][ch][()])
                    array = numpy.array(array)
                else:
                    log.warning('Unknown type: {}'.format(name))

                # description may rename datasets even in default layout
                if name in self.description:
                    key = self.description[name]
                else:
                    key = name
                data[key] = array

        self.data = data
        return

    def getData(self):
        '''Copy the current block of every variable into dataOut and advance.'''

        for attr in self.data:
            if self.data[attr].ndim == 1:
                setattr(self.dataOut, attr, self.data[attr][self.blockIndex])
            else:
                # First axis is channel, second is time/block
                setattr(self.dataOut, attr, self.data[attr][:, self.blockIndex])

        self.dataOut.flagNoData = False
        self.blockIndex += 1

        log.log("Block No. {}/{} -> {}".format(
            self.blockIndex,
            self.blocksPerFile,
            self.dataOut.datatime.ctime()), self.name)

        return

    def run(self, **kwargs):
        '''Main entry: set up on first call, roll files as blocks are consumed.'''

        if not self.isConfig:
            self.setup(**kwargs)
            self.isConfig = True

        if self.blockIndex == self.blocksPerFile:
            self.setNextFile()

        self.getData()

        return
289 289
@MPDecorator
class HDFWriter(Operation):
    """Operation to write HDF5 files.

    The HDF5 file contains by default two groups Data and Metadata where
    you can save any `dataOut` attribute specified by `dataList` and `metadataList`
    parameters, data attributes are normaly time dependent where the metadata
    are not.
    It is possible to customize the structure of the HDF5 file with the
    optional description parameter see the examples.

    Parameters:
    -----------
    path : str
        Path where files will be saved.
    blocksPerFile : int
        Number of blocks per file
    metadataList : list
        List of the dataOut attributes that will be saved as metadata
    dataList : int
        List of the dataOut attributes that will be saved as data
    setType : bool
        If True the name of the files corresponds to the timestamp of the data
    description : dict, optional
        Dictionary with the desired description of the HDF5 file

    Examples
    --------

    desc = {
        'data_output': {'winds': ['z', 'w', 'v']},
        'utctime': 'timestamps',
        'heightList': 'heights'
    }
    desc = {
        'data_output': ['z', 'w', 'v'],
        'utctime': 'timestamps',
        'heightList': 'heights'
    }
    desc = {
        'Data': {
            'data_output': 'winds',
            'utctime': 'timestamps'
        },
        'Metadata': {
            'heightList': 'heights'
        }
    }

    writer = proc_unit.addOperation(name='HDFWriter')
    writer.addParameter(name='path', value='/path/to/file')
    writer.addParameter(name='blocksPerFile', value='32')
    writer.addParameter(name='metadataList', value='heightList,timeZone')
    writer.addParameter(name='dataList',value='data_output,utctime')
    # writer.addParameter(name='description',value=json.dumps(desc))

    """

    ext = ".hdf5"
    optchar = "D"
    filename = None
    path = None
    setFile = None
    fp = None
    firsttime = True
    #Configurations
    blocksPerFile = None
    blockIndex = None
    dataOut = None
    #Data Arrays
    dataList = None
    metadataList = None
    currentDay = None
    lastTime = None

    def __init__(self):

        Operation.__init__(self)
        return

    def set_kwargs(self, **kwargs):
        # Mirror every keyword argument as an attribute of this operation
        for key, value in kwargs.items():
            setattr(self, key, value)

    def set_kwargs_obj(self, obj, **kwargs):
        # Mirror every keyword argument as an attribute of `obj` (dataOut)
        for key, value in kwargs.items():
            setattr(obj, key, value)

    def setup(self, path=None, blocksPerFile=10, metadataList=None, dataList=None, setType=None, description=None, **kwargs):
        """Store the configuration and pre-compute the dataset layout (dsList)."""
        self.path = path
        self.blocksPerFile = blocksPerFile
        self.metadataList = metadataList
        self.dataList = [s.strip() for s in dataList]
        self.setType = setType
        self.description = description
        self.set_kwargs(**kwargs)
        # `uniqueChannel` arrives (if at all) through **kwargs and was already
        # applied by set_kwargs above; default to False when not supplied.
        # BUGFIX: previous code read an undefined bare name `uniqueChannel`,
        # raising NameError on every setup. NOTE(review): confirm False is
        # the intended default for this new flag.
        self.uniqueChannel = kwargs.get('uniqueChannel', False)

        if self.metadataList is None:
            self.metadataList = self.dataOut.metadata_list

        tableList = []
        dsList = []

        for i in range(len(self.dataList)):
            dsDict = {}
            if hasattr(self.dataOut, self.dataList[i]):
                dataAux = getattr(self.dataOut, self.dataList[i])
                dsDict['variable'] = self.dataList[i]
            else:
                # BUGFIX: the attribute name was never interpolated into the
                # message ('{}' was left literal and self.name misplaced).
                log.warning('Attribute {} not found in dataOut'.format(self.dataList[i]), self.name)
                continue

            if dataAux is None:
                continue
            elif isinstance(dataAux, (int, float, numpy.integer, numpy.floating)):
                # numpy.floating replaces numpy.float_/numpy.float, both of
                # which are gone in NumPy 2.0
                dsDict['nDim'] = 0
            else:
                if self.uniqueChannel: #Creates extra dimension to avoid the creation of multiple channels
                    dataAux = numpy.expand_dims(dataAux, axis=0)
                dsDict['nDim'] = len(dataAux.shape)
                dsDict['shape'] = dataAux.shape
                dsDict['dsNumber'] = dataAux.shape[0]
                dsDict['dtype'] = dataAux.dtype

            dsList.append(dsDict)

        self.dsList = dsList
        self.currentDay = self.dataOut.datatime.date()

    def timeFlag(self):
        """Return True when a new file must be started (day rollover or >3h gap)."""
        currentTime = self.dataOut.utctime
        timeTuple = time.localtime(currentTime)
        dataDay = timeTuple.tm_yday

        if self.lastTime is None:
            self.lastTime = currentTime
            self.currentDay = dataDay
            return False

        timeDiff = currentTime - self.lastTime

        #Si el dia es diferente o si la diferencia entre un dato y otro supera la hora
        if dataDay != self.currentDay:
            self.currentDay = dataDay
            # BUGFIX: also refresh lastTime here, otherwise the very next call
            # can see a stale >3h gap and rotate the file a second time.
            self.lastTime = currentTime
            return True
        elif timeDiff > 3*60*60:
            self.lastTime = currentTime
            return True
        else:
            self.lastTime = currentTime
            return False

    def run(self, dataOut, path, blocksPerFile=10, metadataList=None,
            dataList=[], setType=None, description={}, **kwargs):
        """Main entry: configure on first call, then append one block per call."""

        self.dataOut = dataOut
        self.set_kwargs_obj(self.dataOut, **kwargs)
        if not self.isConfig:
            self.setup(path=path, blocksPerFile=blocksPerFile,
                       metadataList=metadataList, dataList=dataList,
                       setType=setType, description=description, **kwargs)

            self.isConfig = True
            self.setNextFile()

        self.putData()

        return

    def setNextFile(self):
        """Create the next output file (D<YYYY><DDD><set>.hdf5) and write headers."""

        ext = self.ext
        path = self.path
        setFile = self.setFile

        timeTuple = time.localtime(self.dataOut.utctime)
        subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year, timeTuple.tm_yday)
        fullpath = os.path.join(path, subfolder)

        if os.path.exists(fullpath):
            filesList = os.listdir(fullpath)
            filesList = [k for k in filesList if k.startswith(self.optchar)]
            if len(filesList) > 0:
                filesList = sorted(filesList, key=str.lower)
                filen = filesList[-1]
                # el filename debera tener el siguiente formato
                # 0 1234 567 89A BCDE (hex)
                # x YYYY DDD SSS .ext
                if isNumber(filen[8:11]):
                    setFile = int(filen[8:11]) #inicializo mi contador de seteo al seteo del ultimo file
                else:
                    setFile = -1
            else:
                setFile = -1 #inicializo mi contador de seteo
        else:
            os.makedirs(fullpath)
            setFile = -1 #inicializo mi contador de seteo

        if self.setType is None:
            # Sequential set number
            setFile += 1
            file = '%s%4.4d%3.3d%03d%s' % (self.optchar,
                                           timeTuple.tm_year,
                                           timeTuple.tm_yday,
                                           setFile,
                                           ext)
        else:
            # Set number derived from the data timestamp (minutes of day)
            setFile = timeTuple.tm_hour*60+timeTuple.tm_min
            file = '%s%4.4d%3.3d%04d%s' % (self.optchar,
                                           timeTuple.tm_year,
                                           timeTuple.tm_yday,
                                           setFile,
                                           ext)

        self.filename = os.path.join(path, subfolder, file)

        #Setting HDF5 File
        self.fp = h5py.File(self.filename, 'w')
        #write metadata
        self.writeMetadata(self.fp)
        #Write data
        self.writeData(self.fp)

    def getLabel(self, name, x=None):
        """Resolve the HDF5 label for attribute `name`.

        With x=None returns the group/dataset label; with an integer x returns
        the per-channel label (channelNN / pairNN by default).
        """
        if x is None:
            if 'Data' in self.description:
                # Copy before merging so we never mutate the caller's
                # description dict (the old in-place update polluted it).
                data = dict(self.description['Data'])
                if 'Metadata' in self.description:
                    data.update(self.description['Metadata'])
            else:
                data = self.description
            if name in data:
                if isinstance(data[name], str):
                    return data[name]
                elif isinstance(data[name], list):
                    return None
                elif isinstance(data[name], dict):
                    for key, value in data[name].items():
                        return key
            return name
        else:
            if 'Data' in self.description:
                data = dict(self.description['Data'])
                if 'Metadata' in self.description:
                    data.update(self.description['Metadata'])
            else:
                data = self.description
            if name in data:
                if isinstance(data[name], list):
                    return data[name][x]
                elif isinstance(data[name], dict):
                    for key, value in data[name].items():
                        return value[x]
            if 'cspc' in name:
                return 'pair{:02d}'.format(x)
            else:
                return 'channel{:02d}'.format(x)

    def writeMetadata(self, fp):
        """Write every metadataList attribute into the Metadata group."""

        if self.description:
            if 'Metadata' in self.description:
                grp = fp.create_group('Metadata')
            else:
                grp = fp
        else:
            grp = fp.create_group('Metadata')

        for i in range(len(self.metadataList)):
            if not hasattr(self.dataOut, self.metadataList[i]):
                log.warning('Metadata: `{}` not found'.format(self.metadataList[i]), self.name)
                continue
            value = getattr(self.dataOut, self.metadataList[i])
            # h5py cannot store Python bools directly; persist as 0/1
            if isinstance(value, bool):
                if value is True:
                    value = 1
                else:
                    value = 0
            grp.create_dataset(self.getLabel(self.metadataList[i]), data=value)
        return

    def writeData(self, fp):
        """Create the (empty, blocksPerFile-long) datasets for every variable."""

        if self.description:
            if 'Data' in self.description:
                grp = fp.create_group('Data')
            else:
                grp = fp
        else:
            grp = fp.create_group('Data')

        dtsets = []
        data = []
        for dsInfo in self.dsList:
            if dsInfo['nDim'] == 0:
                # Scalar per block -> 1D dataset of length blocksPerFile
                ds = grp.create_dataset(
                    self.getLabel(dsInfo['variable']),
                    (self.blocksPerFile, ),
                    chunks=True,
                    dtype=numpy.float64)
                dtsets.append(ds)
                data.append((dsInfo['variable'], -1))
            else:
                label = self.getLabel(dsInfo['variable'])
                if label is not None:
                    sgrp = grp.create_group(label)
                else:
                    sgrp = grp
                if self.uniqueChannel: #Creates extra dimension to avoid the creation of multiple channels
                    setattr(self.dataOut, dsInfo['variable'], numpy.expand_dims(getattr(self.dataOut, dsInfo['variable']), axis=0))
                for i in range(dsInfo['dsNumber']):
                    # One dataset per channel, first axis indexed by block
                    ds = sgrp.create_dataset(
                        self.getLabel(dsInfo['variable'], i),
                        (self.blocksPerFile, ) + dsInfo['shape'][1:],
                        chunks=True,
                        dtype=dsInfo['dtype'])
                    dtsets.append(ds)
                    data.append((dsInfo['variable'], i))

        fp.flush()

        log.log('Creating file: {}'.format(fp.filename), self.name)

        self.ds = dtsets
        self.data = data
        self.firsttime = True
        self.blockIndex = 0
        return

    def putData(self):
        """Append the current dataOut block to the open file, rotating if needed."""

        if (self.blockIndex == self.blocksPerFile) or self.timeFlag():
            self.closeFile()
            self.setNextFile()

        for i, ds in enumerate(self.ds):
            attr, ch = self.data[i]
            if ch == -1:
                ds[self.blockIndex] = getattr(self.dataOut, attr)
            else:
                if self.uniqueChannel and self.blockIndex != 0: #Creates extra dimension to avoid the creation of multiple channels
                    setattr(self.dataOut, attr, numpy.expand_dims(getattr(self.dataOut, attr), axis=0))
                ds[self.blockIndex] = getattr(self.dataOut, attr)[ch]
                if self.uniqueChannel: #Deletes extra dimension created to avoid the creation of multiple channels
                    setattr(self.dataOut, attr, getattr(self.dataOut, attr)[0])

        self.fp.flush()
        self.blockIndex += 1
        log.log('Block No. {}/{}'.format(self.blockIndex, self.blocksPerFile), self.name)

        return

    def closeFile(self):
        """Shrink partially-filled datasets to blockIndex and close the file."""

        if self.blockIndex != self.blocksPerFile:
            for ds in self.ds:
                ds.resize(self.blockIndex, axis=0)

        if self.fp:
            self.fp.flush()
            self.fp.close()

    def close(self):

        self.closeFile()
1 NO CONTENT: modified file
The requested commit or file is too big and content was truncated. Show full diff
General Comments 0
You need to be logged in to leave comments. Login now