##// END OF EJS Templates
Fix publish to web_server
Juan C. Espinoza -
r1114:f58248fe43e5
parent child
Show More
@@ -1,636 +1,627
1 1 '''
2 2 @author: Juan C. Espinoza
3 3 '''
4 4
5 5 import time
6 6 import json
7 7 import numpy
8 8 import paho.mqtt.client as mqtt
9 9 import zmq
10 10 import datetime
11 11 from zmq.utils.monitor import recv_monitor_message
12 12 from functools import wraps
13 13 from threading import Thread
14 14 from multiprocessing import Process
15 15
16 16 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
17 17 from schainpy.model.data.jrodata import JROData
18 18 from schainpy.utils import log
19 19
20 20 MAXNUMX = 100
21 21 MAXNUMY = 100
22 22
23 23 class PrettyFloat(float):
24 24 def __repr__(self):
25 25 return '%.2f' % self
26 26
27 27 def roundFloats(obj):
28 28 if isinstance(obj, list):
29 29 return map(roundFloats, obj)
30 30 elif isinstance(obj, float):
31 31 return round(obj, 2)
32 32
33 33 def decimate(z, MAXNUMY):
34 34 dy = int(len(z[0])/MAXNUMY) + 1
35 35
36 36 return z[::, ::dy]
37 37
38 38 class throttle(object):
39 39 '''
40 40 Decorator that prevents a function from being called more than once every
41 41 time period.
42 42 To create a function that cannot be called more than once a minute, but
43 43 will sleep until it can be called:
44 44 @throttle(minutes=1)
45 45 def foo():
46 46 pass
47 47
48 48 for i in range(10):
49 49 foo()
50 50 print "This function has run %s times." % i
51 51 '''
52 52
53 53 def __init__(self, seconds=0, minutes=0, hours=0):
54 54 self.throttle_period = datetime.timedelta(
55 55 seconds=seconds, minutes=minutes, hours=hours
56 56 )
57 57
58 58 self.time_of_last_call = datetime.datetime.min
59 59
60 60 def __call__(self, fn):
61 61 @wraps(fn)
62 62 def wrapper(*args, **kwargs):
63 63 coerce = kwargs.pop('coerce', None)
64 64 if coerce:
65 65 self.time_of_last_call = datetime.datetime.now()
66 66 return fn(*args, **kwargs)
67 67 else:
68 68 now = datetime.datetime.now()
69 69 time_since_last_call = now - self.time_of_last_call
70 70 time_left = self.throttle_period - time_since_last_call
71 71
72 72 if time_left > datetime.timedelta(seconds=0):
73 73 return
74 74
75 75 self.time_of_last_call = datetime.datetime.now()
76 76 return fn(*args, **kwargs)
77 77
78 78 return wrapper
79 79
80 80 class Data(object):
81 81 '''
82 82 Object to hold data to be plotted
83 83 '''
84 84
85 def __init__(self, plottypes, throttle_value):
85 def __init__(self, plottypes, throttle_value, exp_code):
86 86 self.plottypes = plottypes
87 87 self.throttle = throttle_value
88 self.exp_code = exp_code
88 89 self.ended = False
89 90 self.localtime = False
90 91 self.__times = []
91 92 self.__heights = []
92 93
93 94 def __str__(self):
94 95 dum = ['{}{}'.format(key, self.shape(key)) for key in self.data]
95 96 return 'Data[{}][{}]'.format(';'.join(dum), len(self.__times))
96 97
97 98 def __len__(self):
98 99 return len(self.__times)
99 100
100 101 def __getitem__(self, key):
101 102 if key not in self.data:
102 103 raise KeyError(log.error('Missing key: {}'.format(key)))
103 104
104 105 if 'spc' in key:
105 106 ret = self.data[key]
106 107 else:
107 108 ret = numpy.array([self.data[key][x] for x in self.times])
108 109 if ret.ndim > 1:
109 110 ret = numpy.swapaxes(ret, 0, 1)
110 111 return ret
111 112
112 113 def __contains__(self, key):
113 114 return key in self.data
114 115
115 116 def setup(self):
116 117 '''
117 118 Configure object
118 119 '''
119 120
120 121 self.ended = False
121 122 self.data = {}
122 123 self.__times = []
123 124 self.__heights = []
124 125 self.__all_heights = set()
125 126 for plot in self.plottypes:
126 127 if 'snr' in plot:
127 128 plot = 'snr'
128 129 self.data[plot] = {}
129 130
130 131 def shape(self, key):
131 132 '''
132 133 Get the shape of the one-element data for the given key
133 134 '''
134 135
135 136 if len(self.data[key]):
136 137 if 'spc' in key:
137 138 return self.data[key].shape
138 139 return self.data[key][self.__times[0]].shape
139 140 return (0,)
140 141
141 142 def update(self, dataOut, tm):
142 143 '''
143 144 Update data object with new dataOut
144 145 '''
145 146
146 147 if tm in self.__times:
147 148 return
148 149
149 150 self.parameters = getattr(dataOut, 'parameters', [])
150 151 if hasattr(dataOut, 'pairsList'):
151 152 self.pairs = dataOut.pairsList
152 153 self.channels = dataOut.channelList
153 154 self.interval = dataOut.getTimeInterval()
154 155 self.localtime = dataOut.useLocalTime
155 156 if 'spc' in self.plottypes or 'cspc' in self.plottypes:
156 157 self.xrange = (dataOut.getFreqRange(1)/1000., dataOut.getAcfRange(1), dataOut.getVelRange(1))
157 158 self.__heights.append(dataOut.heightList)
158 159 self.__all_heights.update(dataOut.heightList)
159 160 self.__times.append(tm)
160 161
161 162 for plot in self.plottypes:
162 163 if plot == 'spc':
163 164 z = dataOut.data_spc/dataOut.normFactor
164 165 self.data[plot] = 10*numpy.log10(z)
165 166 if plot == 'cspc':
166 167 self.data[plot] = dataOut.data_cspc
167 168 if plot == 'noise':
168 169 self.data[plot][tm] = 10*numpy.log10(dataOut.getNoise()/dataOut.normFactor)
169 170 if plot == 'rti':
170 171 self.data[plot][tm] = dataOut.getPower()
171 172 if plot == 'snr_db':
172 173 self.data['snr'][tm] = dataOut.data_SNR
173 174 if plot == 'snr':
174 175 self.data[plot][tm] = 10*numpy.log10(dataOut.data_SNR)
175 176 if plot == 'dop':
176 177 self.data[plot][tm] = 10*numpy.log10(dataOut.data_DOP)
177 178 if plot == 'mean':
178 179 self.data[plot][tm] = dataOut.data_MEAN
179 180 if plot == 'std':
180 181 self.data[plot][tm] = dataOut.data_STD
181 182 if plot == 'coh':
182 183 self.data[plot][tm] = dataOut.getCoherence()
183 184 if plot == 'phase':
184 185 self.data[plot][tm] = dataOut.getCoherence(phase=True)
185 186 if plot == 'output':
186 187 self.data[plot][tm] = dataOut.data_output
187 188 if plot == 'param':
188 189 self.data[plot][tm] = dataOut.data_param
189 190
190 191 def normalize_heights(self):
191 192 '''
192 193 Ensure same-dimension of the data for different heighList
193 194 '''
194 195
195 196 H = numpy.array(list(self.__all_heights))
196 197 H.sort()
197 198 for key in self.data:
198 199 shape = self.shape(key)[:-1] + H.shape
199 200 for tm, obj in self.data[key].items():
200 201 h = self.__heights[self.__times.index(tm)]
201 202 if H.size == h.size:
202 203 continue
203 204 index = numpy.where(numpy.in1d(H, h))[0]
204 205 dummy = numpy.zeros(shape) + numpy.nan
205 206 if len(shape) == 2:
206 207 dummy[:, index] = obj
207 208 else:
208 209 dummy[index] = obj
209 210 self.data[key][tm] = dummy
210 211
211 212 self.__heights = [H for tm in self.__times]
212 213
213 214 def jsonify(self, decimate=False):
214 215 '''
215 216 Convert data to json
216 217 '''
217 218
218 ret = {}
219 data = {}
219 220 tm = self.times[-1]
220 221
221 for key, value in self.data:
222 for key in self.data:
222 223 if key in ('spc', 'cspc'):
223 ret[key] = roundFloats(self.data[key].to_list())
224 data[key] = roundFloats(self.data[key].tolist())
224 225 else:
225 ret[key] = roundFloats(self.data[key][tm].to_list())
226 data[key] = roundFloats(self.data[key][tm].tolist())
226 227
227 ret['timestamp'] = tm
228 ret = {'data': data}
229 ret['exp_code'] = self.exp_code
230 ret['time'] = tm
228 231 ret['interval'] = self.interval
232 ret['ymin'] = self.heights[0]
233 ret['ymax'] = self.heights[-1]
234 ret['ystep'] = self.heights[1] - self.heights[0]
235
236 return json.dumps(ret)
229 237
230 238 @property
231 239 def times(self):
232 240 '''
233 241 Return the list of times of the current data
234 242 '''
235 243
236 244 ret = numpy.array(self.__times)
237 245 ret.sort()
238 246 return ret
239 247
240 248 @property
241 249 def heights(self):
242 250 '''
243 251 Return the list of heights of the current data
244 252 '''
245 253
246 254 return numpy.array(self.__heights[-1])
247 255
248 256 class PublishData(Operation):
249 257 '''
250 258 Operation to send data over zmq.
251 259 '''
252 260
253 261 __attrs__ = ['host', 'port', 'delay', 'zeromq', 'mqtt', 'verbose']
254 262
255 263 def __init__(self, **kwargs):
256 264 """Inicio."""
257 265 Operation.__init__(self, **kwargs)
258 266 self.isConfig = False
259 267 self.client = None
260 268 self.zeromq = None
261 269 self.mqtt = None
262 270
263 271 def on_disconnect(self, client, userdata, rc):
264 272 if rc != 0:
265 273 log.warning('Unexpected disconnection.')
266 274 self.connect()
267 275
268 276 def connect(self):
269 277 log.warning('trying to connect')
270 278 try:
271 279 self.client.connect(
272 280 host=self.host,
273 281 port=self.port,
274 282 keepalive=60*10,
275 283 bind_address='')
276 284 self.client.loop_start()
277 285 # self.client.publish(
278 286 # self.topic + 'SETUP',
279 287 # json.dumps(setup),
280 288 # retain=True
281 289 # )
282 290 except:
283 291 log.error('MQTT Conection error.')
284 292 self.client = False
285 293
286 294 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, verbose=True, **kwargs):
287 295 self.counter = 0
288 296 self.topic = kwargs.get('topic', 'schain')
289 297 self.delay = kwargs.get('delay', 0)
290 298 self.plottype = kwargs.get('plottype', 'spectra')
291 299 self.host = kwargs.get('host', "10.10.10.82")
292 300 self.port = kwargs.get('port', 3000)
293 301 self.clientId = clientId
294 302 self.cnt = 0
295 303 self.zeromq = zeromq
296 304 self.mqtt = kwargs.get('plottype', 0)
297 305 self.client = None
298 306 self.verbose = verbose
299 307 setup = []
300 308 if mqtt is 1:
301 309 self.client = mqtt.Client(
302 310 client_id=self.clientId + self.topic + 'SCHAIN',
303 311 clean_session=True)
304 312 self.client.on_disconnect = self.on_disconnect
305 313 self.connect()
306 314 for plot in self.plottype:
307 315 setup.append({
308 316 'plot': plot,
309 317 'topic': self.topic + plot,
310 318 'title': getattr(self, plot + '_' + 'title', False),
311 319 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
312 320 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
313 321 'xrange': getattr(self, plot + '_' + 'xrange', False),
314 322 'yrange': getattr(self, plot + '_' + 'yrange', False),
315 323 'zrange': getattr(self, plot + '_' + 'zrange', False),
316 324 })
317 325 if zeromq is 1:
318 326 context = zmq.Context()
319 327 self.zmq_socket = context.socket(zmq.PUSH)
320 328 server = kwargs.get('server', 'zmq.pipe')
321 329
322 330 if 'tcp://' in server:
323 331 address = server
324 332 else:
325 333 address = 'ipc:///tmp/%s' % server
326 334
327 335 self.zmq_socket.connect(address)
328 336 time.sleep(1)
329 337
330 338
331 339 def publish_data(self):
332 340 self.dataOut.finished = False
333 341 if self.mqtt is 1:
334 342 yData = self.dataOut.heightList[:2].tolist()
335 343 if self.plottype == 'spectra':
336 344 data = getattr(self.dataOut, 'data_spc')
337 345 z = data/self.dataOut.normFactor
338 346 zdB = 10*numpy.log10(z)
339 347 xlen, ylen = zdB[0].shape
340 348 dx = int(xlen/MAXNUMX) + 1
341 349 dy = int(ylen/MAXNUMY) + 1
342 350 Z = [0 for i in self.dataOut.channelList]
343 351 for i in self.dataOut.channelList:
344 352 Z[i] = zdB[i][::dx, ::dy].tolist()
345 353 payload = {
346 354 'timestamp': self.dataOut.utctime,
347 355 'data': roundFloats(Z),
348 356 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
349 357 'interval': self.dataOut.getTimeInterval(),
350 358 'type': self.plottype,
351 359 'yData': yData
352 360 }
353 361
354 362 elif self.plottype in ('rti', 'power'):
355 363 data = getattr(self.dataOut, 'data_spc')
356 364 z = data/self.dataOut.normFactor
357 365 avg = numpy.average(z, axis=1)
358 366 avgdB = 10*numpy.log10(avg)
359 367 xlen, ylen = z[0].shape
360 368 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
361 369 AVG = [0 for i in self.dataOut.channelList]
362 370 for i in self.dataOut.channelList:
363 371 AVG[i] = avgdB[i][::dy].tolist()
364 372 payload = {
365 373 'timestamp': self.dataOut.utctime,
366 374 'data': roundFloats(AVG),
367 375 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
368 376 'interval': self.dataOut.getTimeInterval(),
369 377 'type': self.plottype,
370 378 'yData': yData
371 379 }
372 380 elif self.plottype == 'noise':
373 381 noise = self.dataOut.getNoise()/self.dataOut.normFactor
374 382 noisedB = 10*numpy.log10(noise)
375 383 payload = {
376 384 'timestamp': self.dataOut.utctime,
377 385 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
378 386 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
379 387 'interval': self.dataOut.getTimeInterval(),
380 388 'type': self.plottype,
381 389 'yData': yData
382 390 }
383 391 elif self.plottype == 'snr':
384 392 data = getattr(self.dataOut, 'data_SNR')
385 393 avgdB = 10*numpy.log10(data)
386 394
387 395 ylen = data[0].size
388 396 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
389 397 AVG = [0 for i in self.dataOut.channelList]
390 398 for i in self.dataOut.channelList:
391 399 AVG[i] = avgdB[i][::dy].tolist()
392 400 payload = {
393 401 'timestamp': self.dataOut.utctime,
394 402 'data': roundFloats(AVG),
395 403 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
396 404 'type': self.plottype,
397 405 'yData': yData
398 406 }
399 407 else:
400 408 print "Tipo de grafico invalido"
401 409 payload = {
402 410 'data': 'None',
403 411 'timestamp': 'None',
404 412 'type': None
405 413 }
406 414
407 415 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
408 416
409 417 if self.zeromq is 1:
410 418 if self.verbose:
411 419 log.log(
412 420 'Sending {} - {}'.format(self.dataOut.type, self.dataOut.datatime),
413 421 self.name
414 422 )
415 423 self.zmq_socket.send_pyobj(self.dataOut)
416 424
417 425 def run(self, dataOut, **kwargs):
418 426 self.dataOut = dataOut
419 427 if not self.isConfig:
420 428 self.setup(**kwargs)
421 429 self.isConfig = True
422 430
423 431 self.publish_data()
424 432 time.sleep(self.delay)
425 433
426 434 def close(self):
427 435 if self.zeromq is 1:
428 436 self.dataOut.finished = True
429 437 self.zmq_socket.send_pyobj(self.dataOut)
430 438 time.sleep(0.1)
431 439 self.zmq_socket.close()
432 440 if self.client:
433 441 self.client.loop_stop()
434 442 self.client.disconnect()
435 443
436 444
437 445 class ReceiverData(ProcessingUnit):
438 446
439 447 __attrs__ = ['server']
440 448
441 449 def __init__(self, **kwargs):
442 450
443 451 ProcessingUnit.__init__(self, **kwargs)
444 452
445 453 self.isConfig = False
446 454 server = kwargs.get('server', 'zmq.pipe')
447 455 if 'tcp://' in server:
448 456 address = server
449 457 else:
450 458 address = 'ipc:///tmp/%s' % server
451 459
452 460 self.address = address
453 461 self.dataOut = JROData()
454 462
455 463 def setup(self):
456 464
457 465 self.context = zmq.Context()
458 466 self.receiver = self.context.socket(zmq.PULL)
459 467 self.receiver.bind(self.address)
460 468 time.sleep(0.5)
461 469 log.success('ReceiverData from {}'.format(self.address))
462 470
463 471
464 472 def run(self):
465 473
466 474 if not self.isConfig:
467 475 self.setup()
468 476 self.isConfig = True
469 477
470 478 self.dataOut = self.receiver.recv_pyobj()
471 479 log.log('{} - {}'.format(self.dataOut.type,
472 480 self.dataOut.datatime.ctime(),),
473 481 'Receiving')
474 482
475 483
476 484 class PlotterReceiver(ProcessingUnit, Process):
477 485
478 486 throttle_value = 5
479 __attrs__ = ['server', 'plottypes', 'realtime', 'localtime', 'throttle']
487 __attrs__ = ['server', 'plottypes', 'realtime', 'localtime', 'throttle',
488 'exp_code', 'web_server']
480 489
481 490 def __init__(self, **kwargs):
482 491
483 492 ProcessingUnit.__init__(self, **kwargs)
484 493 Process.__init__(self)
485 494 self.mp = False
486 495 self.isConfig = False
487 496 self.isWebConfig = False
488 497 self.connections = 0
489 498 server = kwargs.get('server', 'zmq.pipe')
490 plot_server = kwargs.get('plot_server', 'zmq.web')
499 web_server = kwargs.get('web_server', None)
491 500 if 'tcp://' in server:
492 501 address = server
493 502 else:
494 503 address = 'ipc:///tmp/%s' % server
495
496 if 'tcp://' in plot_server:
497 plot_address = plot_server
498 else:
499 plot_address = 'ipc:///tmp/%s' % plot_server
500
501 504 self.address = address
502 self.plot_address = plot_address
503 self.plottypes = [s.strip() for s in kwargs.get('plottypes', '').split(',') if s]
505 self.web_address = web_server
506 self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
504 507 self.realtime = kwargs.get('realtime', False)
505 508 self.localtime = kwargs.get('localtime', True)
506 509 self.throttle_value = kwargs.get('throttle', 5)
510 self.exp_code = kwargs.get('exp_code', None)
507 511 self.sendData = self.initThrottle(self.throttle_value)
508 512 self.dates = []
509 513 self.setup()
510 514
511 515 def setup(self):
512 516
513 self.data = Data(self.plottypes, self.throttle_value)
517 self.data = Data(self.plottypes, self.throttle_value, self.exp_code)
514 518 self.isConfig = True
515 519
516 520 def event_monitor(self, monitor):
517 521
518 522 events = {}
519 523
520 524 for name in dir(zmq):
521 525 if name.startswith('EVENT_'):
522 526 value = getattr(zmq, name)
523 527 events[value] = name
524 528
525 529 while monitor.poll():
526 530 evt = recv_monitor_message(monitor)
527 531 if evt['event'] == 32:
528 532 self.connections += 1
529 533 if evt['event'] == 512:
530 534 pass
531 535
532 536 evt.update({'description': events[evt['event']]})
533 537
534 538 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
535 539 break
536 540 monitor.close()
537 541 print('event monitor thread done!')
538 542
539 543 def initThrottle(self, throttle_value):
540 544
541 545 @throttle(seconds=throttle_value)
542 546 def sendDataThrottled(fn_sender, data):
543 547 fn_sender(data)
544 548
545 549 return sendDataThrottled
546 550
547 551 def send(self, data):
548 552 log.success('Sending {}'.format(data), self.name)
549 553 self.sender.send_pyobj(data)
550 554
551 555 def run(self):
552 556
553 557 log.success(
554 558 'Starting from {}'.format(self.address),
555 559 self.name
556 560 )
557 561
558 562 self.context = zmq.Context()
559 563 self.receiver = self.context.socket(zmq.PULL)
560 564 self.receiver.bind(self.address)
561 565 monitor = self.receiver.get_monitor_socket()
562 566 self.sender = self.context.socket(zmq.PUB)
563 if self.realtime:
567 if self.web_address:
568 log.success(
569 'Sending to web: {}'.format(self.web_address),
570 self.name
571 )
564 572 self.sender_web = self.context.socket(zmq.PUB)
565 self.sender_web.connect(self.plot_address)
573 self.sender_web.connect(self.web_address)
566 574 time.sleep(1)
567 575
568 576 if 'server' in self.kwargs:
569 577 self.sender.bind("ipc:///tmp/{}.plots".format(self.kwargs['server']))
570 578 else:
571 579 self.sender.bind("ipc:///tmp/zmq.plots")
572 580
573 581 time.sleep(2)
574 582
575 583 t = Thread(target=self.event_monitor, args=(monitor,))
576 584 t.start()
577 585
578 586 while True:
579 587 dataOut = self.receiver.recv_pyobj()
580 588 if not dataOut.flagNoData:
581 589 if dataOut.type == 'Parameters':
582 590 tm = dataOut.utctimeInit
583 591 else:
584 592 tm = dataOut.utctime
585 593 if dataOut.useLocalTime:
586 594 if not self.localtime:
587 595 tm += time.timezone
588 596 dt = datetime.datetime.fromtimestamp(tm).date()
589 597 else:
590 598 if self.localtime:
591 599 tm -= time.timezone
592 600 dt = datetime.datetime.utcfromtimestamp(tm).date()
593 601 coerce = False
594 602 if dt not in self.dates:
595 603 if self.data:
596 604 self.data.ended = True
597 605 self.send(self.data)
598 606 coerce = True
599 607 self.data.setup()
600 608 self.dates.append(dt)
601 609
602 610 self.data.update(dataOut, tm)
603 611
604 612 if dataOut.finished is True:
605 613 self.connections -= 1
606 614 if self.connections == 0 and dt in self.dates:
607 615 self.data.ended = True
608 616 self.send(self.data)
609 617 self.data.setup()
610 618 else:
611 619 if self.realtime:
612 620 self.send(self.data)
613 # self.sender_web.send_string(self.data.jsonify())
621 if self.web_address:
622 self.sender_web.send(self.data.jsonify())
614 623 else:
615 624 self.sendData(self.send, self.data, coerce=coerce)
616 625 coerce = False
617 626
618 627 return
619
620 def sendToWeb(self):
621
622 if not self.isWebConfig:
623 context = zmq.Context()
624 sender_web_config = context.socket(zmq.PUB)
625 if 'tcp://' in self.plot_address:
626 dum, address, port = self.plot_address.split(':')
627 conf_address = '{}:{}:{}'.format(dum, address, int(port)+1)
628 else:
629 conf_address = self.plot_address + '.config'
630 sender_web_config.bind(conf_address)
631 time.sleep(1)
632 for kwargs in self.operationKwargs.values():
633 if 'plot' in kwargs:
634 log.success('[Sending] Config data to web for {}'.format(kwargs['code'].upper()))
635 sender_web_config.send_string(json.dumps(kwargs))
636 self.isWebConfig = True
General Comments 0
You need to be logged in to leave comments. Login now