controller.py
918 lines
| 25.5 KiB
| text/x-python
|
PythonLexer
/ schainpy / controller.py
|
r1329 | # Copyright (c) 2012-2020 Jicamarca Radio Observatory | ||
# All rights reserved. | ||||
# | ||||
# Distributed under the terms of the BSD 3-clause license. | ||||
"""API to create signal chain projects | ||||
The API is provide through class: Project | ||||
""" | ||||
|
r1287 | |||
import re | ||||
|
r672 | import sys | ||
|
r514 | import ast | ||
|
r687 | import datetime | ||
|
r672 | import traceback | ||
|
r931 | import time | ||
r1342 | import multiprocessing | |||
|
r1287 | from multiprocessing import Process, Queue | ||
|
r1177 | from threading import Thread | ||
|
r1287 | from xml.etree.ElementTree import ElementTree, Element, SubElement | ||
|
r1171 | |||
r1129 | from schainpy.admin import Alarm, SchainWarning | |||
|
r1191 | from schainpy.model import * | ||
|
r1052 | from schainpy.utils import log | ||
|
r1191 | |||
r1342 | if 'darwin' in sys.platform and sys.version_info[0] == 3 and sys.version_info[1] > 7: | |||
multiprocessing.set_start_method('fork') | ||||
|
r1052 | |||
DTYPES = { | ||||
'Voltage': '.r', | ||||
'Spectra': '.pdata' | ||||
} | ||||
|
r1082 | |||
|
r1052 | def MPProject(project, n=cpu_count()): | ||
''' | ||||
Project wrapper to run schain in n processes | ||||
''' | ||||
rconf = project.getReadUnitObj() | ||||
op = rconf.getOperationObj('run') | ||||
dt1 = op.getParameterValue('startDate') | ||||
dt2 = op.getParameterValue('endDate') | ||||
|
r1112 | tm1 = op.getParameterValue('startTime') | ||
tm2 = op.getParameterValue('endTime') | ||||
r892 | days = (dt2 - dt1).days | |||
|
r1082 | |||
for day in range(days + 1): | ||||
r892 | skip = 0 | |||
cursor = 0 | ||||
processes = [] | ||||
|
r1052 | dt = dt1 + datetime.timedelta(day) | ||
dt_str = dt.strftime('%Y/%m/%d') | ||||
reader = JRODataReader() | ||||
paths, files = reader.searchFilesOffLine(path=rconf.path, | ||||
|
r1082 | startDate=dt, | ||
endDate=dt, | ||||
|
r1112 | startTime=tm1, | ||
endTime=tm2, | ||||
|
r1082 | ext=DTYPES[rconf.datatype]) | ||
|
r1052 | nFiles = len(files) | ||
if nFiles == 0: | ||||
|
r999 | continue | ||
r1279 | skip = int(math.ceil(nFiles / n)) | |||
|
r1082 | while nFiles > cursor * skip: | ||
rconf.update(startDate=dt_str, endDate=dt_str, cursor=cursor, | ||||
skip=skip) | ||||
p = project.clone() | ||||
|
r1052 | p.start() | ||
processes.append(p) | ||||
|
r924 | cursor += 1 | ||
def beforeExit(exctype, value, trace): | ||||
r892 | for process in processes: | |||
process.terminate() | ||||
process.join() | ||||
|
r1167 | print(traceback.print_tb(trace)) | ||
|
r957 | |||
|
r924 | sys.excepthook = beforeExit | ||
r892 | ||||
for process in processes: | ||||
process.join() | ||||
|
r924 | process.terminate() | ||
|
r958 | |||
|
r931 | time.sleep(3) | ||
r892 | ||||
|
r1177 | def wait(context): | ||
r1279 | ||||
|
r1177 | time.sleep(1) | ||
c = zmq.Context() | ||||
receiver = c.socket(zmq.SUB) | ||||
r1279 | receiver.connect('ipc:///tmp/schain_{}_pub'.format(self.id)) | |||
|
r1177 | receiver.setsockopt(zmq.SUBSCRIBE, self.id.encode()) | ||
msg = receiver.recv_multipart()[1] | ||||
context.terminate() | ||||
|
r197 | class ParameterConf(): | ||
r889 | ||||
|
r197 | id = None | ||
name = None | ||||
value = None | ||||
|
r199 | format = None | ||
r889 | ||||
|
r529 | __formated_value = None | ||
r889 | ||||
|
r197 | ELEMENTNAME = 'Parameter' | ||
r889 | ||||
|
r197 | def __init__(self): | ||
r889 | ||||
|
r199 | self.format = 'str' | ||
r889 | ||||
|
r197 | def getElementName(self): | ||
r889 | ||||
|
r197 | return self.ELEMENTNAME | ||
r889 | ||||
|
r197 | def getValue(self): | ||
|
r600 | |||
value = self.value | ||||
format = self.format | ||||
r889 | ||||
|
r529 | if self.__formated_value != None: | ||
r889 | ||||
|
r529 | return self.__formated_value | ||
r889 | ||||
r892 | if format == 'obj': | |||
return value | ||||
|
r600 | if format == 'str': | ||
|
r596 | self.__formated_value = str(value) | ||
return self.__formated_value | ||||
r889 | ||||
|
r596 | if value == '': | ||
|
r1167 | raise ValueError('%s: This parameter value is empty' % self.name) | ||
r889 | ||||
|
r600 | if format == 'list': | ||
|
r1193 | strList = [s.strip() for s in value.split(',')] | ||
|
r529 | self.__formated_value = strList | ||
r889 | ||||
|
r529 | return self.__formated_value | ||
r889 | ||||
|
r600 | if format == 'intlist': | ||
|
r1052 | ''' | ||
|
r535 | Example: | ||
value = (0,1,2) | ||||
|
r1052 | ''' | ||
r889 | ||||
|
r735 | new_value = ast.literal_eval(value) | ||
r889 | ||||
|
r735 | if type(new_value) not in (tuple, list): | ||
new_value = [int(new_value)] | ||||
r889 | ||||
|
r735 | self.__formated_value = new_value | ||
r889 | ||||
|
r529 | return self.__formated_value | ||
r889 | ||||
|
r600 | if format == 'floatlist': | ||
|
r1052 | ''' | ||
|
r535 | Example: | ||
value = (0.5, 1.4, 2.7) | ||||
|
r1052 | ''' | ||
r889 | ||||
|
r735 | new_value = ast.literal_eval(value) | ||
r889 | ||||
|
r735 | if type(new_value) not in (tuple, list): | ||
new_value = [float(new_value)] | ||||
r889 | ||||
|
r741 | self.__formated_value = new_value | ||
r889 | ||||
|
r529 | return self.__formated_value | ||
r889 | ||||
|
r600 | if format == 'date': | ||
|
r224 | strList = value.split('/') | ||
|
r197 | intList = [int(x) for x in strList] | ||
date = datetime.date(intList[0], intList[1], intList[2]) | ||||
r889 | ||||
|
r529 | self.__formated_value = date | ||
r889 | ||||
|
r529 | return self.__formated_value | ||
r889 | ||||
|
r600 | if format == 'time': | ||
|
r224 | strList = value.split(':') | ||
|
r197 | intList = [int(x) for x in strList] | ||
time = datetime.time(intList[0], intList[1], intList[2]) | ||||
r889 | ||||
|
r529 | self.__formated_value = time | ||
r889 | ||||
|
r529 | return self.__formated_value | ||
r889 | ||||
|
r600 | if format == 'pairslist': | ||
|
r1052 | ''' | ||
|
r226 | Example: | ||
value = (0,1),(1,2) | ||||
|
r1052 | ''' | ||
|
r226 | |||
|
r735 | new_value = ast.literal_eval(value) | ||
r889 | ||||
|
r735 | if type(new_value) not in (tuple, list): | ||
|
r1167 | raise ValueError('%s has to be a tuple or list of pairs' % value) | ||
r889 | ||||
|
r735 | if type(new_value[0]) not in (tuple, list): | ||
if len(new_value) != 2: | ||||
|
r1167 | raise ValueError('%s has to be a tuple or list of pairs' % value) | ||
|
r735 | new_value = [new_value] | ||
r889 | ||||
|
r735 | for thisPair in new_value: | ||
if len(thisPair) != 2: | ||||
|
r1167 | raise ValueError('%s has to be a tuple or list of pairs' % value) | ||
r889 | ||||
|
r735 | self.__formated_value = new_value | ||
r889 | ||||
|
r529 | return self.__formated_value | ||
r889 | ||||
|
r600 | if format == 'multilist': | ||
|
r1052 | ''' | ||
|
r514 | Example: | ||
value = (0,1,2),(3,4,5) | ||||
|
r1052 | ''' | ||
|
r514 | multiList = ast.literal_eval(value) | ||
r889 | ||||
|
r600 | if type(multiList[0]) == int: | ||
|
r1052 | multiList = ast.literal_eval('(' + value + ')') | ||
r889 | ||||
|
r529 | self.__formated_value = multiList | ||
r889 | ||||
|
r529 | return self.__formated_value | ||
r889 | ||||
|
r677 | if format == 'bool': | ||
value = int(value) | ||||
r889 | ||||
|
r677 | if format == 'int': | ||
value = float(value) | ||||
r889 | ||||
|
r600 | format_func = eval(format) | ||
r889 | ||||
|
r529 | self.__formated_value = format_func(value) | ||
r889 | ||||
|
r529 | return self.__formated_value | ||
|
r596 | |||
def updateId(self, new_id): | ||||
r889 | ||||
|
r596 | self.id = str(new_id) | ||
r889 | ||||
|
r199 | def setup(self, id, name, value, format='str'): | ||
|
r596 | self.id = str(id) | ||
|
r197 | self.name = name | ||
r892 | if format == 'obj': | |||
self.value = value | ||||
else: | ||||
self.value = str(value) | ||||
|
r535 | self.format = str.lower(format) | ||
r889 | ||||
|
r735 | self.getValue() | ||
r889 | ||||
|
r643 | return 1 | ||
r889 | ||||
|
r577 | def update(self, name, value, format='str'): | ||
r889 | ||||
|
r577 | self.name = name | ||
self.value = str(value) | ||||
self.format = format | ||||
r889 | ||||
|
r197 | def makeXml(self, opElement): | ||
|
r898 | if self.name not in ('queue',): | ||
parmElement = SubElement(opElement, self.ELEMENTNAME) | ||||
parmElement.set('id', str(self.id)) | ||||
parmElement.set('name', self.name) | ||||
parmElement.set('value', self.value) | ||||
parmElement.set('format', self.format) | ||||
r1279 | ||||
|
r197 | def readXml(self, parmElement): | ||
r889 | ||||
|
r197 | self.id = parmElement.get('id') | ||
self.name = parmElement.get('name') | ||||
self.value = parmElement.get('value') | ||||
|
r568 | self.format = str.lower(parmElement.get('format')) | ||
r889 | ||||
|
r1082 | # Compatible with old signal chain version | ||
|
r568 | if self.format == 'int' and self.name == 'idfigure': | ||
self.name = 'id' | ||||
r889 | ||||
|
r197 | def printattr(self): | ||
r889 | ||||
|
r1184 | print('Parameter[%s]: name = %s, value = %s, format = %s, project_id = %s' % (self.id, self.name, self.value, self.format, self.project_id)) | ||
|
r1082 | |||
|
r1052 | class OperationConf(): | ||
r889 | ||||
|
r197 | ELEMENTNAME = 'Operation' | ||
r889 | ||||
|
r197 | def __init__(self): | ||
r889 | ||||
|
r589 | self.id = '0' | ||
|
r568 | self.name = None | ||
self.priority = None | ||||
|
r1287 | self.parameters = {} | ||
self.object = None | ||||
self.operations = [] | ||||
|
r596 | |||
|
r1171 | def getId(self): | ||
|
r1287 | |||
|
r1171 | return self.id | ||
|
r1287 | def getNewId(self): | ||
return int(self.id) * 10 + len(self.operations) + 1 | ||||
|
r1171 | |||
|
r596 | def updateId(self, new_id): | ||
r889 | ||||
|
r596 | self.id = str(new_id) | ||
r889 | ||||
|
r596 | n = 1 | ||
|
r1287 | for conf in self.operations: | ||
conf_id = str(int(new_id) * 10 + n) | ||||
conf.updateId(conf_id) | ||||
|
r596 | n += 1 | ||
r889 | ||||
|
r1287 | def getKwargs(self): | ||
r889 | ||||
|
r1287 | params = {} | ||
r889 | ||||
|
r1287 | for key, value in self.parameters.items(): | ||
if value not in (None, '', ' '): | ||||
params[key] = value | ||||
r889 | ||||
|
r1287 | return params | ||
r889 | ||||
|
r1287 | def update(self, **kwargs): | ||
|
r577 | |||
|
r1287 | for key, value in kwargs.items(): | ||
self.addParameter(name=key, value=value) | ||||
r889 | ||||
|
r1287 | def addParameter(self, name, value, format=None): | ||
''' | ||||
''' | ||||
r889 | ||||
|
r1287 | if isinstance(value, str) and re.search(r'(\d+/\d+/\d+)', value): | ||
self.parameters[name] = datetime.date(*[int(x) for x in value.split('/')]) | ||||
elif isinstance(value, str) and re.search(r'(\d+:\d+:\d+)', value): | ||||
self.parameters[name] = datetime.time(*[int(x) for x in value.split(':')]) | ||||
else: | ||||
try: | ||||
self.parameters[name] = ast.literal_eval(value) | ||||
except: | ||||
if isinstance(value, str) and ',' in value: | ||||
self.parameters[name] = value.split(',') | ||||
else: | ||||
self.parameters[name] = value | ||||
def getParameters(self): | ||||
params = {} | ||||
for key, value in self.parameters.items(): | ||||
s = type(value).__name__ | ||||
if s == 'date': | ||||
params[key] = value.strftime('%Y/%m/%d') | ||||
elif s == 'time': | ||||
params[key] = value.strftime('%H:%M:%S') | ||||
else: | ||||
params[key] = str(value) | ||||
r889 | ||||
|
r1287 | return params | ||
r889 | ||||
|
r1287 | def makeXml(self, element): | ||
r889 | ||||
|
r1287 | xml = SubElement(element, self.ELEMENTNAME) | ||
for label in self.xml_labels: | ||||
xml.set(label, str(getattr(self, label))) | ||||
r889 | ||||
|
r1287 | for key, value in self.getParameters().items(): | ||
xml_param = SubElement(xml, 'Parameter') | ||||
xml_param.set('name', key) | ||||
xml_param.set('value', value) | ||||
|
r1082 | |||
|
r1287 | for conf in self.operations: | ||
conf.makeXml(xml) | ||||
|
r1082 | |||
|
r1287 | def __str__(self): | ||
r889 | ||||
|
r1287 | if self.ELEMENTNAME == 'Operation': | ||
s = ' {}[id={}]\n'.format(self.name, self.id) | ||||
else: | ||||
s = '{}[id={}, inputId={}]\n'.format(self.name, self.id, self.inputId) | ||||
r889 | ||||
|
r1287 | for key, value in self.parameters.items(): | ||
if self.ELEMENTNAME == 'Operation': | ||||
s += ' {}: {}\n'.format(key, value) | ||||
else: | ||||
s += ' {}: {}\n'.format(key, value) | ||||
r889 | ||||
|
r1287 | for conf in self.operations: | ||
s += str(conf) | ||||
r889 | ||||
|
r1287 | return s | ||
r889 | ||||
|
r1287 | class OperationConf(ConfBase): | ||
r889 | ||||
|
r1287 | ELEMENTNAME = 'Operation' | ||
xml_labels = ['id', 'name'] | ||||
r889 | ||||
|
r1287 | def setup(self, id, name, priority, project_id, err_queue): | ||
r889 | ||||
|
r596 | self.id = str(id) | ||
|
r1177 | self.project_id = project_id | ||
|
r197 | self.name = name | ||
|
r1287 | self.type = 'other' | ||
r1241 | self.err_queue = err_queue | |||
r889 | ||||
|
r1287 | def readXml(self, element, project_id, err_queue): | ||
r889 | ||||
|
r1287 | self.id = element.get('id') | ||
self.name = element.get('name') | ||||
self.type = 'other' | ||||
r1279 | self.project_id = str(project_id) | |||
|
r1287 | self.err_queue = err_queue | ||
r889 | ||||
|
r1287 | for elm in element.iter('Parameter'): | ||
self.addParameter(elm.get('name'), elm.get('value')) | ||||
r889 | ||||
|
r1171 | def createObject(self): | ||
r889 | ||||
|
r1171 | className = eval(self.name) | ||
|
r1184 | |||
|
r1308 | if 'Plot' in self.name or 'Writer' in self.name or 'Send' in self.name or 'print' in self.name: | ||
|
r1177 | kwargs = self.getKwargs() | ||
|
r1287 | opObj = className(self.id, self.id, self.project_id, self.err_queue, **kwargs) | ||
|
r1177 | opObj.start() | ||
|
r1287 | self.type = 'external' | ||
else: | ||||
opObj = className() | ||||
r889 | ||||
|
r1287 | self.object = opObj | ||
|
r197 | return opObj | ||
r889 | ||||
|
r1287 | class ProcUnitConf(ConfBase): | ||
r889 | ||||
|
r197 | ELEMENTNAME = 'ProcUnit' | ||
|
r1287 | xml_labels = ['id', 'inputId', 'name'] | ||
r889 | ||||
|
r1287 | def setup(self, project_id, id, name, datatype, inputId, err_queue): | ||
|
r1171 | ''' | ||
''' | ||||
r1279 | ||||
|
r1082 | if datatype == None and name == None: | ||
|
r1167 | raise ValueError('datatype or name should be defined') | ||
r889 | ||||
|
r1082 | if name == None: | ||
|
r596 | if 'Proc' in datatype: | ||
name = datatype | ||||
else: | ||||
|
r1082 | name = '%sProc' % (datatype) | ||
r889 | ||||
|
r1082 | if datatype == None: | ||
datatype = name.replace('Proc', '') | ||||
r889 | ||||
|
r596 | self.id = str(id) | ||
|
r1177 | self.project_id = project_id | ||
|
r197 | self.name = name | ||
|
r199 | self.datatype = datatype | ||
r1241 | self.inputId = inputId | |||
self.err_queue = err_queue | ||||
|
r1287 | self.operations = [] | ||
self.parameters = {} | ||||
r889 | ||||
|
r1287 | def removeOperation(self, id): | ||
r889 | ||||
|
r1287 | i = [1 if x.id==id else 0 for x in self.operations] | ||
self.operations.pop(i.index(1)) | ||||
r889 | ||||
|
r1287 | def getOperation(self, id): | ||
r889 | ||||
|
r1287 | for conf in self.operations: | ||
if conf.id == id: | ||||
return conf | ||||
r889 | ||||
|
r1177 | def addOperation(self, name, optype='self'): | ||
|
r1171 | ''' | ||
''' | ||||
r889 | ||||
|
r1287 | id = self.getNewId() | ||
conf = OperationConf() | ||||
conf.setup(id, name=name, priority='0', project_id=self.project_id, err_queue=self.err_queue) | ||||
self.operations.append(conf) | ||||
r889 | ||||
|
r1287 | return conf | ||
r889 | ||||
|
r1287 | def readXml(self, element, project_id, err_queue): | ||
r889 | ||||
|
r1287 | self.id = element.get('id') | ||
self.name = element.get('name') | ||||
self.inputId = None if element.get('inputId') == 'None' else element.get('inputId') | ||||
self.datatype = element.get('datatype', self.name.replace(self.ELEMENTNAME.replace('Unit', ''), '')) | ||||
|
r1184 | self.project_id = str(project_id) | ||
|
r1287 | self.err_queue = err_queue | ||
self.operations = [] | ||||
self.parameters = {} | ||||
r889 | ||||
|
r1287 | for elm in element: | ||
if elm.tag == 'Parameter': | ||||
self.addParameter(elm.get('name'), elm.get('value')) | ||||
elif elm.tag == 'Operation': | ||||
conf = OperationConf() | ||||
conf.readXml(elm, project_id, err_queue) | ||||
self.operations.append(conf) | ||||
r889 | ||||
|
r1177 | def createObjects(self): | ||
|
r1171 | ''' | ||
|
r1177 | Instancia de unidades de procesamiento. | ||
|
r1171 | ''' | ||
|
r1192 | |||
|
r197 | className = eval(self.name) | ||
r1279 | #print(self.name) | |||
r889 | kwargs = self.getKwargs() | |||
|
r1287 | procUnitObj = className() | ||
procUnitObj.name = self.name | ||||
|
r1177 | log.success('creating process...', self.name) | ||
r889 | ||||
|
r1287 | for conf in self.operations: | ||
r1279 | ||||
|
r1287 | opObj = conf.createObject() | ||
r1279 | ||||
r1241 | log.success('adding operation: {}, type:{}'.format( | |||
|
r1287 | conf.name, | ||
conf.type), self.name) | ||||
r1279 | ||||
|
r1287 | procUnitObj.addOperation(conf, opObj) | ||
r1279 | ||||
|
r1287 | self.object = procUnitObj | ||
r889 | ||||
|
r1287 | def run(self): | ||
''' | ||||
''' | ||||
r889 | ||||
|
r1287 | return self.object.call(**self.getKwargs()) | ||
r889 | ||||
|
r1082 | |||
|
r197 | class ReadUnitConf(ProcUnitConf): | ||
r889 | ||||
|
r197 | ELEMENTNAME = 'ReadUnit' | ||
r889 | ||||
|
r197 | def __init__(self): | ||
r889 | ||||
|
r197 | self.id = None | ||
|
r199 | self.datatype = None | ||
|
r197 | self.name = None | ||
|
r589 | self.inputId = None | ||
|
r1287 | self.operations = [] | ||
self.parameters = {} | ||||
r1279 | ||||
r1241 | def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='', | |||
|
r1177 | startTime='', endTime='', server=None, **kwargs): | ||
|
r1052 | |||
|
r1082 | if datatype == None and name == None: | ||
|
r1167 | raise ValueError('datatype or name should be defined') | ||
|
r1088 | if name == None: | ||
|
r596 | if 'Reader' in datatype: | ||
name = datatype | ||||
|
r1088 | datatype = name.replace('Reader','') | ||
|
r596 | else: | ||
|
r1088 | name = '{}Reader'.format(datatype) | ||
if datatype == None: | ||||
if 'Reader' in name: | ||||
datatype = name.replace('Reader','') | ||||
else: | ||||
datatype = name | ||||
name = '{}Reader'.format(name) | ||||
r889 | ||||
|
r197 | self.id = id | ||
|
r1177 | self.project_id = project_id | ||
|
r197 | self.name = name | ||
|
r199 | self.datatype = datatype | ||
r1279 | self.err_queue = err_queue | |||
r889 | ||||
|
r1287 | self.addParameter(name='path', value=path) | ||
self.addParameter(name='startDate', value=startDate) | ||||
self.addParameter(name='endDate', value=endDate) | ||||
self.addParameter(name='startTime', value=startTime) | ||||
self.addParameter(name='endTime', value=endTime) | ||||
r889 | ||||
|
r1287 | for key, value in kwargs.items(): | ||
self.addParameter(name=key, value=value) | ||||
r889 | ||||
|
r1082 | |||
|
r1040 | class Project(Process): | ||
|
r1329 | """API to create signal chain projects""" | ||
|
r1052 | |||
|
r224 | ELEMENTNAME = 'Project' | ||
r889 | ||||
|
r1330 | def __init__(self, name=''): | ||
|
r1052 | |||
|
r1040 | Process.__init__(self) | ||
|
r1329 | self.id = '1' | ||
if name: | ||||
self.name = '{} ({})'.format(Process.__name__, name) | ||||
|
r1177 | self.filename = None | ||
|
r197 | self.description = None | ||
|
r1126 | self.email = None | ||
|
r1287 | self.alarm = [] | ||
self.configurations = {} | ||||
# self.err_queue = Queue() | ||||
self.err_queue = None | ||||
self.started = False | ||||
|
r1052 | |||
|
r1287 | def getNewId(self): | ||
r889 | ||||
|
r1287 | idList = list(self.configurations.keys()) | ||
|
r1082 | id = int(self.id) * 10 | ||
r889 | ||||
|
r764 | while True: | ||
id += 1 | ||||
r889 | ||||
|
r764 | if str(id) in idList: | ||
continue | ||||
r889 | ||||
|
r764 | break | ||
r889 | ||||
|
r197 | return str(id) | ||
r889 | ||||
|
r596 | def updateId(self, new_id): | ||
r889 | ||||
|
r596 | self.id = str(new_id) | ||
r889 | ||||
|
r1287 | keyList = list(self.configurations.keys()) | ||
|
r596 | keyList.sort() | ||
r889 | ||||
|
r596 | n = 1 | ||
|
r1287 | new_confs = {} | ||
r889 | ||||
|
r596 | for procKey in keyList: | ||
r889 | ||||
|
r1287 | conf = self.configurations[procKey] | ||
|
r1082 | idProcUnit = str(int(self.id) * 10 + n) | ||
|
r1287 | conf.updateId(idProcUnit) | ||
new_confs[idProcUnit] = conf | ||||
|
r596 | n += 1 | ||
r889 | ||||
|
r1287 | self.configurations = new_confs | ||
r889 | ||||
|
r1177 | def setup(self, id=1, name='', description='', email=None, alarm=[]): | ||
r889 | ||||
|
r596 | self.id = str(id) | ||
r1279 | self.description = description | |||
|
r1126 | self.email = email | ||
self.alarm = alarm | ||||
r1241 | if name: | |||
self.name = '{} ({})'.format(Process.__name__, name) | ||||
|
r577 | |||
|
r1126 | def update(self, **kwargs): | ||
r889 | ||||
|
r1287 | for key, value in kwargs.items(): | ||
|
r1126 | setattr(self, key, value) | ||
r889 | ||||
|
r1052 | def clone(self): | ||
p = Project() | ||||
|
r1287 | p.id = self.id | ||
p.name = self.name | ||||
p.description = self.description | ||||
p.configurations = self.configurations.copy() | ||||
|
r1052 | return p | ||
|
r687 | def addReadUnit(self, id=None, datatype=None, name=None, **kwargs): | ||
|
r1052 | |||
|
r1171 | ''' | ||
''' | ||||
|
r687 | if id is None: | ||
|
r1287 | idReadUnit = self.getNewId() | ||
|
r687 | else: | ||
idReadUnit = str(id) | ||||
r889 | ||||
|
r1287 | conf = ReadUnitConf() | ||
conf.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs) | ||||
self.configurations[conf.id] = conf | ||||
r1279 | ||||
|
r1287 | return conf | ||
r889 | ||||
|
r1287 | def addProcUnit(self, id=None, inputId='0', datatype=None, name=None): | ||
r889 | ||||
|
r1171 | ''' | ||
''' | ||||
|
r1287 | if id is None: | ||
idProcUnit = self.getNewId() | ||||
else: | ||||
idProcUnit = id | ||||
r889 | ||||
|
r1287 | conf = ProcUnitConf() | ||
conf.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue) | ||||
self.configurations[conf.id] = conf | ||||
r889 | ||||
|
r1287 | return conf | ||
r889 | ||||
|
r587 | def removeProcUnit(self, id): | ||
r889 | ||||
|
r1287 | if id in self.configurations: | ||
self.configurations.pop(id) | ||||
r889 | ||||
|
r1287 | def getReadUnit(self): | ||
r889 | ||||
|
r1287 | for obj in list(self.configurations.values()): | ||
if obj.ELEMENTNAME == 'ReadUnit': | ||||
|
r577 | return obj | ||
r889 | ||||
|
r577 | return None | ||
r889 | ||||
|
r1287 | def getProcUnit(self, id): | ||
r889 | ||||
|
r1287 | return self.configurations[id] | ||
r889 | ||||
|
r1287 | def getUnits(self): | ||
r889 | ||||
|
r1287 | keys = list(self.configurations) | ||
keys.sort() | ||||
r889 | ||||
|
r1287 | for key in keys: | ||
yield self.configurations[key] | ||||
r889 | ||||
|
r1287 | def updateUnit(self, id, **kwargs): | ||
r889 | ||||
|
r1287 | conf = self.configurations[id].update(**kwargs) | ||
r889 | ||||
def makeXml(self): | ||||
|
r1287 | xml = Element('Project') | ||
xml.set('id', str(self.id)) | ||||
xml.set('name', self.name) | ||||
xml.set('description', self.description) | ||||
r889 | ||||
|
r1287 | for conf in self.configurations.values(): | ||
conf.makeXml(xml) | ||||
r889 | ||||
|
r1287 | self.xml = xml | ||
r889 | ||||
|
r702 | def writeXml(self, filename=None): | ||
r889 | ||||
|
r702 | if filename == None: | ||
|
r708 | if self.filename: | ||
filename = self.filename | ||||
else: | ||||
|
r1052 | filename = 'schain.xml' | ||
r889 | ||||
|
r702 | if not filename: | ||
|
r1167 | print('filename has not been defined. Use setFilename(filename) for do it.') | ||
|
r702 | return 0 | ||
r889 | ||||
|
r687 | abs_file = os.path.abspath(filename) | ||
r889 | ||||
|
r687 | if not os.access(os.path.dirname(abs_file), os.W_OK): | ||
|
r1167 | print('No write permission on %s' % os.path.dirname(abs_file)) | ||
|
r681 | return 0 | ||
r889 | ||||
|
r687 | if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)): | ||
|
r1167 | print('File %s already exists and it could not be overwriten' % abs_file) | ||
|
r681 | return 0 | ||
r889 | ||||
|
r681 | self.makeXml() | ||
r889 | ||||
|
r1287 | ElementTree(self.xml).write(abs_file, method='xml') | ||
r889 | ||||
|
r708 | self.filename = abs_file | ||
r889 | ||||
|
r681 | return 1 | ||
|
r197 | |||
|
r1287 | def readXml(self, filename): | ||
r889 | ||||
|
r687 | abs_file = os.path.abspath(filename) | ||
r889 | ||||
|
r1287 | self.configurations = {} | ||
r889 | ||||
|
r735 | try: | ||
|
r1287 | self.xml = ElementTree().parse(abs_file) | ||
|
r735 | except: | ||
|
r1287 | log.error('Error reading %s, verify file format' % filename) | ||
|
r735 | return 0 | ||
r889 | ||||
|
r1287 | self.id = self.xml.get('id') | ||
self.name = self.xml.get('name') | ||||
self.description = self.xml.get('description') | ||||
for element in self.xml: | ||||
if element.tag == 'ReadUnit': | ||||
conf = ReadUnitConf() | ||||
conf.readXml(element, self.id, self.err_queue) | ||||
self.configurations[conf.id] = conf | ||||
elif element.tag == 'ProcUnit': | ||||
conf = ProcUnitConf() | ||||
input_proc = self.configurations[element.get('inputId')] | ||||
conf.readXml(element, self.id, self.err_queue) | ||||
self.configurations[conf.id] = conf | ||||
r889 | ||||
|
r708 | self.filename = abs_file | ||
r889 | ||||
|
r681 | return 1 | ||
r889 | ||||
|
r1177 | def __str__(self): | ||
r889 | ||||
|
r1287 | text = '\nProject[id=%s, name=%s, description=%s]\n\n' % ( | ||
self.id, | ||||
self.name, | ||||
self.description, | ||||
) | ||||
for conf in self.configurations.values(): | ||||
text += '{}'.format(conf) | ||||
|
r1082 | |||
|
r1287 | return text | ||
r889 | ||||
|
r197 | def createObjects(self): | ||
r889 | ||||
|
r1287 | keys = list(self.configurations.keys()) | ||
|
r1192 | keys.sort() | ||
for key in keys: | ||||
|
r1287 | conf = self.configurations[key] | ||
conf.createObjects() | ||||
if conf.inputId is not None: | ||||
conf.object.setInput(self.configurations[conf.inputId].object) | ||||
r889 | ||||
r1241 | def monitor(self): | |||
r889 | ||||
|
r1287 | t = Thread(target=self._monitor, args=(self.err_queue, self.ctx)) | ||
r1241 | t.start() | |||
r1279 | ||||
|
r1287 | def _monitor(self, queue, ctx): | ||
r889 | ||||
r1241 | import socket | |||
r1279 | ||||
r1241 | procs = 0 | |||
err_msg = '' | ||||
r1279 | ||||
r1241 | while True: | |||
msg = queue.get() | ||||
if '#_start_#' in msg: | ||||
procs += 1 | ||||
elif '#_end_#' in msg: | ||||
procs -=1 | ||||
else: | ||||
err_msg = msg | ||||
r1279 | ||||
if procs == 0 or 'Traceback' in err_msg: | ||||
r1241 | break | |||
time.sleep(0.1) | ||||
r1279 | ||||
r1241 | if '|' in err_msg: | |||
name, err = err_msg.split('|') | ||||
if 'SchainWarning' in err: | ||||
log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), name) | ||||
elif 'SchainError' in err: | ||||
log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), name) | ||||
else: | ||||
log.error(err, name) | ||||
r1279 | else: | |||
r1241 | name, err = self.name, err_msg | |||
r1279 | ||||
|
r1287 | time.sleep(1) | ||
r1279 | ||||
r1241 | ctx.term() | |||
|
r735 | |||
|
r1052 | message = ''.join(err) | ||
r889 | ||||
r1241 | if err_msg: | |||
subject = 'SChain v%s: Error running %s\n' % ( | ||||
schainpy.__version__, self.name) | ||||
subtitle = 'Hostname: %s\n' % socket.gethostbyname( | ||||
socket.gethostname()) | ||||
subtitle += 'Working directory: %s\n' % os.path.abspath('./') | ||||
subtitle += 'Configuration file: %s\n' % self.filename | ||||
subtitle += 'Time: %s\n' % str(datetime.datetime.now()) | ||||
|
r1287 | readUnitConfObj = self.getReadUnit() | ||
r1241 | if readUnitConfObj: | |||
subtitle += '\nInput parameters:\n' | ||||
|
r1287 | subtitle += '[Data path = %s]\n' % readUnitConfObj.parameters['path'] | ||
subtitle += '[Start date = %s]\n' % readUnitConfObj.parameters['startDate'] | ||||
subtitle += '[End date = %s]\n' % readUnitConfObj.parameters['endDate'] | ||||
subtitle += '[Start time = %s]\n' % readUnitConfObj.parameters['startTime'] | ||||
subtitle += '[End time = %s]\n' % readUnitConfObj.parameters['endTime'] | ||||
r1241 | ||||
a = Alarm( | ||||
r1279 | modes=self.alarm, | |||
r1241 | email=self.email, | |||
message=message, | ||||
subject=subject, | ||||
subtitle=subtitle, | ||||
filename=self.filename | ||||
) | ||||
a.start() | ||||
r1129 | ||||
|
r691 | def setFilename(self, filename): | ||
r889 | ||||
|
r691 | self.filename = filename | ||
r889 | ||||
|
r1287 | def runProcs(self): | ||
|
r1177 | |||
|
r1287 | err = False | ||
n = len(self.configurations) | ||||
r1279 | ||||
|
r1287 | while not err: | ||
for conf in self.getUnits(): | ||||
r1370 | ok = conf.run() | |||
r1342 | if ok == 'Error': | |||
|
r1287 | n -= 1 | ||
continue | ||||
elif not ok: | ||||
break | ||||
if n == 0: | ||||
err = True | ||||
r889 | ||||
|
r1052 | def run(self): | ||
|
r1004 | |||
|
r1287 | log.success('\nStarting Project {} [id={}]'.format(self.name, self.id), tag='') | ||
self.started = True | ||||
r1279 | self.start_time = time.time() | |||
self.createObjects() | ||||
|
r1287 | self.runProcs() | ||
log.success('{} Done (Time: {:4.2f}s)'.format( | ||||
|
r1112 | self.name, | ||
r1241 | time.time()-self.start_time), '') | |||