import os import io import gc import cv2 import json import time import gzip import numpy import busio import board import ntplib import psutil import base64 import requests import datetime import traceback import threading import urllib.request import multiprocessing import RPi.GPIO as GPIO import adafruit_lidarlite import paho.mqtt.client as mqtt import paho.mqtt.publish as publish from others import * from copy import deepcopy from multiprocessing import Manager, Pipe,Queue,Lock from flask import Flask, render_template, jsonify from datetime import datetime, timedelta from time import sleep class manage_lahares(object): timestamp_init_system = None max_retries = 5 capture_count = 0 flag_date_update = False flag_internet = False bytes_send = 0 bytes_recv = 0 bytes_recv_total = 0 bytes_send_total = 0 count_status = 0 count_write_data = 0 __version__ = 0 dataOut = dict() def __get_temperature(self,): ''' Metodo para leer la temperatura del sistema. ''' try: with open("/sys/class/thermal/thermal_zone0/temp", "r") as archivo: temperature = int(archivo.read()) temperature = temperature / 1000.0 # Convertir de miligrados a grados Celsius return temperature except FileNotFoundError: return -1 def create_path(self,path): if path is None: return if not os.path.isdir(path): os.makedirs(path) return def read_metadata(self): #Cargamos la versión de los dispositivos file_version_rpi = "/others/version_rpi.txt" file_bytes_internet = "/others/b4g.txt" if os.path.exists(file_version_rpi): with open(file_version_rpi,"r") as f: value = float(f.readline()) self.__version__ = value else: self.__version__ = 0 try: if os.path.exists(file_bytes_internet): ''' Con este archivo podemos realizar el control de consumo de internet. El archivo existe, entonces leemos los valores Estructura (2 lineas): ---------- - fecha de creacion - {bytes_send}#{bytes_recv} ''' with open(file_bytes_internet,"r") as f: lines = numpy.array(f.read().splitlines(),dtype=object) if lines.shape[0] == 0: self.__create_file_bytes() if lines.shape[0]>1: date = lines[0] #------------- Realizamos control de fecha ------------------# date = datetime.strptime(date, "%d/%m/%Y") now = datetime.now() if (datetime(date.year,date.month,1) == datetime(now.year,now.month,1)): #Estamos en el mismo mes, cargamos los datos tmp = lines[1].split("#") self.bytes_send_total = float(tmp[0]) self.bytes_recv_total = float(tmp[1]) self.bytes_recv = psutil.net_io_counters().bytes_recv self.bytes_send = psutil.net_io_counters().bytes_sent else: #Creamos nuevo archivo con datos formatted_date_time = now.strftime("%d/%m/%Y") with open(file_bytes_internet,"w") as f: f.write(formatted_date_time) self.bytes_recv = psutil.net_io_counters().bytes_recv self.bytes_send = psutil.net_io_counters().bytes_sent self.bytes_send_total = 0 self.bytes_recv_total = 0 else: self.__create_file_bytes() except: self.write_status(f"[ERROR] Error: {traceback.format_exc()}") def __create_file_bytes(self,): file_bytes_internet = "/others/b4g.txt" #preparamos los datos now = datetime.now() formatted_date_time = now.strftime("%d/%m/%Y") self.bytes_recv = psutil.net_io_counters().bytes_recv self.bytes_send = psutil.net_io_counters().bytes_sent with open(file_bytes_internet,"w") as f: formatted_date_time = now.strftime("%d/%m/%Y") f.write(formatted_date_time) self.bytes_send_total = 0 self.bytes_recv_total = 0 def __handle_size(self, path): try: #en mb limite = 2048 limite *=1024 size = os.path.getsize(path) if (size>0.95*limite): #---------------------- Guardamos la mitad final del archivo ---------------------------------------# get_position = size // 2 lectura = None with open(path,'rb') as file: file.seek(get_position) lectura = file.read() with open(path,'wb') as file: file.write(lectura) except: if self.debug: print("Ocurrió un error al analizar el tamaño del archivo {}".format(os.path.basename(path))) error = traceback.format_exc() self.write_status("Ocurrió un error al analizar el tamaño del archivo {} Copia del error: {}.".format(os.path.basename(path),error)) def write_status(self,chain): now = datetime.now() formatted_date_time = now.strftime("%d/%m/%Y %H:%M:%S") filename = '/logs/log.txt' chain = formatted_date_time + " |" + chain if not os.path.isdir(os.path.dirname(filename)): os.makedirs(os.path.dirname(filename)) try: with open(filename,'a') as file: file.write(chain + '\n') except: if self.debug: print("Ocurrió un error al guardar datos logs.") else: if(self.count_status == 150): self.count_status = 0 #--------------------- Revisamos el tamaño de los archivos-------------------# # Debe de ser menor a 1 GB. self.__handle_size(filename) else: self.count_status +=1 return def check_internet(self,verbose=True): count = 0 while 1: try: urllib.request.urlopen('http://www.google.com', timeout=1) except: count +=1 if (count ==3): self.flag_internet = False break sleep(0.5) else: if verbose: self.write_status("Se cuenta con conexión a internet.") if self.debug: print("Se cuenta con conexión a internet") self.flag_internet = True return return def update_time(self,): ntp_server = 'pool.ntp.org' count = 0 while 1: try: client = ntplib.NTPClient() response = client.request(ntp_server) ntp_time = datetime.utcfromtimestamp(response.tx_time) - timedelta(hours=+5) os.system('sudo date {} --utc'.format(ntp_time.strftime('%Y%m%d%H%M.%S'))) if self.debug: print(f'Hora actualizada: {ntp_time} UTC') self.write_status(f"La hora ha sido actualizado a {ntp_time}") except Exception as e: if self.debug: print(f'Error al actualizar la hora: {e}') print(f"Volviendo a intentar en unos instantes...") count +=1 if(count == 3): if self.debug: print("Se alcanzó el maximo de intentos. Se actualizará después") self.write_status("No se pudo actualizar la hora.") break sleep(3) else: self.flag_date_update = True break def make_setup(self,): #----------------------- extract lists --------------------------# try: list_HFS = self.vars_gpio.get("gpio_HFS",None) list_HB = self.vars_gpio.get("gpio_HB100",None) list_RCWL = self.vars_gpio.get("gpio_RCWL",None) list_ina = self.vars_gpio.get("address_ina",None) if list_HFS is not None: for n,key in enumerate(list_HFS.keys()): if(n>=self.vars.get("MAX_NUMBER_SENSORS")): if self.debug: print("Se ha proporcionado una cantidad mayor de sensores para el tipo HFS. Máximo definido son 4 sensores.") self.write_status("[WARNING] Se asignaron más de 4 sensores para el tipo HFS.") else: name = 'sensor_HFS{:02d}'.format(n+1) pin = int(list_HFS[key]) try: self.dataOut[name] = sensor_HFS(name,key,pin) self.dataOut[name].THRESHOLD_BETWEEN_ON_SENSOR = self.vars.get("MIN_ON_SENSOR",30) self.dataOut[name].THRESHOLD_BETWEEN_OFF_SENSOR = self.vars.get("MIN_OFF_SENSOR",5) except: if self.debug: print(f"[ERROR] Al declarando sensor HFS. Error: {traceback.format_exc()}") self.write_status(f"[ERROR] Al declarando sensor HFS. Error: {traceback.format_exc()}") if list_HB is not None: for n,key in enumerate(list_HB.keys()): if(n>=self.vars.get("MAX_NUMBER_SENSORS")): if self.debug: print("Se ha proporcionado una cantidad mayor de sensores para el tipo HB100. Máximo definido son 4 sensores.") self.write_status("[WARNING] Se asignaron más de 4 sensores para el tipo HB100.") else: name = 'sensor_HB{:02d}'.format(n+1) self.dataOut[name] = sensor(name,key) if list_RCWL is not None: for n,key in enumerate(list_RCWL.keys()): if(n>=self.vars.get("MAX_NUMBER_SENSORS")): if self.debug: print("Se ha proporcionado una cantidad mayor de sensores para el tipo RCWL. Máximo definido son 4 sensores.") self.write_status("[WARNING] Se asignaron más de 4 sensores para el tipo RCWL.") else: name = 'sensor_RCWL{:02d}'.format(n+1) self.dataOut[name] = sensor(name,key) if list_ina is not None and self.flag_ina: for n, key in enumerate(list_ina.keys()): name = 'ina_{:02d}'.format(n+1) address = int(list_ina[key]) try: self.dataOut[name] = ina(name,key,address) except: if self.debug: print(f"[ERROR] Error iniciando sensor ina: {traceback.format_exc()}") self.write_status(f"[ERROR] Error iniciando sensor ina: {traceback.format_exc()}") #--------------------- others sensors and modules -----------------# if self.flag_lidar: try: self.dataOut['lidar'] = lidar(name='lidar',key='lidar') self.dataOut['lidar'].TIME_LIDAR_ON = self.vars.get("TIME_LIDAR_ON",30) self.dataOut['lidar'].TIME_LIDAR_OFF = self.vars.get("TIME_LIDAR_OFF",60) self.dataOut['lidar'].min_H = self.vars.get("MIN_HEIGHT_WATER_FOR_LIDAR",5) self.dataOut['lidar'].minus_H = self.vars.get("MIN_MINUS_HEIGHT_FOR_LIDAR",4)*-1 self.dataOut['lidar'].rare_height = self.vars.get("RARE_HEIGHT",70) except: if self.debug: print(f"[ERROR] Error iniciando sensor: {traceback.format_exc()}") self.write_status(f"[ERROR] Error iniciando sensor: {traceback.format_exc()}") except: if self.debug: print("Ocurrió un error en la preparación de la trama de configuración.") error = traceback.format_exc() self.write_status(f"Ocurrió un error en la preparación de la trama de configuración. Copia del error: {error}") else: return def __config(self,): #------------------------ revisamos algunas configuraciones ----------------# if self.flag_camera: pin = self.camera_keys.get("pin_camera",None) if pin == None: self.write_status("[ERROR] Pin del control de camara no ha sido definido. Se controlará el tomado de fotografías por Software.") else: # Configuramos el pin como salida GPIO.setup(pin,GPIO.OUT) self.camera = camera(flag=self.flag_camera,pin=pin,obj=self.obj_vars) else: self.camera = camera(flag=False,pin=None,obj=self.obj_vars) #-------------------------- Cargamos el modo ---------------------------------# self.string_model = load_version() if (self.inference_mode == 'video' and 'Raspberry Pi Zero' in model) or self.inference_mode == 'photo': if self.debug: print("Se ha cambiado el modo de inferencia automaticamente. Debido a que los requisitos del sistema no son soportados. Actual inferencia de ML a Server.") self.write_status("Se ha cambiado el modo de inferencia automaticamente. Debido a que los requisitos del sistema no son soportados. Actual inferencia de ML a Server.") self.inference_mode = 'server' def __load_vars(self): self.path_save = "/data" self.path_save_img = os.path.join(self.path_save,'img') self.path_save_json = os.path.join(self.path_save,'json') self.semaphore = threading.Semaphore(1) self.semaphore_estimator = threading.Semaphore(1) self.semaphore_get_video = threading.Semaphore(1) self.manager = Manager() self.queue = self.manager.Queue() self.queue_estimator = self.manager.Queue() self.obj_vars = VarsJons() self.estimator = estimator(self.obj_vars,) self.debug = self.obj_vars.debug self.vars = self.obj_vars.vars self.vars_mqtt = self.obj_vars.vars_mqtt self.store_data = self.obj_vars.store_data self.vars_gpio = self.obj_vars.vars_gpio self.inference_mode = self.obj_vars.inference_mode self.camera_keys = self.obj_vars.camera self.flag_camera = self.vars_gpio.get("camera", True) self.flag_lidar = self.vars_gpio.get("lidar",True) self.flag_ina = self.vars_gpio.get("ina",True) self.id = str(self.obj_vars.id) #----------------- share variables --------------------# self.share_estimator = multiprocessing.Value('d', 0) def __init__(self): #---------- Esperamos el inicio del sistema ---------# sleep(4) #Guardamos el timestamp de inicio del sistema self.timestamp_init_system = datetime.now().timestamp() #------------------ get parameters ------------------# self.__load_vars() #------------------- Prints -------------------------# if self.debug: print(10*"-") print("Iniciando sistema") print(r"Ruta de guardado {}".format(self.store_data)) print(r"Ruta de guardado de imagenes {}".format(self.path_save_img)) print(r"Ruta de guardado de archivos .json: {}".format(self.path_save_json)) print(10*"-") #----------------- Creamos paths ----------------------# self.create_path(self.path_save_img) self.create_path(self.path_save_json) self.create_path(self.path_save) #------------------ Read metadatos --------------------# self.read_metadata() #-------------------- Escribiendo logs -----------------# self.write_status(30*"-") self.write_status("Sistema iniciado") self.write_status(r"Ruta de guardado {}".format(self.store_data)) self.write_status(r"Ruta de guardado de imagenes {}".format(self.path_save_img)) self.write_status(r"Ruta de guardado de archivos .json: {}".format(self.path_save_json)) self.write_status(30*"-") #-------------------- Leyendo setup --------------------# self.make_setup() #--------------------- Others config -------------------# self.__config() #------------------- Run process -----------------------# self.run_process() #------------------ Fin script -------------------------# def run_process(self,): if self.debug: print("Realizando la derivación de procesos") self.write_status("Realizando la derivación de procesos.") #--------------------- Creamos la variable compartida ----------------------# #Esta variable compartida nos va a permitir compartir datos para mostrarlo en Flask json_dict = self.manager.dict() lock = Lock() #-------------------------- init process --------------------# try: self.process_sensores = multiprocessing.Process(target = self.method_sensores, args=(self.share_estimator,json_dict,lock,)) self.process_mqtt = multiprocessing.Process(target = self.method_mqtt,args=(json_dict,lock,)) self.process_estimator= multiprocessing.Process(target= self.method_estimator,args=(self.share_estimator,lock,)) self.process_flask = multiprocessing.Process(target=self.method_flask,args=(json_dict,lock,)) self.process_sensores.start() self.process_mqtt.start() self.process_estimator.start() self.process_flask.start() if self.inference_mode == 'video' or self.inference_mode == 'server': self.process_video = multiprocessing.Process(target=self.method_video,args=()) self.process_video.start() self.process_video.join() self.process_mqtt.join() self.process_sensores.join() self.process_estimator.join() self.process_flask.join() except: error = traceback.format_exc() if self.debug: print("Error en la inialización de procesos.") error = traceback.format_exc() self.write_status(f"Error en la inialización de procesos. Copia del error {error}") def method_flask(self,json_dict,lock): sleep(2) if self.debug: print("Metodo Flask Lanzado") app = Flask(__name__) self.write_status(f"[Flask] Metodo lanzado") @app.route('/') def home(): return render_template("main.html") def get_image(): latest_name = sorted(os.listdir("/data/img"))[-1] return os.path.join("data/img",latest_name) @app.get("/update") def update(): try: data = dict() with locked(lock): data = dict(json_dict) x = jsonify(data) except: if self.debug: print(f"[FlaskUPDTErr] Error: {traceback.format_exc()}") self.write_status(f"[FlaskUPDTErr] Error: {traceback.format_exc()}") else: return x try: app.run(debug=False,port=5000,host='0.0.0.0') except: if self.debug: print(f"[ErrFlask] Error en flask \n{traceback.format_exc()}") self.write_status(f"[ErrFlask] Error en flask \n{traceback.format_exc()}") else: if self.debug: print(f"[ErrFlask] Error en flask \n{traceback.format_exc()}") self.write_status(f"[Flask] Servidor lanzado") def method_estimator(self, share,lock): #----------------------- Carga del modelo IA -------------------------------------# timestamp_latest = datetime.now().timestamp() self.estimator.load_weights() #Cargamos los pesos if self.debug: print("Proceso estimator ha sido lanzado.") self.write_status("[Estimator] Proceso estimator ha sido lanzado.") sleep(5) while 1: flag = False try: self.semaphore_estimator.acquire() while not self.queue_estimator.empty(): flag = True #Se realizó la inferencia timestamp_latest = datetime.now().timestamp() if self.inference_mode == 'photo': self.estimator.image = self.queue_estimator.get() elif self.inference_mode == 'video': self.estimator.video = self.queue_estimator.get() elif self.inference_mode == 'server': self.estimator.video = self.queue_estimator.get() self.estimator.run() self.semaphore_estimator.release() timestamp = datetime.now().timestamp() if (timestamp - timestamp_latest > 30*60): ''' Controlamos que la ultima inferencia no se haya realizado hace más de 30 minutos. Esto es util para prevenir cualquier tipo de incidente, ya sea cámara malograda entre otras formas. ''' self.estimator.inference_value = 10 except: exc = traceback.format_exc() self.write_status(f"[ERROR] Ocurrió un error al realizar inferencia. Copia del error: {exc}") sleep(10) else: ''' Solo compartimos el valor de la inferencia al otro estimator. Debido a que cada multiproceso maneja una copia diferente del objeto Estimator. Por más que realicemos un cálculo, no se verá reflejado en el otro objeto. ''' if flag: self.write_status("[Estimator] Se realizó la inferencia en el Queue Estimator. Variable ha sido compartida entre multiprocesos.") with locked(lock): if self.estimator.inference_value == None: inference = 10 else: inference = self.estimator.inference_value share.value = inference sleep(1) gc.collect() sleep(3) def send_uart(self,uart,string): try: uart.write(string.encode('utf-8')) except: if self.debug: print("Ocurrió un error al enviar datos por puerto UART.") self.write_status("[ERROR] Ocurrió un error al enviar datos por el puerto UART.") def __get_internet_bytes(self,): a, b = psutil.net_io_counters().bytes_recv,psutil.net_io_counters().bytes_sent recv = a - self.bytes_recv send = b - self.bytes_send self.bytes_recv_total += recv self.bytes_send_total += send self.bytes_recv = a self.bytes_send = b file_bytes_internet = "/others/b4g.txt" #--------------------- control -------------------------------# tmp_write = list() try: if os.path.exists(file_bytes_internet): with open(file_bytes_internet,"r") as file: line = numpy.array(file.read().splitlines(),dtype=object) if line.shape[0]>1: tmp_write.append(line[0]) tmp_write.append(f"{self.bytes_send_total}#{self.bytes_recv_total}") with open(file_bytes_internet,"w") as file: file.write(l+'\n' for l in tmp_write) else: self.__create_file_bytes() gc.collect() except: self.write_status(f"[ERROR] Error: {traceback.format_exc()}") def __process_values(self,share,flask_,lock): try: tmp_data = dict() #------------------------ Calculamos el uso de internet --------------# total_internet = round((self.bytes_recv_total + self.bytes_send_total)/(1024*1024),3) #---------------------------------------------------------------------# tmp_data["current_timestamp"] = datetime.now().timestamp() tmp_data['init_timestamp'] = self.timestamp_init_system tmp_data['version_rpi'] = self.__version__ tmp_data['location'] = self.obj_vars.location tmp_data['latitude'] = self.obj_vars.latitude tmp_data['longitude'] = self.obj_vars.longitude tmp_data["id"] = self.id tmp_data['camera_status'] = self.camera.status tmp_data["disk_percent_use"] = psutil.disk_usage('.').percent tmp_data['DEVICE_INFO'] = self.string_model tmp_data['CPU_usage'] = psutil.cpu_percent(interval=1) tmp_data['temperature'] = self.__get_temperature() tmp_data["ALL_USE_E_MB"] = round(total_internet,4) list_keys = self.dataOut.keys() for key in list_keys: if key == 'lidar': tmp_data['lidar_status'] = self.dataOut[key].activate tmp_data['lidar_dH'] = self.dataOut[key].dH_ tmp_data['h0'] = self.dataOut[key].H0 if self.dataOut[key].ERROR_WIRE == True: tmp_data[key] = "WIRE_ERROR" else: tmp_data[key] = "OK" elif 'ina' in key: key_ = self.dataOut[key].key tmp_data[f'bus_voltage_{key_}'] = round(self.dataOut[key].bus_voltage,4) tmp_data[f'shunt_voltage_{key_}']= round(self.dataOut[key].shunt_voltage,4) tmp_data[f'current_{key_}'] = round(self.dataOut[key].current,4) else: tmp_data[key] = self.dataOut[key].activate tmp_data[f'it_{key}'] = self.dataOut[key].current_sensor() #------------------------------- Inference method ------------------------------# self.estimator.dataOut = self.dataOut with locked(lock): self.estimator.share = float(share.value) tmp_data['camera_inference'] = self.estimator.share tmp_data['string_inference'] = self.estimator.string_status tmp_data['status'] = self.estimator.activate tmp_data['count_status'] = self.estimator.activate_count #-----------------------------------------------------# #--------------- Compartiendo data a Flask -----------# #-----------------------------------------------------# try: with locked(lock): flask_.clear() #Limpiamos todos los datos compartidos for key in tmp_data.keys(): flask_[key] = tmp_data[key] except: if self.debug: print(f"[ERROR] Error al escribir datos en la variable compartida: {traceback.format_exc()}") self.write_status(f"[ERROR] Error al escribir datos en la variable compartida: {traceback.format_exc()}") payload = {'type':'json', 'content':tmp_data} if self.store_data: self.write_data(payload) return tmp_data except: error = (traceback.format_exc()) if self.debug: print(f"Ocurrió un error al leer los datos.{error}") self.write_status(f"Ocurrió un error al leer datos. Copia del error {error}.") def write_data(self,data): now = datetime.now() formatted_date_time = now.strftime("%d/%m/%Y %H:%M:%S") + " |" try: name = 'data.txt' filename = os.path.join(self.path_save_json,name) with open(filename,'a') as file: file.write(formatted_date_time + str(json.dumps(data)) + '\n') except: if self.debug: print(f"Ocurrió un error al guardar los datos en el archivo {name}") self.write_status(f"[ERROR] Ocurrió un error al guardar los datos en el archivo {name}.") else: self.count_write_data += 1 try: if(self.count_write_data == 100): self.count_write_data = 0 self.__handle_size(filename) except: exc = traceback.format_exc() self.write_status(f"[Error] Error desconocido en {exc}") def method_mqtt(self,payload_json,lock): #----------------------------- Control de internet --------------------------# # Revisamos el internet cada 30 minutos timestamp_internet = datetime.now().timestamp() timestamp_mqtt = datetime.now().timestamp() values = list() self.check_internet() while 1: try: while 1: if self.flag_internet: CLIENT_MQTT = str(self.id +"_pub") mqtt_user = self.vars_mqtt.get("mqtt_user") mqtt_pass = self.vars_mqtt.get("mqtt_pass") mqtt_broker = self.vars_mqtt.get("mqtt_broker") mqtt_port = self.vars_mqtt.get("mqtt_port") self.client = mqtt.Client(CLIENT_MQTT) self.client_image = mqtt.Client(CLIENT_MQTT+'image') self.client.on_connect = on_connect self.client_image.on_connect = on_connect self.client.on_disconnect = on_disconnect self.client_image.on_disconnect = on_disconnect self.client.username_pw_set(mqtt_user, mqtt_pass) self.client_image.username_pw_set(mqtt_user,mqtt_pass) self.client.connect(mqtt_broker, mqtt_port,keepalive=300) self.client_image.connect(mqtt_broker,mqtt_port,keepalive=300) break else: #Esperamos un minuto if self.debug: print("No se cuenta con internet. Esperamos un minuto para lanzar el proceso MQTT.") self.write_status("[WARNING] No se cuenta con internet. Esperando un minuto para lanzar el proceso MQTT.") sleep(30) self.check_internet() if self.debug: print("El proceso MQTT ha sido lanzado") self.write_status("El proceso MQTT ha sido lanzado.") photo_topic = os.path.join(self.vars_mqtt.get("photo_topic"),self.id) data_topic = os.path.join(self.vars_mqtt.get("data_topic"),self.id) while 1: try: while not self.queue.empty(): self.semaphore.acquire() mensaje = self.queue.get() self.semaphore.release() if mensaje["type"] == 'image': content = mensaje['content'] try: if self.debug: print("Enviando imagen por MQTT ...") self.write_status("Enviando imagen por MQTT.") self.client_image.publish(photo_topic,content['images'],qos=1) if self.debug: print("Imagen ha sido enviada por MQTT") self.write_status("Imagen ha sido enviado por MQTT.") except: if self.debug: print("Hubo un error al enviar la imagen. Se devuelve a la cola de envío.") self.write_status("[ERROR] Error por envío de imagen MQTT.") gc.collect() if (datetime.now().timestamp() - timestamp_mqtt > self.vars.get("TIME_SEND_MQTT",3)): try: with locked(lock): data = dict(payload_json) tmp = dict() tmp['values'] = data payload = json.dumps(tmp) try: self.client.publish(data_topic,payload,qos=1) except: self.write_status(f"[ERROR] Error al enviar datos por MQTT: {traceback.format_exc()}") except: self.write_status(f"[ERROR] Error al leer datos del buffer compartido: {traceback.format_exc()}") finally: timestamp_mqtt = datetime.now().timestamp() self.client_image.loop() self.client.loop() except: self.write_status("[ERROR] Existe un problema al procesar el envío MQTT.") sleep(1) finally: if(datetime.now().timestamp() - timestamp_internet > self.vars.get("SAMPLING_INTERNET",120)): self.check_internet() timestamp_internet = datetime.now().timestamp() if not self.flag_internet: if self.debug: print("Se perdió la conexión a Internet.") self.write_status("[ERROR] Se perdió la conexión a Internet.") break except: if self.debug: print(f"Ocurrió un error no manejado en la función principal de lahares. Copia del error: {error}") error = traceback.format_exc() self.write_status(f"Ocurrió un error inesperado. Copia del error: {error}") sleep(5) def method_video(self,): ''' En este metodo extraemos 5 segundos de video o 135 frames para el entrenamiento del modelo. UPDATE: - Por temas de memoria, solo se trabajará con 10 frames. El formato estará comprimido en gzip para poder obtener un menor tamaño en memoria mientras se comprime los archivos. - ''' self.check_internet(False)#Verbose False if self.debug: print("Lanzado el metodo para la adquisición de video.") self.write_status("[METHOD] Lanzado el metodo para la adquisición de video") timestamp_sampling = datetime.now().timestamp() duration_frames = 10 sleep(4) while 1: current_time = datetime.now().timestamp() try: if (( timestamp_sampling - current_time )< 0 and self.camera.status and self.semaphore_get_video.acquire(False)): self.write_status("Obteniendo video frames .....") try: count_frames = 0 file_video = None file_video = io.BytesIO() self.camera.control_brightness(brightness=100)#Realiza la validación de si es necesario elevar el brillo o no. vid = cv2.VideoCapture(self.camera.url_rstp) time_sleep = 0.35 if duration_frames<=10 else 0.2 now = datetime.now().timestamp() with gzip.GzipFile(fileobj=file_video,mode='wb',compresslevel=9) as gz: while 1: now2 = datetime.now().timestamp() try: _,_ = vid.read() except: self.write_status("Error al capturar el frame") else: try: if (now2 - now) > time_sleep: while 1: try: _,frame = vid.read() if frame is not None: frame = cv2.resize(frame,(640,360)) frame = cv2.cvtColor(frame,cv2.COLOR_BGR2RGB) frame = (frame.astype(numpy.uint8)).tobytes() gz.write(len(frame).to_bytes(4,'big')) gz.write(deepcopy(frame)) frame = None gc.collect() count_frames += 1 now = datetime.now().timestamp() break except: self.write_status(f"Ocurrió un error en video: {traceback.format_exc()}") except: self.write_status(f"Error al comprimir u obtener frames: {traceback.format_exc()}") if count_frames == 10: self.write_status("Se obtuvo 10 frames.") break try: vid.release() except: pass self.camera.control_brightness(brightness=0) self.semaphore_get_video.release() #-------------------------------------------------------------------# # Por temas de latencia en tiempo, se realizará la inferencia en este metodo #-------------------------------------------------------------------# # if self.inference_mode == 'server': # url = "http://38.10.105.243:7777/predict" # input_data = {'instances':base64.b64encode(file_video.getvalue()).decode('utf-8'), # 'id_user':"test-jicamarca", # 'request_format':True, # 'shape':(360,640)} # headers = { # 'Content-Type': 'application/json', # 'Content-Encoding': 'gzip-B64', # } # input_data = json.dumps(input_data) # compress = io.BytesIO() # with gzip.GzipFile(fileobj=compress, mode='wb', compresslevel=9) as gz2: # gz2.write(input_data.encode('utf-8')) # if self.flag_internet: # #Se cuenta con internet para realizar la inferencia al servidor. # try: # resp = requests.post(url,data=compress.getvalue(),headers=headers) # except: # compress = None # file_video = None # gc.collect() # inference = None # else: # compress = None # file_video = None # gc.collect() # if resp.status_code == 200: # #El envio fue un exito # inference = 1 - resp.json()['predictions'][0][0] # self.write_status(f"Inferencia al servidor realizado con exito. Valor de inferencia: {inference}.") # else: # inference = None # self.write_status(f"Se obtuvo otro codigo de respuesta al realizar inferencia al servidor. {resp.status_code}") #-------------------------------------------------------------------# #-------------------------------------------------------------------# #-------------------------------------------------------------------# self.semaphore_estimator.acquire() if self.queue_estimator.qsize()>=2: self.queue_estimator.get() gc.collect() self.queue_estimator.put({ 'video': deepcopy(file_video), 'timestamp': datetime.now().timestamp() }) file_video = None gc.collect() self.semaphore_estimator.release() except: try: vid.release() except: pass gc.collect() self.write_status(f"[ERROR] Ocurrió un error en el proceso de obtener video. Se volverá a intentar en 15 segundos. Copia del error: {traceback.format_exc()}") timestamp_sampling = datetime.now().timestamp() + 15 try: self.semaphore_get_video.release() except: self.write_status(f"[ERROR] Code 100 Method") else: if count_frames == 10: timestamp_sampling = datetime.now().timestamp() + self.vars.get("SAMPLING_TIME_VIDEO") self.write_status(f"Se agregó 10 frames en bytes al queue estimator.") else: timestamp_sampling = datetime.now().timestamp() + 15 self.write_status(f"Frames insuficientes, se volverá a intentar en 15 segundos.") elif (( current_time - timestamp_sampling )> 90): self.write_status(f"[VIDEO] No se puede tomar el semaforo por más de 90 segundos. ") timestamp_sampling = current_time sleep(1) except: self.write_status(f"[ERROR VIDEO] {traceback.format_exc()}") else: #Borrar with open("/tools/live_video.txt",'w') as f: date = datetime.now() chain = f'{date.day} {date.hour}:{date.minute}:{date.second}' f.write(chain) sleep(3) def method_sensores(self,share,flask_,lock): timestamp_camera = datetime.now().timestamp() timestamp_sampling = datetime.now().timestamp() self.timestamp_internet_ = datetime.now().timestamp() sleep(1) self.check_internet(False) while 1: #------------------- inicio de proceso ----------------------------# try: current_time = datetime.now().timestamp() self.__get_internet_bytes() if(timestamp_sampling - current_time < 0 ): ''' Aqui se ejecutan los estados de los sensores. ''' try: key_sensor = self.dataOut.keys() for key in key_sensor: try: self.dataOut[key].run() except: if self.debug: print(f"[ERROR] Error actualizando los datos: {traceback.format_exc()}") self.write_status(f"[ERROR] Error actualizando los datos: {traceback.format_exc()}") self.__process_values(share,flask_,lock) except: if self.debug: print(f"[ERROR] Error decodificando los datos: {traceback.format_exc()}") self.write_status(f"[ERROR] Error decodificando los datos: {traceback.format_exc()}") finally: timestamp_sampling = current_time + self.vars.get("SAMPLING_TIME_SENSOR") if(timestamp_camera - current_time < 0 and self.camera.status and self.semaphore_get_video.acquire(False)): ''' Se adquiere fotos para enviarse por MQTT. ''' frame = None try: self.camera.control_brightness(brightness=100) vid = cv2.VideoCapture(self.camera.url_rstp) while 1: _, frame = vid.read() if frame is not None: break except: try: vid.release() self.camera.control_brightness(brightness=0) except: pass self.semaphore_get_video.release() if self.debug: print(f"Error: {str(traceback.format_exc())}") self.write_status(f"[ERROR] Error generado en la adquisición de fotografía {str(traceback.format_exc())}") timestamp_camera = datetime.now().timestamp() + 5 else: try: vid.release() except: pass self.camera.control_brightness(brightness=0) self.semaphore_get_video.release() #---------------------- imagen obtenida ---------------------# timestamp_camera = datetime.now().timestamp() + self.vars.get("SAMPLING_TIME_CAMERA") buffer = cv2.imencode('.jpg',frame)[1] data = { 'images' : buffer.tobytes(), 'timestamp': datetime.now().timestamp() } payload = { 'type' : 'image', 'content': data } #----------------------------------------------------------- #Definimos un tamaño de 3 imagenes, para no llenar el buffer if self.inference_mode == 'photo': self.semaphore_estimator.acquire() payload_estimator = { 'image':numpy.array(frame), 'timestamp':datetime.now().timestamp() } if self.queue_estimator.qsize()>=3: self.queue_estimator.get() self.queue_estimator.put(payload_estimator) self.semaphore_estimator.release() #---------------- Guardamos en la cola las imagenes a ser enviadas -----------------# self.semaphore.acquire() if self.flag_internet: self.queue.put(payload) if self.queue.qsize()>=10: self.queue.get() gc.collect() self.semaphore.release() gc.collect() #################################### ## Guardando en la ruta de Flask ### #################################### filename_flask = os.path.join('/tools/static',"latest.jpg") with open(filename_flask,'wb') as filew: filew.write(buffer.tobytes()) #################################### ### Guardado de datos ### #################################### if self.store_data: name = '{}.jpg'.format(current_time) filename = os.path.join(self.path_save_img,name) try: with open(filename,'wb') as file: file.write(buffer.tobytes()) except: if self.debug: print("Ocurrió un error al guardar la imagen {}".format(name)) self.write_status("[ERROR] Ocurrió un error al guardar la imagen.") else: if self.debug: print(f"Se guardó la imagen {name}") self.write_status(f"Se guardó la imagen {name}.") gc.collect() except: if self.debug: print("Ocurrió un error en la recolección de datos.") self.write_status(f"Ocurrió un error no identificado en el proceso de recolección de data sensores. Copia del error: {traceback.format_exc()}") sleep(5)