##// END OF EJS Templates
Errors handling and gracefully terminate main process
jespinoza -
r1241:c3044f867269
parent child
Show More
@@ -68,7 +68,6 class Alarm(Process):
68 @staticmethod
68 @staticmethod
69 def send_email(**kwargs):
69 def send_email(**kwargs):
70 notifier = SchainNotify()
70 notifier = SchainNotify()
71 print(kwargs)
72 notifier.notify(**kwargs)
71 notifier.notify(**kwargs)
73
72
74 @staticmethod
73 @staticmethod
@@ -290,7 +289,7 class SchainNotify:
290
289
291 msg = MIMEMultipart()
290 msg = MIMEMultipart()
292 msg['Subject'] = subject
291 msg['Subject'] = subject
293 msg['From'] = "(Python SChain API): " + email_from
292 msg['From'] = "SChain API (v{}) <{}>".format(schainpy.__version__, email_from)
294 msg['Reply-to'] = email_from
293 msg['Reply-to'] = email_from
295 msg['To'] = email_to
294 msg['To'] = email_to
296
295
@@ -330,7 +329,7 class SchainNotify:
330 smtp.login(self.__emailFromAddress, self.__emailPass)
329 smtp.login(self.__emailFromAddress, self.__emailPass)
331
330
332 # Send the email
331 # Send the email
333 try:
332 try:
334 smtp.sendmail(msg['From'], msg['To'], msg.as_string())
333 smtp.sendmail(msg['From'], msg['To'], msg.as_string())
335 except:
334 except:
336 log.error('Could not send the email to {}'.format(msg['To']), 'System')
335 log.error('Could not send the email to {}'.format(msg['To']), 'System')
@@ -11,7 +11,7 import traceback
11 import math
11 import math
12 import time
12 import time
13 import zmq
13 import zmq
14 from multiprocessing import Process, cpu_count
14 from multiprocessing import Process, Queue, cpu_count
15 from threading import Thread
15 from threading import Thread
16 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
16 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
17 from xml.dom import minidom
17 from xml.dom import minidom
@@ -361,13 +361,14 class OperationConf():
361
361
362 return kwargs
362 return kwargs
363
363
364 def setup(self, id, name, priority, type, project_id):
364 def setup(self, id, name, priority, type, project_id, err_queue):
365
365
366 self.id = str(id)
366 self.id = str(id)
367 self.project_id = project_id
367 self.project_id = project_id
368 self.name = name
368 self.name = name
369 self.type = type
369 self.type = type
370 self.priority = priority
370 self.priority = priority
371 self.err_queue = err_queue
371 self.parmConfObjList = []
372 self.parmConfObjList = []
372
373
373 def removeParameters(self):
374 def removeParameters(self):
@@ -459,8 +460,9 class OperationConf():
459 opObj = className()
460 opObj = className()
460 elif self.type == 'external':
461 elif self.type == 'external':
461 kwargs = self.getKwargs()
462 kwargs = self.getKwargs()
462 opObj = className(self.id, self.project_id, **kwargs)
463 opObj = className(self.id, self.id, self.project_id, self.err_queue, 'Operation', **kwargs)
463 opObj.start()
464 opObj.start()
465 self.opObj = opObj
464
466
465 return opObj
467 return opObj
466
468
@@ -548,7 +550,7 class ProcUnitConf():
548
550
549 return self.procUnitObj
551 return self.procUnitObj
550
552
551 def setup(self, project_id, id, name, datatype, inputId):
553 def setup(self, project_id, id, name, datatype, inputId, err_queue):
552 '''
554 '''
553 id sera el topico a publicar
555 id sera el topico a publicar
554 inputId sera el topico a subscribirse
556 inputId sera el topico a subscribirse
@@ -573,7 +575,8 class ProcUnitConf():
573 self.project_id = project_id
575 self.project_id = project_id
574 self.name = name
576 self.name = name
575 self.datatype = datatype
577 self.datatype = datatype
576 self.inputId = inputId
578 self.inputId = inputId
579 self.err_queue = err_queue
577 self.opConfObjList = []
580 self.opConfObjList = []
578
581
579 self.addOperation(name='run', optype='self')
582 self.addOperation(name='run', optype='self')
@@ -607,7 +610,7 class ProcUnitConf():
607 id = self.__getNewId()
610 id = self.__getNewId()
608 priority = self.__getPriority() # Sin mucho sentido, pero puede usarse
611 priority = self.__getPriority() # Sin mucho sentido, pero puede usarse
609 opConfObj = OperationConf()
612 opConfObj = OperationConf()
610 opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id)
613 opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id, err_queue=self.err_queue)
611 self.opConfObjList.append(opConfObj)
614 self.opConfObjList.append(opConfObj)
612
615
613 return opConfObj
616 return opConfObj
@@ -675,7 +678,7 class ProcUnitConf():
675
678
676 className = eval(self.name)
679 className = eval(self.name)
677 kwargs = self.getKwargs()
680 kwargs = self.getKwargs()
678 procUnitObj = className(self.id, self.inputId, self.project_id, **kwargs) # necesitan saber su id y su entrada por fines de ipc
681 procUnitObj = className(self.id, self.inputId, self.project_id, self.err_queue, 'ProcUnit', **kwargs)
679 log.success('creating process...', self.name)
682 log.success('creating process...', self.name)
680
683
681 for opConfObj in self.opConfObjList:
684 for opConfObj in self.opConfObjList:
@@ -687,7 +690,7 class ProcUnitConf():
687 else:
690 else:
688 opObj = opConfObj.createObject()
691 opObj = opConfObj.createObject()
689
692
690 log.success('creating operation: {}, type:{}'.format(
693 log.success('adding operation: {}, type:{}'.format(
691 opConfObj.name,
694 opConfObj.name,
692 opConfObj.type), self.name)
695 opConfObj.type), self.name)
693
696
@@ -726,7 +729,7 class ReadUnitConf(ProcUnitConf):
726
729
727 return self.ELEMENTNAME
730 return self.ELEMENTNAME
728
731
729 def setup(self, project_id, id, name, datatype, path='', startDate='', endDate='',
732 def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='',
730 startTime='', endTime='', server=None, **kwargs):
733 startTime='', endTime='', server=None, **kwargs):
731
734
732
735
@@ -765,6 +768,7 class ReadUnitConf(ProcUnitConf):
765 self.startTime = startTime
768 self.startTime = startTime
766 self.endTime = endTime
769 self.endTime = endTime
767 self.server = server
770 self.server = server
771 self.err_queue = err_queue
768 self.addRunOperation(**kwargs)
772 self.addRunOperation(**kwargs)
769
773
770 def update(self, **kwargs):
774 def update(self, **kwargs):
@@ -878,6 +882,7 class Project(Process):
878 self.email = None
882 self.email = None
879 self.alarm = None
883 self.alarm = None
880 self.procUnitConfObjDict = {}
884 self.procUnitConfObjDict = {}
885 self.err_queue = Queue()
881
886
882 def __getNewId(self):
887 def __getNewId(self):
883
888
@@ -935,6 +940,8 class Project(Process):
935 self.description = description
940 self.description = description
936 self.email = email
941 self.email = email
937 self.alarm = alarm
942 self.alarm = alarm
943 if name:
944 self.name = '{} ({})'.format(Process.__name__, name)
938
945
939 def update(self, **kwargs):
946 def update(self, **kwargs):
940
947
@@ -963,7 +970,7 class Project(Process):
963 idReadUnit = str(id)
970 idReadUnit = str(id)
964
971
965 readUnitConfObj = ReadUnitConf()
972 readUnitConfObj = ReadUnitConf()
966 readUnitConfObj.setup(self.id, idReadUnit, name, datatype, **kwargs)
973 readUnitConfObj.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs)
967 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
974 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
968
975
969 return readUnitConfObj
976 return readUnitConfObj
@@ -980,9 +987,9 class Project(Process):
980
987
981 '''
988 '''
982
989
983 idProcUnit = self.__getNewId() #Topico para subscripcion
990 idProcUnit = self.__getNewId()
984 procUnitConfObj = ProcUnitConf()
991 procUnitConfObj = ProcUnitConf()
985 procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId) #topic_read, topic_write,
992 procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue)
986 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
993 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
987
994
988 return procUnitConfObj
995 return procUnitConfObj
@@ -1119,10 +1126,10 class Project(Process):
1119
1126
1120 def __str__(self):
1127 def __str__(self):
1121
1128
1122 print('Project[%s]: name = %s, description = %s, project_id = %s' % (self.id,
1129 print('Project: name = %s, description = %s, id = %s' % (
1123 self.name,
1130 self.name,
1124 self.description,
1131 self.description,
1125 self.project_id))
1132 self.id))
1126
1133
1127 for procUnitConfObj in self.procUnitConfObjDict.values():
1134 for procUnitConfObj in self.procUnitConfObjDict.values():
1128 print(procUnitConfObj)
1135 print(procUnitConfObj)
@@ -1135,58 +1142,84 class Project(Process):
1135 for key in keys:
1142 for key in keys:
1136 self.procUnitConfObjDict[key].createObjects()
1143 self.procUnitConfObjDict[key].createObjects()
1137
1144
1138 def __handleError(self, procUnitConfObj, modes=None, stdout=True):
1145 def monitor(self):
1139
1146
1140 import socket
1147 t = Thread(target=self.__monitor, args=(self.err_queue, self.ctx))
1148 t.start()
1149
1150 def __monitor(self, queue, ctx):
1141
1151
1142 if modes is None:
1152 import socket
1143 modes = self.alarm
1144
1153
1145 if not self.alarm:
1154 procs = 0
1146 modes = []
1155 err_msg = ''
1147
1156
1148 err = traceback.format_exception(sys.exc_info()[0],
1157 while True:
1149 sys.exc_info()[1],
1158 msg = queue.get()
1150 sys.exc_info()[2])
1159 if '#_start_#' in msg:
1160 procs += 1
1161 elif '#_end_#' in msg:
1162 procs -=1
1163 else:
1164 err_msg = msg
1165
1166 if procs == 0 or 'Traceback' in err_msg:
1167 break
1168 time.sleep(0.1)
1169
1170 if '|' in err_msg:
1171 name, err = err_msg.split('|')
1172 if 'SchainWarning' in err:
1173 log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), name)
1174 elif 'SchainError' in err:
1175 log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), name)
1176 else:
1177 log.error(err, name)
1178 else:
1179 name, err = self.name, err_msg
1180
1181 time.sleep(2)
1151
1182
1152 log.error('{}'.format(err[-1]), procUnitConfObj.name)
1183 for conf in self.procUnitConfObjDict.values():
1184 for confop in conf.opConfObjList:
1185 if confop.type == 'external':
1186 confop.opObj.terminate()
1187 conf.procUnitObj.terminate()
1188
1189 ctx.term()
1153
1190
1154 message = ''.join(err)
1191 message = ''.join(err)
1155
1192
1156 if stdout:
1193 if err_msg:
1157 sys.stderr.write(message)
1194 subject = 'SChain v%s: Error running %s\n' % (
1158
1195 schainpy.__version__, self.name)
1159 subject = 'SChain v%s: Error running %s\n' % (
1196
1160 schainpy.__version__, procUnitConfObj.name)
1197 subtitle = 'Hostname: %s\n' % socket.gethostbyname(
1161
1198 socket.gethostname())
1162 subtitle = '%s: %s\n' % (
1199 subtitle += 'Working directory: %s\n' % os.path.abspath('./')
1163 procUnitConfObj.getElementName(), procUnitConfObj.name)
1200 subtitle += 'Configuration file: %s\n' % self.filename
1164 subtitle += 'Hostname: %s\n' % socket.gethostbyname(
1201 subtitle += 'Time: %s\n' % str(datetime.datetime.now())
1165 socket.gethostname())
1202
1166 subtitle += 'Working directory: %s\n' % os.path.abspath('./')
1203 readUnitConfObj = self.getReadUnitObj()
1167 subtitle += 'Configuration file: %s\n' % self.filename
1204 if readUnitConfObj:
1168 subtitle += 'Time: %s\n' % str(datetime.datetime.now())
1205 subtitle += '\nInput parameters:\n'
1169
1206 subtitle += '[Data path = %s]\n' % readUnitConfObj.path
1170 readUnitConfObj = self.getReadUnitObj()
1207 subtitle += '[Data type = %s]\n' % readUnitConfObj.datatype
1171 if readUnitConfObj:
1208 subtitle += '[Start date = %s]\n' % readUnitConfObj.startDate
1172 subtitle += '\nInput parameters:\n'
1209 subtitle += '[End date = %s]\n' % readUnitConfObj.endDate
1173 subtitle += '[Data path = %s]\n' % readUnitConfObj.path
1210 subtitle += '[Start time = %s]\n' % readUnitConfObj.startTime
1174 subtitle += '[Data type = %s]\n' % readUnitConfObj.datatype
1211 subtitle += '[End time = %s]\n' % readUnitConfObj.endTime
1175 subtitle += '[Start date = %s]\n' % readUnitConfObj.startDate
1212
1176 subtitle += '[End date = %s]\n' % readUnitConfObj.endDate
1213 a = Alarm(
1177 subtitle += '[Start time = %s]\n' % readUnitConfObj.startTime
1214 modes=self.alarm,
1178 subtitle += '[End time = %s]\n' % readUnitConfObj.endTime
1215 email=self.email,
1179
1216 message=message,
1180 a = Alarm(
1217 subject=subject,
1181 modes=modes,
1218 subtitle=subtitle,
1182 email=self.email,
1219 filename=self.filename
1183 message=message,
1220 )
1184 subject=subject,
1221
1185 subtitle=subtitle,
1222 a.start()
1186 filename=self.filename
1187 )
1188
1189 return a
1190
1223
1191 def isPaused(self):
1224 def isPaused(self):
1192 return 0
1225 return 0
@@ -1223,7 +1256,7 class Project(Process):
1223
1256
1224 self.filename = filename
1257 self.filename = filename
1225
1258
1226 def setProxyCom(self):
1259 def setProxy(self):
1227
1260
1228 if not os.path.exists('/tmp/schain'):
1261 if not os.path.exists('/tmp/schain'):
1229 os.mkdir('/tmp/schain')
1262 os.mkdir('/tmp/schain')
@@ -1233,24 +1266,19 class Project(Process):
1233 xpub.bind('ipc:///tmp/schain/{}_pub'.format(self.id))
1266 xpub.bind('ipc:///tmp/schain/{}_pub'.format(self.id))
1234 xsub = self.ctx.socket(zmq.XSUB)
1267 xsub = self.ctx.socket(zmq.XSUB)
1235 xsub.bind('ipc:///tmp/schain/{}_sub'.format(self.id))
1268 xsub.bind('ipc:///tmp/schain/{}_sub'.format(self.id))
1236
1269 self.monitor()
1237 try:
1270 try:
1238 zmq.proxy(xpub, xsub)
1271 zmq.proxy(xpub, xsub)
1239 except: # zmq.ContextTerminated:
1272 except zmq.ContextTerminated:
1240 xpub.close()
1273 xpub.close()
1241 xsub.close()
1274 xsub.close()
1242
1275
1243 def run(self):
1276 def run(self):
1244
1277
1245 log.success('Starting {}: {}'.format(self.name, self.id), tag='')
1278 log.success('Starting {}: {}'.format(self.name, self.id), tag='')
1246 self.start_time = time.time()
1279 self.start_time = time.time()
1247 self.createObjects()
1280 self.createObjects()
1248 # t = Thread(target=wait, args=(self.ctx, ))
1281 self.setProxy()
1249 # t.start()
1282 log.success('{} Done (Time: {}s)'.format(
1250 self.setProxyCom()
1251
1252 # Iniciar todos los procesos .start(), monitoreo de procesos. ELiminar lo de abajo
1253
1254 log.success('{} Done (time: {}s)'.format(
1255 self.name,
1283 self.name,
1256 time.time()-self.start_time))
1284 time.time()-self.start_time), '')
@@ -13,6 +13,7 import datetime
13
13
14 import numpy
14 import numpy
15
15
16 import schainpy.admin
16 from schainpy.model.proc.jroproc_base import ProcessingUnit, MPDecorator
17 from schainpy.model.proc.jroproc_base import ProcessingUnit, MPDecorator
17 from schainpy.model.data.jrodata import Parameters
18 from schainpy.model.data.jrodata import Parameters
18 from schainpy.model.io.jroIO_base import JRODataReader, isNumber
19 from schainpy.model.io.jroIO_base import JRODataReader, isNumber
@@ -394,12 +395,11 class BLTRParamReader(JRODataReader, ProcessingUnit):
394 '''
395 '''
395 if self.flagNoMoreFiles:
396 if self.flagNoMoreFiles:
396 self.dataOut.flagNoData = True
397 self.dataOut.flagNoData = True
397 self.dataOut.error = 'No More files to read'
398 raise schainpy.admin.SchainError('No More files to read')
398 return
399
399
400 if not self.readNextBlock():
400 if not self.readNextBlock():
401 self.dataOut.flagNoData = True
401 self.dataOut.flagNoData = True
402 self.dataOut.error = 'Time for wait new file reach!!!'
402 raise schainpy.admin.SchainError('Time for wait new file reach!!!')
403
403
404 self.set_output()
404 self.set_output()
405
405
@@ -15,13 +15,9 from scipy import asarray as ar, exp
15 SPEED_OF_LIGHT = 299792458
15 SPEED_OF_LIGHT = 299792458
16 SPEED_OF_LIGHT = 3e8
16 SPEED_OF_LIGHT = 3e8
17
17
18 try:
19 from gevent import sleep
20 except:
21 from time import sleep
22
23 from .utils import folder_in_range
18 from .utils import folder_in_range
24
19
20 import schainpy.admin
25 from schainpy.model.data.jrodata import Spectra
21 from schainpy.model.data.jrodata import Spectra
26 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
22 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
27 from schainpy.utils import log
23 from schainpy.utils import log
@@ -341,7 +337,7 class BLTRSpectraReader (ProcessingUnit):
341
337
342 if self.flagNoMoreFiles:
338 if self.flagNoMoreFiles:
343 self.dataOut.flagNoData = True
339 self.dataOut.flagNoData = True
344 self.dataOut.error = 'No more files'
340 raise schainpy.admin.SchainError('No more files')
345
341
346 self.readBlock()
342 self.readBlock()
347
343
@@ -661,7 +661,6 class AMISRReader(ProcessingUnit):
661
661
662 if self.flagNoMoreFiles:
662 if self.flagNoMoreFiles:
663 self.dataOut.flagNoData = True
663 self.dataOut.flagNoData = True
664 print('Process finished')
665 return 0
664 return 0
666
665
667 if self.__hasNotDataInBuffer():
666 if self.__hasNotDataInBuffer():
@@ -15,11 +15,6 import datetime
15 import traceback
15 import traceback
16 import zmq
16 import zmq
17
17
18 try:
19 from gevent import sleep
20 except:
21 from time import sleep
22
23 from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader
18 from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader
24 from schainpy.model.data.jroheaderIO import get_dtype_index, get_numpy_dtype, get_procflag_dtype, get_dtype_width
19 from schainpy.model.data.jroheaderIO import get_dtype_index, get_numpy_dtype, get_procflag_dtype, get_dtype_width
25 from schainpy.utils import log
20 from schainpy.utils import log
@@ -771,7 +766,6 class JRODataReader(JRODataIO):
771 idFile += 1
766 idFile += 1
772 if not(idFile < len(self.filenameList)):
767 if not(idFile < len(self.filenameList)):
773 self.flagNoMoreFiles = 1
768 self.flagNoMoreFiles = 1
774 # print "[Reading] No more Files"
775 return 0
769 return 0
776
770
777 filename = self.filenameList[idFile]
771 filename = self.filenameList[idFile]
@@ -843,10 +837,14 class JRODataReader(JRODataIO):
843
837
844 for nTries in range(tries):
838 for nTries in range(tries):
845 if firstTime_flag:
839 if firstTime_flag:
846 print("\t[Reading] Waiting %0.2f sec for the next file: \"%s\" , try %03d ..." % (self.delay, filename, nTries + 1))
840 log.warning(
847 sleep(self.delay)
841 "Waiting %0.2f sec for the next file: \"%s\" , try %03d ..." % (self.delay, filename, nTries + 1),
842 self.name)
843 time.sleep(self.delay)
848 else:
844 else:
849 print("\t[Reading] Searching the next \"%s%04d%03d%03d%s\" file ..." % (self.optchar, self.year, self.doy, self.set, self.ext))
845 log.warning(
846 "Searching the next \"%s%04d%03d%03d%s\" file ..." % (self.optchar, self.year, self.doy, self.set, self.ext),
847 self.name)
850
848
851 fullfilename, filename = checkForRealPath(
849 fullfilename, filename = checkForRealPath(
852 self.path, self.foldercounter, self.year, self.doy, self.set, self.ext)
850 self.path, self.foldercounter, self.year, self.doy, self.set, self.ext)
@@ -860,7 +858,9 class JRODataReader(JRODataIO):
860
858
861 firstTime_flag = False
859 firstTime_flag = False
862
860
863 log.warning('Skipping the file {} due to this file doesn\'t exist'.format(filename))
861 log.warning(
862 'Skipping the file {} due to this file doesn\'t exist'.format(filename),
863 self.name)
864 self.set += 1
864 self.set += 1
865
865
866 # si no encuentro el file buscado cambio de carpeta y busco en la siguiente carpeta
866 # si no encuentro el file buscado cambio de carpeta y busco en la siguiente carpeta
@@ -877,14 +877,13 class JRODataReader(JRODataIO):
877 self.fp.close()
877 self.fp.close()
878 self.fp = open(fullfilename, 'rb')
878 self.fp = open(fullfilename, 'rb')
879 self.flagNoMoreFiles = 0
879 self.flagNoMoreFiles = 0
880 # print '[Reading] Setting the file: %s' % fullfilename
881 else:
880 else:
881 raise schainpy.admin.SchainError('Time for waiting new files reach')
882 self.fileSize = 0
882 self.fileSize = 0
883 self.filename = None
883 self.filename = None
884 self.flagIsNewFile = 0
884 self.flagIsNewFile = 0
885 self.fp = None
885 self.fp = None
886 self.flagNoMoreFiles = 1
886 self.flagNoMoreFiles = 1
887 # print '[Reading] No more files to read'
888
887
889 return fileOk_flag
888 return fileOk_flag
890
889
@@ -898,8 +897,8 class JRODataReader(JRODataIO):
898 newFile = self.__setNextFileOffline()
897 newFile = self.__setNextFileOffline()
899
898
900 if not(newFile):
899 if not(newFile):
901 self.dataOut.error = 'No more files to read'
900 raise schainpy.admin.SchainWarning('No more files to read')
902 return 0
901
903
902
904 if self.verbose:
903 if self.verbose:
905 print('[Reading] Setting the file: %s' % self.filename)
904 print('[Reading] Setting the file: %s' % self.filename)
@@ -942,7 +941,7 class JRODataReader(JRODataIO):
942 return 0
941 return 0
943
942
944 print("[Reading] Waiting %0.2f seconds for the next block, try %03d ..." % (self.delay, nTries + 1))
943 print("[Reading] Waiting %0.2f seconds for the next block, try %03d ..." % (self.delay, nTries + 1))
945 sleep(self.delay)
944 time.sleep(self.delay)
946
945
947 return 0
946 return 0
948
947
@@ -969,7 +968,7 class JRODataReader(JRODataIO):
969 "Waiting %0.2f seconds for the next block, try %03d ..." % (self.delay, nTries + 1),
968 "Waiting %0.2f seconds for the next block, try %03d ..." % (self.delay, nTries + 1),
970 self.name
969 self.name
971 )
970 )
972 sleep(self.delay)
971 time.sleep(self.delay)
973
972
974 return 0
973 return 0
975
974
@@ -1013,7 +1012,7 class JRODataReader(JRODataIO):
1013
1012
1014 # if self.online:
1013 # if self.online:
1015 # self.__jumpToLastBlock()
1014 # self.__jumpToLastBlock()
1016
1015
1017 if self.flagIsNewFile:
1016 if self.flagIsNewFile:
1018 self.lastUTTime = self.basicHeaderObj.utc
1017 self.lastUTTime = self.basicHeaderObj.utc
1019 return 1
1018 return 1
@@ -1057,8 +1056,7 class JRODataReader(JRODataIO):
1057 # Skip block out of startTime and endTime
1056 # Skip block out of startTime and endTime
1058 while True:
1057 while True:
1059 if not(self.__setNewBlock()):
1058 if not(self.__setNewBlock()):
1060 self.dataOut.error = 'No more files to read'
1059 raise schainpy.admin.SchainWarning('No more files to read')
1061 return 0
1062
1060
1063 if not(self.readBlock()):
1061 if not(self.readBlock()):
1064 return 0
1062 return 0
@@ -1260,10 +1258,10 class JRODataReader(JRODataIO):
1260 pattern_path = multi_path[0]
1258 pattern_path = multi_path[0]
1261
1259
1262 if path_empty:
1260 if path_empty:
1263 print("[Reading] No *%s files in %s for %s to %s" % (ext, pattern_path, startDate, endDate))
1261 raise schainpy.admin.SchainError("[Reading] No *%s files in %s for %s to %s" % (ext, pattern_path, startDate, endDate))
1264 else:
1262 else:
1265 if not dateList:
1263 if not dateList:
1266 print("[Reading] Date range selected invalid [%s - %s]: No *%s files in %s)" % (startDate, endDate, ext, path))
1264 raise schainpy.admin.SchainError("[Reading] Date range selected invalid [%s - %s]: No *%s files in %s)" % (startDate, endDate, ext, path))
1267
1265
1268 if include_path:
1266 if include_path:
1269 return dateList, pathList
1267 return dateList, pathList
@@ -1296,6 +1294,17 class JRODataReader(JRODataIO):
1296 oneDDict=None,
1294 oneDDict=None,
1297 twoDDict=None,
1295 twoDDict=None,
1298 independentParam=None):
1296 independentParam=None):
1297
1298 self.online = online
1299 self.realtime = realtime
1300 self.delay = delay
1301 self.getByBlock = getblock
1302 self.nTxs = nTxs
1303 self.startTime = startTime
1304 self.endTime = endTime
1305 self.endDate = endDate
1306 self.startDate = startDate
1307
1299 if server is not None:
1308 if server is not None:
1300 if 'tcp://' in server:
1309 if 'tcp://' in server:
1301 address = server
1310 address = server
@@ -1326,10 +1335,10 class JRODataReader(JRODataIO):
1326 break
1335 break
1327
1336
1328 print('[Reading] Waiting %0.2f sec for an valid file in %s: try %02d ...' % (self.delay, path, nTries + 1))
1337 print('[Reading] Waiting %0.2f sec for an valid file in %s: try %02d ...' % (self.delay, path, nTries + 1))
1329 sleep(self.delay)
1338 time.sleep(self.delay)
1330
1339
1331 if not(fullpath):
1340 if not(fullpath):
1332 self.dataOut.error = 'There isn\'t any valid file in {}'.format(path)
1341 raise schainpy.admin.SchainError('There isn\'t any valid file in {}'.format(path))
1333 return
1342 return
1334
1343
1335 self.year = year
1344 self.year = year
@@ -1359,17 +1368,10 class JRODataReader(JRODataIO):
1359 basename, ext = os.path.splitext(file_name)
1368 basename, ext = os.path.splitext(file_name)
1360 last_set = int(basename[-3:])
1369 last_set = int(basename[-3:])
1361
1370
1362 self.online = online
1371
1363 self.realtime = realtime
1364 self.delay = delay
1365 ext = ext.lower()
1372 ext = ext.lower()
1366 self.ext = ext
1373 self.ext = ext
1367 self.getByBlock = getblock
1374
1368 self.nTxs = nTxs
1369 self.startTime = startTime
1370 self.endTime = endTime
1371 self.endDate = endDate
1372 self.startDate = startDate
1373 # Added-----------------
1375 # Added-----------------
1374 self.selBlocksize = blocksize
1376 self.selBlocksize = blocksize
1375 self.selBlocktime = blocktime
1377 self.selBlocktime = blocktime
@@ -1391,8 +1393,6 class JRODataReader(JRODataIO):
1391 self.filenameList = []
1393 self.filenameList = []
1392 return
1394 return
1393
1395
1394 # self.getBasicHeader()
1395
1396 if last_set != None:
1396 if last_set != None:
1397 self.dataOut.last_block = last_set * \
1397 self.dataOut.last_block = last_set * \
1398 self.processingHeaderObj.dataBlocksPerFile + self.basicHeaderObj.dataBlock
1398 self.processingHeaderObj.dataBlocksPerFile + self.basicHeaderObj.dataBlock
@@ -12,20 +12,16 Created on Jul 3, 2014
12 # METADATA
12 # METADATA
13
13
14 import os
14 import os
15 import time
15 import datetime
16 import datetime
16 import numpy
17 import numpy
17 import timeit
18 import timeit
18 from fractions import Fraction
19 from fractions import Fraction
19
20
20 try:
21 import schainpy.admin
21 from gevent import sleep
22 except:
23 from time import sleep
24
25 from schainpy.model.data.jroheaderIO import RadarControllerHeader, SystemHeader
22 from schainpy.model.data.jroheaderIO import RadarControllerHeader, SystemHeader
26 from schainpy.model.data.jrodata import Voltage
23 from schainpy.model.data.jrodata import Voltage
27 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
24 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
28 from time import time
29
25
30 import pickle
26 import pickle
31 try:
27 try:
@@ -461,9 +457,9 class DigitalRFReader(ProcessingUnit):
461 return False
457 return False
462
458
463 def timeit(self, toExecute):
459 def timeit(self, toExecute):
464 t0 = time()
460 t0 = time.time()
465 toExecute()
461 toExecute()
466 self.executionTime = time() - t0
462 self.executionTime = time.time() - t0
467 if self.oldAverage is None:
463 if self.oldAverage is None:
468 self.oldAverage = self.executionTime
464 self.oldAverage = self.executionTime
469 self.oldAverage = (self.executionTime + self.count *
465 self.oldAverage = (self.executionTime + self.count *
@@ -569,24 +565,24 class DigitalRFReader(ProcessingUnit):
569 if self.__readNextBlock():
565 if self.__readNextBlock():
570 break
566 break
571 if self.__thisUnixSample > self.__endUTCSecond * self.__sample_rate:
567 if self.__thisUnixSample > self.__endUTCSecond * self.__sample_rate:
572 self.dataOut.error = 'Error'
568 raise schainpy.admin.SchainError('Error')
573 return
569 return
574
570
575 if self.__flagDiscontinuousBlock:
571 if self.__flagDiscontinuousBlock:
576 self.dataOut.error = 'discontinuous block found'
572 raise schainpy.admin.SchainError('discontinuous block found')
577 return
573 return
578
574
579 if not self.__online:
575 if not self.__online:
580 self.dataOut.error = 'Online?'
576 raise schainpy.admin.SchainError('Online?')
581 return
577 return
582
578
583 err_counter += 1
579 err_counter += 1
584 if err_counter > nTries:
580 if err_counter > nTries:
585 self.dataOut.error = 'Max retrys reach'
581 raise schainpy.admin.SchainError('Max retrys reach')
586 return
582 return
587
583
588 print('[Reading] waiting %d seconds to read a new block' % seconds)
584 print('[Reading] waiting %d seconds to read a new block' % seconds)
589 sleep(seconds)
585 time.sleep(seconds)
590
586
591 self.dataOut.data = self.__data_buffer[:,
587 self.dataOut.data = self.__data_buffer[:,
592 self.__bufferIndex:self.__bufferIndex + self.__nSamples]
588 self.__bufferIndex:self.__bufferIndex + self.__nSamples]
@@ -833,7 +833,6 class HFReader(ProcessingUnit):
833 def getData(self):
833 def getData(self):
834 if self.flagNoMoreFiles:
834 if self.flagNoMoreFiles:
835 self.dataOut.flagNoData = True
835 self.dataOut.flagNoData = True
836 print('Process finished')
837 return 0
836 return 0
838
837
839 if self.__hasNotDataInBuffer():
838 if self.__hasNotDataInBuffer():
@@ -585,7 +585,6 class AMISRReader(ProcessingUnit):
585
585
586 if self.flagNoMoreFiles:
586 if self.flagNoMoreFiles:
587 self.dataOut.flagNoData = True
587 self.dataOut.flagNoData = True
588 print('Process finished')
589 return 0
588 return 0
590
589
591 if self.__hasNotDataInBuffer():
590 if self.__hasNotDataInBuffer():
@@ -13,6 +13,8 import datetime
13
13
14 import numpy
14 import numpy
15 import h5py
15 import h5py
16
17 import schainpy.admin
16 from schainpy.model.io.jroIO_base import LOCALTIME, JRODataReader, JRODataWriter
18 from schainpy.model.io.jroIO_base import LOCALTIME, JRODataReader, JRODataWriter
17 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
19 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
18 from schainpy.model.data.jrodata import Parameters
20 from schainpy.model.data.jrodata import Parameters
@@ -390,7 +392,7 class MADReader(JRODataReader, ProcessingUnit):
390 '''
392 '''
391 if self.flagNoMoreFiles:
393 if self.flagNoMoreFiles:
392 self.dataOut.flagNoData = True
394 self.dataOut.flagNoData = True
393 self.dataOut.error = 'No file left to process'
395 raise schainpy.admin.SchainError('No file left to process')
394 return 0
396 return 0
395
397
396 if not self.readNextBlock():
398 if not self.readNextBlock():
@@ -5,6 +5,7 import h5py
5 import re
5 import re
6 import datetime
6 import datetime
7
7
8 import schainpy.admin
8 from schainpy.model.data.jrodata import *
9 from schainpy.model.data.jrodata import *
9 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
10 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
10 from schainpy.model.io.jroIO_base import *
11 from schainpy.model.io.jroIO_base import *
@@ -233,7 +234,7 class ParamReader(JRODataReader,ProcessingUnit):
233 idFile = self.fileIndex
234 idFile = self.fileIndex
234
235
235 if not(idFile < len(self.filenameList)):
236 if not(idFile < len(self.filenameList)):
236 self.dataOut.error = "No more Files"
237 raise schainpy.admin.SchainError("No more Files")
237 return 0
238 return 0
238
239
239 filename = self.filenameList[idFile]
240 filename = self.filenameList[idFile]
@@ -1165,8 +1166,7 class ParameterReader(JRODataReader,ProcessingUnit):
1165 idFile = self.fileIndex
1166 idFile = self.fileIndex
1166
1167
1167 if not(idFile < len(self.filenameList)):
1168 if not(idFile < len(self.filenameList)):
1168 self.dataOut.error = 'No more files'
1169 raise schainpy.admin.SchainError('No more files')
1169 return 0
1170
1170
1171 filename = self.filenameList[idFile]
1171 filename = self.filenameList[idFile]
1172 self.fp = h5py.File(filename, 'r')
1172 self.fp = h5py.File(filename, 'r')
@@ -92,7 +92,6 class SpectraReader(JRODataReader, ProcessingUnit):
92 #Eliminar de la base la herencia
92 #Eliminar de la base la herencia
93 ProcessingUnit.__init__(self)#, **kwargs)
93 ProcessingUnit.__init__(self)#, **kwargs)
94
94
95 # self.isConfig = False
96
95
97 self.pts2read_SelfSpectra = 0
96 self.pts2read_SelfSpectra = 0
98
97
@@ -160,7 +159,6 class SpectraReader(JRODataReader, ProcessingUnit):
160
159
161 self.__isFirstTimeOnline = 1
160 self.__isFirstTimeOnline = 1
162
161
163 # self.ippSeconds = 0
164
162
165 self.flagDiscontinuousBlock = 0
163 self.flagDiscontinuousBlock = 0
166
164
@@ -226,7 +224,7 class SpectraReader(JRODataReader, ProcessingUnit):
226 self.pts2read_DCchannels = int(self.systemHeaderObj.nChannels * self.processingHeaderObj.nHeights)
224 self.pts2read_DCchannels = int(self.systemHeaderObj.nChannels * self.processingHeaderObj.nHeights)
227 self.blocksize += self.pts2read_DCchannels
225 self.blocksize += self.pts2read_DCchannels
228
226
229 # self.blocksize = self.pts2read_SelfSpectra + self.pts2read_CrossSpectra + self.pts2read_DCchannels
227 # self.blocksize = self.pts2read_SelfSpectra + self.pts2read_CrossSpectra + self.pts2read_DCchannels
230
228
231
229
232 def readBlock(self):
230 def readBlock(self):
@@ -356,7 +354,6 class SpectraReader(JRODataReader, ProcessingUnit):
356
354
357 if self.flagNoMoreFiles:
355 if self.flagNoMoreFiles:
358 self.dataOut.flagNoData = True
356 self.dataOut.flagNoData = True
359 print('Process finished')
360 return 0
357 return 0
361
358
362 self.flagDiscontinuousBlock = 0
359 self.flagDiscontinuousBlock = 0
@@ -17,10 +17,10 import inspect
17 import zmq
17 import zmq
18 import time
18 import time
19 import pickle
19 import pickle
20 import traceback
20 from queue import Queue
21 from queue import Queue
21 from threading import Thread
22 from threading import Thread
22 from multiprocessing import Process
23 from multiprocessing import Process
23 from zmq.utils.monitor import recv_monitor_message
24
24
25 from schainpy.utils import log
25 from schainpy.utils import log
26
26
@@ -219,22 +219,18 def MPDecorator(BaseClass):
219 self.receiver = None
219 self.receiver = None
220 self.i = 0
220 self.i = 0
221 self.name = BaseClass.__name__
221 self.name = BaseClass.__name__
222
222 if 'plot' in self.name.lower() and not self.name.endswith('_'):
223 if 'plot' in self.name.lower() and not self.name.endswith('_'):
223 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
224 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
224 self.start_time = time.time()
225
225
226 self.start_time = time.time()
226 if len(self.args) is 3:
227 self.id = args[0]
227 self.typeProc = "ProcUnit"
228 self.inputId = args[1]
228 self.id = args[0]
229 self.project_id = args[2]
229 self.inputId = args[1]
230 self.err_queue = args[3]
230 self.project_id = args[2]
231 self.typeProc = args[4]
231 elif len(self.args) is 2:
232 self.id = args[0]
233 self.inputId = args[0]
234 self.project_id = args[1]
235 self.typeProc = "Operation"
236
237 self.queue = InputQueue(self.project_id, self.inputId)
232 self.queue = InputQueue(self.project_id, self.inputId)
233 self.err_queue.put('#_start_#')
238
234
239 def subscribe(self):
235 def subscribe(self):
240 '''
236 '''
@@ -271,7 +267,7 def MPDecorator(BaseClass):
271
267
272 if self.inputId is None:
268 if self.inputId is None:
273 self.i += 1
269 self.i += 1
274 if self.i % 100 == 0:
270 if self.i % 80 == 0:
275 self.i = 0
271 self.i = 0
276 time.sleep(0.01)
272 time.sleep(0.01)
277
273
@@ -283,7 +279,15 def MPDecorator(BaseClass):
283 '''
279 '''
284 while True:
280 while True:
285
281
286 BaseClass.run(self, **self.kwargs)
282 try:
283 BaseClass.run(self, **self.kwargs)
284 except:
285 err = traceback.format_exc()
286 if 'No more files' in err:
287 log.warning('No more files to read', self.name)
288 else:
289 self.err_queue.put('{}|{}'.format(self.name, err))
290 self.dataOut.error = True
287
291
288 for op, optype, opId, kwargs in self.operations:
292 for op, optype, opId, kwargs in self.operations:
289 if optype == 'self' and not self.dataOut.flagNoData:
293 if optype == 'self' and not self.dataOut.flagNoData:
@@ -298,12 +302,10 def MPDecorator(BaseClass):
298
302
299 self.publish(self.dataOut, self.id)
303 self.publish(self.dataOut, self.id)
300
304
301 if self.dataOut.error:
305 if self.dataOut.error:
302 log.error(self.dataOut.error, self.name)
303 # self.sender.send_multipart([str(self.project_id).encode(), 'end'.encode()])
304 break
306 break
305
307
306 time.sleep(1)
308 time.sleep(0.5)
307
309
308 def runProc(self):
310 def runProc(self):
309 '''
311 '''
@@ -315,10 +317,13 def MPDecorator(BaseClass):
315
317
316 if self.dataIn.flagNoData and self.dataIn.error is None:
318 if self.dataIn.flagNoData and self.dataIn.error is None:
317 continue
319 continue
318
320 elif not self.dataIn.error:
319 BaseClass.run(self, **self.kwargs)
321 try:
320
322 BaseClass.run(self, **self.kwargs)
321 if self.dataIn.error:
323 except:
324 self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc()))
325 self.dataOut.error = True
326 elif self.dataIn.error:
322 self.dataOut.error = self.dataIn.error
327 self.dataOut.error = self.dataIn.error
323 self.dataOut.flagNoData = True
328 self.dataOut.flagNoData = True
324
329
@@ -329,21 +334,16 def MPDecorator(BaseClass):
329 self.dataOut = op.run(self.dataOut, **kwargs)
334 self.dataOut = op.run(self.dataOut, **kwargs)
330 elif optype == 'external' and not self.dataOut.flagNoData:
335 elif optype == 'external' and not self.dataOut.flagNoData:
331 self.publish(self.dataOut, opId)
336 self.publish(self.dataOut, opId)
332
333 if not self.dataOut.flagNoData or self.dataOut.error:
334 self.publish(self.dataOut, self.id)
335 for op, optype, opId, kwargs in self.operations:
336 if optype == 'self' and self.dataOut.error:
337 op(**kwargs)
338 elif optype == 'other' and self.dataOut.error:
339 self.dataOut = op.run(self.dataOut, **kwargs)
340 elif optype == 'external' and self.dataOut.error:
341 self.publish(self.dataOut, opId)
342
337
343 if self.dataIn.error:
338 self.publish(self.dataOut, self.id)
339 for op, optype, opId, kwargs in self.operations:
340 if optype == 'external' and self.dataOut.error:
341 self.publish(self.dataOut, opId)
342
343 if self.dataOut.error:
344 break
344 break
345
345
346 time.sleep(1)
346 time.sleep(0.5)
347
347
348 def runOp(self):
348 def runOp(self):
349 '''
349 '''
@@ -355,18 +355,15 def MPDecorator(BaseClass):
355
355
356 dataOut = self.listen()
356 dataOut = self.listen()
357
357
358 BaseClass.run(self, dataOut, **self.kwargs)
358 if not dataOut.error:
359
359 BaseClass.run(self, dataOut, **self.kwargs)
360 if dataOut.error:
360 else:
361 break
361 break
362
363 time.sleep(1)
364
362
365 def run(self):
363 def run(self):
366 if self.typeProc is "ProcUnit":
364 if self.typeProc is "ProcUnit":
367
365
368 if self.inputId is not None:
366 if self.inputId is not None:
369
370 self.subscribe()
367 self.subscribe()
371
368
372 self.set_publisher()
369 self.set_publisher()
@@ -386,32 +383,10 def MPDecorator(BaseClass):
386
383
387 self.close()
384 self.close()
388
385
389 def event_monitor(self, monitor):
390
391 events = {}
392
393 for name in dir(zmq):
394 if name.startswith('EVENT_'):
395 value = getattr(zmq, name)
396 events[value] = name
397
398 while monitor.poll():
399 evt = recv_monitor_message(monitor)
400 if evt['event'] == 32:
401 self.connections += 1
402 if evt['event'] == 512:
403 pass
404
405 evt.update({'description': events[evt['event']]})
406
407 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
408 break
409 monitor.close()
410 print('event monitor thread done!')
411
412 def close(self):
386 def close(self):
413
387
414 BaseClass.close(self)
388 BaseClass.close(self)
389 self.err_queue.put('#_end_#')
415
390
416 if self.sender:
391 if self.sender:
417 self.sender.close()
392 self.sender.close()
@@ -94,7 +94,6 class ParametersProc(ProcessingUnit):
94 self.dataOut.heightList = self.dataIn.getHeiRange()
94 self.dataOut.heightList = self.dataIn.getHeiRange()
95 self.dataOut.frequency = self.dataIn.frequency
95 self.dataOut.frequency = self.dataIn.frequency
96 # self.dataOut.noise = self.dataIn.noise
96 # self.dataOut.noise = self.dataIn.noise
97 self.dataOut.error = self.dataIn.error
98
97
99 def run(self):
98 def run(self):
100
99
General Comments 0
You need to be logged in to leave comments. Login now