##// END OF EJS Templates
Add handler to terminate child process
Juan C. Espinoza -
r1533:37c6d1c7452d
parent child
Show More
@@ -1,665 +1,676
1 # Copyright (c) 2012-2020 Jicamarca Radio Observatory
1 # Copyright (c) 2012-2020 Jicamarca Radio Observatory
2 # All rights reserved.
2 # All rights reserved.
3 #
3 #
4 # Distributed under the terms of the BSD 3-clause license.
4 # Distributed under the terms of the BSD 3-clause license.
5 """API to create signal chain projects
5 """API to create signal chain projects
6
6
7 The API is provide through class: Project
7 The API is provide through class: Project
8 """
8 """
9
9
10 import re
10 import re
11 import sys
11 import sys
12 import ast
12 import ast
13 import datetime
13 import datetime
14 import traceback
14 import traceback
15 import time
15 import time
16 import multiprocessing
16 import multiprocessing
17 from multiprocessing import Process, Queue
17 import signal as sig
18 from multiprocessing import Process, Queue, active_children
18 from threading import Thread
19 from threading import Thread
19 from xml.etree.ElementTree import ElementTree, Element, SubElement
20 from xml.etree.ElementTree import ElementTree, Element, SubElement
20
21
21 from schainpy.admin import Alarm, SchainWarning
22 from schainpy.admin import Alarm, SchainWarning
22 from schainpy.model import *
23 from schainpy.model import *
23 from schainpy.utils import log
24 from schainpy.utils import log
24
25
25 if 'darwin' in sys.platform and sys.version_info[0] == 3 and sys.version_info[1] > 7:
26 if 'darwin' in sys.platform and sys.version_info[0] == 3 and sys.version_info[1] > 7:
26 multiprocessing.set_start_method('fork')
27 multiprocessing.set_start_method('fork')
27
28
29 def handler(sig, frame):
30 # get all active child processes
31 active = active_children()
32 # terminate all active children
33 for child in active:
34 child.terminate()
35 # terminate the process
36 sys.exit(0)
37
28 class ConfBase():
38 class ConfBase():
29
39
30 def __init__(self):
40 def __init__(self):
31
41
32 self.id = '0'
42 self.id = '0'
33 self.name = None
43 self.name = None
34 self.priority = None
44 self.priority = None
35 self.parameters = {}
45 self.parameters = {}
36 self.object = None
46 self.object = None
37 self.operations = []
47 self.operations = []
38
48
39 def getId(self):
49 def getId(self):
40
50
41 return self.id
51 return self.id
42
52
43 def getNewId(self):
53 def getNewId(self):
44
54
45 return int(self.id) * 10 + len(self.operations) + 1
55 return int(self.id) * 10 + len(self.operations) + 1
46
56
47 def updateId(self, new_id):
57 def updateId(self, new_id):
48
58
49 self.id = str(new_id)
59 self.id = str(new_id)
50
60
51 n = 1
61 n = 1
52 for conf in self.operations:
62 for conf in self.operations:
53 conf_id = str(int(new_id) * 10 + n)
63 conf_id = str(int(new_id) * 10 + n)
54 conf.updateId(conf_id)
64 conf.updateId(conf_id)
55 n += 1
65 n += 1
56
66
57 def getKwargs(self):
67 def getKwargs(self):
58
68
59 params = {}
69 params = {}
60
70
61 for key, value in self.parameters.items():
71 for key, value in self.parameters.items():
62 if value not in (None, '', ' '):
72 if value not in (None, '', ' '):
63 params[key] = value
73 params[key] = value
64
74
65 return params
75 return params
66
76
67 def update(self, **kwargs):
77 def update(self, **kwargs):
68
78
69 if 'format' not in kwargs:
79 if 'format' not in kwargs:
70 kwargs['format'] = None
80 kwargs['format'] = None
71 for key, value, fmt in kwargs.items():
81 for key, value, fmt in kwargs.items():
72 self.addParameter(name=key, value=value, format=fmt)
82 self.addParameter(name=key, value=value, format=fmt)
73
83
74 def addParameter(self, name, value, format=None):
84 def addParameter(self, name, value, format=None):
75 '''
85 '''
76 '''
86 '''
77 if format is not None:
87 if format is not None:
78 self.parameters[name] = eval(format)(value)
88 self.parameters[name] = eval(format)(value)
79 elif isinstance(value, str) and re.search(r'(\d+/\d+/\d+)', value):
89 elif isinstance(value, str) and re.search(r'(\d+/\d+/\d+)', value):
80 self.parameters[name] = datetime.date(*[int(x) for x in value.split('/')])
90 self.parameters[name] = datetime.date(*[int(x) for x in value.split('/')])
81 elif isinstance(value, str) and re.search(r'(\d+:\d+:\d+)', value):
91 elif isinstance(value, str) and re.search(r'(\d+:\d+:\d+)', value):
82 self.parameters[name] = datetime.time(*[int(x) for x in value.split(':')])
92 self.parameters[name] = datetime.time(*[int(x) for x in value.split(':')])
83 else:
93 else:
84 try:
94 try:
85 self.parameters[name] = ast.literal_eval(value)
95 self.parameters[name] = ast.literal_eval(value)
86 except:
96 except:
87 if isinstance(value, str) and ',' in value:
97 if isinstance(value, str) and ',' in value:
88 self.parameters[name] = value.split(',')
98 self.parameters[name] = value.split(',')
89 else:
99 else:
90 self.parameters[name] = value
100 self.parameters[name] = value
91
101
92 def getParameters(self):
102 def getParameters(self):
93
103
94 params = {}
104 params = {}
95 for key, value in self.parameters.items():
105 for key, value in self.parameters.items():
96 s = type(value).__name__
106 s = type(value).__name__
97 if s == 'date':
107 if s == 'date':
98 params[key] = value.strftime('%Y/%m/%d')
108 params[key] = value.strftime('%Y/%m/%d')
99 elif s == 'time':
109 elif s == 'time':
100 params[key] = value.strftime('%H:%M:%S')
110 params[key] = value.strftime('%H:%M:%S')
101 else:
111 else:
102 params[key] = str(value)
112 params[key] = str(value)
103
113
104 return params
114 return params
105
115
106 def makeXml(self, element):
116 def makeXml(self, element):
107
117
108 xml = SubElement(element, self.ELEMENTNAME)
118 xml = SubElement(element, self.ELEMENTNAME)
109 for label in self.xml_labels:
119 for label in self.xml_labels:
110 xml.set(label, str(getattr(self, label)))
120 xml.set(label, str(getattr(self, label)))
111
121
112 for key, value in self.getParameters().items():
122 for key, value in self.getParameters().items():
113 xml_param = SubElement(xml, 'Parameter')
123 xml_param = SubElement(xml, 'Parameter')
114 xml_param.set('name', key)
124 xml_param.set('name', key)
115 xml_param.set('value', value)
125 xml_param.set('value', value)
116
126
117 for conf in self.operations:
127 for conf in self.operations:
118 conf.makeXml(xml)
128 conf.makeXml(xml)
119
129
120 def __str__(self):
130 def __str__(self):
121
131
122 if self.ELEMENTNAME == 'Operation':
132 if self.ELEMENTNAME == 'Operation':
123 s = ' {}[id={}]\n'.format(self.name, self.id)
133 s = ' {}[id={}]\n'.format(self.name, self.id)
124 else:
134 else:
125 s = '{}[id={}, inputId={}]\n'.format(self.name, self.id, self.inputId)
135 s = '{}[id={}, inputId={}]\n'.format(self.name, self.id, self.inputId)
126
136
127 for key, value in self.parameters.items():
137 for key, value in self.parameters.items():
128 if self.ELEMENTNAME == 'Operation':
138 if self.ELEMENTNAME == 'Operation':
129 s += ' {}: {}\n'.format(key, value)
139 s += ' {}: {}\n'.format(key, value)
130 else:
140 else:
131 s += ' {}: {}\n'.format(key, value)
141 s += ' {}: {}\n'.format(key, value)
132
142
133 for conf in self.operations:
143 for conf in self.operations:
134 s += str(conf)
144 s += str(conf)
135
145
136 return s
146 return s
137
147
138 class OperationConf(ConfBase):
148 class OperationConf(ConfBase):
139
149
140 ELEMENTNAME = 'Operation'
150 ELEMENTNAME = 'Operation'
141 xml_labels = ['id', 'name']
151 xml_labels = ['id', 'name']
142
152
143 def setup(self, id, name, priority, project_id, err_queue):
153 def setup(self, id, name, priority, project_id, err_queue):
144
154
145 self.id = str(id)
155 self.id = str(id)
146 self.project_id = project_id
156 self.project_id = project_id
147 self.name = name
157 self.name = name
148 self.type = 'other'
158 self.type = 'other'
149 self.err_queue = err_queue
159 self.err_queue = err_queue
150
160
151 def readXml(self, element, project_id, err_queue):
161 def readXml(self, element, project_id, err_queue):
152
162
153 self.id = element.get('id')
163 self.id = element.get('id')
154 self.name = element.get('name')
164 self.name = element.get('name')
155 self.type = 'other'
165 self.type = 'other'
156 self.project_id = str(project_id)
166 self.project_id = str(project_id)
157 self.err_queue = err_queue
167 self.err_queue = err_queue
158
168
159 for elm in element.iter('Parameter'):
169 for elm in element.iter('Parameter'):
160 self.addParameter(elm.get('name'), elm.get('value'))
170 self.addParameter(elm.get('name'), elm.get('value'))
161
171
162 def createObject(self):
172 def createObject(self):
163
173
164 className = eval(self.name)
174 className = eval(self.name)
165
175
166 if 'Plot' in self.name or 'Writer' in self.name or 'Send' in self.name or 'print' in self.name:
176 if 'Plot' in self.name or 'Writer' in self.name or 'Send' in self.name or 'print' in self.name:
167 kwargs = self.getKwargs()
177 kwargs = self.getKwargs()
168 opObj = className(self.id, self.id, self.project_id, self.err_queue, **kwargs)
178 opObj = className(self.id, self.id, self.project_id, self.err_queue, **kwargs)
169 opObj.start()
179 opObj.start()
170 self.type = 'external'
180 self.type = 'external'
171 else:
181 else:
172 opObj = className()
182 opObj = className()
173
183
174 self.object = opObj
184 self.object = opObj
175 return opObj
185 return opObj
176
186
177 class ProcUnitConf(ConfBase):
187 class ProcUnitConf(ConfBase):
178
188
179 ELEMENTNAME = 'ProcUnit'
189 ELEMENTNAME = 'ProcUnit'
180 xml_labels = ['id', 'inputId', 'name']
190 xml_labels = ['id', 'inputId', 'name']
181
191
182 def setup(self, project_id, id, name, datatype, inputId, err_queue):
192 def setup(self, project_id, id, name, datatype, inputId, err_queue):
183 '''
193 '''
184 '''
194 '''
185
195
186 if datatype == None and name == None:
196 if datatype == None and name == None:
187 raise ValueError('datatype or name should be defined')
197 raise ValueError('datatype or name should be defined')
188
198
189 if name == None:
199 if name == None:
190 if 'Proc' in datatype:
200 if 'Proc' in datatype:
191 name = datatype
201 name = datatype
192 else:
202 else:
193 name = '%sProc' % (datatype)
203 name = '%sProc' % (datatype)
194
204
195 if datatype == None:
205 if datatype == None:
196 datatype = name.replace('Proc', '')
206 datatype = name.replace('Proc', '')
197
207
198 self.id = str(id)
208 self.id = str(id)
199 self.project_id = project_id
209 self.project_id = project_id
200 self.name = name
210 self.name = name
201 self.datatype = datatype
211 self.datatype = datatype
202 self.inputId = inputId
212 self.inputId = inputId
203 self.err_queue = err_queue
213 self.err_queue = err_queue
204 self.operations = []
214 self.operations = []
205 self.parameters = {}
215 self.parameters = {}
206
216
207 def removeOperation(self, id):
217 def removeOperation(self, id):
208
218
209 i = [1 if x.id==id else 0 for x in self.operations]
219 i = [1 if x.id==id else 0 for x in self.operations]
210 self.operations.pop(i.index(1))
220 self.operations.pop(i.index(1))
211
221
212 def getOperation(self, id):
222 def getOperation(self, id):
213
223
214 for conf in self.operations:
224 for conf in self.operations:
215 if conf.id == id:
225 if conf.id == id:
216 return conf
226 return conf
217
227
218 def addOperation(self, name, optype='self'):
228 def addOperation(self, name, optype='self'):
219 '''
229 '''
220 '''
230 '''
221
231
222 id = self.getNewId()
232 id = self.getNewId()
223 conf = OperationConf()
233 conf = OperationConf()
224 conf.setup(id, name=name, priority='0', project_id=self.project_id, err_queue=self.err_queue)
234 conf.setup(id, name=name, priority='0', project_id=self.project_id, err_queue=self.err_queue)
225 self.operations.append(conf)
235 self.operations.append(conf)
226
236
227 return conf
237 return conf
228
238
229 def readXml(self, element, project_id, err_queue):
239 def readXml(self, element, project_id, err_queue):
230
240
231 self.id = element.get('id')
241 self.id = element.get('id')
232 self.name = element.get('name')
242 self.name = element.get('name')
233 self.inputId = None if element.get('inputId') == 'None' else element.get('inputId')
243 self.inputId = None if element.get('inputId') == 'None' else element.get('inputId')
234 self.datatype = element.get('datatype', self.name.replace(self.ELEMENTNAME.replace('Unit', ''), ''))
244 self.datatype = element.get('datatype', self.name.replace(self.ELEMENTNAME.replace('Unit', ''), ''))
235 self.project_id = str(project_id)
245 self.project_id = str(project_id)
236 self.err_queue = err_queue
246 self.err_queue = err_queue
237 self.operations = []
247 self.operations = []
238 self.parameters = {}
248 self.parameters = {}
239
249
240 for elm in element:
250 for elm in element:
241 if elm.tag == 'Parameter':
251 if elm.tag == 'Parameter':
242 self.addParameter(elm.get('name'), elm.get('value'))
252 self.addParameter(elm.get('name'), elm.get('value'))
243 elif elm.tag == 'Operation':
253 elif elm.tag == 'Operation':
244 conf = OperationConf()
254 conf = OperationConf()
245 conf.readXml(elm, project_id, err_queue)
255 conf.readXml(elm, project_id, err_queue)
246 self.operations.append(conf)
256 self.operations.append(conf)
247
257
248 def createObjects(self):
258 def createObjects(self):
249 '''
259 '''
250 Instancia de unidades de procesamiento.
260 Instancia de unidades de procesamiento.
251 '''
261 '''
252
262
253 className = eval(self.name)
263 className = eval(self.name)
254 kwargs = self.getKwargs()
264 kwargs = self.getKwargs()
255 procUnitObj = className()
265 procUnitObj = className()
256 procUnitObj.name = self.name
266 procUnitObj.name = self.name
257 log.success('creating process...', self.name)
267 log.success('creating process...', self.name)
258
268
259 for conf in self.operations:
269 for conf in self.operations:
260
270
261 opObj = conf.createObject()
271 opObj = conf.createObject()
262
272
263 log.success('adding operation: {}, type:{}'.format(
273 log.success('adding operation: {}, type:{}'.format(
264 conf.name,
274 conf.name,
265 conf.type), self.name)
275 conf.type), self.name)
266
276
267 procUnitObj.addOperation(conf, opObj)
277 procUnitObj.addOperation(conf, opObj)
268
278
269 self.object = procUnitObj
279 self.object = procUnitObj
270
280
271 def run(self):
281 def run(self):
272 '''
282 '''
273 '''
283 '''
274
284
275 return self.object.call(**self.getKwargs())
285 return self.object.call(**self.getKwargs())
276
286
277
287
278 class ReadUnitConf(ProcUnitConf):
288 class ReadUnitConf(ProcUnitConf):
279
289
280 ELEMENTNAME = 'ReadUnit'
290 ELEMENTNAME = 'ReadUnit'
281
291
282 def __init__(self):
292 def __init__(self):
283
293
284 self.id = None
294 self.id = None
285 self.datatype = None
295 self.datatype = None
286 self.name = None
296 self.name = None
287 self.inputId = None
297 self.inputId = None
288 self.operations = []
298 self.operations = []
289 self.parameters = {}
299 self.parameters = {}
290
300
291 def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='',
301 def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='',
292 startTime='', endTime='', server=None, **kwargs):
302 startTime='', endTime='', server=None, **kwargs):
293
303
294 if datatype == None and name == None:
304 if datatype == None and name == None:
295 raise ValueError('datatype or name should be defined')
305 raise ValueError('datatype or name should be defined')
296 if name == None:
306 if name == None:
297 if 'Reader' in datatype:
307 if 'Reader' in datatype:
298 name = datatype
308 name = datatype
299 datatype = name.replace('Reader','')
309 datatype = name.replace('Reader','')
300 else:
310 else:
301 name = '{}Reader'.format(datatype)
311 name = '{}Reader'.format(datatype)
302 if datatype == None:
312 if datatype == None:
303 if 'Reader' in name:
313 if 'Reader' in name:
304 datatype = name.replace('Reader','')
314 datatype = name.replace('Reader','')
305 else:
315 else:
306 datatype = name
316 datatype = name
307 name = '{}Reader'.format(name)
317 name = '{}Reader'.format(name)
308
318
309 self.id = id
319 self.id = id
310 self.project_id = project_id
320 self.project_id = project_id
311 self.name = name
321 self.name = name
312 self.datatype = datatype
322 self.datatype = datatype
313 self.err_queue = err_queue
323 self.err_queue = err_queue
314
324
315 self.addParameter(name='path', value=path, format='str')
325 self.addParameter(name='path', value=path, format='str')
316 self.addParameter(name='startDate', value=startDate)
326 self.addParameter(name='startDate', value=startDate)
317 self.addParameter(name='endDate', value=endDate)
327 self.addParameter(name='endDate', value=endDate)
318 self.addParameter(name='startTime', value=startTime)
328 self.addParameter(name='startTime', value=startTime)
319 self.addParameter(name='endTime', value=endTime)
329 self.addParameter(name='endTime', value=endTime)
320
330
321 for key, value in kwargs.items():
331 for key, value in kwargs.items():
322 self.addParameter(name=key, value=value)
332 self.addParameter(name=key, value=value)
323
333
324
334
325 class Project(Process):
335 class Project(Process):
326 """API to create signal chain projects"""
336 """API to create signal chain projects"""
327
337
328 ELEMENTNAME = 'Project'
338 ELEMENTNAME = 'Project'
329
339
330 def __init__(self, name=''):
340 def __init__(self, name=''):
331
341
332 Process.__init__(self)
342 Process.__init__(self)
333 self.id = '1'
343 self.id = '1'
334 if name:
344 if name:
335 self.name = '{} ({})'.format(Process.__name__, name)
345 self.name = '{} ({})'.format(Process.__name__, name)
336 self.filename = None
346 self.filename = None
337 self.description = None
347 self.description = None
338 self.email = None
348 self.email = None
339 self.alarm = []
349 self.alarm = []
340 self.configurations = {}
350 self.configurations = {}
341 # self.err_queue = Queue()
351 # self.err_queue = Queue()
342 self.err_queue = None
352 self.err_queue = None
343 self.started = False
353 self.started = False
344
354
345 def getNewId(self):
355 def getNewId(self):
346
356
347 idList = list(self.configurations.keys())
357 idList = list(self.configurations.keys())
348 id = int(self.id) * 10
358 id = int(self.id) * 10
349
359
350 while True:
360 while True:
351 id += 1
361 id += 1
352
362
353 if str(id) in idList:
363 if str(id) in idList:
354 continue
364 continue
355
365
356 break
366 break
357
367
358 return str(id)
368 return str(id)
359
369
360 def updateId(self, new_id):
370 def updateId(self, new_id):
361
371
362 self.id = str(new_id)
372 self.id = str(new_id)
363
373
364 keyList = list(self.configurations.keys())
374 keyList = list(self.configurations.keys())
365 keyList.sort()
375 keyList.sort()
366
376
367 n = 1
377 n = 1
368 new_confs = {}
378 new_confs = {}
369
379
370 for procKey in keyList:
380 for procKey in keyList:
371
381
372 conf = self.configurations[procKey]
382 conf = self.configurations[procKey]
373 idProcUnit = str(int(self.id) * 10 + n)
383 idProcUnit = str(int(self.id) * 10 + n)
374 conf.updateId(idProcUnit)
384 conf.updateId(idProcUnit)
375 new_confs[idProcUnit] = conf
385 new_confs[idProcUnit] = conf
376 n += 1
386 n += 1
377
387
378 self.configurations = new_confs
388 self.configurations = new_confs
379
389
380 def setup(self, id=1, name='', description='', email=None, alarm=[]):
390 def setup(self, id=1, name='', description='', email=None, alarm=[]):
381
391
382 self.id = str(id)
392 self.id = str(id)
383 self.description = description
393 self.description = description
384 self.email = email
394 self.email = email
385 self.alarm = alarm
395 self.alarm = alarm
386 if name:
396 if name:
387 self.name = '{} ({})'.format(Process.__name__, name)
397 self.name = '{} ({})'.format(Process.__name__, name)
388
398
389 def update(self, **kwargs):
399 def update(self, **kwargs):
390
400
391 for key, value in kwargs.items():
401 for key, value in kwargs.items():
392 setattr(self, key, value)
402 setattr(self, key, value)
393
403
394 def clone(self):
404 def clone(self):
395
405
396 p = Project()
406 p = Project()
397 p.id = self.id
407 p.id = self.id
398 p.name = self.name
408 p.name = self.name
399 p.description = self.description
409 p.description = self.description
400 p.configurations = self.configurations.copy()
410 p.configurations = self.configurations.copy()
401
411
402 return p
412 return p
403
413
404 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
414 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
405
415
406 '''
416 '''
407 '''
417 '''
408
418
409 if id is None:
419 if id is None:
410 idReadUnit = self.getNewId()
420 idReadUnit = self.getNewId()
411 else:
421 else:
412 idReadUnit = str(id)
422 idReadUnit = str(id)
413
423
414 conf = ReadUnitConf()
424 conf = ReadUnitConf()
415 conf.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs)
425 conf.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs)
416 self.configurations[conf.id] = conf
426 self.configurations[conf.id] = conf
417
427
418 return conf
428 return conf
419
429
420 def addProcUnit(self, id=None, inputId='0', datatype=None, name=None):
430 def addProcUnit(self, id=None, inputId='0', datatype=None, name=None):
421
431
422 '''
432 '''
423 '''
433 '''
424
434
425 if id is None:
435 if id is None:
426 idProcUnit = self.getNewId()
436 idProcUnit = self.getNewId()
427 else:
437 else:
428 idProcUnit = id
438 idProcUnit = id
429
439
430 conf = ProcUnitConf()
440 conf = ProcUnitConf()
431 conf.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue)
441 conf.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue)
432 self.configurations[conf.id] = conf
442 self.configurations[conf.id] = conf
433
443
434 return conf
444 return conf
435
445
436 def removeProcUnit(self, id):
446 def removeProcUnit(self, id):
437
447
438 if id in self.configurations:
448 if id in self.configurations:
439 self.configurations.pop(id)
449 self.configurations.pop(id)
440
450
441 def getReadUnit(self):
451 def getReadUnit(self):
442
452
443 for obj in list(self.configurations.values()):
453 for obj in list(self.configurations.values()):
444 if obj.ELEMENTNAME == 'ReadUnit':
454 if obj.ELEMENTNAME == 'ReadUnit':
445 return obj
455 return obj
446
456
447 return None
457 return None
448
458
449 def getProcUnit(self, id):
459 def getProcUnit(self, id):
450
460
451 return self.configurations[id]
461 return self.configurations[id]
452
462
453 def getUnits(self):
463 def getUnits(self):
454
464
455 keys = list(self.configurations)
465 keys = list(self.configurations)
456 keys.sort()
466 keys.sort()
457
467
458 for key in keys:
468 for key in keys:
459 yield self.configurations[key]
469 yield self.configurations[key]
460
470
461 def updateUnit(self, id, **kwargs):
471 def updateUnit(self, id, **kwargs):
462
472
463 conf = self.configurations[id].update(**kwargs)
473 conf = self.configurations[id].update(**kwargs)
464
474
465 def makeXml(self):
475 def makeXml(self):
466
476
467 xml = Element('Project')
477 xml = Element('Project')
468 xml.set('id', str(self.id))
478 xml.set('id', str(self.id))
469 xml.set('name', self.name)
479 xml.set('name', self.name)
470 xml.set('description', self.description)
480 xml.set('description', self.description)
471
481
472 for conf in self.configurations.values():
482 for conf in self.configurations.values():
473 conf.makeXml(xml)
483 conf.makeXml(xml)
474
484
475 self.xml = xml
485 self.xml = xml
476
486
477 def writeXml(self, filename=None):
487 def writeXml(self, filename=None):
478
488
479 if filename == None:
489 if filename == None:
480 if self.filename:
490 if self.filename:
481 filename = self.filename
491 filename = self.filename
482 else:
492 else:
483 filename = 'schain.xml'
493 filename = 'schain.xml'
484
494
485 if not filename:
495 if not filename:
486 print('filename has not been defined. Use setFilename(filename) for do it.')
496 print('filename has not been defined. Use setFilename(filename) for do it.')
487 return 0
497 return 0
488
498
489 abs_file = os.path.abspath(filename)
499 abs_file = os.path.abspath(filename)
490
500
491 if not os.access(os.path.dirname(abs_file), os.W_OK):
501 if not os.access(os.path.dirname(abs_file), os.W_OK):
492 print('No write permission on %s' % os.path.dirname(abs_file))
502 print('No write permission on %s' % os.path.dirname(abs_file))
493 return 0
503 return 0
494
504
495 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
505 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
496 print('File %s already exists and it could not be overwriten' % abs_file)
506 print('File %s already exists and it could not be overwriten' % abs_file)
497 return 0
507 return 0
498
508
499 self.makeXml()
509 self.makeXml()
500
510
501 ElementTree(self.xml).write(abs_file, method='xml')
511 ElementTree(self.xml).write(abs_file, method='xml')
502
512
503 self.filename = abs_file
513 self.filename = abs_file
504
514
505 return 1
515 return 1
506
516
507 def readXml(self, filename):
517 def readXml(self, filename):
508
518
509 abs_file = os.path.abspath(filename)
519 abs_file = os.path.abspath(filename)
510
520
511 self.configurations = {}
521 self.configurations = {}
512
522
513 try:
523 try:
514 self.xml = ElementTree().parse(abs_file)
524 self.xml = ElementTree().parse(abs_file)
515 except:
525 except:
516 log.error('Error reading %s, verify file format' % filename)
526 log.error('Error reading %s, verify file format' % filename)
517 return 0
527 return 0
518
528
519 self.id = self.xml.get('id')
529 self.id = self.xml.get('id')
520 self.name = self.xml.get('name')
530 self.name = self.xml.get('name')
521 self.description = self.xml.get('description')
531 self.description = self.xml.get('description')
522
532
523 for element in self.xml:
533 for element in self.xml:
524 if element.tag == 'ReadUnit':
534 if element.tag == 'ReadUnit':
525 conf = ReadUnitConf()
535 conf = ReadUnitConf()
526 conf.readXml(element, self.id, self.err_queue)
536 conf.readXml(element, self.id, self.err_queue)
527 self.configurations[conf.id] = conf
537 self.configurations[conf.id] = conf
528 elif element.tag == 'ProcUnit':
538 elif element.tag == 'ProcUnit':
529 conf = ProcUnitConf()
539 conf = ProcUnitConf()
530 input_proc = self.configurations[element.get('inputId')]
540 input_proc = self.configurations[element.get('inputId')]
531 conf.readXml(element, self.id, self.err_queue)
541 conf.readXml(element, self.id, self.err_queue)
532 self.configurations[conf.id] = conf
542 self.configurations[conf.id] = conf
533
543
534 self.filename = abs_file
544 self.filename = abs_file
535
545
536 return 1
546 return 1
537
547
538 def __str__(self):
548 def __str__(self):
539
549
540 text = '\nProject[id=%s, name=%s, description=%s]\n\n' % (
550 text = '\nProject[id=%s, name=%s, description=%s]\n\n' % (
541 self.id,
551 self.id,
542 self.name,
552 self.name,
543 self.description,
553 self.description,
544 )
554 )
545
555
546 for conf in self.configurations.values():
556 for conf in self.configurations.values():
547 text += '{}'.format(conf)
557 text += '{}'.format(conf)
548
558
549 return text
559 return text
550
560
551 def createObjects(self):
561 def createObjects(self):
552
562
553 keys = list(self.configurations.keys())
563 keys = list(self.configurations.keys())
554 keys.sort()
564 keys.sort()
555 for key in keys:
565 for key in keys:
556 conf = self.configurations[key]
566 conf = self.configurations[key]
557 conf.createObjects()
567 conf.createObjects()
558 if conf.inputId is not None:
568 if conf.inputId is not None:
559 if isinstance(conf.inputId, list):
569 if isinstance(conf.inputId, list):
560 conf.object.setInput([self.configurations[x].object for x in conf.inputId])
570 conf.object.setInput([self.configurations[x].object for x in conf.inputId])
561 else:
571 else:
562 conf.object.setInput([self.configurations[conf.inputId].object])
572 conf.object.setInput([self.configurations[conf.inputId].object])
563
573
564 def monitor(self):
574 def monitor(self):
565
575
566 t = Thread(target=self._monitor, args=(self.err_queue, self.ctx))
576 t = Thread(target=self._monitor, args=(self.err_queue, self.ctx))
567 t.start()
577 t.start()
568
578
569 def _monitor(self, queue, ctx):
579 def _monitor(self, queue, ctx):
570
580
571 import socket
581 import socket
572
582
573 procs = 0
583 procs = 0
574 err_msg = ''
584 err_msg = ''
575
585
576 while True:
586 while True:
577 msg = queue.get()
587 msg = queue.get()
578 if '#_start_#' in msg:
588 if '#_start_#' in msg:
579 procs += 1
589 procs += 1
580 elif '#_end_#' in msg:
590 elif '#_end_#' in msg:
581 procs -=1
591 procs -=1
582 else:
592 else:
583 err_msg = msg
593 err_msg = msg
584
594
585 if procs == 0 or 'Traceback' in err_msg:
595 if procs == 0 or 'Traceback' in err_msg:
586 break
596 break
587 time.sleep(0.1)
597 time.sleep(0.1)
588
598
589 if '|' in err_msg:
599 if '|' in err_msg:
590 name, err = err_msg.split('|')
600 name, err = err_msg.split('|')
591 if 'SchainWarning' in err:
601 if 'SchainWarning' in err:
592 log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), name)
602 log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), name)
593 elif 'SchainError' in err:
603 elif 'SchainError' in err:
594 log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), name)
604 log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), name)
595 else:
605 else:
596 log.error(err, name)
606 log.error(err, name)
597 else:
607 else:
598 name, err = self.name, err_msg
608 name, err = self.name, err_msg
599
609
600 time.sleep(1)
610 time.sleep(1)
601
611
602 ctx.term()
612 ctx.term()
603
613
604 message = ''.join(err)
614 message = ''.join(err)
605
615
606 if err_msg:
616 if err_msg:
607 subject = 'SChain v%s: Error running %s\n' % (
617 subject = 'SChain v%s: Error running %s\n' % (
608 schainpy.__version__, self.name)
618 schainpy.__version__, self.name)
609
619
610 subtitle = 'Hostname: %s\n' % socket.gethostbyname(
620 subtitle = 'Hostname: %s\n' % socket.gethostbyname(
611 socket.gethostname())
621 socket.gethostname())
612 subtitle += 'Working directory: %s\n' % os.path.abspath('./')
622 subtitle += 'Working directory: %s\n' % os.path.abspath('./')
613 subtitle += 'Configuration file: %s\n' % self.filename
623 subtitle += 'Configuration file: %s\n' % self.filename
614 subtitle += 'Time: %s\n' % str(datetime.datetime.now())
624 subtitle += 'Time: %s\n' % str(datetime.datetime.now())
615
625
616 readUnitConfObj = self.getReadUnit()
626 readUnitConfObj = self.getReadUnit()
617 if readUnitConfObj:
627 if readUnitConfObj:
618 subtitle += '\nInput parameters:\n'
628 subtitle += '\nInput parameters:\n'
619 subtitle += '[Data path = %s]\n' % readUnitConfObj.parameters['path']
629 subtitle += '[Data path = %s]\n' % readUnitConfObj.parameters['path']
620 subtitle += '[Start date = %s]\n' % readUnitConfObj.parameters['startDate']
630 subtitle += '[Start date = %s]\n' % readUnitConfObj.parameters['startDate']
621 subtitle += '[End date = %s]\n' % readUnitConfObj.parameters['endDate']
631 subtitle += '[End date = %s]\n' % readUnitConfObj.parameters['endDate']
622 subtitle += '[Start time = %s]\n' % readUnitConfObj.parameters['startTime']
632 subtitle += '[Start time = %s]\n' % readUnitConfObj.parameters['startTime']
623 subtitle += '[End time = %s]\n' % readUnitConfObj.parameters['endTime']
633 subtitle += '[End time = %s]\n' % readUnitConfObj.parameters['endTime']
624
634
625 a = Alarm(
635 a = Alarm(
626 modes=self.alarm,
636 modes=self.alarm,
627 email=self.email,
637 email=self.email,
628 message=message,
638 message=message,
629 subject=subject,
639 subject=subject,
630 subtitle=subtitle,
640 subtitle=subtitle,
631 filename=self.filename
641 filename=self.filename
632 )
642 )
633
643
634 a.start()
644 a.start()
635
645
636 def setFilename(self, filename):
646 def setFilename(self, filename):
637
647
638 self.filename = filename
648 self.filename = filename
639
649
640 def runProcs(self):
650 def runProcs(self):
641
651
642 err = False
652 err = False
643 n = len(self.configurations)
653 n = len(self.configurations)
644
654
645 while not err:
655 while not err:
646 for conf in self.getUnits():
656 for conf in self.getUnits():
647 ok = conf.run()
657 ok = conf.run()
648 if ok == 'Error':
658 if ok == 'Error':
649 n -= 1
659 n -= 1
650 continue
660 continue
651 elif not ok:
661 elif not ok:
652 break
662 break
653 if n == 0:
663 if n == 0:
654 err = True
664 err = True
655
665
656 def run(self):
666 def run(self):
657
667
658 log.success('\nStarting Project {} [id={}]'.format(self.name, self.id), tag='')
668 log.success('\nStarting Project {} [id={}]'.format(self.name, self.id), tag='')
659 self.started = True
669 self.started = True
660 self.start_time = time.time()
670 self.start_time = time.time()
661 self.createObjects()
671 self.createObjects()
672 sig.signal(sig.SIGTERM, handler)
662 self.runProcs()
673 self.runProcs()
663 log.success('{} Done (Time: {:4.2f}s)'.format(
674 log.success('{} Done (Time: {:4.2f}s)'.format(
664 self.name,
675 self.name,
665 time.time()-self.start_time), '')
676 time.time()-self.start_time), '')
General Comments 0
You need to be logged in to leave comments. Login now