IdentificacionIA/reconocimiento2.py

444 lines
18 KiB
Python
Raw Normal View History

2026-03-18 17:45:30 +00:00
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
import cv2
import numpy as np
from deepface import DeepFace
import pickle
import time
import threading
import asyncio
import edge_tts
import subprocess
from datetime import datetime
import warnings
import urllib.request
warnings.filterwarnings("ignore")
# ──────────────────────────────────────────────────────────────────────────────
# CONFIGURACIÓN
# ──────────────────────────────────────────────────────────────────────────────
DB_PATH = "db_institucion"
CACHE_PATH = "cache_nombres"
VECTORS_FILE = "representaciones.pkl"
TIMESTAMPS_FILE = "representaciones_timestamps.pkl"
UMBRAL_SIM = 0.50 # Por encima → identificado. Por debajo → desconocido.
COOLDOWN_TIME = 15 # Segundos entre saludos
USUARIO, PASSWORD, IP_DVR = "admin", "TCA200503", "192.168.1.65"
RTSP_URL = f"rtsp://{USUARIO}:{PASSWORD}@{IP_DVR}:554/Streaming/Channels/702"
for path in [DB_PATH, CACHE_PATH]:
os.makedirs(path, exist_ok=True)
# ──────────────────────────────────────────────────────────────────────────────
# YUNET — Detector facial rápido en CPU
# ──────────────────────────────────────────────────────────────────────────────
YUNET_MODEL_PATH = "face_detection_yunet_2023mar.onnx"
if not os.path.exists(YUNET_MODEL_PATH):
print(f"Descargando YuNet ({YUNET_MODEL_PATH})...")
url = ("https://github.com/opencv/opencv_zoo/raw/main/models/"
"face_detection_yunet/face_detection_yunet_2023mar.onnx")
urllib.request.urlretrieve(url, YUNET_MODEL_PATH)
print("YuNet descargado.")
# Detector estricto para ROIs grandes (persona cerca)
# score_threshold alto → menos falsos positivos en fondos
detector_yunet = cv2.FaceDetectorYN.create(
model=YUNET_MODEL_PATH, config="",
input_size=(320, 320),
score_threshold=0.70,
nms_threshold=0.3,
top_k=5000
)
# Detector permisivo para ROIs pequeños (persona lejos)
# score_threshold bajo → no perdemos caras pequeñas o a contraluz
detector_yunet_lejano = cv2.FaceDetectorYN.create(
model=YUNET_MODEL_PATH, config="",
input_size=(320, 320),
score_threshold=0.45,
nms_threshold=0.3,
top_k=5000
)
def detectar_rostros_yunet(roi, lock=None):
"""
Elige automáticamente el detector según el tamaño del ROI.
ROI grande detector estricto (evita falsos positivos).
ROI pequeño detector permisivo (no pierde caras lejanas).
Devuelve lista de (x, y, w, h) o lista vacía.
"""
h_roi, w_roi = roi.shape[:2]
area = w_roi * h_roi
det = detector_yunet if area > 8000 else detector_yunet_lejano
try:
if lock:
with lock:
det.setInputSize((w_roi, h_roi))
_, faces = det.detect(roi)
else:
det.setInputSize((w_roi, h_roi))
_, faces = det.detect(roi)
except Exception:
return []
if faces is None:
return []
resultado = []
for face in faces:
try:
fx, fy, fw, fh = map(int, face[:4])
score = float(face[14]) if len(face) > 14 else 1.0
resultado.append((fx, fy, fw, fh, score))
except (ValueError, OverflowError, TypeError):
continue
return resultado
# ──────────────────────────────────────────────────────────────────────────────
# SISTEMA DE AUDIO
# ──────────────────────────────────────────────────────────────────────────────
def obtener_audios_humanos(genero):
hora = datetime.now().hour
es_mujer = genero.lower() == 'woman'
suffix = "_m.mp3" if es_mujer else "_h.mp3"
if 5 <= hora < 12:
intro = "dias.mp3"
elif 12 <= hora < 19:
intro = "tarde.mp3"
else:
intro = "noches.mp3"
cierre = ("fin_noche" if (hora >= 19 or hora < 5) else "fin_dia") + suffix
return intro, cierre
async def sintetizar_nombre(nombre, ruta):
"""Genera el audio del nombre con edge-tts (solo si no existe en caché)."""
nombre_limpio = nombre.replace('_', ' ')
try:
comunicador = edge_tts.Communicate(nombre_limpio, "es-MX-DaliaNeural", rate="+10%")
await comunicador.save(ruta)
except Exception:
pass
def reproducir(archivo):
"""Lanza mpv en segundo plano sin bloquear."""
if os.path.exists(archivo):
subprocess.Popen(
["mpv", "--no-video", "--volume=100", archivo],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
# mpv encadena los 3 archivos en una sola llamada
def hilo_bienvenida(nombre, genero):
"""
Reproduce intro + nombre + cierre sin time.sleep().
mpv los reproduce en orden nativamente el hilo queda libre de inmediato.
"""
archivo_nombre = os.path.join(CACHE_PATH, f"nombre_{nombre}.mp3")
# Sintetizar solo si no estaba en caché
if not os.path.exists(archivo_nombre):
try:
asyncio.run(sintetizar_nombre(nombre, archivo_nombre))
except Exception:
pass
intro, cierre = obtener_audios_humanos(genero)
# Una sola llamada a mpv con los 3 archivos en secuencia
archivos = [f for f in [intro, archivo_nombre, cierre] if os.path.exists(f)]
if archivos:
subprocess.Popen(
["mpv", "--no-video", "--volume=100"] + archivos,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
# ──────────────────────────────────────────────────────────────────────────────
# GESTIÓN DE BASE DE DATOS
# Incremental: solo procesa fotos nuevas o modificadas
# Vectores normalizados al guardar → comparación 3x más rápida
# ──────────────────────────────────────────────────────────────────────────────
def gestionar_vectores(actualizar=False):
"""
Carga o actualiza el diccionario {nombre: vector_normalizado}.
- Si actualizar=False y el .pkl existe carga directamente (instantáneo).
- Si actualizar=True solo reprocesa fotos nuevas o modificadas (incremental).
- Los vectores se guardan NORMALIZADOS la comparación es un simple np.dot().
"""
vectores_actuales = {}
# Intentar cargar lo que ya teníamos
if os.path.exists(VECTORS_FILE):
try:
with open(VECTORS_FILE, 'rb') as f:
vectores_actuales = pickle.load(f)
except Exception:
vectores_actuales = {}
if not actualizar:
return vectores_actuales
# ── Cargar timestamps para detectar cambios ───────────────────────────────
timestamps = {}
if os.path.exists(TIMESTAMPS_FILE):
try:
with open(TIMESTAMPS_FILE, 'rb') as f:
timestamps = pickle.load(f)
except Exception:
timestamps = {}
print("\nACTUALIZANDO BASE DE DATOS (solo fotos nuevas o modificadas)...")
imagenes = [f for f in os.listdir(DB_PATH) if f.lower().endswith(('.jpg', '.png'))]
nombres_en_disco = set()
hubo_cambios = False
for archivo in imagenes:
nombre_archivo = os.path.splitext(archivo)[0]
ruta_img = os.path.join(DB_PATH, archivo)
nombres_en_disco.add(nombre_archivo)
ts_actual = os.path.getmtime(ruta_img)
ts_guardado = timestamps.get(nombre_archivo, 0)
# Si ya la teníamos y no cambió → saltar (no llamar a ArcFace)
if nombre_archivo in vectores_actuales and ts_actual == ts_guardado:
continue
try:
res = DeepFace.represent(
img_path=ruta_img, model_name="ArcFace", enforce_detection=True
)
emb = np.array(res[0]["embedding"], dtype=np.float32)
# MEJORA 2: Normalizar al guardar (una sola vez para siempre)
norma = np.linalg.norm(emb)
if norma > 0:
emb = emb / norma
vectores_actuales[nombre_archivo] = emb
timestamps[nombre_archivo] = ts_actual
hubo_cambios = True
print(f" Procesado: {nombre_archivo}")
except Exception:
print(f" Rostro no válido en '{archivo}', omitido.")
# Eliminar personas cuya foto fue borrada del disco
for nombre in list(vectores_actuales.keys()):
if nombre not in nombres_en_disco:
del vectores_actuales[nombre]
timestamps.pop(nombre, None)
hubo_cambios = True
print(f" Eliminado (sin foto): {nombre}")
if hubo_cambios:
with open(VECTORS_FILE, 'wb') as f:
pickle.dump(vectores_actuales, f)
with open(TIMESTAMPS_FILE, 'wb') as f:
pickle.dump(timestamps, f)
print(" Sincronización terminada.\n")
else:
print(" Sin cambios. Base de datos al día.\n")
return vectores_actuales
def buscar_mejor_match(emb_consulta, base_datos):
"""
Compara un embedding (ya normalizado) contra la base de datos.
MEJORA 2: Solo producto punto no hay divisiones por norma en el loop.
Devuelve (mejor_nombre, similitud_maxima).
"""
# Normalizar el embedding de consulta
norma = np.linalg.norm(emb_consulta)
if norma > 0:
emb_consulta = emb_consulta / norma
mejor_match, max_sim = None, -1.0
for nombre, vec in base_datos.items():
# vec ya está normalizado → sim coseno = producto punto puro
sim = float(np.dot(emb_consulta, vec))
if sim > max_sim:
max_sim, mejor_match = sim, nombre
return mejor_match, max_sim
# ──────────────────────────────────────────────────────────────────────────────
# LOOP DE PRUEBA Y REGISTRO
# MEJORA 5 — Reemplaza face_cascade por YuNet (consistente con principal2.py)
# ──────────────────────────────────────────────────────────────────────────────
def sistema_interactivo():
"""
Ventana de depuración y registro de nuevas personas.
Muestra barra de similitud en consola y en pantalla.
Controles: [R] Registrar | [Q] Salir
"""
base_datos = gestionar_vectores(actualizar=False)
cap = cv2.VideoCapture(RTSP_URL)
ultimo_saludo = 0
persona_actual = None
confirmaciones = 0
print("\n" + "=" * 50)
print(" MÓDULO DE REGISTRO Y DEPURACIÓN")
print(" [R] Registrar nuevo rostro | [Q] Salir")
print("=" * 50 + "\n")
faces_ultimo_frame = [] # Para que [R] pueda usarlas aunque no haya detección nueva
while True:
ret, frame = cap.read()
if not ret:
time.sleep(2)
cap.open(RTSP_URL)
continue
h, w = frame.shape[:2]
display_frame = frame.copy()
tiempo_actual = time.time()
# ── Detección con YuNet sobre el frame completo ───────────────────────
faces_raw = detectar_rostros_yunet(frame)
faces_ultimo_frame = faces_raw # Guardamos para usar con [R]
for (fx, fy, fw, fh, score_yunet) in faces_raw:
# Clampear dentro de la imagen
fx = max(0, fx); fy = max(0, fy)
fw = min(w - fx, fw); fh = min(h - fy, fh)
if fw <= 0 or fh <= 0:
continue
# Rectángulo base (amarillo)
cv2.rectangle(display_frame, (fx, fy), (fx+fw, fy+fh), (255, 200, 0), 2)
cv2.putText(display_frame, f"YN:{score_yunet:.2f}",
(fx, fy - 25), cv2.FONT_HERSHEY_SIMPLEX, 0.45, (255, 200, 0), 1)
if (tiempo_actual - ultimo_saludo) <= COOLDOWN_TIME:
continue # En cooldown, no procesamos
# ── Recorte del rostro con margen ─────────────────────────────────
m = int(fw * 0.15)
roi = frame[max(0, fy-m): min(h, fy+fh+m),
max(0, fx-m): min(w, fx+fw+m)]
if roi.size == 0 or roi.shape[0] < 40 or roi.shape[1] < 40:
cv2.putText(display_frame, "muy pequeño",
(fx, fy-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 100, 255), 1)
continue
# ── Filtro de nitidez (descarta caras movidas antes de ArcFace) ───
gray_roi = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
nitidez = cv2.Laplacian(gray_roi, cv2.CV_64F).var()
if nitidez < 50.0:
cv2.putText(display_frame, f"blur({nitidez:.0f})",
(fx, fy-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 165, 255), 1)
continue
# ── ArcFace ───────────────────────────────────────────────────────
try:
res = DeepFace.represent(
img_path=roi, model_name="ArcFace", enforce_detection=False
)
emb = np.array(res[0]["embedding"], dtype=np.float32)
mejor_match, max_sim = buscar_mejor_match(emb, base_datos)
except Exception as e:
print(f"[ERROR ArcFace]: {e}")
continue
# ── Barra de similitud en consola ─────────────────────────────────
estado = " IDENTIFICADO" if max_sim > UMBRAL_SIM else "DESCONOCIDO"
nombre_d = mejor_match.split('_')[0] if mejor_match else "nadie"
n_bloques = int(max_sim * 20)
barra = "" * n_bloques + "" * (20 - n_bloques)
print(f"[REGISTRO] {estado} | {nombre_d:<14} | {barra} | "
f"{max_sim*100:.1f}% (umbral: {UMBRAL_SIM*100:.0f}%)")
# ── Resultado en pantalla ─────────────────────────────────────────
if max_sim > UMBRAL_SIM and mejor_match:
color = (0, 255, 0)
texto = f"{mejor_match.split('_')[0]} ({max_sim:.2f})"
if mejor_match == persona_actual:
confirmaciones += 1
else:
persona_actual, confirmaciones = mejor_match, 1
if confirmaciones >= 2:
cv2.rectangle(display_frame, (fx, fy), (fx+fw, fy+fh), (0, 255, 0), 3)
try:
analisis = DeepFace.analyze(
roi, actions=['gender'], enforce_detection=False
)[0]
genero = analisis['dominant_gender']
except Exception:
genero = "Man"
threading.Thread(
target=hilo_bienvenida,
args=(mejor_match, genero),
daemon=True
).start()
ultimo_saludo = tiempo_actual
confirmaciones = 0
else:
color = (0, 0, 255)
texto = f"? ({max_sim:.2f})"
confirmaciones = max(0, confirmaciones - 1)
cv2.putText(display_frame, texto,
(fx, fy - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
# ── Mostrar frame ─────────────────────────────────────────────────────
cv2.imshow("Módulo de Registro", display_frame)
key = cv2.waitKey(1) & 0xFF
if key == ord('q'):
break
#R: Registrar el rostro más grande del frame ─────────────────
elif key == ord('r'):
if faces_ultimo_frame:
# Elegimos la cara más grande (más cercana)
areas = [fw * fh for (fx, fy, fw, fh, _) in faces_ultimo_frame]
fx, fy, fw, fh, _ = faces_ultimo_frame[np.argmax(areas)]
m = 25
face_roi = frame[max(0, fy-m): min(h, fy+fh+m),
max(0, fx-m): min(w, fx+fw+m)]
if face_roi.size > 0:
nom = input("\nNombre de la persona: ").strip()
if nom:
foto_path = os.path.join(DB_PATH, f"{nom}.jpg")
cv2.imwrite(foto_path, face_roi)
print(f"[OK] Rostro de '{nom}' guardado. Sincronizando...")
base_datos = gestionar_vectores(actualizar=True)
else:
print("[!] Registro cancelado.")
else:
print("[!] Recorte vacío. Intenta de nuevo.")
else:
print("\n[!] No se detectó rostro. Acércate más o mira a la lente.")
cap.release()
cv2.destroyAllWindows()
# ──────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
sistema_interactivo()