##// END OF EJS Templates
cambios en como se envia la data a la web
José Chávez -
r909:ec5b39ef16e6
parent child
Show More
@@ -1,393 +1,395
1 '''
1 '''
2 @author: Juan C. Espinoza
2 @author: Juan C. Espinoza
3 '''
3 '''
4
4
5 import time
5 import time
6 import json
6 import json
7 import numpy
7 import numpy
8 import paho.mqtt.client as mqtt
8 import paho.mqtt.client as mqtt
9 import zmq
9 import zmq
10 import cPickle as pickle
10 import cPickle as pickle
11 import datetime
11 import datetime
12 from zmq.utils.monitor import recv_monitor_message
12 from zmq.utils.monitor import recv_monitor_message
13 from functools import wraps
13 from functools import wraps
14 from threading import Thread
14 from threading import Thread
15 from multiprocessing import Process
15 from multiprocessing import Process
16
16
17 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
17 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
18
18
19 MAXNUMX = 100
19 MAXNUMX = 100
20 MAXNUMY = 100
20 MAXNUMY = 100
21
21
22 class PrettyFloat(float):
22 class PrettyFloat(float):
23 def __repr__(self):
23 def __repr__(self):
24 return '%.2f' % self
24 return '%.2f' % self
25
25
26 def roundFloats(obj):
26 def roundFloats(obj):
27 if isinstance(obj, list):
27 if isinstance(obj, list):
28 return map(roundFloats, obj)
28 return map(roundFloats, obj)
29 elif isinstance(obj, float):
29 elif isinstance(obj, float):
30 return round(obj, 2)
30 return round(obj, 2)
31
31
32 def decimate(z):
32 def decimate(z):
33 # dx = int(len(self.x)/self.__MAXNUMX) + 1
33 # dx = int(len(self.x)/self.__MAXNUMX) + 1
34 dy = int(len(z[0])/MAXNUMY) + 1
34 dy = int(len(z[0])/MAXNUMY) + 1
35 return z[::, ::dy]
35 return z[::, ::dy]
36
36
37 class throttle(object):
37 class throttle(object):
38 """Decorator that prevents a function from being called more than once every
38 """Decorator that prevents a function from being called more than once every
39 time period.
39 time period.
40 To create a function that cannot be called more than once a minute, but
40 To create a function that cannot be called more than once a minute, but
41 will sleep until it can be called:
41 will sleep until it can be called:
42 @throttle(minutes=1)
42 @throttle(minutes=1)
43 def foo():
43 def foo():
44 pass
44 pass
45
45
46 for i in range(10):
46 for i in range(10):
47 foo()
47 foo()
48 print "This function has run %s times." % i
48 print "This function has run %s times." % i
49 """
49 """
50
50
51 def __init__(self, seconds=0, minutes=0, hours=0):
51 def __init__(self, seconds=0, minutes=0, hours=0):
52 self.throttle_period = datetime.timedelta(
52 self.throttle_period = datetime.timedelta(
53 seconds=seconds, minutes=minutes, hours=hours
53 seconds=seconds, minutes=minutes, hours=hours
54 )
54 )
55
55
56 self.time_of_last_call = datetime.datetime.min
56 self.time_of_last_call = datetime.datetime.min
57
57
58 def __call__(self, fn):
58 def __call__(self, fn):
59 @wraps(fn)
59 @wraps(fn)
60 def wrapper(*args, **kwargs):
60 def wrapper(*args, **kwargs):
61 now = datetime.datetime.now()
61 now = datetime.datetime.now()
62 time_since_last_call = now - self.time_of_last_call
62 time_since_last_call = now - self.time_of_last_call
63 time_left = self.throttle_period - time_since_last_call
63 time_left = self.throttle_period - time_since_last_call
64
64
65 if time_left > datetime.timedelta(seconds=0):
65 if time_left > datetime.timedelta(seconds=0):
66 return
66 return
67
67
68 self.time_of_last_call = datetime.datetime.now()
68 self.time_of_last_call = datetime.datetime.now()
69 return fn(*args, **kwargs)
69 return fn(*args, **kwargs)
70
70
71 return wrapper
71 return wrapper
72
72
73
73
74 class PublishData(Operation):
74 class PublishData(Operation):
75 """Clase publish."""
75 """Clase publish."""
76
76
77 def __init__(self, **kwargs):
77 def __init__(self, **kwargs):
78 """Inicio."""
78 """Inicio."""
79 Operation.__init__(self, **kwargs)
79 Operation.__init__(self, **kwargs)
80 self.isConfig = False
80 self.isConfig = False
81 self.client = None
81 self.client = None
82 self.zeromq = None
82 self.zeromq = None
83 self.mqtt = None
83 self.mqtt = None
84
84
85 def on_disconnect(self, client, userdata, rc):
85 def on_disconnect(self, client, userdata, rc):
86 if rc != 0:
86 if rc != 0:
87 print("Unexpected disconnection.")
87 print("Unexpected disconnection.")
88 self.connect()
88 self.connect()
89
89
90 def connect(self):
90 def connect(self):
91 print 'trying to connect'
91 print 'trying to connect'
92 try:
92 try:
93 self.client.connect(
93 self.client.connect(
94 host=self.host,
94 host=self.host,
95 port=self.port,
95 port=self.port,
96 keepalive=60*10,
96 keepalive=60*10,
97 bind_address='')
97 bind_address='')
98 self.client.loop_start()
98 self.client.loop_start()
99 # self.client.publish(
99 # self.client.publish(
100 # self.topic + 'SETUP',
100 # self.topic + 'SETUP',
101 # json.dumps(setup),
101 # json.dumps(setup),
102 # retain=True
102 # retain=True
103 # )
103 # )
104 except:
104 except:
105 print "MQTT Conection error."
105 print "MQTT Conection error."
106 self.client = False
106 self.client = False
107
107
108 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, **kwargs):
108 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, **kwargs):
109 self.counter = 0
109 self.counter = 0
110 self.topic = kwargs.get('topic', 'schain')
110 self.topic = kwargs.get('topic', 'schain')
111 self.delay = kwargs.get('delay', 0)
111 self.delay = kwargs.get('delay', 0)
112 self.plottype = kwargs.get('plottype', 'spectra')
112 self.plottype = kwargs.get('plottype', 'spectra')
113 self.host = kwargs.get('host', "10.10.10.82")
113 self.host = kwargs.get('host', "10.10.10.82")
114 self.port = kwargs.get('port', 3000)
114 self.port = kwargs.get('port', 3000)
115 self.clientId = clientId
115 self.clientId = clientId
116 self.cnt = 0
116 self.cnt = 0
117 self.zeromq = zeromq
117 self.zeromq = zeromq
118 self.mqtt = kwargs.get('plottype', 0)
118 self.mqtt = kwargs.get('plottype', 0)
119 self.client = None
119 self.client = None
120 setup = []
120 setup = []
121 if mqtt is 1:
121 if mqtt is 1:
122 self.client = mqtt.Client(
122 self.client = mqtt.Client(
123 client_id=self.clientId + self.topic + 'SCHAIN',
123 client_id=self.clientId + self.topic + 'SCHAIN',
124 clean_session=True)
124 clean_session=True)
125 self.client.on_disconnect = self.on_disconnect
125 self.client.on_disconnect = self.on_disconnect
126 self.connect()
126 self.connect()
127 for plot in self.plottype:
127 for plot in self.plottype:
128 setup.append({
128 setup.append({
129 'plot': plot,
129 'plot': plot,
130 'topic': self.topic + plot,
130 'topic': self.topic + plot,
131 'title': getattr(self, plot + '_' + 'title', False),
131 'title': getattr(self, plot + '_' + 'title', False),
132 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
132 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
133 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
133 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
134 'xrange': getattr(self, plot + '_' + 'xrange', False),
134 'xrange': getattr(self, plot + '_' + 'xrange', False),
135 'yrange': getattr(self, plot + '_' + 'yrange', False),
135 'yrange': getattr(self, plot + '_' + 'yrange', False),
136 'zrange': getattr(self, plot + '_' + 'zrange', False),
136 'zrange': getattr(self, plot + '_' + 'zrange', False),
137 })
137 })
138 if zeromq is 1:
138 if zeromq is 1:
139 context = zmq.Context()
139 context = zmq.Context()
140 self.zmq_socket = context.socket(zmq.PUSH)
140 self.zmq_socket = context.socket(zmq.PUSH)
141 server = kwargs.get('server', 'zmq.pipe')
141 server = kwargs.get('server', 'zmq.pipe')
142
142
143 if 'tcp://' in server:
143 if 'tcp://' in server:
144 address = server
144 address = server
145 else:
145 else:
146 address = 'ipc:///tmp/%s' % server
146 address = 'ipc:///tmp/%s' % server
147
147
148 self.zmq_socket.connect(address)
148 self.zmq_socket.connect(address)
149 time.sleep(1)
149 time.sleep(1)
150
150
151
151
152
152
153 def publish_data(self):
153 def publish_data(self):
154 self.dataOut.finished = False
154 self.dataOut.finished = False
155 if self.mqtt is 1:
155 if self.mqtt is 1:
156 yData = self.dataOut.heightList[:2].tolist()
156 yData = self.dataOut.heightList[:2].tolist()
157 if self.plottype == 'spectra':
157 if self.plottype == 'spectra':
158 data = getattr(self.dataOut, 'data_spc')
158 data = getattr(self.dataOut, 'data_spc')
159 z = data/self.dataOut.normFactor
159 z = data/self.dataOut.normFactor
160 zdB = 10*numpy.log10(z)
160 zdB = 10*numpy.log10(z)
161 xlen, ylen = zdB[0].shape
161 xlen, ylen = zdB[0].shape
162 dx = int(xlen/MAXNUMX) + 1
162 dx = int(xlen/MAXNUMX) + 1
163 dy = int(ylen/MAXNUMY) + 1
163 dy = int(ylen/MAXNUMY) + 1
164 Z = [0 for i in self.dataOut.channelList]
164 Z = [0 for i in self.dataOut.channelList]
165 for i in self.dataOut.channelList:
165 for i in self.dataOut.channelList:
166 Z[i] = zdB[i][::dx, ::dy].tolist()
166 Z[i] = zdB[i][::dx, ::dy].tolist()
167 payload = {
167 payload = {
168 'timestamp': self.dataOut.utctime,
168 'timestamp': self.dataOut.utctime,
169 'data': roundFloats(Z),
169 'data': roundFloats(Z),
170 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
170 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
171 'interval': self.dataOut.getTimeInterval(),
171 'interval': self.dataOut.getTimeInterval(),
172 'type': self.plottype,
172 'type': self.plottype,
173 'yData': yData
173 'yData': yData
174 }
174 }
175 # print payload
175 # print payload
176
176
177 elif self.plottype in ('rti', 'power'):
177 elif self.plottype in ('rti', 'power'):
178 data = getattr(self.dataOut, 'data_spc')
178 data = getattr(self.dataOut, 'data_spc')
179 z = data/self.dataOut.normFactor
179 z = data/self.dataOut.normFactor
180 avg = numpy.average(z, axis=1)
180 avg = numpy.average(z, axis=1)
181 avgdB = 10*numpy.log10(avg)
181 avgdB = 10*numpy.log10(avg)
182 xlen, ylen = z[0].shape
182 xlen, ylen = z[0].shape
183 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
183 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
184 AVG = [0 for i in self.dataOut.channelList]
184 AVG = [0 for i in self.dataOut.channelList]
185 for i in self.dataOut.channelList:
185 for i in self.dataOut.channelList:
186 AVG[i] = avgdB[i][::dy].tolist()
186 AVG[i] = avgdB[i][::dy].tolist()
187 payload = {
187 payload = {
188 'timestamp': self.dataOut.utctime,
188 'timestamp': self.dataOut.utctime,
189 'data': roundFloats(AVG),
189 'data': roundFloats(AVG),
190 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
190 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
191 'interval': self.dataOut.getTimeInterval(),
191 'interval': self.dataOut.getTimeInterval(),
192 'type': self.plottype,
192 'type': self.plottype,
193 'yData': yData
193 'yData': yData
194 }
194 }
195 elif self.plottype == 'noise':
195 elif self.plottype == 'noise':
196 noise = self.dataOut.getNoise()/self.dataOut.normFactor
196 noise = self.dataOut.getNoise()/self.dataOut.normFactor
197 noisedB = 10*numpy.log10(noise)
197 noisedB = 10*numpy.log10(noise)
198 payload = {
198 payload = {
199 'timestamp': self.dataOut.utctime,
199 'timestamp': self.dataOut.utctime,
200 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
200 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
201 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
201 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
202 'interval': self.dataOut.getTimeInterval(),
202 'interval': self.dataOut.getTimeInterval(),
203 'type': self.plottype,
203 'type': self.plottype,
204 'yData': yData
204 'yData': yData
205 }
205 }
206 elif self.plottype == 'snr':
206 elif self.plottype == 'snr':
207 data = getattr(self.dataOut, 'data_SNR')
207 data = getattr(self.dataOut, 'data_SNR')
208 avgdB = 10*numpy.log10(data)
208 avgdB = 10*numpy.log10(data)
209
209
210 ylen = data[0].size
210 ylen = data[0].size
211 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
211 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
212 AVG = [0 for i in self.dataOut.channelList]
212 AVG = [0 for i in self.dataOut.channelList]
213 for i in self.dataOut.channelList:
213 for i in self.dataOut.channelList:
214 AVG[i] = avgdB[i][::dy].tolist()
214 AVG[i] = avgdB[i][::dy].tolist()
215 payload = {
215 payload = {
216 'timestamp': self.dataOut.utctime,
216 'timestamp': self.dataOut.utctime,
217 'data': roundFloats(AVG),
217 'data': roundFloats(AVG),
218 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
218 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
219 'type': self.plottype,
219 'type': self.plottype,
220 'yData': yData
220 'yData': yData
221 }
221 }
222 else:
222 else:
223 print "Tipo de grafico invalido"
223 print "Tipo de grafico invalido"
224 payload = {
224 payload = {
225 'data': 'None',
225 'data': 'None',
226 'timestamp': 'None',
226 'timestamp': 'None',
227 'type': None
227 'type': None
228 }
228 }
229 # print 'Publishing data to {}'.format(self.host)
229 # print 'Publishing data to {}'.format(self.host)
230 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
230 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
231
231
232 if self.zeromq is 1:
232 if self.zeromq is 1:
233 print '[Sending] {} - {}'.format(self.dataOut.type, self.dataOut.datatime)
233 print '[Sending] {} - {}'.format(self.dataOut.type, self.dataOut.datatime)
234 self.zmq_socket.send_pyobj(self.dataOut)
234 self.zmq_socket.send_pyobj(self.dataOut)
235
235
236 def run(self, dataOut, **kwargs):
236 def run(self, dataOut, **kwargs):
237 self.dataOut = dataOut
237 self.dataOut = dataOut
238 if not self.isConfig:
238 if not self.isConfig:
239 self.setup(**kwargs)
239 self.setup(**kwargs)
240 self.isConfig = True
240 self.isConfig = True
241
241
242 self.publish_data()
242 self.publish_data()
243 time.sleep(self.delay)
243 time.sleep(self.delay)
244
244
245 def close(self):
245 def close(self):
246 if self.zeromq is 1:
246 if self.zeromq is 1:
247 self.dataOut.finished = True
247 self.dataOut.finished = True
248 self.zmq_socket.send_pyobj(self.dataOut)
248 self.zmq_socket.send_pyobj(self.dataOut)
249
249
250 if self.client:
250 if self.client:
251 self.client.loop_stop()
251 self.client.loop_stop()
252 self.client.disconnect()
252 self.client.disconnect()
253
253
254
254
255 class ReceiverData(ProcessingUnit, Process):
255 class ReceiverData(ProcessingUnit, Process):
256
256
257 throttle_value = 5
257 throttle_value = 5
258
258
259 def __init__(self, **kwargs):
259 def __init__(self, **kwargs):
260
260
261 ProcessingUnit.__init__(self, **kwargs)
261 ProcessingUnit.__init__(self, **kwargs)
262 Process.__init__(self)
262 Process.__init__(self)
263 self.mp = False
263 self.mp = False
264 self.isConfig = False
264 self.isConfig = False
265 self.plottypes =[]
265 self.plottypes =[]
266 self.connections = 0
266 self.connections = 0
267 server = kwargs.get('server', 'zmq.pipe')
267 server = kwargs.get('server', 'zmq.pipe')
268 if 'tcp://' in server:
268 if 'tcp://' in server:
269 address = server
269 address = server
270 else:
270 else:
271 address = 'ipc:///tmp/%s' % server
271 address = 'ipc:///tmp/%s' % server
272
272
273 self.address = address
273 self.address = address
274 self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
274 self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
275 self.realtime = kwargs.get('realtime', False)
275 self.realtime = kwargs.get('realtime', False)
276 self.throttle_value = kwargs.get('throttle', 10)
276 self.throttle_value = kwargs.get('throttle', 10)
277 self.sendData = self.initThrottle(self.throttle_value)
277 self.sendData = self.initThrottle(self.throttle_value)
278 self.setup()
278 self.setup()
279
279
280 def setup(self):
280 def setup(self):
281
281
282 self.data = {}
282 self.data = {}
283 self.data['times'] = []
283 self.data['times'] = []
284 for plottype in self.plottypes:
284 for plottype in self.plottypes:
285 self.data[plottype] = {}
285 self.data[plottype] = {}
286 self.data['noise'] = {}
286 self.data['noise'] = {}
287 self.data['throttle'] = self.throttle_value
287 self.data['throttle'] = self.throttle_value
288 self.data['ENDED'] = False
288 self.data['ENDED'] = False
289 self.isConfig = True
289 self.isConfig = True
290 self.data_web = {}
290 self.data_web = {}
291
291
292 def event_monitor(self, monitor):
292 def event_monitor(self, monitor):
293
293
294 events = {}
294 events = {}
295
295
296 for name in dir(zmq):
296 for name in dir(zmq):
297 if name.startswith('EVENT_'):
297 if name.startswith('EVENT_'):
298 value = getattr(zmq, name)
298 value = getattr(zmq, name)
299 events[value] = name
299 events[value] = name
300
300
301 while monitor.poll():
301 while monitor.poll():
302 evt = recv_monitor_message(monitor)
302 evt = recv_monitor_message(monitor)
303 if evt['event'] == 32:
303 if evt['event'] == 32:
304 self.connections += 1
304 self.connections += 1
305 if evt['event'] == 512:
305 if evt['event'] == 512:
306 pass
306 pass
307 if self.connections == 0 and self.started is True:
307 if self.connections == 0 and self.started is True:
308 self.ended = True
308 self.ended = True
309 # send('ENDED')
309 # send('ENDED')
310 evt.update({'description': events[evt['event']]})
310 evt.update({'description': events[evt['event']]})
311
311
312 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
312 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
313 break
313 break
314 monitor.close()
314 monitor.close()
315 print("event monitor thread done!")
315 print("event monitor thread done!")
316
316
317 def initThrottle(self, throttle_value):
317 def initThrottle(self, throttle_value):
318
318
319 @throttle(seconds=throttle_value)
319 @throttle(seconds=throttle_value)
320 def sendDataThrottled(fn_sender, data):
320 def sendDataThrottled(fn_sender, data):
321 fn_sender(data)
321 fn_sender(data)
322
322
323 return sendDataThrottled
323 return sendDataThrottled
324
324
325 def send(self, data):
325 def send(self, data):
326 # print '[sending] data=%s size=%s' % (data.keys(), len(data['times']))
326 # print '[sending] data=%s size=%s' % (data.keys(), len(data['times']))
327 self.sender.send_pyobj(data)
327 self.sender.send_pyobj(data)
328
328
329 def update(self):
329 def update(self):
330
330
331 t = self.dataOut.ltctime
331 t = self.dataOut.ltctime
332 self.data['times'].append(t)
332 self.data['times'].append(t)
333 self.data['dataOut'] = self.dataOut
333 self.data['dataOut'] = self.dataOut
334 for plottype in self.plottypes:
334 for plottype in self.plottypes:
335 if plottype == 'spc':
335 if plottype == 'spc':
336 z = self.dataOut.data_spc/self.dataOut.normFactor
336 z = self.dataOut.data_spc/self.dataOut.normFactor
337 self.data[plottype] = 10*numpy.log10(z)
337 self.data[plottype] = 10*numpy.log10(z)
338 self.data['noise'][t] = 10*numpy.log10(self.dataOut.getNoise()/self.dataOut.normFactor)
338 self.data['noise'][t] = 10*numpy.log10(self.dataOut.getNoise()/self.dataOut.normFactor)
339 if plottype == 'rti':
339 if plottype == 'rti':
340 self.data[plottype][t] = self.dataOut.getPower()
340 self.data[plottype][t] = self.dataOut.getPower()
341 if plottype == 'snr':
341 if plottype == 'snr':
342 self.data[plottype][t] = 10*numpy.log10(self.dataOut.data_SNR)
342 self.data[plottype][t] = 10*numpy.log10(self.dataOut.data_SNR)
343 if plottype == 'dop':
343 if plottype == 'dop':
344 self.data[plottype][t] = 10*numpy.log10(self.dataOut.data_DOP)
344 self.data[plottype][t] = 10*numpy.log10(self.dataOut.data_DOP)
345 if plottype == 'coh':
345 if plottype == 'coh':
346 self.data[plottype][t] = self.dataOut.getCoherence()
346 self.data[plottype][t] = self.dataOut.getCoherence()
347 if plottype == 'phase':
347 if plottype == 'phase':
348 self.data[plottype][t] = self.dataOut.getCoherence(phase=True)
348 self.data[plottype][t] = self.dataOut.getCoherence(phase=True)
349 if self.realtime:
349 if self.realtime:
350 self.data_web[plottype] = roundFloats(decimate(self.data[plottype][t]).tolist())
350 self.data_web[plottype] = roundFloats(decimate(self.data[plottype][t]).tolist())
351 self.data_web['time'] = t
351 self.data_web['time'] = t
352 self.data_web['interval'] = self.dataOut.getTimeInterval()
353 self.data_web['type'] = plottype
352 def run(self):
354 def run(self):
353
355
354 print '[Starting] {} from {}'.format(self.name, self.address)
356 print '[Starting] {} from {}'.format(self.name, self.address)
355
357
356 self.context = zmq.Context()
358 self.context = zmq.Context()
357 self.receiver = self.context.socket(zmq.PULL)
359 self.receiver = self.context.socket(zmq.PULL)
358 self.receiver.bind(self.address)
360 self.receiver.bind(self.address)
359 monitor = self.receiver.get_monitor_socket()
361 monitor = self.receiver.get_monitor_socket()
360 self.sender = self.context.socket(zmq.PUB)
362 self.sender = self.context.socket(zmq.PUB)
361 if self.realtime:
363 if self.realtime:
362 self.sender_web = self.context.socket(zmq.PUB)
364 self.sender_web = self.context.socket(zmq.PUB)
363 # self.sender_web.setsockopt(zmq.PUBLISH, 'realtime')
365 # self.sender_web.setsockopt(zmq.PUBLISH, 'realtime')
364 self.sender_web.bind("ipc:///tmp/zmq.web")
366 self.sender_web.bind("ipc:///tmp/zmq.web")
365 self.sender.bind("ipc:///tmp/zmq.plots")
367 self.sender.bind("ipc:///tmp/zmq.plots")
366
368
367 t = Thread(target=self.event_monitor, args=(monitor,))
369 t = Thread(target=self.event_monitor, args=(monitor,))
368 t.start()
370 t.start()
369
371
370 while True:
372 while True:
371 self.dataOut = self.receiver.recv_pyobj()
373 self.dataOut = self.receiver.recv_pyobj()
372 # print '[Receiving] {} - {}'.format(self.dataOut.type,
374 # print '[Receiving] {} - {}'.format(self.dataOut.type,
373 # self.dataOut.datatime.ctime())
375 # self.dataOut.datatime.ctime())
374
376
375 self.update()
377 self.update()
376
378
377 if self.dataOut.finished is True:
379 if self.dataOut.finished is True:
378 self.send(self.data)
380 self.send(self.data)
379 self.connections -= 1
381 self.connections -= 1
380 if self.connections == 0 and self.started:
382 if self.connections == 0 and self.started:
381 self.ended = True
383 self.ended = True
382 self.data['ENDED'] = True
384 self.data['ENDED'] = True
383 self.send(self.data)
385 self.send(self.data)
384 self.setup()
386 self.setup()
385 else:
387 else:
386 if self.realtime:
388 if self.realtime:
387 self.send(self.data)
389 self.send(self.data)
388 self.sender_web.send_string(json.dumps(self.data_web))
390 self.sender_web.send_string(json.dumps(self.data_web))
389 else:
391 else:
390 self.sendData(self.send, self.data)
392 self.sendData(self.send, self.data)
391 self.started = True
393 self.started = True
392
394
393 return
395 return
General Comments 0
You need to be logged in to leave comments. Login now