|
|
import os
|
|
|
import sys
|
|
|
import json
|
|
|
import simplejson
|
|
|
from datetime import datetime
|
|
|
import zmq
|
|
|
import redis
|
|
|
import asgi_redis
|
|
|
import mongoengine
|
|
|
|
|
|
# --- Django / project bootstrap ---------------------------------------------
# '/app' is appended to the import path and the settings module is declared
# before the project models are imported below; keep this ordering.
sys.path.append('/app')
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "realtime.settings")

from plotter.models import Experiment, ExpMeta, ExpData  # noqa: E402

# --- MongoDB -----------------------------------------------------------------
# Host comes from HOST_MONGO (default 'localhost'); database name is 'dbplots'.
host_mongo = os.environ.get('HOST_MONGO', 'localhost')
mongoengine.connect('dbplots', host=host_mongo, port=27017)

# --- Redis channel layer -----------------------------------------------------
# Host comes from HOST_REDIS (default 'localhost'); `channel` is used by
# main() to push messages to groups.
host_redis = os.environ.get('HOST_REDIS', 'localhost')
channel = asgi_redis.RedisChannelLayer(hosts=[(host_redis, 6379)])

# --- ZeroMQ subscriber -------------------------------------------------------
# A SUB socket bound on every interface, port 4444.
context = zmq.Context()
receiver = context.socket(zmq.SUBSCRIBE and zmq.SUB)
receiver.bind("tcp://0.0.0.0:4444")
# The subscription prefix must be bytes: pyzmq on Python 3 raises TypeError
# for a text str here. b'' is identical to '' on Python 2 and subscribes to
# every incoming message (empty prefix).
receiver.setsockopt(zmq.SUBSCRIBE, b'')
|
|
|
|
|
|
def loaddata():
    """Upsert the experiments listed in ``scripts/experiments.json``.

    The file is read relative to the current working directory and must
    contain an iterable of objects that each have a ``'code'`` and a
    ``'name'`` key. For every entry the ``Experiment`` document matching
    ``code`` is created if missing (upsert) and its ``code``/``name`` fields
    are set.

    Raises:
        IOError/OSError: if the JSON file cannot be opened.
        ValueError: if the file is not valid JSON.
        KeyError: if an entry lacks ``'code'`` or ``'name'``.
    """
    print('Loading Experiments...')

    # Use a context manager so the file handle is closed deterministically
    # instead of being left to the garbage collector.
    with open('scripts/experiments.json') as handle:
        experiments = json.load(handle)

    for entry in experiments:
        print(entry['name'])
        exp = Experiment.objects(code=entry['code']).modify(
            upsert=True,
            new=True,
            set__code=entry['code'],
            set__name=entry['name'],
        )
        exp.save()
|
|
|
|
|
|
def update(buffer):
    """Persist one received message and return the id of its ExpMeta document.

    Args:
        buffer: mapping with the keys ``'time'``, ``'exp_code'``,
            ``'yrange'``, ``'xrange'``, ``'interval'``, ``'localtime'`` and
            ``'data'``. ``'time'`` is interpreted as a UTC epoch timestamp;
            ``'data'`` is a mapping whose keys are stored as the plot names.

    Returns:
        The ``id`` of the ``ExpMeta`` document for this experiment code and
        UTC date (created if it did not exist yet).

    Raises:
        KeyError: if a required key is missing from ``buffer``.
    """
    # One ExpMeta document exists per (experiment code, UTC calendar day).
    day = datetime.utcfromtimestamp(buffer['time']).date()

    exp = ExpMeta.objects(code=buffer['exp_code'], date=day).modify(
        upsert=True,
        new=True,
        set__code=buffer['exp_code'],
        set__date=day,
        set__yrange=buffer['yrange'],
        set__xrange=buffer['xrange'],
        set__interval=buffer['interval'],
        set__localtime=buffer['localtime'],
        # Materialise the keys: on Python 3 ``dict.keys()`` is a view object,
        # not a list, and cannot be stored as-is.
        set__plots=list(buffer['data']),
    )
    exp.save()

    # One ExpData document exists per (ExpMeta, timestamp).
    data = ExpData.objects(expmeta=exp, time=buffer['time']).modify(
        upsert=True,
        new=True,
        set__expmeta=exp,
        set__time=buffer['time'],
        set__data=buffer['data'],
    )
    data.save()

    return exp.id
|
|
|
|
|
|
def _plot_message(buffer, plot):
    """Build the message dict that is broadcast for a single plot."""
    message = buffer.copy()  # shallow copy; the received buffer is not mutated
    message['time'] = [buffer['time']]

    series = buffer['data'][plot]
    if plot == 'noise':
        # 'noise' values are each wrapped in their own single-element list.
        # NOTE(review): presumably the shape the client expects - confirm.
        message[plot] = [[x] for x in series]
    else:
        message[plot] = series

    # The full data mapping and the experiment code are not forwarded.
    message.pop('data')
    message.pop('exp_code')
    return message


def main():
    """Receive JSON messages over ZMQ forever, store them and fan them out.

    Each received message is persisted through ``update()``; then, for every
    plot name in ``buffer['data']``, a per-plot message is serialised to JSON
    and sent to the channel group named ``'<ExpMeta id>_<plot>'``.

    This function never returns; any exception raised while handling a
    message propagates and ends the loop.
    """
    print('Starting ZMQ server...')

    while True:
        buffer = receiver.recv_json()
        code = update(buffer)

        for plot in buffer['data']:
            message = _plot_message(buffer, plot)
            # Serialise once and reuse the result, so the size that is logged
            # is the size of the JSON text actually sent (the previous code
            # logged the length of the Python repr instead).
            payload = simplejson.dumps(message, ignore_nan=True)
            channel.send_group(u'{}_{}'.format(code, plot), {'text': payload})
            print('Sending...{} - {} bytes'.format(plot, len(payload)))
|
|
|
|
|
|
if __name__ == '__main__':
    # Script entry point: load the experiment definitions first, then start
    # the receive loop.
    loaddata()
    main()
|