##// END OF EJS Templates
.
.

File last commit:

r225:3f6ada0723f2
r225:3f6ada0723f2
Show More
others.py
1875 lines | 53.6 KiB | text/x-python | PythonLexer
import gc
import os
import io
import cv2
import json
import pytz
import busio
import board
import gzip
import random
import numpy
import base64
import requests
import traceback
import adafruit_ina219
import RPi.GPIO as GPIO
import adafruit_lidarlite
import urllib.request
from time import sleep
from copy import deepcopy
from PIL import Image, ImageOps
from datetime import datetime,time
from contextlib import contextmanager
from requests.auth import HTTPDigestAuth
from adafruit_ina219 import ADCResolution, BusVoltageRange
#---------------------------------------#
def load_version():
    """Return the board model string reported by ``/proc/cpuinfo``.

    Returns:
        The text after the colon of the last line starting with ``Model``,
        ``None`` when no such line exists, or ``False`` when the file cannot
        be read or decoded.
    """
    try:
        model = None
        with open('/proc/cpuinfo', 'r') as cpuinfo:
            for line in cpuinfo:
                if line.startswith('Model'):
                    # partition() splits on the first colon only, so a value
                    # that itself contains ':' is kept whole.
                    model = line.partition(':')[2].strip()
        return model
    except (OSError, ValueError):
        # Missing/unreadable file (e.g. not running on Linux) or bad encoding.
        return False
# Detect the board once at import time. load_version() can return None (no
# "Model" line) or False (file unreadable); substring tests on those values
# would raise TypeError, so only string results are matched.
model = load_version()
_model_name = model if isinstance(model, str) else ''
if 'Raspberry Pi Zero' in _model_name:
    # Import the lightweight runtime only.
    TOTAL_BUFFER_VIDEO = 10
    import tflite_runtime.interpreter as lite
elif 'Raspberry Pi 4' in _model_name:
    # Add more devices here if needed.
    TOTAL_BUFFER_VIDEO = 150
    # The full TensorFlow/Keras stack (tqdm, keras, einops, pathlib,
    # itertools, collections, tensorflow) used to be imported here and is
    # currently disabled; the TFLite runtime is used instead.
    # NOTE(review): indentation was lost in this copy of the file; the two
    # statements below are assumed to belong to this branch, where they
    # override the buffer size set above — confirm against the repository.
    TOTAL_BUFFER_VIDEO = 10
    import tflite_runtime.interpreter as lite
##################################################################################################################################################################################################################
##################################################################################################################################################################################################################
##################################################################################################################################################################################################################
##################################################################################################################################################################################################################
##################################################################################################################################################################################################################
##################################################################################################################################################################################################################
#---------------------------------------#
#------------------------------------#
# Module-level I2C bus built from board.SCL / board.SDA at import time.
i2c = busio.I2C(board.SCL, board.SDA)
#------------------------------------#
@contextmanager
def locked(lock):
    """Hold ``lock`` for the duration of a ``with`` block.

    The lock is acquired before the body runs and is released afterwards,
    including when the body raises.

    Args:
        lock: Any object exposing ``acquire()`` and ``release()``.
    """
    lock.acquire()
    try:
        yield
    finally:
        lock.release()
def format_frames(frame, output_size):
    """
    Pad and resize a single video frame.

    Args:
      frame: Array accepted by ``PIL.Image.fromarray``.
      output_size: Target size passed to ``ImageOps.pad``.

    Return:
      NumPy array of the padded image with every value divided by 255.0.
    """
    padded = ImageOps.pad(
        Image.fromarray(frame),
        output_size,
        method=Image.Resampling.BILINEAR,
    )
    return numpy.array(padded) / 255.0
def frames_from_video_file(video, n_frames, output_size = (224,224), frame_step = 15):
    """
    Sample evenly spaced frames from an in-memory video and batch them.

    Args:
      video: Indexable sequence of frames (supports ``len()`` and ``video[i]``).
      n_frames: Number of frames to sample.
      output_size: Size handed to ``format_frames`` for every frame.
      frame_step: Distance, in frames, between two consecutive samples.

    Return:
      A NumPy array of shape (1, n_sampled, height, width, channels) with the
      order of the last-axis channels reversed. ``n_sampled`` equals
      ``n_frames`` when the video is long enough; shorter videos yield only
      the samples that fall inside it.
    """
    video_length = len(video)
    need_length = 1 + (n_frames - 1) * frame_step

    if need_length > video_length:
        start = 0
    else:
        # random.randint includes both bounds, so the upper bound has to be
        # max_start itself; using max_start + 1 let the last sample fall one
        # past the end and returned one frame too few.
        start = random.randint(0, video_length - need_length)

    # Keep the first sample unconditionally (as before) and every later one
    # that still lies inside the video.
    wanted = (start + k * frame_step for k in range(n_frames))
    indices = [idx for pos, idx in enumerate(wanted) if pos == 0 or idx < video_length]

    frames = [format_frames(video[idx], output_size) for idx in indices]
    # Reverse the channel axis (index list [2, 1, 0]).
    result = numpy.array(frames)[..., [2, 1, 0]]
    # Add the leading batch dimension.
    return result.reshape((1,) + result.shape)
class MyErrorForManage(Exception):
    """Exception that also keeps its message in the ``mensaje`` attribute."""

    def __init__(self, mensaje):
        # Store the text first, then let Exception record it in ``args``.
        self.mensaje = mensaje
        super().__init__(mensaje)
class BytesEncoder(json.JSONEncoder):
    """JSON encoder that serialises ``bytes`` values as UTF-8 text."""

    def default(self, obj):
        """Decode ``bytes`` to ``str``; defer every other type to the base class."""
        if not isinstance(obj, bytes):
            # The base implementation raises TypeError for unsupported types.
            return super().default(obj)
        return obj.decode('utf-8')
def on_connect(client, userdata, flags, rc):
    """Connection callback: print the result code, user data and flags.

    ``client`` is accepted for the callback signature but not used.
    """
    report = (
        "Connected with result code " + str(rc),
        "UserData= " + str(userdata),
        "flags= " + str(flags),
        "",
    )
    for line in report:
        print(line)
