##// END OF EJS Templates
fix zmq protocol
Juan C. Valdez -
r886:411fb11610ee
parent child
Show More
@@ -1,259 +1,259
1 '''
1 '''
2 @author: Juan C. Espinoza
2 @author: Juan C. Espinoza
3 '''
3 '''
4
4
5 import time
5 import time
6 import json
6 import json
7 import numpy
7 import numpy
8 import paho.mqtt.client as mqtt
8 import paho.mqtt.client as mqtt
9 import zmq
9 import zmq
10 import cPickle as pickle
10 import cPickle as pickle
11 import datetime
11 import datetime
12 from zmq.utils.monitor import recv_monitor_message
12 from zmq.utils.monitor import recv_monitor_message
13 from functools import wraps
13 from functools import wraps
14 from threading import Thread
14 from threading import Thread
15 from multiprocessing import Process
15 from multiprocessing import Process
16
16
17 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
17 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
18
18
19 throttle_value = 10
19 throttle_value = 5
20
20
21 class PrettyFloat(float):
21 class PrettyFloat(float):
22 def __repr__(self):
22 def __repr__(self):
23 return '%.2f' % self
23 return '%.2f' % self
24
24
25 def roundFloats(obj):
25 def roundFloats(obj):
26 if isinstance(obj, list):
26 if isinstance(obj, list):
27 return map(roundFloats, obj)
27 return map(roundFloats, obj)
28 elif isinstance(obj, float):
28 elif isinstance(obj, float):
29 return round(obj, 2)
29 return round(obj, 2)
30
30
31 def pretty_floats(obj):
31 def pretty_floats(obj):
32 if isinstance(obj, float):
32 if isinstance(obj, float):
33 return PrettyFloat(obj)
33 return PrettyFloat(obj)
34 elif isinstance(obj, dict):
34 elif isinstance(obj, dict):
35 return dict((k, pretty_floats(v)) for k, v in obj.items())
35 return dict((k, pretty_floats(v)) for k, v in obj.items())
36 elif isinstance(obj, (list, tuple)):
36 elif isinstance(obj, (list, tuple)):
37 return map(pretty_floats, obj)
37 return map(pretty_floats, obj)
38 return obj
38 return obj
39
39
40 class throttle(object):
40 class throttle(object):
41 """Decorator that prevents a function from being called more than once every
41 """Decorator that prevents a function from being called more than once every
42 time period.
42 time period.
43 To create a function that cannot be called more than once a minute, but
43 To create a function that cannot be called more than once a minute, but
44 will sleep until it can be called:
44 will sleep until it can be called:
45 @throttle(minutes=1)
45 @throttle(minutes=1)
46 def foo():
46 def foo():
47 pass
47 pass
48
48
49 for i in range(10):
49 for i in range(10):
50 foo()
50 foo()
51 print "This function has run %s times." % i
51 print "This function has run %s times." % i
52 """
52 """
53
53
54 def __init__(self, seconds=0, minutes=0, hours=0):
54 def __init__(self, seconds=0, minutes=0, hours=0):
55 self.throttle_period = datetime.timedelta(
55 self.throttle_period = datetime.timedelta(
56 seconds=seconds, minutes=minutes, hours=hours
56 seconds=seconds, minutes=minutes, hours=hours
57 )
57 )
58 self.time_of_last_call = datetime.datetime.min
58 self.time_of_last_call = datetime.datetime.min
59
59
60 def __call__(self, fn):
60 def __call__(self, fn):
61 @wraps(fn)
61 @wraps(fn)
62 def wrapper(*args, **kwargs):
62 def wrapper(*args, **kwargs):
63 now = datetime.datetime.now()
63 now = datetime.datetime.now()
64 time_since_last_call = now - self.time_of_last_call
64 time_since_last_call = now - self.time_of_last_call
65 time_left = self.throttle_period - time_since_last_call
65 time_left = self.throttle_period - time_since_last_call
66
66
67 if time_left > datetime.timedelta(seconds=0):
67 if time_left > datetime.timedelta(seconds=0):
68 return
68 return
69
69
70 self.time_of_last_call = datetime.datetime.now()
70 self.time_of_last_call = datetime.datetime.now()
71 return fn(*args, **kwargs)
71 return fn(*args, **kwargs)
72
72
73 return wrapper
73 return wrapper
74
74
75
75
76 class PublishData(Operation):
76 class PublishData(Operation):
77 """Clase publish."""
77 """Clase publish."""
78
78
79 __MAXNUMX = 100
79 __MAXNUMX = 100
80 __MAXNUMY = 100
80 __MAXNUMY = 100
81
81
82 def __init__(self, **kwargs):
82 def __init__(self, **kwargs):
83 """Inicio."""
83 """Inicio."""
84 Operation.__init__(self, **kwargs)
84 Operation.__init__(self, **kwargs)
85 self.isConfig = False
85 self.isConfig = False
86 self.client = None
86 self.client = None
87 self.zeromq = None
87 self.zeromq = None
88 self.mqtt = None
88 self.mqtt = None
89
89
90 def on_disconnect(self, client, userdata, rc):
90 def on_disconnect(self, client, userdata, rc):
91 if rc != 0:
91 if rc != 0:
92 print("Unexpected disconnection.")
92 print("Unexpected disconnection.")
93 self.connect()
93 self.connect()
94
94
95 def connect(self):
95 def connect(self):
96 print 'trying to connect'
96 print 'trying to connect'
97 try:
97 try:
98 self.client.connect(
98 self.client.connect(
99 host=self.host,
99 host=self.host,
100 port=self.port,
100 port=self.port,
101 keepalive=60*10,
101 keepalive=60*10,
102 bind_address='')
102 bind_address='')
103 print "connected"
103 print "connected"
104 self.client.loop_start()
104 self.client.loop_start()
105 # self.client.publish(
105 # self.client.publish(
106 # self.topic + 'SETUP',
106 # self.topic + 'SETUP',
107 # json.dumps(setup),
107 # json.dumps(setup),
108 # retain=True
108 # retain=True
109 # )
109 # )
110 except:
110 except:
111 print "MQTT Conection error."
111 print "MQTT Conection error."
112 self.client = False
112 self.client = False
113
113
114 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, **kwargs):
114 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, **kwargs):
115 self.counter = 0
115 self.counter = 0
116 self.topic = kwargs.get('topic', 'schain')
116 self.topic = kwargs.get('topic', 'schain')
117 self.delay = kwargs.get('delay', 0)
117 self.delay = kwargs.get('delay', 0)
118 self.plottype = kwargs.get('plottype', 'spectra')
118 self.plottype = kwargs.get('plottype', 'spectra')
119 self.host = kwargs.get('host', "10.10.10.82")
119 self.host = kwargs.get('host', "10.10.10.82")
120 self.port = kwargs.get('port', 3000)
120 self.port = kwargs.get('port', 3000)
121 self.clientId = clientId
121 self.clientId = clientId
122 self.cnt = 0
122 self.cnt = 0
123 self.zeromq = zeromq
123 self.zeromq = zeromq
124 self.mqtt = kwargs.get('plottype', 0)
124 self.mqtt = kwargs.get('plottype', 0)
125 self.client = None
125 self.client = None
126 setup = []
126 setup = []
127 if mqtt is 1:
127 if mqtt is 1:
128 print 'mqqt es 1'
128 print 'mqqt es 1'
129 self.client = mqtt.Client(
129 self.client = mqtt.Client(
130 client_id=self.clientId + self.topic + 'SCHAIN',
130 client_id=self.clientId + self.topic + 'SCHAIN',
131 clean_session=True)
131 clean_session=True)
132 self.client.on_disconnect = self.on_disconnect
132 self.client.on_disconnect = self.on_disconnect
133 self.connect()
133 self.connect()
134 for plot in self.plottype:
134 for plot in self.plottype:
135 setup.append({
135 setup.append({
136 'plot': plot,
136 'plot': plot,
137 'topic': self.topic + plot,
137 'topic': self.topic + plot,
138 'title': getattr(self, plot + '_' + 'title', False),
138 'title': getattr(self, plot + '_' + 'title', False),
139 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
139 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
140 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
140 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
141 'xrange': getattr(self, plot + '_' + 'xrange', False),
141 'xrange': getattr(self, plot + '_' + 'xrange', False),
142 'yrange': getattr(self, plot + '_' + 'yrange', False),
142 'yrange': getattr(self, plot + '_' + 'yrange', False),
143 'zrange': getattr(self, plot + '_' + 'zrange', False),
143 'zrange': getattr(self, plot + '_' + 'zrange', False),
144 })
144 })
145 if zeromq is 1:
145 if zeromq is 1:
146 context = zmq.Context()
146 context = zmq.Context()
147 self.zmq_socket = context.socket(zmq.PUSH)
147 self.zmq_socket = context.socket(zmq.PUSH)
148 server = kwargs.get('server', 'zmq.pipe')
148 server = kwargs.get('server', 'zmq.pipe')
149
149
150 if 'http://' in server:
150 if 'tcp://' in server:
151 address = server
151 address = server
152 else:
152 else:
153 address = 'ipc:///tmp/%s' % server
153 address = 'ipc:///tmp/%s' % server
154
154
155 self.zmq_socket.connect(address)
155 self.zmq_socket.connect(address)
156 time.sleep(1)
156 time.sleep(1)
157 print 'zeromq configured'
157 print 'zeromq configured'
158
158
159
159
160 def publish_data(self):
160 def publish_data(self):
161 self.dataOut.finished = False
161 self.dataOut.finished = False
162 if self.mqtt is 1:
162 if self.mqtt is 1:
163 yData = self.dataOut.heightList[:2].tolist()
163 yData = self.dataOut.heightList[:2].tolist()
164 if self.plottype == 'spectra':
164 if self.plottype == 'spectra':
165 data = getattr(self.dataOut, 'data_spc')
165 data = getattr(self.dataOut, 'data_spc')
166 z = data/self.dataOut.normFactor
166 z = data/self.dataOut.normFactor
167 zdB = 10*numpy.log10(z)
167 zdB = 10*numpy.log10(z)
168 xlen, ylen = zdB[0].shape
168 xlen, ylen = zdB[0].shape
169 dx = numpy.floor(xlen/self.__MAXNUMX) + 1
169 dx = numpy.floor(xlen/self.__MAXNUMX) + 1
170 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
170 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
171 Z = [0 for i in self.dataOut.channelList]
171 Z = [0 for i in self.dataOut.channelList]
172 for i in self.dataOut.channelList:
172 for i in self.dataOut.channelList:
173 Z[i] = zdB[i][::dx, ::dy].tolist()
173 Z[i] = zdB[i][::dx, ::dy].tolist()
174 payload = {
174 payload = {
175 'timestamp': self.dataOut.utctime,
175 'timestamp': self.dataOut.utctime,
176 'data': roundFloats(Z),
176 'data': roundFloats(Z),
177 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
177 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
178 'interval': self.dataOut.getTimeInterval(),
178 'interval': self.dataOut.getTimeInterval(),
179 'type': self.plottype,
179 'type': self.plottype,
180 'yData': yData
180 'yData': yData
181 }
181 }
182 # print payload
182 # print payload
183
183
184 elif self.plottype in ('rti', 'power'):
184 elif self.plottype in ('rti', 'power'):
185 data = getattr(self.dataOut, 'data_spc')
185 data = getattr(self.dataOut, 'data_spc')
186 z = data/self.dataOut.normFactor
186 z = data/self.dataOut.normFactor
187 avg = numpy.average(z, axis=1)
187 avg = numpy.average(z, axis=1)
188 avgdB = 10*numpy.log10(avg)
188 avgdB = 10*numpy.log10(avg)
189 xlen, ylen = z[0].shape
189 xlen, ylen = z[0].shape
190 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
190 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
191 AVG = [0 for i in self.dataOut.channelList]
191 AVG = [0 for i in self.dataOut.channelList]
192 for i in self.dataOut.channelList:
192 for i in self.dataOut.channelList:
193 AVG[i] = avgdB[i][::dy].tolist()
193 AVG[i] = avgdB[i][::dy].tolist()
194 payload = {
194 payload = {
195 'timestamp': self.dataOut.utctime,
195 'timestamp': self.dataOut.utctime,
196 'data': roundFloats(AVG),
196 'data': roundFloats(AVG),
197 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
197 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
198 'interval': self.dataOut.getTimeInterval(),
198 'interval': self.dataOut.getTimeInterval(),
199 'type': self.plottype,
199 'type': self.plottype,
200 'yData': yData
200 'yData': yData
201 }
201 }
202 elif self.plottype == 'noise':
202 elif self.plottype == 'noise':
203 noise = self.dataOut.getNoise()/self.dataOut.normFactor
203 noise = self.dataOut.getNoise()/self.dataOut.normFactor
204 noisedB = 10*numpy.log10(noise)
204 noisedB = 10*numpy.log10(noise)
205 payload = {
205 payload = {
206 'timestamp': self.dataOut.utctime,
206 'timestamp': self.dataOut.utctime,
207 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
207 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
208 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
208 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
209 'interval': self.dataOut.getTimeInterval(),
209 'interval': self.dataOut.getTimeInterval(),
210 'type': self.plottype,
210 'type': self.plottype,
211 'yData': yData
211 'yData': yData
212 }
212 }
213 elif self.plottype == 'snr':
213 elif self.plottype == 'snr':
214 data = getattr(self.dataOut, 'data_SNR')
214 data = getattr(self.dataOut, 'data_SNR')
215 avgdB = 10*numpy.log10(data)
215 avgdB = 10*numpy.log10(data)
216
216
217 ylen = data[0].size
217 ylen = data[0].size
218 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
218 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
219 AVG = [0 for i in self.dataOut.channelList]
219 AVG = [0 for i in self.dataOut.channelList]
220 for i in self.dataOut.channelList:
220 for i in self.dataOut.channelList:
221 AVG[i] = avgdB[i][::dy].tolist()
221 AVG[i] = avgdB[i][::dy].tolist()
222 payload = {
222 payload = {
223 'timestamp': self.dataOut.utctime,
223 'timestamp': self.dataOut.utctime,
224 'data': roundFloats(AVG),
224 'data': roundFloats(AVG),
225 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
225 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
226 'type': self.plottype,
226 'type': self.plottype,
227 'yData': yData
227 'yData': yData
228 }
228 }
229 else:
229 else:
230 print "Tipo de grafico invalido"
230 print "Tipo de grafico invalido"
231 payload = {
231 payload = {
232 'data': 'None',
232 'data': 'None',
233 'timestamp': 'None',
233 'timestamp': 'None',
234 'type': None
234 'type': None
235 }
235 }
236 # print 'Publishing data to {}'.format(self.host)
236 # print 'Publishing data to {}'.format(self.host)
237 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
237 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
238
238
239 if self.zeromq is 1:
239 if self.zeromq is 1:
240 print '[Sending] {} - {}'.format(self.dataOut.type, self.dataOut.datatime)
240 print '[Sending] {} - {}'.format(self.dataOut.type, self.dataOut.datatime)
241 self.zmq_socket.send_pyobj(self.dataOut)
241 self.zmq_socket.send_pyobj(self.dataOut)
242
242
243 def run(self, dataOut, **kwargs):
243 def run(self, dataOut, **kwargs):
244 self.dataOut = dataOut
244 self.dataOut = dataOut
245 if not self.isConfig:
245 if not self.isConfig:
246 self.setup(**kwargs)
246 self.setup(**kwargs)
247 self.isConfig = True
247 self.isConfig = True
248
248
249 self.publish_data()
249 self.publish_data()
250 time.sleep(self.delay)
250 time.sleep(self.delay)
251
251
252 def close(self):
252 def close(self):
253 if self.zeromq is 1:
253 if self.zeromq is 1:
254 self.dataOut.finished = True
254 self.dataOut.finished = True
255 self.zmq_socket.send_pyobj(self.dataOut)
255 self.zmq_socket.send_pyobj(self.dataOut)
256
256
257 if self.client:
257 if self.client:
258 self.client.loop_stop()
258 self.client.loop_stop()
259 self.client.disconnect()
259 self.client.disconnect()
General Comments 0
You need to be logged in to leave comments. Login now