##// END OF EJS Templates
merged branches
merged branches

File last commit:

r1370:81f892b894eb merge
r1370:81f892b894eb merge
Show More
controller.py
918 lines | 25.5 KiB | text/x-python | PythonLexer
# Copyright (c) 2012-2020 Jicamarca Radio Observatory
# All rights reserved.
#
# Distributed under the terms of the BSD 3-clause license.
"""API to create signal chain projects
The API is provided through the class: Project
"""
import re
import sys
import ast
import datetime
import traceback
import time
import multiprocessing
from multiprocessing import Process, Queue
from threading import Thread
from xml.etree.ElementTree import ElementTree, Element, SubElement
from schainpy.admin import Alarm, SchainWarning
from schainpy.model import *
from schainpy.utils import log
# On macOS running Python 3.8 or newer, force the 'fork' start method for
# child processes (presumably because the platform default changed to 'spawn'
# in 3.8 -- confirm against the multiprocessing documentation).
if (sys.version_info.major == 3 and sys.version_info.minor > 7
        and 'darwin' in sys.platform):
    multiprocessing.set_start_method('fork')

# File extension passed to the offline file search for each reader datatype.
DTYPES = dict(Voltage='.r', Spectra='.pdata')
def MPProject(project, n=cpu_count()):
    '''
    Project wrapper to run schain in n processes.

    For every day between the reader's startDate and endDate the files of
    that day are counted, a window size of ceil(nFiles / n) is computed, and
    one clone of ``project`` is started per (cursor, skip) window. All clones
    of a day are joined before the next day is processed.

    Parameters:
        project: project whose read unit is reconfigured and which is cloned
                 for every window.
        n: number of windows to aim for per day; the default is the CPU count
           evaluated once, when this module is imported.

    NOTE(review): getReadUnitObj(), getOperationObj(), getParameterValue() and
    the ``rconf.path`` attribute are not defined by the configuration classes
    in this module (Project exposes getReadUnit()); this function looks
    written against an older API -- confirm before relying on it.
    '''
    rconf = project.getReadUnitObj()
    op = rconf.getOperationObj('run')
    dt1 = op.getParameterValue('startDate')
    dt2 = op.getParameterValue('endDate')
    tm1 = op.getParameterValue('startTime')
    tm2 = op.getParameterValue('endTime')
    days = (dt2 - dt1).days

    for day in range(days + 1):
        cursor = 0
        processes = []
        dt = dt1 + datetime.timedelta(day)
        dt_str = dt.strftime('%Y/%m/%d')
        reader = JRODataReader()
        _, files = reader.searchFilesOffLine(path=rconf.path,
                                             startDate=dt,
                                             endDate=dt,
                                             startTime=tm1,
                                             endTime=tm2,
                                             ext=DTYPES[rconf.datatype])
        nFiles = len(files)
        if nFiles == 0:
            continue

        # Number of files handled by each clone.
        skip = int(math.ceil(nFiles / n))
        while nFiles > cursor * skip:
            rconf.update(startDate=dt_str, endDate=dt_str, cursor=cursor,
                         skip=skip)
            p = project.clone()
            p.start()
            processes.append(p)
            cursor += 1

        def beforeExit(exctype, value, trace):
            # Stop the children of the current day before reporting the error.
            for process in processes:
                process.terminate()
                process.join()
            # Report type, message and traceback; the previous
            # print(traceback.print_tb(...)) printed a stray "None" and
            # dropped the exception type and message.
            traceback.print_exception(exctype, value, trace)

        sys.excepthook = beforeExit

        for process in processes:
            process.join()
            process.terminate()

        time.sleep(3)
