##// END OF EJS Templates
HDFWriter now allows writing data with no channel dimension
rflores -
r1708:f3265d754878
parent child
Show More
@@ -1,636 +1,636
1 1 import os
2 2 import time
3 3 import datetime
4 4
5 5 import numpy
6 6 import h5py
7 7
8 8 import schainpy.admin
9 9 from schainpy.model.data.jrodata import *
10 10 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
11 11 from schainpy.model.io.jroIO_base import *
12 12 from schainpy.utils import log
13 13
14 14
class HDFReader(Reader, ProcessingUnit):
    """Processing unit to read HDF5 format files.

    This unit reads HDF5 files created with the `HDFWriter` operation; such
    files contain by default two groups, Data and Metadata, and every
    variable found is exposed as a `dataOut` attribute.
    Any other HDF5 layout can be read by describing its structure in the
    `description` parameter, and extra metadata values can be injected with
    the `extras` parameter.

    Parameters:
    -----------
    path : str
        Path where files are located.
    startDate : date
        Start date of the files
    endDate : date
        End date of the files
    startTime : time
        Start time of the files
    endTime : time
        End time of the files
    description : dict, optional
        Dictionary with the description of the HDF5 file
    extras : dict, optional
        Dictionary with extra metadata to be added to `dataOut`

    Examples
    --------

    desc = {
        'Data': {
            'data_output': ['u', 'v', 'w'],
            'utctime': 'timestamps',
        },
        'Metadata': {
            'heightList': 'heights'
        }
    }

    desc = {
        'Data': {
            'data_output': 'winds',
            'utctime': 'timestamps'
        },
        'Metadata': {
            'heightList': 'heights'
        }
    }

    extras = {
        'timeZone': 300
    }

    reader = project.addReadUnit(
        name='HDFReader',
        path='/path/to/files',
        startDate='2019/01/01',
        endDate='2019/01/31',
        startTime='00:00:00',
        endTime='23:59:59',
        # description=json.dumps(desc),
        # extras=json.dumps(extras),
        )

    """

    __attrs__ = ['path', 'startDate', 'endDate', 'startTime', 'endTime', 'description', 'extras']

    def __init__(self):
        # Base processing-unit initialization plus the reader defaults.
        ProcessingUnit.__init__(self)
        self.dataOut = Parameters()   # container the read variables are attached to
        self.ext = ".hdf5"            # expected file extension
        self.optchar = "D"            # filename type marker
        self.meta = {}                # metadata read from the current file
        self.data = {}                # data arrays read from the current file
        self.open_file = h5py.File
        self.open_mode = 'r'
        self.description = {}
        self.extras = {}
        self.filefmt = "*%Y%j***"     # filename date pattern
        self.folderfmt = "*%Y%j"      # folder date pattern
        self.utcoffset = 0            # seconds added to the stored utctime
97 97
def setup(self, **kwargs):
    """Configure the reader and locate the first file to process.

    In online mode the unit polls `path` up to `nTries` times (waiting
    `delay` seconds between tries) until a valid file appears; offline
    mode builds the full file list at once.  Raises SchainError when no
    valid file is found online.
    """
    self.set_kwargs(**kwargs)
    if not self.ext.startswith('.'):
        self.ext = '.{}'.format(self.ext)

    if self.online:
        log.log("Searching files in online mode...", self.name)

        for nTries in range(self.nTries):
            fullpath = self.searchFilesOnLine(self.path, self.startDate,
                self.endDate, self.expLabel, self.ext, self.walk,
                self.filefmt, self.folderfmt)
            try:
                fullpath = next(fullpath)
            except StopIteration:
                # Fix: was a bare `except:` that also swallowed
                # KeyboardInterrupt/SystemExit; only an exhausted
                # generator means "no valid file yet".
                fullpath = None

            if fullpath:
                break

            log.warning(
                'Waiting {} sec for a valid file in {}: try {} ...'.format(
                    self.delay, self.path, nTries + 1),
                self.name)
            time.sleep(self.delay)

        if not fullpath:
            raise schainpy.admin.SchainError(
                'There isn\'t any valid file in {}'.format(self.path))

        pathname, filename = os.path.split(fullpath)
        # Filename layout: <optchar>YYYYDDDSSS<ext>
        self.year = int(filename[1:5])
        self.doy = int(filename[5:8])
        self.set = int(filename[8:11]) - 1   # resume one set before the last file
    else:
        log.log("Searching files in {}".format(self.path), self.name)
        self.filenameList = self.searchFilesOffLine(self.path, self.startDate,
            self.endDate, self.expLabel, self.ext, self.walk, self.filefmt, self.folderfmt)

    self.setNextFile()

    return
