|
|
"""
|
|
|
A small Test application to show how to use Flask-MQTT.
|
|
|
|
|
|
"""
|
|
|
import os
|
|
|
import time
|
|
|
import logging
|
|
|
from datetime import datetime
|
|
|
import random
|
|
|
import eventlet
|
|
|
import json
|
|
|
from json import JSONEncoder
|
|
|
import numpy
|
|
|
import base64
|
|
|
import h5py
|
|
|
try:
|
|
|
import requests
|
|
|
except:
|
|
|
pass
|
|
|
from bs4 import BeautifulSoup
|
|
|
from flask import Flask, render_template, jsonify, request, redirect
|
|
|
from flask_mqtt import Mqtt
|
|
|
from flask_socketio import SocketIO
|
|
|
from flask_bootstrap import Bootstrap
|
|
|
|
|
|
# Apply eventlet's monkey patching to the standard library.
# NOTE(review): this runs after the imports at the top of the file; eventlet's
# documentation recommends patching as early as possible -- confirm that the
# current order is intentional.
eventlet.monkey_patch()

app = Flask(__name__)
# NOTE(review): Flask looks up its secret under 'SECRET_KEY'; the 'SECRET'
# key is kept unchanged here -- confirm which one is intended.
app.config['SECRET'] = 'my secret key'
app.config['TEMPLATES_AUTO_RELOAD'] = True
# The broker address is mandatory: a missing BROKER_URL raises KeyError here.
app.config['MQTT_BROKER_URL'] = os.environ['BROKER_URL']
app.config['MQTT_BROKER_PORT'] = 1883
app.config['MQTT_CLIENT_ID'] = 'flask_mqtt_sophy'
app.config['MQTT_CLEAN_SESSION'] = True
app.config['MQTT_USERNAME'] = ''
app.config['MQTT_PASSWORD'] = ''
app.config['MQTT_KEEPALIVE'] = 5
app.config['MQTT_TLS_ENABLED'] = False
app.config['MQTT_LAST_WILL_TOPIC'] = 'home/lastwill'
app.config['MQTT_LAST_WILL_MESSAGE'] = 'bye'
app.config['MQTT_LAST_WILL_QOS'] = 2

try:
    mqtt = Mqtt(app)
except Exception:
    # The MQTT handlers below are registered with ``@mqtt...`` decorators at
    # import time, so continuing without ``mqtt`` would only fail later with
    # an unrelated NameError. Report the real cause and stop here instead.
    logging.exception("Could not initialise the MQTT client")
    raise

socketio = SocketIO(app)
bootstrap = Bootstrap(app)
|
|
|
|
|
|
# Protocol identifiers. Only HEADER, RX_FUNCTION, RX_AZIMUTH and RX_ELEVATION
# are referenced by the frame decoder in this file; the remaining identifiers
# are not used in the visible code.
MODE_STOP = 0x00
MODE_SPEED = 0x01
MODE_POSITION = 0x02
RX_AZIMUTH = 0x27       # axis id compared against frame byte 2
RX_ELEVATION = 0x47     # axis id compared against frame byte 2
TX_AZIMUTH = 0x25
TX_ELEVATION = 0x45
NONE_AXIS = 0x65
RX_FUNCTION = 0x30      # compared against frame byte 1
TX_FUNCTION = 0x40
HEADER = 0x7e           # compared against frame byte 0; HEADER - 1 marks a stuffed byte

# Range used to rescale the raw 16-bit speed field.
MIN_SPEED = -180.0
MAX_SPEED = 180.0

# 16-bit limits used by the angle/speed scaling.
# NOTE(review): SHRT_MAX is 32768 here (C's SHRT_MAX is 32767); the value is
# kept because the speed scaling depends on it -- confirm it is intended.
SHRT_MIN = -32768
SHRT_MAX = 32768
USHRT_MAX = 65535
|
|
|
|
|
|
# Root directory under which experiment folders are created.
DATA_PATH = '/data'
# True while incoming MQTT samples are written to disk.
RUNNING = False
# Current experiment settings; 'name' selects the sub-folder of DATA_PATH.
EXPERIMENT = {'name': 'test'}
# When True, HDF5File.create_file also dumps the raw sample as JSON text.
DEBUG = False
|
|
|
|
|
|
class NumpyArrayEncoder(JSONEncoder):
    """JSON encoder that knows how to serialize numpy values."""

    def default(self, obj):
        """Convert numpy arrays to lists and numpy scalars to Python scalars.

        Any other unsupported object is delegated to ``JSONEncoder.default``,
        which raises ``TypeError``.
        """
        if isinstance(obj, numpy.ndarray):
            return obj.tolist()
        if isinstance(obj, numpy.generic):
            # numpy scalars (e.g. numpy.int64) are not JSON serializable as-is.
            return obj.item()
        return JSONEncoder.default(self, obj)
