# Copyright (c) 2012-2020 Jicamarca Radio Observatory
# All rights reserved.
#
# Distributed under the terms of the BSD 3-clause license.
"""API to create signal chain projects

The API is provided through the class: Project
"""

import re
import os
import sys
import ast
import datetime
import traceback
import time
import multiprocessing
from multiprocessing import Process, Queue, cpu_count
from threading import Thread
from xml.etree.ElementTree import ElementTree, Element, SubElement, ParseError

import schainpy
from schainpy.admin import Alarm, SchainWarning
from schainpy.model import *
from schainpy.utils import log


# macOS switched the default start method to 'spawn' in Python 3.8; the
# project objects are passed to child processes by forking.
if 'darwin' in sys.platform and sys.version_info[0] == 3 and sys.version_info[1] > 7:
    multiprocessing.set_start_method('fork')

# File extension searched for each read-unit datatype (used by MPProject).
DTYPES = {
    'Voltage': '.r',
    'Spectra': '.pdata'
}

# Full-string patterns recognised by ConfBase.addParameter. fullmatch is used
# so that e.g. a path containing '/2020/01/01' is not mistaken for a date.
_DATE_PATTERN = re.compile(r'\s*\d+/\d+/\d+\s*')
_TIME_PATTERN = re.compile(r'\s*\d+:\d+:\d+\s*')


def MPProject(project, n=None):
    '''Run ``project`` split across ``n`` processes, one day at a time.

    For every day between the read unit's ``startDate`` and ``endDate`` the
    matching files are listed and split into chunks of ``ceil(nFiles / n)``
    files. A clone of the project is started for each chunk; the read unit
    receives ``cursor``/``skip`` parameters selecting that chunk.

    Args:
        project: a configured Project that contains a read unit.
        n: number of processes per day; defaults to ``cpu_count()``.

    Raises:
        ValueError: the project has no read unit.
    '''
    if n is None:
        n = cpu_count()

    rconf = project.getReadUnit()
    if rconf is None:
        raise ValueError('MPProject requires a project with a read unit')

    params = rconf.parameters
    dt1 = params['startDate']
    dt2 = params['endDate']
    tm1 = params['startTime']
    tm2 = params['endTime']
    days = (dt2 - dt1).days

    processes = []

    def beforeExit(exctype, value, trace):
        # Kill the workers of the day being processed, then show the traceback.
        for process in processes:
            process.terminate()
            process.join()
        traceback.print_tb(trace)

    sys.excepthook = beforeExit

    for day in range(days + 1):
        processes = []
        dt = dt1 + datetime.timedelta(day)
        dt_str = dt.strftime('%Y/%m/%d')
        reader = JRODataReader()
        # NOTE(review): assumes searchFilesOffLine accepts these keywords and
        # returns (paths, files) -- confirm against the reader API.
        paths, files = reader.searchFilesOffLine(
            path=params['path'],
            startDate=dt,
            endDate=dt,
            startTime=tm1,
            endTime=tm2,
            ext=DTYPES[rconf.datatype])
        nFiles = len(files)
        if nFiles == 0:
            continue
        skip = (nFiles + n - 1) // n  # ceil(nFiles / n) without floats
        cursor = 0
        while nFiles > cursor * skip:
            rconf.update(startDate=dt_str, endDate=dt_str, cursor=cursor,
                         skip=skip)
            p = project.clone()
            p.start()
            processes.append(p)
            cursor += 1

        for process in processes:
            process.join()
            process.terminate()

    time.sleep(3)


def wait(context, project_id=None):
    '''Block until a message arrives on the project's ZeroMQ publisher, then
    terminate ``context``.

    The original body referenced ``self.id`` although this is a module-level
    function, so it always failed with NameError; the id is now passed in as
    ``project_id``.

    NOTE(review): ``zmq`` is not imported in this module; it is presumably
    provided by ``from schainpy.model import *`` -- confirm.

    Raises:
        TypeError: ``project_id`` was not given.
    '''
    if project_id is None:
        raise TypeError('wait() requires the project_id argument')
    project_id = str(project_id)
    time.sleep(1)
    c = zmq.Context()
    receiver = c.socket(zmq.SUB)
    receiver.connect('ipc:///tmp/schain_{}_pub'.format(project_id))
    receiver.setsockopt(zmq.SUBSCRIBE, project_id.encode())
    receiver.recv_multipart()
    context.terminate()


