##// END OF EJS Templates
Use delays instead of an input queue to keep dataOut objects and avoid losing them
jespinoza -
r1245:1ee18bfa3eb6
parent child
Show More
@@ -1,399 +1,385
'''
Updated for multiprocessing
Author : Sergio Cortez
Jan 2018
Abstract:
    Base class for processing units and operations. A decorator provides multiprocessing features and interconnects the processes created.
    The arguments (kwargs) sent from the controller are parsed and filtered via the decorator for each processing unit or operation instantiated.
    The decorator also handles the methods inside the processing unit that are called from the main script (not as operations) (OPERATION -> type ='self').

Based on:
    $Author: murco $
    $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
'''
14
14
15 import os
15 import os
16 import inspect
16 import inspect
17 import zmq
17 import zmq
18 import time
18 import time
19 import pickle
19 import pickle
20 import traceback
20 import traceback
21 from queue import Queue
21 from queue import Queue
22 from threading import Thread
22 from threading import Thread
23 from multiprocessing import Process
23 from multiprocessing import Process
24
24
25 from schainpy.utils import log
25 from schainpy.utils import log
26
26
27
27
class ProcessingUnit(object):
    """
    Base class for processing units.

    Update - Jan 2018 - MULTIPROCESSING
    All the "call" methods present in the previous base were removed.
    The majority of operations are independent processes, thus
    the decorator is in charge of communicating the operation processes
    with the processing unit via IPC.

    The constructor does not receive any argument. The remaining methods
    are related to the operations to execute.
    """

    def __init__(self):

        self.dataIn = None
        self.dataOut = None
        self.isConfig = False
        # List of (operation, type, id, kwargs) tuples, in registration order.
        self.operations = []
        # CODE attribute of every operation registered on a "plot" unit.
        self.plots = []

    def getAllowedArgs(self):
        """
        Return the argument names accepted by this unit.

        If the instance exposes an ``__attrs__`` attribute it is returned
        as is; otherwise the positional argument names of ``self.run`` are
        returned (for a bound method this includes ``'self'``).
        """
        if hasattr(self, '__attrs__'):
            return self.__attrs__

        # getfullargspec replaces getargspec, which was removed in Python 3.11.
        return inspect.getfullargspec(self.run).args

    def addOperation(self, conf, operation):
        """
        Register an operation to be executed by this unit.

        This method is used by the controller. The operation is stored
        together with its type, its id (needed for IPC with the operation
        process) and its keyword arguments.

        Input:

            conf      : configuration object providing ``type``, ``id`` and
                        ``getKwargs()``
            operation : the operation object (or callable) to register

        Return:

            None
        """

        self.operations.append(
            (operation, conf.type, conf.id, conf.getKwargs()))

        # ``name`` is not set by this class' constructor, so fall back to
        # the class name when nothing else has assigned it.
        name = getattr(self, 'name', self.__class__.__name__)

        if 'plot' in name.lower():
            self.plots.append(operation.CODE)

    def getOperationObj(self, objId):
        """
        Return the operation registered with id ``objId``.

        ``self.operations`` is a list of tuples, so it is searched linearly.

        Return:

            The operation object, or None if no operation has that id.
        """

        for operation, _optype, opId, _kwargs in self.operations:
            if opId == objId:
                return operation

        return None

    def operation(self, **kwargs):
        """
        Direct operation over the data (dataOut.data). The attributes of
        the dataOut object must be updated by the implementation.

        Input:

            **kwargs : dictionary with the arguments of the function to run

        Raises:

            NotImplementedError : always, in this base class
        """

        raise NotImplementedError

    def setup(self):
        """Configure the unit. Not implemented in this base class."""

        raise NotImplementedError

    def run(self):
        """Execute the unit. Not implemented in this base class."""

        raise NotImplementedError

    def close(self):
        """Release the unit's resources. Nothing to do in this base class."""

        return
