diff --git a/Dockerfile b/Dockerfile index 8c6548e..d6dc0c1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ - FROM python:2.7-slim + FROM python:3-slim RUN mkdir /app WORKDIR /app ADD requirements.txt ./requirements.txt diff --git a/docker-compose.yml b/docker-compose.yml index 2fec989..5b4bd2a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -45,7 +45,7 @@ services: mongo: container_name: 'realtime_mongo' - image: 'mongo:3.3' + image: 'mongo:4.0' command: '--storageEngine wiredTiger' ports: - '127.0.0.1:27017:27017' diff --git a/plotter/consumers.py b/plotter/consumers.py index 58a111e..b17f8c7 100644 --- a/plotter/consumers.py +++ b/plotter/consumers.py @@ -6,36 +6,131 @@ import json from datetime import datetime, timedelta from pymongo import MongoClient +import mongoengine +from asgiref.sync import async_to_sync +from channels.generic.websocket import WebsocketConsumer + +from plotter.models import Experiment, ExpDetail, PlotMeta, PlotData -from channels.handler import AsgiHandler -from channels.auth import channel_session_user -from channels import Group # Here we create the db named "dbplots" host = os.environ.get('HOST_MONGO', 'localhost') -CLIENT = MongoClient('{}:27017'.format(host)) -DB = CLIENT['dbplots'] - -# Connected to websocket.connect -def ws_connect(message, id, code, plot): - - if id == 'main': - Group('main').add(message.reply_channel) - print('New main connection') - elif id == 'realtime': - Group('{}_{}'.format(code, plot)).add(message.reply_channel) - print('New connection from: {}, Group: {}_{}'.format(message.content['client'][0], code, plot)) - else: - print('New connection from: {}, history, id: {}'.format(message.content['client'][0], id)) - message.reply_channel.send({ - 'accept': True - }) +mongoengine.connect('dbplots', host=host, port=27017) + +# CLIENT = MongoClient('{}:27017'.format(host)) +# DB = CLIENT['dbplots'] + + +class MainConsumer(WebsocketConsumer): + + def connect(self): + self.group_name = 'main' + async_to_sync(self.channel_layer.group_add)( + self.group_name, + self.channel_name + ) + self.accept() + + def disconnect(self, close_code): + async_to_sync(self.channel_layer.group_discard)( + self.group_name, + self.channel_name + ) + + def receive(self, text_data): + pass + + def zmq_message(self, event): + # Send message to WebSocket + self.send(text_data=event['message']) + +class PlotConsumer(WebsocketConsumer): + + def connect(self): + + if 'realtime' in self.scope['path']: + self.realtime = True + self.group_name = '{}_{}'.format( + self.scope['url_route']['kwargs']['code'], + self.scope['url_route']['kwargs']['plot'], + ) + + async_to_sync(self.channel_layer.group_add)( + self.group_name, + self.channel_name + ) + else: + self.realtime = False + self.accept() + + def disconnect(self, close_code): + + if self.realtime: + async_to_sync(self.channel_layer.group_discard)( + self.group_name, + self.channel_name + ) + + def receive(self, text_data): + ret = {} + dt = datetime.strptime(text_data, '%d-%m-%Y') + code = self.scope['url_route']['kwargs']['code'] + plot = self.scope['url_route']['kwargs']['plot'] + # exp = DB.experiment.find_one({'code': int(code)}) + # det0 = DB.exp_detail.find_one({'experiment': exp['_id'], 'date': dt-timedelta(days=1)}) + # det1 = DB.exp_detail.find_one({'experiment': exp['_id'], 'date': dt}) + exp = Experiment.objects.get(code=code) + det0 = ExpDetail.objects(experiment=exp, date=dt-timedelta(days=1)) + det1 = ExpDetail.objects(experiment=exp, date=dt) + + if det1: + meta1 = PlotMeta.objects(exp_detail=det1[0], plot=plot) + if meta1: + if plot == 'spc': + datas = PlotData.objects(plot=meta1[0]).order_by('-time').first() + ret['time'] = [datas['time']] + ret['spc'] = datas['data'] + ret['meta'] = dict(meta1[0].metadata) + meta = PlotMeta.objects(exp_detail=det1[0], plot='rti') + if meta: + data = PlotData.objects(plot=meta[0], time=ret['time'][0]) + if data: + ret['rti'] = data[0]['data'] + + meta = PlotMeta.objects(exp_detail=det1[0], plot='noise') + if meta: + data = PlotData.objects(plot=meta[0], time=ret['time'][0]) + if data: + ret['meta']['titles'] = ['{} dB'.format(x) for x in data[0]['data']] + else: + last = det1[0]['last_time'] + metas = [meta1[0]] + if det0: + meta0 = PlotMeta.objects(exp_detail=det0[0], plot=plot) + if meta0: + metas.append(meta0[0]) + datas = PlotData.objects(plot__in=metas, time__gt=last-12*60*60).limit(720) + dum = [(d['time'], d['data']) for d in datas] + ret['time'] = [d[0] for d in dum] + dum = [d[1] for d in dum] + ret[plot] = [t for t in map(list, list(zip(*dum)))] + ret['meta'] = metas[0].metadata + + # exp.pop('date', None) + # exp.pop('_id', None) + self.send(json.dumps(ret)) + else: + self.send(json.dumps({'interval': 0})) + + def zmq_message(self, event): + # Send message to WebSocket + self.send(text_data=event['message']) def ws_message(message, id, code, plot): # Accept the incoming connection dt = datetime.strptime(str(json.loads(message.content['text'])['date']), '%d/%m/%Y') exp0 = DB.exp_meta.find_one({'code': int(code), 'date': dt-timedelta(days=1)}) exp = DB.exp_meta.find_one({'code': int(code), 'date': dt}) - print('New request for id={}'.format(id)) + print(('New request for id={}'.format(id))) if exp and plot in exp['plots']: if plot == 'spc': datas = DB.exp_data.find({'expmeta': exp['_id']}, ['time', 'data']).sort('time', -1).limit(1)[0] @@ -68,14 +163,10 @@ def ws_message(message, id, code, plot): dum = [(d['time'], d['data'][plot]) for d in datas] exp['time'] = [d[0] for d in dum] dum = [d[1] for d in dum] - exp[plot] = [t for t in map(list, zip(*dum))] + exp[plot] = [t for t in map(list, list(zip(*dum)))] exp.pop('date', None) exp.pop('_id', None) message.reply_channel.send({'text': json.dumps(exp)}) else: message.reply_channel.send({'text': json.dumps({'interval': 0})}) - -# Connected to websocket.disconnect -def ws_disconnect(message, id, code, plot): - Group('{}_{}'.format(code, plot)).discard(message.reply_channel) diff --git a/plotter/models.py b/plotter/models.py index fe12117..68d1ac2 100644 --- a/plotter/models.py +++ b/plotter/models.py @@ -1,33 +1,31 @@ # -*- coding: utf-8 -*- -from __future__ import unicode_literals from django.db import models -from mongoengine import * +from mongoengine import Document, IntField, FloatField, StringField, DictField, ListField, DateTimeField, ReferenceField class Experiment(Document): code = IntField(unique=True) name = StringField(max_length=40) -class ExpMeta(Document): - code = IntField() +class ExpDetail(Document): + experiment = ReferenceField(Experiment) date = DateTimeField() - pairs = ListField(default=list) - yrange = ListField(FloatField()) - xrange = ListField(FloatField()) - interval = FloatField() - plots = ListField(StringField()) - localtime = BooleanField() + last_time = FloatField() - meta = { - 'indexes': [[("code", 1), ("date", 1)]] - } + def plots(self): + return PlotMeta.objects(exp_detail=self) + +class PlotMeta(Document): + exp_detail = ReferenceField(ExpDetail) + metadata = DictField() + plot = StringField() -class ExpData(Document): - expmeta = LazyReferenceField(ExpMeta) +class PlotData(Document): + plot = ReferenceField(PlotMeta) time = FloatField() - data = DictField() + data = ListField() meta = { - 'indexes': ["expmeta", "+time"] + 'indexes': ["plot", "+time"] } diff --git a/plotter/routing.py b/plotter/routing.py new file mode 100644 index 0000000..45c8498 --- /dev/null +++ b/plotter/routing.py @@ -0,0 +1,9 @@ +from django.conf.urls import url + +from . import consumers + +websocket_urlpatterns = [ + url(r'^ws/main/$', consumers.MainConsumer), + url(r'^ws/realtime/(?P[^/]+)/(?P[^/]+)/$', consumers.PlotConsumer), + url(r'^ws/database/(?P[^/]+)/(?P[^/]+)/$', consumers.PlotConsumer), +] \ No newline at end of file diff --git a/plotter/static/js/jroplots.js b/plotter/static/js/jroplots.js index 6c0cfb0..8919e2f 100644 --- a/plotter/static/js/jroplots.js +++ b/plotter/static/js/jroplots.js @@ -27,7 +27,7 @@ class PcolorBuffer { this.lastFunc = null; this.zbuffer = []; this.xbuffer = []; - this.empty = Array(data.yrange.length).fill(null); + this.empty = Array(data.meta.yrange.length).fill(null); this.props = props; this.setup(data); } @@ -37,7 +37,7 @@ class PcolorBuffer { if (data.time.length == 1) { var values = { 'time': data.time, 'data': data[this.key].map(function (x) { return [x] }) }; } else { - var values = this.fill_gaps(data.time, data[this.key], data.interval, data[this.key].length); + var values = this.fill_gaps(data.time, data[this.key], data.meta.interval, data[this.key].length); } var t = values.time.map(function (x) { var a = new Date(x * 1000); @@ -277,7 +277,6 @@ class Pcolor { title: 'Velocity', showgrid: false, zeroline: false, - domain: [0, 0.7], linewidth: 2, mirror: true, size: 12, @@ -288,17 +287,8 @@ class Pcolor { linewidth: 2, mirror: 'all', size: 12, - range: [data.yrange[0], data.yrange.slice(-1)[0]], + //range: [data.meta.yrange[0], data.meta.yrange.slice(-1)[0]], }, - xaxis2: { - title: 'dB', - domain: [0.75, 1], - ticks: 'outside', - linewidth: 2, - mirror: true, - size: 12, - }, - titlefont: { size: 14 }, @@ -314,24 +304,39 @@ class Pcolor { this.div.appendChild(iDiv); var trace1 = { z: data.spc[i], - x: data.xrange, + y: data.meta.yrange, + x: data.meta.xrange, colorscale: this.props.colormap || 'Jet', transpose: true, type: 'heatmap' }; - var trace2 = { - x: data.rti[i], - xaxis: 'x2', - type: 'scatter', - }; + if ('rti' in data){ + layout.xaxis.domain = [0, 0.7]; + layout.xaxis2 = { + title: 'dB', + domain: [0.75, 1], + ticks: 'outside', + linewidth: 2, + mirror: true, + size: 12, + }; + var trace2 = { + x: data.rti[i], + y: data.meta.yrange, + xaxis: 'x2', + type: 'scatter', + }; + } if (this.props.zmin) { trace1.zmin = this.props.zmin } if (this.props.zmax) { trace1.zmax = this.props.zmax; - layout.xaxis2.range = [this.props.zmin, this.props.zmax] + if ('rti' in data){ + layout.xaxis2.range = [this.props.zmin, this.props.zmax] + } } var t = new Date(data.time * 1000); @@ -339,7 +344,11 @@ class Pcolor { if (data.localtime == true){ t.setTime( t.getTime() + t.getTimezoneOffset()*60*1000 ); } - layout.title = 'Ch ' + i + ': ' + data.noise[i] + 'dB - ' + t.toLocaleString(); + if ('titles' in data.meta){ + layout.title = 'Ch ' + i + ': ' + data.meta.titles[i] + ' ' + t.toLocaleString(); + }else{ + layout.title = 'Ch ' + i + ': ' + t.toLocaleString(); + } var conf = { modeBarButtonsToRemove: ['sendDataToCloud', 'autoScale2d', 'hoverClosestCartesian', 'hoverCompareCartesian', 'lasso2d', 'select2d', 'zoomIn2d', 'zoomOut2d', 'toggleSpikelines'], modeBarButtonsToAdd: [{ @@ -354,7 +363,12 @@ class Pcolor { displaylogo: false, showTips: true }; - Plotly.newPlot('plot-' + i, [trace1, trace2], layout, conf); + if ('rti' in data){ + var traces = [trace1, trace2] + }else{ + var traces = [trace1] + } + Plotly.newPlot('plot-' + i, traces, layout, conf); } } @@ -445,7 +459,7 @@ class Scatter { if (data.time.length == 1) { var values = { 'time': data.time, 'data': data[this.key] }; } else { - var values = this.fill_gaps(data.time, data[this.key], data.interval, data[this.key].length); + var values = this.fill_gaps(data.time, data[this.key], data.meta.interval, data[this.key].length); } var t = values.time.map(function (x) { @@ -549,10 +563,12 @@ class Scatter { for (var i = 1; i < xBuffer.length; i++) { var cnt = 0; last = x.slice(-1)[0]; + console.log(Math.abs(parseFloat(xBuffer[i]) - last) + ' '+ parseFloat(interval)); while (Math.abs(parseFloat(xBuffer[i]) - last) > 1.5 * parseFloat(interval)) { cnt += 1; last = last + interval; x.push(last); + console.log('missing ' + new Date(last*1000)); for (var j = 0; j < N; j++) { y[j].push(null); } diff --git a/plotter/templates/home.html b/plotter/templates/home.html index 049e02f..5a0af32 100644 --- a/plotter/templates/home.html +++ b/plotter/templates/home.html @@ -24,17 +24,18 @@ /* This part create a new socket named "socket" to comunicate if there is new data we could be able to change some attributes of a class*/ - var socket = new WebSocket('ws://' + window.location.host +'/main/9999/none/'); + var socket = new WebSocket('ws://' + window.location.host +'/ws/main/'); socket.onopen = function open() { console.log('Main WebSockets connection created.'); }; - socket.onmessage = function message(event) { + socket.onmessage = function(event) { var data = JSON.parse(event.data); console.log(data); - code = data.code; - value = data.value; - time = moment(new Date(data.time*1000)).format('hh:mm:ss a'); + var code = data['code']; + console.log(code); + var value = data['value']; + var time = moment(new Date(data['time']*1000)).format('hh:mm:ss a'); /*This conditional ask if value(send by server) is online, and if it is then change value to online in div with id="#alert_"+code*/ diff --git a/plotter/templates/plot.html b/plotter/templates/plot.html index a113768..b9dffb5 100644 --- a/plotter/templates/plot.html +++ b/plotter/templates/plot.html @@ -39,13 +39,13 @@ or just update the last data*/ $("#loader").css("display", "block"); {% if realtime %} - var socket = new WebSocket('ws://' + window.location.host + '/realtime/{{code}}/{{plot}}/'); + var socket = new WebSocket('ws://' + window.location.host + '/ws/realtime/{{code}}/{{plot}}/'); {% else %} - var socket = new WebSocket('ws://' + window.location.host + '/{{id}}/{{code}}/{{plot}}/'); + var socket = new WebSocket('ws://' + window.location.host + '/ws/database/{{code}}/{{plot}}/'); {% endif %} socket.onopen = function open() { console.log('WebSockets connection created: ' + socket.url); - socket.send('{"date": "{{date}}"}') + socket.send('{{date}}') }; socket.onmessage = function message(event) { diff --git a/plotter/views.py b/plotter/views.py index da68895..da8931b 100644 --- a/plotter/views.py +++ b/plotter/views.py @@ -1,6 +1,6 @@ #!/usr/bin/python # -*- coding: UTF-8 -*- -from __future__ import unicode_literals + import os import time @@ -13,7 +13,7 @@ from django.shortcuts import render import mongoengine -from plotter.models import Experiment, ExpMeta, ExpData +from plotter.models import Experiment, ExpDetail, PlotMeta, PlotData host = os.environ.get('HOST_MONGO', 'localhost') mongoengine.connect('dbplots', host=host, port=27017) @@ -68,38 +68,37 @@ class SPCSetupForm(forms.Form): def main(request): kwargs = {} - date = request.GET.get('date', datetime.now().strftime('%d/%m/%Y')) - exps = ExpMeta.objects.filter(date=datetime.strptime(date, '%d/%m/%Y')) + date = request.GET.get('date', datetime.now().strftime('%d-%m-%Y')) + exps = ExpDetail.objects(date=datetime.strptime(date, '%d-%m-%Y')) experiments = [] for exp in exps: dum = {} - dum['code'] = exp.code - dum['plots'] = exp.plots - dum['name'] = Experiment.objects.get(code=exp.code).name + dum['code'] = exp.experiment.code + dum['plots'] = [plot.plot for plot in exp.plots()] + dum['name'] = exp.experiment.name dt = datetime.now() - data = ExpData.objects(expmeta=exp).order_by('-time')[0] #Get the time from the last data t = time.mktime(dt.timetuple()) - if exp['localtime'] == True: #Ask which type of time is coming: LT o UTC + if exp.plots()[0]['metadata']['localtime'] == True: #Ask which type of time is coming: LT o UTC t -= 5*60*60 # COnditionals to know which state are my clients - if (t-data['time']) > 6*exp['interval']: + if (t-exp['last_time']) > 10*60: status = 'Offline' clase = 'alertas-offline' style = 'danger' - lastDataDate = data['time'] - elif (t-data['time']) > 3*exp['interval']: + lastDataDate = exp['last_time'] + elif (t-exp['last_time']) > 5*60: status = 'Delayed' clase = 'alertas-delayed' style = 'warning' - lastDataDate = data['time'] + lastDataDate = exp['last_time'] else: status = 'Online' clase = 'alertas-online' style = 'success' - lastDataDate = data['time'] + lastDataDate = exp['last_time'] dum['status'] = status dum['class'] = clase @@ -121,16 +120,17 @@ def plot(request, code=None, plot=None): realtime = False date = request.GET.get('date', None) if date is None: - date = datetime.now().strftime('%d/%m/%Y') + date = datetime.now().strftime('%d-%m-%Y') realtime = True exp = Experiment.objects.get(code=int(code)) - expmeta = ExpMeta.objects.get(code=int(code), date=datetime.strptime(date, '%d/%m/%Y')) + detail = ExpDetail.objects.get(experiment=exp, date=datetime.strptime(date, '%d-%m-%Y')) + meta = PlotMeta.objects.get(exp_detail=detail, plot=plot) kwargs = { 'code': code, 'plot': plot, 'date': date, - 'id': expmeta.id, + 'id': meta.pk, 'realtime': realtime, 'title': exp.name, } diff --git a/realtime/routing.py b/realtime/routing.py index ecee0ee..e441c24 100644 --- a/realtime/routing.py +++ b/realtime/routing.py @@ -1,8 +1,11 @@ -from channels.routing import route -from plotter.consumers import ws_connect, ws_disconnect, ws_message +from channels.auth import AuthMiddlewareStack +from channels.routing import ProtocolTypeRouter, URLRouter +import plotter.routing -channel_routing = [ - route("websocket.connect", ws_connect, path=r'^/(?P[a-z]+)/(?P[0-9]+)/(?P[a-z]+)/$'), - route("websocket.receive", ws_message, path=r'^/(?P[a-z]+)/(?P[0-9]+)/(?P[a-z]+)/$'), - route("websocket.disconnect", ws_disconnect, path=r'^/(?P[a-z]+)/(?P[0-9]+)/(?P[a-z]+)/$'), -] \ No newline at end of file +application = ProtocolTypeRouter({ + 'websocket': AuthMiddlewareStack( + URLRouter( + plotter.routing.websocket_urlpatterns + ) + ), +}) \ No newline at end of file diff --git a/realtime/settings.py b/realtime/settings.py index cf64267..0dba516 100644 --- a/realtime/settings.py +++ b/realtime/settings.py @@ -127,10 +127,11 @@ host = os.environ.get('HOST_REDIS', '127.0.0.1') CHANNEL_LAYERS = { "default": { - "BACKEND": "asgi_redis.RedisChannelLayer", + 'BACKEND': 'channels_redis.core.RedisChannelLayer', "CONFIG": { "hosts": [(host, 6379)], }, - "ROUTING": "realtime.routing.channel_routing", }, } + +ASGI_APPLICATION = "realtime.routing.application" diff --git a/requirements.txt b/requirements.txt index 8bf9dc7..3a62801 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,10 +1,10 @@ -asgi-redis==1.4.3 -Django==1.11.7 +Django django-bootstrap4 -channels==1.1.8 -mongoengine==0.15.0 -pymongo==3.5.1 -pyzmq==16.0.3 -redis==2.10.6 -requests==2.18.4 -simplejson==3.12.0 +channels +channels_redis +mongoengine +pymongo +pyzmq +redis +requests +simplejson \ No newline at end of file diff --git a/scripts/client.py b/scripts/client.py index 8c8df86..c831d5c 100644 --- a/scripts/client.py +++ b/scripts/client.py @@ -13,17 +13,17 @@ REQUEST_TIMEOUT = 5000 RETRIES = 5 SERVER_ENDPOINT = 'tcp://localhost:4444' -context = zmq.Context() -socket = context.socket(zmq.REQ) -socket.connect (SERVER_ENDPOINT) -poll = zmq.Poller() -poll.register(socket, zmq.POLLIN) + def send(dato): ''' Function to send data to server ''' - global socket, poll + context = zmq.Context() + socket = context.socket(zmq.REQ) + socket.connect (SERVER_ENDPOINT) + poll = zmq.Poller() + poll.register(socket, zmq.POLLIN) retries = RETRIES while True: socket.send_json(dato) @@ -31,12 +31,12 @@ def send(dato): if socks.get(socket) == zmq.POLLIN: reply = socket.recv_string() if reply == 'ok': - print('Server replied (%s)' % reply) + print(('Server replied (%s)' % reply)) break else: - print('Malformed reply from server: %s' % reply) + print(('Malformed reply from server: %s' % reply)) else: - print('No response from server, retries left {}'.format(retries)) + print(('No response from server, retries left {}'.format(retries))) socket.setsockopt(zmq.LINGER, 0) socket.close() poll.unregister(socket) @@ -62,27 +62,33 @@ def main(realtime, code, date=None, interval=30): else: dt = date + data = { + 'spc': np.round(np.random.rand(4, 60, 100)*5 + 10, 2).tolist(), + 'rti': np.round(np.random.rand(4, 100)*5 + 10, 2).tolist(), + 'noise': np.round(np.random.rand(4) + np.array([10,11,12,13]), 2).tolist() + } + while True: - print('Sending {} - {}'.format(code, dt)) + print(('Sending {} - {}'.format(code, dt))) dato = { 'time': time.mktime(dt.timetuple()), - 'yrange': np.arange(100).tolist(), - 'xrange': np.arange(-30, 30).tolist(), - 'localtime': True, - 'interval': interval, + 'metadata':{ + 'yrange': np.arange(80, 120, 40/100.).tolist(), + 'xrange': np.arange(-30, 30).tolist(), + 'localtime': True, + 'interval': interval + }, 'exp_code': code, - 'data': { - 'noise': np.round(np.random.rand(4) + np.array([10,11,12,13]), 2).tolist(), - 'rti': np.round(np.random.rand(4, 100)*5 + 10, 2).tolist(), - 'spc': np.round(np.random.rand(4, 60, 100)*5 + 10, 2).tolist(), - } - } + } dt = dt + timedelta(seconds=interval) - t = Thread(target=send, args=(dato, )) - t.start() + for plot, d in data.items(): + dato['plot'] = plot + dato['data'] = d + t = Thread(target=send, args=(dato, )) + t.start() if realtime: time.sleep(interval) else: @@ -92,7 +98,7 @@ if __name__ == '__main__': parser = argparse.ArgumentParser(description='This is a Client for realtime app') parser.add_argument('--date', action='store', default=None, help='format: 2018/02/13 12:23:00') parser.add_argument('-r', action='store_true', dest='realtime', default=None) - parser.add_argument('-c', action='store', dest='code', default='170') + parser.add_argument('-c', action='store', dest='code', default='172') parser.add_argument('-i', action='store', dest='interval', type=int, default=30) results = parser.parse_args() @@ -100,5 +106,5 @@ if __name__ == '__main__': try: results.date = datetime.strptime(results.date, '%Y/%m/%d %H:%M:%S') except: - raise(NameError('You must specify a date (--date) for non-realtime experiment')) + raise NameError('You must specify a date (--date) for non-realtime experiment') main(results.realtime, results.code, results.date, results.interval) diff --git a/scripts/server.py b/scripts/server.py index 4acf906..b44251c 100644 --- a/scripts/server.py +++ b/scripts/server.py @@ -8,21 +8,21 @@ import simplejson from datetime import datetime import time import zmq -import redis -import asgi_redis import mongoengine from threading import Thread sys.path.append(os.environ.get('APP_DIR', '../')) os.environ.setdefault("DJANGO_SETTINGS_MODULE", "realtime.settings") -from plotter.models import Experiment, ExpMeta, ExpData +from plotter.models import Experiment, ExpDetail, PlotMeta, PlotData host_mongo = os.environ.get('HOST_MONGO', 'localhost') mongoengine.connect('dbplots', host=host_mongo, port=27017) -host_redis = os.environ.get('HOST_REDIS', 'localhost') -channel = asgi_redis.RedisChannelLayer(hosts=[(host_redis, 6379)]) +import channels.layers +from asgiref.sync import async_to_sync + +channel = channels.layers.get_channel_layer() def loaddata(): print('Loading Experiments...') @@ -31,7 +31,7 @@ def loaddata(): else: file_exp = './experiments.json' for tup in json.load(open(file_exp)): - print(tup['name']) + print(tup) exp = Experiment.objects(code=tup['code']).modify( upsert=True, # To add a new row new=True, @@ -43,29 +43,31 @@ def loaddata(): #============== funcion para modificar datos en la tabla ============== def update(buffer): dt = datetime.utcfromtimestamp(buffer['time']) - print('Updating code={} date={} {}'.format(buffer['exp_code'], dt, datetime.now())) - exp = ExpMeta.objects(code=buffer['exp_code'], date=dt.date()).modify( - upsert=True, # To add a new row - new=True, - set__code=buffer['exp_code'], + exp = Experiment.objects.get(code=buffer['exp_code']) + + detail = ExpDetail.objects(experiment=exp, date=dt.date()).modify( + upsert=True, + new=True, + set__experiment=exp, set__date=dt.date(), - set__yrange = buffer['yrange'], - set__xrange = buffer['xrange'], - set__interval = buffer['interval'], - set__localtime = buffer['localtime'], - set__plots = buffer['data'].keys() + set__last_time = buffer['time'] + ) + + plot = PlotMeta.objects(exp_detail=detail, plot=buffer['plot']).modify( + upsert=True, + new=True, + set__metadata = buffer['metadata'] ) - exp.save() + #plot.save() - data = ExpData.objects(expmeta=exp, time=buffer['time']).modify( + data = PlotData.objects(plot=plot, time=buffer['time']).modify( upsert=True, # To add a new row new=True, - set__expmeta = exp, set__time = buffer['time'], set__data = buffer['data'] ) - data.save() + #data.save() if datetime.now().date() == dt.date(): return True @@ -77,28 +79,44 @@ def check_times(): while True: dt = datetime.now() - exps = ExpMeta.objects(date=dt.date()) - - for exp in exps: - data = ExpData.objects(expmeta=exp).order_by('-time')[0] #me quedo con el ultimo time del ultimo valor - + exps = ExpDetail.objects(date=dt.date()) + + for detail in exps: + code = detail.experiment.code + plot = detail.plots()[0] + data_time = detail['last_time'] t = time.mktime(dt.timetuple()) - if exp['localtime'] == True: #Consulto que tipode time me esta llegando: LT o UTC + if plot['metadata']['localtime'] == True: t -= 5*60*60 - if exp['localtime'] == True: #Consulto que tipode time me esta llegando: LT o UTC - data_time = data['time'] + 5*60*60 + if plot['metadata']['localtime'] == True: + data_time = detail['last_time'] + 5*60*60 - if (t-data['time']) > 6*exp['interval']: - channel.send_group(u'main', {'text': json.dumps({'code': exp['code'], 'value': 'danger', 'time': data_time})}) - print ('{} {} {} {} {}'.format(exp.code, t, data['time'], (t-data['time']), 'offline')) - elif (t-data['time']) > 3*exp['interval']: - channel.send_group(u'main', {'text': json.dumps({'code': exp['code'], 'value': 'warning', 'time': data_time})}) - print ('{} {} {} {} {}'.format(exp.code, t, data['time'], (t-data['time']), 'delayed')) + message = { + 'code': code, + 'time': data_time + } + + if (t-detail['last_time']) > 10*60: + value = 'danger' + print(('{} {} {} {} {}'.format(code, t, detail['last_time'], (t-detail['last_time']), 'offline'))) + elif (t-detail['last_time']) > 5*60: + value = 'warning' + print(('{} {} {} {} {}'.format(code, t, detail['last_time'], (t-detail['last_time']), 'delayed'))) else: - channel.send_group(u'main', {'text': json.dumps({'code': exp['code'], 'value': 'success', 'time': data_time})}) - print ('{} {} {} {} {}'.format(exp.code, t, data['time'], (t-data['time']), 'online')) + value = 'success' + + message['value'] = value + + async_to_sync(channel.group_send)( + 'main', + { + 'type': 'zmq_message', + 'message': json.dumps(message) + } + ) + time.sleep(30) def main(): @@ -113,7 +131,7 @@ def main(): buffer = receiver.recv_json() - if buffer['localtime'] == True: # Ask which type of time is coming: LT o UTC + if buffer['metadata']['localtime'] == True: # Ask which type of time is coming: LT o UTC buffer['time'] -= 5*60*60 if not isinstance(buffer, dict): @@ -123,25 +141,29 @@ def main(): if not update(buffer): receiver.send_string('ok') continue - print("==================================") - for plot in buffer['data']: - dum = buffer.copy() - dum['time'] = [buffer['time']] - if plot=='noise': - dum[plot] = [[x] for x in buffer['data'][plot]] - elif plot=='spc': - dum['noise'] = [[x] for x in buffer['data']['noise']] - dum['spc'] = buffer['data']['spc'] - dum['rti'] = buffer['data']['rti'] - else: - dum[plot] = buffer['data'][plot] - dum.pop('data') - exp_code = dum.pop('exp_code') - channel.send_group( - u'{}_{}'.format(exp_code, plot), - {'text': simplejson.dumps(dum, ignore_nan=True)} - ) - print('Sending to group {}_{} - {} bytes'.format(exp_code, plot, len(str(dum)))) + # for plot in buffer['data']: + # dum = buffer.copy() + # dum['time'] = [buffer['time']] + # if plot=='noise': + # dum[plot] = [[x] for x in buffer['data'][plot]] + # elif plot=='spc': + # dum['noise'] = [[x] for x in buffer['data']['noise']] + # dum['spc'] = buffer['data']['spc'] + # dum['rti'] = buffer['data']['rti'] + # else: + # dum[plot] = buffer['data'][plot] + # dum.pop('data') + # exp_code = dum.pop('exp_code') + # group = '{}_{}'.format(exp_code, plot) + # async_to_sync(channel.group_send)( + # group, + # { + # 'type': 'zmq_message', + # 'message': simplejson.dumps(dum, ignore_nan=True) + # } + # ) + + # print('Sending to group {}_{} - {} bytes'.format(exp_code, plot, len(str(dum)))) receiver.send_string('ok')