class ParameterConf():
    '''Single named parameter (id, name, raw value, format) of the legacy
    XML configuration, with on-demand conversion in getValue().'''

    id = None
    name = None
    value = None
    format = None
    # Cache for getValue(); cleared whenever value/format are (re)assigned.
    __formated_value = None

    ELEMENTNAME = 'Parameter'

    def __init__(self):

        self.format = 'str'

    def getElementName(self):

        return self.ELEMENTNAME

    def getValue(self):
        '''Return ``self.value`` converted according to ``self.format``.

        Supported formats: 'obj' (returned as-is, not cached), 'str', 'list'
        (comma separated strings), 'intlist', 'floatlist', 'date'
        ('YYYY/MM/DD'), 'time' ('HH:MM:SS'), 'pairslist', 'multilist',
        'bool', 'int' and any other builtin type name such as 'float'.

        Raises:
            ValueError: the value is empty (for formats other than 'obj' and
                'str') or does not have the shape the format requires.
        '''
        if self.__formated_value is not None:
            return self.__formated_value

        value = self.value
        fmt = self.format

        if fmt == 'obj':
            return value

        if fmt == 'str':
            self.__formated_value = str(value)
            return self.__formated_value

        if value == '':
            raise ValueError('%s: This parameter value is empty' % self.name)

        if fmt == 'list':
            self.__formated_value = [s.strip() for s in value.split(',')]
            return self.__formated_value

        if fmt == 'intlist':
            # Example: value = (0,1,2)
            new_value = ast.literal_eval(value)
            if type(new_value) not in (tuple, list):
                new_value = [int(new_value)]
            self.__formated_value = new_value
            return self.__formated_value

        if fmt == 'floatlist':
            # Example: value = (0.5, 1.4, 2.7)
            new_value = ast.literal_eval(value)
            if type(new_value) not in (tuple, list):
                new_value = [float(new_value)]
            self.__formated_value = new_value
            return self.__formated_value

        if fmt == 'date':
            year, month, day = [int(x) for x in value.split('/')][:3]
            self.__formated_value = datetime.date(year, month, day)
            return self.__formated_value

        if fmt == 'time':
            hour, minute, second = [int(x) for x in value.split(':')][:3]
            self.__formated_value = datetime.time(hour, minute, second)
            return self.__formated_value

        if fmt == 'pairslist':
            # Example: value = (0,1),(1,2)
            new_value = ast.literal_eval(value)
            if type(new_value) not in (tuple, list) or len(new_value) == 0:
                raise ValueError('%s has to be a tuple or list of pairs' % value)
            if type(new_value[0]) not in (tuple, list):
                # A single pair was given: wrap it into a list of pairs.
                if len(new_value) != 2:
                    raise ValueError('%s has to be a tuple or list of pairs' % value)
                new_value = [new_value]
            for thisPair in new_value:
                if len(thisPair) != 2:
                    raise ValueError('%s has to be a tuple or list of pairs' % value)
            self.__formated_value = new_value
            return self.__formated_value

        if fmt == 'multilist':
            # Example: value = (0,1,2),(3,4,5)
            multiList = ast.literal_eval(value)
            if type(multiList[0]) == int:
                # A single group was given: wrap it so the result is always a
                # sequence of groups (the old '(' + value + ')' was a no-op).
                multiList = (multiList,)
            self.__formated_value = multiList
            return self.__formated_value

        if fmt == 'bool':
            value = int(value)

        if fmt == 'int':
            # Accept values written as floats, e.g. '3.0'.
            value = float(value)

        # SECURITY NOTE: eval() of a format name that may come from an XML
        # file; expected values are builtin type names ('int', 'float', ...).
        format_func = eval(fmt)

        self.__formated_value = format_func(value)

        return self.__formated_value

    def updateId(self, new_id):

        self.id = str(new_id)

    def setup(self, id, name, value, format='str'):
        '''Initialise the parameter and validate it by converting once.

        Returns 1 on success; conversion errors propagate from getValue().
        '''
        self.id = str(id)
        self.name = name
        if format == 'obj':
            self.value = value
        else:
            self.value = str(value)
        self.format = str.lower(format)
        self.__formated_value = None

        self.getValue()

        return 1

    def update(self, name, value, format='str'):
        '''Replace name/value/format and drop the cached converted value.'''
        self.name = name
        self.value = str(value)
        self.format = format
        self.__formated_value = None

    def makeXml(self, opElement):
        '''Append this parameter to ``opElement`` (the 'queue' parameter is
        never serialised).'''
        if self.name not in ('queue',):
            parmElement = SubElement(opElement, self.ELEMENTNAME)
            parmElement.set('id', str(self.id))
            parmElement.set('name', self.name)
            parmElement.set('value', self.value)
            parmElement.set('format', self.format)

    def readXml(self, parmElement):
        '''Load id/name/value/format from a Parameter XML element.'''
        self.id = parmElement.get('id')
        self.name = parmElement.get('name')
        self.value = parmElement.get('value')
        self.format = str.lower(parmElement.get('format'))
        self.__formated_value = None

        # Compatible with old signal chain version
        if self.format == 'int' and self.name == 'idfigure':
            self.name = 'id'

    def printattr(self):

        print('Parameter[%s]: name = %s, value = %s, format = %s, project_id = %s' %
              (self.id, self.name, self.value, self.format,
               getattr(self, 'project_id', None)))


