##// END OF EJS Templates
Fix excessive RAM consumption
jespinoza -
r1268:b2726fff6520
parent child
Show More
@@ -1,1290 +1,1290
1 1 '''
2 2 Updated on January , 2018, for multiprocessing purposes
3 3 Author: Sergio Cortez
4 4 Created on September , 2012
5 5 '''
6 6 from platform import python_version
7 7 import sys
8 8 import ast
9 9 import datetime
10 10 import traceback
11 11 import math
12 12 import time
13 13 import zmq
14 from multiprocessing import Process, Queue, Event, cpu_count
14 from multiprocessing import Process, Queue, Event, Value, cpu_count
15 15 from threading import Thread
16 16 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
17 17 from xml.dom import minidom
18 18
19 19
20 20 from schainpy.admin import Alarm, SchainWarning
21 21 from schainpy.model import *
22 22 from schainpy.utils import log
23 23
24 24
# File extension that MPProject searches for, keyed by reader data type.
DTYPES = dict(
    Voltage='.r',
    Spectra='.pdata',
)
29 29
30 30
def MPProject(project, n=cpu_count()):
    '''
    Project wrapper to run schain in n processes.

    For every day between the 'startDate' and 'endDate' parameters of the
    read unit's 'run' operation, the matching files are searched, split in
    chunks of ceil(nFiles / n) files, and one clone of the project is
    started per chunk (selected through the reader's cursor/skip
    parameters). The function waits for all clones of a day to finish
    before moving on to the next day.

    Parameters:
        project : project whose read unit defines path, dates and times
        n       : number of chunks per day (defaults to the CPU count
                  evaluated when this module is imported)
    '''

    rconf = project.getReadUnitObj()
    op = rconf.getOperationObj('run')
    dt1 = op.getParameterValue('startDate')
    dt2 = op.getParameterValue('endDate')
    tm1 = op.getParameterValue('startTime')
    tm2 = op.getParameterValue('endTime')
    days = (dt2 - dt1).days

    for day in range(days + 1):
        cursor = 0
        processes = []
        dt = dt1 + datetime.timedelta(day)
        dt_str = dt.strftime('%Y/%m/%d')
        reader = JRODataReader()
        _, files = reader.searchFilesOffLine(path=rconf.path,
                                             startDate=dt,
                                             endDate=dt,
                                             startTime=tm1,
                                             endTime=tm2,
                                             ext=DTYPES[rconf.datatype])
        nFiles = len(files)
        if nFiles == 0:
            continue

        # Number of files handled by each clone
        skip = int(math.ceil(nFiles / n))
        while nFiles > cursor * skip:
            rconf.update(startDate=dt_str, endDate=dt_str, cursor=cursor,
                         skip=skip)
            p = project.clone()
            p.start()
            processes.append(p)
            cursor += 1

        def beforeExit(exctype, value, trace):
            # Stop every clone of the current day before reporting the error
            for process in processes:
                process.terminate()
                process.join()
            # print_tb() writes the traceback itself and returns None, so its
            # result must not be passed to print()
            traceback.print_tb(trace)

        sys.excepthook = beforeExit

        for process in processes:
            process.join()
            process.terminate()

        time.sleep(3)
82 82
def wait(context):
    '''
    Sleep one second, block until one multipart message arrives on the
    'ipc:///tmp/schain_<id>_pub' SUB socket, then terminate `context`.

    NOTE(review): `self` is not defined in this function's scope, so the
    connect() line raises NameError as written; the id to use cannot be
    determined from this file section - confirm the intended source.
    '''

    time.sleep(1)
    zmq_context = zmq.Context()
    subscriber = zmq_context.socket(zmq.SUB)
    endpoint = 'ipc:///tmp/schain_{}_pub'.format(self.id)
    subscriber.connect(endpoint)
    subscriber.setsockopt(zmq.SUBSCRIBE, self.id.encode())
    # Only the arrival of the message matters; its payload is discarded
    _ = subscriber.recv_multipart()[1]
    context.terminate()
92 92
class ParameterConf():
    '''
    One named parameter of an operation.

    The raw value is kept as a string (except for the 'obj' format, which
    keeps the object itself) together with the name of the format used by
    getValue() to convert it.
    '''

    id = None
    name = None
    value = None
    format = None

    # Converted value cached by getValue(); None means "not converted yet"
    __formated_value = None

    ELEMENTNAME = 'Parameter'

    def __init__(self):

        self.format = 'str'

    def getElementName(self):
        '''Return the XML tag name used for parameters.'''

        return self.ELEMENTNAME

    def getValue(self):
        '''
        Return the value converted according to self.format.

        The result is cached for every format except 'obj'.

        Raises:
            ValueError : empty value (any format other than 'obj'/'str'),
                         or a malformed 'pairslist'
        '''

        value = self.value
        format = self.format

        # Identity test: a cached value may not support "!= None" cleanly
        if self.__formated_value is not None:
            return self.__formated_value

        if format == 'obj':
            return value

        if format == 'str':
            self.__formated_value = str(value)
            return self.__formated_value

        if value == '':
            raise ValueError('%s: This parameter value is empty' % self.name)

        if format == 'list':
            self.__formated_value = [s.strip() for s in value.split(',')]
            return self.__formated_value

        if format in ('intlist', 'floatlist'):
            # Examples: value = (0,1,2)   or   value = (0.5, 1.4, 2.7)
            cast = int if format == 'intlist' else float
            new_value = ast.literal_eval(value)

            # A single scalar becomes a one-element list
            if type(new_value) not in (tuple, list):
                new_value = [cast(new_value)]

            self.__formated_value = new_value
            return self.__formated_value

        if format == 'date':
            # Expected layout: YYYY/MM/DD
            fields = [int(x) for x in value.split('/')]
            self.__formated_value = datetime.date(fields[0], fields[1], fields[2])
            return self.__formated_value

        if format == 'time':
            # Expected layout: HH:MM:SS
            fields = [int(x) for x in value.split(':')]
            self.__formated_value = datetime.time(fields[0], fields[1], fields[2])
            return self.__formated_value

        if format == 'pairslist':
            # Example: value = (0,1),(1,2)
            new_value = ast.literal_eval(value)

            if type(new_value) not in (tuple, list):
                raise ValueError('%s has to be a tuple or list of pairs' % value)

            # A single pair such as "(0,1)" is wrapped into a list of pairs
            if type(new_value[0]) not in (tuple, list):
                if len(new_value) != 2:
                    raise ValueError('%s has to be a tuple or list of pairs' % value)
                new_value = [new_value]

            for thisPair in new_value:
                if len(thisPair) != 2:
                    raise ValueError('%s has to be a tuple or list of pairs' % value)

            self.__formated_value = new_value
            return self.__formated_value

        if format == 'multilist':
            # Example: value = (0,1,2),(3,4,5)
            multiList = ast.literal_eval(value)

            # NOTE(review): adding parentheses without a trailing comma does
            # not add a nesting level, so a flat "(0,1,2)" stays flat here -
            # confirm whether a list of lists was intended.
            if type(multiList[0]) == int:
                multiList = ast.literal_eval('(' + value + ')')

            self.__formated_value = multiList
            return self.__formated_value

        if format == 'bool':
            # setup() stores str(value), so a Python bool arrives here as
            # 'True' / 'False', which int() cannot parse
            text = str(value).strip().lower()
            if text in ('true', 'false'):
                self.__formated_value = (text == 'true')
                return self.__formated_value
            value = int(value)

        if format == 'int':
            # Going through float accepts values such as '3.0'
            value = float(value)

        # SECURITY NOTE(review): the format name can come from an XML file and
        # is evaluated as Python code; only load trusted project files.
        format_func = eval(format)

        self.__formated_value = format_func(value)

        return self.__formated_value

    def updateId(self, new_id):
        '''Replace the parameter id (stored as a string).'''

        self.id = str(new_id)

    def setup(self, id, name, value, format='str'):
        '''
        Define the parameter and convert its value once.

        The value is stored as str(value) unless format is 'obj'. Any
        conversion error raised by getValue() propagates. Returns 1.
        '''

        self.id = str(id)
        self.name = name
        if format == 'obj':
            self.value = value
        else:
            self.value = str(value)
        self.format = str.lower(format)

        # Convert right away so malformed values fail here
        self.getValue()

        return 1

    def update(self, name, value, format='str'):
        '''Overwrite name, value and format of the parameter.'''

        self.name = name
        self.value = str(value)
        # Same normalisation as setup(): formats are compared in lower case
        self.format = str.lower(format)
        # Drop the cached conversion, otherwise getValue() keeps returning
        # the value converted before the update
        self.__formated_value = None

    def makeXml(self, opElement):
        '''Append this parameter to opElement; 'queue' parameters are skipped.'''

        if self.name == 'queue':
            return

        parmElement = SubElement(opElement, self.ELEMENTNAME)
        parmElement.set('id', str(self.id))
        parmElement.set('name', self.name)
        parmElement.set('value', self.value)
        parmElement.set('format', self.format)

    def readXml(self, parmElement):
        '''Load id, name, value and format from an XML parameter element.'''

        self.id = parmElement.get('id')
        self.name = parmElement.get('name')
        self.value = parmElement.get('value')
        self.format = str.lower(parmElement.get('format'))
        # The raw value changed, so any cached conversion is obsolete
        self.__formated_value = None

        # Compatible with old signal chain version
        if self.format == 'int' and self.name == 'idfigure':
            self.name = 'id'

    def printattr(self):
        '''Print a one-line summary of the parameter.'''

        # project_id is never assigned by this class; fall back to None
        # instead of failing with AttributeError
        project_id = getattr(self, 'project_id', None)

        print('Parameter[%s]: name = %s, value = %s, format = %s, project_id = %s' % (self.id, self.name, self.value, self.format, project_id))
