jroplotter.py
239 lines
| 5.7 KiB
| text/x-python
|
PythonLexer
|
r691 | ''' | |
Created on Jul 9, 2014 | |||
@author: roj-idl71 | |||
''' | |||
|
r732 | import os, sys | |
|
r691 | import datetime | |
import numpy | |||
|
r732 | import traceback | |
|
r691 | ||
|
r698 | from time import sleep | |
|
r707 | from threading import Lock | |
# from threading import Thread | |||
|
r691 | ||
|
r732 | import schainpy | |
import schainpy.admin | |||
|
r698 | from schainpy.model.proc.jroproc_base import Operation | |
from schainpy.model.serializer.data import obj2Dict, dict2Obj | |||
|
r1167 | from .jroplot_correlation import * | |
from .jroplot_heispectra import * | |||
from .jroplot_parameters import * | |||
from .jroplot_spectra import * | |||
from .jroplot_voltage import * | |||
|
r701 | ||
|
r698 | ||
class Plotter(Operation):
    """Operation that forwards incoming data to a plotter queue.

    Instead of drawing anything itself, ``run`` wraps the data together
    with the plot name, an identifier and the plotting keyword arguments
    in a dictionary and puts that dictionary on the queue given at
    construction time.
    """

    isConfig = None
    name = None
    __queue = None

    def __init__(self, plotter_name, plotter_queue=None, **kwargs):
        """Store the plot name and the queue used to hand data over.

        Args:
            plotter_name: value sent as ``'name'`` with every queued item.
            plotter_queue: object exposing ``put()``; defaults to None.
            **kwargs: forwarded unchanged to ``Operation.__init__``.
        """
        Operation.__init__(self, **kwargs)

        self.name = plotter_name
        self.isConfig = False
        self.__queue = plotter_queue

    def getSubplots(self):
        """Return ``(nrow, ncol)``: ``self.nplots`` rows in a single column.

        ``self.nplots`` is not assigned anywhere in this class.
        """
        return self.nplots, 1

    def setup(self, **kwargs):
        """Print an initialization message; ``kwargs`` are not used."""
        print("Initializing ...")

    def run(self, dataOut, id=None, **kwargs):
        """Queue ``dataOut`` for plotting.

        Args:
            dataOut: data object to plot; it is queued as-is (no
                serialization is applied here).
            id: identifier sent under the ``'id'`` key.
            **kwargs: plotting options sent under the ``'kwargs'`` key.
        """
        payload = {
            'id': id,
            'name': self.name,
            'kwargs': kwargs,
            'data': dataOut,
        }

        self.__queue.put(payload)