class ConfBase():
    '''Common behaviour of operation and processing-unit configurations.

    Subclasses define ``ELEMENTNAME`` and ``xml_labels`` (the attributes
    written as XML attributes by makeXml()).
    '''

    ELEMENTNAME = 'Operation'

    def __init__(self):

        self.id = '0'
        self.name = None
        self.priority = None
        self.parameters = {}
        self.object = None
        self.operations = []

    def getId(self):

        return self.id

    def getNewId(self):
        '''Return an unused integer id for a child operation.

        Ids follow the scheme ``int(self.id) * 10 + k``; existing ids are
        skipped so that removing an operation cannot produce a duplicate.
        '''
        used = {str(conf.id) for conf in self.operations}
        new_id = int(self.id) * 10 + len(self.operations) + 1
        while str(new_id) in used:
            new_id += 1
        return new_id

    def updateId(self, new_id):
        '''Set this id and renumber child operations as new_id*10 + 1, 2, ...'''
        self.id = str(new_id)

        for n, conf in enumerate(self.operations, start=1):
            conf.updateId(str(int(new_id) * 10 + n))

    def getKwargs(self):
        '''Return the parameters whose value is not None, '' or ' '.'''
        params = {}

        for key, value in self.parameters.items():
            if value not in (None, '', ' '):
                params[key] = value

        return params

    def update(self, **kwargs):
        '''Add or replace parameters via addParameter().'''
        for key, value in kwargs.items():
            self.addParameter(name=key, value=value)

    def addParameter(self, name, value, format=None):
        '''Store ``value`` under ``name``, converting it to a Python object.

        'YYYY/MM/DD' strings become datetime.date, 'HH:MM:SS' strings become
        datetime.time, Python literals are evaluated with ast.literal_eval,
        other comma-separated strings become lists of strings, and anything
        else is stored unchanged. ``format`` is accepted for backward
        compatibility and ignored.
        '''
        if isinstance(value, str) and _DATE_PATTERN.fullmatch(value):
            self.parameters[name] = datetime.date(*[int(x) for x in value.split('/')])
        elif isinstance(value, str) and _TIME_PATTERN.fullmatch(value):
            self.parameters[name] = datetime.time(*[int(x) for x in value.split(':')])
        else:
            try:
                self.parameters[name] = ast.literal_eval(value)
            except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
                # Not a Python literal (or not a string at all).
                if isinstance(value, str) and ',' in value:
                    self.parameters[name] = value.split(',')
                else:
                    self.parameters[name] = value

    def getParameters(self):
        '''Return the parameters as strings (dates/times in the formats
        accepted back by addParameter()).'''
        params = {}
        for key, value in self.parameters.items():
            s = type(value).__name__
            if s == 'date':
                params[key] = value.strftime('%Y/%m/%d')
            elif s == 'time':
                params[key] = value.strftime('%H:%M:%S')
            else:
                params[key] = str(value)

        return params

    def makeXml(self, element):
        '''Append this configuration (labels, parameters, child operations)
        as a sub-element of ``element``.'''
        xml = SubElement(element, self.ELEMENTNAME)
        for label in self.xml_labels:
            xml.set(label, str(getattr(self, label)))

        for key, value in self.getParameters().items():
            xml_param = SubElement(xml, 'Parameter')
            xml_param.set('name', key)
            xml_param.set('value', value)

        for conf in self.operations:
            conf.makeXml(xml)

    def __str__(self):

        if self.ELEMENTNAME == 'Operation':
            s = '  {}[id={}]\n'.format(self.name, self.id)
        else:
            s = '{}[id={}, inputId={}]\n'.format(self.name, self.id, self.inputId)

        for key, value in self.parameters.items():
            if self.ELEMENTNAME == 'Operation':
                s += '    {}: {}\n'.format(key, value)
            else:
                s += '  {}: {}\n'.format(key, value)

        for conf in self.operations:
            s += str(conf)

        return s


