##// END OF EJS Templates
Fix excessive memory RAM consumption
jespinoza -
r1268:b2726fff6520
parent child
Show More
@@ -1,1290 +1,1290
1 '''
1 '''
2 Updated on January , 2018, for multiprocessing purposes
2 Updated on January , 2018, for multiprocessing purposes
3 Author: Sergio Cortez
3 Author: Sergio Cortez
4 Created on September , 2012
4 Created on September , 2012
5 '''
5 '''
6 from platform import python_version
6 from platform import python_version
7 import sys
7 import sys
8 import ast
8 import ast
9 import datetime
9 import datetime
10 import traceback
10 import traceback
11 import math
11 import math
12 import time
12 import time
13 import zmq
13 import zmq
14 from multiprocessing import Process, Queue, Event, cpu_count
14 from multiprocessing import Process, Queue, Event, Value, cpu_count
15 from threading import Thread
15 from threading import Thread
16 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
16 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
17 from xml.dom import minidom
17 from xml.dom import minidom
18
18
19
19
20 from schainpy.admin import Alarm, SchainWarning
20 from schainpy.admin import Alarm, SchainWarning
21 from schainpy.model import *
21 from schainpy.model import *
22 from schainpy.utils import log
22 from schainpy.utils import log
23
23
24
24
25 DTYPES = {
25 DTYPES = {
26 'Voltage': '.r',
26 'Voltage': '.r',
27 'Spectra': '.pdata'
27 'Spectra': '.pdata'
28 }
28 }
29
29
30
30
31 def MPProject(project, n=cpu_count()):
31 def MPProject(project, n=cpu_count()):
32 '''
32 '''
33 Project wrapper to run schain in n processes
33 Project wrapper to run schain in n processes
34 '''
34 '''
35
35
36 rconf = project.getReadUnitObj()
36 rconf = project.getReadUnitObj()
37 op = rconf.getOperationObj('run')
37 op = rconf.getOperationObj('run')
38 dt1 = op.getParameterValue('startDate')
38 dt1 = op.getParameterValue('startDate')
39 dt2 = op.getParameterValue('endDate')
39 dt2 = op.getParameterValue('endDate')
40 tm1 = op.getParameterValue('startTime')
40 tm1 = op.getParameterValue('startTime')
41 tm2 = op.getParameterValue('endTime')
41 tm2 = op.getParameterValue('endTime')
42 days = (dt2 - dt1).days
42 days = (dt2 - dt1).days
43
43
44 for day in range(days + 1):
44 for day in range(days + 1):
45 skip = 0
45 skip = 0
46 cursor = 0
46 cursor = 0
47 processes = []
47 processes = []
48 dt = dt1 + datetime.timedelta(day)
48 dt = dt1 + datetime.timedelta(day)
49 dt_str = dt.strftime('%Y/%m/%d')
49 dt_str = dt.strftime('%Y/%m/%d')
50 reader = JRODataReader()
50 reader = JRODataReader()
51 paths, files = reader.searchFilesOffLine(path=rconf.path,
51 paths, files = reader.searchFilesOffLine(path=rconf.path,
52 startDate=dt,
52 startDate=dt,
53 endDate=dt,
53 endDate=dt,
54 startTime=tm1,
54 startTime=tm1,
55 endTime=tm2,
55 endTime=tm2,
56 ext=DTYPES[rconf.datatype])
56 ext=DTYPES[rconf.datatype])
57 nFiles = len(files)
57 nFiles = len(files)
58 if nFiles == 0:
58 if nFiles == 0:
59 continue
59 continue
60 skip = int(math.ceil(nFiles / n))
60 skip = int(math.ceil(nFiles / n))
61 while nFiles > cursor * skip:
61 while nFiles > cursor * skip:
62 rconf.update(startDate=dt_str, endDate=dt_str, cursor=cursor,
62 rconf.update(startDate=dt_str, endDate=dt_str, cursor=cursor,
63 skip=skip)
63 skip=skip)
64 p = project.clone()
64 p = project.clone()
65 p.start()
65 p.start()
66 processes.append(p)
66 processes.append(p)
67 cursor += 1
67 cursor += 1
68
68
69 def beforeExit(exctype, value, trace):
69 def beforeExit(exctype, value, trace):
70 for process in processes:
70 for process in processes:
71 process.terminate()
71 process.terminate()
72 process.join()
72 process.join()
73 print(traceback.print_tb(trace))
73 print(traceback.print_tb(trace))
74
74
75 sys.excepthook = beforeExit
75 sys.excepthook = beforeExit
76
76
77 for process in processes:
77 for process in processes:
78 process.join()
78 process.join()
79 process.terminate()
79 process.terminate()
80
80
81 time.sleep(3)
81 time.sleep(3)
82
82
83 def wait(context):
83 def wait(context):
84
84
85 time.sleep(1)
85 time.sleep(1)
86 c = zmq.Context()
86 c = zmq.Context()
87 receiver = c.socket(zmq.SUB)
87 receiver = c.socket(zmq.SUB)
88 receiver.connect('ipc:///tmp/schain_{}_pub'.format(self.id))
88 receiver.connect('ipc:///tmp/schain_{}_pub'.format(self.id))
89 receiver.setsockopt(zmq.SUBSCRIBE, self.id.encode())
89 receiver.setsockopt(zmq.SUBSCRIBE, self.id.encode())
90 msg = receiver.recv_multipart()[1]
90 msg = receiver.recv_multipart()[1]
91 context.terminate()
91 context.terminate()
92
92
93 class ParameterConf():
93 class ParameterConf():
94
94
95 id = None
95 id = None
96 name = None
96 name = None
97 value = None
97 value = None
98 format = None
98 format = None
99
99
100 __formated_value = None
100 __formated_value = None
101
101
102 ELEMENTNAME = 'Parameter'
102 ELEMENTNAME = 'Parameter'
103
103
104 def __init__(self):
104 def __init__(self):
105
105
106 self.format = 'str'
106 self.format = 'str'
107
107
108 def getElementName(self):
108 def getElementName(self):
109
109
110 return self.ELEMENTNAME
110 return self.ELEMENTNAME
111
111
112 def getValue(self):
112 def getValue(self):
113
113
114 value = self.value
114 value = self.value
115 format = self.format
115 format = self.format
116
116
117 if self.__formated_value != None:
117 if self.__formated_value != None:
118
118
119 return self.__formated_value
119 return self.__formated_value
120
120
121 if format == 'obj':
121 if format == 'obj':
122 return value
122 return value
123
123
124 if format == 'str':
124 if format == 'str':
125 self.__formated_value = str(value)
125 self.__formated_value = str(value)
126 return self.__formated_value
126 return self.__formated_value
127
127
128 if value == '':
128 if value == '':
129 raise ValueError('%s: This parameter value is empty' % self.name)
129 raise ValueError('%s: This parameter value is empty' % self.name)
130
130
131 if format == 'list':
131 if format == 'list':
132 strList = [s.strip() for s in value.split(',')]
132 strList = [s.strip() for s in value.split(',')]
133 self.__formated_value = strList
133 self.__formated_value = strList
134
134
135 return self.__formated_value
135 return self.__formated_value
136
136
137 if format == 'intlist':
137 if format == 'intlist':
138 '''
138 '''
139 Example:
139 Example:
140 value = (0,1,2)
140 value = (0,1,2)
141 '''
141 '''
142
142
143 new_value = ast.literal_eval(value)
143 new_value = ast.literal_eval(value)
144
144
145 if type(new_value) not in (tuple, list):
145 if type(new_value) not in (tuple, list):
146 new_value = [int(new_value)]
146 new_value = [int(new_value)]
147
147
148 self.__formated_value = new_value
148 self.__formated_value = new_value
149
149
150 return self.__formated_value
150 return self.__formated_value
151
151
152 if format == 'floatlist':
152 if format == 'floatlist':
153 '''
153 '''
154 Example:
154 Example:
155 value = (0.5, 1.4, 2.7)
155 value = (0.5, 1.4, 2.7)
156 '''
156 '''
157
157
158 new_value = ast.literal_eval(value)
158 new_value = ast.literal_eval(value)
159
159
160 if type(new_value) not in (tuple, list):
160 if type(new_value) not in (tuple, list):
161 new_value = [float(new_value)]
161 new_value = [float(new_value)]
162
162
163 self.__formated_value = new_value
163 self.__formated_value = new_value
164
164
165 return self.__formated_value
165 return self.__formated_value
166
166
167 if format == 'date':
167 if format == 'date':
168 strList = value.split('/')
168 strList = value.split('/')
169 intList = [int(x) for x in strList]
169 intList = [int(x) for x in strList]
170 date = datetime.date(intList[0], intList[1], intList[2])
170 date = datetime.date(intList[0], intList[1], intList[2])
171
171
172 self.__formated_value = date
172 self.__formated_value = date
173
173
174 return self.__formated_value
174 return self.__formated_value
175
175
176 if format == 'time':
176 if format == 'time':
177 strList = value.split(':')
177 strList = value.split(':')
178 intList = [int(x) for x in strList]
178 intList = [int(x) for x in strList]
179 time = datetime.time(intList[0], intList[1], intList[2])
179 time = datetime.time(intList[0], intList[1], intList[2])
180
180
181 self.__formated_value = time
181 self.__formated_value = time
182
182
183 return self.__formated_value
183 return self.__formated_value
184
184
185 if format == 'pairslist':
185 if format == 'pairslist':
186 '''
186 '''
187 Example:
187 Example:
188 value = (0,1),(1,2)
188 value = (0,1),(1,2)
189 '''
189 '''
190
190
191 new_value = ast.literal_eval(value)
191 new_value = ast.literal_eval(value)
192
192
193 if type(new_value) not in (tuple, list):
193 if type(new_value) not in (tuple, list):
194 raise ValueError('%s has to be a tuple or list of pairs' % value)
194 raise ValueError('%s has to be a tuple or list of pairs' % value)
195
195
196 if type(new_value[0]) not in (tuple, list):
196 if type(new_value[0]) not in (tuple, list):
197 if len(new_value) != 2:
197 if len(new_value) != 2:
198 raise ValueError('%s has to be a tuple or list of pairs' % value)
198 raise ValueError('%s has to be a tuple or list of pairs' % value)
199 new_value = [new_value]
199 new_value = [new_value]
200
200
201 for thisPair in new_value:
201 for thisPair in new_value:
202 if len(thisPair) != 2:
202 if len(thisPair) != 2:
203 raise ValueError('%s has to be a tuple or list of pairs' % value)
203 raise ValueError('%s has to be a tuple or list of pairs' % value)
204
204
205 self.__formated_value = new_value
205 self.__formated_value = new_value
206
206
207 return self.__formated_value
207 return self.__formated_value
208
208
209 if format == 'multilist':
209 if format == 'multilist':
210 '''
210 '''
211 Example:
211 Example:
212 value = (0,1,2),(3,4,5)
212 value = (0,1,2),(3,4,5)
213 '''
213 '''
214 multiList = ast.literal_eval(value)
214 multiList = ast.literal_eval(value)
215
215
216 if type(multiList[0]) == int:
216 if type(multiList[0]) == int:
217 multiList = ast.literal_eval('(' + value + ')')
217 multiList = ast.literal_eval('(' + value + ')')
218
218
219 self.__formated_value = multiList
219 self.__formated_value = multiList
220
220
221 return self.__formated_value
221 return self.__formated_value
222
222
223 if format == 'bool':
223 if format == 'bool':
224 value = int(value)
224 value = int(value)
225
225
226 if format == 'int':
226 if format == 'int':
227 value = float(value)
227 value = float(value)
228
228
229 format_func = eval(format)
229 format_func = eval(format)
230
230
231 self.__formated_value = format_func(value)
231 self.__formated_value = format_func(value)
232
232
233 return self.__formated_value
233 return self.__formated_value
234
234
235 def updateId(self, new_id):
235 def updateId(self, new_id):
236
236
237 self.id = str(new_id)
237 self.id = str(new_id)
238
238
239 def setup(self, id, name, value, format='str'):
239 def setup(self, id, name, value, format='str'):
240 self.id = str(id)
240 self.id = str(id)
241 self.name = name
241 self.name = name
242 if format == 'obj':
242 if format == 'obj':
243 self.value = value
243 self.value = value
244 else:
244 else:
245 self.value = str(value)
245 self.value = str(value)
246 self.format = str.lower(format)
246 self.format = str.lower(format)
247
247
248 self.getValue()
248 self.getValue()
249
249
250 return 1
250 return 1
251
251
252 def update(self, name, value, format='str'):
252 def update(self, name, value, format='str'):
253
253
254 self.name = name
254 self.name = name
255 self.value = str(value)
255 self.value = str(value)
256 self.format = format
256 self.format = format
257
257
258 def makeXml(self, opElement):
258 def makeXml(self, opElement):
259 if self.name not in ('queue',):
259 if self.name not in ('queue',):
260 parmElement = SubElement(opElement, self.ELEMENTNAME)
260 parmElement = SubElement(opElement, self.ELEMENTNAME)
261 parmElement.set('id', str(self.id))
261 parmElement.set('id', str(self.id))
262 parmElement.set('name', self.name)
262 parmElement.set('name', self.name)
263 parmElement.set('value', self.value)
263 parmElement.set('value', self.value)
264 parmElement.set('format', self.format)
264 parmElement.set('format', self.format)
265
265
266 def readXml(self, parmElement):
266 def readXml(self, parmElement):
267
267
268 self.id = parmElement.get('id')
268 self.id = parmElement.get('id')
269 self.name = parmElement.get('name')
269 self.name = parmElement.get('name')
270 self.value = parmElement.get('value')
270 self.value = parmElement.get('value')
271 self.format = str.lower(parmElement.get('format'))
271 self.format = str.lower(parmElement.get('format'))
272
272
273 # Compatible with old signal chain version
273 # Compatible with old signal chain version
274 if self.format == 'int' and self.name == 'idfigure':
274 if self.format == 'int' and self.name == 'idfigure':
275 self.name = 'id'
275 self.name = 'id'
276
276
277 def printattr(self):
277 def printattr(self):
278
278
279 print('Parameter[%s]: name = %s, value = %s, format = %s, project_id = %s' % (self.id, self.name, self.value, self.format, self.project_id))
279 print('Parameter[%s]: name = %s, value = %s, format = %s, project_id = %s' % (self.id, self.name, self.value, self.format, self.project_id))
280
280
281 class OperationConf():
281 class OperationConf():
282
282
283 ELEMENTNAME = 'Operation'
283 ELEMENTNAME = 'Operation'
284
284
285 def __init__(self):
285 def __init__(self):
286
286
287 self.id = '0'
287 self.id = '0'
288 self.name = None
288 self.name = None
289 self.priority = None
289 self.priority = None
290 self.topic = None
290 self.topic = None
291
291
292 def __getNewId(self):
292 def __getNewId(self):
293
293
294 return int(self.id) * 10 + len(self.parmConfObjList) + 1
294 return int(self.id) * 10 + len(self.parmConfObjList) + 1
295
295
296 def getId(self):
296 def getId(self):
297 return self.id
297 return self.id
298
298
299 def updateId(self, new_id):
299 def updateId(self, new_id):
300
300
301 self.id = str(new_id)
301 self.id = str(new_id)
302
302
303 n = 1
303 n = 1
304 for parmObj in self.parmConfObjList:
304 for parmObj in self.parmConfObjList:
305
305
306 idParm = str(int(new_id) * 10 + n)
306 idParm = str(int(new_id) * 10 + n)
307 parmObj.updateId(idParm)
307 parmObj.updateId(idParm)
308
308
309 n += 1
309 n += 1
310
310
311 def getElementName(self):
311 def getElementName(self):
312
312
313 return self.ELEMENTNAME
313 return self.ELEMENTNAME
314
314
315 def getParameterObjList(self):
315 def getParameterObjList(self):
316
316
317 return self.parmConfObjList
317 return self.parmConfObjList
318
318
319 def getParameterObj(self, parameterName):
319 def getParameterObj(self, parameterName):
320
320
321 for parmConfObj in self.parmConfObjList:
321 for parmConfObj in self.parmConfObjList:
322
322
323 if parmConfObj.name != parameterName:
323 if parmConfObj.name != parameterName:
324 continue
324 continue
325
325
326 return parmConfObj
326 return parmConfObj
327
327
328 return None
328 return None
329
329
330 def getParameterObjfromValue(self, parameterValue):
330 def getParameterObjfromValue(self, parameterValue):
331
331
332 for parmConfObj in self.parmConfObjList:
332 for parmConfObj in self.parmConfObjList:
333
333
334 if parmConfObj.getValue() != parameterValue:
334 if parmConfObj.getValue() != parameterValue:
335 continue
335 continue
336
336
337 return parmConfObj.getValue()
337 return parmConfObj.getValue()
338
338
339 return None
339 return None
340
340
341 def getParameterValue(self, parameterName):
341 def getParameterValue(self, parameterName):
342
342
343 parameterObj = self.getParameterObj(parameterName)
343 parameterObj = self.getParameterObj(parameterName)
344
344
345 # if not parameterObj:
345 # if not parameterObj:
346 # return None
346 # return None
347
347
348 value = parameterObj.getValue()
348 value = parameterObj.getValue()
349
349
350 return value
350 return value
351
351
352 def getKwargs(self):
352 def getKwargs(self):
353
353
354 kwargs = {}
354 kwargs = {}
355
355
356 for parmConfObj in self.parmConfObjList:
356 for parmConfObj in self.parmConfObjList:
357 if self.name == 'run' and parmConfObj.name == 'datatype':
357 if self.name == 'run' and parmConfObj.name == 'datatype':
358 continue
358 continue
359
359
360 kwargs[parmConfObj.name] = parmConfObj.getValue()
360 kwargs[parmConfObj.name] = parmConfObj.getValue()
361
361
362 return kwargs
362 return kwargs
363
363
364 def setup(self, id, name, priority, type, project_id, err_queue, lock):
364 def setup(self, id, name, priority, type, project_id, err_queue, lock):
365
365
366 self.id = str(id)
366 self.id = str(id)
367 self.project_id = project_id
367 self.project_id = project_id
368 self.name = name
368 self.name = name
369 self.type = type
369 self.type = type
370 self.priority = priority
370 self.priority = priority
371 self.err_queue = err_queue
371 self.err_queue = err_queue
372 self.lock = lock
372 self.lock = lock
373 self.parmConfObjList = []
373 self.parmConfObjList = []
374
374
375 def removeParameters(self):
375 def removeParameters(self):
376
376
377 for obj in self.parmConfObjList:
377 for obj in self.parmConfObjList:
378 del obj
378 del obj
379
379
380 self.parmConfObjList = []
380 self.parmConfObjList = []
381
381
382 def addParameter(self, name, value, format='str'):
382 def addParameter(self, name, value, format='str'):
383
383
384 if value is None:
384 if value is None:
385 return None
385 return None
386 id = self.__getNewId()
386 id = self.__getNewId()
387
387
388 parmConfObj = ParameterConf()
388 parmConfObj = ParameterConf()
389 if not parmConfObj.setup(id, name, value, format):
389 if not parmConfObj.setup(id, name, value, format):
390 return None
390 return None
391
391
392 self.parmConfObjList.append(parmConfObj)
392 self.parmConfObjList.append(parmConfObj)
393
393
394 return parmConfObj
394 return parmConfObj
395
395
396 def changeParameter(self, name, value, format='str'):
396 def changeParameter(self, name, value, format='str'):
397
397
398 parmConfObj = self.getParameterObj(name)
398 parmConfObj = self.getParameterObj(name)
399 parmConfObj.update(name, value, format)
399 parmConfObj.update(name, value, format)
400
400
401 return parmConfObj
401 return parmConfObj
402
402
403 def makeXml(self, procUnitElement):
403 def makeXml(self, procUnitElement):
404
404
405 opElement = SubElement(procUnitElement, self.ELEMENTNAME)
405 opElement = SubElement(procUnitElement, self.ELEMENTNAME)
406 opElement.set('id', str(self.id))
406 opElement.set('id', str(self.id))
407 opElement.set('name', self.name)
407 opElement.set('name', self.name)
408 opElement.set('type', self.type)
408 opElement.set('type', self.type)
409 opElement.set('priority', str(self.priority))
409 opElement.set('priority', str(self.priority))
410
410
411 for parmConfObj in self.parmConfObjList:
411 for parmConfObj in self.parmConfObjList:
412 parmConfObj.makeXml(opElement)
412 parmConfObj.makeXml(opElement)
413
413
414 def readXml(self, opElement, project_id):
414 def readXml(self, opElement, project_id):
415
415
416 self.id = opElement.get('id')
416 self.id = opElement.get('id')
417 self.name = opElement.get('name')
417 self.name = opElement.get('name')
418 self.type = opElement.get('type')
418 self.type = opElement.get('type')
419 self.priority = opElement.get('priority')
419 self.priority = opElement.get('priority')
420 self.project_id = str(project_id)
420 self.project_id = str(project_id)
421
421
422 # Compatible with old signal chain version
422 # Compatible with old signal chain version
423 # Use of 'run' method instead 'init'
423 # Use of 'run' method instead 'init'
424 if self.type == 'self' and self.name == 'init':
424 if self.type == 'self' and self.name == 'init':
425 self.name = 'run'
425 self.name = 'run'
426
426
427 self.parmConfObjList = []
427 self.parmConfObjList = []
428
428
429 parmElementList = opElement.iter(ParameterConf().getElementName())
429 parmElementList = opElement.iter(ParameterConf().getElementName())
430
430
431 for parmElement in parmElementList:
431 for parmElement in parmElementList:
432 parmConfObj = ParameterConf()
432 parmConfObj = ParameterConf()
433 parmConfObj.readXml(parmElement)
433 parmConfObj.readXml(parmElement)
434
434
435 # Compatible with old signal chain version
435 # Compatible with old signal chain version
436 # If an 'plot' OPERATION is found, changes name operation by the value of its type PARAMETER
436 # If an 'plot' OPERATION is found, changes name operation by the value of its type PARAMETER
437 if self.type != 'self' and self.name == 'Plot':
437 if self.type != 'self' and self.name == 'Plot':
438 if parmConfObj.format == 'str' and parmConfObj.name == 'type':
438 if parmConfObj.format == 'str' and parmConfObj.name == 'type':
439 self.name = parmConfObj.value
439 self.name = parmConfObj.value
440 continue
440 continue
441
441
442 self.parmConfObjList.append(parmConfObj)
442 self.parmConfObjList.append(parmConfObj)
443
443
444 def printattr(self):
444 def printattr(self):
445
445
446 print('%s[%s]: name = %s, type = %s, priority = %s, project_id = %s' % (self.ELEMENTNAME,
446 print('%s[%s]: name = %s, type = %s, priority = %s, project_id = %s' % (self.ELEMENTNAME,
447 self.id,
447 self.id,
448 self.name,
448 self.name,
449 self.type,
449 self.type,
450 self.priority,
450 self.priority,
451 self.project_id))
451 self.project_id))
452
452
453 for parmConfObj in self.parmConfObjList:
453 for parmConfObj in self.parmConfObjList:
454 parmConfObj.printattr()
454 parmConfObj.printattr()
455
455
456 def createObject(self):
456 def createObject(self):
457
457
458 className = eval(self.name)
458 className = eval(self.name)
459
459
460 if self.type == 'other':
460 if self.type == 'other':
461 opObj = className()
461 opObj = className()
462 elif self.type == 'external':
462 elif self.type == 'external':
463 kwargs = self.getKwargs()
463 kwargs = self.getKwargs()
464 opObj = className(self.id, self.id, self.project_id, self.err_queue, self.lock, 'Operation', **kwargs)
464 opObj = className(self.id, self.id, self.project_id, self.err_queue, self.lock, 'Operation', **kwargs)
465 opObj.start()
465 opObj.start()
466 self.opObj = opObj
466 self.opObj = opObj
467
467
468 return opObj
468 return opObj
469
469
470 class ProcUnitConf():
470 class ProcUnitConf():
471
471
472 ELEMENTNAME = 'ProcUnit'
472 ELEMENTNAME = 'ProcUnit'
473
473
474 def __init__(self):
474 def __init__(self):
475
475
476 self.id = None
476 self.id = None
477 self.datatype = None
477 self.datatype = None
478 self.name = None
478 self.name = None
479 self.inputId = None
479 self.inputId = None
480 self.opConfObjList = []
480 self.opConfObjList = []
481 self.procUnitObj = None
481 self.procUnitObj = None
482 self.opObjDict = {}
482 self.opObjDict = {}
483 self.mylock = Event()
484
483
485 def __getPriority(self):
484 def __getPriority(self):
486
485
487 return len(self.opConfObjList) + 1
486 return len(self.opConfObjList) + 1
488
487
489 def __getNewId(self):
488 def __getNewId(self):
490
489
491 return int(self.id) * 10 + len(self.opConfObjList) + 1
490 return int(self.id) * 10 + len(self.opConfObjList) + 1
492
491
493 def getElementName(self):
492 def getElementName(self):
494
493
495 return self.ELEMENTNAME
494 return self.ELEMENTNAME
496
495
497 def getId(self):
496 def getId(self):
498
497
499 return self.id
498 return self.id
500
499
501 def updateId(self, new_id):
500 def updateId(self, new_id):
502 '''
501 '''
503 new_id = int(parentId) * 10 + (int(self.id) % 10)
502 new_id = int(parentId) * 10 + (int(self.id) % 10)
504 new_inputId = int(parentId) * 10 + (int(self.inputId) % 10)
503 new_inputId = int(parentId) * 10 + (int(self.inputId) % 10)
505
504
506 # If this proc unit has not inputs
505 # If this proc unit has not inputs
507 #if self.inputId == '0':
506 #if self.inputId == '0':
508 #new_inputId = 0
507 #new_inputId = 0
509
508
510 n = 1
509 n = 1
511 for opConfObj in self.opConfObjList:
510 for opConfObj in self.opConfObjList:
512
511
513 idOp = str(int(new_id) * 10 + n)
512 idOp = str(int(new_id) * 10 + n)
514 opConfObj.updateId(idOp)
513 opConfObj.updateId(idOp)
515
514
516 n += 1
515 n += 1
517
516
518 self.parentId = str(parentId)
517 self.parentId = str(parentId)
519 self.id = str(new_id)
518 self.id = str(new_id)
520 #self.inputId = str(new_inputId)
519 #self.inputId = str(new_inputId)
521 '''
520 '''
522 n = 1
521 n = 1
523
522
524 def getInputId(self):
523 def getInputId(self):
525
524
526 return self.inputId
525 return self.inputId
527
526
528 def getOperationObjList(self):
527 def getOperationObjList(self):
529
528
530 return self.opConfObjList
529 return self.opConfObjList
531
530
532 def getOperationObj(self, name=None):
531 def getOperationObj(self, name=None):
533
532
534 for opConfObj in self.opConfObjList:
533 for opConfObj in self.opConfObjList:
535
534
536 if opConfObj.name != name:
535 if opConfObj.name != name:
537 continue
536 continue
538
537
539 return opConfObj
538 return opConfObj
540
539
541 return None
540 return None
542
541
543 def getOpObjfromParamValue(self, value=None):
542 def getOpObjfromParamValue(self, value=None):
544
543
545 for opConfObj in self.opConfObjList:
544 for opConfObj in self.opConfObjList:
546 if opConfObj.getParameterObjfromValue(parameterValue=value) != value:
545 if opConfObj.getParameterObjfromValue(parameterValue=value) != value:
547 continue
546 continue
548 return opConfObj
547 return opConfObj
549 return None
548 return None
550
549
551 def getProcUnitObj(self):
550 def getProcUnitObj(self):
552
551
553 return self.procUnitObj
552 return self.procUnitObj
554
553
555 def setup(self, project_id, id, name, datatype, inputId, err_queue, lock):
554 def setup(self, project_id, id, name, datatype, inputId, err_queue, lock):
556 '''
555 '''
557 id sera el topico a publicar
556 id sera el topico a publicar
558 inputId sera el topico a subscribirse
557 inputId sera el topico a subscribirse
559 '''
558 '''
560
559
561 # Compatible with old signal chain version
560 # Compatible with old signal chain version
562 if datatype == None and name == None:
561 if datatype == None and name == None:
563 raise ValueError('datatype or name should be defined')
562 raise ValueError('datatype or name should be defined')
564
563
565 #Definir una condicion para inputId cuando sea 0
564 #Definir una condicion para inputId cuando sea 0
566
565
567 if name == None:
566 if name == None:
568 if 'Proc' in datatype:
567 if 'Proc' in datatype:
569 name = datatype
568 name = datatype
570 else:
569 else:
571 name = '%sProc' % (datatype)
570 name = '%sProc' % (datatype)
572
571
573 if datatype == None:
572 if datatype == None:
574 datatype = name.replace('Proc', '')
573 datatype = name.replace('Proc', '')
575
574
576 self.id = str(id)
575 self.id = str(id)
577 self.project_id = project_id
576 self.project_id = project_id
578 self.name = name
577 self.name = name
579 self.datatype = datatype
578 self.datatype = datatype
580 self.inputId = inputId
579 self.inputId = inputId
581 self.err_queue = err_queue
580 self.err_queue = err_queue
582 self.lock = lock
581 self.lock = lock
583 self.opConfObjList = []
582 self.opConfObjList = []
584
583
585 self.addOperation(name='run', optype='self')
584 self.addOperation(name='run', optype='self')
586
585
587 def removeOperations(self):
586 def removeOperations(self):
588
587
589 for obj in self.opConfObjList:
588 for obj in self.opConfObjList:
590 del obj
589 del obj
591
590
592 self.opConfObjList = []
591 self.opConfObjList = []
593 self.addOperation(name='run')
592 self.addOperation(name='run')
594
593
595 def addParameter(self, **kwargs):
594 def addParameter(self, **kwargs):
596 '''
595 '''
597 Add parameters to 'run' operation
596 Add parameters to 'run' operation
598 '''
597 '''
599 opObj = self.opConfObjList[0]
598 opObj = self.opConfObjList[0]
600
599
601 opObj.addParameter(**kwargs)
600 opObj.addParameter(**kwargs)
602
601
603 return opObj
602 return opObj
604
603
605 def addOperation(self, name, optype='self'):
604 def addOperation(self, name, optype='self'):
606 '''
605 '''
607 Actualizacion - > proceso comunicacion
606 Actualizacion - > proceso comunicacion
608 En el caso de optype='self', elminar. DEfinir comuncacion IPC -> Topic
607 En el caso de optype='self', elminar. DEfinir comuncacion IPC -> Topic
609 definir el tipoc de socket o comunicacion ipc++
608 definir el tipoc de socket o comunicacion ipc++
610
609
611 '''
610 '''
612
611
613 id = self.__getNewId()
612 id = self.__getNewId()
614 priority = self.__getPriority() # Sin mucho sentido, pero puede usarse
613 priority = self.__getPriority() # Sin mucho sentido, pero puede usarse
615 opConfObj = OperationConf()
614 opConfObj = OperationConf()
616 opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id, err_queue=self.err_queue, lock=self.mylock)
615 opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id, err_queue=self.err_queue, lock=self.lock)
617 self.opConfObjList.append(opConfObj)
616 self.opConfObjList.append(opConfObj)
618
617
619 return opConfObj
618 return opConfObj
620
619
621 def makeXml(self, projectElement):
620 def makeXml(self, projectElement):
622
621
623 procUnitElement = SubElement(projectElement, self.ELEMENTNAME)
622 procUnitElement = SubElement(projectElement, self.ELEMENTNAME)
624 procUnitElement.set('id', str(self.id))
623 procUnitElement.set('id', str(self.id))
625 procUnitElement.set('name', self.name)
624 procUnitElement.set('name', self.name)
626 procUnitElement.set('datatype', self.datatype)
625 procUnitElement.set('datatype', self.datatype)
627 procUnitElement.set('inputId', str(self.inputId))
626 procUnitElement.set('inputId', str(self.inputId))
628
627
629 for opConfObj in self.opConfObjList:
628 for opConfObj in self.opConfObjList:
630 opConfObj.makeXml(procUnitElement)
629 opConfObj.makeXml(procUnitElement)
631
630
632 def readXml(self, upElement, project_id):
631 def readXml(self, upElement, project_id):
633
632
634 self.id = upElement.get('id')
633 self.id = upElement.get('id')
635 self.name = upElement.get('name')
634 self.name = upElement.get('name')
636 self.datatype = upElement.get('datatype')
635 self.datatype = upElement.get('datatype')
637 self.inputId = upElement.get('inputId')
636 self.inputId = upElement.get('inputId')
638 self.project_id = str(project_id)
637 self.project_id = str(project_id)
639
638
640 if self.ELEMENTNAME == 'ReadUnit':
639 if self.ELEMENTNAME == 'ReadUnit':
641 self.datatype = self.datatype.replace('Reader', '')
640 self.datatype = self.datatype.replace('Reader', '')
642
641
643 if self.ELEMENTNAME == 'ProcUnit':
642 if self.ELEMENTNAME == 'ProcUnit':
644 self.datatype = self.datatype.replace('Proc', '')
643 self.datatype = self.datatype.replace('Proc', '')
645
644
646 if self.inputId == 'None':
645 if self.inputId == 'None':
647 self.inputId = '0'
646 self.inputId = '0'
648
647
649 self.opConfObjList = []
648 self.opConfObjList = []
650
649
651 opElementList = upElement.iter(OperationConf().getElementName())
650 opElementList = upElement.iter(OperationConf().getElementName())
652
651
653 for opElement in opElementList:
652 for opElement in opElementList:
654 opConfObj = OperationConf()
653 opConfObj = OperationConf()
655 opConfObj.readXml(opElement, project_id)
654 opConfObj.readXml(opElement, project_id)
656 self.opConfObjList.append(opConfObj)
655 self.opConfObjList.append(opConfObj)
657
656
658 def printattr(self):
657 def printattr(self):
659
658
660 print('%s[%s]: name = %s, datatype = %s, inputId = %s, project_id = %s' % (self.ELEMENTNAME,
659 print('%s[%s]: name = %s, datatype = %s, inputId = %s, project_id = %s' % (self.ELEMENTNAME,
661 self.id,
660 self.id,
662 self.name,
661 self.name,
663 self.datatype,
662 self.datatype,
664 self.inputId,
663 self.inputId,
665 self.project_id))
664 self.project_id))
666
665
667 for opConfObj in self.opConfObjList:
666 for opConfObj in self.opConfObjList:
668 opConfObj.printattr()
667 opConfObj.printattr()
669
668
670 def getKwargs(self):
669 def getKwargs(self):
671
670
672 opObj = self.opConfObjList[0]
671 opObj = self.opConfObjList[0]
673 kwargs = opObj.getKwargs()
672 kwargs = opObj.getKwargs()
674
673
675 return kwargs
674 return kwargs
676
675
677 def createObjects(self):
676 def createObjects(self):
678 '''
677 '''
679 Instancia de unidades de procesamiento.
678 Instancia de unidades de procesamiento.
680 '''
679 '''
681
680
682 className = eval(self.name)
681 className = eval(self.name)
683 kwargs = self.getKwargs()
682 kwargs = self.getKwargs()
684 procUnitObj = className(self.id, self.inputId, self.project_id, self.err_queue, self.lock, 'ProcUnit', **kwargs)
683 procUnitObj = className(self.id, self.inputId, self.project_id, self.err_queue, self.lock, 'ProcUnit', **kwargs)
685 log.success('creating process...', self.name)
684 log.success('creating process...', self.name)
686
685
687 for opConfObj in self.opConfObjList:
686 for opConfObj in self.opConfObjList:
688
687
689 if opConfObj.type == 'self' and opConfObj.name == 'run':
688 if opConfObj.type == 'self' and opConfObj.name == 'run':
690 continue
689 continue
691 elif opConfObj.type == 'self':
690 elif opConfObj.type == 'self':
692 opObj = getattr(procUnitObj, opConfObj.name)
691 opObj = getattr(procUnitObj, opConfObj.name)
693 else:
692 else:
694 opObj = opConfObj.createObject()
693 opObj = opConfObj.createObject()
695
694
696 log.success('adding operation: {}, type:{}'.format(
695 log.success('adding operation: {}, type:{}'.format(
697 opConfObj.name,
696 opConfObj.name,
698 opConfObj.type), self.name)
697 opConfObj.type), self.name)
699
698
700 procUnitObj.addOperation(opConfObj, opObj)
699 procUnitObj.addOperation(opConfObj, opObj)
701
700
702 procUnitObj.start()
701 procUnitObj.start()
703 self.procUnitObj = procUnitObj
702 self.procUnitObj = procUnitObj
704
703
705 def close(self):
704 def close(self):
706
705
707 for opConfObj in self.opConfObjList:
706 for opConfObj in self.opConfObjList:
708 if opConfObj.type == 'self':
707 if opConfObj.type == 'self':
709 continue
708 continue
710
709
711 opObj = self.procUnitObj.getOperationObj(opConfObj.id)
710 opObj = self.procUnitObj.getOperationObj(opConfObj.id)
712 opObj.close()
711 opObj.close()
713
712
714 self.procUnitObj.close()
713 self.procUnitObj.close()
715
714
716 return
715 return
717
716
718
717
719 class ReadUnitConf(ProcUnitConf):
718 class ReadUnitConf(ProcUnitConf):
720
719
721 ELEMENTNAME = 'ReadUnit'
720 ELEMENTNAME = 'ReadUnit'
722
721
723 def __init__(self):
722 def __init__(self):
724
723
725 self.id = None
724 self.id = None
726 self.datatype = None
725 self.datatype = None
727 self.name = None
726 self.name = None
728 self.inputId = None
727 self.inputId = None
729 self.opConfObjList = []
728 self.opConfObjList = []
730 self.mylock = Event()
729 self.lock = Event()
730 self.lock.set()
731 self.lock.n = Value('d', 0)
731
732
732 def getElementName(self):
733 def getElementName(self):
733
734
734 return self.ELEMENTNAME
735 return self.ELEMENTNAME
735
736
736 def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='',
737 def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='',
737 startTime='', endTime='', server=None, **kwargs):
738 startTime='', endTime='', server=None, **kwargs):
738
739
739
740
740 '''
741 '''
741 *****el id del proceso sera el Topico
742 *****el id del proceso sera el Topico
742
743
743 Adicion de {topic}, si no esta presente -> error
744 Adicion de {topic}, si no esta presente -> error
744 kwargs deben ser trasmitidos en la instanciacion
745 kwargs deben ser trasmitidos en la instanciacion
745
746
746 '''
747 '''
747
748
748 # Compatible with old signal chain version
749 # Compatible with old signal chain version
749 if datatype == None and name == None:
750 if datatype == None and name == None:
750 raise ValueError('datatype or name should be defined')
751 raise ValueError('datatype or name should be defined')
751 if name == None:
752 if name == None:
752 if 'Reader' in datatype:
753 if 'Reader' in datatype:
753 name = datatype
754 name = datatype
754 datatype = name.replace('Reader','')
755 datatype = name.replace('Reader','')
755 else:
756 else:
756 name = '{}Reader'.format(datatype)
757 name = '{}Reader'.format(datatype)
757 if datatype == None:
758 if datatype == None:
758 if 'Reader' in name:
759 if 'Reader' in name:
759 datatype = name.replace('Reader','')
760 datatype = name.replace('Reader','')
760 else:
761 else:
761 datatype = name
762 datatype = name
762 name = '{}Reader'.format(name)
763 name = '{}Reader'.format(name)
763
764
764 self.id = id
765 self.id = id
765 self.project_id = project_id
766 self.project_id = project_id
766 self.name = name
767 self.name = name
767 self.datatype = datatype
768 self.datatype = datatype
768 if path != '':
769 if path != '':
769 self.path = os.path.abspath(path)
770 self.path = os.path.abspath(path)
770 self.startDate = startDate
771 self.startDate = startDate
771 self.endDate = endDate
772 self.endDate = endDate
772 self.startTime = startTime
773 self.startTime = startTime
773 self.endTime = endTime
774 self.endTime = endTime
774 self.server = server
775 self.server = server
775 self.err_queue = err_queue
776 self.err_queue = err_queue
776 self.lock = self.mylock
777 self.addRunOperation(**kwargs)
777 self.addRunOperation(**kwargs)
778
778
779 def update(self, **kwargs):
779 def update(self, **kwargs):
780
780
781 if 'datatype' in kwargs:
781 if 'datatype' in kwargs:
782 datatype = kwargs.pop('datatype')
782 datatype = kwargs.pop('datatype')
783 if 'Reader' in datatype:
783 if 'Reader' in datatype:
784 self.name = datatype
784 self.name = datatype
785 else:
785 else:
786 self.name = '%sReader' % (datatype)
786 self.name = '%sReader' % (datatype)
787 self.datatype = self.name.replace('Reader', '')
787 self.datatype = self.name.replace('Reader', '')
788
788
789 attrs = ('path', 'startDate', 'endDate',
789 attrs = ('path', 'startDate', 'endDate',
790 'startTime', 'endTime')
790 'startTime', 'endTime')
791
791
792 for attr in attrs:
792 for attr in attrs:
793 if attr in kwargs:
793 if attr in kwargs:
794 setattr(self, attr, kwargs.pop(attr))
794 setattr(self, attr, kwargs.pop(attr))
795
795
796 self.updateRunOperation(**kwargs)
796 self.updateRunOperation(**kwargs)
797
797
798 def removeOperations(self):
798 def removeOperations(self):
799
799
800 for obj in self.opConfObjList:
800 for obj in self.opConfObjList:
801 del obj
801 del obj
802
802
803 self.opConfObjList = []
803 self.opConfObjList = []
804
804
805 def addRunOperation(self, **kwargs):
805 def addRunOperation(self, **kwargs):
806
806
807 opObj = self.addOperation(name='run', optype='self')
807 opObj = self.addOperation(name='run', optype='self')
808
808
809 if self.server is None:
809 if self.server is None:
810 opObj.addParameter(
810 opObj.addParameter(
811 name='datatype', value=self.datatype, format='str')
811 name='datatype', value=self.datatype, format='str')
812 opObj.addParameter(name='path', value=self.path, format='str')
812 opObj.addParameter(name='path', value=self.path, format='str')
813 opObj.addParameter(
813 opObj.addParameter(
814 name='startDate', value=self.startDate, format='date')
814 name='startDate', value=self.startDate, format='date')
815 opObj.addParameter(
815 opObj.addParameter(
816 name='endDate', value=self.endDate, format='date')
816 name='endDate', value=self.endDate, format='date')
817 opObj.addParameter(
817 opObj.addParameter(
818 name='startTime', value=self.startTime, format='time')
818 name='startTime', value=self.startTime, format='time')
819 opObj.addParameter(
819 opObj.addParameter(
820 name='endTime', value=self.endTime, format='time')
820 name='endTime', value=self.endTime, format='time')
821
821
822 for key, value in list(kwargs.items()):
822 for key, value in list(kwargs.items()):
823 opObj.addParameter(name=key, value=value,
823 opObj.addParameter(name=key, value=value,
824 format=type(value).__name__)
824 format=type(value).__name__)
825 else:
825 else:
826 opObj.addParameter(name='server', value=self.server, format='str')
826 opObj.addParameter(name='server', value=self.server, format='str')
827
827
828 return opObj
828 return opObj
829
829
830 def updateRunOperation(self, **kwargs):
830 def updateRunOperation(self, **kwargs):
831
831
832 opObj = self.getOperationObj(name='run')
832 opObj = self.getOperationObj(name='run')
833 opObj.removeParameters()
833 opObj.removeParameters()
834
834
835 opObj.addParameter(name='datatype', value=self.datatype, format='str')
835 opObj.addParameter(name='datatype', value=self.datatype, format='str')
836 opObj.addParameter(name='path', value=self.path, format='str')
836 opObj.addParameter(name='path', value=self.path, format='str')
837 opObj.addParameter(
837 opObj.addParameter(
838 name='startDate', value=self.startDate, format='date')
838 name='startDate', value=self.startDate, format='date')
839 opObj.addParameter(name='endDate', value=self.endDate, format='date')
839 opObj.addParameter(name='endDate', value=self.endDate, format='date')
840 opObj.addParameter(
840 opObj.addParameter(
841 name='startTime', value=self.startTime, format='time')
841 name='startTime', value=self.startTime, format='time')
842 opObj.addParameter(name='endTime', value=self.endTime, format='time')
842 opObj.addParameter(name='endTime', value=self.endTime, format='time')
843
843
844 for key, value in list(kwargs.items()):
844 for key, value in list(kwargs.items()):
845 opObj.addParameter(name=key, value=value,
845 opObj.addParameter(name=key, value=value,
846 format=type(value).__name__)
846 format=type(value).__name__)
847
847
848 return opObj
848 return opObj
849
849
850 def readXml(self, upElement, project_id):
850 def readXml(self, upElement, project_id):
851
851
852 self.id = upElement.get('id')
852 self.id = upElement.get('id')
853 self.name = upElement.get('name')
853 self.name = upElement.get('name')
854 self.datatype = upElement.get('datatype')
854 self.datatype = upElement.get('datatype')
855 self.project_id = str(project_id) #yong
855 self.project_id = str(project_id) #yong
856
856
857 if self.ELEMENTNAME == 'ReadUnit':
857 if self.ELEMENTNAME == 'ReadUnit':
858 self.datatype = self.datatype.replace('Reader', '')
858 self.datatype = self.datatype.replace('Reader', '')
859
859
860 self.opConfObjList = []
860 self.opConfObjList = []
861
861
862 opElementList = upElement.iter(OperationConf().getElementName())
862 opElementList = upElement.iter(OperationConf().getElementName())
863
863
864 for opElement in opElementList:
864 for opElement in opElementList:
865 opConfObj = OperationConf()
865 opConfObj = OperationConf()
866 opConfObj.readXml(opElement, project_id)
866 opConfObj.readXml(opElement, project_id)
867 self.opConfObjList.append(opConfObj)
867 self.opConfObjList.append(opConfObj)
868
868
869 if opConfObj.name == 'run':
869 if opConfObj.name == 'run':
870 self.path = opConfObj.getParameterValue('path')
870 self.path = opConfObj.getParameterValue('path')
871 self.startDate = opConfObj.getParameterValue('startDate')
871 self.startDate = opConfObj.getParameterValue('startDate')
872 self.endDate = opConfObj.getParameterValue('endDate')
872 self.endDate = opConfObj.getParameterValue('endDate')
873 self.startTime = opConfObj.getParameterValue('startTime')
873 self.startTime = opConfObj.getParameterValue('startTime')
874 self.endTime = opConfObj.getParameterValue('endTime')
874 self.endTime = opConfObj.getParameterValue('endTime')
875
875
876
876
877 class Project(Process):
877 class Project(Process):
878
878
879 ELEMENTNAME = 'Project'
879 ELEMENTNAME = 'Project'
880
880
881 def __init__(self):
881 def __init__(self):
882
882
883 Process.__init__(self)
883 Process.__init__(self)
884 self.id = None
884 self.id = None
885 self.filename = None
885 self.filename = None
886 self.description = None
886 self.description = None
887 self.email = None
887 self.email = None
888 self.alarm = None
888 self.alarm = None
889 self.procUnitConfObjDict = {}
889 self.procUnitConfObjDict = {}
890 self.err_queue = Queue()
890 self.err_queue = Queue()
891
891
892 def __getNewId(self):
892 def __getNewId(self):
893
893
894 idList = list(self.procUnitConfObjDict.keys())
894 idList = list(self.procUnitConfObjDict.keys())
895 id = int(self.id) * 10
895 id = int(self.id) * 10
896
896
897 while True:
897 while True:
898 id += 1
898 id += 1
899
899
900 if str(id) in idList:
900 if str(id) in idList:
901 continue
901 continue
902
902
903 break
903 break
904
904
905 return str(id)
905 return str(id)
906
906
907 def getElementName(self):
907 def getElementName(self):
908
908
909 return self.ELEMENTNAME
909 return self.ELEMENTNAME
910
910
911 def getId(self):
911 def getId(self):
912
912
913 return self.id
913 return self.id
914
914
915 def updateId(self, new_id):
915 def updateId(self, new_id):
916
916
917 self.id = str(new_id)
917 self.id = str(new_id)
918
918
919 keyList = list(self.procUnitConfObjDict.keys())
919 keyList = list(self.procUnitConfObjDict.keys())
920 keyList.sort()
920 keyList.sort()
921
921
922 n = 1
922 n = 1
923 newProcUnitConfObjDict = {}
923 newProcUnitConfObjDict = {}
924
924
925 for procKey in keyList:
925 for procKey in keyList:
926
926
927 procUnitConfObj = self.procUnitConfObjDict[procKey]
927 procUnitConfObj = self.procUnitConfObjDict[procKey]
928 idProcUnit = str(int(self.id) * 10 + n)
928 idProcUnit = str(int(self.id) * 10 + n)
929 procUnitConfObj.updateId(idProcUnit)
929 procUnitConfObj.updateId(idProcUnit)
930 newProcUnitConfObjDict[idProcUnit] = procUnitConfObj
930 newProcUnitConfObjDict[idProcUnit] = procUnitConfObj
931 n += 1
931 n += 1
932
932
933 self.procUnitConfObjDict = newProcUnitConfObjDict
933 self.procUnitConfObjDict = newProcUnitConfObjDict
934
934
935 def setup(self, id=1, name='', description='', email=None, alarm=[]):
935 def setup(self, id=1, name='', description='', email=None, alarm=[]):
936
936
937 print(' ')
937 print(' ')
938 print('*' * 60)
938 print('*' * 60)
939 print('* Starting SIGNAL CHAIN PROCESSING (Multiprocessing) v%s *' % schainpy.__version__)
939 print('* Starting SIGNAL CHAIN PROCESSING (Multiprocessing) v%s *' % schainpy.__version__)
940 print('*' * 60)
940 print('*' * 60)
941 print("* Python " + python_version() + " *")
941 print("* Python " + python_version() + " *")
942 print('*' * 19)
942 print('*' * 19)
943 print(' ')
943 print(' ')
944 self.id = str(id)
944 self.id = str(id)
945 self.description = description
945 self.description = description
946 self.email = email
946 self.email = email
947 self.alarm = alarm
947 self.alarm = alarm
948 if name:
948 if name:
949 self.name = '{} ({})'.format(Process.__name__, name)
949 self.name = '{} ({})'.format(Process.__name__, name)
950
950
951 def update(self, **kwargs):
951 def update(self, **kwargs):
952
952
953 for key, value in list(kwargs.items()):
953 for key, value in list(kwargs.items()):
954 setattr(self, key, value)
954 setattr(self, key, value)
955
955
956 def clone(self):
956 def clone(self):
957
957
958 p = Project()
958 p = Project()
959 p.procUnitConfObjDict = self.procUnitConfObjDict
959 p.procUnitConfObjDict = self.procUnitConfObjDict
960 return p
960 return p
961
961
962 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
962 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
963
963
964 '''
964 '''
965 Actualizacion:
965 Actualizacion:
966 Se agrego un nuevo argumento: topic -relativo a la forma de comunicar los procesos simultaneos
966 Se agrego un nuevo argumento: topic -relativo a la forma de comunicar los procesos simultaneos
967
967
968 * El id del proceso sera el topico al que se deben subscribir los procUnits para recibir la informacion(data)
968 * El id del proceso sera el topico al que se deben subscribir los procUnits para recibir la informacion(data)
969
969
970 '''
970 '''
971
971
972 if id is None:
972 if id is None:
973 idReadUnit = self.__getNewId()
973 idReadUnit = self.__getNewId()
974 else:
974 else:
975 idReadUnit = str(id)
975 idReadUnit = str(id)
976
976
977 readUnitConfObj = ReadUnitConf()
977 readUnitConfObj = ReadUnitConf()
978 readUnitConfObj.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs)
978 readUnitConfObj.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs)
979 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
979 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
980
980
981 return readUnitConfObj
981 return readUnitConfObj
982
982
983 def addProcUnit(self, inputId='0', datatype=None, name=None):
983 def addProcUnit(self, inputId='0', datatype=None, name=None):
984
984
985 '''
985 '''
986 Actualizacion:
986 Actualizacion:
987 Se agrego dos nuevos argumentos: topic_read (lee data de otro procUnit) y topic_write(escribe o envia data a otro procUnit)
987 Se agrego dos nuevos argumentos: topic_read (lee data de otro procUnit) y topic_write(escribe o envia data a otro procUnit)
988 Deberia reemplazar a "inputId"
988 Deberia reemplazar a "inputId"
989
989
990 ** A fin de mantener el inputID, este sera la representaacion del topicoal que deben subscribirse. El ID propio de la intancia
990 ** A fin de mantener el inputID, este sera la representaacion del topicoal que deben subscribirse. El ID propio de la intancia
991 (proceso) sera el topico de la publicacion, todo sera asignado de manera dinamica.
991 (proceso) sera el topico de la publicacion, todo sera asignado de manera dinamica.
992
992
993 '''
993 '''
994
994
995 idProcUnit = self.__getNewId()
995 idProcUnit = self.__getNewId()
996 procUnitConfObj = ProcUnitConf()
996 procUnitConfObj = ProcUnitConf()
997 input_proc = self.procUnitConfObjDict[inputId]
997 input_proc = self.procUnitConfObjDict[inputId]
998 procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue, input_proc.mylock)
998 procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue, input_proc.lock)
999 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
999 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1000
1000
1001 return procUnitConfObj
1001 return procUnitConfObj
1002
1002
1003 def removeProcUnit(self, id):
1003 def removeProcUnit(self, id):
1004
1004
1005 if id in list(self.procUnitConfObjDict.keys()):
1005 if id in list(self.procUnitConfObjDict.keys()):
1006 self.procUnitConfObjDict.pop(id)
1006 self.procUnitConfObjDict.pop(id)
1007
1007
1008 def getReadUnitId(self):
1008 def getReadUnitId(self):
1009
1009
1010 readUnitConfObj = self.getReadUnitObj()
1010 readUnitConfObj = self.getReadUnitObj()
1011
1011
1012 return readUnitConfObj.id
1012 return readUnitConfObj.id
1013
1013
1014 def getReadUnitObj(self):
1014 def getReadUnitObj(self):
1015
1015
1016 for obj in list(self.procUnitConfObjDict.values()):
1016 for obj in list(self.procUnitConfObjDict.values()):
1017 if obj.getElementName() == 'ReadUnit':
1017 if obj.getElementName() == 'ReadUnit':
1018 return obj
1018 return obj
1019
1019
1020 return None
1020 return None
1021
1021
1022 def getProcUnitObj(self, id=None, name=None):
1022 def getProcUnitObj(self, id=None, name=None):
1023
1023
1024 if id != None:
1024 if id != None:
1025 return self.procUnitConfObjDict[id]
1025 return self.procUnitConfObjDict[id]
1026
1026
1027 if name != None:
1027 if name != None:
1028 return self.getProcUnitObjByName(name)
1028 return self.getProcUnitObjByName(name)
1029
1029
1030 return None
1030 return None
1031
1031
1032 def getProcUnitObjByName(self, name):
1032 def getProcUnitObjByName(self, name):
1033
1033
1034 for obj in list(self.procUnitConfObjDict.values()):
1034 for obj in list(self.procUnitConfObjDict.values()):
1035 if obj.name == name:
1035 if obj.name == name:
1036 return obj
1036 return obj
1037
1037
1038 return None
1038 return None
1039
1039
1040 def procUnitItems(self):
1040 def procUnitItems(self):
1041
1041
1042 return list(self.procUnitConfObjDict.items())
1042 return list(self.procUnitConfObjDict.items())
1043
1043
1044 def makeXml(self):
1044 def makeXml(self):
1045
1045
1046 projectElement = Element('Project')
1046 projectElement = Element('Project')
1047 projectElement.set('id', str(self.id))
1047 projectElement.set('id', str(self.id))
1048 projectElement.set('name', self.name)
1048 projectElement.set('name', self.name)
1049 projectElement.set('description', self.description)
1049 projectElement.set('description', self.description)
1050
1050
1051 for procUnitConfObj in list(self.procUnitConfObjDict.values()):
1051 for procUnitConfObj in list(self.procUnitConfObjDict.values()):
1052 procUnitConfObj.makeXml(projectElement)
1052 procUnitConfObj.makeXml(projectElement)
1053
1053
1054 self.projectElement = projectElement
1054 self.projectElement = projectElement
1055
1055
1056 def writeXml(self, filename=None):
1056 def writeXml(self, filename=None):
1057
1057
1058 if filename == None:
1058 if filename == None:
1059 if self.filename:
1059 if self.filename:
1060 filename = self.filename
1060 filename = self.filename
1061 else:
1061 else:
1062 filename = 'schain.xml'
1062 filename = 'schain.xml'
1063
1063
1064 if not filename:
1064 if not filename:
1065 print('filename has not been defined. Use setFilename(filename) for do it.')
1065 print('filename has not been defined. Use setFilename(filename) for do it.')
1066 return 0
1066 return 0
1067
1067
1068 abs_file = os.path.abspath(filename)
1068 abs_file = os.path.abspath(filename)
1069
1069
1070 if not os.access(os.path.dirname(abs_file), os.W_OK):
1070 if not os.access(os.path.dirname(abs_file), os.W_OK):
1071 print('No write permission on %s' % os.path.dirname(abs_file))
1071 print('No write permission on %s' % os.path.dirname(abs_file))
1072 return 0
1072 return 0
1073
1073
1074 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
1074 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
1075 print('File %s already exists and it could not be overwriten' % abs_file)
1075 print('File %s already exists and it could not be overwriten' % abs_file)
1076 return 0
1076 return 0
1077
1077
1078 self.makeXml()
1078 self.makeXml()
1079
1079
1080 ElementTree(self.projectElement).write(abs_file, method='xml')
1080 ElementTree(self.projectElement).write(abs_file, method='xml')
1081
1081
1082 self.filename = abs_file
1082 self.filename = abs_file
1083
1083
1084 return 1
1084 return 1
1085
1085
1086 def readXml(self, filename=None):
1086 def readXml(self, filename=None):
1087
1087
1088 if not filename:
1088 if not filename:
1089 print('filename is not defined')
1089 print('filename is not defined')
1090 return 0
1090 return 0
1091
1091
1092 abs_file = os.path.abspath(filename)
1092 abs_file = os.path.abspath(filename)
1093
1093
1094 if not os.path.isfile(abs_file):
1094 if not os.path.isfile(abs_file):
1095 print('%s file does not exist' % abs_file)
1095 print('%s file does not exist' % abs_file)
1096 return 0
1096 return 0
1097
1097
1098 self.projectElement = None
1098 self.projectElement = None
1099 self.procUnitConfObjDict = {}
1099 self.procUnitConfObjDict = {}
1100
1100
1101 try:
1101 try:
1102 self.projectElement = ElementTree().parse(abs_file)
1102 self.projectElement = ElementTree().parse(abs_file)
1103 except:
1103 except:
1104 print('Error reading %s, verify file format' % filename)
1104 print('Error reading %s, verify file format' % filename)
1105 return 0
1105 return 0
1106
1106
1107 self.project = self.projectElement.tag
1107 self.project = self.projectElement.tag
1108
1108
1109 self.id = self.projectElement.get('id')
1109 self.id = self.projectElement.get('id')
1110 self.name = self.projectElement.get('name')
1110 self.name = self.projectElement.get('name')
1111 self.description = self.projectElement.get('description')
1111 self.description = self.projectElement.get('description')
1112
1112
1113 readUnitElementList = self.projectElement.iter(
1113 readUnitElementList = self.projectElement.iter(
1114 ReadUnitConf().getElementName())
1114 ReadUnitConf().getElementName())
1115
1115
1116 for readUnitElement in readUnitElementList:
1116 for readUnitElement in readUnitElementList:
1117 readUnitConfObj = ReadUnitConf()
1117 readUnitConfObj = ReadUnitConf()
1118 readUnitConfObj.readXml(readUnitElement, self.id)
1118 readUnitConfObj.readXml(readUnitElement, self.id)
1119 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
1119 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
1120
1120
1121 procUnitElementList = self.projectElement.iter(
1121 procUnitElementList = self.projectElement.iter(
1122 ProcUnitConf().getElementName())
1122 ProcUnitConf().getElementName())
1123
1123
1124 for procUnitElement in procUnitElementList:
1124 for procUnitElement in procUnitElementList:
1125 procUnitConfObj = ProcUnitConf()
1125 procUnitConfObj = ProcUnitConf()
1126 procUnitConfObj.readXml(procUnitElement, self.id)
1126 procUnitConfObj.readXml(procUnitElement, self.id)
1127 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1127 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1128
1128
1129 self.filename = abs_file
1129 self.filename = abs_file
1130
1130
1131 return 1
1131 return 1
1132
1132
1133 def __str__(self):
1133 def __str__(self):
1134
1134
1135 print('Project: name = %s, description = %s, id = %s' % (
1135 print('Project: name = %s, description = %s, id = %s' % (
1136 self.name,
1136 self.name,
1137 self.description,
1137 self.description,
1138 self.id))
1138 self.id))
1139
1139
1140 for procUnitConfObj in self.procUnitConfObjDict.values():
1140 for procUnitConfObj in self.procUnitConfObjDict.values():
1141 print(procUnitConfObj)
1141 print(procUnitConfObj)
1142
1142
1143 def createObjects(self):
1143 def createObjects(self):
1144
1144
1145
1145
1146 keys = list(self.procUnitConfObjDict.keys())
1146 keys = list(self.procUnitConfObjDict.keys())
1147 keys.sort()
1147 keys.sort()
1148 for key in keys:
1148 for key in keys:
1149 self.procUnitConfObjDict[key].createObjects()
1149 self.procUnitConfObjDict[key].createObjects()
1150
1150
1151 def monitor(self):
1151 def monitor(self):
1152
1152
1153 t = Thread(target=self.__monitor, args=(self.err_queue, self.ctx))
1153 t = Thread(target=self.__monitor, args=(self.err_queue, self.ctx))
1154 t.start()
1154 t.start()
1155
1155
1156 def __monitor(self, queue, ctx):
1156 def __monitor(self, queue, ctx):
1157
1157
1158 import socket
1158 import socket
1159
1159
1160 procs = 0
1160 procs = 0
1161 err_msg = ''
1161 err_msg = ''
1162
1162
1163 while True:
1163 while True:
1164 msg = queue.get()
1164 msg = queue.get()
1165 if '#_start_#' in msg:
1165 if '#_start_#' in msg:
1166 procs += 1
1166 procs += 1
1167 elif '#_end_#' in msg:
1167 elif '#_end_#' in msg:
1168 procs -=1
1168 procs -=1
1169 else:
1169 else:
1170 err_msg = msg
1170 err_msg = msg
1171
1171
1172 if procs == 0 or 'Traceback' in err_msg:
1172 if procs == 0 or 'Traceback' in err_msg:
1173 break
1173 break
1174 time.sleep(0.1)
1174 time.sleep(0.1)
1175
1175
1176 if '|' in err_msg:
1176 if '|' in err_msg:
1177 name, err = err_msg.split('|')
1177 name, err = err_msg.split('|')
1178 if 'SchainWarning' in err:
1178 if 'SchainWarning' in err:
1179 log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), name)
1179 log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), name)
1180 elif 'SchainError' in err:
1180 elif 'SchainError' in err:
1181 log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), name)
1181 log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), name)
1182 else:
1182 else:
1183 log.error(err, name)
1183 log.error(err, name)
1184 else:
1184 else:
1185 name, err = self.name, err_msg
1185 name, err = self.name, err_msg
1186
1186
1187 time.sleep(2)
1187 time.sleep(2)
1188
1188
1189 for conf in self.procUnitConfObjDict.values():
1189 for conf in self.procUnitConfObjDict.values():
1190 for confop in conf.opConfObjList:
1190 for confop in conf.opConfObjList:
1191 if confop.type == 'external':
1191 if confop.type == 'external':
1192 confop.opObj.terminate()
1192 confop.opObj.terminate()
1193 conf.procUnitObj.terminate()
1193 conf.procUnitObj.terminate()
1194
1194
1195 ctx.term()
1195 ctx.term()
1196
1196
1197 message = ''.join(err)
1197 message = ''.join(err)
1198
1198
1199 if err_msg:
1199 if err_msg:
1200 subject = 'SChain v%s: Error running %s\n' % (
1200 subject = 'SChain v%s: Error running %s\n' % (
1201 schainpy.__version__, self.name)
1201 schainpy.__version__, self.name)
1202
1202
1203 subtitle = 'Hostname: %s\n' % socket.gethostbyname(
1203 subtitle = 'Hostname: %s\n' % socket.gethostbyname(
1204 socket.gethostname())
1204 socket.gethostname())
1205 subtitle += 'Working directory: %s\n' % os.path.abspath('./')
1205 subtitle += 'Working directory: %s\n' % os.path.abspath('./')
1206 subtitle += 'Configuration file: %s\n' % self.filename
1206 subtitle += 'Configuration file: %s\n' % self.filename
1207 subtitle += 'Time: %s\n' % str(datetime.datetime.now())
1207 subtitle += 'Time: %s\n' % str(datetime.datetime.now())
1208
1208
1209 readUnitConfObj = self.getReadUnitObj()
1209 readUnitConfObj = self.getReadUnitObj()
1210 if readUnitConfObj:
1210 if readUnitConfObj:
1211 subtitle += '\nInput parameters:\n'
1211 subtitle += '\nInput parameters:\n'
1212 subtitle += '[Data path = %s]\n' % readUnitConfObj.path
1212 subtitle += '[Data path = %s]\n' % readUnitConfObj.path
1213 subtitle += '[Data type = %s]\n' % readUnitConfObj.datatype
1213 subtitle += '[Data type = %s]\n' % readUnitConfObj.datatype
1214 subtitle += '[Start date = %s]\n' % readUnitConfObj.startDate
1214 subtitle += '[Start date = %s]\n' % readUnitConfObj.startDate
1215 subtitle += '[End date = %s]\n' % readUnitConfObj.endDate
1215 subtitle += '[End date = %s]\n' % readUnitConfObj.endDate
1216 subtitle += '[Start time = %s]\n' % readUnitConfObj.startTime
1216 subtitle += '[Start time = %s]\n' % readUnitConfObj.startTime
1217 subtitle += '[End time = %s]\n' % readUnitConfObj.endTime
1217 subtitle += '[End time = %s]\n' % readUnitConfObj.endTime
1218
1218
1219 a = Alarm(
1219 a = Alarm(
1220 modes=self.alarm,
1220 modes=self.alarm,
1221 email=self.email,
1221 email=self.email,
1222 message=message,
1222 message=message,
1223 subject=subject,
1223 subject=subject,
1224 subtitle=subtitle,
1224 subtitle=subtitle,
1225 filename=self.filename
1225 filename=self.filename
1226 )
1226 )
1227
1227
1228 a.start()
1228 a.start()
1229
1229
1230 def isPaused(self):
1230 def isPaused(self):
1231 return 0
1231 return 0
1232
1232
1233 def isStopped(self):
1233 def isStopped(self):
1234 return 0
1234 return 0
1235
1235
1236 def runController(self):
1236 def runController(self):
1237 '''
1237 '''
1238 returns 0 when this process has been stopped, 1 otherwise
1238 returns 0 when this process has been stopped, 1 otherwise
1239 '''
1239 '''
1240
1240
1241 if self.isPaused():
1241 if self.isPaused():
1242 print('Process suspended')
1242 print('Process suspended')
1243
1243
1244 while True:
1244 while True:
1245 time.sleep(0.1)
1245 time.sleep(0.1)
1246
1246
1247 if not self.isPaused():
1247 if not self.isPaused():
1248 break
1248 break
1249
1249
1250 if self.isStopped():
1250 if self.isStopped():
1251 break
1251 break
1252
1252
1253 print('Process reinitialized')
1253 print('Process reinitialized')
1254
1254
1255 if self.isStopped():
1255 if self.isStopped():
1256 print('Process stopped')
1256 print('Process stopped')
1257 return 0
1257 return 0
1258
1258
1259 return 1
1259 return 1
1260
1260
1261 def setFilename(self, filename):
1261 def setFilename(self, filename):
1262
1262
1263 self.filename = filename
1263 self.filename = filename
1264
1264
1265 def setProxy(self):
1265 def setProxy(self):
1266
1266
1267 if not os.path.exists('/tmp/schain'):
1267 if not os.path.exists('/tmp/schain'):
1268 os.mkdir('/tmp/schain')
1268 os.mkdir('/tmp/schain')
1269
1269
1270 self.ctx = zmq.Context()
1270 self.ctx = zmq.Context()
1271 xpub = self.ctx.socket(zmq.XPUB)
1271 xpub = self.ctx.socket(zmq.XPUB)
1272 xpub.bind('ipc:///tmp/schain/{}_pub'.format(self.id))
1272 xpub.bind('ipc:///tmp/schain/{}_pub'.format(self.id))
1273 xsub = self.ctx.socket(zmq.XSUB)
1273 xsub = self.ctx.socket(zmq.XSUB)
1274 xsub.bind('ipc:///tmp/schain/{}_sub'.format(self.id))
1274 xsub.bind('ipc:///tmp/schain/{}_sub'.format(self.id))
1275 self.monitor()
1275 self.monitor()
1276 try:
1276 try:
1277 zmq.proxy(xpub, xsub)
1277 zmq.proxy(xpub, xsub)
1278 except zmq.ContextTerminated:
1278 except zmq.ContextTerminated:
1279 xpub.close()
1279 xpub.close()
1280 xsub.close()
1280 xsub.close()
1281
1281
1282 def run(self):
1282 def run(self):
1283
1283
1284 log.success('Starting {}: {}'.format(self.name, self.id), tag='')
1284 log.success('Starting {}: {}'.format(self.name, self.id), tag='')
1285 self.start_time = time.time()
1285 self.start_time = time.time()
1286 self.createObjects()
1286 self.createObjects()
1287 self.setProxy()
1287 self.setProxy()
1288 log.success('{} Done (Time: {}s)'.format(
1288 log.success('{} Done (Time: {}s)'.format(
1289 self.name,
1289 self.name,
1290 time.time()-self.start_time), '')
1290 time.time()-self.start_time), '')
@@ -1,416 +1,429
1 '''
1 '''
2 Updated for multiprocessing
2 Updated for multiprocessing
3 Author : Sergio Cortez
3 Author : Sergio Cortez
4 Jan 2018
4 Jan 2018
5 Abstract:
5 Abstract:
6 Base class for processing units and operations. A decorator provides multiprocessing features and interconnect the processes created.
6 Base class for processing units and operations. A decorator provides multiprocessing features and interconnect the processes created.
7 The argument (kwargs) sent from the controller is parsed and filtered via the decorator for each processing unit or operation instantiated.
7 The argument (kwargs) sent from the controller is parsed and filtered via the decorator for each processing unit or operation instantiated.
8 The decorator handle also the methods inside the processing unit to be called from the main script (not as operations) (OPERATION -> type ='self').
8 The decorator handle also the methods inside the processing unit to be called from the main script (not as operations) (OPERATION -> type ='self').
9
9
10 Based on:
10 Based on:
11 $Author: murco $
11 $Author: murco $
12 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
12 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
13 '''
13 '''
14
14
15 import os
15 import os
16 import sys
16 import sys
17 import inspect
17 import inspect
18 import zmq
18 import zmq
19 import time
19 import time
20 import pickle
20 import pickle
21 import traceback
21 import traceback
22 try:
22 try:
23 from queue import Queue
23 from queue import Queue
24 except:
24 except:
25 from Queue import Queue
25 from Queue import Queue
26 from threading import Thread
26 from threading import Thread
27 from multiprocessing import Process
27 from multiprocessing import Process
28
28
29 from schainpy.utils import log
29 from schainpy.utils import log
30
30
31
31
32 class ProcessingUnit(object):
32 class ProcessingUnit(object):
33
33
34 """
34 """
35 Update - Jan 2018 - MULTIPROCESSING
35 Update - Jan 2018 - MULTIPROCESSING
36 All the "call" methods present in the previous base were removed.
36 All the "call" methods present in the previous base were removed.
37 The majority of operations are independant processes, thus
37 The majority of operations are independant processes, thus
38 the decorator is in charge of communicate the operation processes
38 the decorator is in charge of communicate the operation processes
39 with the proccessing unit via IPC.
39 with the proccessing unit via IPC.
40
40
41 The constructor does not receive any argument. The remaining methods
41 The constructor does not receive any argument. The remaining methods
42 are related with the operations to execute.
42 are related with the operations to execute.
43
43
44
44
45 """
45 """
46 proc_type = 'processing'
46 proc_type = 'processing'
47 __attrs__ = []
47 __attrs__ = []
48
48
49 def __init__(self):
49 def __init__(self):
50
50
51 self.dataIn = None
51 self.dataIn = None
52 self.dataOut = None
52 self.dataOut = None
53 self.isConfig = False
53 self.isConfig = False
54 self.operations = []
54 self.operations = []
55 self.plots = []
55 self.plots = []
56
56
57 def getAllowedArgs(self):
57 def getAllowedArgs(self):
58 if hasattr(self, '__attrs__'):
58 if hasattr(self, '__attrs__'):
59 return self.__attrs__
59 return self.__attrs__
60 else:
60 else:
61 return inspect.getargspec(self.run).args
61 return inspect.getargspec(self.run).args
62
62
63 def addOperation(self, conf, operation):
63 def addOperation(self, conf, operation):
64 """
64 """
65 This method is used in the controller, and update the dictionary containing the operations to execute. The dict
65 This method is used in the controller, and update the dictionary containing the operations to execute. The dict
66 posses the id of the operation process (IPC purposes)
66 posses the id of the operation process (IPC purposes)
67
67
68 Agrega un objeto del tipo "Operation" (opObj) a la lista de objetos "self.objectList" y retorna el
68 Agrega un objeto del tipo "Operation" (opObj) a la lista de objetos "self.objectList" y retorna el
69 identificador asociado a este objeto.
69 identificador asociado a este objeto.
70
70
71 Input:
71 Input:
72
72
73 object : objeto de la clase "Operation"
73 object : objeto de la clase "Operation"
74
74
75 Return:
75 Return:
76
76
77 objId : identificador del objeto, necesario para comunicar con master(procUnit)
77 objId : identificador del objeto, necesario para comunicar con master(procUnit)
78 """
78 """
79
79
80 self.operations.append(
80 self.operations.append(
81 (operation, conf.type, conf.id, conf.getKwargs()))
81 (operation, conf.type, conf.id, conf.getKwargs()))
82
82
83 if 'plot' in self.name.lower():
83 if 'plot' in self.name.lower():
84 self.plots.append(operation.CODE)
84 self.plots.append(operation.CODE)
85
85
86 def getOperationObj(self, objId):
86 def getOperationObj(self, objId):
87
87
88 if objId not in list(self.operations.keys()):
88 if objId not in list(self.operations.keys()):
89 return None
89 return None
90
90
91 return self.operations[objId]
91 return self.operations[objId]
92
92
93 def operation(self, **kwargs):
93 def operation(self, **kwargs):
94 """
94 """
95 Operacion directa sobre la data (dataOut.data). Es necesario actualizar los valores de los
95 Operacion directa sobre la data (dataOut.data). Es necesario actualizar los valores de los
96 atributos del objeto dataOut
96 atributos del objeto dataOut
97
97
98 Input:
98 Input:
99
99
100 **kwargs : Diccionario de argumentos de la funcion a ejecutar
100 **kwargs : Diccionario de argumentos de la funcion a ejecutar
101 """
101 """
102
102
103 raise NotImplementedError
103 raise NotImplementedError
104
104
105 def setup(self):
105 def setup(self):
106
106
107 raise NotImplementedError
107 raise NotImplementedError
108
108
109 def run(self):
109 def run(self):
110
110
111 raise NotImplementedError
111 raise NotImplementedError
112
112
113 def close(self):
113 def close(self):
114
114
115 return
115 return
116
116
117
117
118 class Operation(object):
118 class Operation(object):
119
119
120 """
120 """
121 Update - Jan 2018 - MULTIPROCESSING
121 Update - Jan 2018 - MULTIPROCESSING
122
122
123 Most of the methods remained the same. The decorator parse the arguments and executed the run() method for each process.
123 Most of the methods remained the same. The decorator parse the arguments and executed the run() method for each process.
124 The constructor doe snot receive any argument, neither the baseclass.
124 The constructor doe snot receive any argument, neither the baseclass.
125
125
126
126
127 Clase base para definir las operaciones adicionales que se pueden agregar a la clase ProcessingUnit
127 Clase base para definir las operaciones adicionales que se pueden agregar a la clase ProcessingUnit
128 y necesiten acumular informacion previa de los datos a procesar. De preferencia usar un buffer de
128 y necesiten acumular informacion previa de los datos a procesar. De preferencia usar un buffer de
129 acumulacion dentro de esta clase
129 acumulacion dentro de esta clase
130
130
131 Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer)
131 Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer)
132
132
133 """
133 """
134 proc_type = 'operation'
134 proc_type = 'operation'
135 __attrs__ = []
135 __attrs__ = []
136
136
137 def __init__(self):
137 def __init__(self):
138
138
139 self.id = None
139 self.id = None
140 self.isConfig = False
140 self.isConfig = False
141
141
142 if not hasattr(self, 'name'):
142 if not hasattr(self, 'name'):
143 self.name = self.__class__.__name__
143 self.name = self.__class__.__name__
144
144
145 def getAllowedArgs(self):
145 def getAllowedArgs(self):
146 if hasattr(self, '__attrs__'):
146 if hasattr(self, '__attrs__'):
147 return self.__attrs__
147 return self.__attrs__
148 else:
148 else:
149 return inspect.getargspec(self.run).args
149 return inspect.getargspec(self.run).args
150
150
151 def setup(self):
151 def setup(self):
152
152
153 self.isConfig = True
153 self.isConfig = True
154
154
155 raise NotImplementedError
155 raise NotImplementedError
156
156
157 def run(self, dataIn, **kwargs):
157 def run(self, dataIn, **kwargs):
158 """
158 """
159 Realiza las operaciones necesarias sobre la dataIn.data y actualiza los
159 Realiza las operaciones necesarias sobre la dataIn.data y actualiza los
160 atributos del objeto dataIn.
160 atributos del objeto dataIn.
161
161
162 Input:
162 Input:
163
163
164 dataIn : objeto del tipo JROData
164 dataIn : objeto del tipo JROData
165
165
166 Return:
166 Return:
167
167
168 None
168 None
169
169
170 Affected:
170 Affected:
171 __buffer : buffer de recepcion de datos.
171 __buffer : buffer de recepcion de datos.
172
172
173 """
173 """
174 if not self.isConfig:
174 if not self.isConfig:
175 self.setup(**kwargs)
175 self.setup(**kwargs)
176
176
177 raise NotImplementedError
177 raise NotImplementedError
178
178
179 def close(self):
179 def close(self):
180
180
181 return
181 return
182
182
183 class InputQueue(Thread):
183 class InputQueue(Thread):
184
184
185 '''
185 '''
186 Class to hold input data for Proccessing Units and external Operations,
186 Class to hold input data for Proccessing Units and external Operations,
187 '''
187 '''
188
188
189 def __init__(self, project_id, inputId, lock=None):
189 def __init__(self, project_id, inputId, lock=None):
190
190
191 Thread.__init__(self)
191 Thread.__init__(self)
192 self.queue = Queue()
192 self.queue = Queue()
193 self.project_id = project_id
193 self.project_id = project_id
194 self.inputId = inputId
194 self.inputId = inputId
195 self.lock = lock
195 self.lock = lock
196 self.islocked = False
196 self.size = 0
197 self.size = 0
197
198
198 def run(self):
199 def run(self):
199
200
200 c = zmq.Context()
201 c = zmq.Context()
201 self.receiver = c.socket(zmq.SUB)
202 self.receiver = c.socket(zmq.SUB)
202 self.receiver.connect(
203 self.receiver.connect(
203 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
204 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
204 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
205 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
205
206
206 while True:
207 while True:
207 obj = self.receiver.recv_multipart()[1]
208 obj = self.receiver.recv_multipart()[1]
208 self.size += sys.getsizeof(obj)
209 self.size += sys.getsizeof(obj)
209 self.queue.put(obj)
210 self.queue.put(obj)
210
211
211 def get(self):
212 def get(self):
212 if self.size/1000000 > 2048:
213
214 if not self.islocked and self.size/1000000 > 512:
215 self.lock.n.value += 1
216 self.islocked = True
213 self.lock.clear()
217 self.lock.clear()
214 else:
218 elif self.islocked and self.size/1000000 <= 512:
219 self.islocked = False
220 self.lock.n.value -= 1
221 if self.lock.n.value == 0:
215 self.lock.set()
222 self.lock.set()
223
216 obj = self.queue.get()
224 obj = self.queue.get()
217 self.size -= sys.getsizeof(obj)
225 self.size -= sys.getsizeof(obj)
218 return pickle.loads(obj)
226 return pickle.loads(obj)
219
227
220
228
221 def MPDecorator(BaseClass):
229 def MPDecorator(BaseClass):
222 """
230 """
223 Multiprocessing class decorator
231 Multiprocessing class decorator
224
232
225 This function add multiprocessing features to a BaseClass. Also, it handle
233 This function add multiprocessing features to a BaseClass. Also, it handle
226 the communication beetween processes (readers, procUnits and operations).
234 the communication beetween processes (readers, procUnits and operations).
227 """
235 """
228
236
229 class MPClass(BaseClass, Process):
237 class MPClass(BaseClass, Process):
230
238
231 def __init__(self, *args, **kwargs):
239 def __init__(self, *args, **kwargs):
232 super(MPClass, self).__init__()
240 super(MPClass, self).__init__()
233 Process.__init__(self)
241 Process.__init__(self)
234 self.operationKwargs = {}
242 self.operationKwargs = {}
235 self.args = args
243 self.args = args
236 self.kwargs = kwargs
244 self.kwargs = kwargs
237 self.sender = None
245 self.sender = None
238 self.receiver = None
246 self.receiver = None
239 self.i = 0
247 self.i = 0
240 self.t = time.time()
248 self.t = time.time()
241 self.name = BaseClass.__name__
249 self.name = BaseClass.__name__
242 self.__doc__ = BaseClass.__doc__
250 self.__doc__ = BaseClass.__doc__
243
251
244 if 'plot' in self.name.lower() and not self.name.endswith('_'):
252 if 'plot' in self.name.lower() and not self.name.endswith('_'):
245 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
253 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
246
254
247 self.start_time = time.time()
255 self.start_time = time.time()
248 self.id = args[0]
256 self.id = args[0]
249 self.inputId = args[1]
257 self.inputId = args[1]
250 self.project_id = args[2]
258 self.project_id = args[2]
251 self.err_queue = args[3]
259 self.err_queue = args[3]
252 self.lock = args[4]
260 self.lock = args[4]
253 self.typeProc = args[5]
261 self.typeProc = args[5]
254 self.err_queue.put('#_start_#')
262 self.err_queue.put('#_start_#')
263 if self.inputId is not None:
255 self.queue = InputQueue(self.project_id, self.inputId, self.lock)
264 self.queue = InputQueue(self.project_id, self.inputId, self.lock)
256
265
257 def subscribe(self):
266 def subscribe(self):
258 '''
267 '''
259 Start the zmq socket receiver and subcribe to input ID.
268 Start the zmq socket receiver and subcribe to input ID.
260 '''
269 '''
261
270
262 self.queue.start()
271 self.queue.start()
263
272
264 def listen(self):
273 def listen(self):
265 '''
274 '''
266 This function waits for objects
275 This function waits for objects
267 '''
276 '''
268
277
269 return self.queue.get()
278 return self.queue.get()
270
279
271 def set_publisher(self):
280 def set_publisher(self):
272 '''
281 '''
273 This function create a zmq socket for publishing objects.
282 This function create a zmq socket for publishing objects.
274 '''
283 '''
275
284
276 time.sleep(0.5)
285 time.sleep(0.5)
277
286
278 c = zmq.Context()
287 c = zmq.Context()
279 self.sender = c.socket(zmq.PUB)
288 self.sender = c.socket(zmq.PUB)
280 self.sender.connect(
289 self.sender.connect(
281 'ipc:///tmp/schain/{}_sub'.format(self.project_id))
290 'ipc:///tmp/schain/{}_sub'.format(self.project_id))
282
291
283 def publish(self, data, id):
292 def publish(self, data, id):
284 '''
293 '''
285 This function publish an object, to an specific topic.
294 This function publish an object, to an specific topic.
286 It blocks publishing when receiver queue is full to avoid data loss
295 It blocks publishing when receiver queue is full to avoid data loss
287 '''
296 '''
288
297
289 if self.inputId is None:
298 if self.inputId is None:
290 self.lock.wait()
299 self.lock.wait()
291 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
300 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
292
301
293 def runReader(self):
302 def runReader(self):
294 '''
303 '''
295 Run fuction for read units
304 Run fuction for read units
296 '''
305 '''
297 while True:
306 while True:
298
307
299 try:
308 try:
300 BaseClass.run(self, **self.kwargs)
309 BaseClass.run(self, **self.kwargs)
301 except:
310 except:
302 err = traceback.format_exc()
311 err = traceback.format_exc()
303 if 'No more files' in err:
312 if 'No more files' in err:
304 log.warning('No more files to read', self.name)
313 log.warning('No more files to read', self.name)
305 else:
314 else:
306 self.err_queue.put('{}|{}'.format(self.name, err))
315 self.err_queue.put('{}|{}'.format(self.name, err))
307 self.dataOut.error = True
316 self.dataOut.error = True
308
317
309 for op, optype, opId, kwargs in self.operations:
318 for op, optype, opId, kwargs in self.operations:
310 if optype == 'self' and not self.dataOut.flagNoData:
319 if optype == 'self' and not self.dataOut.flagNoData:
311 op(**kwargs)
320 op(**kwargs)
312 elif optype == 'other' and not self.dataOut.flagNoData:
321 elif optype == 'other' and not self.dataOut.flagNoData:
313 self.dataOut = op.run(self.dataOut, **self.kwargs)
322 self.dataOut = op.run(self.dataOut, **self.kwargs)
314 elif optype == 'external':
323 elif optype == 'external':
315 self.publish(self.dataOut, opId)
324 self.publish(self.dataOut, opId)
316
325
317 if self.dataOut.flagNoData and not self.dataOut.error:
326 if self.dataOut.flagNoData and not self.dataOut.error:
318 continue
327 continue
319
328
320 self.publish(self.dataOut, self.id)
329 self.publish(self.dataOut, self.id)
321
330
322 if self.dataOut.error:
331 if self.dataOut.error:
323 break
332 break
324
333
325 time.sleep(0.5)
334 time.sleep(0.5)
326
335
327 def runProc(self):
336 def runProc(self):
328 '''
337 '''
329 Run function for proccessing units
338 Run function for proccessing units
330 '''
339 '''
331
340
332 while True:
341 while True:
333 self.dataIn = self.listen()
342 self.dataIn = self.listen()
334
343
335 if self.dataIn.flagNoData and self.dataIn.error is None:
344 if self.dataIn.flagNoData and self.dataIn.error is None:
336 continue
345 continue
337 elif not self.dataIn.error:
346 elif not self.dataIn.error:
338 try:
347 try:
339 BaseClass.run(self, **self.kwargs)
348 BaseClass.run(self, **self.kwargs)
340 except:
349 except:
341 self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc()))
350 self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc()))
342 self.dataOut.error = True
351 self.dataOut.error = True
343 elif self.dataIn.error:
352 elif self.dataIn.error:
344 self.dataOut.error = self.dataIn.error
353 self.dataOut.error = self.dataIn.error
345 self.dataOut.flagNoData = True
354 self.dataOut.flagNoData = True
346
355
347 for op, optype, opId, kwargs in self.operations:
356 for op, optype, opId, kwargs in self.operations:
348 if optype == 'self' and not self.dataOut.flagNoData:
357 if optype == 'self' and not self.dataOut.flagNoData:
349 op(**kwargs)
358 op(**kwargs)
350 elif optype == 'other' and not self.dataOut.flagNoData:
359 elif optype == 'other' and not self.dataOut.flagNoData:
351 self.dataOut = op.run(self.dataOut, **kwargs)
360 self.dataOut = op.run(self.dataOut, **kwargs)
352 elif optype == 'external' and not self.dataOut.flagNoData:
361 elif optype == 'external' and not self.dataOut.flagNoData:
353 self.publish(self.dataOut, opId)
362 self.publish(self.dataOut, opId)
354
363
355 self.publish(self.dataOut, self.id)
364 self.publish(self.dataOut, self.id)
356 for op, optype, opId, kwargs in self.operations:
365 for op, optype, opId, kwargs in self.operations:
357 if optype == 'external' and self.dataOut.error:
366 if optype == 'external' and self.dataOut.error:
358 self.publish(self.dataOut, opId)
367 self.publish(self.dataOut, opId)
359
368
360 if self.dataOut.error:
369 if self.dataOut.error:
361 break
370 break
362
371
363 time.sleep(0.5)
372 time.sleep(0.5)
364
373
365 def runOp(self):
374 def runOp(self):
366 '''
375 '''
367 Run function for external operations (this operations just receive data
376 Run function for external operations (this operations just receive data
368 ex: plots, writers, publishers)
377 ex: plots, writers, publishers)
369 '''
378 '''
370
379
371 while True:
380 while True:
372
381
373 dataOut = self.listen()
382 dataOut = self.listen()
374
383
375 if not dataOut.error:
384 if not dataOut.error:
385 try:
376 BaseClass.run(self, dataOut, **self.kwargs)
386 BaseClass.run(self, dataOut, **self.kwargs)
387 except:
388 self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc()))
389 dataOut.error = True
377 else:
390 else:
378 break
391 break
379
392
380 def run(self):
393 def run(self):
381 if self.typeProc is "ProcUnit":
394 if self.typeProc is "ProcUnit":
382
395
383 if self.inputId is not None:
396 if self.inputId is not None:
384 self.subscribe()
397 self.subscribe()
385
398
386 self.set_publisher()
399 self.set_publisher()
387
400
388 if 'Reader' not in BaseClass.__name__:
401 if 'Reader' not in BaseClass.__name__:
389 self.runProc()
402 self.runProc()
390 else:
403 else:
391 self.runReader()
404 self.runReader()
392
405
393 elif self.typeProc is "Operation":
406 elif self.typeProc is "Operation":
394
407
395 self.subscribe()
408 self.subscribe()
396 self.runOp()
409 self.runOp()
397
410
398 else:
411 else:
399 raise ValueError("Unknown type")
412 raise ValueError("Unknown type")
400
413
401 self.close()
414 self.close()
402
415
403 def close(self):
416 def close(self):
404
417
405 BaseClass.close(self)
418 BaseClass.close(self)
406 self.err_queue.put('#_end_#')
419 self.err_queue.put('#_end_#')
407
420
408 if self.sender:
421 if self.sender:
409 self.sender.close()
422 self.sender.close()
410
423
411 if self.receiver:
424 if self.receiver:
412 self.receiver.close()
425 self.receiver.close()
413
426
414 log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)
427 log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)
415
428
416 return MPClass
429 return MPClass
General Comments 0
You need to be logged in to leave comments. Login now