Compare commits

..

No commits in common. "ede8d8dda85b2ba266c7a64a1bb708103516d71e" and "6bc9a5cb4438f982696af5ea7a2488e3af109729" have entirely different histories.

22 changed files with 2207 additions and 3161 deletions

352
.gitignore vendored
View File

@ -1,178 +1,174 @@
# ---> Python # ---> Python
# Byte-compiled / optimized / DLL files # Byte-compiled / optimized / DLL files
__pycache__/ __pycache__/
*.py[cod] *.py[cod]
*$py.class *$py.class
ven2/
.venv/ # C extensions
__pycache__/ *.so
*.pkl
# Distribution / packaging
# C extensions .Python
*.so build/
develop-eggs/
# Distribution / packaging dist/
.Python downloads/
build/ eggs/
develop-eggs/ .eggs/
dist/ lib/
downloads/ lib64/
eggs/ parts/
.eggs/ sdist/
lib/ var/
lib64/ wheels/
parts/ share/python-wheels/
sdist/ *.egg-info/
var/ .installed.cfg
wheels/ *.egg
share/python-wheels/ MANIFEST
*.egg-info/
.installed.cfg # PyInstaller
*.egg # Usually these files are written by a python script from a template
MANIFEST # before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
# PyInstaller *.spec
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it. # Installer logs
*.manifest pip-log.txt
*.spec pip-delete-this-directory.txt
# Installer logs # Unit test / coverage reports
pip-log.txt htmlcov/
pip-delete-this-directory.txt .tox/
.nox/
# Unit test / coverage reports .coverage
htmlcov/ .coverage.*
.tox/ .cache
.nox/ nosetests.xml
.coverage coverage.xml
.coverage.* *.cover
.cache *.py,cover
nosetests.xml .hypothesis/
coverage.xml .pytest_cache/
*.cover cover/
*.py,cover
.hypothesis/ # Translations
.pytest_cache/ *.mo
cover/ *.pot
# Translations # Django stuff:
*.mo *.log
*.pot local_settings.py
db.sqlite3
# Django stuff: db.sqlite3-journal
*.log
local_settings.py # Flask stuff:
db.sqlite3 instance/
db.sqlite3-journal .webassets-cache
# Flask stuff: # Scrapy stuff:
instance/ .scrapy
.webassets-cache
# Sphinx documentation
# Scrapy stuff: docs/_build/
.scrapy
# PyBuilder
# Sphinx documentation .pybuilder/
docs/_build/ target/
# PyBuilder # Jupyter Notebook
.pybuilder/ .ipynb_checkpoints
target/
# IPython
# Jupyter Notebook profile_default/
.ipynb_checkpoints ipython_config.py
# IPython # pyenv
profile_default/ # For a library or package, you might want to ignore these files since the code is
ipython_config.py # intended to run in multiple environments; otherwise, check them in:
# .python-version
# pyenv
# For a library or package, you might want to ignore these files since the code is # pipenv
# intended to run in multiple environments; otherwise, check them in: # According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# .python-version # However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# pipenv # install all needed dependencies.
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. #Pipfile.lock
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not # poetry
# install all needed dependencies. # Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
#Pipfile.lock # This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# poetry # https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. #poetry.lock
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries. # pdm
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control # Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#poetry.lock #pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# pdm # in version control.
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. # https://pdm.fming.dev/#use-with-ide
#pdm.lock .pdm.toml
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control. # PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
# https://pdm.fming.dev/#use-with-ide __pypackages__/
.pdm.toml
# Celery stuff
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm celerybeat-schedule
__pypackages__/ celerybeat.pid
# Celery stuff # SageMath parsed files
celerybeat-schedule *.sage.py
celerybeat.pid
# Environments
# SageMath parsed files .env
*.sage.py .venv
env/
# Environments venv/
.env ENV/
.venv env.bak/
env/ venv.bak/
venv/
ENV/ # Spyder project settings
env.bak/ .spyderproject
venv.bak/ .spyproject
# Spyder project settings # Rope project settings
.spyderproject .ropeproject
.spyproject
# mkdocs documentation
# Rope project settings /site
.ropeproject
# mypy
# mkdocs documentation .mypy_cache/
/site .dmypy.json
dmypy.json
# mypy
.mypy_cache/ # Pyre type checker
.dmypy.json .pyre/
dmypy.json
# pytype static type analyzer
# Pyre type checker .pytype/
.pyre/
# Cython debug symbols
# pytype static type analyzer cython_debug/
.pytype/
# PyCharm
# Cython debug symbols # JetBrains specific template is maintained in a separate JetBrains.gitignore that can
cython_debug/ # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# PyCharm # option (not recommended) you can uncomment the following to ignore the entire idea folder.
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can #.idea/
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder. # ────────────────────────────────────────────────────────
#.idea/ # ENTORNO VIRTUAL DEL PROYECTO
# ────────────────────────────────────────────────────────
ia_env/
# ────────────────────────────────────────────────────────
# ENTORNO VIRTUAL DEL PROYECTO # ────────────────────────────────────────────────────────
# ──────────────────────────────────────────────────────── # MODELOS DE IA (Límite de GitHub: 100 MB)
ia_env/ # ────────────────────────────────────────────────────────
# ──────────────────────────────────────────────────────── *.pt
# MODELOS DE IA (Límite de GitHub: 100 MB) *.onnx
# ────────────────────────────────────────────────────────
*.pt
*.onnx

122
README.md
View File

@ -1,62 +1,62 @@
# Sistema de Identificación y Seguimiento Inteligente # Sistema de Identificación y Seguimiento Inteligente
Este repositorio contiene la arquitectura modular para el seguimiento de personas en múltiples cámaras (Re-ID) y reconocimiento facial asíncrono. Este repositorio contiene la arquitectura modular para el seguimiento de personas en múltiples cámaras (Re-ID) y reconocimiento facial asíncrono.
## Arquitectura del Proyecto ## Arquitectura del Proyecto
El sistema está dividido en tres módulos principales para garantizar la separación de responsabilidades: El sistema está dividido en tres módulos principales para garantizar la separación de responsabilidades:
* `seguimiento2.py`: Motor matemático de Tracking (Kalman + YOLO) y Re-Identificación (OSNet). * `seguimiento2.py`: Motor matemático de Tracking (Kalman + YOLO) y Re-Identificación (OSNet).
* `reconocimiento2.py`: Motor de biometría facial (YuNet + ArcFace) y síntesis de audio (Edge-TTS). * `reconocimiento2.py`: Motor de biometría facial (YuNet + ArcFace) y síntesis de audio (Edge-TTS).
* `main_fusion.py`: Orquestador principal que fusiona ambos motores mediante procesamiento multihilo. * `main_fusion.py`: Orquestador principal que fusiona ambos motores mediante procesamiento multihilo.
## Requisitos Previos ## Requisitos Previos
1. **Python 3.8 - 3.11** instalado en el sistema. 1. **Python 3.8 - 3.11** instalado en el sistema.
2. **Reproductor MPV** instalado y agregado al PATH del sistema (requerido para el motor de audio sin bloqueos). 2. **Reproductor MPV** instalado y agregado al PATH del sistema (requerido para el motor de audio sin bloqueos).
* *Windows:* Descargar de la página oficial o usar `scoop install mpv`. * *Windows:* Descargar de la página oficial o usar `scoop install mpv`.
* *Linux:* `sudo apt install mpv` * *Linux:* `sudo apt install mpv`
* *Mac:* `brew install mpv` * *Mac:* `brew install mpv`
## Guía de Instalación Rápida ## Guía de Instalación Rápida
**1. Clonar el repositorio** **1. Clonar el repositorio**
Abre tu terminal y clona este proyecto: Abre tu terminal y clona este proyecto:
```bash ```bash
git clone <URL_DE_TU_REPOSITORIO_GITEA> git clone <URL_DE_TU_REPOSITORIO_GITEA>
cd IdentificacionIA´´´ cd IdentificacionIA´´´
**2. Crear un Entorno Virtual (¡Importante!) **2. Crear un Entorno Virtual (¡Importante!)
Para evitar conflictos de librerías, crea un entorno virtual limpio dentro de la carpeta del proyecto: Para evitar conflictos de librerías, crea un entorno virtual limpio dentro de la carpeta del proyecto:
python -m venv venv python -m venv venv
3. Activar el Entorno Virtual 3. Activar el Entorno Virtual
En Windows: En Windows:
.\venv\Scripts\activate .\venv\Scripts\activate
En Mac/Linux: En Mac/Linux:
source venv/bin/activate source venv/bin/activate
(Sabrás que está activo si ves un (venv) al inicio de tu línea de comandos). (Sabrás que está activo si ves un (venv) al inicio de tu línea de comandos).
4. Instalar Dependencias 4. Instalar Dependencias
Con el entorno activado, instala todas las librerías necesarias: Con el entorno activado, instala todas las librerías necesarias:
pip install -r requirements.txt pip install -r requirements.txt
## Archivos y Carpetas Necesarias ## Archivos y Carpetas Necesarias
yolov8n.pt (Detector de personas) yolov8n.pt (Detector de personas)
osnet_x0_25_msmt17.onnx (Extractor de características de ropa) osnet_x0_25_msmt17.onnx (Extractor de características de ropa)
face_detection_yunet_2023mar.onnx (Detector facial rápido) face_detection_yunet_2023mar.onnx (Detector facial rápido)
Además, debes tener la carpeta db_institucion con las fotografías de los rostros a reconocer. Además, debes tener la carpeta db_institucion con las fotografías de los rostros a reconocer.
## Ejecución ## Ejecución
Para arrancar el sistema completo con interfaz gráfica y audio, ejecuta: Para arrancar el sistema completo con interfaz gráfica y audio, ejecuta:
python main_fusion.py python main_fusion.py

BIN
bus.jpg

Binary file not shown.

Before

Width:  |  Height:  |  Size: 134 KiB

Binary file not shown.

View File

@ -1,26 +0,0 @@
# Base estable
pip install numpy==1.26.4
# OpenCV compatible con numpy 1.x
pip install opencv-python==4.8.1.78
pip install torch==2.1.2 torchvision==0.16.2 torchaudio==2.1.2 --index-url https://download.pytorch.org/whl/cu118
pip install ultralytics --no-deps
pip install opencv-python==4.8.1.78 matplotlib pyyaml scipy requests pillow
pip install tensorflow==2.21
pip install tf-keras
pip install deepface
pip install onnxruntime
pip install edge-tts
pip install numpy pandas
sudo apt install libxcb-xinerama0
sudo apt install fonts-dejavu
QT_DEBUG_PLUGINS=0 python fusion.py
pip cache purge
python -c "import torch; print(torch.cuda.is_available())"
python -c "import torch; print(torch.cuda.get_device_name(0))"

View File

@ -1,13 +0,0 @@
[ESP32]
ip = 192.168.15.128
puerto = 81
[Audio]
duracion_ms = 2000
tono_base = 440
amplitud = 16000
[General]
timeout = 5
reconectar = true