def wait(context):
    """Wait for one message on the project's publisher socket, then stop ``context``.

    Sleeps one second, connects a ZeroMQ SUB socket to
    'ipc:///tmp/schain_<id>_pub', subscribes to the id topic, blocks until one
    multipart message arrives and finally calls ``context.terminate()``.

    NOTE(review): the body reads ``self.id`` although this function has no
    ``self`` parameter, so calling it as written raises NameError unless a
    module-level ``self`` exists -- it looks like a method moved out of a
    class; confirm the intended owner. ``zmq`` is not imported explicitly in
    this module (presumably it comes from the star import -- verify).
    """
    # Presumably gives the publisher time to bind before subscribing -- confirm.
    time.sleep(1)
    c = zmq.Context()
    receiver = c.socket(zmq.SUB)
    receiver.connect('ipc:///tmp/schain_{}_pub'.format(self.id))
    receiver.setsockopt(zmq.SUBSCRIBE, self.id.encode())
    # Blocks until a message arrives; the payload itself is not used.
    msg = receiver.recv_multipart()[1]
    context.terminate()
class ParameterConf():
    """One named parameter stored as raw text plus a format tag.

    ``value`` holds the raw value (a string unless ``format`` is 'obj') and
    ``format`` selects the conversion that getValue() applies. The converted
    value is cached and the cache is dropped whenever the raw value changes
    through setup(), update() or readXml().
    """

    id = None
    name = None
    value = None
    format = None
    # Cache of the converted value; None means "not converted yet".
    __formated_value = None

    ELEMENTNAME = 'Parameter'

    def __init__(self):
        self.format = 'str'

    def getElementName(self):
        """Return the XML tag name used for this element."""
        return self.ELEMENTNAME

    def getValue(self):
        """Return ``value`` converted according to ``format``.

        Supported formats: 'obj' (returned untouched, never cached), 'str',
        'list', 'intlist', 'floatlist', 'date' (Y/M/D), 'time' (H:M:S),
        'pairslist', 'multilist', 'bool', 'int'; any other format name is
        resolved with eval() and called on the raw value.

        Raises:
            ValueError: if the raw value is empty (for every format except
                'obj' and 'str') or a 'pairslist' value is not made of pairs.
        """
        if self.__formated_value is not None:
            return self.__formated_value

        value = self.value
        format = self.format

        if format == 'obj':
            return value

        if format == 'str':
            self.__formated_value = str(value)
            return self.__formated_value

        if value == '':
            raise ValueError('%s: This parameter value is empty' % self.name)

        if format == 'list':
            self.__formated_value = [s.strip() for s in value.split(',')]
            return self.__formated_value

        if format == 'intlist':
            # Example: value = (0,1,2)
            new_value = ast.literal_eval(value)
            if not isinstance(new_value, (tuple, list)):
                new_value = [int(new_value)]
            self.__formated_value = new_value
            return self.__formated_value

        if format == 'floatlist':
            # Example: value = (0.5, 1.4, 2.7)
            new_value = ast.literal_eval(value)
            if not isinstance(new_value, (tuple, list)):
                new_value = [float(new_value)]
            self.__formated_value = new_value
            return self.__formated_value

        if format == 'date':
            year, month, day = [int(x) for x in value.split('/')][:3]
            self.__formated_value = datetime.date(year, month, day)
            return self.__formated_value

        if format == 'time':
            hour, minute, second = [int(x) for x in value.split(':')][:3]
            self.__formated_value = datetime.time(hour, minute, second)
            return self.__formated_value

        if format == 'pairslist':
            # Example: value = (0,1),(1,2)
            new_value = ast.literal_eval(value)
            if not isinstance(new_value, (tuple, list)):
                raise ValueError('%s has to be a tuple or list of pairs' % value)
            if not isinstance(new_value[0], (tuple, list)):
                # A single bare pair such as "0,1" is wrapped into a list.
                if len(new_value) != 2:
                    raise ValueError('%s has to be a tuple or list of pairs' % value)
                new_value = [new_value]
            for thisPair in new_value:
                if len(thisPair) != 2:
                    raise ValueError('%s has to be a tuple or list of pairs' % value)
            self.__formated_value = new_value
            return self.__formated_value

        if format == 'multilist':
            # Example: value = (0,1,2),(3,4,5)
            multiList = ast.literal_eval(value)
            if type(multiList[0]) is int:
                # A flat sequence is re-parsed with extra parentheses.
                multiList = ast.literal_eval('(' + value + ')')
            self.__formated_value = multiList
            return self.__formated_value

        if format == 'bool':
            value = int(value)

        if format == 'int':
            # Going through float accepts texts such as "3.0".
            value = float(value)

        # NOTE(review): eval() resolves the format name to a callable; the
        # format can come from an XML file, so this executes arbitrary
        # expressions if that file is untrusted.
        format_func = eval(format)
        self.__formated_value = format_func(value)
        return self.__formated_value

    def updateId(self, new_id):
        """Replace the identifier (stored as a string)."""
        self.id = str(new_id)

    def setup(self, id, name, value, format='str'):
        """Initialise the parameter and validate it by converting it once.

        Returns 1 on success; conversion errors propagate to the caller.
        """
        self.id = str(id)
        self.name = name
        if format == 'obj':
            self.value = value
        else:
            self.value = str(value)
        self.format = str.lower(format)
        # Drop any value cached by a previous setup()/getValue().
        self.__formated_value = None
        self.getValue()
        return 1

    def update(self, name, value, format='str'):
        """Replace name, raw value and format, discarding the cached conversion."""
        self.name = name
        self.value = str(value)
        self.format = format
        # Without this reset getValue() kept returning the previous value.
        self.__formated_value = None

    def makeXml(self, opElement):
        """Append this parameter as a child of ``opElement`` (skipping 'queue')."""
        if self.name not in ('queue',):
            parmElement = SubElement(opElement, self.ELEMENTNAME)
            parmElement.set('id', str(self.id))
            parmElement.set('name', self.name)
            parmElement.set('value', self.value)
            parmElement.set('format', self.format)

    def readXml(self, parmElement):
        """Load id, name, value and format from an XML element."""
        self.id = parmElement.get('id')
        self.name = parmElement.get('name')
        self.value = parmElement.get('value')
        # Elements without a format attribute fall back to the class default.
        self.format = str.lower(parmElement.get('format', 'str'))
        self.__formated_value = None
        # Compatible with old signal chain version
        if self.format == 'int' and self.name == 'idfigure':
            self.name = 'id'

    def printattr(self):
        """Print a one-line summary of the parameter."""
        # project_id is never assigned by this class, so read it defensively.
        project_id = getattr(self, 'project_id', None)
        print('Parameter[%s]: name = %s, value = %s, format = %s, project_id = %s' % (self.id, self.name, self.value, self.format, project_id))
