##// END OF EJS Templates
New template, clean code, working for realtime
New template, clean code, working for realtime

File last commit:

r20:bb89e66ce589
r21:dc5f8680b6e1
Show More
server.py
153 lines | 5.2 KiB | text/x-python | PythonLexer
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import os
import sys
import json
import simplejson
from datetime import datetime
import time
import zmq
import redis
import asgi_redis
import mongoengine
from threading import Thread
# Extend the import search path with APP_DIR (default: the parent directory);
# presumably this is what makes the `plotter` package below importable.
sys.path.append(os.environ.get('APP_DIR', '../'))
# Select the Django settings module unless the environment already defines one.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "realtime.settings")
from plotter.models import Experiment, ExpMeta, ExpData
# MongoDB connection: database 'dbplots' on HOST_MONGO (default localhost), port 27017.
host_mongo = os.environ.get('HOST_MONGO', 'localhost')
mongoengine.connect('dbplots', host=host_mongo, port=27017)
# Redis-backed channel layer on HOST_REDIS (default localhost), port 6379;
# the functions below publish to groups through `channel.send_group`.
host_redis = os.environ.get('HOST_REDIS', 'localhost')
channel = asgi_redis.RedisChannelLayer(hosts=[(host_redis, 6379)])
def loaddata():
    """Load the experiment list from ``experiments.json`` into the database.

    The file is read from ``$APP_DIR/scripts/experiments.json`` when the
    ``APP_DIR`` environment variable is set, otherwise from
    ``./experiments.json``. It must contain a JSON list of objects with the
    keys ``code`` and ``name``. For each entry the ``Experiment`` document
    with that code is created or updated (upsert).
    """
    print('Loading Experiments...')
    app_dir = os.environ.get('APP_DIR')
    if app_dir is not None:
        file_exp = os.path.join(app_dir, 'scripts', 'experiments.json')
    else:
        file_exp = './experiments.json'
    # Use a context manager so the file handle is closed even if parsing fails.
    with open(file_exp) as handle:
        experiments = json.load(handle)
    for tup in experiments:
        print(tup['name'])
        exp = Experiment.objects(code=tup['code']).modify(
            upsert=True,  # insert a new document when the code is unknown
            new=True,     # return the document as it is after the update
            set__code=tup['code'],
            set__name=tup['name'],
        )
        exp.save()
#============== function that updates the data in the table ==============
def update(buffer):
    """Store one incoming message and report whether it belongs to today.

    Upserts the ``ExpMeta`` document identified by the experiment code and
    the date derived from ``buffer['time']``, then upserts the ``ExpData``
    document for that metadata and timestamp.

    Args:
        buffer: dict with the keys ``time`` (epoch seconds), ``exp_code``,
            ``yrange``, ``xrange``, ``interval``, ``localtime`` and ``data``
            (a mapping keyed by plot name).

    Returns:
        True when the date of ``buffer['time']`` (interpreted as UTC) equals
        the current date of this machine, False otherwise.
    """
    dt = datetime.utcfromtimestamp(buffer['time'])
    day = dt.date()
    print('Updating code={} date={} {}'.format(buffer['exp_code'], dt, datetime.now()))
    exp = ExpMeta.objects(code=buffer['exp_code'], date=day).modify(
        upsert=True,  # insert a new document when none matches
        new=True,     # return the document as it is after the update
        set__code=buffer['exp_code'],
        set__date=day,
        set__yrange=buffer['yrange'],
        set__xrange=buffer['xrange'],
        set__interval=buffer['interval'],
        set__localtime=buffer['localtime'],
        # Materialise the keys: on Python 3 ``dict.keys()`` is a view object,
        # not a list, so it must be converted before being stored.
        set__plots=list(buffer['data'].keys()),
    )
    exp.save()
    data = ExpData.objects(expmeta=exp, time=buffer['time']).modify(
        upsert=True,  # insert a new document when none matches
        new=True,
        set__expmeta=exp,
        set__time=buffer['time'],
        set__data=buffer['data'],
    )
    data.save()
    # NOTE(review): this compares the machine's local date with a UTC-derived
    # date; confirm that this is the intended notion of "today".
    return datetime.now().date() == day
