|
|
#!/usr/bin/python
|
|
|
# -*- coding: UTF-8 -*-
|
|
|
|
|
|
import os
|
|
|
import sys
|
|
|
import json
|
|
|
import simplejson
|
|
|
from datetime import datetime
|
|
|
import time
|
|
|
import zmq
|
|
|
import mongoengine
|
|
|
from threading import Thread
|
|
|
|
|
|
# Make the project importable and select the Django settings module before
# any project (plotter) or Channels import below is executed.
sys.path.append(os.getenv('APP_DIR', '../'))
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "realtime.settings")

from plotter.models import Experiment, ExpDetail, PlotMeta, PlotData

# Open the MongoEngine connection to the 'dbplots' database; the host can be
# overridden through the HOST_MONGO environment variable.
host_mongo = os.getenv('HOST_MONGO', 'localhost')
mongoengine.connect('dbplots', host=host_mongo, port=27017)

import channels.layers
from asgiref.sync import async_to_sync

# Channel layer used by check_times() to send status messages to a group.
channel = channels.layers.get_channel_layer()
|
|
|
|
|
|
def loaddata():
    """Upsert every experiment listed in ``experiments.json`` into the database.

    The file is read from ``$APP_DIR/scripts/experiments.json`` when the
    ``APP_DIR`` environment variable is set, otherwise from
    ``./experiments.json`` (relative to the current working directory).
    Each entry of the JSON array must provide the keys ``code`` and ``name``.

    Raises:
        OSError: if the experiments file cannot be opened.
        ValueError: if the file does not contain valid JSON.
        KeyError: if an entry lacks ``code`` or ``name``.
    """
    print('Loading Experiments...')

    app_dir = os.environ.get('APP_DIR', None)
    if app_dir is not None:
        file_exp = os.path.join(app_dir, 'scripts', 'experiments.json')
    else:
        file_exp = './experiments.json'

    # Use a context manager so the file handle is closed deterministically
    # instead of being left open until garbage collection.
    with open(file_exp) as handle:
        experiments = json.load(handle)

    for tup in experiments:
        print(tup)
        # Atomic find-and-modify: creates the document when the code is new,
        # otherwise refreshes its name; new=True returns the stored document.
        exp = Experiment.objects(code=tup['code']).modify(
            upsert=True,
            new=True,
            set__code=tup['code'],
            set__name=tup['name'],
        )
        exp.save()
|
|
|
|
|
|
# ============== Function that upserts received data into the database ==============
def update(buffer):
    """Persist one received sample and tell whether it belongs to today.

    ``buffer`` is the decoded message. The keys read here are ``time``
    (a POSIX timestamp), ``exp_code``, ``plot``, ``metadata`` and ``data``.

    Three upserts are performed, in this order:
      1. the ExpDetail for (experiment, date), whose ``last_time`` is set to
         the sample time;
      2. the PlotMeta for (that detail, plot name), whose ``metadata`` is set;
      3. the PlotData for (that plot, sample time), whose ``data`` is set.

    Returns:
        True when the sample's date (timestamp interpreted through
        ``datetime.utcfromtimestamp``) equals the current local date,
        False otherwise.

    Raises:
        KeyError: if ``buffer`` lacks one of the keys listed above.
        The lookup of the experiment by code uses ``objects.get`` and is not
        guarded here, so its errors propagate to the caller.
    """
    stamp = buffer['time']
    day = datetime.utcfromtimestamp(stamp).date()
    experiment = Experiment.objects.get(code=buffer['exp_code'])

    detail = ExpDetail.objects(experiment=experiment, date=day).modify(
        upsert=True,
        new=True,
        set__experiment=experiment,
        set__date=day,
        set__last_time=stamp,
    )

    plot_meta = PlotMeta.objects(exp_detail=detail, plot=buffer['plot']).modify(
        upsert=True,
        new=True,
        set__metadata=buffer['metadata'],
    )

    # The returned document is not needed; the upsert itself stores the sample.
    PlotData.objects(plot=plot_meta, time=stamp).modify(
        upsert=True,
        new=True,
        set__time=stamp,
        set__data=buffer['data'],
    )

    return datetime.now().date() == day
