##// END OF EJS Templates
Fix MPDecorator (check flagNoData for all type of operations)
Juan C. Espinoza -
r1198:f0bf03007c43
parent child
Show More
@@ -1,382 +1,385
1 '''
1 '''
2 Updated for multiprocessing
2 Updated for multiprocessing
3 Author : Sergio Cortez
3 Author : Sergio Cortez
4 Jan 2018
4 Jan 2018
5 Abstract:
5 Abstract:
6 Base class for processing units and operations. A decorator provides multiprocessing features and interconnect the processes created.
6 Base class for processing units and operations. A decorator provides multiprocessing features and interconnect the processes created.
7 The argument (kwargs) sent from the controller is parsed and filtered via the decorator for each processing unit or operation instantiated.
7 The argument (kwargs) sent from the controller is parsed and filtered via the decorator for each processing unit or operation instantiated.
8 The decorator handle also the methods inside the processing unit to be called from the main script (not as operations) (OPERATION -> type ='self').
8 The decorator handle also the methods inside the processing unit to be called from the main script (not as operations) (OPERATION -> type ='self').
9
9
10 Based on:
10 Based on:
11 $Author: murco $
11 $Author: murco $
12 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
12 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
13 '''
13 '''
14
14
15 import inspect
15 import inspect
16 import zmq
16 import zmq
17 import time
17 import time
18 import pickle
18 import pickle
19 import os
19 import os
20 from multiprocessing import Process
20 from multiprocessing import Process
21 from zmq.utils.monitor import recv_monitor_message
21 from zmq.utils.monitor import recv_monitor_message
22
22
23 from schainpy.utils import log
23 from schainpy.utils import log
24
24
25
25
26 class ProcessingUnit(object):
26 class ProcessingUnit(object):
27
27
28 """
28 """
29 Update - Jan 2018 - MULTIPROCESSING
29 Update - Jan 2018 - MULTIPROCESSING
30 All the "call" methods present in the previous base were removed.
30 All the "call" methods present in the previous base were removed.
31 The majority of operations are independant processes, thus
31 The majority of operations are independant processes, thus
32 the decorator is in charge of communicate the operation processes
32 the decorator is in charge of communicate the operation processes
33 with the proccessing unit via IPC.
33 with the proccessing unit via IPC.
34
34
35 The constructor does not receive any argument. The remaining methods
35 The constructor does not receive any argument. The remaining methods
36 are related with the operations to execute.
36 are related with the operations to execute.
37
37
38
38
39 """
39 """
40
40
41 def __init__(self):
41 def __init__(self):
42
42
43 self.dataIn = None
43 self.dataIn = None
44 self.dataOut = None
44 self.dataOut = None
45 self.isConfig = False
45 self.isConfig = False
46 self.operations = []
46 self.operations = []
47 self.plots = []
47 self.plots = []
48
48
49 def getAllowedArgs(self):
49 def getAllowedArgs(self):
50 if hasattr(self, '__attrs__'):
50 if hasattr(self, '__attrs__'):
51 return self.__attrs__
51 return self.__attrs__
52 else:
52 else:
53 return inspect.getargspec(self.run).args
53 return inspect.getargspec(self.run).args
54
54
55 def addOperation(self, conf, operation):
55 def addOperation(self, conf, operation):
56 """
56 """
57 This method is used in the controller, and update the dictionary containing the operations to execute. The dict
57 This method is used in the controller, and update the dictionary containing the operations to execute. The dict
58 posses the id of the operation process (IPC purposes)
58 posses the id of the operation process (IPC purposes)
59
59
60 Agrega un objeto del tipo "Operation" (opObj) a la lista de objetos "self.objectList" y retorna el
60 Agrega un objeto del tipo "Operation" (opObj) a la lista de objetos "self.objectList" y retorna el
61 identificador asociado a este objeto.
61 identificador asociado a este objeto.
62
62
63 Input:
63 Input:
64
64
65 object : objeto de la clase "Operation"
65 object : objeto de la clase "Operation"
66
66
67 Return:
67 Return:
68
68
69 objId : identificador del objeto, necesario para comunicar con master(procUnit)
69 objId : identificador del objeto, necesario para comunicar con master(procUnit)
70 """
70 """
71
71
72 self.operations.append(
72 self.operations.append(
73 (operation, conf.type, conf.id, conf.getKwargs()))
73 (operation, conf.type, conf.id, conf.getKwargs()))
74
74
75 if 'plot' in self.name.lower():
75 if 'plot' in self.name.lower():
76 self.plots.append(operation.CODE)
76 self.plots.append(operation.CODE)
77
77
78 def getOperationObj(self, objId):
78 def getOperationObj(self, objId):
79
79
80 if objId not in list(self.operations.keys()):
80 if objId not in list(self.operations.keys()):
81 return None
81 return None
82
82
83 return self.operations[objId]
83 return self.operations[objId]
84
84
85 def operation(self, **kwargs):
85 def operation(self, **kwargs):
86 """
86 """
87 Operacion directa sobre la data (dataOut.data). Es necesario actualizar los valores de los
87 Operacion directa sobre la data (dataOut.data). Es necesario actualizar los valores de los
88 atributos del objeto dataOut
88 atributos del objeto dataOut
89
89
90 Input:
90 Input:
91
91
92 **kwargs : Diccionario de argumentos de la funcion a ejecutar
92 **kwargs : Diccionario de argumentos de la funcion a ejecutar
93 """
93 """
94
94
95 raise NotImplementedError
95 raise NotImplementedError
96
96
97 def setup(self):
97 def setup(self):
98
98
99 raise NotImplementedError
99 raise NotImplementedError
100
100
101 def run(self):
101 def run(self):
102
102
103 raise NotImplementedError
103 raise NotImplementedError
104
104
105 def close(self):
105 def close(self):
106
106
107 return
107 return
108
108
109
109
110 class Operation(object):
110 class Operation(object):
111
111
112 """
112 """
113 Update - Jan 2018 - MULTIPROCESSING
113 Update - Jan 2018 - MULTIPROCESSING
114
114
115 Most of the methods remained the same. The decorator parse the arguments and executed the run() method for each process.
115 Most of the methods remained the same. The decorator parse the arguments and executed the run() method for each process.
116 The constructor doe snot receive any argument, neither the baseclass.
116 The constructor doe snot receive any argument, neither the baseclass.
117
117
118
118
119 Clase base para definir las operaciones adicionales que se pueden agregar a la clase ProcessingUnit
119 Clase base para definir las operaciones adicionales que se pueden agregar a la clase ProcessingUnit
120 y necesiten acumular informacion previa de los datos a procesar. De preferencia usar un buffer de
120 y necesiten acumular informacion previa de los datos a procesar. De preferencia usar un buffer de
121 acumulacion dentro de esta clase
121 acumulacion dentro de esta clase
122
122
123 Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer)
123 Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer)
124
124
125 """
125 """
126
126
127 def __init__(self):
127 def __init__(self):
128
128
129 self.id = None
129 self.id = None
130 self.isConfig = False
130 self.isConfig = False
131
131
132 if not hasattr(self, 'name'):
132 if not hasattr(self, 'name'):
133 self.name = self.__class__.__name__
133 self.name = self.__class__.__name__
134
134
135 def getAllowedArgs(self):
135 def getAllowedArgs(self):
136 if hasattr(self, '__attrs__'):
136 if hasattr(self, '__attrs__'):
137 return self.__attrs__
137 return self.__attrs__
138 else:
138 else:
139 return inspect.getargspec(self.run).args
139 return inspect.getargspec(self.run).args
140
140
141 def setup(self):
141 def setup(self):
142
142
143 self.isConfig = True
143 self.isConfig = True
144
144
145 raise NotImplementedError
145 raise NotImplementedError
146
146
147 def run(self, dataIn, **kwargs):
147 def run(self, dataIn, **kwargs):
148 """
148 """
149 Realiza las operaciones necesarias sobre la dataIn.data y actualiza los
149 Realiza las operaciones necesarias sobre la dataIn.data y actualiza los
150 atributos del objeto dataIn.
150 atributos del objeto dataIn.
151
151
152 Input:
152 Input:
153
153
154 dataIn : objeto del tipo JROData
154 dataIn : objeto del tipo JROData
155
155
156 Return:
156 Return:
157
157
158 None
158 None
159
159
160 Affected:
160 Affected:
161 __buffer : buffer de recepcion de datos.
161 __buffer : buffer de recepcion de datos.
162
162
163 """
163 """
164 if not self.isConfig:
164 if not self.isConfig:
165 self.setup(**kwargs)
165 self.setup(**kwargs)
166
166
167 raise NotImplementedError
167 raise NotImplementedError
168
168
169 def close(self):
169 def close(self):
170
170
171 return
171 return
172
172
173
173
174 def MPDecorator(BaseClass):
174 def MPDecorator(BaseClass):
175 """
175 """
176 Multiprocessing class decorator
176 Multiprocessing class decorator
177
177
178 This function add multiprocessing features to a BaseClass. Also, it handle
178 This function add multiprocessing features to a BaseClass. Also, it handle
179 the communication beetween processes (readers, procUnits and operations).
179 the communication beetween processes (readers, procUnits and operations).
180 """
180 """
181
181
182 class MPClass(BaseClass, Process):
182 class MPClass(BaseClass, Process):
183
183
184 def __init__(self, *args, **kwargs):
184 def __init__(self, *args, **kwargs):
185 super(MPClass, self).__init__()
185 super(MPClass, self).__init__()
186 Process.__init__(self)
186 Process.__init__(self)
187 self.operationKwargs = {}
187 self.operationKwargs = {}
188 self.args = args
188 self.args = args
189 self.kwargs = kwargs
189 self.kwargs = kwargs
190 self.sender = None
190 self.sender = None
191 self.receiver = None
191 self.receiver = None
192 self.name = BaseClass.__name__
192 self.name = BaseClass.__name__
193 if 'plot' in self.name.lower():
193 if 'plot' in self.name.lower():
194 if not self.name.endswith('_'):
194 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
195 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
195 self.start_time = time.time()
196 self.start_time = time.time()
196
197
197 if len(self.args) is 3:
198 if len(self.args) is 3:
198 self.typeProc = "ProcUnit"
199 self.typeProc = "ProcUnit"
199 self.id = args[0]
200 self.id = args[0]
200 self.inputId = args[1]
201 self.inputId = args[1]
201 self.project_id = args[2]
202 self.project_id = args[2]
202 elif len(self.args) is 2:
203 elif len(self.args) is 2:
203 self.id = args[0]
204 self.id = args[0]
204 self.inputId = args[0]
205 self.inputId = args[0]
205 self.project_id = args[1]
206 self.project_id = args[1]
206 self.typeProc = "Operation"
207 self.typeProc = "Operation"
207
208
208 def subscribe(self):
209 def subscribe(self):
209 '''
210 '''
210 This function create a socket to receive objects from the
211 This function create a socket to receive objects from the
211 topic `inputId`.
212 topic `inputId`.
212 '''
213 '''
213
214
214 c = zmq.Context()
215 c = zmq.Context()
215 self.receiver = c.socket(zmq.SUB)
216 self.receiver = c.socket(zmq.SUB)
216 self.receiver.connect(
217 self.receiver.connect(
217 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
218 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
218 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
219 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
219
220
220 def listen(self):
221 def listen(self):
221 '''
222 '''
222 This function waits for objects and deserialize using pickle
223 This function waits for objects and deserialize using pickle
223 '''
224 '''
224
225
225 data = pickle.loads(self.receiver.recv_multipart()[1])
226 data = pickle.loads(self.receiver.recv_multipart()[1])
226
227
227 return data
228 return data
228
229
229 def set_publisher(self):
230 def set_publisher(self):
230 '''
231 '''
231 This function create a socket for publishing purposes.
232 This function create a socket for publishing purposes.
232 '''
233 '''
233
234
234 time.sleep(1)
235 time.sleep(1)
235 c = zmq.Context()
236 c = zmq.Context()
236 self.sender = c.socket(zmq.PUB)
237 self.sender = c.socket(zmq.PUB)
237 self.sender.connect(
238 self.sender.connect(
238 'ipc:///tmp/schain/{}_sub'.format(self.project_id))
239 'ipc:///tmp/schain/{}_sub'.format(self.project_id))
239
240
240 def publish(self, data, id):
241 def publish(self, data, id):
241 '''
242 '''
242 This function publish an object, to a specific topic.
243 This function publish an object, to a specific topic.
243 '''
244 '''
244 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
245 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
245
246
246 def runReader(self):
247 def runReader(self):
247 '''
248 '''
248 Run fuction for read units
249 Run fuction for read units
249 '''
250 '''
250 while True:
251 while True:
251
252
252 BaseClass.run(self, **self.kwargs)
253 BaseClass.run(self, **self.kwargs)
253
254
254 for op, optype, opId, kwargs in self.operations:
255 for op, optype, opId, kwargs in self.operations:
255 if optype == 'self':
256 if optype == 'self' and not self.dataOut.flagNoData:
256 op(**kwargs)
257 op(**kwargs)
257 elif optype == 'other':
258 elif optype == 'other' and not self.dataOut.flagNoData:
258 self.dataOut = op.run(self.dataOut, **self.kwargs)
259 self.dataOut = op.run(self.dataOut, **self.kwargs)
259 elif optype == 'external':
260 elif optype == 'external':
260 self.publish(self.dataOut, opId)
261 self.publish(self.dataOut, opId)
261
262
262 if self.dataOut.flagNoData and not self.dataOut.error:
263 if self.dataOut.flagNoData and not self.dataOut.error:
263 continue
264 continue
264
265
265 self.publish(self.dataOut, self.id)
266 self.publish(self.dataOut, self.id)
266
267
267 if self.dataOut.error:
268 if self.dataOut.error:
268 log.error(self.dataOut.error, self.name)
269 log.error(self.dataOut.error, self.name)
269 # self.sender.send_multipart([str(self.project_id).encode(), 'end'.encode()])
270 # self.sender.send_multipart([str(self.project_id).encode(), 'end'.encode()])
270 break
271 break
271
272
272 time.sleep(1)
273 time.sleep(1)
273
274
274 def runProc(self):
275 def runProc(self):
275 '''
276 '''
276 Run function for proccessing units
277 Run function for proccessing units
277 '''
278 '''
278
279
279 while True:
280 while True:
280 self.dataIn = self.listen()
281 self.dataIn = self.listen()
281
282
282 if self.dataIn.flagNoData and self.dataIn.error is None:
283 if self.dataIn.flagNoData and self.dataIn.error is None:
283 continue
284 continue
284
285
285 BaseClass.run(self, **self.kwargs)
286 BaseClass.run(self, **self.kwargs)
286
287
287 if self.dataIn.error:
288 if self.dataIn.error:
288 self.dataOut.error = self.dataIn.error
289 self.dataOut.error = self.dataIn.error
289 self.dataOut.flagNoData = True
290 self.dataOut.flagNoData = True
290
291
291 for op, optype, opId, kwargs in self.operations:
292 for op, optype, opId, kwargs in self.operations:
292 if optype == 'self':
293 if optype == 'self' and not self.dataOut.flagNoData:
293 op(**kwargs)
294 op(**kwargs)
294 elif optype == 'other':
295 elif optype == 'other' and not self.dataOut.flagNoData:
295 self.dataOut = op.run(self.dataOut, **kwargs)
296 self.dataOut = op.run(self.dataOut, **kwargs)
296 elif optype == 'external':
297 elif optype == 'external':
297 if not self.dataOut.flagNoData or self.dataOut.error:
298 if not self.dataOut.flagNoData or self.dataOut.error:
298 self.publish(self.dataOut, opId)
299 self.publish(self.dataOut, opId)
299
300
301 if not self.dataOut.flagNoData or self.dataOut.error:
300 self.publish(self.dataOut, self.id)
302 self.publish(self.dataOut, self.id)
303
301 if self.dataIn.error:
304 if self.dataIn.error:
302 break
305 break
303
306
304 time.sleep(1)
307 time.sleep(1)
305
308
306 def runOp(self):
309 def runOp(self):
307 '''
310 '''
308 Run function for external operations (this operations just receive data
311 Run function for external operations (this operations just receive data
309 ex: plots, writers, publishers)
312 ex: plots, writers, publishers)
310 '''
313 '''
311
314
312 while True:
315 while True:
313
316
314 dataOut = self.listen()
317 dataOut = self.listen()
315
318
316 BaseClass.run(self, dataOut, **self.kwargs)
319 BaseClass.run(self, dataOut, **self.kwargs)
317
320
318 if dataOut.error:
321 if dataOut.error:
319 break
322 break
320
323
321 time.sleep(1)
324 time.sleep(1)
322
325
323 def run(self):
326 def run(self):
324 if self.typeProc is "ProcUnit":
327 if self.typeProc is "ProcUnit":
325
328
326 if self.inputId is not None:
329 if self.inputId is not None:
327
330
328 self.subscribe()
331 self.subscribe()
329
332
330 self.set_publisher()
333 self.set_publisher()
331
334
332 if 'Reader' not in BaseClass.__name__:
335 if 'Reader' not in BaseClass.__name__:
333 self.runProc()
336 self.runProc()
334 else:
337 else:
335 self.runReader()
338 self.runReader()
336
339
337 elif self.typeProc is "Operation":
340 elif self.typeProc is "Operation":
338
341
339 self.subscribe()
342 self.subscribe()
340 self.runOp()
343 self.runOp()
341
344
342 else:
345 else:
343 raise ValueError("Unknown type")
346 raise ValueError("Unknown type")
344
347
345 self.close()
348 self.close()
346
349
347 def event_monitor(self, monitor):
350 def event_monitor(self, monitor):
348
351
349 events = {}
352 events = {}
350
353
351 for name in dir(zmq):
354 for name in dir(zmq):
352 if name.startswith('EVENT_'):
355 if name.startswith('EVENT_'):
353 value = getattr(zmq, name)
356 value = getattr(zmq, name)
354 events[value] = name
357 events[value] = name
355
358
356 while monitor.poll():
359 while monitor.poll():
357 evt = recv_monitor_message(monitor)
360 evt = recv_monitor_message(monitor)
358 if evt['event'] == 32:
361 if evt['event'] == 32:
359 self.connections += 1
362 self.connections += 1
360 if evt['event'] == 512:
363 if evt['event'] == 512:
361 pass
364 pass
362
365
363 evt.update({'description': events[evt['event']]})
366 evt.update({'description': events[evt['event']]})
364
367
365 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
368 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
366 break
369 break
367 monitor.close()
370 monitor.close()
368 print('event monitor thread done!')
371 print('event monitor thread done!')
369
372
370 def close(self):
373 def close(self):
371
374
372 BaseClass.close(self)
375 BaseClass.close(self)
373
376
374 if self.sender:
377 if self.sender:
375 self.sender.close()
378 self.sender.close()
376
379
377 if self.receiver:
380 if self.receiver:
378 self.receiver.close()
381 self.receiver.close()
379
382
380 log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)
383 log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)
381
384
382 return MPClass
385 return MPClass
General Comments 0
You need to be logged in to leave comments. Login now