class OperationConf():
    """Behaviour shared by the configuration nodes of a project.

    Holds an id, a name, a dict of already-converted parameters and a list of
    child operation configurations, and knows how to serialise them to XML.
    makeXml() reads ``self.xml_labels`` and __str__() reads ``self.inputId``
    for non-operation nodes; both are supplied by the subclasses.
    """

    ELEMENTNAME = 'Operation'

    def __init__(self):
        self.id = '0'
        self.name = None
        self.priority = None
        self.parameters = {}
        self.object = None
        self.operations = []

    def getId(self):
        """Return the identifier of this node."""
        return self.id

    def getNewId(self):
        """Return the next free child id: own id * 10 + number of children + 1."""
        return int(self.id) * 10 + len(self.operations) + 1

    def updateId(self, new_id):
        """Set a new id and renumber the child operations accordingly."""
        self.id = str(new_id)
        for n, conf in enumerate(self.operations, start=1):
            conf.updateId(str(int(new_id) * 10 + n))

    def getKwargs(self):
        """Return the parameters whose value is not None, '' or ' '."""
        return {key: value for key, value in self.parameters.items()
                if value not in (None, '', ' ')}

    def update(self, **kwargs):
        """Add or overwrite one parameter per keyword argument."""
        for key, value in kwargs.items():
            self.addParameter(name=key, value=value)

    def addParameter(self, name, value, format=None):
        '''
        Store ``value`` under ``name`` after converting it.

        Strings that are entirely a Y/M/D date or an H:M:S time become
        datetime.date / datetime.time objects. Anything else goes through
        ast.literal_eval(); when that fails, a string containing commas is
        split into a list and any other value is stored unchanged.
        ``format`` is accepted but not used.
        '''
        # fullmatch (not search): a value that merely contains a date-like
        # fragment, such as the path "/data/2019/11/01/raw", must not be
        # parsed as a date. Surrounding whitespace stays accepted.
        if isinstance(value, str) and re.fullmatch(r'\s*\d+/\d+/\d+\s*', value):
            self.parameters[name] = datetime.date(*[int(x) for x in value.split('/')])
        elif isinstance(value, str) and re.fullmatch(r'\s*\d+:\d+:\d+\s*', value):
            self.parameters[name] = datetime.time(*[int(x) for x in value.split(':')])
        else:
            try:
                self.parameters[name] = ast.literal_eval(value)
            except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
                # Not a Python literal (or not a string at all).
                if isinstance(value, str) and ',' in value:
                    self.parameters[name] = value.split(',')
                else:
                    self.parameters[name] = value

    def getParameters(self):
        """Return the parameters as strings (dates as Y/M/D, times as H:M:S)."""
        params = {}
        for key, value in self.parameters.items():
            kind = type(value).__name__
            if kind == 'date':
                params[key] = value.strftime('%Y/%m/%d')
            elif kind == 'time':
                params[key] = value.strftime('%H:%M:%S')
            else:
                params[key] = str(value)
        return params

    def makeXml(self, element):
        """Append this node, its parameters and its operations to ``element``."""
        xml = SubElement(element, self.ELEMENTNAME)
        for label in self.xml_labels:
            xml.set(label, str(getattr(self, label)))
        for key, value in self.getParameters().items():
            xml_param = SubElement(xml, 'Parameter')
            xml_param.set('name', key)
            xml_param.set('value', value)
        for conf in self.operations:
            conf.makeXml(xml)

    def __str__(self):
        if self.ELEMENTNAME == 'Operation':
            parts = [' {}[id={}]\n'.format(self.name, self.id)]
        else:
            parts = ['{}[id={}, inputId={}]\n'.format(self.name, self.id, self.inputId)]
        parts.extend(' {}: {}\n'.format(key, value)
                     for key, value in self.parameters.items())
        parts.extend(str(conf) for conf in self.operations)
        return ''.join(parts)


