'''
Updated for multiprocessing
Author : Sergio Cortez
Jan 2018
Abstract:
    Base class for processing units and operations. A decorator provides multiprocessing features and interconnects the processes created.
    The argument (kwargs) sent from the controller is parsed and filtered via the decorator for each processing unit or operation instantiated.
    The decorator also handles the methods inside the processing unit that are called from the main script (not as operations) (OPERATION -> type ='self').

Based on:
    $Author: murco $
    $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
'''
from platform import python_version
import inspect
import zmq
import time
import pickle
import os
from multiprocessing import Process

from schainpy.utils import log

class ProcessingUnit(object):

    """
    Update - Jan 2018 - MULTIPROCESSING
    All the "call" methods present in the previous base were removed.
    The majority of operations are independent processes, thus
    the decorator is in charge of communicating the operation processes
    with the processing unit via IPC.

    The constructor does not receive any argument. The remaining methods
    are related to the operations to execute.
    """
    # input data object (Voltage, Spectra or Correlation)
    dataIn = None
    dataInList = []

    id = None
    inputId = None

    dataOut = None

    dictProcs = None

    operations2RunDict = None

    isConfig = False

    def __init__(self):

        self.dataIn = None
        self.dataOut = None

        self.isConfig = False

    def getAllowedArgs(self):
        if hasattr(self, '__attrs__'):
            return self.__attrs__
        else:
            # inspect.getargspec was removed in Python 3.11; prefer
            # getfullargspec when it is available (Python 3).
            getspec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
            return getspec(self.run).args

    def addOperationKwargs(self, objId, **kwargs):
        '''
        '''

        self.operationKwargs[objId] = kwargs

    def addOperation(self, opObj, objId):

        """
        This method is used in the controller and updates the dictionary containing the operations to execute. The dict
        holds the id of the operation process (IPC purposes).

        Adds an object of type "Operation" (opObj) to the list of objects "self.objectList" and returns the
        identifier associated with this object.

        Input:

            object : object of class "Operation"

        Return:

            objId : identifier of the object, needed to communicate with the master (procUnit)
        """

        self.operations2RunDict[objId] = opObj

        return objId

    def getOperationObj(self, objId):

        # A direct membership test on the dict is enough; no list copy needed.
        if objId not in self.operations2RunDict:
            return None

        return self.operations2RunDict[objId]

    def operation(self, **kwargs):

        """
        Direct operation on the data (dataOut.data). The attribute values of the
        dataOut object must be updated.

        Input:

            **kwargs : dictionary of arguments for the function to execute
        """

        raise NotImplementedError

    def setup(self):

        raise NotImplementedError

    def run(self):

        raise NotImplementedError

    def close(self):
        # Close every thread, queue or any other object here if necessary.
        return

class Operation(object):

    """
    Update - Jan 2018 - MULTIPROCESSING

    Most of the methods remained the same. The decorator parses the arguments and executes the run() method for each process.
    The constructor does not receive any argument, and neither does the base class.

    Base class for defining the additional operations that can be added to the ProcessingUnit class
    and that need to accumulate previous information about the data to process. Preferably use an
    accumulation buffer inside this class.

    Example: coherent integration needs the previous information of the n preceding profiles (buffer)

    """
    id = None
    __buffer = None
    dest = None
    isConfig = False
    readyFlag = None

    def __init__(self):

        self.buffer = None
        self.dest = None
        self.isConfig = False
        self.readyFlag = False

        if not hasattr(self, 'name'):
            self.name = self.__class__.__name__

    def getAllowedArgs(self):
        if hasattr(self, '__attrs__'):
            return self.__attrs__
        else:
            # inspect.getargspec was removed in Python 3.11; prefer
            # getfullargspec when it is available (Python 3).
            getspec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
            return getspec(self.run).args

    def setup(self):

        self.isConfig = True

        raise NotImplementedError

    def run(self, dataIn, **kwargs):

        """
        Performs the necessary operations on dataIn.data and updates the
        attributes of the dataIn object.

        Input:

            dataIn : object of type JROData

        Return:

            None

        Affected:
            __buffer : data reception buffer.

        """
        if not self.isConfig:
            self.setup(**kwargs)

        raise NotImplementedError

    def close(self):

        pass

######### Decorator #########


def MPDecorator(BaseClass):

    """
    "Multiprocessing class decorator"

    This function adds multiprocessing features to the base class. It also
    handles the communication between processes (readers, procUnits and operations).
    It receives the arguments at the moment of instantiation and, according to them,
    determines whether the instance is a procUnit or an operation.
    """

    class MPClass(BaseClass, Process):

        "This is the overwritten class"
        operations2RunDict = None
        socket_l = None
        socket_p = None
        socketOP = None
        socket_router = None
        dictProcs = None
        typeProc = None

        def __init__(self, *args, **kwargs):
            super(MPClass, self).__init__()
            Process.__init__(self)

            self.operationKwargs = {}
            self.args = args

            self.operations2RunDict = {}
            self.kwargs = kwargs

            # The number of arguments (args) determines the type of process

            # Compare by value: "is" checks identity and is unreliable for ints.
            if len(self.args) == 3:
                self.typeProc = "ProcUnit"
                self.id = args[0]         # publishing topic
                self.inputId = args[1]    # subscription topic
                self.dictProcs = args[2]  # dictionary of global processes
            else:
                self.id = args[0]
                self.typeProc = "Operation"

        def addOperationKwargs(self, objId, **kwargs):

            self.operationKwargs[objId] = kwargs

        def getAllowedArgs(self):

            if hasattr(self, '__attrs__'):
                return self.__attrs__
            else:
                # inspect.getargspec was removed in Python 3.11; prefer
                # getfullargspec when it is available (Python 3).
                getspec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
                return getspec(self.run).args

        def sockListening(self, topic):

            """
            This function creates a socket to receive objects.
            The 'topic' argument identifies the publisher process that this process
            listens to (data).
            In the case where this process is listening to a Reader (proc Unit),
            special conditions are introduced to maximize parallelism.
            """

            cont = zmq.Context()
            zmq_socket = cont.socket(zmq.SUB)
            if not os.path.exists('/tmp/socketTmp'):
                os.mkdir('/tmp/socketTmp')

            if 'Reader' in self.dictProcs[self.inputId].name:
                zmq_socket.connect('ipc:///tmp/socketTmp/b')

            else:
                zmq_socket.connect('ipc:///tmp/socketTmp/%s' % self.inputId)

            #log.error('RECEIVING FROM {} {}'.format(self.inputId, str(topic).encode()))
            zmq_socket.setsockopt(zmq.SUBSCRIBE, str(topic).encode())  #yong

            return zmq_socket

        def listenProc(self, sock):

            """
            This function listens to an ipc address until a message is received. Pickle is
            used to deserialize the data (object).
            The 'sock' argument is the socket previously connected to an ipc address and with a topic subscription.
            """

            # Frame 0 is the topic, frame 1 is the pickled object.
            a = sock.recv_multipart()
            a = pickle.loads(a[1])
            return a

        def sockPublishing(self):

            """
            This function creates a socket for publishing purposes.
            Depending on the type of process that creates it, it binds or connects
            to special IPC addresses.
            """
            time.sleep(4)  #yong
            context = zmq.Context()
            zmq_socket = context.socket(zmq.PUB)
            if not os.path.exists('/tmp/socketTmp'):
                os.mkdir('/tmp/socketTmp')

            if 'Reader' in self.dictProcs[self.id].name:
                zmq_socket.connect('ipc:///tmp/socketTmp/a')
            else:
                zmq_socket.bind('ipc:///tmp/socketTmp/%s' % self.id)

            return zmq_socket

        def publishProc(self, sock, data):

            """
            This function publishes a python object (data) under a specific topic in a socket (sock).
            Usually, the topic is the id of this process.
            """

            sock.send_multipart([str(self.id).encode(), pickle.dumps(data)])  #yong

            return True

        def sockOp(self):

            """
            This function creates a socket for communication purposes with operation processes.
            """

            cont = zmq.Context()
            zmq_socket = cont.socket(zmq.DEALER)

            # The identity is set with a bytes API on Python 2 and a str API on Python 3.
            if python_version()[0] == '2':
                zmq_socket.setsockopt(zmq.IDENTITY, self.id)
            if python_version()[0] == '3':
                zmq_socket.setsockopt_string(zmq.IDENTITY, self.id)

            return zmq_socket

        def execOp(self, socket, opId, dataObj):

            """
            This function 'executes' an operation's main routine by establishing a
            connection with it and sending a python object (dataOut).
            """
            if not os.path.exists('/tmp/socketTmp'):
                os.mkdir('/tmp/socketTmp')

            socket.connect('ipc:///tmp/socketTmp/%s' % opId)

            socket.send(pickle.dumps(dataObj))  #yong

            argument = socket.recv_multipart()[0]

            argument = pickle.loads(argument)

            return argument

        def sockIO(self):

            """
            Socket defined for an operation process. It is able to recover the object sent from another process as well as
            the identifier of the sender.
            """

            cont = zmq.Context()
            if not os.path.exists('/tmp/socketTmp'):
                os.mkdir('/tmp/socketTmp')

            socket = cont.socket(zmq.ROUTER)
            socket.bind('ipc:///tmp/socketTmp/%s' % self.id)

            return socket

        def funIOrec(self, socket):

            """
            Operation method: recovers the id of the process that sent a python object.
            The 'socket' argument is the socket bound to a specific process ipc.
            """

            #id_proc = socket.recv()

            #dataObj = socket.recv_pyobj()

            # Frame 0 is the sender identity, frame 1 is the pickled object.
            dataObj = socket.recv_multipart()

            dataObj[1] = pickle.loads(dataObj[1])
            return dataObj[0], dataObj[1]

        def funIOsen(self, socket, data, dest):

            """
            Operation method: sends a python object to a specific destination.
            The 'dest' argument is the id of a processing unit.
            """

            socket.send_multipart([dest, pickle.dumps(data)])  #yong

            return True

        def runReader(self):

            # time.sleep(3)
            while True:

                BaseClass.run(self, **self.kwargs)

                keyList = list(self.operations2RunDict.keys())
                keyList.sort()

                for key in keyList:
                    self.socketOP = self.sockOp()
                    self.dataOut = self.execOp(self.socketOP, key, self.dataOut)

                # Use an object with flags to tell whether the process finished or an error occurred
                if self.flagNoMoreFiles:
                    self.publishProc(self.socket_p, "Finish")
                    break

                if self.dataOut.flagNoData:
                    continue

                #print("Publishing data...")
                self.publishProc(self.socket_p, self.dataOut)
                # time.sleep(2)

            print("%s done" % BaseClass.__name__)
            return 0

        def runProc(self):

            # All the procUnits with kwargs that require a setup initialization must be defined here.

            if self.setupReq:
                BaseClass.setup(self, **self.kwargs)

            while True:
                self.dataIn = self.listenProc(self.socket_l)
                #print("%s received data" %BaseClass.__name__)

                if self.dataIn == "Finish":
                    break

                # Keep only the kwargs that match the positional parameters of run() (index 0 is self).
                m_arg = list(self.kwargs.keys())
                num_arg = list(range(1, int(BaseClass.run.__code__.co_argcount)))

                run_arg = {}

                for var in num_arg:
                    if BaseClass.run.__code__.co_varnames[var] in m_arg:
                        run_arg[BaseClass.run.__code__.co_varnames[var]] = self.kwargs[BaseClass.run.__code__.co_varnames[var]]

                #BaseClass.run(self, **self.kwargs)
                BaseClass.run(self, **run_arg)

                ## Iterate over the set of methods that may be applied
                for m_name in BaseClass.METHODS:

                    met_arg = {}

                    for arg in m_arg:
                        if arg in BaseClass.METHODS[m_name]:
                            for att in BaseClass.METHODS[m_name]:
                                met_arg[att] = self.kwargs[att]

                            method = getattr(BaseClass, m_name)
                            method(self, **met_arg)
                            break

                if self.dataOut.flagNoData:
                    continue

                keyList = list(self.operations2RunDict.keys())
                keyList.sort()

                for key in keyList:

                    self.socketOP = self.sockOp()
                    self.dataOut = self.execOp(self.socketOP, key, self.dataOut)

                self.publishProc(self.socket_p, self.dataOut)

            print("%s done" % BaseClass.__name__)

            return 0

        def runOp(self):

            while True:

                [self.dest, self.buffer] = self.funIOrec(self.socket_router)

                self.buffer = BaseClass.run(self, self.buffer, **self.kwargs)

                self.funIOsen(self.socket_router, self.buffer, self.dest)

            print("%s done" % BaseClass.__name__)
            return 0

        def run(self):

            # Compare strings by value: "is" checks identity and is unreliable here.
            if self.typeProc == "ProcUnit":

                self.socket_p = self.sockPublishing()

                if 'Reader' not in self.dictProcs[self.id].name:
                    self.socket_l = self.sockListening(self.inputId)
                    self.runProc()

                else:
                    self.runReader()

            elif self.typeProc == "Operation":

                self.socket_router = self.sockIO()

                self.runOp()

            else:
                raise ValueError("Unknown type")

            return 0

    return MPClass
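
The argument filtering that `runProc` performs with `__code__.co_argcount` and `co_varnames` can also be written with `inspect.signature` on Python 3. The block below is a minimal sketch of that idea under stated assumptions, not the project's implementation: `filter_kwargs`, `DemoUnit` and the example parameter names are hypothetical and exist only for illustration.

```python
import inspect


def filter_kwargs(func, kwargs):
    """Return only the entries of kwargs that func accepts by name.

    Hypothetical helper (not part of schainpy): it mirrors the loop in
    runProc that keeps the controller kwargs matching run()'s parameters.
    """
    params = inspect.signature(func).parameters
    accepted = {
        name for name, p in params.items()
        if name != 'self'
        and p.kind in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY)
    }
    return {k: v for k, v in kwargs.items() if k in accepted}


class DemoUnit(object):
    """Hypothetical processing unit used only for this example."""

    def run(self, nFFTPoints=None, pairsList=None):
        return nFFTPoints, pairsList


# 'typo_arg' stands in for a kwarg that run() does not declare.
controller_kwargs = {'nFFTPoints': 64, 'pairsList': [(0, 1)], 'typo_arg': 1}
run_arg = filter_kwargs(DemoUnit.run, controller_kwargs)
result = DemoUnit().run(**run_arg)
```

Unlike the `co_varnames` loop, this variant also accepts keyword-only parameters; whether that is desirable depends on how the processing units declare `run()`.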