##// END OF EJS Templates
Add handler to terminate child process
Juan C. Espinoza -
r1533:37c6d1c7452d
parent child
Show More
@@ -1,665 +1,676
1 1 # Copyright (c) 2012-2020 Jicamarca Radio Observatory
2 2 # All rights reserved.
3 3 #
4 4 # Distributed under the terms of the BSD 3-clause license.
5 5 """API to create signal chain projects
6 6
7 7 The API is provide through class: Project
8 8 """
9 9
10 10 import re
11 11 import sys
12 12 import ast
13 13 import datetime
14 14 import traceback
15 15 import time
16 16 import multiprocessing
17 from multiprocessing import Process, Queue
17 import signal as sig
18 from multiprocessing import Process, Queue, active_children
18 19 from threading import Thread
19 20 from xml.etree.ElementTree import ElementTree, Element, SubElement
20 21
21 22 from schainpy.admin import Alarm, SchainWarning
22 23 from schainpy.model import *
23 24 from schainpy.utils import log
24 25
25 26 if 'darwin' in sys.platform and sys.version_info[0] == 3 and sys.version_info[1] > 7:
26 27 multiprocessing.set_start_method('fork')
27 28
29 def handler(sig, frame):
30 # get all active child processes
31 active = active_children()
32 # terminate all active children
33 for child in active:
34 child.terminate()
35 # terminate the process
36 sys.exit(0)
37
28 38 class ConfBase():
29 39
30 40 def __init__(self):
31 41
32 42 self.id = '0'
33 43 self.name = None
34 44 self.priority = None
35 45 self.parameters = {}
36 46 self.object = None
37 47 self.operations = []
38 48
39 49 def getId(self):
40 50
41 51 return self.id
42 52
43 53 def getNewId(self):
44 54
45 55 return int(self.id) * 10 + len(self.operations) + 1
46 56
47 57 def updateId(self, new_id):
48 58
49 59 self.id = str(new_id)
50 60
51 61 n = 1
52 62 for conf in self.operations:
53 63 conf_id = str(int(new_id) * 10 + n)
54 64 conf.updateId(conf_id)
55 65 n += 1
56 66
57 67 def getKwargs(self):
58 68
59 69 params = {}
60 70
61 71 for key, value in self.parameters.items():
62 72 if value not in (None, '', ' '):
63 73 params[key] = value
64 74
65 75 return params
66 76
67 77 def update(self, **kwargs):
68 78
69 79 if 'format' not in kwargs:
70 80 kwargs['format'] = None
71 81 for key, value, fmt in kwargs.items():
72 82 self.addParameter(name=key, value=value, format=fmt)
73 83
74 84 def addParameter(self, name, value, format=None):
75 85 '''
76 86 '''
77 87 if format is not None:
78 88 self.parameters[name] = eval(format)(value)
79 89 elif isinstance(value, str) and re.search(r'(\d+/\d+/\d+)', value):
80 90 self.parameters[name] = datetime.date(*[int(x) for x in value.split('/')])
81 91 elif isinstance(value, str) and re.search(r'(\d+:\d+:\d+)', value):
82 92 self.parameters[name] = datetime.time(*[int(x) for x in value.split(':')])
83 93 else:
84 94 try:
85 95 self.parameters[name] = ast.literal_eval(value)
86 96 except:
87 97 if isinstance(value, str) and ',' in value:
88 98 self.parameters[name] = value.split(',')
89 99 else:
90 100 self.parameters[name] = value
91 101
92 102 def getParameters(self):
93 103
94 104 params = {}
95 105 for key, value in self.parameters.items():
96 106 s = type(value).__name__
97 107 if s == 'date':
98 108 params[key] = value.strftime('%Y/%m/%d')
99 109 elif s == 'time':
100 110 params[key] = value.strftime('%H:%M:%S')
101 111 else:
102 112 params[key] = str(value)
103 113
104 114 return params
105 115
106 116 def makeXml(self, element):
107 117
108 118 xml = SubElement(element, self.ELEMENTNAME)
109 119 for label in self.xml_labels:
110 120 xml.set(label, str(getattr(self, label)))
111 121
112 122 for key, value in self.getParameters().items():
113 123 xml_param = SubElement(xml, 'Parameter')
114 124 xml_param.set('name', key)
115 125 xml_param.set('value', value)
116 126
117 127 for conf in self.operations:
118 128 conf.makeXml(xml)
119 129
120 130 def __str__(self):
121 131
122 132 if self.ELEMENTNAME == 'Operation':
123 133 s = ' {}[id={}]\n'.format(self.name, self.id)
124 134 else:
125 135 s = '{}[id={}, inputId={}]\n'.format(self.name, self.id, self.inputId)
126 136
127 137 for key, value in self.parameters.items():
128 138 if self.ELEMENTNAME == 'Operation':
129 139 s += ' {}: {}\n'.format(key, value)
130 140 else:
131 141 s += ' {}: {}\n'.format(key, value)
132 142
133 143 for conf in self.operations:
134 144 s += str(conf)
135 145
136 146 return s
137 147
138 148 class OperationConf(ConfBase):
139 149
140 150 ELEMENTNAME = 'Operation'
141 151 xml_labels = ['id', 'name']
142 152
143 153 def setup(self, id, name, priority, project_id, err_queue):
144 154
145 155 self.id = str(id)
146 156 self.project_id = project_id
147 157 self.name = name
148 158 self.type = 'other'
149 159 self.err_queue = err_queue
150 160
151 161 def readXml(self, element, project_id, err_queue):
152 162
153 163 self.id = element.get('id')
154 164 self.name = element.get('name')
155 165 self.type = 'other'
156 166 self.project_id = str(project_id)
157 167 self.err_queue = err_queue
158 168
159 169 for elm in element.iter('Parameter'):
160 170 self.addParameter(elm.get('name'), elm.get('value'))
161 171
162 172 def createObject(self):
163 173
164 174 className = eval(self.name)
165 175
166 176 if 'Plot' in self.name or 'Writer' in self.name or 'Send' in self.name or 'print' in self.name:
167 177 kwargs = self.getKwargs()
168 178 opObj = className(self.id, self.id, self.project_id, self.err_queue, **kwargs)
169 179 opObj.start()
170 180 self.type = 'external'
171 181 else:
172 182 opObj = className()
173 183
174 184 self.object = opObj
175 185 return opObj
176 186
177 187 class ProcUnitConf(ConfBase):
178 188
179 189 ELEMENTNAME = 'ProcUnit'
180 190 xml_labels = ['id', 'inputId', 'name']
181 191
182 192 def setup(self, project_id, id, name, datatype, inputId, err_queue):
183 193 '''
184 194 '''
185 195
186 196 if datatype == None and name == None:
187 197 raise ValueError('datatype or name should be defined')
188 198
189 199 if name == None:
190 200 if 'Proc' in datatype:
191 201 name = datatype
192 202 else:
193 203 name = '%sProc' % (datatype)
194 204
195 205 if datatype == None:
196 206 datatype = name.replace('Proc', '')
197 207
198 208 self.id = str(id)
199 209 self.project_id = project_id
200 210 self.name = name
201 211 self.datatype = datatype
202 212 self.inputId = inputId
203 213 self.err_queue = err_queue
204 214 self.operations = []
205 215 self.parameters = {}
206 216
207 217 def removeOperation(self, id):
208 218
209 219 i = [1 if x.id==id else 0 for x in self.operations]
210 220 self.operations.pop(i.index(1))
211 221
212 222 def getOperation(self, id):
213 223
214 224 for conf in self.operations:
215 225 if conf.id == id:
216 226 return conf
217 227
218 228 def addOperation(self, name, optype='self'):
219 229 '''
220 230 '''
221 231
222 232 id = self.getNewId()
223 233 conf = OperationConf()
224 234 conf.setup(id, name=name, priority='0', project_id=self.project_id, err_queue=self.err_queue)
225 235 self.operations.append(conf)
226 236
227 237 return conf
228 238
229 239 def readXml(self, element, project_id, err_queue):
230 240
231 241 self.id = element.get('id')
232 242 self.name = element.get('name')
233 243 self.inputId = None if element.get('inputId') == 'None' else element.get('inputId')
234 244 self.datatype = element.get('datatype', self.name.replace(self.ELEMENTNAME.replace('Unit', ''), ''))
235 245 self.project_id = str(project_id)
236 246 self.err_queue = err_queue
237 247 self.operations = []
238 248 self.parameters = {}
239 249
240 250 for elm in element:
241 251 if elm.tag == 'Parameter':
242 252 self.addParameter(elm.get('name'), elm.get('value'))
243 253 elif elm.tag == 'Operation':
244 254 conf = OperationConf()
245 255 conf.readXml(elm, project_id, err_queue)
246 256 self.operations.append(conf)
247 257
248 258 def createObjects(self):
249 259 '''
250 260 Instancia de unidades de procesamiento.
251 261 '''
252 262
253 263 className = eval(self.name)
254 264 kwargs = self.getKwargs()
255 265 procUnitObj = className()
256 266 procUnitObj.name = self.name
257 267 log.success('creating process...', self.name)
258 268
259 269 for conf in self.operations:
260 270
261 271 opObj = conf.createObject()
262 272
263 273 log.success('adding operation: {}, type:{}'.format(
264 274 conf.name,
265 275 conf.type), self.name)
266 276
267 277 procUnitObj.addOperation(conf, opObj)
268 278
269 279 self.object = procUnitObj
270 280
271 281 def run(self):
272 282 '''
273 283 '''
274 284
275 285 return self.object.call(**self.getKwargs())
276 286
277 287
278 288 class ReadUnitConf(ProcUnitConf):
279 289
280 290 ELEMENTNAME = 'ReadUnit'
281 291
282 292 def __init__(self):
283 293
284 294 self.id = None
285 295 self.datatype = None
286 296 self.name = None
287 297 self.inputId = None
288 298 self.operations = []
289 299 self.parameters = {}
290 300
291 301 def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='',
292 302 startTime='', endTime='', server=None, **kwargs):
293 303
294 304 if datatype == None and name == None:
295 305 raise ValueError('datatype or name should be defined')
296 306 if name == None:
297 307 if 'Reader' in datatype:
298 308 name = datatype
299 309 datatype = name.replace('Reader','')
300 310 else:
301 311 name = '{}Reader'.format(datatype)
302 312 if datatype == None:
303 313 if 'Reader' in name:
304 314 datatype = name.replace('Reader','')
305 315 else:
306 316 datatype = name
307 317 name = '{}Reader'.format(name)
308 318
309 319 self.id = id
310 320 self.project_id = project_id
311 321 self.name = name
312 322 self.datatype = datatype
313 323 self.err_queue = err_queue
314 324
315 325 self.addParameter(name='path', value=path, format='str')
316 326 self.addParameter(name='startDate', value=startDate)
317 327 self.addParameter(name='endDate', value=endDate)
318 328 self.addParameter(name='startTime', value=startTime)
319 329 self.addParameter(name='endTime', value=endTime)
320 330
321 331 for key, value in kwargs.items():
322 332 self.addParameter(name=key, value=value)
323 333
324 334
325 335 class Project(Process):
326 336 """API to create signal chain projects"""
327 337
328 338 ELEMENTNAME = 'Project'
329 339
330 340 def __init__(self, name=''):
331 341
332 342 Process.__init__(self)
333 343 self.id = '1'
334 344 if name:
335 345 self.name = '{} ({})'.format(Process.__name__, name)
336 346 self.filename = None
337 347 self.description = None
338 348 self.email = None
339 349 self.alarm = []
340 350 self.configurations = {}
341 351 # self.err_queue = Queue()
342 352 self.err_queue = None
343 353 self.started = False
344 354
345 355 def getNewId(self):
346 356
347 357 idList = list(self.configurations.keys())
348 358 id = int(self.id) * 10
349 359
350 360 while True:
351 361 id += 1
352 362
353 363 if str(id) in idList:
354 364 continue
355 365
356 366 break
357 367
358 368 return str(id)
359 369
360 370 def updateId(self, new_id):
361 371
362 372 self.id = str(new_id)
363 373
364 374 keyList = list(self.configurations.keys())
365 375 keyList.sort()
366 376
367 377 n = 1
368 378 new_confs = {}
369 379
370 380 for procKey in keyList:
371 381
372 382 conf = self.configurations[procKey]
373 383 idProcUnit = str(int(self.id) * 10 + n)
374 384 conf.updateId(idProcUnit)
375 385 new_confs[idProcUnit] = conf
376 386 n += 1
377 387
378 388 self.configurations = new_confs
379 389
380 390 def setup(self, id=1, name='', description='', email=None, alarm=[]):
381 391
382 392 self.id = str(id)
383 393 self.description = description
384 394 self.email = email
385 395 self.alarm = alarm
386 396 if name:
387 397 self.name = '{} ({})'.format(Process.__name__, name)
388 398
389 399 def update(self, **kwargs):
390 400
391 401 for key, value in kwargs.items():
392 402 setattr(self, key, value)
393 403
394 404 def clone(self):
395 405
396 406 p = Project()
397 407 p.id = self.id
398 408 p.name = self.name
399 409 p.description = self.description
400 410 p.configurations = self.configurations.copy()
401 411
402 412 return p
403 413
404 414 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
405 415
406 416 '''
407 417 '''
408 418
409 419 if id is None:
410 420 idReadUnit = self.getNewId()
411 421 else:
412 422 idReadUnit = str(id)
413 423
414 424 conf = ReadUnitConf()
415 425 conf.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs)
416 426 self.configurations[conf.id] = conf
417 427
418 428 return conf
419 429
420 430 def addProcUnit(self, id=None, inputId='0', datatype=None, name=None):
421 431
422 432 '''
423 433 '''
424 434
425 435 if id is None:
426 436 idProcUnit = self.getNewId()
427 437 else:
428 438 idProcUnit = id
429 439
430 440 conf = ProcUnitConf()
431 441 conf.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue)
432 442 self.configurations[conf.id] = conf
433 443
434 444 return conf
435 445
436 446 def removeProcUnit(self, id):
437 447
438 448 if id in self.configurations:
439 449 self.configurations.pop(id)
440 450
441 451 def getReadUnit(self):
442 452
443 453 for obj in list(self.configurations.values()):
444 454 if obj.ELEMENTNAME == 'ReadUnit':
445 455 return obj
446 456
447 457 return None
448 458
449 459 def getProcUnit(self, id):
450 460
451 461 return self.configurations[id]
452 462
453 463 def getUnits(self):
454 464
455 465 keys = list(self.configurations)
456 466 keys.sort()
457 467
458 468 for key in keys:
459 469 yield self.configurations[key]
460 470
461 471 def updateUnit(self, id, **kwargs):
462 472
463 473 conf = self.configurations[id].update(**kwargs)
464 474
465 475 def makeXml(self):
466 476
467 477 xml = Element('Project')
468 478 xml.set('id', str(self.id))
469 479 xml.set('name', self.name)
470 480 xml.set('description', self.description)
471 481
472 482 for conf in self.configurations.values():
473 483 conf.makeXml(xml)
474 484
475 485 self.xml = xml
476 486
477 487 def writeXml(self, filename=None):
478 488
479 489 if filename == None:
480 490 if self.filename:
481 491 filename = self.filename
482 492 else:
483 493 filename = 'schain.xml'
484 494
485 495 if not filename:
486 496 print('filename has not been defined. Use setFilename(filename) for do it.')
487 497 return 0
488 498
489 499 abs_file = os.path.abspath(filename)
490 500
491 501 if not os.access(os.path.dirname(abs_file), os.W_OK):
492 502 print('No write permission on %s' % os.path.dirname(abs_file))
493 503 return 0
494 504
495 505 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
496 506 print('File %s already exists and it could not be overwriten' % abs_file)
497 507 return 0
498 508
499 509 self.makeXml()
500 510
501 511 ElementTree(self.xml).write(abs_file, method='xml')
502 512
503 513 self.filename = abs_file
504 514
505 515 return 1
506 516
507 517 def readXml(self, filename):
508 518
509 519 abs_file = os.path.abspath(filename)
510 520
511 521 self.configurations = {}
512 522
513 523 try:
514 524 self.xml = ElementTree().parse(abs_file)
515 525 except:
516 526 log.error('Error reading %s, verify file format' % filename)
517 527 return 0
518 528
519 529 self.id = self.xml.get('id')
520 530 self.name = self.xml.get('name')
521 531 self.description = self.xml.get('description')
522 532
523 533 for element in self.xml:
524 534 if element.tag == 'ReadUnit':
525 535 conf = ReadUnitConf()
526 536 conf.readXml(element, self.id, self.err_queue)
527 537 self.configurations[conf.id] = conf
528 538 elif element.tag == 'ProcUnit':
529 539 conf = ProcUnitConf()
530 540 input_proc = self.configurations[element.get('inputId')]
531 541 conf.readXml(element, self.id, self.err_queue)
532 542 self.configurations[conf.id] = conf
533 543
534 544 self.filename = abs_file
535 545
536 546 return 1
537 547
538 548 def __str__(self):
539 549
540 550 text = '\nProject[id=%s, name=%s, description=%s]\n\n' % (
541 551 self.id,
542 552 self.name,
543 553 self.description,
544 554 )
545 555
546 556 for conf in self.configurations.values():
547 557 text += '{}'.format(conf)
548 558
549 559 return text
550 560
551 561 def createObjects(self):
552 562
553 563 keys = list(self.configurations.keys())
554 564 keys.sort()
555 565 for key in keys:
556 566 conf = self.configurations[key]
557 567 conf.createObjects()
558 568 if conf.inputId is not None:
559 569 if isinstance(conf.inputId, list):
560 570 conf.object.setInput([self.configurations[x].object for x in conf.inputId])
561 571 else:
562 572 conf.object.setInput([self.configurations[conf.inputId].object])
563 573
564 574 def monitor(self):
565 575
566 576 t = Thread(target=self._monitor, args=(self.err_queue, self.ctx))
567 577 t.start()
568 578
569 579 def _monitor(self, queue, ctx):
570 580
571 581 import socket
572 582
573 583 procs = 0
574 584 err_msg = ''
575 585
576 586 while True:
577 587 msg = queue.get()
578 588 if '#_start_#' in msg:
579 589 procs += 1
580 590 elif '#_end_#' in msg:
581 591 procs -=1
582 592 else:
583 593 err_msg = msg
584 594
585 595 if procs == 0 or 'Traceback' in err_msg:
586 596 break
587 597 time.sleep(0.1)
588 598
589 599 if '|' in err_msg:
590 600 name, err = err_msg.split('|')
591 601 if 'SchainWarning' in err:
592 602 log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), name)
593 603 elif 'SchainError' in err:
594 604 log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), name)
595 605 else:
596 606 log.error(err, name)
597 607 else:
598 608 name, err = self.name, err_msg
599 609
600 610 time.sleep(1)
601 611
602 612 ctx.term()
603 613
604 614 message = ''.join(err)
605 615
606 616 if err_msg:
607 617 subject = 'SChain v%s: Error running %s\n' % (
608 618 schainpy.__version__, self.name)
609 619
610 620 subtitle = 'Hostname: %s\n' % socket.gethostbyname(
611 621 socket.gethostname())
612 622 subtitle += 'Working directory: %s\n' % os.path.abspath('./')
613 623 subtitle += 'Configuration file: %s\n' % self.filename
614 624 subtitle += 'Time: %s\n' % str(datetime.datetime.now())
615 625
616 626 readUnitConfObj = self.getReadUnit()
617 627 if readUnitConfObj:
618 628 subtitle += '\nInput parameters:\n'
619 629 subtitle += '[Data path = %s]\n' % readUnitConfObj.parameters['path']
620 630 subtitle += '[Start date = %s]\n' % readUnitConfObj.parameters['startDate']
621 631 subtitle += '[End date = %s]\n' % readUnitConfObj.parameters['endDate']
622 632 subtitle += '[Start time = %s]\n' % readUnitConfObj.parameters['startTime']
623 633 subtitle += '[End time = %s]\n' % readUnitConfObj.parameters['endTime']
624 634
625 635 a = Alarm(
626 636 modes=self.alarm,
627 637 email=self.email,
628 638 message=message,
629 639 subject=subject,
630 640 subtitle=subtitle,
631 641 filename=self.filename
632 642 )
633 643
634 644 a.start()
635 645
636 646 def setFilename(self, filename):
637 647
638 648 self.filename = filename
639 649
640 650 def runProcs(self):
641 651
642 652 err = False
643 653 n = len(self.configurations)
644 654
645 655 while not err:
646 656 for conf in self.getUnits():
647 657 ok = conf.run()
648 658 if ok == 'Error':
649 659 n -= 1
650 660 continue
651 661 elif not ok:
652 662 break
653 663 if n == 0:
654 664 err = True
655 665
656 666 def run(self):
657 667
658 668 log.success('\nStarting Project {} [id={}]'.format(self.name, self.id), tag='')
659 669 self.started = True
660 670 self.start_time = time.time()
661 671 self.createObjects()
672 sig.signal(sig.SIGTERM, handler)
662 673 self.runProcs()
663 674 log.success('{} Done (Time: {:4.2f}s)'.format(
664 675 self.name,
665 676 time.time()-self.start_time), '')
General Comments 0
You need to be logged in to leave comments. Login now