##// END OF EJS Templates
Update templates
Update templates

File last commit:

r34:cc91e3b85c92
r35:1d0c19b50218
Show More
server.py
171 lines | 4.6 KiB | text/x-python | PythonLexer
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import os
import sys
import json
import simplejson
from datetime import datetime
import time
import zmq
import mongoengine
from threading import Thread
# Extend the import search path with APP_DIR (falls back to the parent
# directory); presumably this is what makes the `plotter` package importable.
sys.path.append(os.environ.get('APP_DIR', '../'))
# Select the Django settings module unless the environment already names one.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "realtime.settings")
# Deliberately imported after the path/settings tweaks above.
from plotter.models import Experiment, ExpDetail, PlotMeta, PlotData
# MongoDB host comes from HOST_MONGO (default: localhost); the database name
# 'dbplots' and port 27017 are fixed here.
host_mongo = os.environ.get('HOST_MONGO', 'localhost')
mongoengine.connect('dbplots', host=host_mongo, port=27017)
import channels.layers
from asgiref.sync import async_to_sync
# Module-wide channel layer; check_times() and main() use it for group_send.
channel = channels.layers.get_channel_layer()
def loaddata():
    """Upsert every experiment listed in ``experiments.json`` into MongoDB.

    The file is read from ``$APP_DIR/scripts/experiments.json`` when the
    ``APP_DIR`` environment variable is set, otherwise from
    ``./experiments.json`` (relative to the current working directory).
    Every entry of the JSON array must provide the keys ``code`` and
    ``name``; an ``Experiment`` document is created or updated per entry.
    """
    print('Loading Experiments...')
    app_dir = os.environ.get('APP_DIR')
    if app_dir is not None:
        file_exp = os.path.join(app_dir, 'scripts', 'experiments.json')
    else:
        file_exp = './experiments.json'

    # Read through a context manager so the file handle is closed
    # deterministically instead of being left to the garbage collector.
    with open(file_exp) as fp:
        experiments = json.load(fp)

    for tup in experiments:
        print(tup)
        exp = Experiment.objects(code=tup['code']).modify(
            upsert=True,  # To add a new row
            new=True,
            set__code=tup['code'],
            set__name=tup['name'],
        )
        exp.save()
#============== function that modifies the data stored in the tables ==============
def update(buffer):
    """Store one plot message and report whether it is dated today.

    ``buffer`` is a dict providing the keys ``time`` (a timestamp accepted
    by ``datetime.utcfromtimestamp``), ``code``, ``plot``, ``metadata`` and
    ``data``. The matching ``ExpDetail``, ``PlotMeta`` and ``PlotData``
    documents are upserted, in that order.

    Returns ``True`` when the date derived from ``buffer['time']`` equals
    the date of ``datetime.now()``, ``False`` otherwise.
    """
    stamp = buffer['time']
    day = datetime.utcfromtimestamp(stamp).date()
    experiment = Experiment.objects.get(code=buffer['code'])

    # One detail document per (experiment, day); it also tracks the time of
    # the most recent message.
    detail_query = ExpDetail.objects(experiment=experiment, date=day)
    detail = detail_query.modify(
        upsert=True,
        new=True,
        set__experiment=experiment,
        set__date=day,
        set__last_time=stamp,
    )

    # Plot labels are stored lower-cased with underscores instead of spaces.
    plot_label = buffer['plot'].replace(' ', '_').lower()
    plot_query = PlotMeta.objects(exp_detail=detail, plot=plot_label)
    plot = plot_query.modify(
        upsert=True,
        new=True,
        set__metadata=buffer['metadata'],
    )

    # One data document per (plot, time).
    PlotData.objects(plot=plot, time=buffer['time']).modify(
        upsert=True,  # To add a new row
        new=True,
        set__time=buffer['time'],
        set__data=buffer['data'],
    )

    return datetime.now().date() == day
