##// END OF EJS Templates
zmq support in PublishData
Juan C. Valdez -
r883:4d54fdaec8c6
parent child
Show More
@@ -6,8 +6,17 import time
6 import json
6 import json
7 import numpy
7 import numpy
8 import paho.mqtt.client as mqtt
8 import paho.mqtt.client as mqtt
9 import zmq
10 import cPickle as pickle
11 import datetime
12 from zmq.utils.monitor import recv_monitor_message
13 from functools import wraps
14 from threading import Thread
15 from multiprocessing import Process
9
16
10 from schainpy.model.proc.jroproc_base import Operation
17 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
18
19 throttle_value = 10
11
20
12 class PrettyFloat(float):
21 class PrettyFloat(float):
13 def __repr__(self):
22 def __repr__(self):
@@ -28,18 +37,55 def pretty_floats(obj):
28 return map(pretty_floats, obj)
37 return map(pretty_floats, obj)
29 return obj
38 return obj
30
39
40 class throttle(object):
41 """Decorator that prevents a function from being called more than once every
42 time period.
43 To create a function that cannot be called more than once a minute, but
44 will sleep until it can be called:
45 @throttle(minutes=1)
46 def foo():
47 pass
48
49 for i in range(10):
50 foo()
51 print "This function has run %s times." % i
52 """
53
54 def __init__(self, seconds=0, minutes=0, hours=0):
55 self.throttle_period = datetime.timedelta(
56 seconds=seconds, minutes=minutes, hours=hours
57 )
58 self.time_of_last_call = datetime.datetime.min
59
60 def __call__(self, fn):
61 @wraps(fn)
62 def wrapper(*args, **kwargs):
63 now = datetime.datetime.now()
64 time_since_last_call = now - self.time_of_last_call
65 time_left = self.throttle_period - time_since_last_call
66
67 if time_left > datetime.timedelta(seconds=0):
68 return
69
70 self.time_of_last_call = datetime.datetime.now()
71 return fn(*args, **kwargs)
72
73 return wrapper
74
31
75
32 class PublishData(Operation):
76 class PublishData(Operation):
33 """Clase publish."""
77 """Clase publish."""
34
78
35 __MAXNUMX = 80
79 __MAXNUMX = 100
36 __MAXNUMY = 80
80 __MAXNUMY = 100
37
81
38 def __init__(self):
82 def __init__(self, **kwargs):
39 """Inicio."""
83 """Inicio."""
40 Operation.__init__(self)
84 Operation.__init__(self, **kwargs)
41 self.isConfig = False
85 self.isConfig = False
42 self.client = None
86 self.client = None
87 self.zeromq = None
88 self.mqtt = None
43
89
44 def on_disconnect(self, client, userdata, rc):
90 def on_disconnect(self, client, userdata, rc):
45 if rc != 0:
91 if rc != 0:
@@ -65,108 +111,149 class PublishData(Operation):
65 print "MQTT Conection error."
111 print "MQTT Conection error."
66 self.client = False
112 self.client = False
67
113
68 def setup(self, host, port=1883, username=None, password=None, clientId="user", **kwargs):
114 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, **kwargs):
69
115 self.counter = 0
70 self.topic = kwargs.get('topic', 'schain')
116 self.topic = kwargs.get('topic', 'schain')
71 self.delay = kwargs.get('delay', 0)
117 self.delay = kwargs.get('delay', 0)
72 self.plottype = kwargs.get('plottype', 'spectra')
118 self.plottype = kwargs.get('plottype', 'spectra')
73 self.host = host
119 self.host = kwargs.get('host', "10.10.10.82")
74 self.port = port
120 self.port = kwargs.get('port', 3000)
75 self.clientId = clientId
121 self.clientId = clientId
76 self.cnt = 0
122 self.cnt = 0
123 self.zeromq = zeromq
124 self.mqtt = kwargs.get('plottype', 0)
125 self.client = None
77 setup = []
126 setup = []
78 for plot in self.plottype:
127 if mqtt is 1:
79 setup.append({
128 print 'mqqt es 1'
80 'plot': plot,
129 self.client = mqtt.Client(
81 'topic': self.topic + plot,
130 client_id=self.clientId + self.topic + 'SCHAIN',
82 'title': getattr(self, plot + '_' + 'title', False),
131 clean_session=True)
83 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
132 self.client.on_disconnect = self.on_disconnect
84 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
133 self.connect()
85 'xrange': getattr(self, plot + '_' + 'xrange', False),
134 for plot in self.plottype:
86 'yrange': getattr(self, plot + '_' + 'yrange', False),
135 setup.append({
87 'zrange': getattr(self, plot + '_' + 'zrange', False),
136 'plot': plot,
88 })
137 'topic': self.topic + plot,
89 self.client = mqtt.Client(
138 'title': getattr(self, plot + '_' + 'title', False),
90 client_id=self.clientId + self.topic + 'SCHAIN',
139 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
91 clean_session=True)
140 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
92 self.client.on_disconnect = self.on_disconnect
141 'xrange': getattr(self, plot + '_' + 'xrange', False),
93 self.connect()
142 'yrange': getattr(self, plot + '_' + 'yrange', False),
143 'zrange': getattr(self, plot + '_' + 'zrange', False),
144 })
145 if zeromq is 1:
146 context = zmq.Context()
147 self.zmq_socket = context.socket(zmq.PUSH)
148 server = kwargs.get('server', 'zmq.pipe')
149
150 if 'http://' in server:
151 address = server
152 else:
153 address = 'ipc:///tmp/%s' % server
154
155 self.zmq_socket.connect(address)
156 time.sleep(1)
157 print 'zeromq configured'
158
159
160 def publish_data(self):
161 self.dataOut.finished = False
162 if self.mqtt is 1:
163 yData = self.dataOut.heightList[:2].tolist()
164 if self.plottype == 'spectra':
165 data = getattr(self.dataOut, 'data_spc')
166 z = data/self.dataOut.normFactor
167 zdB = 10*numpy.log10(z)
168 xlen, ylen = zdB[0].shape
169 dx = numpy.floor(xlen/self.__MAXNUMX) + 1
170 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
171 Z = [0 for i in self.dataOut.channelList]
172 for i in self.dataOut.channelList:
173 Z[i] = zdB[i][::dx, ::dy].tolist()
174 payload = {
175 'timestamp': self.dataOut.utctime,
176 'data': roundFloats(Z),
177 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
178 'interval': self.dataOut.getTimeInterval(),
179 'type': self.plottype,
180 'yData': yData
181 }
182 # print payload
94
183
95 def publish_data(self, plottype):
184 elif self.plottype in ('rti', 'power'):
96 data = getattr(self.dataOut, 'data_spc')
185 data = getattr(self.dataOut, 'data_spc')
97 yData = self.dataOut.heightList[:2].tolist()
186 z = data/self.dataOut.normFactor
98 if plottype == 'spectra':
187 avg = numpy.average(z, axis=1)
99 z = data/self.dataOut.normFactor
188 avgdB = 10*numpy.log10(avg)
100 zdB = 10*numpy.log10(z)
189 xlen, ylen = z[0].shape
101 xlen, ylen = zdB[0].shape
190 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
102 dx = numpy.floor(xlen/self.__MAXNUMX) + 1
191 AVG = [0 for i in self.dataOut.channelList]
103 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
192 for i in self.dataOut.channelList:
104 Z = [0 for i in self.dataOut.channelList]
193 AVG[i] = avgdB[i][::dy].tolist()
105 for i in self.dataOut.channelList:
194 payload = {
106 Z[i] = zdB[i][::dx, ::dy].tolist()
195 'timestamp': self.dataOut.utctime,
107 payload = {
196 'data': roundFloats(AVG),
108 'timestamp': self.dataOut.utctime,
197 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
109 'data': roundFloats(Z),
198 'interval': self.dataOut.getTimeInterval(),
110 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
199 'type': self.plottype,
111 'interval': self.dataOut.getTimeInterval(),
200 'yData': yData
112 'xRange': [0, 80],
201 }
113 'type': plottype,
202 elif self.plottype == 'noise':
114 'yData': yData
203 noise = self.dataOut.getNoise()/self.dataOut.normFactor
115 }
204 noisedB = 10*numpy.log10(noise)
116 # print payload
205 payload = {
117
206 'timestamp': self.dataOut.utctime,
118 elif plottype in ('rti', 'power'):
207 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
119 z = data/self.dataOut.normFactor
208 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
120 avg = numpy.average(z, axis=1)
209 'interval': self.dataOut.getTimeInterval(),
121 avgdB = 10*numpy.log10(avg)
210 'type': self.plottype,
122 xlen, ylen = z[0].shape
211 'yData': yData
123 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
212 }
124 AVG = [0 for i in self.dataOut.channelList]
213 elif self.plottype == 'snr':
125 for i in self.dataOut.channelList:
214 data = getattr(self.dataOut, 'data_SNR')
126 AVG[i] = avgdB[i][::dy].tolist()
215 avgdB = 10*numpy.log10(data)
127 payload = {
216
128 'timestamp': self.dataOut.utctime,
217 ylen = data[0].size
129 'data': roundFloats(AVG),
218 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
130 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
219 AVG = [0 for i in self.dataOut.channelList]
131 'interval': self.dataOut.getTimeInterval(),
220 for i in self.dataOut.channelList:
132 'xRange': [0, 80],
221 AVG[i] = avgdB[i][::dy].tolist()
133 'type': plottype,
222 payload = {
134 'yData': yData
223 'timestamp': self.dataOut.utctime,
135 }
224 'data': roundFloats(AVG),
136 elif plottype == 'noise':
225 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
137 noise = self.dataOut.getNoise()/self.dataOut.normFactor
226 'type': self.plottype,
138 noisedB = 10*numpy.log10(noise)
227 'yData': yData
139 payload = {
228 }
140 'timestamp': self.dataOut.utctime,
229 else:
141 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
230 print "Tipo de grafico invalido"
142 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
231 payload = {
143 'interval': self.dataOut.getTimeInterval(),
232 'data': 'None',
144 'xRange': [0, 80],
233 'timestamp': 'None',
145 'type': plottype,
234 'type': None
146 'yData': yData
235 }
147 }
236 # print 'Publishing data to {}'.format(self.host)
148 else:
237 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
149 print "Tipo de grafico invalido"
238
150 payload = {
239 if self.zeromq is 1:
151 'data': 'None',
240 print '[Sending] {} - {}'.format(self.dataOut.type, self.dataOut.datatime)
152 'timestamp': 'None',
241 self.zmq_socket.send_pyobj(self.dataOut)
153 'type': None
242
154 }
243 def run(self, dataOut, **kwargs):
155 print 'Publishing data to {}'.format(self.host)
156 print '*************************'
157 self.client.publish(self.topic + plottype, json.dumps(payload), qos=0)
158
159
160 def run(self, dataOut, host, **kwargs):
161 self.dataOut = dataOut
244 self.dataOut = dataOut
162 if not self.isConfig:
245 if not self.isConfig:
163 self.setup(host, **kwargs)
246 self.setup(**kwargs)
164 self.isConfig = True
247 self.isConfig = True
165
248
166 map(self.publish_data, self.plottype)
249 self.publish_data()
167 time.sleep(self.delay)
250 time.sleep(self.delay)
168
251
169 def close(self):
252 def close(self):
253 if self.zeromq is 1:
254 self.dataOut.finished = True
255 self.zmq_socket.send_pyobj(self.dataOut)
256
170 if self.client:
257 if self.client:
171 self.client.loop_stop()
258 self.client.loop_stop()
172 self.client.disconnect()
259 self.client.disconnect()
General Comments 0
You need to be logged in to leave comments. Login now