##// END OF EJS Templates
fix zmq protocol
Juan C. Valdez -
r886:411fb11610ee
parent child
Show More
@@ -1,259 +1,259
1 1 '''
2 2 @author: Juan C. Espinoza
3 3 '''
4 4
5 5 import time
6 6 import json
7 7 import numpy
8 8 import paho.mqtt.client as mqtt
9 9 import zmq
10 10 import cPickle as pickle
11 11 import datetime
12 12 from zmq.utils.monitor import recv_monitor_message
13 13 from functools import wraps
14 14 from threading import Thread
15 15 from multiprocessing import Process
16 16
17 17 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
18 18
19 throttle_value = 10
19 throttle_value = 5
20 20
21 21 class PrettyFloat(float):
22 22 def __repr__(self):
23 23 return '%.2f' % self
24 24
25 25 def roundFloats(obj):
26 26 if isinstance(obj, list):
27 27 return map(roundFloats, obj)
28 28 elif isinstance(obj, float):
29 29 return round(obj, 2)
30 30
31 31 def pretty_floats(obj):
32 32 if isinstance(obj, float):
33 33 return PrettyFloat(obj)
34 34 elif isinstance(obj, dict):
35 35 return dict((k, pretty_floats(v)) for k, v in obj.items())
36 36 elif isinstance(obj, (list, tuple)):
37 37 return map(pretty_floats, obj)
38 38 return obj
39 39
40 40 class throttle(object):
41 41 """Decorator that prevents a function from being called more than once every
42 42 time period.
43 43 To create a function that cannot be called more than once a minute, but
44 44 will sleep until it can be called:
45 45 @throttle(minutes=1)
46 46 def foo():
47 47 pass
48 48
49 49 for i in range(10):
50 50 foo()
51 51 print "This function has run %s times." % i
52 52 """
53 53
54 54 def __init__(self, seconds=0, minutes=0, hours=0):
55 55 self.throttle_period = datetime.timedelta(
56 56 seconds=seconds, minutes=minutes, hours=hours
57 57 )
58 58 self.time_of_last_call = datetime.datetime.min
59 59
60 60 def __call__(self, fn):
61 61 @wraps(fn)
62 62 def wrapper(*args, **kwargs):
63 63 now = datetime.datetime.now()
64 64 time_since_last_call = now - self.time_of_last_call
65 65 time_left = self.throttle_period - time_since_last_call
66 66
67 67 if time_left > datetime.timedelta(seconds=0):
68 68 return
69 69
70 70 self.time_of_last_call = datetime.datetime.now()
71 71 return fn(*args, **kwargs)
72 72
73 73 return wrapper
74 74
75 75
76 76 class PublishData(Operation):
77 77 """Clase publish."""
78 78
79 79 __MAXNUMX = 100
80 80 __MAXNUMY = 100
81 81
82 82 def __init__(self, **kwargs):
83 83 """Inicio."""
84 84 Operation.__init__(self, **kwargs)
85 85 self.isConfig = False
86 86 self.client = None
87 87 self.zeromq = None
88 88 self.mqtt = None
89 89
90 90 def on_disconnect(self, client, userdata, rc):
91 91 if rc != 0:
92 92 print("Unexpected disconnection.")
93 93 self.connect()
94 94
95 95 def connect(self):
96 96 print 'trying to connect'
97 97 try:
98 98 self.client.connect(
99 99 host=self.host,
100 100 port=self.port,
101 101 keepalive=60*10,
102 102 bind_address='')
103 103 print "connected"
104 104 self.client.loop_start()
105 105 # self.client.publish(
106 106 # self.topic + 'SETUP',
107 107 # json.dumps(setup),
108 108 # retain=True
109 109 # )
110 110 except:
111 111 print "MQTT Conection error."
112 112 self.client = False
113 113
114 114 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, **kwargs):
115 115 self.counter = 0
116 116 self.topic = kwargs.get('topic', 'schain')
117 117 self.delay = kwargs.get('delay', 0)
118 118 self.plottype = kwargs.get('plottype', 'spectra')
119 119 self.host = kwargs.get('host', "10.10.10.82")
120 120 self.port = kwargs.get('port', 3000)
121 121 self.clientId = clientId
122 122 self.cnt = 0
123 123 self.zeromq = zeromq
124 124 self.mqtt = kwargs.get('plottype', 0)
125 125 self.client = None
126 126 setup = []
127 127 if mqtt is 1:
128 128 print 'mqqt es 1'
129 129 self.client = mqtt.Client(
130 130 client_id=self.clientId + self.topic + 'SCHAIN',
131 131 clean_session=True)
132 132 self.client.on_disconnect = self.on_disconnect
133 133 self.connect()
134 134 for plot in self.plottype:
135 135 setup.append({
136 136 'plot': plot,
137 137 'topic': self.topic + plot,
138 138 'title': getattr(self, plot + '_' + 'title', False),
139 139 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
140 140 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
141 141 'xrange': getattr(self, plot + '_' + 'xrange', False),
142 142 'yrange': getattr(self, plot + '_' + 'yrange', False),
143 143 'zrange': getattr(self, plot + '_' + 'zrange', False),
144 144 })
145 145 if zeromq is 1:
146 146 context = zmq.Context()
147 147 self.zmq_socket = context.socket(zmq.PUSH)
148 148 server = kwargs.get('server', 'zmq.pipe')
149 149
150 if 'http://' in server:
150 if 'tcp://' in server:
151 151 address = server
152 152 else:
153 153 address = 'ipc:///tmp/%s' % server
154 154
155 155 self.zmq_socket.connect(address)
156 156 time.sleep(1)
157 157 print 'zeromq configured'
158 158
159 159
160 160 def publish_data(self):
161 161 self.dataOut.finished = False
162 162 if self.mqtt is 1:
163 163 yData = self.dataOut.heightList[:2].tolist()
164 164 if self.plottype == 'spectra':
165 165 data = getattr(self.dataOut, 'data_spc')
166 166 z = data/self.dataOut.normFactor
167 167 zdB = 10*numpy.log10(z)
168 168 xlen, ylen = zdB[0].shape
169 169 dx = numpy.floor(xlen/self.__MAXNUMX) + 1
170 170 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
171 171 Z = [0 for i in self.dataOut.channelList]
172 172 for i in self.dataOut.channelList:
173 173 Z[i] = zdB[i][::dx, ::dy].tolist()
174 174 payload = {
175 175 'timestamp': self.dataOut.utctime,
176 176 'data': roundFloats(Z),
177 177 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
178 178 'interval': self.dataOut.getTimeInterval(),
179 179 'type': self.plottype,
180 180 'yData': yData
181 181 }
182 182 # print payload
183 183
184 184 elif self.plottype in ('rti', 'power'):
185 185 data = getattr(self.dataOut, 'data_spc')
186 186 z = data/self.dataOut.normFactor
187 187 avg = numpy.average(z, axis=1)
188 188 avgdB = 10*numpy.log10(avg)
189 189 xlen, ylen = z[0].shape
190 190 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
191 191 AVG = [0 for i in self.dataOut.channelList]
192 192 for i in self.dataOut.channelList:
193 193 AVG[i] = avgdB[i][::dy].tolist()
194 194 payload = {
195 195 'timestamp': self.dataOut.utctime,
196 196 'data': roundFloats(AVG),
197 197 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
198 198 'interval': self.dataOut.getTimeInterval(),
199 199 'type': self.plottype,
200 200 'yData': yData
201 201 }
202 202 elif self.plottype == 'noise':
203 203 noise = self.dataOut.getNoise()/self.dataOut.normFactor
204 204 noisedB = 10*numpy.log10(noise)
205 205 payload = {
206 206 'timestamp': self.dataOut.utctime,
207 207 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
208 208 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
209 209 'interval': self.dataOut.getTimeInterval(),
210 210 'type': self.plottype,
211 211 'yData': yData
212 212 }
213 213 elif self.plottype == 'snr':
214 214 data = getattr(self.dataOut, 'data_SNR')
215 215 avgdB = 10*numpy.log10(data)
216 216
217 217 ylen = data[0].size
218 218 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
219 219 AVG = [0 for i in self.dataOut.channelList]
220 220 for i in self.dataOut.channelList:
221 221 AVG[i] = avgdB[i][::dy].tolist()
222 222 payload = {
223 223 'timestamp': self.dataOut.utctime,
224 224 'data': roundFloats(AVG),
225 225 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
226 226 'type': self.plottype,
227 227 'yData': yData
228 228 }
229 229 else:
230 230 print "Tipo de grafico invalido"
231 231 payload = {
232 232 'data': 'None',
233 233 'timestamp': 'None',
234 234 'type': None
235 235 }
236 236 # print 'Publishing data to {}'.format(self.host)
237 237 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
238 238
239 239 if self.zeromq is 1:
240 240 print '[Sending] {} - {}'.format(self.dataOut.type, self.dataOut.datatime)
241 241 self.zmq_socket.send_pyobj(self.dataOut)
242 242
243 243 def run(self, dataOut, **kwargs):
244 244 self.dataOut = dataOut
245 245 if not self.isConfig:
246 246 self.setup(**kwargs)
247 247 self.isConfig = True
248 248
249 249 self.publish_data()
250 250 time.sleep(self.delay)
251 251
252 252 def close(self):
253 253 if self.zeromq is 1:
254 254 self.dataOut.finished = True
255 255 self.zmq_socket.send_pyobj(self.dataOut)
256 256
257 257 if self.client:
258 258 self.client.loop_stop()
259 259 self.client.disconnect()
General Comments 0
You need to be logged in to leave comments. Login now