From 603b419b264119863c29562a597349c34177c37e 2021-08-25 19:22:07 From: joabAM Date: 2021-08-25 19:22:07 Subject: [PATCH] Separación de datos antes del 17 de Julio del 2021, donde se modificó los apuntes y la lectura de estos --- diff --git a/amisr_reader.xml b/amisr_reader.xml new file mode 100644 index 0000000..d9c0744 --- /dev/null +++ b/amisr_reader.xml @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/schainpy/controller.py b/schainpy/controller.py index a592ac0..427059a 100644 --- a/schainpy/controller.py +++ b/schainpy/controller.py @@ -1,918 +1,659 @@ -# Copyright (c) 2012-2020 Jicamarca Radio Observatory -# All rights reserved. -# -# Distributed under the terms of the BSD 3-clause license. -"""API to create signal chain projects - -The API is provide through class: Project -""" - -import re -import sys -import ast -import datetime -import traceback -import time -import multiprocessing -from multiprocessing import Process, Queue -from threading import Thread -from xml.etree.ElementTree import ElementTree, Element, SubElement - -from schainpy.admin import Alarm, SchainWarning -from schainpy.model import * -from schainpy.utils import log - -if 'darwin' in sys.platform and sys.version_info[0] == 3 and sys.version_info[1] > 7: - multiprocessing.set_start_method('fork') - -DTYPES = { - 'Voltage': '.r', - 'Spectra': '.pdata' -} - - -def MPProject(project, n=cpu_count()): - ''' - Project wrapper to run schain in n processes - ''' - - rconf = project.getReadUnitObj() - op = rconf.getOperationObj('run') - dt1 = op.getParameterValue('startDate') - dt2 = op.getParameterValue('endDate') - tm1 = op.getParameterValue('startTime') - tm2 = op.getParameterValue('endTime') - days = (dt2 - dt1).days - - for day in range(days + 1): - skip = 0 - cursor = 0 - processes = [] - dt = dt1 + datetime.timedelta(day) - dt_str = dt.strftime('%Y/%m/%d') - reader = JRODataReader() - paths, files = reader.searchFilesOffLine(path=rconf.path, - startDate=dt, - endDate=dt, - 
startTime=tm1, - endTime=tm2, - ext=DTYPES[rconf.datatype]) - nFiles = len(files) - if nFiles == 0: - continue - skip = int(math.ceil(nFiles / n)) - while nFiles > cursor * skip: - rconf.update(startDate=dt_str, endDate=dt_str, cursor=cursor, - skip=skip) - p = project.clone() - p.start() - processes.append(p) - cursor += 1 - - def beforeExit(exctype, value, trace): - for process in processes: - process.terminate() - process.join() - print(traceback.print_tb(trace)) - - sys.excepthook = beforeExit - - for process in processes: - process.join() - process.terminate() - - time.sleep(3) - -def wait(context): - - time.sleep(1) - c = zmq.Context() - receiver = c.socket(zmq.SUB) - receiver.connect('ipc:///tmp/schain_{}_pub'.format(self.id)) - receiver.setsockopt(zmq.SUBSCRIBE, self.id.encode()) - msg = receiver.recv_multipart()[1] - context.terminate() - -class ParameterConf(): - - id = None - name = None - value = None - format = None - - __formated_value = None - - ELEMENTNAME = 'Parameter' - - def __init__(self): - - self.format = 'str' - - def getElementName(self): - - return self.ELEMENTNAME - - def getValue(self): - - value = self.value - format = self.format - - if self.__formated_value != None: - - return self.__formated_value - - if format == 'obj': - return value - - if format == 'str': - self.__formated_value = str(value) - return self.__formated_value - - if value == '': - raise ValueError('%s: This parameter value is empty' % self.name) - - if format == 'list': - strList = [s.strip() for s in value.split(',')] - self.__formated_value = strList - - return self.__formated_value - - if format == 'intlist': - ''' - Example: - value = (0,1,2) - ''' - - new_value = ast.literal_eval(value) - - if type(new_value) not in (tuple, list): - new_value = [int(new_value)] - - self.__formated_value = new_value - - return self.__formated_value - - if format == 'floatlist': - ''' - Example: - value = (0.5, 1.4, 2.7) - ''' - - new_value = ast.literal_eval(value) - - if 
type(new_value) not in (tuple, list): - new_value = [float(new_value)] - - self.__formated_value = new_value - - return self.__formated_value - - if format == 'date': - strList = value.split('/') - intList = [int(x) for x in strList] - date = datetime.date(intList[0], intList[1], intList[2]) - - self.__formated_value = date - - return self.__formated_value - - if format == 'time': - strList = value.split(':') - intList = [int(x) for x in strList] - time = datetime.time(intList[0], intList[1], intList[2]) - - self.__formated_value = time - - return self.__formated_value - - if format == 'pairslist': - ''' - Example: - value = (0,1),(1,2) - ''' - - new_value = ast.literal_eval(value) - - if type(new_value) not in (tuple, list): - raise ValueError('%s has to be a tuple or list of pairs' % value) - - if type(new_value[0]) not in (tuple, list): - if len(new_value) != 2: - raise ValueError('%s has to be a tuple or list of pairs' % value) - new_value = [new_value] - - for thisPair in new_value: - if len(thisPair) != 2: - raise ValueError('%s has to be a tuple or list of pairs' % value) - - self.__formated_value = new_value - - return self.__formated_value - - if format == 'multilist': - ''' - Example: - value = (0,1,2),(3,4,5) - ''' - multiList = ast.literal_eval(value) - - if type(multiList[0]) == int: - multiList = ast.literal_eval('(' + value + ')') - - self.__formated_value = multiList - - return self.__formated_value - - if format == 'bool': - value = int(value) - - if format == 'int': - value = float(value) - - format_func = eval(format) - - self.__formated_value = format_func(value) - - return self.__formated_value - - def updateId(self, new_id): - - self.id = str(new_id) - - def setup(self, id, name, value, format='str'): - self.id = str(id) - self.name = name - if format == 'obj': - self.value = value - else: - self.value = str(value) - self.format = str.lower(format) - - self.getValue() - - return 1 - - def update(self, name, value, format='str'): - - self.name 
= name - self.value = str(value) - self.format = format - - def makeXml(self, opElement): - if self.name not in ('queue',): - parmElement = SubElement(opElement, self.ELEMENTNAME) - parmElement.set('id', str(self.id)) - parmElement.set('name', self.name) - parmElement.set('value', self.value) - parmElement.set('format', self.format) - - def readXml(self, parmElement): - - self.id = parmElement.get('id') - self.name = parmElement.get('name') - self.value = parmElement.get('value') - self.format = str.lower(parmElement.get('format')) - - # Compatible with old signal chain version - if self.format == 'int' and self.name == 'idfigure': - self.name = 'id' - - def printattr(self): - - print('Parameter[%s]: name = %s, value = %s, format = %s, project_id = %s' % (self.id, self.name, self.value, self.format, self.project_id)) - -class OperationConf(): - - ELEMENTNAME = 'Operation' - - def __init__(self): - - self.id = '0' - self.name = None - self.priority = None - self.parameters = {} - self.object = None - self.operations = [] - - def getId(self): - - return self.id - - def getNewId(self): - - return int(self.id) * 10 + len(self.operations) + 1 - - def updateId(self, new_id): - - self.id = str(new_id) - - n = 1 - for conf in self.operations: - conf_id = str(int(new_id) * 10 + n) - conf.updateId(conf_id) - n += 1 - - def getKwargs(self): - - params = {} - - for key, value in self.parameters.items(): - if value not in (None, '', ' '): - params[key] = value - - return params - - def update(self, **kwargs): - - for key, value in kwargs.items(): - self.addParameter(name=key, value=value) - - def addParameter(self, name, value, format=None): - ''' - ''' - - if isinstance(value, str) and re.search(r'(\d+/\d+/\d+)', value): - self.parameters[name] = datetime.date(*[int(x) for x in value.split('/')]) - elif isinstance(value, str) and re.search(r'(\d+:\d+:\d+)', value): - self.parameters[name] = datetime.time(*[int(x) for x in value.split(':')]) - else: - try: - 
self.parameters[name] = ast.literal_eval(value) - except: - if isinstance(value, str) and ',' in value: - self.parameters[name] = value.split(',') - else: - self.parameters[name] = value - - def getParameters(self): - - params = {} - for key, value in self.parameters.items(): - s = type(value).__name__ - if s == 'date': - params[key] = value.strftime('%Y/%m/%d') - elif s == 'time': - params[key] = value.strftime('%H:%M:%S') - else: - params[key] = str(value) - - return params - - def makeXml(self, element): - - xml = SubElement(element, self.ELEMENTNAME) - for label in self.xml_labels: - xml.set(label, str(getattr(self, label))) - - for key, value in self.getParameters().items(): - xml_param = SubElement(xml, 'Parameter') - xml_param.set('name', key) - xml_param.set('value', value) - - for conf in self.operations: - conf.makeXml(xml) - - def __str__(self): - - if self.ELEMENTNAME == 'Operation': - s = ' {}[id={}]\n'.format(self.name, self.id) - else: - s = '{}[id={}, inputId={}]\n'.format(self.name, self.id, self.inputId) - - for key, value in self.parameters.items(): - if self.ELEMENTNAME == 'Operation': - s += ' {}: {}\n'.format(key, value) - else: - s += ' {}: {}\n'.format(key, value) - - for conf in self.operations: - s += str(conf) - - return s - -class OperationConf(ConfBase): - - ELEMENTNAME = 'Operation' - xml_labels = ['id', 'name'] - - def setup(self, id, name, priority, project_id, err_queue): - - self.id = str(id) - self.project_id = project_id - self.name = name - self.type = 'other' - self.err_queue = err_queue - - def readXml(self, element, project_id, err_queue): - - self.id = element.get('id') - self.name = element.get('name') - self.type = 'other' - self.project_id = str(project_id) - self.err_queue = err_queue - - for elm in element.iter('Parameter'): - self.addParameter(elm.get('name'), elm.get('value')) - - def createObject(self): - - className = eval(self.name) - - if 'Plot' in self.name or 'Writer' in self.name or 'Send' in self.name or 
'print' in self.name: - kwargs = self.getKwargs() - opObj = className(self.id, self.id, self.project_id, self.err_queue, **kwargs) - opObj.start() - self.type = 'external' - else: - opObj = className() - - self.object = opObj - return opObj - -class ProcUnitConf(ConfBase): - - ELEMENTNAME = 'ProcUnit' - xml_labels = ['id', 'inputId', 'name'] - - def setup(self, project_id, id, name, datatype, inputId, err_queue): - ''' - ''' - - if datatype == None and name == None: - raise ValueError('datatype or name should be defined') - - if name == None: - if 'Proc' in datatype: - name = datatype - else: - name = '%sProc' % (datatype) - - if datatype == None: - datatype = name.replace('Proc', '') - - self.id = str(id) - self.project_id = project_id - self.name = name - self.datatype = datatype - self.inputId = inputId - self.err_queue = err_queue - self.operations = [] - self.parameters = {} - - def removeOperation(self, id): - - i = [1 if x.id==id else 0 for x in self.operations] - self.operations.pop(i.index(1)) - - def getOperation(self, id): - - for conf in self.operations: - if conf.id == id: - return conf - - def addOperation(self, name, optype='self'): - ''' - ''' - - id = self.getNewId() - conf = OperationConf() - conf.setup(id, name=name, priority='0', project_id=self.project_id, err_queue=self.err_queue) - self.operations.append(conf) - - return conf - - def readXml(self, element, project_id, err_queue): - - self.id = element.get('id') - self.name = element.get('name') - self.inputId = None if element.get('inputId') == 'None' else element.get('inputId') - self.datatype = element.get('datatype', self.name.replace(self.ELEMENTNAME.replace('Unit', ''), '')) - self.project_id = str(project_id) - self.err_queue = err_queue - self.operations = [] - self.parameters = {} - - for elm in element: - if elm.tag == 'Parameter': - self.addParameter(elm.get('name'), elm.get('value')) - elif elm.tag == 'Operation': - conf = OperationConf() - conf.readXml(elm, project_id, err_queue) 
- self.operations.append(conf) - - def createObjects(self): - ''' - Instancia de unidades de procesamiento. - ''' - - className = eval(self.name) - #print(self.name) - kwargs = self.getKwargs() - procUnitObj = className() - procUnitObj.name = self.name - log.success('creating process...', self.name) - - for conf in self.operations: - - opObj = conf.createObject() - - log.success('adding operation: {}, type:{}'.format( - conf.name, - conf.type), self.name) - - procUnitObj.addOperation(conf, opObj) - - self.object = procUnitObj - - def run(self): - ''' - ''' - - return self.object.call(**self.getKwargs()) - - -class ReadUnitConf(ProcUnitConf): - - ELEMENTNAME = 'ReadUnit' - - def __init__(self): - - self.id = None - self.datatype = None - self.name = None - self.inputId = None - self.operations = [] - self.parameters = {} - - def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='', - startTime='', endTime='', server=None, **kwargs): - - if datatype == None and name == None: - raise ValueError('datatype or name should be defined') - if name == None: - if 'Reader' in datatype: - name = datatype - datatype = name.replace('Reader','') - else: - name = '{}Reader'.format(datatype) - if datatype == None: - if 'Reader' in name: - datatype = name.replace('Reader','') - else: - datatype = name - name = '{}Reader'.format(name) - - self.id = id - self.project_id = project_id - self.name = name - self.datatype = datatype - self.err_queue = err_queue - - self.addParameter(name='path', value=path) - self.addParameter(name='startDate', value=startDate) - self.addParameter(name='endDate', value=endDate) - self.addParameter(name='startTime', value=startTime) - self.addParameter(name='endTime', value=endTime) - - for key, value in kwargs.items(): - self.addParameter(name=key, value=value) - - -class Project(Process): - """API to create signal chain projects""" - - ELEMENTNAME = 'Project' - - def __init__(self, name=''): - - Process.__init__(self) - 
self.id = '1' - if name: - self.name = '{} ({})'.format(Process.__name__, name) - self.filename = None - self.description = None - self.email = None - self.alarm = [] - self.configurations = {} - # self.err_queue = Queue() - self.err_queue = None - self.started = False - - def getNewId(self): - - idList = list(self.configurations.keys()) - id = int(self.id) * 10 - - while True: - id += 1 - - if str(id) in idList: - continue - - break - - return str(id) - - def updateId(self, new_id): - - self.id = str(new_id) - - keyList = list(self.configurations.keys()) - keyList.sort() - - n = 1 - new_confs = {} - - for procKey in keyList: - - conf = self.configurations[procKey] - idProcUnit = str(int(self.id) * 10 + n) - conf.updateId(idProcUnit) - new_confs[idProcUnit] = conf - n += 1 - - self.configurations = new_confs - - def setup(self, id=1, name='', description='', email=None, alarm=[]): - - self.id = str(id) - self.description = description - self.email = email - self.alarm = alarm - if name: - self.name = '{} ({})'.format(Process.__name__, name) - - def update(self, **kwargs): - - for key, value in kwargs.items(): - setattr(self, key, value) - - def clone(self): - - p = Project() - p.id = self.id - p.name = self.name - p.description = self.description - p.configurations = self.configurations.copy() - - return p - - def addReadUnit(self, id=None, datatype=None, name=None, **kwargs): - - ''' - ''' - - if id is None: - idReadUnit = self.getNewId() - else: - idReadUnit = str(id) - - conf = ReadUnitConf() - conf.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs) - self.configurations[conf.id] = conf - - return conf - - def addProcUnit(self, id=None, inputId='0', datatype=None, name=None): - - ''' - ''' - - if id is None: - idProcUnit = self.getNewId() - else: - idProcUnit = id - - conf = ProcUnitConf() - conf.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue) - self.configurations[conf.id] = conf - - return conf - - def removeProcUnit(self, 
id): - - if id in self.configurations: - self.configurations.pop(id) - - def getReadUnit(self): - - for obj in list(self.configurations.values()): - if obj.ELEMENTNAME == 'ReadUnit': - return obj - - return None - - def getProcUnit(self, id): - - return self.configurations[id] - - def getUnits(self): - - keys = list(self.configurations) - keys.sort() - - for key in keys: - yield self.configurations[key] - - def updateUnit(self, id, **kwargs): - - conf = self.configurations[id].update(**kwargs) - - def makeXml(self): - - xml = Element('Project') - xml.set('id', str(self.id)) - xml.set('name', self.name) - xml.set('description', self.description) - - for conf in self.configurations.values(): - conf.makeXml(xml) - - self.xml = xml - - def writeXml(self, filename=None): - - if filename == None: - if self.filename: - filename = self.filename - else: - filename = 'schain.xml' - - if not filename: - print('filename has not been defined. Use setFilename(filename) for do it.') - return 0 - - abs_file = os.path.abspath(filename) - - if not os.access(os.path.dirname(abs_file), os.W_OK): - print('No write permission on %s' % os.path.dirname(abs_file)) - return 0 - - if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)): - print('File %s already exists and it could not be overwriten' % abs_file) - return 0 - - self.makeXml() - - ElementTree(self.xml).write(abs_file, method='xml') - - self.filename = abs_file - - return 1 - - def readXml(self, filename): - - abs_file = os.path.abspath(filename) - - self.configurations = {} - - try: - self.xml = ElementTree().parse(abs_file) - except: - log.error('Error reading %s, verify file format' % filename) - return 0 - - self.id = self.xml.get('id') - self.name = self.xml.get('name') - self.description = self.xml.get('description') - - for element in self.xml: - if element.tag == 'ReadUnit': - conf = ReadUnitConf() - conf.readXml(element, self.id, self.err_queue) - self.configurations[conf.id] = conf - elif element.tag == 
'ProcUnit': - conf = ProcUnitConf() - input_proc = self.configurations[element.get('inputId')] - conf.readXml(element, self.id, self.err_queue) - self.configurations[conf.id] = conf - - self.filename = abs_file - - return 1 - - def __str__(self): - - text = '\nProject[id=%s, name=%s, description=%s]\n\n' % ( - self.id, - self.name, - self.description, - ) - - for conf in self.configurations.values(): - text += '{}'.format(conf) - - return text - - def createObjects(self): - - keys = list(self.configurations.keys()) - keys.sort() - for key in keys: - conf = self.configurations[key] - conf.createObjects() - if conf.inputId is not None: - conf.object.setInput(self.configurations[conf.inputId].object) - - def monitor(self): - - t = Thread(target=self._monitor, args=(self.err_queue, self.ctx)) - t.start() - - def _monitor(self, queue, ctx): - - import socket - - procs = 0 - err_msg = '' - - while True: - msg = queue.get() - if '#_start_#' in msg: - procs += 1 - elif '#_end_#' in msg: - procs -=1 - else: - err_msg = msg - - if procs == 0 or 'Traceback' in err_msg: - break - time.sleep(0.1) - - if '|' in err_msg: - name, err = err_msg.split('|') - if 'SchainWarning' in err: - log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), name) - elif 'SchainError' in err: - log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), name) - else: - log.error(err, name) - else: - name, err = self.name, err_msg - - time.sleep(1) - - ctx.term() - - message = ''.join(err) - - if err_msg: - subject = 'SChain v%s: Error running %s\n' % ( - schainpy.__version__, self.name) - - subtitle = 'Hostname: %s\n' % socket.gethostbyname( - socket.gethostname()) - subtitle += 'Working directory: %s\n' % os.path.abspath('./') - subtitle += 'Configuration file: %s\n' % self.filename - subtitle += 'Time: %s\n' % str(datetime.datetime.now()) - - readUnitConfObj = self.getReadUnit() - if readUnitConfObj: - subtitle += '\nInput parameters:\n' - subtitle += '[Data path = %s]\n' % 
readUnitConfObj.parameters['path'] - subtitle += '[Start date = %s]\n' % readUnitConfObj.parameters['startDate'] - subtitle += '[End date = %s]\n' % readUnitConfObj.parameters['endDate'] - subtitle += '[Start time = %s]\n' % readUnitConfObj.parameters['startTime'] - subtitle += '[End time = %s]\n' % readUnitConfObj.parameters['endTime'] - - a = Alarm( - modes=self.alarm, - email=self.email, - message=message, - subject=subject, - subtitle=subtitle, - filename=self.filename - ) - - a.start() - - def setFilename(self, filename): - - self.filename = filename - - def runProcs(self): - - err = False - n = len(self.configurations) - - while not err: - for conf in self.getUnits(): - ok = conf.run() - if ok == 'Error': - n -= 1 - continue - elif not ok: - break - if n == 0: - err = True - - def run(self): - - log.success('\nStarting Project {} [id={}]'.format(self.name, self.id), tag='') - self.started = True - self.start_time = time.time() - self.createObjects() - self.runProcs() - log.success('{} Done (Time: {:4.2f}s)'.format( - self.name, - time.time()-self.start_time), '') +# Copyright (c) 2012-2020 Jicamarca Radio Observatory +# All rights reserved. +# +# Distributed under the terms of the BSD 3-clause license. 
+"""API to create signal chain projects + +The API is provide through class: Project +""" + +import re +import sys +import ast +import datetime +import traceback +import time +import multiprocessing +from multiprocessing import Process, Queue +from threading import Thread +from xml.etree.ElementTree import ElementTree, Element, SubElement + +from schainpy.admin import Alarm, SchainWarning +from schainpy.model import * +from schainpy.utils import log + +if 'darwin' in sys.platform and sys.version_info[0] == 3 and sys.version_info[1] > 7: + multiprocessing.set_start_method('fork') + +class ConfBase(): + + def __init__(self): + + self.id = '0' + self.name = None + self.priority = None + self.parameters = {} + self.object = None + self.operations = [] + + def getId(self): + + return self.id + + def getNewId(self): + + return int(self.id) * 10 + len(self.operations) + 1 + + def updateId(self, new_id): + + self.id = str(new_id) + + n = 1 + for conf in self.operations: + conf_id = str(int(new_id) * 10 + n) + conf.updateId(conf_id) + n += 1 + + def getKwargs(self): + + params = {} + + for key, value in self.parameters.items(): + if value not in (None, '', ' '): + params[key] = value + + return params + + def update(self, **kwargs): + + for key, value in kwargs.items(): + self.addParameter(name=key, value=value) + + def addParameter(self, name, value, format=None): + ''' + ''' + + if isinstance(value, str) and re.search(r'(\d+/\d+/\d+)', value): + self.parameters[name] = datetime.date(*[int(x) for x in value.split('/')]) + elif isinstance(value, str) and re.search(r'(\d+:\d+:\d+)', value): + self.parameters[name] = datetime.time(*[int(x) for x in value.split(':')]) + else: + try: + self.parameters[name] = ast.literal_eval(value) + except: + if isinstance(value, str) and ',' in value: + self.parameters[name] = value.split(',') + else: + self.parameters[name] = value + + def getParameters(self): + + params = {} + for key, value in self.parameters.items(): + s = 
type(value).__name__ + if s == 'date': + params[key] = value.strftime('%Y/%m/%d') + elif s == 'time': + params[key] = value.strftime('%H:%M:%S') + else: + params[key] = str(value) + + return params + + def makeXml(self, element): + + xml = SubElement(element, self.ELEMENTNAME) + for label in self.xml_labels: + xml.set(label, str(getattr(self, label))) + + for key, value in self.getParameters().items(): + xml_param = SubElement(xml, 'Parameter') + xml_param.set('name', key) + xml_param.set('value', value) + + for conf in self.operations: + conf.makeXml(xml) + + def __str__(self): + + if self.ELEMENTNAME == 'Operation': + s = ' {}[id={}]\n'.format(self.name, self.id) + else: + s = '{}[id={}, inputId={}]\n'.format(self.name, self.id, self.inputId) + + for key, value in self.parameters.items(): + if self.ELEMENTNAME == 'Operation': + s += ' {}: {}\n'.format(key, value) + else: + s += ' {}: {}\n'.format(key, value) + + for conf in self.operations: + s += str(conf) + + return s + +class OperationConf(ConfBase): + + ELEMENTNAME = 'Operation' + xml_labels = ['id', 'name'] + + def setup(self, id, name, priority, project_id, err_queue): + + self.id = str(id) + self.project_id = project_id + self.name = name + self.type = 'other' + self.err_queue = err_queue + + def readXml(self, element, project_id, err_queue): + + self.id = element.get('id') + self.name = element.get('name') + self.type = 'other' + self.project_id = str(project_id) + self.err_queue = err_queue + + for elm in element.iter('Parameter'): + self.addParameter(elm.get('name'), elm.get('value')) + + def createObject(self): + + className = eval(self.name) + + if 'Plot' in self.name or 'Writer' in self.name or 'Send' in self.name or 'print' in self.name: + kwargs = self.getKwargs() + opObj = className(self.id, self.id, self.project_id, self.err_queue, **kwargs) + opObj.start() + self.type = 'external' + else: + opObj = className() + + self.object = opObj + return opObj + +class ProcUnitConf(ConfBase): + + 
ELEMENTNAME = 'ProcUnit' + xml_labels = ['id', 'inputId', 'name'] + + def setup(self, project_id, id, name, datatype, inputId, err_queue): + ''' + ''' + + if datatype == None and name == None: + raise ValueError('datatype or name should be defined') + + if name == None: + if 'Proc' in datatype: + name = datatype + else: + name = '%sProc' % (datatype) + + if datatype == None: + datatype = name.replace('Proc', '') + + self.id = str(id) + self.project_id = project_id + self.name = name + self.datatype = datatype + self.inputId = inputId + self.err_queue = err_queue + self.operations = [] + self.parameters = {} + + def removeOperation(self, id): + + i = [1 if x.id==id else 0 for x in self.operations] + self.operations.pop(i.index(1)) + + def getOperation(self, id): + + for conf in self.operations: + if conf.id == id: + return conf + + def addOperation(self, name, optype='self'): + ''' + ''' + + id = self.getNewId() + conf = OperationConf() + conf.setup(id, name=name, priority='0', project_id=self.project_id, err_queue=self.err_queue) + self.operations.append(conf) + + return conf + + def readXml(self, element, project_id, err_queue): + + self.id = element.get('id') + self.name = element.get('name') + self.inputId = None if element.get('inputId') == 'None' else element.get('inputId') + self.datatype = element.get('datatype', self.name.replace(self.ELEMENTNAME.replace('Unit', ''), '')) + self.project_id = str(project_id) + self.err_queue = err_queue + self.operations = [] + self.parameters = {} + + for elm in element: + if elm.tag == 'Parameter': + self.addParameter(elm.get('name'), elm.get('value')) + elif elm.tag == 'Operation': + conf = OperationConf() + conf.readXml(elm, project_id, err_queue) + self.operations.append(conf) + + def createObjects(self): + ''' + Instancia de unidades de procesamiento. 
+ ''' + + className = eval(self.name) + kwargs = self.getKwargs() + procUnitObj = className() + procUnitObj.name = self.name + log.success('creating process...', self.name) + + for conf in self.operations: + + opObj = conf.createObject() + + log.success('adding operation: {}, type:{}'.format( + conf.name, + conf.type), self.name) + + procUnitObj.addOperation(conf, opObj) + + self.object = procUnitObj + + def run(self): + ''' + ''' + + return self.object.call(**self.getKwargs()) + + +class ReadUnitConf(ProcUnitConf): + + ELEMENTNAME = 'ReadUnit' + + def __init__(self): + + self.id = None + self.datatype = None + self.name = None + self.inputId = None + self.operations = [] + self.parameters = {} + + def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='', + startTime='', endTime='', server=None, **kwargs): + + if datatype == None and name == None: + raise ValueError('datatype or name should be defined') + if name == None: + if 'Reader' in datatype: + name = datatype + datatype = name.replace('Reader','') + else: + name = '{}Reader'.format(datatype) + if datatype == None: + if 'Reader' in name: + datatype = name.replace('Reader','') + else: + datatype = name + name = '{}Reader'.format(name) + + self.id = id + self.project_id = project_id + self.name = name + self.datatype = datatype + self.err_queue = err_queue + + self.addParameter(name='path', value=path) + self.addParameter(name='startDate', value=startDate) + self.addParameter(name='endDate', value=endDate) + self.addParameter(name='startTime', value=startTime) + self.addParameter(name='endTime', value=endTime) + + for key, value in kwargs.items(): + self.addParameter(name=key, value=value) + + +class Project(Process): + """API to create signal chain projects""" + + ELEMENTNAME = 'Project' + + def __init__(self, name=''): + + Process.__init__(self) + self.id = '1' + if name: + self.name = '{} ({})'.format(Process.__name__, name) + self.filename = None + self.description = None 
+        self.email = None
+        self.alarm = []
+        self.configurations = {}
+        # self.err_queue = Queue()
+        self.err_queue = None
+        self.started = False
+
+    def getNewId(self):
+        '''Return the first unit id (as str) above int(self.id) * 10 that is not in use.'''
+        idList = list(self.configurations.keys())
+        newId = int(self.id) * 10  # local renamed so the builtin id() is not shadowed
+
+        while True:
+            newId += 1
+
+            if str(newId) in idList:
+                continue
+
+            break
+
+        return str(newId)
+
+    def updateId(self, new_id):
+        '''Set a new project id and renumber every unit as new_id * 10 + n, in sorted key order.'''
+        self.id = str(new_id)
+
+        keyList = list(self.configurations.keys())
+        keyList.sort()
+
+        n = 1
+        new_confs = {}
+
+        for procKey in keyList:
+
+            conf = self.configurations[procKey]
+            idProcUnit = str(int(self.id) * 10 + n)
+            conf.updateId(idProcUnit)
+            new_confs[idProcUnit] = conf
+            n += 1
+
+        self.configurations = new_confs
+
+    def setup(self, id=1, name='', description='', email=None, alarm=None):
+        '''Store id (as str), description, email and alarm modes; set the name only when one is given.'''
+        self.id = str(id)
+        self.description = description
+        self.email = email
+        self.alarm = [] if alarm is None else alarm  # fresh list per call, never a shared default
+        if name:
+            self.name = '{} ({})'.format(Process.__name__, name)
+
+    def update(self, **kwargs):
+        '''Set every keyword argument as an attribute of this project.'''
+        for key, value in kwargs.items():
+            setattr(self, key, value)
+
+    def clone(self):
+        '''Return a new Project with the same id, name, description and a shallow copy of the units.'''
+        p = Project()  # NOTE(review): email and alarm are not copied -- confirm this is intended
+        p.id = self.id
+        p.name = self.name
+        p.description = self.description
+        p.configurations = self.configurations.copy()
+
+        return p
+
+    def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
+
+        '''Create a ReadUnitConf, register it under its id and return it; id defaults to getNewId().
+        '''
+
+        if id is None:
+            idReadUnit = self.getNewId()
+        else:
+            idReadUnit = str(id)
+
+        conf = ReadUnitConf()
+        conf.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs)
+        self.configurations[conf.id] = conf
+
+        return conf
+
+    def addProcUnit(self, id=None, inputId='0', datatype=None, name=None):
+
+        '''Create a ProcUnitConf fed by inputId, register it under its id and return it.
+        '''
+
+        if id is None:
+            idProcUnit = self.getNewId()
+        else:
+            idProcUnit = str(id)  # ids are strings everywhere else (getNewId, addReadUnit)
+
+        conf = ProcUnitConf()
+        conf.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue)
+        self.configurations[conf.id] = conf
+
+        return conf
+
+    def removeProcUnit(self, id):
+        '''Drop the unit registered under id; unknown ids are ignored.'''
+        if id in self.configurations:
+            self.configurations.pop(id)
+
+    def getReadUnit(self):
+        '''Return the first unit whose ELEMENTNAME is 'ReadUnit', or None when there is none.'''
+        for obj in self.configurations.values():
+            if obj.ELEMENTNAME == 'ReadUnit':
+                return obj
+
+        return None
+
+    def getProcUnit(self, id):
+        '''Return the unit registered under id (KeyError when it is not registered).'''
+        return self.configurations[id]
+
+    def getUnits(self):
+        '''Yield the registered units in sorted key order.'''
+        keys = list(self.configurations)
+        keys.sort()
+
+        for key in keys:
+            yield self.configurations[key]
+
+    def updateUnit(self, id, **kwargs):
+        '''Forward kwargs to update() of the unit registered under id.'''
+        self.configurations[id].update(**kwargs)
+
+    def makeXml(self):
+        '''Build the 'Project' element, pass it to makeXml() of every unit and keep it in self.xml.'''
+        xml = Element('Project')
+        xml.set('id', str(self.id))
+        xml.set('name', self.name)
+        xml.set('description', self.description)
+
+        for conf in self.configurations.values():
+            conf.makeXml(xml)
+
+        self.xml = xml
+
+    def writeXml(self, filename=None):
+        '''Write the project XML (default: self.filename or 'schain.xml'); return 1 on success, else 0.'''
+        if filename is None:
+            if self.filename:
+                filename = self.filename
+            else:
+                filename = 'schain.xml'
+
+        if not filename:
+            print('filename has not been defined. Use setFilename(filename) for do it.')
+            return 0
+
+        abs_file = os.path.abspath(filename)
+
+        if not os.access(os.path.dirname(abs_file), os.W_OK):
+            print('No write permission on %s' % os.path.dirname(abs_file))
+            return 0
+
+        if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
+            print('File %s already exists and it could not be overwriten' % abs_file)
+            return 0
+
+        self.makeXml()
+
+        ElementTree(self.xml).write(abs_file, method='xml')
+
+        self.filename = abs_file
+
+        return 1
+
+    def readXml(self, filename):
+        '''Load the project attributes and units from an XML file; return 1 on success, 0 on a read error.'''
+        abs_file = os.path.abspath(filename)
+
+        self.configurations = {}
+
+        try:
+            self.xml = ElementTree().parse(abs_file)
+        except Exception:  # not a bare except: KeyboardInterrupt/SystemExit must propagate
+            log.error('Error reading %s, verify file format' % filename)
+            return 0
+
+        self.id = self.xml.get('id')
+        self.name = self.xml.get('name')
+        self.description = self.xml.get('description')
+
+        for element in self.xml:
+            if element.tag == 'ReadUnit':
+                conf = ReadUnitConf()
+                conf.readXml(element, self.id, self.err_queue)
+                self.configurations[conf.id] = conf
+            elif element.tag == 'ProcUnit':
+                conf = ProcUnitConf()
+                input_proc = self.configurations[element.get('inputId')]  # unused; raises KeyError if the input unit was not read yet
+                conf.readXml(element, self.id, self.err_queue)
+                self.configurations[conf.id] = conf
+
+        self.filename = abs_file
+
+        return 1
+
+    def __str__(self):
+        '''Return a header line with id, name and description followed by the text of every unit.'''
+        text = '\nProject[id=%s, name=%s, description=%s]\n\n' % (
+            self.id,
+            self.name,
+            self.description,
+        )
+
+        for conf in self.configurations.values():
+            text += '{}'.format(conf)
+
+        return text
+
+    def createObjects(self):
+        '''Create the object of every unit in sorted key order and feed it the object of its input unit.'''
+        keys = list(self.configurations.keys())
+        keys.sort()
+        for key in keys:
+            conf = self.configurations[key]
+            conf.createObjects()
+            if conf.inputId is not None:
+                conf.object.setInput(self.configurations[conf.inputId].object)
+
+    def monitor(self):
+        '''Run _monitor(self.err_queue, self.ctx) in a new thread.'''
+        t = Thread(target=self._monitor, args=(self.err_queue, self.ctx))
+        t.start()
+
+    def _monitor(self, queue, ctx):
+        '''Read queue until every started unit ended or a traceback arrives; log it, stop ctx, alarm on error.'''
+        import socket
+
+        procs = 0
+        err_msg = ''
+
+        while True:
+            msg = queue.get()
+            if '#_start_#' in msg:
+                procs += 1
+            elif '#_end_#' in msg:
+                procs -= 1
+            else:
+                err_msg = msg
+
+            if procs == 0 or 'Traceback' in err_msg:
+                break
+            time.sleep(0.1)
+
+        if '|' in err_msg:
+            name, err = err_msg.split('|', 1)  # split once: the error text itself may contain '|'
+            if 'SchainWarning' in err:
+                log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), name)
+            elif 'SchainError' in err:
+                log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), name)
+            else:
+                log.error(err, name)
+        else:
+            name, err = self.name, err_msg
+
+        time.sleep(1)
+
+        ctx.term()
+
+        message = err  # err is already a str on both branches above
+
+        if err_msg:
+            subject = 'SChain v%s: Error running %s\n' % (
+                schainpy.__version__, self.name)
+
+            subtitle = 'Hostname: %s\n' % socket.gethostbyname(
+                socket.gethostname())
+            subtitle += 'Working directory: %s\n' % os.path.abspath('./')
+            subtitle += 'Configuration file: %s\n' % self.filename
+            subtitle += 'Time: %s\n' % str(datetime.datetime.now())
+
+            readUnitConfObj = self.getReadUnit()
+            if readUnitConfObj:
+                subtitle += '\nInput parameters:\n'
+                subtitle += '[Data path = %s]\n' % readUnitConfObj.parameters['path']
+                subtitle += '[Start date = %s]\n' % readUnitConfObj.parameters['startDate']
+                subtitle += '[End date = %s]\n' % readUnitConfObj.parameters['endDate']
+                subtitle += '[Start time = %s]\n' % readUnitConfObj.parameters['startTime']
+                subtitle += '[End time = %s]\n' % readUnitConfObj.parameters['endTime']
+
+            a = Alarm(
+                modes=self.alarm,
+                email=self.email,
+                message=message,
+                subject=subject,
+                subtitle=subtitle,
+                filename=self.filename
+            )
+
+            a.start()
+
+    def setFilename(self, filename):
+        '''Remember filename as the XML file of this project.'''
+        self.filename = filename
+
+    def runProcs(self):
+        '''Run every unit in key order, pass after pass, until 'Error' results have counted n down to 0.'''
+        err = False
+        n = len(self.configurations)
+
+        while not err:
+            for conf in self.getUnits():
+                ok = conf.run()
+                if ok == 'Error':
+                    n -= 1
+                    continue
+                elif not ok:
+                    break  # a falsy result abandons this pass; the next pass starts from the first unit
+            if n == 0:
+                err = True
+
+    def run(self):
+        '''Create the unit objects, run them with runProcs() and log the elapsed time.'''
+        log.success('\nStarting Project {} [id={}]'.format(self.name, self.id), tag='')
+        self.started = True
+        self.start_time = time.time()
+        self.createObjects()
+        self.runProcs()
+        log.success('{} Done (Time: {:4.2f}s)'.format(
+            self.name,
+            time.time()-self.start_time), '')
diff --git a/schainpy/model/io/jroIO_kamisr.py b/schainpy/model/io/jroIO_kamisr.py index 23f1f40..398b700 100644 --- a/schainpy/model/io/jroIO_kamisr.py +++ b/schainpy/model/io/jroIO_kamisr.py @@ -24,7 +24,7 @@ from schainpy.model.data.jrodata import Voltage from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator from numpy import imag -@MPDecorator + class AMISRReader(ProcessingUnit): ''' classdocs @@ -82,6 +82,7 @@ class AMISRReader(ProcessingUnit): self.dataOut = Voltage() self.dataOut.error=False + def setup(self,path=None, startDate=None, endDate=None, @@ -95,7 +96,7 @@ class AMISRReader(ProcessingUnit): nBaud = 0, online=False): - #print ("T",path) + self.timezone = timezone self.all = all @@ -126,20 +127,24 @@ class AMISRReader(ProcessingUnit): Add code ''' self.isConfig = True - + # print("Setup Done") pass def readAMISRHeader(self,fp): header = 'Raw11/Data/RadacHeader' self.beamCodeByPulse = fp.get(header+'/BeamCode') # LIST OF BEAMS
PER PROFILE, TO BE USED ON REARRANGE - self.beamcodeFile = fp['Setup/Beamcodefile'][()].decode() - self.trueBeams = self.beamcodeFile.split("\n") - self.trueBeams.pop()#remove last - [self.realBeamCode.append(x) for x in self.trueBeams if x not in self.realBeamCode] - self.beamCode = [int(x, 16) for x in self.realBeamCode] + if (self.startDate> datetime.date(2021, 7, 15)): #Se cambió la forma de extracción de Apuntes el 17 + self.beamcodeFile = fp['Setup/Beamcodefile'][()].decode() + self.trueBeams = self.beamcodeFile.split("\n") + self.trueBeams.pop()#remove last + [self.realBeamCode.append(x) for x in self.trueBeams if x not in self.realBeamCode] + self.beamCode = [int(x, 16) for x in self.realBeamCode] + else: + _beamCode= fp.get('Raw11/Data/Beamcodes') #se usa la manera previa al cambio de apuntes + self.beamCode = _beamCode[0,:] + - #self.beamCode = fp.get('Raw11/Data/Beamcodes') # NUMBER OF CHANNELS AND IDENTIFY POSITION TO CREATE A FILE WITH THAT INFO #self.code = fp.get(header+'/Code') # NOT USE FOR THIS self.frameCount = fp.get(header+'/FrameCount')# NOT USE FOR THIS self.modeGroup = fp.get(header+'/ModeGroup')# NOT USE FOR THIS @@ -625,9 +630,6 @@ class AMISRReader(ProcessingUnit): diffUTC = 1.8e4 #UTC diference from peru in seconds --Joab diffUTC = 0 t_comp = (indexprof * self.ippSeconds * self.nchannels) + diffUTC # - #cambio posible 18/02/2020 - - #print("utc :",indexblock," __ ",t_comp) #print(numpy.shape(self.timeset)) @@ -648,7 +650,7 @@ class AMISRReader(ProcessingUnit): ''' This method will be called many times so here you should put all your code ''' - + #print("running kamisr") if not self.isConfig: self.setup(**kwargs) self.isConfig = True diff --git a/schainpy/model/io/jroIO_spectra.py b/schainpy/model/io/jroIO_spectra.py index 589c9b6..145b8d4 100644 --- a/schainpy/model/io/jroIO_spectra.py +++ b/schainpy/model/io/jroIO_spectra.py @@ -75,7 +75,7 @@ class SpectraReader(JRODataReader, ProcessingUnit): self.pts2read_SelfSpectra = 0 
self.pts2read_CrossSpectra = 0 - self.pts2read_DCchannels = 0 + self.pts2read_DCchannels = 0 self.ext = ".pdata" self.optchar = "P" self.basicHeaderObj = BasicHeader(LOCALTIME) @@ -162,7 +162,7 @@ class SpectraReader(JRODataReader, ProcessingUnit): Exceptions: Si un bloque leido no es un bloque valido """ - + fpointer = self.fp.tell() spc = numpy.fromfile( self.fp, self.dtype[0], self.pts2read_SelfSpectra ) @@ -364,7 +364,7 @@ class SpectraWriter(JRODataWriter, Operation): data.tofile(self.fp) if self.data_cspc is not None: - + cspc = numpy.transpose( self.data_cspc, (0,2,1) ) data = numpy.zeros( numpy.shape(cspc), self.dtype ) #print 'data.shape', self.shape_cspc_Buffer @@ -376,7 +376,7 @@ class SpectraWriter(JRODataWriter, Operation): data.tofile(self.fp) if self.data_dc is not None: - + dc = self.data_dc data = numpy.zeros( numpy.shape(dc), self.dtype ) data['real'] = dc.real @@ -524,4 +524,4 @@ class SpectraWriter(JRODataWriter, Operation): self.processingHeaderObj.processFlags = self.getProcessFlags() - self.setBasicHeader() \ No newline at end of file + self.setBasicHeader() diff --git a/schainpy/model/proc/jroproc_base.py b/schainpy/model/proc/jroproc_base.py index a923b62..053dd92 100644 --- a/schainpy/model/proc/jroproc_base.py +++ b/schainpy/model/proc/jroproc_base.py @@ -174,7 +174,7 @@ def MPDecorator(BaseClass): self.name = '{}{}'.format(self.CODE.upper(), 'Plot') self.start_time = time.time() - self.err_queue = args[3] + #self.err_queue = args[2] self.queue = Queue(maxsize=1) self.myrun = BaseClass.run diff --git a/schainpy/scripts/amisr_eej_proc_offline_v3.py b/schainpy/scripts/amisr_eej_proc_offline_v3.py index 152cf9a..d351760 100644 --- a/schainpy/scripts/amisr_eej_proc_offline_v3.py +++ b/schainpy/scripts/amisr_eej_proc_offline_v3.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python + import os, sys import time import datetime @@ -20,10 +20,10 @@ def main(): ymax = '300' dbmin = '45' #'60'#'55' #'40' #noise esf eej dbmax = '65' #'70' #'55' - showSPC = '0' 
#view plot Spectra - showRTI = '0' #view plot RTI - showNOISE = '0' #view plot NOISE - localtime='0' #para ajustar el horario en las gráficas '0' para dejar en utc + showSPC = '1' #view plot Spectra + showRTI = '1' #view plot RTI + showNOISE = '1' #view plot NOISE + localtime='1' #para ajustar el horario en las gráficas '0' para dejar en utc code = '1,-1,-1,-1,1,1,1,1,-1,-1,-1,1,-1,-1,-1,1,-1,-1,-1,1,-1,-1,1,-1,1,1,-1,1' nCode = '1' nBaud = '28' @@ -31,10 +31,11 @@ def main(): today = time.strftime("%Y/%m/%d") #startDate=today #endDate=today - startDate='2019/12/16' - endDate='2019/12/16' + startDate='2021/07/11' + endDate='2021/07/11' #inPath= '/home/soporte/dataAMISR_test/' inPath= '/home/soporte/dataAMISR/' + inPath= '/media/soporte/UARS_4T_D02/AMISR_DATA/2021/' #inPath = '/mnt/data_amisr' outPath = '/home/soporte/Data/EEJ' @@ -64,19 +65,22 @@ def main(): endDate=endDate, #endDate '2014/10/07', startTime='07:01:30',#'07:00:00', endTime='19:00:00',#'15:00:00', - walk=0, + walk=1, code = code, nCode = nCode, nBaud = nBaud, - timezone='ut', + timezone='lt', online=0) + #AMISR Processing Unit ##....................................................................................... ##....................................................................................... 
procUnitConfObj0 = controllerObj.addProcUnit(datatype='VoltageProc', inputId=readUnitConfObj.getId()) - opObj10 = procUnitConfObj0.addOperation(name='setRadarFrequency') - opObj10.addParameter(name='frequency', value='445e6', format='float') + opObj10 = procUnitConfObj0.addOperation(name='setAttribute') + opObj10.addParameter(name='frequency', value='445.09e6') + # opObj10 = procUnitConfObj0.addOperation(name='setRadarFrequency') + # opObj10.addParameter(name='frequency', value='445e6', format='float') opObj01 = procUnitConfObj0.addOperation(name='Decoder', optype='other') @@ -86,8 +90,8 @@ def main(): opObj01.addParameter(name='osamp', value=nosamp, format='int') - opObj02 = procUnitConfObj0.addOperation(name='CohInt', optype='other') - opObj02.addParameter(name='n', value='2', format='int') + # opObj02 = procUnitConfObj0.addOperation(name='CohInt', optype='other') + # opObj02.addParameter(name='n', value='2', format='int') @@ -106,9 +110,9 @@ def main(): ##....................................................................................... ##....................................................................................... 
- opObj13 = procUnitConfObj1.addOperation(name='getNoise' , optype ='self') - opObj13.addParameter(name='minHei', value='100', format='float') - opObj13.addParameter(name='maxHei', value='280', format='float') + # opObj13 = procUnitConfObj1.addOperation(name='getNoise' , optype ='self') + # opObj13.addParameter(name='minHei', value='100', format='float') + # opObj13.addParameter(name='maxHei', value='280', format='float') # @@ -138,21 +142,21 @@ def main(): opObj14.addParameter(name='id', value='3', format='int') opObj14.addParameter(name='wintitle', value='title0', format='str') opObj14.addParameter(name='showprofile', value='0', format='int') - opObj14.addParameter(name='xmin', value=xmin, format='int') - opObj14.addParameter(name='xmax', value=xmax, format='int') + opObj14.addParameter(name='tmin', value=xmin, format='int') + opObj14.addParameter(name='tmax', value=xmax, format='int') opObj14.addParameter(name='ymin', value=dbmin, format='int') opObj14.addParameter(name='ymax', value=dbmax, format='int') opObj14.addParameter(name='save', value=outPath, format='str') opObj14.addParameter(name='localtime', value=localtime,format='int') opObj14.addParameter(name='show', value = showNOISE, format='int') - + # opObj15 = procUnitConfObj1.addOperation(name='RTIPlot', optype='external') opObj15.addParameter(name='id', value='2', format='int') opObj15.addParameter(name='localtime', value=localtime,format='int') opObj15.addParameter(name='wintitle', value='RTI', format='str') - opObj15.addParameter(name='xmin', value=xmin, format='int') - opObj15.addParameter(name='xmax', value=xmax, format='int') #max value =23 + opObj15.addParameter(name='tmin', value=xmin, format='int') + opObj15.addParameter(name='tmax', value=xmax, format='int') #max value =23 opObj15.addParameter(name='ymin', value=ymin, format='int') opObj15.addParameter(name='zmin', value=dbmin, format='int') opObj15.addParameter(name='zmax', value=dbmax, format='int') @@ -170,16 +174,15 @@ def main(): opObj16 = 
procUnitConfObj2.addOperation(name='SpectralMoments', optype='other') - #Using ParamWriter:::: + #Using HDFWriter:::: ##....................................................................................... ##....................................................................................... - opObj17 = procUnitConfObj2.addOperation(name='ParamWriter', optype='external') + opObj17 = procUnitConfObj2.addOperation(name='HDFWriter', optype='external') opObj17.addParameter(name='path', value=outPath) opObj17.addParameter(name='blocksPerFile', value='10', format='int') opObj17.addParameter(name='metadataList',value='type,inputUnit,heightList',format='list') opObj17.addParameter(name='dataList',value='moments,data_SNR,utctime',format='list') - opObj17.addParameter(name='mode',value='1',format='int') #'0' channels, '1' parameters, '3' table (for meteors) - ##opObj17.addParameter(name='setType', value ='anything', format='str')#no usar + ##....................................................................................... diff --git a/schainpy/scripts/joab.xml b/schainpy/scripts/joab.xml deleted file mode 100644 index c46eb27..0000000 --- a/schainpy/scripts/joab.xml +++ /dev/null @@ -1 +0,0 @@ - \ No newline at end of file