class Operation(object):
    """
    Base class for the additional operations that can be added to a
    ProcessingUnit and that need to accumulate previous information from
    the data being processed. Preferably use an accumulation buffer inside
    this class.

    Update - Jan 2018 - MULTIPROCESSING

    Most of the methods remained the same. The decorator parses the
    arguments and executes the run() method for each process.
    The constructor does not receive any argument, neither does the
    base class.

    Example: coherent integrations need the previous information of the
    last n profiles (buffer).
    """

    def __init__(self):

        self.id = None
        self.isConfig = False

        # Keep a name already provided (e.g. as a class attribute).
        if not hasattr(self, 'name'):
            self.name = self.__class__.__name__

    def getAllowedArgs(self):
        """
        Return the argument names accepted by this operation.

        If the instance exposes an ``__attrs__`` attribute it is returned
        as is; otherwise the positional argument names of ``self.run`` are
        returned (for a bound method this includes ``'self'``).
        """
        if hasattr(self, '__attrs__'):
            return self.__attrs__

        # getfullargspec replaces getargspec, which was removed in Python 3.11.
        return inspect.getfullargspec(self.run).args

    def setup(self, **kwargs):
        """
        Configure the operation. Not implemented in this base class.

        ``**kwargs`` is accepted because run() forwards its keyword
        arguments here; without it the call failed with TypeError instead
        of NotImplementedError. ``isConfig`` is left untouched because
        nothing was configured.

        Raises:

            NotImplementedError : always, in this base class
        """

        raise NotImplementedError

    def run(self, dataIn, **kwargs):
        """
        Perform the required operations over dataIn.data and update the
        attributes of the dataIn object.

        Input:

            dataIn : object of type JROData

        Return:

            None

        Affected:
            __buffer : data reception buffer.

        Raises:

            NotImplementedError : always, in this base class
        """
        if not self.isConfig:
            self.setup(**kwargs)

        raise NotImplementedError

    def close(self):
        """Release the operation's resources. Nothing to do here."""

        return
class InputQueue(Thread):
    '''
    Thread that holds input data for processing units and external
    operations.

    Once started, it subscribes to the project's publisher endpoint using
    ``inputId`` as topic and stores every received payload, still pickled,
    in an internal queue. Consumers retrieve the objects with get().
    '''

    def __init__(self, project_id, inputId):
        Thread.__init__(self)
        # run() never returns, so the thread must be a daemon; otherwise
        # the owning process cannot exit while it is blocked on the socket.
        self.daemon = True
        self.queue = Queue()
        self.project_id = project_id
        self.inputId = inputId
        # Created in run(); defined here so the attribute always exists.
        self.receiver = None

    def run(self):
        '''
        Connect the SUB socket and enqueue the payload (second frame) of
        every multipart message received. Loops forever.
        '''

        c = zmq.Context()
        self.receiver = c.socket(zmq.SUB)
        self.receiver.connect(
            'ipc:///tmp/schain/{}_pub'.format(self.project_id))
        self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())

        while True:
            self.queue.put(self.receiver.recv_multipart()[1])

    def get(self):
        '''
        Block until a payload is available and return the unpickled object.
        '''

        # NOTE(review): pickle.loads executes arbitrary code from the
        # payload; this is only safe if every publisher on the IPC
        # endpoint is trusted - confirm.
        return pickle.loads(self.queue.get())
