#!/usr/bin/python
# -*- coding: UTF-8 -*-
"""ZMQ ingestion server for realtime plot data.

Receives JSON messages on a ZMQ REP socket bound to ``tcp://0.0.0.0:4444``,
upserts them into the MongoEngine documents ``Experiment``, ``ExpDetail``,
``PlotMeta`` and ``PlotData``, and runs a background thread that publishes a
status message per experiment to the ``'main'`` channel group every 30 s.
"""

import os
import sys
import json
import simplejson
from datetime import datetime
import time
import zmq
import mongoengine
from threading import Thread

# The project imports below rely on this path / settings setup, so the
# statement order in this section must not be changed.
sys.path.append(os.environ.get('APP_DIR', '../'))
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "realtime.settings")

from plotter.models import Experiment, ExpDetail, PlotMeta, PlotData

host_mongo = os.environ.get('HOST_MONGO', 'localhost')
mongoengine.connect('dbplots', host=host_mongo, port=27017)

import channels.layers
from asgiref.sync import async_to_sync

channel = channels.layers.get_channel_layer()

# Offset (seconds) applied whenever metadata['localtime'] is set.
# NOTE(review): presumably the site's UTC-5 local time offset -- confirm.
LOCALTIME_OFFSET = 5 * 60 * 60
# Seconds without data after which an experiment is reported as
# 'warning' (delayed) and 'danger' (offline) respectively.
DELAYED_AFTER = 5 * 60
OFFLINE_AFTER = 10 * 60
# Pause (seconds) between two status sweeps of check_times().
CHECK_INTERVAL = 30


def loaddata():
    """Upsert every experiment listed in ``experiments.json``.

    The file is read from ``$APP_DIR/scripts/experiments.json`` when the
    ``APP_DIR`` environment variable is set, otherwise from
    ``./experiments.json``. Each entry must provide ``code`` and ``name``.
    """
    print('Loading Experiments...')
    app_dir = os.environ.get('APP_DIR', None)
    if app_dir is not None:
        file_exp = os.path.join(app_dir, 'scripts', 'experiments.json')
    else:
        file_exp = './experiments.json'

    # Context manager so the file handle is closed deterministically.
    with open(file_exp) as fp:
        experiments = json.load(fp)

    for tup in experiments:
        print(tup)
        exp = Experiment.objects(code=tup['code']).modify(
            upsert=True,  # To add a new row
            new=True,
            set__code=tup['code'],
            set__name=tup['name'],
        )
        exp.save()


# ============== function to modify data in the table ==============
def update(buffer):
    """Store one received message and report whether it belongs to today.

    Args:
        buffer: dict with the keys ``time`` (epoch seconds), ``exp_code``,
            ``plot``, ``metadata`` and ``data``.

    Returns:
        True when the UTC date derived from ``buffer['time']`` equals the
        date of ``datetime.now()``, False otherwise.

    Raises:
        Whatever ``Experiment.objects.get`` raises when no experiment matches
        ``buffer['exp_code']``; KeyError when a required key is missing.
    """
    # NOTE(review): dt is derived as UTC while the final comparison uses the
    # naive local "now" -- confirm this mix is intended.
    dt = datetime.utcfromtimestamp(buffer['time'])
    exp = Experiment.objects.get(code=buffer['exp_code'])

    detail = ExpDetail.objects(experiment=exp, date=dt.date()).modify(
        upsert=True,
        new=True,
        set__experiment=exp,
        set__date=dt.date(),
        set__last_time=buffer['time'],
    )

    plot = PlotMeta.objects(exp_detail=detail, plot=buffer['plot']).modify(
        upsert=True,
        new=True,
        set__metadata=buffer['metadata'],
    )

    PlotData.objects(plot=plot, time=buffer['time']).modify(
        upsert=True,  # To add a new row
        new=True,
        set__time=buffer['time'],
        set__data=buffer['data'],
    )

    return datetime.now().date() == dt.date()