|
|
|
|
|
|
class HDF5File():
    """Buffer decoded pedestal samples and write them to HDF5 files.

    A position file is written on every call to :meth:`write`. Power samples
    are accumulated by :meth:`write_power` and flushed to their own file when
    the current timestamp falls on second 0.
    """

    def __init__(self):
        self.fp = None          # h5py handle of the power file being filled
        self.enable = False
        self.metadata = None
        self.reset()

    def reset(self):
        """Drop all buffered samples and close any power file left open."""
        self._close_power_file()
        self.data = {}
        self.data_pow = {'powa': [], 'powb': [], 'timestamp': []}
        self.power_ready = True     # True when a new power file must be opened
        self.power_time = 0
        self.raw = {}
        self.timestamp = 0

    def _close_power_file(self):
        """Close the open power file, if any, and forget its handle."""
        fp = getattr(self, 'fp', None)
        if fp is not None:
            fp.close()
        self.fp = None

    def add_data(self, var, values):
        """Store ``values`` under ``var``, padding short series to 100 samples.

        Missing samples are extrapolated linearly from the last value using
        the average step between consecutive samples. With a single sample no
        step can be estimated, so the last value is repeated instead of
        producing NaN. Empty input and series of 100 or more samples are
        stored unchanged.
        """
        values = numpy.asarray(values)
        if len(values) > 0:
            if len(values) > 1:
                delta = numpy.average(values[1:] - values[:-1])
            else:
                delta = 0.0
            n = 100 - len(values)
            if n > 0:
                missing = numpy.arange(1, n + 1) * delta + values[-1]
                values = values.tolist()
                values.extend(missing)

        self.data[var] = values

    def update(self, data):
        """Load the four position/speed series and the timestamp from ``data``."""
        for key in ('az', 'el', 'saz', 'sel'):
            self.add_data(key, data[key])
        self.timestamp = int(data['timestamp'])

    def create_file(self):
        """Write the buffered position/speed series to a ``pos@<ts>.h5`` file.

        The file goes to ``DATA_PATH/<experiment>/position/<date-hour>/`` and
        holds a ``Data`` group with the four series plus ``utc``. When
        ``DEBUG`` is set, the raw sample is also dumped as JSON next to it.
        """
        filex = "pos@%10.3f.h5" % (self.timestamp)
        date_folder = datetime.fromtimestamp(self.timestamp).strftime('%Y-%m-%dT%H-00-00')
        filename = os.path.join(DATA_PATH, EXPERIMENT['name'], 'position', date_folder, filex)
        # exist_ok avoids the check-then-create race of exists() + makedirs().
        os.makedirs(os.path.dirname(filename), exist_ok=True)

        with h5py.File(filename, 'w') as fp:
            grp = fp.create_group("Data")
            grp.create_dataset("azi_pos", data=numpy.array(self.data['az']))
            grp.create_dataset("ele_pos", data=numpy.array(self.data['el']))
            grp.create_dataset("azi_speed", data=numpy.array(self.data['saz']))
            grp.create_dataset("ele_speed", data=numpy.array(self.data['sel']))
            grp.create_dataset("utc", data=self.timestamp)

        if DEBUG:
            filelog = filename.replace('.h5', '.txt')
            with open(filelog, 'w') as f:
                f.write(json.dumps(self.raw, cls=NumpyArrayEncoder))

    def write(self, data):
        """Remember the raw sample, buffer it and write the position file."""
        self.raw = data
        self.update(data)
        self.create_file()

    def write_power(self, data):
        """Accumulate one power sample and flush the batch on second 0.

        Uses ``self.timestamp``, so :meth:`write` is expected to have been
        called with the same sample first. The sample received on the flush
        becomes the first entry of the next batch.
        """
        if self.power_ready:
            filex = "pow@%10.3f.h5" % (self.power_time if self.power_time else self.timestamp)
            date_folder = datetime.fromtimestamp(self.timestamp).strftime('%Y-%m-%dT%H-00-00')
            filename = os.path.join(DATA_PATH, EXPERIMENT['name'], 'power', date_folder, filex)
            os.makedirs(os.path.dirname(filename), exist_ok=True)
            self.fp = h5py.File(filename, 'w')
            self.power_ready = False

        if datetime.fromtimestamp(self.timestamp).second == 0:
            self.power_time = self.timestamp
            grp = self.fp.create_group("Data")
            grp.create_dataset("powa", data=numpy.array(self.data_pow['powa']))
            grp.create_dataset("powb", data=numpy.array(self.data_pow['powb']))
            grp.create_dataset("utc", data=numpy.array(self.data_pow['timestamp']))
            self._close_power_file()
            self.data_pow['powa'] = []
            self.data_pow['powb'] = []
            self.data_pow['timestamp'] = []
            self.power_ready = True

        self.data_pow['powa'].append(data['powa'])
        self.data_pow['powb'].append(data['powb'])
        self.data_pow['timestamp'].append(self.timestamp)
