##// END OF EJS Templates
Fix time when publishing Parameters
Juan C. Espinoza -
r1094:4c0b45256b7d
parent child
Show More
@@ -1,619 +1,631
1 1 '''
2 2 @author: Juan C. Espinoza
3 3 '''
4 4
5 5 import time
6 6 import json
7 7 import numpy
8 8 import paho.mqtt.client as mqtt
9 9 import zmq
10 10 import datetime
11 11 from zmq.utils.monitor import recv_monitor_message
12 12 from functools import wraps
13 13 from threading import Thread
14 14 from multiprocessing import Process
15 15
16 16 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
17 17 from schainpy.model.data.jrodata import JROData
18 18 from schainpy.utils import log
19 19
20 20 MAXNUMX = 100
21 21 MAXNUMY = 100
22 22
23 23 class PrettyFloat(float):
24 24 def __repr__(self):
25 25 return '%.2f' % self
26 26
27 27 def roundFloats(obj):
28 28 if isinstance(obj, list):
29 29 return map(roundFloats, obj)
30 30 elif isinstance(obj, float):
31 31 return round(obj, 2)
32 32
33 33 def decimate(z, MAXNUMY):
34 34 dy = int(len(z[0])/MAXNUMY) + 1
35 35
36 36 return z[::, ::dy]
37 37
38 38 class throttle(object):
39 39 '''
40 40 Decorator that prevents a function from being called more than once every
41 41 time period.
42 42 To create a function that cannot be called more than once a minute, but
43 43 will sleep until it can be called:
44 44 @throttle(minutes=1)
45 45 def foo():
46 46 pass
47 47
48 48 for i in range(10):
49 49 foo()
50 50 print "This function has run %s times." % i
51 51 '''
52 52
53 53 def __init__(self, seconds=0, minutes=0, hours=0):
54 54 self.throttle_period = datetime.timedelta(
55 55 seconds=seconds, minutes=minutes, hours=hours
56 56 )
57 57
58 58 self.time_of_last_call = datetime.datetime.min
59 59
60 60 def __call__(self, fn):
61 61 @wraps(fn)
62 62 def wrapper(*args, **kwargs):
63 now = datetime.datetime.now()
64 time_since_last_call = now - self.time_of_last_call
65 time_left = self.throttle_period - time_since_last_call
63 coerce = kwargs.pop('coerce', None)
64 if coerce:
65 self.time_of_last_call = datetime.datetime.now()
66 return fn(*args, **kwargs)
67 else:
68 now = datetime.datetime.now()
69 time_since_last_call = now - self.time_of_last_call
70 time_left = self.throttle_period - time_since_last_call
66 71
67 if time_left > datetime.timedelta(seconds=0):
68 return
72 if time_left > datetime.timedelta(seconds=0):
73 return
69 74
70 75 self.time_of_last_call = datetime.datetime.now()
71 76 return fn(*args, **kwargs)
72 77
73 78 return wrapper
74 79
75 80 class Data(object):
76 81 '''
77 82 Object to hold data to be plotted
78 83 '''
79 84
80 85 def __init__(self, plottypes, throttle_value):
81 86 self.plottypes = plottypes
82 87 self.throttle = throttle_value
83 88 self.ended = False
84 89 self.localtime = False
85 90 self.__times = []
86 91 self.__heights = []
87 92
88 93 def __str__(self):
89 94 dum = ['{}{}'.format(key, self.shape(key)) for key in self.data]
90 95 return 'Data[{}][{}]'.format(';'.join(dum), len(self.__times))
91 96
92 97 def __len__(self):
93 98 return len(self.__times)
94 99
95 100 def __getitem__(self, key):
96 101 if key not in self.data:
97 102 raise KeyError(log.error('Missing key: {}'.format(key)))
98 103
99 104 if 'spc' in key:
100 105 ret = self.data[key]
101 106 else:
102 107 ret = numpy.array([self.data[key][x] for x in self.times])
103 108 if ret.ndim > 1:
104 109 ret = numpy.swapaxes(ret, 0, 1)
105 110 return ret
106 111
112 def __contains__(self, key):
113 return key in self.data
114
107 115 def setup(self):
108 116 '''
109 117 Configure object
110 118 '''
111 119
112 120 self.ended = False
113 121 self.data = {}
114 122 self.__times = []
115 123 self.__heights = []
116 124 self.__all_heights = set()
117 125 for plot in self.plottypes:
118 126 if 'snr' in plot:
119 127 plot = 'snr'
120 128 self.data[plot] = {}
121 129
122 130 def shape(self, key):
123 131 '''
124 132 Get the shape of the one-element data for the given key
125 133 '''
126 134
127 135 if len(self.data[key]):
128 136 if 'spc' in key:
129 137 return self.data[key].shape
130 138 return self.data[key][self.__times[0]].shape
131 139 return (0,)
132 140
133 141 def update(self, dataOut):
134 142 '''
135 143 Update data object with new dataOut
136 144 '''
137 145
138 146 tm = dataOut.utctime
139 147 if tm in self.__times:
140 148 return
141 149
142 150 self.parameters = getattr(dataOut, 'parameters', [])
143 151 self.pairs = dataOut.pairsList
144 152 self.channels = dataOut.channelList
145 153 self.interval = dataOut.getTimeInterval()
146 154 self.localtime = dataOut.useLocalTime
147 155 if 'spc' in self.plottypes or 'cspc' in self.plottypes:
148 156 self.xrange = (dataOut.getFreqRange(1)/1000., dataOut.getAcfRange(1), dataOut.getVelRange(1))
149 157 self.__heights.append(dataOut.heightList)
150 158 self.__all_heights.update(dataOut.heightList)
151 159 self.__times.append(tm)
152 160
153 161 for plot in self.plottypes:
154 162 if plot == 'spc':
155 163 z = dataOut.data_spc/dataOut.normFactor
156 164 self.data[plot] = 10*numpy.log10(z)
157 165 if plot == 'cspc':
158 166 self.data[plot] = dataOut.data_cspc
159 167 if plot == 'noise':
160 168 self.data[plot][tm] = 10*numpy.log10(dataOut.getNoise()/dataOut.normFactor)
161 169 if plot == 'rti':
162 170 self.data[plot][tm] = dataOut.getPower()
163 171 if plot == 'snr_db':
164 172 self.data['snr'][tm] = dataOut.data_SNR
165 173 if plot == 'snr':
166 174 self.data[plot][tm] = 10*numpy.log10(dataOut.data_SNR)
167 175 if plot == 'dop':
168 176 self.data[plot][tm] = 10*numpy.log10(dataOut.data_DOP)
169 177 if plot == 'mean':
170 178 self.data[plot][tm] = dataOut.data_MEAN
171 179 if plot == 'std':
172 180 self.data[plot][tm] = dataOut.data_STD
173 181 if plot == 'coh':
174 182 self.data[plot][tm] = dataOut.getCoherence()
175 183 if plot == 'phase':
176 184 self.data[plot][tm] = dataOut.getCoherence(phase=True)
177 185 if plot == 'output':
178 186 self.data[plot][tm] = dataOut.data_output
179 187 if plot == 'param':
180 188 self.data[plot][tm] = dataOut.data_param
181 189
182 190 def normalize_heights(self):
183 191 '''
184 192 Ensure same-dimension of the data for different heighList
185 193 '''
186 194
187 195 H = numpy.array(list(self.__all_heights))
188 196 H.sort()
189 197 for key in self.data:
190 198 shape = self.shape(key)[:-1] + H.shape
191 199 for tm, obj in self.data[key].items():
192 200 h = self.__heights[self.__times.index(tm)]
193 201 if H.size == h.size:
194 202 continue
195 203 index = numpy.where(numpy.in1d(H, h))[0]
196 204 dummy = numpy.zeros(shape) + numpy.nan
197 205 if len(shape) == 2:
198 206 dummy[:, index] = obj
199 207 else:
200 208 dummy[index] = obj
201 209 self.data[key][tm] = dummy
202 210
203 211 self.__heights = [H for tm in self.__times]
204 212
205 213 def jsonify(self, decimate=False):
206 214 '''
207 215 Convert data to json
208 216 '''
209 217
210 218 ret = {}
211 219 tm = self.times[-1]
212 220
213 221 for key, value in self.data:
214 222 if key in ('spc', 'cspc'):
215 223 ret[key] = roundFloats(self.data[key].to_list())
216 224 else:
217 225 ret[key] = roundFloats(self.data[key][tm].to_list())
218 226
219 227 ret['timestamp'] = tm
220 228 ret['interval'] = self.interval
221 229
222 230 @property
223 231 def times(self):
224 232 '''
225 233 Return the list of times of the current data
226 234 '''
227 235
228 236 ret = numpy.array(self.__times)
229 237 ret.sort()
230 238 return ret
231 239
232 240 @property
233 241 def heights(self):
234 242 '''
235 243 Return the list of heights of the current data
236 244 '''
237 245
238 246 return numpy.array(self.__heights[-1])
239 247
240 248 class PublishData(Operation):
241 249 '''
242 250 Operation to send data over zmq.
243 251 '''
244 252
245 253 def __init__(self, **kwargs):
246 254 """Inicio."""
247 255 Operation.__init__(self, **kwargs)
248 256 self.isConfig = False
249 257 self.client = None
250 258 self.zeromq = None
251 259 self.mqtt = None
252 260
253 261 def on_disconnect(self, client, userdata, rc):
254 262 if rc != 0:
255 263 log.warning('Unexpected disconnection.')
256 264 self.connect()
257 265
258 266 def connect(self):
259 267 log.warning('trying to connect')
260 268 try:
261 269 self.client.connect(
262 270 host=self.host,
263 271 port=self.port,
264 272 keepalive=60*10,
265 273 bind_address='')
266 274 self.client.loop_start()
267 275 # self.client.publish(
268 276 # self.topic + 'SETUP',
269 277 # json.dumps(setup),
270 278 # retain=True
271 279 # )
272 280 except:
273 281 log.error('MQTT Conection error.')
274 282 self.client = False
275 283
276 284 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, verbose=True, **kwargs):
277 285 self.counter = 0
278 286 self.topic = kwargs.get('topic', 'schain')
279 287 self.delay = kwargs.get('delay', 0)
280 288 self.plottype = kwargs.get('plottype', 'spectra')
281 289 self.host = kwargs.get('host', "10.10.10.82")
282 290 self.port = kwargs.get('port', 3000)
283 291 self.clientId = clientId
284 292 self.cnt = 0
285 293 self.zeromq = zeromq
286 294 self.mqtt = kwargs.get('plottype', 0)
287 295 self.client = None
288 296 self.verbose = verbose
289 297 setup = []
290 298 if mqtt is 1:
291 299 self.client = mqtt.Client(
292 300 client_id=self.clientId + self.topic + 'SCHAIN',
293 301 clean_session=True)
294 302 self.client.on_disconnect = self.on_disconnect
295 303 self.connect()
296 304 for plot in self.plottype:
297 305 setup.append({
298 306 'plot': plot,
299 307 'topic': self.topic + plot,
300 308 'title': getattr(self, plot + '_' + 'title', False),
301 309 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
302 310 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
303 311 'xrange': getattr(self, plot + '_' + 'xrange', False),
304 312 'yrange': getattr(self, plot + '_' + 'yrange', False),
305 313 'zrange': getattr(self, plot + '_' + 'zrange', False),
306 314 })
307 315 if zeromq is 1:
308 316 context = zmq.Context()
309 317 self.zmq_socket = context.socket(zmq.PUSH)
310 318 server = kwargs.get('server', 'zmq.pipe')
311 319
312 320 if 'tcp://' in server:
313 321 address = server
314 322 else:
315 323 address = 'ipc:///tmp/%s' % server
316 324
317 325 self.zmq_socket.connect(address)
318 326 time.sleep(1)
319 327
320 328
321 329 def publish_data(self):
322 330 self.dataOut.finished = False
323 331 if self.mqtt is 1:
324 332 yData = self.dataOut.heightList[:2].tolist()
325 333 if self.plottype == 'spectra':
326 334 data = getattr(self.dataOut, 'data_spc')
327 335 z = data/self.dataOut.normFactor
328 336 zdB = 10*numpy.log10(z)
329 337 xlen, ylen = zdB[0].shape
330 338 dx = int(xlen/MAXNUMX) + 1
331 339 dy = int(ylen/MAXNUMY) + 1
332 340 Z = [0 for i in self.dataOut.channelList]
333 341 for i in self.dataOut.channelList:
334 342 Z[i] = zdB[i][::dx, ::dy].tolist()
335 343 payload = {
336 344 'timestamp': self.dataOut.utctime,
337 345 'data': roundFloats(Z),
338 346 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
339 347 'interval': self.dataOut.getTimeInterval(),
340 348 'type': self.plottype,
341 349 'yData': yData
342 350 }
343 351
344 352 elif self.plottype in ('rti', 'power'):
345 353 data = getattr(self.dataOut, 'data_spc')
346 354 z = data/self.dataOut.normFactor
347 355 avg = numpy.average(z, axis=1)
348 356 avgdB = 10*numpy.log10(avg)
349 357 xlen, ylen = z[0].shape
350 358 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
351 359 AVG = [0 for i in self.dataOut.channelList]
352 360 for i in self.dataOut.channelList:
353 361 AVG[i] = avgdB[i][::dy].tolist()
354 362 payload = {
355 363 'timestamp': self.dataOut.utctime,
356 364 'data': roundFloats(AVG),
357 365 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
358 366 'interval': self.dataOut.getTimeInterval(),
359 367 'type': self.plottype,
360 368 'yData': yData
361 369 }
362 370 elif self.plottype == 'noise':
363 371 noise = self.dataOut.getNoise()/self.dataOut.normFactor
364 372 noisedB = 10*numpy.log10(noise)
365 373 payload = {
366 374 'timestamp': self.dataOut.utctime,
367 375 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
368 376 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
369 377 'interval': self.dataOut.getTimeInterval(),
370 378 'type': self.plottype,
371 379 'yData': yData
372 380 }
373 381 elif self.plottype == 'snr':
374 382 data = getattr(self.dataOut, 'data_SNR')
375 383 avgdB = 10*numpy.log10(data)
376 384
377 385 ylen = data[0].size
378 386 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
379 387 AVG = [0 for i in self.dataOut.channelList]
380 388 for i in self.dataOut.channelList:
381 389 AVG[i] = avgdB[i][::dy].tolist()
382 390 payload = {
383 391 'timestamp': self.dataOut.utctime,
384 392 'data': roundFloats(AVG),
385 393 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
386 394 'type': self.plottype,
387 395 'yData': yData
388 396 }
389 397 else:
390 398 print "Tipo de grafico invalido"
391 399 payload = {
392 400 'data': 'None',
393 401 'timestamp': 'None',
394 402 'type': None
395 403 }
396 404
397 405 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
398 406
399 407 if self.zeromq is 1:
400 408 if self.verbose:
401 409 log.log(
402 410 'Sending {} - {}'.format(self.dataOut.type, self.dataOut.datatime),
403 411 self.name
404 412 )
405 413 self.zmq_socket.send_pyobj(self.dataOut)
406 414
407 415 def run(self, dataOut, **kwargs):
408 416 self.dataOut = dataOut
409 417 if not self.isConfig:
410 418 self.setup(**kwargs)
411 419 self.isConfig = True
412 420
413 421 self.publish_data()
414 422 time.sleep(self.delay)
415 423
416 424 def close(self):
417 425 if self.zeromq is 1:
418 426 self.dataOut.finished = True
419 427 self.zmq_socket.send_pyobj(self.dataOut)
420 428 time.sleep(0.1)
421 429 self.zmq_socket.close()
422 430 if self.client:
423 431 self.client.loop_stop()
424 432 self.client.disconnect()
425 433
426 434
427 435 class ReceiverData(ProcessingUnit):
428 436
429 437 def __init__(self, **kwargs):
430 438
431 439 ProcessingUnit.__init__(self, **kwargs)
432 440
433 441 self.isConfig = False
434 442 server = kwargs.get('server', 'zmq.pipe')
435 443 if 'tcp://' in server:
436 444 address = server
437 445 else:
438 446 address = 'ipc:///tmp/%s' % server
439 447
440 448 self.address = address
441 449 self.dataOut = JROData()
442 450
443 451 def setup(self):
444 452
445 453 self.context = zmq.Context()
446 454 self.receiver = self.context.socket(zmq.PULL)
447 455 self.receiver.bind(self.address)
448 456 time.sleep(0.5)
449 457 log.success('ReceiverData from {}'.format(self.address))
450 458
451 459
452 460 def run(self):
453 461
454 462 if not self.isConfig:
455 463 self.setup()
456 464 self.isConfig = True
457 465
458 466 self.dataOut = self.receiver.recv_pyobj()
459 467 log.log('{} - {}'.format(self.dataOut.type,
460 468 self.dataOut.datatime.ctime(),),
461 469 'Receiving')
462 470
463 471
464 472 class PlotterReceiver(ProcessingUnit, Process):
465 473
466 474 throttle_value = 5
467 475
468 476 def __init__(self, **kwargs):
469 477
470 478 ProcessingUnit.__init__(self, **kwargs)
471 479 Process.__init__(self)
472 480 self.mp = False
473 481 self.isConfig = False
474 482 self.isWebConfig = False
475 483 self.connections = 0
476 484 server = kwargs.get('server', 'zmq.pipe')
477 485 plot_server = kwargs.get('plot_server', 'zmq.web')
478 486 if 'tcp://' in server:
479 487 address = server
480 488 else:
481 489 address = 'ipc:///tmp/%s' % server
482 490
483 491 if 'tcp://' in plot_server:
484 492 plot_address = plot_server
485 493 else:
486 494 plot_address = 'ipc:///tmp/%s' % plot_server
487 495
488 496 self.address = address
489 497 self.plot_address = plot_address
490 498 self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
491 499 self.realtime = kwargs.get('realtime', False)
492 500 self.localtime = kwargs.get('localtime', True)
493 501 self.throttle_value = kwargs.get('throttle', 5)
494 502 self.sendData = self.initThrottle(self.throttle_value)
495 503 self.dates = []
496 504 self.setup()
497 505
498 506 def setup(self):
499 507
500 508 self.data = Data(self.plottypes, self.throttle_value)
501 509 self.isConfig = True
502 510
503 511 def event_monitor(self, monitor):
504 512
505 513 events = {}
506 514
507 515 for name in dir(zmq):
508 516 if name.startswith('EVENT_'):
509 517 value = getattr(zmq, name)
510 518 events[value] = name
511 519
512 520 while monitor.poll():
513 521 evt = recv_monitor_message(monitor)
514 522 if evt['event'] == 32:
515 523 self.connections += 1
516 524 if evt['event'] == 512:
517 525 pass
518 526
519 527 evt.update({'description': events[evt['event']]})
520 528
521 529 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
522 530 break
523 531 monitor.close()
524 532 print('event monitor thread done!')
525 533
526 534 def initThrottle(self, throttle_value):
527 535
528 536 @throttle(seconds=throttle_value)
529 537 def sendDataThrottled(fn_sender, data):
530 538 fn_sender(data)
531 539
532 540 return sendDataThrottled
533 541
534 542 def send(self, data):
535 543 log.success('Sending {}'.format(data), self.name)
536 544 self.sender.send_pyobj(data)
537 545
538 546 def run(self):
539 547
540 548 log.success(
541 549 'Starting from {}'.format(self.address),
542 550 self.name
543 551 )
544 552
545 553 self.context = zmq.Context()
546 554 self.receiver = self.context.socket(zmq.PULL)
547 555 self.receiver.bind(self.address)
548 556 monitor = self.receiver.get_monitor_socket()
549 557 self.sender = self.context.socket(zmq.PUB)
550 558 if self.realtime:
551 559 self.sender_web = self.context.socket(zmq.PUB)
552 560 self.sender_web.connect(self.plot_address)
553 561 time.sleep(1)
554 562
555 563 if 'server' in self.kwargs:
556 564 self.sender.bind("ipc:///tmp/{}.plots".format(self.kwargs['server']))
557 565 else:
558 566 self.sender.bind("ipc:///tmp/zmq.plots")
559 567
560 568 time.sleep(2)
561 569
562 570 t = Thread(target=self.event_monitor, args=(monitor,))
563 571 t.start()
564 572
565 573 while True:
566 574 dataOut = self.receiver.recv_pyobj()
567 tm = dataOut.utctime
568 if dataOut.useLocalTime:
569 if not self.localtime:
570 tm += time.timezone
571 dt = datetime.datetime.fromtimestamp(tm).date()
572 else:
573 if self.localtime:
574 tm -= time.timezone
575 dt = datetime.datetime.utcfromtimestamp(tm).date()
576 sended = False
577 if dt not in self.dates:
578 if self.data:
579 self.data.ended = True
580 self.send(self.data)
581 sended = True
582 self.data.setup()
583 self.dates.append(dt)
584
585 self.data.update(dataOut)
575 if not dataOut.flagNoData:
576 if dataOut.type == 'Parameters':
577 tm = dataOut.utctimeInit
578 else:
579 tm = dataOut.utctime
580 if dataOut.useLocalTime:
581 if not self.localtime:
582 tm += time.timezone
583 dt = datetime.datetime.fromtimestamp(tm).date()
584 else:
585 if self.localtime:
586 tm -= time.timezone
587 dt = datetime.datetime.utcfromtimestamp(tm).date()
588 coerce = False
589 if dt not in self.dates:
590 if self.data:
591 self.data.ended = True
592 self.send(self.data)
593 coerce = True
594 self.data.setup()
595 self.dates.append(dt)
586 596
597 self.data.update(dataOut)
598
587 599 if dataOut.finished is True:
588 600 self.connections -= 1
589 601 if self.connections == 0 and dt in self.dates:
590 602 self.data.ended = True
591 603 self.send(self.data)
592 604 self.data.setup()
593 605 else:
594 606 if self.realtime:
595 607 self.send(self.data)
596 608 # self.sender_web.send_string(self.data.jsonify())
597 else:
598 if not sended:
599 self.sendData(self.send, self.data)
609 else:
610 self.sendData(self.send, self.data, coerce=coerce)
611 coerce = False
600 612
601 613 return
602 614
603 615 def sendToWeb(self):
604 616
605 617 if not self.isWebConfig:
606 618 context = zmq.Context()
607 619 sender_web_config = context.socket(zmq.PUB)
608 620 if 'tcp://' in self.plot_address:
609 621 dum, address, port = self.plot_address.split(':')
610 622 conf_address = '{}:{}:{}'.format(dum, address, int(port)+1)
611 623 else:
612 624 conf_address = self.plot_address + '.config'
613 625 sender_web_config.bind(conf_address)
614 626 time.sleep(1)
615 627 for kwargs in self.operationKwargs.values():
616 628 if 'plot' in kwargs:
617 629 log.success('[Sending] Config data to web for {}'.format(kwargs['code'].upper()))
618 630 sender_web_config.send_string(json.dumps(kwargs))
619 631 self.isWebConfig = True
General Comments 0
You need to be logged in to leave comments. Login now