##// END OF EJS Templates
1)Update schain contenainer - 2)Add acq template - 3)Add AZ OFFSET pedestal parameter - 4)Add scheduler (sudo crontab) - 5)Add sirm-job container
1)Update schain contenainer - 2)Add acq template - 3)Add AZ OFFSET pedestal parameter - 4)Add scheduler (sudo crontab) - 5)Add sirm-job container

File last commit:

r438:4bd0c7e20a9b
r438:4bd0c7e20a9b
Show More
app.py
388 lines | 12.2 KiB | text/x-python | PythonLexer
Add Proc & monitor app
r378 """
A small Test application to show how to use Flask-MQTT.
"""
import os
import time
import logging
from datetime import datetime
import random
import eventlet
import json
Fix power monitor
r394 from json import JSONEncoder
Add Proc & monitor app
r378 import numpy
import base64
import h5py
Add power monitor
r393 try:
import requests
except:
pass
from bs4 import BeautifulSoup
Fix saving data position
r392 from flask import Flask, render_template, jsonify, request, redirect
Add Proc & monitor app
r378 from flask_mqtt import Mqtt
from flask_socketio import SocketIO
from flask_bootstrap import Bootstrap
# Patch the stdlib for cooperative (green-thread) I/O before anything else
# creates sockets/threads.
eventlet.monkey_patch()

app = Flask(__name__)
# MQTT broker address comes from the environment (fails fast if unset).
app.config.update(
    SECRET='my secret key',
    TEMPLATES_AUTO_RELOAD=True,
    MQTT_BROKER_URL=os.environ['BROKER_URL'],
    MQTT_BROKER_PORT=1883,
    # Randomized suffix so several instances don't collide on client id.
    MQTT_CLIENT_ID='flask_mqtt_{}'.format(int(random.random()*100)),
    MQTT_CLEAN_SESSION=True,
    MQTT_USERNAME='',
    MQTT_PASSWORD='',
    MQTT_KEEPALIVE=5,
    MQTT_TLS_ENABLED=False,
    MQTT_LAST_WILL_TOPIC='sophy/error',
    MQTT_LAST_WILL_MESSAGE='Bye from flask',
    MQTT_LAST_WILL_QOS=1,
)

mqtt = Mqtt(app)
socketio = SocketIO(app)
bootstrap = Bootstrap(app)

# --- Pedestal serial-protocol constants ---
MODE_STOP = 0x00
MODE_SPEED = 0x01
MODE_POSITION = 0x02
RX_AZIMUTH = 0x27
RX_ELEVATION = 0x47
TX_AZIMUTH = 0x25
TX_ELEVATION = 0x45
NONE_AXIS = 0x65
RX_FUNCTION = 0x30
TX_FUNCTION = 0x40
HEADER = 0x7e

# --- Speed / angle scaling limits ---
MIN_SPEED = -180.0
MAX_SPEED = 180.0
SHRT_MIN = -32768
SHRT_MAX = 32768
USHRT_MAX = 65535

# --- Mutable module-level run state ---
DATA_PATH = '/data'
RUNNING = False
EXPERIMENT = {'name': 'test'}
DEBUG = False
class NumpyArrayEncoder(JSONEncoder):
    """JSON encoder that serializes numpy values.

    Arrays become nested Python lists; numpy scalars (e.g. numpy.int32,
    numpy.float32) become their native Python equivalents.  Anything else
    falls through to the default encoder (raising TypeError as usual).
    """

    def default(self, obj):
        if isinstance(obj, numpy.ndarray):
            return obj.tolist()
        # Generalization: numpy scalar types are not JSON-serializable
        # either (numpy.int64 is NOT an int subclass); unwrap them.
        if isinstance(obj, numpy.generic):
            return obj.item()
        return super().default(obj)
