import os import sys import simplejson from datetime import datetime import zmq import redis import asgi_redis import mongoengine sys.path.append('/app') os.environ.setdefault("DJANGO_SETTINGS_MODULE", "realtime.settings") from plotter.models import Experiment, Data mongoengine.connect('dbplots', host='mongo', port=27017) context = zmq.Context() receiver = context.socket(zmq.SUB) receiver.bind("tcp://127.0.0.1:4444") receiver.setsockopt(zmq.SUBSCRIBE, '') channel = asgi_redis.RedisChannelLayer(hosts=[('redis', 6379)]) def update_db(buffer): dt = datetime.utcfromtimestamp(buffer['time']) exp = Experiment.objects(code=buffer['exp_code'], date=dt.date()).first() if exp is None: exp = Experiment( code=buffer['exp_code'], date=dt.date(), yrange = buffer['yrange'], xrange = buffer['xrange'], interval = buffer['interval'], localtime = buffer['localtime'], name = buffer['name'], ) exp.save() data = Data.objects(experiment=exp, time=buffer['time']).first() if data is None: data = Data( experiment = exp, time = buffer['time'], data = buffer['data'] ).save() new = True else: data.data = buffer['data'] data.save() new = False return new print 'Waiting for messages...' while True: buffer = receiver.recv_json() if 'xrange' not in buffer: buffer['xrange'] = [] if 'name' not in buffer: buffer['name'] = 'Experiment' if update_db(buffer): dum = buffer.copy() dum['time'] = [buffer['time']] dum['rti'] = buffer['data']['rti'] # dum['noise'] = buffer['data']['noise'] dum.pop('data') code = dum.pop('exp_code') channel.send_group(u'{}_rti'.format(code), {'text': simplejson.dumps(dum, ignore_nan=True)}) print 'Sending...{} - {} bytes'.format('rti', len(str(dum))) # dum = buffer.copy() # dum['time'] = [buffer['time']] # dum['rti'] = buffer['data']['rti'] # dum['spc'] = buffer['data']['spc'] # dum['noise'] = buffer['data']['noise'] # dum.pop('data') # code = dum.pop('exp_code') # channel.send_group(u'{}_spc'.format(code), {'text': simplejson.dumps(dum, ignore_nan=True)}) # print 'Sending...{} - {} bytes'.format('spc', len(str(dum))) # dum = buffer.copy() # dum['time'] = [buffer['time']] # dum['noise'] = [[x] for x in buffer['data']['noise']] # dum.pop('data') # code = dum.pop('exp_code') # channel.send_group(u'{}_noise'.format(code), {'text': simplejson.dumps(dum, ignore_nan=True)}) # print 'Sending...{} - {} bytes'.format('noise', len(str(dum)))