Reading Unit for Madrigal decorated (just for Python 2.x)
George Yong
r1189:67ce1e63c533
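The change wraps MADReader with the MPDecorator class decorator and drops the **kwargs plumbing from __init__, so configuration now reaches the unit only through setup(). A minimal sketch of how the reader would be driven (illustrative only: it treats the class as if undecorated and bypasses the schainpy controller; the path, dates, and the process() callable are made up):

    reader = MADReader()                    # __init__ takes no arguments after this change
    reader.setup(path='/data/madrigal',     # hypothetical data directory
                 format='hdf5',
                 startDate=datetime.date(2017, 8, 1),
                 endDate=datetime.date(2017, 8, 1))
    while reader.getData():                 # fills reader.dataOut (a Parameters object)
        process(reader.dataOut)             # placeholder for downstream operations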
@@ -1,642 +1,642
1 1 '''
2 2 Created on Aug 1, 2017
3 3
4 4 @author: Juan C. Espinoza
5 5 '''
6 6
7 7 import os
8 8 import sys
9 9 import time
10 10 import json
11 11 import glob
12 12 import datetime
13 13
14 14 import numpy
15 15 import h5py
16 16
17 17 from schainpy.model.io.jroIO_base import JRODataReader
18 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation
18 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
19 19 from schainpy.model.data.jrodata import Parameters
20 20 from schainpy.utils import log
21 21
22 22 try:
23 23 import madrigal.cedar
24 24 except:
25 25 log.warning(
26 26 'You should install "madrigal library" module if you want to read/write Madrigal data'
27 27 )
28 28
29 29 DEF_CATALOG = {
30 30 'principleInvestigator': 'Marco Milla',
31 31 'expPurpose': None,
32 32 'cycleTime': None,
33 33 'correlativeExp': None,
34 34 'sciRemarks': None,
35 35 'instRemarks': None
36 36 }
37 37 DEF_HEADER = {
38 38 'kindatDesc': None,
39 39 'analyst': 'Jicamarca User',
40 40 'comments': None,
41 41 'history': None
42 42 }
43 43 MNEMONICS = {
44 44 10: 'jro',
45 45 11: 'jbr',
46 46 840: 'jul',
47 47 13: 'jas',
48 48 1000: 'pbr',
49 49 1001: 'hbr',
50 50 1002: 'obr',
51 51 }
52 52
53 53 UT1970 = datetime.datetime(1970, 1, 1) - datetime.timedelta(seconds=time.timezone)
54 54
55 55 def load_json(obj):
56 56 '''
57 57 Parse json as string instead of unicode
58 58 '''
59 59
60 60 if isinstance(obj, str):
61 61 iterable = json.loads(obj)
62 62 else:
63 63 iterable = obj
64 64
65 65 if isinstance(iterable, dict):
66 66 return {str(k): load_json(v) if isinstance(v, dict) else str(v) if isinstance(v, str) else v
67 67 for k, v in list(iterable.items())}
68 68 elif isinstance(iterable, (list, tuple)):
69 69 return [str(v) if isinstance(v, str) else v for v in iterable]
70 70
71 71 return iterable
72 72
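# Illustrative behaviour of load_json (mirrors the oneDDict default used in
# MADReader.setup below): a JSON string is parsed into plain-str containers, e.g.
#     load_json('{"GDLATR": "lat", "GDLONR": "lon"}')  ->  {'GDLATR': 'lat', 'GDLONR': 'lon'}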
73
73 @MPDecorator
74 74 class MADReader(JRODataReader, ProcessingUnit):
75 75
76 def __init__(self, **kwargs):
76 def __init__(self):
77 77
78 ProcessingUnit.__init__(self, **kwargs)
78 ProcessingUnit.__init__(self)
79 79
80 80 self.dataOut = Parameters()
81 81 self.counter_records = 0
82 82 self.nrecords = None
83 83 self.flagNoMoreFiles = 0
84 84 self.isConfig = False
85 85 self.filename = None
86 86 self.intervals = set()
87 87
88 88 def setup(self,
89 89 path=None,
90 90 startDate=None,
91 91 endDate=None,
92 92 format=None,
93 93 startTime=datetime.time(0, 0, 0),
94 94 endTime=datetime.time(23, 59, 59),
95 95 **kwargs):
96 96
97 97 self.path = path
98 98 self.startDate = startDate
99 99 self.endDate = endDate
100 100 self.startTime = startTime
101 101 self.endTime = endTime
102 102 self.datatime = datetime.datetime(1900,1,1)
103 103 self.oneDDict = load_json(kwargs.get('oneDDict',
104 104 "{\"GDLATR\":\"lat\", \"GDLONR\":\"lon\"}"))
105 105 self.twoDDict = load_json(kwargs.get('twoDDict',
106 106 "{\"GDALT\": \"heightList\"}"))
107 107 self.ind2DList = load_json(kwargs.get('ind2DList',
108 108 "[\"GDALT\"]"))
109 109 if self.path is None:
110 110 raise ValueError('The path is not valid')
111 111
112 112 if format is None:
 113 113             raise ValueError('The format is not valid, choose simple or hdf5')
114 114 elif format.lower() in ('simple', 'txt'):
115 115 self.ext = '.txt'
116 116 elif format.lower() in ('cedar',):
117 117 self.ext = '.001'
118 118 else:
119 119 self.ext = '.hdf5'
120 120
121 121 self.search_files(self.path)
122 122 self.fileId = 0
123 123
124 124 if not self.fileList:
 125 125             raise Warning('There are no files matching these dates in the folder: {}. \n Check startDate and endDate'.format(path))
126 126
127 127 self.setNextFile()
128 128
129 129 def search_files(self, path):
130 130 '''
131 131 Searching for madrigal files in path
 132 132         Creating a list of files to process included in [startDate, endDate]
133 133
134 134 Input:
135 135 path - Path to find files
136 136 '''
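        # Filename layout assumed by the slicing below (illustrative), matching the names
        # produced by MADWriter.setFile(), e.g. 'jro20170801_000000.hdf5':
        # characters [3:7] -> year, [7:9] -> month, [9:11] -> day.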
137 137
138 138 log.log('Searching files {} in {} '.format(self.ext, path), 'MADReader')
139 139 foldercounter = 0
140 140 fileList0 = glob.glob1(path, '*{}'.format(self.ext))
141 141 fileList0.sort()
142 142
143 143 self.fileList = []
144 144 self.dateFileList = []
145 145
146 146 startDate = self.startDate - datetime.timedelta(1)
147 147 endDate = self.endDate + datetime.timedelta(1)
148 148
149 149 for thisFile in fileList0:
150 150 year = thisFile[3:7]
151 151 if not year.isdigit():
152 152 continue
153 153
154 154 month = thisFile[7:9]
155 155 if not month.isdigit():
156 156 continue
157 157
158 158 day = thisFile[9:11]
159 159 if not day.isdigit():
160 160 continue
161 161
162 162 year, month, day = int(year), int(month), int(day)
163 163 dateFile = datetime.date(year, month, day)
164 164
165 165 if (startDate > dateFile) or (endDate < dateFile):
166 166 continue
167 167
168 168 self.fileList.append(thisFile)
169 169 self.dateFileList.append(dateFile)
170 170
171 171 return
172 172
173 173 def parseHeader(self):
174 174 '''
175 175 '''
176 176
177 177 self.output = {}
178 178 self.version = '2'
179 179 s_parameters = None
180 180 if self.ext == '.txt':
181 181 self.parameters = [s.strip().lower() for s in self.fp.readline().strip().split(' ') if s]
182 182 elif self.ext == '.hdf5':
183 183 metadata = self.fp['Metadata']
184 184 data = self.fp['Data']['Array Layout']
185 185 if 'Independent Spatial Parameters' in metadata:
186 186 s_parameters = [s[0].lower() for s in metadata['Independent Spatial Parameters']]
187 187 self.version = '3'
188 188 one = [s[0].lower() for s in data['1D Parameters']['Data Parameters']]
189 189 one_d = [1 for s in one]
190 190 two = [s[0].lower() for s in data['2D Parameters']['Data Parameters']]
191 191 two_d = [2 for s in two]
192 192 self.parameters = one + two
193 193 self.parameters_d = one_d + two_d
194 194
195 195 log.success('Parameters found: {}'.format(','.join(self.parameters)),
196 196 'MADReader')
197 197 if s_parameters:
198 198 log.success('Spatial parameters: {}'.format(','.join(s_parameters)),
199 199 'MADReader')
200 200
201 201 for param in list(self.oneDDict.keys()):
202 202 if param.lower() not in self.parameters:
203 203 log.warning(
 204 204                     'Parameter {} not found, it will be ignored'.format(
205 205 param),
206 206 'MADReader')
207 207 self.oneDDict.pop(param, None)
208 208
209 209 for param, value in list(self.twoDDict.items()):
210 210 if param.lower() not in self.parameters:
211 211 log.warning(
212 212 'Parameter {} not found, it will be ignored'.format(
213 213 param),
214 214 'MADReader')
215 215 self.twoDDict.pop(param, None)
216 216 continue
217 217 if isinstance(value, list):
218 218 if value[0] not in self.output:
219 219 self.output[value[0]] = []
220 220 self.output[value[0]].append(None)
221 221
222 222 def parseData(self):
223 223 '''
224 224 '''
225 225
226 226 if self.ext == '.txt':
227 227 self.data = numpy.genfromtxt(self.fp, missing_values=('missing'))
228 228 self.nrecords = self.data.shape[0]
229 229 self.ranges = numpy.unique(self.data[:,self.parameters.index(self.ind2DList[0].lower())])
230 230 elif self.ext == '.hdf5':
231 231 self.data = self.fp['Data']['Array Layout']
232 232 self.nrecords = len(self.data['timestamps'].value)
233 233 self.ranges = self.data['range'].value
234 234
235 235 def setNextFile(self):
236 236 '''
237 237 '''
238 238
239 239 file_id = self.fileId
240 240
241 241 if file_id == len(self.fileList):
242 242 log.success('No more files', 'MADReader')
243 243 self.flagNoMoreFiles = 1
244 244 return 0
245 245
246 246 log.success(
247 247 'Opening: {}'.format(self.fileList[file_id]),
248 248 'MADReader'
249 249 )
250 250
251 251 filename = os.path.join(self.path, self.fileList[file_id])
252 252
253 253 if self.filename is not None:
254 254 self.fp.close()
255 255
256 256 self.filename = filename
257 257 self.filedate = self.dateFileList[file_id]
258 258
259 259 if self.ext=='.hdf5':
260 260 self.fp = h5py.File(self.filename, 'r')
261 261 else:
262 262 self.fp = open(self.filename, 'rb')
263 263
264 264 self.parseHeader()
265 265 self.parseData()
266 266 self.sizeOfFile = os.path.getsize(self.filename)
267 267 self.counter_records = 0
268 268 self.flagIsNewFile = 0
269 269 self.fileId += 1
270 270
271 271 return 1
272 272
273 273 def readNextBlock(self):
274 274
275 275 while True:
276 276 self.flagDiscontinuousBlock = 0
277 277 if self.flagIsNewFile:
278 278 if not self.setNextFile():
279 279 return 0
280 280
281 281 self.readBlock()
282 282
283 283 if (self.datatime < datetime.datetime.combine(self.startDate, self.startTime)) or \
284 284 (self.datatime > datetime.datetime.combine(self.endDate, self.endTime)):
285 285 log.warning(
286 286 'Reading Record No. {}/{} -> {} [Skipping]'.format(
287 287 self.counter_records,
288 288 self.nrecords,
289 289 self.datatime.ctime()),
290 290 'MADReader')
291 291 continue
292 292 break
293 293
294 294 log.log(
295 295 'Reading Record No. {}/{} -> {}'.format(
296 296 self.counter_records,
297 297 self.nrecords,
298 298 self.datatime.ctime()),
299 299 'MADReader')
300 300
301 301 return 1
302 302
303 303 def readBlock(self):
304 304 '''
305 305 '''
306 306 dum = []
307 307 if self.ext == '.txt':
308 308 dt = self.data[self.counter_records][:6].astype(int)
309 309 if datetime.datetime(dt[0], dt[1], dt[2], dt[3], dt[4], dt[5]).date() > self.datatime.date():
310 310 self.flagDiscontinuousBlock = 1
311 311 self.datatime = datetime.datetime(dt[0], dt[1], dt[2], dt[3], dt[4], dt[5])
312 312 while True:
313 313 dt = self.data[self.counter_records][:6].astype(int)
314 314 datatime = datetime.datetime(dt[0], dt[1], dt[2], dt[3], dt[4], dt[5])
315 315 if datatime == self.datatime:
316 316 dum.append(self.data[self.counter_records])
317 317 self.counter_records += 1
318 318 if self.counter_records == self.nrecords:
319 319 self.flagIsNewFile = True
320 320 break
321 321 continue
322 322 self.intervals.add((datatime-self.datatime).seconds)
323 323 break
324 324 elif self.ext == '.hdf5':
325 325 datatime = datetime.datetime.utcfromtimestamp(
326 326 self.data['timestamps'][self.counter_records])
327 327 nHeights = len(self.ranges)
328 328 for n, param in enumerate(self.parameters):
329 329 if self.parameters_d[n] == 1:
330 330 dum.append(numpy.ones(nHeights)*self.data['1D Parameters'][param][self.counter_records])
331 331 else:
332 332 if self.version == '2':
333 333 dum.append(self.data['2D Parameters'][param][self.counter_records])
334 334 else:
335 335 tmp = self.data['2D Parameters'][param].value.T
336 336 dum.append(tmp[self.counter_records])
337 337 self.intervals.add((datatime-self.datatime).seconds)
338 338 if datatime.date()>self.datatime.date():
339 339 self.flagDiscontinuousBlock = 1
340 340 self.datatime = datatime
341 341 self.counter_records += 1
342 342 if self.counter_records == self.nrecords:
343 343 self.flagIsNewFile = True
344 344
345 345 self.buffer = numpy.array(dum)
346 346 return
347 347
348 348 def set_output(self):
349 349 '''
350 350 Storing data from buffer to dataOut object
351 351 '''
352 352
353 353 parameters = [None for __ in self.parameters]
354 354
355 355 for param, attr in list(self.oneDDict.items()):
356 356 x = self.parameters.index(param.lower())
357 357 setattr(self.dataOut, attr, self.buffer[0][x])
358 358
359 359 for param, value in list(self.twoDDict.items()):
360 360 x = self.parameters.index(param.lower())
361 361 if self.ext == '.txt':
362 362 y = self.parameters.index(self.ind2DList[0].lower())
363 363 ranges = self.buffer[:,y]
364 364 if self.ranges.size == ranges.size:
365 365 continue
366 366 index = numpy.where(numpy.in1d(self.ranges, ranges))[0]
367 367 dummy = numpy.zeros(self.ranges.shape) + numpy.nan
368 368 dummy[index] = self.buffer[:,x]
369 369 else:
370 370 dummy = self.buffer[x]
371 371
372 372 if isinstance(value, str):
373 373 if value not in self.ind2DList:
374 374 setattr(self.dataOut, value, dummy.reshape(1,-1))
375 375 elif isinstance(value, list):
376 376 self.output[value[0]][value[1]] = dummy
377 377 parameters[value[1]] = param
378 378
379 379 for key, value in list(self.output.items()):
380 380 setattr(self.dataOut, key, numpy.array(value))
381 381
382 382 self.dataOut.parameters = [s for s in parameters if s]
383 383 self.dataOut.heightList = self.ranges
384 384 self.dataOut.utctime = (self.datatime - datetime.datetime(1970, 1, 1)).total_seconds()
385 385 self.dataOut.utctimeInit = self.dataOut.utctime
386 386 self.dataOut.paramInterval = min(self.intervals)
387 387 self.dataOut.useLocalTime = False
388 388 self.dataOut.flagNoData = False
389 389 self.dataOut.nrecords = self.nrecords
390 390 self.dataOut.flagDiscontinuousBlock = self.flagDiscontinuousBlock
391 391
392 392 def getData(self):
393 393 '''
394 394 Storing data from databuffer to dataOut object
395 395 '''
396 396 if self.flagNoMoreFiles:
397 397 self.dataOut.flagNoData = True
398 398 log.error('No file left to process', 'MADReader')
399 399 return 0
400 400
401 401 if not self.readNextBlock():
402 402 self.dataOut.flagNoData = True
403 403 return 0
404 404
405 405 self.set_output()
406 406
407 407 return 1
408 408
409 409
410 410 class MADWriter(Operation):
411 411
412 412 missing = -32767
413 413
414 414 def __init__(self, **kwargs):
415 415
416 416 Operation.__init__(self, **kwargs)
417 417 self.dataOut = Parameters()
418 418 self.counter = 0
419 419 self.path = None
420 420 self.fp = None
421 421
422 422 def run(self, dataOut, path, oneDDict, ind2DList='[]', twoDDict='{}',
423 423 metadata='{}', format='cedar', **kwargs):
424 424 '''
425 425 Inputs:
426 426 path - path where files will be created
427 427 oneDDict - json of one-dimensional parameters in record where keys
 428 428             are Madrigal codes (integers or mnemonics) and values are the corresponding
429 429 dataOut attribute e.g: {
430 430 'gdlatr': 'lat',
431 431 'gdlonr': 'lon',
432 432 'gdlat2':'lat',
433 433 'glon2':'lon'}
434 434 ind2DList - list of independent spatial two-dimensional parameters e.g:
 435 435                 ['heightList']
436 436 twoDDict - json of two-dimensional parameters in record where keys
 437 437             are Madrigal codes (integers or mnemonics) and values are the corresponding
 438 438             dataOut attribute; if it is a multidimensional array, specify it as a tuple
439 439 ('attr', pos) e.g: {
440 440 'gdalt': 'heightList',
441 441 'vn1p2': ('data_output', 0),
442 442 'vn2p2': ('data_output', 1),
443 443 'vn3': ('data_output', 2),
444 444 'snl': ('data_SNR', 'db')
445 445 }
446 446 metadata - json of madrigal metadata (kinst, kindat, catalog and header)
447 447 '''
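        # Illustrative call (hypothetical values; kinst/kindat depend on the experiment, and in
        # a real pipeline the schainpy controller passes these parameters as strings):
        #     writer = MADWriter()
        #     writer.run(dataOut, path='/tmp/madrigal',
        #                oneDDict='{"gdlatr": "lat", "gdlonr": "lon"}',
        #                ind2DList='["gdalt"]',
        #                twoDDict='{"gdalt": "heightList", "vn1p2": ["data_output", 0]}',
        #                metadata='{"kinst": 10, "kindat": 1910}',
        #                format='hdf5')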
448 448 if not self.isConfig:
449 449 self.setup(path, oneDDict, ind2DList, twoDDict, metadata, format, **kwargs)
450 450 self.isConfig = True
451 451
452 452 self.dataOut = dataOut
453 453 self.putData()
454 454 return
455 455
456 456 def setup(self, path, oneDDict, ind2DList, twoDDict, metadata, format, **kwargs):
457 457 '''
458 458 Configure Operation
459 459 '''
460 460
461 461 self.path = path
462 462 self.blocks = kwargs.get('blocks', None)
463 463 self.counter = 0
464 464 self.oneDDict = load_json(oneDDict)
465 465 self.twoDDict = load_json(twoDDict)
466 466 self.ind2DList = load_json(ind2DList)
467 467 meta = load_json(metadata)
468 468 self.kinst = meta.get('kinst')
469 469 self.kindat = meta.get('kindat')
470 470 self.catalog = meta.get('catalog', DEF_CATALOG)
471 471 self.header = meta.get('header', DEF_HEADER)
472 472 if format == 'cedar':
473 473 self.ext = '.dat'
474 474 self.extra_args = {}
475 475 elif format == 'hdf5':
476 476 self.ext = '.hdf5'
477 477 self.extra_args = {'ind2DList': self.ind2DList}
478 478
479 479 self.keys = [k.lower() for k in self.twoDDict]
480 480 if 'range' in self.keys:
481 481 self.keys.remove('range')
482 482 if 'gdalt' in self.keys:
483 483 self.keys.remove('gdalt')
484 484
485 485 def setFile(self):
486 486 '''
487 487 Create new cedar file object
488 488 '''
489 489
490 490 self.mnemonic = MNEMONICS[self.kinst] #TODO get mnemonic from madrigal
491 491 date = datetime.datetime.utcfromtimestamp(self.dataOut.utctime)
492 492
493 493 filename = '{}{}{}'.format(self.mnemonic,
494 494 date.strftime('%Y%m%d_%H%M%S'),
495 495 self.ext)
496 496
497 497 self.fullname = os.path.join(self.path, filename)
498 498
499 499 if os.path.isfile(self.fullname) :
500 500 log.warning(
501 501 'Destination file {} already exists, previous file deleted.'.format(
502 502 self.fullname),
503 503 'MADWriter')
504 504 os.remove(self.fullname)
505 505
506 506 try:
507 507 log.success(
508 508 'Creating file: {}'.format(self.fullname),
509 509 'MADWriter')
510 510 self.fp = madrigal.cedar.MadrigalCedarFile(self.fullname, True)
511 511 except ValueError as e:
512 512 log.error(
513 513 'Impossible to create a cedar object with "madrigal.cedar.MadrigalCedarFile"',
514 514 'MADWriter')
515 515 return
516 516
517 517 return 1
518 518
519 519 def writeBlock(self):
520 520 '''
521 521 Add data records to cedar file taking data from oneDDict and twoDDict
522 522 attributes.
523 523 Allowed parameters in: parcodes.tab
524 524 '''
525 525
526 526 startTime = datetime.datetime.utcfromtimestamp(self.dataOut.utctime)
527 527 endTime = startTime + datetime.timedelta(seconds=self.dataOut.paramInterval)
528 528 heights = self.dataOut.heightList
529 529
530 530 if self.ext == '.dat':
531 531 for key, value in list(self.twoDDict.items()):
532 532 if isinstance(value, str):
533 533 data = getattr(self.dataOut, value)
534 534 invalid = numpy.isnan(data)
535 535 data[invalid] = self.missing
536 536 elif isinstance(value, (tuple, list)):
537 537 attr, key = value
538 538 data = getattr(self.dataOut, attr)
539 539 invalid = numpy.isnan(data)
540 540 data[invalid] = self.missing
541 541
542 542 out = {}
543 543 for key, value in list(self.twoDDict.items()):
544 544 key = key.lower()
545 545 if isinstance(value, str):
546 546 if 'db' in value.lower():
547 547 tmp = getattr(self.dataOut, value.replace('_db', ''))
548 548 SNRavg = numpy.average(tmp, axis=0)
549 549 tmp = 10*numpy.log10(SNRavg)
550 550 else:
551 551 tmp = getattr(self.dataOut, value)
552 552 out[key] = tmp.flatten()
553 553 elif isinstance(value, (tuple, list)):
554 554 attr, x = value
555 555 data = getattr(self.dataOut, attr)
556 556 out[key] = data[int(x)]
557 557
558 558 a = numpy.array([out[k] for k in self.keys])
559 559 nrows = numpy.array([numpy.isnan(a[:, x]).all() for x in range(len(heights))])
560 560 index = numpy.where(nrows == False)[0]
561 561
562 562 rec = madrigal.cedar.MadrigalDataRecord(
563 563 self.kinst,
564 564 self.kindat,
565 565 startTime.year,
566 566 startTime.month,
567 567 startTime.day,
568 568 startTime.hour,
569 569 startTime.minute,
570 570 startTime.second,
571 571 startTime.microsecond/10000,
572 572 endTime.year,
573 573 endTime.month,
574 574 endTime.day,
575 575 endTime.hour,
576 576 endTime.minute,
577 577 endTime.second,
578 578 endTime.microsecond/10000,
579 579 list(self.oneDDict.keys()),
580 580 list(self.twoDDict.keys()),
581 581 len(index),
582 582 **self.extra_args
583 583 )
584 584
585 585 # Setting 1d values
586 586 for key in self.oneDDict:
587 587 rec.set1D(key, getattr(self.dataOut, self.oneDDict[key]))
588 588
589 589 # Setting 2d values
590 590 nrec = 0
591 591 for n in index:
592 592 for key in out:
593 593 rec.set2D(key, nrec, out[key][n])
594 594 nrec += 1
595 595
596 596 self.fp.append(rec)
597 597 if self.ext == '.hdf5' and self.counter % 500 == 0 and self.counter > 0:
598 598 self.fp.dump()
599 599 if self.counter % 100 == 0 and self.counter > 0:
600 600 log.log(
601 601 'Writing {} records'.format(
602 602 self.counter),
603 603 'MADWriter')
604 604
605 605 def setHeader(self):
606 606 '''
 607 607         Create and add catalog and header to the cedar file
608 608 '''
609 609
610 610 log.success('Closing file {}'.format(self.fullname), 'MADWriter')
611 611
612 612 if self.ext == '.dat':
613 613 self.fp.write()
614 614 else:
615 615 self.fp.dump()
616 616 self.fp.close()
617 617
618 618 header = madrigal.cedar.CatalogHeaderCreator(self.fullname)
619 619 header.createCatalog(**self.catalog)
620 620 header.createHeader(**self.header)
621 621 header.write()
622 622
623 623 def putData(self):
624 624
625 625 if self.dataOut.flagNoData:
626 626 return 0
627 627
628 628 if self.dataOut.flagDiscontinuousBlock or self.counter == self.blocks:
629 629 if self.counter > 0:
630 630 self.setHeader()
631 631 self.counter = 0
632 632
633 633 if self.counter == 0:
634 634 self.setFile()
635 635
636 636 self.writeBlock()
637 637 self.counter += 1
638 638
639 639 def close(self):
640 640
641 641 if self.counter > 0:
 642 642             self.setHeader()
\ No newline at end of file