server.py
172 lines
| 4.7 KiB
| text/x-python
|
PythonLexer
/ scripts / server.py
|
r10 | #!/usr/bin/python | ||
# -*- coding: UTF-8 -*- | ||||
r0 | import os | |||
import sys | ||||
r2 | import json | |||
r0 | import simplejson | |||
from datetime import datetime | ||||
|
r10 | import time | ||
r0 | import zmq | |||
import mongoengine | ||||
|
r39 | import django | ||
|
r10 | from threading import Thread | ||
r0 | ||||
|
# Add APP_DIR (default '../') to the import path and select the Django
# settings module before Django is initialised.
sys.path.append(os.environ.get('APP_DIR', '../'))
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "realtime.settings")

# django.setup() runs before the plotter.models import below; keep this order.
django.setup()

from plotter.models import Experiment, ExpDetail, PlotMeta, PlotData

# Connect mongoengine to the 'dbplots' database. The host comes from
# HOST_MONGO (default 'localhost'); the port is fixed at 27017.
host_mongo = os.environ.get('HOST_MONGO', 'localhost')
mongoengine.connect('dbplots', host=host_mongo, port=27017)

import channels.layers
from asgiref.sync import async_to_sync
# Channel layer handle shared by check_times() and main() for group_send.
channel = channels.layers.get_channel_layer()
|
r10 | |||
def loaddata():
    """Create or update the Experiment documents listed in experiments.json.

    The file is read from ``$APP_DIR/scripts/experiments.json`` when the
    APP_DIR environment variable is set, otherwise from
    ``./experiments.json`` (relative to the current working directory).
    Every entry must provide the keys ``code`` and ``name``; entries are
    upserted using ``code`` as the lookup key.

    Raises:
        OSError: if the JSON file cannot be opened.
        ValueError: if the file does not contain valid JSON.
        KeyError: if an entry lacks ``code`` or ``name``.
    """
    print('Loading Experiments...')

    app_dir = os.environ.get('APP_DIR')
    if app_dir is not None:
        file_exp = os.path.join(app_dir, 'scripts', 'experiments.json')
    else:
        file_exp = './experiments.json'

    # Use a context manager so the file handle is closed deterministically
    # instead of being left to the garbage collector.
    with open(file_exp) as handle:
        experiments = json.load(handle)

    for tup in experiments:
        print(tup)
        exp = Experiment.objects(code=tup['code']).modify(
            upsert=True,  # To add a new row
            new=True,
            set__code=tup['code'],
            set__name=tup['name'],
        )
        exp.save()
r2 | ||||
|
def update(buffer):
    """Persist one incoming plot message and report whether it is from today.

    Reads ``buffer['time']``, ``buffer['code']``, ``buffer['plot']``,
    ``buffer['data']`` and ``buffer['metadata']``. The keys ``interval``
    (required) and ``tag`` (optional, defaults to ``''``) are popped from
    ``buffer['metadata']`` in place, so the caller's dict no longer
    contains them afterwards.

    The ExpDetail, PlotMeta and PlotData documents for the message are
    upserted; the Experiment with the given code must already exist.

    Returns:
        True when the message timestamp falls on the current local date,
        False otherwise.
    """
    stamp = datetime.fromtimestamp(buffer['time'])
    day = stamp.date()

    meta = buffer['metadata']
    interval = meta.pop('interval')
    tag = meta.pop('tag', '')

    exp = Experiment.objects.get(code=buffer['code'])

    # One ExpDetail per experiment and calendar day
    detail = ExpDetail.objects(experiment=exp, date=day).modify(
        upsert=True,
        new=True,
        set__experiment=exp,
        set__date=day,
        set__last_time=buffer['time'],
        set__interval=interval,
        set__tag=tag,
    )

    # The plot label is the plot name lower-cased with spaces as underscores
    label = buffer['plot'].replace(' ', '_').lower()
    plot = PlotMeta.objects(exp_detail=detail, plot=label).modify(
        upsert=True,
        new=True,
        set__metadata=meta,
    )

    data = PlotData.objects(plot=plot, time=buffer['time']).modify(
        upsert=True,  # To add a new row
        new=True,
        set__time=buffer['time'],
        set__data=buffer['data'],
    )

    return datetime.now().date() == day
