##// END OF EJS Templates
Added ToLilBlock class from Roberto
Christianpl -
r1789:2739006ee497 isr_v2
parent child
Show More

The requested changes are too big and content was truncated. Show full diff

@@ -1,677 +1,705
1 # Copyright (c) 2012-2020 Jicamarca Radio Observatory
1 # Copyright (c) 2012-2020 Jicamarca Radio Observatory
2 # All rights reserved.
2 # All rights reserved.
3 #
3 #
4 # Distributed under the terms of the BSD 3-clause license.
4 # Distributed under the terms of the BSD 3-clause license.
5 """API to create signal chain projects
5 """API to create signal chain projects
6
6
7 The API is provide through class: Project
7 The API is provide through class: Project
8 """
8 """
9
9
10 import re
10 import re
11 import sys
11 import sys
12 import ast
12 import ast
13 import datetime
13 import datetime
14 import traceback
14 import traceback
15 import time
15 import time
16 import multiprocessing
16 import multiprocessing
17 from multiprocessing import Process, Queue
17 from multiprocessing import Process, Queue
18 from threading import Thread
18 from threading import Thread
19 from xml.etree.ElementTree import ElementTree, Element, SubElement
19 from xml.etree.ElementTree import ElementTree, Element, SubElement
20
20
21 from schainpy.admin import Alarm, SchainWarning
21 from schainpy.admin import Alarm, SchainWarning
22 from schainpy.model import *
22 from schainpy.model import *
23 from schainpy.utils import log
23 from schainpy.utils import log
24
24
25 if 'darwin' in sys.platform and sys.version_info[0] == 3 and sys.version_info[1] > 7:
25 if 'darwin' in sys.platform and sys.version_info[0] == 3 and sys.version_info[1] > 7:
26 multiprocessing.set_start_method('fork')
26 multiprocessing.set_start_method('fork')
27
27
28 class ConfBase():
28 class ConfBase():
29
29
30 def __init__(self):
30 def __init__(self):
31
31
32 self.id = '0'
32 self.id = '0'
33 self.name = None
33 self.name = None
34 self.priority = None
34 self.priority = None
35 self.parameters = {}
35 self.parameters = {}
36 self.object = None
36 self.object = None
37 self.operations = []
37 self.operations = []
38
38
39 def getId(self):
39 def getId(self):
40
40
41 return self.id
41 return self.id
42
42
43 def getNewId(self):
43 def getNewId(self):
44
44
45 return int(self.id) * 10 + len(self.operations) + 1
45 return int(self.id) * 10 + len(self.operations) + 1
46
46
47 def updateId(self, new_id):
47 def updateId(self, new_id):
48
48
49 self.id = str(new_id)
49 self.id = str(new_id)
50
50
51 n = 1
51 n = 1
52 for conf in self.operations:
52 for conf in self.operations:
53 conf_id = str(int(new_id) * 10 + n)
53 conf_id = str(int(new_id) * 10 + n)
54 conf.updateId(conf_id)
54 conf.updateId(conf_id)
55 n += 1
55 n += 1
56
56
57 def getKwargs(self):
57 def getKwargs(self):
58
58
59 params = {}
59 params = {}
60
60
61 for key, value in self.parameters.items():
61 for key, value in self.parameters.items():
62 if value not in (None, '', ' '):
62 if value not in (None, '', ' '):
63 params[key] = value
63 params[key] = value
64
64
65 return params
65 return params
66
66
67 def update(self, **kwargs):
67 def update(self, **kwargs):
68
68
69 for key, value in kwargs.items():
69 for key, value in kwargs.items():
70 self.addParameter(name=key, value=value)
70 self.addParameter(name=key, value=value)
71
71
72 def addParameter(self, name, value, format=None):
72 def addParameter(self, name, value, format=None):
73 '''
73 '''
74 '''
74 '''
75
75
76 if format is not None:
76 if format is not None:
77 self.parameters[name] = eval(format)(value)
77 self.parameters[name] = eval(format)(value)
78 elif isinstance(value, str) and re.search(r'(\d+/\d+/\d+)', value):
78 elif isinstance(value, str) and re.search(r'(\d+/\d+/\d+)', value):
79 self.parameters[name] = datetime.date(*[int(x) for x in value.split('/')])
79 self.parameters[name] = datetime.date(*[int(x) for x in value.split('/')])
80 elif isinstance(value, str) and re.search(r'(\d+:\d+:\d+)', value):
80 elif isinstance(value, str) and re.search(r'(\d+:\d+:\d+)', value):
81 self.parameters[name] = datetime.time(*[int(x) for x in value.split(':')])
81 self.parameters[name] = datetime.time(*[int(x) for x in value.split(':')])
82 else:
82 else:
83 try:
83 try:
84 self.parameters[name] = ast.literal_eval(value)
84 self.parameters[name] = ast.literal_eval(value)
85 except:
85 except:
86 if isinstance(value, str) and ',' in value:
86 if isinstance(value, str) and ',' in value:
87 self.parameters[name] = value.split(',')
87 self.parameters[name] = value.split(',')
88 else:
88 else:
89 self.parameters[name] = value
89 self.parameters[name] = value
90
90
91 def getParameters(self):
91 def getParameters(self):
92
92
93 params = {}
93 params = {}
94 for key, value in self.parameters.items():
94 for key, value in self.parameters.items():
95 s = type(value).__name__
95 s = type(value).__name__
96 if s == 'date':
96 if s == 'date':
97 params[key] = value.strftime('%Y/%m/%d')
97 params[key] = value.strftime('%Y/%m/%d')
98 elif s == 'time':
98 elif s == 'time':
99 params[key] = value.strftime('%H:%M:%S')
99 params[key] = value.strftime('%H:%M:%S')
100 else:
100 else:
101 params[key] = str(value)
101 params[key] = str(value)
102
102
103 return params
103 return params
104
104
105 def makeXml(self, element):
105 def makeXml(self, element):
106
106
107 xml = SubElement(element, self.ELEMENTNAME)
107 xml = SubElement(element, self.ELEMENTNAME)
108 for label in self.xml_labels:
108 for label in self.xml_labels:
109 xml.set(label, str(getattr(self, label)))
109 xml.set(label, str(getattr(self, label)))
110
110
111 for key, value in self.getParameters().items():
111 for key, value in self.getParameters().items():
112 xml_param = SubElement(xml, 'Parameter')
112 xml_param = SubElement(xml, 'Parameter')
113 xml_param.set('name', key)
113 xml_param.set('name', key)
114 xml_param.set('value', value)
114 xml_param.set('value', value)
115
115
116 for conf in self.operations:
116 for conf in self.operations:
117 conf.makeXml(xml)
117 conf.makeXml(xml)
118
118
119 def __str__(self):
119 def __str__(self):
120
120
121 if self.ELEMENTNAME == 'Operation':
121 if self.ELEMENTNAME == 'Operation':
122 s = ' {}[id={}]\n'.format(self.name, self.id)
122 s = ' {}[id={}]\n'.format(self.name, self.id)
123 else:
123 else:
124 s = '{}[id={}, inputId={}]\n'.format(self.name, self.id, self.inputId)
124 s = '{}[id={}, inputId={}]\n'.format(self.name, self.id, self.inputId)
125
125
126 for key, value in self.parameters.items():
126 for key, value in self.parameters.items():
127 if self.ELEMENTNAME == 'Operation':
127 if self.ELEMENTNAME == 'Operation':
128 s += ' {}: {}\n'.format(key, value)
128 s += ' {}: {}\n'.format(key, value)
129 else:
129 else:
130 s += ' {}: {}\n'.format(key, value)
130 s += ' {}: {}\n'.format(key, value)
131
131
132 for conf in self.operations:
132 for conf in self.operations:
133 s += str(conf)
133 s += str(conf)
134
134
135 return s
135 return s
136
136
137 class OperationConf(ConfBase):
137 class OperationConf(ConfBase):
138
138
139 ELEMENTNAME = 'Operation'
139 ELEMENTNAME = 'Operation'
140 xml_labels = ['id', 'name']
140 xml_labels = ['id', 'name']
141
141
142 def setup(self, id, name, priority, project_id, err_queue):
142 def setup(self, id, name, priority, project_id, err_queue):
143
143
144 self.id = str(id)
144 self.id = str(id)
145 self.project_id = project_id
145 self.project_id = project_id
146 self.name = name
146 self.name = name
147 self.type = 'other'
147 self.type = 'other'
148 self.err_queue = err_queue
148 self.err_queue = err_queue
149
149
150 def readXml(self, element, project_id, err_queue):
150 def readXml(self, element, project_id, err_queue):
151
151
152 self.id = element.get('id')
152 self.id = element.get('id')
153 self.name = element.get('name')
153 self.name = element.get('name')
154 self.type = 'other'
154 self.type = 'other'
155 self.project_id = str(project_id)
155 self.project_id = str(project_id)
156 self.err_queue = err_queue
156 self.err_queue = err_queue
157
157
158 for elm in element.iter('Parameter'):
158 for elm in element.iter('Parameter'):
159 self.addParameter(elm.get('name'), elm.get('value'))
159 self.addParameter(elm.get('name'), elm.get('value'))
160
160
161 def createObject(self):
161 def createObject(self):
162
162
163 className = eval(self.name)
163 className = eval(self.name)
164
164
165 if 'Plot' in self.name or 'Writer' in self.name or 'Send' in self.name or 'print' in self.name:
165 if 'Plot' in self.name or 'Writer' in self.name or 'Send' in self.name or 'print' in self.name:
166 kwargs = self.getKwargs()
166 kwargs = self.getKwargs()
167 opObj = className(self.id, self.id, self.project_id, self.err_queue, **kwargs)
167 opObj = className(self.id, self.id, self.project_id, self.err_queue, **kwargs)
168 opObj.start()
168 opObj.start()
169 self.type = 'external'
169 self.type = 'external'
170 else:
170 else:
171 opObj = className()
171 opObj = className()
172
172
173 self.object = opObj
173 self.object = opObj
174 return opObj
174 return opObj
175
175
176 class ProcUnitConf(ConfBase):
176 class ProcUnitConf(ConfBase):
177
177
178 ELEMENTNAME = 'ProcUnit'
178 ELEMENTNAME = 'ProcUnit'
179 xml_labels = ['id', 'inputId', 'name']
179 xml_labels = ['id', 'inputId', 'name']
180
180
181 def setup(self, project_id, id, name, datatype, inputId, err_queue):
181 def setup(self, project_id, id, name, datatype, inputId, err_queue):
182 '''
182 '''
183 '''
183 '''
184
184
185 if datatype == None and name == None:
185 if datatype == None and name == None:
186 raise ValueError('datatype or name should be defined')
186 raise ValueError('datatype or name should be defined')
187
187
188 if name == None:
188 if name == None:
189 if 'Proc' in datatype:
189 if 'Proc' in datatype:
190 name = datatype
190 name = datatype
191 else:
191 else:
192 name = '%sProc' % (datatype)
192 name = '%sProc' % (datatype)
193
193
194 if datatype == None:
194 if datatype == None:
195 datatype = name.replace('Proc', '')
195 datatype = name.replace('Proc', '')
196
196
197 self.id = str(id)
197 self.id = str(id)
198 self.project_id = project_id
198 self.project_id = project_id
199 self.name = name
199 self.name = name
200 self.datatype = datatype
200 self.datatype = datatype
201 self.inputId = inputId
201 self.inputId = inputId
202 self.err_queue = err_queue
202 self.err_queue = err_queue
203 self.operations = []
203 self.operations = []
204 self.parameters = {}
204 self.parameters = {}
205
205
206 def removeOperation(self, id):
206 def removeOperation(self, id):
207
207
208 i = [1 if x.id == id else 0 for x in self.operations]
208 i = [1 if x.id == id else 0 for x in self.operations]
209 self.operations.pop(i.index(1))
209 self.operations.pop(i.index(1))
210
210
211 def getOperation(self, id):
211 def getOperation(self, id):
212
212
213 for conf in self.operations:
213 for conf in self.operations:
214 if conf.id == id:
214 if conf.id == id:
215 return conf
215 return conf
216
216
217 def addOperation(self, name, optype='self'):
217 def addOperation(self, name, optype='self'):
218 '''
218 '''
219 '''
219 '''
220
220
221 id = self.getNewId()
221 id = self.getNewId()
222 conf = OperationConf()
222 conf = OperationConf()
223 conf.setup(id, name=name, priority='0', project_id=self.project_id, err_queue=self.err_queue)
223 conf.setup(id, name=name, priority='0', project_id=self.project_id, err_queue=self.err_queue)
224 self.operations.append(conf)
224 self.operations.append(conf)
225
225
226 return conf
226 return conf
227
227
228 def readXml(self, element, project_id, err_queue):
228 def readXml(self, element, project_id, err_queue):
229
229
230 self.id = element.get('id')
230 self.id = element.get('id')
231 self.name = element.get('name')
231 self.name = element.get('name')
232 self.inputId = None if element.get('inputId') == 'None' else element.get('inputId')
232 self.inputId = None if element.get('inputId') == 'None' else element.get('inputId')
233 self.datatype = element.get('datatype', self.name.replace(self.ELEMENTNAME.replace('Unit', ''), ''))
233 self.datatype = element.get('datatype', self.name.replace(self.ELEMENTNAME.replace('Unit', ''), ''))
234 self.project_id = str(project_id)
234 self.project_id = str(project_id)
235 self.err_queue = err_queue
235 self.err_queue = err_queue
236 self.operations = []
236 self.operations = []
237 self.parameters = {}
237 self.parameters = {}
238
238
239 for elm in element:
239 for elm in element:
240 if elm.tag == 'Parameter':
240 if elm.tag == 'Parameter':
241 self.addParameter(elm.get('name'), elm.get('value'))
241 self.addParameter(elm.get('name'), elm.get('value'))
242 elif elm.tag == 'Operation':
242 elif elm.tag == 'Operation':
243 conf = OperationConf()
243 conf = OperationConf()
244 conf.readXml(elm, project_id, err_queue)
244 conf.readXml(elm, project_id, err_queue)
245 self.operations.append(conf)
245 self.operations.append(conf)
246
246
247 def createObjects(self):
247 def createObjects(self):
248 '''
248 '''
249 Instancia de unidades de procesamiento.
249 Instancia de unidades de procesamiento.
250 '''
250 '''
251
251
252 className = eval(self.name)
252 className = eval(self.name)
253 kwargs = self.getKwargs()
253 kwargs = self.getKwargs()
254 procUnitObj = className()
254 procUnitObj = className()
255 procUnitObj.name = self.name
255 procUnitObj.name = self.name
256 log.success('creating process...', self.name)
256 log.success('creating process...', self.name)
257
257
258 for conf in self.operations:
258 for conf in self.operations:
259
259
260 opObj = conf.createObject()
260 opObj = conf.createObject()
261
261
262 log.success('adding operation: {}, type:{}'.format(
262 log.success('adding operation: {}, type:{}'.format(
263 conf.name,
263 conf.name,
264 conf.type), self.name)
264 conf.type), self.name)
265
265
266 procUnitObj.addOperation(conf, opObj)
266 procUnitObj.addOperation(conf, opObj)
267
267
268 self.object = procUnitObj
268 self.object = procUnitObj
269
269
270 def run(self):
270 def run(self):
271 '''
271 '''
272 '''
272 '''
273 #self.object.call(**self.getKwargs())
273 #self.object.call(**self.getKwargs())
274
274
275 return self.object.call(**self.getKwargs())
275 return self.object.call(**self.getKwargs())
276
276
277
277
278 class ReadUnitConf(ProcUnitConf):
278 class ReadUnitConf(ProcUnitConf):
279
279
280 ELEMENTNAME = 'ReadUnit'
280 ELEMENTNAME = 'ReadUnit'
281
281
282 def __init__(self):
282 def __init__(self):
283
283
284 self.id = None
284 self.id = None
285 self.datatype = None
285 self.datatype = None
286 self.name = None
286 self.name = None
287 self.inputId = None
287 self.inputId = None
288 self.operations = []
288 self.operations = []
289 self.parameters = {}
289 self.parameters = {}
290
290
291 def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='',
291 def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='',
292 startTime='', endTime='', server=None, topic='', **kwargs):
292 startTime='', endTime='', server=None, topic='', **kwargs):
293
293
294 if datatype == None and name == None:
294 if datatype == None and name == None:
295 raise ValueError('datatype or name should be defined')
295 raise ValueError('datatype or name should be defined')
296 if name == None:
296 if name == None:
297 if 'Reader' in datatype:
297 if 'Reader' in datatype:
298 name = datatype
298 name = datatype
299 datatype = name.replace('Reader', '')
299 datatype = name.replace('Reader', '')
300 else:
300 else:
301 name = '{}Reader'.format(datatype)
301 name = '{}Reader'.format(datatype)
302 if datatype == None:
302 if datatype == None:
303 if 'Reader' in name:
303 if 'Reader' in name:
304 datatype = name.replace('Reader', '')
304 datatype = name.replace('Reader', '')
305 else:
305 else:
306 datatype = name
306 datatype = name
307 name = '{}Reader'.format(name)
307 name = '{}Reader'.format(name)
308
308
309 self.id = id
309 self.id = id
310 self.project_id = project_id
310 self.project_id = project_id
311 self.name = name
311 self.name = name
312 self.datatype = datatype
312 self.datatype = datatype
313 self.err_queue = err_queue
313 self.err_queue = err_queue
314
314
315 self.addParameter(name='path', value=path)
315 self.addParameter(name='path', value=path)
316 self.addParameter(name='startDate', value=startDate)
316 self.addParameter(name='startDate', value=startDate)
317 self.addParameter(name='endDate', value=endDate)
317 self.addParameter(name='endDate', value=endDate)
318 self.addParameter(name='startTime', value=startTime)
318 self.addParameter(name='startTime', value=startTime)
319 self.addParameter(name='endTime', value=endTime)
319 self.addParameter(name='endTime', value=endTime)
320 self.addParameter(name='server', value=server)
320 self.addParameter(name='server', value=server)
321 self.addParameter(name='topic', value=topic)
321 self.addParameter(name='topic', value=topic)
322
322
323 for key, value in kwargs.items():
323 for key, value in kwargs.items():
324 self.addParameter(name=key, value=value)
324 self.addParameter(name=key, value=value)
325
325
326
326
327 class Project(Process):
327 class Project(Process):
328 """API to create signal chain projects"""
328 """API to create signal chain projects"""
329
329
330 ELEMENTNAME = 'Project'
330 ELEMENTNAME = 'Project'
331
331
332 def __init__(self, name=''):
332 def __init__(self, name=''):
333
333
334 Process.__init__(self)
334 Process.__init__(self)
335 self.id = '1'
335 self.id = '1'
336 if name:
336 if name:
337 self.name = '{} ({})'.format(Process.__name__, name)
337 self.name = '{} ({})'.format(Process.__name__, name)
338 self.filename = None
338 self.filename = None
339 self.description = None
339 self.description = None
340 self.email = None
340 self.email = None
341 self.alarm = []
341 self.alarm = []
342 self.configurations = {}
342 self.configurations = {}
343 # self.err_queue = Queue()
343 # self.err_queue = Queue()
344 self.err_queue = None
344 self.err_queue = None
345 self.started = False
345 self.started = False
346
346
347 def getNewId(self):
347 def getNewId(self):
348
348
349 idList = list(self.configurations.keys())
349 idList = list(self.configurations.keys())
350 id = int(self.id) * 10
350 id = int(self.id) * 10
351
351
352 while True:
352 while True:
353 id += 1
353 id += 1
354
354
355 if str(id) in idList:
355 if str(id) in idList:
356 continue
356 continue
357
357
358 break
358 break
359
359
360 return str(id)
360 return str(id)
361
361
362 def updateId(self, new_id):
362 def updateId(self, new_id):
363
363
364 self.id = str(new_id)
364 self.id = str(new_id)
365
365
366 keyList = list(self.configurations.keys())
366 keyList = list(self.configurations.keys())
367 keyList.sort()
367 keyList.sort()
368
368
369 n = 1
369 n = 1
370 new_confs = {}
370 new_confs = {}
371
371
372 for procKey in keyList:
372 for procKey in keyList:
373
373
374 conf = self.configurations[procKey]
374 conf = self.configurations[procKey]
375 idProcUnit = str(int(self.id) * 10 + n)
375 idProcUnit = str(int(self.id) * 10 + n)
376 conf.updateId(idProcUnit)
376 conf.updateId(idProcUnit)
377 new_confs[idProcUnit] = conf
377 new_confs[idProcUnit] = conf
378 n += 1
378 n += 1
379
379
380 self.configurations = new_confs
380 self.configurations = new_confs
381
381
382 def setup(self, id=1, name='', description='', email=None, alarm=[]):
382 def setup(self, id=1, name='', description='', email=None, alarm=[]):
383
383
384 self.id = str(id)
384 self.id = str(id)
385 self.description = description
385 self.description = description
386 self.email = email
386 self.email = email
387 self.alarm = alarm
387 self.alarm = alarm
388 if name:
388 if name:
389 self.name = '{} ({})'.format(Process.__name__, name)
389 self.name = '{} ({})'.format(Process.__name__, name)
390
390
391 def update(self, **kwargs):
391 def update(self, **kwargs):
392
392
393 for key, value in kwargs.items():
393 for key, value in kwargs.items():
394 setattr(self, key, value)
394 setattr(self, key, value)
395
395
396 def clone(self):
396 def clone(self):
397
397
398 p = Project()
398 p = Project()
399 p.id = self.id
399 p.id = self.id
400 p.name = self.name
400 p.name = self.name
401 p.description = self.description
401 p.description = self.description
402 p.configurations = self.configurations.copy()
402 p.configurations = self.configurations.copy()
403
403
404 return p
404 return p
405
405
406 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
406 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
407
407
408 '''
408 '''
409 '''
409 '''
410
410
411 if id is None:
411 if id is None:
412 idReadUnit = self.getNewId()
412 idReadUnit = self.getNewId()
413 else:
413 else:
414 idReadUnit = str(id)
414 idReadUnit = str(id)
415
415
416 conf = ReadUnitConf()
416 conf = ReadUnitConf()
417 conf.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs)
417 conf.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs)
418 self.configurations[conf.id] = conf
418 self.configurations[conf.id] = conf
419
419
420 return conf
420 return conf
421
421
422 def addProcUnit(self, id=None, inputId='0', datatype=None, name=None):
422 def addProcUnit(self, id=None, inputId='0', datatype=None, name=None):
423
423
424 '''
424 '''
425 '''
425 '''
426
426
427 if id is None:
427 if id is None:
428 idProcUnit = self.getNewId()
428 idProcUnit = self.getNewId()
429 else:
429 else:
430 idProcUnit = id
430 idProcUnit = id
431
431
432 conf = ProcUnitConf()
432 conf = ProcUnitConf()
433 conf.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue)
433 conf.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue)
434 self.configurations[conf.id] = conf
434 self.configurations[conf.id] = conf
435
435
436 return conf
436 return conf
437
437
438 def removeProcUnit(self, id):
438 def removeProcUnit(self, id):
439
439
440 if id in self.configurations:
440 if id in self.configurations:
441 self.configurations.pop(id)
441 self.configurations.pop(id)
442
442
443 def getReadUnit(self):
443 def getReadUnit(self):
444
444
445 for obj in list(self.configurations.values()):
445 for obj in list(self.configurations.values()):
446 if obj.ELEMENTNAME == 'ReadUnit':
446 if obj.ELEMENTNAME == 'ReadUnit':
447 return obj
447 return obj
448
448
449 return None
449 return None
450
450
451 def getProcUnit(self, id):
451 def getProcUnit(self, id):
452
452
453 return self.configurations[id]
453 return self.configurations[id]
454
454
455 def getUnits(self):
455 def getUnits(self):
456
456
457 keys = list(self.configurations)
457 keys = list(self.configurations)
458 keys.sort()
458 keys.sort()
459
459
460 for key in keys:
460 for key in keys:
461 yield self.configurations[key]
461 yield self.configurations[key]
462
462
463 def updateUnit(self, id, **kwargs):
463 def updateUnit(self, id, **kwargs):
464
464
465 conf = self.configurations[id].update(**kwargs)
465 conf = self.configurations[id].update(**kwargs)
466
466
467 def makeXml(self):
467 def makeXml(self):
468
468
469 xml = Element('Project')
469 xml = Element('Project')
470 xml.set('id', str(self.id))
470 xml.set('id', str(self.id))
471 xml.set('name', self.name)
471 xml.set('name', self.name)
472 xml.set('description', self.description)
472 xml.set('description', self.description)
473
473
474 for conf in self.configurations.values():
474 for conf in self.configurations.values():
475 conf.makeXml(xml)
475 conf.makeXml(xml)
476
476
477 self.xml = xml
477 self.xml = xml
478
478
479 def writeXml(self, filename=None):
479 def writeXml(self, filename=None):
480
480
481 if filename == None:
481 if filename == None:
482 if self.filename:
482 if self.filename:
483 filename = self.filename
483 filename = self.filename
484 else:
484 else:
485 filename = 'schain.xml'
485 filename = 'schain.xml'
486
486
487 if not filename:
487 if not filename:
488 print('filename has not been defined. Use setFilename(filename) for do it.')
488 print('filename has not been defined. Use setFilename(filename) for do it.')
489 return 0
489 return 0
490
490
491 abs_file = os.path.abspath(filename)
491 abs_file = os.path.abspath(filename)
492
492
493 if not os.access(os.path.dirname(abs_file), os.W_OK):
493 if not os.access(os.path.dirname(abs_file), os.W_OK):
494 print('No write permission on %s' % os.path.dirname(abs_file))
494 print('No write permission on %s' % os.path.dirname(abs_file))
495 return 0
495 return 0
496
496
497 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
497 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
498 print('File %s already exists and it could not be overwriten' % abs_file)
498 print('File %s already exists and it could not be overwriten' % abs_file)
499 return 0
499 return 0
500
500
501 self.makeXml()
501 self.makeXml()
502
502
503 ElementTree(self.xml).write(abs_file, method='xml')
503 ElementTree(self.xml).write(abs_file, method='xml')
504
504
505 self.filename = abs_file
505 self.filename = abs_file
506
506
507 return 1
507 return 1
508
508
509 def readXml(self, filename):
509 def readXml(self, filename):
510
510
511 abs_file = os.path.abspath(filename)
511 abs_file = os.path.abspath(filename)
512
512
513 self.configurations = {}
513 self.configurations = {}
514
514
515 try:
515 try:
516 self.xml = ElementTree().parse(abs_file)
516 self.xml = ElementTree().parse(abs_file)
517 except:
517 except:
518 log.error('Error reading %s, verify file format' % filename)
518 log.error('Error reading %s, verify file format' % filename)
519 return 0
519 return 0
520
520
521 self.id = self.xml.get('id')
521 self.id = self.xml.get('id')
522 self.name = self.xml.get('name')
522 self.name = self.xml.get('name')
523 self.description = self.xml.get('description')
523 self.description = self.xml.get('description')
524
524
525 for element in self.xml:
525 for element in self.xml:
526 if element.tag == 'ReadUnit':
526 if element.tag == 'ReadUnit':
527 conf = ReadUnitConf()
527 conf = ReadUnitConf()
528 conf.readXml(element, self.id, self.err_queue)
528 conf.readXml(element, self.id, self.err_queue)
529 self.configurations[conf.id] = conf
529 self.configurations[conf.id] = conf
530 elif element.tag == 'ProcUnit':
530 elif element.tag == 'ProcUnit':
531 conf = ProcUnitConf()
531 conf = ProcUnitConf()
532 input_proc = self.configurations[element.get('inputId')]
532 input_proc = self.configurations[element.get('inputId')]
533 conf.readXml(element, self.id, self.err_queue)
533 conf.readXml(element, self.id, self.err_queue)
534 self.configurations[conf.id] = conf
534 self.configurations[conf.id] = conf
535
535
536 self.filename = abs_file
536 self.filename = abs_file
537
537
538 return 1
538 return 1
539
539
540 def __str__(self):
540 def __str__(self):
541
541
542 text = '\nProject[id=%s, name=%s, description=%s]\n\n' % (
542 text = '\nProject[id=%s, name=%s, description=%s]\n\n' % (
543 self.id,
543 self.id,
544 self.name,
544 self.name,
545 self.description,
545 self.description,
546 )
546 )
547
547
548 for conf in self.configurations.values():
548 for conf in self.configurations.values():
549 text += '{}'.format(conf)
549 text += '{}'.format(conf)
550
550
551 return text
551 return text
552
552
553 def createObjects(self):
553 def createObjects(self):
554
554
555 keys = list(self.configurations.keys())
555 keys = list(self.configurations.keys())
556 keys.sort()
556 keys.sort()
557 for key in keys:
557 for key in keys:
558 conf = self.configurations[key]
558 conf = self.configurations[key]
559 conf.createObjects()
559 conf.createObjects()
560 if 'Reader' in str(conf):
560 if 'Reader' in str(conf):
561 reader = conf.object
561 reader = conf.object
562 else:
562 else:
563 conf.object.reader = reader
563 conf.object.reader = reader
564 if conf.inputId is not None:
564 if conf.inputId is not None:
565 if isinstance(conf.inputId, list):
565 if isinstance(conf.inputId, list):
566 conf.object.setInput([self.configurations[x].object for x in conf.inputId])
566 conf.object.setInput([self.configurations[x].object for x in conf.inputId])
567 else:
567 else:
568 conf.object.setInput([self.configurations[conf.inputId].object])
568 conf.object.setInput([self.configurations[conf.inputId].object])
569
569
570 def monitor(self):
570 def monitor(self):
571
571
572 t = Thread(target=self._monitor, args=(self.err_queue, self.ctx))
572 t = Thread(target=self._monitor, args=(self.err_queue, self.ctx))
573 t.start()
573 t.start()
574
574
575 def _monitor(self, queue, ctx):
575 def _monitor(self, queue, ctx):
576
576
577 import socket
577 import socket
578
578
579 procs = 0
579 procs = 0
580 err_msg = ''
580 err_msg = ''
581
581
582 while True:
582 while True:
583 msg = queue.get()
583 msg = queue.get()
584 if '#_start_#' in msg:
584 if '#_start_#' in msg:
585 procs += 1
585 procs += 1
586 elif '#_end_#' in msg:
586 elif '#_end_#' in msg:
587 procs -= 1
587 procs -= 1
588 else:
588 else:
589 err_msg = msg
589 err_msg = msg
590
590
591 if procs == 0 or 'Traceback' in err_msg:
591 if procs == 0 or 'Traceback' in err_msg:
592 break
592 break
593 time.sleep(0.1)
593 time.sleep(0.1)
594
594
595 if '|' in err_msg:
595 if '|' in err_msg:
596 name, err = err_msg.split('|')
596 name, err = err_msg.split('|')
597 if 'SchainWarning' in err:
597 if 'SchainWarning' in err:
598 log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), name)
598 log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), name)
599 elif 'SchainError' in err:
599 elif 'SchainError' in err:
600 log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), name)
600 log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), name)
601 else:
601 else:
602 log.error(err, name)
602 log.error(err, name)
603 else:
603 else:
604 name, err = self.name, err_msg
604 name, err = self.name, err_msg
605
605
606 time.sleep(1)
606 time.sleep(1)
607
607
608 ctx.term()
608 ctx.term()
609
609
610 message = ''.join(err)
610 message = ''.join(err)
611
611
612 if err_msg:
612 if err_msg:
613 subject = 'SChain v%s: Error running %s\n' % (
613 subject = 'SChain v%s: Error running %s\n' % (
614 schainpy.__version__, self.name)
614 schainpy.__version__, self.name)
615
615
616 subtitle = 'Hostname: %s\n' % socket.gethostbyname(
616 subtitle = 'Hostname: %s\n' % socket.gethostbyname(
617 socket.gethostname())
617 socket.gethostname())
618 subtitle += 'Working directory: %s\n' % os.path.abspath('./')
618 subtitle += 'Working directory: %s\n' % os.path.abspath('./')
619 subtitle += 'Configuration file: %s\n' % self.filename
619 subtitle += 'Configuration file: %s\n' % self.filename
620 subtitle += 'Time: %s\n' % str(datetime.datetime.now())
620 subtitle += 'Time: %s\n' % str(datetime.datetime.now())
621
621
622 readUnitConfObj = self.getReadUnit()
622 readUnitConfObj = self.getReadUnit()
623 if readUnitConfObj:
623 if readUnitConfObj:
624 subtitle += '\nInput parameters:\n'
624 subtitle += '\nInput parameters:\n'
625 subtitle += '[Data path = %s]\n' % readUnitConfObj.parameters['path']
625 subtitle += '[Data path = %s]\n' % readUnitConfObj.parameters['path']
626 subtitle += '[Start date = %s]\n' % readUnitConfObj.parameters['startDate']
626 subtitle += '[Start date = %s]\n' % readUnitConfObj.parameters['startDate']
627 subtitle += '[End date = %s]\n' % readUnitConfObj.parameters['endDate']
627 subtitle += '[End date = %s]\n' % readUnitConfObj.parameters['endDate']
628 subtitle += '[Start time = %s]\n' % readUnitConfObj.parameters['startTime']
628 subtitle += '[Start time = %s]\n' % readUnitConfObj.parameters['startTime']
629 subtitle += '[End time = %s]\n' % readUnitConfObj.parameters['endTime']
629 subtitle += '[End time = %s]\n' % readUnitConfObj.parameters['endTime']
630
630
631 a = Alarm(
631 a = Alarm(
632 modes=self.alarm,
632 modes=self.alarm,
633 email=self.email,
633 email=self.email,
634 message=message,
634 message=message,
635 subject=subject,
635 subject=subject,
636 subtitle=subtitle,
636 subtitle=subtitle,
637 filename=self.filename
637 filename=self.filename
638 )
638 )
639
639
640 a.start()
640 a.start()
641
641
642 def setFilename(self, filename):
642 def setFilename(self, filename):
643
643
644 self.filename = filename
644 self.filename = filename
645
645
646 def runProcs(self):
646 def runProcs(self):
647
647
648 err = False
648 err = False
649 n = len(self.configurations)
649 n = len(self.configurations)
650 #print(n)
650 flag_no_read = False
651
651 nProc_noRead = 0
652
653 #while not err:
654 # for conf in self.getUnits():
655 # ok = conf.run()
656 # if ok == 'Error':
657 # n -= 1
658 # continue
659 # elif not ok:
660 # break
661 # if n == 0:
662 # err = True
663
652 while not err:
664 while not err:
653 #print(self.getUnits())
665 n_proc = 0
654 for conf in self.getUnits():
666 for conf in self.getUnits():
655 #print(conf)
667 if flag_no_read:
656 ok = conf.run()
668 if n_proc >= nProc_noRead:
657 #print("ok", ok)
669 ok = conf.run()
670 else:
671 n_proc += 1
672 continue
673 else:
674 ok = conf.run()
675
676 n_proc += 1
677
658 if ok == 'Error':
678 if ok == 'Error':
659 n -= 1
679 n -= 1
660 continue
680 continue
681
682 elif ok == 'no_Read' and (not flag_no_read):
683 nProc_noRead = n_proc - 1
684 flag_no_read = True
685 continue
686 elif ok == 'new_Read':
687 nProc_noRead = 0
688 flag_no_read = False
689 continue
661 elif not ok:
690 elif not ok:
662 break
691 break
663 #print("****************************************************end")
692
664 #exit(1)
665 if n == 0:
693 if n == 0:
666 err = True
694 err = True
667
695
668 def run(self):
696 def run(self):
669
697
670 log.success('\nStarting Project {} [id={}]'.format(self.name, self.id), tag='')
698 log.success('\nStarting Project {} [id={}]'.format(self.name, self.id), tag='')
671 self.started = True
699 self.started = True
672 self.start_time = time.time()
700 self.start_time = time.time()
673 self.createObjects()
701 self.createObjects()
674 self.runProcs()
702 self.runProcs()
675 log.success('{} Done (Time: {:4.2f}s)'.format(
703 log.success('{} Done (Time: {:4.2f}s)'.format(
676 self.name,
704 self.name,
677 time.time() - self.start_time), '')
705 time.time() - self.start_time), '')
@@ -1,252 +1,246
1 '''
1 '''
2 Base clases to create Processing units and operations, the MPDecorator
2 Base clases to create Processing units and operations, the MPDecorator
3 must be used in plotting and writing operations to allow to run as an
3 must be used in plotting and writing operations to allow to run as an
4 external process.
4 external process.
5 '''
5 '''
6
6
7 import os
7 import os
8 import inspect
8 import inspect
9 import zmq
9 import zmq
10 import time
10 import time
11 import pickle
11 import pickle
12 import traceback
12 import traceback
13 from threading import Thread
13 from threading import Thread
14 from multiprocessing import Process, Queue
14 from multiprocessing import Process, Queue
15 from schainpy.utils import log
15 from schainpy.utils import log
16
16
17 QUEUE_SIZE = int(os.environ.get('QUEUE_MAX_SIZE', '100'))
17 QUEUE_SIZE = int(os.environ.get('QUEUE_MAX_SIZE', '100'))
18
18
19 class ProcessingUnit(object):
19 class ProcessingUnit(object):
20 '''
20 '''
21 Base class to create Signal Chain Units
21 Base class to create Signal Chain Units
22 '''
22 '''
23
23
24 proc_type = 'processing'
24 proc_type = 'processing'
25 bypass = False
25 bypass = False
26
26
27 def __init__(self):
27 def __init__(self):
28
28
29 self.dataIn = None
29 self.dataIn = None
30 self.dataOut = None
30 self.dataOut = None
31 self.isConfig = False
31 self.isConfig = False
32 self.operations = []
32 self.operations = []
33 self.name = 'Test'
33 self.name = 'Test'
34 self.inputs = []
34 self.inputs = []
35
35
36 def setInput(self, unit):
36 def setInput(self, unit):
37
37
38 attr = 'dataIn'
38 attr = 'dataIn'
39 for i, u in enumerate(unit):
39 for i, u in enumerate(unit):
40 if i==0:
40 if i==0:
41 #print(u.dataOut.flagNoData)
41 #print(u.dataOut.flagNoData)
42 #exit(1)
42 #exit(1)
43 self.dataIn = u.dataOut#.copy()
43 self.dataIn = u.dataOut#.copy()
44 self.inputs.append('dataIn')
44 self.inputs.append('dataIn')
45 else:
45 else:
46 setattr(self, 'dataIn{}'.format(i), u.dataOut)#.copy())
46 setattr(self, 'dataIn{}'.format(i), u.dataOut)#.copy())
47 self.inputs.append('dataIn{}'.format(i))
47 self.inputs.append('dataIn{}'.format(i))
48
48
49
49
50 def getAllowedArgs(self):
50 def getAllowedArgs(self):
51 if hasattr(self, '__attrs__'):
51 if hasattr(self, '__attrs__'):
52 return self.__attrs__
52 return self.__attrs__
53 else:
53 else:
54 return inspect.getargspec(self.run).args
54 return inspect.getargspec(self.run).args
55
55
56 def addOperation(self, conf, operation):
56 def addOperation(self, conf, operation):
57 '''
57 '''
58 '''
58 '''
59
59
60 self.operations.append((operation, conf.type, conf.getKwargs()))
60 self.operations.append((operation, conf.type, conf.getKwargs()))
61
61
62 def getOperationObj(self, objId):
62 def getOperationObj(self, objId):
63
63
64 if objId not in list(self.operations.keys()):
64 if objId not in list(self.operations.keys()):
65 return None
65 return None
66
66
67 return self.operations[objId]
67 return self.operations[objId]
68
68
69 def call(self, **kwargs):
69 def call(self, **kwargs):
70 '''
71 '''
72
70
71 mybool = (self.dataOut.type == 'Voltage') and self.dataOut.useInputBuffer and (not self.dataOut.buffer_empty) #liberar desde buffer
72
73 try:
73 try:
74 if self.dataIn is not None and self.dataIn.flagNoData and not self.dataIn.error:
74 if mybool:
75 #if self.dataIn is not None and self.dataIn.flagNoData and not self.dataIn.error and not self.dataIn.runNextUnit:
75 #print("run jeje")
76 if self.dataIn.runNextUnit:
77 #print("SUCCESSSSSSS")
78 #exit(1)
79 return not self.dataIn.isReady()
80 else:
81 return self.dataIn.isReady()
82 elif self.dataIn is None or not self.dataIn.error:
83 if 'Reader' in self.name and self.bypass:
84 print('Skipping...reader')
85 return self.dataOut.isReady()
86 self.run(**kwargs)
76 self.run(**kwargs)
87 elif self.dataIn.error:
77 else:
88 #print("Elif 2")
78 if self.dataIn is not None and self.dataIn.flagNoData and not self.dataIn.error:
89 self.dataOut.error = self.dataIn.error
79 return self.dataIn.isReady()
90 self.dataOut.flagNoData = True
80 elif self.dataIn is None or not self.dataIn.error: #unidad de lectura o procesamiento regular
81 self.run(**kwargs)
82 elif self.dataIn.error:
83 self.dataOut.error = self.dataIn.error
84 self.dataOut.flagNoData = True
85 print("exec proc error")
86
91 except:
87 except:
92 #print("Except")
88
93 err = traceback.format_exc()
89 err = traceback.format_exc()
94 if 'SchainWarning' in err:
90 if 'SchainWarning' in err:
95 log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), self.name)
91 log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), self.name)
96 elif 'SchainError' in err:
92 elif 'SchainError' in err:
97 log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), self.name)
93 log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), self.name)
98 else:
94 else:
99 log.error(err, self.name)
95 log.error(err, self.name)
100 self.dataOut.error = True
96 self.dataOut.error = True
101 #print("before op")
97
98
102 for op, optype, opkwargs in self.operations:
99 for op, optype, opkwargs in self.operations:
103 aux = self.dataOut.copy()
100
104 #aux = copy.deepcopy(self.dataOut)
101 if (optype == 'other' and self.dataOut.isReady()) or mybool:
105 #print("**********************Before",op)
102 try:
106 if optype == 'other' and not self.dataOut.flagNoData:
103 self.dataOut = op.run(self.dataOut, **opkwargs)
107 #print("**********************Other",op)
104 except Exception as e:
108 #print(self.dataOut.flagNoData)
105 print(e)
109 self.dataOut = op.run(self.dataOut, **opkwargs)
106 self.dataOut.error = True
110 elif optype == 'external' and not self.dataOut.flagNoData:
107 return 'Error'
111 op.queue.put(aux)
108 elif optype == 'external' and self.dataOut.isReady() :
109 op.queue.put(copy.deepcopy(self.dataOut))
112 elif optype == 'external' and self.dataOut.error:
110 elif optype == 'external' and self.dataOut.error:
113 op.queue.put(aux)
111 op.queue.put(copy.deepcopy(self.dataOut))
114 #elif optype == 'external' and self.dataOut.isReady():
115 #op.queue.put(copy.deepcopy(self.dataOut))
116 #print(not self.dataOut.isReady())
117
112
118 try:
119 if self.dataOut.runNextUnit:
120 runNextUnit = self.dataOut.runNextUnit
121 #print(self.operations)
122 #print("Tru")
123
113
114 if not self.dataOut.error:
115 if self.dataOut.type == 'Voltage':
116 if not self.dataOut.buffer_empty : #continue
117 return 'no_Read'
118 elif self.dataOut.useInputBuffer and (self.dataOut.buffer_empty) and self.dataOut.isReady() :
119 return 'new_Read'
120 else:
121 return True
124 else:
122 else:
125 runNextUnit = self.dataOut.isReady()
123 #print("ret True")
126 except:
124 return True
127 runNextUnit = self.dataOut.isReady()
125 else:
128 #exit(1)
126 return 'Error'
129 #if not self.dataOut.isReady():
127 #return 'Error' if self.dataOut.error else True #self.dataOut.isReady()
130 #return 'Error' if self.dataOut.error else input()
131 #print("NexT",runNextUnit)
132 #print("error: ",self.dataOut.error)
133 return 'Error' if self.dataOut.error else runNextUnit# self.dataOut.isReady()
134
128
135 def setup(self):
129 def setup(self):
136
130
137 raise NotImplementedError
131 raise NotImplementedError
138
132
139 def run(self):
133 def run(self):
140
134
141 raise NotImplementedError
135 raise NotImplementedError
142
136
143 def close(self):
137 def close(self):
144
138
145 return
139 return
146
140
147
141
148 class Operation(object):
142 class Operation(object):
149
143
150 '''
144 '''
151 '''
145 '''
152
146
153 proc_type = 'operation'
147 proc_type = 'operation'
154
148
155 def __init__(self):
149 def __init__(self):
156
150
157 self.id = None
151 self.id = None
158 self.isConfig = False
152 self.isConfig = False
159
153
160 if not hasattr(self, 'name'):
154 if not hasattr(self, 'name'):
161 self.name = self.__class__.__name__
155 self.name = self.__class__.__name__
162
156
163 def getAllowedArgs(self):
157 def getAllowedArgs(self):
164 if hasattr(self, '__attrs__'):
158 if hasattr(self, '__attrs__'):
165 return self.__attrs__
159 return self.__attrs__
166 else:
160 else:
167 return inspect.getargspec(self.run).args
161 return inspect.getargspec(self.run).args
168
162
169 def setup(self):
163 def setup(self):
170
164
171 self.isConfig = True
165 self.isConfig = True
172
166
173 raise NotImplementedError
167 raise NotImplementedError
174
168
175 def run(self, dataIn, **kwargs):
169 def run(self, dataIn, **kwargs):
176 """
170 """
177 Realiza las operaciones necesarias sobre la dataIn.data y actualiza los
171 Realiza las operaciones necesarias sobre la dataIn.data y actualiza los
178 atributos del objeto dataIn.
172 atributos del objeto dataIn.
179
173
180 Input:
174 Input:
181
175
182 dataIn : objeto del tipo JROData
176 dataIn : objeto del tipo JROData
183
177
184 Return:
178 Return:
185
179
186 None
180 None
187
181
188 Affected:
182 Affected:
189 __buffer : buffer de recepcion de datos.
183 __buffer : buffer de recepcion de datos.
190
184
191 """
185 """
192 if not self.isConfig:
186 if not self.isConfig:
193 self.setup(**kwargs)
187 self.setup(**kwargs)
194
188
195 raise NotImplementedError
189 raise NotImplementedError
196
190
197 def close(self):
191 def close(self):
198
192
199 return
193 return
200
194
201
195
202 def MPDecorator(BaseClass):
196 def MPDecorator(BaseClass):
203 """
197 """
204 Multiprocessing class decorator
198 Multiprocessing class decorator
205
199
206 This function add multiprocessing features to a BaseClass.
200 This function add multiprocessing features to a BaseClass.
207 """
201 """
208
202
209 class MPClass(BaseClass, Process):
203 class MPClass(BaseClass, Process):
210
204
211 def __init__(self, *args, **kwargs):
205 def __init__(self, *args, **kwargs):
212 super(MPClass, self).__init__()
206 super(MPClass, self).__init__()
213 Process.__init__(self)
207 Process.__init__(self)
214
208
215 self.args = args
209 self.args = args
216 self.kwargs = kwargs
210 self.kwargs = kwargs
217 self.t = time.time()
211 self.t = time.time()
218 self.op_type = 'external'
212 self.op_type = 'external'
219 self.name = BaseClass.__name__
213 self.name = BaseClass.__name__
220 self.__doc__ = BaseClass.__doc__
214 self.__doc__ = BaseClass.__doc__
221
215
222 if 'plot' in self.name.lower() and not self.name.endswith('_'):
216 if 'plot' in self.name.lower() and not self.name.endswith('_'):
223 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
217 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
224
218
225 self.start_time = time.time()
219 self.start_time = time.time()
226 self.err_queue = args[3]
220 self.err_queue = args[3]
227 self.queue = Queue(maxsize=QUEUE_SIZE)
221 self.queue = Queue(maxsize=QUEUE_SIZE)
228 self.myrun = BaseClass.run
222 self.myrun = BaseClass.run
229
223
230 def run(self):
224 def run(self):
231
225
232 while True:
226 while True:
233
227
234 dataOut = self.queue.get()
228 dataOut = self.queue.get()
235
229
236 if not dataOut.error:
230 if not dataOut.error:
237 try:
231 try:
238 BaseClass.run(self, dataOut, **self.kwargs)
232 BaseClass.run(self, dataOut, **self.kwargs)
239 except:
233 except:
240 err = traceback.format_exc()
234 err = traceback.format_exc()
241 log.error(err, self.name)
235 log.error(err, self.name)
242 else:
236 else:
243 break
237 break
244
238
245 self.close()
239 self.close()
246
240
247 def close(self):
241 def close(self):
248
242
249 BaseClass.close(self)
243 BaseClass.close(self)
250 log.success('Done...(Time:{:4.2f} secs)'.format(time.time() - self.start_time), self.name)
244 log.success('Done...(Time:{:4.2f} secs)'.format(time.time() - self.start_time), self.name)
251
245
252 return MPClass
246 return MPClass
1 NO CONTENT: modified file
NO CONTENT: modified file
The requested commit or file is too big and content was truncated. Show full diff
General Comments 0
You need to be logged in to leave comments. Login now