##// END OF EJS Templates
First version RTI, Spectra, Noise + Docker
First version RTI, Spectra, Noise + Docker

File last commit:

r0:476448e11002
r0:476448e11002
Show More
server.py
88 lines | 2.8 KiB | text/x-python | PythonLexer
import os
import sys
import simplejson
from datetime import datetime
import zmq
import redis
import asgi_redis
import mongoengine
# Bootstrap: the project import below must come after sys.path and the Django
# settings variable are prepared, so the order of these statements matters.
# NOTE(review): '/app' is presumably where the project is mounted inside the
# container -- confirm against the Docker setup.
sys.path.append('/app')
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "realtime.settings")
from plotter.models import Experiment, Data

# Default MongoEngine connection used by the Experiment/Data documents.
mongoengine.connect('dbplots', host='mongo', port=27017)

# ZeroMQ SUB socket on which incoming JSON messages are received.
context = zmq.Context()
receiver = context.socket(zmq.SUB)
receiver.bind("tcp://127.0.0.1:4444")
# An empty prefix subscribes to every message. The option value must be a
# byte string: b'' is the same object type as '' on Python 2 and is also
# accepted by pyzmq on Python 3 (where a text '' raises TypeError).
receiver.setsockopt(zmq.SUBSCRIBE, b'')

# Redis-backed channel layer used to push updates to channel groups.
channel = asgi_redis.RedisChannelLayer(hosts=[('redis', 6379)])
def update_db(buffer):
    """Persist one received message and report whether it was new.

    The experiment is looked up by ``buffer['exp_code']`` and the UTC date
    derived from ``buffer['time']``; when none exists, one is created from
    the ``yrange``, ``xrange``, ``interval``, ``localtime`` and ``name``
    entries of ``buffer``.  The data document is then looked up by
    experiment and ``buffer['time']``.

    Args:
        buffer: mapping holding at least ``time``, ``exp_code`` and ``data``
            (plus the experiment fields listed above when the experiment
            has to be created).

    Returns:
        True when a new ``Data`` document was created, False when an
        existing one had its ``data`` overwritten.
    """
    timestamp = buffer['time']
    day = datetime.utcfromtimestamp(timestamp).date()

    experiment = Experiment.objects(code=buffer['exp_code'], date=day).first()
    if experiment is None:
        # First message of this experiment for this day: register it.
        experiment = Experiment(
            code=buffer['exp_code'],
            date=day,
            yrange=buffer['yrange'],
            xrange=buffer['xrange'],
            interval=buffer['interval'],
            localtime=buffer['localtime'],
            name=buffer['name'],
        )
        experiment.save()

    record = Data.objects(experiment=experiment, time=timestamp).first()
    if record is not None:
        # Same experiment and timestamp seen before: replace the payload.
        record.data = buffer['data']
        record.save()
        return False

    Data(
        experiment=experiment,
        time=timestamp,
        data=buffer['data'],
    ).save()
    return True
# Main loop: receive JSON messages from the ZeroMQ socket, store them through
# update_db() and push every newly stored sample to the '<exp_code>_rti'
# channel group.  Runs until the process is stopped.
# print() is called with a single argument so the output is identical on
# Python 2 and the statement is also valid on Python 3.
print('Waiting for messages...')
while True:
    message = receiver.recv_json()

    # Fill in the optional fields the sender may omit.
    message.setdefault('xrange', [])
    message.setdefault('name', 'Experiment')

    # Nothing is published when an already stored sample was only updated.
    if not update_db(message):
        continue

    # Outgoing payload: the message without 'data'/'exp_code', with the
    # timestamp wrapped in a list and the 'rti' entry lifted to the top level.
    dum = message.copy()
    dum['time'] = [message['time']]
    dum['rti'] = message['data']['rti']
    # dum['noise'] = message['data']['noise']
    dum.pop('data')
    code = dum.pop('exp_code')

    # Serialize once so the reported size is that of the text actually sent
    # (the dict repr used before does not match the JSON length).
    payload = simplejson.dumps(dum, ignore_nan=True)
    channel.send_group(u'{}_rti'.format(code), {'text': payload})
    print('Sending...{} - {} bytes'.format('rti', len(payload)))

    # Disabled publishers for the 'spc' and 'noise' groups, kept for reference.
    # dum = message.copy()
    # dum['time'] = [message['time']]
    # dum['rti'] = message['data']['rti']
    # dum['spc'] = message['data']['spc']
    # dum['noise'] = message['data']['noise']
    # dum.pop('data')
    # code = dum.pop('exp_code')
    # channel.send_group(u'{}_spc'.format(code), {'text': simplejson.dumps(dum, ignore_nan=True)})
    # print('Sending...{} - {} bytes'.format('spc', len(str(dum))))
    # dum = message.copy()
    # dum['time'] = [message['time']]
    # dum['noise'] = [[x] for x in message['data']['noise']]
    # dum.pop('data')
    # code = dum.pop('exp_code')
    # channel.send_group(u'{}_noise'.format(code), {'text': simplejson.dumps(dum, ignore_nan=True)})
    # print('Sending...{} - {} bytes'.format('noise', len(str(dum))))