##// END OF EJS Templates
Fix MPDecorator (check flagNoData for all type of operations)
Juan C. Espinoza -
r1198:f0bf03007c43
parent child
Show More
@@ -1,382 +1,385
1 1 '''
2 2 Updated for multiprocessing
3 3 Author : Sergio Cortez
4 4 Jan 2018
5 5 Abstract:
6 6 Base class for processing units and operations. A decorator provides multiprocessing features and interconnect the processes created.
7 7 The argument (kwargs) sent from the controller is parsed and filtered via the decorator for each processing unit or operation instantiated.
8 8 The decorator handle also the methods inside the processing unit to be called from the main script (not as operations) (OPERATION -> type ='self').
9 9
10 10 Based on:
11 11 $Author: murco $
12 12 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
13 13 '''
14 14
15 15 import inspect
16 16 import zmq
17 17 import time
18 18 import pickle
19 19 import os
20 20 from multiprocessing import Process
21 21 from zmq.utils.monitor import recv_monitor_message
22 22
23 23 from schainpy.utils import log
24 24
25 25
26 26 class ProcessingUnit(object):
27 27
28 28 """
29 29 Update - Jan 2018 - MULTIPROCESSING
30 30 All the "call" methods present in the previous base were removed.
31 31 The majority of operations are independant processes, thus
32 32 the decorator is in charge of communicate the operation processes
33 33 with the proccessing unit via IPC.
34 34
35 35 The constructor does not receive any argument. The remaining methods
36 36 are related with the operations to execute.
37 37
38 38
39 39 """
40 40
41 41 def __init__(self):
42 42
43 43 self.dataIn = None
44 44 self.dataOut = None
45 45 self.isConfig = False
46 46 self.operations = []
47 47 self.plots = []
48 48
49 49 def getAllowedArgs(self):
50 50 if hasattr(self, '__attrs__'):
51 51 return self.__attrs__
52 52 else:
53 53 return inspect.getargspec(self.run).args
54 54
55 55 def addOperation(self, conf, operation):
56 56 """
57 57 This method is used in the controller, and update the dictionary containing the operations to execute. The dict
58 58 posses the id of the operation process (IPC purposes)
59 59
60 60 Agrega un objeto del tipo "Operation" (opObj) a la lista de objetos "self.objectList" y retorna el
61 61 identificador asociado a este objeto.
62 62
63 63 Input:
64 64
65 65 object : objeto de la clase "Operation"
66 66
67 67 Return:
68 68
69 69 objId : identificador del objeto, necesario para comunicar con master(procUnit)
70 70 """
71 71
72 72 self.operations.append(
73 73 (operation, conf.type, conf.id, conf.getKwargs()))
74 74
75 75 if 'plot' in self.name.lower():
76 76 self.plots.append(operation.CODE)
77 77
78 78 def getOperationObj(self, objId):
79 79
80 80 if objId not in list(self.operations.keys()):
81 81 return None
82 82
83 83 return self.operations[objId]
84 84
85 85 def operation(self, **kwargs):
86 86 """
87 87 Operacion directa sobre la data (dataOut.data). Es necesario actualizar los valores de los
88 88 atributos del objeto dataOut
89 89
90 90 Input:
91 91
92 92 **kwargs : Diccionario de argumentos de la funcion a ejecutar
93 93 """
94 94
95 95 raise NotImplementedError
96 96
97 97 def setup(self):
98 98
99 99 raise NotImplementedError
100 100
101 101 def run(self):
102 102
103 103 raise NotImplementedError
104 104
105 105 def close(self):
106 106
107 107 return
108 108
109 109
110 110 class Operation(object):
111 111
112 112 """
113 113 Update - Jan 2018 - MULTIPROCESSING
114 114
115 115 Most of the methods remained the same. The decorator parse the arguments and executed the run() method for each process.
116 116 The constructor doe snot receive any argument, neither the baseclass.
117 117
118 118
119 119 Clase base para definir las operaciones adicionales que se pueden agregar a la clase ProcessingUnit
120 120 y necesiten acumular informacion previa de los datos a procesar. De preferencia usar un buffer de
121 121 acumulacion dentro de esta clase
122 122
123 123 Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer)
124 124
125 125 """
126 126
127 127 def __init__(self):
128 128
129 129 self.id = None
130 130 self.isConfig = False
131 131
132 132 if not hasattr(self, 'name'):
133 133 self.name = self.__class__.__name__
134 134
135 135 def getAllowedArgs(self):
136 136 if hasattr(self, '__attrs__'):
137 137 return self.__attrs__
138 138 else:
139 139 return inspect.getargspec(self.run).args
140 140
141 141 def setup(self):
142 142
143 143 self.isConfig = True
144 144
145 145 raise NotImplementedError
146 146
147 147 def run(self, dataIn, **kwargs):
148 148 """
149 149 Realiza las operaciones necesarias sobre la dataIn.data y actualiza los
150 150 atributos del objeto dataIn.
151 151
152 152 Input:
153 153
154 154 dataIn : objeto del tipo JROData
155 155
156 156 Return:
157 157
158 158 None
159 159
160 160 Affected:
161 161 __buffer : buffer de recepcion de datos.
162 162
163 163 """
164 164 if not self.isConfig:
165 165 self.setup(**kwargs)
166 166
167 167 raise NotImplementedError
168 168
169 169 def close(self):
170 170
171 171 return
172 172
173 173
174 174 def MPDecorator(BaseClass):
175 175 """
176 176 Multiprocessing class decorator
177 177
178 178 This function add multiprocessing features to a BaseClass. Also, it handle
179 179 the communication beetween processes (readers, procUnits and operations).
180 180 """
181 181
182 182 class MPClass(BaseClass, Process):
183 183
184 184 def __init__(self, *args, **kwargs):
185 185 super(MPClass, self).__init__()
186 186 Process.__init__(self)
187 187 self.operationKwargs = {}
188 188 self.args = args
189 189 self.kwargs = kwargs
190 190 self.sender = None
191 191 self.receiver = None
192 192 self.name = BaseClass.__name__
193 193 if 'plot' in self.name.lower():
194 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
194 if not self.name.endswith('_'):
195 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
195 196 self.start_time = time.time()
196 197
197 198 if len(self.args) is 3:
198 199 self.typeProc = "ProcUnit"
199 200 self.id = args[0]
200 201 self.inputId = args[1]
201 202 self.project_id = args[2]
202 203 elif len(self.args) is 2:
203 204 self.id = args[0]
204 205 self.inputId = args[0]
205 206 self.project_id = args[1]
206 207 self.typeProc = "Operation"
207 208
208 209 def subscribe(self):
209 210 '''
210 211 This function create a socket to receive objects from the
211 212 topic `inputId`.
212 213 '''
213 214
214 215 c = zmq.Context()
215 216 self.receiver = c.socket(zmq.SUB)
216 217 self.receiver.connect(
217 218 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
218 219 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
219 220
220 221 def listen(self):
221 222 '''
222 223 This function waits for objects and deserialize using pickle
223 224 '''
224 225
225 226 data = pickle.loads(self.receiver.recv_multipart()[1])
226 227
227 228 return data
228 229
229 230 def set_publisher(self):
230 231 '''
231 232 This function create a socket for publishing purposes.
232 233 '''
233 234
234 235 time.sleep(1)
235 236 c = zmq.Context()
236 237 self.sender = c.socket(zmq.PUB)
237 238 self.sender.connect(
238 239 'ipc:///tmp/schain/{}_sub'.format(self.project_id))
239 240
240 241 def publish(self, data, id):
241 242 '''
242 243 This function publish an object, to a specific topic.
243 244 '''
244 245 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
245 246
246 247 def runReader(self):
247 248 '''
248 249 Run fuction for read units
249 250 '''
250 251 while True:
251 252
252 253 BaseClass.run(self, **self.kwargs)
253 254
254 255 for op, optype, opId, kwargs in self.operations:
255 if optype == 'self':
256 if optype == 'self' and not self.dataOut.flagNoData:
256 257 op(**kwargs)
257 elif optype == 'other':
258 elif optype == 'other' and not self.dataOut.flagNoData:
258 259 self.dataOut = op.run(self.dataOut, **self.kwargs)
259 260 elif optype == 'external':
260 261 self.publish(self.dataOut, opId)
261 262
262 263 if self.dataOut.flagNoData and not self.dataOut.error:
263 264 continue
264 265
265 266 self.publish(self.dataOut, self.id)
266 267
267 268 if self.dataOut.error:
268 269 log.error(self.dataOut.error, self.name)
269 270 # self.sender.send_multipart([str(self.project_id).encode(), 'end'.encode()])
270 271 break
271 272
272 273 time.sleep(1)
273 274
274 275 def runProc(self):
275 276 '''
276 277 Run function for proccessing units
277 278 '''
278 279
279 280 while True:
280 281 self.dataIn = self.listen()
281 282
282 283 if self.dataIn.flagNoData and self.dataIn.error is None:
283 284 continue
284 285
285 286 BaseClass.run(self, **self.kwargs)
286 287
287 288 if self.dataIn.error:
288 289 self.dataOut.error = self.dataIn.error
289 290 self.dataOut.flagNoData = True
290 291
291 292 for op, optype, opId, kwargs in self.operations:
292 if optype == 'self':
293 if optype == 'self' and not self.dataOut.flagNoData:
293 294 op(**kwargs)
294 elif optype == 'other':
295 elif optype == 'other' and not self.dataOut.flagNoData:
295 296 self.dataOut = op.run(self.dataOut, **kwargs)
296 297 elif optype == 'external':
297 298 if not self.dataOut.flagNoData or self.dataOut.error:
298 299 self.publish(self.dataOut, opId)
299 300
300 self.publish(self.dataOut, self.id)
301 if not self.dataOut.flagNoData or self.dataOut.error:
302 self.publish(self.dataOut, self.id)
303
301 304 if self.dataIn.error:
302 305 break
303 306
304 307 time.sleep(1)
305 308
306 309 def runOp(self):
307 310 '''
308 311 Run function for external operations (this operations just receive data
309 312 ex: plots, writers, publishers)
310 313 '''
311 314
312 315 while True:
313 316
314 317 dataOut = self.listen()
315 318
316 319 BaseClass.run(self, dataOut, **self.kwargs)
317 320
318 321 if dataOut.error:
319 322 break
320 323
321 324 time.sleep(1)
322 325
323 326 def run(self):
324 327 if self.typeProc is "ProcUnit":
325 328
326 329 if self.inputId is not None:
327 330
328 331 self.subscribe()
329 332
330 333 self.set_publisher()
331 334
332 335 if 'Reader' not in BaseClass.__name__:
333 336 self.runProc()
334 337 else:
335 338 self.runReader()
336 339
337 340 elif self.typeProc is "Operation":
338 341
339 342 self.subscribe()
340 343 self.runOp()
341 344
342 345 else:
343 346 raise ValueError("Unknown type")
344 347
345 348 self.close()
346 349
347 350 def event_monitor(self, monitor):
348 351
349 352 events = {}
350 353
351 354 for name in dir(zmq):
352 355 if name.startswith('EVENT_'):
353 356 value = getattr(zmq, name)
354 357 events[value] = name
355 358
356 359 while monitor.poll():
357 360 evt = recv_monitor_message(monitor)
358 361 if evt['event'] == 32:
359 362 self.connections += 1
360 363 if evt['event'] == 512:
361 364 pass
362 365
363 366 evt.update({'description': events[evt['event']]})
364 367
365 368 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
366 369 break
367 370 monitor.close()
368 371 print('event monitor thread done!')
369 372
370 373 def close(self):
371 374
372 375 BaseClass.close(self)
373 376
374 377 if self.sender:
375 378 self.sender.close()
376 379
377 380 if self.receiver:
378 381 self.receiver.close()
379 382
380 383 log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)
381 384
382 385 return MPClass
General Comments 0
You need to be logged in to leave comments. Login now