class VarsJons(object):
    """Device configuration loaded from, and saved back to, a JSON file.

    The file at ``path_file`` is parsed on construction and its top-level keys
    are exposed as attributes. ``save_json`` copies the attributes back into
    the parsed document and rewrites the file.
    """
    id = None
    location = None
    data = None            # parsed JSON document
    debug = False
    type_weights = None
    store_data = False
    latitude = None
    longitude = None
    vars_mqtt = None
    vars_gpio = None
    vars = None
    weights = None
    inference_mode = None
    camera = None

    def __init__(self):
        self.path_file = '/others/vars.json'
        self.load_auth_data()

    def load_auth_data(self):
        """Read ``path_file`` and populate the attributes.

        Raises:
            FileNotFoundError: the configuration file does not exist.
            KeyError: a required top-level key is missing from the file.
        """
        try:
            with open(self.path_file, 'r') as file:
                self.data = json.load(file)
        except FileNotFoundError as err:
            # Report the path that was actually looked up.
            raise FileNotFoundError(f"Archivo {self.path_file} no encontrado.") from err

        config = self.data
        self.vars = config["vars"]
        self.vars_mqtt = config['mqtt']
        self.vars_gpio = config['gpio']
        self.debug = config['debug']
        self.latitude = config["latitude"]
        self.longitude = config["longitude"]
        self.weights = config['weights']
        self.id = config['id_device']
        self.location = config["location"]
        self.inference_mode = config['inference_mode']
        self.type_weights = config['type_weights']
        self.store_data = config['store_data']
        self.camera = config['camera']

    def save_json(self):
        """Write the current attribute values back to ``path_file``.

        Best effort: failures are not raised; they are printed when ``debug``
        is set.
        """
        try:
            # Every attribute read by load_auth_data is written back, so a
            # changed latitude/longitude/inference_mode/store_data is not lost.
            self.data.update({
                "vars": self.vars,
                "mqtt": self.vars_mqtt,
                "gpio": self.vars_gpio,
                "debug": self.debug,
                "latitude": self.latitude,
                "longitude": self.longitude,
                "weights": self.weights,
                "id_device": self.id,
                "location": self.location,
                "inference_mode": self.inference_mode,
                "type_weights": self.type_weights,
                "store_data": self.store_data,
                "camera": self.camera,
            })
            # Serialise before opening the file so a non-serialisable value
            # cannot leave a truncated configuration behind.
            payload = json.dumps(self.data, indent=7)
        except (AttributeError, TypeError, ValueError) as err:
            if self.debug:
                print(f"No se pudo preparar la configuración para guardar: {err}")
            return

        try:
            with open(self.path_file, 'w') as file:
                file.write(payload)
        except OSError as err:
            if self.debug:
                print(f"No se pudo guardar la configuración: {err}")
def on_disconnect(client,userdata,rc):
    """Disconnect callback: log the event and try to reconnect.

    When ``rc`` is non-zero, ``client.reconnect()`` is attempted up to five
    times, 0.5 s apart. The function returns after the first success or after
    the fifth failure.

    Args:
        client: Object exposing ``reconnect()``.
        userdata: Unused.
        rc: Disconnect result code; ``0`` skips the reconnection attempts.
    """
    max_attempts = 5

    def write_status(chain):
        """Append a timestamped line to /logs/log.txt (best effort)."""
        stamp = datetime.now().strftime("%d/%m/%Y %H:%M:%S")
        filename = '/logs/log.txt'
        try:
            # Directory creation is part of the best-effort write so that a
            # read-only filesystem cannot break the callback.
            os.makedirs(os.path.dirname(filename), exist_ok=True)
            with open(filename,'a') as file:
                file.write(stamp + " " + chain + '\n')
        except OSError:
            pass

    write_status("Se ha desconectado el MQTT, recuperando conexión.")
    sleep(0.5)
    if rc == 0:
        return

    for attempt in range(1, max_attempts + 1):
        try:
            client.reconnect()
        except Exception:
            error = traceback.format_exc()
            write_status(f"Error al reconectar MQTT broker. Intento {attempt}/5. Copia del error: {error}")
            # `time` in this module is datetime.time, which has no sleep();
            # the module-level sleep() imported from the time module is used.
            sleep(0.5)
        else:
            return
    write_status(f"Error al reconectar MQTT broker. Nos desconectamos del servidor.")
