From af11e4aac00c5d414e7bad450bec2dbf931d770a 2020-05-24 09:43:14 From: Juan C. Espinoza Date: 2020-05-24 09:43:14 Subject: [PATCH] Rewrite controller, remove MPDecorator to units (keep for plots an writers) use of queues for interproc comm instead of zmq, self operations are no longer supported --- diff --git a/schainpy/cli/cli.py b/schainpy/cli/cli.py index 8873109..ef7089b 100644 --- a/schainpy/cli/cli.py +++ b/schainpy/cli/cli.py @@ -1,10 +1,9 @@ import click -import schainpy import subprocess import os import sys import glob -from multiprocessing import cpu_count +import schainpy from schainpy.controller import Project from schainpy.model import Operation, ProcessingUnit from schainpy.utils import log @@ -43,7 +42,7 @@ def getOperations(): def getArgs(op): module = locate('schainpy.model.{}'.format(op)) try: - obj = module(1,2,3,Queue(),5,6) + obj = module(1, 2, 3, Queue()) except: obj = module() @@ -68,7 +67,7 @@ def getArgs(op): def getDoc(obj): module = locate('schainpy.model.{}'.format(obj)) try: - obj = module(1,2,3,Queue(),5,6) + obj = module(1, 2, 3, Queue()) except: obj = module() return obj.__doc__ @@ -94,9 +93,9 @@ PREFIX = 'experiment' @click.argument('nextcommand', default=None, required=False, type=str) def main(command, nextcommand, version): """COMMAND LINE INTERFACE FOR SIGNAL CHAIN - JICAMARCA RADIO OBSERVATORY V3.0\n - Available commands.\n + Available commands:\n xml: runs a schain XML generated file\n - run: runs any python script starting 'experiment_'\n + run: runs any python script'\n generate: generates a template schain script\n list: return a list of available procs and operations\n search: return avilable operations, procs or arguments of the given @@ -156,11 +155,9 @@ def search(nextcommand): try: args = getArgs(nextcommand) doc = getDoc(nextcommand) - if len(args) == 0: - log.success('\n{} has no arguments'.format(nextcommand), '') - else: - log.success('{}\n{}\n\narguments:\n {}'.format( - nextcommand, doc, ', '.join(args)), '') + log.success('{}\n{}\n\narguments:\n {}'.format( + nextcommand, doc, ', '.join(args)), '' + ) except Exception as e: log.error('Module `{}` does not exists'.format(nextcommand), '') allModules = getAll() diff --git a/schainpy/controller.py b/schainpy/controller.py index 8514d8e..ffa502e 100644 --- a/schainpy/controller.py +++ b/schainpy/controller.py @@ -1,568 +1,180 @@ ''' -Updated on January , 2018, for multiprocessing purposes -Author: Sergio Cortez -Created on September , 2012 +Main routines to create a Signal Chain project ''' -from platform import python_version + +import re import sys import ast import datetime import traceback -import math import time -import zmq -from multiprocessing import Process, Queue, Event, Value, cpu_count +from multiprocessing import Process, Queue from threading import Thread -from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring -from xml.dom import minidom - +from xml.etree.ElementTree import ElementTree, Element, SubElement from schainpy.admin import Alarm, SchainWarning from schainpy.model import * from schainpy.utils import log -DTYPES = { - 'Voltage': '.r', - 'Spectra': '.pdata' -} - - -def MPProject(project, n=cpu_count()): - ''' - Project wrapper to run schain in n processes - ''' - - rconf = project.getReadUnitObj() - op = rconf.getOperationObj('run') - dt1 = op.getParameterValue('startDate') - dt2 = op.getParameterValue('endDate') - tm1 = op.getParameterValue('startTime') - tm2 = op.getParameterValue('endTime') - days = (dt2 - dt1).days - - for day in range(days + 1): - skip = 0 - cursor = 0 - processes = [] - dt = dt1 + datetime.timedelta(day) - dt_str = dt.strftime('%Y/%m/%d') - reader = JRODataReader() - paths, files = reader.searchFilesOffLine(path=rconf.path, - startDate=dt, - endDate=dt, - startTime=tm1, - endTime=tm2, - ext=DTYPES[rconf.datatype]) - nFiles = len(files) - if nFiles == 0: - continue - skip = int(math.ceil(nFiles / n)) - while nFiles > cursor * skip: - rconf.update(startDate=dt_str, endDate=dt_str, cursor=cursor, - skip=skip) - p = project.clone() - p.start() - processes.append(p) - cursor += 1 - - def beforeExit(exctype, value, trace): - for process in processes: - process.terminate() - process.join() - print(traceback.print_tb(trace)) - - sys.excepthook = beforeExit - - for process in processes: - process.join() - process.terminate() - - time.sleep(3) - -def wait(context): - - time.sleep(1) - c = zmq.Context() - receiver = c.socket(zmq.SUB) - receiver.connect('ipc:///tmp/schain_{}_pub'.format(self.id)) - receiver.setsockopt(zmq.SUBSCRIBE, self.id.encode()) - msg = receiver.recv_multipart()[1] - context.terminate() - -class ParameterConf(): - - id = None - name = None - value = None - format = None - - __formated_value = None - - ELEMENTNAME = 'Parameter' - - def __init__(self): - - self.format = 'str' - - def getElementName(self): - - return self.ELEMENTNAME - - def getValue(self): - - value = self.value - format = self.format - - if self.__formated_value != None: - - return self.__formated_value - - if format == 'obj': - return value - - if format == 'str': - self.__formated_value = str(value) - return self.__formated_value - - if value == '': - raise ValueError('%s: This parameter value is empty' % self.name) - - if format == 'list': - strList = [s.strip() for s in value.split(',')] - self.__formated_value = strList - - return self.__formated_value - - if format == 'intlist': - ''' - Example: - value = (0,1,2) - ''' - - new_value = ast.literal_eval(value) - - if type(new_value) not in (tuple, list): - new_value = [int(new_value)] - - self.__formated_value = new_value - - return self.__formated_value - - if format == 'floatlist': - ''' - Example: - value = (0.5, 1.4, 2.7) - ''' - - new_value = ast.literal_eval(value) - - if type(new_value) not in (tuple, list): - new_value = [float(new_value)] - - self.__formated_value = new_value - - return self.__formated_value - - if format == 'date': - strList = value.split('/') - intList = [int(x) for x in strList] - date = datetime.date(intList[0], intList[1], intList[2]) - - self.__formated_value = date - - return self.__formated_value - - if format == 'time': - strList = value.split(':') - intList = [int(x) for x in strList] - time = datetime.time(intList[0], intList[1], intList[2]) - - self.__formated_value = time - - return self.__formated_value - - if format == 'pairslist': - ''' - Example: - value = (0,1),(1,2) - ''' - - new_value = ast.literal_eval(value) - - if type(new_value) not in (tuple, list): - raise ValueError('%s has to be a tuple or list of pairs' % value) - - if type(new_value[0]) not in (tuple, list): - if len(new_value) != 2: - raise ValueError('%s has to be a tuple or list of pairs' % value) - new_value = [new_value] - - for thisPair in new_value: - if len(thisPair) != 2: - raise ValueError('%s has to be a tuple or list of pairs' % value) - - self.__formated_value = new_value - - return self.__formated_value - - if format == 'multilist': - ''' - Example: - value = (0,1,2),(3,4,5) - ''' - multiList = ast.literal_eval(value) - - if type(multiList[0]) == int: - multiList = ast.literal_eval('(' + value + ')') - - self.__formated_value = multiList - - return self.__formated_value - - if format == 'bool': - value = int(value) - - if format == 'int': - value = float(value) - - format_func = eval(format) - - self.__formated_value = format_func(value) - - return self.__formated_value - - def updateId(self, new_id): - - self.id = str(new_id) - - def setup(self, id, name, value, format='str'): - self.id = str(id) - self.name = name - if format == 'obj': - self.value = value - else: - self.value = str(value) - self.format = str.lower(format) - - self.getValue() - - return 1 - - def update(self, name, value, format='str'): - - self.name = name - self.value = str(value) - self.format = format - - def makeXml(self, opElement): - if self.name not in ('queue',): - parmElement = SubElement(opElement, self.ELEMENTNAME) - parmElement.set('id', str(self.id)) - parmElement.set('name', self.name) - parmElement.set('value', self.value) - parmElement.set('format', self.format) - - def readXml(self, parmElement): - - self.id = parmElement.get('id') - self.name = parmElement.get('name') - self.value = parmElement.get('value') - self.format = str.lower(parmElement.get('format')) - - # Compatible with old signal chain version - if self.format == 'int' and self.name == 'idfigure': - self.name = 'id' - - def printattr(self): - - print('Parameter[%s]: name = %s, value = %s, format = %s, project_id = %s' % (self.id, self.name, self.value, self.format, self.project_id)) - -class OperationConf(): - - ELEMENTNAME = 'Operation' +class ConfBase(): def __init__(self): self.id = '0' self.name = None self.priority = None - self.topic = None - - def __getNewId(self): - - return int(self.id) * 10 + len(self.parmConfObjList) + 1 + self.parameters = {} + self.object = None + self.operations = [] def getId(self): + return self.id + + def getNewId(self): + + return int(self.id) * 10 + len(self.operations) + 1 def updateId(self, new_id): self.id = str(new_id) n = 1 - for parmObj in self.parmConfObjList: - - idParm = str(int(new_id) * 10 + n) - parmObj.updateId(idParm) - + for conf in self.operations: + conf_id = str(int(new_id) * 10 + n) + conf.updateId(conf_id) n += 1 - def getElementName(self): - - return self.ELEMENTNAME - - def getParameterObjList(self): - - return self.parmConfObjList - - def getParameterObj(self, parameterName): - - for parmConfObj in self.parmConfObjList: - - if parmConfObj.name != parameterName: - continue - - return parmConfObj - - return None - - def getParameterObjfromValue(self, parameterValue): - - for parmConfObj in self.parmConfObjList: + def getKwargs(self): - if parmConfObj.getValue() != parameterValue: - continue + params = {} - return parmConfObj.getValue() + for key, value in self.parameters.items(): + if value not in (None, '', ' '): + params[key] = value + + return params - return None + def update(self, **kwargs): - def getParameterValue(self, parameterName): + for key, value in kwargs.items(): + self.addParameter(name=key, value=value) - parameterObj = self.getParameterObj(parameterName) + def addParameter(self, name, value, format=None): + ''' + ''' - # if not parameterObj: - # return None + if isinstance(value, str) and re.search(r'(\d+/\d+/\d+)', value): + self.parameters[name] = datetime.date(*[int(x) for x in value.split('/')]) + elif isinstance(value, str) and re.search(r'(\d+:\d+:\d+)', value): + self.parameters[name] = datetime.time(*[int(x) for x in value.split(':')]) + else: + try: + self.parameters[name] = ast.literal_eval(value) + except: + if isinstance(value, str) and ',' in value: + self.parameters[name] = value.split(',') + else: + self.parameters[name] = value + + def getParameters(self): + + params = {} + for key, value in self.parameters.items(): + s = type(value).__name__ + if s == 'date': + params[key] = value.strftime('%Y/%m/%d') + elif s == 'time': + params[key] = value.strftime('%H:%M:%S') + else: + params[key] = str(value) - value = parameterObj.getValue() + return params + + def makeXml(self, element): - return value + xml = SubElement(element, self.ELEMENTNAME) + for label in self.xml_labels: + xml.set(label, str(getattr(self, label))) + + for key, value in self.getParameters().items(): + xml_param = SubElement(xml, 'Parameter') + xml_param.set('name', key) + xml_param.set('value', value) + + for conf in self.operations: + conf.makeXml(xml) + + def __str__(self): - def getKwargs(self): + if self.ELEMENTNAME == 'Operation': + s = ' {}[id={}]\n'.format(self.name, self.id) + else: + s = '{}[id={}, inputId={}]\n'.format(self.name, self.id, self.inputId) - kwargs = {} + for key, value in self.parameters.items(): + if self.ELEMENTNAME == 'Operation': + s += ' {}: {}\n'.format(key, value) + else: + s += ' {}: {}\n'.format(key, value) + + for conf in self.operations: + s += str(conf) - for parmConfObj in self.parmConfObjList: - if self.name == 'run' and parmConfObj.name == 'datatype': - continue + return s - kwargs[parmConfObj.name] = parmConfObj.getValue() +class OperationConf(ConfBase): - return kwargs + ELEMENTNAME = 'Operation' + xml_labels = ['id', 'name'] - def setup(self, id, name, priority, type, project_id, err_queue, lock): + def setup(self, id, name, priority, project_id, err_queue): self.id = str(id) self.project_id = project_id self.name = name - self.type = type - self.priority = priority + self.type = 'other' self.err_queue = err_queue - self.lock = lock - self.parmConfObjList = [] - - def removeParameters(self): - - for obj in self.parmConfObjList: - del obj - - self.parmConfObjList = [] - - def addParameter(self, name, value, format='str'): - - if value is None: - return None - id = self.__getNewId() - - parmConfObj = ParameterConf() - if not parmConfObj.setup(id, name, value, format): - return None - - self.parmConfObjList.append(parmConfObj) - - return parmConfObj - - def changeParameter(self, name, value, format='str'): - - parmConfObj = self.getParameterObj(name) - parmConfObj.update(name, value, format) - return parmConfObj + def readXml(self, element, project_id, err_queue): - def makeXml(self, procUnitElement): - - opElement = SubElement(procUnitElement, self.ELEMENTNAME) - opElement.set('id', str(self.id)) - opElement.set('name', self.name) - opElement.set('type', self.type) - opElement.set('priority', str(self.priority)) - - for parmConfObj in self.parmConfObjList: - parmConfObj.makeXml(opElement) - - def readXml(self, opElement, project_id): - - self.id = opElement.get('id') - self.name = opElement.get('name') - self.type = opElement.get('type') - self.priority = opElement.get('priority') - self.project_id = str(project_id) - - # Compatible with old signal chain version - # Use of 'run' method instead 'init' - if self.type == 'self' and self.name == 'init': - self.name = 'run' - - self.parmConfObjList = [] - - parmElementList = opElement.iter(ParameterConf().getElementName()) - - for parmElement in parmElementList: - parmConfObj = ParameterConf() - parmConfObj.readXml(parmElement) - - # Compatible with old signal chain version - # If an 'plot' OPERATION is found, changes name operation by the value of its type PARAMETER - if self.type != 'self' and self.name == 'Plot': - if parmConfObj.format == 'str' and parmConfObj.name == 'type': - self.name = parmConfObj.value - continue - - self.parmConfObjList.append(parmConfObj) - - def printattr(self): - - print('%s[%s]: name = %s, type = %s, priority = %s, project_id = %s' % (self.ELEMENTNAME, - self.id, - self.name, - self.type, - self.priority, - self.project_id)) + self.id = element.get('id') + self.name = element.get('name') + self.type = 'other' + self.project_id = str(project_id) + self.err_queue = err_queue - for parmConfObj in self.parmConfObjList: - parmConfObj.printattr() + for elm in element.iter('Parameter'): + self.addParameter(elm.get('name'), elm.get('value')) def createObject(self): className = eval(self.name) - if self.type == 'other': - opObj = className() - elif self.type == 'external': + if 'Plot' in self.name or 'Writer' in self.name: kwargs = self.getKwargs() - opObj = className(self.id, self.id, self.project_id, self.err_queue, self.lock, 'Operation', **kwargs) + opObj = className(self.id, self.id, self.project_id, self.err_queue, **kwargs) opObj.start() - self.opObj = opObj + self.type = 'external' + else: + opObj = className() + self.object = opObj return opObj -class ProcUnitConf(): +class ProcUnitConf(ConfBase): ELEMENTNAME = 'ProcUnit' + xml_labels = ['id', 'inputId', 'name'] - def __init__(self): - - self.id = None - self.datatype = None - self.name = None - self.inputId = None - self.opConfObjList = [] - self.procUnitObj = None - self.opObjDict = {} - - def __getPriority(self): - - return len(self.opConfObjList) + 1 - - def __getNewId(self): - - return int(self.id) * 10 + len(self.opConfObjList) + 1 - - def getElementName(self): - - return self.ELEMENTNAME - - def getId(self): - - return self.id - - def updateId(self, new_id): + def setup(self, project_id, id, name, datatype, inputId, err_queue): ''' - new_id = int(parentId) * 10 + (int(self.id) % 10) - new_inputId = int(parentId) * 10 + (int(self.inputId) % 10) - - # If this proc unit has not inputs - #if self.inputId == '0': - #new_inputId = 0 - - n = 1 - for opConfObj in self.opConfObjList: - - idOp = str(int(new_id) * 10 + n) - opConfObj.updateId(idOp) - - n += 1 - - self.parentId = str(parentId) - self.id = str(new_id) - #self.inputId = str(new_inputId) - ''' - n = 1 - - def getInputId(self): - - return self.inputId - - def getOperationObjList(self): - - return self.opConfObjList - - def getOperationObj(self, name=None): - - for opConfObj in self.opConfObjList: - - if opConfObj.name != name: - continue - - return opConfObj - - return None - - def getOpObjfromParamValue(self, value=None): - - for opConfObj in self.opConfObjList: - if opConfObj.getParameterObjfromValue(parameterValue=value) != value: - continue - return opConfObj - return None - - def getProcUnitObj(self): - - return self.procUnitObj - - def setup(self, project_id, id, name, datatype, inputId, err_queue, lock): - ''' - id sera el topico a publicar - inputId sera el topico a subscribirse ''' - # Compatible with old signal chain version if datatype == None and name == None: raise ValueError('datatype or name should be defined') - #Definir una condicion para inputId cuando sea 0 - if name == None: if 'Proc' in datatype: name = datatype @@ -578,100 +190,49 @@ class ProcUnitConf(): self.datatype = datatype self.inputId = inputId self.err_queue = err_queue - self.lock = lock - self.opConfObjList = [] - - self.addOperation(name='run', optype='self') - - def removeOperations(self): - - for obj in self.opConfObjList: - del obj + self.operations = [] + self.parameters = {} - self.opConfObjList = [] - self.addOperation(name='run') + def removeOperation(self, id): - def addParameter(self, **kwargs): - ''' - Add parameters to 'run' operation - ''' - opObj = self.opConfObjList[0] - - opObj.addParameter(**kwargs) + i = [1 if x.id==id else 0 for x in self.operations] + self.operations.pop(i.index(1)) + + def getOperation(self, id): - return opObj + for conf in self.operations: + if conf.id == id: + return conf def addOperation(self, name, optype='self'): ''' - Actualizacion - > proceso comunicacion - En el caso de optype='self', elminar. DEfinir comuncacion IPC -> Topic - definir el tipoc de socket o comunicacion ipc++ - ''' - id = self.__getNewId() - priority = self.__getPriority() # Sin mucho sentido, pero puede usarse - opConfObj = OperationConf() - opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id, err_queue=self.err_queue, lock=self.lock) - self.opConfObjList.append(opConfObj) - - return opConfObj - - def makeXml(self, projectElement): + id = self.getNewId() + conf = OperationConf() + conf.setup(id, name=name, priority='0', project_id=self.project_id, err_queue=self.err_queue) + self.operations.append(conf) - procUnitElement = SubElement(projectElement, self.ELEMENTNAME) - procUnitElement.set('id', str(self.id)) - procUnitElement.set('name', self.name) - procUnitElement.set('datatype', self.datatype) - procUnitElement.set('inputId', str(self.inputId)) + return conf - for opConfObj in self.opConfObjList: - opConfObj.makeXml(procUnitElement) + def readXml(self, element, project_id, err_queue): - def readXml(self, upElement, project_id): - - self.id = upElement.get('id') - self.name = upElement.get('name') - self.datatype = upElement.get('datatype') - self.inputId = upElement.get('inputId') + self.id = element.get('id') + self.name = element.get('name') + self.inputId = None if element.get('inputId') == 'None' else element.get('inputId') + self.datatype = element.get('datatype', self.name.replace(self.ELEMENTNAME.replace('Unit', ''), '')) self.project_id = str(project_id) - - if self.ELEMENTNAME == 'ReadUnit': - self.datatype = self.datatype.replace('Reader', '') - - if self.ELEMENTNAME == 'ProcUnit': - self.datatype = self.datatype.replace('Proc', '') - - if self.inputId == 'None': - self.inputId = '0' - - self.opConfObjList = [] - - opElementList = upElement.iter(OperationConf().getElementName()) - - for opElement in opElementList: - opConfObj = OperationConf() - opConfObj.readXml(opElement, project_id) - self.opConfObjList.append(opConfObj) - - def printattr(self): - - print('%s[%s]: name = %s, datatype = %s, inputId = %s, project_id = %s' % (self.ELEMENTNAME, - self.id, - self.name, - self.datatype, - self.inputId, - self.project_id)) - - for opConfObj in self.opConfObjList: - opConfObj.printattr() - - def getKwargs(self): - - opObj = self.opConfObjList[0] - kwargs = opObj.getKwargs() - - return kwargs + self.err_queue = err_queue + self.operations = [] + self.parameters = {} + + for elm in element: + if elm.tag == 'Parameter': + self.addParameter(elm.get('name'), elm.get('value')) + elif elm.tag == 'Operation': + conf = OperationConf() + conf.readXml(elm, project_id, err_queue) + self.operations.append(conf) def createObjects(self): ''' @@ -680,39 +241,27 @@ class ProcUnitConf(): className = eval(self.name) kwargs = self.getKwargs() - procUnitObj = className(self.id, self.inputId, self.project_id, self.err_queue, self.lock, 'ProcUnit', **kwargs) + procUnitObj = className() + procUnitObj.name = self.name log.success('creating process...', self.name) - for opConfObj in self.opConfObjList: + for conf in self.operations: - if opConfObj.type == 'self' and opConfObj.name == 'run': - continue - elif opConfObj.type == 'self': - opObj = getattr(procUnitObj, opConfObj.name) - else: - opObj = opConfObj.createObject() + opObj = conf.createObject() log.success('adding operation: {}, type:{}'.format( - opConfObj.name, - opConfObj.type), self.name) + conf.name, + conf.type), self.name) - procUnitObj.addOperation(opConfObj, opObj) + procUnitObj.addOperation(conf, opObj) - procUnitObj.start() - self.procUnitObj = procUnitObj - - def close(self): - - for opConfObj in self.opConfObjList: - if opConfObj.type == 'self': - continue - - opObj = self.procUnitObj.getOperationObj(opConfObj.id) - opObj.close() - - self.procUnitObj.close() + self.object = procUnitObj - return + def run(self): + ''' + ''' + + return self.object.call(**self.getKwargs()) class ReadUnitConf(ProcUnitConf): @@ -725,28 +274,12 @@ class ReadUnitConf(ProcUnitConf): self.datatype = None self.name = None self.inputId = None - self.opConfObjList = [] - self.lock = Event() - self.lock.set() - self.lock.n = Value('d', 0) - - def getElementName(self): - - return self.ELEMENTNAME + self.operations = [] + self.parameters = {} def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='', startTime='', endTime='', server=None, **kwargs): - - - ''' - *****el id del proceso sera el Topico - - Adicion de {topic}, si no esta presente -> error - kwargs deben ser trasmitidos en la instanciacion - - ''' - # Compatible with old signal chain version if datatype == None and name == None: raise ValueError('datatype or name should be defined') if name == None: @@ -766,112 +299,16 @@ class ReadUnitConf(ProcUnitConf): self.project_id = project_id self.name = name self.datatype = datatype - if path != '': - self.path = os.path.abspath(path) - self.startDate = startDate - self.endDate = endDate - self.startTime = startTime - self.endTime = endTime - self.server = server self.err_queue = err_queue - self.addRunOperation(**kwargs) - - def update(self, **kwargs): - - if 'datatype' in kwargs: - datatype = kwargs.pop('datatype') - if 'Reader' in datatype: - self.name = datatype - else: - self.name = '%sReader' % (datatype) - self.datatype = self.name.replace('Reader', '') - - attrs = ('path', 'startDate', 'endDate', - 'startTime', 'endTime') - - for attr in attrs: - if attr in kwargs: - setattr(self, attr, kwargs.pop(attr)) - - self.updateRunOperation(**kwargs) - - def removeOperations(self): - - for obj in self.opConfObjList: - del obj - - self.opConfObjList = [] - - def addRunOperation(self, **kwargs): - - opObj = self.addOperation(name='run', optype='self') - - if self.server is None: - opObj.addParameter( - name='datatype', value=self.datatype, format='str') - opObj.addParameter(name='path', value=self.path, format='str') - opObj.addParameter( - name='startDate', value=self.startDate, format='date') - opObj.addParameter( - name='endDate', value=self.endDate, format='date') - opObj.addParameter( - name='startTime', value=self.startTime, format='time') - opObj.addParameter( - name='endTime', value=self.endTime, format='time') - - for key, value in list(kwargs.items()): - opObj.addParameter(name=key, value=value, - format=type(value).__name__) - else: - opObj.addParameter(name='server', value=self.server, format='str') - - return opObj - - def updateRunOperation(self, **kwargs): - - opObj = self.getOperationObj(name='run') - opObj.removeParameters() - - opObj.addParameter(name='datatype', value=self.datatype, format='str') - opObj.addParameter(name='path', value=self.path, format='str') - opObj.addParameter( - name='startDate', value=self.startDate, format='date') - opObj.addParameter(name='endDate', value=self.endDate, format='date') - opObj.addParameter( - name='startTime', value=self.startTime, format='time') - opObj.addParameter(name='endTime', value=self.endTime, format='time') - - for key, value in list(kwargs.items()): - opObj.addParameter(name=key, value=value, - format=type(value).__name__) - - return opObj - - def readXml(self, upElement, project_id): - - self.id = upElement.get('id') - self.name = upElement.get('name') - self.datatype = upElement.get('datatype') - self.project_id = str(project_id) #yong - - if self.ELEMENTNAME == 'ReadUnit': - self.datatype = self.datatype.replace('Reader', '') - - self.opConfObjList = [] - - opElementList = upElement.iter(OperationConf().getElementName()) - - for opElement in opElementList: - opConfObj = OperationConf() - opConfObj.readXml(opElement, project_id) - self.opConfObjList.append(opConfObj) + + self.addParameter(name='path', value=path) + self.addParameter(name='startDate', value=startDate) + self.addParameter(name='endDate', value=endDate) + self.addParameter(name='startTime', value=startTime) + self.addParameter(name='endTime', value=endTime) - if opConfObj.name == 'run': - self.path = opConfObj.getParameterValue('path') - self.startDate = opConfObj.getParameterValue('startDate') - self.endDate = opConfObj.getParameterValue('endDate') - self.startTime = opConfObj.getParameterValue('startTime') - self.endTime = opConfObj.getParameterValue('endTime') + for key, value in kwargs.items(): + self.addParameter(name=key, value=value) class Project(Process): @@ -885,13 +322,15 @@ class Project(Process): self.filename = None self.description = None self.email = None - self.alarm = None - self.procUnitConfObjDict = {} - self.err_queue = Queue() + self.alarm = [] + self.configurations = {} + # self.err_queue = Queue() + self.err_queue = None + self.started = False - def __getNewId(self): + def getNewId(self): - idList = list(self.procUnitConfObjDict.keys()) + idList = list(self.configurations.keys()) id = int(self.id) * 10 while True: @@ -904,43 +343,28 @@ class Project(Process): return str(id) - def getElementName(self): - - return self.ELEMENTNAME - - def getId(self): - - return self.id - def updateId(self, new_id): self.id = str(new_id) - keyList = list(self.procUnitConfObjDict.keys()) + keyList = list(self.configurations.keys()) keyList.sort() n = 1 - newProcUnitConfObjDict = {} + new_confs = {} for procKey in keyList: - procUnitConfObj = self.procUnitConfObjDict[procKey] + conf = self.configurations[procKey] idProcUnit = str(int(self.id) * 10 + n) - procUnitConfObj.updateId(idProcUnit) - newProcUnitConfObjDict[idProcUnit] = procUnitConfObj + conf.updateId(idProcUnit) + new_confs[idProcUnit] = conf n += 1 - self.procUnitConfObjDict = newProcUnitConfObjDict + self.configurations = new_confs def setup(self, id=1, name='', description='', email=None, alarm=[]): - print(' ') - print('*' * 60) - print('* Starting SIGNAL CHAIN PROCESSING (Multiprocessing) v%s *' % schainpy.__version__) - print('*' * 60) - print("* Python " + python_version() + " *") - print('*' * 19) - print(' ') self.id = str(id) self.description = description self.email = email @@ -950,108 +374,91 @@ class Project(Process): def update(self, **kwargs): - for key, value in list(kwargs.items()): + for key, value in kwargs.items(): setattr(self, key, value) def clone(self): p = Project() - p.procUnitConfObjDict = self.procUnitConfObjDict + p.id = self.id + p.name = self.name + p.description = self.description + p.configurations = self.configurations.copy() + return p def addReadUnit(self, id=None, datatype=None, name=None, **kwargs): ''' - Actualizacion: - Se agrego un nuevo argumento: topic -relativo a la forma de comunicar los procesos simultaneos - - * El id del proceso sera el topico al que se deben subscribir los procUnits para recibir la informacion(data) - ''' if id is None: - idReadUnit = self.__getNewId() + idReadUnit = self.getNewId() else: idReadUnit = str(id) - readUnitConfObj = ReadUnitConf() - readUnitConfObj.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs) - self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj + conf = ReadUnitConf() + conf.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs) + self.configurations[conf.id] = conf - return readUnitConfObj + return conf - def addProcUnit(self, inputId='0', datatype=None, name=None): + def addProcUnit(self, id=None, inputId='0', datatype=None, name=None): ''' - Actualizacion: - Se agrego dos nuevos argumentos: topic_read (lee data de otro procUnit) y topic_write(escribe o envia data a otro procUnit) - Deberia reemplazar a "inputId" - - ** A fin de mantener el inputID, este sera la representaacion del topicoal que deben subscribirse. El ID propio de la intancia - (proceso) sera el topico de la publicacion, todo sera asignado de manera dinamica. - ''' - idProcUnit = self.__getNewId() - procUnitConfObj = ProcUnitConf() - input_proc = self.procUnitConfObjDict[inputId] - procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue, input_proc.lock) - self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj + if id is None: + idProcUnit = self.getNewId() + else: + idProcUnit = id + + conf = ProcUnitConf() + conf.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue) + self.configurations[conf.id] = conf - return procUnitConfObj + return conf def removeProcUnit(self, id): - if id in list(self.procUnitConfObjDict.keys()): - self.procUnitConfObjDict.pop(id) - - def getReadUnitId(self): - - readUnitConfObj = self.getReadUnitObj() + if id in self.configurations: + self.configurations.pop(id) - return readUnitConfObj.id + def getReadUnit(self): - def getReadUnitObj(self): - - for obj in list(self.procUnitConfObjDict.values()): - if obj.getElementName() == 'ReadUnit': + for obj in list(self.configurations.values()): + if obj.ELEMENTNAME == 'ReadUnit': return obj return None - def getProcUnitObj(self, id=None, name=None): - - if id != None: - return self.procUnitConfObjDict[id] + def getProcUnit(self, id): - if name != None: - return self.getProcUnitObjByName(name) + return self.configurations[id] - return None - - def getProcUnitObjByName(self, name): + def getUnits(self): - for obj in list(self.procUnitConfObjDict.values()): - if obj.name == name: - return obj - - return None + keys = list(self.configurations) + keys.sort() - def procUnitItems(self): + for key in keys: + yield self.configurations[key] - return list(self.procUnitConfObjDict.items()) + def updateUnit(self, id, **kwargs): + conf = self.configurations[id].update(**kwargs) + def makeXml(self): - projectElement = Element('Project') - projectElement.set('id', str(self.id)) - projectElement.set('name', self.name) - projectElement.set('description', self.description) + xml = Element('Project') + xml.set('id', str(self.id)) + xml.set('name', self.name) + xml.set('description', self.description) - for procUnitConfObj in list(self.procUnitConfObjDict.values()): - procUnitConfObj.makeXml(projectElement) + for conf in self.configurations.values(): + conf.makeXml(xml) - self.projectElement = projectElement + self.xml = xml def writeXml(self, filename=None): @@ -1077,83 +484,72 @@ class Project(Process): self.makeXml() - ElementTree(self.projectElement).write(abs_file, method='xml') + ElementTree(self.xml).write(abs_file, method='xml') self.filename = abs_file return 1 - def readXml(self, filename=None): - - if not filename: - print('filename is not defined') - return 0 + def readXml(self, filename): abs_file = os.path.abspath(filename) - if not os.path.isfile(abs_file): - print('%s file does not exist' % abs_file) - return 0 - - self.projectElement = None - self.procUnitConfObjDict = {} + self.configurations = {} try: - self.projectElement = ElementTree().parse(abs_file) + self.xml = ElementTree().parse(abs_file) except: - print('Error reading %s, verify file format' % filename) + log.error('Error reading %s, verify file format' % filename) return 0 - self.project = self.projectElement.tag - - self.id = self.projectElement.get('id') - self.name = self.projectElement.get('name') - self.description = self.projectElement.get('description') - - readUnitElementList = self.projectElement.iter( - ReadUnitConf().getElementName()) - - for readUnitElement in readUnitElementList: - readUnitConfObj = ReadUnitConf() - readUnitConfObj.readXml(readUnitElement, self.id) - self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj - - procUnitElementList = self.projectElement.iter( - ProcUnitConf().getElementName()) - - for procUnitElement in procUnitElementList: - procUnitConfObj = ProcUnitConf() - procUnitConfObj.readXml(procUnitElement, self.id) - self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj + self.id = self.xml.get('id') + self.name = self.xml.get('name') + self.description = self.xml.get('description') + + for element in self.xml: + if element.tag == 'ReadUnit': + conf = ReadUnitConf() + conf.readXml(element, self.id, self.err_queue) + self.configurations[conf.id] = conf + elif element.tag == 'ProcUnit': + conf = ProcUnitConf() + input_proc = self.configurations[element.get('inputId')] + conf.readXml(element, self.id, self.err_queue) + self.configurations[conf.id] = conf self.filename = abs_file - + return 1 def __str__(self): - print('Project: name = %s, description = %s, id = %s' % ( - self.name, - self.description, - self.id)) + text = '\nProject[id=%s, name=%s, description=%s]\n\n' % ( + self.id, + self.name, + self.description, + ) + + for conf in self.configurations.values(): + text += '{}'.format(conf) - for procUnitConfObj in self.procUnitConfObjDict.values(): - print(procUnitConfObj) + return text def createObjects(self): - - keys = list(self.procUnitConfObjDict.keys()) + keys = list(self.configurations.keys()) keys.sort() for key in keys: - self.procUnitConfObjDict[key].createObjects() + conf = self.configurations[key] + conf.createObjects() + if conf.inputId is not None: + conf.object.setInput(self.configurations[conf.inputId].object) def monitor(self): - t = Thread(target=self.__monitor, args=(self.err_queue, self.ctx)) + t = Thread(target=self._monitor, args=(self.err_queue, self.ctx)) t.start() - def __monitor(self, queue, ctx): + def _monitor(self, queue, ctx): import socket @@ -1184,13 +580,7 @@ class Project(Process): else: name, err = self.name, err_msg - time.sleep(2) - - for conf in self.procUnitConfObjDict.values(): - for confop in conf.opConfObjList: - if confop.type == 'external': - confop.opObj.terminate() - conf.procUnitObj.terminate() + time.sleep(1) ctx.term() @@ -1206,15 +596,14 @@ class Project(Process): subtitle += 'Configuration file: %s\n' % self.filename subtitle += 'Time: %s\n' % str(datetime.datetime.now()) - readUnitConfObj = self.getReadUnitObj() + readUnitConfObj = self.getReadUnit() if readUnitConfObj: subtitle += '\nInput parameters:\n' - subtitle += '[Data path = %s]\n' % readUnitConfObj.path - subtitle += '[Data type = %s]\n' % readUnitConfObj.datatype - subtitle += '[Start date = %s]\n' % readUnitConfObj.startDate - subtitle += '[End date = %s]\n' % readUnitConfObj.endDate - subtitle += '[Start time = %s]\n' % readUnitConfObj.startTime - subtitle += '[End time = %s]\n' % readUnitConfObj.endTime + subtitle += '[Data path = %s]\n' % readUnitConfObj.parameters['path'] + subtitle += '[Start date = %s]\n' % readUnitConfObj.parameters['startDate'] + subtitle += '[End date = %s]\n' % readUnitConfObj.parameters['endDate'] + subtitle += '[Start time = %s]\n' % readUnitConfObj.parameters['startTime'] + subtitle += '[End time = %s]\n' % readUnitConfObj.parameters['endTime'] a = Alarm( modes=self.alarm, @@ -1227,64 +616,33 @@ class Project(Process): a.start() - def isPaused(self): - return 0 - - def isStopped(self): - return 0 - - def runController(self): - ''' - returns 0 when this process has been stopped, 1 otherwise - ''' - - if self.isPaused(): - print('Process suspended') - - while True: - time.sleep(0.1) - - if not self.isPaused(): - break - - if self.isStopped(): - break - - print('Process reinitialized') - - if self.isStopped(): - print('Process stopped') - return 0 - - return 1 - def setFilename(self, filename): self.filename = filename - def setProxy(self): + def runProcs(self): - if not os.path.exists('/tmp/schain'): - os.mkdir('/tmp/schain') + err = False + n = len(self.configurations) + + while not err: + for conf in self.getUnits(): + ok = conf.run() + if ok is 'Error': + n -= 1 + continue + elif not ok: + break + if n == 0: + err = True - self.ctx = zmq.Context() - xpub = self.ctx.socket(zmq.XPUB) - xpub.bind('ipc:///tmp/schain/{}_pub'.format(self.id)) - xsub = self.ctx.socket(zmq.XSUB) - xsub.bind('ipc:///tmp/schain/{}_sub'.format(self.id)) - self.monitor() - try: - zmq.proxy(xpub, xsub) - except zmq.ContextTerminated: - xpub.close() - xsub.close() - def run(self): - log.success('Starting {}: {}'.format(self.name, self.id), tag='') + log.success('\nStarting Project {} [id={}]'.format(self.name, self.id), tag='') + self.started = True self.start_time = time.time() - self.createObjects() - self.setProxy() - log.success('{} Done (Time: {}s)'.format( + self.createObjects() + self.runProcs() + log.success('{} Done (Time: {:4.2f}s)'.format( self.name, time.time()-self.start_time), '') diff --git a/schainpy/model/io/bltrIO_param.py b/schainpy/model/io/bltrIO_param.py index ab3a29c..148557e 100644 --- a/schainpy/model/io/bltrIO_param.py +++ b/schainpy/model/io/bltrIO_param.py @@ -84,7 +84,7 @@ DATA_STRUCTURE = numpy.dtype([ ('sea_algorithm', ' %s" %(self.nReadBlocks, - # self.processingHeaderObj.dataBlocksPerFile, - # self.dataOut.datatime.ctime()) - - def printInfo(self): - - if self.__printInfo == False: - return - - self.basicHeaderObj.printInfo() - self.systemHeaderObj.printInfo() - self.radarControllerHeaderObj.printInfo() - self.processingHeaderObj.printInfo() - - self.__printInfo = False - def run(self, **kwargs): """ @@ -1573,3 +1554,27 @@ class JRODataWriter(Reader): self.dataOut = dataOut self.putData() return self.dataOut + +class printInfo(Operation): + + def __init__(self): + + Operation.__init__(self) + self.__printInfo = True + + def run(self, dataOut, headers = ['systemHeaderObj', 'radarControllerHeaderObj', 'processingHeaderObj']): + if self.__printInfo == False: + return dataOut + + for header in headers: + if hasattr(dataOut, header): + obj = getattr(dataOut, header) + if hasattr(obj, 'printInfo'): + obj.printInfo() + else: + print(obj) + else: + log.warning('Header {} Not found in object'.format(header)) + + self.__printInfo = False + return dataOut \ No newline at end of file diff --git a/schainpy/model/io/jroIO_digitalRF.py b/schainpy/model/io/jroIO_digitalRF.py index 743703a..91bbb6b 100644 --- a/schainpy/model/io/jroIO_digitalRF.py +++ b/schainpy/model/io/jroIO_digitalRF.py @@ -31,7 +31,7 @@ try: except: pass -@MPDecorator + class DigitalRFReader(ProcessingUnit): ''' classdocs @@ -633,7 +633,7 @@ class DigitalRFReader(ProcessingUnit): return - +@MPDecorator class DigitalRFWriter(Operation): ''' classdocs diff --git a/schainpy/model/io/jroIO_heispectra.py b/schainpy/model/io/jroIO_heispectra.py index 3a144d0..3832760 100644 --- a/schainpy/model/io/jroIO_heispectra.py +++ b/schainpy/model/io/jroIO_heispectra.py @@ -120,6 +120,7 @@ class Metadata(object): parmConfObj.readXml(parmElement) self.parmConfObjList.append(parmConfObj) +@MPDecorator class FitsWriter(Operation): def __init__(self, **kwargs): Operation.__init__(self, **kwargs) @@ -283,7 +284,7 @@ class FitsWriter(Operation): self.isConfig = True self.putData() -@MPDecorator + class FitsReader(ProcessingUnit): # __TIMEZONE = time.timezone diff --git a/schainpy/model/io/jroIO_madrigal.py b/schainpy/model/io/jroIO_madrigal.py index c587169..485428c 100644 --- a/schainpy/model/io/jroIO_madrigal.py +++ b/schainpy/model/io/jroIO_madrigal.py @@ -78,7 +78,7 @@ def load_json(obj): return iterable -@MPDecorator + class MADReader(Reader, ProcessingUnit): def __init__(self): diff --git a/schainpy/model/io/jroIO_param.py b/schainpy/model/io/jroIO_param.py index cdb91da..0e176ba 100644 --- a/schainpy/model/io/jroIO_param.py +++ b/schainpy/model/io/jroIO_param.py @@ -11,7 +11,7 @@ from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecora from schainpy.model.io.jroIO_base import * from schainpy.utils import log -@MPDecorator + class ParamReader(JRODataReader,ProcessingUnit): ''' Reads HDF5 format files @@ -965,7 +965,7 @@ class ParamWriter(Operation): return -@MPDecorator + class ParameterReader(Reader, ProcessingUnit): ''' Reads HDF5 format files diff --git a/schainpy/model/io/jroIO_spectra.py b/schainpy/model/io/jroIO_spectra.py index b7683c7..81d7972 100644 --- a/schainpy/model/io/jroIO_spectra.py +++ b/schainpy/model/io/jroIO_spectra.py @@ -11,7 +11,7 @@ from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, from schainpy.model.data.jrodata import Spectra from schainpy.utils import log -@MPDecorator + class SpectraReader(JRODataReader, ProcessingUnit): """ Esta clase permite leer datos de espectros desde archivos procesados (.pdata). La lectura diff --git a/schainpy/model/io/jroIO_usrp.py b/schainpy/model/io/jroIO_usrp.py index 06d2d3c..785c2b9 100644 --- a/schainpy/model/io/jroIO_usrp.py +++ b/schainpy/model/io/jroIO_usrp.py @@ -14,7 +14,7 @@ except: from schainpy.model.data.jroheaderIO import RadarControllerHeader, SystemHeader from schainpy.model.data.jrodata import Voltage -from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation +from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator try: import digital_rf_hdf5 @@ -546,6 +546,8 @@ class USRPReader(ProcessingUnit): return + +@MPDecorator class USRPWriter(Operation): ''' classdocs diff --git a/schainpy/model/io/jroIO_voltage.py b/schainpy/model/io/jroIO_voltage.py index 86f857e..cb484db 100644 --- a/schainpy/model/io/jroIO_voltage.py +++ b/schainpy/model/io/jroIO_voltage.py @@ -10,12 +10,8 @@ from .jroIO_base import LOCALTIME, JRODataReader, JRODataWriter from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader from schainpy.model.data.jrodata import Voltage -import zmq -import tempfile -from io import StringIO -# from _sha import blocksize -@MPDecorator + class VoltageReader(JRODataReader, ProcessingUnit): """ Esta clase permite leer datos de voltage desde archivos en formato rawdata (.r). La lectura diff --git a/schainpy/model/proc/bltrproc_parameters.py b/schainpy/model/proc/bltrproc_parameters.py index 30fcc63..a4d4b98 100644 --- a/schainpy/model/proc/bltrproc_parameters.py +++ b/schainpy/model/proc/bltrproc_parameters.py @@ -15,7 +15,7 @@ from numpy import transpose from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator from schainpy.model.data.jrodata import Parameters -@MPDecorator + class BLTRParametersProc(ProcessingUnit): ''' Processing unit for BLTR parameters data (winds) @@ -76,7 +76,7 @@ class BLTRParametersProc(ProcessingUnit): self.dataOut.data_param[i][SNRavgdB <= snr_threshold] = numpy.nan # TODO -@MPDecorator + class OutliersFilter(Operation): def __init__(self): diff --git a/schainpy/model/proc/jroproc_amisr.py b/schainpy/model/proc/jroproc_amisr.py index 223be95..93f3dd8 100644 --- a/schainpy/model/proc/jroproc_amisr.py +++ b/schainpy/model/proc/jroproc_amisr.py @@ -16,7 +16,7 @@ class AMISRProc(ProcessingUnit): self.dataOut.copy(self.dataIn) -class PrintInfo(Operation): +class PrintInfoAMISR(Operation): def __init__(self, **kwargs): Operation.__init__(self, **kwargs) self.__isPrinted = False diff --git a/schainpy/model/proc/jroproc_base.py b/schainpy/model/proc/jroproc_base.py index 75e7ffe..9d85238 100644 --- a/schainpy/model/proc/jroproc_base.py +++ b/schainpy/model/proc/jroproc_base.py @@ -1,19 +1,9 @@ ''' -Updated for multiprocessing -Author : Sergio Cortez -Jan 2018 -Abstract: - Base class for processing units and operations. A decorator provides multiprocessing features and interconnect the processes created. - The argument (kwargs) sent from the controller is parsed and filtered via the decorator for each processing unit or operation instantiated. - The decorator handle also the methods inside the processing unit to be called from the main script (not as operations) (OPERATION -> type ='self'). - -Based on: - $Author: murco $ - $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $ +Base clases to create Processing units and operations, the MPDecorator +must be used in plotting and writing operations to allow to run as an +external process. ''' -import os -import sys import inspect import zmq import time @@ -24,27 +14,16 @@ try: except: from Queue import Queue from threading import Thread -from multiprocessing import Process - +from multiprocessing import Process, Queue from schainpy.utils import log class ProcessingUnit(object): + ''' + Base class to create Signal Chain Units + ''' - """ - Update - Jan 2018 - MULTIPROCESSING - All the "call" methods present in the previous base were removed. - The majority of operations are independant processes, thus - the decorator is in charge of communicate the operation processes - with the proccessing unit via IPC. - - The constructor does not receive any argument. The remaining methods - are related with the operations to execute. - - - """ proc_type = 'processing' - __attrs__ = [] def __init__(self): @@ -52,8 +31,11 @@ class ProcessingUnit(object): self.dataOut = None self.isConfig = False self.operations = [] - self.plots = [] + + def setInput(self, unit): + self.dataIn = unit.dataOut + def getAllowedArgs(self): if hasattr(self, '__attrs__'): return self.__attrs__ @@ -61,27 +43,10 @@ class ProcessingUnit(object): return inspect.getargspec(self.run).args def addOperation(self, conf, operation): - """ - This method is used in the controller, and update the dictionary containing the operations to execute. The dict - posses the id of the operation process (IPC purposes) - - Agrega un objeto del tipo "Operation" (opObj) a la lista de objetos "self.objectList" y retorna el - identificador asociado a este objeto. - - Input: - - object : objeto de la clase "Operation" - - Return: - - objId : identificador del objeto, necesario para comunicar con master(procUnit) - """ - - self.operations.append( - (operation, conf.type, conf.id, conf.getKwargs())) + ''' + ''' - if 'plot' in self.name.lower(): - self.plots.append(operation.CODE) + self.operations.append((operation, conf.type, conf.getKwargs())) def getOperationObj(self, objId): @@ -90,17 +55,37 @@ class ProcessingUnit(object): return self.operations[objId] - def operation(self, **kwargs): - """ - Operacion directa sobre la data (dataOut.data). Es necesario actualizar los valores de los - atributos del objeto dataOut - - Input: - - **kwargs : Diccionario de argumentos de la funcion a ejecutar - """ + def call(self, **kwargs): + ''' + ''' + + try: + if self.dataIn is not None and self.dataIn.flagNoData and not self.dataIn.error: + return self.dataIn.isReady() + elif self.dataIn is None or not self.dataIn.error: + self.run(**kwargs) + elif self.dataIn.error: + self.dataOut.error = self.dataIn.error + self.dataOut.flagNoData = True + except: + err = traceback.format_exc() + if 'SchainWarning' in err: + log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), self.name) + elif 'SchainError' in err: + log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), self.name) + else: + log.error(err, self.name) + self.dataOut.error = True + + for op, optype, opkwargs in self.operations: + if optype == 'other' and not self.dataOut.flagNoData: + self.dataOut = op.run(self.dataOut, **opkwargs) + elif optype == 'external' and not self.dataOut.flagNoData: + op.queue.put(self.dataOut) + elif optype == 'external' and self.dataOut.error: + op.queue.put(self.dataOut) - raise NotImplementedError + return 'Error' if self.dataOut.error else self.dataOut.isReady() def setup(self): @@ -117,22 +102,10 @@ class ProcessingUnit(object): class Operation(object): - """ - Update - Jan 2018 - MULTIPROCESSING - - Most of the methods remained the same. The decorator parse the arguments and executed the run() method for each process. - The constructor doe snot receive any argument, neither the baseclass. - - - Clase base para definir las operaciones adicionales que se pueden agregar a la clase ProcessingUnit - y necesiten acumular informacion previa de los datos a procesar. De preferencia usar un buffer de - acumulacion dentro de esta clase - - Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer) - - """ + ''' + ''' + proc_type = 'operation' - __attrs__ = [] def __init__(self): @@ -180,58 +153,12 @@ class Operation(object): return -class InputQueue(Thread): - - ''' - Class to hold input data for Proccessing Units and external Operations, - ''' - - def __init__(self, project_id, inputId, lock=None): - - Thread.__init__(self) - self.queue = Queue() - self.project_id = project_id - self.inputId = inputId - self.lock = lock - self.islocked = False - self.size = 0 - - def run(self): - - c = zmq.Context() - self.receiver = c.socket(zmq.SUB) - self.receiver.connect( - 'ipc:///tmp/schain/{}_pub'.format(self.project_id)) - self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode()) - - while True: - obj = self.receiver.recv_multipart()[1] - self.size += sys.getsizeof(obj) - self.queue.put(obj) - - def get(self): - - if not self.islocked and self.size/1000000 > 512: - self.lock.n.value += 1 - self.islocked = True - self.lock.clear() - elif self.islocked and self.size/1000000 <= 512: - self.islocked = False - self.lock.n.value -= 1 - if self.lock.n.value == 0: - self.lock.set() - - obj = self.queue.get() - self.size -= sys.getsizeof(obj) - return pickle.loads(obj) - def MPDecorator(BaseClass): """ Multiprocessing class decorator - This function add multiprocessing features to a BaseClass. Also, it handle - the communication beetween processes (readers, procUnits and operations). + This function add multiprocessing features to a BaseClass. """ class MPClass(BaseClass, Process): @@ -239,191 +166,42 @@ def MPDecorator(BaseClass): def __init__(self, *args, **kwargs): super(MPClass, self).__init__() Process.__init__(self) - self.operationKwargs = {} + self.args = args self.kwargs = kwargs - self.sender = None - self.receiver = None - self.i = 0 self.t = time.time() + self.op_type = 'external' self.name = BaseClass.__name__ self.__doc__ = BaseClass.__doc__ if 'plot' in self.name.lower() and not self.name.endswith('_'): self.name = '{}{}'.format(self.CODE.upper(), 'Plot') - self.start_time = time.time() - self.id = args[0] - self.inputId = args[1] - self.project_id = args[2] + self.start_time = time.time() self.err_queue = args[3] - self.lock = args[4] - self.typeProc = args[5] - self.err_queue.put('#_start_#') - if self.inputId is not None: - self.queue = InputQueue(self.project_id, self.inputId, self.lock) - - def subscribe(self): - ''' - Start the zmq socket receiver and subcribe to input ID. - ''' - - self.queue.start() - - def listen(self): - ''' - This function waits for objects - ''' - - return self.queue.get() - - def set_publisher(self): - ''' - This function create a zmq socket for publishing objects. - ''' - - time.sleep(0.5) - - c = zmq.Context() - self.sender = c.socket(zmq.PUB) - self.sender.connect( - 'ipc:///tmp/schain/{}_sub'.format(self.project_id)) - - def publish(self, data, id): - ''' - This function publish an object, to an specific topic. - It blocks publishing when receiver queue is full to avoid data loss - ''' - - if self.inputId is None: - self.lock.wait() - self.sender.send_multipart([str(id).encode(), pickle.dumps(data)]) - - def runReader(self): - ''' - Run fuction for read units - ''' - while True: - - try: - BaseClass.run(self, **self.kwargs) - except: - err = traceback.format_exc() - if 'No more files' in err: - log.warning('No more files to read', self.name) - else: - self.err_queue.put('{}|{}'.format(self.name, err)) - self.dataOut.error = True - - for op, optype, opId, kwargs in self.operations: - if optype == 'self' and not self.dataOut.flagNoData: - op(**kwargs) - elif optype == 'other' and not self.dataOut.flagNoData: - self.dataOut = op.run(self.dataOut, **self.kwargs) - elif optype == 'external': - self.publish(self.dataOut, opId) - - if self.dataOut.flagNoData and not self.dataOut.error: - continue - - self.publish(self.dataOut, self.id) - - if self.dataOut.error: - break - - time.sleep(0.5) - - def runProc(self): - ''' - Run function for proccessing units - ''' - - while True: - self.dataIn = self.listen() - - if self.dataIn.flagNoData and self.dataIn.error is None: - continue - elif not self.dataIn.error: - try: - BaseClass.run(self, **self.kwargs) - except: - self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc())) - self.dataOut.error = True - elif self.dataIn.error: - self.dataOut.error = self.dataIn.error - self.dataOut.flagNoData = True - - for op, optype, opId, kwargs in self.operations: - if optype == 'self' and not self.dataOut.flagNoData: - op(**kwargs) - elif optype == 'other' and not self.dataOut.flagNoData: - self.dataOut = op.run(self.dataOut, **kwargs) - elif optype == 'external' and not self.dataOut.flagNoData: - self.publish(self.dataOut, opId) - - self.publish(self.dataOut, self.id) - for op, optype, opId, kwargs in self.operations: - if optype == 'external' and self.dataOut.error: - self.publish(self.dataOut, opId) - - if self.dataOut.error: - break - - time.sleep(0.5) + self.queue = Queue(maxsize=1) + self.myrun = BaseClass.run - def runOp(self): - ''' - Run function for external operations (this operations just receive data - ex: plots, writers, publishers) - ''' + def run(self): while True: - dataOut = self.listen() + dataOut = self.queue.get() if not dataOut.error: try: BaseClass.run(self, dataOut, **self.kwargs) except: - self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc())) - dataOut.error = True - else: - break - - def run(self): - if self.typeProc is "ProcUnit": - - if self.inputId is not None: - self.subscribe() - - self.set_publisher() - - if 'Reader' not in BaseClass.__name__: - self.runProc() + err = traceback.format_exc() + log.error(err.split('\n')[-2], self.name) else: - self.runReader() - - elif self.typeProc is "Operation": - - self.subscribe() - self.runOp() - - else: - raise ValueError("Unknown type") + break self.close() def close(self): BaseClass.close(self) - self.err_queue.put('#_end_#') - - if self.sender: - self.sender.close() - - if self.receiver: - self.receiver.close() - log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name) return MPClass diff --git a/schainpy/model/proc/jroproc_correlation.py b/schainpy/model/proc/jroproc_correlation.py index f3b9ae0..76a92fe 100644 --- a/schainpy/model/proc/jroproc_correlation.py +++ b/schainpy/model/proc/jroproc_correlation.py @@ -1,7 +1,7 @@ import numpy from .jroproc_base import ProcessingUnit, Operation -from schainpy.model.data.jrodata import Correlation, hildebrand_sekhon +from schainpy.model.data.jrodata import Correlation class CorrelationProc(ProcessingUnit): diff --git a/schainpy/model/proc/jroproc_heispectra.py b/schainpy/model/proc/jroproc_heispectra.py index 1e3a11e..2fbd0b1 100644 --- a/schainpy/model/proc/jroproc_heispectra.py +++ b/schainpy/model/proc/jroproc_heispectra.py @@ -5,7 +5,7 @@ from schainpy.model.data.jrodata import SpectraHeis from schainpy.utils import log -@MPDecorator + class SpectraHeisProc(ProcessingUnit): def __init__(self):#, **kwargs): diff --git a/schainpy/model/proc/jroproc_parameters.py b/schainpy/model/proc/jroproc_parameters.py index 571dfe7..c880b58 100755 --- a/schainpy/model/proc/jroproc_parameters.py +++ b/schainpy/model/proc/jroproc_parameters.py @@ -46,7 +46,7 @@ def _unpickle_method(func_name, obj, cls): break return func.__get__(obj, cls) -@MPDecorator + class ParametersProc(ProcessingUnit): METHODS = {} @@ -1329,13 +1329,12 @@ class SpectralMoments(Operation): def run(self, dataOut): - #dataOut.data_pre = dataOut.data_pre[0] data = dataOut.data_pre[0] absc = dataOut.abscissaList[:-1] noise = dataOut.noise nChannel = data.shape[0] data_param = numpy.zeros((nChannel, 4, data.shape[2])) - + for ind in range(nChannel): data_param[ind,:,:] = self.__calculateMoments( data[ind,:,:] , absc , noise[ind] ) @@ -1344,6 +1343,7 @@ class SpectralMoments(Operation): dataOut.data_POW = data_param[:,1] dataOut.data_DOP = data_param[:,2] dataOut.data_WIDTH = data_param[:,3] + return dataOut def __calculateMoments(self, oldspec, oldfreq, n0, @@ -1370,25 +1370,27 @@ class SpectralMoments(Operation): vec_w = numpy.zeros(oldspec.shape[1]) vec_snr = numpy.zeros(oldspec.shape[1]) - oldspec = numpy.ma.masked_invalid(oldspec) - + # oldspec = numpy.ma.masked_invalid(oldspec) + for ind in range(oldspec.shape[1]): - + spec = oldspec[:,ind] aux = spec*fwindow max_spec = aux.max() - m = list(aux).index(max_spec) - + m = aux.tolist().index(max_spec) + #Smooth - if (smooth == 0): spec2 = spec - else: spec2 = scipy.ndimage.filters.uniform_filter1d(spec,size=smooth) - + if (smooth == 0): + spec2 = spec + else: + spec2 = scipy.ndimage.filters.uniform_filter1d(spec,size=smooth) + # Calculo de Momentos - bb = spec2[list(range(m,spec2.size))] + bb = spec2[numpy.arange(m,spec2.size)] bb = (bb m): ss1 = m + if (ss1 > m): + ss1 = m - valid = numpy.asarray(list(range(int(m + bb0 - ss1 + 1)))) + ss1 - power = ((spec2[valid] - n0)*fwindow[valid]).sum() - fd = ((spec2[valid]- n0)*freq[valid]*fwindow[valid]).sum()/power - w = math.sqrt(((spec2[valid] - n0)*fwindow[valid]*(freq[valid]- fd)**2).sum()/power) - snr = (spec2.mean()-n0)/n0 + valid = numpy.arange(int(m + bb0 - ss1 + 1)) + ss1 + power = ((spec2[valid] - n0) * fwindow[valid]).sum() + fd = ((spec2[valid]- n0)*freq[valid] * fwindow[valid]).sum() / power + w = numpy.sqrt(((spec2[valid] - n0)*fwindow[valid]*(freq[valid]- fd)**2).sum() / power) + snr = (spec2.mean()-n0)/n0 if (snr < 1.e-20) : snr = 1.e-20 @@ -1417,9 +1422,8 @@ class SpectralMoments(Operation): vec_fd[ind] = fd vec_w[ind] = w vec_snr[ind] = snr - - moments = numpy.vstack((vec_snr, vec_power, vec_fd, vec_w)) - return moments + + return numpy.vstack((vec_snr, vec_power, vec_fd, vec_w)) #------------------ Get SA Parameters -------------------------- diff --git a/schainpy/model/proc/jroproc_spectra.py b/schainpy/model/proc/jroproc_spectra.py index d83d0e0..6197449 100644 --- a/schainpy/model/proc/jroproc_spectra.py +++ b/schainpy/model/proc/jroproc_spectra.py @@ -1,3 +1,4 @@ +import time import itertools import numpy @@ -7,7 +8,7 @@ from schainpy.model.data.jrodata import Spectra from schainpy.model.data.jrodata import hildebrand_sekhon from schainpy.utils import log -@MPDecorator + class SpectraProc(ProcessingUnit): @@ -120,7 +121,7 @@ class SpectraProc(ProcessingUnit): self.dataOut.flagShiftFFT = False def run(self, nProfiles=None, nFFTPoints=None, pairsList=[], ippFactor=None, shift_fft=False): - + if self.dataIn.type == "Spectra": self.dataOut.copy(self.dataIn) if shift_fft: @@ -219,81 +220,6 @@ class SpectraProc(ProcessingUnit): self.dataOut.pairsList = pairs return - - def __selectPairsByChannel(self, channelList=None): - - if channelList == None: - return - - pairsIndexListSelected = [] - for pairIndex in self.dataOut.pairsIndexList: - # First pair - if self.dataOut.pairsList[pairIndex][0] not in channelList: - continue - # Second pair - if self.dataOut.pairsList[pairIndex][1] not in channelList: - continue - - pairsIndexListSelected.append(pairIndex) - - if not pairsIndexListSelected: - self.dataOut.data_cspc = None - self.dataOut.pairsList = [] - return - - self.dataOut.data_cspc = self.dataOut.data_cspc[pairsIndexListSelected] - self.dataOut.pairsList = [self.dataOut.pairsList[i] - for i in pairsIndexListSelected] - - return - - def selectChannels(self, channelList): - - channelIndexList = [] - - for channel in channelList: - if channel not in self.dataOut.channelList: - raise ValueError("Error selecting channels, Channel %d is not valid.\nAvailable channels = %s" % ( - channel, str(self.dataOut.channelList))) - - index = self.dataOut.channelList.index(channel) - channelIndexList.append(index) - - self.selectChannelsByIndex(channelIndexList) - - def selectChannelsByIndex(self, channelIndexList): - """ - Selecciona un bloque de datos en base a canales segun el channelIndexList - - Input: - channelIndexList : lista sencilla de canales a seleccionar por ej. [2,3,7] - - Affected: - self.dataOut.data_spc - self.dataOut.channelIndexList - self.dataOut.nChannels - - Return: - None - """ - - for channelIndex in channelIndexList: - if channelIndex not in self.dataOut.channelIndexList: - raise ValueError("Error selecting channels: The value %d in channelIndexList is not valid.\nAvailable channel indexes = " % ( - channelIndex, self.dataOut.channelIndexList)) - - data_spc = self.dataOut.data_spc[channelIndexList, :] - data_dc = self.dataOut.data_dc[channelIndexList, :] - - self.dataOut.data_spc = data_spc - self.dataOut.data_dc = data_dc - - # self.dataOut.channelList = [self.dataOut.channelList[i] for i in channelIndexList] - self.dataOut.channelList = range(len(channelIndexList)) - self.__selectPairsByChannel(channelIndexList) - - return 1 - def selectFFTs(self, minFFT, maxFFT ): """ @@ -331,67 +257,6 @@ class SpectraProc(ProcessingUnit): return 1 - - def setH0(self, h0, deltaHeight = None): - - if not deltaHeight: - deltaHeight = self.dataOut.heightList[1] - self.dataOut.heightList[0] - - nHeights = self.dataOut.nHeights - - newHeiRange = h0 + numpy.arange(nHeights)*deltaHeight - - self.dataOut.heightList = newHeiRange - - - def selectHeights(self, minHei, maxHei): - """ - Selecciona un bloque de datos en base a un grupo de valores de alturas segun el rango - minHei <= height <= maxHei - - Input: - minHei : valor minimo de altura a considerar - maxHei : valor maximo de altura a considerar - - Affected: - Indirectamente son cambiados varios valores a travez del metodo selectHeightsByIndex - - Return: - 1 si el metodo se ejecuto con exito caso contrario devuelve 0 - """ - - - if (minHei > maxHei): - raise ValueError("Error selecting heights: Height range (%d,%d) is not valid" % (minHei, maxHei)) - - if (minHei < self.dataOut.heightList[0]): - minHei = self.dataOut.heightList[0] - - if (maxHei > self.dataOut.heightList[-1]): - maxHei = self.dataOut.heightList[-1] - - minIndex = 0 - maxIndex = 0 - heights = self.dataOut.heightList - - inda = numpy.where(heights >= minHei) - indb = numpy.where(heights <= maxHei) - - try: - minIndex = inda[0][0] - except: - minIndex = 0 - - try: - maxIndex = indb[0][-1] - except: - maxIndex = len(heights) - - self.selectHeightsByIndex(minIndex, maxIndex) - - - return 1 - def getBeaconSignal(self, tauindex=0, channelindex=0, hei_ref=None): newheis = numpy.where( self.dataOut.heightList > self.dataOut.radarControllerHeaderObj.Taus[tauindex]) @@ -466,54 +331,100 @@ class SpectraProc(ProcessingUnit): return 1 + def getNoise(self, minHei=None, maxHei=None, minVel=None, maxVel=None): + # validacion de rango + if minHei == None: + minHei = self.dataOut.heightList[0] + if maxHei == None: + maxHei = self.dataOut.heightList[-1] - def selectHeightsByIndex(self, minIndex, maxIndex): - """ - Selecciona un bloque de datos en base a un grupo indices de alturas segun el rango - minIndex <= index <= maxIndex + if (minHei < self.dataOut.heightList[0]) or (minHei > maxHei): + print('minHei: %.2f is out of the heights range' % (minHei)) + print('minHei is setting to %.2f' % (self.dataOut.heightList[0])) + minHei = self.dataOut.heightList[0] - Input: - minIndex : valor de indice minimo de altura a considerar - maxIndex : valor de indice maximo de altura a considerar + if (maxHei > self.dataOut.heightList[-1]) or (maxHei < minHei): + print('maxHei: %.2f is out of the heights range' % (maxHei)) + print('maxHei is setting to %.2f' % (self.dataOut.heightList[-1])) + maxHei = self.dataOut.heightList[-1] - Affected: - self.dataOut.data_spc - self.dataOut.data_cspc - self.dataOut.data_dc - self.dataOut.heightList + # validacion de velocidades + velrange = self.dataOut.getVelRange(1) - Return: - 1 si el metodo se ejecuto con exito caso contrario devuelve 0 - """ + if minVel == None: + minVel = velrange[0] + + if maxVel == None: + maxVel = velrange[-1] + + if (minVel < velrange[0]) or (minVel > maxVel): + print('minVel: %.2f is out of the velocity range' % (minVel)) + print('minVel is setting to %.2f' % (velrange[0])) + minVel = velrange[0] + + if (maxVel > velrange[-1]) or (maxVel < minVel): + print('maxVel: %.2f is out of the velocity range' % (maxVel)) + print('maxVel is setting to %.2f' % (velrange[-1])) + maxVel = velrange[-1] + + # seleccion de indices para rango + minIndex = 0 + maxIndex = 0 + heights = self.dataOut.heightList + + inda = numpy.where(heights >= minHei) + indb = numpy.where(heights <= maxHei) + + try: + minIndex = inda[0][0] + except: + minIndex = 0 + + try: + maxIndex = indb[0][-1] + except: + maxIndex = len(heights) if (minIndex < 0) or (minIndex > maxIndex): - raise ValueError("Error selecting heights: Index range (%d,%d) is not valid" % ( + raise ValueError("some value in (%d,%d) is not valid" % ( minIndex, maxIndex)) if (maxIndex >= self.dataOut.nHeights): maxIndex = self.dataOut.nHeights - 1 - # Spectra - data_spc = self.dataOut.data_spc[:, :, minIndex:maxIndex + 1] + # seleccion de indices para velocidades + indminvel = numpy.where(velrange >= minVel) + indmaxvel = numpy.where(velrange <= maxVel) + try: + minIndexVel = indminvel[0][0] + except: + minIndexVel = 0 - data_cspc = None - if self.dataOut.data_cspc is not None: - data_cspc = self.dataOut.data_cspc[:, :, minIndex:maxIndex + 1] + try: + maxIndexVel = indmaxvel[0][-1] + except: + maxIndexVel = len(velrange) - data_dc = None - if self.dataOut.data_dc is not None: - data_dc = self.dataOut.data_dc[:, minIndex:maxIndex + 1] + # seleccion del espectro + data_spc = self.dataOut.data_spc[:, + minIndexVel:maxIndexVel + 1, minIndex:maxIndex + 1] + # estimacion de ruido + noise = numpy.zeros(self.dataOut.nChannels) - self.dataOut.data_spc = data_spc - self.dataOut.data_cspc = data_cspc - self.dataOut.data_dc = data_dc + for channel in range(self.dataOut.nChannels): + daux = data_spc[channel, :, :] + sortdata = numpy.sort(daux, axis=None) + noise[channel] = hildebrand_sekhon(sortdata, self.dataOut.nIncohInt) - self.dataOut.heightList = self.dataOut.heightList[minIndex:maxIndex + 1] + self.dataOut.noise_estimation = noise.copy() return 1 - def removeDC(self, mode=2): +class removeDC(Operation): + + def run(self, dataOut, mode=2): + self.dataOut = dataOut jspectra = self.dataOut.data_spc jcspectra = self.dataOut.data_cspc @@ -571,7 +482,9 @@ class SpectraProc(ProcessingUnit): self.dataOut.data_spc = jspectra self.dataOut.data_cspc = jcspectra - return 1 + return self.dataOut + +class removeInterference(Operation): def removeInterference2(self): @@ -594,11 +507,9 @@ class SpectraProc(ProcessingUnit): if len(InterferenceRange) maxHei): - print('minHei: %.2f is out of the heights range' % (minHei)) - print('minHei is setting to %.2f' % (self.dataOut.heightList[0])) - minHei = self.dataOut.heightList[0] - - if (maxHei > self.dataOut.heightList[-1]) or (maxHei < minHei): - print('maxHei: %.2f is out of the heights range' % (maxHei)) - print('maxHei is setting to %.2f' % (self.dataOut.heightList[-1])) - maxHei = self.dataOut.heightList[-1] - - # validacion de velocidades - velrange = self.dataOut.getVelRange(1) + def run(self, dataOut, interf = 2,hei_interf = None, nhei_interf = None, offhei_interf = None, mode=1): - if minVel == None: - minVel = velrange[0] + self.dataOut = dataOut - if maxVel == None: - maxVel = velrange[-1] - - if (minVel < velrange[0]) or (minVel > maxVel): - print('minVel: %.2f is out of the velocity range' % (minVel)) - print('minVel is setting to %.2f' % (velrange[0])) - minVel = velrange[0] - - if (maxVel > velrange[-1]) or (maxVel < minVel): - print('maxVel: %.2f is out of the velocity range' % (maxVel)) - print('maxVel is setting to %.2f' % (velrange[-1])) - maxVel = velrange[-1] - - # seleccion de indices para rango - minIndex = 0 - maxIndex = 0 - heights = self.dataOut.heightList - - inda = numpy.where(heights >= minHei) - indb = numpy.where(heights <= maxHei) - - try: - minIndex = inda[0][0] - except: - minIndex = 0 - - try: - maxIndex = indb[0][-1] - except: - maxIndex = len(heights) - - if (minIndex < 0) or (minIndex > maxIndex): - raise ValueError("some value in (%d,%d) is not valid" % ( - minIndex, maxIndex)) - - if (maxIndex >= self.dataOut.nHeights): - maxIndex = self.dataOut.nHeights - 1 - - # seleccion de indices para velocidades - indminvel = numpy.where(velrange >= minVel) - indmaxvel = numpy.where(velrange <= maxVel) - try: - minIndexVel = indminvel[0][0] - except: - minIndexVel = 0 - - try: - maxIndexVel = indmaxvel[0][-1] - except: - maxIndexVel = len(velrange) - - # seleccion del espectro - data_spc = self.dataOut.data_spc[:, - minIndexVel:maxIndexVel + 1, minIndex:maxIndex + 1] - # estimacion de ruido - noise = numpy.zeros(self.dataOut.nChannels) - - for channel in range(self.dataOut.nChannels): - daux = data_spc[channel, :, :] - noise[channel] = hildebrand_sekhon(daux, self.dataOut.nIncohInt) - - self.dataOut.noise_estimation = noise.copy() + if mode == 1: + self.removeInterference(interf = 2,hei_interf = None, nhei_interf = None, offhei_interf = None) + elif mode == 2: + self.removeInterference2() - return 1 + return self.dataOut class IncohInt(Operation): @@ -1031,7 +857,7 @@ class IncohInt(Operation): def run(self, dataOut, n=None, timeInterval=None, overlapping=False): if n == 1: - return + return dataOut dataOut.flagNoData = True diff --git a/schainpy/model/proc/jroproc_voltage.py b/schainpy/model/proc/jroproc_voltage.py index 8c75d37..ac33b0c 100644 --- a/schainpy/model/proc/jroproc_voltage.py +++ b/schainpy/model/proc/jroproc_voltage.py @@ -7,7 +7,7 @@ from schainpy.utils import log from time import time -@MPDecorator + class VoltageProc(ProcessingUnit): def __init__(self): @@ -26,8 +26,6 @@ class VoltageProc(ProcessingUnit): if self.dataIn.type == 'Voltage': self.dataOut.copy(self.dataIn) - # self.dataOut.copy(self.dataIn) - def __updateObjFromAmisrInput(self): self.dataOut.timeZone = self.dataIn.timeZone @@ -53,23 +51,14 @@ class VoltageProc(ProcessingUnit): self.dataOut.beam.codeList = self.dataIn.beam.codeList self.dataOut.beam.azimuthList = self.dataIn.beam.azimuthList self.dataOut.beam.zenithList = self.dataIn.beam.zenithList - # - # pass# - # - # def init(self): - # - # - # if self.dataIn.type == 'AMISR': - # self.__updateObjFromAmisrInput() - # - # if self.dataIn.type == 'Voltage': - # self.dataOut.copy(self.dataIn) - # # No necesita copiar en cada init() los atributos de dataIn - # # la copia deberia hacerse por cada nuevo bloque de datos - - def selectChannels(self, channelList): + + +class selectChannels(Operation): + + def run(self, dataOut, channelList): channelIndexList = [] + self.dataOut = dataOut for channel in channelList: if channel not in self.dataOut.channelList: @@ -79,6 +68,7 @@ class VoltageProc(ProcessingUnit): channelIndexList.append(index) self.selectChannelsByIndex(channelIndexList) + return self.dataOut def selectChannelsByIndex(self, channelIndexList): """ @@ -101,24 +91,63 @@ class VoltageProc(ProcessingUnit): for channelIndex in channelIndexList: if channelIndex not in self.dataOut.channelIndexList: - print(channelIndexList) raise ValueError("The value %d in channelIndexList is not valid" %channelIndex) - if self.dataOut.flagDataAsBlock: - """ - Si la data es obtenida por bloques, dimension = [nChannels, nProfiles, nHeis] - """ - data = self.dataOut.data[channelIndexList,:,:] - else: - data = self.dataOut.data[channelIndexList,:] + if self.dataOut.type == 'Voltage': + if self.dataOut.flagDataAsBlock: + """ + Si la data es obtenida por bloques, dimension = [nChannels, nProfiles, nHeis] + """ + data = self.dataOut.data[channelIndexList,:,:] + else: + data = self.dataOut.data[channelIndexList,:] + + self.dataOut.data = data + # self.dataOut.channelList = [self.dataOut.channelList[i] for i in channelIndexList] + self.dataOut.channelList = range(len(channelIndexList)) + elif self.dataOut.type == 'Spectra': + data_spc = self.dataOut.data_spc[channelIndexList, :] + data_dc = self.dataOut.data_dc[channelIndexList, :] + + self.dataOut.data_spc = data_spc + self.dataOut.data_dc = data_dc + + # self.dataOut.channelList = [self.dataOut.channelList[i] for i in channelIndexList] + self.dataOut.channelList = range(len(channelIndexList)) + self.__selectPairsByChannel(channelIndexList) - self.dataOut.data = data - # self.dataOut.channelList = [self.dataOut.channelList[i] for i in channelIndexList] - self.dataOut.channelList = range(len(channelIndexList)) - return 1 - def selectHeights(self, minHei=None, maxHei=None): + def __selectPairsByChannel(self, channelList=None): + + if channelList == None: + return + + pairsIndexListSelected = [] + for pairIndex in self.dataOut.pairsIndexList: + # First pair + if self.dataOut.pairsList[pairIndex][0] not in channelList: + continue + # Second pair + if self.dataOut.pairsList[pairIndex][1] not in channelList: + continue + + pairsIndexListSelected.append(pairIndex) + + if not pairsIndexListSelected: + self.dataOut.data_cspc = None + self.dataOut.pairsList = [] + return + + self.dataOut.data_cspc = self.dataOut.data_cspc[pairsIndexListSelected] + self.dataOut.pairsList = [self.dataOut.pairsList[i] + for i in pairsIndexListSelected] + + return + +class selectHeights(Operation): + + def run(self, dataOut, minHei=None, maxHei=None): """ Selecciona un bloque de datos en base a un grupo de valores de alturas segun el rango minHei <= height <= maxHei @@ -134,6 +163,8 @@ class VoltageProc(ProcessingUnit): 1 si el metodo se ejecuto con exito caso contrario devuelve 0 """ + self.dataOut = dataOut + if minHei == None: minHei = self.dataOut.heightList[0] @@ -165,8 +196,7 @@ class VoltageProc(ProcessingUnit): self.selectHeightsByIndex(minIndex, maxIndex) - return 1 - + return self.dataOut def selectHeightsByIndex(self, minIndex, maxIndex): """ @@ -185,81 +215,118 @@ class VoltageProc(ProcessingUnit): 1 si el metodo se ejecuto con exito caso contrario devuelve 0 """ - if (minIndex < 0) or (minIndex > maxIndex): - raise ValueError("Height index range (%d,%d) is not valid" % (minIndex, maxIndex)) + if self.dataOut.type == 'Voltage': + if (minIndex < 0) or (minIndex > maxIndex): + raise ValueError("Height index range (%d,%d) is not valid" % (minIndex, maxIndex)) - if (maxIndex >= self.dataOut.nHeights): - maxIndex = self.dataOut.nHeights + if (maxIndex >= self.dataOut.nHeights): + maxIndex = self.dataOut.nHeights - #voltage - if self.dataOut.flagDataAsBlock: - """ - Si la data es obtenida por bloques, dimension = [nChannels, nProfiles, nHeis] - """ - data = self.dataOut.data[:,:, minIndex:maxIndex] - else: - data = self.dataOut.data[:, minIndex:maxIndex] + #voltage + if self.dataOut.flagDataAsBlock: + """ + Si la data es obtenida por bloques, dimension = [nChannels, nProfiles, nHeis] + """ + data = self.dataOut.data[:,:, minIndex:maxIndex] + else: + data = self.dataOut.data[:, minIndex:maxIndex] + + # firstHeight = self.dataOut.heightList[minIndex] + + self.dataOut.data = data + self.dataOut.heightList = self.dataOut.heightList[minIndex:maxIndex] + + if self.dataOut.nHeights <= 1: + raise ValueError("selectHeights: Too few heights. Current number of heights is %d" %(self.dataOut.nHeights)) + elif self.dataOut.type == 'Spectra': + if (minIndex < 0) or (minIndex > maxIndex): + raise ValueError("Error selecting heights: Index range (%d,%d) is not valid" % ( + minIndex, maxIndex)) - # firstHeight = self.dataOut.heightList[minIndex] + if (maxIndex >= self.dataOut.nHeights): + maxIndex = self.dataOut.nHeights - 1 - self.dataOut.data = data - self.dataOut.heightList = self.dataOut.heightList[minIndex:maxIndex] + # Spectra + data_spc = self.dataOut.data_spc[:, :, minIndex:maxIndex + 1] - if self.dataOut.nHeights <= 1: - raise ValueError("selectHeights: Too few heights. Current number of heights is %d" %(self.dataOut.nHeights)) + data_cspc = None + if self.dataOut.data_cspc is not None: + data_cspc = self.dataOut.data_cspc[:, :, minIndex:maxIndex + 1] + data_dc = None + if self.dataOut.data_dc is not None: + data_dc = self.dataOut.data_dc[:, minIndex:maxIndex + 1] + + self.dataOut.data_spc = data_spc + self.dataOut.data_cspc = data_cspc + self.dataOut.data_dc = data_dc + + self.dataOut.heightList = self.dataOut.heightList[minIndex:maxIndex + 1] + return 1 - def filterByHeights(self, window): +class filterByHeights(Operation): + + def run(self, dataOut, window): - deltaHeight = self.dataOut.heightList[1] - self.dataOut.heightList[0] + deltaHeight = dataOut.heightList[1] - dataOut.heightList[0] if window == None: - window = (self.dataOut.radarControllerHeaderObj.txA/self.dataOut.radarControllerHeaderObj.nBaud) / deltaHeight + window = (dataOut.radarControllerHeaderObj.txA/dataOut.radarControllerHeaderObj.nBaud) / deltaHeight newdelta = deltaHeight * window - r = self.dataOut.nHeights % window - newheights = (self.dataOut.nHeights-r)/window + r = dataOut.nHeights % window + newheights = (dataOut.nHeights-r)/window if newheights <= 1: - raise ValueError("filterByHeights: Too few heights. Current number of heights is %d and window is %d" %(self.dataOut.nHeights, window)) + raise ValueError("filterByHeights: Too few heights. Current number of heights is %d and window is %d" %(dataOut.nHeights, window)) - if self.dataOut.flagDataAsBlock: + if dataOut.flagDataAsBlock: """ Si la data es obtenida por bloques, dimension = [nChannels, nProfiles, nHeis] """ - buffer = self.dataOut.data[:, :, 0:int(self.dataOut.nHeights-r)] - buffer = buffer.reshape(self.dataOut.nChannels, self.dataOut.nProfiles, int(self.dataOut.nHeights/window), window) + buffer = dataOut.data[:, :, 0:int(dataOut.nHeights-r)] + buffer = buffer.reshape(dataOut.nChannels, dataOut.nProfiles, int(dataOut.nHeights/window), window) buffer = numpy.sum(buffer,3) else: - buffer = self.dataOut.data[:,0:int(self.dataOut.nHeights-r)] - buffer = buffer.reshape(self.dataOut.nChannels,int(self.dataOut.nHeights/window),int(window)) + buffer = dataOut.data[:,0:int(dataOut.nHeights-r)] + buffer = buffer.reshape(dataOut.nChannels,int(dataOut.nHeights/window),int(window)) buffer = numpy.sum(buffer,2) - self.dataOut.data = buffer - self.dataOut.heightList = self.dataOut.heightList[0] + numpy.arange( newheights )*newdelta - self.dataOut.windowOfFilter = window + dataOut.data = buffer + dataOut.heightList = dataOut.heightList[0] + numpy.arange( newheights )*newdelta + dataOut.windowOfFilter = window + + return dataOut + + +class setH0(Operation): - def setH0(self, h0, deltaHeight = None): + def run(self, dataOut, h0, deltaHeight = None): if not deltaHeight: - deltaHeight = self.dataOut.heightList[1] - self.dataOut.heightList[0] + deltaHeight = dataOut.heightList[1] - dataOut.heightList[0] - nHeights = self.dataOut.nHeights + nHeights = dataOut.nHeights newHeiRange = h0 + numpy.arange(nHeights)*deltaHeight - self.dataOut.heightList = newHeiRange + dataOut.heightList = newHeiRange + + return dataOut + + +class deFlip(Operation): - def deFlip(self, channelList = []): + def run(self, dataOut, channelList = []): - data = self.dataOut.data.copy() + data = dataOut.data.copy() - if self.dataOut.flagDataAsBlock: + if dataOut.flagDataAsBlock: flip = self.flip - profileList = list(range(self.dataOut.nProfiles)) + profileList = list(range(dataOut.nProfiles)) if not channelList: for thisProfile in profileList: @@ -267,7 +334,7 @@ class VoltageProc(ProcessingUnit): flip *= -1.0 else: for thisChannel in channelList: - if thisChannel not in self.dataOut.channelList: + if thisChannel not in dataOut.channelList: continue for thisProfile in profileList: @@ -281,41 +348,57 @@ class VoltageProc(ProcessingUnit): data[:,:] = data[:,:]*self.flip else: for thisChannel in channelList: - if thisChannel not in self.dataOut.channelList: + if thisChannel not in dataOut.channelList: continue data[thisChannel,:] = data[thisChannel,:]*self.flip self.flip *= -1. - self.dataOut.data = data + dataOut.data = data + + return dataOut - def setRadarFrequency(self, frequency=None): - if frequency != None: - self.dataOut.frequency = frequency +class setAttribute(Operation): + ''' + Set an arbitrary attribute to dataOut + ''' - return 1 + def __init__(self): + + Operation.__init__(self) + self._ready = False - def interpolateHeights(self, topLim, botLim): + def run(self, dataOut, **kwargs): + + for key, value in kwargs.items(): + setattr(dataOut, key, value) + + return dataOut + + +class interpolateHeights(Operation): + + def run(self, dataOut, topLim, botLim): #69 al 72 para julia #82-84 para meteoros - if len(numpy.shape(self.dataOut.data))==2: - sampInterp = (self.dataOut.data[:,botLim-1] + self.dataOut.data[:,topLim+1])/2 + if len(numpy.shape(dataOut.data))==2: + sampInterp = (dataOut.data[:,botLim-1] + dataOut.data[:,topLim+1])/2 sampInterp = numpy.transpose(numpy.tile(sampInterp,(topLim-botLim + 1,1))) - #self.dataOut.data[:,botLim:limSup+1] = sampInterp - self.dataOut.data[:,botLim:topLim+1] = sampInterp + #dataOut.data[:,botLim:limSup+1] = sampInterp + dataOut.data[:,botLim:topLim+1] = sampInterp else: - nHeights = self.dataOut.data.shape[2] + nHeights = dataOut.data.shape[2] x = numpy.hstack((numpy.arange(botLim),numpy.arange(topLim+1,nHeights))) - y = self.dataOut.data[:,:,list(range(botLim))+list(range(topLim+1,nHeights))] + y = dataOut.data[:,:,list(range(botLim))+list(range(topLim+1,nHeights))] f = interpolate.interp1d(x, y, axis = 2) xnew = numpy.arange(botLim,topLim+1) ynew = f(xnew) + dataOut.data[:,:,botLim:topLim+1] = ynew - self.dataOut.data[:,:,botLim:topLim+1] = ynew + return dataOut - # import collections class CohInt(Operation): @@ -979,8 +1062,6 @@ class ProfileSelector(Operation): raise ValueError("ProfileSelector needs profileList, profileRangeList or rangeList parameter") - #return False - return dataOut class Reshaper(Operation):