Pre-check of startTime and endTime
Alexander Valdez -
r1687:ec27c2dc20c2
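This commit adds an early validation of the requested time window in `HDFReader.__setBlockList`: after locating the blocks whose timestamps fall between `startTime` and `endTime`, the reader now raises a `SchainError` when the selection is empty instead of continuing with an empty block list. Below is a minimal, self-contained sketch of that check; the variable names mirror the reader's own, while the sample timestamps and the use of `ValueError` in place of `schainpy.admin.SchainError` are illustrative only.

import datetime

import numpy

# Hypothetical per-block timestamps (seconds since the Unix epoch).
thisUtcTime = numpy.array([1546300800.0, 1546300860.0, 1546300920.0])

# Build the window limits the same way __setBlockList does: combine the date
# of the first block with the user-supplied startTime/endTime.
thisDate = datetime.datetime.utcfromtimestamp(thisUtcTime[0]).date()
startTime = datetime.time(0, 0, 0)
endTime = datetime.time(23, 59, 59)
epoch = datetime.datetime(1970, 1, 1)
startUtcTime = (datetime.datetime.combine(thisDate, startTime) - epoch).total_seconds()
endUtcTime = (datetime.datetime.combine(thisDate, endTime) - epoch).total_seconds()

# The pre-check introduced by this commit: fail early if no block falls
# inside [startUtcTime, endUtcTime).
ind = numpy.where(numpy.logical_and(thisUtcTime >= startUtcTime, thisUtcTime < endUtcTime))[0]
if len(ind) == 0:
    raise ValueError('No data between %s and %s' % (startTime, endTime))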
@@ -1,626 +1,628
1 1 import os
2 2 import time
3 3 import datetime
4 4
5 5 import numpy
6 6 import h5py
7 7
8 8 import schainpy.admin
9 9 from schainpy.model.data.jrodata import *
10 10 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
11 11 from schainpy.model.io.jroIO_base import *
12 12 from schainpy.utils import log
13 13
14 14
15 15 class HDFReader(Reader, ProcessingUnit):
16 16 """Processing unit to read HDF5 format files
17 17
18 18     This unit reads HDF5 files created with the `HDFWriter` operation. By default
19 19     each file contains two groups, Data and Metadata, and all variables are loaded
20 20     as `dataOut` attributes.
21 21     It is possible to read any HDF5 file by giving its structure in the `description`
22 22     parameter; extra values can also be added to the metadata with the `extras` parameter.
23 23
24 24 Parameters:
25 25 -----------
26 26 path : str
27 27 Path where files are located.
28 28 startDate : date
29 29 Start date of the files
30 30     endDate : date
31 31 End date of the files
32 32 startTime : time
33 33 Start time of the files
34 34 endTime : time
35 35 End time of the files
36 36 description : dict, optional
37 37 Dictionary with the description of the HDF5 file
38 38 extras : dict, optional
39 39         Dictionary with extra metadata to be added to `dataOut`
40 40
41 41 Examples
42 42 --------
43 43
44 44 desc = {
45 45 'Data': {
46 46 'data_output': ['u', 'v', 'w'],
47 47 'utctime': 'timestamps',
48 48 } ,
49 49 'Metadata': {
50 50 'heightList': 'heights'
51 51 }
52 52 }
53 53
54 54 desc = {
55 55 'Data': {
56 56 'data_output': 'winds',
57 57 'utctime': 'timestamps'
58 58 },
59 59 'Metadata': {
60 60 'heightList': 'heights'
61 61 }
62 62 }
63 63
64 64 extras = {
65 65 'timeZone': 300
66 66 }
67 67
68 68 reader = project.addReadUnit(
69 69 name='HDFReader',
70 70 path='/path/to/files',
71 71 startDate='2019/01/01',
72 72 endDate='2019/01/31',
73 73 startTime='00:00:00',
74 74 endTime='23:59:59',
75 75 # description=json.dumps(desc),
76 76 # extras=json.dumps(extras),
77 77 )
78 78
79 79 """
80 80
81 81 __attrs__ = ['path', 'startDate', 'endDate', 'startTime', 'endTime', 'description', 'extras']
82 82
83 83 def __init__(self):
84 84 ProcessingUnit.__init__(self)
85 85 self.dataOut = Parameters()
86 86 self.ext = ".hdf5"
87 87 self.optchar = "D"
88 88 self.meta = {}
89 89 self.data = {}
90 90 self.open_file = h5py.File
91 91 self.open_mode = 'r'
92 92 self.description = {}
93 93 self.extras = {}
94 94 self.filefmt = "*%Y%j***"
95 95 self.folderfmt = "*%Y%j"
96 96 self.utcoffset = 0
97 97
98 98 def setup(self, **kwargs):
99 99
100 100 self.set_kwargs(**kwargs)
101 101 if not self.ext.startswith('.'):
102 102 self.ext = '.{}'.format(self.ext)
103 103
104 104 if self.online:
105 105 log.log("Searching files in online mode...", self.name)
106 106
107 107 for nTries in range(self.nTries):
108 108 fullpath = self.searchFilesOnLine(self.path, self.startDate,
109 109 self.endDate, self.expLabel, self.ext, self.walk,
110 110 self.filefmt, self.folderfmt)
111 111 try:
112 112 fullpath = next(fullpath)
113 113 except:
114 114 fullpath = None
115 115
116 116 if fullpath:
117 117 break
118 118
119 119 log.warning(
120 120 'Waiting {} sec for a valid file in {}: try {} ...'.format(
121 121 self.delay, self.path, nTries + 1),
122 122 self.name)
123 123 time.sleep(self.delay)
124 124
125 125 if not(fullpath):
126 126 raise schainpy.admin.SchainError(
127 127 'There isn\'t any valid file in {}'.format(self.path))
128 128
129 129 pathname, filename = os.path.split(fullpath)
130 130 self.year = int(filename[1:5])
131 131 self.doy = int(filename[5:8])
132 132 self.set = int(filename[8:11]) - 1
133 133 else:
134 134 log.log("Searching files in {}".format(self.path), self.name)
135 135 self.filenameList = self.searchFilesOffLine(self.path, self.startDate,
136 136 self.endDate, self.expLabel, self.ext, self.walk, self.filefmt, self.folderfmt)
137 137
138 138 self.setNextFile()
139 139
140 140 return
141 141
142 142 def readFirstHeader(self):
143 143 '''Read metadata and data'''
144 144
145 145 self.__readMetadata()
146 146 self.__readData()
147 147 self.__setBlockList()
148 148
149 149 if 'type' in self.meta:
150 150 self.dataOut = eval(self.meta['type'])()
151 151
152 152 for attr in self.meta:
153 153 setattr(self.dataOut, attr, self.meta[attr])
154 154
155 155 self.blockIndex = 0
156 156
157 157 return
158 158
159 159 def __setBlockList(self):
160 160 '''
161 161 Selects the data within the times defined
162 162
163 163 self.fp
164 164 self.startTime
165 165 self.endTime
166 166 self.blockList
167 167 self.blocksPerFile
168 168
169 169 '''
170 170
171 171 startTime = self.startTime
172 172 endTime = self.endTime
173 173 thisUtcTime = self.data['utctime'] + self.utcoffset
174
174 175 self.interval = numpy.min(thisUtcTime[1:] - thisUtcTime[:-1])
175 176 thisDatetime = datetime.datetime.utcfromtimestamp(thisUtcTime[0])
176 177
177 178 thisDate = thisDatetime.date()
178 179 thisTime = thisDatetime.time()
179
180 180 startUtcTime = (datetime.datetime.combine(thisDate, startTime) - datetime.datetime(1970, 1, 1)).total_seconds()
181 181 endUtcTime = (datetime.datetime.combine(thisDate, endTime) - datetime.datetime(1970, 1, 1)).total_seconds()
182
182
183 183 ind = numpy.where(numpy.logical_and(thisUtcTime >= startUtcTime, thisUtcTime < endUtcTime))[0]
184         if len(ind) == 0:
185             raise schainpy.admin.SchainError("[Reading] Date time selected invalid [%s - %s]: No *%s files in %s" % (startTime, endTime, self.ext, self.path))
184 186
185 187 self.blockList = ind
186 188 self.blocksPerFile = len(ind)
187 189 return
188 190
189 191 def __readMetadata(self):
190 192 '''
191 193 Reads Metadata
192 194 '''
193 195
194 196 meta = {}
195 197
196 198 if self.description:
197 199 for key, value in self.description['Metadata'].items():
198 200 meta[key] = self.fp[value][()]
199 201 else:
200 202 grp = self.fp['Metadata']
201 203 for name in grp:
202 204 meta[name] = grp[name][()]
203 205
204 206 if self.extras:
205 207 for key, value in self.extras.items():
206 208 meta[key] = value
207 209 self.meta = meta
208 210
209 211 return
210 212
211 213 def __readData(self):
212 214
213 215 data = {}
214 216
215 217 if self.description:
216 218 for key, value in self.description['Data'].items():
217 219 if isinstance(value, str):
218 220 if isinstance(self.fp[value], h5py.Dataset):
219 221 data[key] = self.fp[value][()]
220 222 elif isinstance(self.fp[value], h5py.Group):
221 223 array = []
222 224 for ch in self.fp[value]:
223 225 array.append(self.fp[value][ch][()])
224 226 data[key] = numpy.array(array)
225 227 elif isinstance(value, list):
226 228 array = []
227 229 for ch in value:
228 230 array.append(self.fp[ch][()])
229 231 data[key] = numpy.array(array)
230 232 else:
231 233 grp = self.fp['Data']
232 234 for name in grp:
233 235 if isinstance(grp[name], h5py.Dataset):
234 236 array = grp[name][()]
235 237 elif isinstance(grp[name], h5py.Group):
236 238 array = []
237 239 for ch in grp[name]:
238 240 array.append(grp[name][ch][()])
239 241 array = numpy.array(array)
240 242 else:
241 243 log.warning('Unknown type: {}'.format(name))
242 244
243 245 if name in self.description:
244 246 key = self.description[name]
245 247 else:
246 248 key = name
247 249 data[key] = array
248 250
249 251 self.data = data
250 252 return
251 253
252 254 def getData(self):
253 255
254 256 for attr in self.data:
255 257 if self.data[attr].ndim == 1:
256 258 setattr(self.dataOut, attr, self.data[attr][self.blockIndex])
257 259 else:
258 260 setattr(self.dataOut, attr, self.data[attr][:, self.blockIndex])
259 261
260 262 self.dataOut.flagNoData = False
261 263 self.blockIndex += 1
262 264
263 265 log.log("Block No. {}/{} -> {}".format(
264 266 self.blockIndex,
265 267 self.blocksPerFile,
266 268 self.dataOut.datatime.ctime()), self.name)
267 269
268 270 return
269 271
270 272 def run(self, **kwargs):
271 273
272 274 if not(self.isConfig):
273 275 self.setup(**kwargs)
274 276 self.isConfig = True
275 277
276 278 if self.blockIndex == self.blocksPerFile:
277 279 self.setNextFile()
278 280
279 281 self.getData()
280 282
281 283 return
282 284
283 285 @MPDecorator
284 286 class HDFWriter(Operation):
285 287 """Operation to write HDF5 files.
286 288
287 289 The HDF5 file contains by default two groups Data and Metadata where
288 290 you can save any `dataOut` attribute specified by `dataList` and `metadataList`
289 291 parameters, data attributes are normaly time dependent where the metadata
290 292 are not.
291 293 It is possible to customize the structure of the HDF5 file with the
292 294 optional description parameter see the examples.
293 295
294 296 Parameters:
295 297 -----------
296 298 path : str
297 299 Path where files will be saved.
298 300 blocksPerFile : int
299 301 Number of blocks per file
300 302 metadataList : list
301 303 List of the dataOut attributes that will be saved as metadata
302 304     dataList : list
303 305 List of the dataOut attributes that will be saved as data
304 306 setType : bool
305 307 If True the name of the files corresponds to the timestamp of the data
306 308 description : dict, optional
307 309 Dictionary with the desired description of the HDF5 file
308 310
309 311 Examples
310 312 --------
311 313
312 314 desc = {
313 315 'data_output': {'winds': ['z', 'w', 'v']},
314 316 'utctime': 'timestamps',
315 317 'heightList': 'heights'
316 318 }
317 319 desc = {
318 320 'data_output': ['z', 'w', 'v'],
319 321 'utctime': 'timestamps',
320 322 'heightList': 'heights'
321 323 }
322 324 desc = {
323 325 'Data': {
324 326 'data_output': 'winds',
325 327 'utctime': 'timestamps'
326 328 },
327 329 'Metadata': {
328 330 'heightList': 'heights'
329 331 }
330 332 }
331 333
332 334 writer = proc_unit.addOperation(name='HDFWriter')
333 335 writer.addParameter(name='path', value='/path/to/file')
334 336 writer.addParameter(name='blocksPerFile', value='32')
335 337 writer.addParameter(name='metadataList', value='heightList,timeZone')
336 338 writer.addParameter(name='dataList',value='data_output,utctime')
337 339 # writer.addParameter(name='description',value=json.dumps(desc))
338 340
339 341 """
340 342
341 343 ext = ".hdf5"
342 344 optchar = "D"
343 345 filename = None
344 346 path = None
345 347 setFile = None
346 348 fp = None
347 349 firsttime = True
348 350 #Configurations
349 351 blocksPerFile = None
350 352 blockIndex = None
351 353 dataOut = None
352 354 #Data Arrays
353 355 dataList = None
354 356 metadataList = None
355 357 currentDay = None
356 358 lastTime = None
357 359
358 360 def __init__(self):
359 361
360 362 Operation.__init__(self)
361 363 return
362 364
363 365 def setup(self, path=None, blocksPerFile=10, metadataList=None, dataList=None, setType=None, description=None):
364 366 self.path = path
365 367 self.blocksPerFile = blocksPerFile
366 368 self.metadataList = metadataList
367 369 self.dataList = [s.strip() for s in dataList]
368 370 self.setType = setType
369 371 self.description = description
370 372
371 373 if self.metadataList is None:
372 374 self.metadataList = self.dataOut.metadata_list
373 375
374 376 tableList = []
375 377 dsList = []
376 378
377 379 for i in range(len(self.dataList)):
378 380 dsDict = {}
379 381 if hasattr(self.dataOut, self.dataList[i]):
380 382 dataAux = getattr(self.dataOut, self.dataList[i])
381 383 dsDict['variable'] = self.dataList[i]
382 384 else:
383 385                 log.warning('Attribute {} not found in dataOut'.format(self.dataList[i]), self.name)
384 386 continue
385 387
386 388 if dataAux is None:
387 389 continue
388 390 elif isinstance(dataAux, (int, float, numpy.integer, numpy.float)):
389 391 dsDict['nDim'] = 0
390 392 else:
391 393 dsDict['nDim'] = len(dataAux.shape)
392 394 dsDict['shape'] = dataAux.shape
393 395 dsDict['dsNumber'] = dataAux.shape[0]
394 396 dsDict['dtype'] = dataAux.dtype
395 397
396 398 dsList.append(dsDict)
397 399
398 400 self.dsList = dsList
399 401 self.currentDay = self.dataOut.datatime.date()
400 402
401 403 def timeFlag(self):
402 404 currentTime = self.dataOut.utctime
403 405 timeTuple = time.localtime(currentTime)
404 406 dataDay = timeTuple.tm_yday
405 407
406 408 if self.lastTime is None:
407 409 self.lastTime = currentTime
408 410 self.currentDay = dataDay
409 411 return False
410 412
411 413 timeDiff = currentTime - self.lastTime
412 414
413 415         #If the day changed or the gap between consecutive samples exceeds the limit (3 hours)
414 416 if dataDay != self.currentDay:
415 417 self.currentDay = dataDay
416 418 return True
417 419 elif timeDiff > 3*60*60:
418 420 self.lastTime = currentTime
419 421 return True
420 422 else:
421 423 self.lastTime = currentTime
422 424 return False
423 425
424 426 def run(self, dataOut, path, blocksPerFile=10, metadataList=None,
425 427 dataList=[], setType=None, description={}):
426 428
427 429 self.dataOut = dataOut
428 430 if not(self.isConfig):
429 431 self.setup(path=path, blocksPerFile=blocksPerFile,
430 432 metadataList=metadataList, dataList=dataList,
431 433 setType=setType, description=description)
432 434
433 435 self.isConfig = True
434 436 self.setNextFile()
435 437
436 438 self.putData()
437 439 return
438 440
439 441 def setNextFile(self):
440 442
441 443 ext = self.ext
442 444 path = self.path
443 445 setFile = self.setFile
444 446
445 447 timeTuple = time.localtime(self.dataOut.utctime)
446 448 subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year,timeTuple.tm_yday)
447 449 fullpath = os.path.join(path, subfolder)
448 450
449 451 if os.path.exists(fullpath):
450 452 filesList = os.listdir(fullpath)
451 453 filesList = [k for k in filesList if k.startswith(self.optchar)]
452 454 if len( filesList ) > 0:
453 455 filesList = sorted(filesList, key=str.lower)
454 456 filen = filesList[-1]
455 457                 # the filename should have the following format
456 458 # 0 1234 567 89A BCDE (hex)
457 459 # x YYYY DDD SSS .ext
458 460 if isNumber(filen[8:11]):
459 461                     setFile = int(filen[8:11]) #initialize the set counter with the set number of the last file
460 462 else:
461 463 setFile = -1
462 464 else:
463 465                 setFile = -1 #initialize the set counter
464 466 else:
465 467 os.makedirs(fullpath)
466 468             setFile = -1 #initialize the set counter
467 469
468 470 if self.setType is None:
469 471 setFile += 1
470 472 file = '%s%4.4d%3.3d%03d%s' % (self.optchar,
471 473 timeTuple.tm_year,
472 474 timeTuple.tm_yday,
473 475 setFile,
474 476 ext )
475 477 else:
476 478 setFile = timeTuple.tm_hour*60+timeTuple.tm_min
477 479 file = '%s%4.4d%3.3d%04d%s' % (self.optchar,
478 480 timeTuple.tm_year,
479 481 timeTuple.tm_yday,
480 482 setFile,
481 483 ext )
482 484
483 485 self.filename = os.path.join( path, subfolder, file )
484 486
485 487 #Setting HDF5 File
486 488 self.fp = h5py.File(self.filename, 'w')
487 489 #write metadata
488 490 self.writeMetadata(self.fp)
489 491 #Write data
490 492 self.writeData(self.fp)
491 493
492 494 def getLabel(self, name, x=None):
493 495
494 496 if x is None:
495 497 if 'Data' in self.description:
496 498 data = self.description['Data']
497 499 if 'Metadata' in self.description:
498 500 data.update(self.description['Metadata'])
499 501 else:
500 502 data = self.description
501 503 if name in data:
502 504 if isinstance(data[name], str):
503 505 return data[name]
504 506 elif isinstance(data[name], list):
505 507 return None
506 508 elif isinstance(data[name], dict):
507 509 for key, value in data[name].items():
508 510 return key
509 511 return name
510 512 else:
511 513 if 'Metadata' in self.description:
512 514 meta = self.description['Metadata']
513 515 else:
514 516 meta = self.description
515 517 if name in meta:
516 518 if isinstance(meta[name], list):
517 519 return meta[name][x]
518 520 elif isinstance(meta[name], dict):
519 521 for key, value in meta[name].items():
520 522 return value[x]
521 523 if 'cspc' in name:
522 524 return 'pair{:02d}'.format(x)
523 525 else:
524 526 return 'channel{:02d}'.format(x)
525 527
526 528 def writeMetadata(self, fp):
527 529
528 530 if self.description:
529 531 if 'Metadata' in self.description:
530 532 grp = fp.create_group('Metadata')
531 533 else:
532 534 grp = fp
533 535 else:
534 536 grp = fp.create_group('Metadata')
535 537
536 538 for i in range(len(self.metadataList)):
537 539 if not hasattr(self.dataOut, self.metadataList[i]):
538 540 log.warning('Metadata: `{}` not found'.format(self.metadataList[i]), self.name)
539 541 continue
540 542 value = getattr(self.dataOut, self.metadataList[i])
541 543 if isinstance(value, bool):
542 544 if value is True:
543 545 value = 1
544 546 else:
545 547 value = 0
546 548 grp.create_dataset(self.getLabel(self.metadataList[i]), data=value)
547 549 return
548 550
549 551 def writeData(self, fp):
550 552
551 553 if self.description:
552 554 if 'Data' in self.description:
553 555 grp = fp.create_group('Data')
554 556 else:
555 557 grp = fp
556 558 else:
557 559 grp = fp.create_group('Data')
558 560
559 561 dtsets = []
560 562 data = []
561 563
562 564 for dsInfo in self.dsList:
563 565 if dsInfo['nDim'] == 0:
564 566 ds = grp.create_dataset(
565 567 self.getLabel(dsInfo['variable']),
566 568 (self.blocksPerFile, ),
567 569 chunks=True,
568 570 dtype=numpy.float64)
569 571 dtsets.append(ds)
570 572 data.append((dsInfo['variable'], -1))
571 573 else:
572 574 label = self.getLabel(dsInfo['variable'])
573 575 if label is not None:
574 576 sgrp = grp.create_group(label)
575 577 else:
576 578 sgrp = grp
577 579 for i in range(dsInfo['dsNumber']):
578 580 ds = sgrp.create_dataset(
579 581 self.getLabel(dsInfo['variable'], i),
580 582 (self.blocksPerFile, ) + dsInfo['shape'][1:],
581 583 chunks=True,
582 584 dtype=dsInfo['dtype'])
583 585 dtsets.append(ds)
584 586 data.append((dsInfo['variable'], i))
585 587 fp.flush()
586 588
587 589 log.log('Creating file: {}'.format(fp.filename), self.name)
588 590
589 591 self.ds = dtsets
590 592 self.data = data
591 593 self.firsttime = True
592 594 self.blockIndex = 0
593 595 return
594 596
595 597 def putData(self):
596 598
597 599 if (self.blockIndex == self.blocksPerFile) or self.timeFlag():
598 600 self.closeFile()
599 601 self.setNextFile()
600 602
601 603 for i, ds in enumerate(self.ds):
602 604 attr, ch = self.data[i]
603 605 if ch == -1:
604 606 ds[self.blockIndex] = getattr(self.dataOut, attr)
605 607 else:
606 608 ds[self.blockIndex] = getattr(self.dataOut, attr)[ch]
607 609
608 610 self.fp.flush()
609 611 self.blockIndex += 1
610 612 log.log('Block No. {}/{}'.format(self.blockIndex, self.blocksPerFile), self.name)
611 613
612 614 return
613 615
614 616 def closeFile(self):
615 617
616 618 if self.blockIndex != self.blocksPerFile:
617 619 for ds in self.ds:
618 620 ds.resize(self.blockIndex, axis=0)
619 621
620 622 if self.fp:
621 623 self.fp.flush()
622 624 self.fp.close()
623 625
624 626 def close(self):
625 627
626 628 self.closeFile()
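As a usage note, a file produced by `HDFWriter` with the default layout can be inspected directly with `h5py`: it holds a `Data` group with the time-dependent datasets (one value or row per stored block, `blocksPerFile` in total) and a `Metadata` group with the static ones. The sketch below assumes such a default file; the path is purely illustrative and the actual dataset names depend on the configured `dataList`, `metadataList` and `description`.

import h5py

# Hypothetical file written by HDFWriter (D<year><doy><set>.hdf5 inside a d<year><doy> folder).
with h5py.File('/path/to/files/d2019001/D2019001000.hdf5', 'r') as fp:
    print(list(fp['Data'].keys()))       # e.g. ['data_output', 'utctime']
    print(list(fp['Metadata'].keys()))   # e.g. ['heightList', 'timeZone']
    utctime = fp['Data']['utctime'][()]  # one timestamp per stored block
    print(utctime.shape)                 # (blocksPerFile,)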