Add Proc & monitor app
r378
class HDF5File():
    """Buffers pedestal position/speed samples and persists them to disk.

    Every incoming 1-second frame is dumped raw as a JSON .txt file, and
    down-sampled (25 samples/s) position data is accumulated until the
    minute rolls over, at which point it is flushed into a pos@*.h5 file
    under DATA_PATH/<experiment>/position/<hour-folder>/.
    """

    def __init__(self):
        self.first = True                 # no frame received yet
        self.data_pos = {'az': [], 'el': [], 'saz': [], 'sel': [], 'timestamp': []}
        self.pos_ready = True             # True -> a new .h5 file must be opened
        self.pos_time = 0                 # timestamp used to name the .h5 file
        self.raw = {}                     # last raw frame, dumped to .txt
        self.timestamp = 0                # timestamp of the last processed frame
        self.metadata = None

    def reset(self):
        """Clear buffered data between experiments.

        NOTE: pos_ready and metadata are intentionally left untouched,
        matching the original behavior.
        """
        self.first = True
        self.data_pos = {'az': [], 'el': [], 'saz': [], 'sel': [], 'timestamp': []}
        self.pos_time = 0
        self.raw = {}
        self.timestamp = 0

    def update(self, data):
        """Down-sample one frame (take every 4th of the first 99 samples,
        i.e. 25 values) into the accumulation buffers and record timestamps."""
        for var in ('az', 'el', 'saz', 'sel'):
            tmp = numpy.array(data[var][:99])[::4].tolist()
            # Pad short frames with NaN so every second contributes 25 samples.
            while len(tmp) < 25:
                tmp.append(numpy.nan)
            self.data_pos[var].extend(tmp)

        # 25 evenly spaced stamps covering the PREVIOUS second (the data is
        # one second behind the frame's timestamp).
        ts = int(data['timestamp'])
        self.data_pos['timestamp'].extend(numpy.linspace(ts - 1, ts - 0.04, 25).tolist())
        self.timestamp = ts

    def write(self, data):
        """Persist one frame: raw JSON dump every call, .h5 flush per minute.

        Frames with non-increasing timestamps are rejected after the raw
        dump (so the very first frame only primes the state).
        """
        if self.first:
            self.pos_time = int(data['timestamp'])
            self.timestamp = int(data['timestamp'])
            self.first = False
        self.raw = data

        # Raw dump of the frame (was under a dead `if True:` — removed).
        date_folder = datetime.fromtimestamp(self.timestamp).strftime('%Y-%m-%dT%H-00-00')
        filelog = os.path.join(DATA_PATH, EXPERIMENT['name'], 'position', date_folder,
                               "pos@%10.3f.txt" % (self.timestamp))
        # exist_ok avoids the exists()/makedirs() race of the original.
        os.makedirs(os.path.dirname(filelog), exist_ok=True)
        with open(filelog, 'w') as f:
            f.write(json.dumps(self.raw, cls=NumpyArrayEncoder))

        if int(data['timestamp']) <= self.timestamp:
            print('Bad time')
            return

        self.update(data)

        if self.pos_ready:
            filex = "pos@%10.3f.h5" % (self.pos_time)
            date_folder = datetime.fromtimestamp(self.pos_time).strftime('%Y-%m-%dT%H-00-00')
            filename = os.path.join(DATA_PATH, EXPERIMENT['name'], 'position', date_folder, filex)
            os.makedirs(os.path.dirname(filename), exist_ok=True)
            self.fpos = h5py.File(filename, 'w')
            self.pos_ready = False

        # Flush accumulated data at the start of each minute.
        if datetime.fromtimestamp(self.timestamp).second == 0:
            self.pos_time = self.timestamp
            grp = self.fpos.create_group("Data")
            grp.create_dataset("azi_pos", data=numpy.array(self.data_pos['az']))
            grp.create_dataset("ele_pos", data=numpy.array(self.data_pos['el']))
            grp.create_dataset("azi_speed", data=numpy.array(self.data_pos['saz']))
            grp.create_dataset("ele_speed", data=numpy.array(self.data_pos['sel']))
            grp.create_dataset("utc", data=numpy.array(self.data_pos['timestamp']))
            self.fpos.close()
            for key in self.data_pos:
                self.data_pos[key] = []
            self.pos_ready = True