141 141
def readFirstHeader(self):
    '''Load the current file's metadata and data and reset the block index.'''

    self.__readMetadata()
    self.__readData()
    self.__setBlockList()

    # When the file recorded the original dataOut type, rebuild it so
    # downstream units receive the same class that produced the file.
    # NOTE(review): eval() of file-provided text is unsafe for untrusted
    # files -- confirm the inputs are trusted.
    if 'type' in self.meta:
        self.dataOut = eval(self.meta['type'])()

    # Expose every metadata entry as a dataOut attribute.
    for key in self.meta:
        setattr(self.dataOut, key, self.meta[key])

    self.blockIndex = 0

    return
158 158
def __setBlockList(self):
    '''
    Keep only the data blocks whose timestamps fall inside the configured
    time window.

    Reads:  self.data['utctime'], self.startTime, self.endTime, self.utcoffset
    Sets:   self.interval, self.blockList, self.blocksPerFile
    '''

    utc = self.data['utctime'] + self.utcoffset
    # The smallest gap between consecutive records is the data interval.
    self.interval = numpy.min(utc[1:] - utc[:-1])

    firstDate = datetime.datetime.utcfromtimestamp(utc[0]).date()

    epoch = datetime.datetime(1970, 1, 1)
    lo = (datetime.datetime.combine(firstDate, self.startTime) - epoch).total_seconds()
    hi = (datetime.datetime.combine(firstDate, self.endTime) - epoch).total_seconds()

    selected = numpy.where(numpy.logical_and(utc >= lo, utc < hi))[0]

    self.blockList = selected
    self.blocksPerFile = len(selected)
    return
188 188
def __readMetadata(self):
    '''
    Populate self.meta from the open HDF5 file.

    With a `description`, each entry of description['Metadata'] maps a
    dataOut attribute name to a dataset path; otherwise every member of
    the file's 'Metadata' group is read under its own name.  Entries in
    self.extras are merged in last, so they override file values.
    '''

    meta = {}

    if self.description:
        for attr, dspath in self.description['Metadata'].items():
            meta[attr] = self.fp[dspath][()]
    else:
        group = self.fp['Metadata']
        for dsname in group:
            meta[dsname] = group[dsname][()]

    if self.extras:
        for attr, value in self.extras.items():
            meta[attr] = value

    self.meta = meta

    return
210 210
def __readData(self):
    '''Populate self.data with the data arrays of the open HDF5 file.

    With a `description`, description['Data'] maps each output name to a
    dataset path (str), a group of per-channel datasets (str), or an
    explicit list of dataset paths.  Without one, every member of the
    'Data' group is read; groups are stacked channel-wise into one array.
    '''

    data = {}

    if self.description:
        for key, value in self.description['Data'].items():
            if isinstance(value, str):
                if isinstance(self.fp[value], h5py.Dataset):
                    data[key] = self.fp[value][()]
                elif isinstance(self.fp[value], h5py.Group):
                    array = []
                    for ch in self.fp[value]:
                        array.append(self.fp[value][ch][()])
                    data[key] = numpy.array(array)
            elif isinstance(value, list):
                array = []
                for ch in value:
                    array.append(self.fp[ch][()])
                data[key] = numpy.array(array)
    else:
        grp = self.fp['Data']
        for name in grp:
            if isinstance(grp[name], h5py.Dataset):
                array = grp[name][()]
            elif isinstance(grp[name], h5py.Group):
                array = []
                for ch in grp[name]:
                    array.append(grp[name][ch][()])
                array = numpy.array(array)
            else:
                # Fix: previously fell through and stored a stale (or, on
                # the first iteration, undefined) `array` for unknown
                # member types; skip such members instead.
                log.warning('Unknown type: {}'.format(name))
                continue

            if name in self.description:
                key = self.description[name]
            else:
                key = name
            data[key] = array

    self.data = data
    return
251 251
def getData(self):
    '''Copy the current block of every data array into dataOut attributes.

    1-D arrays are indexed by block number; higher-rank arrays are sliced
    along axis 1 -- presumably laid out (channel, block, ...); TODO confirm.
    '''

    for name, array in self.data.items():
        if array.ndim == 1:
            setattr(self.dataOut, name, array[self.blockIndex])
        else:
            setattr(self.dataOut, name, array[:, self.blockIndex])

    self.dataOut.flagNoData = False
    self.blockIndex += 1

    log.log("Block No. {}/{} -> {}".format(
        self.blockIndex,
        self.blocksPerFile,
        self.dataOut.datatime.ctime()), self.name)

    return
