@@ -68,7 +68,6 class Alarm(Process): | |||||
68 | @staticmethod |
|
68 | @staticmethod | |
69 | def send_email(**kwargs): |
|
69 | def send_email(**kwargs): | |
70 | notifier = SchainNotify() |
|
70 | notifier = SchainNotify() | |
71 | print(kwargs) |
|
|||
72 | notifier.notify(**kwargs) |
|
71 | notifier.notify(**kwargs) | |
73 |
|
72 | |||
74 | @staticmethod |
|
73 | @staticmethod | |
@@ -290,7 +289,7 class SchainNotify: | |||||
290 |
|
289 | |||
291 | msg = MIMEMultipart() |
|
290 | msg = MIMEMultipart() | |
292 | msg['Subject'] = subject |
|
291 | msg['Subject'] = subject | |
293 |
msg['From'] = " |
|
292 | msg['From'] = "SChain API (v{}) <{}>".format(schainpy.__version__, email_from) | |
294 | msg['Reply-to'] = email_from |
|
293 | msg['Reply-to'] = email_from | |
295 | msg['To'] = email_to |
|
294 | msg['To'] = email_to | |
296 |
|
295 |
@@ -11,7 +11,7 import traceback | |||||
11 | import math |
|
11 | import math | |
12 | import time |
|
12 | import time | |
13 | import zmq |
|
13 | import zmq | |
14 | from multiprocessing import Process, cpu_count |
|
14 | from multiprocessing import Process, Queue, cpu_count | |
15 | from threading import Thread |
|
15 | from threading import Thread | |
16 | from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring |
|
16 | from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring | |
17 | from xml.dom import minidom |
|
17 | from xml.dom import minidom | |
@@ -361,13 +361,14 class OperationConf(): | |||||
361 |
|
361 | |||
362 | return kwargs |
|
362 | return kwargs | |
363 |
|
363 | |||
364 | def setup(self, id, name, priority, type, project_id): |
|
364 | def setup(self, id, name, priority, type, project_id, err_queue): | |
365 |
|
365 | |||
366 | self.id = str(id) |
|
366 | self.id = str(id) | |
367 | self.project_id = project_id |
|
367 | self.project_id = project_id | |
368 | self.name = name |
|
368 | self.name = name | |
369 | self.type = type |
|
369 | self.type = type | |
370 | self.priority = priority |
|
370 | self.priority = priority | |
|
371 | self.err_queue = err_queue | |||
371 | self.parmConfObjList = [] |
|
372 | self.parmConfObjList = [] | |
372 |
|
373 | |||
373 | def removeParameters(self): |
|
374 | def removeParameters(self): | |
@@ -459,8 +460,9 class OperationConf(): | |||||
459 | opObj = className() |
|
460 | opObj = className() | |
460 | elif self.type == 'external': |
|
461 | elif self.type == 'external': | |
461 | kwargs = self.getKwargs() |
|
462 | kwargs = self.getKwargs() | |
462 | opObj = className(self.id, self.project_id, **kwargs) |
|
463 | opObj = className(self.id, self.id, self.project_id, self.err_queue, 'Operation', **kwargs) | |
463 | opObj.start() |
|
464 | opObj.start() | |
|
465 | self.opObj = opObj | |||
464 |
|
466 | |||
465 | return opObj |
|
467 | return opObj | |
466 |
|
468 | |||
@@ -548,7 +550,7 class ProcUnitConf(): | |||||
548 |
|
550 | |||
549 | return self.procUnitObj |
|
551 | return self.procUnitObj | |
550 |
|
552 | |||
551 | def setup(self, project_id, id, name, datatype, inputId): |
|
553 | def setup(self, project_id, id, name, datatype, inputId, err_queue): | |
552 | ''' |
|
554 | ''' | |
553 | id sera el topico a publicar |
|
555 | id sera el topico a publicar | |
554 | inputId sera el topico a subscribirse |
|
556 | inputId sera el topico a subscribirse | |
@@ -574,6 +576,7 class ProcUnitConf(): | |||||
574 | self.name = name |
|
576 | self.name = name | |
575 | self.datatype = datatype |
|
577 | self.datatype = datatype | |
576 |
self.inputId = inputId |
|
578 | self.inputId = inputId | |
|
579 | self.err_queue = err_queue | |||
577 | self.opConfObjList = [] |
|
580 | self.opConfObjList = [] | |
578 |
|
581 | |||
579 | self.addOperation(name='run', optype='self') |
|
582 | self.addOperation(name='run', optype='self') | |
@@ -607,7 +610,7 class ProcUnitConf(): | |||||
607 | id = self.__getNewId() |
|
610 | id = self.__getNewId() | |
608 | priority = self.__getPriority() # Sin mucho sentido, pero puede usarse |
|
611 | priority = self.__getPriority() # Sin mucho sentido, pero puede usarse | |
609 | opConfObj = OperationConf() |
|
612 | opConfObj = OperationConf() | |
610 | opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id) |
|
613 | opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id, err_queue=self.err_queue) | |
611 | self.opConfObjList.append(opConfObj) |
|
614 | self.opConfObjList.append(opConfObj) | |
612 |
|
615 | |||
613 | return opConfObj |
|
616 | return opConfObj | |
@@ -675,7 +678,7 class ProcUnitConf(): | |||||
675 |
|
678 | |||
676 | className = eval(self.name) |
|
679 | className = eval(self.name) | |
677 | kwargs = self.getKwargs() |
|
680 | kwargs = self.getKwargs() | |
678 |
procUnitObj = className(self.id, self.inputId, self.project_id, **kwargs) |
|
681 | procUnitObj = className(self.id, self.inputId, self.project_id, self.err_queue, 'ProcUnit', **kwargs) | |
679 | log.success('creating process...', self.name) |
|
682 | log.success('creating process...', self.name) | |
680 |
|
683 | |||
681 | for opConfObj in self.opConfObjList: |
|
684 | for opConfObj in self.opConfObjList: | |
@@ -687,7 +690,7 class ProcUnitConf(): | |||||
687 | else: |
|
690 | else: | |
688 | opObj = opConfObj.createObject() |
|
691 | opObj = opConfObj.createObject() | |
689 |
|
692 | |||
690 |
log.success(' |
|
693 | log.success('adding operation: {}, type:{}'.format( | |
691 | opConfObj.name, |
|
694 | opConfObj.name, | |
692 | opConfObj.type), self.name) |
|
695 | opConfObj.type), self.name) | |
693 |
|
696 | |||
@@ -726,7 +729,7 class ReadUnitConf(ProcUnitConf): | |||||
726 |
|
729 | |||
727 | return self.ELEMENTNAME |
|
730 | return self.ELEMENTNAME | |
728 |
|
731 | |||
729 | def setup(self, project_id, id, name, datatype, path='', startDate='', endDate='', |
|
732 | def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='', | |
730 | startTime='', endTime='', server=None, **kwargs): |
|
733 | startTime='', endTime='', server=None, **kwargs): | |
731 |
|
734 | |||
732 |
|
735 | |||
@@ -765,6 +768,7 class ReadUnitConf(ProcUnitConf): | |||||
765 | self.startTime = startTime |
|
768 | self.startTime = startTime | |
766 | self.endTime = endTime |
|
769 | self.endTime = endTime | |
767 | self.server = server |
|
770 | self.server = server | |
|
771 | self.err_queue = err_queue | |||
768 | self.addRunOperation(**kwargs) |
|
772 | self.addRunOperation(**kwargs) | |
769 |
|
773 | |||
770 | def update(self, **kwargs): |
|
774 | def update(self, **kwargs): | |
@@ -878,6 +882,7 class Project(Process): | |||||
878 | self.email = None |
|
882 | self.email = None | |
879 | self.alarm = None |
|
883 | self.alarm = None | |
880 | self.procUnitConfObjDict = {} |
|
884 | self.procUnitConfObjDict = {} | |
|
885 | self.err_queue = Queue() | |||
881 |
|
886 | |||
882 | def __getNewId(self): |
|
887 | def __getNewId(self): | |
883 |
|
888 | |||
@@ -935,6 +940,8 class Project(Process): | |||||
935 | self.description = description |
|
940 | self.description = description | |
936 | self.email = email |
|
941 | self.email = email | |
937 | self.alarm = alarm |
|
942 | self.alarm = alarm | |
|
943 | if name: | |||
|
944 | self.name = '{} ({})'.format(Process.__name__, name) | |||
938 |
|
945 | |||
939 | def update(self, **kwargs): |
|
946 | def update(self, **kwargs): | |
940 |
|
947 | |||
@@ -963,7 +970,7 class Project(Process): | |||||
963 | idReadUnit = str(id) |
|
970 | idReadUnit = str(id) | |
964 |
|
971 | |||
965 | readUnitConfObj = ReadUnitConf() |
|
972 | readUnitConfObj = ReadUnitConf() | |
966 | readUnitConfObj.setup(self.id, idReadUnit, name, datatype, **kwargs) |
|
973 | readUnitConfObj.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs) | |
967 | self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj |
|
974 | self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj | |
968 |
|
975 | |||
969 | return readUnitConfObj |
|
976 | return readUnitConfObj | |
@@ -980,9 +987,9 class Project(Process): | |||||
980 |
|
987 | |||
981 | ''' |
|
988 | ''' | |
982 |
|
989 | |||
983 |
idProcUnit = self.__getNewId() |
|
990 | idProcUnit = self.__getNewId() | |
984 | procUnitConfObj = ProcUnitConf() |
|
991 | procUnitConfObj = ProcUnitConf() | |
985 |
procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId) |
|
992 | procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue) | |
986 | self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj |
|
993 | self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj | |
987 |
|
994 | |||
988 | return procUnitConfObj |
|
995 | return procUnitConfObj | |
@@ -1119,10 +1126,10 class Project(Process): | |||||
1119 |
|
1126 | |||
1120 | def __str__(self): |
|
1127 | def __str__(self): | |
1121 |
|
1128 | |||
1122 |
print('Project |
|
1129 | print('Project: name = %s, description = %s, id = %s' % ( | |
1123 | self.name, |
|
1130 | self.name, | |
1124 | self.description, |
|
1131 | self.description, | |
1125 |
self. |
|
1132 | self.id)) | |
1126 |
|
1133 | |||
1127 | for procUnitConfObj in self.procUnitConfObjDict.values(): |
|
1134 | for procUnitConfObj in self.procUnitConfObjDict.values(): | |
1128 | print(procUnitConfObj) |
|
1135 | print(procUnitConfObj) | |
@@ -1135,33 +1142,59 class Project(Process): | |||||
1135 | for key in keys: |
|
1142 | for key in keys: | |
1136 | self.procUnitConfObjDict[key].createObjects() |
|
1143 | self.procUnitConfObjDict[key].createObjects() | |
1137 |
|
1144 | |||
1138 | def __handleError(self, procUnitConfObj, modes=None, stdout=True): |
|
1145 | def monitor(self): | |
|
1146 | ||||
|
1147 | t = Thread(target=self.__monitor, args=(self.err_queue, self.ctx)) | |||
|
1148 | t.start() | |||
|
1149 | ||||
|
1150 | def __monitor(self, queue, ctx): | |||
1139 |
|
1151 | |||
1140 | import socket |
|
1152 | import socket | |
1141 |
|
1153 | |||
1142 | if modes is None: |
|
1154 | procs = 0 | |
1143 | modes = self.alarm |
|
1155 | err_msg = '' | |
|
1156 | ||||
|
1157 | while True: | |||
|
1158 | msg = queue.get() | |||
|
1159 | if '#_start_#' in msg: | |||
|
1160 | procs += 1 | |||
|
1161 | elif '#_end_#' in msg: | |||
|
1162 | procs -=1 | |||
|
1163 | else: | |||
|
1164 | err_msg = msg | |||
|
1165 | ||||
|
1166 | if procs == 0 or 'Traceback' in err_msg: | |||
|
1167 | break | |||
|
1168 | time.sleep(0.1) | |||
|
1169 | ||||
|
1170 | if '|' in err_msg: | |||
|
1171 | name, err = err_msg.split('|') | |||
|
1172 | if 'SchainWarning' in err: | |||
|
1173 | log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), name) | |||
|
1174 | elif 'SchainError' in err: | |||
|
1175 | log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), name) | |||
|
1176 | else: | |||
|
1177 | log.error(err, name) | |||
|
1178 | else: | |||
|
1179 | name, err = self.name, err_msg | |||
1144 |
|
1180 | |||
1145 | if not self.alarm: |
|
1181 | time.sleep(2) | |
1146 | modes = [] |
|
|||
1147 |
|
1182 | |||
1148 | err = traceback.format_exception(sys.exc_info()[0], |
|
1183 | for conf in self.procUnitConfObjDict.values(): | |
1149 | sys.exc_info()[1], |
|
1184 | for confop in conf.opConfObjList: | |
1150 | sys.exc_info()[2]) |
|
1185 | if confop.type == 'external': | |
|
1186 | confop.opObj.terminate() | |||
|
1187 | conf.procUnitObj.terminate() | |||
1151 |
|
1188 | |||
1152 | log.error('{}'.format(err[-1]), procUnitConfObj.name) |
|
1189 | ctx.term() | |
1153 |
|
1190 | |||
1154 | message = ''.join(err) |
|
1191 | message = ''.join(err) | |
1155 |
|
1192 | |||
1156 |
if |
|
1193 | if err_msg: | |
1157 | sys.stderr.write(message) |
|
|||
1158 |
|
||||
1159 | subject = 'SChain v%s: Error running %s\n' % ( |
|
1194 | subject = 'SChain v%s: Error running %s\n' % ( | |
1160 |
schainpy.__version__, |
|
1195 | schainpy.__version__, self.name) | |
1161 |
|
1196 | |||
1162 |
subtitle = ' |
|
1197 | subtitle = 'Hostname: %s\n' % socket.gethostbyname( | |
1163 | procUnitConfObj.getElementName(), procUnitConfObj.name) |
|
|||
1164 | subtitle += 'Hostname: %s\n' % socket.gethostbyname( |
|
|||
1165 | socket.gethostname()) |
|
1198 | socket.gethostname()) | |
1166 | subtitle += 'Working directory: %s\n' % os.path.abspath('./') |
|
1199 | subtitle += 'Working directory: %s\n' % os.path.abspath('./') | |
1167 | subtitle += 'Configuration file: %s\n' % self.filename |
|
1200 | subtitle += 'Configuration file: %s\n' % self.filename | |
@@ -1178,7 +1211,7 class Project(Process): | |||||
1178 | subtitle += '[End time = %s]\n' % readUnitConfObj.endTime |
|
1211 | subtitle += '[End time = %s]\n' % readUnitConfObj.endTime | |
1179 |
|
1212 | |||
1180 | a = Alarm( |
|
1213 | a = Alarm( | |
1181 |
modes=m |
|
1214 | modes=self.alarm, | |
1182 | email=self.email, |
|
1215 | email=self.email, | |
1183 | message=message, |
|
1216 | message=message, | |
1184 | subject=subject, |
|
1217 | subject=subject, | |
@@ -1186,7 +1219,7 class Project(Process): | |||||
1186 | filename=self.filename |
|
1219 | filename=self.filename | |
1187 | ) |
|
1220 | ) | |
1188 |
|
1221 | |||
1189 | return a |
|
1222 | a.start() | |
1190 |
|
1223 | |||
1191 | def isPaused(self): |
|
1224 | def isPaused(self): | |
1192 | return 0 |
|
1225 | return 0 | |
@@ -1223,7 +1256,7 class Project(Process): | |||||
1223 |
|
1256 | |||
1224 | self.filename = filename |
|
1257 | self.filename = filename | |
1225 |
|
1258 | |||
1226 |
def setProxy |
|
1259 | def setProxy(self): | |
1227 |
|
1260 | |||
1228 | if not os.path.exists('/tmp/schain'): |
|
1261 | if not os.path.exists('/tmp/schain'): | |
1229 | os.mkdir('/tmp/schain') |
|
1262 | os.mkdir('/tmp/schain') | |
@@ -1233,10 +1266,10 class Project(Process): | |||||
1233 | xpub.bind('ipc:///tmp/schain/{}_pub'.format(self.id)) |
|
1266 | xpub.bind('ipc:///tmp/schain/{}_pub'.format(self.id)) | |
1234 | xsub = self.ctx.socket(zmq.XSUB) |
|
1267 | xsub = self.ctx.socket(zmq.XSUB) | |
1235 | xsub.bind('ipc:///tmp/schain/{}_sub'.format(self.id)) |
|
1268 | xsub.bind('ipc:///tmp/schain/{}_sub'.format(self.id)) | |
1236 |
|
1269 | self.monitor() | ||
1237 | try: |
|
1270 | try: | |
1238 | zmq.proxy(xpub, xsub) |
|
1271 | zmq.proxy(xpub, xsub) | |
1239 |
except: |
|
1272 | except zmq.ContextTerminated: | |
1240 | xpub.close() |
|
1273 | xpub.close() | |
1241 | xsub.close() |
|
1274 | xsub.close() | |
1242 |
|
1275 | |||
@@ -1245,12 +1278,7 class Project(Process): | |||||
1245 | log.success('Starting {}: {}'.format(self.name, self.id), tag='') |
|
1278 | log.success('Starting {}: {}'.format(self.name, self.id), tag='') | |
1246 | self.start_time = time.time() |
|
1279 | self.start_time = time.time() | |
1247 | self.createObjects() |
|
1280 | self.createObjects() | |
1248 | # t = Thread(target=wait, args=(self.ctx, )) |
|
1281 | self.setProxy() | |
1249 | # t.start() |
|
1282 | log.success('{} Done (Time: {}s)'.format( | |
1250 | self.setProxyCom() |
|
|||
1251 |
|
||||
1252 | # Iniciar todos los procesos .start(), monitoreo de procesos. ELiminar lo de abajo |
|
|||
1253 |
|
||||
1254 | log.success('{} Done (time: {}s)'.format( |
|
|||
1255 | self.name, |
|
1283 | self.name, | |
1256 | time.time()-self.start_time)) |
|
1284 | time.time()-self.start_time), '') |
@@ -13,6 +13,7 import datetime | |||||
13 |
|
13 | |||
14 | import numpy |
|
14 | import numpy | |
15 |
|
15 | |||
|
16 | import schainpy.admin | |||
16 | from schainpy.model.proc.jroproc_base import ProcessingUnit, MPDecorator |
|
17 | from schainpy.model.proc.jroproc_base import ProcessingUnit, MPDecorator | |
17 | from schainpy.model.data.jrodata import Parameters |
|
18 | from schainpy.model.data.jrodata import Parameters | |
18 | from schainpy.model.io.jroIO_base import JRODataReader, isNumber |
|
19 | from schainpy.model.io.jroIO_base import JRODataReader, isNumber | |
@@ -394,12 +395,11 class BLTRParamReader(JRODataReader, ProcessingUnit): | |||||
394 | ''' |
|
395 | ''' | |
395 | if self.flagNoMoreFiles: |
|
396 | if self.flagNoMoreFiles: | |
396 | self.dataOut.flagNoData = True |
|
397 | self.dataOut.flagNoData = True | |
397 |
|
|
398 | raise schainpy.admin.SchainError('No More files to read') | |
398 | return |
|
|||
399 |
|
399 | |||
400 | if not self.readNextBlock(): |
|
400 | if not self.readNextBlock(): | |
401 | self.dataOut.flagNoData = True |
|
401 | self.dataOut.flagNoData = True | |
402 |
|
|
402 | raise schainpy.admin.SchainError('Time for wait new file reach!!!') | |
403 |
|
403 | |||
404 | self.set_output() |
|
404 | self.set_output() | |
405 |
|
405 |
@@ -15,13 +15,9 from scipy import asarray as ar, exp | |||||
15 | SPEED_OF_LIGHT = 299792458 |
|
15 | SPEED_OF_LIGHT = 299792458 | |
16 | SPEED_OF_LIGHT = 3e8 |
|
16 | SPEED_OF_LIGHT = 3e8 | |
17 |
|
17 | |||
18 | try: |
|
|||
19 | from gevent import sleep |
|
|||
20 | except: |
|
|||
21 | from time import sleep |
|
|||
22 |
|
||||
23 | from .utils import folder_in_range |
|
18 | from .utils import folder_in_range | |
24 |
|
19 | |||
|
20 | import schainpy.admin | |||
25 | from schainpy.model.data.jrodata import Spectra |
|
21 | from schainpy.model.data.jrodata import Spectra | |
26 | from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator |
|
22 | from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator | |
27 | from schainpy.utils import log |
|
23 | from schainpy.utils import log | |
@@ -341,7 +337,7 class BLTRSpectraReader (ProcessingUnit): | |||||
341 |
|
337 | |||
342 | if self.flagNoMoreFiles: |
|
338 | if self.flagNoMoreFiles: | |
343 | self.dataOut.flagNoData = True |
|
339 | self.dataOut.flagNoData = True | |
344 |
|
|
340 | raise schainpy.admin.SchainError('No more files') | |
345 |
|
341 | |||
346 | self.readBlock() |
|
342 | self.readBlock() | |
347 |
|
343 |
@@ -661,7 +661,6 class AMISRReader(ProcessingUnit): | |||||
661 |
|
661 | |||
662 | if self.flagNoMoreFiles: |
|
662 | if self.flagNoMoreFiles: | |
663 | self.dataOut.flagNoData = True |
|
663 | self.dataOut.flagNoData = True | |
664 | print('Process finished') |
|
|||
665 | return 0 |
|
664 | return 0 | |
666 |
|
665 | |||
667 | if self.__hasNotDataInBuffer(): |
|
666 | if self.__hasNotDataInBuffer(): |
@@ -15,11 +15,6 import datetime | |||||
15 | import traceback |
|
15 | import traceback | |
16 | import zmq |
|
16 | import zmq | |
17 |
|
17 | |||
18 | try: |
|
|||
19 | from gevent import sleep |
|
|||
20 | except: |
|
|||
21 | from time import sleep |
|
|||
22 |
|
||||
23 | from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader |
|
18 | from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader | |
24 | from schainpy.model.data.jroheaderIO import get_dtype_index, get_numpy_dtype, get_procflag_dtype, get_dtype_width |
|
19 | from schainpy.model.data.jroheaderIO import get_dtype_index, get_numpy_dtype, get_procflag_dtype, get_dtype_width | |
25 | from schainpy.utils import log |
|
20 | from schainpy.utils import log | |
@@ -771,7 +766,6 class JRODataReader(JRODataIO): | |||||
771 | idFile += 1 |
|
766 | idFile += 1 | |
772 | if not(idFile < len(self.filenameList)): |
|
767 | if not(idFile < len(self.filenameList)): | |
773 | self.flagNoMoreFiles = 1 |
|
768 | self.flagNoMoreFiles = 1 | |
774 | # print "[Reading] No more Files" |
|
|||
775 | return 0 |
|
769 | return 0 | |
776 |
|
770 | |||
777 | filename = self.filenameList[idFile] |
|
771 | filename = self.filenameList[idFile] | |
@@ -843,10 +837,14 class JRODataReader(JRODataIO): | |||||
843 |
|
837 | |||
844 | for nTries in range(tries): |
|
838 | for nTries in range(tries): | |
845 | if firstTime_flag: |
|
839 | if firstTime_flag: | |
846 | print("\t[Reading] Waiting %0.2f sec for the next file: \"%s\" , try %03d ..." % (self.delay, filename, nTries + 1)) |
|
840 | log.warning( | |
847 | sleep(self.delay) |
|
841 | "Waiting %0.2f sec for the next file: \"%s\" , try %03d ..." % (self.delay, filename, nTries + 1), | |
|
842 | self.name) | |||
|
843 | time.sleep(self.delay) | |||
848 | else: |
|
844 | else: | |
849 | print("\t[Reading] Searching the next \"%s%04d%03d%03d%s\" file ..." % (self.optchar, self.year, self.doy, self.set, self.ext)) |
|
845 | log.warning( | |
|
846 | "Searching the next \"%s%04d%03d%03d%s\" file ..." % (self.optchar, self.year, self.doy, self.set, self.ext), | |||
|
847 | self.name) | |||
850 |
|
848 | |||
851 | fullfilename, filename = checkForRealPath( |
|
849 | fullfilename, filename = checkForRealPath( | |
852 | self.path, self.foldercounter, self.year, self.doy, self.set, self.ext) |
|
850 | self.path, self.foldercounter, self.year, self.doy, self.set, self.ext) | |
@@ -860,7 +858,9 class JRODataReader(JRODataIO): | |||||
860 |
|
858 | |||
861 | firstTime_flag = False |
|
859 | firstTime_flag = False | |
862 |
|
860 | |||
863 | log.warning('Skipping the file {} due to this file doesn\'t exist'.format(filename)) |
|
861 | log.warning( | |
|
862 | 'Skipping the file {} due to this file doesn\'t exist'.format(filename), | |||
|
863 | self.name) | |||
864 | self.set += 1 |
|
864 | self.set += 1 | |
865 |
|
865 | |||
866 | # si no encuentro el file buscado cambio de carpeta y busco en la siguiente carpeta |
|
866 | # si no encuentro el file buscado cambio de carpeta y busco en la siguiente carpeta | |
@@ -877,14 +877,13 class JRODataReader(JRODataIO): | |||||
877 | self.fp.close() |
|
877 | self.fp.close() | |
878 | self.fp = open(fullfilename, 'rb') |
|
878 | self.fp = open(fullfilename, 'rb') | |
879 | self.flagNoMoreFiles = 0 |
|
879 | self.flagNoMoreFiles = 0 | |
880 | # print '[Reading] Setting the file: %s' % fullfilename |
|
|||
881 | else: |
|
880 | else: | |
|
881 | raise schainpy.admin.SchainError('Time for waiting new files reach') | |||
882 | self.fileSize = 0 |
|
882 | self.fileSize = 0 | |
883 | self.filename = None |
|
883 | self.filename = None | |
884 | self.flagIsNewFile = 0 |
|
884 | self.flagIsNewFile = 0 | |
885 | self.fp = None |
|
885 | self.fp = None | |
886 | self.flagNoMoreFiles = 1 |
|
886 | self.flagNoMoreFiles = 1 | |
887 | # print '[Reading] No more files to read' |
|
|||
888 |
|
887 | |||
889 | return fileOk_flag |
|
888 | return fileOk_flag | |
890 |
|
889 | |||
@@ -898,8 +897,8 class JRODataReader(JRODataIO): | |||||
898 | newFile = self.__setNextFileOffline() |
|
897 | newFile = self.__setNextFileOffline() | |
899 |
|
898 | |||
900 | if not(newFile): |
|
899 | if not(newFile): | |
901 |
|
|
900 | raise schainpy.admin.SchainWarning('No more files to read') | |
902 |
|
|
901 | ||
903 |
|
902 | |||
904 | if self.verbose: |
|
903 | if self.verbose: | |
905 | print('[Reading] Setting the file: %s' % self.filename) |
|
904 | print('[Reading] Setting the file: %s' % self.filename) | |
@@ -942,7 +941,7 class JRODataReader(JRODataIO): | |||||
942 | return 0 |
|
941 | return 0 | |
943 |
|
942 | |||
944 | print("[Reading] Waiting %0.2f seconds for the next block, try %03d ..." % (self.delay, nTries + 1)) |
|
943 | print("[Reading] Waiting %0.2f seconds for the next block, try %03d ..." % (self.delay, nTries + 1)) | |
945 | sleep(self.delay) |
|
944 | time.sleep(self.delay) | |
946 |
|
945 | |||
947 | return 0 |
|
946 | return 0 | |
948 |
|
947 | |||
@@ -969,7 +968,7 class JRODataReader(JRODataIO): | |||||
969 | "Waiting %0.2f seconds for the next block, try %03d ..." % (self.delay, nTries + 1), |
|
968 | "Waiting %0.2f seconds for the next block, try %03d ..." % (self.delay, nTries + 1), | |
970 | self.name |
|
969 | self.name | |
971 | ) |
|
970 | ) | |
972 | sleep(self.delay) |
|
971 | time.sleep(self.delay) | |
973 |
|
972 | |||
974 | return 0 |
|
973 | return 0 | |
975 |
|
974 | |||
@@ -1057,8 +1056,7 class JRODataReader(JRODataIO): | |||||
1057 | # Skip block out of startTime and endTime |
|
1056 | # Skip block out of startTime and endTime | |
1058 | while True: |
|
1057 | while True: | |
1059 | if not(self.__setNewBlock()): |
|
1058 | if not(self.__setNewBlock()): | |
1060 |
|
|
1059 | raise schainpy.admin.SchainWarning('No more files to read') | |
1061 | return 0 |
|
|||
1062 |
|
1060 | |||
1063 | if not(self.readBlock()): |
|
1061 | if not(self.readBlock()): | |
1064 | return 0 |
|
1062 | return 0 | |
@@ -1260,10 +1258,10 class JRODataReader(JRODataIO): | |||||
1260 | pattern_path = multi_path[0] |
|
1258 | pattern_path = multi_path[0] | |
1261 |
|
1259 | |||
1262 | if path_empty: |
|
1260 | if path_empty: | |
1263 |
|
|
1261 | raise schainpy.admin.SchainError("[Reading] No *%s files in %s for %s to %s" % (ext, pattern_path, startDate, endDate)) | |
1264 | else: |
|
1262 | else: | |
1265 | if not dateList: |
|
1263 | if not dateList: | |
1266 |
|
|
1264 | raise schainpy.admin.SchainError("[Reading] Date range selected invalid [%s - %s]: No *%s files in %s)" % (startDate, endDate, ext, path)) | |
1267 |
|
1265 | |||
1268 | if include_path: |
|
1266 | if include_path: | |
1269 | return dateList, pathList |
|
1267 | return dateList, pathList | |
@@ -1296,6 +1294,17 class JRODataReader(JRODataIO): | |||||
1296 | oneDDict=None, |
|
1294 | oneDDict=None, | |
1297 | twoDDict=None, |
|
1295 | twoDDict=None, | |
1298 | independentParam=None): |
|
1296 | independentParam=None): | |
|
1297 | ||||
|
1298 | self.online = online | |||
|
1299 | self.realtime = realtime | |||
|
1300 | self.delay = delay | |||
|
1301 | self.getByBlock = getblock | |||
|
1302 | self.nTxs = nTxs | |||
|
1303 | self.startTime = startTime | |||
|
1304 | self.endTime = endTime | |||
|
1305 | self.endDate = endDate | |||
|
1306 | self.startDate = startDate | |||
|
1307 | ||||
1299 | if server is not None: |
|
1308 | if server is not None: | |
1300 | if 'tcp://' in server: |
|
1309 | if 'tcp://' in server: | |
1301 | address = server |
|
1310 | address = server | |
@@ -1326,10 +1335,10 class JRODataReader(JRODataIO): | |||||
1326 | break |
|
1335 | break | |
1327 |
|
1336 | |||
1328 | print('[Reading] Waiting %0.2f sec for an valid file in %s: try %02d ...' % (self.delay, path, nTries + 1)) |
|
1337 | print('[Reading] Waiting %0.2f sec for an valid file in %s: try %02d ...' % (self.delay, path, nTries + 1)) | |
1329 | sleep(self.delay) |
|
1338 | time.sleep(self.delay) | |
1330 |
|
1339 | |||
1331 | if not(fullpath): |
|
1340 | if not(fullpath): | |
1332 |
|
|
1341 | raise schainpy.admin.SchainError('There isn\'t any valid file in {}'.format(path)) | |
1333 | return |
|
1342 | return | |
1334 |
|
1343 | |||
1335 | self.year = year |
|
1344 | self.year = year | |
@@ -1359,17 +1368,10 class JRODataReader(JRODataIO): | |||||
1359 | basename, ext = os.path.splitext(file_name) |
|
1368 | basename, ext = os.path.splitext(file_name) | |
1360 | last_set = int(basename[-3:]) |
|
1369 | last_set = int(basename[-3:]) | |
1361 |
|
1370 | |||
1362 | self.online = online |
|
1371 | ||
1363 | self.realtime = realtime |
|
|||
1364 | self.delay = delay |
|
|||
1365 | ext = ext.lower() |
|
1372 | ext = ext.lower() | |
1366 | self.ext = ext |
|
1373 | self.ext = ext | |
1367 | self.getByBlock = getblock |
|
1374 | ||
1368 | self.nTxs = nTxs |
|
|||
1369 | self.startTime = startTime |
|
|||
1370 | self.endTime = endTime |
|
|||
1371 | self.endDate = endDate |
|
|||
1372 | self.startDate = startDate |
|
|||
1373 | # Added----------------- |
|
1375 | # Added----------------- | |
1374 | self.selBlocksize = blocksize |
|
1376 | self.selBlocksize = blocksize | |
1375 | self.selBlocktime = blocktime |
|
1377 | self.selBlocktime = blocktime | |
@@ -1391,8 +1393,6 class JRODataReader(JRODataIO): | |||||
1391 | self.filenameList = [] |
|
1393 | self.filenameList = [] | |
1392 | return |
|
1394 | return | |
1393 |
|
1395 | |||
1394 | # self.getBasicHeader() |
|
|||
1395 |
|
||||
1396 | if last_set != None: |
|
1396 | if last_set != None: | |
1397 | self.dataOut.last_block = last_set * \ |
|
1397 | self.dataOut.last_block = last_set * \ | |
1398 | self.processingHeaderObj.dataBlocksPerFile + self.basicHeaderObj.dataBlock |
|
1398 | self.processingHeaderObj.dataBlocksPerFile + self.basicHeaderObj.dataBlock |
@@ -12,20 +12,16 Created on Jul 3, 2014 | |||||
12 | # METADATA |
|
12 | # METADATA | |
13 |
|
13 | |||
14 | import os |
|
14 | import os | |
|
15 | import time | |||
15 | import datetime |
|
16 | import datetime | |
16 | import numpy |
|
17 | import numpy | |
17 | import timeit |
|
18 | import timeit | |
18 | from fractions import Fraction |
|
19 | from fractions import Fraction | |
19 |
|
20 | |||
20 | try: |
|
21 | import schainpy.admin | |
21 | from gevent import sleep |
|
|||
22 | except: |
|
|||
23 | from time import sleep |
|
|||
24 |
|
||||
25 | from schainpy.model.data.jroheaderIO import RadarControllerHeader, SystemHeader |
|
22 | from schainpy.model.data.jroheaderIO import RadarControllerHeader, SystemHeader | |
26 | from schainpy.model.data.jrodata import Voltage |
|
23 | from schainpy.model.data.jrodata import Voltage | |
27 | from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator |
|
24 | from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator | |
28 | from time import time |
|
|||
29 |
|
25 | |||
30 | import pickle |
|
26 | import pickle | |
31 | try: |
|
27 | try: | |
@@ -461,9 +457,9 class DigitalRFReader(ProcessingUnit): | |||||
461 | return False |
|
457 | return False | |
462 |
|
458 | |||
463 | def timeit(self, toExecute): |
|
459 | def timeit(self, toExecute): | |
464 | t0 = time() |
|
460 | t0 = time.time() | |
465 | toExecute() |
|
461 | toExecute() | |
466 | self.executionTime = time() - t0 |
|
462 | self.executionTime = time.time() - t0 | |
467 | if self.oldAverage is None: |
|
463 | if self.oldAverage is None: | |
468 | self.oldAverage = self.executionTime |
|
464 | self.oldAverage = self.executionTime | |
469 | self.oldAverage = (self.executionTime + self.count * |
|
465 | self.oldAverage = (self.executionTime + self.count * | |
@@ -569,24 +565,24 class DigitalRFReader(ProcessingUnit): | |||||
569 | if self.__readNextBlock(): |
|
565 | if self.__readNextBlock(): | |
570 | break |
|
566 | break | |
571 | if self.__thisUnixSample > self.__endUTCSecond * self.__sample_rate: |
|
567 | if self.__thisUnixSample > self.__endUTCSecond * self.__sample_rate: | |
572 |
|
|
568 | raise schainpy.admin.SchainError('Error') | |
573 | return |
|
569 | return | |
574 |
|
570 | |||
575 | if self.__flagDiscontinuousBlock: |
|
571 | if self.__flagDiscontinuousBlock: | |
576 |
|
|
572 | raise schainpy.admin.SchainError('discontinuous block found') | |
577 | return |
|
573 | return | |
578 |
|
574 | |||
579 | if not self.__online: |
|
575 | if not self.__online: | |
580 |
|
|
576 | raise schainpy.admin.SchainError('Online?') | |
581 | return |
|
577 | return | |
582 |
|
578 | |||
583 | err_counter += 1 |
|
579 | err_counter += 1 | |
584 | if err_counter > nTries: |
|
580 | if err_counter > nTries: | |
585 |
|
|
581 | raise schainpy.admin.SchainError('Max retrys reach') | |
586 | return |
|
582 | return | |
587 |
|
583 | |||
588 | print('[Reading] waiting %d seconds to read a new block' % seconds) |
|
584 | print('[Reading] waiting %d seconds to read a new block' % seconds) | |
589 | sleep(seconds) |
|
585 | time.sleep(seconds) | |
590 |
|
586 | |||
591 | self.dataOut.data = self.__data_buffer[:, |
|
587 | self.dataOut.data = self.__data_buffer[:, | |
592 | self.__bufferIndex:self.__bufferIndex + self.__nSamples] |
|
588 | self.__bufferIndex:self.__bufferIndex + self.__nSamples] |
@@ -833,7 +833,6 class HFReader(ProcessingUnit): | |||||
833 | def getData(self): |
|
833 | def getData(self): | |
834 | if self.flagNoMoreFiles: |
|
834 | if self.flagNoMoreFiles: | |
835 | self.dataOut.flagNoData = True |
|
835 | self.dataOut.flagNoData = True | |
836 | print('Process finished') |
|
|||
837 | return 0 |
|
836 | return 0 | |
838 |
|
837 | |||
839 | if self.__hasNotDataInBuffer(): |
|
838 | if self.__hasNotDataInBuffer(): |
@@ -585,7 +585,6 class AMISRReader(ProcessingUnit): | |||||
585 |
|
585 | |||
586 | if self.flagNoMoreFiles: |
|
586 | if self.flagNoMoreFiles: | |
587 | self.dataOut.flagNoData = True |
|
587 | self.dataOut.flagNoData = True | |
588 | print('Process finished') |
|
|||
589 | return 0 |
|
588 | return 0 | |
590 |
|
589 | |||
591 | if self.__hasNotDataInBuffer(): |
|
590 | if self.__hasNotDataInBuffer(): |
@@ -13,6 +13,8 import datetime | |||||
13 |
|
13 | |||
14 | import numpy |
|
14 | import numpy | |
15 | import h5py |
|
15 | import h5py | |
|
16 | ||||
|
17 | import schainpy.admin | |||
16 | from schainpy.model.io.jroIO_base import LOCALTIME, JRODataReader, JRODataWriter |
|
18 | from schainpy.model.io.jroIO_base import LOCALTIME, JRODataReader, JRODataWriter | |
17 | from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator |
|
19 | from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator | |
18 | from schainpy.model.data.jrodata import Parameters |
|
20 | from schainpy.model.data.jrodata import Parameters | |
@@ -390,7 +392,7 class MADReader(JRODataReader, ProcessingUnit): | |||||
390 | ''' |
|
392 | ''' | |
391 | if self.flagNoMoreFiles: |
|
393 | if self.flagNoMoreFiles: | |
392 | self.dataOut.flagNoData = True |
|
394 | self.dataOut.flagNoData = True | |
393 |
|
|
395 | raise schainpy.admin.SchainError('No file left to process') | |
394 | return 0 |
|
396 | return 0 | |
395 |
|
397 | |||
396 | if not self.readNextBlock(): |
|
398 | if not self.readNextBlock(): |
@@ -5,6 +5,7 import h5py | |||||
5 | import re |
|
5 | import re | |
6 | import datetime |
|
6 | import datetime | |
7 |
|
7 | |||
|
8 | import schainpy.admin | |||
8 | from schainpy.model.data.jrodata import * |
|
9 | from schainpy.model.data.jrodata import * | |
9 | from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator |
|
10 | from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator | |
10 | from schainpy.model.io.jroIO_base import * |
|
11 | from schainpy.model.io.jroIO_base import * | |
@@ -233,7 +234,7 class ParamReader(JRODataReader,ProcessingUnit): | |||||
233 | idFile = self.fileIndex |
|
234 | idFile = self.fileIndex | |
234 |
|
235 | |||
235 | if not(idFile < len(self.filenameList)): |
|
236 | if not(idFile < len(self.filenameList)): | |
236 |
|
|
237 | raise schainpy.admin.SchainError("No more Files") | |
237 | return 0 |
|
238 | return 0 | |
238 |
|
239 | |||
239 | filename = self.filenameList[idFile] |
|
240 | filename = self.filenameList[idFile] | |
@@ -1165,8 +1166,7 class ParameterReader(JRODataReader,ProcessingUnit): | |||||
1165 | idFile = self.fileIndex |
|
1166 | idFile = self.fileIndex | |
1166 |
|
1167 | |||
1167 | if not(idFile < len(self.filenameList)): |
|
1168 | if not(idFile < len(self.filenameList)): | |
1168 | self.dataOut.error = 'No more files' |
|
1169 | raise schainpy.admin.SchainError('No more files') | |
1169 | return 0 |
|
|||
1170 |
|
1170 | |||
1171 | filename = self.filenameList[idFile] |
|
1171 | filename = self.filenameList[idFile] | |
1172 | self.fp = h5py.File(filename, 'r') |
|
1172 | self.fp = h5py.File(filename, 'r') |
@@ -92,7 +92,6 class SpectraReader(JRODataReader, ProcessingUnit): | |||||
92 | #Eliminar de la base la herencia |
|
92 | #Eliminar de la base la herencia | |
93 | ProcessingUnit.__init__(self)#, **kwargs) |
|
93 | ProcessingUnit.__init__(self)#, **kwargs) | |
94 |
|
94 | |||
95 | # self.isConfig = False |
|
|||
96 |
|
95 | |||
97 | self.pts2read_SelfSpectra = 0 |
|
96 | self.pts2read_SelfSpectra = 0 | |
98 |
|
97 | |||
@@ -160,7 +159,6 class SpectraReader(JRODataReader, ProcessingUnit): | |||||
160 |
|
159 | |||
161 | self.__isFirstTimeOnline = 1 |
|
160 | self.__isFirstTimeOnline = 1 | |
162 |
|
161 | |||
163 | # self.ippSeconds = 0 |
|
|||
164 |
|
162 | |||
165 | self.flagDiscontinuousBlock = 0 |
|
163 | self.flagDiscontinuousBlock = 0 | |
166 |
|
164 | |||
@@ -356,7 +354,6 class SpectraReader(JRODataReader, ProcessingUnit): | |||||
356 |
|
354 | |||
357 | if self.flagNoMoreFiles: |
|
355 | if self.flagNoMoreFiles: | |
358 | self.dataOut.flagNoData = True |
|
356 | self.dataOut.flagNoData = True | |
359 | print('Process finished') |
|
|||
360 | return 0 |
|
357 | return 0 | |
361 |
|
358 | |||
362 | self.flagDiscontinuousBlock = 0 |
|
359 | self.flagDiscontinuousBlock = 0 |
@@ -17,10 +17,10 import inspect | |||||
17 | import zmq |
|
17 | import zmq | |
18 | import time |
|
18 | import time | |
19 | import pickle |
|
19 | import pickle | |
|
20 | import traceback | |||
20 | from queue import Queue |
|
21 | from queue import Queue | |
21 | from threading import Thread |
|
22 | from threading import Thread | |
22 | from multiprocessing import Process |
|
23 | from multiprocessing import Process | |
23 | from zmq.utils.monitor import recv_monitor_message |
|
|||
24 |
|
24 | |||
25 | from schainpy.utils import log |
|
25 | from schainpy.utils import log | |
26 |
|
26 | |||
@@ -219,22 +219,18 def MPDecorator(BaseClass): | |||||
219 | self.receiver = None |
|
219 | self.receiver = None | |
220 | self.i = 0 |
|
220 | self.i = 0 | |
221 | self.name = BaseClass.__name__ |
|
221 | self.name = BaseClass.__name__ | |
|
222 | ||||
222 | if 'plot' in self.name.lower() and not self.name.endswith('_'): |
|
223 | if 'plot' in self.name.lower() and not self.name.endswith('_'): | |
223 | self.name = '{}{}'.format(self.CODE.upper(), 'Plot') |
|
224 | self.name = '{}{}'.format(self.CODE.upper(), 'Plot') | |
224 | self.start_time = time.time() |
|
|||
225 |
|
225 | |||
226 | if len(self.args) is 3: |
|
226 | self.start_time = time.time() | |
227 | self.typeProc = "ProcUnit" |
|
|||
228 |
|
|
227 | self.id = args[0] | |
229 |
|
|
228 | self.inputId = args[1] | |
230 |
|
|
229 | self.project_id = args[2] | |
231 | elif len(self.args) is 2: |
|
230 | self.err_queue = args[3] | |
232 |
|
|
231 | self.typeProc = args[4] | |
233 | self.inputId = args[0] |
|
|||
234 | self.project_id = args[1] |
|
|||
235 | self.typeProc = "Operation" |
|
|||
236 |
|
||||
237 | self.queue = InputQueue(self.project_id, self.inputId) |
|
232 | self.queue = InputQueue(self.project_id, self.inputId) | |
|
233 | self.err_queue.put('#_start_#') | |||
238 |
|
234 | |||
239 | def subscribe(self): |
|
235 | def subscribe(self): | |
240 | ''' |
|
236 | ''' | |
@@ -271,7 +267,7 def MPDecorator(BaseClass): | |||||
271 |
|
267 | |||
272 | if self.inputId is None: |
|
268 | if self.inputId is None: | |
273 | self.i += 1 |
|
269 | self.i += 1 | |
274 |
if self.i % |
|
270 | if self.i % 80 == 0: | |
275 | self.i = 0 |
|
271 | self.i = 0 | |
276 | time.sleep(0.01) |
|
272 | time.sleep(0.01) | |
277 |
|
273 | |||
@@ -283,7 +279,15 def MPDecorator(BaseClass): | |||||
283 | ''' |
|
279 | ''' | |
284 | while True: |
|
280 | while True: | |
285 |
|
281 | |||
|
282 | try: | |||
286 | BaseClass.run(self, **self.kwargs) |
|
283 | BaseClass.run(self, **self.kwargs) | |
|
284 | except: | |||
|
285 | err = traceback.format_exc() | |||
|
286 | if 'No more files' in err: | |||
|
287 | log.warning('No more files to read', self.name) | |||
|
288 | else: | |||
|
289 | self.err_queue.put('{}|{}'.format(self.name, err)) | |||
|
290 | self.dataOut.error = True | |||
287 |
|
291 | |||
288 | for op, optype, opId, kwargs in self.operations: |
|
292 | for op, optype, opId, kwargs in self.operations: | |
289 | if optype == 'self' and not self.dataOut.flagNoData: |
|
293 | if optype == 'self' and not self.dataOut.flagNoData: | |
@@ -299,11 +303,9 def MPDecorator(BaseClass): | |||||
299 | self.publish(self.dataOut, self.id) |
|
303 | self.publish(self.dataOut, self.id) | |
300 |
|
304 | |||
301 | if self.dataOut.error: |
|
305 | if self.dataOut.error: | |
302 | log.error(self.dataOut.error, self.name) |
|
|||
303 | # self.sender.send_multipart([str(self.project_id).encode(), 'end'.encode()]) |
|
|||
304 | break |
|
306 | break | |
305 |
|
307 | |||
306 |
time.sleep( |
|
308 | time.sleep(0.5) | |
307 |
|
309 | |||
308 | def runProc(self): |
|
310 | def runProc(self): | |
309 | ''' |
|
311 | ''' | |
@@ -315,10 +317,13 def MPDecorator(BaseClass): | |||||
315 |
|
317 | |||
316 | if self.dataIn.flagNoData and self.dataIn.error is None: |
|
318 | if self.dataIn.flagNoData and self.dataIn.error is None: | |
317 | continue |
|
319 | continue | |
318 |
|
320 | elif not self.dataIn.error: | ||
|
321 | try: | |||
319 | BaseClass.run(self, **self.kwargs) |
|
322 | BaseClass.run(self, **self.kwargs) | |
320 |
|
323 | except: | ||
321 | if self.dataIn.error: |
|
324 | self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc())) | |
|
325 | self.dataOut.error = True | |||
|
326 | elif self.dataIn.error: | |||
322 | self.dataOut.error = self.dataIn.error |
|
327 | self.dataOut.error = self.dataIn.error | |
323 | self.dataOut.flagNoData = True |
|
328 | self.dataOut.flagNoData = True | |
324 |
|
329 | |||
@@ -330,20 +335,15 def MPDecorator(BaseClass): | |||||
330 | elif optype == 'external' and not self.dataOut.flagNoData: |
|
335 | elif optype == 'external' and not self.dataOut.flagNoData: | |
331 | self.publish(self.dataOut, opId) |
|
336 | self.publish(self.dataOut, opId) | |
332 |
|
337 | |||
333 | if not self.dataOut.flagNoData or self.dataOut.error: |
|
|||
334 |
|
|
338 | self.publish(self.dataOut, self.id) | |
335 |
|
|
339 | for op, optype, opId, kwargs in self.operations: | |
336 |
|
|
340 | if optype == 'external' and self.dataOut.error: | |
337 | op(**kwargs) |
|
|||
338 | elif optype == 'other' and self.dataOut.error: |
|
|||
339 | self.dataOut = op.run(self.dataOut, **kwargs) |
|
|||
340 | elif optype == 'external' and self.dataOut.error: |
|
|||
341 |
|
|
341 | self.publish(self.dataOut, opId) | |
342 |
|
342 | |||
343 |
if self.data |
|
343 | if self.dataOut.error: | |
344 | break |
|
344 | break | |
345 |
|
345 | |||
346 |
time.sleep( |
|
346 | time.sleep(0.5) | |
347 |
|
347 | |||
348 | def runOp(self): |
|
348 | def runOp(self): | |
349 | ''' |
|
349 | ''' | |
@@ -355,18 +355,15 def MPDecorator(BaseClass): | |||||
355 |
|
355 | |||
356 | dataOut = self.listen() |
|
356 | dataOut = self.listen() | |
357 |
|
357 | |||
|
358 | if not dataOut.error: | |||
358 | BaseClass.run(self, dataOut, **self.kwargs) |
|
359 | BaseClass.run(self, dataOut, **self.kwargs) | |
359 |
|
360 | else: | ||
360 | if dataOut.error: |
|
|||
361 | break |
|
361 | break | |
362 |
|
362 | |||
363 | time.sleep(1) |
|
|||
364 |
|
||||
365 | def run(self): |
|
363 | def run(self): | |
366 | if self.typeProc is "ProcUnit": |
|
364 | if self.typeProc is "ProcUnit": | |
367 |
|
365 | |||
368 | if self.inputId is not None: |
|
366 | if self.inputId is not None: | |
369 |
|
||||
370 | self.subscribe() |
|
367 | self.subscribe() | |
371 |
|
368 | |||
372 | self.set_publisher() |
|
369 | self.set_publisher() | |
@@ -386,32 +383,10 def MPDecorator(BaseClass): | |||||
386 |
|
383 | |||
387 | self.close() |
|
384 | self.close() | |
388 |
|
385 | |||
389 | def event_monitor(self, monitor): |
|
|||
390 |
|
||||
391 | events = {} |
|
|||
392 |
|
||||
393 | for name in dir(zmq): |
|
|||
394 | if name.startswith('EVENT_'): |
|
|||
395 | value = getattr(zmq, name) |
|
|||
396 | events[value] = name |
|
|||
397 |
|
||||
398 | while monitor.poll(): |
|
|||
399 | evt = recv_monitor_message(monitor) |
|
|||
400 | if evt['event'] == 32: |
|
|||
401 | self.connections += 1 |
|
|||
402 | if evt['event'] == 512: |
|
|||
403 | pass |
|
|||
404 |
|
||||
405 | evt.update({'description': events[evt['event']]}) |
|
|||
406 |
|
||||
407 | if evt['event'] == zmq.EVENT_MONITOR_STOPPED: |
|
|||
408 | break |
|
|||
409 | monitor.close() |
|
|||
410 | print('event monitor thread done!') |
|
|||
411 |
|
||||
412 | def close(self): |
|
386 | def close(self): | |
413 |
|
387 | |||
414 | BaseClass.close(self) |
|
388 | BaseClass.close(self) | |
|
389 | self.err_queue.put('#_end_#') | |||
415 |
|
390 | |||
416 | if self.sender: |
|
391 | if self.sender: | |
417 | self.sender.close() |
|
392 | self.sender.close() |
@@ -94,7 +94,6 class ParametersProc(ProcessingUnit): | |||||
94 | self.dataOut.heightList = self.dataIn.getHeiRange() |
|
94 | self.dataOut.heightList = self.dataIn.getHeiRange() | |
95 | self.dataOut.frequency = self.dataIn.frequency |
|
95 | self.dataOut.frequency = self.dataIn.frequency | |
96 | # self.dataOut.noise = self.dataIn.noise |
|
96 | # self.dataOut.noise = self.dataIn.noise | |
97 | self.dataOut.error = self.dataIn.error |
|
|||
98 |
|
97 | |||
99 | def run(self): |
|
98 | def run(self): | |
100 |
|
99 |
General Comments 0
You need to be logged in to leave comments.
Login now