consumers.py
138 lines
| 4.5 KiB
| text/x-python
|
PythonLexer
/ plotter / consumers.py
|
r11 | #!/usr/bin/python | ||
# -*- coding: UTF-8 -*- | ||||
|
r1 | import os | ||
r0 | import json | |||
|
r31 | import simplejson | ||
|
r21 | from datetime import datetime, timedelta | ||
|
r27 | import numpy | ||
r0 | from pymongo import MongoClient | |||
|
r33 | # import mongoengine | ||
|
r22 | from asgiref.sync import async_to_sync | ||
from channels.generic.websocket import WebsocketConsumer | ||||
|
r33 | # from plotter.models import Experiment, ExpDetail, PlotMeta, PlotData | ||
r0 | ||||
|
# MongoDB connection used by the consumers in this module.
# HOST_MONGO overrides the server host; it falls back to localhost.
host = os.environ.get('HOST_MONGO', 'localhost')

# A single module-level client; queries go to the "dbplots" database.
CLIENT = MongoClient(host + ':27017')
DB = CLIENT['dbplots']
|
r22 | |||
class MainConsumer(WebsocketConsumer):
    """WebSocket consumer that relays ``zmq_message`` events of the ``main`` group."""

    def connect(self):
        """Join the ``main`` channel-layer group, then accept the socket."""
        self.group_name = 'main'
        async_to_sync(self.channel_layer.group_add)(
            self.group_name,
            self.channel_name
        )
        self.accept()

    def disconnect(self, close_code):
        """Leave the ``main`` group when the socket closes."""
        async_to_sync(self.channel_layer.group_discard)(
            self.group_name,
            self.channel_name
        )

    def receive(self, text_data=None, bytes_data=None):
        """Ignore anything the client sends.

        Both keywords are accepted because the base consumer dispatches
        text frames as ``text_data=...`` and binary frames as
        ``bytes_data=...``; accepting only ``text_data`` would raise a
        ``TypeError`` on a binary frame.
        """
        pass

    def zmq_message(self, event):
        """Forward the ``message`` field of a group event to the WebSocket."""
        self.send(text_data=event['message'])