280 280
class OperationConf():
    '''
    Configuration of one operation: its name, type, priority and the list
    of ParameterConf objects passed to it.
    '''

    ELEMENTNAME = 'Operation'

    def __init__(self):

        self.id = '0'
        self.name = None
        self.priority = None
        self.topic = None
        # Defined here so the accessors below work before setup()/readXml()
        self.type = None
        self.project_id = None
        self.parmConfObjList = []

    def __getNewId(self):
        '''Return the next parameter id: operation id * 10 + position.'''

        return int(self.id) * 10 + len(self.parmConfObjList) + 1

    def getId(self):
        '''Return the operation id.'''

        return self.id

    def updateId(self, new_id):
        '''Set a new operation id and renumber every parameter after it.'''

        self.id = str(new_id)

        for n, parmObj in enumerate(self.parmConfObjList, start=1):
            parmObj.updateId(str(int(new_id) * 10 + n))

    def getElementName(self):
        '''Return the XML tag name used for operations.'''

        return self.ELEMENTNAME

    def getParameterObjList(self):
        '''Return the list of parameter configuration objects.'''

        return self.parmConfObjList

    def getParameterObj(self, parameterName):
        '''Return the first parameter called parameterName, or None.'''

        for parmConfObj in self.parmConfObjList:
            if parmConfObj.name == parameterName:
                return parmConfObj

        return None

    def getParameterObjfromValue(self, parameterValue):
        '''
        Return the converted value of the first parameter whose value
        equals parameterValue, or None when no parameter matches.
        '''

        for parmConfObj in self.parmConfObjList:

            if parmConfObj.getValue() != parameterValue:
                continue

            return parmConfObj.getValue()

        return None

    def getParameterValue(self, parameterName):
        '''
        Return the converted value of the parameter called parameterName.

        Raises AttributeError when the parameter does not exist (the lookup
        returns None and no guard is applied).
        '''

        parameterObj = self.getParameterObj(parameterName)

        return parameterObj.getValue()

    def getKwargs(self):
        '''
        Return {name: converted value} for every parameter. The 'datatype'
        parameter of a 'run' operation is left out.
        '''

        kwargs = {}

        for parmConfObj in self.parmConfObjList:
            if self.name == 'run' and parmConfObj.name == 'datatype':
                continue

            kwargs[parmConfObj.name] = parmConfObj.getValue()

        return kwargs

    def setup(self, id, name, priority, type, project_id, err_queue, lock):
        '''Define the operation and start with an empty parameter list.'''

        self.id = str(id)
        self.project_id = project_id
        self.name = name
        self.type = type
        self.priority = priority
        self.err_queue = err_queue
        self.lock = lock
        self.parmConfObjList = []

    def removeParameters(self):
        '''Forget every parameter of the operation.'''

        self.parmConfObjList = []

    def addParameter(self, name, value, format='str'):
        '''
        Create a parameter and append it to the operation.

        Returns the new ParameterConf, or None when value is None (nothing
        is added in that case).
        '''

        if value is None:
            return None
        id = self.__getNewId()

        parmConfObj = ParameterConf()
        if not parmConfObj.setup(id, name, value, format):
            return None

        self.parmConfObjList.append(parmConfObj)

        return parmConfObj

    def changeParameter(self, name, value, format='str'):
        '''
        Update an existing parameter and return it.

        Raises AttributeError when no parameter is called name.
        '''

        parmConfObj = self.getParameterObj(name)
        parmConfObj.update(name, value, format)

        return parmConfObj

    def makeXml(self, procUnitElement):
        '''Append this operation and its parameters to procUnitElement.'''

        opElement = SubElement(procUnitElement, self.ELEMENTNAME)
        opElement.set('id', str(self.id))
        opElement.set('name', self.name)
        opElement.set('type', self.type)
        opElement.set('priority', str(self.priority))

        for parmConfObj in self.parmConfObjList:
            parmConfObj.makeXml(opElement)

    def readXml(self, opElement, project_id):
        '''Load the operation and its parameters from an XML element.'''

        self.id = opElement.get('id')
        self.name = opElement.get('name')
        self.type = opElement.get('type')
        self.priority = opElement.get('priority')
        self.project_id = str(project_id)

        # Compatible with old signal chain version
        # Use of 'run' method instead 'init'
        if self.type == 'self' and self.name == 'init':
            self.name = 'run'

        self.parmConfObjList = []

        parmElementList = opElement.iter(ParameterConf().getElementName())

        for parmElement in parmElementList:
            parmConfObj = ParameterConf()
            parmConfObj.readXml(parmElement)

            # Compatible with old signal chain version
            # If a 'Plot' operation is found, its name is replaced by the
            # value of its 'type' parameter, which is then dropped
            if self.type != 'self' and self.name == 'Plot':
                if parmConfObj.format == 'str' and parmConfObj.name == 'type':
                    self.name = parmConfObj.value
                    continue

            self.parmConfObjList.append(parmConfObj)

    def printattr(self):
        '''Print a summary of the operation followed by its parameters.'''

        print('%s[%s]: name = %s, type = %s, priority = %s, project_id = %s' % (self.ELEMENTNAME,
                                                                                 self.id,
                                                                                 self.name,
                                                                                 self.type,
                                                                                 self.priority,
                                                                                 self.project_id))

        for parmConfObj in self.parmConfObjList:
            parmConfObj.printattr()

    def createObject(self):
        '''
        Instantiate the operation class named self.name.

        'other' operations are built without arguments; 'external' ones
        receive ids, error queue, lock and the parameter kwargs, and are
        started. The instance is kept in self.opObj and returned.

        Raises:
            ValueError : self.type is neither 'other' nor 'external'
        '''

        # SECURITY NOTE(review): the name can come from an XML file and is
        # evaluated as Python code; only load trusted project files.
        className = eval(self.name)

        if self.type == 'other':
            opObj = className()
        elif self.type == 'external':
            kwargs = self.getKwargs()
            opObj = className(self.id, self.id, self.project_id, self.err_queue, self.lock, 'Operation', **kwargs)
            opObj.start()
        else:
            # Previously this fell through to "return opObj" with opObj unbound
            raise ValueError('Operation %s: unsupported type %r' % (self.name, self.type))

        self.opObj = opObj

        return opObj
