"""Main management module for the program embedded on the Raspberry Pi.

At import time this module:

* launches the memory-check helper script as a background process,
* loads the device configuration from ``/others/vars.json``,
* derives the MQTT credentials and the command/config topic names.

It also defines the general log helper and the MQTT message callback.
"""
import os
import posixpath
import traceback
import gc
import json
import serial
import shutil
import psutil
import subprocess
import paho.mqtt.client as mqtt
import paho.mqtt.publish as publish
#from others import *
import time
from time import sleep
from datetime import datetime

# Launch the memory-check service as a detached background process.
# Its output is discarded.
path_script = "/tools/services/check_memory.py"
DEVNULL = open(os.devnull, "wb")
service_1 = subprocess.Popen(["python", path_script], stdout=DEVNULL, stderr=DEVNULL)


class MyErrorForManage(Exception):
    """Exception used by this module to interrupt the MQTT handling flow.

    The message is also kept in ``self.mensaje``.
    """

    def __init__(self, mensaje):
        super().__init__(mensaje)
        self.mensaje = mensaje


class VarsJons(object):
    """In-memory view of the device configuration stored in a JSON file.

    The file is read once on construction; ``save_json`` writes the
    sections held as attributes back to the same path.
    """

    id = None
    location = None
    data = None
    debug = False
    type_weights = None
    store_data = False
    latitude = None
    longitude = None
    vars_mqtt = None
    vars_gpio = None
    vars = None
    weights = None

    def __init__(self):
        self.path_file = '/others/vars.json'
        self.load_auth_data()

    def send_to_dashboard(self):
        """Publish the current configuration to the dashboard."""
        self.send_data_for_mqtt()

    def send_data_for_mqtt(self):
        """Send the device configuration to the dashboard topic over MQTT.

        Any failure (missing key, network error, ...) is written to the
        general log and swallowed, so this method never raises.
        """
        try:
            broker = self.vars_mqtt['mqtt_broker']
            topic = self.vars_mqtt['dashboard_topic']
            auth = {
                'username': self.vars_mqtt['mqtt_user'],
                'password': self.vars_mqtt['mqtt_pass'],
            }
            port = self.vars_mqtt['mqtt_port']
            qos = 2

            data = self.data
            gpio = data['gpio']
            inference = data['inference']
            settings = data['vars']
            mqtt_cfg = data['mqtt']
            payload = {
                'name': data['id_device'],
                'location': data['location'],
                'latitude': data['latitude'],
                'longitude': data['longitude'],
                'radar_HB100': gpio['radar_HB100'],
                'radar_HFS': gpio['radar_HFS'],
                'lidar': gpio['lidar'],
                'measure_voltage': gpio['ina'],
                'camera': gpio['camera'],
                'store_data': data['store_data'],
                'inference_mode': inference['inference_mode'],
                'level_reference': settings['MIN_HEIGHT_WATER_FOR_LIDAR'],
                'send_data_mqtt': settings['TIME_SEND_MQTT'],
                'send_photo_mqtt': settings['SAMPLING_TIME_CAMERA'],
                'time_video_sampling': inference['time_video_sampling'],
                'river_width': data['river_width'],
                'mqtt_broker': mqtt_cfg['mqtt_broker'],
                'mqtt_port': mqtt_cfg['mqtt_port'],
                'mqtt_user': mqtt_cfg['mqtt_user'],
                'mqtt_pass': mqtt_cfg['mqtt_pass'],
                'server_inference_ML': inference['server_inference_ML'],
                'port_inference_ML': inference['port_inference_ML'],
                'config_topic': mqtt_cfg['config_topic'],
                'type_weights': data['type_weights'],
            }
            # TODO: review how the dashboard receives 'inference_mode'.
            publish.single(
                topic=topic,
                auth=auth,
                port=port,
                qos=qos,
                payload=json.dumps(payload),
                hostname=broker,
            )
        except Exception:
            # The credentials are deliberately NOT logged: they may not even
            # be bound yet at this point, and the log file is plain text.
            write_general(
                "Ocurrió un error al enviar datos MQTT al dashboard. "
                f"Error: {traceback.format_exc()}"
            )
        return

    def load_auth_data(self):
        """Read the JSON configuration file and expose its sections.

        Raises:
            FileNotFoundError: if ``self.path_file`` does not exist.
            KeyError: if a required top-level key is missing from the file
                (``router`` is the only optional one).
        """
        try:
            with open(self.path_file, 'r') as file:
                self.data = json.load(file)
        except FileNotFoundError as err:
            raise FileNotFoundError(
                f"Archivo {self.path_file} no encontrado."
            ) from err

        data = self.data
        self.vars = data["vars"]
        self.vars_mqtt = data['mqtt']
        self.vars_gpio = data['gpio']
        self.vars_inference = data['inference']
        self.debug = data['debug']
        self.latitude = data["latitude"]
        self.longitude = data["longitude"]
        self.weights = data['weights']
        self.id = data['id_device']
        # 'router' is optional: both attributes are None when it is absent.
        self.vars_router = data.get("router", None)
        self.router = self.vars_router
        self.location = data["location"]
        self.type_weights = data['type_weights']
        self.store_data = data['store_data']
        self.river_width = data['river_width']
        self.camera = data['camera']

    def save_json(self):
        """Write the configuration sections back to ``self.path_file``.

        Failures are written to the general log instead of being raised.
        """
        try:
            self.data["vars"] = self.vars
            self.data['mqtt'] = self.vars_mqtt
            self.data["gpio"] = self.vars_gpio
            self.data['inference'] = self.vars_inference
            self.data["weights"] = self.weights
            self.data["id_device"] = self.id
            self.data['camera'] = self.camera
            if self.router is not None:
                self.data['router'] = self.router
            # Serialise first so a serialisation error cannot leave a
            # truncated file behind.
            serialized = json.dumps(self.data, indent=7)
            with open(self.path_file, 'w') as file:
                file.write(serialized)
        except Exception:
            write_general(
                f"Error al guardar {self.path_file}. {traceback.format_exc()}"
            )


