|
|
#!/usr/bin/python
|
|
|
# -*- coding: UTF-8 -*-
|
|
|
|
|
|
import os
|
|
|
import sys
|
|
|
import json
|
|
|
import simplejson
|
|
|
from datetime import datetime
|
|
|
import time
|
|
|
import zmq
|
|
|
import mongoengine
|
|
|
import django
|
|
|
from threading import Thread
|
|
|
|
|
|
sys.path.append(os.environ.get('APP_DIR', '../'))
|
|
|
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "realtime.settings")
|
|
|
django.setup()
|
|
|
|
|
|
from plotter.models import Experiment, ExpDetail, PlotMeta, PlotData
|
|
|
|
|
|
host_mongo = os.environ.get('HOST_MONGO', 'localhost')
|
|
|
mongoengine.connect('dbplots', host=host_mongo, port=27017)
|
|
|
|
|
|
import channels.layers
|
|
|
from asgiref.sync import async_to_sync
|
|
|
|
|
|
channel = channels.layers.get_channel_layer()
|
|
|
|
|
|
def loaddata():
|
|
|
print('Loading Experiments...')
|
|
|
if os.environ.get('APP_DIR', None) is not None:
|
|
|
file_exp = os.path.join(os.environ.get('APP_DIR'), 'scripts', 'experiments.json')
|
|
|
else:
|
|
|
file_exp = './experiments.json'
|
|
|
for tup in json.load(open(file_exp)):
|
|
|
print(tup)
|
|
|
exp = Experiment.objects(code=tup['code']).modify(
|
|
|
upsert=True, # To add a new row
|
|
|
new=True,
|
|
|
set__code=tup['code'],
|
|
|
set__name=tup['name'],
|
|
|
)
|
|
|
exp.save()
|
|
|
|
|
|
#============== funcion para modificar datos en la tabla ==============
|
|
|
def update(buffer):
|
|
|
dt = datetime.utcfromtimestamp(buffer['time'])
|
|
|
interval = buffer['metadata'].pop('interval')
|
|
|
tag = buffer['metadata'].pop('tag') if 'tag' in buffer['metadata'] else ''
|
|
|
exp = Experiment.objects.get(code=buffer['code'])
|
|
|
|
|
|
detail = ExpDetail.objects(experiment=exp, date=dt.date()).modify(
|
|
|
upsert=True,
|
|
|
new=True,
|
|
|
set__experiment=exp,
|
|
|
set__date=dt.date(),
|
|
|
set__last_time = buffer['time'],
|
|
|
set__interval = interval,
|
|
|
set__tag = tag,
|
|
|
)
|
|
|
|
|
|
label = buffer['plot'].replace(' ', '_').lower()
|
|
|
plot = PlotMeta.objects(exp_detail=detail, plot=label).modify(
|
|
|
upsert=True,
|
|
|
new=True,
|
|
|
set__metadata = buffer['metadata']
|
|
|
)
|
|
|
#plot.save()
|
|
|
|
|
|
data = PlotData.objects(plot=plot, time=buffer['time']).modify(
|
|
|
upsert=True, # To add a new row
|
|
|
new=True,
|
|
|
set__time = buffer['time'],
|
|
|
set__data = buffer['data']
|
|
|
)
|
|
|
|
|
|
#data.save()
|
|
|
|
|
|
if datetime.now().date() == dt.date():
|
|
|
return True
|
|
|
|
|
|
return False
|
|
|
|
|
|
# Function that is checking the state of my clients every 30s
|
|
|
def check_times():
|
|
|
|
|
|
while True:
|
|
|
dt = datetime.now()
|
|
|
exps = ExpDetail.objects(date=dt.date())
|
|
|
|
|
|
for detail in exps:
|
|
|
code = detail.experiment.code
|
|
|
plot = detail.plots()[0]
|
|
|
t = time.time()
|
|
|
|
|
|
message = {
|
|
|
'code': code,
|
|
|
'time': detail['last_time']
|
|
|
}
|
|
|
|
|
|
if (t-detail['last_time']) > 6*detail['interval']:
|
|
|
value = 'danger'
|
|
|
elif (t-detail['last_time']) > 3*detail['interval']:
|
|
|
value = 'warning'
|
|
|
else:
|
|
|
value = 'success'
|
|
|
|
|
|
print(('{} {} {} {} {}'.format(code, t, detail['last_time'], (t-detail['last_time']), value)))
|
|
|
|
|
|
message['value'] = value
|
|
|
|
|
|
async_to_sync(channel.group_send)(
|
|
|
'main',
|
|
|
{
|
|
|
'type': 'zmq_message',
|
|
|
'message': json.dumps(message)
|
|
|
}
|
|
|
)
|
|
|
|
|
|
time.sleep(60)
|
|
|
|
|
|
def main():
|
|
|
print('Starting ZMQ server...')
|
|
|
context = zmq.Context()
|
|
|
receiver = context.socket(zmq.REP)
|
|
|
receiver.bind("tcp://0.0.0.0:4444")
|
|
|
t = Thread(target=check_times)
|
|
|
t.start()
|
|
|
|
|
|
while True:
|
|
|
|
|
|
buffer = receiver.recv_json()
|
|
|
if not isinstance(buffer, dict):
|
|
|
print('Invalid data received: {}').format(str(buffer))
|
|
|
continue
|
|
|
|
|
|
if not update(buffer):
|
|
|
print('Updating {} for code {}'.format(
|
|
|
buffer['plot'],
|
|
|
buffer['code']
|
|
|
))
|
|
|
else:
|
|
|
buffer['time'] = [buffer['time']]
|
|
|
group = '{}_{}'.format(
|
|
|
buffer['code'],
|
|
|
buffer['plot'].replace(' ', '_').lower()
|
|
|
)
|
|
|
async_to_sync(channel.group_send)(
|
|
|
group,
|
|
|
{
|
|
|
'type': 'zmq_message',
|
|
|
'message': simplejson.dumps(buffer, ignore_nan=True)
|
|
|
}
|
|
|
)
|
|
|
|
|
|
print('Sending to group {}_{} - {} bytes'.format(
|
|
|
buffer['code'],
|
|
|
buffer['plot'].replace(' ', '_').lower(),
|
|
|
len(str(buffer)))
|
|
|
)
|
|
|
|
|
|
receiver.send_string('ok')
|
|
|
|
|
|
receiver.close()
|
|
|
context.term()
|
|
|
|
|
|
if __name__=='__main__':
|
|
|
loaddata()
|
|
|
main()
|
|
|
|