# schainpy/controller.py (r1371)
# Copyright (c) 2012-2020 Jicamarca Radio Observatory
# All rights reserved.
#
# Distributed under the terms of the BSD 3-clause license.
"""API to create signal chain projects | ||||
The API is provided through the class: Project
""" | ||||
import ast
import datetime
import multiprocessing
import os
import re
import sys
import time
import traceback
from multiprocessing import Process, Queue
from threading import Thread
from xml.etree.ElementTree import ElementTree, Element, SubElement

import schainpy
from schainpy.admin import Alarm, SchainWarning
from schainpy.model import *
from schainpy.utils import log
# On macOS with Python 3.8 or newer (3.x only), explicitly request the "fork"
# start method. NOTE(review): presumably needed because those interpreters
# default to "spawn" on macOS -- confirm against the multiprocessing docs.
if (sys.version_info.major == 3
        and sys.version_info.minor > 7
        and 'darwin' in sys.platform):
    multiprocessing.set_start_method('fork')
class ConfBase():
    """Base class for configuration nodes (operations and units).

    An instance carries an id, a name, a dictionary of parameters and a list
    of child operation configurations. ``makeXml`` and ``__str__`` read the
    class attributes ``ELEMENTNAME`` and ``xml_labels``, which this base
    class does not define; subclasses must provide them.
    """

    def __init__(self):
        self.id = '0'
        self.name = None
        self.priority = None
        self.parameters = {}
        self.object = None
        self.operations = []

    def getId(self):
        """Return the id of this configuration."""
        return self.id

    def getNewId(self):
        """Return the id (as an int) for the next child operation."""
        return int(self.id) * 10 + len(self.operations) + 1

    def updateId(self, new_id):
        """Set a new id and renumber every child operation accordingly.

        Children receive ``new_id * 10 + position`` with positions starting
        at 1, following the order of ``self.operations``.
        """
        self.id = str(new_id)
        for position, conf in enumerate(self.operations, start=1):
            conf.updateId(str(int(new_id) * 10 + position))

    @staticmethod
    def _isBlank(value):
        """Return True for ``None`` and for the strings ``''`` and ``' '``."""
        # Only strings are compared by equality, so values whose ``==`` does
        # not return a plain bool are never compared against '' or ' '.
        return value is None or (isinstance(value, str) and value in ('', ' '))

    @staticmethod
    def _parseFields(text, separator, factory):
        """Build ``factory(*ints)`` from ``text`` split on ``separator``.

        Returns ``None`` when the pieces are not all integers or when
        ``factory`` rejects them, so the caller can fall back to generic
        parsing instead of raising.
        """
        try:
            return factory(*[int(piece) for piece in text.split(separator)])
        except (TypeError, ValueError, OverflowError):
            return None

    def getKwargs(self):
        """Return the parameters whose value is not ``None``, ``''`` or ``' '``."""
        return {
            key: value
            for key, value in self.parameters.items()
            if not self._isBlank(value)
        }

    def update(self, **kwargs):
        """Add or replace one parameter per keyword argument."""
        for key, value in kwargs.items():
            self.addParameter(name=key, value=value)

    def addParameter(self, name, value, format=None):
        """Store ``value`` under ``name``, converting strings when possible.

        Conversion order:

        1. a string containing ``d/d/d`` is parsed as ``datetime.date``;
        2. otherwise a string containing ``d:d:d`` is parsed as
           ``datetime.time``;
        3. otherwise the value goes through ``ast.literal_eval``;
        4. if that fails, a string containing a comma is split into a list
           and anything else is stored unchanged.

        A string that only *contains* a date-like or time-like run (for
        example a path such as ``/data/2020/01/02``) cannot be parsed in
        steps 1-2; it falls through to steps 3-4 instead of raising.

        ``format`` is accepted for interface compatibility and is not used.
        """
        if isinstance(value, str):
            parsed = None
            if re.search(r'(\d+/\d+/\d+)', value):
                parsed = self._parseFields(value, '/', datetime.date)
            elif re.search(r'(\d+:\d+:\d+)', value):
                parsed = self._parseFields(value, ':', datetime.time)
            if parsed is not None:
                self.parameters[name] = parsed
                return

        try:
            self.parameters[name] = ast.literal_eval(value)
        except Exception:
            # Best effort: anything that is not a Python literal is kept as
            # text (or as a list of text items when comma-separated).
            if isinstance(value, str) and ',' in value:
                self.parameters[name] = value.split(',')
            else:
                self.parameters[name] = value

    def getParameters(self):
        """Return the parameters as strings, ready to be written to XML.

        Dates are formatted as ``%Y/%m/%d`` and times as ``%H:%M:%S`` so
        that ``addParameter`` can parse them back; everything else goes
        through ``str``.
        """
        params = {}
        for key, value in self.parameters.items():
            # Exact type checks: a datetime.datetime must not be formatted
            # as a plain date.
            if type(value) is datetime.date:
                params[key] = value.strftime('%Y/%m/%d')
            elif type(value) is datetime.time:
                params[key] = value.strftime('%H:%M:%S')
            else:
                params[key] = str(value)
        return params

    def makeXml(self, element):
        """Append this configuration (and its operations) under ``element``."""
        xml = SubElement(element, self.ELEMENTNAME)
        for label in self.xml_labels:
            xml.set(label, str(getattr(self, label)))
        for key, value in self.getParameters().items():
            xml_param = SubElement(xml, 'Parameter')
            xml_param.set('name', key)
            xml_param.set('value', value)
        for conf in self.operations:
            conf.makeXml(xml)

    def __str__(self):
        """Return a multi-line summary of the node, its parameters and children."""
        is_operation = self.ELEMENTNAME == 'Operation'
        if is_operation:
            s = ' {}[id={}]\n'.format(self.name, self.id)
        else:
            s = '{}[id={}, inputId={}]\n'.format(self.name, self.id, self.inputId)
        for key, value in self.parameters.items():
            # NOTE(review): both branches produce the same text here; any
            # differing indentation inside the literals may have been lost
            # -- confirm against the repository copy.
            if is_operation:
                s += ' {}: {}\n'.format(key, value)
            else:
                s += ' {}: {}\n'.format(key, value)
        for conf in self.operations:
            s += str(conf)
        return s
