##// END OF EJS Templates
Add input queue again :(
jespinoza -
r1250:3296460acfb3
parent child
Show More
@@ -1,385 +1,430
1 '''
1 '''
2 Updated for multiprocessing
2 Updated for multiprocessing
3 Author : Sergio Cortez
3 Author : Sergio Cortez
4 Jan 2018
4 Jan 2018
5 Abstract:
5 Abstract:
6 Base class for processing units and operations. A decorator provides multiprocessing features and interconnect the processes created.
6 Base class for processing units and operations. A decorator provides multiprocessing features and interconnect the processes created.
7 The argument (kwargs) sent from the controller is parsed and filtered via the decorator for each processing unit or operation instantiated.
7 The argument (kwargs) sent from the controller is parsed and filtered via the decorator for each processing unit or operation instantiated.
8 The decorator handle also the methods inside the processing unit to be called from the main script (not as operations) (OPERATION -> type ='self').
8 The decorator handle also the methods inside the processing unit to be called from the main script (not as operations) (OPERATION -> type ='self').
9
9
10 Based on:
10 Based on:
11 $Author: murco $
11 $Author: murco $
12 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
12 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
13 '''
13 '''
14
14
15 import os
15 import os
16 import inspect
16 import inspect
17 import zmq
17 import zmq
18 import time
18 import time
19 import pickle
19 import pickle
20 import traceback
20 import traceback
21 from queue import Queue
21 from queue import Queue
22 from threading import Thread
22 from threading import Thread
23 from multiprocessing import Process
23 from multiprocessing import Process
24
24
25 from schainpy.utils import log
25 from schainpy.utils import log
26
26
27
27
28 class ProcessingUnit(object):
28 class ProcessingUnit(object):
29
29
30 """
30 """
31 Update - Jan 2018 - MULTIPROCESSING
31 Update - Jan 2018 - MULTIPROCESSING
32 All the "call" methods present in the previous base were removed.
32 All the "call" methods present in the previous base were removed.
33 The majority of operations are independant processes, thus
33 The majority of operations are independant processes, thus
34 the decorator is in charge of communicate the operation processes
34 the decorator is in charge of communicate the operation processes
35 with the proccessing unit via IPC.
35 with the proccessing unit via IPC.
36
36
37 The constructor does not receive any argument. The remaining methods
37 The constructor does not receive any argument. The remaining methods
38 are related with the operations to execute.
38 are related with the operations to execute.
39
39
40
40
41 """
41 """
42
42
43 def __init__(self):
43 def __init__(self):
44
44
45 self.dataIn = None
45 self.dataIn = None
46 self.dataOut = None
46 self.dataOut = None
47 self.isConfig = False
47 self.isConfig = False
48 self.operations = []
48 self.operations = []
49 self.plots = []
49 self.plots = []
50
50
51 def getAllowedArgs(self):
51 def getAllowedArgs(self):
52 if hasattr(self, '__attrs__'):
52 if hasattr(self, '__attrs__'):
53 return self.__attrs__
53 return self.__attrs__
54 else:
54 else:
55 return inspect.getargspec(self.run).args
55 return inspect.getargspec(self.run).args
56
56
57 def addOperation(self, conf, operation):
57 def addOperation(self, conf, operation):
58 """
58 """
59 This method is used in the controller, and update the dictionary containing the operations to execute. The dict
59 This method is used in the controller, and update the dictionary containing the operations to execute. The dict
60 posses the id of the operation process (IPC purposes)
60 posses the id of the operation process (IPC purposes)
61
61
62 Agrega un objeto del tipo "Operation" (opObj) a la lista de objetos "self.objectList" y retorna el
62 Agrega un objeto del tipo "Operation" (opObj) a la lista de objetos "self.objectList" y retorna el
63 identificador asociado a este objeto.
63 identificador asociado a este objeto.
64
64
65 Input:
65 Input:
66
66
67 object : objeto de la clase "Operation"
67 object : objeto de la clase "Operation"
68
68
69 Return:
69 Return:
70
70
71 objId : identificador del objeto, necesario para comunicar con master(procUnit)
71 objId : identificador del objeto, necesario para comunicar con master(procUnit)
72 """
72 """
73
73
74 self.operations.append(
74 self.operations.append(
75 (operation, conf.type, conf.id, conf.getKwargs()))
75 (operation, conf.type, conf.id, conf.getKwargs()))
76
76
77 if 'plot' in self.name.lower():
77 if 'plot' in self.name.lower():
78 self.plots.append(operation.CODE)
78 self.plots.append(operation.CODE)
79
79
80 def getOperationObj(self, objId):
80 def getOperationObj(self, objId):
81
81
82 if objId not in list(self.operations.keys()):
82 if objId not in list(self.operations.keys()):
83 return None
83 return None
84
84
85 return self.operations[objId]
85 return self.operations[objId]
86
86
87 def operation(self, **kwargs):
87 def operation(self, **kwargs):
88 """
88 """
89 Operacion directa sobre la data (dataOut.data). Es necesario actualizar los valores de los
89 Operacion directa sobre la data (dataOut.data). Es necesario actualizar los valores de los
90 atributos del objeto dataOut
90 atributos del objeto dataOut
91
91
92 Input:
92 Input:
93
93
94 **kwargs : Diccionario de argumentos de la funcion a ejecutar
94 **kwargs : Diccionario de argumentos de la funcion a ejecutar
95 """
95 """
96
96
97 raise NotImplementedError
97 raise NotImplementedError
98
98
99 def setup(self):
99 def setup(self):
100
100
101 raise NotImplementedError
101 raise NotImplementedError
102
102
103 def run(self):
103 def run(self):
104
104
105 raise NotImplementedError
105 raise NotImplementedError
106
106
107 def close(self):
107 def close(self):
108
108
109 return
109 return
110
110
111
111
112 class Operation(object):
112 class Operation(object):
113
113
114 """
114 """
115 Update - Jan 2018 - MULTIPROCESSING
115 Update - Jan 2018 - MULTIPROCESSING
116
116
117 Most of the methods remained the same. The decorator parse the arguments and executed the run() method for each process.
117 Most of the methods remained the same. The decorator parse the arguments and executed the run() method for each process.
118 The constructor doe snot receive any argument, neither the baseclass.
118 The constructor doe snot receive any argument, neither the baseclass.
119
119
120
120
121 Clase base para definir las operaciones adicionales que se pueden agregar a la clase ProcessingUnit
121 Clase base para definir las operaciones adicionales que se pueden agregar a la clase ProcessingUnit
122 y necesiten acumular informacion previa de los datos a procesar. De preferencia usar un buffer de
122 y necesiten acumular informacion previa de los datos a procesar. De preferencia usar un buffer de
123 acumulacion dentro de esta clase
123 acumulacion dentro de esta clase
124
124
125 Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer)
125 Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer)
126
126
127 """
127 """
128
128
129 def __init__(self):
129 def __init__(self):
130
130
131 self.id = None
131 self.id = None
132 self.isConfig = False
132 self.isConfig = False
133
133
134 if not hasattr(self, 'name'):
134 if not hasattr(self, 'name'):
135 self.name = self.__class__.__name__
135 self.name = self.__class__.__name__
136
136
137 def getAllowedArgs(self):
137 def getAllowedArgs(self):
138 if hasattr(self, '__attrs__'):
138 if hasattr(self, '__attrs__'):
139 return self.__attrs__
139 return self.__attrs__
140 else:
140 else:
141 return inspect.getargspec(self.run).args
141 return inspect.getargspec(self.run).args
142
142
143 def setup(self):
143 def setup(self):
144
144
145 self.isConfig = True
145 self.isConfig = True
146
146
147 raise NotImplementedError
147 raise NotImplementedError
148
148
149 def run(self, dataIn, **kwargs):
149 def run(self, dataIn, **kwargs):
150 """
150 """
151 Realiza las operaciones necesarias sobre la dataIn.data y actualiza los
151 Realiza las operaciones necesarias sobre la dataIn.data y actualiza los
152 atributos del objeto dataIn.
152 atributos del objeto dataIn.
153
153
154 Input:
154 Input:
155
155
156 dataIn : objeto del tipo JROData
156 dataIn : objeto del tipo JROData
157
157
158 Return:
158 Return:
159
159
160 None
160 None
161
161
162 Affected:
162 Affected:
163 __buffer : buffer de recepcion de datos.
163 __buffer : buffer de recepcion de datos.
164
164
165 """
165 """
166 if not self.isConfig:
166 if not self.isConfig:
167 self.setup(**kwargs)
167 self.setup(**kwargs)
168
168
169 raise NotImplementedError
169 raise NotImplementedError
170
170
171 def close(self):
171 def close(self):
172
172
173 return
173 return
174
174
175 class InputQueue(Thread):
176
177 '''
178
179 Class to hold input data for Proccessing Units and external Operations,
180
181 '''
182
183
184
185 def __init__(self, project_id, inputId):
186
187 Thread.__init__(self)
188
189 self.queue = Queue()
190
191 self.project_id = project_id
192
193 self.inputId = inputId
194
195
196
197 def run(self):
198
199
200
201 c = zmq.Context()
202
203 self.receiver = c.socket(zmq.SUB)
204
205 self.receiver.connect(
206
207 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
208
209 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
210
211
212
213 while True:
214
215 self.queue.put(self.receiver.recv_multipart()[1])
216
217
218
219 def get(self):
220
221
222
223 return pickle.loads(self.queue.get())
224
225
175
226
176 def MPDecorator(BaseClass):
227 def MPDecorator(BaseClass):
177 """
228 """
178 Multiprocessing class decorator
229 Multiprocessing class decorator
179
230
180 This function add multiprocessing features to a BaseClass. Also, it handle
231 This function add multiprocessing features to a BaseClass. Also, it handle
181 the communication beetween processes (readers, procUnits and operations).
232 the communication beetween processes (readers, procUnits and operations).
182 """
233 """
183
234
184 class MPClass(BaseClass, Process):
235 class MPClass(BaseClass, Process):
185
236
186 def __init__(self, *args, **kwargs):
237 def __init__(self, *args, **kwargs):
187 super(MPClass, self).__init__()
238 super(MPClass, self).__init__()
188 Process.__init__(self)
239 Process.__init__(self)
189 self.operationKwargs = {}
240 self.operationKwargs = {}
190 self.args = args
241 self.args = args
191 self.kwargs = kwargs
242 self.kwargs = kwargs
192 self.sender = None
243 self.sender = None
193 self.receiver = None
244 self.receiver = None
194 self.i = 0
245 self.i = 0
195 self.t = time.time()
246 self.t = time.time()
196 self.name = BaseClass.__name__
247 self.name = BaseClass.__name__
197
248
198 if 'plot' in self.name.lower() and not self.name.endswith('_'):
249 if 'plot' in self.name.lower() and not self.name.endswith('_'):
199 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
250 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
200
251
201 self.start_time = time.time()
252 self.start_time = time.time()
202 self.id = args[0]
253 self.id = args[0]
203 self.inputId = args[1]
254 self.inputId = args[1]
204 self.project_id = args[2]
255 self.project_id = args[2]
205 self.err_queue = args[3]
256 self.err_queue = args[3]
206 self.typeProc = args[4]
257 self.typeProc = args[4]
207 self.err_queue.put('#_start_#')
258 self.err_queue.put('#_start_#')
259 self.queue = InputQueue(self.project_id, self.inputId)
208
260
209 def subscribe(self):
261 def subscribe(self):
210 '''
262 '''
211 Start the zmq socket receiver and subcribe to input ID.
263 Start the zmq socket receiver and subcribe to input ID.
212 '''
264 '''
213
265
214 c = zmq.Context()
266 self.queue.start()
215 self.receiver = c.socket(zmq.SUB)
216 self.receiver.connect(
217 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
218 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
219
220
267
221 def listen(self):
268 def listen(self):
222 '''
269 '''
223 This function waits for objects
270 This function waits for objects
224 '''
271 '''
225
272
226 data = pickle.loads(self.receiver.recv_multipart()[1])
273 return self.queue.get()
227
228 return data
229
274
230 def set_publisher(self):
275 def set_publisher(self):
231 '''
276 '''
232 This function create a zmq socket for publishing objects.
277 This function create a zmq socket for publishing objects.
233 '''
278 '''
234
279
235 time.sleep(0.5)
280 time.sleep(0.5)
236
281
237 c = zmq.Context()
282 c = zmq.Context()
238 self.sender = c.socket(zmq.PUB)
283 self.sender = c.socket(zmq.PUB)
239 self.sender.connect(
284 self.sender.connect(
240 'ipc:///tmp/schain/{}_sub'.format(self.project_id))
285 'ipc:///tmp/schain/{}_sub'.format(self.project_id))
241
286
242 def publish(self, data, id):
287 def publish(self, data, id):
243 '''
288 '''
244 This function publish an object, to an specific topic.
289 This function publish an object, to an specific topic.
245 For Read Units (inputId == None) adds a little delay
290 For Read Units (inputId == None) adds a little delay
246 to avoid data loss
291 to avoid data loss
247 '''
292 '''
248
293
249 if self.inputId is None:
294 if self.inputId is None:
250 self.i += 1
295 self.i += 1
251 if self.i % 40 == 0 and time.time()-self.t > 0.1:
296 if self.i % 40 == 0 and time.time()-self.t > 0.1:
252 self.i = 0
297 self.i = 0
253 self.t = time.time()
298 self.t = time.time()
254 time.sleep(0.05)
299 time.sleep(0.05)
255 elif self.i % 40 == 0:
300 elif self.i % 40 == 0:
256 self.i = 0
301 self.i = 0
257 self.t = time.time()
302 self.t = time.time()
258 time.sleep(0.01)
303 time.sleep(0.01)
259
304
260 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
305 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
261
306
262 def runReader(self):
307 def runReader(self):
263 '''
308 '''
264 Run fuction for read units
309 Run fuction for read units
265 '''
310 '''
266 while True:
311 while True:
267
312
268 try:
313 try:
269 BaseClass.run(self, **self.kwargs)
314 BaseClass.run(self, **self.kwargs)
270 except:
315 except:
271 err = traceback.format_exc()
316 err = traceback.format_exc()
272 if 'No more files' in err:
317 if 'No more files' in err:
273 log.warning('No more files to read', self.name)
318 log.warning('No more files to read', self.name)
274 else:
319 else:
275 self.err_queue.put('{}|{}'.format(self.name, err))
320 self.err_queue.put('{}|{}'.format(self.name, err))
276 self.dataOut.error = True
321 self.dataOut.error = True
277
322
278 for op, optype, opId, kwargs in self.operations:
323 for op, optype, opId, kwargs in self.operations:
279 if optype == 'self' and not self.dataOut.flagNoData:
324 if optype == 'self' and not self.dataOut.flagNoData:
280 op(**kwargs)
325 op(**kwargs)
281 elif optype == 'other' and not self.dataOut.flagNoData:
326 elif optype == 'other' and not self.dataOut.flagNoData:
282 self.dataOut = op.run(self.dataOut, **self.kwargs)
327 self.dataOut = op.run(self.dataOut, **self.kwargs)
283 elif optype == 'external':
328 elif optype == 'external':
284 self.publish(self.dataOut, opId)
329 self.publish(self.dataOut, opId)
285
330
286 if self.dataOut.flagNoData and not self.dataOut.error:
331 if self.dataOut.flagNoData and not self.dataOut.error:
287 continue
332 continue
288
333
289 self.publish(self.dataOut, self.id)
334 self.publish(self.dataOut, self.id)
290
335
291 if self.dataOut.error:
336 if self.dataOut.error:
292 break
337 break
293
338
294 time.sleep(0.5)
339 time.sleep(0.5)
295
340
296 def runProc(self):
341 def runProc(self):
297 '''
342 '''
298 Run function for proccessing units
343 Run function for proccessing units
299 '''
344 '''
300
345
301 while True:
346 while True:
302 self.dataIn = self.listen()
347 self.dataIn = self.listen()
303
348
304 if self.dataIn.flagNoData and self.dataIn.error is None:
349 if self.dataIn.flagNoData and self.dataIn.error is None:
305 continue
350 continue
306 elif not self.dataIn.error:
351 elif not self.dataIn.error:
307 try:
352 try:
308 BaseClass.run(self, **self.kwargs)
353 BaseClass.run(self, **self.kwargs)
309 except:
354 except:
310 self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc()))
355 self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc()))
311 self.dataOut.error = True
356 self.dataOut.error = True
312 elif self.dataIn.error:
357 elif self.dataIn.error:
313 self.dataOut.error = self.dataIn.error
358 self.dataOut.error = self.dataIn.error
314 self.dataOut.flagNoData = True
359 self.dataOut.flagNoData = True
315
360
316 for op, optype, opId, kwargs in self.operations:
361 for op, optype, opId, kwargs in self.operations:
317 if optype == 'self' and not self.dataOut.flagNoData:
362 if optype == 'self' and not self.dataOut.flagNoData:
318 op(**kwargs)
363 op(**kwargs)
319 elif optype == 'other' and not self.dataOut.flagNoData:
364 elif optype == 'other' and not self.dataOut.flagNoData:
320 self.dataOut = op.run(self.dataOut, **kwargs)
365 self.dataOut = op.run(self.dataOut, **kwargs)
321 elif optype == 'external' and not self.dataOut.flagNoData:
366 elif optype == 'external' and not self.dataOut.flagNoData:
322 self.publish(self.dataOut, opId)
367 self.publish(self.dataOut, opId)
323
368
324 self.publish(self.dataOut, self.id)
369 self.publish(self.dataOut, self.id)
325 for op, optype, opId, kwargs in self.operations:
370 for op, optype, opId, kwargs in self.operations:
326 if optype == 'external' and self.dataOut.error:
371 if optype == 'external' and self.dataOut.error:
327 self.publish(self.dataOut, opId)
372 self.publish(self.dataOut, opId)
328
373
329 if self.dataOut.error:
374 if self.dataOut.error:
330 break
375 break
331
376
332 time.sleep(0.5)
377 time.sleep(0.5)
333
378
334 def runOp(self):
379 def runOp(self):
335 '''
380 '''
336 Run function for external operations (this operations just receive data
381 Run function for external operations (this operations just receive data
337 ex: plots, writers, publishers)
382 ex: plots, writers, publishers)
338 '''
383 '''
339
384
340 while True:
385 while True:
341
386
342 dataOut = self.listen()
387 dataOut = self.listen()
343
388
344 if not dataOut.error:
389 if not dataOut.error:
345 BaseClass.run(self, dataOut, **self.kwargs)
390 BaseClass.run(self, dataOut, **self.kwargs)
346 else:
391 else:
347 break
392 break
348
393
349 def run(self):
394 def run(self):
350 if self.typeProc is "ProcUnit":
395 if self.typeProc is "ProcUnit":
351
396
352 if self.inputId is not None:
397 if self.inputId is not None:
353 self.subscribe()
398 self.subscribe()
354
399
355 self.set_publisher()
400 self.set_publisher()
356
401
357 if 'Reader' not in BaseClass.__name__:
402 if 'Reader' not in BaseClass.__name__:
358 self.runProc()
403 self.runProc()
359 else:
404 else:
360 self.runReader()
405 self.runReader()
361
406
362 elif self.typeProc is "Operation":
407 elif self.typeProc is "Operation":
363
408
364 self.subscribe()
409 self.subscribe()
365 self.runOp()
410 self.runOp()
366
411
367 else:
412 else:
368 raise ValueError("Unknown type")
413 raise ValueError("Unknown type")
369
414
370 self.close()
415 self.close()
371
416
372 def close(self):
417 def close(self):
373
418
374 BaseClass.close(self)
419 BaseClass.close(self)
375 self.err_queue.put('#_end_#')
420 self.err_queue.put('#_end_#')
376
421
377 if self.sender:
422 if self.sender:
378 self.sender.close()
423 self.sender.close()
379
424
380 if self.receiver:
425 if self.receiver:
381 self.receiver.close()
426 self.receiver.close()
382
427
383 log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)
428 log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)
384
429
385 return MPClass
430 return MPClass
General Comments 0
You need to be logged in to leave comments. Login now