class OperationConf(ConfBase):
    '''Configuration of one operation attached to a processing unit.'''

    ELEMENTNAME = 'Operation'
    xml_labels = ['id', 'name']

    def setup(self, id, name, priority, project_id, err_queue):
        '''Initialise the operation; ``priority`` is accepted but unused.'''
        self.id = str(id)
        self.project_id = project_id
        self.name = name
        self.type = 'other'
        self.err_queue = err_queue

    def readXml(self, element, project_id, err_queue):
        '''Load the operation and its Parameter children from XML.'''
        self.id = element.get('id')
        self.name = element.get('name')
        self.type = 'other'
        self.project_id = str(project_id)
        self.err_queue = err_queue

        for elm in element.iter('Parameter'):
            self.addParameter(elm.get('name'), elm.get('value'))

    def createObject(self):
        '''Instantiate the operation class named ``self.name``.

        Plot/Writer/Send/print operations are built with
        (id, id, project_id, err_queue, **kwargs), started immediately and
        marked 'external'; any other class is built with no arguments.
        '''
        # SECURITY NOTE: eval() of a class name that may come from an XML file.
        className = eval(self.name)

        if ('Plot' in self.name or 'Writer' in self.name
                or 'Send' in self.name or 'print' in self.name):
            kwargs = self.getKwargs()
            opObj = className(self.id, self.id, self.project_id, self.err_queue, **kwargs)
            opObj.start()
            self.type = 'external'
        else:
            opObj = className()

        self.object = opObj
        return opObj


