##// END OF EJS Templates
Se añade el metodo fix_publish y se editan los metodos listen y publish, en el ultimo se colocaun pequeño retardo aplicado exclusivamente durante la comunacion desde la unidad de lectura, las demas operaciones se realizan igual.
Alexander Valdez -
r1220:dd4695cf07ce
parent child
Show More
@@ -1,390 +1,404
1 '''
1 '''
2 Updated for multiprocessing
2 Updated for multiprocessing
3 Author : Sergio Cortez
3 Author : Sergio Cortez
4 Jan 2018
4 Jan 2018
5 Abstract:
5 Abstract:
6 Base class for processing units and operations. A decorator provides multiprocessing features and interconnect the processes created.
6 Base class for processing units and operations. A decorator provides multiprocessing features and interconnect the processes created.
7 The argument (kwargs) sent from the controller is parsed and filtered via the decorator for each processing unit or operation instantiated.
7 The argument (kwargs) sent from the controller is parsed and filtered via the decorator for each processing unit or operation instantiated.
8 The decorator handle also the methods inside the processing unit to be called from the main script (not as operations) (OPERATION -> type ='self').
8 The decorator handle also the methods inside the processing unit to be called from the main script (not as operations) (OPERATION -> type ='self').
9
9
10 Based on:
10 Based on:
11 $Author: murco $
11 $Author: murco $
12 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
12 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
13 '''
13 '''
14
14
15 import os
15 import os
16 import inspect
16 import inspect
17 import zmq
17 import zmq
18 import time
18 import time
19 import pickle
19 import pickle
20 from multiprocessing import Process
20 from multiprocessing import Process
21 from zmq.utils.monitor import recv_monitor_message
21 from zmq.utils.monitor import recv_monitor_message
22
22
23 from schainpy.utils import log
23 from schainpy.utils import log
24
24
25
25
26 class ProcessingUnit(object):
26 class ProcessingUnit(object):
27
27
28 """
28 """
29 Update - Jan 2018 - MULTIPROCESSING
29 Update - Jan 2018 - MULTIPROCESSING
30 All the "call" methods present in the previous base were removed.
30 All the "call" methods present in the previous base were removed.
31 The majority of operations are independant processes, thus
31 The majority of operations are independant processes, thus
32 the decorator is in charge of communicate the operation processes
32 the decorator is in charge of communicate the operation processes
33 with the proccessing unit via IPC.
33 with the proccessing unit via IPC.
34
34
35 The constructor does not receive any argument. The remaining methods
35 The constructor does not receive any argument. The remaining methods
36 are related with the operations to execute.
36 are related with the operations to execute.
37
37
38
38
39 """
39 """
40
40
41 def __init__(self):
41 def __init__(self):
42
42
43 self.dataIn = None
43 self.dataIn = None
44 self.dataOut = None
44 self.dataOut = None
45 self.isConfig = False
45 self.isConfig = False
46 self.operations = []
46 self.operations = []
47 self.plots = []
47 self.plots = []
48
48
49 def getAllowedArgs(self):
49 def getAllowedArgs(self):
50 if hasattr(self, '__attrs__'):
50 if hasattr(self, '__attrs__'):
51 return self.__attrs__
51 return self.__attrs__
52 else:
52 else:
53 return inspect.getargspec(self.run).args
53 return inspect.getargspec(self.run).args
54
54
55 def addOperation(self, conf, operation):
55 def addOperation(self, conf, operation):
56 """
56 """
57 This method is used in the controller, and update the dictionary containing the operations to execute. The dict
57 This method is used in the controller, and update the dictionary containing the operations to execute. The dict
58 posses the id of the operation process (IPC purposes)
58 posses the id of the operation process (IPC purposes)
59
59
60 Agrega un objeto del tipo "Operation" (opObj) a la lista de objetos "self.objectList" y retorna el
60 Agrega un objeto del tipo "Operation" (opObj) a la lista de objetos "self.objectList" y retorna el
61 identificador asociado a este objeto.
61 identificador asociado a este objeto.
62
62
63 Input:
63 Input:
64
64
65 object : objeto de la clase "Operation"
65 object : objeto de la clase "Operation"
66
66
67 Return:
67 Return:
68
68
69 objId : identificador del objeto, necesario para comunicar con master(procUnit)
69 objId : identificador del objeto, necesario para comunicar con master(procUnit)
70 """
70 """
71
71
72 self.operations.append(
72 self.operations.append(
73 (operation, conf.type, conf.id, conf.getKwargs()))
73 (operation, conf.type, conf.id, conf.getKwargs()))
74
74
75 if 'plot' in self.name.lower():
75 if 'plot' in self.name.lower():
76 self.plots.append(operation.CODE)
76 self.plots.append(operation.CODE)
77
77
78 def getOperationObj(self, objId):
78 def getOperationObj(self, objId):
79
79
80 if objId not in list(self.operations.keys()):
80 if objId not in list(self.operations.keys()):
81 return None
81 return None
82
82
83 return self.operations[objId]
83 return self.operations[objId]
84
84
85 def operation(self, **kwargs):
85 def operation(self, **kwargs):
86 """
86 """
87 Operacion directa sobre la data (dataOut.data). Es necesario actualizar los valores de los
87 Operacion directa sobre la data (dataOut.data). Es necesario actualizar los valores de los
88 atributos del objeto dataOut
88 atributos del objeto dataOut
89
89
90 Input:
90 Input:
91
91
92 **kwargs : Diccionario de argumentos de la funcion a ejecutar
92 **kwargs : Diccionario de argumentos de la funcion a ejecutar
93 """
93 """
94
94
95 raise NotImplementedError
95 raise NotImplementedError
96
96
97 def setup(self):
97 def setup(self):
98
98
99 raise NotImplementedError
99 raise NotImplementedError
100
100
101 def run(self):
101 def run(self):
102
102
103 raise NotImplementedError
103 raise NotImplementedError
104
104
105 def close(self):
105 def close(self):
106
106
107 return
107 return
108
108
109
109
110 class Operation(object):
110 class Operation(object):
111
111
112 """
112 """
113 Update - Jan 2018 - MULTIPROCESSING
113 Update - Jan 2018 - MULTIPROCESSING
114
114
115 Most of the methods remained the same. The decorator parse the arguments and executed the run() method for each process.
115 Most of the methods remained the same. The decorator parse the arguments and executed the run() method for each process.
116 The constructor doe snot receive any argument, neither the baseclass.
116 The constructor doe snot receive any argument, neither the baseclass.
117
117
118
118
119 Clase base para definir las operaciones adicionales que se pueden agregar a la clase ProcessingUnit
119 Clase base para definir las operaciones adicionales que se pueden agregar a la clase ProcessingUnit
120 y necesiten acumular informacion previa de los datos a procesar. De preferencia usar un buffer de
120 y necesiten acumular informacion previa de los datos a procesar. De preferencia usar un buffer de
121 acumulacion dentro de esta clase
121 acumulacion dentro de esta clase
122
122
123 Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer)
123 Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer)
124
124
125 """
125 """
126
126
127 def __init__(self):
127 def __init__(self):
128
128
129 self.id = None
129 self.id = None
130 self.isConfig = False
130 self.isConfig = False
131
131
132 if not hasattr(self, 'name'):
132 if not hasattr(self, 'name'):
133 self.name = self.__class__.__name__
133 self.name = self.__class__.__name__
134
134
135 def getAllowedArgs(self):
135 def getAllowedArgs(self):
136 if hasattr(self, '__attrs__'):
136 if hasattr(self, '__attrs__'):
137 return self.__attrs__
137 return self.__attrs__
138 else:
138 else:
139 return inspect.getargspec(self.run).args
139 return inspect.getargspec(self.run).args
140
140
141 def setup(self):
141 def setup(self):
142
142
143 self.isConfig = True
143 self.isConfig = True
144
144
145 raise NotImplementedError
145 raise NotImplementedError
146
146
147 def run(self, dataIn, **kwargs):
147 def run(self, dataIn, **kwargs):
148 """
148 """
149 Realiza las operaciones necesarias sobre la dataIn.data y actualiza los
149 Realiza las operaciones necesarias sobre la dataIn.data y actualiza los
150 atributos del objeto dataIn.
150 atributos del objeto dataIn.
151
151
152 Input:
152 Input:
153
153
154 dataIn : objeto del tipo JROData
154 dataIn : objeto del tipo JROData
155
155
156 Return:
156 Return:
157
157
158 None
158 None
159
159
160 Affected:
160 Affected:
161 __buffer : buffer de recepcion de datos.
161 __buffer : buffer de recepcion de datos.
162
162
163 """
163 """
164 if not self.isConfig:
164 if not self.isConfig:
165 self.setup(**kwargs)
165 self.setup(**kwargs)
166
166
167 raise NotImplementedError
167 raise NotImplementedError
168
168
169 def close(self):
169 def close(self):
170
170
171 return
171 return
172
172
173
173
174 def MPDecorator(BaseClass):
174 def MPDecorator(BaseClass):
175 """
175 """
176 Multiprocessing class decorator
176 Multiprocessing class decorator
177
177
178 This function add multiprocessing features to a BaseClass. Also, it handle
178 This function add multiprocessing features to a BaseClass. Also, it handle
179 the communication beetween processes (readers, procUnits and operations).
179 the communication beetween processes (readers, procUnits and operations).
180 """
180 """
181
181
182 class MPClass(BaseClass, Process):
182 class MPClass(BaseClass, Process):
183
183
184 def __init__(self, *args, **kwargs):
184 def __init__(self, *args, **kwargs):
185 super(MPClass, self).__init__()
185 super(MPClass, self).__init__()
186 Process.__init__(self)
186 Process.__init__(self)
187 self.operationKwargs = {}
187 self.operationKwargs = {}
188 self.args = args
188 self.args = args
189 self.kwargs = kwargs
189 self.kwargs = kwargs
190 self.sender = None
190 self.sender = None
191 self.receiver = None
191 self.receiver = None
192 self.i = 0
192 self.name = BaseClass.__name__
193 self.name = BaseClass.__name__
193 if 'plot' in self.name.lower() and not self.name.endswith('_'):
194 if 'plot' in self.name.lower() and not self.name.endswith('_'):
194 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
195 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
195 self.start_time = time.time()
196 self.start_time = time.time()
196
197
197 if len(self.args) is 3:
198 if len(self.args) is 3:
198 self.typeProc = "ProcUnit"
199 self.typeProc = "ProcUnit"
199 self.id = args[0]
200 self.id = args[0]
200 self.inputId = args[1]
201 self.inputId = args[1]
201 self.project_id = args[2]
202 self.project_id = args[2]
202 elif len(self.args) is 2:
203 elif len(self.args) is 2:
203 self.id = args[0]
204 self.id = args[0]
204 self.inputId = args[0]
205 self.inputId = args[0]
205 self.project_id = args[1]
206 self.project_id = args[1]
206 self.typeProc = "Operation"
207 self.typeProc = "Operation"
207
208
209 def fix_publish(self,valor,multiple1):
210 return True if valor%multiple1 ==0 else False
211
208 def subscribe(self):
212 def subscribe(self):
209 '''
213 '''
210 This function create a socket to receive objects from the
214 This function create a socket to receive objects from the
211 topic `inputId`.
215 topic `inputId`.
212 '''
216 '''
213
217
214 c = zmq.Context()
218 c = zmq.Context()
215 self.receiver = c.socket(zmq.SUB)
219 self.receiver = c.socket(zmq.SUB)
216 self.receiver.connect(
220 self.receiver.connect(
217 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
221 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
218 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
222 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
219
223
220 def listen(self):
224 def listen(self):
221 '''
225 '''
222 This function waits for objects and deserialize using pickle
226 This function waits for objects and deserialize using pickle
223 '''
227 '''
224
228 try:
225 data = pickle.loads(self.receiver.recv_multipart()[1])
229 data = pickle.loads(self.receiver.recv_multipart()[1])
230 except zmq.ZMQError as e:
231 if e.errno == zmq.ETERM:
232 print (e.errno)
226
233
227 return data
234 return data
228
235
229 def set_publisher(self):
236 def set_publisher(self):
230 '''
237 '''
231 This function create a socket for publishing purposes.
238 This function create a socket for publishing purposes.
232 '''
239 '''
233
240
234 time.sleep(1)
241 time.sleep(1)
235 c = zmq.Context()
242 c = zmq.Context()
236 self.sender = c.socket(zmq.PUB)
243 self.sender = c.socket(zmq.PUB)
237 self.sender.connect(
244 self.sender.connect(
238 'ipc:///tmp/schain/{}_sub'.format(self.project_id))
245 'ipc:///tmp/schain/{}_sub'.format(self.project_id))
239
246
240 def publish(self, data, id):
247 def publish(self, data, id):
241 '''
248 '''
242 This function publish an object, to a specific topic.
249 This function publish an object, to a specific topic.
250 The fix method only affect inputId None which is Read Unit
251 Use value between 64 80, you should notice a little retard in processing
243 '''
252 '''
253 if self.inputId is None:
254 self.i+=1
255 if self.fix_publish(self.i,80) == True:# value n
256 time.sleep(0.01)
257
244 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
258 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
245
259
246 def runReader(self):
260 def runReader(self):
247 '''
261 '''
248 Run fuction for read units
262 Run fuction for read units
249 '''
263 '''
250 while True:
264 while True:
251
265
252 BaseClass.run(self, **self.kwargs)
266 BaseClass.run(self, **self.kwargs)
253
267
254 for op, optype, opId, kwargs in self.operations:
268 for op, optype, opId, kwargs in self.operations:
255 if optype == 'self' and not self.dataOut.flagNoData:
269 if optype == 'self' and not self.dataOut.flagNoData:
256 op(**kwargs)
270 op(**kwargs)
257 elif optype == 'other' and not self.dataOut.flagNoData:
271 elif optype == 'other' and not self.dataOut.flagNoData:
258 self.dataOut = op.run(self.dataOut, **self.kwargs)
272 self.dataOut = op.run(self.dataOut, **self.kwargs)
259 elif optype == 'external':
273 elif optype == 'external':
260 self.publish(self.dataOut, opId)
274 self.publish(self.dataOut, opId)
261
275
262 if self.dataOut.flagNoData and not self.dataOut.error:
276 if self.dataOut.flagNoData and not self.dataOut.error:
263 continue
277 continue
264
278
265 self.publish(self.dataOut, self.id)
279 self.publish(self.dataOut, self.id)
266
280
267 if self.dataOut.error:
281 if self.dataOut.error:
268 log.error(self.dataOut.error, self.name)
282 log.error(self.dataOut.error, self.name)
269 # self.sender.send_multipart([str(self.project_id).encode(), 'end'.encode()])
283 # self.sender.send_multipart([str(self.project_id).encode(), 'end'.encode()])
270 break
284 break
271
285
272 time.sleep(1)
286 time.sleep(1)
273
287
274 def runProc(self):
288 def runProc(self):
275 '''
289 '''
276 Run function for proccessing units
290 Run function for proccessing units
277 '''
291 '''
278
292
279 while True:
293 while True:
280 self.dataIn = self.listen()
294 self.dataIn = self.listen()
281
295
282 if self.dataIn.flagNoData and self.dataIn.error is None:
296 if self.dataIn.flagNoData and self.dataIn.error is None:
283 continue
297 continue
284
298
285 BaseClass.run(self, **self.kwargs)
299 BaseClass.run(self, **self.kwargs)
286
300
287 if self.dataIn.error:
301 if self.dataIn.error:
288 self.dataOut.error = self.dataIn.error
302 self.dataOut.error = self.dataIn.error
289 self.dataOut.flagNoData = True
303 self.dataOut.flagNoData = True
290
304
291 for op, optype, opId, kwargs in self.operations:
305 for op, optype, opId, kwargs in self.operations:
292 if optype == 'self' and not self.dataOut.flagNoData:
306 if optype == 'self' and not self.dataOut.flagNoData:
293 op(**kwargs)
307 op(**kwargs)
294 elif optype == 'other' and not self.dataOut.flagNoData:
308 elif optype == 'other' and not self.dataOut.flagNoData:
295 self.dataOut = op.run(self.dataOut, **kwargs)
309 self.dataOut = op.run(self.dataOut, **kwargs)
296 elif optype == 'external' and not self.dataOut.flagNoData:
310 elif optype == 'external' and not self.dataOut.flagNoData:
297 self.publish(self.dataOut, opId)
311 self.publish(self.dataOut, opId)
298
312
299 if not self.dataOut.flagNoData or self.dataOut.error:
313 if not self.dataOut.flagNoData or self.dataOut.error:
300 self.publish(self.dataOut, self.id)
314 self.publish(self.dataOut, self.id)
301 for op, optype, opId, kwargs in self.operations:
315 for op, optype, opId, kwargs in self.operations:
302 if optype == 'self' and self.dataOut.error:
316 if optype == 'self' and self.dataOut.error:
303 op(**kwargs)
317 op(**kwargs)
304 elif optype == 'other' and self.dataOut.error:
318 elif optype == 'other' and self.dataOut.error:
305 self.dataOut = op.run(self.dataOut, **kwargs)
319 self.dataOut = op.run(self.dataOut, **kwargs)
306 elif optype == 'external' and self.dataOut.error:
320 elif optype == 'external' and self.dataOut.error:
307 self.publish(self.dataOut, opId)
321 self.publish(self.dataOut, opId)
308
322
309 if self.dataIn.error:
323 if self.dataIn.error:
310 break
324 break
311
325
312 time.sleep(1)
326 time.sleep(1)
313
327
314 def runOp(self):
328 def runOp(self):
315 '''
329 '''
316 Run function for external operations (this operations just receive data
330 Run function for external operations (this operations just receive data
317 ex: plots, writers, publishers)
331 ex: plots, writers, publishers)
318 '''
332 '''
319
333
320 while True:
334 while True:
321
335
322 dataOut = self.listen()
336 dataOut = self.listen()
323
337
324 BaseClass.run(self, dataOut, **self.kwargs)
338 BaseClass.run(self, dataOut, **self.kwargs)
325
339
326 if dataOut.error:
340 if dataOut.error:
327 break
341 break
328
342
329 time.sleep(1)
343 time.sleep(1)
330
344
331 def run(self):
345 def run(self):
332 if self.typeProc is "ProcUnit":
346 if self.typeProc is "ProcUnit":
333
347
334 if self.inputId is not None:
348 if self.inputId is not None:
335
349
336 self.subscribe()
350 self.subscribe()
337
351
338 self.set_publisher()
352 self.set_publisher()
339
353
340 if 'Reader' not in BaseClass.__name__:
354 if 'Reader' not in BaseClass.__name__:
341 self.runProc()
355 self.runProc()
342 else:
356 else:
343 self.runReader()
357 self.runReader()
344
358
345 elif self.typeProc is "Operation":
359 elif self.typeProc is "Operation":
346
360
347 self.subscribe()
361 self.subscribe()
348 self.runOp()
362 self.runOp()
349
363
350 else:
364 else:
351 raise ValueError("Unknown type")
365 raise ValueError("Unknown type")
352
366
353 self.close()
367 self.close()
354
368
355 def event_monitor(self, monitor):
369 def event_monitor(self, monitor):
356
370
357 events = {}
371 events = {}
358
372
359 for name in dir(zmq):
373 for name in dir(zmq):
360 if name.startswith('EVENT_'):
374 if name.startswith('EVENT_'):
361 value = getattr(zmq, name)
375 value = getattr(zmq, name)
362 events[value] = name
376 events[value] = name
363
377
364 while monitor.poll():
378 while monitor.poll():
365 evt = recv_monitor_message(monitor)
379 evt = recv_monitor_message(monitor)
366 if evt['event'] == 32:
380 if evt['event'] == 32:
367 self.connections += 1
381 self.connections += 1
368 if evt['event'] == 512:
382 if evt['event'] == 512:
369 pass
383 pass
370
384
371 evt.update({'description': events[evt['event']]})
385 evt.update({'description': events[evt['event']]})
372
386
373 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
387 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
374 break
388 break
375 monitor.close()
389 monitor.close()
376 print('event monitor thread done!')
390 print('event monitor thread done!')
377
391
378 def close(self):
392 def close(self):
379
393
380 BaseClass.close(self)
394 BaseClass.close(self)
381
395
382 if self.sender:
396 if self.sender:
383 self.sender.close()
397 self.sender.close()
384
398
385 if self.receiver:
399 if self.receiver:
386 self.receiver.close()
400 self.receiver.close()
387
401
388 log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)
402 log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)
389
403
390 return MPClass
404 return MPClass
General Comments 0
You need to be logged in to leave comments. Login now