# The configuration classes defined later in this module inherit from
# ``ConfBase``, which this module neither defines nor imports explicitly;
# expose this class under that name so those definitions resolve.
ConfBase = OperationConf
class OperationConf(ConfBase):
    """Configuration of a single operation attached to a processing unit."""

    ELEMENTNAME = 'Operation'
    xml_labels = ['id', 'name']

    def setup(self, id, name, priority, project_id, err_queue):
        """Fill in identity and context; ``priority`` is accepted but not stored."""
        self.type = 'other'
        self.name = name
        self.id = str(id)
        self.project_id = project_id
        self.err_queue = err_queue

    def readXml(self, element, project_id, err_queue):
        """Load id, name and every nested Parameter from an XML element."""
        self.type = 'other'
        self.id = element.get('id')
        self.name = element.get('name')
        self.project_id = str(project_id)
        self.err_queue = err_queue
        for param in element.iter('Parameter'):
            self.addParameter(param.get('name'), param.get('value'))

    def createObject(self):
        """Instantiate the class named ``self.name`` and keep it in ``self.object``.

        Names containing 'Plot', 'Writer', 'Send' or 'print' are built with
        (id, id, project_id, err_queue, **kwargs), started, and the
        configuration is marked as 'external'; every other class is built
        without arguments.
        """
        # The class is looked up by evaluating its name in this module.
        factory = eval(self.name)
        external_markers = ('Plot', 'Writer', 'Send', 'print')
        if not any(marker in self.name for marker in external_markers):
            instance = factory()
        else:
            options = self.getKwargs()
            instance = factory(self.id, self.id, self.project_id, self.err_queue, **options)
            instance.start()
            self.type = 'external'
        self.object = instance
        return instance
