#!/usr/bin/python
# -*- coding: UTF-8 -*-
"""ZMQ receiver for experiment plot data.

Receives JSON messages on a ZMQ REP socket, stores them in MongoDB through
the ``ExpMeta``/``ExpData`` documents, and forwards them to channel-layer
groups. A background thread periodically publishes a status
(success/warning/danger) for every experiment that has data for today.
"""

import json
import os
import sys
import time
from datetime import datetime
from threading import Thread

import asgi_redis
import mongoengine
import redis
import simplejson
import zmq

# The project package must be importable and Django settings selected
# before the models are imported.
sys.path.append(os.environ.get('APP_DIR', '../'))
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "realtime.settings")

from plotter.models import Experiment, ExpMeta, ExpData

host_mongo = os.environ.get('HOST_MONGO', 'localhost')
mongoengine.connect('dbplots', host=host_mongo, port=27017)

host_redis = os.environ.get('HOST_REDIS', 'localhost')
channel = asgi_redis.RedisChannelLayer(hosts=[(host_redis, 6379)])

# Seconds subtracted from / added to timestamps flagged as local time (LT).
# NOTE(review): five hours presumably corresponds to a UTC-5 site - confirm.
LOCALTIME_OFFSET = 5 * 60 * 60


def loaddata():
    """Upsert one ``Experiment`` document per entry of ``experiments.json``.

    The file is read from ``$APP_DIR/scripts/experiments.json`` when
    ``APP_DIR`` is set, otherwise from ``./experiments.json``. Each entry
    must provide the ``code`` and ``name`` keys.
    """
    print('Loading Experiments...')
    app_dir = os.environ.get('APP_DIR', None)
    if app_dir is not None:
        file_exp = os.path.join(app_dir, 'scripts', 'experiments.json')
    else:
        file_exp = './experiments.json'

    # Use a context manager so the file handle is closed deterministically.
    with open(file_exp) as fp:
        experiments = json.load(fp)

    for tup in experiments:
        print(tup['name'])
        exp = Experiment.objects(code=tup['code']).modify(
            upsert=True,  # To add a new row
            new=True,
            set__code=tup['code'],
            set__name=tup['name'],
        )
        exp.save()


# ============== function that updates the data in the table ==============
def update(buffer):
    """Upsert the metadata and data documents for one received message.

    Parameters
    ----------
    buffer : dict
        Message with the keys ``exp_code``, ``time`` (epoch seconds),
        ``yrange``, ``xrange``, ``interval``, ``localtime`` and ``data``
        (a dict keyed by plot name).

    Returns
    -------
    bool
        True when the date derived from ``buffer['time']`` equals today's
        date as given by ``datetime.now()``, False otherwise.
    """
    dt = datetime.utcfromtimestamp(buffer['time'])
    print('Updating code={} date={} {}'.format(buffer['exp_code'], dt, datetime.now()))

    exp = ExpMeta.objects(code=buffer['exp_code'], date=dt.date()).modify(
        upsert=True,  # To add a new row
        new=True,
        set__code=buffer['exp_code'],
        set__date=dt.date(),
        set__yrange=buffer['yrange'],
        set__xrange=buffer['xrange'],
        set__interval=buffer['interval'],
        set__localtime=buffer['localtime'],
        # Materialize the keys: on Python 3 ``dict.keys()`` is a view,
        # not a list.
        set__plots=list(buffer['data']),
    )
    exp.save()

    data = ExpData.objects(expmeta=exp, time=buffer['time']).modify(
        upsert=True,  # To add a new row
        new=True,
        set__expmeta=exp,
        set__time=buffer['time'],
        set__data=buffer['data'],
    )
    data.save()

    return datetime.now().date() == dt.date()


def _status_for(lag, interval):
    """Map the delay since the last sample to a (value, label) pair."""
    if lag > 6 * interval:
        return 'danger', 'offline'
    if lag > 3 * interval:
        return 'warning', 'delayed'
    return 'success', 'online'


# Function that checks the state of the clients every 30s
def check_times():
    """Publish the status of every experiment with data for today, forever.

    For each ``ExpMeta`` dated today the newest ``ExpData`` is compared with
    the current time; the resulting status is sent to the ``main`` group and
    printed. Experiments without any stored data are skipped. The loop
    sleeps 30 seconds between rounds and never returns.
    """
    while True:
        dt = datetime.now()
        exps = ExpMeta.objects(date=dt.date())

        for exp in exps:
            # Keep only the most recent sample; ``first()`` returns None
            # instead of raising when nothing has been stored yet.
            data = ExpData.objects(expmeta=exp).order_by('-time').first()
            if data is None:
                continue

            t = time.mktime(dt.timetuple())
            # Always define data_time; previously it was only assigned for
            # local-time experiments, which raised NameError (or reused a
            # stale value from the previous experiment) otherwise.
            data_time = data['time']
            if exp['localtime'] == True:  # Which kind of time arrives: LT or UTC
                t -= LOCALTIME_OFFSET
                data_time += LOCALTIME_OFFSET

            lag = t - data['time']
            value, label = _status_for(lag, exp['interval'])
            channel.send_group(
                u'main',
                {'text': json.dumps({'code': exp['code'], 'value': value, 'time': data_time})}
            )
            print('{} {} {} {} {}'.format(exp.code, t, data['time'], lag, label))

        time.sleep(30)


def main():
    """Run the ZMQ REP server and forward received data to plot groups.

    Binds to ``tcp://0.0.0.0:4444``, starts the ``check_times`` monitor
    thread and then, for every received message: stores it via ``update``
    and, when it belongs to today, sends one payload per plot to the group
    ``<exp_code>_<plot>``. Every request gets exactly one reply. The socket
    and context are released when the loop is left by an exception.
    """
    print('Starting ZMQ server...')
    context = zmq.Context()
    receiver = context.socket(zmq.REP)
    receiver.bind("tcp://0.0.0.0:4444")

    t = Thread(target=check_times)
    # Daemon thread: the endless monitor loop must not keep the process
    # alive once the server loop has stopped.
    t.daemon = True
    t.start()

    try:
        while True:
            message = receiver.recv_json()

            # Validate before indexing into the message.
            if not isinstance(message, dict):
                print('*******************')
                print(message)
                # A REP socket must answer every request before it can
                # receive again.
                # NOTE(review): reply text for rejected messages is a
                # choice made here - confirm what clients expect.
                receiver.send_string('error')
                continue

            if message['localtime'] == True:  # Which kind of time arrives: LT or UTC
                message['time'] -= LOCALTIME_OFFSET

            if not update(message):
                receiver.send_string('ok')
                continue

            print("==================================")
            for plot in message['data']:
                payload = message.copy()
                payload['time'] = [message['time']]
                if plot == 'noise':
                    payload[plot] = [[x] for x in message['data'][plot]]
                elif plot == 'spc':
                    payload['noise'] = [[x] for x in message['data']['noise']]
                    payload['spc'] = message['data']['spc']
                    payload['rti'] = message['data']['rti']
                else:
                    payload[plot] = message['data'][plot]
                payload.pop('data')
                exp_code = payload.pop('exp_code')
                channel.send_group(
                    u'{}_{}'.format(exp_code, plot),
                    {'text': simplejson.dumps(payload, ignore_nan=True)}
                )
                print('Sending to group {}_{} - {} bytes'.format(exp_code, plot, len(str(payload))))

            receiver.send_string('ok')
    finally:
        receiver.close()
        context.term()


if __name__ == '__main__':
    loaddata()
    main()