MAX_NUMBER_SENSORS = 4
class estimator(object):
    '''
    Estimates whether a huayco / lahar event is taking place.

    Combines the camera inference value with the latest sensor readings
    through configured weights. Only the most recent values are kept.
    '''
    flag_load_weights = False
    _dataOut = None
    _image = None
    _share = 10
    _video = None
    _string_status = None
    activate = False
    activate_count = 0
    count_hb = 0
    count_HFS = 0
    count_RCWL = 0
    status_lidar = 0
    flag_internet = False
    inference_value = None
    debug = False
    model_IA = None
    # Class-level defaults; __init__ gives every instance its own arrays.
    list_HB = numpy.full(MAX_NUMBER_SENSORS, numpy.nan, dtype=float)
    list_HFS = numpy.full(MAX_NUMBER_SENSORS, numpy.nan, dtype=float)
    list_RCWL = numpy.full(MAX_NUMBER_SENSORS, numpy.nan, dtype=float)
    timestamp = None
    # For image inference
    TH_UMBRAL = 0.8
    SERVER_URL = "http://38.10.105.243:7777/predict"
    values_dict = {'photo':{'0':'no_huayco','1':'huayco','10':'Camera Not Working'},
                   'video':{'1':'huayco','0':'no_huayco','10':'Camera Not Working'},
                   'server':{'1':'huayco','0':'no_huayco','10':'Camera Not Working'}}

    def __init__(self,obj):
        """
        Args:
            obj: Configuration object providing ``id``, ``vars``, ``weights``,
                ``type_weights`` and ``inference_mode`` (``debug`` is optional).

        Raises:
            AttributeError: no weights are configured for ``obj.type_weights``.
        """
        self.obj_vars = obj
        self.id = self.obj_vars.id
        # write_status/write_data/check_internet read self.debug.
        self.debug = getattr(obj, 'debug', False)
        # Per-instance buffers so two estimators never share readings.
        self.list_HB = numpy.full(MAX_NUMBER_SENSORS, numpy.nan, dtype=float)
        self.list_HFS = numpy.full(MAX_NUMBER_SENSORS, numpy.nan, dtype=float)
        self.list_RCWL = numpy.full(MAX_NUMBER_SENSORS, numpy.nan, dtype=float)
        self.path_save_json = obj.vars.get("path_save",os.path.join(os.getcwd(),'data'))
        self.weights = obj.weights.get(obj.type_weights,None)
        self.inference_mode = self.obj_vars.inference_mode
        if self.weights is None:
            self.write_status("[ERROR] El atributo weights no puede ser None en el objeto Estimator. Porfavor, asegure de configurar correctamente la variable.")
            raise AttributeError("El atributo weights no puede ser None en el objeto Estimator. Porfavor, asegure de configurar correctamente la variable.")

    def reset_values(self):
        """Clear every sensor buffer and its write counter."""
        for buffer in (self.list_HFS, self.list_RCWL, self.list_HB):
            buffer.fill(numpy.nan)
        self.count_hb = 0
        self.count_HFS = 0
        self.count_RCWL = 0
        gc.collect()

    @staticmethod
    def _bounded_mean(values):
        """Mean of the non-NaN entries; 0 when there are none or the mean exceeds 1."""
        if numpy.all(numpy.isnan(values)):
            # Avoids the "Mean of empty slice" warning from nanmean.
            return 0
        mean = numpy.nanmean(values)
        return 0 if mean > 1 else mean

    @property
    def video(self):
        return self._video

    @video.setter
    def video(self,value):
        """Store copies of ``value['video']`` and ``value['timestamp']``."""
        self._video = deepcopy(value['video'])
        self.timestamp = deepcopy(value['timestamp'])

    @property
    def image(self):
        return self._image

    @image.setter
    def image(self,value):
        """Store copies of ``value['image']`` and ``value['timestamp']``."""
        self._image = deepcopy(value['image'])
        self.timestamp = deepcopy(value['timestamp'])

    @property
    def string_status(self):
        """Label for the current camera value, taken from ``values_dict``."""
        labels = self.values_dict[self.inference_mode]
        if self._share <= 0.5:
            self._string_status = labels['0']
        elif self._share <= 2:
            self._string_status = labels['1']
        else:
            self._string_status = labels['10']
        return self._string_status

    @property
    def share(self):
        return self._share

    @share.setter
    def share(self,value):
        """Store the camera value and recompute the weighted activation.

        Values outside ``[0.1, 5)`` are stored as 0. The weighted sum of the
        camera, HFS, LIDAR and HB100 terms is kept in ``activate_count`` and
        ``activate`` is True when that sum exceeds 0.7.
        """
        self._share = value if 0.1 <= value < 5 else 0

        #------------- Weighting -----------------
        score = round(self.weights['camara']*self._share,2)
        if score > 1.1:
            score = 0
        score += self.weights['HFS'] * self._bounded_mean(self.list_HFS)

        if numpy.isnan(self.status_lidar):
            self.status_lidar = 0
        if self.status_lidar > 1:
            self.status_lidar = 1
        score += self.weights['LIDAR'] * self.status_lidar

        score += self.weights['HB100'] * self._bounded_mean(self.list_HB)

        self.activate = bool(score > 0.7)
        self.activate_count = score

    @property
    def dataOut(self):
        return self._dataOut

    @dataOut.setter
    def dataOut(self,value):
        """Refresh the sensor buffers from a mapping of key -> sensor object.

        Every object must expose ``get_latest()``; entries whose key contains
        ``'lidar'`` must also expose ``activate``.
        """
        self._dataOut = value
        self.reset_values()
        for key, obj in self._dataOut.items():
            y = obj.get_latest()[1]
            if 'sensor_HFS' in key:
                self.list_HFS[self.count_HFS%MAX_NUMBER_SENSORS] = y
                self.count_HFS +=1
            if 'sensor_HB' in key:
                self.list_HB[self.count_hb%MAX_NUMBER_SENSORS] = y
                self.count_hb +=1
            if 'sensor_RCWL' in key:
                self.list_RCWL[self.count_RCWL%MAX_NUMBER_SENSORS] = y
                self.count_RCWL +=1
            if 'lidar' in key:
                # There is a single lidar, so one scalar flag is enough.
                self.status_lidar = 1 if obj.activate else 0

    def __load_model_photo(self,path_model):
        """Load the TFLite model at ``path_model``; sets ``flag_load_weights``."""
        try:
            self.model_IA = lite.Interpreter(model_path=path_model)
            self.model_IA.allocate_tensors()
        except Exception as e:
            # The model could not be loaded.
            self.write_status(f"No se pudo cargar el modelo IA de foto. Error: {e}")
            self.flag_load_weights = False
        else:
            self.write_status("Modelo IA de photo cargado con éxito.")
            self.flag_load_weights = True

    def __load_model_video(self,path):
        """Build the video network and load its weights from ``path``.

        NOTE(review): ``layers``, ``keras``, ``Conv2Plus1D``, ``Dropout``,
        ``ResizeVideo`` and ``add_residual_block`` are not imported in the
        visible part of this file; if they are undefined the resulting
        NameError is logged and ``flag_load_weights`` stays False.
        """
        try:
            HEIGHT = 224
            WIDTH = 224
            input_shape = (None, 10, HEIGHT, WIDTH, 3)
            inputs = layers.Input(shape=(input_shape[1:]))
            x = inputs
            x = Conv2Plus1D(filters=16, kernel_size=(3, 7, 7), padding='same')(x)
            x = layers.BatchNormalization()(x)
            x = layers.ReLU()(x)
            x = Dropout(0.1)(x)
            x = ResizeVideo(HEIGHT // 2, WIDTH // 2)(x)
            # Block 1
            x = add_residual_block(x, 16, (3, 3, 3))
            x = Dropout(0.1)(x)
            x = ResizeVideo(HEIGHT // 4, WIDTH // 4)(x)
            # Block 2
            x = add_residual_block(x, 32, (3, 3, 3))
            x = Dropout(0.1)(x)
            x = ResizeVideo(HEIGHT // 8, WIDTH // 8)(x)
            # Block 3
            x = add_residual_block(x, 64, (3, 3, 3))
            x = Dropout(0.1)(x)
            x = ResizeVideo(HEIGHT // 16, WIDTH // 16)(x)
            # Block 4
            x = add_residual_block(x, 128, (3, 3, 3))
            x = Dropout(0.1)(x)
            x = ResizeVideo(HEIGHT // 32, WIDTH // 32)(x)
            x = layers.AveragePooling3D((10,1,1))(x)
            x = layers.Reshape((x.shape[1]*x.shape[2]*x.shape[3],-1))(x)
            x = layers.LSTM(128,return_sequences=True)(x)
            x = layers.Flatten()(x)
            x = layers.Dense(512)(x)
            x = Dropout(0.1)(x)
            x = layers.Dense(256)(x)
            x = layers.Dense(1, activation='sigmoid')(x)
            self.model_IA = keras.Model(inputs, x)
            self.model_IA.load_weights(path)
        except Exception:
            self.write_status(f"[ERROR] No se pudo cargar el modelo IA de video. Error: {traceback.format_exc()}")
            self.flag_load_weights = False
        else:
            self.write_status("Modelo IA de video cargado con exito.")
            self.flag_load_weights = True

    def load_weights(self):
        """Load the model that matches ``inference_mode``."""
        if self.inference_mode == 'photo':
            # MobileNet is used because of its small size.
            path_model = "/tools/models/mobilnet.tflite"
            self.__load_model_photo(path_model)
        elif self.inference_mode == 'video':
            # Video weights
            path_model = "/tools/models/weights_video.h5"
            self.__load_model_video(path_model)
        elif self.inference_mode == 'server':
            # Inference is delegated to the remote server while internet is
            # available; a small local fallback model is planned for
            # RPi 4 or newer when there is no connection.
            self.__model_less_complexity()

    def __model_less_complexity(self,):
        """Placeholder for the low-complexity local model."""
        self.model_IA = True
        return

    def check_internet(self,verbose=True):
        """Probe http://www.google.com up to three times and set ``flag_internet``.

        NOTE(review): indentation was lost in this copy of the file; the debug
        print is assumed to be nested under ``verbose`` — confirm.
        """
        max_attempts = 3
        for attempt in range(1, max_attempts + 1):
            try:
                # The context manager closes the response socket.
                with urllib.request.urlopen('http://www.google.com', timeout=1):
                    pass
            except Exception:
                if attempt < max_attempts:
                    sleep(0.5)
            else:
                if verbose:
                    self.write_status("Se cuenta con conexión a internet.")
                    if self.debug:
                        print("Se cuenta con conexión a internet")
                self.flag_internet = True
                return
        self.flag_internet = False
        return

    def get_inference(self,):
        """Run one inference in the configured mode.

        Returns:
            The inference value, or ``None`` when no input is available or the
            inference failed.
        """
        self.check_internet(False)
        if self.inference_mode == 'video':
            return self._infer_video()
        if self.inference_mode == 'photo':
            return self._infer_photo()
        if self.inference_mode == 'server':
            return self._infer_server()
        return None

    def _infer_video(self):
        """Predict with the local video model on the stored frames."""
        n_frames = 10
        if self._video is None:
            self.write_status("No se puede realizar la inferencia porque el batch es None.")
            return None
        self.write_status("Realizando inferencia del modelo IA con video.")
        try:
            self._video = frames_from_video_file(self._video,n_frames)
            result = 1- self.model_IA.predict(self._video)[0][0]
        except Exception:
            self.write_status(f"[ERROR] Error en la estimación del video. {traceback.format_exc()} ")
            self._video = None
            return None
        #------------------ Store the video inference ---------------------#
        self.__save_inferences(result)
        self._video = None
        return result

    def _infer_photo(self):
        """Predict with the TFLite model on the stored image and save the image."""
        if self._image is None:
            self.write_status("No se puede realizar la inferencia porque la imagen es None.")
            return None
        self.write_status("Realizando inferencia del modelo IA con photo.")
        try:
            input_details = self.model_IA.get_input_details()
            output_details = self.model_IA.get_output_details()
            input_data = Image.fromarray(deepcopy(self._image))
            resize = numpy.array(input_data.resize((256,256)))
            resize = numpy.expand_dims(resize,axis=0).astype(numpy.float32)
            self.model_IA.set_tensor(input_details[0]['index'], resize)
            self.model_IA.invoke()
            output_data = self.model_IA.get_tensor(output_details[0]['index'])[0][0]
            self.write_status(f"Inferencia realizado con exito. Valor de inferencia: {output_data}.")
            if output_data>=0.6:
                fpath = r'/data/inferences/img/01'
            else:
                fpath = r'/data/inferences/img/00'
            # The previous check tested the os.path.isdir function object,
            # so the directory was never created and the save failed.
            os.makedirs(fpath, exist_ok=True)
            name = f'{self.timestamp}.png'
            fpath = os.path.join(fpath,name)
            original_image = Image.fromarray(self._image)
            original_image.save(fpath)
            return output_data
        except Exception:
            exc = traceback.format_exc()
            self.write_status(f"[ERROR] Error al realizar inferencia. Copia del error {exc}")
            return None

    def _infer_server(self):
        """Send the stored video to the inference server.

        Only the gzip-compressed JSON payload is sent; the server is
        responsible for decoding the video.
        """
        if self._video is None:
            self.write_status("No se puede realizar la inferencia porque el batch es None.")
            return None
        if not self.flag_internet:
            # The low-complexity local fallback is not developed yet.
            self.write_status("[IA] Metodo IA de menor complejidad no ha sido implementado para este modo.")
            return None

        input_data = {'instances':base64.b64encode(self._video.getvalue()).decode('utf-8'),
                      'id_user':str(self.id),
                      'request_format':True,
                      'shape':(360,640)}
        headers = {
            'Content-Type': 'application/json',
            'Content-Encoding': 'gzip-B64',
        }
        compress = io.BytesIO()
        with gzip.GzipFile(fileobj=compress, mode='wb', compresslevel=9) as gz2:
            gz2.write(json.dumps(input_data).encode('utf-8'))

        try:
            # NOTE(review): no timeout is set, so this call can block
            # indefinitely if the server stops answering.
            resp = requests.post(self.SERVER_URL,data=compress.getvalue(),headers=headers)
            if resp.status_code != 200:
                self.write_status(f"Se obtuvo otro codigo de respuesta al realizar inferencia al servidor. {resp.status_code}")
                return None
            # The current model needs 1 - value because 0 means event and
            # 1 means no event.
            value_inference = round(1-resp.json()['predictions'][0][0],4)
            self.write_status(f"Inferencia al servidor realizado con exito. Valor de inferencia: {value_inference}.")
            self.__save_inferences(value_inference)
            return value_inference
        except Exception:
            self.write_status(f"Error ocurrido al realizar la inferencia al servidor. {traceback.format_exc()}")
            return None
        finally:
            # Release the large buffers on every path.
            self._video = None
            compress = None
            gc.collect()

    def __save_inferences(self,result):
        """Persist the inferred video (currently disabled by the early return)."""
        return
        # NOTE(review): unreachable while the return above is in place.
        frame_width, frame_height = self._video.shape[1],self._video.shape[2]
        if result>=0.6:
            fpath = r'/data/inferences/video/01'
        else:
            fpath = r'/data/inferences/video/00'
        if not os.path.isdir(fpath):
            os.makedirs(fpath)
        try:
            name = f'{self.timestamp}.mp4'
            fpath = os.path.join(fpath,name)
            out = cv2.VideoWriter(fpath, cv2.VideoWriter_fourcc(*'mp4v'), 10, (frame_width, frame_height))
            batch = self._video[0]
            for frame in batch:
                if frame.dtype != numpy.uint8:
                    frame = frame.astype(numpy.uint8)
                out.write(frame)
            out.release()
        except:
            self.write_status(traceback.format_exc())

    def write_status(self,chain):
        """Append a timestamped line to /logs/log.txt (best effort)."""
        stamp = datetime.now().strftime("%d/%m/%Y %H:%M:%S")
        filename = '/logs/log.txt'
        try:
            os.makedirs(os.path.dirname(filename), exist_ok=True)
            with open(filename,'a') as file:
                file.write(stamp + " " + chain + '\n')
        except OSError:
            if self.debug:
                print("Ocurrió un error al guardar datos logs.")
        return

    def write_data(self,data):
        """Append ``data`` as a JSON line to ``data.txt`` under ``path_save_json``."""
        stamp = datetime.now().strftime("%d/%m/%Y %H:%M:%S") + " |"
        name = 'data.txt'
        try:
            os.makedirs(self.path_save_json, exist_ok=True)
            filename = os.path.join(self.path_save_json,name)
            with open(filename,'a') as file:
                file.write(stamp + str(json.dumps(data)) + '\n')
        except (OSError, TypeError, ValueError):
            if self.debug:
                print(f"Ocurrió un error al guardar los datos en el archivo {name}")
            self.write_status(f"[ERROR] Ocurrió un error al guardar los datos en el archivo {name}.")

    def run(self,):
        '''
        Run one inference and log its result.

        Measured averages: about 0.303 s per photo inference on an RPi Zero
        and about 2.4 s per video inference on an RPi 4.
        '''
        if self.flag_load_weights or self.inference_mode == 'server':
            #--------------- Inference ---------------#
            self.inference_value = self.get_inference()
            # For now the result is only logged.
            summary = "Inferencia:" + str(self.inference_value) + " Timestamp: " + str(self.timestamp)
            self.write_data(summary)
            self.write_status(summary)
class camera(object):
    '''
    Controls the IP camera: relay power through a GPIO pin and image settings
    through the camera's ISAPI HTTP endpoints.
    '''
    data = None
    flag = False
    activity = False
    _status = False
    url_rstp = None
    brightness = False
    flag_brightness = False
    debug = False
    # Seconds to wait for the camera's HTTP API, so an unreachable camera
    # cannot block the caller indefinitely.
    HTTP_TIMEOUT = 10

    def write_status(self,chain):
        """Append a timestamped line to /logs/log.txt (best effort)."""
        stamp = datetime.now().strftime("%d/%m/%Y %H:%M:%S")
        filename = '/logs/log.txt'
        try:
            os.makedirs(os.path.dirname(filename), exist_ok=True)
            with open(filename,'a') as file:
                file.write(stamp + " |" + chain + '\n')
        except OSError:
            if self.debug:
                print("Ocurrió un error al guardar datos logs.")
        return

    def __init__(self,flag=False, pin= 26,obj=None):
        '''
        Args:
            flag: Enables or disables use of the camera altogether.
            pin: GPIO pin driving the relay that powers the camera.
            obj: Configuration object providing ``debug``, ``vars``,
                ``vars_mqtt``, ``store_data``, ``vars_gpio`` and ``camera``.
                It is required despite the ``None`` default.

        ``camera_always_on`` (read from ``vars_gpio``) keeps the camera
        powered at all times, still subject to ``flag``.

        08-08-24: functions for the HIKVISION camera through ISAPI were added.
        '''
        self.flag = flag
        self.pin = pin
        self.debug = obj.debug
        self.vars = obj.vars
        self.vars_mqtt = obj.vars_mqtt
        self.store_data = obj.store_data
        self.vars_gpio = obj.vars_gpio
        self.camera_keys = obj.camera
        self.camera_always_on = self.vars_gpio.get("camera_always_on",False)
        self.camera_ip = self.camera_keys.get("ip")
        self.username_camera = self.camera_keys.get("username")
        self.password_camera = self.camera_keys.get("password")
        self.port_camera = self.camera_keys.get("port")
        self.__config()

    def __config(self,):
        """Initial setup: stream URL, clock, night mode and light off."""
        self.__gen__rstp()
        self.__update_time()
        self.__switch_mode_to_night()
        self.__update_brightness(brightness=0)  # Turn the camera light off

    def __update_time(self,):
        """Push the current UTC-5 date and time to the camera."""
        if self.camera_ip is None:
            return
        url_time = f'http://{self.camera_ip}/ISAPI/System/time'
        # Format the full datetime: formatting only its time() part with
        # %Y-%m-%d yields the placeholder date 1900-01-01.
        now = datetime.now(pytz.utc).astimezone(pytz.timezone('Etc/GMT+5'))
        hora_actual = now.strftime('%Y-%m-%dT%H:%M:%S')
        zona_horaria = "EST5"
        xml_data = f"""<Time version="2.0" xmlns="http://www.isapi.org/ver20/XMLSchema">
<timeMode>manual</timeMode>
<localTime>{hora_actual}</localTime>
<timeZone>{zona_horaria}</timeZone>
</Time>"""
        try:
            response = requests.put(
                url_time,
                data=xml_data,
                headers={'Content-Type': 'application/xml'},
                auth=HTTPDigestAuth(self.username_camera, self.password_camera),
                timeout=self.HTTP_TIMEOUT,
            )
            if response.status_code != 200:
                raise RuntimeError(f"Error {response.status_code}: {response.text}")
        except Exception:
            self.write_status(f"[Camera] Error producido al actualizar la fecha. Error: {traceback.format_exc()}.")
        else:
            print(f"Hora actualizada.")
            self.write_status(f"[Camera] Fecha actualizada.")

    def __gen__rstp(self):
        """Build the RTSP stream URL from the configured credentials."""
        self.url_rstp = f'rtsp://{self.username_camera}:{self.password_camera}@{self.camera_ip}:{self.port_camera}/streaming/channels/1'

    def __update_brightness(self,brightness=100):
        """Set the white supplement light brightness (0 turns it off)."""
        if self.camera_ip is None:
            return
        url_supplement_light = f'http://{self.camera_ip}/ISAPI/Image/channels/1/supplementLight'
        xml_data = f'''
<SupplementLight>
<supplementLightMode>colorVuWhiteLight</supplementLightMode>
<mixedLightBrightnessRegulatMode>manual</mixedLightBrightnessRegulatMode>
<whiteLightBrightness>{brightness}</whiteLightBrightness>
</SupplementLight>
'''
        try:
            response = requests.put(
                url_supplement_light,
                data=xml_data,
                headers={'Content-Type': 'application/xml'},
                auth=HTTPDigestAuth(self.username_camera, self.password_camera),
                timeout=self.HTTP_TIMEOUT,
            )
            if response.status_code != 200:
                raise RuntimeError(f"Error {response.status_code}: {response.text}")
        except Exception:
            self.write_status(f"[Camera] Error producido al actualizar el brillo. Error {traceback.format_exc()}.")
        else:
            print(f"Brillo ajustado a {brightness}%.")
            self.write_status(f"[Camera] Brillo de luz actualizado a {brightness}.")
            # NOTE(review): indentation was lost in this copy of the file;
            # the pause is assumed to apply only after a successful change.
            if brightness >50:
                sleep(3)

    def __switch_mode_to_night(self,):
        '''
        Configure the camera's IR-cut filter to night mode.
        '''
        if self.camera_ip is None:
            return
        # The XML declaration must be the first bytes of the document and
        # use version="1.0" (the previous payload had version:"1.0" after a
        # leading newline).
        xml_data = (
            '<?xml version="1.0" encoding="UTF-8"?>\n'
            '<IrcutFilter>\n'
            '<IrcutFilterType>night</IrcutFilterType>\n'
            '</IrcutFilter>\n'
        )
        headers ={'Content-Type': 'application/xml'}
        url = f'http://{self.camera_ip}/ISAPI/Image/channels/1/ircutFilter'
        try:
            response = requests.put(
                url,
                data=xml_data,
                auth=HTTPDigestAuth(self.username_camera, self.password_camera),
                headers=headers,
                timeout=self.HTTP_TIMEOUT,
            )
        except Exception:
            self.write_status(f"[ERROR] Error al cambiar modo de la camara a noche. Error: {traceback.format_exc()}")
        else:
            if response.status_code == 200:
                self.write_status("[Camera] Modo ha sido cambiado a Noche.")
            else:
                self.write_status("[Camera] Error al cambiar a modo noche.")

    def __set_relay(self, level):
        """Configure the relay pin as an output and drive it to ``level``."""
        GPIO.setmode(GPIO.BCM)
        GPIO.setwarnings(False)
        GPIO.setup(self.pin,GPIO.OUT)
        GPIO.output(self.pin,level)

    def __on_camera(self,):
        """Power the camera: the relay pin is driven LOW."""
        self.__set_relay(GPIO.LOW)
        self.activity = True

    def __off_camera(self,):
        """Cut camera power: the relay pin is driven HIGH."""
        self.__set_relay(GPIO.HIGH)
        self.activity = False

    @staticmethod
    def _is_night_time(now):
        """True from 17:00 through 06:50 (inclusive), wrapping past midnight."""
        # A single wrap-around test also covers 23:59:00.000001-23:59:59,
        # which the two closed intervals ending at 23:59 left out.
        return now >= time(17,0) or now <= time(6,50)

    def __is_night(self):
        '''
        Whether the current UTC-5 time counts as night.
        '''
        now = datetime.now(pytz.utc).astimezone(pytz.timezone('Etc/GMT+5')).time()
        return self._is_night_time(now)

    @property
    def status(self):
        """Apply the power policy and return whether the camera should be on.

        The activation criterion is currently fixed to "always on"; a
        time-window criterion (for example 06:00-18:00, or seasonal windows)
        can replace the ``self._status = True`` assignment below.
        """
        if self.flag == False:
            self.__off_camera()
            return False
        if self.camera_always_on:
            # The camera stays powered at all times.
            self.__on_camera()
            self.__process_on()
            return True

        #------------------- Criterion ----------------------------#
        self._status = True
        #-------------------- Conditions ------------------------#
        if self._status == True and self.pin is not None and self.activity == False:
            # Power the camera through the relay.
            self.__on_camera()
            self.write_status("[CAMERA] Se prendió la cámara.")
        elif self._status == False and self.pin is not None and self.activity == True:
            # Release the relay output.
            self.__off_camera()
            self.write_status("[CAMERA] Se apagó la cámara.")
        return self._status

    def control_brightness(self,brightness=100):
        """Turn the light on (only at night) or off (``brightness == 0``)."""
        try:
            flag_night = self.__is_night()
            if flag_night == True and brightness>0:
                # Apply the brightness change.
                self.brightness = True
                self.__update_brightness(brightness=brightness)
            elif brightness == 0:
                self.brightness = False
                self.__update_brightness(brightness=0)
        except Exception:
            self.write_status("[CAMERA_ERROR] Ocurrió un error al controlar el brillo de la camara.")
        return

    def __process_on(self,):
        '''
        Checks performed while the camera is powered: switch the light on at
        nightfall and off at daybreak, once per transition.
        '''
        flag_night = self.__is_night()
        if flag_night == True and self.flag_brightness == False:
            self.flag_brightness = True
            self.__update_brightness(brightness=100)
        if flag_night==False and self.flag_brightness == True:
            self.flag_brightness = False
            self.__update_brightness(brightness=0)
class sensor(object):
    """Base class for sensors: keeps a bounded history of (timestamp, value)."""
    max_size = 300
    name = ""
    key = None
    H0 = None
    debug = False  # read by write_status when the log cannot be written
    # Class-level empty defaults. They are never mutated: insert_value gives
    # each instance its own lists on first use.
    y_value = list()
    x_value = list()
    FLAG_CALIBRATION_LIDAR = False
    array_calibration = list()

    def write_status(self,chain):
        """Append a timestamped line to /logs/log.txt (best effort)."""
        stamp = datetime.now().strftime("%d/%m/%Y %H:%M:%S")
        filename = '/logs/log.txt'
        try:
            os.makedirs(os.path.dirname(filename), exist_ok=True)
            with open(filename,'a') as file:
                file.write(stamp + " |" + chain + '\n')
        except OSError:
            if self.debug:
                print("Ocurrió un error al guardar datos logs.")
        return

    def __init__(self,name,key):
        self.name = name
        self.key = key

    def __logic(self,value):
        '''
        To be implemented by child classes.
        '''
        self.write_status("[ERROR SENSOR] No se encuentra implementado el método lógic.")
        raise NotImplementedError("No se encuentra implementado el método logic.")

    def insert_value(self,value):
        """Append ``value`` with the current timestamp, dropping the oldest
        sample once ``max_size`` samples are stored."""
        if 'lidar' in self.name:
            # NOTE(review): inside this class these double-underscore calls
            # are name-mangled to _sensor__load_H0 / _sensor__calibration,
            # which this class does not define; verify how the lidar
            # subclass reaches this method.
            self.__load_H0()
            if self.FLAG_CALIBRATION_LIDAR == False:
                self.__calibration(value)

        # Subclasses override __init__ without calling it here, so the
        # per-instance buffers are created lazily. Appending to the
        # class-level lists would mix the samples of every sensor.
        own = vars(self)
        if 'x_value' not in own:
            self.x_value = []
        if 'y_value' not in own:
            self.y_value = []

        timestamp = datetime.now().timestamp()
        if len(self.x_value) >= self.max_size:
            self.x_value = self.x_value[1:]
            self.y_value = self.y_value[1:]
        self.x_value.append(timestamp)
        self.y_value.append(value)
        gc.collect()

    def get_values(self,):
        """Return the stored timestamps and values as two float arrays."""
        return numpy.array(self.x_value,dtype=float), numpy.array(self.y_value,dtype=float)

    def get_latest(self,):
        """Return the newest ``(timestamp, value)`` pair, or ``(None, None)``."""
        if len(self.x_value)>0:
            return self.x_value[-1], self.y_value[-1]
        else:
            return None,None
class sensor_HFS(sensor):
    """Digital sensor read from a GPIO pin, with a debounced ``activate`` flag.

    ``activate`` turns True after the input has stayed high for
    ``THRESHOLD_BETWEEN_ON_SENSOR`` seconds and returns to False once the
    input has been low for ``THRESHOLD_BETWEEN_OFF_SENSOR`` seconds.
    """
    activate = False
    timestamp_init = 0
    timestamp_fin = 0
    timestamp = 0
    timestamp_off = 0
    status = False
    prev_status = False
    THRESHOLD_BETWEEN_OFF_SENSOR = 3
    THRESHOLD_BETWEEN_ON_SENSOR = 5
    pull_down = True

    def __init__(self,name,key,pin,**kwargs):
        """
        Args:
            name: Sensor name.
            key: Sensor key.
            pin: GPIO pin number (BCM numbering).
            **kwargs: Accepted and ignored.

        Raises:
            AttributeError: ``pin`` is None.
        """
        self.name, self.key, self.pin = name, key, pin
        if pin is None:
            self.write_status("[ERROR] Pin no debe de ser None para el sensor HFS.")
            raise AttributeError("Valor de Pin es None.")
        # NOTE(review): indentation was lost in this copy of the file; this
        # assignment is assumed to run on every construction, overriding the
        # class default.
        self.pull_down = False
        self.__config()

    def __config(self,):
        """Set the pin up as an input and log the configuration."""
        GPIO.setwarnings(False)
        GPIO.setmode(GPIO.BCM)
        if not self.pull_down:
            GPIO.setup(self.pin,GPIO.IN)
        else:
            GPIO.setup(self.pin,GPIO.IN,pull_up_down=GPIO.PUD_DOWN)
        self.write_status(
            f"[Settings] Sensor HFS configurado con valores key:{self.key} name:{self.name} pin{self.pin}"
        )

    def run(self,):
        """Read the pin once and update the activation state."""
        self.__logic(GPIO.input(self.pin))

    def current_sensor(self,):
        """Return the raw pin level."""
        return GPIO.input(self.pin)

    def __logic(self,status):
        """Advance the state machine with the newly read ``status``."""
        current = datetime.now().timestamp()
        self.prev_status, self.status = self.status, status
        self.timestamp = current

        was_on, is_on = self.prev_status == True, self.status == True
        was_off, is_off = self.prev_status == False, self.status == False

        if was_off and is_on:
            # Rising edge: start timing the high period.
            self.timestamp_init = current
        elif was_off and is_off:
            # Still low: drop the activation once it has been low long enough.
            if self.activate and (current - self.timestamp_off) >= self.THRESHOLD_BETWEEN_OFF_SENSOR:
                self.activate = False
        elif was_on and is_off:
            # Falling edge: remember when the input went low.
            if self.activate:
                self.timestamp_off = current
        elif was_on and is_on:
            # Still high: activate once it has been high long enough.
            if current - self.timestamp_init >= self.THRESHOLD_BETWEEN_ON_SENSOR:
                self.activate = True
class ina(sensor):
    """INA219 power monitor on the module-level I2C bus."""
    bus_voltage = 0
    shunt_voltage = 0
    current = 0

    def __init__(self,name,key,address):
        """
        Args:
            name: Sensor name.
            key: Sensor key.
            address: I2C address of the INA219.

        Raises:
            AttributeError: ``address`` is None.
        """
        if address is None:
            self.write_status("[ERROR] Se debe de asignar la dirección al atributo INA.")
            raise AttributeError("Se debe de asignar la dirección al atributo INA.")
        self.address, self.name, self.key = address, name, key
        self.__config()

    def __config(self,):
        """Create the driver and raise both ADC resolutions."""
        device = adafruit_ina219.INA219(i2c,self.address)
        # Increase the INA219 resolution (ADCRES_12BIT_32S on both ADCs).
        device.bus_adc_resolution = ADCResolution.ADCRES_12BIT_32S
        device.shunt_adc_resolution = ADCResolution.ADCRES_12BIT_32S
        self.sensor = device

    def run(self,):
        """Read bus voltage, shunt voltage and current from the device."""
        reading = self.sensor
        self.bus_voltage = reading.bus_voltage  # V
        # NOTE(review): the driver value is divided by 1000 and the original
        # comment labelled it "mV" — confirm the intended unit against the
        # adafruit_ina219 documentation.
        self.shunt_voltage = reading.shunt_voltage / 1000
        self.current = reading.current  # mA
class lidar(sensor):
# Level detector built on a LIDAR-Lite rangefinder. H0 is the calibrated
# reference reading; the event logic works on dH = H0 - reading.
# Reference reading: loaded from /others/h0.txt or set by calibration.
H0 = 0
# Last dH computed by the event logic; set to None on a wiring error.
dH_ = None
minus_H = -6 # dH at or below this level starts the recalibration timer.
TIME_LIDAR_ON = 30 # Seconds the level must persist before the sensor counts as activated.
TIME_LIDAR_OFF = 60 # Seconds before the sensor counts as deactivated.
TIME_RARE_EVENT = 60 # Activation is muted for 1 minute after a rare event.
# Number of readings combined (median) into H0 during calibration.
NUM_SAMPLES_CALIBRATION = 15
# Buffered readings needed before a median is taken in run().
NUM_SAMPLES_MEAN = 10
TIMEOUT_CALIBRATION = 60*60*24*1 # Automatic recalibration every 1 day (seconds).
# dH at or above this within 10 s of the event start is treated as a rare
# event (presumably centimetres, like min_H - confirm).
rare_height = 60
min_H = 10 # Minimum water column, in centimetres.
#---------------------------------------------------------------------------------#
mode_calibration = False
ERROR_WIRE = False
activate = False
FLAG_RARE_EVENT = False
timestamp_init = None
timestamp_fin = None
timestamp_calibrate = None
timestamp_rare_event = None
timestamp_init_calibration = 0 # 0 makes the calibration look expired until a stored timestamp is loaded.
#---------------------------------------------------------------------------------#
# NOTE(review): class-level list; run() appends to it before rebinding an
# instance attribute, so the first batch is shared between instances.
array_samples = list()
def __load_H0(self,):
    """Load the stored reference reading ``H0`` from ``/others/h0.txt``.

    The file holds one line of the form ``<calibration timestamp>|<H0>``.

    ``FLAG_CALIBRATION_LIDAR`` is defined on every exit path. It is True only
    when the file exists, parses, is not older than ``TIMEOUT_CALIBRATION``
    seconds and holds an ``H0`` that is not above 10000. In every other case
    (missing, unreadable or corrupt file included) it is False, so the caller
    can safely read the flag to decide whether a calibration is required.
    """
    path = "/others/h0.txt"
    # Pessimistic default: previously a read/parse error left the attribute
    # undefined and the flag check in __init__ raised AttributeError.
    self.FLAG_CALIBRATION_LIDAR = False
    try:
        now = datetime.now().timestamp()
        if not os.path.exists(path):
            # No stored calibration: H0 has to be measured.
            return
        with open(path, 'r') as file:
            fields = file.readline().split("|")
        # Parse both fields before touching the instance so a corrupt second
        # field cannot leave a new timestamp paired with a stale H0.
        stored_timestamp = float(fields[0])
        stored_h0 = float(fields[1].strip())
        self.timestamp_init_calibration = stored_timestamp
        self.H0 = stored_h0
        expired = now - stored_timestamp > self.TIMEOUT_CALIBRATION
        # "<= 10000" (rather than "not > 10000") also rejects a NaN H0.
        self.FLAG_CALIBRATION_LIDAR = (not expired) and stored_h0 <= 10000
    except (OSError, ValueError, IndexError) as e:
        print(f"Error producido al leer H0 del lidar. Copia del error {e}.")
def __calibration(self,value):
    """Add one reading to the calibration buffer and finish when it is full.

    Once ``NUM_SAMPLES_CALIBRATION`` readings are buffered, ``H0`` is set to
    their NaN-ignoring median, the result is written to ``/others/h0.txt`` as
    ``<timestamp>|<H0>``, the event state is reset and calibration mode ends.

    A failure to persist the file is reported but no longer keeps the sensor
    in calibration mode: the in-memory ``H0`` is valid either way.

    Args:
        value: Distance reading to add to the calibration buffer.
    """
    self.array_calibration.append(value)
    # ">=" instead of "==" so an over-filled buffer still completes.
    if len(self.array_calibration) < self.NUM_SAMPLES_CALIBRATION:
        return

    self.H0 = numpy.nanmedian(numpy.array(self.array_calibration))
    self.FLAG_CALIBRATION_LIDAR = True
    self.array_calibration = list()
    timestamp = datetime.now().timestamp()

    # Persist the calibration so it can be reloaded at the next start-up.
    path = "/others/h0.txt"
    try:
        if os.path.exists(path):
            try:
                os.remove(path)
            except OSError:
                # Best effort: opening with "w" below truncates the file anyway.
                pass
        with open(path,"w") as file:
            file.write(str(timestamp)+"|"+str(self.H0))
    except OSError as e:
        print(f"Error producido al guardar H0 del lidar. Copia del error {e}.")
    finally:
        gc.collect()

    # Restart the event logic from a clean state with the new reference.
    self.timestamp_init_calibration = timestamp
    self.timestamp_calibrate = None
    self.timestamp_init = None
    self.timestamp_fin = None
    self.timestamp_rare_event = None
    self.FLAG_RARE_EVENT = False
    self.activate = False
    self.mode_calibration = False
def __config(self,):
    """Create the LIDAR-Lite driver (``TYPE_V3HP``) on the file's ``i2c`` bus object."""
    lidar_type = adafruit_lidarlite.TYPE_V3HP
    self.sensor_lidar = adafruit_lidarlite.LIDARLite(i2c, sensor_type=lidar_type)
def __init__(self,name,key):
    """Set up the LIDAR driver and decide whether a calibration is needed.

    Args:
        name: Sensor name, forwarded to the base class.
        key: Sensor key, forwarded to the base class.
    """
    super().__init__(name,key)
    # Give each instance its own sample buffer. The class-level list would
    # otherwise be shared by every instance until run() first rebinds it.
    self.array_samples = list()
    self.__config()
    self.__load_H0()
    # Calibrate unless a valid stored calibration was flagged. A flag left
    # undefined by the loader is treated as "no calibration".
    self.mode_calibration = not getattr(self, "FLAG_CALIBRATION_LIDAR", False)
def run(self,):
# Take one distance reading and route it to calibration, fault handling or
# event detection.
# NOTE(review): indentation is missing in this copy of the file; the nesting
# described in the comments below is inferred from the if/elif/else order
# and must be confirmed against the original layout.
timestamp = datetime.now().timestamp()
# Seconds elapsed since the calibration timestamp currently in use.
diff = timestamp- self.timestamp_init_calibration
value = self.sensor_lidar.distance
if self.mode_calibration:
# Calibration mode: the reading only feeds the H0 estimate.
self.__calibration(value)
elif (diff> self.TIMEOUT_CALIBRATION):
# The calibration has expired: a new one is required.
self.mode_calibration = True
else:
if (value > 10000 and self.ERROR_WIRE == False):
# Original note (translated): LIDAR read error, possibly a wiring problem;
# the sensor is disabled until it is fixed manually.
# NOTE(review): the branch below clears ERROR_WIRE automatically as soon as
# a reading under 10000 arrives.
'''
Error de sensor lidar en la obtención de datos. Puede ser problema de cables
Se desabilita hasta que sea corregido manualmente.
'''
self.ERROR_WIRE = True
self.dH_ = None
# Overwrite the stored calibration with "0|0".
path = "/others/h0.txt"
with open(path,"w") as file:
# NOTE(review): this timestamp is not written to the file (the literal
# "0|0" is); it only rebinds the local variable.
timestamp = datetime.now().timestamp()
file.write(str(0)+"|"+str(0))
# NOTE(review): a reading of exactly 10000 matches neither comparison.
elif (value < 10000 ):
if(self.ERROR_WIRE):
# First valid reading after a fault: clear it and recalibrate.
self.ERROR_WIRE = False
self.mode_calibration = True
else:
# Buffer this reading plus a second one taken immediately.
self.array_samples.append(value)
self.array_samples.append(self.sensor_lidar.distance)
if(len(self.array_samples)>=self.NUM_SAMPLES_MEAN):
# Enough readings: reduce them to a NaN-ignoring median and start a new buffer.
value = numpy.nanmedian(numpy.array(self.array_samples))
self.array_samples = list()
# x_value / y_value / max_size are not defined in the visible part of this
# class - presumably provided by the base class; verify.
size = len(self.x_value)
if(size>self.max_size):
# Drop the oldest stored point.
self.x_value = self.x_value[1:]
self.y_value = self.y_value[1:]
self.x_value.append(timestamp)
self.y_value.append(value)
#---------------------- logic -------------------------#
# NOTE(review): presumably executed only when a median was just computed -
# confirm the nesting level of this call.
self.__logic(value)
def __logic(self,value):
# Event state machine driven by dH = H0 - value. It updates timestamp_init,
# timestamp_fin, timestamp_rare_event, timestamp_calibrate, FLAG_RARE_EVENT,
# activate and mode_calibration.
# NOTE(review): indentation is missing in this copy of the file; the nesting
# described in the comments below is inferred and must be confirmed.
timestamp = datetime.now().timestamp()
dH = self.H0 - value
self.dH_ = dH
# Original note (translated): dH > 0 means the system detected an event;
# dH < 0 means the ground/reference level changed, so a recalibration is needed.
'''
Si dH>0, entonces el sistema ha detectado evento
Si dH<0, hay un desnivel en el suelo o referencia por lo que es necesario volver a calibrar
'''
if dH>= self.min_H and self.timestamp_init == None:
# The event starts.
'''
Comienza el evento
'''
self.timestamp_init = timestamp
elif dH<self.min_H and dH>=0 :
# Level below the event threshold but not negative.
if self.timestamp_init != None:
diff_timestamp = datetime.now().timestamp() - self.timestamp_init # How long the signal has been active.
if self.timestamp_init != None and dH>1 and diff_timestamp<15:
# Original note (translated): small variations are tolerated at the start
# of the event (here: dH > 1 during the first 15 seconds).
'''
Al inicio del evento se puede considerar pequeñas variaciones
'''
pass
elif self.timestamp_init !=None and self.timestamp_fin == None:
# Original note (translated): the alert signal has been active for more
# than 15 seconds. This marks the moment the level dropped.
'''
El tiempo que lleva activado la señal de alerta es más de 15 segundos.
'''
self.timestamp_fin = timestamp
elif self.timestamp_init!=None and self.timestamp_fin !=None:
# Original note (translated): when the height difference is below the
# emergency level the alert is switched off, but only after some time.
'''
En caso la diferencia de altura es menor a lo establecido como emergencia,
entonces se desactiva la señal de alerta pero ya pasado un tiempo.
'''
if (timestamp - self.timestamp_fin>=self.TIME_LIDAR_OFF):
self.timestamp_fin = None
self.timestamp_init = None
self.timestamp_rare_event = None
self.activate = False
elif dH>= self.min_H and self.timestamp_init != None:
# The change must hold for at least 30 seconds (TIME_LIDAR_ON).
# Original note: configurable in the vars.json file (not visible here).
timestamp = datetime.now().timestamp()
diff_timestamp = timestamp - self.timestamp_init
if dH>= self.rare_height and diff_timestamp<=10:
# Original note (translated): the height rose quickly, in less than 10
# seconds; this may be caused by maintenance work or by living beings in
# the channel.
'''
En este caso consideramos que la altura aumentó rapidamente en menos de 10 segundos.
Este suceso puede ser producido por un mantenimiento o por seres vivos en el cauce.
'''
self.FLAG_RARE_EVENT = True
self.timestamp_rare_event = timestamp
self.timestamp_init = None
self.timestamp_fin = None
self.activate = False
elif diff_timestamp >= self.TIME_LIDAR_ON:
# Original note (translated): the activation time was exceeded, so the
# activation is validated.
'''
El tiempo de activación fue superado, por lo que se validará la activación.
'''
if(self.timestamp_rare_event == None):
self.timestamp_rare_event = 0
diff = timestamp - self.timestamp_rare_event
if (diff>self.TIME_RARE_EVENT):
# The mute period after a rare event is over.
self.timestamp_rare_event = 0
self.FLAG_RARE_EVENT = False
if not self.FLAG_RARE_EVENT:
self.activate = True
# NOTE(review): cannot tell from this copy whether the reset below belongs
# inside the "if not self.FLAG_RARE_EVENT" block - confirm.
self.timestamp_rare_event = None
elif dH <=self.minus_H and self.timestamp_calibrate == None:
# Original note (translated): check whether a calibration is necessary.
# Starts the recalibration timer.
'''
Verificamos si es necesario realizar una calibración.
'''
self.timestamp_calibrate = timestamp
elif dH <=self.minus_H and self.timestamp_calibrate != None:
# Original note (translated, truncated in the source): if the time required
# for a new calibration is exceeded, calibration is selected.
'''
Si se sobrepasa el tiempo necesario para una nueva calibración,
entonces se seleccion
'''
diff = timestamp - self.timestamp_calibrate
if diff >= 30:
self.mode_calibration = True
else:
# Original note (translated): the only case, when the lidar reports dH = 0.
# NOTE(review): dH == 0 is already handled by the "dH>=0" branch above; with
# the class defaults this branch is reached for minus_H < dH < 0 (or when
# every comparison is false, e.g. a NaN dH).
'''
En el unico caso que el lidar marque dH=0.
'''
self.timestamp_calibrate = None
self.timestamp_init = None
self.timestamp_fin = None
self.timestamp_rare_event= None
gc.collect()