##// END OF EJS Templates
nada
José Chávez -
r913:66816cdd4da4
parent child
Show More
@@ -1,422 +1,420
1 1 '''
2 2 @author: Juan C. Espinoza
3 3 '''
4 4
5 5 import time
6 6 import json
7 7 import numpy
8 8 import paho.mqtt.client as mqtt
9 9 import zmq
10 10 import cPickle as pickle
11 11 import datetime
12 12 from zmq.utils.monitor import recv_monitor_message
13 13 from functools import wraps
14 14 from threading import Thread
15 15 from multiprocessing import Process
16 16
17 17 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
18 18
19 19 MAXNUMX = 100
20 20 MAXNUMY = 100
21 21
22 22 class PrettyFloat(float):
23 23 def __repr__(self):
24 24 return '%.2f' % self
25 25
26 26 def roundFloats(obj):
27 27 if isinstance(obj, list):
28 28 return map(roundFloats, obj)
29 29 elif isinstance(obj, float):
30 30 return round(obj, 2)
31 31
32 32 def decimate(z):
33 33 # dx = int(len(self.x)/self.__MAXNUMX) + 1
34 34 dy = int(len(z[0])/MAXNUMY) + 1
35 35 return z[::, ::dy]
36 36
37 37 class throttle(object):
38 38 """Decorator that prevents a function from being called more than once every
39 39 time period.
40 40 To create a function that cannot be called more than once a minute, but
41 41 will sleep until it can be called:
42 42 @throttle(minutes=1)
43 43 def foo():
44 44 pass
45 45
46 46 for i in range(10):
47 47 foo()
48 48 print "This function has run %s times." % i
49 49 """
50 50
51 51 def __init__(self, seconds=0, minutes=0, hours=0):
52 52 self.throttle_period = datetime.timedelta(
53 53 seconds=seconds, minutes=minutes, hours=hours
54 54 )
55 55
56 56 self.time_of_last_call = datetime.datetime.min
57 57
58 58 def __call__(self, fn):
59 59 @wraps(fn)
60 60 def wrapper(*args, **kwargs):
61 61 now = datetime.datetime.now()
62 62 time_since_last_call = now - self.time_of_last_call
63 63 time_left = self.throttle_period - time_since_last_call
64 64
65 65 if time_left > datetime.timedelta(seconds=0):
66 66 return
67 67
68 68 self.time_of_last_call = datetime.datetime.now()
69 69 return fn(*args, **kwargs)
70 70
71 71 return wrapper
72 72
73 73
74 74 class PublishData(Operation):
75 75 """Clase publish."""
76 76
77 77 def __init__(self, **kwargs):
78 78 """Inicio."""
79 79 Operation.__init__(self, **kwargs)
80 80 self.isConfig = False
81 81 self.client = None
82 82 self.zeromq = None
83 83 self.mqtt = None
84 84
85 85 def on_disconnect(self, client, userdata, rc):
86 86 if rc != 0:
87 87 print("Unexpected disconnection.")
88 88 self.connect()
89 89
90 90 def connect(self):
91 91 print 'trying to connect'
92 92 try:
93 93 self.client.connect(
94 94 host=self.host,
95 95 port=self.port,
96 96 keepalive=60*10,
97 97 bind_address='')
98 98 self.client.loop_start()
99 99 # self.client.publish(
100 100 # self.topic + 'SETUP',
101 101 # json.dumps(setup),
102 102 # retain=True
103 103 # )
104 104 except:
105 105 print "MQTT Conection error."
106 106 self.client = False
107 107
108 108 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, **kwargs):
109 109 self.counter = 0
110 110 self.topic = kwargs.get('topic', 'schain')
111 111 self.delay = kwargs.get('delay', 0)
112 112 self.plottype = kwargs.get('plottype', 'spectra')
113 113 self.host = kwargs.get('host', "10.10.10.82")
114 114 self.port = kwargs.get('port', 3000)
115 115 self.clientId = clientId
116 116 self.cnt = 0
117 117 self.zeromq = zeromq
118 118 self.mqtt = kwargs.get('plottype', 0)
119 119 self.client = None
120 120 setup = []
121 121 if mqtt is 1:
122 122 self.client = mqtt.Client(
123 123 client_id=self.clientId + self.topic + 'SCHAIN',
124 124 clean_session=True)
125 125 self.client.on_disconnect = self.on_disconnect
126 126 self.connect()
127 127 for plot in self.plottype:
128 128 setup.append({
129 129 'plot': plot,
130 130 'topic': self.topic + plot,
131 131 'title': getattr(self, plot + '_' + 'title', False),
132 132 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
133 133 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
134 134 'xrange': getattr(self, plot + '_' + 'xrange', False),
135 135 'yrange': getattr(self, plot + '_' + 'yrange', False),
136 136 'zrange': getattr(self, plot + '_' + 'zrange', False),
137 137 })
138 138 if zeromq is 1:
139 139 context = zmq.Context()
140 140 self.zmq_socket = context.socket(zmq.PUSH)
141 141 server = kwargs.get('server', 'zmq.pipe')
142 142
143 143 if 'tcp://' in server:
144 144 address = server
145 145 else:
146 146 address = 'ipc:///tmp/%s' % server
147 147
148 148 self.zmq_socket.connect(address)
149 149 time.sleep(1)
150 150
151 151
152 152
153 153 def publish_data(self):
154 154 self.dataOut.finished = False
155 155 if self.mqtt is 1:
156 156 yData = self.dataOut.heightList[:2].tolist()
157 157 if self.plottype == 'spectra':
158 158 data = getattr(self.dataOut, 'data_spc')
159 159 z = data/self.dataOut.normFactor
160 160 zdB = 10*numpy.log10(z)
161 161 xlen, ylen = zdB[0].shape
162 162 dx = int(xlen/MAXNUMX) + 1
163 163 dy = int(ylen/MAXNUMY) + 1
164 164 Z = [0 for i in self.dataOut.channelList]
165 165 for i in self.dataOut.channelList:
166 166 Z[i] = zdB[i][::dx, ::dy].tolist()
167 167 payload = {
168 168 'timestamp': self.dataOut.utctime,
169 169 'data': roundFloats(Z),
170 170 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
171 171 'interval': self.dataOut.getTimeInterval(),
172 172 'type': self.plottype,
173 173 'yData': yData
174 174 }
175 175 # print payload
176 176
177 177 elif self.plottype in ('rti', 'power'):
178 178 data = getattr(self.dataOut, 'data_spc')
179 179 z = data/self.dataOut.normFactor
180 180 avg = numpy.average(z, axis=1)
181 181 avgdB = 10*numpy.log10(avg)
182 182 xlen, ylen = z[0].shape
183 183 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
184 184 AVG = [0 for i in self.dataOut.channelList]
185 185 for i in self.dataOut.channelList:
186 186 AVG[i] = avgdB[i][::dy].tolist()
187 187 payload = {
188 188 'timestamp': self.dataOut.utctime,
189 189 'data': roundFloats(AVG),
190 190 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
191 191 'interval': self.dataOut.getTimeInterval(),
192 192 'type': self.plottype,
193 193 'yData': yData
194 194 }
195 195 elif self.plottype == 'noise':
196 196 noise = self.dataOut.getNoise()/self.dataOut.normFactor
197 197 noisedB = 10*numpy.log10(noise)
198 198 payload = {
199 199 'timestamp': self.dataOut.utctime,
200 200 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
201 201 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
202 202 'interval': self.dataOut.getTimeInterval(),
203 203 'type': self.plottype,
204 204 'yData': yData
205 205 }
206 206 elif self.plottype == 'snr':
207 207 data = getattr(self.dataOut, 'data_SNR')
208 208 avgdB = 10*numpy.log10(data)
209 209
210 210 ylen = data[0].size
211 211 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
212 212 AVG = [0 for i in self.dataOut.channelList]
213 213 for i in self.dataOut.channelList:
214 214 AVG[i] = avgdB[i][::dy].tolist()
215 215 payload = {
216 216 'timestamp': self.dataOut.utctime,
217 217 'data': roundFloats(AVG),
218 218 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
219 219 'type': self.plottype,
220 220 'yData': yData
221 221 }
222 222 else:
223 223 print "Tipo de grafico invalido"
224 224 payload = {
225 225 'data': 'None',
226 226 'timestamp': 'None',
227 227 'type': None
228 228 }
229 229 # print 'Publishing data to {}'.format(self.host)
230 230 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
231 231
232 232 if self.zeromq is 1:
233 233 print '[Sending] {} - {}'.format(self.dataOut.type, self.dataOut.datatime)
234 234 self.zmq_socket.send_pyobj(self.dataOut)
235 235
236 236 def run(self, dataOut, **kwargs):
237 237 self.dataOut = dataOut
238 238 if not self.isConfig:
239 239 self.setup(**kwargs)
240 240 self.isConfig = True
241 241
242 242 self.publish_data()
243 243 time.sleep(self.delay)
244 244
245 245 def close(self):
246 246 if self.zeromq is 1:
247 247 self.dataOut.finished = True
248 248 self.zmq_socket.send_pyobj(self.dataOut)
249 249
250 250 if self.client:
251 251 self.client.loop_stop()
252 252 self.client.disconnect()
253 253
254 254
255 255 class ReceiverData(ProcessingUnit, Process):
256 256
257 257 throttle_value = 5
258 258
259 259 def __init__(self, **kwargs):
260 260
261 261 ProcessingUnit.__init__(self, **kwargs)
262 262 Process.__init__(self)
263 263 self.mp = False
264 264 self.isConfig = False
265 265 self.isWebConfig = False
266 266 self.plottypes =[]
267 267 self.connections = 0
268 268 server = kwargs.get('server', 'zmq.pipe')
269 269 plot_server = kwargs.get('plot_server', 'zmq.web')
270 270 if 'tcp://' in server:
271 271 address = server
272 272 else:
273 273 address = 'ipc:///tmp/%s' % server
274 274
275 275 if 'tcp://' in plot_server:
276 276 plot_address = plot_server
277 277 else:
278 278 plot_address = 'ipc:///tmp/%s' % plot_server
279 279
280 280 self.address = address
281 281 self.plot_address = plot_address
282 282 self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
283 283 self.realtime = kwargs.get('realtime', False)
284 284 self.throttle_value = kwargs.get('throttle', 10)
285 285 self.sendData = self.initThrottle(self.throttle_value)
286 286 self.setup()
287 287
288 288 def setup(self):
289 289
290 290 self.data = {}
291 291 self.data['times'] = []
292 292 for plottype in self.plottypes:
293 293 self.data[plottype] = {}
294 294 self.data['noise'] = {}
295 295 self.data['throttle'] = self.throttle_value
296 296 self.data['ENDED'] = False
297 297 self.isConfig = True
298 298 self.data_web = {}
299 299
300 300 def event_monitor(self, monitor):
301 301
302 302 events = {}
303 303
304 304 for name in dir(zmq):
305 305 if name.startswith('EVENT_'):
306 306 value = getattr(zmq, name)
307 307 events[value] = name
308 308
309 309 while monitor.poll():
310 310 evt = recv_monitor_message(monitor)
311 311 if evt['event'] == 32:
312 312 self.connections += 1
313 313 if evt['event'] == 512:
314 314 pass
315 315 if self.connections == 0 and self.started is True:
316 316 self.ended = True
317 317 # send('ENDED')
318 318 evt.update({'description': events[evt['event']]})
319 319
320 320 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
321 321 break
322 322 monitor.close()
323 323 print("event monitor thread done!")
324 324
325 325 def initThrottle(self, throttle_value):
326 326
327 327 @throttle(seconds=throttle_value)
328 328 def sendDataThrottled(fn_sender, data):
329 329 fn_sender(data)
330 330
331 331 return sendDataThrottled
332 332
333 333 def send(self, data):
334 334 # print '[sending] data=%s size=%s' % (data.keys(), len(data['times']))
335 335 self.sender.send_pyobj(data)
336 336
337 337 def update(self):
338
339 338 t = self.dataOut.ltctime
340 339 self.data['times'].append(t)
341 340 self.data['dataOut'] = self.dataOut
342 341 for plottype in self.plottypes:
343 342 if plottype == 'spc':
344 343 z = self.dataOut.data_spc/self.dataOut.normFactor
345 344 self.data[plottype] = 10*numpy.log10(z)
346 345 self.data['noise'][t] = 10*numpy.log10(self.dataOut.getNoise()/self.dataOut.normFactor)
347 346 if plottype == 'rti':
348 347 self.data[plottype][t] = self.dataOut.getPower()
349 348 if plottype == 'snr':
350 349 self.data[plottype][t] = 10*numpy.log10(self.dataOut.data_SNR)
351 350 if plottype == 'dop':
352 351 self.data[plottype][t] = 10*numpy.log10(self.dataOut.data_DOP)
353 352 if plottype == 'coh':
354 353 self.data[plottype][t] = self.dataOut.getCoherence()
355 354 if plottype == 'phase':
356 355 self.data[plottype][t] = self.dataOut.getCoherence(phase=True)
357 356 if self.realtime:
358 357 self.data_web[plottype] = roundFloats(decimate(self.data[plottype][t]).tolist())
359 self.data_web['time'] = t
358 self.data_web['timestamp'] = t
360 359 self.data_web['interval'] = self.dataOut.getTimeInterval()
361 360 self.data_web['type'] = plottype
362 361 def run(self):
363 362
364 363 print '[Starting] {} from {}'.format(self.name, self.address)
365 364
366 365 self.context = zmq.Context()
367 366 self.receiver = self.context.socket(zmq.PULL)
368 367 self.receiver.bind(self.address)
369 368 monitor = self.receiver.get_monitor_socket()
370 369 self.sender = self.context.socket(zmq.PUB)
371 if self.realtime:
372 self.sender_web = self.context.socket(zmq.PUB)
370 if self.realtime:
371 self.sender_web = self.context.socket(zmq.PUB)
373 372 self.sender_web.bind(self.plot_address)
374 373 self.sender.bind("ipc:///tmp/zmq.plots")
375 374
376 375 t = Thread(target=self.event_monitor, args=(monitor,))
377 376 t.start()
378 377
379 378 while True:
380 379 self.dataOut = self.receiver.recv_pyobj()
381 380 # print '[Receiving] {} - {}'.format(self.dataOut.type,
382 381 # self.dataOut.datatime.ctime())
383 382
384 383 self.update()
385 384
386 385 if self.dataOut.finished is True:
387 386 self.send(self.data)
388 387 self.connections -= 1
389 388 if self.connections == 0 and self.started:
390 389 self.ended = True
391 390 self.data['ENDED'] = True
392 391 self.send(self.data)
393 392 self.setup()
394 393 else:
395 394 if self.realtime:
396 395 self.send(self.data)
397 396 self.sender_web.send_string(json.dumps(self.data_web))
398 397 else:
399 398 self.sendData(self.send, self.data)
400 399 self.started = True
401 400
402 401 return
403
402
404 403 def sendToWeb(self):
405
404
406 405 if not self.isWebConfig:
407 406 context = zmq.Context()
408 407 sender_web_config = context.socket(zmq.PUB)
409 408 if 'tcp://' in self.plot_address:
410 409 print self.plot_address
411 410 dum, address, port = self.plot_address.split(':')
412 411 conf_address = '{}:{}:{}'.format(dum, address, int(port)+1)
413 412 else:
414 413 conf_address = self.plot_address + '.config'
415 sender_web_config.bind(conf_address)
416
414 sender_web_config.bind(conf_address)
415
417 416 for kwargs in self.operationKwargs.values():
418 417 if 'plot' in kwargs:
419 418 sender_web_config.send_string(json.dumps(kwargs))
420 419 print kwargs
421 420 self.isWebConfig = True
422
@@ -1,86 +1,87
1 1 import argparse
2 2
3 3 from schainpy.controller import Project, multiSchain
4 4
5 5 desc = "HF_EXAMPLE"
6 6
7 7 def fiber(cursor, skip, q, dt):
8 8
9 9 controllerObj = Project()
10 10
11 11 controllerObj.setup(id='191', name='test01', description=desc)
12 12
13 13 readUnitConfObj = controllerObj.addReadUnit(datatype='SpectraReader',
14 14 path='/home/nanosat/data/hysell_data20/pdata',
15 15 startDate=dt,
16 16 endDate=dt,
17 17 startTime="00:00:00",
18 18 endTime="23:59:59",
19 19 online=0,
20 20 #set=1426485881,
21 21 delay=10,
22 22 walk=1,
23 23 queue=q,
24 24 cursor=cursor,
25 25 skip=skip,
26 26 #timezone=-5*3600
27 27 )
28 28
29 29 # #opObj11 = readUnitConfObj.addOperation(name='printNumberOfBlock')
30 30 #
31 31 procUnitConfObj2 = controllerObj.addProcUnit(datatype='Spectra', inputId=readUnitConfObj.getId())
32 32 # opObj11 = procUnitConfObj2.addParameter(name='pairsList', value='(0,1)', format='pairslist')
33 33 #
34 34 # procUnitConfObj3 = controllerObj.addProcUnit(datatype='ParametersProc', inputId=readUnitConfObj.getId())
35 35 # opObj11 = procUnitConfObj3.addOperation(name='SpectralMoments', optype='other')
36 36
37 37 #
38 38 # opObj11 = procUnitConfObj1.addOperation(name='SpectraPlot', optype='other')
39 39 # opObj11.addParameter(name='id', value='1000', format='int')
40 40 # opObj11.addParameter(name='wintitle', value='HF_Jicamarca_Spc', format='str')
41 41 # opObj11.addParameter(name='channelList', value='0', format='intlist')
42 42 # opObj11.addParameter(name='zmin', value='-120', format='float')
43 43 # opObj11.addParameter(name='zmax', value='-70', format='float')
44 44 # opObj11.addParameter(name='save', value='1', format='int')
45 45 # opObj11.addParameter(name='figpath', value=figpath, format='str')
46 46
47 47 # opObj11 = procUnitConfObj2.addOperation(name='RTIPlot', optype='other')
48 48 # opObj11.addParameter(name='id', value='2000', format='int')
49 49 # opObj11.addParameter(name='wintitzmaxle', value='HF_Jicamarca', format='str')
50 50 # opObj11.addParameter(name='showprofile', value='0', format='int')
51 51 # # opObj11.addParameter(name='channelList', value='0', format='intlist')
52 52 # # opObj11.addParameter(name='xmin', value='0', format='float')
53 53 # opObj11.addParameter(name='xmin', value='0', format='float')
54 54 # opObj11.addParameter(name='xmax', value='24', format='float')
55 55
56 56 # opObj11.addParameter(name='zmin', value='-110', format='float')
57 57 # opObj11.addParameter(name='zmax', value='-70', format='float')
58 58 # opObj11.addParameter(name='save', value='0', format='int')
59 59 # # opObj11.addParameter(name='figpath', value='/tmp/', format='str')
60 60 #
61 61 opObj12 = procUnitConfObj2.addOperation(name='PublishData', optype='other')
62 62 opObj12.addParameter(name='zeromq', value=1, format='int')
63 63
64
64 65 # opObj13 = procUnitConfObj3.addOperation(name='PublishData', optype='other')
65 66 # opObj13.addParameter(name='zeromq', value=1, format='int')
66 67 # opObj13.addParameter(name='server', value="juanca", format='str')
67 68
68 69 opObj12.addParameter(name='delay', value=1, format='int')
69 70
70 71
71 72 # print "Escribiendo el archivo XML"
72 73 # controllerObj.writeXml(filename)
73 74 # print "Leyendo el archivo XML"
74 75 # controllerObj.readXml(filename)
75 76
76 77
77 78 # timeit.timeit('controllerObj.run()', number=2)
78 79
79 80 controllerObj.start()
80 81
81 82
82 83 if __name__ == '__main__':
83 84 parser = argparse.ArgumentParser(description='Set number of parallel processes')
84 85 parser.add_argument('--nProcess', default=1, type=int)
85 86 args = parser.parse_args()
86 87 multiSchain(fiber, nProcess=args.nProcess, startDate='2015/09/26', endDate='2015/09/26')
General Comments 0
You need to be logged in to leave comments. Login now