|
|
|
|
|
|
# Background monitor: checks the state of the clients every 30 s.
def check_times():
    """Broadcast a freshness status for every experiment that has data today.

    Runs forever. On each pass it loads the ExpDetail documents dated today
    and, for each one, compares the current time with the detail's
    ``last_time``:

      * more than 10 minutes old -> ``'danger'``  (logged as "offline")
      * more than 5 minutes old  -> ``'warning'`` (logged as "delayed")
      * otherwise                -> ``'success'``

    A JSON message ``{"code", "time", "value"}`` is sent to the ``'main'``
    channel group with type ``'zmq_message'``. The function then sleeps 30 s.

    When the first plot's metadata has ``localtime == True`` the current time
    is shifted back by five hours before the comparison and the reported
    ``time`` is shifted forward by five hours.
    """
    # NOTE(review): the five-hour shift looks like a fixed local-time/UTC
    # offset (main() applies the same constant) -- confirm before changing.
    localtime_shift = 5*60*60
    offline_after = 10*60
    delayed_after = 5*60
    poll_interval = 30

    while True:
        dt = datetime.now()
        now_ts = time.mktime(dt.timetuple())

        for detail in ExpDetail.objects(date=dt.date()):
            code = detail.experiment.code

            # update() upserts the ExpDetail before its PlotMeta, so this
            # thread can briefly see a detail without plots. Skip it for this
            # pass instead of letting IndexError terminate the monitor thread.
            # (Assumes plots() returns an indexable collection -- TODO confirm.)
            try:
                plot = detail.plots()[0]
            except IndexError:
                continue

            last_time = detail['last_time']
            t = now_ts
            data_time = last_time

            # `== True` is kept on purpose so exactly the same stored values
            # (True / 1) enable the shift as before.
            if plot['metadata']['localtime'] == True:  # noqa: E712
                t -= localtime_shift
                data_time = last_time + localtime_shift

            elapsed = t - last_time

            if elapsed > offline_after:
                value = 'danger'
                print(('{} {} {} {} {}'.format(code, t, last_time, elapsed, 'offline')))
            elif elapsed > delayed_after:
                value = 'warning'
                print(('{} {} {} {} {}'.format(code, t, last_time, elapsed, 'delayed')))
            else:
                value = 'success'

            message = {
                'code': code,
                'time': data_time,
                'value': value,
            }

            async_to_sync(channel.group_send)(
                'main',
                {
                    'type': 'zmq_message',
                    'message': json.dumps(message)
                }
            )

        time.sleep(poll_interval)
|
|
|
|
|
|
def main():
    """Run the ZMQ reply server that stores incoming plot data.

    Binds a REP socket on ``tcp://0.0.0.0:4444``, starts ``check_times`` in a
    background thread and then loops forever:

      1. receive one JSON message;
      2. reject it (reply ``'error'``) when it cannot be decoded or is not a
         JSON object;
      3. when ``metadata.localtime == True`` shift ``time`` back five hours;
      4. store it through ``update()`` and reply ``'ok'``.

    A REP socket must answer every request before it can receive the next
    one, so every path through the loop sends exactly one reply.

    The socket and context are released in a ``finally`` clause, so they are
    also cleaned up when the loop is left through an exception or Ctrl-C.
    """
    print('Starting ZMQ server...')
    context = zmq.Context()
    receiver = context.socket(zmq.REP)
    receiver.bind("tcp://0.0.0.0:4444")

    # Daemon thread: check_times() never returns, so a non-daemon thread
    # would keep the process alive after the server loop has died.
    monitor = Thread(target=check_times, daemon=True)
    monitor.start()

    try:
        while True:
            try:
                buffer = receiver.recv_json()
            except ValueError as err:
                # The frame was received but is not valid JSON/UTF-8; the
                # socket now expects a send, so answer before continuing.
                print('Discarding undecodable message: {}'.format(err))
                receiver.send_string('error')
                continue

            # Validate the payload type BEFORE indexing into it.
            if not isinstance(buffer, dict):
                print('*******************')
                print(buffer)
                # NOTE(review): previously no reply was sent here, which left
                # the REP socket unable to receive again. Confirm that clients
                # treat any non-'ok' reply as a failure.
                receiver.send_string('error')
                continue

            # Ask which type of time is coming: local time or UTC.
            # `== True` is kept on purpose so the same values match as before.
            if buffer['metadata']['localtime'] == True:  # noqa: E712
                buffer['time'] -= 5*60*60

            # update() returns whether the sample belongs to today. That flag
            # only gated a per-plot websocket broadcast (groups named
            # '<exp_code>_<plot>') which was already disabled, so both
            # outcomes reply 'ok'.
            update(buffer)
            receiver.send_string('ok')
    finally:
        # linger=0 drops any unsent reply so context.term() cannot block.
        receiver.close(linger=0)
        context.term()
|
|
|
|
|
|
# Script entry point: seed the experiments first, then serve forever.
if __name__ == '__main__':
    loaddata()
    main()
|
|
|
|