import os os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' os.environ['CUDA_VISIBLE_DEVICES'] = '-1' os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "rtsp_transport;tcp|stimeout;3000000" import cv2 import numpy as np import time import threading from queue import Queue from deepface import DeepFace from ultralytics import YOLO import warnings warnings.filterwarnings("ignore") # ────────────────────────────────────────────────────────────────────────────── # 1. IMPORTAMOS NUESTROS MÓDULOS # ────────────────────────────────────────────────────────────────────────────── # Del motor matemático y tracking from seguimiento2 import GlobalMemory, CamManager, SECUENCIA, URLS, FUENTE, similitud_hibrida # Del motor de reconocimiento facial y audio from reconocimiento2 import ( gestionar_vectores, detectar_rostros_yunet, buscar_mejor_match, hilo_bienvenida, UMBRAL_SIM, COOLDOWN_TIME ) # ────────────────────────────────────────────────────────────────────────────── # 2. PROTECCIONES MULTIHILO E INICIALIZACIÓN # ────────────────────────────────────────────────────────────────────────────── COLA_ROSTROS = Queue(maxsize=4) YUNET_LOCK = threading.Lock() IA_LOCK = threading.Lock() # Inicializamos la base de datos usando tu función importada print("\nIniciando carga de base de datos...") BASE_DATOS_ROSTROS = gestionar_vectores(actualizar=True) # ────────────────────────────────────────────────────────────────────────────── # 3. MOTOR ASÍNCRONO # ────────────────────────────────────────────────────────────────────────────── def procesar_rostro_async(frame_hd, box_480, gid, cam_id, global_mem, trk): """ Toma el recorte del tracker, escala a HD, aplica filtros físicos y votación biométrica """ try: if not BASE_DATOS_ROSTROS: return # ────────────────────────────────────────────────────────── # 1. ESCALADO HD Y EXTRACCIÓN DE CABEZA (Solución Xayli) # ────────────────────────────────────────────────────────── h_real, w_real = frame_hd.shape[:2] if w_real <= 480: print(f"[ERROR CAM {cam_id}] Le estás pasando el frame_show (480x270) a ArcFace, no el HD.") escala_x = w_real / 480.0 escala_y = h_real / 270.0 x_min, y_min, x_max, y_max = box_480 h_box = y_max - y_min y_min_expandido = max(0, y_min - (h_box * 0.15)) # ⚡ 50% del cuerpo para no cortar cabezas de personas de menor estatura y_max_cabeza = min(270, y_min + (h_box * 0.50)) x1_hd = int(max(0, x_min) * escala_x) y1_hd = int(y_min_expandido * escala_y) x2_hd = int(min(480, x_max) * escala_x) y2_hd = int(y_max_cabeza * escala_y) roi_cabeza = frame_hd[y1_hd:y2_hd, x1_hd:x2_hd] # ⚡ Filtro físico relajado a 40x40 if roi_cabeza.size == 0 or roi_cabeza.shape[0] < 20 or roi_cabeza.shape[1] < 20: return h_roi, w_roi = roi_cabeza.shape[:2] # ────────────────────────────────────────────────────────── # 2. DETECCIÓN YUNET Y FILTROS ANTI-BASURA # ────────────────────────────────────────────────────────── faces = detectar_rostros_yunet(roi_cabeza, lock=YUNET_LOCK) for (rx, ry, rw, rh, score) in faces: rx, ry = max(0, rx), max(0, ry) rw, rh = min(w_roi - rx, rw), min(h_roi - ry, rh) area_rostro_actual = rw * rh with global_mem.lock: data = global_mem.db.get(gid, {}) nombre_actual = data.get('nombre') area_ref = data.get('area_rostro_ref', 0) necesita_saludo = False if str(cam_id) == "7": if not hasattr(global_mem, 'ultimos_saludos'): global_mem.ultimos_saludos = {} ultimo = global_mem.ultimos_saludos.get(nombre_actual if nombre_actual else "", 0) if (time.time() - ultimo) > COOLDOWN_TIME: necesita_saludo = True if nombre_actual is None or area_rostro_actual >= (area_ref * 1.5) or necesita_saludo: m_x = int(rw * 0.25) m_y = int(rh * 0.25) roi_rostro = roi_cabeza[max(0, ry-m_y):min(h_roi, ry+rh+m_y), max(0, rx-m_x):min(w_roi, rx+rw+m_x)] if roi_rostro.size == 0 or roi_rostro.shape[0] < 20 or roi_rostro.shape[1] < 20: continue # 🛡️ FILTRO ANTI-PERFIL: Evita falsos positivos de personas viendo de lado ratio_aspecto = roi_rostro.shape[1] / float(roi_rostro.shape[0]) if ratio_aspecto < 0.50: continue # 🛡️ FILTRO ÓPTICO (Movimiento) gray_roi = cv2.cvtColor(roi_rostro, cv2.COLOR_BGR2GRAY) nitidez = cv2.Laplacian(gray_roi, cv2.CV_64F).var() if nitidez < 15.0: continue # VISIÓN NOCTURNA (Simetría con Base de Datos) try: lab = cv2.cvtColor(roi_rostro, cv2.COLOR_BGR2LAB) l, a, b = cv2.split(lab) clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8)) l = clahe.apply(l) roi_mejorado = cv2.cvtColor(cv2.merge((l, a, b)), cv2.COLOR_LAB2BGR) except Exception: roi_mejorado = roi_rostro # ────────────────────────────────────────────────────────── # 3. MOTOR MTCNN Y SISTEMA DE VOTACIÓN # ────────────────────────────────────────────────────────── with IA_LOCK: try: res = DeepFace.represent( img_path=roi_mejorado, model_name="ArcFace", detector_backend="mtcnn", align=True, enforce_detection=True ) emb = np.array(res[0]["embedding"], dtype=np.float32) mejor_match, max_sim = buscar_mejor_match(emb, BASE_DATOS_ROSTROS) except Exception: continue # Cara inválida para MTCNN print(f"[DEBUG CAM {cam_id}] ArcFace: {mejor_match} al {max_sim:.2f}") if max_sim >= UMBRAL_SIM and mejor_match: nombre_limpio = mejor_match.split('_')[0] with global_mem.lock: datos_id = global_mem.db.get(gid) if not datos_id: continue # SISTEMA DE VOTACIÓN (Anti-Falsos Positivos) if datos_id.get('candidato_nombre') == nombre_limpio: datos_id['votos_nombre'] = datos_id.get('votos_nombre', 0) + 1 else: datos_id['candidato_nombre'] = nombre_limpio datos_id['votos_nombre'] = 1 # ⚡ EL PASE VIP: Si la certeza es aplastante (>0.55), salta la burocracia if max_sim >= 0.50: datos_id['votos_nombre'] = max(2, datos_id['votos_nombre']) # Solo actuamos si tiene 2 votos consistentes... if datos_id['votos_nombre'] >= 2: nombre_actual = datos_id.get('nombre') # CANDADO DE BAUTIZO: Protege a los VIPs de alucinaciones borrosas if nombre_actual is not None and nombre_actual != nombre_limpio: if max_sim < 0.59: # Si es un puntaje bajo, es una confusión de ArcFace. Lo ignoramos. print(f" [RECHAZO] ArcFace intentó renombrar a {nombre_actual} como {nombre_limpio} con solo {max_sim:.2f}") continue else: # Si el puntaje es masivo, significa que OSNet se equivocó y pegó 2 personas print(f"[CORRECCIÓN VIP] OSNet se confundió. Renombrando a {nombre_limpio} ({max_sim:.2f})") # ⚡ BAUTIZO Y LIMPIEZA if nombre_actual != nombre_limpio: datos_id['nombre'] = nombre_limpio print(f" [BAUTIZO] ID {gid} confirmado como {nombre_limpio}") ids_a_borrar = [] firma_actual = datos_id['firmas'][0] if datos_id['firmas'] else None for otro_gid, datos_otro in list(global_mem.db.items()): if otro_gid == gid: continue if datos_otro.get('nombre') == nombre_limpio: ids_a_borrar.append(otro_gid) elif datos_otro.get('nombre') is None and firma_actual and datos_otro['firmas']: sim_huerfano = similitud_hibrida(firma_actual, datos_otro['firmas'][0]) if sim_huerfano > 0.75: ids_a_borrar.append(otro_gid) for id_basura in ids_a_borrar: del global_mem.db[id_basura] # Actualizamos referencias datos_id['area_rostro_ref'] = area_rostro_actual datos_id['ts'] = time.time() # BLINDAJE VIP: Si la certeza es absoluta, amarrar la ropa a este ID if max_sim > 0.65: # Usamos la función externa para evitar bloqueos dobles (Deadlocks) pass # 🔊 SALUDO DE BIENVENIDA if str(cam_id) == "7" and necesita_saludo: global_mem.ultimos_saludos[nombre_limpio] = time.time() import json genero = "Man" # Valor por defecto seguro ruta_generos = os.path.join("cache_nombres", "generos.json") if os.path.exists(ruta_generos): try: with open(ruta_generos, 'r') as f: dic_generos = json.load(f) genero = dic_generos.get(nombre_limpio, "Man") except Exception: pass # Si el archivo está ocupado, usamos el defecto # Lanzamos el audio instantáneamente sin IA pesada threading.Thread(target=hilo_bienvenida, args=(nombre_limpio, genero), daemon=True).start() # Ejecutamos el blindaje fuera del lock principal if max_sim > 0.65 and datos_id.get('votos_nombre', 0) >= 2: global_mem.confirmar_firma_vip(gid, time.time()) break # Salimos del loop de rostros si ya identificamos except Exception as e: pass finally: trk.procesando_rostro = False def worker_rostros(global_mem): """ Consumidor de la cola multihilo """ while True: frame, box, gid, cam_id, trk = COLA_ROSTROS.get() procesar_rostro_async(frame, box, gid, cam_id, global_mem, trk) COLA_ROSTROS.task_done() # ────────────────────────────────────────────────────────────────────────────── # 4. LOOP PRINCIPAL DE FUSIÓN # ────────────────────────────────────────────────────────────────────────────── class CamStream: def __init__(self, url): self.url = url self.cap = cv2.VideoCapture(url) self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) self.frame = None threading.Thread(target=self._run, daemon=True).start() def _run(self): while True: ret, f = self.cap.read() if ret: self.frame = f time.sleep(0.01) else: time.sleep(2) self.cap.open(self.url) def dibujar_track_fusion(frame_show, trk, global_mem): try: x1, y1, x2, y2 = map(int, trk.box) except Exception: return nombre_str = "" if trk.gid is not None: with global_mem.lock: nombre = global_mem.db.get(trk.gid, {}).get('nombre') if nombre: nombre_str = f" [{nombre}]" if trk.gid is None: color, label = (150, 150, 150), f"?{trk.local_id}" elif nombre_str: color, label = (255, 0, 255), f"ID:{trk.gid}{nombre_str}" elif trk.en_grupo: color, label = (0, 0, 255), f"ID:{trk.gid} [grp]" elif trk.aprendiendo: color, label = (255, 255, 0), f"ID:{trk.gid} [++]" elif trk.origen_global: color, label = (0, 165, 255), f"ID:{trk.gid} [re-id]" else: color, label = (0, 255, 0), f"ID:{trk.gid}" cv2.rectangle(frame_show, (x1, y1), (x2, y2), color, 2) (tw, th), _ = cv2.getTextSize(label, FUENTE, 0.55, 1) cv2.rectangle(frame_show, (x1, y1-th-6), (x1+tw+2, y1), color, -1) cv2.putText(frame_show, label, (x1+1, y1-4), FUENTE, 0.55, (0,0,0), 1) def main(): print("\nIniciando Sistema") model = YOLO("yolov8n.pt") global_mem = GlobalMemory() managers = {str(c): CamManager(c, global_mem) for c in SECUENCIA} cams = [CamStream(u) for u in URLS] for _ in range(2): threading.Thread(target=worker_rostros, args=(global_mem,), daemon=True).start() cv2.namedWindow("SmartSoft", cv2.WINDOW_AUTOSIZE) idx = 0 while True: now = time.time() tiles = [] cam_ia = idx % len(cams) for i, cam_obj in enumerate(cams): frame = cam_obj.frame; cid = str(SECUENCIA[i]) if frame is None: tiles.append(np.zeros((270, 480, 3), np.uint8)) continue frame_show = cv2.resize(frame.copy(), (480, 270)) boxes = [] turno_activo = (i == cam_ia) if turno_activo: res = model.predict(frame_show, conf=0.50, iou=0.50, classes=[0], verbose=False, imgsz=480) if res[0].boxes: boxes = res[0].boxes.xyxy.cpu().numpy().tolist() tracks = managers[cid].update(boxes, frame_show, frame, now, turno_activo) for trk in tracks: if trk.time_since_update <= 1: dibujar_track_fusion(frame_show, trk, global_mem) if turno_activo and trk.gid is not None and not getattr(trk, 'procesando_rostro', False): if not COLA_ROSTROS.full(): trk.procesando_rostro = True COLA_ROSTROS.put((frame.copy(), trk.box, trk.gid, cid, trk)) if turno_activo: cv2.circle(frame_show, (460, 20), 6, (0, 0, 255), -1) con_id = sum(1 for t in tracks if t.gid and t.time_since_update==0) cv2.putText(frame_show, f"CAM {cid} [{con_id} ID]", (10, 28), FUENTE, 0.7, (255, 255, 255), 2) tiles.append(frame_show) if len(tiles) == 6: cv2.imshow("SmartSoft Fusion", np.vstack([np.hstack(tiles[0:3]), np.hstack(tiles[3:6])])) idx += 1 # ⚡ CAPTURAMOS LA TECLA EN UNA VARIABLE # ⚡ CAPTURAMOS LA TECLA EN UNA VARIABLE key = cv2.waitKey(1) & 0xFF if key == ord('q'): break elif key == ord('r'): print("\n[MODO REGISTRO] Escaneando mosaico para registrar...") mejor_roi = None max_area = 0 # ⚡ CORRECCIÓN: 'cams' es una lista, usamos enumerate for i, cam_obj in enumerate(cams): if cam_obj.frame is None: continue faces = detectar_rostros_yunet(cam_obj.frame) for (fx, fy, fw, fh, score) in faces: area = fw * fh if area > max_area: max_area = area h_frame, w_frame = cam_obj.frame.shape[:2] # Margen amplio (30%) para MTCNN m_x, m_y = int(fw * 0.30), int(fh * 0.30) y1 = max(0, fy - m_y) y2 = min(h_frame, fy + fh + m_y) x1 = max(0, fx - m_x) x2 = min(w_frame, fx + fw + m_x) mejor_roi = cam_obj.frame[y1:y2, x1:x2] if mejor_roi is not None and mejor_roi.size > 0: cv2.imshow("Nueva Persona", mejor_roi) cv2.waitKey(1) nom = input("Escribe el nombre de la persona: ").strip() cv2.destroyWindow("Nueva Persona") if nom: import json # 1. Pedimos el género para no usar IA en el futuro gen_input = input("¿Es Hombre (h) o Mujer (m)?: ").strip().lower() genero_guardado = "Woman" if gen_input == 'm' else "Man" # 2. Actualizamos el caché de géneros al instante ruta_generos = os.path.join("cache_nombres", "generos.json") os.makedirs("cache_nombres", exist_ok=True) dic_generos = {} if os.path.exists(ruta_generos): try: with open(ruta_generos, 'r') as f: dic_generos = json.load(f) except Exception: pass dic_generos[nom] = genero_guardado try: with open(ruta_generos, 'w') as f: json.dump(dic_generos, f) except Exception as e: print(f"[!] Error al guardar el género: {e}") # 3. Guardado tradicional de la foto ruta_db = "db_institucion" os.makedirs(ruta_db, exist_ok=True) cv2.imwrite(os.path.join(ruta_db, f"{nom}.jpg"), mejor_roi) print(f"[OK] Rostro de '{nom}' guardado como {genero_guardado}.") print(" Sincronizando base de datos en caliente...") # Al llamar a esta función, el sistema alineará la foto sin pisar nuestro JSON nuevos_vectores = gestionar_vectores(actualizar=True) BASE_DATOS_ROSTROS.clear() BASE_DATOS_ROSTROS.update(nuevos_vectores) print(" Sistema listo.") else: print("[!] Registro cancelado.") if __name__ == "__main__": main()