##// END OF EJS Templates
update controller change class Project update createObjects
Alexander Valdez -
r1670:1003ac744c06
parent child
Show More
@@ -1,659 +1,661
1 1 # Copyright (c) 2012-2020 Jicamarca Radio Observatory
2 2 # All rights reserved.
3 3 #
4 4 # Distributed under the terms of the BSD 3-clause license.
5 5 """API to create signal chain projects
6 6
7 7 The API is provide through class: Project
8 8 """
9 9
10 10 import re
11 11 import sys
12 12 import ast
13 13 import datetime
14 14 import traceback
15 15 import time
16 16 import multiprocessing
17 17 from multiprocessing import Process, Queue
18 18 from threading import Thread
19 19 from xml.etree.ElementTree import ElementTree, Element, SubElement
20 20
21 21 from schainpy.admin import Alarm, SchainWarning
22 22 from schainpy.model import *
23 23 from schainpy.utils import log
24 24
25 25 if 'darwin' in sys.platform and sys.version_info[0] == 3 and sys.version_info[1] > 7:
26 26 multiprocessing.set_start_method('fork')
27 27
28 28 class ConfBase():
29 29
30 30 def __init__(self):
31 31
32 32 self.id = '0'
33 33 self.name = None
34 34 self.priority = None
35 35 self.parameters = {}
36 36 self.object = None
37 37 self.operations = []
38 38
39 39 def getId(self):
40 40
41 41 return self.id
42
42
43 43 def getNewId(self):
44 44
45 45 return int(self.id) * 10 + len(self.operations) + 1
46 46
47 47 def updateId(self, new_id):
48 48
49 49 self.id = str(new_id)
50 50
51 51 n = 1
52 52 for conf in self.operations:
53 53 conf_id = str(int(new_id) * 10 + n)
54 54 conf.updateId(conf_id)
55 55 n += 1
56 56
57 57 def getKwargs(self):
58 58
59 59 params = {}
60 60
61 61 for key, value in self.parameters.items():
62 62 if value not in (None, '', ' '):
63 63 params[key] = value
64
64
65 65 return params
66 66
67 67 def update(self, **kwargs):
68 68
69 69 for key, value in kwargs.items():
70 70 self.addParameter(name=key, value=value)
71 71
72 72 def addParameter(self, name, value, format=None):
73 73 '''
74 74 '''
75 75
76 76 if isinstance(value, str) and re.search(r'(\d+/\d+/\d+)', value):
77 77 self.parameters[name] = datetime.date(*[int(x) for x in value.split('/')])
78 78 elif isinstance(value, str) and re.search(r'(\d+:\d+:\d+)', value):
79 79 self.parameters[name] = datetime.time(*[int(x) for x in value.split(':')])
80 80 else:
81 81 try:
82 82 self.parameters[name] = ast.literal_eval(value)
83 83 except:
84 84 if isinstance(value, str) and ',' in value:
85 85 self.parameters[name] = value.split(',')
86 86 else:
87 87 self.parameters[name] = value
88 88
89 89 def getParameters(self):
90 90
91 91 params = {}
92 92 for key, value in self.parameters.items():
93 93 s = type(value).__name__
94 94 if s == 'date':
95 95 params[key] = value.strftime('%Y/%m/%d')
96 96 elif s == 'time':
97 97 params[key] = value.strftime('%H:%M:%S')
98 98 else:
99 99 params[key] = str(value)
100 100
101 101 return params
102
102
103 103 def makeXml(self, element):
104 104
105 105 xml = SubElement(element, self.ELEMENTNAME)
106 106 for label in self.xml_labels:
107 107 xml.set(label, str(getattr(self, label)))
108
108
109 109 for key, value in self.getParameters().items():
110 110 xml_param = SubElement(xml, 'Parameter')
111 111 xml_param.set('name', key)
112 112 xml_param.set('value', value)
113
113
114 114 for conf in self.operations:
115 115 conf.makeXml(xml)
116
116
117 117 def __str__(self):
118 118
119 119 if self.ELEMENTNAME == 'Operation':
120 120 s = ' {}[id={}]\n'.format(self.name, self.id)
121 121 else:
122 122 s = '{}[id={}, inputId={}]\n'.format(self.name, self.id, self.inputId)
123 123
124 124 for key, value in self.parameters.items():
125 125 if self.ELEMENTNAME == 'Operation':
126 126 s += ' {}: {}\n'.format(key, value)
127 127 else:
128 128 s += ' {}: {}\n'.format(key, value)
129
129
130 130 for conf in self.operations:
131 131 s += str(conf)
132 132
133 133 return s
134 134
135 135 class OperationConf(ConfBase):
136 136
137 137 ELEMENTNAME = 'Operation'
138 138 xml_labels = ['id', 'name']
139 139
140 140 def setup(self, id, name, priority, project_id, err_queue):
141 141
142 142 self.id = str(id)
143 143 self.project_id = project_id
144 144 self.name = name
145 145 self.type = 'other'
146 146 self.err_queue = err_queue
147 147
148 148 def readXml(self, element, project_id, err_queue):
149 149
150 150 self.id = element.get('id')
151 151 self.name = element.get('name')
152 152 self.type = 'other'
153 153 self.project_id = str(project_id)
154 154 self.err_queue = err_queue
155 155
156 156 for elm in element.iter('Parameter'):
157 157 self.addParameter(elm.get('name'), elm.get('value'))
158 158
159 159 def createObject(self):
160 160
161 161 className = eval(self.name)
162 162
163 163 if 'Plot' in self.name or 'Writer' in self.name or 'Send' in self.name or 'print' in self.name:
164 164 kwargs = self.getKwargs()
165 165 opObj = className(self.id, self.id, self.project_id, self.err_queue, **kwargs)
166 166 opObj.start()
167 167 self.type = 'external'
168 168 else:
169 169 opObj = className()
170 170
171 171 self.object = opObj
172 172 return opObj
173 173
174 174 class ProcUnitConf(ConfBase):
175 175
176 176 ELEMENTNAME = 'ProcUnit'
177 177 xml_labels = ['id', 'inputId', 'name']
178 178
179 179 def setup(self, project_id, id, name, datatype, inputId, err_queue):
180 180 '''
181 181 '''
182
182
183 183 if datatype == None and name == None:
184 184 raise ValueError('datatype or name should be defined')
185 185
186 186 if name == None:
187 187 if 'Proc' in datatype:
188 188 name = datatype
189 189 else:
190 190 name = '%sProc' % (datatype)
191 191
192 192 if datatype == None:
193 193 datatype = name.replace('Proc', '')
194 194
195 195 self.id = str(id)
196 196 self.project_id = project_id
197 197 self.name = name
198 198 self.datatype = datatype
199 199 self.inputId = inputId
200 200 self.err_queue = err_queue
201 201 self.operations = []
202 202 self.parameters = {}
203 203
204 204 def removeOperation(self, id):
205 205
206 206 i = [1 if x.id==id else 0 for x in self.operations]
207 207 self.operations.pop(i.index(1))
208
208
209 209 def getOperation(self, id):
210 210
211 211 for conf in self.operations:
212 212 if conf.id == id:
213 213 return conf
214 214
215 215 def addOperation(self, name, optype='self'):
216 216 '''
217 217 '''
218 218
219 219 id = self.getNewId()
220 220 conf = OperationConf()
221 221 conf.setup(id, name=name, priority='0', project_id=self.project_id, err_queue=self.err_queue)
222 222 self.operations.append(conf)
223 223
224 224 return conf
225 225
226 226 def readXml(self, element, project_id, err_queue):
227 227
228 228 self.id = element.get('id')
229 229 self.name = element.get('name')
230 230 self.inputId = None if element.get('inputId') == 'None' else element.get('inputId')
231 231 self.datatype = element.get('datatype', self.name.replace(self.ELEMENTNAME.replace('Unit', ''), ''))
232 232 self.project_id = str(project_id)
233 233 self.err_queue = err_queue
234 234 self.operations = []
235 235 self.parameters = {}
236
236
237 237 for elm in element:
238 238 if elm.tag == 'Parameter':
239 239 self.addParameter(elm.get('name'), elm.get('value'))
240 240 elif elm.tag == 'Operation':
241 241 conf = OperationConf()
242 242 conf.readXml(elm, project_id, err_queue)
243 243 self.operations.append(conf)
244 244
245 245 def createObjects(self):
246 246 '''
247 247 Instancia de unidades de procesamiento.
248 248 '''
249 249
250 250 className = eval(self.name)
251 251 kwargs = self.getKwargs()
252 252 procUnitObj = className()
253 253 procUnitObj.name = self.name
254 254 log.success('creating process...', self.name)
255 255
256 256 for conf in self.operations:
257
257
258 258 opObj = conf.createObject()
259
259
260 260 log.success('adding operation: {}, type:{}'.format(
261 261 conf.name,
262 262 conf.type), self.name)
263
263
264 264 procUnitObj.addOperation(conf, opObj)
265
265
266 266 self.object = procUnitObj
267 267
268 268 def run(self):
269 269 '''
270 270 '''
271
271
272 272 return self.object.call(**self.getKwargs())
273 273
274 274
275 275 class ReadUnitConf(ProcUnitConf):
276 276
277 277 ELEMENTNAME = 'ReadUnit'
278 278
279 279 def __init__(self):
280 280
281 281 self.id = None
282 282 self.datatype = None
283 283 self.name = None
284 284 self.inputId = None
285 285 self.operations = []
286 286 self.parameters = {}
287
287
288 288 def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='',
289 289 startTime='', endTime='', server=None, **kwargs):
290
290
291 291 if datatype == None and name == None:
292 292 raise ValueError('datatype or name should be defined')
293 293 if name == None:
294 294 if 'Reader' in datatype:
295 295 name = datatype
296 296 datatype = name.replace('Reader','')
297 297 else:
298 298 name = '{}Reader'.format(datatype)
299 299 if datatype == None:
300 300 if 'Reader' in name:
301 301 datatype = name.replace('Reader','')
302 302 else:
303 303 datatype = name
304 304 name = '{}Reader'.format(name)
305 305
306 306 self.id = id
307 307 self.project_id = project_id
308 308 self.name = name
309 309 self.datatype = datatype
310 self.err_queue = err_queue
311
310 self.err_queue = err_queue
311
312 312 self.addParameter(name='path', value=path)
313 313 self.addParameter(name='startDate', value=startDate)
314 314 self.addParameter(name='endDate', value=endDate)
315 315 self.addParameter(name='startTime', value=startTime)
316 316 self.addParameter(name='endTime', value=endTime)
317 317
318 318 for key, value in kwargs.items():
319 319 self.addParameter(name=key, value=value)
320 320
321 321
322 322 class Project(Process):
323 323 """API to create signal chain projects"""
324 324
325 325 ELEMENTNAME = 'Project'
326 326
327 327 def __init__(self, name=''):
328 328
329 329 Process.__init__(self)
330 330 self.id = '1'
331 331 if name:
332 332 self.name = '{} ({})'.format(Process.__name__, name)
333 333 self.filename = None
334 334 self.description = None
335 335 self.email = None
336 336 self.alarm = []
337 337 self.configurations = {}
338 338 # self.err_queue = Queue()
339 339 self.err_queue = None
340 340 self.started = False
341 341
342 342 def getNewId(self):
343 343
344 344 idList = list(self.configurations.keys())
345 345 id = int(self.id) * 10
346 346
347 347 while True:
348 348 id += 1
349 349
350 350 if str(id) in idList:
351 351 continue
352 352
353 353 break
354 354
355 355 return str(id)
356 356
357 357 def updateId(self, new_id):
358 358
359 359 self.id = str(new_id)
360 360
361 361 keyList = list(self.configurations.keys())
362 362 keyList.sort()
363 363
364 364 n = 1
365 365 new_confs = {}
366 366
367 367 for procKey in keyList:
368 368
369 369 conf = self.configurations[procKey]
370 370 idProcUnit = str(int(self.id) * 10 + n)
371 371 conf.updateId(idProcUnit)
372 372 new_confs[idProcUnit] = conf
373 373 n += 1
374 374
375 375 self.configurations = new_confs
376 376
377 377 def setup(self, id=1, name='', description='', email=None, alarm=[]):
378 378
379 379 self.id = str(id)
380 self.description = description
380 self.description = description
381 381 self.email = email
382 382 self.alarm = alarm
383 383 if name:
384 384 self.name = '{} ({})'.format(Process.__name__, name)
385 385
386 386 def update(self, **kwargs):
387 387
388 388 for key, value in kwargs.items():
389 389 setattr(self, key, value)
390 390
391 391 def clone(self):
392 392
393 393 p = Project()
394 394 p.id = self.id
395 395 p.name = self.name
396 396 p.description = self.description
397 397 p.configurations = self.configurations.copy()
398 398
399 399 return p
400 400
401 401 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
402 402
403 403 '''
404 404 '''
405 405
406 406 if id is None:
407 407 idReadUnit = self.getNewId()
408 408 else:
409 409 idReadUnit = str(id)
410 410
411 411 conf = ReadUnitConf()
412 412 conf.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs)
413 413 self.configurations[conf.id] = conf
414
414
415 415 return conf
416 416
417 417 def addProcUnit(self, id=None, inputId='0', datatype=None, name=None):
418 418
419 419 '''
420 420 '''
421 421
422 422 if id is None:
423 423 idProcUnit = self.getNewId()
424 424 else:
425 425 idProcUnit = id
426
426
427 427 conf = ProcUnitConf()
428 428 conf.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue)
429 429 self.configurations[conf.id] = conf
430 430
431 431 return conf
432 432
433 433 def removeProcUnit(self, id):
434 434
435 435 if id in self.configurations:
436 436 self.configurations.pop(id)
437 437
438 438 def getReadUnit(self):
439 439
440 440 for obj in list(self.configurations.values()):
441 441 if obj.ELEMENTNAME == 'ReadUnit':
442 442 return obj
443 443
444 444 return None
445 445
446 446 def getProcUnit(self, id):
447 447
448 448 return self.configurations[id]
449 449
450 450 def getUnits(self):
451 451
452 452 keys = list(self.configurations)
453 453 keys.sort()
454 454
455 455 for key in keys:
456 456 yield self.configurations[key]
457 457
458 458 def updateUnit(self, id, **kwargs):
459 459
460 460 conf = self.configurations[id].update(**kwargs)
461
461
462 462 def makeXml(self):
463 463
464 464 xml = Element('Project')
465 465 xml.set('id', str(self.id))
466 466 xml.set('name', self.name)
467 467 xml.set('description', self.description)
468 468
469 469 for conf in self.configurations.values():
470 470 conf.makeXml(xml)
471 471
472 472 self.xml = xml
473 473
474 474 def writeXml(self, filename=None):
475 475
476 476 if filename == None:
477 477 if self.filename:
478 478 filename = self.filename
479 479 else:
480 480 filename = 'schain.xml'
481 481
482 482 if not filename:
483 483 print('filename has not been defined. Use setFilename(filename) for do it.')
484 484 return 0
485 485
486 486 abs_file = os.path.abspath(filename)
487 487
488 488 if not os.access(os.path.dirname(abs_file), os.W_OK):
489 489 print('No write permission on %s' % os.path.dirname(abs_file))
490 490 return 0
491 491
492 492 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
493 493 print('File %s already exists and it could not be overwriten' % abs_file)
494 494 return 0
495 495
496 496 self.makeXml()
497 497
498 498 ElementTree(self.xml).write(abs_file, method='xml')
499 499
500 500 self.filename = abs_file
501 501
502 502 return 1
503 503
504 504 def readXml(self, filename):
505 505
506 506 abs_file = os.path.abspath(filename)
507 507
508 508 self.configurations = {}
509 509
510 510 try:
511 511 self.xml = ElementTree().parse(abs_file)
512 512 except:
513 513 log.error('Error reading %s, verify file format' % filename)
514 514 return 0
515 515
516 516 self.id = self.xml.get('id')
517 517 self.name = self.xml.get('name')
518 518 self.description = self.xml.get('description')
519 519
520 520 for element in self.xml:
521 521 if element.tag == 'ReadUnit':
522 522 conf = ReadUnitConf()
523 523 conf.readXml(element, self.id, self.err_queue)
524 524 self.configurations[conf.id] = conf
525 525 elif element.tag == 'ProcUnit':
526 526 conf = ProcUnitConf()
527 527 input_proc = self.configurations[element.get('inputId')]
528 528 conf.readXml(element, self.id, self.err_queue)
529 529 self.configurations[conf.id] = conf
530 530
531 531 self.filename = abs_file
532
532
533 533 return 1
534 534
535 535 def __str__(self):
536 536
537 537 text = '\nProject[id=%s, name=%s, description=%s]\n\n' % (
538 538 self.id,
539 539 self.name,
540 540 self.description,
541 541 )
542 542
543 543 for conf in self.configurations.values():
544 544 text += '{}'.format(conf)
545 545
546 546 return text
547 547
548 548 def createObjects(self):
549 549
550 550 keys = list(self.configurations.keys())
551 551 keys.sort()
552 552 for key in keys:
553 553 conf = self.configurations[key]
554 554 conf.createObjects()
555 555 if conf.inputId is not None:
556 conf.object.setInput(self.configurations[conf.inputId].object)
556 if isinstance(conf.inputId, list):
557 conf.object.setInput([self.configurations[x].object for x in conf.inputId])
558 else:
559 conf.object.setInput([self.configurations[conf.inputId].object])
557 560
558 561 def monitor(self):
559 562
560 563 t = Thread(target=self._monitor, args=(self.err_queue, self.ctx))
561 564 t.start()
562
565
563 566 def _monitor(self, queue, ctx):
564 567
565 568 import socket
566
569
567 570 procs = 0
568 571 err_msg = ''
569
572
570 573 while True:
571 574 msg = queue.get()
572 575 if '#_start_#' in msg:
573 576 procs += 1
574 577 elif '#_end_#' in msg:
575 578 procs -=1
576 579 else:
577 580 err_msg = msg
578
579 if procs == 0 or 'Traceback' in err_msg:
581
582 if procs == 0 or 'Traceback' in err_msg:
580 583 break
581 584 time.sleep(0.1)
582
585
583 586 if '|' in err_msg:
584 587 name, err = err_msg.split('|')
585 588 if 'SchainWarning' in err:
586 589 log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), name)
587 590 elif 'SchainError' in err:
588 591 log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), name)
589 592 else:
590 593 log.error(err, name)
591 else:
594 else:
592 595 name, err = self.name, err_msg
593
596
594 597 time.sleep(1)
595
598
596 599 ctx.term()
597 600
598 601 message = ''.join(err)
599 602
600 603 if err_msg:
601 604 subject = 'SChain v%s: Error running %s\n' % (
602 605 schainpy.__version__, self.name)
603 606
604 607 subtitle = 'Hostname: %s\n' % socket.gethostbyname(
605 608 socket.gethostname())
606 609 subtitle += 'Working directory: %s\n' % os.path.abspath('./')
607 610 subtitle += 'Configuration file: %s\n' % self.filename
608 611 subtitle += 'Time: %s\n' % str(datetime.datetime.now())
609 612
610 613 readUnitConfObj = self.getReadUnit()
611 614 if readUnitConfObj:
612 615 subtitle += '\nInput parameters:\n'
613 616 subtitle += '[Data path = %s]\n' % readUnitConfObj.parameters['path']
614 617 subtitle += '[Start date = %s]\n' % readUnitConfObj.parameters['startDate']
615 618 subtitle += '[End date = %s]\n' % readUnitConfObj.parameters['endDate']
616 619 subtitle += '[Start time = %s]\n' % readUnitConfObj.parameters['startTime']
617 620 subtitle += '[End time = %s]\n' % readUnitConfObj.parameters['endTime']
618 621
619 622 a = Alarm(
620 modes=self.alarm,
623 modes=self.alarm,
621 624 email=self.email,
622 625 message=message,
623 626 subject=subject,
624 627 subtitle=subtitle,
625 628 filename=self.filename
626 629 )
627 630
628 631 a.start()
629 632
630 633 def setFilename(self, filename):
631 634
632 635 self.filename = filename
633 636
634 637 def runProcs(self):
635 638
636 639 err = False
637 640 n = len(self.configurations)
638
639 641 while not err:
640 642 for conf in self.getUnits():
641 ok = conf.run()
643 ok = conf.run()
642 644 if ok == 'Error':
643 645 n -= 1
644 646 continue
645 647 elif not ok:
646 648 break
647 649 if n == 0:
648 650 err = True
649
651
650 652 def run(self):
651 653
652 654 log.success('\nStarting Project {} [id={}]'.format(self.name, self.id), tag='')
653 655 self.started = True
654 self.start_time = time.time()
656 self.start_time = time.time()
655 657 self.createObjects()
656 658 self.runProcs()
657 659 log.success('{} Done (Time: {:4.2f}s)'.format(
658 660 self.name,
659 time.time()-self.start_time), '')
661 time.time()-self.start_time), '') No newline at end of file
General Comments 0
You need to be logged in to leave comments. Login now