##// END OF EJS Templates
Merge branch 'v3.0-devel' of http://jro-dev.igp.gob.pe/rhodecode/schain into v3.0-devel
Juan C. Espinoza -
r1175:7e36d2e90f1d merge
parent child
Show More
@@ -1,23 +1,36
1 '''
1 '''
2 Updated on January , 2018, for multiprocessing purposes
3 Author: Sergio Cortez
2 Created on September , 2012
4 Created on September , 2012
3 @author:
4 '''
5 '''
5
6 from platform import python_version
6 import sys
7 import sys
7 import ast
8 import ast
8 import datetime
9 import datetime
9 import traceback
10 import traceback
10 import math
11 import math
11 import time
12 import time
13 import zmq
12 from multiprocessing import Process, cpu_count
14 from multiprocessing import Process, cpu_count
13
15
14 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
16 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
15 from xml.dom import minidom
17 from xml.dom import minidom
16
18
17 import schainpy
19
18 from schainpy.admin import Alarm, SchainWarning
20 from schainpy.admin import Alarm, SchainWarning
19 from schainpy.model import *
21
22 ### Temporary imports!!!
23 # from schainpy.model import *
24 from schainpy.model.io import *
25 from schainpy.model.graphics import *
26 from schainpy.model.proc.jroproc_base import *
27 from schainpy.model.proc.bltrproc_parameters import *
28 from schainpy.model.proc.jroproc_spectra import *
29 from schainpy.model.proc.jroproc_voltage import *
30 from schainpy.model.proc.jroproc_parameters import *
31 from schainpy.model.utils.jroutils_publish import *
20 from schainpy.utils import log
32 from schainpy.utils import log
33 ###
21
34
22 DTYPES = {
35 DTYPES = {
23 'Voltage': '.r',
36 'Voltage': '.r',
@@ -77,7 +90,6 def MPProject(project, n=cpu_count()):
77
90
78 time.sleep(3)
91 time.sleep(3)
79
92
80
81 class ParameterConf():
93 class ParameterConf():
82
94
83 id = None
95 id = None
@@ -267,7 +279,6 class ParameterConf():
267
279
268 print('Parameter[%s]: name = %s, value = %s, format = %s' % (self.id, self.name, self.value, self.format))
280 print('Parameter[%s]: name = %s, value = %s, format = %s' % (self.id, self.name, self.value, self.format))
269
281
270
271 class OperationConf():
282 class OperationConf():
272
283
273 id = None
284 id = None
@@ -284,12 +295,15 class OperationConf():
284 self.id = '0'
295 self.id = '0'
285 self.name = None
296 self.name = None
286 self.priority = None
297 self.priority = None
287 self.type = 'self'
298 self.topic = None
288
299
289 def __getNewId(self):
300 def __getNewId(self):
290
301
291 return int(self.id) * 10 + len(self.parmConfObjList) + 1
302 return int(self.id) * 10 + len(self.parmConfObjList) + 1
292
303
304 def getId(self):
305 return self.id
306
293 def updateId(self, new_id):
307 def updateId(self, new_id):
294
308
295 self.id = str(new_id)
309 self.id = str(new_id)
@@ -361,7 +375,6 class OperationConf():
361 self.name = name
375 self.name = name
362 self.type = type
376 self.type = type
363 self.priority = priority
377 self.priority = priority
364
365 self.parmConfObjList = []
378 self.parmConfObjList = []
366
379
367 def removeParameters(self):
380 def removeParameters(self):
@@ -443,26 +456,18 class OperationConf():
443 for parmConfObj in self.parmConfObjList:
456 for parmConfObj in self.parmConfObjList:
444 parmConfObj.printattr()
457 parmConfObj.printattr()
445
458
446 def createObject(self, plotter_queue=None):
459 def createObject(self):
447
448 if self.type == 'self':
449 raise ValueError('This operation type cannot be created')
450
451 if self.type == 'plotter':
452 if not plotter_queue:
453 raise ValueError('plotter_queue is not defined. Use:\nmyProject = Project()\nmyProject.setPlotterQueue(plotter_queue)')
454
455 opObj = Plotter(self.name, plotter_queue)
456
457 if self.type == 'external' or self.type == 'other':
458
460
459 className = eval(self.name)
461 className = eval(self.name)
460 kwargs = self.getKwargs()
462 kwargs = self.getKwargs()
461
463
462 opObj = className(**kwargs)
464 opObj = className(self.id, **kwargs)
463
465
464 return opObj
466 opObj.start()
465
467
468 print(' Operation created')
469
470 return opObj
466
471
467 class ProcUnitConf():
472 class ProcUnitConf():
468
473
@@ -508,13 +513,13 class ProcUnitConf():
508 return self.id
513 return self.id
509
514
510 def updateId(self, new_id, parentId=parentId):
515 def updateId(self, new_id, parentId=parentId):
511
516 '''
512 new_id = int(parentId) * 10 + (int(self.id) % 10)
517 new_id = int(parentId) * 10 + (int(self.id) % 10)
513 new_inputId = int(parentId) * 10 + (int(self.inputId) % 10)
518 new_inputId = int(parentId) * 10 + (int(self.inputId) % 10)
514
519
515 # If this proc unit has not inputs
520 # If this proc unit has not inputs
516 if self.inputId == '0':
521 #if self.inputId == '0':
517 new_inputId = 0
522 #new_inputId = 0
518
523
519 n = 1
524 n = 1
520 for opConfObj in self.opConfObjList:
525 for opConfObj in self.opConfObjList:
@@ -526,8 +531,9 class ProcUnitConf():
526
531
527 self.parentId = str(parentId)
532 self.parentId = str(parentId)
528 self.id = str(new_id)
533 self.id = str(new_id)
529 self.inputId = str(new_inputId)
534 #self.inputId = str(new_inputId)
530
535 '''
536 n = 1
531 def getInputId(self):
537 def getInputId(self):
532
538
533 return self.inputId
539 return self.inputId
@@ -560,11 +566,17 class ProcUnitConf():
560 return self.procUnitObj
566 return self.procUnitObj
561
567
562 def setup(self, id, name, datatype, inputId, parentId=None):
568 def setup(self, id, name, datatype, inputId, parentId=None):
569 '''
570 id sera el topico a publicar
571 inputId sera el topico a subscribirse
572 '''
563
573
564 # Compatible with old signal chain version
574 # Compatible with old signal chain version
565 if datatype == None and name == None:
575 if datatype == None and name == None:
566 raise ValueError('datatype or name should be defined')
576 raise ValueError('datatype or name should be defined')
567
577
578 #Definir una condicion para inputId cuando sea 0
579
568 if name == None:
580 if name == None:
569 if 'Proc' in datatype:
581 if 'Proc' in datatype:
570 name = datatype
582 name = datatype
@@ -579,7 +591,6 class ProcUnitConf():
579 self.datatype = datatype
591 self.datatype = datatype
580 self.inputId = inputId
592 self.inputId = inputId
581 self.parentId = parentId
593 self.parentId = parentId
582
583 self.opConfObjList = []
594 self.opConfObjList = []
584
595
585 self.addOperation(name='run', optype='self')
596 self.addOperation(name='run', optype='self')
@@ -603,9 +614,15 class ProcUnitConf():
603 return opObj
614 return opObj
604
615
605 def addOperation(self, name, optype='self'):
616 def addOperation(self, name, optype = 'self'):
617 '''
618 Actualizacion - > proceso comunicacion
619 En el caso de optype='self', elminar. DEfinir comuncacion IPC -> Topic
620 definir el tipoc de socket o comunicacion ipc++
621
622 '''
606
623
607 id = self.__getNewId()
624 id = self.__getNewId()
608 priority = self.__getPriority()
625 priority = self.__getPriority() # Sin mucho sentido, pero puede usarse
609
626
610 opConfObj = OperationConf()
627 opConfObj = OperationConf()
611 opConfObj.setup(id, name=name, priority=priority, type=optype)
628 opConfObj.setup(id, name=name, priority=priority, type=optype)
@@ -668,11 +685,15 class ProcUnitConf():
668
685
669 return kwargs
686 return kwargs
670
687
671 def createObjects(self, plotter_queue=None):
688 def createObjects(self, dictUnits):
689 '''
690 Instancia de unidades de procesamiento.
672
691
692 '''
673 className = eval(self.name)
693 className = eval(self.name)
674 kwargs = self.getKwargs()
694 kwargs = self.getKwargs()
675 procUnitObj = className(**kwargs)
695 procUnitObj = className(self.id, self.inputId, dictUnits, **kwargs) # necesitan saber su id y su entrada por fines de ipc
696
676
697
677 for opConfObj in self.opConfObjList:
698 for opConfObj in self.opConfObjList:
678
699
@@ -682,21 +703,25 class ProcUnitConf():
682 procUnitObj.addOperationKwargs(
703 procUnitObj.addOperationKwargs(
683 opConfObj.id, **opConfObj.getKwargs())
704 opConfObj.id, **opConfObj.getKwargs())
684 continue
705 continue
706 print("Creating operation process:", opConfObj.name, "for", self.name)
707 opObj = opConfObj.createObject()
685
708
686 opObj = opConfObj.createObject(plotter_queue)
687
709
688 self.opObjDict[opConfObj.id] = opObj
710 #self.opObjDict[opConfObj.id] = opObj.name
689
711
690 procUnitObj.addOperation(opObj, opConfObj.id)
712 procUnitObj.addOperation(opConfObj.name, opConfObj.id)
713
714 procUnitObj.start()
691
715
692 self.procUnitObj = procUnitObj
716 self.procUnitObj = procUnitObj
693
717
718
694 return procUnitObj
719 return procUnitObj
695
720
696 def run(self):
721 def run(self):
697
722
698 is_ok = False
723 is_ok = True
699
724 """
700 for opConfObj in self.opConfObjList:
725 for opConfObj in self.opConfObjList:
701
726
702 kwargs = {}
727 kwargs = {}
@@ -712,8 +737,10 class ProcUnitConf():
712
737
713 is_ok = is_ok or sts
738 is_ok = is_ok or sts
714
739
740 """
715 return is_ok
741 return is_ok
716
742
743
717 def close(self):
744 def close(self):
718
745
719 for opConfObj in self.opConfObjList:
746 for opConfObj in self.opConfObjList:
@@ -757,6 +784,15 class ReadUnitConf(ProcUnitConf):
757 def setup(self, id, name, datatype, path='', startDate='', endDate='',
784 def setup(self, id, name, datatype, path='', startDate='', endDate='',
758 startTime='', endTime='', parentId=None, server=None, **kwargs):
785 startTime='', endTime='', parentId=None, server=None, **kwargs):
759
786
787
788 '''
789 *****el id del proceso sera el Topico
790
791 Adicion de {topic}, si no esta presente -> error
792 kwargs deben ser trasmitidos en la instanciacion
793
794 '''
795
760 # Compatible with old signal chain version
796 # Compatible with old signal chain version
761 if datatype == None and name == None:
797 if datatype == None and name == None:
762 raise ValueError('datatype or name should be defined')
798 raise ValueError('datatype or name should be defined')
@@ -892,7 +928,6 class ReadUnitConf(ProcUnitConf):
892 class Project(Process):
928 class Project(Process):
893
929
894 id = None
930 id = None
895 # name = None
896 description = None
931 description = None
897 filename = None
932 filename = None
898
933
@@ -900,16 +935,15 class Project(Process):
900
935
901 ELEMENTNAME = 'Project'
936 ELEMENTNAME = 'Project'
902
937
903 plotterQueue = None
904
938
905 def __init__(self, plotter_queue=None):
939
940 def __init__(self):
906
941
907 Process.__init__(self)
942 Process.__init__(self)
908 self.id = None
943 self.id = None
909 self.description = None
944 self.description = None
910 self.email = None
945 self.email = None
911 self.alarm = None
946 self.alarm = None
912 self.plotterQueue = plotter_queue
913 self.procUnitConfObjDict = {}
947 self.procUnitConfObjDict = {}
914
948
915 def __getNewId(self):
949 def __getNewId(self):
@@ -958,11 +992,13 class Project(Process):
958
992
959 def setup(self, id, name='', description='', email=None, alarm=[]):
993 def setup(self, id, name='', description='', email=None, alarm=[]):
960
994
961 print()
995 print(' ')
962 print('*' * 60)
996 print('*' * 60)
963 print(' Starting SIGNAL CHAIN PROCESSING v%s ' % schainpy.__version__)
997 print('* Starting SIGNAL CHAIN PROCESSING (Multiprocessing) v%s *' % schainpy.__version__)
964 print('*' * 60)
998 print('*' * 60)
965 print()
999 print("* Python " + python_version() + " *")
1000 print('*' * 19)
1001 print(' ')
966 self.id = str(id)
1002 self.id = str(id)
967 self.description = description
1003 self.description = description
968 self.email = email
1004 self.email = email
@@ -981,6 +1017,14 class Project(Process):
981
1017
982 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
1018 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
983
1019
1020 '''
1021 Actualizacion:
1022 Se agrego un nuevo argumento: topic -relativo a la forma de comunicar los procesos simultaneos
1023
1024 * El id del proceso sera el topico al que se deben subscribir los procUnits para recibir la informacion(data)
1025
1026 '''
1027
984 if id is None:
1028 if id is None:
985 idReadUnit = self.__getNewId()
1029 idReadUnit = self.__getNewId()
986 else:
1030 else:
@@ -996,11 +1040,21 class Project(Process):
996
1040
997 def addProcUnit(self, inputId='0', datatype=None, name=None):
1041 def addProcUnit(self, inputId='0', datatype=None, name=None):
998
1042
999 idProcUnit = self.__getNewId()
1043 '''
1044 Actualizacion:
1045 Se agrego dos nuevos argumentos: topic_read (lee data de otro procUnit) y topic_write(escribe o envia data a otro procUnit)
1046 Deberia reemplazar a "inputId"
1047
1048 ** A fin de mantener el inputID, este sera la representaacion del topicoal que deben subscribirse. El ID propio de la intancia
1049 (proceso) sera el topico de la publicacion, todo sera asignado de manera dinamica.
1050
1051 '''
1052
1053 idProcUnit = self.__getNewId() #Topico para subscripcion
1000
1054
1001 procUnitConfObj = ProcUnitConf()
1055 procUnitConfObj = ProcUnitConf()
1002 procUnitConfObj.setup(idProcUnit, name, datatype,
1056 procUnitConfObj.setup(idProcUnit, name, datatype, inputId, #topic_read, topic_write,
1003 inputId, parentId=self.id)
1057 parentId=self.id)
1004
1058
1005 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1059 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1006
1060
@@ -1156,29 +1210,11 class Project(Process):
1156 def createObjects(self):
1210 def createObjects(self):
1157
1211
1158 for procUnitConfObj in list(self.procUnitConfObjDict.values()):
1212 for procUnitConfObj in list(self.procUnitConfObjDict.values()):
1159 procUnitConfObj.createObjects(self.plotterQueue)
1213 print("Creating process:", procUnitConfObj.name)
1160
1214 procUnitConfObj.createObjects(self.procUnitConfObjDict)
1161 def __connect(self, objIN, thisObj):
1162
1163 thisObj.setInput(objIN.getOutputObj())
1164
1215
1165 def connectObjects(self):
1166
1216
1167 for thisPUConfObj in list(self.procUnitConfObjDict.values()):
1217 print('All processes were created')
1168
1169 inputId = thisPUConfObj.getInputId()
1170
1171 if int(inputId) == 0:
1172 continue
1173
1174 # Get input object
1175 puConfINObj = self.procUnitConfObjDict[inputId]
1176 puObjIN = puConfINObj.getProcUnitObj()
1177
1178 # Get current object
1179 thisPUObj = thisPUConfObj.getProcUnitObj()
1180
1181 self.__connect(puObjIN, thisPUObj)
1182
1218
1183 def __handleError(self, procUnitConfObj, modes=None, stdout=True):
1219 def __handleError(self, procUnitConfObj, modes=None, stdout=True):
1184
1220
@@ -1268,75 +1304,30 class Project(Process):
1268
1304
1269 self.filename = filename
1305 self.filename = filename
1270
1306
1271 def setPlotterQueue(self, plotter_queue):
1307 def setProxyCom(self):
1272
1308
1273 raise NotImplementedError('Use schainpy.controller_api.ControllerThread instead Project class')
1309 ctx = zmq.Context()
1310 if not os.path.exists('/tmp/socketTmp'): os.mkdir('/tmp/socketTmp')
1311 xsub = ctx.socket(zmq.XSUB)
1312 xsub.bind('ipc:///tmp/socketTmp/a')
1313 xpub = ctx.socket(zmq.XPUB)
1314 xpub.bind('ipc:///tmp/socketTmp/b')
1274
1315
1275 def getPlotterQueue(self):
1316 print("Controller Ready: Processes and proxy created")
1317 zmq.proxy(xsub, xpub)
1276
1318
1277 raise NotImplementedError('Use schainpy.controller_api.ControllerThread instead Project class')
1278
1319
1279 def useExternalPlotter(self):
1280
1281 raise NotImplementedError('Use schainpy.controller_api.ControllerThread instead Project class')
1282
1320
1283 def run(self):
1321 def run(self):
1284
1322
1285 log.success('Starting {}'.format(self.name), tag='')
1323 log.success('Starting {}'.format(self.name), tag='')
1286 self.start_time = time.time()
1324 self.start_time = time.time()
1287 self.createObjects()
1325 self.createObjects()
1288 self.connectObjects()
1326 self.setProxyCom()
1289
1290 keyList = list(self.procUnitConfObjDict.keys())
1291 keyList.sort()
1292
1293 err = None
1294
1327
1295 while(True):
1328 # Iniciar todos los procesos .start(), monitoreo de procesos. ELiminar lo de abajo
1296
1297 is_ok = False
1298
1299 for procKey in keyList:
1300
1301 procUnitConfObj = self.procUnitConfObjDict[procKey]
1302
1303 try:
1304 sts = procUnitConfObj.run()
1305 is_ok = is_ok or sts
1306 except SchainWarning:
1307 err = self.__handleError(procUnitConfObj, modes=[2, 3], stdout=False)
1308 is_ok = False
1309 break
1310 except KeyboardInterrupt:
1311 is_ok = False
1312 break
1313 except ValueError as e:
1314 time.sleep(0.5)
1315 err = self.__handleError(procUnitConfObj)
1316 is_ok = False
1317 break
1318 except:
1319 time.sleep(0.5)
1320 err = self.__handleError(procUnitConfObj)
1321 is_ok = False
1322 break
1323
1324 # If every process unit finished so end process
1325 if not(is_ok):
1326 break
1327
1328 if not self.runController():
1329 break
1330
1329
1331 # Closing every process
1330 # Closing every process
1332 for procKey in keyList:
1333 procUnitConfObj = self.procUnitConfObjDict[procKey]
1334 procUnitConfObj.close()
1335
1336 if err is not None:
1337 err.start()
1338 # err.join()
1339
1340 log.success('{} finished (time: {}s)'.format(
1331 log.success('{} finished (time: {}s)'.format(
1341 self.name,
1332 self.name,
1342 time.time()-self.start_time)) No newline at end of file
1333 time.time()-self.start_time))
@@ -3,7 +3,8 import numpy
3 import time, datetime
3 import time, datetime
4 from schainpy.model.graphics import mpldriver
4 from schainpy.model.graphics import mpldriver
5
5
6 from schainpy.model.proc.jroproc_base import Operation
6 from schainpy.model.proc.jroproc_base import MPDecorator, Operation
7
7
8
8 def isTimeInHourRange(datatime, xmin, xmax):
9 def isTimeInHourRange(datatime, xmin, xmax):
9
10
@@ -62,9 +63,9 class Figure(Operation):
62
63
63 created = False
64 created = False
64 parameters = {}
65 parameters = {}
65 def __init__(self, **kwargs):
66 def __init__(self):#, **kwargs):
66
67
67 Operation.__init__(self, **kwargs)
68 Operation.__init__(self)#, **kwargs)
68
69
69 def __del__(self):
70 def __del__(self):
70
71
@@ -9,8 +9,11 import numpy
9
9
10 from .figure import Figure, isRealtime, isTimeInHourRange
10 from .figure import Figure, isRealtime, isTimeInHourRange
11 from .plotting_codes import *
11 from .plotting_codes import *
12 from schainpy.model.proc.jroproc_base import MPDecorator
12
13
14 from schainpy.utils import log
13
15
16 @MPDecorator
14 class SpectraPlot(Figure):
17 class SpectraPlot(Figure):
15
18
16 isConfig = None
19 isConfig = None
@@ -20,11 +23,10 class SpectraPlot(Figure):
20 HEIGHTPROF = None
23 HEIGHTPROF = None
21 PREFIX = 'spc'
24 PREFIX = 'spc'
22
25
23 def __init__(self, **kwargs):
26 def __init__(self):#, **kwargs):
24 Figure.__init__(self, **kwargs)
27 Figure.__init__(self)#, **kwargs)
25 self.isConfig = False
28 self.isConfig = False
26 self.__nsubplots = 1
29 self.__nsubplots = 1
27
28 self.WIDTH = 250
30 self.WIDTH = 250
29 self.HEIGHT = 250
31 self.HEIGHT = 250
30 self.WIDTHPROF = 120
32 self.WIDTHPROF = 120
@@ -104,6 +106,9 class SpectraPlot(Figure):
104 zmin : None,
106 zmin : None,
105 zmax : None
107 zmax : None
106 """
108 """
109 if dataOut.flagNoData:
110 return dataOut
111
107 if realtime:
112 if realtime:
108 if not(isRealtime(utcdatatime = dataOut.utctime)):
113 if not(isRealtime(utcdatatime = dataOut.utctime)):
109 print('Skipping this plot function')
114 print('Skipping this plot function')
@@ -219,6 +224,8 class SpectraPlot(Figure):
219 wr_period=wr_period,
224 wr_period=wr_period,
220 thisDatetime=thisDatetime)
225 thisDatetime=thisDatetime)
221
226
227 return dataOut
228 @MPDecorator
222 class CrossSpectraPlot(Figure):
229 class CrossSpectraPlot(Figure):
223
230
224 isConfig = None
231 isConfig = None
@@ -230,8 +237,8 class CrossSpectraPlot(Figure):
230 HEIGHTPROF = None
237 HEIGHTPROF = None
231 PREFIX = 'cspc'
238 PREFIX = 'cspc'
232
239
233 def __init__(self, **kwargs):
240 def __init__(self):#, **kwargs):
234 Figure.__init__(self, **kwargs)
241 Figure.__init__(self)#, **kwargs)
235 self.isConfig = False
242 self.isConfig = False
236 self.__nsubplots = 4
243 self.__nsubplots = 4
237 self.counter_imagwr = 0
244 self.counter_imagwr = 0
@@ -301,6 +308,9 class CrossSpectraPlot(Figure):
301 zmax : None
308 zmax : None
302 """
309 """
303
310
311 if dataOut.flagNoData:
312 return dataOut
313
304 if pairsList == None:
314 if pairsList == None:
305 pairsIndexList = dataOut.pairsIndexList
315 pairsIndexList = dataOut.pairsIndexList
306 else:
316 else:
@@ -440,7 +450,9 class CrossSpectraPlot(Figure):
440 wr_period=wr_period,
450 wr_period=wr_period,
441 thisDatetime=thisDatetime)
451 thisDatetime=thisDatetime)
442
452
453 return dataOut
443
454
455 @MPDecorator
444 class RTIPlot(Figure):
456 class RTIPlot(Figure):
445
457
446 __isConfig = None
458 __isConfig = None
@@ -450,9 +462,9 class RTIPlot(Figure):
450 HEIGHTPROF = None
462 HEIGHTPROF = None
451 PREFIX = 'rti'
463 PREFIX = 'rti'
452
464
453 def __init__(self, **kwargs):
465 def __init__(self):#, **kwargs):
454
466
455 Figure.__init__(self, **kwargs)
467 Figure.__init__(self)#, **kwargs)
456 self.timerange = None
468 self.timerange = None
457 self.isConfig = False
469 self.isConfig = False
458 self.__nsubplots = 1
470 self.__nsubplots = 1
@@ -540,6 +552,8 class RTIPlot(Figure):
540 zmin : None,
552 zmin : None,
541 zmax : None
553 zmax : None
542 """
554 """
555 if dataOut.flagNoData:
556 return dataOut
543
557
544 #colormap = kwargs.get('colormap', 'jet')
558 #colormap = kwargs.get('colormap', 'jet')
545 if HEIGHT is not None:
559 if HEIGHT is not None:
@@ -650,7 +664,9 class RTIPlot(Figure):
650 wr_period=wr_period,
664 wr_period=wr_period,
651 thisDatetime=thisDatetime,
665 thisDatetime=thisDatetime,
652 update_figfile=update_figfile)
666 update_figfile=update_figfile)
667 return dataOut
653
668
669 @MPDecorator
654 class CoherenceMap(Figure):
670 class CoherenceMap(Figure):
655 isConfig = None
671 isConfig = None
656 __nsubplots = None
672 __nsubplots = None
@@ -659,8 +675,8 class CoherenceMap(Figure):
659 HEIGHTPROF = None
675 HEIGHTPROF = None
660 PREFIX = 'cmap'
676 PREFIX = 'cmap'
661
677
662 def __init__(self, **kwargs):
678 def __init__(self):#, **kwargs):
663 Figure.__init__(self, **kwargs)
679 Figure.__init__(self)#, **kwargs)
664 self.timerange = 2*60*60
680 self.timerange = 2*60*60
665 self.isConfig = False
681 self.isConfig = False
666 self.__nsubplots = 1
682 self.__nsubplots = 1
@@ -723,6 +739,10 class CoherenceMap(Figure):
723 server=None, folder=None, username=None, password=None,
739 server=None, folder=None, username=None, password=None,
724 ftp_wei=0, exp_code=0, sub_exp_code=0, plot_pos=0):
740 ftp_wei=0, exp_code=0, sub_exp_code=0, plot_pos=0):
725
741
742
743 if dataOut.flagNoData:
744 return dataOut
745
726 if not isTimeInHourRange(dataOut.datatime, xmin, xmax):
746 if not isTimeInHourRange(dataOut.datatime, xmin, xmax):
727 return
747 return
728
748
@@ -855,6 +875,9 class CoherenceMap(Figure):
855 thisDatetime=thisDatetime,
875 thisDatetime=thisDatetime,
856 update_figfile=update_figfile)
876 update_figfile=update_figfile)
857
877
878 return dataOut
879
880 @MPDecorator
858 class PowerProfilePlot(Figure):
881 class PowerProfilePlot(Figure):
859
882
860 isConfig = None
883 isConfig = None
@@ -864,8 +887,8 class PowerProfilePlot(Figure):
864 HEIGHTPROF = None
887 HEIGHTPROF = None
865 PREFIX = 'spcprofile'
888 PREFIX = 'spcprofile'
866
889
867 def __init__(self, **kwargs):
890 def __init__(self):#, **kwargs):
868 Figure.__init__(self, **kwargs)
891 Figure.__init__(self)#, **kwargs)
869 self.isConfig = False
892 self.isConfig = False
870 self.__nsubplots = 1
893 self.__nsubplots = 1
871
894
@@ -907,6 +930,9 class PowerProfilePlot(Figure):
907 ftp=False, wr_period=1, server=None,
930 ftp=False, wr_period=1, server=None,
908 folder=None, username=None, password=None):
931 folder=None, username=None, password=None):
909
932
933 if dataOut.flagNoData:
934 return dataOut
935
910
936
911 if channelList == None:
937 if channelList == None:
912 channelIndexList = dataOut.channelIndexList
938 channelIndexList = dataOut.channelIndexList
@@ -979,6 +1005,9 class PowerProfilePlot(Figure):
979 wr_period=wr_period,
1005 wr_period=wr_period,
980 thisDatetime=thisDatetime)
1006 thisDatetime=thisDatetime)
981
1007
1008 return dataOut
1009
1010 @MPDecorator
982 class SpectraCutPlot(Figure):
1011 class SpectraCutPlot(Figure):
983
1012
984 isConfig = None
1013 isConfig = None
@@ -988,8 +1017,8 class SpectraCutPlot(Figure):
988 HEIGHTPROF = None
1017 HEIGHTPROF = None
989 PREFIX = 'spc_cut'
1018 PREFIX = 'spc_cut'
990
1019
991 def __init__(self, **kwargs):
1020 def __init__(self):#, **kwargs):
992 Figure.__init__(self, **kwargs)
1021 Figure.__init__(self)#, **kwargs)
993 self.isConfig = False
1022 self.isConfig = False
994 self.__nsubplots = 1
1023 self.__nsubplots = 1
995
1024
@@ -1032,6 +1061,8 class SpectraCutPlot(Figure):
1032 folder=None, username=None, password=None,
1061 folder=None, username=None, password=None,
1033 xaxis="frequency"):
1062 xaxis="frequency"):
1034
1063
1064 if dataOut.flagNoData:
1065 return dataOut
1035
1066
1036 if channelList == None:
1067 if channelList == None:
1037 channelIndexList = dataOut.channelIndexList
1068 channelIndexList = dataOut.channelIndexList
@@ -1111,6 +1142,9 class SpectraCutPlot(Figure):
1111 wr_period=wr_period,
1142 wr_period=wr_period,
1112 thisDatetime=thisDatetime)
1143 thisDatetime=thisDatetime)
1113
1144
1145 return dataOut
1146
1147 @MPDecorator
1114 class Noise(Figure):
1148 class Noise(Figure):
1115
1149
1116 isConfig = None
1150 isConfig = None
@@ -1119,8 +1153,8 class Noise(Figure):
1119 PREFIX = 'noise'
1153 PREFIX = 'noise'
1120
1154
1121
1155
1122 def __init__(self, **kwargs):
1156 def __init__(self):#, **kwargs):
1123 Figure.__init__(self, **kwargs)
1157 Figure.__init__(self)#, **kwargs)
1124 self.timerange = 24*60*60
1158 self.timerange = 24*60*60
1125 self.isConfig = False
1159 self.isConfig = False
1126 self.__nsubplots = 1
1160 self.__nsubplots = 1
@@ -1209,6 +1243,9 class Noise(Figure):
1209 server=None, folder=None, username=None, password=None,
1243 server=None, folder=None, username=None, password=None,
1210 ftp_wei=0, exp_code=0, sub_exp_code=0, plot_pos=0):
1244 ftp_wei=0, exp_code=0, sub_exp_code=0, plot_pos=0):
1211
1245
1246 if dataOut.flagNoData:
1247 return dataOut
1248
1212 if not isTimeInHourRange(dataOut.datatime, xmin, xmax):
1249 if not isTimeInHourRange(dataOut.datatime, xmin, xmax):
1213 return
1250 return
1214
1251
@@ -1312,6 +1349,9 class Noise(Figure):
1312 if save:
1349 if save:
1313 self.save_data(self.filename_noise, noisedB, thisDatetime)
1350 self.save_data(self.filename_noise, noisedB, thisDatetime)
1314
1351
1352 return dataOut
1353
1354 @MPDecorator
1315 class BeaconPhase(Figure):
1355 class BeaconPhase(Figure):
1316
1356
1317 __isConfig = None
1357 __isConfig = None
@@ -1319,8 +1359,8 class BeaconPhase(Figure):
1319
1359
1320 PREFIX = 'beacon_phase'
1360 PREFIX = 'beacon_phase'
1321
1361
1322 def __init__(self, **kwargs):
1362 def __init__(self):#, **kwargs):
1323 Figure.__init__(self, **kwargs)
1363 Figure.__init__(self)#, **kwargs)
1324 self.timerange = 24*60*60
1364 self.timerange = 24*60*60
1325 self.isConfig = False
1365 self.isConfig = False
1326 self.__nsubplots = 1
1366 self.__nsubplots = 1
@@ -1399,6 +1439,9 class BeaconPhase(Figure):
1399 server=None, folder=None, username=None, password=None,
1439 server=None, folder=None, username=None, password=None,
1400 ftp_wei=0, exp_code=0, sub_exp_code=0, plot_pos=0):
1440 ftp_wei=0, exp_code=0, sub_exp_code=0, plot_pos=0):
1401
1441
1442 if dataOut.flagNoData:
1443 return dataOut
1444
1402 if not isTimeInHourRange(dataOut.datatime, xmin, xmax):
1445 if not isTimeInHourRange(dataOut.datatime, xmin, xmax):
1403 return
1446 return
1404
1447
@@ -1540,3 +1583,5 class BeaconPhase(Figure):
1540 wr_period=wr_period,
1583 wr_period=wr_period,
1541 thisDatetime=thisDatetime,
1584 thisDatetime=thisDatetime,
1542 update_figfile=update_figfile)
1585 update_figfile=update_figfile)
1586
1587 return dataOut #Yong No newline at end of file
@@ -6,15 +6,18 Created on Jul 9, 2014
6 import os
6 import os
7 import datetime
7 import datetime
8 import numpy
8 import numpy
9
9 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator #YONG
10 from schainpy.utils import log
10 from .figure import Figure
11 from .figure import Figure
11
12
13
14 @MPDecorator
12 class Scope(Figure):
15 class Scope(Figure):
13
16
14 isConfig = None
17 isConfig = None
15
18
16 def __init__(self, **kwargs):
19 def __init__(self):#, **kwargs): #YONG
17 Figure.__init__(self, **kwargs)
20 Figure.__init__(self)#, **kwargs)
18 self.isConfig = False
21 self.isConfig = False
19 self.WIDTH = 300
22 self.WIDTH = 300
20 self.HEIGHT = 200
23 self.HEIGHT = 200
@@ -127,6 +130,8 class Scope(Figure):
127 ymin : None,
130 ymin : None,
128 ymax : None,
131 ymax : None,
129 """
132 """
133 if dataOut.flagNoData:
134 return dataOut
130
135
131 if channelList == None:
136 if channelList == None:
132 channelIndexList = dataOut.channelIndexList
137 channelIndexList = dataOut.channelIndexList
@@ -223,3 +228,5 class Scope(Figure):
223 ftp=ftp,
228 ftp=ftp,
224 wr_period=wr_period,
229 wr_period=wr_period,
225 thisDatetime=thisDatetime)
230 thisDatetime=thisDatetime)
231
232 return dataOut No newline at end of file
@@ -20,7 +20,6 try:
20 except:
20 except:
21 from time import sleep
21 from time import sleep
22
22
23 import schainpy.admin
24 from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader
23 from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader
25 from schainpy.model.data.jroheaderIO import get_dtype_index, get_numpy_dtype, get_procflag_dtype, get_dtype_width
24 from schainpy.model.data.jroheaderIO import get_dtype_index, get_numpy_dtype, get_procflag_dtype, get_dtype_width
26 from schainpy.utils import log
25 from schainpy.utils import log
@@ -885,6 +884,7 class JRODataReader(JRODataIO):
885 self.flagIsNewFile = 0
884 self.flagIsNewFile = 0
886 self.fp = None
885 self.fp = None
887 self.flagNoMoreFiles = 1
886 self.flagNoMoreFiles = 1
887 # print '[Reading] No more files to read'
888
888
889 return fileOk_flag
889 return fileOk_flag
890
890
@@ -898,7 +898,7 class JRODataReader(JRODataIO):
898 newFile = self.__setNextFileOffline()
898 newFile = self.__setNextFileOffline()
899
899
900 if not(newFile):
900 if not(newFile):
901 raise schainpy.admin.SchainWarning('No more files to read')
901 raise(schainpy.admin.SchainWarning('No more files to read'))
902 return 0
902 return 0
903
903
904 if self.verbose:
904 if self.verbose:
@@ -1052,7 +1052,7 class JRODataReader(JRODataIO):
1052 # Skip block out of startTime and endTime
1052 # Skip block out of startTime and endTime
1053 while True:
1053 while True:
1054 if not(self.__setNewBlock()):
1054 if not(self.__setNewBlock()):
1055 raise schainpy
1055 raise(schainpy.admin.SchainWarning('No more files'))
1056 return 0
1056 return 0
1057
1057
1058 if not(self.readBlock()):
1058 if not(self.readBlock()):
@@ -1320,11 +1320,11 class JRODataReader(JRODataIO):
1320 if fullpath:
1320 if fullpath:
1321 break
1321 break
1322
1322
1323 print('[Reading] Waiting %0.2f sec for an valid file in %s: try %02d ...' % (delay, path, nTries + 1))
1323 print('[Reading] Waiting %0.2f sec for an valid file in %s: try %02d ...' % (self.delay, path, nTries + 1))
1324 sleep(delay)
1324 sleep(self.delay)
1325
1325
1326 if not(fullpath):
1326 if not(fullpath):
1327 raise schainpy.admin.SchainWarning('There isn\'t any valid file in {}'.format(path))
1327 raise(schainpy.admin.SchainWarning('There isn\'t any valid file in {}'.format(path)))
1328 return
1328 return
1329
1329
1330 self.year = year
1330 self.year = year
@@ -6,10 +6,11 Created on Jul 2, 2014
6 import numpy
6 import numpy
7
7
8 from schainpy.model.io.jroIO_base import LOCALTIME, JRODataReader, JRODataWriter
8 from schainpy.model.io.jroIO_base import LOCALTIME, JRODataReader, JRODataWriter
9 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation
9 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
10 from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader
10 from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader
11 from schainpy.model.data.jrodata import Spectra
11 from schainpy.model.data.jrodata import Spectra
12
12
13 @MPDecorator
13 class SpectraReader(JRODataReader, ProcessingUnit):
14 class SpectraReader(JRODataReader, ProcessingUnit):
14 """
15 """
15 Esta clase permite leer datos de espectros desde archivos procesados (.pdata). La lectura
16 Esta clase permite leer datos de espectros desde archivos procesados (.pdata). La lectura
@@ -69,7 +70,7 class SpectraReader(JRODataReader, ProcessingUnit):
69
70
70 rdPairList = []
71 rdPairList = []
71
72
72 def __init__(self, **kwargs):
73 def __init__(self):#, **kwargs):
73 """
74 """
74 Inicializador de la clase SpectraReader para la lectura de datos de espectros.
75 Inicializador de la clase SpectraReader para la lectura de datos de espectros.
75
76
@@ -88,7 +89,7 class SpectraReader(JRODataReader, ProcessingUnit):
88 """
89 """
89
90
90 #Eliminar de la base la herencia
91 #Eliminar de la base la herencia
91 ProcessingUnit.__init__(self, **kwargs)
92 ProcessingUnit.__init__(self)#, **kwargs)
92
93
93 # self.isConfig = False
94 # self.isConfig = False
94
95
@@ -7,7 +7,7 Created on Jul 2, 2014
7 import numpy
7 import numpy
8
8
9 from .jroIO_base import LOCALTIME, JRODataReader, JRODataWriter
9 from .jroIO_base import LOCALTIME, JRODataReader, JRODataWriter
10 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation
10 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
11 from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader
11 from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader
12 from schainpy.model.data.jrodata import Voltage
12 from schainpy.model.data.jrodata import Voltage
13 import zmq
13 import zmq
@@ -15,7 +15,7 import tempfile
15 from io import StringIO
15 from io import StringIO
16 # from _sha import blocksize
16 # from _sha import blocksize
17
17
18
18 @MPDecorator
19 class VoltageReader(JRODataReader, ProcessingUnit):
19 class VoltageReader(JRODataReader, ProcessingUnit):
20 """
20 """
21 Esta clase permite leer datos de voltage desde archivos en formato rawdata (.r). La lectura
21 Esta clase permite leer datos de voltage desde archivos en formato rawdata (.r). La lectura
@@ -62,7 +62,7 class VoltageReader(JRODataReader, ProcessingUnit):
62 optchar = "D"
62 optchar = "D"
63 dataOut = None
63 dataOut = None
64
64
65 def __init__(self, **kwargs):
65 def __init__(self):#, **kwargs):
66 """
66 """
67 Inicializador de la clase VoltageReader para la lectura de datos de voltage.
67 Inicializador de la clase VoltageReader para la lectura de datos de voltage.
68
68
@@ -81,7 +81,7 class VoltageReader(JRODataReader, ProcessingUnit):
81 None
81 None
82 """
82 """
83
83
84 ProcessingUnit.__init__(self, **kwargs)
84 ProcessingUnit.__init__(self)#, **kwargs)
85
85
86 self.isConfig = False
86 self.isConfig = False
87
87
@@ -762,3 +762,4 class VoltageWriter(JRODataWriter, Operation):
762 self.processingHeaderObj.processFlags = self.getProcessFlags()
762 self.processingHeaderObj.processFlags = self.getProcessFlags()
763
763
764 self.setBasicHeader()
764 self.setBasicHeader()
765 No newline at end of file
This diff has been collapsed as it changes many lines, (550 lines changed) Show them Hide them
@@ -1,41 +1,39
1 '''
1 '''
2
2 Updated for multiprocessing
3 Author : Sergio Cortez
4 Jan 2018
5 Abstract:
6 Base class for processing units and operations. A decorator provides multiprocessing features and interconnect the processes created.
7 The argument (kwargs) sent from the controller is parsed and filtered via the decorator for each processing unit or operation instantiated.
8 The decorator handle also the methods inside the processing unit to be called from the main script (not as operations) (OPERATION -> type ='self').
9
10 Based on:
3 $Author: murco $
11 $Author: murco $
4 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
12 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
5 '''
13 '''
14 from platform import python_version
6 import inspect
15 import inspect
7 from fuzzywuzzy import process
16 import zmq
8
17 import time
9 def checkKwargs(method, kwargs):
18 import pickle
10 currentKwargs = kwargs
19 import os
11 choices = inspect.getargspec(method).args
20 from multiprocessing import Process
12 try:
13 choices.remove('self')
14 except Exception as e:
15 pass
16
21
17 try:
22 from schainpy.utils import log
18 choices.remove('dataOut')
19 except Exception as e:
20 pass
21
23
22 for kwarg in kwargs:
23 fuzz = process.extractOne(kwarg, choices)
24 if fuzz is None:
25 continue
26 if fuzz[1] < 100:
27 raise Exception('\x1b[0;32;40mDid you mean {} instead of {} in {}? \x1b[0m'.
28 format(fuzz[0], kwarg, method.__self__.__class__.__name__))
29
24
30 class ProcessingUnit(object):
25 class ProcessingUnit(object):
31
26
32 """
27 """
33 Esta es la clase base para el procesamiento de datos.
28 Update - Jan 2018 - MULTIPROCESSING
29 All the "call" methods present in the previous base were removed.
30 The majority of operations are independant processes, thus
31 the decorator is in charge of communicate the operation processes
32 with the proccessing unit via IPC.
33
34 The constructor does not receive any argument. The remaining methods
35 are related with the operations to execute.
34
36
35 Contiene el metodo "call" para llamar operaciones. Las operaciones pueden ser:
36 - Metodos internos (callMethod)
37 - Objetos del tipo Operation (callObject). Antes de ser llamados, estos objetos
38 tienen que ser agreagados con el metodo "add".
39
37
40 """
38 """
41 # objeto de datos de entrada (Voltage, Spectra o Correlation)
39 # objeto de datos de entrada (Voltage, Spectra o Correlation)
@@ -43,33 +41,25 class ProcessingUnit(object):
43 dataInList = []
41 dataInList = []
44
42
45 # objeto de datos de entrada (Voltage, Spectra o Correlation)
43 # objeto de datos de entrada (Voltage, Spectra o Correlation)
44
45 id = None
46 inputId = None
47
46 dataOut = None
48 dataOut = None
47
49
50 dictProcs = None
51
48 operations2RunDict = None
52 operations2RunDict = None
49
53
50 isConfig = False
54 isConfig = False
51
55
52
56 def __init__(self):
53 def __init__(self, *args, **kwargs):
54
57
55 self.dataIn = None
58 self.dataIn = None
56 self.dataInList = []
57
58 self.dataOut = None
59 self.dataOut = None
59
60
60 self.operations2RunDict = {}
61 self.operationKwargs = {}
62
63 self.isConfig = False
61 self.isConfig = False
64
62
65 self.args = args
66 self.kwargs = kwargs
67
68 if not hasattr(self, 'name'):
69 self.name = self.__class__.__name__
70
71 checkKwargs(self.run, kwargs)
72
73 def getAllowedArgs(self):
63 def getAllowedArgs(self):
74 if hasattr(self, '__attrs__'):
64 if hasattr(self, '__attrs__'):
75 return self.__attrs__
65 return self.__attrs__
@@ -82,10 +72,12 class ProcessingUnit(object):
82
72
83 self.operationKwargs[objId] = kwargs
73 self.operationKwargs[objId] = kwargs
84
74
85
86 def addOperation(self, opObj, objId):
75 def addOperation(self, opObj, objId):
87
76
88 """
77 """
78 This method is used in the controller, and update the dictionary containing the operations to execute. The dict
79 posses the id of the operation process (IPC purposes)
80
89 Agrega un objeto del tipo "Operation" (opObj) a la lista de objetos "self.objectList" y retorna el
81 Agrega un objeto del tipo "Operation" (opObj) a la lista de objetos "self.objectList" y retorna el
90 identificador asociado a este objeto.
82 identificador asociado a este objeto.
91
83
@@ -95,13 +87,14 class ProcessingUnit(object):
95
87
96 Return:
88 Return:
97
89
98 objId : identificador del objeto, necesario para ejecutar la operacion
90 objId : identificador del objeto, necesario para comunicar con master(procUnit)
99 """
91 """
100
92
101 self.operations2RunDict[objId] = opObj
93 self.operations2RunDict[objId] = opObj
102
94
103 return objId
95 return objId
104
96
97
105 def getOperationObj(self, objId):
98 def getOperationObj(self, objId):
106
99
107 if objId not in list(self.operations2RunDict.keys()):
100 if objId not in list(self.operations2RunDict.keys()):
@@ -122,239 +115,424 class ProcessingUnit(object):
122
115
123 raise NotImplementedError
116 raise NotImplementedError
124
117
125 def callMethod(self, name, opId):
118 def setup(self):
126
119
127 """
120 raise NotImplementedError
128 Ejecuta el metodo con el nombre "name" y con argumentos **kwargs de la propia clase.
129
121
130 Input:
122 def run(self):
131 name : nombre del metodo a ejecutar
123
124 raise NotImplementedError
125
126 def close(self):
127 #Close every thread, queue or any other object here is it is neccesary.
128 return
132
129
133 **kwargs : diccionario con los nombres y valores de la funcion a ejecutar.
130 class Operation(object):
134
131
135 """
132 """
133 Update - Jan 2018 - MULTIPROCESSING
136
134
137 #Checking the inputs
135 Most of the methods remained the same. The decorator parse the arguments and executed the run() method for each process.
138 if name == 'run':
136 The constructor doe snot receive any argument, neither the baseclass.
139
137
140 if not self.checkInputs():
141 self.dataOut.flagNoData = True
142 return False
143 else:
144 #Si no es un metodo RUN la entrada es la misma dataOut (interna)
145 if self.dataOut is not None and self.dataOut.isEmpty():
146 return False
147
138
148 #Getting the pointer to method
139 Clase base para definir las operaciones adicionales que se pueden agregar a la clase ProcessingUnit
149 methodToCall = getattr(self, name)
140 y necesiten acumular informacion previa de los datos a procesar. De preferencia usar un buffer de
141 acumulacion dentro de esta clase
150
142
151 #Executing the self method
143 Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer)
152
144
153 if hasattr(self, 'mp'):
145 """
154 if name=='run':
146 id = None
155 if self.mp is False:
147 __buffer = None
156 self.mp = True
148 dest = None
157 self.start()
149 isConfig = False
158 else:
150 readyFlag = None
159 self.operationKwargs[opId]['parent'] = self.kwargs
151
160 methodToCall(**self.operationKwargs[opId])
152 def __init__(self):
161 else:
153
162 if name=='run':
154 self.buffer = None
163 methodToCall(**self.kwargs)
155 self.dest = None
156 self.isConfig = False
157 self.readyFlag = False
158
159 if not hasattr(self, 'name'):
160 self.name = self.__class__.__name__
161
162 def getAllowedArgs(self):
163 if hasattr(self, '__attrs__'):
164 return self.__attrs__
164 else:
165 else:
165 methodToCall(**self.operationKwargs[opId])
166 return inspect.getargspec(self.run).args
166
167
167 if self.dataOut is None:
168 def setup(self):
168 return False
169
169
170 if self.dataOut.isEmpty():
170 self.isConfig = True
171 return False
171
172 raise NotImplementedError
172
173
173 return True
174
174
175 def callObject(self, objId):
175 def run(self, dataIn, **kwargs):
176
176
177 """
177 """
178 Ejecuta la operacion asociada al identificador del objeto "objId"
178 Realiza las operaciones necesarias sobre la dataIn.data y actualiza los
179 atributos del objeto dataIn.
179
180
180 Input:
181 Input:
181
182
182 objId : identificador del objeto a ejecutar
183 dataIn : objeto del tipo JROData
183
184 **kwargs : diccionario con los nombres y valores de la funcion a ejecutar.
185
184
186 Return:
185 Return:
187
186
188 None
187 None
188
189 Affected:
190 __buffer : buffer de recepcion de datos.
191
189 """
192 """
193 if not self.isConfig:
194 self.setup(**kwargs)
190
195
191 if self.dataOut is not None and self.dataOut.isEmpty():
196 raise NotImplementedError
192 return False
193
197
194 externalProcObj = self.operations2RunDict[objId]
198 def close(self):
195
199
196 if hasattr(externalProcObj, 'mp'):
200 pass
197 if externalProcObj.mp is False:
198 externalProcObj.kwargs['parent'] = self.kwargs
199 self.operationKwargs[objId] = externalProcObj.kwargs
200 externalProcObj.mp = True
201 externalProcObj.start()
202 else:
203 externalProcObj.run(self.dataOut, **externalProcObj.kwargs)
204 self.operationKwargs[objId] = externalProcObj.kwargs
205
201
206
202
207 return True
203 ######### Decorator #########
208
204
209 def call(self, opType, opName=None, opId=None):
210 """
211 Return True si ejecuta la operacion interna nombrada "opName" o la operacion externa
212 identificada con el id "opId"; con los argumentos "**kwargs".
213
205
214 False si la operacion no se ha ejecutado.
206 def MPDecorator(BaseClass):
215
207
216 Input:
208 """
209 "Multiprocessing class decorator"
217
210
218 opType : Puede ser "self" o "external"
211 This function add multiprocessing features to the base class. Also,
212 it handle the communication beetween processes (readers, procUnits and operations).
213 Receive the arguments at the moment of instantiation. According to that, discriminates if it
214 is a procUnit or an operation
215 """
219
216
220 Depende del tipo de operacion para llamar a:callMethod or callObject:
217 class MPClass(BaseClass, Process):
221
218
222 1. If opType = "self": Llama a un metodo propio de esta clase:
219 "This is the overwritten class"
220 operations2RunDict = None
221 socket_l = None
222 socket_p = None
223 socketOP = None
224 socket_router = None
225 dictProcs = None
226 typeProc = None
227 def __init__(self, *args, **kwargs):
228 super(MPClass, self).__init__()
229 Process.__init__(self)
223
230
224 name_method = getattr(self, name)
225 name_method(**kwargs)
226
231
232 self.operationKwargs = {}
233 self.args = args
227
234
228 2. If opType = "other" o"external": Llama al metodo "run()" de una instancia de la
229 clase "Operation" o de un derivado de ella:
230
235
231 instanceName = self.operationList[opId]
236 self.operations2RunDict = {}
232 instanceName.run(**kwargs)
237 self.kwargs = kwargs
233
238
234 opName : Si la operacion es interna (opType = 'self'), entonces el "opName" sera
239 # The number of arguments (args) determine the type of process
235 usada para llamar a un metodo interno de la clase Processing
236
240
237 opId : Si la operacion es externa (opType = 'other' o 'external), entonces el
241 if len(self.args) is 3:
238 "opId" sera usada para llamar al metodo "run" de la clase Operation
242 self.typeProc = "ProcUnit"
239 registrada anteriormente con ese Id
243 self.id = args[0] #topico de publicacion
244 self.inputId = args[1] #topico de subcripcion
245 self.dictProcs = args[2] #diccionario de procesos globales
246 else:
247 self.id = args[0]
248 self.typeProc = "Operation"
240
249
241 Exception:
250 def addOperationKwargs(self, objId, **kwargs):
242 Este objeto de tipo Operation debe de haber sido agregado antes con el metodo:
243 "addOperation" e identificado con el valor "opId" = el id de la operacion.
244 De lo contrario retornara un error del tipo ValueError
245
251
246 """
252 self.operationKwargs[objId] = kwargs
247
253
248 if opType == 'self':
254 def getAllowedArgs(self):
249
255
250 if not opName:
256 if hasattr(self, '__attrs__'):
251 raise ValueError("opName parameter should be defined")
257 return self.__attrs__
258 else:
259 return inspect.getargspec(self.run).args
252
260
253 sts = self.callMethod(opName, opId)
254
261
255 elif opType == 'other' or opType == 'external' or opType == 'plotter':
262 def sockListening(self, topic):
256
263
257 if not opId:
264 """
258 raise ValueError("opId parameter should be defined")
265 This function create a socket to receive objects.
266 The 'topic' argument is related to the publisher process from which the self process is
267 listening (data).
268 In the case were the self process is listening to a Reader (proc Unit),
269 special conditions are introduced to maximize parallelism.
270 """
259
271
260 if opId not in list(self.operations2RunDict.keys()):
272 cont = zmq.Context()
261 raise ValueError("Any operation with id=%s has been added" %str(opId))
273 zmq_socket = cont.socket(zmq.SUB)
274 if not os.path.exists('/tmp/socketTmp'):
275 os.mkdir('/tmp/socketTmp')
262
276
263 sts = self.callObject(opId)
277 if 'Reader' in self.dictProcs[self.inputId].name:
278 zmq_socket.connect('ipc:///tmp/socketTmp/b')
264
279
265 else:
280 else:
266 raise ValueError("opType should be 'self', 'external' or 'plotter'; and not '%s'" %opType)
281 zmq_socket.connect('ipc:///tmp/socketTmp/%s' % self.inputId)
267
282
268 return sts
283 #log.error('RECEIVING FROM {} {}'.format(self.inputId, str(topic).encode()))
284 zmq_socket.setsockopt(zmq.SUBSCRIBE, str(topic).encode()) #yong
269
285
270 def setInput(self, dataIn):
286 return zmq_socket
271
287
272 self.dataIn = dataIn
273 self.dataInList.append(dataIn)
274
288
275 def getOutputObj(self):
289 def listenProc(self, sock):
276
290
277 return self.dataOut
291 """
292 This function listen to a ipc addres until a message is recovered. To serialize the
293 data (object), pickle has been use.
294 The 'sock' argument is the socket previously connect to an ipc address and with a topic subscription.
295 """
278
296
279 def checkInputs(self):
297 a = sock.recv_multipart()
298 a = pickle.loads(a[1])
299 return a
280
300
281 for thisDataIn in self.dataInList:
301 def sockPublishing(self):
302
303 """
304 This function create a socket for publishing purposes.
305 Depending on the process type from where is created, it binds or connect
306 to special IPC addresses.
307 """
308 time.sleep(4) #yong
309 context = zmq.Context()
310 zmq_socket = context.socket(zmq.PUB)
311 if not os.path.exists('/tmp/socketTmp'): os.mkdir('/tmp/socketTmp')
312 if 'Reader' in self.dictProcs[self.id].name:
313 zmq_socket.connect('ipc:///tmp/socketTmp/a')
314 else:
315 zmq_socket.bind('ipc:///tmp/socketTmp/%s' % self.id)
316
317 return zmq_socket
318
319 def publishProc(self, sock, data):
320
321 """
322 This function publish a python object (data) under a specific topic in a socket (sock).
323 Usually, the topic is the self id of the process.
324 """
282
325
283 if thisDataIn.isEmpty():
326 sock.send_multipart([str(self.id).encode(), pickle.dumps(data)]) #yong
284 return False
285
327
286 return True
328 return True
287
329
288 def setup(self):
330 def sockOp(self):
289
331
290 raise NotImplementedError
332 """
333 This function create a socket for communication purposes with operation processes.
334 """
291
335
292 def run(self):
336 cont = zmq.Context()
337 zmq_socket = cont.socket(zmq.DEALER)
293
338
294 raise NotImplementedError
339 if python_version()[0] == '2':
340 zmq_socket.setsockopt(zmq.IDENTITY, self.id)
341 if python_version()[0] == '3':
342 zmq_socket.setsockopt_string(zmq.IDENTITY, self.id)
295
343
296 def close(self):
297 #Close every thread, queue or any other object here is it is neccesary.
298 return
299
344
300 class Operation(object):
345 return zmq_socket
301
346
302 """
303 Clase base para definir las operaciones adicionales que se pueden agregar a la clase ProcessingUnit
304 y necesiten acumular informacion previa de los datos a procesar. De preferencia usar un buffer de
305 acumulacion dentro de esta clase
306
347
307 Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer)
348 def execOp(self, socket, opId, dataObj):
308
349
309 """
350 """
351 This function 'execute' an operation main routine by establishing a
352 connection with it and sending a python object (dataOut).
353 """
354 if not os.path.exists('/tmp/socketTmp'): os.mkdir('/tmp/socketTmp')
355 socket.connect('ipc:///tmp/socketTmp/%s' %opId)
310
356
311 __buffer = None
312 isConfig = False
313
357
314 def __init__(self, **kwargs):
358 socket.send(pickle.dumps(dataObj)) #yong
315
359
316 self.__buffer = None
360 argument = socket.recv_multipart()[0]
317 self.isConfig = False
318 self.kwargs = kwargs
319 if not hasattr(self, 'name'):
320 self.name = self.__class__.__name__
321 checkKwargs(self.run, kwargs)
322
361
323 def getAllowedArgs(self):
362 argument = pickle.loads(argument)
324 if hasattr(self, '__attrs__'):
325 return self.__attrs__
326 else:
327 return inspect.getargspec(self.run).args
328
363
329 def setup(self):
364 return argument
330
365
331 self.isConfig = True
366 def sockIO(self):
332
367
333 raise NotImplementedError
368 """
369 Socket defined for an operation process. It is able to recover the object sent from another process as well as a
370 identifier of who sent it.
371 """
334
372
335 def run(self, dataIn, **kwargs):
373 cont = zmq.Context()
374 if not os.path.exists('/tmp/socketTmp'): os.mkdir('/tmp/socketTmp')
375 socket = cont.socket(zmq.ROUTER)
376 socket.bind('ipc:///tmp/socketTmp/%s' % self.id)
377
378 return socket
379
380 def funIOrec(self, socket):
336
381
337 """
382 """
338 Realiza las operaciones necesarias sobre la dataIn.data y actualiza los
383 Operation method, recover the id of the process who sent a python object.
339 atributos del objeto dataIn.
384 The 'socket' argument is the socket binded to a specific process ipc.
385 """
340
386
341 Input:
387 #id_proc = socket.recv()
342
388
343 dataIn : objeto del tipo JROData
389 #dataObj = socket.recv_pyobj()
344
390
345 Return:
391 dataObj = socket.recv_multipart()
346
392
347 None
393 dataObj[1] = pickle.loads(dataObj[1])
394 return dataObj[0], dataObj[1]
348
395
349 Affected:
396 def funIOsen(self, socket, data, dest):
350 __buffer : buffer de recepcion de datos.
351
397
352 """
398 """
353 if not self.isConfig:
399 Operation method, send a python object to a specific destination.
354 self.setup(**kwargs)
400 The 'dest' argument is the id of a proccesinf unit.
401 """
355
402
356 raise NotImplementedError
403 socket.send_multipart([dest, pickle.dumps(data)]) #yong
357
404
358 def close(self):
405 return True
359
406
360 pass No newline at end of file
407
408 def runReader(self):
409
410 # time.sleep(3)
411 while True:
412
413 BaseClass.run(self, **self.kwargs)
414
415
416 keyList = list(self.operations2RunDict.keys())
417 keyList.sort()
418
419 for key in keyList:
420 self.socketOP = self.sockOp()
421 self.dataOut = self.execOp(self.socketOP, key, self.dataOut)
422
423
424 if self.flagNoMoreFiles: #Usar un objeto con flags para saber si termino el proc o hubo un error
425 self.publishProc(self.socket_p, "Finish")
426 break
427
428 if self.dataOut.flagNoData:
429 continue
430
431 #print("Publishing data...")
432 self.publishProc(self.socket_p, self.dataOut)
433 # time.sleep(2)
434
435
436 print("%s done" %BaseClass.__name__)
437 return 0
438
439 def runProc(self):
440
441 # All the procUnits with kwargs that require a setup initialization must be defined here.
442
443 if self.setupReq:
444 BaseClass.setup(self, **self.kwargs)
445
446 while True:
447 self.dataIn = self.listenProc(self.socket_l)
448 #print("%s received data" %BaseClass.__name__)
449
450 if self.dataIn == "Finish":
451 break
452
453 m_arg = list(self.kwargs.keys())
454 num_arg = list(range(1,int(BaseClass.run.__code__.co_argcount)))
455
456 run_arg = {}
457
458 for var in num_arg:
459 if BaseClass.run.__code__.co_varnames[var] in m_arg:
460 run_arg[BaseClass.run.__code__.co_varnames[var]] = self.kwargs[BaseClass.run.__code__.co_varnames[var]]
461
462 #BaseClass.run(self, **self.kwargs)
463 BaseClass.run(self, **run_arg)
464
465 ## Iterar sobre una serie de data que podrias aplicarse
466
467 for m_name in BaseClass.METHODS:
468
469 met_arg = {}
470
471 for arg in m_arg:
472 if arg in BaseClass.METHODS[m_name]:
473 for att in BaseClass.METHODS[m_name]:
474 met_arg[att] = self.kwargs[att]
475
476 method = getattr(BaseClass, m_name)
477 method(self, **met_arg)
478 break
479
480 if self.dataOut.flagNoData:
481 continue
482
483 keyList = list(self.operations2RunDict.keys())
484 keyList.sort()
485
486 for key in keyList:
487
488 self.socketOP = self.sockOp()
489 self.dataOut = self.execOp(self.socketOP, key, self.dataOut)
490
491
492 self.publishProc(self.socket_p, self.dataOut)
493
494
495 print("%s done" %BaseClass.__name__)
496
497 return 0
498
499 def runOp(self):
500
501 while True:
502
503 [self.dest ,self.buffer] = self.funIOrec(self.socket_router)
504
505 self.buffer = BaseClass.run(self, self.buffer, **self.kwargs)
506
507 self.funIOsen(self.socket_router, self.buffer, self.dest)
508
509 print("%s done" %BaseClass.__name__)
510 return 0
511
512
513 def run(self):
514
515 if self.typeProc is "ProcUnit":
516
517 self.socket_p = self.sockPublishing()
518
519 if 'Reader' not in self.dictProcs[self.id].name:
520 self.socket_l = self.sockListening(self.inputId)
521 self.runProc()
522
523 else:
524
525 self.runReader()
526
527 elif self.typeProc is "Operation":
528
529 self.socket_router = self.sockIO()
530
531 self.runOp()
532
533 else:
534 raise ValueError("Unknown type")
535
536 return 0
537
538 return MPClass No newline at end of file
@@ -2,16 +2,24 import itertools
2
2
3 import numpy
3 import numpy
4
4
5 from .jroproc_base import ProcessingUnit, Operation
5 from schainpy.model.proc.jroproc_base import ProcessingUnit, MPDecorator, Operation
6 from schainpy.model.data.jrodata import Spectra
6 from schainpy.model.data.jrodata import Spectra
7 from schainpy.model.data.jrodata import hildebrand_sekhon
7 from schainpy.model.data.jrodata import hildebrand_sekhon
8 from schainpy.utils import log #yong
8 from schainpy.utils import log
9
9
10 @MPDecorator
10 class SpectraProc(ProcessingUnit):
11 class SpectraProc(ProcessingUnit):
11
12
12 def __init__(self, **kwargs):
13 METHODS = {'selectHeights' : ['minHei', 'maxHei'],
14 'selectChannels' : 'channelList',
15 'selectChannelsByIndex': 'channelIndexList',
16 'getBeaconSignal' : ['tauindex', 'channelindex', 'hei_ref'],
17 'selectHeightsByIndex' : ['minIndex', 'maxIndex']
18 }
13
19
14 ProcessingUnit.__init__(self, **kwargs)
20 def __init__(self):#, **kwargs):
21
22 ProcessingUnit.__init__(self)#, **kwargs)
15
23
16 self.buffer = None
24 self.buffer = None
17 self.firstdatatime = None
25 self.firstdatatime = None
@@ -19,6 +27,7 class SpectraProc(ProcessingUnit):
19 self.dataOut = Spectra()
27 self.dataOut = Spectra()
20 self.id_min = None
28 self.id_min = None
21 self.id_max = None
29 self.id_max = None
30 self.setupReq = False #Agregar a todas las unidades de proc
22
31
23 def __updateSpecFromVoltage(self):
32 def __updateSpecFromVoltage(self):
24
33
@@ -774,7 +783,7 class SpectraProc(ProcessingUnit):
774
783
775 return 1
784 return 1
776
785
777
786 @MPDecorator
778 class IncohInt(Operation):
787 class IncohInt(Operation):
779
788
780 __profIndex = 0
789 __profIndex = 0
@@ -795,9 +804,11 class IncohInt(Operation):
795
804
796 n = None
805 n = None
797
806
798 def __init__(self, **kwargs):
807 def __init__(self):#, **kwargs):
808
809 Operation.__init__(self)#, **kwargs)
810
799
811
800 Operation.__init__(self, **kwargs)
801 # self.isConfig = False
812 # self.isConfig = False
802
813
803 def setup(self, n=None, timeInterval=None, overlapping=False):
814 def setup(self, n=None, timeInterval=None, overlapping=False):
@@ -951,3 +962,5 class IncohInt(Operation):
951 dataOut.nIncohInt *= self.n
962 dataOut.nIncohInt *= self.n
952 dataOut.utctime = avgdatatime
963 dataOut.utctime = avgdatatime
953 dataOut.flagNoData = False
964 dataOut.flagNoData = False
965
966 return dataOut No newline at end of file
@@ -3,24 +3,28 import numpy
3 from scipy import interpolate
3 from scipy import interpolate
4 #TODO
4 #TODO
5 #from schainpy import cSchain
5 #from schainpy import cSchain
6 from .jroproc_base import ProcessingUnit, Operation
6 from schainpy.model.proc.jroproc_base import ProcessingUnit, MPDecorator, Operation
7 from schainpy.model.data.jrodata import Voltage
7 from schainpy.model.data.jrodata import Voltage
8 from time import time
9 from schainpy.utils import log
8 from schainpy.utils import log
9 from time import time
10
10
11
11
12 @MPDecorator
12 class VoltageProc(ProcessingUnit):
13 class VoltageProc(ProcessingUnit):
13
14
15 METHODS = {} #yong
14
16
15 def __init__(self, **kwargs):
17 def __init__(self):#, **kwargs): #yong
16
18
17 ProcessingUnit.__init__(self, **kwargs)
19 ProcessingUnit.__init__(self)#, **kwargs)
18
20
19 # self.objectDict = {}
21 # self.objectDict = {}
20 self.dataOut = Voltage()
22 self.dataOut = Voltage()
21 self.flip = 1
23 self.flip = 1
24 self.setupReq = False #yong
22
25
23 def run(self):
26 def run(self):
27
24 if self.dataIn.type == 'AMISR':
28 if self.dataIn.type == 'AMISR':
25 self.__updateObjFromAmisrInput()
29 self.__updateObjFromAmisrInput()
26
30
@@ -317,7 +321,7 class VoltageProc(ProcessingUnit):
317 self.dataOut.data[:,:,botLim:topLim+1] = ynew
321 self.dataOut.data[:,:,botLim:topLim+1] = ynew
318
322
319 # import collections
323 # import collections
320
324 @MPDecorator
321 class CohInt(Operation):
325 class CohInt(Operation):
322
326
323 isConfig = False
327 isConfig = False
@@ -333,9 +337,9 class CohInt(Operation):
333 __dataToPutStride = False
337 __dataToPutStride = False
334 n = None
338 n = None
335
339
336 def __init__(self, **kwargs):
340 def __init__(self):#, **kwargs):
337
341
338 Operation.__init__(self, **kwargs)
342 Operation.__init__(self)#, **kwargs)
339
343
340 # self.isConfig = False
344 # self.isConfig = False
341
345
@@ -549,6 +553,7 class CohInt(Operation):
549 return avgdata, avgdatatime
553 return avgdata, avgdatatime
550
554
551 def run(self, dataOut, n=None, timeInterval=None, stride=None, overlapping=False, byblock=False, **kwargs):
555 def run(self, dataOut, n=None, timeInterval=None, stride=None, overlapping=False, byblock=False, **kwargs):
556
552 if not self.isConfig:
557 if not self.isConfig:
553 self.setup(n=n, stride=stride, timeInterval=timeInterval, overlapping=overlapping, byblock=byblock, **kwargs)
558 self.setup(n=n, stride=stride, timeInterval=timeInterval, overlapping=overlapping, byblock=byblock, **kwargs)
554 self.isConfig = True
559 self.isConfig = True
@@ -577,7 +582,8 class CohInt(Operation):
577 # raise
582 # raise
578 # dataOut.timeInterval = dataOut.ippSeconds * dataOut.nCohInt
583 # dataOut.timeInterval = dataOut.ippSeconds * dataOut.nCohInt
579 dataOut.flagNoData = False
584 dataOut.flagNoData = False
580
585 return dataOut
586 @MPDecorator
581 class Decoder(Operation):
587 class Decoder(Operation):
582
588
583 isConfig = False
589 isConfig = False
@@ -588,15 +594,15 class Decoder(Operation):
588 nCode = None
594 nCode = None
589 nBaud = None
595 nBaud = None
590
596
591 def __init__(self, **kwargs):
597 def __init__(self):#, **kwargs):
592
598
593 Operation.__init__(self, **kwargs)
599 Operation.__init__(self)#, **kwargs)
594
600
595 self.times = None
601 self.times = None
596 self.osamp = None
602 self.osamp = None
597 # self.__setValues = False
603 # self.__setValues = False
598 self.isConfig = False
604 # self.isConfig = False
599
605 self.setupReq = False
600 def setup(self, code, osamp, dataOut):
606 def setup(self, code, osamp, dataOut):
601
607
602 self.__profIndex = 0
608 self.__profIndex = 0
@@ -763,22 +769,22 class Decoder(Operation):
763
769
764 if self.__profIndex == self.nCode-1:
770 if self.__profIndex == self.nCode-1:
765 self.__profIndex = 0
771 self.__profIndex = 0
766 return 1
772 return dataOut
767
773
768 self.__profIndex += 1
774 self.__profIndex += 1
769
775
770 return 1
776 return dataOut
771 # dataOut.flagDeflipData = True #asumo q la data no esta sin flip
777 # dataOut.flagDeflipData = True #asumo q la data no esta sin flip
772
778
773
779 @MPDecorator
774 class ProfileConcat(Operation):
780 class ProfileConcat(Operation):
775
781
776 isConfig = False
782 isConfig = False
777 buffer = None
783 buffer = None
778
784
779 def __init__(self, **kwargs):
785 def __init__(self):#, **kwargs):
780
786
781 Operation.__init__(self, **kwargs)
787 Operation.__init__(self)#, **kwargs)
782 self.profileIndex = 0
788 self.profileIndex = 0
783
789
784 def reset(self):
790 def reset(self):
@@ -820,16 +826,17 class ProfileConcat(Operation):
820 xf = dataOut.heightList[0] + dataOut.nHeights * deltaHeight * m
826 xf = dataOut.heightList[0] + dataOut.nHeights * deltaHeight * m
821 dataOut.heightList = numpy.arange(dataOut.heightList[0], xf, deltaHeight)
827 dataOut.heightList = numpy.arange(dataOut.heightList[0], xf, deltaHeight)
822 dataOut.ippSeconds *= m
828 dataOut.ippSeconds *= m
823
829 return dataOut
830 @MPDecorator
824 class ProfileSelector(Operation):
831 class ProfileSelector(Operation):
825
832
826 profileIndex = None
833 profileIndex = None
827 # Tamanho total de los perfiles
834 # Tamanho total de los perfiles
828 nProfiles = None
835 nProfiles = None
829
836
830 def __init__(self, **kwargs):
837 def __init__(self):#, **kwargs):
831
838
832 Operation.__init__(self, **kwargs)
839 Operation.__init__(self)#, **kwargs)
833 self.profileIndex = 0
840 self.profileIndex = 0
834
841
835 def incProfileIndex(self):
842 def incProfileIndex(self):
@@ -979,13 +986,14 class ProfileSelector(Operation):
979
986
980 raise ValueError("ProfileSelector needs profileList, profileRangeList or rangeList parameter")
987 raise ValueError("ProfileSelector needs profileList, profileRangeList or rangeList parameter")
981
988
982 return False
989 #return False
983
990 return dataOut
991 @MPDecorator
984 class Reshaper(Operation):
992 class Reshaper(Operation):
985
993
986 def __init__(self, **kwargs):
994 def __init__(self):#, **kwargs):
987
995
988 Operation.__init__(self, **kwargs)
996 Operation.__init__(self)#, **kwargs)
989
997
990 self.__buffer = None
998 self.__buffer = None
991 self.__nitems = 0
999 self.__nitems = 0
@@ -1084,11 +1092,13 class Reshaper(Operation):
1084
1092
1085 dataOut.ippSeconds /= self.__nTxs
1093 dataOut.ippSeconds /= self.__nTxs
1086
1094
1095 return dataOut
1096 @MPDecorator
1087 class SplitProfiles(Operation):
1097 class SplitProfiles(Operation):
1088
1098
1089 def __init__(self, **kwargs):
1099 def __init__(self):#, **kwargs):
1090
1100
1091 Operation.__init__(self, **kwargs)
1101 Operation.__init__(self)#, **kwargs)
1092
1102
1093 def run(self, dataOut, n):
1103 def run(self, dataOut, n):
1094
1104
@@ -1104,6 +1114,7 class SplitProfiles(Operation):
1104 raise ValueError("Could not split the data, n=%d has to be multiple of %d" %(n, shape[2]))
1114 raise ValueError("Could not split the data, n=%d has to be multiple of %d" %(n, shape[2]))
1105
1115
1106 new_shape = shape[0], shape[1]*n, int(shape[2]/n)
1116 new_shape = shape[0], shape[1]*n, int(shape[2]/n)
1117
1107 dataOut.data = numpy.reshape(dataOut.data, new_shape)
1118 dataOut.data = numpy.reshape(dataOut.data, new_shape)
1108 dataOut.flagNoData = False
1119 dataOut.flagNoData = False
1109
1120
@@ -1123,10 +1134,12 class SplitProfiles(Operation):
1123
1134
1124 dataOut.ippSeconds /= n
1135 dataOut.ippSeconds /= n
1125
1136
1137 return dataOut
1138 @MPDecorator
1126 class CombineProfiles(Operation):
1139 class CombineProfiles(Operation):
1127 def __init__(self, **kwargs):
1140 def __init__(self):#, **kwargs):
1128
1141
1129 Operation.__init__(self, **kwargs)
1142 Operation.__init__(self)#, **kwargs)
1130
1143
1131 self.__remData = None
1144 self.__remData = None
1132 self.__profileIndex = 0
1145 self.__profileIndex = 0
@@ -1184,6 +1197,7 class CombineProfiles(Operation):
1184
1197
1185 dataOut.ippSeconds *= n
1198 dataOut.ippSeconds *= n
1186
1199
1200 return dataOut
1187 # import collections
1201 # import collections
1188 # from scipy.stats import mode
1202 # from scipy.stats import mode
1189 #
1203 #
General Comments 0
You need to be logged in to leave comments. Login now