##// END OF EJS Templates
Modificación a kmamisr para ejecutarse en la versión 3, creación de scripts con terminación v3 para difereciarlos, se comentó la linea #720 de JroIO_param.py debido a que reiniciaba la lista de archivos, ocasionando la reescritura del archivo hdf5. Alguna otra modificación aparente es producto de algunas variaciones en espacios al usar la función print()
Modificación a kmamisr para ejecutarse en la versión 3, creación de scripts con terminación v3 para difereciarlos, se comentó la linea #720 de JroIO_param.py debido a que reiniciaba la lista de archivos, ocasionando la reescritura del archivo hdf5. Alguna otra modificación aparente es producto de algunas variaciones en espacios al usar la función print()

File last commit:

r1279:c53fe2a4a291
r1279:c53fe2a4a291
Show More
jroproc_base.py
426 lines | 12.3 KiB | text/x-python | PythonLexer
Daniel Valdez
This is the new organization by packages and scripts for Signal Chain, this version contains new features and bugs fixed until August 2014
r487 '''
George Yong
Multiprocessing for Spectra (all operation) working
r1171 Updated for multiprocessing
Author : Sergio Cortez
Jan 2018
Abstract:
Base class for processing units and operations. A decorator provides multiprocessing features and interconnect the processes created.
Modificación a kmamisr para ejecutarse en la versión 3, creación de scripts con terminación v3 para difereciarlos, se comentó la linea #720 de JroIO_param.py debido a que reiniciaba la lista de archivos, ocasionando la reescritura del archivo hdf5. Alguna otra modificación aparente es producto de algunas variaciones en espacios al usar la función print()
r1279 The argument (kwargs) sent from the controller is parsed and filtered via the decorator for each processing unit or operation instantiated.
The decorator handle also the methods inside the processing unit to be called from the main script (not as operations) (OPERATION -> type ='self').
George Yong
Multiprocessing for Spectra (all operation) working
r1171
Based on:
$Author: murco $
$Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
Daniel Valdez
This is the new organization by packages and scripts for Signal Chain, this version contains new features and bugs fixed until August 2014
r487 '''
Juan C. Espinoza
New plotting architecture with buffering/throttle capabilities
r1187
Juan C. Espinoza
Allow to send data to new realtime app
r1212 import os
Block publishing when input queues are full to avoid data loss
r1256 import sys
José Chávez
checking misspelled kwargs in operations/processing units
r929 import inspect
George Yong
Multiprocessing for Spectra (all operation) working
r1171 import zmq
import time
import pickle
Errors handling and gracefully terminate main process
r1241 import traceback
Fix python 2.7 compatibility
r1252 try:
from queue import Queue
except:
from Queue import Queue
Juan C. Espinoza
Add input queues for processing units and external operations
r1235 from threading import Thread
George Yong
Multiprocessing for Spectra (all operation) working
r1171 from multiprocessing import Process
Juan C. Espinoza
New plotting architecture with buffering/throttle capabilities
r1187
George Yong
Multiprocessing for Spectra (all operation) working
r1171 from schainpy.utils import log
José Chávez
checking misspelled kwargs in operations/processing units
r929
Daniel Valdez
This is the new organization by packages and scripts for Signal Chain, this version contains new features and bugs fixed until August 2014
r487
Miguel Valdez
Merge with branch schain_julia_drifts from rev. 803 to 995....
r568 class ProcessingUnit(object):
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
Daniel Valdez
This is the new organization by packages and scripts for Signal Chain, this version contains new features and bugs fixed until August 2014
r487 """
George Yong
Multiprocessing for Spectra (all operation) working
r1171 Update - Jan 2018 - MULTIPROCESSING
Modificación a kmamisr para ejecutarse en la versión 3, creación de scripts con terminación v3 para difereciarlos, se comentó la linea #720 de JroIO_param.py debido a que reiniciaba la lista de archivos, ocasionando la reescritura del archivo hdf5. Alguna otra modificación aparente es producto de algunas variaciones en espacios al usar la función print()
r1279 All the "call" methods present in the previous base were removed.
George Yong
Multiprocessing for Spectra (all operation) working
r1171 The majority of operations are independant processes, thus
Modificación a kmamisr para ejecutarse en la versión 3, creación de scripts con terminación v3 para difereciarlos, se comentó la linea #720 de JroIO_param.py debido a que reiniciaba la lista de archivos, ocasionando la reescritura del archivo hdf5. Alguna otra modificación aparente es producto de algunas variaciones en espacios al usar la función print()
r1279 the decorator is in charge of communicate the operation processes
George Yong
Multiprocessing for Spectra (all operation) working
r1171 with the proccessing unit via IPC.
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
George Yong
Multiprocessing for Spectra (all operation) working
r1171 The constructor does not receive any argument. The remaining methods
are related with the operations to execute.
Modificación a kmamisr para ejecutarse en la versión 3, creación de scripts con terminación v3 para difereciarlos, se comentó la linea #720 de JroIO_param.py debido a que reiniciaba la lista de archivos, ocasionando la reescritura del archivo hdf5. Alguna otra modificación aparente es producto de algunas variaciones en espacios al usar la función print()
r1279
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
Daniel Valdez
This is the new organization by packages and scripts for Signal Chain, this version contains new features and bugs fixed until August 2014
r487 """
Update ParamReader to support diferent HDF5 files, fix Read/Write Madrigal files
r1254 proc_type = 'processing'
__attrs__ = []
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
George Yong
Multiprocessing for Spectra (all operation) working
r1171 def __init__(self):
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
Daniel Valdez
This is the new organization by packages and scripts for Signal Chain, this version contains new features and bugs fixed until August 2014
r487 self.dataIn = None
Miguel Valdez
A new SendToServer Unit has been created to upload files to a remote server....
r573 self.dataOut = None
Daniel Valdez
This is the new organization by packages and scripts for Signal Chain, this version contains new features and bugs fixed until August 2014
r487 self.isConfig = False
Juan C. Espinoza
Review MP changes, three types of operations: self, other and external
r1177 self.operations = []
Juan C. Espinoza
New plotting architecture with buffering/throttle capabilities
r1187 self.plots = []
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
José Chávez
checking misspelled kwargs in operations/processing units
r929 def getAllowedArgs(self):
Juan C. Espinoza
Add __attrs__ attribute to Process clasess to improve CLI finder
r1097 if hasattr(self, '__attrs__'):
return self.__attrs__
else:
return inspect.getargspec(self.run).args
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
Juan C. Espinoza
New plotting architecture with buffering/throttle capabilities
r1187 def addOperation(self, conf, operation):
Daniel Valdez
This is the new organization by packages and scripts for Signal Chain, this version contains new features and bugs fixed until August 2014
r487 """
Modificación a kmamisr para ejecutarse en la versión 3, creación de scripts con terminación v3 para difereciarlos, se comentó la linea #720 de JroIO_param.py debido a que reiniciaba la lista de archivos, ocasionando la reescritura del archivo hdf5. Alguna otra modificación aparente es producto de algunas variaciones en espacios al usar la función print()
r1279 This method is used in the controller, and update the dictionary containing the operations to execute. The dict
George Yong
Multiprocessing for Spectra (all operation) working
r1171 posses the id of the operation process (IPC purposes)
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
George Yong
Multiprocessing for Spectra (all operation) working
r1171 Agrega un objeto del tipo "Operation" (opObj) a la lista de objetos "self.objectList" y retorna el
identificador asociado a este objeto.
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
George Yong
Multiprocessing for Spectra (all operation) working
r1171 Input:
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
George Yong
Multiprocessing for Spectra (all operation) working
r1171 object : objeto de la clase "Operation"
Return:
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
George Yong
Multiprocessing for Spectra (all operation) working
r1171 objId : identificador del objeto, necesario para comunicar con master(procUnit)
Daniel Valdez
This is the new organization by packages and scripts for Signal Chain, this version contains new features and bugs fixed until August 2014
r487 """
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
Juan C. Espinoza
New plotting architecture with buffering/throttle capabilities
r1187 self.operations.append(
(operation, conf.type, conf.id, conf.getKwargs()))
Modificación a kmamisr para ejecutarse en la versión 3, creación de scripts con terminación v3 para difereciarlos, se comentó la linea #720 de JroIO_param.py debido a que reiniciaba la lista de archivos, ocasionando la reescritura del archivo hdf5. Alguna otra modificación aparente es producto de algunas variaciones en espacios al usar la función print()
r1279
Juan C. Espinoza
New plotting architecture with buffering/throttle capabilities
r1187 if 'plot' in self.name.lower():
self.plots.append(operation.CODE)
George Yong
Multiprocessing for Spectra (all operation) working
r1171
Miguel Valdez
r577 def getOperationObj(self, objId):
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
Juan C. Espinoza
Review MP changes, three types of operations: self, other and external
r1177 if objId not in list(self.operations.keys()):
Miguel Valdez
r577 return None
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
Juan C. Espinoza
Review MP changes, three types of operations: self, other and external
r1177 return self.operations[objId]
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
Daniel Valdez
This is the new organization by packages and scripts for Signal Chain, this version contains new features and bugs fixed until August 2014
r487 def operation(self, **kwargs):
"""
Operacion directa sobre la data (dataOut.data). Es necesario actualizar los valores de los
atributos del objeto dataOut
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
Daniel Valdez
This is the new organization by packages and scripts for Signal Chain, this version contains new features and bugs fixed until August 2014
r487 Input:
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
Daniel Valdez
This is the new organization by packages and scripts for Signal Chain, this version contains new features and bugs fixed until August 2014
r487 **kwargs : Diccionario de argumentos de la funcion a ejecutar
"""
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
Juan C. Espinoza
New plotting architecture with buffering/throttle capabilities
r1187 raise NotImplementedError
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
George Yong
Multiprocessing for Spectra (all operation) working
r1171 def setup(self):
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
George Yong
Multiprocessing for Spectra (all operation) working
r1171 raise NotImplementedError
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
George Yong
Multiprocessing for Spectra (all operation) working
r1171 def run(self):
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
George Yong
Multiprocessing for Spectra (all operation) working
r1171 raise NotImplementedError
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
George Yong
Multiprocessing for Spectra (all operation) working
r1171 def close(self):
Juan C. Espinoza
New plotting architecture with buffering/throttle capabilities
r1187
George Yong
Multiprocessing for Spectra (all operation) working
r1171 return
Juan C. Espinoza
New plotting architecture with buffering/throttle capabilities
r1187
George Yong
Multiprocessing for Spectra (all operation) working
r1171 class Operation(object):
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
George Yong
Multiprocessing for Spectra (all operation) working
r1171 """
Update - Jan 2018 - MULTIPROCESSING
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
George Yong
Multiprocessing for Spectra (all operation) working
r1171 Most of the methods remained the same. The decorator parse the arguments and executed the run() method for each process.
The constructor doe snot receive any argument, neither the baseclass.
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
George Yong
Multiprocessing for Spectra (all operation) working
r1171 Clase base para definir las operaciones adicionales que se pueden agregar a la clase ProcessingUnit
y necesiten acumular informacion previa de los datos a procesar. De preferencia usar un buffer de
acumulacion dentro de esta clase
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
George Yong
Multiprocessing for Spectra (all operation) working
r1171 Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer)
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
George Yong
Multiprocessing for Spectra (all operation) working
r1171 """
Update ParamReader to support diferent HDF5 files, fix Read/Write Madrigal files
r1254 proc_type = 'operation'
__attrs__ = []
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
George Yong
Multiprocessing for Spectra (all operation) working
r1171 def __init__(self):
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
Juan C. Espinoza
New plotting architecture with buffering/throttle capabilities
r1187 self.id = None
George Yong
Multiprocessing for Spectra (all operation) working
r1171 self.isConfig = False
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
George Yong
Multiprocessing for Spectra (all operation) working
r1171 if not hasattr(self, 'name'):
self.name = self.__class__.__name__
Juan C. Espinoza
New plotting architecture with buffering/throttle capabilities
r1187
George Yong
Multiprocessing for Spectra (all operation) working
r1171 def getAllowedArgs(self):
if hasattr(self, '__attrs__'):
return self.__attrs__
else:
return inspect.getargspec(self.run).args
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
George Yong
Multiprocessing for Spectra (all operation) working
r1171 def setup(self):
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
George Yong
Multiprocessing for Spectra (all operation) working
r1171 self.isConfig = True
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
George Yong
Multiprocessing for Spectra (all operation) working
r1171 raise NotImplementedError
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
George Yong
Multiprocessing for Spectra (all operation) working
r1171 def run(self, dataIn, **kwargs):
Daniel Valdez
This is the new organization by packages and scripts for Signal Chain, this version contains new features and bugs fixed until August 2014
r487 """
George Yong
Multiprocessing for Spectra (all operation) working
r1171 Realiza las operaciones necesarias sobre la dataIn.data y actualiza los
atributos del objeto dataIn.
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
Miguel Valdez
Merge with branch schain_julia_drifts from rev. 803 to 995....
r568 Input:
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
George Yong
Multiprocessing for Spectra (all operation) working
r1171 dataIn : objeto del tipo JROData
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
George Yong
Multiprocessing for Spectra (all operation) working
r1171 Return:
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
George Yong
Multiprocessing for Spectra (all operation) working
r1171 None
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
George Yong
Multiprocessing for Spectra (all operation) working
r1171 Affected:
__buffer : buffer de recepcion de datos.
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
George Yong
Multiprocessing for Spectra (all operation) working
r1171 """
if not self.isConfig:
self.setup(**kwargs)
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
George Yong
Multiprocessing for Spectra (all operation) working
r1171 raise NotImplementedError
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
George Yong
Multiprocessing for Spectra (all operation) working
r1171 def close(self):
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
Juan C. Espinoza
New plotting architecture with buffering/throttle capabilities
r1187 return
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
Add input queue again :(
r1250 class InputQueue(Thread):
Modificación a kmamisr para ejecutarse en la versión 3, creación de scripts con terminación v3 para difereciarlos, se comentó la linea #720 de JroIO_param.py debido a que reiniciaba la lista de archivos, ocasionando la reescritura del archivo hdf5. Alguna otra modificación aparente es producto de algunas variaciones en espacios al usar la función print()
r1279
Block publishing when input queues are full to avoid data loss
r1256 '''
Class to hold input data for Proccessing Units and external Operations,
'''
def __init__(self, project_id, inputId, lock=None):
Update ParamReader to support diferent HDF5 files, fix Read/Write Madrigal files
r1254
Block publishing when input queues are full to avoid data loss
r1256 Thread.__init__(self)
self.queue = Queue()
self.project_id = project_id
self.inputId = inputId
self.lock = lock
Fix excessive memory RAM consumption
r1268 self.islocked = False
Block publishing when input queues are full to avoid data loss
r1256 self.size = 0
def run(self):
c = zmq.Context()
self.receiver = c.socket(zmq.SUB)
self.receiver.connect(
'ipc:///tmp/schain/{}_pub'.format(self.project_id))
self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
while True:
obj = self.receiver.recv_multipart()[1]
self.size += sys.getsizeof(obj)
self.queue.put(obj)
Fix excessive memory RAM consumption
r1268 def get(self):
if not self.islocked and self.size/1000000 > 512:
Modificación a kmamisr para ejecutarse en la versión 3, creación de scripts con terminación v3 para difereciarlos, se comentó la linea #720 de JroIO_param.py debido a que reiniciaba la lista de archivos, ocasionando la reescritura del archivo hdf5. Alguna otra modificación aparente es producto de algunas variaciones en espacios al usar la función print()
r1279 self.lock.n.value += 1
Fix excessive memory RAM consumption
r1268 self.islocked = True
Block publishing when input queues are full to avoid data loss
r1256 self.lock.clear()
Fix excessive memory RAM consumption
r1268 elif self.islocked and self.size/1000000 <= 512:
self.islocked = False
self.lock.n.value -= 1
if self.lock.n.value == 0:
Modificación a kmamisr para ejecutarse en la versión 3, creación de scripts con terminación v3 para difereciarlos, se comentó la linea #720 de JroIO_param.py debido a que reiniciaba la lista de archivos, ocasionando la reescritura del archivo hdf5. Alguna otra modificación aparente es producto de algunas variaciones en espacios al usar la función print()
r1279 self.lock.set()
Block publishing when input queues are full to avoid data loss
r1256 obj = self.queue.get()
Fix excessive memory RAM consumption
r1268 self.size -= sys.getsizeof(obj)
Block publishing when input queues are full to avoid data loss
r1256 return pickle.loads(obj)
Update ParamReader to support diferent HDF5 files, fix Read/Write Madrigal files
r1254
Modificación a kmamisr para ejecutarse en la versión 3, creación de scripts con terminación v3 para difereciarlos, se comentó la linea #720 de JroIO_param.py debido a que reiniciaba la lista de archivos, ocasionando la reescritura del archivo hdf5. Alguna otra modificación aparente es producto de algunas variaciones en espacios al usar la función print()
r1279
George Yong
Multiprocessing for Spectra (all operation) working
r1171 def MPDecorator(BaseClass):
"""
Juan C. Espinoza
Review MP changes, three types of operations: self, other and external
r1177 Multiprocessing class decorator
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
Juan C. Espinoza
Review MP changes, three types of operations: self, other and external
r1177 This function add multiprocessing features to a BaseClass. Also, it handle
Modificación a kmamisr para ejecutarse en la versión 3, creación de scripts con terminación v3 para difereciarlos, se comentó la linea #720 de JroIO_param.py debido a que reiniciaba la lista de archivos, ocasionando la reescritura del archivo hdf5. Alguna otra modificación aparente es producto de algunas variaciones en espacios al usar la función print()
r1279 the communication beetween processes (readers, procUnits and operations).
George Yong
Multiprocessing for Spectra (all operation) working
r1171 """
Juan C. Espinoza
New plotting architecture with buffering/throttle capabilities
r1187
George Yong
Multiprocessing for Spectra (all operation) working
r1171 class MPClass(BaseClass, Process):
Juan C. Espinoza
Review MP changes, three types of operations: self, other and external
r1177
George Yong
Multiprocessing for Spectra (all operation) working
r1171 def __init__(self, *args, **kwargs):
super(MPClass, self).__init__()
Process.__init__(self)
self.operationKwargs = {}
self.args = args
self.kwargs = kwargs
Juan C. Espinoza
Review MP changes, three types of operations: self, other and external
r1177 self.sender = None
self.receiver = None
Juan C. Espinoza
Add input queues for processing units and external operations
r1235 self.i = 0
Use of delays instead of input queue to keep dataouts and avoid loose of them
r1245 self.t = time.time()
Juan C. Espinoza
Review MP changes, three types of operations: self, other and external
r1177 self.name = BaseClass.__name__
Update ParamReader to support diferent HDF5 files, fix Read/Write Madrigal files
r1254 self.__doc__ = BaseClass.__doc__
Modificación a kmamisr para ejecutarse en la versión 3, creación de scripts con terminación v3 para difereciarlos, se comentó la linea #720 de JroIO_param.py debido a que reiniciaba la lista de archivos, ocasionando la reescritura del archivo hdf5. Alguna otra modificación aparente es producto de algunas variaciones en espacios al usar la función print()
r1279
George Yong
Fix bug in CrossSpectraPlot
r1201 if 'plot' in self.name.lower() and not self.name.endswith('_'):
self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
Modificación a kmamisr para ejecutarse en la versión 3, creación de scripts con terminación v3 para difereciarlos, se comentó la linea #720 de JroIO_param.py debido a que reiniciaba la lista de archivos, ocasionando la reescritura del archivo hdf5. Alguna otra modificación aparente es producto de algunas variaciones en espacios al usar la función print()
r1279
self.start_time = time.time()
Errors handling and gracefully terminate main process
r1241 self.id = args[0]
self.inputId = args[1]
self.project_id = args[2]
self.err_queue = args[3]
Block publishing when input queues are full to avoid data loss
r1256 self.lock = args[4]
self.typeProc = args[5]
Errors handling and gracefully terminate main process
r1241 self.err_queue.put('#_start_#')
Fix excessive memory RAM consumption
r1268 if self.inputId is not None:
self.queue = InputQueue(self.project_id, self.inputId, self.lock)
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
Juan C. Espinoza
Review MP changes, three types of operations: self, other and external
r1177 def subscribe(self):
'''
Use of delays instead of input queue to keep dataouts and avoid loose of them
r1245 Start the zmq socket receiver and subcribe to input ID.
Juan C. Espinoza
Review MP changes, three types of operations: self, other and external
r1177 '''
Add input queue again :(
r1250
self.queue.start()
Modificación a kmamisr para ejecutarse en la versión 3, creación de scripts con terminación v3 para difereciarlos, se comentó la linea #720 de JroIO_param.py debido a que reiniciaba la lista de archivos, ocasionando la reescritura del archivo hdf5. Alguna otra modificación aparente es producto de algunas variaciones en espacios al usar la función print()
r1279
Juan C. Espinoza
Review MP changes, three types of operations: self, other and external
r1177 def listen(self):
'''
Juan C. Espinoza
Add input queues for processing units and external operations
r1235 This function waits for objects
Juan C. Espinoza
Review MP changes, three types of operations: self, other and external
r1177 '''
Modificación a kmamisr para ejecutarse en la versión 3, creación de scripts con terminación v3 para difereciarlos, se comentó la linea #720 de JroIO_param.py debido a que reiniciaba la lista de archivos, ocasionando la reescritura del archivo hdf5. Alguna otra modificación aparente es producto de algunas variaciones en espacios al usar la función print()
r1279
return self.queue.get()
George Yong
Multiprocessing for Spectra (all operation) working
r1171
Juan C. Espinoza
Review MP changes, three types of operations: self, other and external
r1177 def set_publisher(self):
'''
Modificación a kmamisr para ejecutarse en la versión 3, creación de scripts con terminación v3 para difereciarlos, se comentó la linea #720 de JroIO_param.py debido a que reiniciaba la lista de archivos, ocasionando la reescritura del archivo hdf5. Alguna otra modificación aparente es producto de algunas variaciones en espacios al usar la función print()
r1279 This function create a zmq socket for publishing objects.
Juan C. Espinoza
Review MP changes, three types of operations: self, other and external
r1177 '''
George Yong
Multiprocessing for Spectra (all operation) working
r1171
Use of delays instead of input queue to keep dataouts and avoid loose of them
r1245 time.sleep(0.5)
Modificación a kmamisr para ejecutarse en la versión 3, creación de scripts con terminación v3 para difereciarlos, se comentó la linea #720 de JroIO_param.py debido a que reiniciaba la lista de archivos, ocasionando la reescritura del archivo hdf5. Alguna otra modificación aparente es producto de algunas variaciones en espacios al usar la función print()
r1279
Juan C. Espinoza
Review MP changes, three types of operations: self, other and external
r1177 c = zmq.Context()
Juan C. Espinoza
New plotting architecture with buffering/throttle capabilities
r1187 self.sender = c.socket(zmq.PUB)
self.sender.connect(
'ipc:///tmp/schain/{}_sub'.format(self.project_id))
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
Juan C. Espinoza
New plotting architecture with buffering/throttle capabilities
r1187 def publish(self, data, id):
Juan C. Espinoza
Review MP changes, three types of operations: self, other and external
r1177 '''
Use of delays instead of input queue to keep dataouts and avoid loose of them
r1245 This function publish an object, to an specific topic.
Block publishing when input queues are full to avoid data loss
r1256 It blocks publishing when receiver queue is full to avoid data loss
Modificación a kmamisr para ejecutarse en la versión 3, creación de scripts con terminación v3 para difereciarlos, se comentó la linea #720 de JroIO_param.py debido a que reiniciaba la lista de archivos, ocasionando la reescritura del archivo hdf5. Alguna otra modificación aparente es producto de algunas variaciones en espacios al usar la función print()
r1279 '''
Alexander Valdez
Se añade el metodo fix_publish y se editan los metodos listen y publish, en el ultimo se colocaun pequeño retardo aplicado exclusivamente durante la comunacion desde la unidad de lectura, las demas operaciones se realizan igual.
r1220 if self.inputId is None:
Block publishing when input queues are full to avoid data loss
r1256 self.lock.wait()
Juan C. Espinoza
Review MP changes, three types of operations: self, other and external
r1177 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
George Yong
Multiprocessing for Spectra (all operation) working
r1171 def runReader(self):
Juan C. Espinoza
Review MP changes, three types of operations: self, other and external
r1177 '''
Run fuction for read units
'''
George Yong
Multiprocessing for Spectra (all operation) working
r1171 while True:
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
Errors handling and gracefully terminate main process
r1241 try:
BaseClass.run(self, **self.kwargs)
except:
Modificación a kmamisr para ejecutarse en la versión 3, creación de scripts con terminación v3 para difereciarlos, se comentó la linea #720 de JroIO_param.py debido a que reiniciaba la lista de archivos, ocasionando la reescritura del archivo hdf5. Alguna otra modificación aparente es producto de algunas variaciones en espacios al usar la función print()
r1279 err = traceback.format_exc()
Errors handling and gracefully terminate main process
r1241 if 'No more files' in err:
log.warning('No more files to read', self.name)
else:
self.err_queue.put('{}|{}'.format(self.name, err))
Modificación a kmamisr para ejecutarse en la versión 3, creación de scripts con terminación v3 para difereciarlos, se comentó la linea #720 de JroIO_param.py debido a que reiniciaba la lista de archivos, ocasionando la reescritura del archivo hdf5. Alguna otra modificación aparente es producto de algunas variaciones en espacios al usar la función print()
r1279 self.dataOut.error = True
Juan C. Espinoza
New plotting architecture with buffering/throttle capabilities
r1187 for op, optype, opId, kwargs in self.operations:
Juan C. Espinoza
Fix MPDecorator (check flagNoData for all type of operations)
r1198 if optype == 'self' and not self.dataOut.flagNoData:
Juan C. Espinoza
Review MP changes, three types of operations: self, other and external
r1177 op(**kwargs)
Juan C. Espinoza
Fix MPDecorator (check flagNoData for all type of operations)
r1198 elif optype == 'other' and not self.dataOut.flagNoData:
Juan C. Espinoza
Review MP changes, three types of operations: self, other and external
r1177 self.dataOut = op.run(self.dataOut, **self.kwargs)
Juan C. Espinoza
New plotting architecture with buffering/throttle capabilities
r1187 elif optype == 'external':
Juan C. Espinoza
Review MP changes, three types of operations: self, other and external
r1177 self.publish(self.dataOut, opId)
Juan C. Espinoza
Review decorator logic for ending process
r1193 if self.dataOut.flagNoData and not self.dataOut.error:
George Yong
Multiprocessing for Spectra (all operation) working
r1171 continue
Juan C. Espinoza
New plotting architecture with buffering/throttle capabilities
r1187
self.publish(self.dataOut, self.id)
Modificación a kmamisr para ejecutarse en la versión 3, creación de scripts con terminación v3 para difereciarlos, se comentó la linea #720 de JroIO_param.py debido a que reiniciaba la lista de archivos, ocasionando la reescritura del archivo hdf5. Alguna otra modificación aparente es producto de algunas variaciones en espacios al usar la función print()
r1279 if self.dataOut.error:
Juan C. Espinoza
New plotting architecture with buffering/throttle capabilities
r1187 break
Errors handling and gracefully terminate main process
r1241 time.sleep(0.5)
Juan C. Espinoza
New plotting architecture with buffering/throttle capabilities
r1187
George Yong
Multiprocessing for Spectra (all operation) working
r1171 def runProc(self):
Juan C. Espinoza
Review MP changes, three types of operations: self, other and external
r1177 '''
Run function for proccessing units
'''
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
George Yong
Multiprocessing for Spectra (all operation) working
r1171 while True:
Modificación a kmamisr para ejecutarse en la versión 3, creación de scripts con terminación v3 para difereciarlos, se comentó la linea #720 de JroIO_param.py debido a que reiniciaba la lista de archivos, ocasionando la reescritura del archivo hdf5. Alguna otra modificación aparente es producto de algunas variaciones en espacios al usar la función print()
r1279 self.dataIn = self.listen()
George Yong
Multiprocessing for Spectra (all operation) working
r1171
Juan C. Espinoza
New plotting architecture with buffering/throttle capabilities
r1187 if self.dataIn.flagNoData and self.dataIn.error is None:
Juan C. Espinoza
Review MP changes, three types of operations: self, other and external
r1177 continue
Errors handling and gracefully terminate main process
r1241 elif not self.dataIn.error:
try:
BaseClass.run(self, **self.kwargs)
except:
self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc()))
self.dataOut.error = True
elif self.dataIn.error:
Juan C. Espinoza
Review decorator logic for ending process
r1193 self.dataOut.error = self.dataIn.error
George Yong
Writing Unit for Madrigal decorated (just for python 2x)
r1206 self.dataOut.flagNoData = True
Modificación a kmamisr para ejecutarse en la versión 3, creación de scripts con terminación v3 para difereciarlos, se comentó la linea #720 de JroIO_param.py debido a que reiniciaba la lista de archivos, ocasionando la reescritura del archivo hdf5. Alguna otra modificación aparente es producto de algunas variaciones en espacios al usar la función print()
r1279
Juan C. Espinoza
Review MP changes, three types of operations: self, other and external
r1177 for op, optype, opId, kwargs in self.operations:
Juan C. Espinoza
Fix MPDecorator (check flagNoData for all type of operations)
r1198 if optype == 'self' and not self.dataOut.flagNoData:
Juan C. Espinoza
Review MP changes, three types of operations: self, other and external
r1177 op(**kwargs)
Juan C. Espinoza
Fix MPDecorator (check flagNoData for all type of operations)
r1198 elif optype == 'other' and not self.dataOut.flagNoData:
Juan C. Espinoza
Review MP changes, three types of operations: self, other and external
r1177 self.dataOut = op.run(self.dataOut, **kwargs)
Modificación a kmamisr para ejecutarse en la versión 3, creación de scripts con terminación v3 para difereciarlos, se comentó la linea #720 de JroIO_param.py debido a que reiniciaba la lista de archivos, ocasionando la reescritura del archivo hdf5. Alguna otra modificación aparente es producto de algunas variaciones en espacios al usar la función print()
r1279 elif optype == 'external' and not self.dataOut.flagNoData:
George Yong
Writing Unit for Madrigal decorated (just for python 2x)
r1206 self.publish(self.dataOut, opId)
Modificación a kmamisr para ejecutarse en la versión 3, creación de scripts con terminación v3 para difereciarlos, se comentó la linea #720 de JroIO_param.py debido a que reiniciaba la lista de archivos, ocasionando la reescritura del archivo hdf5. Alguna otra modificación aparente es producto de algunas variaciones en espacios al usar la función print()
r1279
Errors handling and gracefully terminate main process
r1241 self.publish(self.dataOut, self.id)
for op, optype, opId, kwargs in self.operations:
Modificación a kmamisr para ejecutarse en la versión 3, creación de scripts con terminación v3 para difereciarlos, se comentó la linea #720 de JroIO_param.py debido a que reiniciaba la lista de archivos, ocasionando la reescritura del archivo hdf5. Alguna otra modificación aparente es producto de algunas variaciones en espacios al usar la función print()
r1279 if optype == 'external' and self.dataOut.error:
Errors handling and gracefully terminate main process
r1241 self.publish(self.dataOut, opId)
Modificación a kmamisr para ejecutarse en la versión 3, creación de scripts con terminación v3 para difereciarlos, se comentó la linea #720 de JroIO_param.py debido a que reiniciaba la lista de archivos, ocasionando la reescritura del archivo hdf5. Alguna otra modificación aparente es producto de algunas variaciones en espacios al usar la función print()
r1279
Errors handling and gracefully terminate main process
r1241 if self.dataOut.error:
Juan C. Espinoza
New plotting architecture with buffering/throttle capabilities
r1187 break
Modificación a kmamisr para ejecutarse en la versión 3, creación de scripts con terminación v3 para difereciarlos, se comentó la linea #720 de JroIO_param.py debido a que reiniciaba la lista de archivos, ocasionando la reescritura del archivo hdf5. Alguna otra modificación aparente es producto de algunas variaciones en espacios al usar la función print()
r1279
Errors handling and gracefully terminate main process
r1241 time.sleep(0.5)
George Yong
Multiprocessing for Spectra (all operation) working
r1171
def runOp(self):
Juan C. Espinoza
Review MP changes, three types of operations: self, other and external
r1177 '''
Juan C. Espinoza
New plotting architecture with buffering/throttle capabilities
r1187 Run function for external operations (this operations just receive data
ex: plots, writers, publishers)
Juan C. Espinoza
Review MP changes, three types of operations: self, other and external
r1177 '''
Modificación a kmamisr para ejecutarse en la versión 3, creación de scripts con terminación v3 para difereciarlos, se comentó la linea #720 de JroIO_param.py debido a que reiniciaba la lista de archivos, ocasionando la reescritura del archivo hdf5. Alguna otra modificación aparente es producto de algunas variaciones en espacios al usar la función print()
r1279
George Yong
Multiprocessing for Spectra (all operation) working
r1171 while True:
Juan C. Espinoza
Review MP changes, three types of operations: self, other and external
r1177 dataOut = self.listen()
George Yong
Multiprocessing for writing Units(Spectral, Voltage and Parameters)
r1179
Errors handling and gracefully terminate main process
r1241 if not dataOut.error:
Fix excessive memory RAM consumption
r1268 try:
BaseClass.run(self, dataOut, **self.kwargs)
except:
self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc()))
dataOut.error = True
Errors handling and gracefully terminate main process
r1241 else:
Modificación a kmamisr para ejecutarse en la versión 3, creación de scripts con terminación v3 para difereciarlos, se comentó la linea #720 de JroIO_param.py debido a que reiniciaba la lista de archivos, ocasionando la reescritura del archivo hdf5. Alguna otra modificación aparente es producto de algunas variaciones en espacios al usar la función print()
r1279 break
Juan C. Espinoza
New plotting architecture with buffering/throttle capabilities
r1187
George Yong
Multiprocessing for Spectra (all operation) working
r1171 def run(self):
Juan C. Espinoza
Review MP changes, three types of operations: self, other and external
r1177 if self.typeProc is "ProcUnit":
Juan C. Espinoza
New plotting architecture with buffering/throttle capabilities
r1187
Juan C. Espinoza
Review MP changes, three types of operations: self, other and external
r1177 if self.inputId is not None:
self.subscribe()
Modificación a kmamisr para ejecutarse en la versión 3, creación de scripts con terminación v3 para difereciarlos, se comentó la linea #720 de JroIO_param.py debido a que reiniciaba la lista de archivos, ocasionando la reescritura del archivo hdf5. Alguna otra modificación aparente es producto de algunas variaciones en espacios al usar la función print()
r1279
Juan C. Espinoza
Review MP changes, three types of operations: self, other and external
r1177 self.set_publisher()
George Yong
Multiprocessing for Spectra (all operation) working
r1171
Juan C. Espinoza
Review MP changes, three types of operations: self, other and external
r1177 if 'Reader' not in BaseClass.__name__:
self.runProc()
George Yong
Multiprocessing for Spectra (all operation) working
r1171 else:
self.runReader()
elif self.typeProc is "Operation":
Juan C. Espinoza
New plotting architecture with buffering/throttle capabilities
r1187
Juan C. Espinoza
Review MP changes, three types of operations: self, other and external
r1177 self.subscribe()
George Yong
Multiprocessing for Spectra (all operation) working
r1171 self.runOp()
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
George Yong
Multiprocessing for Spectra (all operation) working
r1171 else:
raise ValueError("Unknown type")
Juan C. Valdez
Version 2.2.5 Fixed several bugs, add jro colormap for spectra/rti, add ParamWriter, TODO: Fix decimation currently disabled
r860
Juan C. Espinoza
Review MP changes, three types of operations: self, other and external
r1177 self.close()
def close(self):
Juan C. Espinoza
New plotting architecture with buffering/throttle capabilities
r1187
BaseClass.close(self)
Errors handling and gracefully terminate main process
r1241 self.err_queue.put('#_end_#')
Juan C. Espinoza
New plotting architecture with buffering/throttle capabilities
r1187
Juan C. Espinoza
Review MP changes, three types of operations: self, other and external
r1177 if self.sender:
self.sender.close()
Juan C. Espinoza
New plotting architecture with buffering/throttle capabilities
r1187
Juan C. Espinoza
Review MP changes, three types of operations: self, other and external
r1177 if self.receiver:
self.receiver.close()
Juan C. Espinoza
New plotting architecture with buffering/throttle capabilities
r1187 log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)
return MPClass