obj_vars = VarsJons()

# ------------------------- extract main variables ------------------------- #
ID_DEVIDE = str(obj_vars.id)
CLIENT_MQTT = ID_DEVIDE
LIST_KEYS_MQTT = list(obj_vars.vars_mqtt.keys())
LIST_KEYS_VARS = obj_vars.vars.keys()
LIST_KEYS_GPIO = obj_vars.vars_gpio.keys()
LIST_KEYS_INFERENCE = obj_vars.vars_inference.keys()
# Top-level keys that hold nested sections; everything else is "general".
_NESTED_SECTIONS = ('router', 'camera', 'mqtt', 'vars', 'inference', 'gpio', 'weights')
LIST_KEYS_GENERAL = [key for key in obj_vars.data if key not in _NESTED_SECTIONS]

CONFIG_TOPIC = obj_vars.vars_mqtt['config_topic']
COMMAND_TOPIC = obj_vars.vars_mqtt['command_topic']
mqtt_user = obj_vars.vars_mqtt['mqtt_user']
mqtt_pass = obj_vars.vars_mqtt['mqtt_pass']
mqtt_broker = obj_vars.vars_mqtt['mqtt_broker']
mqtt_port = obj_vars.vars_mqtt['mqtt_port']

# MQTT topic levels are always separated by '/', so join them with posixpath
# rather than the platform-dependent os.path.
CONFIG_TOPIC_PUBLIC = posixpath.join(CONFIG_TOPIC, "global")
CONFIG_TOPIC_USER = posixpath.join(CONFIG_TOPIC, ID_DEVIDE)
COMMAND_TOPIC_PUBLIC = posixpath.join(COMMAND_TOPIC, "global")
COMMAND_TOPIC_USER = posixpath.join(COMMAND_TOPIC, ID_DEVIDE)
# -------------------------------------------------------------------------- #


def write_general(chain):
    """Append a timestamped line to ``/logs/log_manage.txt``.

    Logging is best-effort: if the directory cannot be created or the file
    cannot be written, the error is ignored so callers are never interrupted.
    """
    stamp = datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    filename = '/logs/log_manage.txt'
    try:
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        with open(filename, 'a') as file:
            file.write(f"{stamp} |{chain}\n")
    except (OSError, ValueError):
        # ValueError covers UnicodeEncodeError under a non-UTF-8 locale.
        pass
    return


def send_check(topic):
    """Publish the acknowledgement ``received_200`` on *topic*.

    Uses a short-lived client with the module credentials and callbacks.
    """
    client = mqtt.Client(CLIENT_MQTT + "_check")
    client.on_connect = on_connect
    client.on_message = on_message
    client.on_disconnect = on_disconnect
    client.username_pw_set(mqtt_user, mqtt_pass)
    client.connect(mqtt_broker, mqtt_port, 30)
    client.publish(topic, "received_200")
    client.disconnect()


