##// END OF EJS Templates
Test Version
Juan C. Espinoza -
r22:3d86891def25
parent child
Show More
@@ -0,0 +1,9
1 from django.conf.urls import url
2
3 from . import consumers
4
5 websocket_urlpatterns = [
6 url(r'^ws/main/$', consumers.MainConsumer),
7 url(r'^ws/realtime/(?P<code>[^/]+)/(?P<plot>[^/]+)/$', consumers.PlotConsumer),
8 url(r'^ws/database/(?P<code>[^/]+)/(?P<plot>[^/]+)/$', consumers.PlotConsumer),
9 ] No newline at end of file
@@ -1,4 +1,4
1 FROM python:2.7-slim
1 FROM python:3-slim
2 2 RUN mkdir /app
3 3 WORKDIR /app
4 4 ADD requirements.txt ./requirements.txt
@@ -45,7 +45,7 services:
45 45
46 46 mongo:
47 47 container_name: 'realtime_mongo'
48 image: 'mongo:3.3'
48 image: 'mongo:4.0'
49 49 command: '--storageEngine wiredTiger'
50 50 ports:
51 51 - '127.0.0.1:27017:27017'
@@ -6,36 +6,131 import json
6 6 from datetime import datetime, timedelta
7 7
8 8 from pymongo import MongoClient
9 import mongoengine
10 from asgiref.sync import async_to_sync
11 from channels.generic.websocket import WebsocketConsumer
12
13 from plotter.models import Experiment, ExpDetail, PlotMeta, PlotData
9 14
10 from channels.handler import AsgiHandler
11 from channels.auth import channel_session_user
12 from channels import Group
13 15 # Here we create the db named "dbplots"
14 16 host = os.environ.get('HOST_MONGO', 'localhost')
15 CLIENT = MongoClient('{}:27017'.format(host))
16 DB = CLIENT['dbplots']
17
18 # Connected to websocket.connect
19 def ws_connect(message, id, code, plot):
20
21 if id == 'main':
22 Group('main').add(message.reply_channel)
23 print('New main connection')
24 elif id == 'realtime':
25 Group('{}_{}'.format(code, plot)).add(message.reply_channel)
26 print('New connection from: {}, Group: {}_{}'.format(message.content['client'][0], code, plot))
17 mongoengine.connect('dbplots', host=host, port=27017)
18
19 # CLIENT = MongoClient('{}:27017'.format(host))
20 # DB = CLIENT['dbplots']
21
22
23 class MainConsumer(WebsocketConsumer):
24
25 def connect(self):
26 self.group_name = 'main'
27 async_to_sync(self.channel_layer.group_add)(
28 self.group_name,
29 self.channel_name
30 )
31 self.accept()
32
33 def disconnect(self, close_code):
34 async_to_sync(self.channel_layer.group_discard)(
35 self.group_name,
36 self.channel_name
37 )
38
39 def receive(self, text_data):
40 pass
41
42 def zmq_message(self, event):
43 # Send message to WebSocket
44 self.send(text_data=event['message'])
45
46 class PlotConsumer(WebsocketConsumer):
47
48 def connect(self):
49
50 if 'realtime' in self.scope['path']:
51 self.realtime = True
52 self.group_name = '{}_{}'.format(
53 self.scope['url_route']['kwargs']['code'],
54 self.scope['url_route']['kwargs']['plot'],
55 )
56
57 async_to_sync(self.channel_layer.group_add)(
58 self.group_name,
59 self.channel_name
60 )
61 else:
62 self.realtime = False
63 self.accept()
64
65 def disconnect(self, close_code):
66
67 if self.realtime:
68 async_to_sync(self.channel_layer.group_discard)(
69 self.group_name,
70 self.channel_name
71 )
72
73 def receive(self, text_data):
74 ret = {}
75 dt = datetime.strptime(text_data, '%d-%m-%Y')
76 code = self.scope['url_route']['kwargs']['code']
77 plot = self.scope['url_route']['kwargs']['plot']
78 # exp = DB.experiment.find_one({'code': int(code)})
79 # det0 = DB.exp_detail.find_one({'experiment': exp['_id'], 'date': dt-timedelta(days=1)})
80 # det1 = DB.exp_detail.find_one({'experiment': exp['_id'], 'date': dt})
81 exp = Experiment.objects.get(code=code)
82 det0 = ExpDetail.objects(experiment=exp, date=dt-timedelta(days=1))
83 det1 = ExpDetail.objects(experiment=exp, date=dt)
84
85 if det1:
86 meta1 = PlotMeta.objects(exp_detail=det1[0], plot=plot)
87 if meta1:
88 if plot == 'spc':
89 datas = PlotData.objects(plot=meta1[0]).order_by('-time').first()
90 ret['time'] = [datas['time']]
91 ret['spc'] = datas['data']
92 ret['meta'] = dict(meta1[0].metadata)
93 meta = PlotMeta.objects(exp_detail=det1[0], plot='rti')
94 if meta:
95 data = PlotData.objects(plot=meta[0], time=ret['time'][0])
96 if data:
97 ret['rti'] = data[0]['data']
98
99 meta = PlotMeta.objects(exp_detail=det1[0], plot='noise')
100 if meta:
101 data = PlotData.objects(plot=meta[0], time=ret['time'][0])
102 if data:
103 ret['meta']['titles'] = ['{} dB'.format(x) for x in data[0]['data']]
27 104 else:
28 print('New connection from: {}, history, id: {}'.format(message.content['client'][0], id))
29 message.reply_channel.send({
30 'accept': True
31 })
105 last = det1[0]['last_time']
106 metas = [meta1[0]]
107 if det0:
108 meta0 = PlotMeta.objects(exp_detail=det0[0], plot=plot)
109 if meta0:
110 metas.append(meta0[0])
111 datas = PlotData.objects(plot__in=metas, time__gt=last-12*60*60).limit(720)
112 dum = [(d['time'], d['data']) for d in datas]
113 ret['time'] = [d[0] for d in dum]
114 dum = [d[1] for d in dum]
115 ret[plot] = [t for t in map(list, list(zip(*dum)))]
116 ret['meta'] = metas[0].metadata
117
118 # exp.pop('date', None)
119 # exp.pop('_id', None)
120 self.send(json.dumps(ret))
121 else:
122 self.send(json.dumps({'interval': 0}))
123
124 def zmq_message(self, event):
125 # Send message to WebSocket
126 self.send(text_data=event['message'])
32 127
33 128 def ws_message(message, id, code, plot):
34 129 # Accept the incoming connection
35 130 dt = datetime.strptime(str(json.loads(message.content['text'])['date']), '%d/%m/%Y')
36 131 exp0 = DB.exp_meta.find_one({'code': int(code), 'date': dt-timedelta(days=1)})
37 132 exp = DB.exp_meta.find_one({'code': int(code), 'date': dt})
38 print('New request for id={}'.format(id))
133 print(('New request for id={}'.format(id)))
39 134 if exp and plot in exp['plots']:
40 135 if plot == 'spc':
41 136 datas = DB.exp_data.find({'expmeta': exp['_id']}, ['time', 'data']).sort('time', -1).limit(1)[0]
@@ -68,14 +163,10 def ws_message(message, id, code, plot):
68 163 dum = [(d['time'], d['data'][plot]) for d in datas]
69 164 exp['time'] = [d[0] for d in dum]
70 165 dum = [d[1] for d in dum]
71 exp[plot] = [t for t in map(list, zip(*dum))]
166 exp[plot] = [t for t in map(list, list(zip(*dum)))]
72 167
73 168 exp.pop('date', None)
74 169 exp.pop('_id', None)
75 170 message.reply_channel.send({'text': json.dumps(exp)})
76 171 else:
77 172 message.reply_channel.send({'text': json.dumps({'interval': 0})})
78
79 # Connected to websocket.disconnect
80 def ws_disconnect(message, id, code, plot):
81 Group('{}_{}'.format(code, plot)).discard(message.reply_channel)
@@ -1,33 +1,31
1 1 # -*- coding: utf-8 -*-
2 from __future__ import unicode_literals
3 2
4 3 from django.db import models
5 from mongoengine import *
4 from mongoengine import Document, IntField, FloatField, StringField, DictField, ListField, DateTimeField, ReferenceField
6 5
7 6 class Experiment(Document):
8 7 code = IntField(unique=True)
9 8 name = StringField(max_length=40)
10 9
11 class ExpMeta(Document):
12 code = IntField()
10 class ExpDetail(Document):
11 experiment = ReferenceField(Experiment)
13 12 date = DateTimeField()
14 pairs = ListField(default=list)
15 yrange = ListField(FloatField())
16 xrange = ListField(FloatField())
17 interval = FloatField()
18 plots = ListField(StringField())
19 localtime = BooleanField()
13 last_time = FloatField()
20 14
21 meta = {
22 'indexes': [[("code", 1), ("date", 1)]]
23 }
15 def plots(self):
16 return PlotMeta.objects(exp_detail=self)
17
18 class PlotMeta(Document):
19 exp_detail = ReferenceField(ExpDetail)
20 metadata = DictField()
21 plot = StringField()
24 22
25 class ExpData(Document):
26 expmeta = LazyReferenceField(ExpMeta)
23 class PlotData(Document):
24 plot = ReferenceField(PlotMeta)
27 25 time = FloatField()
28 data = DictField()
26 data = ListField()
29 27
30 28 meta = {
31 'indexes': ["expmeta", "+time"]
29 'indexes': ["plot", "+time"]
32 30 }
33 31
@@ -27,7 +27,7 class PcolorBuffer {
27 27 this.lastFunc = null;
28 28 this.zbuffer = [];
29 29 this.xbuffer = [];
30 this.empty = Array(data.yrange.length).fill(null);
30 this.empty = Array(data.meta.yrange.length).fill(null);
31 31 this.props = props;
32 32 this.setup(data);
33 33 }
@@ -37,7 +37,7 class PcolorBuffer {
37 37 if (data.time.length == 1) {
38 38 var values = { 'time': data.time, 'data': data[this.key].map(function (x) { return [x] }) };
39 39 } else {
40 var values = this.fill_gaps(data.time, data[this.key], data.interval, data[this.key].length);
40 var values = this.fill_gaps(data.time, data[this.key], data.meta.interval, data[this.key].length);
41 41 }
42 42 var t = values.time.map(function (x) {
43 43 var a = new Date(x * 1000);
@@ -277,7 +277,6 class Pcolor {
277 277 title: 'Velocity',
278 278 showgrid: false,
279 279 zeroline: false,
280 domain: [0, 0.7],
281 280 linewidth: 2,
282 281 mirror: true,
283 282 size: 12,
@@ -288,17 +287,8 class Pcolor {
288 287 linewidth: 2,
289 288 mirror: 'all',
290 289 size: 12,
291 range: [data.yrange[0], data.yrange.slice(-1)[0]],
290 //range: [data.meta.yrange[0], data.meta.yrange.slice(-1)[0]],
292 291 },
293 xaxis2: {
294 title: 'dB',
295 domain: [0.75, 1],
296 ticks: 'outside',
297 linewidth: 2,
298 mirror: true,
299 size: 12,
300 },
301
302 292 titlefont: {
303 293 size: 14
304 294 },
@@ -314,32 +304,51 class Pcolor {
314 304 this.div.appendChild(iDiv);
315 305 var trace1 = {
316 306 z: data.spc[i],
317 x: data.xrange,
307 y: data.meta.yrange,
308 x: data.meta.xrange,
318 309 colorscale: this.props.colormap || 'Jet',
319 310 transpose: true,
320 311 type: 'heatmap'
321 312 };
322 313
314 if ('rti' in data){
315 layout.xaxis.domain = [0, 0.7];
316 layout.xaxis2 = {
317 title: 'dB',
318 domain: [0.75, 1],
319 ticks: 'outside',
320 linewidth: 2,
321 mirror: true,
322 size: 12,
323 };
323 324 var trace2 = {
324 325 x: data.rti[i],
326 y: data.meta.yrange,
325 327 xaxis: 'x2',
326 328 type: 'scatter',
327 329 };
330 }
328 331
329 332 if (this.props.zmin) {
330 333 trace1.zmin = this.props.zmin
331 334 }
332 335 if (this.props.zmax) {
333 336 trace1.zmax = this.props.zmax;
337 if ('rti' in data){
334 338 layout.xaxis2.range = [this.props.zmin, this.props.zmax]
335 339 }
340 }
336 341
337 342 var t = new Date(data.time * 1000);
338 343 // This condition is used to change from UTC to LT
339 344 if (data.localtime == true){
340 345 t.setTime( t.getTime() + t.getTimezoneOffset()*60*1000 );
341 346 }
342 layout.title = 'Ch ' + i + ': ' + data.noise[i] + 'dB - ' + t.toLocaleString();
347 if ('titles' in data.meta){
348 layout.title = 'Ch ' + i + ': ' + data.meta.titles[i] + ' ' + t.toLocaleString();
349 }else{
350 layout.title = 'Ch ' + i + ': ' + t.toLocaleString();
351 }
343 352 var conf = {
344 353 modeBarButtonsToRemove: ['sendDataToCloud', 'autoScale2d', 'hoverClosestCartesian', 'hoverCompareCartesian', 'lasso2d', 'select2d', 'zoomIn2d', 'zoomOut2d', 'toggleSpikelines'],
345 354 modeBarButtonsToAdd: [{
@@ -354,7 +363,12 class Pcolor {
354 363 displaylogo: false,
355 364 showTips: true
356 365 };
357 Plotly.newPlot('plot-' + i, [trace1, trace2], layout, conf);
366 if ('rti' in data){
367 var traces = [trace1, trace2]
368 }else{
369 var traces = [trace1]
370 }
371 Plotly.newPlot('plot-' + i, traces, layout, conf);
358 372 }
359 373 }
360 374
@@ -445,7 +459,7 class Scatter {
445 459 if (data.time.length == 1) {
446 460 var values = { 'time': data.time, 'data': data[this.key] };
447 461 } else {
448 var values = this.fill_gaps(data.time, data[this.key], data.interval, data[this.key].length);
462 var values = this.fill_gaps(data.time, data[this.key], data.meta.interval, data[this.key].length);
449 463 }
450 464
451 465 var t = values.time.map(function (x) {
@@ -549,10 +563,12 class Scatter {
549 563 for (var i = 1; i < xBuffer.length; i++) {
550 564 var cnt = 0;
551 565 last = x.slice(-1)[0];
566 console.log(Math.abs(parseFloat(xBuffer[i]) - last) + ' '+ parseFloat(interval));
552 567 while (Math.abs(parseFloat(xBuffer[i]) - last) > 1.5 * parseFloat(interval)) {
553 568 cnt += 1;
554 569 last = last + interval;
555 570 x.push(last);
571 console.log('missing ' + new Date(last*1000));
556 572 for (var j = 0; j < N; j++) {
557 573 y[j].push(null);
558 574 }
@@ -24,17 +24,18
24 24
25 25 /* This part create a new socket named "socket" to comunicate
26 26 if there is new data we could be able to change some attributes of a class*/
27 var socket = new WebSocket('ws://' + window.location.host +'/main/9999/none/');
27 var socket = new WebSocket('ws://' + window.location.host +'/ws/main/');
28 28 socket.onopen = function open() {
29 29 console.log('Main WebSockets connection created.');
30 30 };
31 31
32 socket.onmessage = function message(event) {
32 socket.onmessage = function(event) {
33 33 var data = JSON.parse(event.data);
34 34 console.log(data);
35 code = data.code;
36 value = data.value;
37 time = moment(new Date(data.time*1000)).format('hh:mm:ss a');
35 var code = data['code'];
36 console.log(code);
37 var value = data['value'];
38 var time = moment(new Date(data['time']*1000)).format('hh:mm:ss a');
38 39
39 40 /*This conditional ask if value(send by server) is online, and if it is then
40 41 change value to online in div with id="#alert_"+code*/
@@ -39,13 +39,13
39 39 or just update the last data*/
40 40 $("#loader").css("display", "block");
41 41 {% if realtime %}
42 var socket = new WebSocket('ws://' + window.location.host + '/realtime/{{code}}/{{plot}}/');
42 var socket = new WebSocket('ws://' + window.location.host + '/ws/realtime/{{code}}/{{plot}}/');
43 43 {% else %}
44 var socket = new WebSocket('ws://' + window.location.host + '/{{id}}/{{code}}/{{plot}}/');
44 var socket = new WebSocket('ws://' + window.location.host + '/ws/database/{{code}}/{{plot}}/');
45 45 {% endif %}
46 46 socket.onopen = function open() {
47 47 console.log('WebSockets connection created: ' + socket.url);
48 socket.send('{"date": "{{date}}"}')
48 socket.send('{{date}}')
49 49 };
50 50
51 51 socket.onmessage = function message(event) {
@@ -1,6 +1,6
1 1 #!/usr/bin/python
2 2 # -*- coding: UTF-8 -*-
3 from __future__ import unicode_literals
3
4 4
5 5 import os
6 6 import time
@@ -13,7 +13,7 from django.shortcuts import render
13 13
14 14 import mongoengine
15 15
16 from plotter.models import Experiment, ExpMeta, ExpData
16 from plotter.models import Experiment, ExpDetail, PlotMeta, PlotData
17 17
18 18 host = os.environ.get('HOST_MONGO', 'localhost')
19 19 mongoengine.connect('dbplots', host=host, port=27017)
@@ -68,38 +68,37 class SPCSetupForm(forms.Form):
68 68 def main(request):
69 69
70 70 kwargs = {}
71 date = request.GET.get('date', datetime.now().strftime('%d/%m/%Y'))
72 exps = ExpMeta.objects.filter(date=datetime.strptime(date, '%d/%m/%Y'))
71 date = request.GET.get('date', datetime.now().strftime('%d-%m-%Y'))
72 exps = ExpDetail.objects(date=datetime.strptime(date, '%d-%m-%Y'))
73 73 experiments = []
74 74
75 75 for exp in exps:
76 76 dum = {}
77 dum['code'] = exp.code
78 dum['plots'] = exp.plots
79 dum['name'] = Experiment.objects.get(code=exp.code).name
77 dum['code'] = exp.experiment.code
78 dum['plots'] = [plot.plot for plot in exp.plots()]
79 dum['name'] = exp.experiment.name
80 80 dt = datetime.now()
81 data = ExpData.objects(expmeta=exp).order_by('-time')[0] #Get the time from the last data
82 81
83 82 t = time.mktime(dt.timetuple())
84 83
85 if exp['localtime'] == True: #Ask which type of time is coming: LT o UTC
84 if exp.plots()[0]['metadata']['localtime'] == True: #Ask which type of time is coming: LT o UTC
86 85 t -= 5*60*60
87 86 # COnditionals to know which state are my clients
88 if (t-data['time']) > 6*exp['interval']:
87 if (t-exp['last_time']) > 10*60:
89 88 status = 'Offline'
90 89 clase = 'alertas-offline'
91 90 style = 'danger'
92 lastDataDate = data['time']
93 elif (t-data['time']) > 3*exp['interval']:
91 lastDataDate = exp['last_time']
92 elif (t-exp['last_time']) > 5*60:
94 93 status = 'Delayed'
95 94 clase = 'alertas-delayed'
96 95 style = 'warning'
97 lastDataDate = data['time']
96 lastDataDate = exp['last_time']
98 97 else:
99 98 status = 'Online'
100 99 clase = 'alertas-online'
101 100 style = 'success'
102 lastDataDate = data['time']
101 lastDataDate = exp['last_time']
103 102
104 103 dum['status'] = status
105 104 dum['class'] = clase
@@ -121,16 +120,17 def plot(request, code=None, plot=None):
121 120 realtime = False
122 121 date = request.GET.get('date', None)
123 122 if date is None:
124 date = datetime.now().strftime('%d/%m/%Y')
123 date = datetime.now().strftime('%d-%m-%Y')
125 124 realtime = True
126 125 exp = Experiment.objects.get(code=int(code))
127 expmeta = ExpMeta.objects.get(code=int(code), date=datetime.strptime(date, '%d/%m/%Y'))
126 detail = ExpDetail.objects.get(experiment=exp, date=datetime.strptime(date, '%d-%m-%Y'))
127 meta = PlotMeta.objects.get(exp_detail=detail, plot=plot)
128 128
129 129 kwargs = {
130 130 'code': code,
131 131 'plot': plot,
132 132 'date': date,
133 'id': expmeta.id,
133 'id': meta.pk,
134 134 'realtime': realtime,
135 135 'title': exp.name,
136 136 }
@@ -1,8 +1,11
1 from channels.routing import route
2 from plotter.consumers import ws_connect, ws_disconnect, ws_message
1 from channels.auth import AuthMiddlewareStack
2 from channels.routing import ProtocolTypeRouter, URLRouter
3 import plotter.routing
3 4
4 channel_routing = [
5 route("websocket.connect", ws_connect, path=r'^/(?P<id>[a-z]+)/(?P<code>[0-9]+)/(?P<plot>[a-z]+)/$'),
6 route("websocket.receive", ws_message, path=r'^/(?P<id>[a-z]+)/(?P<code>[0-9]+)/(?P<plot>[a-z]+)/$'),
7 route("websocket.disconnect", ws_disconnect, path=r'^/(?P<id>[a-z]+)/(?P<code>[0-9]+)/(?P<plot>[a-z]+)/$'),
8 ] No newline at end of file
5 application = ProtocolTypeRouter({
6 'websocket': AuthMiddlewareStack(
7 URLRouter(
8 plotter.routing.websocket_urlpatterns
9 )
10 ),
11 }) No newline at end of file
@@ -127,10 +127,11 host = os.environ.get('HOST_REDIS', '127.0.0.1')
127 127
128 128 CHANNEL_LAYERS = {
129 129 "default": {
130 "BACKEND": "asgi_redis.RedisChannelLayer",
130 'BACKEND': 'channels_redis.core.RedisChannelLayer',
131 131 "CONFIG": {
132 132 "hosts": [(host, 6379)],
133 133 },
134 "ROUTING": "realtime.routing.channel_routing",
135 134 },
136 135 }
136
137 ASGI_APPLICATION = "realtime.routing.application"
@@ -1,10 +1,10
1 asgi-redis==1.4.3
2 Django==1.11.7
1 Django
3 2 django-bootstrap4
4 channels==1.1.8
5 mongoengine==0.15.0
6 pymongo==3.5.1
7 pyzmq==16.0.3
8 redis==2.10.6
9 requests==2.18.4
10 simplejson==3.12.0
3 channels
4 channels_redis
5 mongoengine
6 pymongo
7 pyzmq
8 redis
9 requests
10 simplejson No newline at end of file
@@ -13,17 +13,17 REQUEST_TIMEOUT = 5000
13 13 RETRIES = 5
14 14 SERVER_ENDPOINT = 'tcp://localhost:4444'
15 15
16 context = zmq.Context()
17 socket = context.socket(zmq.REQ)
18 socket.connect (SERVER_ENDPOINT)
19 poll = zmq.Poller()
20 poll.register(socket, zmq.POLLIN)
16
21 17
22 18 def send(dato):
23 19 '''
24 20 Function to send data to server
25 21 '''
26 global socket, poll
22 context = zmq.Context()
23 socket = context.socket(zmq.REQ)
24 socket.connect (SERVER_ENDPOINT)
25 poll = zmq.Poller()
26 poll.register(socket, zmq.POLLIN)
27 27 retries = RETRIES
28 28 while True:
29 29 socket.send_json(dato)
@@ -31,12 +31,12 def send(dato):
31 31 if socks.get(socket) == zmq.POLLIN:
32 32 reply = socket.recv_string()
33 33 if reply == 'ok':
34 print('Server replied (%s)' % reply)
34 print(('Server replied (%s)' % reply))
35 35 break
36 36 else:
37 print('Malformed reply from server: %s' % reply)
37 print(('Malformed reply from server: %s' % reply))
38 38 else:
39 print('No response from server, retries left {}'.format(retries))
39 print(('No response from server, retries left {}'.format(retries)))
40 40 socket.setsockopt(zmq.LINGER, 0)
41 41 socket.close()
42 42 poll.unregister(socket)
@@ -62,25 +62,31 def main(realtime, code, date=None, interval=30):
62 62 else:
63 63 dt = date
64 64
65 data = {
66 'spc': np.round(np.random.rand(4, 60, 100)*5 + 10, 2).tolist(),
67 'rti': np.round(np.random.rand(4, 100)*5 + 10, 2).tolist(),
68 'noise': np.round(np.random.rand(4) + np.array([10,11,12,13]), 2).tolist()
69 }
70
65 71 while True:
66 72
67 print('Sending {} - {}'.format(code, dt))
73 print(('Sending {} - {}'.format(code, dt)))
68 74
69 75 dato = {
70 76 'time': time.mktime(dt.timetuple()),
71 'yrange': np.arange(100).tolist(),
77 'metadata':{
78 'yrange': np.arange(80, 120, 40/100.).tolist(),
72 79 'xrange': np.arange(-30, 30).tolist(),
73 80 'localtime': True,
74 'interval': interval,
81 'interval': interval
82 },
75 83 'exp_code': code,
76 'data': {
77 'noise': np.round(np.random.rand(4) + np.array([10,11,12,13]), 2).tolist(),
78 'rti': np.round(np.random.rand(4, 100)*5 + 10, 2).tolist(),
79 'spc': np.round(np.random.rand(4, 60, 100)*5 + 10, 2).tolist(),
80 }
81 84 }
82 85
83 86 dt = dt + timedelta(seconds=interval)
87 for plot, d in data.items():
88 dato['plot'] = plot
89 dato['data'] = d
84 90 t = Thread(target=send, args=(dato, ))
85 91 t.start()
86 92 if realtime:
@@ -92,7 +98,7 if __name__ == '__main__':
92 98 parser = argparse.ArgumentParser(description='This is a Client for realtime app')
93 99 parser.add_argument('--date', action='store', default=None, help='format: 2018/02/13 12:23:00')
94 100 parser.add_argument('-r', action='store_true', dest='realtime', default=None)
95 parser.add_argument('-c', action='store', dest='code', default='170')
101 parser.add_argument('-c', action='store', dest='code', default='172')
96 102 parser.add_argument('-i', action='store', dest='interval', type=int, default=30)
97 103
98 104 results = parser.parse_args()
@@ -100,5 +106,5 if __name__ == '__main__':
100 106 try:
101 107 results.date = datetime.strptime(results.date, '%Y/%m/%d %H:%M:%S')
102 108 except:
103 raise(NameError('You must specify a date (--date) for non-realtime experiment'))
109 raise NameError('You must specify a date (--date) for non-realtime experiment')
104 110 main(results.realtime, results.code, results.date, results.interval)
@@ -8,21 +8,21 import simplejson
8 8 from datetime import datetime
9 9 import time
10 10 import zmq
11 import redis
12 import asgi_redis
13 11 import mongoengine
14 12 from threading import Thread
15 13
16 14 sys.path.append(os.environ.get('APP_DIR', '../'))
17 15 os.environ.setdefault("DJANGO_SETTINGS_MODULE", "realtime.settings")
18 16
19 from plotter.models import Experiment, ExpMeta, ExpData
17 from plotter.models import Experiment, ExpDetail, PlotMeta, PlotData
20 18
21 19 host_mongo = os.environ.get('HOST_MONGO', 'localhost')
22 20 mongoengine.connect('dbplots', host=host_mongo, port=27017)
23 21
24 host_redis = os.environ.get('HOST_REDIS', 'localhost')
25 channel = asgi_redis.RedisChannelLayer(hosts=[(host_redis, 6379)])
22 import channels.layers
23 from asgiref.sync import async_to_sync
24
25 channel = channels.layers.get_channel_layer()
26 26
27 27 def loaddata():
28 28 print('Loading Experiments...')
@@ -31,7 +31,7 def loaddata():
31 31 else:
32 32 file_exp = './experiments.json'
33 33 for tup in json.load(open(file_exp)):
34 print(tup['name'])
34 print(tup)
35 35 exp = Experiment.objects(code=tup['code']).modify(
36 36 upsert=True, # To add a new row
37 37 new=True,
@@ -43,29 +43,31 def loaddata():
43 43 #============== funcion para modificar datos en la tabla ==============
44 44 def update(buffer):
45 45 dt = datetime.utcfromtimestamp(buffer['time'])
46 print('Updating code={} date={} {}'.format(buffer['exp_code'], dt, datetime.now()))
47 exp = ExpMeta.objects(code=buffer['exp_code'], date=dt.date()).modify(
48 upsert=True, # To add a new row
46 exp = Experiment.objects.get(code=buffer['exp_code'])
47
48 detail = ExpDetail.objects(experiment=exp, date=dt.date()).modify(
49 upsert=True,
49 50 new=True,
50 set__code=buffer['exp_code'],
51 set__experiment=exp,
51 52 set__date=dt.date(),
52 set__yrange = buffer['yrange'],
53 set__xrange = buffer['xrange'],
54 set__interval = buffer['interval'],
55 set__localtime = buffer['localtime'],
56 set__plots = buffer['data'].keys()
53 set__last_time = buffer['time']
57 54 )
58 exp.save()
59 55
60 data = ExpData.objects(expmeta=exp, time=buffer['time']).modify(
56 plot = PlotMeta.objects(exp_detail=detail, plot=buffer['plot']).modify(
57 upsert=True,
58 new=True,
59 set__metadata = buffer['metadata']
60 )
61 #plot.save()
62
63 data = PlotData.objects(plot=plot, time=buffer['time']).modify(
61 64 upsert=True, # To add a new row
62 65 new=True,
63 set__expmeta = exp,
64 66 set__time = buffer['time'],
65 67 set__data = buffer['data']
66 68 )
67 69
68 data.save()
70 #data.save()
69 71
70 72 if datetime.now().date() == dt.date():
71 73 return True
@@ -77,28 +79,44 def check_times():
77 79
78 80 while True:
79 81 dt = datetime.now()
80 exps = ExpMeta.objects(date=dt.date())
81
82 for exp in exps:
83 data = ExpData.objects(expmeta=exp).order_by('-time')[0] #me quedo con el ultimo time del ultimo valor
82 exps = ExpDetail.objects(date=dt.date())
84 83
84 for detail in exps:
85 code = detail.experiment.code
86 plot = detail.plots()[0]
87 data_time = detail['last_time']
85 88 t = time.mktime(dt.timetuple())
86 89
87 if exp['localtime'] == True: #Consulto que tipode time me esta llegando: LT o UTC
90 if plot['metadata']['localtime'] == True:
88 91 t -= 5*60*60
89 92
90 if exp['localtime'] == True: #Consulto que tipode time me esta llegando: LT o UTC
91 data_time = data['time'] + 5*60*60
93 if plot['metadata']['localtime'] == True:
94 data_time = detail['last_time'] + 5*60*60
92 95
93 if (t-data['time']) > 6*exp['interval']:
94 channel.send_group(u'main', {'text': json.dumps({'code': exp['code'], 'value': 'danger', 'time': data_time})})
95 print ('{} {} {} {} {}'.format(exp.code, t, data['time'], (t-data['time']), 'offline'))
96 elif (t-data['time']) > 3*exp['interval']:
97 channel.send_group(u'main', {'text': json.dumps({'code': exp['code'], 'value': 'warning', 'time': data_time})})
98 print ('{} {} {} {} {}'.format(exp.code, t, data['time'], (t-data['time']), 'delayed'))
96 message = {
97 'code': code,
98 'time': data_time
99 }
100
101 if (t-detail['last_time']) > 10*60:
102 value = 'danger'
103 print(('{} {} {} {} {}'.format(code, t, detail['last_time'], (t-detail['last_time']), 'offline')))
104 elif (t-detail['last_time']) > 5*60:
105 value = 'warning'
106 print(('{} {} {} {} {}'.format(code, t, detail['last_time'], (t-detail['last_time']), 'delayed')))
99 107 else:
100 channel.send_group(u'main', {'text': json.dumps({'code': exp['code'], 'value': 'success', 'time': data_time})})
101 print ('{} {} {} {} {}'.format(exp.code, t, data['time'], (t-data['time']), 'online'))
108 value = 'success'
109
110 message['value'] = value
111
112 async_to_sync(channel.group_send)(
113 'main',
114 {
115 'type': 'zmq_message',
116 'message': json.dumps(message)
117 }
118 )
119
102 120 time.sleep(30)
103 121
104 122 def main():
@@ -113,7 +131,7 def main():
113 131
114 132 buffer = receiver.recv_json()
115 133
116 if buffer['localtime'] == True: # Ask which type of time is coming: LT o UTC
134 if buffer['metadata']['localtime'] == True: # Ask which type of time is coming: LT o UTC
117 135 buffer['time'] -= 5*60*60
118 136
119 137 if not isinstance(buffer, dict):
@@ -123,25 +141,29 def main():
123 141 if not update(buffer):
124 142 receiver.send_string('ok')
125 143 continue
126 print("==================================")
127 for plot in buffer['data']:
128 dum = buffer.copy()
129 dum['time'] = [buffer['time']]
130 if plot=='noise':
131 dum[plot] = [[x] for x in buffer['data'][plot]]
132 elif plot=='spc':
133 dum['noise'] = [[x] for x in buffer['data']['noise']]
134 dum['spc'] = buffer['data']['spc']
135 dum['rti'] = buffer['data']['rti']
136 else:
137 dum[plot] = buffer['data'][plot]
138 dum.pop('data')
139 exp_code = dum.pop('exp_code')
140 channel.send_group(
141 u'{}_{}'.format(exp_code, plot),
142 {'text': simplejson.dumps(dum, ignore_nan=True)}
143 )
144 print('Sending to group {}_{} - {} bytes'.format(exp_code, plot, len(str(dum))))
144 # for plot in buffer['data']:
145 # dum = buffer.copy()
146 # dum['time'] = [buffer['time']]
147 # if plot=='noise':
148 # dum[plot] = [[x] for x in buffer['data'][plot]]
149 # elif plot=='spc':
150 # dum['noise'] = [[x] for x in buffer['data']['noise']]
151 # dum['spc'] = buffer['data']['spc']
152 # dum['rti'] = buffer['data']['rti']
153 # else:
154 # dum[plot] = buffer['data'][plot]
155 # dum.pop('data')
156 # exp_code = dum.pop('exp_code')
157 # group = '{}_{}'.format(exp_code, plot)
158 # async_to_sync(channel.group_send)(
159 # group,
160 # {
161 # 'type': 'zmq_message',
162 # 'message': simplejson.dumps(dum, ignore_nan=True)
163 # }
164 # )
165
166 # print('Sending to group {}_{} - {} bytes'.format(exp_code, plot, len(str(dum))))
145 167
146 168 receiver.send_string('ok')
147 169
General Comments 0
You need to be logged in to leave comments. Login now