##// END OF EJS Templates
Update others.py
Update others.py

File last commit:

r211:e42cf7f6061b
r224:a7e08621a1bc
Show More
pub.py
1560 lines | 54.5 KiB | text/x-python | PythonLexer
import os
import io
import gc
import cv2
import json
import time
import gzip
import numpy
import busio
import board
import ntplib
import psutil
import base64
import requests
import datetime
import traceback
import threading
import urllib.request
import multiprocessing
import RPi.GPIO as GPIO
import adafruit_lidarlite
import paho.mqtt.client as mqtt
import paho.mqtt.publish as publish
from others import *
from copy import deepcopy
from multiprocessing import Manager, Pipe,Queue,Lock
from flask import Flask, render_template, jsonify
from datetime import datetime, timedelta
from time import sleep
class manage_lahares(object):
timestamp_init_system = None
max_retries = 5
capture_count = 0
flag_date_update = False
flag_internet = False
bytes_send = 0
bytes_recv = 0
bytes_recv_total = 0
bytes_send_total = 0
count_status = 0
count_write_data = 0
__version__ = 0
dataOut = dict()
def __get_temperature(self,):
'''
Metodo para leer la temperatura del sistema.
'''
try:
with open("/sys/class/thermal/thermal_zone0/temp", "r") as archivo:
temperature = int(archivo.read())
temperature = temperature / 1000.0 # Convertir de miligrados a grados Celsius
return temperature
except FileNotFoundError:
return -1
def create_path(self,path):
if path is None:
return
if not os.path.isdir(path):
os.makedirs(path)
return
def read_metadata(self):
#Cargamos la versión de los dispositivos
file_version_rpi = "/others/version_rpi.txt"
file_bytes_internet = "/others/b4g.txt"
if os.path.exists(file_version_rpi):
with open(file_version_rpi,"r") as f:
value = float(f.readline())
self.__version__ = value
else:
self.__version__ = 0
try:
if os.path.exists(file_bytes_internet):
'''
Con este archivo podemos realizar el control de consumo de internet.
El archivo existe, entonces leemos los valores
Estructura (2 lineas):
----------
- fecha de creacion
- {bytes_send}#{bytes_recv}
'''
with open(file_bytes_internet,"r") as f:
lines = numpy.array(f.read().splitlines(),dtype=object)
if lines.shape[0] == 0:
self.__create_file_bytes()
if lines.shape[0]>1:
date = lines[0]
#------------- Realizamos control de fecha ------------------#
date = datetime.strptime(date, "%d/%m/%Y")
now = datetime.now()
if (datetime(date.year,date.month,1) == datetime(now.year,now.month,1)):
#Estamos en el mismo mes, cargamos los datos
tmp = lines[1].split("#")
self.bytes_send_total = float(tmp[0])
self.bytes_recv_total = float(tmp[1])
self.bytes_recv = psutil.net_io_counters().bytes_recv
self.bytes_send = psutil.net_io_counters().bytes_sent
else:
#Creamos nuevo archivo con datos
formatted_date_time = now.strftime("%d/%m/%Y")
with open(file_bytes_internet,"w") as f:
f.write(formatted_date_time)
self.bytes_recv = psutil.net_io_counters().bytes_recv
self.bytes_send = psutil.net_io_counters().bytes_sent
self.bytes_send_total = 0
self.bytes_recv_total = 0
else:
self.__create_file_bytes()
except:
self.write_status(f"[ERROR] Error: {traceback.format_exc()}")
def __create_file_bytes(self,):
file_bytes_internet = "/others/b4g.txt"
#preparamos los datos
now = datetime.now()
formatted_date_time = now.strftime("%d/%m/%Y")
self.bytes_recv = psutil.net_io_counters().bytes_recv
self.bytes_send = psutil.net_io_counters().bytes_sent
with open(file_bytes_internet,"w") as f:
formatted_date_time = now.strftime("%d/%m/%Y")
f.write(formatted_date_time)
self.bytes_send_total = 0
self.bytes_recv_total = 0
def __handle_size(self, path):
try:
#en mb
limite = 2048
limite *=1024
size = os.path.getsize(path)
if (size>0.95*limite):
#---------------------- Guardamos la mitad final del archivo ---------------------------------------#
get_position = size // 2
lectura = None
with open(path,'rb') as file:
file.seek(get_position)
lectura = file.read()
with open(path,'wb') as file:
file.write(lectura)
except:
if self.debug:
print("Ocurrió un error al analizar el tamaño del archivo {}".format(os.path.basename(path)))
error = traceback.format_exc()
self.write_status("Ocurrió un error al analizar el tamaño del archivo {} Copia del error: {}.".format(os.path.basename(path),error))
def write_status(self,chain):
now = datetime.now()
formatted_date_time = now.strftime("%d/%m/%Y %H:%M:%S")
filename = '/logs/log.txt'
chain = formatted_date_time + " |" + chain
if not os.path.isdir(os.path.dirname(filename)):
os.makedirs(os.path.dirname(filename))
try:
with open(filename,'a') as file:
file.write(chain + '\n')
except:
if self.debug:
print("Ocurrió un error al guardar datos logs.")
else:
if(self.count_status == 150):
self.count_status = 0
#--------------------- Revisamos el tamaño de los archivos-------------------#
# Debe de ser menor a 1 GB.
self.__handle_size(filename)
else:
self.count_status +=1
return
def check_internet(self,verbose=True):
count = 0
while 1:
try:
urllib.request.urlopen('http://www.google.com', timeout=1)
except:
count +=1
if (count ==3):
self.flag_internet = False
break
sleep(0.5)
else:
if verbose:
self.write_status("Se cuenta con conexión a internet.")
if self.debug:
print("Se cuenta con conexión a internet")
self.flag_internet = True
return
return
def update_time(self,):
ntp_server = 'pool.ntp.org'
count = 0
while 1:
try:
client = ntplib.NTPClient()
response = client.request(ntp_server)
ntp_time = datetime.utcfromtimestamp(response.tx_time) - timedelta(hours=+5)
os.system('sudo date {} --utc'.format(ntp_time.strftime('%Y%m%d%H%M.%S')))
if self.debug:
print(f'Hora actualizada: {ntp_time} UTC')
self.write_status(f"La hora ha sido actualizado a {ntp_time}")
except Exception as e:
if self.debug:
print(f'Error al actualizar la hora: {e}')
print(f"Volviendo a intentar en unos instantes...")
count +=1
if(count == 3):
if self.debug:
print("Se alcanzó el maximo de intentos. Se actualizará después")
self.write_status("No se pudo actualizar la hora.")
break
sleep(3)
else:
self.flag_date_update = True
break
def make_setup(self,):
#----------------------- extract lists --------------------------#
try:
list_HFS = self.vars_gpio.get("gpio_HFS",None)
list_HB = self.vars_gpio.get("gpio_HB100",None)
list_RCWL = self.vars_gpio.get("gpio_RCWL",None)
list_ina = self.vars_gpio.get("address_ina",None)
if list_HFS is not None:
for n,key in enumerate(list_HFS.keys()):
if(n>=self.vars.get("MAX_NUMBER_SENSORS")):
if self.debug:
print("Se ha proporcionado una cantidad mayor de sensores para el tipo HFS. Máximo definido son 4 sensores.")
self.write_status("[WARNING] Se asignaron más de 4 sensores para el tipo HFS.")
else:
name = 'sensor_HFS{:02d}'.format(n+1)
pin = int(list_HFS[key])
try:
self.dataOut[name] = sensor_HFS(name,key,pin)
self.dataOut[name].THRESHOLD_BETWEEN_ON_SENSOR = self.vars.get("MIN_ON_SENSOR",30)
self.dataOut[name].THRESHOLD_BETWEEN_OFF_SENSOR = self.vars.get("MIN_OFF_SENSOR",5)
except:
if self.debug:
print(f"[ERROR] Al declarando sensor HFS. Error: {traceback.format_exc()}")
self.write_status(f"[ERROR] Al declarando sensor HFS. Error: {traceback.format_exc()}")
if list_HB is not None:
for n,key in enumerate(list_HB.keys()):
if(n>=self.vars.get("MAX_NUMBER_SENSORS")):
if self.debug:
print("Se ha proporcionado una cantidad mayor de sensores para el tipo HB100. Máximo definido son 4 sensores.")
self.write_status("[WARNING] Se asignaron más de 4 sensores para el tipo HB100.")
else:
name = 'sensor_HB{:02d}'.format(n+1)
self.dataOut[name] = sensor(name,key)
if list_RCWL is not None:
for n,key in enumerate(list_RCWL.keys()):
if(n>=self.vars.get("MAX_NUMBER_SENSORS")):
if self.debug:
print("Se ha proporcionado una cantidad mayor de sensores para el tipo RCWL. Máximo definido son 4 sensores.")
self.write_status("[WARNING] Se asignaron más de 4 sensores para el tipo RCWL.")
else:
name = 'sensor_RCWL{:02d}'.format(n+1)
self.dataOut[name] = sensor(name,key)
if list_ina is not None and self.flag_ina:
for n, key in enumerate(list_ina.keys()):
name = 'ina_{:02d}'.format(n+1)
address = int(list_ina[key])
try:
self.dataOut[name] = ina(name,key,address)
except:
if self.debug:
print(f"[ERROR] Error iniciando sensor ina: {traceback.format_exc()}")
self.write_status(f"[ERROR] Error iniciando sensor ina: {traceback.format_exc()}")
#--------------------- others sensors and modules -----------------#
if self.flag_lidar:
try:
self.dataOut['lidar'] = lidar(name='lidar',key='lidar')
self.dataOut['lidar'].TIME_LIDAR_ON = self.vars.get("TIME_LIDAR_ON",30)
self.dataOut['lidar'].TIME_LIDAR_OFF = self.vars.get("TIME_LIDAR_OFF",60)
self.dataOut['lidar'].min_H = self.vars.get("MIN_HEIGHT_WATER_FOR_LIDAR",5)
self.dataOut['lidar'].minus_H = self.vars.get("MIN_MINUS_HEIGHT_FOR_LIDAR",4)*-1
self.dataOut['lidar'].rare_height = self.vars.get("RARE_HEIGHT",70)
except:
if self.debug:
print(f"[ERROR] Error iniciando sensor: {traceback.format_exc()}")
self.write_status(f"[ERROR] Error iniciando sensor: {traceback.format_exc()}")
except:
if self.debug:
print("Ocurrió un error en la preparación de la trama de configuración.")
error = traceback.format_exc()
self.write_status(f"Ocurrió un error en la preparación de la trama de configuración. Copia del error: {error}")
else:
return
def __config(self,):
#------------------------ revisamos algunas configuraciones ----------------#
if self.flag_camera:
pin = self.camera_keys.get("pin_camera",None)
if pin == None:
self.write_status("[ERROR] Pin del control de camara no ha sido definido. Se controlará el tomado de fotografías por Software.")
else:
# Configuramos el pin como salida
GPIO.setup(pin,GPIO.OUT)
self.camera = camera(flag=self.flag_camera,pin=pin,obj=self.obj_vars)
else:
self.camera = camera(flag=False,pin=None,obj=self.obj_vars)
#-------------------------- Cargamos el modo ---------------------------------#
self.string_model = load_version()
if (self.inference_mode == 'video' and 'Raspberry Pi Zero' in model) or self.inference_mode == 'photo':
if self.debug:
print("Se ha cambiado el modo de inferencia automaticamente. Debido a que los requisitos del sistema no son soportados. Actual inferencia de ML a Server.")
self.write_status("Se ha cambiado el modo de inferencia automaticamente. Debido a que los requisitos del sistema no son soportados. Actual inferencia de ML a Server.")
self.inference_mode = 'server'
def __load_vars(self):
self.path_save = "/data"
self.path_save_img = os.path.join(self.path_save,'img')
self.path_save_json = os.path.join(self.path_save,'json')
self.semaphore = threading.Semaphore(1)
self.semaphore_estimator = threading.Semaphore(1)
self.semaphore_get_video = threading.Semaphore(1)
self.manager = Manager()
self.queue = self.manager.Queue()
self.queue_estimator = self.manager.Queue()
self.obj_vars = VarsJons()
self.estimator = estimator(self.obj_vars,)
self.debug = self.obj_vars.debug
self.vars = self.obj_vars.vars
self.vars_mqtt = self.obj_vars.vars_mqtt
self.store_data = self.obj_vars.store_data
self.vars_gpio = self.obj_vars.vars_gpio
self.inference_mode = self.obj_vars.inference_mode
self.camera_keys = self.obj_vars.camera
self.flag_camera = self.vars_gpio.get("camera", True)
self.flag_lidar = self.vars_gpio.get("lidar",True)
self.flag_ina = self.vars_gpio.get("ina",True)
self.id = str(self.obj_vars.id)
#----------------- share variables --------------------#
self.share_estimator = multiprocessing.Value('d', 0)
def __init__(self):
#---------- Esperamos el inicio del sistema ---------#
sleep(4)
#Guardamos el timestamp de inicio del sistema
self.timestamp_init_system = datetime.now().timestamp()
#------------------ get parameters ------------------#
self.__load_vars()
#------------------- Prints -------------------------#
if self.debug:
print(10*"-")
print("Iniciando sistema")
print(r"Ruta de guardado {}".format(self.store_data))
print(r"Ruta de guardado de imagenes {}".format(self.path_save_img))
print(r"Ruta de guardado de archivos .json: {}".format(self.path_save_json))
print(10*"-")
#----------------- Creamos paths ----------------------#
self.create_path(self.path_save_img)
self.create_path(self.path_save_json)
self.create_path(self.path_save)
#------------------ Read metadatos --------------------#
self.read_metadata()
#-------------------- Escribiendo logs -----------------#
self.write_status(30*"-")
self.write_status("Sistema iniciado")
self.write_status(r"Ruta de guardado {}".format(self.store_data))
self.write_status(r"Ruta de guardado de imagenes {}".format(self.path_save_img))
self.write_status(r"Ruta de guardado de archivos .json: {}".format(self.path_save_json))
self.write_status(30*"-")
#-------------------- Leyendo setup --------------------#
self.make_setup()
#--------------------- Others config -------------------#
self.__config()
#------------------- Run process -----------------------#
self.run_process()
#------------------ Fin script -------------------------#
def run_process(self,):
if self.debug:
print("Realizando la derivación de procesos")
self.write_status("Realizando la derivación de procesos.")
#--------------------- Creamos la variable compartida ----------------------#
#Esta variable compartida nos va a permitir compartir datos para mostrarlo en Flask
json_dict = self.manager.dict()
lock = Lock()
#-------------------------- init process --------------------#
try:
self.process_sensores = multiprocessing.Process(target = self.method_sensores, args=(self.share_estimator,json_dict,lock,))
self.process_mqtt = multiprocessing.Process(target = self.method_mqtt,args=(json_dict,lock,))
self.process_estimator= multiprocessing.Process(target= self.method_estimator,args=(self.share_estimator,lock,))
self.process_flask = multiprocessing.Process(target=self.method_flask,args=(json_dict,lock,))
self.process_sensores.start()
self.process_mqtt.start()
self.process_estimator.start()
self.process_flask.start()
if self.inference_mode == 'video' or self.inference_mode == 'server':
self.process_video = multiprocessing.Process(target=self.method_video,args=())
self.process_video.start()
self.process_video.join()
self.process_mqtt.join()
self.process_sensores.join()
self.process_estimator.join()
self.process_flask.join()
except:
error = traceback.format_exc()
if self.debug:
print("Error en la inialización de procesos.")
error = traceback.format_exc()
self.write_status(f"Error en la inialización de procesos. Copia del error {error}")
def method_flask(self,json_dict,lock):
sleep(2)
if self.debug:
print("Metodo Flask Lanzado")
app = Flask(__name__)
self.write_status(f"[Flask] Metodo lanzado")
@app.route('/')
def home():
return render_template("main.html")
def get_image():
latest_name = sorted(os.listdir("/data/img"))[-1]
return os.path.join("data/img",latest_name)
@app.get("/update")
def update():
try:
data = dict()
with locked(lock):
data = dict(json_dict)
x = jsonify(data)
except:
if self.debug:
print(f"[FlaskUPDTErr] Error: {traceback.format_exc()}")
self.write_status(f"[FlaskUPDTErr] Error: {traceback.format_exc()}")
else:
return x
try:
app.run(debug=False,port=5000,host='0.0.0.0')
except:
if self.debug:
print(f"[ErrFlask] Error en flask \n{traceback.format_exc()}")
self.write_status(f"[ErrFlask] Error en flask \n{traceback.format_exc()}")
else:
if self.debug:
print(f"[ErrFlask] Error en flask \n{traceback.format_exc()}")
self.write_status(f"[Flask] Servidor lanzado")
def method_estimator(self, share,lock):
#----------------------- Carga del modelo IA -------------------------------------#
timestamp_latest = datetime.now().timestamp()
self.estimator.load_weights() #Cargamos los pesos
if self.debug:
print("Proceso estimator ha sido lanzado.")
self.write_status("[Estimator] Proceso estimator ha sido lanzado.")
sleep(5)
while 1:
flag = False
try:
self.semaphore_estimator.acquire()
while not self.queue_estimator.empty():
flag = True #Se realizó la inferencia
timestamp_latest = datetime.now().timestamp()
if self.inference_mode == 'photo':
self.estimator.image = self.queue_estimator.get()
elif self.inference_mode == 'video':
self.estimator.video = self.queue_estimator.get()
elif self.inference_mode == 'server':
self.estimator.video = self.queue_estimator.get()
self.estimator.run()
self.semaphore_estimator.release()
timestamp = datetime.now().timestamp()
if (timestamp - timestamp_latest > 30*60):
'''
Controlamos que la ultima inferencia no se haya realizado hace más de 30 minutos.
Esto es util para prevenir cualquier tipo de incidente, ya sea cámara malograda entre otras formas.
'''
self.estimator.inference_value = 10
except:
exc = traceback.format_exc()
self.write_status(f"[ERROR] Ocurrió un error al realizar inferencia. Copia del error: {exc}")
sleep(10)
else:
'''
Solo compartimos el valor de la inferencia al otro estimator. Debido a que cada multiproceso
maneja una copia diferente del objeto Estimator. Por más que realicemos un cálculo, no se verá
reflejado en el otro objeto.
'''
if flag:
self.write_status("[Estimator] Se realizó la inferencia en el Queue Estimator. Variable ha sido compartida entre multiprocesos.")
with locked(lock):
if self.estimator.inference_value == None:
inference = 10
else:
inference = self.estimator.inference_value
share.value = inference
sleep(1)
gc.collect()
sleep(3)
def send_uart(self,uart,string):
try:
uart.write(string.encode('utf-8'))
except:
if self.debug:
print("Ocurrió un error al enviar datos por puerto UART.")
self.write_status("[ERROR] Ocurrió un error al enviar datos por el puerto UART.")
def __get_internet_bytes(self,):
a, b = psutil.net_io_counters().bytes_recv,psutil.net_io_counters().bytes_sent
recv = a - self.bytes_recv
send = b - self.bytes_send
self.bytes_recv_total += recv
self.bytes_send_total += send
self.bytes_recv = a
self.bytes_send = b
file_bytes_internet = "/others/b4g.txt"
#--------------------- control -------------------------------#
tmp_write = list()
try:
if os.path.exists(file_bytes_internet):
with open(file_bytes_internet,"r") as file:
line = numpy.array(file.read().splitlines(),dtype=object)
if line.shape[0]>1:
tmp_write.append(line[0])
tmp_write.append(f"{self.bytes_send_total}#{self.bytes_recv_total}")
with open(file_bytes_internet,"w") as file:
file.write(l+'\n' for l in tmp_write)
else:
self.__create_file_bytes()
gc.collect()
except:
self.write_status(f"[ERROR] Error: {traceback.format_exc()}")
def __process_values(self,share,flask_,lock):
try:
tmp_data = dict()
#------------------------ Calculamos el uso de internet --------------#
total_internet = round((self.bytes_recv_total + self.bytes_send_total)/(1024*1024),3)
#---------------------------------------------------------------------#
tmp_data["current_timestamp"] = datetime.now().timestamp()
tmp_data['init_timestamp'] = self.timestamp_init_system
tmp_data['version_rpi'] = self.__version__
tmp_data['location'] = self.obj_vars.location
tmp_data['latitude'] = self.obj_vars.latitude
tmp_data['longitude'] = self.obj_vars.longitude
tmp_data["id"] = self.id
tmp_data['camera_status'] = self.camera.status
tmp_data["disk_percent_use"] = psutil.disk_usage('.').percent
tmp_data['DEVICE_INFO'] = self.string_model
tmp_data['CPU_usage'] = psutil.cpu_percent(interval=1)
tmp_data['temperature'] = self.__get_temperature()
tmp_data["ALL_USE_E_MB"] = round(total_internet,4)
list_keys = self.dataOut.keys()
for key in list_keys:
if key == 'lidar':
tmp_data['lidar_status'] = self.dataOut[key].activate
tmp_data['lidar_dH'] = self.dataOut[key].dH_
tmp_data['h0'] = self.dataOut[key].H0
if self.dataOut[key].ERROR_WIRE == True:
tmp_data[key] = "WIRE_ERROR"
else:
tmp_data[key] = "OK"
elif 'ina' in key:
key_ = self.dataOut[key].key
tmp_data[f'bus_voltage_{key_}'] = round(self.dataOut[key].bus_voltage,4)
tmp_data[f'shunt_voltage_{key_}']= round(self.dataOut[key].shunt_voltage,4)
tmp_data[f'current_{key_}'] = round(self.dataOut[key].current,4)
else:
tmp_data[key] = self.dataOut[key].activate
tmp_data[f'it_{key}'] = self.dataOut[key].current_sensor()
#------------------------------- Inference method ------------------------------#
self.estimator.dataOut = self.dataOut
with locked(lock):
self.estimator.share = float(share.value)
tmp_data['camera_inference'] = self.estimator.share
tmp_data['string_inference'] = self.estimator.string_status
tmp_data['status'] = self.estimator.activate
tmp_data['count_status'] = self.estimator.activate_count
#-----------------------------------------------------#
#--------------- Compartiendo data a Flask -----------#
#-----------------------------------------------------#
try:
with locked(lock):
flask_.clear() #Limpiamos todos los datos compartidos
for key in tmp_data.keys():
flask_[key] = tmp_data[key]
except:
if self.debug:
print(f"[ERROR] Error al escribir datos en la variable compartida: {traceback.format_exc()}")
self.write_status(f"[ERROR] Error al escribir datos en la variable compartida: {traceback.format_exc()}")
payload = {'type':'json',
'content':tmp_data}
if self.store_data:
self.write_data(payload)
return tmp_data
except:
error = (traceback.format_exc())
if self.debug:
print(f"Ocurrió un error al leer los datos.{error}")
self.write_status(f"Ocurrió un error al leer datos. Copia del error {error}.")
def write_data(self,data):
now = datetime.now()
formatted_date_time = now.strftime("%d/%m/%Y %H:%M:%S") + " |"
try:
name = 'data.txt'
filename = os.path.join(self.path_save_json,name)
with open(filename,'a') as file:
file.write(formatted_date_time + str(json.dumps(data)) + '\n')
except:
if self.debug:
print(f"Ocurrió un error al guardar los datos en el archivo {name}")
self.write_status(f"[ERROR] Ocurrió un error al guardar los datos en el archivo {name}.")
else:
self.count_write_data += 1
try:
if(self.count_write_data == 100):
self.count_write_data = 0
self.__handle_size(filename)
except:
exc = traceback.format_exc()
self.write_status(f"[Error] Error desconocido en {exc}")
def method_mqtt(self,payload_json,lock):
#----------------------------- Control de internet --------------------------#
# Revisamos el internet cada 30 minutos
timestamp_internet = datetime.now().timestamp()
timestamp_mqtt = datetime.now().timestamp()
values = list()
self.check_internet()
while 1:
try:
while 1:
if self.flag_internet:
CLIENT_MQTT = str(self.id +"_pub")
mqtt_user = self.vars_mqtt.get("mqtt_user")
mqtt_pass = self.vars_mqtt.get("mqtt_pass")
mqtt_broker = self.vars_mqtt.get("mqtt_broker")
mqtt_port = self.vars_mqtt.get("mqtt_port")
self.client = mqtt.Client(CLIENT_MQTT)
self.client_image = mqtt.Client(CLIENT_MQTT+'image')
self.client.on_connect = on_connect
self.client_image.on_connect = on_connect
self.client.on_disconnect = on_disconnect
self.client_image.on_disconnect = on_disconnect
self.client.username_pw_set(mqtt_user, mqtt_pass)
self.client_image.username_pw_set(mqtt_user,mqtt_pass)
self.client.connect(mqtt_broker, mqtt_port,keepalive=300)
self.client_image.connect(mqtt_broker,mqtt_port,keepalive=300)
break
else:
#Esperamos un minuto
if self.debug:
print("No se cuenta con internet. Esperamos un minuto para lanzar el proceso MQTT.")
self.write_status("[WARNING] No se cuenta con internet. Esperando un minuto para lanzar el proceso MQTT.")
sleep(30)
self.check_internet()
if self.debug:
print("El proceso MQTT ha sido lanzado")
self.write_status("El proceso MQTT ha sido lanzado.")
photo_topic = os.path.join(self.vars_mqtt.get("photo_topic"),self.id)
data_topic = os.path.join(self.vars_mqtt.get("data_topic"),self.id)
while 1:
try:
while not self.queue.empty():
self.semaphore.acquire()
mensaje = self.queue.get()
self.semaphore.release()
if mensaje["type"] == 'image':
content = mensaje['content']
try:
if self.debug:
print("Enviando imagen por MQTT ...")
self.write_status("Enviando imagen por MQTT.")
self.client_image.publish(photo_topic,content['images'],qos=1)
if self.debug:
print("Imagen ha sido enviada por MQTT")
self.write_status("Imagen ha sido enviado por MQTT.")
except:
if self.debug:
print("Hubo un error al enviar la imagen. Se devuelve a la cola de envío.")
self.write_status("[ERROR] Error por envío de imagen MQTT.")
gc.collect()
if (datetime.now().timestamp() - timestamp_mqtt > self.vars.get("TIME_SEND_MQTT",3)):
try:
with locked(lock):
data = dict(payload_json)
tmp = dict()
tmp['values'] = data
payload = json.dumps(tmp)
try:
self.client.publish(data_topic,payload,qos=1)
except:
self.write_status(f"[ERROR] Error al enviar datos por MQTT: {traceback.format_exc()}")
except:
self.write_status(f"[ERROR] Error al leer datos del buffer compartido: {traceback.format_exc()}")
finally:
timestamp_mqtt = datetime.now().timestamp()
self.client_image.loop()
self.client.loop()
except:
self.write_status("[ERROR] Existe un problema al procesar el envío MQTT.")
sleep(1)
finally:
if(datetime.now().timestamp() - timestamp_internet > self.vars.get("SAMPLING_INTERNET",120)):
self.check_internet()
timestamp_internet = datetime.now().timestamp()
if not self.flag_internet:
if self.debug:
print("Se perdió la conexión a Internet.")
self.write_status("[ERROR] Se perdió la conexión a Internet.")
break
except:
if self.debug:
print(f"Ocurrió un error no manejado en la función principal de lahares. Copia del error: {error}")
error = traceback.format_exc()
self.write_status(f"Ocurrió un error inesperado. Copia del error: {error}")
sleep(5)
def method_video(self,):
'''
En este metodo extraemos 5 segundos de video o 135 frames para el entrenamiento del modelo.
UPDATE:
- Por temas de memoria, solo se trabajará con 10 frames. El formato estará comprimido en gzip
para poder obtener un menor tamaño en memoria mientras se comprime los archivos.
-
'''
self.check_internet(False)#Verbose False
if self.debug:
print("Lanzado el metodo para la adquisición de video.")
self.write_status("[METHOD] Lanzado el metodo para la adquisición de video")
timestamp_sampling = datetime.now().timestamp()
duration_frames = 10
sleep(4)
while 1:
current_time = datetime.now().timestamp()
try:
if (( timestamp_sampling - current_time )< 0 and self.camera.status and self.semaphore_get_video.acquire(False)):
self.write_status("Obteniendo video frames .....")
try:
count_frames = 0
file_video = None
file_video = io.BytesIO()
self.camera.control_brightness(brightness=100)#Realiza la validación de si es necesario elevar el brillo o no.
vid = cv2.VideoCapture(self.camera.url_rstp)
time_sleep = 0.35 if duration_frames<=10 else 0.2
now = datetime.now().timestamp()
with gzip.GzipFile(fileobj=file_video,mode='wb',compresslevel=9) as gz:
while 1:
now2 = datetime.now().timestamp()
try:
_,_ = vid.read()
except:
self.write_status("Error al capturar el frame")
else:
try:
if (now2 - now) > time_sleep:
while 1:
try:
_,frame = vid.read()
if frame is not None:
frame = cv2.resize(frame,(640,360))
frame = cv2.cvtColor(frame,cv2.COLOR_BGR2RGB)
frame = (frame.astype(numpy.uint8)).tobytes()
gz.write(len(frame).to_bytes(4,'big'))
gz.write(deepcopy(frame))
frame = None
gc.collect()
count_frames += 1
now = datetime.now().timestamp()
break
except:
self.write_status(f"Ocurrió un error en video: {traceback.format_exc()}")
except:
self.write_status(f"Error al comprimir u obtener frames: {traceback.format_exc()}")
if count_frames == 10:
self.write_status("Se obtuvo 10 frames.")
break
try:
vid.release()
except:
pass
self.camera.control_brightness(brightness=0)
self.semaphore_get_video.release()
#-------------------------------------------------------------------#
# Por temas de latencia en tiempo, se realizará la inferencia en este metodo
#-------------------------------------------------------------------#
# if self.inference_mode == 'server':
# url = "http://38.10.105.243:7777/predict"
# input_data = {'instances':base64.b64encode(file_video.getvalue()).decode('utf-8'),
# 'id_user':"test-jicamarca",
# 'request_format':True,
# 'shape':(360,640)}
# headers = {
# 'Content-Type': 'application/json',
# 'Content-Encoding': 'gzip-B64',
# }
# input_data = json.dumps(input_data)
# compress = io.BytesIO()
# with gzip.GzipFile(fileobj=compress, mode='wb', compresslevel=9) as gz2:
# gz2.write(input_data.encode('utf-8'))
# if self.flag_internet:
# #Se cuenta con internet para realizar la inferencia al servidor.
# try:
# resp = requests.post(url,data=compress.getvalue(),headers=headers)
# except:
# compress = None
# file_video = None
# gc.collect()
# inference = None
# else:
# compress = None
# file_video = None
# gc.collect()
# if resp.status_code == 200:
# #El envio fue un exito
# inference = 1 - resp.json()['predictions'][0][0]
# self.write_status(f"Inferencia al servidor realizado con exito. Valor de inferencia: {inference}.")
# else:
# inference = None
# self.write_status(f"Se obtuvo otro codigo de respuesta al realizar inferencia al servidor. {resp.status_code}")
#-------------------------------------------------------------------#
#-------------------------------------------------------------------#
#-------------------------------------------------------------------#
self.semaphore_estimator.acquire()
if self.queue_estimator.qsize()>=2:
self.queue_estimator.get()
gc.collect()
self.queue_estimator.put({
'video': deepcopy(file_video),
'timestamp': datetime.now().timestamp()
})
file_video = None
gc.collect()
self.semaphore_estimator.release()
except:
try:
vid.release()
except:
pass
gc.collect()
self.write_status(f"[ERROR] Ocurrió un error en el proceso de obtener video. Se volverá a intentar en 15 segundos. Copia del error: {traceback.format_exc()}")
timestamp_sampling = datetime.now().timestamp() + 15
try:
self.semaphore_get_video.release()
except:
self.write_status(f"[ERROR] Code 100 Method")
else:
if count_frames == 10:
timestamp_sampling = datetime.now().timestamp() + self.vars.get("SAMPLING_TIME_VIDEO")
self.write_status(f"Se agregó 10 frames en bytes al queue estimator.")
else:
timestamp_sampling = datetime.now().timestamp() + 15
self.write_status(f"Frames insuficientes, se volverá a intentar en 15 segundos.")
elif (( current_time - timestamp_sampling )> 90):
self.write_status(f"[VIDEO] No se puede tomar el semaforo por más de 90 segundos. ")
timestamp_sampling = current_time
sleep(1)
except:
self.write_status(f"[ERROR VIDEO] {traceback.format_exc()}")
else:
#Borrar
with open("/tools/live_video.txt",'w') as f:
date = datetime.now()
chain = f'{date.day} {date.hour}:{date.minute}:{date.second}'
f.write(chain)
sleep(3)
def method_sensores(self,share,flask_,lock):
timestamp_camera = datetime.now().timestamp()
timestamp_sampling = datetime.now().timestamp()
self.timestamp_internet_ = datetime.now().timestamp()
sleep(1)
self.check_internet(False)
while 1:
#------------------- inicio de proceso ----------------------------#
try:
current_time = datetime.now().timestamp()
self.__get_internet_bytes()
if(timestamp_sampling - current_time < 0 ):
'''
Aqui se ejecutan los estados de los sensores.
'''
try:
key_sensor = self.dataOut.keys()
for key in key_sensor:
try:
self.dataOut[key].run()
except:
if self.debug:
print(f"[ERROR] Error actualizando los datos: {traceback.format_exc()}")
self.write_status(f"[ERROR] Error actualizando los datos: {traceback.format_exc()}")
self.__process_values(share,flask_,lock)
except:
if self.debug:
print(f"[ERROR] Error decodificando los datos: {traceback.format_exc()}")
self.write_status(f"[ERROR] Error decodificando los datos: {traceback.format_exc()}")
finally:
timestamp_sampling = current_time + self.vars.get("SAMPLING_TIME_SENSOR")
if(timestamp_camera - current_time < 0 and self.camera.status and self.semaphore_get_video.acquire(False)):
'''
Se adquiere fotos para enviarse por MQTT.
'''
frame = None
try:
self.camera.control_brightness(brightness=100)
vid = cv2.VideoCapture(self.camera.url_rstp)
while 1:
_, frame = vid.read()
if frame is not None:
break
except:
try:
vid.release()
self.camera.control_brightness(brightness=0)
except:
pass
self.semaphore_get_video.release()
if self.debug:
print(f"Error: {str(traceback.format_exc())}")
self.write_status(f"[ERROR] Error generado en la adquisición de fotografía {str(traceback.format_exc())}")
timestamp_camera = datetime.now().timestamp() + 5
else:
try:
vid.release()
except:
pass
self.camera.control_brightness(brightness=0)
self.semaphore_get_video.release()
#---------------------- imagen obtenida ---------------------#
timestamp_camera = datetime.now().timestamp() + self.vars.get("SAMPLING_TIME_CAMERA")
buffer = cv2.imencode('.jpg',frame)[1]
data = {
'images' : buffer.tobytes(),
'timestamp': datetime.now().timestamp()
}
payload = {
'type' : 'image',
'content': data
}
#-----------------------------------------------------------
#Definimos un tamaño de 3 imagenes, para no llenar el buffer
if self.inference_mode == 'photo':
self.semaphore_estimator.acquire()
payload_estimator = {
'image':numpy.array(frame),
'timestamp':datetime.now().timestamp()
}
if self.queue_estimator.qsize()>=3:
self.queue_estimator.get()
self.queue_estimator.put(payload_estimator)
self.semaphore_estimator.release()
#---------------- Guardamos en la cola las imagenes a ser enviadas -----------------#
self.semaphore.acquire()
if self.flag_internet:
self.queue.put(payload)
if self.queue.qsize()>=10:
self.queue.get()
gc.collect()
self.semaphore.release()
gc.collect()
####################################
## Guardando en la ruta de Flask ###
####################################
filename_flask = os.path.join('/tools/static',"latest.jpg")
with open(filename_flask,'wb') as filew:
filew.write(buffer.tobytes())
####################################
### Guardado de datos ###
####################################
if self.store_data:
name = '{}.jpg'.format(current_time)
filename = os.path.join(self.path_save_img,name)
try:
with open(filename,'wb') as file:
file.write(buffer.tobytes())
except:
if self.debug:
print("Ocurrió un error al guardar la imagen {}".format(name))
self.write_status("[ERROR] Ocurrió un error al guardar la imagen.")
else:
if self.debug:
print(f"Se guardó la imagen {name}")
self.write_status(f"Se guardó la imagen {name}.")
gc.collect()
except:
if self.debug:
print("Ocurrió un error en la recolección de datos.")
self.write_status(f"Ocurrió un error no identificado en el proceso de recolección de data sensores. Copia del error: {traceback.format_exc()}")
sleep(5)