View File

@ -1,78 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script para configurar la bocina ESP32
Permite cambiar IP, puerto, duración, etc.
"""
import sys
import os
# Agregar el proyecto al path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from core.speaker_iot import configurar_ip, mostrar_configuracion
from core.speaker_iot.config import config
def main():
print("\n" + "=" * 50)
print(" 🎵 CONFIGURACIÓN DE BOCINA IoT")
print("=" * 50)
mostrar_configuracion()
print("\n" + "-" * 50)
print("¿Qué deseas configurar?")
print(" 1. Cambiar IP")
print(" 2. Cambiar puerto")
print(" 3. Cambiar duración del audio")
print(" 4. Ver configuración actual")
print(" 5. Restaurar valores por defecto")
print(" 6. Salir")
opcion = input("\n👉 Opción (1-6): ").strip()
if opcion == "1":
nueva_ip = input("📡 Nueva IP: ").strip()
if nueva_ip:
config.actualizar_ip(nueva_ip)
print(f"✅ IP actualizada a: {nueva_ip}")
elif opcion == "2":
nuevo_puerto = input("🔌 Nuevo puerto [81]: ").strip()
if nuevo_puerto:
config.config.set("ESP32", "puerto", nuevo_puerto)
with open(config.CONFIG_FILE, 'w') as f:
config.config.write(f)
print(f"✅ Puerto actualizado a: {nuevo_puerto}")
elif opcion == "3":
nueva_duracion = input("⏱️ Nueva duración en ms [2000]: ").strip()
if nueva_duracion:
config.config.set("Audio", "duracion_ms", nueva_duracion)
with open(config.CONFIG_FILE, 'w') as f:
config.config.write(f)
print(f"✅ Duración actualizada a: {nueva_duracion}ms")
elif opcion == "4":
mostrar_configuracion()
elif opcion == "5":
confirmar = input("⚠️ ¿Restaurar configuración por defecto? (s/n): ").strip().lower()
if confirmar == 's':
config._crear_configuracion_default()
print("✅ Configuración restaurada")
mostrar_configuracion()
elif opcion == "6":
print("\n👋 Hasta luego!")
return
else:
print("❌ Opción inválida")
print("\n✅ Configuración guardada!")
input("\nPresiona Enter para salir...")
if __name__ == "__main__":
main()

View File

@ -1,21 +0,0 @@
"""
Speaker IoT - Módulo para controlar bocina ESP32
"""
from .bocina_core import (
BocinaCore,
saludar,
detener,
obtener_estado,
configurar_ip,
mostrar_configuracion
)
__all__ = [
'BocinaCore',
'saludar',
'detener',
'obtener_estado',
'configurar_ip',
'mostrar_configuracion'
]

View File

@ -1,224 +0,0 @@
"""
BOCINA CORE - Módulo para controlar la bocina ESP32
"""
import asyncio
import websockets
import json
import struct
import math
from typing import Optional, Dict, Any
from .config import config
# ==================== CONSTANTES ====================
CHUNK_SIZE = 1024
SAMPLE_RATE = 16000
# ==================== CLASE PRINCIPAL ====================
class BocinaCore:
"""Clase principal para controlar la bocina ESP32"""
def __init__(self, ip: str = None, puerto: int = None):
"""
Inicializa el controlador de la bocina
Args:
ip: IP del ESP32 (si es None, usa la del archivo de configuración)
puerto: Puerto WebSocket (si es None, usa el del archivo)
"""
self.ip = ip or config.obtener_ip()
self.puerto = puerto or config.obtener_puerto()
self.url = f"ws://{self.ip}:{self.puerto}"
self.duracion_ms = config.obtener_duracion()
self.tono_base = config.obtener_tono_base()
self.amplitud = config.obtener_amplitud()
self.timeout = config.obtener_timeout()
self._websocket = None
self._conectado = False
# ==================== MÉTODOS PÚBLICOS ====================
def saludar(self, nombre: str, duracion_ms: int = None, tono_personalizado: bool = True) -> bool:
"""
Envía un saludo a la bocina
Args:
nombre: Nombre de la persona
duracion_ms: Duración del saludo (None = usa config)
tono_personalizado: Si True, varía el tono según el nombre
"""
duracion = duracion_ms or self.duracion_ms
return self._ejecutar_async(self._saludar_async(nombre, duracion, tono_personalizado))
def detener(self) -> bool:
"""Detiene la reproducción actual"""
return self._ejecutar_async(self._detener_async())
def estado(self) -> Dict[str, Any]:
"""Obtiene el estado actual de la bocina"""
return self._ejecutar_async(self._estado_async())
def ping(self) -> bool:
"""Prueba la conexión con la bocina"""
return self._ejecutar_async(self._ping_async())
def conectar(self) -> bool:
"""Establece conexión manual con la bocina"""
return self._ejecutar_async(self._conectar_async())
def desconectar(self) -> bool:
"""Cierra la conexión con la bocina"""
return self._ejecutar_async(self._desconectar_async())
# ==================== MÉTODOS INTERNOS ====================
def _ejecutar_async(self, corutina):
"""Ejecuta una función asíncrona desde código síncrono"""
try:
loop = asyncio.get_event_loop()
if loop.is_running():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
resultado = loop.run_until_complete(corutina)
loop.close()
return resultado
else:
return loop.run_until_complete(corutina)
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
resultado = loop.run_until_complete(corutina)
loop.close()
return resultado
async def _conectar_async(self) -> bool:
"""Conexión asíncrona"""
try:
self._websocket = await websockets.connect(self.url, timeout=self.timeout)
await self._websocket.recv()
self._conectado = True
return True
except Exception:
self._conectado = False
return False
async def _desconectar_async(self) -> bool:
"""Desconexión asíncrona"""
if self._websocket:
await self._websocket.close()
self._websocket = None
self._conectado = False
return True
async def _asegurar_conexion(self) -> bool:
"""Asegura que haya una conexión activa"""
if not self._conectado or not self._websocket:
return await self._conectar_async()
return True
async def _saludar_async(self, nombre: str, duracion_ms: int, tono_personalizado: bool) -> bool:
"""Enviar saludo asíncrono"""
if not await self._asegurar_conexion():
return False
try:
audio = self._generar_audio(nombre, duracion_ms, tono_personalizado)
for i in range(0, len(audio), CHUNK_SIZE):
chunk = audio[i:i + CHUNK_SIZE]
await self._websocket.send(chunk)
await asyncio.sleep(0.005)
return True
except Exception:
return False
async def _detener_async(self) -> bool:
"""Detener reproducción asíncrono"""
if not await self._asegurar_conexion():
return False
try:
await self._websocket.send(json.dumps({"cmd": "STOP"}))
await asyncio.wait_for(self._websocket.recv(), timeout=self.timeout)
return True
except Exception:
return False
async def _estado_async(self) -> Dict[str, Any]:
"""Obtener estado asíncrono"""
if not await self._asegurar_conexion():
return {}
try:
await self._websocket.send(json.dumps({"cmd": "STATUS"}))
respuesta = await asyncio.wait_for(self._websocket.recv(), timeout=self.timeout)
return json.loads(respuesta)
except Exception:
return {}
async def _ping_async(self) -> bool:
"""Ping asíncrono"""
if not await self._asegurar_conexion():
return False
try:
await self._websocket.send(json.dumps({"cmd": "PING"}))
respuesta = await asyncio.wait_for(self._websocket.recv(), timeout=self.timeout)
data = json.loads(respuesta)
return data.get("status") == "ok"
except Exception:
return False
def _generar_audio(self, nombre: str, duracion_ms: int, tono_personalizado: bool) -> bytes:
"""Genera audio PCM para el saludo"""
num_muestras = int(SAMPLE_RATE * duracion_ms / 1000)
if tono_personalizado:
frecuencia = self.tono_base + (len(nombre) * 10)
if frecuencia > 800:
frecuencia = 800
else:
frecuencia = self.tono_base
audio = bytearray()
for i in range(num_muestras):
valor = int(self.amplitud * math.sin(2 * math.pi * frecuencia * i / SAMPLE_RATE))
audio.extend(struct.pack('<h', valor))
return bytes(audio)
# ==================== FUNCIONES SIMPLES (API RÁPIDA) ====================
def saludar(nombre: str, ip: str = None) -> bool:
"""Función rápida para saludar a una persona"""
bocina = BocinaCore(ip)
return bocina.saludar(nombre)
def detener(ip: str = None) -> bool:
"""Detiene la reproducción"""
bocina = BocinaCore(ip)
return bocina.detener()
def obtener_estado(ip: str = None) -> dict:
"""Obtiene el estado de la bocina"""
bocina = BocinaCore(ip)
return bocina.estado()
def configurar_ip(nueva_ip: str):
"""Actualiza la IP en el archivo de configuración"""
from .config import config
config.actualizar_ip(nueva_ip)
def mostrar_configuracion():
"""Muestra la configuración actual"""
from .config import config
config.mostrar_configuracion()

View File

@ -1,108 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Módulo de configuración para la bocina ESP32
"""
import os
import configparser
from pathlib import Path
# ==================== RUTAS ====================
BASE_DIR = Path(__file__).resolve().parent.parent.parent
CONFIG_DIR = BASE_DIR / "config" / "speaker_iot"
CONFIG_FILE = CONFIG_DIR / "settings.ini"
# ==================== CONFIGURACIÓN POR DEFECTO ====================
DEFAULT_CONFIG = {
"ESP32": {
"ip": "192.168.15.128",
"puerto": "81"
},
"Audio": {
"duracion_ms": "2000",
"tono_base": "440",
"amplitud": "16000"
},
"General": {
"timeout": "5",
"reconectar": "true"
}
}
class ConfiguracionBocina:
"""Gestor de configuración para la bocina"""
def __init__(self):
self.config = configparser.ConfigParser()
self._cargar_configuracion()
def _cargar_configuracion(self):
"""Carga la configuración desde el archivo o crea uno por defecto"""
if CONFIG_FILE.exists():
self.config.read(CONFIG_FILE, encoding='utf-8')
else:
self._crear_configuracion_default()
def _crear_configuracion_default(self):
"""Crea archivo de configuración por defecto"""
# Crear directorio si no existe
CONFIG_DIR.mkdir(parents=True, exist_ok=True)
# Cargar valores por defecto
for seccion, valores in DEFAULT_CONFIG.items():
self.config[seccion] = valores
# Guardar archivo
with open(CONFIG_FILE, 'w', encoding='utf-8') as f:
self.config.write(f)
print(f"📝 Configuración creada en: {CONFIG_FILE}")
def obtener_ip(self) -> str:
"""Obtiene la IP de la bocina"""
return self.config.get("ESP32", "ip", fallback="192.168.15.128")
def obtener_puerto(self) -> int:
"""Obtiene el puerto de la bocina"""
return self.config.getint("ESP32", "puerto", fallback=81)
def obtener_duracion(self) -> int:
"""Obtiene duración del audio en ms"""
return self.config.getint("Audio", "duracion_ms", fallback=2000)
def obtener_tono_base(self) -> int:
"""Obtiene frecuencia base del tono"""
return self.config.getint("Audio", "tono_base", fallback=440)
def obtener_amplitud(self) -> int:
"""Obtiene amplitud del audio"""
return self.config.getint("Audio", "amplitud", fallback=16000)
def obtener_timeout(self) -> int:
"""Obtiene timeout en segundos"""
return self.config.getint("General", "timeout", fallback=5)
def reconectar_auto(self) -> bool:
"""Obtiene si debe reconectar automáticamente"""
return self.config.getboolean("General", "reconectar", fallback=True)
def actualizar_ip(self, nueva_ip: str):
"""Actualiza la IP de la bocina"""
self.config.set("ESP32", "ip", nueva_ip)
with open(CONFIG_FILE, 'w', encoding='utf-8') as f:
self.config.write(f)
print(f"✅ IP actualizada a: {nueva_ip}")
def mostrar_configuracion(self):
"""Muestra la configuración actual"""
print("\n📡 Configuración actual:")
print(f" IP: {self.obtener_ip()}:{self.obtener_puerto()}")
print(f" Duración: {self.obtener_duracion()}ms")
print(f" Tono base: {self.obtener_tono_base()}Hz")
print(f" Timeout: {self.obtener_timeout()}s")
# Instancia global
config = ConfiguracionBocina()