469 469
class ProcUnitConf():
    '''
    Configuration of a processing unit: identity, input unit and the list
    of operations it runs (the first one being its own 'run').
    '''

    ELEMENTNAME = 'ProcUnit'

    def __init__(self):

        self.id = None
        self.datatype = None
        self.name = None
        self.inputId = None
        self.opConfObjList = []
        self.procUnitObj = None
        self.opObjDict = {}

    def __getPriority(self):
        '''Return the priority of the next operation (1-based position).'''

        return 1 + len(self.opConfObjList)

    def __getNewId(self):
        '''Return the next operation id: unit id * 10 + position.'''

        return int(self.id) * 10 + len(self.opConfObjList) + 1

    def getElementName(self):
        '''Return the XML tag name of this kind of unit.'''

        return self.ELEMENTNAME

    def getId(self):
        '''Return the unit id.'''

        return self.id

    def updateId(self, new_id):
        '''
        Currently a no-op: the renumbering of the unit and of its
        operations is disabled in the original code, so new_id is ignored.
        '''

    def getInputId(self):
        '''Return the id of the unit this one reads from.'''

        return self.inputId

    def getOperationObjList(self):
        '''Return the list of operation configuration objects.'''

        return self.opConfObjList

    def getOperationObj(self, name=None):
        '''Return the first operation called name, or None.'''

        for candidate in self.opConfObjList:
            if candidate.name == name:
                return candidate

        return None

    def getOpObjfromParamValue(self, value=None):
        '''Return the first operation having a parameter equal to value, or None.'''

        for candidate in self.opConfObjList:
            found = candidate.getParameterObjfromValue(parameterValue=value)
            if found != value:
                continue
            return candidate

        return None

    def getProcUnitObj(self):
        '''Return the unit instance built by createObjects() (None before).'''

        return self.procUnitObj

    def setup(self, project_id, id, name, datatype, inputId, err_queue, lock):
        '''
        Define the unit and add its own 'run' operation.

        id is the topic to publish on; inputId is the topic to subscribe to.
        At least one of name/datatype is required; the missing one is
        derived from the other using the 'Proc' suffix.
        '''

        # Compatible with old signal chain version
        if datatype is None and name is None:
            raise ValueError('datatype or name should be defined')

        # TODO: define a condition for inputId when it is 0

        if name is None:
            name = datatype if 'Proc' in datatype else '%sProc' % (datatype)

        if datatype is None:
            datatype = name.replace('Proc', '')

        self.project_id = project_id
        self.id = str(id)
        self.name = name
        self.datatype = datatype
        self.inputId = inputId
        self.err_queue = err_queue
        self.lock = lock
        self.opConfObjList = []

        self.addOperation(name='run', optype='self')

    def removeOperations(self):
        '''Drop every operation and add a fresh 'run' operation.'''

        self.opConfObjList = []
        self.addOperation(name='run')

    def addParameter(self, **kwargs):
        '''
        Add parameters to 'run' operation
        '''

        runOp = self.opConfObjList[0]
        runOp.addParameter(**kwargs)

        return runOp

    def addOperation(self, name, optype='self'):
        '''
        Create an operation, append it to the unit and return it.

        Original note (translated): update -> process communication; for
        optype='self', remove; define IPC communication -> topic; define
        the socket or ipc communication type.
        '''

        newOp = OperationConf()
        newId = self.__getNewId()
        priority = self.__getPriority()  # Not very meaningful, but can be used
        newOp.setup(newId, name=name, priority=priority, type=optype, project_id=self.project_id, err_queue=self.err_queue, lock=self.lock)
        self.opConfObjList.append(newOp)

        return newOp

    def makeXml(self, projectElement):
        '''Append this unit and its operations to projectElement.'''

        unitElement = SubElement(projectElement, self.ELEMENTNAME)
        for key, text in (('id', str(self.id)),
                          ('name', self.name),
                          ('datatype', self.datatype),
                          ('inputId', str(self.inputId))):
            unitElement.set(key, text)

        for opConfObj in self.opConfObjList:
            opConfObj.makeXml(unitElement)

    def readXml(self, upElement, project_id):
        '''Load the unit and its operations from an XML element.'''

        self.id = upElement.get('id')
        self.name = upElement.get('name')
        self.datatype = upElement.get('datatype')
        self.inputId = upElement.get('inputId')
        self.project_id = str(project_id)

        # The stored datatype may carry the class suffix; strip it
        suffix = {'ReadUnit': 'Reader', 'ProcUnit': 'Proc'}.get(self.ELEMENTNAME)
        if suffix is not None:
            self.datatype = self.datatype.replace(suffix, '')

        if self.inputId == 'None':
            self.inputId = '0'

        self.opConfObjList = []

        for opElement in upElement.iter(OperationConf().getElementName()):
            opConfObj = OperationConf()
            opConfObj.readXml(opElement, project_id)
            self.opConfObjList.append(opConfObj)

    def printattr(self):
        '''Print a summary of the unit followed by its operations.'''

        summary = (self.ELEMENTNAME,
                   self.id,
                   self.name,
                   self.datatype,
                   self.inputId,
                   self.project_id)
        print('%s[%s]: name = %s, datatype = %s, inputId = %s, project_id = %s' % summary)

        for opConfObj in self.opConfObjList:
            opConfObj.printattr()

    def getKwargs(self):
        '''Return the keyword arguments of the first ('run') operation.'''

        return self.opConfObjList[0].getKwargs()

    def createObjects(self):
        '''
        Instantiate the processing unit, attach its operations and start it.
        '''

        unitClass = eval(self.name)
        kwargs = self.getKwargs()
        procUnitObj = unitClass(self.id, self.inputId, self.project_id, self.err_queue, self.lock, 'ProcUnit', **kwargs)
        log.success('creating process...', self.name)

        for opConfObj in self.opConfObjList:

            if opConfObj.type == 'self':
                # The unit's own 'run' is not registered as an operation
                if opConfObj.name == 'run':
                    continue
                opObj = getattr(procUnitObj, opConfObj.name)
            else:
                opObj = opConfObj.createObject()

            log.success('adding operation: {}, type:{}'.format(
                opConfObj.name,
                opConfObj.type), self.name)

            procUnitObj.addOperation(opConfObj, opObj)

        procUnitObj.start()
        self.procUnitObj = procUnitObj

    def close(self):
        '''Close every non-'self' operation object, then the unit itself.'''

        for opConfObj in self.opConfObjList:
            if opConfObj.type != 'self':
                self.procUnitObj.getOperationObj(opConfObj.id).close()

        self.procUnitObj.close()
