@@ -0,0 +1,102 | |||||
|
1 | ''' | |||
|
2 | @author: Juan C. Espinoza | |||
|
3 | ''' | |||
|
4 | ||||
|
5 | import time | |||
|
6 | import json | |||
|
7 | import numpy | |||
|
8 | import paho.mqtt.client as mqtt | |||
|
9 | ||||
|
10 | from schainpy.model.proc.jroproc_base import Operation | |||
|
11 | ||||
|
12 | ||||
|
13 | class PrettyFloat(float): | |||
|
14 | def __repr__(self): | |||
|
15 | return '%.2f' % self | |||
|
16 | ||||
|
17 | def pretty_floats(obj): | |||
|
18 | if isinstance(obj, float): | |||
|
19 | return PrettyFloat(obj) | |||
|
20 | elif isinstance(obj, dict): | |||
|
21 | return dict((k, pretty_floats(v)) for k, v in obj.items()) | |||
|
22 | elif isinstance(obj, (list, tuple)): | |||
|
23 | return map(pretty_floats, obj) | |||
|
24 | return obj | |||
|
25 | ||||
|
26 | class PublishData(Operation): | |||
|
27 | ||||
|
28 | __MAXNUMX = 80 | |||
|
29 | __MAXNUMY = 80 | |||
|
30 | ||||
|
31 | def __init__(self): | |||
|
32 | ||||
|
33 | Operation.__init__(self) | |||
|
34 | ||||
|
35 | self.isConfig = False | |||
|
36 | self.client = None | |||
|
37 | ||||
|
38 | @staticmethod | |||
|
39 | def on_disconnect(client, userdata, rc): | |||
|
40 | if rc != 0: | |||
|
41 | print("Unexpected disconnection.") | |||
|
42 | ||||
|
43 | def setup(self, host, port=1883, username=None, password=None, **kwargs): | |||
|
44 | ||||
|
45 | self.client = mqtt.Client() | |||
|
46 | try: | |||
|
47 | self.client.connect(host=host, port=port, keepalive=60*10, bind_address='') | |||
|
48 | except: | |||
|
49 | self.client = False | |||
|
50 | self.topic = kwargs.get('topic', 'schain') | |||
|
51 | self.delay = kwargs.get('delay', 0) | |||
|
52 | self.host = host | |||
|
53 | self.port = port | |||
|
54 | self.cnt = 0 | |||
|
55 | ||||
|
56 | def run(self, dataOut, host, datatype='data_spc', **kwargs): | |||
|
57 | ||||
|
58 | if not self.isConfig: | |||
|
59 | self.setup(host, **kwargs) | |||
|
60 | self.isConfig = True | |||
|
61 | ||||
|
62 | data = getattr(dataOut, datatype) | |||
|
63 | ||||
|
64 | z = data/dataOut.normFactor | |||
|
65 | zdB = 10*numpy.log10(z) | |||
|
66 | avg = numpy.average(z, axis=1) | |||
|
67 | avgdB = 10*numpy.log10(avg) | |||
|
68 | ||||
|
69 | xlen ,ylen = zdB[0].shape | |||
|
70 | ||||
|
71 | ||||
|
72 | dx = numpy.floor(xlen/self.__MAXNUMX) + 1 | |||
|
73 | dy = numpy.floor(ylen/self.__MAXNUMY) + 1 | |||
|
74 | ||||
|
75 | Z = [0 for i in dataOut.channelList] | |||
|
76 | AVG = [0 for i in dataOut.channelList] | |||
|
77 | ||||
|
78 | for i in dataOut.channelList: | |||
|
79 | Z[i] = zdB[i][::dx, ::dy].tolist() | |||
|
80 | AVG[i] = avgdB[i][::dy].tolist() | |||
|
81 | ||||
|
82 | payload = {'timestamp':dataOut.utctime, | |||
|
83 | 'data':pretty_floats(Z), | |||
|
84 | 'data_profile':pretty_floats(AVG), | |||
|
85 | 'channels': ['Ch %s' % ch for ch in dataOut.channelList], | |||
|
86 | 'interval': dataOut.getTimeInterval(), | |||
|
87 | } | |||
|
88 | ||||
|
89 | ||||
|
90 | #if self.cnt==self.interval and self.client: | |||
|
91 | print 'Publishing data to {}'.format(self.host) | |||
|
92 | self.client.publish(self.topic, json.dumps(payload), qos=0) | |||
|
93 | time.sleep(self.delay) | |||
|
94 | #self.cnt = 0 | |||
|
95 | #else: | |||
|
96 | # self.cnt += 1 | |||
|
97 | ||||
|
98 | ||||
|
99 | def close(self): | |||
|
100 | ||||
|
101 | if self.client: | |||
|
102 | self.client.disconnect() No newline at end of file |
@@ -4,4 +4,5 $Author: murco $ | |||||
4 | $Id: Processor.py 1 2012-11-12 18:56:07Z murco $ |
|
4 | $Id: Processor.py 1 2012-11-12 18:56:07Z murco $ | |
5 | ''' |
|
5 | ''' | |
6 |
|
6 | |||
7 | from jroutils_ftp import * No newline at end of file |
|
7 | from jroutils_ftp import * | |
|
8 | from jroutils_publish import PublishData No newline at end of file |
General Comments 0
You need to be logged in to leave comments.
Login now