|
|
|
|
|
|
|
|
|
# Module-level recorder used by the /stop route and the MQTT message handler.
HDF = HDF5File()
|
|
|
|
|
|
def _read_stuffed_byte(raw, index, limit):
    """Read the payload byte at ``index``, undoing byte stuffing.

    Returns ``(value, extra)`` where ``extra`` is 1 when an escape marker
    (``HEADER - 1``) was consumed, or ``None`` when ``index`` is not below
    ``limit`` (incomplete frame).
    """
    if index >= limit:
        return None
    if raw[index] == HEADER - 1:
        # Escaped byte: the high nibble is forced to 0x7 and the low nibble
        # is taken from the byte that follows the escape marker.
        return 0x70 | (raw[index + 1] & 0x0F), 1
    return raw[index], 0


def getSpeedPosition(msg_b64):
    """Decode a base64 pedestal message into position and speed arrays.

    The decoded payload is a sequence of frames followed by a 4-byte
    little-endian timestamp. A frame is recognised by ``HEADER``,
    ``RX_FUNCTION`` and an axis id (``RX_AZIMUTH`` or ``RX_ELEVATION``) in
    its first three bytes. Bytes 4-5 hold the angle and bytes 6-7 the speed,
    both little-endian 16-bit values that may be byte-stuffed. Decoding stops
    at the first incomplete frame.

    All arithmetic is done on Python ints: shifting numpy ``uint8`` scalars
    stays in ``uint8`` under NumPy 2 promotion rules and silently drops the
    high bytes.

    Args:
        msg_b64: Base64 text of the raw message.

    Returns:
        Tuple ``(az, saz, el, sel, timestamp)``: four numpy arrays with the
        azimuth position, azimuth speed, elevation position and elevation
        speed, and the timestamp as an int.

    Raises:
        ValueError: If the payload is shorter than the 4-byte timestamp.
    """
    AzPosition = []
    AzSpeed = []
    ElPosition = []
    ElSpeed = []

    raw = base64.decodebytes(msg_b64.encode())
    raw_size = len(raw)
    if raw_size < 4:
        raise ValueError("Payload too short: missing 4-byte timestamp trailer.")

    Timestamp = int.from_bytes(raw[-4:], 'little')
    limit = raw_size - 4        # frames live before the timestamp trailer
    speed_scale = (MAX_SPEED - MIN_SPEED) / (SHRT_MAX - SHRT_MIN)

    counter = 0
    while counter < limit:
        # counter < raw_size - 4, so counter + 2 is always a valid index.
        is_frame = (
            raw[counter] == HEADER
            and raw[counter + 1] == RX_FUNCTION
            and raw[counter + 2] in (RX_AZIMUTH, RX_ELEVATION)
        )
        if not is_frame:
            counter += 1
            continue

        # Read angle low/high and speed low/high, tracking consumed escapes.
        hadstuffing = 0
        fields = []
        for offset in (4, 5, 6, 7):
            item = _read_stuffed_byte(raw, counter + hadstuffing + offset, limit)
            if item is None:
                break
            value, extra = item
            hadstuffing += extra
            fields.append(value)

        if len(fields) < 4:
            logging.debug("Warning: Index out of bounds. The packet is incomplete or corrupted.")
            break

        iangle = fields[0] + (fields[1] << 8)
        ispeed = fields[2] + (fields[3] << 8)
        position = iangle * 360.0 / USHRT_MAX
        speed = ((ispeed - SHRT_MIN) * (MAX_SPEED - MIN_SPEED) / (SHRT_MAX - SHRT_MIN)) + MIN_SPEED

        if raw[counter + 2] == RX_AZIMUTH:
            AzPosition.append(position)
            AzSpeed.append(speed)
        else:
            ElPosition.append(position)
            ElSpeed.append(speed)

        # A frame spans 13 bytes plus one byte per escape marker consumed.
        counter = counter + hadstuffing + 13

    az = numpy.array(AzPosition)
    az[az >= 359.5] = 0                     # snap readings near a full turn to 0

    saz = numpy.array(AzSpeed)
    saz[saz >= 28] -= 360                   # shift speeds of 28 or more by -360

    el = numpy.array(ElPosition)
    el[(el >= 190) & (el <= 360)] -= 360    # readings in [190, 360] become negative

    sel = numpy.array(ElSpeed)
    sel[sel >= 28] -= 360

    return az, saz, el, sel, Timestamp