717 716
718 717
class ReadUnitConf(ProcUnitConf):
    '''
    Configuration of a reading unit: a ProcUnitConf without input whose
    'run' operation carries the path, date and time limits (or the server).
    '''

    ELEMENTNAME = 'ReadUnit'

    def __init__(self):

        # Reuse the base initialiser so inherited accessors such as
        # getProcUnitObj() find procUnitObj/opObjDict defined
        super().__init__()
        self.lock = Event()
        self.lock.set()
        self.lock.n = Value('d', 0)

    def getElementName(self):
        '''Return the XML tag name of reading units.'''

        return self.ELEMENTNAME

    def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='',
              startTime='', endTime='', server=None, **kwargs):
        '''
        Define the reading unit and add its 'run' operation.

        Original note (translated): the process id will be the topic;
        {topic} is added, it is an error if absent; kwargs must be passed
        on at instantiation.

        At least one of name/datatype is required; the missing one is
        derived from the other using the 'Reader' suffix.
        '''

        # Compatible with old signal chain version
        if datatype is None and name is None:
            raise ValueError('datatype or name should be defined')
        if name is None:
            if 'Reader' in datatype:
                name = datatype
                datatype = name.replace('Reader', '')
            else:
                name = '{}Reader'.format(datatype)
        if datatype is None:
            if 'Reader' in name:
                datatype = name.replace('Reader', '')
            else:
                datatype = name
                name = '{}Reader'.format(name)

        self.id = id
        self.project_id = project_id
        self.name = name
        self.datatype = datatype
        if path != '':
            # NOTE(review): `os` is not imported explicitly in the visible
            # import block - presumably provided by a star import; confirm.
            self.path = os.path.abspath(path)
        # NOTE(review): with path == '' and server None, self.path stays
        # undefined and addRunOperation() below raises AttributeError.
        self.startDate = startDate
        self.endDate = endDate
        self.startTime = startTime
        self.endTime = endTime
        self.server = server
        self.err_queue = err_queue
        self.addRunOperation(**kwargs)

    def update(self, **kwargs):
        '''
        Update datatype/name, path and date/time limits from kwargs and
        rebuild the 'run' operation with the remaining kwargs.
        '''

        if 'datatype' in kwargs:
            datatype = kwargs.pop('datatype')
            if 'Reader' in datatype:
                self.name = datatype
            else:
                self.name = '%sReader' % (datatype)
            self.datatype = self.name.replace('Reader', '')

        attrs = ('path', 'startDate', 'endDate',
                 'startTime', 'endTime')

        for attr in attrs:
            if attr in kwargs:
                setattr(self, attr, kwargs.pop(attr))

        self.updateRunOperation(**kwargs)

    def removeOperations(self):
        '''Drop every operation (no 'run' operation is re-added).'''

        self.opConfObjList = []

    def __addReaderParameters(self, opObj, kwargs):
        '''Add datatype, path, date/time limits and extra kwargs to opObj.'''

        opObj.addParameter(name='datatype', value=self.datatype, format='str')
        opObj.addParameter(name='path', value=self.path, format='str')
        opObj.addParameter(name='startDate', value=self.startDate, format='date')
        opObj.addParameter(name='endDate', value=self.endDate, format='date')
        opObj.addParameter(name='startTime', value=self.startTime, format='time')
        opObj.addParameter(name='endTime', value=self.endTime, format='time')

        # Extra parameters use the Python type name of the value as format
        for key, value in list(kwargs.items()):
            opObj.addParameter(name=key, value=value,
                               format=type(value).__name__)

    def addRunOperation(self, **kwargs):
        '''
        Add the 'run' operation: file-reading parameters when no server is
        configured, otherwise only the 'server' parameter.
        '''

        opObj = self.addOperation(name='run', optype='self')

        if self.server is None:
            self.__addReaderParameters(opObj, kwargs)
        else:
            opObj.addParameter(name='server', value=self.server, format='str')

        return opObj

    def updateRunOperation(self, **kwargs):
        '''Rebuild the parameters of the existing 'run' operation.'''

        opObj = self.getOperationObj(name='run')
        opObj.removeParameters()

        self.__addReaderParameters(opObj, kwargs)

        return opObj

    def readXml(self, upElement, project_id):
        '''
        Load the unit and its operations from an XML element; path and
        date/time limits are taken from the 'run' operation.
        '''

        self.id = upElement.get('id')
        self.name = upElement.get('name')
        self.datatype = upElement.get('datatype')
        self.project_id = str(project_id)

        if self.ELEMENTNAME == 'ReadUnit':
            self.datatype = self.datatype.replace('Reader', '')

        self.opConfObjList = []

        opElementList = upElement.iter(OperationConf().getElementName())

        for opElement in opElementList:
            opConfObj = OperationConf()
            opConfObj.readXml(opElement, project_id)
            self.opConfObjList.append(opConfObj)

            if opConfObj.name == 'run':
                self.path = opConfObj.getParameterValue('path')
                self.startDate = opConfObj.getParameterValue('startDate')
                self.endDate = opConfObj.getParameterValue('endDate')
                self.startTime = opConfObj.getParameterValue('startTime')
                self.endTime = opConfObj.getParameterValue('endTime')
875 875
876 876
877 877 class Project(Process):
878 878
879 879 ELEMENTNAME = 'Project'
880 880
def __init__(self):
    '''Initialise the process and an empty project description.'''

    Process.__init__(self)
    # Processing-unit configurations keyed by their id
    self.procUnitConfObjDict = {}
    self.err_queue = Queue()
    self.id = None
    self.filename = None
    self.description = None
    self.email = None
    self.alarm = None
891 891
def __getNewId(self):
    '''Return the first unused unit id above project id * 10, as a string.'''

    taken = list(self.procUnitConfObjDict.keys())
    candidate = int(self.id) * 10 + 1

    while str(candidate) in taken:
        candidate += 1

    return str(candidate)