1)Update schain contenainer - 2)Add acq template - 3)Add AZ OFFSET pedestal parameter - 4)Add scheduler (sudo crontab) - 5)Add sirm-job container
r438
Add Proc & monitor app
r378
Fix power monitor
r394
Add Proc & monitor app
# Module-level singleton: the MQTT message handler funnels decoded pedestal
# frames into it (HDF.write) while an experiment is running.
HDF = HDF5File()
def getSpeedPosition(msg_b64):
    """Decode a base64 pedestal frame stream into position/speed arrays.

    The decoded payload is a sequence of HEADER(0x7e)-delimited packets,
    followed by a trailing little-endian 32-bit timestamp (last 4 bytes).
    Each RX packet carries a 16-bit angle and a 16-bit speed; both are
    byte-stuffed: an escape byte HEADER-1 (0x7d) means the real value is
    0x70 | (next_byte & 0x0F).

    Returns (az, saz, el, sel, timestamp): azimuth/elevation positions in
    degrees and speeds in deg/s, each rounded to 2 decimals, plus the int
    timestamp.
    """
    AzPosition = []
    AzSpeed = []
    ElPosition = []
    ElSpeed = []
    raw = numpy.frombuffer(base64.decodebytes(msg_b64.encode()), numpy.dtype('B'))
    raw_size = len(raw)
    # Last 4 bytes: little-endian UNIX timestamp.
    Timestamp = (raw[raw_size-4])|(raw[raw_size-3]<<8)|(raw[raw_size-2]<<16)|(raw[raw_size-1]<<24)

    counter = 0
    hadstuffing = 0

    def _read_destuffed(offset):
        # Read one payload byte at counter+hadstuffing+offset, resolving the
        # HEADER-1 escape (which consumes an extra byte).  Returns None when
        # the packet is truncated.  Replaces four copy-pasted blocks of the
        # original; behavior (including bounds checks) is identical.
        nonlocal hadstuffing
        idx = counter + hadstuffing + offset
        if idx >= raw_size - 4:
            return None
        if raw[idx] == HEADER - 1:
            hadstuffing += 1
            return int(0x70 | (raw[idx + 1] & 0x0F))
        return int(raw[idx])

    while counter < raw_size - 4:
        # Scan byte-by-byte for a packet start: HEADER, RX function code,
        # then an azimuth or elevation axis id.
        if (raw[counter] != HEADER or raw[counter+1] != RX_FUNCTION or
                (raw[counter+2] != RX_AZIMUTH and raw[counter+2] != RX_ELEVATION)):
            counter += 1
            continue

        hadstuffing = 0
        vals = []
        truncated = False
        # Payload bytes at offsets 4..7: angle lo, angle hi, speed lo, speed hi.
        for offset in (4, 5, 6, 7):
            b = _read_destuffed(offset)
            if b is None:
                logging.debug("Warning: Index out of bounds. The packet is incomplete or corrupted.")
                truncated = True
                break
            vals.append(b)
        if truncated:
            break

        iangle = vals[0] + (vals[1] << 8)
        ispeed = vals[2] + (vals[3] << 8)

        # Scale raw counts to degrees / deg-per-second.
        angle_deg = iangle * 360.0 / USHRT_MAX
        speed_deg = ((ispeed - SHRT_MIN) * (MAX_SPEED - MIN_SPEED) / (SHRT_MAX - SHRT_MIN)) + MIN_SPEED
        # (The original re-checked counter+2 bounds here; that check is
        # provably redundant given the offset-4 check above.)
        if raw[counter+2] == RX_AZIMUTH:
            AzPosition.append(angle_deg)
            AzSpeed.append(speed_deg)
        else:
            ElPosition.append(angle_deg)
            ElSpeed.append(speed_deg)

        counter = counter + hadstuffing + 13

    # Wrap near-360 azimuth to 0 and round.
    az = numpy.array(AzPosition)
    az[numpy.where(az >= 359.8)] = 0
    az = numpy.round(az, 2)

    # Speeds >= 28 deg/s are negative values wrapped around 360.
    saz = numpy.array(AzSpeed)
    saz[numpy.where(saz >= 28)] = saz[(saz >= 28)] - 360
    saz = numpy.round(saz, 2)

    # Elevation in [190, 360] is a negative angle wrapped around 360.
    el = numpy.array(ElPosition)
    el[numpy.where((190 <= el) & (el <= 360))] = el[(190 <= el) & (el <= 360)] - 360
    el = numpy.round(el, 2)

    sel = numpy.array(ElSpeed)
    sel[numpy.where(sel >= 28)] = sel[(sel >= 28)] - 360
    sel = numpy.round(sel, 2)

    return az, saz, el, sel, int(Timestamp)
