##// END OF EJS Templates
Fix saving data position
Fix saving data position

File last commit:

r392:16208270cf50
r392:16208270cf50
Show More
app.py
320 lines | 9.6 KiB | text/x-python | PythonLexer
Add Proc & monitor app
r378 """
A small Test application to show how to use Flask-MQTT.
"""
import os
import time
import logging
from datetime import datetime
import random
import eventlet
import json
import numpy
import base64
import h5py
Fix saving data position
r392 from flask import Flask, render_template, jsonify, request, redirect
Add Proc & monitor app
r378 from flask_mqtt import Mqtt
from flask_socketio import SocketIO
from flask_bootstrap import Bootstrap
eventlet.monkey_patch()
app = Flask(__name__)
app.config['SECRET'] = 'my secret key'
app.config['TEMPLATES_AUTO_RELOAD'] = True
Add BROKER_URL env var
r380 app.config['MQTT_BROKER_URL'] = os.environ['BROKER_URL']
Add Proc & monitor app
r378 app.config['MQTT_BROKER_PORT'] = 1883
app.config['MQTT_CLIENT_ID'] = 'flask_mqtt_sophy'
app.config['MQTT_CLEAN_SESSION'] = True
app.config['MQTT_USERNAME'] = ''
app.config['MQTT_PASSWORD'] = ''
app.config['MQTT_KEEPALIVE'] = 5
app.config['MQTT_TLS_ENABLED'] = False
app.config['MQTT_LAST_WILL_TOPIC'] = 'home/lastwill'
app.config['MQTT_LAST_WILL_MESSAGE'] = 'bye'
app.config['MQTT_LAST_WILL_QOS'] = 2
Update experiment start / stop
r386 try:
mqtt = Mqtt(app)
except:
pass
Add Proc & monitor app
r378 socketio = SocketIO(app)
bootstrap = Bootstrap(app)
MODE_STOP = 0x00
MODE_SPEED = 0x01
MODE_POSITION = 0x02
RX_AZIMUTH = 0x27
RX_ELEVATION = 0x47
TX_AZIMUTH = 0x25
TX_ELEVATION = 0x45
NONE_AXIS = 0x65
RX_FUNCTION = 0x30
TX_FUNCTION = 0x40
HEADER = 0x7e
MIN_SPEED = -180.0
MAX_SPEED = 180.0
SHRT_MIN = -32768
SHRT_MAX = 32768
USHRT_MAX = 65535
DATA_PATH = '/data'
RUNNING = False
Review monitor app
r383 EXPERIMENT = {'name': 'test'}
Add Proc & monitor app
r378
class HDF5File():
def __init__(self):
Fix saving data position
r392 self.data = {}
Add Proc & monitor app
r378 self.timestamp = 0
self.enable = False
self.metadata = None
def ready(self):
if len(self.az)<100 or len(self.el)<100 or len(self.saz)<100 or len(self.sel)<100:
return False
return True
def add_data(self, var, values):
Fix saving data position
r392 if len(values) > 0:
delta = numpy.average(values[1:]-values[:-1])
n = 100 - len(values)
# print(values[-1]+delta, values[0], delta, flush=True)
# missing = numpy.arange(values[-1]+delta, values[0], delta)
print(var, n, numpy.arange(1, n+1)*delta + values[-1], flush=True)
if n > 0:
missing = numpy.arange(1, n+1)*delta + values[-1]
values = values.tolist()
values.extend(missing)
Add Proc & monitor app
r378
Fix saving data position
r392 self.data[var] = values
Add Proc & monitor app
r378
def update(self, data):
self.add_data('az', data['az'])
self.add_data('el', data['el'])
self.add_data('saz', data['saz'])
self.add_data('sel', data['sel'])
self.timestamp = int(data['timestamp'])
def create_file(self):
filex = "pos@%10.3f.h5" % (self.timestamp)
Review monitor app
r383 date_folder = datetime.fromtimestamp(self.timestamp).strftime('%Y-%m-%dT%H-00-00')
Add Proc & monitor app
r378 filename = os.path.join(DATA_PATH, EXPERIMENT['name'], 'position', date_folder, filex)
if not os.path.exists(os.path.dirname(filename)):
path = os.path.dirname(filename)
os.makedirs(path)
with h5py.File(filename, 'w') as fp:
grp = fp.create_group("Data")
Fix saving data position
r392 dset = grp.create_dataset("azi_pos", data=numpy.array(self.data['az']))
dset = grp.create_dataset("ele_pos", data=numpy.array(self.data['el']))
dset = grp.create_dataset("azi_speed", data=numpy.array(self.data['saz']))
dset = grp.create_dataset("ele_speed", data=numpy.array(self.data['sel']))
dset = grp.create_dataset("utc", data=self.timestamp)
Add Proc & monitor app
r378
def write(self, data):
self.update(data)
self.create_file()
HDF = HDF5File()
def getSpeedPosition(msg_b64):
AzPosition=[]
AzSpeed=[]
ElPosition=[]
ElSpeed=[]
RawData=[]
raw = numpy.frombuffer(base64.decodebytes(msg_b64.encode()),numpy.dtype('B'))
raw_size = len(raw)
Review monitor app
r383 Timestamp = (raw[raw_size-4])|(raw[raw_size-3]<<8)|(raw[raw_size-2]<<16)|(raw[raw_size-1]<<24)
# Timestamp = time.time()
Add Proc & monitor app
r378 counter = 0
while counter < raw_size-4:
if raw[counter]==HEADER:
if raw[counter+1]==RX_FUNCTION:
if raw[counter+2]==RX_AZIMUTH or raw[counter+2]==RX_ELEVATION:
iangle = 0.0
ispeed = 0.0
hadstuffing = 0
if (counter+hadstuffing+4<raw_size-4):
if raw[counter+hadstuffing+4]==HEADER-1:
hadstuffing+=1
iangle = int(0x70|raw[counter+hadstuffing+4]&0x0F)
else:
iangle = int(raw[counter+hadstuffing+4])
else:
logging.debug("Warning: Index out of bounds. The packet is incomplete or corrupted.")
break
if (counter+hadstuffing+5<raw_size-4):
if raw[counter+hadstuffing+5]==HEADER-1:
hadstuffing+=1
iangle = iangle + int((0x70|raw[counter+hadstuffing+5]&0x0F)<<8)
else:
iangle = iangle + int(raw[counter+hadstuffing+5]<<8)
else:
logging.debug("Warning: Index out of bounds. The packet is incomplete or corrupted.")
break
if (counter+hadstuffing+6<raw_size-4):
if raw[counter+hadstuffing+6]==HEADER-1:
hadstuffing+=1
ispeed = int(0x70|raw[counter+hadstuffing+6]&0x0F)
else:
ispeed = int(raw[counter+hadstuffing+6])
else:
logging.debug("Warning: Index out of bounds. The packet is incomplete or corrupted.")
break
if (counter+hadstuffing+7<raw_size-4):
if raw[counter+hadstuffing+7]==HEADER-1:
hadstuffing+=1
ispeed = ispeed + int((0x70|raw[counter+hadstuffing+7]&0x0F)<<8)
else:
ispeed = ispeed + int(raw[counter+hadstuffing+7]<<8)
else:
logging.debug("Warning: Index out of bounds. The packet is incomplete or corrupted.")
break
if (counter+2<raw_size-4):
if raw[counter+2]==RX_AZIMUTH:
AzPosition.append(iangle*360.0/USHRT_MAX)
AzSpeed.append(((ispeed-SHRT_MIN)*(MAX_SPEED-MIN_SPEED)/(SHRT_MAX-SHRT_MIN))+MIN_SPEED)
elif raw[counter+2]==RX_ELEVATION:
ElPosition.append(iangle*360.0/USHRT_MAX)
ElSpeed.append(((ispeed-SHRT_MIN)*(MAX_SPEED-MIN_SPEED)/(SHRT_MAX-SHRT_MIN))+MIN_SPEED)
else:
counter+=1
continue
else:
logging.debug("Warning: Index out of bounds. The packet is incomplete or corrupted.")
break
counter = counter+hadstuffing+13
else:
counter+=1
continue
else:
counter+=1
continue
else:
counter+=1
continue
az = numpy.array(AzPosition)
az[numpy.where(az>=359.5)] = 0
saz = numpy.array(AzSpeed)
saz[numpy.where(saz>=28)] = saz[(saz>=28)]-360
el = numpy.array(ElPosition)
el[numpy.where((190 <= el) & (el <= 360))] = el[(190 <= el) & (el<= 360)] - 360
sel = numpy.array(ElSpeed)
sel[numpy.where(sel>=28)] = sel[(sel>=28)]-360
return az, saz, el, sel, int(Timestamp)
@app.route('/')
def index():
return render_template('index.html')
Update experiment start / stop
r386 @app.route('/start', methods=['POST'])
Add Proc & monitor app
r378 def start_proc():
Review monitor app
r383 global EXPERIMENT, RUNNING
Update experiment start / stop
r386 EXPERIMENT.update(request.get_json())
Add Proc & monitor app
r378 RUNNING = True
Update experiment start / stop
r386 print(EXPERIMENT, flush=True)
Add Proc & monitor app
r378 path = os.path.join(DATA_PATH, EXPERIMENT['name'])
Review monitor app
r383 if not os.path.exists(path):
os.makedirs(path)
Add Proc & monitor app
r378 fo = open(os.path.join(path, 'experiment.conf'), 'w')
Review monitor app
r383 fo.write(json.dumps(EXPERIMENT, indent=2))
Add Proc & monitor app
r378 fo.close()
return jsonify({'start': 'ok'})
@app.route('/stop')
def stop_proc():
Review monitor app
r383 global RUNNING
Add Proc & monitor app
r378 RUNNING = False
return jsonify({'stop': 'ok'})
Review monitor app
r383 @app.route('/status')
def status_proc():
global RUNNING
return jsonify({'status': RUNNING})
Fix saving data position
r392 @app.route('/run')
def run_proc():
global RUNNING, EXPERIMENT
path = os.path.join(DATA_PATH, 'TEST')
if not os.path.exists(path):
os.makedirs(path)
EXPERIMENT['name'] = 'TEST'
RUNNING = True
return redirect('/')
Add Proc & monitor app
r378
@socketio.on('publish')
def handle_publish(json_str):
data = json.loads(json_str)
mqtt.publish(data['topic'], data['message'], data['qos'])
@socketio.on('subscribe')
def handle_subscribe(json_str):
data = json.loads(json_str)
mqtt.subscribe(data['topic'], data['qos'])
@socketio.on('unsubscribe_all')
def handle_unsubscribe_all():
mqtt.unsubscribe_all()
@mqtt.on_connect()
def handle_connect(client, userdata, flags, rc):
Review monitor app
r383 mqtt.subscribe(os.environ.get('PEDESTAL_TOPIC', 'JRO_topic'))
Add Proc & monitor app
r378
@mqtt.on_message()
def handle_mqtt_message(client, userdata, message):
Review monitor app
r383 global RUNNING
Add Proc & monitor app
r378 payload = message.payload.decode()
az, saz, el, sel, tm = getSpeedPosition(payload)
if RUNNING:
HDF.write({'az':az, 'el':el, 'saz':saz, 'sel':sel, 'timestamp':tm})
times = numpy.arange(tm, tm+1, 0.1, dtype=float)
data = dict(
az =az[::10].tolist(),
el =el[::10].tolist(),
saz =saz[::10].tolist(),
Typo in elevation speed
r388 sel =sel[::10].tolist(),
Add Proc & monitor app
r378 topic = message.topic,
payload = payload,
qos = message.qos,
Review monitor app
r383 timestamp = times.tolist(),
status = 'Running' if RUNNING else 'Not Running'
Add Proc & monitor app
r378 )
socketio.emit('mqtt_message', data=data)
@mqtt.on_log()
def handle_logging(client, userdata, level, buf):
# print(level, buf)
pass
if __name__ == '__main__':
socketio.run(app, host='0.0.0.0', port=5000, use_reloader=False, debug=True)