##// END OF EJS Templates
Merge branch 'v3.0-devel' of http://jro-dev.igp.gob.pe/rhodecode/schain into v3.0-devel
Juan C. Espinoza -
r1175:7e36d2e90f1d merge
parent child
Show More
@@ -1,23 +1,36
1 1 '''
2 Updated on January , 2018, for multiprocessing purposes
3 Author: Sergio Cortez
2 4 Created on September , 2012
3 @author:
4 5 '''
5
6 from platform import python_version
6 7 import sys
7 8 import ast
8 9 import datetime
9 10 import traceback
10 11 import math
11 12 import time
13 import zmq
12 14 from multiprocessing import Process, cpu_count
13 15
14 16 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
15 17 from xml.dom import minidom
16 18
17 import schainpy
19
18 20 from schainpy.admin import Alarm, SchainWarning
19 from schainpy.model import *
21
22 ### Temporary imports!!!
23 # from schainpy.model import *
24 from schainpy.model.io import *
25 from schainpy.model.graphics import *
26 from schainpy.model.proc.jroproc_base import *
27 from schainpy.model.proc.bltrproc_parameters import *
28 from schainpy.model.proc.jroproc_spectra import *
29 from schainpy.model.proc.jroproc_voltage import *
30 from schainpy.model.proc.jroproc_parameters import *
31 from schainpy.model.utils.jroutils_publish import *
20 32 from schainpy.utils import log
33 ###
21 34
22 35 DTYPES = {
23 36 'Voltage': '.r',
@@ -77,7 +90,6 def MPProject(project, n=cpu_count()):
77 90
78 91 time.sleep(3)
79 92
80
81 93 class ParameterConf():
82 94
83 95 id = None
@@ -267,7 +279,6 class ParameterConf():
267 279
268 280 print('Parameter[%s]: name = %s, value = %s, format = %s' % (self.id, self.name, self.value, self.format))
269 281
270
271 282 class OperationConf():
272 283
273 284 id = None
@@ -284,12 +295,15 class OperationConf():
284 295 self.id = '0'
285 296 self.name = None
286 297 self.priority = None
287 self.type = 'self'
298 self.topic = None
288 299
289 300 def __getNewId(self):
290 301
291 302 return int(self.id) * 10 + len(self.parmConfObjList) + 1
292 303
304 def getId(self):
305 return self.id
306
293 307 def updateId(self, new_id):
294 308
295 309 self.id = str(new_id)
@@ -361,7 +375,6 class OperationConf():
361 375 self.name = name
362 376 self.type = type
363 377 self.priority = priority
364
365 378 self.parmConfObjList = []
366 379
367 380 def removeParameters(self):
@@ -443,27 +456,19 class OperationConf():
443 456 for parmConfObj in self.parmConfObjList:
444 457 parmConfObj.printattr()
445 458
446 def createObject(self, plotter_queue=None):
447
448 if self.type == 'self':
449 raise ValueError('This operation type cannot be created')
459 def createObject(self):
450 460
451 if self.type == 'plotter':
452 if not plotter_queue:
453 raise ValueError('plotter_queue is not defined. Use:\nmyProject = Project()\nmyProject.setPlotterQueue(plotter_queue)')
454
455 opObj = Plotter(self.name, plotter_queue)
461 className = eval(self.name)
462 kwargs = self.getKwargs()
456 463
457 if self.type == 'external' or self.type == 'other':
464 opObj = className(self.id, **kwargs)
458 465
459 className = eval(self.name)
460 kwargs = self.getKwargs()
466 opObj.start()
461 467
462 opObj = className(**kwargs)
468 print(' Operation created')
463 469
464 470 return opObj
465 471
466
467 472 class ProcUnitConf():
468 473
469 474 id = None
@@ -484,7 +489,7 class ProcUnitConf():
484 489 self.id = None
485 490 self.datatype = None
486 491 self.name = None
487 self.inputId = None
492 self.inputId = None
488 493
489 494 self.opConfObjList = []
490 495
@@ -507,14 +512,14 class ProcUnitConf():
507 512
508 513 return self.id
509 514
510 def updateId(self, new_id, parentId=parentId):
511
515 def updateId(self, new_id, parentId=parentId):
516 '''
512 517 new_id = int(parentId) * 10 + (int(self.id) % 10)
513 518 new_inputId = int(parentId) * 10 + (int(self.inputId) % 10)
514 519
515 520 # If this proc unit has not inputs
516 if self.inputId == '0':
517 new_inputId = 0
521 #if self.inputId == '0':
522 #new_inputId = 0
518 523
519 524 n = 1
520 525 for opConfObj in self.opConfObjList:
@@ -526,8 +531,9 class ProcUnitConf():
526 531
527 532 self.parentId = str(parentId)
528 533 self.id = str(new_id)
529 self.inputId = str(new_inputId)
530
534 #self.inputId = str(new_inputId)
535 '''
536 n = 1
531 537 def getInputId(self):
532 538
533 539 return self.inputId
@@ -560,11 +566,17 class ProcUnitConf():
560 566 return self.procUnitObj
561 567
562 568 def setup(self, id, name, datatype, inputId, parentId=None):
563
569 '''
570 id sera el topico a publicar
571 inputId sera el topico a subscribirse
572 '''
573
564 574 # Compatible with old signal chain version
565 575 if datatype == None and name == None:
566 576 raise ValueError('datatype or name should be defined')
567 577
578 #Definir una condicion para inputId cuando sea 0
579
568 580 if name == None:
569 581 if 'Proc' in datatype:
570 582 name = datatype
@@ -577,12 +589,11 class ProcUnitConf():
577 589 self.id = str(id)
578 590 self.name = name
579 591 self.datatype = datatype
580 self.inputId = inputId
592 self.inputId = inputId
581 593 self.parentId = parentId
582
583 594 self.opConfObjList = []
584 595
585 self.addOperation(name='run', optype='self')
596 self.addOperation(name='run', optype='self')
586 597
587 598 def removeOperations(self):
588 599
@@ -602,10 +613,16 class ProcUnitConf():
602 613
603 614 return opObj
604 615
605 def addOperation(self, name, optype='self'):
616 def addOperation(self, name, optype = 'self'):
617 '''
618 Actualizacion - > proceso comunicacion
619 En el caso de optype='self', elminar. DEfinir comuncacion IPC -> Topic
620 definir el tipoc de socket o comunicacion ipc++
621
622 '''
606 623
607 624 id = self.__getNewId()
608 priority = self.__getPriority()
625 priority = self.__getPriority() # Sin mucho sentido, pero puede usarse
609 626
610 627 opConfObj = OperationConf()
611 628 opConfObj.setup(id, name=name, priority=priority, type=optype)
@@ -668,11 +685,15 class ProcUnitConf():
668 685
669 686 return kwargs
670 687
671 def createObjects(self, plotter_queue=None):
688 def createObjects(self, dictUnits):
689 '''
690 Instancia de unidades de procesamiento.
672 691
692 '''
673 693 className = eval(self.name)
674 694 kwargs = self.getKwargs()
675 procUnitObj = className(**kwargs)
695 procUnitObj = className(self.id, self.inputId, dictUnits, **kwargs) # necesitan saber su id y su entrada por fines de ipc
696
676 697
677 698 for opConfObj in self.opConfObjList:
678 699
@@ -682,21 +703,25 class ProcUnitConf():
682 703 procUnitObj.addOperationKwargs(
683 704 opConfObj.id, **opConfObj.getKwargs())
684 705 continue
685
686 opObj = opConfObj.createObject(plotter_queue)
687
688 self.opObjDict[opConfObj.id] = opObj
689
690 procUnitObj.addOperation(opObj, opConfObj.id)
706 print("Creating operation process:", opConfObj.name, "for", self.name)
707 opObj = opConfObj.createObject()
708
709
710 #self.opObjDict[opConfObj.id] = opObj.name
711
712 procUnitObj.addOperation(opConfObj.name, opConfObj.id)
713
714 procUnitObj.start()
691 715
692 716 self.procUnitObj = procUnitObj
717
693 718
694 719 return procUnitObj
695 720
696 721 def run(self):
697
698 is_ok = False
699
722
723 is_ok = True
724 """
700 725 for opConfObj in self.opConfObjList:
701 726
702 727 kwargs = {}
@@ -711,9 +736,11 class ProcUnitConf():
711 736 opId=opConfObj.id)
712 737
713 738 is_ok = is_ok or sts
714
739
740 """
715 741 return is_ok
716
742
743
717 744 def close(self):
718 745
719 746 for opConfObj in self.opConfObjList:
@@ -752,11 +779,20 class ReadUnitConf(ProcUnitConf):
752 779
753 780 def getElementName(self):
754 781
755 return self.ELEMENTNAME
756
782 return self.ELEMENTNAME
783
757 784 def setup(self, id, name, datatype, path='', startDate='', endDate='',
758 785 startTime='', endTime='', parentId=None, server=None, **kwargs):
759 786
787
788 '''
789 *****el id del proceso sera el Topico
790
791 Adicion de {topic}, si no esta presente -> error
792 kwargs deben ser trasmitidos en la instanciacion
793
794 '''
795
760 796 # Compatible with old signal chain version
761 797 if datatype == None and name == None:
762 798 raise ValueError('datatype or name should be defined')
@@ -814,9 +850,9 class ReadUnitConf(ProcUnitConf):
814 850
815 851 self.opConfObjList = []
816 852
817 def addRunOperation(self, **kwargs):
853 def addRunOperation(self, **kwargs):
818 854
819 opObj = self.addOperation(name='run', optype='self')
855 opObj = self.addOperation(name='run', optype='self')
820 856
821 857 if self.server is None:
822 858 opObj.addParameter(
@@ -892,7 +928,6 class ReadUnitConf(ProcUnitConf):
892 928 class Project(Process):
893 929
894 930 id = None
895 # name = None
896 931 description = None
897 932 filename = None
898 933
@@ -900,16 +935,15 class Project(Process):
900 935
901 936 ELEMENTNAME = 'Project'
902 937
903 plotterQueue = None
938
904 939
905 def __init__(self, plotter_queue=None):
940 def __init__(self):
906 941
907 942 Process.__init__(self)
908 self.id = None
943 self.id = None
909 944 self.description = None
910 945 self.email = None
911 946 self.alarm = None
912 self.plotterQueue = plotter_queue
913 947 self.procUnitConfObjDict = {}
914 948
915 949 def __getNewId(self):
@@ -958,13 +992,15 class Project(Process):
958 992
959 993 def setup(self, id, name='', description='', email=None, alarm=[]):
960 994
961 print()
995 print(' ')
962 996 print('*' * 60)
963 print(' Starting SIGNAL CHAIN PROCESSING v%s ' % schainpy.__version__)
997 print('* Starting SIGNAL CHAIN PROCESSING (Multiprocessing) v%s *' % schainpy.__version__)
964 998 print('*' * 60)
965 print()
999 print("* Python " + python_version() + " *")
1000 print('*' * 19)
1001 print(' ')
966 1002 self.id = str(id)
967 self.description = description
1003 self.description = description
968 1004 self.email = email
969 1005 self.alarm = alarm
970 1006
@@ -981,6 +1017,14 class Project(Process):
981 1017
982 1018 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
983 1019
1020 '''
1021 Actualizacion:
1022 Se agrego un nuevo argumento: topic -relativo a la forma de comunicar los procesos simultaneos
1023
1024 * El id del proceso sera el topico al que se deben subscribir los procUnits para recibir la informacion(data)
1025
1026 '''
1027
984 1028 if id is None:
985 1029 idReadUnit = self.__getNewId()
986 1030 else:
@@ -991,16 +1035,26 class Project(Process):
991 1035 parentId=self.id, **kwargs)
992 1036
993 1037 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
994
1038
995 1039 return readUnitConfObj
996 1040
997 1041 def addProcUnit(self, inputId='0', datatype=None, name=None):
998 1042
999 idProcUnit = self.__getNewId()
1043 '''
1044 Actualizacion:
1045 Se agrego dos nuevos argumentos: topic_read (lee data de otro procUnit) y topic_write(escribe o envia data a otro procUnit)
1046 Deberia reemplazar a "inputId"
1047
1048 ** A fin de mantener el inputID, este sera la representaacion del topicoal que deben subscribirse. El ID propio de la intancia
1049 (proceso) sera el topico de la publicacion, todo sera asignado de manera dinamica.
1050
1051 '''
1052
1053 idProcUnit = self.__getNewId() #Topico para subscripcion
1000 1054
1001 1055 procUnitConfObj = ProcUnitConf()
1002 procUnitConfObj.setup(idProcUnit, name, datatype,
1003 inputId, parentId=self.id)
1056 procUnitConfObj.setup(idProcUnit, name, datatype, inputId, #topic_read, topic_write,
1057 parentId=self.id)
1004 1058
1005 1059 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1006 1060
@@ -1156,29 +1210,11 class Project(Process):
1156 1210 def createObjects(self):
1157 1211
1158 1212 for procUnitConfObj in list(self.procUnitConfObjDict.values()):
1159 procUnitConfObj.createObjects(self.plotterQueue)
1160
1161 def __connect(self, objIN, thisObj):
1162
1163 thisObj.setInput(objIN.getOutputObj())
1164
1165 def connectObjects(self):
1166
1167 for thisPUConfObj in list(self.procUnitConfObjDict.values()):
1213 print("Creating process:", procUnitConfObj.name)
1214 procUnitConfObj.createObjects(self.procUnitConfObjDict)
1215
1168 1216
1169 inputId = thisPUConfObj.getInputId()
1170
1171 if int(inputId) == 0:
1172 continue
1173
1174 # Get input object
1175 puConfINObj = self.procUnitConfObjDict[inputId]
1176 puObjIN = puConfINObj.getProcUnitObj()
1177
1178 # Get current object
1179 thisPUObj = thisPUConfObj.getProcUnitObj()
1180
1181 self.__connect(puObjIN, thisPUObj)
1217 print('All processes were created')
1182 1218
1183 1219 def __handleError(self, procUnitConfObj, modes=None, stdout=True):
1184 1220
@@ -1193,7 +1229,7 class Project(Process):
1193 1229 err = traceback.format_exception(sys.exc_info()[0],
1194 1230 sys.exc_info()[1],
1195 1231 sys.exc_info()[2])
1196
1232
1197 1233 log.error('{}'.format(err[-1]), procUnitConfObj.name)
1198 1234
1199 1235 message = ''.join(err)
@@ -1268,75 +1304,30 class Project(Process):
1268 1304
1269 1305 self.filename = filename
1270 1306
1271 def setPlotterQueue(self, plotter_queue):
1272
1273 raise NotImplementedError('Use schainpy.controller_api.ControllerThread instead Project class')
1274
1275 def getPlotterQueue(self):
1276
1277 raise NotImplementedError('Use schainpy.controller_api.ControllerThread instead Project class')
1278
1279 def useExternalPlotter(self):
1307 def setProxyCom(self):
1308
1309 ctx = zmq.Context()
1310 if not os.path.exists('/tmp/socketTmp'): os.mkdir('/tmp/socketTmp')
1311 xsub = ctx.socket(zmq.XSUB)
1312 xsub.bind('ipc:///tmp/socketTmp/a')
1313 xpub = ctx.socket(zmq.XPUB)
1314 xpub.bind('ipc:///tmp/socketTmp/b')
1315
1316 print("Controller Ready: Processes and proxy created")
1317 zmq.proxy(xsub, xpub)
1280 1318
1281 raise NotImplementedError('Use schainpy.controller_api.ControllerThread instead Project class')
1319
1282 1320
1283 1321 def run(self):
1284 1322
1285 1323 log.success('Starting {}'.format(self.name), tag='')
1286 1324 self.start_time = time.time()
1287 self.createObjects()
1288 self.connectObjects()
1289
1290 keyList = list(self.procUnitConfObjDict.keys())
1291 keyList.sort()
1292
1293 err = None
1294
1295 while(True):
1296
1297 is_ok = False
1325 self.createObjects()
1326 self.setProxyCom()
1298 1327
1299 for procKey in keyList:
1300
1301 procUnitConfObj = self.procUnitConfObjDict[procKey]
1302
1303 try:
1304 sts = procUnitConfObj.run()
1305 is_ok = is_ok or sts
1306 except SchainWarning:
1307 err = self.__handleError(procUnitConfObj, modes=[2, 3], stdout=False)
1308 is_ok = False
1309 break
1310 except KeyboardInterrupt:
1311 is_ok = False
1312 break
1313 except ValueError as e:
1314 time.sleep(0.5)
1315 err = self.__handleError(procUnitConfObj)
1316 is_ok = False
1317 break
1318 except:
1319 time.sleep(0.5)
1320 err = self.__handleError(procUnitConfObj)
1321 is_ok = False
1322 break
1323
1324 # If every process unit finished so end process
1325 if not(is_ok):
1326 break
1327
1328 if not self.runController():
1329 break
1328 # Iniciar todos los procesos .start(), monitoreo de procesos. ELiminar lo de abajo
1330 1329
1331 1330 # Closing every process
1332 for procKey in keyList:
1333 procUnitConfObj = self.procUnitConfObjDict[procKey]
1334 procUnitConfObj.close()
1335
1336 if err is not None:
1337 err.start()
1338 # err.join()
1339
1340 1331 log.success('{} finished (time: {}s)'.format(
1341 1332 self.name,
1342 1333 time.time()-self.start_time)) No newline at end of file
@@ -3,7 +3,8 import numpy
3 3 import time, datetime
4 4 from schainpy.model.graphics import mpldriver
5 5
6 from schainpy.model.proc.jroproc_base import Operation
6 from schainpy.model.proc.jroproc_base import MPDecorator, Operation
7
7 8
8 9 def isTimeInHourRange(datatime, xmin, xmax):
9 10
@@ -62,9 +63,9 class Figure(Operation):
62 63
63 64 created = False
64 65 parameters = {}
65 def __init__(self, **kwargs):
66 def __init__(self):#, **kwargs):
66 67
67 Operation.__init__(self, **kwargs)
68 Operation.__init__(self)#, **kwargs)
68 69
69 70 def __del__(self):
70 71
@@ -9,8 +9,11 import numpy
9 9
10 10 from .figure import Figure, isRealtime, isTimeInHourRange
11 11 from .plotting_codes import *
12 from schainpy.model.proc.jroproc_base import MPDecorator
12 13
14 from schainpy.utils import log
13 15
16 @MPDecorator
14 17 class SpectraPlot(Figure):
15 18
16 19 isConfig = None
@@ -20,11 +23,10 class SpectraPlot(Figure):
20 23 HEIGHTPROF = None
21 24 PREFIX = 'spc'
22 25
23 def __init__(self, **kwargs):
24 Figure.__init__(self, **kwargs)
26 def __init__(self):#, **kwargs):
27 Figure.__init__(self)#, **kwargs)
25 28 self.isConfig = False
26 29 self.__nsubplots = 1
27
28 30 self.WIDTH = 250
29 31 self.HEIGHT = 250
30 32 self.WIDTHPROF = 120
@@ -104,6 +106,9 class SpectraPlot(Figure):
104 106 zmin : None,
105 107 zmax : None
106 108 """
109 if dataOut.flagNoData:
110 return dataOut
111
107 112 if realtime:
108 113 if not(isRealtime(utcdatatime = dataOut.utctime)):
109 114 print('Skipping this plot function')
@@ -219,6 +224,8 class SpectraPlot(Figure):
219 224 wr_period=wr_period,
220 225 thisDatetime=thisDatetime)
221 226
227 return dataOut
228 @MPDecorator
222 229 class CrossSpectraPlot(Figure):
223 230
224 231 isConfig = None
@@ -230,8 +237,8 class CrossSpectraPlot(Figure):
230 237 HEIGHTPROF = None
231 238 PREFIX = 'cspc'
232 239
233 def __init__(self, **kwargs):
234 Figure.__init__(self, **kwargs)
240 def __init__(self):#, **kwargs):
241 Figure.__init__(self)#, **kwargs)
235 242 self.isConfig = False
236 243 self.__nsubplots = 4
237 244 self.counter_imagwr = 0
@@ -301,6 +308,9 class CrossSpectraPlot(Figure):
301 308 zmax : None
302 309 """
303 310
311 if dataOut.flagNoData:
312 return dataOut
313
304 314 if pairsList == None:
305 315 pairsIndexList = dataOut.pairsIndexList
306 316 else:
@@ -440,7 +450,9 class CrossSpectraPlot(Figure):
440 450 wr_period=wr_period,
441 451 thisDatetime=thisDatetime)
442 452
453 return dataOut
443 454
455 @MPDecorator
444 456 class RTIPlot(Figure):
445 457
446 458 __isConfig = None
@@ -450,9 +462,9 class RTIPlot(Figure):
450 462 HEIGHTPROF = None
451 463 PREFIX = 'rti'
452 464
453 def __init__(self, **kwargs):
465 def __init__(self):#, **kwargs):
454 466
455 Figure.__init__(self, **kwargs)
467 Figure.__init__(self)#, **kwargs)
456 468 self.timerange = None
457 469 self.isConfig = False
458 470 self.__nsubplots = 1
@@ -540,6 +552,8 class RTIPlot(Figure):
540 552 zmin : None,
541 553 zmax : None
542 554 """
555 if dataOut.flagNoData:
556 return dataOut
543 557
544 558 #colormap = kwargs.get('colormap', 'jet')
545 559 if HEIGHT is not None:
@@ -650,7 +664,9 class RTIPlot(Figure):
650 664 wr_period=wr_period,
651 665 thisDatetime=thisDatetime,
652 666 update_figfile=update_figfile)
667 return dataOut
653 668
669 @MPDecorator
654 670 class CoherenceMap(Figure):
655 671 isConfig = None
656 672 __nsubplots = None
@@ -659,8 +675,8 class CoherenceMap(Figure):
659 675 HEIGHTPROF = None
660 676 PREFIX = 'cmap'
661 677
662 def __init__(self, **kwargs):
663 Figure.__init__(self, **kwargs)
678 def __init__(self):#, **kwargs):
679 Figure.__init__(self)#, **kwargs)
664 680 self.timerange = 2*60*60
665 681 self.isConfig = False
666 682 self.__nsubplots = 1
@@ -723,6 +739,10 class CoherenceMap(Figure):
723 739 server=None, folder=None, username=None, password=None,
724 740 ftp_wei=0, exp_code=0, sub_exp_code=0, plot_pos=0):
725 741
742
743 if dataOut.flagNoData:
744 return dataOut
745
726 746 if not isTimeInHourRange(dataOut.datatime, xmin, xmax):
727 747 return
728 748
@@ -855,6 +875,9 class CoherenceMap(Figure):
855 875 thisDatetime=thisDatetime,
856 876 update_figfile=update_figfile)
857 877
878 return dataOut
879
880 @MPDecorator
858 881 class PowerProfilePlot(Figure):
859 882
860 883 isConfig = None
@@ -864,8 +887,8 class PowerProfilePlot(Figure):
864 887 HEIGHTPROF = None
865 888 PREFIX = 'spcprofile'
866 889
867 def __init__(self, **kwargs):
868 Figure.__init__(self, **kwargs)
890 def __init__(self):#, **kwargs):
891 Figure.__init__(self)#, **kwargs)
869 892 self.isConfig = False
870 893 self.__nsubplots = 1
871 894
@@ -907,6 +930,9 class PowerProfilePlot(Figure):
907 930 ftp=False, wr_period=1, server=None,
908 931 folder=None, username=None, password=None):
909 932
933 if dataOut.flagNoData:
934 return dataOut
935
910 936
911 937 if channelList == None:
912 938 channelIndexList = dataOut.channelIndexList
@@ -978,7 +1004,10 class PowerProfilePlot(Figure):
978 1004 ftp=ftp,
979 1005 wr_period=wr_period,
980 1006 thisDatetime=thisDatetime)
1007
1008 return dataOut
981 1009
1010 @MPDecorator
982 1011 class SpectraCutPlot(Figure):
983 1012
984 1013 isConfig = None
@@ -988,8 +1017,8 class SpectraCutPlot(Figure):
988 1017 HEIGHTPROF = None
989 1018 PREFIX = 'spc_cut'
990 1019
991 def __init__(self, **kwargs):
992 Figure.__init__(self, **kwargs)
1020 def __init__(self):#, **kwargs):
1021 Figure.__init__(self)#, **kwargs)
993 1022 self.isConfig = False
994 1023 self.__nsubplots = 1
995 1024
@@ -1032,6 +1061,8 class SpectraCutPlot(Figure):
1032 1061 folder=None, username=None, password=None,
1033 1062 xaxis="frequency"):
1034 1063
1064 if dataOut.flagNoData:
1065 return dataOut
1035 1066
1036 1067 if channelList == None:
1037 1068 channelIndexList = dataOut.channelIndexList
@@ -1111,6 +1142,9 class SpectraCutPlot(Figure):
1111 1142 wr_period=wr_period,
1112 1143 thisDatetime=thisDatetime)
1113 1144
1145 return dataOut
1146
1147 @MPDecorator
1114 1148 class Noise(Figure):
1115 1149
1116 1150 isConfig = None
@@ -1119,8 +1153,8 class Noise(Figure):
1119 1153 PREFIX = 'noise'
1120 1154
1121 1155
1122 def __init__(self, **kwargs):
1123 Figure.__init__(self, **kwargs)
1156 def __init__(self):#, **kwargs):
1157 Figure.__init__(self)#, **kwargs)
1124 1158 self.timerange = 24*60*60
1125 1159 self.isConfig = False
1126 1160 self.__nsubplots = 1
@@ -1209,6 +1243,9 class Noise(Figure):
1209 1243 server=None, folder=None, username=None, password=None,
1210 1244 ftp_wei=0, exp_code=0, sub_exp_code=0, plot_pos=0):
1211 1245
1246 if dataOut.flagNoData:
1247 return dataOut
1248
1212 1249 if not isTimeInHourRange(dataOut.datatime, xmin, xmax):
1213 1250 return
1214 1251
@@ -1312,6 +1349,9 class Noise(Figure):
1312 1349 if save:
1313 1350 self.save_data(self.filename_noise, noisedB, thisDatetime)
1314 1351
1352 return dataOut
1353
1354 @MPDecorator
1315 1355 class BeaconPhase(Figure):
1316 1356
1317 1357 __isConfig = None
@@ -1319,8 +1359,8 class BeaconPhase(Figure):
1319 1359
1320 1360 PREFIX = 'beacon_phase'
1321 1361
1322 def __init__(self, **kwargs):
1323 Figure.__init__(self, **kwargs)
1362 def __init__(self):#, **kwargs):
1363 Figure.__init__(self)#, **kwargs)
1324 1364 self.timerange = 24*60*60
1325 1365 self.isConfig = False
1326 1366 self.__nsubplots = 1
@@ -1399,6 +1439,9 class BeaconPhase(Figure):
1399 1439 server=None, folder=None, username=None, password=None,
1400 1440 ftp_wei=0, exp_code=0, sub_exp_code=0, plot_pos=0):
1401 1441
1442 if dataOut.flagNoData:
1443 return dataOut
1444
1402 1445 if not isTimeInHourRange(dataOut.datatime, xmin, xmax):
1403 1446 return
1404 1447
@@ -1539,4 +1582,6 class BeaconPhase(Figure):
1539 1582 ftp=ftp,
1540 1583 wr_period=wr_period,
1541 1584 thisDatetime=thisDatetime,
1542 update_figfile=update_figfile) No newline at end of file
1585 update_figfile=update_figfile)
1586
1587 return dataOut #Yong No newline at end of file
@@ -6,15 +6,18 Created on Jul 9, 2014
6 6 import os
7 7 import datetime
8 8 import numpy
9
9 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator #YONG
10 from schainpy.utils import log
10 11 from .figure import Figure
11 12
13
14 @MPDecorator
12 15 class Scope(Figure):
13 16
14 17 isConfig = None
15 18
16 def __init__(self, **kwargs):
17 Figure.__init__(self, **kwargs)
19 def __init__(self):#, **kwargs): #YONG
20 Figure.__init__(self)#, **kwargs)
18 21 self.isConfig = False
19 22 self.WIDTH = 300
20 23 self.HEIGHT = 200
@@ -127,6 +130,8 class Scope(Figure):
127 130 ymin : None,
128 131 ymax : None,
129 132 """
133 if dataOut.flagNoData:
134 return dataOut
130 135
131 136 if channelList == None:
132 137 channelIndexList = dataOut.channelIndexList
@@ -222,4 +227,6 class Scope(Figure):
222 227 save=save,
223 228 ftp=ftp,
224 229 wr_period=wr_period,
225 thisDatetime=thisDatetime) No newline at end of file
230 thisDatetime=thisDatetime)
231
232 return dataOut No newline at end of file
@@ -20,7 +20,6 try:
20 20 except:
21 21 from time import sleep
22 22
23 import schainpy.admin
24 23 from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader
25 24 from schainpy.model.data.jroheaderIO import get_dtype_index, get_numpy_dtype, get_procflag_dtype, get_dtype_width
26 25 from schainpy.utils import log
@@ -885,6 +884,7 class JRODataReader(JRODataIO):
885 884 self.flagIsNewFile = 0
886 885 self.fp = None
887 886 self.flagNoMoreFiles = 1
887 # print '[Reading] No more files to read'
888 888
889 889 return fileOk_flag
890 890
@@ -897,8 +897,8 class JRODataReader(JRODataIO):
897 897 else:
898 898 newFile = self.__setNextFileOffline()
899 899
900 if not(newFile):
901 raise schainpy.admin.SchainWarning('No more files to read')
900 if not(newFile):
901 raise(schainpy.admin.SchainWarning('No more files to read'))
902 902 return 0
903 903
904 904 if self.verbose:
@@ -1052,7 +1052,7 class JRODataReader(JRODataIO):
1052 1052 # Skip block out of startTime and endTime
1053 1053 while True:
1054 1054 if not(self.__setNewBlock()):
1055 raise schainpy
1055 raise(schainpy.admin.SchainWarning('No more files'))
1056 1056 return 0
1057 1057
1058 1058 if not(self.readBlock()):
@@ -1320,11 +1320,11 class JRODataReader(JRODataIO):
1320 1320 if fullpath:
1321 1321 break
1322 1322
1323 print('[Reading] Waiting %0.2f sec for an valid file in %s: try %02d ...' % (delay, path, nTries + 1))
1324 sleep(delay)
1323 print('[Reading] Waiting %0.2f sec for an valid file in %s: try %02d ...' % (self.delay, path, nTries + 1))
1324 sleep(self.delay)
1325 1325
1326 if not(fullpath):
1327 raise schainpy.admin.SchainWarning('There isn\'t any valid file in {}'.format(path))
1326 if not(fullpath):
1327 raise(schainpy.admin.SchainWarning('There isn\'t any valid file in {}'.format(path)))
1328 1328 return
1329 1329
1330 1330 self.year = year
@@ -6,10 +6,11 Created on Jul 2, 2014
6 6 import numpy
7 7
8 8 from schainpy.model.io.jroIO_base import LOCALTIME, JRODataReader, JRODataWriter
9 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation
9 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
10 10 from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader
11 11 from schainpy.model.data.jrodata import Spectra
12 12
13 @MPDecorator
13 14 class SpectraReader(JRODataReader, ProcessingUnit):
14 15 """
15 16 Esta clase permite leer datos de espectros desde archivos procesados (.pdata). La lectura
@@ -69,7 +70,7 class SpectraReader(JRODataReader, ProcessingUnit):
69 70
70 71 rdPairList = []
71 72
72 def __init__(self, **kwargs):
73 def __init__(self):#, **kwargs):
73 74 """
74 75 Inicializador de la clase SpectraReader para la lectura de datos de espectros.
75 76
@@ -88,7 +89,7 class SpectraReader(JRODataReader, ProcessingUnit):
88 89 """
89 90
90 91 #Eliminar de la base la herencia
91 ProcessingUnit.__init__(self, **kwargs)
92 ProcessingUnit.__init__(self)#, **kwargs)
92 93
93 94 # self.isConfig = False
94 95
@@ -7,7 +7,7 Created on Jul 2, 2014
7 7 import numpy
8 8
9 9 from .jroIO_base import LOCALTIME, JRODataReader, JRODataWriter
10 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation
10 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
11 11 from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader
12 12 from schainpy.model.data.jrodata import Voltage
13 13 import zmq
@@ -15,7 +15,7 import tempfile
15 15 from io import StringIO
16 16 # from _sha import blocksize
17 17
18
18 @MPDecorator
19 19 class VoltageReader(JRODataReader, ProcessingUnit):
20 20 """
21 21 Esta clase permite leer datos de voltage desde archivos en formato rawdata (.r). La lectura
@@ -62,7 +62,7 class VoltageReader(JRODataReader, ProcessingUnit):
62 62 optchar = "D"
63 63 dataOut = None
64 64
65 def __init__(self, **kwargs):
65 def __init__(self):#, **kwargs):
66 66 """
67 67 Inicializador de la clase VoltageReader para la lectura de datos de voltage.
68 68
@@ -81,7 +81,7 class VoltageReader(JRODataReader, ProcessingUnit):
81 81 None
82 82 """
83 83
84 ProcessingUnit.__init__(self, **kwargs)
84 ProcessingUnit.__init__(self)#, **kwargs)
85 85
86 86 self.isConfig = False
87 87
@@ -761,4 +761,5 class VoltageWriter(JRODataWriter, Operation):
761 761
762 762 self.processingHeaderObj.processFlags = self.getProcessFlags()
763 763
764 self.setBasicHeader() No newline at end of file
764 self.setBasicHeader()
765 No newline at end of file
This diff has been collapsed as it changes many lines, (606 lines changed) Show them Hide them
@@ -1,41 +1,39
1 1 '''
2
3 $Author: murco $
4 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
2 Updated for multiprocessing
3 Author : Sergio Cortez
4 Jan 2018
5 Abstract:
6 Base class for processing units and operations. A decorator provides multiprocessing features and interconnect the processes created.
7 The argument (kwargs) sent from the controller is parsed and filtered via the decorator for each processing unit or operation instantiated.
8 The decorator handle also the methods inside the processing unit to be called from the main script (not as operations) (OPERATION -> type ='self').
9
10 Based on:
11 $Author: murco $
12 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
5 13 '''
14 from platform import python_version
6 15 import inspect
7 from fuzzywuzzy import process
8
9 def checkKwargs(method, kwargs):
10 currentKwargs = kwargs
11 choices = inspect.getargspec(method).args
12 try:
13 choices.remove('self')
14 except Exception as e:
15 pass
16 import zmq
17 import time
18 import pickle
19 import os
20 from multiprocessing import Process
16 21
17 try:
18 choices.remove('dataOut')
19 except Exception as e:
20 pass
22 from schainpy.utils import log
21 23
22 for kwarg in kwargs:
23 fuzz = process.extractOne(kwarg, choices)
24 if fuzz is None:
25 continue
26 if fuzz[1] < 100:
27 raise Exception('\x1b[0;32;40mDid you mean {} instead of {} in {}? \x1b[0m'.
28 format(fuzz[0], kwarg, method.__self__.__class__.__name__))
29 24
30 25 class ProcessingUnit(object):
31 26
32 27 """
33 Esta es la clase base para el procesamiento de datos.
28 Update - Jan 2018 - MULTIPROCESSING
29 All the "call" methods present in the previous base were removed.
30 The majority of operations are independant processes, thus
31 the decorator is in charge of communicate the operation processes
32 with the proccessing unit via IPC.
34 33
35 Contiene el metodo "call" para llamar operaciones. Las operaciones pueden ser:
36 - Metodos internos (callMethod)
37 - Objetos del tipo Operation (callObject). Antes de ser llamados, estos objetos
38 tienen que ser agreagados con el metodo "add".
34 The constructor does not receive any argument. The remaining methods
35 are related with the operations to execute.
36
39 37
40 38 """
41 39 # objeto de datos de entrada (Voltage, Spectra o Correlation)
@@ -43,33 +41,25 class ProcessingUnit(object):
43 41 dataInList = []
44 42
45 43 # objeto de datos de entrada (Voltage, Spectra o Correlation)
44
45 id = None
46 inputId = None
47
46 48 dataOut = None
47 49
50 dictProcs = None
51
48 52 operations2RunDict = None
49 53
50 54 isConfig = False
51 55
52
53 def __init__(self, *args, **kwargs):
56 def __init__(self):
54 57
55 58 self.dataIn = None
56 self.dataInList = []
57
58 59 self.dataOut = None
59 60
60 self.operations2RunDict = {}
61 self.operationKwargs = {}
62
63 61 self.isConfig = False
64 62
65 self.args = args
66 self.kwargs = kwargs
67
68 if not hasattr(self, 'name'):
69 self.name = self.__class__.__name__
70
71 checkKwargs(self.run, kwargs)
72
73 63 def getAllowedArgs(self):
74 64 if hasattr(self, '__attrs__'):
75 65 return self.__attrs__
@@ -81,27 +71,30 class ProcessingUnit(object):
81 71 '''
82 72
83 73 self.operationKwargs[objId] = kwargs
84
85
74
86 75 def addOperation(self, opObj, objId):
87 76
88 77 """
89 Agrega un objeto del tipo "Operation" (opObj) a la lista de objetos "self.objectList" y retorna el
90 identificador asociado a este objeto.
78 This method is used in the controller, and update the dictionary containing the operations to execute. The dict
79 posses the id of the operation process (IPC purposes)
91 80
92 Input:
81 Agrega un objeto del tipo "Operation" (opObj) a la lista de objetos "self.objectList" y retorna el
82 identificador asociado a este objeto.
93 83
94 object : objeto de la clase "Operation"
84 Input:
95 85
96 Return:
86 object : objeto de la clase "Operation"
87
88 Return:
97 89
98 objId : identificador del objeto, necesario para ejecutar la operacion
90 objId : identificador del objeto, necesario para comunicar con master(procUnit)
99 91 """
100 92
101 93 self.operations2RunDict[objId] = opObj
102 94
103 95 return objId
104 96
97
105 98 def getOperationObj(self, objId):
106 99
107 100 if objId not in list(self.operations2RunDict.keys()):
@@ -120,241 +113,426 class ProcessingUnit(object):
120 113 **kwargs : Diccionario de argumentos de la funcion a ejecutar
121 114 """
122 115
123 raise NotImplementedError
124
125 def callMethod(self, name, opId):
126
127 """
128 Ejecuta el metodo con el nombre "name" y con argumentos **kwargs de la propia clase.
129
130 Input:
131 name : nombre del metodo a ejecutar
132
133 **kwargs : diccionario con los nombres y valores de la funcion a ejecutar.
134
135 """
116 raise NotImplementedError
136 117
137 #Checking the inputs
138 if name == 'run':
139
140 if not self.checkInputs():
141 self.dataOut.flagNoData = True
142 return False
143 else:
144 #Si no es un metodo RUN la entrada es la misma dataOut (interna)
145 if self.dataOut is not None and self.dataOut.isEmpty():
146 return False
118 def setup(self):
147 119
148 #Getting the pointer to method
149 methodToCall = getattr(self, name)
120 raise NotImplementedError
150 121
151 #Executing the self method
122 def run(self):
152 123
153 if hasattr(self, 'mp'):
154 if name=='run':
155 if self.mp is False:
156 self.mp = True
157 self.start()
158 else:
159 self.operationKwargs[opId]['parent'] = self.kwargs
160 methodToCall(**self.operationKwargs[opId])
161 else:
162 if name=='run':
163 methodToCall(**self.kwargs)
164 else:
165 methodToCall(**self.operationKwargs[opId])
124 raise NotImplementedError
166 125
167 if self.dataOut is None:
168 return False
126 def close(self):
127 #Close every thread, queue or any other object here is it is neccesary.
128 return
129
130 class Operation(object):
169 131
170 if self.dataOut.isEmpty():
171 return False
132 """
133 Update - Jan 2018 - MULTIPROCESSING
172 134
173 return True
135 Most of the methods remained the same. The decorator parse the arguments and executed the run() method for each process.
136 The constructor doe snot receive any argument, neither the baseclass.
174 137
175 def callObject(self, objId):
176 138
177 """
178 Ejecuta la operacion asociada al identificador del objeto "objId"
139 Clase base para definir las operaciones adicionales que se pueden agregar a la clase ProcessingUnit
140 y necesiten acumular informacion previa de los datos a procesar. De preferencia usar un buffer de
141 acumulacion dentro de esta clase
179 142
180 Input:
143 Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer)
181 144
182 objId : identificador del objeto a ejecutar
145 """
146 id = None
147 __buffer = None
148 dest = None
149 isConfig = False
150 readyFlag = None
183 151
184 **kwargs : diccionario con los nombres y valores de la funcion a ejecutar.
152 def __init__(self):
185 153
186 Return:
154 self.buffer = None
155 self.dest = None
156 self.isConfig = False
157 self.readyFlag = False
187 158
188 None
189 """
159 if not hasattr(self, 'name'):
160 self.name = self.__class__.__name__
161
162 def getAllowedArgs(self):
163 if hasattr(self, '__attrs__'):
164 return self.__attrs__
165 else:
166 return inspect.getargspec(self.run).args
190 167
191 if self.dataOut is not None and self.dataOut.isEmpty():
192 return False
168 def setup(self):
193 169
194 externalProcObj = self.operations2RunDict[objId]
170 self.isConfig = True
195 171
196 if hasattr(externalProcObj, 'mp'):
197 if externalProcObj.mp is False:
198 externalProcObj.kwargs['parent'] = self.kwargs
199 self.operationKwargs[objId] = externalProcObj.kwargs
200 externalProcObj.mp = True
201 externalProcObj.start()
202 else:
203 externalProcObj.run(self.dataOut, **externalProcObj.kwargs)
204 self.operationKwargs[objId] = externalProcObj.kwargs
172 raise NotImplementedError
205 173
206 174
207 return True
175 def run(self, dataIn, **kwargs):
208 176
209 def call(self, opType, opName=None, opId=None):
210 177 """
211 Return True si ejecuta la operacion interna nombrada "opName" o la operacion externa
212 identificada con el id "opId"; con los argumentos "**kwargs".
213
214 False si la operacion no se ha ejecutado.
178 Realiza las operaciones necesarias sobre la dataIn.data y actualiza los
179 atributos del objeto dataIn.
215 180
216 181 Input:
217 182
218 opType : Puede ser "self" o "external"
183 dataIn : objeto del tipo JROData
219 184
220 Depende del tipo de operacion para llamar a:callMethod or callObject:
185 Return:
221 186
222 1. If opType = "self": Llama a un metodo propio de esta clase:
187 None
223 188
224 name_method = getattr(self, name)
225 name_method(**kwargs)
189 Affected:
190 __buffer : buffer de recepcion de datos.
226 191
192 """
193 if not self.isConfig:
194 self.setup(**kwargs)
227 195
228 2. If opType = "other" o"external": Llama al metodo "run()" de una instancia de la
229 clase "Operation" o de un derivado de ella:
196 raise NotImplementedError
230 197
231 instanceName = self.operationList[opId]
232 instanceName.run(**kwargs)
198 def close(self):
233 199
234 opName : Si la operacion es interna (opType = 'self'), entonces el "opName" sera
235 usada para llamar a un metodo interno de la clase Processing
200 pass
236 201
237 opId : Si la operacion es externa (opType = 'other' o 'external), entonces el
238 "opId" sera usada para llamar al metodo "run" de la clase Operation
239 registrada anteriormente con ese Id
240 202
241 Exception:
242 Este objeto de tipo Operation debe de haber sido agregado antes con el metodo:
243 "addOperation" e identificado con el valor "opId" = el id de la operacion.
244 De lo contrario retornara un error del tipo ValueError
203 ######### Decorator #########
245 204
246 """
247 205
248 if opType == 'self':
206 def MPDecorator(BaseClass):
207
208 """
209 "Multiprocessing class decorator"
249 210
250 if not opName:
251 raise ValueError("opName parameter should be defined")
211 This function add multiprocessing features to the base class. Also,
212 it handle the communication beetween processes (readers, procUnits and operations).
213 Receive the arguments at the moment of instantiation. According to that, discriminates if it
214 is a procUnit or an operation
215 """
216
217 class MPClass(BaseClass, Process):
218
219 "This is the overwritten class"
220 operations2RunDict = None
221 socket_l = None
222 socket_p = None
223 socketOP = None
224 socket_router = None
225 dictProcs = None
226 typeProc = None
227 def __init__(self, *args, **kwargs):
228 super(MPClass, self).__init__()
229 Process.__init__(self)
230
231
232 self.operationKwargs = {}
233 self.args = args
234
235
236 self.operations2RunDict = {}
237 self.kwargs = kwargs
238
239 # The number of arguments (args) determine the type of process
240
241 if len(self.args) is 3:
242 self.typeProc = "ProcUnit"
243 self.id = args[0] #topico de publicacion
244 self.inputId = args[1] #topico de subcripcion
245 self.dictProcs = args[2] #diccionario de procesos globales
246 else:
247 self.id = args[0]
248 self.typeProc = "Operation"
249
250 def addOperationKwargs(self, objId, **kwargs):
251
252 self.operationKwargs[objId] = kwargs
252 253
253 sts = self.callMethod(opName, opId)
254 def getAllowedArgs(self):
254 255
255 elif opType == 'other' or opType == 'external' or opType == 'plotter':
256 if hasattr(self, '__attrs__'):
257 return self.__attrs__
258 else:
259 return inspect.getargspec(self.run).args
260
261
262 def sockListening(self, topic):
263
264 """
265 This function create a socket to receive objects.
266 The 'topic' argument is related to the publisher process from which the self process is
267 listening (data).
268 In the case were the self process is listening to a Reader (proc Unit),
269 special conditions are introduced to maximize parallelism.
270 """
271
272 cont = zmq.Context()
273 zmq_socket = cont.socket(zmq.SUB)
274 if not os.path.exists('/tmp/socketTmp'):
275 os.mkdir('/tmp/socketTmp')
276
277 if 'Reader' in self.dictProcs[self.inputId].name:
278 zmq_socket.connect('ipc:///tmp/socketTmp/b')
279
280 else:
281 zmq_socket.connect('ipc:///tmp/socketTmp/%s' % self.inputId)
282
283 #log.error('RECEIVING FROM {} {}'.format(self.inputId, str(topic).encode()))
284 zmq_socket.setsockopt(zmq.SUBSCRIBE, str(topic).encode()) #yong
256 285
257 if not opId:
258 raise ValueError("opId parameter should be defined")
286 return zmq_socket
259 287
260 if opId not in list(self.operations2RunDict.keys()):
261 raise ValueError("Any operation with id=%s has been added" %str(opId))
262 288
263 sts = self.callObject(opId)
289 def listenProc(self, sock):
264 290
265 else:
266 raise ValueError("opType should be 'self', 'external' or 'plotter'; and not '%s'" %opType)
291 """
292 This function listen to a ipc addres until a message is recovered. To serialize the
293 data (object), pickle has been use.
294 The 'sock' argument is the socket previously connect to an ipc address and with a topic subscription.
295 """
296
297 a = sock.recv_multipart()
298 a = pickle.loads(a[1])
299 return a
267 300
268 return sts
301 def sockPublishing(self):
269 302
270 def setInput(self, dataIn):
303 """
304 This function create a socket for publishing purposes.
305 Depending on the process type from where is created, it binds or connect
306 to special IPC addresses.
307 """
308 time.sleep(4) #yong
309 context = zmq.Context()
310 zmq_socket = context.socket(zmq.PUB)
311 if not os.path.exists('/tmp/socketTmp'): os.mkdir('/tmp/socketTmp')
312 if 'Reader' in self.dictProcs[self.id].name:
313 zmq_socket.connect('ipc:///tmp/socketTmp/a')
314 else:
315 zmq_socket.bind('ipc:///tmp/socketTmp/%s' % self.id)
271 316
272 self.dataIn = dataIn
273 self.dataInList.append(dataIn)
317 return zmq_socket
274 318
275 def getOutputObj(self):
319 def publishProc(self, sock, data):
276 320
277 return self.dataOut
321 """
322 This function publish a python object (data) under a specific topic in a socket (sock).
323 Usually, the topic is the self id of the process.
324 """
278 325
279 def checkInputs(self):
326 sock.send_multipart([str(self.id).encode(), pickle.dumps(data)]) #yong
327
328 return True
280 329
281 for thisDataIn in self.dataInList:
330 def sockOp(self):
282 331
283 if thisDataIn.isEmpty():
284 return False
332 """
333 This function create a socket for communication purposes with operation processes.
334 """
285 335
286 return True
336 cont = zmq.Context()
337 zmq_socket = cont.socket(zmq.DEALER)
338
339 if python_version()[0] == '2':
340 zmq_socket.setsockopt(zmq.IDENTITY, self.id)
341 if python_version()[0] == '3':
342 zmq_socket.setsockopt_string(zmq.IDENTITY, self.id)
287 343
288 def setup(self):
289 344
290 raise NotImplementedError
345 return zmq_socket
291 346
292 def run(self):
293 347
294 raise NotImplementedError
348 def execOp(self, socket, opId, dataObj):
295 349
296 def close(self):
297 #Close every thread, queue or any other object here is it is neccesary.
298 return
350 """
351 This function 'execute' an operation main routine by establishing a
352 connection with it and sending a python object (dataOut).
353 """
354 if not os.path.exists('/tmp/socketTmp'): os.mkdir('/tmp/socketTmp')
355 socket.connect('ipc:///tmp/socketTmp/%s' %opId)
356
357
358 socket.send(pickle.dumps(dataObj)) #yong
359
360 argument = socket.recv_multipart()[0]
361
362 argument = pickle.loads(argument)
363
364 return argument
365
366 def sockIO(self):
299 367
300 class Operation(object):
368 """
369 Socket defined for an operation process. It is able to recover the object sent from another process as well as a
370 identifier of who sent it.
371 """
301 372
302 """
303 Clase base para definir las operaciones adicionales que se pueden agregar a la clase ProcessingUnit
304 y necesiten acumular informacion previa de los datos a procesar. De preferencia usar un buffer de
305 acumulacion dentro de esta clase
373 cont = zmq.Context()
374 if not os.path.exists('/tmp/socketTmp'): os.mkdir('/tmp/socketTmp')
375 socket = cont.socket(zmq.ROUTER)
376 socket.bind('ipc:///tmp/socketTmp/%s' % self.id)
306 377
307 Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer)
378 return socket
308 379
309 """
380 def funIOrec(self, socket):
310 381
311 __buffer = None
312 isConfig = False
382 """
383 Operation method, recover the id of the process who sent a python object.
384 The 'socket' argument is the socket binded to a specific process ipc.
385 """
313 386
314 def __init__(self, **kwargs):
387 #id_proc = socket.recv()
388
389 #dataObj = socket.recv_pyobj()
390
391 dataObj = socket.recv_multipart()
392
393 dataObj[1] = pickle.loads(dataObj[1])
394 return dataObj[0], dataObj[1]
395
396 def funIOsen(self, socket, data, dest):
397
398 """
399 Operation method, send a python object to a specific destination.
400 The 'dest' argument is the id of a proccesinf unit.
401 """
402
403 socket.send_multipart([dest, pickle.dumps(data)]) #yong
315 404
316 self.__buffer = None
317 self.isConfig = False
318 self.kwargs = kwargs
319 if not hasattr(self, 'name'):
320 self.name = self.__class__.__name__
321 checkKwargs(self.run, kwargs)
405 return True
322 406
323 def getAllowedArgs(self):
324 if hasattr(self, '__attrs__'):
325 return self.__attrs__
326 else:
327 return inspect.getargspec(self.run).args
328 407
329 def setup(self):
408 def runReader(self):
330 409
331 self.isConfig = True
410 # time.sleep(3)
411 while True:
412
413 BaseClass.run(self, **self.kwargs)
332 414
333 raise NotImplementedError
334 415
335 def run(self, dataIn, **kwargs):
416 keyList = list(self.operations2RunDict.keys())
417 keyList.sort()
418
419 for key in keyList:
420 self.socketOP = self.sockOp()
421 self.dataOut = self.execOp(self.socketOP, key, self.dataOut)
336 422
337 """
338 Realiza las operaciones necesarias sobre la dataIn.data y actualiza los
339 atributos del objeto dataIn.
423
424 if self.flagNoMoreFiles: #Usar un objeto con flags para saber si termino el proc o hubo un error
425 self.publishProc(self.socket_p, "Finish")
426 break
340 427
341 Input:
342
343 dataIn : objeto del tipo JROData
428 if self.dataOut.flagNoData:
429 continue
430
431 #print("Publishing data...")
432 self.publishProc(self.socket_p, self.dataOut)
433 # time.sleep(2)
434
435
436 print("%s done" %BaseClass.__name__)
437 return 0
438
439 def runProc(self):
344 440
345 Return:
441 # All the procUnits with kwargs that require a setup initialization must be defined here.
346 442
347 None
443 if self.setupReq:
444 BaseClass.setup(self, **self.kwargs)
348 445
349 Affected:
350 __buffer : buffer de recepcion de datos.
446 while True:
447 self.dataIn = self.listenProc(self.socket_l)
448 #print("%s received data" %BaseClass.__name__)
449
450 if self.dataIn == "Finish":
451 break
452
453 m_arg = list(self.kwargs.keys())
454 num_arg = list(range(1,int(BaseClass.run.__code__.co_argcount)))
455
456 run_arg = {}
457
458 for var in num_arg:
459 if BaseClass.run.__code__.co_varnames[var] in m_arg:
460 run_arg[BaseClass.run.__code__.co_varnames[var]] = self.kwargs[BaseClass.run.__code__.co_varnames[var]]
461
462 #BaseClass.run(self, **self.kwargs)
463 BaseClass.run(self, **run_arg)
464
465 ## Iterar sobre una serie de data que podrias aplicarse
466
467 for m_name in BaseClass.METHODS:
468
469 met_arg = {}
470
471 for arg in m_arg:
472 if arg in BaseClass.METHODS[m_name]:
473 for att in BaseClass.METHODS[m_name]:
474 met_arg[att] = self.kwargs[att]
475
476 method = getattr(BaseClass, m_name)
477 method(self, **met_arg)
478 break
479
480 if self.dataOut.flagNoData:
481 continue
482
483 keyList = list(self.operations2RunDict.keys())
484 keyList.sort()
485
486 for key in keyList:
487
488 self.socketOP = self.sockOp()
489 self.dataOut = self.execOp(self.socketOP, key, self.dataOut)
490
491
492 self.publishProc(self.socket_p, self.dataOut)
493
494
495 print("%s done" %BaseClass.__name__)
496
497 return 0
498
499 def runOp(self):
500
501 while True:
502
503 [self.dest ,self.buffer] = self.funIOrec(self.socket_router)
504
505 self.buffer = BaseClass.run(self, self.buffer, **self.kwargs)
506
507 self.funIOsen(self.socket_router, self.buffer, self.dest)
508
509 print("%s done" %BaseClass.__name__)
510 return 0
511
512
513 def run(self):
514
515 if self.typeProc is "ProcUnit":
516
517 self.socket_p = self.sockPublishing()
518
519 if 'Reader' not in self.dictProcs[self.id].name:
520 self.socket_l = self.sockListening(self.inputId)
521 self.runProc()
522
523 else:
524
525 self.runReader()
526
527 elif self.typeProc is "Operation":
528
529 self.socket_router = self.sockIO()
530
531 self.runOp()
351 532
352 """
353 if not self.isConfig:
354 self.setup(**kwargs)
355
356 raise NotImplementedError
357
358 def close(self):
533 else:
534 raise ValueError("Unknown type")
359 535
360 pass No newline at end of file
536 return 0
537
538 return MPClass No newline at end of file
@@ -2,16 +2,24 import itertools
2 2
3 3 import numpy
4 4
5 from .jroproc_base import ProcessingUnit, Operation
5 from schainpy.model.proc.jroproc_base import ProcessingUnit, MPDecorator, Operation
6 6 from schainpy.model.data.jrodata import Spectra
7 7 from schainpy.model.data.jrodata import hildebrand_sekhon
8 from schainpy.utils import log #yong
8 from schainpy.utils import log
9 9
10 @MPDecorator
10 11 class SpectraProc(ProcessingUnit):
11 12
12 def __init__(self, **kwargs):
13 METHODS = {'selectHeights' : ['minHei', 'maxHei'],
14 'selectChannels' : 'channelList',
15 'selectChannelsByIndex': 'channelIndexList',
16 'getBeaconSignal' : ['tauindex', 'channelindex', 'hei_ref'],
17 'selectHeightsByIndex' : ['minIndex', 'maxIndex']
18 }
13 19
14 ProcessingUnit.__init__(self, **kwargs)
20 def __init__(self):#, **kwargs):
21
22 ProcessingUnit.__init__(self)#, **kwargs)
15 23
16 24 self.buffer = None
17 25 self.firstdatatime = None
@@ -19,6 +27,7 class SpectraProc(ProcessingUnit):
19 27 self.dataOut = Spectra()
20 28 self.id_min = None
21 29 self.id_max = None
30 self.setupReq = False #Agregar a todas las unidades de proc
22 31
23 32 def __updateSpecFromVoltage(self):
24 33
@@ -134,7 +143,7 class SpectraProc(ProcessingUnit):
134 143 if self.dataOut.data_cspc is not None:
135 144 #desplaza a la derecha en el eje 2 determinadas posiciones
136 145 self.dataOut.data_cspc = numpy.roll(self.dataOut.data_cspc, shift, axis=1)
137
146
138 147 return True
139 148
140 149 if self.dataIn.type == "Voltage":
@@ -774,7 +783,7 class SpectraProc(ProcessingUnit):
774 783
775 784 return 1
776 785
777
786 @MPDecorator
778 787 class IncohInt(Operation):
779 788
780 789 __profIndex = 0
@@ -795,9 +804,11 class IncohInt(Operation):
795 804
796 805 n = None
797 806
798 def __init__(self, **kwargs):
807 def __init__(self):#, **kwargs):
808
809 Operation.__init__(self)#, **kwargs)
810
799 811
800 Operation.__init__(self, **kwargs)
801 812 # self.isConfig = False
802 813
803 814 def setup(self, n=None, timeInterval=None, overlapping=False):
@@ -930,7 +941,7 class IncohInt(Operation):
930 941 def run(self, dataOut, n=None, timeInterval=None, overlapping=False):
931 942 if n == 1:
932 943 return
933
944
934 945 dataOut.flagNoData = True
935 946
936 947 if not self.isConfig:
@@ -950,4 +961,6 class IncohInt(Operation):
950 961
951 962 dataOut.nIncohInt *= self.n
952 963 dataOut.utctime = avgdatatime
953 dataOut.flagNoData = False No newline at end of file
964 dataOut.flagNoData = False
965
966 return dataOut No newline at end of file
@@ -3,24 +3,28 import numpy
3 3 from scipy import interpolate
4 4 #TODO
5 5 #from schainpy import cSchain
6 from .jroproc_base import ProcessingUnit, Operation
6 from schainpy.model.proc.jroproc_base import ProcessingUnit, MPDecorator, Operation
7 7 from schainpy.model.data.jrodata import Voltage
8 from time import time
9 8 from schainpy.utils import log
9 from time import time
10 10
11 11
12 @MPDecorator
12 13 class VoltageProc(ProcessingUnit):
14
15 METHODS = {} #yong
13 16
17 def __init__(self):#, **kwargs): #yong
14 18
15 def __init__(self, **kwargs):
16
17 ProcessingUnit.__init__(self, **kwargs)
19 ProcessingUnit.__init__(self)#, **kwargs)
18 20
19 21 # self.objectDict = {}
20 22 self.dataOut = Voltage()
21 23 self.flip = 1
24 self.setupReq = False #yong
22 25
23 26 def run(self):
27
24 28 if self.dataIn.type == 'AMISR':
25 29 self.__updateObjFromAmisrInput()
26 30
@@ -317,7 +321,7 class VoltageProc(ProcessingUnit):
317 321 self.dataOut.data[:,:,botLim:topLim+1] = ynew
318 322
319 323 # import collections
320
324 @MPDecorator
321 325 class CohInt(Operation):
322 326
323 327 isConfig = False
@@ -333,9 +337,9 class CohInt(Operation):
333 337 __dataToPutStride = False
334 338 n = None
335 339
336 def __init__(self, **kwargs):
340 def __init__(self):#, **kwargs):
337 341
338 Operation.__init__(self, **kwargs)
342 Operation.__init__(self)#, **kwargs)
339 343
340 344 # self.isConfig = False
341 345
@@ -549,6 +553,7 class CohInt(Operation):
549 553 return avgdata, avgdatatime
550 554
551 555 def run(self, dataOut, n=None, timeInterval=None, stride=None, overlapping=False, byblock=False, **kwargs):
556
552 557 if not self.isConfig:
553 558 self.setup(n=n, stride=stride, timeInterval=timeInterval, overlapping=overlapping, byblock=byblock, **kwargs)
554 559 self.isConfig = True
@@ -577,7 +582,8 class CohInt(Operation):
577 582 # raise
578 583 # dataOut.timeInterval = dataOut.ippSeconds * dataOut.nCohInt
579 584 dataOut.flagNoData = False
580
585 return dataOut
586 @MPDecorator
581 587 class Decoder(Operation):
582 588
583 589 isConfig = False
@@ -588,15 +594,15 class Decoder(Operation):
588 594 nCode = None
589 595 nBaud = None
590 596
591 def __init__(self, **kwargs):
597 def __init__(self):#, **kwargs):
592 598
593 Operation.__init__(self, **kwargs)
599 Operation.__init__(self)#, **kwargs)
594 600
595 601 self.times = None
596 602 self.osamp = None
597 603 # self.__setValues = False
598 self.isConfig = False
599
604 # self.isConfig = False
605 self.setupReq = False
600 606 def setup(self, code, osamp, dataOut):
601 607
602 608 self.__profIndex = 0
@@ -763,22 +769,22 class Decoder(Operation):
763 769
764 770 if self.__profIndex == self.nCode-1:
765 771 self.__profIndex = 0
766 return 1
772 return dataOut
767 773
768 774 self.__profIndex += 1
769 775
770 return 1
776 return dataOut
771 777 # dataOut.flagDeflipData = True #asumo q la data no esta sin flip
772 778
773
779 @MPDecorator
774 780 class ProfileConcat(Operation):
775 781
776 782 isConfig = False
777 783 buffer = None
778 784
779 def __init__(self, **kwargs):
785 def __init__(self):#, **kwargs):
780 786
781 Operation.__init__(self, **kwargs)
787 Operation.__init__(self)#, **kwargs)
782 788 self.profileIndex = 0
783 789
784 790 def reset(self):
@@ -820,16 +826,17 class ProfileConcat(Operation):
820 826 xf = dataOut.heightList[0] + dataOut.nHeights * deltaHeight * m
821 827 dataOut.heightList = numpy.arange(dataOut.heightList[0], xf, deltaHeight)
822 828 dataOut.ippSeconds *= m
823
829 return dataOut
830 @MPDecorator
824 831 class ProfileSelector(Operation):
825 832
826 833 profileIndex = None
827 834 # Tamanho total de los perfiles
828 835 nProfiles = None
829 836
830 def __init__(self, **kwargs):
837 def __init__(self):#, **kwargs):
831 838
832 Operation.__init__(self, **kwargs)
839 Operation.__init__(self)#, **kwargs)
833 840 self.profileIndex = 0
834 841
835 842 def incProfileIndex(self):
@@ -979,13 +986,14 class ProfileSelector(Operation):
979 986
980 987 raise ValueError("ProfileSelector needs profileList, profileRangeList or rangeList parameter")
981 988
982 return False
983
989 #return False
990 return dataOut
991 @MPDecorator
984 992 class Reshaper(Operation):
985 993
986 def __init__(self, **kwargs):
994 def __init__(self):#, **kwargs):
987 995
988 Operation.__init__(self, **kwargs)
996 Operation.__init__(self)#, **kwargs)
989 997
990 998 self.__buffer = None
991 999 self.__nitems = 0
@@ -1084,11 +1092,13 class Reshaper(Operation):
1084 1092
1085 1093 dataOut.ippSeconds /= self.__nTxs
1086 1094
1095 return dataOut
1096 @MPDecorator
1087 1097 class SplitProfiles(Operation):
1088 1098
1089 def __init__(self, **kwargs):
1099 def __init__(self):#, **kwargs):
1090 1100
1091 Operation.__init__(self, **kwargs)
1101 Operation.__init__(self)#, **kwargs)
1092 1102
1093 1103 def run(self, dataOut, n):
1094 1104
@@ -1102,8 +1112,9 class SplitProfiles(Operation):
1102 1112
1103 1113 if shape[2] % n != 0:
1104 1114 raise ValueError("Could not split the data, n=%d has to be multiple of %d" %(n, shape[2]))
1105
1115
1106 1116 new_shape = shape[0], shape[1]*n, int(shape[2]/n)
1117
1107 1118 dataOut.data = numpy.reshape(dataOut.data, new_shape)
1108 1119 dataOut.flagNoData = False
1109 1120
@@ -1123,10 +1134,12 class SplitProfiles(Operation):
1123 1134
1124 1135 dataOut.ippSeconds /= n
1125 1136
1137 return dataOut
1138 @MPDecorator
1126 1139 class CombineProfiles(Operation):
1127 def __init__(self, **kwargs):
1140 def __init__(self):#, **kwargs):
1128 1141
1129 Operation.__init__(self, **kwargs)
1142 Operation.__init__(self)#, **kwargs)
1130 1143
1131 1144 self.__remData = None
1132 1145 self.__profileIndex = 0
@@ -1184,6 +1197,7 class CombineProfiles(Operation):
1184 1197
1185 1198 dataOut.ippSeconds *= n
1186 1199
1200 return dataOut
1187 1201 # import collections
1188 1202 # from scipy.stats import mode
1189 1203 #
@@ -1318,4 +1332,4 class CombineProfiles(Operation):
1318 1332 #
1319 1333 # self.__startIndex += self.__newNSamples
1320 1334 #
1321 # return No newline at end of file
1335 # return
General Comments 0
You need to be logged in to leave comments. Login now