From c3044f867269fb99ee1efca80e1a30c2323ee911 2019-08-26 20:46:58 From: jespinoza Date: 2019-08-26 20:46:58 Subject: [PATCH] Errors handling and gracefully terminate main process --- diff --git a/schainpy/admin.py b/schainpy/admin.py index 0bb49fa..5eb45dc 100644 --- a/schainpy/admin.py +++ b/schainpy/admin.py @@ -68,7 +68,6 @@ class Alarm(Process): @staticmethod def send_email(**kwargs): notifier = SchainNotify() - print(kwargs) notifier.notify(**kwargs) @staticmethod @@ -290,7 +289,7 @@ class SchainNotify: msg = MIMEMultipart() msg['Subject'] = subject - msg['From'] = "(Python SChain API): " + email_from + msg['From'] = "SChain API (v{}) <{}>".format(schainpy.__version__, email_from) msg['Reply-to'] = email_from msg['To'] = email_to @@ -330,7 +329,7 @@ class SchainNotify: smtp.login(self.__emailFromAddress, self.__emailPass) # Send the email - try: + try: smtp.sendmail(msg['From'], msg['To'], msg.as_string()) except: log.error('Could not send the email to {}'.format(msg['To']), 'System') diff --git a/schainpy/controller.py b/schainpy/controller.py index eb01383..0a1d0d9 100644 --- a/schainpy/controller.py +++ b/schainpy/controller.py @@ -11,7 +11,7 @@ import traceback import math import time import zmq -from multiprocessing import Process, cpu_count +from multiprocessing import Process, Queue, cpu_count from threading import Thread from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring from xml.dom import minidom @@ -361,13 +361,14 @@ class OperationConf(): return kwargs - def setup(self, id, name, priority, type, project_id): + def setup(self, id, name, priority, type, project_id, err_queue): self.id = str(id) self.project_id = project_id self.name = name self.type = type self.priority = priority + self.err_queue = err_queue self.parmConfObjList = [] def removeParameters(self): @@ -459,8 +460,9 @@ class OperationConf(): opObj = className() elif self.type == 'external': kwargs = self.getKwargs() - opObj = className(self.id, self.project_id, **kwargs) + opObj = className(self.id, self.id, self.project_id, self.err_queue, 'Operation', **kwargs) opObj.start() + self.opObj = opObj return opObj @@ -548,7 +550,7 @@ class ProcUnitConf(): return self.procUnitObj - def setup(self, project_id, id, name, datatype, inputId): + def setup(self, project_id, id, name, datatype, inputId, err_queue): ''' id sera el topico a publicar inputId sera el topico a subscribirse @@ -573,7 +575,8 @@ class ProcUnitConf(): self.project_id = project_id self.name = name self.datatype = datatype - self.inputId = inputId + self.inputId = inputId + self.err_queue = err_queue self.opConfObjList = [] self.addOperation(name='run', optype='self') @@ -607,7 +610,7 @@ class ProcUnitConf(): id = self.__getNewId() priority = self.__getPriority() # Sin mucho sentido, pero puede usarse opConfObj = OperationConf() - opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id) + opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id, err_queue=self.err_queue) self.opConfObjList.append(opConfObj) return opConfObj @@ -675,7 +678,7 @@ class ProcUnitConf(): className = eval(self.name) kwargs = self.getKwargs() - procUnitObj = className(self.id, self.inputId, self.project_id, **kwargs) # necesitan saber su id y su entrada por fines de ipc + procUnitObj = className(self.id, self.inputId, self.project_id, self.err_queue, 'ProcUnit', **kwargs) log.success('creating process...', self.name) for opConfObj in self.opConfObjList: @@ -687,7 +690,7 @@ class ProcUnitConf(): else: opObj = opConfObj.createObject() - log.success('creating operation: {}, type:{}'.format( + log.success('adding operation: {}, type:{}'.format( opConfObj.name, opConfObj.type), self.name) @@ -726,7 +729,7 @@ class ReadUnitConf(ProcUnitConf): return self.ELEMENTNAME - def setup(self, project_id, id, name, datatype, path='', startDate='', endDate='', + def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='', startTime='', endTime='', server=None, **kwargs): @@ -765,6 +768,7 @@ class ReadUnitConf(ProcUnitConf): self.startTime = startTime self.endTime = endTime self.server = server + self.err_queue = err_queue self.addRunOperation(**kwargs) def update(self, **kwargs): @@ -878,6 +882,7 @@ class Project(Process): self.email = None self.alarm = None self.procUnitConfObjDict = {} + self.err_queue = Queue() def __getNewId(self): @@ -935,6 +940,8 @@ class Project(Process): self.description = description self.email = email self.alarm = alarm + if name: + self.name = '{} ({})'.format(Process.__name__, name) def update(self, **kwargs): @@ -963,7 +970,7 @@ class Project(Process): idReadUnit = str(id) readUnitConfObj = ReadUnitConf() - readUnitConfObj.setup(self.id, idReadUnit, name, datatype, **kwargs) + readUnitConfObj.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs) self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj return readUnitConfObj @@ -980,9 +987,9 @@ class Project(Process): ''' - idProcUnit = self.__getNewId() #Topico para subscripcion + idProcUnit = self.__getNewId() procUnitConfObj = ProcUnitConf() - procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId) #topic_read, topic_write, + procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue) self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj return procUnitConfObj @@ -1119,10 +1126,10 @@ class Project(Process): def __str__(self): - print('Project[%s]: name = %s, description = %s, project_id = %s' % (self.id, + print('Project: name = %s, description = %s, id = %s' % ( self.name, self.description, - self.project_id)) + self.id)) for procUnitConfObj in self.procUnitConfObjDict.values(): print(procUnitConfObj) @@ -1135,58 +1142,84 @@ class Project(Process): for key in keys: self.procUnitConfObjDict[key].createObjects() - def __handleError(self, procUnitConfObj, modes=None, stdout=True): + def monitor(self): - import socket + t = Thread(target=self.__monitor, args=(self.err_queue, self.ctx)) + t.start() + + def __monitor(self, queue, ctx): - if modes is None: - modes = self.alarm + import socket - if not self.alarm: - modes = [] - - err = traceback.format_exception(sys.exc_info()[0], - sys.exc_info()[1], - sys.exc_info()[2]) + procs = 0 + err_msg = '' + + while True: + msg = queue.get() + if '#_start_#' in msg: + procs += 1 + elif '#_end_#' in msg: + procs -=1 + else: + err_msg = msg + + if procs == 0 or 'Traceback' in err_msg: + break + time.sleep(0.1) + + if '|' in err_msg: + name, err = err_msg.split('|') + if 'SchainWarning' in err: + log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), name) + elif 'SchainError' in err: + log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), name) + else: + log.error(err, name) + else: + name, err = self.name, err_msg + + time.sleep(2) - log.error('{}'.format(err[-1]), procUnitConfObj.name) + for conf in self.procUnitConfObjDict.values(): + for confop in conf.opConfObjList: + if confop.type == 'external': + confop.opObj.terminate() + conf.procUnitObj.terminate() + + ctx.term() message = ''.join(err) - if stdout: - sys.stderr.write(message) - - subject = 'SChain v%s: Error running %s\n' % ( - schainpy.__version__, procUnitConfObj.name) - - subtitle = '%s: %s\n' % ( - procUnitConfObj.getElementName(), procUnitConfObj.name) - subtitle += 'Hostname: %s\n' % socket.gethostbyname( - socket.gethostname()) - subtitle += 'Working directory: %s\n' % os.path.abspath('./') - subtitle += 'Configuration file: %s\n' % self.filename - subtitle += 'Time: %s\n' % str(datetime.datetime.now()) - - readUnitConfObj = self.getReadUnitObj() - if readUnitConfObj: - subtitle += '\nInput parameters:\n' - subtitle += '[Data path = %s]\n' % readUnitConfObj.path - subtitle += '[Data type = %s]\n' % readUnitConfObj.datatype - subtitle += '[Start date = %s]\n' % readUnitConfObj.startDate - subtitle += '[End date = %s]\n' % readUnitConfObj.endDate - subtitle += '[Start time = %s]\n' % readUnitConfObj.startTime - subtitle += '[End time = %s]\n' % readUnitConfObj.endTime - - a = Alarm( - modes=modes, - email=self.email, - message=message, - subject=subject, - subtitle=subtitle, - filename=self.filename - ) - - return a + if err_msg: + subject = 'SChain v%s: Error running %s\n' % ( + schainpy.__version__, self.name) + + subtitle = 'Hostname: %s\n' % socket.gethostbyname( + socket.gethostname()) + subtitle += 'Working directory: %s\n' % os.path.abspath('./') + subtitle += 'Configuration file: %s\n' % self.filename + subtitle += 'Time: %s\n' % str(datetime.datetime.now()) + + readUnitConfObj = self.getReadUnitObj() + if readUnitConfObj: + subtitle += '\nInput parameters:\n' + subtitle += '[Data path = %s]\n' % readUnitConfObj.path + subtitle += '[Data type = %s]\n' % readUnitConfObj.datatype + subtitle += '[Start date = %s]\n' % readUnitConfObj.startDate + subtitle += '[End date = %s]\n' % readUnitConfObj.endDate + subtitle += '[Start time = %s]\n' % readUnitConfObj.startTime + subtitle += '[End time = %s]\n' % readUnitConfObj.endTime + + a = Alarm( + modes=self.alarm, + email=self.email, + message=message, + subject=subject, + subtitle=subtitle, + filename=self.filename + ) + + a.start() def isPaused(self): return 0 @@ -1223,7 +1256,7 @@ class Project(Process): self.filename = filename - def setProxyCom(self): + def setProxy(self): if not os.path.exists('/tmp/schain'): os.mkdir('/tmp/schain') @@ -1233,24 +1266,19 @@ class Project(Process): xpub.bind('ipc:///tmp/schain/{}_pub'.format(self.id)) xsub = self.ctx.socket(zmq.XSUB) xsub.bind('ipc:///tmp/schain/{}_sub'.format(self.id)) - + self.monitor() try: zmq.proxy(xpub, xsub) - except: # zmq.ContextTerminated: + except zmq.ContextTerminated: xpub.close() xsub.close() def run(self): log.success('Starting {}: {}'.format(self.name, self.id), tag='') - self.start_time = time.time() - self.createObjects() - # t = Thread(target=wait, args=(self.ctx, )) - # t.start() - self.setProxyCom() - - # Iniciar todos los procesos .start(), monitoreo de procesos. ELiminar lo de abajo - - log.success('{} Done (time: {}s)'.format( + self.start_time = time.time() + self.createObjects() + self.setProxy() + log.success('{} Done (Time: {}s)'.format( self.name, - time.time()-self.start_time)) + time.time()-self.start_time), '') diff --git a/schainpy/model/io/bltrIO_param.py b/schainpy/model/io/bltrIO_param.py index 07cd39c..ab3a29c 100644 --- a/schainpy/model/io/bltrIO_param.py +++ b/schainpy/model/io/bltrIO_param.py @@ -13,6 +13,7 @@ import datetime import numpy +import schainpy.admin from schainpy.model.proc.jroproc_base import ProcessingUnit, MPDecorator from schainpy.model.data.jrodata import Parameters from schainpy.model.io.jroIO_base import JRODataReader, isNumber @@ -394,12 +395,11 @@ class BLTRParamReader(JRODataReader, ProcessingUnit): ''' if self.flagNoMoreFiles: self.dataOut.flagNoData = True - self.dataOut.error = 'No More files to read' - return + raise schainpy.admin.SchainError('No More files to read') if not self.readNextBlock(): self.dataOut.flagNoData = True - self.dataOut.error = 'Time for wait new file reach!!!' + raise schainpy.admin.SchainError('Time for wait new file reach!!!') self.set_output() diff --git a/schainpy/model/io/bltrIO_spectra.py b/schainpy/model/io/bltrIO_spectra.py index 27b0988..b0eccef 100644 --- a/schainpy/model/io/bltrIO_spectra.py +++ b/schainpy/model/io/bltrIO_spectra.py @@ -15,13 +15,9 @@ from scipy import asarray as ar, exp SPEED_OF_LIGHT = 299792458 SPEED_OF_LIGHT = 3e8 -try: - from gevent import sleep -except: - from time import sleep - from .utils import folder_in_range +import schainpy.admin from schainpy.model.data.jrodata import Spectra from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator from schainpy.utils import log @@ -341,7 +337,7 @@ class BLTRSpectraReader (ProcessingUnit): if self.flagNoMoreFiles: self.dataOut.flagNoData = True - self.dataOut.error = 'No more files' + raise schainpy.admin.SchainError('No more files') self.readBlock() diff --git a/schainpy/model/io/jroIO_amisr.py b/schainpy/model/io/jroIO_amisr.py index 9e5c294..69ff987 100644 --- a/schainpy/model/io/jroIO_amisr.py +++ b/schainpy/model/io/jroIO_amisr.py @@ -661,7 +661,6 @@ class AMISRReader(ProcessingUnit): if self.flagNoMoreFiles: self.dataOut.flagNoData = True - print('Process finished') return 0 if self.__hasNotDataInBuffer(): diff --git a/schainpy/model/io/jroIO_base.py b/schainpy/model/io/jroIO_base.py index 32c730f..49f5b71 100644 --- a/schainpy/model/io/jroIO_base.py +++ b/schainpy/model/io/jroIO_base.py @@ -15,11 +15,6 @@ import datetime import traceback import zmq -try: - from gevent import sleep -except: - from time import sleep - from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader from schainpy.model.data.jroheaderIO import get_dtype_index, get_numpy_dtype, get_procflag_dtype, get_dtype_width from schainpy.utils import log @@ -771,7 +766,6 @@ class JRODataReader(JRODataIO): idFile += 1 if not(idFile < len(self.filenameList)): self.flagNoMoreFiles = 1 -# print "[Reading] No more Files" return 0 filename = self.filenameList[idFile] @@ -843,10 +837,14 @@ class JRODataReader(JRODataIO): for nTries in range(tries): if firstTime_flag: - print("\t[Reading] Waiting %0.2f sec for the next file: \"%s\" , try %03d ..." % (self.delay, filename, nTries + 1)) - sleep(self.delay) + log.warning( + "Waiting %0.2f sec for the next file: \"%s\" , try %03d ..." % (self.delay, filename, nTries + 1), + self.name) + time.sleep(self.delay) else: - print("\t[Reading] Searching the next \"%s%04d%03d%03d%s\" file ..." % (self.optchar, self.year, self.doy, self.set, self.ext)) + log.warning( + "Searching the next \"%s%04d%03d%03d%s\" file ..." % (self.optchar, self.year, self.doy, self.set, self.ext), + self.name) fullfilename, filename = checkForRealPath( self.path, self.foldercounter, self.year, self.doy, self.set, self.ext) @@ -860,7 +858,9 @@ class JRODataReader(JRODataIO): firstTime_flag = False - log.warning('Skipping the file {} due to this file doesn\'t exist'.format(filename)) + log.warning( + 'Skipping the file {} due to this file doesn\'t exist'.format(filename), + self.name) self.set += 1 # si no encuentro el file buscado cambio de carpeta y busco en la siguiente carpeta @@ -877,14 +877,13 @@ class JRODataReader(JRODataIO): self.fp.close() self.fp = open(fullfilename, 'rb') self.flagNoMoreFiles = 0 -# print '[Reading] Setting the file: %s' % fullfilename else: + raise schainpy.admin.SchainError('Time for waiting new files reach') self.fileSize = 0 self.filename = None self.flagIsNewFile = 0 self.fp = None self.flagNoMoreFiles = 1 -# print '[Reading] No more files to read' return fileOk_flag @@ -898,8 +897,8 @@ class JRODataReader(JRODataIO): newFile = self.__setNextFileOffline() if not(newFile): - self.dataOut.error = 'No more files to read' - return 0 + raise schainpy.admin.SchainWarning('No more files to read') + if self.verbose: print('[Reading] Setting the file: %s' % self.filename) @@ -942,7 +941,7 @@ class JRODataReader(JRODataIO): return 0 print("[Reading] Waiting %0.2f seconds for the next block, try %03d ..." % (self.delay, nTries + 1)) - sleep(self.delay) + time.sleep(self.delay) return 0 @@ -969,7 +968,7 @@ class JRODataReader(JRODataIO): "Waiting %0.2f seconds for the next block, try %03d ..." % (self.delay, nTries + 1), self.name ) - sleep(self.delay) + time.sleep(self.delay) return 0 @@ -1013,7 +1012,7 @@ class JRODataReader(JRODataIO): # if self.online: # self.__jumpToLastBlock() - + if self.flagIsNewFile: self.lastUTTime = self.basicHeaderObj.utc return 1 @@ -1057,8 +1056,7 @@ class JRODataReader(JRODataIO): # Skip block out of startTime and endTime while True: if not(self.__setNewBlock()): - self.dataOut.error = 'No more files to read' - return 0 + raise schainpy.admin.SchainWarning('No more files to read') if not(self.readBlock()): return 0 @@ -1260,10 +1258,10 @@ class JRODataReader(JRODataIO): pattern_path = multi_path[0] if path_empty: - print("[Reading] No *%s files in %s for %s to %s" % (ext, pattern_path, startDate, endDate)) + raise schainpy.admin.SchainError("[Reading] No *%s files in %s for %s to %s" % (ext, pattern_path, startDate, endDate)) else: if not dateList: - print("[Reading] Date range selected invalid [%s - %s]: No *%s files in %s)" % (startDate, endDate, ext, path)) + raise schainpy.admin.SchainError("[Reading] Date range selected invalid [%s - %s]: No *%s files in %s)" % (startDate, endDate, ext, path)) if include_path: return dateList, pathList @@ -1296,6 +1294,17 @@ class JRODataReader(JRODataIO): oneDDict=None, twoDDict=None, independentParam=None): + + self.online = online + self.realtime = realtime + self.delay = delay + self.getByBlock = getblock + self.nTxs = nTxs + self.startTime = startTime + self.endTime = endTime + self.endDate = endDate + self.startDate = startDate + if server is not None: if 'tcp://' in server: address = server @@ -1326,10 +1335,10 @@ class JRODataReader(JRODataIO): break print('[Reading] Waiting %0.2f sec for an valid file in %s: try %02d ...' % (self.delay, path, nTries + 1)) - sleep(self.delay) + time.sleep(self.delay) if not(fullpath): - self.dataOut.error = 'There isn\'t any valid file in {}'.format(path) + raise schainpy.admin.SchainError('There isn\'t any valid file in {}'.format(path)) return self.year = year @@ -1359,17 +1368,10 @@ class JRODataReader(JRODataIO): basename, ext = os.path.splitext(file_name) last_set = int(basename[-3:]) - self.online = online - self.realtime = realtime - self.delay = delay + ext = ext.lower() self.ext = ext - self.getByBlock = getblock - self.nTxs = nTxs - self.startTime = startTime - self.endTime = endTime - self.endDate = endDate - self.startDate = startDate + # Added----------------- self.selBlocksize = blocksize self.selBlocktime = blocktime @@ -1391,8 +1393,6 @@ class JRODataReader(JRODataIO): self.filenameList = [] return - # self.getBasicHeader() - if last_set != None: self.dataOut.last_block = last_set * \ self.processingHeaderObj.dataBlocksPerFile + self.basicHeaderObj.dataBlock diff --git a/schainpy/model/io/jroIO_digitalRF.py b/schainpy/model/io/jroIO_digitalRF.py index 6bfcd7e..4f79c92 100644 --- a/schainpy/model/io/jroIO_digitalRF.py +++ b/schainpy/model/io/jroIO_digitalRF.py @@ -12,20 +12,16 @@ Created on Jul 3, 2014 # METADATA import os +import time import datetime import numpy import timeit from fractions import Fraction -try: - from gevent import sleep -except: - from time import sleep - +import schainpy.admin from schainpy.model.data.jroheaderIO import RadarControllerHeader, SystemHeader from schainpy.model.data.jrodata import Voltage from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator -from time import time import pickle try: @@ -461,9 +457,9 @@ class DigitalRFReader(ProcessingUnit): return False def timeit(self, toExecute): - t0 = time() + t0 = time.time() toExecute() - self.executionTime = time() - t0 + self.executionTime = time.time() - t0 if self.oldAverage is None: self.oldAverage = self.executionTime self.oldAverage = (self.executionTime + self.count * @@ -569,24 +565,24 @@ class DigitalRFReader(ProcessingUnit): if self.__readNextBlock(): break if self.__thisUnixSample > self.__endUTCSecond * self.__sample_rate: - self.dataOut.error = 'Error' + raise schainpy.admin.SchainError('Error') return if self.__flagDiscontinuousBlock: - self.dataOut.error = 'discontinuous block found' + raise schainpy.admin.SchainError('discontinuous block found') return if not self.__online: - self.dataOut.error = 'Online?' + raise schainpy.admin.SchainError('Online?') return err_counter += 1 if err_counter > nTries: - self.dataOut.error = 'Max retrys reach' + raise schainpy.admin.SchainError('Max retrys reach') return print('[Reading] waiting %d seconds to read a new block' % seconds) - sleep(seconds) + time.sleep(seconds) self.dataOut.data = self.__data_buffer[:, self.__bufferIndex:self.__bufferIndex + self.__nSamples] diff --git a/schainpy/model/io/jroIO_hf.py b/schainpy/model/io/jroIO_hf.py index e262a44..2a89ab1 100644 --- a/schainpy/model/io/jroIO_hf.py +++ b/schainpy/model/io/jroIO_hf.py @@ -833,7 +833,6 @@ class HFReader(ProcessingUnit): def getData(self): if self.flagNoMoreFiles: self.dataOut.flagNoData = True - print('Process finished') return 0 if self.__hasNotDataInBuffer(): diff --git a/schainpy/model/io/jroIO_kamisr.py b/schainpy/model/io/jroIO_kamisr.py index bd27fb6..61a662f 100644 --- a/schainpy/model/io/jroIO_kamisr.py +++ b/schainpy/model/io/jroIO_kamisr.py @@ -585,7 +585,6 @@ class AMISRReader(ProcessingUnit): if self.flagNoMoreFiles: self.dataOut.flagNoData = True - print('Process finished') return 0 if self.__hasNotDataInBuffer(): diff --git a/schainpy/model/io/jroIO_madrigal.py b/schainpy/model/io/jroIO_madrigal.py index 8e5bac2..3a3cd25 100644 --- a/schainpy/model/io/jroIO_madrigal.py +++ b/schainpy/model/io/jroIO_madrigal.py @@ -13,6 +13,8 @@ import datetime import numpy import h5py + +import schainpy.admin from schainpy.model.io.jroIO_base import LOCALTIME, JRODataReader, JRODataWriter from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator from schainpy.model.data.jrodata import Parameters @@ -390,7 +392,7 @@ class MADReader(JRODataReader, ProcessingUnit): ''' if self.flagNoMoreFiles: self.dataOut.flagNoData = True - self.dataOut.error = 'No file left to process' + raise schainpy.admin.SchainError('No file left to process') return 0 if not self.readNextBlock(): diff --git a/schainpy/model/io/jroIO_param.py b/schainpy/model/io/jroIO_param.py index efbd32b..631913b 100644 --- a/schainpy/model/io/jroIO_param.py +++ b/schainpy/model/io/jroIO_param.py @@ -5,6 +5,7 @@ import h5py import re import datetime +import schainpy.admin from schainpy.model.data.jrodata import * from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator from schainpy.model.io.jroIO_base import * @@ -233,7 +234,7 @@ class ParamReader(JRODataReader,ProcessingUnit): idFile = self.fileIndex if not(idFile < len(self.filenameList)): - self.dataOut.error = "No more Files" + raise schainpy.admin.SchainError("No more Files") return 0 filename = self.filenameList[idFile] @@ -1165,8 +1166,7 @@ class ParameterReader(JRODataReader,ProcessingUnit): idFile = self.fileIndex if not(idFile < len(self.filenameList)): - self.dataOut.error = 'No more files' - return 0 + raise schainpy.admin.SchainError('No more files') filename = self.filenameList[idFile] self.fp = h5py.File(filename, 'r') diff --git a/schainpy/model/io/jroIO_spectra.py b/schainpy/model/io/jroIO_spectra.py index 884c85a..60100a6 100644 --- a/schainpy/model/io/jroIO_spectra.py +++ b/schainpy/model/io/jroIO_spectra.py @@ -92,7 +92,6 @@ class SpectraReader(JRODataReader, ProcessingUnit): #Eliminar de la base la herencia ProcessingUnit.__init__(self)#, **kwargs) -# self.isConfig = False self.pts2read_SelfSpectra = 0 @@ -160,7 +159,6 @@ class SpectraReader(JRODataReader, ProcessingUnit): self.__isFirstTimeOnline = 1 -# self.ippSeconds = 0 self.flagDiscontinuousBlock = 0 @@ -226,7 +224,7 @@ class SpectraReader(JRODataReader, ProcessingUnit): self.pts2read_DCchannels = int(self.systemHeaderObj.nChannels * self.processingHeaderObj.nHeights) self.blocksize += self.pts2read_DCchannels -# self.blocksize = self.pts2read_SelfSpectra + self.pts2read_CrossSpectra + self.pts2read_DCchannels + # self.blocksize = self.pts2read_SelfSpectra + self.pts2read_CrossSpectra + self.pts2read_DCchannels def readBlock(self): @@ -356,7 +354,6 @@ class SpectraReader(JRODataReader, ProcessingUnit): if self.flagNoMoreFiles: self.dataOut.flagNoData = True - print('Process finished') return 0 self.flagDiscontinuousBlock = 0 diff --git a/schainpy/model/proc/jroproc_base.py b/schainpy/model/proc/jroproc_base.py index f82e1e3..788379e 100644 --- a/schainpy/model/proc/jroproc_base.py +++ b/schainpy/model/proc/jroproc_base.py @@ -17,10 +17,10 @@ import inspect import zmq import time import pickle +import traceback from queue import Queue from threading import Thread from multiprocessing import Process -from zmq.utils.monitor import recv_monitor_message from schainpy.utils import log @@ -219,22 +219,18 @@ def MPDecorator(BaseClass): self.receiver = None self.i = 0 self.name = BaseClass.__name__ + if 'plot' in self.name.lower() and not self.name.endswith('_'): self.name = '{}{}'.format(self.CODE.upper(), 'Plot') - self.start_time = time.time() - - if len(self.args) is 3: - self.typeProc = "ProcUnit" - self.id = args[0] - self.inputId = args[1] - self.project_id = args[2] - elif len(self.args) is 2: - self.id = args[0] - self.inputId = args[0] - self.project_id = args[1] - self.typeProc = "Operation" - + + self.start_time = time.time() + self.id = args[0] + self.inputId = args[1] + self.project_id = args[2] + self.err_queue = args[3] + self.typeProc = args[4] self.queue = InputQueue(self.project_id, self.inputId) + self.err_queue.put('#_start_#') def subscribe(self): ''' @@ -271,7 +267,7 @@ def MPDecorator(BaseClass): if self.inputId is None: self.i += 1 - if self.i % 100 == 0: + if self.i % 80 == 0: self.i = 0 time.sleep(0.01) @@ -283,7 +279,15 @@ def MPDecorator(BaseClass): ''' while True: - BaseClass.run(self, **self.kwargs) + try: + BaseClass.run(self, **self.kwargs) + except: + err = traceback.format_exc() + if 'No more files' in err: + log.warning('No more files to read', self.name) + else: + self.err_queue.put('{}|{}'.format(self.name, err)) + self.dataOut.error = True for op, optype, opId, kwargs in self.operations: if optype == 'self' and not self.dataOut.flagNoData: @@ -298,12 +302,10 @@ def MPDecorator(BaseClass): self.publish(self.dataOut, self.id) - if self.dataOut.error: - log.error(self.dataOut.error, self.name) - # self.sender.send_multipart([str(self.project_id).encode(), 'end'.encode()]) + if self.dataOut.error: break - time.sleep(1) + time.sleep(0.5) def runProc(self): ''' @@ -315,10 +317,13 @@ def MPDecorator(BaseClass): if self.dataIn.flagNoData and self.dataIn.error is None: continue - - BaseClass.run(self, **self.kwargs) - - if self.dataIn.error: + elif not self.dataIn.error: + try: + BaseClass.run(self, **self.kwargs) + except: + self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc())) + self.dataOut.error = True + elif self.dataIn.error: self.dataOut.error = self.dataIn.error self.dataOut.flagNoData = True @@ -329,21 +334,16 @@ def MPDecorator(BaseClass): self.dataOut = op.run(self.dataOut, **kwargs) elif optype == 'external' and not self.dataOut.flagNoData: self.publish(self.dataOut, opId) - - if not self.dataOut.flagNoData or self.dataOut.error: - self.publish(self.dataOut, self.id) - for op, optype, opId, kwargs in self.operations: - if optype == 'self' and self.dataOut.error: - op(**kwargs) - elif optype == 'other' and self.dataOut.error: - self.dataOut = op.run(self.dataOut, **kwargs) - elif optype == 'external' and self.dataOut.error: - self.publish(self.dataOut, opId) - if self.dataIn.error: + self.publish(self.dataOut, self.id) + for op, optype, opId, kwargs in self.operations: + if optype == 'external' and self.dataOut.error: + self.publish(self.dataOut, opId) + + if self.dataOut.error: break - time.sleep(1) + time.sleep(0.5) def runOp(self): ''' @@ -355,18 +355,15 @@ def MPDecorator(BaseClass): dataOut = self.listen() - BaseClass.run(self, dataOut, **self.kwargs) - - if dataOut.error: - break - - time.sleep(1) + if not dataOut.error: + BaseClass.run(self, dataOut, **self.kwargs) + else: + break def run(self): if self.typeProc is "ProcUnit": if self.inputId is not None: - self.subscribe() self.set_publisher() @@ -386,32 +383,10 @@ def MPDecorator(BaseClass): self.close() - def event_monitor(self, monitor): - - events = {} - - for name in dir(zmq): - if name.startswith('EVENT_'): - value = getattr(zmq, name) - events[value] = name - - while monitor.poll(): - evt = recv_monitor_message(monitor) - if evt['event'] == 32: - self.connections += 1 - if evt['event'] == 512: - pass - - evt.update({'description': events[evt['event']]}) - - if evt['event'] == zmq.EVENT_MONITOR_STOPPED: - break - monitor.close() - print('event monitor thread done!') - def close(self): BaseClass.close(self) + self.err_queue.put('#_end_#') if self.sender: self.sender.close() diff --git a/schainpy/model/proc/jroproc_parameters.py b/schainpy/model/proc/jroproc_parameters.py index 37f4970..3a209b0 100755 --- a/schainpy/model/proc/jroproc_parameters.py +++ b/schainpy/model/proc/jroproc_parameters.py @@ -94,7 +94,6 @@ class ParametersProc(ProcessingUnit): self.dataOut.heightList = self.dataIn.getHeiRange() self.dataOut.frequency = self.dataIn.frequency # self.dataOut.noise = self.dataIn.noise - self.dataOut.error = self.dataIn.error def run(self):