View File

@ -1,60 +0,0 @@
# Solo importas lo que necesitas
from core.speaker_iot import saludar, detener, obtener_estado
# ===== EJEMPLO 1: Cuando detectas una persona =====
def mi_detector():
nombre = "Ana" # Tu IA obtiene el nombre
# Enviar saludo (¡una sola línea!)
saludar(nombre)
# También puedes verificar si funcionó
if saludar(nombre):
print(f"✅ Saludo enviado a {nombre}")
else:
print(f"❌ Error al enviar saludo a {nombre}")
# ===== EJEMPLO 2: Dentro de tu loop principal =====
while True:
persona = detectar_persona() # Tu función de detección
if persona:
nombre = obtener_nombre(persona) # Tu base de datos
saludar(nombre) # Envía el saludo
# ===== EJEMPLO 3: Clase completa =====
class MiSistemaIA:
def __init__(self):
self.bocina_ip = "192.168.15.128" # O usa la del config
def on_persona_detectada(self, persona):
nombre = self.obtener_nombre(persona)
if nombre:
print(f"🎉 Detectada: {nombre}")
saludar(nombre) # ¡Así de simple!
def obtener_nombre(self, persona):
# Tu lógica para obtener nombre
return persona.get("nombre", "Visitante")
# ===== Ejemplo completo de integración =====
class SistemaSeguridad:
def __init__(self):
self.personas_conocidas = ["Ana", "Carlos", "Maria"]
print("✅ Sistema iniciado - Bocina lista")
def detectar(self, nombre):
if nombre in self.personas_conocidas:
print(f"🔔 ¡Bienvenido {nombre}!")
saludar(nombre) # Envía saludo
return True
else:
print(f"⚠️ Persona no registrada: {nombre}")
return False
# Uso
sistema = SistemaSeguridad()
sistema.detectar("Ana") # Reproduce sonido
sistema.detectar("Luis") # No reproduce

View File

@ -1,409 +0,0 @@
"""
BOCINA INTELIGENTE - CLIENTE PYTHON
====================================
Cliente para enviar audio y comandos a la bocina ESP32 via WebSocket
Uso:
python bocina_client.py
(Luego ingresa la IP y el nombre cuando se solicite)
"""
import asyncio
import websockets
import json
import struct
import math
import sys
import time
import os
from typing import Optional
# ==================== LIMPIAR PANTALLA ====================
def limpiar_pantalla():
"""Limpia la consola según el sistema operativo"""
os.system('cls' if os.name == 'nt' else 'clear')
# ==================== CLASE BOCINA ====================
class BocinaInteligente:
"""Cliente para controlar la bocina inteligente ESP32"""
def __init__(self, ip: str, puerto: int = 81):
self.ip = ip
self.puerto = puerto
self.url = f"ws://{ip}:{puerto}"
self.websocket: Optional[websockets.WebSocketClientProtocol] = None
self.chunk_size = 1024
self.timeout = 5
self.conectado = False
async def conectar(self) -> bool:
"""Conectar al ESP32"""
try:
print(f"🔌 Conectando a {self.url}...")
self.websocket = await websockets.connect(self.url)
# Esperar mensaje de bienvenida
response = await asyncio.wait_for(self.websocket.recv(), timeout=self.timeout)
data = json.loads(response)
if data.get("status") == "ok":
print(f"{data.get('msg')}")
self.conectado = True
return True
except Exception as e:
print(f" ❌ Error: {e}")
return False
return False
async def desconectar(self):
"""Cerrar conexión"""
if self.websocket:
await self.websocket.close()
self.conectado = False
print("🔌 Conexión cerrada")
async def ping(self) -> bool:
"""Probar conexión con el ESP32"""
if not self.websocket:
return False
try:
await self.websocket.send(json.dumps({"cmd": "PING"}))
response = await asyncio.wait_for(self.websocket.recv(), timeout=self.timeout)
data = json.loads(response)
return data.get("status") == "ok"
except:
return False
async def obtener_estado(self) -> dict:
"""Obtener estadísticas del ESP32"""
if not self.websocket:
return {}
try:
await self.websocket.send(json.dumps({"cmd": "STATUS"}))
response = await asyncio.wait_for(self.websocket.recv(), timeout=self.timeout)
return json.loads(response)
except:
return {}
async def detener(self) -> bool:
"""Detener reproducción"""
if not self.websocket:
return False
try:
await self.websocket.send(json.dumps({"cmd": "STOP"}))
response = await asyncio.wait_for(self.websocket.recv(), timeout=self.timeout)
data = json.loads(response)
return data.get("status") == "ok"
except:
return False
async def enviar_audio(self, audio_data: bytes, nombre: str = "audio") -> bool:
"""
Enviar audio al ESP32
Args:
audio_data: Datos de audio en formato PCM (16kHz, 16bits, mono)
nombre: Nombre identificador (para logs)
"""
if not self.websocket:
print("❌ No hay conexión")
return False
total_chunks = (len(audio_data) + self.chunk_size - 1) // self.chunk_size
print(f"📤 Enviando {total_chunks} chunks ({len(audio_data)} bytes) para '{nombre}'")
inicio = time.time()
for i, chunk_start in enumerate(range(0, len(audio_data), self.chunk_size)):
chunk = audio_data[chunk_start:chunk_start + self.chunk_size]
await self.websocket.send(chunk)
# Mostrar progreso cada 10 chunks o al final
if (i + 1) % 10 == 0 or i == total_chunks - 1:
porcentaje = ((i + 1) * 100) // total_chunks
print(f" 📊 Progreso: {porcentaje}% ({i+1}/{total_chunks} chunks)")
# Pequeña pausa para no saturar
await asyncio.sleep(0.005)
elapsed = time.time() - inicio
print(f"✅ Audio enviado en {elapsed:.2f} segundos")
return True
# ==================== GENERADORES DE AUDIO ====================
def generar_tono(frecuencia: int = 440, duracion_ms: int = 2000,
sample_rate: int = 16000, amplitud: int = 16000) -> bytes:
"""
Generar un tono seno en formato PCM
Args:
frecuencia: Frecuencia del tono en Hz
duracion_ms: Duración en milisegundos
sample_rate: Frecuencia de muestreo
amplitud: Amplitud máxima (0-32767)
"""
num_muestras = int(sample_rate * duracion_ms / 1000)
audio = bytearray()
for i in range(num_muestras):
valor = int(amplitud * math.sin(2 * math.pi * frecuencia * i / sample_rate))
audio.extend(struct.pack('<h', valor)) # little-endian, 16 bits
return bytes(audio)
def generar_melodia_bienvenida() -> bytes:
"""Generar melodía de bienvenida (Do-Re-Mi-Fa-Sol)"""
sample_rate = 16000
duracion_nota = 500 # ms por nota
notas = [261, 293, 329, 349, 392, 440] # Do, Re, Mi, Fa, Sol, La
audio = bytearray()
for nota in notas:
nota_audio = generar_tono(nota, duracion_nota, sample_rate, amplitud=12000)
audio.extend(nota_audio)
return bytes(audio)
def generar_saludo_personalizado(nombre: str) -> bytes:
"""
Generar un saludo personalizado (versión simple)
En un caso real, aquí usarías un servicio TTS
"""
# Usar frecuencia diferente según la longitud del nombre
frecuencia_base = 440
frecuencia = frecuencia_base + (len(nombre) * 10)
# Limitar frecuencia máxima
if frecuencia > 800:
frecuencia = 800
return generar_tono(frecuencia, duracion_ms=2000)
# ==================== MENÚ PRINCIPAL ====================
def mostrar_menu():
"""Muestra el menú principal"""
print("\n" + "=" * 50)
print(" 🎵 BOCINA INTELIGENTE - CONTROL")
print("=" * 50)
print("\n📋 Opciones disponibles:")
print(" 1. 🔊 Enviar tono de prueba (440Hz)")
print(" 2. 🎵 Enviar melodía de bienvenida")
print(" 3. 💬 Enviar saludo personalizado")
print(" 4. 📊 Ver estado del ESP32")
print(" 5. 🏓 Probar ping")
print(" 6. 🔇 Detener reproducción")
print(" 7. 🔄 Reconectar")
print(" 8. 🚪 Salir")
print("-" * 50)
# ==================== MODO INTERACTIVO ====================
async def modo_interactivo():
"""Modo interactivo con entrada de IP y nombre"""
# Limpiar pantalla
limpiar_pantalla()
print("\n" + "=" * 50)
print(" 🎵 BOCINA INTELIGENTE")
print("=" * 50)
# Solicitar IP
print("\n📡 Configuración de conexión:")
ip_default = "192.168.15.128"
ip = input(f" IP del ESP32 [{ip_default}]: ").strip()
if not ip:
ip = ip_default
# Solicitar nombre por defecto para saludos
nombre_default = "Visitante"
nombre = input(f" Nombre por defecto [{nombre_default}]: ").strip()
if not nombre:
nombre = nombre_default
# Crear instancia
bocina = BocinaInteligente(ip)
# Conectar
print("\n🔄 Conectando...")
if not await bocina.conectar():
print("\n❌ No se pudo conectar al ESP32")
print(" Verifica que:")
print(f" 1. La IP {ip} sea correcta")
print(" 2. El ESP32 esté encendido")
print(" 3. Estés en la misma red WiFi")
input("\n Presiona Enter para salir...")
return
print("\n✅ ¡Conectado exitosamente!")
print(f" 📡 IP: {ip}")
print(f" 👤 Nombre: {nombre}")
# Bucle principal
while True:
mostrar_menu()
opcion = input("\n👉 Selecciona una opción (1-8): ").strip()
if opcion == "1":
print("\n🔊 Enviando tono de prueba (440Hz)...")
audio = generar_tono(440, 2000)
await bocina.enviar_audio(audio, "Tono 440Hz")
elif opcion == "2":
print("\n🎵 Enviando melodía de bienvenida...")
audio = generar_melodia_bienvenida()
await bocina.enviar_audio(audio, "Melodía")
elif opcion == "3":
# Pedir nombre específico para este saludo
nombre_saludo = input(f" 👤 Nombre (Enter para usar '{nombre}'): ").strip()
if not nombre_saludo:
nombre_saludo = nombre
print(f"\n🔊 Generando saludo para '{nombre_saludo}'...")
audio = generar_saludo_personalizado(nombre_saludo)
await bocina.enviar_audio(audio, nombre_saludo)
elif opcion == "4":
print("\n📊 Obteniendo estado del ESP32...")
estado = await bocina.obtener_estado()
if estado:
print("\n 📡 Estado del sistema:")
print(f" Status: {estado.get('status', 'desconocido')}")
print(f" 📦 Bytes recibidos: {estado.get('bytes_recibidos', 0)}")
print(f" 🔢 Chunks recibidos: {estado.get('chunks_recibidos', 0)}")
print(f" 🎵 Audio activo: {'✅ Sí' if estado.get('audio_activo') else '❌ No'}")
print(f" 📶 WiFi RSSI: {estado.get('wifi_rssi', 0)} dBm")
else:
print(" ❌ No se pudo obtener estado")
elif opcion == "5":
print("\n🏓 Probando ping...")
inicio = time.time()
if await bocina.ping():
latencia = (time.time() - inicio) * 1000
print(f" ✅ PONG recibido (latencia: {latencia:.0f}ms)")
else:
print(" ❌ Sin respuesta - verifica la conexión")
elif opcion == "6":
print("\n🔇 Deteniendo reproducción...")
if await bocina.detener():
print(" ✅ Reproducción detenida")
else:
print(" ⚠️ No se pudo detener o ya estaba detenido")
elif opcion == "7":
print("\n🔄 Reconectando...")
await bocina.desconectar()
await asyncio.sleep(1)
if await bocina.conectar():
print(" ✅ Reconectado exitosamente")
else:
print(" ❌ Error al reconectar")
elif opcion == "8":
print("\n👋 Saliendo...")
break
else:
print("❌ Opción inválida")
# Pequeña pausa antes de volver al menú
await asyncio.sleep(0.5)
# Cerrar conexión
await bocina.desconectar()
print("\n✅ Programa finalizado")
# ==================== MODO RÁPIDO ====================
async def modo_rapido():
"""Modo rápido: pide IP y nombre y envía saludo inmediato"""
limpiar_pantalla()
print("\n" + "=" * 50)
print(" 🎵 BOCINA INTELIGENTE - MODO RÁPIDO")
print("=" * 50)
# Solicitar IP
print("\n📡 Configuración:")
ip_default = "192.168.15.128"
ip = input(f" IP del ESP32 [{ip_default}]: ").strip()
if not ip:
ip = ip_default
# Solicitar nombre
nombre = input(" 👤 Nombre de la persona: ").strip()
if not nombre:
nombre = "Visitante"
# Conectar y enviar
bocina = BocinaInteligente(ip)
print("\n🔄 Conectando...")
if not await bocina.conectar():
print("❌ No se pudo conectar")
input("\nPresiona Enter para salir...")
return
print(f"\n🔊 Enviando saludo para '{nombre}'...")
audio = generar_saludo_personalizado(nombre)
await bocina.enviar_audio(audio, nombre)
# Mostrar estado
await asyncio.sleep(1)
estado = await bocina.obtener_estado()
if estado:
print(f"\n📊 Enviados: {estado.get('bytes_recibidos', 0)} bytes")
await bocina.desconectar()
print("\n✅ Saludo enviado!")
input("\nPresiona Enter para salir...")
# ==================== MAIN ====================
async def main():
"""Función principal"""
limpiar_pantalla()
print("\n" + "=" * 50)
print(" 🎵 BOCINA INTELIGENTE v1.0")
print("=" * 50)
print("\nSelecciona modo de operación:")
print(" 1. 🎮 Modo interactivo (menú completo)")
print(" 2. ⚡ Modo rápido (solo enviar saludo)")
print(" 3. 🚪 Salir")
modo = input("\n👉 Opción (1-3): ").strip()
if modo == "1":
await modo_interactivo()
elif modo == "2":
await modo_rapido()
else:
print("\n👋 Hasta luego!")
return
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\n\n👋 Programa interrumpido")
except Exception as e:
print(f"\n❌ Error: {e}")
input("\nPresiona Enter para salir...")

