##// END OF EJS Templates
zmq support in PublishData
Juan C. Valdez -
r883:4d54fdaec8c6
parent child
Show More
@@ -6,8 +6,17 import time
6 import json
6 import json
7 import numpy
7 import numpy
8 import paho.mqtt.client as mqtt
8 import paho.mqtt.client as mqtt
9 import zmq
10 import cPickle as pickle
11 import datetime
12 from zmq.utils.monitor import recv_monitor_message
13 from functools import wraps
14 from threading import Thread
15 from multiprocessing import Process
9
16
10 from schainpy.model.proc.jroproc_base import Operation
17 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
18
19 throttle_value = 10
11
20
12 class PrettyFloat(float):
21 class PrettyFloat(float):
13 def __repr__(self):
22 def __repr__(self):
@@ -28,18 +37,55 def pretty_floats(obj):
28 return map(pretty_floats, obj)
37 return map(pretty_floats, obj)
29 return obj
38 return obj
30
39
40 class throttle(object):
41 """Decorator that prevents a function from being called more than once every
42 time period.
43 To create a function that cannot be called more than once a minute, but
44 will sleep until it can be called:
45 @throttle(minutes=1)
46 def foo():
47 pass
48
49 for i in range(10):
50 foo()
51 print "This function has run %s times." % i
52 """
53
54 def __init__(self, seconds=0, minutes=0, hours=0):
55 self.throttle_period = datetime.timedelta(
56 seconds=seconds, minutes=minutes, hours=hours
57 )
58 self.time_of_last_call = datetime.datetime.min
59
60 def __call__(self, fn):
61 @wraps(fn)
62 def wrapper(*args, **kwargs):
63 now = datetime.datetime.now()
64 time_since_last_call = now - self.time_of_last_call
65 time_left = self.throttle_period - time_since_last_call
66
67 if time_left > datetime.timedelta(seconds=0):
68 return
69
70 self.time_of_last_call = datetime.datetime.now()
71 return fn(*args, **kwargs)
72
73 return wrapper
74
31
75
32 class PublishData(Operation):
76 class PublishData(Operation):
33 """Clase publish."""
77 """Clase publish."""
34
78
35 __MAXNUMX = 80
79 __MAXNUMX = 100
36 __MAXNUMY = 80
80 __MAXNUMY = 100
37
81
38 def __init__(self):
82 def __init__(self, **kwargs):
39 """Inicio."""
83 """Inicio."""
40 Operation.__init__(self)
84 Operation.__init__(self, **kwargs)
41 self.isConfig = False
85 self.isConfig = False
42 self.client = None
86 self.client = None
87 self.zeromq = None
88 self.mqtt = None
43
89
44 def on_disconnect(self, client, userdata, rc):
90 def on_disconnect(self, client, userdata, rc):
45 if rc != 0:
91 if rc != 0:
@@ -65,16 +111,26 class PublishData(Operation):
65 print "MQTT Conection error."
111 print "MQTT Conection error."
66 self.client = False
112 self.client = False
67
113
68 def setup(self, host, port=1883, username=None, password=None, clientId="user", **kwargs):
114 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, **kwargs):
69
115 self.counter = 0
70 self.topic = kwargs.get('topic', 'schain')
116 self.topic = kwargs.get('topic', 'schain')
71 self.delay = kwargs.get('delay', 0)
117 self.delay = kwargs.get('delay', 0)
72 self.plottype = kwargs.get('plottype', 'spectra')
118 self.plottype = kwargs.get('plottype', 'spectra')
73 self.host = host
119 self.host = kwargs.get('host', "10.10.10.82")
74 self.port = port
120 self.port = kwargs.get('port', 3000)
75 self.clientId = clientId
121 self.clientId = clientId
76 self.cnt = 0
122 self.cnt = 0
123 self.zeromq = zeromq
124 self.mqtt = kwargs.get('plottype', 0)
125 self.client = None
77 setup = []
126 setup = []
127 if mqtt is 1:
128 print 'mqqt es 1'
129 self.client = mqtt.Client(
130 client_id=self.clientId + self.topic + 'SCHAIN',
131 clean_session=True)
132 self.client.on_disconnect = self.on_disconnect
133 self.connect()
78 for plot in self.plottype:
134 for plot in self.plottype:
79 setup.append({
135 setup.append({
80 'plot': plot,
136 'plot': plot,
@@ -86,16 +142,27 class PublishData(Operation):
86 'yrange': getattr(self, plot + '_' + 'yrange', False),
142 'yrange': getattr(self, plot + '_' + 'yrange', False),
87 'zrange': getattr(self, plot + '_' + 'zrange', False),
143 'zrange': getattr(self, plot + '_' + 'zrange', False),
88 })
144 })
89 self.client = mqtt.Client(
145 if zeromq is 1:
90 client_id=self.clientId + self.topic + 'SCHAIN',
146 context = zmq.Context()
91 clean_session=True)
147 self.zmq_socket = context.socket(zmq.PUSH)
92 self.client.on_disconnect = self.on_disconnect
148 server = kwargs.get('server', 'zmq.pipe')
93 self.connect()
94
149
95 def publish_data(self, plottype):
150 if 'http://' in server:
96 data = getattr(self.dataOut, 'data_spc')
151 address = server
152 else:
153 address = 'ipc:///tmp/%s' % server
154
155 self.zmq_socket.connect(address)
156 time.sleep(1)
157 print 'zeromq configured'
158
159
160 def publish_data(self):
161 self.dataOut.finished = False
162 if self.mqtt is 1:
97 yData = self.dataOut.heightList[:2].tolist()
163 yData = self.dataOut.heightList[:2].tolist()
98 if plottype == 'spectra':
164 if self.plottype == 'spectra':
165 data = getattr(self.dataOut, 'data_spc')
99 z = data/self.dataOut.normFactor
166 z = data/self.dataOut.normFactor
100 zdB = 10*numpy.log10(z)
167 zdB = 10*numpy.log10(z)
101 xlen, ylen = zdB[0].shape
168 xlen, ylen = zdB[0].shape
@@ -109,13 +176,13 class PublishData(Operation):
109 'data': roundFloats(Z),
176 'data': roundFloats(Z),
110 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
177 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
111 'interval': self.dataOut.getTimeInterval(),
178 'interval': self.dataOut.getTimeInterval(),
112 'xRange': [0, 80],
179 'type': self.plottype,
113 'type': plottype,
114 'yData': yData
180 'yData': yData
115 }
181 }
116 # print payload
182 # print payload
117
183
118 elif plottype in ('rti', 'power'):
184 elif self.plottype in ('rti', 'power'):
185 data = getattr(self.dataOut, 'data_spc')
119 z = data/self.dataOut.normFactor
186 z = data/self.dataOut.normFactor
120 avg = numpy.average(z, axis=1)
187 avg = numpy.average(z, axis=1)
121 avgdB = 10*numpy.log10(avg)
188 avgdB = 10*numpy.log10(avg)
@@ -129,11 +196,10 class PublishData(Operation):
129 'data': roundFloats(AVG),
196 'data': roundFloats(AVG),
130 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
197 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
131 'interval': self.dataOut.getTimeInterval(),
198 'interval': self.dataOut.getTimeInterval(),
132 'xRange': [0, 80],
199 'type': self.plottype,
133 'type': plottype,
134 'yData': yData
200 'yData': yData
135 }
201 }
136 elif plottype == 'noise':
202 elif self.plottype == 'noise':
137 noise = self.dataOut.getNoise()/self.dataOut.normFactor
203 noise = self.dataOut.getNoise()/self.dataOut.normFactor
138 noisedB = 10*numpy.log10(noise)
204 noisedB = 10*numpy.log10(noise)
139 payload = {
205 payload = {
@@ -141,8 +207,23 class PublishData(Operation):
141 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
207 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
142 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
208 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
143 'interval': self.dataOut.getTimeInterval(),
209 'interval': self.dataOut.getTimeInterval(),
144 'xRange': [0, 80],
210 'type': self.plottype,
145 'type': plottype,
211 'yData': yData
212 }
213 elif self.plottype == 'snr':
214 data = getattr(self.dataOut, 'data_SNR')
215 avgdB = 10*numpy.log10(data)
216
217 ylen = data[0].size
218 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
219 AVG = [0 for i in self.dataOut.channelList]
220 for i in self.dataOut.channelList:
221 AVG[i] = avgdB[i][::dy].tolist()
222 payload = {
223 'timestamp': self.dataOut.utctime,
224 'data': roundFloats(AVG),
225 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
226 'type': self.plottype,
146 'yData': yData
227 'yData': yData
147 }
228 }
148 else:
229 else:
@@ -152,21 +233,27 class PublishData(Operation):
152 'timestamp': 'None',
233 'timestamp': 'None',
153 'type': None
234 'type': None
154 }
235 }
155 print 'Publishing data to {}'.format(self.host)
236 # print 'Publishing data to {}'.format(self.host)
156 print '*************************'
237 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
157 self.client.publish(self.topic + plottype, json.dumps(payload), qos=0)
158
238
239 if self.zeromq is 1:
240 print '[Sending] {} - {}'.format(self.dataOut.type, self.dataOut.datatime)
241 self.zmq_socket.send_pyobj(self.dataOut)
159
242
160 def run(self, dataOut, host, **kwargs):
243 def run(self, dataOut, **kwargs):
161 self.dataOut = dataOut
244 self.dataOut = dataOut
162 if not self.isConfig:
245 if not self.isConfig:
163 self.setup(host, **kwargs)
246 self.setup(**kwargs)
164 self.isConfig = True
247 self.isConfig = True
165
248
166 map(self.publish_data, self.plottype)
249 self.publish_data()
167 time.sleep(self.delay)
250 time.sleep(self.delay)
168
251
169 def close(self):
252 def close(self):
253 if self.zeromq is 1:
254 self.dataOut.finished = True
255 self.zmq_socket.send_pyobj(self.dataOut)
256
170 if self.client:
257 if self.client:
171 self.client.loop_stop()
258 self.client.loop_stop()
172 self.client.disconnect()
259 self.client.disconnect()
General Comments 0
You need to be logged in to leave comments. Login now