##// END OF EJS Templates
Fix Read function for CLI 'schain xml'
George Yong -
r1184:d00a3ddd0dd0
parent child
Show More
@@ -1,1259 +1,1265
1 1 '''
2 2 Updated on January , 2018, for multiprocessing purposes
3 3 Author: Sergio Cortez
4 4 Created on September , 2012
5 5 '''
6 6 from platform import python_version
7 7 import sys
8 8 import ast
9 9 import datetime
10 10 import traceback
11 11 import math
12 12 import time
13 13 import zmq
14 14 from multiprocessing import Process, cpu_count
15 15 from threading import Thread
16 16 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
17 17 from xml.dom import minidom
18 18
19 19
20 20 from schainpy.admin import Alarm, SchainWarning
21 21
22 22 ### Temporary imports!!!
23 23 # from schainpy.model import *
24 24 from schainpy.model.io import *
25 25 from schainpy.model.graphics import *
26 26 from schainpy.model.proc.jroproc_base import *
27 27 from schainpy.model.proc.bltrproc_parameters import *
28 28 from schainpy.model.proc.jroproc_spectra import *
29 29 from schainpy.model.proc.jroproc_voltage import *
30 30 from schainpy.model.proc.jroproc_parameters import *
31 31 from schainpy.model.utils.jroutils_publish import *
32 32 from schainpy.utils import log
33 33 ###
34 34
35 35 DTYPES = {
36 36 'Voltage': '.r',
37 37 'Spectra': '.pdata'
38 38 }
39 39
40 40
41 41 def MPProject(project, n=cpu_count()):
42 42 '''
43 43 Project wrapper to run schain in n processes
44 44 '''
45 45
46 46 rconf = project.getReadUnitObj()
47 47 op = rconf.getOperationObj('run')
48 48 dt1 = op.getParameterValue('startDate')
49 49 dt2 = op.getParameterValue('endDate')
50 50 tm1 = op.getParameterValue('startTime')
51 51 tm2 = op.getParameterValue('endTime')
52 52 days = (dt2 - dt1).days
53 53
54 54 for day in range(days + 1):
55 55 skip = 0
56 56 cursor = 0
57 57 processes = []
58 58 dt = dt1 + datetime.timedelta(day)
59 59 dt_str = dt.strftime('%Y/%m/%d')
60 60 reader = JRODataReader()
61 61 paths, files = reader.searchFilesOffLine(path=rconf.path,
62 62 startDate=dt,
63 63 endDate=dt,
64 64 startTime=tm1,
65 65 endTime=tm2,
66 66 ext=DTYPES[rconf.datatype])
67 67 nFiles = len(files)
68 68 if nFiles == 0:
69 69 continue
70 70 skip = int(math.ceil(nFiles / n))
71 71 while nFiles > cursor * skip:
72 72 rconf.update(startDate=dt_str, endDate=dt_str, cursor=cursor,
73 73 skip=skip)
74 74 p = project.clone()
75 75 p.start()
76 76 processes.append(p)
77 77 cursor += 1
78 78
79 79 def beforeExit(exctype, value, trace):
80 80 for process in processes:
81 81 process.terminate()
82 82 process.join()
83 83 print(traceback.print_tb(trace))
84 84
85 85 sys.excepthook = beforeExit
86 86
87 87 for process in processes:
88 88 process.join()
89 89 process.terminate()
90 90
91 91 time.sleep(3)
92 92
93 93 def wait(context):
94 94
95 95 time.sleep(1)
96 96 c = zmq.Context()
97 97 receiver = c.socket(zmq.SUB)
98 98 receiver.connect('ipc:///tmp/schain_{}_pub'.format(self.id))
99 99 receiver.setsockopt(zmq.SUBSCRIBE, self.id.encode())
100 100 log.error('startinggg')
101 101 msg = receiver.recv_multipart()[1]
102 102 #log.error(msg)
103 103 context.terminate()
104 104
105 105 class ParameterConf():
106 106
107 107 id = None
108 108 name = None
109 109 value = None
110 110 format = None
111 111
112 112 __formated_value = None
113 113
114 114 ELEMENTNAME = 'Parameter'
115 115
116 116 def __init__(self):
117 117
118 118 self.format = 'str'
119 119
120 120 def getElementName(self):
121 121
122 122 return self.ELEMENTNAME
123 123
124 124 def getValue(self):
125 125
126 126 value = self.value
127 127 format = self.format
128 128
129 129 if self.__formated_value != None:
130 130
131 131 return self.__formated_value
132 132
133 133 if format == 'obj':
134 134 return value
135 135
136 136 if format == 'str':
137 137 self.__formated_value = str(value)
138 138 return self.__formated_value
139 139
140 140 if value == '':
141 141 raise ValueError('%s: This parameter value is empty' % self.name)
142 142
143 143 if format == 'list':
144 144 strList = value.split(',')
145 145
146 146 self.__formated_value = strList
147 147
148 148 return self.__formated_value
149 149
150 150 if format == 'intlist':
151 151 '''
152 152 Example:
153 153 value = (0,1,2)
154 154 '''
155 155
156 156 new_value = ast.literal_eval(value)
157 157
158 158 if type(new_value) not in (tuple, list):
159 159 new_value = [int(new_value)]
160 160
161 161 self.__formated_value = new_value
162 162
163 163 return self.__formated_value
164 164
165 165 if format == 'floatlist':
166 166 '''
167 167 Example:
168 168 value = (0.5, 1.4, 2.7)
169 169 '''
170 170
171 171 new_value = ast.literal_eval(value)
172 172
173 173 if type(new_value) not in (tuple, list):
174 174 new_value = [float(new_value)]
175 175
176 176 self.__formated_value = new_value
177 177
178 178 return self.__formated_value
179 179
180 180 if format == 'date':
181 181 strList = value.split('/')
182 182 intList = [int(x) for x in strList]
183 183 date = datetime.date(intList[0], intList[1], intList[2])
184 184
185 185 self.__formated_value = date
186 186
187 187 return self.__formated_value
188 188
189 189 if format == 'time':
190 190 strList = value.split(':')
191 191 intList = [int(x) for x in strList]
192 192 time = datetime.time(intList[0], intList[1], intList[2])
193 193
194 194 self.__formated_value = time
195 195
196 196 return self.__formated_value
197 197
198 198 if format == 'pairslist':
199 199 '''
200 200 Example:
201 201 value = (0,1),(1,2)
202 202 '''
203 203
204 204 new_value = ast.literal_eval(value)
205 205
206 206 if type(new_value) not in (tuple, list):
207 207 raise ValueError('%s has to be a tuple or list of pairs' % value)
208 208
209 209 if type(new_value[0]) not in (tuple, list):
210 210 if len(new_value) != 2:
211 211 raise ValueError('%s has to be a tuple or list of pairs' % value)
212 212 new_value = [new_value]
213 213
214 214 for thisPair in new_value:
215 215 if len(thisPair) != 2:
216 216 raise ValueError('%s has to be a tuple or list of pairs' % value)
217 217
218 218 self.__formated_value = new_value
219 219
220 220 return self.__formated_value
221 221
222 222 if format == 'multilist':
223 223 '''
224 224 Example:
225 225 value = (0,1,2),(3,4,5)
226 226 '''
227 227 multiList = ast.literal_eval(value)
228 228
229 229 if type(multiList[0]) == int:
230 230 multiList = ast.literal_eval('(' + value + ')')
231 231
232 232 self.__formated_value = multiList
233 233
234 234 return self.__formated_value
235 235
236 236 if format == 'bool':
237 237 value = int(value)
238 238
239 239 if format == 'int':
240 240 value = float(value)
241 241
242 242 format_func = eval(format)
243 243
244 244 self.__formated_value = format_func(value)
245 245
246 246 return self.__formated_value
247 247
248 248 def updateId(self, new_id):
249 249
250 250 self.id = str(new_id)
251 251
252 252 def setup(self, id, name, value, format='str'):
253 253 self.id = str(id)
254 254 self.name = name
255 255 if format == 'obj':
256 256 self.value = value
257 257 else:
258 258 self.value = str(value)
259 259 self.format = str.lower(format)
260 260
261 261 self.getValue()
262 262
263 263 return 1
264 264
265 265 def update(self, name, value, format='str'):
266 266
267 267 self.name = name
268 268 self.value = str(value)
269 269 self.format = format
270 270
271 271 def makeXml(self, opElement):
272 272 if self.name not in ('queue',):
273 273 parmElement = SubElement(opElement, self.ELEMENTNAME)
274 274 parmElement.set('id', str(self.id))
275 275 parmElement.set('name', self.name)
276 276 parmElement.set('value', self.value)
277 277 parmElement.set('format', self.format)
278
278
279 279 def readXml(self, parmElement):
280 280
281 281 self.id = parmElement.get('id')
282 282 self.name = parmElement.get('name')
283 283 self.value = parmElement.get('value')
284 284 self.format = str.lower(parmElement.get('format'))
285 285
286 286 # Compatible with old signal chain version
287 287 if self.format == 'int' and self.name == 'idfigure':
288 288 self.name = 'id'
289 289
290 290 def printattr(self):
291 291
292 print('Parameter[%s]: name = %s, value = %s, format = %s' % (self.id, self.name, self.value, self.format))
292 print('Parameter[%s]: name = %s, value = %s, format = %s, project_id = %s' % (self.id, self.name, self.value, self.format, self.project_id))
293 293
294 294 class OperationConf():
295 295
296 296 ELEMENTNAME = 'Operation'
297 297
298 298 def __init__(self):
299 299
300 300 self.id = '0'
301 301 self.name = None
302 302 self.priority = None
303 303 self.topic = None
304 304
305 305 def __getNewId(self):
306 306
307 307 return int(self.id) * 10 + len(self.parmConfObjList) + 1
308 308
309 309 def getId(self):
310 310 return self.id
311 311
312 312 def updateId(self, new_id):
313 313
314 314 self.id = str(new_id)
315 315
316 316 n = 1
317 317 for parmObj in self.parmConfObjList:
318 318
319 319 idParm = str(int(new_id) * 10 + n)
320 320 parmObj.updateId(idParm)
321 321
322 322 n += 1
323 323
324 324 def getElementName(self):
325 325
326 326 return self.ELEMENTNAME
327 327
328 328 def getParameterObjList(self):
329 329
330 330 return self.parmConfObjList
331 331
332 332 def getParameterObj(self, parameterName):
333 333
334 334 for parmConfObj in self.parmConfObjList:
335 335
336 336 if parmConfObj.name != parameterName:
337 337 continue
338 338
339 339 return parmConfObj
340 340
341 341 return None
342 342
343 343 def getParameterObjfromValue(self, parameterValue):
344 344
345 345 for parmConfObj in self.parmConfObjList:
346 346
347 347 if parmConfObj.getValue() != parameterValue:
348 348 continue
349 349
350 350 return parmConfObj.getValue()
351 351
352 352 return None
353 353
354 354 def getParameterValue(self, parameterName):
355 355
356 356 parameterObj = self.getParameterObj(parameterName)
357 357
358 358 # if not parameterObj:
359 359 # return None
360 360
361 361 value = parameterObj.getValue()
362 362
363 363 return value
364 364
365 365 def getKwargs(self):
366 366
367 367 kwargs = {}
368 368
369 369 for parmConfObj in self.parmConfObjList:
370 370 if self.name == 'run' and parmConfObj.name == 'datatype':
371 371 continue
372 372
373 373 kwargs[parmConfObj.name] = parmConfObj.getValue()
374 374
375 375 return kwargs
376 376
377 377 def setup(self, id, name, priority, type, project_id):
378 378
379 379 self.id = str(id)
380 380 self.project_id = project_id
381 381 self.name = name
382 382 self.type = type
383 383 self.priority = priority
384 384 self.parmConfObjList = []
385 385
386 386 def removeParameters(self):
387 387
388 388 for obj in self.parmConfObjList:
389 389 del obj
390 390
391 391 self.parmConfObjList = []
392 392
393 393 def addParameter(self, name, value, format='str'):
394 394
395 395 if value is None:
396 396 return None
397 397 id = self.__getNewId()
398 398
399 399 parmConfObj = ParameterConf()
400 400 if not parmConfObj.setup(id, name, value, format):
401 401 return None
402 402
403 403 self.parmConfObjList.append(parmConfObj)
404 404
405 405 return parmConfObj
406 406
407 407 def changeParameter(self, name, value, format='str'):
408 408
409 409 parmConfObj = self.getParameterObj(name)
410 410 parmConfObj.update(name, value, format)
411 411
412 412 return parmConfObj
413 413
414 414 def makeXml(self, procUnitElement):
415 415
416 416 opElement = SubElement(procUnitElement, self.ELEMENTNAME)
417 417 opElement.set('id', str(self.id))
418 418 opElement.set('name', self.name)
419 419 opElement.set('type', self.type)
420 420 opElement.set('priority', str(self.priority))
421 421
422 422 for parmConfObj in self.parmConfObjList:
423 423 parmConfObj.makeXml(opElement)
424 424
425 def readXml(self, opElement):
425 def readXml(self, opElement, project_id):
426 426
427 427 self.id = opElement.get('id')
428 428 self.name = opElement.get('name')
429 429 self.type = opElement.get('type')
430 430 self.priority = opElement.get('priority')
431 self.project_id = str(project_id) #yong
431 432
432 433 # Compatible with old signal chain version
433 434 # Use of 'run' method instead 'init'
434 435 if self.type == 'self' and self.name == 'init':
435 436 self.name = 'run'
436 437
437 438 self.parmConfObjList = []
438 439
439 440 parmElementList = opElement.iter(ParameterConf().getElementName())
440 441
441 442 for parmElement in parmElementList:
442 443 parmConfObj = ParameterConf()
443 444 parmConfObj.readXml(parmElement)
444 445
445 446 # Compatible with old signal chain version
446 447 # If an 'plot' OPERATION is found, changes name operation by the value of its type PARAMETER
447 448 if self.type != 'self' and self.name == 'Plot':
448 449 if parmConfObj.format == 'str' and parmConfObj.name == 'type':
449 450 self.name = parmConfObj.value
450 451 continue
451 452
452 453 self.parmConfObjList.append(parmConfObj)
453 454
454 455 def printattr(self):
455 456
456 print('%s[%s]: name = %s, type = %s, priority = %s' % (self.ELEMENTNAME,
457 print('%s[%s]: name = %s, type = %s, priority = %s, project_id = %s' % (self.ELEMENTNAME,
457 458 self.id,
458 459 self.name,
459 460 self.type,
460 self.priority))
461 self.priority,
462 self.project_id))
461 463
462 464 for parmConfObj in self.parmConfObjList:
463 465 parmConfObj.printattr()
464 466
465 467 def createObject(self):
466 468
467 469 className = eval(self.name)
468
470
469 471 if self.type == 'other':
470 472 opObj = className()
471 473 elif self.type == 'external':
472 474 kwargs = self.getKwargs()
473 475 opObj = className(self.id, self.project_id, **kwargs)
474 476 opObj.start()
475 477
476 478 return opObj
477 479
478 480 class ProcUnitConf():
479 481
480 482 ELEMENTNAME = 'ProcUnit'
481 483
482 484 def __init__(self):
483 485
484 486 self.id = None
485 487 self.datatype = None
486 488 self.name = None
487 489 self.inputId = None
488 490 self.opConfObjList = []
489 491 self.procUnitObj = None
490 492 self.opObjDict = {}
491 493
492 494 def __getPriority(self):
493 495
494 496 return len(self.opConfObjList) + 1
495 497
496 498 def __getNewId(self):
497 499
498 500 return int(self.id) * 10 + len(self.opConfObjList) + 1
499 501
500 502 def getElementName(self):
501 503
502 504 return self.ELEMENTNAME
503 505
504 506 def getId(self):
505 507
506 508 return self.id
507 509
508 510 def updateId(self, new_id):
509 511 '''
510 512 new_id = int(parentId) * 10 + (int(self.id) % 10)
511 513 new_inputId = int(parentId) * 10 + (int(self.inputId) % 10)
512 514
513 515 # If this proc unit has not inputs
514 516 #if self.inputId == '0':
515 517 #new_inputId = 0
516 518
517 519 n = 1
518 520 for opConfObj in self.opConfObjList:
519 521
520 522 idOp = str(int(new_id) * 10 + n)
521 523 opConfObj.updateId(idOp)
522 524
523 525 n += 1
524 526
525 527 self.parentId = str(parentId)
526 528 self.id = str(new_id)
527 529 #self.inputId = str(new_inputId)
528 530 '''
529 531 n = 1
530 532
531 533 def getInputId(self):
532 534
533 535 return self.inputId
534 536
535 537 def getOperationObjList(self):
536 538
537 539 return self.opConfObjList
538 540
539 541 def getOperationObj(self, name=None):
540 542
541 543 for opConfObj in self.opConfObjList:
542 544
543 545 if opConfObj.name != name:
544 546 continue
545 547
546 548 return opConfObj
547 549
548 550 return None
549 551
550 552 def getOpObjfromParamValue(self, value=None):
551 553
552 554 for opConfObj in self.opConfObjList:
553 555 if opConfObj.getParameterObjfromValue(parameterValue=value) != value:
554 556 continue
555 557 return opConfObj
556 558 return None
557 559
558 560 def getProcUnitObj(self):
559 561
560 562 return self.procUnitObj
561 563
562 564 def setup(self, project_id, id, name, datatype, inputId):
563 565 '''
564 566 id sera el topico a publicar
565 567 inputId sera el topico a subscribirse
566 568 '''
567 569
568 570 # Compatible with old signal chain version
569 571 if datatype == None and name == None:
570 572 raise ValueError('datatype or name should be defined')
571 573
572 574 #Definir una condicion para inputId cuando sea 0
573 575
574 576 if name == None:
575 577 if 'Proc' in datatype:
576 578 name = datatype
577 579 else:
578 580 name = '%sProc' % (datatype)
579 581
580 582 if datatype == None:
581 583 datatype = name.replace('Proc', '')
582 584
583 585 self.id = str(id)
584 586 self.project_id = project_id
585 587 self.name = name
586 588 self.datatype = datatype
587 589 self.inputId = inputId
588 590 self.opConfObjList = []
589 591
590 592 self.addOperation(name='run', optype='self')
591 593
592 594 def removeOperations(self):
593 595
594 596 for obj in self.opConfObjList:
595 597 del obj
596 598
597 599 self.opConfObjList = []
598 600 self.addOperation(name='run')
599 601
600 602 def addParameter(self, **kwargs):
601 603 '''
602 604 Add parameters to 'run' operation
603 605 '''
604 606 opObj = self.opConfObjList[0]
605 607
606 608 opObj.addParameter(**kwargs)
607 609
608 610 return opObj
609 611
610 612 def addOperation(self, name, optype='self'):
611 613 '''
612 614 Actualizacion - > proceso comunicacion
613 615 En el caso de optype='self', elminar. DEfinir comuncacion IPC -> Topic
614 616 definir el tipoc de socket o comunicacion ipc++
615 617
616 618 '''
617 619
618 620 id = self.__getNewId()
619 621 priority = self.__getPriority() # Sin mucho sentido, pero puede usarse
620 622 opConfObj = OperationConf()
621 623 opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id)
622 624 self.opConfObjList.append(opConfObj)
623 625
624 626 return opConfObj
625 627
626 628 def makeXml(self, projectElement):
627 629
628 630 procUnitElement = SubElement(projectElement, self.ELEMENTNAME)
629 631 procUnitElement.set('id', str(self.id))
630 632 procUnitElement.set('name', self.name)
631 633 procUnitElement.set('datatype', self.datatype)
632 634 procUnitElement.set('inputId', str(self.inputId))
633 635
634 636 for opConfObj in self.opConfObjList:
635 637 opConfObj.makeXml(procUnitElement)
636 638
637 def readXml(self, upElement):
639 def readXml(self, upElement, project_id):
638 640
639 641 self.id = upElement.get('id')
640 642 self.name = upElement.get('name')
641 643 self.datatype = upElement.get('datatype')
642 644 self.inputId = upElement.get('inputId')
645 self.project_id = str(project_id)
643 646
644 647 if self.ELEMENTNAME == 'ReadUnit':
645 648 self.datatype = self.datatype.replace('Reader', '')
646 649
647 650 if self.ELEMENTNAME == 'ProcUnit':
648 651 self.datatype = self.datatype.replace('Proc', '')
649 652
650 653 if self.inputId == 'None':
651 654 self.inputId = '0'
652 655
653 656 self.opConfObjList = []
654 657
655 658 opElementList = upElement.iter(OperationConf().getElementName())
656 659
657 660 for opElement in opElementList:
658 661 opConfObj = OperationConf()
659 opConfObj.readXml(opElement)
662 opConfObj.readXml(opElement, project_id)
660 663 self.opConfObjList.append(opConfObj)
661 664
662 665 def printattr(self):
663 666
664 print('%s[%s]: name = %s, datatype = %s, inputId = %s' % (self.ELEMENTNAME,
667 print('%s[%s]: name = %s, datatype = %s, inputId = %s, project_id = %s' % (self.ELEMENTNAME,
665 668 self.id,
666 669 self.name,
667 670 self.datatype,
668 self.inputId))
671 self.inputId,
672 self.project_id))
669 673
670 674 for opConfObj in self.opConfObjList:
671 675 opConfObj.printattr()
672 676
673 677 def getKwargs(self):
674 678
675 679 opObj = self.opConfObjList[0]
676 680 kwargs = opObj.getKwargs()
677 681
678 682 return kwargs
679 683
680 684 def createObjects(self):
681 685 '''
682 686 Instancia de unidades de procesamiento.
683 687 '''
684 688 className = eval(self.name)
685 689 kwargs = self.getKwargs()
686 690 procUnitObj = className(self.id, self.inputId, self.project_id, **kwargs) # necesitan saber su id y su entrada por fines de ipc
687 691 log.success('creating process...', self.name)
688 692
689 693 for opConfObj in self.opConfObjList:
690 694
691 695 if opConfObj.type == 'self' and opConfObj.name == 'run':
692 696 continue
693 697 elif opConfObj.type == 'self':
694 698 opObj = getattr(procUnitObj, opConfObj.name)
695 699 else:
696 700 opObj = opConfObj.createObject()
697 701
698 702 log.success('creating operation: {}, type:{}'.format(
699 703 opConfObj.name,
700 704 opConfObj.type), self.name)
701 705
702 706 procUnitObj.addOperation(opConfObj, opObj)
703 707
704 708 procUnitObj.start()
705 709 self.procUnitObj = procUnitObj
706 710
707 711 def close(self):
708 712
709 713 for opConfObj in self.opConfObjList:
710 714 if opConfObj.type == 'self':
711 715 continue
712 716
713 717 opObj = self.procUnitObj.getOperationObj(opConfObj.id)
714 718 opObj.close()
715 719
716 720 self.procUnitObj.close()
717 721
718 722 return
719 723
720 724
721 725 class ReadUnitConf(ProcUnitConf):
722 726
723 727 ELEMENTNAME = 'ReadUnit'
724 728
725 729 def __init__(self):
726 730
727 731 self.id = None
728 732 self.datatype = None
729 733 self.name = None
730 734 self.inputId = None
731 735 self.opConfObjList = []
732 736
733 737 def getElementName(self):
734 738
735 739 return self.ELEMENTNAME
736 740
737 741 def setup(self, project_id, id, name, datatype, path='', startDate='', endDate='',
738 742 startTime='', endTime='', server=None, **kwargs):
739 743
740 744
741 745 '''
742 746 *****el id del proceso sera el Topico
743 747
744 748 Adicion de {topic}, si no esta presente -> error
745 749 kwargs deben ser trasmitidos en la instanciacion
746 750
747 751 '''
748 752
749 753 # Compatible with old signal chain version
750 754 if datatype == None and name == None:
751 755 raise ValueError('datatype or name should be defined')
752 756 if name == None:
753 757 if 'Reader' in datatype:
754 758 name = datatype
755 759 datatype = name.replace('Reader','')
756 760 else:
757 761 name = '{}Reader'.format(datatype)
758 762 if datatype == None:
759 763 if 'Reader' in name:
760 764 datatype = name.replace('Reader','')
761 765 else:
762 766 datatype = name
763 767 name = '{}Reader'.format(name)
764 768
765 769 self.id = id
766 770 self.project_id = project_id
767 771 self.name = name
768 772 self.datatype = datatype
769 773 if path != '':
770 774 self.path = os.path.abspath(path)
771 775 self.startDate = startDate
772 776 self.endDate = endDate
773 777 self.startTime = startTime
774 778 self.endTime = endTime
775 779 self.server = server
776 780 self.addRunOperation(**kwargs)
777 781
778 782 def update(self, **kwargs):
779 783
780 784 if 'datatype' in kwargs:
781 785 datatype = kwargs.pop('datatype')
782 786 if 'Reader' in datatype:
783 787 self.name = datatype
784 788 else:
785 789 self.name = '%sReader' % (datatype)
786 790 self.datatype = self.name.replace('Reader', '')
787 791
788 792 attrs = ('path', 'startDate', 'endDate',
789 793 'startTime', 'endTime')
790 794
791 795 for attr in attrs:
792 796 if attr in kwargs:
793 797 setattr(self, attr, kwargs.pop(attr))
794 798
795 799 self.updateRunOperation(**kwargs)
796 800
797 801 def removeOperations(self):
798 802
799 803 for obj in self.opConfObjList:
800 804 del obj
801 805
802 806 self.opConfObjList = []
803 807
804 808 def addRunOperation(self, **kwargs):
805 809
806 810 opObj = self.addOperation(name='run', optype='self')
807 811
808 812 if self.server is None:
809 813 opObj.addParameter(
810 814 name='datatype', value=self.datatype, format='str')
811 815 opObj.addParameter(name='path', value=self.path, format='str')
812 816 opObj.addParameter(
813 817 name='startDate', value=self.startDate, format='date')
814 818 opObj.addParameter(
815 819 name='endDate', value=self.endDate, format='date')
816 820 opObj.addParameter(
817 821 name='startTime', value=self.startTime, format='time')
818 822 opObj.addParameter(
819 823 name='endTime', value=self.endTime, format='time')
820 824
821 825 for key, value in list(kwargs.items()):
822 826 opObj.addParameter(name=key, value=value,
823 827 format=type(value).__name__)
824 828 else:
825 829 opObj.addParameter(name='server', value=self.server, format='str')
826 830
827 831 return opObj
828 832
829 833 def updateRunOperation(self, **kwargs):
830 834
831 835 opObj = self.getOperationObj(name='run')
832 836 opObj.removeParameters()
833 837
834 838 opObj.addParameter(name='datatype', value=self.datatype, format='str')
835 839 opObj.addParameter(name='path', value=self.path, format='str')
836 840 opObj.addParameter(
837 841 name='startDate', value=self.startDate, format='date')
838 842 opObj.addParameter(name='endDate', value=self.endDate, format='date')
839 843 opObj.addParameter(
840 844 name='startTime', value=self.startTime, format='time')
841 845 opObj.addParameter(name='endTime', value=self.endTime, format='time')
842 846
843 847 for key, value in list(kwargs.items()):
844 848 opObj.addParameter(name=key, value=value,
845 849 format=type(value).__name__)
846 850
847 851 return opObj
848 852
849 def readXml(self, upElement):
853 def readXml(self, upElement, project_id):
850 854
851 855 self.id = upElement.get('id')
852 856 self.name = upElement.get('name')
853 857 self.datatype = upElement.get('datatype')
858 self.project_id = str(project_id) #yong
854 859
855 860 if self.ELEMENTNAME == 'ReadUnit':
856 861 self.datatype = self.datatype.replace('Reader', '')
857 862
858 863 self.opConfObjList = []
859 864
860 865 opElementList = upElement.iter(OperationConf().getElementName())
861 866
862 867 for opElement in opElementList:
863 868 opConfObj = OperationConf()
864 opConfObj.readXml(opElement)
869 opConfObj.readXml(opElement, project_id)
865 870 self.opConfObjList.append(opConfObj)
866 871
867 872 if opConfObj.name == 'run':
868 873 self.path = opConfObj.getParameterValue('path')
869 874 self.startDate = opConfObj.getParameterValue('startDate')
870 875 self.endDate = opConfObj.getParameterValue('endDate')
871 876 self.startTime = opConfObj.getParameterValue('startTime')
872 877 self.endTime = opConfObj.getParameterValue('endTime')
873 878
874 879
875 880 class Project(Process):
876 881
877 882 ELEMENTNAME = 'Project'
878 883
879 884 def __init__(self):
880 885
881 886 Process.__init__(self)
882 887 self.id = None
883 888 self.filename = None
884 889 self.description = None
885 890 self.email = None
886 891 self.alarm = None
887 892 self.procUnitConfObjDict = {}
888 893
889 894 def __getNewId(self):
890 895
891 896 idList = list(self.procUnitConfObjDict.keys())
892 897 id = int(self.id) * 10
893 898
894 899 while True:
895 900 id += 1
896 901
897 902 if str(id) in idList:
898 903 continue
899 904
900 905 break
901 906
902 907 return str(id)
903 908
904 909 def getElementName(self):
905 910
906 911 return self.ELEMENTNAME
907 912
908 913 def getId(self):
909 914
910 915 return self.id
911 916
912 917 def updateId(self, new_id):
913 918
914 919 self.id = str(new_id)
915 920
916 921 keyList = list(self.procUnitConfObjDict.keys())
917 922 keyList.sort()
918 923
919 924 n = 1
920 925 newProcUnitConfObjDict = {}
921 926
922 927 for procKey in keyList:
923 928
924 929 procUnitConfObj = self.procUnitConfObjDict[procKey]
925 930 idProcUnit = str(int(self.id) * 10 + n)
926 931 procUnitConfObj.updateId(idProcUnit)
927 932 newProcUnitConfObjDict[idProcUnit] = procUnitConfObj
928 933 n += 1
929 934
930 935 self.procUnitConfObjDict = newProcUnitConfObjDict
931 936
932 937 def setup(self, id=1, name='', description='', email=None, alarm=[]):
933 938
934 939 print(' ')
935 940 print('*' * 60)
936 941 print('* Starting SIGNAL CHAIN PROCESSING (Multiprocessing) v%s *' % schainpy.__version__)
937 942 print('*' * 60)
938 943 print("* Python " + python_version() + " *")
939 944 print('*' * 19)
940 945 print(' ')
941 946 self.id = str(id)
942 947 self.description = description
943 948 self.email = email
944 949 self.alarm = alarm
945 950
946 951 def update(self, **kwargs):
947 952
948 953 for key, value in list(kwargs.items()):
949 954 setattr(self, key, value)
950 955
951 956 def clone(self):
952 957
953 958 p = Project()
954 959 p.procUnitConfObjDict = self.procUnitConfObjDict
955 960 return p
956 961
957 962 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
958 963
959 964 '''
960 965 Actualizacion:
961 966 Se agrego un nuevo argumento: topic -relativo a la forma de comunicar los procesos simultaneos
962 967
963 968 * El id del proceso sera el topico al que se deben subscribir los procUnits para recibir la informacion(data)
964 969
965 970 '''
966 971
967 972 if id is None:
968 973 idReadUnit = self.__getNewId()
969 974 else:
970 975 idReadUnit = str(id)
971 976
972 977 readUnitConfObj = ReadUnitConf()
973 978 readUnitConfObj.setup(self.id, idReadUnit, name, datatype, **kwargs)
974 979 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
975 980
976 981 return readUnitConfObj
977 982
978 983 def addProcUnit(self, inputId='0', datatype=None, name=None):
979 984
980 985 '''
981 986 Actualizacion:
982 987 Se agrego dos nuevos argumentos: topic_read (lee data de otro procUnit) y topic_write(escribe o envia data a otro procUnit)
983 988 Deberia reemplazar a "inputId"
984 989
985 990 ** A fin de mantener el inputID, este sera la representaacion del topicoal que deben subscribirse. El ID propio de la intancia
986 991 (proceso) sera el topico de la publicacion, todo sera asignado de manera dinamica.
987 992
988 993 '''
989 994
990 995 idProcUnit = self.__getNewId() #Topico para subscripcion
991 996 procUnitConfObj = ProcUnitConf()
992 997 procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId) #topic_read, topic_write,
993 998 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
994 999
995 1000 return procUnitConfObj
996 1001
997 1002 def removeProcUnit(self, id):
998 1003
999 1004 if id in list(self.procUnitConfObjDict.keys()):
1000 1005 self.procUnitConfObjDict.pop(id)
1001 1006
1002 1007 def getReadUnitId(self):
1003 1008
1004 1009 readUnitConfObj = self.getReadUnitObj()
1005 1010
1006 1011 return readUnitConfObj.id
1007 1012
1008 1013 def getReadUnitObj(self):
1009 1014
1010 1015 for obj in list(self.procUnitConfObjDict.values()):
1011 1016 if obj.getElementName() == 'ReadUnit':
1012 1017 return obj
1013 1018
1014 1019 return None
1015 1020
1016 1021 def getProcUnitObj(self, id=None, name=None):
1017 1022
1018 1023 if id != None:
1019 1024 return self.procUnitConfObjDict[id]
1020 1025
1021 1026 if name != None:
1022 1027 return self.getProcUnitObjByName(name)
1023 1028
1024 1029 return None
1025 1030
1026 1031 def getProcUnitObjByName(self, name):
1027 1032
1028 1033 for obj in list(self.procUnitConfObjDict.values()):
1029 1034 if obj.name == name:
1030 1035 return obj
1031 1036
1032 1037 return None
1033 1038
1034 1039 def procUnitItems(self):
1035 1040
1036 1041 return list(self.procUnitConfObjDict.items())
1037 1042
1038 1043 def makeXml(self):
1039 1044
1040 1045 projectElement = Element('Project')
1041 1046 projectElement.set('id', str(self.id))
1042 1047 projectElement.set('name', self.name)
1043 1048 projectElement.set('description', self.description)
1044 1049
1045 1050 for procUnitConfObj in list(self.procUnitConfObjDict.values()):
1046 1051 procUnitConfObj.makeXml(projectElement)
1047 1052
1048 1053 self.projectElement = projectElement
1049 1054
1050 1055 def writeXml(self, filename=None):
1051 1056
1052 1057 if filename == None:
1053 1058 if self.filename:
1054 1059 filename = self.filename
1055 1060 else:
1056 1061 filename = 'schain.xml'
1057 1062
1058 1063 if not filename:
1059 1064 print('filename has not been defined. Use setFilename(filename) for do it.')
1060 1065 return 0
1061 1066
1062 1067 abs_file = os.path.abspath(filename)
1063 1068
1064 1069 if not os.access(os.path.dirname(abs_file), os.W_OK):
1065 1070 print('No write permission on %s' % os.path.dirname(abs_file))
1066 1071 return 0
1067 1072
1068 1073 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
1069 1074 print('File %s already exists and it could not be overwriten' % abs_file)
1070 1075 return 0
1071 1076
1072 1077 self.makeXml()
1073 1078
1074 1079 ElementTree(self.projectElement).write(abs_file, method='xml')
1075 1080
1076 1081 self.filename = abs_file
1077 1082
1078 1083 return 1
1079 1084
1080 1085 def readXml(self, filename=None):
1081 1086
1082 1087 if not filename:
1083 1088 print('filename is not defined')
1084 1089 return 0
1085 1090
1086 1091 abs_file = os.path.abspath(filename)
1087 1092
1088 1093 if not os.path.isfile(abs_file):
1089 1094 print('%s file does not exist' % abs_file)
1090 1095 return 0
1091 1096
1092 1097 self.projectElement = None
1093 1098 self.procUnitConfObjDict = {}
1094 1099
1095 1100 try:
1096 1101 self.projectElement = ElementTree().parse(abs_file)
1097 1102 except:
1098 1103 print('Error reading %s, verify file format' % filename)
1099 1104 return 0
1100 1105
1101 1106 self.project = self.projectElement.tag
1102 1107
1103 1108 self.id = self.projectElement.get('id')
1104 1109 self.name = self.projectElement.get('name')
1105 1110 self.description = self.projectElement.get('description')
1106 1111
1107 1112 readUnitElementList = self.projectElement.iter(
1108 1113 ReadUnitConf().getElementName())
1109 1114
1110 1115 for readUnitElement in readUnitElementList:
1111 1116 readUnitConfObj = ReadUnitConf()
1112 readUnitConfObj.readXml(readUnitElement)
1117 readUnitConfObj.readXml(readUnitElement, self.id)
1113 1118 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
1114 1119
1115 1120 procUnitElementList = self.projectElement.iter(
1116 1121 ProcUnitConf().getElementName())
1117 1122
1118 1123 for procUnitElement in procUnitElementList:
1119 1124 procUnitConfObj = ProcUnitConf()
1120 procUnitConfObj.readXml(procUnitElement)
1125 procUnitConfObj.readXml(procUnitElement, self.id)
1121 1126 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1122 1127
1123 1128 self.filename = abs_file
1124 1129
1125 1130 return 1
1126 1131
1127 1132 def __str__(self):
1128 1133
1129 print('Project[%s]: name = %s, description = %s' % (self.id,
1134 print('Project[%s]: name = %s, description = %s, project_id = %s' % (self.id,
1130 1135 self.name,
1131 self.description))
1136 self.description,
1137 self.project_id))
1132 1138
1133 1139 for procUnitConfObj in self.procUnitConfObjDict.values():
1134 1140 print(procUnitConfObj)
1135 1141
1136 1142 def createObjects(self):
1137 1143
1138 1144 for procUnitConfObj in self.procUnitConfObjDict.values():
1139 1145 procUnitConfObj.createObjects()
1140 1146
1141 1147 def __handleError(self, procUnitConfObj, modes=None, stdout=True):
1142 1148
1143 1149 import socket
1144 1150
1145 1151 if modes is None:
1146 1152 modes = self.alarm
1147 1153
1148 1154 if not self.alarm:
1149 1155 modes = []
1150 1156
1151 1157 err = traceback.format_exception(sys.exc_info()[0],
1152 1158 sys.exc_info()[1],
1153 1159 sys.exc_info()[2])
1154 1160
1155 1161 log.error('{}'.format(err[-1]), procUnitConfObj.name)
1156 1162
1157 1163 message = ''.join(err)
1158 1164
1159 1165 if stdout:
1160 1166 sys.stderr.write(message)
1161 1167
1162 1168 subject = 'SChain v%s: Error running %s\n' % (
1163 1169 schainpy.__version__, procUnitConfObj.name)
1164 1170
1165 1171 subtitle = '%s: %s\n' % (
1166 1172 procUnitConfObj.getElementName(), procUnitConfObj.name)
1167 1173 subtitle += 'Hostname: %s\n' % socket.gethostbyname(
1168 1174 socket.gethostname())
1169 1175 subtitle += 'Working directory: %s\n' % os.path.abspath('./')
1170 1176 subtitle += 'Configuration file: %s\n' % self.filename
1171 1177 subtitle += 'Time: %s\n' % str(datetime.datetime.now())
1172 1178
1173 1179 readUnitConfObj = self.getReadUnitObj()
1174 1180 if readUnitConfObj:
1175 1181 subtitle += '\nInput parameters:\n'
1176 1182 subtitle += '[Data path = %s]\n' % readUnitConfObj.path
1177 1183 subtitle += '[Data type = %s]\n' % readUnitConfObj.datatype
1178 1184 subtitle += '[Start date = %s]\n' % readUnitConfObj.startDate
1179 1185 subtitle += '[End date = %s]\n' % readUnitConfObj.endDate
1180 1186 subtitle += '[Start time = %s]\n' % readUnitConfObj.startTime
1181 1187 subtitle += '[End time = %s]\n' % readUnitConfObj.endTime
1182 1188
1183 1189 a = Alarm(
1184 1190 modes=modes,
1185 1191 email=self.email,
1186 1192 message=message,
1187 1193 subject=subject,
1188 1194 subtitle=subtitle,
1189 1195 filename=self.filename
1190 1196 )
1191 1197
1192 1198 return a
1193 1199
1194 1200 def isPaused(self):
1195 1201 return 0
1196 1202
1197 1203 def isStopped(self):
1198 1204 return 0
1199 1205
1200 1206 def runController(self):
1201 1207 '''
1202 1208 returns 0 when this process has been stopped, 1 otherwise
1203 1209 '''
1204 1210
1205 1211 if self.isPaused():
1206 1212 print('Process suspended')
1207 1213
1208 1214 while True:
1209 1215 time.sleep(0.1)
1210 1216
1211 1217 if not self.isPaused():
1212 1218 break
1213 1219
1214 1220 if self.isStopped():
1215 1221 break
1216 1222
1217 1223 print('Process reinitialized')
1218 1224
1219 1225 if self.isStopped():
1220 1226 print('Process stopped')
1221 1227 return 0
1222 1228
1223 1229 return 1
1224 1230
1225 1231 def setFilename(self, filename):
1226 1232
1227 1233 self.filename = filename
1228 1234
1229 1235 def setProxyCom(self):
1230 1236
1231 1237 if not os.path.exists('/tmp/schain'):
1232 1238 os.mkdir('/tmp/schain')
1233 1239
1234 1240 self.ctx = zmq.Context()
1235 1241 xpub = self.ctx.socket(zmq.XPUB)
1236 1242 xpub.bind('ipc:///tmp/schain/{}_pub'.format(self.id))
1237 1243 xsub = self.ctx.socket(zmq.XSUB)
1238 1244 xsub.bind('ipc:///tmp/schain/{}_sub'.format(self.id))
1239 1245
1240 1246 try:
1241 1247 zmq.proxy(xpub, xsub)
1242 1248 except zmq.ContextTerminated:
1243 1249 xpub.close()
1244 1250 xsub.close()
1245 1251
1246 1252 def run(self):
1247 1253
1248 1254 log.success('Starting {}: {}'.format(self.name, self.id), tag='')
1249 1255 self.start_time = time.time()
1250 1256 self.createObjects()
1251 1257 # t = Thread(target=wait, args=(self.ctx, ))
1252 1258 # t.start()
1253 1259 self.setProxyCom()
1254 1260
1255 1261 # Iniciar todos los procesos .start(), monitoreo de procesos. ELiminar lo de abajo
1256 1262
1257 1263 log.success('{} finished (time: {}s)'.format(
1258 1264 self.name,
1259 1265 time.time()-self.start_time))
General Comments 0
You need to be logged in to leave comments. Login now