class ProcUnitConf(ConfBase):
    """Configuration of a processing unit and the operations attached to it."""

    ELEMENTNAME = 'ProcUnit'
    xml_labels = ['id', 'inputId', 'name']

    def setup(self, project_id, id, name, datatype, inputId, err_queue):
        '''
        Initialise the unit; whichever of ``name`` / ``datatype`` is missing
        is derived from the other ('<datatype>Proc' <-> '<datatype>').

        Raises ValueError when both are None.
        '''
        if name is None and datatype is None:
            raise ValueError('datatype or name should be defined')

        if name is None:
            name = datatype if 'Proc' in datatype else '%sProc' % (datatype)

        if datatype is None:
            datatype = name.replace('Proc', '')

        self.id = str(id)
        self.project_id = project_id
        self.name = name
        self.datatype = datatype
        self.inputId = inputId
        self.err_queue = err_queue
        self.operations = []
        self.parameters = {}

    def removeOperation(self, id):
        """Remove the first operation whose id equals ``id`` (ValueError if absent)."""
        hits = [1 if op.id == id else 0 for op in self.operations]
        position = hits.index(1)
        self.operations.pop(position)

    def getOperation(self, id):
        """Return the operation configuration with the given id, or None."""
        return next((op for op in self.operations if op.id == id), None)

    def addOperation(self, name, optype='self'):
        '''
        Create, register and return a new operation configuration.

        ``optype`` is accepted but not used.
        '''
        new_id = self.getNewId()
        op_conf = OperationConf()
        op_conf.setup(new_id, name=name, priority='0',
                      project_id=self.project_id, err_queue=self.err_queue)
        self.operations.append(op_conf)
        return op_conf

    def readXml(self, element, project_id, err_queue):
        """Load the unit, its parameters and its operations from an XML element."""
        self.id = element.get('id')
        self.name = element.get('name')
        raw_input_id = element.get('inputId')
        self.inputId = None if raw_input_id == 'None' else raw_input_id
        # Default datatype: the unit name without the 'Proc' / 'Read' suffix.
        suffix = self.ELEMENTNAME.replace('Unit', '')
        fallback = self.name.replace(suffix, '')
        self.datatype = element.get('datatype', fallback)
        self.project_id = str(project_id)
        self.err_queue = err_queue
        self.operations = []
        self.parameters = {}

        for child in element:
            tag = child.tag
            if tag == 'Parameter':
                self.addParameter(child.get('name'), child.get('value'))
            elif tag == 'Operation':
                op_conf = OperationConf()
                op_conf.readXml(child, project_id, err_queue)
                self.operations.append(op_conf)

    def createObjects(self):
        '''
        Instantiate the processing unit and each of its operations.
        '''
        unit_class = eval(self.name)
        kwargs = self.getKwargs()
        unit = unit_class()
        unit.name = self.name

        log.success('creating process...', self.name)

        for op_conf in self.operations:
            op_obj = op_conf.createObject()
            message = 'adding operation: {}, type:{}'.format(op_conf.name, op_conf.type)
            log.success(message, self.name)
            unit.addOperation(op_conf, op_obj)

        self.object = unit

    def run(self):
        '''
        Call the underlying unit with the configured keyword arguments.
        '''
        options = self.getKwargs()
        return self.object.call(**options)