# Background monitor that reports the state of the clients every 60 seconds
def check_times():
    """Broadcast forever how stale each of today's experiments is.

    Once per minute every ``ExpDetail`` dated today is inspected and a JSON
    message ``{'code', 'time', 'value'}`` is sent to the ``'main'`` channel
    group. ``value`` is ``'danger'`` when more than 10 minutes have elapsed
    since ``last_time``, ``'warning'`` when more than 5 minutes have, and
    ``'success'`` otherwise.

    The function never returns; it is meant to run in its own thread.
    """
    # NOTE(review): looks like a fixed UTC-5 local-time correction - confirm.
    offset = 5*60*60

    while True:
        dt = datetime.now()
        exps = ExpDetail.objects(date=dt.date())

        for detail in exps:
            code = detail.experiment.code
            last_time = detail['last_time']

            # The offset is applied unconditionally: the per-plot
            # 'localtime' check was disabled, so the plot document is no
            # longer fetched here. Fetching it cost one extra query per
            # detail and raised IndexError (killing this thread) whenever a
            # detail had no plot yet.
            t = time.mktime(dt.timetuple()) - offset
            elapsed = t - last_time

            if elapsed > 10*60:
                value = 'danger'
            elif elapsed > 5*60:
                value = 'warning'
            else:
                value = 'success'

            print(('{} {} {} {} {}'.format(code, t, last_time, elapsed, value)))

            message = {
                'code': code,
                'time': last_time + offset,
                'value': value,
            }
            async_to_sync(channel.group_send)(
                'main',
                {
                    'type': 'zmq_message',
                    'message': json.dumps(message)
                }
            )

        time.sleep(60)
def main():
    """Run the ZMQ reply server that ingests plot messages.

    Binds a REP socket on ``tcp://0.0.0.0:4444``, starts ``check_times`` in
    a background thread and then loops forever:

    * each request is decoded as JSON;
    * requests that are not JSON objects are logged and answered with
      ``'error'``;
    * valid requests are stored through ``update``; when ``update`` returns
      true the message is also forwarded to the channel group
      ``<code>_<plot label>``;
    * every valid request is answered with ``'ok'``.

    The socket and the context are released when the loop is left by an
    exception (including ``KeyboardInterrupt``).
    """
    print('Starting ZMQ server...')
    context = zmq.Context()
    receiver = context.socket(zmq.REP)
    receiver.bind("tcp://0.0.0.0:4444")

    # Daemon thread: the monitor loops forever and must not keep the process
    # alive once the server loop below has terminated.
    t = Thread(target=check_times, daemon=True)
    t.start()

    try:
        while True:
            buffer = receiver.recv_json()

            # Validate the payload before indexing into it.
            if not isinstance(buffer, dict):
                print('*******************')
                print(buffer)
                # A REP socket must answer every request before it can
                # receive the next one.
                # NOTE(review): 'error' is an assumed reply - confirm what
                # the clients expect for a rejected message.
                receiver.send_string('error')
                continue

            # Ask which type of time is coming: LT or UTC
            if buffer['metadata']['localtime'] == True:  # noqa: E712 - keeps the original equality semantics
                buffer['time'] -= 5*60*60

            if not update(buffer):
                receiver.send_string('ok')
                print('Queeee paso!!!')
                continue

            # Consumers of the group message receive the time as a list.
            buffer['time'] = [buffer['time']]
            label = buffer['plot'].replace(' ', '_').lower()
            group = '{}_{}'.format(buffer['code'], label)
            async_to_sync(channel.group_send)(
                group,
                {
                    'type': 'zmq_message',
                    'message': simplejson.dumps(buffer, ignore_nan=True)
                }
            )
            print('Sending to group {}_{} - {} bytes'.format(
                buffer['code'],
                label,
                len(str(buffer)))
            )
            receiver.send_string('ok')
    finally:
        # linger=0 discards any unsent reply so that term() cannot block.
        receiver.close(linger=0)
        context.term()
if __name__ == '__main__':
    # Load the experiment list into the database first, then serve forever.
    loaddata()
    main()