##// END OF EJS Templates
Review MP changes, three types of operations: self, other and external
Juan C. Espinoza -
r1177:b013e28003fc
parent child
Show More
@@ -11,7 +11,10 import sys
11 import time
11 import time
12 import traceback
12 import traceback
13 import smtplib
13 import smtplib
14 import configparser
14 if sys.version[0] == '3':
15 from configparser import ConfigParser
16 else:
17 from ConfigParser import ConfigParser
15 import io
18 import io
16 from threading import Thread
19 from threading import Thread
17 from multiprocessing import Process
20 from multiprocessing import Process
@@ -144,7 +147,7 class SchainConfigure():
144 return
147 return
145
148
146 # create Parser using standard module ConfigParser
149 # create Parser using standard module ConfigParser
147 self.__parser = configparser.ConfigParser()
150 self.__parser = ConfigParser()
148
151
149 # read conf file into a StringIO with "[madrigal]\n" section heading prepended
152 # read conf file into a StringIO with "[madrigal]\n" section heading prepended
150 strConfFile = io.StringIO("[schain]\n" + self.__confFile.read())
153 strConfFile = io.StringIO("[schain]\n" + self.__confFile.read())
@@ -12,7 +12,7 import math
12 import time
12 import time
13 import zmq
13 import zmq
14 from multiprocessing import Process, cpu_count
14 from multiprocessing import Process, cpu_count
15
15 from threading import Thread
16 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
16 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
17 from xml.dom import minidom
17 from xml.dom import minidom
18
18
@@ -90,6 +90,18 def MPProject(project, n=cpu_count()):
90
90
91 time.sleep(3)
91 time.sleep(3)
92
92
93 def wait(context):
94
95 time.sleep(1)
96 c = zmq.Context()
97 receiver = c.socket(zmq.SUB)
98 receiver.connect('ipc:///tmp/schain_{}_pub'.format(self.id))
99 receiver.setsockopt(zmq.SUBSCRIBE, self.id.encode())
100 log.error('startinggg')
101 msg = receiver.recv_multipart()[1]
102 #log.error(msg)
103 context.terminate()
104
93 class ParameterConf():
105 class ParameterConf():
94
106
95 id = None
107 id = None
@@ -281,13 +293,6 class ParameterConf():
281
293
282 class OperationConf():
294 class OperationConf():
283
295
284 id = None
285 name = None
286 priority = None
287 type = None
288
289 parmConfObjList = []
290
291 ELEMENTNAME = 'Operation'
296 ELEMENTNAME = 'Operation'
292
297
293 def __init__(self):
298 def __init__(self):
@@ -369,9 +374,10 class OperationConf():
369
374
370 return kwargs
375 return kwargs
371
376
372 def setup(self, id, name, priority, type):
377 def setup(self, id, name, priority, type, project_id):
373
378
374 self.id = str(id)
379 self.id = str(id)
380 self.project_id = project_id
375 self.name = name
381 self.name = name
376 self.type = type
382 self.type = type
377 self.priority = priority
383 self.priority = priority
@@ -459,29 +465,18 class OperationConf():
459 def createObject(self):
465 def createObject(self):
460
466
461 className = eval(self.name)
467 className = eval(self.name)
462 kwargs = self.getKwargs()
468
463
469 if self.type == 'other':
464 opObj = className(self.id, **kwargs)
470 opObj = className()
465
471 elif self.type == 'external':
466 opObj.start()
472 kwargs = self.getKwargs()
467
473 opObj = className(self.id, self.project_id, **kwargs)
468 print(' Operation created')
474 opObj.start()
469
475
470 return opObj
476 return opObj
471
477
472 class ProcUnitConf():
478 class ProcUnitConf():
473
479
474 id = None
475 name = None
476 datatype = None
477 inputId = None
478 parentId = None
479
480 opConfObjList = []
481
482 procUnitObj = None
483 opObjList = []
484
485 ELEMENTNAME = 'ProcUnit'
480 ELEMENTNAME = 'ProcUnit'
486
481
487 def __init__(self):
482 def __init__(self):
@@ -490,9 +485,7 class ProcUnitConf():
490 self.datatype = None
485 self.datatype = None
491 self.name = None
486 self.name = None
492 self.inputId = None
487 self.inputId = None
493
494 self.opConfObjList = []
488 self.opConfObjList = []
495
496 self.procUnitObj = None
489 self.procUnitObj = None
497 self.opObjDict = {}
490 self.opObjDict = {}
498
491
@@ -512,7 +505,7 class ProcUnitConf():
512
505
513 return self.id
506 return self.id
514
507
515 def updateId(self, new_id, parentId=parentId):
508 def updateId(self, new_id):
516 '''
509 '''
517 new_id = int(parentId) * 10 + (int(self.id) % 10)
510 new_id = int(parentId) * 10 + (int(self.id) % 10)
518 new_inputId = int(parentId) * 10 + (int(self.inputId) % 10)
511 new_inputId = int(parentId) * 10 + (int(self.inputId) % 10)
@@ -534,6 +527,7 class ProcUnitConf():
534 #self.inputId = str(new_inputId)
527 #self.inputId = str(new_inputId)
535 '''
528 '''
536 n = 1
529 n = 1
530
537 def getInputId(self):
531 def getInputId(self):
538
532
539 return self.inputId
533 return self.inputId
@@ -565,7 +559,7 class ProcUnitConf():
565
559
566 return self.procUnitObj
560 return self.procUnitObj
567
561
568 def setup(self, id, name, datatype, inputId, parentId=None):
562 def setup(self, project_id, id, name, datatype, inputId):
569 '''
563 '''
570 id sera el topico a publicar
564 id sera el topico a publicar
571 inputId sera el topico a subscribirse
565 inputId sera el topico a subscribirse
@@ -587,10 +581,10 class ProcUnitConf():
587 datatype = name.replace('Proc', '')
581 datatype = name.replace('Proc', '')
588
582
589 self.id = str(id)
583 self.id = str(id)
584 self.project_id = project_id
590 self.name = name
585 self.name = name
591 self.datatype = datatype
586 self.datatype = datatype
592 self.inputId = inputId
587 self.inputId = inputId
593 self.parentId = parentId
594 self.opConfObjList = []
588 self.opConfObjList = []
595
589
596 self.addOperation(name='run', optype='self')
590 self.addOperation(name='run', optype='self')
@@ -613,7 +607,7 class ProcUnitConf():
613
607
614 return opObj
608 return opObj
615
609
616 def addOperation(self, name, optype = 'self'):
610 def addOperation(self, name, optype='self'):
617 '''
611 '''
618 Actualizacion - > proceso comunicacion
612 Actualizacion - > proceso comunicacion
619 En el caso de optype='self', elminar. DEfinir comuncacion IPC -> Topic
613 En el caso de optype='self', elminar. DEfinir comuncacion IPC -> Topic
@@ -623,10 +617,8 class ProcUnitConf():
623
617
624 id = self.__getNewId()
618 id = self.__getNewId()
625 priority = self.__getPriority() # Sin mucho sentido, pero puede usarse
619 priority = self.__getPriority() # Sin mucho sentido, pero puede usarse
626
627 opConfObj = OperationConf()
620 opConfObj = OperationConf()
628 opConfObj.setup(id, name=name, priority=priority, type=optype)
621 opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id)
629
630 self.opConfObjList.append(opConfObj)
622 self.opConfObjList.append(opConfObj)
631
623
632 return opConfObj
624 return opConfObj
@@ -685,62 +677,33 class ProcUnitConf():
685
677
686 return kwargs
678 return kwargs
687
679
688 def createObjects(self, dictUnits):
680 def createObjects(self):
689 '''
681 '''
690 Instancia de unidades de procesamiento.
682 Instancia de unidades de procesamiento.
691
692 '''
683 '''
693 className = eval(self.name)
684 className = eval(self.name)
694 kwargs = self.getKwargs()
685 kwargs = self.getKwargs()
695 procUnitObj = className(self.id, self.inputId, dictUnits, **kwargs) # necesitan saber su id y su entrada por fines de ipc
686 procUnitObj = className(self.id, self.inputId, self.project_id, **kwargs) # necesitan saber su id y su entrada por fines de ipc
696
687 log.success('creating process...', self.name)
697
688
698 for opConfObj in self.opConfObjList:
689 for opConfObj in self.opConfObjList:
699
690
700 if opConfObj.type == 'self' and self.name == 'run':
691 if opConfObj.type == 'self' and opConfObj.name == 'run':
701 continue
692 continue
702 elif opConfObj.type == 'self':
693 elif opConfObj.type == 'self':
703 procUnitObj.addOperationKwargs(
694 opObj = getattr(procUnitObj, opConfObj.name)
704 opConfObj.id, **opConfObj.getKwargs())
695 else:
705 continue
696 opObj = opConfObj.createObject()
706 print("Creating operation process:", opConfObj.name, "for", self.name)
707 opObj = opConfObj.createObject()
708
709
697
710 #self.opObjDict[opConfObj.id] = opObj.name
698 log.success('creating operation: {}, type:{}'.format(
699 opConfObj.name,
700 opConfObj.type), self.name)
711
701
712 procUnitObj.addOperation(opConfObj.name, opConfObj.id)
702 procUnitObj.addOperation(opConfObj, opObj)
713
703
714 procUnitObj.start()
704 procUnitObj.start()
715
716 self.procUnitObj = procUnitObj
705 self.procUnitObj = procUnitObj
717
706
718
719 return procUnitObj
720
721 def run(self):
722
723 is_ok = True
724 """
725 for opConfObj in self.opConfObjList:
726
727 kwargs = {}
728 for parmConfObj in opConfObj.getParameterObjList():
729 if opConfObj.name == 'run' and parmConfObj.name == 'datatype':
730 continue
731
732 kwargs[parmConfObj.name] = parmConfObj.getValue()
733
734 sts = self.procUnitObj.call(opType=opConfObj.type,
735 opName=opConfObj.name,
736 opId=opConfObj.id)
737
738 is_ok = is_ok or sts
739
740 """
741 return is_ok
742
743
744 def close(self):
707 def close(self):
745
708
746 for opConfObj in self.opConfObjList:
709 for opConfObj in self.opConfObjList:
@@ -757,12 +720,6 class ProcUnitConf():
757
720
758 class ReadUnitConf(ProcUnitConf):
721 class ReadUnitConf(ProcUnitConf):
759
722
760 path = None
761 startDate = None
762 endDate = None
763 startTime = None
764 endTime = None
765
766 ELEMENTNAME = 'ReadUnit'
723 ELEMENTNAME = 'ReadUnit'
767
724
768 def __init__(self):
725 def __init__(self):
@@ -771,18 +728,14 class ReadUnitConf(ProcUnitConf):
771 self.datatype = None
728 self.datatype = None
772 self.name = None
729 self.name = None
773 self.inputId = None
730 self.inputId = None
774
775 self.parentId = None
776
777 self.opConfObjList = []
731 self.opConfObjList = []
778 self.opObjList = []
779
732
780 def getElementName(self):
733 def getElementName(self):
781
734
782 return self.ELEMENTNAME
735 return self.ELEMENTNAME
783
736
784 def setup(self, id, name, datatype, path='', startDate='', endDate='',
737 def setup(self, project_id, id, name, datatype, path='', startDate='', endDate='',
785 startTime='', endTime='', parentId=None, server=None, **kwargs):
738 startTime='', endTime='', server=None, **kwargs):
786
739
787
740
788 '''
741 '''
@@ -810,6 +763,7 class ReadUnitConf(ProcUnitConf):
810 name = '{}Reader'.format(name)
763 name = '{}Reader'.format(name)
811
764
812 self.id = id
765 self.id = id
766 self.project_id = project_id
813 self.name = name
767 self.name = name
814 self.datatype = datatype
768 self.datatype = datatype
815 if path != '':
769 if path != '':
@@ -818,8 +772,6 class ReadUnitConf(ProcUnitConf):
818 self.endDate = endDate
772 self.endDate = endDate
819 self.startTime = startTime
773 self.startTime = startTime
820 self.endTime = endTime
774 self.endTime = endTime
821 self.inputId = '0'
822 self.parentId = parentId
823 self.server = server
775 self.server = server
824 self.addRunOperation(**kwargs)
776 self.addRunOperation(**kwargs)
825
777
@@ -834,13 +786,12 class ReadUnitConf(ProcUnitConf):
834 self.datatype = self.name.replace('Reader', '')
786 self.datatype = self.name.replace('Reader', '')
835
787
836 attrs = ('path', 'startDate', 'endDate',
788 attrs = ('path', 'startDate', 'endDate',
837 'startTime', 'endTime', 'parentId')
789 'startTime', 'endTime')
838
790
839 for attr in attrs:
791 for attr in attrs:
840 if attr in kwargs:
792 if attr in kwargs:
841 setattr(self, attr, kwargs.pop(attr))
793 setattr(self, attr, kwargs.pop(attr))
842
794
843 self.inputId = '0'
844 self.updateRunOperation(**kwargs)
795 self.updateRunOperation(**kwargs)
845
796
846 def removeOperations(self):
797 def removeOperations(self):
@@ -900,14 +851,10 class ReadUnitConf(ProcUnitConf):
900 self.id = upElement.get('id')
851 self.id = upElement.get('id')
901 self.name = upElement.get('name')
852 self.name = upElement.get('name')
902 self.datatype = upElement.get('datatype')
853 self.datatype = upElement.get('datatype')
903 self.inputId = upElement.get('inputId')
904
854
905 if self.ELEMENTNAME == 'ReadUnit':
855 if self.ELEMENTNAME == 'ReadUnit':
906 self.datatype = self.datatype.replace('Reader', '')
856 self.datatype = self.datatype.replace('Reader', '')
907
857
908 if self.inputId == 'None':
909 self.inputId = '0'
910
911 self.opConfObjList = []
858 self.opConfObjList = []
912
859
913 opElementList = upElement.iter(OperationConf().getElementName())
860 opElementList = upElement.iter(OperationConf().getElementName())
@@ -927,20 +874,13 class ReadUnitConf(ProcUnitConf):
927
874
928 class Project(Process):
875 class Project(Process):
929
876
930 id = None
931 description = None
932 filename = None
933
934 procUnitConfObjDict = None
935
936 ELEMENTNAME = 'Project'
877 ELEMENTNAME = 'Project'
937
878
938
939
940 def __init__(self):
879 def __init__(self):
941
880
942 Process.__init__(self)
881 Process.__init__(self)
943 self.id = None
882 self.id = None
883 self.filename = None
944 self.description = None
884 self.description = None
945 self.email = None
885 self.email = None
946 self.alarm = None
886 self.alarm = None
@@ -949,7 +889,6 class Project(Process):
949 def __getNewId(self):
889 def __getNewId(self):
950
890
951 idList = list(self.procUnitConfObjDict.keys())
891 idList = list(self.procUnitConfObjDict.keys())
952
953 id = int(self.id) * 10
892 id = int(self.id) * 10
954
893
955 while True:
894 while True:
@@ -984,13 +923,13 class Project(Process):
984
923
985 procUnitConfObj = self.procUnitConfObjDict[procKey]
924 procUnitConfObj = self.procUnitConfObjDict[procKey]
986 idProcUnit = str(int(self.id) * 10 + n)
925 idProcUnit = str(int(self.id) * 10 + n)
987 procUnitConfObj.updateId(idProcUnit, parentId=self.id)
926 procUnitConfObj.updateId(idProcUnit)
988 newProcUnitConfObjDict[idProcUnit] = procUnitConfObj
927 newProcUnitConfObjDict[idProcUnit] = procUnitConfObj
989 n += 1
928 n += 1
990
929
991 self.procUnitConfObjDict = newProcUnitConfObjDict
930 self.procUnitConfObjDict = newProcUnitConfObjDict
992
931
993 def setup(self, id, name='', description='', email=None, alarm=[]):
932 def setup(self, id=1, name='', description='', email=None, alarm=[]):
994
933
995 print(' ')
934 print(' ')
996 print('*' * 60)
935 print('*' * 60)
@@ -1031,9 +970,7 class Project(Process):
1031 idReadUnit = str(id)
970 idReadUnit = str(id)
1032
971
1033 readUnitConfObj = ReadUnitConf()
972 readUnitConfObj = ReadUnitConf()
1034 readUnitConfObj.setup(idReadUnit, name, datatype,
973 readUnitConfObj.setup(self.id, idReadUnit, name, datatype, **kwargs)
1035 parentId=self.id, **kwargs)
1036
1037 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
974 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
1038
975
1039 return readUnitConfObj
976 return readUnitConfObj
@@ -1051,11 +988,8 class Project(Process):
1051 '''
988 '''
1052
989
1053 idProcUnit = self.__getNewId() #Topico para subscripcion
990 idProcUnit = self.__getNewId() #Topico para subscripcion
1054
1055 procUnitConfObj = ProcUnitConf()
991 procUnitConfObj = ProcUnitConf()
1056 procUnitConfObj.setup(idProcUnit, name, datatype, inputId, #topic_read, topic_write,
992 procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId) #topic_read, topic_write,
1057 parentId=self.id)
1058
1059 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
993 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1060
994
1061 return procUnitConfObj
995 return procUnitConfObj
@@ -1176,10 +1110,6 class Project(Process):
1176 for readUnitElement in readUnitElementList:
1110 for readUnitElement in readUnitElementList:
1177 readUnitConfObj = ReadUnitConf()
1111 readUnitConfObj = ReadUnitConf()
1178 readUnitConfObj.readXml(readUnitElement)
1112 readUnitConfObj.readXml(readUnitElement)
1179
1180 if readUnitConfObj.parentId == None:
1181 readUnitConfObj.parentId = self.id
1182
1183 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
1113 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
1184
1114
1185 procUnitElementList = self.projectElement.iter(
1115 procUnitElementList = self.projectElement.iter(
@@ -1188,33 +1118,25 class Project(Process):
1188 for procUnitElement in procUnitElementList:
1118 for procUnitElement in procUnitElementList:
1189 procUnitConfObj = ProcUnitConf()
1119 procUnitConfObj = ProcUnitConf()
1190 procUnitConfObj.readXml(procUnitElement)
1120 procUnitConfObj.readXml(procUnitElement)
1191
1192 if procUnitConfObj.parentId == None:
1193 procUnitConfObj.parentId = self.id
1194
1195 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1121 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1196
1122
1197 self.filename = abs_file
1123 self.filename = abs_file
1198
1124
1199 return 1
1125 return 1
1200
1126
1201 def printattr(self):
1127 def __str__(self):
1202
1128
1203 print('Project[%s]: name = %s, description = %s' % (self.id,
1129 print('Project[%s]: name = %s, description = %s' % (self.id,
1204 self.name,
1130 self.name,
1205 self.description))
1131 self.description))
1206
1132
1207 for procUnitConfObj in list(self.procUnitConfObjDict.values()):
1133 for procUnitConfObj in self.procUnitConfObjDict.values():
1208 procUnitConfObj.printattr()
1134 print(procUnitConfObj)
1209
1135
1210 def createObjects(self):
1136 def createObjects(self):
1211
1137
1212 for procUnitConfObj in list(self.procUnitConfObjDict.values()):
1138 for procUnitConfObj in self.procUnitConfObjDict.values():
1213 print("Creating process:", procUnitConfObj.name)
1139 procUnitConfObj.createObjects()
1214 procUnitConfObj.createObjects(self.procUnitConfObjDict)
1215
1216
1217 print('All processes were created')
1218
1140
1219 def __handleError(self, procUnitConfObj, modes=None, stdout=True):
1141 def __handleError(self, procUnitConfObj, modes=None, stdout=True):
1220
1142
@@ -1305,29 +1227,33 class Project(Process):
1305 self.filename = filename
1227 self.filename = filename
1306
1228
1307 def setProxyCom(self):
1229 def setProxyCom(self):
1230
1231 if not os.path.exists('/tmp/schain'):
1232 os.mkdir('/tmp/schain')
1308
1233
1309 ctx = zmq.Context()
1234 self.ctx = zmq.Context()
1310 if not os.path.exists('/tmp/socketTmp'): os.mkdir('/tmp/socketTmp')
1235 xpub = self.ctx.socket(zmq.XPUB)
1311 xsub = ctx.socket(zmq.XSUB)
1236 xpub.bind('ipc:///tmp/schain/{}_pub'.format(self.id))
1312 xsub.bind('ipc:///tmp/socketTmp/a')
1237 xsub = self.ctx.socket(zmq.XSUB)
1313 xpub = ctx.socket(zmq.XPUB)
1238 xsub.bind('ipc:///tmp/schain/{}_sub'.format(self.id))
1314 xpub.bind('ipc:///tmp/socketTmp/b')
1315
1239
1316 print("Controller Ready: Processes and proxy created")
1240 try:
1317 zmq.proxy(xsub, xpub)
1241 zmq.proxy(xpub, xsub)
1318
1242 except zmq.ContextTerminated:
1319
1243 xpub.close()
1244 xsub.close()
1320
1245
1321 def run(self):
1246 def run(self):
1322
1247
1323 log.success('Starting {}'.format(self.name), tag='')
1248 log.success('Starting {}: {}'.format(self.name, self.id), tag='')
1324 self.start_time = time.time()
1249 self.start_time = time.time()
1325 self.createObjects()
1250 self.createObjects()
1251 # t = Thread(target=wait, args=(self.ctx, ))
1252 # t.start()
1326 self.setProxyCom()
1253 self.setProxyCom()
1327
1254
1328 # Iniciar todos los procesos .start(), monitoreo de procesos. ELiminar lo de abajo
1255 # Iniciar todos los procesos .start(), monitoreo de procesos. ELiminar lo de abajo
1329
1256
1330 # Closing every process
1331 log.success('{} finished (time: {}s)'.format(
1257 log.success('{} finished (time: {}s)'.format(
1332 self.name,
1258 self.name,
1333 time.time()-self.start_time)) No newline at end of file
1259 time.time()-self.start_time))
@@ -898,7 +898,7 class JRODataReader(JRODataIO):
898 newFile = self.__setNextFileOffline()
898 newFile = self.__setNextFileOffline()
899
899
900 if not(newFile):
900 if not(newFile):
901 raise(schainpy.admin.SchainWarning('No more files to read'))
901 self.dataOut.error = (-1, 'No more files to read')
902 return 0
902 return 0
903
903
904 if self.verbose:
904 if self.verbose:
@@ -1052,7 +1052,7 class JRODataReader(JRODataIO):
1052 # Skip block out of startTime and endTime
1052 # Skip block out of startTime and endTime
1053 while True:
1053 while True:
1054 if not(self.__setNewBlock()):
1054 if not(self.__setNewBlock()):
1055 raise(schainpy.admin.SchainWarning('No more files'))
1055 self.dataOut.error = (-1, 'No more files to read')
1056 return 0
1056 return 0
1057
1057
1058 if not(self.readBlock()):
1058 if not(self.readBlock()):
@@ -1324,7 +1324,7 class JRODataReader(JRODataIO):
1324 sleep(self.delay)
1324 sleep(self.delay)
1325
1325
1326 if not(fullpath):
1326 if not(fullpath):
1327 raise(schainpy.admin.SchainWarning('There isn\'t any valid file in {}'.format(path)))
1327 self.dataOut.error = (-1, 'There isn\'t any valid file in {}'.format(path))
1328 return
1328 return
1329
1329
1330 self.year = year
1330 self.year = year
@@ -18,7 +18,6 import time
18 import pickle
18 import pickle
19 import os
19 import os
20 from multiprocessing import Process
20 from multiprocessing import Process
21
22 from schainpy.utils import log
21 from schainpy.utils import log
23
22
24
23
@@ -36,43 +35,30 class ProcessingUnit(object):
36
35
37
36
38 """
37 """
39 # objeto de datos de entrada (Voltage, Spectra o Correlation)
38
39 METHODS = {}
40 dataIn = None
40 dataIn = None
41 dataInList = []
41 dataInList = []
42
43 # objeto de datos de entrada (Voltage, Spectra o Correlation)
44
45 id = None
42 id = None
46 inputId = None
43 inputId = None
47
48 dataOut = None
44 dataOut = None
49
50 dictProcs = None
45 dictProcs = None
51
52 operations2RunDict = None
53
54 isConfig = False
46 isConfig = False
55
47
56 def __init__(self):
48 def __init__(self):
57
49
58 self.dataIn = None
50 self.dataIn = None
59 self.dataOut = None
51 self.dataOut = None
60
61 self.isConfig = False
52 self.isConfig = False
53 self.operations = []
62
54
63 def getAllowedArgs(self):
55 def getAllowedArgs(self):
64 if hasattr(self, '__attrs__'):
56 if hasattr(self, '__attrs__'):
65 return self.__attrs__
57 return self.__attrs__
66 else:
58 else:
67 return inspect.getargspec(self.run).args
59 return inspect.getargspec(self.run).args
68
69 def addOperationKwargs(self, objId, **kwargs):
70 '''
71 '''
72
73 self.operationKwargs[objId] = kwargs
74
60
75 def addOperation(self, opObj, objId):
61 def addOperation(self, conf, operation):
76
62
77 """
63 """
78 This method is used in the controller, and update the dictionary containing the operations to execute. The dict
64 This method is used in the controller, and update the dictionary containing the operations to execute. The dict
@@ -90,17 +76,14 class ProcessingUnit(object):
90 objId : identificador del objeto, necesario para comunicar con master(procUnit)
76 objId : identificador del objeto, necesario para comunicar con master(procUnit)
91 """
77 """
92
78
93 self.operations2RunDict[objId] = opObj
79 self.operations.append((operation, conf.type, conf.id, conf.getKwargs()))
94
95 return objId
96
97
80
98 def getOperationObj(self, objId):
81 def getOperationObj(self, objId):
99
82
100 if objId not in list(self.operations2RunDict.keys()):
83 if objId not in list(self.operations.keys()):
101 return None
84 return None
102
85
103 return self.operations2RunDict[objId]
86 return self.operations[objId]
104
87
105 def operation(self, **kwargs):
88 def operation(self, **kwargs):
106
89
@@ -200,339 +183,185 class Operation(object):
200 pass
183 pass
201
184
202
185
203 ######### Decorator #########
204
205
206 def MPDecorator(BaseClass):
186 def MPDecorator(BaseClass):
207
187
208 """
188 """
209 "Multiprocessing class decorator"
189 Multiprocessing class decorator
210
190
211 This function add multiprocessing features to the base class. Also,
191 This function add multiprocessing features to a BaseClass. Also, it handle
212 it handle the communication beetween processes (readers, procUnits and operations).
192 the communication beetween processes (readers, procUnits and operations).
213 Receive the arguments at the moment of instantiation. According to that, discriminates if it
214 is a procUnit or an operation
215 """
193 """
216
194
217 class MPClass(BaseClass, Process):
195 class MPClass(BaseClass, Process):
218
196
219 "This is the overwritten class"
220 operations2RunDict = None
221 socket_l = None
222 socket_p = None
223 socketOP = None
224 socket_router = None
225 dictProcs = None
226 typeProc = None
227 def __init__(self, *args, **kwargs):
197 def __init__(self, *args, **kwargs):
228 super(MPClass, self).__init__()
198 super(MPClass, self).__init__()
229 Process.__init__(self)
199 Process.__init__(self)
230
231
232 self.operationKwargs = {}
200 self.operationKwargs = {}
233 self.args = args
201 self.args = args
234
235
236 self.operations2RunDict = {}
237 self.kwargs = kwargs
202 self.kwargs = kwargs
238
203 self.sender = None
239 # The number of arguments (args) determine the type of process
204 self.receiver = None
205 self.name = BaseClass.__name__
240
206
241 if len(self.args) is 3:
207 if len(self.args) is 3:
242 self.typeProc = "ProcUnit"
208 self.typeProc = "ProcUnit"
243 self.id = args[0] #topico de publicacion
209 self.id = args[0]
244 self.inputId = args[1] #topico de subcripcion
210 self.inputId = args[1]
245 self.dictProcs = args[2] #diccionario de procesos globales
211 self.project_id = args[2]
246 else:
212 else:
247 self.id = args[0]
213 self.id = args[0]
214 self.inputId = args[0]
215 self.project_id = args[1]
248 self.typeProc = "Operation"
216 self.typeProc = "Operation"
249
250 def addOperationKwargs(self, objId, **kwargs):
251
252 self.operationKwargs[objId] = kwargs
253
217
254 def getAllowedArgs(self):
218 def getAllowedArgs(self):
255
219
256 if hasattr(self, '__attrs__'):
220 if hasattr(self, '__attrs__'):
257 return self.__attrs__
221 return self.__attrs__
258 else:
222 else:
259 return inspect.getargspec(self.run).args
223 return inspect.getargspec(BaseClass.run).args
260
261
262 def sockListening(self, topic):
263
264 """
265 This function create a socket to receive objects.
266 The 'topic' argument is related to the publisher process from which the self process is
267 listening (data).
268 In the case were the self process is listening to a Reader (proc Unit),
269 special conditions are introduced to maximize parallelism.
270 """
271
272 cont = zmq.Context()
273 zmq_socket = cont.socket(zmq.SUB)
274 if not os.path.exists('/tmp/socketTmp'):
275 os.mkdir('/tmp/socketTmp')
276
277 if 'Reader' in self.dictProcs[self.inputId].name:
278 zmq_socket.connect('ipc:///tmp/socketTmp/b')
279
280 else:
281 zmq_socket.connect('ipc:///tmp/socketTmp/%s' % self.inputId)
282
283 #log.error('RECEIVING FROM {} {}'.format(self.inputId, str(topic).encode()))
284 zmq_socket.setsockopt(zmq.SUBSCRIBE, str(topic).encode()) #yong
285
286 return zmq_socket
287
288
289 def listenProc(self, sock):
290
291 """
292 This function listen to a ipc addres until a message is recovered. To serialize the
293 data (object), pickle has been use.
294 The 'sock' argument is the socket previously connect to an ipc address and with a topic subscription.
295 """
296
297 a = sock.recv_multipart()
298 a = pickle.loads(a[1])
299 return a
300
301 def sockPublishing(self):
302
303 """
304 This function create a socket for publishing purposes.
305 Depending on the process type from where is created, it binds or connect
306 to special IPC addresses.
307 """
308 time.sleep(4) #yong
309 context = zmq.Context()
310 zmq_socket = context.socket(zmq.PUB)
311 if not os.path.exists('/tmp/socketTmp'): os.mkdir('/tmp/socketTmp')
312 if 'Reader' in self.dictProcs[self.id].name:
313 zmq_socket.connect('ipc:///tmp/socketTmp/a')
314 else:
315 zmq_socket.bind('ipc:///tmp/socketTmp/%s' % self.id)
316
317 return zmq_socket
318
319 def publishProc(self, sock, data):
320
321 """
322 This function publish a python object (data) under a specific topic in a socket (sock).
323 Usually, the topic is the self id of the process.
324 """
325
326 sock.send_multipart([str(self.id).encode(), pickle.dumps(data)]) #yong
327
328 return True
329
330 def sockOp(self):
331
332 """
333 This function create a socket for communication purposes with operation processes.
334 """
335
336 cont = zmq.Context()
337 zmq_socket = cont.socket(zmq.DEALER)
338
339 if python_version()[0] == '2':
340 zmq_socket.setsockopt(zmq.IDENTITY, self.id)
341 if python_version()[0] == '3':
342 zmq_socket.setsockopt_string(zmq.IDENTITY, self.id)
343
344
345 return zmq_socket
346
347
348 def execOp(self, socket, opId, dataObj):
349
350 """
351 This function 'execute' an operation main routine by establishing a
352 connection with it and sending a python object (dataOut).
353 """
354 if not os.path.exists('/tmp/socketTmp'): os.mkdir('/tmp/socketTmp')
355 socket.connect('ipc:///tmp/socketTmp/%s' %opId)
356
357
358 socket.send(pickle.dumps(dataObj)) #yong
359
224
360 argument = socket.recv_multipart()[0]
225 def subscribe(self):
361
226 '''
362 argument = pickle.loads(argument)
227 This function create a socket to receive objects from the
363
228 topic `inputId`.
364 return argument
229 '''
365
366 def sockIO(self):
367
368 """
369 Socket defined for an operation process. It is able to recover the object sent from another process as well as a
370 identifier of who sent it.
371 """
372
373 cont = zmq.Context()
374 if not os.path.exists('/tmp/socketTmp'): os.mkdir('/tmp/socketTmp')
375 socket = cont.socket(zmq.ROUTER)
376 socket.bind('ipc:///tmp/socketTmp/%s' % self.id)
377
378 return socket
379
380 def funIOrec(self, socket):
381
382 """
383 Operation method, recover the id of the process who sent a python object.
384 The 'socket' argument is the socket binded to a specific process ipc.
385 """
386
387 #id_proc = socket.recv()
388
389 #dataObj = socket.recv_pyobj()
390
230
391 dataObj = socket.recv_multipart()
231 c = zmq.Context()
232 self.receiver = c.socket(zmq.SUB)
233 self.receiver.connect('ipc:///tmp/schain/{}_pub'.format(self.project_id))
234 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
235
236 def listen(self):
237 '''
238 This function waits for objects and deserialize using pickle
239 '''
392
240
393 dataObj[1] = pickle.loads(dataObj[1])
241 data = pickle.loads(self.receiver.recv_multipart()[1])
394 return dataObj[0], dataObj[1]
242 return data
395
243
396 def funIOsen(self, socket, data, dest):
244 def set_publisher(self):
245 '''
246 This function create a socket for publishing purposes.
247 '''
397
248
398 """
249 time.sleep(1)
399 Operation method, send a python object to a specific destination.
250 c = zmq.Context()
400 The 'dest' argument is the id of a proccesinf unit.
251 self.sender = c.socket(zmq.PUB)
401 """
252 self.sender.connect('ipc:///tmp/schain/{}_sub'.format(self.project_id))
402
403 socket.send_multipart([dest, pickle.dumps(data)]) #yong
404
253
405 return True
254 def publish(self, data, id):
255 '''
256 This function publish an object, to a specific topic.
257 '''
406
258
259 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
407
260
408 def runReader(self):
261 def runReader(self):
409
262 '''
410 # time.sleep(3)
263 Run fuction for read units
264 '''
411 while True:
265 while True:
412
266
413 BaseClass.run(self, **self.kwargs)
267 BaseClass.run(self, **self.kwargs)
414
268
415
269 if self.dataOut.error[0] == -1:
416 keyList = list(self.operations2RunDict.keys())
270 log.error(self.dataOut.error[1])
417 keyList.sort()
271 self.publish('end', self.id)
418
272 #self.sender.send_multipart([str(self.project_id).encode(), 'end'.encode()])
419 for key in keyList:
420 self.socketOP = self.sockOp()
421 self.dataOut = self.execOp(self.socketOP, key, self.dataOut)
422
423
424 if self.flagNoMoreFiles: #Usar un objeto con flags para saber si termino el proc o hubo un error
425 self.publishProc(self.socket_p, "Finish")
426 break
273 break
427
274
275 for op, optype, id, kwargs in self.operations:
276 if optype=='self':
277 op(**kwargs)
278 elif optype=='other':
279 self.dataOut = op.run(self.dataOut, **self.kwargs)
280 elif optype=='external':
281 self.publish(self.dataOut, opId)
282
428 if self.dataOut.flagNoData:
283 if self.dataOut.flagNoData:
429 continue
284 continue
430
285
431 #print("Publishing data...")
286 self.publish(self.dataOut, self.id)
432 self.publishProc(self.socket_p, self.dataOut)
287
433 # time.sleep(2)
434
435
436 print("%s done" %BaseClass.__name__)
437 return 0
438
439 def runProc(self):
288 def runProc(self):
440
289 '''
441 # All the procUnits with kwargs that require a setup initialization must be defined here.
290 Run function for proccessing units
442
291 '''
443 if self.setupReq:
444 BaseClass.setup(self, **self.kwargs)
445
292
446 while True:
293 while True:
447 self.dataIn = self.listenProc(self.socket_l)
294 self.dataIn = self.listen()
448 #print("%s received data" %BaseClass.__name__)
449
450 if self.dataIn == "Finish":
451 break
452
295
453 m_arg = list(self.kwargs.keys())
296 if self.dataIn == 'end':
454 num_arg = list(range(1,int(BaseClass.run.__code__.co_argcount)))
297 self.publish('end', self.id)
455
298 for op, optype, opId, kwargs in self.operations:
456 run_arg = {}
299 if optype == 'external':
457
300 self.publish('end', opId)
458 for var in num_arg:
301 break
459 if BaseClass.run.__code__.co_varnames[var] in m_arg:
460 run_arg[BaseClass.run.__code__.co_varnames[var]] = self.kwargs[BaseClass.run.__code__.co_varnames[var]]
461
462 #BaseClass.run(self, **self.kwargs)
463 BaseClass.run(self, **run_arg)
464
465 ## Iterar sobre una serie de data que podrias aplicarse
466
467 for m_name in BaseClass.METHODS:
468
302
469 met_arg = {}
303 if self.dataIn.flagNoData:
304 continue
470
305
471 for arg in m_arg:
306 BaseClass.run(self, **self.kwargs)
472 if arg in BaseClass.METHODS[m_name]:
473 for att in BaseClass.METHODS[m_name]:
474 met_arg[att] = self.kwargs[att]
475
307
476 method = getattr(BaseClass, m_name)
308 for op, optype, opId, kwargs in self.operations:
477 method(self, **met_arg)
309 if optype=='self':
478 break
310 op(**kwargs)
311 elif optype=='other':
312 self.dataOut = op.run(self.dataOut, **kwargs)
313 elif optype=='external':
314 self.publish(self.dataOut, opId)
479
315
480 if self.dataOut.flagNoData:
316 if self.dataOut.flagNoData:
481 continue
317 continue
482
318
483 keyList = list(self.operations2RunDict.keys())
319 self.publish(self.dataOut, self.id)
484 keyList.sort()
485
486 for key in keyList:
487
488 self.socketOP = self.sockOp()
489 self.dataOut = self.execOp(self.socketOP, key, self.dataOut)
490
491
492 self.publishProc(self.socket_p, self.dataOut)
493
494
495 print("%s done" %BaseClass.__name__)
496
497 return 0
498
320
499 def runOp(self):
321 def runOp(self):
322 '''
323 Run function for operations
324 '''
500
325
501 while True:
326 while True:
502
327
503 [self.dest ,self.buffer] = self.funIOrec(self.socket_router)
328 dataOut = self.listen()
504
505 self.buffer = BaseClass.run(self, self.buffer, **self.kwargs)
506
329
507 self.funIOsen(self.socket_router, self.buffer, self.dest)
330 if dataOut == 'end':
508
331 break
509 print("%s done" %BaseClass.__name__)
332
510 return 0
333 BaseClass.run(self, dataOut, **self.kwargs)
511
334
512
513 def run(self):
335 def run(self):
514
515 if self.typeProc is "ProcUnit":
516
517 self.socket_p = self.sockPublishing()
518
336
519 if 'Reader' not in self.dictProcs[self.id].name:
337 if self.typeProc is "ProcUnit":
520 self.socket_l = self.sockListening(self.inputId)
338
521 self.runProc()
339 if self.inputId is not None:
340 self.subscribe()
341 self.set_publisher()
522
342
343 if 'Reader' not in BaseClass.__name__:
344 self.runProc()
523 else:
345 else:
524
525 self.runReader()
346 self.runReader()
526
347
527 elif self.typeProc is "Operation":
348 elif self.typeProc is "Operation":
528
349
529 self.socket_router = self.sockIO()
350 self.subscribe()
530
531 self.runOp()
351 self.runOp()
532
352
533 else:
353 else:
534 raise ValueError("Unknown type")
354 raise ValueError("Unknown type")
535
355
536 return 0
356 print("%s done" % BaseClass.__name__)
537
357 self.close()
358
359 def close(self):
360
361 if self.sender:
362 self.sender.close()
363
364 if self.receiver:
365 self.receiver.close()
366
538 return MPClass No newline at end of file
367 return MPClass
@@ -127,8 +127,6 class SpectraProc(ProcessingUnit):
127
127
128 def run(self, nProfiles=None, nFFTPoints=None, pairsList=[], ippFactor=None, shift_fft=False):
128 def run(self, nProfiles=None, nFFTPoints=None, pairsList=[], ippFactor=None, shift_fft=False):
129
129
130 self.dataOut.flagNoData = True
131
132 if self.dataIn.type == "Spectra":
130 if self.dataIn.type == "Spectra":
133 self.dataOut.copy(self.dataIn)
131 self.dataOut.copy(self.dataIn)
134 # if not pairsList:
132 # if not pairsList:
@@ -783,7 +781,7 class SpectraProc(ProcessingUnit):
783
781
784 return 1
782 return 1
785
783
786 @MPDecorator
784
787 class IncohInt(Operation):
785 class IncohInt(Operation):
788
786
789 __profIndex = 0
787 __profIndex = 0
@@ -962,5 +960,5 class IncohInt(Operation):
962 dataOut.nIncohInt *= self.n
960 dataOut.nIncohInt *= self.n
963 dataOut.utctime = avgdatatime
961 dataOut.utctime = avgdatatime
964 dataOut.flagNoData = False
962 dataOut.flagNoData = False
965
963
966 return dataOut No newline at end of file
964 return dataOut
@@ -1,7 +1,7
1 import sys
1 import sys
2 import numpy
2 import numpy
3 from scipy import interpolate
3 from scipy import interpolate
4 from schainpy.model.proc.jroproc_base import ProcessingUnit, MPDecorator, Operation
4 from schainpy.model.proc.jroproc_base import ProcessingUnit,, Operation
5 from schainpy.model.data.jrodata import Voltage
5 from schainpy.model.data.jrodata import Voltage
6 from schainpy.utils import log
6 from schainpy.utils import log
7 from time import time
7 from time import time
@@ -10,16 +10,13 from time import time
10 @MPDecorator
10 @MPDecorator
11 class VoltageProc(ProcessingUnit):
11 class VoltageProc(ProcessingUnit):
12
12
13 METHODS = {} #yong
13 def __init__(self):
14
14
15 def __init__(self):#, **kwargs): #yong
15 ProcessingUnit.__init__(self)
16
16
17 ProcessingUnit.__init__(self)#, **kwargs)
18
19 # self.objectDict = {}
20 self.dataOut = Voltage()
17 self.dataOut = Voltage()
21 self.flip = 1
18 self.flip = 1
22 self.setupReq = False #yong
19 self.setupReq = False
23
20
24 def run(self):
21 def run(self):
25
22
@@ -319,7 +316,7 class VoltageProc(ProcessingUnit):
319 self.dataOut.data[:,:,botLim:topLim+1] = ynew
316 self.dataOut.data[:,:,botLim:topLim+1] = ynew
320
317
321 # import collections
318 # import collections
322 @MPDecorator
319
323 class CohInt(Operation):
320 class CohInt(Operation):
324
321
325 isConfig = False
322 isConfig = False
@@ -581,7 +578,7 class CohInt(Operation):
581 # dataOut.timeInterval = dataOut.ippSeconds * dataOut.nCohInt
578 # dataOut.timeInterval = dataOut.ippSeconds * dataOut.nCohInt
582 dataOut.flagNoData = False
579 dataOut.flagNoData = False
583 return dataOut
580 return dataOut
584 @MPDecorator
581
585 class Decoder(Operation):
582 class Decoder(Operation):
586
583
587 isConfig = False
584 isConfig = False
@@ -774,7 +771,7 class Decoder(Operation):
774 return dataOut
771 return dataOut
775 # dataOut.flagDeflipData = True #asumo q la data no esta sin flip
772 # dataOut.flagDeflipData = True #asumo q la data no esta sin flip
776
773
777 @MPDecorator
774
778 class ProfileConcat(Operation):
775 class ProfileConcat(Operation):
779
776
780 isConfig = False
777 isConfig = False
@@ -825,7 +822,7 class ProfileConcat(Operation):
825 dataOut.heightList = numpy.arange(dataOut.heightList[0], xf, deltaHeight)
822 dataOut.heightList = numpy.arange(dataOut.heightList[0], xf, deltaHeight)
826 dataOut.ippSeconds *= m
823 dataOut.ippSeconds *= m
827 return dataOut
824 return dataOut
828 @MPDecorator
825
829 class ProfileSelector(Operation):
826 class ProfileSelector(Operation):
830
827
831 profileIndex = None
828 profileIndex = None
@@ -986,7 +983,7 class ProfileSelector(Operation):
986
983
987 #return False
984 #return False
988 return dataOut
985 return dataOut
989 @MPDecorator
986
990 class Reshaper(Operation):
987 class Reshaper(Operation):
991
988
992 def __init__(self):#, **kwargs):
989 def __init__(self):#, **kwargs):
@@ -1091,7 +1088,7 class Reshaper(Operation):
1091 dataOut.ippSeconds /= self.__nTxs
1088 dataOut.ippSeconds /= self.__nTxs
1092
1089
1093 return dataOut
1090 return dataOut
1094 @MPDecorator
1091
1095 class SplitProfiles(Operation):
1092 class SplitProfiles(Operation):
1096
1093
1097 def __init__(self):#, **kwargs):
1094 def __init__(self):#, **kwargs):
@@ -1133,7 +1130,7 class SplitProfiles(Operation):
1133 dataOut.ippSeconds /= n
1130 dataOut.ippSeconds /= n
1134
1131
1135 return dataOut
1132 return dataOut
1136 @MPDecorator
1133
1137 class CombineProfiles(Operation):
1134 class CombineProfiles(Operation):
1138 def __init__(self):#, **kwargs):
1135 def __init__(self):#, **kwargs):
1139
1136
General Comments 0
You need to be logged in to leave comments. Login now