class ProcUnitConf(ConfBase):
    '''Configuration of a processing unit and its operations.'''

    ELEMENTNAME = 'ProcUnit'
    xml_labels = ['id', 'inputId', 'name']

    def setup(self, project_id, id, name, datatype, inputId, err_queue):
        '''Initialise the unit; either ``name`` or ``datatype`` is required
        and the missing one is derived from the other ('<datatype>Proc').

        Raises:
            ValueError: both ``datatype`` and ``name`` are None.
        '''
        if datatype is None and name is None:
            raise ValueError('datatype or name should be defined')

        if name is None:
            if 'Proc' in datatype:
                name = datatype
            else:
                name = '%sProc' % (datatype)

        if datatype is None:
            datatype = name.replace('Proc', '')

        self.id = str(id)
        self.project_id = project_id
        self.name = name
        self.datatype = datatype
        self.inputId = inputId
        self.err_queue = err_queue
        self.operations = []
        self.parameters = {}

    def removeOperation(self, id):
        '''Remove the first operation whose id equals ``id``.

        Raises:
            ValueError: no operation has that id.
        '''
        for index, conf in enumerate(self.operations):
            if conf.id == id:
                self.operations.pop(index)
                return
        raise ValueError('Operation {} not found'.format(id))

    def getOperation(self, id):
        '''Return the operation with id ``id``, or None.'''
        for conf in self.operations:
            if conf.id == id:
                return conf
        return None

    def addOperation(self, name, optype='self'):
        '''Create, append and return a new OperationConf named ``name``.
        ``optype`` is accepted for backward compatibility and ignored.'''
        id = self.getNewId()
        conf = OperationConf()
        conf.setup(id, name=name, priority='0', project_id=self.project_id,
                   err_queue=self.err_queue)
        self.operations.append(conf)

        return conf

    def readXml(self, element, project_id, err_queue):
        '''Load the unit, its parameters and its operations from XML.

        When the element has no 'datatype' attribute it is derived from the
        class name by removing 'Reader' (read units) or 'Proc' (others).
        '''
        self.id = element.get('id')
        self.name = element.get('name')
        self.inputId = None if element.get('inputId') == 'None' else element.get('inputId')
        suffix = 'Reader' if self.ELEMENTNAME == 'ReadUnit' else 'Proc'
        self.datatype = element.get('datatype', self.name.replace(suffix, ''))
        self.project_id = str(project_id)
        self.err_queue = err_queue
        self.operations = []
        self.parameters = {}

        for elm in element:
            if elm.tag == 'Parameter':
                self.addParameter(elm.get('name'), elm.get('value'))
            elif elm.tag == 'Operation':
                conf = OperationConf()
                conf.readXml(elm, project_id, err_queue)
                self.operations.append(conf)

    def createObjects(self):
        '''Instantiate the processing unit and attach its operations.'''
        # SECURITY NOTE: eval() of a class name that may come from an XML file.
        className = eval(self.name)
        procUnitObj = className()
        procUnitObj.name = self.name
        log.success('creating process...', self.name)

        for conf in self.operations:
            opObj = conf.createObject()
            log.success('adding operation: {}, type:{}'.format(
                conf.name, conf.type), self.name)
            procUnitObj.addOperation(conf, opObj)

        self.object = procUnitObj

    def run(self):
        '''Run one step of the unit with its parameters; returns the unit's
        call() result.'''
        return self.object.call(**self.getKwargs())


class ReadUnitConf(ProcUnitConf):
    '''Configuration of the reading unit (the chain's data source).'''

    ELEMENTNAME = 'ReadUnit'

    def __init__(self):

        super().__init__()
        self.id = None
        self.datatype = None
        self.inputId = None

    def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='',
              startTime='', endTime='', server=None, **kwargs):
        '''Initialise the read unit; either ``name`` or ``datatype`` is
        required and the missing one is derived ('<datatype>Reader').

        path/startDate/endDate/startTime/endTime and any extra keyword
        arguments are stored as parameters. ``server`` is accepted but not
        used by this method.

        Raises:
            ValueError: both ``datatype`` and ``name`` are None.
        '''
        if datatype is None and name is None:
            raise ValueError('datatype or name should be defined')
        if name is None:
            if 'Reader' in datatype:
                name = datatype
                datatype = name.replace('Reader', '')
            else:
                name = '{}Reader'.format(datatype)
        if datatype is None:
            if 'Reader' in name:
                datatype = name.replace('Reader', '')
            else:
                datatype = name
                name = '{}Reader'.format(name)

        self.id = str(id)
        self.project_id = project_id
        self.name = name
        self.datatype = datatype
        self.err_queue = err_queue

        self.addParameter(name='path', value=path)
        self.addParameter(name='startDate', value=startDate)
        self.addParameter(name='endDate', value=endDate)
        self.addParameter(name='startTime', value=startTime)
        self.addParameter(name='endTime', value=endTime)

        for key, value in kwargs.items():
            self.addParameter(name=key, value=value)