# Checks the state of the clients every 30 seconds
def check_times():
    """Broadcast the status of today's experiments every 30 seconds, forever.

    For every ``ExpMeta`` document dated today, the newest ``ExpData`` sample
    is compared with the current time and a JSON message with the keys
    ``code``, ``value`` and ``time`` is sent to the ``main`` channel group:

    * ``danger``  - delay greater than 6 intervals (printed as "offline")
    * ``warning`` - delay greater than 3 intervals (printed as "delayed")
    * ``success`` - otherwise (printed as "online")

    Experiments without any stored sample are skipped. Never returns.
    """
    # Shift, in seconds, applied to experiments flagged as local time.
    local_shift = 5 * 60 * 60
    while True:
        dt = datetime.now()
        for exp in ExpMeta.objects(date=dt.date()):
            # Keep only the most recent sample; ``first()`` yields None when
            # there is none, instead of raising IndexError like ``[0]``.
            data = ExpData.objects(expmeta=exp).order_by('-time').first()
            if data is None:
                continue
            t = time.mktime(dt.timetuple())
            # Always define the reported time; it was previously left unset
            # (or stale from another experiment) when localtime was false.
            data_time = data['time']
            if exp['localtime']:  # which kind of time is stored: LT or UTC
                t -= local_shift
                data_time += local_shift
            delay = t - data['time']
            if delay > 6 * exp['interval']:
                value, label = 'danger', 'offline'
            elif delay > 3 * exp['interval']:
                value, label = 'warning', 'delayed'
            else:
                value, label = 'success', 'online'
            channel.send_group(u'main', {'text': json.dumps({'code': exp['code'], 'value': value, 'time': data_time})})
            print('{} {} {} {} {}'.format(exp.code, t, data['time'], delay, label))
        time.sleep(30)
def main():
    """Run the ZMQ reply server that stores messages and relays today's data.

    Binds a REP socket on ``tcp://0.0.0.0:4444`` and starts ``check_times``
    in a background thread. For every JSON message received:

    1. Non-dict payloads are printed and acknowledged without processing.
    2. When ``localtime`` is true, ``time`` is shifted back by five hours.
    3. The message is stored through ``update``; if it does not belong to
       today it is only acknowledged.
    4. Otherwise one payload per plot in ``data`` is sent to the channel
       group ``<exp_code>_<plot>``.

    Every request is answered with the string ``ok``. The socket and the
    context are released when the loop is left through an exception.
    """
    print('Starting ZMQ server...')
    context = zmq.Context()
    receiver = context.socket(zmq.REP)
    receiver.bind("tcp://0.0.0.0:4444")
    t = Thread(target=check_times)
    t.start()
    try:
        while True:
            message = receiver.recv_json()
            # Validate the payload type before indexing into it.
            if not isinstance(message, dict):
                print('*******************')
                print(message)
                # A REP socket must answer each request before it can
                # receive the next one; 'ok' is the only reply this server
                # ever sends.
                receiver.send_string('ok')
                continue
            if message['localtime'] == True:  # which kind of time is coming: LT or UTC
                message['time'] -= 5*60*60
            if not update(message):
                receiver.send_string('ok')
                continue
            print("==================================")
            for plot in message['data']:
                dum = message.copy()
                dum['time'] = [message['time']]
                if plot == 'noise':
                    # Each noise value is wrapped in its own one-element list.
                    dum[plot] = [[x] for x in message['data'][plot]]
                elif plot == 'spc':
                    # The 'spc' payload also carries the noise and rti series.
                    dum['noise'] = [[x] for x in message['data']['noise']]
                    dum['spc'] = message['data']['spc']
                    dum['rti'] = message['data']['rti']
                else:
                    dum[plot] = message['data'][plot]
                dum.pop('data')
                exp_code = dum.pop('exp_code')
                channel.send_group(
                    u'{}_{}'.format(exp_code, plot),
                    {'text': simplejson.dumps(dum, ignore_nan=True)}
                )
                print('Sending to group {}_{} - {} bytes'.format(exp_code, plot, len(str(dum))))
            receiver.send_string('ok')
    finally:
        # Previously unreachable after the endless loop; now executed when
        # the loop is interrupted.
        receiver.close()
        context.term()
# Script entry point: load the experiment list first, then serve forever.
if __name__ == "__main__":
    loaddata()
    main()