acq.py
76 lines
| 2.0 KiB
| text/x-python
|
PythonLexer
r438 | #!/usr/bin/env python | ||
import os, sys | |||
import signal | |||
import time | |||
import json | |||
import paho.mqtt.client as paho | |||
from watchdog.events import FileSystemEventHandler | |||
from watchdog.observers.polling import PollingObserver | |||
import platform | |||
# Runtime configuration. Every value except MQTTWATCHDIR can be overridden
# through an environment variable of the name passed to os.getenv().

# Broker host and TCP port handed to mqtt.connect().
MQTTHOST = os.getenv('BROKER_URL', '192.168.1.130')
PROCSITE = os.getenv('PROC_SITE', 'sophy-proc')
MQTTPORT = int(os.getenv('MQTTPORT', 1883))

# Optional broker credentials; None when the variable is not set.
MQTTUSERNAME = os.getenv('MQTTUSERNAME', None)
MQTTPASSWORD = os.getenv('MQTTPASSWORD', None)

MQTTWATCHDIR = '/data'
MQTTQOS = int(os.getenv('MQTTQOS', 0))

# Publish with retain (True or False): only the integer value 1 enables it.
MQTTRETAIN = int(os.getenv('MQTTRETAIN', 0)) == 1

# Topic every message is published to.
MQTTFIXEDTOPIC = os.getenv('MQTTFIXEDTOPIC', 'sophy/acq')

# Parsed as an integer like the other numeric settings. os.getenv() returns
# a string when the variable is set, so without the conversion WATCHDEBUG=0
# would yield the truthy string '0' instead of a false value.
WATCHDEBUG = int(os.getenv('WATCHDEBUG', 1))
# Name of the host operating system as reported by platform.system().
OS = platform.system()

# The client id embeds the process id of this interpreter.
clientid = f'sophy-acq-{os.getpid()}'
mqtt = paho.Client(clientid, clean_session=True)

# Install credentials as soon as at least one of the two is configured.
if not (MQTTUSERNAME is None and MQTTPASSWORD is None):
    mqtt.username_pw_set(MQTTUSERNAME, MQTTPASSWORD)
def on_publish(mosq, userdata, mid):
    """Publish callback; intentionally does nothing.

    All three arguments are accepted and ignored.
    """
    return None
def on_disconnect(mosq, userdata, rc):
    """Disconnect callback: report the disconnect, then pause for one second.

    Args:
        mosq: Unused.
        userdata: Unused.
        rc: Unused.
    """
    # Flush like every other print in this file so the message shows up
    # immediately even when stdout is buffered (not attached to a terminal).
    print("disconnected", flush=True)
    time.sleep(1)
def on_message(client, userdata, msg):
    """Message callback: print the payload and topic of a received message.

    Args:
        client: Unused.
        userdata: Unused.
        msg: Received message; its ``payload`` (bytes) and ``topic``
            attributes are read.
    """
    # Decode leniently: a payload that is not valid UTF-8 must not raise
    # inside the callback. Undecodable bytes are shown as U+FFFD instead.
    payload = msg.payload.decode(errors='replace')
    print(f"Received `{payload}` from `{msg.topic}` topic", flush=True)
def signal_handler(signal, frame):
    """Stop the MQTT network loop, disconnect, and exit with status 0.

    Installed as the SIGINT handler by main(). Both arguments are ignored.
    Note that the ``signal`` parameter shadows the ``signal`` module inside
    this function.
    """
    mqtt.loop_stop()
    mqtt.disconnect()
    # Same effect as sys.exit(0), which raises SystemExit itself.
    raise SystemExit(0)
def main():
    """Connect to the broker and publish a placeholder message every second.

    Installs the MQTT callbacks, connects to MQTTHOST:MQTTPORT, starts the
    background network loop, registers signal_handler for SIGINT and then
    publishes a JSON document with empty 'file' and 'size' fields to
    MQTTFIXEDTOPIC once per second. The loop never returns on its own.
    """
    print('Sophy acquisition monitor will publish messages to {}'.format(MQTTFIXEDTOPIC), flush=True)

    def on_connect(client, userdata, flags, rc, properties=None):
        # Subscribe on every successful connection rather than once after
        # connect(): the client is created with clean_session=True, so the
        # broker drops the subscription whenever the connection is lost and
        # it has to be renewed after each reconnect.
        if rc == 0:
            client.subscribe('sophy/control')

    # Install every callback before connecting so no event can be missed.
    mqtt.on_connect = on_connect
    mqtt.on_disconnect = on_disconnect
    mqtt.on_publish = on_publish
    mqtt.on_message = on_message

    mqtt.connect(MQTTHOST, MQTTPORT)
    mqtt.loop_start()

    signal.signal(signal.SIGINT, signal_handler)

    # The payload never changes, so it is serialized once outside the loop.
    payload = json.dumps({'file': '', 'size': ''})
    while True:
        mqtt.publish(MQTTFIXEDTOPIC, payload, qos=MQTTQOS, retain=MQTTRETAIN)
        time.sleep(1)
# Start the monitor only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == '__main__':
    main()