Add power monitor
r393
Add Proc & monitor app
r378 @app.route('/')
def index():
Add new container for monitor RX, add mqtt log view
r411 global RUNNING, EXPERIMENT
running = 'Running' if RUNNING else 'Not Running'
Proc and monitor working ok
r418 name = '-' if EXPERIMENT['name']=='test' else EXPERIMENT['name']
Add new container for monitor RX, add mqtt log view
r411 return render_template('index.html', status=running, experiment_name=name)
Add Proc & monitor app
r378
Update experiment start / stop
@app.route('/start', methods=['POST'])
def start_proc():
    """Start an experiment from a POSTed JSON config.

    Persists the configuration under DATA_PATH/<name>/experiment.conf,
    notifies MQTT and websocket clients, and asks the sirm-schain
    container to start processing.  Returns {'start': 'ok'}.
    """
    global EXPERIMENT, RUNNING
    EXPERIMENT.update(request.get_json())
    RUNNING = True
    path = os.path.join(DATA_PATH, EXPERIMENT['name'])
    # exist_ok avoids the exists()/makedirs() race of the original.
    os.makedirs(path, exist_ok=True)
    # with-block guarantees the config file is closed even on write errors.
    with open(os.path.join(path, 'experiment.conf'), 'w') as fo:
        fo.write(json.dumps(EXPERIMENT, indent=2))
    mqtt.publish('sophy/control', json.dumps(EXPERIMENT))
    socketio.emit('mqtt_message', data={'topic': 'monitor', 'status': 'Running', 'name': EXPERIMENT['name']})
    try:
        r = requests.post('http://sirm-schain/start', data=EXPERIMENT)
        print('Starting schain: {}'.format(r))
    except NameError:
        # The `requests` import at the top of the file is wrapped in
        # try/except; calling it unguarded would raise NameError here.
        print('requests not available, schain not started')

    return jsonify({'start': 'ok'})
@app.route('/stop')
def stop_proc():
    """Stop the running experiment.

    Resets monitor state, notifies MQTT/websocket clients and asks the
    sirm-schain container to stop.  Returns {'stop': 'ok'}.
    """
    global RUNNING, DEBUG, HDF, EXPERIMENT

    RUNNING = False
    DEBUG = False
    HDF.reset()
    socketio.emit('mqtt_message', data={'topic': 'monitor', 'status': 'Not Running'})
    mqtt.publish('sophy/control', json.dumps({'stop': 'ok'}))
    try:
        r = requests.get('http://sirm-schain/stop')
        print('Stopping schain: {}'.format(r))
    except NameError:
        # `requests` may be missing (its import is try/except'ed at the top).
        print('requests not available, schain not stopped')

    return jsonify({'stop': 'ok'})
Review monitor app
@app.route('/status')
def status_proc():
    """Report whether an experiment is currently running."""
    # Reading a module global needs no `global` declaration.
    return jsonify({'status': RUNNING})