class Project(Process):
    """API to create signal chain projects"""

    ELEMENTNAME = 'Project'

    def __init__(self, name=''):

        Process.__init__(self)
        self.id = '1'
        if name:
            self.name = '{} ({})'.format(Process.__name__, name)
        self.filename = None
        self.description = None
        self.email = None
        self.alarm = []
        self.configurations = {}
        # The error queue is disabled; monitor() needs one to be assigned.
        self.err_queue = None
        self.started = False

    def getNewId(self):
        '''Return the first unused unit id greater than int(self.id) * 10.'''
        used = set(self.configurations)
        new_id = int(self.id) * 10 + 1
        while str(new_id) in used:
            new_id += 1
        return str(new_id)

    def updateId(self, new_id):
        '''Set the project id and renumber every unit (in sorted key order)
        as new_id*10 + 1, 2, ...'''
        self.id = str(new_id)

        new_confs = {}
        for n, procKey in enumerate(sorted(self.configurations), start=1):
            conf = self.configurations[procKey]
            idProcUnit = str(int(self.id) * 10 + n)
            conf.updateId(idProcUnit)
            new_confs[idProcUnit] = conf

        self.configurations = new_confs

    def setup(self, id=1, name='', description='', email=None, alarm=None):
        '''Set the project's metadata; ``alarm`` defaults to a fresh list.'''
        self.id = str(id)
        self.description = description
        self.email = email
        self.alarm = [] if alarm is None else alarm
        if name:
            self.name = '{} ({})'.format(Process.__name__, name)

    def update(self, **kwargs):

        for key, value in kwargs.items():
            setattr(self, key, value)

    def clone(self):
        '''Return a new (not started) Project sharing this project's unit
        configuration objects.'''
        p = Project()
        p.id = self.id
        p.name = self.name
        p.description = self.description
        p.filename = self.filename
        p.email = self.email
        p.alarm = self.alarm
        p.err_queue = self.err_queue
        p.configurations = self.configurations.copy()

        return p

    def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
        '''Create, register and return a ReadUnitConf.'''
        if id is None:
            idReadUnit = self.getNewId()
        else:
            idReadUnit = str(id)

        conf = ReadUnitConf()
        conf.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs)
        self.configurations[conf.id] = conf

        return conf

    def addProcUnit(self, id=None, inputId='0', datatype=None, name=None):
        '''Create, register and return a ProcUnitConf fed by ``inputId``.'''
        if id is None:
            idProcUnit = self.getNewId()
        else:
            idProcUnit = id

        conf = ProcUnitConf()
        conf.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue)
        self.configurations[conf.id] = conf

        return conf

    def removeProcUnit(self, id):
        '''Remove the unit ``id`` if present (silently ignores unknown ids).'''
        self.configurations.pop(id, None)

    def getReadUnit(self):
        '''Return the first registered ReadUnit configuration, or None.'''
        for obj in self.configurations.values():
            if obj.ELEMENTNAME == 'ReadUnit':
                return obj

        return None

    def getProcUnit(self, id):

        return self.configurations[id]

    def getUnits(self):
        '''Yield the unit configurations in sorted id order.'''
        for key in sorted(self.configurations):
            yield self.configurations[key]

    def updateUnit(self, id, **kwargs):

        self.configurations[id].update(**kwargs)

    def makeXml(self):
        '''Build the project XML tree and store it in ``self.xml``.'''
        xml = Element('Project')
        xml.set('id', str(self.id))
        xml.set('name', str(self.name))
        # ElementTree cannot serialise a None attribute value.
        xml.set('description', '' if self.description is None else str(self.description))

        for conf in self.configurations.values():
            conf.makeXml(xml)

        self.xml = xml

    def writeXml(self, filename=None):
        '''Write the project XML to ``filename`` (default: self.filename or
        'schain.xml'). Returns 1 on success, 0 on failure.'''
        if filename is None:
            if self.filename:
                filename = self.filename
            else:
                filename = 'schain.xml'

        if not filename:
            print('filename has not been defined. Use setFilename(filename) for do it.')
            return 0

        abs_file = os.path.abspath(filename)

        if not os.access(os.path.dirname(abs_file), os.W_OK):
            print('No write permission on %s' % os.path.dirname(abs_file))
            return 0

        if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
            print('File %s already exists and it could not be overwriten' % abs_file)
            return 0

        self.makeXml()

        ElementTree(self.xml).write(abs_file, method='xml')

        self.filename = abs_file

        return 1

    def readXml(self, filename):
        '''Load the project from an XML file. Returns 1 on success, 0 when the
        file cannot be read or parsed.

        Raises:
            KeyError: a ProcUnit references an inputId not defined before it.
        '''
        abs_file = os.path.abspath(filename)

        self.configurations = {}

        try:
            self.xml = ElementTree().parse(abs_file)
        except (OSError, ParseError):
            log.error('Error reading %s, verify file format' % filename)
            return 0

        self.id = self.xml.get('id')
        self.name = self.xml.get('name')
        self.description = self.xml.get('description')

        for element in self.xml:
            if element.tag == 'ReadUnit':
                conf = ReadUnitConf()
                conf.readXml(element, self.id, self.err_queue)
                self.configurations[conf.id] = conf
            elif element.tag == 'ProcUnit':
                conf = ProcUnitConf()
                # Validates that the input unit has already been defined.
                self.configurations[element.get('inputId')]
                conf.readXml(element, self.id, self.err_queue)
                self.configurations[conf.id] = conf

        self.filename = abs_file

        return 1

    def __str__(self):

        text = '\nProject[id=%s, name=%s, description=%s]\n\n' % (
            self.id,
            self.name,
            self.description,
        )

        for conf in self.configurations.values():
            text += '{}'.format(conf)

        return text

    def createObjects(self):
        '''Instantiate every unit (sorted id order) and wire each unit to
        its input unit.'''
        for key in sorted(self.configurations):
            conf = self.configurations[key]
            conf.createObjects()
            if conf.inputId is not None:
                conf.object.setInput(self.configurations[conf.inputId].object)

    def monitor(self):
        '''Start a thread running _monitor on ``self.err_queue``.

        NOTE(review): ``self.ctx`` is not set anywhere in this module; when
        absent, no context is terminated.
        '''
        t = Thread(target=self._monitor, args=(self.err_queue, getattr(self, 'ctx', None)))
        t.start()

    def _monitor(self, queue, ctx):
        '''Consume start/end/error messages from ``queue`` until every unit
        has ended or a traceback arrives; log the error and send an Alarm.'''
        import socket

        procs = 0
        err_msg = ''

        while True:
            msg = queue.get()
            if '#_start_#' in msg:
                procs += 1
            elif '#_end_#' in msg:
                procs -= 1
            else:
                err_msg = msg

            if procs == 0 or 'Traceback' in err_msg:
                break
            time.sleep(0.1)

        if '|' in err_msg:
            # Messages look like '<unit name>|<error text>'; the error text
            # itself may contain '|'.
            name, err = err_msg.split('|', 1)
            if 'SchainWarning' in err:
                log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), name)
            elif 'SchainError' in err:
                log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), name)
            else:
                log.error(err, name)
        else:
            name, err = self.name, err_msg

        time.sleep(1)

        if ctx is not None:
            ctx.term()

        message = err

        if err_msg:
            subject = 'SChain v%s: Error running %s\n' % (
                schainpy.__version__, self.name)

            subtitle = 'Hostname: %s\n' % socket.gethostbyname(
                socket.gethostname())
            subtitle += 'Working directory: %s\n' % os.path.abspath('./')
            subtitle += 'Configuration file: %s\n' % self.filename
            subtitle += 'Time: %s\n' % str(datetime.datetime.now())

            readUnitConfObj = self.getReadUnit()
            if readUnitConfObj:
                params = readUnitConfObj.parameters
                subtitle += '\nInput parameters:\n'
                subtitle += '[Data path = %s]\n' % params.get('path')
                subtitle += '[Start date = %s]\n' % params.get('startDate')
                subtitle += '[End date = %s]\n' % params.get('endDate')
                subtitle += '[Start time = %s]\n' % params.get('startTime')
                subtitle += '[End time = %s]\n' % params.get('endTime')

            a = Alarm(
                modes=self.alarm,
                email=self.email,
                message=message,
                subject=subject,
                subtitle=subtitle,
                filename=self.filename
            )

            a.start()

    def setFilename(self, filename):

        self.filename = filename

    def runProcs(self):
        '''Repeatedly run every unit in id order.

        A unit returning 'Error' decrements the remaining-unit counter; a
        falsy result ends the current pass early. The loop stops once the
        counter reaches zero.
        '''
        err = False
        n = len(self.configurations)

        while not err:
            for conf in self.getUnits():
                ok = conf.run()
                if ok == 'Error':
                    n -= 1
                    continue
                elif not ok:
                    break
            if n == 0:
                err = True

    def run(self):
        '''Process entry point: build all unit objects and run the chain.'''
        log.success('\nStarting Project {} [id={}]'.format(self.name, self.id), tag='')
        self.started = True
        self.start_time = time.time()
        self.createObjects()
        self.runProcs()
        log.success('{} Done (Time: {:4.2f}s)'.format(
            self.name,
            time.time()-self.start_time), '')