class PlotConsumer(WebsocketConsumer):
    """WebSocket consumer that serves stored data for one plot of one experiment.

    The experiment ``code`` and the ``plot`` name come from the URL route
    kwargs. When the request path contains ``realtime`` the socket also
    joins the ``<code>_<plot>`` channel-layer group, so ``zmq_message``
    events sent to that group are forwarded to the client.
    """

    # Upper bound on the number of samples returned for a time series.
    MAX_POINTS = 720
    # Width of the time-series window, counted back from ``last_time``.
    # NOTE(review): presumably ``time``/``last_time`` are epoch seconds,
    # which would make this 12 hours -- confirm against the data writer.
    WINDOW = 12 * 60 * 60
    # Plot types for which only the most recent sample is sent.
    SNAPSHOT_TYPES = ('pcolor',)

    def connect(self):
        """Accept the socket, joining the plot group for realtime paths."""
        if 'realtime' in self.scope['path']:
            self.realtime = True
            self.group_name = '{}_{}'.format(
                self.scope['url_route']['kwargs']['code'],
                self.scope['url_route']['kwargs']['plot'],
            )
            async_to_sync(self.channel_layer.group_add)(
                self.group_name,
                self.channel_name
            )
        else:
            self.realtime = False
        self.accept()

    def disconnect(self, close_code):
        """Leave the plot group if this socket joined one in ``connect``."""
        if self.realtime:
            async_to_sync(self.channel_layer.group_discard)(
                self.group_name,
                self.channel_name
            )

    def receive(self, text_data=None, bytes_data=None):
        """Answer a date request with the stored data of this plot.

        ``text_data`` must be a date formatted ``DD-MM-YYYY``. On success a
        JSON object with the keys ``time``, ``data`` and ``metadata`` is
        sent. Whenever there is nothing to plot -- unparsable request,
        unknown experiment, no detail or plot metadata for that date, or no
        stored samples -- ``{"interval": 0}`` is sent instead, so the client
        always gets a reply.

        ``bytes_data`` is accepted only because the base consumer passes
        binary frames under that keyword; such frames are answered with the
        no-data message.
        """
        route = self.scope['url_route']['kwargs']
        plot = route['plot']
        try:
            day = datetime.strptime(text_data, '%d-%m-%Y')
            code = int(route['code'])
        except (TypeError, ValueError):
            # text_data is None (binary frame), the date is malformed, or
            # the experiment code is not numeric.
            self._send_no_data()
            return

        exp = DB.experiment.find_one({'code': code})
        if not exp:
            self._send_no_data()
            return

        det1 = DB.exp_detail.find_one({
            'experiment': exp['_id'],
            'date': day
        })
        if not det1:
            self._send_no_data()
            return

        meta1 = DB.plot_meta.find_one({
            'exp_detail': det1['_id'],
            'plot': plot
        })
        if not meta1:
            self._send_no_data()
            return

        if meta1['metadata']['type'] in self.SNAPSHOT_TYPES:
            ret = self._latest_sample(meta1)
        else:
            ret = self._time_series(exp, day, det1, meta1, plot)

        if ret is None:
            self._send_no_data()
            return

        ret['metadata'] = meta1['metadata']
        # simplejson's ignore_nan serialises NaN values as null.
        self.send(simplejson.dumps(ret, ignore_nan=True))

    def zmq_message(self, event):
        """Forward the ``message`` field of a group event to the WebSocket."""
        self.send(text_data=event['message'])

    def _send_no_data(self):
        """Tell the client there is nothing to plot for its request."""
        self.send(json.dumps({'interval': 0}))

    def _latest_sample(self, meta):
        """Return the newest sample of plot *meta*, or None if none is stored."""
        newest = list(DB.plot_data.find(
            {'plot': meta['_id']},
            ['time', 'data'],
            sort=[('time', -1)],
            limit=1))
        if not newest:
            return None
        return {'time': [newest[0]['time']], 'data': newest[0]['data']}

    def _time_series(self, exp, day, det1, meta1, plot):
        """Return up to MAX_POINTS samples from the window before ``last_time``.

        Samples of the same plot recorded for the previous date are included
        when they fall inside the window. Returns None when no sample
        matches.
        """
        plots = [meta1['_id']]
        det0 = DB.exp_detail.find_one({
            'experiment': exp['_id'],
            'date': day - timedelta(days=1)
        })
        if det0:
            meta0 = DB.plot_meta.find_one({
                'exp_detail': det0['_id'],
                'plot': plot
            })
            if meta0:
                plots.append(meta0['_id'])

        query = {
            'plot': {'$in': plots},
            'time': {'$gt': det1['last_time'] - self.WINDOW}
        }
        # Sorting ascending and skipping the surplus keeps the most recent
        # MAX_POINTS samples in chronological order.
        total = DB.plot_data.count_documents(query)
        samples = list(DB.plot_data.find(
            query,
            ['time', 'data'],
            sort=[('time', 1)],
            limit=self.MAX_POINTS,
            skip=max(total - self.MAX_POINTS, 0)))
        if not samples:
            return None

        # NOTE(review): presumably each sample's ``data`` is a list of
        # per-channel lists -- confirm against the data writer.
        values = numpy.array([sample['data'] for sample in samples])
        if len(values[0][0]) == 1:
            # Innermost entries hold a single value: drop that axis so each
            # row is the series of one second-axis index across samples.
            data = values.T[0].tolist()
        else:
            # Swap the first two axes: one list per second-axis index, each
            # holding that index's entry from every sample.
            data = [list(series) for series in zip(*values.tolist())]

        return {'time': [sample['time'] for sample in samples], 'data': data}