Fix saving data position
@app.route('/run')
def run_proc():
    """Start an experiment from GET query arguments, then redirect to '/'.

    Query args: name (experiment name, default 'TEST') and debug (any
    non-empty value enables debug mode — the raw query string is truthy;
    kept as-is for backward compatibility).
    """
    global RUNNING, EXPERIMENT, DEBUG
    if request.args.get('debug', False):
        DEBUG = True
    name = request.args.get('name', 'TEST')
    path = os.path.join(DATA_PATH, name)
    # exist_ok avoids the exists()/makedirs() race of the original.
    os.makedirs(path, exist_ok=True)
    EXPERIMENT['name'] = name
    RUNNING = True
    mqtt.publish('sophy/control', json.dumps(EXPERIMENT))
    socketio.emit('mqtt_message', data={'topic': 'monitor', 'status': 'Running'})

    return redirect('/')
Add Proc & monitor app
r378
Add new container for monitor RX, add mqtt log view
r411 @app.route('/mqtt')
def mqtt_log():
return render_template('mqtt.html')
Update 'gps.py' and add acq container
r431
@app.route('/acq')
def acq_log():
    """Serve the acquisition log viewer page."""
    return render_template('acq.html')
Add new container for monitor RX, add mqtt log view
r411
Add Proc & monitor app
@socketio.on('publish')
def handle_publish(json_str):
    """Relay a publish request from a websocket client to the MQTT broker.

    Expects a JSON string with 'topic', 'message' and 'qos' keys.
    """
    req = json.loads(json_str)
    mqtt.publish(req['topic'], req['message'], req['qos'])
@socketio.on('subscribe')
def handle_subscribe(json_str):
    """Subscribe the MQTT client to a topic requested over the websocket.

    Expects a JSON string with 'topic' and 'qos' keys.
    """
    req = json.loads(json_str)
    mqtt.subscribe(req['topic'], req['qos'])
@socketio.on('unsubscribe_all')
def handle_unsubscribe_all():
    """Drop every active MQTT subscription."""
    mqtt.unsubscribe_all()
@mqtt.on_connect()
def handle_connect(client, userdata, flags, rc):
    """On broker connect, subscribe to the topic tree (default 'sophy/#')."""
    topic = os.environ.get('SOPHY_TOPIC', 'sophy/#')
    mqtt.subscribe(topic)
Add Proc & monitor app
r378
@mqtt.on_message()
def handle_mqtt_message(client, userdata, message):
    """Forward every MQTT message to the web UI via Socket.IO.

    For pedestal topics, decode the frame, attach 10 decimated
    position/speed samples for live display and, while an experiment is
    running, persist the full-rate data through HDF.
    """
    global RUNNING, HDF

    payload = message.payload.decode()
    data = {}

    if 'pedestal' in message.topic:
        az, saz, el, sel, tm = getSpeedPosition(payload)
        # 10 evenly spaced stamps matching the 10 decimated samples below.
        times = numpy.linspace(tm, tm + 0.9, 10)
        for key, series in (('az', az), ('el', el), ('saz', saz), ('sel', sel)):
            data[key] = series[:100][::10].tolist()
        data['timestamp'] = times.tolist()

        if RUNNING:
            HDF.write({'az': az.tolist(), 'el': el.tolist(), 'saz': saz.tolist(),
                       'sel': sel.tolist(), 'timestamp': tm})

    data['payload'] = payload
    data['topic'] = message.topic
    data['qos'] = message.qos

    socketio.emit('mqtt_message', data=data)
Fix power monitor
r394
Add Proc & monitor app
@mqtt.on_log()
def handle_logging(client, userdata, level, buf):
    """Paho client log callback; intentionally silent."""
    pass
if __name__ == '__main__':
    # Serve through Socket.IO's server on all interfaces, port 5000.
    # NOTE(review): use_reloader=False presumably avoids a duplicate MQTT
    # client from the reloader's second process — confirm before changing.
    socketio.run(app, host='0.0.0.0', port=5000, use_reloader=False, debug=True)