IdentificacionIA/fusion.py

435 lines
22 KiB
Python
Raw Normal View History

2026-03-18 17:45:30 +00:00
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "rtsp_transport;tcp|stimeout;3000000"
2026-03-18 17:45:30 +00:00
import cv2
import numpy as np
import time
import threading
from queue import Queue
from deepface import DeepFace
from ultralytics import YOLO
import warnings
2026-04-08 17:00:23 +00:00
import torch
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Usando dispositivo: {device}")
2026-03-18 17:45:30 +00:00
warnings.filterwarnings("ignore")
# ──────────────────────────────────────────────────────────────────────────────
# 1. IMPORTAMOS NUESTROS MÓDULOS
# ──────────────────────────────────────────────────────────────────────────────
# Del motor matemático y tracking
from seguimiento2 import GlobalMemory, CamManager, SECUENCIA, URLS, FUENTE, similitud_hibrida
2026-03-18 17:45:30 +00:00
# Del motor de reconocimiento facial y audio
from reconocimiento2 import (
gestionar_vectores,
detectar_rostros_yunet,
buscar_mejor_match,
hilo_bienvenida,
UMBRAL_SIM,
COOLDOWN_TIME
)
# ──────────────────────────────────────────────────────────────────────────────
# 2. PROTECCIONES MULTIHILO E INICIALIZACIÓN
# ──────────────────────────────────────────────────────────────────────────────
COLA_ROSTROS = Queue(maxsize=4)
YUNET_LOCK = threading.Lock()
IA_LOCK = threading.Lock()
# Inicializamos la base de datos usando tu función importada
print("\nIniciando carga de base de datos...")
BASE_DATOS_ROSTROS = gestionar_vectores(actualizar=True)
# ──────────────────────────────────────────────────────────────────────────────
# 3. MOTOR ASÍNCRONO
# ──────────────────────────────────────────────────────────────────────────────
def procesar_rostro_async(frame_hd, box_480, gid, cam_id, global_mem, trk):
""" Toma el recorte del tracker, escala a HD, aplica filtros físicos y votación biométrica """
2026-03-18 17:45:30 +00:00
try:
if not BASE_DATOS_ROSTROS: return
# ──────────────────────────────────────────────────────────
# 1. ESCALADO HD Y EXTRACCIÓN DE CABEZA (Solución Xayli)
# ──────────────────────────────────────────────────────────
h_real, w_real = frame_hd.shape[:2]
if w_real <= 480:
print(f"[ERROR CAM {cam_id}] Le estás pasando el frame_show (480x270) a ArcFace, no el HD.")
2026-03-18 17:45:30 +00:00
escala_x = w_real / 480.0
escala_y = h_real / 270.0
x_min, y_min, x_max, y_max = box_480
2026-03-18 17:45:30 +00:00
h_box = y_max - y_min
y_min_expandido = max(0, y_min - (h_box * 0.15))
# ⚡ 50% del cuerpo para no cortar cabezas de personas de menor estatura
y_max_cabeza = min(270, y_min + (h_box * 0.50))
2026-03-18 17:45:30 +00:00
x1_hd = int(max(0, x_min) * escala_x)
y1_hd = int(y_min_expandido * escala_y)
x2_hd = int(min(480, x_max) * escala_x)
y2_hd = int(y_max_cabeza * escala_y)
2026-03-18 17:45:30 +00:00
roi_cabeza = frame_hd[y1_hd:y2_hd, x1_hd:x2_hd]
2026-03-18 17:45:30 +00:00
# ⚡ Filtro físico relajado a 40x40
if roi_cabeza.size == 0 or roi_cabeza.shape[0] < 20 or roi_cabeza.shape[1] < 20:
2026-03-18 17:45:30 +00:00
return
h_roi, w_roi = roi_cabeza.shape[:2]
# ──────────────────────────────────────────────────────────
# 2. DETECCIÓN YUNET Y FILTROS ANTI-BASURA
# ──────────────────────────────────────────────────────────
2026-03-18 17:45:30 +00:00
faces = detectar_rostros_yunet(roi_cabeza, lock=YUNET_LOCK)
for (rx, ry, rw, rh, score) in faces:
rx, ry = max(0, rx), max(0, ry)
rw, rh = min(w_roi - rx, rw), min(h_roi - ry, rh)
area_rostro_actual = rw * rh
with global_mem.lock:
data = global_mem.db.get(gid, {})
nombre_actual = data.get('nombre')
area_ref = data.get('area_rostro_ref', 0)
necesita_saludo = False
if str(cam_id) == "7":
if not hasattr(global_mem, 'ultimos_saludos'):
global_mem.ultimos_saludos = {}
ultimo = global_mem.ultimos_saludos.get(nombre_actual if nombre_actual else "", 0)
if (time.time() - ultimo) > COOLDOWN_TIME:
necesita_saludo = True
if nombre_actual is None or area_rostro_actual >= (area_ref * 1.5) or necesita_saludo:
m_x = int(rw * 0.25)
m_y = int(rh * 0.25)
2026-03-18 17:45:30 +00:00
roi_rostro = roi_cabeza[max(0, ry-m_y):min(h_roi, ry+rh+m_y),
max(0, rx-m_x):min(w_roi, rx+rw+m_x)]
if roi_rostro.size == 0 or roi_rostro.shape[0] < 20 or roi_rostro.shape[1] < 20:
2026-03-18 17:45:30 +00:00
continue
# 🛡️ FILTRO ANTI-PERFIL: Evita falsos positivos de personas viendo de lado
ratio_aspecto = roi_rostro.shape[1] / float(roi_rostro.shape[0])
if ratio_aspecto < 0.50:
continue
# 🛡️ FILTRO ÓPTICO (Movimiento)
2026-03-18 17:45:30 +00:00
gray_roi = cv2.cvtColor(roi_rostro, cv2.COLOR_BGR2GRAY)
nitidez = cv2.Laplacian(gray_roi, cv2.CV_64F).var()
if nitidez < 15.0:
2026-03-18 17:45:30 +00:00
continue
# VISIÓN NOCTURNA (Simetría con Base de Datos)
try:
lab = cv2.cvtColor(roi_rostro, cv2.COLOR_BGR2LAB)
l, a, b = cv2.split(lab)
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
l = clahe.apply(l)
roi_mejorado = cv2.cvtColor(cv2.merge((l, a, b)), cv2.COLOR_LAB2BGR)
except Exception:
roi_mejorado = roi_rostro
# ──────────────────────────────────────────────────────────
# 3. MOTOR MTCNN Y SISTEMA DE VOTACIÓN
# ──────────────────────────────────────────────────────────
2026-03-18 17:45:30 +00:00
with IA_LOCK:
try:
res = DeepFace.represent(
img_path=roi_mejorado,
model_name="ArcFace",
detector_backend="mtcnn",
align=True,
enforce_detection=True
)
2026-03-18 17:45:30 +00:00
emb = np.array(res[0]["embedding"], dtype=np.float32)
mejor_match, max_sim = buscar_mejor_match(emb, BASE_DATOS_ROSTROS)
except Exception:
continue # Cara inválida para MTCNN
2026-03-18 17:45:30 +00:00
print(f"[DEBUG CAM {cam_id}] ArcFace: {mejor_match} al {max_sim:.2f}")
2026-03-18 17:45:30 +00:00
if max_sim >= UMBRAL_SIM and mejor_match:
2026-03-18 17:45:30 +00:00
nombre_limpio = mejor_match.split('_')[0]
with global_mem.lock:
datos_id = global_mem.db.get(gid)
if not datos_id: continue
# SISTEMA DE VOTACIÓN (Anti-Falsos Positivos)
if datos_id.get('candidato_nombre') == nombre_limpio:
datos_id['votos_nombre'] = datos_id.get('votos_nombre', 0) + 1
2026-03-18 17:45:30 +00:00
else:
datos_id['candidato_nombre'] = nombre_limpio
datos_id['votos_nombre'] = 1
2026-03-18 17:45:30 +00:00
# ⚡ EL PASE VIP: Si la certeza es aplastante (>0.55), salta la burocracia
if max_sim >= 0.50:
datos_id['votos_nombre'] = max(2, datos_id['votos_nombre'])
# Solo actuamos si tiene 2 votos consistentes...
if datos_id['votos_nombre'] >= 2:
nombre_actual = datos_id.get('nombre')
2026-03-18 17:45:30 +00:00
# CANDADO DE BAUTIZO: Protege a los VIPs de alucinaciones borrosas
if nombre_actual is not None and nombre_actual != nombre_limpio:
if max_sim < 0.59:
# Si es un puntaje bajo, es una confusión de ArcFace. Lo ignoramos.
print(f" [RECHAZO] ArcFace intentó renombrar a {nombre_actual} como {nombre_limpio} con solo {max_sim:.2f}")
continue
else:
# Si el puntaje es masivo, significa que OSNet se equivocó y pegó 2 personas
print(f"[CORRECCIÓN VIP] OSNet se confundió. Renombrando a {nombre_limpio} ({max_sim:.2f})")
# ⚡ BAUTIZO Y LIMPIEZA
if nombre_actual != nombre_limpio:
datos_id['nombre'] = nombre_limpio
print(f" [BAUTIZO] ID {gid} confirmado como {nombre_limpio}")
ids_a_borrar = []
firma_actual = datos_id['firmas'][0] if datos_id['firmas'] else None
for otro_gid, datos_otro in list(global_mem.db.items()):
if otro_gid == gid: continue
if datos_otro.get('nombre') == nombre_limpio:
ids_a_borrar.append(otro_gid)
elif datos_otro.get('nombre') is None and firma_actual and datos_otro['firmas']:
sim_huerfano = similitud_hibrida(firma_actual, datos_otro['firmas'][0])
if sim_huerfano > 0.75:
ids_a_borrar.append(otro_gid)
for id_basura in ids_a_borrar:
del global_mem.db[id_basura]
# Actualizamos referencias
datos_id['area_rostro_ref'] = area_rostro_actual
datos_id['ts'] = time.time()
# BLINDAJE VIP: Si la certeza es absoluta, amarrar la ropa a este ID
if max_sim > 0.65:
# Usamos la función externa para evitar bloqueos dobles (Deadlocks)
pass
# 🔊 SALUDO DE BIENVENIDA
if str(cam_id) == "7" and necesita_saludo:
global_mem.ultimos_saludos[nombre_limpio] = time.time()
import json
genero = "Man" # Valor por defecto seguro
ruta_generos = os.path.join("cache_nombres", "generos.json")
if os.path.exists(ruta_generos):
try:
with open(ruta_generos, 'r') as f:
dic_generos = json.load(f)
genero = dic_generos.get(nombre_limpio, "Man")
except Exception:
pass # Si el archivo está ocupado, usamos el defecto
# Lanzamos el audio instantáneamente sin IA pesada
threading.Thread(target=hilo_bienvenida, args=(nombre_limpio, genero), daemon=True).start()
# Ejecutamos el blindaje fuera del lock principal
if max_sim > 0.65 and datos_id.get('votos_nombre', 0) >= 2:
global_mem.confirmar_firma_vip(gid, time.time())
break # Salimos del loop de rostros si ya identificamos
2026-03-18 17:45:30 +00:00
except Exception as e:
pass
finally:
trk.procesando_rostro = False
2026-03-18 17:45:30 +00:00
def worker_rostros(global_mem):
""" Consumidor de la cola multihilo """
while True:
frame, box, gid, cam_id, trk = COLA_ROSTROS.get()
procesar_rostro_async(frame, box, gid, cam_id, global_mem, trk)
COLA_ROSTROS.task_done()
# ──────────────────────────────────────────────────────────────────────────────
# 4. LOOP PRINCIPAL DE FUSIÓN
# ──────────────────────────────────────────────────────────────────────────────
class CamStream:
def __init__(self, url):
self.url = url
self.cap = cv2.VideoCapture(url)
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
self.frame = None
threading.Thread(target=self._run, daemon=True).start()
def _run(self):
while True:
ret, f = self.cap.read()
if ret:
self.frame = f
time.sleep(0.01)
else:
time.sleep(2)
self.cap.open(self.url)
def dibujar_track_fusion(frame_show, trk, global_mem):
try: x1, y1, x2, y2 = map(int, trk.box)
except Exception: return
nombre_str = ""
if trk.gid is not None:
with global_mem.lock:
nombre = global_mem.db.get(trk.gid, {}).get('nombre')
if nombre: nombre_str = f" [{nombre}]"
if trk.gid is None: color, label = (150, 150, 150), f"?{trk.local_id}"
elif nombre_str: color, label = (255, 0, 255), f"ID:{trk.gid}{nombre_str}"
elif trk.en_grupo: color, label = (0, 0, 255), f"ID:{trk.gid} [grp]"
elif trk.aprendiendo: color, label = (255, 255, 0), f"ID:{trk.gid} [++]"
elif trk.origen_global: color, label = (0, 165, 255), f"ID:{trk.gid} [re-id]"
else: color, label = (0, 255, 0), f"ID:{trk.gid}"
cv2.rectangle(frame_show, (x1, y1), (x2, y2), color, 2)
(tw, th), _ = cv2.getTextSize(label, FUENTE, 0.55, 1)
cv2.rectangle(frame_show, (x1, y1-th-6), (x1+tw+2, y1), color, -1)
cv2.putText(frame_show, label, (x1+1, y1-4), FUENTE, 0.55, (0,0,0), 1)
def main():
print("\nIniciando Sistema")
2026-04-08 17:00:23 +00:00
model = YOLO("yolov8n.pt").to("cuda")
2026-03-18 17:45:30 +00:00
global_mem = GlobalMemory()
managers = {str(c): CamManager(c, global_mem) for c in SECUENCIA}
cams = [CamStream(u) for u in URLS]
for _ in range(2):
threading.Thread(target=worker_rostros, args=(global_mem,), daemon=True).start()
2026-04-08 17:00:23 +00:00
cv2.namedWindow("SmartSoft", cv2.WINDOW_NORMAL)
2026-03-18 17:45:30 +00:00
idx = 0
while True:
now = time.time()
tiles = []
cam_ia = idx % len(cams)
for i, cam_obj in enumerate(cams):
frame = cam_obj.frame; cid = str(SECUENCIA[i])
if frame is None:
tiles.append(np.zeros((270, 480, 3), np.uint8))
continue
frame_show = cv2.resize(frame.copy(), (480, 270))
boxes = []
turno_activo = (i == cam_ia)
if turno_activo:
res = model.predict(frame_show, conf=0.50, iou=0.50, classes=[0], verbose=False, imgsz=480)
2026-03-18 17:45:30 +00:00
if res[0].boxes:
boxes = res[0].boxes.xyxy.cpu().numpy().tolist()
tracks = managers[cid].update(boxes, frame_show, frame, now, turno_activo)
2026-03-18 17:45:30 +00:00
for trk in tracks:
if trk.time_since_update <= 1:
dibujar_track_fusion(frame_show, trk, global_mem)
if turno_activo and trk.gid is not None and not getattr(trk, 'procesando_rostro', False):
if not COLA_ROSTROS.full():
trk.procesando_rostro = True
COLA_ROSTROS.put((frame.copy(), trk.box, trk.gid, cid, trk))
if turno_activo: cv2.circle(frame_show, (460, 20), 6, (0, 0, 255), -1)
con_id = sum(1 for t in tracks if t.gid and t.time_since_update==0)
cv2.putText(frame_show, f"CAM {cid} [{con_id} ID]", (10, 28), FUENTE, 0.7, (255, 255, 255), 2)
tiles.append(frame_show)
if len(tiles) == 6:
cv2.imshow("SmartSoft Fusion", np.vstack([np.hstack(tiles[0:3]), np.hstack(tiles[3:6])]))
idx += 1
# ⚡ CAPTURAMOS LA TECLA EN UNA VARIABLE
# ⚡ CAPTURAMOS LA TECLA EN UNA VARIABLE
key = cv2.waitKey(1) & 0xFF
if key == ord('q'):
2026-03-18 17:45:30 +00:00
break
elif key == ord('r'):
print("\n[MODO REGISTRO] Escaneando mosaico para registrar...")
mejor_roi = None
max_area = 0
# ⚡ CORRECCIÓN: 'cams' es una lista, usamos enumerate
for i, cam_obj in enumerate(cams):
if cam_obj.frame is None: continue
faces = detectar_rostros_yunet(cam_obj.frame)
for (fx, fy, fw, fh, score) in faces:
area = fw * fh
if area > max_area:
max_area = area
h_frame, w_frame = cam_obj.frame.shape[:2]
# Margen amplio (30%) para MTCNN
m_x, m_y = int(fw * 0.30), int(fh * 0.30)
y1 = max(0, fy - m_y)
y2 = min(h_frame, fy + fh + m_y)
x1 = max(0, fx - m_x)
x2 = min(w_frame, fx + fw + m_x)
mejor_roi = cam_obj.frame[y1:y2, x1:x2]
if mejor_roi is not None and mejor_roi.size > 0:
cv2.imshow("Nueva Persona", mejor_roi)
cv2.waitKey(1)
nom = input("Escribe el nombre de la persona: ").strip()
cv2.destroyWindow("Nueva Persona")
if nom:
import json
# 1. Pedimos el género para no usar IA en el futuro
gen_input = input("¿Es Hombre (h) o Mujer (m)?: ").strip().lower()
genero_guardado = "Woman" if gen_input == 'm' else "Man"
# 2. Actualizamos el caché de géneros al instante
ruta_generos = os.path.join("cache_nombres", "generos.json")
os.makedirs("cache_nombres", exist_ok=True)
dic_generos = {}
if os.path.exists(ruta_generos):
try:
with open(ruta_generos, 'r') as f:
dic_generos = json.load(f)
except Exception:
pass
dic_generos[nom] = genero_guardado
try:
with open(ruta_generos, 'w') as f:
json.dump(dic_generos, f)
except Exception as e:
print(f"[!] Error al guardar el género: {e}")
# 3. Guardado tradicional de la foto
ruta_db = "db_institucion"
os.makedirs(ruta_db, exist_ok=True)
cv2.imwrite(os.path.join(ruta_db, f"{nom}.jpg"), mejor_roi)
print(f"[OK] Rostro de '{nom}' guardado como {genero_guardado}.")
print(" Sincronizando base de datos en caliente...")
# Al llamar a esta función, el sistema alineará la foto sin pisar nuestro JSON
nuevos_vectores = gestionar_vectores(actualizar=True)
BASE_DATOS_ROSTROS.clear()
BASE_DATOS_ROSTROS.update(nuevos_vectores)
print(" Sistema listo.")
else:
print("[!] Registro cancelado.")
2026-03-18 17:45:30 +00:00
if __name__ == "__main__":
main()