Writing Unit for Madrigal decorated (just for python 2x)

File last commit:

r1206:59caf7a2130e
jroproc_base.py
390 lines | 11.5 KiB | text/x-python | PythonLexer
'''
Updated for multiprocessing
Author : Sergio Cortez
Jan 2018
Abstract:
    Base class for processing units and operations. A decorator provides multiprocessing features and interconnects the processes created.
    The arguments (kwargs) sent from the controller are parsed and filtered by the decorator for each processing unit or operation instantiated.
    The decorator also handles the methods inside the processing unit that are called from the main script (not as operations) (OPERATION -> type='self').

Based on:
    $Author: murco $
    $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
'''

import inspect
import zmq
import time
import pickle
import os
from multiprocessing import Process
from zmq.utils.monitor import recv_monitor_message

from schainpy.utils import log


class ProcessingUnit(object):

    """
    Update - Jan 2018 - MULTIPROCESSING
    All the "call" methods present in the previous base were removed.
    Most operations are independent processes, so the decorator is in
    charge of communicating the operation processes with the processing
    unit via IPC.

    The constructor does not receive any arguments. The remaining methods
    are related to the operations to execute.

    """

    def __init__(self):

        self.dataIn = None
        self.dataOut = None
        self.isConfig = False
        self.operations = []
        self.plots = []

    def getAllowedArgs(self):
        if hasattr(self, '__attrs__'):
            return self.__attrs__
        else:
            # inspect.getargspec was removed in Python 3.11; prefer
            # getfullargspec when it exists and fall back on Python 2.
            getspec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
            return getspec(self.run).args

    def addOperation(self, conf, operation):
        """
        This method is used in the controller and updates the list of
        operations to execute. Each entry holds the id of the operation
        process (for IPC purposes).

        Adds an object of type "Operation" (opObj) to the list of objects
        "self.objectList" and returns the identifier associated with this object.

        Input:

            object : object of the class "Operation"

        Return:

            objId : identifier of the object, needed to communicate with the master (procUnit)
        """

        self.operations.append(
            (operation, conf.type, conf.id, conf.getKwargs()))

        if 'plot' in self.name.lower():
            self.plots.append(operation.CODE)

    def getOperationObj(self, objId):

        # self.operations is a list of (operation, type, id, kwargs) tuples,
        # so look the operation up by its id instead of using dict access.
        for operation, optype, opId, kwargs in self.operations:
            if opId == objId:
                return operation

        return None

    def operation(self, **kwargs):
        """
        Direct operation on the data (dataOut.data). The attribute values of
        the dataOut object must be updated.

        Input:

            **kwargs : dictionary of arguments of the function to execute
        """

        raise NotImplementedError

    def setup(self):

        raise NotImplementedError

    def run(self):

        raise NotImplementedError

    def close(self):

        return


class Operation(object):

    """
    Update - Jan 2018 - MULTIPROCESSING

    Most of the methods remained the same. The decorator parses the arguments and executes the run() method for each process.
    The constructor does not receive any arguments, and neither does the base class.

    Base class for defining the additional operations that can be added to the ProcessingUnit class
    and that need to accumulate previous information about the data being processed. Preferably, use an
    accumulation buffer inside this class.

    Example: coherent integration needs the previous information of the last n profiles (buffer).

    """

    def __init__(self):

        self.id = None
        self.isConfig = False

        if not hasattr(self, 'name'):
            self.name = self.__class__.__name__

    def getAllowedArgs(self):
        if hasattr(self, '__attrs__'):
            return self.__attrs__
        else:
            # inspect.getargspec was removed in Python 3.11; prefer
            # getfullargspec when it exists and fall back on Python 2.
            getspec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
            return getspec(self.run).args

    def setup(self):

        self.isConfig = True

        raise NotImplementedError

    def run(self, dataIn, **kwargs):
        """
        Performs the necessary operations on dataIn.data and updates the
        attributes of the dataIn object.

        Input:

            dataIn : object of type JROData

        Return:

            None

        Affected:
            __buffer : data reception buffer.

        """
        if not self.isConfig:
            self.setup(**kwargs)

        raise NotImplementedError

    def close(self):

        return


def MPDecorator(BaseClass):
    """
    Multiprocessing class decorator

    This function adds multiprocessing features to a BaseClass. It also handles
    the communication between processes (readers, procUnits and operations).
    """

    class MPClass(BaseClass, Process):

        def __init__(self, *args, **kwargs):
            super(MPClass, self).__init__()
            Process.__init__(self)
            self.operationKwargs = {}
            self.args = args
            self.kwargs = kwargs
            self.sender = None
            self.receiver = None
            self.name = BaseClass.__name__
            if 'plot' in self.name.lower() and not self.name.endswith('_'):
                self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
            self.start_time = time.time()

            # Compare lengths with ==; "is" tests identity, not equality.
            if len(self.args) == 3:
                self.typeProc = "ProcUnit"
                self.id = args[0]
                self.inputId = args[1]
                self.project_id = args[2]
            elif len(self.args) == 2:
                self.id = args[0]
                self.inputId = args[0]
                self.project_id = args[1]
                self.typeProc = "Operation"

        def subscribe(self):
            '''
            This function creates a socket to receive objects from the
            topic `inputId`.
            '''

            c = zmq.Context()
            self.receiver = c.socket(zmq.SUB)
            self.receiver.connect(
                'ipc:///tmp/schain/{}_pub'.format(self.project_id))
            self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())

        def listen(self):
            '''
            This function waits for objects and deserializes them using pickle.
            '''

            data = pickle.loads(self.receiver.recv_multipart()[1])

            return data

        def set_publisher(self):
            '''
            This function creates a socket for publishing purposes.
            '''

            time.sleep(1)
            c = zmq.Context()
            self.sender = c.socket(zmq.PUB)
            self.sender.connect(
                'ipc:///tmp/schain/{}_sub'.format(self.project_id))

        def publish(self, data, id):
            '''
            This function publishes an object to a specific topic.
            '''
            self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])

        def runReader(self):
            '''
            Run function for read units
            '''
            while True:

                BaseClass.run(self, **self.kwargs)

                for op, optype, opId, kwargs in self.operations:
                    if optype == 'self' and not self.dataOut.flagNoData:
                        op(**kwargs)
                    elif optype == 'other' and not self.dataOut.flagNoData:
                        self.dataOut = op.run(self.dataOut, **self.kwargs)
                    elif optype == 'external':
                        self.publish(self.dataOut, opId)

                if self.dataOut.flagNoData and not self.dataOut.error:
                    continue

                self.publish(self.dataOut, self.id)

                if self.dataOut.error:
                    log.error(self.dataOut.error, self.name)
                    # self.sender.send_multipart([str(self.project_id).encode(), 'end'.encode()])
                    break

            time.sleep(1)

        def runProc(self):
            '''
            Run function for processing units
            '''

            while True:
                self.dataIn = self.listen()

                if self.dataIn.flagNoData and self.dataIn.error is None:
                    continue

                BaseClass.run(self, **self.kwargs)

                if self.dataIn.error:
                    self.dataOut.error = self.dataIn.error
                    self.dataOut.flagNoData = True

                for op, optype, opId, kwargs in self.operations:
                    if optype == 'self' and not self.dataOut.flagNoData:
                        op(**kwargs)
                    elif optype == 'other' and not self.dataOut.flagNoData:
                        self.dataOut = op.run(self.dataOut, **kwargs)
                    elif optype == 'external' and not self.dataOut.flagNoData:
                        self.publish(self.dataOut, opId)

                if not self.dataOut.flagNoData or self.dataOut.error:
                    self.publish(self.dataOut, self.id)
                    for op, optype, opId, kwargs in self.operations:
                        if optype == 'self' and self.dataOut.error:
                            op(**kwargs)
                        elif optype == 'other' and self.dataOut.error:
                            self.dataOut = op.run(self.dataOut, **kwargs)
                        elif optype == 'external' and self.dataOut.error:
                            self.publish(self.dataOut, opId)

                if self.dataIn.error:
                    break

            time.sleep(1)

        def runOp(self):
            '''
            Run function for external operations (these operations just receive data,
            e.g. plots, writers, publishers)
            '''

            while True:

                dataOut = self.listen()

                BaseClass.run(self, dataOut, **self.kwargs)

                if dataOut.error:
                    break

            time.sleep(1)

        def run(self):
            # Compare strings with ==; "is" tests identity, not equality.
            if self.typeProc == "ProcUnit":

                if self.inputId is not None:

                    self.subscribe()

                self.set_publisher()

                if 'Reader' not in BaseClass.__name__:
                    self.runProc()
                else:
                    self.runReader()

            elif self.typeProc == "Operation":

                self.subscribe()
                self.runOp()

            else:
                raise ValueError("Unknown type")

            self.close()

        def event_monitor(self, monitor):

            events = {}

            # Map every zmq EVENT_* constant to its name for descriptions.
            for name in dir(zmq):
                if name.startswith('EVENT_'):
                    value = getattr(zmq, name)
                    events[value] = name

            while monitor.poll():
                evt = recv_monitor_message(monitor)
                # Named constants replace the literals 32 and 512.
                if evt['event'] == zmq.EVENT_ACCEPTED:
                    self.connections += 1
                if evt['event'] == zmq.EVENT_DISCONNECTED:
                    pass

                evt.update({'description': events[evt['event']]})

                if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
                    break
            monitor.close()
            print('event monitor thread done!')

        def close(self):

            BaseClass.close(self)

            if self.sender:
                self.sender.close()

            if self.receiver:
                self.receiver.close()

            log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)

    return MPClass
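
Usage note (not part of jroproc_base.py): the sketch below illustrates how the three operation types handled in runProc() ('self', 'other' and 'external') could be dispatched for a single data object that carries no error. It uses only the standard library. FakeData, Doubler, add_offset, dispatch and the ids '11' to '13' are hypothetical names made up for this illustration, and the publish callback is a plain list append standing in for the zmq PUB socket.

```python
class FakeData(object):
    """Hypothetical stand-in for a JROData object."""

    def __init__(self, value):
        self.value = value
        self.flagNoData = False
        self.error = None


class Doubler(object):
    """Hypothetical 'other' operation: runs in-process and returns the data."""

    def run(self, dataOut, factor=2):
        dataOut.value *= factor
        return dataOut


def dispatch(dataOut, operations, publish):
    """Apply (op, optype, opId, kwargs) entries, skipping them when there is no data."""
    for op, optype, opId, kwargs in operations:
        if dataOut.flagNoData:
            continue
        if optype == 'self':
            # A method of the processing unit itself, called with its kwargs.
            op(**kwargs)
        elif optype == 'other':
            # An in-process operation object that returns the updated data.
            dataOut = op.run(dataOut, **kwargs)
        elif optype == 'external':
            # A separate process: only publish the data under the operation id.
            publish(dataOut, opId)
    return dataOut


published = []
data = FakeData(3)


def add_offset(offset=0):
    # Plays the role of a 'self' operation acting on the unit's dataOut.
    data.value += offset


operations = [
    (add_offset, 'self', '11', {'offset': 1}),
    (Doubler(), 'other', '12', {'factor': 2}),
    (None, 'external', '13', {}),
]

result = dispatch(data, operations,
                  lambda d, topic: published.append((topic, d.value)))
print(result.value, published)
```

In the real decorator the 'external' branch serializes the object with pickle and sends it through the PUB socket; the list used here only makes the ordering of the three branches easy to inspect.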