##// END OF EJS Templates
Test Version
Test Version

File last commit:

r22:3d86891def25
r22:3d86891def25
Show More
server.py
175 lines | 5.1 KiB | text/x-python | PythonLexer
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import os
import sys
import json
import simplejson
from datetime import datetime
import time
import zmq
import mongoengine
from threading import Thread
# Make the project importable: add APP_DIR to the module search path when the
# environment variable is set, otherwise add '../'.
sys.path.append(os.environ.get('APP_DIR', '../'))
# Select the project's settings module unless DJANGO_SETTINGS_MODULE is already set.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "realtime.settings")
# This import must come after the sys.path change above.
from plotter.models import Experiment, ExpDetail, PlotMeta, PlotData
# Connect to the 'dbplots' MongoDB database on port 27017; the host is taken
# from HOST_MONGO and defaults to localhost.
host_mongo = os.environ.get('HOST_MONGO', 'localhost')
mongoengine.connect('dbplots', host=host_mongo, port=27017)
import channels.layers
from asgiref.sync import async_to_sync
# Channel layer used further down in this file to send messages to channel groups.
channel = channels.layers.get_channel_layer()
def loaddata():
    """Create or update the Experiment documents listed in experiments.json.

    The file is read from ``$APP_DIR/scripts/experiments.json`` when the
    ``APP_DIR`` environment variable is set, otherwise from
    ``./experiments.json``. It must contain a JSON sequence of objects, each
    providing the keys ``code`` and ``name``.

    Raises:
        OSError: if the experiments file cannot be opened.
        ValueError: if the file does not contain valid JSON.
        KeyError: if an entry lacks ``code`` or ``name``.
    """
    print('Loading Experiments...')

    app_dir = os.environ.get('APP_DIR', None)
    if app_dir is not None:
        file_exp = os.path.join(app_dir, 'scripts', 'experiments.json')
    else:
        file_exp = './experiments.json'

    # Read through a context manager so the file handle is always closed
    # (the bare open() call used previously was never closed explicitly).
    with open(file_exp) as fp:
        experiments = json.load(fp)

    for tup in experiments:
        print(tup)
        exp = Experiment.objects(code=tup['code']).modify(
            upsert=True,  # insert a new document when none matches the code
            new=True,     # return the document as it is after the update
            set__code=tup['code'],
            set__name=tup['name'],
        )
        exp.save()
#============== Function to update the data in the table ==============
def update(buffer):
    """Store one received message and report whether it is dated today.

    ``buffer`` is indexed with the keys ``time``, ``exp_code``, ``plot``,
    ``metadata`` and ``data``. The function looks up the Experiment whose
    code is ``buffer['exp_code']`` and then upserts, in this order:

    * the ExpDetail for that experiment and the date derived from
      ``buffer['time']`` (also recording ``last_time``),
    * the PlotMeta for that detail and ``buffer['plot']``,
    * the PlotData for that plot and ``buffer['time']``.

    Returns:
        True when the date derived from ``buffer['time']`` (interpreted with
        ``datetime.utcfromtimestamp``) equals the date of ``datetime.now()``,
        False otherwise.
    """
    timestamp = buffer['time']
    sample_date = datetime.utcfromtimestamp(timestamp).date()
    experiment = Experiment.objects.get(code=buffer['exp_code'])

    detail_query = ExpDetail.objects(experiment=experiment, date=sample_date)
    detail = detail_query.modify(
        upsert=True,
        new=True,
        set__experiment=experiment,
        set__date=sample_date,
        set__last_time=timestamp,
    )

    meta_query = PlotMeta.objects(exp_detail=detail, plot=buffer['plot'])
    plot_meta = meta_query.modify(
        upsert=True,
        new=True,
        set__metadata=buffer['metadata'],
    )

    # The returned PlotData document is not needed by the caller.
    PlotData.objects(plot=plot_meta, time=timestamp).modify(
        upsert=True,  # insert a new document when none matches
        new=True,
        set__time=timestamp,
        set__data=buffer['data'],
    )

    return datetime.now().date() == sample_date
# Checks the state of the clients every 30 seconds.
def check_times():
    """Loop forever, publishing a status for each of today's ExpDetail entries.

    On every pass the ExpDetail documents dated today are fetched. For each
    one, the time elapsed since its ``last_time`` is classified as
    ``'danger'`` (more than 10 minutes), ``'warning'`` (more than 5 minutes)
    or ``'success'``, and a JSON message with ``code``, ``time`` and
    ``value`` is sent to the ``'main'`` channel group. The loop then sleeps
    for 30 seconds. This function never returns.
    """
    while True:
        now = datetime.now()

        for detail in ExpDetail.objects(date=now.date()):
            code = detail.experiment.code
            first_plot = detail.plots()[0]
            last_time = detail['last_time']
            t = time.mktime(now.timetuple())

            # When the first plot's metadata flags local time, the reference
            # time is moved back 5 hours and the reported time forward 5 hours.
            data_time = last_time
            if first_plot['metadata']['localtime'] == True:
                t -= 5*60*60
                data_time = last_time + 5*60*60

            elapsed = t - last_time
            if elapsed > 10*60:
                value = 'danger'
                print(('{} {} {} {} {}'.format(code, t, last_time, elapsed, 'offline')))
            elif elapsed > 5*60:
                value = 'warning'
                print(('{} {} {} {} {}'.format(code, t, last_time, elapsed, 'delayed')))
            else:
                value = 'success'

            message = {
                'code': code,
                'time': data_time,
                'value': value,
            }
            payload = {
                'type': 'zmq_message',
                'message': json.dumps(message),
            }
            async_to_sync(channel.group_send)('main', payload)

        time.sleep(30)
def main():
    """Run the ZMQ reply server that stores incoming plot data.

    Binds a REP socket on ``tcp://0.0.0.0:4444``, starts ``check_times`` in a
    background thread and then serves requests forever. Every request is
    expected to be a JSON object; it is passed to ``update`` and answered
    with ``'ok'``. A request that is not a JSON object is printed and
    answered with ``'error'``.

    The socket and context are released if the loop is left through an
    exception (for example KeyboardInterrupt).
    """
    print('Starting ZMQ server...')
    context = zmq.Context()
    receiver = context.socket(zmq.REP)
    try:
        receiver.bind("tcp://0.0.0.0:4444")
        t = Thread(target=check_times)
        t.start()

        while True:
            buffer = receiver.recv_json()

            # Validate the payload type before indexing into it; the check
            # used to come after buffer['metadata'] had already been read.
            if not isinstance(buffer, dict):
                print('*******************')
                print(buffer)
                # A REP socket must answer every request before it can
                # receive again, so rejected messages still get a reply.
                receiver.send_string('error')
                continue

            # When the sender flags the timestamp as local time, shift it
            # back by 5 hours before storing it.
            if buffer['metadata']['localtime'] == True:
                buffer['time'] -= 5*60*60

            # NOTE: update() returns whether the sample is dated today. That
            # flag used to gate a broadcast of the data to the
            # '<exp_code>_<plot>' channel groups; the broadcast is disabled,
            # so the flag is currently unused and the reply is always 'ok'.
            update(buffer)
            receiver.send_string('ok')
    finally:
        # Previously placed after the infinite loop and therefore unreachable.
        receiver.close()
        context.term()
# Script entry point: load the experiment catalogue, then start the server.
if __name__ == '__main__':
    loaddata()
    main()