##// END OF EJS Templates
Allow to send data to new realtime app
Juan C. Espinoza -
r1212:bae4ef96bf63
parent child
Show More
@@ -1,824 +1,783
1
1
2 import os
2 import os
3 import sys
3 import sys
4 import zmq
4 import zmq
5 import time
5 import time
6 import numpy
6 import datetime
7 import datetime
7 from functools import wraps
8 from functools import wraps
8 import numpy
9 from threading import Thread
9 import matplotlib
10 import matplotlib
10
11
11 if 'BACKEND' in os.environ:
12 if 'BACKEND' in os.environ:
12 matplotlib.use(os.environ['BACKEND'])
13 matplotlib.use(os.environ['BACKEND'])
13 elif 'linux' in sys.platform:
14 elif 'linux' in sys.platform:
14 matplotlib.use("TkAgg")
15 matplotlib.use("TkAgg")
15 elif 'darwin' in sys.platform:
16 elif 'darwin' in sys.platform:
16 matplotlib.use('TkAgg')
17 matplotlib.use('WxAgg')
17 else:
18 else:
18 from schainpy.utils import log
19 from schainpy.utils import log
19 log.warning('Using default Backend="Agg"', 'INFO')
20 log.warning('Using default Backend="Agg"', 'INFO')
20 matplotlib.use('Agg')
21 matplotlib.use('Agg')
21
22
22 import matplotlib.pyplot as plt
23 import matplotlib.pyplot as plt
23 from matplotlib.patches import Polygon
24 from matplotlib.patches import Polygon
24 from mpl_toolkits.axes_grid1 import make_axes_locatable
25 from mpl_toolkits.axes_grid1 import make_axes_locatable
25 from matplotlib.ticker import FuncFormatter, LinearLocator, MultipleLocator
26 from matplotlib.ticker import FuncFormatter, LinearLocator, MultipleLocator
26
27
27 from schainpy.model.data.jrodata import PlotterData
28 from schainpy.model.data.jrodata import PlotterData
28 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
29 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
29 from schainpy.utils import log
30 from schainpy.utils import log
30
31
31 jet_values = matplotlib.pyplot.get_cmap('jet', 100)(numpy.arange(100))[10:90]
32 jet_values = matplotlib.pyplot.get_cmap('jet', 100)(numpy.arange(100))[10:90]
32 blu_values = matplotlib.pyplot.get_cmap(
33 blu_values = matplotlib.pyplot.get_cmap(
33 'seismic_r', 20)(numpy.arange(20))[10:15]
34 'seismic_r', 20)(numpy.arange(20))[10:15]
34 ncmap = matplotlib.colors.LinearSegmentedColormap.from_list(
35 ncmap = matplotlib.colors.LinearSegmentedColormap.from_list(
35 'jro', numpy.vstack((blu_values, jet_values)))
36 'jro', numpy.vstack((blu_values, jet_values)))
36 matplotlib.pyplot.register_cmap(cmap=ncmap)
37 matplotlib.pyplot.register_cmap(cmap=ncmap)
37
38
38 CMAPS = [plt.get_cmap(s) for s in ('jro', 'jet', 'viridis',
39 CMAPS = [plt.get_cmap(s) for s in ('jro', 'jet', 'viridis',
39 'plasma', 'inferno', 'Greys', 'seismic', 'bwr', 'coolwarm')]
40 'plasma', 'inferno', 'Greys', 'seismic', 'bwr', 'coolwarm')]
40
41
41 EARTH_RADIUS = 6.3710e3
42 EARTH_RADIUS = 6.3710e3
42
43
43
44 def ll2xy(lat1, lon1, lat2, lon2):
44 def ll2xy(lat1, lon1, lat2, lon2):
45
45
46 p = 0.017453292519943295
46 p = 0.017453292519943295
47 a = 0.5 - numpy.cos((lat2 - lat1) * p)/2 + numpy.cos(lat1 * p) * \
47 a = 0.5 - numpy.cos((lat2 - lat1) * p)/2 + numpy.cos(lat1 * p) * \
48 numpy.cos(lat2 * p) * (1 - numpy.cos((lon2 - lon1) * p)) / 2
48 numpy.cos(lat2 * p) * (1 - numpy.cos((lon2 - lon1) * p)) / 2
49 r = 12742 * numpy.arcsin(numpy.sqrt(a))
49 r = 12742 * numpy.arcsin(numpy.sqrt(a))
50 theta = numpy.arctan2(numpy.sin((lon2-lon1)*p)*numpy.cos(lat2*p), numpy.cos(lat1*p)
50 theta = numpy.arctan2(numpy.sin((lon2-lon1)*p)*numpy.cos(lat2*p), numpy.cos(lat1*p)
51 * numpy.sin(lat2*p)-numpy.sin(lat1*p)*numpy.cos(lat2*p)*numpy.cos((lon2-lon1)*p))
51 * numpy.sin(lat2*p)-numpy.sin(lat1*p)*numpy.cos(lat2*p)*numpy.cos((lon2-lon1)*p))
52 theta = -theta + numpy.pi/2
52 theta = -theta + numpy.pi/2
53 return r*numpy.cos(theta), r*numpy.sin(theta)
53 return r*numpy.cos(theta), r*numpy.sin(theta)
54
54
55
55
56 def km2deg(km):
56 def km2deg(km):
57 '''
57 '''
58 Convert distance in km to degrees
58 Convert distance in km to degrees
59 '''
59 '''
60
60
61 return numpy.rad2deg(km/EARTH_RADIUS)
61 return numpy.rad2deg(km/EARTH_RADIUS)
62
62
63
63
64 def figpause(interval):
64 def figpause(interval):
65 backend = plt.rcParams['backend']
65 backend = plt.rcParams['backend']
66 if backend in matplotlib.rcsetup.interactive_bk:
66 if backend in matplotlib.rcsetup.interactive_bk:
67 figManager = matplotlib._pylab_helpers.Gcf.get_active()
67 figManager = matplotlib._pylab_helpers.Gcf.get_active()
68 if figManager is not None:
68 if figManager is not None:
69 canvas = figManager.canvas
69 canvas = figManager.canvas
70 if canvas.figure.stale:
70 if canvas.figure.stale:
71 canvas.draw()
71 canvas.draw()
72 try:
72 try:
73 canvas.start_event_loop(interval)
73 canvas.start_event_loop(interval)
74 except:
74 except:
75 pass
75 pass
76 return
76 return
77
77
78
78
79 def popup(message):
79 def popup(message):
80 '''
80 '''
81 '''
81 '''
82
82
83 fig = plt.figure(figsize=(12, 8), facecolor='r')
83 fig = plt.figure(figsize=(12, 8), facecolor='r')
84 text = '\n'.join([s.strip() for s in message.split(':')])
84 text = '\n'.join([s.strip() for s in message.split(':')])
85 fig.text(0.01, 0.5, text, ha='left', va='center',
85 fig.text(0.01, 0.5, text, ha='left', va='center',
86 size='20', weight='heavy', color='w')
86 size='20', weight='heavy', color='w')
87 fig.show()
87 fig.show()
88 figpause(1000)
88 figpause(1000)
89
89
90
90
91 class Throttle(object):
91 class Throttle(object):
92 '''
92 '''
93 Decorator that prevents a function from being called more than once every
93 Decorator that prevents a function from being called more than once every
94 time period.
94 time period.
95 To create a function that cannot be called more than once a minute, but
95 To create a function that cannot be called more than once a minute, but
96 will sleep until it can be called:
96 will sleep until it can be called:
97 @Throttle(minutes=1)
97 @Throttle(minutes=1)
98 def foo():
98 def foo():
99 pass
99 pass
100
100
101 for i in range(10):
101 for i in range(10):
102 foo()
102 foo()
103 print "This function has run %s times." % i
103 print "This function has run %s times." % i
104 '''
104 '''
105
105
106 def __init__(self, seconds=0, minutes=0, hours=0):
106 def __init__(self, seconds=0, minutes=0, hours=0):
107 self.throttle_period = datetime.timedelta(
107 self.throttle_period = datetime.timedelta(
108 seconds=seconds, minutes=minutes, hours=hours
108 seconds=seconds, minutes=minutes, hours=hours
109 )
109 )
110
110
111 self.time_of_last_call = datetime.datetime.min
111 self.time_of_last_call = datetime.datetime.min
112
112
113 def __call__(self, fn):
113 def __call__(self, fn):
114 @wraps(fn)
114 @wraps(fn)
115 def wrapper(*args, **kwargs):
115 def wrapper(*args, **kwargs):
116 coerce = kwargs.pop('coerce', None)
116 coerce = kwargs.pop('coerce', None)
117 if coerce:
117 if coerce:
118 self.time_of_last_call = datetime.datetime.now()
118 self.time_of_last_call = datetime.datetime.now()
119 return fn(*args, **kwargs)
119 return fn(*args, **kwargs)
120 else:
120 else:
121 now = datetime.datetime.now()
121 now = datetime.datetime.now()
122 time_since_last_call = now - self.time_of_last_call
122 time_since_last_call = now - self.time_of_last_call
123 time_left = self.throttle_period - time_since_last_call
123 time_left = self.throttle_period - time_since_last_call
124
124
125 if time_left > datetime.timedelta(seconds=0):
125 if time_left > datetime.timedelta(seconds=0):
126 return
126 return
127
127
128 self.time_of_last_call = datetime.datetime.now()
128 self.time_of_last_call = datetime.datetime.now()
129 return fn(*args, **kwargs)
129 return fn(*args, **kwargs)
130
130
131 return wrapper
131 return wrapper
132
132
133 def apply_throttle(value):
133 def apply_throttle(value):
134
134
135 @Throttle(seconds=value)
135 @Throttle(seconds=value)
136 def fnThrottled(fn):
136 def fnThrottled(fn):
137 fn()
137 fn()
138
138
139 return fnThrottled
139 return fnThrottled
140
140
141 @MPDecorator
142 class Plotter(ProcessingUnit):
143 '''
144 Proccessing unit to handle plot operations
145 '''
146
147 def __init__(self):
148
149 ProcessingUnit.__init__(self)
150
151 def setup(self, **kwargs):
152
153 self.connections = 0
154 self.web_address = kwargs.get('web_server', False)
155 self.realtime = kwargs.get('realtime', False)
156 self.localtime = kwargs.get('localtime', True)
157 self.buffering = kwargs.get('buffering', True)
158 self.throttle = kwargs.get('throttle', 2)
159 self.exp_code = kwargs.get('exp_code', None)
160 self.set_ready = apply_throttle(self.throttle)
161 self.dates = []
162 self.data = PlotterData(
163 self.plots, self.throttle, self.exp_code, self.buffering)
164 self.isConfig = True
165
166 def ready(self):
167 '''
168 Set dataOut ready
169 '''
170
171 self.data.ready = True
172 self.dataOut.data_plt = self.data
173
174 def run(self, realtime=True, localtime=True, buffering=True,
175 throttle=2, exp_code=None, web_server=None):
176
177 if not self.isConfig:
178 self.setup(realtime=realtime, localtime=localtime,
179 buffering=buffering, throttle=throttle, exp_code=exp_code,
180 web_server=web_server)
181
182 if self.web_address:
183 log.success(
184 'Sending to web: {}'.format(self.web_address),
185 self.name
186 )
187 self.context = zmq.Context()
188 self.sender_web = self.context.socket(zmq.REQ)
189 self.sender_web.connect(self.web_address)
190 self.poll = zmq.Poller()
191 self.poll.register(self.sender_web, zmq.POLLIN)
192 time.sleep(1)
193
194 # t = Thread(target=self.event_monitor, args=(monitor,))
195 # t.start()
196
197 self.dataOut = self.dataIn
198 self.data.ready = False
199
200 if self.dataOut.flagNoData:
201 coerce = True
202 else:
203 coerce = False
204
205 if self.dataOut.type == 'Parameters':
206 tm = self.dataOut.utctimeInit
207 else:
208 tm = self.dataOut.utctime
209 if self.dataOut.useLocalTime:
210 if not self.localtime:
211 tm += time.timezone
212 dt = datetime.datetime.fromtimestamp(tm).date()
213 else:
214 if self.localtime:
215 tm -= time.timezone
216 dt = datetime.datetime.utcfromtimestamp(tm).date()
217 if dt not in self.dates:
218 if self.data:
219 self.ready()
220 self.data.setup()
221 self.dates.append(dt)
222
223 self.data.update(self.dataOut, tm)
224
225 if False: # TODO check when publishers ends
226 self.connections -= 1
227 if self.connections == 0 and dt in self.dates:
228 self.data.ended = True
229 self.ready()
230 time.sleep(1)
231 else:
232 if self.realtime:
233 self.ready()
234 if self.web_address:
235 retries = 5
236 while True:
237 self.sender_web.send(self.data.jsonify())
238 socks = dict(self.poll.poll(5000))
239 if socks.get(self.sender_web) == zmq.POLLIN:
240 reply = self.sender_web.recv_string()
241 if reply == 'ok':
242 log.log("Response from server ok", self.name)
243 break
244 else:
245 log.warning(
246 "Malformed reply from server: {}".format(reply), self.name)
247
248 else:
249 log.warning(
250 "No response from server, retrying...", self.name)
251 self.sender_web.setsockopt(zmq.LINGER, 0)
252 self.sender_web.close()
253 self.poll.unregister(self.sender_web)
254 retries -= 1
255 if retries == 0:
256 log.error(
257 "Server seems to be offline, abandoning", self.name)
258 self.sender_web = self.context.socket(zmq.REQ)
259 self.sender_web.connect(self.web_address)
260 self.poll.register(self.sender_web, zmq.POLLIN)
261 time.sleep(1)
262 break
263 self.sender_web = self.context.socket(zmq.REQ)
264 self.sender_web.connect(self.web_address)
265 self.poll.register(self.sender_web, zmq.POLLIN)
266 time.sleep(1)
267 else:
268 self.set_ready(self.ready, coerce=coerce)
269
270 return
271
272 def close(self):
273 pass
274
275
141
276 @MPDecorator
142 @MPDecorator
277 class Plot(Operation):
143 class Plot(Operation):
278 '''
144 '''
279 Base class for Schain plotting operations
145 Base class for Schain plotting operations
280 '''
146 '''
281
147
282 CODE = 'Figure'
148 CODE = 'Figure'
283 colormap = 'jro'
149 colormap = 'jet'
284 bgcolor = 'white'
150 bgcolor = 'white'
285 __missing = 1E30
151 __missing = 1E30
286
152
287 __attrs__ = ['show', 'save', 'xmin', 'xmax', 'ymin', 'ymax', 'zmin', 'zmax',
153 __attrs__ = ['show', 'save', 'xmin', 'xmax', 'ymin', 'ymax', 'zmin', 'zmax',
288 'zlimits', 'xlabel', 'ylabel', 'xaxis', 'cb_label', 'title',
154 'zlimits', 'xlabel', 'ylabel', 'xaxis', 'cb_label', 'title',
289 'colorbar', 'bgcolor', 'width', 'height', 'localtime', 'oneFigure',
155 'colorbar', 'bgcolor', 'width', 'height', 'localtime', 'oneFigure',
290 'showprofile', 'decimation', 'pause']
156 'showprofile', 'decimation', 'pause']
291
157
292 def __init__(self):
158 def __init__(self):
293
159
294 Operation.__init__(self)
160 Operation.__init__(self)
295 self.isConfig = False
161 self.isConfig = False
296 self.isPlotConfig = False
162 self.isPlotConfig = False
163 self.save_counter = 1
164 self.sender_counter = 1
297
165
298 def __fmtTime(self, x, pos):
166 def __fmtTime(self, x, pos):
299 '''
167 '''
300 '''
168 '''
301
169
302 return '{}'.format(self.getDateTime(x).strftime('%H:%M'))
170 return '{}'.format(self.getDateTime(x).strftime('%H:%M'))
303
171
304 def __setup(self, **kwargs):
172 def __setup(self, **kwargs):
305 '''
173 '''
306 Initialize variables
174 Initialize variables
307 '''
175 '''
308
176
309 self.figures = []
177 self.figures = []
310 self.axes = []
178 self.axes = []
311 self.cb_axes = []
179 self.cb_axes = []
312 self.localtime = kwargs.pop('localtime', True)
180 self.localtime = kwargs.pop('localtime', True)
313 self.show = kwargs.get('show', True)
181 self.show = kwargs.get('show', True)
314 self.save = kwargs.get('save', False)
182 self.save = kwargs.get('save', False)
183 self.save_period = kwargs.get('save_period', 2)
315 self.ftp = kwargs.get('ftp', False)
184 self.ftp = kwargs.get('ftp', False)
316 self.colormap = kwargs.get('colormap', self.colormap)
185 self.colormap = kwargs.get('colormap', self.colormap)
317 self.colormap_coh = kwargs.get('colormap_coh', 'jet')
186 self.colormap_coh = kwargs.get('colormap_coh', 'jet')
318 self.colormap_phase = kwargs.get('colormap_phase', 'RdBu_r')
187 self.colormap_phase = kwargs.get('colormap_phase', 'RdBu_r')
319 self.colormaps = kwargs.get('colormaps', None)
188 self.colormaps = kwargs.get('colormaps', None)
320 self.bgcolor = kwargs.get('bgcolor', self.bgcolor)
189 self.bgcolor = kwargs.get('bgcolor', self.bgcolor)
321 self.showprofile = kwargs.get('showprofile', False)
190 self.showprofile = kwargs.get('showprofile', False)
322 self.title = kwargs.get('wintitle', self.CODE.upper())
191 self.title = kwargs.get('wintitle', self.CODE.upper())
323 self.cb_label = kwargs.get('cb_label', None)
192 self.cb_label = kwargs.get('cb_label', None)
324 self.cb_labels = kwargs.get('cb_labels', None)
193 self.cb_labels = kwargs.get('cb_labels', None)
325 self.labels = kwargs.get('labels', None)
194 self.labels = kwargs.get('labels', None)
326 self.xaxis = kwargs.get('xaxis', 'frequency')
195 self.xaxis = kwargs.get('xaxis', 'frequency')
327 self.zmin = kwargs.get('zmin', None)
196 self.zmin = kwargs.get('zmin', None)
328 self.zmax = kwargs.get('zmax', None)
197 self.zmax = kwargs.get('zmax', None)
329 self.zlimits = kwargs.get('zlimits', None)
198 self.zlimits = kwargs.get('zlimits', None)
330 self.xmin = kwargs.get('xmin', None)
199 self.xmin = kwargs.get('xmin', None)
331 self.xmax = kwargs.get('xmax', None)
200 self.xmax = kwargs.get('xmax', None)
332 self.xrange = kwargs.get('xrange', 12)
201 self.xrange = kwargs.get('xrange', 12)
333 self.xscale = kwargs.get('xscale', None)
202 self.xscale = kwargs.get('xscale', None)
334 self.ymin = kwargs.get('ymin', None)
203 self.ymin = kwargs.get('ymin', None)
335 self.ymax = kwargs.get('ymax', None)
204 self.ymax = kwargs.get('ymax', None)
336 self.yscale = kwargs.get('yscale', None)
205 self.yscale = kwargs.get('yscale', None)
337 self.xlabel = kwargs.get('xlabel', None)
206 self.xlabel = kwargs.get('xlabel', None)
338 self.decimation = kwargs.get('decimation', None)
207 self.decimation = kwargs.get('decimation', None)
339 self.showSNR = kwargs.get('showSNR', False)
208 self.showSNR = kwargs.get('showSNR', False)
340 self.oneFigure = kwargs.get('oneFigure', True)
209 self.oneFigure = kwargs.get('oneFigure', True)
341 self.width = kwargs.get('width', None)
210 self.width = kwargs.get('width', None)
342 self.height = kwargs.get('height', None)
211 self.height = kwargs.get('height', None)
343 self.colorbar = kwargs.get('colorbar', True)
212 self.colorbar = kwargs.get('colorbar', True)
344 self.factors = kwargs.get('factors', [1, 1, 1, 1, 1, 1, 1, 1])
213 self.factors = kwargs.get('factors', [1, 1, 1, 1, 1, 1, 1, 1])
345 self.channels = kwargs.get('channels', None)
214 self.channels = kwargs.get('channels', None)
346 self.titles = kwargs.get('titles', [])
215 self.titles = kwargs.get('titles', [])
347 self.polar = False
216 self.polar = False
348 self.type = kwargs.get('type', 'iq')
217 self.type = kwargs.get('type', 'iq')
349 self.grid = kwargs.get('grid', False)
218 self.grid = kwargs.get('grid', False)
350 self.pause = kwargs.get('pause', False)
219 self.pause = kwargs.get('pause', False)
351 self.save_labels = kwargs.get('save_labels', None)
220 self.save_labels = kwargs.get('save_labels', None)
352 self.realtime = kwargs.get('realtime', True)
221 self.realtime = kwargs.get('realtime', True)
353 self.buffering = kwargs.get('buffering', True)
222 self.buffering = kwargs.get('buffering', True)
354 self.throttle = kwargs.get('throttle', 2)
223 self.throttle = kwargs.get('throttle', 2)
355 self.exp_code = kwargs.get('exp_code', None)
224 self.exp_code = kwargs.get('exp_code', None)
225 self.plot_server = kwargs.get('plot_server', False)
226 self.sender_period = kwargs.get('sender_period', 2)
356 self.__throttle_plot = apply_throttle(self.throttle)
227 self.__throttle_plot = apply_throttle(self.throttle)
357 self.data = PlotterData(
228 self.data = PlotterData(
358 self.CODE, self.throttle, self.exp_code, self.buffering)
229 self.CODE, self.throttle, self.exp_code, self.buffering)
359
230
231 if self.plot_server:
232 if not self.plot_server.startswith('tcp://'):
233 self.plot_server = 'tcp://{}'.format(self.plot_server)
234 log.success(
235 'Sending to server: {}'.format(self.plot_server),
236 self.name
237 )
238
360 def __setup_plot(self):
239 def __setup_plot(self):
361 '''
240 '''
362 Common setup for all figures, here figures and axes are created
241 Common setup for all figures, here figures and axes are created
363 '''
242 '''
364
243
365 self.setup()
244 self.setup()
366
245
367 self.time_label = 'LT' if self.localtime else 'UTC'
246 self.time_label = 'LT' if self.localtime else 'UTC'
368 if self.data.localtime:
247 if self.data.localtime:
369 self.getDateTime = datetime.datetime.fromtimestamp
248 self.getDateTime = datetime.datetime.fromtimestamp
370 else:
249 else:
371 self.getDateTime = datetime.datetime.utcfromtimestamp
250 self.getDateTime = datetime.datetime.utcfromtimestamp
372
251
373 if self.width is None:
252 if self.width is None:
374 self.width = 8
253 self.width = 8
375
254
376 self.figures = []
255 self.figures = []
377 self.axes = []
256 self.axes = []
378 self.cb_axes = []
257 self.cb_axes = []
379 self.pf_axes = []
258 self.pf_axes = []
380 self.cmaps = []
259 self.cmaps = []
381
260
382 size = '15%' if self.ncols == 1 else '30%'
261 size = '15%' if self.ncols == 1 else '30%'
383 pad = '4%' if self.ncols == 1 else '8%'
262 pad = '4%' if self.ncols == 1 else '8%'
384
263
385 if self.oneFigure:
264 if self.oneFigure:
386 if self.height is None:
265 if self.height is None:
387 self.height = 1.4 * self.nrows + 1
266 self.height = 1.4 * self.nrows + 1
388 fig = plt.figure(figsize=(self.width, self.height),
267 fig = plt.figure(figsize=(self.width, self.height),
389 edgecolor='k',
268 edgecolor='k',
390 facecolor='w')
269 facecolor='w')
391 self.figures.append(fig)
270 self.figures.append(fig)
392 for n in range(self.nplots):
271 for n in range(self.nplots):
393 ax = fig.add_subplot(self.nrows, self.ncols,
272 ax = fig.add_subplot(self.nrows, self.ncols,
394 n + 1, polar=self.polar)
273 n + 1, polar=self.polar)
395 ax.tick_params(labelsize=8)
274 ax.tick_params(labelsize=8)
396 ax.firsttime = True
275 ax.firsttime = True
397 ax.index = 0
276 ax.index = 0
398 ax.press = None
277 ax.press = None
399 self.axes.append(ax)
278 self.axes.append(ax)
400 if self.showprofile:
279 if self.showprofile:
401 cax = self.__add_axes(ax, size=size, pad=pad)
280 cax = self.__add_axes(ax, size=size, pad=pad)
402 cax.tick_params(labelsize=8)
281 cax.tick_params(labelsize=8)
403 self.pf_axes.append(cax)
282 self.pf_axes.append(cax)
404 else:
283 else:
405 if self.height is None:
284 if self.height is None:
406 self.height = 3
285 self.height = 3
407 for n in range(self.nplots):
286 for n in range(self.nplots):
408 fig = plt.figure(figsize=(self.width, self.height),
287 fig = plt.figure(figsize=(self.width, self.height),
409 edgecolor='k',
288 edgecolor='k',
410 facecolor='w')
289 facecolor='w')
411 ax = fig.add_subplot(1, 1, 1, polar=self.polar)
290 ax = fig.add_subplot(1, 1, 1, polar=self.polar)
412 ax.tick_params(labelsize=8)
291 ax.tick_params(labelsize=8)
413 ax.firsttime = True
292 ax.firsttime = True
414 ax.index = 0
293 ax.index = 0
415 ax.press = None
294 ax.press = None
416 self.figures.append(fig)
295 self.figures.append(fig)
417 self.axes.append(ax)
296 self.axes.append(ax)
418 if self.showprofile:
297 if self.showprofile:
419 cax = self.__add_axes(ax, size=size, pad=pad)
298 cax = self.__add_axes(ax, size=size, pad=pad)
420 cax.tick_params(labelsize=8)
299 cax.tick_params(labelsize=8)
421 self.pf_axes.append(cax)
300 self.pf_axes.append(cax)
422
301
423 for n in range(self.nrows):
302 for n in range(self.nrows):
424 if self.colormaps is not None:
303 if self.colormaps is not None:
425 cmap = plt.get_cmap(self.colormaps[n])
304 cmap = plt.get_cmap(self.colormaps[n])
426 else:
305 else:
427 cmap = plt.get_cmap(self.colormap)
306 cmap = plt.get_cmap(self.colormap)
428 cmap.set_bad(self.bgcolor, 1.)
307 cmap.set_bad(self.bgcolor, 1.)
429 self.cmaps.append(cmap)
308 self.cmaps.append(cmap)
430
309
431 for fig in self.figures:
310 for fig in self.figures:
432 fig.canvas.mpl_connect('key_press_event', self.OnKeyPress)
311 fig.canvas.mpl_connect('key_press_event', self.OnKeyPress)
433 fig.canvas.mpl_connect('scroll_event', self.OnBtnScroll)
312 fig.canvas.mpl_connect('scroll_event', self.OnBtnScroll)
434 fig.canvas.mpl_connect('button_press_event', self.onBtnPress)
313 fig.canvas.mpl_connect('button_press_event', self.onBtnPress)
435 fig.canvas.mpl_connect('motion_notify_event', self.onMotion)
314 fig.canvas.mpl_connect('motion_notify_event', self.onMotion)
436 fig.canvas.mpl_connect('button_release_event', self.onBtnRelease)
315 fig.canvas.mpl_connect('button_release_event', self.onBtnRelease)
437 if self.show:
316 if self.show:
438 fig.show()
317 fig.show()
439
318
440 def OnKeyPress(self, event):
319 def OnKeyPress(self, event):
441 '''
320 '''
442 Event for pressing keys (up, down) change colormap
321 Event for pressing keys (up, down) change colormap
443 '''
322 '''
444 ax = event.inaxes
323 ax = event.inaxes
445 if ax in self.axes:
324 if ax in self.axes:
446 if event.key == 'down':
325 if event.key == 'down':
447 ax.index += 1
326 ax.index += 1
448 elif event.key == 'up':
327 elif event.key == 'up':
449 ax.index -= 1
328 ax.index -= 1
450 if ax.index < 0:
329 if ax.index < 0:
451 ax.index = len(CMAPS) - 1
330 ax.index = len(CMAPS) - 1
452 elif ax.index == len(CMAPS):
331 elif ax.index == len(CMAPS):
453 ax.index = 0
332 ax.index = 0
454 cmap = CMAPS[ax.index]
333 cmap = CMAPS[ax.index]
455 ax.cbar.set_cmap(cmap)
334 ax.cbar.set_cmap(cmap)
456 ax.cbar.draw_all()
335 ax.cbar.draw_all()
457 ax.plt.set_cmap(cmap)
336 ax.plt.set_cmap(cmap)
458 ax.cbar.patch.figure.canvas.draw()
337 ax.cbar.patch.figure.canvas.draw()
459 self.colormap = cmap.name
338 self.colormap = cmap.name
460
339
461 def OnBtnScroll(self, event):
340 def OnBtnScroll(self, event):
462 '''
341 '''
463 Event for scrolling, scale figure
342 Event for scrolling, scale figure
464 '''
343 '''
465 cb_ax = event.inaxes
344 cb_ax = event.inaxes
466 if cb_ax in [ax.cbar.ax for ax in self.axes if ax.cbar]:
345 if cb_ax in [ax.cbar.ax for ax in self.axes if ax.cbar]:
467 ax = [ax for ax in self.axes if cb_ax == ax.cbar.ax][0]
346 ax = [ax for ax in self.axes if cb_ax == ax.cbar.ax][0]
468 pt = ax.cbar.ax.bbox.get_points()[:, 1]
347 pt = ax.cbar.ax.bbox.get_points()[:, 1]
469 nrm = ax.cbar.norm
348 nrm = ax.cbar.norm
470 vmin, vmax, p0, p1, pS = (
349 vmin, vmax, p0, p1, pS = (
471 nrm.vmin, nrm.vmax, pt[0], pt[1], event.y)
350 nrm.vmin, nrm.vmax, pt[0], pt[1], event.y)
472 scale = 2 if event.step == 1 else 0.5
351 scale = 2 if event.step == 1 else 0.5
473 point = vmin + (vmax - vmin) / (p1 - p0) * (pS - p0)
352 point = vmin + (vmax - vmin) / (p1 - p0) * (pS - p0)
474 ax.cbar.norm.vmin = point - scale * (point - vmin)
353 ax.cbar.norm.vmin = point - scale * (point - vmin)
475 ax.cbar.norm.vmax = point - scale * (point - vmax)
354 ax.cbar.norm.vmax = point - scale * (point - vmax)
476 ax.plt.set_norm(ax.cbar.norm)
355 ax.plt.set_norm(ax.cbar.norm)
477 ax.cbar.draw_all()
356 ax.cbar.draw_all()
478 ax.cbar.patch.figure.canvas.draw()
357 ax.cbar.patch.figure.canvas.draw()
479
358
480 def onBtnPress(self, event):
359 def onBtnPress(self, event):
481 '''
360 '''
482 Event for mouse button press
361 Event for mouse button press
483 '''
362 '''
484 cb_ax = event.inaxes
363 cb_ax = event.inaxes
485 if cb_ax is None:
364 if cb_ax is None:
486 return
365 return
487
366
488 if cb_ax in [ax.cbar.ax for ax in self.axes if ax.cbar]:
367 if cb_ax in [ax.cbar.ax for ax in self.axes if ax.cbar]:
489 cb_ax.press = event.x, event.y
368 cb_ax.press = event.x, event.y
490 else:
369 else:
491 cb_ax.press = None
370 cb_ax.press = None
492
371
493 def onMotion(self, event):
372 def onMotion(self, event):
494 '''
373 '''
495 Event for move inside colorbar
374 Event for move inside colorbar
496 '''
375 '''
497 cb_ax = event.inaxes
376 cb_ax = event.inaxes
498 if cb_ax is None:
377 if cb_ax is None:
499 return
378 return
500 if cb_ax not in [ax.cbar.ax for ax in self.axes if ax.cbar]:
379 if cb_ax not in [ax.cbar.ax for ax in self.axes if ax.cbar]:
501 return
380 return
502 if cb_ax.press is None:
381 if cb_ax.press is None:
503 return
382 return
504
383
505 ax = [ax for ax in self.axes if cb_ax == ax.cbar.ax][0]
384 ax = [ax for ax in self.axes if cb_ax == ax.cbar.ax][0]
506 xprev, yprev = cb_ax.press
385 xprev, yprev = cb_ax.press
507 dx = event.x - xprev
386 dx = event.x - xprev
508 dy = event.y - yprev
387 dy = event.y - yprev
509 cb_ax.press = event.x, event.y
388 cb_ax.press = event.x, event.y
510 scale = ax.cbar.norm.vmax - ax.cbar.norm.vmin
389 scale = ax.cbar.norm.vmax - ax.cbar.norm.vmin
511 perc = 0.03
390 perc = 0.03
512
391
513 if event.button == 1:
392 if event.button == 1:
514 ax.cbar.norm.vmin -= (perc * scale) * numpy.sign(dy)
393 ax.cbar.norm.vmin -= (perc * scale) * numpy.sign(dy)
515 ax.cbar.norm.vmax -= (perc * scale) * numpy.sign(dy)
394 ax.cbar.norm.vmax -= (perc * scale) * numpy.sign(dy)
516 elif event.button == 3:
395 elif event.button == 3:
517 ax.cbar.norm.vmin -= (perc * scale) * numpy.sign(dy)
396 ax.cbar.norm.vmin -= (perc * scale) * numpy.sign(dy)
518 ax.cbar.norm.vmax += (perc * scale) * numpy.sign(dy)
397 ax.cbar.norm.vmax += (perc * scale) * numpy.sign(dy)
519
398
520 ax.cbar.draw_all()
399 ax.cbar.draw_all()
521 ax.plt.set_norm(ax.cbar.norm)
400 ax.plt.set_norm(ax.cbar.norm)
522 ax.cbar.patch.figure.canvas.draw()
401 ax.cbar.patch.figure.canvas.draw()
523
402
524 def onBtnRelease(self, event):
403 def onBtnRelease(self, event):
525 '''
404 '''
526 Event for mouse button release
405 Event for mouse button release
527 '''
406 '''
528 cb_ax = event.inaxes
407 cb_ax = event.inaxes
529 if cb_ax is not None:
408 if cb_ax is not None:
530 cb_ax.press = None
409 cb_ax.press = None
531
410
532 def __add_axes(self, ax, size='30%', pad='8%'):
411 def __add_axes(self, ax, size='30%', pad='8%'):
533 '''
412 '''
534 Add new axes to the given figure
413 Add new axes to the given figure
535 '''
414 '''
536 divider = make_axes_locatable(ax)
415 divider = make_axes_locatable(ax)
537 nax = divider.new_horizontal(size=size, pad=pad)
416 nax = divider.new_horizontal(size=size, pad=pad)
538 ax.figure.add_axes(nax)
417 ax.figure.add_axes(nax)
539 return nax
418 return nax
540
419
541 def setup(self):
542 '''
543 This method should be implemented in the child class, the following
544 attributes should be set:
545
546 self.nrows: number of rows
547 self.ncols: number of cols
548 self.nplots: number of plots (channels or pairs)
549 self.ylabel: label for Y axes
550 self.titles: list of axes title
551
552 '''
553 raise NotImplementedError
554
555 def fill_gaps(self, x_buffer, y_buffer, z_buffer):
420 def fill_gaps(self, x_buffer, y_buffer, z_buffer):
556 '''
421 '''
557 Create a masked array for missing data
422 Create a masked array for missing data
558 '''
423 '''
559 if x_buffer.shape[0] < 2:
424 if x_buffer.shape[0] < 2:
560 return x_buffer, y_buffer, z_buffer
425 return x_buffer, y_buffer, z_buffer
561
426
562 deltas = x_buffer[1:] - x_buffer[0:-1]
427 deltas = x_buffer[1:] - x_buffer[0:-1]
563 x_median = numpy.median(deltas)
428 x_median = numpy.median(deltas)
564
429
565 index = numpy.where(deltas > 5 * x_median)
430 index = numpy.where(deltas > 5 * x_median)
566
431
567 if len(index[0]) != 0:
432 if len(index[0]) != 0:
568 z_buffer[::, index[0], ::] = self.__missing
433 z_buffer[::, index[0], ::] = self.__missing
569 z_buffer = numpy.ma.masked_inside(z_buffer,
434 z_buffer = numpy.ma.masked_inside(z_buffer,
570 0.99 * self.__missing,
435 0.99 * self.__missing,
571 1.01 * self.__missing)
436 1.01 * self.__missing)
572
437
573 return x_buffer, y_buffer, z_buffer
438 return x_buffer, y_buffer, z_buffer
574
439
575 def decimate(self):
440 def decimate(self):
576
441
577 # dx = int(len(self.x)/self.__MAXNUMX) + 1
442 # dx = int(len(self.x)/self.__MAXNUMX) + 1
578 dy = int(len(self.y) / self.decimation) + 1
443 dy = int(len(self.y) / self.decimation) + 1
579
444
580 # x = self.x[::dx]
445 # x = self.x[::dx]
581 x = self.x
446 x = self.x
582 y = self.y[::dy]
447 y = self.y[::dy]
583 z = self.z[::, ::, ::dy]
448 z = self.z[::, ::, ::dy]
584
449
585 return x, y, z
450 return x, y, z
586
451
587 def format(self):
452 def format(self):
588 '''
453 '''
589 Set min and max values, labels, ticks and titles
454 Set min and max values, labels, ticks and titles
590 '''
455 '''
591
456
592 if self.xmin is None:
457 if self.xmin is None:
593 xmin = self.data.min_time
458 xmin = self.data.min_time
594 else:
459 else:
595 if self.xaxis is 'time':
460 if self.xaxis is 'time':
596 dt = self.getDateTime(self.data.min_time)
461 dt = self.getDateTime(self.data.min_time)
597 xmin = (dt.replace(hour=int(self.xmin), minute=0, second=0) -
462 xmin = (dt.replace(hour=int(self.xmin), minute=0, second=0) -
598 datetime.datetime(1970, 1, 1)).total_seconds()
463 datetime.datetime(1970, 1, 1)).total_seconds()
599 if self.data.localtime:
464 if self.data.localtime:
600 xmin += time.timezone
465 xmin += time.timezone
601 else:
466 else:
602 xmin = self.xmin
467 xmin = self.xmin
603
468
604 if self.xmax is None:
469 if self.xmax is None:
605 xmax = xmin + self.xrange * 60 * 60
470 xmax = xmin + self.xrange * 60 * 60
606 else:
471 else:
607 if self.xaxis is 'time':
472 if self.xaxis is 'time':
608 dt = self.getDateTime(self.data.max_time)
473 dt = self.getDateTime(self.data.max_time)
609 xmax = (dt.replace(hour=int(self.xmax), minute=59, second=59) -
474 xmax = (dt.replace(hour=int(self.xmax), minute=59, second=59) -
610 datetime.datetime(1970, 1, 1) + datetime.timedelta(seconds=1)).total_seconds()
475 datetime.datetime(1970, 1, 1) + datetime.timedelta(seconds=1)).total_seconds()
611 if self.data.localtime:
476 if self.data.localtime:
612 xmax += time.timezone
477 xmax += time.timezone
613 else:
478 else:
614 xmax = self.xmax
479 xmax = self.xmax
615
480
616 ymin = self.ymin if self.ymin else numpy.nanmin(self.y)
481 ymin = self.ymin if self.ymin else numpy.nanmin(self.y)
617 ymax = self.ymax if self.ymax else numpy.nanmax(self.y)
482 ymax = self.ymax if self.ymax else numpy.nanmax(self.y)
618 #Y = numpy.array([1, 2, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 50000])
483 #Y = numpy.array([1, 2, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 50000])
619
484
620 #i = 1 if numpy.where(
485 #i = 1 if numpy.where(
621 # abs(ymax-ymin) <= Y)[0][0] < 0 else numpy.where(abs(ymax-ymin) <= Y)[0][0]
486 # abs(ymax-ymin) <= Y)[0][0] < 0 else numpy.where(abs(ymax-ymin) <= Y)[0][0]
622 #ystep = Y[i] / 10.
487 #ystep = Y[i] / 10.
623 dig = int(numpy.log10(ymax))
488 dig = int(numpy.log10(ymax))
624 if dig == 0:
489 if dig == 0:
625 digD = len(str(ymax)) - 2
490 digD = len(str(ymax)) - 2
626 ydec = ymax*(10**digD)
491 ydec = ymax*(10**digD)
627
492
628 dig = int(numpy.log10(ydec))
493 dig = int(numpy.log10(ydec))
629 ystep = ((ydec + (10**(dig)))//10**(dig))*(10**(dig))
494 ystep = ((ydec + (10**(dig)))//10**(dig))*(10**(dig))
630 ystep = ystep/5
495 ystep = ystep/5
631 ystep = ystep/(10**digD)
496 ystep = ystep/(10**digD)
632
497
633 else:
498 else:
634 ystep = ((ymax + (10**(dig)))//10**(dig))*(10**(dig))
499 ystep = ((ymax + (10**(dig)))//10**(dig))*(10**(dig))
635 ystep = ystep/5
500 ystep = ystep/5
636
501
637 if self.xaxis is not 'time':
502 if self.xaxis is not 'time':
638
503
639 dig = int(numpy.log10(xmax))
504 dig = int(numpy.log10(xmax))
640
505
641 if dig <= 0:
506 if dig <= 0:
642 digD = len(str(xmax)) - 2
507 digD = len(str(xmax)) - 2
643 xdec = xmax*(10**digD)
508 xdec = xmax*(10**digD)
644
509
645 dig = int(numpy.log10(xdec))
510 dig = int(numpy.log10(xdec))
646 xstep = ((xdec + (10**(dig)))//10**(dig))*(10**(dig))
511 xstep = ((xdec + (10**(dig)))//10**(dig))*(10**(dig))
647 xstep = xstep*0.5
512 xstep = xstep*0.5
648 xstep = xstep/(10**digD)
513 xstep = xstep/(10**digD)
649
514
650 else:
515 else:
651 xstep = ((xmax + (10**(dig)))//10**(dig))*(10**(dig))
516 xstep = ((xmax + (10**(dig)))//10**(dig))*(10**(dig))
652 xstep = xstep/5
517 xstep = xstep/5
653
518
654 for n, ax in enumerate(self.axes):
519 for n, ax in enumerate(self.axes):
655 if ax.firsttime:
520 if ax.firsttime:
656 ax.set_facecolor(self.bgcolor)
521 ax.set_facecolor(self.bgcolor)
657 ax.yaxis.set_major_locator(MultipleLocator(ystep))
522 ax.yaxis.set_major_locator(MultipleLocator(ystep))
658 if self.xscale:
523 if self.xscale:
659 ax.xaxis.set_major_formatter(FuncFormatter(
524 ax.xaxis.set_major_formatter(FuncFormatter(
660 lambda x, pos: '{0:g}'.format(x*self.xscale)))
525 lambda x, pos: '{0:g}'.format(x*self.xscale)))
661 if self.xscale:
526 if self.xscale:
662 ax.yaxis.set_major_formatter(FuncFormatter(
527 ax.yaxis.set_major_formatter(FuncFormatter(
663 lambda x, pos: '{0:g}'.format(x*self.yscale)))
528 lambda x, pos: '{0:g}'.format(x*self.yscale)))
664 if self.xaxis is 'time':
529 if self.xaxis is 'time':
665 ax.xaxis.set_major_formatter(FuncFormatter(self.__fmtTime))
530 ax.xaxis.set_major_formatter(FuncFormatter(self.__fmtTime))
666 ax.xaxis.set_major_locator(LinearLocator(9))
531 ax.xaxis.set_major_locator(LinearLocator(9))
667 else:
532 else:
668 ax.xaxis.set_major_locator(MultipleLocator(xstep))
533 ax.xaxis.set_major_locator(MultipleLocator(xstep))
669 if self.xlabel is not None:
534 if self.xlabel is not None:
670 ax.set_xlabel(self.xlabel)
535 ax.set_xlabel(self.xlabel)
671 ax.set_ylabel(self.ylabel)
536 ax.set_ylabel(self.ylabel)
672 ax.firsttime = False
537 ax.firsttime = False
673 if self.showprofile:
538 if self.showprofile:
674 self.pf_axes[n].set_ylim(ymin, ymax)
539 self.pf_axes[n].set_ylim(ymin, ymax)
675 self.pf_axes[n].set_xlim(self.zmin, self.zmax)
540 self.pf_axes[n].set_xlim(self.zmin, self.zmax)
676 self.pf_axes[n].set_xlabel('dB')
541 self.pf_axes[n].set_xlabel('dB')
677 self.pf_axes[n].grid(b=True, axis='x')
542 self.pf_axes[n].grid(b=True, axis='x')
678 [tick.set_visible(False)
543 [tick.set_visible(False)
679 for tick in self.pf_axes[n].get_yticklabels()]
544 for tick in self.pf_axes[n].get_yticklabels()]
680 if self.colorbar:
545 if self.colorbar:
681 ax.cbar = plt.colorbar(
546 ax.cbar = plt.colorbar(
682 ax.plt, ax=ax, fraction=0.05, pad=0.02, aspect=10)
547 ax.plt, ax=ax, fraction=0.05, pad=0.02, aspect=10)
683 ax.cbar.ax.tick_params(labelsize=8)
548 ax.cbar.ax.tick_params(labelsize=8)
684 ax.cbar.ax.press = None
549 ax.cbar.ax.press = None
685 if self.cb_label:
550 if self.cb_label:
686 ax.cbar.set_label(self.cb_label, size=8)
551 ax.cbar.set_label(self.cb_label, size=8)
687 elif self.cb_labels:
552 elif self.cb_labels:
688 ax.cbar.set_label(self.cb_labels[n], size=8)
553 ax.cbar.set_label(self.cb_labels[n], size=8)
689 else:
554 else:
690 ax.cbar = None
555 ax.cbar = None
691 if self.grid:
556 if self.grid:
692 ax.grid(True)
557 ax.grid(True)
693
558
694 if not self.polar:
559 if not self.polar:
695 ax.set_xlim(xmin, xmax)
560 ax.set_xlim(xmin, xmax)
696 ax.set_ylim(ymin, ymax)
561 ax.set_ylim(ymin, ymax)
697 ax.set_title('{} {} {}'.format(
562 ax.set_title('{} {} {}'.format(
698 self.titles[n],
563 self.titles[n],
699 self.getDateTime(self.data.max_time).strftime(
564 self.getDateTime(self.data.max_time).strftime(
700 '%H:%M:%S'),
565 '%H:%M:%S'),
701 self.time_label),
566 self.time_label),
702 size=8)
567 size=8)
703 else:
568 else:
704 ax.set_title('{}'.format(self.titles[n]), size=8)
569 ax.set_title('{}'.format(self.titles[n]), size=8)
705 ax.set_ylim(0, 90)
570 ax.set_ylim(0, 90)
706 ax.set_yticks(numpy.arange(0, 90, 20))
571 ax.set_yticks(numpy.arange(0, 90, 20))
707 ax.yaxis.labelpad = 40
572 ax.yaxis.labelpad = 40
708
573
709 def clear_figures(self):
574 def clear_figures(self):
710 '''
575 '''
711 Reset axes for redraw plots
576 Reset axes for redraw plots
712 '''
577 '''
713
578
714 for ax in self.axes:
579 for ax in self.axes:
715 ax.clear()
580 ax.clear()
716 ax.firsttime = True
581 ax.firsttime = True
717 if ax.cbar:
582 if ax.cbar:
718 ax.cbar.remove()
583 ax.cbar.remove()
719
584
720 def __plot(self):
585 def __plot(self):
721 '''
586 '''
722 Main function to plot, format and save figures
587 Main function to plot, format and save figures
723 '''
588 '''
724
589
725 #try:
590 try:
726 self.plot()
591 self.plot()
727 self.format()
592 self.format()
728 #except Exception as e:
593 except Exception as e:
729 # log.warning('{} Plot could not be updated... check data'.format(
594 log.warning('{} Plot could not be updated... check data'.format(
730 # self.CODE), self.name)
595 self.CODE), self.name)
731 # log.error(str(e), '')
596 log.error(str(e), '')
732 # return
597 return
733
598
734 for n, fig in enumerate(self.figures):
599 for n, fig in enumerate(self.figures):
735 if self.nrows == 0 or self.nplots == 0:
600 if self.nrows == 0 or self.nplots == 0:
736 log.warning('No data', self.name)
601 log.warning('No data', self.name)
737 fig.text(0.5, 0.5, 'No Data', fontsize='large', ha='center')
602 fig.text(0.5, 0.5, 'No Data', fontsize='large', ha='center')
738 fig.canvas.manager.set_window_title(self.CODE)
603 fig.canvas.manager.set_window_title(self.CODE)
739 continue
604 continue
740
605
741 fig.tight_layout()
606 fig.tight_layout()
742 fig.canvas.manager.set_window_title('{} - {}'.format(self.title,
607 fig.canvas.manager.set_window_title('{} - {}'.format(self.title,
743 self.getDateTime(self.data.max_time).strftime('%Y/%m/%d')))
608 self.getDateTime(self.data.max_time).strftime('%Y/%m/%d')))
744 fig.canvas.draw()
609 fig.canvas.draw()
745
610
746 if self.save:
611 if self.save:
612 self.save_figure(n)
613
614 if self.plot_server:
615 self.send_to_server()
616 # t = Thread(target=self.send_to_server)
617 # t.start()
618
619 def save_figure(self, n):
620 '''
621 '''
622
623 if self.save_counter < self.save_period:
624 self.save_counter += 1
625 return
626
627 self.save_counter = 1
628
629 fig = self.figures[n]
747
630
748 if self.save_labels:
631 if self.save_labels:
749 labels = self.save_labels
632 labels = self.save_labels
750 else:
633 else:
751 labels = list(range(self.nrows))
634 labels = list(range(self.nrows))
752
635
753 if self.oneFigure:
636 if self.oneFigure:
754 label = ''
637 label = ''
755 else:
638 else:
756 label = '-{}'.format(labels[n])
639 label = '-{}'.format(labels[n])
757 figname = os.path.join(
640 figname = os.path.join(
758 self.save,
641 self.save,
759 self.CODE,
642 self.CODE,
760 '{}{}_{}.png'.format(
643 '{}{}_{}.png'.format(
761 self.CODE,
644 self.CODE,
762 label,
645 label,
763 self.getDateTime(self.data.max_time).strftime(
646 self.getDateTime(self.data.max_time).strftime(
764 '%Y%m%d_%H%M%S'),
647 '%Y%m%d_%H%M%S'
648 ),
765 )
649 )
766 )
650 )
767 log.log('Saving figure: {}'.format(figname), self.name)
651 log.log('Saving figure: {}'.format(figname), self.name)
768 if not os.path.isdir(os.path.dirname(figname)):
652 if not os.path.isdir(os.path.dirname(figname)):
769 os.makedirs(os.path.dirname(figname))
653 os.makedirs(os.path.dirname(figname))
770 fig.savefig(figname)
654 fig.savefig(figname)
771
655
656 if self.realtime:
657 figname = os.path.join(
658 self.save,
659 '{}{}_{}.png'.format(
660 self.CODE,
661 label,
662 self.getDateTime(self.data.min_time).strftime(
663 '%Y%m%d'
664 ),
665 )
666 )
667 fig.savefig(figname)
668
669 def send_to_server(self):
670 '''
671 '''
672
673 if self.sender_counter < self.sender_period:
674 self.sender_counter += 1
675
676 self.sender_counter = 1
677
678 retries = 2
679 while True:
680 self.socket.send_string(self.data.jsonify())
681 socks = dict(self.poll.poll(5000))
682 if socks.get(self.socket) == zmq.POLLIN:
683 reply = self.socket.recv_string()
684 if reply == 'ok':
685 log.log("Response from server ok", self.name)
686 break
687 else:
688 log.warning(
689 "Malformed reply from server: {}".format(reply), self.name)
690
691 else:
692 log.warning(
693 "No response from server, retrying...", self.name)
694 self.socket.setsockopt(zmq.LINGER, 0)
695 self.socket.close()
696 self.poll.unregister(self.socket)
697 retries -= 1
698 if retries == 0:
699 log.error(
700 "Server seems to be offline, abandoning", self.name)
701 self.socket = self.context.socket(zmq.REQ)
702 self.socket.connect(self.plot_server)
703 self.poll.register(self.socket, zmq.POLLIN)
704 time.sleep(1)
705 break
706 self.socket = self.context.socket(zmq.REQ)
707 self.socket.connect(self.plot_server)
708 self.poll.register(self.socket, zmq.POLLIN)
709 time.sleep(0.5)
710
711 def setup(self):
712 '''
713 This method should be implemented in the child class, the following
714 attributes should be set:
715
716 self.nrows: number of rows
717 self.ncols: number of cols
718 self.nplots: number of plots (channels or pairs)
719 self.ylabel: label for Y axes
720 self.titles: list of axes title
721
722 '''
723 raise NotImplementedError
724
772 def plot(self):
725 def plot(self):
773 '''
726 '''
774 Must be defined in the child class
727 Must be defined in the child class
775 '''
728 '''
776 raise NotImplementedError
729 raise NotImplementedError
777
730
778 def run(self, dataOut, **kwargs):
731 def run(self, dataOut, **kwargs):
779
732
780 if dataOut.error:
733 if dataOut.error:
781 coerce = True
734 coerce = True
782 else:
735 else:
783 coerce = False
736 coerce = False
784
737
785 if self.isConfig is False:
738 if self.isConfig is False:
786 self.__setup(**kwargs)
739 self.__setup(**kwargs)
787 self.data.setup()
740 self.data.setup()
788 self.isConfig = True
741 self.isConfig = True
742 if self.plot_server:
743 self.context = zmq.Context()
744 self.socket = self.context.socket(zmq.REQ)
745 self.socket.connect(self.plot_server)
746 self.poll = zmq.Poller()
747 self.poll.register(self.socket, zmq.POLLIN)
789
748
790 if dataOut.type == 'Parameters':
749 if dataOut.type == 'Parameters':
791 tm = dataOut.utctimeInit
750 tm = dataOut.utctimeInit
792 else:
751 else:
793 tm = dataOut.utctime
752 tm = dataOut.utctime
794
753
795 if dataOut.useLocalTime:
754 if dataOut.useLocalTime:
796 if not self.localtime:
755 if not self.localtime:
797 tm += time.timezone
756 tm += time.timezone
798 else:
757 else:
799 if self.localtime:
758 if self.localtime:
800 tm -= time.timezone
759 tm -= time.timezone
801
760
802 if self.data and (tm - self.data.min_time) >= self.xrange*60*60:
761 if self.data and (tm - self.data.min_time) >= self.xrange*60*60:
803 self.__plot()
762 self.__plot()
804 self.data.setup()
763 self.data.setup()
805 self.clear_figures()
764 self.clear_figures()
806
765
807 self.data.update(dataOut, tm)
766 self.data.update(dataOut, tm)
808
767
809 if self.isPlotConfig is False:
768 if self.isPlotConfig is False:
810 self.__setup_plot()
769 self.__setup_plot()
811 self.isPlotConfig = True
770 self.isPlotConfig = True
812
771
813 if self.realtime:
772 if self.realtime:
814 self.__plot()
773 self.__plot()
815 else:
774 else:
816 self.__throttle_plot(self.__plot, coerce=coerce)
775 self.__throttle_plot(self.__plot, coerce=coerce)
817
776
818 figpause(0.001)
777 figpause(0.001)
819
778
820 def close(self):
779 def close(self):
821
780
822 if self.data and self.pause:
781 if self.data and self.pause:
823 figpause(10)
782 figpause(10)
824
783
@@ -1,390 +1,390
1 '''
1 '''
2 Updated for multiprocessing
2 Updated for multiprocessing
3 Author : Sergio Cortez
3 Author : Sergio Cortez
4 Jan 2018
4 Jan 2018
5 Abstract:
5 Abstract:
6 Base class for processing units and operations. A decorator provides multiprocessing features and interconnect the processes created.
6 Base class for processing units and operations. A decorator provides multiprocessing features and interconnect the processes created.
7 The argument (kwargs) sent from the controller is parsed and filtered via the decorator for each processing unit or operation instantiated.
7 The argument (kwargs) sent from the controller is parsed and filtered via the decorator for each processing unit or operation instantiated.
8 The decorator handle also the methods inside the processing unit to be called from the main script (not as operations) (OPERATION -> type ='self').
8 The decorator handle also the methods inside the processing unit to be called from the main script (not as operations) (OPERATION -> type ='self').
9
9
10 Based on:
10 Based on:
11 $Author: murco $
11 $Author: murco $
12 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
12 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
13 '''
13 '''
14
14
15 import os
15 import inspect
16 import inspect
16 import zmq
17 import zmq
17 import time
18 import time
18 import pickle
19 import pickle
19 import os
20 from multiprocessing import Process
20 from multiprocessing import Process
21 from zmq.utils.monitor import recv_monitor_message
21 from zmq.utils.monitor import recv_monitor_message
22
22
23 from schainpy.utils import log
23 from schainpy.utils import log
24
24
25
25
26 class ProcessingUnit(object):
26 class ProcessingUnit(object):
27
27
28 """
28 """
29 Update - Jan 2018 - MULTIPROCESSING
29 Update - Jan 2018 - MULTIPROCESSING
30 All the "call" methods present in the previous base were removed.
30 All the "call" methods present in the previous base were removed.
31 The majority of operations are independant processes, thus
31 The majority of operations are independant processes, thus
32 the decorator is in charge of communicate the operation processes
32 the decorator is in charge of communicate the operation processes
33 with the proccessing unit via IPC.
33 with the proccessing unit via IPC.
34
34
35 The constructor does not receive any argument. The remaining methods
35 The constructor does not receive any argument. The remaining methods
36 are related with the operations to execute.
36 are related with the operations to execute.
37
37
38
38
39 """
39 """
40
40
41 def __init__(self):
41 def __init__(self):
42
42
43 self.dataIn = None
43 self.dataIn = None
44 self.dataOut = None
44 self.dataOut = None
45 self.isConfig = False
45 self.isConfig = False
46 self.operations = []
46 self.operations = []
47 self.plots = []
47 self.plots = []
48
48
49 def getAllowedArgs(self):
49 def getAllowedArgs(self):
50 if hasattr(self, '__attrs__'):
50 if hasattr(self, '__attrs__'):
51 return self.__attrs__
51 return self.__attrs__
52 else:
52 else:
53 return inspect.getargspec(self.run).args
53 return inspect.getargspec(self.run).args
54
54
55 def addOperation(self, conf, operation):
55 def addOperation(self, conf, operation):
56 """
56 """
57 This method is used in the controller, and update the dictionary containing the operations to execute. The dict
57 This method is used in the controller, and update the dictionary containing the operations to execute. The dict
58 posses the id of the operation process (IPC purposes)
58 posses the id of the operation process (IPC purposes)
59
59
60 Agrega un objeto del tipo "Operation" (opObj) a la lista de objetos "self.objectList" y retorna el
60 Agrega un objeto del tipo "Operation" (opObj) a la lista de objetos "self.objectList" y retorna el
61 identificador asociado a este objeto.
61 identificador asociado a este objeto.
62
62
63 Input:
63 Input:
64
64
65 object : objeto de la clase "Operation"
65 object : objeto de la clase "Operation"
66
66
67 Return:
67 Return:
68
68
69 objId : identificador del objeto, necesario para comunicar con master(procUnit)
69 objId : identificador del objeto, necesario para comunicar con master(procUnit)
70 """
70 """
71
71
72 self.operations.append(
72 self.operations.append(
73 (operation, conf.type, conf.id, conf.getKwargs()))
73 (operation, conf.type, conf.id, conf.getKwargs()))
74
74
75 if 'plot' in self.name.lower():
75 if 'plot' in self.name.lower():
76 self.plots.append(operation.CODE)
76 self.plots.append(operation.CODE)
77
77
78 def getOperationObj(self, objId):
78 def getOperationObj(self, objId):
79
79
80 if objId not in list(self.operations.keys()):
80 if objId not in list(self.operations.keys()):
81 return None
81 return None
82
82
83 return self.operations[objId]
83 return self.operations[objId]
84
84
85 def operation(self, **kwargs):
85 def operation(self, **kwargs):
86 """
86 """
87 Operacion directa sobre la data (dataOut.data). Es necesario actualizar los valores de los
87 Operacion directa sobre la data (dataOut.data). Es necesario actualizar los valores de los
88 atributos del objeto dataOut
88 atributos del objeto dataOut
89
89
90 Input:
90 Input:
91
91
92 **kwargs : Diccionario de argumentos de la funcion a ejecutar
92 **kwargs : Diccionario de argumentos de la funcion a ejecutar
93 """
93 """
94
94
95 raise NotImplementedError
95 raise NotImplementedError
96
96
97 def setup(self):
97 def setup(self):
98
98
99 raise NotImplementedError
99 raise NotImplementedError
100
100
101 def run(self):
101 def run(self):
102
102
103 raise NotImplementedError
103 raise NotImplementedError
104
104
105 def close(self):
105 def close(self):
106
106
107 return
107 return
108
108
109
109
110 class Operation(object):
110 class Operation(object):
111
111
112 """
112 """
113 Update - Jan 2018 - MULTIPROCESSING
113 Update - Jan 2018 - MULTIPROCESSING
114
114
115 Most of the methods remained the same. The decorator parse the arguments and executed the run() method for each process.
115 Most of the methods remained the same. The decorator parse the arguments and executed the run() method for each process.
116 The constructor doe snot receive any argument, neither the baseclass.
116 The constructor doe snot receive any argument, neither the baseclass.
117
117
118
118
119 Clase base para definir las operaciones adicionales que se pueden agregar a la clase ProcessingUnit
119 Clase base para definir las operaciones adicionales que se pueden agregar a la clase ProcessingUnit
120 y necesiten acumular informacion previa de los datos a procesar. De preferencia usar un buffer de
120 y necesiten acumular informacion previa de los datos a procesar. De preferencia usar un buffer de
121 acumulacion dentro de esta clase
121 acumulacion dentro de esta clase
122
122
123 Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer)
123 Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer)
124
124
125 """
125 """
126
126
127 def __init__(self):
127 def __init__(self):
128
128
129 self.id = None
129 self.id = None
130 self.isConfig = False
130 self.isConfig = False
131
131
132 if not hasattr(self, 'name'):
132 if not hasattr(self, 'name'):
133 self.name = self.__class__.__name__
133 self.name = self.__class__.__name__
134
134
135 def getAllowedArgs(self):
135 def getAllowedArgs(self):
136 if hasattr(self, '__attrs__'):
136 if hasattr(self, '__attrs__'):
137 return self.__attrs__
137 return self.__attrs__
138 else:
138 else:
139 return inspect.getargspec(self.run).args
139 return inspect.getargspec(self.run).args
140
140
141 def setup(self):
141 def setup(self):
142
142
143 self.isConfig = True
143 self.isConfig = True
144
144
145 raise NotImplementedError
145 raise NotImplementedError
146
146
147 def run(self, dataIn, **kwargs):
147 def run(self, dataIn, **kwargs):
148 """
148 """
149 Realiza las operaciones necesarias sobre la dataIn.data y actualiza los
149 Realiza las operaciones necesarias sobre la dataIn.data y actualiza los
150 atributos del objeto dataIn.
150 atributos del objeto dataIn.
151
151
152 Input:
152 Input:
153
153
154 dataIn : objeto del tipo JROData
154 dataIn : objeto del tipo JROData
155
155
156 Return:
156 Return:
157
157
158 None
158 None
159
159
160 Affected:
160 Affected:
161 __buffer : buffer de recepcion de datos.
161 __buffer : buffer de recepcion de datos.
162
162
163 """
163 """
164 if not self.isConfig:
164 if not self.isConfig:
165 self.setup(**kwargs)
165 self.setup(**kwargs)
166
166
167 raise NotImplementedError
167 raise NotImplementedError
168
168
169 def close(self):
169 def close(self):
170
170
171 return
171 return
172
172
173
173
174 def MPDecorator(BaseClass):
174 def MPDecorator(BaseClass):
175 """
175 """
176 Multiprocessing class decorator
176 Multiprocessing class decorator
177
177
178 This function add multiprocessing features to a BaseClass. Also, it handle
178 This function add multiprocessing features to a BaseClass. Also, it handle
179 the communication beetween processes (readers, procUnits and operations).
179 the communication beetween processes (readers, procUnits and operations).
180 """
180 """
181
181
182 class MPClass(BaseClass, Process):
182 class MPClass(BaseClass, Process):
183
183
184 def __init__(self, *args, **kwargs):
184 def __init__(self, *args, **kwargs):
185 super(MPClass, self).__init__()
185 super(MPClass, self).__init__()
186 Process.__init__(self)
186 Process.__init__(self)
187 self.operationKwargs = {}
187 self.operationKwargs = {}
188 self.args = args
188 self.args = args
189 self.kwargs = kwargs
189 self.kwargs = kwargs
190 self.sender = None
190 self.sender = None
191 self.receiver = None
191 self.receiver = None
192 self.name = BaseClass.__name__
192 self.name = BaseClass.__name__
193 if 'plot' in self.name.lower() and not self.name.endswith('_'):
193 if 'plot' in self.name.lower() and not self.name.endswith('_'):
194 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
194 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
195 self.start_time = time.time()
195 self.start_time = time.time()
196
196
197 if len(self.args) is 3:
197 if len(self.args) is 3:
198 self.typeProc = "ProcUnit"
198 self.typeProc = "ProcUnit"
199 self.id = args[0]
199 self.id = args[0]
200 self.inputId = args[1]
200 self.inputId = args[1]
201 self.project_id = args[2]
201 self.project_id = args[2]
202 elif len(self.args) is 2:
202 elif len(self.args) is 2:
203 self.id = args[0]
203 self.id = args[0]
204 self.inputId = args[0]
204 self.inputId = args[0]
205 self.project_id = args[1]
205 self.project_id = args[1]
206 self.typeProc = "Operation"
206 self.typeProc = "Operation"
207
207
208 def subscribe(self):
208 def subscribe(self):
209 '''
209 '''
210 This function create a socket to receive objects from the
210 This function create a socket to receive objects from the
211 topic `inputId`.
211 topic `inputId`.
212 '''
212 '''
213
213
214 c = zmq.Context()
214 c = zmq.Context()
215 self.receiver = c.socket(zmq.SUB)
215 self.receiver = c.socket(zmq.SUB)
216 self.receiver.connect(
216 self.receiver.connect(
217 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
217 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
218 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
218 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
219
219
220 def listen(self):
220 def listen(self):
221 '''
221 '''
222 This function waits for objects and deserialize using pickle
222 This function waits for objects and deserialize using pickle
223 '''
223 '''
224
224
225 data = pickle.loads(self.receiver.recv_multipart()[1])
225 data = pickle.loads(self.receiver.recv_multipart()[1])
226
226
227 return data
227 return data
228
228
229 def set_publisher(self):
229 def set_publisher(self):
230 '''
230 '''
231 This function create a socket for publishing purposes.
231 This function create a socket for publishing purposes.
232 '''
232 '''
233
233
234 time.sleep(1)
234 time.sleep(1)
235 c = zmq.Context()
235 c = zmq.Context()
236 self.sender = c.socket(zmq.PUB)
236 self.sender = c.socket(zmq.PUB)
237 self.sender.connect(
237 self.sender.connect(
238 'ipc:///tmp/schain/{}_sub'.format(self.project_id))
238 'ipc:///tmp/schain/{}_sub'.format(self.project_id))
239
239
240 def publish(self, data, id):
240 def publish(self, data, id):
241 '''
241 '''
242 This function publish an object, to a specific topic.
242 This function publish an object, to a specific topic.
243 '''
243 '''
244 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
244 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
245
245
246 def runReader(self):
246 def runReader(self):
247 '''
247 '''
248 Run fuction for read units
248 Run fuction for read units
249 '''
249 '''
250 while True:
250 while True:
251
251
252 BaseClass.run(self, **self.kwargs)
252 BaseClass.run(self, **self.kwargs)
253
253
254 for op, optype, opId, kwargs in self.operations:
254 for op, optype, opId, kwargs in self.operations:
255 if optype == 'self' and not self.dataOut.flagNoData:
255 if optype == 'self' and not self.dataOut.flagNoData:
256 op(**kwargs)
256 op(**kwargs)
257 elif optype == 'other' and not self.dataOut.flagNoData:
257 elif optype == 'other' and not self.dataOut.flagNoData:
258 self.dataOut = op.run(self.dataOut, **self.kwargs)
258 self.dataOut = op.run(self.dataOut, **self.kwargs)
259 elif optype == 'external':
259 elif optype == 'external':
260 self.publish(self.dataOut, opId)
260 self.publish(self.dataOut, opId)
261
261
262 if self.dataOut.flagNoData and not self.dataOut.error:
262 if self.dataOut.flagNoData and not self.dataOut.error:
263 continue
263 continue
264
264
265 self.publish(self.dataOut, self.id)
265 self.publish(self.dataOut, self.id)
266
266
267 if self.dataOut.error:
267 if self.dataOut.error:
268 log.error(self.dataOut.error, self.name)
268 log.error(self.dataOut.error, self.name)
269 # self.sender.send_multipart([str(self.project_id).encode(), 'end'.encode()])
269 # self.sender.send_multipart([str(self.project_id).encode(), 'end'.encode()])
270 break
270 break
271
271
272 time.sleep(1)
272 time.sleep(1)
273
273
274 def runProc(self):
274 def runProc(self):
275 '''
275 '''
276 Run function for proccessing units
276 Run function for proccessing units
277 '''
277 '''
278
278
279 while True:
279 while True:
280 self.dataIn = self.listen()
280 self.dataIn = self.listen()
281
281
282 if self.dataIn.flagNoData and self.dataIn.error is None:
282 if self.dataIn.flagNoData and self.dataIn.error is None:
283 continue
283 continue
284
284
285 BaseClass.run(self, **self.kwargs)
285 BaseClass.run(self, **self.kwargs)
286
286
287 if self.dataIn.error:
287 if self.dataIn.error:
288 self.dataOut.error = self.dataIn.error
288 self.dataOut.error = self.dataIn.error
289 self.dataOut.flagNoData = True
289 self.dataOut.flagNoData = True
290
290
291 for op, optype, opId, kwargs in self.operations:
291 for op, optype, opId, kwargs in self.operations:
292 if optype == 'self' and not self.dataOut.flagNoData:
292 if optype == 'self' and not self.dataOut.flagNoData:
293 op(**kwargs)
293 op(**kwargs)
294 elif optype == 'other' and not self.dataOut.flagNoData:
294 elif optype == 'other' and not self.dataOut.flagNoData:
295 self.dataOut = op.run(self.dataOut, **kwargs)
295 self.dataOut = op.run(self.dataOut, **kwargs)
296 elif optype == 'external' and not self.dataOut.flagNoData:
296 elif optype == 'external' and not self.dataOut.flagNoData:
297 self.publish(self.dataOut, opId)
297 self.publish(self.dataOut, opId)
298
298
299 if not self.dataOut.flagNoData or self.dataOut.error:
299 if not self.dataOut.flagNoData or self.dataOut.error:
300 self.publish(self.dataOut, self.id)
300 self.publish(self.dataOut, self.id)
301 for op, optype, opId, kwargs in self.operations:
301 for op, optype, opId, kwargs in self.operations:
302 if optype == 'self' and self.dataOut.error:
302 if optype == 'self' and self.dataOut.error:
303 op(**kwargs)
303 op(**kwargs)
304 elif optype == 'other' and self.dataOut.error:
304 elif optype == 'other' and self.dataOut.error:
305 self.dataOut = op.run(self.dataOut, **kwargs)
305 self.dataOut = op.run(self.dataOut, **kwargs)
306 elif optype == 'external' and self.dataOut.error:
306 elif optype == 'external' and self.dataOut.error:
307 self.publish(self.dataOut, opId)
307 self.publish(self.dataOut, opId)
308
308
309 if self.dataIn.error:
309 if self.dataIn.error:
310 break
310 break
311
311
312 time.sleep(1)
312 time.sleep(1)
313
313
314 def runOp(self):
314 def runOp(self):
315 '''
315 '''
316 Run function for external operations (this operations just receive data
316 Run function for external operations (this operations just receive data
317 ex: plots, writers, publishers)
317 ex: plots, writers, publishers)
318 '''
318 '''
319
319
320 while True:
320 while True:
321
321
322 dataOut = self.listen()
322 dataOut = self.listen()
323
323
324 BaseClass.run(self, dataOut, **self.kwargs)
324 BaseClass.run(self, dataOut, **self.kwargs)
325
325
326 if dataOut.error:
326 if dataOut.error:
327 break
327 break
328
328
329 time.sleep(1)
329 time.sleep(1)
330
330
331 def run(self):
331 def run(self):
332 if self.typeProc is "ProcUnit":
332 if self.typeProc is "ProcUnit":
333
333
334 if self.inputId is not None:
334 if self.inputId is not None:
335
335
336 self.subscribe()
336 self.subscribe()
337
337
338 self.set_publisher()
338 self.set_publisher()
339
339
340 if 'Reader' not in BaseClass.__name__:
340 if 'Reader' not in BaseClass.__name__:
341 self.runProc()
341 self.runProc()
342 else:
342 else:
343 self.runReader()
343 self.runReader()
344
344
345 elif self.typeProc is "Operation":
345 elif self.typeProc is "Operation":
346
346
347 self.subscribe()
347 self.subscribe()
348 self.runOp()
348 self.runOp()
349
349
350 else:
350 else:
351 raise ValueError("Unknown type")
351 raise ValueError("Unknown type")
352
352
353 self.close()
353 self.close()
354
354
355 def event_monitor(self, monitor):
355 def event_monitor(self, monitor):
356
356
357 events = {}
357 events = {}
358
358
359 for name in dir(zmq):
359 for name in dir(zmq):
360 if name.startswith('EVENT_'):
360 if name.startswith('EVENT_'):
361 value = getattr(zmq, name)
361 value = getattr(zmq, name)
362 events[value] = name
362 events[value] = name
363
363
364 while monitor.poll():
364 while monitor.poll():
365 evt = recv_monitor_message(monitor)
365 evt = recv_monitor_message(monitor)
366 if evt['event'] == 32:
366 if evt['event'] == 32:
367 self.connections += 1
367 self.connections += 1
368 if evt['event'] == 512:
368 if evt['event'] == 512:
369 pass
369 pass
370
370
371 evt.update({'description': events[evt['event']]})
371 evt.update({'description': events[evt['event']]})
372
372
373 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
373 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
374 break
374 break
375 monitor.close()
375 monitor.close()
376 print('event monitor thread done!')
376 print('event monitor thread done!')
377
377
378 def close(self):
378 def close(self):
379
379
380 BaseClass.close(self)
380 BaseClass.close(self)
381
381
382 if self.sender:
382 if self.sender:
383 self.sender.close()
383 self.sender.close()
384
384
385 if self.receiver:
385 if self.receiver:
386 self.receiver.close()
386 self.receiver.close()
387
387
388 log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)
388 log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)
389
389
390 return MPClass
390 return MPClass
General Comments 0
You need to be logged in to leave comments. Login now