##// END OF EJS Templates
Fix SendToWeb operation
jespinoza -
r911:7e8096f3f048
parent child
Show More
@@ -1,428 +1,426
1 1
2 2 import os
3 3 import zmq
4 4 import time
5 5 import numpy
6 6 import datetime
7 7 import numpy as np
8 8 import matplotlib.pyplot as plt
9 9 from mpl_toolkits.axes_grid1 import make_axes_locatable
10 10 from matplotlib.ticker import FuncFormatter, LinearLocator
11 11 from multiprocessing import Process
12 12
13 13 from schainpy.model.proc.jroproc_base import Operation
14 14
15 15 #plt.ion()
16 16
17 17 func = lambda x, pos: ('%s') %(datetime.datetime.utcfromtimestamp(x).strftime('%H:%M'))
18 18
19 19 d1970 = datetime.datetime(1970,1,1)
20 20
21 21 class PlotData(Operation, Process):
22 22
23 23 CODE = 'Figure'
24 24 colormap = 'jet'
25 25 CONFLATE = True
26 26 __MAXNUMX = 80
27 27 __MAXNUMY = 80
28 28 __missing = 1E30
29 29
30 30 def __init__(self, **kwargs):
31 31
32 32 Operation.__init__(self, plot=True, **kwargs)
33 33 Process.__init__(self)
34 34 self.kwargs['code'] = self.CODE
35 35 self.mp = False
36 36 self.dataOut = None
37 37 self.isConfig = False
38 38 self.figure = None
39 39 self.axes = []
40 40 self.localtime = kwargs.pop('localtime', True)
41 41 self.show = kwargs.get('show', True)
42 42 self.save = kwargs.get('save', False)
43 43 self.colormap = kwargs.get('colormap', self.colormap)
44 44 self.showprofile = kwargs.get('showprofile', True)
45 45 self.title = kwargs.get('wintitle', '')
46 46 self.xaxis = kwargs.get('xaxis', 'time')
47 47 self.zmin = kwargs.get('zmin', None)
48 48 self.zmax = kwargs.get('zmax', None)
49 49 self.xmin = kwargs.get('xmin', None)
50 50 self.xmax = kwargs.get('xmax', None)
51 51 self.xrange = kwargs.get('xrange', 24)
52 52 self.ymin = kwargs.get('ymin', None)
53 53 self.ymax = kwargs.get('ymax', None)
54 54 self.throttle_value = 1
55 55 def fill_gaps(self, x_buffer, y_buffer, z_buffer):
56 56
57 57 if x_buffer.shape[0] < 2:
58 58 return x_buffer, y_buffer, z_buffer
59 59
60 60 deltas = x_buffer[1:] - x_buffer[0:-1]
61 61 x_median = np.median(deltas)
62 62
63 63 index = np.where(deltas > 5*x_median)
64 64
65 65 if len(index[0]) != 0:
66 66 z_buffer[::, index[0], ::] = self.__missing
67 67 z_buffer = np.ma.masked_inside(z_buffer,
68 68 0.99*self.__missing,
69 69 1.01*self.__missing)
70 70
71 71 return x_buffer, y_buffer, z_buffer
72 72
73 73 def decimate(self):
74 74
75 75 # dx = int(len(self.x)/self.__MAXNUMX) + 1
76 76 dy = int(len(self.y)/self.__MAXNUMY) + 1
77 77
78 78 # x = self.x[::dx]
79 79 x = self.x
80 80 y = self.y[::dy]
81 81 z = self.z[::, ::, ::dy]
82 82
83 83 return x, y, z
84 84
85 85 def __plot(self):
86 86
87 87 print 'plotting...{}'.format(self.CODE)
88 88
89 if self.show:
90 self.figure.show()
91
89 92 self.plot()
90 93 self.figure.suptitle('{} {} - Date:{}'.format(self.title, self.CODE.upper(),
91 94 datetime.datetime.utcfromtimestamp(self.max_time).strftime('%y/%m/%d %H:%M:%S')))
92 95
93 96 if self.save:
94 97 figname = os.path.join(self.save, '{}_{}.png'.format(self.CODE,
95 98 datetime.datetime.utcfromtimestamp(self.times[0]).strftime('%y%m%d_%H%M%S')))
96 99 print 'Saving figure: {}'.format(figname)
97 100 self.figure.savefig(figname)
98 101
99 102 self.figure.canvas.draw()
100 103
101 104 def plot(self):
102 105
103 106 print 'plotting...{}'.format(self.CODE.upper())
104 107 return
105 108
106 109 def run(self):
107 110
108 111 print '[Starting] {}'.format(self.name)
109 112 context = zmq.Context()
110 113 receiver = context.socket(zmq.SUB)
111 114 receiver.setsockopt(zmq.SUBSCRIBE, '')
112 115 receiver.setsockopt(zmq.CONFLATE, self.CONFLATE)
113 116 receiver.connect("ipc:///tmp/zmq.plots")
114 117
115 118 while True:
116 119 try:
117 120 #if True:
118 121 self.data = receiver.recv_pyobj(flags=zmq.NOBLOCK)
119 122 self.dataOut = self.data['dataOut']
120 123 self.times = self.data['times']
121 124 self.times.sort()
122 125 self.throttle_value = self.data['throttle']
123 126 self.min_time = self.times[0]
124 127 self.max_time = self.times[-1]
125 128
126 129 if self.isConfig is False:
127 130 self.setup()
128 131 self.isConfig = True
129 132 self.__plot()
130 133
131 134 if self.data['ENDED'] is True:
132 135 # self.__plot()
133 136 self.isConfig = False
134 137
135 138 except zmq.Again as e:
136 139 print 'Waiting for data...'
137 140 plt.pause(self.throttle_value)
138 141 # time.sleep(3)
139 142
140 143 def close(self):
141 144 if self.dataOut:
142 145 self._plot()
143 146
144 147
145 148 class PlotSpectraData(PlotData):
146 149
147 150 CODE = 'spc'
148 151 colormap = 'jro'
149 152 CONFLATE = False
150 153 def setup(self):
151 154
152 155 ncolspan = 1
153 156 colspan = 1
154 157 self.ncols = int(numpy.sqrt(self.dataOut.nChannels)+0.9)
155 158 self.nrows = int(self.dataOut.nChannels*1./self.ncols + 0.9)
156 159 self.width = 3.6*self.ncols
157 160 self.height = 3.2*self.nrows
158 161 if self.showprofile:
159 162 ncolspan = 3
160 163 colspan = 2
161 164 self.width += 1.2*self.ncols
162 165
163 166 self.ylabel = 'Range [Km]'
164 167 self.titles = ['Channel {}'.format(x) for x in self.dataOut.channelList]
165 168
166 169 if self.figure is None:
167 170 self.figure = plt.figure(figsize=(self.width, self.height),
168 171 edgecolor='k',
169 172 facecolor='w')
170 173 else:
171 174 self.figure.clf()
172 175
173 176 n = 0
174 177 for y in range(self.nrows):
175 178 for x in range(self.ncols):
176 179 if n >= self.dataOut.nChannels:
177 180 break
178 181 ax = plt.subplot2grid((self.nrows, self.ncols*ncolspan), (y, x*ncolspan), 1, colspan)
179 182 if self.showprofile:
180 183 ax.ax_profile = plt.subplot2grid((self.nrows, self.ncols*ncolspan), (y, x*ncolspan+colspan), 1, 1)
181 184
182 185 ax.firsttime = True
183 186 self.axes.append(ax)
184 187 n += 1
185 188
186 189 self.figure.subplots_adjust(left=0.1, right=0.95, bottom=0.15, top=0.85, wspace=0.9, hspace=0.5)
187 self.figure.show()
188 190
189 191 def plot(self):
190 192
191 193 if self.xaxis == "frequency":
192 194 x = self.dataOut.getFreqRange(1)/1000.
193 195 xlabel = "Frequency (kHz)"
194 196 elif self.xaxis == "time":
195 197 x = self.dataOut.getAcfRange(1)
196 198 xlabel = "Time (ms)"
197 199 else:
198 200 x = self.dataOut.getVelRange(1)
199 201 xlabel = "Velocity (m/s)"
200 202
201 203 y = self.dataOut.getHeiRange()
202 204 z = self.data[self.CODE]
203 205
204 206 for n, ax in enumerate(self.axes):
205 207
206 208 if ax.firsttime:
207 209 self.xmax = self.xmax if self.xmax else np.nanmax(x)
208 210 self.xmin = self.xmin if self.xmin else -self.xmax
209 211 self.ymin = self.ymin if self.ymin else np.nanmin(y)
210 212 self.ymax = self.ymax if self.ymax else np.nanmax(y)
211 213 self.zmin = self.zmin if self.zmin else np.nanmin(z)
212 214 self.zmax = self.zmax if self.zmax else np.nanmax(z)
213 215 ax.plot = ax.pcolormesh(x, y, z[n].T,
214 216 vmin=self.zmin,
215 217 vmax=self.zmax,
216 218 cmap=plt.get_cmap(self.colormap)
217 219 )
218 220 divider = make_axes_locatable(ax)
219 221 cax = divider.new_horizontal(size='3%', pad=0.05)
220 222 self.figure.add_axes(cax)
221 223 plt.colorbar(ax.plot, cax)
222 224
223 225 ax.set_xlim(self.xmin, self.xmax)
224 226 ax.set_ylim(self.ymin, self.ymax)
225 227
226 228 ax.xaxis.set_major_locator(LinearLocator(5))
227 229 #ax.yaxis.set_major_locator(LinearLocator(4))
228 230
229 231 ax.set_ylabel(self.ylabel)
230 232 ax.set_xlabel(xlabel)
231 233
232 234 ax.firsttime = False
233 235
234 236 if self.showprofile:
235 237 ax.plot_profile= ax.ax_profile.plot(self.data['rti'][self.max_time][n], y)[0]
236 238 ax.ax_profile.set_xlim(self.zmin, self.zmax)
237 239 ax.ax_profile.set_ylim(self.ymin, self.ymax)
238 240 ax.ax_profile.set_xlabel('dB')
239 241 ax.ax_profile.grid(b=True, axis='x')
240 242 ax.plot_noise = ax.ax_profile.plot(numpy.repeat(self.data['noise'][self.max_time][n], len(y)), y,
241 243 color="k", linestyle="dashed", lw=2)[0]
242 244 [tick.set_visible(False) for tick in ax.ax_profile.get_yticklabels()]
243 245 else:
244 246 ax.plot.set_array(z[n].T.ravel())
245 247 if self.showprofile:
246 248 ax.plot_profile.set_data(self.data['rti'][self.max_time][n], y)
247 249 ax.plot_noise.set_data(numpy.repeat(self.data['noise'][self.max_time][n], len(y)), y)
248 250
249 251 ax.set_title('{} - Noise: {:.2f} dB'.format(self.titles[n], self.data['noise'][self.max_time][n]),
250 252 size=8)
251 253
252 254 class PlotRTIData(PlotData):
253 255
254 256 CODE = 'rti'
255 257 colormap = 'jro'
256 258
257 259 def setup(self):
258 260 self.ncols = 1
259 261 self.nrows = self.dataOut.nChannels
260 262 self.width = 10
261 263 self.height = 2.2*self.nrows
262 264 if self.nrows==1:
263 265 self.height += 1
264 266 self.ylabel = 'Range [Km]'
265 267 self.titles = ['Channel {}'.format(x) for x in self.dataOut.channelList]
266 268
267 269 if self.figure is None:
268 270 self.figure = plt.figure(figsize=(self.width, self.height),
269 271 edgecolor='k',
270 272 facecolor='w')
271 273 else:
272 274 self.figure.clf()
273 275 self.axes = []
274 276
275 277 for n in range(self.nrows):
276 278 ax = self.figure.add_subplot(self.nrows, self.ncols, n+1)
277 279 ax.firsttime = True
278 280 self.axes.append(ax)
279 281 self.figure.subplots_adjust(hspace=0.5)
280 self.figure.show()
281 282
282 283 def plot(self):
283 284
284 285 self.x = np.array(self.times)
285 286 self.y = self.dataOut.getHeiRange()
286 287 self.z = []
287 288
288 289 for ch in range(self.nrows):
289 290 self.z.append([self.data[self.CODE][t][ch] for t in self.times])
290 291
291 292 self.z = np.array(self.z)
292 293 for n, ax in enumerate(self.axes):
293 294
294 295 x, y, z = self.fill_gaps(*self.decimate())
295 296 xmin = self.min_time
296 297 xmax = xmin+self.xrange*60*60
297 298 if ax.firsttime:
298 299 self.ymin = self.ymin if self.ymin else np.nanmin(self.y)
299 300 self.ymax = self.ymax if self.ymax else np.nanmax(self.y)
300 301 self.zmin = self.zmin if self.zmin else np.nanmin(self.z)
301 302 self.zmax = self.zmax if self.zmax else np.nanmax(self.z)
302 303 plot = ax.pcolormesh(x, y, z[n].T,
303 304 vmin=self.zmin,
304 305 vmax=self.zmax,
305 306 cmap=plt.get_cmap(self.colormap)
306 307 )
307 308 divider = make_axes_locatable(ax)
308 309 cax = divider.new_horizontal(size='2%', pad=0.05)
309 310 self.figure.add_axes(cax)
310 311 plt.colorbar(plot, cax)
311 312 ax.set_ylim(self.ymin, self.ymax)
312 313 if self.xaxis == 'time':
313 314 ax.xaxis.set_major_formatter(FuncFormatter(func))
314 315 ax.xaxis.set_major_locator(LinearLocator(6))
315 316
316 317 # ax.yaxis.set_major_locator(LinearLocator(4))
317 318
318 319 ax.set_ylabel(self.ylabel)
319 320
320 321 # if self.xmin is None:
321 322 # xmin = self.min_time
322 323 # else:
323 324 # xmin = (datetime.datetime.combine(self.dataOut.datatime.date(),
324 325 # datetime.time(self.xmin, 0, 0))-d1970).total_seconds()
325 326
326 327 ax.set_xlim(xmin, xmax)
327 328 ax.firsttime = False
328 329 else:
329 330 ax.collections.remove(ax.collections[0])
330 331 ax.set_xlim(xmin, xmax)
331 332 plot = ax.pcolormesh(x, y, z[n].T,
332 333 vmin=self.zmin,
333 334 vmax=self.zmax,
334 335 cmap=plt.get_cmap(self.colormap)
335 336 )
336 337 ax.set_title('{} {}'.format(self.titles[n],
337 338 datetime.datetime.utcfromtimestamp(self.max_time).strftime('%y/%m/%d %H:%M:%S')),
338 339 size=8)
339 340
340 341
341 342 class PlotCOHData(PlotRTIData):
342 343
343 344 CODE = 'coh'
344 345
345 346 def setup(self):
346 347
347 348 self.ncols = 1
348 349 self.nrows = self.dataOut.nPairs
349 350 self.width = 10
350 351 self.height = 2.2*self.nrows
351 352 if self.nrows==1:
352 353 self.height += 1
353 354 self.ylabel = 'Range [Km]'
354 355 self.titles = ['Channels {}'.format(x) for x in self.dataOut.pairsList]
355 356
356 357 if self.figure is None:
357 358 self.figure = plt.figure(figsize=(self.width, self.height),
358 359 edgecolor='k',
359 360 facecolor='w')
360 361 else:
361 362 self.figure.clf()
362 363 self.axes = []
363 364
364 365 for n in range(self.nrows):
365 366 ax = self.figure.add_subplot(self.nrows, self.ncols, n+1)
366 367 ax.firsttime = True
367 368 self.axes.append(ax)
368 369
369 370 self.figure.subplots_adjust(hspace=0.5)
370 self.figure.show()
371 371
372 372 class PlotNoiseData(PlotData):
373 373 CODE = 'noise'
374 374
375 375 def setup(self):
376 376
377 377 self.ncols = 1
378 378 self.nrows = 1
379 379 self.width = 10
380 380 self.height = 3.2
381 381 self.ylabel = 'Intensity [dB]'
382 382 self.titles = ['Noise']
383 383
384 384 if self.figure is None:
385 385 self.figure = plt.figure(figsize=(self.width, self.height),
386 386 edgecolor='k',
387 387 facecolor='w')
388 388 else:
389 389 self.figure.clf()
390 390 self.axes = []
391 391
392 392 self.ax = self.figure.add_subplot(self.nrows, self.ncols, 1)
393 393 self.ax.firsttime = True
394 394
395 self.figure.show()
396
397 395 def plot(self):
398 396
399 397 x = self.times
400 398 xmin = self.min_time
401 399 xmax = xmin+self.xrange*60*60
402 400 if self.ax.firsttime:
403 401 for ch in self.dataOut.channelList:
404 402 y = [self.data[self.CODE][t][ch] for t in self.times]
405 403 self.ax.plot(x, y, lw=1, label='Ch{}'.format(ch))
406 404 self.ax.firsttime = False
407 405 self.ax.xaxis.set_major_formatter(FuncFormatter(func))
408 406 self.ax.xaxis.set_major_locator(LinearLocator(6))
409 407 self.ax.set_ylabel(self.ylabel)
410 408 plt.legend()
411 409 else:
412 410 for ch in self.dataOut.channelList:
413 411 y = [self.data[self.CODE][t][ch] for t in self.times]
414 412 self.ax.lines[ch].set_data(x, y)
415 413
416 414 self.ax.set_xlim(xmin, xmax)
417 415 self.ax.set_ylim(min(y)-5, max(y)+5)
418 416
419 417 class PlotSNRData(PlotRTIData):
420 418 CODE = 'snr'
421 419
422 420 class PlotDOPData(PlotRTIData):
423 421 CODE = 'dop'
424 422 colormap = 'jet'
425 423
426 424 class PlotPHASEData(PlotCOHData):
427 425 CODE = 'phase'
428 426 colormap = 'seismic'
@@ -1,420 +1,422
1 1 '''
2 2 @author: Juan C. Espinoza
3 3 '''
4 4
5 5 import time
6 6 import json
7 7 import numpy
8 8 import paho.mqtt.client as mqtt
9 9 import zmq
10 10 import cPickle as pickle
11 11 import datetime
12 12 from zmq.utils.monitor import recv_monitor_message
13 13 from functools import wraps
14 14 from threading import Thread
15 15 from multiprocessing import Process
16 16
17 17 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
18 18
19 19 MAXNUMX = 100
20 20 MAXNUMY = 100
21 21
22 22 class PrettyFloat(float):
23 23 def __repr__(self):
24 24 return '%.2f' % self
25 25
26 26 def roundFloats(obj):
27 27 if isinstance(obj, list):
28 28 return map(roundFloats, obj)
29 29 elif isinstance(obj, float):
30 30 return round(obj, 2)
31 31
32 32 def decimate(z):
33 33 # dx = int(len(self.x)/self.__MAXNUMX) + 1
34
34 35 dy = int(len(z[0])/MAXNUMY) + 1
36
35 37 return z[::, ::dy]
36 38
37 39 class throttle(object):
38 40 """Decorator that prevents a function from being called more than once every
39 41 time period.
40 42 To create a function that cannot be called more than once a minute, but
41 43 will sleep until it can be called:
42 44 @throttle(minutes=1)
43 45 def foo():
44 46 pass
45 47
46 48 for i in range(10):
47 49 foo()
48 50 print "This function has run %s times." % i
49 51 """
50 52
51 53 def __init__(self, seconds=0, minutes=0, hours=0):
52 54 self.throttle_period = datetime.timedelta(
53 55 seconds=seconds, minutes=minutes, hours=hours
54 56 )
55 57
56 58 self.time_of_last_call = datetime.datetime.min
57 59
58 60 def __call__(self, fn):
59 61 @wraps(fn)
60 62 def wrapper(*args, **kwargs):
61 63 now = datetime.datetime.now()
62 64 time_since_last_call = now - self.time_of_last_call
63 65 time_left = self.throttle_period - time_since_last_call
64 66
65 67 if time_left > datetime.timedelta(seconds=0):
66 68 return
67 69
68 70 self.time_of_last_call = datetime.datetime.now()
69 71 return fn(*args, **kwargs)
70 72
71 73 return wrapper
72 74
73 75
74 76 class PublishData(Operation):
75 77 """Clase publish."""
76 78
77 79 def __init__(self, **kwargs):
78 80 """Inicio."""
79 81 Operation.__init__(self, **kwargs)
80 82 self.isConfig = False
81 83 self.client = None
82 84 self.zeromq = None
83 85 self.mqtt = None
84 86
85 87 def on_disconnect(self, client, userdata, rc):
86 88 if rc != 0:
87 89 print("Unexpected disconnection.")
88 90 self.connect()
89 91
90 92 def connect(self):
91 93 print 'trying to connect'
92 94 try:
93 95 self.client.connect(
94 96 host=self.host,
95 97 port=self.port,
96 98 keepalive=60*10,
97 99 bind_address='')
98 100 self.client.loop_start()
99 101 # self.client.publish(
100 102 # self.topic + 'SETUP',
101 103 # json.dumps(setup),
102 104 # retain=True
103 105 # )
104 106 except:
105 107 print "MQTT Conection error."
106 108 self.client = False
107 109
108 110 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, **kwargs):
109 111 self.counter = 0
110 112 self.topic = kwargs.get('topic', 'schain')
111 113 self.delay = kwargs.get('delay', 0)
112 114 self.plottype = kwargs.get('plottype', 'spectra')
113 115 self.host = kwargs.get('host', "10.10.10.82")
114 116 self.port = kwargs.get('port', 3000)
115 117 self.clientId = clientId
116 118 self.cnt = 0
117 119 self.zeromq = zeromq
118 120 self.mqtt = kwargs.get('plottype', 0)
119 121 self.client = None
120 122 setup = []
121 123 if mqtt is 1:
122 124 self.client = mqtt.Client(
123 125 client_id=self.clientId + self.topic + 'SCHAIN',
124 126 clean_session=True)
125 127 self.client.on_disconnect = self.on_disconnect
126 128 self.connect()
127 129 for plot in self.plottype:
128 130 setup.append({
129 131 'plot': plot,
130 132 'topic': self.topic + plot,
131 133 'title': getattr(self, plot + '_' + 'title', False),
132 134 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
133 135 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
134 136 'xrange': getattr(self, plot + '_' + 'xrange', False),
135 137 'yrange': getattr(self, plot + '_' + 'yrange', False),
136 138 'zrange': getattr(self, plot + '_' + 'zrange', False),
137 139 })
138 140 if zeromq is 1:
139 141 context = zmq.Context()
140 142 self.zmq_socket = context.socket(zmq.PUSH)
141 143 server = kwargs.get('server', 'zmq.pipe')
142 144
143 145 if 'tcp://' in server:
144 146 address = server
145 147 else:
146 148 address = 'ipc:///tmp/%s' % server
147 149
148 150 self.zmq_socket.connect(address)
149 151 time.sleep(1)
150 152
151
152
153 153 def publish_data(self):
154 154 self.dataOut.finished = False
155 155 if self.mqtt is 1:
156 156 yData = self.dataOut.heightList[:2].tolist()
157 157 if self.plottype == 'spectra':
158 158 data = getattr(self.dataOut, 'data_spc')
159 159 z = data/self.dataOut.normFactor
160 160 zdB = 10*numpy.log10(z)
161 161 xlen, ylen = zdB[0].shape
162 162 dx = int(xlen/MAXNUMX) + 1
163 163 dy = int(ylen/MAXNUMY) + 1
164 164 Z = [0 for i in self.dataOut.channelList]
165 165 for i in self.dataOut.channelList:
166 166 Z[i] = zdB[i][::dx, ::dy].tolist()
167 167 payload = {
168 168 'timestamp': self.dataOut.utctime,
169 169 'data': roundFloats(Z),
170 170 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
171 171 'interval': self.dataOut.getTimeInterval(),
172 172 'type': self.plottype,
173 173 'yData': yData
174 174 }
175 175 # print payload
176 176
177 177 elif self.plottype in ('rti', 'power'):
178 178 data = getattr(self.dataOut, 'data_spc')
179 179 z = data/self.dataOut.normFactor
180 180 avg = numpy.average(z, axis=1)
181 181 avgdB = 10*numpy.log10(avg)
182 182 xlen, ylen = z[0].shape
183 183 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
184 184 AVG = [0 for i in self.dataOut.channelList]
185 185 for i in self.dataOut.channelList:
186 186 AVG[i] = avgdB[i][::dy].tolist()
187 187 payload = {
188 188 'timestamp': self.dataOut.utctime,
189 189 'data': roundFloats(AVG),
190 190 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
191 191 'interval': self.dataOut.getTimeInterval(),
192 192 'type': self.plottype,
193 193 'yData': yData
194 194 }
195 195 elif self.plottype == 'noise':
196 196 noise = self.dataOut.getNoise()/self.dataOut.normFactor
197 197 noisedB = 10*numpy.log10(noise)
198 198 payload = {
199 199 'timestamp': self.dataOut.utctime,
200 200 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
201 201 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
202 202 'interval': self.dataOut.getTimeInterval(),
203 203 'type': self.plottype,
204 204 'yData': yData
205 205 }
206 206 elif self.plottype == 'snr':
207 207 data = getattr(self.dataOut, 'data_SNR')
208 208 avgdB = 10*numpy.log10(data)
209 209
210 210 ylen = data[0].size
211 211 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
212 212 AVG = [0 for i in self.dataOut.channelList]
213 213 for i in self.dataOut.channelList:
214 214 AVG[i] = avgdB[i][::dy].tolist()
215 215 payload = {
216 216 'timestamp': self.dataOut.utctime,
217 217 'data': roundFloats(AVG),
218 218 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
219 219 'type': self.plottype,
220 220 'yData': yData
221 221 }
222 222 else:
223 223 print "Tipo de grafico invalido"
224 224 payload = {
225 225 'data': 'None',
226 226 'timestamp': 'None',
227 227 'type': None
228 228 }
229 229 # print 'Publishing data to {}'.format(self.host)
230 230 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
231 231
232 232 if self.zeromq is 1:
233 233 print '[Sending] {} - {}'.format(self.dataOut.type, self.dataOut.datatime)
234 234 self.zmq_socket.send_pyobj(self.dataOut)
235 235
236 236 def run(self, dataOut, **kwargs):
237 237 self.dataOut = dataOut
238 238 if not self.isConfig:
239 239 self.setup(**kwargs)
240 240 self.isConfig = True
241 241
242 242 self.publish_data()
243 243 time.sleep(self.delay)
244 244
245 245 def close(self):
246 246 if self.zeromq is 1:
247 247 self.dataOut.finished = True
248 248 self.zmq_socket.send_pyobj(self.dataOut)
249 249
250 250 if self.client:
251 251 self.client.loop_stop()
252 252 self.client.disconnect()
253 253
254 254
255 255 class ReceiverData(ProcessingUnit, Process):
256 256
257 257 throttle_value = 5
258 258
259 259 def __init__(self, **kwargs):
260 260
261 261 ProcessingUnit.__init__(self, **kwargs)
262 262 Process.__init__(self)
263 263 self.mp = False
264 264 self.isConfig = False
265 265 self.isWebConfig = False
266 266 self.plottypes =[]
267 267 self.connections = 0
268 268 server = kwargs.get('server', 'zmq.pipe')
269 269 plot_server = kwargs.get('plot_server', 'zmq.web')
270 270 if 'tcp://' in server:
271 271 address = server
272 272 else:
273 273 address = 'ipc:///tmp/%s' % server
274 274
275 275 if 'tcp://' in plot_server:
276 276 plot_address = plot_server
277 277 else:
278 278 plot_address = 'ipc:///tmp/%s' % plot_server
279 279
280 280 self.address = address
281 281 self.plot_address = plot_address
282 282 self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
283 283 self.realtime = kwargs.get('realtime', False)
284 284 self.throttle_value = kwargs.get('throttle', 10)
285 285 self.sendData = self.initThrottle(self.throttle_value)
286 286 self.setup()
287 287
288 288 def setup(self):
289 289
290 290 self.data = {}
291 291 self.data['times'] = []
292 292 for plottype in self.plottypes:
293 293 self.data[plottype] = {}
294 294 self.data['noise'] = {}
295 295 self.data['throttle'] = self.throttle_value
296 296 self.data['ENDED'] = False
297 297 self.isConfig = True
298 298 self.data_web = {}
299 299
300 300 def event_monitor(self, monitor):
301 301
302 302 events = {}
303 303
304 304 for name in dir(zmq):
305 305 if name.startswith('EVENT_'):
306 306 value = getattr(zmq, name)
307 307 events[value] = name
308 308
309 309 while monitor.poll():
310 310 evt = recv_monitor_message(monitor)
311 311 if evt['event'] == 32:
312 312 self.connections += 1
313 313 if evt['event'] == 512:
314 314 pass
315 315 if self.connections == 0 and self.started is True:
316 316 self.ended = True
317 317 # send('ENDED')
318 318 evt.update({'description': events[evt['event']]})
319 319
320 320 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
321 321 break
322 322 monitor.close()
323 323 print("event monitor thread done!")
324 324
325 325 def initThrottle(self, throttle_value):
326 326
327 327 @throttle(seconds=throttle_value)
328 328 def sendDataThrottled(fn_sender, data):
329 329 fn_sender(data)
330 330
331 331 return sendDataThrottled
332 332
333 333 def send(self, data):
334 334 # print '[sending] data=%s size=%s' % (data.keys(), len(data['times']))
335 335 self.sender.send_pyobj(data)
336 336
337 337 def update(self):
338 338
339 339 t = self.dataOut.ltctime
340 340 self.data['times'].append(t)
341 341 self.data['dataOut'] = self.dataOut
342 342 for plottype in self.plottypes:
343 343 if plottype == 'spc':
344 344 z = self.dataOut.data_spc/self.dataOut.normFactor
345 345 self.data[plottype] = 10*numpy.log10(z)
346 346 self.data['noise'][t] = 10*numpy.log10(self.dataOut.getNoise()/self.dataOut.normFactor)
347 347 if plottype == 'rti':
348 348 self.data[plottype][t] = self.dataOut.getPower()
349 349 if plottype == 'snr':
350 350 self.data[plottype][t] = 10*numpy.log10(self.dataOut.data_SNR)
351 351 if plottype == 'dop':
352 352 self.data[plottype][t] = 10*numpy.log10(self.dataOut.data_DOP)
353 353 if plottype == 'coh':
354 354 self.data[plottype][t] = self.dataOut.getCoherence()
355 355 if plottype == 'phase':
356 356 self.data[plottype][t] = self.dataOut.getCoherence(phase=True)
357 357 if self.realtime:
358 if plottype == 'spc':
359 self.data_web[plottype] = roundFloats(decimate(self.data[plottype]).tolist())
360 else:
358 361 self.data_web[plottype] = roundFloats(decimate(self.data[plottype][t]).tolist())
359 362 self.data_web['time'] = t
363
360 364 def run(self):
361 365
362 366 print '[Starting] {} from {}'.format(self.name, self.address)
363 367
364 368 self.context = zmq.Context()
365 369 self.receiver = self.context.socket(zmq.PULL)
366 370 self.receiver.bind(self.address)
367 371 monitor = self.receiver.get_monitor_socket()
368 372 self.sender = self.context.socket(zmq.PUB)
369 373 if self.realtime:
370 374 self.sender_web = self.context.socket(zmq.PUB)
371 375 self.sender_web.bind(self.plot_address)
372 376 self.sender.bind("ipc:///tmp/zmq.plots")
373 377
374 378 t = Thread(target=self.event_monitor, args=(monitor,))
375 379 t.start()
376 380
377 381 while True:
378 382 self.dataOut = self.receiver.recv_pyobj()
379 383 # print '[Receiving] {} - {}'.format(self.dataOut.type,
380 384 # self.dataOut.datatime.ctime())
381 385
382 386 self.update()
383 387
384 388 if self.dataOut.finished is True:
385 389 self.send(self.data)
386 390 self.connections -= 1
387 391 if self.connections == 0 and self.started:
388 392 self.ended = True
389 393 self.data['ENDED'] = True
390 394 self.send(self.data)
391 395 self.setup()
392 396 else:
393 397 if self.realtime:
394 398 self.send(self.data)
395 399 self.sender_web.send_string(json.dumps(self.data_web))
396 400 else:
397 401 self.sendData(self.send, self.data)
398 402 self.started = True
399 403
400 404 return
401 405
402 406 def sendToWeb(self):
403 407
404 408 if not self.isWebConfig:
405 409 context = zmq.Context()
406 410 sender_web_config = context.socket(zmq.PUB)
407 411 if 'tcp://' in self.plot_address:
408 print self.plot_address
409 412 dum, address, port = self.plot_address.split(':')
410 413 conf_address = '{}:{}:{}'.format(dum, address, int(port)+1)
411 414 else:
412 415 conf_address = self.plot_address + '.config'
413 416 sender_web_config.bind(conf_address)
414
417 time.sleep(1)
415 418 for kwargs in self.operationKwargs.values():
416 419 if 'plot' in kwargs:
420 print '[Sending] Config data to web for {}'.format(kwargs['code'].upper())
417 421 sender_web_config.send_string(json.dumps(kwargs))
418 print kwargs
419 422 self.isWebConfig = True
420
General Comments 0
You need to be logged in to leave comments. Login now