|
|
|
|
|
|
def get_power(tx):
    """Return the ``bucAOutputpower`` value reported by a transmitter.

    Fetches ``http://<tx>/status.xml`` with a 0.2 s timeout and parses the
    ``bucAOutputpower`` element as a float. This is best-effort: any failure
    (request error, missing element, unparsable value, or ``requests`` not
    being importable) yields ``0.0``.

    Args:
        tx: Host (and optional port) of the transmitter.

    Returns:
        The reported power as a float, or ``0.0`` on failure.
    """
    url = 'http://{}/status.xml'.format(tx)
    try:
        page = requests.get(url, timeout=0.2)
        soup = BeautifulSoup(page.content, 'xml')
        power = float(soup.find('bucAOutputpower').text)
    except Exception:
        # ``Exception`` instead of a bare except, so KeyboardInterrupt and
        # SystemExit are no longer swallowed.
        logging.debug("Could not read power from %s", url, exc_info=True)
        power = 0.0
    return power
|
|
|
|
|
|
|
|
|
@app.route('/')
def index():
    """Render the main page from the ``index.html`` template."""
    return render_template('index.html')
|
|
|
|
|
|
@app.route('/start', methods=['POST'])
def start_proc():
    """Start recording with the experiment settings posted as JSON.

    The posted object is merged into ``EXPERIMENT``, recording is enabled and
    the resulting settings are saved to
    ``DATA_PATH/<name>/experiment.conf``.

    Returns:
        ``{'start': 'ok'}`` on success, or a 400 response when the request
        body is not a JSON object.
    """
    global EXPERIMENT, RUNNING

    payload = request.get_json()
    if not isinstance(payload, dict):
        return jsonify({'start': 'error', 'message': 'expected a JSON object'}), 400

    EXPERIMENT.update(payload)
    RUNNING = True
    print(EXPERIMENT, flush=True)

    # NOTE(review): EXPERIMENT['name'] comes from the request and is used as
    # a path component without sanitising -- confirm callers are trusted.
    path = os.path.join(DATA_PATH, EXPERIMENT['name'])
    os.makedirs(path, exist_ok=True)
    with open(os.path.join(path, 'experiment.conf'), 'w') as fo:
        fo.write(json.dumps(EXPERIMENT, indent=2))

    return jsonify({'start': 'ok'})
|
|
|
|
|
|
@app.route('/stop')
def stop_proc():
    """Stop recording, leave debug mode and clear the buffered samples."""
    global RUNNING, DEBUG

    RUNNING = False
    DEBUG = False
    HDF.reset()

    return jsonify({'stop': 'ok'})
|
|
|
|
|
|
@app.route('/status')
def status_proc():
    """Report whether recording is currently enabled."""
    return jsonify({'status': RUNNING})