269 269
def run(self, **kwargs):
    '''Main entry point: configure on the first call, then emit one data
    block per call, advancing to the next file when the current one is
    exhausted.'''

    if not self.isConfig:
        self.setup(**kwargs)
        self.isConfig = True

    # Current file exhausted -> open the next one.
    if self.blockIndex == self.blocksPerFile:
        self.setNextFile()

    self.getData()

    return
282 282
@MPDecorator
class HDFWriter(Operation):
    """Operation to write HDF5 files.

    The HDF5 file contains by default two groups, Data and Metadata, where
    any `dataOut` attribute named in the `dataList` and `metadataList`
    parameters is saved; data attributes are normally time dependent while
    metadata attributes are not.
    The layout of the HDF5 file can be customized through the optional
    `description` parameter, see the examples.

    Parameters:
    -----------
    path : str
        Path where files will be saved.
    blocksPerFile : int
        Number of blocks per file
    metadataList : list
        List of the dataOut attributes that will be saved as metadata
    dataList : list
        List of the dataOut attributes that will be saved as data
    setType : bool
        If True the name of the files corresponds to the timestamp of the data
    description : dict, optional
        Dictionary with the desired description of the HDF5 file

    Examples
    --------

    desc = {
        'data_output': {'winds': ['z', 'w', 'v']},
        'utctime': 'timestamps',
        'heightList': 'heights'
    }
    desc = {
        'data_output': ['z', 'w', 'v'],
        'utctime': 'timestamps',
        'heightList': 'heights'
    }
    desc = {
        'Data': {
            'data_output': 'winds',
            'utctime': 'timestamps'
        },
        'Metadata': {
            'heightList': 'heights'
        }
    }

    writer = proc_unit.addOperation(name='HDFWriter')
    writer.addParameter(name='path', value='/path/to/file')
    writer.addParameter(name='blocksPerFile', value='32')
    writer.addParameter(name='metadataList', value='heightList,timeZone')
    writer.addParameter(name='dataList',value='data_output,utctime')
    # writer.addParameter(name='description',value=json.dumps(desc))

    """

    ext = ".hdf5"
    optchar = "D"
    filename = None
    path = None
    setFile = None
    fp = None
    firsttime = True
    # Configuration
    blocksPerFile = None
    blockIndex = None
    dataOut = None
    # Data arrays
    dataList = None
    metadataList = None
    currentDay = None
    lastTime = None

    def __init__(self):

        Operation.__init__(self)
        return
362 362
363 363 def setup(self, path=None, blocksPerFile=10, metadataList=None, dataList=None, setType=None, description=None, uniqueChannel=False):
364 364 self.path = path
365 365 self.blocksPerFile = blocksPerFile
366 366 self.metadataList = metadataList
367 367 self.dataList = [s.strip() for s in dataList]
368 368 self.setType = setType
369 369 self.description = description
370 370 self.uniqueChannel = uniqueChannel
371 371
372 372 if self.metadataList is None:
373 373 self.metadataList = self.dataOut.metadata_list
374 374
375 375 tableList = []
376 376 dsList = []
377 377
378 378 for i in range(len(self.dataList)):
379 379 dsDict = {}
380 380 if hasattr(self.dataOut, self.dataList[i]):
381 381 dataAux = getattr(self.dataOut, self.dataList[i])
382 382 dsDict['variable'] = self.dataList[i]
383 383 else:
384 384 log.warning('Attribute {} not found in dataOut', self.name)
385 385 continue
386 386
387 387 if dataAux is None:
388 388 continue
389 389 elif isinstance(dataAux, (int, float, numpy.integer, numpy.float)):
390 390 dsDict['nDim'] = 0
391 391 else:
392 392 if uniqueChannel: #Creates extra dimension to avoid the creation of multiple channels
393 dataAux = numpy.expand_dims(dataAux, axis=0)
394
393 #dataAux = numpy.expand_dims(dataAux, axis=0)
394 setattr(self.dataOut, self.dataList[i], numpy.expand_dims(getattr(self.dataOut, self.dataList[i]), axis=0))
395 dataAux = getattr(self.dataOut, self.dataList[i])
396 #print(getattr(self.dataOut, self.dataList[i]))
395 397 dsDict['nDim'] = len(dataAux.shape)
396 398 dsDict['shape'] = dataAux.shape
397 399 dsDict['dsNumber'] = dataAux.shape[0]
398 400 dsDict['dtype'] = dataAux.dtype
399 401
400 402 dsList.append(dsDict)
401 403
402 404 self.dsList = dsList
403 405 self.currentDay = self.dataOut.datatime.date()
404 406
def timeFlag(self):
    '''Return True when a new output file should be started.

    A new file is triggered when the local day of the year changes or when
    the gap since the previous sample exceeds three hours.
    '''
    currentTime = self.dataOut.utctime
    dataDay = time.localtime(currentTime).tm_yday

    if self.lastTime is None:
        # First sample ever: just remember it.
        self.lastTime = currentTime
        self.currentDay = dataDay
        return False

    timeDiff = currentTime - self.lastTime

    # Day rollover, or a gap of more than three hours between samples.
    if dataDay != self.currentDay:
        self.currentDay = dataDay
        return True
    if timeDiff > 3 * 60 * 60:
        self.lastTime = currentTime
        return True

    self.lastTime = currentTime
    return False
