##// END OF EJS Templates
Merge branch 'schain_mp' of http://jro-dev.igp.gob.pe/rhodecode/schain into schain_mp
jespinoza -
r917:ad02a447a4cb merge
parent child
Show More
1 NO CONTENT: new file 100644
@@ -0,0 +1,1
1 =: ERROR: cannot open `=' (No such file or directory)
1 NO CONTENT: new file 100644
1 NO CONTENT: new file 100644
1 NO CONTENT: new file 100644
1 NO CONTENT: new file 100644
1 NO CONTENT: new file 100644
1 NO CONTENT: new file 100644
1 NO CONTENT: new file 100644
1 NO CONTENT: new file 100644
@@ -1,425 +1,433
1 1 '''
2 2 @author: Juan C. Espinoza
3 3 '''
4 4
5 5 import time
6 6 import json
7 7 import numpy
8 8 import paho.mqtt.client as mqtt
9 9 import zmq
10 10 import cPickle as pickle
11 11 import datetime
12 12 from zmq.utils.monitor import recv_monitor_message
13 13 from functools import wraps
14 14 from threading import Thread
15 15 from multiprocessing import Process
16 16
17 17 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
18 18
19 19 MAXNUMX = 100
20 20 MAXNUMY = 100
21 21
22 22 class PrettyFloat(float):
23 23 def __repr__(self):
24 24 return '%.2f' % self
25 25
26 26 def roundFloats(obj):
27 27 if isinstance(obj, list):
28 28 return map(roundFloats, obj)
29 29 elif isinstance(obj, float):
30 30 return round(obj, 2)
31 31
32 32 def decimate(z):
33 33 # dx = int(len(self.x)/self.__MAXNUMX) + 1
34 34
35 35 dy = int(len(z[0])/MAXNUMY) + 1
36 36
37 37 return z[::, ::dy]
38 38
39 39 class throttle(object):
40 40 """Decorator that prevents a function from being called more than once every
41 41 time period.
42 42 To create a function that cannot be called more than once a minute, but
43 43 will sleep until it can be called:
44 44 @throttle(minutes=1)
45 45 def foo():
46 46 pass
47 47
48 48 for i in range(10):
49 49 foo()
50 50 print "This function has run %s times." % i
51 51 """
52 52
53 53 def __init__(self, seconds=0, minutes=0, hours=0):
54 54 self.throttle_period = datetime.timedelta(
55 55 seconds=seconds, minutes=minutes, hours=hours
56 56 )
57 57
58 58 self.time_of_last_call = datetime.datetime.min
59 59
60 60 def __call__(self, fn):
61 61 @wraps(fn)
62 62 def wrapper(*args, **kwargs):
63 63 now = datetime.datetime.now()
64 64 time_since_last_call = now - self.time_of_last_call
65 65 time_left = self.throttle_period - time_since_last_call
66 66
67 67 if time_left > datetime.timedelta(seconds=0):
68 68 return
69 69
70 70 self.time_of_last_call = datetime.datetime.now()
71 71 return fn(*args, **kwargs)
72 72
73 73 return wrapper
74 74
75 75
76 76 class PublishData(Operation):
77 77 """Clase publish."""
78 78
79 79 def __init__(self, **kwargs):
80 80 """Inicio."""
81 81 Operation.__init__(self, **kwargs)
82 82 self.isConfig = False
83 83 self.client = None
84 84 self.zeromq = None
85 85 self.mqtt = None
86 86
87 87 def on_disconnect(self, client, userdata, rc):
88 88 if rc != 0:
89 89 print("Unexpected disconnection.")
90 90 self.connect()
91 91
92 92 def connect(self):
93 93 print 'trying to connect'
94 94 try:
95 95 self.client.connect(
96 96 host=self.host,
97 97 port=self.port,
98 98 keepalive=60*10,
99 99 bind_address='')
100 100 self.client.loop_start()
101 101 # self.client.publish(
102 102 # self.topic + 'SETUP',
103 103 # json.dumps(setup),
104 104 # retain=True
105 105 # )
106 106 except:
107 107 print "MQTT Conection error."
108 108 self.client = False
109 109
110 110 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, **kwargs):
111 111 self.counter = 0
112 112 self.topic = kwargs.get('topic', 'schain')
113 113 self.delay = kwargs.get('delay', 0)
114 114 self.plottype = kwargs.get('plottype', 'spectra')
115 115 self.host = kwargs.get('host', "10.10.10.82")
116 116 self.port = kwargs.get('port', 3000)
117 117 self.clientId = clientId
118 118 self.cnt = 0
119 119 self.zeromq = zeromq
120 120 self.mqtt = kwargs.get('plottype', 0)
121 121 self.client = None
122 122 setup = []
123 123 if mqtt is 1:
124 124 self.client = mqtt.Client(
125 125 client_id=self.clientId + self.topic + 'SCHAIN',
126 126 clean_session=True)
127 127 self.client.on_disconnect = self.on_disconnect
128 128 self.connect()
129 129 for plot in self.plottype:
130 130 setup.append({
131 131 'plot': plot,
132 132 'topic': self.topic + plot,
133 133 'title': getattr(self, plot + '_' + 'title', False),
134 134 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
135 135 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
136 136 'xrange': getattr(self, plot + '_' + 'xrange', False),
137 137 'yrange': getattr(self, plot + '_' + 'yrange', False),
138 138 'zrange': getattr(self, plot + '_' + 'zrange', False),
139 139 })
140 140 if zeromq is 1:
141 141 context = zmq.Context()
142 142 self.zmq_socket = context.socket(zmq.PUSH)
143 143 server = kwargs.get('server', 'zmq.pipe')
144 144
145 145 if 'tcp://' in server:
146 146 address = server
147 147 else:
148 148 address = 'ipc:///tmp/%s' % server
149 149
150 150 self.zmq_socket.connect(address)
151 151 time.sleep(1)
152 152
153 153 def publish_data(self):
154 154 self.dataOut.finished = False
155 155 if self.mqtt is 1:
156 156 yData = self.dataOut.heightList[:2].tolist()
157 157 if self.plottype == 'spectra':
158 158 data = getattr(self.dataOut, 'data_spc')
159 159 z = data/self.dataOut.normFactor
160 160 zdB = 10*numpy.log10(z)
161 161 xlen, ylen = zdB[0].shape
162 162 dx = int(xlen/MAXNUMX) + 1
163 163 dy = int(ylen/MAXNUMY) + 1
164 164 Z = [0 for i in self.dataOut.channelList]
165 165 for i in self.dataOut.channelList:
166 166 Z[i] = zdB[i][::dx, ::dy].tolist()
167 167 payload = {
168 168 'timestamp': self.dataOut.utctime,
169 169 'data': roundFloats(Z),
170 170 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
171 171 'interval': self.dataOut.getTimeInterval(),
172 172 'type': self.plottype,
173 173 'yData': yData
174 174 }
175 175 # print payload
176 176
177 177 elif self.plottype in ('rti', 'power'):
178 178 data = getattr(self.dataOut, 'data_spc')
179 179 z = data/self.dataOut.normFactor
180 180 avg = numpy.average(z, axis=1)
181 181 avgdB = 10*numpy.log10(avg)
182 182 xlen, ylen = z[0].shape
183 183 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
184 184 AVG = [0 for i in self.dataOut.channelList]
185 185 for i in self.dataOut.channelList:
186 186 AVG[i] = avgdB[i][::dy].tolist()
187 187 payload = {
188 188 'timestamp': self.dataOut.utctime,
189 189 'data': roundFloats(AVG),
190 190 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
191 191 'interval': self.dataOut.getTimeInterval(),
192 192 'type': self.plottype,
193 193 'yData': yData
194 194 }
195 195 elif self.plottype == 'noise':
196 196 noise = self.dataOut.getNoise()/self.dataOut.normFactor
197 197 noisedB = 10*numpy.log10(noise)
198 198 payload = {
199 199 'timestamp': self.dataOut.utctime,
200 200 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
201 201 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
202 202 'interval': self.dataOut.getTimeInterval(),
203 203 'type': self.plottype,
204 204 'yData': yData
205 205 }
206 206 elif self.plottype == 'snr':
207 207 data = getattr(self.dataOut, 'data_SNR')
208 208 avgdB = 10*numpy.log10(data)
209 209
210 210 ylen = data[0].size
211 211 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
212 212 AVG = [0 for i in self.dataOut.channelList]
213 213 for i in self.dataOut.channelList:
214 214 AVG[i] = avgdB[i][::dy].tolist()
215 215 payload = {
216 216 'timestamp': self.dataOut.utctime,
217 217 'data': roundFloats(AVG),
218 218 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
219 219 'type': self.plottype,
220 220 'yData': yData
221 221 }
222 222 else:
223 223 print "Tipo de grafico invalido"
224 224 payload = {
225 225 'data': 'None',
226 226 'timestamp': 'None',
227 227 'type': None
228 228 }
229 229 # print 'Publishing data to {}'.format(self.host)
230 230 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
231 231
232 232 if self.zeromq is 1:
233 233 print '[Sending] {} - {}'.format(self.dataOut.type, self.dataOut.datatime)
234 234 self.zmq_socket.send_pyobj(self.dataOut)
235 235
236 236 def run(self, dataOut, **kwargs):
237 237 self.dataOut = dataOut
238 238 if not self.isConfig:
239 239 self.setup(**kwargs)
240 240 self.isConfig = True
241 241
242 242 self.publish_data()
243 243 time.sleep(self.delay)
244 244
245 245 def close(self):
246 246 if self.zeromq is 1:
247 247 self.dataOut.finished = True
248 248 self.zmq_socket.send_pyobj(self.dataOut)
249 249
250 250 if self.client:
251 251 self.client.loop_stop()
252 252 self.client.disconnect()
253 253
254 254
255 255 class ReceiverData(ProcessingUnit, Process):
256 256
257 257 throttle_value = 5
258 258
259 259 def __init__(self, **kwargs):
260 260
261 261 ProcessingUnit.__init__(self, **kwargs)
262 262 Process.__init__(self)
263 263 self.mp = False
264 264 self.isConfig = False
265 265 self.isWebConfig = False
266 266 self.plottypes =[]
267 267 self.connections = 0
268 268 server = kwargs.get('server', 'zmq.pipe')
269 269 plot_server = kwargs.get('plot_server', 'zmq.web')
270 270 if 'tcp://' in server:
271 271 address = server
272 272 else:
273 273 address = 'ipc:///tmp/%s' % server
274 274
275 275 if 'tcp://' in plot_server:
276 276 plot_address = plot_server
277 277 else:
278 278 plot_address = 'ipc:///tmp/%s' % plot_server
279 279
280 280 self.address = address
281 281 self.plot_address = plot_address
282 282 self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
283 283 self.realtime = kwargs.get('realtime', False)
284 284 self.throttle_value = kwargs.get('throttle', 10)
285 285 self.sendData = self.initThrottle(self.throttle_value)
286 286 self.setup()
287 287
288 288 def setup(self):
289 289
290 290 self.data = {}
291 291 self.data['times'] = []
292 292 for plottype in self.plottypes:
293 293 self.data[plottype] = {}
294 294 self.data['noise'] = {}
295 295 self.data['throttle'] = self.throttle_value
296 296 self.data['ENDED'] = False
297 297 self.isConfig = True
298 298 self.data_web = {}
299 299
300 300 def event_monitor(self, monitor):
301 301
302 302 events = {}
303 303
304 304 for name in dir(zmq):
305 305 if name.startswith('EVENT_'):
306 306 value = getattr(zmq, name)
307 307 events[value] = name
308 308
309 309 while monitor.poll():
310 310 evt = recv_monitor_message(monitor)
311 311 if evt['event'] == 32:
312 312 self.connections += 1
313 313 if evt['event'] == 512:
314 314 pass
315 315 if self.connections == 0 and self.started is True:
316 316 self.ended = True
317 317 # send('ENDED')
318 318 evt.update({'description': events[evt['event']]})
319 319
320 320 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
321 321 break
322 322 monitor.close()
323 323 print("event monitor thread done!")
324 324
325 325 def initThrottle(self, throttle_value):
326 326
327 327 @throttle(seconds=throttle_value)
328 328 def sendDataThrottled(fn_sender, data):
329 329 fn_sender(data)
330 330
331 331 return sendDataThrottled
332 332
333 333 def send(self, data):
334 334 # print '[sending] data=%s size=%s' % (data.keys(), len(data['times']))
335 335 self.sender.send_pyobj(data)
336 336
337 337 def update(self):
338
339 338 t = self.dataOut.ltctime
340 339 self.data['times'].append(t)
341 340 self.data['dataOut'] = self.dataOut
342 341 for plottype in self.plottypes:
343 342 if plottype == 'spc':
344 343 z = self.dataOut.data_spc/self.dataOut.normFactor
345 344 self.data[plottype] = 10*numpy.log10(z)
346 345 self.data['noise'][t] = 10*numpy.log10(self.dataOut.getNoise()/self.dataOut.normFactor)
347 346 if plottype == 'rti':
348 347 self.data[plottype][t] = self.dataOut.getPower()
349 348 if plottype == 'snr':
350 349 self.data[plottype][t] = 10*numpy.log10(self.dataOut.data_SNR)
351 350 if plottype == 'dop':
352 351 self.data[plottype][t] = 10*numpy.log10(self.dataOut.data_DOP)
353 352 if plottype == 'coh':
354 353 self.data[plottype][t] = self.dataOut.getCoherence()
355 354 if plottype == 'phase':
356 355 self.data[plottype][t] = self.dataOut.getCoherence(phase=True)
357 356 if self.realtime:
357 <<<<<<< HEAD
358 self.data_web[plottype] = roundFloats(decimate(self.data[plottype][t]).tolist())
359 self.data_web['timestamp'] = t
360 =======
358 361 if plottype == 'spc':
359 362 self.data_web[plottype] = roundFloats(decimate(self.data[plottype]).tolist())
360 363 else:
361 364 self.data_web[plottype] = roundFloats(decimate(self.data[plottype][t]).tolist())
362 365 self.data_web['time'] = t
366 >>>>>>> f65929d2cf32d4dddb2d5fa2a72f3970d4d51812
363 367 self.data_web['interval'] = self.dataOut.getTimeInterval()
364 368 self.data_web['type'] = plottype
365 369
366 370 def run(self):
367 371
368 372 print '[Starting] {} from {}'.format(self.name, self.address)
369 373
370 374 self.context = zmq.Context()
371 375 self.receiver = self.context.socket(zmq.PULL)
372 376 self.receiver.bind(self.address)
373 377 monitor = self.receiver.get_monitor_socket()
374 378 self.sender = self.context.socket(zmq.PUB)
375 379 if self.realtime:
376 380 self.sender_web = self.context.socket(zmq.PUB)
377 381 self.sender_web.connect(self.plot_address)
378 382 time.sleep(1)
379 383 self.sender.bind("ipc:///tmp/zmq.plots")
380 384
381 385 t = Thread(target=self.event_monitor, args=(monitor,))
382 386 t.start()
383 387
384 388 while True:
385 389 self.dataOut = self.receiver.recv_pyobj()
386 390 # print '[Receiving] {} - {}'.format(self.dataOut.type,
387 391 # self.dataOut.datatime.ctime())
388 392
389 393 self.update()
390 394
391 395 if self.dataOut.finished is True:
392 396 self.send(self.data)
393 397 self.connections -= 1
394 398 if self.connections == 0 and self.started:
395 399 self.ended = True
396 400 self.data['ENDED'] = True
397 401 self.send(self.data)
398 402 self.setup()
399 403 else:
400 404 if self.realtime:
401 405 self.send(self.data)
402 406 self.sender_web.send_string(json.dumps(self.data_web))
403 407 else:
404 408 self.sendData(self.send, self.data)
405 409 self.started = True
406 410
407 411 return
408 412
409 413 def sendToWeb(self):
410 414
411 415 if not self.isWebConfig:
412 416 context = zmq.Context()
413 417 sender_web_config = context.socket(zmq.PUB)
414 418 if 'tcp://' in self.plot_address:
415 419 dum, address, port = self.plot_address.split(':')
416 420 conf_address = '{}:{}:{}'.format(dum, address, int(port)+1)
417 421 else:
418 422 conf_address = self.plot_address + '.config'
419 423 sender_web_config.bind(conf_address)
424 <<<<<<< HEAD
425
426 =======
420 427 time.sleep(1)
428 >>>>>>> f65929d2cf32d4dddb2d5fa2a72f3970d4d51812
421 429 for kwargs in self.operationKwargs.values():
422 430 if 'plot' in kwargs:
423 431 print '[Sending] Config data to web for {}'.format(kwargs['code'].upper())
424 432 sender_web_config.send_string(json.dumps(kwargs))
425 433 self.isWebConfig = True
@@ -1,86 +1,87
1 1 import argparse
2 2
3 3 from schainpy.controller import Project, multiSchain
4 4
5 5 desc = "HF_EXAMPLE"
6 6
7 7 def fiber(cursor, skip, q, dt):
8 8
9 9 controllerObj = Project()
10 10
11 11 controllerObj.setup(id='191', name='test01', description=desc)
12 12
13 13 readUnitConfObj = controllerObj.addReadUnit(datatype='SpectraReader',
14 14 path='/home/nanosat/data/hysell_data20/pdata',
15 15 startDate=dt,
16 16 endDate=dt,
17 17 startTime="00:00:00",
18 18 endTime="23:59:59",
19 19 online=0,
20 20 #set=1426485881,
21 21 delay=10,
22 22 walk=1,
23 23 queue=q,
24 24 cursor=cursor,
25 25 skip=skip,
26 26 #timezone=-5*3600
27 27 )
28 28
29 29 # #opObj11 = readUnitConfObj.addOperation(name='printNumberOfBlock')
30 30 #
31 31 procUnitConfObj2 = controllerObj.addProcUnit(datatype='Spectra', inputId=readUnitConfObj.getId())
32 32 # opObj11 = procUnitConfObj2.addParameter(name='pairsList', value='(0,1)', format='pairslist')
33 33 #
34 34 # procUnitConfObj3 = controllerObj.addProcUnit(datatype='ParametersProc', inputId=readUnitConfObj.getId())
35 35 # opObj11 = procUnitConfObj3.addOperation(name='SpectralMoments', optype='other')
36 36
37 37 #
38 38 # opObj11 = procUnitConfObj1.addOperation(name='SpectraPlot', optype='other')
39 39 # opObj11.addParameter(name='id', value='1000', format='int')
40 40 # opObj11.addParameter(name='wintitle', value='HF_Jicamarca_Spc', format='str')
41 41 # opObj11.addParameter(name='channelList', value='0', format='intlist')
42 42 # opObj11.addParameter(name='zmin', value='-120', format='float')
43 43 # opObj11.addParameter(name='zmax', value='-70', format='float')
44 44 # opObj11.addParameter(name='save', value='1', format='int')
45 45 # opObj11.addParameter(name='figpath', value=figpath, format='str')
46 46
47 47 # opObj11 = procUnitConfObj2.addOperation(name='RTIPlot', optype='other')
48 48 # opObj11.addParameter(name='id', value='2000', format='int')
49 49 # opObj11.addParameter(name='wintitzmaxle', value='HF_Jicamarca', format='str')
50 50 # opObj11.addParameter(name='showprofile', value='0', format='int')
51 51 # # opObj11.addParameter(name='channelList', value='0', format='intlist')
52 52 # # opObj11.addParameter(name='xmin', value='0', format='float')
53 53 # opObj11.addParameter(name='xmin', value='0', format='float')
54 54 # opObj11.addParameter(name='xmax', value='24', format='float')
55 55
56 56 # opObj11.addParameter(name='zmin', value='-110', format='float')
57 57 # opObj11.addParameter(name='zmax', value='-70', format='float')
58 58 # opObj11.addParameter(name='save', value='0', format='int')
59 59 # # opObj11.addParameter(name='figpath', value='/tmp/', format='str')
60 60 #
61 61 opObj12 = procUnitConfObj2.addOperation(name='PublishData', optype='other')
62 62 opObj12.addParameter(name='zeromq', value=1, format='int')
63 63
64
64 65 # opObj13 = procUnitConfObj3.addOperation(name='PublishData', optype='other')
65 66 # opObj13.addParameter(name='zeromq', value=1, format='int')
66 67 # opObj13.addParameter(name='server', value="juanca", format='str')
67 68
68 69 opObj12.addParameter(name='delay', value=1, format='int')
69 70
70 71
71 72 # print "Escribiendo el archivo XML"
72 73 # controllerObj.writeXml(filename)
73 74 # print "Leyendo el archivo XML"
74 75 # controllerObj.readXml(filename)
75 76
76 77
77 78 # timeit.timeit('controllerObj.run()', number=2)
78 79
79 80 controllerObj.start()
80 81
81 82
82 83 if __name__ == '__main__':
83 84 parser = argparse.ArgumentParser(description='Set number of parallel processes')
84 85 parser.add_argument('--nProcess', default=1, type=int)
85 86 args = parser.parse_args()
86 87 multiSchain(fiber, nProcess=args.nProcess, startDate='2015/09/26', endDate='2015/09/26')
General Comments 0
You need to be logged in to leave comments. Login now