906 906
def getElementName(self):
    '''Return the XML tag name used for the project.'''

    tag = self.ELEMENTNAME
    return tag
910 910
def getId(self):
    '''Return the project id.'''

    project_id = self.id
    return project_id
914 914
def updateId(self, new_id):
    '''
    Set a new project id and re-key every unit as project id * 10 + n.

    Units are visited in sorted key order (keys are strings, so the order
    is lexicographic); each unit's updateId() is called with its new key.
    '''

    self.id = str(new_id)
    base = int(self.id) * 10

    renumbered = {}
    for n, oldKey in enumerate(sorted(self.procUnitConfObjDict.keys()), start=1):
        unitConf = self.procUnitConfObjDict[oldKey]
        newKey = str(base + n)
        unitConf.updateId(newKey)
        renumbered[newKey] = unitConf

    self.procUnitConfObjDict = renumbered
934 934
def setup(self, id=1, name='', description='', email=None, alarm=None):
    '''
    Print the start-up banner and store the project attributes.

    Parameters:
        id          : project id (stored as a string)
        name        : optional label; when given, the process name becomes
                      '<Process class name> (<name>)'
        description : free text
        email       : stored as given
        alarm       : list stored as given; a new empty list when omitted
    '''

    print(' ')
    print('*' * 60)
    print('* Starting SIGNAL CHAIN PROCESSING (Multiprocessing) v%s *' % schainpy.__version__)
    print('*' * 60)
    print("* Python " + python_version() + " *")
    print('*' * 19)
    print(' ')
    self.id = str(id)
    self.description = description
    self.email = email
    # A literal [] default would be one list shared by every project that
    # omits the argument
    self.alarm = [] if alarm is None else alarm
    if name:
        self.name = '{} ({})'.format(Process.__name__, name)
950 950
def update(self, **kwargs):
    '''Set every keyword argument as an attribute of the project.'''

    for key in list(kwargs):
        setattr(self, key, kwargs[key])
955 955
def clone(self):
    '''
    Return a new Project that shares (not copies) this project's
    dictionary of unit configurations; no other attribute is carried over.
    '''

    twin = Project()
    twin.procUnitConfObjDict = self.procUnitConfObjDict
    return twin
961 961
def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
    '''
    Create a reading unit, register it in the project and return it.

    Original note (translated): the id of the process is the topic the
    processing units must subscribe to in order to receive the data.

    When id is None a new id is generated; otherwise str(id) is used.
    Remaining kwargs are forwarded to ReadUnitConf.setup().
    '''

    idReadUnit = self.__getNewId() if id is None else str(id)

    readUnitConfObj = ReadUnitConf()
    readUnitConfObj.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs)

    key = readUnitConfObj.getId()
    self.procUnitConfObjDict[key] = readUnitConfObj

    return readUnitConfObj
982 982
def addProcUnit(self, inputId='0', datatype=None, name=None):
    '''
    Create a processing unit fed by the unit inputId, register it in the
    project and return it.

    Original note (translated): inputId is kept as the topic to subscribe
    to, while the unit's own id is the topic it publishes on; everything is
    assigned dynamically.

    The new unit receives the lock of its input unit. Raises KeyError when
    inputId is not a registered unit.
    '''

    sourceUnit = self.procUnitConfObjDict[inputId]
    idProcUnit = self.__getNewId()

    procUnitConfObj = ProcUnitConf()
    procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue, sourceUnit.lock)

    key = procUnitConfObj.getId()
    self.procUnitConfObjDict[key] = procUnitConfObj

    return procUnitConfObj
1002 1002
def removeProcUnit(self, id):
    '''Unregister the unit with the given id; unknown ids are ignored.'''

    self.procUnitConfObjDict.pop(id, None)
1007 1007
def getReadUnitId(self):
    '''
    Return the id of the project's reading unit.

    Raises AttributeError when the project has no reading unit.
    '''

    return self.getReadUnitObj().id
1013 1013
def getReadUnitObj(self):
    '''Return the first registered unit whose element name is 'ReadUnit', or None.'''

    units = list(self.procUnitConfObjDict.values())
    return next((unit for unit in units
                 if unit.getElementName() == 'ReadUnit'), None)
1021 1021
def getProcUnitObj(self, id=None, name=None):
    '''
    Return a unit by id (KeyError if unknown) or, when no id is given, by
    name (None if unknown). Returns None when neither is given.
    '''

    if id is not None:
        return self.procUnitConfObjDict[id]

    if name is None:
        return None

    return self.getProcUnitObjByName(name)
1031 1031
def getProcUnitObjByName(self, name):
    '''Return the first registered unit called name, or None.'''

    units = list(self.procUnitConfObjDict.values())
    return next((unit for unit in units if unit.name == name), None)
1039 1039
def procUnitItems(self):
    '''Return the registered units as a list of (id, unit) pairs.'''

    return [(key, unit) for key, unit in self.procUnitConfObjDict.items()]
1043 1043
def makeXml(self):
    '''Build the project XML tree and keep it in self.projectElement.'''

    root = Element('Project')
    for key, text in (('id', str(self.id)),
                      ('name', self.name),
                      ('description', self.description)):
        root.set(key, text)

    for unitConf in list(self.procUnitConfObjDict.values()):
        unitConf.makeXml(root)

    self.projectElement = root
1055 1055
def writeXml(self, filename=None):
    '''
    Write the project as XML.

    When filename is None, self.filename is used if set, otherwise
    'schain.xml'. Returns 1 on success (self.filename then holds the
    absolute path) and 0, after printing a message, when the name is empty
    or the target is not writable.
    '''

    if filename is None:
        filename = self.filename if self.filename else 'schain.xml'

    if not filename:
        print('filename has not been defined. Use setFilename(filename) for do it.')
        return 0

    abs_file = os.path.abspath(filename)
    folder = os.path.dirname(abs_file)

    if not os.access(folder, os.W_OK):
        print('No write permission on %s' % os.path.dirname(abs_file))
        return 0

    if os.path.isfile(abs_file) and not os.access(abs_file, os.W_OK):
        print('File %s already exists and it could not be overwriten' % abs_file)
        return 0

    self.makeXml()

    tree = ElementTree(self.projectElement)
    tree.write(abs_file, method='xml')

    self.filename = abs_file

    return 1
1085 1085
def readXml(self, filename=None):
    '''
    Load the project (id, name, description and every unit) from an XML
    file.

    Returns 1 on success and 0, after printing a message, when filename is
    empty, the file does not exist or it cannot be parsed.
    '''

    if not filename:
        print('filename is not defined')
        return 0

    abs_file = os.path.abspath(filename)

    if not os.path.isfile(abs_file):
        print('%s file does not exist' % abs_file)
        return 0

    self.projectElement = None
    self.procUnitConfObjDict = {}

    try:
        self.projectElement = ElementTree().parse(abs_file)
    except Exception:
        # Not a bare "except:", so KeyboardInterrupt and SystemExit still
        # propagate instead of being reported as a format error
        print('Error reading %s, verify file format' % filename)
        return 0

    self.project = self.projectElement.tag

    self.id = self.projectElement.get('id')
    self.name = self.projectElement.get('name')
    self.description = self.projectElement.get('description')

    readUnitElementList = self.projectElement.iter(
        ReadUnitConf().getElementName())

    for readUnitElement in readUnitElementList:
        readUnitConfObj = ReadUnitConf()
        readUnitConfObj.readXml(readUnitElement, self.id)
        self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj

    procUnitElementList = self.projectElement.iter(
        ProcUnitConf().getElementName())

    for procUnitElement in procUnitElementList:
        procUnitConfObj = ProcUnitConf()
        procUnitConfObj.readXml(procUnitElement, self.id)
        self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj

    self.filename = abs_file

    return 1
