##// END OF EJS Templates
Fix external operation for Senders
Juan C. Espinoza -
r1298:c763f649a942
parent child
Show More
@@ -1,648 +1,648
1 '''
1 '''
2 Main routines to create a Signal Chain project
2 Main routines to create a Signal Chain project
3 '''
3 '''
4
4
5 import re
5 import re
6 import sys
6 import sys
7 import ast
7 import ast
8 import datetime
8 import datetime
9 import traceback
9 import traceback
10 import time
10 import time
11 from multiprocessing import Process, Queue
11 from multiprocessing import Process, Queue
12 from threading import Thread
12 from threading import Thread
13 from xml.etree.ElementTree import ElementTree, Element, SubElement
13 from xml.etree.ElementTree import ElementTree, Element, SubElement
14
14
15 from schainpy.admin import Alarm, SchainWarning
15 from schainpy.admin import Alarm, SchainWarning
16 from schainpy.model import *
16 from schainpy.model import *
17 from schainpy.utils import log
17 from schainpy.utils import log
18
18
19
19
20 class ConfBase():
20 class ConfBase():
21
21
22 def __init__(self):
22 def __init__(self):
23
23
24 self.id = '0'
24 self.id = '0'
25 self.name = None
25 self.name = None
26 self.priority = None
26 self.priority = None
27 self.parameters = {}
27 self.parameters = {}
28 self.object = None
28 self.object = None
29 self.operations = []
29 self.operations = []
30
30
31 def getId(self):
31 def getId(self):
32
32
33 return self.id
33 return self.id
34
34
35 def getNewId(self):
35 def getNewId(self):
36
36
37 return int(self.id) * 10 + len(self.operations) + 1
37 return int(self.id) * 10 + len(self.operations) + 1
38
38
39 def updateId(self, new_id):
39 def updateId(self, new_id):
40
40
41 self.id = str(new_id)
41 self.id = str(new_id)
42
42
43 n = 1
43 n = 1
44 for conf in self.operations:
44 for conf in self.operations:
45 conf_id = str(int(new_id) * 10 + n)
45 conf_id = str(int(new_id) * 10 + n)
46 conf.updateId(conf_id)
46 conf.updateId(conf_id)
47 n += 1
47 n += 1
48
48
49 def getKwargs(self):
49 def getKwargs(self):
50
50
51 params = {}
51 params = {}
52
52
53 for key, value in self.parameters.items():
53 for key, value in self.parameters.items():
54 if value not in (None, '', ' '):
54 if value not in (None, '', ' '):
55 params[key] = value
55 params[key] = value
56
56
57 return params
57 return params
58
58
59 def update(self, **kwargs):
59 def update(self, **kwargs):
60
60
61 for key, value in kwargs.items():
61 for key, value in kwargs.items():
62 self.addParameter(name=key, value=value)
62 self.addParameter(name=key, value=value)
63
63
64 def addParameter(self, name, value, format=None):
64 def addParameter(self, name, value, format=None):
65 '''
65 '''
66 '''
66 '''
67
67
68 if isinstance(value, str) and re.search(r'(\d+/\d+/\d+)', value):
68 if isinstance(value, str) and re.search(r'(\d+/\d+/\d+)', value):
69 self.parameters[name] = datetime.date(*[int(x) for x in value.split('/')])
69 self.parameters[name] = datetime.date(*[int(x) for x in value.split('/')])
70 elif isinstance(value, str) and re.search(r'(\d+:\d+:\d+)', value):
70 elif isinstance(value, str) and re.search(r'(\d+:\d+:\d+)', value):
71 self.parameters[name] = datetime.time(*[int(x) for x in value.split(':')])
71 self.parameters[name] = datetime.time(*[int(x) for x in value.split(':')])
72 else:
72 else:
73 try:
73 try:
74 self.parameters[name] = ast.literal_eval(value)
74 self.parameters[name] = ast.literal_eval(value)
75 except:
75 except:
76 if isinstance(value, str) and ',' in value:
76 if isinstance(value, str) and ',' in value:
77 self.parameters[name] = value.split(',')
77 self.parameters[name] = value.split(',')
78 else:
78 else:
79 self.parameters[name] = value
79 self.parameters[name] = value
80
80
81 def getParameters(self):
81 def getParameters(self):
82
82
83 params = {}
83 params = {}
84 for key, value in self.parameters.items():
84 for key, value in self.parameters.items():
85 s = type(value).__name__
85 s = type(value).__name__
86 if s == 'date':
86 if s == 'date':
87 params[key] = value.strftime('%Y/%m/%d')
87 params[key] = value.strftime('%Y/%m/%d')
88 elif s == 'time':
88 elif s == 'time':
89 params[key] = value.strftime('%H:%M:%S')
89 params[key] = value.strftime('%H:%M:%S')
90 else:
90 else:
91 params[key] = str(value)
91 params[key] = str(value)
92
92
93 return params
93 return params
94
94
95 def makeXml(self, element):
95 def makeXml(self, element):
96
96
97 xml = SubElement(element, self.ELEMENTNAME)
97 xml = SubElement(element, self.ELEMENTNAME)
98 for label in self.xml_labels:
98 for label in self.xml_labels:
99 xml.set(label, str(getattr(self, label)))
99 xml.set(label, str(getattr(self, label)))
100
100
101 for key, value in self.getParameters().items():
101 for key, value in self.getParameters().items():
102 xml_param = SubElement(xml, 'Parameter')
102 xml_param = SubElement(xml, 'Parameter')
103 xml_param.set('name', key)
103 xml_param.set('name', key)
104 xml_param.set('value', value)
104 xml_param.set('value', value)
105
105
106 for conf in self.operations:
106 for conf in self.operations:
107 conf.makeXml(xml)
107 conf.makeXml(xml)
108
108
109 def __str__(self):
109 def __str__(self):
110
110
111 if self.ELEMENTNAME == 'Operation':
111 if self.ELEMENTNAME == 'Operation':
112 s = ' {}[id={}]\n'.format(self.name, self.id)
112 s = ' {}[id={}]\n'.format(self.name, self.id)
113 else:
113 else:
114 s = '{}[id={}, inputId={}]\n'.format(self.name, self.id, self.inputId)
114 s = '{}[id={}, inputId={}]\n'.format(self.name, self.id, self.inputId)
115
115
116 for key, value in self.parameters.items():
116 for key, value in self.parameters.items():
117 if self.ELEMENTNAME == 'Operation':
117 if self.ELEMENTNAME == 'Operation':
118 s += ' {}: {}\n'.format(key, value)
118 s += ' {}: {}\n'.format(key, value)
119 else:
119 else:
120 s += ' {}: {}\n'.format(key, value)
120 s += ' {}: {}\n'.format(key, value)
121
121
122 for conf in self.operations:
122 for conf in self.operations:
123 s += str(conf)
123 s += str(conf)
124
124
125 return s
125 return s
126
126
127 class OperationConf(ConfBase):
127 class OperationConf(ConfBase):
128
128
129 ELEMENTNAME = 'Operation'
129 ELEMENTNAME = 'Operation'
130 xml_labels = ['id', 'name']
130 xml_labels = ['id', 'name']
131
131
132 def setup(self, id, name, priority, project_id, err_queue):
132 def setup(self, id, name, priority, project_id, err_queue):
133
133
134 self.id = str(id)
134 self.id = str(id)
135 self.project_id = project_id
135 self.project_id = project_id
136 self.name = name
136 self.name = name
137 self.type = 'other'
137 self.type = 'other'
138 self.err_queue = err_queue
138 self.err_queue = err_queue
139
139
140 def readXml(self, element, project_id, err_queue):
140 def readXml(self, element, project_id, err_queue):
141
141
142 self.id = element.get('id')
142 self.id = element.get('id')
143 self.name = element.get('name')
143 self.name = element.get('name')
144 self.type = 'other'
144 self.type = 'other'
145 self.project_id = str(project_id)
145 self.project_id = str(project_id)
146 self.err_queue = err_queue
146 self.err_queue = err_queue
147
147
148 for elm in element.iter('Parameter'):
148 for elm in element.iter('Parameter'):
149 self.addParameter(elm.get('name'), elm.get('value'))
149 self.addParameter(elm.get('name'), elm.get('value'))
150
150
151 def createObject(self):
151 def createObject(self):
152
152
153 className = eval(self.name)
153 className = eval(self.name)
154
154
155 if 'Plot' in self.name or 'Writer' in self.name:
155 if 'Plot' in self.name or 'Writer' in self.name or 'Send' in self.name:
156 kwargs = self.getKwargs()
156 kwargs = self.getKwargs()
157 opObj = className(self.id, self.id, self.project_id, self.err_queue, **kwargs)
157 opObj = className(self.id, self.id, self.project_id, self.err_queue, **kwargs)
158 opObj.start()
158 opObj.start()
159 self.type = 'external'
159 self.type = 'external'
160 else:
160 else:
161 opObj = className()
161 opObj = className()
162
162
163 self.object = opObj
163 self.object = opObj
164 return opObj
164 return opObj
165
165
166 class ProcUnitConf(ConfBase):
166 class ProcUnitConf(ConfBase):
167
167
168 ELEMENTNAME = 'ProcUnit'
168 ELEMENTNAME = 'ProcUnit'
169 xml_labels = ['id', 'inputId', 'name']
169 xml_labels = ['id', 'inputId', 'name']
170
170
171 def setup(self, project_id, id, name, datatype, inputId, err_queue):
171 def setup(self, project_id, id, name, datatype, inputId, err_queue):
172 '''
172 '''
173 '''
173 '''
174
174
175 if datatype == None and name == None:
175 if datatype == None and name == None:
176 raise ValueError('datatype or name should be defined')
176 raise ValueError('datatype or name should be defined')
177
177
178 if name == None:
178 if name == None:
179 if 'Proc' in datatype:
179 if 'Proc' in datatype:
180 name = datatype
180 name = datatype
181 else:
181 else:
182 name = '%sProc' % (datatype)
182 name = '%sProc' % (datatype)
183
183
184 if datatype == None:
184 if datatype == None:
185 datatype = name.replace('Proc', '')
185 datatype = name.replace('Proc', '')
186
186
187 self.id = str(id)
187 self.id = str(id)
188 self.project_id = project_id
188 self.project_id = project_id
189 self.name = name
189 self.name = name
190 self.datatype = datatype
190 self.datatype = datatype
191 self.inputId = inputId
191 self.inputId = inputId
192 self.err_queue = err_queue
192 self.err_queue = err_queue
193 self.operations = []
193 self.operations = []
194 self.parameters = {}
194 self.parameters = {}
195
195
196 def removeOperation(self, id):
196 def removeOperation(self, id):
197
197
198 i = [1 if x.id==id else 0 for x in self.operations]
198 i = [1 if x.id==id else 0 for x in self.operations]
199 self.operations.pop(i.index(1))
199 self.operations.pop(i.index(1))
200
200
201 def getOperation(self, id):
201 def getOperation(self, id):
202
202
203 for conf in self.operations:
203 for conf in self.operations:
204 if conf.id == id:
204 if conf.id == id:
205 return conf
205 return conf
206
206
207 def addOperation(self, name, optype='self'):
207 def addOperation(self, name, optype='self'):
208 '''
208 '''
209 '''
209 '''
210
210
211 id = self.getNewId()
211 id = self.getNewId()
212 conf = OperationConf()
212 conf = OperationConf()
213 conf.setup(id, name=name, priority='0', project_id=self.project_id, err_queue=self.err_queue)
213 conf.setup(id, name=name, priority='0', project_id=self.project_id, err_queue=self.err_queue)
214 self.operations.append(conf)
214 self.operations.append(conf)
215
215
216 return conf
216 return conf
217
217
218 def readXml(self, element, project_id, err_queue):
218 def readXml(self, element, project_id, err_queue):
219
219
220 self.id = element.get('id')
220 self.id = element.get('id')
221 self.name = element.get('name')
221 self.name = element.get('name')
222 self.inputId = None if element.get('inputId') == 'None' else element.get('inputId')
222 self.inputId = None if element.get('inputId') == 'None' else element.get('inputId')
223 self.datatype = element.get('datatype', self.name.replace(self.ELEMENTNAME.replace('Unit', ''), ''))
223 self.datatype = element.get('datatype', self.name.replace(self.ELEMENTNAME.replace('Unit', ''), ''))
224 self.project_id = str(project_id)
224 self.project_id = str(project_id)
225 self.err_queue = err_queue
225 self.err_queue = err_queue
226 self.operations = []
226 self.operations = []
227 self.parameters = {}
227 self.parameters = {}
228
228
229 for elm in element:
229 for elm in element:
230 if elm.tag == 'Parameter':
230 if elm.tag == 'Parameter':
231 self.addParameter(elm.get('name'), elm.get('value'))
231 self.addParameter(elm.get('name'), elm.get('value'))
232 elif elm.tag == 'Operation':
232 elif elm.tag == 'Operation':
233 conf = OperationConf()
233 conf = OperationConf()
234 conf.readXml(elm, project_id, err_queue)
234 conf.readXml(elm, project_id, err_queue)
235 self.operations.append(conf)
235 self.operations.append(conf)
236
236
237 def createObjects(self):
237 def createObjects(self):
238 '''
238 '''
239 Instancia de unidades de procesamiento.
239 Instancia de unidades de procesamiento.
240 '''
240 '''
241
241
242 className = eval(self.name)
242 className = eval(self.name)
243 kwargs = self.getKwargs()
243 kwargs = self.getKwargs()
244 procUnitObj = className()
244 procUnitObj = className()
245 procUnitObj.name = self.name
245 procUnitObj.name = self.name
246 log.success('creating process...', self.name)
246 log.success('creating process...', self.name)
247
247
248 for conf in self.operations:
248 for conf in self.operations:
249
249
250 opObj = conf.createObject()
250 opObj = conf.createObject()
251
251
252 log.success('adding operation: {}, type:{}'.format(
252 log.success('adding operation: {}, type:{}'.format(
253 conf.name,
253 conf.name,
254 conf.type), self.name)
254 conf.type), self.name)
255
255
256 procUnitObj.addOperation(conf, opObj)
256 procUnitObj.addOperation(conf, opObj)
257
257
258 self.object = procUnitObj
258 self.object = procUnitObj
259
259
260 def run(self):
260 def run(self):
261 '''
261 '''
262 '''
262 '''
263
263
264 return self.object.call(**self.getKwargs())
264 return self.object.call(**self.getKwargs())
265
265
266
266
267 class ReadUnitConf(ProcUnitConf):
267 class ReadUnitConf(ProcUnitConf):
268
268
269 ELEMENTNAME = 'ReadUnit'
269 ELEMENTNAME = 'ReadUnit'
270
270
271 def __init__(self):
271 def __init__(self):
272
272
273 self.id = None
273 self.id = None
274 self.datatype = None
274 self.datatype = None
275 self.name = None
275 self.name = None
276 self.inputId = None
276 self.inputId = None
277 self.operations = []
277 self.operations = []
278 self.parameters = {}
278 self.parameters = {}
279
279
280 def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='',
280 def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='',
281 startTime='', endTime='', server=None, **kwargs):
281 startTime='', endTime='', server=None, **kwargs):
282
282
283 if datatype == None and name == None:
283 if datatype == None and name == None:
284 raise ValueError('datatype or name should be defined')
284 raise ValueError('datatype or name should be defined')
285 if name == None:
285 if name == None:
286 if 'Reader' in datatype:
286 if 'Reader' in datatype:
287 name = datatype
287 name = datatype
288 datatype = name.replace('Reader','')
288 datatype = name.replace('Reader','')
289 else:
289 else:
290 name = '{}Reader'.format(datatype)
290 name = '{}Reader'.format(datatype)
291 if datatype == None:
291 if datatype == None:
292 if 'Reader' in name:
292 if 'Reader' in name:
293 datatype = name.replace('Reader','')
293 datatype = name.replace('Reader','')
294 else:
294 else:
295 datatype = name
295 datatype = name
296 name = '{}Reader'.format(name)
296 name = '{}Reader'.format(name)
297
297
298 self.id = id
298 self.id = id
299 self.project_id = project_id
299 self.project_id = project_id
300 self.name = name
300 self.name = name
301 self.datatype = datatype
301 self.datatype = datatype
302 self.err_queue = err_queue
302 self.err_queue = err_queue
303
303
304 self.addParameter(name='path', value=path)
304 self.addParameter(name='path', value=path)
305 self.addParameter(name='startDate', value=startDate)
305 self.addParameter(name='startDate', value=startDate)
306 self.addParameter(name='endDate', value=endDate)
306 self.addParameter(name='endDate', value=endDate)
307 self.addParameter(name='startTime', value=startTime)
307 self.addParameter(name='startTime', value=startTime)
308 self.addParameter(name='endTime', value=endTime)
308 self.addParameter(name='endTime', value=endTime)
309
309
310 for key, value in kwargs.items():
310 for key, value in kwargs.items():
311 self.addParameter(name=key, value=value)
311 self.addParameter(name=key, value=value)
312
312
313
313
314 class Project(Process):
314 class Project(Process):
315
315
316 ELEMENTNAME = 'Project'
316 ELEMENTNAME = 'Project'
317
317
318 def __init__(self):
318 def __init__(self):
319
319
320 Process.__init__(self)
320 Process.__init__(self)
321 self.id = None
321 self.id = None
322 self.filename = None
322 self.filename = None
323 self.description = None
323 self.description = None
324 self.email = None
324 self.email = None
325 self.alarm = []
325 self.alarm = []
326 self.configurations = {}
326 self.configurations = {}
327 # self.err_queue = Queue()
327 # self.err_queue = Queue()
328 self.err_queue = None
328 self.err_queue = None
329 self.started = False
329 self.started = False
330
330
331 def getNewId(self):
331 def getNewId(self):
332
332
333 idList = list(self.configurations.keys())
333 idList = list(self.configurations.keys())
334 id = int(self.id) * 10
334 id = int(self.id) * 10
335
335
336 while True:
336 while True:
337 id += 1
337 id += 1
338
338
339 if str(id) in idList:
339 if str(id) in idList:
340 continue
340 continue
341
341
342 break
342 break
343
343
344 return str(id)
344 return str(id)
345
345
346 def updateId(self, new_id):
346 def updateId(self, new_id):
347
347
348 self.id = str(new_id)
348 self.id = str(new_id)
349
349
350 keyList = list(self.configurations.keys())
350 keyList = list(self.configurations.keys())
351 keyList.sort()
351 keyList.sort()
352
352
353 n = 1
353 n = 1
354 new_confs = {}
354 new_confs = {}
355
355
356 for procKey in keyList:
356 for procKey in keyList:
357
357
358 conf = self.configurations[procKey]
358 conf = self.configurations[procKey]
359 idProcUnit = str(int(self.id) * 10 + n)
359 idProcUnit = str(int(self.id) * 10 + n)
360 conf.updateId(idProcUnit)
360 conf.updateId(idProcUnit)
361 new_confs[idProcUnit] = conf
361 new_confs[idProcUnit] = conf
362 n += 1
362 n += 1
363
363
364 self.configurations = new_confs
364 self.configurations = new_confs
365
365
366 def setup(self, id=1, name='', description='', email=None, alarm=[]):
366 def setup(self, id=1, name='', description='', email=None, alarm=[]):
367
367
368 self.id = str(id)
368 self.id = str(id)
369 self.description = description
369 self.description = description
370 self.email = email
370 self.email = email
371 self.alarm = alarm
371 self.alarm = alarm
372 if name:
372 if name:
373 self.name = '{} ({})'.format(Process.__name__, name)
373 self.name = '{} ({})'.format(Process.__name__, name)
374
374
375 def update(self, **kwargs):
375 def update(self, **kwargs):
376
376
377 for key, value in kwargs.items():
377 for key, value in kwargs.items():
378 setattr(self, key, value)
378 setattr(self, key, value)
379
379
380 def clone(self):
380 def clone(self):
381
381
382 p = Project()
382 p = Project()
383 p.id = self.id
383 p.id = self.id
384 p.name = self.name
384 p.name = self.name
385 p.description = self.description
385 p.description = self.description
386 p.configurations = self.configurations.copy()
386 p.configurations = self.configurations.copy()
387
387
388 return p
388 return p
389
389
390 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
390 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
391
391
392 '''
392 '''
393 '''
393 '''
394
394
395 if id is None:
395 if id is None:
396 idReadUnit = self.getNewId()
396 idReadUnit = self.getNewId()
397 else:
397 else:
398 idReadUnit = str(id)
398 idReadUnit = str(id)
399
399
400 conf = ReadUnitConf()
400 conf = ReadUnitConf()
401 conf.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs)
401 conf.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs)
402 self.configurations[conf.id] = conf
402 self.configurations[conf.id] = conf
403
403
404 return conf
404 return conf
405
405
406 def addProcUnit(self, id=None, inputId='0', datatype=None, name=None):
406 def addProcUnit(self, id=None, inputId='0', datatype=None, name=None):
407
407
408 '''
408 '''
409 '''
409 '''
410
410
411 if id is None:
411 if id is None:
412 idProcUnit = self.getNewId()
412 idProcUnit = self.getNewId()
413 else:
413 else:
414 idProcUnit = id
414 idProcUnit = id
415
415
416 conf = ProcUnitConf()
416 conf = ProcUnitConf()
417 conf.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue)
417 conf.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue)
418 self.configurations[conf.id] = conf
418 self.configurations[conf.id] = conf
419
419
420 return conf
420 return conf
421
421
422 def removeProcUnit(self, id):
422 def removeProcUnit(self, id):
423
423
424 if id in self.configurations:
424 if id in self.configurations:
425 self.configurations.pop(id)
425 self.configurations.pop(id)
426
426
427 def getReadUnit(self):
427 def getReadUnit(self):
428
428
429 for obj in list(self.configurations.values()):
429 for obj in list(self.configurations.values()):
430 if obj.ELEMENTNAME == 'ReadUnit':
430 if obj.ELEMENTNAME == 'ReadUnit':
431 return obj
431 return obj
432
432
433 return None
433 return None
434
434
435 def getProcUnit(self, id):
435 def getProcUnit(self, id):
436
436
437 return self.configurations[id]
437 return self.configurations[id]
438
438
439 def getUnits(self):
439 def getUnits(self):
440
440
441 keys = list(self.configurations)
441 keys = list(self.configurations)
442 keys.sort()
442 keys.sort()
443
443
444 for key in keys:
444 for key in keys:
445 yield self.configurations[key]
445 yield self.configurations[key]
446
446
447 def updateUnit(self, id, **kwargs):
447 def updateUnit(self, id, **kwargs):
448
448
449 conf = self.configurations[id].update(**kwargs)
449 conf = self.configurations[id].update(**kwargs)
450
450
451 def makeXml(self):
451 def makeXml(self):
452
452
453 xml = Element('Project')
453 xml = Element('Project')
454 xml.set('id', str(self.id))
454 xml.set('id', str(self.id))
455 xml.set('name', self.name)
455 xml.set('name', self.name)
456 xml.set('description', self.description)
456 xml.set('description', self.description)
457
457
458 for conf in self.configurations.values():
458 for conf in self.configurations.values():
459 conf.makeXml(xml)
459 conf.makeXml(xml)
460
460
461 self.xml = xml
461 self.xml = xml
462
462
463 def writeXml(self, filename=None):
463 def writeXml(self, filename=None):
464
464
465 if filename == None:
465 if filename == None:
466 if self.filename:
466 if self.filename:
467 filename = self.filename
467 filename = self.filename
468 else:
468 else:
469 filename = 'schain.xml'
469 filename = 'schain.xml'
470
470
471 if not filename:
471 if not filename:
472 print('filename has not been defined. Use setFilename(filename) for do it.')
472 print('filename has not been defined. Use setFilename(filename) for do it.')
473 return 0
473 return 0
474
474
475 abs_file = os.path.abspath(filename)
475 abs_file = os.path.abspath(filename)
476
476
477 if not os.access(os.path.dirname(abs_file), os.W_OK):
477 if not os.access(os.path.dirname(abs_file), os.W_OK):
478 print('No write permission on %s' % os.path.dirname(abs_file))
478 print('No write permission on %s' % os.path.dirname(abs_file))
479 return 0
479 return 0
480
480
481 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
481 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
482 print('File %s already exists and it could not be overwriten' % abs_file)
482 print('File %s already exists and it could not be overwriten' % abs_file)
483 return 0
483 return 0
484
484
485 self.makeXml()
485 self.makeXml()
486
486
487 ElementTree(self.xml).write(abs_file, method='xml')
487 ElementTree(self.xml).write(abs_file, method='xml')
488
488
489 self.filename = abs_file
489 self.filename = abs_file
490
490
491 return 1
491 return 1
492
492
493 def readXml(self, filename):
493 def readXml(self, filename):
494
494
495 abs_file = os.path.abspath(filename)
495 abs_file = os.path.abspath(filename)
496
496
497 self.configurations = {}
497 self.configurations = {}
498
498
499 try:
499 try:
500 self.xml = ElementTree().parse(abs_file)
500 self.xml = ElementTree().parse(abs_file)
501 except:
501 except:
502 log.error('Error reading %s, verify file format' % filename)
502 log.error('Error reading %s, verify file format' % filename)
503 return 0
503 return 0
504
504
505 self.id = self.xml.get('id')
505 self.id = self.xml.get('id')
506 self.name = self.xml.get('name')
506 self.name = self.xml.get('name')
507 self.description = self.xml.get('description')
507 self.description = self.xml.get('description')
508
508
509 for element in self.xml:
509 for element in self.xml:
510 if element.tag == 'ReadUnit':
510 if element.tag == 'ReadUnit':
511 conf = ReadUnitConf()
511 conf = ReadUnitConf()
512 conf.readXml(element, self.id, self.err_queue)
512 conf.readXml(element, self.id, self.err_queue)
513 self.configurations[conf.id] = conf
513 self.configurations[conf.id] = conf
514 elif element.tag == 'ProcUnit':
514 elif element.tag == 'ProcUnit':
515 conf = ProcUnitConf()
515 conf = ProcUnitConf()
516 input_proc = self.configurations[element.get('inputId')]
516 input_proc = self.configurations[element.get('inputId')]
517 conf.readXml(element, self.id, self.err_queue)
517 conf.readXml(element, self.id, self.err_queue)
518 self.configurations[conf.id] = conf
518 self.configurations[conf.id] = conf
519
519
520 self.filename = abs_file
520 self.filename = abs_file
521
521
522 return 1
522 return 1
523
523
524 def __str__(self):
524 def __str__(self):
525
525
526 text = '\nProject[id=%s, name=%s, description=%s]\n\n' % (
526 text = '\nProject[id=%s, name=%s, description=%s]\n\n' % (
527 self.id,
527 self.id,
528 self.name,
528 self.name,
529 self.description,
529 self.description,
530 )
530 )
531
531
532 for conf in self.configurations.values():
532 for conf in self.configurations.values():
533 text += '{}'.format(conf)
533 text += '{}'.format(conf)
534
534
535 return text
535 return text
536
536
537 def createObjects(self):
537 def createObjects(self):
538
538
539 keys = list(self.configurations.keys())
539 keys = list(self.configurations.keys())
540 keys.sort()
540 keys.sort()
541 for key in keys:
541 for key in keys:
542 conf = self.configurations[key]
542 conf = self.configurations[key]
543 conf.createObjects()
543 conf.createObjects()
544 if conf.inputId is not None:
544 if conf.inputId is not None:
545 conf.object.setInput(self.configurations[conf.inputId].object)
545 conf.object.setInput(self.configurations[conf.inputId].object)
546
546
547 def monitor(self):
547 def monitor(self):
548
548
549 t = Thread(target=self._monitor, args=(self.err_queue, self.ctx))
549 t = Thread(target=self._monitor, args=(self.err_queue, self.ctx))
550 t.start()
550 t.start()
551
551
552 def _monitor(self, queue, ctx):
552 def _monitor(self, queue, ctx):
553
553
554 import socket
554 import socket
555
555
556 procs = 0
556 procs = 0
557 err_msg = ''
557 err_msg = ''
558
558
559 while True:
559 while True:
560 msg = queue.get()
560 msg = queue.get()
561 if '#_start_#' in msg:
561 if '#_start_#' in msg:
562 procs += 1
562 procs += 1
563 elif '#_end_#' in msg:
563 elif '#_end_#' in msg:
564 procs -=1
564 procs -=1
565 else:
565 else:
566 err_msg = msg
566 err_msg = msg
567
567
568 if procs == 0 or 'Traceback' in err_msg:
568 if procs == 0 or 'Traceback' in err_msg:
569 break
569 break
570 time.sleep(0.1)
570 time.sleep(0.1)
571
571
572 if '|' in err_msg:
572 if '|' in err_msg:
573 name, err = err_msg.split('|')
573 name, err = err_msg.split('|')
574 if 'SchainWarning' in err:
574 if 'SchainWarning' in err:
575 log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), name)
575 log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), name)
576 elif 'SchainError' in err:
576 elif 'SchainError' in err:
577 log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), name)
577 log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), name)
578 else:
578 else:
579 log.error(err, name)
579 log.error(err, name)
580 else:
580 else:
581 name, err = self.name, err_msg
581 name, err = self.name, err_msg
582
582
583 time.sleep(1)
583 time.sleep(1)
584
584
585 ctx.term()
585 ctx.term()
586
586
587 message = ''.join(err)
587 message = ''.join(err)
588
588
589 if err_msg:
589 if err_msg:
590 subject = 'SChain v%s: Error running %s\n' % (
590 subject = 'SChain v%s: Error running %s\n' % (
591 schainpy.__version__, self.name)
591 schainpy.__version__, self.name)
592
592
593 subtitle = 'Hostname: %s\n' % socket.gethostbyname(
593 subtitle = 'Hostname: %s\n' % socket.gethostbyname(
594 socket.gethostname())
594 socket.gethostname())
595 subtitle += 'Working directory: %s\n' % os.path.abspath('./')
595 subtitle += 'Working directory: %s\n' % os.path.abspath('./')
596 subtitle += 'Configuration file: %s\n' % self.filename
596 subtitle += 'Configuration file: %s\n' % self.filename
597 subtitle += 'Time: %s\n' % str(datetime.datetime.now())
597 subtitle += 'Time: %s\n' % str(datetime.datetime.now())
598
598
599 readUnitConfObj = self.getReadUnit()
599 readUnitConfObj = self.getReadUnit()
600 if readUnitConfObj:
600 if readUnitConfObj:
601 subtitle += '\nInput parameters:\n'
601 subtitle += '\nInput parameters:\n'
602 subtitle += '[Data path = %s]\n' % readUnitConfObj.parameters['path']
602 subtitle += '[Data path = %s]\n' % readUnitConfObj.parameters['path']
603 subtitle += '[Start date = %s]\n' % readUnitConfObj.parameters['startDate']
603 subtitle += '[Start date = %s]\n' % readUnitConfObj.parameters['startDate']
604 subtitle += '[End date = %s]\n' % readUnitConfObj.parameters['endDate']
604 subtitle += '[End date = %s]\n' % readUnitConfObj.parameters['endDate']
605 subtitle += '[Start time = %s]\n' % readUnitConfObj.parameters['startTime']
605 subtitle += '[Start time = %s]\n' % readUnitConfObj.parameters['startTime']
606 subtitle += '[End time = %s]\n' % readUnitConfObj.parameters['endTime']
606 subtitle += '[End time = %s]\n' % readUnitConfObj.parameters['endTime']
607
607
608 a = Alarm(
608 a = Alarm(
609 modes=self.alarm,
609 modes=self.alarm,
610 email=self.email,
610 email=self.email,
611 message=message,
611 message=message,
612 subject=subject,
612 subject=subject,
613 subtitle=subtitle,
613 subtitle=subtitle,
614 filename=self.filename
614 filename=self.filename
615 )
615 )
616
616
617 a.start()
617 a.start()
618
618
619 def setFilename(self, filename):
619 def setFilename(self, filename):
620
620
621 self.filename = filename
621 self.filename = filename
622
622
623 def runProcs(self):
623 def runProcs(self):
624
624
625 err = False
625 err = False
626 n = len(self.configurations)
626 n = len(self.configurations)
627
627
628 while not err:
628 while not err:
629 for conf in self.getUnits():
629 for conf in self.getUnits():
630 ok = conf.run()
630 ok = conf.run()
631 if ok is 'Error':
631 if ok is 'Error':
632 n -= 1
632 n -= 1
633 continue
633 continue
634 elif not ok:
634 elif not ok:
635 break
635 break
636 if n == 0:
636 if n == 0:
637 err = True
637 err = True
638
638
639 def run(self):
639 def run(self):
640
640
641 log.success('\nStarting Project {} [id={}]'.format(self.name, self.id), tag='')
641 log.success('\nStarting Project {} [id={}]'.format(self.name, self.id), tag='')
642 self.started = True
642 self.started = True
643 self.start_time = time.time()
643 self.start_time = time.time()
644 self.createObjects()
644 self.createObjects()
645 self.runProcs()
645 self.runProcs()
646 log.success('{} Done (Time: {:4.2f}s)'.format(
646 log.success('{} Done (Time: {:4.2f}s)'.format(
647 self.name,
647 self.name,
648 time.time()-self.start_time), '')
648 time.time()-self.start_time), '')
General Comments 0
You need to be logged in to leave comments. Login now