##// END OF EJS Templates
merged
José Chávez -
r915:2e893773fb1e
parent child
Show More
@@ -1,432 +1,422
1 1 '''
2 2 @author: Juan C. Espinoza
3 3 '''
4 4
5 5 import time
6 6 import json
7 7 import numpy
8 8 import paho.mqtt.client as mqtt
9 9 import zmq
10 10 import cPickle as pickle
11 11 import datetime
12 12 from zmq.utils.monitor import recv_monitor_message
13 13 from functools import wraps
14 14 from threading import Thread
15 15 from multiprocessing import Process
16 16
17 17 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
18 18
19 19 MAXNUMX = 100
20 20 MAXNUMY = 100
21 21
22 22 class PrettyFloat(float):
23 23 def __repr__(self):
24 24 return '%.2f' % self
25 25
26 26 def roundFloats(obj):
27 27 if isinstance(obj, list):
28 28 return map(roundFloats, obj)
29 29 elif isinstance(obj, float):
30 30 return round(obj, 2)
31 31
32 32 def decimate(z):
33 33 # dx = int(len(self.x)/self.__MAXNUMX) + 1
34 34
35 35 dy = int(len(z[0])/MAXNUMY) + 1
36 36
37 37 return z[::, ::dy]
38 38
39 39 class throttle(object):
40 40 """Decorator that prevents a function from being called more than once every
41 41 time period.
42 42 To create a function that cannot be called more than once a minute, but
43 43 will sleep until it can be called:
44 44 @throttle(minutes=1)
45 45 def foo():
46 46 pass
47 47
48 48 for i in range(10):
49 49 foo()
50 50 print "This function has run %s times." % i
51 51 """
52 52
53 53 def __init__(self, seconds=0, minutes=0, hours=0):
54 54 self.throttle_period = datetime.timedelta(
55 55 seconds=seconds, minutes=minutes, hours=hours
56 56 )
57 57
58 58 self.time_of_last_call = datetime.datetime.min
59 59
60 60 def __call__(self, fn):
61 61 @wraps(fn)
62 62 def wrapper(*args, **kwargs):
63 63 now = datetime.datetime.now()
64 64 time_since_last_call = now - self.time_of_last_call
65 65 time_left = self.throttle_period - time_since_last_call
66 66
67 67 if time_left > datetime.timedelta(seconds=0):
68 68 return
69 69
70 70 self.time_of_last_call = datetime.datetime.now()
71 71 return fn(*args, **kwargs)
72 72
73 73 return wrapper
74 74
75 75
76 76 class PublishData(Operation):
77 77 """Clase publish."""
78 78
79 79 def __init__(self, **kwargs):
80 80 """Inicio."""
81 81 Operation.__init__(self, **kwargs)
82 82 self.isConfig = False
83 83 self.client = None
84 84 self.zeromq = None
85 85 self.mqtt = None
86 86
87 87 def on_disconnect(self, client, userdata, rc):
88 88 if rc != 0:
89 89 print("Unexpected disconnection.")
90 90 self.connect()
91 91
92 92 def connect(self):
93 93 print 'trying to connect'
94 94 try:
95 95 self.client.connect(
96 96 host=self.host,
97 97 port=self.port,
98 98 keepalive=60*10,
99 99 bind_address='')
100 100 self.client.loop_start()
101 101 # self.client.publish(
102 102 # self.topic + 'SETUP',
103 103 # json.dumps(setup),
104 104 # retain=True
105 105 # )
106 106 except:
107 107 print "MQTT Conection error."
108 108 self.client = False
109 109
110 110 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, **kwargs):
111 111 self.counter = 0
112 112 self.topic = kwargs.get('topic', 'schain')
113 113 self.delay = kwargs.get('delay', 0)
114 114 self.plottype = kwargs.get('plottype', 'spectra')
115 115 self.host = kwargs.get('host', "10.10.10.82")
116 116 self.port = kwargs.get('port', 3000)
117 117 self.clientId = clientId
118 118 self.cnt = 0
119 119 self.zeromq = zeromq
120 120 self.mqtt = kwargs.get('plottype', 0)
121 121 self.client = None
122 122 setup = []
123 123 if mqtt is 1:
124 124 self.client = mqtt.Client(
125 125 client_id=self.clientId + self.topic + 'SCHAIN',
126 126 clean_session=True)
127 127 self.client.on_disconnect = self.on_disconnect
128 128 self.connect()
129 129 for plot in self.plottype:
130 130 setup.append({
131 131 'plot': plot,
132 132 'topic': self.topic + plot,
133 133 'title': getattr(self, plot + '_' + 'title', False),
134 134 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
135 135 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
136 136 'xrange': getattr(self, plot + '_' + 'xrange', False),
137 137 'yrange': getattr(self, plot + '_' + 'yrange', False),
138 138 'zrange': getattr(self, plot + '_' + 'zrange', False),
139 139 })
140 140 if zeromq is 1:
141 141 context = zmq.Context()
142 142 self.zmq_socket = context.socket(zmq.PUSH)
143 143 server = kwargs.get('server', 'zmq.pipe')
144 144
145 145 if 'tcp://' in server:
146 146 address = server
147 147 else:
148 148 address = 'ipc:///tmp/%s' % server
149 149
150 150 self.zmq_socket.connect(address)
151 151 time.sleep(1)
152 152
153 153 def publish_data(self):
154 154 self.dataOut.finished = False
155 155 if self.mqtt is 1:
156 156 yData = self.dataOut.heightList[:2].tolist()
157 157 if self.plottype == 'spectra':
158 158 data = getattr(self.dataOut, 'data_spc')
159 159 z = data/self.dataOut.normFactor
160 160 zdB = 10*numpy.log10(z)
161 161 xlen, ylen = zdB[0].shape
162 162 dx = int(xlen/MAXNUMX) + 1
163 163 dy = int(ylen/MAXNUMY) + 1
164 164 Z = [0 for i in self.dataOut.channelList]
165 165 for i in self.dataOut.channelList:
166 166 Z[i] = zdB[i][::dx, ::dy].tolist()
167 167 payload = {
168 168 'timestamp': self.dataOut.utctime,
169 169 'data': roundFloats(Z),
170 170 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
171 171 'interval': self.dataOut.getTimeInterval(),
172 172 'type': self.plottype,
173 173 'yData': yData
174 174 }
175 175 # print payload
176 176
177 177 elif self.plottype in ('rti', 'power'):
178 178 data = getattr(self.dataOut, 'data_spc')
179 179 z = data/self.dataOut.normFactor
180 180 avg = numpy.average(z, axis=1)
181 181 avgdB = 10*numpy.log10(avg)
182 182 xlen, ylen = z[0].shape
183 183 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
184 184 AVG = [0 for i in self.dataOut.channelList]
185 185 for i in self.dataOut.channelList:
186 186 AVG[i] = avgdB[i][::dy].tolist()
187 187 payload = {
188 188 'timestamp': self.dataOut.utctime,
189 189 'data': roundFloats(AVG),
190 190 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
191 191 'interval': self.dataOut.getTimeInterval(),
192 192 'type': self.plottype,
193 193 'yData': yData
194 194 }
195 195 elif self.plottype == 'noise':
196 196 noise = self.dataOut.getNoise()/self.dataOut.normFactor
197 197 noisedB = 10*numpy.log10(noise)
198 198 payload = {
199 199 'timestamp': self.dataOut.utctime,
200 200 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
201 201 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
202 202 'interval': self.dataOut.getTimeInterval(),
203 203 'type': self.plottype,
204 204 'yData': yData
205 205 }
206 206 elif self.plottype == 'snr':
207 207 data = getattr(self.dataOut, 'data_SNR')
208 208 avgdB = 10*numpy.log10(data)
209 209
210 210 ylen = data[0].size
211 211 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
212 212 AVG = [0 for i in self.dataOut.channelList]
213 213 for i in self.dataOut.channelList:
214 214 AVG[i] = avgdB[i][::dy].tolist()
215 215 payload = {
216 216 'timestamp': self.dataOut.utctime,
217 217 'data': roundFloats(AVG),
218 218 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
219 219 'type': self.plottype,
220 220 'yData': yData
221 221 }
222 222 else:
223 223 print "Tipo de grafico invalido"
224 224 payload = {
225 225 'data': 'None',
226 226 'timestamp': 'None',
227 227 'type': None
228 228 }
229 229 # print 'Publishing data to {}'.format(self.host)
230 230 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
231 231
232 232 if self.zeromq is 1:
233 233 print '[Sending] {} - {}'.format(self.dataOut.type, self.dataOut.datatime)
234 234 self.zmq_socket.send_pyobj(self.dataOut)
235 235
236 236 def run(self, dataOut, **kwargs):
237 237 self.dataOut = dataOut
238 238 if not self.isConfig:
239 239 self.setup(**kwargs)
240 240 self.isConfig = True
241 241
242 242 self.publish_data()
243 243 time.sleep(self.delay)
244 244
245 245 def close(self):
246 246 if self.zeromq is 1:
247 247 self.dataOut.finished = True
248 248 self.zmq_socket.send_pyobj(self.dataOut)
249 249
250 250 if self.client:
251 251 self.client.loop_stop()
252 252 self.client.disconnect()
253 253
254 254
255 255 class ReceiverData(ProcessingUnit, Process):
256 256
257 257 throttle_value = 5
258 258
259 259 def __init__(self, **kwargs):
260 260
261 261 ProcessingUnit.__init__(self, **kwargs)
262 262 Process.__init__(self)
263 263 self.mp = False
264 264 self.isConfig = False
265 265 self.isWebConfig = False
266 266 self.plottypes =[]
267 267 self.connections = 0
268 268 server = kwargs.get('server', 'zmq.pipe')
269 269 plot_server = kwargs.get('plot_server', 'zmq.web')
270 270 if 'tcp://' in server:
271 271 address = server
272 272 else:
273 273 address = 'ipc:///tmp/%s' % server
274 274
275 275 if 'tcp://' in plot_server:
276 276 plot_address = plot_server
277 277 else:
278 278 plot_address = 'ipc:///tmp/%s' % plot_server
279 279
280 280 self.address = address
281 281 self.plot_address = plot_address
282 282 self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
283 283 self.realtime = kwargs.get('realtime', False)
284 284 self.throttle_value = kwargs.get('throttle', 10)
285 285 self.sendData = self.initThrottle(self.throttle_value)
286 286 self.setup()
287 287
288 288 def setup(self):
289 289
290 290 self.data = {}
291 291 self.data['times'] = []
292 292 for plottype in self.plottypes:
293 293 self.data[plottype] = {}
294 294 self.data['noise'] = {}
295 295 self.data['throttle'] = self.throttle_value
296 296 self.data['ENDED'] = False
297 297 self.isConfig = True
298 298 self.data_web = {}
299 299
300 300 def event_monitor(self, monitor):
301 301
302 302 events = {}
303 303
304 304 for name in dir(zmq):
305 305 if name.startswith('EVENT_'):
306 306 value = getattr(zmq, name)
307 307 events[value] = name
308 308
309 309 while monitor.poll():
310 310 evt = recv_monitor_message(monitor)
311 311 if evt['event'] == 32:
312 312 self.connections += 1
313 313 if evt['event'] == 512:
314 314 pass
315 315 if self.connections == 0 and self.started is True:
316 316 self.ended = True
317 317 # send('ENDED')
318 318 evt.update({'description': events[evt['event']]})
319 319
320 320 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
321 321 break
322 322 monitor.close()
323 323 print("event monitor thread done!")
324 324
325 325 def initThrottle(self, throttle_value):
326 326
327 327 @throttle(seconds=throttle_value)
328 328 def sendDataThrottled(fn_sender, data):
329 329 fn_sender(data)
330 330
331 331 return sendDataThrottled
332 332
333 333 def send(self, data):
334 334 # print '[sending] data=%s size=%s' % (data.keys(), len(data['times']))
335 335 self.sender.send_pyobj(data)
336 336
337 337 def update(self):
338 338 t = self.dataOut.ltctime
339 339 self.data['times'].append(t)
340 340 self.data['dataOut'] = self.dataOut
341 341 for plottype in self.plottypes:
342 342 if plottype == 'spc':
343 343 z = self.dataOut.data_spc/self.dataOut.normFactor
344 344 self.data[plottype] = 10*numpy.log10(z)
345 345 self.data['noise'][t] = 10*numpy.log10(self.dataOut.getNoise()/self.dataOut.normFactor)
346 346 if plottype == 'rti':
347 347 self.data[plottype][t] = self.dataOut.getPower()
348 348 if plottype == 'snr':
349 349 self.data[plottype][t] = 10*numpy.log10(self.dataOut.data_SNR)
350 350 if plottype == 'dop':
351 351 self.data[plottype][t] = 10*numpy.log10(self.dataOut.data_DOP)
352 352 if plottype == 'coh':
353 353 self.data[plottype][t] = self.dataOut.getCoherence()
354 354 if plottype == 'phase':
355 355 self.data[plottype][t] = self.dataOut.getCoherence(phase=True)
356 356 if self.realtime:
357 <<<<<<< HEAD
358 self.data_web[plottype] = roundFloats(decimate(self.data[plottype][t]).tolist())
359 self.data_web['timestamp'] = t
360 =======
361 357 if plottype == 'spc':
362 358 self.data_web[plottype] = roundFloats(decimate(self.data[plottype]).tolist())
363 359 else:
364 360 self.data_web[plottype] = roundFloats(decimate(self.data[plottype][t]).tolist())
365 self.data_web['time'] = t
366 >>>>>>> f65929d2cf32d4dddb2d5fa2a72f3970d4d51812
367 361 self.data_web['interval'] = self.dataOut.getTimeInterval()
368 362 self.data_web['type'] = plottype
369 363
370 364 def run(self):
371 365
372 366 print '[Starting] {} from {}'.format(self.name, self.address)
373 367
374 368 self.context = zmq.Context()
375 369 self.receiver = self.context.socket(zmq.PULL)
376 370 self.receiver.bind(self.address)
377 371 monitor = self.receiver.get_monitor_socket()
378 372 self.sender = self.context.socket(zmq.PUB)
379 373 if self.realtime:
380 374 self.sender_web = self.context.socket(zmq.PUB)
381 375 self.sender_web.bind(self.plot_address)
382 376 self.sender.bind("ipc:///tmp/zmq.plots")
383 377
384 378 t = Thread(target=self.event_monitor, args=(monitor,))
385 379 t.start()
386 380
387 381 while True:
388 382 self.dataOut = self.receiver.recv_pyobj()
389 383 # print '[Receiving] {} - {}'.format(self.dataOut.type,
390 384 # self.dataOut.datatime.ctime())
391 385
392 386 self.update()
393 387
394 388 if self.dataOut.finished is True:
395 389 self.send(self.data)
396 390 self.connections -= 1
397 391 if self.connections == 0 and self.started:
398 392 self.ended = True
399 393 self.data['ENDED'] = True
400 394 self.send(self.data)
401 395 self.setup()
402 396 else:
403 397 if self.realtime:
404 398 self.send(self.data)
405 399 self.sender_web.send_string(json.dumps(self.data_web))
406 400 else:
407 401 self.sendData(self.send, self.data)
408 402 self.started = True
409 403
410 404 return
411 405
412 406 def sendToWeb(self):
413 407
414 408 if not self.isWebConfig:
415 409 context = zmq.Context()
416 410 sender_web_config = context.socket(zmq.PUB)
417 411 if 'tcp://' in self.plot_address:
418 412 dum, address, port = self.plot_address.split(':')
419 413 conf_address = '{}:{}:{}'.format(dum, address, int(port)+1)
420 414 else:
421 415 conf_address = self.plot_address + '.config'
422 416 sender_web_config.bind(conf_address)
423 <<<<<<< HEAD
424
425 =======
426 417 time.sleep(1)
427 >>>>>>> f65929d2cf32d4dddb2d5fa2a72f3970d4d51812
428 418 for kwargs in self.operationKwargs.values():
429 419 if 'plot' in kwargs:
430 420 print '[Sending] Config data to web for {}'.format(kwargs['code'].upper())
431 421 sender_web_config.send_string(json.dumps(kwargs))
432 422 self.isWebConfig = True
General Comments 0
You need to be logged in to leave comments. Login now