##// END OF EJS Templates
Fix Read function for CLI 'schain xml'
George Yong -
r1184:d00a3ddd0dd0
parent child
Show More
@@ -1,1259 +1,1265
1 '''
1 '''
2 Updated on January , 2018, for multiprocessing purposes
2 Updated on January , 2018, for multiprocessing purposes
3 Author: Sergio Cortez
3 Author: Sergio Cortez
4 Created on September , 2012
4 Created on September , 2012
5 '''
5 '''
6 from platform import python_version
6 from platform import python_version
7 import sys
7 import sys
8 import ast
8 import ast
9 import datetime
9 import datetime
10 import traceback
10 import traceback
11 import math
11 import math
12 import time
12 import time
13 import zmq
13 import zmq
14 from multiprocessing import Process, cpu_count
14 from multiprocessing import Process, cpu_count
15 from threading import Thread
15 from threading import Thread
16 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
16 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
17 from xml.dom import minidom
17 from xml.dom import minidom
18
18
19
19
20 from schainpy.admin import Alarm, SchainWarning
20 from schainpy.admin import Alarm, SchainWarning
21
21
22 ### Temporary imports!!!
22 ### Temporary imports!!!
23 # from schainpy.model import *
23 # from schainpy.model import *
24 from schainpy.model.io import *
24 from schainpy.model.io import *
25 from schainpy.model.graphics import *
25 from schainpy.model.graphics import *
26 from schainpy.model.proc.jroproc_base import *
26 from schainpy.model.proc.jroproc_base import *
27 from schainpy.model.proc.bltrproc_parameters import *
27 from schainpy.model.proc.bltrproc_parameters import *
28 from schainpy.model.proc.jroproc_spectra import *
28 from schainpy.model.proc.jroproc_spectra import *
29 from schainpy.model.proc.jroproc_voltage import *
29 from schainpy.model.proc.jroproc_voltage import *
30 from schainpy.model.proc.jroproc_parameters import *
30 from schainpy.model.proc.jroproc_parameters import *
31 from schainpy.model.utils.jroutils_publish import *
31 from schainpy.model.utils.jroutils_publish import *
32 from schainpy.utils import log
32 from schainpy.utils import log
33 ###
33 ###
34
34
35 DTYPES = {
35 DTYPES = {
36 'Voltage': '.r',
36 'Voltage': '.r',
37 'Spectra': '.pdata'
37 'Spectra': '.pdata'
38 }
38 }
39
39
40
40
41 def MPProject(project, n=cpu_count()):
41 def MPProject(project, n=cpu_count()):
42 '''
42 '''
43 Project wrapper to run schain in n processes
43 Project wrapper to run schain in n processes
44 '''
44 '''
45
45
46 rconf = project.getReadUnitObj()
46 rconf = project.getReadUnitObj()
47 op = rconf.getOperationObj('run')
47 op = rconf.getOperationObj('run')
48 dt1 = op.getParameterValue('startDate')
48 dt1 = op.getParameterValue('startDate')
49 dt2 = op.getParameterValue('endDate')
49 dt2 = op.getParameterValue('endDate')
50 tm1 = op.getParameterValue('startTime')
50 tm1 = op.getParameterValue('startTime')
51 tm2 = op.getParameterValue('endTime')
51 tm2 = op.getParameterValue('endTime')
52 days = (dt2 - dt1).days
52 days = (dt2 - dt1).days
53
53
54 for day in range(days + 1):
54 for day in range(days + 1):
55 skip = 0
55 skip = 0
56 cursor = 0
56 cursor = 0
57 processes = []
57 processes = []
58 dt = dt1 + datetime.timedelta(day)
58 dt = dt1 + datetime.timedelta(day)
59 dt_str = dt.strftime('%Y/%m/%d')
59 dt_str = dt.strftime('%Y/%m/%d')
60 reader = JRODataReader()
60 reader = JRODataReader()
61 paths, files = reader.searchFilesOffLine(path=rconf.path,
61 paths, files = reader.searchFilesOffLine(path=rconf.path,
62 startDate=dt,
62 startDate=dt,
63 endDate=dt,
63 endDate=dt,
64 startTime=tm1,
64 startTime=tm1,
65 endTime=tm2,
65 endTime=tm2,
66 ext=DTYPES[rconf.datatype])
66 ext=DTYPES[rconf.datatype])
67 nFiles = len(files)
67 nFiles = len(files)
68 if nFiles == 0:
68 if nFiles == 0:
69 continue
69 continue
70 skip = int(math.ceil(nFiles / n))
70 skip = int(math.ceil(nFiles / n))
71 while nFiles > cursor * skip:
71 while nFiles > cursor * skip:
72 rconf.update(startDate=dt_str, endDate=dt_str, cursor=cursor,
72 rconf.update(startDate=dt_str, endDate=dt_str, cursor=cursor,
73 skip=skip)
73 skip=skip)
74 p = project.clone()
74 p = project.clone()
75 p.start()
75 p.start()
76 processes.append(p)
76 processes.append(p)
77 cursor += 1
77 cursor += 1
78
78
79 def beforeExit(exctype, value, trace):
79 def beforeExit(exctype, value, trace):
80 for process in processes:
80 for process in processes:
81 process.terminate()
81 process.terminate()
82 process.join()
82 process.join()
83 print(traceback.print_tb(trace))
83 print(traceback.print_tb(trace))
84
84
85 sys.excepthook = beforeExit
85 sys.excepthook = beforeExit
86
86
87 for process in processes:
87 for process in processes:
88 process.join()
88 process.join()
89 process.terminate()
89 process.terminate()
90
90
91 time.sleep(3)
91 time.sleep(3)
92
92
93 def wait(context):
93 def wait(context):
94
94
95 time.sleep(1)
95 time.sleep(1)
96 c = zmq.Context()
96 c = zmq.Context()
97 receiver = c.socket(zmq.SUB)
97 receiver = c.socket(zmq.SUB)
98 receiver.connect('ipc:///tmp/schain_{}_pub'.format(self.id))
98 receiver.connect('ipc:///tmp/schain_{}_pub'.format(self.id))
99 receiver.setsockopt(zmq.SUBSCRIBE, self.id.encode())
99 receiver.setsockopt(zmq.SUBSCRIBE, self.id.encode())
100 log.error('startinggg')
100 log.error('startinggg')
101 msg = receiver.recv_multipart()[1]
101 msg = receiver.recv_multipart()[1]
102 #log.error(msg)
102 #log.error(msg)
103 context.terminate()
103 context.terminate()
104
104
105 class ParameterConf():
105 class ParameterConf():
106
106
107 id = None
107 id = None
108 name = None
108 name = None
109 value = None
109 value = None
110 format = None
110 format = None
111
111
112 __formated_value = None
112 __formated_value = None
113
113
114 ELEMENTNAME = 'Parameter'
114 ELEMENTNAME = 'Parameter'
115
115
116 def __init__(self):
116 def __init__(self):
117
117
118 self.format = 'str'
118 self.format = 'str'
119
119
120 def getElementName(self):
120 def getElementName(self):
121
121
122 return self.ELEMENTNAME
122 return self.ELEMENTNAME
123
123
124 def getValue(self):
124 def getValue(self):
125
125
126 value = self.value
126 value = self.value
127 format = self.format
127 format = self.format
128
128
129 if self.__formated_value != None:
129 if self.__formated_value != None:
130
130
131 return self.__formated_value
131 return self.__formated_value
132
132
133 if format == 'obj':
133 if format == 'obj':
134 return value
134 return value
135
135
136 if format == 'str':
136 if format == 'str':
137 self.__formated_value = str(value)
137 self.__formated_value = str(value)
138 return self.__formated_value
138 return self.__formated_value
139
139
140 if value == '':
140 if value == '':
141 raise ValueError('%s: This parameter value is empty' % self.name)
141 raise ValueError('%s: This parameter value is empty' % self.name)
142
142
143 if format == 'list':
143 if format == 'list':
144 strList = value.split(',')
144 strList = value.split(',')
145
145
146 self.__formated_value = strList
146 self.__formated_value = strList
147
147
148 return self.__formated_value
148 return self.__formated_value
149
149
150 if format == 'intlist':
150 if format == 'intlist':
151 '''
151 '''
152 Example:
152 Example:
153 value = (0,1,2)
153 value = (0,1,2)
154 '''
154 '''
155
155
156 new_value = ast.literal_eval(value)
156 new_value = ast.literal_eval(value)
157
157
158 if type(new_value) not in (tuple, list):
158 if type(new_value) not in (tuple, list):
159 new_value = [int(new_value)]
159 new_value = [int(new_value)]
160
160
161 self.__formated_value = new_value
161 self.__formated_value = new_value
162
162
163 return self.__formated_value
163 return self.__formated_value
164
164
165 if format == 'floatlist':
165 if format == 'floatlist':
166 '''
166 '''
167 Example:
167 Example:
168 value = (0.5, 1.4, 2.7)
168 value = (0.5, 1.4, 2.7)
169 '''
169 '''
170
170
171 new_value = ast.literal_eval(value)
171 new_value = ast.literal_eval(value)
172
172
173 if type(new_value) not in (tuple, list):
173 if type(new_value) not in (tuple, list):
174 new_value = [float(new_value)]
174 new_value = [float(new_value)]
175
175
176 self.__formated_value = new_value
176 self.__formated_value = new_value
177
177
178 return self.__formated_value
178 return self.__formated_value
179
179
180 if format == 'date':
180 if format == 'date':
181 strList = value.split('/')
181 strList = value.split('/')
182 intList = [int(x) for x in strList]
182 intList = [int(x) for x in strList]
183 date = datetime.date(intList[0], intList[1], intList[2])
183 date = datetime.date(intList[0], intList[1], intList[2])
184
184
185 self.__formated_value = date
185 self.__formated_value = date
186
186
187 return self.__formated_value
187 return self.__formated_value
188
188
189 if format == 'time':
189 if format == 'time':
190 strList = value.split(':')
190 strList = value.split(':')
191 intList = [int(x) for x in strList]
191 intList = [int(x) for x in strList]
192 time = datetime.time(intList[0], intList[1], intList[2])
192 time = datetime.time(intList[0], intList[1], intList[2])
193
193
194 self.__formated_value = time
194 self.__formated_value = time
195
195
196 return self.__formated_value
196 return self.__formated_value
197
197
198 if format == 'pairslist':
198 if format == 'pairslist':
199 '''
199 '''
200 Example:
200 Example:
201 value = (0,1),(1,2)
201 value = (0,1),(1,2)
202 '''
202 '''
203
203
204 new_value = ast.literal_eval(value)
204 new_value = ast.literal_eval(value)
205
205
206 if type(new_value) not in (tuple, list):
206 if type(new_value) not in (tuple, list):
207 raise ValueError('%s has to be a tuple or list of pairs' % value)
207 raise ValueError('%s has to be a tuple or list of pairs' % value)
208
208
209 if type(new_value[0]) not in (tuple, list):
209 if type(new_value[0]) not in (tuple, list):
210 if len(new_value) != 2:
210 if len(new_value) != 2:
211 raise ValueError('%s has to be a tuple or list of pairs' % value)
211 raise ValueError('%s has to be a tuple or list of pairs' % value)
212 new_value = [new_value]
212 new_value = [new_value]
213
213
214 for thisPair in new_value:
214 for thisPair in new_value:
215 if len(thisPair) != 2:
215 if len(thisPair) != 2:
216 raise ValueError('%s has to be a tuple or list of pairs' % value)
216 raise ValueError('%s has to be a tuple or list of pairs' % value)
217
217
218 self.__formated_value = new_value
218 self.__formated_value = new_value
219
219
220 return self.__formated_value
220 return self.__formated_value
221
221
222 if format == 'multilist':
222 if format == 'multilist':
223 '''
223 '''
224 Example:
224 Example:
225 value = (0,1,2),(3,4,5)
225 value = (0,1,2),(3,4,5)
226 '''
226 '''
227 multiList = ast.literal_eval(value)
227 multiList = ast.literal_eval(value)
228
228
229 if type(multiList[0]) == int:
229 if type(multiList[0]) == int:
230 multiList = ast.literal_eval('(' + value + ')')
230 multiList = ast.literal_eval('(' + value + ')')
231
231
232 self.__formated_value = multiList
232 self.__formated_value = multiList
233
233
234 return self.__formated_value
234 return self.__formated_value
235
235
236 if format == 'bool':
236 if format == 'bool':
237 value = int(value)
237 value = int(value)
238
238
239 if format == 'int':
239 if format == 'int':
240 value = float(value)
240 value = float(value)
241
241
242 format_func = eval(format)
242 format_func = eval(format)
243
243
244 self.__formated_value = format_func(value)
244 self.__formated_value = format_func(value)
245
245
246 return self.__formated_value
246 return self.__formated_value
247
247
248 def updateId(self, new_id):
248 def updateId(self, new_id):
249
249
250 self.id = str(new_id)
250 self.id = str(new_id)
251
251
252 def setup(self, id, name, value, format='str'):
252 def setup(self, id, name, value, format='str'):
253 self.id = str(id)
253 self.id = str(id)
254 self.name = name
254 self.name = name
255 if format == 'obj':
255 if format == 'obj':
256 self.value = value
256 self.value = value
257 else:
257 else:
258 self.value = str(value)
258 self.value = str(value)
259 self.format = str.lower(format)
259 self.format = str.lower(format)
260
260
261 self.getValue()
261 self.getValue()
262
262
263 return 1
263 return 1
264
264
265 def update(self, name, value, format='str'):
265 def update(self, name, value, format='str'):
266
266
267 self.name = name
267 self.name = name
268 self.value = str(value)
268 self.value = str(value)
269 self.format = format
269 self.format = format
270
270
271 def makeXml(self, opElement):
271 def makeXml(self, opElement):
272 if self.name not in ('queue',):
272 if self.name not in ('queue',):
273 parmElement = SubElement(opElement, self.ELEMENTNAME)
273 parmElement = SubElement(opElement, self.ELEMENTNAME)
274 parmElement.set('id', str(self.id))
274 parmElement.set('id', str(self.id))
275 parmElement.set('name', self.name)
275 parmElement.set('name', self.name)
276 parmElement.set('value', self.value)
276 parmElement.set('value', self.value)
277 parmElement.set('format', self.format)
277 parmElement.set('format', self.format)
278
278
279 def readXml(self, parmElement):
279 def readXml(self, parmElement):
280
280
281 self.id = parmElement.get('id')
281 self.id = parmElement.get('id')
282 self.name = parmElement.get('name')
282 self.name = parmElement.get('name')
283 self.value = parmElement.get('value')
283 self.value = parmElement.get('value')
284 self.format = str.lower(parmElement.get('format'))
284 self.format = str.lower(parmElement.get('format'))
285
285
286 # Compatible with old signal chain version
286 # Compatible with old signal chain version
287 if self.format == 'int' and self.name == 'idfigure':
287 if self.format == 'int' and self.name == 'idfigure':
288 self.name = 'id'
288 self.name = 'id'
289
289
290 def printattr(self):
290 def printattr(self):
291
291
292 print('Parameter[%s]: name = %s, value = %s, format = %s' % (self.id, self.name, self.value, self.format))
292 print('Parameter[%s]: name = %s, value = %s, format = %s, project_id = %s' % (self.id, self.name, self.value, self.format, self.project_id))
293
293
294 class OperationConf():
294 class OperationConf():
295
295
296 ELEMENTNAME = 'Operation'
296 ELEMENTNAME = 'Operation'
297
297
298 def __init__(self):
298 def __init__(self):
299
299
300 self.id = '0'
300 self.id = '0'
301 self.name = None
301 self.name = None
302 self.priority = None
302 self.priority = None
303 self.topic = None
303 self.topic = None
304
304
305 def __getNewId(self):
305 def __getNewId(self):
306
306
307 return int(self.id) * 10 + len(self.parmConfObjList) + 1
307 return int(self.id) * 10 + len(self.parmConfObjList) + 1
308
308
309 def getId(self):
309 def getId(self):
310 return self.id
310 return self.id
311
311
312 def updateId(self, new_id):
312 def updateId(self, new_id):
313
313
314 self.id = str(new_id)
314 self.id = str(new_id)
315
315
316 n = 1
316 n = 1
317 for parmObj in self.parmConfObjList:
317 for parmObj in self.parmConfObjList:
318
318
319 idParm = str(int(new_id) * 10 + n)
319 idParm = str(int(new_id) * 10 + n)
320 parmObj.updateId(idParm)
320 parmObj.updateId(idParm)
321
321
322 n += 1
322 n += 1
323
323
324 def getElementName(self):
324 def getElementName(self):
325
325
326 return self.ELEMENTNAME
326 return self.ELEMENTNAME
327
327
328 def getParameterObjList(self):
328 def getParameterObjList(self):
329
329
330 return self.parmConfObjList
330 return self.parmConfObjList
331
331
332 def getParameterObj(self, parameterName):
332 def getParameterObj(self, parameterName):
333
333
334 for parmConfObj in self.parmConfObjList:
334 for parmConfObj in self.parmConfObjList:
335
335
336 if parmConfObj.name != parameterName:
336 if parmConfObj.name != parameterName:
337 continue
337 continue
338
338
339 return parmConfObj
339 return parmConfObj
340
340
341 return None
341 return None
342
342
343 def getParameterObjfromValue(self, parameterValue):
343 def getParameterObjfromValue(self, parameterValue):
344
344
345 for parmConfObj in self.parmConfObjList:
345 for parmConfObj in self.parmConfObjList:
346
346
347 if parmConfObj.getValue() != parameterValue:
347 if parmConfObj.getValue() != parameterValue:
348 continue
348 continue
349
349
350 return parmConfObj.getValue()
350 return parmConfObj.getValue()
351
351
352 return None
352 return None
353
353
354 def getParameterValue(self, parameterName):
354 def getParameterValue(self, parameterName):
355
355
356 parameterObj = self.getParameterObj(parameterName)
356 parameterObj = self.getParameterObj(parameterName)
357
357
358 # if not parameterObj:
358 # if not parameterObj:
359 # return None
359 # return None
360
360
361 value = parameterObj.getValue()
361 value = parameterObj.getValue()
362
362
363 return value
363 return value
364
364
365 def getKwargs(self):
365 def getKwargs(self):
366
366
367 kwargs = {}
367 kwargs = {}
368
368
369 for parmConfObj in self.parmConfObjList:
369 for parmConfObj in self.parmConfObjList:
370 if self.name == 'run' and parmConfObj.name == 'datatype':
370 if self.name == 'run' and parmConfObj.name == 'datatype':
371 continue
371 continue
372
372
373 kwargs[parmConfObj.name] = parmConfObj.getValue()
373 kwargs[parmConfObj.name] = parmConfObj.getValue()
374
374
375 return kwargs
375 return kwargs
376
376
377 def setup(self, id, name, priority, type, project_id):
377 def setup(self, id, name, priority, type, project_id):
378
378
379 self.id = str(id)
379 self.id = str(id)
380 self.project_id = project_id
380 self.project_id = project_id
381 self.name = name
381 self.name = name
382 self.type = type
382 self.type = type
383 self.priority = priority
383 self.priority = priority
384 self.parmConfObjList = []
384 self.parmConfObjList = []
385
385
386 def removeParameters(self):
386 def removeParameters(self):
387
387
388 for obj in self.parmConfObjList:
388 for obj in self.parmConfObjList:
389 del obj
389 del obj
390
390
391 self.parmConfObjList = []
391 self.parmConfObjList = []
392
392
393 def addParameter(self, name, value, format='str'):
393 def addParameter(self, name, value, format='str'):
394
394
395 if value is None:
395 if value is None:
396 return None
396 return None
397 id = self.__getNewId()
397 id = self.__getNewId()
398
398
399 parmConfObj = ParameterConf()
399 parmConfObj = ParameterConf()
400 if not parmConfObj.setup(id, name, value, format):
400 if not parmConfObj.setup(id, name, value, format):
401 return None
401 return None
402
402
403 self.parmConfObjList.append(parmConfObj)
403 self.parmConfObjList.append(parmConfObj)
404
404
405 return parmConfObj
405 return parmConfObj
406
406
407 def changeParameter(self, name, value, format='str'):
407 def changeParameter(self, name, value, format='str'):
408
408
409 parmConfObj = self.getParameterObj(name)
409 parmConfObj = self.getParameterObj(name)
410 parmConfObj.update(name, value, format)
410 parmConfObj.update(name, value, format)
411
411
412 return parmConfObj
412 return parmConfObj
413
413
414 def makeXml(self, procUnitElement):
414 def makeXml(self, procUnitElement):
415
415
416 opElement = SubElement(procUnitElement, self.ELEMENTNAME)
416 opElement = SubElement(procUnitElement, self.ELEMENTNAME)
417 opElement.set('id', str(self.id))
417 opElement.set('id', str(self.id))
418 opElement.set('name', self.name)
418 opElement.set('name', self.name)
419 opElement.set('type', self.type)
419 opElement.set('type', self.type)
420 opElement.set('priority', str(self.priority))
420 opElement.set('priority', str(self.priority))
421
421
422 for parmConfObj in self.parmConfObjList:
422 for parmConfObj in self.parmConfObjList:
423 parmConfObj.makeXml(opElement)
423 parmConfObj.makeXml(opElement)
424
424
425 def readXml(self, opElement):
425 def readXml(self, opElement, project_id):
426
426
427 self.id = opElement.get('id')
427 self.id = opElement.get('id')
428 self.name = opElement.get('name')
428 self.name = opElement.get('name')
429 self.type = opElement.get('type')
429 self.type = opElement.get('type')
430 self.priority = opElement.get('priority')
430 self.priority = opElement.get('priority')
431 self.project_id = str(project_id) #yong
431
432
432 # Compatible with old signal chain version
433 # Compatible with old signal chain version
433 # Use of 'run' method instead 'init'
434 # Use of 'run' method instead 'init'
434 if self.type == 'self' and self.name == 'init':
435 if self.type == 'self' and self.name == 'init':
435 self.name = 'run'
436 self.name = 'run'
436
437
437 self.parmConfObjList = []
438 self.parmConfObjList = []
438
439
439 parmElementList = opElement.iter(ParameterConf().getElementName())
440 parmElementList = opElement.iter(ParameterConf().getElementName())
440
441
441 for parmElement in parmElementList:
442 for parmElement in parmElementList:
442 parmConfObj = ParameterConf()
443 parmConfObj = ParameterConf()
443 parmConfObj.readXml(parmElement)
444 parmConfObj.readXml(parmElement)
444
445
445 # Compatible with old signal chain version
446 # Compatible with old signal chain version
446 # If an 'plot' OPERATION is found, changes name operation by the value of its type PARAMETER
447 # If an 'plot' OPERATION is found, changes name operation by the value of its type PARAMETER
447 if self.type != 'self' and self.name == 'Plot':
448 if self.type != 'self' and self.name == 'Plot':
448 if parmConfObj.format == 'str' and parmConfObj.name == 'type':
449 if parmConfObj.format == 'str' and parmConfObj.name == 'type':
449 self.name = parmConfObj.value
450 self.name = parmConfObj.value
450 continue
451 continue
451
452
452 self.parmConfObjList.append(parmConfObj)
453 self.parmConfObjList.append(parmConfObj)
453
454
454 def printattr(self):
455 def printattr(self):
455
456
456 print('%s[%s]: name = %s, type = %s, priority = %s' % (self.ELEMENTNAME,
457 print('%s[%s]: name = %s, type = %s, priority = %s, project_id = %s' % (self.ELEMENTNAME,
457 self.id,
458 self.id,
458 self.name,
459 self.name,
459 self.type,
460 self.type,
460 self.priority))
461 self.priority,
462 self.project_id))
461
463
462 for parmConfObj in self.parmConfObjList:
464 for parmConfObj in self.parmConfObjList:
463 parmConfObj.printattr()
465 parmConfObj.printattr()
464
466
465 def createObject(self):
467 def createObject(self):
466
468
467 className = eval(self.name)
469 className = eval(self.name)
468
470
469 if self.type == 'other':
471 if self.type == 'other':
470 opObj = className()
472 opObj = className()
471 elif self.type == 'external':
473 elif self.type == 'external':
472 kwargs = self.getKwargs()
474 kwargs = self.getKwargs()
473 opObj = className(self.id, self.project_id, **kwargs)
475 opObj = className(self.id, self.project_id, **kwargs)
474 opObj.start()
476 opObj.start()
475
477
476 return opObj
478 return opObj
477
479
478 class ProcUnitConf():
480 class ProcUnitConf():
479
481
480 ELEMENTNAME = 'ProcUnit'
482 ELEMENTNAME = 'ProcUnit'
481
483
482 def __init__(self):
484 def __init__(self):
483
485
484 self.id = None
486 self.id = None
485 self.datatype = None
487 self.datatype = None
486 self.name = None
488 self.name = None
487 self.inputId = None
489 self.inputId = None
488 self.opConfObjList = []
490 self.opConfObjList = []
489 self.procUnitObj = None
491 self.procUnitObj = None
490 self.opObjDict = {}
492 self.opObjDict = {}
491
493
492 def __getPriority(self):
494 def __getPriority(self):
493
495
494 return len(self.opConfObjList) + 1
496 return len(self.opConfObjList) + 1
495
497
496 def __getNewId(self):
498 def __getNewId(self):
497
499
498 return int(self.id) * 10 + len(self.opConfObjList) + 1
500 return int(self.id) * 10 + len(self.opConfObjList) + 1
499
501
500 def getElementName(self):
502 def getElementName(self):
501
503
502 return self.ELEMENTNAME
504 return self.ELEMENTNAME
503
505
504 def getId(self):
506 def getId(self):
505
507
506 return self.id
508 return self.id
507
509
508 def updateId(self, new_id):
510 def updateId(self, new_id):
509 '''
511 '''
510 new_id = int(parentId) * 10 + (int(self.id) % 10)
512 new_id = int(parentId) * 10 + (int(self.id) % 10)
511 new_inputId = int(parentId) * 10 + (int(self.inputId) % 10)
513 new_inputId = int(parentId) * 10 + (int(self.inputId) % 10)
512
514
513 # If this proc unit has not inputs
515 # If this proc unit has not inputs
514 #if self.inputId == '0':
516 #if self.inputId == '0':
515 #new_inputId = 0
517 #new_inputId = 0
516
518
517 n = 1
519 n = 1
518 for opConfObj in self.opConfObjList:
520 for opConfObj in self.opConfObjList:
519
521
520 idOp = str(int(new_id) * 10 + n)
522 idOp = str(int(new_id) * 10 + n)
521 opConfObj.updateId(idOp)
523 opConfObj.updateId(idOp)
522
524
523 n += 1
525 n += 1
524
526
525 self.parentId = str(parentId)
527 self.parentId = str(parentId)
526 self.id = str(new_id)
528 self.id = str(new_id)
527 #self.inputId = str(new_inputId)
529 #self.inputId = str(new_inputId)
528 '''
530 '''
529 n = 1
531 n = 1
530
532
531 def getInputId(self):
533 def getInputId(self):
532
534
533 return self.inputId
535 return self.inputId
534
536
535 def getOperationObjList(self):
537 def getOperationObjList(self):
536
538
537 return self.opConfObjList
539 return self.opConfObjList
538
540
539 def getOperationObj(self, name=None):
541 def getOperationObj(self, name=None):
540
542
541 for opConfObj in self.opConfObjList:
543 for opConfObj in self.opConfObjList:
542
544
543 if opConfObj.name != name:
545 if opConfObj.name != name:
544 continue
546 continue
545
547
546 return opConfObj
548 return opConfObj
547
549
548 return None
550 return None
549
551
550 def getOpObjfromParamValue(self, value=None):
552 def getOpObjfromParamValue(self, value=None):
551
553
552 for opConfObj in self.opConfObjList:
554 for opConfObj in self.opConfObjList:
553 if opConfObj.getParameterObjfromValue(parameterValue=value) != value:
555 if opConfObj.getParameterObjfromValue(parameterValue=value) != value:
554 continue
556 continue
555 return opConfObj
557 return opConfObj
556 return None
558 return None
557
559
558 def getProcUnitObj(self):
560 def getProcUnitObj(self):
559
561
560 return self.procUnitObj
562 return self.procUnitObj
561
563
562 def setup(self, project_id, id, name, datatype, inputId):
564 def setup(self, project_id, id, name, datatype, inputId):
563 '''
565 '''
564 id sera el topico a publicar
566 id sera el topico a publicar
565 inputId sera el topico a subscribirse
567 inputId sera el topico a subscribirse
566 '''
568 '''
567
569
568 # Compatible with old signal chain version
570 # Compatible with old signal chain version
569 if datatype == None and name == None:
571 if datatype == None and name == None:
570 raise ValueError('datatype or name should be defined')
572 raise ValueError('datatype or name should be defined')
571
573
572 #Definir una condicion para inputId cuando sea 0
574 #Definir una condicion para inputId cuando sea 0
573
575
574 if name == None:
576 if name == None:
575 if 'Proc' in datatype:
577 if 'Proc' in datatype:
576 name = datatype
578 name = datatype
577 else:
579 else:
578 name = '%sProc' % (datatype)
580 name = '%sProc' % (datatype)
579
581
580 if datatype == None:
582 if datatype == None:
581 datatype = name.replace('Proc', '')
583 datatype = name.replace('Proc', '')
582
584
583 self.id = str(id)
585 self.id = str(id)
584 self.project_id = project_id
586 self.project_id = project_id
585 self.name = name
587 self.name = name
586 self.datatype = datatype
588 self.datatype = datatype
587 self.inputId = inputId
589 self.inputId = inputId
588 self.opConfObjList = []
590 self.opConfObjList = []
589
591
590 self.addOperation(name='run', optype='self')
592 self.addOperation(name='run', optype='self')
591
593
592 def removeOperations(self):
594 def removeOperations(self):
593
595
594 for obj in self.opConfObjList:
596 for obj in self.opConfObjList:
595 del obj
597 del obj
596
598
597 self.opConfObjList = []
599 self.opConfObjList = []
598 self.addOperation(name='run')
600 self.addOperation(name='run')
599
601
600 def addParameter(self, **kwargs):
602 def addParameter(self, **kwargs):
601 '''
603 '''
602 Add parameters to 'run' operation
604 Add parameters to 'run' operation
603 '''
605 '''
604 opObj = self.opConfObjList[0]
606 opObj = self.opConfObjList[0]
605
607
606 opObj.addParameter(**kwargs)
608 opObj.addParameter(**kwargs)
607
609
608 return opObj
610 return opObj
609
611
610 def addOperation(self, name, optype='self'):
612 def addOperation(self, name, optype='self'):
611 '''
613 '''
612 Actualizacion - > proceso comunicacion
614 Actualizacion - > proceso comunicacion
613 En el caso de optype='self', elminar. DEfinir comuncacion IPC -> Topic
615 En el caso de optype='self', elminar. DEfinir comuncacion IPC -> Topic
614 definir el tipoc de socket o comunicacion ipc++
616 definir el tipoc de socket o comunicacion ipc++
615
617
616 '''
618 '''
617
619
618 id = self.__getNewId()
620 id = self.__getNewId()
619 priority = self.__getPriority() # Sin mucho sentido, pero puede usarse
621 priority = self.__getPriority() # Sin mucho sentido, pero puede usarse
620 opConfObj = OperationConf()
622 opConfObj = OperationConf()
621 opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id)
623 opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id)
622 self.opConfObjList.append(opConfObj)
624 self.opConfObjList.append(opConfObj)
623
625
624 return opConfObj
626 return opConfObj
625
627
626 def makeXml(self, projectElement):
628 def makeXml(self, projectElement):
627
629
628 procUnitElement = SubElement(projectElement, self.ELEMENTNAME)
630 procUnitElement = SubElement(projectElement, self.ELEMENTNAME)
629 procUnitElement.set('id', str(self.id))
631 procUnitElement.set('id', str(self.id))
630 procUnitElement.set('name', self.name)
632 procUnitElement.set('name', self.name)
631 procUnitElement.set('datatype', self.datatype)
633 procUnitElement.set('datatype', self.datatype)
632 procUnitElement.set('inputId', str(self.inputId))
634 procUnitElement.set('inputId', str(self.inputId))
633
635
634 for opConfObj in self.opConfObjList:
636 for opConfObj in self.opConfObjList:
635 opConfObj.makeXml(procUnitElement)
637 opConfObj.makeXml(procUnitElement)
636
638
637 def readXml(self, upElement):
639 def readXml(self, upElement, project_id):
638
640
639 self.id = upElement.get('id')
641 self.id = upElement.get('id')
640 self.name = upElement.get('name')
642 self.name = upElement.get('name')
641 self.datatype = upElement.get('datatype')
643 self.datatype = upElement.get('datatype')
642 self.inputId = upElement.get('inputId')
644 self.inputId = upElement.get('inputId')
645 self.project_id = str(project_id)
643
646
644 if self.ELEMENTNAME == 'ReadUnit':
647 if self.ELEMENTNAME == 'ReadUnit':
645 self.datatype = self.datatype.replace('Reader', '')
648 self.datatype = self.datatype.replace('Reader', '')
646
649
647 if self.ELEMENTNAME == 'ProcUnit':
650 if self.ELEMENTNAME == 'ProcUnit':
648 self.datatype = self.datatype.replace('Proc', '')
651 self.datatype = self.datatype.replace('Proc', '')
649
652
650 if self.inputId == 'None':
653 if self.inputId == 'None':
651 self.inputId = '0'
654 self.inputId = '0'
652
655
653 self.opConfObjList = []
656 self.opConfObjList = []
654
657
655 opElementList = upElement.iter(OperationConf().getElementName())
658 opElementList = upElement.iter(OperationConf().getElementName())
656
659
657 for opElement in opElementList:
660 for opElement in opElementList:
658 opConfObj = OperationConf()
661 opConfObj = OperationConf()
659 opConfObj.readXml(opElement)
662 opConfObj.readXml(opElement, project_id)
660 self.opConfObjList.append(opConfObj)
663 self.opConfObjList.append(opConfObj)
661
664
662 def printattr(self):
665 def printattr(self):
663
666
664 print('%s[%s]: name = %s, datatype = %s, inputId = %s' % (self.ELEMENTNAME,
667 print('%s[%s]: name = %s, datatype = %s, inputId = %s, project_id = %s' % (self.ELEMENTNAME,
665 self.id,
668 self.id,
666 self.name,
669 self.name,
667 self.datatype,
670 self.datatype,
668 self.inputId))
671 self.inputId,
672 self.project_id))
669
673
670 for opConfObj in self.opConfObjList:
674 for opConfObj in self.opConfObjList:
671 opConfObj.printattr()
675 opConfObj.printattr()
672
676
673 def getKwargs(self):
677 def getKwargs(self):
674
678
675 opObj = self.opConfObjList[0]
679 opObj = self.opConfObjList[0]
676 kwargs = opObj.getKwargs()
680 kwargs = opObj.getKwargs()
677
681
678 return kwargs
682 return kwargs
679
683
680 def createObjects(self):
684 def createObjects(self):
681 '''
685 '''
682 Instancia de unidades de procesamiento.
686 Instancia de unidades de procesamiento.
683 '''
687 '''
684 className = eval(self.name)
688 className = eval(self.name)
685 kwargs = self.getKwargs()
689 kwargs = self.getKwargs()
686 procUnitObj = className(self.id, self.inputId, self.project_id, **kwargs) # necesitan saber su id y su entrada por fines de ipc
690 procUnitObj = className(self.id, self.inputId, self.project_id, **kwargs) # necesitan saber su id y su entrada por fines de ipc
687 log.success('creating process...', self.name)
691 log.success('creating process...', self.name)
688
692
689 for opConfObj in self.opConfObjList:
693 for opConfObj in self.opConfObjList:
690
694
691 if opConfObj.type == 'self' and opConfObj.name == 'run':
695 if opConfObj.type == 'self' and opConfObj.name == 'run':
692 continue
696 continue
693 elif opConfObj.type == 'self':
697 elif opConfObj.type == 'self':
694 opObj = getattr(procUnitObj, opConfObj.name)
698 opObj = getattr(procUnitObj, opConfObj.name)
695 else:
699 else:
696 opObj = opConfObj.createObject()
700 opObj = opConfObj.createObject()
697
701
698 log.success('creating operation: {}, type:{}'.format(
702 log.success('creating operation: {}, type:{}'.format(
699 opConfObj.name,
703 opConfObj.name,
700 opConfObj.type), self.name)
704 opConfObj.type), self.name)
701
705
702 procUnitObj.addOperation(opConfObj, opObj)
706 procUnitObj.addOperation(opConfObj, opObj)
703
707
704 procUnitObj.start()
708 procUnitObj.start()
705 self.procUnitObj = procUnitObj
709 self.procUnitObj = procUnitObj
706
710
707 def close(self):
711 def close(self):
708
712
709 for opConfObj in self.opConfObjList:
713 for opConfObj in self.opConfObjList:
710 if opConfObj.type == 'self':
714 if opConfObj.type == 'self':
711 continue
715 continue
712
716
713 opObj = self.procUnitObj.getOperationObj(opConfObj.id)
717 opObj = self.procUnitObj.getOperationObj(opConfObj.id)
714 opObj.close()
718 opObj.close()
715
719
716 self.procUnitObj.close()
720 self.procUnitObj.close()
717
721
718 return
722 return
719
723
720
724
721 class ReadUnitConf(ProcUnitConf):
725 class ReadUnitConf(ProcUnitConf):
722
726
723 ELEMENTNAME = 'ReadUnit'
727 ELEMENTNAME = 'ReadUnit'
724
728
725 def __init__(self):
729 def __init__(self):
726
730
727 self.id = None
731 self.id = None
728 self.datatype = None
732 self.datatype = None
729 self.name = None
733 self.name = None
730 self.inputId = None
734 self.inputId = None
731 self.opConfObjList = []
735 self.opConfObjList = []
732
736
733 def getElementName(self):
737 def getElementName(self):
734
738
735 return self.ELEMENTNAME
739 return self.ELEMENTNAME
736
740
737 def setup(self, project_id, id, name, datatype, path='', startDate='', endDate='',
741 def setup(self, project_id, id, name, datatype, path='', startDate='', endDate='',
738 startTime='', endTime='', server=None, **kwargs):
742 startTime='', endTime='', server=None, **kwargs):
739
743
740
744
741 '''
745 '''
742 *****el id del proceso sera el Topico
746 *****el id del proceso sera el Topico
743
747
744 Adicion de {topic}, si no esta presente -> error
748 Adicion de {topic}, si no esta presente -> error
745 kwargs deben ser trasmitidos en la instanciacion
749 kwargs deben ser trasmitidos en la instanciacion
746
750
747 '''
751 '''
748
752
749 # Compatible with old signal chain version
753 # Compatible with old signal chain version
750 if datatype == None and name == None:
754 if datatype == None and name == None:
751 raise ValueError('datatype or name should be defined')
755 raise ValueError('datatype or name should be defined')
752 if name == None:
756 if name == None:
753 if 'Reader' in datatype:
757 if 'Reader' in datatype:
754 name = datatype
758 name = datatype
755 datatype = name.replace('Reader','')
759 datatype = name.replace('Reader','')
756 else:
760 else:
757 name = '{}Reader'.format(datatype)
761 name = '{}Reader'.format(datatype)
758 if datatype == None:
762 if datatype == None:
759 if 'Reader' in name:
763 if 'Reader' in name:
760 datatype = name.replace('Reader','')
764 datatype = name.replace('Reader','')
761 else:
765 else:
762 datatype = name
766 datatype = name
763 name = '{}Reader'.format(name)
767 name = '{}Reader'.format(name)
764
768
765 self.id = id
769 self.id = id
766 self.project_id = project_id
770 self.project_id = project_id
767 self.name = name
771 self.name = name
768 self.datatype = datatype
772 self.datatype = datatype
769 if path != '':
773 if path != '':
770 self.path = os.path.abspath(path)
774 self.path = os.path.abspath(path)
771 self.startDate = startDate
775 self.startDate = startDate
772 self.endDate = endDate
776 self.endDate = endDate
773 self.startTime = startTime
777 self.startTime = startTime
774 self.endTime = endTime
778 self.endTime = endTime
775 self.server = server
779 self.server = server
776 self.addRunOperation(**kwargs)
780 self.addRunOperation(**kwargs)
777
781
778 def update(self, **kwargs):
782 def update(self, **kwargs):
779
783
780 if 'datatype' in kwargs:
784 if 'datatype' in kwargs:
781 datatype = kwargs.pop('datatype')
785 datatype = kwargs.pop('datatype')
782 if 'Reader' in datatype:
786 if 'Reader' in datatype:
783 self.name = datatype
787 self.name = datatype
784 else:
788 else:
785 self.name = '%sReader' % (datatype)
789 self.name = '%sReader' % (datatype)
786 self.datatype = self.name.replace('Reader', '')
790 self.datatype = self.name.replace('Reader', '')
787
791
788 attrs = ('path', 'startDate', 'endDate',
792 attrs = ('path', 'startDate', 'endDate',
789 'startTime', 'endTime')
793 'startTime', 'endTime')
790
794
791 for attr in attrs:
795 for attr in attrs:
792 if attr in kwargs:
796 if attr in kwargs:
793 setattr(self, attr, kwargs.pop(attr))
797 setattr(self, attr, kwargs.pop(attr))
794
798
795 self.updateRunOperation(**kwargs)
799 self.updateRunOperation(**kwargs)
796
800
797 def removeOperations(self):
801 def removeOperations(self):
798
802
799 for obj in self.opConfObjList:
803 for obj in self.opConfObjList:
800 del obj
804 del obj
801
805
802 self.opConfObjList = []
806 self.opConfObjList = []
803
807
804 def addRunOperation(self, **kwargs):
808 def addRunOperation(self, **kwargs):
805
809
806 opObj = self.addOperation(name='run', optype='self')
810 opObj = self.addOperation(name='run', optype='self')
807
811
808 if self.server is None:
812 if self.server is None:
809 opObj.addParameter(
813 opObj.addParameter(
810 name='datatype', value=self.datatype, format='str')
814 name='datatype', value=self.datatype, format='str')
811 opObj.addParameter(name='path', value=self.path, format='str')
815 opObj.addParameter(name='path', value=self.path, format='str')
812 opObj.addParameter(
816 opObj.addParameter(
813 name='startDate', value=self.startDate, format='date')
817 name='startDate', value=self.startDate, format='date')
814 opObj.addParameter(
818 opObj.addParameter(
815 name='endDate', value=self.endDate, format='date')
819 name='endDate', value=self.endDate, format='date')
816 opObj.addParameter(
820 opObj.addParameter(
817 name='startTime', value=self.startTime, format='time')
821 name='startTime', value=self.startTime, format='time')
818 opObj.addParameter(
822 opObj.addParameter(
819 name='endTime', value=self.endTime, format='time')
823 name='endTime', value=self.endTime, format='time')
820
824
821 for key, value in list(kwargs.items()):
825 for key, value in list(kwargs.items()):
822 opObj.addParameter(name=key, value=value,
826 opObj.addParameter(name=key, value=value,
823 format=type(value).__name__)
827 format=type(value).__name__)
824 else:
828 else:
825 opObj.addParameter(name='server', value=self.server, format='str')
829 opObj.addParameter(name='server', value=self.server, format='str')
826
830
827 return opObj
831 return opObj
828
832
829 def updateRunOperation(self, **kwargs):
833 def updateRunOperation(self, **kwargs):
830
834
831 opObj = self.getOperationObj(name='run')
835 opObj = self.getOperationObj(name='run')
832 opObj.removeParameters()
836 opObj.removeParameters()
833
837
834 opObj.addParameter(name='datatype', value=self.datatype, format='str')
838 opObj.addParameter(name='datatype', value=self.datatype, format='str')
835 opObj.addParameter(name='path', value=self.path, format='str')
839 opObj.addParameter(name='path', value=self.path, format='str')
836 opObj.addParameter(
840 opObj.addParameter(
837 name='startDate', value=self.startDate, format='date')
841 name='startDate', value=self.startDate, format='date')
838 opObj.addParameter(name='endDate', value=self.endDate, format='date')
842 opObj.addParameter(name='endDate', value=self.endDate, format='date')
839 opObj.addParameter(
843 opObj.addParameter(
840 name='startTime', value=self.startTime, format='time')
844 name='startTime', value=self.startTime, format='time')
841 opObj.addParameter(name='endTime', value=self.endTime, format='time')
845 opObj.addParameter(name='endTime', value=self.endTime, format='time')
842
846
843 for key, value in list(kwargs.items()):
847 for key, value in list(kwargs.items()):
844 opObj.addParameter(name=key, value=value,
848 opObj.addParameter(name=key, value=value,
845 format=type(value).__name__)
849 format=type(value).__name__)
846
850
847 return opObj
851 return opObj
848
852
849 def readXml(self, upElement):
853 def readXml(self, upElement, project_id):
850
854
851 self.id = upElement.get('id')
855 self.id = upElement.get('id')
852 self.name = upElement.get('name')
856 self.name = upElement.get('name')
853 self.datatype = upElement.get('datatype')
857 self.datatype = upElement.get('datatype')
858 self.project_id = str(project_id) #yong
854
859
855 if self.ELEMENTNAME == 'ReadUnit':
860 if self.ELEMENTNAME == 'ReadUnit':
856 self.datatype = self.datatype.replace('Reader', '')
861 self.datatype = self.datatype.replace('Reader', '')
857
862
858 self.opConfObjList = []
863 self.opConfObjList = []
859
864
860 opElementList = upElement.iter(OperationConf().getElementName())
865 opElementList = upElement.iter(OperationConf().getElementName())
861
866
862 for opElement in opElementList:
867 for opElement in opElementList:
863 opConfObj = OperationConf()
868 opConfObj = OperationConf()
864 opConfObj.readXml(opElement)
869 opConfObj.readXml(opElement, project_id)
865 self.opConfObjList.append(opConfObj)
870 self.opConfObjList.append(opConfObj)
866
871
867 if opConfObj.name == 'run':
872 if opConfObj.name == 'run':
868 self.path = opConfObj.getParameterValue('path')
873 self.path = opConfObj.getParameterValue('path')
869 self.startDate = opConfObj.getParameterValue('startDate')
874 self.startDate = opConfObj.getParameterValue('startDate')
870 self.endDate = opConfObj.getParameterValue('endDate')
875 self.endDate = opConfObj.getParameterValue('endDate')
871 self.startTime = opConfObj.getParameterValue('startTime')
876 self.startTime = opConfObj.getParameterValue('startTime')
872 self.endTime = opConfObj.getParameterValue('endTime')
877 self.endTime = opConfObj.getParameterValue('endTime')
873
878
874
879
875 class Project(Process):
880 class Project(Process):
876
881
877 ELEMENTNAME = 'Project'
882 ELEMENTNAME = 'Project'
878
883
879 def __init__(self):
884 def __init__(self):
880
885
881 Process.__init__(self)
886 Process.__init__(self)
882 self.id = None
887 self.id = None
883 self.filename = None
888 self.filename = None
884 self.description = None
889 self.description = None
885 self.email = None
890 self.email = None
886 self.alarm = None
891 self.alarm = None
887 self.procUnitConfObjDict = {}
892 self.procUnitConfObjDict = {}
888
893
889 def __getNewId(self):
894 def __getNewId(self):
890
895
891 idList = list(self.procUnitConfObjDict.keys())
896 idList = list(self.procUnitConfObjDict.keys())
892 id = int(self.id) * 10
897 id = int(self.id) * 10
893
898
894 while True:
899 while True:
895 id += 1
900 id += 1
896
901
897 if str(id) in idList:
902 if str(id) in idList:
898 continue
903 continue
899
904
900 break
905 break
901
906
902 return str(id)
907 return str(id)
903
908
904 def getElementName(self):
909 def getElementName(self):
905
910
906 return self.ELEMENTNAME
911 return self.ELEMENTNAME
907
912
908 def getId(self):
913 def getId(self):
909
914
910 return self.id
915 return self.id
911
916
912 def updateId(self, new_id):
917 def updateId(self, new_id):
913
918
914 self.id = str(new_id)
919 self.id = str(new_id)
915
920
916 keyList = list(self.procUnitConfObjDict.keys())
921 keyList = list(self.procUnitConfObjDict.keys())
917 keyList.sort()
922 keyList.sort()
918
923
919 n = 1
924 n = 1
920 newProcUnitConfObjDict = {}
925 newProcUnitConfObjDict = {}
921
926
922 for procKey in keyList:
927 for procKey in keyList:
923
928
924 procUnitConfObj = self.procUnitConfObjDict[procKey]
929 procUnitConfObj = self.procUnitConfObjDict[procKey]
925 idProcUnit = str(int(self.id) * 10 + n)
930 idProcUnit = str(int(self.id) * 10 + n)
926 procUnitConfObj.updateId(idProcUnit)
931 procUnitConfObj.updateId(idProcUnit)
927 newProcUnitConfObjDict[idProcUnit] = procUnitConfObj
932 newProcUnitConfObjDict[idProcUnit] = procUnitConfObj
928 n += 1
933 n += 1
929
934
930 self.procUnitConfObjDict = newProcUnitConfObjDict
935 self.procUnitConfObjDict = newProcUnitConfObjDict
931
936
932 def setup(self, id=1, name='', description='', email=None, alarm=[]):
937 def setup(self, id=1, name='', description='', email=None, alarm=[]):
933
938
934 print(' ')
939 print(' ')
935 print('*' * 60)
940 print('*' * 60)
936 print('* Starting SIGNAL CHAIN PROCESSING (Multiprocessing) v%s *' % schainpy.__version__)
941 print('* Starting SIGNAL CHAIN PROCESSING (Multiprocessing) v%s *' % schainpy.__version__)
937 print('*' * 60)
942 print('*' * 60)
938 print("* Python " + python_version() + " *")
943 print("* Python " + python_version() + " *")
939 print('*' * 19)
944 print('*' * 19)
940 print(' ')
945 print(' ')
941 self.id = str(id)
946 self.id = str(id)
942 self.description = description
947 self.description = description
943 self.email = email
948 self.email = email
944 self.alarm = alarm
949 self.alarm = alarm
945
950
946 def update(self, **kwargs):
951 def update(self, **kwargs):
947
952
948 for key, value in list(kwargs.items()):
953 for key, value in list(kwargs.items()):
949 setattr(self, key, value)
954 setattr(self, key, value)
950
955
951 def clone(self):
956 def clone(self):
952
957
953 p = Project()
958 p = Project()
954 p.procUnitConfObjDict = self.procUnitConfObjDict
959 p.procUnitConfObjDict = self.procUnitConfObjDict
955 return p
960 return p
956
961
957 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
962 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
958
963
959 '''
964 '''
960 Actualizacion:
965 Actualizacion:
961 Se agrego un nuevo argumento: topic -relativo a la forma de comunicar los procesos simultaneos
966 Se agrego un nuevo argumento: topic -relativo a la forma de comunicar los procesos simultaneos
962
967
963 * El id del proceso sera el topico al que se deben subscribir los procUnits para recibir la informacion(data)
968 * El id del proceso sera el topico al que se deben subscribir los procUnits para recibir la informacion(data)
964
969
965 '''
970 '''
966
971
967 if id is None:
972 if id is None:
968 idReadUnit = self.__getNewId()
973 idReadUnit = self.__getNewId()
969 else:
974 else:
970 idReadUnit = str(id)
975 idReadUnit = str(id)
971
976
972 readUnitConfObj = ReadUnitConf()
977 readUnitConfObj = ReadUnitConf()
973 readUnitConfObj.setup(self.id, idReadUnit, name, datatype, **kwargs)
978 readUnitConfObj.setup(self.id, idReadUnit, name, datatype, **kwargs)
974 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
979 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
975
980
976 return readUnitConfObj
981 return readUnitConfObj
977
982
978 def addProcUnit(self, inputId='0', datatype=None, name=None):
983 def addProcUnit(self, inputId='0', datatype=None, name=None):
979
984
980 '''
985 '''
981 Actualizacion:
986 Actualizacion:
982 Se agrego dos nuevos argumentos: topic_read (lee data de otro procUnit) y topic_write(escribe o envia data a otro procUnit)
987 Se agrego dos nuevos argumentos: topic_read (lee data de otro procUnit) y topic_write(escribe o envia data a otro procUnit)
983 Deberia reemplazar a "inputId"
988 Deberia reemplazar a "inputId"
984
989
985 ** A fin de mantener el inputID, este sera la representaacion del topicoal que deben subscribirse. El ID propio de la intancia
990 ** A fin de mantener el inputID, este sera la representaacion del topicoal que deben subscribirse. El ID propio de la intancia
986 (proceso) sera el topico de la publicacion, todo sera asignado de manera dinamica.
991 (proceso) sera el topico de la publicacion, todo sera asignado de manera dinamica.
987
992
988 '''
993 '''
989
994
990 idProcUnit = self.__getNewId() #Topico para subscripcion
995 idProcUnit = self.__getNewId() #Topico para subscripcion
991 procUnitConfObj = ProcUnitConf()
996 procUnitConfObj = ProcUnitConf()
992 procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId) #topic_read, topic_write,
997 procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId) #topic_read, topic_write,
993 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
998 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
994
999
995 return procUnitConfObj
1000 return procUnitConfObj
996
1001
997 def removeProcUnit(self, id):
1002 def removeProcUnit(self, id):
998
1003
999 if id in list(self.procUnitConfObjDict.keys()):
1004 if id in list(self.procUnitConfObjDict.keys()):
1000 self.procUnitConfObjDict.pop(id)
1005 self.procUnitConfObjDict.pop(id)
1001
1006
1002 def getReadUnitId(self):
1007 def getReadUnitId(self):
1003
1008
1004 readUnitConfObj = self.getReadUnitObj()
1009 readUnitConfObj = self.getReadUnitObj()
1005
1010
1006 return readUnitConfObj.id
1011 return readUnitConfObj.id
1007
1012
1008 def getReadUnitObj(self):
1013 def getReadUnitObj(self):
1009
1014
1010 for obj in list(self.procUnitConfObjDict.values()):
1015 for obj in list(self.procUnitConfObjDict.values()):
1011 if obj.getElementName() == 'ReadUnit':
1016 if obj.getElementName() == 'ReadUnit':
1012 return obj
1017 return obj
1013
1018
1014 return None
1019 return None
1015
1020
1016 def getProcUnitObj(self, id=None, name=None):
1021 def getProcUnitObj(self, id=None, name=None):
1017
1022
1018 if id != None:
1023 if id != None:
1019 return self.procUnitConfObjDict[id]
1024 return self.procUnitConfObjDict[id]
1020
1025
1021 if name != None:
1026 if name != None:
1022 return self.getProcUnitObjByName(name)
1027 return self.getProcUnitObjByName(name)
1023
1028
1024 return None
1029 return None
1025
1030
1026 def getProcUnitObjByName(self, name):
1031 def getProcUnitObjByName(self, name):
1027
1032
1028 for obj in list(self.procUnitConfObjDict.values()):
1033 for obj in list(self.procUnitConfObjDict.values()):
1029 if obj.name == name:
1034 if obj.name == name:
1030 return obj
1035 return obj
1031
1036
1032 return None
1037 return None
1033
1038
1034 def procUnitItems(self):
1039 def procUnitItems(self):
1035
1040
1036 return list(self.procUnitConfObjDict.items())
1041 return list(self.procUnitConfObjDict.items())
1037
1042
1038 def makeXml(self):
1043 def makeXml(self):
1039
1044
1040 projectElement = Element('Project')
1045 projectElement = Element('Project')
1041 projectElement.set('id', str(self.id))
1046 projectElement.set('id', str(self.id))
1042 projectElement.set('name', self.name)
1047 projectElement.set('name', self.name)
1043 projectElement.set('description', self.description)
1048 projectElement.set('description', self.description)
1044
1049
1045 for procUnitConfObj in list(self.procUnitConfObjDict.values()):
1050 for procUnitConfObj in list(self.procUnitConfObjDict.values()):
1046 procUnitConfObj.makeXml(projectElement)
1051 procUnitConfObj.makeXml(projectElement)
1047
1052
1048 self.projectElement = projectElement
1053 self.projectElement = projectElement
1049
1054
1050 def writeXml(self, filename=None):
1055 def writeXml(self, filename=None):
1051
1056
1052 if filename == None:
1057 if filename == None:
1053 if self.filename:
1058 if self.filename:
1054 filename = self.filename
1059 filename = self.filename
1055 else:
1060 else:
1056 filename = 'schain.xml'
1061 filename = 'schain.xml'
1057
1062
1058 if not filename:
1063 if not filename:
1059 print('filename has not been defined. Use setFilename(filename) for do it.')
1064 print('filename has not been defined. Use setFilename(filename) for do it.')
1060 return 0
1065 return 0
1061
1066
1062 abs_file = os.path.abspath(filename)
1067 abs_file = os.path.abspath(filename)
1063
1068
1064 if not os.access(os.path.dirname(abs_file), os.W_OK):
1069 if not os.access(os.path.dirname(abs_file), os.W_OK):
1065 print('No write permission on %s' % os.path.dirname(abs_file))
1070 print('No write permission on %s' % os.path.dirname(abs_file))
1066 return 0
1071 return 0
1067
1072
1068 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
1073 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
1069 print('File %s already exists and it could not be overwriten' % abs_file)
1074 print('File %s already exists and it could not be overwriten' % abs_file)
1070 return 0
1075 return 0
1071
1076
1072 self.makeXml()
1077 self.makeXml()
1073
1078
1074 ElementTree(self.projectElement).write(abs_file, method='xml')
1079 ElementTree(self.projectElement).write(abs_file, method='xml')
1075
1080
1076 self.filename = abs_file
1081 self.filename = abs_file
1077
1082
1078 return 1
1083 return 1
1079
1084
1080 def readXml(self, filename=None):
1085 def readXml(self, filename=None):
1081
1086
1082 if not filename:
1087 if not filename:
1083 print('filename is not defined')
1088 print('filename is not defined')
1084 return 0
1089 return 0
1085
1090
1086 abs_file = os.path.abspath(filename)
1091 abs_file = os.path.abspath(filename)
1087
1092
1088 if not os.path.isfile(abs_file):
1093 if not os.path.isfile(abs_file):
1089 print('%s file does not exist' % abs_file)
1094 print('%s file does not exist' % abs_file)
1090 return 0
1095 return 0
1091
1096
1092 self.projectElement = None
1097 self.projectElement = None
1093 self.procUnitConfObjDict = {}
1098 self.procUnitConfObjDict = {}
1094
1099
1095 try:
1100 try:
1096 self.projectElement = ElementTree().parse(abs_file)
1101 self.projectElement = ElementTree().parse(abs_file)
1097 except:
1102 except:
1098 print('Error reading %s, verify file format' % filename)
1103 print('Error reading %s, verify file format' % filename)
1099 return 0
1104 return 0
1100
1105
1101 self.project = self.projectElement.tag
1106 self.project = self.projectElement.tag
1102
1107
1103 self.id = self.projectElement.get('id')
1108 self.id = self.projectElement.get('id')
1104 self.name = self.projectElement.get('name')
1109 self.name = self.projectElement.get('name')
1105 self.description = self.projectElement.get('description')
1110 self.description = self.projectElement.get('description')
1106
1111
1107 readUnitElementList = self.projectElement.iter(
1112 readUnitElementList = self.projectElement.iter(
1108 ReadUnitConf().getElementName())
1113 ReadUnitConf().getElementName())
1109
1114
1110 for readUnitElement in readUnitElementList:
1115 for readUnitElement in readUnitElementList:
1111 readUnitConfObj = ReadUnitConf()
1116 readUnitConfObj = ReadUnitConf()
1112 readUnitConfObj.readXml(readUnitElement)
1117 readUnitConfObj.readXml(readUnitElement, self.id)
1113 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
1118 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
1114
1119
1115 procUnitElementList = self.projectElement.iter(
1120 procUnitElementList = self.projectElement.iter(
1116 ProcUnitConf().getElementName())
1121 ProcUnitConf().getElementName())
1117
1122
1118 for procUnitElement in procUnitElementList:
1123 for procUnitElement in procUnitElementList:
1119 procUnitConfObj = ProcUnitConf()
1124 procUnitConfObj = ProcUnitConf()
1120 procUnitConfObj.readXml(procUnitElement)
1125 procUnitConfObj.readXml(procUnitElement, self.id)
1121 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1126 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1122
1127
1123 self.filename = abs_file
1128 self.filename = abs_file
1124
1129
1125 return 1
1130 return 1
1126
1131
1127 def __str__(self):
1132 def __str__(self):
1128
1133
1129 print('Project[%s]: name = %s, description = %s' % (self.id,
1134 print('Project[%s]: name = %s, description = %s, project_id = %s' % (self.id,
1130 self.name,
1135 self.name,
1131 self.description))
1136 self.description,
1137 self.project_id))
1132
1138
1133 for procUnitConfObj in self.procUnitConfObjDict.values():
1139 for procUnitConfObj in self.procUnitConfObjDict.values():
1134 print(procUnitConfObj)
1140 print(procUnitConfObj)
1135
1141
1136 def createObjects(self):
1142 def createObjects(self):
1137
1143
1138 for procUnitConfObj in self.procUnitConfObjDict.values():
1144 for procUnitConfObj in self.procUnitConfObjDict.values():
1139 procUnitConfObj.createObjects()
1145 procUnitConfObj.createObjects()
1140
1146
1141 def __handleError(self, procUnitConfObj, modes=None, stdout=True):
1147 def __handleError(self, procUnitConfObj, modes=None, stdout=True):
1142
1148
1143 import socket
1149 import socket
1144
1150
1145 if modes is None:
1151 if modes is None:
1146 modes = self.alarm
1152 modes = self.alarm
1147
1153
1148 if not self.alarm:
1154 if not self.alarm:
1149 modes = []
1155 modes = []
1150
1156
1151 err = traceback.format_exception(sys.exc_info()[0],
1157 err = traceback.format_exception(sys.exc_info()[0],
1152 sys.exc_info()[1],
1158 sys.exc_info()[1],
1153 sys.exc_info()[2])
1159 sys.exc_info()[2])
1154
1160
1155 log.error('{}'.format(err[-1]), procUnitConfObj.name)
1161 log.error('{}'.format(err[-1]), procUnitConfObj.name)
1156
1162
1157 message = ''.join(err)
1163 message = ''.join(err)
1158
1164
1159 if stdout:
1165 if stdout:
1160 sys.stderr.write(message)
1166 sys.stderr.write(message)
1161
1167
1162 subject = 'SChain v%s: Error running %s\n' % (
1168 subject = 'SChain v%s: Error running %s\n' % (
1163 schainpy.__version__, procUnitConfObj.name)
1169 schainpy.__version__, procUnitConfObj.name)
1164
1170
1165 subtitle = '%s: %s\n' % (
1171 subtitle = '%s: %s\n' % (
1166 procUnitConfObj.getElementName(), procUnitConfObj.name)
1172 procUnitConfObj.getElementName(), procUnitConfObj.name)
1167 subtitle += 'Hostname: %s\n' % socket.gethostbyname(
1173 subtitle += 'Hostname: %s\n' % socket.gethostbyname(
1168 socket.gethostname())
1174 socket.gethostname())
1169 subtitle += 'Working directory: %s\n' % os.path.abspath('./')
1175 subtitle += 'Working directory: %s\n' % os.path.abspath('./')
1170 subtitle += 'Configuration file: %s\n' % self.filename
1176 subtitle += 'Configuration file: %s\n' % self.filename
1171 subtitle += 'Time: %s\n' % str(datetime.datetime.now())
1177 subtitle += 'Time: %s\n' % str(datetime.datetime.now())
1172
1178
1173 readUnitConfObj = self.getReadUnitObj()
1179 readUnitConfObj = self.getReadUnitObj()
1174 if readUnitConfObj:
1180 if readUnitConfObj:
1175 subtitle += '\nInput parameters:\n'
1181 subtitle += '\nInput parameters:\n'
1176 subtitle += '[Data path = %s]\n' % readUnitConfObj.path
1182 subtitle += '[Data path = %s]\n' % readUnitConfObj.path
1177 subtitle += '[Data type = %s]\n' % readUnitConfObj.datatype
1183 subtitle += '[Data type = %s]\n' % readUnitConfObj.datatype
1178 subtitle += '[Start date = %s]\n' % readUnitConfObj.startDate
1184 subtitle += '[Start date = %s]\n' % readUnitConfObj.startDate
1179 subtitle += '[End date = %s]\n' % readUnitConfObj.endDate
1185 subtitle += '[End date = %s]\n' % readUnitConfObj.endDate
1180 subtitle += '[Start time = %s]\n' % readUnitConfObj.startTime
1186 subtitle += '[Start time = %s]\n' % readUnitConfObj.startTime
1181 subtitle += '[End time = %s]\n' % readUnitConfObj.endTime
1187 subtitle += '[End time = %s]\n' % readUnitConfObj.endTime
1182
1188
1183 a = Alarm(
1189 a = Alarm(
1184 modes=modes,
1190 modes=modes,
1185 email=self.email,
1191 email=self.email,
1186 message=message,
1192 message=message,
1187 subject=subject,
1193 subject=subject,
1188 subtitle=subtitle,
1194 subtitle=subtitle,
1189 filename=self.filename
1195 filename=self.filename
1190 )
1196 )
1191
1197
1192 return a
1198 return a
1193
1199
1194 def isPaused(self):
1200 def isPaused(self):
1195 return 0
1201 return 0
1196
1202
1197 def isStopped(self):
1203 def isStopped(self):
1198 return 0
1204 return 0
1199
1205
1200 def runController(self):
1206 def runController(self):
1201 '''
1207 '''
1202 returns 0 when this process has been stopped, 1 otherwise
1208 returns 0 when this process has been stopped, 1 otherwise
1203 '''
1209 '''
1204
1210
1205 if self.isPaused():
1211 if self.isPaused():
1206 print('Process suspended')
1212 print('Process suspended')
1207
1213
1208 while True:
1214 while True:
1209 time.sleep(0.1)
1215 time.sleep(0.1)
1210
1216
1211 if not self.isPaused():
1217 if not self.isPaused():
1212 break
1218 break
1213
1219
1214 if self.isStopped():
1220 if self.isStopped():
1215 break
1221 break
1216
1222
1217 print('Process reinitialized')
1223 print('Process reinitialized')
1218
1224
1219 if self.isStopped():
1225 if self.isStopped():
1220 print('Process stopped')
1226 print('Process stopped')
1221 return 0
1227 return 0
1222
1228
1223 return 1
1229 return 1
1224
1230
1225 def setFilename(self, filename):
1231 def setFilename(self, filename):
1226
1232
1227 self.filename = filename
1233 self.filename = filename
1228
1234
1229 def setProxyCom(self):
1235 def setProxyCom(self):
1230
1236
1231 if not os.path.exists('/tmp/schain'):
1237 if not os.path.exists('/tmp/schain'):
1232 os.mkdir('/tmp/schain')
1238 os.mkdir('/tmp/schain')
1233
1239
1234 self.ctx = zmq.Context()
1240 self.ctx = zmq.Context()
1235 xpub = self.ctx.socket(zmq.XPUB)
1241 xpub = self.ctx.socket(zmq.XPUB)
1236 xpub.bind('ipc:///tmp/schain/{}_pub'.format(self.id))
1242 xpub.bind('ipc:///tmp/schain/{}_pub'.format(self.id))
1237 xsub = self.ctx.socket(zmq.XSUB)
1243 xsub = self.ctx.socket(zmq.XSUB)
1238 xsub.bind('ipc:///tmp/schain/{}_sub'.format(self.id))
1244 xsub.bind('ipc:///tmp/schain/{}_sub'.format(self.id))
1239
1245
1240 try:
1246 try:
1241 zmq.proxy(xpub, xsub)
1247 zmq.proxy(xpub, xsub)
1242 except zmq.ContextTerminated:
1248 except zmq.ContextTerminated:
1243 xpub.close()
1249 xpub.close()
1244 xsub.close()
1250 xsub.close()
1245
1251
1246 def run(self):
1252 def run(self):
1247
1253
1248 log.success('Starting {}: {}'.format(self.name, self.id), tag='')
1254 log.success('Starting {}: {}'.format(self.name, self.id), tag='')
1249 self.start_time = time.time()
1255 self.start_time = time.time()
1250 self.createObjects()
1256 self.createObjects()
1251 # t = Thread(target=wait, args=(self.ctx, ))
1257 # t = Thread(target=wait, args=(self.ctx, ))
1252 # t.start()
1258 # t.start()
1253 self.setProxyCom()
1259 self.setProxyCom()
1254
1260
1255 # Iniciar todos los procesos .start(), monitoreo de procesos. ELiminar lo de abajo
1261 # Iniciar todos los procesos .start(), monitoreo de procesos. ELiminar lo de abajo
1256
1262
1257 log.success('{} finished (time: {}s)'.format(
1263 log.success('{} finished (time: {}s)'.format(
1258 self.name,
1264 self.name,
1259 time.time()-self.start_time))
1265 time.time()-self.start_time))
General Comments 0
You need to be logged in to leave comments. Login now