##// END OF EJS Templates
Add input queue again :(
jespinoza -
r1250:3296460acfb3
parent child
Show More
@@ -1,385 +1,430
1 1 '''
2 2 Updated for multiprocessing
3 3 Author : Sergio Cortez
4 4 Jan 2018
5 5 Abstract:
6 6 Base class for processing units and operations. A decorator provides multiprocessing features and interconnect the processes created.
7 7 The argument (kwargs) sent from the controller is parsed and filtered via the decorator for each processing unit or operation instantiated.
8 8 The decorator handle also the methods inside the processing unit to be called from the main script (not as operations) (OPERATION -> type ='self').
9 9
10 10 Based on:
11 11 $Author: murco $
12 12 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
13 13 '''
14 14
15 15 import os
16 16 import inspect
17 17 import zmq
18 18 import time
19 19 import pickle
20 20 import traceback
21 21 from queue import Queue
22 22 from threading import Thread
23 23 from multiprocessing import Process
24 24
25 25 from schainpy.utils import log
26 26
27 27
28 28 class ProcessingUnit(object):
29 29
30 30 """
31 31 Update - Jan 2018 - MULTIPROCESSING
32 32 All the "call" methods present in the previous base were removed.
33 33 The majority of operations are independant processes, thus
34 34 the decorator is in charge of communicate the operation processes
35 35 with the proccessing unit via IPC.
36 36
37 37 The constructor does not receive any argument. The remaining methods
38 38 are related with the operations to execute.
39 39
40 40
41 41 """
42 42
43 43 def __init__(self):
44 44
45 45 self.dataIn = None
46 46 self.dataOut = None
47 47 self.isConfig = False
48 48 self.operations = []
49 49 self.plots = []
50 50
51 51 def getAllowedArgs(self):
52 52 if hasattr(self, '__attrs__'):
53 53 return self.__attrs__
54 54 else:
55 55 return inspect.getargspec(self.run).args
56 56
57 57 def addOperation(self, conf, operation):
58 58 """
59 59 This method is used in the controller, and update the dictionary containing the operations to execute. The dict
60 60 posses the id of the operation process (IPC purposes)
61 61
62 62 Agrega un objeto del tipo "Operation" (opObj) a la lista de objetos "self.objectList" y retorna el
63 63 identificador asociado a este objeto.
64 64
65 65 Input:
66 66
67 67 object : objeto de la clase "Operation"
68 68
69 69 Return:
70 70
71 71 objId : identificador del objeto, necesario para comunicar con master(procUnit)
72 72 """
73 73
74 74 self.operations.append(
75 75 (operation, conf.type, conf.id, conf.getKwargs()))
76 76
77 77 if 'plot' in self.name.lower():
78 78 self.plots.append(operation.CODE)
79 79
80 80 def getOperationObj(self, objId):
81 81
82 82 if objId not in list(self.operations.keys()):
83 83 return None
84 84
85 85 return self.operations[objId]
86 86
87 87 def operation(self, **kwargs):
88 88 """
89 89 Operacion directa sobre la data (dataOut.data). Es necesario actualizar los valores de los
90 90 atributos del objeto dataOut
91 91
92 92 Input:
93 93
94 94 **kwargs : Diccionario de argumentos de la funcion a ejecutar
95 95 """
96 96
97 97 raise NotImplementedError
98 98
99 99 def setup(self):
100 100
101 101 raise NotImplementedError
102 102
103 103 def run(self):
104 104
105 105 raise NotImplementedError
106 106
107 107 def close(self):
108 108
109 109 return
110 110
111 111
112 112 class Operation(object):
113 113
114 114 """
115 115 Update - Jan 2018 - MULTIPROCESSING
116 116
117 117 Most of the methods remained the same. The decorator parse the arguments and executed the run() method for each process.
118 118 The constructor doe snot receive any argument, neither the baseclass.
119 119
120 120
121 121 Clase base para definir las operaciones adicionales que se pueden agregar a la clase ProcessingUnit
122 122 y necesiten acumular informacion previa de los datos a procesar. De preferencia usar un buffer de
123 123 acumulacion dentro de esta clase
124 124
125 125 Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer)
126 126
127 127 """
128 128
129 129 def __init__(self):
130 130
131 131 self.id = None
132 132 self.isConfig = False
133 133
134 134 if not hasattr(self, 'name'):
135 135 self.name = self.__class__.__name__
136 136
137 137 def getAllowedArgs(self):
138 138 if hasattr(self, '__attrs__'):
139 139 return self.__attrs__
140 140 else:
141 141 return inspect.getargspec(self.run).args
142 142
143 143 def setup(self):
144 144
145 145 self.isConfig = True
146 146
147 147 raise NotImplementedError
148 148
149 149 def run(self, dataIn, **kwargs):
150 150 """
151 151 Realiza las operaciones necesarias sobre la dataIn.data y actualiza los
152 152 atributos del objeto dataIn.
153 153
154 154 Input:
155 155
156 156 dataIn : objeto del tipo JROData
157 157
158 158 Return:
159 159
160 160 None
161 161
162 162 Affected:
163 163 __buffer : buffer de recepcion de datos.
164 164
165 165 """
166 166 if not self.isConfig:
167 167 self.setup(**kwargs)
168 168
169 169 raise NotImplementedError
170 170
171 171 def close(self):
172 172
173 173 return
174 174
175 class InputQueue(Thread):
176
177 '''
178
179 Class to hold input data for Proccessing Units and external Operations,
180
181 '''
182
183
184
185 def __init__(self, project_id, inputId):
186
187 Thread.__init__(self)
188
189 self.queue = Queue()
190
191 self.project_id = project_id
192
193 self.inputId = inputId
194
195
196
197 def run(self):
198
199
200
201 c = zmq.Context()
202
203 self.receiver = c.socket(zmq.SUB)
204
205 self.receiver.connect(
206
207 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
208
209 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
210
211
212
213 while True:
214
215 self.queue.put(self.receiver.recv_multipart()[1])
216
217
218
219 def get(self):
220
221
222
223 return pickle.loads(self.queue.get())
224
225
175 226
176 227 def MPDecorator(BaseClass):
177 228 """
178 229 Multiprocessing class decorator
179 230
180 231 This function add multiprocessing features to a BaseClass. Also, it handle
181 232 the communication beetween processes (readers, procUnits and operations).
182 233 """
183 234
184 235 class MPClass(BaseClass, Process):
185 236
186 237 def __init__(self, *args, **kwargs):
187 238 super(MPClass, self).__init__()
188 239 Process.__init__(self)
189 240 self.operationKwargs = {}
190 241 self.args = args
191 242 self.kwargs = kwargs
192 243 self.sender = None
193 244 self.receiver = None
194 245 self.i = 0
195 246 self.t = time.time()
196 247 self.name = BaseClass.__name__
197 248
198 249 if 'plot' in self.name.lower() and not self.name.endswith('_'):
199 250 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
200 251
201 252 self.start_time = time.time()
202 253 self.id = args[0]
203 254 self.inputId = args[1]
204 255 self.project_id = args[2]
205 256 self.err_queue = args[3]
206 257 self.typeProc = args[4]
207 258 self.err_queue.put('#_start_#')
259 self.queue = InputQueue(self.project_id, self.inputId)
208 260
209 261 def subscribe(self):
210 262 '''
211 263 Start the zmq socket receiver and subcribe to input ID.
212 264 '''
213
214 c = zmq.Context()
215 self.receiver = c.socket(zmq.SUB)
216 self.receiver.connect(
217 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
218 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
219
265
266 self.queue.start()
220 267
221 268 def listen(self):
222 269 '''
223 270 This function waits for objects
224 271 '''
225 272
226 data = pickle.loads(self.receiver.recv_multipart()[1])
227
228 return data
273 return self.queue.get()
229 274
230 275 def set_publisher(self):
231 276 '''
232 277 This function create a zmq socket for publishing objects.
233 278 '''
234 279
235 280 time.sleep(0.5)
236 281
237 282 c = zmq.Context()
238 283 self.sender = c.socket(zmq.PUB)
239 284 self.sender.connect(
240 285 'ipc:///tmp/schain/{}_sub'.format(self.project_id))
241 286
242 287 def publish(self, data, id):
243 288 '''
244 289 This function publish an object, to an specific topic.
245 290 For Read Units (inputId == None) adds a little delay
246 291 to avoid data loss
247 292 '''
248 293
249 294 if self.inputId is None:
250 295 self.i += 1
251 296 if self.i % 40 == 0 and time.time()-self.t > 0.1:
252 297 self.i = 0
253 298 self.t = time.time()
254 299 time.sleep(0.05)
255 300 elif self.i % 40 == 0:
256 301 self.i = 0
257 302 self.t = time.time()
258 303 time.sleep(0.01)
259 304
260 305 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
261 306
262 307 def runReader(self):
263 308 '''
264 309 Run fuction for read units
265 310 '''
266 311 while True:
267 312
268 313 try:
269 314 BaseClass.run(self, **self.kwargs)
270 315 except:
271 316 err = traceback.format_exc()
272 317 if 'No more files' in err:
273 318 log.warning('No more files to read', self.name)
274 319 else:
275 320 self.err_queue.put('{}|{}'.format(self.name, err))
276 321 self.dataOut.error = True
277 322
278 323 for op, optype, opId, kwargs in self.operations:
279 324 if optype == 'self' and not self.dataOut.flagNoData:
280 325 op(**kwargs)
281 326 elif optype == 'other' and not self.dataOut.flagNoData:
282 327 self.dataOut = op.run(self.dataOut, **self.kwargs)
283 328 elif optype == 'external':
284 329 self.publish(self.dataOut, opId)
285 330
286 331 if self.dataOut.flagNoData and not self.dataOut.error:
287 332 continue
288 333
289 334 self.publish(self.dataOut, self.id)
290 335
291 336 if self.dataOut.error:
292 337 break
293 338
294 339 time.sleep(0.5)
295 340
296 341 def runProc(self):
297 342 '''
298 343 Run function for proccessing units
299 344 '''
300 345
301 346 while True:
302 347 self.dataIn = self.listen()
303 348
304 349 if self.dataIn.flagNoData and self.dataIn.error is None:
305 350 continue
306 351 elif not self.dataIn.error:
307 352 try:
308 353 BaseClass.run(self, **self.kwargs)
309 354 except:
310 355 self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc()))
311 356 self.dataOut.error = True
312 357 elif self.dataIn.error:
313 358 self.dataOut.error = self.dataIn.error
314 359 self.dataOut.flagNoData = True
315 360
316 361 for op, optype, opId, kwargs in self.operations:
317 362 if optype == 'self' and not self.dataOut.flagNoData:
318 363 op(**kwargs)
319 364 elif optype == 'other' and not self.dataOut.flagNoData:
320 365 self.dataOut = op.run(self.dataOut, **kwargs)
321 366 elif optype == 'external' and not self.dataOut.flagNoData:
322 367 self.publish(self.dataOut, opId)
323 368
324 369 self.publish(self.dataOut, self.id)
325 370 for op, optype, opId, kwargs in self.operations:
326 371 if optype == 'external' and self.dataOut.error:
327 372 self.publish(self.dataOut, opId)
328 373
329 374 if self.dataOut.error:
330 375 break
331 376
332 377 time.sleep(0.5)
333 378
334 379 def runOp(self):
335 380 '''
336 381 Run function for external operations (this operations just receive data
337 382 ex: plots, writers, publishers)
338 383 '''
339 384
340 385 while True:
341 386
342 387 dataOut = self.listen()
343 388
344 389 if not dataOut.error:
345 390 BaseClass.run(self, dataOut, **self.kwargs)
346 391 else:
347 392 break
348 393
349 394 def run(self):
350 395 if self.typeProc is "ProcUnit":
351 396
352 397 if self.inputId is not None:
353 398 self.subscribe()
354 399
355 400 self.set_publisher()
356 401
357 402 if 'Reader' not in BaseClass.__name__:
358 403 self.runProc()
359 404 else:
360 405 self.runReader()
361 406
362 407 elif self.typeProc is "Operation":
363 408
364 409 self.subscribe()
365 410 self.runOp()
366 411
367 412 else:
368 413 raise ValueError("Unknown type")
369 414
370 415 self.close()
371 416
372 417 def close(self):
373 418
374 419 BaseClass.close(self)
375 420 self.err_queue.put('#_end_#')
376 421
377 422 if self.sender:
378 423 self.sender.close()
379 424
380 425 if self.receiver:
381 426 self.receiver.close()
382 427
383 428 log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)
384 429
385 430 return MPClass
General Comments 0
You need to be logged in to leave comments. Login now