|
|
import os
|
|
|
import io
|
|
|
import gc
|
|
|
import cv2
|
|
|
import json
|
|
|
import time
|
|
|
import gzip
|
|
|
import numpy
|
|
|
import busio
|
|
|
import board
|
|
|
import ntplib
|
|
|
import psutil
|
|
|
import base64
|
|
|
import requests
|
|
|
import datetime
|
|
|
import traceback
|
|
|
import threading
|
|
|
import urllib.request
|
|
|
import multiprocessing
|
|
|
import RPi.GPIO as GPIO
|
|
|
import adafruit_lidarlite
|
|
|
import paho.mqtt.client as mqtt
|
|
|
import paho.mqtt.publish as publish
|
|
|
|
|
|
from others import *
|
|
|
from copy import deepcopy
|
|
|
from multiprocessing import Manager, Pipe,Queue,Lock
|
|
|
from flask import Flask, render_template, jsonify
|
|
|
from datetime import datetime, timedelta
|
|
|
from time import sleep
|
|
|
|
|
|
|
|
|
class manage_lahares(object):
|
|
|
|
|
|
timestamp_init_system = None
|
|
|
max_retries = 5
|
|
|
capture_count = 0
|
|
|
|
|
|
flag_date_update = False
|
|
|
flag_internet = False
|
|
|
|
|
|
bytes_send = 0
|
|
|
bytes_recv = 0
|
|
|
|
|
|
bytes_recv_total = 0
|
|
|
bytes_send_total = 0
|
|
|
|
|
|
count_status = 0
|
|
|
count_write_data = 0
|
|
|
|
|
|
__version__ = 0
|
|
|
|
|
|
|
|
|
dataOut = dict()
|
|
|
|
|
|
def __get_temperature(self,):
|
|
|
|
|
|
'''
|
|
|
Metodo para leer la temperatura del sistema.
|
|
|
'''
|
|
|
|
|
|
try:
|
|
|
with open("/sys/class/thermal/thermal_zone0/temp", "r") as archivo:
|
|
|
temperature = int(archivo.read())
|
|
|
temperature = temperature / 1000.0 # Convertir de miligrados a grados Celsius
|
|
|
|
|
|
return temperature
|
|
|
|
|
|
except FileNotFoundError:
|
|
|
return -1
|
|
|
|
|
|
|
|
|
def create_path(self,path):
|
|
|
if path is None:
|
|
|
return
|
|
|
|
|
|
if not os.path.isdir(path):
|
|
|
os.makedirs(path)
|
|
|
|
|
|
return
|
|
|
|
|
|
def read_metadata(self):
|
|
|
|
|
|
#Cargamos la versión de los dispositivos
|
|
|
|
|
|
file_version_rpi = "/others/version_rpi.txt"
|
|
|
|
|
|
file_bytes_internet = "/others/b4g.txt"
|
|
|
|
|
|
|
|
|
if os.path.exists(file_version_rpi):
|
|
|
|
|
|
with open(file_version_rpi,"r") as f:
|
|
|
|
|
|
value = float(f.readline())
|
|
|
|
|
|
self.__version__ = value
|
|
|
|
|
|
else:
|
|
|
|
|
|
self.__version__ = 0
|
|
|
|
|
|
try:
|
|
|
|
|
|
if os.path.exists(file_bytes_internet):
|
|
|
'''
|
|
|
Con este archivo podemos realizar el control de consumo de internet.
|
|
|
El archivo existe, entonces leemos los valores
|
|
|
Estructura (2 lineas):
|
|
|
----------
|
|
|
- fecha de creacion
|
|
|
- {bytes_send}#{bytes_recv}
|
|
|
|
|
|
'''
|
|
|
|
|
|
with open(file_bytes_internet,"r") as f:
|
|
|
|
|
|
lines = numpy.array(f.read().splitlines(),dtype=object)
|
|
|
|
|
|
if lines.shape[0] == 0:
|
|
|
self.__create_file_bytes()
|
|
|
|
|
|
if lines.shape[0]>1:
|
|
|
date = lines[0]
|
|
|
#------------- Realizamos control de fecha ------------------#
|
|
|
|
|
|
date = datetime.strptime(date, "%d/%m/%Y")
|
|
|
now = datetime.now()
|
|
|
|
|
|
if (datetime(date.year,date.month,1) == datetime(now.year,now.month,1)):
|
|
|
#Estamos en el mismo mes, cargamos los datos
|
|
|
|
|
|
tmp = lines[1].split("#")
|
|
|
|
|
|
self.bytes_send_total = float(tmp[0])
|
|
|
self.bytes_recv_total = float(tmp[1])
|
|
|
|
|
|
self.bytes_recv = psutil.net_io_counters().bytes_recv
|
|
|
self.bytes_send = psutil.net_io_counters().bytes_sent
|
|
|
|
|
|
else:
|
|
|
#Creamos nuevo archivo con datos
|
|
|
formatted_date_time = now.strftime("%d/%m/%Y")
|
|
|
|
|
|
with open(file_bytes_internet,"w") as f:
|
|
|
|
|
|
f.write(formatted_date_time)
|
|
|
|
|
|
self.bytes_recv = psutil.net_io_counters().bytes_recv
|
|
|
self.bytes_send = psutil.net_io_counters().bytes_sent
|
|
|
|
|
|
self.bytes_send_total = 0
|
|
|
self.bytes_recv_total = 0
|
|
|
|
|
|
else:
|
|
|
|
|
|
self.__create_file_bytes()
|
|
|
|
|
|
|
|
|
except:
|
|
|
|
|
|
self.write_status(f"[ERROR] Error: {traceback.format_exc()}")
|
|
|
|
|
|
|
|
|
|
|
|
def __create_file_bytes(self,):
|
|
|
|
|
|
file_bytes_internet = "/others/b4g.txt"
|
|
|
#preparamos los datos
|
|
|
now = datetime.now()
|
|
|
|
|
|
formatted_date_time = now.strftime("%d/%m/%Y")
|
|
|
self.bytes_recv = psutil.net_io_counters().bytes_recv
|
|
|
self.bytes_send = psutil.net_io_counters().bytes_sent
|
|
|
|
|
|
with open(file_bytes_internet,"w") as f:
|
|
|
|
|
|
formatted_date_time = now.strftime("%d/%m/%Y")
|
|
|
f.write(formatted_date_time)
|
|
|
|
|
|
self.bytes_send_total = 0
|
|
|
self.bytes_recv_total = 0
|
|
|
|
|
|
def __handle_size(self, path):
|
|
|
|
|
|
try:
|
|
|
#en mb
|
|
|
limite = 2048
|
|
|
|
|
|
limite *=1024
|
|
|
size = os.path.getsize(path)
|
|
|
|
|
|
if (size>0.95*limite):
|
|
|
|
|
|
#---------------------- Guardamos la mitad final del archivo ---------------------------------------#
|
|
|
|
|
|
get_position = size // 2
|
|
|
lectura = None
|
|
|
|
|
|
|
|
|
with open(path,'rb') as file:
|
|
|
|
|
|
file.seek(get_position)
|
|
|
lectura = file.read()
|
|
|
|
|
|
with open(path,'wb') as file:
|
|
|
file.write(lectura)
|
|
|
|
|
|
|
|
|
except:
|
|
|
|
|
|
if self.debug:
|
|
|
print("Ocurrió un error al analizar el tamaño del archivo {}".format(os.path.basename(path)))
|
|
|
|
|
|
error = traceback.format_exc()
|
|
|
self.write_status("Ocurrió un error al analizar el tamaño del archivo {} Copia del error: {}.".format(os.path.basename(path),error))
|
|
|
|
|
|
def write_status(self,chain):
|
|
|
|
|
|
now = datetime.now()
|
|
|
|
|
|
formatted_date_time = now.strftime("%d/%m/%Y %H:%M:%S")
|
|
|
|
|
|
filename = '/logs/log.txt'
|
|
|
|
|
|
chain = formatted_date_time + " |" + chain
|
|
|
|
|
|
if not os.path.isdir(os.path.dirname(filename)):
|
|
|
os.makedirs(os.path.dirname(filename))
|
|
|
|
|
|
try:
|
|
|
with open(filename,'a') as file:
|
|
|
|
|
|
file.write(chain + '\n')
|
|
|
except:
|
|
|
|
|
|
if self.debug:
|
|
|
print("Ocurrió un error al guardar datos logs.")
|
|
|
|
|
|
else:
|
|
|
if(self.count_status == 150):
|
|
|
self.count_status = 0
|
|
|
#--------------------- Revisamos el tamaño de los archivos-------------------#
|
|
|
# Debe de ser menor a 1 GB.
|
|
|
|
|
|
self.__handle_size(filename)
|
|
|
else:
|
|
|
self.count_status +=1
|
|
|
return
|
|
|
|
|
|
def check_internet(self,verbose=True):
|
|
|
|
|
|
count = 0
|
|
|
|
|
|
while 1:
|
|
|
|
|
|
try:
|
|
|
urllib.request.urlopen('http://www.google.com', timeout=1)
|
|
|
|
|
|
except:
|
|
|
count +=1
|
|
|
if (count ==3):
|
|
|
self.flag_internet = False
|
|
|
break
|
|
|
|
|
|
sleep(0.5)
|
|
|
else:
|
|
|
if verbose:
|
|
|
self.write_status("Se cuenta con conexión a internet.")
|
|
|
|
|
|
if self.debug:
|
|
|
print("Se cuenta con conexión a internet")
|
|
|
|
|
|
self.flag_internet = True
|
|
|
|
|
|
return
|
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def update_time(self,):
|
|
|
|
|
|
ntp_server = 'pool.ntp.org'
|
|
|
|
|
|
count = 0
|
|
|
|
|
|
while 1:
|
|
|
|
|
|
try:
|
|
|
|
|
|
client = ntplib.NTPClient()
|
|
|
|
|
|
response = client.request(ntp_server)
|
|
|
|
|
|
ntp_time = datetime.utcfromtimestamp(response.tx_time) - timedelta(hours=+5)
|
|
|
|
|
|
os.system('sudo date {} --utc'.format(ntp_time.strftime('%Y%m%d%H%M.%S')))
|
|
|
|
|
|
if self.debug:
|
|
|
print(f'Hora actualizada: {ntp_time} UTC')
|
|
|
|
|
|
self.write_status(f"La hora ha sido actualizado a {ntp_time}")
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
if self.debug:
|
|
|
print(f'Error al actualizar la hora: {e}')
|
|
|
print(f"Volviendo a intentar en unos instantes...")
|
|
|
|
|
|
count +=1
|
|
|
|
|
|
if(count == 3):
|
|
|
|
|
|
if self.debug:
|
|
|
|
|
|
print("Se alcanzó el maximo de intentos. Se actualizará después")
|
|
|
|
|
|
self.write_status("No se pudo actualizar la hora.")
|
|
|
|
|
|
break
|
|
|
|
|
|
sleep(3)
|
|
|
else:
|
|
|
|
|
|
self.flag_date_update = True
|
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
def make_setup(self,):
|
|
|
|
|
|
#----------------------- extract lists --------------------------#
|
|
|
|
|
|
try:
|
|
|
|
|
|
list_HFS = self.vars_gpio.get("gpio_HFS",None)
|
|
|
list_HB = self.vars_gpio.get("gpio_HB100",None)
|
|
|
list_RCWL = self.vars_gpio.get("gpio_RCWL",None)
|
|
|
list_ina = self.vars_gpio.get("address_ina",None)
|
|
|
|
|
|
if list_HFS is not None:
|
|
|
for n,key in enumerate(list_HFS.keys()):
|
|
|
if(n>=self.vars.get("MAX_NUMBER_SENSORS")):
|
|
|
if self.debug:
|
|
|
print("Se ha proporcionado una cantidad mayor de sensores para el tipo HFS. Máximo definido son 4 sensores.")
|
|
|
|
|
|
self.write_status("[WARNING] Se asignaron más de 4 sensores para el tipo HFS.")
|
|
|
else:
|
|
|
name = 'sensor_HFS{:02d}'.format(n+1)
|
|
|
pin = int(list_HFS[key])
|
|
|
|
|
|
try:
|
|
|
self.dataOut[name] = sensor_HFS(name,key,pin)
|
|
|
self.dataOut[name].THRESHOLD_BETWEEN_ON_SENSOR = self.vars.get("MIN_ON_SENSOR",30)
|
|
|
self.dataOut[name].THRESHOLD_BETWEEN_OFF_SENSOR = self.vars.get("MIN_OFF_SENSOR",5)
|
|
|
except:
|
|
|
|
|
|
if self.debug:
|
|
|
print(f"[ERROR] Al declarando sensor HFS. Error: {traceback.format_exc()}")
|
|
|
self.write_status(f"[ERROR] Al declarando sensor HFS. Error: {traceback.format_exc()}")
|
|
|
|
|
|
if list_HB is not None:
|
|
|
for n,key in enumerate(list_HB.keys()):
|
|
|
if(n>=self.vars.get("MAX_NUMBER_SENSORS")):
|
|
|
if self.debug:
|
|
|
print("Se ha proporcionado una cantidad mayor de sensores para el tipo HB100. Máximo definido son 4 sensores.")
|
|
|
|
|
|
self.write_status("[WARNING] Se asignaron más de 4 sensores para el tipo HB100.")
|
|
|
|
|
|
else:
|
|
|
name = 'sensor_HB{:02d}'.format(n+1)
|
|
|
self.dataOut[name] = sensor(name,key)
|
|
|
|
|
|
if list_RCWL is not None:
|
|
|
for n,key in enumerate(list_RCWL.keys()):
|
|
|
if(n>=self.vars.get("MAX_NUMBER_SENSORS")):
|
|
|
if self.debug:
|
|
|
print("Se ha proporcionado una cantidad mayor de sensores para el tipo RCWL. Máximo definido son 4 sensores.")
|
|
|
|
|
|
self.write_status("[WARNING] Se asignaron más de 4 sensores para el tipo RCWL.")
|
|
|
|
|
|
else:
|
|
|
name = 'sensor_RCWL{:02d}'.format(n+1)
|
|
|
|
|
|
self.dataOut[name] = sensor(name,key)
|
|
|
|
|
|
|
|
|
if list_ina is not None and self.flag_ina:
|
|
|
|
|
|
for n, key in enumerate(list_ina.keys()):
|
|
|
|
|
|
name = 'ina_{:02d}'.format(n+1)
|
|
|
address = int(list_ina[key])
|
|
|
try:
|
|
|
self.dataOut[name] = ina(name,key,address)
|
|
|
except:
|
|
|
if self.debug:
|
|
|
print(f"[ERROR] Error iniciando sensor ina: {traceback.format_exc()}")
|
|
|
self.write_status(f"[ERROR] Error iniciando sensor ina: {traceback.format_exc()}")
|
|
|
|
|
|
|
|
|
#--------------------- others sensors and modules -----------------#
|
|
|
if self.flag_lidar:
|
|
|
try:
|
|
|
self.dataOut['lidar'] = lidar(name='lidar',key='lidar')
|
|
|
|
|
|
self.dataOut['lidar'].TIME_LIDAR_ON = self.vars.get("TIME_LIDAR_ON",30)
|
|
|
self.dataOut['lidar'].TIME_LIDAR_OFF = self.vars.get("TIME_LIDAR_OFF",60)
|
|
|
self.dataOut['lidar'].min_H = self.vars.get("MIN_HEIGHT_WATER_FOR_LIDAR",5)
|
|
|
self.dataOut['lidar'].minus_H = self.vars.get("MIN_MINUS_HEIGHT_FOR_LIDAR",4)*-1
|
|
|
self.dataOut['lidar'].rare_height = self.vars.get("RARE_HEIGHT",70)
|
|
|
except:
|
|
|
if self.debug:
|
|
|
print(f"[ERROR] Error iniciando sensor: {traceback.format_exc()}")
|
|
|
self.write_status(f"[ERROR] Error iniciando sensor: {traceback.format_exc()}")
|
|
|
except:
|
|
|
|
|
|
if self.debug:
|
|
|
print("Ocurrió un error en la preparación de la trama de configuración.")
|
|
|
|
|
|
error = traceback.format_exc()
|
|
|
|
|
|
self.write_status(f"Ocurrió un error en la preparación de la trama de configuración. Copia del error: {error}")
|
|
|
|
|
|
else:
|
|
|
return
|
|
|
|
|
|
def __config(self,):
|
|
|
|
|
|
#------------------------ revisamos algunas configuraciones ----------------#
|
|
|
|
|
|
if self.flag_camera:
|
|
|
|
|
|
pin = self.camera_keys.get("pin_camera",None)
|
|
|
|
|
|
if pin == None:
|
|
|
|
|
|
self.write_status("[ERROR] Pin del control de camara no ha sido definido. Se controlará el tomado de fotografías por Software.")
|
|
|
|
|
|
else:
|
|
|
# Configuramos el pin como salida
|
|
|
GPIO.setup(pin,GPIO.OUT)
|
|
|
|
|
|
self.camera = camera(flag=self.flag_camera,pin=pin,obj=self.obj_vars)
|
|
|
else:
|
|
|
|
|
|
self.camera = camera(flag=False,pin=None,obj=self.obj_vars)
|
|
|
|
|
|
|
|
|
#-------------------------- Cargamos el modo ---------------------------------#
|
|
|
|
|
|
self.string_model = load_version()
|
|
|
|
|
|
|
|
|
if (self.inference_mode == 'video' and 'Raspberry Pi Zero' in model) or self.inference_mode == 'photo':
|
|
|
|
|
|
if self.debug:
|
|
|
|
|
|
print("Se ha cambiado el modo de inferencia automaticamente. Debido a que los requisitos del sistema no son soportados. Actual inferencia de ML a Server.")
|
|
|
|
|
|
self.write_status("Se ha cambiado el modo de inferencia automaticamente. Debido a que los requisitos del sistema no son soportados. Actual inferencia de ML a Server.")
|
|
|
|
|
|
self.inference_mode = 'server'
|
|
|
|
|
|
|
|
|
def __load_vars(self):
|
|
|
|
|
|
self.path_save = "/data"
|
|
|
|
|
|
self.path_save_img = os.path.join(self.path_save,'img')
|
|
|
self.path_save_json = os.path.join(self.path_save,'json')
|
|
|
|
|
|
|
|
|
self.semaphore = threading.Semaphore(1)
|
|
|
self.semaphore_estimator = threading.Semaphore(1)
|
|
|
self.semaphore_get_video = threading.Semaphore(1)
|
|
|
|
|
|
self.manager = Manager()
|
|
|
self.queue = self.manager.Queue()
|
|
|
self.queue_estimator = self.manager.Queue()
|
|
|
|
|
|
self.obj_vars = VarsJons()
|
|
|
self.estimator = estimator(self.obj_vars,)
|
|
|
|
|
|
self.debug = self.obj_vars.debug
|
|
|
self.vars = self.obj_vars.vars
|
|
|
self.vars_mqtt = self.obj_vars.vars_mqtt
|
|
|
self.store_data = self.obj_vars.store_data
|
|
|
self.vars_gpio = self.obj_vars.vars_gpio
|
|
|
self.inference_mode = self.obj_vars.inference_mode
|
|
|
self.camera_keys = self.obj_vars.camera
|
|
|
|
|
|
self.flag_camera = self.vars_gpio.get("camera", True)
|
|
|
self.flag_lidar = self.vars_gpio.get("lidar",True)
|
|
|
self.flag_ina = self.vars_gpio.get("ina",True)
|
|
|
|
|
|
self.id = str(self.obj_vars.id)
|
|
|
#----------------- share variables --------------------#
|
|
|
|
|
|
self.share_estimator = multiprocessing.Value('d', 0)
|
|
|
|
|
|
|
|
|
def __init__(self):
|
|
|
|
|
|
#---------- Esperamos el inicio del sistema ---------#
|
|
|
|
|
|
|
|
|
|
|
|
sleep(4)
|
|
|
|
|
|
#Guardamos el timestamp de inicio del sistema
|
|
|
self.timestamp_init_system = datetime.now().timestamp()
|
|
|
#------------------ get parameters ------------------#
|
|
|
|
|
|
self.__load_vars()
|
|
|
|
|
|
#------------------- Prints -------------------------#
|
|
|
if self.debug:
|
|
|
print(10*"-")
|
|
|
print("Iniciando sistema")
|
|
|
print(r"Ruta de guardado {}".format(self.store_data))
|
|
|
print(r"Ruta de guardado de imagenes {}".format(self.path_save_img))
|
|
|
print(r"Ruta de guardado de archivos .json: {}".format(self.path_save_json))
|
|
|
print(10*"-")
|
|
|
|
|
|
#----------------- Creamos paths ----------------------#
|
|
|
|
|
|
self.create_path(self.path_save_img)
|
|
|
self.create_path(self.path_save_json)
|
|
|
self.create_path(self.path_save)
|
|
|
|
|
|
#------------------ Read metadatos --------------------#
|
|
|
|
|
|
self.read_metadata()
|
|
|
|
|
|
#-------------------- Escribiendo logs -----------------#
|
|
|
self.write_status(30*"-")
|
|
|
self.write_status("Sistema iniciado")
|
|
|
self.write_status(r"Ruta de guardado {}".format(self.store_data))
|
|
|
self.write_status(r"Ruta de guardado de imagenes {}".format(self.path_save_img))
|
|
|
self.write_status(r"Ruta de guardado de archivos .json: {}".format(self.path_save_json))
|
|
|
self.write_status(30*"-")
|
|
|
|
|
|
|
|
|
#-------------------- Leyendo setup --------------------#
|
|
|
|
|
|
self.make_setup()
|
|
|
|
|
|
#--------------------- Others config -------------------#
|
|
|
|
|
|
self.__config()
|
|
|
#------------------- Run process -----------------------#
|
|
|
|
|
|
self.run_process()
|
|
|
|
|
|
#------------------ Fin script -------------------------#
|
|
|
|
|
|
def run_process(self,):
|
|
|
|
|
|
if self.debug:
|
|
|
print("Realizando la derivación de procesos")
|
|
|
|
|
|
self.write_status("Realizando la derivación de procesos.")
|
|
|
|
|
|
#--------------------- Creamos la variable compartida ----------------------#
|
|
|
#Esta variable compartida nos va a permitir compartir datos para mostrarlo en Flask
|
|
|
|
|
|
json_dict = self.manager.dict()
|
|
|
|
|
|
lock = Lock()
|
|
|
#-------------------------- init process --------------------#
|
|
|
try:
|
|
|
self.process_sensores = multiprocessing.Process(target = self.method_sensores, args=(self.share_estimator,json_dict,lock,))
|
|
|
self.process_mqtt = multiprocessing.Process(target = self.method_mqtt,args=(json_dict,lock,))
|
|
|
self.process_estimator= multiprocessing.Process(target= self.method_estimator,args=(self.share_estimator,lock,))
|
|
|
self.process_flask = multiprocessing.Process(target=self.method_flask,args=(json_dict,lock,))
|
|
|
|
|
|
|
|
|
self.process_sensores.start()
|
|
|
self.process_mqtt.start()
|
|
|
self.process_estimator.start()
|
|
|
self.process_flask.start()
|
|
|
|
|
|
|
|
|
if self.inference_mode == 'video' or self.inference_mode == 'server':
|
|
|
|
|
|
self.process_video = multiprocessing.Process(target=self.method_video,args=())
|
|
|
self.process_video.start()
|
|
|
self.process_video.join()
|
|
|
|
|
|
|
|
|
self.process_mqtt.join()
|
|
|
self.process_sensores.join()
|
|
|
self.process_estimator.join()
|
|
|
self.process_flask.join()
|
|
|
|
|
|
|
|
|
except:
|
|
|
|
|
|
error = traceback.format_exc()
|
|
|
|
|
|
if self.debug:
|
|
|
print("Error en la inialización de procesos.")
|
|
|
|
|
|
error = traceback.format_exc()
|
|
|
self.write_status(f"Error en la inialización de procesos. Copia del error {error}")
|
|
|
|
|
|
def method_flask(self,json_dict,lock):
|
|
|
|
|
|
sleep(2)
|
|
|
|
|
|
if self.debug:
|
|
|
|
|
|
print("Metodo Flask Lanzado")
|
|
|
|
|
|
app = Flask(__name__)
|
|
|
self.write_status(f"[Flask] Metodo lanzado")
|
|
|
|
|
|
|
|
|
@app.route('/')
|
|
|
def home():
|
|
|
return render_template("main.html")
|
|
|
|
|
|
def get_image():
|
|
|
|
|
|
latest_name = sorted(os.listdir("/data/img"))[-1]
|
|
|
|
|
|
return os.path.join("data/img",latest_name)
|
|
|
|
|
|
|
|
|
@app.get("/update")
|
|
|
def update():
|
|
|
|
|
|
try:
|
|
|
data = dict()
|
|
|
with locked(lock):
|
|
|
|
|
|
data = dict(json_dict)
|
|
|
|
|
|
x = jsonify(data)
|
|
|
|
|
|
except:
|
|
|
if self.debug:
|
|
|
print(f"[FlaskUPDTErr] Error: {traceback.format_exc()}")
|
|
|
self.write_status(f"[FlaskUPDTErr] Error: {traceback.format_exc()}")
|
|
|
|
|
|
else:
|
|
|
|
|
|
return x
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
app.run(debug=False,port=5000,host='0.0.0.0')
|
|
|
|
|
|
except:
|
|
|
if self.debug:
|
|
|
print(f"[ErrFlask] Error en flask \n{traceback.format_exc()}")
|
|
|
self.write_status(f"[ErrFlask] Error en flask \n{traceback.format_exc()}")
|
|
|
|
|
|
else:
|
|
|
if self.debug:
|
|
|
print(f"[ErrFlask] Error en flask \n{traceback.format_exc()}")
|
|
|
self.write_status(f"[Flask] Servidor lanzado")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def method_estimator(self, share,lock):
|
|
|
|
|
|
#----------------------- Carga del modelo IA -------------------------------------#
|
|
|
|
|
|
timestamp_latest = datetime.now().timestamp()
|
|
|
|
|
|
self.estimator.load_weights() #Cargamos los pesos
|
|
|
|
|
|
if self.debug:
|
|
|
print("Proceso estimator ha sido lanzado.")
|
|
|
|
|
|
self.write_status("[Estimator] Proceso estimator ha sido lanzado.")
|
|
|
|
|
|
sleep(5)
|
|
|
|
|
|
|
|
|
while 1:
|
|
|
|
|
|
flag = False
|
|
|
|
|
|
try:
|
|
|
|
|
|
self.semaphore_estimator.acquire()
|
|
|
|
|
|
while not self.queue_estimator.empty():
|
|
|
|
|
|
flag = True #Se realizó la inferencia
|
|
|
timestamp_latest = datetime.now().timestamp()
|
|
|
|
|
|
if self.inference_mode == 'photo':
|
|
|
self.estimator.image = self.queue_estimator.get()
|
|
|
|
|
|
elif self.inference_mode == 'video':
|
|
|
self.estimator.video = self.queue_estimator.get()
|
|
|
|
|
|
elif self.inference_mode == 'server':
|
|
|
self.estimator.video = self.queue_estimator.get()
|
|
|
|
|
|
self.estimator.run()
|
|
|
|
|
|
self.semaphore_estimator.release()
|
|
|
|
|
|
timestamp = datetime.now().timestamp()
|
|
|
|
|
|
if (timestamp - timestamp_latest > 30*60):
|
|
|
'''
|
|
|
Controlamos que la ultima inferencia no se haya realizado hace más de 30 minutos.
|
|
|
Esto es util para prevenir cualquier tipo de incidente, ya sea cámara malograda entre otras formas.
|
|
|
'''
|
|
|
|
|
|
self.estimator.inference_value = 10
|
|
|
|
|
|
|
|
|
except:
|
|
|
exc = traceback.format_exc()
|
|
|
self.write_status(f"[ERROR] Ocurrió un error al realizar inferencia. Copia del error: {exc}")
|
|
|
sleep(10)
|
|
|
|
|
|
else:
|
|
|
|
|
|
'''
|
|
|
Solo compartimos el valor de la inferencia al otro estimator. Debido a que cada multiproceso
|
|
|
maneja una copia diferente del objeto Estimator. Por más que realicemos un cálculo, no se verá
|
|
|
reflejado en el otro objeto.
|
|
|
'''
|
|
|
if flag:
|
|
|
self.write_status("[Estimator] Se realizó la inferencia en el Queue Estimator. Variable ha sido compartida entre multiprocesos.")
|
|
|
|
|
|
with locked(lock):
|
|
|
if self.estimator.inference_value == None:
|
|
|
inference = 10
|
|
|
else:
|
|
|
inference = self.estimator.inference_value
|
|
|
|
|
|
share.value = inference
|
|
|
|
|
|
sleep(1)
|
|
|
|
|
|
gc.collect()
|
|
|
|
|
|
sleep(3)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def send_uart(self,uart,string):
|
|
|
|
|
|
try:
|
|
|
uart.write(string.encode('utf-8'))
|
|
|
|
|
|
except:
|
|
|
|
|
|
if self.debug:
|
|
|
print("Ocurrió un error al enviar datos por puerto UART.")
|
|
|
|
|
|
self.write_status("[ERROR] Ocurrió un error al enviar datos por el puerto UART.")
|
|
|
|
|
|
|
|
|
|
|
|
def __get_internet_bytes(self,):
|
|
|
|
|
|
a, b = psutil.net_io_counters().bytes_recv,psutil.net_io_counters().bytes_sent
|
|
|
recv = a - self.bytes_recv
|
|
|
send = b - self.bytes_send
|
|
|
|
|
|
self.bytes_recv_total += recv
|
|
|
self.bytes_send_total += send
|
|
|
|
|
|
self.bytes_recv = a
|
|
|
self.bytes_send = b
|
|
|
|
|
|
file_bytes_internet = "/others/b4g.txt"
|
|
|
|
|
|
#--------------------- control -------------------------------#
|
|
|
tmp_write = list()
|
|
|
|
|
|
try:
|
|
|
|
|
|
if os.path.exists(file_bytes_internet):
|
|
|
|
|
|
|
|
|
with open(file_bytes_internet,"r") as file:
|
|
|
|
|
|
line = numpy.array(file.read().splitlines(),dtype=object)
|
|
|
|
|
|
|
|
|
if line.shape[0]>1:
|
|
|
|
|
|
tmp_write.append(line[0])
|
|
|
|
|
|
tmp_write.append(f"{self.bytes_send_total}#{self.bytes_recv_total}")
|
|
|
|
|
|
with open(file_bytes_internet,"w") as file:
|
|
|
|
|
|
file.write(l+'\n' for l in tmp_write)
|
|
|
|
|
|
else:
|
|
|
|
|
|
self.__create_file_bytes()
|
|
|
|
|
|
|
|
|
gc.collect()
|
|
|
except:
|
|
|
|
|
|
self.write_status(f"[ERROR] Error: {traceback.format_exc()}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def __process_values(self,share,flask_,lock):
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
tmp_data = dict()
|
|
|
|
|
|
#------------------------ Calculamos el uso de internet --------------#
|
|
|
|
|
|
|
|
|
total_internet = round((self.bytes_recv_total + self.bytes_send_total)/(1024*1024),3)
|
|
|
#---------------------------------------------------------------------#
|
|
|
|
|
|
tmp_data["current_timestamp"] = datetime.now().timestamp()
|
|
|
tmp_data['init_timestamp'] = self.timestamp_init_system
|
|
|
tmp_data['version_rpi'] = self.__version__
|
|
|
tmp_data['location'] = self.obj_vars.location
|
|
|
tmp_data['latitude'] = self.obj_vars.latitude
|
|
|
tmp_data['longitude'] = self.obj_vars.longitude
|
|
|
tmp_data["id"] = self.id
|
|
|
tmp_data['camera_status'] = self.camera.status
|
|
|
tmp_data["disk_percent_use"] = psutil.disk_usage('.').percent
|
|
|
tmp_data['DEVICE_INFO'] = self.string_model
|
|
|
tmp_data['CPU_usage'] = psutil.cpu_percent(interval=1)
|
|
|
tmp_data['temperature'] = self.__get_temperature()
|
|
|
tmp_data["ALL_USE_E_MB"] = round(total_internet,4)
|
|
|
|
|
|
|
|
|
list_keys = self.dataOut.keys()
|
|
|
|
|
|
for key in list_keys:
|
|
|
|
|
|
if key == 'lidar':
|
|
|
|
|
|
|
|
|
tmp_data['lidar_status'] = self.dataOut[key].activate
|
|
|
tmp_data['lidar_dH'] = self.dataOut[key].dH_
|
|
|
tmp_data['h0'] = self.dataOut[key].H0
|
|
|
|
|
|
if self.dataOut[key].ERROR_WIRE == True:
|
|
|
|
|
|
tmp_data[key] = "WIRE_ERROR"
|
|
|
|
|
|
else:
|
|
|
tmp_data[key] = "OK"
|
|
|
|
|
|
elif 'ina' in key:
|
|
|
key_ = self.dataOut[key].key
|
|
|
|
|
|
tmp_data[f'bus_voltage_{key_}'] = round(self.dataOut[key].bus_voltage,4)
|
|
|
tmp_data[f'shunt_voltage_{key_}']= round(self.dataOut[key].shunt_voltage,4)
|
|
|
tmp_data[f'current_{key_}'] = round(self.dataOut[key].current,4)
|
|
|
|
|
|
else:
|
|
|
|
|
|
tmp_data[key] = self.dataOut[key].activate
|
|
|
tmp_data[f'it_{key}'] = self.dataOut[key].current_sensor()
|
|
|
|
|
|
#------------------------------- Inference method ------------------------------#
|
|
|
|
|
|
self.estimator.dataOut = self.dataOut
|
|
|
|
|
|
with locked(lock):
|
|
|
self.estimator.share = float(share.value)
|
|
|
|
|
|
tmp_data['camera_inference'] = self.estimator.share
|
|
|
tmp_data['string_inference'] = self.estimator.string_status
|
|
|
|
|
|
tmp_data['status'] = self.estimator.activate
|
|
|
tmp_data['count_status'] = self.estimator.activate_count
|
|
|
|
|
|
#-----------------------------------------------------#
|
|
|
#--------------- Compartiendo data a Flask -----------#
|
|
|
#-----------------------------------------------------#
|
|
|
|
|
|
try:
|
|
|
with locked(lock):
|
|
|
|
|
|
flask_.clear() #Limpiamos todos los datos compartidos
|
|
|
|
|
|
for key in tmp_data.keys():
|
|
|
|
|
|
flask_[key] = tmp_data[key]
|
|
|
|
|
|
except:
|
|
|
if self.debug:
|
|
|
print(f"[ERROR] Error al escribir datos en la variable compartida: {traceback.format_exc()}")
|
|
|
|
|
|
self.write_status(f"[ERROR] Error al escribir datos en la variable compartida: {traceback.format_exc()}")
|
|
|
|
|
|
|
|
|
payload = {'type':'json',
|
|
|
'content':tmp_data}
|
|
|
|
|
|
if self.store_data:
|
|
|
|
|
|
self.write_data(payload)
|
|
|
|
|
|
|
|
|
return tmp_data
|
|
|
|
|
|
except:
|
|
|
error = (traceback.format_exc())
|
|
|
|
|
|
if self.debug:
|
|
|
print(f"Ocurrió un error al leer los datos.{error}")
|
|
|
|
|
|
self.write_status(f"Ocurrió un error al leer datos. Copia del error {error}.")
|
|
|
|
|
|
|
|
|
|
|
|
def write_data(self,data):
|
|
|
now = datetime.now()
|
|
|
formatted_date_time = now.strftime("%d/%m/%Y %H:%M:%S") + " |"
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
name = 'data.txt'
|
|
|
filename = os.path.join(self.path_save_json,name)
|
|
|
|
|
|
with open(filename,'a') as file:
|
|
|
|
|
|
file.write(formatted_date_time + str(json.dumps(data)) + '\n')
|
|
|
|
|
|
except:
|
|
|
|
|
|
if self.debug:
|
|
|
|
|
|
print(f"Ocurrió un error al guardar los datos en el archivo {name}")
|
|
|
|
|
|
self.write_status(f"[ERROR] Ocurrió un error al guardar los datos en el archivo {name}.")
|
|
|
else:
|
|
|
|
|
|
self.count_write_data += 1
|
|
|
|
|
|
try:
|
|
|
|
|
|
if(self.count_write_data == 100):
|
|
|
|
|
|
self.count_write_data = 0
|
|
|
self.__handle_size(filename)
|
|
|
|
|
|
except:
|
|
|
exc = traceback.format_exc()
|
|
|
self.write_status(f"[Error] Error desconocido en {exc}")
|
|
|
|
|
|
def method_mqtt(self,payload_json,lock):
|
|
|
|
|
|
#----------------------------- Control de internet --------------------------#
|
|
|
|
|
|
# Revisamos el internet cada 30 minutos
|
|
|
timestamp_internet = datetime.now().timestamp()
|
|
|
timestamp_mqtt = datetime.now().timestamp()
|
|
|
|
|
|
values = list()
|
|
|
|
|
|
self.check_internet()
|
|
|
|
|
|
while 1:
|
|
|
try:
|
|
|
while 1:
|
|
|
if self.flag_internet:
|
|
|
|
|
|
CLIENT_MQTT = str(self.id +"_pub")
|
|
|
mqtt_user = self.vars_mqtt.get("mqtt_user")
|
|
|
mqtt_pass = self.vars_mqtt.get("mqtt_pass")
|
|
|
mqtt_broker = self.vars_mqtt.get("mqtt_broker")
|
|
|
mqtt_port = self.vars_mqtt.get("mqtt_port")
|
|
|
|
|
|
|
|
|
|
|
|
self.client = mqtt.Client(CLIENT_MQTT)
|
|
|
self.client_image = mqtt.Client(CLIENT_MQTT+'image')
|
|
|
|
|
|
|
|
|
self.client.on_connect = on_connect
|
|
|
self.client_image.on_connect = on_connect
|
|
|
|
|
|
self.client.on_disconnect = on_disconnect
|
|
|
self.client_image.on_disconnect = on_disconnect
|
|
|
|
|
|
|
|
|
self.client.username_pw_set(mqtt_user, mqtt_pass)
|
|
|
self.client_image.username_pw_set(mqtt_user,mqtt_pass)
|
|
|
|
|
|
|
|
|
self.client.connect(mqtt_broker, mqtt_port,keepalive=300)
|
|
|
self.client_image.connect(mqtt_broker,mqtt_port,keepalive=300)
|
|
|
|
|
|
|
|
|
|
|
|
break
|
|
|
|
|
|
else:
|
|
|
|
|
|
#Esperamos un minuto
|
|
|
if self.debug:
|
|
|
print("No se cuenta con internet. Esperamos un minuto para lanzar el proceso MQTT.")
|
|
|
|
|
|
self.write_status("[WARNING] No se cuenta con internet. Esperando un minuto para lanzar el proceso MQTT.")
|
|
|
|
|
|
sleep(30)
|
|
|
self.check_internet()
|
|
|
|
|
|
|
|
|
if self.debug:
|
|
|
|
|
|
print("El proceso MQTT ha sido lanzado")
|
|
|
|
|
|
self.write_status("El proceso MQTT ha sido lanzado.")
|
|
|
|
|
|
photo_topic = os.path.join(self.vars_mqtt.get("photo_topic"),self.id)
|
|
|
data_topic = os.path.join(self.vars_mqtt.get("data_topic"),self.id)
|
|
|
|
|
|
while 1:
|
|
|
|
|
|
try:
|
|
|
|
|
|
while not self.queue.empty():
|
|
|
|
|
|
self.semaphore.acquire()
|
|
|
|
|
|
mensaje = self.queue.get()
|
|
|
|
|
|
self.semaphore.release()
|
|
|
|
|
|
|
|
|
if mensaje["type"] == 'image':
|
|
|
|
|
|
content = mensaje['content']
|
|
|
|
|
|
try:
|
|
|
|
|
|
if self.debug:
|
|
|
print("Enviando imagen por MQTT ...")
|
|
|
|
|
|
self.write_status("Enviando imagen por MQTT.")
|
|
|
|
|
|
self.client_image.publish(photo_topic,content['images'],qos=1)
|
|
|
|
|
|
if self.debug:
|
|
|
print("Imagen ha sido enviada por MQTT")
|
|
|
|
|
|
self.write_status("Imagen ha sido enviado por MQTT.")
|
|
|
|
|
|
except:
|
|
|
|
|
|
if self.debug:
|
|
|
print("Hubo un error al enviar la imagen. Se devuelve a la cola de envío.")
|
|
|
|
|
|
self.write_status("[ERROR] Error por envío de imagen MQTT.")
|
|
|
|
|
|
gc.collect()
|
|
|
|
|
|
|
|
|
|
|
|
if (datetime.now().timestamp() - timestamp_mqtt > self.vars.get("TIME_SEND_MQTT",3)):
|
|
|
|
|
|
try:
|
|
|
with locked(lock):
|
|
|
data = dict(payload_json)
|
|
|
|
|
|
tmp = dict()
|
|
|
tmp['values'] = data
|
|
|
|
|
|
payload = json.dumps(tmp)
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
self.client.publish(data_topic,payload,qos=1)
|
|
|
except:
|
|
|
|
|
|
self.write_status(f"[ERROR] Error al enviar datos por MQTT: {traceback.format_exc()}")
|
|
|
|
|
|
|
|
|
except:
|
|
|
|
|
|
self.write_status(f"[ERROR] Error al leer datos del buffer compartido: {traceback.format_exc()}")
|
|
|
|
|
|
finally:
|
|
|
|
|
|
timestamp_mqtt = datetime.now().timestamp()
|
|
|
|
|
|
|
|
|
self.client_image.loop()
|
|
|
self.client.loop()
|
|
|
|
|
|
|
|
|
except:
|
|
|
|
|
|
self.write_status("[ERROR] Existe un problema al procesar el envío MQTT.")
|
|
|
sleep(1)
|
|
|
finally:
|
|
|
|
|
|
if(datetime.now().timestamp() - timestamp_internet > self.vars.get("SAMPLING_INTERNET",120)):
|
|
|
|
|
|
self.check_internet()
|
|
|
|
|
|
timestamp_internet = datetime.now().timestamp()
|
|
|
|
|
|
if not self.flag_internet:
|
|
|
|
|
|
if self.debug:
|
|
|
print("Se perdió la conexión a Internet.")
|
|
|
|
|
|
self.write_status("[ERROR] Se perdió la conexión a Internet.")
|
|
|
break
|
|
|
|
|
|
except:
|
|
|
if self.debug:
|
|
|
print(f"Ocurrió un error no manejado en la función principal de lahares. Copia del error: {error}")
|
|
|
|
|
|
error = traceback.format_exc()
|
|
|
self.write_status(f"Ocurrió un error inesperado. Copia del error: {error}")
|
|
|
|
|
|
sleep(5)
|
|
|
|
|
|
|
|
|
def method_video(self,):
|
|
|
|
|
|
'''
|
|
|
En este metodo extraemos 5 segundos de video o 135 frames para el entrenamiento del modelo.
|
|
|
|
|
|
UPDATE:
|
|
|
- Por temas de memoria, solo se trabajará con 10 frames. El formato estará comprimido en gzip
|
|
|
para poder obtener un menor tamaño en memoria mientras se comprime los archivos.
|
|
|
-
|
|
|
'''
|
|
|
self.check_internet(False)#Verbose False
|
|
|
|
|
|
if self.debug:
|
|
|
print("Lanzado el metodo para la adquisición de video.")
|
|
|
|
|
|
|
|
|
self.write_status("[METHOD] Lanzado el metodo para la adquisición de video")
|
|
|
|
|
|
timestamp_sampling = datetime.now().timestamp()
|
|
|
|
|
|
duration_frames = 10
|
|
|
sleep(4)
|
|
|
|
|
|
while 1:
|
|
|
|
|
|
current_time = datetime.now().timestamp()
|
|
|
|
|
|
try:
|
|
|
|
|
|
|
|
|
if (( timestamp_sampling - current_time )< 0 and self.camera.status and self.semaphore_get_video.acquire(False)):
|
|
|
|
|
|
self.write_status("Obteniendo video frames .....")
|
|
|
|
|
|
try:
|
|
|
|
|
|
count_frames = 0
|
|
|
|
|
|
file_video = None
|
|
|
file_video = io.BytesIO()
|
|
|
|
|
|
|
|
|
self.camera.control_brightness(brightness=100)#Realiza la validación de si es necesario elevar el brillo o no.
|
|
|
|
|
|
vid = cv2.VideoCapture(self.camera.url_rstp)
|
|
|
|
|
|
time_sleep = 0.35 if duration_frames<=10 else 0.2
|
|
|
|
|
|
now = datetime.now().timestamp()
|
|
|
|
|
|
with gzip.GzipFile(fileobj=file_video,mode='wb',compresslevel=9) as gz:
|
|
|
|
|
|
while 1:
|
|
|
now2 = datetime.now().timestamp()
|
|
|
|
|
|
try:
|
|
|
_,_ = vid.read()
|
|
|
|
|
|
except:
|
|
|
self.write_status("Error al capturar el frame")
|
|
|
|
|
|
else:
|
|
|
try:
|
|
|
if (now2 - now) > time_sleep:
|
|
|
|
|
|
while 1:
|
|
|
|
|
|
try:
|
|
|
|
|
|
_,frame = vid.read()
|
|
|
|
|
|
if frame is not None:
|
|
|
|
|
|
frame = cv2.resize(frame,(640,360))
|
|
|
frame = cv2.cvtColor(frame,cv2.COLOR_BGR2RGB)
|
|
|
frame = (frame.astype(numpy.uint8)).tobytes()
|
|
|
|
|
|
gz.write(len(frame).to_bytes(4,'big'))
|
|
|
gz.write(deepcopy(frame))
|
|
|
|
|
|
frame = None
|
|
|
gc.collect()
|
|
|
|
|
|
count_frames += 1
|
|
|
|
|
|
now = datetime.now().timestamp()
|
|
|
|
|
|
break
|
|
|
|
|
|
except:
|
|
|
self.write_status(f"Ocurrió un error en video: {traceback.format_exc()}")
|
|
|
except:
|
|
|
self.write_status(f"Error al comprimir u obtener frames: {traceback.format_exc()}")
|
|
|
|
|
|
if count_frames == 10:
|
|
|
self.write_status("Se obtuvo 10 frames.")
|
|
|
break
|
|
|
|
|
|
try:
|
|
|
vid.release()
|
|
|
except:
|
|
|
pass
|
|
|
|
|
|
self.camera.control_brightness(brightness=0)
|
|
|
self.semaphore_get_video.release()
|
|
|
|
|
|
#-------------------------------------------------------------------#
|
|
|
# Por temas de latencia en tiempo, se realizará la inferencia en este metodo
|
|
|
#-------------------------------------------------------------------#
|
|
|
|
|
|
# if self.inference_mode == 'server':
|
|
|
|
|
|
|
|
|
# url = "http://38.10.105.243:7777/predict"
|
|
|
|
|
|
|
|
|
# input_data = {'instances':base64.b64encode(file_video.getvalue()).decode('utf-8'),
|
|
|
# 'id_user':"test-jicamarca",
|
|
|
# 'request_format':True,
|
|
|
# 'shape':(360,640)}
|
|
|
|
|
|
# headers = {
|
|
|
# 'Content-Type': 'application/json',
|
|
|
# 'Content-Encoding': 'gzip-B64',
|
|
|
# }
|
|
|
# input_data = json.dumps(input_data)
|
|
|
|
|
|
# compress = io.BytesIO()
|
|
|
|
|
|
# with gzip.GzipFile(fileobj=compress, mode='wb', compresslevel=9) as gz2:
|
|
|
# gz2.write(input_data.encode('utf-8'))
|
|
|
|
|
|
# if self.flag_internet:
|
|
|
# #Se cuenta con internet para realizar la inferencia al servidor.
|
|
|
# try:
|
|
|
# resp = requests.post(url,data=compress.getvalue(),headers=headers)
|
|
|
|
|
|
# except:
|
|
|
|
|
|
# compress = None
|
|
|
# file_video = None
|
|
|
# gc.collect()
|
|
|
# inference = None
|
|
|
# else:
|
|
|
# compress = None
|
|
|
# file_video = None
|
|
|
# gc.collect()
|
|
|
|
|
|
# if resp.status_code == 200:
|
|
|
# #El envio fue un exito
|
|
|
# inference = 1 - resp.json()['predictions'][0][0]
|
|
|
# self.write_status(f"Inferencia al servidor realizado con exito. Valor de inferencia: {inference}.")
|
|
|
# else:
|
|
|
# inference = None
|
|
|
# self.write_status(f"Se obtuvo otro codigo de respuesta al realizar inferencia al servidor. {resp.status_code}")
|
|
|
|
|
|
|
|
|
#-------------------------------------------------------------------#
|
|
|
#-------------------------------------------------------------------#
|
|
|
#-------------------------------------------------------------------#
|
|
|
|
|
|
|
|
|
self.semaphore_estimator.acquire()
|
|
|
|
|
|
if self.queue_estimator.qsize()>=2:
|
|
|
self.queue_estimator.get()
|
|
|
gc.collect()
|
|
|
|
|
|
|
|
|
self.queue_estimator.put({
|
|
|
'video': deepcopy(file_video),
|
|
|
'timestamp': datetime.now().timestamp()
|
|
|
})
|
|
|
|
|
|
file_video = None
|
|
|
gc.collect()
|
|
|
|
|
|
self.semaphore_estimator.release()
|
|
|
|
|
|
|
|
|
|
|
|
except:
|
|
|
|
|
|
try:
|
|
|
vid.release()
|
|
|
except:
|
|
|
pass
|
|
|
|
|
|
gc.collect()
|
|
|
self.write_status(f"[ERROR] Ocurrió un error en el proceso de obtener video. Se volverá a intentar en 15 segundos. Copia del error: {traceback.format_exc()}")
|
|
|
timestamp_sampling = datetime.now().timestamp() + 15
|
|
|
|
|
|
try:
|
|
|
self.semaphore_get_video.release()
|
|
|
except:
|
|
|
self.write_status(f"[ERROR] Code 100 Method")
|
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
|
if count_frames == 10:
|
|
|
timestamp_sampling = datetime.now().timestamp() + self.vars.get("SAMPLING_TIME_VIDEO")
|
|
|
self.write_status(f"Se agregó 10 frames en bytes al queue estimator.")
|
|
|
else:
|
|
|
timestamp_sampling = datetime.now().timestamp() + 15
|
|
|
self.write_status(f"Frames insuficientes, se volverá a intentar en 15 segundos.")
|
|
|
|
|
|
elif (( current_time - timestamp_sampling )> 90):
|
|
|
self.write_status(f"[VIDEO] No se puede tomar el semaforo por más de 90 segundos. ")
|
|
|
timestamp_sampling = current_time
|
|
|
|
|
|
sleep(1)
|
|
|
|
|
|
|
|
|
except:
|
|
|
self.write_status(f"[ERROR VIDEO] {traceback.format_exc()}")
|
|
|
|
|
|
else:
|
|
|
#Borrar
|
|
|
with open("/tools/live_video.txt",'w') as f:
|
|
|
|
|
|
date = datetime.now()
|
|
|
chain = f'{date.day} {date.hour}:{date.minute}:{date.second}'
|
|
|
f.write(chain)
|
|
|
|
|
|
sleep(3)
|
|
|
|
|
|
|
|
|
def method_sensores(self,share,flask_,lock):
|
|
|
|
|
|
timestamp_camera = datetime.now().timestamp()
|
|
|
timestamp_sampling = datetime.now().timestamp()
|
|
|
self.timestamp_internet_ = datetime.now().timestamp()
|
|
|
|
|
|
|
|
|
sleep(1)
|
|
|
|
|
|
self.check_internet(False)
|
|
|
|
|
|
while 1:
|
|
|
|
|
|
#------------------- inicio de proceso ----------------------------#
|
|
|
try:
|
|
|
current_time = datetime.now().timestamp()
|
|
|
|
|
|
self.__get_internet_bytes()
|
|
|
|
|
|
if(timestamp_sampling - current_time < 0 ):
|
|
|
|
|
|
'''
|
|
|
Aqui se ejecutan los estados de los sensores.
|
|
|
'''
|
|
|
|
|
|
try:
|
|
|
|
|
|
key_sensor = self.dataOut.keys()
|
|
|
|
|
|
for key in key_sensor:
|
|
|
|
|
|
try:
|
|
|
self.dataOut[key].run()
|
|
|
except:
|
|
|
if self.debug:
|
|
|
print(f"[ERROR] Error actualizando los datos: {traceback.format_exc()}")
|
|
|
self.write_status(f"[ERROR] Error actualizando los datos: {traceback.format_exc()}")
|
|
|
|
|
|
self.__process_values(share,flask_,lock)
|
|
|
|
|
|
except:
|
|
|
if self.debug:
|
|
|
print(f"[ERROR] Error decodificando los datos: {traceback.format_exc()}")
|
|
|
self.write_status(f"[ERROR] Error decodificando los datos: {traceback.format_exc()}")
|
|
|
|
|
|
finally:
|
|
|
|
|
|
timestamp_sampling = current_time + self.vars.get("SAMPLING_TIME_SENSOR")
|
|
|
|
|
|
|
|
|
if(timestamp_camera - current_time < 0 and self.camera.status and self.semaphore_get_video.acquire(False)):
|
|
|
|
|
|
'''
|
|
|
Se adquiere fotos para enviarse por MQTT.
|
|
|
'''
|
|
|
frame = None
|
|
|
|
|
|
try:
|
|
|
|
|
|
self.camera.control_brightness(brightness=100)
|
|
|
vid = cv2.VideoCapture(self.camera.url_rstp)
|
|
|
|
|
|
while 1:
|
|
|
_, frame = vid.read()
|
|
|
|
|
|
if frame is not None:
|
|
|
|
|
|
break
|
|
|
|
|
|
except:
|
|
|
try:
|
|
|
vid.release()
|
|
|
self.camera.control_brightness(brightness=0)
|
|
|
|
|
|
except:
|
|
|
pass
|
|
|
|
|
|
self.semaphore_get_video.release()
|
|
|
|
|
|
if self.debug:
|
|
|
print(f"Error: {str(traceback.format_exc())}")
|
|
|
|
|
|
self.write_status(f"[ERROR] Error generado en la adquisición de fotografía {str(traceback.format_exc())}")
|
|
|
|
|
|
timestamp_camera = datetime.now().timestamp() + 5
|
|
|
|
|
|
else:
|
|
|
|
|
|
try:
|
|
|
vid.release()
|
|
|
except:
|
|
|
pass
|
|
|
|
|
|
self.camera.control_brightness(brightness=0)
|
|
|
self.semaphore_get_video.release()
|
|
|
#---------------------- imagen obtenida ---------------------#
|
|
|
timestamp_camera = datetime.now().timestamp() + self.vars.get("SAMPLING_TIME_CAMERA")
|
|
|
|
|
|
buffer = cv2.imencode('.jpg',frame)[1]
|
|
|
|
|
|
data = {
|
|
|
'images' : buffer.tobytes(),
|
|
|
'timestamp': datetime.now().timestamp()
|
|
|
}
|
|
|
|
|
|
payload = {
|
|
|
'type' : 'image',
|
|
|
'content': data
|
|
|
}
|
|
|
|
|
|
|
|
|
#-----------------------------------------------------------
|
|
|
#Definimos un tamaño de 3 imagenes, para no llenar el buffer
|
|
|
|
|
|
if self.inference_mode == 'photo':
|
|
|
self.semaphore_estimator.acquire()
|
|
|
|
|
|
payload_estimator = {
|
|
|
'image':numpy.array(frame),
|
|
|
'timestamp':datetime.now().timestamp()
|
|
|
}
|
|
|
if self.queue_estimator.qsize()>=3:
|
|
|
self.queue_estimator.get()
|
|
|
|
|
|
self.queue_estimator.put(payload_estimator)
|
|
|
self.semaphore_estimator.release()
|
|
|
|
|
|
#---------------- Guardamos en la cola las imagenes a ser enviadas -----------------#
|
|
|
self.semaphore.acquire()
|
|
|
if self.flag_internet:
|
|
|
self.queue.put(payload)
|
|
|
|
|
|
if self.queue.qsize()>=10:
|
|
|
self.queue.get()
|
|
|
gc.collect()
|
|
|
|
|
|
self.semaphore.release()
|
|
|
|
|
|
gc.collect()
|
|
|
|
|
|
####################################
|
|
|
## Guardando en la ruta de Flask ###
|
|
|
####################################
|
|
|
|
|
|
filename_flask = os.path.join('/tools/static',"latest.jpg")
|
|
|
|
|
|
with open(filename_flask,'wb') as filew:
|
|
|
|
|
|
filew.write(buffer.tobytes())
|
|
|
|
|
|
####################################
|
|
|
### Guardado de datos ###
|
|
|
####################################
|
|
|
|
|
|
|
|
|
if self.store_data:
|
|
|
|
|
|
name = '{}.jpg'.format(current_time)
|
|
|
|
|
|
filename = os.path.join(self.path_save_img,name)
|
|
|
|
|
|
|
|
|
try:
|
|
|
with open(filename,'wb') as file:
|
|
|
|
|
|
file.write(buffer.tobytes())
|
|
|
|
|
|
|
|
|
except:
|
|
|
if self.debug:
|
|
|
print("Ocurrió un error al guardar la imagen {}".format(name))
|
|
|
self.write_status("[ERROR] Ocurrió un error al guardar la imagen.")
|
|
|
|
|
|
else:
|
|
|
if self.debug:
|
|
|
print(f"Se guardó la imagen {name}")
|
|
|
|
|
|
self.write_status(f"Se guardó la imagen {name}.")
|
|
|
|
|
|
gc.collect()
|
|
|
except:
|
|
|
|
|
|
if self.debug:
|
|
|
print("Ocurrió un error en la recolección de datos.")
|
|
|
|
|
|
self.write_status(f"Ocurrió un error no identificado en el proceso de recolección de data sensores. Copia del error: {traceback.format_exc()}")
|
|
|
|
|
|
sleep(5)
|