class ReadUnitConf(ProcUnitConf):
    """Configuration of the unit that reads the input data."""

    ELEMENTNAME = 'ReadUnit'

    def __init__(self):
        self.id = None
        self.name = None
        self.datatype = None
        self.inputId = None
        self.parameters = {}
        self.operations = []

    def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='',
              startTime='', endTime='', server=None, **kwargs):
        """Initialise the reader and register its parameters.

        Whichever of ``name`` / ``datatype`` is missing is derived from the
        other so that the name ends up as '<datatype>Reader'. ``server`` is
        accepted but not used. Raises ValueError when both are None.
        """
        if name is None and datatype is None:
            raise ValueError('datatype or name should be defined')

        if name is None:
            if 'Reader' not in datatype:
                name = '{}Reader'.format(datatype)
            else:
                name = datatype
                datatype = name.replace('Reader','')
        elif datatype is None:
            if 'Reader' not in name:
                datatype = name
                name = '{}Reader'.format(name)
            else:
                datatype = name.replace('Reader','')

        self.id = id
        self.project_id = project_id
        self.name = name
        self.datatype = datatype
        self.err_queue = err_queue

        # Fixed reader parameters first, then any extra keyword arguments.
        fixed = (
            ('path', path),
            ('startDate', startDate),
            ('endDate', endDate),
            ('startTime', startTime),
            ('endTime', endTime),
        )
        for key, val in fixed:
            self.addParameter(name=key, value=val)

        for key, val in kwargs.items():
            self.addParameter(name=key, value=val)
