##// END OF EJS Templates
Errors handling and gracefully terminate main process
jespinoza -
r1241:c3044f867269
parent child
Show More
@@ -68,7 +68,6 class Alarm(Process):
68 68 @staticmethod
69 69 def send_email(**kwargs):
70 70 notifier = SchainNotify()
71 print(kwargs)
72 71 notifier.notify(**kwargs)
73 72
74 73 @staticmethod
@@ -290,7 +289,7 class SchainNotify:
290 289
291 290 msg = MIMEMultipart()
292 291 msg['Subject'] = subject
293 msg['From'] = "(Python SChain API): " + email_from
292 msg['From'] = "SChain API (v{}) <{}>".format(schainpy.__version__, email_from)
294 293 msg['Reply-to'] = email_from
295 294 msg['To'] = email_to
296 295
@@ -11,7 +11,7 import traceback
11 11 import math
12 12 import time
13 13 import zmq
14 from multiprocessing import Process, cpu_count
14 from multiprocessing import Process, Queue, cpu_count
15 15 from threading import Thread
16 16 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
17 17 from xml.dom import minidom
@@ -361,13 +361,14 class OperationConf():
361 361
362 362 return kwargs
363 363
364 def setup(self, id, name, priority, type, project_id):
364 def setup(self, id, name, priority, type, project_id, err_queue):
365 365
366 366 self.id = str(id)
367 367 self.project_id = project_id
368 368 self.name = name
369 369 self.type = type
370 370 self.priority = priority
371 self.err_queue = err_queue
371 372 self.parmConfObjList = []
372 373
373 374 def removeParameters(self):
@@ -459,8 +460,9 class OperationConf():
459 460 opObj = className()
460 461 elif self.type == 'external':
461 462 kwargs = self.getKwargs()
462 opObj = className(self.id, self.project_id, **kwargs)
463 opObj = className(self.id, self.id, self.project_id, self.err_queue, 'Operation', **kwargs)
463 464 opObj.start()
465 self.opObj = opObj
464 466
465 467 return opObj
466 468
@@ -548,7 +550,7 class ProcUnitConf():
548 550
549 551 return self.procUnitObj
550 552
551 def setup(self, project_id, id, name, datatype, inputId):
553 def setup(self, project_id, id, name, datatype, inputId, err_queue):
552 554 '''
553 555 id sera el topico a publicar
554 556 inputId sera el topico a subscribirse
@@ -574,6 +576,7 class ProcUnitConf():
574 576 self.name = name
575 577 self.datatype = datatype
576 578 self.inputId = inputId
579 self.err_queue = err_queue
577 580 self.opConfObjList = []
578 581
579 582 self.addOperation(name='run', optype='self')
@@ -607,7 +610,7 class ProcUnitConf():
607 610 id = self.__getNewId()
608 611 priority = self.__getPriority() # Sin mucho sentido, pero puede usarse
609 612 opConfObj = OperationConf()
610 opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id)
613 opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id, err_queue=self.err_queue)
611 614 self.opConfObjList.append(opConfObj)
612 615
613 616 return opConfObj
@@ -675,7 +678,7 class ProcUnitConf():
675 678
676 679 className = eval(self.name)
677 680 kwargs = self.getKwargs()
678 procUnitObj = className(self.id, self.inputId, self.project_id, **kwargs) # necesitan saber su id y su entrada por fines de ipc
681 procUnitObj = className(self.id, self.inputId, self.project_id, self.err_queue, 'ProcUnit', **kwargs)
679 682 log.success('creating process...', self.name)
680 683
681 684 for opConfObj in self.opConfObjList:
@@ -687,7 +690,7 class ProcUnitConf():
687 690 else:
688 691 opObj = opConfObj.createObject()
689 692
690 log.success('creating operation: {}, type:{}'.format(
693 log.success('adding operation: {}, type:{}'.format(
691 694 opConfObj.name,
692 695 opConfObj.type), self.name)
693 696
@@ -726,7 +729,7 class ReadUnitConf(ProcUnitConf):
726 729
727 730 return self.ELEMENTNAME
728 731
729 def setup(self, project_id, id, name, datatype, path='', startDate='', endDate='',
732 def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='',
730 733 startTime='', endTime='', server=None, **kwargs):
731 734
732 735
@@ -765,6 +768,7 class ReadUnitConf(ProcUnitConf):
765 768 self.startTime = startTime
766 769 self.endTime = endTime
767 770 self.server = server
771 self.err_queue = err_queue
768 772 self.addRunOperation(**kwargs)
769 773
770 774 def update(self, **kwargs):
@@ -878,6 +882,7 class Project(Process):
878 882 self.email = None
879 883 self.alarm = None
880 884 self.procUnitConfObjDict = {}
885 self.err_queue = Queue()
881 886
882 887 def __getNewId(self):
883 888
@@ -935,6 +940,8 class Project(Process):
935 940 self.description = description
936 941 self.email = email
937 942 self.alarm = alarm
943 if name:
944 self.name = '{} ({})'.format(Process.__name__, name)
938 945
939 946 def update(self, **kwargs):
940 947
@@ -963,7 +970,7 class Project(Process):
963 970 idReadUnit = str(id)
964 971
965 972 readUnitConfObj = ReadUnitConf()
966 readUnitConfObj.setup(self.id, idReadUnit, name, datatype, **kwargs)
973 readUnitConfObj.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs)
967 974 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
968 975
969 976 return readUnitConfObj
@@ -980,9 +987,9 class Project(Process):
980 987
981 988 '''
982 989
983 idProcUnit = self.__getNewId() #Topico para subscripcion
990 idProcUnit = self.__getNewId()
984 991 procUnitConfObj = ProcUnitConf()
985 procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId) #topic_read, topic_write,
992 procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue)
986 993 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
987 994
988 995 return procUnitConfObj
@@ -1119,10 +1126,10 class Project(Process):
1119 1126
1120 1127 def __str__(self):
1121 1128
1122 print('Project[%s]: name = %s, description = %s, project_id = %s' % (self.id,
1129 print('Project: name = %s, description = %s, id = %s' % (
1123 1130 self.name,
1124 1131 self.description,
1125 self.project_id))
1132 self.id))
1126 1133
1127 1134 for procUnitConfObj in self.procUnitConfObjDict.values():
1128 1135 print(procUnitConfObj)
@@ -1135,33 +1142,59 class Project(Process):
1135 1142 for key in keys:
1136 1143 self.procUnitConfObjDict[key].createObjects()
1137 1144
1138 def __handleError(self, procUnitConfObj, modes=None, stdout=True):
1145 def monitor(self):
1146
1147 t = Thread(target=self.__monitor, args=(self.err_queue, self.ctx))
1148 t.start()
1149
1150 def __monitor(self, queue, ctx):
1139 1151
1140 1152 import socket
1141 1153
1142 if modes is None:
1143 modes = self.alarm
1154 procs = 0
1155 err_msg = ''
1156
1157 while True:
1158 msg = queue.get()
1159 if '#_start_#' in msg:
1160 procs += 1
1161 elif '#_end_#' in msg:
1162 procs -=1
1163 else:
1164 err_msg = msg
1165
1166 if procs == 0 or 'Traceback' in err_msg:
1167 break
1168 time.sleep(0.1)
1169
1170 if '|' in err_msg:
1171 name, err = err_msg.split('|')
1172 if 'SchainWarning' in err:
1173 log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), name)
1174 elif 'SchainError' in err:
1175 log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), name)
1176 else:
1177 log.error(err, name)
1178 else:
1179 name, err = self.name, err_msg
1144 1180
1145 if not self.alarm:
1146 modes = []
1181 time.sleep(2)
1147 1182
1148 err = traceback.format_exception(sys.exc_info()[0],
1149 sys.exc_info()[1],
1150 sys.exc_info()[2])
1183 for conf in self.procUnitConfObjDict.values():
1184 for confop in conf.opConfObjList:
1185 if confop.type == 'external':
1186 confop.opObj.terminate()
1187 conf.procUnitObj.terminate()
1151 1188
1152 log.error('{}'.format(err[-1]), procUnitConfObj.name)
1189 ctx.term()
1153 1190
1154 1191 message = ''.join(err)
1155 1192
1156 if stdout:
1157 sys.stderr.write(message)
1158
1193 if err_msg:
1159 1194 subject = 'SChain v%s: Error running %s\n' % (
1160 schainpy.__version__, procUnitConfObj.name)
1195 schainpy.__version__, self.name)
1161 1196
1162 subtitle = '%s: %s\n' % (
1163 procUnitConfObj.getElementName(), procUnitConfObj.name)
1164 subtitle += 'Hostname: %s\n' % socket.gethostbyname(
1197 subtitle = 'Hostname: %s\n' % socket.gethostbyname(
1165 1198 socket.gethostname())
1166 1199 subtitle += 'Working directory: %s\n' % os.path.abspath('./')
1167 1200 subtitle += 'Configuration file: %s\n' % self.filename
@@ -1178,7 +1211,7 class Project(Process):
1178 1211 subtitle += '[End time = %s]\n' % readUnitConfObj.endTime
1179 1212
1180 1213 a = Alarm(
1181 modes=modes,
1214 modes=self.alarm,
1182 1215 email=self.email,
1183 1216 message=message,
1184 1217 subject=subject,
@@ -1186,7 +1219,7 class Project(Process):
1186 1219 filename=self.filename
1187 1220 )
1188 1221
1189 return a
1222 a.start()
1190 1223
1191 1224 def isPaused(self):
1192 1225 return 0
@@ -1223,7 +1256,7 class Project(Process):
1223 1256
1224 1257 self.filename = filename
1225 1258
1226 def setProxyCom(self):
1259 def setProxy(self):
1227 1260
1228 1261 if not os.path.exists('/tmp/schain'):
1229 1262 os.mkdir('/tmp/schain')
@@ -1233,10 +1266,10 class Project(Process):
1233 1266 xpub.bind('ipc:///tmp/schain/{}_pub'.format(self.id))
1234 1267 xsub = self.ctx.socket(zmq.XSUB)
1235 1268 xsub.bind('ipc:///tmp/schain/{}_sub'.format(self.id))
1236
1269 self.monitor()
1237 1270 try:
1238 1271 zmq.proxy(xpub, xsub)
1239 except: # zmq.ContextTerminated:
1272 except zmq.ContextTerminated:
1240 1273 xpub.close()
1241 1274 xsub.close()
1242 1275
@@ -1245,12 +1278,7 class Project(Process):
1245 1278 log.success('Starting {}: {}'.format(self.name, self.id), tag='')
1246 1279 self.start_time = time.time()
1247 1280 self.createObjects()
1248 # t = Thread(target=wait, args=(self.ctx, ))
1249 # t.start()
1250 self.setProxyCom()
1251
1252 # Iniciar todos los procesos .start(), monitoreo de procesos. ELiminar lo de abajo
1253
1254 log.success('{} Done (time: {}s)'.format(
1281 self.setProxy()
1282 log.success('{} Done (Time: {}s)'.format(
1255 1283 self.name,
1256 time.time()-self.start_time))
1284 time.time()-self.start_time), '')
@@ -13,6 +13,7 import datetime
13 13
14 14 import numpy
15 15
16 import schainpy.admin
16 17 from schainpy.model.proc.jroproc_base import ProcessingUnit, MPDecorator
17 18 from schainpy.model.data.jrodata import Parameters
18 19 from schainpy.model.io.jroIO_base import JRODataReader, isNumber
@@ -394,12 +395,11 class BLTRParamReader(JRODataReader, ProcessingUnit):
394 395 '''
395 396 if self.flagNoMoreFiles:
396 397 self.dataOut.flagNoData = True
397 self.dataOut.error = 'No More files to read'
398 return
398 raise schainpy.admin.SchainError('No More files to read')
399 399
400 400 if not self.readNextBlock():
401 401 self.dataOut.flagNoData = True
402 self.dataOut.error = 'Time for wait new file reach!!!'
402 raise schainpy.admin.SchainError('Time for wait new file reach!!!')
403 403
404 404 self.set_output()
405 405
@@ -15,13 +15,9 from scipy import asarray as ar, exp
15 15 SPEED_OF_LIGHT = 299792458
16 16 SPEED_OF_LIGHT = 3e8
17 17
18 try:
19 from gevent import sleep
20 except:
21 from time import sleep
22
23 18 from .utils import folder_in_range
24 19
20 import schainpy.admin
25 21 from schainpy.model.data.jrodata import Spectra
26 22 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
27 23 from schainpy.utils import log
@@ -341,7 +337,7 class BLTRSpectraReader (ProcessingUnit):
341 337
342 338 if self.flagNoMoreFiles:
343 339 self.dataOut.flagNoData = True
344 self.dataOut.error = 'No more files'
340 raise schainpy.admin.SchainError('No more files')
345 341
346 342 self.readBlock()
347 343
@@ -661,7 +661,6 class AMISRReader(ProcessingUnit):
661 661
662 662 if self.flagNoMoreFiles:
663 663 self.dataOut.flagNoData = True
664 print('Process finished')
665 664 return 0
666 665
667 666 if self.__hasNotDataInBuffer():
@@ -15,11 +15,6 import datetime
15 15 import traceback
16 16 import zmq
17 17
18 try:
19 from gevent import sleep
20 except:
21 from time import sleep
22
23 18 from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader
24 19 from schainpy.model.data.jroheaderIO import get_dtype_index, get_numpy_dtype, get_procflag_dtype, get_dtype_width
25 20 from schainpy.utils import log
@@ -771,7 +766,6 class JRODataReader(JRODataIO):
771 766 idFile += 1
772 767 if not(idFile < len(self.filenameList)):
773 768 self.flagNoMoreFiles = 1
774 # print "[Reading] No more Files"
775 769 return 0
776 770
777 771 filename = self.filenameList[idFile]
@@ -843,10 +837,14 class JRODataReader(JRODataIO):
843 837
844 838 for nTries in range(tries):
845 839 if firstTime_flag:
846 print("\t[Reading] Waiting %0.2f sec for the next file: \"%s\" , try %03d ..." % (self.delay, filename, nTries + 1))
847 sleep(self.delay)
840 log.warning(
841 "Waiting %0.2f sec for the next file: \"%s\" , try %03d ..." % (self.delay, filename, nTries + 1),
842 self.name)
843 time.sleep(self.delay)
848 844 else:
849 print("\t[Reading] Searching the next \"%s%04d%03d%03d%s\" file ..." % (self.optchar, self.year, self.doy, self.set, self.ext))
845 log.warning(
846 "Searching the next \"%s%04d%03d%03d%s\" file ..." % (self.optchar, self.year, self.doy, self.set, self.ext),
847 self.name)
850 848
851 849 fullfilename, filename = checkForRealPath(
852 850 self.path, self.foldercounter, self.year, self.doy, self.set, self.ext)
@@ -860,7 +858,9 class JRODataReader(JRODataIO):
860 858
861 859 firstTime_flag = False
862 860
863 log.warning('Skipping the file {} due to this file doesn\'t exist'.format(filename))
861 log.warning(
862 'Skipping the file {} due to this file doesn\'t exist'.format(filename),
863 self.name)
864 864 self.set += 1
865 865
866 866 # si no encuentro el file buscado cambio de carpeta y busco en la siguiente carpeta
@@ -877,14 +877,13 class JRODataReader(JRODataIO):
877 877 self.fp.close()
878 878 self.fp = open(fullfilename, 'rb')
879 879 self.flagNoMoreFiles = 0
880 # print '[Reading] Setting the file: %s' % fullfilename
881 880 else:
881 raise schainpy.admin.SchainError('Time for waiting new files reach')
882 882 self.fileSize = 0
883 883 self.filename = None
884 884 self.flagIsNewFile = 0
885 885 self.fp = None
886 886 self.flagNoMoreFiles = 1
887 # print '[Reading] No more files to read'
888 887
889 888 return fileOk_flag
890 889
@@ -898,8 +897,8 class JRODataReader(JRODataIO):
898 897 newFile = self.__setNextFileOffline()
899 898
900 899 if not(newFile):
901 self.dataOut.error = 'No more files to read'
902 return 0
900 raise schainpy.admin.SchainWarning('No more files to read')
901
903 902
904 903 if self.verbose:
905 904 print('[Reading] Setting the file: %s' % self.filename)
@@ -942,7 +941,7 class JRODataReader(JRODataIO):
942 941 return 0
943 942
944 943 print("[Reading] Waiting %0.2f seconds for the next block, try %03d ..." % (self.delay, nTries + 1))
945 sleep(self.delay)
944 time.sleep(self.delay)
946 945
947 946 return 0
948 947
@@ -969,7 +968,7 class JRODataReader(JRODataIO):
969 968 "Waiting %0.2f seconds for the next block, try %03d ..." % (self.delay, nTries + 1),
970 969 self.name
971 970 )
972 sleep(self.delay)
971 time.sleep(self.delay)
973 972
974 973 return 0
975 974
@@ -1057,8 +1056,7 class JRODataReader(JRODataIO):
1057 1056 # Skip block out of startTime and endTime
1058 1057 while True:
1059 1058 if not(self.__setNewBlock()):
1060 self.dataOut.error = 'No more files to read'
1061 return 0
1059 raise schainpy.admin.SchainWarning('No more files to read')
1062 1060
1063 1061 if not(self.readBlock()):
1064 1062 return 0
@@ -1260,10 +1258,10 class JRODataReader(JRODataIO):
1260 1258 pattern_path = multi_path[0]
1261 1259
1262 1260 if path_empty:
1263 print("[Reading] No *%s files in %s for %s to %s" % (ext, pattern_path, startDate, endDate))
1261 raise schainpy.admin.SchainError("[Reading] No *%s files in %s for %s to %s" % (ext, pattern_path, startDate, endDate))
1264 1262 else:
1265 1263 if not dateList:
1266 print("[Reading] Date range selected invalid [%s - %s]: No *%s files in %s)" % (startDate, endDate, ext, path))
1264 raise schainpy.admin.SchainError("[Reading] Date range selected invalid [%s - %s]: No *%s files in %s)" % (startDate, endDate, ext, path))
1267 1265
1268 1266 if include_path:
1269 1267 return dateList, pathList
@@ -1296,6 +1294,17 class JRODataReader(JRODataIO):
1296 1294 oneDDict=None,
1297 1295 twoDDict=None,
1298 1296 independentParam=None):
1297
1298 self.online = online
1299 self.realtime = realtime
1300 self.delay = delay
1301 self.getByBlock = getblock
1302 self.nTxs = nTxs
1303 self.startTime = startTime
1304 self.endTime = endTime
1305 self.endDate = endDate
1306 self.startDate = startDate
1307
1299 1308 if server is not None:
1300 1309 if 'tcp://' in server:
1301 1310 address = server
@@ -1326,10 +1335,10 class JRODataReader(JRODataIO):
1326 1335 break
1327 1336
1328 1337 print('[Reading] Waiting %0.2f sec for an valid file in %s: try %02d ...' % (self.delay, path, nTries + 1))
1329 sleep(self.delay)
1338 time.sleep(self.delay)
1330 1339
1331 1340 if not(fullpath):
1332 self.dataOut.error = 'There isn\'t any valid file in {}'.format(path)
1341 raise schainpy.admin.SchainError('There isn\'t any valid file in {}'.format(path))
1333 1342 return
1334 1343
1335 1344 self.year = year
@@ -1359,17 +1368,10 class JRODataReader(JRODataIO):
1359 1368 basename, ext = os.path.splitext(file_name)
1360 1369 last_set = int(basename[-3:])
1361 1370
1362 self.online = online
1363 self.realtime = realtime
1364 self.delay = delay
1371
1365 1372 ext = ext.lower()
1366 1373 self.ext = ext
1367 self.getByBlock = getblock
1368 self.nTxs = nTxs
1369 self.startTime = startTime
1370 self.endTime = endTime
1371 self.endDate = endDate
1372 self.startDate = startDate
1374
1373 1375 # Added-----------------
1374 1376 self.selBlocksize = blocksize
1375 1377 self.selBlocktime = blocktime
@@ -1391,8 +1393,6 class JRODataReader(JRODataIO):
1391 1393 self.filenameList = []
1392 1394 return
1393 1395
1394 # self.getBasicHeader()
1395
1396 1396 if last_set != None:
1397 1397 self.dataOut.last_block = last_set * \
1398 1398 self.processingHeaderObj.dataBlocksPerFile + self.basicHeaderObj.dataBlock
@@ -12,20 +12,16 Created on Jul 3, 2014
12 12 # METADATA
13 13
14 14 import os
15 import time
15 16 import datetime
16 17 import numpy
17 18 import timeit
18 19 from fractions import Fraction
19 20
20 try:
21 from gevent import sleep
22 except:
23 from time import sleep
24
21 import schainpy.admin
25 22 from schainpy.model.data.jroheaderIO import RadarControllerHeader, SystemHeader
26 23 from schainpy.model.data.jrodata import Voltage
27 24 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
28 from time import time
29 25
30 26 import pickle
31 27 try:
@@ -461,9 +457,9 class DigitalRFReader(ProcessingUnit):
461 457 return False
462 458
463 459 def timeit(self, toExecute):
464 t0 = time()
460 t0 = time.time()
465 461 toExecute()
466 self.executionTime = time() - t0
462 self.executionTime = time.time() - t0
467 463 if self.oldAverage is None:
468 464 self.oldAverage = self.executionTime
469 465 self.oldAverage = (self.executionTime + self.count *
@@ -569,24 +565,24 class DigitalRFReader(ProcessingUnit):
569 565 if self.__readNextBlock():
570 566 break
571 567 if self.__thisUnixSample > self.__endUTCSecond * self.__sample_rate:
572 self.dataOut.error = 'Error'
568 raise schainpy.admin.SchainError('Error')
573 569 return
574 570
575 571 if self.__flagDiscontinuousBlock:
576 self.dataOut.error = 'discontinuous block found'
572 raise schainpy.admin.SchainError('discontinuous block found')
577 573 return
578 574
579 575 if not self.__online:
580 self.dataOut.error = 'Online?'
576 raise schainpy.admin.SchainError('Online?')
581 577 return
582 578
583 579 err_counter += 1
584 580 if err_counter > nTries:
585 self.dataOut.error = 'Max retrys reach'
581 raise schainpy.admin.SchainError('Max retrys reach')
586 582 return
587 583
588 584 print('[Reading] waiting %d seconds to read a new block' % seconds)
589 sleep(seconds)
585 time.sleep(seconds)
590 586
591 587 self.dataOut.data = self.__data_buffer[:,
592 588 self.__bufferIndex:self.__bufferIndex + self.__nSamples]
@@ -833,7 +833,6 class HFReader(ProcessingUnit):
833 833 def getData(self):
834 834 if self.flagNoMoreFiles:
835 835 self.dataOut.flagNoData = True
836 print('Process finished')
837 836 return 0
838 837
839 838 if self.__hasNotDataInBuffer():
@@ -585,7 +585,6 class AMISRReader(ProcessingUnit):
585 585
586 586 if self.flagNoMoreFiles:
587 587 self.dataOut.flagNoData = True
588 print('Process finished')
589 588 return 0
590 589
591 590 if self.__hasNotDataInBuffer():
@@ -13,6 +13,8 import datetime
13 13
14 14 import numpy
15 15 import h5py
16
17 import schainpy.admin
16 18 from schainpy.model.io.jroIO_base import LOCALTIME, JRODataReader, JRODataWriter
17 19 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
18 20 from schainpy.model.data.jrodata import Parameters
@@ -390,7 +392,7 class MADReader(JRODataReader, ProcessingUnit):
390 392 '''
391 393 if self.flagNoMoreFiles:
392 394 self.dataOut.flagNoData = True
393 self.dataOut.error = 'No file left to process'
395 raise schainpy.admin.SchainError('No file left to process')
394 396 return 0
395 397
396 398 if not self.readNextBlock():
@@ -5,6 +5,7 import h5py
5 5 import re
6 6 import datetime
7 7
8 import schainpy.admin
8 9 from schainpy.model.data.jrodata import *
9 10 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
10 11 from schainpy.model.io.jroIO_base import *
@@ -233,7 +234,7 class ParamReader(JRODataReader,ProcessingUnit):
233 234 idFile = self.fileIndex
234 235
235 236 if not(idFile < len(self.filenameList)):
236 self.dataOut.error = "No more Files"
237 raise schainpy.admin.SchainError("No more Files")
237 238 return 0
238 239
239 240 filename = self.filenameList[idFile]
@@ -1165,8 +1166,7 class ParameterReader(JRODataReader,ProcessingUnit):
1165 1166 idFile = self.fileIndex
1166 1167
1167 1168 if not(idFile < len(self.filenameList)):
1168 self.dataOut.error = 'No more files'
1169 return 0
1169 raise schainpy.admin.SchainError('No more files')
1170 1170
1171 1171 filename = self.filenameList[idFile]
1172 1172 self.fp = h5py.File(filename, 'r')
@@ -92,7 +92,6 class SpectraReader(JRODataReader, ProcessingUnit):
92 92 #Eliminar de la base la herencia
93 93 ProcessingUnit.__init__(self)#, **kwargs)
94 94
95 # self.isConfig = False
96 95
97 96 self.pts2read_SelfSpectra = 0
98 97
@@ -160,7 +159,6 class SpectraReader(JRODataReader, ProcessingUnit):
160 159
161 160 self.__isFirstTimeOnline = 1
162 161
163 # self.ippSeconds = 0
164 162
165 163 self.flagDiscontinuousBlock = 0
166 164
@@ -356,7 +354,6 class SpectraReader(JRODataReader, ProcessingUnit):
356 354
357 355 if self.flagNoMoreFiles:
358 356 self.dataOut.flagNoData = True
359 print('Process finished')
360 357 return 0
361 358
362 359 self.flagDiscontinuousBlock = 0
@@ -17,10 +17,10 import inspect
17 17 import zmq
18 18 import time
19 19 import pickle
20 import traceback
20 21 from queue import Queue
21 22 from threading import Thread
22 23 from multiprocessing import Process
23 from zmq.utils.monitor import recv_monitor_message
24 24
25 25 from schainpy.utils import log
26 26
@@ -219,22 +219,18 def MPDecorator(BaseClass):
219 219 self.receiver = None
220 220 self.i = 0
221 221 self.name = BaseClass.__name__
222
222 223 if 'plot' in self.name.lower() and not self.name.endswith('_'):
223 224 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
224 self.start_time = time.time()
225 225
226 if len(self.args) is 3:
227 self.typeProc = "ProcUnit"
226 self.start_time = time.time()
228 227 self.id = args[0]
229 228 self.inputId = args[1]
230 229 self.project_id = args[2]
231 elif len(self.args) is 2:
232 self.id = args[0]
233 self.inputId = args[0]
234 self.project_id = args[1]
235 self.typeProc = "Operation"
236
230 self.err_queue = args[3]
231 self.typeProc = args[4]
237 232 self.queue = InputQueue(self.project_id, self.inputId)
233 self.err_queue.put('#_start_#')
238 234
239 235 def subscribe(self):
240 236 '''
@@ -271,7 +267,7 def MPDecorator(BaseClass):
271 267
272 268 if self.inputId is None:
273 269 self.i += 1
274 if self.i % 100 == 0:
270 if self.i % 80 == 0:
275 271 self.i = 0
276 272 time.sleep(0.01)
277 273
@@ -283,7 +279,15 def MPDecorator(BaseClass):
283 279 '''
284 280 while True:
285 281
282 try:
286 283 BaseClass.run(self, **self.kwargs)
284 except:
285 err = traceback.format_exc()
286 if 'No more files' in err:
287 log.warning('No more files to read', self.name)
288 else:
289 self.err_queue.put('{}|{}'.format(self.name, err))
290 self.dataOut.error = True
287 291
288 292 for op, optype, opId, kwargs in self.operations:
289 293 if optype == 'self' and not self.dataOut.flagNoData:
@@ -299,11 +303,9 def MPDecorator(BaseClass):
299 303 self.publish(self.dataOut, self.id)
300 304
301 305 if self.dataOut.error:
302 log.error(self.dataOut.error, self.name)
303 # self.sender.send_multipart([str(self.project_id).encode(), 'end'.encode()])
304 306 break
305 307
306 time.sleep(1)
308 time.sleep(0.5)
307 309
308 310 def runProc(self):
309 311 '''
@@ -315,10 +317,13 def MPDecorator(BaseClass):
315 317
316 318 if self.dataIn.flagNoData and self.dataIn.error is None:
317 319 continue
318
320 elif not self.dataIn.error:
321 try:
319 322 BaseClass.run(self, **self.kwargs)
320
321 if self.dataIn.error:
323 except:
324 self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc()))
325 self.dataOut.error = True
326 elif self.dataIn.error:
322 327 self.dataOut.error = self.dataIn.error
323 328 self.dataOut.flagNoData = True
324 329
@@ -330,20 +335,15 def MPDecorator(BaseClass):
330 335 elif optype == 'external' and not self.dataOut.flagNoData:
331 336 self.publish(self.dataOut, opId)
332 337
333 if not self.dataOut.flagNoData or self.dataOut.error:
334 338 self.publish(self.dataOut, self.id)
335 339 for op, optype, opId, kwargs in self.operations:
336 if optype == 'self' and self.dataOut.error:
337 op(**kwargs)
338 elif optype == 'other' and self.dataOut.error:
339 self.dataOut = op.run(self.dataOut, **kwargs)
340 elif optype == 'external' and self.dataOut.error:
340 if optype == 'external' and self.dataOut.error:
341 341 self.publish(self.dataOut, opId)
342 342
343 if self.dataIn.error:
343 if self.dataOut.error:
344 344 break
345 345
346 time.sleep(1)
346 time.sleep(0.5)
347 347
348 348 def runOp(self):
349 349 '''
@@ -355,18 +355,15 def MPDecorator(BaseClass):
355 355
356 356 dataOut = self.listen()
357 357
358 if not dataOut.error:
358 359 BaseClass.run(self, dataOut, **self.kwargs)
359
360 if dataOut.error:
360 else:
361 361 break
362 362
363 time.sleep(1)
364
365 363 def run(self):
366 364 if self.typeProc is "ProcUnit":
367 365
368 366 if self.inputId is not None:
369
370 367 self.subscribe()
371 368
372 369 self.set_publisher()
@@ -386,32 +383,10 def MPDecorator(BaseClass):
386 383
387 384 self.close()
388 385
389 def event_monitor(self, monitor):
390
391 events = {}
392
393 for name in dir(zmq):
394 if name.startswith('EVENT_'):
395 value = getattr(zmq, name)
396 events[value] = name
397
398 while monitor.poll():
399 evt = recv_monitor_message(monitor)
400 if evt['event'] == 32:
401 self.connections += 1
402 if evt['event'] == 512:
403 pass
404
405 evt.update({'description': events[evt['event']]})
406
407 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
408 break
409 monitor.close()
410 print('event monitor thread done!')
411
412 386 def close(self):
413 387
414 388 BaseClass.close(self)
389 self.err_queue.put('#_end_#')
415 390
416 391 if self.sender:
417 392 self.sender.close()
@@ -94,7 +94,6 class ParametersProc(ProcessingUnit):
94 94 self.dataOut.heightList = self.dataIn.getHeiRange()
95 95 self.dataOut.frequency = self.dataIn.frequency
96 96 # self.dataOut.noise = self.dataIn.noise
97 self.dataOut.error = self.dataIn.error
98 97
99 98 def run(self):
100 99
General Comments 0
You need to be logged in to leave comments. Login now