1132 1132
def __str__(self):
    '''
    Return a text summary of the project and of each of its units.

    The first line holds the project name, description and id; it is
    followed by str() of every unit configuration, one per line.

    The text is returned rather than printed: __str__ must return a string,
    otherwise str(project) and print(project) raise TypeError.
    '''

    lines = ['Project: name = %s, description = %s, id = %s' % (
        self.name,
        self.description,
        self.id)]

    lines.extend(str(procUnitConfObj)
                 for procUnitConfObj in self.procUnitConfObjDict.values())

    return '\n'.join(lines)
1142 1142
def createObjects(self):
    '''
    Call createObjects() on every unit configuration, in ascending id order.
    '''

    for unitId in sorted(self.procUnitConfObjDict):
        self.procUnitConfObjDict[unitId].createObjects()
1150 1150
def monitor(self):
    '''
    Start the background thread that runs __monitor on the error queue
    and the zmq context of this project.
    '''

    watcher = Thread(target=self.__monitor,
                     args=(self.err_queue, self.ctx))
    watcher.start()
1155 1155
def __monitor(self, queue, ctx):
    '''
    Watch the error queue, then shut everything down and raise an alarm.

    Messages containing '#_start_#' / '#_end_#' increase / decrease the
    count of running processes; any other message is kept as the error
    text. The wait ends when the count returns to zero or a message
    containing 'Traceback' arrives. Afterwards every external operation and
    processing unit is terminated, the zmq context is closed and, if there
    was an error message, an Alarm is started with the run details.

    Input:
        queue : queue where processes post start/end marks and errors
        ctx   : zmq context to terminate once the processes are stopped
    '''

    import socket

    procs = 0
    err_msg = ''

    while True:
        msg = queue.get()
        if '#_start_#' in msg:
            procs += 1
        elif '#_end_#' in msg:
            procs -= 1
        else:
            err_msg = msg

        if procs == 0 or 'Traceback' in err_msg:
            break
        time.sleep(0.1)

    if '|' in err_msg:
        # Errors arrive as '<name>|<traceback>'. Split on the first separator
        # only: the traceback text may itself contain '|', which would make
        # the two-name unpacking raise ValueError.
        name, err = err_msg.split('|', 1)
        if 'SchainWarning' in err:
            log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), name)
        elif 'SchainError' in err:
            log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), name)
        else:
            log.error(err, name)
    else:
        name, err = self.name, err_msg

    # Grace period before the processes are terminated.
    time.sleep(2)

    for conf in self.procUnitConfObjDict.values():
        for confop in conf.opConfObjList:
            if confop.type == 'external':
                confop.opObj.terminate()
        conf.procUnitObj.terminate()

    ctx.term()

    message = ''.join(err)

    if err_msg:
        subject = 'SChain v%s: Error running %s\n' % (
            schainpy.__version__, self.name)

        subtitle = 'Hostname: %s\n' % socket.gethostbyname(
            socket.gethostname())
        subtitle += 'Working directory: %s\n' % os.path.abspath('./')
        subtitle += 'Configuration file: %s\n' % self.filename
        subtitle += 'Time: %s\n' % str(datetime.datetime.now())

        readUnitConfObj = self.getReadUnitObj()
        if readUnitConfObj:
            subtitle += '\nInput parameters:\n'
            subtitle += '[Data path = %s]\n' % readUnitConfObj.path
            subtitle += '[Data type = %s]\n' % readUnitConfObj.datatype
            subtitle += '[Start date = %s]\n' % readUnitConfObj.startDate
            subtitle += '[End date = %s]\n' % readUnitConfObj.endDate
            subtitle += '[Start time = %s]\n' % readUnitConfObj.startTime
            subtitle += '[End time = %s]\n' % readUnitConfObj.endTime

        a = Alarm(
            modes=self.alarm,
            email=self.email,
            message=message,
            subject=subject,
            subtitle=subtitle,
            filename=self.filename
        )

        a.start()
1229 1229
def isPaused(self):
    '''
    Tell runController() whether the process is paused; always 0 here.
    '''
    return 0
1232 1232
def isStopped(self):
    '''
    Tell runController() whether the process is stopped; always 0 here.
    '''
    return 0
1235 1235
def runController(self):
    '''
    Block while the process is paused and report whether it may continue.

    While isPaused() holds, isPaused()/isStopped() are polled every 0.1 s
    until the pause is lifted or a stop is requested.

    Return:
        0 when this process has been stopped, 1 otherwise
    '''

    if self.isPaused():
        print('Process suspended')

        while True:
            time.sleep(0.1)

            # Leave the wait as soon as the pause ends or a stop arrives.
            if not self.isPaused() or self.isStopped():
                break

        print('Process reinitialized')

    if not self.isStopped():
        return 1

    print('Process stopped')
    return 0
1260 1260
def setFilename(self, filename):
    '''
    Store `filename` in self.filename (read by writeXml() as the default
    destination).
    '''
    self.filename = filename
1264 1264
def setProxy(self):
    '''
    Create the zmq XPUB/XSUB proxy of this project and run it.

    Both sockets are bound to ipc endpoints under /tmp/schain named after
    the project id. The monitor thread is started first; zmq.proxy() then
    blocks until the context is terminated, at which point both sockets
    are closed.
    '''

    # Create the directory EAFP-style: several projects may start at the
    # same time, and a check-then-mkdir sequence fails in the process that
    # loses the race.
    try:
        os.mkdir('/tmp/schain')
    except OSError:
        if not os.path.isdir('/tmp/schain'):
            raise

    self.ctx = zmq.Context()
    xpub = self.ctx.socket(zmq.XPUB)
    xpub.bind('ipc:///tmp/schain/{}_pub'.format(self.id))
    xsub = self.ctx.socket(zmq.XSUB)
    xsub.bind('ipc:///tmp/schain/{}_sub'.format(self.id))
    self.monitor()
    try:
        zmq.proxy(xpub, xsub)
    except zmq.ContextTerminated:
        xpub.close()
        xsub.close()
1281 1281
def run(self):
    '''
    Run the project: create the unit objects, serve the zmq proxy until it
    ends, and log the start and the elapsed time.
    '''

    log.success('Starting {}: {}'.format(self.name, self.id), tag='')
    self.start_time = time.time()

    self.createObjects()
    self.setProxy()

    elapsed = time.time() - self.start_time
    log.success('{} Done (Time: {}s)'.format(self.name, elapsed), '')
