##// END OF EJS Templates
Se añade el metodo fix_publish y se editan los metodos listen y publish, en el ultimo se colocaun pequeño retardo aplicado exclusivamente durante la comunacion desde la unidad de lectura, las demas operaciones se realizan igual.
Alexander Valdez -
r1220:dd4695cf07ce
parent child
Show More
@@ -1,390 +1,404
1 1 '''
2 2 Updated for multiprocessing
3 3 Author : Sergio Cortez
4 4 Jan 2018
5 5 Abstract:
6 6 Base class for processing units and operations. A decorator provides multiprocessing features and interconnect the processes created.
7 7 The argument (kwargs) sent from the controller is parsed and filtered via the decorator for each processing unit or operation instantiated.
8 8 The decorator handle also the methods inside the processing unit to be called from the main script (not as operations) (OPERATION -> type ='self').
9 9
10 10 Based on:
11 11 $Author: murco $
12 12 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
13 13 '''
14 14
15 15 import os
16 16 import inspect
17 17 import zmq
18 18 import time
19 19 import pickle
20 20 from multiprocessing import Process
21 21 from zmq.utils.monitor import recv_monitor_message
22 22
23 23 from schainpy.utils import log
24 24
25 25
26 26 class ProcessingUnit(object):
27 27
28 28 """
29 29 Update - Jan 2018 - MULTIPROCESSING
30 30 All the "call" methods present in the previous base were removed.
31 31 The majority of operations are independant processes, thus
32 32 the decorator is in charge of communicate the operation processes
33 33 with the proccessing unit via IPC.
34 34
35 35 The constructor does not receive any argument. The remaining methods
36 36 are related with the operations to execute.
37 37
38 38
39 39 """
40 40
41 41 def __init__(self):
42 42
43 43 self.dataIn = None
44 44 self.dataOut = None
45 45 self.isConfig = False
46 46 self.operations = []
47 47 self.plots = []
48 48
49 49 def getAllowedArgs(self):
50 50 if hasattr(self, '__attrs__'):
51 51 return self.__attrs__
52 52 else:
53 53 return inspect.getargspec(self.run).args
54 54
55 55 def addOperation(self, conf, operation):
56 56 """
57 57 This method is used in the controller, and update the dictionary containing the operations to execute. The dict
58 58 posses the id of the operation process (IPC purposes)
59 59
60 60 Agrega un objeto del tipo "Operation" (opObj) a la lista de objetos "self.objectList" y retorna el
61 61 identificador asociado a este objeto.
62 62
63 63 Input:
64 64
65 65 object : objeto de la clase "Operation"
66 66
67 67 Return:
68 68
69 69 objId : identificador del objeto, necesario para comunicar con master(procUnit)
70 70 """
71 71
72 72 self.operations.append(
73 73 (operation, conf.type, conf.id, conf.getKwargs()))
74 74
75 75 if 'plot' in self.name.lower():
76 76 self.plots.append(operation.CODE)
77 77
78 78 def getOperationObj(self, objId):
79 79
80 80 if objId not in list(self.operations.keys()):
81 81 return None
82 82
83 83 return self.operations[objId]
84 84
85 85 def operation(self, **kwargs):
86 86 """
87 87 Operacion directa sobre la data (dataOut.data). Es necesario actualizar los valores de los
88 88 atributos del objeto dataOut
89 89
90 90 Input:
91 91
92 92 **kwargs : Diccionario de argumentos de la funcion a ejecutar
93 93 """
94 94
95 95 raise NotImplementedError
96 96
97 97 def setup(self):
98 98
99 99 raise NotImplementedError
100 100
101 101 def run(self):
102 102
103 103 raise NotImplementedError
104 104
105 105 def close(self):
106 106
107 107 return
108 108
109 109
110 110 class Operation(object):
111 111
112 112 """
113 113 Update - Jan 2018 - MULTIPROCESSING
114 114
115 115 Most of the methods remained the same. The decorator parse the arguments and executed the run() method for each process.
116 116 The constructor doe snot receive any argument, neither the baseclass.
117 117
118 118
119 119 Clase base para definir las operaciones adicionales que se pueden agregar a la clase ProcessingUnit
120 120 y necesiten acumular informacion previa de los datos a procesar. De preferencia usar un buffer de
121 121 acumulacion dentro de esta clase
122 122
123 123 Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer)
124 124
125 125 """
126 126
127 127 def __init__(self):
128 128
129 129 self.id = None
130 130 self.isConfig = False
131 131
132 132 if not hasattr(self, 'name'):
133 133 self.name = self.__class__.__name__
134 134
135 135 def getAllowedArgs(self):
136 136 if hasattr(self, '__attrs__'):
137 137 return self.__attrs__
138 138 else:
139 139 return inspect.getargspec(self.run).args
140 140
141 141 def setup(self):
142 142
143 143 self.isConfig = True
144 144
145 145 raise NotImplementedError
146 146
147 147 def run(self, dataIn, **kwargs):
148 148 """
149 149 Realiza las operaciones necesarias sobre la dataIn.data y actualiza los
150 150 atributos del objeto dataIn.
151 151
152 152 Input:
153 153
154 154 dataIn : objeto del tipo JROData
155 155
156 156 Return:
157 157
158 158 None
159 159
160 160 Affected:
161 161 __buffer : buffer de recepcion de datos.
162 162
163 163 """
164 164 if not self.isConfig:
165 165 self.setup(**kwargs)
166 166
167 167 raise NotImplementedError
168 168
169 169 def close(self):
170 170
171 171 return
172 172
173 173
174 174 def MPDecorator(BaseClass):
175 175 """
176 176 Multiprocessing class decorator
177 177
178 178 This function add multiprocessing features to a BaseClass. Also, it handle
179 179 the communication beetween processes (readers, procUnits and operations).
180 180 """
181 181
182 182 class MPClass(BaseClass, Process):
183 183
184 184 def __init__(self, *args, **kwargs):
185 185 super(MPClass, self).__init__()
186 186 Process.__init__(self)
187 187 self.operationKwargs = {}
188 188 self.args = args
189 189 self.kwargs = kwargs
190 190 self.sender = None
191 191 self.receiver = None
192 self.i = 0
192 193 self.name = BaseClass.__name__
193 194 if 'plot' in self.name.lower() and not self.name.endswith('_'):
194 195 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
195 196 self.start_time = time.time()
196 197
197 198 if len(self.args) is 3:
198 199 self.typeProc = "ProcUnit"
199 200 self.id = args[0]
200 201 self.inputId = args[1]
201 202 self.project_id = args[2]
202 203 elif len(self.args) is 2:
203 204 self.id = args[0]
204 205 self.inputId = args[0]
205 206 self.project_id = args[1]
206 207 self.typeProc = "Operation"
208
209 def fix_publish(self,valor,multiple1):
210 return True if valor%multiple1 ==0 else False
207 211
208 212 def subscribe(self):
209 213 '''
210 214 This function create a socket to receive objects from the
211 215 topic `inputId`.
212 216 '''
213 217
214 218 c = zmq.Context()
215 219 self.receiver = c.socket(zmq.SUB)
216 220 self.receiver.connect(
217 221 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
218 222 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
219 223
220 224 def listen(self):
221 225 '''
222 226 This function waits for objects and deserialize using pickle
223 227 '''
224
225 data = pickle.loads(self.receiver.recv_multipart()[1])
228 try:
229 data = pickle.loads(self.receiver.recv_multipart()[1])
230 except zmq.ZMQError as e:
231 if e.errno == zmq.ETERM:
232 print (e.errno)
226 233
227 234 return data
228 235
229 236 def set_publisher(self):
230 237 '''
231 238 This function create a socket for publishing purposes.
232 239 '''
233 240
234 241 time.sleep(1)
235 242 c = zmq.Context()
236 243 self.sender = c.socket(zmq.PUB)
237 244 self.sender.connect(
238 245 'ipc:///tmp/schain/{}_sub'.format(self.project_id))
239 246
240 247 def publish(self, data, id):
241 248 '''
242 249 This function publish an object, to a specific topic.
250 The fix method only affect inputId None which is Read Unit
251 Use value between 64 80, you should notice a little retard in processing
243 252 '''
253 if self.inputId is None:
254 self.i+=1
255 if self.fix_publish(self.i,80) == True:# value n
256 time.sleep(0.01)
257
244 258 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
245 259
246 260 def runReader(self):
247 261 '''
248 262 Run fuction for read units
249 263 '''
250 264 while True:
251 265
252 266 BaseClass.run(self, **self.kwargs)
253 267
254 268 for op, optype, opId, kwargs in self.operations:
255 269 if optype == 'self' and not self.dataOut.flagNoData:
256 270 op(**kwargs)
257 271 elif optype == 'other' and not self.dataOut.flagNoData:
258 272 self.dataOut = op.run(self.dataOut, **self.kwargs)
259 273 elif optype == 'external':
260 274 self.publish(self.dataOut, opId)
261 275
262 276 if self.dataOut.flagNoData and not self.dataOut.error:
263 277 continue
264 278
265 279 self.publish(self.dataOut, self.id)
266 280
267 281 if self.dataOut.error:
268 282 log.error(self.dataOut.error, self.name)
269 283 # self.sender.send_multipart([str(self.project_id).encode(), 'end'.encode()])
270 284 break
271 285
272 286 time.sleep(1)
273 287
274 288 def runProc(self):
275 289 '''
276 290 Run function for proccessing units
277 291 '''
278 292
279 293 while True:
280 294 self.dataIn = self.listen()
281 295
282 296 if self.dataIn.flagNoData and self.dataIn.error is None:
283 297 continue
284 298
285 299 BaseClass.run(self, **self.kwargs)
286 300
287 301 if self.dataIn.error:
288 302 self.dataOut.error = self.dataIn.error
289 303 self.dataOut.flagNoData = True
290 304
291 305 for op, optype, opId, kwargs in self.operations:
292 306 if optype == 'self' and not self.dataOut.flagNoData:
293 307 op(**kwargs)
294 308 elif optype == 'other' and not self.dataOut.flagNoData:
295 309 self.dataOut = op.run(self.dataOut, **kwargs)
296 310 elif optype == 'external' and not self.dataOut.flagNoData:
297 311 self.publish(self.dataOut, opId)
298 312
299 313 if not self.dataOut.flagNoData or self.dataOut.error:
300 314 self.publish(self.dataOut, self.id)
301 315 for op, optype, opId, kwargs in self.operations:
302 316 if optype == 'self' and self.dataOut.error:
303 317 op(**kwargs)
304 318 elif optype == 'other' and self.dataOut.error:
305 319 self.dataOut = op.run(self.dataOut, **kwargs)
306 320 elif optype == 'external' and self.dataOut.error:
307 321 self.publish(self.dataOut, opId)
308 322
309 323 if self.dataIn.error:
310 324 break
311 325
312 326 time.sleep(1)
313 327
314 328 def runOp(self):
315 329 '''
316 330 Run function for external operations (this operations just receive data
317 331 ex: plots, writers, publishers)
318 332 '''
319 333
320 334 while True:
321 335
322 336 dataOut = self.listen()
323 337
324 338 BaseClass.run(self, dataOut, **self.kwargs)
325 339
326 340 if dataOut.error:
327 341 break
328 342
329 343 time.sleep(1)
330 344
331 345 def run(self):
332 346 if self.typeProc is "ProcUnit":
333 347
334 348 if self.inputId is not None:
335 349
336 350 self.subscribe()
337 351
338 352 self.set_publisher()
339 353
340 354 if 'Reader' not in BaseClass.__name__:
341 355 self.runProc()
342 356 else:
343 357 self.runReader()
344 358
345 359 elif self.typeProc is "Operation":
346 360
347 361 self.subscribe()
348 362 self.runOp()
349 363
350 364 else:
351 365 raise ValueError("Unknown type")
352 366
353 367 self.close()
354 368
355 369 def event_monitor(self, monitor):
356 370
357 371 events = {}
358 372
359 373 for name in dir(zmq):
360 374 if name.startswith('EVENT_'):
361 375 value = getattr(zmq, name)
362 376 events[value] = name
363 377
364 378 while monitor.poll():
365 379 evt = recv_monitor_message(monitor)
366 380 if evt['event'] == 32:
367 381 self.connections += 1
368 382 if evt['event'] == 512:
369 383 pass
370 384
371 385 evt.update({'description': events[evt['event']]})
372 386
373 387 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
374 388 break
375 389 monitor.close()
376 390 print('event monitor thread done!')
377 391
378 392 def close(self):
379 393
380 394 BaseClass.close(self)
381 395
382 396 if self.sender:
383 397 self.sender.close()
384 398
385 399 if self.receiver:
386 400 self.receiver.close()
387 401
388 402 log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)
389 403
390 404 return MPClass
General Comments 0
You need to be logged in to leave comments. Login now