IdentificacionIA/reconocimiento2.py

466 lines
19 KiB
Python

import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
import cv2
import numpy as np
from deepface import DeepFace
import pickle
import time
import threading
import asyncio
import edge_tts
import subprocess
from datetime import datetime
import warnings
import urllib.request
warnings.filterwarnings("ignore")
# ──────────────────────────────────────────────────────────────────────────────
# CONFIGURACIÓN
# ──────────────────────────────────────────────────────────────────────────────
DB_PATH = "db_institucion"
CACHE_PATH = "cache_nombres"
VECTORS_FILE = "base_datos_rostros.pkl"
TIMESTAMPS_FILE = "representaciones_timestamps.pkl"
UMBRAL_SIM = 0.39 # Por encima → identificado. Por debajo → desconocido.
COOLDOWN_TIME = 15 # Segundos entre saludos
USUARIO, PASSWORD, IP_DVR = "admin", "TCA200503", "192.168.1.244"
RTSP_URL = f"rtsp://{USUARIO}:{PASSWORD}@{IP_DVR}:554/Streaming/Channels/702"
for path in [DB_PATH, CACHE_PATH]:
os.makedirs(path, exist_ok=True)
# ──────────────────────────────────────────────────────────────────────────────
# YUNET — Detector facial rápido en CPU
# ──────────────────────────────────────────────────────────────────────────────
YUNET_MODEL_PATH = "face_detection_yunet_2023mar.onnx"
if not os.path.exists(YUNET_MODEL_PATH):
print(f"Descargando YuNet ({YUNET_MODEL_PATH})...")
url = ("https://github.com/opencv/opencv_zoo/raw/main/models/"
"face_detection_yunet/face_detection_yunet_2023mar.onnx")
urllib.request.urlretrieve(url, YUNET_MODEL_PATH)
print("YuNet descargado.")
# Detector estricto para ROIs grandes (persona cerca)
detector_yunet = cv2.FaceDetectorYN.create(
model=YUNET_MODEL_PATH, config="",
input_size=(320, 320),
score_threshold=0.70,
nms_threshold=0.3,
top_k=5000
)
# Detector permisivo para ROIs pequeños (persona lejos)
detector_yunet_lejano = cv2.FaceDetectorYN.create(
model=YUNET_MODEL_PATH, config="",
input_size=(320, 320),
score_threshold=0.45,
nms_threshold=0.3,
top_k=5000
)
def detectar_rostros_yunet(roi, lock=None):
"""
Elige automáticamente el detector según el tamaño del ROI.
"""
h_roi, w_roi = roi.shape[:2]
area = w_roi * h_roi
det = detector_yunet if area > 8000 else detector_yunet_lejano
try:
if lock:
with lock:
det.setInputSize((w_roi, h_roi))
_, faces = det.detect(roi)
else:
det.setInputSize((w_roi, h_roi))
_, faces = det.detect(roi)
except Exception:
return []
if faces is None:
return []
resultado = []
for face in faces:
try:
fx, fy, fw, fh = map(int, face[:4])
score = float(face[14]) if len(face) > 14 else 1.0
resultado.append((fx, fy, fw, fh, score))
except (ValueError, OverflowError, TypeError):
continue
return resultado
# ──────────────────────────────────────────────────────────────────────────────
# SISTEMA DE AUDIO
# ──────────────────────────────────────────────────────────────────────────────
def obtener_audios_humanos(genero):
hora = datetime.now().hour
es_mujer = genero.lower() == 'woman'
suffix = "_m.mp3" if es_mujer else "_h.mp3"
if 5 <= hora < 12:
intro = "dias.mp3"
elif 12 <= hora < 19:
intro = "tarde.mp3"
else:
intro = "noches.mp3"
cierre = ("fin_noche" if (hora >= 19 or hora < 5) else "fin_dia") + suffix
return intro, cierre
async def sintetizar_nombre(nombre, ruta):
nombre_limpio = nombre.replace('_', ' ')
try:
comunicador = edge_tts.Communicate(nombre_limpio, "es-MX-DaliaNeural", rate="+10%")
await comunicador.save(ruta)
except Exception:
pass
def reproducir(archivo):
if os.path.exists(archivo):
subprocess.Popen(
["mpv", "--no-video", "--volume=100", archivo],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
def hilo_bienvenida(nombre, genero):
archivo_nombre = os.path.join(CACHE_PATH, f"nombre_{nombre}.mp3")
if not os.path.exists(archivo_nombre):
try:
asyncio.run(sintetizar_nombre(nombre, archivo_nombre))
except Exception:
pass
intro, cierre = obtener_audios_humanos(genero)
archivos = [f for f in [intro, archivo_nombre, cierre] if os.path.exists(f)]
if archivos:
subprocess.Popen(
["mpv", "--no-video", "--volume=100"] + archivos,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
# ──────────────────────────────────────────────────────────────────────────────
# GESTIÓN DE BASE DE DATOS (AHORA CON RETINAFACE Y ALINEACIÓN)
# ──────────────────────────────────────────────────────────────────────────────
def gestionar_vectores(actualizar=False):
import json # ⚡ Asegúrate de tener importado json
vectores_actuales = {}
if os.path.exists(VECTORS_FILE):
try:
with open(VECTORS_FILE, 'rb') as f:
vectores_actuales = pickle.load(f)
except Exception:
vectores_actuales = {}
if not actualizar:
return vectores_actuales
timestamps = {}
if os.path.exists(TIMESTAMPS_FILE):
try:
with open(TIMESTAMPS_FILE, 'rb') as f:
timestamps = pickle.load(f)
except Exception:
timestamps = {}
# ──────────────────────────────────────────────────────────
# CARGA DEL CACHÉ DE GÉNEROS
# ──────────────────────────────────────────────────────────
ruta_generos = os.path.join(CACHE_PATH, "generos.json")
dic_generos = {}
if os.path.exists(ruta_generos):
try:
with open(ruta_generos, 'r') as f:
dic_generos = json.load(f)
except Exception:
pass
print("\nACTUALIZANDO BASE DE DATOS (Alineación y Caché de Géneros)...")
imagenes = [f for f in os.listdir(DB_PATH) if f.lower().endswith(('.jpg', '.png'))]
nombres_en_disco = set()
hubo_cambios = False
cambio_generos = False # Bandera para saber si actualizamos el JSON
for archivo in imagenes:
nombre_archivo = os.path.splitext(archivo)[0]
ruta_img = os.path.join(DB_PATH, archivo)
nombres_en_disco.add(nombre_archivo)
ts_actual = os.path.getmtime(ruta_img)
ts_guardado = timestamps.get(nombre_archivo, 0)
# Si ya tenemos el vector pero NO tenemos su género en el JSON, forzamos el procesamiento
falta_genero = nombre_archivo not in dic_generos
if nombre_archivo in vectores_actuales and ts_actual == ts_guardado and not falta_genero:
continue
try:
img_db = cv2.imread(ruta_img)
lab = cv2.cvtColor(img_db, cv2.COLOR_BGR2LAB)
l, a, b = cv2.split(lab)
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
l = clahe.apply(l)
img_mejorada = cv2.cvtColor(cv2.merge((l, a, b)), cv2.COLOR_LAB2BGR)
# IA DE GÉNERO (Solo se ejecuta 1 vez por persona en toda la vida del sistema)
if falta_genero:
try:
analisis = DeepFace.analyze(img_mejorada, actions=['gender'], enforce_detection=False)[0]
dic_generos[nombre_archivo] = analisis.get('dominant_gender', 'Man')
except Exception:
dic_generos[nombre_archivo] = "Man" # Respaldo
cambio_generos = True
# Extraemos el vector
res = DeepFace.represent(
img_path=img_mejorada,
model_name="ArcFace",
detector_backend="mtcnn",
align=True,
enforce_detection=True
)
emb = np.array(res[0]["embedding"], dtype=np.float32)
norma = np.linalg.norm(emb)
if norma > 0:
emb = emb / norma
vectores_actuales[nombre_archivo] = emb
timestamps[nombre_archivo] = ts_actual
hubo_cambios = True
print(f" Procesado y alineado: {nombre_archivo} | Género: {dic_generos.get(nombre_archivo)}")
except Exception as e:
print(f" Rostro no válido en '{archivo}', omitido. Error: {e}")
# Limpieza de eliminados
for nombre in list(vectores_actuales.keys()):
if nombre not in nombres_en_disco:
del vectores_actuales[nombre]
timestamps.pop(nombre, None)
if nombre in dic_generos:
del dic_generos[nombre]
cambio_generos = True
hubo_cambios = True
print(f" Eliminado (sin foto): {nombre}")
# Guardado de la memoria
if hubo_cambios:
with open(VECTORS_FILE, 'wb') as f:
pickle.dump(vectores_actuales, f)
with open(TIMESTAMPS_FILE, 'wb') as f:
pickle.dump(timestamps, f)
# Guardado del JSON de géneros si hubo descubrimientos nuevos
if cambio_generos:
with open(ruta_generos, 'w') as f:
json.dump(dic_generos, f)
if hubo_cambios or cambio_generos:
print(" Sincronización terminada.\n")
else:
print(" Sin cambios. Base de datos al día.\n")
return vectores_actuales
# ──────────────────────────────────────────────────────────────────────────────
# BÚSQUEDA BLINDADA (Similitud Coseno estricta)
# ──────────────────────────────────────────────────────────────────────────────
def buscar_mejor_match(emb_consulta, base_datos):
# ⚡ MAGIA 3: Normalización L2 del vector entrante
norma = np.linalg.norm(emb_consulta)
if norma > 0:
emb_consulta = emb_consulta / norma
mejor_match, max_sim = None, -1.0
for nombre, vec in base_datos.items():
# Como ambos están normalizados, esto es Similitud Coseno pura (-1.0 a 1.0)
sim = float(np.dot(emb_consulta, vec))
if sim > max_sim:
max_sim = sim
mejor_match = nombre
return mejor_match, max_sim
# ──────────────────────────────────────────────────────────────────────────────
# LOOP DE PRUEBA Y REGISTRO (CON SIMETRÍA ESTRICTA)
# ──────────────────────────────────────────────────────────────────────────────
def sistema_interactivo():
base_datos = gestionar_vectores(actualizar=False)
cap = cv2.VideoCapture(RTSP_URL)
ultimo_saludo = 0
persona_actual = None
confirmaciones = 0
print("\n" + "=" * 50)
print(" MÓDULO DE REGISTRO Y DEPURACIÓN ESTRICTO")
print(" [R] Registrar nuevo rostro | [Q] Salir")
print("=" * 50 + "\n")
faces_ultimo_frame = []
while True:
ret, frame = cap.read()
if not ret:
time.sleep(2)
cap.open(RTSP_URL)
continue
h, w = frame.shape[:2]
display_frame = frame.copy()
tiempo_actual = time.time()
faces_raw = detectar_rostros_yunet(frame)
faces_ultimo_frame = faces_raw
for (fx, fy, fw, fh, score_yunet) in faces_raw:
fx = max(0, fx); fy = max(0, fy)
fw = min(w - fx, fw); fh = min(h - fy, fh)
if fw <= 0 or fh <= 0:
continue
cv2.rectangle(display_frame, (fx, fy), (fx+fw, fy+fh), (255, 200, 0), 2)
cv2.putText(display_frame, f"YN:{score_yunet:.2f}",
(fx, fy - 25), cv2.FONT_HERSHEY_SIMPLEX, 0.45, (255, 200, 0), 1)
if (tiempo_actual - ultimo_saludo) <= COOLDOWN_TIME:
continue
m = int(fw * 0.15)
roi = frame[max(0, fy-m): min(h, fy+fh+m),
max(0, fx-m): min(w, fx+fw+m)]
# 🛡️ FILTRO DE TAMAÑO FÍSICO
if roi.size == 0 or roi.shape[0] < 40 or roi.shape[1] < 40:
cv2.putText(display_frame, "muy pequeno",
(fx, fy-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 100, 255), 1)
continue
# 🛡️ FILTRO DE NITIDEZ
gray_roi = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
nitidez = cv2.Laplacian(gray_roi, cv2.CV_64F).var()
if nitidez < 50.0:
cv2.putText(display_frame, f"blur({nitidez:.0f})",
(fx, fy-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 165, 255), 1)
continue
# 🌙 SIMETRÍA 1: VISIÓN NOCTURNA (CLAHE) AL VIDEO EN VIVO
try:
lab = cv2.cvtColor(roi, cv2.COLOR_BGR2LAB)
l, a, b = cv2.split(lab)
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
l = clahe.apply(l)
roi_mejorado = cv2.cvtColor(cv2.merge((l, a, b)), cv2.COLOR_LAB2BGR)
except Exception:
roi_mejorado = roi # Respaldo de seguridad
# 🧠 SIMETRÍA 2: MOTOR MTCNN Y ALINEACIÓN (Igual que la Base de Datos)
try:
res = DeepFace.represent(
img_path=roi_mejorado,
model_name="ArcFace",
detector_backend="mtcnn", # El mismo que en gestionar_vectores
align=True, # Enderezamos la cara
enforce_detection=True # Si MTCNN no ve cara clara, aborta
)
emb = np.array(res[0]["embedding"], dtype=np.float32)
mejor_match, max_sim = buscar_mejor_match(emb, base_datos)
except Exception:
# MTCNN abortó porque la cara estaba de perfil, tapada o no era una cara
cv2.putText(display_frame, "MTCNN Ignorado",
(fx, fy-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)
continue
estado = " IDENTIFICADO" if max_sim > UMBRAL_SIM else "DESCONOCIDO"
nombre_d = mejor_match.split('_')[0] if mejor_match else "nadie"
n_bloques = int(max_sim * 20)
barra = "" * n_bloques + "" * (20 - n_bloques)
print(f"[REGISTRO] {estado} | {nombre_d:<14} | {barra} | "
f"{max_sim*100:.1f}% (umbral: {UMBRAL_SIM*100:.0f}%)")
if max_sim > UMBRAL_SIM and mejor_match:
color = (0, 255, 0)
texto = f"{mejor_match.split('_')[0]} ({max_sim:.2f})"
if mejor_match == persona_actual:
confirmaciones += 1
else:
persona_actual, confirmaciones = mejor_match, 1
if confirmaciones >= 2:
cv2.rectangle(display_frame, (fx, fy), (fx+fw, fy+fh), (0, 255, 0), 3)
try:
analisis = DeepFace.analyze(
roi_mejorado, actions=['gender'], enforce_detection=False
)[0]
genero = analisis['dominant_gender']
except Exception:
genero = "Man"
threading.Thread(
target=hilo_bienvenida,
args=(mejor_match, genero),
daemon=True
).start()
ultimo_saludo = tiempo_actual
confirmaciones = 0
else:
color = (0, 0, 255)
texto = f"? ({max_sim:.2f})"
confirmaciones = max(0, confirmaciones - 1)
cv2.putText(display_frame, texto,
(fx, fy - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
cv2.imshow("Módulo de Registro", display_frame)
key = cv2.waitKey(1) & 0xFF
if key == ord('q'):
break
elif key == ord('r'):
if faces_ultimo_frame:
areas = [fw * fh for (fx, fy, fw, fh, _) in faces_ultimo_frame]
fx, fy, fw, fh, _ = faces_ultimo_frame[np.argmax(areas)]
m_x = int(fw * 0.30)
m_y = int(fh * 0.30)
face_roi = frame[max(0, fy-m_y): min(h, fy+fh+m_y),
max(0, fx-m_x): min(w, fx+fw+m_x)]
if face_roi.size > 0:
nom = input("\nNombre de la persona: ").strip()
if nom:
foto_path = os.path.join(DB_PATH, f"{nom}.jpg")
cv2.imwrite(foto_path, face_roi)
print(f"[OK] Rostro de '{nom}' guardado. Sincronizando...")
base_datos = gestionar_vectores(actualizar=True)
else:
print("[!] Registro cancelado.")
else:
print("[!] Recorte vacío. Intenta de nuevo.")
else:
print("\n[!] No se detectó rostro. Acércate más o mira a la lente.")
cap.release()
cv2.destroyAllWindows()
if __name__ == "__main__":
sistema_interactivo()