class OperationConf(ConfBase):
    """Configuration of one operation attached to a processing unit."""

    ELEMENTNAME = 'Operation'
    xml_labels = ['id', 'name']

    def setup(self, id, name, priority, project_id, err_queue):
        """Initialise the operation from explicit values.

        ``priority`` is accepted but not stored by this method.
        """
        self.id = str(id)
        self.err_queue = err_queue
        self.type = 'other'
        self.name = name
        self.project_id = project_id

    def readXml(self, element, project_id, err_queue):
        """Initialise the operation from an ``Operation`` XML element."""
        self.id, self.name = element.get('id'), element.get('name')
        self.type = 'other'
        self.project_id = str(project_id)
        self.err_queue = err_queue
        for node in element.iter('Parameter'):
            self.addParameter(node.get('name'), node.get('value'))

    def createObject(self):
        """Instantiate the operation class named by ``self.name``.

        Names containing 'Plot', 'Writer', 'Send' or 'print' are built with
        the ids, the error queue and the parameters, then started and
        marked as ``'external'``; every other class is built without
        arguments. The instance is stored in ``self.object`` and returned.
        """
        # SECURITY NOTE: the name is resolved with eval(); it can come from
        # an XML file, so only load configurations from trusted sources.
        op_class = eval(self.name)
        external_tags = ('Plot', 'Writer', 'Send', 'print')
        if not any(tag in self.name for tag in external_tags):
            instance = op_class()
        else:
            # The id is passed twice, as the first two positional arguments.
            instance = op_class(self.id, self.id, self.project_id,
                                self.err_queue, **self.getKwargs())
            instance.start()
            self.type = 'external'
        self.object = instance
        return instance
