##// END OF EJS Templates
Use of delays instead of input queue to keep dataouts and avoid loose of them
jespinoza -
r1245:1ee18bfa3eb6
parent child
Show More
@@ -1,399 +1,385
1 1 '''
2 2 Updated for multiprocessing
3 3 Author : Sergio Cortez
4 4 Jan 2018
5 5 Abstract:
6 6 Base class for processing units and operations. A decorator provides multiprocessing features and interconnect the processes created.
7 7 The argument (kwargs) sent from the controller is parsed and filtered via the decorator for each processing unit or operation instantiated.
8 8 The decorator handle also the methods inside the processing unit to be called from the main script (not as operations) (OPERATION -> type ='self').
9 9
10 10 Based on:
11 11 $Author: murco $
12 12 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
13 13 '''
14 14
15 15 import os
16 16 import inspect
17 17 import zmq
18 18 import time
19 19 import pickle
20 20 import traceback
21 21 from queue import Queue
22 22 from threading import Thread
23 23 from multiprocessing import Process
24 24
25 25 from schainpy.utils import log
26 26
27 27
28 28 class ProcessingUnit(object):
29 29
30 30 """
31 31 Update - Jan 2018 - MULTIPROCESSING
32 32 All the "call" methods present in the previous base were removed.
33 33 The majority of operations are independant processes, thus
34 34 the decorator is in charge of communicate the operation processes
35 35 with the proccessing unit via IPC.
36 36
37 37 The constructor does not receive any argument. The remaining methods
38 38 are related with the operations to execute.
39 39
40 40
41 41 """
42 42
43 43 def __init__(self):
44 44
45 45 self.dataIn = None
46 46 self.dataOut = None
47 47 self.isConfig = False
48 48 self.operations = []
49 49 self.plots = []
50 50
51 51 def getAllowedArgs(self):
52 52 if hasattr(self, '__attrs__'):
53 53 return self.__attrs__
54 54 else:
55 55 return inspect.getargspec(self.run).args
56 56
57 57 def addOperation(self, conf, operation):
58 58 """
59 59 This method is used in the controller, and update the dictionary containing the operations to execute. The dict
60 60 posses the id of the operation process (IPC purposes)
61 61
62 62 Agrega un objeto del tipo "Operation" (opObj) a la lista de objetos "self.objectList" y retorna el
63 63 identificador asociado a este objeto.
64 64
65 65 Input:
66 66
67 67 object : objeto de la clase "Operation"
68 68
69 69 Return:
70 70
71 71 objId : identificador del objeto, necesario para comunicar con master(procUnit)
72 72 """
73 73
74 74 self.operations.append(
75 75 (operation, conf.type, conf.id, conf.getKwargs()))
76 76
77 77 if 'plot' in self.name.lower():
78 78 self.plots.append(operation.CODE)
79 79
80 80 def getOperationObj(self, objId):
81 81
82 82 if objId not in list(self.operations.keys()):
83 83 return None
84 84
85 85 return self.operations[objId]
86 86
87 87 def operation(self, **kwargs):
88 88 """
89 89 Operacion directa sobre la data (dataOut.data). Es necesario actualizar los valores de los
90 90 atributos del objeto dataOut
91 91
92 92 Input:
93 93
94 94 **kwargs : Diccionario de argumentos de la funcion a ejecutar
95 95 """
96 96
97 97 raise NotImplementedError
98 98
99 99 def setup(self):
100 100
101 101 raise NotImplementedError
102 102
103 103 def run(self):
104 104
105 105 raise NotImplementedError
106 106
107 107 def close(self):
108 108
109 109 return
110 110
111 111
112 112 class Operation(object):
113 113
114 114 """
115 115 Update - Jan 2018 - MULTIPROCESSING
116 116
117 117 Most of the methods remained the same. The decorator parse the arguments and executed the run() method for each process.
118 118 The constructor doe snot receive any argument, neither the baseclass.
119 119
120 120
121 121 Clase base para definir las operaciones adicionales que se pueden agregar a la clase ProcessingUnit
122 122 y necesiten acumular informacion previa de los datos a procesar. De preferencia usar un buffer de
123 123 acumulacion dentro de esta clase
124 124
125 125 Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer)
126 126
127 127 """
128 128
129 129 def __init__(self):
130 130
131 131 self.id = None
132 132 self.isConfig = False
133 133
134 134 if not hasattr(self, 'name'):
135 135 self.name = self.__class__.__name__
136 136
137 137 def getAllowedArgs(self):
138 138 if hasattr(self, '__attrs__'):
139 139 return self.__attrs__
140 140 else:
141 141 return inspect.getargspec(self.run).args
142 142
143 143 def setup(self):
144 144
145 145 self.isConfig = True
146 146
147 147 raise NotImplementedError
148 148
149 149 def run(self, dataIn, **kwargs):
150 150 """
151 151 Realiza las operaciones necesarias sobre la dataIn.data y actualiza los
152 152 atributos del objeto dataIn.
153 153
154 154 Input:
155 155
156 156 dataIn : objeto del tipo JROData
157 157
158 158 Return:
159 159
160 160 None
161 161
162 162 Affected:
163 163 __buffer : buffer de recepcion de datos.
164 164
165 165 """
166 166 if not self.isConfig:
167 167 self.setup(**kwargs)
168 168
169 169 raise NotImplementedError
170 170
171 171 def close(self):
172 172
173 173 return
174 174
175 class InputQueue(Thread):
176 '''
177 Class to hold input data for Proccessing Units and external Operations,
178 '''
179
180 def __init__(self, project_id, inputId):
181 Thread.__init__(self)
182 self.queue = Queue()
183 self.project_id = project_id
184 self.inputId = inputId
185
186 def run(self):
187
188 c = zmq.Context()
189 self.receiver = c.socket(zmq.SUB)
190 self.receiver.connect(
191 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
192 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
193
194 while True:
195 self.queue.put(self.receiver.recv_multipart()[1])
196
197 def get(self):
198
199 return pickle.loads(self.queue.get())
200
201 175
202 176 def MPDecorator(BaseClass):
203 177 """
204 178 Multiprocessing class decorator
205 179
206 180 This function add multiprocessing features to a BaseClass. Also, it handle
207 181 the communication beetween processes (readers, procUnits and operations).
208 182 """
209 183
210 184 class MPClass(BaseClass, Process):
211 185
212 186 def __init__(self, *args, **kwargs):
213 187 super(MPClass, self).__init__()
214 188 Process.__init__(self)
215 189 self.operationKwargs = {}
216 190 self.args = args
217 191 self.kwargs = kwargs
218 192 self.sender = None
219 193 self.receiver = None
220 194 self.i = 0
195 self.t = time.time()
221 196 self.name = BaseClass.__name__
222 197
223 198 if 'plot' in self.name.lower() and not self.name.endswith('_'):
224 199 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
225 200
226 201 self.start_time = time.time()
227 202 self.id = args[0]
228 203 self.inputId = args[1]
229 204 self.project_id = args[2]
230 205 self.err_queue = args[3]
231 self.typeProc = args[4]
232 self.queue = InputQueue(self.project_id, self.inputId)
206 self.typeProc = args[4]
233 207 self.err_queue.put('#_start_#')
234 208
235 209 def subscribe(self):
236 210 '''
237 This function start the input queue.
211 Start the zmq socket receiver and subcribe to input ID.
238 212 '''
239
240 self.queue.start()
213
214 c = zmq.Context()
215 self.receiver = c.socket(zmq.SUB)
216 self.receiver.connect(
217 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
218 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
241 219
242 220
243 221 def listen(self):
244 222 '''
245 223 This function waits for objects
246 224 '''
247
248 return self.queue.get()
225
226 data = pickle.loads(self.receiver.recv_multipart()[1])
227
228 return data
249 229
250 230 def set_publisher(self):
251 231 '''
252 This function create a socket for publishing purposes.
232 This function create a zmq socket for publishing objects.
253 233 '''
254 234
255 time.sleep(1)
235 time.sleep(0.5)
236
256 237 c = zmq.Context()
257 238 self.sender = c.socket(zmq.PUB)
258 239 self.sender.connect(
259 240 'ipc:///tmp/schain/{}_sub'.format(self.project_id))
260 241
261 242 def publish(self, data, id):
262 243 '''
263 This function publish an object, to a specific topic.
244 This function publish an object, to an specific topic.
264 245 For Read Units (inputId == None) adds a little delay
265 246 to avoid data loss
266 247 '''
267 248
268 249 if self.inputId is None:
269 250 self.i += 1
270 if self.i % 80 == 0:
251 if self.i % 40 == 0 and time.time()-self.t > 0.1:
252 self.i = 0
253 self.t = time.time()
254 time.sleep(0.05)
255 elif self.i % 40 == 0:
271 256 self.i = 0
272 time.sleep(0.01)
257 self.t = time.time()
258 time.sleep(0.01)
273 259
274 260 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
275 261
276 262 def runReader(self):
277 263 '''
278 264 Run fuction for read units
279 265 '''
280 266 while True:
281 267
282 268 try:
283 269 BaseClass.run(self, **self.kwargs)
284 270 except:
285 271 err = traceback.format_exc()
286 272 if 'No more files' in err:
287 273 log.warning('No more files to read', self.name)
288 274 else:
289 275 self.err_queue.put('{}|{}'.format(self.name, err))
290 276 self.dataOut.error = True
291 277
292 278 for op, optype, opId, kwargs in self.operations:
293 279 if optype == 'self' and not self.dataOut.flagNoData:
294 280 op(**kwargs)
295 281 elif optype == 'other' and not self.dataOut.flagNoData:
296 282 self.dataOut = op.run(self.dataOut, **self.kwargs)
297 283 elif optype == 'external':
298 284 self.publish(self.dataOut, opId)
299 285
300 286 if self.dataOut.flagNoData and not self.dataOut.error:
301 287 continue
302 288
303 289 self.publish(self.dataOut, self.id)
304 290
305 291 if self.dataOut.error:
306 292 break
307 293
308 294 time.sleep(0.5)
309 295
310 296 def runProc(self):
311 297 '''
312 298 Run function for proccessing units
313 299 '''
314 300
315 301 while True:
316 302 self.dataIn = self.listen()
317 303
318 304 if self.dataIn.flagNoData and self.dataIn.error is None:
319 305 continue
320 306 elif not self.dataIn.error:
321 307 try:
322 308 BaseClass.run(self, **self.kwargs)
323 309 except:
324 310 self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc()))
325 311 self.dataOut.error = True
326 312 elif self.dataIn.error:
327 313 self.dataOut.error = self.dataIn.error
328 314 self.dataOut.flagNoData = True
329 315
330 316 for op, optype, opId, kwargs in self.operations:
331 317 if optype == 'self' and not self.dataOut.flagNoData:
332 318 op(**kwargs)
333 319 elif optype == 'other' and not self.dataOut.flagNoData:
334 320 self.dataOut = op.run(self.dataOut, **kwargs)
335 321 elif optype == 'external' and not self.dataOut.flagNoData:
336 322 self.publish(self.dataOut, opId)
337 323
338 324 self.publish(self.dataOut, self.id)
339 325 for op, optype, opId, kwargs in self.operations:
340 326 if optype == 'external' and self.dataOut.error:
341 327 self.publish(self.dataOut, opId)
342 328
343 329 if self.dataOut.error:
344 330 break
345 331
346 332 time.sleep(0.5)
347 333
348 334 def runOp(self):
349 335 '''
350 336 Run function for external operations (this operations just receive data
351 337 ex: plots, writers, publishers)
352 338 '''
353 339
354 340 while True:
355 341
356 342 dataOut = self.listen()
357 343
358 344 if not dataOut.error:
359 345 BaseClass.run(self, dataOut, **self.kwargs)
360 346 else:
361 347 break
362 348
363 349 def run(self):
364 350 if self.typeProc is "ProcUnit":
365 351
366 352 if self.inputId is not None:
367 353 self.subscribe()
368 354
369 355 self.set_publisher()
370 356
371 357 if 'Reader' not in BaseClass.__name__:
372 358 self.runProc()
373 359 else:
374 360 self.runReader()
375 361
376 362 elif self.typeProc is "Operation":
377 363
378 364 self.subscribe()
379 365 self.runOp()
380 366
381 367 else:
382 368 raise ValueError("Unknown type")
383 369
384 370 self.close()
385 371
386 372 def close(self):
387 373
388 374 BaseClass.close(self)
389 375 self.err_queue.put('#_end_#')
390 376
391 377 if self.sender:
392 378 self.sender.close()
393 379
394 380 if self.receiver:
395 381 self.receiver.close()
396 382
397 383 log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)
398 384
399 385 return MPClass
General Comments 0
You need to be logged in to leave comments. Login now