Update ParamReader to support different HDF5 files, fix Read/Write Madrigal files
jespinoza -
r1254:6b25d3b79646
@@ -1,674 +1,629
1 1 '''
2 2 Created on Sep 9, 2015
3 3
4 4 @author: roj-idl71 Karim Kuyeng
5 5 '''
6 6
7 7 import os
8 8 import sys
9 9 import glob
10 10 import fnmatch
11 11 import datetime
12 12 import time
13 13 import re
14 14 import h5py
15 15 import numpy
16 16
17 17 try:
18 18 from gevent import sleep
19 19 except:
20 20 from time import sleep
21 21
22 22 from schainpy.model.data.jroheaderIO import RadarControllerHeader, SystemHeader
23 23 from schainpy.model.data.jrodata import Voltage
24 24 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation
25 25 from numpy import imag
26 26
27 27 class AMISRReader(ProcessingUnit):
28 28 '''
29 29 classdocs
30 30 '''
31 31
32 32 def __init__(self):
33 33 '''
34 34 Constructor
35 35 '''
36 36
37 37 ProcessingUnit.__init__(self)
38 38
39 39 self.set = None
40 40 self.subset = None
41 41 self.extension_file = '.h5'
42 42 self.dtc_str = 'dtc'
43 43 self.dtc_id = 0
44 44 self.status = True
45 45 self.isConfig = False
46 46 self.dirnameList = []
47 47 self.filenameList = []
48 48 self.fileIndex = None
49 49 self.flagNoMoreFiles = False
50 50 self.flagIsNewFile = 0
51 51 self.filename = ''
52 52 self.amisrFilePointer = None
53 53
54 54
55 55 self.dataset = None
56 56
57 57
58 58
59 59
60 60 self.profileIndex = 0
61 61
62 62
63 63 self.beamCodeByFrame = None
64 64 self.radacTimeByFrame = None
65 65
66 66 self.dataset = None
67 67
68 68
69 69
70 70
71 71 self.__firstFile = True
72 72
73 73 self.buffer = None
74 74
75 75
76 76 self.timezone = 'ut'
77 77
78 78 self.__waitForNewFile = 20
79 79 self.__filename_online = None
80 80 #Is it really necessary to create the output object in the initializer?
81 81 self.dataOut = Voltage()
82 82
83 83 def setup(self,path=None,
84 84 startDate=None,
85 85 endDate=None,
86 86 startTime=None,
87 87 endTime=None,
88 88 walk=True,
89 89 timezone='ut',
90 90 all=0,
91 91 code = None,
92 92 nCode = 0,
93 93 nBaud = 0,
94 94 online=False):
95 95
96 96 self.timezone = timezone
97 97 self.all = all
98 98 self.online = online
99 99
100 100 self.code = code
101 101 self.nCode = int(nCode)
102 102 self.nBaud = int(nBaud)
103 103
104 104
105 105
106 106 #self.findFiles()
107 107 if not(online):
108 108 #Offline file search
109 109 self.searchFilesOffLine(path, startDate, endDate, startTime, endTime, walk)
110 110 else:
111 111 self.searchFilesOnLine(path, startDate, endDate, startTime,endTime,walk)
112 112
113 113 if not(self.filenameList):
114 114 print("There are no files in the folder: %s"%(path))
115 115
116 116 sys.exit(-1)
117 117
118 118 self.fileIndex = -1
119 119
120 120 self.readNextFile(online)
121 121
122 122 '''
123 123 Add code
124 124 '''
125 125 self.isConfig = True
126 126
127 127 pass
128 128
129 129
130 130 def readAMISRHeader(self,fp):
131 131 header = 'Raw11/Data/RadacHeader'
132 132 self.beamCodeByPulse = fp.get(header+'/BeamCode') # LIST OF BEAM CODES PER PROFILE, USED WHEN REARRANGING
133 133 self.beamCode = fp.get('Raw11/Data/Beamcodes') # NUMBER OF CHANNELS; IDENTIFIES THE POSITION TO CREATE A FILE WITH THAT INFO
134 134 #self.code = fp.get(header+'/Code') # NOT USED HERE
135 135 self.frameCount = fp.get(header+'/FrameCount')# NOT USED HERE
136 136 self.modeGroup = fp.get(header+'/ModeGroup')# NOT USED HERE
137 137 self.nsamplesPulse = fp.get(header+'/NSamplesPulse')# TO GET NSA, OR USE THE DATA FOR THAT
138 138 self.pulseCount = fp.get(header+'/PulseCount')# NOT USED HERE
139 139 self.radacTime = fp.get(header+'/RadacTime')# 1st TIME IN THE FILE; CALCULATE THE REST WITH IPP*nindexprofile
140 140 self.timeCount = fp.get(header+'/TimeCount')# NOT USED HERE
141 141 self.timeStatus = fp.get(header+'/TimeStatus')# NOT USED HERE
142 142 self.rangeFromFile = fp.get('Raw11/Data/Samples/Range')
143 143 self.frequency = fp.get('Rx/Frequency')
144 144 txAus = fp.get('Raw11/Data/Pulsewidth')
145 145
146 146
147 147 self.nblocks = self.pulseCount.shape[0] #nblocks
148 148
149 149 self.nprofiles = self.pulseCount.shape[1] #nprofile
150 150 self.nsa = self.nsamplesPulse[0,0] #ngates
151 151 self.nchannels = self.beamCode.shape[1]
152 152 self.ippSeconds = (self.radacTime[0][1] -self.radacTime[0][0]) #Ipp in seconds
153 153 #self.__waitForNewFile = self.nblocks # wait depending on the number of blocks since each block is 1 sec
154 154 self.__waitForNewFile = self.nblocks * self.nprofiles * self.ippSeconds # wait until new file is created
155 155
156 156 #filling radar controller header parameters
157 157 self.__ippKm = self.ippSeconds *.15*1e6 # in km
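# Sanity check of the conversion above (illustrative numbers, not taken from
# the data): range = c*t/2 and c/2 = 1.5e5 km/s = 0.15*1e6 km/s, so an IPP of
# 0.01 s corresponds to 0.01 * 0.15e6 = 1500 km of unambiguous range.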
158 158 self.__txA = (txAus.value)*.15 #(pulsewidth[us]*0.15 km/us) in km
159 159 self.__txB = 0
160 160 nWindows=1
161 161 self.__nSamples = self.nsa
162 162 self.__firstHeight = self.rangeFromFile[0][0]/1000 #in km
163 163 self.__deltaHeight = (self.rangeFromFile[0][1] - self.rangeFromFile[0][0])/1000
164 164
165 165 #for now, until we understand why the saved code differs (code is included even though it is not in the tuf file)
166 166 #self.__codeType = 0
167 167 # self.__nCode = None
168 168 # self.__nBaud = None
169 169 self.__code = self.code
170 170 self.__codeType = 0
171 171 if self.code is not None:
172 172 self.__codeType = 1
173 173 self.__nCode = self.nCode
174 174 self.__nBaud = self.nBaud
175 175 #self.__code = 0
176 176
177 177 #filling system header parameters
178 178 self.__nSamples = self.nsa
179 179 self.newProfiles = self.nprofiles//self.nchannels # integer division: used later as an array dimension
180 180 self.__channelList = list(range(self.nchannels))
181 181
182 182 self.__frequency = self.frequency[0][0]
183 183
184 184
185 185
186 186 def createBuffers(self):
187 187
188 188 pass
189 189
190 190 def __setParameters(self,path='', startDate='',endDate='',startTime='', endTime='', walk=''):
191 191 self.path = path
192 192 self.startDate = startDate
193 193 self.endDate = endDate
194 194 self.startTime = startTime
195 195 self.endTime = endTime
196 196 self.walk = walk
197 197
198 198 def __checkPath(self):
199 199 if os.path.exists(self.path):
200 200 self.status = 1
201 201 else:
202 202 self.status = 0
203 203 print('Path:%s does not exist'%self.path)
204 204
205 205 return
206 206
207 207
208 208 def __selDates(self, amisr_dirname_format):
209 209 try:
210 210 year = int(amisr_dirname_format[0:4])
211 211 month = int(amisr_dirname_format[4:6])
212 212 dom = int(amisr_dirname_format[6:8])
213 213 thisDate = datetime.date(year,month,dom)
214 214
215 215 if (thisDate>=self.startDate and thisDate <= self.endDate):
216 216 return amisr_dirname_format
217 217 except:
218 218 return None
219 219
220 220
221 221 def __findDataForDates(self,online=False):
222 222
223 223 if not(self.status):
224 224 return None
225 225
226 226 pat = r'\d+\.\d+'
227 227 dirnameList = [re.search(pat,x) for x in os.listdir(self.path)]
228 228 dirnameList = [x for x in dirnameList if x!=None]
229 229 dirnameList = [x.string for x in dirnameList]
230 230 if not(online):
231 231 dirnameList = [self.__selDates(x) for x in dirnameList]
232 232 dirnameList = [x for x in dirnameList if x!=None]
233 233 if len(dirnameList)>0:
234 234 self.status = 1
235 235 self.dirnameList = dirnameList
236 236 self.dirnameList.sort()
237 237 else:
238 238 self.status = 0
239 239 return None
240 240
241 241 def __getTimeFromData(self):
242 242 startDateTime_Reader = datetime.datetime.combine(self.startDate,self.startTime)
243 243 endDateTime_Reader = datetime.datetime.combine(self.endDate,self.endTime)
244 244
245 245 print('Filtering Files from %s to %s'%(startDateTime_Reader, endDateTime_Reader))
246 246 print('........................................')
247 247 filter_filenameList = []
248 248 self.filenameList.sort()
249 249 #for i in range(len(self.filenameList)-1):
250 250 for i in range(len(self.filenameList)):
251 251 filename = self.filenameList[i]
252 252 fp = h5py.File(filename,'r')
253 253 time_str = fp.get('Time/RadacTimeString')
254 254
255 255 startDateTimeStr_File = time_str[0][0].split('.')[0]
256 256 junk = time.strptime(startDateTimeStr_File, '%Y-%m-%d %H:%M:%S')
257 257 startDateTime_File = datetime.datetime(junk.tm_year,junk.tm_mon,junk.tm_mday,junk.tm_hour, junk.tm_min, junk.tm_sec)
258 258
259 259 endDateTimeStr_File = time_str[-1][-1].split('.')[0]
260 260 junk = time.strptime(endDateTimeStr_File, '%Y-%m-%d %H:%M:%S')
261 261 endDateTime_File = datetime.datetime(junk.tm_year,junk.tm_mon,junk.tm_mday,junk.tm_hour, junk.tm_min, junk.tm_sec)
262 262
263 263 fp.close()
264 264
265 265 if self.timezone == 'lt':
266 266 startDateTime_File = startDateTime_File - datetime.timedelta(minutes = 300)
267 267 endDateTime_File = endDateTime_File - datetime.timedelta(minutes = 300)
268 268
269 269 if (endDateTime_File>=startDateTime_Reader and endDateTime_File<endDateTime_Reader):
270 270 #self.filenameList.remove(filename)
271 271 filter_filenameList.append(filename)
272 272
273 273 if (endDateTime_File>=endDateTime_Reader):
274 274 break
275 275
276 276
277 277 filter_filenameList.sort()
278 278 self.filenameList = filter_filenameList
279 279 return 1
280 280
281 281 def __filterByGlob1(self, dirName):
282 282 filter_files = glob.glob1(dirName, '*.*%s'%self.extension_file)
283 283 filter_files.sort()
284 284 filterDict = {}
285 285 filterDict.setdefault(dirName)
286 286 filterDict[dirName] = filter_files
287 287 return filterDict
288 288
289 289 def __getFilenameList(self, fileListInKeys, dirList):
290 290 for value in fileListInKeys:
291 291 dirName = list(value.keys())[0]
292 292 for file in value[dirName]:
293 293 filename = os.path.join(dirName, file)
294 294 self.filenameList.append(filename)
295 295
296 296
297 297 def __selectDataForTimes(self, online=False):
298 298 #the time filter is not implemented yet
299 299 if not(self.status):
300 300 return None
301 301
302 302 dirList = [os.path.join(self.path,x) for x in self.dirnameList]
303 303
304 304 fileListInKeys = [self.__filterByGlob1(x) for x in dirList]
305 305
306 306 self.__getFilenameList(fileListInKeys, dirList)
307 307 if not(online):
308 308 #filter by time
309 309 if not(self.all):
310 310 self.__getTimeFromData()
311 311
312 312 if len(self.filenameList)>0:
313 313 self.status = 1
314 314 self.filenameList.sort()
315 315 else:
316 316 self.status = 0
317 317 return None
318 318
319 319 else:
320 320 #get the last file - 1
321 321 self.filenameList = [self.filenameList[-2]]
322 322
323 323 new_dirnameList = []
324 324 for dirname in self.dirnameList:
325 325 junk = numpy.array([dirname in x for x in self.filenameList])
326 326 junk_sum = junk.sum()
327 327 if junk_sum > 0:
328 328 new_dirnameList.append(dirname)
329 329 self.dirnameList = new_dirnameList
330 330 return 1
331 331
332 332 def searchFilesOnLine(self, path, startDate, endDate, startTime=datetime.time(0,0,0),
333 333 endTime=datetime.time(23,59,59),walk=True):
334 334
335 335 if endDate is None:
336 336 startDate = datetime.datetime.utcnow().date()
337 337 endDate = datetime.datetime.utcnow().date()
338 338
339 339 self.__setParameters(path=path, startDate=startDate, endDate=endDate,startTime = startTime,endTime=endTime, walk=walk)
340 340
341 341 self.__checkPath()
342 342
343 343 self.__findDataForDates(online=True)
344 344
345 345 self.dirnameList = [self.dirnameList[-1]]
346 346
347 347 self.__selectDataForTimes(online=True)
348 348
349 349 return
350 350
351 351
352 352 def searchFilesOffLine(self,
353 353 path,
354 354 startDate,
355 355 endDate,
356 356 startTime=datetime.time(0,0,0),
357 357 endTime=datetime.time(23,59,59),
358 358 walk=True):
359 359
360 360 self.__setParameters(path, startDate, endDate, startTime, endTime, walk)
361 361
362 362 self.__checkPath()
363 363
364 364 self.__findDataForDates()
365 365
366 366 self.__selectDataForTimes()
367 367
368 368 for i in range(len(self.filenameList)):
369 369 print("%s" %(self.filenameList[i]))
370 370
371 371 return
372 372
373 373 def __setNextFileOffline(self):
374 374 idFile = self.fileIndex
375 375
376 376 while (True):
377 377 idFile += 1
378 378 if not(idFile < len(self.filenameList)):
379 379 self.flagNoMoreFiles = 1
380 380 print("No more Files")
381 381 return 0
382 382
383 383 filename = self.filenameList[idFile]
384 384
385 385 amisrFilePointer = h5py.File(filename,'r')
386 386
387 387 break
388 388
389 389 self.flagIsNewFile = 1
390 390 self.fileIndex = idFile
391 391 self.filename = filename
392 392
393 393 self.amisrFilePointer = amisrFilePointer
394 394
395 395 print("Setting the file: %s"%self.filename)
396 396
397 397 return 1
398 398
399 399
400 400 def __setNextFileOnline(self):
401 401 filename = self.filenameList[0]
402 402 if self.__filename_online != None:
403 403 self.__selectDataForTimes(online=True)
404 404 filename = self.filenameList[0]
405 405 wait = 0
406 406 while self.__filename_online == filename:
407 407 print('waiting %d seconds to get a new file...'%(self.__waitForNewFile))
408 408 if wait == 5:
409 409 return 0
410 410 sleep(self.__waitForNewFile)
411 411 self.__selectDataForTimes(online=True)
412 412 filename = self.filenameList[0]
413 413 wait += 1
414 414
415 415 self.__filename_online = filename
416 416
417 417 self.amisrFilePointer = h5py.File(filename,'r')
418 418 self.flagIsNewFile = 1
419 419 self.filename = filename
420 420 print("Setting the file: %s"%self.filename)
421 421 return 1
422 422
423 423
424 424 def readData(self):
425 425 buffer = self.amisrFilePointer.get('Raw11/Data/Samples/Data')
426 426 re_part = buffer[:,:,:,0] # renamed from 're' to avoid shadowing the re module
427 427 im_part = buffer[:,:,:,1]
428 428 dataset = re_part + im_part*1j
429 429 self.radacTime = self.amisrFilePointer.get('Raw11/Data/RadacHeader/RadacTime')
430 430 timeset = self.radacTime[:,0]
431 431 return dataset,timeset
432 432
433 433 def reshapeData(self):
434 434 #self.beamCodeByPulse, self.beamCode, self.nblocks, self.nprofiles, self.nsa,
435 435 channels = self.beamCodeByPulse[0,:]
436 436 nchan = self.nchannels
437 437 #self.newProfiles = self.nprofiles/nchan #must be defined on filljroheader
438 438 nblocks = self.nblocks
439 439 nsamples = self.nsa
440 440
441 441 #Dimensions : nChannels, nProfiles, nSamples
442 442 new_block = numpy.empty((nblocks, nchan, self.newProfiles, nsamples), dtype="complex64")
443 443 ############################################
444 444
445 445 for thisChannel in range(nchan):
446 446 new_block[:,thisChannel,:,:] = self.dataset[:,numpy.where(channels==self.beamCode[0][thisChannel])[0],:]
447 447
448 448
449 449 new_block = numpy.transpose(new_block, (1,0,2,3))
450 450 new_block = numpy.reshape(new_block, (nchan,-1, nsamples))
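# A minimal sketch of the rearrangement above, with made-up values: if
# beamCodeByPulse[0,:] were [355, 356, 355, 356], then for the channel whose
# beam code is 355, numpy.where(channels==355)[0] -> array([0, 2]), i.e. each
# channel keeps only the profiles matching its own beam code; the transpose
# and reshape then merge the (blocks, profiles) axes into one profile axis of
# length nblocks*newProfiles.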
451 451
452 452 return new_block
453 453
454 454 def updateIndexes(self):
455 455
456 456 pass
457 457
458 458 def fillJROHeader(self):
459 459
460 460 #fill radar controller header
461 461 self.dataOut.radarControllerHeaderObj = RadarControllerHeader(ippKm=self.__ippKm,
462 462 txA=self.__txA,
463 463 txB=0,
464 464 nWindows=1,
465 465 nHeights=self.__nSamples,
466 466 firstHeight=self.__firstHeight,
467 467 deltaHeight=self.__deltaHeight,
468 468 codeType=self.__codeType,
469 469 nCode=self.__nCode, nBaud=self.__nBaud,
470 470 code = self.__code,
471 471 fClock=1)
472 472
473 473
474 474
475 475 #fill system header
476 476 self.dataOut.systemHeaderObj = SystemHeader(nSamples=self.__nSamples,
477 477 nProfiles=self.newProfiles,
478 478 nChannels=len(self.__channelList),
479 479 adcResolution=14,
480 480 pciDioBusWith=32)
481 481
482 482 self.dataOut.type = "Voltage"
483 483
484 484 self.dataOut.data = None
485 485
486 486 self.dataOut.dtype = numpy.dtype([('real','<i8'),('imag','<i8')])
487 487
488 488 # self.dataOut.nChannels = 0
489 489
490 490 # self.dataOut.nHeights = 0
491 491
492 492 self.dataOut.nProfiles = self.newProfiles*self.nblocks
493 493
494 494 #self.dataOut.heightList = self.__firstHeigth + numpy.arange(self.__nSamples, dtype = numpy.float)*self.__deltaHeigth
495 495 ranges = numpy.reshape(self.rangeFromFile.value,(-1))
496 496 self.dataOut.heightList = ranges/1000.0 #km
497 497
498 498
499 499 self.dataOut.channelList = self.__channelList
500 500
501 501 self.dataOut.blocksize = self.dataOut.getNChannels() * self.dataOut.getNHeights()
502 502
503 503 # self.dataOut.channelIndexList = None
504 504
505 505 self.dataOut.flagNoData = True
506 506
507 507 #Set to TRUE if the data is discontinuous
508 508 self.dataOut.flagDiscontinuousBlock = False
509 509
510 510 self.dataOut.utctime = None
511 511
512 512 #self.dataOut.timeZone = -5 #self.__timezone/60 #timezone like jroheader, difference in minutes between UTC and localtime
513 513 if self.timezone == 'lt':
514 514 self.dataOut.timeZone = time.timezone / 60. #get the timezone in minutes
515 515 else:
516 516 self.dataOut.timeZone = 0 #by default time is UTC
517 517
518 518 self.dataOut.dstFlag = 0
519 519
520 520 self.dataOut.errorCount = 0
521 521
522 522 self.dataOut.nCohInt = 1
523 523
524 524 self.dataOut.flagDecodeData = False #assume the data is already decoded
525 525
526 526 self.dataOut.flagDeflipData = False #assume the data has not been flipped
527 527
528 528 self.dataOut.flagShiftFFT = False
529 529
530 530 self.dataOut.ippSeconds = self.ippSeconds
531 531
532 532 #Time interval between profiles
533 533 #self.dataOut.timeInterval = self.dataOut.ippSeconds * self.dataOut.nCohInt
534 534
535 535 self.dataOut.frequency = self.__frequency
536 536
537 537 self.dataOut.realtime = self.online
538 538 pass
539 539
540 540 def readNextFile(self,online=False):
541 541
542 542 if not(online):
543 543 newFile = self.__setNextFileOffline()
544 544 else:
545 545 newFile = self.__setNextFileOnline()
546 546
547 547 if not(newFile):
548 548 return 0
549 549
550 550 #if self.__firstFile:
551 551 self.readAMISRHeader(self.amisrFilePointer)
552 552 self.createBuffers()
553 553 self.fillJROHeader()
554 554 #self.__firstFile = False
555 555
556 556
557 557
558 558 self.dataset,self.timeset = self.readData()
559 559
560 560 if self.endDate!=None:
561 561 endDateTime_Reader = datetime.datetime.combine(self.endDate,self.endTime)
562 562 time_str = self.amisrFilePointer.get('Time/RadacTimeString')
563 563 startDateTimeStr_File = time_str[0][0].split('.')[0]
564 564 junk = time.strptime(startDateTimeStr_File, '%Y-%m-%d %H:%M:%S')
565 565 startDateTime_File = datetime.datetime(junk.tm_year,junk.tm_mon,junk.tm_mday,junk.tm_hour, junk.tm_min, junk.tm_sec)
566 566 if self.timezone == 'lt':
567 567 startDateTime_File = startDateTime_File - datetime.timedelta(minutes = 300)
568 568 if (startDateTime_File>endDateTime_Reader):
569 569 return 0
570 570
571 571 self.jrodataset = self.reshapeData()
572 572 #----self.updateIndexes()
573 573 self.profileIndex = 0
574 574
575 575 return 1
576 576
577 577
578 578 def __hasNotDataInBuffer(self):
579 579 if self.profileIndex >= (self.newProfiles*self.nblocks):
580 580 return 1
581 581 return 0
582 582
583 583
584 584 def getData(self):
585 585
586 586 if self.flagNoMoreFiles:
587 587 self.dataOut.flagNoData = True
588 588 return 0
589 589
590 590 if self.__hasNotDataInBuffer():
591 591 if not (self.readNextFile(self.online)):
592 592 return 0
593 593
594 594
595 595 if self.dataset is None: # set this condition when there is no data left to read
596 596 self.dataOut.flagNoData = True
597 597 return 0
598 598
599 599 #self.dataOut.data = numpy.reshape(self.jrodataset[self.profileIndex,:],(1,-1))
600 600
601 601 self.dataOut.data = self.jrodataset[:,self.profileIndex,:]
602 602
603 603 #self.dataOut.utctime = self.jrotimeset[self.profileIndex]
604 604 #check the JRO data basic header and verify that it is compatible with this value
605 605 #self.dataOut.utctime = self.timeset + (self.profileIndex * self.ippSeconds * self.nchannels)
606 606 indexprof = numpy.mod(self.profileIndex, self.newProfiles)
607 607 indexblock = self.profileIndex//self.newProfiles # integer division: used as an index into timeset
608 608 #print indexblock, indexprof
609 609 self.dataOut.utctime = self.timeset[indexblock] + (indexprof * self.ippSeconds * self.nchannels)
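# Worked example of the line above (illustrative values): with newProfiles=100,
# ippSeconds=0.001 and nchannels=4, profileIndex=250 gives indexblock=2 and
# indexprof=50, so utctime = timeset[2] + 50*0.001*4 = timeset[2] + 0.2 s.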
610 610 self.dataOut.profileIndex = self.profileIndex
611 611 self.dataOut.flagNoData = False
612 612 # if indexprof == 0:
613 613 # print self.dataOut.utctime
614 614
615 615 self.profileIndex += 1
616 616
617 617 return self.dataOut.data
618 618
619 619
620 620 def run(self, **kwargs):
621 621 '''
622 622 This method will be called many times, so all your code should go here
623 623 '''
624 624
625 625 if not self.isConfig:
626 626 self.setup(**kwargs)
627 627 self.isConfig = True
628 628
629 629 self.getData()
630
631 class Writer(Operation):
632 '''
633 classdocs
634 '''
635
636 def __init__(self):
637 '''
638 Constructor
639 '''
640 self.dataOut = None
641
642 self.isConfig = False
643
644 def setup(self, dataIn, path, blocksPerFile, set=0, ext=None):
645 '''
646 In this method we should set all initial parameters.
647
648 Input:
649 dataIn : Input data will also be output data
650
651 '''
652 self.dataOut = dataIn
653
654
655
656
657
658 self.isConfig = True
659
660 return
661
662 def run(self, dataIn, **kwargs):
663 '''
664 This method will be called many times, so all your code should go here
665
666 Inputs:
667
668 dataIn : object with the data
669
670 '''
671
672 if not self.isConfig:
673 self.setup(dataIn, **kwargs)
674 No newline at end of file
@@ -1,639 +1,597
1 1 '''
2 2 Created on Aug 1, 2017
3 3
4 4 @author: Juan C. Espinoza
5 5 '''
6 6
7 7 import os
8 8 import sys
9 9 import time
10 10 import json
11 11 import glob
12 12 import datetime
13 13
14 14 import numpy
15 15 import h5py
16 16
17 17 import schainpy.admin
18 from schainpy.model.io.jroIO_base import LOCALTIME, JRODataReader, JRODataWriter
18 from schainpy.model.io.jroIO_base import LOCALTIME, Reader
19 19 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
20 20 from schainpy.model.data.jrodata import Parameters
21 21 from schainpy.utils import log
22 22
23 23 try:
24 24 import madrigal.cedar
25 25 except:
26 26 log.warning(
27 27 'You should install "madrigal library" module if you want to read/write Madrigal data'
28 28 )
29 29
30 30 try:
31 31 basestring
32 32 except:
33 33 basestring = str
34 34
35 35 DEF_CATALOG = {
36 36 'principleInvestigator': 'Marco Milla',
37 37 'expPurpose': '',
38 38 'cycleTime': '',
39 39 'correlativeExp': '',
40 40 'sciRemarks': '',
41 41 'instRemarks': ''
42 42 }
43 43
44 44 DEF_HEADER = {
45 45 'kindatDesc': '',
46 46 'analyst': 'Jicamarca User',
47 47 'comments': '',
48 48 'history': ''
49 49 }
50 50
51 51 MNEMONICS = {
52 52 10: 'jro',
53 53 11: 'jbr',
54 54 840: 'jul',
55 55 13: 'jas',
56 56 1000: 'pbr',
57 57 1001: 'hbr',
58 58 1002: 'obr',
59 59 400: 'clr'
60 60
61 61 }
62 62
63 63 UT1970 = datetime.datetime(1970, 1, 1) - datetime.timedelta(seconds=time.timezone)
64 64
65 65 def load_json(obj):
66 66 '''
67 67 Parse json as string instead of unicode
68 68 '''
69 69
70 70 if isinstance(obj, str):
71 71 iterable = json.loads(obj)
72 72 else:
73 73 iterable = obj
74 74
75 75 if isinstance(iterable, dict):
76 76 return {str(k): load_json(v) if isinstance(v, dict) else str(v) if isinstance(v, basestring) else v
77 77 for k, v in list(iterable.items())}
78 78 elif isinstance(iterable, (list, tuple)):
79 79 return [str(v) if isinstance(v, basestring) else v for v in iterable]
80 80
81 81 return iterable
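# Illustrative examples of the normalization above: a JSON string is parsed
# and unicode values are coerced to plain str; an already-parsed object gets
# the same coercion applied:
#   load_json('{"GDALT": "heightList"}') -> {'GDALT': 'heightList'}
#   load_json(['GDALT', 10]) -> ['GDALT', 10]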
82 82
83 83 @MPDecorator
84 class MADReader(JRODataReader, ProcessingUnit):
84 class MADReader(Reader, ProcessingUnit):
85 85
86 86 def __init__(self):
87 87
88 88 ProcessingUnit.__init__(self)
89 89
90 90 self.dataOut = Parameters()
91 91 self.counter_records = 0
92 92 self.nrecords = None
93 93 self.flagNoMoreFiles = 0
94 self.isConfig = False
95 94 self.filename = None
96 95 self.intervals = set()
96 self.datatime = datetime.datetime(1900,1,1)
97 self.format = None
98 self.filefmt = "***%Y%m%d*******"
97 99
98 def setup(self,
99 path=None,
100 startDate=None,
101 endDate=None,
102 format=None,
103 startTime=datetime.time(0, 0, 0),
104 endTime=datetime.time(23, 59, 59),
105 **kwargs):
100 def setup(self, **kwargs):
106 101
107 self.path = path
108 self.startDate = startDate
109 self.endDate = endDate
110 self.startTime = startTime
111 self.endTime = endTime
112 self.datatime = datetime.datetime(1900,1,1)
113 self.oneDDict = load_json(kwargs.get('oneDDict',
114 "{\"GDLATR\":\"lat\", \"GDLONR\":\"lon\"}"))
115 self.twoDDict = load_json(kwargs.get('twoDDict',
116 "{\"GDALT\": \"heightList\"}"))
117 self.independentParam = 'GDALT'
102 self.set_kwargs(**kwargs)
103 self.oneDDict = load_json(self.oneDDict)
104 self.twoDDict = load_json(self.twoDDict)
105 self.ind2DList = load_json(self.ind2DList)
106 self.independentParam = self.ind2DList[0]
118 107
119 108 if self.path is None:
120 109 raise ValueError('The path is not valid')
121 110
122 if format is None:
111 self.open_file = open
112 self.open_mode = 'rb'
113
114 if self.format is None:
123 115 raise ValueError('The format is not valid; choose simple, cedar or hdf5')
124 elif format.lower() in ('simple', 'txt'):
116 elif self.format.lower() in ('simple', 'txt'):
125 117 self.ext = '.txt'
126 elif format.lower() in ('cedar',):
118 elif self.format.lower() in ('cedar',):
127 119 self.ext = '.001'
128 120 else:
129 121 self.ext = '.hdf5'
122 self.open_file = h5py.File
123 self.open_mode = 'r'
130 124
131 self.search_files(self.path)
132 self.fileId = 0
133
134 if not self.fileList:
135 raise Warning('There is no files matching these date in the folder: {}. \n Check startDate and endDate'.format(path))
136
137 self.setNextFile()
138
139 def search_files(self, path):
140 '''
141 Searching for madrigal files in path
142 Creating a list of files to procces included in [startDate,endDate]
143
144 Input:
145 path - Path to find files
146 '''
147
148 log.log('Searching files {} in {} '.format(self.ext, path), 'MADReader')
149 fileList0 = glob.glob1(path, '*{}'.format(self.ext))
150 fileList0.sort()
151
152 self.fileList = []
153 self.dateFileList = []
154
155 startDate = self.startDate - datetime.timedelta(1)
156 endDate = self.endDate + datetime.timedelta(1)
157
158 for thisFile in fileList0:
159 year = thisFile[3:7]
160 if not year.isdigit():
161 continue
162
163 month = thisFile[7:9]
164 if not month.isdigit():
165 continue
125 if self.online:
126 log.log("Searching files in online mode...", self.name)
166 127
167 day = thisFile[9:11]
168 if not day.isdigit():
169 continue
128 for nTries in range(self.nTries):
129 fullpath = self.searchFilesOnLine(self.path, self.startDate,
130 self.endDate, self.expLabel, self.ext, self.walk,
131 self.filefmt, self.folderfmt)
170 132
171 year, month, day = int(year), int(month), int(day)
172 dateFile = datetime.date(year, month, day)
133 try:
134 fullpath = next(fullpath)
135 except:
136 fullpath = None
137
138 if fullpath:
139 break
173 140
174 if (startDate > dateFile) or (endDate < dateFile):
175 continue
141 log.warning(
142 'Waiting {} sec for a valid file in {}: try {} ...'.format(
143 self.delay, self.path, nTries + 1),
144 self.name)
145 time.sleep(self.delay)
146
147 if not(fullpath):
148 raise schainpy.admin.SchainError(
149 'There isn\'t any valid file in {}'.format(self.path))
150
151 else:
152 log.log("Searching files in {}".format(self.path), self.name)
153 self.filenameList = self.searchFilesOffLine(self.path, self.startDate,
154 self.endDate, self.expLabel, self.ext, self.walk, self.filefmt, self.folderfmt)
155
156 self.setNextFile()
176 157
177 self.fileList.append(thisFile)
178 self.dateFileList.append(dateFile)
158 def readFirstHeader(self):
159 '''Read header and data'''
179 160
180 return
161 self.parseHeader()
162 self.parseData()
163 self.blockIndex = 0
164
165 return
181 166
182 167 def parseHeader(self):
183 168 '''
184 169 '''
185 170
186 171 self.output = {}
187 172 self.version = '2'
188 173 s_parameters = None
189 174 if self.ext == '.txt':
190 175 self.parameters = [s.strip().lower() for s in self.fp.readline().decode().strip().split(' ') if s]
191 176 elif self.ext == '.hdf5':
192 177 self.metadata = self.fp['Metadata']
193 178 if '_record_layout' in self.metadata:
194 179 s_parameters = [s[0].lower().decode() for s in self.metadata['Independent Spatial Parameters']]
195 180 self.version = '3'
196 181 self.parameters = [s[0].lower().decode() for s in self.metadata['Data Parameters']]
197 182
198 183 log.success('Parameters found: {}'.format(self.parameters),
199 184 'MADReader')
200 185 if s_parameters:
201 186 log.success('Spatial parameters found: {}'.format(s_parameters),
202 187 'MADReader')
203 188
204 189 for param in list(self.oneDDict.keys()):
205 190 if param.lower() not in self.parameters:
206 191 log.warning(
207 192 'Parameter {} not found, it will be ignored'.format(
208 193 param),
209 194 'MADReader')
210 195 self.oneDDict.pop(param, None)
211 196
212 197 for param, value in list(self.twoDDict.items()):
213 198 if param.lower() not in self.parameters:
214 199 log.warning(
215 200 'Parameter {} not found, it will be ignored'.format(
216 201 param),
217 202 'MADReader')
218 203 self.twoDDict.pop(param, None)
219 204 continue
220 205 if isinstance(value, list):
221 206 if value[0] not in self.output:
222 207 self.output[value[0]] = []
223 208 self.output[value[0]].append([])
224 209
225 210 def parseData(self):
226 211 '''
227 212 '''
228 213
229 214 if self.ext == '.txt':
230 215 self.data = numpy.genfromtxt(self.fp, missing_values=('missing'))
231 216 self.nrecords = self.data.shape[0]
232 217 self.ranges = numpy.unique(self.data[:,self.parameters.index(self.independentParam.lower())])
233 218 self.counter_records = 0
234 219 elif self.ext == '.hdf5':
235 220 self.data = self.fp['Data']
236 221 self.ranges = numpy.unique(self.data['Table Layout'][self.independentParam.lower()])
237 222 self.times = numpy.unique(self.data['Table Layout']['ut1_unix'])
238 223 self.counter_records = int(self.data['Table Layout']['recno'][0])
239 224 self.nrecords = int(self.data['Table Layout']['recno'][-1])
240
241 def setNextFile(self):
242 '''
243 '''
244
245 file_id = self.fileId
246
247 if file_id == len(self.fileList):
248 log.success('No more files', 'MADReader')
249 self.flagNoMoreFiles = 1
250 return 0
251
252 log.success(
253 'Opening: {}'.format(self.fileList[file_id]),
254 'MADReader'
255 )
256
257 filename = os.path.join(self.path, self.fileList[file_id])
258
259 if self.filename is not None:
260 self.fp.close()
261
262 self.filename = filename
263 self.filedate = self.dateFileList[file_id]
264
265 if self.ext=='.hdf5':
266 self.fp = h5py.File(self.filename, 'r')
267 else:
268 self.fp = open(self.filename, 'rb')
269
270 self.parseHeader()
271 self.parseData()
272 self.sizeOfFile = os.path.getsize(self.filename)
273 self.flagIsNewFile = 0
274 self.fileId += 1
275
276 return 1
277 225
278 226 def readNextBlock(self):
279 227
280 228 while True:
281 229 self.flagDiscontinuousBlock = 0
282 if self.flagIsNewFile:
283 if not self.setNextFile():
284 return 0
230 if self.counter_records == self.nrecords:
231 self.setNextFile()
285 232
286 233 self.readBlock()
287 234
288 235 if (self.datatime < datetime.datetime.combine(self.startDate, self.startTime)) or \
289 236 (self.datatime > datetime.datetime.combine(self.endDate, self.endTime)):
290 237 log.warning(
291 238 'Reading Record No. {}/{} -> {} [Skipping]'.format(
292 239 self.counter_records,
293 240 self.nrecords,
294 241 self.datatime.ctime()),
295 242 'MADReader')
296 243 continue
297 244 break
298 245
299 246 log.log(
300 247 'Reading Record No. {}/{} -> {}'.format(
301 248 self.counter_records,
302 249 self.nrecords,
303 250 self.datatime.ctime()),
304 251 'MADReader')
305 252
306 253 return 1
307 254
308 255 def readBlock(self):
309 256 '''
310 257 '''
311 258 dum = []
312 259 if self.ext == '.txt':
313 260 dt = self.data[self.counter_records][:6].astype(int)
314 261 if datetime.datetime(dt[0], dt[1], dt[2], dt[3], dt[4], dt[5]).date() > self.datatime.date():
315 262 self.flagDiscontinuousBlock = 1
316 263 self.datatime = datetime.datetime(dt[0], dt[1], dt[2], dt[3], dt[4], dt[5])
317 264 while True:
318 265 dt = self.data[self.counter_records][:6].astype(int)
319 266 datatime = datetime.datetime(dt[0], dt[1], dt[2], dt[3], dt[4], dt[5])
320 267 if datatime == self.datatime:
321 268 dum.append(self.data[self.counter_records])
322 269 self.counter_records += 1
323 270 if self.counter_records == self.nrecords:
324 self.flagIsNewFile = True
325 271 break
326 272 continue
327 273 self.intervals.add((datatime-self.datatime).seconds)
328 274 break
329 275 elif self.ext == '.hdf5':
330 276 datatime = datetime.datetime.utcfromtimestamp(
331 277 self.times[self.counter_records])
332 278 dum = self.data['Table Layout'][self.data['Table Layout']['recno']==self.counter_records]
333 279 self.intervals.add((datatime-self.datatime).seconds)
334 280 if datatime.date()>self.datatime.date():
335 281 self.flagDiscontinuousBlock = 1
336 282 self.datatime = datatime
337 self.counter_records += 1
338 if self.counter_records == self.nrecords:
339 self.flagIsNewFile = True
283 self.counter_records += 1
340 284
341 285 self.buffer = numpy.array(dum)
342 286 return
343 287
344 288 def set_output(self):
345 289 '''
346 290 Storing data from buffer to dataOut object
347 291 '''
348 292
349 293 parameters = [None for __ in self.parameters]
350 294
351 295 for param, attr in list(self.oneDDict.items()):
352 296 x = self.parameters.index(param.lower())
353 297 setattr(self.dataOut, attr, self.buffer[0][x])
354 298
355 299 for param, value in list(self.twoDDict.items()):
356 300 dummy = numpy.zeros(self.ranges.shape) + numpy.nan
357 301 if self.ext == '.txt':
358 302 x = self.parameters.index(param.lower())
359 303 y = self.parameters.index(self.independentParam.lower())
360 304 ranges = self.buffer[:,y]
361 305 #if self.ranges.size == ranges.size:
362 306 # continue
363 307 index = numpy.where(numpy.in1d(self.ranges, ranges))[0]
364 308 dummy[index] = self.buffer[:,x]
365 309 else:
366 310 ranges = self.buffer[self.independentParam.lower()]
367 311 index = numpy.where(numpy.in1d(self.ranges, ranges))[0]
368 312 dummy[index] = self.buffer[param.lower()]
369 313
370 314 if isinstance(value, str):
371 315 if value not in self.independentParam:
372 316 setattr(self.dataOut, value, dummy.reshape(1,-1))
373 317 elif isinstance(value, list):
374 318 self.output[value[0]][value[1]] = dummy
375 319 parameters[value[1]] = param
376 320 for key, value in list(self.output.items()):
377 321 setattr(self.dataOut, key, numpy.array(value))
378 322
379 323 self.dataOut.parameters = [s for s in parameters if s]
380 324 self.dataOut.heightList = self.ranges
381 325 self.dataOut.utctime = (self.datatime - datetime.datetime(1970, 1, 1)).total_seconds()
382 326 self.dataOut.utctimeInit = self.dataOut.utctime
383 327 self.dataOut.paramInterval = min(self.intervals)
384 328 self.dataOut.useLocalTime = False
385 329 self.dataOut.flagNoData = False
386 330 self.dataOut.nrecords = self.nrecords
387 331 self.dataOut.flagDiscontinuousBlock = self.flagDiscontinuousBlock
388 332
389 333 def getData(self):
390 334 '''
391 335 Storing data from databuffer to dataOut object
392 336 '''
393 if self.flagNoMoreFiles:
394 self.dataOut.flagNoData = True
395 raise schainpy.admin.SchainError('No file left to process')
396 return 0
397 337
398 338 if not self.readNextBlock():
399 339 self.dataOut.flagNoData = True
400 340 return 0
401 341
402 342 self.set_output()
403 343
404 344 return 1
405 345
346 def run(self, **kwargs):
347
348 if not(self.isConfig):
349 self.setup(**kwargs)
350 self.isConfig = True
351
352 self.getData()
353
354 return
355
406 356 @MPDecorator
407 357 class MADWriter(Operation):
408
409 missing = -32767
358 '''Writing module for Madrigal files
359
360 type: external
361
362 Inputs:
363 path path where files will be created
364 oneDDict json of one-dimensional parameters in record where keys
365 are Madrigal codes (integers or mnemonics) and values the corresponding
366 dataOut attribute e.g: {
367 'gdlatr': 'lat',
368 'gdlonr': 'lon',
369 'gdlat2':'lat',
370 'glon2':'lon'}
371 ind2DList list of independent spatial two-dimensional parameters e.g:
372 ['heightList']
373 twoDDict json of two-dimensional parameters in record where keys
374 are Madrigal codes (integers or mnemonics) and values the corresponding
375 dataOut attribute; if it is a multidimensional array, specify it as a tuple
376 ('attr', pos) e.g: {
377 'gdalt': 'heightList',
378 'vn1p2': ('data_output', 0),
379 'vn2p2': ('data_output', 1),
380 'vn3': ('data_output', 2),
381 'snl': ('data_SNR', 'db')
382 }
383 metadata json of madrigal metadata (kinst, kindat, catalog and header)
384 format hdf5, cedar
385 blocks number of blocks per file'''
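# A hypothetical configuration sketch assembled only from the docstring
# examples above (the kindat value and the exact JSON are illustrative, not a
# tested setup):
#   oneDDict = '{"gdlatr": "lat", "gdlonr": "lon"}'
#   twoDDict = '{"gdalt": "heightList", "snl": ["data_SNR", "db"]}'
#   metadata = '{"kinst": 10, "kindat": 1910}'  # kinst 10 -> 'jro' in MNEMONICS
#   format = 'cedar'  # or 'hdf5'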
386
387 __attrs__ = ['path', 'oneDDict', 'ind2DList', 'twoDDict','metadata', 'format', 'blocks']
388 missing = -32767
410 389
411 390 def __init__(self):
412 391
413 392 Operation.__init__(self)
414 393 self.dataOut = Parameters()
415 394 self.counter = 0
416 395 self.path = None
417 396 self.fp = None
418 397
419 def run(self, dataOut, path, oneDDict, independentParam='[]', twoDDict='{}',
398 def run(self, dataOut, path, oneDDict, ind2DList='[]', twoDDict='{}',
420 399 metadata='{}', format='cedar', **kwargs):
421 '''
422 Inputs:
423 path - path where files will be created
424 oneDDict - json of one-dimensional parameters in record where keys
425 are Madrigal codes (integers or mnemonics) and values the corresponding
426 dataOut attribute e.g: {
427 'gdlatr': 'lat',
428 'gdlonr': 'lon',
429 'gdlat2':'lat',
430 'glon2':'lon'}
431 independentParam - list of independent spatial two-dimensional parameters e.g:
432 ['heigthList']
433 twoDDict - json of two-dimensional parameters in record where keys
434 are Madrigal codes (integers or mnemonics) and values the corresponding
435 dataOut attribute if multidimensional array specify as tupple
436 ('attr', pos) e.g: {
437 'gdalt': 'heightList',
438 'vn1p2': ('data_output', 0),
439 'vn2p2': ('data_output', 1),
440 'vn3': ('data_output', 2),
441 'snl': ('data_SNR', 'db')
442 }
443 metadata - json of madrigal metadata (kinst, kindat, catalog and header)
444 '''
400
445 401 if not self.isConfig:
446 self.setup(path, oneDDict, independentParam, twoDDict, metadata, format, **kwargs)
402 self.setup(path, oneDDict, ind2DList, twoDDict, metadata, format, **kwargs)
447 403 self.isConfig = True
448 404
449 405 self.dataOut = dataOut
450 406 self.putData()
451 407 return 1
452 408
453 def setup(self, path, oneDDict, independentParam, twoDDict, metadata, format, **kwargs):
409 def setup(self, path, oneDDict, ind2DList, twoDDict, metadata, format, **kwargs):
454 410 '''
455 411 Configure Operation
456 412 '''
457 413
458 414 self.path = path
459 415 self.blocks = kwargs.get('blocks', None)
460 416 self.counter = 0
461 417 self.oneDDict = load_json(oneDDict)
462 418 self.twoDDict = load_json(twoDDict)
463 self.independentParam = load_json(independentParam)
419 self.ind2DList = load_json(ind2DList)
464 420 meta = load_json(metadata)
465 421 self.kinst = meta.get('kinst')
466 422 self.kindat = meta.get('kindat')
467 423 self.catalog = meta.get('catalog', DEF_CATALOG)
468 424 self.header = meta.get('header', DEF_HEADER)
469 425 if format == 'cedar':
470 426 self.ext = '.dat'
471 427 self.extra_args = {}
472 428 elif format == 'hdf5':
473 429 self.ext = '.hdf5'
474 self.extra_args = {'independentParam': self.independentParam}
430 self.extra_args = {'ind2DList': self.ind2DList}
475 431
476 432 self.keys = [k.lower() for k in self.twoDDict]
477 433 if 'range' in self.keys:
478 434 self.keys.remove('range')
479 435 if 'gdalt' in self.keys:
480 436 self.keys.remove('gdalt')
481 437
482 438 def setFile(self):
483 439 '''
484 440 Create new cedar file object
485 441 '''
486 442
487 443 self.mnemonic = MNEMONICS[self.kinst] #TODO get mnemonic from madrigal
488 444 date = datetime.datetime.utcfromtimestamp(self.dataOut.utctime)
489 445
490 446 filename = '{}{}{}'.format(self.mnemonic,
491 447 date.strftime('%Y%m%d_%H%M%S'),
492 448 self.ext)
493 449
494 450 self.fullname = os.path.join(self.path, filename)
495 451
496 452 if os.path.isfile(self.fullname) :
497 453 log.warning(
498 454 'Destination file {} already exists; it will be deleted.'.format(
499 455 self.fullname),
500 456 'MADWriter')
501 457 os.remove(self.fullname)
502 458
503 459 try:
504 460 log.success(
505 461 'Creating file: {}'.format(self.fullname),
506 462 'MADWriter')
463 if not os.path.exists(self.path):
464 os.makedirs(self.path)
507 465 self.fp = madrigal.cedar.MadrigalCedarFile(self.fullname, True)
508 466 except ValueError as e:
509 467 log.error(
510 468 'Unable to create a cedar object with "madrigal.cedar.MadrigalCedarFile"',
511 469 'MADWriter')
512 470 return
513 471
514 472 return 1
515 473
516 474 def writeBlock(self):
517 475 '''
518 476 Add data records to cedar file taking data from oneDDict and twoDDict
519 477 attributes.
520 478 Allowed parameters in: parcodes.tab
521 479 '''
522 480
523 481 startTime = datetime.datetime.utcfromtimestamp(self.dataOut.utctime)
524 482 endTime = startTime + datetime.timedelta(seconds=self.dataOut.paramInterval)
525 483 heights = self.dataOut.heightList
526 484
527 485 if self.ext == '.dat':
528 486 for key, value in list(self.twoDDict.items()):
529 487 if isinstance(value, str):
530 488 data = getattr(self.dataOut, value)
531 489 invalid = numpy.isnan(data)
532 490 data[invalid] = self.missing
533 491 elif isinstance(value, (tuple, list)):
534 492 attr, key = value
535 493 data = getattr(self.dataOut, attr)
536 494 invalid = numpy.isnan(data)
537 495 data[invalid] = self.missing
538 496
539 497 out = {}
540 498 for key, value in list(self.twoDDict.items()):
541 499 key = key.lower()
542 500 if isinstance(value, str):
543 501 if 'db' in value.lower():
544 502 tmp = getattr(self.dataOut, value.replace('_db', ''))
545 503 SNRavg = numpy.average(tmp, axis=0)
546 504 tmp = 10*numpy.log10(SNRavg)
547 505 else:
548 506 tmp = getattr(self.dataOut, value)
549 out[key] = tmp.flatten()
507 out[key] = tmp.flatten()[:len(heights)]
550 508 elif isinstance(value, (tuple, list)):
551 509 attr, x = value
552 data = getattr(self.dataOut, attr)
553 out[key] = data[int(x)]
510 data = getattr(self.dataOut, attr)
511 out[key] = data[int(x)][:len(heights)]
554 512
555 513 a = numpy.array([out[k] for k in self.keys])
556 514 nrows = numpy.array([numpy.isnan(a[:, x]).all() for x in range(len(heights))])
557 515 index = numpy.where(nrows == False)[0]
558 516
559 517 rec = madrigal.cedar.MadrigalDataRecord(
560 518 self.kinst,
561 519 self.kindat,
562 520 startTime.year,
563 521 startTime.month,
564 522 startTime.day,
565 523 startTime.hour,
566 524 startTime.minute,
567 525 startTime.second,
568 526 startTime.microsecond/10000,
569 527 endTime.year,
570 528 endTime.month,
571 529 endTime.day,
572 530 endTime.hour,
573 531 endTime.minute,
574 532 endTime.second,
575 533 endTime.microsecond/10000,
576 534 list(self.oneDDict.keys()),
577 535 list(self.twoDDict.keys()),
578 536 len(index),
579 537 **self.extra_args
580 538 )
581 539
582 540 # Setting 1d values
583 541 for key in self.oneDDict:
584 542 rec.set1D(key, getattr(self.dataOut, self.oneDDict[key]))
585 543
586 544 # Setting 2d values
587 545 nrec = 0
588 546 for n in index:
589 547 for key in out:
590 548 rec.set2D(key, nrec, out[key][n])
591 549 nrec += 1
592 550
593 551 self.fp.append(rec)
594 552 if self.ext == '.hdf5' and self.counter % 500 == 0 and self.counter > 0:
595 553 self.fp.dump()
596 554 if self.counter % 20 == 0 and self.counter > 0:
597 555 log.log(
598 556 'Writing {} records'.format(
599 557 self.counter),
600 558 'MADWriter')
601 559
602 560 def setHeader(self):
603 561 '''
604 562 Create and add a catalog and header to the cedar file
605 563 '''
606 564
607 565 log.success('Closing file {}'.format(self.fullname), 'MADWriter')
608 566
609 567 if self.ext == '.dat':
610 568 self.fp.write()
611 569 else:
612 570 self.fp.dump()
613 571 self.fp.close()
614 572
615 573 header = madrigal.cedar.CatalogHeaderCreator(self.fullname)
616 574 header.createCatalog(**self.catalog)
617 575 header.createHeader(**self.header)
618 576 header.write()
619 577
620 578 def putData(self):
621 579
622 580 if self.dataOut.flagNoData:
623 581 return 0
624 582
625 583 if self.dataOut.flagDiscontinuousBlock or self.counter == self.blocks:
626 584 if self.counter > 0:
627 585 self.setHeader()
628 586 self.counter = 0
629 587
630 588 if self.counter == 0:
631 589 self.setFile()
632 590
633 591 self.writeBlock()
634 592 self.counter += 1
635 593
636 594 def close(self):
637 595
638 596 if self.counter > 0:
639 597 self.setHeader() No newline at end of file
@@ -1,1544 +1,1435
1 1 import numpy
2 2 import time
3 3 import os
4 4 import h5py
5 5 import re
6 6 import datetime
7 7
8 8 import schainpy.admin
9 9 from schainpy.model.data.jrodata import *
10 10 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
11 11 from schainpy.model.io.jroIO_base import *
12 12 from schainpy.utils import log
13 13
14 14 @MPDecorator
15 15 class ParamReader(JRODataReader,ProcessingUnit):
16 16 '''
17 17 Reads HDF5 format files
18 18 path
19 19 startDate
20 20 endDate
21 21 startTime
22 22 endTime
23 23 '''
24 24
25 25 ext = ".hdf5"
26 26 optchar = "D"
27 27 timezone = None
28 28 startTime = None
29 29 endTime = None
30 30 fileIndex = None
31 31 utcList = None #To select data in the utctime list
32 32 blockList = None #List of blocks to be read from the file
33 33 blocksPerFile = None #Number of blocks to be read
34 34 blockIndex = None
35 35 path = None
36 36 #List of Files
37 37 filenameList = None
38 38 datetimeList = None
39 39 #Hdf5 File
40 40 listMetaname = None
41 41 listMeta = None
42 42 listDataname = None
43 43 listData = None
44 44 listShapes = None
45 45 fp = None
46 46 #dataOut reconstruction
47 47 dataOut = None
48 48
49 49 def __init__(self):#, **kwargs):
50 50 ProcessingUnit.__init__(self) #, **kwargs)
51 51 self.dataOut = Parameters()
52 52 return
53 53
54 54 def setup(self, **kwargs):
55 55
56 56 path = kwargs['path']
57 57 startDate = kwargs['startDate']
58 58 endDate = kwargs['endDate']
59 59 startTime = kwargs['startTime']
60 60 endTime = kwargs['endTime']
61 61 walk = kwargs['walk']
62 62 if 'ext' in kwargs:
63 63 ext = kwargs['ext']
64 64 else:
65 65 ext = '.hdf5'
66 66 if 'timezone' in kwargs:
67 67 self.timezone = kwargs['timezone']
68 68 else:
69 69 self.timezone = 'lt'
70 70
71 71 print("[Reading] Searching files in offline mode ...")
72 72 pathList, filenameList = self.searchFilesOffLine(path, startDate=startDate, endDate=endDate,
73 73 startTime=startTime, endTime=endTime,
74 74 ext=ext, walk=walk)
75 75
76 76 if not(filenameList):
77 77 print("There is no files into the folder: %s"%(path))
78 78 sys.exit(-1)
79 79
80 80 self.fileIndex = -1
81 81 self.startTime = startTime
82 82 self.endTime = endTime
83 83
84 84 self.__readMetadata()
85 85
86 86 self.__setNextFileOffline()
87 87
88 88 return
89 89
90 90 def searchFilesOffLine(self,
91 91 path,
92 92 startDate=None,
93 93 endDate=None,
94 94 startTime=datetime.time(0,0,0),
95 95 endTime=datetime.time(23,59,59),
96 96 ext='.hdf5',
97 97 walk=True):
98 98
99 99 expLabel = ''
100 100 self.filenameList = []
101 101 self.datetimeList = []
102 102
103 103 pathList = []
104 104
105 105 JRODataObj = JRODataReader()
106 106 dateList, pathList = JRODataObj.findDatafiles(path, startDate, endDate, expLabel, ext, walk, include_path=True)
107 107
108 108 if dateList == []:
109 109 print("[Reading] No *%s files in %s from %s to %s)"%(ext, path,
110 110 datetime.datetime.combine(startDate,startTime).ctime(),
111 111 datetime.datetime.combine(endDate,endTime).ctime()))
112 112
113 113 return None, None
114 114
115 115 if len(dateList) > 1:
116 116 print("[Reading] %d days were found in date range: %s - %s" %(len(dateList), startDate, endDate))
117 117 else:
118 118 print("[Reading] data was found for the date %s" %(dateList[0]))
119 119
120 120 filenameList = []
121 121 datetimeList = []
122 122
123 123 #----------------------------------------------------------------------------------
124 124
125 125 for thisPath in pathList:
126 126
127 127 fileList = glob.glob1(thisPath, "*%s" %ext)
128 128 fileList.sort()
129 129
130 130 for file in fileList:
131 131
132 132 filename = os.path.join(thisPath,file)
133 133
134 134 if not isFileInDateRange(filename, startDate, endDate):
135 135 continue
136 136
137 137 thisDatetime = self.__isFileInTimeRange(filename, startDate, endDate, startTime, endTime)
138 138
139 139 if not(thisDatetime):
140 140 continue
141 141
142 142 filenameList.append(filename)
143 143 datetimeList.append(thisDatetime)
144 144
145 145 if not(filenameList):
146 146 print("[Reading] Any file was found int time range %s - %s" %(datetime.datetime.combine(startDate,startTime).ctime(), datetime.datetime.combine(endDate,endTime).ctime()))
147 147 return None, None
148 148
149 149 print("[Reading] %d file(s) was(were) found in time range: %s - %s" %(len(filenameList), startTime, endTime))
150 150 print()
151 151
152 152 self.filenameList = filenameList
153 153 self.datetimeList = datetimeList
154 154
155 155 return pathList, filenameList
156 156
157 157 def __isFileInTimeRange(self,filename, startDate, endDate, startTime, endTime):
158 158
159 159 """
160 160 Returns 1 if the data file falls within the specified time range.
161 161
162 162 Inputs:
163 163 filename : full name of the data file in Jicamarca format (.r)
164 164 startDate : start date of the selected range, as datetime.date
165 165 endDate : end date of the selected range, as datetime.date
166 166 startTime : start time of the selected range, as datetime.time
167 167 endTime : end time of the selected range, as datetime.time
168 168
169 169 Return:
170 170 Boolean : Returns True if the data file contains data within the
171 171 specified date range, otherwise returns False.
172 172
173 173 Exceptions:
174 174 If the file does not exist or cannot be opened
175 175 If the header cannot be read.
176 176
177 177 """
178 178
179 179 try:
180 180 fp = h5py.File(filename,'r')
181 181 grp1 = fp['Data']
182 182
183 183 except IOError:
184 184 traceback.print_exc()
185 185 raise IOError("The file %s can't be opened" %(filename))
186 186
187 187 #In case has utctime attribute
188 188 grp2 = grp1['utctime']
189 189 # thisUtcTime = grp2.value[0] - 5*3600 #To convert to local time
190 190 thisUtcTime = grp2.value[0]
191 191
192 192 fp.close()
193 193
194 194 if self.timezone == 'lt':
195 195 thisUtcTime -= 5*3600
196 196
197 197 thisDatetime = datetime.datetime.fromtimestamp(thisUtcTime[0] + 5*3600)
198 198 thisDate = thisDatetime.date()
199 199 thisTime = thisDatetime.time()
200 200
201 201 startUtcTime = (datetime.datetime.combine(thisDate,startTime)- datetime.datetime(1970, 1, 1)).total_seconds()
202 202 endUtcTime = (datetime.datetime.combine(thisDate,endTime)- datetime.datetime(1970, 1, 1)).total_seconds()
203 203
204 204 #General case
205 205 # o>>>>>>>>>>>>>><<<<<<<<<<<<<<o
206 206 #-----------o----------------------------o-----------
207 207 # startTime endTime
208 208
209 209 if endTime >= startTime:
210 210 thisUtcLog = numpy.logical_and(thisUtcTime > startUtcTime, thisUtcTime < endUtcTime)
211 211 if numpy.any(thisUtcLog): #If there is one block between the hours mentioned
212 212 return thisDatetime
213 213 return None
214 214
215 215 #If endTime < startTime then endTime belongs to the next day
216 216 #<<<<<<<<<<<o o>>>>>>>>>>>
217 217 #-----------o----------------------------o-----------
218 218 # endTime startTime
219 219
220 220 if (thisDate == startDate) and numpy.all(thisUtcTime < startUtcTime):
221 221 return None
222 222
223 223 if (thisDate == endDate) and numpy.all(thisUtcTime > endUtcTime):
224 224 return None
225 225
226 226 if numpy.all(thisUtcTime < startUtcTime) and numpy.all(thisUtcTime > endUtcTime):
227 227 return None
228 228
229 229 return thisDatetime
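# Worked example of the wrap-around branch above (illustrative times): with
# startTime=23:00 and endTime=02:00 we have endTime < startTime, so a file
# whose samples all lie in the excluded gap 02:00-23:00 (before startTime and
# after endTime) returns None; any other file returns thisDatetime.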
230 230
231 231 def __setNextFileOffline(self):
232 232
233 233 self.fileIndex += 1
234 234 idFile = self.fileIndex
235 235
236 236 if not(idFile < len(self.filenameList)):
237 237 raise schainpy.admin.SchainError("No more Files")
238 238 return 0
239 239
240 240 filename = self.filenameList[idFile]
241 241 filePointer = h5py.File(filename,'r')
242 242 self.filename = filename
243 243 self.fp = filePointer
244 244
245 245 print("Setting the file: %s"%self.filename)
246 246
247 247 self.__setBlockList()
248 248 self.__readData()
249 249 self.blockIndex = 0
250 250 return 1
251 251
252 252 def __setBlockList(self):
253 253 '''
254 254 Selects the data within the times defined
255 255
256 256 self.fp
257 257 self.startTime
258 258 self.endTime
259 259
260 260 self.blockList
261 261 self.blocksPerFile
262 262
263 263 '''
264 264 fp = self.fp
265 265 startTime = self.startTime
266 266 endTime = self.endTime
267 267
268 268 grp = fp['Data']
269 269 thisUtcTime = grp['utctime'].value.astype(numpy.float)[0]
270 270
271 271 #FIXME: hard-coded UTC-5 offset (local time assumed to be UTC-5)
272 272 if self.timezone == 'lt':
273 273 thisUtcTime -= 5*3600
274 274
275 275 thisDatetime = datetime.datetime.fromtimestamp(thisUtcTime[0] + 5*3600)
276 276
277 277 thisDate = thisDatetime.date()
278 278 thisTime = thisDatetime.time()
279 279
280 280 startUtcTime = (datetime.datetime.combine(thisDate,startTime) - datetime.datetime(1970, 1, 1)).total_seconds()
281 281 endUtcTime = (datetime.datetime.combine(thisDate,endTime) - datetime.datetime(1970, 1, 1)).total_seconds()
282 282
283 283 ind = numpy.where(numpy.logical_and(thisUtcTime >= startUtcTime, thisUtcTime < endUtcTime))[0]
284 284
285 285 self.blockList = ind
286 286 self.blocksPerFile = len(ind)
287 287
288 288 return
289 289
290 290 def __readMetadata(self):
291 291 '''
292 292 Reads Metadata
293 293
294 294 self.pathMeta
295 295 self.listShapes
296 296 self.listMetaname
297 297 self.listMeta
298 298
299 299 '''
300 300
301 301 filename = self.filenameList[0]
302 302 fp = h5py.File(filename,'r')
303 303 gp = fp['Metadata']
304 304
305 305 listMetaname = []
306 306 listMetadata = []
307 307 for item in list(gp.items()):
308 308 name = item[0]
309 309
310 310 if name=='array dimensions':
311 311 table = gp[name][:]
312 312 listShapes = {}
313 313 for shapes in table:
314 314 listShapes[shapes[0]] = numpy.array([shapes[1],shapes[2],shapes[3],shapes[4],shapes[5]])
315 315 else:
316 316 data = gp[name].value
317 317 listMetaname.append(name)
318 318 listMetadata.append(data)
319 319
320 320 self.listShapes = listShapes
321 321 self.listMetaname = listMetaname
322 322 self.listMeta = listMetadata
323 323
324 324 fp.close()
325 325 return
326 326
327 327 def __readData(self):
328 328 grp = self.fp['Data']
329 329 listdataname = []
330 330 listdata = []
331 331
332 332 for item in list(grp.items()):
333 333 name = item[0]
334 334 listdataname.append(name)
335 335
336 336 array = self.__setDataArray(grp[name],self.listShapes[name])
337 337 listdata.append(array)
338 338
339 339 self.listDataname = listdataname
340 340 self.listData = listdata
341 341 return
342 342
343 343 def __setDataArray(self, dataset, shapes):
344 344
345 345 nDims = shapes[0]
346 346 nDim2 = shapes[1] #Dimension 0
347 347 nDim1 = shapes[2] #Dimension 1, number of Points or Parameters
348 348 nDim0 = shapes[3] #Dimension 2, number of samples or ranges
349 349 mode = shapes[4] #Mode of storing
350 350 blockList = self.blockList
351 351 blocksPerFile = self.blocksPerFile
352 352
353 353 #Depending on what mode the data was stored
354 354 if mode == 0: #Divided in channels
355 355 arrayData = dataset.value.astype(numpy.float)[0][blockList]
356 356 if mode == 1: #Divided in parameter
357 357 strds = 'table'
358 358 nDatas = nDim1
359 359 newShapes = (blocksPerFile,nDim2,nDim0)
360 360 elif mode==2: #Concatenated in a table
361 361 strds = 'table0'
362 362 arrayData = dataset[strds].value
363 363 #Selecting part of the dataset
364 364 utctime = arrayData[:,0]
365 365 u, indices = numpy.unique(utctime, return_index=True)
366 366
367 367 if blockList.size != indices.size:
368 368 indMin = indices[blockList[0]]
369 369 if blockList[1] + 1 >= indices.size:
370 370 arrayData = arrayData[indMin:,:]
371 371 else:
372 372 indMax = indices[blockList[1] + 1]
373 373 arrayData = arrayData[indMin:indMax,:]
374 374 return arrayData
375 375
376 376 # One dimension
377 377 if nDims == 0:
378 378 arrayData = dataset.value.astype(numpy.float)[0][blockList]
379 379
380 380 # Two dimensions
381 381 elif nDims == 2:
382 382 arrayData = numpy.zeros((blocksPerFile,nDim1,nDim0))
383 383 newShapes = (blocksPerFile,nDim0)
384 384 nDatas = nDim1
385 385
386 386 for i in range(nDatas):
387 387 data = dataset[strds + str(i)].value
388 388 arrayData[:,i,:] = data[blockList,:]
389 389
390 390 # Three dimensions
391 391 else:
392 392 arrayData = numpy.zeros((blocksPerFile,nDim2,nDim1,nDim0))
393 393 for i in range(nDatas):
394 394
395 395 data = dataset[strds + str(i)].value
396 396
397 397 for b in range(blockList.size):
398 398 arrayData[b,:,i,:] = data[:,:,blockList[b]]
399 399
400 400 return arrayData
401 401
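        # Note: the `mode` entry of the shapes table records how the writer laid
        # each variable out on disk: mode 0 = divided in channels (a scalar series
        # indexed by block), mode 1 = divided in parameters (datasets named
        # 'table0', 'table1', ...), mode 2 = concatenated in a single 'table0'
        # whose first column is utctime.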
402 402 def __setDataOut(self):
403 403 listMeta = self.listMeta
404 404 listMetaname = self.listMetaname
405 405 listDataname = self.listDataname
406 406 listData = self.listData
407 407 listShapes = self.listShapes
408 408
409 409 blockIndex = self.blockIndex
410 410 # blockList = self.blockList
411 411
412 412 for i in range(len(listMeta)):
413 413 setattr(self.dataOut,listMetaname[i],listMeta[i])
414 414
415 415 for j in range(len(listData)):
416 416 nShapes = listShapes[listDataname[j]][0]
417 417 mode = listShapes[listDataname[j]][4]
418 418 if nShapes == 1:
419 419 setattr(self.dataOut,listDataname[j],listData[j][blockIndex])
420 420 elif nShapes > 1:
421 421 setattr(self.dataOut,listDataname[j],listData[j][blockIndex,:])
422 422             elif mode == 0:
423 423 setattr(self.dataOut,listDataname[j],listData[j][blockIndex])
424 424 #Mode Meteors
425 425             elif mode == 2:
426 426 selectedData = self.__selectDataMode2(listData[j], blockIndex)
427 427 setattr(self.dataOut, listDataname[j], selectedData)
428 428 return
429 429
430 430 def __selectDataMode2(self, data, blockIndex):
431 431 utctime = data[:,0]
432 432 aux, indices = numpy.unique(utctime, return_inverse=True)
433 433 selInd = numpy.where(indices == blockIndex)[0]
434 434 selData = data[selInd,:]
435 435
436 436 return selData
437 437
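        # Standalone sketch (illustrative, not from this changeset) of the mode-2
        # selection above: rows sharing a utctime belong to one block, and
        # return_inverse maps every row to its block index.
        #
        # >>> import numpy
        # >>> data = numpy.array([[100., 7.], [100., 8.], [200., 9.]])  # col 0 = utctime
        # >>> _, indices = numpy.unique(data[:, 0], return_inverse=True)
        # >>> data[numpy.where(indices == 0)[0], :]     # rows of block 0 (utctime 100.)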
438 438 def getData(self):
439 439
440 440 if self.blockIndex==self.blocksPerFile:
441 441 if not( self.__setNextFileOffline() ):
442 442 self.dataOut.flagNoData = True
443 443 return 0
444 444
445 445 self.__setDataOut()
446 446 self.dataOut.flagNoData = False
447 447
448 448 self.blockIndex += 1
449 449
450 450 return
451 451
452 452 def run(self, **kwargs):
453 453
454 454 if not(self.isConfig):
455 455 self.setup(**kwargs)
456 456 self.isConfig = True
457 457
458 458 self.getData()
459 459
460 460 return
461 461
462 462 @MPDecorator
463 463 class ParamWriter(Operation):
464 464 '''
465 465     HDF5 Writer, stores parameter data in HDF5 format files
466 466
467 467     path: path where the files will be stored
468 468     blocksPerFile: number of blocks that will be saved per HDF5 file
469 469     mode: selects the data stacking mode: '0' channels, '1' parameters, '2' table (for meteors)
470 470     metadataList: list of attributes that will be stored as metadata
471 471     dataList: list of attributes that will be stored as data
472 472 '''
473 473
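    # Hedged example of typical setup arguments (the attribute names are
    # illustrative, not taken from this changeset):
    #
    # >>> kwargs = dict(path='/tmp/param_out', blocksPerFile=10, mode=1,
    # ...               metadataList=['heightList', 'timeZone'],
    # ...               dataList=['data_param', 'utctime'])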
474 474 ext = ".hdf5"
475 475 optchar = "D"
476 476 metaoptchar = "M"
477 477 metaFile = None
478 478 filename = None
479 479 path = None
480 480 setFile = None
481 481 fp = None
482 482 grp = None
483 483 ds = None
484 484 firsttime = True
485 485 #Configurations
486 486 blocksPerFile = None
487 487 blockIndex = None
488 488 dataOut = None
489 489 #Data Arrays
490 490 dataList = None
491 491 metadataList = None
492 492 dsList = None #List of dictionaries with dataset properties
493 493 tableDim = None
494 494 dtype = [('arrayName', 'S20'),('nDimensions', 'i'), ('dim2', 'i'), ('dim1', 'i'),('dim0', 'i'),('mode', 'b')]
495 495 currentDay = None
496 496 lastTime = None
497 497 setType = None
498 498
499 499 def __init__(self):
500 500
501 501 Operation.__init__(self)
502 502 return
503 503
504 504 def setup(self, dataOut, path=None, blocksPerFile=10, metadataList=None, dataList=None, mode=None, setType=None):
505 505 self.path = path
506 506 self.blocksPerFile = blocksPerFile
507 507 self.metadataList = metadataList
508 508 self.dataList = dataList
509 509 self.dataOut = dataOut
510 510 self.mode = mode
511 511 if self.mode is not None:
512 512 self.mode = numpy.zeros(len(self.dataList)) + mode
513 513 else:
514 514 self.mode = numpy.ones(len(self.dataList))
515 515
516 516 self.setType = setType
517 517
518 518 arrayDim = numpy.zeros((len(self.dataList),5))
519 519
520 520 #Table dimensions
521 521 dtype0 = self.dtype
522 522 tableList = []
523 523
524 524 #Dictionary and list of tables
525 525 dsList = []
526 526
527 527 for i in range(len(self.dataList)):
528 528 dsDict = {}
529 529 dataAux = getattr(self.dataOut, self.dataList[i])
530 530 dsDict['variable'] = self.dataList[i]
531 531 #--------------------- Conditionals ------------------------
532 532 #There is no data
533 533
534 534 if dataAux is None:
535 535
536 536 return 0
537 537
538 538 if isinstance(dataAux, (int, float, numpy.integer, numpy.float)):
539 539 dsDict['mode'] = 0
540 540 dsDict['nDim'] = 0
541 541 arrayDim[i,0] = 0
542 542 dsList.append(dsDict)
543 543
544 544 #Mode 2: meteors
545 545 elif self.mode[i] == 2:
546 546 dsDict['dsName'] = 'table0'
547 547 dsDict['mode'] = 2 # Mode meteors
548 548 dsDict['shape'] = dataAux.shape[-1]
549 549 dsDict['nDim'] = 0
550 550 dsDict['dsNumber'] = 1
551 551 arrayDim[i,3] = dataAux.shape[-1]
552 552 arrayDim[i,4] = self.mode[i] #Mode the data was stored
553 553 dsList.append(dsDict)
554 554
555 555 #Mode 1
556 556 else:
557 557 arrayDim0 = dataAux.shape #Data dimensions
558 558 arrayDim[i,0] = len(arrayDim0) #Number of array dimensions
559 559 arrayDim[i,4] = self.mode[i] #Mode the data was stored
560 560 strtable = 'table'
561 561 dsDict['mode'] = 1 # Mode parameters
562 562
563 563 # Three-dimension arrays
564 564 if len(arrayDim0) == 3:
565 565 arrayDim[i,1:-1] = numpy.array(arrayDim0)
566 566 nTables = int(arrayDim[i,2])
567 567 dsDict['dsNumber'] = nTables
568 568 dsDict['shape'] = arrayDim[i,2:4]
569 569 dsDict['nDim'] = 3
570 570
571 571 for j in range(nTables):
572 572 dsDict = dsDict.copy()
573 573 dsDict['dsName'] = strtable + str(j)
574 574 dsList.append(dsDict)
575 575
576 576 # Two-dimension arrays
577 577 elif len(arrayDim0) == 2:
578 578 arrayDim[i,2:-1] = numpy.array(arrayDim0)
579 579 nTables = int(arrayDim[i,2])
580 580 dsDict['dsNumber'] = nTables
581 581 dsDict['shape'] = arrayDim[i,3]
582 582 dsDict['nDim'] = 2
583 583
584 584 for j in range(nTables):
585 585 dsDict = dsDict.copy()
586 586 dsDict['dsName'] = strtable + str(j)
587 587 dsList.append(dsDict)
588 588
589 589 # One-dimension arrays
590 590 elif len(arrayDim0) == 1:
591 591 arrayDim[i,3] = arrayDim0[0]
592 592 dsDict['shape'] = arrayDim0[0]
593 593 dsDict['dsNumber'] = 1
594 594 dsDict['dsName'] = strtable + str(0)
595 595 dsDict['nDim'] = 1
596 596 dsList.append(dsDict)
597 597
598 598 table = numpy.array((self.dataList[i],) + tuple(arrayDim[i,:]),dtype = dtype0)
599 599 tableList.append(table)
600 600
601 601 self.dsList = dsList
602 602 self.tableDim = numpy.array(tableList, dtype = dtype0)
603 603 self.blockIndex = 0
604 604 timeTuple = time.localtime(dataOut.utctime)
605 605 self.currentDay = timeTuple.tm_yday
606 606
607 607 def putMetadata(self):
608 608
609 609 fp = self.createMetadataFile()
610 610 self.writeMetadata(fp)
611 611 fp.close()
612 612 return
613 613
614 614 def createMetadataFile(self):
615 615 ext = self.ext
616 616 path = self.path
617 617 setFile = self.setFile
618 618
619 619 timeTuple = time.localtime(self.dataOut.utctime)
620 620
621 621 subfolder = ''
622 622 fullpath = os.path.join( path, subfolder )
623 623
624 624 if not( os.path.exists(fullpath) ):
625 625 os.mkdir(fullpath)
626 626             setFile = -1 #initialize the set counter
627 627
628 628 subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year,timeTuple.tm_yday)
629 629 fullpath = os.path.join( path, subfolder )
630 630
631 631 if not( os.path.exists(fullpath) ):
632 632 os.mkdir(fullpath)
633 633                 setFile = -1 #initialize the set counter
634 634
635 635 else:
636 636 filesList = os.listdir( fullpath )
637 637 filesList = sorted( filesList, key=str.lower )
638 638 if len( filesList ) > 0:
639 639 filesList = [k for k in filesList if k.startswith(self.metaoptchar)]
640 640 filen = filesList[-1]
641 641                     # the filename must have the following format
642 642 # 0 1234 567 89A BCDE (hex)
643 643 # x YYYY DDD SSS .ext
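                    # e.g. 'M2015254003.hdf5' -> metadata file, year 2015, day-of-year 254, set 003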
644 644 if isNumber( filen[8:11] ):
645 645                         setFile = int( filen[8:11] ) #initialize the set counter from the last file's set number
646 646 else:
647 647 setFile = -1
648 648 else:
649 649                     setFile = -1 #initialize the set counter
650 650
651 651 if self.setType is None:
652 652 setFile += 1
653 653 file = '%s%4.4d%3.3d%03d%s' % (self.metaoptchar,
654 654 timeTuple.tm_year,
655 655 timeTuple.tm_yday,
656 656 setFile,
657 657 ext )
658 658 else:
659 659 setFile = timeTuple.tm_hour*60+timeTuple.tm_min
660 660 file = '%s%4.4d%3.3d%04d%s' % (self.metaoptchar,
661 661 timeTuple.tm_year,
662 662 timeTuple.tm_yday,
663 663 setFile,
664 664 ext )
665 665
666 666 filename = os.path.join( path, subfolder, file )
667 667 self.metaFile = file
668 668 #Setting HDF5 File
669 669 fp = h5py.File(filename,'w')
670 670
671 671 return fp
672 672
673 673 def writeMetadata(self, fp):
674 674
675 675 grp = fp.create_group("Metadata")
676 676 grp.create_dataset('array dimensions', data = self.tableDim, dtype = self.dtype)
677 677
678 678 for i in range(len(self.metadataList)):
679 679 grp.create_dataset(self.metadataList[i], data=getattr(self.dataOut, self.metadataList[i]))
680 680 return
681 681
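    # Sketch of the 'array dimensions' table written above: one structured row
    # per variable, using the dtype declared at class level (values illustrative).
    #
    # >>> import numpy
    # >>> dtype = [('arrayName', 'S20'), ('nDimensions', 'i'), ('dim2', 'i'),
    # ...          ('dim1', 'i'), ('dim0', 'i'), ('mode', 'b')]
    # >>> numpy.array([(b'data_param', 3, 2, 4, 100, 1)], dtype=dtype)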
682 682 def timeFlag(self):
683 683 currentTime = self.dataOut.utctime
684 684
685 685 if self.lastTime is None:
686 686 self.lastTime = currentTime
687 687
688 688 #Day
689 689 timeTuple = time.localtime(currentTime)
690 690 dataDay = timeTuple.tm_yday
691 691
692 692 #Time
693 693 timeDiff = currentTime - self.lastTime
694 694
695 695         #If the day changed or if the gap between consecutive samples exceeds the 3-hour limit
696 696 if dataDay != self.currentDay:
697 697 self.currentDay = dataDay
698 698 return True
699 699 elif timeDiff > 3*60*60:
700 700 self.lastTime = currentTime
701 701 return True
702 702 else:
703 703 self.lastTime = currentTime
704 704 return False
705 705
706 706 def setNextFile(self):
707 707
708 708 ext = self.ext
709 709 path = self.path
710 710 setFile = self.setFile
711 711 mode = self.mode
712 712
713 713 timeTuple = time.localtime(self.dataOut.utctime)
714 714 subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year,timeTuple.tm_yday)
715 715
716 716 fullpath = os.path.join( path, subfolder )
717 717
718 718 if os.path.exists(fullpath):
719 719 filesList = os.listdir( fullpath )
720 720 filesList = [k for k in filesList if 'M' in k]
721 721 if len( filesList ) > 0:
722 722 filesList = sorted( filesList, key=str.lower )
723 723 filen = filesList[-1]
724 724                 # the filename must have the following format
725 725 # 0 1234 567 89A BCDE (hex)
726 726 # x YYYY DDD SSS .ext
727 727 if isNumber( filen[8:11] ):
728 728                     setFile = int( filen[8:11] ) #initialize the set counter from the last file's set number
729 729 else:
730 730 setFile = -1
731 731 else:
732 732                 setFile = -1 #initialize the set counter
733 733 else:
734 734 os.makedirs(fullpath)
735 735             setFile = -1 #initialize the set counter
736 736
737 737 if self.setType is None:
738 738 setFile += 1
739 739 file = '%s%4.4d%3.3d%03d%s' % (self.optchar,
740 740 timeTuple.tm_year,
741 741 timeTuple.tm_yday,
742 742 setFile,
743 743 ext )
744 744 else:
745 745 setFile = timeTuple.tm_hour*60+timeTuple.tm_min
746 746 file = '%s%4.4d%3.3d%04d%s' % (self.optchar,
747 747 timeTuple.tm_year,
748 748 timeTuple.tm_yday,
749 749 setFile,
750 750 ext )
751 751
752 752 filename = os.path.join( path, subfolder, file )
753 753
754 754 #Setting HDF5 File
755 755 fp = h5py.File(filename,'w')
756 756 #write metadata
757 757 self.writeMetadata(fp)
758 758 #Write data
759 759 grp = fp.create_group("Data")
760 760 ds = []
761 761 data = []
762 762 dsList = self.dsList
763 763 i = 0
764 764 while i < len(dsList):
765 765 dsInfo = dsList[i]
766 766 #One-dimension data
767 767 if dsInfo['mode'] == 0:
768 768 ds0 = grp.create_dataset(dsInfo['variable'], (1,1), maxshape=(1,self.blocksPerFile) , chunks = True, dtype=numpy.float64)
769 769 ds.append(ds0)
770 770 data.append([])
771 771 i += 1
772 772 continue
773 773
774 774 elif dsInfo['mode'] == 2:
775 775 grp0 = grp.create_group(dsInfo['variable'])
776 776 ds0 = grp0.create_dataset(dsInfo['dsName'], (1,dsInfo['shape']), data = numpy.zeros((1,dsInfo['shape'])) , maxshape=(None,dsInfo['shape']), chunks=True)
777 777 ds.append(ds0)
778 778 data.append([])
779 779 i += 1
780 780 continue
781 781
782 782 elif dsInfo['mode'] == 1:
783 783 grp0 = grp.create_group(dsInfo['variable'])
784 784
785 785 for j in range(dsInfo['dsNumber']):
786 786 dsInfo = dsList[i]
787 787 tableName = dsInfo['dsName']
788 788
789 789
790 790 if dsInfo['nDim'] == 3:
791 791 shape = dsInfo['shape'].astype(int)
792 792 ds0 = grp0.create_dataset(tableName, (shape[0],shape[1],1) , data = numpy.zeros((shape[0],shape[1],1)), maxshape = (None,shape[1],None), chunks=True)
793 793 else:
794 794 shape = int(dsInfo['shape'])
795 795 ds0 = grp0.create_dataset(tableName, (1,shape), data = numpy.zeros((1,shape)) , maxshape=(None,shape), chunks=True)
796 796
797 797 ds.append(ds0)
798 798 data.append([])
799 799 i += 1
800 800
801 801 fp.flush()
802 802 fp.close()
803 803
804 804 log.log('creating file: {}'.format(filename), 'Writing')
805 805 self.filename = filename
806 806 self.ds = ds
807 807 self.data = data
808 808 self.firsttime = True
809 809 self.blockIndex = 0
810 810 return
811 811
812 812 def putData(self):
813 813
814 814 if self.blockIndex == self.blocksPerFile or self.timeFlag():
815 815 self.setNextFile()
816 816
817 817 self.readBlock()
818 818 self.setBlock() #Prepare data to be written
819 819 self.writeBlock() #Write data
820 820
821 821 return
822 822
823 823 def readBlock(self):
824 824
825 825 '''
826 826 data Array configured
827 827
828 828
829 829 self.data
830 830 '''
831 831 dsList = self.dsList
832 832 ds = self.ds
833 833 #Setting HDF5 File
834 834 fp = h5py.File(self.filename,'r+')
835 835 grp = fp["Data"]
836 836 ind = 0
837 837
838 838 while ind < len(dsList):
839 839 dsInfo = dsList[ind]
840 840
841 841 if dsInfo['mode'] == 0:
842 842 ds0 = grp[dsInfo['variable']]
843 843 ds[ind] = ds0
844 844 ind += 1
845 845 else:
846 846
847 847 grp0 = grp[dsInfo['variable']]
848 848
849 849 for j in range(dsInfo['dsNumber']):
850 850 dsInfo = dsList[ind]
851 851 ds0 = grp0[dsInfo['dsName']]
852 852 ds[ind] = ds0
853 853 ind += 1
854 854
855 855 self.fp = fp
856 856 self.grp = grp
857 857 self.ds = ds
858 858
859 859 return
860 860
861 861 def setBlock(self):
862 862 '''
863 863 data Array configured
864 864
865 865
866 866 self.data
867 867 '''
868 868 #Creating Arrays
869 869 dsList = self.dsList
870 870 data = self.data
871 871 ind = 0
872 872
873 873 while ind < len(dsList):
874 874 dsInfo = dsList[ind]
875 875 dataAux = getattr(self.dataOut, dsInfo['variable'])
876 876
877 877 mode = dsInfo['mode']
878 878 nDim = dsInfo['nDim']
879 879
880 880 if mode == 0 or mode == 2 or nDim == 1:
881 881 data[ind] = dataAux
882 882 ind += 1
883 883 # elif nDim == 1:
884 884 # data[ind] = numpy.reshape(dataAux,(numpy.size(dataAux),1))
885 885 # ind += 1
886 886 elif nDim == 2:
887 887 for j in range(dsInfo['dsNumber']):
888 888 data[ind] = dataAux[j,:]
889 889 ind += 1
890 890 elif nDim == 3:
891 891 for j in range(dsInfo['dsNumber']):
892 892 data[ind] = dataAux[:,j,:]
893 893 ind += 1
894 894
895 895 self.data = data
896 896 return
897 897
898 898 def writeBlock(self):
899 899 '''
900 900 Saves the block in the HDF5 file
901 901 '''
902 902 dsList = self.dsList
903 903
904 904 for i in range(len(self.ds)):
905 905 dsInfo = dsList[i]
906 906 nDim = dsInfo['nDim']
907 907 mode = dsInfo['mode']
908 908
909 909 # First time
910 910 if self.firsttime:
911 911 if type(self.data[i]) == numpy.ndarray:
912 912
913 913 if nDim == 3:
914 914 self.data[i] = self.data[i].reshape((self.data[i].shape[0],self.data[i].shape[1],1))
915 915 self.ds[i].resize(self.data[i].shape)
916 916 if mode == 2:
917 917 self.ds[i].resize(self.data[i].shape)
918 918 self.ds[i][:] = self.data[i]
919 919 else:
920 920
921 921 # From second time
922 922 # Meteors!
923 923 if mode == 2:
924 924 dataShape = self.data[i].shape
925 925 dsShape = self.ds[i].shape
926 926 self.ds[i].resize((self.ds[i].shape[0] + dataShape[0],self.ds[i].shape[1]))
927 927 self.ds[i][dsShape[0]:,:] = self.data[i]
928 928 # No dimension
929 929 elif mode == 0:
930 930 self.ds[i].resize((self.ds[i].shape[0], self.ds[i].shape[1] + 1))
931 931 self.ds[i][0,-1] = self.data[i]
932 932 # One dimension
933 933 elif nDim == 1:
934 934 self.ds[i].resize((self.ds[i].shape[0] + 1, self.ds[i].shape[1]))
935 935 self.ds[i][-1,:] = self.data[i]
936 936                 # Two dimensions
937 937 elif nDim == 2:
938 938 self.ds[i].resize((self.ds[i].shape[0] + 1,self.ds[i].shape[1]))
939 939 self.ds[i][self.blockIndex,:] = self.data[i]
940 940 # Three dimensions
941 941 elif nDim == 3:
942 942 self.ds[i].resize((self.ds[i].shape[0],self.ds[i].shape[1],self.ds[i].shape[2]+1))
943 943 self.ds[i][:,:,-1] = self.data[i]
944 944
945 945 self.firsttime = False
946 946 self.blockIndex += 1
947 947
948 948 #Close to save changes
949 949 self.fp.flush()
950 950 self.fp.close()
951 951 return
952 952
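    # Standalone sketch of the grow-and-append pattern used in writeBlock(), for
    # a dataset created with maxshape=(None, n) as in setNextFile(); the path
    # and values are illustrative:
    #
    # >>> import h5py, numpy
    # >>> fp = h5py.File('/tmp/example.hdf5', 'w')
    # >>> ds = fp.create_dataset('table0', (1, 4), maxshape=(None, 4), chunks=True)
    # >>> ds[0, :] = numpy.arange(4)
    # >>> ds.resize((ds.shape[0] + 1, 4))     # make room for one more block
    # >>> ds[-1, :] = numpy.arange(4) + 10
    # >>> fp.close()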
953 953 def run(self, dataOut, path, blocksPerFile=10, metadataList=None, dataList=None, mode=None, setType=None):
954 954
955 955 self.dataOut = dataOut
956 956 if not(self.isConfig):
957 957 self.setup(dataOut, path=path, blocksPerFile=blocksPerFile,
958 958 metadataList=metadataList, dataList=dataList, mode=mode,
959 959 setType=setType)
960 960
961 961 self.isConfig = True
962 962 self.setNextFile()
963 963
964 964 self.putData()
965 965 return
966 966
967 967
968 968 @MPDecorator
969 class ParameterReader(JRODataReader,ProcessingUnit):
969 class ParameterReader(Reader, ProcessingUnit):
970 970 '''
971 971 Reads HDF5 format files
972 972 '''
973 973
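    # Expected file layout (sketch): a 'Metadata' group holding a 'variables'
    # table of (name, nDim) rows plus one dataset per metadata attribute, and a
    # 'Data' group with flat datasets or per-channel 'tableNN' subdatasets. The
    # filename below is illustrative.
    #
    # >>> import h5py
    # >>> with h5py.File('D2015254001.hdf5', 'r') as fp:
    # ...     print(list(fp['Metadata'].keys()))
    # ...     print(fp['Data']['utctime'][:3])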
974 ext = ".hdf5"
975 optchar = "D"
976 timezone = None
977 startTime = None
978 endTime = None
979 fileIndex = None
980 blockList = None #List to blocks to be read from the file
981 blocksPerFile = None #Number of blocks to be read
982 blockIndex = None
983 path = None
984 #List of Files
985 filenameList = None
986 datetimeList = None
987 #Hdf5 File
988 listMetaname = None
989 listMeta = None
990 listDataname = None
991 listData = None
992 listShapes = None
993 fp = None
994 #dataOut reconstruction
995 dataOut = None
996
997 974 def __init__(self):
998 975 ProcessingUnit.__init__(self)
999 976 self.dataOut = Parameters()
1000 return
977 self.ext = ".hdf5"
978 self.optchar = "D"
979 self.timezone = "lt"
980 self.listMetaname = []
981 self.listMeta = []
982 self.listDataname = []
983 self.listData = []
984 self.listShapes = []
985 self.open_file = h5py.File
986 self.open_mode = 'r'
987 self.metadata = False
988 self.filefmt = "*%Y%j***"
989 self.folderfmt = "*%Y%j"
1001 990
1002 991 def setup(self, **kwargs):
1003 992
1004 path = kwargs['path']
1005 startDate = kwargs['startDate']
1006 endDate = kwargs['endDate']
1007 startTime = kwargs['startTime']
1008 endTime = kwargs['endTime']
1009 walk = kwargs['walk']
1010 if 'ext' in kwargs:
1011 ext = kwargs['ext']
1012 else:
1013 ext = '.hdf5'
1014 if 'timezone' in kwargs:
1015 self.timezone = kwargs['timezone']
1016 else:
1017 self.timezone = 'lt'
1018
1019 print("[Reading] Searching files in offline mode ...")
1020 pathList, filenameList = self.searchFilesOffLine(path, startDate=startDate, endDate=endDate,
1021 startTime=startTime, endTime=endTime,
1022 ext=ext, walk=walk)
993 self.set_kwargs(**kwargs)
994 if not self.ext.startswith('.'):
995 self.ext = '.{}'.format(self.ext)
1023 996
1024 if not(filenameList):
1025             print("There are no files in the folder: %s"%(path))
1026 sys.exit(-1)
1027
1028 self.fileIndex = -1
1029 self.startTime = startTime
1030 self.endTime = endTime
1031 self.__readMetadata()
1032 self.__setNextFileOffline()
1033
1034 return
997 if self.online:
998 log.log("Searching files in online mode...", self.name)
1035 999
1036 def searchFilesOffLine(self, path, startDate=None, endDate=None, startTime=datetime.time(0,0,0), endTime=datetime.time(23,59,59), ext='.hdf5', walk=True):
1000 for nTries in range(self.nTries):
1001 fullpath = self.searchFilesOnLine(self.path, self.startDate,
1002 self.endDate, self.expLabel, self.ext, self.walk,
1003 self.filefmt, self.folderfmt)
1037 1004
1038 expLabel = ''
1039 self.filenameList = []
1040 self.datetimeList = []
1041 pathList = []
1042 dateList, pathList = self.findDatafiles(path, startDate, endDate, expLabel, ext, walk, include_path=True)
1043
1044 if dateList == []:
1045             print("[Reading] No *%s files in %s from %s to %s" %(ext, path,
1046 datetime.datetime.combine(startDate,startTime).ctime(),
1047 datetime.datetime.combine(endDate,endTime).ctime()))
1048
1049 return None, None
1050
1051 if len(dateList) > 1:
1052 print("[Reading] %d days were found in date range: %s - %s" %(len(dateList), startDate, endDate))
1005 try:
1006 fullpath = next(fullpath)
1007 except:
1008 fullpath = None
1009
1010 if fullpath:
1011 break
1012
1013 log.warning(
1014 'Waiting {} sec for a valid file in {}: try {} ...'.format(
1015 self.delay, self.path, nTries + 1),
1016 self.name)
1017 time.sleep(self.delay)
1018
1019 if not(fullpath):
1020 raise schainpy.admin.SchainError(
1021 'There isn\'t any valid file in {}'.format(self.path))
1022
1023 pathname, filename = os.path.split(fullpath)
1024 self.year = int(filename[1:5])
1025 self.doy = int(filename[5:8])
1026 self.set = int(filename[8:11]) - 1
1053 1027 else:
1054 print("[Reading] data was found for the date %s" %(dateList[0]))
1055
1056 filenameList = []
1057 datetimeList = []
1058
1059 for thisPath in pathList:
1060
1061 fileList = glob.glob1(thisPath, "*%s" %ext)
1062 fileList.sort()
1063
1064 for file in fileList:
1065
1066 filename = os.path.join(thisPath,file)
1067
1068 if not isFileInDateRange(filename, startDate, endDate):
1069 continue
1070
1071 thisDatetime = self.__isFileInTimeRange(filename, startDate, endDate, startTime, endTime)
1072
1073 if not(thisDatetime):
1074 continue
1075
1076 filenameList.append(filename)
1077 datetimeList.append(thisDatetime)
1078
1079 if not(filenameList):
1080             print("[Reading] No files were found in time range %s - %s" %(datetime.datetime.combine(startDate,startTime).ctime(), datetime.datetime.combine(endDate,endTime).ctime()))
1081 return None, None
1082
1083 print("[Reading] %d file(s) was(were) found in time range: %s - %s" %(len(filenameList), startTime, endTime))
1084 print()
1085
1086 self.filenameList = filenameList
1087 self.datetimeList = datetimeList
1088
1089 return pathList, filenameList
1090
1091 def __isFileInTimeRange(self,filename, startDate, endDate, startTime, endTime):
1092
1093 """
1094         Returns 1 if the data file falls within the specified time range.
1095
1096         Inputs:
1097             filename : full name of the data file, in Jicamarca format (.r)
1098             startDate : start date of the selected range, as a datetime.date
1099             endDate : end date of the selected range, as a datetime.date
1100             startTime : start time of the selected range, as a datetime.time
1101             endTime : end time of the selected range, as a datetime.time
1102
1103         Return:
1104             Boolean : returns True if the data file contains data within the
1105             specified date range, otherwise returns False.
1106
1107         Exceptions:
1108             If the file does not exist or cannot be opened
1109             If the header cannot be read.
1110
1111 """
1112
1113 try:
1114 fp = h5py.File(filename, 'r')
1115 grp1 = fp['Data']
1116
1117 except IOError:
1118 traceback.print_exc()
1119 raise IOError("The file %s can't be opened" %(filename))
1120 #In case has utctime attribute
1121 grp2 = grp1['utctime']
1122 thisUtcTime = grp2.value[0]
1123
1124 fp.close()
1125
1126 if self.timezone == 'lt':
1127 thisUtcTime -= 5*3600
1128
1129 thisDatetime = datetime.datetime.fromtimestamp(thisUtcTime + 5*3600)
1130 thisDate = thisDatetime.date()
1131 thisTime = thisDatetime.time()
1132
1133 startUtcTime = (datetime.datetime.combine(thisDate,startTime)- datetime.datetime(1970, 1, 1)).total_seconds()
1134 endUtcTime = (datetime.datetime.combine(thisDate,endTime)- datetime.datetime(1970, 1, 1)).total_seconds()
1135
1136 #General case
1137 # o>>>>>>>>>>>>>><<<<<<<<<<<<<<o
1138 #-----------o----------------------------o-----------
1139 # startTime endTime
1140
1141 if endTime >= startTime:
1142 thisUtcLog = numpy.logical_and(thisUtcTime > startUtcTime, thisUtcTime < endUtcTime)
1143             if numpy.any(thisUtcLog): #If there is at least one block within the given hours
1144 return thisDatetime
1145 return None
1146
1147 #If endTime < startTime then endTime belongs to the next day
1148 #<<<<<<<<<<<o o>>>>>>>>>>>
1149 #-----------o----------------------------o-----------
1150 # endTime startTime
1151
1152 if (thisDate == startDate) and numpy.all(thisUtcTime < startUtcTime):
1153 return None
1154
1155 if (thisDate == endDate) and numpy.all(thisUtcTime > endUtcTime):
1156 return None
1157
1158 if numpy.all(thisUtcTime < startUtcTime) and numpy.all(thisUtcTime > endUtcTime):
1159 return None
1160
1161 return thisDatetime
1162
1163 def __setNextFileOffline(self):
1164
1165 self.fileIndex += 1
1166 idFile = self.fileIndex
1167
1168 if not(idFile < len(self.filenameList)):
1169 raise schainpy.admin.SchainError('No more files')
1028 log.log("Searching files in {}".format(self.path), self.name)
1029 self.filenameList = self.searchFilesOffLine(self.path, self.startDate,
1030 self.endDate, self.expLabel, self.ext, self.walk, self.filefmt, self.folderfmt)
1031
1032 self.setNextFile()
1170 1033
1171 filename = self.filenameList[idFile]
1172 self.fp = h5py.File(filename, 'r')
1173 self.filename = filename
1034 return
1174 1035
1175 print("Setting the file: %s"%self.filename)
1036 def readFirstHeader(self):
1037 '''Read metadata and data'''
1176 1038
1177 self.__setBlockList()
1039 self.__readMetadata()
1178 1040 self.__readData()
1041 self.__setBlockList()
1179 1042 self.blockIndex = 0
1180 return 1
1043
1044 return
1181 1045
1182 1046 def __setBlockList(self):
1183 1047 '''
1184 1048 Selects the data within the times defined
1185 1049
1186 1050 self.fp
1187 1051 self.startTime
1188 1052 self.endTime
1189 1053 self.blockList
1190 1054 self.blocksPerFile
1191 1055
1192 1056 '''
1193 fp = self.fp
1057
1194 1058 startTime = self.startTime
1195 1059 endTime = self.endTime
1196 1060
1197 grp = fp['Data']
1198 thisUtcTime = grp['utctime'].value
1061 index = self.listDataname.index('utctime')
1062 thisUtcTime = self.listData[index]
1063 self.interval = numpy.min(thisUtcTime[1:] - thisUtcTime[:-1])
1199 1064
1200 1065 if self.timezone == 'lt':
1201 1066 thisUtcTime -= 5*3600
1202 1067
1203 1068 thisDatetime = datetime.datetime.fromtimestamp(thisUtcTime[0] + 5*3600)
1204 1069
1205 1070 thisDate = thisDatetime.date()
1206 1071 thisTime = thisDatetime.time()
1207 1072
1208 1073 startUtcTime = (datetime.datetime.combine(thisDate,startTime) - datetime.datetime(1970, 1, 1)).total_seconds()
1209 1074 endUtcTime = (datetime.datetime.combine(thisDate,endTime) - datetime.datetime(1970, 1, 1)).total_seconds()
1210 1075
1211 1076 ind = numpy.where(numpy.logical_and(thisUtcTime >= startUtcTime, thisUtcTime < endUtcTime))[0]
1212 1077
1213 1078 self.blockList = ind
1214 1079 self.blocksPerFile = len(ind)
1215 1080 return
1216 1081
1217 1082 def __readMetadata(self):
1218 1083 '''
1219 1084 Reads Metadata
1220 1085 '''
1221 1086
1222 filename = self.filenameList[0]
1223 fp = h5py.File(filename, 'r')
1224 gp = fp['Metadata']
1225 1087 listMetaname = []
1226 1088 listMetadata = []
1227
1228 for item in list(gp.items()):
1229 name = item[0]
1230
1231 if name=='variables':
1232 table = gp[name][:]
1233 listShapes = {}
1234 for shapes in table:
1235 listShapes[shapes[0].decode()] = numpy.array([shapes[1]])
1236 else:
1237 data = gp[name].value
1238 listMetaname.append(name)
1239 listMetadata.append(data)
1089 if 'Metadata' in self.fp:
1090 gp = self.fp['Metadata']
1091 for item in list(gp.items()):
1092 name = item[0]
1093
1094 if name=='variables':
1095 table = gp[name][:]
1096 listShapes = {}
1097 for shapes in table:
1098 listShapes[shapes[0].decode()] = numpy.array([shapes[1]])
1099 else:
1100 data = gp[name].value
1101 listMetaname.append(name)
1102 listMetadata.append(data)
1103 elif self.metadata:
1104 metadata = json.loads(self.metadata)
1105 listShapes = {}
1106 for tup in metadata:
1107 name, values, dim = tup
1108 if dim == -1:
1109 listMetaname.append(name)
1110 listMetadata.append(self.fp[values].value)
1111 else:
1112 listShapes[name] = numpy.array([dim])
1113 else:
1114 raise IOError('Missing Metadata group in file or metadata info')
1240 1115
1241 1116 self.listShapes = listShapes
1242 1117 self.listMetaname = listMetaname
1243 self.listMeta = listMetadata
1118 self.listMeta = listMetadata
1244 1119
1245 fp.close()
1246 1120 return
1247 1121
1248 1122 def __readData(self):
1249 1123
1250 grp = self.fp['Data']
1251 1124 listdataname = []
1252 1125 listdata = []
1253
1254 for item in list(grp.items()):
1255 name = item[0]
1256 listdataname.append(name)
1257 dim = self.listShapes[name][0]
1258 if dim == 0:
1259 array = grp[name].value
1260 else:
1261 array = []
1262 for i in range(dim):
1263 array.append(grp[name]['table{:02d}'.format(i)].value)
1264 array = numpy.array(array)
1265
1266 listdata.append(array)
1126
1127 if 'Data' in self.fp:
1128 grp = self.fp['Data']
1129 for item in list(grp.items()):
1130 name = item[0]
1131 listdataname.append(name)
1132 dim = self.listShapes[name][0]
1133 if dim == 0:
1134 array = grp[name].value
1135 else:
1136 array = []
1137 for i in range(dim):
1138 array.append(grp[name]['table{:02d}'.format(i)].value)
1139 array = numpy.array(array)
1140
1141 listdata.append(array)
1142 elif self.metadata:
1143 metadata = json.loads(self.metadata)
1144 for tup in metadata:
1145 name, values, dim = tup
1146 listdataname.append(name)
1147 if dim == -1:
1148 continue
1149 elif dim == 0:
1150 array = self.fp[values].value
1151 else:
1152 array = []
1153 for var in values:
1154 array.append(self.fp[var].value)
1155 array = numpy.array(array)
1156 listdata.append(array)
1157 else:
1158 raise IOError('Missing Data group in file or metadata info')
1267 1159
1268 1160 self.listDataname = listdataname
1269 1161 self.listData = listdata
1270 1162 return
1271 1163
1272 1164 def getData(self):
1273 1165
1274 1166 for i in range(len(self.listMeta)):
1275 1167 setattr(self.dataOut, self.listMetaname[i], self.listMeta[i])
1276 1168
1277 1169 for j in range(len(self.listData)):
1278 1170 dim = self.listShapes[self.listDataname[j]][0]
1279 1171 if dim == 0:
1280 1172 setattr(self.dataOut, self.listDataname[j], self.listData[j][self.blockIndex])
1281 1173 else:
1282 1174 setattr(self.dataOut, self.listDataname[j], self.listData[j][:,self.blockIndex])
1283 1175
1176 self.dataOut.paramInterval = self.interval
1284 1177 self.dataOut.flagNoData = False
1285 1178 self.blockIndex += 1
1286 1179
1287 1180 return
1288 1181
1289 1182 def run(self, **kwargs):
1290 1183
1291 1184 if not(self.isConfig):
1292 1185 self.setup(**kwargs)
1293 1186 self.isConfig = True
1294 1187
1295 1188 if self.blockIndex == self.blocksPerFile:
1296 if not(self.__setNextFileOffline()):
1297 self.dataOut.flagNoData = True
1298 return 0
1189 self.setNextFile()
1299 1190
1300 1191 self.getData()
1301 1192
1302 1193 return
1303 1194
1304 1195 @MPDecorator
1305 1196 class ParameterWriter(Operation):
1306 1197 '''
1307 1198     HDF5 Writer, stores parameter data in HDF5 format files
1308 1199
1309 1200     path: path where the files will be stored
1310 1201     blocksPerFile: number of blocks that will be saved per HDF5 file
1311 1202     mode: selects the data stacking mode: '0' channels, '1' parameters, '2' table (for meteors)
1312 1203     metadataList: list of attributes that will be stored as metadata
1313 1204     dataList: list of attributes that will be stored as data
1314 1205 '''
1315 1206
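    # Hedged usage sketch, assuming the usual schainpy Project API (ids, paths
    # and attribute names below are illustrative, not taken from this changeset):
    #
    # >>> from schainpy.controller import Project
    # >>> prj = Project()
    # >>> prj.setup(id='200', name='hdf5_write', description='')
    # >>> proc = prj.addProcUnit(datatype='ParametersProc', inputId='1')
    # >>> op = proc.addOperation(name='ParameterWriter', optype='external')
    # >>> op.addParameter(name='path', value='/tmp/param_out')
    # >>> op.addParameter(name='blocksPerFile', value='10')
    # >>> op.addParameter(name='dataList', value='data_param,utctime')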
1316 1207
1317 1208 ext = ".hdf5"
1318 1209 optchar = "D"
1319 1210 metaoptchar = "M"
1320 1211 metaFile = None
1321 1212 filename = None
1322 1213 path = None
1323 1214 setFile = None
1324 1215 fp = None
1325 1216 grp = None
1326 1217 ds = None
1327 1218 firsttime = True
1328 1219 #Configurations
1329 1220 blocksPerFile = None
1330 1221 blockIndex = None
1331 1222 dataOut = None
1332 1223 #Data Arrays
1333 1224 dataList = None
1334 1225 metadataList = None
1335 1226 dsList = None #List of dictionaries with dataset properties
1336 1227 tableDim = None
1337 1228 dtype = [('name', 'S20'),('nDim', 'i')]
1338 1229 currentDay = None
1339 1230 lastTime = None
1340 1231
1341 1232 def __init__(self):
1342 1233
1343 1234 Operation.__init__(self)
1344 1235 return
1345 1236
1346 1237 def setup(self, path=None, blocksPerFile=10, metadataList=None, dataList=None, setType=None):
1347 1238 self.path = path
1348 1239 self.blocksPerFile = blocksPerFile
1349 1240 self.metadataList = metadataList
1350 1241 self.dataList = dataList
1351 1242 self.setType = setType
1352 1243
1353 1244 tableList = []
1354 1245 dsList = []
1355 1246
1356 1247 for i in range(len(self.dataList)):
1357 1248 dsDict = {}
1358 1249 dataAux = getattr(self.dataOut, self.dataList[i])
1359 1250 dsDict['variable'] = self.dataList[i]
1360 1251
1361 1252 if dataAux is None:
1362 1253 continue
1363 1254 elif isinstance(dataAux, (int, float, numpy.integer, numpy.float)):
1364 1255 dsDict['nDim'] = 0
1365 1256 else:
1366 1257 dsDict['nDim'] = len(dataAux.shape)
1367 1258 dsDict['shape'] = dataAux.shape
1368 1259 dsDict['dsNumber'] = dataAux.shape[0]
1369 1260
1370 1261 dsList.append(dsDict)
1371 1262 tableList.append((self.dataList[i], dsDict['nDim']))
1372 1263
1373 1264 self.dsList = dsList
1374 1265 self.tableDim = numpy.array(tableList, dtype=self.dtype)
1375 1266 self.currentDay = self.dataOut.datatime.date()
1376 1267
1377 1268 def timeFlag(self):
1378 1269 currentTime = self.dataOut.utctime
1379 1270 timeTuple = time.localtime(currentTime)
1380 1271 dataDay = timeTuple.tm_yday
1381 1272
1382 1273 if self.lastTime is None:
1383 1274 self.lastTime = currentTime
1384 1275 self.currentDay = dataDay
1385 1276 return False
1386 1277
1387 1278 timeDiff = currentTime - self.lastTime
1388 1279
1389 1280         #If the day changed or if the gap between consecutive samples exceeds the 3-hour limit
1390 1281 if dataDay != self.currentDay:
1391 1282 self.currentDay = dataDay
1392 1283 return True
1393 1284 elif timeDiff > 3*60*60:
1394 1285 self.lastTime = currentTime
1395 1286 return True
1396 1287 else:
1397 1288 self.lastTime = currentTime
1398 1289 return False
1399 1290
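    # Sketch of the rollover rule above: a new file starts when the local
    # day-of-year changes or when consecutive blocks are more than 3 h apart.
    #
    # >>> import time
    # >>> last, current = 1500000000., 1500000000. + 4 * 60 * 60
    # >>> (time.localtime(current).tm_yday != time.localtime(last).tm_yday
    # ...  or (current - last) > 3 * 60 * 60)
    # True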
1400 1291 def run(self, dataOut, path, blocksPerFile=10, metadataList=None, dataList=None, setType=None):
1401 1292
1402 1293 self.dataOut = dataOut
1403 1294 if not(self.isConfig):
1404 1295 self.setup(path=path, blocksPerFile=blocksPerFile,
1405 1296 metadataList=metadataList, dataList=dataList,
1406 1297 setType=setType)
1407 1298
1408 1299 self.isConfig = True
1409 1300 self.setNextFile()
1410 1301
1411 1302 self.putData()
1412 1303 return
1413 1304
1414 1305 def setNextFile(self):
1415 1306
1416 1307 ext = self.ext
1417 1308 path = self.path
1418 1309 setFile = self.setFile
1419 1310
1420 1311 timeTuple = time.localtime(self.dataOut.utctime)
1421 1312 subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year,timeTuple.tm_yday)
1422 1313 fullpath = os.path.join(path, subfolder)
1423 1314
1424 1315 if os.path.exists(fullpath):
1425 1316 filesList = os.listdir(fullpath)
1426 1317 filesList = [k for k in filesList if k.startswith(self.optchar)]
1427 1318 if len( filesList ) > 0:
1428 1319 filesList = sorted(filesList, key=str.lower)
1429 1320 filen = filesList[-1]
1430 1321                 # the filename must have the following format
1431 1322 # 0 1234 567 89A BCDE (hex)
1432 1323 # x YYYY DDD SSS .ext
1433 1324 if isNumber(filen[8:11]):
1434 1325                     setFile = int(filen[8:11]) #initialize the set counter from the last file's set number
1435 1326 else:
1436 1327 setFile = -1
1437 1328 else:
1438 1329                 setFile = -1 #initialize the set counter
1439 1330 else:
1440 1331 os.makedirs(fullpath)
1441 1332             setFile = -1 #initialize the set counter
1442 1333
1443 1334 if self.setType is None:
1444 1335 setFile += 1
1445 1336 file = '%s%4.4d%3.3d%03d%s' % (self.optchar,
1446 1337 timeTuple.tm_year,
1447 1338 timeTuple.tm_yday,
1448 1339 setFile,
1449 1340 ext )
1450 1341 else:
1451 1342 setFile = timeTuple.tm_hour*60+timeTuple.tm_min
1452 1343 file = '%s%4.4d%3.3d%04d%s' % (self.optchar,
1453 1344 timeTuple.tm_year,
1454 1345 timeTuple.tm_yday,
1455 1346 setFile,
1456 1347 ext )
1457 1348
1458 1349 self.filename = os.path.join( path, subfolder, file )
1459 1350
1460 1351 #Setting HDF5 File
1461 1352 self.fp = h5py.File(self.filename, 'w')
1462 1353 #write metadata
1463 1354 self.writeMetadata(self.fp)
1464 1355 #Write data
1465 1356 self.writeData(self.fp)
1466 1357
1467 1358 def writeMetadata(self, fp):
1468 1359
1469 1360 grp = fp.create_group("Metadata")
1470 1361 grp.create_dataset('variables', data=self.tableDim, dtype=self.dtype)
1471 1362
1472 1363 for i in range(len(self.metadataList)):
1473 1364 if not hasattr(self.dataOut, self.metadataList[i]):
1474 1365 log.warning('Metadata: `{}` not found'.format(self.metadataList[i]), self.name)
1475 1366 continue
1476 1367 value = getattr(self.dataOut, self.metadataList[i])
1477 1368 grp.create_dataset(self.metadataList[i], data=value)
1478 1369 return
1479 1370
1480 1371 def writeData(self, fp):
1481 1372
1482 1373 grp = fp.create_group("Data")
1483 1374 dtsets = []
1484 1375 data = []
1485 1376
1486 1377 for dsInfo in self.dsList:
1487 1378 if dsInfo['nDim'] == 0:
1488 1379 ds = grp.create_dataset(
1489 1380 dsInfo['variable'],
1490 1381 (self.blocksPerFile, ),
1491 1382 chunks=True,
1492 1383 dtype=numpy.float64)
1493 1384 dtsets.append(ds)
1494 1385 data.append((dsInfo['variable'], -1))
1495 1386 else:
1496 1387 sgrp = grp.create_group(dsInfo['variable'])
1497 1388 for i in range(dsInfo['dsNumber']):
1498 1389 ds = sgrp.create_dataset(
1499 1390 'table{:02d}'.format(i),
1500 1391 (self.blocksPerFile, ) + dsInfo['shape'][1:],
1501 1392 chunks=True)
1502 1393 dtsets.append(ds)
1503 1394 data.append((dsInfo['variable'], i))
1504 1395 fp.flush()
1505 1396
1506 1397 log.log('Creating file: {}'.format(fp.filename), self.name)
1507 1398
1508 1399 self.ds = dtsets
1509 1400 self.data = data
1510 1401 self.firsttime = True
1511 1402 self.blockIndex = 0
1512 1403 return
1513 1404
1514 1405 def putData(self):
1515 1406
1516 1407 if (self.blockIndex == self.blocksPerFile) or self.timeFlag():
1517 1408 self.closeFile()
1518 1409 self.setNextFile()
1519 1410
1520 1411 for i, ds in enumerate(self.ds):
1521 1412 attr, ch = self.data[i]
1522 1413 if ch == -1:
1523 1414 ds[self.blockIndex] = getattr(self.dataOut, attr)
1524 1415 else:
1525 1416 ds[self.blockIndex] = getattr(self.dataOut, attr)[ch]
1526 1417
1527 1418 self.fp.flush()
1528 1419 self.blockIndex += 1
1529 1420 log.log('Block No. {}/{}'.format(self.blockIndex, self.blocksPerFile), self.name)
1530 1421
1531 1422 return
1532 1423
1533 1424 def closeFile(self):
1534 1425
1535 1426 if self.blockIndex != self.blocksPerFile:
1536 1427 for ds in self.ds:
1537 1428 ds.resize(self.blockIndex, axis=0)
1538 1429
1539 1430 self.fp.flush()
1540 1431 self.fp.close()
1541 1432
1542 1433 def close(self):
1543 1434
1544 1435 self.closeFile()
@@ -1,433 +1,415
1 1 '''
2 2 Updated for multiprocessing
3 3 Author : Sergio Cortez
4 4 Jan 2018
5 5 Abstract:
6 6 Base class for processing units and operations. A decorator provides multiprocessing features and interconnects the processes created.
7 7 The arguments (kwargs) sent from the controller are parsed and filtered via the decorator for each processing unit or operation instantiated.
8 8 The decorator also handles the methods inside the processing unit that are to be called from the main script (not as operations) (OPERATION -> type ='self').
9 9
10 10 Based on:
11 11 $Author: murco $
12 12 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
13 13 '''
14 14
15 15 import os
16 16 import inspect
17 17 import zmq
18 18 import time
19 19 import pickle
20 20 import traceback
21 21 try:
22 22 from queue import Queue
23 23 except:
24 24 from Queue import Queue
25 25 from threading import Thread
26 26 from multiprocessing import Process
27 27
28 28 from schainpy.utils import log
29 29
30 30
31 31 class ProcessingUnit(object):
32 32
33 33 """
34 34 Update - Jan 2018 - MULTIPROCESSING
35 35 All the "call" methods present in the previous base were removed.
36 36     The majority of operations are independent processes, thus
37 37     the decorator is in charge of communicating the operation processes
38 38     with the processing unit via IPC.
39 39
40 40     The constructor does not receive any arguments. The remaining methods
41 41     are related to the operations to execute.
42 42
43 43
44 44 """
45 proc_type = 'processing'
46 __attrs__ = []
45 47
46 48 def __init__(self):
47 49
48 50 self.dataIn = None
49 51 self.dataOut = None
50 52 self.isConfig = False
51 53 self.operations = []
52 54 self.plots = []
53 55
54 56 def getAllowedArgs(self):
55 57 if hasattr(self, '__attrs__'):
56 58 return self.__attrs__
57 59 else:
58 60 return inspect.getargspec(self.run).args
59 61
60 62 def addOperation(self, conf, operation):
61 63 """
62 64 This method is used in the controller, and update the dictionary containing the operations to execute. The dict
63 65 posses the id of the operation process (IPC purposes)
64 66
65 67 Agrega un objeto del tipo "Operation" (opObj) a la lista de objetos "self.objectList" y retorna el
66 68 identificador asociado a este objeto.
67 69
68 70 Input:
69 71
70 72 object : objeto de la clase "Operation"
71 73
72 74 Return:
73 75
74 76 objId : identificador del objeto, necesario para comunicar con master(procUnit)
75 77 """
76 78
77 79 self.operations.append(
78 80 (operation, conf.type, conf.id, conf.getKwargs()))
79 81
80 82 if 'plot' in self.name.lower():
81 83 self.plots.append(operation.CODE)
82 84
83 85 def getOperationObj(self, objId):
84 86
85 87 if objId not in list(self.operations.keys()):
86 88 return None
87 89
88 90 return self.operations[objId]
89 91
90 92 def operation(self, **kwargs):
91 93 """
92 94         Direct operation on the data (dataOut.data). The attribute values of the
93 95         dataOut object must be updated.
94 96
95 97         Input:
96 98
97 99             **kwargs : dictionary of arguments for the function to execute
98 100 """
99 101
100 102 raise NotImplementedError
101 103
102 104 def setup(self):
103 105
104 106 raise NotImplementedError
105 107
106 108 def run(self):
107 109
108 110 raise NotImplementedError
109 111
110 112 def close(self):
111 113
112 114 return
113 115
114 116
115 117 class Operation(object):
116 118
117 119 """
118 120 Update - Jan 2018 - MULTIPROCESSING
119 121
120 122     Most of the methods remained the same. The decorator parses the arguments and executes the run() method for each process.
121 123     The constructor does not receive any arguments, nor does the base class.
122 124
123 125
124 126     Base class for defining the additional operations that can be added to the ProcessingUnit class
125 127     and that need to accumulate prior information from the data to process. Preferably, use an
126 128     accumulation buffer inside this class.
127 129
128 130     Example: coherent integration, which needs the prior information from the previous n profiles (buffer)
129 131
130 132 """
133 proc_type = 'operation'
134 __attrs__ = []
131 135
132 136 def __init__(self):
133 137
134 138 self.id = None
135 139 self.isConfig = False
136 140
137 141 if not hasattr(self, 'name'):
138 142 self.name = self.__class__.__name__
139 143
140 144 def getAllowedArgs(self):
141 145 if hasattr(self, '__attrs__'):
142 146 return self.__attrs__
143 147 else:
144 148 return inspect.getargspec(self.run).args
145 149
146 150 def setup(self):
147 151
148 152 self.isConfig = True
149 153
150 154 raise NotImplementedError
151 155
152 156 def run(self, dataIn, **kwargs):
153 157 """
154 158         Performs the necessary operations on dataIn.data and updates the
155 159         attributes of the dataIn object.
156 160
157 161         Input:
158 162
159 163             dataIn : object of type JROData
160 164
161 165         Return:
162 166
163 167             None
164 168
165 169         Affected:
166 170             __buffer : data reception buffer.
167 171
168 172 """
169 173 if not self.isConfig:
170 174 self.setup(**kwargs)
171 175
172 176 raise NotImplementedError
173 177
174 178 def close(self):
175 179
176 180 return
177 181
178 182 class InputQueue(Thread):
179 183
180 184 '''
181
182 185     Class to hold input data for Processing Units and external Operations.
183
184 186 '''
185 187
186
187
188 188 def __init__(self, project_id, inputId):
189 189
190 190 Thread.__init__(self)
191
192 191 self.queue = Queue()
193
194 192 self.project_id = project_id
195
196 193 self.inputId = inputId
197 194
198
199
200 195 def run(self):
201 196
202
203
204 197 c = zmq.Context()
205
206 198 self.receiver = c.socket(zmq.SUB)
207
208 199 self.receiver.connect(
209
210 200 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
211
212 201 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
213
214
215
202
216 203 while True:
217
218 204 self.queue.put(self.receiver.recv_multipart()[1])
219 205
220
221
222 206 def get(self):
223
224
225
207
226 208 return pickle.loads(self.queue.get())
227
228
209
229 210
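# Minimal PUB/SUB sketch of the IPC wiring used below. In the real pipeline both
# ends connect to broker sockets managed by the controller, so the bind() here
# exists only to make the example self-contained (address illustrative):
#
# >>> import pickle, time, zmq
# >>> ctx = zmq.Context()
# >>> pub = ctx.socket(zmq.PUB); pub.bind('ipc:///tmp/schain_demo')
# >>> sub = ctx.socket(zmq.SUB); sub.connect('ipc:///tmp/schain_demo')
# >>> sub.setsockopt(zmq.SUBSCRIBE, b'42')
# >>> time.sleep(0.5)                        # let the subscription propagate
# >>> pub.send_multipart([b'42', pickle.dumps({'x': 1})])
# >>> topic, payload = sub.recv_multipart()
# >>> pickle.loads(payload)
# {'x': 1}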
230 211 def MPDecorator(BaseClass):
231 212 """
232 213 Multiprocessing class decorator
233 214
234 215     This function adds multiprocessing features to a BaseClass. It also handles
235 216     the communication between processes (readers, procUnits and operations).
236 217 """
237 218
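    # Hedged usage sketch: the decorator wraps a unit class, and the resulting
    # process is built with the positional args unpacked in __init__ below
    # (id, inputId, project_id, err_queue, typeProc); all values are illustrative.
    #
    # >>> from multiprocessing import Queue
    # >>> Writer = MPDecorator(ParameterWriter)    # assuming ParameterWriter is importable
    # >>> w = Writer('3', '2', 'proj01', Queue(), 'Operation', path='/tmp/param_out')
    # >>> w.start()                                # child process: subscribe() + runOp()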
238 219 class MPClass(BaseClass, Process):
239 220
240 221 def __init__(self, *args, **kwargs):
241 222 super(MPClass, self).__init__()
242 223 Process.__init__(self)
243 224 self.operationKwargs = {}
244 225 self.args = args
245 226 self.kwargs = kwargs
246 227 self.sender = None
247 228 self.receiver = None
248 229 self.i = 0
249 230 self.t = time.time()
250 231 self.name = BaseClass.__name__
232 self.__doc__ = BaseClass.__doc__
251 233
252 234 if 'plot' in self.name.lower() and not self.name.endswith('_'):
253 235 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
254 236
255 237 self.start_time = time.time()
256 238 self.id = args[0]
257 239 self.inputId = args[1]
258 240 self.project_id = args[2]
259 241 self.err_queue = args[3]
260 242 self.typeProc = args[4]
261 243 self.err_queue.put('#_start_#')
262 244 self.queue = InputQueue(self.project_id, self.inputId)
263 245
264 246 def subscribe(self):
265 247 '''
266 248             Start the zmq socket receiver and subscribe to input ID.
267 249 '''
268 250
269 251 self.queue.start()
270 252
271 253 def listen(self):
272 254 '''
273 255 This function waits for objects
274 256 '''
275 257
276 258 return self.queue.get()
277 259
278 260 def set_publisher(self):
279 261 '''
280 262             This function creates a zmq socket for publishing objects.
281 263 '''
282 264
283 265 time.sleep(0.5)
284 266
285 267 c = zmq.Context()
286 268 self.sender = c.socket(zmq.PUB)
287 269 self.sender.connect(
288 270 'ipc:///tmp/schain/{}_sub'.format(self.project_id))
289 271
290 272 def publish(self, data, id):
291 273 '''
292 274             This function publishes an object to a specific topic.
293 275             For read units (inputId == None) it adds a small delay
294 276             to avoid data loss
295 277 '''
296 278
297 279 if self.inputId is None:
298 280 self.i += 1
299 281 if self.i % 40 == 0 and time.time()-self.t > 0.1:
300 282 self.i = 0
301 283 self.t = time.time()
302 284 time.sleep(0.05)
303 285 elif self.i % 40 == 0:
304 286 self.i = 0
305 287 self.t = time.time()
306 288 time.sleep(0.01)
307 289
308 290 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
309 291
310 292 def runReader(self):
311 293 '''
312 294             Run function for read units
313 295 '''
314 296 while True:
315 297
316 298 try:
317 299 BaseClass.run(self, **self.kwargs)
318 300 except:
319 301 err = traceback.format_exc()
320 302 if 'No more files' in err:
321 303 log.warning('No more files to read', self.name)
322 304 else:
323 305 self.err_queue.put('{}|{}'.format(self.name, err))
324 306 self.dataOut.error = True
325 307
326 308 for op, optype, opId, kwargs in self.operations:
327 309 if optype == 'self' and not self.dataOut.flagNoData:
328 310 op(**kwargs)
329 311 elif optype == 'other' and not self.dataOut.flagNoData:
330 312                         self.dataOut = op.run(self.dataOut, **kwargs)
331 313 elif optype == 'external':
332 314 self.publish(self.dataOut, opId)
333 315
334 316 if self.dataOut.flagNoData and not self.dataOut.error:
335 317 continue
336 318
337 319 self.publish(self.dataOut, self.id)
338 320
339 321 if self.dataOut.error:
340 322 break
341 323
342 324 time.sleep(0.5)
343 325
344 326 def runProc(self):
345 327 '''
346 328             Run function for processing units
347 329 '''
348 330
349 331 while True:
350 332 self.dataIn = self.listen()
351 333
352 334 if self.dataIn.flagNoData and self.dataIn.error is None:
353 335 continue
354 336 elif not self.dataIn.error:
355 337 try:
356 338 BaseClass.run(self, **self.kwargs)
357 339 except:
358 340 self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc()))
359 341 self.dataOut.error = True
360 342 elif self.dataIn.error:
361 343 self.dataOut.error = self.dataIn.error
362 344 self.dataOut.flagNoData = True
363 345
364 346 for op, optype, opId, kwargs in self.operations:
365 347 if optype == 'self' and not self.dataOut.flagNoData:
366 348 op(**kwargs)
367 349 elif optype == 'other' and not self.dataOut.flagNoData:
368 350 self.dataOut = op.run(self.dataOut, **kwargs)
369 351 elif optype == 'external' and not self.dataOut.flagNoData:
370 352 self.publish(self.dataOut, opId)
371 353
372 354 self.publish(self.dataOut, self.id)
373 355 for op, optype, opId, kwargs in self.operations:
374 356 if optype == 'external' and self.dataOut.error:
375 357 self.publish(self.dataOut, opId)
376 358
377 359 if self.dataOut.error:
378 360 break
379 361
380 362 time.sleep(0.5)
381 363
382 364 def runOp(self):
383 365 '''
384 366             Run function for external operations (these operations just receive data,
385 367             e.g. plots, writers, publishers)
386 368 '''
387 369
388 370 while True:
389 371
390 372 dataOut = self.listen()
391 373
392 374 if not dataOut.error:
393 375 BaseClass.run(self, dataOut, **self.kwargs)
394 376 else:
395 377 break
396 378
397 379 def run(self):
398 380             if self.typeProc == "ProcUnit":
399 381
400 382 if self.inputId is not None:
401 383 self.subscribe()
402 384
403 385 self.set_publisher()
404 386
405 387 if 'Reader' not in BaseClass.__name__:
406 388 self.runProc()
407 389 else:
408 390 self.runReader()
409 391
410 392             elif self.typeProc == "Operation":
411 393
412 394 self.subscribe()
413 395 self.runOp()
414 396
415 397 else:
416 398 raise ValueError("Unknown type")
417 399
418 400 self.close()
419 401
420 402 def close(self):
421 403
422 404 BaseClass.close(self)
423 405 self.err_queue.put('#_end_#')
424 406
425 407 if self.sender:
426 408 self.sender.close()
427 409
428 410 if self.receiver:
429 411 self.receiver.close()
430 412
431 413 log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)
432 414
433 415 return MPClass