class ProcUnitConf(ConfBase):
    """Configuration of a processing unit and its chain of operations."""

    ELEMENTNAME = 'ProcUnit'
    xml_labels = ['id', 'inputId', 'name']

    def setup(self, project_id, id, name, datatype, inputId, err_queue):
        """Initialise the unit from explicit values.

        At least one of ``name`` and ``datatype`` must be given; the missing
        one is derived from the other by adding or removing the ``'Proc'``
        suffix.

        Raises:
            ValueError: if both ``name`` and ``datatype`` are ``None``.
        """
        if datatype is None and name is None:
            raise ValueError('datatype or name should be defined')

        if name is None:
            name = datatype if 'Proc' in datatype else '%sProc' % (datatype)

        if datatype is None:
            datatype = name.replace('Proc', '')

        self.id = str(id)
        self.project_id = project_id
        self.name = name
        self.datatype = datatype
        self.inputId = inputId
        self.err_queue = err_queue
        self.operations = []
        self.parameters = {}

    def removeOperation(self, id):
        """Remove the first operation whose id equals ``id``.

        Raises:
            ValueError: if no operation has that id.
        """
        for index, conf in enumerate(self.operations):
            if conf.id == id:
                self.operations.pop(index)
                return
        raise ValueError('operation with id {!r} not found'.format(id))

    def getOperation(self, id):
        """Return the operation with the given id, or ``None`` if absent."""
        for conf in self.operations:
            if conf.id == id:
                return conf
        return None

    def addOperation(self, name, optype='self'):
        """Create, register and return a new operation configuration.

        ``optype`` is accepted for interface compatibility and is not used.
        """
        conf = OperationConf()
        conf.setup(self.getNewId(), name=name, priority='0',
                   project_id=self.project_id, err_queue=self.err_queue)
        self.operations.append(conf)
        return conf

    def _defaultDatatype(self):
        """Derive the datatype from the unit name by dropping its suffix."""
        suffix = self.ELEMENTNAME.replace('Unit', '')
        # Reader names carry the suffix 'Reader', not 'Read'; stripping
        # 'Read' would turn 'VoltageReader' into 'Voltageer'.
        if suffix == 'Read':
            suffix = 'Reader'
        return self.name.replace(suffix, '')

    def readXml(self, element, project_id, err_queue):
        """Initialise the unit, its parameters and operations from XML."""
        self.id = element.get('id')
        self.name = element.get('name')
        # makeXml writes a missing input as the text 'None'.
        input_id = element.get('inputId')
        self.inputId = None if input_id == 'None' else input_id
        self.datatype = element.get('datatype', self._defaultDatatype())
        self.project_id = str(project_id)
        self.err_queue = err_queue
        self.operations = []
        self.parameters = {}

        for elm in element:
            if elm.tag == 'Parameter':
                self.addParameter(elm.get('name'), elm.get('value'))
            elif elm.tag == 'Operation':
                conf = OperationConf()
                conf.readXml(elm, project_id, err_queue)
                self.operations.append(conf)

    def createObjects(self):
        """Instantiate the processing unit and each of its operations."""
        # SECURITY NOTE: the name is resolved with eval(); it can come from
        # an XML file, so only load configurations from trusted sources.
        className = eval(self.name)
        procUnitObj = className()
        procUnitObj.name = self.name

        log.success('creating process...', self.name)

        for conf in self.operations:
            opObj = conf.createObject()
            log.success('adding operation: {}, type:{}'.format(
                conf.name,
                conf.type), self.name)
            procUnitObj.addOperation(conf, opObj)

        self.object = procUnitObj

    def run(self):
        """Call the unit object with the current parameters and return its result."""
        return self.object.call(**self.getKwargs())
