##// END OF EJS Templates
Merge branch 'schain_mp' of http://jro-dev.igp.gob.pe/rhodecode/schain into schain_mp
jespinoza -
r917:ad02a447a4cb merge
parent child
Show More
1 NO CONTENT: new file 100644
NO CONTENT: new file 100644
@@ -0,0 +1,1
1 =: ERROR: cannot open `=' (No such file or directory)
1 NO CONTENT: new file 100644
NO CONTENT: new file 100644
1 NO CONTENT: new file 100644
NO CONTENT: new file 100644
1 NO CONTENT: new file 100644
NO CONTENT: new file 100644
1 NO CONTENT: new file 100644
NO CONTENT: new file 100644
1 NO CONTENT: new file 100644
NO CONTENT: new file 100644
1 NO CONTENT: new file 100644
NO CONTENT: new file 100644
1 NO CONTENT: new file 100644
NO CONTENT: new file 100644
1 NO CONTENT: new file 100644
NO CONTENT: new file 100644
@@ -1,425 +1,433
1 '''
1 '''
2 @author: Juan C. Espinoza
2 @author: Juan C. Espinoza
3 '''
3 '''
4
4
5 import time
5 import time
6 import json
6 import json
7 import numpy
7 import numpy
8 import paho.mqtt.client as mqtt
8 import paho.mqtt.client as mqtt
9 import zmq
9 import zmq
10 import cPickle as pickle
10 import cPickle as pickle
11 import datetime
11 import datetime
12 from zmq.utils.monitor import recv_monitor_message
12 from zmq.utils.monitor import recv_monitor_message
13 from functools import wraps
13 from functools import wraps
14 from threading import Thread
14 from threading import Thread
15 from multiprocessing import Process
15 from multiprocessing import Process
16
16
17 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
17 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
18
18
19 MAXNUMX = 100
19 MAXNUMX = 100
20 MAXNUMY = 100
20 MAXNUMY = 100
21
21
22 class PrettyFloat(float):
22 class PrettyFloat(float):
23 def __repr__(self):
23 def __repr__(self):
24 return '%.2f' % self
24 return '%.2f' % self
25
25
26 def roundFloats(obj):
26 def roundFloats(obj):
27 if isinstance(obj, list):
27 if isinstance(obj, list):
28 return map(roundFloats, obj)
28 return map(roundFloats, obj)
29 elif isinstance(obj, float):
29 elif isinstance(obj, float):
30 return round(obj, 2)
30 return round(obj, 2)
31
31
32 def decimate(z):
32 def decimate(z):
33 # dx = int(len(self.x)/self.__MAXNUMX) + 1
33 # dx = int(len(self.x)/self.__MAXNUMX) + 1
34
34
35 dy = int(len(z[0])/MAXNUMY) + 1
35 dy = int(len(z[0])/MAXNUMY) + 1
36
36
37 return z[::, ::dy]
37 return z[::, ::dy]
38
38
39 class throttle(object):
39 class throttle(object):
40 """Decorator that prevents a function from being called more than once every
40 """Decorator that prevents a function from being called more than once every
41 time period.
41 time period.
42 To create a function that cannot be called more than once a minute, but
42 To create a function that cannot be called more than once a minute, but
43 will sleep until it can be called:
43 will sleep until it can be called:
44 @throttle(minutes=1)
44 @throttle(minutes=1)
45 def foo():
45 def foo():
46 pass
46 pass
47
47
48 for i in range(10):
48 for i in range(10):
49 foo()
49 foo()
50 print "This function has run %s times." % i
50 print "This function has run %s times." % i
51 """
51 """
52
52
53 def __init__(self, seconds=0, minutes=0, hours=0):
53 def __init__(self, seconds=0, minutes=0, hours=0):
54 self.throttle_period = datetime.timedelta(
54 self.throttle_period = datetime.timedelta(
55 seconds=seconds, minutes=minutes, hours=hours
55 seconds=seconds, minutes=minutes, hours=hours
56 )
56 )
57
57
58 self.time_of_last_call = datetime.datetime.min
58 self.time_of_last_call = datetime.datetime.min
59
59
60 def __call__(self, fn):
60 def __call__(self, fn):
61 @wraps(fn)
61 @wraps(fn)
62 def wrapper(*args, **kwargs):
62 def wrapper(*args, **kwargs):
63 now = datetime.datetime.now()
63 now = datetime.datetime.now()
64 time_since_last_call = now - self.time_of_last_call
64 time_since_last_call = now - self.time_of_last_call
65 time_left = self.throttle_period - time_since_last_call
65 time_left = self.throttle_period - time_since_last_call
66
66
67 if time_left > datetime.timedelta(seconds=0):
67 if time_left > datetime.timedelta(seconds=0):
68 return
68 return
69
69
70 self.time_of_last_call = datetime.datetime.now()
70 self.time_of_last_call = datetime.datetime.now()
71 return fn(*args, **kwargs)
71 return fn(*args, **kwargs)
72
72
73 return wrapper
73 return wrapper
74
74
75
75
76 class PublishData(Operation):
76 class PublishData(Operation):
77 """Clase publish."""
77 """Clase publish."""
78
78
79 def __init__(self, **kwargs):
79 def __init__(self, **kwargs):
80 """Inicio."""
80 """Inicio."""
81 Operation.__init__(self, **kwargs)
81 Operation.__init__(self, **kwargs)
82 self.isConfig = False
82 self.isConfig = False
83 self.client = None
83 self.client = None
84 self.zeromq = None
84 self.zeromq = None
85 self.mqtt = None
85 self.mqtt = None
86
86
87 def on_disconnect(self, client, userdata, rc):
87 def on_disconnect(self, client, userdata, rc):
88 if rc != 0:
88 if rc != 0:
89 print("Unexpected disconnection.")
89 print("Unexpected disconnection.")
90 self.connect()
90 self.connect()
91
91
92 def connect(self):
92 def connect(self):
93 print 'trying to connect'
93 print 'trying to connect'
94 try:
94 try:
95 self.client.connect(
95 self.client.connect(
96 host=self.host,
96 host=self.host,
97 port=self.port,
97 port=self.port,
98 keepalive=60*10,
98 keepalive=60*10,
99 bind_address='')
99 bind_address='')
100 self.client.loop_start()
100 self.client.loop_start()
101 # self.client.publish(
101 # self.client.publish(
102 # self.topic + 'SETUP',
102 # self.topic + 'SETUP',
103 # json.dumps(setup),
103 # json.dumps(setup),
104 # retain=True
104 # retain=True
105 # )
105 # )
106 except:
106 except:
107 print "MQTT Conection error."
107 print "MQTT Conection error."
108 self.client = False
108 self.client = False
109
109
110 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, **kwargs):
110 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, **kwargs):
111 self.counter = 0
111 self.counter = 0
112 self.topic = kwargs.get('topic', 'schain')
112 self.topic = kwargs.get('topic', 'schain')
113 self.delay = kwargs.get('delay', 0)
113 self.delay = kwargs.get('delay', 0)
114 self.plottype = kwargs.get('plottype', 'spectra')
114 self.plottype = kwargs.get('plottype', 'spectra')
115 self.host = kwargs.get('host', "10.10.10.82")
115 self.host = kwargs.get('host', "10.10.10.82")
116 self.port = kwargs.get('port', 3000)
116 self.port = kwargs.get('port', 3000)
117 self.clientId = clientId
117 self.clientId = clientId
118 self.cnt = 0
118 self.cnt = 0
119 self.zeromq = zeromq
119 self.zeromq = zeromq
120 self.mqtt = kwargs.get('plottype', 0)
120 self.mqtt = kwargs.get('plottype', 0)
121 self.client = None
121 self.client = None
122 setup = []
122 setup = []
123 if mqtt is 1:
123 if mqtt is 1:
124 self.client = mqtt.Client(
124 self.client = mqtt.Client(
125 client_id=self.clientId + self.topic + 'SCHAIN',
125 client_id=self.clientId + self.topic + 'SCHAIN',
126 clean_session=True)
126 clean_session=True)
127 self.client.on_disconnect = self.on_disconnect
127 self.client.on_disconnect = self.on_disconnect
128 self.connect()
128 self.connect()
129 for plot in self.plottype:
129 for plot in self.plottype:
130 setup.append({
130 setup.append({
131 'plot': plot,
131 'plot': plot,
132 'topic': self.topic + plot,
132 'topic': self.topic + plot,
133 'title': getattr(self, plot + '_' + 'title', False),
133 'title': getattr(self, plot + '_' + 'title', False),
134 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
134 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
135 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
135 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
136 'xrange': getattr(self, plot + '_' + 'xrange', False),
136 'xrange': getattr(self, plot + '_' + 'xrange', False),
137 'yrange': getattr(self, plot + '_' + 'yrange', False),
137 'yrange': getattr(self, plot + '_' + 'yrange', False),
138 'zrange': getattr(self, plot + '_' + 'zrange', False),
138 'zrange': getattr(self, plot + '_' + 'zrange', False),
139 })
139 })
140 if zeromq is 1:
140 if zeromq is 1:
141 context = zmq.Context()
141 context = zmq.Context()
142 self.zmq_socket = context.socket(zmq.PUSH)
142 self.zmq_socket = context.socket(zmq.PUSH)
143 server = kwargs.get('server', 'zmq.pipe')
143 server = kwargs.get('server', 'zmq.pipe')
144
144
145 if 'tcp://' in server:
145 if 'tcp://' in server:
146 address = server
146 address = server
147 else:
147 else:
148 address = 'ipc:///tmp/%s' % server
148 address = 'ipc:///tmp/%s' % server
149
149
150 self.zmq_socket.connect(address)
150 self.zmq_socket.connect(address)
151 time.sleep(1)
151 time.sleep(1)
152
152
153 def publish_data(self):
153 def publish_data(self):
154 self.dataOut.finished = False
154 self.dataOut.finished = False
155 if self.mqtt is 1:
155 if self.mqtt is 1:
156 yData = self.dataOut.heightList[:2].tolist()
156 yData = self.dataOut.heightList[:2].tolist()
157 if self.plottype == 'spectra':
157 if self.plottype == 'spectra':
158 data = getattr(self.dataOut, 'data_spc')
158 data = getattr(self.dataOut, 'data_spc')
159 z = data/self.dataOut.normFactor
159 z = data/self.dataOut.normFactor
160 zdB = 10*numpy.log10(z)
160 zdB = 10*numpy.log10(z)
161 xlen, ylen = zdB[0].shape
161 xlen, ylen = zdB[0].shape
162 dx = int(xlen/MAXNUMX) + 1
162 dx = int(xlen/MAXNUMX) + 1
163 dy = int(ylen/MAXNUMY) + 1
163 dy = int(ylen/MAXNUMY) + 1
164 Z = [0 for i in self.dataOut.channelList]
164 Z = [0 for i in self.dataOut.channelList]
165 for i in self.dataOut.channelList:
165 for i in self.dataOut.channelList:
166 Z[i] = zdB[i][::dx, ::dy].tolist()
166 Z[i] = zdB[i][::dx, ::dy].tolist()
167 payload = {
167 payload = {
168 'timestamp': self.dataOut.utctime,
168 'timestamp': self.dataOut.utctime,
169 'data': roundFloats(Z),
169 'data': roundFloats(Z),
170 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
170 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
171 'interval': self.dataOut.getTimeInterval(),
171 'interval': self.dataOut.getTimeInterval(),
172 'type': self.plottype,
172 'type': self.plottype,
173 'yData': yData
173 'yData': yData
174 }
174 }
175 # print payload
175 # print payload
176
176
177 elif self.plottype in ('rti', 'power'):
177 elif self.plottype in ('rti', 'power'):
178 data = getattr(self.dataOut, 'data_spc')
178 data = getattr(self.dataOut, 'data_spc')
179 z = data/self.dataOut.normFactor
179 z = data/self.dataOut.normFactor
180 avg = numpy.average(z, axis=1)
180 avg = numpy.average(z, axis=1)
181 avgdB = 10*numpy.log10(avg)
181 avgdB = 10*numpy.log10(avg)
182 xlen, ylen = z[0].shape
182 xlen, ylen = z[0].shape
183 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
183 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
184 AVG = [0 for i in self.dataOut.channelList]
184 AVG = [0 for i in self.dataOut.channelList]
185 for i in self.dataOut.channelList:
185 for i in self.dataOut.channelList:
186 AVG[i] = avgdB[i][::dy].tolist()
186 AVG[i] = avgdB[i][::dy].tolist()
187 payload = {
187 payload = {
188 'timestamp': self.dataOut.utctime,
188 'timestamp': self.dataOut.utctime,
189 'data': roundFloats(AVG),
189 'data': roundFloats(AVG),
190 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
190 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
191 'interval': self.dataOut.getTimeInterval(),
191 'interval': self.dataOut.getTimeInterval(),
192 'type': self.plottype,
192 'type': self.plottype,
193 'yData': yData
193 'yData': yData
194 }
194 }
195 elif self.plottype == 'noise':
195 elif self.plottype == 'noise':
196 noise = self.dataOut.getNoise()/self.dataOut.normFactor
196 noise = self.dataOut.getNoise()/self.dataOut.normFactor
197 noisedB = 10*numpy.log10(noise)
197 noisedB = 10*numpy.log10(noise)
198 payload = {
198 payload = {
199 'timestamp': self.dataOut.utctime,
199 'timestamp': self.dataOut.utctime,
200 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
200 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
201 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
201 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
202 'interval': self.dataOut.getTimeInterval(),
202 'interval': self.dataOut.getTimeInterval(),
203 'type': self.plottype,
203 'type': self.plottype,
204 'yData': yData
204 'yData': yData
205 }
205 }
206 elif self.plottype == 'snr':
206 elif self.plottype == 'snr':
207 data = getattr(self.dataOut, 'data_SNR')
207 data = getattr(self.dataOut, 'data_SNR')
208 avgdB = 10*numpy.log10(data)
208 avgdB = 10*numpy.log10(data)
209
209
210 ylen = data[0].size
210 ylen = data[0].size
211 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
211 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
212 AVG = [0 for i in self.dataOut.channelList]
212 AVG = [0 for i in self.dataOut.channelList]
213 for i in self.dataOut.channelList:
213 for i in self.dataOut.channelList:
214 AVG[i] = avgdB[i][::dy].tolist()
214 AVG[i] = avgdB[i][::dy].tolist()
215 payload = {
215 payload = {
216 'timestamp': self.dataOut.utctime,
216 'timestamp': self.dataOut.utctime,
217 'data': roundFloats(AVG),
217 'data': roundFloats(AVG),
218 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
218 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
219 'type': self.plottype,
219 'type': self.plottype,
220 'yData': yData
220 'yData': yData
221 }
221 }
222 else:
222 else:
223 print "Tipo de grafico invalido"
223 print "Tipo de grafico invalido"
224 payload = {
224 payload = {
225 'data': 'None',
225 'data': 'None',
226 'timestamp': 'None',
226 'timestamp': 'None',
227 'type': None
227 'type': None
228 }
228 }
229 # print 'Publishing data to {}'.format(self.host)
229 # print 'Publishing data to {}'.format(self.host)
230 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
230 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
231
231
232 if self.zeromq is 1:
232 if self.zeromq is 1:
233 print '[Sending] {} - {}'.format(self.dataOut.type, self.dataOut.datatime)
233 print '[Sending] {} - {}'.format(self.dataOut.type, self.dataOut.datatime)
234 self.zmq_socket.send_pyobj(self.dataOut)
234 self.zmq_socket.send_pyobj(self.dataOut)
235
235
236 def run(self, dataOut, **kwargs):
236 def run(self, dataOut, **kwargs):
237 self.dataOut = dataOut
237 self.dataOut = dataOut
238 if not self.isConfig:
238 if not self.isConfig:
239 self.setup(**kwargs)
239 self.setup(**kwargs)
240 self.isConfig = True
240 self.isConfig = True
241
241
242 self.publish_data()
242 self.publish_data()
243 time.sleep(self.delay)
243 time.sleep(self.delay)
244
244
245 def close(self):
245 def close(self):
246 if self.zeromq is 1:
246 if self.zeromq is 1:
247 self.dataOut.finished = True
247 self.dataOut.finished = True
248 self.zmq_socket.send_pyobj(self.dataOut)
248 self.zmq_socket.send_pyobj(self.dataOut)
249
249
250 if self.client:
250 if self.client:
251 self.client.loop_stop()
251 self.client.loop_stop()
252 self.client.disconnect()
252 self.client.disconnect()
253
253
254
254
255 class ReceiverData(ProcessingUnit, Process):
255 class ReceiverData(ProcessingUnit, Process):
256
256
257 throttle_value = 5
257 throttle_value = 5
258
258
259 def __init__(self, **kwargs):
259 def __init__(self, **kwargs):
260
260
261 ProcessingUnit.__init__(self, **kwargs)
261 ProcessingUnit.__init__(self, **kwargs)
262 Process.__init__(self)
262 Process.__init__(self)
263 self.mp = False
263 self.mp = False
264 self.isConfig = False
264 self.isConfig = False
265 self.isWebConfig = False
265 self.isWebConfig = False
266 self.plottypes =[]
266 self.plottypes =[]
267 self.connections = 0
267 self.connections = 0
268 server = kwargs.get('server', 'zmq.pipe')
268 server = kwargs.get('server', 'zmq.pipe')
269 plot_server = kwargs.get('plot_server', 'zmq.web')
269 plot_server = kwargs.get('plot_server', 'zmq.web')
270 if 'tcp://' in server:
270 if 'tcp://' in server:
271 address = server
271 address = server
272 else:
272 else:
273 address = 'ipc:///tmp/%s' % server
273 address = 'ipc:///tmp/%s' % server
274
274
275 if 'tcp://' in plot_server:
275 if 'tcp://' in plot_server:
276 plot_address = plot_server
276 plot_address = plot_server
277 else:
277 else:
278 plot_address = 'ipc:///tmp/%s' % plot_server
278 plot_address = 'ipc:///tmp/%s' % plot_server
279
279
280 self.address = address
280 self.address = address
281 self.plot_address = plot_address
281 self.plot_address = plot_address
282 self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
282 self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
283 self.realtime = kwargs.get('realtime', False)
283 self.realtime = kwargs.get('realtime', False)
284 self.throttle_value = kwargs.get('throttle', 10)
284 self.throttle_value = kwargs.get('throttle', 10)
285 self.sendData = self.initThrottle(self.throttle_value)
285 self.sendData = self.initThrottle(self.throttle_value)
286 self.setup()
286 self.setup()
287
287
288 def setup(self):
288 def setup(self):
289
289
290 self.data = {}
290 self.data = {}
291 self.data['times'] = []
291 self.data['times'] = []
292 for plottype in self.plottypes:
292 for plottype in self.plottypes:
293 self.data[plottype] = {}
293 self.data[plottype] = {}
294 self.data['noise'] = {}
294 self.data['noise'] = {}
295 self.data['throttle'] = self.throttle_value
295 self.data['throttle'] = self.throttle_value
296 self.data['ENDED'] = False
296 self.data['ENDED'] = False
297 self.isConfig = True
297 self.isConfig = True
298 self.data_web = {}
298 self.data_web = {}
299
299
300 def event_monitor(self, monitor):
300 def event_monitor(self, monitor):
301
301
302 events = {}
302 events = {}
303
303
304 for name in dir(zmq):
304 for name in dir(zmq):
305 if name.startswith('EVENT_'):
305 if name.startswith('EVENT_'):
306 value = getattr(zmq, name)
306 value = getattr(zmq, name)
307 events[value] = name
307 events[value] = name
308
308
309 while monitor.poll():
309 while monitor.poll():
310 evt = recv_monitor_message(monitor)
310 evt = recv_monitor_message(monitor)
311 if evt['event'] == 32:
311 if evt['event'] == 32:
312 self.connections += 1
312 self.connections += 1
313 if evt['event'] == 512:
313 if evt['event'] == 512:
314 pass
314 pass
315 if self.connections == 0 and self.started is True:
315 if self.connections == 0 and self.started is True:
316 self.ended = True
316 self.ended = True
317 # send('ENDED')
317 # send('ENDED')
318 evt.update({'description': events[evt['event']]})
318 evt.update({'description': events[evt['event']]})
319
319
320 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
320 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
321 break
321 break
322 monitor.close()
322 monitor.close()
323 print("event monitor thread done!")
323 print("event monitor thread done!")
324
324
325 def initThrottle(self, throttle_value):
325 def initThrottle(self, throttle_value):
326
326
327 @throttle(seconds=throttle_value)
327 @throttle(seconds=throttle_value)
328 def sendDataThrottled(fn_sender, data):
328 def sendDataThrottled(fn_sender, data):
329 fn_sender(data)
329 fn_sender(data)
330
330
331 return sendDataThrottled
331 return sendDataThrottled
332
332
333 def send(self, data):
333 def send(self, data):
334 # print '[sending] data=%s size=%s' % (data.keys(), len(data['times']))
334 # print '[sending] data=%s size=%s' % (data.keys(), len(data['times']))
335 self.sender.send_pyobj(data)
335 self.sender.send_pyobj(data)
336
336
337 def update(self):
337 def update(self):
338
339 t = self.dataOut.ltctime
338 t = self.dataOut.ltctime
340 self.data['times'].append(t)
339 self.data['times'].append(t)
341 self.data['dataOut'] = self.dataOut
340 self.data['dataOut'] = self.dataOut
342 for plottype in self.plottypes:
341 for plottype in self.plottypes:
343 if plottype == 'spc':
342 if plottype == 'spc':
344 z = self.dataOut.data_spc/self.dataOut.normFactor
343 z = self.dataOut.data_spc/self.dataOut.normFactor
345 self.data[plottype] = 10*numpy.log10(z)
344 self.data[plottype] = 10*numpy.log10(z)
346 self.data['noise'][t] = 10*numpy.log10(self.dataOut.getNoise()/self.dataOut.normFactor)
345 self.data['noise'][t] = 10*numpy.log10(self.dataOut.getNoise()/self.dataOut.normFactor)
347 if plottype == 'rti':
346 if plottype == 'rti':
348 self.data[plottype][t] = self.dataOut.getPower()
347 self.data[plottype][t] = self.dataOut.getPower()
349 if plottype == 'snr':
348 if plottype == 'snr':
350 self.data[plottype][t] = 10*numpy.log10(self.dataOut.data_SNR)
349 self.data[plottype][t] = 10*numpy.log10(self.dataOut.data_SNR)
351 if plottype == 'dop':
350 if plottype == 'dop':
352 self.data[plottype][t] = 10*numpy.log10(self.dataOut.data_DOP)
351 self.data[plottype][t] = 10*numpy.log10(self.dataOut.data_DOP)
353 if plottype == 'coh':
352 if plottype == 'coh':
354 self.data[plottype][t] = self.dataOut.getCoherence()
353 self.data[plottype][t] = self.dataOut.getCoherence()
355 if plottype == 'phase':
354 if plottype == 'phase':
356 self.data[plottype][t] = self.dataOut.getCoherence(phase=True)
355 self.data[plottype][t] = self.dataOut.getCoherence(phase=True)
357 if self.realtime:
356 if self.realtime:
357 <<<<<<< HEAD
358 self.data_web[plottype] = roundFloats(decimate(self.data[plottype][t]).tolist())
359 self.data_web['timestamp'] = t
360 =======
358 if plottype == 'spc':
361 if plottype == 'spc':
359 self.data_web[plottype] = roundFloats(decimate(self.data[plottype]).tolist())
362 self.data_web[plottype] = roundFloats(decimate(self.data[plottype]).tolist())
360 else:
363 else:
361 self.data_web[plottype] = roundFloats(decimate(self.data[plottype][t]).tolist())
364 self.data_web[plottype] = roundFloats(decimate(self.data[plottype][t]).tolist())
362 self.data_web['time'] = t
365 self.data_web['time'] = t
366 >>>>>>> f65929d2cf32d4dddb2d5fa2a72f3970d4d51812
363 self.data_web['interval'] = self.dataOut.getTimeInterval()
367 self.data_web['interval'] = self.dataOut.getTimeInterval()
364 self.data_web['type'] = plottype
368 self.data_web['type'] = plottype
365
369
366 def run(self):
370 def run(self):
367
371
368 print '[Starting] {} from {}'.format(self.name, self.address)
372 print '[Starting] {} from {}'.format(self.name, self.address)
369
373
370 self.context = zmq.Context()
374 self.context = zmq.Context()
371 self.receiver = self.context.socket(zmq.PULL)
375 self.receiver = self.context.socket(zmq.PULL)
372 self.receiver.bind(self.address)
376 self.receiver.bind(self.address)
373 monitor = self.receiver.get_monitor_socket()
377 monitor = self.receiver.get_monitor_socket()
374 self.sender = self.context.socket(zmq.PUB)
378 self.sender = self.context.socket(zmq.PUB)
375 if self.realtime:
379 if self.realtime:
376 self.sender_web = self.context.socket(zmq.PUB)
380 self.sender_web = self.context.socket(zmq.PUB)
377 self.sender_web.connect(self.plot_address)
381 self.sender_web.connect(self.plot_address)
378 time.sleep(1)
382 time.sleep(1)
379 self.sender.bind("ipc:///tmp/zmq.plots")
383 self.sender.bind("ipc:///tmp/zmq.plots")
380
384
381 t = Thread(target=self.event_monitor, args=(monitor,))
385 t = Thread(target=self.event_monitor, args=(monitor,))
382 t.start()
386 t.start()
383
387
384 while True:
388 while True:
385 self.dataOut = self.receiver.recv_pyobj()
389 self.dataOut = self.receiver.recv_pyobj()
386 # print '[Receiving] {} - {}'.format(self.dataOut.type,
390 # print '[Receiving] {} - {}'.format(self.dataOut.type,
387 # self.dataOut.datatime.ctime())
391 # self.dataOut.datatime.ctime())
388
392
389 self.update()
393 self.update()
390
394
391 if self.dataOut.finished is True:
395 if self.dataOut.finished is True:
392 self.send(self.data)
396 self.send(self.data)
393 self.connections -= 1
397 self.connections -= 1
394 if self.connections == 0 and self.started:
398 if self.connections == 0 and self.started:
395 self.ended = True
399 self.ended = True
396 self.data['ENDED'] = True
400 self.data['ENDED'] = True
397 self.send(self.data)
401 self.send(self.data)
398 self.setup()
402 self.setup()
399 else:
403 else:
400 if self.realtime:
404 if self.realtime:
401 self.send(self.data)
405 self.send(self.data)
402 self.sender_web.send_string(json.dumps(self.data_web))
406 self.sender_web.send_string(json.dumps(self.data_web))
403 else:
407 else:
404 self.sendData(self.send, self.data)
408 self.sendData(self.send, self.data)
405 self.started = True
409 self.started = True
406
410
407 return
411 return
408
412
409 def sendToWeb(self):
413 def sendToWeb(self):
410
414
411 if not self.isWebConfig:
415 if not self.isWebConfig:
412 context = zmq.Context()
416 context = zmq.Context()
413 sender_web_config = context.socket(zmq.PUB)
417 sender_web_config = context.socket(zmq.PUB)
414 if 'tcp://' in self.plot_address:
418 if 'tcp://' in self.plot_address:
415 dum, address, port = self.plot_address.split(':')
419 dum, address, port = self.plot_address.split(':')
416 conf_address = '{}:{}:{}'.format(dum, address, int(port)+1)
420 conf_address = '{}:{}:{}'.format(dum, address, int(port)+1)
417 else:
421 else:
418 conf_address = self.plot_address + '.config'
422 conf_address = self.plot_address + '.config'
419 sender_web_config.bind(conf_address)
423 sender_web_config.bind(conf_address)
424 <<<<<<< HEAD
425
426 =======
420 time.sleep(1)
427 time.sleep(1)
428 >>>>>>> f65929d2cf32d4dddb2d5fa2a72f3970d4d51812
421 for kwargs in self.operationKwargs.values():
429 for kwargs in self.operationKwargs.values():
422 if 'plot' in kwargs:
430 if 'plot' in kwargs:
423 print '[Sending] Config data to web for {}'.format(kwargs['code'].upper())
431 print '[Sending] Config data to web for {}'.format(kwargs['code'].upper())
424 sender_web_config.send_string(json.dumps(kwargs))
432 sender_web_config.send_string(json.dumps(kwargs))
425 self.isWebConfig = True
433 self.isWebConfig = True
@@ -1,86 +1,87
1 import argparse
1 import argparse
2
2
3 from schainpy.controller import Project, multiSchain
3 from schainpy.controller import Project, multiSchain
4
4
5 desc = "HF_EXAMPLE"
5 desc = "HF_EXAMPLE"
6
6
7 def fiber(cursor, skip, q, dt):
7 def fiber(cursor, skip, q, dt):
8
8
9 controllerObj = Project()
9 controllerObj = Project()
10
10
11 controllerObj.setup(id='191', name='test01', description=desc)
11 controllerObj.setup(id='191', name='test01', description=desc)
12
12
13 readUnitConfObj = controllerObj.addReadUnit(datatype='SpectraReader',
13 readUnitConfObj = controllerObj.addReadUnit(datatype='SpectraReader',
14 path='/home/nanosat/data/hysell_data20/pdata',
14 path='/home/nanosat/data/hysell_data20/pdata',
15 startDate=dt,
15 startDate=dt,
16 endDate=dt,
16 endDate=dt,
17 startTime="00:00:00",
17 startTime="00:00:00",
18 endTime="23:59:59",
18 endTime="23:59:59",
19 online=0,
19 online=0,
20 #set=1426485881,
20 #set=1426485881,
21 delay=10,
21 delay=10,
22 walk=1,
22 walk=1,
23 queue=q,
23 queue=q,
24 cursor=cursor,
24 cursor=cursor,
25 skip=skip,
25 skip=skip,
26 #timezone=-5*3600
26 #timezone=-5*3600
27 )
27 )
28
28
29 # #opObj11 = readUnitConfObj.addOperation(name='printNumberOfBlock')
29 # #opObj11 = readUnitConfObj.addOperation(name='printNumberOfBlock')
30 #
30 #
31 procUnitConfObj2 = controllerObj.addProcUnit(datatype='Spectra', inputId=readUnitConfObj.getId())
31 procUnitConfObj2 = controllerObj.addProcUnit(datatype='Spectra', inputId=readUnitConfObj.getId())
32 # opObj11 = procUnitConfObj2.addParameter(name='pairsList', value='(0,1)', format='pairslist')
32 # opObj11 = procUnitConfObj2.addParameter(name='pairsList', value='(0,1)', format='pairslist')
33 #
33 #
34 # procUnitConfObj3 = controllerObj.addProcUnit(datatype='ParametersProc', inputId=readUnitConfObj.getId())
34 # procUnitConfObj3 = controllerObj.addProcUnit(datatype='ParametersProc', inputId=readUnitConfObj.getId())
35 # opObj11 = procUnitConfObj3.addOperation(name='SpectralMoments', optype='other')
35 # opObj11 = procUnitConfObj3.addOperation(name='SpectralMoments', optype='other')
36
36
37 #
37 #
38 # opObj11 = procUnitConfObj1.addOperation(name='SpectraPlot', optype='other')
38 # opObj11 = procUnitConfObj1.addOperation(name='SpectraPlot', optype='other')
39 # opObj11.addParameter(name='id', value='1000', format='int')
39 # opObj11.addParameter(name='id', value='1000', format='int')
40 # opObj11.addParameter(name='wintitle', value='HF_Jicamarca_Spc', format='str')
40 # opObj11.addParameter(name='wintitle', value='HF_Jicamarca_Spc', format='str')
41 # opObj11.addParameter(name='channelList', value='0', format='intlist')
41 # opObj11.addParameter(name='channelList', value='0', format='intlist')
42 # opObj11.addParameter(name='zmin', value='-120', format='float')
42 # opObj11.addParameter(name='zmin', value='-120', format='float')
43 # opObj11.addParameter(name='zmax', value='-70', format='float')
43 # opObj11.addParameter(name='zmax', value='-70', format='float')
44 # opObj11.addParameter(name='save', value='1', format='int')
44 # opObj11.addParameter(name='save', value='1', format='int')
45 # opObj11.addParameter(name='figpath', value=figpath, format='str')
45 # opObj11.addParameter(name='figpath', value=figpath, format='str')
46
46
47 # opObj11 = procUnitConfObj2.addOperation(name='RTIPlot', optype='other')
47 # opObj11 = procUnitConfObj2.addOperation(name='RTIPlot', optype='other')
48 # opObj11.addParameter(name='id', value='2000', format='int')
48 # opObj11.addParameter(name='id', value='2000', format='int')
49 # opObj11.addParameter(name='wintitzmaxle', value='HF_Jicamarca', format='str')
49 # opObj11.addParameter(name='wintitzmaxle', value='HF_Jicamarca', format='str')
50 # opObj11.addParameter(name='showprofile', value='0', format='int')
50 # opObj11.addParameter(name='showprofile', value='0', format='int')
51 # # opObj11.addParameter(name='channelList', value='0', format='intlist')
51 # # opObj11.addParameter(name='channelList', value='0', format='intlist')
52 # # opObj11.addParameter(name='xmin', value='0', format='float')
52 # # opObj11.addParameter(name='xmin', value='0', format='float')
53 # opObj11.addParameter(name='xmin', value='0', format='float')
53 # opObj11.addParameter(name='xmin', value='0', format='float')
54 # opObj11.addParameter(name='xmax', value='24', format='float')
54 # opObj11.addParameter(name='xmax', value='24', format='float')
55
55
56 # opObj11.addParameter(name='zmin', value='-110', format='float')
56 # opObj11.addParameter(name='zmin', value='-110', format='float')
57 # opObj11.addParameter(name='zmax', value='-70', format='float')
57 # opObj11.addParameter(name='zmax', value='-70', format='float')
58 # opObj11.addParameter(name='save', value='0', format='int')
58 # opObj11.addParameter(name='save', value='0', format='int')
59 # # opObj11.addParameter(name='figpath', value='/tmp/', format='str')
59 # # opObj11.addParameter(name='figpath', value='/tmp/', format='str')
60 #
60 #
61 opObj12 = procUnitConfObj2.addOperation(name='PublishData', optype='other')
61 opObj12 = procUnitConfObj2.addOperation(name='PublishData', optype='other')
62 opObj12.addParameter(name='zeromq', value=1, format='int')
62 opObj12.addParameter(name='zeromq', value=1, format='int')
63
63
64
64 # opObj13 = procUnitConfObj3.addOperation(name='PublishData', optype='other')
65 # opObj13 = procUnitConfObj3.addOperation(name='PublishData', optype='other')
65 # opObj13.addParameter(name='zeromq', value=1, format='int')
66 # opObj13.addParameter(name='zeromq', value=1, format='int')
66 # opObj13.addParameter(name='server', value="juanca", format='str')
67 # opObj13.addParameter(name='server', value="juanca", format='str')
67
68
68 opObj12.addParameter(name='delay', value=1, format='int')
69 opObj12.addParameter(name='delay', value=1, format='int')
69
70
70
71
71 # print "Escribiendo el archivo XML"
72 # print "Escribiendo el archivo XML"
72 # controllerObj.writeXml(filename)
73 # controllerObj.writeXml(filename)
73 # print "Leyendo el archivo XML"
74 # print "Leyendo el archivo XML"
74 # controllerObj.readXml(filename)
75 # controllerObj.readXml(filename)
75
76
76
77
77 # timeit.timeit('controllerObj.run()', number=2)
78 # timeit.timeit('controllerObj.run()', number=2)
78
79
79 controllerObj.start()
80 controllerObj.start()
80
81
81
82
82 if __name__ == '__main__':
83 if __name__ == '__main__':
83 parser = argparse.ArgumentParser(description='Set number of parallel processes')
84 parser = argparse.ArgumentParser(description='Set number of parallel processes')
84 parser.add_argument('--nProcess', default=1, type=int)
85 parser.add_argument('--nProcess', default=1, type=int)
85 args = parser.parse_args()
86 args = parser.parse_args()
86 multiSchain(fiber, nProcess=args.nProcess, startDate='2015/09/26', endDate='2015/09/26')
87 multiSchain(fiber, nProcess=args.nProcess, startDate='2015/09/26', endDate='2015/09/26')
General Comments 0
You need to be logged in to leave comments. Login now