##// END OF EJS Templates
cambio bind x connect in ReceiverData
jespinoza -
r916:ffa675de04b5
parent child
Show More
@@ -1,426 +1,426
1 1
2 2 import os
3 3 import zmq
4 4 import time
5 5 import numpy
6 6 import datetime
7 7 import numpy as np
8 8 import matplotlib.pyplot as plt
9 9 from mpl_toolkits.axes_grid1 import make_axes_locatable
10 10 from matplotlib.ticker import FuncFormatter, LinearLocator
11 11 from multiprocessing import Process
12 12
13 13 from schainpy.model.proc.jroproc_base import Operation
14 14
15 15 #plt.ion()
16 16
17 17 func = lambda x, pos: ('%s') %(datetime.datetime.utcfromtimestamp(x).strftime('%H:%M'))
18 18
19 19 d1970 = datetime.datetime(1970,1,1)
20 20
21 21 class PlotData(Operation, Process):
22 22
23 23 CODE = 'Figure'
24 24 colormap = 'jet'
25 25 CONFLATE = True
26 26 __MAXNUMX = 80
27 27 __MAXNUMY = 80
28 28 __missing = 1E30
29 29
30 30 def __init__(self, **kwargs):
31 31
32 32 Operation.__init__(self, plot=True, **kwargs)
33 33 Process.__init__(self)
34 34 self.kwargs['code'] = self.CODE
35 35 self.mp = False
36 36 self.dataOut = None
37 37 self.isConfig = False
38 38 self.figure = None
39 39 self.axes = []
40 40 self.localtime = kwargs.pop('localtime', True)
41 41 self.show = kwargs.get('show', True)
42 42 self.save = kwargs.get('save', False)
43 43 self.colormap = kwargs.get('colormap', self.colormap)
44 44 self.showprofile = kwargs.get('showprofile', True)
45 45 self.title = kwargs.get('wintitle', '')
46 46 self.xaxis = kwargs.get('xaxis', 'time')
47 47 self.zmin = kwargs.get('zmin', None)
48 48 self.zmax = kwargs.get('zmax', None)
49 49 self.xmin = kwargs.get('xmin', None)
50 50 self.xmax = kwargs.get('xmax', None)
51 51 self.xrange = kwargs.get('xrange', 24)
52 52 self.ymin = kwargs.get('ymin', None)
53 53 self.ymax = kwargs.get('ymax', None)
54 self.throttle_value = 1
54 self.throttle_value = 5
55 55 def fill_gaps(self, x_buffer, y_buffer, z_buffer):
56 56
57 57 if x_buffer.shape[0] < 2:
58 58 return x_buffer, y_buffer, z_buffer
59 59
60 60 deltas = x_buffer[1:] - x_buffer[0:-1]
61 61 x_median = np.median(deltas)
62 62
63 63 index = np.where(deltas > 5*x_median)
64 64
65 65 if len(index[0]) != 0:
66 66 z_buffer[::, index[0], ::] = self.__missing
67 67 z_buffer = np.ma.masked_inside(z_buffer,
68 68 0.99*self.__missing,
69 69 1.01*self.__missing)
70 70
71 71 return x_buffer, y_buffer, z_buffer
72 72
73 73 def decimate(self):
74 74
75 75 # dx = int(len(self.x)/self.__MAXNUMX) + 1
76 76 dy = int(len(self.y)/self.__MAXNUMY) + 1
77 77
78 78 # x = self.x[::dx]
79 79 x = self.x
80 80 y = self.y[::dy]
81 81 z = self.z[::, ::, ::dy]
82 82
83 83 return x, y, z
84 84
85 85 def __plot(self):
86 86
87 87 print 'plotting...{}'.format(self.CODE)
88 88
89 89 if self.show:
90 90 self.figure.show()
91 91
92 92 self.plot()
93 93 self.figure.suptitle('{} {} - Date:{}'.format(self.title, self.CODE.upper(),
94 94 datetime.datetime.utcfromtimestamp(self.max_time).strftime('%y/%m/%d %H:%M:%S')))
95 95
96 96 if self.save:
97 97 figname = os.path.join(self.save, '{}_{}.png'.format(self.CODE,
98 98 datetime.datetime.utcfromtimestamp(self.times[0]).strftime('%y%m%d_%H%M%S')))
99 99 print 'Saving figure: {}'.format(figname)
100 100 self.figure.savefig(figname)
101 101
102 102 self.figure.canvas.draw()
103 103
104 104 def plot(self):
105 105
106 106 print 'plotting...{}'.format(self.CODE.upper())
107 107 return
108 108
109 109 def run(self):
110 110
111 111 print '[Starting] {}'.format(self.name)
112 112 context = zmq.Context()
113 113 receiver = context.socket(zmq.SUB)
114 114 receiver.setsockopt(zmq.SUBSCRIBE, '')
115 115 receiver.setsockopt(zmq.CONFLATE, self.CONFLATE)
116 116 receiver.connect("ipc:///tmp/zmq.plots")
117 117
118 118 while True:
119 119 try:
120 120 #if True:
121 121 self.data = receiver.recv_pyobj(flags=zmq.NOBLOCK)
122 122 self.dataOut = self.data['dataOut']
123 123 self.times = self.data['times']
124 124 self.times.sort()
125 125 self.throttle_value = self.data['throttle']
126 126 self.min_time = self.times[0]
127 127 self.max_time = self.times[-1]
128 128
129 129 if self.isConfig is False:
130 130 self.setup()
131 131 self.isConfig = True
132 132 self.__plot()
133 133
134 134 if self.data['ENDED'] is True:
135 135 # self.__plot()
136 136 self.isConfig = False
137 137
138 138 except zmq.Again as e:
139 139 print 'Waiting for data...'
140 140 plt.pause(self.throttle_value)
141 141 # time.sleep(3)
142 142
143 143 def close(self):
144 144 if self.dataOut:
145 145 self._plot()
146 146
147 147
148 148 class PlotSpectraData(PlotData):
149 149
150 150 CODE = 'spc'
151 151 colormap = 'jro'
152 152 CONFLATE = False
153 153 def setup(self):
154 154
155 155 ncolspan = 1
156 156 colspan = 1
157 157 self.ncols = int(numpy.sqrt(self.dataOut.nChannels)+0.9)
158 158 self.nrows = int(self.dataOut.nChannels*1./self.ncols + 0.9)
159 159 self.width = 3.6*self.ncols
160 160 self.height = 3.2*self.nrows
161 161 if self.showprofile:
162 162 ncolspan = 3
163 163 colspan = 2
164 164 self.width += 1.2*self.ncols
165 165
166 166 self.ylabel = 'Range [Km]'
167 167 self.titles = ['Channel {}'.format(x) for x in self.dataOut.channelList]
168 168
169 169 if self.figure is None:
170 170 self.figure = plt.figure(figsize=(self.width, self.height),
171 171 edgecolor='k',
172 172 facecolor='w')
173 173 else:
174 174 self.figure.clf()
175 175
176 176 n = 0
177 177 for y in range(self.nrows):
178 178 for x in range(self.ncols):
179 179 if n >= self.dataOut.nChannels:
180 180 break
181 181 ax = plt.subplot2grid((self.nrows, self.ncols*ncolspan), (y, x*ncolspan), 1, colspan)
182 182 if self.showprofile:
183 183 ax.ax_profile = plt.subplot2grid((self.nrows, self.ncols*ncolspan), (y, x*ncolspan+colspan), 1, 1)
184 184
185 185 ax.firsttime = True
186 186 self.axes.append(ax)
187 187 n += 1
188 188
189 189 self.figure.subplots_adjust(left=0.1, right=0.95, bottom=0.15, top=0.85, wspace=0.9, hspace=0.5)
190 190
191 191 def plot(self):
192 192
193 193 if self.xaxis == "frequency":
194 194 x = self.dataOut.getFreqRange(1)/1000.
195 195 xlabel = "Frequency (kHz)"
196 196 elif self.xaxis == "time":
197 197 x = self.dataOut.getAcfRange(1)
198 198 xlabel = "Time (ms)"
199 199 else:
200 200 x = self.dataOut.getVelRange(1)
201 201 xlabel = "Velocity (m/s)"
202 202
203 203 y = self.dataOut.getHeiRange()
204 204 z = self.data[self.CODE]
205 205
206 206 for n, ax in enumerate(self.axes):
207 207
208 208 if ax.firsttime:
209 209 self.xmax = self.xmax if self.xmax else np.nanmax(x)
210 210 self.xmin = self.xmin if self.xmin else -self.xmax
211 211 self.ymin = self.ymin if self.ymin else np.nanmin(y)
212 212 self.ymax = self.ymax if self.ymax else np.nanmax(y)
213 213 self.zmin = self.zmin if self.zmin else np.nanmin(z)
214 214 self.zmax = self.zmax if self.zmax else np.nanmax(z)
215 215 ax.plot = ax.pcolormesh(x, y, z[n].T,
216 216 vmin=self.zmin,
217 217 vmax=self.zmax,
218 218 cmap=plt.get_cmap(self.colormap)
219 219 )
220 220 divider = make_axes_locatable(ax)
221 221 cax = divider.new_horizontal(size='3%', pad=0.05)
222 222 self.figure.add_axes(cax)
223 223 plt.colorbar(ax.plot, cax)
224 224
225 225 ax.set_xlim(self.xmin, self.xmax)
226 226 ax.set_ylim(self.ymin, self.ymax)
227 227
228 228 ax.xaxis.set_major_locator(LinearLocator(5))
229 229 #ax.yaxis.set_major_locator(LinearLocator(4))
230 230
231 231 ax.set_ylabel(self.ylabel)
232 232 ax.set_xlabel(xlabel)
233 233
234 234 ax.firsttime = False
235 235
236 236 if self.showprofile:
237 237 ax.plot_profile= ax.ax_profile.plot(self.data['rti'][self.max_time][n], y)[0]
238 238 ax.ax_profile.set_xlim(self.zmin, self.zmax)
239 239 ax.ax_profile.set_ylim(self.ymin, self.ymax)
240 240 ax.ax_profile.set_xlabel('dB')
241 241 ax.ax_profile.grid(b=True, axis='x')
242 242 ax.plot_noise = ax.ax_profile.plot(numpy.repeat(self.data['noise'][self.max_time][n], len(y)), y,
243 243 color="k", linestyle="dashed", lw=2)[0]
244 244 [tick.set_visible(False) for tick in ax.ax_profile.get_yticklabels()]
245 245 else:
246 246 ax.plot.set_array(z[n].T.ravel())
247 247 if self.showprofile:
248 248 ax.plot_profile.set_data(self.data['rti'][self.max_time][n], y)
249 249 ax.plot_noise.set_data(numpy.repeat(self.data['noise'][self.max_time][n], len(y)), y)
250 250
251 251 ax.set_title('{} - Noise: {:.2f} dB'.format(self.titles[n], self.data['noise'][self.max_time][n]),
252 252 size=8)
253 253
254 254 class PlotRTIData(PlotData):
255 255
256 256 CODE = 'rti'
257 257 colormap = 'jro'
258 258
259 259 def setup(self):
260 260 self.ncols = 1
261 261 self.nrows = self.dataOut.nChannels
262 262 self.width = 10
263 263 self.height = 2.2*self.nrows
264 264 if self.nrows==1:
265 265 self.height += 1
266 266 self.ylabel = 'Range [Km]'
267 267 self.titles = ['Channel {}'.format(x) for x in self.dataOut.channelList]
268 268
269 269 if self.figure is None:
270 270 self.figure = plt.figure(figsize=(self.width, self.height),
271 271 edgecolor='k',
272 272 facecolor='w')
273 273 else:
274 274 self.figure.clf()
275 275 self.axes = []
276 276
277 277 for n in range(self.nrows):
278 278 ax = self.figure.add_subplot(self.nrows, self.ncols, n+1)
279 279 ax.firsttime = True
280 280 self.axes.append(ax)
281 281 self.figure.subplots_adjust(hspace=0.5)
282 282
283 283 def plot(self):
284 284
285 285 self.x = np.array(self.times)
286 286 self.y = self.dataOut.getHeiRange()
287 287 self.z = []
288 288
289 289 for ch in range(self.nrows):
290 290 self.z.append([self.data[self.CODE][t][ch] for t in self.times])
291 291
292 292 self.z = np.array(self.z)
293 293 for n, ax in enumerate(self.axes):
294 294
295 295 x, y, z = self.fill_gaps(*self.decimate())
296 296 xmin = self.min_time
297 297 xmax = xmin+self.xrange*60*60
298 298 if ax.firsttime:
299 299 self.ymin = self.ymin if self.ymin else np.nanmin(self.y)
300 300 self.ymax = self.ymax if self.ymax else np.nanmax(self.y)
301 301 self.zmin = self.zmin if self.zmin else np.nanmin(self.z)
302 302 self.zmax = self.zmax if self.zmax else np.nanmax(self.z)
303 303 plot = ax.pcolormesh(x, y, z[n].T,
304 304 vmin=self.zmin,
305 305 vmax=self.zmax,
306 306 cmap=plt.get_cmap(self.colormap)
307 307 )
308 308 divider = make_axes_locatable(ax)
309 309 cax = divider.new_horizontal(size='2%', pad=0.05)
310 310 self.figure.add_axes(cax)
311 311 plt.colorbar(plot, cax)
312 312 ax.set_ylim(self.ymin, self.ymax)
313 313 if self.xaxis == 'time':
314 314 ax.xaxis.set_major_formatter(FuncFormatter(func))
315 315 ax.xaxis.set_major_locator(LinearLocator(6))
316 316
317 317 # ax.yaxis.set_major_locator(LinearLocator(4))
318 318
319 319 ax.set_ylabel(self.ylabel)
320 320
321 321 # if self.xmin is None:
322 322 # xmin = self.min_time
323 323 # else:
324 324 # xmin = (datetime.datetime.combine(self.dataOut.datatime.date(),
325 325 # datetime.time(self.xmin, 0, 0))-d1970).total_seconds()
326 326
327 327 ax.set_xlim(xmin, xmax)
328 328 ax.firsttime = False
329 329 else:
330 330 ax.collections.remove(ax.collections[0])
331 331 ax.set_xlim(xmin, xmax)
332 332 plot = ax.pcolormesh(x, y, z[n].T,
333 333 vmin=self.zmin,
334 334 vmax=self.zmax,
335 335 cmap=plt.get_cmap(self.colormap)
336 336 )
337 337 ax.set_title('{} {}'.format(self.titles[n],
338 338 datetime.datetime.utcfromtimestamp(self.max_time).strftime('%y/%m/%d %H:%M:%S')),
339 339 size=8)
340 340
341 341
342 342 class PlotCOHData(PlotRTIData):
343 343
344 344 CODE = 'coh'
345 345
346 346 def setup(self):
347 347
348 348 self.ncols = 1
349 349 self.nrows = self.dataOut.nPairs
350 350 self.width = 10
351 351 self.height = 2.2*self.nrows
352 352 if self.nrows==1:
353 353 self.height += 1
354 354 self.ylabel = 'Range [Km]'
355 355 self.titles = ['Channels {}'.format(x) for x in self.dataOut.pairsList]
356 356
357 357 if self.figure is None:
358 358 self.figure = plt.figure(figsize=(self.width, self.height),
359 359 edgecolor='k',
360 360 facecolor='w')
361 361 else:
362 362 self.figure.clf()
363 363 self.axes = []
364 364
365 365 for n in range(self.nrows):
366 366 ax = self.figure.add_subplot(self.nrows, self.ncols, n+1)
367 367 ax.firsttime = True
368 368 self.axes.append(ax)
369 369
370 370 self.figure.subplots_adjust(hspace=0.5)
371 371
372 372 class PlotNoiseData(PlotData):
373 373 CODE = 'noise'
374 374
375 375 def setup(self):
376 376
377 377 self.ncols = 1
378 378 self.nrows = 1
379 379 self.width = 10
380 380 self.height = 3.2
381 381 self.ylabel = 'Intensity [dB]'
382 382 self.titles = ['Noise']
383 383
384 384 if self.figure is None:
385 385 self.figure = plt.figure(figsize=(self.width, self.height),
386 386 edgecolor='k',
387 387 facecolor='w')
388 388 else:
389 389 self.figure.clf()
390 390 self.axes = []
391 391
392 392 self.ax = self.figure.add_subplot(self.nrows, self.ncols, 1)
393 393 self.ax.firsttime = True
394 394
395 395 def plot(self):
396 396
397 397 x = self.times
398 398 xmin = self.min_time
399 399 xmax = xmin+self.xrange*60*60
400 400 if self.ax.firsttime:
401 401 for ch in self.dataOut.channelList:
402 402 y = [self.data[self.CODE][t][ch] for t in self.times]
403 403 self.ax.plot(x, y, lw=1, label='Ch{}'.format(ch))
404 404 self.ax.firsttime = False
405 405 self.ax.xaxis.set_major_formatter(FuncFormatter(func))
406 406 self.ax.xaxis.set_major_locator(LinearLocator(6))
407 407 self.ax.set_ylabel(self.ylabel)
408 408 plt.legend()
409 409 else:
410 410 for ch in self.dataOut.channelList:
411 411 y = [self.data[self.CODE][t][ch] for t in self.times]
412 412 self.ax.lines[ch].set_data(x, y)
413 413
414 414 self.ax.set_xlim(xmin, xmax)
415 415 self.ax.set_ylim(min(y)-5, max(y)+5)
416 416
417 417 class PlotSNRData(PlotRTIData):
418 418 CODE = 'snr'
419 419
420 420 class PlotDOPData(PlotRTIData):
421 421 CODE = 'dop'
422 422 colormap = 'jet'
423 423
424 424 class PlotPHASEData(PlotCOHData):
425 425 CODE = 'phase'
426 426 colormap = 'seismic'
@@ -1,424 +1,425
1 1 '''
2 2 @author: Juan C. Espinoza
3 3 '''
4 4
5 5 import time
6 6 import json
7 7 import numpy
8 8 import paho.mqtt.client as mqtt
9 9 import zmq
10 10 import cPickle as pickle
11 11 import datetime
12 12 from zmq.utils.monitor import recv_monitor_message
13 13 from functools import wraps
14 14 from threading import Thread
15 15 from multiprocessing import Process
16 16
17 17 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
18 18
19 19 MAXNUMX = 100
20 20 MAXNUMY = 100
21 21
22 22 class PrettyFloat(float):
23 23 def __repr__(self):
24 24 return '%.2f' % self
25 25
26 26 def roundFloats(obj):
27 27 if isinstance(obj, list):
28 28 return map(roundFloats, obj)
29 29 elif isinstance(obj, float):
30 30 return round(obj, 2)
31 31
32 32 def decimate(z):
33 33 # dx = int(len(self.x)/self.__MAXNUMX) + 1
34 34
35 35 dy = int(len(z[0])/MAXNUMY) + 1
36 36
37 37 return z[::, ::dy]
38 38
39 39 class throttle(object):
40 40 """Decorator that prevents a function from being called more than once every
41 41 time period.
42 42 To create a function that cannot be called more than once a minute, but
43 43 will sleep until it can be called:
44 44 @throttle(minutes=1)
45 45 def foo():
46 46 pass
47 47
48 48 for i in range(10):
49 49 foo()
50 50 print "This function has run %s times." % i
51 51 """
52 52
53 53 def __init__(self, seconds=0, minutes=0, hours=0):
54 54 self.throttle_period = datetime.timedelta(
55 55 seconds=seconds, minutes=minutes, hours=hours
56 56 )
57 57
58 58 self.time_of_last_call = datetime.datetime.min
59 59
60 60 def __call__(self, fn):
61 61 @wraps(fn)
62 62 def wrapper(*args, **kwargs):
63 63 now = datetime.datetime.now()
64 64 time_since_last_call = now - self.time_of_last_call
65 65 time_left = self.throttle_period - time_since_last_call
66 66
67 67 if time_left > datetime.timedelta(seconds=0):
68 68 return
69 69
70 70 self.time_of_last_call = datetime.datetime.now()
71 71 return fn(*args, **kwargs)
72 72
73 73 return wrapper
74 74
75 75
76 76 class PublishData(Operation):
77 77 """Clase publish."""
78 78
79 79 def __init__(self, **kwargs):
80 80 """Inicio."""
81 81 Operation.__init__(self, **kwargs)
82 82 self.isConfig = False
83 83 self.client = None
84 84 self.zeromq = None
85 85 self.mqtt = None
86 86
87 87 def on_disconnect(self, client, userdata, rc):
88 88 if rc != 0:
89 89 print("Unexpected disconnection.")
90 90 self.connect()
91 91
92 92 def connect(self):
93 93 print 'trying to connect'
94 94 try:
95 95 self.client.connect(
96 96 host=self.host,
97 97 port=self.port,
98 98 keepalive=60*10,
99 99 bind_address='')
100 100 self.client.loop_start()
101 101 # self.client.publish(
102 102 # self.topic + 'SETUP',
103 103 # json.dumps(setup),
104 104 # retain=True
105 105 # )
106 106 except:
107 107 print "MQTT Conection error."
108 108 self.client = False
109 109
110 110 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, **kwargs):
111 111 self.counter = 0
112 112 self.topic = kwargs.get('topic', 'schain')
113 113 self.delay = kwargs.get('delay', 0)
114 114 self.plottype = kwargs.get('plottype', 'spectra')
115 115 self.host = kwargs.get('host', "10.10.10.82")
116 116 self.port = kwargs.get('port', 3000)
117 117 self.clientId = clientId
118 118 self.cnt = 0
119 119 self.zeromq = zeromq
120 120 self.mqtt = kwargs.get('plottype', 0)
121 121 self.client = None
122 122 setup = []
123 123 if mqtt is 1:
124 124 self.client = mqtt.Client(
125 125 client_id=self.clientId + self.topic + 'SCHAIN',
126 126 clean_session=True)
127 127 self.client.on_disconnect = self.on_disconnect
128 128 self.connect()
129 129 for plot in self.plottype:
130 130 setup.append({
131 131 'plot': plot,
132 132 'topic': self.topic + plot,
133 133 'title': getattr(self, plot + '_' + 'title', False),
134 134 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
135 135 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
136 136 'xrange': getattr(self, plot + '_' + 'xrange', False),
137 137 'yrange': getattr(self, plot + '_' + 'yrange', False),
138 138 'zrange': getattr(self, plot + '_' + 'zrange', False),
139 139 })
140 140 if zeromq is 1:
141 141 context = zmq.Context()
142 142 self.zmq_socket = context.socket(zmq.PUSH)
143 143 server = kwargs.get('server', 'zmq.pipe')
144 144
145 145 if 'tcp://' in server:
146 146 address = server
147 147 else:
148 148 address = 'ipc:///tmp/%s' % server
149 149
150 150 self.zmq_socket.connect(address)
151 151 time.sleep(1)
152 152
153 153 def publish_data(self):
154 154 self.dataOut.finished = False
155 155 if self.mqtt is 1:
156 156 yData = self.dataOut.heightList[:2].tolist()
157 157 if self.plottype == 'spectra':
158 158 data = getattr(self.dataOut, 'data_spc')
159 159 z = data/self.dataOut.normFactor
160 160 zdB = 10*numpy.log10(z)
161 161 xlen, ylen = zdB[0].shape
162 162 dx = int(xlen/MAXNUMX) + 1
163 163 dy = int(ylen/MAXNUMY) + 1
164 164 Z = [0 for i in self.dataOut.channelList]
165 165 for i in self.dataOut.channelList:
166 166 Z[i] = zdB[i][::dx, ::dy].tolist()
167 167 payload = {
168 168 'timestamp': self.dataOut.utctime,
169 169 'data': roundFloats(Z),
170 170 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
171 171 'interval': self.dataOut.getTimeInterval(),
172 172 'type': self.plottype,
173 173 'yData': yData
174 174 }
175 175 # print payload
176 176
177 177 elif self.plottype in ('rti', 'power'):
178 178 data = getattr(self.dataOut, 'data_spc')
179 179 z = data/self.dataOut.normFactor
180 180 avg = numpy.average(z, axis=1)
181 181 avgdB = 10*numpy.log10(avg)
182 182 xlen, ylen = z[0].shape
183 183 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
184 184 AVG = [0 for i in self.dataOut.channelList]
185 185 for i in self.dataOut.channelList:
186 186 AVG[i] = avgdB[i][::dy].tolist()
187 187 payload = {
188 188 'timestamp': self.dataOut.utctime,
189 189 'data': roundFloats(AVG),
190 190 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
191 191 'interval': self.dataOut.getTimeInterval(),
192 192 'type': self.plottype,
193 193 'yData': yData
194 194 }
195 195 elif self.plottype == 'noise':
196 196 noise = self.dataOut.getNoise()/self.dataOut.normFactor
197 197 noisedB = 10*numpy.log10(noise)
198 198 payload = {
199 199 'timestamp': self.dataOut.utctime,
200 200 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
201 201 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
202 202 'interval': self.dataOut.getTimeInterval(),
203 203 'type': self.plottype,
204 204 'yData': yData
205 205 }
206 206 elif self.plottype == 'snr':
207 207 data = getattr(self.dataOut, 'data_SNR')
208 208 avgdB = 10*numpy.log10(data)
209 209
210 210 ylen = data[0].size
211 211 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
212 212 AVG = [0 for i in self.dataOut.channelList]
213 213 for i in self.dataOut.channelList:
214 214 AVG[i] = avgdB[i][::dy].tolist()
215 215 payload = {
216 216 'timestamp': self.dataOut.utctime,
217 217 'data': roundFloats(AVG),
218 218 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
219 219 'type': self.plottype,
220 220 'yData': yData
221 221 }
222 222 else:
223 223 print "Tipo de grafico invalido"
224 224 payload = {
225 225 'data': 'None',
226 226 'timestamp': 'None',
227 227 'type': None
228 228 }
229 229 # print 'Publishing data to {}'.format(self.host)
230 230 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
231 231
232 232 if self.zeromq is 1:
233 233 print '[Sending] {} - {}'.format(self.dataOut.type, self.dataOut.datatime)
234 234 self.zmq_socket.send_pyobj(self.dataOut)
235 235
236 236 def run(self, dataOut, **kwargs):
237 237 self.dataOut = dataOut
238 238 if not self.isConfig:
239 239 self.setup(**kwargs)
240 240 self.isConfig = True
241 241
242 242 self.publish_data()
243 243 time.sleep(self.delay)
244 244
245 245 def close(self):
246 246 if self.zeromq is 1:
247 247 self.dataOut.finished = True
248 248 self.zmq_socket.send_pyobj(self.dataOut)
249 249
250 250 if self.client:
251 251 self.client.loop_stop()
252 252 self.client.disconnect()
253 253
254 254
255 255 class ReceiverData(ProcessingUnit, Process):
256 256
257 257 throttle_value = 5
258 258
259 259 def __init__(self, **kwargs):
260 260
261 261 ProcessingUnit.__init__(self, **kwargs)
262 262 Process.__init__(self)
263 263 self.mp = False
264 264 self.isConfig = False
265 265 self.isWebConfig = False
266 266 self.plottypes =[]
267 267 self.connections = 0
268 268 server = kwargs.get('server', 'zmq.pipe')
269 269 plot_server = kwargs.get('plot_server', 'zmq.web')
270 270 if 'tcp://' in server:
271 271 address = server
272 272 else:
273 273 address = 'ipc:///tmp/%s' % server
274 274
275 275 if 'tcp://' in plot_server:
276 276 plot_address = plot_server
277 277 else:
278 278 plot_address = 'ipc:///tmp/%s' % plot_server
279 279
280 280 self.address = address
281 281 self.plot_address = plot_address
282 282 self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
283 283 self.realtime = kwargs.get('realtime', False)
284 284 self.throttle_value = kwargs.get('throttle', 10)
285 285 self.sendData = self.initThrottle(self.throttle_value)
286 286 self.setup()
287 287
288 288 def setup(self):
289 289
290 290 self.data = {}
291 291 self.data['times'] = []
292 292 for plottype in self.plottypes:
293 293 self.data[plottype] = {}
294 294 self.data['noise'] = {}
295 295 self.data['throttle'] = self.throttle_value
296 296 self.data['ENDED'] = False
297 297 self.isConfig = True
298 298 self.data_web = {}
299 299
300 300 def event_monitor(self, monitor):
301 301
302 302 events = {}
303 303
304 304 for name in dir(zmq):
305 305 if name.startswith('EVENT_'):
306 306 value = getattr(zmq, name)
307 307 events[value] = name
308 308
309 309 while monitor.poll():
310 310 evt = recv_monitor_message(monitor)
311 311 if evt['event'] == 32:
312 312 self.connections += 1
313 313 if evt['event'] == 512:
314 314 pass
315 315 if self.connections == 0 and self.started is True:
316 316 self.ended = True
317 317 # send('ENDED')
318 318 evt.update({'description': events[evt['event']]})
319 319
320 320 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
321 321 break
322 322 monitor.close()
323 323 print("event monitor thread done!")
324 324
325 325 def initThrottle(self, throttle_value):
326 326
327 327 @throttle(seconds=throttle_value)
328 328 def sendDataThrottled(fn_sender, data):
329 329 fn_sender(data)
330 330
331 331 return sendDataThrottled
332 332
333 333 def send(self, data):
334 334 # print '[sending] data=%s size=%s' % (data.keys(), len(data['times']))
335 335 self.sender.send_pyobj(data)
336 336
337 337 def update(self):
338 338
339 339 t = self.dataOut.ltctime
340 340 self.data['times'].append(t)
341 341 self.data['dataOut'] = self.dataOut
342 342 for plottype in self.plottypes:
343 343 if plottype == 'spc':
344 344 z = self.dataOut.data_spc/self.dataOut.normFactor
345 345 self.data[plottype] = 10*numpy.log10(z)
346 346 self.data['noise'][t] = 10*numpy.log10(self.dataOut.getNoise()/self.dataOut.normFactor)
347 347 if plottype == 'rti':
348 348 self.data[plottype][t] = self.dataOut.getPower()
349 349 if plottype == 'snr':
350 350 self.data[plottype][t] = 10*numpy.log10(self.dataOut.data_SNR)
351 351 if plottype == 'dop':
352 352 self.data[plottype][t] = 10*numpy.log10(self.dataOut.data_DOP)
353 353 if plottype == 'coh':
354 354 self.data[plottype][t] = self.dataOut.getCoherence()
355 355 if plottype == 'phase':
356 356 self.data[plottype][t] = self.dataOut.getCoherence(phase=True)
357 357 if self.realtime:
358 358 if plottype == 'spc':
359 359 self.data_web[plottype] = roundFloats(decimate(self.data[plottype]).tolist())
360 360 else:
361 361 self.data_web[plottype] = roundFloats(decimate(self.data[plottype][t]).tolist())
362 362 self.data_web['time'] = t
363 363 self.data_web['interval'] = self.dataOut.getTimeInterval()
364 364 self.data_web['type'] = plottype
365 365
366 366 def run(self):
367 367
368 368 print '[Starting] {} from {}'.format(self.name, self.address)
369 369
370 370 self.context = zmq.Context()
371 371 self.receiver = self.context.socket(zmq.PULL)
372 372 self.receiver.bind(self.address)
373 373 monitor = self.receiver.get_monitor_socket()
374 374 self.sender = self.context.socket(zmq.PUB)
375 375 if self.realtime:
376 376 self.sender_web = self.context.socket(zmq.PUB)
377 self.sender_web.bind(self.plot_address)
377 self.sender_web.connect(self.plot_address)
378 time.sleep(1)
378 379 self.sender.bind("ipc:///tmp/zmq.plots")
379 380
380 381 t = Thread(target=self.event_monitor, args=(monitor,))
381 382 t.start()
382 383
383 384 while True:
384 385 self.dataOut = self.receiver.recv_pyobj()
385 386 # print '[Receiving] {} - {}'.format(self.dataOut.type,
386 387 # self.dataOut.datatime.ctime())
387 388
388 389 self.update()
389 390
390 391 if self.dataOut.finished is True:
391 392 self.send(self.data)
392 393 self.connections -= 1
393 394 if self.connections == 0 and self.started:
394 395 self.ended = True
395 396 self.data['ENDED'] = True
396 397 self.send(self.data)
397 398 self.setup()
398 399 else:
399 400 if self.realtime:
400 401 self.send(self.data)
401 402 self.sender_web.send_string(json.dumps(self.data_web))
402 403 else:
403 404 self.sendData(self.send, self.data)
404 405 self.started = True
405 406
406 407 return
407 408
408 409 def sendToWeb(self):
409 410
410 411 if not self.isWebConfig:
411 412 context = zmq.Context()
412 413 sender_web_config = context.socket(zmq.PUB)
413 414 if 'tcp://' in self.plot_address:
414 415 dum, address, port = self.plot_address.split(':')
415 416 conf_address = '{}:{}:{}'.format(dum, address, int(port)+1)
416 417 else:
417 418 conf_address = self.plot_address + '.config'
418 419 sender_web_config.bind(conf_address)
419 420 time.sleep(1)
420 421 for kwargs in self.operationKwargs.values():
421 422 if 'plot' in kwargs:
422 423 print '[Sending] Config data to web for {}'.format(kwargs['code'].upper())
423 424 sender_web_config.send_string(json.dumps(kwargs))
424 425 self.isWebConfig = True
General Comments 0
You need to be logged in to leave comments. Login now