##// END OF EJS Templates
update controller change class Project update createObjects
Alexander Valdez -
r1670:1003ac744c06
parent child
Show More
@@ -1,659 +1,661
1 # Copyright (c) 2012-2020 Jicamarca Radio Observatory
1 # Copyright (c) 2012-2020 Jicamarca Radio Observatory
2 # All rights reserved.
2 # All rights reserved.
3 #
3 #
4 # Distributed under the terms of the BSD 3-clause license.
4 # Distributed under the terms of the BSD 3-clause license.
5 """API to create signal chain projects
5 """API to create signal chain projects
6
6
7 The API is provide through class: Project
7 The API is provide through class: Project
8 """
8 """
9
9
10 import re
10 import re
11 import sys
11 import sys
12 import ast
12 import ast
13 import datetime
13 import datetime
14 import traceback
14 import traceback
15 import time
15 import time
16 import multiprocessing
16 import multiprocessing
17 from multiprocessing import Process, Queue
17 from multiprocessing import Process, Queue
18 from threading import Thread
18 from threading import Thread
19 from xml.etree.ElementTree import ElementTree, Element, SubElement
19 from xml.etree.ElementTree import ElementTree, Element, SubElement
20
20
21 from schainpy.admin import Alarm, SchainWarning
21 from schainpy.admin import Alarm, SchainWarning
22 from schainpy.model import *
22 from schainpy.model import *
23 from schainpy.utils import log
23 from schainpy.utils import log
24
24
25 if 'darwin' in sys.platform and sys.version_info[0] == 3 and sys.version_info[1] > 7:
25 if 'darwin' in sys.platform and sys.version_info[0] == 3 and sys.version_info[1] > 7:
26 multiprocessing.set_start_method('fork')
26 multiprocessing.set_start_method('fork')
27
27
28 class ConfBase():
28 class ConfBase():
29
29
30 def __init__(self):
30 def __init__(self):
31
31
32 self.id = '0'
32 self.id = '0'
33 self.name = None
33 self.name = None
34 self.priority = None
34 self.priority = None
35 self.parameters = {}
35 self.parameters = {}
36 self.object = None
36 self.object = None
37 self.operations = []
37 self.operations = []
38
38
39 def getId(self):
39 def getId(self):
40
40
41 return self.id
41 return self.id
42
42
43 def getNewId(self):
43 def getNewId(self):
44
44
45 return int(self.id) * 10 + len(self.operations) + 1
45 return int(self.id) * 10 + len(self.operations) + 1
46
46
47 def updateId(self, new_id):
47 def updateId(self, new_id):
48
48
49 self.id = str(new_id)
49 self.id = str(new_id)
50
50
51 n = 1
51 n = 1
52 for conf in self.operations:
52 for conf in self.operations:
53 conf_id = str(int(new_id) * 10 + n)
53 conf_id = str(int(new_id) * 10 + n)
54 conf.updateId(conf_id)
54 conf.updateId(conf_id)
55 n += 1
55 n += 1
56
56
57 def getKwargs(self):
57 def getKwargs(self):
58
58
59 params = {}
59 params = {}
60
60
61 for key, value in self.parameters.items():
61 for key, value in self.parameters.items():
62 if value not in (None, '', ' '):
62 if value not in (None, '', ' '):
63 params[key] = value
63 params[key] = value
64
64
65 return params
65 return params
66
66
67 def update(self, **kwargs):
67 def update(self, **kwargs):
68
68
69 for key, value in kwargs.items():
69 for key, value in kwargs.items():
70 self.addParameter(name=key, value=value)
70 self.addParameter(name=key, value=value)
71
71
72 def addParameter(self, name, value, format=None):
72 def addParameter(self, name, value, format=None):
73 '''
73 '''
74 '''
74 '''
75
75
76 if isinstance(value, str) and re.search(r'(\d+/\d+/\d+)', value):
76 if isinstance(value, str) and re.search(r'(\d+/\d+/\d+)', value):
77 self.parameters[name] = datetime.date(*[int(x) for x in value.split('/')])
77 self.parameters[name] = datetime.date(*[int(x) for x in value.split('/')])
78 elif isinstance(value, str) and re.search(r'(\d+:\d+:\d+)', value):
78 elif isinstance(value, str) and re.search(r'(\d+:\d+:\d+)', value):
79 self.parameters[name] = datetime.time(*[int(x) for x in value.split(':')])
79 self.parameters[name] = datetime.time(*[int(x) for x in value.split(':')])
80 else:
80 else:
81 try:
81 try:
82 self.parameters[name] = ast.literal_eval(value)
82 self.parameters[name] = ast.literal_eval(value)
83 except:
83 except:
84 if isinstance(value, str) and ',' in value:
84 if isinstance(value, str) and ',' in value:
85 self.parameters[name] = value.split(',')
85 self.parameters[name] = value.split(',')
86 else:
86 else:
87 self.parameters[name] = value
87 self.parameters[name] = value
88
88
89 def getParameters(self):
89 def getParameters(self):
90
90
91 params = {}
91 params = {}
92 for key, value in self.parameters.items():
92 for key, value in self.parameters.items():
93 s = type(value).__name__
93 s = type(value).__name__
94 if s == 'date':
94 if s == 'date':
95 params[key] = value.strftime('%Y/%m/%d')
95 params[key] = value.strftime('%Y/%m/%d')
96 elif s == 'time':
96 elif s == 'time':
97 params[key] = value.strftime('%H:%M:%S')
97 params[key] = value.strftime('%H:%M:%S')
98 else:
98 else:
99 params[key] = str(value)
99 params[key] = str(value)
100
100
101 return params
101 return params
102
102
103 def makeXml(self, element):
103 def makeXml(self, element):
104
104
105 xml = SubElement(element, self.ELEMENTNAME)
105 xml = SubElement(element, self.ELEMENTNAME)
106 for label in self.xml_labels:
106 for label in self.xml_labels:
107 xml.set(label, str(getattr(self, label)))
107 xml.set(label, str(getattr(self, label)))
108
108
109 for key, value in self.getParameters().items():
109 for key, value in self.getParameters().items():
110 xml_param = SubElement(xml, 'Parameter')
110 xml_param = SubElement(xml, 'Parameter')
111 xml_param.set('name', key)
111 xml_param.set('name', key)
112 xml_param.set('value', value)
112 xml_param.set('value', value)
113
113
114 for conf in self.operations:
114 for conf in self.operations:
115 conf.makeXml(xml)
115 conf.makeXml(xml)
116
116
117 def __str__(self):
117 def __str__(self):
118
118
119 if self.ELEMENTNAME == 'Operation':
119 if self.ELEMENTNAME == 'Operation':
120 s = ' {}[id={}]\n'.format(self.name, self.id)
120 s = ' {}[id={}]\n'.format(self.name, self.id)
121 else:
121 else:
122 s = '{}[id={}, inputId={}]\n'.format(self.name, self.id, self.inputId)
122 s = '{}[id={}, inputId={}]\n'.format(self.name, self.id, self.inputId)
123
123
124 for key, value in self.parameters.items():
124 for key, value in self.parameters.items():
125 if self.ELEMENTNAME == 'Operation':
125 if self.ELEMENTNAME == 'Operation':
126 s += ' {}: {}\n'.format(key, value)
126 s += ' {}: {}\n'.format(key, value)
127 else:
127 else:
128 s += ' {}: {}\n'.format(key, value)
128 s += ' {}: {}\n'.format(key, value)
129
129
130 for conf in self.operations:
130 for conf in self.operations:
131 s += str(conf)
131 s += str(conf)
132
132
133 return s
133 return s
134
134
135 class OperationConf(ConfBase):
135 class OperationConf(ConfBase):
136
136
137 ELEMENTNAME = 'Operation'
137 ELEMENTNAME = 'Operation'
138 xml_labels = ['id', 'name']
138 xml_labels = ['id', 'name']
139
139
140 def setup(self, id, name, priority, project_id, err_queue):
140 def setup(self, id, name, priority, project_id, err_queue):
141
141
142 self.id = str(id)
142 self.id = str(id)
143 self.project_id = project_id
143 self.project_id = project_id
144 self.name = name
144 self.name = name
145 self.type = 'other'
145 self.type = 'other'
146 self.err_queue = err_queue
146 self.err_queue = err_queue
147
147
148 def readXml(self, element, project_id, err_queue):
148 def readXml(self, element, project_id, err_queue):
149
149
150 self.id = element.get('id')
150 self.id = element.get('id')
151 self.name = element.get('name')
151 self.name = element.get('name')
152 self.type = 'other'
152 self.type = 'other'
153 self.project_id = str(project_id)
153 self.project_id = str(project_id)
154 self.err_queue = err_queue
154 self.err_queue = err_queue
155
155
156 for elm in element.iter('Parameter'):
156 for elm in element.iter('Parameter'):
157 self.addParameter(elm.get('name'), elm.get('value'))
157 self.addParameter(elm.get('name'), elm.get('value'))
158
158
159 def createObject(self):
159 def createObject(self):
160
160
161 className = eval(self.name)
161 className = eval(self.name)
162
162
163 if 'Plot' in self.name or 'Writer' in self.name or 'Send' in self.name or 'print' in self.name:
163 if 'Plot' in self.name or 'Writer' in self.name or 'Send' in self.name or 'print' in self.name:
164 kwargs = self.getKwargs()
164 kwargs = self.getKwargs()
165 opObj = className(self.id, self.id, self.project_id, self.err_queue, **kwargs)
165 opObj = className(self.id, self.id, self.project_id, self.err_queue, **kwargs)
166 opObj.start()
166 opObj.start()
167 self.type = 'external'
167 self.type = 'external'
168 else:
168 else:
169 opObj = className()
169 opObj = className()
170
170
171 self.object = opObj
171 self.object = opObj
172 return opObj
172 return opObj
173
173
174 class ProcUnitConf(ConfBase):
174 class ProcUnitConf(ConfBase):
175
175
176 ELEMENTNAME = 'ProcUnit'
176 ELEMENTNAME = 'ProcUnit'
177 xml_labels = ['id', 'inputId', 'name']
177 xml_labels = ['id', 'inputId', 'name']
178
178
179 def setup(self, project_id, id, name, datatype, inputId, err_queue):
179 def setup(self, project_id, id, name, datatype, inputId, err_queue):
180 '''
180 '''
181 '''
181 '''
182
182
183 if datatype == None and name == None:
183 if datatype == None and name == None:
184 raise ValueError('datatype or name should be defined')
184 raise ValueError('datatype or name should be defined')
185
185
186 if name == None:
186 if name == None:
187 if 'Proc' in datatype:
187 if 'Proc' in datatype:
188 name = datatype
188 name = datatype
189 else:
189 else:
190 name = '%sProc' % (datatype)
190 name = '%sProc' % (datatype)
191
191
192 if datatype == None:
192 if datatype == None:
193 datatype = name.replace('Proc', '')
193 datatype = name.replace('Proc', '')
194
194
195 self.id = str(id)
195 self.id = str(id)
196 self.project_id = project_id
196 self.project_id = project_id
197 self.name = name
197 self.name = name
198 self.datatype = datatype
198 self.datatype = datatype
199 self.inputId = inputId
199 self.inputId = inputId
200 self.err_queue = err_queue
200 self.err_queue = err_queue
201 self.operations = []
201 self.operations = []
202 self.parameters = {}
202 self.parameters = {}
203
203
204 def removeOperation(self, id):
204 def removeOperation(self, id):
205
205
206 i = [1 if x.id==id else 0 for x in self.operations]
206 i = [1 if x.id==id else 0 for x in self.operations]
207 self.operations.pop(i.index(1))
207 self.operations.pop(i.index(1))
208
208
209 def getOperation(self, id):
209 def getOperation(self, id):
210
210
211 for conf in self.operations:
211 for conf in self.operations:
212 if conf.id == id:
212 if conf.id == id:
213 return conf
213 return conf
214
214
215 def addOperation(self, name, optype='self'):
215 def addOperation(self, name, optype='self'):
216 '''
216 '''
217 '''
217 '''
218
218
219 id = self.getNewId()
219 id = self.getNewId()
220 conf = OperationConf()
220 conf = OperationConf()
221 conf.setup(id, name=name, priority='0', project_id=self.project_id, err_queue=self.err_queue)
221 conf.setup(id, name=name, priority='0', project_id=self.project_id, err_queue=self.err_queue)
222 self.operations.append(conf)
222 self.operations.append(conf)
223
223
224 return conf
224 return conf
225
225
226 def readXml(self, element, project_id, err_queue):
226 def readXml(self, element, project_id, err_queue):
227
227
228 self.id = element.get('id')
228 self.id = element.get('id')
229 self.name = element.get('name')
229 self.name = element.get('name')
230 self.inputId = None if element.get('inputId') == 'None' else element.get('inputId')
230 self.inputId = None if element.get('inputId') == 'None' else element.get('inputId')
231 self.datatype = element.get('datatype', self.name.replace(self.ELEMENTNAME.replace('Unit', ''), ''))
231 self.datatype = element.get('datatype', self.name.replace(self.ELEMENTNAME.replace('Unit', ''), ''))
232 self.project_id = str(project_id)
232 self.project_id = str(project_id)
233 self.err_queue = err_queue
233 self.err_queue = err_queue
234 self.operations = []
234 self.operations = []
235 self.parameters = {}
235 self.parameters = {}
236
236
237 for elm in element:
237 for elm in element:
238 if elm.tag == 'Parameter':
238 if elm.tag == 'Parameter':
239 self.addParameter(elm.get('name'), elm.get('value'))
239 self.addParameter(elm.get('name'), elm.get('value'))
240 elif elm.tag == 'Operation':
240 elif elm.tag == 'Operation':
241 conf = OperationConf()
241 conf = OperationConf()
242 conf.readXml(elm, project_id, err_queue)
242 conf.readXml(elm, project_id, err_queue)
243 self.operations.append(conf)
243 self.operations.append(conf)
244
244
245 def createObjects(self):
245 def createObjects(self):
246 '''
246 '''
247 Instancia de unidades de procesamiento.
247 Instancia de unidades de procesamiento.
248 '''
248 '''
249
249
250 className = eval(self.name)
250 className = eval(self.name)
251 kwargs = self.getKwargs()
251 kwargs = self.getKwargs()
252 procUnitObj = className()
252 procUnitObj = className()
253 procUnitObj.name = self.name
253 procUnitObj.name = self.name
254 log.success('creating process...', self.name)
254 log.success('creating process...', self.name)
255
255
256 for conf in self.operations:
256 for conf in self.operations:
257
257
258 opObj = conf.createObject()
258 opObj = conf.createObject()
259
259
260 log.success('adding operation: {}, type:{}'.format(
260 log.success('adding operation: {}, type:{}'.format(
261 conf.name,
261 conf.name,
262 conf.type), self.name)
262 conf.type), self.name)
263
263
264 procUnitObj.addOperation(conf, opObj)
264 procUnitObj.addOperation(conf, opObj)
265
265
266 self.object = procUnitObj
266 self.object = procUnitObj
267
267
268 def run(self):
268 def run(self):
269 '''
269 '''
270 '''
270 '''
271
271
272 return self.object.call(**self.getKwargs())
272 return self.object.call(**self.getKwargs())
273
273
274
274
275 class ReadUnitConf(ProcUnitConf):
275 class ReadUnitConf(ProcUnitConf):
276
276
277 ELEMENTNAME = 'ReadUnit'
277 ELEMENTNAME = 'ReadUnit'
278
278
279 def __init__(self):
279 def __init__(self):
280
280
281 self.id = None
281 self.id = None
282 self.datatype = None
282 self.datatype = None
283 self.name = None
283 self.name = None
284 self.inputId = None
284 self.inputId = None
285 self.operations = []
285 self.operations = []
286 self.parameters = {}
286 self.parameters = {}
287
287
288 def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='',
288 def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='',
289 startTime='', endTime='', server=None, **kwargs):
289 startTime='', endTime='', server=None, **kwargs):
290
290
291 if datatype == None and name == None:
291 if datatype == None and name == None:
292 raise ValueError('datatype or name should be defined')
292 raise ValueError('datatype or name should be defined')
293 if name == None:
293 if name == None:
294 if 'Reader' in datatype:
294 if 'Reader' in datatype:
295 name = datatype
295 name = datatype
296 datatype = name.replace('Reader','')
296 datatype = name.replace('Reader','')
297 else:
297 else:
298 name = '{}Reader'.format(datatype)
298 name = '{}Reader'.format(datatype)
299 if datatype == None:
299 if datatype == None:
300 if 'Reader' in name:
300 if 'Reader' in name:
301 datatype = name.replace('Reader','')
301 datatype = name.replace('Reader','')
302 else:
302 else:
303 datatype = name
303 datatype = name
304 name = '{}Reader'.format(name)
304 name = '{}Reader'.format(name)
305
305
306 self.id = id
306 self.id = id
307 self.project_id = project_id
307 self.project_id = project_id
308 self.name = name
308 self.name = name
309 self.datatype = datatype
309 self.datatype = datatype
310 self.err_queue = err_queue
310 self.err_queue = err_queue
311
311
312 self.addParameter(name='path', value=path)
312 self.addParameter(name='path', value=path)
313 self.addParameter(name='startDate', value=startDate)
313 self.addParameter(name='startDate', value=startDate)
314 self.addParameter(name='endDate', value=endDate)
314 self.addParameter(name='endDate', value=endDate)
315 self.addParameter(name='startTime', value=startTime)
315 self.addParameter(name='startTime', value=startTime)
316 self.addParameter(name='endTime', value=endTime)
316 self.addParameter(name='endTime', value=endTime)
317
317
318 for key, value in kwargs.items():
318 for key, value in kwargs.items():
319 self.addParameter(name=key, value=value)
319 self.addParameter(name=key, value=value)
320
320
321
321
322 class Project(Process):
322 class Project(Process):
323 """API to create signal chain projects"""
323 """API to create signal chain projects"""
324
324
325 ELEMENTNAME = 'Project'
325 ELEMENTNAME = 'Project'
326
326
327 def __init__(self, name=''):
327 def __init__(self, name=''):
328
328
329 Process.__init__(self)
329 Process.__init__(self)
330 self.id = '1'
330 self.id = '1'
331 if name:
331 if name:
332 self.name = '{} ({})'.format(Process.__name__, name)
332 self.name = '{} ({})'.format(Process.__name__, name)
333 self.filename = None
333 self.filename = None
334 self.description = None
334 self.description = None
335 self.email = None
335 self.email = None
336 self.alarm = []
336 self.alarm = []
337 self.configurations = {}
337 self.configurations = {}
338 # self.err_queue = Queue()
338 # self.err_queue = Queue()
339 self.err_queue = None
339 self.err_queue = None
340 self.started = False
340 self.started = False
341
341
342 def getNewId(self):
342 def getNewId(self):
343
343
344 idList = list(self.configurations.keys())
344 idList = list(self.configurations.keys())
345 id = int(self.id) * 10
345 id = int(self.id) * 10
346
346
347 while True:
347 while True:
348 id += 1
348 id += 1
349
349
350 if str(id) in idList:
350 if str(id) in idList:
351 continue
351 continue
352
352
353 break
353 break
354
354
355 return str(id)
355 return str(id)
356
356
357 def updateId(self, new_id):
357 def updateId(self, new_id):
358
358
359 self.id = str(new_id)
359 self.id = str(new_id)
360
360
361 keyList = list(self.configurations.keys())
361 keyList = list(self.configurations.keys())
362 keyList.sort()
362 keyList.sort()
363
363
364 n = 1
364 n = 1
365 new_confs = {}
365 new_confs = {}
366
366
367 for procKey in keyList:
367 for procKey in keyList:
368
368
369 conf = self.configurations[procKey]
369 conf = self.configurations[procKey]
370 idProcUnit = str(int(self.id) * 10 + n)
370 idProcUnit = str(int(self.id) * 10 + n)
371 conf.updateId(idProcUnit)
371 conf.updateId(idProcUnit)
372 new_confs[idProcUnit] = conf
372 new_confs[idProcUnit] = conf
373 n += 1
373 n += 1
374
374
375 self.configurations = new_confs
375 self.configurations = new_confs
376
376
377 def setup(self, id=1, name='', description='', email=None, alarm=[]):
377 def setup(self, id=1, name='', description='', email=None, alarm=[]):
378
378
379 self.id = str(id)
379 self.id = str(id)
380 self.description = description
380 self.description = description
381 self.email = email
381 self.email = email
382 self.alarm = alarm
382 self.alarm = alarm
383 if name:
383 if name:
384 self.name = '{} ({})'.format(Process.__name__, name)
384 self.name = '{} ({})'.format(Process.__name__, name)
385
385
386 def update(self, **kwargs):
386 def update(self, **kwargs):
387
387
388 for key, value in kwargs.items():
388 for key, value in kwargs.items():
389 setattr(self, key, value)
389 setattr(self, key, value)
390
390
391 def clone(self):
391 def clone(self):
392
392
393 p = Project()
393 p = Project()
394 p.id = self.id
394 p.id = self.id
395 p.name = self.name
395 p.name = self.name
396 p.description = self.description
396 p.description = self.description
397 p.configurations = self.configurations.copy()
397 p.configurations = self.configurations.copy()
398
398
399 return p
399 return p
400
400
401 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
401 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
402
402
403 '''
403 '''
404 '''
404 '''
405
405
406 if id is None:
406 if id is None:
407 idReadUnit = self.getNewId()
407 idReadUnit = self.getNewId()
408 else:
408 else:
409 idReadUnit = str(id)
409 idReadUnit = str(id)
410
410
411 conf = ReadUnitConf()
411 conf = ReadUnitConf()
412 conf.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs)
412 conf.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs)
413 self.configurations[conf.id] = conf
413 self.configurations[conf.id] = conf
414
414
415 return conf
415 return conf
416
416
417 def addProcUnit(self, id=None, inputId='0', datatype=None, name=None):
417 def addProcUnit(self, id=None, inputId='0', datatype=None, name=None):
418
418
419 '''
419 '''
420 '''
420 '''
421
421
422 if id is None:
422 if id is None:
423 idProcUnit = self.getNewId()
423 idProcUnit = self.getNewId()
424 else:
424 else:
425 idProcUnit = id
425 idProcUnit = id
426
426
427 conf = ProcUnitConf()
427 conf = ProcUnitConf()
428 conf.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue)
428 conf.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue)
429 self.configurations[conf.id] = conf
429 self.configurations[conf.id] = conf
430
430
431 return conf
431 return conf
432
432
433 def removeProcUnit(self, id):
433 def removeProcUnit(self, id):
434
434
435 if id in self.configurations:
435 if id in self.configurations:
436 self.configurations.pop(id)
436 self.configurations.pop(id)
437
437
438 def getReadUnit(self):
438 def getReadUnit(self):
439
439
440 for obj in list(self.configurations.values()):
440 for obj in list(self.configurations.values()):
441 if obj.ELEMENTNAME == 'ReadUnit':
441 if obj.ELEMENTNAME == 'ReadUnit':
442 return obj
442 return obj
443
443
444 return None
444 return None
445
445
446 def getProcUnit(self, id):
446 def getProcUnit(self, id):
447
447
448 return self.configurations[id]
448 return self.configurations[id]
449
449
450 def getUnits(self):
450 def getUnits(self):
451
451
452 keys = list(self.configurations)
452 keys = list(self.configurations)
453 keys.sort()
453 keys.sort()
454
454
455 for key in keys:
455 for key in keys:
456 yield self.configurations[key]
456 yield self.configurations[key]
457
457
458 def updateUnit(self, id, **kwargs):
458 def updateUnit(self, id, **kwargs):
459
459
460 conf = self.configurations[id].update(**kwargs)
460 conf = self.configurations[id].update(**kwargs)
461
461
462 def makeXml(self):
462 def makeXml(self):
463
463
464 xml = Element('Project')
464 xml = Element('Project')
465 xml.set('id', str(self.id))
465 xml.set('id', str(self.id))
466 xml.set('name', self.name)
466 xml.set('name', self.name)
467 xml.set('description', self.description)
467 xml.set('description', self.description)
468
468
469 for conf in self.configurations.values():
469 for conf in self.configurations.values():
470 conf.makeXml(xml)
470 conf.makeXml(xml)
471
471
472 self.xml = xml
472 self.xml = xml
473
473
474 def writeXml(self, filename=None):
474 def writeXml(self, filename=None):
475
475
476 if filename == None:
476 if filename == None:
477 if self.filename:
477 if self.filename:
478 filename = self.filename
478 filename = self.filename
479 else:
479 else:
480 filename = 'schain.xml'
480 filename = 'schain.xml'
481
481
482 if not filename:
482 if not filename:
483 print('filename has not been defined. Use setFilename(filename) for do it.')
483 print('filename has not been defined. Use setFilename(filename) for do it.')
484 return 0
484 return 0
485
485
486 abs_file = os.path.abspath(filename)
486 abs_file = os.path.abspath(filename)
487
487
488 if not os.access(os.path.dirname(abs_file), os.W_OK):
488 if not os.access(os.path.dirname(abs_file), os.W_OK):
489 print('No write permission on %s' % os.path.dirname(abs_file))
489 print('No write permission on %s' % os.path.dirname(abs_file))
490 return 0
490 return 0
491
491
492 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
492 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
493 print('File %s already exists and it could not be overwriten' % abs_file)
493 print('File %s already exists and it could not be overwriten' % abs_file)
494 return 0
494 return 0
495
495
496 self.makeXml()
496 self.makeXml()
497
497
498 ElementTree(self.xml).write(abs_file, method='xml')
498 ElementTree(self.xml).write(abs_file, method='xml')
499
499
500 self.filename = abs_file
500 self.filename = abs_file
501
501
502 return 1
502 return 1
503
503
504 def readXml(self, filename):
504 def readXml(self, filename):
505
505
506 abs_file = os.path.abspath(filename)
506 abs_file = os.path.abspath(filename)
507
507
508 self.configurations = {}
508 self.configurations = {}
509
509
510 try:
510 try:
511 self.xml = ElementTree().parse(abs_file)
511 self.xml = ElementTree().parse(abs_file)
512 except:
512 except:
513 log.error('Error reading %s, verify file format' % filename)
513 log.error('Error reading %s, verify file format' % filename)
514 return 0
514 return 0
515
515
516 self.id = self.xml.get('id')
516 self.id = self.xml.get('id')
517 self.name = self.xml.get('name')
517 self.name = self.xml.get('name')
518 self.description = self.xml.get('description')
518 self.description = self.xml.get('description')
519
519
520 for element in self.xml:
520 for element in self.xml:
521 if element.tag == 'ReadUnit':
521 if element.tag == 'ReadUnit':
522 conf = ReadUnitConf()
522 conf = ReadUnitConf()
523 conf.readXml(element, self.id, self.err_queue)
523 conf.readXml(element, self.id, self.err_queue)
524 self.configurations[conf.id] = conf
524 self.configurations[conf.id] = conf
525 elif element.tag == 'ProcUnit':
525 elif element.tag == 'ProcUnit':
526 conf = ProcUnitConf()
526 conf = ProcUnitConf()
527 input_proc = self.configurations[element.get('inputId')]
527 input_proc = self.configurations[element.get('inputId')]
528 conf.readXml(element, self.id, self.err_queue)
528 conf.readXml(element, self.id, self.err_queue)
529 self.configurations[conf.id] = conf
529 self.configurations[conf.id] = conf
530
530
531 self.filename = abs_file
531 self.filename = abs_file
532
532
533 return 1
533 return 1
534
534
535 def __str__(self):
535 def __str__(self):
536
536
537 text = '\nProject[id=%s, name=%s, description=%s]\n\n' % (
537 text = '\nProject[id=%s, name=%s, description=%s]\n\n' % (
538 self.id,
538 self.id,
539 self.name,
539 self.name,
540 self.description,
540 self.description,
541 )
541 )
542
542
543 for conf in self.configurations.values():
543 for conf in self.configurations.values():
544 text += '{}'.format(conf)
544 text += '{}'.format(conf)
545
545
546 return text
546 return text
547
547
548 def createObjects(self):
548 def createObjects(self):
549
549
550 keys = list(self.configurations.keys())
550 keys = list(self.configurations.keys())
551 keys.sort()
551 keys.sort()
552 for key in keys:
552 for key in keys:
553 conf = self.configurations[key]
553 conf = self.configurations[key]
554 conf.createObjects()
554 conf.createObjects()
555 if conf.inputId is not None:
555 if conf.inputId is not None:
556 conf.object.setInput(self.configurations[conf.inputId].object)
556 if isinstance(conf.inputId, list):
557 conf.object.setInput([self.configurations[x].object for x in conf.inputId])
558 else:
559 conf.object.setInput([self.configurations[conf.inputId].object])
557
560
558 def monitor(self):
561 def monitor(self):
559
562
560 t = Thread(target=self._monitor, args=(self.err_queue, self.ctx))
563 t = Thread(target=self._monitor, args=(self.err_queue, self.ctx))
561 t.start()
564 t.start()
562
565
563 def _monitor(self, queue, ctx):
566 def _monitor(self, queue, ctx):
564
567
565 import socket
568 import socket
566
569
567 procs = 0
570 procs = 0
568 err_msg = ''
571 err_msg = ''
569
572
570 while True:
573 while True:
571 msg = queue.get()
574 msg = queue.get()
572 if '#_start_#' in msg:
575 if '#_start_#' in msg:
573 procs += 1
576 procs += 1
574 elif '#_end_#' in msg:
577 elif '#_end_#' in msg:
575 procs -=1
578 procs -=1
576 else:
579 else:
577 err_msg = msg
580 err_msg = msg
578
581
579 if procs == 0 or 'Traceback' in err_msg:
582 if procs == 0 or 'Traceback' in err_msg:
580 break
583 break
581 time.sleep(0.1)
584 time.sleep(0.1)
582
585
583 if '|' in err_msg:
586 if '|' in err_msg:
584 name, err = err_msg.split('|')
587 name, err = err_msg.split('|')
585 if 'SchainWarning' in err:
588 if 'SchainWarning' in err:
586 log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), name)
589 log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), name)
587 elif 'SchainError' in err:
590 elif 'SchainError' in err:
588 log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), name)
591 log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), name)
589 else:
592 else:
590 log.error(err, name)
593 log.error(err, name)
591 else:
594 else:
592 name, err = self.name, err_msg
595 name, err = self.name, err_msg
593
596
594 time.sleep(1)
597 time.sleep(1)
595
598
596 ctx.term()
599 ctx.term()
597
600
598 message = ''.join(err)
601 message = ''.join(err)
599
602
600 if err_msg:
603 if err_msg:
601 subject = 'SChain v%s: Error running %s\n' % (
604 subject = 'SChain v%s: Error running %s\n' % (
602 schainpy.__version__, self.name)
605 schainpy.__version__, self.name)
603
606
604 subtitle = 'Hostname: %s\n' % socket.gethostbyname(
607 subtitle = 'Hostname: %s\n' % socket.gethostbyname(
605 socket.gethostname())
608 socket.gethostname())
606 subtitle += 'Working directory: %s\n' % os.path.abspath('./')
609 subtitle += 'Working directory: %s\n' % os.path.abspath('./')
607 subtitle += 'Configuration file: %s\n' % self.filename
610 subtitle += 'Configuration file: %s\n' % self.filename
608 subtitle += 'Time: %s\n' % str(datetime.datetime.now())
611 subtitle += 'Time: %s\n' % str(datetime.datetime.now())
609
612
610 readUnitConfObj = self.getReadUnit()
613 readUnitConfObj = self.getReadUnit()
611 if readUnitConfObj:
614 if readUnitConfObj:
612 subtitle += '\nInput parameters:\n'
615 subtitle += '\nInput parameters:\n'
613 subtitle += '[Data path = %s]\n' % readUnitConfObj.parameters['path']
616 subtitle += '[Data path = %s]\n' % readUnitConfObj.parameters['path']
614 subtitle += '[Start date = %s]\n' % readUnitConfObj.parameters['startDate']
617 subtitle += '[Start date = %s]\n' % readUnitConfObj.parameters['startDate']
615 subtitle += '[End date = %s]\n' % readUnitConfObj.parameters['endDate']
618 subtitle += '[End date = %s]\n' % readUnitConfObj.parameters['endDate']
616 subtitle += '[Start time = %s]\n' % readUnitConfObj.parameters['startTime']
619 subtitle += '[Start time = %s]\n' % readUnitConfObj.parameters['startTime']
617 subtitle += '[End time = %s]\n' % readUnitConfObj.parameters['endTime']
620 subtitle += '[End time = %s]\n' % readUnitConfObj.parameters['endTime']
618
621
619 a = Alarm(
622 a = Alarm(
620 modes=self.alarm,
623 modes=self.alarm,
621 email=self.email,
624 email=self.email,
622 message=message,
625 message=message,
623 subject=subject,
626 subject=subject,
624 subtitle=subtitle,
627 subtitle=subtitle,
625 filename=self.filename
628 filename=self.filename
626 )
629 )
627
630
628 a.start()
631 a.start()
629
632
630 def setFilename(self, filename):
633 def setFilename(self, filename):
631
634
632 self.filename = filename
635 self.filename = filename
633
636
634 def runProcs(self):
637 def runProcs(self):
635
638
636 err = False
639 err = False
637 n = len(self.configurations)
640 n = len(self.configurations)
638
639 while not err:
641 while not err:
640 for conf in self.getUnits():
642 for conf in self.getUnits():
641 ok = conf.run()
643 ok = conf.run()
642 if ok == 'Error':
644 if ok == 'Error':
643 n -= 1
645 n -= 1
644 continue
646 continue
645 elif not ok:
647 elif not ok:
646 break
648 break
647 if n == 0:
649 if n == 0:
648 err = True
650 err = True
649
651
650 def run(self):
652 def run(self):
651
653
652 log.success('\nStarting Project {} [id={}]'.format(self.name, self.id), tag='')
654 log.success('\nStarting Project {} [id={}]'.format(self.name, self.id), tag='')
653 self.started = True
655 self.started = True
654 self.start_time = time.time()
656 self.start_time = time.time()
655 self.createObjects()
657 self.createObjects()
656 self.runProcs()
658 self.runProcs()
657 log.success('{} Done (Time: {:4.2f}s)'.format(
659 log.success('{} Done (Time: {:4.2f}s)'.format(
658 self.name,
660 self.name,
659 time.time()-self.start_time), '')
661 time.time()-self.start_time), '') No newline at end of file
General Comments 0
You need to be logged in to leave comments. Login now