class ReadUnitConf(ProcUnitConf):
    """Configuration of a reading unit (a unit with no input unit)."""

    ELEMENTNAME = 'ReadUnit'

    def __init__(self):
        # Run the base initialiser so that attributes such as ``object`` and
        # ``priority`` exist, then apply the reader-specific defaults.
        super().__init__()
        self.id = None
        self.datatype = None
        self.name = None
        self.inputId = None
        self.operations = []
        self.parameters = {}

    def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='',
              startTime='', endTime='', server=None, **kwargs):
        """Initialise the reader and register its parameters.

        At least one of ``name`` and ``datatype`` must be given; the missing
        one is derived from the other using the ``'Reader'`` suffix.
        ``path``, ``startDate``, ``endDate``, ``startTime`` and ``endTime``
        are always stored as parameters, followed by any extra keyword
        argument. ``server`` is accepted but not used by this method.

        Raises:
            ValueError: if both ``name`` and ``datatype`` are ``None``.
        """
        if datatype is None and name is None:
            raise ValueError('datatype or name should be defined')

        if name is None:
            if 'Reader' in datatype:
                name = datatype
                datatype = name.replace('Reader', '')
            else:
                name = '{}Reader'.format(datatype)

        if datatype is None:
            if 'Reader' in name:
                datatype = name.replace('Reader', '')
            else:
                datatype = name
                name = '{}Reader'.format(name)

        # Ids are kept as strings, as ProcUnitConf.setup does.
        self.id = str(id)
        self.project_id = project_id
        self.name = name
        self.datatype = datatype
        self.err_queue = err_queue

        self.addParameter(name='path', value=path)
        self.addParameter(name='startDate', value=startDate)
        self.addParameter(name='endDate', value=endDate)
        self.addParameter(name='startTime', value=startTime)
        self.addParameter(name='endTime', value=endTime)

        for key, value in kwargs.items():
            self.addParameter(name=key, value=value)
