import os os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' os.environ['CUDA_VISIBLE_DEVICES'] = '-1' # ⚡ NUEVOS CANDADOS: Silenciamos la burocracia de OpenCV y Qt os.environ['QT_LOGGING_RULES'] = '*=false' os.environ['OPENCV_LOG_LEVEL'] = 'ERROR' import cv2 import numpy as np import pickle import time import threading import asyncio import edge_tts import subprocess from datetime import datetime import warnings import json # ⚡ IMPORTACIÓN DE INSIGHTFACE from insightface.app import FaceAnalysis warnings.filterwarnings("ignore") # ────────────────────────────────────────────────────────────────────────────── # CONFIGURACIÓN # ────────────────────────────────────────────────────────────────────────────── DB_PATH = "db_institucion" CACHE_PATH = "cache_nombres" VECTORS_FILE = "base_datos_rostros.pkl" TIMESTAMPS_FILE = "representaciones_timestamps.pkl" UMBRAL_SIM = 0.45 # ⚡ Ajustado a la exigencia de producción COOLDOWN_TIME = 15 # Segundos entre saludos USUARIO, PASSWORD, IP_DVR = "admin", "TCA200503", "192.168.1.200" RTSP_URL = f"rtsp://{USUARIO}:{PASSWORD}@{IP_DVR}:554/Streaming/Channels/702" for path in [DB_PATH, CACHE_PATH]: os.makedirs(path, exist_ok=True) # ────────────────────────────────────────────────────────────────────────────── # EL NUEVO CEREBRO: INSIGHTFACE (buffalo_l) # ────────────────────────────────────────────────────────────────────────────── print("Cargando motor InsightFace (buffalo_l)...") # Carga detección (SCRFD), landmarks, reconocimiento (ArcFace) y atributos (género/edad) app = FaceAnalysis(name='buffalo_l', providers=['CPUExecutionProvider']) # det_size a 320x320 es un balance perfecto entre velocidad y capacidad de detectar rostros lejanos app.prepare(ctx_id=0, det_size=(320, 320)) print("Motor cargado exitosamente.") # ────────────────────────────────────────────────────────────────────────────── # SISTEMA DE AUDIO # ────────────────────────────────────────────────────────────────────────────── def obtener_audios_humanos(genero): hora = datetime.now().hour es_mujer = genero.lower() == 'woman' suffix = "_m.mp3" if es_mujer else "_h.mp3" if 5 <= hora < 12: intro = "dias.mp3" elif 12 <= hora < 19: intro = "tarde.mp3" else: intro = "noches.mp3" cierre = ("fin_noche" if (hora >= 19 or hora < 5) else "fin_dia") + suffix return intro, cierre async def sintetizar_nombre(nombre, ruta): nombre_limpio = nombre.replace('_', ' ') try: comunicador = edge_tts.Communicate(nombre_limpio, "es-MX-DaliaNeural", rate="+10%") await comunicador.save(ruta) except Exception: pass def reproducir(archivo): if os.path.exists(archivo): subprocess.Popen( ["mpv", "--no-video", "--volume=100", archivo], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL ) def hilo_bienvenida(nombre, genero): archivo_nombre = os.path.join(CACHE_PATH, f"nombre_{nombre}.mp3") if not os.path.exists(archivo_nombre): try: asyncio.run(sintetizar_nombre(nombre, archivo_nombre)) except Exception: pass intro, cierre = obtener_audios_humanos(genero) archivos = [f for f in [intro, archivo_nombre, cierre] if os.path.exists(f)] if archivos: subprocess.Popen( ["mpv", "--no-video", "--volume=100"] + archivos, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL ) # ────────────────────────────────────────────────────────────────────────────── # GESTIÓN DE BASE DE DATOS (AHORA CON INSIGHTFACE) # ────────────────────────────────────────────────────────────────────────────── def gestionar_vectores(actualizar=False): vectores_actuales = {} if os.path.exists(VECTORS_FILE): try: with open(VECTORS_FILE, 'rb') as f: vectores_actuales = pickle.load(f) except Exception: vectores_actuales = {} if not actualizar: return vectores_actuales timestamps = {} if os.path.exists(TIMESTAMPS_FILE): try: with open(TIMESTAMPS_FILE, 'rb') as f: timestamps = pickle.load(f) except Exception: timestamps = {} ruta_generos = os.path.join(CACHE_PATH, "generos.json") dic_generos = {} if os.path.exists(ruta_generos): try: with open(ruta_generos, 'r') as f: dic_generos = json.load(f) except Exception: pass print("\nACTUALIZANDO BASE DE DATOS (modelo y Caché de Géneros)...") imagenes = [f for f in os.listdir(DB_PATH) if f.lower().endswith(('.jpg', '.png'))] nombres_en_disco = set() hubo_cambios = False cambio_generos = False for archivo in imagenes: nombre_archivo = os.path.splitext(archivo)[0] ruta_img = os.path.join(DB_PATH, archivo) nombres_en_disco.add(nombre_archivo) ts_actual = os.path.getmtime(ruta_img) ts_guardado = timestamps.get(nombre_archivo, 0) falta_genero = nombre_archivo not in dic_generos if nombre_archivo in vectores_actuales and ts_actual == ts_guardado and not falta_genero: continue try: img_db = cv2.imread(ruta_img) # ⚡ INSIGHTFACE: Extracción primaria faces = app.get(img_db) # HACK DEL LIENZO NEGRO: Si la foto está muy recortada y no ve la cara, # le agregamos un borde negro gigante para engañar al detector if len(faces) == 0: h_img, w_img = img_db.shape[:2] img_pad = cv2.copyMakeBorder(img_db, h_img, h_img, w_img, w_img, cv2.BORDER_CONSTANT, value=(0,0,0)) faces = app.get(img_pad) if len(faces) == 0: print(f" [!] Rostro irrecuperable en '{archivo}'.") continue face = max(faces, key=lambda f: (f.bbox[2]-f.bbox[0]) * (f.bbox[3]-f.bbox[1])) emb = np.array(face.normed_embedding, dtype=np.float32) # ⚡ CORRECCIÓN DE GÉNERO: InsightFace devuelve "M" o "F" if falta_genero: dic_generos[nombre_archivo] = "Man" if face.sex == "M" else "Woman" cambio_generos = True vectores_actuales[nombre_archivo] = emb timestamps[nombre_archivo] = ts_actual hubo_cambios = True print(f" Procesado: {nombre_archivo} | Género: {dic_generos.get(nombre_archivo)}") except Exception as e: print(f" Error procesando '{archivo}': {e}") for nombre in list(vectores_actuales.keys()): if nombre not in nombres_en_disco: del vectores_actuales[nombre] timestamps.pop(nombre, None) if nombre in dic_generos: del dic_generos[nombre] cambio_generos = True hubo_cambios = True print(f" Eliminado (sin foto): {nombre}") if hubo_cambios: with open(VECTORS_FILE, 'wb') as f: pickle.dump(vectores_actuales, f) with open(TIMESTAMPS_FILE, 'wb') as f: pickle.dump(timestamps, f) if cambio_generos: with open(ruta_generos, 'w') as f: json.dump(dic_generos, f) if hubo_cambios or cambio_generos: print(" Sincronización terminada.\n") else: print(" Sin cambios. Base de datos al día.\n") return vectores_actuales # ────────────────────────────────────────────────────────────────────────────── # BÚSQUEDA BLINDADA # ────────────────────────────────────────────────────────────────────────────── def buscar_mejor_match(emb_consulta, base_datos): # InsightFace devuelve normed_embedding, por lo que la magnitud ya es 1 mejor_match, max_sim = None, -1.0 for nombre, vec in base_datos.items(): sim = float(np.dot(emb_consulta, vec)) if sim > max_sim: max_sim = sim mejor_match = nombre return mejor_match, max_sim # ────────────────────────────────────────────────────────────────────────────── # LOOP DE PRUEBA Y REGISTRO # ────────────────────────────────────────────────────────────────────────────── def sistema_interactivo(): base_datos = gestionar_vectores(actualizar=False) cap = cv2.VideoCapture(RTSP_URL) ultimo_saludo = 0 persona_actual = None confirmaciones = 0 print("\n" + "=" * 50) print(" MÓDULO DE REGISTRO - INSIGHTFACE (buffalo_l)") print(" [R] Registrar nuevo rostro | [Q] Salir") print("=" * 50 + "\n") faces_ultimo_frame = [] while True: ret, frame = cap.read() if not ret: time.sleep(2) cap.open(RTSP_URL) continue h, w = frame.shape[:2] display_frame = frame.copy() tiempo_actual = time.time() # ⚡ INSIGHTFACE: Hace TODO aquí (Detección, Landmarks, Género, ArcFace) faces_raw = app.get(frame) faces_ultimo_frame = faces_raw for face in faces_raw: # Las cajas de InsightFace vienen en formato [x1, y1, x2, y2] box = face.bbox.astype(int) fx, fy, x2, y2 = box[0], box[1], box[2], box[3] fw, fh = x2 - fx, y2 - fy # Limitamos a los bordes de la pantalla fx, fy = max(0, fx), max(0, fy) fw, fh = min(w - fx, fw), min(h - fy, fh) if fw <= 0 or fh <= 0: continue cv2.rectangle(display_frame, (fx, fy), (fx+fw, fy+fh), (255, 200, 0), 2) cv2.putText(display_frame, f"DET:{face.det_score:.2f}", (fx, fy - 25), cv2.FONT_HERSHEY_SIMPLEX, 0.45, (255, 200, 0), 1) if (tiempo_actual - ultimo_saludo) <= COOLDOWN_TIME: continue # FILTRO DE TAMAÑO (Para evitar reconocer píxeles lejanos) if fw < 20 or fh < 20: cv2.putText(display_frame, "muy pequeno", (fx, fy-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 100, 255), 1) continue # SIMETRÍA MATEMÁTICA INMEDIATA emb = np.array(face.normed_embedding, dtype=np.float32) mejor_match, max_sim = buscar_mejor_match(emb, base_datos) estado = " IDENTIFICADO" if max_sim > UMBRAL_SIM else "DESCONOCIDO" nombre_d = mejor_match.split('_')[0] if mejor_match else "nadie" n_bloques = int(max_sim * 20) barra = "█" * n_bloques + "░" * (20 - n_bloques) print(f"[REGISTRO] {estado} | {nombre_d:<14} | {barra} | " f"{max_sim*100:.1f}% (umbral: {UMBRAL_SIM*100:.0f}%)") if max_sim > UMBRAL_SIM and mejor_match: color = (0, 255, 0) texto = f"{mejor_match.split('_')[0]} ({max_sim:.2f})" if mejor_match == persona_actual: confirmaciones += 1 else: persona_actual, confirmaciones = mejor_match, 1 if confirmaciones >= 2: cv2.rectangle(display_frame, (fx, fy), (fx+fw, fy+fh), (0, 255, 0), 3) # ⚡ GÉNERO INMEDIATO SIN SOBRECARGA DE CPU genero = "Man" if face.sex == 1 else "Woman" threading.Thread( target=hilo_bienvenida, args=(mejor_match, genero), daemon=True ).start() ultimo_saludo = tiempo_actual confirmaciones = 0 else: color = (0, 0, 255) texto = f"? ({max_sim:.2f})" confirmaciones = max(0, confirmaciones - 1) cv2.putText(display_frame, texto, (fx, fy - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2) cv2.imshow("Módulo de Registro", display_frame) key = cv2.waitKey(1) & 0xFF if key == ord('q'): break elif key == ord('r'): if faces_ultimo_frame: # Tomamos la cara más grande detectada por InsightFace face_registro = max(faces_ultimo_frame, key=lambda f: (f.bbox[2]-f.bbox[0]) * (f.bbox[3]-f.bbox[1])) box = face_registro.bbox.astype(int) fx, fy, fw, fh = box[0], box[1], box[2]-box[0], box[3]-box[1] # Margen estético para guardar la foto m_x, m_y = int(fw * 0.30), int(fh * 0.30) y1, y2 = max(0, fy-m_y), min(h, fy+fh+m_y) x1, x2 = max(0, fx-m_x), min(w, fx+fw+m_x) face_roi = frame[y1:y2, x1:x2] if face_roi.size > 0: nom = input("\nNombre de la persona: ").strip() if nom: # Extraemos el género automáticamente gracias a InsightFace gen_str = "Man" if face_registro.sex == 1 else "Woman" # Actualizamos JSON directo sin preguntar ruta_generos = os.path.join(CACHE_PATH, "generos.json") dic_generos = {} if os.path.exists(ruta_generos): with open(ruta_generos, 'r') as f: dic_generos = json.load(f) dic_generos[nom] = gen_str with open(ruta_generos, 'w') as f: json.dump(dic_generos, f) # Guardado de imagen y sincronización foto_path = os.path.join(DB_PATH, f"{nom}.jpg") cv2.imwrite(foto_path, face_roi) print(f"[OK] Rostro de '{nom}' guardado como {gen_str}. Sincronizando...") base_datos = gestionar_vectores(actualizar=True) else: print("[!] Registro cancelado.") else: print("[!] Recorte vacío. Intenta de nuevo.") else: print("\n[!] No se detectó rostro. Acércate más o mira a la lente.") cap.release() cv2.destroyAllWindows() if __name__ == "__main__": sistema_interactivo()