@@ -1,416 +1,429
1 1 '''
2 2 Updated for multiprocessing
3 3 Author : Sergio Cortez
4 4 Jan 2018
5 5 Abstract:
6 6 Base class for processing units and operations. A decorator provides multiprocessing features and interconnects the processes it creates.
7 7 The arguments (kwargs) sent from the controller are parsed and filtered by the decorator for each processing unit or operation instantiated.
8 8 The decorator also handles the methods of a processing unit that are called from the main script rather than as separate operations (OPERATION -> type = 'self').
9 9
10 10 Based on:
11 11 $Author: murco $
12 12 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
13 13 '''
14 14
15 15 import os
16 16 import sys
17 17 import inspect
18 18 import zmq
19 19 import time
20 20 import pickle
21 21 import traceback
22 22 try:
23 23 from queue import Queue
24 24 except:
25 25 from Queue import Queue
26 26 from threading import Thread
27 27 from multiprocessing import Process
28 28
29 29 from schainpy.utils import log
30 30
31 31
class ProcessingUnit(object):

    """
    Update - Jan 2018 - MULTIPROCESSING
    All the "call" methods present in the previous base were removed.
    Most operations are independent processes, so the decorator is in
    charge of communicating the operation processes with the processing
    unit via IPC.

    The constructor does not receive any argument. The remaining methods
    are related to the operations to execute.
    """
    proc_type = 'processing'
    __attrs__ = []

    def __init__(self):

        self.dataIn = None
        self.dataOut = None
        self.isConfig = False
        # List of (operation, type, id, kwargs) tuples, in insertion order.
        self.operations = []
        # CODE of each operation added while this unit's name contains 'plot'.
        self.plots = []

    def getAllowedArgs(self):
        """
        Return the argument names accepted by this unit.

        __attrs__ is used when present; otherwise the names are taken from
        the signature of run().
        """
        if hasattr(self, '__attrs__'):
            return self.__attrs__

        # inspect.getargspec was removed in Python 3.11; fall back to it only
        # on interpreters that lack getfullargspec.
        getspec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
        return getspec(self.run).args

    def addOperation(self, conf, operation):
        """
        Register an operation to be executed by this unit. This method is
        used by the controller.

        Input:

            conf      : operation configuration; its type, id and
                        getKwargs() are stored next to the operation
            operation : the operation object (or callable) to register
        """

        self.operations.append(
            (operation, conf.type, conf.id, conf.getKwargs()))

        # self.name is not set by __init__ (the multiprocessing decorator
        # assigns it), so an undecorated unit is treated as unnamed.
        if 'plot' in getattr(self, 'name', '').lower():
            self.plots.append(operation.CODE)

    def getOperationObj(self, objId):
        """
        Return the registered operation whose configuration id is objId,
        or None when there is no such operation.
        """

        # self.operations is a list of tuples, not a dict, so it is searched
        # by the id stored in each tuple.
        for operation, _optype, opId, _kwargs in self.operations:
            if opId == objId:
                return operation

        return None

    def operation(self, **kwargs):
        """
        Direct operation on the data (dataOut.data). Implementations must
        update the attributes of the dataOut object.

        Input:

            **kwargs : dictionary of arguments of the function to execute
        """

        raise NotImplementedError

    def setup(self):

        raise NotImplementedError

    def run(self):

        raise NotImplementedError

    def close(self):

        return
116 116
117 117
class Operation(object):

    """
    Update - Jan 2018 - MULTIPROCESSING

    Most of the methods remained the same. The decorator parses the
    arguments and executes the run() method for each process.
    The constructor does not receive any argument, neither does the base
    class.

    Base class for the additional operations that can be added to a
    ProcessingUnit and that need to accumulate previous information about
    the data being processed. Preferably keep an accumulation buffer inside
    this class.

    Example: coherent integrations need the information of the n previous
    profiles (buffer)
    """
    proc_type = 'operation'
    __attrs__ = []

    def __init__(self):

        self.id = None
        self.isConfig = False

        # Subclasses may define their own name; default to the class name.
        if not hasattr(self, 'name'):
            self.name = self.__class__.__name__

    def getAllowedArgs(self):
        """
        Return the argument names accepted by this operation.

        __attrs__ is used when present; otherwise the names are taken from
        the signature of run().
        """
        if hasattr(self, '__attrs__'):
            return self.__attrs__

        # inspect.getargspec was removed in Python 3.11; fall back to it only
        # on interpreters that lack getfullargspec.
        getspec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
        return getspec(self.run).args

    def setup(self, **kwargs):
        """
        Configure the operation; must be implemented by subclasses.

        Keyword arguments are accepted because run() forwards its own
        kwargs here; without them the base class raised TypeError instead
        of NotImplementedError.
        """

        self.isConfig = True

        raise NotImplementedError

    def run(self, dataIn, **kwargs):
        """
        Perform the required operations on dataIn.data and update the
        attributes of the dataIn object.

        Input:

            dataIn : object of type JROData

        Return:

            None

        Affected:
            __buffer : data reception buffer.

        """
        if not self.isConfig:
            self.setup(**kwargs)

        raise NotImplementedError

    def close(self):

        return
182 182
class InputQueue(Thread):

    '''
    Class to hold input data for Processing Units and external Operations.

    A receiver thread subscribes to the project's zmq publisher endpoint
    and buffers every serialized object it receives; get() hands them out
    one at a time. When the buffered size exceeds MAX_BUFFER_MB the shared
    lock is cleared, and it is set again once every queue that cleared it
    has drained below the threshold.
    '''

    # Buffered size, in units of 1,000,000 bytes, above which the shared
    # lock is cleared.
    MAX_BUFFER_MB = 512

    def __init__(self, project_id, inputId, lock=None):
        '''
        Input:
            project_id : id used to build the ipc endpoint to connect to
            inputId    : topic this queue subscribes to
            lock       : object providing set()/clear() and a counter `n`
                         with a `value` attribute (presumably a
                         multiprocessing Event carrying a Value - confirm
                         against the controller). None disables the
                         size-based locking.
        '''
        from threading import Lock

        Thread.__init__(self)
        self.queue = Queue()
        self.project_id = project_id
        self.inputId = inputId
        self.lock = lock
        self.islocked = False
        self.size = 0
        # self.size is updated by the receiver thread (run) and by the
        # consumer (get); "+=" / "-=" are not atomic, so guard them.
        self._size_mutex = Lock()

    def run(self):
        '''
        Receive objects for self.inputId forever and buffer them.
        '''

        c = zmq.Context()
        self.receiver = c.socket(zmq.SUB)
        self.receiver.connect(
            'ipc:///tmp/schain/{}_pub'.format(self.project_id))
        self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())

        while True:
            # Frame 0 is the topic, frame 1 the pickled payload.
            obj = self.receiver.recv_multipart()[1]
            with self._size_mutex:
                self.size += sys.getsizeof(obj)
            self.queue.put(obj)

    def _shift_counter(self, delta):
        '''
        Add delta to the shared counter self.lock.n and return its new value.
        '''
        counter = self.lock.n
        get_lock = getattr(counter, 'get_lock', None)

        if get_lock is None:
            counter.value += delta
            return counter.value

        # "value += 1" on a shared value is a read-modify-write; hold the
        # counter's own lock so concurrent updates are not lost.
        with get_lock():
            counter.value += delta
            return counter.value

    def _update_lock(self):
        '''
        Clear or set the shared lock according to the buffered size.
        '''
        with self._size_mutex:
            buffered = self.size / 1000000

        if not self.islocked and buffered > self.MAX_BUFFER_MB:
            self._shift_counter(1)
            self.islocked = True
            self.lock.clear()
        elif self.islocked and buffered <= self.MAX_BUFFER_MB:
            self.islocked = False
            # Set the lock again only when no queue is holding it any more.
            if self._shift_counter(-1) == 0:
                self.lock.set()

    def get(self):
        '''
        Return the next buffered object, blocking until one is available.
        '''

        if self.lock is not None:
            self._update_lock()

        obj = self.queue.get()
        with self._size_mutex:
            self.size -= sys.getsizeof(obj)
        # NOTE(review): pickle.loads executes whatever the payload encodes;
        # this is only safe while every publisher on the endpoint is trusted.
        return pickle.loads(obj)
