##// END OF EJS Templates
Review decorator logic for ending process
Juan C. Espinoza -
r1193:c3967e412107
parent child
Show More
@@ -129,8 +129,7 class ParameterConf():
129 129 raise ValueError('%s: This parameter value is empty' % self.name)
130 130
131 131 if format == 'list':
132 strList = value.split(',')
133
132 strList = [s.strip() for s in value.split(',')]
134 133 self.__formated_value = strList
135 134
136 135 return self.__formated_value
@@ -753,9 +753,6 class Plot(Operation):
753 753
754 754 def run(self, dataOut, **kwargs):
755 755
756 if dataOut.flagNoData and not dataOut.error:
757 return dataOut
758
759 756 if dataOut.error:
760 757 coerce = True
761 758 else:
@@ -356,7 +356,7 class BLTRParamReader(JRODataReader, ProcessingUnit):
356 356 '''
357 357 if self.flagNoMoreFiles:
358 358 self.dataOut.flagNoData = True
359 self.dataOut.error = (1, 'No More files to read')
359 self.dataOut.error = 'No More files to read'
360 360
361 361 if not self.readNextBlock():
362 362 self.dataOut.flagNoData = True
@@ -898,7 +898,7 class JRODataReader(JRODataIO):
898 898 newFile = self.__setNextFileOffline()
899 899
900 900 if not(newFile):
901 self.dataOut.error = (-1, 'No more files to read')
901 self.dataOut.error = 'No more files to read'
902 902 return 0
903 903
904 904 if self.verbose:
@@ -1052,7 +1052,7 class JRODataReader(JRODataIO):
1052 1052 # Skip block out of startTime and endTime
1053 1053 while True:
1054 1054 if not(self.__setNewBlock()):
1055 self.dataOut.error = (-1, 'No more files to read')
1055 self.dataOut.error = 'No more files to read'
1056 1056 return 0
1057 1057
1058 1058 if not(self.readBlock()):
@@ -1324,7 +1324,7 class JRODataReader(JRODataIO):
1324 1324 sleep(self.delay)
1325 1325
1326 1326 if not(fullpath):
1327 self.dataOut.error = (-1, 'There isn\'t any valid file in {}'.format(path))
1327 self.dataOut.error = 'There isn\'t any valid file in {}'.format(path)
1328 1328 return
1329 1329
1330 1330 self.year = year
@@ -569,21 +569,20 class DigitalRFReader(ProcessingUnit):
569 569 if self.__readNextBlock():
570 570 break
571 571 if self.__thisUnixSample > self.__endUTCSecond * self.__sample_rate:
572 self.dataOut.error = (1, '')
572 self.dataOut.error = 'Error'
573 573 return
574 574
575 575 if self.__flagDiscontinuousBlock:
576 print('[Reading] discontinuous block found ... continue with the next block')
577 self.dataOut.error = (1, '')
576 self.dataOut.error = 'discontinuous block found'
578 577 return
579 578
580 579 if not self.__online:
581 self.dataOut.error = (1, '')
580 self.dataOut.error = 'Online?'
582 581 return
583 582
584 583 err_counter += 1
585 584 if err_counter > nTries:
586 self.dataOut.error = (1, '')
585 self.dataOut.error = 'Max retrys reach'
587 586 return
588 587
589 588 print('[Reading] waiting %d seconds to read a new block' % seconds)
@@ -395,7 +395,7 class MADReader(JRODataReader, ProcessingUnit):
395 395 '''
396 396 if self.flagNoMoreFiles:
397 397 self.dataOut.flagNoData = True
398 log.error('No file left to process', 'MADReader')
398 self.dataOut.error = 'No file left to process'
399 399 return 0
400 400
401 401 if not self.readNextBlock():
@@ -152,7 +152,7 class ParamReader(JRODataReader,ProcessingUnit):
152 152 #----------------------------------------------------------------------------------
153 153
154 154 for thisPath in pathList:
155 # thisPath = pathList[pathDict[file]]
155 # thisPath = pathList[pathDict[file]]
156 156
157 157 fileList = glob.glob1(thisPath, "*%s" %ext)
158 158 fileList.sort()
@@ -220,7 +220,7 class ParamReader(JRODataReader,ProcessingUnit):
220 220 #chino rata
221 221 #In case has utctime attribute
222 222 grp2 = grp1['utctime']
223 # thisUtcTime = grp2.value[0] - 5*3600 #To convert to local time
223 # thisUtcTime = grp2.value[0] - 5*3600 #To convert to local time
224 224 thisUtcTime = grp2.value[0]
225 225
226 226 fp.close()
@@ -229,7 +229,7 class ParamReader(JRODataReader,ProcessingUnit):
229 229 thisUtcTime -= 5*3600
230 230
231 231 thisDatetime = datetime.datetime.fromtimestamp(thisUtcTime[0] + 5*3600)
232 # thisDatetime = datetime.datetime.fromtimestamp(thisUtcTime[0])
232 # thisDatetime = datetime.datetime.fromtimestamp(thisUtcTime[0])
233 233 thisDate = thisDatetime.date()
234 234 thisTime = thisDatetime.time()
235 235
@@ -282,11 +282,11 class ParamReader(JRODataReader,ProcessingUnit):
282 282
283 283 print("Setting the file: %s"%self.filename)
284 284
285 # self.__readMetadata()
285 # self.__readMetadata()
286 286 self.__setBlockList()
287 287 self.__readData()
288 # self.nRecords = self.fp['Data'].attrs['blocksPerFile']
289 # self.nRecords = self.fp['Data'].attrs['nRecords']
288 # self.nRecords = self.fp['Data'].attrs['blocksPerFile']
289 # self.nRecords = self.fp['Data'].attrs['nRecords']
290 290 self.blockIndex = 0
291 291 return 1
292 292
@@ -340,16 +340,16 class ParamReader(JRODataReader,ProcessingUnit):
340 340
341 341 '''
342 342
343 # grp = self.fp['Data']
344 # pathMeta = os.path.join(self.path, grp.attrs['metadata'])
345 #
346 # if pathMeta == self.pathMeta:
347 # return
348 # else:
349 # self.pathMeta = pathMeta
350 #
351 # filePointer = h5py.File(self.pathMeta,'r')
352 # groupPointer = filePointer['Metadata']
343 # grp = self.fp['Data']
344 # pathMeta = os.path.join(self.path, grp.attrs['metadata'])
345 #
346 # if pathMeta == self.pathMeta:
347 # return
348 # else:
349 # self.pathMeta = pathMeta
350 #
351 # filePointer = h5py.File(self.pathMeta,'r')
352 # groupPointer = filePointer['Metadata']
353 353
354 354 filename = self.filenameList[0]
355 355
@@ -372,8 +372,8 class ParamReader(JRODataReader,ProcessingUnit):
372 372 listMetaname.append(name)
373 373 listMetadata.append(data)
374 374
375 # if name=='type':
376 # self.__initDataOut(data)
375 # if name=='type':
376 # self.__initDataOut(data)
377 377
378 378 self.listShapes = listShapes
379 379 self.listMetaname = listMetaname
@@ -471,7 +471,7 class ParamReader(JRODataReader,ProcessingUnit):
471 471 listShapes = self.listShapes
472 472
473 473 blockIndex = self.blockIndex
474 # blockList = self.blockList
474 # blockList = self.blockList
475 475
476 476 for i in range(len(listMeta)):
477 477 setattr(self.dataOut,listMetaname[i],listMeta[i])
@@ -517,12 +517,13 class ParamReader(JRODataReader,ProcessingUnit):
517 517
518 518 if not(self.isConfig):
519 519 self.setup(**kwargs)
520 # self.setObjProperties()
520 # self.setObjProperties()
521 521 self.isConfig = True
522 522
523 523 self.getData()
524 524
525 525 return
526
526 527 @MPDecorator
527 528 class ParamWriter(Operation):
528 529 '''
@@ -542,61 +543,35 class ParamWriter(Operation):
542 543
543 544
544 545 ext = ".hdf5"
545
546 546 optchar = "D"
547
548 547 metaoptchar = "M"
549
550 548 metaFile = None
551
552 549 filename = None
553
554 550 path = None
555
556 551 setFile = None
557
558 552 fp = None
559
560 553 grp = None
561
562 554 ds = None
563
564 555 firsttime = True
565
566 556 #Configurations
567
568 557 blocksPerFile = None
569
570 558 blockIndex = None
571
572 559 dataOut = None
573
574 560 #Data Arrays
575
576 561 dataList = None
577
578 562 metadataList = None
579
580 # arrayDim = None
581
582 563 dsList = None #List of dictionaries with dataset properties
583
584 564 tableDim = None
585
586 # dtype = [('arrayName', 'S20'),('nChannels', 'i'), ('nPoints', 'i'), ('nSamples', 'i'),('mode', 'b')]
587
588 565 dtype = [('arrayName', 'S20'),('nDimensions', 'i'), ('dim2', 'i'), ('dim1', 'i'),('dim0', 'i'),('mode', 'b')]
589
590 566 currentDay = None
591
592 567 lastTime = None
593 568
594 def __init__(self):#, **kwargs):
595 Operation.__init__(self)#, **kwargs)
596 #self.isConfig = False
569 def __init__(self):
570
571 Operation.__init__(self)
597 572 return
598 573
599 def setup(self, dataOut, path=None, blocksPerFile=10, metadataList=None, dataList=None, mode=None, **kwargs):
574 def setup(self, dataOut, path=None, blocksPerFile=10, metadataList=None, dataList=None, mode=None, setType=None):
600 575 self.path = path
601 576 self.blocksPerFile = blocksPerFile
602 577 self.metadataList = metadataList
@@ -606,9 +581,9 class ParamWriter(Operation):
606 581 if self.mode is not None:
607 582 self.mode = numpy.zeros(len(self.dataList)) + mode
608 583 else:
609 #self.mode = numpy.ones(len(self.dataList),int)
610 584 self.mode = numpy.ones(len(self.dataList))
611 log.error(self.mode)#yong
585
586 self.setType = setType
612 587
613 588 arrayDim = numpy.zeros((len(self.dataList),5))
614 589
@@ -631,11 +606,6 class ParamWriter(Operation):
631 606
632 607 return 0
633 608
634 #Not array, just a number
635 #Mode 0
636 #log.error(mode)#yong
637 #log.error(len(mode))#yong
638 #log.error(type(mode))#yong
639 609 if type(dataAux)==float or type(dataAux)==int:
640 610 dsDict['mode'] = 0
641 611 dsDict['nDim'] = 0
@@ -644,16 +614,13 class ParamWriter(Operation):
644 614
645 615 #Mode 2: meteors
646 616 elif self.mode[i] == 2:
647 # dsDict['nDim'] = 0
648 617 dsDict['dsName'] = 'table0'
649 618 dsDict['mode'] = 2 # Mode meteors
650 619 dsDict['shape'] = dataAux.shape[-1]
651 620 dsDict['nDim'] = 0
652 621 dsDict['dsNumber'] = 1
653
654 622 arrayDim[i,3] = dataAux.shape[-1]
655 623 arrayDim[i,4] = self.mode[i] #Mode the data was stored
656
657 624 dsList.append(dsDict)
658 625
659 626 #Mode 1
@@ -661,7 +628,6 class ParamWriter(Operation):
661 628 arrayDim0 = dataAux.shape #Data dimensions
662 629 arrayDim[i,0] = len(arrayDim0) #Number of array dimensions
663 630 arrayDim[i,4] = self.mode[i] #Mode the data was stored
664
665 631 strtable = 'table'
666 632 dsDict['mode'] = 1 # Mode parameters
667 633
@@ -703,14 +669,11 class ParamWriter(Operation):
703 669 table = numpy.array((self.dataList[i],) + tuple(arrayDim[i,:]),dtype = dtype0)
704 670 tableList.append(table)
705 671
706 # self.arrayDim = arrayDim
707 672 self.dsList = dsList
708 673 self.tableDim = numpy.array(tableList, dtype = dtype0)
709 674 self.blockIndex = 0
710
711 675 timeTuple = time.localtime(dataOut.utctime)
712 676 self.currentDay = timeTuple.tm_yday
713 return 1
714 677
715 678 def putMetadata(self):
716 679
@@ -744,7 +707,7 class ParamWriter(Operation):
744 707 filesList = os.listdir( fullpath )
745 708 filesList = sorted( filesList, key=str.lower )
746 709 if len( filesList ) > 0:
747 filesList = [k for k in filesList if 'M' in k]
710 filesList = [k for k in filesList if k.startswith(self.metaoptchar)]
748 711 filen = filesList[-1]
749 712 # el filename debera tener el siguiente formato
750 713 # 0 1234 567 89A BCDE (hex)
@@ -825,7 +788,7 class ParamWriter(Operation):
825 788
826 789 if os.path.exists(fullpath):
827 790 filesList = os.listdir( fullpath )
828 filesList = [k for k in filesList if 'D' in k]
791 filesList = [k for k in filesList if k.startswith(self.optchar)]
829 792 if len( filesList ) > 0:
830 793 filesList = sorted( filesList, key=str.lower )
831 794 filen = filesList[-1]
@@ -842,16 +805,16 class ParamWriter(Operation):
842 805 os.makedirs(fullpath)
843 806 setFile = -1 #inicializo mi contador de seteo
844 807
845 if None is None:
808 if self.setType is None:
846 809 setFile += 1
847 file = '%s%4.4d%3.3d%03d%s' % (self.metaoptchar,
810 file = '%s%4.4d%3.3d%03d%s' % (self.optchar,
848 811 timeTuple.tm_year,
849 812 timeTuple.tm_yday,
850 813 setFile,
851 814 ext )
852 815 else:
853 816 setFile = timeTuple.tm_hour*60+timeTuple.tm_min
854 file = '%s%4.4d%3.3d%04d%s' % (self.metaoptchar,
817 file = '%s%4.4d%3.3d%04d%s' % (self.optchar,
855 818 timeTuple.tm_year,
856 819 timeTuple.tm_yday,
857 820 setFile,
@@ -865,9 +828,6 class ParamWriter(Operation):
865 828 self.writeMetadata(fp)
866 829 #Write data
867 830 grp = fp.create_group("Data")
868 # grp.attrs['metadata'] = self.metaFile
869
870 # grp.attrs['blocksPerFile'] = 0
871 831 ds = []
872 832 data = []
873 833 dsList = self.dsList
@@ -876,13 +836,11 class ParamWriter(Operation):
876 836 dsInfo = dsList[i]
877 837 #One-dimension data
878 838 if dsInfo['mode'] == 0:
879 # ds0 = grp.create_dataset(self.dataList[i], (1,1), maxshape=(1,self.blocksPerFile) , chunks = True, dtype='S20')
880 839 ds0 = grp.create_dataset(dsInfo['variable'], (1,1), maxshape=(1,self.blocksPerFile) , chunks = True, dtype=numpy.float64)
881 840 ds.append(ds0)
882 841 data.append([])
883 842 i += 1
884 843 continue
885 # nDimsForDs.append(nDims[i])
886 844
887 845 elif dsInfo['mode'] == 2:
888 846 grp0 = grp.create_group(dsInfo['variable'])
@@ -910,23 +868,14 class ParamWriter(Operation):
910 868 ds.append(ds0)
911 869 data.append([])
912 870 i += 1
913 # nDimsForDs.append(nDims[i])
914 871
915 872 fp.flush()
916 873 fp.close()
917 874
918 # self.nDatas = nDatas
919 # self.nDims = nDims
920 # self.nDimsForDs = nDimsForDs
921 #Saving variables
922 print('Writing the file: %s'%filename)
875 log.log('creating file: {}'.format(filename), 'Writing')
923 876 self.filename = filename
924 # self.fp = fp
925 # self.grp = grp
926 # self.grp.attrs.modify('nRecords', 1)
927 877 self.ds = ds
928 878 self.data = data
929 # self.setFile = setFile
930 879 self.firsttime = True
931 880 self.blockIndex = 0
932 881 return
@@ -936,7 +885,6 class ParamWriter(Operation):
936 885 if self.blockIndex == self.blocksPerFile or self.timeFlag():
937 886 self.setNextFile()
938 887
939 # if not self.firsttime:
940 888 self.readBlock()
941 889 self.setBlock() #Prepare data to be written
942 890 self.writeBlock() #Write data
@@ -958,7 +906,6 class ParamWriter(Operation):
958 906 grp = fp["Data"]
959 907 ind = 0
960 908
961 # grp.attrs['blocksPerFile'] = 0
962 909 while ind < len(dsList):
963 910 dsInfo = dsList[ind]
964 911
@@ -1004,9 +951,9 class ParamWriter(Operation):
1004 951 if mode == 0 or mode == 2 or nDim == 1:
1005 952 data[ind] = dataAux
1006 953 ind += 1
1007 # elif nDim == 1:
1008 # data[ind] = numpy.reshape(dataAux,(numpy.size(dataAux),1))
1009 # ind += 1
954 # elif nDim == 1:
955 # data[ind] = numpy.reshape(dataAux,(numpy.size(dataAux),1))
956 # ind += 1
1010 957 elif nDim == 2:
1011 958 for j in range(dsInfo['dsNumber']):
1012 959 data[ind] = dataAux[j,:]
@@ -1032,8 +979,6 class ParamWriter(Operation):
1032 979
1033 980 # First time
1034 981 if self.firsttime:
1035 # self.ds[i].resize(self.data[i].shape)
1036 # self.ds[i][self.blockIndex,:] = self.data[i]
1037 982 if type(self.data[i]) == numpy.ndarray:
1038 983
1039 984 if nDim == 3:
@@ -1076,14 +1021,12 class ParamWriter(Operation):
1076 1021 self.fp.close()
1077 1022 return
1078 1023
1079 def run(self, dataOut, path, blocksPerFile=10, metadataList=None, dataList=None, mode=None, **kwargs):
1024 def run(self, dataOut, path, blocksPerFile=10, metadataList=None, dataList=None, mode=None, setType=None):
1080 1025
1081 1026 if not(self.isConfig):
1082 flagdata = self.setup(dataOut, path=path, blocksPerFile=blocksPerFile,
1083 metadataList=metadataList, dataList=dataList, mode=mode, **kwargs)
1084
1085 if not(flagdata):
1086 return
1027 self.setup(dataOut, path=path, blocksPerFile=blocksPerFile,
1028 metadataList=metadataList, dataList=dataList, mode=mode,
1029 setType=setType)
1087 1030
1088 1031 self.isConfig = True
1089 1032 self.setNextFile()
@@ -190,6 +190,8 def MPDecorator(BaseClass):
190 190 self.sender = None
191 191 self.receiver = None
192 192 self.name = BaseClass.__name__
193 if 'plot' in self.name.lower():
194 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
193 195 self.start_time = time.time()
194 196
195 197 if len(self.args) is 3:
@@ -257,16 +259,13 def MPDecorator(BaseClass):
257 259 elif optype == 'external':
258 260 self.publish(self.dataOut, opId)
259 261
260 if self.dataOut.flagNoData and self.dataOut.error is None:
262 if self.dataOut.flagNoData and not self.dataOut.error:
261 263 continue
262 264
263 265 self.publish(self.dataOut, self.id)
264 266
265 267 if self.dataOut.error:
266 if self.dataOut.error[0] == -1:
267 log.error(self.dataOut.error[1], self.name)
268 if self.dataOut.error[0] == 1:
269 log.success(self.dataOut.error[1], self.name)
268 log.error(self.dataOut.error, self.name)
270 269 # self.sender.send_multipart([str(self.project_id).encode(), 'end'.encode()])
271 270 break
272 271
@@ -285,8 +284,9 def MPDecorator(BaseClass):
285 284
286 285 BaseClass.run(self, **self.kwargs)
287 286
288 if self.dataOut.flagNoData:
289 continue
287 if self.dataIn.error:
288 self.dataOut.error = self.dataIn.error
289 self.dataOut.flagNoData = True
290 290
291 291 for op, optype, opId, kwargs in self.operations:
292 292 if optype == 'self':
@@ -294,7 +294,8 def MPDecorator(BaseClass):
294 294 elif optype == 'other':
295 295 self.dataOut = op.run(self.dataOut, **kwargs)
296 296 elif optype == 'external':
297 self.publish(self.dataOut, opId)
297 if not self.dataOut.flagNoData or self.dataOut.error:
298 self.publish(self.dataOut, opId)
298 299
299 300 self.publish(self.dataOut, self.id)
300 301 if self.dataIn.error:
@@ -316,6 +317,7 def MPDecorator(BaseClass):
316 317
317 318 if dataOut.error:
318 319 break
320
319 321 time.sleep(1)
320 322
321 323 def run(self):
General Comments 0
You need to be logged in to leave comments. Login now