consumers.py
115 lines
| 3.8 KiB
| text/x-python
|
PythonLexer
/ plotter / consumers.py
|
r11 | #!/usr/bin/python | ||
# -*- coding: UTF-8 -*- | ||||
|
r1 | import os | ||
r0 | import json | |||
|
r31 | import simplejson | ||
|
r21 | from datetime import datetime, timedelta | ||
|
r27 | import numpy | ||
r0 | from pymongo import MongoClient | |||
|
r22 | import mongoengine | ||
from asgiref.sync import async_to_sync | ||||
from channels.generic.websocket import WebsocketConsumer | ||||
from plotter.models import Experiment, ExpDetail, PlotMeta, PlotData | ||||
r0 | ||||
|
# Register the default mongoengine connection to the "dbplots" database.
# The server host comes from the HOST_MONGO environment variable and falls
# back to localhost; the port is fixed at 27017.
host = os.getenv('HOST_MONGO', 'localhost')
mongoengine.connect(db='dbplots', host=host, port=27017)

# Direct pymongo setup, left disabled for reference:
# CLIENT = MongoClient('{}:27017'.format(host))
# DB = CLIENT['dbplots']
class MainConsumer(WebsocketConsumer):
    """WebSocket consumer that relays events sent to the 'main' group."""

    def connect(self):
        """Join the 'main' channel-layer group, then accept the socket."""
        self.group_name = 'main'
        async_to_sync(self.channel_layer.group_add)(
            self.group_name,
            self.channel_name
        )
        self.accept()

    def disconnect(self, close_code):
        """Leave the 'main' group when the socket closes.

        ``close_code`` is accepted for the Channels interface and not used.
        """
        async_to_sync(self.channel_layer.group_discard)(
            self.group_name,
            self.channel_name
        )

    def receive(self, text_data=None, bytes_data=None):
        """Ignore anything the client sends.

        Channels delivers text frames as ``text_data`` and binary frames as
        ``bytes_data``; both keywords are accepted so that a binary frame
        does not raise TypeError.
        """

    def zmq_message(self, event):
        """Forward ``event['message']`` to the WebSocket client as text.

        NOTE(review): presumably invoked for channel-layer events of type
        'zmq.message' sent to the group -- confirm against the producer.
        """
        self.send(text_data=event['message'])
class PlotConsumer(WebsocketConsumer):
    """WebSocket consumer that serves stored data for one experiment plot.

    The experiment ``code`` and ``plot`` name are read from the URL route
    kwargs. When the request path contains 'realtime' the consumer also
    joins the '<code>_<plot>' channel-layer group and relays its events.
    """

    # Reply sent whenever no plot data can be produced for a request.
    NO_DATA = {'interval': 0}
    # How far before the detail's ``last_time`` the series query reaches.
    # NOTE(review): assumes ``last_time`` and ``time`` are in seconds -- confirm.
    WINDOW_SECONDS = 12 * 60 * 60
    # Maximum number of PlotData rows fetched for a series reply.
    MAX_POINTS = 720

    def connect(self):
        """Accept the socket, joining the plot's group for realtime paths."""
        self.realtime = 'realtime' in self.scope['path']
        if self.realtime:
            route = self.scope['url_route']['kwargs']
            self.group_name = '{}_{}'.format(route['code'], route['plot'])
            async_to_sync(self.channel_layer.group_add)(
                self.group_name,
                self.channel_name
            )
        self.accept()

    def disconnect(self, close_code):
        """Leave the plot's group if this connection had joined one.

        ``realtime`` is read defensively: it is only set inside connect(),
        so it may be missing if connect() failed part-way.
        """
        if getattr(self, 'realtime', False):
            async_to_sync(self.channel_layer.group_discard)(
                self.group_name,
                self.channel_name
            )

    def receive(self, text_data=None, bytes_data=None):
        """Answer a date request with the plot's stored data as JSON.

        ``text_data`` must be a date formatted as ``%d-%m-%Y``. The reply
        is a JSON object with 'time', 'data' and 'metadata' keys. Whenever
        the date is malformed or missing (e.g. a binary frame), or no
        experiment, detail or plot metadata matches, the reply is
        ``{"interval": 0}`` so every request gets an answer.
        """
        try:
            day = datetime.strptime(text_data, '%d-%m-%Y')
        except (TypeError, ValueError):
            # TypeError covers text_data=None, ValueError a malformed date.
            self._send_no_data()
            return

        route = self.scope['url_route']['kwargs']
        plot = route['plot']
        try:
            exp = Experiment.objects.get(code=route['code'])
        except Experiment.DoesNotExist:
            self._send_no_data()
            return

        details = ExpDetail.objects(experiment=exp, date=day)
        if not details:
            self._send_no_data()
            return
        detail = details[0]

        found = PlotMeta.objects(exp_detail=detail, plot=plot)
        if not found:
            self._send_no_data()
            return
        meta = found[0]

        if meta.metadata['type'] in ('pcolor',):
            ret = self._latest_frame(meta)
        else:
            ret = self._recent_series(exp, day, plot, detail, meta)

        # simplejson is used for its ignore_nan option when serializing.
        self.send(simplejson.dumps(ret, ignore_nan=True))

    def zmq_message(self, event):
        """Forward ``event['message']`` to the WebSocket client as text."""
        self.send(text_data=event['message'])

    def _send_no_data(self):
        """Send the fallback reply used when nothing can be plotted."""
        self.send(json.dumps(self.NO_DATA))

    def _latest_frame(self, meta):
        """Build the reply for a 'pcolor' plot from its newest PlotData row."""
        newest = PlotData.objects(plot=meta).order_by('-time').first()
        if newest is None:
            # Metadata exists but no data row has been stored yet.
            return {'time': [], 'data': [], 'metadata': meta.metadata}
        return {
            'time': [newest['time']],
            'data': newest['data'],
            'metadata': meta.metadata,
        }

    def _recent_series(self, exp, day, plot, detail, meta):
        """Build the reply for a non-pcolor plot from recent PlotData rows.

        Rows are taken from the requested day's plot and, when it exists,
        the same plot of the previous day, restricted to times later than
        ``detail['last_time'] - WINDOW_SECONDS`` and capped at MAX_POINTS.
        """
        metas = [meta]
        previous = ExpDetail.objects(
            experiment=exp, date=day - timedelta(days=1))
        if previous:
            earlier = PlotMeta.objects(exp_detail=previous[0], plot=plot)
            if earlier:
                metas.append(earlier[0])

        since = detail['last_time'] - self.WINDOW_SECONDS
        rows = PlotData.objects(
            plot__in=metas, time__gt=since).limit(self.MAX_POINTS)
        samples = [(row['time'], row['data']) for row in rows]
        return {
            'time': [stamp for stamp, _ in samples],
            'data': self._columns([values for _, values in samples]),
            'metadata': meta.metadata,
        }

    @staticmethod
    def _columns(values):
        """Regroup per-sample values into per-series lists.

        NOTE(review): assumes every sample is a nested (2-D) list of equal
        shape, so the stacked array is 3-D -- confirm against the writer.
        """
        if not values:
            # No samples in the window: nothing to index or transpose.
            return []
        stacked = numpy.array(values)
        if len(stacked[0][0]) == 1:
            # Innermost axis has length 1: transpose and drop that axis.
            return stacked.T[0].tolist()
        # Otherwise swap the first two axes, keeping the innermost lists.
        return [list(column) for column in zip(*stacked.tolist())]