class Project(Process):
    """API to create signal chain projects.

    A project is a ``multiprocessing.Process`` that owns a dictionary of
    unit configurations keyed by their string id. ``run`` instantiates the
    units and then calls them in a loop.
    """

    ELEMENTNAME = 'Project'

    def __init__(self, name=''):
        Process.__init__(self)
        self.id = '1'
        if name:
            self.name = '{} ({})'.format(Process.__name__, name)
        self.filename = None
        self.description = None
        self.email = None
        self.alarm = []
        self.configurations = {}
        # No error queue is created here; monitor() needs one to be assigned.
        self.err_queue = None
        self.started = False

    def getNewId(self):
        """Return the first unused unit id above ``int(self.id) * 10``, as a string."""
        candidate = int(self.id) * 10 + 1
        while str(candidate) in self.configurations:
            candidate += 1
        return str(candidate)

    def updateId(self, new_id):
        """Set a new project id and renumber every unit accordingly.

        Units are renumbered following the sorted order of their current
        (string) keys.
        """
        self.id = str(new_id)
        new_confs = {}
        for n, procKey in enumerate(sorted(self.configurations), start=1):
            conf = self.configurations[procKey]
            idProcUnit = str(int(self.id) * 10 + n)
            conf.updateId(idProcUnit)
            new_confs[idProcUnit] = conf
        self.configurations = new_confs

    def setup(self, id=1, name='', description='', email=None, alarm=None):
        """Set the project id, description, e-mail, alarm modes and name.

        ``alarm`` defaults to a new empty list on every call, so projects
        never share the same list object.
        """
        self.id = str(id)
        self.description = description
        self.email = email
        self.alarm = [] if alarm is None else alarm
        if name:
            self.name = '{} ({})'.format(Process.__name__, name)

    def update(self, **kwargs):
        """Set one attribute on the project per keyword argument."""
        for key, value in kwargs.items():
            setattr(self, key, value)

    def clone(self):
        """Return a new project with the same id, name and description.

        The configuration dictionary is copied shallowly: both projects
        reference the same unit configuration objects.
        """
        p = Project()
        p.id = self.id
        p.name = self.name
        p.description = self.description
        p.configurations = self.configurations.copy()
        return p

    def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
        """Create, register and return a reading unit configuration.

        Extra keyword arguments are forwarded to ``ReadUnitConf.setup``.
        """
        idReadUnit = self.getNewId() if id is None else str(id)
        conf = ReadUnitConf()
        conf.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs)
        self.configurations[conf.id] = conf
        return conf

    def addProcUnit(self, id=None, inputId='0', datatype=None, name=None):
        """Create, register and return a processing unit configuration."""
        idProcUnit = self.getNewId() if id is None else id
        conf = ProcUnitConf()
        conf.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue)
        self.configurations[conf.id] = conf
        return conf

    def removeProcUnit(self, id):
        """Remove the unit with the given id; unknown ids are ignored."""
        self.configurations.pop(id, None)

    def getReadUnit(self):
        """Return the first reading unit configuration, or ``None``."""
        for obj in self.configurations.values():
            if obj.ELEMENTNAME == 'ReadUnit':
                return obj
        return None

    def getProcUnit(self, id):
        """Return the unit with the given id (``KeyError`` if unknown)."""
        return self.configurations[id]

    def getUnits(self):
        """Yield the unit configurations ordered by their sorted keys."""
        for key in sorted(self.configurations):
            yield self.configurations[key]

    def updateUnit(self, id, **kwargs):
        """Update the parameters of the unit with the given id."""
        self.configurations[id].update(**kwargs)

    def makeXml(self):
        """Build the XML tree of the project and store it in ``self.xml``."""
        xml = Element('Project')
        xml.set('id', str(self.id))
        xml.set('name', self.name)
        # Attribute values must be strings for serialisation; a project
        # that never went through setup() still has description None.
        xml.set('description', '' if self.description is None else self.description)
        for conf in self.configurations.values():
            conf.makeXml(xml)
        self.xml = xml

    def writeXml(self, filename=None):
        """Write the project to an XML file.

        ``filename`` defaults to ``self.filename`` and then to
        ``'schain.xml'``. Returns 1 on success and 0 when the name is empty
        or the target cannot be written.
        """
        if filename is None:
            filename = self.filename if self.filename else 'schain.xml'

        if not filename:
            print('filename has not been defined. Use setFilename(filename) for do it.')
            return 0

        abs_file = os.path.abspath(filename)
        directory = os.path.dirname(abs_file)

        if not os.access(directory, os.W_OK):
            print('No write permission on %s' % directory)
            return 0

        if os.path.isfile(abs_file) and not os.access(abs_file, os.W_OK):
            print('File %s already exists and it could not be overwriten' % abs_file)
            return 0

        self.makeXml()
        ElementTree(self.xml).write(abs_file, method='xml')
        self.filename = abs_file
        return 1

    def readXml(self, filename):
        """Load the project from an XML file.

        Returns 1 on success and 0 when the file cannot be read or parsed.

        Raises:
            KeyError: if a ``ProcUnit`` element refers to an input unit that
                has not been read before it.
        """
        abs_file = os.path.abspath(filename)
        self.configurations = {}

        try:
            self.xml = ElementTree().parse(abs_file)
        except Exception:
            log.error('Error reading %s, verify file format' % filename)
            return 0

        self.id = self.xml.get('id')
        name = self.xml.get('name')
        if name is not None:
            # Process.name only accepts strings.
            self.name = name
        self.description = self.xml.get('description')

        for element in self.xml:
            if element.tag == 'ReadUnit':
                conf = ReadUnitConf()
                conf.readXml(element, self.id, self.err_queue)
                self.configurations[conf.id] = conf
            elif element.tag == 'ProcUnit':
                # The input unit must already be known at this point.
                input_id = element.get('inputId')
                if input_id not in self.configurations:
                    raise KeyError(input_id)
                conf = ProcUnitConf()
                conf.readXml(element, self.id, self.err_queue)
                self.configurations[conf.id] = conf

        self.filename = abs_file
        return 1

    def __str__(self):
        """Return a multi-line summary of the project and its units."""
        header = '\nProject[id=%s, name=%s, description=%s]\n\n' % (
            self.id,
            self.name,
            self.description,
        )
        return header + ''.join('{}'.format(conf) for conf in self.configurations.values())

    def createObjects(self):
        """Instantiate every unit and connect it to its input unit."""
        for key in sorted(self.configurations):
            conf = self.configurations[key]
            conf.createObjects()
            if conf.inputId is not None:
                conf.object.setInput(self.configurations[conf.inputId].object)

    def monitor(self):
        """Start a thread that watches the error queue.

        Nothing is started when no error queue has been assigned. The
        ``ctx`` attribute is optional and passed as ``None`` when absent.
        """
        if self.err_queue is None:
            log.error('Error queue is not defined, monitor was not started')
            return
        t = Thread(target=self._monitor,
                   args=(self.err_queue, getattr(self, 'ctx', None)))
        t.start()

    def _monitor(self, queue, ctx):
        """Consume queue messages until all producers end or an error arrives.

        Messages containing ``'#_start_#'`` / ``'#_end_#'`` increase or
        decrease a counter; any other message is kept as the error message.
        The loop ends when the counter is 0 or the error message contains
        ``'Traceback'``. If an error message was received, an ``Alarm`` is
        built and started.
        """
        import socket

        procs = 0
        err_msg = ''

        while True:
            msg = queue.get()
            if '#_start_#' in msg:
                procs += 1
            elif '#_end_#' in msg:
                procs -= 1
            else:
                err_msg = msg

            if procs == 0 or 'Traceback' in err_msg:
                break
            time.sleep(0.1)

        if '|' in err_msg:
            # Split only on the first '|': the error text may contain more.
            name, err = err_msg.split('|', 1)
            if 'SchainWarning' in err:
                log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), name)
            elif 'SchainError' in err:
                log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), name)
            else:
                log.error(err, name)
        else:
            name, err = self.name, err_msg

        time.sleep(1)

        if ctx is not None:
            ctx.term()

        message = err

        if err_msg:
            subject = 'SChain v%s: Error running %s\n' % (
                schainpy.__version__, self.name)

            subtitle = 'Hostname: %s\n' % socket.gethostbyname(
                socket.gethostname())
            subtitle += 'Working directory: %s\n' % os.path.abspath('./')
            subtitle += 'Configuration file: %s\n' % self.filename
            subtitle += 'Time: %s\n' % str(datetime.datetime.now())

            readUnitConfObj = self.getReadUnit()
            if readUnitConfObj:
                # .get(): a reader loaded from XML may lack some of these.
                params = readUnitConfObj.parameters
                subtitle += '\nInput parameters:\n'
                subtitle += '[Data path = %s]\n' % params.get('path')
                subtitle += '[Start date = %s]\n' % params.get('startDate')
                subtitle += '[End date = %s]\n' % params.get('endDate')
                subtitle += '[Start time = %s]\n' % params.get('startTime')
                subtitle += '[End time = %s]\n' % params.get('endTime')

            a = Alarm(
                modes=self.alarm,
                email=self.email,
                message=message,
                subject=subject,
                subtitle=subtitle,
                filename=self.filename
            )

            a.start()

    def setFilename(self, filename):
        """Set the XML file name used by default in ``writeXml``."""
        self.filename = filename

    def runProcs(self):
        """Call every unit repeatedly until the error counter reaches zero.

        On each pass the units are called in ``getUnits`` order. A unit
        returning ``'Error'`` decrements the counter (initially the number
        of units); a unit returning a falsy value ends the current pass.
        """
        # NOTE(review): the nesting of the ``n == 0`` check (after the inner
        # loop) was reconstructed from a copy without indentation -- confirm.
        err = False
        n = len(self.configurations)

        while not err:
            for conf in self.getUnits():
                ok = conf.run()
                if ok == 'Error':
                    n -= 1
                    continue
                elif not ok:
                    break
            if n == 0:
                err = True

    def run(self):
        """Process entry point: create the unit objects and run them."""
        log.success('\nStarting Project {} [id={}]'.format(self.name, self.id), tag='')
        self.started = True
        self.start_time = time.time()
        self.createObjects()
        self.runProcs()
        log.success('{} Done (Time: {:4.2f}s)'.format(
            self.name,
            time.time() - self.start_time), '')