##// END OF EJS Templates
Review MP changes, three types of operations: self, other and external
Juan C. Espinoza -
r1177:b013e28003fc
parent child
Show More
@@ -11,7 +11,10 import sys
11 11 import time
12 12 import traceback
13 13 import smtplib
14 import configparser
14 if sys.version[0] == '3':
15 from configparser import ConfigParser
16 else:
17 from ConfigParser import ConfigParser
15 18 import io
16 19 from threading import Thread
17 20 from multiprocessing import Process
@@ -144,7 +147,7 class SchainConfigure():
144 147 return
145 148
146 149 # create Parser using standard module ConfigParser
147 self.__parser = configparser.ConfigParser()
150 self.__parser = ConfigParser()
148 151
149 152 # read conf file into a StringIO with "[madrigal]\n" section heading prepended
150 153 strConfFile = io.StringIO("[schain]\n" + self.__confFile.read())
@@ -12,7 +12,7 import math
12 12 import time
13 13 import zmq
14 14 from multiprocessing import Process, cpu_count
15
15 from threading import Thread
16 16 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
17 17 from xml.dom import minidom
18 18
@@ -90,6 +90,18 def MPProject(project, n=cpu_count()):
90 90
91 91 time.sleep(3)
92 92
93 def wait(context):
94
95 time.sleep(1)
96 c = zmq.Context()
97 receiver = c.socket(zmq.SUB)
98 receiver.connect('ipc:///tmp/schain_{}_pub'.format(self.id))
99 receiver.setsockopt(zmq.SUBSCRIBE, self.id.encode())
100 log.error('startinggg')
101 msg = receiver.recv_multipart()[1]
102 #log.error(msg)
103 context.terminate()
104
93 105 class ParameterConf():
94 106
95 107 id = None
@@ -281,13 +293,6 class ParameterConf():
281 293
282 294 class OperationConf():
283 295
284 id = None
285 name = None
286 priority = None
287 type = None
288
289 parmConfObjList = []
290
291 296 ELEMENTNAME = 'Operation'
292 297
293 298 def __init__(self):
@@ -369,9 +374,10 class OperationConf():
369 374
370 375 return kwargs
371 376
372 def setup(self, id, name, priority, type):
377 def setup(self, id, name, priority, type, project_id):
373 378
374 379 self.id = str(id)
380 self.project_id = project_id
375 381 self.name = name
376 382 self.type = type
377 383 self.priority = priority
@@ -459,29 +465,18 class OperationConf():
459 465 def createObject(self):
460 466
461 467 className = eval(self.name)
462 kwargs = self.getKwargs()
463
464 opObj = className(self.id, **kwargs)
465 468
469 if self.type == 'other':
470 opObj = className()
471 elif self.type == 'external':
472 kwargs = self.getKwargs()
473 opObj = className(self.id, self.project_id, **kwargs)
466 474 opObj.start()
467 475
468 print(' Operation created')
469
470 476 return opObj
471 477
472 478 class ProcUnitConf():
473 479
474 id = None
475 name = None
476 datatype = None
477 inputId = None
478 parentId = None
479
480 opConfObjList = []
481
482 procUnitObj = None
483 opObjList = []
484
485 480 ELEMENTNAME = 'ProcUnit'
486 481
487 482 def __init__(self):
@@ -490,9 +485,7 class ProcUnitConf():
490 485 self.datatype = None
491 486 self.name = None
492 487 self.inputId = None
493
494 488 self.opConfObjList = []
495
496 489 self.procUnitObj = None
497 490 self.opObjDict = {}
498 491
@@ -512,7 +505,7 class ProcUnitConf():
512 505
513 506 return self.id
514 507
515 def updateId(self, new_id, parentId=parentId):
508 def updateId(self, new_id):
516 509 '''
517 510 new_id = int(parentId) * 10 + (int(self.id) % 10)
518 511 new_inputId = int(parentId) * 10 + (int(self.inputId) % 10)
@@ -534,6 +527,7 class ProcUnitConf():
534 527 #self.inputId = str(new_inputId)
535 528 '''
536 529 n = 1
530
537 531 def getInputId(self):
538 532
539 533 return self.inputId
@@ -565,7 +559,7 class ProcUnitConf():
565 559
566 560 return self.procUnitObj
567 561
568 def setup(self, id, name, datatype, inputId, parentId=None):
562 def setup(self, project_id, id, name, datatype, inputId):
569 563 '''
570 564 id sera el topico a publicar
571 565 inputId sera el topico a subscribirse
@@ -587,10 +581,10 class ProcUnitConf():
587 581 datatype = name.replace('Proc', '')
588 582
589 583 self.id = str(id)
584 self.project_id = project_id
590 585 self.name = name
591 586 self.datatype = datatype
592 587 self.inputId = inputId
593 self.parentId = parentId
594 588 self.opConfObjList = []
595 589
596 590 self.addOperation(name='run', optype='self')
@@ -623,10 +617,8 class ProcUnitConf():
623 617
624 618 id = self.__getNewId()
625 619 priority = self.__getPriority() # Sin mucho sentido, pero puede usarse
626
627 620 opConfObj = OperationConf()
628 opConfObj.setup(id, name=name, priority=priority, type=optype)
629
621 opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id)
630 622 self.opConfObjList.append(opConfObj)
631 623
632 624 return opConfObj
@@ -685,62 +677,33 class ProcUnitConf():
685 677
686 678 return kwargs
687 679
688 def createObjects(self, dictUnits):
680 def createObjects(self):
689 681 '''
690 682 Instancia de unidades de procesamiento.
691
692 683 '''
693 684 className = eval(self.name)
694 685 kwargs = self.getKwargs()
695 procUnitObj = className(self.id, self.inputId, dictUnits, **kwargs) # necesitan saber su id y su entrada por fines de ipc
696
686 procUnitObj = className(self.id, self.inputId, self.project_id, **kwargs) # necesitan saber su id y su entrada por fines de ipc
687 log.success('creating process...', self.name)
697 688
698 689 for opConfObj in self.opConfObjList:
699 690
700 if opConfObj.type == 'self' and self.name == 'run':
691 if opConfObj.type == 'self' and opConfObj.name == 'run':
701 692 continue
702 693 elif opConfObj.type == 'self':
703 procUnitObj.addOperationKwargs(
704 opConfObj.id, **opConfObj.getKwargs())
705 continue
706 print("Creating operation process:", opConfObj.name, "for", self.name)
694 opObj = getattr(procUnitObj, opConfObj.name)
695 else:
707 696 opObj = opConfObj.createObject()
708 697
698 log.success('creating operation: {}, type:{}'.format(
699 opConfObj.name,
700 opConfObj.type), self.name)
709 701
710 #self.opObjDict[opConfObj.id] = opObj.name
711
712 procUnitObj.addOperation(opConfObj.name, opConfObj.id)
702 procUnitObj.addOperation(opConfObj, opObj)
713 703
714 704 procUnitObj.start()
715
716 705 self.procUnitObj = procUnitObj
717 706
718
719 return procUnitObj
720
721 def run(self):
722
723 is_ok = True
724 """
725 for opConfObj in self.opConfObjList:
726
727 kwargs = {}
728 for parmConfObj in opConfObj.getParameterObjList():
729 if opConfObj.name == 'run' and parmConfObj.name == 'datatype':
730 continue
731
732 kwargs[parmConfObj.name] = parmConfObj.getValue()
733
734 sts = self.procUnitObj.call(opType=opConfObj.type,
735 opName=opConfObj.name,
736 opId=opConfObj.id)
737
738 is_ok = is_ok or sts
739
740 """
741 return is_ok
742
743
744 707 def close(self):
745 708
746 709 for opConfObj in self.opConfObjList:
@@ -757,12 +720,6 class ProcUnitConf():
757 720
758 721 class ReadUnitConf(ProcUnitConf):
759 722
760 path = None
761 startDate = None
762 endDate = None
763 startTime = None
764 endTime = None
765
766 723 ELEMENTNAME = 'ReadUnit'
767 724
768 725 def __init__(self):
@@ -771,18 +728,14 class ReadUnitConf(ProcUnitConf):
771 728 self.datatype = None
772 729 self.name = None
773 730 self.inputId = None
774
775 self.parentId = None
776
777 731 self.opConfObjList = []
778 self.opObjList = []
779 732
780 733 def getElementName(self):
781 734
782 735 return self.ELEMENTNAME
783 736
784 def setup(self, id, name, datatype, path='', startDate='', endDate='',
785 startTime='', endTime='', parentId=None, server=None, **kwargs):
737 def setup(self, project_id, id, name, datatype, path='', startDate='', endDate='',
738 startTime='', endTime='', server=None, **kwargs):
786 739
787 740
788 741 '''
@@ -810,6 +763,7 class ReadUnitConf(ProcUnitConf):
810 763 name = '{}Reader'.format(name)
811 764
812 765 self.id = id
766 self.project_id = project_id
813 767 self.name = name
814 768 self.datatype = datatype
815 769 if path != '':
@@ -818,8 +772,6 class ReadUnitConf(ProcUnitConf):
818 772 self.endDate = endDate
819 773 self.startTime = startTime
820 774 self.endTime = endTime
821 self.inputId = '0'
822 self.parentId = parentId
823 775 self.server = server
824 776 self.addRunOperation(**kwargs)
825 777
@@ -834,13 +786,12 class ReadUnitConf(ProcUnitConf):
834 786 self.datatype = self.name.replace('Reader', '')
835 787
836 788 attrs = ('path', 'startDate', 'endDate',
837 'startTime', 'endTime', 'parentId')
789 'startTime', 'endTime')
838 790
839 791 for attr in attrs:
840 792 if attr in kwargs:
841 793 setattr(self, attr, kwargs.pop(attr))
842 794
843 self.inputId = '0'
844 795 self.updateRunOperation(**kwargs)
845 796
846 797 def removeOperations(self):
@@ -900,14 +851,10 class ReadUnitConf(ProcUnitConf):
900 851 self.id = upElement.get('id')
901 852 self.name = upElement.get('name')
902 853 self.datatype = upElement.get('datatype')
903 self.inputId = upElement.get('inputId')
904 854
905 855 if self.ELEMENTNAME == 'ReadUnit':
906 856 self.datatype = self.datatype.replace('Reader', '')
907 857
908 if self.inputId == 'None':
909 self.inputId = '0'
910
911 858 self.opConfObjList = []
912 859
913 860 opElementList = upElement.iter(OperationConf().getElementName())
@@ -927,20 +874,13 class ReadUnitConf(ProcUnitConf):
927 874
928 875 class Project(Process):
929 876
930 id = None
931 description = None
932 filename = None
933
934 procUnitConfObjDict = None
935
936 877 ELEMENTNAME = 'Project'
937 878
938
939
940 879 def __init__(self):
941 880
942 881 Process.__init__(self)
943 882 self.id = None
883 self.filename = None
944 884 self.description = None
945 885 self.email = None
946 886 self.alarm = None
@@ -949,7 +889,6 class Project(Process):
949 889 def __getNewId(self):
950 890
951 891 idList = list(self.procUnitConfObjDict.keys())
952
953 892 id = int(self.id) * 10
954 893
955 894 while True:
@@ -984,13 +923,13 class Project(Process):
984 923
985 924 procUnitConfObj = self.procUnitConfObjDict[procKey]
986 925 idProcUnit = str(int(self.id) * 10 + n)
987 procUnitConfObj.updateId(idProcUnit, parentId=self.id)
926 procUnitConfObj.updateId(idProcUnit)
988 927 newProcUnitConfObjDict[idProcUnit] = procUnitConfObj
989 928 n += 1
990 929
991 930 self.procUnitConfObjDict = newProcUnitConfObjDict
992 931
993 def setup(self, id, name='', description='', email=None, alarm=[]):
932 def setup(self, id=1, name='', description='', email=None, alarm=[]):
994 933
995 934 print(' ')
996 935 print('*' * 60)
@@ -1031,9 +970,7 class Project(Process):
1031 970 idReadUnit = str(id)
1032 971
1033 972 readUnitConfObj = ReadUnitConf()
1034 readUnitConfObj.setup(idReadUnit, name, datatype,
1035 parentId=self.id, **kwargs)
1036
973 readUnitConfObj.setup(self.id, idReadUnit, name, datatype, **kwargs)
1037 974 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
1038 975
1039 976 return readUnitConfObj
@@ -1051,11 +988,8 class Project(Process):
1051 988 '''
1052 989
1053 990 idProcUnit = self.__getNewId() #Topico para subscripcion
1054
1055 991 procUnitConfObj = ProcUnitConf()
1056 procUnitConfObj.setup(idProcUnit, name, datatype, inputId, #topic_read, topic_write,
1057 parentId=self.id)
1058
992 procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId) #topic_read, topic_write,
1059 993 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1060 994
1061 995 return procUnitConfObj
@@ -1176,10 +1110,6 class Project(Process):
1176 1110 for readUnitElement in readUnitElementList:
1177 1111 readUnitConfObj = ReadUnitConf()
1178 1112 readUnitConfObj.readXml(readUnitElement)
1179
1180 if readUnitConfObj.parentId == None:
1181 readUnitConfObj.parentId = self.id
1182
1183 1113 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
1184 1114
1185 1115 procUnitElementList = self.projectElement.iter(
@@ -1188,33 +1118,25 class Project(Process):
1188 1118 for procUnitElement in procUnitElementList:
1189 1119 procUnitConfObj = ProcUnitConf()
1190 1120 procUnitConfObj.readXml(procUnitElement)
1191
1192 if procUnitConfObj.parentId == None:
1193 procUnitConfObj.parentId = self.id
1194
1195 1121 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1196 1122
1197 1123 self.filename = abs_file
1198 1124
1199 1125 return 1
1200 1126
1201 def printattr(self):
1127 def __str__(self):
1202 1128
1203 1129 print('Project[%s]: name = %s, description = %s' % (self.id,
1204 1130 self.name,
1205 1131 self.description))
1206 1132
1207 for procUnitConfObj in list(self.procUnitConfObjDict.values()):
1208 procUnitConfObj.printattr()
1133 for procUnitConfObj in self.procUnitConfObjDict.values():
1134 print(procUnitConfObj)
1209 1135
1210 1136 def createObjects(self):
1211 1137
1212 for procUnitConfObj in list(self.procUnitConfObjDict.values()):
1213 print("Creating process:", procUnitConfObj.name)
1214 procUnitConfObj.createObjects(self.procUnitConfObjDict)
1215
1216
1217 print('All processes were created')
1138 for procUnitConfObj in self.procUnitConfObjDict.values():
1139 procUnitConfObj.createObjects()
1218 1140
1219 1141 def __handleError(self, procUnitConfObj, modes=None, stdout=True):
1220 1142
@@ -1306,28 +1228,32 class Project(Process):
1306 1228
1307 1229 def setProxyCom(self):
1308 1230
1309 ctx = zmq.Context()
1310 if not os.path.exists('/tmp/socketTmp'): os.mkdir('/tmp/socketTmp')
1311 xsub = ctx.socket(zmq.XSUB)
1312 xsub.bind('ipc:///tmp/socketTmp/a')
1313 xpub = ctx.socket(zmq.XPUB)
1314 xpub.bind('ipc:///tmp/socketTmp/b')
1315
1316 print("Controller Ready: Processes and proxy created")
1317 zmq.proxy(xsub, xpub)
1231 if not os.path.exists('/tmp/schain'):
1232 os.mkdir('/tmp/schain')
1318 1233
1234 self.ctx = zmq.Context()
1235 xpub = self.ctx.socket(zmq.XPUB)
1236 xpub.bind('ipc:///tmp/schain/{}_pub'.format(self.id))
1237 xsub = self.ctx.socket(zmq.XSUB)
1238 xsub.bind('ipc:///tmp/schain/{}_sub'.format(self.id))
1319 1239
1240 try:
1241 zmq.proxy(xpub, xsub)
1242 except zmq.ContextTerminated:
1243 xpub.close()
1244 xsub.close()
1320 1245
1321 1246 def run(self):
1322 1247
1323 log.success('Starting {}'.format(self.name), tag='')
1248 log.success('Starting {}: {}'.format(self.name, self.id), tag='')
1324 1249 self.start_time = time.time()
1325 1250 self.createObjects()
1251 # t = Thread(target=wait, args=(self.ctx, ))
1252 # t.start()
1326 1253 self.setProxyCom()
1327 1254
1328 1255 # Iniciar todos los procesos .start(), monitoreo de procesos. ELiminar lo de abajo
1329 1256
1330 # Closing every process
1331 1257 log.success('{} finished (time: {}s)'.format(
1332 1258 self.name,
1333 1259 time.time()-self.start_time))
@@ -898,7 +898,7 class JRODataReader(JRODataIO):
898 898 newFile = self.__setNextFileOffline()
899 899
900 900 if not(newFile):
901 raise(schainpy.admin.SchainWarning('No more files to read'))
901 self.dataOut.error = (-1, 'No more files to read')
902 902 return 0
903 903
904 904 if self.verbose:
@@ -1052,7 +1052,7 class JRODataReader(JRODataIO):
1052 1052 # Skip block out of startTime and endTime
1053 1053 while True:
1054 1054 if not(self.__setNewBlock()):
1055 raise(schainpy.admin.SchainWarning('No more files'))
1055 self.dataOut.error = (-1, 'No more files to read')
1056 1056 return 0
1057 1057
1058 1058 if not(self.readBlock()):
@@ -1324,7 +1324,7 class JRODataReader(JRODataIO):
1324 1324 sleep(self.delay)
1325 1325
1326 1326 if not(fullpath):
1327 raise(schainpy.admin.SchainWarning('There isn\'t any valid file in {}'.format(path)))
1327 self.dataOut.error = (-1, 'There isn\'t any valid file in {}'.format(path))
1328 1328 return
1329 1329
1330 1330 self.year = year
@@ -18,7 +18,6 import time
18 18 import pickle
19 19 import os
20 20 from multiprocessing import Process
21
22 21 from schainpy.utils import log
23 22
24 23
@@ -36,29 +35,22 class ProcessingUnit(object):
36 35
37 36
38 37 """
39 # objeto de datos de entrada (Voltage, Spectra o Correlation)
38
39 METHODS = {}
40 40 dataIn = None
41 41 dataInList = []
42
43 # objeto de datos de entrada (Voltage, Spectra o Correlation)
44
45 42 id = None
46 43 inputId = None
47
48 44 dataOut = None
49
50 45 dictProcs = None
51
52 operations2RunDict = None
53
54 46 isConfig = False
55 47
56 48 def __init__(self):
57 49
58 50 self.dataIn = None
59 51 self.dataOut = None
60
61 52 self.isConfig = False
53 self.operations = []
62 54
63 55 def getAllowedArgs(self):
64 56 if hasattr(self, '__attrs__'):
@@ -66,13 +58,7 class ProcessingUnit(object):
66 58 else:
67 59 return inspect.getargspec(self.run).args
68 60
69 def addOperationKwargs(self, objId, **kwargs):
70 '''
71 '''
72
73 self.operationKwargs[objId] = kwargs
74
75 def addOperation(self, opObj, objId):
61 def addOperation(self, conf, operation):
76 62
77 63 """
78 64 This method is used in the controller, and update the dictionary containing the operations to execute. The dict
@@ -90,17 +76,14 class ProcessingUnit(object):
90 76 objId : identificador del objeto, necesario para comunicar con master(procUnit)
91 77 """
92 78
93 self.operations2RunDict[objId] = opObj
94
95 return objId
96
79 self.operations.append((operation, conf.type, conf.id, conf.getKwargs()))
97 80
98 81 def getOperationObj(self, objId):
99 82
100 if objId not in list(self.operations2RunDict.keys()):
83 if objId not in list(self.operations.keys()):
101 84 return None
102 85
103 return self.operations2RunDict[objId]
86 return self.operations[objId]
104 87
105 88 def operation(self, **kwargs):
106 89
@@ -200,339 +183,185 class Operation(object):
200 183 pass
201 184
202 185
203 ######### Decorator #########
204
205
206 186 def MPDecorator(BaseClass):
207 187
208 188 """
209 "Multiprocessing class decorator"
189 Multiprocessing class decorator
210 190
211 This function add multiprocessing features to the base class. Also,
212 it handle the communication beetween processes (readers, procUnits and operations).
213 Receive the arguments at the moment of instantiation. According to that, discriminates if it
214 is a procUnit or an operation
191 This function add multiprocessing features to a BaseClass. Also, it handle
192 the communication beetween processes (readers, procUnits and operations).
215 193 """
216 194
217 195 class MPClass(BaseClass, Process):
218 196
219 "This is the overwritten class"
220 operations2RunDict = None
221 socket_l = None
222 socket_p = None
223 socketOP = None
224 socket_router = None
225 dictProcs = None
226 typeProc = None
227 197 def __init__(self, *args, **kwargs):
228 198 super(MPClass, self).__init__()
229 199 Process.__init__(self)
230
231
232 200 self.operationKwargs = {}
233 201 self.args = args
234
235
236 self.operations2RunDict = {}
237 202 self.kwargs = kwargs
238
239 # The number of arguments (args) determine the type of process
203 self.sender = None
204 self.receiver = None
205 self.name = BaseClass.__name__
240 206
241 207 if len(self.args) is 3:
242 208 self.typeProc = "ProcUnit"
243 self.id = args[0] #topico de publicacion
244 self.inputId = args[1] #topico de subcripcion
245 self.dictProcs = args[2] #diccionario de procesos globales
209 self.id = args[0]
210 self.inputId = args[1]
211 self.project_id = args[2]
246 212 else:
247 213 self.id = args[0]
214 self.inputId = args[0]
215 self.project_id = args[1]
248 216 self.typeProc = "Operation"
249 217
250 def addOperationKwargs(self, objId, **kwargs):
251
252 self.operationKwargs[objId] = kwargs
253
254 218 def getAllowedArgs(self):
255 219
256 220 if hasattr(self, '__attrs__'):
257 221 return self.__attrs__
258 222 else:
259 return inspect.getargspec(self.run).args
260
261
262 def sockListening(self, topic):
263
264 """
265 This function create a socket to receive objects.
266 The 'topic' argument is related to the publisher process from which the self process is
267 listening (data).
268 In the case were the self process is listening to a Reader (proc Unit),
269 special conditions are introduced to maximize parallelism.
270 """
271
272 cont = zmq.Context()
273 zmq_socket = cont.socket(zmq.SUB)
274 if not os.path.exists('/tmp/socketTmp'):
275 os.mkdir('/tmp/socketTmp')
276
277 if 'Reader' in self.dictProcs[self.inputId].name:
278 zmq_socket.connect('ipc:///tmp/socketTmp/b')
279
280 else:
281 zmq_socket.connect('ipc:///tmp/socketTmp/%s' % self.inputId)
282
283 #log.error('RECEIVING FROM {} {}'.format(self.inputId, str(topic).encode()))
284 zmq_socket.setsockopt(zmq.SUBSCRIBE, str(topic).encode()) #yong
285
286 return zmq_socket
223 return inspect.getargspec(BaseClass.run).args
287 224
225 def subscribe(self):
226 '''
227 This function create a socket to receive objects from the
228 topic `inputId`.
229 '''
288 230
289 def listenProc(self, sock):
231 c = zmq.Context()
232 self.receiver = c.socket(zmq.SUB)
233 self.receiver.connect('ipc:///tmp/schain/{}_pub'.format(self.project_id))
234 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
290 235
291 """
292 This function listen to a ipc addres until a message is recovered. To serialize the
293 data (object), pickle has been use.
294 The 'sock' argument is the socket previously connect to an ipc address and with a topic subscription.
295 """
296
297 a = sock.recv_multipart()
298 a = pickle.loads(a[1])
299 return a
236 def listen(self):
237 '''
238 This function waits for objects and deserialize using pickle
239 '''
300 240
301 def sockPublishing(self):
241 data = pickle.loads(self.receiver.recv_multipart()[1])
242 return data
302 243
303 """
244 def set_publisher(self):
245 '''
304 246 This function create a socket for publishing purposes.
305 Depending on the process type from where is created, it binds or connect
306 to special IPC addresses.
307 """
308 time.sleep(4) #yong
309 context = zmq.Context()
310 zmq_socket = context.socket(zmq.PUB)
311 if not os.path.exists('/tmp/socketTmp'): os.mkdir('/tmp/socketTmp')
312 if 'Reader' in self.dictProcs[self.id].name:
313 zmq_socket.connect('ipc:///tmp/socketTmp/a')
314 else:
315 zmq_socket.bind('ipc:///tmp/socketTmp/%s' % self.id)
316
317 return zmq_socket
318
319 def publishProc(self, sock, data):
320
321 """
322 This function publish a python object (data) under a specific topic in a socket (sock).
323 Usually, the topic is the self id of the process.
324 """
325
326 sock.send_multipart([str(self.id).encode(), pickle.dumps(data)]) #yong
327
328 return True
329
330 def sockOp(self):
331
332 """
333 This function create a socket for communication purposes with operation processes.
334 """
335
336 cont = zmq.Context()
337 zmq_socket = cont.socket(zmq.DEALER)
338
339 if python_version()[0] == '2':
340 zmq_socket.setsockopt(zmq.IDENTITY, self.id)
341 if python_version()[0] == '3':
342 zmq_socket.setsockopt_string(zmq.IDENTITY, self.id)
343
344
345 return zmq_socket
346
347
348 def execOp(self, socket, opId, dataObj):
349
350 """
351 This function 'execute' an operation main routine by establishing a
352 connection with it and sending a python object (dataOut).
353 """
354 if not os.path.exists('/tmp/socketTmp'): os.mkdir('/tmp/socketTmp')
355 socket.connect('ipc:///tmp/socketTmp/%s' %opId)
356
357
358 socket.send(pickle.dumps(dataObj)) #yong
359
360 argument = socket.recv_multipart()[0]
361
362 argument = pickle.loads(argument)
363
364 return argument
365
366 def sockIO(self):
367
368 """
369 Socket defined for an operation process. It is able to recover the object sent from another process as well as a
370 identifier of who sent it.
371 """
372
373 cont = zmq.Context()
374 if not os.path.exists('/tmp/socketTmp'): os.mkdir('/tmp/socketTmp')
375 socket = cont.socket(zmq.ROUTER)
376 socket.bind('ipc:///tmp/socketTmp/%s' % self.id)
377
378 return socket
379
380 def funIOrec(self, socket):
381
382 """
383 Operation method, recover the id of the process who sent a python object.
384 The 'socket' argument is the socket binded to a specific process ipc.
385 """
386
387 #id_proc = socket.recv()
388
389 #dataObj = socket.recv_pyobj()
390
391 dataObj = socket.recv_multipart()
392
393 dataObj[1] = pickle.loads(dataObj[1])
394 return dataObj[0], dataObj[1]
395
396 def funIOsen(self, socket, data, dest):
397
398 """
399 Operation method, send a python object to a specific destination.
400 The 'dest' argument is the id of a proccesinf unit.
401 """
247 '''
402 248
403 socket.send_multipart([dest, pickle.dumps(data)]) #yong
249 time.sleep(1)
250 c = zmq.Context()
251 self.sender = c.socket(zmq.PUB)
252 self.sender.connect('ipc:///tmp/schain/{}_sub'.format(self.project_id))
404 253
405 return True
254 def publish(self, data, id):
255 '''
256 This function publish an object, to a specific topic.
257 '''
406 258
259 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
407 260
408 261 def runReader(self):
409
410 # time.sleep(3)
262 '''
263 Run fuction for read units
264 '''
411 265 while True:
412 266
413 267 BaseClass.run(self, **self.kwargs)
414 268
415
416 keyList = list(self.operations2RunDict.keys())
417 keyList.sort()
418
419 for key in keyList:
420 self.socketOP = self.sockOp()
421 self.dataOut = self.execOp(self.socketOP, key, self.dataOut)
422
423
424 if self.flagNoMoreFiles: #Usar un objeto con flags para saber si termino el proc o hubo un error
425 self.publishProc(self.socket_p, "Finish")
269 if self.dataOut.error[0] == -1:
270 log.error(self.dataOut.error[1])
271 self.publish('end', self.id)
272 #self.sender.send_multipart([str(self.project_id).encode(), 'end'.encode()])
426 273 break
427 274
275 for op, optype, id, kwargs in self.operations:
276 if optype=='self':
277 op(**kwargs)
278 elif optype=='other':
279 self.dataOut = op.run(self.dataOut, **self.kwargs)
280 elif optype=='external':
281 self.publish(self.dataOut, opId)
282
428 283 if self.dataOut.flagNoData:
429 284 continue
430 285
431 #print("Publishing data...")
432 self.publishProc(self.socket_p, self.dataOut)
433 # time.sleep(2)
434
435
436 print("%s done" %BaseClass.__name__)
437 return 0
286 self.publish(self.dataOut, self.id)
438 287
439 288 def runProc(self):
440
441 # All the procUnits with kwargs that require a setup initialization must be defined here.
442
443 if self.setupReq:
444 BaseClass.setup(self, **self.kwargs)
289 '''
290 Run function for proccessing units
291 '''
445 292
446 293 while True:
447 self.dataIn = self.listenProc(self.socket_l)
448 #print("%s received data" %BaseClass.__name__)
294 self.dataIn = self.listen()
449 295
450 if self.dataIn == "Finish":
296 if self.dataIn == 'end':
297 self.publish('end', self.id)
298 for op, optype, opId, kwargs in self.operations:
299 if optype == 'external':
300 self.publish('end', opId)
451 301 break
452 302
453 m_arg = list(self.kwargs.keys())
454 num_arg = list(range(1,int(BaseClass.run.__code__.co_argcount)))
455
456 run_arg = {}
457
458 for var in num_arg:
459 if BaseClass.run.__code__.co_varnames[var] in m_arg:
460 run_arg[BaseClass.run.__code__.co_varnames[var]] = self.kwargs[BaseClass.run.__code__.co_varnames[var]]
461
462 #BaseClass.run(self, **self.kwargs)
463 BaseClass.run(self, **run_arg)
464
465 ## Iterar sobre una serie de data que podrias aplicarse
466
467 for m_name in BaseClass.METHODS:
303 if self.dataIn.flagNoData:
304 continue
468 305
469 met_arg = {}
306 BaseClass.run(self, **self.kwargs)
470 307
471 for arg in m_arg:
472 if arg in BaseClass.METHODS[m_name]:
473 for att in BaseClass.METHODS[m_name]:
474 met_arg[att] = self.kwargs[att]
475
476 method = getattr(BaseClass, m_name)
477 method(self, **met_arg)
478 break
308 for op, optype, opId, kwargs in self.operations:
309 if optype=='self':
310 op(**kwargs)
311 elif optype=='other':
312 self.dataOut = op.run(self.dataOut, **kwargs)
313 elif optype=='external':
314 self.publish(self.dataOut, opId)
479 315
480 316 if self.dataOut.flagNoData:
481 317 continue
482 318
483 keyList = list(self.operations2RunDict.keys())
484 keyList.sort()
485
486 for key in keyList:
487
488 self.socketOP = self.sockOp()
489 self.dataOut = self.execOp(self.socketOP, key, self.dataOut)
490
491
492 self.publishProc(self.socket_p, self.dataOut)
493
494
495 print("%s done" %BaseClass.__name__)
496
497 return 0
319 self.publish(self.dataOut, self.id)
498 320
499 321 def runOp(self):
322 '''
323 Run function for operations
324 '''
500 325
501 326 while True:
502 327
503 [self.dest ,self.buffer] = self.funIOrec(self.socket_router)
328 dataOut = self.listen()
504 329
505 self.buffer = BaseClass.run(self, self.buffer, **self.kwargs)
506
507 self.funIOsen(self.socket_router, self.buffer, self.dest)
508
509 print("%s done" %BaseClass.__name__)
510 return 0
330 if dataOut == 'end':
331 break
511 332
333 BaseClass.run(self, dataOut, **self.kwargs)
512 334
513 335 def run(self):
514 336
515 337 if self.typeProc is "ProcUnit":
516 338
517 self.socket_p = self.sockPublishing()
339 if self.inputId is not None:
340 self.subscribe()
341 self.set_publisher()
518 342
519 if 'Reader' not in self.dictProcs[self.id].name:
520 self.socket_l = self.sockListening(self.inputId)
343 if 'Reader' not in BaseClass.__name__:
521 344 self.runProc()
522
523 345 else:
524
525 346 self.runReader()
526 347
527 348 elif self.typeProc is "Operation":
528 349
529 self.socket_router = self.sockIO()
530
350 self.subscribe()
531 351 self.runOp()
532 352
533 353 else:
534 354 raise ValueError("Unknown type")
535 355
536 return 0
356 print("%s done" % BaseClass.__name__)
357 self.close()
358
359 def close(self):
360
361 if self.sender:
362 self.sender.close()
363
364 if self.receiver:
365 self.receiver.close()
537 366
538 367 return MPClass No newline at end of file
@@ -127,8 +127,6 class SpectraProc(ProcessingUnit):
127 127
128 128 def run(self, nProfiles=None, nFFTPoints=None, pairsList=[], ippFactor=None, shift_fft=False):
129 129
130 self.dataOut.flagNoData = True
131
132 130 if self.dataIn.type == "Spectra":
133 131 self.dataOut.copy(self.dataIn)
134 132 # if not pairsList:
@@ -783,7 +781,7 class SpectraProc(ProcessingUnit):
783 781
784 782 return 1
785 783
786 @MPDecorator
784
787 785 class IncohInt(Operation):
788 786
789 787 __profIndex = 0
@@ -1,7 +1,7
1 1 import sys
2 2 import numpy
3 3 from scipy import interpolate
4 from schainpy.model.proc.jroproc_base import ProcessingUnit, MPDecorator, Operation
4 from schainpy.model.proc.jroproc_base import ProcessingUnit,, Operation
5 5 from schainpy.model.data.jrodata import Voltage
6 6 from schainpy.utils import log
7 7 from time import time
@@ -10,16 +10,13 from time import time
10 10 @MPDecorator
11 11 class VoltageProc(ProcessingUnit):
12 12
13 METHODS = {} #yong
13 def __init__(self):
14 14
15 def __init__(self):#, **kwargs): #yong
15 ProcessingUnit.__init__(self)
16 16
17 ProcessingUnit.__init__(self)#, **kwargs)
18
19 # self.objectDict = {}
20 17 self.dataOut = Voltage()
21 18 self.flip = 1
22 self.setupReq = False #yong
19 self.setupReq = False
23 20
24 21 def run(self):
25 22
@@ -319,7 +316,7 class VoltageProc(ProcessingUnit):
319 316 self.dataOut.data[:,:,botLim:topLim+1] = ynew
320 317
321 318 # import collections
322 @MPDecorator
319
323 320 class CohInt(Operation):
324 321
325 322 isConfig = False
@@ -581,7 +578,7 class CohInt(Operation):
581 578 # dataOut.timeInterval = dataOut.ippSeconds * dataOut.nCohInt
582 579 dataOut.flagNoData = False
583 580 return dataOut
584 @MPDecorator
581
585 582 class Decoder(Operation):
586 583
587 584 isConfig = False
@@ -774,7 +771,7 class Decoder(Operation):
774 771 return dataOut
775 772 # dataOut.flagDeflipData = True #asumo q la data no esta sin flip
776 773
777 @MPDecorator
774
778 775 class ProfileConcat(Operation):
779 776
780 777 isConfig = False
@@ -825,7 +822,7 class ProfileConcat(Operation):
825 822 dataOut.heightList = numpy.arange(dataOut.heightList[0], xf, deltaHeight)
826 823 dataOut.ippSeconds *= m
827 824 return dataOut
828 @MPDecorator
825
829 826 class ProfileSelector(Operation):
830 827
831 828 profileIndex = None
@@ -986,7 +983,7 class ProfileSelector(Operation):
986 983
987 984 #return False
988 985 return dataOut
989 @MPDecorator
986
990 987 class Reshaper(Operation):
991 988
992 989 def __init__(self):#, **kwargs):
@@ -1091,7 +1088,7 class Reshaper(Operation):
1091 1088 dataOut.ippSeconds /= self.__nTxs
1092 1089
1093 1090 return dataOut
1094 @MPDecorator
1091
1095 1092 class SplitProfiles(Operation):
1096 1093
1097 1094 def __init__(self):#, **kwargs):
@@ -1133,7 +1130,7 class SplitProfiles(Operation):
1133 1130 dataOut.ippSeconds /= n
1134 1131
1135 1132 return dataOut
1136 @MPDecorator
1133
1137 1134 class CombineProfiles(Operation):
1138 1135 def __init__(self):#, **kwargs):
1139 1136
General Comments 0
You need to be logged in to leave comments. Login now