""" A small Test application to show how to use Flask-MQTT. """ import os import time import logging from datetime import datetime import random import eventlet import json from json import JSONEncoder import numpy import base64 import h5py try: import requests except: pass from bs4 import BeautifulSoup from flask import Flask, render_template, jsonify, request, redirect from flask_mqtt import Mqtt from flask_socketio import SocketIO from flask_bootstrap import Bootstrap eventlet.monkey_patch() app = Flask(__name__) app.config['SECRET'] = 'my secret key' app.config['TEMPLATES_AUTO_RELOAD'] = True app.config['MQTT_BROKER_URL'] = os.environ['BROKER_URL'] app.config['MQTT_BROKER_PORT'] = 1883 app.config['MQTT_CLIENT_ID'] = 'flask_mqtt_sophy' app.config['MQTT_CLEAN_SESSION'] = True app.config['MQTT_USERNAME'] = '' app.config['MQTT_PASSWORD'] = '' app.config['MQTT_KEEPALIVE'] = 5 app.config['MQTT_TLS_ENABLED'] = False app.config['MQTT_LAST_WILL_TOPIC'] = 'home/lastwill' app.config['MQTT_LAST_WILL_MESSAGE'] = 'bye' app.config['MQTT_LAST_WILL_QOS'] = 2 try: mqtt = Mqtt(app) except: pass socketio = SocketIO(app) bootstrap = Bootstrap(app) MODE_STOP = 0x00 MODE_SPEED = 0x01 MODE_POSITION = 0x02 RX_AZIMUTH = 0x27 RX_ELEVATION = 0x47 TX_AZIMUTH = 0x25 TX_ELEVATION = 0x45 NONE_AXIS = 0x65 RX_FUNCTION = 0x30 TX_FUNCTION = 0x40 HEADER = 0x7e MIN_SPEED = -180.0 MAX_SPEED = 180.0 SHRT_MIN = -32768 SHRT_MAX = 32768 USHRT_MAX = 65535 DATA_PATH = '/data' RUNNING = False EXPERIMENT = {'name': 'test'} DEBUG = False class NumpyArrayEncoder(JSONEncoder): def default(self, obj): if isinstance(obj, numpy.ndarray): return obj.tolist() return JSONEncoder.default(self, obj) class HDF5File(): def __init__(self): self.data = {} self.data_pow = {'powa':[], 'powb':[], 'timestamp':[]} self.power_ready = True self.power_time = 0 self.raw = {} self.timestamp = 0 self.enable = False self.metadata = None def reset(self): self.data = {} self.data_pow = {'powa':[], 'powb':[], 'timestamp':[]} self.power_ready = True self.power_time = 0 self.raw = {} self.timestamp = 0 def add_data(self, var, values): if len(values) > 0: delta = numpy.average(values[1:]-values[:-1]) n = 100 - len(values) if n > 0: missing = numpy.arange(1, n+1)*delta + values[-1] values = values.tolist() values.extend(missing) self.data[var] = values def update(self, data): self.add_data('az', data['az']) self.add_data('el', data['el']) self.add_data('saz', data['saz']) self.add_data('sel', data['sel']) self.timestamp = int(data['timestamp']) def create_file(self): filex = "pos@%10.3f.h5" % (self.timestamp) date_folder = datetime.fromtimestamp(self.timestamp).strftime('%Y-%m-%dT%H-00-00') filename = os.path.join(DATA_PATH, EXPERIMENT['name'], 'position', date_folder, filex) if not os.path.exists(os.path.dirname(filename)): path = os.path.dirname(filename) os.makedirs(path) with h5py.File(filename, 'w') as fp: grp = fp.create_group("Data") dset = grp.create_dataset("azi_pos", data=numpy.array(self.data['az'])) dset = grp.create_dataset("ele_pos", data=numpy.array(self.data['el'])) dset = grp.create_dataset("azi_speed", data=numpy.array(self.data['saz'])) dset = grp.create_dataset("ele_speed", data=numpy.array(self.data['sel'])) dset = grp.create_dataset("utc", data=self.timestamp) if DEBUG: filelog = filename.replace('.h5', '.txt') f = open(filelog, 'w') f.write(json.dumps(self.raw, cls=NumpyArrayEncoder)) f.close() def write(self, data): self.raw = data self.update(data) self.create_file() def write_power(self, data): if self.power_ready: filex = "pow@%10.3f.h5" % (self.power_time if self.power_time else self.timestamp) date_folder = datetime.fromtimestamp(self.timestamp).strftime('%Y-%m-%dT%H-00-00') filename = os.path.join(DATA_PATH, EXPERIMENT['name'], 'power', date_folder, filex) if not os.path.exists(os.path.dirname(filename)): path = os.path.dirname(filename) os.makedirs(path) self.fp = h5py.File(filename, 'w') self.power_ready = False if datetime.fromtimestamp(self.timestamp).second == 0: self.power_time = self.timestamp grp = self.fp.create_group("Data") dset = grp.create_dataset("powa", data=numpy.array(self.data_pow['powa'])) dset = grp.create_dataset("powb", data=numpy.array(self.data_pow['powb'])) dset = grp.create_dataset("utc", data=numpy.array(self.data_pow['timestamp'])) self.fp.close() self.data_pow['powa'] = [] self.data_pow['powb'] = [] self.data_pow['timestamp'] = [] self.power_ready = True self.data_pow['powa'].append(data['powa']) self.data_pow['powb'].append(data['powb']) self.data_pow['timestamp'].append(self.timestamp) HDF = HDF5File() def getSpeedPosition(msg_b64): AzPosition=[] AzSpeed=[] ElPosition=[] ElSpeed=[] RawData=[] raw = numpy.frombuffer(base64.decodebytes(msg_b64.encode()),numpy.dtype('B')) raw_size = len(raw) Timestamp = (raw[raw_size-4])|(raw[raw_size-3]<<8)|(raw[raw_size-2]<<16)|(raw[raw_size-1]<<24) # Timestamp = time.time() counter = 0 while counter < raw_size-4: if raw[counter]==HEADER: if raw[counter+1]==RX_FUNCTION: if raw[counter+2]==RX_AZIMUTH or raw[counter+2]==RX_ELEVATION: iangle = 0.0 ispeed = 0.0 hadstuffing = 0 if (counter+hadstuffing+4=359.5)] = 0 saz = numpy.array(AzSpeed) saz[numpy.where(saz>=28)] = saz[(saz>=28)]-360 el = numpy.array(ElPosition) el[numpy.where((190 <= el) & (el <= 360))] = el[(190 <= el) & (el<= 360)] - 360 sel = numpy.array(ElSpeed) sel[numpy.where(sel>=28)] = sel[(sel>=28)]-360 return az, saz, el, sel, int(Timestamp) def get_power(tx): url = 'http://{}/status.xml'.format(tx) try: page = requests.get(url, timeout=0.2) soup = BeautifulSoup(page.content, 'xml') power = float(soup.find('bucAOutputpower').text) except: power = 0.0 return power @app.route('/') def index(): return render_template('index.html') @app.route('/start', methods=['POST']) def start_proc(): global EXPERIMENT, RUNNING EXPERIMENT.update(request.get_json()) RUNNING = True print(EXPERIMENT, flush=True) path = os.path.join(DATA_PATH, EXPERIMENT['name']) if not os.path.exists(path): os.makedirs(path) fo = open(os.path.join(path, 'experiment.conf'), 'w') fo.write(json.dumps(EXPERIMENT, indent=2)) fo.close() return jsonify({'start': 'ok'}) @app.route('/stop') def stop_proc(): global RUNNING, DEBUG, HDF RUNNING = False DEBUG = False HDF.reset() return jsonify({'stop': 'ok'}) @app.route('/status') def status_proc(): global RUNNING return jsonify({'status': RUNNING}) @app.route('/run') def run_proc(): global RUNNING, EXPERIMENT, DEBUG if request.args.get('debug', False): DEBUG = True path = os.path.join(DATA_PATH, 'TEST') if not os.path.exists(path): os.makedirs(path) EXPERIMENT['name'] = 'TEST' RUNNING = True return redirect('/') @socketio.on('publish') def handle_publish(json_str): data = json.loads(json_str) mqtt.publish(data['topic'], data['message'], data['qos']) @socketio.on('subscribe') def handle_subscribe(json_str): data = json.loads(json_str) mqtt.subscribe(data['topic'], data['qos']) @socketio.on('unsubscribe_all') def handle_unsubscribe_all(): mqtt.unsubscribe_all() @mqtt.on_connect() def handle_connect(client, userdata, flags, rc): mqtt.subscribe(os.environ.get('PEDESTAL_TOPIC', 'JRO_topic')) @mqtt.on_message() def handle_mqtt_message(client, userdata, message): global RUNNING, HDF payload = message.payload.decode() az, saz, el, sel, tm = getSpeedPosition(payload) powa = get_power(os.environ['TXA_SITE']) powb = get_power(os.environ['TXB_SITE']) times = numpy.arange(tm, tm+1, 0.1, dtype=float) data = dict( az =az[::10].tolist(), el =el[::10].tolist(), saz =saz[::10].tolist(), sel =sel[::10].tolist(), topic = message.topic, payload = payload, qos = message.qos, timestamp = times.tolist(), powa = [powa], powb = [powb], status = 'Running' if RUNNING else 'Not Running' ) socketio.emit('mqtt_message', data=data) if RUNNING: HDF.write({'az':az, 'el':el, 'saz':saz, 'sel':sel, 'timestamp':tm, 'powa':powa, 'powb':powb}) HDF.write_power({'timestamp':tm, 'powa':powa, 'powb':powb}) @mqtt.on_log() def handle_logging(client, userdata, level, buf): # print(level, buf) pass if __name__ == '__main__': socketio.run(app, host='0.0.0.0', port=5000, use_reloader=False, debug=True)