219 227
220 228
def MPDecorator(BaseClass):
    """
    Multiprocessing class decorator

    This function adds multiprocessing features to a BaseClass. It also
    handles the communication between processes (readers, procUnits and
    operations).

    Input:

        BaseClass : class to wrap; its run() and close() are called from
                    the process loop

    Return:

        MPClass, a subclass of BaseClass and multiprocessing.Process. Its
        constructor expects the positional arguments
        (id, inputId, project_id, err_queue, lock, typeProc); keyword
        arguments are kept and forwarded to BaseClass.run().
    """

    class MPClass(BaseClass, Process):

        def __init__(self, *args, **kwargs):
            super(MPClass, self).__init__()
            Process.__init__(self)
            self.operationKwargs = {}
            self.args = args
            self.kwargs = kwargs
            self.sender = None
            self.receiver = None
            self.i = 0
            self.t = time.time()
            self.name = BaseClass.__name__
            self.__doc__ = BaseClass.__doc__

            if 'plot' in self.name.lower() and not self.name.endswith('_'):
                self.name = '{}{}'.format(self.CODE.upper(), 'Plot')

            self.start_time = time.time()
            self.id = args[0]
            self.inputId = args[1]
            self.project_id = args[2]
            self.err_queue = args[3]
            self.lock = args[4]
            self.typeProc = args[5]
            # Announce this process on the error queue.
            self.err_queue.put('#_start_#')
            # Units without an input (inputId None) have nothing to listen to.
            if self.inputId is not None:
                self.queue = InputQueue(self.project_id, self.inputId, self.lock)

        def subscribe(self):
            '''
            Start the zmq socket receiver and subscribe to input ID.
            '''

            self.queue.start()

        def listen(self):
            '''
            This function waits for objects
            '''

            return self.queue.get()

        def set_publisher(self):
            '''
            This function creates a zmq socket for publishing objects.
            '''

            time.sleep(0.5)

            c = zmq.Context()
            self.sender = c.socket(zmq.PUB)
            self.sender.connect(
                'ipc:///tmp/schain/{}_sub'.format(self.project_id))

        def publish(self, data, id):
            '''
            This function publishes an object to a specific topic.
            A unit without input waits on the shared lock first, so that
            publishing blocks while the lock is cleared.
            '''

            if self.inputId is None:
                self.lock.wait()
            self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])

        def runReader(self):
            '''
            Run function for read units
            '''
            while True:

                try:
                    BaseClass.run(self, **self.kwargs)
                except:
                    # Worker boundary: report whatever was raised instead of
                    # letting the process die without notice.
                    err = traceback.format_exc()
                    if 'No more files' in err:
                        log.warning('No more files to read', self.name)
                    else:
                        self.err_queue.put('{}|{}'.format(self.name, err))
                    self.dataOut.error = True

                for op, optype, opId, kwargs in self.operations:
                    if optype == 'self' and not self.dataOut.flagNoData:
                        op(**kwargs)
                    elif optype == 'other' and not self.dataOut.flagNoData:
                        # Use the operation's own kwargs (as runProc does),
                        # not the reader's.
                        self.dataOut = op.run(self.dataOut, **kwargs)
                    elif optype == 'external':
                        self.publish(self.dataOut, opId)

                if self.dataOut.flagNoData and not self.dataOut.error:
                    continue

                self.publish(self.dataOut, self.id)

                if self.dataOut.error:
                    break

            time.sleep(0.5)

        def runProc(self):
            '''
            Run function for processing units
            '''

            while True:
                self.dataIn = self.listen()

                if self.dataIn.flagNoData and self.dataIn.error is None:
                    continue
                elif not self.dataIn.error:
                    try:
                        BaseClass.run(self, **self.kwargs)
                    except:
                        # Worker boundary: report the failure and flag the
                        # output so the loop ends below.
                        self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc()))
                        self.dataOut.error = True
                elif self.dataIn.error:
                    # Propagate the upstream error.
                    self.dataOut.error = self.dataIn.error
                    self.dataOut.flagNoData = True

                for op, optype, opId, kwargs in self.operations:
                    if optype == 'self' and not self.dataOut.flagNoData:
                        op(**kwargs)
                    elif optype == 'other' and not self.dataOut.flagNoData:
                        self.dataOut = op.run(self.dataOut, **kwargs)
                    elif optype == 'external' and not self.dataOut.flagNoData:
                        self.publish(self.dataOut, opId)

                self.publish(self.dataOut, self.id)
                # On error, external operations were skipped above (no data);
                # send them the error so they leave their loops.
                for op, optype, opId, kwargs in self.operations:
                    if optype == 'external' and self.dataOut.error:
                        self.publish(self.dataOut, opId)

                if self.dataOut.error:
                    break

            time.sleep(0.5)

        def runOp(self):
            '''
            Run function for external operations (these operations just
            receive data, ex: plots, writers, publishers)
            '''

            while True:

                dataOut = self.listen()

                if not dataOut.error:
                    try:
                        BaseClass.run(self, dataOut, **self.kwargs)
                    except:
                        # Worker boundary: report the failure.
                        self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc()))
                        dataOut.error = True
                else:
                    break

        def run(self):
            '''
            Process entry point: dispatch on typeProc, then close.
            '''
            # Compare by value: "is" on string literals depends on interning
            # and fails for an equal string built at runtime.
            if self.typeProc == "ProcUnit":

                if self.inputId is not None:
                    self.subscribe()

                self.set_publisher()

                if 'Reader' not in BaseClass.__name__:
                    self.runProc()
                else:
                    self.runReader()

            elif self.typeProc == "Operation":

                self.subscribe()
                self.runOp()

            else:
                raise ValueError("Unknown type")

            self.close()

        def close(self):
            '''
            Close the wrapped object and the sockets, and mark this process
            as finished on the error queue.
            '''

            BaseClass.close(self)
            self.err_queue.put('#_end_#')

            if self.sender:
                self.sender.close()

            if self.receiver:
                self.receiver.close()

            log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)

    return MPClass
General Comments 0
You need to be logged in to leave comments. Login now