manual header update

File last commit:

r1774:e7bb496918b2
r1788:c7146b87b3fa
jroproc_base.py
252 lines | 6.9 KiB | text/x-python | PythonLexer
'''
Base classes to create processing units and operations. The MPDecorator
must be used in plotting and writing operations to allow them to run as an
external process.
'''

import os
import inspect
import zmq
import time
import pickle
import traceback
from threading import Thread
from multiprocessing import Process, Queue

from schainpy.utils import log

# Maximum number of items held by each external operation queue;
# can be overridden with the QUEUE_MAX_SIZE environment variable.
QUEUE_SIZE = int(os.environ.get('QUEUE_MAX_SIZE', '100'))


class ProcessingUnit(object):
    '''
    Base class to create Signal Chain Units
    '''

    proc_type = 'processing'
    bypass = False

    def __init__(self):

        self.dataIn = None
        self.dataOut = None
        self.isConfig = False
        self.operations = []
        self.name = 'Test'
        self.inputs = []

    def setInput(self, unit):

        attr = 'dataIn'
        # The first unit feeds dataIn; any further units feed dataIn1, dataIn2, ...
        for i, u in enumerate(unit):
            if i == 0:
                self.dataIn = u.dataOut
                self.inputs.append('dataIn')
            else:
                setattr(self, 'dataIn{}'.format(i), u.dataOut)
                self.inputs.append('dataIn{}'.format(i))

    def getAllowedArgs(self):
        if hasattr(self, '__attrs__'):
            return self.__attrs__
        else:
            # inspect.getargspec was removed in Python 3.11; getfullargspec
            # returns the same positional argument names in .args.
            return inspect.getfullargspec(self.run).args

    def addOperation(self, conf, operation):
        '''
        Register an operation together with its type and keyword arguments.
        '''

        self.operations.append((operation, conf.type, conf.getKwargs()))

    def getOperationObj(self, objId):

        # self.operations is a list of (operation, type, kwargs) tuples, so it
        # has no keys(); look the operation up by its id attribute instead.
        for op, optype, opkwargs in self.operations:
            if getattr(op, 'id', None) == objId:
                return op

        return None

    def call(self, **kwargs):
        '''
        Run the unit and then its operations. Returns 'Error' if an error was
        flagged, otherwise whether the next unit should run.
        '''

        try:
            if self.dataIn is not None and self.dataIn.flagNoData and not self.dataIn.error:
                # No input data yet: nothing to process in this cycle.
                if self.dataIn.runNextUnit:
                    return not self.dataIn.isReady()
                else:
                    return self.dataIn.isReady()
            elif self.dataIn is None or not self.dataIn.error:
                if 'Reader' in self.name and self.bypass:
                    print('Skipping...reader')
                    return self.dataOut.isReady()
                self.run(**kwargs)
            elif self.dataIn.error:
                # Propagate the upstream error to the output.
                self.dataOut.error = self.dataIn.error
                self.dataOut.flagNoData = True
        except:
            err = traceback.format_exc()
            if 'SchainWarning' in err:
                log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), self.name)
            elif 'SchainError' in err:
                log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), self.name)
            else:
                log.error(err, self.name)
            self.dataOut.error = True

        for op, optype, opkwargs in self.operations:
            aux = self.dataOut.copy()
            if optype == 'other' and not self.dataOut.flagNoData:
                # In-process operation: it returns the updated data object.
                self.dataOut = op.run(self.dataOut, **opkwargs)
            elif optype == 'external' and not self.dataOut.flagNoData:
                # External operation: hand a copy to its process queue.
                op.queue.put(aux)
            elif optype == 'external' and self.dataOut.error:
                # Forward the error so the external process can stop.
                op.queue.put(aux)

        try:
            if self.dataOut.runNextUnit:
                runNextUnit = self.dataOut.runNextUnit
            else:
                runNextUnit = self.dataOut.isReady()
        except:
            runNextUnit = self.dataOut.isReady()

        return 'Error' if self.dataOut.error else runNextUnit

    def setup(self):

        raise NotImplementedError

    def run(self):

        raise NotImplementedError

    def close(self):

        return
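
A reduced sketch of the dispatch loop in `ProcessingUnit.call()` may help when reading the class above. It is not the schainpy implementation: `FakeData`, `Scale` and `MiniUnit` are hypothetical stand-ins, only in-process (`'other'`) operations are handled, and `isReady()` is assumed to mean "the no-data flag is cleared".

```python
import copy


class FakeData:
    """Hypothetical stand-in for a JROData-like object (not the schainpy class)."""

    def __init__(self):
        self.flagNoData = True
        self.error = False
        self.data = None

    def isReady(self):
        # Assumed semantics: data is ready once the no-data flag is cleared.
        return not self.flagNoData

    def copy(self):
        return copy.deepcopy(self)


class Scale:
    """Hypothetical 'other' operation: runs in-process and returns the data object."""

    def run(self, dataOut, factor=1):
        dataOut.data = [x * factor for x in dataOut.data]
        return dataOut


class MiniUnit:
    """Reduced version of the run-then-dispatch flow of ProcessingUnit.call()."""

    def __init__(self):
        self.dataOut = FakeData()
        self.operations = []  # list of (operation, type, kwargs) tuples

    def addOperation(self, operation, optype, **kwargs):
        self.operations.append((operation, optype, kwargs))

    def run(self, values):
        self.dataOut.data = list(values)
        self.dataOut.flagNoData = False

    def call(self, **kwargs):
        self.run(**kwargs)
        for op, optype, opkwargs in self.operations:
            if optype == 'other' and not self.dataOut.flagNoData:
                self.dataOut = op.run(self.dataOut, **opkwargs)
        return 'Error' if self.dataOut.error else self.dataOut.isReady()


unit = MiniUnit()
unit.addOperation(Scale(), 'other', factor=10)
status = unit.call(values=[1, 2, 3])
result = unit.dataOut.data
```

The return value mirrors the original: the string `'Error'` when the output is flagged as failed, otherwise a boolean telling the caller whether downstream units have data to work on.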


class Operation(object):
    '''
    Base class to create Signal Chain operations
    '''

    proc_type = 'operation'

    def __init__(self):

        self.id = None
        self.isConfig = False

        if not hasattr(self, 'name'):
            self.name = self.__class__.__name__

    def getAllowedArgs(self):
        if hasattr(self, '__attrs__'):
            return self.__attrs__
        else:
            # inspect.getargspec was removed in Python 3.11; getfullargspec
            # returns the same positional argument names in .args.
            return inspect.getfullargspec(self.run).args

    def setup(self):

        self.isConfig = True

        raise NotImplementedError

    def run(self, dataIn, **kwargs):
        """
        Performs the necessary operations on dataIn.data and updates the
        attributes of the dataIn object.

        Input:

            dataIn : object of type JROData

        Return:

            None

        Affected:
            __buffer : data reception buffer.

        """
        if not self.isConfig:
            self.setup(**kwargs)

        raise NotImplementedError

    def close(self):

        return
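
One plausible use of `getAllowedArgs()` is to catch misspelled keyword arguments before an operation runs. The sketch below shows that idea under stated assumptions: `Decimate`, `allowed_args` and `unknown_kwargs` are hypothetical names for illustration and are not part of schainpy.

```python
import inspect


class Decimate:
    """Hypothetical operation used only for illustration."""

    def run(self, dataIn, factor=2, offset=0):
        return dataIn[offset::factor]


def allowed_args(obj):
    # Same idea as getAllowedArgs(): prefer an explicit __attrs__ list,
    # otherwise read the argument names from the signature of run().
    if hasattr(obj, '__attrs__'):
        return obj.__attrs__
    return inspect.getfullargspec(obj.run).args


def unknown_kwargs(obj, kwargs):
    # Names supplied by the user that run() does not accept.
    allowed = set(allowed_args(obj))
    return sorted(k for k in kwargs if k not in allowed)


op = Decimate()
args = allowed_args(op)
bad = unknown_kwargs(op, {'factor': 4, 'ofset': 1})  # 'ofset' is a typo
```

Note that `getfullargspec` applied to a bound method still lists `self` among the argument names, so a caller comparing against user input may want to ignore it.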


def MPDecorator(BaseClass):
    """
    Multiprocessing class decorator

    This function adds multiprocessing features to a BaseClass.
    """

    class MPClass(BaseClass, Process):

        def __init__(self, *args, **kwargs):
            super(MPClass, self).__init__()
            Process.__init__(self)

            self.args = args
            self.kwargs = kwargs
            self.t = time.time()
            self.op_type = 'external'
            self.name = BaseClass.__name__
            self.__doc__ = BaseClass.__doc__

            if 'plot' in self.name.lower() and not self.name.endswith('_'):
                self.name = '{}{}'.format(self.CODE.upper(), 'Plot')

            self.start_time = time.time()
            self.err_queue = args[3]
            self.queue = Queue(maxsize=QUEUE_SIZE)
            self.myrun = BaseClass.run

        def run(self):

            while True:

                dataOut = self.queue.get()

                if not dataOut.error:
                    try:
                        BaseClass.run(self, dataOut, **self.kwargs)
                    except:
                        err = traceback.format_exc()
                        log.error(err, self.name)
                else:
                    break

            self.close()

        def close(self):

            BaseClass.close(self)
            log.success('Done...(Time:{:4.2f} secs)'.format(time.time() - self.start_time), self.name)

    return MPClass
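
The consumer loop in `MPClass.run()` can be sketched in isolation. For portability this example swaps the separate process for a thread and `multiprocessing.Queue` for `queue.Queue`; the loop shape is the same, but it is only an approximation of the original. `Packet`, `Collector` and `consume` are hypothetical names introduced for illustration.

```python
import queue
import threading


class Packet:
    """Hypothetical data object; error=True acts as the stop signal."""

    def __init__(self, value=None, error=False):
        self.value = value
        self.error = error


class Collector:
    """Hypothetical operation that records what it receives."""

    def __init__(self):
        self.seen = []
        self.closed = False

    def run(self, dataOut):
        self.seen.append(dataOut.value)

    def close(self):
        self.closed = True


def consume(op, q):
    # Same loop shape as MPClass.run(): block on the queue, process each
    # item, and stop when an item flagged as error arrives.
    while True:
        item = q.get()
        if item.error:
            break
        op.run(item)
    op.close()


q = queue.Queue(maxsize=100)
op = Collector()
worker = threading.Thread(target=consume, args=(op, q))
worker.start()
for v in (1, 2, 3):
    q.put(Packet(v))
q.put(Packet(error=True))  # tells the worker to finish
worker.join()
```

Using the error flag as the termination signal means the producer needs no separate shutdown channel: the same object that carries data also tells the consumer when to call `close()`.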