##// END OF EJS Templates
SOLO UN CANAL EN LA ACTUALIDAD
SOLO UN CANAL EN LA ACTUALIDAD

File last commit:

r1533:37c6d1c7452d
r1775:fba17541610f
Show More
controller.py
676 lines | 19.1 KiB | text/x-python | PythonLexer
# Copyright (c) 2012-2020 Jicamarca Radio Observatory
# All rights reserved.
#
# Distributed under the terms of the BSD 3-clause license.
"""API to create signal chain projects
The API is provide through class: Project
"""
import re
import sys
import ast
import datetime
import traceback
import time
import multiprocessing
import signal as sig
from multiprocessing import Process, Queue, active_children
from threading import Thread
from xml.etree.ElementTree import ElementTree, Element, SubElement
from schainpy.admin import Alarm, SchainWarning
from schainpy.model import *
from schainpy.utils import log
if 'darwin' in sys.platform and sys.version_info[0] == 3 and sys.version_info[1] > 7:
multiprocessing.set_start_method('fork')
def handler(sig, frame):
# get all active child processes
active = active_children()
# terminate all active children
for child in active:
child.terminate()
# terminate the process
sys.exit(0)
class ConfBase():
def __init__(self):
self.id = '0'
self.name = None
self.priority = None
self.parameters = {}
self.object = None
self.operations = []
def getId(self):
return self.id
def getNewId(self):
return int(self.id) * 10 + len(self.operations) + 1
def updateId(self, new_id):
self.id = str(new_id)
n = 1
for conf in self.operations:
conf_id = str(int(new_id) * 10 + n)
conf.updateId(conf_id)
n += 1
def getKwargs(self):
params = {}
for key, value in self.parameters.items():
if value not in (None, '', ' '):
params[key] = value
return params
def update(self, **kwargs):
if 'format' not in kwargs:
kwargs['format'] = None
for key, value, fmt in kwargs.items():
self.addParameter(name=key, value=value, format=fmt)
def addParameter(self, name, value, format=None):
'''
'''
if format is not None:
self.parameters[name] = eval(format)(value)
elif isinstance(value, str) and re.search(r'(\d+/\d+/\d+)', value):
self.parameters[name] = datetime.date(*[int(x) for x in value.split('/')])
elif isinstance(value, str) and re.search(r'(\d+:\d+:\d+)', value):
self.parameters[name] = datetime.time(*[int(x) for x in value.split(':')])
else:
try:
self.parameters[name] = ast.literal_eval(value)
except:
if isinstance(value, str) and ',' in value:
self.parameters[name] = value.split(',')
else:
self.parameters[name] = value
def getParameters(self):
params = {}
for key, value in self.parameters.items():
s = type(value).__name__
if s == 'date':
params[key] = value.strftime('%Y/%m/%d')
elif s == 'time':
params[key] = value.strftime('%H:%M:%S')
else:
params[key] = str(value)
return params
def makeXml(self, element):
xml = SubElement(element, self.ELEMENTNAME)
for label in self.xml_labels:
xml.set(label, str(getattr(self, label)))
for key, value in self.getParameters().items():
xml_param = SubElement(xml, 'Parameter')
xml_param.set('name', key)
xml_param.set('value', value)
for conf in self.operations:
conf.makeXml(xml)
def __str__(self):
if self.ELEMENTNAME == 'Operation':
s = ' {}[id={}]\n'.format(self.name, self.id)
else:
s = '{}[id={}, inputId={}]\n'.format(self.name, self.id, self.inputId)
for key, value in self.parameters.items():
if self.ELEMENTNAME == 'Operation':
s += ' {}: {}\n'.format(key, value)
else:
s += ' {}: {}\n'.format(key, value)
for conf in self.operations:
s += str(conf)
return s
class OperationConf(ConfBase):
ELEMENTNAME = 'Operation'
xml_labels = ['id', 'name']
def setup(self, id, name, priority, project_id, err_queue):
self.id = str(id)
self.project_id = project_id
self.name = name
self.type = 'other'
self.err_queue = err_queue
def readXml(self, element, project_id, err_queue):
self.id = element.get('id')
self.name = element.get('name')
self.type = 'other'
self.project_id = str(project_id)
self.err_queue = err_queue
for elm in element.iter('Parameter'):
self.addParameter(elm.get('name'), elm.get('value'))
def createObject(self):
className = eval(self.name)
if 'Plot' in self.name or 'Writer' in self.name or 'Send' in self.name or 'print' in self.name:
kwargs = self.getKwargs()
opObj = className(self.id, self.id, self.project_id, self.err_queue, **kwargs)
opObj.start()
self.type = 'external'
else:
opObj = className()
self.object = opObj
return opObj
class ProcUnitConf(ConfBase):
ELEMENTNAME = 'ProcUnit'
xml_labels = ['id', 'inputId', 'name']
def setup(self, project_id, id, name, datatype, inputId, err_queue):
'''
'''
if datatype == None and name == None:
raise ValueError('datatype or name should be defined')
if name == None:
if 'Proc' in datatype:
name = datatype
else:
name = '%sProc' % (datatype)
if datatype == None:
datatype = name.replace('Proc', '')
self.id = str(id)
self.project_id = project_id
self.name = name
self.datatype = datatype
self.inputId = inputId
self.err_queue = err_queue
self.operations = []
self.parameters = {}
def removeOperation(self, id):
i = [1 if x.id==id else 0 for x in self.operations]
self.operations.pop(i.index(1))
def getOperation(self, id):
for conf in self.operations:
if conf.id == id:
return conf
def addOperation(self, name, optype='self'):
'''
'''
id = self.getNewId()
conf = OperationConf()
conf.setup(id, name=name, priority='0', project_id=self.project_id, err_queue=self.err_queue)
self.operations.append(conf)
return conf
def readXml(self, element, project_id, err_queue):
self.id = element.get('id')
self.name = element.get('name')
self.inputId = None if element.get('inputId') == 'None' else element.get('inputId')
self.datatype = element.get('datatype', self.name.replace(self.ELEMENTNAME.replace('Unit', ''), ''))
self.project_id = str(project_id)
self.err_queue = err_queue
self.operations = []
self.parameters = {}
for elm in element:
if elm.tag == 'Parameter':
self.addParameter(elm.get('name'), elm.get('value'))
elif elm.tag == 'Operation':
conf = OperationConf()
conf.readXml(elm, project_id, err_queue)
self.operations.append(conf)
def createObjects(self):
'''
Instancia de unidades de procesamiento.
'''
className = eval(self.name)
kwargs = self.getKwargs()
procUnitObj = className()
procUnitObj.name = self.name
log.success('creating process...', self.name)
for conf in self.operations:
opObj = conf.createObject()
log.success('adding operation: {}, type:{}'.format(
conf.name,
conf.type), self.name)
procUnitObj.addOperation(conf, opObj)
self.object = procUnitObj
def run(self):
'''
'''
return self.object.call(**self.getKwargs())
class ReadUnitConf(ProcUnitConf):
ELEMENTNAME = 'ReadUnit'
def __init__(self):
self.id = None
self.datatype = None
self.name = None
self.inputId = None
self.operations = []
self.parameters = {}
def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='',
startTime='', endTime='', server=None, **kwargs):
if datatype == None and name == None:
raise ValueError('datatype or name should be defined')
if name == None:
if 'Reader' in datatype:
name = datatype
datatype = name.replace('Reader','')
else:
name = '{}Reader'.format(datatype)
if datatype == None:
if 'Reader' in name:
datatype = name.replace('Reader','')
else:
datatype = name
name = '{}Reader'.format(name)
self.id = id
self.project_id = project_id
self.name = name
self.datatype = datatype
self.err_queue = err_queue
self.addParameter(name='path', value=path, format='str')
self.addParameter(name='startDate', value=startDate)
self.addParameter(name='endDate', value=endDate)
self.addParameter(name='startTime', value=startTime)
self.addParameter(name='endTime', value=endTime)
for key, value in kwargs.items():
self.addParameter(name=key, value=value)
class Project(Process):
"""API to create signal chain projects"""
ELEMENTNAME = 'Project'
def __init__(self, name=''):
Process.__init__(self)
self.id = '1'
if name:
self.name = '{} ({})'.format(Process.__name__, name)
self.filename = None
self.description = None
self.email = None
self.alarm = []
self.configurations = {}
# self.err_queue = Queue()
self.err_queue = None
self.started = False
def getNewId(self):
idList = list(self.configurations.keys())
id = int(self.id) * 10
while True:
id += 1
if str(id) in idList:
continue
break
return str(id)
def updateId(self, new_id):
self.id = str(new_id)
keyList = list(self.configurations.keys())
keyList.sort()
n = 1
new_confs = {}
for procKey in keyList:
conf = self.configurations[procKey]
idProcUnit = str(int(self.id) * 10 + n)
conf.updateId(idProcUnit)
new_confs[idProcUnit] = conf
n += 1
self.configurations = new_confs
def setup(self, id=1, name='', description='', email=None, alarm=[]):
self.id = str(id)
self.description = description
self.email = email
self.alarm = alarm
if name:
self.name = '{} ({})'.format(Process.__name__, name)
def update(self, **kwargs):
for key, value in kwargs.items():
setattr(self, key, value)
def clone(self):
p = Project()
p.id = self.id
p.name = self.name
p.description = self.description
p.configurations = self.configurations.copy()
return p
def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
'''
'''
if id is None:
idReadUnit = self.getNewId()
else:
idReadUnit = str(id)
conf = ReadUnitConf()
conf.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs)
self.configurations[conf.id] = conf
return conf
def addProcUnit(self, id=None, inputId='0', datatype=None, name=None):
'''
'''
if id is None:
idProcUnit = self.getNewId()
else:
idProcUnit = id
conf = ProcUnitConf()
conf.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue)
self.configurations[conf.id] = conf
return conf
def removeProcUnit(self, id):
if id in self.configurations:
self.configurations.pop(id)
def getReadUnit(self):
for obj in list(self.configurations.values()):
if obj.ELEMENTNAME == 'ReadUnit':
return obj
return None
def getProcUnit(self, id):
return self.configurations[id]
def getUnits(self):
keys = list(self.configurations)
keys.sort()
for key in keys:
yield self.configurations[key]
def updateUnit(self, id, **kwargs):
conf = self.configurations[id].update(**kwargs)
def makeXml(self):
xml = Element('Project')
xml.set('id', str(self.id))
xml.set('name', self.name)
xml.set('description', self.description)
for conf in self.configurations.values():
conf.makeXml(xml)
self.xml = xml
def writeXml(self, filename=None):
if filename == None:
if self.filename:
filename = self.filename
else:
filename = 'schain.xml'
if not filename:
print('filename has not been defined. Use setFilename(filename) for do it.')
return 0
abs_file = os.path.abspath(filename)
if not os.access(os.path.dirname(abs_file), os.W_OK):
print('No write permission on %s' % os.path.dirname(abs_file))
return 0
if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
print('File %s already exists and it could not be overwriten' % abs_file)
return 0
self.makeXml()
ElementTree(self.xml).write(abs_file, method='xml')
self.filename = abs_file
return 1
def readXml(self, filename):
abs_file = os.path.abspath(filename)
self.configurations = {}
try:
self.xml = ElementTree().parse(abs_file)
except:
log.error('Error reading %s, verify file format' % filename)
return 0
self.id = self.xml.get('id')
self.name = self.xml.get('name')
self.description = self.xml.get('description')
for element in self.xml:
if element.tag == 'ReadUnit':
conf = ReadUnitConf()
conf.readXml(element, self.id, self.err_queue)
self.configurations[conf.id] = conf
elif element.tag == 'ProcUnit':
conf = ProcUnitConf()
input_proc = self.configurations[element.get('inputId')]
conf.readXml(element, self.id, self.err_queue)
self.configurations[conf.id] = conf
self.filename = abs_file
return 1
def __str__(self):
text = '\nProject[id=%s, name=%s, description=%s]\n\n' % (
self.id,
self.name,
self.description,
)
for conf in self.configurations.values():
text += '{}'.format(conf)
return text
def createObjects(self):
keys = list(self.configurations.keys())
keys.sort()
for key in keys:
conf = self.configurations[key]
conf.createObjects()
if conf.inputId is not None:
if isinstance(conf.inputId, list):
conf.object.setInput([self.configurations[x].object for x in conf.inputId])
else:
conf.object.setInput([self.configurations[conf.inputId].object])
def monitor(self):
t = Thread(target=self._monitor, args=(self.err_queue, self.ctx))
t.start()
def _monitor(self, queue, ctx):
import socket
procs = 0
err_msg = ''
while True:
msg = queue.get()
if '#_start_#' in msg:
procs += 1
elif '#_end_#' in msg:
procs -=1
else:
err_msg = msg
if procs == 0 or 'Traceback' in err_msg:
break
time.sleep(0.1)
if '|' in err_msg:
name, err = err_msg.split('|')
if 'SchainWarning' in err:
log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), name)
elif 'SchainError' in err:
log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), name)
else:
log.error(err, name)
else:
name, err = self.name, err_msg
time.sleep(1)
ctx.term()
message = ''.join(err)
if err_msg:
subject = 'SChain v%s: Error running %s\n' % (
schainpy.__version__, self.name)
subtitle = 'Hostname: %s\n' % socket.gethostbyname(
socket.gethostname())
subtitle += 'Working directory: %s\n' % os.path.abspath('./')
subtitle += 'Configuration file: %s\n' % self.filename
subtitle += 'Time: %s\n' % str(datetime.datetime.now())
readUnitConfObj = self.getReadUnit()
if readUnitConfObj:
subtitle += '\nInput parameters:\n'
subtitle += '[Data path = %s]\n' % readUnitConfObj.parameters['path']
subtitle += '[Start date = %s]\n' % readUnitConfObj.parameters['startDate']
subtitle += '[End date = %s]\n' % readUnitConfObj.parameters['endDate']
subtitle += '[Start time = %s]\n' % readUnitConfObj.parameters['startTime']
subtitle += '[End time = %s]\n' % readUnitConfObj.parameters['endTime']
a = Alarm(
modes=self.alarm,
email=self.email,
message=message,
subject=subject,
subtitle=subtitle,
filename=self.filename
)
a.start()
def setFilename(self, filename):
self.filename = filename
def runProcs(self):
err = False
n = len(self.configurations)
while not err:
for conf in self.getUnits():
ok = conf.run()
if ok == 'Error':
n -= 1
continue
elif not ok:
break
if n == 0:
err = True
def run(self):
log.success('\nStarting Project {} [id={}]'.format(self.name, self.id), tag='')
self.started = True
self.start_time = time.time()
self.createObjects()
sig.signal(sig.SIGTERM, handler)
self.runProcs()
log.success('{} Done (Time: {:4.2f}s)'.format(
self.name,
time.time()-self.start_time), '')