##// END OF EJS Templates
Change PUB/SUB to REQ/REP for send data to web server
Juan C. Espinoza -
r1161:9b21f1781b2e
parent child
Show More
@@ -1,635 +1,661
1 1 '''
2 2 @author: Juan C. Espinoza
3 3 '''
4 4
5 5 import time
6 6 import json
7 7 import numpy
8 8 import paho.mqtt.client as mqtt
9 9 import zmq
10 10 import datetime
11 11 from zmq.utils.monitor import recv_monitor_message
12 12 from functools import wraps
13 13 from threading import Thread
14 14 from multiprocessing import Process
15 15
16 16 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
17 17 from schainpy.model.data.jrodata import JROData
18 18 from schainpy.utils import log
19 19
20 20 MAXNUMX = 100
21 21 MAXNUMY = 100
22 22
23 23 class PrettyFloat(float):
24 24 def __repr__(self):
25 25 return '%.2f' % self
26 26
27 27 def roundFloats(obj):
28 28 if isinstance(obj, list):
29 29 return map(roundFloats, obj)
30 30 elif isinstance(obj, float):
31 31 return round(obj, 2)
32 32
33 33 def decimate(z, MAXNUMY):
34 34 dy = int(len(z[0])/MAXNUMY) + 1
35 35
36 36 return z[::, ::dy]
37 37
38 38 class throttle(object):
39 39 '''
40 40 Decorator that prevents a function from being called more than once every
41 41 time period.
42 42 To create a function that cannot be called more than once a minute, but
43 43 will sleep until it can be called:
44 44 @throttle(minutes=1)
45 45 def foo():
46 46 pass
47 47
48 48 for i in range(10):
49 49 foo()
50 50 print "This function has run %s times." % i
51 51 '''
52 52
53 53 def __init__(self, seconds=0, minutes=0, hours=0):
54 54 self.throttle_period = datetime.timedelta(
55 55 seconds=seconds, minutes=minutes, hours=hours
56 56 )
57 57
58 58 self.time_of_last_call = datetime.datetime.min
59 59
60 60 def __call__(self, fn):
61 61 @wraps(fn)
62 62 def wrapper(*args, **kwargs):
63 63 coerce = kwargs.pop('coerce', None)
64 64 if coerce:
65 65 self.time_of_last_call = datetime.datetime.now()
66 66 return fn(*args, **kwargs)
67 67 else:
68 68 now = datetime.datetime.now()
69 69 time_since_last_call = now - self.time_of_last_call
70 70 time_left = self.throttle_period - time_since_last_call
71 71
72 72 if time_left > datetime.timedelta(seconds=0):
73 73 return
74 74
75 75 self.time_of_last_call = datetime.datetime.now()
76 76 return fn(*args, **kwargs)
77 77
78 78 return wrapper
79 79
80 80 class Data(object):
81 81 '''
82 82 Object to hold data to be plotted
83 83 '''
84 84
85 85 def __init__(self, plottypes, throttle_value, exp_code):
86 86 self.plottypes = plottypes
87 87 self.throttle = throttle_value
88 88 self.exp_code = exp_code
89 89 self.ended = False
90 90 self.localtime = False
91 91 self.__times = []
92 92 self.__heights = []
93 93
94 94 def __str__(self):
95 95 dum = ['{}{}'.format(key, self.shape(key)) for key in self.data]
96 96 return 'Data[{}][{}]'.format(';'.join(dum), len(self.__times))
97 97
98 98 def __len__(self):
99 99 return len(self.__times)
100 100
101 101 def __getitem__(self, key):
102 102 if key not in self.data:
103 103 raise KeyError(log.error('Missing key: {}'.format(key)))
104 104
105 105 if 'spc' in key:
106 106 ret = self.data[key]
107 107 else:
108 108 ret = numpy.array([self.data[key][x] for x in self.times])
109 109 if ret.ndim > 1:
110 110 ret = numpy.swapaxes(ret, 0, 1)
111 111 return ret
112 112
113 113 def __contains__(self, key):
114 114 return key in self.data
115 115
116 116 def setup(self):
117 117 '''
118 118 Configure object
119 119 '''
120 120
121 121 self.ended = False
122 122 self.data = {}
123 123 self.__times = []
124 124 self.__heights = []
125 125 self.__all_heights = set()
126 126 for plot in self.plottypes:
127 127 if 'snr' in plot:
128 128 plot = 'snr'
129 129 self.data[plot] = {}
130 130
131 131 def shape(self, key):
132 132 '''
133 133 Get the shape of the one-element data for the given key
134 134 '''
135 135
136 136 if len(self.data[key]):
137 137 if 'spc' in key:
138 138 return self.data[key].shape
139 139 return self.data[key][self.__times[0]].shape
140 140 return (0,)
141 141
142 142 def update(self, dataOut, tm):
143 143 '''
144 144 Update data object with new dataOut
145 145 '''
146 146
147 147 if tm in self.__times:
148 148 return
149 149
150 150 self.parameters = getattr(dataOut, 'parameters', [])
151 151 if hasattr(dataOut, 'pairsList'):
152 152 self.pairs = dataOut.pairsList
153 153 self.channels = dataOut.channelList
154 154 self.interval = dataOut.getTimeInterval()
155 155 self.localtime = dataOut.useLocalTime
156 156 if 'spc' in self.plottypes or 'cspc' in self.plottypes:
157 157 self.xrange = (dataOut.getFreqRange(1)/1000., dataOut.getAcfRange(1), dataOut.getVelRange(1))
158 158 self.__heights.append(dataOut.heightList)
159 159 self.__all_heights.update(dataOut.heightList)
160 160 self.__times.append(tm)
161 161
162 162 for plot in self.plottypes:
163 163 if plot == 'spc':
164 164 z = dataOut.data_spc/dataOut.normFactor
165 165 self.data[plot] = 10*numpy.log10(z)
166 166 if plot == 'cspc':
167 167 self.data[plot] = dataOut.data_cspc
168 168 if plot == 'noise':
169 169 self.data[plot][tm] = 10*numpy.log10(dataOut.getNoise()/dataOut.normFactor)
170 170 if plot == 'rti':
171 171 self.data[plot][tm] = dataOut.getPower()
172 172 if plot == 'snr_db':
173 173 self.data['snr'][tm] = dataOut.data_SNR
174 174 if plot == 'snr':
175 175 self.data[plot][tm] = 10*numpy.log10(dataOut.data_SNR)
176 176 if plot == 'dop':
177 177 self.data[plot][tm] = 10*numpy.log10(dataOut.data_DOP)
178 178 if plot == 'mean':
179 179 self.data[plot][tm] = dataOut.data_MEAN
180 180 if plot == 'std':
181 181 self.data[plot][tm] = dataOut.data_STD
182 182 if plot == 'coh':
183 183 self.data[plot][tm] = dataOut.getCoherence()
184 184 if plot == 'phase':
185 185 self.data[plot][tm] = dataOut.getCoherence(phase=True)
186 186 if plot == 'output':
187 187 self.data[plot][tm] = dataOut.data_output
188 188 if plot == 'param':
189 189 self.data[plot][tm] = dataOut.data_param
190 190
191 191 def normalize_heights(self):
192 192 '''
193 193 Ensure same-dimension of the data for different heighList
194 194 '''
195 195
196 196 H = numpy.array(list(self.__all_heights))
197 197 H.sort()
198 198 for key in self.data:
199 199 shape = self.shape(key)[:-1] + H.shape
200 200 for tm, obj in self.data[key].items():
201 201 h = self.__heights[self.__times.index(tm)]
202 202 if H.size == h.size:
203 203 continue
204 204 index = numpy.where(numpy.in1d(H, h))[0]
205 205 dummy = numpy.zeros(shape) + numpy.nan
206 206 if len(shape) == 2:
207 207 dummy[:, index] = obj
208 208 else:
209 209 dummy[index] = obj
210 210 self.data[key][tm] = dummy
211 211
212 212 self.__heights = [H for tm in self.__times]
213 213
214 214 def jsonify(self, decimate=False):
215 215 '''
216 216 Convert data to json
217 217 '''
218 218
219 219 data = {}
220 220 tm = self.times[-1]
221
221 dy = int(self.heights.size/MAXNUMY) + 1
222 222 for key in self.data:
223 223 if key in ('spc', 'cspc'):
224 224 dx = int(self.data[key].shape[1]/MAXNUMX) + 1
225 dy = int(self.data[key].shape[2]/MAXNUMY) + 1
226 225 data[key] = roundFloats(self.data[key][::, ::dx, ::dy].tolist())
227 226 else:
228 227 data[key] = roundFloats(self.data[key][tm].tolist())
229 228
230 229 ret = {'data': data}
231 230 ret['exp_code'] = self.exp_code
232 231 ret['time'] = tm
233 232 ret['interval'] = self.interval
234 233 ret['localtime'] = self.localtime
235 ret['yrange'] = roundFloats(self.heights.tolist())
236 if key in ('spc', 'cspc'):
234 ret['yrange'] = roundFloats(self.heights[::dy].tolist())
235 if 'spc' in self.data or 'cspc' in self.data:
237 236 ret['xrange'] = roundFloats(self.xrange[2][::dx].tolist())
238 237 else:
239 238 ret['xrange'] = []
240 239 if hasattr(self, 'pairs'):
241 240 ret['pairs'] = self.pairs
242 241 else:
243 242 ret['pairs'] = []
244 243 return json.dumps(ret)
245 244
246 245 @property
247 246 def times(self):
248 247 '''
249 248 Return the list of times of the current data
250 249 '''
251 250
252 251 ret = numpy.array(self.__times)
253 252 ret.sort()
254 253 return ret
255 254
256 255 @property
257 256 def heights(self):
258 257 '''
259 258 Return the list of heights of the current data
260 259 '''
261 260
262 261 return numpy.array(self.__heights[-1])
263 262
264 263 class PublishData(Operation):
265 264 '''
266 265 Operation to send data over zmq.
267 266 '''
268 267
269 268 __attrs__ = ['host', 'port', 'delay', 'zeromq', 'mqtt', 'verbose']
270 269
271 270 def __init__(self, **kwargs):
272 271 """Inicio."""
273 272 Operation.__init__(self, **kwargs)
274 273 self.isConfig = False
275 274 self.client = None
276 275 self.zeromq = None
277 276 self.mqtt = None
278 277
279 278 def on_disconnect(self, client, userdata, rc):
280 279 if rc != 0:
281 280 log.warning('Unexpected disconnection.')
282 281 self.connect()
283 282
284 283 def connect(self):
285 284 log.warning('trying to connect')
286 285 try:
287 286 self.client.connect(
288 287 host=self.host,
289 288 port=self.port,
290 289 keepalive=60*10,
291 290 bind_address='')
292 291 self.client.loop_start()
293 292 # self.client.publish(
294 293 # self.topic + 'SETUP',
295 294 # json.dumps(setup),
296 295 # retain=True
297 296 # )
298 297 except:
299 298 log.error('MQTT Conection error.')
300 299 self.client = False
301 300
302 301 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, verbose=True, **kwargs):
303 302 self.counter = 0
304 303 self.topic = kwargs.get('topic', 'schain')
305 304 self.delay = kwargs.get('delay', 0)
306 305 self.plottype = kwargs.get('plottype', 'spectra')
307 306 self.host = kwargs.get('host', "10.10.10.82")
308 307 self.port = kwargs.get('port', 3000)
309 308 self.clientId = clientId
310 309 self.cnt = 0
311 310 self.zeromq = zeromq
312 311 self.mqtt = kwargs.get('plottype', 0)
313 312 self.client = None
314 313 self.verbose = verbose
315 314 setup = []
316 315 if mqtt is 1:
317 316 self.client = mqtt.Client(
318 317 client_id=self.clientId + self.topic + 'SCHAIN',
319 318 clean_session=True)
320 319 self.client.on_disconnect = self.on_disconnect
321 320 self.connect()
322 321 for plot in self.plottype:
323 322 setup.append({
324 323 'plot': plot,
325 324 'topic': self.topic + plot,
326 325 'title': getattr(self, plot + '_' + 'title', False),
327 326 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
328 327 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
329 328 'xrange': getattr(self, plot + '_' + 'xrange', False),
330 329 'yrange': getattr(self, plot + '_' + 'yrange', False),
331 330 'zrange': getattr(self, plot + '_' + 'zrange', False),
332 331 })
333 332 if zeromq is 1:
334 333 context = zmq.Context()
335 334 self.zmq_socket = context.socket(zmq.PUSH)
336 335 server = kwargs.get('server', 'zmq.pipe')
337 336
338 337 if 'tcp://' in server:
339 338 address = server
340 339 else:
341 340 address = 'ipc:///tmp/%s' % server
342 341
343 342 self.zmq_socket.connect(address)
344 343 time.sleep(1)
345 344
346 345
347 346 def publish_data(self):
348 347 self.dataOut.finished = False
349 348 if self.mqtt is 1:
350 349 yData = self.dataOut.heightList[:2].tolist()
351 350 if self.plottype == 'spectra':
352 351 data = getattr(self.dataOut, 'data_spc')
353 352 z = data/self.dataOut.normFactor
354 353 zdB = 10*numpy.log10(z)
355 354 xlen, ylen = zdB[0].shape
356 355 dx = int(xlen/MAXNUMX) + 1
357 356 dy = int(ylen/MAXNUMY) + 1
358 357 Z = [0 for i in self.dataOut.channelList]
359 358 for i in self.dataOut.channelList:
360 359 Z[i] = zdB[i][::dx, ::dy].tolist()
361 360 payload = {
362 361 'timestamp': self.dataOut.utctime,
363 362 'data': roundFloats(Z),
364 363 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
365 364 'interval': self.dataOut.getTimeInterval(),
366 365 'type': self.plottype,
367 366 'yData': yData
368 367 }
369 368
370 369 elif self.plottype in ('rti', 'power'):
371 370 data = getattr(self.dataOut, 'data_spc')
372 371 z = data/self.dataOut.normFactor
373 372 avg = numpy.average(z, axis=1)
374 373 avgdB = 10*numpy.log10(avg)
375 374 xlen, ylen = z[0].shape
376 375 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
377 376 AVG = [0 for i in self.dataOut.channelList]
378 377 for i in self.dataOut.channelList:
379 378 AVG[i] = avgdB[i][::dy].tolist()
380 379 payload = {
381 380 'timestamp': self.dataOut.utctime,
382 381 'data': roundFloats(AVG),
383 382 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
384 383 'interval': self.dataOut.getTimeInterval(),
385 384 'type': self.plottype,
386 385 'yData': yData
387 386 }
388 387 elif self.plottype == 'noise':
389 388 noise = self.dataOut.getNoise()/self.dataOut.normFactor
390 389 noisedB = 10*numpy.log10(noise)
391 390 payload = {
392 391 'timestamp': self.dataOut.utctime,
393 392 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
394 393 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
395 394 'interval': self.dataOut.getTimeInterval(),
396 395 'type': self.plottype,
397 396 'yData': yData
398 397 }
399 398 elif self.plottype == 'snr':
400 399 data = getattr(self.dataOut, 'data_SNR')
401 400 avgdB = 10*numpy.log10(data)
402 401
403 402 ylen = data[0].size
404 403 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
405 404 AVG = [0 for i in self.dataOut.channelList]
406 405 for i in self.dataOut.channelList:
407 406 AVG[i] = avgdB[i][::dy].tolist()
408 407 payload = {
409 408 'timestamp': self.dataOut.utctime,
410 409 'data': roundFloats(AVG),
411 410 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
412 411 'type': self.plottype,
413 412 'yData': yData
414 413 }
415 414 else:
416 415 print "Tipo de grafico invalido"
417 416 payload = {
418 417 'data': 'None',
419 418 'timestamp': 'None',
420 419 'type': None
421 420 }
422 421
423 422 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
424 423
425 424 if self.zeromq is 1:
426 425 if self.verbose:
427 426 log.log(
428 427 'Sending {} - {}'.format(self.dataOut.type, self.dataOut.datatime),
429 428 self.name
430 429 )
431 430 self.zmq_socket.send_pyobj(self.dataOut)
432 431
433 432 def run(self, dataOut, **kwargs):
434 433 self.dataOut = dataOut
435 434 if not self.isConfig:
436 435 self.setup(**kwargs)
437 436 self.isConfig = True
438 437
439 438 self.publish_data()
440 439 time.sleep(self.delay)
441 440
442 441 def close(self):
443 442 if self.zeromq is 1:
444 443 self.dataOut.finished = True
445 444 self.zmq_socket.send_pyobj(self.dataOut)
446 445 time.sleep(0.1)
447 446 self.zmq_socket.close()
448 447 if self.client:
449 448 self.client.loop_stop()
450 449 self.client.disconnect()
451 450
452 451
453 452 class ReceiverData(ProcessingUnit):
454 453
455 454 __attrs__ = ['server']
456 455
457 456 def __init__(self, **kwargs):
458 457
459 458 ProcessingUnit.__init__(self, **kwargs)
460 459
461 460 self.isConfig = False
462 461 server = kwargs.get('server', 'zmq.pipe')
463 462 if 'tcp://' in server:
464 463 address = server
465 464 else:
466 465 address = 'ipc:///tmp/%s' % server
467 466
468 467 self.address = address
469 468 self.dataOut = JROData()
470 469
471 470 def setup(self):
472 471
473 472 self.context = zmq.Context()
474 473 self.receiver = self.context.socket(zmq.PULL)
475 474 self.receiver.bind(self.address)
476 475 time.sleep(0.5)
477 476 log.success('ReceiverData from {}'.format(self.address))
478 477
479 478
480 479 def run(self):
481 480
482 481 if not self.isConfig:
483 482 self.setup()
484 483 self.isConfig = True
485 484
486 485 self.dataOut = self.receiver.recv_pyobj()
487 486 log.log('{} - {}'.format(self.dataOut.type,
488 487 self.dataOut.datatime.ctime(),),
489 488 'Receiving')
490 489
491 490
492 491 class PlotterReceiver(ProcessingUnit, Process):
493 492
494 493 throttle_value = 5
495 494 __attrs__ = ['server', 'plottypes', 'realtime', 'localtime', 'throttle',
496 495 'exp_code', 'web_server']
497 496
498 497 def __init__(self, **kwargs):
499 498
500 499 ProcessingUnit.__init__(self, **kwargs)
501 500 Process.__init__(self)
502 501 self.mp = False
503 502 self.isConfig = False
504 503 self.isWebConfig = False
505 504 self.connections = 0
506 505 server = kwargs.get('server', 'zmq.pipe')
507 506 web_server = kwargs.get('web_server', None)
508 507 if 'tcp://' in server:
509 508 address = server
510 509 else:
511 510 address = 'ipc:///tmp/%s' % server
512 511 self.address = address
513 512 self.web_address = web_server
514 513 self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
515 514 self.realtime = kwargs.get('realtime', False)
516 515 self.localtime = kwargs.get('localtime', True)
517 516 self.throttle_value = kwargs.get('throttle', 5)
518 517 self.exp_code = kwargs.get('exp_code', None)
519 518 self.sendData = self.initThrottle(self.throttle_value)
520 519 self.dates = []
521 520 self.setup()
522 521
523 522 def setup(self):
524 523
525 524 self.data = Data(self.plottypes, self.throttle_value, self.exp_code)
526 525 self.isConfig = True
527 526
528 527 def event_monitor(self, monitor):
529 528
530 529 events = {}
531 530
532 531 for name in dir(zmq):
533 532 if name.startswith('EVENT_'):
534 533 value = getattr(zmq, name)
535 534 events[value] = name
536 535
537 536 while monitor.poll():
538 537 evt = recv_monitor_message(monitor)
539 538 if evt['event'] == 32:
540 539 self.connections += 1
541 540 if evt['event'] == 512:
542 541 pass
543 542
544 543 evt.update({'description': events[evt['event']]})
545 544
546 545 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
547 546 break
548 547 monitor.close()
549 548 print('event monitor thread done!')
550 549
551 550 def initThrottle(self, throttle_value):
552 551
553 552 @throttle(seconds=throttle_value)
554 553 def sendDataThrottled(fn_sender, data):
555 554 fn_sender(data)
556 555
557 556 return sendDataThrottled
558 557
559 558 def send(self, data):
560 559 log.success('Sending {}'.format(data), self.name)
561 560 self.sender.send_pyobj(data)
562 561
563 562 def run(self):
564 563
565 564 log.success(
566 565 'Starting from {}'.format(self.address),
567 566 self.name
568 567 )
569 568
570 569 self.context = zmq.Context()
571 570 self.receiver = self.context.socket(zmq.PULL)
572 571 self.receiver.bind(self.address)
573 572 monitor = self.receiver.get_monitor_socket()
574 573 self.sender = self.context.socket(zmq.PUB)
575 574 if self.web_address:
576 575 log.success(
577 576 'Sending to web: {}'.format(self.web_address),
578 577 self.name
579 578 )
580 self.sender_web = self.context.socket(zmq.PUB)
579 self.sender_web = self.context.socket(zmq.REQ)
581 580 self.sender_web.connect(self.web_address)
581 self.poll = zmq.Poller()
582 self.poll.register(self.sender_web, zmq.POLLIN)
582 583 time.sleep(1)
583 584
584 585 if 'server' in self.kwargs:
585 586 self.sender.bind("ipc:///tmp/{}.plots".format(self.kwargs['server']))
586 587 else:
587 588 self.sender.bind("ipc:///tmp/zmq.plots")
588 589
589 590 time.sleep(2)
590 591
591 592 t = Thread(target=self.event_monitor, args=(monitor,))
592 593 t.start()
593 594
594 595 while True:
595 596 dataOut = self.receiver.recv_pyobj()
596 597 if not dataOut.flagNoData:
597 598 if dataOut.type == 'Parameters':
598 599 tm = dataOut.utctimeInit
599 600 else:
600 601 tm = dataOut.utctime
601 602 if dataOut.useLocalTime:
602 603 if not self.localtime:
603 604 tm += time.timezone
604 605 dt = datetime.datetime.fromtimestamp(tm).date()
605 606 else:
606 607 if self.localtime:
607 608 tm -= time.timezone
608 609 dt = datetime.datetime.utcfromtimestamp(tm).date()
609 610 coerce = False
610 611 if dt not in self.dates:
611 612 if self.data:
612 613 self.data.ended = True
613 614 self.send(self.data)
614 615 coerce = True
615 616 self.data.setup()
616 617 self.dates.append(dt)
617 618
618 619 self.data.update(dataOut, tm)
619 620
620 621 if dataOut.finished is True:
621 622 self.connections -= 1
622 623 if self.connections == 0 and dt in self.dates:
623 624 self.data.ended = True
624 625 self.send(self.data)
625 self.data.setup()
626 # self.data.setup()
627 time.sleep(1)
628 break
626 629 else:
627 630 if self.realtime:
628 631 self.send(self.data)
629 632 if self.web_address:
633 retries = 5
634 while True:
630 635 self.sender_web.send(self.data.jsonify())
636 socks = dict(self.poll.poll(5000))
637 if socks.get(self.sender_web) == zmq.POLLIN:
638 reply = self.sender_web.recv_string()
639 if reply == 'ok':
640 break
641 else:
642 print("Malformed reply from server: %s" % reply)
643
644 else:
645 print("No response from server, retrying...")
646 self.sender_web.setsockopt(zmq.LINGER, 0)
647 self.sender_web.close()
648 self.poll.unregister(self.sender_web)
649 retries -= 1
650 if retries == 0:
651 print("Server seems to be offline, abandoning")
652 break
653 self.sender_web = self.context.socket(zmq.REQ)
654 self.sender_web.connect(self.web_address)
655 self.poll.register(self.sender_web, zmq.POLLIN)
656 time.sleep(1)
631 657 else:
632 658 self.sendData(self.send, self.data, coerce=coerce)
633 659 coerce = False
634 660
635 661 return
General Comments 0
You need to be logged in to leave comments. Login now