|
|
'''
|
|
|
Updated for multiprocessing
|
|
|
Author : Sergio Cortez
|
|
|
Jan 2018
|
|
|
Abstract:
|
|
|
Base class for processing units and operations. A decorator provides multiprocessing features and interconnect the processes created.
|
|
|
The argument (kwargs) sent from the controller is parsed and filtered via the decorator for each processing unit or operation instantiated.
|
|
|
The decorator handle also the methods inside the processing unit to be called from the main script (not as operations) (OPERATION -> type ='self').
|
|
|
|
|
|
Based on:
|
|
|
$Author: murco $
|
|
|
$Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
|
|
|
'''
|
|
|
from platform import python_version
|
|
|
import inspect
|
|
|
import zmq
|
|
|
import time
|
|
|
import pickle
|
|
|
import os
|
|
|
from multiprocessing import Process
|
|
|
|
|
|
from schainpy.utils import log
|
|
|
|
|
|
|
|
|
class ProcessingUnit(object):
|
|
|
|
|
|
"""
|
|
|
Update - Jan 2018 - MULTIPROCESSING
|
|
|
All the "call" methods present in the previous base were removed.
|
|
|
The majority of operations are independant processes, thus
|
|
|
the decorator is in charge of communicate the operation processes
|
|
|
with the proccessing unit via IPC.
|
|
|
|
|
|
The constructor does not receive any argument. The remaining methods
|
|
|
are related with the operations to execute.
|
|
|
|
|
|
|
|
|
"""
|
|
|
# objeto de datos de entrada (Voltage, Spectra o Correlation)
|
|
|
dataIn = None
|
|
|
dataInList = []
|
|
|
|
|
|
# objeto de datos de entrada (Voltage, Spectra o Correlation)
|
|
|
|
|
|
id = None
|
|
|
inputId = None
|
|
|
|
|
|
dataOut = None
|
|
|
|
|
|
dictProcs = None
|
|
|
|
|
|
operations2RunDict = None
|
|
|
|
|
|
isConfig = False
|
|
|
|
|
|
def __init__(self):
|
|
|
|
|
|
self.dataIn = None
|
|
|
self.dataOut = None
|
|
|
|
|
|
self.isConfig = False
|
|
|
|
|
|
def getAllowedArgs(self):
|
|
|
if hasattr(self, '__attrs__'):
|
|
|
return self.__attrs__
|
|
|
else:
|
|
|
return inspect.getargspec(self.run).args
|
|
|
|
|
|
def addOperationKwargs(self, objId, **kwargs):
|
|
|
'''
|
|
|
'''
|
|
|
|
|
|
self.operationKwargs[objId] = kwargs
|
|
|
|
|
|
def addOperation(self, opObj, objId):
|
|
|
|
|
|
"""
|
|
|
This method is used in the controller, and update the dictionary containing the operations to execute. The dict
|
|
|
posses the id of the operation process (IPC purposes)
|
|
|
|
|
|
Agrega un objeto del tipo "Operation" (opObj) a la lista de objetos "self.objectList" y retorna el
|
|
|
identificador asociado a este objeto.
|
|
|
|
|
|
Input:
|
|
|
|
|
|
object : objeto de la clase "Operation"
|
|
|
|
|
|
Return:
|
|
|
|
|
|
objId : identificador del objeto, necesario para comunicar con master(procUnit)
|
|
|
"""
|
|
|
|
|
|
self.operations2RunDict[objId] = opObj
|
|
|
|
|
|
return objId
|
|
|
|
|
|
|
|
|
def getOperationObj(self, objId):
|
|
|
|
|
|
if objId not in list(self.operations2RunDict.keys()):
|
|
|
return None
|
|
|
|
|
|
return self.operations2RunDict[objId]
|
|
|
|
|
|
def operation(self, **kwargs):
|
|
|
|
|
|
"""
|
|
|
Operacion directa sobre la data (dataOut.data). Es necesario actualizar los valores de los
|
|
|
atributos del objeto dataOut
|
|
|
|
|
|
Input:
|
|
|
|
|
|
**kwargs : Diccionario de argumentos de la funcion a ejecutar
|
|
|
"""
|
|
|
|
|
|
raise NotImplementedError
|
|
|
|
|
|
def setup(self):
|
|
|
|
|
|
raise NotImplementedError
|
|
|
|
|
|
def run(self):
|
|
|
|
|
|
raise NotImplementedError
|
|
|
|
|
|
def close(self):
|
|
|
#Close every thread, queue or any other object here is it is neccesary.
|
|
|
return
|
|
|
|
|
|
class Operation(object):
|
|
|
|
|
|
"""
|
|
|
Update - Jan 2018 - MULTIPROCESSING
|
|
|
|
|
|
Most of the methods remained the same. The decorator parse the arguments and executed the run() method for each process.
|
|
|
The constructor doe snot receive any argument, neither the baseclass.
|
|
|
|
|
|
|
|
|
Clase base para definir las operaciones adicionales que se pueden agregar a la clase ProcessingUnit
|
|
|
y necesiten acumular informacion previa de los datos a procesar. De preferencia usar un buffer de
|
|
|
acumulacion dentro de esta clase
|
|
|
|
|
|
Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer)
|
|
|
|
|
|
"""
|
|
|
id = None
|
|
|
__buffer = None
|
|
|
dest = None
|
|
|
isConfig = False
|
|
|
readyFlag = None
|
|
|
|
|
|
def __init__(self):
|
|
|
|
|
|
self.buffer = None
|
|
|
self.dest = None
|
|
|
self.isConfig = False
|
|
|
self.readyFlag = False
|
|
|
|
|
|
if not hasattr(self, 'name'):
|
|
|
self.name = self.__class__.__name__
|
|
|
|
|
|
def getAllowedArgs(self):
|
|
|
if hasattr(self, '__attrs__'):
|
|
|
return self.__attrs__
|
|
|
else:
|
|
|
return inspect.getargspec(self.run).args
|
|
|
|
|
|
def setup(self):
|
|
|
|
|
|
self.isConfig = True
|
|
|
|
|
|
raise NotImplementedError
|
|
|
|
|
|
|
|
|
def run(self, dataIn, **kwargs):
|
|
|
|
|
|
"""
|
|
|
Realiza las operaciones necesarias sobre la dataIn.data y actualiza los
|
|
|
atributos del objeto dataIn.
|
|
|
|
|
|
Input:
|
|
|
|
|
|
dataIn : objeto del tipo JROData
|
|
|
|
|
|
Return:
|
|
|
|
|
|
None
|
|
|
|
|
|
Affected:
|
|
|
__buffer : buffer de recepcion de datos.
|
|
|
|
|
|
"""
|
|
|
if not self.isConfig:
|
|
|
self.setup(**kwargs)
|
|
|
|
|
|
raise NotImplementedError
|
|
|
|
|
|
def close(self):
|
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
######### Decorator #########
|
|
|
|
|
|
|
|
|
def MPDecorator(BaseClass):
|
|
|
|
|
|
"""
|
|
|
"Multiprocessing class decorator"
|
|
|
|
|
|
This function add multiprocessing features to the base class. Also,
|
|
|
it handle the communication beetween processes (readers, procUnits and operations).
|
|
|
Receive the arguments at the moment of instantiation. According to that, discriminates if it
|
|
|
is a procUnit or an operation
|
|
|
"""
|
|
|
|
|
|
class MPClass(BaseClass, Process):
|
|
|
|
|
|
"This is the overwritten class"
|
|
|
operations2RunDict = None
|
|
|
socket_l = None
|
|
|
socket_p = None
|
|
|
socketOP = None
|
|
|
socket_router = None
|
|
|
dictProcs = None
|
|
|
typeProc = None
|
|
|
def __init__(self, *args, **kwargs):
|
|
|
super(MPClass, self).__init__()
|
|
|
Process.__init__(self)
|
|
|
|
|
|
|
|
|
self.operationKwargs = {}
|
|
|
self.args = args
|
|
|
|
|
|
|
|
|
self.operations2RunDict = {}
|
|
|
self.kwargs = kwargs
|
|
|
|
|
|
# The number of arguments (args) determine the type of process
|
|
|
|
|
|
if len(self.args) is 3:
|
|
|
self.typeProc = "ProcUnit"
|
|
|
self.id = args[0] #topico de publicacion
|
|
|
self.inputId = args[1] #topico de subcripcion
|
|
|
self.dictProcs = args[2] #diccionario de procesos globales
|
|
|
else:
|
|
|
self.id = args[0]
|
|
|
self.typeProc = "Operation"
|
|
|
|
|
|
def addOperationKwargs(self, objId, **kwargs):
|
|
|
|
|
|
self.operationKwargs[objId] = kwargs
|
|
|
|
|
|
def getAllowedArgs(self):
|
|
|
|
|
|
if hasattr(self, '__attrs__'):
|
|
|
return self.__attrs__
|
|
|
else:
|
|
|
return inspect.getargspec(self.run).args
|
|
|
|
|
|
|
|
|
def sockListening(self, topic):
|
|
|
|
|
|
"""
|
|
|
This function create a socket to receive objects.
|
|
|
The 'topic' argument is related to the publisher process from which the self process is
|
|
|
listening (data).
|
|
|
In the case were the self process is listening to a Reader (proc Unit),
|
|
|
special conditions are introduced to maximize parallelism.
|
|
|
"""
|
|
|
|
|
|
cont = zmq.Context()
|
|
|
zmq_socket = cont.socket(zmq.SUB)
|
|
|
if not os.path.exists('/tmp/socketTmp'):
|
|
|
os.mkdir('/tmp/socketTmp')
|
|
|
|
|
|
if 'Reader' in self.dictProcs[self.inputId].name:
|
|
|
zmq_socket.connect('ipc:///tmp/socketTmp/b')
|
|
|
|
|
|
else:
|
|
|
zmq_socket.connect('ipc:///tmp/socketTmp/%s' % self.inputId)
|
|
|
|
|
|
#log.error('RECEIVING FROM {} {}'.format(self.inputId, str(topic).encode()))
|
|
|
zmq_socket.setsockopt(zmq.SUBSCRIBE, str(topic).encode()) #yong
|
|
|
|
|
|
return zmq_socket
|
|
|
|
|
|
|
|
|
def listenProc(self, sock):
|
|
|
|
|
|
"""
|
|
|
This function listen to a ipc addres until a message is recovered. To serialize the
|
|
|
data (object), pickle has been use.
|
|
|
The 'sock' argument is the socket previously connect to an ipc address and with a topic subscription.
|
|
|
"""
|
|
|
|
|
|
a = sock.recv_multipart()
|
|
|
a = pickle.loads(a[1])
|
|
|
return a
|
|
|
|
|
|
def sockPublishing(self):
|
|
|
|
|
|
"""
|
|
|
This function create a socket for publishing purposes.
|
|
|
Depending on the process type from where is created, it binds or connect
|
|
|
to special IPC addresses.
|
|
|
"""
|
|
|
time.sleep(4) #yong
|
|
|
context = zmq.Context()
|
|
|
zmq_socket = context.socket(zmq.PUB)
|
|
|
if not os.path.exists('/tmp/socketTmp'): os.mkdir('/tmp/socketTmp')
|
|
|
if 'Reader' in self.dictProcs[self.id].name:
|
|
|
zmq_socket.connect('ipc:///tmp/socketTmp/a')
|
|
|
else:
|
|
|
zmq_socket.bind('ipc:///tmp/socketTmp/%s' % self.id)
|
|
|
|
|
|
return zmq_socket
|
|
|
|
|
|
def publishProc(self, sock, data):
|
|
|
|
|
|
"""
|
|
|
This function publish a python object (data) under a specific topic in a socket (sock).
|
|
|
Usually, the topic is the self id of the process.
|
|
|
"""
|
|
|
|
|
|
sock.send_multipart([str(self.id).encode(), pickle.dumps(data)]) #yong
|
|
|
|
|
|
return True
|
|
|
|
|
|
def sockOp(self):
|
|
|
|
|
|
"""
|
|
|
This function create a socket for communication purposes with operation processes.
|
|
|
"""
|
|
|
|
|
|
cont = zmq.Context()
|
|
|
zmq_socket = cont.socket(zmq.DEALER)
|
|
|
|
|
|
if python_version()[0] == '2':
|
|
|
zmq_socket.setsockopt(zmq.IDENTITY, self.id)
|
|
|
if python_version()[0] == '3':
|
|
|
zmq_socket.setsockopt_string(zmq.IDENTITY, self.id)
|
|
|
|
|
|
|
|
|
return zmq_socket
|
|
|
|
|
|
|
|
|
def execOp(self, socket, opId, dataObj):
|
|
|
|
|
|
"""
|
|
|
This function 'execute' an operation main routine by establishing a
|
|
|
connection with it and sending a python object (dataOut).
|
|
|
"""
|
|
|
if not os.path.exists('/tmp/socketTmp'): os.mkdir('/tmp/socketTmp')
|
|
|
socket.connect('ipc:///tmp/socketTmp/%s' %opId)
|
|
|
|
|
|
|
|
|
socket.send(pickle.dumps(dataObj)) #yong
|
|
|
|
|
|
argument = socket.recv_multipart()[0]
|
|
|
|
|
|
argument = pickle.loads(argument)
|
|
|
|
|
|
return argument
|
|
|
|
|
|
def sockIO(self):
|
|
|
|
|
|
"""
|
|
|
Socket defined for an operation process. It is able to recover the object sent from another process as well as a
|
|
|
identifier of who sent it.
|
|
|
"""
|
|
|
|
|
|
cont = zmq.Context()
|
|
|
if not os.path.exists('/tmp/socketTmp'): os.mkdir('/tmp/socketTmp')
|
|
|
socket = cont.socket(zmq.ROUTER)
|
|
|
socket.bind('ipc:///tmp/socketTmp/%s' % self.id)
|
|
|
|
|
|
return socket
|
|
|
|
|
|
def funIOrec(self, socket):
|
|
|
|
|
|
"""
|
|
|
Operation method, recover the id of the process who sent a python object.
|
|
|
The 'socket' argument is the socket binded to a specific process ipc.
|
|
|
"""
|
|
|
|
|
|
#id_proc = socket.recv()
|
|
|
|
|
|
#dataObj = socket.recv_pyobj()
|
|
|
|
|
|
dataObj = socket.recv_multipart()
|
|
|
|
|
|
dataObj[1] = pickle.loads(dataObj[1])
|
|
|
return dataObj[0], dataObj[1]
|
|
|
|
|
|
def funIOsen(self, socket, data, dest):
|
|
|
|
|
|
"""
|
|
|
Operation method, send a python object to a specific destination.
|
|
|
The 'dest' argument is the id of a proccesinf unit.
|
|
|
"""
|
|
|
|
|
|
socket.send_multipart([dest, pickle.dumps(data)]) #yong
|
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
def runReader(self):
|
|
|
|
|
|
# time.sleep(3)
|
|
|
while True:
|
|
|
|
|
|
BaseClass.run(self, **self.kwargs)
|
|
|
|
|
|
|
|
|
keyList = list(self.operations2RunDict.keys())
|
|
|
keyList.sort()
|
|
|
|
|
|
for key in keyList:
|
|
|
self.socketOP = self.sockOp()
|
|
|
self.dataOut = self.execOp(self.socketOP, key, self.dataOut)
|
|
|
|
|
|
|
|
|
if self.flagNoMoreFiles: #Usar un objeto con flags para saber si termino el proc o hubo un error
|
|
|
self.publishProc(self.socket_p, "Finish")
|
|
|
break
|
|
|
|
|
|
if self.dataOut.flagNoData:
|
|
|
continue
|
|
|
|
|
|
#print("Publishing data...")
|
|
|
self.publishProc(self.socket_p, self.dataOut)
|
|
|
# time.sleep(2)
|
|
|
|
|
|
|
|
|
print("%s done" %BaseClass.__name__)
|
|
|
return 0
|
|
|
|
|
|
def runProc(self):
|
|
|
|
|
|
# All the procUnits with kwargs that require a setup initialization must be defined here.
|
|
|
|
|
|
if self.setupReq:
|
|
|
BaseClass.setup(self, **self.kwargs)
|
|
|
|
|
|
while True:
|
|
|
self.dataIn = self.listenProc(self.socket_l)
|
|
|
#print("%s received data" %BaseClass.__name__)
|
|
|
|
|
|
if self.dataIn == "Finish":
|
|
|
break
|
|
|
|
|
|
m_arg = list(self.kwargs.keys())
|
|
|
num_arg = list(range(1,int(BaseClass.run.__code__.co_argcount)))
|
|
|
|
|
|
run_arg = {}
|
|
|
|
|
|
for var in num_arg:
|
|
|
if BaseClass.run.__code__.co_varnames[var] in m_arg:
|
|
|
run_arg[BaseClass.run.__code__.co_varnames[var]] = self.kwargs[BaseClass.run.__code__.co_varnames[var]]
|
|
|
|
|
|
#BaseClass.run(self, **self.kwargs)
|
|
|
BaseClass.run(self, **run_arg)
|
|
|
|
|
|
## Iterar sobre una serie de data que podrias aplicarse
|
|
|
|
|
|
for m_name in BaseClass.METHODS:
|
|
|
|
|
|
met_arg = {}
|
|
|
|
|
|
for arg in m_arg:
|
|
|
if arg in BaseClass.METHODS[m_name]:
|
|
|
for att in BaseClass.METHODS[m_name]:
|
|
|
met_arg[att] = self.kwargs[att]
|
|
|
|
|
|
method = getattr(BaseClass, m_name)
|
|
|
method(self, **met_arg)
|
|
|
break
|
|
|
|
|
|
if self.dataOut.flagNoData:
|
|
|
continue
|
|
|
|
|
|
keyList = list(self.operations2RunDict.keys())
|
|
|
keyList.sort()
|
|
|
|
|
|
for key in keyList:
|
|
|
|
|
|
self.socketOP = self.sockOp()
|
|
|
self.dataOut = self.execOp(self.socketOP, key, self.dataOut)
|
|
|
|
|
|
|
|
|
self.publishProc(self.socket_p, self.dataOut)
|
|
|
|
|
|
|
|
|
print("%s done" %BaseClass.__name__)
|
|
|
|
|
|
return 0
|
|
|
|
|
|
def runOp(self):
|
|
|
|
|
|
while True:
|
|
|
|
|
|
[self.dest ,self.buffer] = self.funIOrec(self.socket_router)
|
|
|
|
|
|
self.buffer = BaseClass.run(self, self.buffer, **self.kwargs)
|
|
|
|
|
|
self.funIOsen(self.socket_router, self.buffer, self.dest)
|
|
|
|
|
|
print("%s done" %BaseClass.__name__)
|
|
|
return 0
|
|
|
|
|
|
|
|
|
def run(self):
|
|
|
|
|
|
if self.typeProc is "ProcUnit":
|
|
|
|
|
|
self.socket_p = self.sockPublishing()
|
|
|
|
|
|
if 'Reader' not in self.dictProcs[self.id].name:
|
|
|
self.socket_l = self.sockListening(self.inputId)
|
|
|
self.runProc()
|
|
|
|
|
|
else:
|
|
|
|
|
|
self.runReader()
|
|
|
|
|
|
elif self.typeProc is "Operation":
|
|
|
|
|
|
self.socket_router = self.sockIO()
|
|
|
|
|
|
self.runOp()
|
|
|
|
|
|
else:
|
|
|
raise ValueError("Unknown type")
|
|
|
|
|
|
return 0
|
|
|
|
|
|
return MPClass
|