##// END OF EJS Templates
exiting processes properly
José Chávez -
r924:8925c3feabcd
parent child
Show More
@@ -1,1323 +1,1321
1 '''
1 '''
2 Created on September , 2012
2 Created on September , 2012
3 @author:
3 @author:
4 '''
4 '''
5
5
6 import sys
6 import sys
7 import ast
7 import ast
8 import datetime
8 import datetime
9 import traceback
9 import traceback
10 import math
10 import math
11 from multiprocessing import Process, Queue, cpu_count
11 from multiprocessing import Process, Queue, cpu_count
12
12
13 import schainpy
13 import schainpy
14 import schainpy.admin
14 import schainpy.admin
15
15
16 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
16 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
17 from xml.dom import minidom
17 from xml.dom import minidom
18
18
19 from schainpy.model import *
19 from schainpy.model import *
20 from time import sleep
20 from time import sleep
21
21
22 def prettify(elem):
22 def prettify(elem):
23 """Return a pretty-printed XML string for the Element.
23 """Return a pretty-printed XML string for the Element.
24 """
24 """
25 rough_string = tostring(elem, 'utf-8')
25 rough_string = tostring(elem, 'utf-8')
26 reparsed = minidom.parseString(rough_string)
26 reparsed = minidom.parseString(rough_string)
27 return reparsed.toprettyxml(indent=" ")
27 return reparsed.toprettyxml(indent=" ")
28
28
29 def multiSchain(child, nProcess=cpu_count(), startDate=None, endDate=None, receiver=None):
29 def multiSchain(child, nProcess=cpu_count(), startDate=None, endDate=None, receiver=None):
30 skip = 0
30 skip = 0
31 cursor = 0
31 cursor = 0
32 nFiles = None
32 nFiles = None
33 processes = []
33 processes = []
34
34
35
36
37
35 dt1 = datetime.datetime.strptime(startDate, '%Y/%m/%d')
38 dt1 = datetime.datetime.strptime(startDate, '%Y/%m/%d')
36 dt2 = datetime.datetime.strptime(endDate, '%Y/%m/%d')
39 dt2 = datetime.datetime.strptime(endDate, '%Y/%m/%d')
37 days = (dt2 - dt1).days
40 days = (dt2 - dt1).days
38 print days
41 print days
39 for day in range(days+1):
42 for day in range(days+1):
40 skip = 0
43 skip = 0
41 cursor = 0
44 cursor = 0
42 q = Queue()
45 q = Queue()
43 processes = []
46 processes = []
44 dt = (dt1 + datetime.timedelta(day)).strftime('%Y/%m/%d')
47 dt = (dt1 + datetime.timedelta(day)).strftime('%Y/%m/%d')
45 firstProcess = Process(target=child, args=(cursor, skip, q, dt))
48 firstProcess = Process(target=child, args=(cursor, skip, q, dt))
46 firstProcess.start()
49 firstProcess.start()
47 nFiles = q.get()
50 nFiles = q.get()
48 firstProcess.terminate()
51 firstProcess.terminate()
49 skip = int(math.ceil(nFiles/nProcess))
52 skip = int(math.ceil(nFiles/nProcess))
50 try:
51 while True:
53 while True:
52 processes.append(Process(target=child, args=(cursor, skip, q, dt)))
54 processes.append(Process(target=child, args=(cursor, skip, q, dt)))
53 processes[cursor].start()
55 processes[cursor].start()
54 if nFiles < cursor*skip:
56 if nFiles < cursor*skip:
55 break
57 break
56 cursor += 1
58 cursor += 1
57 except KeyboardInterrupt:
59
60 def beforeExit(exctype, value, trace):
58 for process in processes:
61 for process in processes:
59 process.terminate()
62 process.terminate()
60 process.join()
63 process.join()
61 for process in processes:
64 sys.excepthook = beforeExit
62 process.join()
63 # process.terminate()
64 sleep(3)
65
65
66 try:
67 while True:
68 pass
69 except KeyboardInterrupt:
70 for process in processes:
66 for process in processes:
71 process.terminate()
72 process.join()
67 process.join()
68 process.terminate()
69
70
73
71
74 class ParameterConf():
72 class ParameterConf():
75
73
76 id = None
74 id = None
77 name = None
75 name = None
78 value = None
76 value = None
79 format = None
77 format = None
80
78
81 __formated_value = None
79 __formated_value = None
82
80
83 ELEMENTNAME = 'Parameter'
81 ELEMENTNAME = 'Parameter'
84
82
85 def __init__(self):
83 def __init__(self):
86
84
87 self.format = 'str'
85 self.format = 'str'
88
86
89 def getElementName(self):
87 def getElementName(self):
90
88
91 return self.ELEMENTNAME
89 return self.ELEMENTNAME
92
90
93 def getValue(self):
91 def getValue(self):
94
92
95 value = self.value
93 value = self.value
96 format = self.format
94 format = self.format
97
95
98 if self.__formated_value != None:
96 if self.__formated_value != None:
99
97
100 return self.__formated_value
98 return self.__formated_value
101
99
102 if format == 'obj':
100 if format == 'obj':
103 return value
101 return value
104
102
105 if format == 'str':
103 if format == 'str':
106 self.__formated_value = str(value)
104 self.__formated_value = str(value)
107 return self.__formated_value
105 return self.__formated_value
108
106
109 if value == '':
107 if value == '':
110 raise ValueError, "%s: This parameter value is empty" %self.name
108 raise ValueError, "%s: This parameter value is empty" %self.name
111
109
112 if format == 'list':
110 if format == 'list':
113 strList = value.split(',')
111 strList = value.split(',')
114
112
115 self.__formated_value = strList
113 self.__formated_value = strList
116
114
117 return self.__formated_value
115 return self.__formated_value
118
116
119 if format == 'intlist':
117 if format == 'intlist':
120 """
118 """
121 Example:
119 Example:
122 value = (0,1,2)
120 value = (0,1,2)
123 """
121 """
124
122
125 new_value = ast.literal_eval(value)
123 new_value = ast.literal_eval(value)
126
124
127 if type(new_value) not in (tuple, list):
125 if type(new_value) not in (tuple, list):
128 new_value = [int(new_value)]
126 new_value = [int(new_value)]
129
127
130 self.__formated_value = new_value
128 self.__formated_value = new_value
131
129
132 return self.__formated_value
130 return self.__formated_value
133
131
134 if format == 'floatlist':
132 if format == 'floatlist':
135 """
133 """
136 Example:
134 Example:
137 value = (0.5, 1.4, 2.7)
135 value = (0.5, 1.4, 2.7)
138 """
136 """
139
137
140 new_value = ast.literal_eval(value)
138 new_value = ast.literal_eval(value)
141
139
142 if type(new_value) not in (tuple, list):
140 if type(new_value) not in (tuple, list):
143 new_value = [float(new_value)]
141 new_value = [float(new_value)]
144
142
145 self.__formated_value = new_value
143 self.__formated_value = new_value
146
144
147 return self.__formated_value
145 return self.__formated_value
148
146
149 if format == 'date':
147 if format == 'date':
150 strList = value.split('/')
148 strList = value.split('/')
151 intList = [int(x) for x in strList]
149 intList = [int(x) for x in strList]
152 date = datetime.date(intList[0], intList[1], intList[2])
150 date = datetime.date(intList[0], intList[1], intList[2])
153
151
154 self.__formated_value = date
152 self.__formated_value = date
155
153
156 return self.__formated_value
154 return self.__formated_value
157
155
158 if format == 'time':
156 if format == 'time':
159 strList = value.split(':')
157 strList = value.split(':')
160 intList = [int(x) for x in strList]
158 intList = [int(x) for x in strList]
161 time = datetime.time(intList[0], intList[1], intList[2])
159 time = datetime.time(intList[0], intList[1], intList[2])
162
160
163 self.__formated_value = time
161 self.__formated_value = time
164
162
165 return self.__formated_value
163 return self.__formated_value
166
164
167 if format == 'pairslist':
165 if format == 'pairslist':
168 """
166 """
169 Example:
167 Example:
170 value = (0,1),(1,2)
168 value = (0,1),(1,2)
171 """
169 """
172
170
173 new_value = ast.literal_eval(value)
171 new_value = ast.literal_eval(value)
174
172
175 if type(new_value) not in (tuple, list):
173 if type(new_value) not in (tuple, list):
176 raise ValueError, "%s has to be a tuple or list of pairs" %value
174 raise ValueError, "%s has to be a tuple or list of pairs" %value
177
175
178 if type(new_value[0]) not in (tuple, list):
176 if type(new_value[0]) not in (tuple, list):
179 if len(new_value) != 2:
177 if len(new_value) != 2:
180 raise ValueError, "%s has to be a tuple or list of pairs" %value
178 raise ValueError, "%s has to be a tuple or list of pairs" %value
181 new_value = [new_value]
179 new_value = [new_value]
182
180
183 for thisPair in new_value:
181 for thisPair in new_value:
184 if len(thisPair) != 2:
182 if len(thisPair) != 2:
185 raise ValueError, "%s has to be a tuple or list of pairs" %value
183 raise ValueError, "%s has to be a tuple or list of pairs" %value
186
184
187 self.__formated_value = new_value
185 self.__formated_value = new_value
188
186
189 return self.__formated_value
187 return self.__formated_value
190
188
191 if format == 'multilist':
189 if format == 'multilist':
192 """
190 """
193 Example:
191 Example:
194 value = (0,1,2),(3,4,5)
192 value = (0,1,2),(3,4,5)
195 """
193 """
196 multiList = ast.literal_eval(value)
194 multiList = ast.literal_eval(value)
197
195
198 if type(multiList[0]) == int:
196 if type(multiList[0]) == int:
199 multiList = ast.literal_eval("(" + value + ")")
197 multiList = ast.literal_eval("(" + value + ")")
200
198
201 self.__formated_value = multiList
199 self.__formated_value = multiList
202
200
203 return self.__formated_value
201 return self.__formated_value
204
202
205 if format == 'bool':
203 if format == 'bool':
206 value = int(value)
204 value = int(value)
207
205
208 if format == 'int':
206 if format == 'int':
209 value = float(value)
207 value = float(value)
210
208
211 format_func = eval(format)
209 format_func = eval(format)
212
210
213 self.__formated_value = format_func(value)
211 self.__formated_value = format_func(value)
214
212
215 return self.__formated_value
213 return self.__formated_value
216
214
217 def updateId(self, new_id):
215 def updateId(self, new_id):
218
216
219 self.id = str(new_id)
217 self.id = str(new_id)
220
218
221 def setup(self, id, name, value, format='str'):
219 def setup(self, id, name, value, format='str'):
222
220
223 self.id = str(id)
221 self.id = str(id)
224 self.name = name
222 self.name = name
225 if format == 'obj':
223 if format == 'obj':
226 self.value = value
224 self.value = value
227 else:
225 else:
228 self.value = str(value)
226 self.value = str(value)
229 self.format = str.lower(format)
227 self.format = str.lower(format)
230
228
231 self.getValue()
229 self.getValue()
232
230
233 return 1
231 return 1
234
232
235 def update(self, name, value, format='str'):
233 def update(self, name, value, format='str'):
236
234
237 self.name = name
235 self.name = name
238 self.value = str(value)
236 self.value = str(value)
239 self.format = format
237 self.format = format
240
238
241 def makeXml(self, opElement):
239 def makeXml(self, opElement):
242 if self.name not in ('queue',):
240 if self.name not in ('queue',):
243 parmElement = SubElement(opElement, self.ELEMENTNAME)
241 parmElement = SubElement(opElement, self.ELEMENTNAME)
244 parmElement.set('id', str(self.id))
242 parmElement.set('id', str(self.id))
245 parmElement.set('name', self.name)
243 parmElement.set('name', self.name)
246 parmElement.set('value', self.value)
244 parmElement.set('value', self.value)
247 parmElement.set('format', self.format)
245 parmElement.set('format', self.format)
248
246
249 def readXml(self, parmElement):
247 def readXml(self, parmElement):
250
248
251 self.id = parmElement.get('id')
249 self.id = parmElement.get('id')
252 self.name = parmElement.get('name')
250 self.name = parmElement.get('name')
253 self.value = parmElement.get('value')
251 self.value = parmElement.get('value')
254 self.format = str.lower(parmElement.get('format'))
252 self.format = str.lower(parmElement.get('format'))
255
253
256 #Compatible with old signal chain version
254 #Compatible with old signal chain version
257 if self.format == 'int' and self.name == 'idfigure':
255 if self.format == 'int' and self.name == 'idfigure':
258 self.name = 'id'
256 self.name = 'id'
259
257
260 def printattr(self):
258 def printattr(self):
261
259
262 print "Parameter[%s]: name = %s, value = %s, format = %s" %(self.id, self.name, self.value, self.format)
260 print "Parameter[%s]: name = %s, value = %s, format = %s" %(self.id, self.name, self.value, self.format)
263
261
264 class OperationConf():
262 class OperationConf():
265
263
266 id = None
264 id = None
267 name = None
265 name = None
268 priority = None
266 priority = None
269 type = None
267 type = None
270
268
271 parmConfObjList = []
269 parmConfObjList = []
272
270
273 ELEMENTNAME = 'Operation'
271 ELEMENTNAME = 'Operation'
274
272
275 def __init__(self):
273 def __init__(self):
276
274
277 self.id = '0'
275 self.id = '0'
278 self.name = None
276 self.name = None
279 self.priority = None
277 self.priority = None
280 self.type = 'self'
278 self.type = 'self'
281
279
282
280
283 def __getNewId(self):
281 def __getNewId(self):
284
282
285 return int(self.id)*10 + len(self.parmConfObjList) + 1
283 return int(self.id)*10 + len(self.parmConfObjList) + 1
286
284
287 def updateId(self, new_id):
285 def updateId(self, new_id):
288
286
289 self.id = str(new_id)
287 self.id = str(new_id)
290
288
291 n = 1
289 n = 1
292 for parmObj in self.parmConfObjList:
290 for parmObj in self.parmConfObjList:
293
291
294 idParm = str(int(new_id)*10 + n)
292 idParm = str(int(new_id)*10 + n)
295 parmObj.updateId(idParm)
293 parmObj.updateId(idParm)
296
294
297 n += 1
295 n += 1
298
296
299 def getElementName(self):
297 def getElementName(self):
300
298
301 return self.ELEMENTNAME
299 return self.ELEMENTNAME
302
300
303 def getParameterObjList(self):
301 def getParameterObjList(self):
304
302
305 return self.parmConfObjList
303 return self.parmConfObjList
306
304
307 def getParameterObj(self, parameterName):
305 def getParameterObj(self, parameterName):
308
306
309 for parmConfObj in self.parmConfObjList:
307 for parmConfObj in self.parmConfObjList:
310
308
311 if parmConfObj.name != parameterName:
309 if parmConfObj.name != parameterName:
312 continue
310 continue
313
311
314 return parmConfObj
312 return parmConfObj
315
313
316 return None
314 return None
317
315
318 def getParameterObjfromValue(self, parameterValue):
316 def getParameterObjfromValue(self, parameterValue):
319
317
320 for parmConfObj in self.parmConfObjList:
318 for parmConfObj in self.parmConfObjList:
321
319
322 if parmConfObj.getValue() != parameterValue:
320 if parmConfObj.getValue() != parameterValue:
323 continue
321 continue
324
322
325 return parmConfObj.getValue()
323 return parmConfObj.getValue()
326
324
327 return None
325 return None
328
326
329 def getParameterValue(self, parameterName):
327 def getParameterValue(self, parameterName):
330
328
331 parameterObj = self.getParameterObj(parameterName)
329 parameterObj = self.getParameterObj(parameterName)
332
330
333 # if not parameterObj:
331 # if not parameterObj:
334 # return None
332 # return None
335
333
336 value = parameterObj.getValue()
334 value = parameterObj.getValue()
337
335
338 return value
336 return value
339
337
340
338
341 def getKwargs(self):
339 def getKwargs(self):
342
340
343 kwargs = {}
341 kwargs = {}
344
342
345 for parmConfObj in self.parmConfObjList:
343 for parmConfObj in self.parmConfObjList:
346 if self.name == 'run' and parmConfObj.name == 'datatype':
344 if self.name == 'run' and parmConfObj.name == 'datatype':
347 continue
345 continue
348
346
349 kwargs[parmConfObj.name] = parmConfObj.getValue()
347 kwargs[parmConfObj.name] = parmConfObj.getValue()
350
348
351 return kwargs
349 return kwargs
352
350
353 def setup(self, id, name, priority, type):
351 def setup(self, id, name, priority, type):
354
352
355 self.id = str(id)
353 self.id = str(id)
356 self.name = name
354 self.name = name
357 self.type = type
355 self.type = type
358 self.priority = priority
356 self.priority = priority
359
357
360 self.parmConfObjList = []
358 self.parmConfObjList = []
361
359
362 def removeParameters(self):
360 def removeParameters(self):
363
361
364 for obj in self.parmConfObjList:
362 for obj in self.parmConfObjList:
365 del obj
363 del obj
366
364
367 self.parmConfObjList = []
365 self.parmConfObjList = []
368
366
369 def addParameter(self, name, value, format='str'):
367 def addParameter(self, name, value, format='str'):
370
368
371 id = self.__getNewId()
369 id = self.__getNewId()
372
370
373 parmConfObj = ParameterConf()
371 parmConfObj = ParameterConf()
374 if not parmConfObj.setup(id, name, value, format):
372 if not parmConfObj.setup(id, name, value, format):
375 return None
373 return None
376
374
377 self.parmConfObjList.append(parmConfObj)
375 self.parmConfObjList.append(parmConfObj)
378
376
379 return parmConfObj
377 return parmConfObj
380
378
381 def changeParameter(self, name, value, format='str'):
379 def changeParameter(self, name, value, format='str'):
382
380
383 parmConfObj = self.getParameterObj(name)
381 parmConfObj = self.getParameterObj(name)
384 parmConfObj.update(name, value, format)
382 parmConfObj.update(name, value, format)
385
383
386 return parmConfObj
384 return parmConfObj
387
385
388 def makeXml(self, procUnitElement):
386 def makeXml(self, procUnitElement):
389
387
390 opElement = SubElement(procUnitElement, self.ELEMENTNAME)
388 opElement = SubElement(procUnitElement, self.ELEMENTNAME)
391 opElement.set('id', str(self.id))
389 opElement.set('id', str(self.id))
392 opElement.set('name', self.name)
390 opElement.set('name', self.name)
393 opElement.set('type', self.type)
391 opElement.set('type', self.type)
394 opElement.set('priority', str(self.priority))
392 opElement.set('priority', str(self.priority))
395
393
396 for parmConfObj in self.parmConfObjList:
394 for parmConfObj in self.parmConfObjList:
397 parmConfObj.makeXml(opElement)
395 parmConfObj.makeXml(opElement)
398
396
399 def readXml(self, opElement):
397 def readXml(self, opElement):
400
398
401 self.id = opElement.get('id')
399 self.id = opElement.get('id')
402 self.name = opElement.get('name')
400 self.name = opElement.get('name')
403 self.type = opElement.get('type')
401 self.type = opElement.get('type')
404 self.priority = opElement.get('priority')
402 self.priority = opElement.get('priority')
405
403
406 #Compatible with old signal chain version
404 #Compatible with old signal chain version
407 #Use of 'run' method instead 'init'
405 #Use of 'run' method instead 'init'
408 if self.type == 'self' and self.name == 'init':
406 if self.type == 'self' and self.name == 'init':
409 self.name = 'run'
407 self.name = 'run'
410
408
411 self.parmConfObjList = []
409 self.parmConfObjList = []
412
410
413 parmElementList = opElement.iter(ParameterConf().getElementName())
411 parmElementList = opElement.iter(ParameterConf().getElementName())
414
412
415 for parmElement in parmElementList:
413 for parmElement in parmElementList:
416 parmConfObj = ParameterConf()
414 parmConfObj = ParameterConf()
417 parmConfObj.readXml(parmElement)
415 parmConfObj.readXml(parmElement)
418
416
419 #Compatible with old signal chain version
417 #Compatible with old signal chain version
420 #If an 'plot' OPERATION is found, changes name operation by the value of its type PARAMETER
418 #If an 'plot' OPERATION is found, changes name operation by the value of its type PARAMETER
421 if self.type != 'self' and self.name == 'Plot':
419 if self.type != 'self' and self.name == 'Plot':
422 if parmConfObj.format == 'str' and parmConfObj.name == 'type':
420 if parmConfObj.format == 'str' and parmConfObj.name == 'type':
423 self.name = parmConfObj.value
421 self.name = parmConfObj.value
424 continue
422 continue
425
423
426 self.parmConfObjList.append(parmConfObj)
424 self.parmConfObjList.append(parmConfObj)
427
425
428 def printattr(self):
426 def printattr(self):
429
427
430 print "%s[%s]: name = %s, type = %s, priority = %s" %(self.ELEMENTNAME,
428 print "%s[%s]: name = %s, type = %s, priority = %s" %(self.ELEMENTNAME,
431 self.id,
429 self.id,
432 self.name,
430 self.name,
433 self.type,
431 self.type,
434 self.priority)
432 self.priority)
435
433
436 for parmConfObj in self.parmConfObjList:
434 for parmConfObj in self.parmConfObjList:
437 parmConfObj.printattr()
435 parmConfObj.printattr()
438
436
439 def createObject(self, plotter_queue=None):
437 def createObject(self, plotter_queue=None):
440
438
441
439
442 if self.type == 'self':
440 if self.type == 'self':
443 raise ValueError, "This operation type cannot be created"
441 raise ValueError, "This operation type cannot be created"
444
442
445 if self.type == 'plotter':
443 if self.type == 'plotter':
446 #Plotter(plotter_name)
444 #Plotter(plotter_name)
447 if not plotter_queue:
445 if not plotter_queue:
448 raise ValueError, "plotter_queue is not defined. Use:\nmyProject = Project()\nmyProject.setPlotterQueue(plotter_queue)"
446 raise ValueError, "plotter_queue is not defined. Use:\nmyProject = Project()\nmyProject.setPlotterQueue(plotter_queue)"
449
447
450 opObj = Plotter(self.name, plotter_queue)
448 opObj = Plotter(self.name, plotter_queue)
451
449
452 if self.type == 'external' or self.type == 'other':
450 if self.type == 'external' or self.type == 'other':
453
451
454 className = eval(self.name)
452 className = eval(self.name)
455 kwargs = self.getKwargs()
453 kwargs = self.getKwargs()
456
454
457 opObj = className(**kwargs)
455 opObj = className(**kwargs)
458
456
459 return opObj
457 return opObj
460
458
461
459
462 class ProcUnitConf():
460 class ProcUnitConf():
463
461
464 id = None
462 id = None
465 name = None
463 name = None
466 datatype = None
464 datatype = None
467 inputId = None
465 inputId = None
468 parentId = None
466 parentId = None
469
467
470 opConfObjList = []
468 opConfObjList = []
471
469
472 procUnitObj = None
470 procUnitObj = None
473 opObjList = []
471 opObjList = []
474
472
475 ELEMENTNAME = 'ProcUnit'
473 ELEMENTNAME = 'ProcUnit'
476
474
477 def __init__(self):
475 def __init__(self):
478
476
479 self.id = None
477 self.id = None
480 self.datatype = None
478 self.datatype = None
481 self.name = None
479 self.name = None
482 self.inputId = None
480 self.inputId = None
483
481
484 self.opConfObjList = []
482 self.opConfObjList = []
485
483
486 self.procUnitObj = None
484 self.procUnitObj = None
487 self.opObjDict = {}
485 self.opObjDict = {}
488
486
489 def __getPriority(self):
487 def __getPriority(self):
490
488
491 return len(self.opConfObjList)+1
489 return len(self.opConfObjList)+1
492
490
493 def __getNewId(self):
491 def __getNewId(self):
494
492
495 return int(self.id)*10 + len(self.opConfObjList) + 1
493 return int(self.id)*10 + len(self.opConfObjList) + 1
496
494
497 def getElementName(self):
495 def getElementName(self):
498
496
499 return self.ELEMENTNAME
497 return self.ELEMENTNAME
500
498
501 def getId(self):
499 def getId(self):
502
500
503 return self.id
501 return self.id
504
502
505 def updateId(self, new_id, parentId=parentId):
503 def updateId(self, new_id, parentId=parentId):
506
504
507
505
508 new_id = int(parentId)*10 + (int(self.id) % 10)
506 new_id = int(parentId)*10 + (int(self.id) % 10)
509 new_inputId = int(parentId)*10 + (int(self.inputId) % 10)
507 new_inputId = int(parentId)*10 + (int(self.inputId) % 10)
510
508
511 #If this proc unit has not inputs
509 #If this proc unit has not inputs
512 if self.inputId == '0':
510 if self.inputId == '0':
513 new_inputId = 0
511 new_inputId = 0
514
512
515 n = 1
513 n = 1
516 for opConfObj in self.opConfObjList:
514 for opConfObj in self.opConfObjList:
517
515
518 idOp = str(int(new_id)*10 + n)
516 idOp = str(int(new_id)*10 + n)
519 opConfObj.updateId(idOp)
517 opConfObj.updateId(idOp)
520
518
521 n += 1
519 n += 1
522
520
523 self.parentId = str(parentId)
521 self.parentId = str(parentId)
524 self.id = str(new_id)
522 self.id = str(new_id)
525 self.inputId = str(new_inputId)
523 self.inputId = str(new_inputId)
526
524
527
525
528 def getInputId(self):
526 def getInputId(self):
529
527
530 return self.inputId
528 return self.inputId
531
529
532 def getOperationObjList(self):
530 def getOperationObjList(self):
533
531
534 return self.opConfObjList
532 return self.opConfObjList
535
533
536 def getOperationObj(self, name=None):
534 def getOperationObj(self, name=None):
537
535
538 for opConfObj in self.opConfObjList:
536 for opConfObj in self.opConfObjList:
539
537
540 if opConfObj.name != name:
538 if opConfObj.name != name:
541 continue
539 continue
542
540
543 return opConfObj
541 return opConfObj
544
542
545 return None
543 return None
546
544
547 def getOpObjfromParamValue(self, value=None):
545 def getOpObjfromParamValue(self, value=None):
548
546
549 for opConfObj in self.opConfObjList:
547 for opConfObj in self.opConfObjList:
550 if opConfObj.getParameterObjfromValue(parameterValue=value) != value:
548 if opConfObj.getParameterObjfromValue(parameterValue=value) != value:
551 continue
549 continue
552 return opConfObj
550 return opConfObj
553 return None
551 return None
554
552
555 def getProcUnitObj(self):
553 def getProcUnitObj(self):
556
554
557 return self.procUnitObj
555 return self.procUnitObj
558
556
559 def setup(self, id, name, datatype, inputId, parentId=None):
557 def setup(self, id, name, datatype, inputId, parentId=None):
560
558
561 #Compatible with old signal chain version
559 #Compatible with old signal chain version
562 if datatype==None and name==None:
560 if datatype==None and name==None:
563 raise ValueError, "datatype or name should be defined"
561 raise ValueError, "datatype or name should be defined"
564
562
565 if name==None:
563 if name==None:
566 if 'Proc' in datatype:
564 if 'Proc' in datatype:
567 name = datatype
565 name = datatype
568 else:
566 else:
569 name = '%sProc' %(datatype)
567 name = '%sProc' %(datatype)
570
568
571 if datatype==None:
569 if datatype==None:
572 datatype = name.replace('Proc','')
570 datatype = name.replace('Proc','')
573
571
574 self.id = str(id)
572 self.id = str(id)
575 self.name = name
573 self.name = name
576 self.datatype = datatype
574 self.datatype = datatype
577 self.inputId = inputId
575 self.inputId = inputId
578 self.parentId = parentId
576 self.parentId = parentId
579
577
580 self.opConfObjList = []
578 self.opConfObjList = []
581
579
582 self.addOperation(name='run', optype='self')
580 self.addOperation(name='run', optype='self')
583
581
584 def removeOperations(self):
582 def removeOperations(self):
585
583
586 for obj in self.opConfObjList:
584 for obj in self.opConfObjList:
587 del obj
585 del obj
588
586
589 self.opConfObjList = []
587 self.opConfObjList = []
590 self.addOperation(name='run')
588 self.addOperation(name='run')
591
589
592 def addParameter(self, **kwargs):
590 def addParameter(self, **kwargs):
593 '''
591 '''
594 Add parameters to "run" operation
592 Add parameters to "run" operation
595 '''
593 '''
596 opObj = self.opConfObjList[0]
594 opObj = self.opConfObjList[0]
597
595
598 opObj.addParameter(**kwargs)
596 opObj.addParameter(**kwargs)
599
597
600 return opObj
598 return opObj
601
599
602 def addOperation(self, name, optype='self'):
600 def addOperation(self, name, optype='self'):
603
601
604 id = self.__getNewId()
602 id = self.__getNewId()
605 priority = self.__getPriority()
603 priority = self.__getPriority()
606
604
607 opConfObj = OperationConf()
605 opConfObj = OperationConf()
608 opConfObj.setup(id, name=name, priority=priority, type=optype)
606 opConfObj.setup(id, name=name, priority=priority, type=optype)
609
607
610 self.opConfObjList.append(opConfObj)
608 self.opConfObjList.append(opConfObj)
611
609
612 return opConfObj
610 return opConfObj
613
611
614 def makeXml(self, projectElement):
612 def makeXml(self, projectElement):
615
613
616 procUnitElement = SubElement(projectElement, self.ELEMENTNAME)
614 procUnitElement = SubElement(projectElement, self.ELEMENTNAME)
617 procUnitElement.set('id', str(self.id))
615 procUnitElement.set('id', str(self.id))
618 procUnitElement.set('name', self.name)
616 procUnitElement.set('name', self.name)
619 procUnitElement.set('datatype', self.datatype)
617 procUnitElement.set('datatype', self.datatype)
620 procUnitElement.set('inputId', str(self.inputId))
618 procUnitElement.set('inputId', str(self.inputId))
621
619
622 for opConfObj in self.opConfObjList:
620 for opConfObj in self.opConfObjList:
623 opConfObj.makeXml(procUnitElement)
621 opConfObj.makeXml(procUnitElement)
624
622
625 def readXml(self, upElement):
623 def readXml(self, upElement):
626
624
627 self.id = upElement.get('id')
625 self.id = upElement.get('id')
628 self.name = upElement.get('name')
626 self.name = upElement.get('name')
629 self.datatype = upElement.get('datatype')
627 self.datatype = upElement.get('datatype')
630 self.inputId = upElement.get('inputId')
628 self.inputId = upElement.get('inputId')
631
629
632 if self.ELEMENTNAME == "ReadUnit":
630 if self.ELEMENTNAME == "ReadUnit":
633 self.datatype = self.datatype.replace("Reader", "")
631 self.datatype = self.datatype.replace("Reader", "")
634
632
635 if self.ELEMENTNAME == "ProcUnit":
633 if self.ELEMENTNAME == "ProcUnit":
636 self.datatype = self.datatype.replace("Proc", "")
634 self.datatype = self.datatype.replace("Proc", "")
637
635
638 if self.inputId == 'None':
636 if self.inputId == 'None':
639 self.inputId = '0'
637 self.inputId = '0'
640
638
641 self.opConfObjList = []
639 self.opConfObjList = []
642
640
643 opElementList = upElement.iter(OperationConf().getElementName())
641 opElementList = upElement.iter(OperationConf().getElementName())
644
642
645 for opElement in opElementList:
643 for opElement in opElementList:
646 opConfObj = OperationConf()
644 opConfObj = OperationConf()
647 opConfObj.readXml(opElement)
645 opConfObj.readXml(opElement)
648 self.opConfObjList.append(opConfObj)
646 self.opConfObjList.append(opConfObj)
649
647
650 def printattr(self):
648 def printattr(self):
651
649
652 print "%s[%s]: name = %s, datatype = %s, inputId = %s" %(self.ELEMENTNAME,
650 print "%s[%s]: name = %s, datatype = %s, inputId = %s" %(self.ELEMENTNAME,
653 self.id,
651 self.id,
654 self.name,
652 self.name,
655 self.datatype,
653 self.datatype,
656 self.inputId)
654 self.inputId)
657
655
658 for opConfObj in self.opConfObjList:
656 for opConfObj in self.opConfObjList:
659 opConfObj.printattr()
657 opConfObj.printattr()
660
658
661
659
662 def getKwargs(self):
660 def getKwargs(self):
663
661
664 opObj = self.opConfObjList[0]
662 opObj = self.opConfObjList[0]
665 kwargs = opObj.getKwargs()
663 kwargs = opObj.getKwargs()
666
664
667 return kwargs
665 return kwargs
668
666
669 def createObjects(self, plotter_queue=None):
667 def createObjects(self, plotter_queue=None):
670
668
671 className = eval(self.name)
669 className = eval(self.name)
672 kwargs = self.getKwargs()
670 kwargs = self.getKwargs()
673 procUnitObj = className(**kwargs)
671 procUnitObj = className(**kwargs)
674
672
675 for opConfObj in self.opConfObjList:
673 for opConfObj in self.opConfObjList:
676
674
677 if opConfObj.type=='self' and self.name=='run':
675 if opConfObj.type=='self' and self.name=='run':
678 continue
676 continue
679 elif opConfObj.type=='self':
677 elif opConfObj.type=='self':
680 procUnitObj.addOperationKwargs(opConfObj.id, **opConfObj.getKwargs())
678 procUnitObj.addOperationKwargs(opConfObj.id, **opConfObj.getKwargs())
681 continue
679 continue
682
680
683 opObj = opConfObj.createObject(plotter_queue)
681 opObj = opConfObj.createObject(plotter_queue)
684
682
685 self.opObjDict[opConfObj.id] = opObj
683 self.opObjDict[opConfObj.id] = opObj
686
684
687 procUnitObj.addOperation(opObj, opConfObj.id)
685 procUnitObj.addOperation(opObj, opConfObj.id)
688
686
689 self.procUnitObj = procUnitObj
687 self.procUnitObj = procUnitObj
690
688
691 return procUnitObj
689 return procUnitObj
692
690
693 def run(self):
691 def run(self):
694
692
695 is_ok = False
693 is_ok = False
696
694
697 for opConfObj in self.opConfObjList:
695 for opConfObj in self.opConfObjList:
698
696
699 kwargs = {}
697 kwargs = {}
700 for parmConfObj in opConfObj.getParameterObjList():
698 for parmConfObj in opConfObj.getParameterObjList():
701 if opConfObj.name == 'run' and parmConfObj.name == 'datatype':
699 if opConfObj.name == 'run' and parmConfObj.name == 'datatype':
702 continue
700 continue
703
701
704 kwargs[parmConfObj.name] = parmConfObj.getValue()
702 kwargs[parmConfObj.name] = parmConfObj.getValue()
705
703
706 #ini = time.time()
704 #ini = time.time()
707
705
708 #print "\tRunning the '%s' operation with %s" %(opConfObj.name, opConfObj.id)
706 #print "\tRunning the '%s' operation with %s" %(opConfObj.name, opConfObj.id)
709 sts = self.procUnitObj.call(opType = opConfObj.type,
707 sts = self.procUnitObj.call(opType = opConfObj.type,
710 opName = opConfObj.name,
708 opName = opConfObj.name,
711 opId = opConfObj.id,
709 opId = opConfObj.id,
712 )
710 )
713
711
714 # total_time = time.time() - ini
712 # total_time = time.time() - ini
715 #
713 #
716 # if total_time > 0.002:
714 # if total_time > 0.002:
717 # print "%s::%s took %f seconds" %(self.name, opConfObj.name, total_time)
715 # print "%s::%s took %f seconds" %(self.name, opConfObj.name, total_time)
718
716
719 is_ok = is_ok or sts
717 is_ok = is_ok or sts
720
718
721 return is_ok
719 return is_ok
722
720
723 def close(self):
721 def close(self):
724
722
725 for opConfObj in self.opConfObjList:
723 for opConfObj in self.opConfObjList:
726 if opConfObj.type == 'self':
724 if opConfObj.type == 'self':
727 continue
725 continue
728
726
729 opObj = self.procUnitObj.getOperationObj(opConfObj.id)
727 opObj = self.procUnitObj.getOperationObj(opConfObj.id)
730 opObj.close()
728 opObj.close()
731
729
732 self.procUnitObj.close()
730 self.procUnitObj.close()
733
731
734 return
732 return
735
733
736 class ReadUnitConf(ProcUnitConf):
734 class ReadUnitConf(ProcUnitConf):
737
735
738 path = None
736 path = None
739 startDate = None
737 startDate = None
740 endDate = None
738 endDate = None
741 startTime = None
739 startTime = None
742 endTime = None
740 endTime = None
743
741
744 ELEMENTNAME = 'ReadUnit'
742 ELEMENTNAME = 'ReadUnit'
745
743
746 def __init__(self):
744 def __init__(self):
747
745
748 self.id = None
746 self.id = None
749 self.datatype = None
747 self.datatype = None
750 self.name = None
748 self.name = None
751 self.inputId = None
749 self.inputId = None
752
750
753 self.parentId = None
751 self.parentId = None
754
752
755 self.opConfObjList = []
753 self.opConfObjList = []
756 self.opObjList = []
754 self.opObjList = []
757
755
758 def getElementName(self):
756 def getElementName(self):
759
757
760 return self.ELEMENTNAME
758 return self.ELEMENTNAME
761
759
762 def setup(self, id, name, datatype, path, startDate="", endDate="", startTime="", endTime="", parentId=None, queue=None, **kwargs):
760 def setup(self, id, name, datatype, path, startDate="", endDate="", startTime="", endTime="", parentId=None, queue=None, **kwargs):
763
761
764 #Compatible with old signal chain version
762 #Compatible with old signal chain version
765 if datatype==None and name==None:
763 if datatype==None and name==None:
766 raise ValueError, "datatype or name should be defined"
764 raise ValueError, "datatype or name should be defined"
767
765
768 if name==None:
766 if name==None:
769 if 'Reader' in datatype:
767 if 'Reader' in datatype:
770 name = datatype
768 name = datatype
771 else:
769 else:
772 name = '%sReader' %(datatype)
770 name = '%sReader' %(datatype)
773
771
774 if datatype==None:
772 if datatype==None:
775 datatype = name.replace('Reader','')
773 datatype = name.replace('Reader','')
776
774
777 self.id = id
775 self.id = id
778 self.name = name
776 self.name = name
779 self.datatype = datatype
777 self.datatype = datatype
780
778
781 self.path = os.path.abspath(path)
779 self.path = os.path.abspath(path)
782 self.startDate = startDate
780 self.startDate = startDate
783 self.endDate = endDate
781 self.endDate = endDate
784 self.startTime = startTime
782 self.startTime = startTime
785 self.endTime = endTime
783 self.endTime = endTime
786
784
787 self.inputId = '0'
785 self.inputId = '0'
788 self.parentId = parentId
786 self.parentId = parentId
789 self.queue = queue
787 self.queue = queue
790 self.addRunOperation(**kwargs)
788 self.addRunOperation(**kwargs)
791
789
792 def update(self, datatype, path, startDate, endDate, startTime, endTime, parentId=None, name=None, **kwargs):
790 def update(self, datatype, path, startDate, endDate, startTime, endTime, parentId=None, name=None, **kwargs):
793
791
794 #Compatible with old signal chain version
792 #Compatible with old signal chain version
795 if datatype==None and name==None:
793 if datatype==None and name==None:
796 raise ValueError, "datatype or name should be defined"
794 raise ValueError, "datatype or name should be defined"
797
795
798 if name==None:
796 if name==None:
799 if 'Reader' in datatype:
797 if 'Reader' in datatype:
800 name = datatype
798 name = datatype
801 else:
799 else:
802 name = '%sReader' %(datatype)
800 name = '%sReader' %(datatype)
803
801
804 if datatype==None:
802 if datatype==None:
805 datatype = name.replace('Reader','')
803 datatype = name.replace('Reader','')
806
804
807 self.datatype = datatype
805 self.datatype = datatype
808 self.name = name
806 self.name = name
809 self.path = path
807 self.path = path
810 self.startDate = startDate
808 self.startDate = startDate
811 self.endDate = endDate
809 self.endDate = endDate
812 self.startTime = startTime
810 self.startTime = startTime
813 self.endTime = endTime
811 self.endTime = endTime
814
812
815 self.inputId = '0'
813 self.inputId = '0'
816 self.parentId = parentId
814 self.parentId = parentId
817
815
818 self.updateRunOperation(**kwargs)
816 self.updateRunOperation(**kwargs)
819
817
820 def removeOperations(self):
818 def removeOperations(self):
821
819
822 for obj in self.opConfObjList:
820 for obj in self.opConfObjList:
823 del obj
821 del obj
824
822
825 self.opConfObjList = []
823 self.opConfObjList = []
826
824
827 def addRunOperation(self, **kwargs):
825 def addRunOperation(self, **kwargs):
828
826
829 opObj = self.addOperation(name = 'run', optype = 'self')
827 opObj = self.addOperation(name = 'run', optype = 'self')
830
828
831 opObj.addParameter(name='datatype' , value=self.datatype, format='str')
829 opObj.addParameter(name='datatype' , value=self.datatype, format='str')
832 opObj.addParameter(name='path' , value=self.path, format='str')
830 opObj.addParameter(name='path' , value=self.path, format='str')
833 opObj.addParameter(name='startDate' , value=self.startDate, format='date')
831 opObj.addParameter(name='startDate' , value=self.startDate, format='date')
834 opObj.addParameter(name='endDate' , value=self.endDate, format='date')
832 opObj.addParameter(name='endDate' , value=self.endDate, format='date')
835 opObj.addParameter(name='startTime' , value=self.startTime, format='time')
833 opObj.addParameter(name='startTime' , value=self.startTime, format='time')
836 opObj.addParameter(name='endTime' , value=self.endTime, format='time')
834 opObj.addParameter(name='endTime' , value=self.endTime, format='time')
837 opObj.addParameter(name='queue' , value=self.queue, format='obj')
835 opObj.addParameter(name='queue' , value=self.queue, format='obj')
838
836
839 for key, value in kwargs.items():
837 for key, value in kwargs.items():
840 opObj.addParameter(name=key, value=value, format=type(value).__name__)
838 opObj.addParameter(name=key, value=value, format=type(value).__name__)
841
839
842 return opObj
840 return opObj
843
841
844 def updateRunOperation(self, **kwargs):
842 def updateRunOperation(self, **kwargs):
845
843
846 opObj = self.getOperationObj(name = 'run')
844 opObj = self.getOperationObj(name = 'run')
847 opObj.removeParameters()
845 opObj.removeParameters()
848
846
849 opObj.addParameter(name='datatype' , value=self.datatype, format='str')
847 opObj.addParameter(name='datatype' , value=self.datatype, format='str')
850 opObj.addParameter(name='path' , value=self.path, format='str')
848 opObj.addParameter(name='path' , value=self.path, format='str')
851 opObj.addParameter(name='startDate' , value=self.startDate, format='date')
849 opObj.addParameter(name='startDate' , value=self.startDate, format='date')
852 opObj.addParameter(name='endDate' , value=self.endDate, format='date')
850 opObj.addParameter(name='endDate' , value=self.endDate, format='date')
853 opObj.addParameter(name='startTime' , value=self.startTime, format='time')
851 opObj.addParameter(name='startTime' , value=self.startTime, format='time')
854 opObj.addParameter(name='endTime' , value=self.endTime, format='time')
852 opObj.addParameter(name='endTime' , value=self.endTime, format='time')
855
853
856 for key, value in kwargs.items():
854 for key, value in kwargs.items():
857 opObj.addParameter(name=key, value=value, format=type(value).__name__)
855 opObj.addParameter(name=key, value=value, format=type(value).__name__)
858
856
859 return opObj
857 return opObj
860
858
861 # def makeXml(self, projectElement):
859 # def makeXml(self, projectElement):
862 #
860 #
863 # procUnitElement = SubElement(projectElement, self.ELEMENTNAME)
861 # procUnitElement = SubElement(projectElement, self.ELEMENTNAME)
864 # procUnitElement.set('id', str(self.id))
862 # procUnitElement.set('id', str(self.id))
865 # procUnitElement.set('name', self.name)
863 # procUnitElement.set('name', self.name)
866 # procUnitElement.set('datatype', self.datatype)
864 # procUnitElement.set('datatype', self.datatype)
867 # procUnitElement.set('inputId', str(self.inputId))
865 # procUnitElement.set('inputId', str(self.inputId))
868 #
866 #
869 # for opConfObj in self.opConfObjList:
867 # for opConfObj in self.opConfObjList:
870 # opConfObj.makeXml(procUnitElement)
868 # opConfObj.makeXml(procUnitElement)
871
869
872 def readXml(self, upElement):
870 def readXml(self, upElement):
873
871
874 self.id = upElement.get('id')
872 self.id = upElement.get('id')
875 self.name = upElement.get('name')
873 self.name = upElement.get('name')
876 self.datatype = upElement.get('datatype')
874 self.datatype = upElement.get('datatype')
877 self.inputId = upElement.get('inputId')
875 self.inputId = upElement.get('inputId')
878
876
879 if self.ELEMENTNAME == "ReadUnit":
877 if self.ELEMENTNAME == "ReadUnit":
880 self.datatype = self.datatype.replace("Reader", "")
878 self.datatype = self.datatype.replace("Reader", "")
881
879
882 if self.inputId == 'None':
880 if self.inputId == 'None':
883 self.inputId = '0'
881 self.inputId = '0'
884
882
885 self.opConfObjList = []
883 self.opConfObjList = []
886
884
887 opElementList = upElement.iter(OperationConf().getElementName())
885 opElementList = upElement.iter(OperationConf().getElementName())
888
886
889 for opElement in opElementList:
887 for opElement in opElementList:
890 opConfObj = OperationConf()
888 opConfObj = OperationConf()
891 opConfObj.readXml(opElement)
889 opConfObj.readXml(opElement)
892 self.opConfObjList.append(opConfObj)
890 self.opConfObjList.append(opConfObj)
893
891
894 if opConfObj.name == 'run':
892 if opConfObj.name == 'run':
895 self.path = opConfObj.getParameterValue('path')
893 self.path = opConfObj.getParameterValue('path')
896 self.startDate = opConfObj.getParameterValue('startDate')
894 self.startDate = opConfObj.getParameterValue('startDate')
897 self.endDate = opConfObj.getParameterValue('endDate')
895 self.endDate = opConfObj.getParameterValue('endDate')
898 self.startTime = opConfObj.getParameterValue('startTime')
896 self.startTime = opConfObj.getParameterValue('startTime')
899 self.endTime = opConfObj.getParameterValue('endTime')
897 self.endTime = opConfObj.getParameterValue('endTime')
900
898
901 class Project():
899 class Project():
902
900
903 id = None
901 id = None
904 name = None
902 name = None
905 description = None
903 description = None
906 filename = None
904 filename = None
907
905
908 procUnitConfObjDict = None
906 procUnitConfObjDict = None
909
907
910 ELEMENTNAME = 'Project'
908 ELEMENTNAME = 'Project'
911
909
912 plotterQueue = None
910 plotterQueue = None
913
911
914 def __init__(self, plotter_queue=None):
912 def __init__(self, plotter_queue=None):
915
913
916 self.id = None
914 self.id = None
917 self.name = None
915 self.name = None
918 self.description = None
916 self.description = None
919
917
920 self.plotterQueue = plotter_queue
918 self.plotterQueue = plotter_queue
921
919
922 self.procUnitConfObjDict = {}
920 self.procUnitConfObjDict = {}
923
921
924 def __getNewId(self):
922 def __getNewId(self):
925
923
926 idList = self.procUnitConfObjDict.keys()
924 idList = self.procUnitConfObjDict.keys()
927
925
928 id = int(self.id)*10
926 id = int(self.id)*10
929
927
930 while True:
928 while True:
931 id += 1
929 id += 1
932
930
933 if str(id) in idList:
931 if str(id) in idList:
934 continue
932 continue
935
933
936 break
934 break
937
935
938 return str(id)
936 return str(id)
939
937
940 def getElementName(self):
938 def getElementName(self):
941
939
942 return self.ELEMENTNAME
940 return self.ELEMENTNAME
943
941
944 def getId(self):
942 def getId(self):
945
943
946 return self.id
944 return self.id
947
945
948 def updateId(self, new_id):
946 def updateId(self, new_id):
949
947
950 self.id = str(new_id)
948 self.id = str(new_id)
951
949
952 keyList = self.procUnitConfObjDict.keys()
950 keyList = self.procUnitConfObjDict.keys()
953 keyList.sort()
951 keyList.sort()
954
952
955 n = 1
953 n = 1
956 newProcUnitConfObjDict = {}
954 newProcUnitConfObjDict = {}
957
955
958 for procKey in keyList:
956 for procKey in keyList:
959
957
960 procUnitConfObj = self.procUnitConfObjDict[procKey]
958 procUnitConfObj = self.procUnitConfObjDict[procKey]
961 idProcUnit = str(int(self.id)*10 + n)
959 idProcUnit = str(int(self.id)*10 + n)
962 procUnitConfObj.updateId(idProcUnit, parentId = self.id)
960 procUnitConfObj.updateId(idProcUnit, parentId = self.id)
963
961
964 newProcUnitConfObjDict[idProcUnit] = procUnitConfObj
962 newProcUnitConfObjDict[idProcUnit] = procUnitConfObj
965 n += 1
963 n += 1
966
964
967 self.procUnitConfObjDict = newProcUnitConfObjDict
965 self.procUnitConfObjDict = newProcUnitConfObjDict
968
966
969 def setup(self, id, name, description):
967 def setup(self, id, name, description):
970
968
971 self.id = str(id)
969 self.id = str(id)
972 self.name = name
970 self.name = name
973 self.description = description
971 self.description = description
974
972
975 def update(self, name, description):
973 def update(self, name, description):
976
974
977 self.name = name
975 self.name = name
978 self.description = description
976 self.description = description
979
977
980 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
978 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
981
979
982 if id is None:
980 if id is None:
983 idReadUnit = self.__getNewId()
981 idReadUnit = self.__getNewId()
984 else:
982 else:
985 idReadUnit = str(id)
983 idReadUnit = str(id)
986
984
987 readUnitConfObj = ReadUnitConf()
985 readUnitConfObj = ReadUnitConf()
988 readUnitConfObj.setup(idReadUnit, name, datatype, parentId=self.id, **kwargs)
986 readUnitConfObj.setup(idReadUnit, name, datatype, parentId=self.id, **kwargs)
989
987
990 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
988 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
991
989
992 return readUnitConfObj
990 return readUnitConfObj
993
991
994 def addProcUnit(self, inputId='0', datatype=None, name=None):
992 def addProcUnit(self, inputId='0', datatype=None, name=None):
995
993
996 idProcUnit = self.__getNewId()
994 idProcUnit = self.__getNewId()
997
995
998 procUnitConfObj = ProcUnitConf()
996 procUnitConfObj = ProcUnitConf()
999 procUnitConfObj.setup(idProcUnit, name, datatype, inputId, parentId=self.id)
997 procUnitConfObj.setup(idProcUnit, name, datatype, inputId, parentId=self.id)
1000
998
1001 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
999 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1002
1000
1003 return procUnitConfObj
1001 return procUnitConfObj
1004
1002
1005 def removeProcUnit(self, id):
1003 def removeProcUnit(self, id):
1006
1004
1007 if id in self.procUnitConfObjDict.keys():
1005 if id in self.procUnitConfObjDict.keys():
1008 self.procUnitConfObjDict.pop(id)
1006 self.procUnitConfObjDict.pop(id)
1009
1007
1010 def getReadUnitId(self):
1008 def getReadUnitId(self):
1011
1009
1012 readUnitConfObj = self.getReadUnitObj()
1010 readUnitConfObj = self.getReadUnitObj()
1013
1011
1014 return readUnitConfObj.id
1012 return readUnitConfObj.id
1015
1013
1016 def getReadUnitObj(self):
1014 def getReadUnitObj(self):
1017
1015
1018 for obj in self.procUnitConfObjDict.values():
1016 for obj in self.procUnitConfObjDict.values():
1019 if obj.getElementName() == "ReadUnit":
1017 if obj.getElementName() == "ReadUnit":
1020 return obj
1018 return obj
1021
1019
1022 return None
1020 return None
1023
1021
1024 def getProcUnitObj(self, id=None, name=None):
1022 def getProcUnitObj(self, id=None, name=None):
1025
1023
1026 if id != None:
1024 if id != None:
1027 return self.procUnitConfObjDict[id]
1025 return self.procUnitConfObjDict[id]
1028
1026
1029 if name != None:
1027 if name != None:
1030 return self.getProcUnitObjByName(name)
1028 return self.getProcUnitObjByName(name)
1031
1029
1032 return None
1030 return None
1033
1031
1034 def getProcUnitObjByName(self, name):
1032 def getProcUnitObjByName(self, name):
1035
1033
1036 for obj in self.procUnitConfObjDict.values():
1034 for obj in self.procUnitConfObjDict.values():
1037 if obj.name == name:
1035 if obj.name == name:
1038 return obj
1036 return obj
1039
1037
1040 return None
1038 return None
1041
1039
1042 def procUnitItems(self):
1040 def procUnitItems(self):
1043
1041
1044 return self.procUnitConfObjDict.items()
1042 return self.procUnitConfObjDict.items()
1045
1043
1046 def makeXml(self):
1044 def makeXml(self):
1047
1045
1048 projectElement = Element('Project')
1046 projectElement = Element('Project')
1049 projectElement.set('id', str(self.id))
1047 projectElement.set('id', str(self.id))
1050 projectElement.set('name', self.name)
1048 projectElement.set('name', self.name)
1051 projectElement.set('description', self.description)
1049 projectElement.set('description', self.description)
1052
1050
1053 for procUnitConfObj in self.procUnitConfObjDict.values():
1051 for procUnitConfObj in self.procUnitConfObjDict.values():
1054 procUnitConfObj.makeXml(projectElement)
1052 procUnitConfObj.makeXml(projectElement)
1055
1053
1056 self.projectElement = projectElement
1054 self.projectElement = projectElement
1057
1055
1058 def writeXml(self, filename=None):
1056 def writeXml(self, filename=None):
1059
1057
1060 if filename == None:
1058 if filename == None:
1061 if self.filename:
1059 if self.filename:
1062 filename = self.filename
1060 filename = self.filename
1063 else:
1061 else:
1064 filename = "schain.xml"
1062 filename = "schain.xml"
1065
1063
1066 if not filename:
1064 if not filename:
1067 print "filename has not been defined. Use setFilename(filename) for do it."
1065 print "filename has not been defined. Use setFilename(filename) for do it."
1068 return 0
1066 return 0
1069
1067
1070 abs_file = os.path.abspath(filename)
1068 abs_file = os.path.abspath(filename)
1071
1069
1072 if not os.access(os.path.dirname(abs_file), os.W_OK):
1070 if not os.access(os.path.dirname(abs_file), os.W_OK):
1073 print "No write permission on %s" %os.path.dirname(abs_file)
1071 print "No write permission on %s" %os.path.dirname(abs_file)
1074 return 0
1072 return 0
1075
1073
1076 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
1074 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
1077 print "File %s already exists and it could not be overwriten" %abs_file
1075 print "File %s already exists and it could not be overwriten" %abs_file
1078 return 0
1076 return 0
1079
1077
1080 self.makeXml()
1078 self.makeXml()
1081
1079
1082 ElementTree(self.projectElement).write(abs_file, method='xml')
1080 ElementTree(self.projectElement).write(abs_file, method='xml')
1083
1081
1084 self.filename = abs_file
1082 self.filename = abs_file
1085
1083
1086 return 1
1084 return 1
1087
1085
1088 def readXml(self, filename = None):
1086 def readXml(self, filename = None):
1089
1087
1090 if not filename:
1088 if not filename:
1091 print "filename is not defined"
1089 print "filename is not defined"
1092 return 0
1090 return 0
1093
1091
1094 abs_file = os.path.abspath(filename)
1092 abs_file = os.path.abspath(filename)
1095
1093
1096 if not os.path.isfile(abs_file):
1094 if not os.path.isfile(abs_file):
1097 print "%s file does not exist" %abs_file
1095 print "%s file does not exist" %abs_file
1098 return 0
1096 return 0
1099
1097
1100 self.projectElement = None
1098 self.projectElement = None
1101 self.procUnitConfObjDict = {}
1099 self.procUnitConfObjDict = {}
1102
1100
1103 try:
1101 try:
1104 self.projectElement = ElementTree().parse(abs_file)
1102 self.projectElement = ElementTree().parse(abs_file)
1105 except:
1103 except:
1106 print "Error reading %s, verify file format" %filename
1104 print "Error reading %s, verify file format" %filename
1107 return 0
1105 return 0
1108
1106
1109 self.project = self.projectElement.tag
1107 self.project = self.projectElement.tag
1110
1108
1111 self.id = self.projectElement.get('id')
1109 self.id = self.projectElement.get('id')
1112 self.name = self.projectElement.get('name')
1110 self.name = self.projectElement.get('name')
1113 self.description = self.projectElement.get('description')
1111 self.description = self.projectElement.get('description')
1114
1112
1115 readUnitElementList = self.projectElement.iter(ReadUnitConf().getElementName())
1113 readUnitElementList = self.projectElement.iter(ReadUnitConf().getElementName())
1116
1114
1117 for readUnitElement in readUnitElementList:
1115 for readUnitElement in readUnitElementList:
1118 readUnitConfObj = ReadUnitConf()
1116 readUnitConfObj = ReadUnitConf()
1119 readUnitConfObj.readXml(readUnitElement)
1117 readUnitConfObj.readXml(readUnitElement)
1120
1118
1121 if readUnitConfObj.parentId == None:
1119 if readUnitConfObj.parentId == None:
1122 readUnitConfObj.parentId = self.id
1120 readUnitConfObj.parentId = self.id
1123
1121
1124 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
1122 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
1125
1123
1126 procUnitElementList = self.projectElement.iter(ProcUnitConf().getElementName())
1124 procUnitElementList = self.projectElement.iter(ProcUnitConf().getElementName())
1127
1125
1128 for procUnitElement in procUnitElementList:
1126 for procUnitElement in procUnitElementList:
1129 procUnitConfObj = ProcUnitConf()
1127 procUnitConfObj = ProcUnitConf()
1130 procUnitConfObj.readXml(procUnitElement)
1128 procUnitConfObj.readXml(procUnitElement)
1131
1129
1132 if procUnitConfObj.parentId == None:
1130 if procUnitConfObj.parentId == None:
1133 procUnitConfObj.parentId = self.id
1131 procUnitConfObj.parentId = self.id
1134
1132
1135 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1133 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1136
1134
1137 self.filename = abs_file
1135 self.filename = abs_file
1138
1136
1139 return 1
1137 return 1
1140
1138
1141 def printattr(self):
1139 def printattr(self):
1142
1140
1143 print "Project[%s]: name = %s, description = %s" %(self.id,
1141 print "Project[%s]: name = %s, description = %s" %(self.id,
1144 self.name,
1142 self.name,
1145 self.description)
1143 self.description)
1146
1144
1147 for procUnitConfObj in self.procUnitConfObjDict.values():
1145 for procUnitConfObj in self.procUnitConfObjDict.values():
1148 procUnitConfObj.printattr()
1146 procUnitConfObj.printattr()
1149
1147
1150 def createObjects(self):
1148 def createObjects(self):
1151
1149
1152 for procUnitConfObj in self.procUnitConfObjDict.values():
1150 for procUnitConfObj in self.procUnitConfObjDict.values():
1153 procUnitConfObj.createObjects(self.plotterQueue)
1151 procUnitConfObj.createObjects(self.plotterQueue)
1154
1152
1155 def __connect(self, objIN, thisObj):
1153 def __connect(self, objIN, thisObj):
1156
1154
1157 thisObj.setInput(objIN.getOutputObj())
1155 thisObj.setInput(objIN.getOutputObj())
1158
1156
1159 def connectObjects(self):
1157 def connectObjects(self):
1160
1158
1161 for thisPUConfObj in self.procUnitConfObjDict.values():
1159 for thisPUConfObj in self.procUnitConfObjDict.values():
1162
1160
1163 inputId = thisPUConfObj.getInputId()
1161 inputId = thisPUConfObj.getInputId()
1164
1162
1165 if int(inputId) == 0:
1163 if int(inputId) == 0:
1166 continue
1164 continue
1167
1165
1168 #Get input object
1166 #Get input object
1169 puConfINObj = self.procUnitConfObjDict[inputId]
1167 puConfINObj = self.procUnitConfObjDict[inputId]
1170 puObjIN = puConfINObj.getProcUnitObj()
1168 puObjIN = puConfINObj.getProcUnitObj()
1171
1169
1172 #Get current object
1170 #Get current object
1173 thisPUObj = thisPUConfObj.getProcUnitObj()
1171 thisPUObj = thisPUConfObj.getProcUnitObj()
1174
1172
1175 self.__connect(puObjIN, thisPUObj)
1173 self.__connect(puObjIN, thisPUObj)
1176
1174
1177 def __handleError(self, procUnitConfObj, send_email=True):
1175 def __handleError(self, procUnitConfObj, send_email=True):
1178
1176
1179 import socket
1177 import socket
1180
1178
1181 err = traceback.format_exception(sys.exc_info()[0],
1179 err = traceback.format_exception(sys.exc_info()[0],
1182 sys.exc_info()[1],
1180 sys.exc_info()[1],
1183 sys.exc_info()[2])
1181 sys.exc_info()[2])
1184
1182
1185 print "***** Error occurred in %s *****" %(procUnitConfObj.name)
1183 print "***** Error occurred in %s *****" %(procUnitConfObj.name)
1186 print "***** %s" %err[-1]
1184 print "***** %s" %err[-1]
1187
1185
1188 message = "".join(err)
1186 message = "".join(err)
1189
1187
1190 sys.stderr.write(message)
1188 sys.stderr.write(message)
1191
1189
1192 if not send_email:
1190 if not send_email:
1193 return
1191 return
1194
1192
1195 subject = "SChain v%s: Error running %s\n" %(schainpy.__version__, procUnitConfObj.name)
1193 subject = "SChain v%s: Error running %s\n" %(schainpy.__version__, procUnitConfObj.name)
1196
1194
1197 subtitle = "%s: %s\n" %(procUnitConfObj.getElementName() ,procUnitConfObj.name)
1195 subtitle = "%s: %s\n" %(procUnitConfObj.getElementName() ,procUnitConfObj.name)
1198 subtitle += "Hostname: %s\n" %socket.gethostbyname(socket.gethostname())
1196 subtitle += "Hostname: %s\n" %socket.gethostbyname(socket.gethostname())
1199 subtitle += "Working directory: %s\n" %os.path.abspath("./")
1197 subtitle += "Working directory: %s\n" %os.path.abspath("./")
1200 subtitle += "Configuration file: %s\n" %self.filename
1198 subtitle += "Configuration file: %s\n" %self.filename
1201 subtitle += "Time: %s\n" %str(datetime.datetime.now())
1199 subtitle += "Time: %s\n" %str(datetime.datetime.now())
1202
1200
1203 readUnitConfObj = self.getReadUnitObj()
1201 readUnitConfObj = self.getReadUnitObj()
1204 if readUnitConfObj:
1202 if readUnitConfObj:
1205 subtitle += "\nInput parameters:\n"
1203 subtitle += "\nInput parameters:\n"
1206 subtitle += "[Data path = %s]\n" %readUnitConfObj.path
1204 subtitle += "[Data path = %s]\n" %readUnitConfObj.path
1207 subtitle += "[Data type = %s]\n" %readUnitConfObj.datatype
1205 subtitle += "[Data type = %s]\n" %readUnitConfObj.datatype
1208 subtitle += "[Start date = %s]\n" %readUnitConfObj.startDate
1206 subtitle += "[Start date = %s]\n" %readUnitConfObj.startDate
1209 subtitle += "[End date = %s]\n" %readUnitConfObj.endDate
1207 subtitle += "[End date = %s]\n" %readUnitConfObj.endDate
1210 subtitle += "[Start time = %s]\n" %readUnitConfObj.startTime
1208 subtitle += "[Start time = %s]\n" %readUnitConfObj.startTime
1211 subtitle += "[End time = %s]\n" %readUnitConfObj.endTime
1209 subtitle += "[End time = %s]\n" %readUnitConfObj.endTime
1212
1210
1213 adminObj = schainpy.admin.SchainNotify()
1211 adminObj = schainpy.admin.SchainNotify()
1214 adminObj.sendAlert(message=message,
1212 adminObj.sendAlert(message=message,
1215 subject=subject,
1213 subject=subject,
1216 subtitle=subtitle,
1214 subtitle=subtitle,
1217 filename=self.filename)
1215 filename=self.filename)
1218
1216
1219 def isPaused(self):
1217 def isPaused(self):
1220 return 0
1218 return 0
1221
1219
1222 def isStopped(self):
1220 def isStopped(self):
1223 return 0
1221 return 0
1224
1222
1225 def runController(self):
1223 def runController(self):
1226 """
1224 """
1227 returns 0 when this process has been stopped, 1 otherwise
1225 returns 0 when this process has been stopped, 1 otherwise
1228 """
1226 """
1229
1227
1230 if self.isPaused():
1228 if self.isPaused():
1231 print "Process suspended"
1229 print "Process suspended"
1232
1230
1233 while True:
1231 while True:
1234 sleep(0.1)
1232 sleep(0.1)
1235
1233
1236 if not self.isPaused():
1234 if not self.isPaused():
1237 break
1235 break
1238
1236
1239 if self.isStopped():
1237 if self.isStopped():
1240 break
1238 break
1241
1239
1242 print "Process reinitialized"
1240 print "Process reinitialized"
1243
1241
1244 if self.isStopped():
1242 if self.isStopped():
1245 print "Process stopped"
1243 print "Process stopped"
1246 return 0
1244 return 0
1247
1245
1248 return 1
1246 return 1
1249
1247
1250 def setFilename(self, filename):
1248 def setFilename(self, filename):
1251
1249
1252 self.filename = filename
1250 self.filename = filename
1253
1251
1254 def setPlotterQueue(self, plotter_queue):
1252 def setPlotterQueue(self, plotter_queue):
1255
1253
1256 raise NotImplementedError, "Use schainpy.controller_api.ControllerThread instead Project class"
1254 raise NotImplementedError, "Use schainpy.controller_api.ControllerThread instead Project class"
1257
1255
1258 def getPlotterQueue(self):
1256 def getPlotterQueue(self):
1259
1257
1260 raise NotImplementedError, "Use schainpy.controller_api.ControllerThread instead Project class"
1258 raise NotImplementedError, "Use schainpy.controller_api.ControllerThread instead Project class"
1261
1259
1262 def useExternalPlotter(self):
1260 def useExternalPlotter(self):
1263
1261
1264 raise NotImplementedError, "Use schainpy.controller_api.ControllerThread instead Project class"
1262 raise NotImplementedError, "Use schainpy.controller_api.ControllerThread instead Project class"
1265
1263
1266 def run(self):
1264 def run(self):
1267
1265
1268 print
1266 print
1269 print "*"*60
1267 print "*"*60
1270 print " Starting SIGNAL CHAIN PROCESSING v%s " %schainpy.__version__
1268 print " Starting SIGNAL CHAIN PROCESSING v%s " %schainpy.__version__
1271 print "*"*60
1269 print "*"*60
1272 print
1270 print
1273
1271
1274 keyList = self.procUnitConfObjDict.keys()
1272 keyList = self.procUnitConfObjDict.keys()
1275 keyList.sort()
1273 keyList.sort()
1276
1274
1277 while(True):
1275 while(True):
1278
1276
1279 is_ok = False
1277 is_ok = False
1280
1278
1281 for procKey in keyList:
1279 for procKey in keyList:
1282 # print "Running the '%s' process with %s" %(procUnitConfObj.name, procUnitConfObj.id)
1280 # print "Running the '%s' process with %s" %(procUnitConfObj.name, procUnitConfObj.id)
1283
1281
1284 procUnitConfObj = self.procUnitConfObjDict[procKey]
1282 procUnitConfObj = self.procUnitConfObjDict[procKey]
1285
1283
1286 try:
1284 try:
1287 sts = procUnitConfObj.run()
1285 sts = procUnitConfObj.run()
1288 is_ok = is_ok or sts
1286 is_ok = is_ok or sts
1289 except KeyboardInterrupt:
1287 except KeyboardInterrupt:
1290 is_ok = False
1288 is_ok = False
1291 break
1289 break
1292 except ValueError, e:
1290 except ValueError, e:
1293 sleep(0.5)
1291 sleep(0.5)
1294 self.__handleError(procUnitConfObj, send_email=True)
1292 self.__handleError(procUnitConfObj, send_email=True)
1295 is_ok = False
1293 is_ok = False
1296 break
1294 break
1297 except:
1295 except:
1298 sleep(0.5)
1296 sleep(0.5)
1299 self.__handleError(procUnitConfObj)
1297 self.__handleError(procUnitConfObj)
1300 is_ok = False
1298 is_ok = False
1301 break
1299 break
1302
1300
1303 #If every process unit finished so end process
1301 #If every process unit finished so end process
1304 if not(is_ok):
1302 if not(is_ok):
1305 # print "Every process unit have finished"
1303 # print "Every process unit have finished"
1306 break
1304 break
1307
1305
1308 if not self.runController():
1306 if not self.runController():
1309 break
1307 break
1310
1308
1311 #Closing every process
1309 #Closing every process
1312 for procKey in keyList:
1310 for procKey in keyList:
1313 procUnitConfObj = self.procUnitConfObjDict[procKey]
1311 procUnitConfObj = self.procUnitConfObjDict[procKey]
1314 procUnitConfObj.close()
1312 procUnitConfObj.close()
1315
1313
1316 print "Process finished"
1314 print "Process finished"
1317
1315
1318 def start(self):
1316 def start(self):
1319
1317
1320 self.writeXml()
1318 self.writeXml()
1321 self.createObjects()
1319 self.createObjects()
1322 self.connectObjects()
1320 self.connectObjects()
1323 self.run()
1321 self.run()
@@ -1,1 +1,1
1 <Project description="Segundo Test" id="191" name="test01"><ProcUnit datatype="ReceiverData" id="1911" inputId="0" name="ReceiverData"><Operation id="19111" name="run" priority="1" type="self"><Parameter format="bool" id="191111" name="realtime" value="1" /><Parameter format="str" id="191112" name="plottypes" value="rti" /><Parameter format="str" id="191113" name="plot_server" value="tcp://10.10.10.82:7000" /></Operation></ProcUnit></Project> No newline at end of file
1 <Project description="HF_EXAMPLE" id="191" name="test01"><ReadUnit datatype="SpectraReader" id="1911" inputId="0" name="SpectraReader"><Operation id="19111" name="run" priority="1" type="self"><Parameter format="str" id="191111" name="datatype" value="SpectraReader" /><Parameter format="str" id="191112" name="path" value="/home/nanosat/data/hysell_data20/pdata" /><Parameter format="date" id="191113" name="startDate" value="2015/09/26" /><Parameter format="date" id="191114" name="endDate" value="2015/09/26" /><Parameter format="time" id="191115" name="startTime" value="00:00:00" /><Parameter format="time" id="191116" name="endTime" value="23:59:59" /><Parameter format="int" id="191118" name="cursor" value="33" /><Parameter format="int" id="191119" name="skip" value="22" /><Parameter format="int" id="191120" name="delay" value="10" /><Parameter format="int" id="191121" name="walk" value="1" /><Parameter format="int" id="191122" name="online" value="0" /></Operation></ReadUnit><ProcUnit datatype="Spectra" id="1912" inputId="1911" name="SpectraProc"><Operation id="19121" name="run" priority="1" type="self" /><Operation id="19122" name="PublishData" priority="2" type="other"><Parameter format="int" id="191221" name="zeromq" value="1" /><Parameter format="int" id="191222" name="delay" value="1" /></Operation></ProcUnit></Project> No newline at end of file
General Comments 0
You need to be logged in to leave comments. Login now