def check_times():
    """Broadcast a freshness status for today's experiments every 60 seconds.

    Runs forever (intended as a thread target). On each pass, every
    ExpDetail dated today is classified by the time elapsed since its
    ``last_time``:

    * more than 6 intervals -> ``'danger'``
    * more than 3 intervals -> ``'warning'``
    * otherwise             -> ``'success'``

    A JSON message with the keys ``code``, ``time`` and ``value`` is sent
    to the ``'main'`` channel group for each detail.
    """
    while True:
        # A failure in one pass (database or channel layer error, missing
        # field) is reported and the loop keeps running; otherwise the
        # thread would die silently and monitoring would stop for good.
        try:
            today = datetime.now().date()
            for detail in ExpDetail.objects(date=today):
                code = detail.experiment.code
                last_time = detail['last_time']
                interval = detail['interval']

                now = time.time()
                elapsed = now - last_time

                if elapsed > 6 * interval:
                    value = 'danger'
                elif elapsed > 3 * interval:
                    value = 'warning'
                else:
                    value = 'success'

                print('{} {} {} {} {}'.format(
                    code, now, last_time, elapsed, value))

                message = {
                    'code': code,
                    'time': last_time,
                    'value': value,
                }
                async_to_sync(channel.group_send)(
                    'main',
                    {
                        'type': 'zmq_message',
                        'message': json.dumps(message)
                    }
                )
        except Exception as err:
            print('check_times failed: {!r}'.format(err))

        time.sleep(60)
r0 | ||||
def main():
    """Run the ZMQ REP server that receives plot messages.

    Binds a REP socket on ``tcp://0.0.0.0:4444``, starts ``check_times`` in
    a background thread and then loops forever: each JSON message is
    stored through ``update()`` and, when it belongs to the current day,
    forwarded to the channel group ``<code>_<plot label>``.

    A REP socket must answer every request before it can receive the next
    one, so every received message gets exactly one reply: ``'ok'`` on
    success, ``'error'`` when the message is invalid or processing fails.
    """
    print('Starting ZMQ server...')
    context = zmq.Context()
    receiver = context.socket(zmq.REP)
    receiver.bind("tcp://0.0.0.0:4444")

    # Daemon thread: the monitor loop never returns and must not keep the
    # process alive once the server loop has stopped.
    monitor = Thread(target=check_times, daemon=True)
    monitor.start()

    try:
        while True:
            try:
                buffer = receiver.recv_json()
            except ValueError as err:
                # The frame was received but is not valid JSON; a reply is
                # still owed to the client.
                print('Invalid JSON received: {}'.format(err))
                receiver.send_string('error')
                continue
            except zmq.ZMQError as err:
                # Nothing was received, so no reply is owed. Back off
                # briefly instead of spinning on a persistent socket error.
                print('ZMQ receive error: {}'.format(err))
                time.sleep(1)
                continue

            if not isinstance(buffer, dict):
                print('Invalid data received: {}'.format(str(buffer)))
                receiver.send_string('error')
                continue

            try:
                if not update(buffer):
                    # Data from another day is stored but not broadcast
                    print('Updating {} for code {}'.format(
                        buffer['plot'],
                        buffer['code']
                    ))
                else:
                    # The time is forwarded wrapped in a single-element list
                    buffer['time'] = [buffer['time']]
                    group = '{}_{}'.format(
                        buffer['code'],
                        buffer['plot'].replace(' ', '_').lower()
                    )
                    payload = simplejson.dumps(buffer, ignore_nan=True)
                    async_to_sync(channel.group_send)(
                        group,
                        {
                            'type': 'zmq_message',
                            'message': payload
                        }
                    )
                    print('Sending to group {} - {} bytes'.format(
                        group,
                        len(payload)
                    ))
            except Exception as err:
                # Server-loop boundary: one bad message (missing key,
                # unknown experiment code, backend failure) must not take
                # the server down or leave the client without a reply.
                print('Failed to process message: {!r}'.format(err))
                receiver.send_string('error')
                continue

            receiver.send_string('ok')
    finally:
        # Reached on KeyboardInterrupt or any unexpected error; linger=0
        # drops unsent replies so context.term() cannot block.
        receiver.close(linger=0)
        context.term()
r0 | ||||
# Script entry point: load the experiment definitions, then start the
# ZMQ server loop.
if __name__ == '__main__':
    loaddata()
    main()