378 lines
16 KiB
Python
378 lines
16 KiB
Python
|
|
import os
|
||
|
|
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
|
||
|
|
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
|
||
|
|
|
||
|
|
# ⚡ NUEVOS CANDADOS: Silenciamos la burocracia de OpenCV y Qt
|
||
|
|
os.environ['QT_LOGGING_RULES'] = '*=false'
|
||
|
|
os.environ['OPENCV_LOG_LEVEL'] = 'ERROR'
|
||
|
|
|
||
|
|
import cv2
|
||
|
|
import numpy as np
|
||
|
|
import pickle
|
||
|
|
import time
|
||
|
|
import threading
|
||
|
|
import asyncio
|
||
|
|
import edge_tts
|
||
|
|
import subprocess
|
||
|
|
from datetime import datetime
|
||
|
|
import warnings
|
||
|
|
import json
|
||
|
|
|
||
|
|
# ⚡ IMPORTACIÓN DE INSIGHTFACE
|
||
|
|
from insightface.app import FaceAnalysis
|
||
|
|
|
||
|
|
warnings.filterwarnings("ignore")
|
||
|
|
|
||
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
||
|
|
# CONFIGURACIÓN
|
||
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
||
|
|
DB_PATH = "db_institucion"
|
||
|
|
CACHE_PATH = "cache_nombres"
|
||
|
|
VECTORS_FILE = "base_datos_rostros.pkl"
|
||
|
|
TIMESTAMPS_FILE = "representaciones_timestamps.pkl"
|
||
|
|
UMBRAL_SIM = 0.45 # ⚡ Ajustado a la exigencia de producción
|
||
|
|
COOLDOWN_TIME = 15 # Segundos entre saludos
|
||
|
|
|
||
|
|
USUARIO, PASSWORD, IP_DVR = "admin", "TCA200503", "192.168.1.200"
|
||
|
|
RTSP_URL = f"rtsp://{USUARIO}:{PASSWORD}@{IP_DVR}:554/Streaming/Channels/702"
|
||
|
|
|
||
|
|
for path in [DB_PATH, CACHE_PATH]:
|
||
|
|
os.makedirs(path, exist_ok=True)
|
||
|
|
|
||
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
||
|
|
# EL NUEVO CEREBRO: INSIGHTFACE (buffalo_l)
|
||
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
||
|
|
print("Cargando motor InsightFace (buffalo_l)...")
|
||
|
|
# Carga detección (SCRFD), landmarks, reconocimiento (ArcFace) y atributos (género/edad)
|
||
|
|
app = FaceAnalysis(name='buffalo_l', providers=['CPUExecutionProvider'])
|
||
|
|
# det_size a 320x320 es un balance perfecto entre velocidad y capacidad de detectar rostros lejanos
|
||
|
|
app.prepare(ctx_id=0, det_size=(320, 320))
|
||
|
|
print("Motor cargado exitosamente.")
|
||
|
|
|
||
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
||
|
|
# SISTEMA DE AUDIO
|
||
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
||
|
|
def obtener_audios_humanos(genero):
|
||
|
|
hora = datetime.now().hour
|
||
|
|
es_mujer = genero.lower() == 'woman'
|
||
|
|
suffix = "_m.mp3" if es_mujer else "_h.mp3"
|
||
|
|
if 5 <= hora < 12:
|
||
|
|
intro = "dias.mp3"
|
||
|
|
elif 12 <= hora < 19:
|
||
|
|
intro = "tarde.mp3"
|
||
|
|
else:
|
||
|
|
intro = "noches.mp3"
|
||
|
|
cierre = ("fin_noche" if (hora >= 19 or hora < 5) else "fin_dia") + suffix
|
||
|
|
return intro, cierre
|
||
|
|
|
||
|
|
async def sintetizar_nombre(nombre, ruta):
|
||
|
|
nombre_limpio = nombre.replace('_', ' ')
|
||
|
|
try:
|
||
|
|
comunicador = edge_tts.Communicate(nombre_limpio, "es-MX-DaliaNeural", rate="+10%")
|
||
|
|
await comunicador.save(ruta)
|
||
|
|
except Exception:
|
||
|
|
pass
|
||
|
|
|
||
|
|
def reproducir(archivo):
|
||
|
|
if os.path.exists(archivo):
|
||
|
|
subprocess.Popen(
|
||
|
|
["mpv", "--no-video", "--volume=100", archivo],
|
||
|
|
stdout=subprocess.DEVNULL,
|
||
|
|
stderr=subprocess.DEVNULL
|
||
|
|
)
|
||
|
|
|
||
|
|
def hilo_bienvenida(nombre, genero):
|
||
|
|
archivo_nombre = os.path.join(CACHE_PATH, f"nombre_{nombre}.mp3")
|
||
|
|
|
||
|
|
if not os.path.exists(archivo_nombre):
|
||
|
|
try:
|
||
|
|
asyncio.run(sintetizar_nombre(nombre, archivo_nombre))
|
||
|
|
except Exception:
|
||
|
|
pass
|
||
|
|
|
||
|
|
intro, cierre = obtener_audios_humanos(genero)
|
||
|
|
|
||
|
|
archivos = [f for f in [intro, archivo_nombre, cierre] if os.path.exists(f)]
|
||
|
|
if archivos:
|
||
|
|
subprocess.Popen(
|
||
|
|
["mpv", "--no-video", "--volume=100"] + archivos,
|
||
|
|
stdout=subprocess.DEVNULL,
|
||
|
|
stderr=subprocess.DEVNULL
|
||
|
|
)
|
||
|
|
|
||
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
||
|
|
# GESTIÓN DE BASE DE DATOS (AHORA CON INSIGHTFACE)
|
||
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
||
|
|
def gestionar_vectores(actualizar=False):
|
||
|
|
vectores_actuales = {}
|
||
|
|
if os.path.exists(VECTORS_FILE):
|
||
|
|
try:
|
||
|
|
with open(VECTORS_FILE, 'rb') as f:
|
||
|
|
vectores_actuales = pickle.load(f)
|
||
|
|
except Exception:
|
||
|
|
vectores_actuales = {}
|
||
|
|
|
||
|
|
if not actualizar:
|
||
|
|
return vectores_actuales
|
||
|
|
|
||
|
|
timestamps = {}
|
||
|
|
if os.path.exists(TIMESTAMPS_FILE):
|
||
|
|
try:
|
||
|
|
with open(TIMESTAMPS_FILE, 'rb') as f:
|
||
|
|
timestamps = pickle.load(f)
|
||
|
|
except Exception:
|
||
|
|
timestamps = {}
|
||
|
|
|
||
|
|
ruta_generos = os.path.join(CACHE_PATH, "generos.json")
|
||
|
|
dic_generos = {}
|
||
|
|
if os.path.exists(ruta_generos):
|
||
|
|
try:
|
||
|
|
with open(ruta_generos, 'r') as f:
|
||
|
|
dic_generos = json.load(f)
|
||
|
|
except Exception:
|
||
|
|
pass
|
||
|
|
|
||
|
|
print("\nACTUALIZANDO BASE DE DATOS (modelo y Caché de Géneros)...")
|
||
|
|
imagenes = [f for f in os.listdir(DB_PATH) if f.lower().endswith(('.jpg', '.png'))]
|
||
|
|
nombres_en_disco = set()
|
||
|
|
hubo_cambios = False
|
||
|
|
cambio_generos = False
|
||
|
|
|
||
|
|
for archivo in imagenes:
|
||
|
|
nombre_archivo = os.path.splitext(archivo)[0]
|
||
|
|
ruta_img = os.path.join(DB_PATH, archivo)
|
||
|
|
nombres_en_disco.add(nombre_archivo)
|
||
|
|
|
||
|
|
ts_actual = os.path.getmtime(ruta_img)
|
||
|
|
ts_guardado = timestamps.get(nombre_archivo, 0)
|
||
|
|
falta_genero = nombre_archivo not in dic_generos
|
||
|
|
|
||
|
|
if nombre_archivo in vectores_actuales and ts_actual == ts_guardado and not falta_genero:
|
||
|
|
continue
|
||
|
|
|
||
|
|
try:
|
||
|
|
img_db = cv2.imread(ruta_img)
|
||
|
|
|
||
|
|
# ⚡ INSIGHTFACE: Extracción primaria
|
||
|
|
faces = app.get(img_db)
|
||
|
|
|
||
|
|
# HACK DEL LIENZO NEGRO: Si la foto está muy recortada y no ve la cara,
|
||
|
|
# le agregamos un borde negro gigante para engañar al detector
|
||
|
|
if len(faces) == 0:
|
||
|
|
h_img, w_img = img_db.shape[:2]
|
||
|
|
img_pad = cv2.copyMakeBorder(img_db, h_img, h_img, w_img, w_img, cv2.BORDER_CONSTANT, value=(0,0,0))
|
||
|
|
faces = app.get(img_pad)
|
||
|
|
|
||
|
|
if len(faces) == 0:
|
||
|
|
print(f" [!] Rostro irrecuperable en '{archivo}'.")
|
||
|
|
continue
|
||
|
|
|
||
|
|
face = max(faces, key=lambda f: (f.bbox[2]-f.bbox[0]) * (f.bbox[3]-f.bbox[1]))
|
||
|
|
emb = np.array(face.normed_embedding, dtype=np.float32)
|
||
|
|
|
||
|
|
# ⚡ CORRECCIÓN DE GÉNERO: InsightFace devuelve "M" o "F"
|
||
|
|
if falta_genero:
|
||
|
|
dic_generos[nombre_archivo] = "Man" if face.sex == "M" else "Woman"
|
||
|
|
cambio_generos = True
|
||
|
|
|
||
|
|
vectores_actuales[nombre_archivo] = emb
|
||
|
|
timestamps[nombre_archivo] = ts_actual
|
||
|
|
hubo_cambios = True
|
||
|
|
print(f" Procesado: {nombre_archivo} | Género: {dic_generos.get(nombre_archivo)}")
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
print(f" Error procesando '{archivo}': {e}")
|
||
|
|
|
||
|
|
for nombre in list(vectores_actuales.keys()):
|
||
|
|
if nombre not in nombres_en_disco:
|
||
|
|
del vectores_actuales[nombre]
|
||
|
|
timestamps.pop(nombre, None)
|
||
|
|
if nombre in dic_generos:
|
||
|
|
del dic_generos[nombre]
|
||
|
|
cambio_generos = True
|
||
|
|
hubo_cambios = True
|
||
|
|
print(f" Eliminado (sin foto): {nombre}")
|
||
|
|
|
||
|
|
if hubo_cambios:
|
||
|
|
with open(VECTORS_FILE, 'wb') as f:
|
||
|
|
pickle.dump(vectores_actuales, f)
|
||
|
|
with open(TIMESTAMPS_FILE, 'wb') as f:
|
||
|
|
pickle.dump(timestamps, f)
|
||
|
|
|
||
|
|
if cambio_generos:
|
||
|
|
with open(ruta_generos, 'w') as f:
|
||
|
|
json.dump(dic_generos, f)
|
||
|
|
|
||
|
|
if hubo_cambios or cambio_generos:
|
||
|
|
print(" Sincronización terminada.\n")
|
||
|
|
else:
|
||
|
|
print(" Sin cambios. Base de datos al día.\n")
|
||
|
|
|
||
|
|
return vectores_actuales
|
||
|
|
|
||
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
||
|
|
# BÚSQUEDA BLINDADA
|
||
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
||
|
|
def buscar_mejor_match(emb_consulta, base_datos):
|
||
|
|
# InsightFace devuelve normed_embedding, por lo que la magnitud ya es 1
|
||
|
|
mejor_match, max_sim = None, -1.0
|
||
|
|
for nombre, vec in base_datos.items():
|
||
|
|
sim = float(np.dot(emb_consulta, vec))
|
||
|
|
if sim > max_sim:
|
||
|
|
max_sim = sim
|
||
|
|
mejor_match = nombre
|
||
|
|
|
||
|
|
return mejor_match, max_sim
|
||
|
|
|
||
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
||
|
|
# LOOP DE PRUEBA Y REGISTRO
|
||
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
||
|
|
def sistema_interactivo():
|
||
|
|
base_datos = gestionar_vectores(actualizar=False)
|
||
|
|
cap = cv2.VideoCapture(RTSP_URL)
|
||
|
|
ultimo_saludo = 0
|
||
|
|
persona_actual = None
|
||
|
|
confirmaciones = 0
|
||
|
|
|
||
|
|
print("\n" + "=" * 50)
|
||
|
|
print(" MÓDULO DE REGISTRO - INSIGHTFACE (buffalo_l)")
|
||
|
|
print(" [R] Registrar nuevo rostro | [Q] Salir")
|
||
|
|
print("=" * 50 + "\n")
|
||
|
|
|
||
|
|
faces_ultimo_frame = []
|
||
|
|
|
||
|
|
while True:
|
||
|
|
ret, frame = cap.read()
|
||
|
|
if not ret:
|
||
|
|
time.sleep(2)
|
||
|
|
cap.open(RTSP_URL)
|
||
|
|
continue
|
||
|
|
|
||
|
|
h, w = frame.shape[:2]
|
||
|
|
display_frame = frame.copy()
|
||
|
|
tiempo_actual = time.time()
|
||
|
|
|
||
|
|
# ⚡ INSIGHTFACE: Hace TODO aquí (Detección, Landmarks, Género, ArcFace)
|
||
|
|
faces_raw = app.get(frame)
|
||
|
|
faces_ultimo_frame = faces_raw
|
||
|
|
|
||
|
|
for face in faces_raw:
|
||
|
|
# Las cajas de InsightFace vienen en formato [x1, y1, x2, y2]
|
||
|
|
box = face.bbox.astype(int)
|
||
|
|
fx, fy, x2, y2 = box[0], box[1], box[2], box[3]
|
||
|
|
fw, fh = x2 - fx, y2 - fy
|
||
|
|
|
||
|
|
# Limitamos a los bordes de la pantalla
|
||
|
|
fx, fy = max(0, fx), max(0, fy)
|
||
|
|
fw, fh = min(w - fx, fw), min(h - fy, fh)
|
||
|
|
|
||
|
|
if fw <= 0 or fh <= 0:
|
||
|
|
continue
|
||
|
|
|
||
|
|
cv2.rectangle(display_frame, (fx, fy), (fx+fw, fy+fh), (255, 200, 0), 2)
|
||
|
|
cv2.putText(display_frame, f"DET:{face.det_score:.2f}",
|
||
|
|
(fx, fy - 25), cv2.FONT_HERSHEY_SIMPLEX, 0.45, (255, 200, 0), 1)
|
||
|
|
|
||
|
|
if (tiempo_actual - ultimo_saludo) <= COOLDOWN_TIME:
|
||
|
|
continue
|
||
|
|
|
||
|
|
# FILTRO DE TAMAÑO (Para evitar reconocer píxeles lejanos)
|
||
|
|
if fw < 20 or fh < 20:
|
||
|
|
cv2.putText(display_frame, "muy pequeno",
|
||
|
|
(fx, fy-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 100, 255), 1)
|
||
|
|
continue
|
||
|
|
|
||
|
|
# SIMETRÍA MATEMÁTICA INMEDIATA
|
||
|
|
emb = np.array(face.normed_embedding, dtype=np.float32)
|
||
|
|
mejor_match, max_sim = buscar_mejor_match(emb, base_datos)
|
||
|
|
|
||
|
|
estado = " IDENTIFICADO" if max_sim > UMBRAL_SIM else "DESCONOCIDO"
|
||
|
|
nombre_d = mejor_match.split('_')[0] if mejor_match else "nadie"
|
||
|
|
n_bloques = int(max_sim * 20)
|
||
|
|
barra = "█" * n_bloques + "░" * (20 - n_bloques)
|
||
|
|
print(f"[REGISTRO] {estado} | {nombre_d:<14} | {barra} | "
|
||
|
|
f"{max_sim*100:.1f}% (umbral: {UMBRAL_SIM*100:.0f}%)")
|
||
|
|
|
||
|
|
if max_sim > UMBRAL_SIM and mejor_match:
|
||
|
|
color = (0, 255, 0)
|
||
|
|
texto = f"{mejor_match.split('_')[0]} ({max_sim:.2f})"
|
||
|
|
|
||
|
|
if mejor_match == persona_actual:
|
||
|
|
confirmaciones += 1
|
||
|
|
else:
|
||
|
|
persona_actual, confirmaciones = mejor_match, 1
|
||
|
|
|
||
|
|
if confirmaciones >= 2:
|
||
|
|
cv2.rectangle(display_frame, (fx, fy), (fx+fw, fy+fh), (0, 255, 0), 3)
|
||
|
|
|
||
|
|
# ⚡ GÉNERO INMEDIATO SIN SOBRECARGA DE CPU
|
||
|
|
genero = "Man" if face.sex == 1 else "Woman"
|
||
|
|
|
||
|
|
threading.Thread(
|
||
|
|
target=hilo_bienvenida,
|
||
|
|
args=(mejor_match, genero),
|
||
|
|
daemon=True
|
||
|
|
).start()
|
||
|
|
ultimo_saludo = tiempo_actual
|
||
|
|
confirmaciones = 0
|
||
|
|
else:
|
||
|
|
color = (0, 0, 255)
|
||
|
|
texto = f"? ({max_sim:.2f})"
|
||
|
|
confirmaciones = max(0, confirmaciones - 1)
|
||
|
|
|
||
|
|
cv2.putText(display_frame, texto,
|
||
|
|
(fx, fy - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
|
||
|
|
|
||
|
|
cv2.imshow("Módulo de Registro", display_frame)
|
||
|
|
key = cv2.waitKey(1) & 0xFF
|
||
|
|
|
||
|
|
if key == ord('q'):
|
||
|
|
break
|
||
|
|
|
||
|
|
elif key == ord('r'):
|
||
|
|
if faces_ultimo_frame:
|
||
|
|
# Tomamos la cara más grande detectada por InsightFace
|
||
|
|
face_registro = max(faces_ultimo_frame, key=lambda f: (f.bbox[2]-f.bbox[0]) * (f.bbox[3]-f.bbox[1]))
|
||
|
|
box = face_registro.bbox.astype(int)
|
||
|
|
fx, fy, fw, fh = box[0], box[1], box[2]-box[0], box[3]-box[1]
|
||
|
|
|
||
|
|
# Margen estético para guardar la foto
|
||
|
|
m_x, m_y = int(fw * 0.30), int(fh * 0.30)
|
||
|
|
y1, y2 = max(0, fy-m_y), min(h, fy+fh+m_y)
|
||
|
|
x1, x2 = max(0, fx-m_x), min(w, fx+fw+m_x)
|
||
|
|
|
||
|
|
face_roi = frame[y1:y2, x1:x2]
|
||
|
|
|
||
|
|
if face_roi.size > 0:
|
||
|
|
nom = input("\nNombre de la persona: ").strip()
|
||
|
|
if nom:
|
||
|
|
# Extraemos el género automáticamente gracias a InsightFace
|
||
|
|
gen_str = "Man" if face_registro.sex == 1 else "Woman"
|
||
|
|
|
||
|
|
# Actualizamos JSON directo sin preguntar
|
||
|
|
ruta_generos = os.path.join(CACHE_PATH, "generos.json")
|
||
|
|
dic_generos = {}
|
||
|
|
if os.path.exists(ruta_generos):
|
||
|
|
with open(ruta_generos, 'r') as f:
|
||
|
|
dic_generos = json.load(f)
|
||
|
|
dic_generos[nom] = gen_str
|
||
|
|
with open(ruta_generos, 'w') as f:
|
||
|
|
json.dump(dic_generos, f)
|
||
|
|
|
||
|
|
# Guardado de imagen y sincronización
|
||
|
|
foto_path = os.path.join(DB_PATH, f"{nom}.jpg")
|
||
|
|
cv2.imwrite(foto_path, face_roi)
|
||
|
|
print(f"[OK] Rostro de '{nom}' guardado como {gen_str}. Sincronizando...")
|
||
|
|
base_datos = gestionar_vectores(actualizar=True)
|
||
|
|
else:
|
||
|
|
print("[!] Registro cancelado.")
|
||
|
|
else:
|
||
|
|
print("[!] Recorte vacío. Intenta de nuevo.")
|
||
|
|
else:
|
||
|
|
print("\n[!] No se detectó rostro. Acércate más o mira a la lente.")
|
||
|
|
|
||
|
|
cap.release()
|
||
|
|
cv2.destroyAllWindows()
|
||
|
|
|
||
|
|
if __name__ == "__main__":
|
||
|
|
sistema_interactivo()
|