##// END OF EJS Templates
Fix external operation for Senders
Juan C. Espinoza -
r1298:c763f649a942
parent child
Show More
@@ -1,648 +1,648
1 1 '''
2 2 Main routines to create a Signal Chain project
3 3 '''
4 4
5 5 import re
6 6 import sys
7 7 import ast
8 8 import datetime
9 9 import traceback
10 10 import time
11 11 from multiprocessing import Process, Queue
12 12 from threading import Thread
13 13 from xml.etree.ElementTree import ElementTree, Element, SubElement
14 14
15 15 from schainpy.admin import Alarm, SchainWarning
16 16 from schainpy.model import *
17 17 from schainpy.utils import log
18 18
19 19
20 20 class ConfBase():
21 21
22 22 def __init__(self):
23 23
24 24 self.id = '0'
25 25 self.name = None
26 26 self.priority = None
27 27 self.parameters = {}
28 28 self.object = None
29 29 self.operations = []
30 30
31 31 def getId(self):
32 32
33 33 return self.id
34 34
35 35 def getNewId(self):
36 36
37 37 return int(self.id) * 10 + len(self.operations) + 1
38 38
39 39 def updateId(self, new_id):
40 40
41 41 self.id = str(new_id)
42 42
43 43 n = 1
44 44 for conf in self.operations:
45 45 conf_id = str(int(new_id) * 10 + n)
46 46 conf.updateId(conf_id)
47 47 n += 1
48 48
49 49 def getKwargs(self):
50 50
51 51 params = {}
52 52
53 53 for key, value in self.parameters.items():
54 54 if value not in (None, '', ' '):
55 55 params[key] = value
56 56
57 57 return params
58 58
59 59 def update(self, **kwargs):
60 60
61 61 for key, value in kwargs.items():
62 62 self.addParameter(name=key, value=value)
63 63
64 64 def addParameter(self, name, value, format=None):
65 65 '''
66 66 '''
67 67
68 68 if isinstance(value, str) and re.search(r'(\d+/\d+/\d+)', value):
69 69 self.parameters[name] = datetime.date(*[int(x) for x in value.split('/')])
70 70 elif isinstance(value, str) and re.search(r'(\d+:\d+:\d+)', value):
71 71 self.parameters[name] = datetime.time(*[int(x) for x in value.split(':')])
72 72 else:
73 73 try:
74 74 self.parameters[name] = ast.literal_eval(value)
75 75 except:
76 76 if isinstance(value, str) and ',' in value:
77 77 self.parameters[name] = value.split(',')
78 78 else:
79 79 self.parameters[name] = value
80 80
81 81 def getParameters(self):
82 82
83 83 params = {}
84 84 for key, value in self.parameters.items():
85 85 s = type(value).__name__
86 86 if s == 'date':
87 87 params[key] = value.strftime('%Y/%m/%d')
88 88 elif s == 'time':
89 89 params[key] = value.strftime('%H:%M:%S')
90 90 else:
91 91 params[key] = str(value)
92 92
93 93 return params
94 94
95 95 def makeXml(self, element):
96 96
97 97 xml = SubElement(element, self.ELEMENTNAME)
98 98 for label in self.xml_labels:
99 99 xml.set(label, str(getattr(self, label)))
100 100
101 101 for key, value in self.getParameters().items():
102 102 xml_param = SubElement(xml, 'Parameter')
103 103 xml_param.set('name', key)
104 104 xml_param.set('value', value)
105 105
106 106 for conf in self.operations:
107 107 conf.makeXml(xml)
108 108
109 109 def __str__(self):
110 110
111 111 if self.ELEMENTNAME == 'Operation':
112 112 s = ' {}[id={}]\n'.format(self.name, self.id)
113 113 else:
114 114 s = '{}[id={}, inputId={}]\n'.format(self.name, self.id, self.inputId)
115 115
116 116 for key, value in self.parameters.items():
117 117 if self.ELEMENTNAME == 'Operation':
118 118 s += ' {}: {}\n'.format(key, value)
119 119 else:
120 120 s += ' {}: {}\n'.format(key, value)
121 121
122 122 for conf in self.operations:
123 123 s += str(conf)
124 124
125 125 return s
126 126
127 127 class OperationConf(ConfBase):
128 128
129 129 ELEMENTNAME = 'Operation'
130 130 xml_labels = ['id', 'name']
131 131
132 132 def setup(self, id, name, priority, project_id, err_queue):
133 133
134 134 self.id = str(id)
135 135 self.project_id = project_id
136 136 self.name = name
137 137 self.type = 'other'
138 138 self.err_queue = err_queue
139 139
140 140 def readXml(self, element, project_id, err_queue):
141 141
142 142 self.id = element.get('id')
143 143 self.name = element.get('name')
144 144 self.type = 'other'
145 145 self.project_id = str(project_id)
146 146 self.err_queue = err_queue
147 147
148 148 for elm in element.iter('Parameter'):
149 149 self.addParameter(elm.get('name'), elm.get('value'))
150 150
151 151 def createObject(self):
152 152
153 153 className = eval(self.name)
154 154
155 if 'Plot' in self.name or 'Writer' in self.name:
155 if 'Plot' in self.name or 'Writer' in self.name or 'Send' in self.name:
156 156 kwargs = self.getKwargs()
157 157 opObj = className(self.id, self.id, self.project_id, self.err_queue, **kwargs)
158 158 opObj.start()
159 159 self.type = 'external'
160 160 else:
161 161 opObj = className()
162 162
163 163 self.object = opObj
164 164 return opObj
165 165
166 166 class ProcUnitConf(ConfBase):
167 167
168 168 ELEMENTNAME = 'ProcUnit'
169 169 xml_labels = ['id', 'inputId', 'name']
170 170
171 171 def setup(self, project_id, id, name, datatype, inputId, err_queue):
172 172 '''
173 173 '''
174 174
175 175 if datatype == None and name == None:
176 176 raise ValueError('datatype or name should be defined')
177 177
178 178 if name == None:
179 179 if 'Proc' in datatype:
180 180 name = datatype
181 181 else:
182 182 name = '%sProc' % (datatype)
183 183
184 184 if datatype == None:
185 185 datatype = name.replace('Proc', '')
186 186
187 187 self.id = str(id)
188 188 self.project_id = project_id
189 189 self.name = name
190 190 self.datatype = datatype
191 191 self.inputId = inputId
192 192 self.err_queue = err_queue
193 193 self.operations = []
194 194 self.parameters = {}
195 195
196 196 def removeOperation(self, id):
197 197
198 198 i = [1 if x.id==id else 0 for x in self.operations]
199 199 self.operations.pop(i.index(1))
200 200
201 201 def getOperation(self, id):
202 202
203 203 for conf in self.operations:
204 204 if conf.id == id:
205 205 return conf
206 206
207 207 def addOperation(self, name, optype='self'):
208 208 '''
209 209 '''
210 210
211 211 id = self.getNewId()
212 212 conf = OperationConf()
213 213 conf.setup(id, name=name, priority='0', project_id=self.project_id, err_queue=self.err_queue)
214 214 self.operations.append(conf)
215 215
216 216 return conf
217 217
218 218 def readXml(self, element, project_id, err_queue):
219 219
220 220 self.id = element.get('id')
221 221 self.name = element.get('name')
222 222 self.inputId = None if element.get('inputId') == 'None' else element.get('inputId')
223 223 self.datatype = element.get('datatype', self.name.replace(self.ELEMENTNAME.replace('Unit', ''), ''))
224 224 self.project_id = str(project_id)
225 225 self.err_queue = err_queue
226 226 self.operations = []
227 227 self.parameters = {}
228 228
229 229 for elm in element:
230 230 if elm.tag == 'Parameter':
231 231 self.addParameter(elm.get('name'), elm.get('value'))
232 232 elif elm.tag == 'Operation':
233 233 conf = OperationConf()
234 234 conf.readXml(elm, project_id, err_queue)
235 235 self.operations.append(conf)
236 236
237 237 def createObjects(self):
238 238 '''
239 239 Instancia de unidades de procesamiento.
240 240 '''
241 241
242 242 className = eval(self.name)
243 243 kwargs = self.getKwargs()
244 244 procUnitObj = className()
245 245 procUnitObj.name = self.name
246 246 log.success('creating process...', self.name)
247 247
248 248 for conf in self.operations:
249 249
250 250 opObj = conf.createObject()
251 251
252 252 log.success('adding operation: {}, type:{}'.format(
253 253 conf.name,
254 254 conf.type), self.name)
255 255
256 256 procUnitObj.addOperation(conf, opObj)
257 257
258 258 self.object = procUnitObj
259 259
260 260 def run(self):
261 261 '''
262 262 '''
263 263
264 264 return self.object.call(**self.getKwargs())
265 265
266 266
267 267 class ReadUnitConf(ProcUnitConf):
268 268
269 269 ELEMENTNAME = 'ReadUnit'
270 270
271 271 def __init__(self):
272 272
273 273 self.id = None
274 274 self.datatype = None
275 275 self.name = None
276 276 self.inputId = None
277 277 self.operations = []
278 278 self.parameters = {}
279 279
280 280 def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='',
281 281 startTime='', endTime='', server=None, **kwargs):
282 282
283 283 if datatype == None and name == None:
284 284 raise ValueError('datatype or name should be defined')
285 285 if name == None:
286 286 if 'Reader' in datatype:
287 287 name = datatype
288 288 datatype = name.replace('Reader','')
289 289 else:
290 290 name = '{}Reader'.format(datatype)
291 291 if datatype == None:
292 292 if 'Reader' in name:
293 293 datatype = name.replace('Reader','')
294 294 else:
295 295 datatype = name
296 296 name = '{}Reader'.format(name)
297 297
298 298 self.id = id
299 299 self.project_id = project_id
300 300 self.name = name
301 301 self.datatype = datatype
302 302 self.err_queue = err_queue
303 303
304 304 self.addParameter(name='path', value=path)
305 305 self.addParameter(name='startDate', value=startDate)
306 306 self.addParameter(name='endDate', value=endDate)
307 307 self.addParameter(name='startTime', value=startTime)
308 308 self.addParameter(name='endTime', value=endTime)
309 309
310 310 for key, value in kwargs.items():
311 311 self.addParameter(name=key, value=value)
312 312
313 313
314 314 class Project(Process):
315 315
316 316 ELEMENTNAME = 'Project'
317 317
318 318 def __init__(self):
319 319
320 320 Process.__init__(self)
321 321 self.id = None
322 322 self.filename = None
323 323 self.description = None
324 324 self.email = None
325 325 self.alarm = []
326 326 self.configurations = {}
327 327 # self.err_queue = Queue()
328 328 self.err_queue = None
329 329 self.started = False
330 330
331 331 def getNewId(self):
332 332
333 333 idList = list(self.configurations.keys())
334 334 id = int(self.id) * 10
335 335
336 336 while True:
337 337 id += 1
338 338
339 339 if str(id) in idList:
340 340 continue
341 341
342 342 break
343 343
344 344 return str(id)
345 345
346 346 def updateId(self, new_id):
347 347
348 348 self.id = str(new_id)
349 349
350 350 keyList = list(self.configurations.keys())
351 351 keyList.sort()
352 352
353 353 n = 1
354 354 new_confs = {}
355 355
356 356 for procKey in keyList:
357 357
358 358 conf = self.configurations[procKey]
359 359 idProcUnit = str(int(self.id) * 10 + n)
360 360 conf.updateId(idProcUnit)
361 361 new_confs[idProcUnit] = conf
362 362 n += 1
363 363
364 364 self.configurations = new_confs
365 365
366 366 def setup(self, id=1, name='', description='', email=None, alarm=[]):
367 367
368 368 self.id = str(id)
369 369 self.description = description
370 370 self.email = email
371 371 self.alarm = alarm
372 372 if name:
373 373 self.name = '{} ({})'.format(Process.__name__, name)
374 374
375 375 def update(self, **kwargs):
376 376
377 377 for key, value in kwargs.items():
378 378 setattr(self, key, value)
379 379
380 380 def clone(self):
381 381
382 382 p = Project()
383 383 p.id = self.id
384 384 p.name = self.name
385 385 p.description = self.description
386 386 p.configurations = self.configurations.copy()
387 387
388 388 return p
389 389
390 390 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
391 391
392 392 '''
393 393 '''
394 394
395 395 if id is None:
396 396 idReadUnit = self.getNewId()
397 397 else:
398 398 idReadUnit = str(id)
399 399
400 400 conf = ReadUnitConf()
401 401 conf.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs)
402 402 self.configurations[conf.id] = conf
403 403
404 404 return conf
405 405
406 406 def addProcUnit(self, id=None, inputId='0', datatype=None, name=None):
407 407
408 408 '''
409 409 '''
410 410
411 411 if id is None:
412 412 idProcUnit = self.getNewId()
413 413 else:
414 414 idProcUnit = id
415 415
416 416 conf = ProcUnitConf()
417 417 conf.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue)
418 418 self.configurations[conf.id] = conf
419 419
420 420 return conf
421 421
422 422 def removeProcUnit(self, id):
423 423
424 424 if id in self.configurations:
425 425 self.configurations.pop(id)
426 426
427 427 def getReadUnit(self):
428 428
429 429 for obj in list(self.configurations.values()):
430 430 if obj.ELEMENTNAME == 'ReadUnit':
431 431 return obj
432 432
433 433 return None
434 434
435 435 def getProcUnit(self, id):
436 436
437 437 return self.configurations[id]
438 438
439 439 def getUnits(self):
440 440
441 441 keys = list(self.configurations)
442 442 keys.sort()
443 443
444 444 for key in keys:
445 445 yield self.configurations[key]
446 446
447 447 def updateUnit(self, id, **kwargs):
448 448
449 449 conf = self.configurations[id].update(**kwargs)
450 450
451 451 def makeXml(self):
452 452
453 453 xml = Element('Project')
454 454 xml.set('id', str(self.id))
455 455 xml.set('name', self.name)
456 456 xml.set('description', self.description)
457 457
458 458 for conf in self.configurations.values():
459 459 conf.makeXml(xml)
460 460
461 461 self.xml = xml
462 462
463 463 def writeXml(self, filename=None):
464 464
465 465 if filename == None:
466 466 if self.filename:
467 467 filename = self.filename
468 468 else:
469 469 filename = 'schain.xml'
470 470
471 471 if not filename:
472 472 print('filename has not been defined. Use setFilename(filename) for do it.')
473 473 return 0
474 474
475 475 abs_file = os.path.abspath(filename)
476 476
477 477 if not os.access(os.path.dirname(abs_file), os.W_OK):
478 478 print('No write permission on %s' % os.path.dirname(abs_file))
479 479 return 0
480 480
481 481 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
482 482 print('File %s already exists and it could not be overwriten' % abs_file)
483 483 return 0
484 484
485 485 self.makeXml()
486 486
487 487 ElementTree(self.xml).write(abs_file, method='xml')
488 488
489 489 self.filename = abs_file
490 490
491 491 return 1
492 492
493 493 def readXml(self, filename):
494 494
495 495 abs_file = os.path.abspath(filename)
496 496
497 497 self.configurations = {}
498 498
499 499 try:
500 500 self.xml = ElementTree().parse(abs_file)
501 501 except:
502 502 log.error('Error reading %s, verify file format' % filename)
503 503 return 0
504 504
505 505 self.id = self.xml.get('id')
506 506 self.name = self.xml.get('name')
507 507 self.description = self.xml.get('description')
508 508
509 509 for element in self.xml:
510 510 if element.tag == 'ReadUnit':
511 511 conf = ReadUnitConf()
512 512 conf.readXml(element, self.id, self.err_queue)
513 513 self.configurations[conf.id] = conf
514 514 elif element.tag == 'ProcUnit':
515 515 conf = ProcUnitConf()
516 516 input_proc = self.configurations[element.get('inputId')]
517 517 conf.readXml(element, self.id, self.err_queue)
518 518 self.configurations[conf.id] = conf
519 519
520 520 self.filename = abs_file
521 521
522 522 return 1
523 523
524 524 def __str__(self):
525 525
526 526 text = '\nProject[id=%s, name=%s, description=%s]\n\n' % (
527 527 self.id,
528 528 self.name,
529 529 self.description,
530 530 )
531 531
532 532 for conf in self.configurations.values():
533 533 text += '{}'.format(conf)
534 534
535 535 return text
536 536
537 537 def createObjects(self):
538 538
539 539 keys = list(self.configurations.keys())
540 540 keys.sort()
541 541 for key in keys:
542 542 conf = self.configurations[key]
543 543 conf.createObjects()
544 544 if conf.inputId is not None:
545 545 conf.object.setInput(self.configurations[conf.inputId].object)
546 546
547 547 def monitor(self):
548 548
549 549 t = Thread(target=self._monitor, args=(self.err_queue, self.ctx))
550 550 t.start()
551 551
552 552 def _monitor(self, queue, ctx):
553 553
554 554 import socket
555 555
556 556 procs = 0
557 557 err_msg = ''
558 558
559 559 while True:
560 560 msg = queue.get()
561 561 if '#_start_#' in msg:
562 562 procs += 1
563 563 elif '#_end_#' in msg:
564 564 procs -=1
565 565 else:
566 566 err_msg = msg
567 567
568 568 if procs == 0 or 'Traceback' in err_msg:
569 569 break
570 570 time.sleep(0.1)
571 571
572 572 if '|' in err_msg:
573 573 name, err = err_msg.split('|')
574 574 if 'SchainWarning' in err:
575 575 log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), name)
576 576 elif 'SchainError' in err:
577 577 log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), name)
578 578 else:
579 579 log.error(err, name)
580 580 else:
581 581 name, err = self.name, err_msg
582 582
583 583 time.sleep(1)
584 584
585 585 ctx.term()
586 586
587 587 message = ''.join(err)
588 588
589 589 if err_msg:
590 590 subject = 'SChain v%s: Error running %s\n' % (
591 591 schainpy.__version__, self.name)
592 592
593 593 subtitle = 'Hostname: %s\n' % socket.gethostbyname(
594 594 socket.gethostname())
595 595 subtitle += 'Working directory: %s\n' % os.path.abspath('./')
596 596 subtitle += 'Configuration file: %s\n' % self.filename
597 597 subtitle += 'Time: %s\n' % str(datetime.datetime.now())
598 598
599 599 readUnitConfObj = self.getReadUnit()
600 600 if readUnitConfObj:
601 601 subtitle += '\nInput parameters:\n'
602 602 subtitle += '[Data path = %s]\n' % readUnitConfObj.parameters['path']
603 603 subtitle += '[Start date = %s]\n' % readUnitConfObj.parameters['startDate']
604 604 subtitle += '[End date = %s]\n' % readUnitConfObj.parameters['endDate']
605 605 subtitle += '[Start time = %s]\n' % readUnitConfObj.parameters['startTime']
606 606 subtitle += '[End time = %s]\n' % readUnitConfObj.parameters['endTime']
607 607
608 608 a = Alarm(
609 609 modes=self.alarm,
610 610 email=self.email,
611 611 message=message,
612 612 subject=subject,
613 613 subtitle=subtitle,
614 614 filename=self.filename
615 615 )
616 616
617 617 a.start()
618 618
619 619 def setFilename(self, filename):
620 620
621 621 self.filename = filename
622 622
623 623 def runProcs(self):
624 624
625 625 err = False
626 626 n = len(self.configurations)
627 627
628 628 while not err:
629 629 for conf in self.getUnits():
630 630 ok = conf.run()
631 631 if ok is 'Error':
632 632 n -= 1
633 633 continue
634 634 elif not ok:
635 635 break
636 636 if n == 0:
637 637 err = True
638 638
639 639 def run(self):
640 640
641 641 log.success('\nStarting Project {} [id={}]'.format(self.name, self.id), tag='')
642 642 self.started = True
643 643 self.start_time = time.time()
644 644 self.createObjects()
645 645 self.runProcs()
646 646 log.success('{} Done (Time: {:4.2f}s)'.format(
647 647 self.name,
648 648 time.time()-self.start_time), '')
General Comments 0
You need to be logged in to leave comments. Login now