Binary file not shown.

After

Width:  |  Height:  |  Size: 4.7 KiB

BIN
db_institucion/Ian Axel.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.0 KiB

View File

@ -10,9 +10,6 @@ from queue import Queue
from deepface import DeepFace from deepface import DeepFace
from ultralytics import YOLO from ultralytics import YOLO
import warnings import warnings
import torch
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Usando dispositivo: {device}")
warnings.filterwarnings("ignore") warnings.filterwarnings("ignore")
@ -298,7 +295,7 @@ def dibujar_track_fusion(frame_show, trk, global_mem):
def main(): def main():
print("\nIniciando Sistema") print("\nIniciando Sistema")
model = YOLO("yolov8n.pt").to("cuda") model = YOLO("yolov8n.pt")
global_mem = GlobalMemory() global_mem = GlobalMemory()
managers = {str(c): CamManager(c, global_mem) for c in SECUENCIA} managers = {str(c): CamManager(c, global_mem) for c in SECUENCIA}
cams = [CamStream(u) for u in URLS] cams = [CamStream(u) for u in URLS]
@ -306,7 +303,7 @@ def main():
for _ in range(2): for _ in range(2):
threading.Thread(target=worker_rostros, args=(global_mem,), daemon=True).start() threading.Thread(target=worker_rostros, args=(global_mem,), daemon=True).start()
cv2.namedWindow("SmartSoft", cv2.WINDOW_NORMAL) cv2.namedWindow("SmartSoft", cv2.WINDOW_AUTOSIZE)
idx = 0 idx = 0
while True: while True:

View File

