##// END OF EJS Templates
zmq support in PublishData
Juan C. Valdez -
r883:4d54fdaec8c6
parent child
Show More
@@ -6,8 +6,17 import time
6 6 import json
7 7 import numpy
8 8 import paho.mqtt.client as mqtt
9 import zmq
10 import cPickle as pickle
11 import datetime
12 from zmq.utils.monitor import recv_monitor_message
13 from functools import wraps
14 from threading import Thread
15 from multiprocessing import Process
9 16
10 from schainpy.model.proc.jroproc_base import Operation
17 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
18
19 throttle_value = 10
11 20
12 21 class PrettyFloat(float):
13 22 def __repr__(self):
@@ -28,18 +37,55 def pretty_floats(obj):
28 37 return map(pretty_floats, obj)
29 38 return obj
30 39
40 class throttle(object):
41 """Decorator that prevents a function from being called more than once every
42 time period.
43 To create a function that cannot be called more than once a minute, but
44 will sleep until it can be called:
45 @throttle(minutes=1)
46 def foo():
47 pass
48
49 for i in range(10):
50 foo()
51 print "This function has run %s times." % i
52 """
53
54 def __init__(self, seconds=0, minutes=0, hours=0):
55 self.throttle_period = datetime.timedelta(
56 seconds=seconds, minutes=minutes, hours=hours
57 )
58 self.time_of_last_call = datetime.datetime.min
59
60 def __call__(self, fn):
61 @wraps(fn)
62 def wrapper(*args, **kwargs):
63 now = datetime.datetime.now()
64 time_since_last_call = now - self.time_of_last_call
65 time_left = self.throttle_period - time_since_last_call
66
67 if time_left > datetime.timedelta(seconds=0):
68 return
69
70 self.time_of_last_call = datetime.datetime.now()
71 return fn(*args, **kwargs)
72
73 return wrapper
74
31 75
32 76 class PublishData(Operation):
33 77 """Clase publish."""
34 78
35 __MAXNUMX = 80
36 __MAXNUMY = 80
79 __MAXNUMX = 100
80 __MAXNUMY = 100
37 81
38 def __init__(self):
82 def __init__(self, **kwargs):
39 83 """Inicio."""
40 Operation.__init__(self)
84 Operation.__init__(self, **kwargs)
41 85 self.isConfig = False
42 86 self.client = None
87 self.zeromq = None
88 self.mqtt = None
43 89
44 90 def on_disconnect(self, client, userdata, rc):
45 91 if rc != 0:
@@ -65,16 +111,26 class PublishData(Operation):
65 111 print "MQTT Conection error."
66 112 self.client = False
67 113
68 def setup(self, host, port=1883, username=None, password=None, clientId="user", **kwargs):
69
114 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, **kwargs):
115 self.counter = 0
70 116 self.topic = kwargs.get('topic', 'schain')
71 117 self.delay = kwargs.get('delay', 0)
72 118 self.plottype = kwargs.get('plottype', 'spectra')
73 self.host = host
74 self.port = port
119 self.host = kwargs.get('host', "10.10.10.82")
120 self.port = kwargs.get('port', 3000)
75 121 self.clientId = clientId
76 122 self.cnt = 0
123 self.zeromq = zeromq
124 self.mqtt = kwargs.get('plottype', 0)
125 self.client = None
77 126 setup = []
127 if mqtt is 1:
128 print 'mqqt es 1'
129 self.client = mqtt.Client(
130 client_id=self.clientId + self.topic + 'SCHAIN',
131 clean_session=True)
132 self.client.on_disconnect = self.on_disconnect
133 self.connect()
78 134 for plot in self.plottype:
79 135 setup.append({
80 136 'plot': plot,
@@ -86,16 +142,27 class PublishData(Operation):
86 142 'yrange': getattr(self, plot + '_' + 'yrange', False),
87 143 'zrange': getattr(self, plot + '_' + 'zrange', False),
88 144 })
89 self.client = mqtt.Client(
90 client_id=self.clientId + self.topic + 'SCHAIN',
91 clean_session=True)
92 self.client.on_disconnect = self.on_disconnect
93 self.connect()
145 if zeromq is 1:
146 context = zmq.Context()
147 self.zmq_socket = context.socket(zmq.PUSH)
148 server = kwargs.get('server', 'zmq.pipe')
94 149
95 def publish_data(self, plottype):
96 data = getattr(self.dataOut, 'data_spc')
150 if 'http://' in server:
151 address = server
152 else:
153 address = 'ipc:///tmp/%s' % server
154
155 self.zmq_socket.connect(address)
156 time.sleep(1)
157 print 'zeromq configured'
158
159
160 def publish_data(self):
161 self.dataOut.finished = False
162 if self.mqtt is 1:
97 163 yData = self.dataOut.heightList[:2].tolist()
98 if plottype == 'spectra':
164 if self.plottype == 'spectra':
165 data = getattr(self.dataOut, 'data_spc')
99 166 z = data/self.dataOut.normFactor
100 167 zdB = 10*numpy.log10(z)
101 168 xlen, ylen = zdB[0].shape
@@ -109,13 +176,13 class PublishData(Operation):
109 176 'data': roundFloats(Z),
110 177 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
111 178 'interval': self.dataOut.getTimeInterval(),
112 'xRange': [0, 80],
113 'type': plottype,
179 'type': self.plottype,
114 180 'yData': yData
115 181 }
116 182 # print payload
117 183
118 elif plottype in ('rti', 'power'):
184 elif self.plottype in ('rti', 'power'):
185 data = getattr(self.dataOut, 'data_spc')
119 186 z = data/self.dataOut.normFactor
120 187 avg = numpy.average(z, axis=1)
121 188 avgdB = 10*numpy.log10(avg)
@@ -129,11 +196,10 class PublishData(Operation):
129 196 'data': roundFloats(AVG),
130 197 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
131 198 'interval': self.dataOut.getTimeInterval(),
132 'xRange': [0, 80],
133 'type': plottype,
199 'type': self.plottype,
134 200 'yData': yData
135 201 }
136 elif plottype == 'noise':
202 elif self.plottype == 'noise':
137 203 noise = self.dataOut.getNoise()/self.dataOut.normFactor
138 204 noisedB = 10*numpy.log10(noise)
139 205 payload = {
@@ -141,8 +207,23 class PublishData(Operation):
141 207 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
142 208 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
143 209 'interval': self.dataOut.getTimeInterval(),
144 'xRange': [0, 80],
145 'type': plottype,
210 'type': self.plottype,
211 'yData': yData
212 }
213 elif self.plottype == 'snr':
214 data = getattr(self.dataOut, 'data_SNR')
215 avgdB = 10*numpy.log10(data)
216
217 ylen = data[0].size
218 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
219 AVG = [0 for i in self.dataOut.channelList]
220 for i in self.dataOut.channelList:
221 AVG[i] = avgdB[i][::dy].tolist()
222 payload = {
223 'timestamp': self.dataOut.utctime,
224 'data': roundFloats(AVG),
225 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
226 'type': self.plottype,
146 227 'yData': yData
147 228 }
148 229 else:
@@ -152,21 +233,27 class PublishData(Operation):
152 233 'timestamp': 'None',
153 234 'type': None
154 235 }
155 print 'Publishing data to {}'.format(self.host)
156 print '*************************'
157 self.client.publish(self.topic + plottype, json.dumps(payload), qos=0)
236 # print 'Publishing data to {}'.format(self.host)
237 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
158 238
239 if self.zeromq is 1:
240 print '[Sending] {} - {}'.format(self.dataOut.type, self.dataOut.datatime)
241 self.zmq_socket.send_pyobj(self.dataOut)
159 242
160 def run(self, dataOut, host, **kwargs):
243 def run(self, dataOut, **kwargs):
161 244 self.dataOut = dataOut
162 245 if not self.isConfig:
163 self.setup(host, **kwargs)
246 self.setup(**kwargs)
164 247 self.isConfig = True
165 248
166 map(self.publish_data, self.plottype)
249 self.publish_data()
167 250 time.sleep(self.delay)
168 251
169 252 def close(self):
253 if self.zeromq is 1:
254 self.dataOut.finished = True
255 self.zmq_socket.send_pyobj(self.dataOut)
256
170 257 if self.client:
171 258 self.client.loop_stop()
172 259 self.client.disconnect()
General Comments 0
You need to be logged in to leave comments. Login now