server.py
88 lines
| 2.8 KiB
| text/x-python
|
PythonLexer
/ scripts / server.py
import os
import sys
import simplejson
from datetime import datetime

import zmq
import redis
import asgi_redis
import mongoengine

# The project package lives under /app; it must be on the import path and the
# Django settings module must be selected before the model import below.
sys.path.append('/app')
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "realtime.settings")

from plotter.models import Experiment, Data

# Connect mongoengine to the 'dbplots' database on host 'mongo'.
mongoengine.connect('dbplots', host='mongo', port=27017)

# SUB socket bound on the loopback interface; the empty subscription prefix
# means no message is filtered out.
context = zmq.Context()
receiver = context.socket(zmq.SUB)
receiver.bind("tcp://127.0.0.1:4444")
receiver.setsockopt(zmq.SUBSCRIBE, '')

# Channel layer used by the main loop to push messages to channel groups.
channel = asgi_redis.RedisChannelLayer(hosts=[('redis', 6379)])
def update_db(buffer):
    """Persist one received message and report whether its sample was new.

    The Experiment matching the message's ``exp_code`` and the UTC date of
    its ``time`` value is looked up, and created and saved when no match
    exists. The Data entry for that experiment and ``time`` is then either
    created, or overwritten with the incoming ``data``.

    Args:
        buffer: mapping providing ``time``, ``exp_code`` and ``data``. The
            keys ``yrange``, ``xrange``, ``interval``, ``localtime`` and
            ``name`` are read only when the experiment has to be created.

    Returns:
        True when a new Data entry was created, False when an existing entry
        was updated.
    """
    timestamp = buffer['time']
    day = datetime.utcfromtimestamp(timestamp).date()

    experiment = Experiment.objects(code=buffer['exp_code'], date=day).first()
    if experiment is None:
        # First message for this code/date pair: create the experiment record.
        experiment = Experiment(
            code=buffer['exp_code'],
            date=day,
            yrange=buffer['yrange'],
            xrange=buffer['xrange'],
            interval=buffer['interval'],
            localtime=buffer['localtime'],
            name=buffer['name'],
        )
        experiment.save()

    existing = Data.objects(experiment=experiment, time=timestamp).first()
    if existing is not None:
        # A sample with this timestamp is already stored: overwrite its data.
        existing.data = buffer['data']
        existing.save()
        return False

    Data(
        experiment=experiment,
        time=timestamp,
        data=buffer['data'],
    ).save()
    return True
# Main loop: receive JSON messages, store them, and relay newly stored samples
# to the '<exp_code>_rti' channel group.
# A single parenthesised argument prints the same text as the print statement.
print('Waiting for messages...')
while True:
    message = receiver.recv_json()

    # Fill in optional fields the sender may have omitted. The tuple is built
    # on every iteration so each message gets its own empty list.
    for key, fallback in (('xrange', []), ('name', 'Experiment')):
        if key not in message:
            message[key] = fallback

    # Only samples that were newly stored are relayed; updates are not.
    if not update_db(message):
        continue

    outgoing = message.copy()
    outgoing['time'] = [message['time']]
    outgoing['rti'] = message['data']['rti']
    # outgoing['noise'] = message['data']['noise']
    del outgoing['data']
    exp_code = outgoing.pop('exp_code')

    group = u'{}_rti'.format(exp_code)
    text = simplejson.dumps(outgoing, ignore_nan=True)
    channel.send_group(group, {'text': text})
    # The reported size is the length of the dict's str() form, as before.
    print('Sending...{} - {} bytes'.format('rti', len(str(outgoing))))

    # Disabled 'spc' and 'noise' broadcasts, kept for reference:
    # outgoing = message.copy()
    # outgoing['time'] = [message['time']]
    # outgoing['rti'] = message['data']['rti']
    # outgoing['spc'] = message['data']['spc']
    # outgoing['noise'] = message['data']['noise']
    # outgoing.pop('data')
    # exp_code = outgoing.pop('exp_code')
    # channel.send_group(u'{}_spc'.format(exp_code), {'text': simplejson.dumps(outgoing, ignore_nan=True)})
    # print 'Sending...{} - {} bytes'.format('spc', len(str(outgoing)))
    # outgoing = message.copy()
    # outgoing['time'] = [message['time']]
    # outgoing['noise'] = [[x] for x in message['data']['noise']]
    # outgoing.pop('data')
    # exp_code = outgoing.pop('exp_code')
    # channel.send_group(u'{}_noise'.format(exp_code), {'text': simplejson.dumps(outgoing, ignore_nan=True)})
    # print 'Sending...{} - {} bytes'.format('noise', len(str(outgoing)))