@ -1,110 +1,110 @@
import cv2 import cv2
import os import os
import json import json
import numpy as np import numpy as np
# ⚡ Importamos tus motores exactos para no romper la simetría # ⚡ Importamos tus motores exactos para no romper la simetría
from reconocimiento2 import detectar_rostros_yunet, gestionar_vectores from reconocimiento2 import detectar_rostros_yunet, gestionar_vectores
def registrar_desde_webcam(): def registrar_desde_webcam():
print("\n" + "="*50) print("\n" + "="*50)
print("📸 MÓDULO DE REGISTRO LIMPIO (WEBCAM LOCAL)") print("📸 MÓDULO DE REGISTRO LIMPIO (WEBCAM LOCAL)")
print("Alinea tu rostro, mira a la cámara con buena luz.") print("Alinea tu rostro, mira a la cámara con buena luz.")
print("Presiona [R] para capturar | [Q] para salir") print("Presiona [R] para capturar | [Q] para salir")
print("="*50 + "\n") print("="*50 + "\n")
DB_PATH = "db_institucion" DB_PATH = "db_institucion"
CACHE_PATH = "cache_nombres" CACHE_PATH = "cache_nombres"
os.makedirs(DB_PATH, exist_ok=True) os.makedirs(DB_PATH, exist_ok=True)
os.makedirs(CACHE_PATH, exist_ok=True) os.makedirs(CACHE_PATH, exist_ok=True)
# 0 es la cámara por defecto de tu laptop # 0 es la cámara por defecto de tu laptop
cap = cv2.VideoCapture(0) cap = cv2.VideoCapture(0)
if not cap.isOpened(): if not cap.isOpened():
print("[!] Error: No se pudo abrir la webcam local.") print("[!] Error: No se pudo abrir la webcam local.")
return return
while True: while True:
ret, frame = cap.read() ret, frame = cap.read()
if not ret: continue if not ret: continue
# Espejamos la imagen para que actúe como un espejo natural # Espejamos la imagen para que actúe como un espejo natural
frame = cv2.flip(frame, 1) frame = cv2.flip(frame, 1)
display_frame = frame.copy() display_frame = frame.copy()
# Usamos YuNet para garantizar que estamos capturando una cara válida # Usamos YuNet para garantizar que estamos capturando una cara válida
faces = detectar_rostros_yunet(frame) faces = detectar_rostros_yunet(frame)
mejor_rostro = None mejor_rostro = None
max_area = 0 max_area = 0
for (fx, fy, fw, fh, score) in faces: for (fx, fy, fw, fh, score) in faces:
area = fw * fh area = fw * fh
cv2.rectangle(display_frame, (fx, fy), (fx+fw, fy+fh), (0, 255, 0), 2) cv2.rectangle(display_frame, (fx, fy), (fx+fw, fy+fh), (0, 255, 0), 2)
if area > max_area: if area > max_area:
max_area = area max_area = area
h_frame, w_frame = frame.shape[:2] h_frame, w_frame = frame.shape[:2]
# Mismo margen del 30% que requiere MTCNN para alinear correctamente # Mismo margen del 30% que requiere MTCNN para alinear correctamente
m_x, m_y = int(fw * 0.30), int(fh * 0.30) m_x, m_y = int(fw * 0.30), int(fh * 0.30)
y1 = max(0, fy - m_y) y1 = max(0, fy - m_y)
y2 = min(h_frame, fy + fh + m_y) y2 = min(h_frame, fy + fh + m_y)
x1 = max(0, fx - m_x) x1 = max(0, fx - m_x)
x2 = min(w_frame, fx + fw + m_x) x2 = min(w_frame, fx + fw + m_x)
mejor_rostro = frame[y1:y2, x1:x2] mejor_rostro = frame[y1:y2, x1:x2]
cv2.putText(display_frame, "Alineate y presiona [R]", (10, 30), cv2.putText(display_frame, "Alineate y presiona [R]", (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2) cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
cv2.imshow("Registro Webcam Local", display_frame) cv2.imshow("Registro Webcam Local", display_frame)
key = cv2.waitKey(1) & 0xFF key = cv2.waitKey(1) & 0xFF
if key == ord('q'): if key == ord('q'):
break break
elif key == ord('r'): elif key == ord('r'):
if mejor_rostro is not None and mejor_rostro.size > 0: if mejor_rostro is not None and mejor_rostro.size > 0:
cv2.imshow("Captura Congelada", mejor_rostro) cv2.imshow("Captura Congelada", mejor_rostro)
cv2.waitKey(1) cv2.waitKey(1)
print("\n--- NUEVO REGISTRO ---") print("\n--- NUEVO REGISTRO ---")
nom = input("Escribe el nombre exacto de la persona: ").strip() nom = input("Escribe el nombre exacto de la persona: ").strip()
if nom: if nom:
gen_input = input("¿Es Hombre (h) o Mujer (m)?: ").strip().lower() gen_input = input("¿Es Hombre (h) o Mujer (m)?: ").strip().lower()
genero_guardado = "Woman" if gen_input == 'm' else "Man" genero_guardado = "Woman" if gen_input == 'm' else "Man"
# 1. Guardamos la foto pura # 1. Guardamos la foto pura
foto_path = os.path.join(DB_PATH, f"{nom}.jpg") foto_path = os.path.join(DB_PATH, f"{nom}.jpg")
cv2.imwrite(foto_path, mejor_rostro) cv2.imwrite(foto_path, mejor_rostro)
# 2. Actualizamos el caché de géneros sin usar IA # 2. Actualizamos el caché de géneros sin usar IA
ruta_generos = os.path.join(CACHE_PATH, "generos.json") ruta_generos = os.path.join(CACHE_PATH, "generos.json")
dic_generos = {} dic_generos = {}
if os.path.exists(ruta_generos): if os.path.exists(ruta_generos):
try: try:
with open(ruta_generos, 'r') as f: with open(ruta_generos, 'r') as f:
dic_generos = json.load(f) dic_generos = json.load(f)
except Exception: pass except Exception: pass
dic_generos[nom] = genero_guardado dic_generos[nom] = genero_guardado
with open(ruta_generos, 'w') as f: with open(ruta_generos, 'w') as f:
json.dump(dic_generos, f) json.dump(dic_generos, f)
print(f"\n[OK] Foto guardada. Generando punto de gravedad matemático...") print(f"\n[OK] Foto guardada. Generando punto de gravedad matemático...")
# 3. Forzamos la creación del vector en la base de datos # 3. Forzamos la creación del vector en la base de datos
gestionar_vectores(actualizar=True) gestionar_vectores(actualizar=True)
print(" Registro inyectado exitosamente en el sistema principal.") print(" Registro inyectado exitosamente en el sistema principal.")
else: else:
print("[!] Registro cancelado por nombre vacío.") print("[!] Registro cancelado por nombre vacío.")
cv2.destroyWindow("Captura Congelada") cv2.destroyWindow("Captura Congelada")
else: else:
print("[!] No se detectó ningún rostro claro. Acércate más a la luz.") print("[!] No se detectó ningún rostro claro. Acércate más a la luz.")
cap.release() cap.release()
cv2.destroyAllWindows() cv2.destroyAllWindows()
if __name__ == "__main__": if __name__ == "__main__":
registrar_desde_webcam() registrar_desde_webcam()

View File

@ -1,43 +1,43 @@
import cv2 import cv2
import time import time
from ultralytics import YOLO from ultralytics import YOLO
from seguimiento2 import GlobalMemory, CamManager, dibujar_track from seguimiento2 import GlobalMemory, CamManager, dibujar_track
def test_video(video_path): def test_video(video_path):
print(f"Iniciando Benchmark de Video: {video_path}") print(f"Iniciando Benchmark de Video: {video_path}")
model = YOLO("yolov8n.pt") model = YOLO("yolov8n.pt")
global_mem = GlobalMemory() global_mem = GlobalMemory()
manager = CamManager("TEST_CAM", global_mem) manager = CamManager("TEST_CAM", global_mem)
cap = cv2.VideoCapture(video_path) cap = cv2.VideoCapture(video_path)
cv2.namedWindow("Benchmark TT", cv2.WINDOW_AUTOSIZE) cv2.namedWindow("Benchmark TT", cv2.WINDOW_AUTOSIZE)
while cap.isOpened(): while cap.isOpened():
ret, frame = cap.read() ret, frame = cap.read()
if not ret: break if not ret: break
now = time.time() now = time.time()
frame_show = cv2.resize(frame, (480, 270)) frame_show = cv2.resize(frame, (480, 270))
# Inferencia frame por frame sin hilos (sincrónico) # Inferencia frame por frame sin hilos (sincrónico)
res = model.predict(frame_show, conf=0.40, iou=0.50, classes=[0], verbose=False, imgsz=480, device='cpu') res = model.predict(frame_show, conf=0.40, iou=0.50, classes=[0], verbose=False, imgsz=480, device='cpu')
boxes = res[0].boxes.xyxy.cpu().numpy().tolist() if res[0].boxes else [] boxes = res[0].boxes.xyxy.cpu().numpy().tolist() if res[0].boxes else []
tracks = manager.update(boxes, frame_show, now, turno_activo=True) tracks = manager.update(boxes, frame_show, now, turno_activo=True)
for trk in tracks: for trk in tracks:
if trk.time_since_update == 0: if trk.time_since_update == 0:
dibujar_track(frame_show, trk) dibujar_track(frame_show, trk)
cv2.imshow("Benchmark TT", frame_show) cv2.imshow("Benchmark TT", frame_show)
# Si presionas espacio se pausa, con 'q' sales # Si presionas espacio se pausa, con 'q' sales
key = cv2.waitKey(30) & 0xFF key = cv2.waitKey(30) & 0xFF
if key == ord('q'): break if key == ord('q'): break
elif key == ord(' '): cv2.waitKey(-1) elif key == ord(' '): cv2.waitKey(-1)
cap.release() cap.release()
cv2.destroyAllWindows() cv2.destroyAllWindows()
if __name__ == "__main__": if __name__ == "__main__":
test_video("video.mp4") # Pon aquí el nombre de tu video test_video("video.mp4") # Pon aquí el nombre de tu video

View File

@ -1,473 +1,465 @@
import os import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
os.environ['CUDA_VISIBLE_DEVICES'] = '-1' os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
import cv2 import cv2
import numpy as np import numpy as np
from deepface import DeepFace from deepface import DeepFace
import pickle import pickle
import time import time
import threading import threading
import asyncio import asyncio
import edge_tts import edge_tts
import subprocess import subprocess
from datetime import datetime from datetime import datetime
import warnings import warnings
import urllib.request import urllib.request
import torch
warnings.filterwarnings("ignore")
if torch.cuda.is_available():
device = "cuda" # ──────────────────────────────────────────────────────────────────────────────
print("GPU detectada → usando GPU 🚀") # CONFIGURACIÓN
else: # ──────────────────────────────────────────────────────────────────────────────
device = "cpu" DB_PATH = "db_institucion"
print("GPU no disponible → usando CPU ⚠️") CACHE_PATH = "cache_nombres"
VECTORS_FILE = "base_datos_rostros.pkl"
warnings.filterwarnings("ignore") TIMESTAMPS_FILE = "representaciones_timestamps.pkl"
UMBRAL_SIM = 0.42 # Por encima → identificado. Por debajo → desconocido.
# ────────────────────────────────────────────────────────────────────────────── COOLDOWN_TIME = 15 # Segundos entre saludos
# CONFIGURACIÓN
# ────────────────────────────────────────────────────────────────────────────── USUARIO, PASSWORD, IP_DVR = "admin", "TCA200503", "192.168.1.244"
DB_PATH = "db_institucion" RTSP_URL = f"rtsp://{USUARIO}:{PASSWORD}@{IP_DVR}:554/Streaming/Channels/702"
CACHE_PATH = "cache_nombres"
VECTORS_FILE = "base_datos_rostros.pkl" for path in [DB_PATH, CACHE_PATH]:
TIMESTAMPS_FILE = "representaciones_timestamps.pkl" os.makedirs(path, exist_ok=True)
UMBRAL_SIM = 0.42 # Por encima → identificado. Por debajo → desconocido.
COOLDOWN_TIME = 15 # Segundos entre saludos # ──────────────────────────────────────────────────────────────────────────────
# YUNET — Detector facial rápido en CPU
USUARIO, PASSWORD, IP_DVR = "admin", "TCA200503", "192.168.1.244" # ──────────────────────────────────────────────────────────────────────────────
RTSP_URL = f"rtsp://{USUARIO}:{PASSWORD}@{IP_DVR}:554/Streaming/Channels/702" YUNET_MODEL_PATH = "face_detection_yunet_2023mar.onnx"
for path in [DB_PATH, CACHE_PATH]: if not os.path.exists(YUNET_MODEL_PATH):
os.makedirs(path, exist_ok=True) print(f"Descargando YuNet ({YUNET_MODEL_PATH})...")
url = ("https://github.com/opencv/opencv_zoo/raw/main/models/"
# ────────────────────────────────────────────────────────────────────────────── "face_detection_yunet/face_detection_yunet_2023mar.onnx")
# YUNET — Detector facial rápido en CPU urllib.request.urlretrieve(url, YUNET_MODEL_PATH)
# ────────────────────────────────────────────────────────────────────────────── print("YuNet descargado.")
YUNET_MODEL_PATH = "face_detection_yunet_2023mar.onnx"
# Detector estricto para ROIs grandes (persona cerca)
if not os.path.exists(YUNET_MODEL_PATH): detector_yunet = cv2.FaceDetectorYN.create(
print(f"Descargando YuNet ({YUNET_MODEL_PATH})...") model=YUNET_MODEL_PATH, config="",
url = ("https://github.com/opencv/opencv_zoo/raw/main/models/" input_size=(320, 320),
"face_detection_yunet/face_detection_yunet_2023mar.onnx") score_threshold=0.70,
urllib.request.urlretrieve(url, YUNET_MODEL_PATH) nms_threshold=0.3,
print("YuNet descargado.") top_k=5000
)
# Detector estricto para ROIs grandes (persona cerca)
detector_yunet = cv2.FaceDetectorYN.create( # Detector permisivo para ROIs pequeños (persona lejos)
model=YUNET_MODEL_PATH, config="", detector_yunet_lejano = cv2.FaceDetectorYN.create(
input_size=(320, 320), model=YUNET_MODEL_PATH, config="",
score_threshold=0.70, input_size=(320, 320),
nms_threshold=0.3, score_threshold=0.45,
top_k=5000 nms_threshold=0.3,
) top_k=5000
)
# Detector permisivo para ROIs pequeños (persona lejos)
detector_yunet_lejano = cv2.FaceDetectorYN.create( def detectar_rostros_yunet(roi, lock=None):
model=YUNET_MODEL_PATH, config="", """
input_size=(320, 320), Elige automáticamente el detector según el tamaño del ROI.
score_threshold=0.45, """
nms_threshold=0.3, h_roi, w_roi = roi.shape[:2]
top_k=5000 area = w_roi * h_roi
) det = detector_yunet if area > 8000 else detector_yunet_lejano
def detectar_rostros_yunet(roi, lock=None): try:
""" if lock:
Elige automáticamente el detector según el tamaño del ROI. with lock:
""" det.setInputSize((w_roi, h_roi))
h_roi, w_roi = roi.shape[:2] _, faces = det.detect(roi)
area = w_roi * h_roi else:
det = detector_yunet if area > 8000 else detector_yunet_lejano det.setInputSize((w_roi, h_roi))
_, faces = det.detect(roi)
try: except Exception:
if lock: return []
with lock:
det.setInputSize((w_roi, h_roi)) if faces is None:
_, faces = det.detect(roi) return []
else:
det.setInputSize((w_roi, h_roi)) resultado = []
_, faces = det.detect(roi) for face in faces:
except Exception: try:
return [] fx, fy, fw, fh = map(int, face[:4])
score = float(face[14]) if len(face) > 14 else 1.0
if faces is None: resultado.append((fx, fy, fw, fh, score))
return [] except (ValueError, OverflowError, TypeError):
continue
resultado = [] return resultado
for face in faces:
try:
fx, fy, fw, fh = map(int, face[:4]) # ──────────────────────────────────────────────────────────────────────────────
score = float(face[14]) if len(face) > 14 else 1.0 # SISTEMA DE AUDIO
resultado.append((fx, fy, fw, fh, score)) # ──────────────────────────────────────────────────────────────────────────────
except (ValueError, OverflowError, TypeError): def obtener_audios_humanos(genero):
continue hora = datetime.now().hour
return resultado es_mujer = genero.lower() == 'woman'
suffix = "_m.mp3" if es_mujer else "_h.mp3"
if 5 <= hora < 12:
# ────────────────────────────────────────────────────────────────────────────── intro = "dias.mp3"
# SISTEMA DE AUDIO elif 12 <= hora < 19:
# ────────────────────────────────────────────────────────────────────────────── intro = "tarde.mp3"
def obtener_audios_humanos(genero): else:
hora = datetime.now().hour intro = "noches.mp3"
es_mujer = genero.lower() == 'woman' cierre = ("fin_noche" if (hora >= 19 or hora < 5) else "fin_dia") + suffix
suffix = "_m.mp3" if es_mujer else "_h.mp3" return intro, cierre
if 5 <= hora < 12:
intro = "dias.mp3"
elif 12 <= hora < 19: async def sintetizar_nombre(nombre, ruta):
intro = "tarde.mp3" nombre_limpio = nombre.replace('_', ' ')
else: try:
intro = "noches.mp3" comunicador = edge_tts.Communicate(nombre_limpio, "es-MX-DaliaNeural", rate="+10%")
cierre = ("fin_noche" if (hora >= 19 or hora < 5) else "fin_dia") + suffix await comunicador.save(ruta)
return intro, cierre except Exception:
pass
async def sintetizar_nombre(nombre, ruta):
nombre_limpio = nombre.replace('_', ' ') def reproducir(archivo):
try: if os.path.exists(archivo):
comunicador = edge_tts.Communicate(nombre_limpio, "es-MX-DaliaNeural", rate="+10%") subprocess.Popen(
await comunicador.save(ruta) ["mpv", "--no-video", "--volume=100", archivo],
except Exception: stdout=subprocess.DEVNULL,
pass stderr=subprocess.DEVNULL
)
def reproducir(archivo):
if os.path.exists(archivo): def hilo_bienvenida(nombre, genero):
subprocess.Popen( archivo_nombre = os.path.join(CACHE_PATH, f"nombre_{nombre}.mp3")
["mpv", "--no-video", "--volume=100", archivo],
stdout=subprocess.DEVNULL, if not os.path.exists(archivo_nombre):
stderr=subprocess.DEVNULL try:
) asyncio.run(sintetizar_nombre(nombre, archivo_nombre))
except Exception:
pass
def hilo_bienvenida(nombre, genero):
archivo_nombre = os.path.join(CACHE_PATH, f"nombre_{nombre}.mp3") intro, cierre = obtener_audios_humanos(genero)
if not os.path.exists(archivo_nombre): archivos = [f for f in [intro, archivo_nombre, cierre] if os.path.exists(f)]
try: if archivos:
asyncio.run(sintetizar_nombre(nombre, archivo_nombre)) subprocess.Popen(
except Exception: ["mpv", "--no-video", "--volume=100"] + archivos,
pass stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
intro, cierre = obtener_audios_humanos(genero) )
archivos = [f for f in [intro, archivo_nombre, cierre] if os.path.exists(f)]
if archivos: # ──────────────────────────────────────────────────────────────────────────────
subprocess.Popen( # GESTIÓN DE BASE DE DATOS (AHORA CON RETINAFACE Y ALINEACIÓN)
["mpv", "--no-video", "--volume=100"] + archivos, # ──────────────────────────────────────────────────────────────────────────────
stdout=subprocess.DEVNULL, def gestionar_vectores(actualizar=False):
stderr=subprocess.DEVNULL import json # ⚡ Asegúrate de tener importado json
)
vectores_actuales = {}
if os.path.exists(VECTORS_FILE):
# ────────────────────────────────────────────────────────────────────────────── try:
# GESTIÓN DE BASE DE DATOS (AHORA CON RETINAFACE Y ALINEACIÓN) with open(VECTORS_FILE, 'rb') as f:
# ────────────────────────────────────────────────────────────────────────────── vectores_actuales = pickle.load(f)
def gestionar_vectores(actualizar=False): except Exception:
import json # ⚡ Asegúrate de tener importado json vectores_actuales = {}
vectores_actuales = {} if not actualizar:
if os.path.exists(VECTORS_FILE): return vectores_actuales
try:
with open(VECTORS_FILE, 'rb') as f: timestamps = {}
vectores_actuales = pickle.load(f) if os.path.exists(TIMESTAMPS_FILE):
except Exception: try:
vectores_actuales = {} with open(TIMESTAMPS_FILE, 'rb') as f:
timestamps = pickle.load(f)
if not actualizar: except Exception:
return vectores_actuales timestamps = {}
timestamps = {} # ──────────────────────────────────────────────────────────
if os.path.exists(TIMESTAMPS_FILE): # CARGA DEL CACHÉ DE GÉNEROS
try: # ──────────────────────────────────────────────────────────
with open(TIMESTAMPS_FILE, 'rb') as f: ruta_generos = os.path.join(CACHE_PATH, "generos.json")
timestamps = pickle.load(f) dic_generos = {}
except Exception: if os.path.exists(ruta_generos):
timestamps = {} try:
with open(ruta_generos, 'r') as f:
# ────────────────────────────────────────────────────────── dic_generos = json.load(f)
# CARGA DEL CACHÉ DE GÉNEROS except Exception:
# ────────────────────────────────────────────────────────── pass
ruta_generos = os.path.join(CACHE_PATH, "generos.json")
dic_generos = {} print("\nACTUALIZANDO BASE DE DATOS (Alineación y Caché de Géneros)...")
if os.path.exists(ruta_generos): imagenes = [f for f in os.listdir(DB_PATH) if f.lower().endswith(('.jpg', '.png'))]
try: nombres_en_disco = set()
with open(ruta_generos, 'r') as f: hubo_cambios = False
dic_generos = json.load(f) cambio_generos = False # Bandera para saber si actualizamos el JSON
except Exception:
pass for archivo in imagenes:
nombre_archivo = os.path.splitext(archivo)[0]
print("\nACTUALIZANDO BASE DE DATOS (Alineación y Caché de Géneros)...") ruta_img = os.path.join(DB_PATH, archivo)
imagenes = [f for f in os.listdir(DB_PATH) if f.lower().endswith(('.jpg', '.png'))] nombres_en_disco.add(nombre_archivo)
nombres_en_disco = set()
hubo_cambios = False ts_actual = os.path.getmtime(ruta_img)
cambio_generos = False # Bandera para saber si actualizamos el JSON ts_guardado = timestamps.get(nombre_archivo, 0)
for archivo in imagenes: # Si ya tenemos el vector pero NO tenemos su género en el JSON, forzamos el procesamiento
nombre_archivo = os.path.splitext(archivo)[0] falta_genero = nombre_archivo not in dic_generos
ruta_img = os.path.join(DB_PATH, archivo)
nombres_en_disco.add(nombre_archivo) if nombre_archivo in vectores_actuales and ts_actual == ts_guardado and not falta_genero:
continue
ts_actual = os.path.getmtime(ruta_img)
ts_guardado = timestamps.get(nombre_archivo, 0) try:
img_db = cv2.imread(ruta_img)
# Si ya tenemos el vector pero NO tenemos su género en el JSON, forzamos el procesamiento lab = cv2.cvtColor(img_db, cv2.COLOR_BGR2LAB)
falta_genero = nombre_archivo not in dic_generos l, a, b = cv2.split(lab)
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
if nombre_archivo in vectores_actuales and ts_actual == ts_guardado and not falta_genero: l = clahe.apply(l)
continue img_mejorada = cv2.cvtColor(cv2.merge((l, a, b)), cv2.COLOR_LAB2BGR)
try: # IA DE GÉNERO (Solo se ejecuta 1 vez por persona en toda la vida del sistema)
img_db = cv2.imread(ruta_img) if falta_genero:
lab = cv2.cvtColor(img_db, cv2.COLOR_BGR2LAB) try:
l, a, b = cv2.split(lab) analisis = DeepFace.analyze(img_mejorada, actions=['gender'], enforce_detection=False)[0]
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8)) dic_generos[nombre_archivo] = analisis.get('dominant_gender', 'Man')
l = clahe.apply(l) except Exception:
img_mejorada = cv2.cvtColor(cv2.merge((l, a, b)), cv2.COLOR_LAB2BGR) dic_generos[nombre_archivo] = "Man" # Respaldo
cambio_generos = True
# IA DE GÉNERO (Solo se ejecuta 1 vez por persona en toda la vida del sistema)
if falta_genero: # Extraemos el vector
try: res = DeepFace.represent(
analisis = DeepFace.analyze(img_mejorada, actions=['gender'], enforce_detection=False)[0] img_path=img_mejorada,
dic_generos[nombre_archivo] = analisis.get('dominant_gender', 'Man') model_name="ArcFace",
except Exception: detector_backend="mtcnn",
dic_generos[nombre_archivo] = "Man" # Respaldo align=True,
cambio_generos = True enforce_detection=True
)
# Extraemos el vector emb = np.array(res[0]["embedding"], dtype=np.float32)
res = DeepFace.represent(
img_path=img_mejorada, norma = np.linalg.norm(emb)
model_name="ArcFace", if norma > 0:
detector_backend="opencv", emb = emb / norma
align=False,
enforce_detection=True vectores_actuales[nombre_archivo] = emb
) timestamps[nombre_archivo] = ts_actual
emb = np.array(res[0]["embedding"], dtype=np.float32) hubo_cambios = True
print(f" Procesado y alineado: {nombre_archivo} | Género: {dic_generos.get(nombre_archivo)}")
norma = np.linalg.norm(emb)
if norma > 0: except Exception as e:
emb = emb / norma print(f" Rostro no válido en '{archivo}', omitido. Error: {e}")
vectores_actuales[nombre_archivo] = emb # Limpieza de eliminados
timestamps[nombre_archivo] = ts_actual for nombre in list(vectores_actuales.keys()):
hubo_cambios = True if nombre not in nombres_en_disco:
print(f" Procesado y alineado: {nombre_archivo} | Género: {dic_generos.get(nombre_archivo)}") del vectores_actuales[nombre]
timestamps.pop(nombre, None)
except Exception as e: if nombre in dic_generos:
print(f" Rostro no válido en '{archivo}', omitido. Error: {e}") del dic_generos[nombre]
cambio_generos = True
# Limpieza de eliminados hubo_cambios = True
for nombre in list(vectores_actuales.keys()): print(f" Eliminado (sin foto): {nombre}")
if nombre not in nombres_en_disco:
del vectores_actuales[nombre] # Guardado de la memoria
timestamps.pop(nombre, None) if hubo_cambios:
if nombre in dic_generos: with open(VECTORS_FILE, 'wb') as f:
del dic_generos[nombre] pickle.dump(vectores_actuales, f)
cambio_generos = True with open(TIMESTAMPS_FILE, 'wb') as f:
hubo_cambios = True pickle.dump(timestamps, f)
print(f" Eliminado (sin foto): {nombre}")
# Guardado del JSON de géneros si hubo descubrimientos nuevos
# Guardado de la memoria if cambio_generos:
if hubo_cambios: with open(ruta_generos, 'w') as f:
with open(VECTORS_FILE, 'wb') as f: json.dump(dic_generos, f)
pickle.dump(vectores_actuales, f)
with open(TIMESTAMPS_FILE, 'wb') as f: if hubo_cambios or cambio_generos:
pickle.dump(timestamps, f) print(" Sincronización terminada.\n")
else:
# Guardado del JSON de géneros si hubo descubrimientos nuevos print(" Sin cambios. Base de datos al día.\n")
if cambio_generos:
with open(ruta_generos, 'w') as f: return vectores_actuales
json.dump(dic_generos, f)
# ──────────────────────────────────────────────────────────────────────────────
if hubo_cambios or cambio_generos: # BÚSQUEDA BLINDADA (Similitud Coseno estricta)
print(" Sincronización terminada.\n") # ──────────────────────────────────────────────────────────────────────────────
else: def buscar_mejor_match(emb_consulta, base_datos):
print(" Sin cambios. Base de datos al día.\n") # ⚡ MAGIA 3: Normalización L2 del vector entrante
norma = np.linalg.norm(emb_consulta)
return vectores_actuales if norma > 0:
emb_consulta = emb_consulta / norma
# ──────────────────────────────────────────────────────────────────────────────
# BÚSQUEDA BLINDADA (Similitud Coseno estricta) mejor_match, max_sim = None, -1.0
# ────────────────────────────────────────────────────────────────────────────── for nombre, vec in base_datos.items():
def buscar_mejor_match(emb_consulta, base_datos): # Como ambos están normalizados, esto es Similitud Coseno pura (-1.0 a 1.0)
# ⚡ MAGIA 3: Normalización L2 del vector entrante sim = float(np.dot(emb_consulta, vec))
norma = np.linalg.norm(emb_consulta) if sim > max_sim:
if norma > 0: max_sim = sim
emb_consulta = emb_consulta / norma mejor_match = nombre
mejor_match, max_sim = None, -1.0 return mejor_match, max_sim
for nombre, vec in base_datos.items():
# Como ambos están normalizados, esto es Similitud Coseno pura (-1.0 a 1.0) # ──────────────────────────────────────────────────────────────────────────────
sim = float(np.dot(emb_consulta, vec)) # LOOP DE PRUEBA Y REGISTRO (CON SIMETRÍA ESTRICTA)
if sim > max_sim: # ──────────────────────────────────────────────────────────────────────────────
max_sim = sim def sistema_interactivo():
mejor_match = nombre base_datos = gestionar_vectores(actualizar=False)
cap = cv2.VideoCapture(RTSP_URL)
return mejor_match, max_sim ultimo_saludo = 0
persona_actual = None
# ────────────────────────────────────────────────────────────────────────────── confirmaciones = 0
# LOOP DE PRUEBA Y REGISTRO (CON SIMETRÍA ESTRICTA)
# ────────────────────────────────────────────────────────────────────────────── print("\n" + "=" * 50)
def sistema_interactivo(): print(" MÓDULO DE REGISTRO Y DEPURACIÓN ESTRICTO")
base_datos = gestionar_vectores(actualizar=False) print(" [R] Registrar nuevo rostro | [Q] Salir")
cap = cv2.VideoCapture(RTSP_URL) print("=" * 50 + "\n")
ultimo_saludo = 0
persona_actual = None faces_ultimo_frame = []
confirmaciones = 0
while True:
print("\n" + "=" * 50) ret, frame = cap.read()
print(" MÓDULO DE REGISTRO Y DEPURACIÓN ESTRICTO") if not ret:
print(" [R] Registrar nuevo rostro | [Q] Salir") time.sleep(2)
print("=" * 50 + "\n") cap.open(RTSP_URL)
continue
faces_ultimo_frame = []
h, w = frame.shape[:2]
while True: display_frame = frame.copy()
ret, frame = cap.read() tiempo_actual = time.time()
if not ret:
time.sleep(2) faces_raw = detectar_rostros_yunet(frame)
cap.open(RTSP_URL) faces_ultimo_frame = faces_raw
continue
for (fx, fy, fw, fh, score_yunet) in faces_raw:
h, w = frame.shape[:2] fx = max(0, fx); fy = max(0, fy)
display_frame = frame.copy() fw = min(w - fx, fw); fh = min(h - fy, fh)
tiempo_actual = time.time() if fw <= 0 or fh <= 0:
continue
faces_raw = detectar_rostros_yunet(frame)
faces_ultimo_frame = faces_raw cv2.rectangle(display_frame, (fx, fy), (fx+fw, fy+fh), (255, 200, 0), 2)
cv2.putText(display_frame, f"YN:{score_yunet:.2f}",
for (fx, fy, fw, fh, score_yunet) in faces_raw: (fx, fy - 25), cv2.FONT_HERSHEY_SIMPLEX, 0.45, (255, 200, 0), 1)
fx = max(0, fx); fy = max(0, fy)
fw = min(w - fx, fw); fh = min(h - fy, fh) if (tiempo_actual - ultimo_saludo) <= COOLDOWN_TIME:
if fw <= 0 or fh <= 0: continue
continue
m = int(fw * 0.15)
cv2.rectangle(display_frame, (fx, fy), (fx+fw, fy+fh), (255, 200, 0), 2) roi = frame[max(0, fy-m): min(h, fy+fh+m),
cv2.putText(display_frame, f"YN:{score_yunet:.2f}", max(0, fx-m): min(w, fx+fw+m)]
(fx, fy - 25), cv2.FONT_HERSHEY_SIMPLEX, 0.45, (255, 200, 0), 1)
# 🛡️ FILTRO DE TAMAÑO FÍSICO
if (tiempo_actual - ultimo_saludo) <= COOLDOWN_TIME: if roi.size == 0 or roi.shape[0] < 40 or roi.shape[1] < 40:
continue cv2.putText(display_frame, "muy pequeno",
(fx, fy-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 100, 255), 1)
m = int(fw * 0.15) continue
roi = frame[max(0, fy-m): min(h, fy+fh+m),
max(0, fx-m): min(w, fx+fw+m)] # 🛡️ FILTRO DE NITIDEZ
gray_roi = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
# 🛡️ FILTRO DE TAMAÑO FÍSICO nitidez = cv2.Laplacian(gray_roi, cv2.CV_64F).var()
if roi.size == 0 or roi.shape[0] < 40 or roi.shape[1] < 40: if nitidez < 50.0:
cv2.putText(display_frame, "muy pequeno", cv2.putText(display_frame, f"blur({nitidez:.0f})",
(fx, fy-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 100, 255), 1) (fx, fy-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 165, 255), 1)
continue continue
# 🛡️ FILTRO DE NITIDEZ # 🌙 SIMETRÍA 1: VISIÓN NOCTURNA (CLAHE) AL VIDEO EN VIVO
gray_roi = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY) try:
nitidez = cv2.Laplacian(gray_roi, cv2.CV_64F).var() lab = cv2.cvtColor(roi, cv2.COLOR_BGR2LAB)
if nitidez < 50.0: l, a, b = cv2.split(lab)
cv2.putText(display_frame, f"blur({nitidez:.0f})", clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
(fx, fy-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 165, 255), 1) l = clahe.apply(l)
continue roi_mejorado = cv2.cvtColor(cv2.merge((l, a, b)), cv2.COLOR_LAB2BGR)
except Exception:
# 🌙 SIMETRÍA 1: VISIÓN NOCTURNA (CLAHE) AL VIDEO EN VIVO roi_mejorado = roi # Respaldo de seguridad
try:
lab = cv2.cvtColor(roi, cv2.COLOR_BGR2LAB) # 🧠 SIMETRÍA 2: MOTOR MTCNN Y ALINEACIÓN (Igual que la Base de Datos)
l, a, b = cv2.split(lab) try:
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8)) res = DeepFace.represent(
l = clahe.apply(l) img_path=roi_mejorado,
roi_mejorado = cv2.cvtColor(cv2.merge((l, a, b)), cv2.COLOR_LAB2BGR) model_name="ArcFace",
except Exception: detector_backend="mtcnn", # El mismo que en gestionar_vectores
roi_mejorado = roi # Respaldo de seguridad align=True, # Enderezamos la cara
enforce_detection=True # Si MTCNN no ve cara clara, aborta
# 🧠 SIMETRÍA 2: MOTOR MTCNN Y ALINEACIÓN (Igual que la Base de Datos) )
try: emb = np.array(res[0]["embedding"], dtype=np.float32)
res = DeepFace.represent( mejor_match, max_sim = buscar_mejor_match(emb, base_datos)
img_path=roi_mejorado,
model_name="ArcFace", except Exception:
detector_backend="mtcnn", # El mismo que en gestionar_vectores # MTCNN abortó porque la cara estaba de perfil, tapada o no era una cara
align=True, # Enderezamos la cara cv2.putText(display_frame, "MTCNN Ignorado",
enforce_detection=True # Si MTCNN no ve cara clara, aborta (fx, fy-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)
) continue
emb = np.array(res[0]["embedding"], dtype=np.float32)
mejor_match, max_sim = buscar_mejor_match(emb, base_datos) estado = " IDENTIFICADO" if max_sim > UMBRAL_SIM else "DESCONOCIDO"
nombre_d = mejor_match.split('_')[0] if mejor_match else "nadie"
except Exception: n_bloques = int(max_sim * 20)
# MTCNN abortó porque la cara estaba de perfil, tapada o no era una cara barra = "" * n_bloques + "" * (20 - n_bloques)
cv2.putText(display_frame, "MTCNN Ignorado", print(f"[REGISTRO] {estado} | {nombre_d:<14} | {barra} | "
(fx, fy-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1) f"{max_sim*100:.1f}% (umbral: {UMBRAL_SIM*100:.0f}%)")
continue
if max_sim > UMBRAL_SIM and mejor_match:
estado = " IDENTIFICADO" if max_sim > UMBRAL_SIM else "DESCONOCIDO" color = (0, 255, 0)
nombre_d = mejor_match.split('_')[0] if mejor_match else "nadie" texto = f"{mejor_match.split('_')[0]} ({max_sim:.2f})"
n_bloques = int(max_sim * 20)
barra = "" * n_bloques + "" * (20 - n_bloques) if mejor_match == persona_actual:
print(f"[REGISTRO] {estado} | {nombre_d:<14} | {barra} | " confirmaciones += 1
f"{max_sim*100:.1f}% (umbral: {UMBRAL_SIM*100:.0f}%)") else:
persona_actual, confirmaciones = mejor_match, 1
if max_sim > UMBRAL_SIM and mejor_match:
color = (0, 255, 0) if confirmaciones >= 1:
texto = f"{mejor_match.split('_')[0]} ({max_sim:.2f})" cv2.rectangle(display_frame, (fx, fy), (fx+fw, fy+fh), (0, 255, 0), 3)
try:
if mejor_match == persona_actual: analisis = DeepFace.analyze(
confirmaciones += 1 roi_mejorado, actions=['gender'], enforce_detection=False
else: )[0]
persona_actual, confirmaciones = mejor_match, 1 genero = analisis['dominant_gender']
except Exception:
if confirmaciones >= 1: genero = "Man"
cv2.rectangle(display_frame, (fx, fy), (fx+fw, fy+fh), (0, 255, 0), 3)
try: threading.Thread(
analisis = DeepFace.analyze( target=hilo_bienvenida,
roi_mejorado, actions=['gender'], enforce_detection=False args=(mejor_match, genero),
)[0] daemon=True
genero = analisis['dominant_gender'] ).start()
except Exception: ultimo_saludo = tiempo_actual
genero = "Man" confirmaciones = 0
threading.Thread( else:
target=hilo_bienvenida, color = (0, 0, 255)
args=(mejor_match, genero), texto = f"? ({max_sim:.2f})"
daemon=True confirmaciones = max(0, confirmaciones - 1)
).start()
ultimo_saludo = tiempo_actual cv2.putText(display_frame, texto,
confirmaciones = 0 (fx, fy - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
else: cv2.imshow("Módulo de Registro", display_frame)
color = (0, 0, 255) key = cv2.waitKey(1) & 0xFF
texto = f"? ({max_sim:.2f})"
confirmaciones = max(0, confirmaciones - 1) if key == ord('q'):
break
cv2.putText(display_frame, texto,
(fx, fy - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2) elif key == ord('r'):
if faces_ultimo_frame:
cv2.imshow("Módulo de Registro", display_frame) areas = [fw * fh for (fx, fy, fw, fh, _) in faces_ultimo_frame]
key = cv2.waitKey(1) & 0xFF fx, fy, fw, fh, _ = faces_ultimo_frame[np.argmax(areas)]
if key == ord('q'): m_x = int(fw * 0.30)
break m_y = int(fh * 0.30)
face_roi = frame[max(0, fy-m_y): min(h, fy+fh+m_y),
elif key == ord('r'): max(0, fx-m_x): min(w, fx+fw+m_x)]
if faces_ultimo_frame:
areas = [fw * fh for (fx, fy, fw, fh, _) in faces_ultimo_frame] if face_roi.size > 0:
fx, fy, fw, fh, _ = faces_ultimo_frame[np.argmax(areas)] nom = input("\nNombre de la persona: ").strip()
if nom:
m_x = int(fw * 0.30) foto_path = os.path.join(DB_PATH, f"{nom}.jpg")
m_y = int(fh * 0.30) cv2.imwrite(foto_path, face_roi)
face_roi = frame[max(0, fy-m_y): min(h, fy+fh+m_y), print(f"[OK] Rostro de '{nom}' guardado. Sincronizando...")
max(0, fx-m_x): min(w, fx+fw+m_x)] base_datos = gestionar_vectores(actualizar=True)
else:
if face_roi.size > 0: print("[!] Registro cancelado.")
nom = input("\nNombre de la persona: ").strip() else:
if nom: print("[!] Recorte vacío. Intenta de nuevo.")
foto_path = os.path.join(DB_PATH, f"{nom}.jpg") else:
cv2.imwrite(foto_path, face_roi) print("\n[!] No se detectó rostro. Acércate más o mira a la lente.")
print(f"[OK] Rostro de '{nom}' guardado. Sincronizando...")
base_datos = gestionar_vectores(actualizar=True) cap.release()
else: cv2.destroyAllWindows()
print("[!] Registro cancelado.")
else: if __name__ == "__main__":
print("[!] Recorte vacío. Intenta de nuevo.") sistema_interactivo()
else:
print("\n[!] No se detectó rostro. Acércate más o mira a la lente.")
cap.release()
cv2.destroyAllWindows()
if __name__ == "__main__":
sistema_interactivo()

View File

@ -1,96 +1,96 @@
absl-py==2.4.0 absl-py==2.4.0
aiohappyeyeballs==2.6.1 aiohappyeyeballs==2.6.1
aiohttp==3.13.3 aiohttp==3.13.3
aiosignal==1.4.0 aiosignal==1.4.0
astunparse==1.6.3 astunparse==1.6.3
attrs==25.4.0 attrs==25.4.0
beautifulsoup4==4.14.3 beautifulsoup4==4.14.3
blinker==1.9.0 blinker==1.9.0
certifi==2026.1.4 certifi==2026.1.4
charset-normalizer==3.4.4 charset-normalizer==3.4.4
click==8.3.1 click==8.3.1
contourpy==1.3.3 contourpy==1.3.3
cycler==0.12.1 cycler==0.12.1
deepface==0.0.98 deepface==0.0.98
edge-tts==7.2.7 edge-tts==7.2.7
filelock==3.20.0 filelock==3.20.0
fire==0.7.1 fire==0.7.1
Flask==3.1.2 Flask==3.1.2
flask-cors==6.0.2 flask-cors==6.0.2
flatbuffers==25.12.19 flatbuffers==25.12.19
fonttools==4.61.1 fonttools==4.61.1
frozenlist==1.8.0 frozenlist==1.8.0
fsspec==2025.12.0 fsspec==2025.12.0
gast==0.7.0 gast==0.7.0
gdown==5.2.1 gdown==5.2.1
google-pasta==0.2.0 google-pasta==0.2.0
grpcio==1.78.0 grpcio==1.78.0
gunicorn==25.0.3 gunicorn==25.0.3
h5py==3.15.1 h5py==3.15.1
idna==3.11 idna==3.11
itsdangerous==2.2.0 itsdangerous==2.2.0
Jinja2==3.1.6 Jinja2==3.1.6
joblib==1.5.3 joblib==1.5.3
keras==3.13.2 keras==3.13.2
kiwisolver==1.4.9 kiwisolver==1.4.9
lap==0.5.12 lap==0.5.12
libclang==18.1.1 libclang==18.1.1
lightdsa==0.0.3 lightdsa==0.0.3
lightecc==0.0.4 lightecc==0.0.4
lightphe==0.0.20 lightphe==0.0.20
lz4==4.4.5 lz4==4.4.5
Markdown==3.10.2 Markdown==3.10.2
markdown-it-py==4.0.0 markdown-it-py==4.0.0
MarkupSafe==2.1.5 MarkupSafe==2.1.5
matplotlib==3.10.8 matplotlib==3.10.8
mdurl==0.1.2 mdurl==0.1.2
ml_dtypes==0.5.4 ml_dtypes==0.5.4
mpmath==1.3.0 mpmath==1.3.0
mtcnn==1.0.0 mtcnn==1.0.0
multidict==6.7.1 multidict==6.7.1
namex==0.1.0 namex==0.1.0
networkx==3.6.1 networkx==3.6.1
numpy==1.26.4 numpy==1.26.4
onnxruntime==1.24.2 onnxruntime==1.24.2
opencv-python==4.11.0.86 opencv-python==4.11.0.86
opt_einsum==3.4.0 opt_einsum==3.4.0
optree==0.18.0 optree==0.18.0
packaging==26.0 packaging==26.0
pandas==3.0.0 pandas==3.0.0
pillow==12.0.0 pillow==12.0.0
polars==1.38.1 polars==1.38.1
polars-runtime-32==1.38.1 polars-runtime-32==1.38.1
propcache==0.4.1 propcache==0.4.1
protobuf==6.33.5 protobuf==6.33.5
psutil==7.2.2 psutil==7.2.2
Pygments==2.19.2 Pygments==2.19.2
pyparsing==3.3.2 pyparsing==3.3.2
PySocks==1.7.1 PySocks==1.7.1
python-dateutil==2.9.0.post0 python-dateutil==2.9.0.post0
python-dotenv==1.2.1 python-dotenv==1.2.1
PyYAML==6.0.3 PyYAML==6.0.3
requests==2.32.5 requests==2.32.5
retina-face==0.0.17 retina-face==0.0.17
rich==14.3.2 rich==14.3.2
scipy==1.17.0 scipy==1.17.0
six==1.17.0 six==1.17.0
soupsieve==2.8.3 soupsieve==2.8.3
sympy==1.14.0 sympy==1.14.0
tabulate==0.9.0 tabulate==0.9.0
tensorboard==2.20.0 tensorboard==2.20.0
tensorboard-data-server==0.7.2 tensorboard-data-server==0.7.2
tensorflow==2.20.0 tensorflow==2.20.0
tensorflow-io-gcs-filesystem==0.37.1 tensorflow-io-gcs-filesystem==0.37.1
termcolor==3.3.0 termcolor==3.3.0
tf_keras==2.20.1 tf_keras==2.20.1
torch==2.10.0+cpu torch==2.10.0+cpu
torchreid==0.2.5 torchreid==0.2.5
torchvision==0.25.0+cpu torchvision==0.25.0+cpu
tqdm==4.67.3 tqdm==4.67.3
typing_extensions==4.15.0 typing_extensions==4.15.0
ultralytics==8.4.14 ultralytics==8.4.14
ultralytics-thop==2.0.18 ultralytics-thop==2.0.18
urllib3==2.6.3 urllib3==2.6.3
Werkzeug==3.1.5 Werkzeug==3.1.5
wrapt==2.1.1 wrapt==2.1.1
yarl==1.22.0 yarl==1.22.0

Binary file not shown.

Before

Width:  |  Height:  |  Size: 357 KiB

File diff suppressed because it is too large Load Diff