|
r707 | # class PlotManager(Thread): | |
class PlotManager():
    """Consume the plotter queue and dispatch each item to a plot object.

    Every queue item is a dictionary with the keys ``'id'``, ``'name'``,
    ``'kwargs'`` and ``'data'``. The first time an ``'id'`` is seen, the
    class named by ``'name'`` is instantiated with ``'kwargs'`` and cached
    in ``plotInstanceDict``; afterwards its ``run`` method is called with
    the data for every item carrying that id.

    Once a plot raises, the manager enters an error state: the error is
    reported and every later call to ``run`` only discards queued items.
    """

    __err = False
    __stop = False
    __realtime = False

    controllerThreadObj = None

    # Plot class names; not referenced by any method of this class.
    plotterList = ['Scope',
                   'SpectraPlot', 'RTIPlot',
                   'SpectraCutPlot',
                   'CrossSpectraPlot', 'CoherenceMap',
                   'PowerProfilePlot', 'Noise', 'BeaconPhase',
                   'CorrelationPlot',
                   'SpectraHeisScope', 'RTIfromSpectraHeis']

    def __init__(self, plotter_queue):
        """Create a manager reading from ``plotter_queue``.

        Args:
            plotter_queue: queue object providing ``empty()``, ``qsize()``,
                ``get()`` and ``task_done()``.
        """
        self.__queue = plotter_queue
        self.__lock = Lock()

        # plot id -> plot instance
        self.plotInstanceDict = {}

        self.__err = False
        self.__stop = False
        self.__realtime = False

    def __handleError(self, name="", send_email=False):
        """Report the exception currently being handled.

        The traceback is printed and written to stderr. When
        ``send_email`` is true an alert is also sent through
        ``schainpy.admin.SchainNotify``.

        Args:
            name: name of the plot that failed; used in the messages.
            send_email: whether to send the alert.
        """
        err = traceback.format_exception(*sys.exc_info())

        print("***** Error occurred in PlotManager *****")
        print("***** [%s]: %s" %(name, err[-1]))

        message = "\nError ocurred in %s:\n" %name
        message += "".join(err)

        sys.stderr.write(message)

        if not send_email:
            return

        import socket

        subject = "SChain v%s: Error running %s\n" %(schainpy.__version__, name)

        subtitle = "%s:\n" %(name)
        subtitle += "Hostname: %s\n" %socket.gethostbyname(socket.gethostname())
        subtitle += "Working directory: %s\n" %os.path.abspath("./")
        subtitle += "Time: %s\n" %str(datetime.datetime.now())

        adminObj = schainpy.admin.SchainNotify()
        adminObj.sendAlert(message=message,
                           subject=subject,
                           subtitle=subtitle)

    def run(self):
        """Process a batch of queued items.

        Returns immediately when the queue is empty. In the error state a
        single item is discarded per call. Otherwise roughly one third of
        the queued items (at least one) are dispatched to their plot
        objects; the batch stops early if a plot raises.
        """
        if self.__queue.empty():
            return

        if self.__err:
            # A plot already failed: just drain the queue, one item per call.
            self.__queue.get()
            self.__queue.task_done()
            return

        # The context manager guarantees the lock is released even when
        # plot creation or the error handler itself raises.
        with self.__lock:
            n = int(self.__queue.qsize()/3 + 1)

            for _ in range(n):
                if self.__queue.empty():
                    break

                serial_data = self.__queue.get()
                self.__queue.task_done()

                plot_id = serial_data['id']
                plot_name = serial_data['name']
                kwargs = serial_data['kwargs']
                dataPlot = serial_data['data']

                if plot_id not in self.plotInstanceDict:
                    # NOTE(review): eval() resolves the class from a name taken
                    # from the queue; it must never receive untrusted input.
                    className = eval(plot_name)
                    self.plotInstanceDict[plot_id] = className(**kwargs)

                plotter = self.plotInstanceDict[plot_id]

                try:
                    plotter.run(dataPlot, plot_id, **kwargs)
                except Exception:
                    # KeyboardInterrupt/SystemExit are deliberately not
                    # caught so they are not reported as plot failures.
                    self.__err = True
                    self.__handleError(plot_name, send_email=True)
                    break

    def isEmpty(self):
        """Return True when the plotter queue has no pending items."""
        return self.__queue.empty()

    def stop(self):
        """Set the internal stop flag."""
        with self.__lock:
            self.__stop = True

    def close(self):
        """Call ``close()`` on every plot instance created so far."""
        with self.__lock:
            for plotter in list(self.plotInstanceDict.values()):
                plotter.close()

    def setController(self, controllerThreadObj):
        """Register the controller whose ``isRunning()`` drives ``join``."""
        self.controllerThreadObj = controllerThreadObj

    def start(self):
        """Run the plotting loop until the controller stops.

        Raises:
            RuntimeError: if no controller was set or it is not running.
        """
        controller = self.controllerThreadObj

        if controller is None or not controller.isRunning():
            raise RuntimeError("controllerThreadObj has not been initialized. Use controllerThreadObj.start() before call this method")

        self.join()

    def join(self):
        """Plot while the controller runs, then flush the queue and close."""
        # Execute plotter while controller is running.
        # NOTE(review): run() returns at once on an empty queue, so this
        # loop polls without sleeping.
        while self.controllerThreadObj.isRunning():
            self.run()

        self.controllerThreadObj.stop()

        # Wait until plotter queue is empty
        while not self.isEmpty():
            self.run()

        self.close()

    def isErrorDetected(self):
        """Return True if a plot has raised during ``run``."""
        with self.__lock:
            return self.__err