|
|
|
|
|
|
@app.route('/run')
def run_proc():
    """Start a recording named ``TEST`` and redirect to the main page.

    Any non-empty ``debug`` query argument also turns on ``DEBUG``.
    """
    global RUNNING, EXPERIMENT, DEBUG

    # NOTE(review): the source lost its indentation; only the DEBUG
    # assignment is assumed to be conditional -- confirm.
    if request.args.get('debug', False):
        DEBUG = True

    path = os.path.join(DATA_PATH, 'TEST')
    os.makedirs(path, exist_ok=True)
    EXPERIMENT['name'] = 'TEST'
    RUNNING = True

    return redirect('/')
|
|
|
|
|
|
@socketio.on('publish')
def handle_publish(json_str):
    """Publish the message described by a JSON string to the MQTT broker.

    The JSON object must provide ``topic``, ``message`` and ``qos``.
    """
    data = json.loads(json_str)
    mqtt.publish(data['topic'], data['message'], data['qos'])
|
|
|
|
|
|
@socketio.on('subscribe')
def handle_subscribe(json_str):
    """Subscribe to the MQTT topic described by a JSON string.

    The JSON object must provide ``topic`` and ``qos``.
    """
    data = json.loads(json_str)
    mqtt.subscribe(data['topic'], data['qos'])
|
|
|
|
|
|
@socketio.on('unsubscribe_all')
def handle_unsubscribe_all():
    """Drop every MQTT subscription held by this client."""
    mqtt.unsubscribe_all()
|
|
|
|
|
|
@mqtt.on_connect()
def handle_connect(client, userdata, flags, rc):
    """Subscribe to the pedestal topic once connected to the broker.

    The topic is read from ``PEDESTAL_TOPIC`` and defaults to ``JRO_topic``.
    """
    mqtt.subscribe(os.environ.get('PEDESTAL_TOPIC', 'JRO_topic'))
|
|
|
|
|
|
@mqtt.on_message()
def handle_mqtt_message(client, userdata, message):
    """Decode a pedestal message, push it to web clients and record it.

    The payload is decoded with ``getSpeedPosition``, the output power of
    both transmitters (``TXA_SITE`` / ``TXB_SITE``) is read, and every tenth
    sample is emitted to Socket.IO clients as ``mqtt_message``. While
    ``RUNNING`` is set, the full-resolution data is also written through
    ``HDF``. A payload that cannot be decoded is logged and dropped.
    """
    try:
        payload = message.payload.decode()
        az, saz, el, sel, tm = getSpeedPosition(payload)
    except (ValueError, IndexError):
        # A malformed message must not raise out of the MQTT callback.
        logging.warning("Discarding undecodable MQTT message on %s",
                        message.topic, exc_info=True)
        return

    powa = get_power(os.environ['TXA_SITE'])
    powb = get_power(os.environ['TXB_SITE'])
    times = numpy.arange(tm, tm+1, 0.1, dtype=float)

    data = {
        'az': az[::10].tolist(),
        'el': el[::10].tolist(),
        'saz': saz[::10].tolist(),
        'sel': sel[::10].tolist(),
        'topic': message.topic,
        'payload': payload,
        'qos': message.qos,
        'timestamp': times.tolist(),
        'powa': [powa],
        'powb': [powb],
        'status': 'Running' if RUNNING else 'Not Running',
    }

    socketio.emit('mqtt_message', data=data)

    if RUNNING:
        # write() must run first: write_power() relies on the timestamp it sets.
        HDF.write({'az': az, 'el': el, 'saz': saz, 'sel': sel,
                   'timestamp': tm, 'powa': powa, 'powb': powb})
        HDF.write_power({'timestamp': tm, 'powa': powa, 'powb': powb})
|
|
|
|
|
|
@mqtt.on_log()
def handle_logging(client, userdata, level, buf):
    """Ignore MQTT client log records (intentionally a no-op)."""
|
|
|
|
|
|
if __name__ == '__main__':
    # NOTE(review): debug=True while listening on 0.0.0.0 exposes debug mode
    # on every interface -- confirm this is only used for development.
    socketio.run(app, host='0.0.0.0', port=5000, use_reloader=False, debug=True)
|
|
|
|