##// END OF EJS Templates
Beta version!
Juan C. Espinoza -
r1330:959603ec6724
parent child
Show More
1 NO CONTENT: file renamed from LICENSE to LICENSE.txt
@@ -1,238 +1,238
1 1 import click
2 2 import subprocess
3 3 import os
4 4 import sys
5 5 import glob
6 6 import schainpy
7 7 from schainpy.controller import Project
8 8 from schainpy.model import Operation, ProcessingUnit
9 9 from schainpy.utils import log
10 10 from importlib import import_module
11 11 from pydoc import locate
12 12 from fuzzywuzzy import process
13 13 from schainpy.cli import templates
14 14 import inspect
15 15 try:
16 16 from queue import Queue
17 17 except:
18 18 from Queue import Queue
19 19
20 20
21 21 def getProcs():
22 22 modules = dir(schainpy.model)
23 23 procs = check_module(modules, 'processing')
24 24 try:
25 25 procs.remove('ProcessingUnit')
26 26 except Exception as e:
27 27 pass
28 28 return procs
29 29
30 30 def getOperations():
31 31 module = dir(schainpy.model)
32 32 noProcs = [x for x in module if not x.endswith('Proc')]
33 33 operations = check_module(noProcs, 'operation')
34 34 try:
35 35 operations.remove('Operation')
36 36 operations.remove('Figure')
37 37 operations.remove('Plot')
38 38 except Exception as e:
39 39 pass
40 40 return operations
41 41
42 42 def getArgs(op):
43 43 module = locate('schainpy.model.{}'.format(op))
44 44 try:
45 45 obj = module(1, 2, 3, Queue())
46 46 except:
47 47 obj = module()
48 48
49 49 if hasattr(obj, '__attrs__'):
50 50 args = obj.__attrs__
51 51 else:
52 52 if hasattr(obj, 'myrun'):
53 53 args = inspect.getfullargspec(obj.myrun).args
54 54 else:
55 55 args = inspect.getfullargspec(obj.run).args
56 56
57 57 try:
58 58 args.remove('self')
59 59 except Exception as e:
60 60 pass
61 61 try:
62 62 args.remove('dataOut')
63 63 except Exception as e:
64 64 pass
65 65 return args
66 66
67 67 def getDoc(obj):
68 68 module = locate('schainpy.model.{}'.format(obj))
69 69 try:
70 70 obj = module(1, 2, 3, Queue())
71 71 except:
72 72 obj = module()
73 73 return obj.__doc__
74 74
75 75 def getAll():
76 76 modules = getOperations()
77 77 modules.extend(getProcs())
78 78 return modules
79 79
80 80
81 81 def print_version(ctx, param, value):
82 82 if not value or ctx.resilient_parsing:
83 83 return
84 84 click.echo(schainpy.__version__)
85 85 ctx.exit()
86 86
87 87
88 88 PREFIX = 'experiment'
89 89
90 90 @click.command()
91 91 @click.option('--version', '-v', is_flag=True, callback=print_version, help='SChain version', type=str)
92 92 @click.argument('command', default='run', required=True)
93 93 @click.argument('nextcommand', default=None, required=False, type=str)
94 94 def main(command, nextcommand, version):
95 95 """COMMAND LINE INTERFACE FOR SIGNAL CHAIN - JICAMARCA RADIO OBSERVATORY V3.0\n
96 96 Available commands:\n
97 97 xml: runs a schain XML generated file\n
98 98 run: runs any python script'\n
99 99 generate: generates a template schain script\n
100 100 list: return a list of available procs and operations\n
101 101 search: return avilable operations, procs or arguments of the given
102 102 operation/proc\n"""
103 103 if command == 'xml':
104 104 runFromXML(nextcommand)
105 105 elif command == 'generate':
106 106 generate()
107 107 elif command == 'test':
108 108 test()
109 109 elif command == 'run':
110 110 runschain(nextcommand)
111 111 elif command == 'search':
112 112 search(nextcommand)
113 113 elif command == 'list':
114 114 cmdlist(nextcommand)
115 115 else:
116 116 log.error('Command {} is not defined'.format(command))
117 117
118 118
119 119 def check_module(possible, instance):
120 120 def check(x):
121 121 try:
122 122 instancia = locate('schainpy.model.{}'.format(x))
123 123 ret = instancia.proc_type == instance
124 124 return ret
125 125 except Exception as e:
126 126 return False
127 127 clean = clean_modules(possible)
128 128 return [x for x in clean if check(x)]
129 129
130 130
131 131 def clean_modules(module):
132 132 noEndsUnder = [x for x in module if not x.endswith('__')]
133 133 noStartUnder = [x for x in noEndsUnder if not x.startswith('__')]
134 134 noFullUpper = [x for x in noStartUnder if not x.isupper()]
135 135 return noFullUpper
136 136
137 137 def cmdlist(nextcommand):
138 138 if nextcommand is None:
139 139 log.error('Missing argument, available arguments: procs, operations', '')
140 140 elif nextcommand == 'procs':
141 141 procs = getProcs()
142 142 log.success(
143 143 'Current ProcessingUnits are:\n {}'.format('\n '.join(procs)), '')
144 144 elif nextcommand == 'operations':
145 145 operations = getOperations()
146 146 log.success('Current Operations are:\n {}'.format(
147 147 '\n '.join(operations)), '')
148 148 else:
149 149 log.error('Wrong argument', '')
150 150
151 151 def search(nextcommand):
152 152 if nextcommand is None:
153 153 log.error('There is no Operation/ProcessingUnit to search', '')
154 154 else:
155 155 try:
156 156 args = getArgs(nextcommand)
157 157 doc = getDoc(nextcommand)
158 log.success('{}\n{}\n\narguments:\n {}'.format(
158 log.success('{}\n{}\n\nparameters:\n {}'.format(
159 159 nextcommand, doc, ', '.join(args)), ''
160 160 )
161 161 except Exception as e:
162 162 log.error('Module `{}` does not exists'.format(nextcommand), '')
163 163 allModules = getAll()
164 164 similar = [t[0] for t in process.extract(nextcommand, allModules, limit=12) if t[1]>80]
165 165 log.success('Possible modules are: {}'.format(', '.join(similar)), '')
166 166
167 167 def runschain(nextcommand):
168 168 if nextcommand is None:
169 169 currentfiles = glob.glob('./{}_*.py'.format(PREFIX))
170 170 numberfiles = len(currentfiles)
171 171 if numberfiles > 1:
172 172 log.error('There is more than one file to run')
173 173 elif numberfiles == 1:
174 174 subprocess.call(['python ' + currentfiles[0]], shell=True)
175 175 else:
176 176 log.error('There is no file to run')
177 177 else:
178 178 try:
179 179 subprocess.call(['python ' + nextcommand], shell=True)
180 180 except Exception as e:
181 181 log.error("I cannot run the file. Does it exists?")
182 182
183 183
184 184 def basicInputs():
185 185 inputs = {}
186 186 inputs['name'] = click.prompt(
187 187 'Name of the project', default="project", type=str)
188 188 inputs['desc'] = click.prompt(
189 189 'Enter a description', default="A schain project", type=str)
190 190 inputs['multiprocess'] = click.prompt(
191 191 '''Select data type:
192 192
193 193 - Voltage (*.r): [1]
194 194 - Spectra (*.pdata): [2]
195 195 - Voltage and Spectra (*.r): [3]
196 196
197 197 -->''', type=int)
198 198 inputs['path'] = click.prompt('Data path', default=os.getcwd(
199 199 ), type=click.Path(exists=True, resolve_path=True))
200 200 inputs['startDate'] = click.prompt(
201 201 'Start date', default='1970/01/01', type=str)
202 202 inputs['endDate'] = click.prompt(
203 203 'End date', default='2018/12/31', type=str)
204 204 inputs['startHour'] = click.prompt(
205 205 'Start hour', default='00:00:00', type=str)
206 206 inputs['endHour'] = click.prompt('End hour', default='23:59:59', type=str)
207 207 inputs['figpath'] = inputs['path'] + '/figs'
208 208 return inputs
209 209
210 210
211 211 def generate():
212 212 inputs = basicInputs()
213 213
214 214 if inputs['multiprocess'] == 1:
215 215 current = templates.voltage.format(**inputs)
216 216 elif inputs['multiprocess'] == 2:
217 217 current = templates.spectra.format(**inputs)
218 218 elif inputs['multiprocess'] == 3:
219 219 current = templates.voltagespectra.format(**inputs)
220 220 scriptname = '{}_{}.py'.format(PREFIX, inputs['name'])
221 221 script = open(scriptname, 'w')
222 222 try:
223 223 script.write(current)
224 224 log.success('Script {} generated'.format(scriptname))
225 225 except Exception as e:
226 226 log.error('I cannot create the file. Do you have writing permissions?')
227 227
228 228
229 229 def test():
230 230 log.warning('testing')
231 231
232 232
233 233 def runFromXML(filename):
234 234 controller = Project()
235 235 if not controller.readXml(filename):
236 236 return
237 237 controller.start()
238 238 return
@@ -1,656 +1,656
1 1 # Copyright (c) 2012-2020 Jicamarca Radio Observatory
2 2 # All rights reserved.
3 3 #
4 4 # Distributed under the terms of the BSD 3-clause license.
5 5 """API to create signal chain projects
6 6
7 7 The API is provide through class: Project
8 8 """
9 9
10 10 import re
11 11 import sys
12 12 import ast
13 13 import datetime
14 14 import traceback
15 15 import time
16 16 from multiprocessing import Process, Queue
17 17 from threading import Thread
18 18 from xml.etree.ElementTree import ElementTree, Element, SubElement
19 19
20 20 from schainpy.admin import Alarm, SchainWarning
21 21 from schainpy.model import *
22 22 from schainpy.utils import log
23 23
24 24
25 25 class ConfBase():
26 26
27 27 def __init__(self):
28 28
29 29 self.id = '0'
30 30 self.name = None
31 31 self.priority = None
32 32 self.parameters = {}
33 33 self.object = None
34 34 self.operations = []
35 35
36 36 def getId(self):
37 37
38 38 return self.id
39 39
40 40 def getNewId(self):
41 41
42 42 return int(self.id) * 10 + len(self.operations) + 1
43 43
44 44 def updateId(self, new_id):
45 45
46 46 self.id = str(new_id)
47 47
48 48 n = 1
49 49 for conf in self.operations:
50 50 conf_id = str(int(new_id) * 10 + n)
51 51 conf.updateId(conf_id)
52 52 n += 1
53 53
54 54 def getKwargs(self):
55 55
56 56 params = {}
57 57
58 58 for key, value in self.parameters.items():
59 59 if value not in (None, '', ' '):
60 60 params[key] = value
61 61
62 62 return params
63 63
64 64 def update(self, **kwargs):
65 65
66 66 for key, value in kwargs.items():
67 67 self.addParameter(name=key, value=value)
68 68
69 69 def addParameter(self, name, value, format=None):
70 70 '''
71 71 '''
72 72
73 73 if isinstance(value, str) and re.search(r'(\d+/\d+/\d+)', value):
74 74 self.parameters[name] = datetime.date(*[int(x) for x in value.split('/')])
75 75 elif isinstance(value, str) and re.search(r'(\d+:\d+:\d+)', value):
76 76 self.parameters[name] = datetime.time(*[int(x) for x in value.split(':')])
77 77 else:
78 78 try:
79 79 self.parameters[name] = ast.literal_eval(value)
80 80 except:
81 81 if isinstance(value, str) and ',' in value:
82 82 self.parameters[name] = value.split(',')
83 83 else:
84 84 self.parameters[name] = value
85 85
86 86 def getParameters(self):
87 87
88 88 params = {}
89 89 for key, value in self.parameters.items():
90 90 s = type(value).__name__
91 91 if s == 'date':
92 92 params[key] = value.strftime('%Y/%m/%d')
93 93 elif s == 'time':
94 94 params[key] = value.strftime('%H:%M:%S')
95 95 else:
96 96 params[key] = str(value)
97 97
98 98 return params
99 99
100 100 def makeXml(self, element):
101 101
102 102 xml = SubElement(element, self.ELEMENTNAME)
103 103 for label in self.xml_labels:
104 104 xml.set(label, str(getattr(self, label)))
105 105
106 106 for key, value in self.getParameters().items():
107 107 xml_param = SubElement(xml, 'Parameter')
108 108 xml_param.set('name', key)
109 109 xml_param.set('value', value)
110 110
111 111 for conf in self.operations:
112 112 conf.makeXml(xml)
113 113
114 114 def __str__(self):
115 115
116 116 if self.ELEMENTNAME == 'Operation':
117 117 s = ' {}[id={}]\n'.format(self.name, self.id)
118 118 else:
119 119 s = '{}[id={}, inputId={}]\n'.format(self.name, self.id, self.inputId)
120 120
121 121 for key, value in self.parameters.items():
122 122 if self.ELEMENTNAME == 'Operation':
123 123 s += ' {}: {}\n'.format(key, value)
124 124 else:
125 125 s += ' {}: {}\n'.format(key, value)
126 126
127 127 for conf in self.operations:
128 128 s += str(conf)
129 129
130 130 return s
131 131
132 132 class OperationConf(ConfBase):
133 133
134 134 ELEMENTNAME = 'Operation'
135 135 xml_labels = ['id', 'name']
136 136
137 137 def setup(self, id, name, priority, project_id, err_queue):
138 138
139 139 self.id = str(id)
140 140 self.project_id = project_id
141 141 self.name = name
142 142 self.type = 'other'
143 143 self.err_queue = err_queue
144 144
145 145 def readXml(self, element, project_id, err_queue):
146 146
147 147 self.id = element.get('id')
148 148 self.name = element.get('name')
149 149 self.type = 'other'
150 150 self.project_id = str(project_id)
151 151 self.err_queue = err_queue
152 152
153 153 for elm in element.iter('Parameter'):
154 154 self.addParameter(elm.get('name'), elm.get('value'))
155 155
156 156 def createObject(self):
157 157
158 158 className = eval(self.name)
159 159
160 160 if 'Plot' in self.name or 'Writer' in self.name or 'Send' in self.name or 'print' in self.name:
161 161 kwargs = self.getKwargs()
162 162 opObj = className(self.id, self.id, self.project_id, self.err_queue, **kwargs)
163 163 opObj.start()
164 164 self.type = 'external'
165 165 else:
166 166 opObj = className()
167 167
168 168 self.object = opObj
169 169 return opObj
170 170
171 171 class ProcUnitConf(ConfBase):
172 172
173 173 ELEMENTNAME = 'ProcUnit'
174 174 xml_labels = ['id', 'inputId', 'name']
175 175
176 176 def setup(self, project_id, id, name, datatype, inputId, err_queue):
177 177 '''
178 178 '''
179 179
180 180 if datatype == None and name == None:
181 181 raise ValueError('datatype or name should be defined')
182 182
183 183 if name == None:
184 184 if 'Proc' in datatype:
185 185 name = datatype
186 186 else:
187 187 name = '%sProc' % (datatype)
188 188
189 189 if datatype == None:
190 190 datatype = name.replace('Proc', '')
191 191
192 192 self.id = str(id)
193 193 self.project_id = project_id
194 194 self.name = name
195 195 self.datatype = datatype
196 196 self.inputId = inputId
197 197 self.err_queue = err_queue
198 198 self.operations = []
199 199 self.parameters = {}
200 200
201 201 def removeOperation(self, id):
202 202
203 203 i = [1 if x.id==id else 0 for x in self.operations]
204 204 self.operations.pop(i.index(1))
205 205
206 206 def getOperation(self, id):
207 207
208 208 for conf in self.operations:
209 209 if conf.id == id:
210 210 return conf
211 211
212 212 def addOperation(self, name, optype='self'):
213 213 '''
214 214 '''
215 215
216 216 id = self.getNewId()
217 217 conf = OperationConf()
218 218 conf.setup(id, name=name, priority='0', project_id=self.project_id, err_queue=self.err_queue)
219 219 self.operations.append(conf)
220 220
221 221 return conf
222 222
223 223 def readXml(self, element, project_id, err_queue):
224 224
225 225 self.id = element.get('id')
226 226 self.name = element.get('name')
227 227 self.inputId = None if element.get('inputId') == 'None' else element.get('inputId')
228 228 self.datatype = element.get('datatype', self.name.replace(self.ELEMENTNAME.replace('Unit', ''), ''))
229 229 self.project_id = str(project_id)
230 230 self.err_queue = err_queue
231 231 self.operations = []
232 232 self.parameters = {}
233 233
234 234 for elm in element:
235 235 if elm.tag == 'Parameter':
236 236 self.addParameter(elm.get('name'), elm.get('value'))
237 237 elif elm.tag == 'Operation':
238 238 conf = OperationConf()
239 239 conf.readXml(elm, project_id, err_queue)
240 240 self.operations.append(conf)
241 241
242 242 def createObjects(self):
243 243 '''
244 244 Instancia de unidades de procesamiento.
245 245 '''
246 246
247 247 className = eval(self.name)
248 248 kwargs = self.getKwargs()
249 249 procUnitObj = className()
250 250 procUnitObj.name = self.name
251 251 log.success('creating process...', self.name)
252 252
253 253 for conf in self.operations:
254 254
255 255 opObj = conf.createObject()
256 256
257 257 log.success('adding operation: {}, type:{}'.format(
258 258 conf.name,
259 259 conf.type), self.name)
260 260
261 261 procUnitObj.addOperation(conf, opObj)
262 262
263 263 self.object = procUnitObj
264 264
265 265 def run(self):
266 266 '''
267 267 '''
268 268
269 269 return self.object.call(**self.getKwargs())
270 270
271 271
272 272 class ReadUnitConf(ProcUnitConf):
273 273
274 274 ELEMENTNAME = 'ReadUnit'
275 275
276 276 def __init__(self):
277 277
278 278 self.id = None
279 279 self.datatype = None
280 280 self.name = None
281 281 self.inputId = None
282 282 self.operations = []
283 283 self.parameters = {}
284 284
285 285 def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='',
286 286 startTime='', endTime='', server=None, **kwargs):
287 287
288 288 if datatype == None and name == None:
289 289 raise ValueError('datatype or name should be defined')
290 290 if name == None:
291 291 if 'Reader' in datatype:
292 292 name = datatype
293 293 datatype = name.replace('Reader','')
294 294 else:
295 295 name = '{}Reader'.format(datatype)
296 296 if datatype == None:
297 297 if 'Reader' in name:
298 298 datatype = name.replace('Reader','')
299 299 else:
300 300 datatype = name
301 301 name = '{}Reader'.format(name)
302 302
303 303 self.id = id
304 304 self.project_id = project_id
305 305 self.name = name
306 306 self.datatype = datatype
307 307 self.err_queue = err_queue
308 308
309 309 self.addParameter(name='path', value=path)
310 310 self.addParameter(name='startDate', value=startDate)
311 311 self.addParameter(name='endDate', value=endDate)
312 312 self.addParameter(name='startTime', value=startTime)
313 313 self.addParameter(name='endTime', value=endTime)
314 314
315 315 for key, value in kwargs.items():
316 316 self.addParameter(name=key, value=value)
317 317
318 318
319 319 class Project(Process):
320 320 """API to create signal chain projects"""
321 321
322 322 ELEMENTNAME = 'Project'
323 323
324 def __init__(self):
324 def __init__(self, name=''):
325 325
326 Process.__init__(self, name='')
326 Process.__init__(self)
327 327 self.id = '1'
328 328 if name:
329 329 self.name = '{} ({})'.format(Process.__name__, name)
330 330 self.filename = None
331 331 self.description = None
332 332 self.email = None
333 333 self.alarm = []
334 334 self.configurations = {}
335 335 # self.err_queue = Queue()
336 336 self.err_queue = None
337 337 self.started = False
338 338
339 339 def getNewId(self):
340 340
341 341 idList = list(self.configurations.keys())
342 342 id = int(self.id) * 10
343 343
344 344 while True:
345 345 id += 1
346 346
347 347 if str(id) in idList:
348 348 continue
349 349
350 350 break
351 351
352 352 return str(id)
353 353
354 354 def updateId(self, new_id):
355 355
356 356 self.id = str(new_id)
357 357
358 358 keyList = list(self.configurations.keys())
359 359 keyList.sort()
360 360
361 361 n = 1
362 362 new_confs = {}
363 363
364 364 for procKey in keyList:
365 365
366 366 conf = self.configurations[procKey]
367 367 idProcUnit = str(int(self.id) * 10 + n)
368 368 conf.updateId(idProcUnit)
369 369 new_confs[idProcUnit] = conf
370 370 n += 1
371 371
372 372 self.configurations = new_confs
373 373
374 374 def setup(self, id=1, name='', description='', email=None, alarm=[]):
375 375
376 376 self.id = str(id)
377 377 self.description = description
378 378 self.email = email
379 379 self.alarm = alarm
380 380 if name:
381 381 self.name = '{} ({})'.format(Process.__name__, name)
382 382
383 383 def update(self, **kwargs):
384 384
385 385 for key, value in kwargs.items():
386 386 setattr(self, key, value)
387 387
388 388 def clone(self):
389 389
390 390 p = Project()
391 391 p.id = self.id
392 392 p.name = self.name
393 393 p.description = self.description
394 394 p.configurations = self.configurations.copy()
395 395
396 396 return p
397 397
398 398 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
399 399
400 400 '''
401 401 '''
402 402
403 403 if id is None:
404 404 idReadUnit = self.getNewId()
405 405 else:
406 406 idReadUnit = str(id)
407 407
408 408 conf = ReadUnitConf()
409 409 conf.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs)
410 410 self.configurations[conf.id] = conf
411 411
412 412 return conf
413 413
414 414 def addProcUnit(self, id=None, inputId='0', datatype=None, name=None):
415 415
416 416 '''
417 417 '''
418 418
419 419 if id is None:
420 420 idProcUnit = self.getNewId()
421 421 else:
422 422 idProcUnit = id
423 423
424 424 conf = ProcUnitConf()
425 425 conf.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue)
426 426 self.configurations[conf.id] = conf
427 427
428 428 return conf
429 429
430 430 def removeProcUnit(self, id):
431 431
432 432 if id in self.configurations:
433 433 self.configurations.pop(id)
434 434
435 435 def getReadUnit(self):
436 436
437 437 for obj in list(self.configurations.values()):
438 438 if obj.ELEMENTNAME == 'ReadUnit':
439 439 return obj
440 440
441 441 return None
442 442
443 443 def getProcUnit(self, id):
444 444
445 445 return self.configurations[id]
446 446
447 447 def getUnits(self):
448 448
449 449 keys = list(self.configurations)
450 450 keys.sort()
451 451
452 452 for key in keys:
453 453 yield self.configurations[key]
454 454
455 455 def updateUnit(self, id, **kwargs):
456 456
457 457 conf = self.configurations[id].update(**kwargs)
458 458
459 459 def makeXml(self):
460 460
461 461 xml = Element('Project')
462 462 xml.set('id', str(self.id))
463 463 xml.set('name', self.name)
464 464 xml.set('description', self.description)
465 465
466 466 for conf in self.configurations.values():
467 467 conf.makeXml(xml)
468 468
469 469 self.xml = xml
470 470
471 471 def writeXml(self, filename=None):
472 472
473 473 if filename == None:
474 474 if self.filename:
475 475 filename = self.filename
476 476 else:
477 477 filename = 'schain.xml'
478 478
479 479 if not filename:
480 480 print('filename has not been defined. Use setFilename(filename) for do it.')
481 481 return 0
482 482
483 483 abs_file = os.path.abspath(filename)
484 484
485 485 if not os.access(os.path.dirname(abs_file), os.W_OK):
486 486 print('No write permission on %s' % os.path.dirname(abs_file))
487 487 return 0
488 488
489 489 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
490 490 print('File %s already exists and it could not be overwriten' % abs_file)
491 491 return 0
492 492
493 493 self.makeXml()
494 494
495 495 ElementTree(self.xml).write(abs_file, method='xml')
496 496
497 497 self.filename = abs_file
498 498
499 499 return 1
500 500
501 501 def readXml(self, filename):
502 502
503 503 abs_file = os.path.abspath(filename)
504 504
505 505 self.configurations = {}
506 506
507 507 try:
508 508 self.xml = ElementTree().parse(abs_file)
509 509 except:
510 510 log.error('Error reading %s, verify file format' % filename)
511 511 return 0
512 512
513 513 self.id = self.xml.get('id')
514 514 self.name = self.xml.get('name')
515 515 self.description = self.xml.get('description')
516 516
517 517 for element in self.xml:
518 518 if element.tag == 'ReadUnit':
519 519 conf = ReadUnitConf()
520 520 conf.readXml(element, self.id, self.err_queue)
521 521 self.configurations[conf.id] = conf
522 522 elif element.tag == 'ProcUnit':
523 523 conf = ProcUnitConf()
524 524 input_proc = self.configurations[element.get('inputId')]
525 525 conf.readXml(element, self.id, self.err_queue)
526 526 self.configurations[conf.id] = conf
527 527
528 528 self.filename = abs_file
529 529
530 530 return 1
531 531
532 532 def __str__(self):
533 533
534 534 text = '\nProject[id=%s, name=%s, description=%s]\n\n' % (
535 535 self.id,
536 536 self.name,
537 537 self.description,
538 538 )
539 539
540 540 for conf in self.configurations.values():
541 541 text += '{}'.format(conf)
542 542
543 543 return text
544 544
545 545 def createObjects(self):
546 546
547 547 keys = list(self.configurations.keys())
548 548 keys.sort()
549 549 for key in keys:
550 550 conf = self.configurations[key]
551 551 conf.createObjects()
552 552 if conf.inputId is not None:
553 553 conf.object.setInput(self.configurations[conf.inputId].object)
554 554
555 555 def monitor(self):
556 556
557 557 t = Thread(target=self._monitor, args=(self.err_queue, self.ctx))
558 558 t.start()
559 559
560 560 def _monitor(self, queue, ctx):
561 561
562 562 import socket
563 563
564 564 procs = 0
565 565 err_msg = ''
566 566
567 567 while True:
568 568 msg = queue.get()
569 569 if '#_start_#' in msg:
570 570 procs += 1
571 571 elif '#_end_#' in msg:
572 572 procs -=1
573 573 else:
574 574 err_msg = msg
575 575
576 576 if procs == 0 or 'Traceback' in err_msg:
577 577 break
578 578 time.sleep(0.1)
579 579
580 580 if '|' in err_msg:
581 581 name, err = err_msg.split('|')
582 582 if 'SchainWarning' in err:
583 583 log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), name)
584 584 elif 'SchainError' in err:
585 585 log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), name)
586 586 else:
587 587 log.error(err, name)
588 588 else:
589 589 name, err = self.name, err_msg
590 590
591 591 time.sleep(1)
592 592
593 593 ctx.term()
594 594
595 595 message = ''.join(err)
596 596
597 597 if err_msg:
598 598 subject = 'SChain v%s: Error running %s\n' % (
599 599 schainpy.__version__, self.name)
600 600
601 601 subtitle = 'Hostname: %s\n' % socket.gethostbyname(
602 602 socket.gethostname())
603 603 subtitle += 'Working directory: %s\n' % os.path.abspath('./')
604 604 subtitle += 'Configuration file: %s\n' % self.filename
605 605 subtitle += 'Time: %s\n' % str(datetime.datetime.now())
606 606
607 607 readUnitConfObj = self.getReadUnit()
608 608 if readUnitConfObj:
609 609 subtitle += '\nInput parameters:\n'
610 610 subtitle += '[Data path = %s]\n' % readUnitConfObj.parameters['path']
611 611 subtitle += '[Start date = %s]\n' % readUnitConfObj.parameters['startDate']
612 612 subtitle += '[End date = %s]\n' % readUnitConfObj.parameters['endDate']
613 613 subtitle += '[Start time = %s]\n' % readUnitConfObj.parameters['startTime']
614 614 subtitle += '[End time = %s]\n' % readUnitConfObj.parameters['endTime']
615 615
616 616 a = Alarm(
617 617 modes=self.alarm,
618 618 email=self.email,
619 619 message=message,
620 620 subject=subject,
621 621 subtitle=subtitle,
622 622 filename=self.filename
623 623 )
624 624
625 625 a.start()
626 626
627 627 def setFilename(self, filename):
628 628
629 629 self.filename = filename
630 630
631 631 def runProcs(self):
632 632
633 633 err = False
634 634 n = len(self.configurations)
635 635
636 636 while not err:
637 637 for conf in self.getUnits():
638 638 ok = conf.run()
639 639 if ok is 'Error':
640 640 n -= 1
641 641 continue
642 642 elif not ok:
643 643 break
644 644 if n == 0:
645 645 err = True
646 646
647 647 def run(self):
648 648
649 649 log.success('\nStarting Project {} [id={}]'.format(self.name, self.id), tag='')
650 650 self.started = True
651 651 self.start_time = time.time()
652 652 self.createObjects()
653 653 self.runProcs()
654 654 log.success('{} Done (Time: {:4.2f}s)'.format(
655 655 self.name,
656 656 time.time()-self.start_time), '')
General Comments 0
You need to be logged in to leave comments. Login now