##// END OF EJS Templates
zmq support in PublishData
Juan C. Valdez -
r883:4d54fdaec8c6
parent child
Show More
@@ -6,8 +6,17 import time
6 6 import json
7 7 import numpy
8 8 import paho.mqtt.client as mqtt
9 import zmq
10 import cPickle as pickle
11 import datetime
12 from zmq.utils.monitor import recv_monitor_message
13 from functools import wraps
14 from threading import Thread
15 from multiprocessing import Process
9 16
10 from schainpy.model.proc.jroproc_base import Operation
17 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
18
19 throttle_value = 10
11 20
12 21 class PrettyFloat(float):
13 22 def __repr__(self):
@@ -28,18 +37,55 def pretty_floats(obj):
28 37 return map(pretty_floats, obj)
29 38 return obj
30 39
40 class throttle(object):
41 """Decorator that prevents a function from being called more than once every
42 time period.
43 To create a function that cannot be called more than once a minute, but
44 will sleep until it can be called:
45 @throttle(minutes=1)
46 def foo():
47 pass
48
49 for i in range(10):
50 foo()
51 print "This function has run %s times." % i
52 """
53
54 def __init__(self, seconds=0, minutes=0, hours=0):
55 self.throttle_period = datetime.timedelta(
56 seconds=seconds, minutes=minutes, hours=hours
57 )
58 self.time_of_last_call = datetime.datetime.min
59
60 def __call__(self, fn):
61 @wraps(fn)
62 def wrapper(*args, **kwargs):
63 now = datetime.datetime.now()
64 time_since_last_call = now - self.time_of_last_call
65 time_left = self.throttle_period - time_since_last_call
66
67 if time_left > datetime.timedelta(seconds=0):
68 return
69
70 self.time_of_last_call = datetime.datetime.now()
71 return fn(*args, **kwargs)
72
73 return wrapper
74
31 75
32 76 class PublishData(Operation):
33 77 """Clase publish."""
34 78
35 __MAXNUMX = 80
36 __MAXNUMY = 80
79 __MAXNUMX = 100
80 __MAXNUMY = 100
37 81
38 def __init__(self):
82 def __init__(self, **kwargs):
39 83 """Inicio."""
40 Operation.__init__(self)
84 Operation.__init__(self, **kwargs)
41 85 self.isConfig = False
42 86 self.client = None
87 self.zeromq = None
88 self.mqtt = None
43 89
44 90 def on_disconnect(self, client, userdata, rc):
45 91 if rc != 0:
@@ -65,108 +111,149 class PublishData(Operation):
65 111 print "MQTT Conection error."
66 112 self.client = False
67 113
68 def setup(self, host, port=1883, username=None, password=None, clientId="user", **kwargs):
69
114 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, **kwargs):
115 self.counter = 0
70 116 self.topic = kwargs.get('topic', 'schain')
71 117 self.delay = kwargs.get('delay', 0)
72 118 self.plottype = kwargs.get('plottype', 'spectra')
73 self.host = host
74 self.port = port
119 self.host = kwargs.get('host', "10.10.10.82")
120 self.port = kwargs.get('port', 3000)
75 121 self.clientId = clientId
76 122 self.cnt = 0
123 self.zeromq = zeromq
124 self.mqtt = kwargs.get('plottype', 0)
125 self.client = None
77 126 setup = []
78 for plot in self.plottype:
79 setup.append({
80 'plot': plot,
81 'topic': self.topic + plot,
82 'title': getattr(self, plot + '_' + 'title', False),
83 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
84 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
85 'xrange': getattr(self, plot + '_' + 'xrange', False),
86 'yrange': getattr(self, plot + '_' + 'yrange', False),
87 'zrange': getattr(self, plot + '_' + 'zrange', False),
88 })
89 self.client = mqtt.Client(
90 client_id=self.clientId + self.topic + 'SCHAIN',
91 clean_session=True)
92 self.client.on_disconnect = self.on_disconnect
93 self.connect()
127 if mqtt is 1:
128 print 'mqqt es 1'
129 self.client = mqtt.Client(
130 client_id=self.clientId + self.topic + 'SCHAIN',
131 clean_session=True)
132 self.client.on_disconnect = self.on_disconnect
133 self.connect()
134 for plot in self.plottype:
135 setup.append({
136 'plot': plot,
137 'topic': self.topic + plot,
138 'title': getattr(self, plot + '_' + 'title', False),
139 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
140 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
141 'xrange': getattr(self, plot + '_' + 'xrange', False),
142 'yrange': getattr(self, plot + '_' + 'yrange', False),
143 'zrange': getattr(self, plot + '_' + 'zrange', False),
144 })
145 if zeromq is 1:
146 context = zmq.Context()
147 self.zmq_socket = context.socket(zmq.PUSH)
148 server = kwargs.get('server', 'zmq.pipe')
149
150 if 'http://' in server:
151 address = server
152 else:
153 address = 'ipc:///tmp/%s' % server
154
155 self.zmq_socket.connect(address)
156 time.sleep(1)
157 print 'zeromq configured'
158
159
160 def publish_data(self):
161 self.dataOut.finished = False
162 if self.mqtt is 1:
163 yData = self.dataOut.heightList[:2].tolist()
164 if self.plottype == 'spectra':
165 data = getattr(self.dataOut, 'data_spc')
166 z = data/self.dataOut.normFactor
167 zdB = 10*numpy.log10(z)
168 xlen, ylen = zdB[0].shape
169 dx = numpy.floor(xlen/self.__MAXNUMX) + 1
170 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
171 Z = [0 for i in self.dataOut.channelList]
172 for i in self.dataOut.channelList:
173 Z[i] = zdB[i][::dx, ::dy].tolist()
174 payload = {
175 'timestamp': self.dataOut.utctime,
176 'data': roundFloats(Z),
177 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
178 'interval': self.dataOut.getTimeInterval(),
179 'type': self.plottype,
180 'yData': yData
181 }
182 # print payload
94 183
95 def publish_data(self, plottype):
96 data = getattr(self.dataOut, 'data_spc')
97 yData = self.dataOut.heightList[:2].tolist()
98 if plottype == 'spectra':
99 z = data/self.dataOut.normFactor
100 zdB = 10*numpy.log10(z)
101 xlen, ylen = zdB[0].shape
102 dx = numpy.floor(xlen/self.__MAXNUMX) + 1
103 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
104 Z = [0 for i in self.dataOut.channelList]
105 for i in self.dataOut.channelList:
106 Z[i] = zdB[i][::dx, ::dy].tolist()
107 payload = {
108 'timestamp': self.dataOut.utctime,
109 'data': roundFloats(Z),
110 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
111 'interval': self.dataOut.getTimeInterval(),
112 'xRange': [0, 80],
113 'type': plottype,
114 'yData': yData
115 }
116 # print payload
117
118 elif plottype in ('rti', 'power'):
119 z = data/self.dataOut.normFactor
120 avg = numpy.average(z, axis=1)
121 avgdB = 10*numpy.log10(avg)
122 xlen, ylen = z[0].shape
123 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
124 AVG = [0 for i in self.dataOut.channelList]
125 for i in self.dataOut.channelList:
126 AVG[i] = avgdB[i][::dy].tolist()
127 payload = {
128 'timestamp': self.dataOut.utctime,
129 'data': roundFloats(AVG),
130 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
131 'interval': self.dataOut.getTimeInterval(),
132 'xRange': [0, 80],
133 'type': plottype,
134 'yData': yData
135 }
136 elif plottype == 'noise':
137 noise = self.dataOut.getNoise()/self.dataOut.normFactor
138 noisedB = 10*numpy.log10(noise)
139 payload = {
140 'timestamp': self.dataOut.utctime,
141 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
142 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
143 'interval': self.dataOut.getTimeInterval(),
144 'xRange': [0, 80],
145 'type': plottype,
146 'yData': yData
147 }
148 else:
149 print "Tipo de grafico invalido"
150 payload = {
151 'data': 'None',
152 'timestamp': 'None',
153 'type': None
154 }
155 print 'Publishing data to {}'.format(self.host)
156 print '*************************'
157 self.client.publish(self.topic + plottype, json.dumps(payload), qos=0)
158
159
160 def run(self, dataOut, host, **kwargs):
184 elif self.plottype in ('rti', 'power'):
185 data = getattr(self.dataOut, 'data_spc')
186 z = data/self.dataOut.normFactor
187 avg = numpy.average(z, axis=1)
188 avgdB = 10*numpy.log10(avg)
189 xlen, ylen = z[0].shape
190 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
191 AVG = [0 for i in self.dataOut.channelList]
192 for i in self.dataOut.channelList:
193 AVG[i] = avgdB[i][::dy].tolist()
194 payload = {
195 'timestamp': self.dataOut.utctime,
196 'data': roundFloats(AVG),
197 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
198 'interval': self.dataOut.getTimeInterval(),
199 'type': self.plottype,
200 'yData': yData
201 }
202 elif self.plottype == 'noise':
203 noise = self.dataOut.getNoise()/self.dataOut.normFactor
204 noisedB = 10*numpy.log10(noise)
205 payload = {
206 'timestamp': self.dataOut.utctime,
207 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
208 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
209 'interval': self.dataOut.getTimeInterval(),
210 'type': self.plottype,
211 'yData': yData
212 }
213 elif self.plottype == 'snr':
214 data = getattr(self.dataOut, 'data_SNR')
215 avgdB = 10*numpy.log10(data)
216
217 ylen = data[0].size
218 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
219 AVG = [0 for i in self.dataOut.channelList]
220 for i in self.dataOut.channelList:
221 AVG[i] = avgdB[i][::dy].tolist()
222 payload = {
223 'timestamp': self.dataOut.utctime,
224 'data': roundFloats(AVG),
225 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
226 'type': self.plottype,
227 'yData': yData
228 }
229 else:
230 print "Tipo de grafico invalido"
231 payload = {
232 'data': 'None',
233 'timestamp': 'None',
234 'type': None
235 }
236 # print 'Publishing data to {}'.format(self.host)
237 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
238
239 if self.zeromq is 1:
240 print '[Sending] {} - {}'.format(self.dataOut.type, self.dataOut.datatime)
241 self.zmq_socket.send_pyobj(self.dataOut)
242
243 def run(self, dataOut, **kwargs):
161 244 self.dataOut = dataOut
162 245 if not self.isConfig:
163 self.setup(host, **kwargs)
246 self.setup(**kwargs)
164 247 self.isConfig = True
165 248
166 map(self.publish_data, self.plottype)
249 self.publish_data()
167 250 time.sleep(self.delay)
168 251
169 252 def close(self):
253 if self.zeromq is 1:
254 self.dataOut.finished = True
255 self.zmq_socket.send_pyobj(self.dataOut)
256
170 257 if self.client:
171 258 self.client.loop_stop()
172 259 self.client.disconnect()
General Comments 0
You need to be logged in to leave comments. Login now