# Function that is checking the state of my clients every 30s
def check_times():
    """Publish the status of today's experiments forever.

    On every sweep, each ``ExpDetail`` dated today is classified by the time
    elapsed since its ``last_time``: ``'danger'`` above 10 minutes,
    ``'warning'`` above 5 minutes, ``'success'`` otherwise. The result is sent
    to the ``'main'`` channel group as a JSON string with the keys ``code``,
    ``time`` and ``value``. Never returns.
    """
    while True:
        dt = datetime.now()
        exps = ExpDetail.objects(date=dt.date())

        for detail in exps:
            code = detail.experiment.code
            try:
                plot = detail.plots()[0]
            except IndexError:
                # update() creates the detail before its first plot meta, so
                # a detail may briefly have no plot; skip it instead of
                # letting the exception end this monitoring thread.
                continue

            last_time = detail['last_time']
            data_time = last_time
            t = time.mktime(dt.timetuple())

            # '== True' is kept on purpose so that non-bool metadata values
            # are treated exactly as before.
            if plot['metadata']['localtime'] == True:  # noqa: E712
                t -= LOCALTIME_OFFSET
                data_time = last_time + LOCALTIME_OFFSET

            message = {
                'code': code,
                'time': data_time,
            }

            elapsed = t - last_time
            if elapsed > OFFLINE_AFTER:
                value = 'danger'
                print('{} {} {} {} {}'.format(
                    code, t, last_time, elapsed, 'offline'))
            elif elapsed > DELAYED_AFTER:
                value = 'warning'
                print('{} {} {} {} {}'.format(
                    code, t, last_time, elapsed, 'delayed'))
            else:
                value = 'success'

            message['value'] = value

            async_to_sync(channel.group_send)(
                'main',
                {
                    'type': 'zmq_message',
                    'message': json.dumps(message),
                }
            )

        time.sleep(CHECK_INTERVAL)


def main():
    """Run the ZMQ REP server loop and the status-monitor thread.

    Every received request is answered with the string ``'ok'``. Messages
    whose metadata flags ``localtime`` get ``LOCALTIME_OFFSET`` subtracted
    from ``time`` before being stored through ``update``.
    """
    print('Starting ZMQ server...')
    context = zmq.Context()
    receiver = context.socket(zmq.REP)
    receiver.bind("tcp://0.0.0.0:4444")

    # Daemon thread: check_times() never returns, so a non-daemon thread
    # would keep the process alive after the receive loop has died.
    t = Thread(target=check_times, daemon=True)
    t.start()

    try:
        while True:
            buffer = receiver.recv_json()

            # Validate the payload type BEFORE indexing into it.
            if not isinstance(buffer, dict):
                print('*******************')
                print(buffer)
                # A REP socket must answer every request; without a reply
                # the next recv would fail and the client would block.
                receiver.send_string('ok')
                continue

            # Ask which type of time is coming: LT or UTC
            if buffer['metadata']['localtime'] == True:  # noqa: E712
                buffer['time'] -= LOCALTIME_OFFSET

            if not update(buffer):
                receiver.send_string('ok')
                continue

            # NOTE: live broadcasting to the per-plot groups is disabled; the
            # former implementation is kept below for reference.
            # for plot in buffer['data']:
            #     dum = buffer.copy()
            #     dum['time'] = [buffer['time']]
            #     if plot=='noise':
            #         dum[plot] = [[x] for x in buffer['data'][plot]]
            #     elif plot=='spc':
            #         dum['noise'] = [[x] for x in buffer['data']['noise']]
            #         dum['spc'] = buffer['data']['spc']
            #         dum['rti'] = buffer['data']['rti']
            #     else:
            #         dum[plot] = buffer['data'][plot]
            #     dum.pop('data')
            #     exp_code = dum.pop('exp_code')
            #     group = '{}_{}'.format(exp_code, plot)
            #     async_to_sync(channel.group_send)(
            #         group,
            #         {
            #             'type': 'zmq_message',
            #             'message': simplejson.dumps(dum, ignore_nan=True)
            #         }
            #     )
            #     print('Sending to group {}_{} - {} bytes'.format(exp_code, plot, len(str(dum))))

            receiver.send_string('ok')
    finally:
        # Reached on any exception (including KeyboardInterrupt) so the
        # socket and context are always released.
        receiver.close()
        context.term()


if __name__ == '__main__':
    loaddata()
    main()