427 429
def run(self, dataOut, path, blocksPerFile=10, metadataList=None,
        dataList=[], setType=None, description={}, uniqueChannel=False):
    '''Operation entry point: configure and open the first file on the
    first call, then append the current data block.'''

    self.dataOut = dataOut
    if not self.isConfig:
        self.setup(path=path, blocksPerFile=blocksPerFile,
                   metadataList=metadataList, dataList=dataList,
                   setType=setType, description=description,
                   uniqueChannel=uniqueChannel)

        self.isConfig = True
        self.setNextFile()

    self.putData()

    return
443 445
def setNextFile(self):
    '''Open a new output HDF5 file with the next set number and write its
    metadata and dataset skeleton.

    File names follow <optchar>YYYYDDDSSS<ext> (or a 4-digit
    minute-of-day set number when setType is enabled) inside a dYYYYDDD
    subfolder.
    '''

    timeTuple = time.localtime(self.dataOut.utctime)
    subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year, timeTuple.tm_yday)
    fullpath = os.path.join(self.path, subfolder)

    if os.path.exists(fullpath):
        # Continue numbering after the highest existing set in the folder.
        candidates = [k for k in os.listdir(fullpath) if k.startswith(self.optchar)]
        if len(candidates) > 0:
            last = sorted(candidates, key=str.lower)[-1]
            # Expected filename layout (hex offsets):
            #   0 1234 567 89A BCDE
            #   x YYYY DDD SSS .ext
            if isNumber(last[8:11]):
                setFile = int(last[8:11])
            else:
                setFile = -1
        else:
            setFile = -1
    else:
        os.makedirs(fullpath)
        setFile = -1

    if self.setType is None:
        setFile += 1
        fname = '%s%4.4d%3.3d%03d%s' % (self.optchar,
                                        timeTuple.tm_year,
                                        timeTuple.tm_yday,
                                        setFile,
                                        self.ext)
    else:
        # Minute-of-day based set number.
        setFile = timeTuple.tm_hour * 60 + timeTuple.tm_min
        fname = '%s%4.4d%3.3d%04d%s' % (self.optchar,
                                        timeTuple.tm_year,
                                        timeTuple.tm_yday,
                                        setFile,
                                        self.ext)

    self.filename = os.path.join(self.path, subfolder, fname)

    # Create the HDF5 file and its structure.
    self.fp = h5py.File(self.filename, 'w')
    self.writeMetadata(self.fp)
    self.writeData(self.fp)
496 498
def getLabel(self, name, x=None):
    """Resolve the HDF5 dataset/group label for a dataOut attribute.

    With ``x is None`` the label for the variable itself is returned: the
    mapped string from the description, None for list mappings (channels
    are then written at group level), or the first key of a dict mapping;
    unmapped names are returned unchanged.

    With an integer ``x`` the per-channel label is returned: the x-th
    entry of a list/dict mapping, or a generated 'pairNN' (for
    cross-spectra names containing 'cspc') / 'channelNN' fallback.
    """
    if x is None:
        if 'Data' in self.description:
            # Fix: copy before merging -- the original update() mutated
            # self.description['Data'] in place on every call.
            data = dict(self.description['Data'])
            if 'Metadata' in self.description:
                data.update(self.description['Metadata'])
        else:
            data = self.description
        if name in data:
            if isinstance(data[name], str):
                return data[name]
            elif isinstance(data[name], list):
                return None
            elif isinstance(data[name], dict):
                for key in data[name]:
                    return key
        return name
    else:
        if 'Metadata' in self.description:
            meta = self.description['Metadata']
        else:
            meta = self.description
        if name in meta:
            if isinstance(meta[name], list):
                return meta[name][x]
            elif isinstance(meta[name], dict):
                for value in meta[name].values():
                    return value[x]
        if 'cspc' in name:
            return 'pair{:02d}'.format(x)
        else:
            return 'channel{:02d}'.format(x)
