@@ -68,7 +68,6 class Alarm(Process): | |||
|
68 | 68 | @staticmethod |
|
69 | 69 | def send_email(**kwargs): |
|
70 | 70 | notifier = SchainNotify() |
|
71 | print(kwargs) | |
|
72 | 71 | notifier.notify(**kwargs) |
|
73 | 72 | |
|
74 | 73 | @staticmethod |
@@ -290,7 +289,7 class SchainNotify: | |||
|
290 | 289 | |
|
291 | 290 | msg = MIMEMultipart() |
|
292 | 291 | msg['Subject'] = subject |
|
293 |
msg['From'] = " |
|
|
292 | msg['From'] = "SChain API (v{}) <{}>".format(schainpy.__version__, email_from) | |
|
294 | 293 | msg['Reply-to'] = email_from |
|
295 | 294 | msg['To'] = email_to |
|
296 | 295 |
@@ -11,7 +11,7 import traceback | |||
|
11 | 11 | import math |
|
12 | 12 | import time |
|
13 | 13 | import zmq |
|
14 | from multiprocessing import Process, cpu_count | |
|
14 | from multiprocessing import Process, Queue, cpu_count | |
|
15 | 15 | from threading import Thread |
|
16 | 16 | from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring |
|
17 | 17 | from xml.dom import minidom |
@@ -361,13 +361,14 class OperationConf(): | |||
|
361 | 361 | |
|
362 | 362 | return kwargs |
|
363 | 363 | |
|
364 | def setup(self, id, name, priority, type, project_id): | |
|
364 | def setup(self, id, name, priority, type, project_id, err_queue): | |
|
365 | 365 | |
|
366 | 366 | self.id = str(id) |
|
367 | 367 | self.project_id = project_id |
|
368 | 368 | self.name = name |
|
369 | 369 | self.type = type |
|
370 | 370 | self.priority = priority |
|
371 | self.err_queue = err_queue | |
|
371 | 372 | self.parmConfObjList = [] |
|
372 | 373 | |
|
373 | 374 | def removeParameters(self): |
@@ -459,8 +460,9 class OperationConf(): | |||
|
459 | 460 | opObj = className() |
|
460 | 461 | elif self.type == 'external': |
|
461 | 462 | kwargs = self.getKwargs() |
|
462 | opObj = className(self.id, self.project_id, **kwargs) | |
|
463 | opObj = className(self.id, self.id, self.project_id, self.err_queue, 'Operation', **kwargs) | |
|
463 | 464 | opObj.start() |
|
465 | self.opObj = opObj | |
|
464 | 466 | |
|
465 | 467 | return opObj |
|
466 | 468 | |
@@ -548,7 +550,7 class ProcUnitConf(): | |||
|
548 | 550 | |
|
549 | 551 | return self.procUnitObj |
|
550 | 552 | |
|
551 | def setup(self, project_id, id, name, datatype, inputId): | |
|
553 | def setup(self, project_id, id, name, datatype, inputId, err_queue): | |
|
552 | 554 | ''' |
|
553 | 555 | id sera el topico a publicar |
|
554 | 556 | inputId sera el topico a subscribirse |
@@ -574,6 +576,7 class ProcUnitConf(): | |||
|
574 | 576 | self.name = name |
|
575 | 577 | self.datatype = datatype |
|
576 | 578 |
self.inputId = inputId |
|
579 | self.err_queue = err_queue | |
|
577 | 580 | self.opConfObjList = [] |
|
578 | 581 | |
|
579 | 582 | self.addOperation(name='run', optype='self') |
@@ -607,7 +610,7 class ProcUnitConf(): | |||
|
607 | 610 | id = self.__getNewId() |
|
608 | 611 | priority = self.__getPriority() # Sin mucho sentido, pero puede usarse |
|
609 | 612 | opConfObj = OperationConf() |
|
610 | opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id) | |
|
613 | opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id, err_queue=self.err_queue) | |
|
611 | 614 | self.opConfObjList.append(opConfObj) |
|
612 | 615 | |
|
613 | 616 | return opConfObj |
@@ -675,7 +678,7 class ProcUnitConf(): | |||
|
675 | 678 | |
|
676 | 679 | className = eval(self.name) |
|
677 | 680 | kwargs = self.getKwargs() |
|
678 |
procUnitObj = className(self.id, self.inputId, self.project_id, **kwargs) |
|
|
681 | procUnitObj = className(self.id, self.inputId, self.project_id, self.err_queue, 'ProcUnit', **kwargs) | |
|
679 | 682 | log.success('creating process...', self.name) |
|
680 | 683 | |
|
681 | 684 | for opConfObj in self.opConfObjList: |
@@ -687,7 +690,7 class ProcUnitConf(): | |||
|
687 | 690 | else: |
|
688 | 691 | opObj = opConfObj.createObject() |
|
689 | 692 | |
|
690 |
log.success(' |
|
|
693 | log.success('adding operation: {}, type:{}'.format( | |
|
691 | 694 | opConfObj.name, |
|
692 | 695 | opConfObj.type), self.name) |
|
693 | 696 | |
@@ -726,7 +729,7 class ReadUnitConf(ProcUnitConf): | |||
|
726 | 729 | |
|
727 | 730 | return self.ELEMENTNAME |
|
728 | 731 | |
|
729 | def setup(self, project_id, id, name, datatype, path='', startDate='', endDate='', | |
|
732 | def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='', | |
|
730 | 733 | startTime='', endTime='', server=None, **kwargs): |
|
731 | 734 | |
|
732 | 735 | |
@@ -765,6 +768,7 class ReadUnitConf(ProcUnitConf): | |||
|
765 | 768 | self.startTime = startTime |
|
766 | 769 | self.endTime = endTime |
|
767 | 770 | self.server = server |
|
771 | self.err_queue = err_queue | |
|
768 | 772 | self.addRunOperation(**kwargs) |
|
769 | 773 | |
|
770 | 774 | def update(self, **kwargs): |
@@ -878,6 +882,7 class Project(Process): | |||
|
878 | 882 | self.email = None |
|
879 | 883 | self.alarm = None |
|
880 | 884 | self.procUnitConfObjDict = {} |
|
885 | self.err_queue = Queue() | |
|
881 | 886 | |
|
882 | 887 | def __getNewId(self): |
|
883 | 888 | |
@@ -935,6 +940,8 class Project(Process): | |||
|
935 | 940 | self.description = description |
|
936 | 941 | self.email = email |
|
937 | 942 | self.alarm = alarm |
|
943 | if name: | |
|
944 | self.name = '{} ({})'.format(Process.__name__, name) | |
|
938 | 945 | |
|
939 | 946 | def update(self, **kwargs): |
|
940 | 947 | |
@@ -963,7 +970,7 class Project(Process): | |||
|
963 | 970 | idReadUnit = str(id) |
|
964 | 971 | |
|
965 | 972 | readUnitConfObj = ReadUnitConf() |
|
966 | readUnitConfObj.setup(self.id, idReadUnit, name, datatype, **kwargs) | |
|
973 | readUnitConfObj.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs) | |
|
967 | 974 | self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj |
|
968 | 975 | |
|
969 | 976 | return readUnitConfObj |
@@ -980,9 +987,9 class Project(Process): | |||
|
980 | 987 | |
|
981 | 988 | ''' |
|
982 | 989 | |
|
983 |
idProcUnit = self.__getNewId() |
|
|
990 | idProcUnit = self.__getNewId() | |
|
984 | 991 | procUnitConfObj = ProcUnitConf() |
|
985 |
procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId) |
|
|
992 | procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue) | |
|
986 | 993 | self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj |
|
987 | 994 | |
|
988 | 995 | return procUnitConfObj |
@@ -1119,10 +1126,10 class Project(Process): | |||
|
1119 | 1126 | |
|
1120 | 1127 | def __str__(self): |
|
1121 | 1128 | |
|
1122 |
print('Project |
|
|
1129 | print('Project: name = %s, description = %s, id = %s' % ( | |
|
1123 | 1130 | self.name, |
|
1124 | 1131 | self.description, |
|
1125 |
self. |
|
|
1132 | self.id)) | |
|
1126 | 1133 | |
|
1127 | 1134 | for procUnitConfObj in self.procUnitConfObjDict.values(): |
|
1128 | 1135 | print(procUnitConfObj) |
@@ -1135,33 +1142,59 class Project(Process): | |||
|
1135 | 1142 | for key in keys: |
|
1136 | 1143 | self.procUnitConfObjDict[key].createObjects() |
|
1137 | 1144 | |
|
1138 | def __handleError(self, procUnitConfObj, modes=None, stdout=True): | |
|
1145 | def monitor(self): | |
|
1146 | ||
|
1147 | t = Thread(target=self.__monitor, args=(self.err_queue, self.ctx)) | |
|
1148 | t.start() | |
|
1149 | ||
|
1150 | def __monitor(self, queue, ctx): | |
|
1139 | 1151 | |
|
1140 | 1152 | import socket |
|
1141 | 1153 | |
|
1142 | if modes is None: | |
|
1143 | modes = self.alarm | |
|
1154 | procs = 0 | |
|
1155 | err_msg = '' | |
|
1156 | ||
|
1157 | while True: | |
|
1158 | msg = queue.get() | |
|
1159 | if '#_start_#' in msg: | |
|
1160 | procs += 1 | |
|
1161 | elif '#_end_#' in msg: | |
|
1162 | procs -=1 | |
|
1163 | else: | |
|
1164 | err_msg = msg | |
|
1165 | ||
|
1166 | if procs == 0 or 'Traceback' in err_msg: | |
|
1167 | break | |
|
1168 | time.sleep(0.1) | |
|
1169 | ||
|
1170 | if '|' in err_msg: | |
|
1171 | name, err = err_msg.split('|') | |
|
1172 | if 'SchainWarning' in err: | |
|
1173 | log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), name) | |
|
1174 | elif 'SchainError' in err: | |
|
1175 | log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), name) | |
|
1176 | else: | |
|
1177 | log.error(err, name) | |
|
1178 | else: | |
|
1179 | name, err = self.name, err_msg | |
|
1144 | 1180 | |
|
1145 | if not self.alarm: | |
|
1146 | modes = [] | |
|
1181 | time.sleep(2) | |
|
1147 | 1182 | |
|
1148 | err = traceback.format_exception(sys.exc_info()[0], | |
|
1149 | sys.exc_info()[1], | |
|
1150 | sys.exc_info()[2]) | |
|
1183 | for conf in self.procUnitConfObjDict.values(): | |
|
1184 | for confop in conf.opConfObjList: | |
|
1185 | if confop.type == 'external': | |
|
1186 | confop.opObj.terminate() | |
|
1187 | conf.procUnitObj.terminate() | |
|
1151 | 1188 | |
|
1152 | log.error('{}'.format(err[-1]), procUnitConfObj.name) | |
|
1189 | ctx.term() | |
|
1153 | 1190 | |
|
1154 | 1191 | message = ''.join(err) |
|
1155 | 1192 | |
|
1156 |
if |
|
|
1157 | sys.stderr.write(message) | |
|
1158 | ||
|
1193 | if err_msg: | |
|
1159 | 1194 | subject = 'SChain v%s: Error running %s\n' % ( |
|
1160 |
schainpy.__version__, |
|
|
1195 | schainpy.__version__, self.name) | |
|
1161 | 1196 | |
|
1162 |
subtitle = ' |
|
|
1163 | procUnitConfObj.getElementName(), procUnitConfObj.name) | |
|
1164 | subtitle += 'Hostname: %s\n' % socket.gethostbyname( | |
|
1197 | subtitle = 'Hostname: %s\n' % socket.gethostbyname( | |
|
1165 | 1198 | socket.gethostname()) |
|
1166 | 1199 | subtitle += 'Working directory: %s\n' % os.path.abspath('./') |
|
1167 | 1200 | subtitle += 'Configuration file: %s\n' % self.filename |
@@ -1178,7 +1211,7 class Project(Process): | |||
|
1178 | 1211 | subtitle += '[End time = %s]\n' % readUnitConfObj.endTime |
|
1179 | 1212 | |
|
1180 | 1213 | a = Alarm( |
|
1181 |
modes=m |
|
|
1214 | modes=self.alarm, | |
|
1182 | 1215 | email=self.email, |
|
1183 | 1216 | message=message, |
|
1184 | 1217 | subject=subject, |
@@ -1186,7 +1219,7 class Project(Process): | |||
|
1186 | 1219 | filename=self.filename |
|
1187 | 1220 | ) |
|
1188 | 1221 | |
|
1189 | return a | |
|
1222 | a.start() | |
|
1190 | 1223 | |
|
1191 | 1224 | def isPaused(self): |
|
1192 | 1225 | return 0 |
@@ -1223,7 +1256,7 class Project(Process): | |||
|
1223 | 1256 | |
|
1224 | 1257 | self.filename = filename |
|
1225 | 1258 | |
|
1226 |
def setProxy |
|
|
1259 | def setProxy(self): | |
|
1227 | 1260 | |
|
1228 | 1261 | if not os.path.exists('/tmp/schain'): |
|
1229 | 1262 | os.mkdir('/tmp/schain') |
@@ -1233,10 +1266,10 class Project(Process): | |||
|
1233 | 1266 | xpub.bind('ipc:///tmp/schain/{}_pub'.format(self.id)) |
|
1234 | 1267 | xsub = self.ctx.socket(zmq.XSUB) |
|
1235 | 1268 | xsub.bind('ipc:///tmp/schain/{}_sub'.format(self.id)) |
|
1236 | ||
|
1269 | self.monitor() | |
|
1237 | 1270 | try: |
|
1238 | 1271 | zmq.proxy(xpub, xsub) |
|
1239 |
except: |
|
|
1272 | except zmq.ContextTerminated: | |
|
1240 | 1273 | xpub.close() |
|
1241 | 1274 | xsub.close() |
|
1242 | 1275 | |
@@ -1245,12 +1278,7 class Project(Process): | |||
|
1245 | 1278 | log.success('Starting {}: {}'.format(self.name, self.id), tag='') |
|
1246 | 1279 | self.start_time = time.time() |
|
1247 | 1280 | self.createObjects() |
|
1248 | # t = Thread(target=wait, args=(self.ctx, )) | |
|
1249 | # t.start() | |
|
1250 | self.setProxyCom() | |
|
1251 | ||
|
1252 | # Iniciar todos los procesos .start(), monitoreo de procesos. ELiminar lo de abajo | |
|
1253 | ||
|
1254 | log.success('{} Done (time: {}s)'.format( | |
|
1281 | self.setProxy() | |
|
1282 | log.success('{} Done (Time: {}s)'.format( | |
|
1255 | 1283 | self.name, |
|
1256 | time.time()-self.start_time)) | |
|
1284 | time.time()-self.start_time), '') |
@@ -13,6 +13,7 import datetime | |||
|
13 | 13 | |
|
14 | 14 | import numpy |
|
15 | 15 | |
|
16 | import schainpy.admin | |
|
16 | 17 | from schainpy.model.proc.jroproc_base import ProcessingUnit, MPDecorator |
|
17 | 18 | from schainpy.model.data.jrodata import Parameters |
|
18 | 19 | from schainpy.model.io.jroIO_base import JRODataReader, isNumber |
@@ -394,12 +395,11 class BLTRParamReader(JRODataReader, ProcessingUnit): | |||
|
394 | 395 | ''' |
|
395 | 396 | if self.flagNoMoreFiles: |
|
396 | 397 | self.dataOut.flagNoData = True |
|
397 |
|
|
|
398 | return | |
|
398 | raise schainpy.admin.SchainError('No More files to read') | |
|
399 | 399 | |
|
400 | 400 | if not self.readNextBlock(): |
|
401 | 401 | self.dataOut.flagNoData = True |
|
402 |
|
|
|
402 | raise schainpy.admin.SchainError('Time for wait new file reach!!!') | |
|
403 | 403 | |
|
404 | 404 | self.set_output() |
|
405 | 405 |
@@ -15,13 +15,9 from scipy import asarray as ar, exp | |||
|
15 | 15 | SPEED_OF_LIGHT = 299792458 |
|
16 | 16 | SPEED_OF_LIGHT = 3e8 |
|
17 | 17 | |
|
18 | try: | |
|
19 | from gevent import sleep | |
|
20 | except: | |
|
21 | from time import sleep | |
|
22 | ||
|
23 | 18 | from .utils import folder_in_range |
|
24 | 19 | |
|
20 | import schainpy.admin | |
|
25 | 21 | from schainpy.model.data.jrodata import Spectra |
|
26 | 22 | from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator |
|
27 | 23 | from schainpy.utils import log |
@@ -341,7 +337,7 class BLTRSpectraReader (ProcessingUnit): | |||
|
341 | 337 | |
|
342 | 338 | if self.flagNoMoreFiles: |
|
343 | 339 | self.dataOut.flagNoData = True |
|
344 |
|
|
|
340 | raise schainpy.admin.SchainError('No more files') | |
|
345 | 341 | |
|
346 | 342 | self.readBlock() |
|
347 | 343 |
@@ -661,7 +661,6 class AMISRReader(ProcessingUnit): | |||
|
661 | 661 | |
|
662 | 662 | if self.flagNoMoreFiles: |
|
663 | 663 | self.dataOut.flagNoData = True |
|
664 | print('Process finished') | |
|
665 | 664 | return 0 |
|
666 | 665 | |
|
667 | 666 | if self.__hasNotDataInBuffer(): |
@@ -15,11 +15,6 import datetime | |||
|
15 | 15 | import traceback |
|
16 | 16 | import zmq |
|
17 | 17 | |
|
18 | try: | |
|
19 | from gevent import sleep | |
|
20 | except: | |
|
21 | from time import sleep | |
|
22 | ||
|
23 | 18 | from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader |
|
24 | 19 | from schainpy.model.data.jroheaderIO import get_dtype_index, get_numpy_dtype, get_procflag_dtype, get_dtype_width |
|
25 | 20 | from schainpy.utils import log |
@@ -771,7 +766,6 class JRODataReader(JRODataIO): | |||
|
771 | 766 | idFile += 1 |
|
772 | 767 | if not(idFile < len(self.filenameList)): |
|
773 | 768 | self.flagNoMoreFiles = 1 |
|
774 | # print "[Reading] No more Files" | |
|
775 | 769 | return 0 |
|
776 | 770 | |
|
777 | 771 | filename = self.filenameList[idFile] |
@@ -843,10 +837,14 class JRODataReader(JRODataIO): | |||
|
843 | 837 | |
|
844 | 838 | for nTries in range(tries): |
|
845 | 839 | if firstTime_flag: |
|
846 | print("\t[Reading] Waiting %0.2f sec for the next file: \"%s\" , try %03d ..." % (self.delay, filename, nTries + 1)) | |
|
847 | sleep(self.delay) | |
|
840 | log.warning( | |
|
841 | "Waiting %0.2f sec for the next file: \"%s\" , try %03d ..." % (self.delay, filename, nTries + 1), | |
|
842 | self.name) | |
|
843 | time.sleep(self.delay) | |
|
848 | 844 | else: |
|
849 | print("\t[Reading] Searching the next \"%s%04d%03d%03d%s\" file ..." % (self.optchar, self.year, self.doy, self.set, self.ext)) | |
|
845 | log.warning( | |
|
846 | "Searching the next \"%s%04d%03d%03d%s\" file ..." % (self.optchar, self.year, self.doy, self.set, self.ext), | |
|
847 | self.name) | |
|
850 | 848 | |
|
851 | 849 | fullfilename, filename = checkForRealPath( |
|
852 | 850 | self.path, self.foldercounter, self.year, self.doy, self.set, self.ext) |
@@ -860,7 +858,9 class JRODataReader(JRODataIO): | |||
|
860 | 858 | |
|
861 | 859 | firstTime_flag = False |
|
862 | 860 | |
|
863 | log.warning('Skipping the file {} due to this file doesn\'t exist'.format(filename)) | |
|
861 | log.warning( | |
|
862 | 'Skipping the file {} due to this file doesn\'t exist'.format(filename), | |
|
863 | self.name) | |
|
864 | 864 | self.set += 1 |
|
865 | 865 | |
|
866 | 866 | # si no encuentro el file buscado cambio de carpeta y busco en la siguiente carpeta |
@@ -877,14 +877,13 class JRODataReader(JRODataIO): | |||
|
877 | 877 | self.fp.close() |
|
878 | 878 | self.fp = open(fullfilename, 'rb') |
|
879 | 879 | self.flagNoMoreFiles = 0 |
|
880 | # print '[Reading] Setting the file: %s' % fullfilename | |
|
881 | 880 | else: |
|
881 | raise schainpy.admin.SchainError('Time for waiting new files reach') | |
|
882 | 882 | self.fileSize = 0 |
|
883 | 883 | self.filename = None |
|
884 | 884 | self.flagIsNewFile = 0 |
|
885 | 885 | self.fp = None |
|
886 | 886 | self.flagNoMoreFiles = 1 |
|
887 | # print '[Reading] No more files to read' | |
|
888 | 887 | |
|
889 | 888 | return fileOk_flag |
|
890 | 889 | |
@@ -898,8 +897,8 class JRODataReader(JRODataIO): | |||
|
898 | 897 | newFile = self.__setNextFileOffline() |
|
899 | 898 | |
|
900 | 899 | if not(newFile): |
|
901 |
|
|
|
902 |
|
|
|
900 | raise schainpy.admin.SchainWarning('No more files to read') | |
|
901 | ||
|
903 | 902 | |
|
904 | 903 | if self.verbose: |
|
905 | 904 | print('[Reading] Setting the file: %s' % self.filename) |
@@ -942,7 +941,7 class JRODataReader(JRODataIO): | |||
|
942 | 941 | return 0 |
|
943 | 942 | |
|
944 | 943 | print("[Reading] Waiting %0.2f seconds for the next block, try %03d ..." % (self.delay, nTries + 1)) |
|
945 | sleep(self.delay) | |
|
944 | time.sleep(self.delay) | |
|
946 | 945 | |
|
947 | 946 | return 0 |
|
948 | 947 | |
@@ -969,7 +968,7 class JRODataReader(JRODataIO): | |||
|
969 | 968 | "Waiting %0.2f seconds for the next block, try %03d ..." % (self.delay, nTries + 1), |
|
970 | 969 | self.name |
|
971 | 970 | ) |
|
972 | sleep(self.delay) | |
|
971 | time.sleep(self.delay) | |
|
973 | 972 | |
|
974 | 973 | return 0 |
|
975 | 974 | |
@@ -1057,8 +1056,7 class JRODataReader(JRODataIO): | |||
|
1057 | 1056 | # Skip block out of startTime and endTime |
|
1058 | 1057 | while True: |
|
1059 | 1058 | if not(self.__setNewBlock()): |
|
1060 |
|
|
|
1061 | return 0 | |
|
1059 | raise schainpy.admin.SchainWarning('No more files to read') | |
|
1062 | 1060 | |
|
1063 | 1061 | if not(self.readBlock()): |
|
1064 | 1062 | return 0 |
@@ -1260,10 +1258,10 class JRODataReader(JRODataIO): | |||
|
1260 | 1258 | pattern_path = multi_path[0] |
|
1261 | 1259 | |
|
1262 | 1260 | if path_empty: |
|
1263 |
|
|
|
1261 | raise schainpy.admin.SchainError("[Reading] No *%s files in %s for %s to %s" % (ext, pattern_path, startDate, endDate)) | |
|
1264 | 1262 | else: |
|
1265 | 1263 | if not dateList: |
|
1266 |
|
|
|
1264 | raise schainpy.admin.SchainError("[Reading] Date range selected invalid [%s - %s]: No *%s files in %s)" % (startDate, endDate, ext, path)) | |
|
1267 | 1265 | |
|
1268 | 1266 | if include_path: |
|
1269 | 1267 | return dateList, pathList |
@@ -1296,6 +1294,17 class JRODataReader(JRODataIO): | |||
|
1296 | 1294 | oneDDict=None, |
|
1297 | 1295 | twoDDict=None, |
|
1298 | 1296 | independentParam=None): |
|
1297 | ||
|
1298 | self.online = online | |
|
1299 | self.realtime = realtime | |
|
1300 | self.delay = delay | |
|
1301 | self.getByBlock = getblock | |
|
1302 | self.nTxs = nTxs | |
|
1303 | self.startTime = startTime | |
|
1304 | self.endTime = endTime | |
|
1305 | self.endDate = endDate | |
|
1306 | self.startDate = startDate | |
|
1307 | ||
|
1299 | 1308 | if server is not None: |
|
1300 | 1309 | if 'tcp://' in server: |
|
1301 | 1310 | address = server |
@@ -1326,10 +1335,10 class JRODataReader(JRODataIO): | |||
|
1326 | 1335 | break |
|
1327 | 1336 | |
|
1328 | 1337 | print('[Reading] Waiting %0.2f sec for an valid file in %s: try %02d ...' % (self.delay, path, nTries + 1)) |
|
1329 | sleep(self.delay) | |
|
1338 | time.sleep(self.delay) | |
|
1330 | 1339 | |
|
1331 | 1340 | if not(fullpath): |
|
1332 |
|
|
|
1341 | raise schainpy.admin.SchainError('There isn\'t any valid file in {}'.format(path)) | |
|
1333 | 1342 | return |
|
1334 | 1343 | |
|
1335 | 1344 | self.year = year |
@@ -1359,17 +1368,10 class JRODataReader(JRODataIO): | |||
|
1359 | 1368 | basename, ext = os.path.splitext(file_name) |
|
1360 | 1369 | last_set = int(basename[-3:]) |
|
1361 | 1370 | |
|
1362 | self.online = online | |
|
1363 | self.realtime = realtime | |
|
1364 | self.delay = delay | |
|
1371 | ||
|
1365 | 1372 | ext = ext.lower() |
|
1366 | 1373 | self.ext = ext |
|
1367 | self.getByBlock = getblock | |
|
1368 | self.nTxs = nTxs | |
|
1369 | self.startTime = startTime | |
|
1370 | self.endTime = endTime | |
|
1371 | self.endDate = endDate | |
|
1372 | self.startDate = startDate | |
|
1374 | ||
|
1373 | 1375 | # Added----------------- |
|
1374 | 1376 | self.selBlocksize = blocksize |
|
1375 | 1377 | self.selBlocktime = blocktime |
@@ -1391,8 +1393,6 class JRODataReader(JRODataIO): | |||
|
1391 | 1393 | self.filenameList = [] |
|
1392 | 1394 | return |
|
1393 | 1395 | |
|
1394 | # self.getBasicHeader() | |
|
1395 | ||
|
1396 | 1396 | if last_set != None: |
|
1397 | 1397 | self.dataOut.last_block = last_set * \ |
|
1398 | 1398 | self.processingHeaderObj.dataBlocksPerFile + self.basicHeaderObj.dataBlock |
@@ -12,20 +12,16 Created on Jul 3, 2014 | |||
|
12 | 12 | # METADATA |
|
13 | 13 | |
|
14 | 14 | import os |
|
15 | import time | |
|
15 | 16 | import datetime |
|
16 | 17 | import numpy |
|
17 | 18 | import timeit |
|
18 | 19 | from fractions import Fraction |
|
19 | 20 | |
|
20 | try: | |
|
21 | from gevent import sleep | |
|
22 | except: | |
|
23 | from time import sleep | |
|
24 | ||
|
21 | import schainpy.admin | |
|
25 | 22 | from schainpy.model.data.jroheaderIO import RadarControllerHeader, SystemHeader |
|
26 | 23 | from schainpy.model.data.jrodata import Voltage |
|
27 | 24 | from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator |
|
28 | from time import time | |
|
29 | 25 | |
|
30 | 26 | import pickle |
|
31 | 27 | try: |
@@ -461,9 +457,9 class DigitalRFReader(ProcessingUnit): | |||
|
461 | 457 | return False |
|
462 | 458 | |
|
463 | 459 | def timeit(self, toExecute): |
|
464 | t0 = time() | |
|
460 | t0 = time.time() | |
|
465 | 461 | toExecute() |
|
466 | self.executionTime = time() - t0 | |
|
462 | self.executionTime = time.time() - t0 | |
|
467 | 463 | if self.oldAverage is None: |
|
468 | 464 | self.oldAverage = self.executionTime |
|
469 | 465 | self.oldAverage = (self.executionTime + self.count * |
@@ -569,24 +565,24 class DigitalRFReader(ProcessingUnit): | |||
|
569 | 565 | if self.__readNextBlock(): |
|
570 | 566 | break |
|
571 | 567 | if self.__thisUnixSample > self.__endUTCSecond * self.__sample_rate: |
|
572 |
|
|
|
568 | raise schainpy.admin.SchainError('Error') | |
|
573 | 569 | return |
|
574 | 570 | |
|
575 | 571 | if self.__flagDiscontinuousBlock: |
|
576 |
|
|
|
572 | raise schainpy.admin.SchainError('discontinuous block found') | |
|
577 | 573 | return |
|
578 | 574 | |
|
579 | 575 | if not self.__online: |
|
580 |
|
|
|
576 | raise schainpy.admin.SchainError('Online?') | |
|
581 | 577 | return |
|
582 | 578 | |
|
583 | 579 | err_counter += 1 |
|
584 | 580 | if err_counter > nTries: |
|
585 |
|
|
|
581 | raise schainpy.admin.SchainError('Max retrys reach') | |
|
586 | 582 | return |
|
587 | 583 | |
|
588 | 584 | print('[Reading] waiting %d seconds to read a new block' % seconds) |
|
589 | sleep(seconds) | |
|
585 | time.sleep(seconds) | |
|
590 | 586 | |
|
591 | 587 | self.dataOut.data = self.__data_buffer[:, |
|
592 | 588 | self.__bufferIndex:self.__bufferIndex + self.__nSamples] |
@@ -833,7 +833,6 class HFReader(ProcessingUnit): | |||
|
833 | 833 | def getData(self): |
|
834 | 834 | if self.flagNoMoreFiles: |
|
835 | 835 | self.dataOut.flagNoData = True |
|
836 | print('Process finished') | |
|
837 | 836 | return 0 |
|
838 | 837 | |
|
839 | 838 | if self.__hasNotDataInBuffer(): |
@@ -585,7 +585,6 class AMISRReader(ProcessingUnit): | |||
|
585 | 585 | |
|
586 | 586 | if self.flagNoMoreFiles: |
|
587 | 587 | self.dataOut.flagNoData = True |
|
588 | print('Process finished') | |
|
589 | 588 | return 0 |
|
590 | 589 | |
|
591 | 590 | if self.__hasNotDataInBuffer(): |
@@ -13,6 +13,8 import datetime | |||
|
13 | 13 | |
|
14 | 14 | import numpy |
|
15 | 15 | import h5py |
|
16 | ||
|
17 | import schainpy.admin | |
|
16 | 18 | from schainpy.model.io.jroIO_base import LOCALTIME, JRODataReader, JRODataWriter |
|
17 | 19 | from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator |
|
18 | 20 | from schainpy.model.data.jrodata import Parameters |
@@ -390,7 +392,7 class MADReader(JRODataReader, ProcessingUnit): | |||
|
390 | 392 | ''' |
|
391 | 393 | if self.flagNoMoreFiles: |
|
392 | 394 | self.dataOut.flagNoData = True |
|
393 |
|
|
|
395 | raise schainpy.admin.SchainError('No file left to process') | |
|
394 | 396 | return 0 |
|
395 | 397 | |
|
396 | 398 | if not self.readNextBlock(): |
@@ -5,6 +5,7 import h5py | |||
|
5 | 5 | import re |
|
6 | 6 | import datetime |
|
7 | 7 | |
|
8 | import schainpy.admin | |
|
8 | 9 | from schainpy.model.data.jrodata import * |
|
9 | 10 | from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator |
|
10 | 11 | from schainpy.model.io.jroIO_base import * |
@@ -233,7 +234,7 class ParamReader(JRODataReader,ProcessingUnit): | |||
|
233 | 234 | idFile = self.fileIndex |
|
234 | 235 | |
|
235 | 236 | if not(idFile < len(self.filenameList)): |
|
236 |
|
|
|
237 | raise schainpy.admin.SchainError("No more Files") | |
|
237 | 238 | return 0 |
|
238 | 239 | |
|
239 | 240 | filename = self.filenameList[idFile] |
@@ -1165,8 +1166,7 class ParameterReader(JRODataReader,ProcessingUnit): | |||
|
1165 | 1166 | idFile = self.fileIndex |
|
1166 | 1167 | |
|
1167 | 1168 | if not(idFile < len(self.filenameList)): |
|
1168 | self.dataOut.error = 'No more files' | |
|
1169 | return 0 | |
|
1169 | raise schainpy.admin.SchainError('No more files') | |
|
1170 | 1170 | |
|
1171 | 1171 | filename = self.filenameList[idFile] |
|
1172 | 1172 | self.fp = h5py.File(filename, 'r') |
@@ -92,7 +92,6 class SpectraReader(JRODataReader, ProcessingUnit): | |||
|
92 | 92 | #Eliminar de la base la herencia |
|
93 | 93 | ProcessingUnit.__init__(self)#, **kwargs) |
|
94 | 94 | |
|
95 | # self.isConfig = False | |
|
96 | 95 | |
|
97 | 96 | self.pts2read_SelfSpectra = 0 |
|
98 | 97 | |
@@ -160,7 +159,6 class SpectraReader(JRODataReader, ProcessingUnit): | |||
|
160 | 159 | |
|
161 | 160 | self.__isFirstTimeOnline = 1 |
|
162 | 161 | |
|
163 | # self.ippSeconds = 0 | |
|
164 | 162 | |
|
165 | 163 | self.flagDiscontinuousBlock = 0 |
|
166 | 164 | |
@@ -356,7 +354,6 class SpectraReader(JRODataReader, ProcessingUnit): | |||
|
356 | 354 | |
|
357 | 355 | if self.flagNoMoreFiles: |
|
358 | 356 | self.dataOut.flagNoData = True |
|
359 | print('Process finished') | |
|
360 | 357 | return 0 |
|
361 | 358 | |
|
362 | 359 | self.flagDiscontinuousBlock = 0 |
@@ -17,10 +17,10 import inspect | |||
|
17 | 17 | import zmq |
|
18 | 18 | import time |
|
19 | 19 | import pickle |
|
20 | import traceback | |
|
20 | 21 | from queue import Queue |
|
21 | 22 | from threading import Thread |
|
22 | 23 | from multiprocessing import Process |
|
23 | from zmq.utils.monitor import recv_monitor_message | |
|
24 | 24 | |
|
25 | 25 | from schainpy.utils import log |
|
26 | 26 | |
@@ -219,22 +219,18 def MPDecorator(BaseClass): | |||
|
219 | 219 | self.receiver = None |
|
220 | 220 | self.i = 0 |
|
221 | 221 | self.name = BaseClass.__name__ |
|
222 | ||
|
222 | 223 | if 'plot' in self.name.lower() and not self.name.endswith('_'): |
|
223 | 224 | self.name = '{}{}'.format(self.CODE.upper(), 'Plot') |
|
224 | self.start_time = time.time() | |
|
225 | 225 | |
|
226 | if len(self.args) is 3: | |
|
227 | self.typeProc = "ProcUnit" | |
|
226 | self.start_time = time.time() | |
|
228 | 227 |
|
|
229 | 228 |
|
|
230 | 229 |
|
|
231 | elif len(self.args) is 2: | |
|
232 |
|
|
|
233 | self.inputId = args[0] | |
|
234 | self.project_id = args[1] | |
|
235 | self.typeProc = "Operation" | |
|
236 | ||
|
230 | self.err_queue = args[3] | |
|
231 | self.typeProc = args[4] | |
|
237 | 232 | self.queue = InputQueue(self.project_id, self.inputId) |
|
233 | self.err_queue.put('#_start_#') | |
|
238 | 234 | |
|
239 | 235 | def subscribe(self): |
|
240 | 236 | ''' |
@@ -271,7 +267,7 def MPDecorator(BaseClass): | |||
|
271 | 267 | |
|
272 | 268 | if self.inputId is None: |
|
273 | 269 | self.i += 1 |
|
274 |
if self.i % |
|
|
270 | if self.i % 80 == 0: | |
|
275 | 271 | self.i = 0 |
|
276 | 272 | time.sleep(0.01) |
|
277 | 273 | |
@@ -283,7 +279,15 def MPDecorator(BaseClass): | |||
|
283 | 279 | ''' |
|
284 | 280 | while True: |
|
285 | 281 | |
|
282 | try: | |
|
286 | 283 | BaseClass.run(self, **self.kwargs) |
|
284 | except: | |
|
285 | err = traceback.format_exc() | |
|
286 | if 'No more files' in err: | |
|
287 | log.warning('No more files to read', self.name) | |
|
288 | else: | |
|
289 | self.err_queue.put('{}|{}'.format(self.name, err)) | |
|
290 | self.dataOut.error = True | |
|
287 | 291 | |
|
288 | 292 | for op, optype, opId, kwargs in self.operations: |
|
289 | 293 | if optype == 'self' and not self.dataOut.flagNoData: |
@@ -299,11 +303,9 def MPDecorator(BaseClass): | |||
|
299 | 303 | self.publish(self.dataOut, self.id) |
|
300 | 304 | |
|
301 | 305 | if self.dataOut.error: |
|
302 | log.error(self.dataOut.error, self.name) | |
|
303 | # self.sender.send_multipart([str(self.project_id).encode(), 'end'.encode()]) | |
|
304 | 306 | break |
|
305 | 307 | |
|
306 |
time.sleep( |
|
|
308 | time.sleep(0.5) | |
|
307 | 309 | |
|
308 | 310 | def runProc(self): |
|
309 | 311 | ''' |
@@ -315,10 +317,13 def MPDecorator(BaseClass): | |||
|
315 | 317 | |
|
316 | 318 | if self.dataIn.flagNoData and self.dataIn.error is None: |
|
317 | 319 | continue |
|
318 | ||
|
320 | elif not self.dataIn.error: | |
|
321 | try: | |
|
319 | 322 | BaseClass.run(self, **self.kwargs) |
|
320 | ||
|
321 | if self.dataIn.error: | |
|
323 | except: | |
|
324 | self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc())) | |
|
325 | self.dataOut.error = True | |
|
326 | elif self.dataIn.error: | |
|
322 | 327 | self.dataOut.error = self.dataIn.error |
|
323 | 328 | self.dataOut.flagNoData = True |
|
324 | 329 | |
@@ -330,20 +335,15 def MPDecorator(BaseClass): | |||
|
330 | 335 | elif optype == 'external' and not self.dataOut.flagNoData: |
|
331 | 336 | self.publish(self.dataOut, opId) |
|
332 | 337 | |
|
333 | if not self.dataOut.flagNoData or self.dataOut.error: | |
|
334 | 338 |
|
|
335 | 339 |
|
|
336 |
|
|
|
337 | op(**kwargs) | |
|
338 | elif optype == 'other' and self.dataOut.error: | |
|
339 | self.dataOut = op.run(self.dataOut, **kwargs) | |
|
340 | elif optype == 'external' and self.dataOut.error: | |
|
340 | if optype == 'external' and self.dataOut.error: | |
|
341 | 341 |
|
|
342 | 342 | |
|
343 |
if self.data |
|
|
343 | if self.dataOut.error: | |
|
344 | 344 | break |
|
345 | 345 | |
|
346 |
time.sleep( |
|
|
346 | time.sleep(0.5) | |
|
347 | 347 | |
|
348 | 348 | def runOp(self): |
|
349 | 349 | ''' |
@@ -355,18 +355,15 def MPDecorator(BaseClass): | |||
|
355 | 355 | |
|
356 | 356 | dataOut = self.listen() |
|
357 | 357 | |
|
358 | if not dataOut.error: | |
|
358 | 359 | BaseClass.run(self, dataOut, **self.kwargs) |
|
359 | ||
|
360 | if dataOut.error: | |
|
360 | else: | |
|
361 | 361 | break |
|
362 | 362 | |
|
363 | time.sleep(1) | |
|
364 | ||
|
365 | 363 | def run(self): |
|
366 | 364 | if self.typeProc is "ProcUnit": |
|
367 | 365 | |
|
368 | 366 | if self.inputId is not None: |
|
369 | ||
|
370 | 367 | self.subscribe() |
|
371 | 368 | |
|
372 | 369 | self.set_publisher() |
@@ -386,32 +383,10 def MPDecorator(BaseClass): | |||
|
386 | 383 | |
|
387 | 384 | self.close() |
|
388 | 385 | |
|
389 | def event_monitor(self, monitor): | |
|
390 | ||
|
391 | events = {} | |
|
392 | ||
|
393 | for name in dir(zmq): | |
|
394 | if name.startswith('EVENT_'): | |
|
395 | value = getattr(zmq, name) | |
|
396 | events[value] = name | |
|
397 | ||
|
398 | while monitor.poll(): | |
|
399 | evt = recv_monitor_message(monitor) | |
|
400 | if evt['event'] == 32: | |
|
401 | self.connections += 1 | |
|
402 | if evt['event'] == 512: | |
|
403 | pass | |
|
404 | ||
|
405 | evt.update({'description': events[evt['event']]}) | |
|
406 | ||
|
407 | if evt['event'] == zmq.EVENT_MONITOR_STOPPED: | |
|
408 | break | |
|
409 | monitor.close() | |
|
410 | print('event monitor thread done!') | |
|
411 | ||
|
412 | 386 | def close(self): |
|
413 | 387 | |
|
414 | 388 | BaseClass.close(self) |
|
389 | self.err_queue.put('#_end_#') | |
|
415 | 390 | |
|
416 | 391 | if self.sender: |
|
417 | 392 | self.sender.close() |
General Comments 0
You need to be logged in to leave comments.
Login now