##// END OF EJS Templates
Update version_rpi.txt
Update version_rpi.txt

File last commit:

r297:c9b442208466
r317:7eab564739e2 master
Show More
manage.py
615 lines | 16.0 KiB | text/x-python | PythonLexer
'''
Clase principal de manejo del programa embebido en la raspberry
'''
import os
import traceback
import gc
import json
import serial
import shutil
import psutil
import subprocess
import paho.mqtt.client as mqtt
import paho.mqtt.publish as publish
#from others import *
import time
from time import sleep
from datetime import datetime
# Inicializamos la rutina principal aquí mediante un comando
#------------------------- ------------------------------------------------------------#
#--------------------------------------------------------------------------------------#
path_script = "/tools/services/check_memory.py"
DEVNULL = open(os.devnull,"wb")
service_1 = subprocess.Popen(["python", path_script], stdout=DEVNULL, stderr=DEVNULL)
#--------------------------------------------------------------------------------------#
class MyErrorForManage(Exception):
def __init__(self, mensaje):
super().__init__(mensaje)
self.mensaje = mensaje
class VarsJons(object):
id = None
location = None
data = None
debug = False
type_weights = None
store_data = False
latitude = None
longitude = None
vars_mqtt = None
vars_gpio = None
vars = None
weights = None
def __init__(self):
self.path_file = '/others/vars.json'
self.load_auth_data()
def send_to_dashboard(self):
self.send_data_for_mqtt()
def send_data_for_mqtt(self):
'''
Se envia la configuración de datos al Dashboard, esto es para un mejor administración.
'''
try:
broker = self.vars_mqtt['mqtt_broker']
TOPIC = self.vars_mqtt['dashboard_topic']
auth = {'username':self.vars_mqtt['mqtt_user'],
'password':self.vars_mqtt['mqtt_pass']
}
port = self.vars_mqtt['mqtt_port']
qos = 2
payload = {
'name':self.data['id_device'],
'location': self.data['location'],
'latitude': self.data['latitude'],
'longitude': self.data['longitude'],
'radar_HB100': self.data['gpio']['radar_HB100'],
'radar_HFS':self.data['gpio']['radar_HFS'],
'lidar':self.data['gpio']['lidar'],
'measure_voltage':self.data['gpio']['ina'],
'camera':self.data['gpio']['camera'],
'store_data':self.data['store_data'],
'inference_mode':self.data["inference"]["inference_mode"],
'level_reference':self.data['vars']["MIN_HEIGHT_WATER_FOR_LIDAR"],
'send_data_mqtt':self.data['vars']["TIME_SEND_MQTT"],
'send_photo_mqtt':self.data['vars']["SAMPLING_TIME_CAMERA"],
'time_video_sampling':self.data['inference']['time_video_sampling'],
'river_width':self.data['river_width'],
'mqtt_broker':self.data['mqtt']["mqtt_broker"],
'mqtt_port':self.data['mqtt']["mqtt_port"],
'mqtt_user':self.data['mqtt']['mqtt_user'],
'mqtt_pass':self.data['mqtt']['mqtt_pass'],
'server_inference_ML':self.data["inference"]["server_inference_ML"],
'port_inference_ML':self.data['inference']["port_inference_ML"],
'inference_mode':self.data['inference']['inference_mode'],
'config_topic':self.data['mqtt']['config_topic'],
'type_weights':self.data['type_weights']
}
#Revisar el inference mode de como se recibe en el dashboard.
publish.single(topic=TOPIC,auth=auth,port=port,qos=qos,payload=json.dumps(payload),hostname=broker)
except:
write_general(f"Ocurrió un error al enviar datos MQTT al dashboard. Error: {traceback.format_exc()} {auth}")
return
def load_auth_data(self):
try:
with open(self.path_file, 'r') as file:
self.data = json.load(file)
except FileNotFoundError:
raise FileNotFoundError("Archivo auth.json no encontrado en el directorio.")
else:
self.vars = self.data["vars"]
self.vars_mqtt = self.data['mqtt']
self.vars_gpio = self.data['gpio']
self.vars_inference = self.data['inference']
self.debug = self.data['debug']
self.latitude = self.data["latitude"]
self.longitude = self.data["longitude"]
self.weights = self.data['weights']
self.id = self.data['id_device']
self.vars_router = self.data.get("router",None)
self.location = self.data["location"]
self.router = self.data['router']
self.type_weights = self.data['type_weights']
self.store_data = self.data['store_data']
self.river_width = self.data['river_width']
self.camera = self.data['camera']
def save_json(self):
try:
self.data["vars"] = self.vars
#self.data["location"] = self.location
self.data['mqtt'] = self.vars_mqtt
self.data["gpio"] = self.vars_gpio
self.data['inference'] = self.vars_inference
#self.data['debug'] = self.debug
self.data["weights"] = self.weights
self.data["id_device"] = self.id
#self.data["type_weights"] = self.type_weights
self.data['camera'] = self.camera
self.data['router'] = self.router
except:
pass
else:
try:
with open(self.path_file,'w') as file:
json.dump(self.data,file,indent=7)
except:
pass
obj_vars = VarsJons()
#-------------------------- extract main variables -----------------------#
ID_DEVIDE = str(obj_vars.id)
CLIENT_MQTT = ID_DEVIDE
LIST_KEYS_MQTT = list(obj_vars.vars_mqtt.keys())
LIST_KEYS_VARS = obj_vars.vars.keys()
LIST_KEYS_GPIO = obj_vars.vars_gpio.keys()
LIST_KEYS_GENERAL = list(obj_vars.data.keys())
LIST_KEYS_INFERENCE = obj_vars.vars_inference.keys()
for x in ['router','camera','mqtt','vars','inference','gpio','weights']:
LIST_KEYS_GENERAL.remove(x)
CONFIG_TOPIC = obj_vars.vars_mqtt['config_topic']
COMMAND_TOPIC = obj_vars.vars_mqtt['command_topic']
mqtt_user = obj_vars.vars_mqtt['mqtt_user']
mqtt_pass = obj_vars.vars_mqtt['mqtt_pass']
mqtt_broker = obj_vars.vars_mqtt['mqtt_broker']
mqtt_port = obj_vars.vars_mqtt['mqtt_port']
CONFIG_TOPIC_PUBLIC = os.path.join(CONFIG_TOPIC,"global")
CONFIG_TOPIC_USER = os.path.join(CONFIG_TOPIC,ID_DEVIDE)
COMMAND_TOPIC_PUBLIC = os.path.join(COMMAND_TOPIC,"global")
COMMAND_TOPIC_USER = os.path.join(COMMAND_TOPIC,ID_DEVIDE)
#-------------------------------------------------------------------------#
#-------------------------------------------------------------------------#
def write_general(chain):
now = datetime.now()
formatted_date_time = now.strftime("%d/%m/%Y %H:%M:%S")
filename = '/logs/log_manage.txt'
if not os.path.isdir(os.path.dirname(filename)):
os.makedirs(os.path.dirname(filename))
chain = formatted_date_time + " |" + chain
try:
with open(filename,'a') as file:
file.write(chain + '\n')
except:
pass
return
def send_check(topic):
client = mqtt.Client(CLIENT_MQTT + "_check")
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
client.username_pw_set(mqtt_user,mqtt_pass)
client.connect(mqtt_broker,mqtt_port,30)
client.publish(topic,"received_200")
client.disconnect()
def on_message(client, userdata, msg):
write_general(f"Mensaje recibido: {msg.payload.decode()} del topico: {msg.topic}")
try:
if msg.topic == COMMAND_TOPIC_PUBLIC or msg.topic == COMMAND_TOPIC_USER:
payload = msg.payload.decode()
process_command(payload)
#send_check(msg.topic)
if msg.topic == CONFIG_TOPIC_PUBLIC or msg.topic ==CONFIG_TOPIC_USER:
payload = msg.payload.decode("utf-8")
# Consideramos que es un json
payload = json.loads(payload)
process_config(payload)
#send_check(msg.topic)
except RuntimeError as e:
raise RuntimeError("Reiniciando el sistema....")
except MyErrorForManage as e:
raise MyErrorForManage("Error generado.")
except:
pass
#if msg.payload.decode() == "Hello world!"
def boot_VPN():
try:
script = subprocess.Popen(["docker", "restart", "vpn_client"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except:
pass
def boot_RPI():
'''
Por ahora usaremos github para actualizar los repositorios.
'''
#------------------------- Damos valores de acceso a los archivos de actualización ------------------#
try:
update_rpi = "/others/update_rpi.sh"
os.chmod(update_rpi,0o755)
except Exception as e:
message = f"Hubo un error al actualizar desde el repositorio. Error {e}"
write_general(message)
try:
script = subprocess.Popen(["/others/update_rpi.sh"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return_code = None
while 1:
return_code = script.poll()
if return_code is not None:
break
stdout, stderr = script.communicate()
chain = f"Se llamó al update_rpi.sh para actualización RPI. Con salida stdout: {stdout} / stderr {stderr}. Codigo de finalización:{return_code}"
write_general(chain)
except:
write_general(f"Error al llamar comando update. {traceback.format_exc()}")
def boot_ESP32():
'''
Formatea la esp32 mediante un repositorio publico.
Se planeaba flashear la ESP32 mediante pines UART, pero
estos ya están siendo usados. Asi que se usará el metodo OTA
por web.
'''
try:
try:
ser = serial.Serial('/dev/ttyS0', 9600)
except:
return
else:
try:
command = 'UPDATE_OTA' +'\n'
ser.write(command.encode('utf-8'))
ser.close()
except:
return
except:
return
else:
message = "Se solicitó actualización de la ESP32. Actualizando mediante OTA"
write_general(message)
def reboot_container():
'''
Reinicia el contenedor por defecto
- lahares -> tarea de lahares
- hamachi -> Servicio de vpn
'''
nombre_contenedor = "lahares"
process = list()
message = "Se procede a reiniciar el programa."
write_general(message)
raise MyErrorForManage("Reiniciando el sistema")
gc.collect()
def poweroff_camera():
'''
'''
return
def search_to_replace(key,value):
if key in LIST_KEYS_GENERAL :
obj_vars.data[key] = value
elif key in LIST_KEYS_GPIO:
obj_vars.vars_gpio[key] = value
elif key in LIST_KEYS_MQTT:
obj_vars.vars_mqtt[key] = value
elif key in LIST_KEYS_VARS:
obj_vars.vars[key] = value
elif key in LIST_KEYS_INFERENCE:
obj_vars.vars_inference[key] = value
else:
message = f"La clave {key} con valor {value} no hace una correspondencia. "
write_general(message)
def reset_LIDAR():
try:
os.remove("/others/h0.txt")
except:
pass
else:
reboot_container()
def check_changes():
def check_changes_requeriments():
return
return
def process_command(payload):
write_general(f"Se recibió el comando: {payload}")
if 'Reiniciar sistema' in payload:
'''
Se reinicia el sistema o contenedor.
'''
reboot_container()
elif 'Apagar camara' in payload:
'''
Configuramos el pin de apagado de camara.
'''
poweroff_camera()
elif "Actualizar firmware Raspberry Pi" in payload:
boot_RPI()
check_changes()
reboot_container()
elif "restart VPN" in payload:
boot_VPN()
elif "reset_LIDAR" in payload:
reset_LIDAR()
reboot_container()
return
def on_disconnect(client,userdata,rc):
def write_status(chain):
now = datetime.now()
formatted_date_time = now.strftime("%d/%m/%Y %H:%M:%S")
filename = '/logs/log.txt'
if not os.path.isdir(os.path.dirname(filename)):
os.makedirs(os.path.dirname(filename))
chain = formatted_date_time + " " + chain
try:
with open(filename,'a') as file:
file.write(chain + '\n')
except:
pass
return
write_status("[MANAGE] Se ha desconectado el MQTT, recuperando conexión.")
if rc != 0:
count_attempts = 0
while 1:
try:
client.reconnect()
except:
error = traceback.format_exc()
write_status(f"[Error][Manage] Error al reconectar MQTT broker. Intento {count_attempts+1}. Copia del error: {error}")
count_attempts +=1
time.sleep(5)
if count_attempts == 4*60*60:
write_status("[Error] Se alcanzó la máxima cantidad de intento de conexiones. No se puede recibir comandos. Reiniciando el sistema.")
raise MyErrorForManage("[Error] Se alcanzó la máxima cantidad de intento de conexiones. No se puede recibir comandos. Reiniciando el sistema.")
else:
client.subscribe(COMMAND_TOPIC_USER)
client.subscribe(COMMAND_TOPIC_PUBLIC)
client.subscribe(CONFIG_TOPIC_PUBLIC)
client.subscribe(CONFIG_TOPIC_USER)
return
def process_config(payload):
try:
dict_keys = payload.keys()
except:
#No contiene o no es un diccionario
pass
else:
try:
for key in dict_keys:
value = payload[key]
search_to_replace(key,value)
#Tenemos value y key. Buscamos en donde está
except:
#Ocurrió un error al reemplazar los valores
pass
else:
#Guardamos el json
obj_vars.save_json()
reboot_container()
return
def on_connect(client, userdata, flags, rc):
print("Connected with result code " + str(rc))
print("UserData= " + str(userdata))
print("flags= " + str(flags))
print("")
client.subscribe(COMMAND_TOPIC_USER)
client.subscribe(COMMAND_TOPIC_PUBLIC)
client.subscribe(CONFIG_TOPIC_PUBLIC)
client.subscribe(CONFIG_TOPIC_USER)
if __name__ =='__main__':
path_script = "/tools/main.py"
DEVNULL = open(os.devnull,"wb")
main_proceso = subprocess.Popen(["python", path_script], stdout=DEVNULL, stderr=DEVNULL)
obj_vars.send_to_dashboard()
while 1:
try:
client = mqtt.Client(CLIENT_MQTT + "_MASTER",clean_session=True)
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
client.username_pw_set(mqtt_user,mqtt_pass)
client.connect(mqtt_broker,mqtt_port,30)
client.loop_forever()
except MyErrorForManage as e:
write_general(f"Interrumpido por comando: {e}")
break
except Exception as e:
#proceso.terminate()
print("Proceso MQTT finalizado!!!!")
write_general(f"Proceso finalizado por {e}")