|
|
import os
|
|
|
import sys
|
|
|
import simplejson
|
|
|
from datetime import datetime
|
|
|
import zmq
|
|
|
import redis
|
|
|
import asgi_redis
|
|
|
import mongoengine
|
|
|
|
|
|
# Extend the import path and select the Django settings module *before* the
# project models are imported below (presumably the import depends on both --
# keep this ordering).
sys.path.append('/app')
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "realtime.settings")

from plotter.models import Experiment, Data

# MongoDB: host comes from HOST_MONGO (default: localhost), fixed port 27017,
# database 'dbplots'.
host_mongo = os.getenv('HOST_MONGO', 'localhost')
mongoengine.connect('dbplots', host=host_mongo, port=27017)

# Redis-backed channel layer: host comes from HOST_REDIS (default: localhost),
# fixed port 6379.
host_redis = os.getenv('HOST_REDIS', 'localhost')
channel = asgi_redis.RedisChannelLayer(hosts=[(host_redis, 6379)])

# ZeroMQ SUB socket. Note that this side *binds* on port 4444 (all
# interfaces), so publishers are expected to connect to it.
context = zmq.Context()
receiver = context.socket(zmq.SUB)
receiver.bind("tcp://0.0.0.0:4444")
# An empty subscription prefix means no topic filtering is requested.
receiver.setsockopt(zmq.SUBSCRIBE, '')
|
|
|
|
|
|
|
|
|
|
|
|
def update_db(buffer):
    """Store one received sample, creating its experiment if necessary.

    The experiment is looked up by ``buffer['exp_code']`` and the UTC date
    derived from ``buffer['time']``. When no such experiment exists, a new
    one is created from the ``yrange``, ``xrange``, ``interval``,
    ``localtime`` and ``name`` entries of ``buffer`` and saved.

    The sample itself is looked up by experiment and ``buffer['time']``:
    an existing record has its ``data`` overwritten with ``buffer['data']``,
    otherwise a new record is created.

    Args:
        buffer: mapping with at least ``time``, ``exp_code`` and ``data``;
            the experiment fields listed above are read only when a new
            experiment has to be created.

    Returns:
        True when a new sample record was created, False when an existing
        one was updated.
    """
    timestamp = buffer['time']
    day = datetime.utcfromtimestamp(timestamp).date()
    exp_code = buffer['exp_code']

    experiment = Experiment.objects(code=exp_code, date=day).first()
    if experiment is None:
        # First sample of this experiment for this date: register it.
        experiment = Experiment(
            code=exp_code,
            date=day,
            yrange=buffer['yrange'],
            xrange=buffer['xrange'],
            interval=buffer['interval'],
            localtime=buffer['localtime'],
            name=buffer['name'],
        )
        experiment.save()

    record = Data.objects(experiment=experiment, time=timestamp).first()
    if record is not None:
        # Same experiment and timestamp seen before: overwrite the payload.
        record.data = buffer['data']
        record.save()
        return False

    Data(experiment=experiment, time=timestamp, data=buffer['data']).save()
    return True
|
|
|
print('Starting...')

# Receive JSON messages forever; each one is stored via update_db() and, when
# it produced a new sample record, its 'rti' data is pushed to the
# '<exp_code>_rti' channel group.
while True:
    buffer = receiver.recv_json()

    # 'xrange' and 'name' may be absent from a message; fill in defaults
    # before handing it to update_db().
    buffer.setdefault('xrange', [])
    buffer.setdefault('name', 'Experiment')

    if update_db(buffer):
        # Outgoing message: a shallow copy of the received one with 'time'
        # wrapped in a list, 'rti' lifted out of 'data', and the 'data' and
        # 'exp_code' entries removed.
        message = buffer.copy()
        message['time'] = [buffer['time']]
        message['rti'] = buffer['data']['rti']
        # message['noise'] = buffer['data']['noise']
        del message['data']
        exp_code = message.pop('exp_code')

        group = u'{}_rti'.format(exp_code)
        # ignore_nan=True is passed so NaN values are handled by simplejson
        # rather than emitted verbatim.
        text = simplejson.dumps(message, ignore_nan=True)
        channel.send_group(group, {'text': text})

        # The reported size is the length of the dict's str() form, not of
        # the JSON text that was actually sent.
        print('Sending...{} - {} bytes'.format('rti', len(str(message))))
|
|
|
|
|
|
# dum = buffer.copy()
|
|
|
# dum['time'] = [buffer['time']]
|
|
|
# dum['rti'] = buffer['data']['rti']
|
|
|
# dum['spc'] = buffer['data']['spc']
|
|
|
# dum['noise'] = buffer['data']['noise']
|
|
|
# dum.pop('data')
|
|
|
# code = dum.pop('exp_code')
|
|
|
# channel.send_group(u'{}_spc'.format(code), {'text': simplejson.dumps(dum, ignore_nan=True)})
|
|
|
# print 'Sending...{} - {} bytes'.format('spc', len(str(dum)))
|
|
|
|
|
|
# dum = buffer.copy()
|
|
|
# dum['time'] = [buffer['time']]
|
|
|
# dum['noise'] = [[x] for x in buffer['data']['noise']]
|
|
|
# dum.pop('data')
|
|
|
# code = dum.pop('exp_code')
|
|
|
# channel.send_group(u'{}_noise'.format(code), {'text': simplejson.dumps(dum, ignore_nan=True)})
|
|
|
# print 'Sending...{} - {} bytes'.format('noise', len(str(dum)))
|
|
|
|