##// END OF EJS Templates
Added ToLilBlock class from Roberto
Christianpl -
r1789:2739006ee497 isr_v2
parent child
Show More

The requested changes are too big and content was truncated. Show full diff

@@ -1,677 +1,705
1 1 # Copyright (c) 2012-2020 Jicamarca Radio Observatory
2 2 # All rights reserved.
3 3 #
4 4 # Distributed under the terms of the BSD 3-clause license.
5 5 """API to create signal chain projects
6 6
7 7 The API is provide through class: Project
8 8 """
9 9
10 10 import re
11 11 import sys
12 12 import ast
13 13 import datetime
14 14 import traceback
15 15 import time
16 16 import multiprocessing
17 17 from multiprocessing import Process, Queue
18 18 from threading import Thread
19 19 from xml.etree.ElementTree import ElementTree, Element, SubElement
20 20
21 21 from schainpy.admin import Alarm, SchainWarning
22 22 from schainpy.model import *
23 23 from schainpy.utils import log
24 24
25 25 if 'darwin' in sys.platform and sys.version_info[0] == 3 and sys.version_info[1] > 7:
26 26 multiprocessing.set_start_method('fork')
27 27
28 28 class ConfBase():
29 29
30 30 def __init__(self):
31 31
32 32 self.id = '0'
33 33 self.name = None
34 34 self.priority = None
35 35 self.parameters = {}
36 36 self.object = None
37 37 self.operations = []
38 38
39 39 def getId(self):
40 40
41 41 return self.id
42 42
43 43 def getNewId(self):
44 44
45 45 return int(self.id) * 10 + len(self.operations) + 1
46 46
47 47 def updateId(self, new_id):
48 48
49 49 self.id = str(new_id)
50 50
51 51 n = 1
52 52 for conf in self.operations:
53 53 conf_id = str(int(new_id) * 10 + n)
54 54 conf.updateId(conf_id)
55 55 n += 1
56 56
57 57 def getKwargs(self):
58 58
59 59 params = {}
60 60
61 61 for key, value in self.parameters.items():
62 62 if value not in (None, '', ' '):
63 63 params[key] = value
64 64
65 65 return params
66 66
67 67 def update(self, **kwargs):
68 68
69 69 for key, value in kwargs.items():
70 70 self.addParameter(name=key, value=value)
71 71
72 72 def addParameter(self, name, value, format=None):
73 73 '''
74 74 '''
75 75
76 76 if format is not None:
77 77 self.parameters[name] = eval(format)(value)
78 78 elif isinstance(value, str) and re.search(r'(\d+/\d+/\d+)', value):
79 79 self.parameters[name] = datetime.date(*[int(x) for x in value.split('/')])
80 80 elif isinstance(value, str) and re.search(r'(\d+:\d+:\d+)', value):
81 81 self.parameters[name] = datetime.time(*[int(x) for x in value.split(':')])
82 82 else:
83 83 try:
84 84 self.parameters[name] = ast.literal_eval(value)
85 85 except:
86 86 if isinstance(value, str) and ',' in value:
87 87 self.parameters[name] = value.split(',')
88 88 else:
89 89 self.parameters[name] = value
90 90
91 91 def getParameters(self):
92 92
93 93 params = {}
94 94 for key, value in self.parameters.items():
95 95 s = type(value).__name__
96 96 if s == 'date':
97 97 params[key] = value.strftime('%Y/%m/%d')
98 98 elif s == 'time':
99 99 params[key] = value.strftime('%H:%M:%S')
100 100 else:
101 101 params[key] = str(value)
102 102
103 103 return params
104 104
105 105 def makeXml(self, element):
106 106
107 107 xml = SubElement(element, self.ELEMENTNAME)
108 108 for label in self.xml_labels:
109 109 xml.set(label, str(getattr(self, label)))
110 110
111 111 for key, value in self.getParameters().items():
112 112 xml_param = SubElement(xml, 'Parameter')
113 113 xml_param.set('name', key)
114 114 xml_param.set('value', value)
115 115
116 116 for conf in self.operations:
117 117 conf.makeXml(xml)
118 118
119 119 def __str__(self):
120 120
121 121 if self.ELEMENTNAME == 'Operation':
122 122 s = ' {}[id={}]\n'.format(self.name, self.id)
123 123 else:
124 124 s = '{}[id={}, inputId={}]\n'.format(self.name, self.id, self.inputId)
125 125
126 126 for key, value in self.parameters.items():
127 127 if self.ELEMENTNAME == 'Operation':
128 128 s += ' {}: {}\n'.format(key, value)
129 129 else:
130 130 s += ' {}: {}\n'.format(key, value)
131 131
132 132 for conf in self.operations:
133 133 s += str(conf)
134 134
135 135 return s
136 136
137 137 class OperationConf(ConfBase):
138 138
139 139 ELEMENTNAME = 'Operation'
140 140 xml_labels = ['id', 'name']
141 141
142 142 def setup(self, id, name, priority, project_id, err_queue):
143 143
144 144 self.id = str(id)
145 145 self.project_id = project_id
146 146 self.name = name
147 147 self.type = 'other'
148 148 self.err_queue = err_queue
149 149
150 150 def readXml(self, element, project_id, err_queue):
151 151
152 152 self.id = element.get('id')
153 153 self.name = element.get('name')
154 154 self.type = 'other'
155 155 self.project_id = str(project_id)
156 156 self.err_queue = err_queue
157 157
158 158 for elm in element.iter('Parameter'):
159 159 self.addParameter(elm.get('name'), elm.get('value'))
160 160
161 161 def createObject(self):
162 162
163 163 className = eval(self.name)
164 164
165 165 if 'Plot' in self.name or 'Writer' in self.name or 'Send' in self.name or 'print' in self.name:
166 166 kwargs = self.getKwargs()
167 167 opObj = className(self.id, self.id, self.project_id, self.err_queue, **kwargs)
168 168 opObj.start()
169 169 self.type = 'external'
170 170 else:
171 171 opObj = className()
172 172
173 173 self.object = opObj
174 174 return opObj
175 175
176 176 class ProcUnitConf(ConfBase):
177 177
178 178 ELEMENTNAME = 'ProcUnit'
179 179 xml_labels = ['id', 'inputId', 'name']
180 180
181 181 def setup(self, project_id, id, name, datatype, inputId, err_queue):
182 182 '''
183 183 '''
184 184
185 185 if datatype == None and name == None:
186 186 raise ValueError('datatype or name should be defined')
187 187
188 188 if name == None:
189 189 if 'Proc' in datatype:
190 190 name = datatype
191 191 else:
192 192 name = '%sProc' % (datatype)
193 193
194 194 if datatype == None:
195 195 datatype = name.replace('Proc', '')
196 196
197 197 self.id = str(id)
198 198 self.project_id = project_id
199 199 self.name = name
200 200 self.datatype = datatype
201 201 self.inputId = inputId
202 202 self.err_queue = err_queue
203 203 self.operations = []
204 204 self.parameters = {}
205 205
206 206 def removeOperation(self, id):
207 207
208 208 i = [1 if x.id == id else 0 for x in self.operations]
209 209 self.operations.pop(i.index(1))
210 210
211 211 def getOperation(self, id):
212 212
213 213 for conf in self.operations:
214 214 if conf.id == id:
215 215 return conf
216 216
217 217 def addOperation(self, name, optype='self'):
218 218 '''
219 219 '''
220 220
221 221 id = self.getNewId()
222 222 conf = OperationConf()
223 223 conf.setup(id, name=name, priority='0', project_id=self.project_id, err_queue=self.err_queue)
224 224 self.operations.append(conf)
225 225
226 226 return conf
227 227
228 228 def readXml(self, element, project_id, err_queue):
229 229
230 230 self.id = element.get('id')
231 231 self.name = element.get('name')
232 232 self.inputId = None if element.get('inputId') == 'None' else element.get('inputId')
233 233 self.datatype = element.get('datatype', self.name.replace(self.ELEMENTNAME.replace('Unit', ''), ''))
234 234 self.project_id = str(project_id)
235 235 self.err_queue = err_queue
236 236 self.operations = []
237 237 self.parameters = {}
238 238
239 239 for elm in element:
240 240 if elm.tag == 'Parameter':
241 241 self.addParameter(elm.get('name'), elm.get('value'))
242 242 elif elm.tag == 'Operation':
243 243 conf = OperationConf()
244 244 conf.readXml(elm, project_id, err_queue)
245 245 self.operations.append(conf)
246 246
247 247 def createObjects(self):
248 248 '''
249 249 Instancia de unidades de procesamiento.
250 250 '''
251 251
252 252 className = eval(self.name)
253 253 kwargs = self.getKwargs()
254 254 procUnitObj = className()
255 255 procUnitObj.name = self.name
256 256 log.success('creating process...', self.name)
257 257
258 258 for conf in self.operations:
259 259
260 260 opObj = conf.createObject()
261 261
262 262 log.success('adding operation: {}, type:{}'.format(
263 263 conf.name,
264 264 conf.type), self.name)
265 265
266 266 procUnitObj.addOperation(conf, opObj)
267 267
268 268 self.object = procUnitObj
269 269
270 270 def run(self):
271 271 '''
272 272 '''
273 273 #self.object.call(**self.getKwargs())
274 274
275 275 return self.object.call(**self.getKwargs())
276 276
277 277
278 278 class ReadUnitConf(ProcUnitConf):
279 279
280 280 ELEMENTNAME = 'ReadUnit'
281 281
282 282 def __init__(self):
283 283
284 284 self.id = None
285 285 self.datatype = None
286 286 self.name = None
287 287 self.inputId = None
288 288 self.operations = []
289 289 self.parameters = {}
290 290
291 291 def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='',
292 292 startTime='', endTime='', server=None, topic='', **kwargs):
293 293
294 294 if datatype == None and name == None:
295 295 raise ValueError('datatype or name should be defined')
296 296 if name == None:
297 297 if 'Reader' in datatype:
298 298 name = datatype
299 299 datatype = name.replace('Reader', '')
300 300 else:
301 301 name = '{}Reader'.format(datatype)
302 302 if datatype == None:
303 303 if 'Reader' in name:
304 304 datatype = name.replace('Reader', '')
305 305 else:
306 306 datatype = name
307 307 name = '{}Reader'.format(name)
308 308
309 309 self.id = id
310 310 self.project_id = project_id
311 311 self.name = name
312 312 self.datatype = datatype
313 313 self.err_queue = err_queue
314 314
315 315 self.addParameter(name='path', value=path)
316 316 self.addParameter(name='startDate', value=startDate)
317 317 self.addParameter(name='endDate', value=endDate)
318 318 self.addParameter(name='startTime', value=startTime)
319 319 self.addParameter(name='endTime', value=endTime)
320 320 self.addParameter(name='server', value=server)
321 321 self.addParameter(name='topic', value=topic)
322 322
323 323 for key, value in kwargs.items():
324 324 self.addParameter(name=key, value=value)
325 325
326 326
327 327 class Project(Process):
328 328 """API to create signal chain projects"""
329 329
330 330 ELEMENTNAME = 'Project'
331 331
332 332 def __init__(self, name=''):
333 333
334 334 Process.__init__(self)
335 335 self.id = '1'
336 336 if name:
337 337 self.name = '{} ({})'.format(Process.__name__, name)
338 338 self.filename = None
339 339 self.description = None
340 340 self.email = None
341 341 self.alarm = []
342 342 self.configurations = {}
343 343 # self.err_queue = Queue()
344 344 self.err_queue = None
345 345 self.started = False
346 346
347 347 def getNewId(self):
348 348
349 349 idList = list(self.configurations.keys())
350 350 id = int(self.id) * 10
351 351
352 352 while True:
353 353 id += 1
354 354
355 355 if str(id) in idList:
356 356 continue
357 357
358 358 break
359 359
360 360 return str(id)
361 361
362 362 def updateId(self, new_id):
363 363
364 364 self.id = str(new_id)
365 365
366 366 keyList = list(self.configurations.keys())
367 367 keyList.sort()
368 368
369 369 n = 1
370 370 new_confs = {}
371 371
372 372 for procKey in keyList:
373 373
374 374 conf = self.configurations[procKey]
375 375 idProcUnit = str(int(self.id) * 10 + n)
376 376 conf.updateId(idProcUnit)
377 377 new_confs[idProcUnit] = conf
378 378 n += 1
379 379
380 380 self.configurations = new_confs
381 381
382 382 def setup(self, id=1, name='', description='', email=None, alarm=[]):
383 383
384 384 self.id = str(id)
385 385 self.description = description
386 386 self.email = email
387 387 self.alarm = alarm
388 388 if name:
389 389 self.name = '{} ({})'.format(Process.__name__, name)
390 390
391 391 def update(self, **kwargs):
392 392
393 393 for key, value in kwargs.items():
394 394 setattr(self, key, value)
395 395
396 396 def clone(self):
397 397
398 398 p = Project()
399 399 p.id = self.id
400 400 p.name = self.name
401 401 p.description = self.description
402 402 p.configurations = self.configurations.copy()
403 403
404 404 return p
405 405
406 406 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
407 407
408 408 '''
409 409 '''
410 410
411 411 if id is None:
412 412 idReadUnit = self.getNewId()
413 413 else:
414 414 idReadUnit = str(id)
415 415
416 416 conf = ReadUnitConf()
417 417 conf.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs)
418 418 self.configurations[conf.id] = conf
419 419
420 420 return conf
421 421
422 422 def addProcUnit(self, id=None, inputId='0', datatype=None, name=None):
423 423
424 424 '''
425 425 '''
426 426
427 427 if id is None:
428 428 idProcUnit = self.getNewId()
429 429 else:
430 430 idProcUnit = id
431 431
432 432 conf = ProcUnitConf()
433 433 conf.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue)
434 434 self.configurations[conf.id] = conf
435 435
436 436 return conf
437 437
438 438 def removeProcUnit(self, id):
439 439
440 440 if id in self.configurations:
441 441 self.configurations.pop(id)
442 442
443 443 def getReadUnit(self):
444 444
445 445 for obj in list(self.configurations.values()):
446 446 if obj.ELEMENTNAME == 'ReadUnit':
447 447 return obj
448 448
449 449 return None
450 450
451 451 def getProcUnit(self, id):
452 452
453 453 return self.configurations[id]
454 454
455 455 def getUnits(self):
456 456
457 457 keys = list(self.configurations)
458 458 keys.sort()
459 459
460 460 for key in keys:
461 461 yield self.configurations[key]
462 462
463 463 def updateUnit(self, id, **kwargs):
464 464
465 465 conf = self.configurations[id].update(**kwargs)
466 466
467 467 def makeXml(self):
468 468
469 469 xml = Element('Project')
470 470 xml.set('id', str(self.id))
471 471 xml.set('name', self.name)
472 472 xml.set('description', self.description)
473 473
474 474 for conf in self.configurations.values():
475 475 conf.makeXml(xml)
476 476
477 477 self.xml = xml
478 478
479 479 def writeXml(self, filename=None):
480 480
481 481 if filename == None:
482 482 if self.filename:
483 483 filename = self.filename
484 484 else:
485 485 filename = 'schain.xml'
486 486
487 487 if not filename:
488 488 print('filename has not been defined. Use setFilename(filename) for do it.')
489 489 return 0
490 490
491 491 abs_file = os.path.abspath(filename)
492 492
493 493 if not os.access(os.path.dirname(abs_file), os.W_OK):
494 494 print('No write permission on %s' % os.path.dirname(abs_file))
495 495 return 0
496 496
497 497 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
498 498 print('File %s already exists and it could not be overwriten' % abs_file)
499 499 return 0
500 500
501 501 self.makeXml()
502 502
503 503 ElementTree(self.xml).write(abs_file, method='xml')
504 504
505 505 self.filename = abs_file
506 506
507 507 return 1
508 508
509 509 def readXml(self, filename):
510 510
511 511 abs_file = os.path.abspath(filename)
512 512
513 513 self.configurations = {}
514 514
515 515 try:
516 516 self.xml = ElementTree().parse(abs_file)
517 517 except:
518 518 log.error('Error reading %s, verify file format' % filename)
519 519 return 0
520 520
521 521 self.id = self.xml.get('id')
522 522 self.name = self.xml.get('name')
523 523 self.description = self.xml.get('description')
524 524
525 525 for element in self.xml:
526 526 if element.tag == 'ReadUnit':
527 527 conf = ReadUnitConf()
528 528 conf.readXml(element, self.id, self.err_queue)
529 529 self.configurations[conf.id] = conf
530 530 elif element.tag == 'ProcUnit':
531 531 conf = ProcUnitConf()
532 532 input_proc = self.configurations[element.get('inputId')]
533 533 conf.readXml(element, self.id, self.err_queue)
534 534 self.configurations[conf.id] = conf
535 535
536 536 self.filename = abs_file
537 537
538 538 return 1
539 539
540 540 def __str__(self):
541 541
542 542 text = '\nProject[id=%s, name=%s, description=%s]\n\n' % (
543 543 self.id,
544 544 self.name,
545 545 self.description,
546 546 )
547 547
548 548 for conf in self.configurations.values():
549 549 text += '{}'.format(conf)
550 550
551 551 return text
552 552
553 553 def createObjects(self):
554 554
555 555 keys = list(self.configurations.keys())
556 556 keys.sort()
557 557 for key in keys:
558 558 conf = self.configurations[key]
559 559 conf.createObjects()
560 560 if 'Reader' in str(conf):
561 561 reader = conf.object
562 562 else:
563 563 conf.object.reader = reader
564 564 if conf.inputId is not None:
565 565 if isinstance(conf.inputId, list):
566 566 conf.object.setInput([self.configurations[x].object for x in conf.inputId])
567 567 else:
568 568 conf.object.setInput([self.configurations[conf.inputId].object])
569 569
570 570 def monitor(self):
571 571
572 572 t = Thread(target=self._monitor, args=(self.err_queue, self.ctx))
573 573 t.start()
574 574
575 575 def _monitor(self, queue, ctx):
576 576
577 577 import socket
578 578
579 579 procs = 0
580 580 err_msg = ''
581 581
582 582 while True:
583 583 msg = queue.get()
584 584 if '#_start_#' in msg:
585 585 procs += 1
586 586 elif '#_end_#' in msg:
587 587 procs -= 1
588 588 else:
589 589 err_msg = msg
590 590
591 591 if procs == 0 or 'Traceback' in err_msg:
592 592 break
593 593 time.sleep(0.1)
594 594
595 595 if '|' in err_msg:
596 596 name, err = err_msg.split('|')
597 597 if 'SchainWarning' in err:
598 598 log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), name)
599 599 elif 'SchainError' in err:
600 600 log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), name)
601 601 else:
602 602 log.error(err, name)
603 603 else:
604 604 name, err = self.name, err_msg
605 605
606 606 time.sleep(1)
607 607
608 608 ctx.term()
609 609
610 610 message = ''.join(err)
611 611
612 612 if err_msg:
613 613 subject = 'SChain v%s: Error running %s\n' % (
614 614 schainpy.__version__, self.name)
615 615
616 616 subtitle = 'Hostname: %s\n' % socket.gethostbyname(
617 617 socket.gethostname())
618 618 subtitle += 'Working directory: %s\n' % os.path.abspath('./')
619 619 subtitle += 'Configuration file: %s\n' % self.filename
620 620 subtitle += 'Time: %s\n' % str(datetime.datetime.now())
621 621
622 622 readUnitConfObj = self.getReadUnit()
623 623 if readUnitConfObj:
624 624 subtitle += '\nInput parameters:\n'
625 625 subtitle += '[Data path = %s]\n' % readUnitConfObj.parameters['path']
626 626 subtitle += '[Start date = %s]\n' % readUnitConfObj.parameters['startDate']
627 627 subtitle += '[End date = %s]\n' % readUnitConfObj.parameters['endDate']
628 628 subtitle += '[Start time = %s]\n' % readUnitConfObj.parameters['startTime']
629 629 subtitle += '[End time = %s]\n' % readUnitConfObj.parameters['endTime']
630 630
631 631 a = Alarm(
632 632 modes=self.alarm,
633 633 email=self.email,
634 634 message=message,
635 635 subject=subject,
636 636 subtitle=subtitle,
637 637 filename=self.filename
638 638 )
639 639
640 640 a.start()
641 641
642 642 def setFilename(self, filename):
643 643
644 644 self.filename = filename
645 645
646 646 def runProcs(self):
647 647
648 648 err = False
649 649 n = len(self.configurations)
650 #print(n)
651
650 flag_no_read = False
651 nProc_noRead = 0
652
653 #while not err:
654 # for conf in self.getUnits():
655 # ok = conf.run()
656 # if ok == 'Error':
657 # n -= 1
658 # continue
659 # elif not ok:
660 # break
661 # if n == 0:
662 # err = True
663
652 664 while not err:
653 #print(self.getUnits())
665 n_proc = 0
654 666 for conf in self.getUnits():
655 #print(conf)
656 ok = conf.run()
657 #print("ok", ok)
667 if flag_no_read:
668 if n_proc >= nProc_noRead:
669 ok = conf.run()
670 else:
671 n_proc += 1
672 continue
673 else:
674 ok = conf.run()
675
676 n_proc += 1
677
658 678 if ok == 'Error':
659 679 n -= 1
660 680 continue
681
682 elif ok == 'no_Read' and (not flag_no_read):
683 nProc_noRead = n_proc - 1
684 flag_no_read = True
685 continue
686 elif ok == 'new_Read':
687 nProc_noRead = 0
688 flag_no_read = False
689 continue
661 690 elif not ok:
662 691 break
663 #print("****************************************************end")
664 #exit(1)
692
665 693 if n == 0:
666 694 err = True
667 695
668 696 def run(self):
669 697
670 698 log.success('\nStarting Project {} [id={}]'.format(self.name, self.id), tag='')
671 699 self.started = True
672 700 self.start_time = time.time()
673 701 self.createObjects()
674 702 self.runProcs()
675 703 log.success('{} Done (Time: {:4.2f}s)'.format(
676 704 self.name,
677 705 time.time() - self.start_time), '')
@@ -1,252 +1,246
1 1 '''
2 2 Base clases to create Processing units and operations, the MPDecorator
3 3 must be used in plotting and writing operations to allow to run as an
4 4 external process.
5 5 '''
6 6
7 7 import os
8 8 import inspect
9 9 import zmq
10 10 import time
11 11 import pickle
12 12 import traceback
13 13 from threading import Thread
14 14 from multiprocessing import Process, Queue
15 15 from schainpy.utils import log
16 16
17 17 QUEUE_SIZE = int(os.environ.get('QUEUE_MAX_SIZE', '100'))
18 18
19 19 class ProcessingUnit(object):
20 20 '''
21 21 Base class to create Signal Chain Units
22 22 '''
23 23
24 24 proc_type = 'processing'
25 25 bypass = False
26 26
27 27 def __init__(self):
28 28
29 29 self.dataIn = None
30 30 self.dataOut = None
31 31 self.isConfig = False
32 32 self.operations = []
33 33 self.name = 'Test'
34 34 self.inputs = []
35 35
36 36 def setInput(self, unit):
37 37
38 38 attr = 'dataIn'
39 39 for i, u in enumerate(unit):
40 40 if i==0:
41 41 #print(u.dataOut.flagNoData)
42 42 #exit(1)
43 43 self.dataIn = u.dataOut#.copy()
44 44 self.inputs.append('dataIn')
45 45 else:
46 46 setattr(self, 'dataIn{}'.format(i), u.dataOut)#.copy())
47 47 self.inputs.append('dataIn{}'.format(i))
48 48
49 49
50 50 def getAllowedArgs(self):
51 51 if hasattr(self, '__attrs__'):
52 52 return self.__attrs__
53 53 else:
54 54 return inspect.getargspec(self.run).args
55 55
56 56 def addOperation(self, conf, operation):
57 57 '''
58 58 '''
59 59
60 60 self.operations.append((operation, conf.type, conf.getKwargs()))
61 61
62 62 def getOperationObj(self, objId):
63 63
64 64 if objId not in list(self.operations.keys()):
65 65 return None
66 66
67 67 return self.operations[objId]
68 68
69 69 def call(self, **kwargs):
70 '''
71 '''
72 70
71 mybool = (self.dataOut.type == 'Voltage') and self.dataOut.useInputBuffer and (not self.dataOut.buffer_empty) #liberar desde buffer
72
73 73 try:
74 if self.dataIn is not None and self.dataIn.flagNoData and not self.dataIn.error:
75 #if self.dataIn is not None and self.dataIn.flagNoData and not self.dataIn.error and not self.dataIn.runNextUnit:
76 if self.dataIn.runNextUnit:
77 #print("SUCCESSSSSSS")
78 #exit(1)
79 return not self.dataIn.isReady()
80 else:
81 return self.dataIn.isReady()
82 elif self.dataIn is None or not self.dataIn.error:
83 if 'Reader' in self.name and self.bypass:
84 print('Skipping...reader')
85 return self.dataOut.isReady()
74 if mybool:
75 #print("run jeje")
86 76 self.run(**kwargs)
87 elif self.dataIn.error:
88 #print("Elif 2")
89 self.dataOut.error = self.dataIn.error
90 self.dataOut.flagNoData = True
77 else:
78 if self.dataIn is not None and self.dataIn.flagNoData and not self.dataIn.error:
79 return self.dataIn.isReady()
80 elif self.dataIn is None or not self.dataIn.error: #unidad de lectura o procesamiento regular
81 self.run(**kwargs)
82 elif self.dataIn.error:
83 self.dataOut.error = self.dataIn.error
84 self.dataOut.flagNoData = True
85 print("exec proc error")
86
91 87 except:
92 #print("Except")
88
93 89 err = traceback.format_exc()
94 90 if 'SchainWarning' in err:
95 91 log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), self.name)
96 92 elif 'SchainError' in err:
97 93 log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), self.name)
98 94 else:
99 95 log.error(err, self.name)
100 96 self.dataOut.error = True
101 #print("before op")
97
98
102 99 for op, optype, opkwargs in self.operations:
103 aux = self.dataOut.copy()
104 #aux = copy.deepcopy(self.dataOut)
105 #print("**********************Before",op)
106 if optype == 'other' and not self.dataOut.flagNoData:
107 #print("**********************Other",op)
108 #print(self.dataOut.flagNoData)
109 self.dataOut = op.run(self.dataOut, **opkwargs)
110 elif optype == 'external' and not self.dataOut.flagNoData:
111 op.queue.put(aux)
100
101 if (optype == 'other' and self.dataOut.isReady()) or mybool:
102 try:
103 self.dataOut = op.run(self.dataOut, **opkwargs)
104 except Exception as e:
105 print(e)
106 self.dataOut.error = True
107 return 'Error'
108 elif optype == 'external' and self.dataOut.isReady() :
109 op.queue.put(copy.deepcopy(self.dataOut))
112 110 elif optype == 'external' and self.dataOut.error:
113 op.queue.put(aux)
114 #elif optype == 'external' and self.dataOut.isReady():
115 #op.queue.put(copy.deepcopy(self.dataOut))
116 #print(not self.dataOut.isReady())
111 op.queue.put(copy.deepcopy(self.dataOut))
117 112
118 try:
119 if self.dataOut.runNextUnit:
120 runNextUnit = self.dataOut.runNextUnit
121 #print(self.operations)
122 #print("Tru")
123 113
114 if not self.dataOut.error:
115 if self.dataOut.type == 'Voltage':
116 if not self.dataOut.buffer_empty : #continue
117 return 'no_Read'
118 elif self.dataOut.useInputBuffer and (self.dataOut.buffer_empty) and self.dataOut.isReady() :
119 return 'new_Read'
120 else:
121 return True
124 122 else:
125 runNextUnit = self.dataOut.isReady()
126 except:
127 runNextUnit = self.dataOut.isReady()
128 #exit(1)
129 #if not self.dataOut.isReady():
130 #return 'Error' if self.dataOut.error else input()
131 #print("NexT",runNextUnit)
132 #print("error: ",self.dataOut.error)
133 return 'Error' if self.dataOut.error else runNextUnit# self.dataOut.isReady()
123 #print("ret True")
124 return True
125 else:
126 return 'Error'
127 #return 'Error' if self.dataOut.error else True #self.dataOut.isReady()
134 128
135 129 def setup(self):
136 130
137 131 raise NotImplementedError
138 132
139 133 def run(self):
140 134
141 135 raise NotImplementedError
142 136
143 137 def close(self):
144 138
145 139 return
146 140
147 141
148 142 class Operation(object):
149 143
150 144 '''
151 145 '''
152 146
153 147 proc_type = 'operation'
154 148
155 149 def __init__(self):
156 150
157 151 self.id = None
158 152 self.isConfig = False
159 153
160 154 if not hasattr(self, 'name'):
161 155 self.name = self.__class__.__name__
162 156
163 157 def getAllowedArgs(self):
164 158 if hasattr(self, '__attrs__'):
165 159 return self.__attrs__
166 160 else:
167 161 return inspect.getargspec(self.run).args
168 162
169 163 def setup(self):
170 164
171 165 self.isConfig = True
172 166
173 167 raise NotImplementedError
174 168
175 169 def run(self, dataIn, **kwargs):
176 170 """
177 171 Realiza las operaciones necesarias sobre la dataIn.data y actualiza los
178 172 atributos del objeto dataIn.
179 173
180 174 Input:
181 175
182 176 dataIn : objeto del tipo JROData
183 177
184 178 Return:
185 179
186 180 None
187 181
188 182 Affected:
189 183 __buffer : buffer de recepcion de datos.
190 184
191 185 """
192 186 if not self.isConfig:
193 187 self.setup(**kwargs)
194 188
195 189 raise NotImplementedError
196 190
197 191 def close(self):
198 192
199 193 return
200 194
201 195
202 196 def MPDecorator(BaseClass):
203 197 """
204 198 Multiprocessing class decorator
205 199
206 200 This function add multiprocessing features to a BaseClass.
207 201 """
208 202
209 203 class MPClass(BaseClass, Process):
210 204
211 205 def __init__(self, *args, **kwargs):
212 206 super(MPClass, self).__init__()
213 207 Process.__init__(self)
214 208
215 209 self.args = args
216 210 self.kwargs = kwargs
217 211 self.t = time.time()
218 212 self.op_type = 'external'
219 213 self.name = BaseClass.__name__
220 214 self.__doc__ = BaseClass.__doc__
221 215
222 216 if 'plot' in self.name.lower() and not self.name.endswith('_'):
223 217 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
224 218
225 219 self.start_time = time.time()
226 220 self.err_queue = args[3]
227 221 self.queue = Queue(maxsize=QUEUE_SIZE)
228 222 self.myrun = BaseClass.run
229 223
230 224 def run(self):
231 225
232 226 while True:
233 227
234 228 dataOut = self.queue.get()
235 229
236 230 if not dataOut.error:
237 231 try:
238 232 BaseClass.run(self, dataOut, **self.kwargs)
239 233 except:
240 234 err = traceback.format_exc()
241 235 log.error(err, self.name)
242 236 else:
243 237 break
244 238
245 239 self.close()
246 240
247 241 def close(self):
248 242
249 243 BaseClass.close(self)
250 244 log.success('Done...(Time:{:4.2f} secs)'.format(time.time() - self.start_time), self.name)
251 245
252 246 return MPClass
1 NO CONTENT: modified file
The requested commit or file is too big and content was truncated. Show full diff
General Comments 0
You need to be logged in to leave comments. Login now