class Project(Process):
    """API to create signal chain projects.

    A project is a multiprocessing.Process that owns a dict of unit
    configurations keyed by id. run() instantiates every unit and then calls
    the units repeatedly, in id order, until they report completion.
    """

    ELEMENTNAME = 'Project'

    def __init__(self, name=''):
        Process.__init__(self)
        self.id = '1'
        if name:
            self.name = '{} ({})'.format(Process.__name__, name)
        self.filename = None
        self.description = None
        self.email = None
        self.alarm = []
        self.configurations = {}
        self.err_queue = None
        self.started = False

    def getNewId(self):
        """Return the first unused unit id above ``int(self.id) * 10`` as a string."""
        candidate = int(self.id) * 10 + 1
        while str(candidate) in self.configurations:
            candidate += 1
        return str(candidate)

    def updateId(self, new_id):
        """Change the project id and renumber every unit in sorted key order."""
        self.id = str(new_id)
        new_confs = {}
        for n, procKey in enumerate(sorted(self.configurations), start=1):
            conf = self.configurations[procKey]
            idProcUnit = str(int(self.id) * 10 + n)
            conf.updateId(idProcUnit)
            new_confs[idProcUnit] = conf
        self.configurations = new_confs

    def setup(self, id=1, name='', description='', email=None, alarm=None):
        """Set the project metadata.

        ``alarm`` defaults to a new empty list on every call; a shared
        mutable default would leak alarm modes between projects.
        """
        self.id = str(id)
        self.description = description
        self.email = email
        self.alarm = [] if alarm is None else alarm
        if name:
            self.name = '{} ({})'.format(Process.__name__, name)

    def update(self, **kwargs):
        """Set one attribute on the project per keyword argument."""
        for key, value in kwargs.items():
            setattr(self, key, value)

    def clone(self):
        """Return a new Project with the same id, name and description.

        The configurations dict is copied shallowly, so both projects share
        the same configuration objects.
        """
        p = Project()
        p.id = self.id
        p.name = self.name
        p.description = self.description
        p.configurations = self.configurations.copy()
        return p

    def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
        '''
        Create, register and return a read unit configuration.

        A new id is generated when ``id`` is None; extra keyword arguments
        are forwarded to ReadUnitConf.setup().
        '''
        idReadUnit = self.getNewId() if id is None else str(id)
        conf = ReadUnitConf()
        conf.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs)
        self.configurations[conf.id] = conf
        return conf

    def addProcUnit(self, id=None, inputId='0', datatype=None, name=None):
        '''
        Create, register and return a processing unit configuration.

        A new id is generated when ``id`` is None.
        '''
        idProcUnit = self.getNewId() if id is None else id
        conf = ProcUnitConf()
        conf.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue)
        self.configurations[conf.id] = conf
        return conf

    def removeProcUnit(self, id):
        """Remove the unit with the given id; unknown ids are ignored."""
        self.configurations.pop(id, None)

    def getReadUnit(self):
        """Return the first configuration whose ELEMENTNAME is 'ReadUnit', or None."""
        for obj in self.configurations.values():
            if obj.ELEMENTNAME == 'ReadUnit':
                return obj
        return None

    def getProcUnit(self, id):
        """Return the configuration stored under ``id`` (KeyError if unknown)."""
        return self.configurations[id]

    def getUnits(self):
        """Yield the configurations ordered by their (string) keys."""
        for key in sorted(self.configurations):
            yield self.configurations[key]

    def updateUnit(self, id, **kwargs):
        """Forward the keyword arguments to the update() of the given unit."""
        self.configurations[id].update(**kwargs)

    def makeXml(self):
        """Build the XML tree of the project and keep it in ``self.xml``."""
        xml = Element('Project')
        xml.set('id', str(self.id))
        xml.set('name', self.name)
        # description is None until setup() is called; ElementTree cannot
        # serialise a None attribute, so write an empty string instead.
        xml.set('description', self.description or '')
        for conf in self.configurations.values():
            conf.makeXml(xml)
        self.xml = xml

    def writeXml(self, filename=None):
        """Write the project to ``filename``.

        Falls back to ``self.filename`` and then to 'schain.xml' when
        ``filename`` is None. Returns 1 on success and 0 (after printing the
        reason) when no name is available or the target is not writable.
        """
        if filename is None:
            filename = self.filename if self.filename else 'schain.xml'

        if not filename:
            print('filename has not been defined. Use setFilename(filename) for do it.')
            return 0

        abs_file = os.path.abspath(filename)
        directory = os.path.dirname(abs_file)

        if not os.access(directory, os.W_OK):
            print('No write permission on %s' % directory)
            return 0

        if os.path.isfile(abs_file) and not os.access(abs_file, os.W_OK):
            print('File %s already exists and it could not be overwriten' % abs_file)
            return 0

        self.makeXml()
        ElementTree(self.xml).write(abs_file, method='xml')
        self.filename = abs_file
        return 1

    def readXml(self, filename):
        """Replace the current configurations with the ones stored in ``filename``.

        Returns 1 on success and 0 (after logging an error) when the file
        cannot be parsed. A ProcUnit whose inputId does not refer to a unit
        read earlier in the file raises KeyError.
        """
        abs_file = os.path.abspath(filename)
        self.configurations = {}

        try:
            self.xml = ElementTree().parse(abs_file)
        except Exception:
            log.error('Error reading %s, verify file format' % filename)
            return 0

        self.id = self.xml.get('id')
        self.name = self.xml.get('name')
        self.description = self.xml.get('description')

        for element in self.xml:
            if element.tag == 'ReadUnit':
                conf = ReadUnitConf()
                conf.readXml(element, self.id, self.err_queue)
                self.configurations[conf.id] = conf
            elif element.tag == 'ProcUnit':
                conf = ProcUnitConf()
                # Lookup kept only for its KeyError on an unknown input unit.
                self.configurations[element.get('inputId')]
                conf.readXml(element, self.id, self.err_queue)
                self.configurations[conf.id] = conf

        self.filename = abs_file
        return 1

    def __str__(self):
        header = '\nProject[id=%s, name=%s, description=%s]\n\n' % (
            self.id,
            self.name,
            self.description,
        )
        return header + ''.join('{}'.format(conf) for conf in self.configurations.values())

    def createObjects(self):
        """Instantiate every unit in key order and connect it to its input unit."""
        for key in sorted(self.configurations):
            conf = self.configurations[key]
            conf.createObjects()
            if conf.inputId is not None:
                conf.object.setInput(self.configurations[conf.inputId].object)

    def monitor(self):
        """Start _monitor() in a background thread.

        NOTE(review): reads ``self.ctx``, which this class never assigns, and
        ``self.err_queue`` is None unless set by the caller -- confirm how
        both are meant to be provided.
        """
        t = Thread(target=self._monitor, args=(self.err_queue, self.ctx))
        t.start()

    def _monitor(self, queue, ctx):
        """Consume status messages from ``queue`` and raise an alarm on errors.

        '#_start_#' / '#_end_#' messages increase / decrease the count of
        running processes; any other message is kept as the error text. The
        loop ends when the count is zero or the error text contains
        'Traceback'. Afterwards ``ctx`` is terminated and, if there was an
        error, an Alarm describing the run is started.
        """
        import socket

        procs = 0
        err_msg = ''

        while True:
            msg = queue.get()
            if '#_start_#' in msg:
                procs += 1
            elif '#_end_#' in msg:
                procs -= 1
            else:
                err_msg = msg

            if procs == 0 or 'Traceback' in err_msg:
                break
            time.sleep(0.1)

        if '|' in err_msg:
            # Messages look like "<name>|<error text>"; split once so that a
            # '|' inside the error text does not break the unpacking.
            name, err = err_msg.split('|', 1)
            if 'SchainWarning' in err:
                log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), name)
            elif 'SchainError' in err:
                log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), name)
            else:
                log.error(err, name)
        else:
            name, err = self.name, err_msg

        time.sleep(1)

        ctx.term()

        message = ''.join(err)

        if err_msg:
            subject = 'SChain v%s: Error running %s\n' % (
                schainpy.__version__, self.name)

            subtitle = 'Hostname: %s\n' % socket.gethostbyname(
                socket.gethostname())
            subtitle += 'Working directory: %s\n' % os.path.abspath('./')
            subtitle += 'Configuration file: %s\n' % self.filename
            subtitle += 'Time: %s\n' % str(datetime.datetime.now())

            readUnitConfObj = self.getReadUnit()
            if readUnitConfObj:
                subtitle += '\nInput parameters:\n'
                subtitle += '[Data path = %s]\n' % readUnitConfObj.parameters['path']
                subtitle += '[Start date = %s]\n' % readUnitConfObj.parameters['startDate']
                subtitle += '[End date = %s]\n' % readUnitConfObj.parameters['endDate']
                subtitle += '[Start time = %s]\n' % readUnitConfObj.parameters['startTime']
                subtitle += '[End time = %s]\n' % readUnitConfObj.parameters['endTime']

            a = Alarm(
                modes=self.alarm,
                email=self.email,
                message=message,
                subject=subject,
                subtitle=subtitle,
                filename=self.filename
            )

            a.start()

    def setFilename(self, filename):
        """Remember the file used by writeXml() when no name is given."""
        self.filename = filename

    def runProcs(self):
        """Call every unit, in id order, until all of them have finished.

        A unit returning 'Error' decrements the pending counter; a falsy
        return aborts the current pass so that it restarts from the first
        unit. The outer loop stops once the counter reaches zero.
        """
        err = False
        n = len(self.configurations)

        while not err:
            for conf in self.getUnits():
                ok = conf.run()
                if ok == 'Error':
                    n -= 1
                    continue
                elif not ok:
                    break
            if n == 0:
                err = True

    def run(self):
        """Process entry point: build the objects, run them and log the elapsed time."""
        log.success('\nStarting Project {} [id={}]'.format(self.name, self.id), tag='')
        self.started = True
        self.start_time = time.time()
        self.createObjects()
        self.runProcs()
        log.success('{} Done (Time: {:4.2f}s)'.format(
            self.name,
            time.time()-self.start_time), '')