def MPDecorator(BaseClass):
    """
    Multiprocessing class decorator

    This function adds multiprocessing features to a BaseClass. It also
    handles the communication between processes (readers, procUnits and
    operations).

    Input:

        BaseClass : class of the processing unit or operation to wrap

    Return:

        MPClass : subclass of BaseClass and multiprocessing.Process
    """

    class MPClass(BaseClass, Process):

        def __init__(self, *args, **kwargs):
            """
            Positional arguments (in order): id, inputId, project_id,
            err_queue, typeProc. Keyword arguments are stored and later
            forwarded to BaseClass.run().
            """
            super(MPClass, self).__init__()
            Process.__init__(self)
            self.operationKwargs = {}
            self.args = args
            self.kwargs = kwargs
            self.sender = None
            self.receiver = None
            # Counter and timestamp used by publish() to throttle readers.
            self.i = 0
            self.t = time.time()
            self.name = BaseClass.__name__

            if 'plot' in self.name.lower() and not self.name.endswith('_'):
                self.name = '{}{}'.format(self.CODE.upper(), 'Plot')

            self.start_time = time.time()
            self.id = args[0]
            self.inputId = args[1]
            self.project_id = args[2]
            self.err_queue = args[3]
            self.typeProc = args[4]
            self.err_queue.put('#_start_#')

        def subscribe(self):
            '''
            Start the zmq socket receiver and subscribe to the input ID.
            '''

            c = zmq.Context()
            self.receiver = c.socket(zmq.SUB)
            self.receiver.connect(
                'ipc:///tmp/schain/{}_pub'.format(self.project_id))
            self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())

        def listen(self):
            '''
            Wait for the next message and return the unpickled object
            carried in its second frame.
            '''

            # NOTE(review): pickle.loads executes arbitrary code from the
            # payload; only safe if every publisher is trusted - confirm.
            data = pickle.loads(self.receiver.recv_multipart()[1])

            return data

        def set_publisher(self):
            '''
            Create a zmq socket for publishing objects.
            '''

            time.sleep(0.5)

            c = zmq.Context()
            self.sender = c.socket(zmq.PUB)
            self.sender.connect(
                'ipc:///tmp/schain/{}_sub'.format(self.project_id))

        def publish(self, data, id):
            '''
            Publish an object to a specific topic.

            For read units (inputId is None) a small delay is added every
            40 objects to avoid data loss: 0.05 s when more than 0.1 s
            elapsed since the previous pause, 0.01 s otherwise.

            Input:

                data : object to send (it is pickled)
                id   : topic, converted with str() and encoded
            '''

            if self.inputId is None:
                self.i += 1
                if self.i % 40 == 0:
                    delay = 0.05 if time.time() - self.t > 0.1 else 0.01
                    self.i = 0
                    self.t = time.time()
                    time.sleep(delay)

            self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])

        def runReader(self):
            '''
            Run function for read units
            '''
            while True:

                try:
                    BaseClass.run(self, **self.kwargs)
                except BaseException:
                    # As broad as the original bare except: every failure
                    # is reported and flagged on dataOut, which ends the loop.
                    err = traceback.format_exc()
                    if 'No more files' in err:
                        log.warning('No more files to read', self.name)
                    else:
                        self.err_queue.put('{}|{}'.format(self.name, err))
                    self.dataOut.error = True

                for op, optype, opId, kwargs in self.operations:
                    if optype == 'self' and not self.dataOut.flagNoData:
                        op(**kwargs)
                    elif optype == 'other' and not self.dataOut.flagNoData:
                        # Use the operation's own kwargs (as runProc does),
                        # not the reader's.
                        self.dataOut = op.run(self.dataOut, **kwargs)
                    elif optype == 'external':
                        self.publish(self.dataOut, opId)

                if self.dataOut.flagNoData and not self.dataOut.error:
                    continue

                self.publish(self.dataOut, self.id)

                if self.dataOut.error:
                    break

            time.sleep(0.5)

        def runProc(self):
            '''
            Run function for processing units
            '''

            while True:
                self.dataIn = self.listen()

                if self.dataIn.flagNoData and self.dataIn.error is None:
                    continue
                elif not self.dataIn.error:
                    try:
                        BaseClass.run(self, **self.kwargs)
                    except BaseException:
                        # As broad as the original bare except.
                        self.err_queue.put(
                            '{}|{}'.format(self.name, traceback.format_exc()))
                        self.dataOut.error = True
                else:
                    # Forward the upstream error without processing.
                    self.dataOut.error = self.dataIn.error
                    self.dataOut.flagNoData = True

                for op, optype, opId, kwargs in self.operations:
                    if optype == 'self' and not self.dataOut.flagNoData:
                        op(**kwargs)
                    elif optype == 'other' and not self.dataOut.flagNoData:
                        self.dataOut = op.run(self.dataOut, **kwargs)
                    elif optype == 'external' and not self.dataOut.flagNoData:
                        self.publish(self.dataOut, opId)

                self.publish(self.dataOut, self.id)

                # External operations are skipped above when there is no
                # data, so the error is sent to them explicitly here.
                for op, optype, opId, kwargs in self.operations:
                    if optype == 'external' and self.dataOut.error:
                        self.publish(self.dataOut, opId)

                if self.dataOut.error:
                    break

            time.sleep(0.5)

        def runOp(self):
            '''
            Run function for external operations (these operations just
            receive data, ex: plots, writers, publishers)
            '''

            while True:

                dataOut = self.listen()

                if not dataOut.error:
                    BaseClass.run(self, dataOut, **self.kwargs)
                else:
                    break

        def run(self):
            '''
            Process entry point: dispatch on typeProc, then close.

            Raises:

                ValueError : if typeProc is neither "ProcUnit" nor
                             "Operation"
            '''
            # Compare by value: "is" only tests object identity and is
            # not guaranteed to hold for equal strings.
            if self.typeProc == "ProcUnit":

                if self.inputId is not None:
                    self.subscribe()

                self.set_publisher()

                if 'Reader' not in BaseClass.__name__:
                    self.runProc()
                else:
                    self.runReader()

            elif self.typeProc == "Operation":

                self.subscribe()
                self.runOp()

            else:
                raise ValueError("Unknown type")

            self.close()

        def close(self):
            '''
            Close the wrapped object and the sockets, and notify the end
            of this process through the error queue.
            '''

            BaseClass.close(self)
            self.err_queue.put('#_end_#')

            if self.sender:
                self.sender.close()

            if self.receiver:
                self.receiver.close()

            log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)

    return MPClass
General Comments 0
You need to be logged in to leave comments. Login now