530 532
def writeMetadata(self, fp):
    '''Write every attribute in metadataList as a dataset of the file's
    Metadata group (or at the file root, per the description layout).'''

    if self.description:
        grp = fp.create_group('Metadata') if 'Metadata' in self.description else fp
    else:
        grp = fp.create_group('Metadata')

    for attr in self.metadataList:
        if not hasattr(self.dataOut, attr):
            log.warning('Metadata: `{}` not found'.format(attr), self.name)
            continue
        value = getattr(self.dataOut, attr)
        # Booleans are stored as 0/1.
        if isinstance(value, bool):
            value = 1 if value else 0
        grp.create_dataset(self.getLabel(attr), data=value)
    return
553 555
def writeData(self, fp):
    '''Create the per-block datasets for every entry of dsList inside the
    Data group (or per the description layout) and reset the block
    bookkeeping.'''

    if self.description:
        grp = fp.create_group('Data') if 'Data' in self.description else fp
    else:
        grp = fp.create_group('Data')

    dtsets = []
    data = []

    for dsInfo in self.dsList:
        if dsInfo['nDim'] == 0:
            # Scalar per block: one 1-D dataset indexed by block number.
            ds = grp.create_dataset(
                self.getLabel(dsInfo['variable']),
                (self.blocksPerFile, ),
                chunks=True,
                dtype=numpy.float64)
            dtsets.append(ds)
            data.append((dsInfo['variable'], -1))
        else:
            # Array per block: one dataset per channel, grouped under the
            # variable label when the description provides one.
            label = self.getLabel(dsInfo['variable'])
            sgrp = grp.create_group(label) if label is not None else grp
            for ch in range(dsInfo['dsNumber']):
                ds = sgrp.create_dataset(
                    self.getLabel(dsInfo['variable'], ch),
                    (self.blocksPerFile, ) + dsInfo['shape'][1:],
                    chunks=True,
                    dtype=dsInfo['dtype'])
                dtsets.append(ds)
                data.append((dsInfo['variable'], ch))

    fp.flush()

    log.log('Creating file: {}'.format(fp.filename), self.name)

    self.ds = dtsets
    self.data = data
    self.firsttime = True
    self.blockIndex = 0
    return
604 602
def putData(self):
    '''Append the current block to every open dataset, rolling over to a
    new file when the current one is full or timeFlag() asks for one.'''

    if self.blockIndex == self.blocksPerFile or self.timeFlag():
        self.closeFile()
        self.setNextFile()

    for ds, (attr, ch) in zip(self.ds, self.data):
        if ch == -1:
            # Scalar variable: stored directly at the block index.
            ds[self.blockIndex] = getattr(self.dataOut, attr)
        else:
            ds[self.blockIndex] = getattr(self.dataOut, attr)[ch]
            if self.uniqueChannel:
                # Strip the artificial channel axis added by setup() so
                # downstream units see the original array shape.
                # NOTE(review): expansion happens in setup() only --
                # confirm later blocks arrive already single-channel.
                setattr(self.dataOut, attr, getattr(self.dataOut, attr)[0])

    self.fp.flush()
    self.blockIndex += 1
    log.log('Block No. {}/{}'.format(self.blockIndex, self.blocksPerFile), self.name)

    return
623 623
def closeFile(self):
    '''Shrink the datasets to the number of blocks actually written, then
    flush and close the current file (if any).'''

    if self.blockIndex != self.blocksPerFile:
        # File was closed early: trim the unused tail of every dataset.
        for ds in self.ds:
            ds.resize(self.blockIndex, axis=0)

    if self.fp:
        self.fp.flush()
        self.fp.close()
633 633
def close(self):
    '''Finalize the operation by closing the current output file.'''

    self.closeFile()
General Comments 0
You need to be logged in to leave comments. Login now