IdentificacionIA/reconocimiento.py

378 lines
16 KiB
Python
Raw Normal View History

import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
# ⚡ NUEVOS CANDADOS: Silenciamos la burocracia de OpenCV y Qt
os.environ['QT_LOGGING_RULES'] = '*=false'
os.environ['OPENCV_LOG_LEVEL'] = 'ERROR'
import cv2
import numpy as np
import pickle
import time
import threading
import asyncio
import edge_tts
import subprocess
from datetime import datetime
import warnings
import json
# ⚡ IMPORTACIÓN DE INSIGHTFACE
from insightface.app import FaceAnalysis
warnings.filterwarnings("ignore")
# ──────────────────────────────────────────────────────────────────────────────
# CONFIGURACIÓN
# ──────────────────────────────────────────────────────────────────────────────
DB_PATH = "db_institucion"
CACHE_PATH = "cache_nombres"
VECTORS_FILE = "base_datos_rostros.pkl"
TIMESTAMPS_FILE = "representaciones_timestamps.pkl"
UMBRAL_SIM = 0.45 # ⚡ Ajustado a la exigencia de producción
COOLDOWN_TIME = 15 # Segundos entre saludos
USUARIO, PASSWORD, IP_DVR = "admin", "TCA200503", "192.168.1.200"
RTSP_URL = f"rtsp://{USUARIO}:{PASSWORD}@{IP_DVR}:554/Streaming/Channels/702"
for path in [DB_PATH, CACHE_PATH]:
os.makedirs(path, exist_ok=True)
# ──────────────────────────────────────────────────────────────────────────────
# EL NUEVO CEREBRO: INSIGHTFACE (buffalo_l)
# ──────────────────────────────────────────────────────────────────────────────
print("Cargando motor InsightFace (buffalo_l)...")
# Carga detección (SCRFD), landmarks, reconocimiento (ArcFace) y atributos (género/edad)
app = FaceAnalysis(name='buffalo_l', providers=['CPUExecutionProvider'])
# det_size a 320x320 es un balance perfecto entre velocidad y capacidad de detectar rostros lejanos
app.prepare(ctx_id=0, det_size=(320, 320))
print("Motor cargado exitosamente.")
# ──────────────────────────────────────────────────────────────────────────────
# SISTEMA DE AUDIO
# ──────────────────────────────────────────────────────────────────────────────
def obtener_audios_humanos(genero):
hora = datetime.now().hour
es_mujer = genero.lower() == 'woman'
suffix = "_m.mp3" if es_mujer else "_h.mp3"
if 5 <= hora < 12:
intro = "dias.mp3"
elif 12 <= hora < 19:
intro = "tarde.mp3"
else:
intro = "noches.mp3"
cierre = ("fin_noche" if (hora >= 19 or hora < 5) else "fin_dia") + suffix
return intro, cierre
async def sintetizar_nombre(nombre, ruta):
nombre_limpio = nombre.replace('_', ' ')
try:
comunicador = edge_tts.Communicate(nombre_limpio, "es-MX-DaliaNeural", rate="+10%")
await comunicador.save(ruta)
except Exception:
pass
def reproducir(archivo):
if os.path.exists(archivo):
subprocess.Popen(
["mpv", "--no-video", "--volume=100", archivo],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
def hilo_bienvenida(nombre, genero):
archivo_nombre = os.path.join(CACHE_PATH, f"nombre_{nombre}.mp3")
if not os.path.exists(archivo_nombre):
try:
asyncio.run(sintetizar_nombre(nombre, archivo_nombre))
except Exception:
pass
intro, cierre = obtener_audios_humanos(genero)
archivos = [f for f in [intro, archivo_nombre, cierre] if os.path.exists(f)]
if archivos:
subprocess.Popen(
["mpv", "--no-video", "--volume=100"] + archivos,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
# ──────────────────────────────────────────────────────────────────────────────
# GESTIÓN DE BASE DE DATOS (AHORA CON INSIGHTFACE)
# ──────────────────────────────────────────────────────────────────────────────
def gestionar_vectores(actualizar=False):
vectores_actuales = {}
if os.path.exists(VECTORS_FILE):
try:
with open(VECTORS_FILE, 'rb') as f:
vectores_actuales = pickle.load(f)
except Exception:
vectores_actuales = {}
if not actualizar:
return vectores_actuales
timestamps = {}
if os.path.exists(TIMESTAMPS_FILE):
try:
with open(TIMESTAMPS_FILE, 'rb') as f:
timestamps = pickle.load(f)
except Exception:
timestamps = {}
ruta_generos = os.path.join(CACHE_PATH, "generos.json")
dic_generos = {}
if os.path.exists(ruta_generos):
try:
with open(ruta_generos, 'r') as f:
dic_generos = json.load(f)
except Exception:
pass
print("\nACTUALIZANDO BASE DE DATOS (modelo y Caché de Géneros)...")
imagenes = [f for f in os.listdir(DB_PATH) if f.lower().endswith(('.jpg', '.png'))]
nombres_en_disco = set()
hubo_cambios = False
cambio_generos = False
for archivo in imagenes:
nombre_archivo = os.path.splitext(archivo)[0]
ruta_img = os.path.join(DB_PATH, archivo)
nombres_en_disco.add(nombre_archivo)
ts_actual = os.path.getmtime(ruta_img)
ts_guardado = timestamps.get(nombre_archivo, 0)
falta_genero = nombre_archivo not in dic_generos
if nombre_archivo in vectores_actuales and ts_actual == ts_guardado and not falta_genero:
continue
try:
img_db = cv2.imread(ruta_img)
# ⚡ INSIGHTFACE: Extracción primaria
faces = app.get(img_db)
# HACK DEL LIENZO NEGRO: Si la foto está muy recortada y no ve la cara,
# le agregamos un borde negro gigante para engañar al detector
if len(faces) == 0:
h_img, w_img = img_db.shape[:2]
img_pad = cv2.copyMakeBorder(img_db, h_img, h_img, w_img, w_img, cv2.BORDER_CONSTANT, value=(0,0,0))
faces = app.get(img_pad)
if len(faces) == 0:
print(f" [!] Rostro irrecuperable en '{archivo}'.")
continue
face = max(faces, key=lambda f: (f.bbox[2]-f.bbox[0]) * (f.bbox[3]-f.bbox[1]))
emb = np.array(face.normed_embedding, dtype=np.float32)
# ⚡ CORRECCIÓN DE GÉNERO: InsightFace devuelve "M" o "F"
if falta_genero:
dic_generos[nombre_archivo] = "Man" if face.sex == "M" else "Woman"
cambio_generos = True
vectores_actuales[nombre_archivo] = emb
timestamps[nombre_archivo] = ts_actual
hubo_cambios = True
print(f" Procesado: {nombre_archivo} | Género: {dic_generos.get(nombre_archivo)}")
except Exception as e:
print(f" Error procesando '{archivo}': {e}")
for nombre in list(vectores_actuales.keys()):
if nombre not in nombres_en_disco:
del vectores_actuales[nombre]
timestamps.pop(nombre, None)
if nombre in dic_generos:
del dic_generos[nombre]
cambio_generos = True
hubo_cambios = True
print(f" Eliminado (sin foto): {nombre}")
if hubo_cambios:
with open(VECTORS_FILE, 'wb') as f:
pickle.dump(vectores_actuales, f)
with open(TIMESTAMPS_FILE, 'wb') as f:
pickle.dump(timestamps, f)
if cambio_generos:
with open(ruta_generos, 'w') as f:
json.dump(dic_generos, f)
if hubo_cambios or cambio_generos:
print(" Sincronización terminada.\n")
else:
print(" Sin cambios. Base de datos al día.\n")
return vectores_actuales
# ──────────────────────────────────────────────────────────────────────────────
# BÚSQUEDA BLINDADA
# ──────────────────────────────────────────────────────────────────────────────
def buscar_mejor_match(emb_consulta, base_datos):
# InsightFace devuelve normed_embedding, por lo que la magnitud ya es 1
mejor_match, max_sim = None, -1.0
for nombre, vec in base_datos.items():
sim = float(np.dot(emb_consulta, vec))
if sim > max_sim:
max_sim = sim
mejor_match = nombre
return mejor_match, max_sim
# ──────────────────────────────────────────────────────────────────────────────
# LOOP DE PRUEBA Y REGISTRO
# ──────────────────────────────────────────────────────────────────────────────
def sistema_interactivo():
base_datos = gestionar_vectores(actualizar=False)
cap = cv2.VideoCapture(RTSP_URL)
ultimo_saludo = 0
persona_actual = None
confirmaciones = 0
print("\n" + "=" * 50)
print(" MÓDULO DE REGISTRO - INSIGHTFACE (buffalo_l)")
print(" [R] Registrar nuevo rostro | [Q] Salir")
print("=" * 50 + "\n")
faces_ultimo_frame = []
while True:
ret, frame = cap.read()
if not ret:
time.sleep(2)
cap.open(RTSP_URL)
continue
h, w = frame.shape[:2]
display_frame = frame.copy()
tiempo_actual = time.time()
# ⚡ INSIGHTFACE: Hace TODO aquí (Detección, Landmarks, Género, ArcFace)
faces_raw = app.get(frame)
faces_ultimo_frame = faces_raw
for face in faces_raw:
# Las cajas de InsightFace vienen en formato [x1, y1, x2, y2]
box = face.bbox.astype(int)
fx, fy, x2, y2 = box[0], box[1], box[2], box[3]
fw, fh = x2 - fx, y2 - fy
# Limitamos a los bordes de la pantalla
fx, fy = max(0, fx), max(0, fy)
fw, fh = min(w - fx, fw), min(h - fy, fh)
if fw <= 0 or fh <= 0:
continue
cv2.rectangle(display_frame, (fx, fy), (fx+fw, fy+fh), (255, 200, 0), 2)
cv2.putText(display_frame, f"DET:{face.det_score:.2f}",
(fx, fy - 25), cv2.FONT_HERSHEY_SIMPLEX, 0.45, (255, 200, 0), 1)
if (tiempo_actual - ultimo_saludo) <= COOLDOWN_TIME:
continue
# FILTRO DE TAMAÑO (Para evitar reconocer píxeles lejanos)
if fw < 20 or fh < 20:
cv2.putText(display_frame, "muy pequeno",
(fx, fy-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 100, 255), 1)
continue
# SIMETRÍA MATEMÁTICA INMEDIATA
emb = np.array(face.normed_embedding, dtype=np.float32)
mejor_match, max_sim = buscar_mejor_match(emb, base_datos)
estado = " IDENTIFICADO" if max_sim > UMBRAL_SIM else "DESCONOCIDO"
nombre_d = mejor_match.split('_')[0] if mejor_match else "nadie"
n_bloques = int(max_sim * 20)
barra = "" * n_bloques + "" * (20 - n_bloques)
print(f"[REGISTRO] {estado} | {nombre_d:<14} | {barra} | "
f"{max_sim*100:.1f}% (umbral: {UMBRAL_SIM*100:.0f}%)")
if max_sim > UMBRAL_SIM and mejor_match:
color = (0, 255, 0)
texto = f"{mejor_match.split('_')[0]} ({max_sim:.2f})"
if mejor_match == persona_actual:
confirmaciones += 1
else:
persona_actual, confirmaciones = mejor_match, 1
if confirmaciones >= 2:
cv2.rectangle(display_frame, (fx, fy), (fx+fw, fy+fh), (0, 255, 0), 3)
# ⚡ GÉNERO INMEDIATO SIN SOBRECARGA DE CPU
genero = "Man" if face.sex == 1 else "Woman"
threading.Thread(
target=hilo_bienvenida,
args=(mejor_match, genero),
daemon=True
).start()
ultimo_saludo = tiempo_actual
confirmaciones = 0
else:
color = (0, 0, 255)
texto = f"? ({max_sim:.2f})"
confirmaciones = max(0, confirmaciones - 1)
cv2.putText(display_frame, texto,
(fx, fy - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
cv2.imshow("Módulo de Registro", display_frame)
key = cv2.waitKey(1) & 0xFF
if key == ord('q'):
break
elif key == ord('r'):
if faces_ultimo_frame:
# Tomamos la cara más grande detectada por InsightFace
face_registro = max(faces_ultimo_frame, key=lambda f: (f.bbox[2]-f.bbox[0]) * (f.bbox[3]-f.bbox[1]))
box = face_registro.bbox.astype(int)
fx, fy, fw, fh = box[0], box[1], box[2]-box[0], box[3]-box[1]
# Margen estético para guardar la foto
m_x, m_y = int(fw * 0.30), int(fh * 0.30)
y1, y2 = max(0, fy-m_y), min(h, fy+fh+m_y)
x1, x2 = max(0, fx-m_x), min(w, fx+fw+m_x)
face_roi = frame[y1:y2, x1:x2]
if face_roi.size > 0:
nom = input("\nNombre de la persona: ").strip()
if nom:
# Extraemos el género automáticamente gracias a InsightFace
gen_str = "Man" if face_registro.sex == 1 else "Woman"
# Actualizamos JSON directo sin preguntar
ruta_generos = os.path.join(CACHE_PATH, "generos.json")
dic_generos = {}
if os.path.exists(ruta_generos):
with open(ruta_generos, 'r') as f:
dic_generos = json.load(f)
dic_generos[nom] = gen_str
with open(ruta_generos, 'w') as f:
json.dump(dic_generos, f)
# Guardado de imagen y sincronización
foto_path = os.path.join(DB_PATH, f"{nom}.jpg")
cv2.imwrite(foto_path, face_roi)
print(f"[OK] Rostro de '{nom}' guardado como {gen_str}. Sincronizando...")
base_datos = gestionar_vectores(actualizar=True)
else:
print("[!] Registro cancelado.")
else:
print("[!] Recorte vacío. Intenta de nuevo.")
else:
print("\n[!] No se detectó rostro. Acércate más o mira a la lente.")
cap.release()
cv2.destroyAllWindows()
if __name__ == "__main__":
sistema_interactivo()