##// END OF EJS Templates
Review decorator logic for ending process
Juan C. Espinoza -
r1193:c3967e412107
parent child
Show More
@@ -129,8 +129,7 class ParameterConf():
129 raise ValueError('%s: This parameter value is empty' % self.name)
129 raise ValueError('%s: This parameter value is empty' % self.name)
130
130
131 if format == 'list':
131 if format == 'list':
132 strList = value.split(',')
132 strList = [s.strip() for s in value.split(',')]
133
134 self.__formated_value = strList
133 self.__formated_value = strList
135
134
136 return self.__formated_value
135 return self.__formated_value
@@ -753,9 +753,6 class Plot(Operation):
753
753
754 def run(self, dataOut, **kwargs):
754 def run(self, dataOut, **kwargs):
755
755
756 if dataOut.flagNoData and not dataOut.error:
757 return dataOut
758
759 if dataOut.error:
756 if dataOut.error:
760 coerce = True
757 coerce = True
761 else:
758 else:
@@ -356,7 +356,7 class BLTRParamReader(JRODataReader, ProcessingUnit):
356 '''
356 '''
357 if self.flagNoMoreFiles:
357 if self.flagNoMoreFiles:
358 self.dataOut.flagNoData = True
358 self.dataOut.flagNoData = True
359 self.dataOut.error = (1, 'No More files to read')
359 self.dataOut.error = 'No More files to read'
360
360
361 if not self.readNextBlock():
361 if not self.readNextBlock():
362 self.dataOut.flagNoData = True
362 self.dataOut.flagNoData = True
@@ -898,7 +898,7 class JRODataReader(JRODataIO):
898 newFile = self.__setNextFileOffline()
898 newFile = self.__setNextFileOffline()
899
899
900 if not(newFile):
900 if not(newFile):
901 self.dataOut.error = (-1, 'No more files to read')
901 self.dataOut.error = 'No more files to read'
902 return 0
902 return 0
903
903
904 if self.verbose:
904 if self.verbose:
@@ -1052,7 +1052,7 class JRODataReader(JRODataIO):
1052 # Skip block out of startTime and endTime
1052 # Skip block out of startTime and endTime
1053 while True:
1053 while True:
1054 if not(self.__setNewBlock()):
1054 if not(self.__setNewBlock()):
1055 self.dataOut.error = (-1, 'No more files to read')
1055 self.dataOut.error = 'No more files to read'
1056 return 0
1056 return 0
1057
1057
1058 if not(self.readBlock()):
1058 if not(self.readBlock()):
@@ -1324,7 +1324,7 class JRODataReader(JRODataIO):
1324 sleep(self.delay)
1324 sleep(self.delay)
1325
1325
1326 if not(fullpath):
1326 if not(fullpath):
1327 self.dataOut.error = (-1, 'There isn\'t any valid file in {}'.format(path))
1327 self.dataOut.error = 'There isn\'t any valid file in {}'.format(path)
1328 return
1328 return
1329
1329
1330 self.year = year
1330 self.year = year
@@ -569,21 +569,20 class DigitalRFReader(ProcessingUnit):
569 if self.__readNextBlock():
569 if self.__readNextBlock():
570 break
570 break
571 if self.__thisUnixSample > self.__endUTCSecond * self.__sample_rate:
571 if self.__thisUnixSample > self.__endUTCSecond * self.__sample_rate:
572 self.dataOut.error = (1, '')
572 self.dataOut.error = 'Error'
573 return
573 return
574
574
575 if self.__flagDiscontinuousBlock:
575 if self.__flagDiscontinuousBlock:
576 print('[Reading] discontinuous block found ... continue with the next block')
576 self.dataOut.error = 'discontinuous block found'
577 self.dataOut.error = (1, '')
578 return
577 return
579
578
580 if not self.__online:
579 if not self.__online:
581 self.dataOut.error = (1, '')
580 self.dataOut.error = 'Online?'
582 return
581 return
583
582
584 err_counter += 1
583 err_counter += 1
585 if err_counter > nTries:
584 if err_counter > nTries:
586 self.dataOut.error = (1, '')
585 self.dataOut.error = 'Max retrys reach'
587 return
586 return
588
587
589 print('[Reading] waiting %d seconds to read a new block' % seconds)
588 print('[Reading] waiting %d seconds to read a new block' % seconds)
@@ -395,7 +395,7 class MADReader(JRODataReader, ProcessingUnit):
395 '''
395 '''
396 if self.flagNoMoreFiles:
396 if self.flagNoMoreFiles:
397 self.dataOut.flagNoData = True
397 self.dataOut.flagNoData = True
398 log.error('No file left to process', 'MADReader')
398 self.dataOut.error = 'No file left to process'
399 return 0
399 return 0
400
400
401 if not self.readNextBlock():
401 if not self.readNextBlock():
@@ -523,6 +523,7 class ParamReader(JRODataReader,ProcessingUnit):
523 self.getData()
523 self.getData()
524
524
525 return
525 return
526
526 @MPDecorator
527 @MPDecorator
527 class ParamWriter(Operation):
528 class ParamWriter(Operation):
528 '''
529 '''
@@ -542,61 +543,35 class ParamWriter(Operation):
542
543
543
544
544 ext = ".hdf5"
545 ext = ".hdf5"
545
546 optchar = "D"
546 optchar = "D"
547
548 metaoptchar = "M"
547 metaoptchar = "M"
549
550 metaFile = None
548 metaFile = None
551
552 filename = None
549 filename = None
553
554 path = None
550 path = None
555
556 setFile = None
551 setFile = None
557
558 fp = None
552 fp = None
559
560 grp = None
553 grp = None
561
562 ds = None
554 ds = None
563
564 firsttime = True
555 firsttime = True
565
566 #Configurations
556 #Configurations
567
568 blocksPerFile = None
557 blocksPerFile = None
569
570 blockIndex = None
558 blockIndex = None
571
572 dataOut = None
559 dataOut = None
573
574 #Data Arrays
560 #Data Arrays
575
576 dataList = None
561 dataList = None
577
578 metadataList = None
562 metadataList = None
579
580 # arrayDim = None
581
582 dsList = None #List of dictionaries with dataset properties
563 dsList = None #List of dictionaries with dataset properties
583
584 tableDim = None
564 tableDim = None
585
586 # dtype = [('arrayName', 'S20'),('nChannels', 'i'), ('nPoints', 'i'), ('nSamples', 'i'),('mode', 'b')]
587
588 dtype = [('arrayName', 'S20'),('nDimensions', 'i'), ('dim2', 'i'), ('dim1', 'i'),('dim0', 'i'),('mode', 'b')]
565 dtype = [('arrayName', 'S20'),('nDimensions', 'i'), ('dim2', 'i'), ('dim1', 'i'),('dim0', 'i'),('mode', 'b')]
589
590 currentDay = None
566 currentDay = None
591
592 lastTime = None
567 lastTime = None
593
568
594 def __init__(self):#, **kwargs):
569 def __init__(self):
595 Operation.__init__(self)#, **kwargs)
570
596 #self.isConfig = False
571 Operation.__init__(self)
597 return
572 return
598
573
599 def setup(self, dataOut, path=None, blocksPerFile=10, metadataList=None, dataList=None, mode=None, **kwargs):
574 def setup(self, dataOut, path=None, blocksPerFile=10, metadataList=None, dataList=None, mode=None, setType=None):
600 self.path = path
575 self.path = path
601 self.blocksPerFile = blocksPerFile
576 self.blocksPerFile = blocksPerFile
602 self.metadataList = metadataList
577 self.metadataList = metadataList
@@ -606,9 +581,9 class ParamWriter(Operation):
606 if self.mode is not None:
581 if self.mode is not None:
607 self.mode = numpy.zeros(len(self.dataList)) + mode
582 self.mode = numpy.zeros(len(self.dataList)) + mode
608 else:
583 else:
609 #self.mode = numpy.ones(len(self.dataList),int)
610 self.mode = numpy.ones(len(self.dataList))
584 self.mode = numpy.ones(len(self.dataList))
611 log.error(self.mode)#yong
585
586 self.setType = setType
612
587
613 arrayDim = numpy.zeros((len(self.dataList),5))
588 arrayDim = numpy.zeros((len(self.dataList),5))
614
589
@@ -631,11 +606,6 class ParamWriter(Operation):
631
606
632 return 0
607 return 0
633
608
634 #Not array, just a number
635 #Mode 0
636 #log.error(mode)#yong
637 #log.error(len(mode))#yong
638 #log.error(type(mode))#yong
639 if type(dataAux)==float or type(dataAux)==int:
609 if type(dataAux)==float or type(dataAux)==int:
640 dsDict['mode'] = 0
610 dsDict['mode'] = 0
641 dsDict['nDim'] = 0
611 dsDict['nDim'] = 0
@@ -644,16 +614,13 class ParamWriter(Operation):
644
614
645 #Mode 2: meteors
615 #Mode 2: meteors
646 elif self.mode[i] == 2:
616 elif self.mode[i] == 2:
647 # dsDict['nDim'] = 0
648 dsDict['dsName'] = 'table0'
617 dsDict['dsName'] = 'table0'
649 dsDict['mode'] = 2 # Mode meteors
618 dsDict['mode'] = 2 # Mode meteors
650 dsDict['shape'] = dataAux.shape[-1]
619 dsDict['shape'] = dataAux.shape[-1]
651 dsDict['nDim'] = 0
620 dsDict['nDim'] = 0
652 dsDict['dsNumber'] = 1
621 dsDict['dsNumber'] = 1
653
654 arrayDim[i,3] = dataAux.shape[-1]
622 arrayDim[i,3] = dataAux.shape[-1]
655 arrayDim[i,4] = self.mode[i] #Mode the data was stored
623 arrayDim[i,4] = self.mode[i] #Mode the data was stored
656
657 dsList.append(dsDict)
624 dsList.append(dsDict)
658
625
659 #Mode 1
626 #Mode 1
@@ -661,7 +628,6 class ParamWriter(Operation):
661 arrayDim0 = dataAux.shape #Data dimensions
628 arrayDim0 = dataAux.shape #Data dimensions
662 arrayDim[i,0] = len(arrayDim0) #Number of array dimensions
629 arrayDim[i,0] = len(arrayDim0) #Number of array dimensions
663 arrayDim[i,4] = self.mode[i] #Mode the data was stored
630 arrayDim[i,4] = self.mode[i] #Mode the data was stored
664
665 strtable = 'table'
631 strtable = 'table'
666 dsDict['mode'] = 1 # Mode parameters
632 dsDict['mode'] = 1 # Mode parameters
667
633
@@ -703,14 +669,11 class ParamWriter(Operation):
703 table = numpy.array((self.dataList[i],) + tuple(arrayDim[i,:]),dtype = dtype0)
669 table = numpy.array((self.dataList[i],) + tuple(arrayDim[i,:]),dtype = dtype0)
704 tableList.append(table)
670 tableList.append(table)
705
671
706 # self.arrayDim = arrayDim
707 self.dsList = dsList
672 self.dsList = dsList
708 self.tableDim = numpy.array(tableList, dtype = dtype0)
673 self.tableDim = numpy.array(tableList, dtype = dtype0)
709 self.blockIndex = 0
674 self.blockIndex = 0
710
711 timeTuple = time.localtime(dataOut.utctime)
675 timeTuple = time.localtime(dataOut.utctime)
712 self.currentDay = timeTuple.tm_yday
676 self.currentDay = timeTuple.tm_yday
713 return 1
714
677
715 def putMetadata(self):
678 def putMetadata(self):
716
679
@@ -744,7 +707,7 class ParamWriter(Operation):
744 filesList = os.listdir( fullpath )
707 filesList = os.listdir( fullpath )
745 filesList = sorted( filesList, key=str.lower )
708 filesList = sorted( filesList, key=str.lower )
746 if len( filesList ) > 0:
709 if len( filesList ) > 0:
747 filesList = [k for k in filesList if 'M' in k]
710 filesList = [k for k in filesList if k.startswith(self.metaoptchar)]
748 filen = filesList[-1]
711 filen = filesList[-1]
749 # el filename debera tener el siguiente formato
712 # el filename debera tener el siguiente formato
750 # 0 1234 567 89A BCDE (hex)
713 # 0 1234 567 89A BCDE (hex)
@@ -825,7 +788,7 class ParamWriter(Operation):
825
788
826 if os.path.exists(fullpath):
789 if os.path.exists(fullpath):
827 filesList = os.listdir( fullpath )
790 filesList = os.listdir( fullpath )
828 filesList = [k for k in filesList if 'D' in k]
791 filesList = [k for k in filesList if k.startswith(self.optchar)]
829 if len( filesList ) > 0:
792 if len( filesList ) > 0:
830 filesList = sorted( filesList, key=str.lower )
793 filesList = sorted( filesList, key=str.lower )
831 filen = filesList[-1]
794 filen = filesList[-1]
@@ -842,16 +805,16 class ParamWriter(Operation):
842 os.makedirs(fullpath)
805 os.makedirs(fullpath)
843 setFile = -1 #inicializo mi contador de seteo
806 setFile = -1 #inicializo mi contador de seteo
844
807
845 if None is None:
808 if self.setType is None:
846 setFile += 1
809 setFile += 1
847 file = '%s%4.4d%3.3d%03d%s' % (self.metaoptchar,
810 file = '%s%4.4d%3.3d%03d%s' % (self.optchar,
848 timeTuple.tm_year,
811 timeTuple.tm_year,
849 timeTuple.tm_yday,
812 timeTuple.tm_yday,
850 setFile,
813 setFile,
851 ext )
814 ext )
852 else:
815 else:
853 setFile = timeTuple.tm_hour*60+timeTuple.tm_min
816 setFile = timeTuple.tm_hour*60+timeTuple.tm_min
854 file = '%s%4.4d%3.3d%04d%s' % (self.metaoptchar,
817 file = '%s%4.4d%3.3d%04d%s' % (self.optchar,
855 timeTuple.tm_year,
818 timeTuple.tm_year,
856 timeTuple.tm_yday,
819 timeTuple.tm_yday,
857 setFile,
820 setFile,
@@ -865,9 +828,6 class ParamWriter(Operation):
865 self.writeMetadata(fp)
828 self.writeMetadata(fp)
866 #Write data
829 #Write data
867 grp = fp.create_group("Data")
830 grp = fp.create_group("Data")
868 # grp.attrs['metadata'] = self.metaFile
869
870 # grp.attrs['blocksPerFile'] = 0
871 ds = []
831 ds = []
872 data = []
832 data = []
873 dsList = self.dsList
833 dsList = self.dsList
@@ -876,13 +836,11 class ParamWriter(Operation):
876 dsInfo = dsList[i]
836 dsInfo = dsList[i]
877 #One-dimension data
837 #One-dimension data
878 if dsInfo['mode'] == 0:
838 if dsInfo['mode'] == 0:
879 # ds0 = grp.create_dataset(self.dataList[i], (1,1), maxshape=(1,self.blocksPerFile) , chunks = True, dtype='S20')
880 ds0 = grp.create_dataset(dsInfo['variable'], (1,1), maxshape=(1,self.blocksPerFile) , chunks = True, dtype=numpy.float64)
839 ds0 = grp.create_dataset(dsInfo['variable'], (1,1), maxshape=(1,self.blocksPerFile) , chunks = True, dtype=numpy.float64)
881 ds.append(ds0)
840 ds.append(ds0)
882 data.append([])
841 data.append([])
883 i += 1
842 i += 1
884 continue
843 continue
885 # nDimsForDs.append(nDims[i])
886
844
887 elif dsInfo['mode'] == 2:
845 elif dsInfo['mode'] == 2:
888 grp0 = grp.create_group(dsInfo['variable'])
846 grp0 = grp.create_group(dsInfo['variable'])
@@ -910,23 +868,14 class ParamWriter(Operation):
910 ds.append(ds0)
868 ds.append(ds0)
911 data.append([])
869 data.append([])
912 i += 1
870 i += 1
913 # nDimsForDs.append(nDims[i])
914
871
915 fp.flush()
872 fp.flush()
916 fp.close()
873 fp.close()
917
874
918 # self.nDatas = nDatas
875 log.log('creating file: {}'.format(filename), 'Writing')
919 # self.nDims = nDims
920 # self.nDimsForDs = nDimsForDs
921 #Saving variables
922 print('Writing the file: %s'%filename)
923 self.filename = filename
876 self.filename = filename
924 # self.fp = fp
925 # self.grp = grp
926 # self.grp.attrs.modify('nRecords', 1)
927 self.ds = ds
877 self.ds = ds
928 self.data = data
878 self.data = data
929 # self.setFile = setFile
930 self.firsttime = True
879 self.firsttime = True
931 self.blockIndex = 0
880 self.blockIndex = 0
932 return
881 return
@@ -936,7 +885,6 class ParamWriter(Operation):
936 if self.blockIndex == self.blocksPerFile or self.timeFlag():
885 if self.blockIndex == self.blocksPerFile or self.timeFlag():
937 self.setNextFile()
886 self.setNextFile()
938
887
939 # if not self.firsttime:
940 self.readBlock()
888 self.readBlock()
941 self.setBlock() #Prepare data to be written
889 self.setBlock() #Prepare data to be written
942 self.writeBlock() #Write data
890 self.writeBlock() #Write data
@@ -958,7 +906,6 class ParamWriter(Operation):
958 grp = fp["Data"]
906 grp = fp["Data"]
959 ind = 0
907 ind = 0
960
908
961 # grp.attrs['blocksPerFile'] = 0
962 while ind < len(dsList):
909 while ind < len(dsList):
963 dsInfo = dsList[ind]
910 dsInfo = dsList[ind]
964
911
@@ -1032,8 +979,6 class ParamWriter(Operation):
1032
979
1033 # First time
980 # First time
1034 if self.firsttime:
981 if self.firsttime:
1035 # self.ds[i].resize(self.data[i].shape)
1036 # self.ds[i][self.blockIndex,:] = self.data[i]
1037 if type(self.data[i]) == numpy.ndarray:
982 if type(self.data[i]) == numpy.ndarray:
1038
983
1039 if nDim == 3:
984 if nDim == 3:
@@ -1076,14 +1021,12 class ParamWriter(Operation):
1076 self.fp.close()
1021 self.fp.close()
1077 return
1022 return
1078
1023
1079 def run(self, dataOut, path, blocksPerFile=10, metadataList=None, dataList=None, mode=None, **kwargs):
1024 def run(self, dataOut, path, blocksPerFile=10, metadataList=None, dataList=None, mode=None, setType=None):
1080
1025
1081 if not(self.isConfig):
1026 if not(self.isConfig):
1082 flagdata = self.setup(dataOut, path=path, blocksPerFile=blocksPerFile,
1027 self.setup(dataOut, path=path, blocksPerFile=blocksPerFile,
1083 metadataList=metadataList, dataList=dataList, mode=mode, **kwargs)
1028 metadataList=metadataList, dataList=dataList, mode=mode,
1084
1029 setType=setType)
1085 if not(flagdata):
1086 return
1087
1030
1088 self.isConfig = True
1031 self.isConfig = True
1089 self.setNextFile()
1032 self.setNextFile()
@@ -190,6 +190,8 def MPDecorator(BaseClass):
190 self.sender = None
190 self.sender = None
191 self.receiver = None
191 self.receiver = None
192 self.name = BaseClass.__name__
192 self.name = BaseClass.__name__
193 if 'plot' in self.name.lower():
194 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
193 self.start_time = time.time()
195 self.start_time = time.time()
194
196
195 if len(self.args) is 3:
197 if len(self.args) is 3:
@@ -257,16 +259,13 def MPDecorator(BaseClass):
257 elif optype == 'external':
259 elif optype == 'external':
258 self.publish(self.dataOut, opId)
260 self.publish(self.dataOut, opId)
259
261
260 if self.dataOut.flagNoData and self.dataOut.error is None:
262 if self.dataOut.flagNoData and not self.dataOut.error:
261 continue
263 continue
262
264
263 self.publish(self.dataOut, self.id)
265 self.publish(self.dataOut, self.id)
264
266
265 if self.dataOut.error:
267 if self.dataOut.error:
266 if self.dataOut.error[0] == -1:
268 log.error(self.dataOut.error, self.name)
267 log.error(self.dataOut.error[1], self.name)
268 if self.dataOut.error[0] == 1:
269 log.success(self.dataOut.error[1], self.name)
270 # self.sender.send_multipart([str(self.project_id).encode(), 'end'.encode()])
269 # self.sender.send_multipart([str(self.project_id).encode(), 'end'.encode()])
271 break
270 break
272
271
@@ -285,8 +284,9 def MPDecorator(BaseClass):
285
284
286 BaseClass.run(self, **self.kwargs)
285 BaseClass.run(self, **self.kwargs)
287
286
288 if self.dataOut.flagNoData:
287 if self.dataIn.error:
289 continue
288 self.dataOut.error = self.dataIn.error
289 self.dataOut.flagNoData = True
290
290
291 for op, optype, opId, kwargs in self.operations:
291 for op, optype, opId, kwargs in self.operations:
292 if optype == 'self':
292 if optype == 'self':
@@ -294,6 +294,7 def MPDecorator(BaseClass):
294 elif optype == 'other':
294 elif optype == 'other':
295 self.dataOut = op.run(self.dataOut, **kwargs)
295 self.dataOut = op.run(self.dataOut, **kwargs)
296 elif optype == 'external':
296 elif optype == 'external':
297 if not self.dataOut.flagNoData or self.dataOut.error:
297 self.publish(self.dataOut, opId)
298 self.publish(self.dataOut, opId)
298
299
299 self.publish(self.dataOut, self.id)
300 self.publish(self.dataOut, self.id)
@@ -316,6 +317,7 def MPDecorator(BaseClass):
316
317
317 if dataOut.error:
318 if dataOut.error:
318 break
319 break
320
319 time.sleep(1)
321 time.sleep(1)
320
322
321 def run(self):
323 def run(self):
General Comments 0
You need to be logged in to leave comments. Login now