def on_message(client, userdata, msg):
    """MQTT message callback: dispatch commands and configuration updates.

    Messages on a command topic are passed as text to ``process_command``;
    messages on a config topic are parsed as JSON and passed to
    ``process_config``.

    Raises:
        RuntimeError: re-raised (with a fixed message) when a handler
            raises ``RuntimeError``.
        MyErrorForManage: re-raised (with a fixed message) when a handler
            raises ``MyErrorForManage``.

    Any other exception is logged and swallowed.
    """
    # Decode leniently so a malformed payload cannot break the log line.
    text = msg.payload.decode("utf-8", errors="replace")
    write_general(f"Mensaje recibido: {text} del topico: {msg.topic}")
    try:
        if msg.topic in (COMMAND_TOPIC_PUBLIC, COMMAND_TOPIC_USER):
            process_command(text)
        if msg.topic in (CONFIG_TOPIC_PUBLIC, CONFIG_TOPIC_USER):
            # The configuration payload is expected to be a JSON document.
            process_config(json.loads(msg.payload.decode("utf-8")))
    except RuntimeError as e:
        raise RuntimeError("Reiniciando el sistema....") from e
    except MyErrorForManage as e:
        raise MyErrorForManage("Error generado.") from e
    except Exception:
        write_general(
            f"Error al procesar el mensaje del topico {msg.topic}. "
            f"{traceback.format_exc()}"
        )
