app.py
388 lines
| 12.2 KiB
| text/x-python
|
PythonLexer
r378 | """ | ||
A small Test application to show how to use Flask-MQTT. | |||
""" | |||
import os | |||
import time | |||
import logging | |||
from datetime import datetime | |||
import random | |||
import eventlet | |||
import json | |||
r394 | from json import JSONEncoder | ||
r378 | import numpy | ||
import base64 | |||
import h5py | |||
r393 | try: | ||
import requests | |||
except: | |||
pass | |||
from bs4 import BeautifulSoup | |||
r392 | from flask import Flask, render_template, jsonify, request, redirect | ||
r378 | from flask_mqtt import Mqtt | ||
from flask_socketio import SocketIO | |||
from flask_bootstrap import Bootstrap | |||
eventlet.monkey_patch() | |||
app = Flask(__name__) | |||
app.config['SECRET'] = 'my secret key' | |||
app.config['TEMPLATES_AUTO_RELOAD'] = True | |||
r380 | app.config['MQTT_BROKER_URL'] = os.environ['BROKER_URL'] | ||
r378 | app.config['MQTT_BROKER_PORT'] = 1883 | ||
r405 | app.config['MQTT_CLIENT_ID'] = 'flask_mqtt_{}'.format(int(random.random()*100)) | ||
r378 | app.config['MQTT_CLEAN_SESSION'] = True | ||
app.config['MQTT_USERNAME'] = '' | |||
app.config['MQTT_PASSWORD'] = '' | |||
app.config['MQTT_KEEPALIVE'] = 5 | |||
app.config['MQTT_TLS_ENABLED'] = False | |||
r411 | app.config['MQTT_LAST_WILL_TOPIC'] = 'sophy/error' | ||
r405 | app.config['MQTT_LAST_WILL_MESSAGE'] = 'Bye from flask' | ||
app.config['MQTT_LAST_WILL_QOS'] = 1 | |||
r378 | |||
r405 | mqtt = Mqtt(app) | ||
r394 | |||
r378 | socketio = SocketIO(app) | ||
bootstrap = Bootstrap(app) | |||
MODE_STOP = 0x00 | |||
MODE_SPEED = 0x01 | |||
MODE_POSITION = 0x02 | |||
RX_AZIMUTH = 0x27 | |||
RX_ELEVATION = 0x47 | |||
TX_AZIMUTH = 0x25 | |||
TX_ELEVATION = 0x45 | |||
NONE_AXIS = 0x65 | |||
RX_FUNCTION = 0x30 | |||
TX_FUNCTION = 0x40 | |||
HEADER = 0x7e | |||
MIN_SPEED = -180.0 | |||
MAX_SPEED = 180.0 | |||
SHRT_MIN = -32768 | |||
SHRT_MAX = 32768 | |||
USHRT_MAX = 65535 | |||
DATA_PATH = '/data' | |||
RUNNING = False | |||
r383 | EXPERIMENT = {'name': 'test'} | ||
r394 | DEBUG = False | ||
class NumpyArrayEncoder(JSONEncoder): | |||
def default(self, obj): | |||
if isinstance(obj, numpy.ndarray): | |||
return obj.tolist() | |||
return JSONEncoder.default(self, obj) | |||
r378 | |||
class HDF5File(): | |||
def __init__(self): | |||
r422 | self.first = True | ||
self.data_pos = {'az':[], 'el':[], 'saz':[], 'sel':[], 'timestamp':[]} | |||
self.pos_ready = True | |||
|
r399 | self.pos_time = 0 | |
r393 | self.raw = {} | ||
r378 | self.timestamp = 0 | ||
self.metadata = None | |||
r394 | def reset(self): | ||
r422 | self.first = True | ||
self.data_pos = {'az':[], 'el':[], 'saz':[], 'sel':[], 'timestamp':[]} | |||
|
r399 | self.pos_time = 0 | |
r394 | self.raw = {} | ||
self.timestamp = 0 | |||
r378 | |||
def update(self, data): | |||
|
r399 | for var in ('az', 'el', 'saz', 'sel'): | |
r422 | tmp = numpy.array(data[var][:99])[::4].tolist() | ||
while len(tmp)<25: | |||
tmp.append(numpy.nan) | |||
self.data_pos[var].extend(tmp) | |||
r378 | |||
r435 | self.data_pos['timestamp'].extend(numpy.linspace(int(data['timestamp'])-1, int(data['timestamp'])-0.04, 25).tolist()) | ||
self.timestamp = int(data['timestamp']) | |||
r378 | |||
def write(self, data): | |||
r422 | if self.first: | ||
self.pos_time = int(data['timestamp']) | |||
r438 | self.timestamp = int(data['timestamp']) | ||
r422 | self.first = False | ||
r393 | self.raw = data | ||
r438 | if True: | ||
date_folder = datetime.fromtimestamp(self.timestamp).strftime('%Y-%m-%dT%H-00-00') | |||
filelog = os.path.join(DATA_PATH, EXPERIMENT['name'], 'position', date_folder, "pos@%10.3f.txt" % (self.timestamp)) | |||
if not os.path.exists(os.path.dirname(filelog)): | |||
path = os.path.dirname(filelog) | |||
os.makedirs(path) | |||
f = open(filelog, 'w') | |||
f.write(json.dumps(self.raw, cls=NumpyArrayEncoder)) | |||
f.close() | |||
if int(data['timestamp'])<= self.timestamp: | |||
print('Bad time') | |||
return | |||
r378 | self.update(data) | ||
|
r399 | ||
r422 | if self.pos_ready: | ||
filex = "pos@%10.3f.h5" % (self.pos_time) | |||
date_folder = datetime.fromtimestamp(self.pos_time).strftime('%Y-%m-%dT%H-00-00') | |||
filename = os.path.join(DATA_PATH, EXPERIMENT['name'], 'position', date_folder, filex) | |||
if not os.path.exists(os.path.dirname(filename)): | |||
path = os.path.dirname(filename) | |||
os.makedirs(path) | |||
self.fpos = h5py.File(filename, 'w') | |||
self.pos_ready = False | |||
if datetime.fromtimestamp(self.timestamp).second == 0: | |||
self.pos_time = self.timestamp | |||
grp = self.fpos.create_group("Data") | |||
dset = grp.create_dataset("azi_pos", data=numpy.array(self.data_pos['az'])) | |||
dset = grp.create_dataset("ele_pos", data=numpy.array(self.data_pos['el'])) | |||
dset = grp.create_dataset("azi_speed", data=numpy.array(self.data_pos['saz'])) | |||
dset = grp.create_dataset("ele_speed", data=numpy.array(self.data_pos['sel'])) | |||
dset = grp.create_dataset("utc", data=numpy.array(self.data_pos['timestamp'])) | |||
self.fpos.close() | |||
self.data_pos['az'] = [] | |||
self.data_pos['el'] = [] | |||
self.data_pos['saz'] = [] | |||
self.data_pos['sel'] = [] | |||
self.data_pos['timestamp'] = [] | |||
self.pos_ready = True | |||
|
r399 | ||
r438 | |||
r378 | |||
r394 | |||
r378 | HDF = HDF5File() | ||
def getSpeedPosition(msg_b64): | |||
AzPosition=[] | |||
AzSpeed=[] | |||
ElPosition=[] | |||
ElSpeed=[] | |||
raw = numpy.frombuffer(base64.decodebytes(msg_b64.encode()),numpy.dtype('B')) | |||
raw_size = len(raw) | |||
r383 | Timestamp = (raw[raw_size-4])|(raw[raw_size-3]<<8)|(raw[raw_size-2]<<16)|(raw[raw_size-1]<<24) | ||
# Timestamp = time.time() | |||
r378 | counter = 0 | ||
while counter < raw_size-4: | |||
if raw[counter]==HEADER: | |||
if raw[counter+1]==RX_FUNCTION: | |||
if raw[counter+2]==RX_AZIMUTH or raw[counter+2]==RX_ELEVATION: | |||
iangle = 0.0 | |||
ispeed = 0.0 | |||
hadstuffing = 0 | |||
if (counter+hadstuffing+4<raw_size-4): | |||
if raw[counter+hadstuffing+4]==HEADER-1: | |||
hadstuffing+=1 | |||
iangle = int(0x70|raw[counter+hadstuffing+4]&0x0F) | |||
else: | |||
iangle = int(raw[counter+hadstuffing+4]) | |||
else: | |||
logging.debug("Warning: Index out of bounds. The packet is incomplete or corrupted.") | |||
break | |||
if (counter+hadstuffing+5<raw_size-4): | |||
if raw[counter+hadstuffing+5]==HEADER-1: | |||
hadstuffing+=1 | |||
iangle = iangle + int((0x70|raw[counter+hadstuffing+5]&0x0F)<<8) | |||
else: | |||
iangle = iangle + int(raw[counter+hadstuffing+5]<<8) | |||
else: | |||
logging.debug("Warning: Index out of bounds. The packet is incomplete or corrupted.") | |||
break | |||
if (counter+hadstuffing+6<raw_size-4): | |||
if raw[counter+hadstuffing+6]==HEADER-1: | |||
hadstuffing+=1 | |||
ispeed = int(0x70|raw[counter+hadstuffing+6]&0x0F) | |||
else: | |||
ispeed = int(raw[counter+hadstuffing+6]) | |||
else: | |||
logging.debug("Warning: Index out of bounds. The packet is incomplete or corrupted.") | |||
break | |||
if (counter+hadstuffing+7<raw_size-4): | |||
if raw[counter+hadstuffing+7]==HEADER-1: | |||
hadstuffing+=1 | |||
ispeed = ispeed + int((0x70|raw[counter+hadstuffing+7]&0x0F)<<8) | |||
else: | |||
ispeed = ispeed + int(raw[counter+hadstuffing+7]<<8) | |||
else: | |||
logging.debug("Warning: Index out of bounds. The packet is incomplete or corrupted.") | |||
break | |||
if (counter+2<raw_size-4): | |||
if raw[counter+2]==RX_AZIMUTH: | |||
AzPosition.append(iangle*360.0/USHRT_MAX) | |||
AzSpeed.append(((ispeed-SHRT_MIN)*(MAX_SPEED-MIN_SPEED)/(SHRT_MAX-SHRT_MIN))+MIN_SPEED) | |||
elif raw[counter+2]==RX_ELEVATION: | |||
ElPosition.append(iangle*360.0/USHRT_MAX) | |||
ElSpeed.append(((ispeed-SHRT_MIN)*(MAX_SPEED-MIN_SPEED)/(SHRT_MAX-SHRT_MIN))+MIN_SPEED) | |||
else: | |||
counter+=1 | |||
continue | |||
else: | |||
logging.debug("Warning: Index out of bounds. The packet is incomplete or corrupted.") | |||
break | |||
counter = counter+hadstuffing+13 | |||
else: | |||
counter+=1 | |||
continue | |||
else: | |||
counter+=1 | |||
continue | |||
else: | |||
counter+=1 | |||
continue | |||
r422 | az = numpy.array(AzPosition) | ||
|
r397 | az[numpy.where(az>=359.8)] = 0 | |
r422 | az = numpy.round(az, 2) | ||
saz = numpy.array(AzSpeed) | |||
r378 | saz[numpy.where(saz>=28)] = saz[(saz>=28)]-360 | ||
r422 | saz = numpy.round(saz, 2) | ||
r378 | |||
r422 | el = numpy.array(ElPosition) | ||
r378 | el[numpy.where((190 <= el) & (el <= 360))] = el[(190 <= el) & (el<= 360)] - 360 | ||
r422 | el = numpy.round(el, 2) | ||
r378 | |||
r422 | sel = numpy.array(ElSpeed) | ||
r378 | sel[numpy.where(sel>=28)] = sel[(sel>=28)]-360 | ||
r422 | sel = numpy.round(sel, 2) | ||
r378 | return az, saz, el, sel, int(Timestamp) | ||
r393 | |||
r378 | @app.route('/') | ||
def index(): | |||
r411 | global RUNNING, EXPERIMENT | ||
running = 'Running' if RUNNING else 'Not Running' | |||
r418 | name = '-' if EXPERIMENT['name']=='test' else EXPERIMENT['name'] | ||
r411 | return render_template('index.html', status=running, experiment_name=name) | ||
r378 | |||
r386 | @app.route('/start', methods=['POST']) | ||
r378 | def start_proc(): | ||
r383 | global EXPERIMENT, RUNNING | ||
r386 | EXPERIMENT.update(request.get_json()) | ||
r378 | RUNNING = True | ||
path = os.path.join(DATA_PATH, EXPERIMENT['name']) | |||
r383 | if not os.path.exists(path): | ||
os.makedirs(path) | |||
r378 | fo = open(os.path.join(path, 'experiment.conf'), 'w') | ||
r383 | fo.write(json.dumps(EXPERIMENT, indent=2)) | ||
r378 | fo.close() | ||
r411 | mqtt.publish('sophy/control', json.dumps(EXPERIMENT)) | ||
r418 | socketio.emit('mqtt_message', data={'topic':'monitor', 'status': 'Running', 'name': EXPERIMENT['name']}) | ||
r438 | r = requests.post('http://sirm-schain/start', data=EXPERIMENT) | ||
print('Starting schain: {}'.format(r)) | |||
r405 | |||
r378 | return jsonify({'start': 'ok'}) | ||
@app.route('/stop') | |||
def stop_proc(): | |||
r418 | global RUNNING, DEBUG, HDF, EXPERIMENT | ||
r394 | |||
r378 | RUNNING = False | ||
r394 | DEBUG = False | ||
HDF.reset() | |||
r415 | socketio.emit('mqtt_message', data={'topic':'monitor', 'status': 'Not Running'}) | ||
r418 | mqtt.publish('sophy/control', json.dumps({'stop': 'ok'})) | ||
r438 | r = requests.get('http://sirm-schain/stop') | ||
print('Stopping schain: {}'.format(r)) | |||
r378 | |||
return jsonify({'stop': 'ok'}) | |||
r383 | @app.route('/status') | ||
def status_proc(): | |||
global RUNNING | |||
return jsonify({'status': RUNNING}) | |||
r392 | @app.route('/run') | ||
def run_proc(): | |||
r394 | global RUNNING, EXPERIMENT, DEBUG | ||
if request.args.get('debug', False): | |||
DEBUG = True | |||
r418 | name = request.args.get('name', 'TEST') | ||
path = os.path.join(DATA_PATH, name) | |||
r392 | if not os.path.exists(path): | ||
os.makedirs(path) | |||
r418 | EXPERIMENT['name'] = name | ||
r392 | RUNNING = True | ||
r411 | mqtt.publish('sophy/control', json.dumps(EXPERIMENT)) | ||
r415 | socketio.emit('mqtt_message', data={'topic':'monitor', 'status': 'Running'}) | ||
r411 | |||
r392 | return redirect('/') | ||
r378 | |||
r411 | @app.route('/mqtt') | ||
def mqtt_log(): | |||
return render_template('mqtt.html') | |||
r431 | |||
@app.route('/acq') | |||
def acq_log(): | |||
return render_template('acq.html') | |||
r411 | |||
r378 | @socketio.on('publish') | ||
def handle_publish(json_str): | |||
data = json.loads(json_str) | |||
mqtt.publish(data['topic'], data['message'], data['qos']) | |||
@socketio.on('subscribe') | |||
def handle_subscribe(json_str): | |||
data = json.loads(json_str) | |||
mqtt.subscribe(data['topic'], data['qos']) | |||
@socketio.on('unsubscribe_all') | |||
def handle_unsubscribe_all(): | |||
mqtt.unsubscribe_all() | |||
@mqtt.on_connect() | |||
def handle_connect(client, userdata, flags, rc): | |||
r411 | mqtt.subscribe(os.environ.get('SOPHY_TOPIC', 'sophy/#')) | ||
r378 | |||
@mqtt.on_message() | |||
def handle_mqtt_message(client, userdata, message): | |||
r394 | global RUNNING, HDF | ||
r383 | |||
r378 | payload = message.payload.decode() | ||
r411 | data = {} | ||
r405 | |||
if 'pedestal' in message.topic: | |||
az, saz, el, sel, tm = getSpeedPosition(payload) | |||
r411 | times = numpy.linspace(tm, tm+0.9, 10) | ||
r422 | data['az'] = az[:100][::10].tolist() | ||
data['el'] = el[:100][::10].tolist() | |||
data['saz'] = saz[:100][::10].tolist() | |||
data['sel'] = sel[:100][::10].tolist() | |||
r411 | data['timestamp'] = times.tolist() | ||
r405 | |||
if RUNNING: | |||
r426 | HDF.write({'az':az.tolist(), 'el':el.tolist(), 'saz':saz.tolist(), 'sel':sel.tolist(), 'timestamp':tm}) | ||
r378 | |||
r411 | data['payload'] = payload | ||
data['topic'] = message.topic | |||
r405 | data['qos'] = message.qos | ||
r378 | |||
r405 | socketio.emit('mqtt_message', data=data) | ||
r394 | |||
r378 | @mqtt.on_log() | ||
def handle_logging(client, userdata, level, buf): | |||
r418 | # print(level, buf) | ||
r378 | pass | ||
if __name__ == '__main__': | |||
r438 | socketio.run(app, host='0.0.0.0', port=5000, use_reloader=False, debug=True) |