def boot_VPN():
    """Ask Docker to restart the ``vpn_client`` container.

    The command is launched in the background and not waited for; a launch
    failure is written to the general log.
    """
    try:
        # No pipes: nothing ever reads them, so they would only leak.
        subprocess.Popen(
            ["docker", "restart", "vpn_client"],
            stdin=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except Exception:
        write_general(f"Error al reiniciar la VPN. {traceback.format_exc()}")


def boot_RPI():
    """Run ``/others/update_rpi.sh`` and log its output and exit code.

    The script is made executable first; a chmod failure is logged and the
    script is still attempted. Blocks until the script finishes.
    """
    update_rpi = "/others/update_rpi.sh"
    try:
        os.chmod(update_rpi, 0o755)
    except Exception as e:
        message = f"Hubo un error al actualizar desde el repositorio. Error {e}"
        write_general(message)

    try:
        script = subprocess.Popen(
            [update_rpi],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        # communicate() waits for the process while draining both pipes,
        # avoiding a busy poll loop and a deadlock on a full pipe.
        stdout, stderr = script.communicate()
        return_code = script.returncode
        chain = f"Se llamó al update_rpi.sh para actualización RPI. Con salida stdout: {stdout} / stderr {stderr}. Codigo de finalización:{return_code}"
        write_general(chain)
    except Exception:
        write_general(f"Error al llamar comando update. {traceback.format_exc()}")


def boot_ESP32():
    """Request an OTA update from the ESP32 over the serial port.

    Sends ``UPDATE_OTA`` on ``/dev/ttyS0`` at 9600 baud. The original note
    says flashing over the UART pins was planned but those pins are already
    in use, so the web OTA method is used instead.
    Failures are logged and the function returns without raising.
    """
    try:
        ser = serial.Serial('/dev/ttyS0', 9600)
    except Exception:
        write_general(f"Error al abrir el puerto serial de la ESP32. {traceback.format_exc()}")
        return

    try:
        try:
            command = 'UPDATE_OTA' + '\n'
            ser.write(command.encode('utf-8'))
        finally:
            # Always release the port, even when the write fails.
            ser.close()
    except Exception:
        write_general(f"Error al solicitar actualización OTA a la ESP32. {traceback.format_exc()}")
        return

    message = "Se solicitó actualización de la ESP32. Actualizando mediante OTA"
    write_general(message)


def reboot_container():
    """Log the restart request and raise ``MyErrorForManage``.

    Raises:
        MyErrorForManage: always.
    """
    message = "Se procede a reiniciar el programa."
    write_general(message)
    raise MyErrorForManage("Reiniciando el sistema")


def poweroff_camera():
    """Placeholder: currently does nothing."""
    return


def search_to_replace(key, value):
    """Store *value* under *key* in the configuration section that owns it.

    Sections are checked in order: general, gpio, mqtt, vars, inference.
    An unknown key is only logged.
    """
    sections = (
        (LIST_KEYS_GENERAL, obj_vars.data),
        (LIST_KEYS_GPIO, obj_vars.vars_gpio),
        (LIST_KEYS_MQTT, obj_vars.vars_mqtt),
        (LIST_KEYS_VARS, obj_vars.vars),
        (LIST_KEYS_INFERENCE, obj_vars.vars_inference),
    )
    for known_keys, target in sections:
        if key in known_keys:
            target[key] = value
            return
    message = f"La clave {key} con valor {value} no hace una correspondencia. "
    write_general(message)


def reset_LIDAR():
    """Delete ``/others/h0.txt`` and, if that succeeded, request a restart.

    Raises:
        MyErrorForManage: from ``reboot_container`` when the file was removed.
    """
    try:
        os.remove("/others/h0.txt")
    except OSError as e:
        write_general(f"No se pudo eliminar /others/h0.txt. Error {e}")
    else:
        reboot_container()


def check_changes():
    """Placeholder: currently does nothing."""
    return


def process_command(payload):
    """Execute the action named in the command text *payload*.

    Raises:
        MyErrorForManage: from ``reboot_container`` for the commands that
            end with a restart.
    """
    write_general(f"Se recibió el comando: {payload}")
    if 'Reiniciar sistema' in payload:
        # Restart the system / container.
        reboot_container()
    elif 'Apagar camara' in payload:
        # Drive the camera power-off pin (not implemented yet).
        poweroff_camera()
    elif "Actualizar firmware Raspberry Pi" in payload:
        boot_RPI()
        check_changes()
        reboot_container()
    elif "restart VPN" in payload:
        boot_VPN()
    elif "reset_LIDAR" in payload:
        reset_LIDAR()
        reboot_container()
    return


# Reconnection policy used by on_disconnect.
_RECONNECT_DELAY_S = 5
_MAX_RECONNECT_ATTEMPTS = 4 * 60 * 60


def _write_status(chain):
    """Append a timestamped line to ``/logs/log.txt`` (best-effort)."""
    stamp = datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    filename = '/logs/log.txt'
    try:
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        with open(filename, 'a') as file:
            file.write(f"{stamp} {chain}\n")
    except (OSError, ValueError):
        # Logging must never interrupt the caller.
        pass


def _subscribe_all(client):
    """Subscribe *client* to the command and config topics."""
    for topic in (
        COMMAND_TOPIC_USER,
        COMMAND_TOPIC_PUBLIC,
        CONFIG_TOPIC_PUBLIC,
        CONFIG_TOPIC_USER,
    ):
        client.subscribe(topic)


def on_disconnect(client, userdata, rc):
    """MQTT disconnect callback: log and, on unexpected loss, reconnect.

    When *rc* is non-zero, ``client.reconnect()`` is retried every
    ``_RECONNECT_DELAY_S`` seconds. On success the topics are re-subscribed.

    Raises:
        MyErrorForManage: after ``_MAX_RECONNECT_ATTEMPTS`` failed attempts.
    """
    _write_status("[MANAGE] Se ha desconectado el MQTT, recuperando conexión.")
    if rc == 0:
        return

    attempts = 0
    while True:
        try:
            client.reconnect()
        except Exception:
            attempts += 1
            error = traceback.format_exc()
            _write_status(f"[Error][Manage] Error al reconectar MQTT broker. Intento {attempts}. Copia del error: {error}")
            time.sleep(_RECONNECT_DELAY_S)
            if attempts >= _MAX_RECONNECT_ATTEMPTS:
                _write_status("[Error] Se alcanzó la máxima cantidad de intento de conexiones. No se puede recibir comandos. Reiniciando el sistema.")
                raise MyErrorForManage("[Error] Se alcanzó la máxima cantidad de intento de conexiones. No se puede recibir comandos. Reiniciando el sistema.")
        else:
            _subscribe_all(client)
            return


def process_config(payload):
    """Apply a configuration mapping, save it and request a restart.

    Each ``key: value`` pair is routed through ``search_to_replace``. A
    payload without ``items()`` is logged and ignored.

    Raises:
        MyErrorForManage: from ``reboot_container`` after a successful save.
    """
    try:
        entries = list(payload.items())
    except AttributeError:
        # Not a dictionary-like payload.
        write_general("La configuración recibida no es un diccionario.")
        return

    try:
        for key, value in entries:
            search_to_replace(key, value)
    except Exception:
        # Do not save a half-applied configuration.
        write_general(f"Error al reemplazar la configuración. {traceback.format_exc()}")
        return

    obj_vars.save_json()
    reboot_container()
    return


def on_connect(client, userdata, flags, rc):
    """MQTT connect callback: print the result and subscribe to all topics."""
    print("Connected with result code " + str(rc))
    print("UserData= " + str(userdata))
    print("flags= " + str(flags))
    print("")
    _subscribe_all(client)


if __name__ == '__main__':
    # Start the main program as a child process with its output discarded.
    path_script = "/tools/main.py"
    main_proceso = subprocess.Popen(["python", path_script], stdout=DEVNULL, stderr=DEVNULL)
    obj_vars.send_to_dashboard()
    while True:
        try:
            client = mqtt.Client(CLIENT_MQTT + "_MASTER", clean_session=True)
            client.on_connect = on_connect
            client.on_message = on_message
            client.on_disconnect = on_disconnect
            client.username_pw_set(mqtt_user, mqtt_pass)
            client.connect(mqtt_broker, mqtt_port, 30)
            client.loop_forever()
        except MyErrorForManage as e:
            write_general(f"Interrumpido por comando: {e}")
            break
        except Exception as e:
            print("Proceso MQTT finalizado!!!!")
            write_general(f"Proceso finalizado por {e}")
            # Back off before retrying so an unreachable broker does not
            # cause a tight loop that floods the log file.
            time.sleep(_RECONNECT_DELAY_S)