Compare commits

..

No commits in common. "master" and "main" have entirely different histories.
master ... main

27 changed files with 2209 additions and 3583 deletions

352
.gitignore vendored
View File

@ -1,178 +1,174 @@
# ---> Python # ---> Python
# Byte-compiled / optimized / DLL files # Byte-compiled / optimized / DLL files
__pycache__/ __pycache__/
*.py[cod] *.py[cod]
*$py.class *$py.class
ven2/
.venv/ # C extensions
__pycache__/ *.so
*.pkl
# Distribution / packaging
# C extensions .Python
*.so build/
develop-eggs/
# Distribution / packaging dist/
.Python downloads/
build/ eggs/
develop-eggs/ .eggs/
dist/ lib/
downloads/ lib64/
eggs/ parts/
.eggs/ sdist/
lib/ var/
lib64/ wheels/
parts/ share/python-wheels/
sdist/ *.egg-info/
var/ .installed.cfg
wheels/ *.egg
share/python-wheels/ MANIFEST
*.egg-info/
.installed.cfg # PyInstaller
*.egg # Usually these files are written by a python script from a template
MANIFEST # before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
# PyInstaller *.spec
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it. # Installer logs
*.manifest pip-log.txt
*.spec pip-delete-this-directory.txt
# Installer logs # Unit test / coverage reports
pip-log.txt htmlcov/
pip-delete-this-directory.txt .tox/
.nox/
# Unit test / coverage reports .coverage
htmlcov/ .coverage.*
.tox/ .cache
.nox/ nosetests.xml
.coverage coverage.xml
.coverage.* *.cover
.cache *.py,cover
nosetests.xml .hypothesis/
coverage.xml .pytest_cache/
*.cover cover/
*.py,cover
.hypothesis/ # Translations
.pytest_cache/ *.mo
cover/ *.pot
# Translations # Django stuff:
*.mo *.log
*.pot local_settings.py
db.sqlite3
# Django stuff: db.sqlite3-journal
*.log
local_settings.py # Flask stuff:
db.sqlite3 instance/
db.sqlite3-journal .webassets-cache
# Flask stuff: # Scrapy stuff:
instance/ .scrapy
.webassets-cache
# Sphinx documentation
# Scrapy stuff: docs/_build/
.scrapy
# PyBuilder
# Sphinx documentation .pybuilder/
docs/_build/ target/
# PyBuilder # Jupyter Notebook
.pybuilder/ .ipynb_checkpoints
target/
# IPython
# Jupyter Notebook profile_default/
.ipynb_checkpoints ipython_config.py
# IPython # pyenv
profile_default/ # For a library or package, you might want to ignore these files since the code is
ipython_config.py # intended to run in multiple environments; otherwise, check them in:
# .python-version
# pyenv
# For a library or package, you might want to ignore these files since the code is # pipenv
# intended to run in multiple environments; otherwise, check them in: # According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# .python-version # However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# pipenv # install all needed dependencies.
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. #Pipfile.lock
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not # poetry
# install all needed dependencies. # Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
#Pipfile.lock # This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# poetry # https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. #poetry.lock
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries. # pdm
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control # Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#poetry.lock #pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# pdm # in version control.
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. # https://pdm.fming.dev/#use-with-ide
#pdm.lock .pdm.toml
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control. # PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
# https://pdm.fming.dev/#use-with-ide __pypackages__/
.pdm.toml
# Celery stuff
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm celerybeat-schedule
__pypackages__/ celerybeat.pid
# Celery stuff # SageMath parsed files
celerybeat-schedule *.sage.py
celerybeat.pid
# Environments
# SageMath parsed files .env
*.sage.py .venv
env/
# Environments venv/
.env ENV/
.venv env.bak/
env/ venv.bak/
venv/
ENV/ # Spyder project settings
env.bak/ .spyderproject
venv.bak/ .spyproject
# Spyder project settings # Rope project settings
.spyderproject .ropeproject
.spyproject
# mkdocs documentation
# Rope project settings /site
.ropeproject
# mypy
# mkdocs documentation .mypy_cache/
/site .dmypy.json
dmypy.json
# mypy
.mypy_cache/ # Pyre type checker
.dmypy.json .pyre/
dmypy.json
# pytype static type analyzer
# Pyre type checker .pytype/
.pyre/
# Cython debug symbols
# pytype static type analyzer cython_debug/
.pytype/
# PyCharm
# Cython debug symbols # JetBrains specific template is maintained in a separate JetBrains.gitignore that can
cython_debug/ # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# PyCharm # option (not recommended) you can uncomment the following to ignore the entire idea folder.
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can #.idea/
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder. # ────────────────────────────────────────────────────────
#.idea/ # ENTORNO VIRTUAL DEL PROYECTO
# ────────────────────────────────────────────────────────
ia_env/
# ────────────────────────────────────────────────────────
# ENTORNO VIRTUAL DEL PROYECTO # ────────────────────────────────────────────────────────
# ──────────────────────────────────────────────────────── # MODELOS DE IA (Límite de GitHub: 100 MB)
ia_env/ # ────────────────────────────────────────────────────────
# ──────────────────────────────────────────────────────── *.pt
# MODELOS DE IA (Límite de GitHub: 100 MB) *.onnx
# ────────────────────────────────────────────────────────
*.pt
*.onnx

122
README.md
View File

@ -1,62 +1,62 @@
# Sistema de Identificación y Seguimiento Inteligente # Sistema de Identificación y Seguimiento Inteligente
Este repositorio contiene la arquitectura modular para el seguimiento de personas en múltiples cámaras (Re-ID) y reconocimiento facial asíncrono. Este repositorio contiene la arquitectura modular para el seguimiento de personas en múltiples cámaras (Re-ID) y reconocimiento facial asíncrono.
## Arquitectura del Proyecto ## Arquitectura del Proyecto
El sistema está dividido en tres módulos principales para garantizar la separación de responsabilidades: El sistema está dividido en tres módulos principales para garantizar la separación de responsabilidades:
* `seguimiento2.py`: Motor matemático de Tracking (Kalman + YOLO) y Re-Identificación (OSNet). * `seguimiento2.py`: Motor matemático de Tracking (Kalman + YOLO) y Re-Identificación (OSNet).
* `reconocimiento2.py`: Motor de biometría facial (YuNet + ArcFace) y síntesis de audio (Edge-TTS). * `reconocimiento2.py`: Motor de biometría facial (YuNet + ArcFace) y síntesis de audio (Edge-TTS).
* `main_fusion.py`: Orquestador principal que fusiona ambos motores mediante procesamiento multihilo. * `main_fusion.py`: Orquestador principal que fusiona ambos motores mediante procesamiento multihilo.
## Requisitos Previos ## Requisitos Previos
1. **Python 3.8 - 3.11** instalado en el sistema. 1. **Python 3.8 - 3.11** instalado en el sistema.
2. **Reproductor MPV** instalado y agregado al PATH del sistema (requerido para el motor de audio sin bloqueos). 2. **Reproductor MPV** instalado y agregado al PATH del sistema (requerido para el motor de audio sin bloqueos).
* *Windows:* Descargar de la página oficial o usar `scoop install mpv`. * *Windows:* Descargar de la página oficial o usar `scoop install mpv`.
* *Linux:* `sudo apt install mpv` * *Linux:* `sudo apt install mpv`
* *Mac:* `brew install mpv` * *Mac:* `brew install mpv`
## Guía de Instalación Rápida ## Guía de Instalación Rápida
**1. Clonar el repositorio** **1. Clonar el repositorio**
Abre tu terminal y clona este proyecto: Abre tu terminal y clona este proyecto:
```bash ```bash
git clone <URL_DE_TU_REPOSITORIO_GITEA> git clone <URL_DE_TU_REPOSITORIO_GITEA>
cd IdentificacionIA´´´ cd IdentificacionIA´´´
**2. Crear un Entorno Virtual (¡Importante!) **2. Crear un Entorno Virtual (¡Importante!)
Para evitar conflictos de librerías, crea un entorno virtual limpio dentro de la carpeta del proyecto: Para evitar conflictos de librerías, crea un entorno virtual limpio dentro de la carpeta del proyecto:
python -m venv venv python -m venv venv
3. Activar el Entorno Virtual 3. Activar el Entorno Virtual
En Windows: En Windows:
.\venv\Scripts\activate .\venv\Scripts\activate
En Mac/Linux: En Mac/Linux:
source venv/bin/activate source venv/bin/activate
(Sabrás que está activo si ves un (venv) al inicio de tu línea de comandos). (Sabrás que está activo si ves un (venv) al inicio de tu línea de comandos).
4. Instalar Dependencias 4. Instalar Dependencias
Con el entorno activado, instala todas las librerías necesarias: Con el entorno activado, instala todas las librerías necesarias:
pip install -r requirements.txt pip install -r requirements.txt
## Archivos y Carpetas Necesarias ## Archivos y Carpetas Necesarias
yolov8n.pt (Detector de personas) yolov8n.pt (Detector de personas)
osnet_x0_25_msmt17.onnx (Extractor de características de ropa) osnet_x0_25_msmt17.onnx (Extractor de características de ropa)
face_detection_yunet_2023mar.onnx (Detector facial rápido) face_detection_yunet_2023mar.onnx (Detector facial rápido)
Además, debes tener la carpeta db_institucion con las fotografías de los rostros a reconocer. Además, debes tener la carpeta db_institucion con las fotografías de los rostros a reconocer.
## Ejecución ## Ejecución
Para arrancar el sistema completo con interfaz gráfica y audio, ejecuta: Para arrancar el sistema completo con interfaz gráfica y audio, ejecuta:
python main_fusion.py python main_fusion.py

Binary file not shown.

BIN
bus.jpg

Binary file not shown.

Before

Width:  |  Height:  |  Size: 134 KiB

View File

@ -1 +1 @@
{"Emanuel Flores": "Man", "Vikicar Aldana": "Woman", "Rodrigo Cahuantzi C": "Man", "Cristian Hernandez Suarez": "Man", "Omar": "Man", "Oscar Atriano Ponce_1": "Man", "Miguel Angel": "Man", "Carlos Eduardo Cuamatzi": "Man", "Rosa maria": "Woman", "Ximena": "Woman", "Ana Karen Guerrero": "Woman", "Yuriel": "Man", "Diana Laura Tecpa": "Woman", "aridai montiel zistecatl": "Woman", "Aridai montiel": "Woman", "Vikicar": "Woman", "Ian Axel": "Man", "Rafael": "Man", "Rubisela Barrientos": "Woman", "ian axel": "Man", "Adriana Lopez": "Woman", "Oscar Atriano Ponce": "Man", "Xayli Ximena": "Woman", "Victor Manuel Ocampo Mendez": "Man", "Victor": "Man"} {"Emanuel Flores": "Man", "Vikicar Aldana": "Woman", "Rodrigo Cahuantzi C": "Man", "Cristian Hernandez Suarez": "Man", "Omar": "Man", "Oscar Atriano Ponce_1": "Man", "Miguel Angel": "Man", "Carlos Eduardo Cuamatzi": "Man", "Rosa maria": "Woman", "Ximena": "Woman", "Ana Karen Guerrero": "Woman", "Yuriel": "Man", "Diana Laura": "Woman", "Diana Laura Tecpa": "Woman", "aridai montiel zistecatl": "Woman", "Aridai montiel": "Woman", "Vikicar": "Woman", "Ian Axel": "Man", "Rafael": "Man", "Rubisela Barrientos": "Woman", "ian axel": "Man", "Adriana Lopez": "Woman", "Oscar Atriano Ponce": "Man", "Xayli Ximena": "Woman", "Victor Manuel Ocampo Mendez": "Man", "Victor": "Man"}

Binary file not shown.

View File

@ -1,26 +0,0 @@
# Base estable
pip install numpy==1.26.4
# OpenCV compatible con numpy 1.x
pip install opencv-python==4.8.1.78
pip install torch==2.1.2 torchvision==0.16.2 torchaudio==2.1.2 --index-url https://download.pytorch.org/whl/cu118
pip install ultralytics --no-deps
pip install opencv-python==4.8.1.78 matplotlib pyyaml scipy requests pillow
pip install tensorflow==2.21
pip install tf-keras
pip install deepface
pip install onnxruntime
pip install edge-tts
pip install numpy pandas
sudo apt install libxcb-xinerama0
sudo apt install fonts-dejavu
QT_DEBUG_PLUGINS=0 python fusion.py
pip cache purge
python -c "import torch; print(torch.cuda.is_available())"
python -c "import torch; print(torch.cuda.get_device_name(0))"

View File

@ -1,13 +0,0 @@
[ESP32]
ip = 192.168.15.128
puerto = 81
[Audio]
duracion_ms = 2000
tono_base = 440
amplitud = 16000
[General]
timeout = 5
reconectar = true

View File

@ -1,78 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script para configurar la bocina ESP32
Permite cambiar IP, puerto, duración, etc.
"""
import sys
import os
# Agregar el proyecto al path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from core.speaker_iot import configurar_ip, mostrar_configuracion
from core.speaker_iot.config import config
def main():
print("\n" + "=" * 50)
print(" 🎵 CONFIGURACIÓN DE BOCINA IoT")
print("=" * 50)
mostrar_configuracion()
print("\n" + "-" * 50)
print("¿Qué deseas configurar?")
print(" 1. Cambiar IP")
print(" 2. Cambiar puerto")
print(" 3. Cambiar duración del audio")
print(" 4. Ver configuración actual")
print(" 5. Restaurar valores por defecto")
print(" 6. Salir")
opcion = input("\n👉 Opción (1-6): ").strip()
if opcion == "1":
nueva_ip = input("📡 Nueva IP: ").strip()
if nueva_ip:
config.actualizar_ip(nueva_ip)
print(f"✅ IP actualizada a: {nueva_ip}")
elif opcion == "2":
nuevo_puerto = input("🔌 Nuevo puerto [81]: ").strip()
if nuevo_puerto:
config.config.set("ESP32", "puerto", nuevo_puerto)
with open(config.CONFIG_FILE, 'w') as f:
config.config.write(f)
print(f"✅ Puerto actualizado a: {nuevo_puerto}")
elif opcion == "3":
nueva_duracion = input("⏱️ Nueva duración en ms [2000]: ").strip()
if nueva_duracion:
config.config.set("Audio", "duracion_ms", nueva_duracion)
with open(config.CONFIG_FILE, 'w') as f:
config.config.write(f)
print(f"✅ Duración actualizada a: {nueva_duracion}ms")
elif opcion == "4":
mostrar_configuracion()
elif opcion == "5":
confirmar = input("⚠️ ¿Restaurar configuración por defecto? (s/n): ").strip().lower()
if confirmar == 's':
config._crear_configuracion_default()
print("✅ Configuración restaurada")
mostrar_configuracion()
elif opcion == "6":
print("\n👋 Hasta luego!")
return
else:
print("❌ Opción inválida")
print("\n✅ Configuración guardada!")
input("\nPresiona Enter para salir...")
if __name__ == "__main__":
main()

View File

@ -1,21 +0,0 @@
"""
Speaker IoT - Módulo para controlar bocina ESP32
"""
from .bocina_core import (
BocinaCore,
saludar,
detener,
obtener_estado,
configurar_ip,
mostrar_configuracion
)
__all__ = [
'BocinaCore',
'saludar',
'detener',
'obtener_estado',
'configurar_ip',
'mostrar_configuracion'
]

View File

@ -1,224 +0,0 @@
"""
BOCINA CORE - Módulo para controlar la bocina ESP32
"""
import asyncio
import websockets
import json
import struct
import math
from typing import Optional, Dict, Any
from .config import config
# ==================== CONSTANTES ====================
CHUNK_SIZE = 1024
SAMPLE_RATE = 16000
# ==================== CLASE PRINCIPAL ====================
class BocinaCore:
"""Clase principal para controlar la bocina ESP32"""
def __init__(self, ip: str = None, puerto: int = None):
"""
Inicializa el controlador de la bocina
Args:
ip: IP del ESP32 (si es None, usa la del archivo de configuración)
puerto: Puerto WebSocket (si es None, usa el del archivo)
"""
self.ip = ip or config.obtener_ip()
self.puerto = puerto or config.obtener_puerto()
self.url = f"ws://{self.ip}:{self.puerto}"
self.duracion_ms = config.obtener_duracion()
self.tono_base = config.obtener_tono_base()
self.amplitud = config.obtener_amplitud()
self.timeout = config.obtener_timeout()
self._websocket = None
self._conectado = False
# ==================== MÉTODOS PÚBLICOS ====================
def saludar(self, nombre: str, duracion_ms: int = None, tono_personalizado: bool = True) -> bool:
"""
Envía un saludo a la bocina
Args:
nombre: Nombre de la persona
duracion_ms: Duración del saludo (None = usa config)
tono_personalizado: Si True, varía el tono según el nombre
"""
duracion = duracion_ms or self.duracion_ms
return self._ejecutar_async(self._saludar_async(nombre, duracion, tono_personalizado))
def detener(self) -> bool:
"""Detiene la reproducción actual"""
return self._ejecutar_async(self._detener_async())
def estado(self) -> Dict[str, Any]:
"""Obtiene el estado actual de la bocina"""
return self._ejecutar_async(self._estado_async())
def ping(self) -> bool:
"""Prueba la conexión con la bocina"""
return self._ejecutar_async(self._ping_async())
def conectar(self) -> bool:
"""Establece conexión manual con la bocina"""
return self._ejecutar_async(self._conectar_async())
def desconectar(self) -> bool:
"""Cierra la conexión con la bocina"""
return self._ejecutar_async(self._desconectar_async())
# ==================== MÉTODOS INTERNOS ====================
def _ejecutar_async(self, corutina):
"""Ejecuta una función asíncrona desde código síncrono"""
try:
loop = asyncio.get_event_loop()
if loop.is_running():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
resultado = loop.run_until_complete(corutina)
loop.close()
return resultado
else:
return loop.run_until_complete(corutina)
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
resultado = loop.run_until_complete(corutina)
loop.close()
return resultado
async def _conectar_async(self) -> bool:
"""Conexión asíncrona"""
try:
self._websocket = await websockets.connect(self.url, timeout=self.timeout)
await self._websocket.recv()
self._conectado = True
return True
except Exception:
self._conectado = False
return False
async def _desconectar_async(self) -> bool:
"""Desconexión asíncrona"""
if self._websocket:
await self._websocket.close()
self._websocket = None
self._conectado = False
return True
async def _asegurar_conexion(self) -> bool:
"""Asegura que haya una conexión activa"""
if not self._conectado or not self._websocket:
return await self._conectar_async()
return True
async def _saludar_async(self, nombre: str, duracion_ms: int, tono_personalizado: bool) -> bool:
"""Enviar saludo asíncrono"""
if not await self._asegurar_conexion():
return False
try:
audio = self._generar_audio(nombre, duracion_ms, tono_personalizado)
for i in range(0, len(audio), CHUNK_SIZE):
chunk = audio[i:i + CHUNK_SIZE]
await self._websocket.send(chunk)
await asyncio.sleep(0.005)
return True
except Exception:
return False
async def _detener_async(self) -> bool:
"""Detener reproducción asíncrono"""
if not await self._asegurar_conexion():
return False
try:
await self._websocket.send(json.dumps({"cmd": "STOP"}))
await asyncio.wait_for(self._websocket.recv(), timeout=self.timeout)
return True
except Exception:
return False
async def _estado_async(self) -> Dict[str, Any]:
"""Obtener estado asíncrono"""
if not await self._asegurar_conexion():
return {}
try:
await self._websocket.send(json.dumps({"cmd": "STATUS"}))
respuesta = await asyncio.wait_for(self._websocket.recv(), timeout=self.timeout)
return json.loads(respuesta)
except Exception:
return {}
async def _ping_async(self) -> bool:
"""Ping asíncrono"""
if not await self._asegurar_conexion():
return False
try:
await self._websocket.send(json.dumps({"cmd": "PING"}))
respuesta = await asyncio.wait_for(self._websocket.recv(), timeout=self.timeout)
data = json.loads(respuesta)
return data.get("status") == "ok"
except Exception:
return False
def _generar_audio(self, nombre: str, duracion_ms: int, tono_personalizado: bool) -> bytes:
"""Genera audio PCM para el saludo"""
num_muestras = int(SAMPLE_RATE * duracion_ms / 1000)
if tono_personalizado:
frecuencia = self.tono_base + (len(nombre) * 10)
if frecuencia > 800:
frecuencia = 800
else:
frecuencia = self.tono_base
audio = bytearray()
for i in range(num_muestras):
valor = int(self.amplitud * math.sin(2 * math.pi * frecuencia * i / SAMPLE_RATE))
audio.extend(struct.pack('<h', valor))
return bytes(audio)
# ==================== FUNCIONES SIMPLES (API RÁPIDA) ====================
def saludar(nombre: str, ip: str = None) -> bool:
"""Función rápida para saludar a una persona"""
bocina = BocinaCore(ip)
return bocina.saludar(nombre)
def detener(ip: str = None) -> bool:
"""Detiene la reproducción"""
bocina = BocinaCore(ip)
return bocina.detener()
def obtener_estado(ip: str = None) -> dict:
"""Obtiene el estado de la bocina"""
bocina = BocinaCore(ip)
return bocina.estado()
def configurar_ip(nueva_ip: str):
"""Actualiza la IP en el archivo de configuración"""
from .config import config
config.actualizar_ip(nueva_ip)
def mostrar_configuracion():
"""Muestra la configuración actual"""
from .config import config
config.mostrar_configuracion()

View File

@ -1,108 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Módulo de configuración para la bocina ESP32
"""
import os
import configparser
from pathlib import Path
# ==================== RUTAS ====================
BASE_DIR = Path(__file__).resolve().parent.parent.parent
CONFIG_DIR = BASE_DIR / "config" / "speaker_iot"
CONFIG_FILE = CONFIG_DIR / "settings.ini"
# ==================== CONFIGURACIÓN POR DEFECTO ====================
DEFAULT_CONFIG = {
"ESP32": {
"ip": "192.168.15.128",
"puerto": "81"
},
"Audio": {
"duracion_ms": "2000",
"tono_base": "440",
"amplitud": "16000"
},
"General": {
"timeout": "5",
"reconectar": "true"
}
}
class ConfiguracionBocina:
"""Gestor de configuración para la bocina"""
def __init__(self):
self.config = configparser.ConfigParser()
self._cargar_configuracion()
def _cargar_configuracion(self):
"""Carga la configuración desde el archivo o crea uno por defecto"""
if CONFIG_FILE.exists():
self.config.read(CONFIG_FILE, encoding='utf-8')
else:
self._crear_configuracion_default()
def _crear_configuracion_default(self):
"""Crea archivo de configuración por defecto"""
# Crear directorio si no existe
CONFIG_DIR.mkdir(parents=True, exist_ok=True)
# Cargar valores por defecto
for seccion, valores in DEFAULT_CONFIG.items():
self.config[seccion] = valores
# Guardar archivo
with open(CONFIG_FILE, 'w', encoding='utf-8') as f:
self.config.write(f)
print(f"📝 Configuración creada en: {CONFIG_FILE}")
def obtener_ip(self) -> str:
"""Obtiene la IP de la bocina"""
return self.config.get("ESP32", "ip", fallback="192.168.15.128")
def obtener_puerto(self) -> int:
"""Obtiene el puerto de la bocina"""
return self.config.getint("ESP32", "puerto", fallback=81)
def obtener_duracion(self) -> int:
"""Obtiene duración del audio en ms"""
return self.config.getint("Audio", "duracion_ms", fallback=2000)
def obtener_tono_base(self) -> int:
"""Obtiene frecuencia base del tono"""
return self.config.getint("Audio", "tono_base", fallback=440)
def obtener_amplitud(self) -> int:
"""Obtiene amplitud del audio"""
return self.config.getint("Audio", "amplitud", fallback=16000)
def obtener_timeout(self) -> int:
"""Obtiene timeout en segundos"""
return self.config.getint("General", "timeout", fallback=5)
def reconectar_auto(self) -> bool:
"""Obtiene si debe reconectar automáticamente"""
return self.config.getboolean("General", "reconectar", fallback=True)
def actualizar_ip(self, nueva_ip: str):
"""Actualiza la IP de la bocina"""
self.config.set("ESP32", "ip", nueva_ip)
with open(CONFIG_FILE, 'w', encoding='utf-8') as f:
self.config.write(f)
print(f"✅ IP actualizada a: {nueva_ip}")
def mostrar_configuracion(self):
"""Muestra la configuración actual"""
print("\n📡 Configuración actual:")
print(f" IP: {self.obtener_ip()}:{self.obtener_puerto()}")
print(f" Duración: {self.obtener_duracion()}ms")
print(f" Tono base: {self.obtener_tono_base()}Hz")
print(f" Timeout: {self.obtener_timeout()}s")
# Instancia global
config = ConfiguracionBocina()

View File

@ -1,60 +0,0 @@
# Solo importas lo que necesitas
from core.speaker_iot import saludar, detener, obtener_estado
# ===== EJEMPLO 1: Cuando detectas una persona =====
def mi_detector():
nombre = "Ana" # Tu IA obtiene el nombre
# Enviar saludo (¡una sola línea!)
saludar(nombre)
# También puedes verificar si funcionó
if saludar(nombre):
print(f"✅ Saludo enviado a {nombre}")
else:
print(f"❌ Error al enviar saludo a {nombre}")
# ===== EJEMPLO 2: Dentro de tu loop principal =====
while True:
persona = detectar_persona() # Tu función de detección
if persona:
nombre = obtener_nombre(persona) # Tu base de datos
saludar(nombre) # Envía el saludo
# ===== EJEMPLO 3: Clase completa =====
class MiSistemaIA:
def __init__(self):
self.bocina_ip = "192.168.15.128" # O usa la del config
def on_persona_detectada(self, persona):
nombre = self.obtener_nombre(persona)
if nombre:
print(f"🎉 Detectada: {nombre}")
saludar(nombre) # ¡Así de simple!
def obtener_nombre(self, persona):
# Tu lógica para obtener nombre
return persona.get("nombre", "Visitante")
# ===== Ejemplo completo de integración =====
class SistemaSeguridad:
def __init__(self):
self.personas_conocidas = ["Ana", "Carlos", "Maria"]
print("✅ Sistema iniciado - Bocina lista")
def detectar(self, nombre):
if nombre in self.personas_conocidas:
print(f"🔔 ¡Bienvenido {nombre}!")
saludar(nombre) # Envía saludo
return True
else:
print(f"⚠️ Persona no registrada: {nombre}")
return False
# Uso
sistema = SistemaSeguridad()
sistema.detectar("Ana") # Reproduce sonido
sistema.detectar("Luis") # No reproduce

View File

@ -1,409 +0,0 @@
"""
BOCINA INTELIGENTE - CLIENTE PYTHON
====================================
Cliente para enviar audio y comandos a la bocina ESP32 via WebSocket
Uso:
python bocina_client.py
(Luego ingresa la IP y el nombre cuando se solicite)
"""
import asyncio
import websockets
import json
import struct
import math
import sys
import time
import os
from typing import Optional
# ==================== LIMPIAR PANTALLA ====================
def limpiar_pantalla():
"""Limpia la consola según el sistema operativo"""
os.system('cls' if os.name == 'nt' else 'clear')
# ==================== CLASE BOCINA ====================
class BocinaInteligente:
"""Cliente para controlar la bocina inteligente ESP32"""
def __init__(self, ip: str, puerto: int = 81):
self.ip = ip
self.puerto = puerto
self.url = f"ws://{ip}:{puerto}"
self.websocket: Optional[websockets.WebSocketClientProtocol] = None
self.chunk_size = 1024
self.timeout = 5
self.conectado = False
async def conectar(self) -> bool:
"""Conectar al ESP32"""
try:
print(f"🔌 Conectando a {self.url}...")
self.websocket = await websockets.connect(self.url)
# Esperar mensaje de bienvenida
response = await asyncio.wait_for(self.websocket.recv(), timeout=self.timeout)
data = json.loads(response)
if data.get("status") == "ok":
print(f"{data.get('msg')}")
self.conectado = True
return True
except Exception as e:
print(f" ❌ Error: {e}")
return False
return False
async def desconectar(self):
"""Cerrar conexión"""
if self.websocket:
await self.websocket.close()
self.conectado = False
print("🔌 Conexión cerrada")
async def ping(self) -> bool:
"""Probar conexión con el ESP32"""
if not self.websocket:
return False
try:
await self.websocket.send(json.dumps({"cmd": "PING"}))
response = await asyncio.wait_for(self.websocket.recv(), timeout=self.timeout)
data = json.loads(response)
return data.get("status") == "ok"
except:
return False
async def obtener_estado(self) -> dict:
"""Obtener estadísticas del ESP32"""
if not self.websocket:
return {}
try:
await self.websocket.send(json.dumps({"cmd": "STATUS"}))
response = await asyncio.wait_for(self.websocket.recv(), timeout=self.timeout)
return json.loads(response)
except:
return {}
async def detener(self) -> bool:
"""Detener reproducción"""
if not self.websocket:
return False
try:
await self.websocket.send(json.dumps({"cmd": "STOP"}))
response = await asyncio.wait_for(self.websocket.recv(), timeout=self.timeout)
data = json.loads(response)
return data.get("status") == "ok"
except:
return False
async def enviar_audio(self, audio_data: bytes, nombre: str = "audio") -> bool:
"""
Enviar audio al ESP32
Args:
audio_data: Datos de audio en formato PCM (16kHz, 16bits, mono)
nombre: Nombre identificador (para logs)
"""
if not self.websocket:
print("❌ No hay conexión")
return False
total_chunks = (len(audio_data) + self.chunk_size - 1) // self.chunk_size
print(f"📤 Enviando {total_chunks} chunks ({len(audio_data)} bytes) para '{nombre}'")
inicio = time.time()
for i, chunk_start in enumerate(range(0, len(audio_data), self.chunk_size)):
chunk = audio_data[chunk_start:chunk_start + self.chunk_size]
await self.websocket.send(chunk)
# Mostrar progreso cada 10 chunks o al final
if (i + 1) % 10 == 0 or i == total_chunks - 1:
porcentaje = ((i + 1) * 100) // total_chunks
print(f" 📊 Progreso: {porcentaje}% ({i+1}/{total_chunks} chunks)")
# Pequeña pausa para no saturar
await asyncio.sleep(0.005)
elapsed = time.time() - inicio
print(f"✅ Audio enviado en {elapsed:.2f} segundos")
return True
# ==================== GENERADORES DE AUDIO ====================
def generar_tono(frecuencia: int = 440, duracion_ms: int = 2000,
sample_rate: int = 16000, amplitud: int = 16000) -> bytes:
"""
Generar un tono seno en formato PCM
Args:
frecuencia: Frecuencia del tono en Hz
duracion_ms: Duración en milisegundos
sample_rate: Frecuencia de muestreo
amplitud: Amplitud máxima (0-32767)
"""
num_muestras = int(sample_rate * duracion_ms / 1000)
audio = bytearray()
for i in range(num_muestras):
valor = int(amplitud * math.sin(2 * math.pi * frecuencia * i / sample_rate))
audio.extend(struct.pack('<h', valor)) # little-endian, 16 bits
return bytes(audio)
def generar_melodia_bienvenida() -> bytes:
"""Generar melodía de bienvenida (Do-Re-Mi-Fa-Sol)"""
sample_rate = 16000
duracion_nota = 500 # ms por nota
notas = [261, 293, 329, 349, 392, 440] # Do, Re, Mi, Fa, Sol, La
audio = bytearray()
for nota in notas:
nota_audio = generar_tono(nota, duracion_nota, sample_rate, amplitud=12000)
audio.extend(nota_audio)
return bytes(audio)
def generar_saludo_personalizado(nombre: str) -> bytes:
"""
Generar un saludo personalizado (versión simple)
En un caso real, aquí usarías un servicio TTS
"""
# Usar frecuencia diferente según la longitud del nombre
frecuencia_base = 440
frecuencia = frecuencia_base + (len(nombre) * 10)
# Limitar frecuencia máxima
if frecuencia > 800:
frecuencia = 800
return generar_tono(frecuencia, duracion_ms=2000)
# ==================== MENÚ PRINCIPAL ====================
def mostrar_menu():
"""Muestra el menú principal"""
print("\n" + "=" * 50)
print(" 🎵 BOCINA INTELIGENTE - CONTROL")
print("=" * 50)
print("\n📋 Opciones disponibles:")
print(" 1. 🔊 Enviar tono de prueba (440Hz)")
print(" 2. 🎵 Enviar melodía de bienvenida")
print(" 3. 💬 Enviar saludo personalizado")
print(" 4. 📊 Ver estado del ESP32")
print(" 5. 🏓 Probar ping")
print(" 6. 🔇 Detener reproducción")
print(" 7. 🔄 Reconectar")
print(" 8. 🚪 Salir")
print("-" * 50)
# ==================== MODO INTERACTIVO ====================
async def modo_interactivo():
"""Modo interactivo con entrada de IP y nombre"""
# Limpiar pantalla
limpiar_pantalla()
print("\n" + "=" * 50)
print(" 🎵 BOCINA INTELIGENTE")
print("=" * 50)
# Solicitar IP
print("\n📡 Configuración de conexión:")
ip_default = "192.168.15.128"
ip = input(f" IP del ESP32 [{ip_default}]: ").strip()
if not ip:
ip = ip_default
# Solicitar nombre por defecto para saludos
nombre_default = "Visitante"
nombre = input(f" Nombre por defecto [{nombre_default}]: ").strip()
if not nombre:
nombre = nombre_default
# Crear instancia
bocina = BocinaInteligente(ip)
# Conectar
print("\n🔄 Conectando...")
if not await bocina.conectar():
print("\n❌ No se pudo conectar al ESP32")
print(" Verifica que:")
print(f" 1. La IP {ip} sea correcta")
print(" 2. El ESP32 esté encendido")
print(" 3. Estés en la misma red WiFi")
input("\n Presiona Enter para salir...")
return
print("\n✅ ¡Conectado exitosamente!")
print(f" 📡 IP: {ip}")
print(f" 👤 Nombre: {nombre}")
# Bucle principal
while True:
mostrar_menu()
opcion = input("\n👉 Selecciona una opción (1-8): ").strip()
if opcion == "1":
print("\n🔊 Enviando tono de prueba (440Hz)...")
audio = generar_tono(440, 2000)
await bocina.enviar_audio(audio, "Tono 440Hz")
elif opcion == "2":
print("\n🎵 Enviando melodía de bienvenida...")
audio = generar_melodia_bienvenida()
await bocina.enviar_audio(audio, "Melodía")
elif opcion == "3":
# Pedir nombre específico para este saludo
nombre_saludo = input(f" 👤 Nombre (Enter para usar '{nombre}'): ").strip()
if not nombre_saludo:
nombre_saludo = nombre
print(f"\n🔊 Generando saludo para '{nombre_saludo}'...")
audio = generar_saludo_personalizado(nombre_saludo)
await bocina.enviar_audio(audio, nombre_saludo)
elif opcion == "4":
print("\n📊 Obteniendo estado del ESP32...")
estado = await bocina.obtener_estado()
if estado:
print("\n 📡 Estado del sistema:")
print(f" Status: {estado.get('status', 'desconocido')}")
print(f" 📦 Bytes recibidos: {estado.get('bytes_recibidos', 0)}")
print(f" 🔢 Chunks recibidos: {estado.get('chunks_recibidos', 0)}")
print(f" 🎵 Audio activo: {'✅ Sí' if estado.get('audio_activo') else '❌ No'}")
print(f" 📶 WiFi RSSI: {estado.get('wifi_rssi', 0)} dBm")
else:
print(" ❌ No se pudo obtener estado")
elif opcion == "5":
print("\n🏓 Probando ping...")
inicio = time.time()
if await bocina.ping():
latencia = (time.time() - inicio) * 1000
print(f" ✅ PONG recibido (latencia: {latencia:.0f}ms)")
else:
print(" ❌ Sin respuesta - verifica la conexión")
elif opcion == "6":
print("\n🔇 Deteniendo reproducción...")
if await bocina.detener():
print(" ✅ Reproducción detenida")
else:
print(" ⚠️ No se pudo detener o ya estaba detenido")
elif opcion == "7":
print("\n🔄 Reconectando...")
await bocina.desconectar()
await asyncio.sleep(1)
if await bocina.conectar():
print(" ✅ Reconectado exitosamente")
else:
print(" ❌ Error al reconectar")
elif opcion == "8":
print("\n👋 Saliendo...")
break
else:
print("❌ Opción inválida")
# Pequeña pausa antes de volver al menú
await asyncio.sleep(0.5)
# Cerrar conexión
await bocina.desconectar()
print("\n✅ Programa finalizado")
# ==================== MODO RÁPIDO ====================
async def modo_rapido():
"""Modo rápido: pide IP y nombre y envía saludo inmediato"""
limpiar_pantalla()
print("\n" + "=" * 50)
print(" 🎵 BOCINA INTELIGENTE - MODO RÁPIDO")
print("=" * 50)
# Solicitar IP
print("\n📡 Configuración:")
ip_default = "192.168.15.128"
ip = input(f" IP del ESP32 [{ip_default}]: ").strip()
if not ip:
ip = ip_default
# Solicitar nombre
nombre = input(" 👤 Nombre de la persona: ").strip()
if not nombre:
nombre = "Visitante"
# Conectar y enviar
bocina = BocinaInteligente(ip)
print("\n🔄 Conectando...")
if not await bocina.conectar():
print("❌ No se pudo conectar")
input("\nPresiona Enter para salir...")
return
print(f"\n🔊 Enviando saludo para '{nombre}'...")
audio = generar_saludo_personalizado(nombre)
await bocina.enviar_audio(audio, nombre)
# Mostrar estado
await asyncio.sleep(1)
estado = await bocina.obtener_estado()
if estado:
print(f"\n📊 Enviados: {estado.get('bytes_recibidos', 0)} bytes")
await bocina.desconectar()
print("\n✅ Saludo enviado!")
input("\nPresiona Enter para salir...")
# ==================== MAIN ====================
async def main():
"""Función principal"""
limpiar_pantalla()
print("\n" + "=" * 50)
print(" 🎵 BOCINA INTELIGENTE v1.0")
print("=" * 50)
print("\nSelecciona modo de operación:")
print(" 1. 🎮 Modo interactivo (menú completo)")
print(" 2. ⚡ Modo rápido (solo enviar saludo)")
print(" 3. 🚪 Salir")
modo = input("\n👉 Opción (1-3): ").strip()
if modo == "1":
await modo_interactivo()
elif modo == "2":
await modo_rapido()
else:
print("\n👋 Hasta luego!")
return
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\n\n👋 Programa interrumpido")
except Exception as e:
print(f"\n❌ Error: {e}")
input("\nPresiona Enter para salir...")

Binary file not shown.

After

Width:  |  Height:  |  Size: 4.7 KiB

BIN
db_institucion/Ian Axel.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.0 KiB

View File

@ -224,7 +224,7 @@ def main():
threading.Thread(target=worker_rostros, args=(global_mem,), daemon=True).start() threading.Thread(target=worker_rostros, args=(global_mem,), daemon=True).start()
cv2.namedWindow("SmartSoft", cv2.WINDOW_NORMAL) cv2.namedWindow("SmartSoft", cv2.WINDOW_AUTOSIZE)
idx = 0 idx = 0
while True: while True:

View File

@ -10,12 +10,6 @@ from queue import Queue
from deepface import DeepFace from deepface import DeepFace
from ultralytics import YOLO from ultralytics import YOLO
import warnings import warnings
import torch
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Usando dispositivo: {device}")
import torch
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Usando dispositivo: {device}")
warnings.filterwarnings("ignore") warnings.filterwarnings("ignore")
@ -301,7 +295,7 @@ def dibujar_track_fusion(frame_show, trk, global_mem):
def main(): def main():
print("\nIniciando Sistema") print("\nIniciando Sistema")
model = YOLO("yolov8n.pt").to("cuda") model = YOLO("yolov8n.pt")
global_mem = GlobalMemory() global_mem = GlobalMemory()
managers = {str(c): CamManager(c, global_mem) for c in SECUENCIA} managers = {str(c): CamManager(c, global_mem) for c in SECUENCIA}
cams = [CamStream(u) for u in URLS] cams = [CamStream(u) for u in URLS]
@ -309,7 +303,7 @@ def main():
for _ in range(2): for _ in range(2):
threading.Thread(target=worker_rostros, args=(global_mem,), daemon=True).start() threading.Thread(target=worker_rostros, args=(global_mem,), daemon=True).start()
cv2.namedWindow("SmartSoft", cv2.WINDOW_NORMAL) cv2.namedWindow("SmartSoft", cv2.WINDOW_AUTOSIZE)
idx = 0 idx = 0
while True: while True:

View File

@ -1,110 +1,110 @@
import cv2 import cv2
import os import os
import json import json
import numpy as np import numpy as np
# ⚡ Importamos tus motores exactos para no romper la simetría # ⚡ Importamos tus motores exactos para no romper la simetría
from reconocimiento2 import detectar_rostros_yunet, gestionar_vectores from reconocimiento2 import detectar_rostros_yunet, gestionar_vectores
def registrar_desde_webcam(): def registrar_desde_webcam():
print("\n" + "="*50) print("\n" + "="*50)
print("📸 MÓDULO DE REGISTRO LIMPIO (WEBCAM LOCAL)") print("📸 MÓDULO DE REGISTRO LIMPIO (WEBCAM LOCAL)")
print("Alinea tu rostro, mira a la cámara con buena luz.") print("Alinea tu rostro, mira a la cámara con buena luz.")
print("Presiona [R] para capturar | [Q] para salir") print("Presiona [R] para capturar | [Q] para salir")
print("="*50 + "\n") print("="*50 + "\n")
DB_PATH = "db_institucion" DB_PATH = "db_institucion"
CACHE_PATH = "cache_nombres" CACHE_PATH = "cache_nombres"
os.makedirs(DB_PATH, exist_ok=True) os.makedirs(DB_PATH, exist_ok=True)
os.makedirs(CACHE_PATH, exist_ok=True) os.makedirs(CACHE_PATH, exist_ok=True)
# 0 es la cámara por defecto de tu laptop # 0 es la cámara por defecto de tu laptop
cap = cv2.VideoCapture(0) cap = cv2.VideoCapture(0)
if not cap.isOpened(): if not cap.isOpened():
print("[!] Error: No se pudo abrir la webcam local.") print("[!] Error: No se pudo abrir la webcam local.")
return return
while True: while True:
ret, frame = cap.read() ret, frame = cap.read()
if not ret: continue if not ret: continue
# Espejamos la imagen para que actúe como un espejo natural # Espejamos la imagen para que actúe como un espejo natural
frame = cv2.flip(frame, 1) frame = cv2.flip(frame, 1)
display_frame = frame.copy() display_frame = frame.copy()
# Usamos YuNet para garantizar que estamos capturando una cara válida # Usamos YuNet para garantizar que estamos capturando una cara válida
faces = detectar_rostros_yunet(frame) faces = detectar_rostros_yunet(frame)
mejor_rostro = None mejor_rostro = None
max_area = 0 max_area = 0
for (fx, fy, fw, fh, score) in faces: for (fx, fy, fw, fh, score) in faces:
area = fw * fh area = fw * fh
cv2.rectangle(display_frame, (fx, fy), (fx+fw, fy+fh), (0, 255, 0), 2) cv2.rectangle(display_frame, (fx, fy), (fx+fw, fy+fh), (0, 255, 0), 2)
if area > max_area: if area > max_area:
max_area = area max_area = area
h_frame, w_frame = frame.shape[:2] h_frame, w_frame = frame.shape[:2]
# Mismo margen del 30% que requiere MTCNN para alinear correctamente # Mismo margen del 30% que requiere MTCNN para alinear correctamente
m_x, m_y = int(fw * 0.30), int(fh * 0.30) m_x, m_y = int(fw * 0.30), int(fh * 0.30)
y1 = max(0, fy - m_y) y1 = max(0, fy - m_y)
y2 = min(h_frame, fy + fh + m_y) y2 = min(h_frame, fy + fh + m_y)
x1 = max(0, fx - m_x) x1 = max(0, fx - m_x)
x2 = min(w_frame, fx + fw + m_x) x2 = min(w_frame, fx + fw + m_x)
mejor_rostro = frame[y1:y2, x1:x2] mejor_rostro = frame[y1:y2, x1:x2]
cv2.putText(display_frame, "Alineate y presiona [R]", (10, 30), cv2.putText(display_frame, "Alineate y presiona [R]", (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2) cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
cv2.imshow("Registro Webcam Local", display_frame) cv2.imshow("Registro Webcam Local", display_frame)
key = cv2.waitKey(1) & 0xFF key = cv2.waitKey(1) & 0xFF
if key == ord('q'): if key == ord('q'):
break break
elif key == ord('r'): elif key == ord('r'):
if mejor_rostro is not None and mejor_rostro.size > 0: if mejor_rostro is not None and mejor_rostro.size > 0:
cv2.imshow("Captura Congelada", mejor_rostro) cv2.imshow("Captura Congelada", mejor_rostro)
cv2.waitKey(1) cv2.waitKey(1)
print("\n--- NUEVO REGISTRO ---") print("\n--- NUEVO REGISTRO ---")
nom = input("Escribe el nombre exacto de la persona: ").strip() nom = input("Escribe el nombre exacto de la persona: ").strip()
if nom: if nom:
gen_input = input("¿Es Hombre (h) o Mujer (m)?: ").strip().lower() gen_input = input("¿Es Hombre (h) o Mujer (m)?: ").strip().lower()
genero_guardado = "Woman" if gen_input == 'm' else "Man" genero_guardado = "Woman" if gen_input == 'm' else "Man"
# 1. Guardamos la foto pura # 1. Guardamos la foto pura
foto_path = os.path.join(DB_PATH, f"{nom}.jpg") foto_path = os.path.join(DB_PATH, f"{nom}.jpg")
cv2.imwrite(foto_path, mejor_rostro) cv2.imwrite(foto_path, mejor_rostro)
# 2. Actualizamos el caché de géneros sin usar IA # 2. Actualizamos el caché de géneros sin usar IA
ruta_generos = os.path.join(CACHE_PATH, "generos.json") ruta_generos = os.path.join(CACHE_PATH, "generos.json")
dic_generos = {} dic_generos = {}
if os.path.exists(ruta_generos): if os.path.exists(ruta_generos):
try: try:
with open(ruta_generos, 'r') as f: with open(ruta_generos, 'r') as f:
dic_generos = json.load(f) dic_generos = json.load(f)
except Exception: pass except Exception: pass
dic_generos[nom] = genero_guardado dic_generos[nom] = genero_guardado
with open(ruta_generos, 'w') as f: with open(ruta_generos, 'w') as f:
json.dump(dic_generos, f) json.dump(dic_generos, f)
print(f"\n[OK] Foto guardada. Generando punto de gravedad matemático...") print(f"\n[OK] Foto guardada. Generando punto de gravedad matemático...")
# 3. Forzamos la creación del vector en la base de datos # 3. Forzamos la creación del vector en la base de datos
gestionar_vectores(actualizar=True) gestionar_vectores(actualizar=True)
print(" Registro inyectado exitosamente en el sistema principal.") print(" Registro inyectado exitosamente en el sistema principal.")
else: else:
print("[!] Registro cancelado por nombre vacío.") print("[!] Registro cancelado por nombre vacío.")
cv2.destroyWindow("Captura Congelada") cv2.destroyWindow("Captura Congelada")
else: else:
print("[!] No se detectó ningún rostro claro. Acércate más a la luz.") print("[!] No se detectó ningún rostro claro. Acércate más a la luz.")
cap.release() cap.release()
cv2.destroyAllWindows() cv2.destroyAllWindows()
if __name__ == "__main__": if __name__ == "__main__":
registrar_desde_webcam() registrar_desde_webcam()

View File

@ -1,43 +1,43 @@
import cv2 import cv2
import time import time
from ultralytics import YOLO from ultralytics import YOLO
from seguimiento2 import GlobalMemory, CamManager, dibujar_track from seguimiento2 import GlobalMemory, CamManager, dibujar_track
def test_video(video_path): def test_video(video_path):
print(f"Iniciando Benchmark de Video: {video_path}") print(f"Iniciando Benchmark de Video: {video_path}")
model = YOLO("yolov8n.pt") model = YOLO("yolov8n.pt")
global_mem = GlobalMemory() global_mem = GlobalMemory()
manager = CamManager("TEST_CAM", global_mem) manager = CamManager("TEST_CAM", global_mem)
cap = cv2.VideoCapture(video_path) cap = cv2.VideoCapture(video_path)
cv2.namedWindow("Benchmark TT", cv2.WINDOW_AUTOSIZE) cv2.namedWindow("Benchmark TT", cv2.WINDOW_AUTOSIZE)
while cap.isOpened(): while cap.isOpened():
ret, frame = cap.read() ret, frame = cap.read()
if not ret: break if not ret: break
now = time.time() now = time.time()
frame_show = cv2.resize(frame, (480, 270)) frame_show = cv2.resize(frame, (480, 270))
# Inferencia frame por frame sin hilos (sincrónico) # Inferencia frame por frame sin hilos (sincrónico)
res = model.predict(frame_show, conf=0.40, iou=0.50, classes=[0], verbose=False, imgsz=480, device='cpu') res = model.predict(frame_show, conf=0.40, iou=0.50, classes=[0], verbose=False, imgsz=480, device='cpu')
boxes = res[0].boxes.xyxy.cpu().numpy().tolist() if res[0].boxes else [] boxes = res[0].boxes.xyxy.cpu().numpy().tolist() if res[0].boxes else []
tracks = manager.update(boxes, frame_show, now, turno_activo=True) tracks = manager.update(boxes, frame_show, now, turno_activo=True)
for trk in tracks: for trk in tracks:
if trk.time_since_update == 0: if trk.time_since_update == 0:
dibujar_track(frame_show, trk) dibujar_track(frame_show, trk)
cv2.imshow("Benchmark TT", frame_show) cv2.imshow("Benchmark TT", frame_show)
# Si presionas espacio se pausa, con 'q' sales # Si presionas espacio se pausa, con 'q' sales
key = cv2.waitKey(30) & 0xFF key = cv2.waitKey(30) & 0xFF
if key == ord('q'): break if key == ord('q'): break
elif key == ord(' '): cv2.waitKey(-1) elif key == ord(' '): cv2.waitKey(-1)
cap.release() cap.release()
cv2.destroyAllWindows() cv2.destroyAllWindows()
if __name__ == "__main__": if __name__ == "__main__":
test_video("video.mp4") # Pon aquí el nombre de tu video test_video("video.mp4") # Pon aquí el nombre de tu video

View File

@ -1,481 +1,465 @@
import os import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
os.environ['CUDA_VISIBLE_DEVICES'] = '-1' os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
import cv2 import cv2
import numpy as np import numpy as np
from deepface import DeepFace from deepface import DeepFace
import pickle import pickle
import time import time
import threading import threading
import asyncio import asyncio
import edge_tts import edge_tts
import subprocess import subprocess
from datetime import datetime from datetime import datetime
import warnings import warnings
import urllib.request import urllib.request
import torch
warnings.filterwarnings("ignore")
if torch.cuda.is_available():
device = "cuda" # ──────────────────────────────────────────────────────────────────────────────
print("GPU detectada → usando GPU 🚀") # CONFIGURACIÓN
else: # ──────────────────────────────────────────────────────────────────────────────
device = "cpu" DB_PATH = "db_institucion"
print("GPU no disponible → usando CPU ⚠️") CACHE_PATH = "cache_nombres"
import torch VECTORS_FILE = "base_datos_rostros.pkl"
TIMESTAMPS_FILE = "representaciones_timestamps.pkl"
if torch.cuda.is_available(): UMBRAL_SIM = 0.42 # Por encima → identificado. Por debajo → desconocido.
device = "cuda" COOLDOWN_TIME = 15 # Segundos entre saludos
print("GPU detectada → usando GPU 🚀")
else: USUARIO, PASSWORD, IP_DVR = "admin", "TCA200503", "192.168.1.244"
device = "cpu" RTSP_URL = f"rtsp://{USUARIO}:{PASSWORD}@{IP_DVR}:554/Streaming/Channels/702"
print("GPU no disponible → usando CPU ⚠️")
for path in [DB_PATH, CACHE_PATH]:
warnings.filterwarnings("ignore") os.makedirs(path, exist_ok=True)
# ────────────────────────────────────────────────────────────────────────────── # ──────────────────────────────────────────────────────────────────────────────
# CONFIGURACIÓN # YUNET — Detector facial rápido en CPU
# ────────────────────────────────────────────────────────────────────────────── # ──────────────────────────────────────────────────────────────────────────────
DB_PATH = "db_institucion" YUNET_MODEL_PATH = "face_detection_yunet_2023mar.onnx"
CACHE_PATH = "cache_nombres"
VECTORS_FILE = "base_datos_rostros.pkl" if not os.path.exists(YUNET_MODEL_PATH):
TIMESTAMPS_FILE = "representaciones_timestamps.pkl" print(f"Descargando YuNet ({YUNET_MODEL_PATH})...")
UMBRAL_SIM = 0.42 # Por encima → identificado. Por debajo → desconocido. url = ("https://github.com/opencv/opencv_zoo/raw/main/models/"
COOLDOWN_TIME = 15 # Segundos entre saludos "face_detection_yunet/face_detection_yunet_2023mar.onnx")
urllib.request.urlretrieve(url, YUNET_MODEL_PATH)
USUARIO, PASSWORD, IP_DVR = "admin", "TCA200503", "192.168.1.244" print("YuNet descargado.")
RTSP_URL = f"rtsp://{USUARIO}:{PASSWORD}@{IP_DVR}:554/Streaming/Channels/702"
# Detector estricto para ROIs grandes (persona cerca)
for path in [DB_PATH, CACHE_PATH]: detector_yunet = cv2.FaceDetectorYN.create(
os.makedirs(path, exist_ok=True) model=YUNET_MODEL_PATH, config="",
input_size=(320, 320),
# ────────────────────────────────────────────────────────────────────────────── score_threshold=0.70,
# YUNET — Detector facial rápido en CPU nms_threshold=0.3,
# ────────────────────────────────────────────────────────────────────────────── top_k=5000
YUNET_MODEL_PATH = "face_detection_yunet_2023mar.onnx" )
if not os.path.exists(YUNET_MODEL_PATH): # Detector permisivo para ROIs pequeños (persona lejos)
print(f"Descargando YuNet ({YUNET_MODEL_PATH})...") detector_yunet_lejano = cv2.FaceDetectorYN.create(
url = ("https://github.com/opencv/opencv_zoo/raw/main/models/" model=YUNET_MODEL_PATH, config="",
"face_detection_yunet/face_detection_yunet_2023mar.onnx") input_size=(320, 320),
urllib.request.urlretrieve(url, YUNET_MODEL_PATH) score_threshold=0.45,
print("YuNet descargado.") nms_threshold=0.3,
top_k=5000
# Detector estricto para ROIs grandes (persona cerca) )
detector_yunet = cv2.FaceDetectorYN.create(
model=YUNET_MODEL_PATH, config="", def detectar_rostros_yunet(roi, lock=None):
input_size=(320, 320), """
score_threshold=0.70, Elige automáticamente el detector según el tamaño del ROI.
nms_threshold=0.3, """
top_k=5000 h_roi, w_roi = roi.shape[:2]
) area = w_roi * h_roi
det = detector_yunet if area > 8000 else detector_yunet_lejano
# Detector permisivo para ROIs pequeños (persona lejos)
detector_yunet_lejano = cv2.FaceDetectorYN.create( try:
model=YUNET_MODEL_PATH, config="", if lock:
input_size=(320, 320), with lock:
score_threshold=0.45, det.setInputSize((w_roi, h_roi))
nms_threshold=0.3, _, faces = det.detect(roi)
top_k=5000 else:
) det.setInputSize((w_roi, h_roi))
_, faces = det.detect(roi)
def detectar_rostros_yunet(roi, lock=None): except Exception:
""" return []
Elige automáticamente el detector según el tamaño del ROI.
""" if faces is None:
h_roi, w_roi = roi.shape[:2] return []
area = w_roi * h_roi
det = detector_yunet if area > 8000 else detector_yunet_lejano resultado = []
for face in faces:
try: try:
if lock: fx, fy, fw, fh = map(int, face[:4])
with lock: score = float(face[14]) if len(face) > 14 else 1.0
det.setInputSize((w_roi, h_roi)) resultado.append((fx, fy, fw, fh, score))
_, faces = det.detect(roi) except (ValueError, OverflowError, TypeError):
else: continue
det.setInputSize((w_roi, h_roi)) return resultado
_, faces = det.detect(roi)
except Exception:
return [] # ──────────────────────────────────────────────────────────────────────────────
# SISTEMA DE AUDIO
if faces is None: # ──────────────────────────────────────────────────────────────────────────────
return [] def obtener_audios_humanos(genero):
hora = datetime.now().hour
resultado = [] es_mujer = genero.lower() == 'woman'
for face in faces: suffix = "_m.mp3" if es_mujer else "_h.mp3"
try: if 5 <= hora < 12:
fx, fy, fw, fh = map(int, face[:4]) intro = "dias.mp3"
score = float(face[14]) if len(face) > 14 else 1.0 elif 12 <= hora < 19:
resultado.append((fx, fy, fw, fh, score)) intro = "tarde.mp3"
except (ValueError, OverflowError, TypeError): else:
continue intro = "noches.mp3"
return resultado cierre = ("fin_noche" if (hora >= 19 or hora < 5) else "fin_dia") + suffix
return intro, cierre
# ──────────────────────────────────────────────────────────────────────────────
# SISTEMA DE AUDIO async def sintetizar_nombre(nombre, ruta):
# ────────────────────────────────────────────────────────────────────────────── nombre_limpio = nombre.replace('_', ' ')
def obtener_audios_humanos(genero): try:
hora = datetime.now().hour comunicador = edge_tts.Communicate(nombre_limpio, "es-MX-DaliaNeural", rate="+10%")
es_mujer = genero.lower() == 'woman' await comunicador.save(ruta)
suffix = "_m.mp3" if es_mujer else "_h.mp3" except Exception:
if 5 <= hora < 12: pass
intro = "dias.mp3"
elif 12 <= hora < 19:
intro = "tarde.mp3" def reproducir(archivo):
else: if os.path.exists(archivo):
intro = "noches.mp3" subprocess.Popen(
cierre = ("fin_noche" if (hora >= 19 or hora < 5) else "fin_dia") + suffix ["mpv", "--no-video", "--volume=100", archivo],
return intro, cierre stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
async def sintetizar_nombre(nombre, ruta):
nombre_limpio = nombre.replace('_', ' ')
try: def hilo_bienvenida(nombre, genero):
comunicador = edge_tts.Communicate(nombre_limpio, "es-MX-DaliaNeural", rate="+10%") archivo_nombre = os.path.join(CACHE_PATH, f"nombre_{nombre}.mp3")
await comunicador.save(ruta)
except Exception: if not os.path.exists(archivo_nombre):
pass try:
asyncio.run(sintetizar_nombre(nombre, archivo_nombre))
except Exception:
def reproducir(archivo): pass
if os.path.exists(archivo):
subprocess.Popen( intro, cierre = obtener_audios_humanos(genero)
["mpv", "--no-video", "--volume=100", archivo],
stdout=subprocess.DEVNULL, archivos = [f for f in [intro, archivo_nombre, cierre] if os.path.exists(f)]
stderr=subprocess.DEVNULL if archivos:
) subprocess.Popen(
["mpv", "--no-video", "--volume=100"] + archivos,
stdout=subprocess.DEVNULL,
def hilo_bienvenida(nombre, genero): stderr=subprocess.DEVNULL
archivo_nombre = os.path.join(CACHE_PATH, f"nombre_{nombre}.mp3") )
if not os.path.exists(archivo_nombre):
try: # ──────────────────────────────────────────────────────────────────────────────
asyncio.run(sintetizar_nombre(nombre, archivo_nombre)) # GESTIÓN DE BASE DE DATOS (AHORA CON RETINAFACE Y ALINEACIÓN)
except Exception: # ──────────────────────────────────────────────────────────────────────────────
pass def gestionar_vectores(actualizar=False):
import json # ⚡ Asegúrate de tener importado json
intro, cierre = obtener_audios_humanos(genero)
vectores_actuales = {}
archivos = [f for f in [intro, archivo_nombre, cierre] if os.path.exists(f)] if os.path.exists(VECTORS_FILE):
if archivos: try:
subprocess.Popen( with open(VECTORS_FILE, 'rb') as f:
["mpv", "--no-video", "--volume=100"] + archivos, vectores_actuales = pickle.load(f)
stdout=subprocess.DEVNULL, except Exception:
stderr=subprocess.DEVNULL vectores_actuales = {}
)
if not actualizar:
return vectores_actuales
# ──────────────────────────────────────────────────────────────────────────────
# GESTIÓN DE BASE DE DATOS (AHORA CON RETINAFACE Y ALINEACIÓN) timestamps = {}
# ────────────────────────────────────────────────────────────────────────────── if os.path.exists(TIMESTAMPS_FILE):
def gestionar_vectores(actualizar=False): try:
import json # ⚡ Asegúrate de tener importado json with open(TIMESTAMPS_FILE, 'rb') as f:
timestamps = pickle.load(f)
vectores_actuales = {} except Exception:
if os.path.exists(VECTORS_FILE): timestamps = {}
try:
with open(VECTORS_FILE, 'rb') as f: # ──────────────────────────────────────────────────────────
vectores_actuales = pickle.load(f) # CARGA DEL CACHÉ DE GÉNEROS
except Exception: # ──────────────────────────────────────────────────────────
vectores_actuales = {} ruta_generos = os.path.join(CACHE_PATH, "generos.json")
dic_generos = {}
if not actualizar: if os.path.exists(ruta_generos):
return vectores_actuales try:
with open(ruta_generos, 'r') as f:
timestamps = {} dic_generos = json.load(f)
if os.path.exists(TIMESTAMPS_FILE): except Exception:
try: pass
with open(TIMESTAMPS_FILE, 'rb') as f:
timestamps = pickle.load(f) print("\nACTUALIZANDO BASE DE DATOS (Alineación y Caché de Géneros)...")
except Exception: imagenes = [f for f in os.listdir(DB_PATH) if f.lower().endswith(('.jpg', '.png'))]
timestamps = {} nombres_en_disco = set()
hubo_cambios = False
# ────────────────────────────────────────────────────────── cambio_generos = False # Bandera para saber si actualizamos el JSON
# CARGA DEL CACHÉ DE GÉNEROS
# ────────────────────────────────────────────────────────── for archivo in imagenes:
ruta_generos = os.path.join(CACHE_PATH, "generos.json") nombre_archivo = os.path.splitext(archivo)[0]
dic_generos = {} ruta_img = os.path.join(DB_PATH, archivo)
if os.path.exists(ruta_generos): nombres_en_disco.add(nombre_archivo)
try:
with open(ruta_generos, 'r') as f: ts_actual = os.path.getmtime(ruta_img)
dic_generos = json.load(f) ts_guardado = timestamps.get(nombre_archivo, 0)
except Exception:
pass # Si ya tenemos el vector pero NO tenemos su género en el JSON, forzamos el procesamiento
falta_genero = nombre_archivo not in dic_generos
print("\nACTUALIZANDO BASE DE DATOS (Alineación y Caché de Géneros)...")
imagenes = [f for f in os.listdir(DB_PATH) if f.lower().endswith(('.jpg', '.png'))] if nombre_archivo in vectores_actuales and ts_actual == ts_guardado and not falta_genero:
nombres_en_disco = set() continue
hubo_cambios = False
cambio_generos = False # Bandera para saber si actualizamos el JSON try:
img_db = cv2.imread(ruta_img)
for archivo in imagenes: lab = cv2.cvtColor(img_db, cv2.COLOR_BGR2LAB)
nombre_archivo = os.path.splitext(archivo)[0] l, a, b = cv2.split(lab)
ruta_img = os.path.join(DB_PATH, archivo) clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
nombres_en_disco.add(nombre_archivo) l = clahe.apply(l)
img_mejorada = cv2.cvtColor(cv2.merge((l, a, b)), cv2.COLOR_LAB2BGR)
ts_actual = os.path.getmtime(ruta_img)
ts_guardado = timestamps.get(nombre_archivo, 0) # IA DE GÉNERO (Solo se ejecuta 1 vez por persona en toda la vida del sistema)
if falta_genero:
# Si ya tenemos el vector pero NO tenemos su género en el JSON, forzamos el procesamiento try:
falta_genero = nombre_archivo not in dic_generos analisis = DeepFace.analyze(img_mejorada, actions=['gender'], enforce_detection=False)[0]
dic_generos[nombre_archivo] = analisis.get('dominant_gender', 'Man')
if nombre_archivo in vectores_actuales and ts_actual == ts_guardado and not falta_genero: except Exception:
continue dic_generos[nombre_archivo] = "Man" # Respaldo
cambio_generos = True
try:
img_db = cv2.imread(ruta_img) # Extraemos el vector
lab = cv2.cvtColor(img_db, cv2.COLOR_BGR2LAB) res = DeepFace.represent(
l, a, b = cv2.split(lab) img_path=img_mejorada,
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8)) model_name="ArcFace",
l = clahe.apply(l) detector_backend="mtcnn",
img_mejorada = cv2.cvtColor(cv2.merge((l, a, b)), cv2.COLOR_LAB2BGR) align=True,
enforce_detection=True
# IA DE GÉNERO (Solo se ejecuta 1 vez por persona en toda la vida del sistema) )
if falta_genero: emb = np.array(res[0]["embedding"], dtype=np.float32)
try:
analisis = DeepFace.analyze(img_mejorada, actions=['gender'], enforce_detection=False)[0] norma = np.linalg.norm(emb)
dic_generos[nombre_archivo] = analisis.get('dominant_gender', 'Man') if norma > 0:
except Exception: emb = emb / norma
dic_generos[nombre_archivo] = "Man" # Respaldo
cambio_generos = True vectores_actuales[nombre_archivo] = emb
timestamps[nombre_archivo] = ts_actual
# Extraemos el vector hubo_cambios = True
res = DeepFace.represent( print(f" Procesado y alineado: {nombre_archivo} | Género: {dic_generos.get(nombre_archivo)}")
img_path=img_mejorada,
model_name="ArcFace", except Exception as e:
detector_backend="opencv", print(f" Rostro no válido en '{archivo}', omitido. Error: {e}")
align=False,
enforce_detection=True # Limpieza de eliminados
) for nombre in list(vectores_actuales.keys()):
emb = np.array(res[0]["embedding"], dtype=np.float32) if nombre not in nombres_en_disco:
del vectores_actuales[nombre]
norma = np.linalg.norm(emb) timestamps.pop(nombre, None)
if norma > 0: if nombre in dic_generos:
emb = emb / norma del dic_generos[nombre]
cambio_generos = True
vectores_actuales[nombre_archivo] = emb hubo_cambios = True
timestamps[nombre_archivo] = ts_actual print(f" Eliminado (sin foto): {nombre}")
hubo_cambios = True
print(f" Procesado y alineado: {nombre_archivo} | Género: {dic_generos.get(nombre_archivo)}") # Guardado de la memoria
if hubo_cambios:
except Exception as e: with open(VECTORS_FILE, 'wb') as f:
print(f" Rostro no válido en '{archivo}', omitido. Error: {e}") pickle.dump(vectores_actuales, f)
with open(TIMESTAMPS_FILE, 'wb') as f:
# Limpieza de eliminados pickle.dump(timestamps, f)
for nombre in list(vectores_actuales.keys()):
if nombre not in nombres_en_disco: # Guardado del JSON de géneros si hubo descubrimientos nuevos
del vectores_actuales[nombre] if cambio_generos:
timestamps.pop(nombre, None) with open(ruta_generos, 'w') as f:
if nombre in dic_generos: json.dump(dic_generos, f)
del dic_generos[nombre]
cambio_generos = True if hubo_cambios or cambio_generos:
hubo_cambios = True print(" Sincronización terminada.\n")
print(f" Eliminado (sin foto): {nombre}") else:
print(" Sin cambios. Base de datos al día.\n")
# Guardado de la memoria
if hubo_cambios: return vectores_actuales
with open(VECTORS_FILE, 'wb') as f:
pickle.dump(vectores_actuales, f) # ──────────────────────────────────────────────────────────────────────────────
with open(TIMESTAMPS_FILE, 'wb') as f: # BÚSQUEDA BLINDADA (Similitud Coseno estricta)
pickle.dump(timestamps, f) # ──────────────────────────────────────────────────────────────────────────────
def buscar_mejor_match(emb_consulta, base_datos):
# Guardado del JSON de géneros si hubo descubrimientos nuevos # ⚡ MAGIA 3: Normalización L2 del vector entrante
if cambio_generos: norma = np.linalg.norm(emb_consulta)
with open(ruta_generos, 'w') as f: if norma > 0:
json.dump(dic_generos, f) emb_consulta = emb_consulta / norma
if hubo_cambios or cambio_generos: mejor_match, max_sim = None, -1.0
print(" Sincronización terminada.\n") for nombre, vec in base_datos.items():
else: # Como ambos están normalizados, esto es Similitud Coseno pura (-1.0 a 1.0)
print(" Sin cambios. Base de datos al día.\n") sim = float(np.dot(emb_consulta, vec))
if sim > max_sim:
return vectores_actuales max_sim = sim
mejor_match = nombre
# ──────────────────────────────────────────────────────────────────────────────
# BÚSQUEDA BLINDADA (Similitud Coseno estricta) return mejor_match, max_sim
# ──────────────────────────────────────────────────────────────────────────────
def buscar_mejor_match(emb_consulta, base_datos): # ──────────────────────────────────────────────────────────────────────────────
# ⚡ MAGIA 3: Normalización L2 del vector entrante # LOOP DE PRUEBA Y REGISTRO (CON SIMETRÍA ESTRICTA)
norma = np.linalg.norm(emb_consulta) # ──────────────────────────────────────────────────────────────────────────────
if norma > 0: def sistema_interactivo():
emb_consulta = emb_consulta / norma base_datos = gestionar_vectores(actualizar=False)
cap = cv2.VideoCapture(RTSP_URL)
mejor_match, max_sim = None, -1.0 ultimo_saludo = 0
for nombre, vec in base_datos.items(): persona_actual = None
# Como ambos están normalizados, esto es Similitud Coseno pura (-1.0 a 1.0) confirmaciones = 0
sim = float(np.dot(emb_consulta, vec))
if sim > max_sim: print("\n" + "=" * 50)
max_sim = sim print(" MÓDULO DE REGISTRO Y DEPURACIÓN ESTRICTO")
mejor_match = nombre print(" [R] Registrar nuevo rostro | [Q] Salir")
print("=" * 50 + "\n")
return mejor_match, max_sim
faces_ultimo_frame = []
# ──────────────────────────────────────────────────────────────────────────────
# LOOP DE PRUEBA Y REGISTRO (CON SIMETRÍA ESTRICTA) while True:
# ────────────────────────────────────────────────────────────────────────────── ret, frame = cap.read()
def sistema_interactivo(): if not ret:
base_datos = gestionar_vectores(actualizar=False) time.sleep(2)
cap = cv2.VideoCapture(RTSP_URL) cap.open(RTSP_URL)
ultimo_saludo = 0 continue
persona_actual = None
confirmaciones = 0 h, w = frame.shape[:2]
display_frame = frame.copy()
print("\n" + "=" * 50) tiempo_actual = time.time()
print(" MÓDULO DE REGISTRO Y DEPURACIÓN ESTRICTO")
print(" [R] Registrar nuevo rostro | [Q] Salir") faces_raw = detectar_rostros_yunet(frame)
print("=" * 50 + "\n") faces_ultimo_frame = faces_raw
faces_ultimo_frame = [] for (fx, fy, fw, fh, score_yunet) in faces_raw:
fx = max(0, fx); fy = max(0, fy)
while True: fw = min(w - fx, fw); fh = min(h - fy, fh)
ret, frame = cap.read() if fw <= 0 or fh <= 0:
if not ret: continue
time.sleep(2)
cap.open(RTSP_URL) cv2.rectangle(display_frame, (fx, fy), (fx+fw, fy+fh), (255, 200, 0), 2)
continue cv2.putText(display_frame, f"YN:{score_yunet:.2f}",
(fx, fy - 25), cv2.FONT_HERSHEY_SIMPLEX, 0.45, (255, 200, 0), 1)
h, w = frame.shape[:2]
display_frame = frame.copy() if (tiempo_actual - ultimo_saludo) <= COOLDOWN_TIME:
tiempo_actual = time.time() continue
faces_raw = detectar_rostros_yunet(frame) m = int(fw * 0.15)
faces_ultimo_frame = faces_raw roi = frame[max(0, fy-m): min(h, fy+fh+m),
max(0, fx-m): min(w, fx+fw+m)]
for (fx, fy, fw, fh, score_yunet) in faces_raw:
fx = max(0, fx); fy = max(0, fy) # 🛡️ FILTRO DE TAMAÑO FÍSICO
fw = min(w - fx, fw); fh = min(h - fy, fh) if roi.size == 0 or roi.shape[0] < 40 or roi.shape[1] < 40:
if fw <= 0 or fh <= 0: cv2.putText(display_frame, "muy pequeno",
continue (fx, fy-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 100, 255), 1)
continue
cv2.rectangle(display_frame, (fx, fy), (fx+fw, fy+fh), (255, 200, 0), 2)
cv2.putText(display_frame, f"YN:{score_yunet:.2f}", # 🛡️ FILTRO DE NITIDEZ
(fx, fy - 25), cv2.FONT_HERSHEY_SIMPLEX, 0.45, (255, 200, 0), 1) gray_roi = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
nitidez = cv2.Laplacian(gray_roi, cv2.CV_64F).var()
if (tiempo_actual - ultimo_saludo) <= COOLDOWN_TIME: if nitidez < 50.0:
continue cv2.putText(display_frame, f"blur({nitidez:.0f})",
(fx, fy-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 165, 255), 1)
m = int(fw * 0.15) continue
roi = frame[max(0, fy-m): min(h, fy+fh+m),
max(0, fx-m): min(w, fx+fw+m)] # 🌙 SIMETRÍA 1: VISIÓN NOCTURNA (CLAHE) AL VIDEO EN VIVO
try:
# 🛡️ FILTRO DE TAMAÑO FÍSICO lab = cv2.cvtColor(roi, cv2.COLOR_BGR2LAB)
if roi.size == 0 or roi.shape[0] < 40 or roi.shape[1] < 40: l, a, b = cv2.split(lab)
cv2.putText(display_frame, "muy pequeno", clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
(fx, fy-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 100, 255), 1) l = clahe.apply(l)
continue roi_mejorado = cv2.cvtColor(cv2.merge((l, a, b)), cv2.COLOR_LAB2BGR)
except Exception:
# 🛡️ FILTRO DE NITIDEZ roi_mejorado = roi # Respaldo de seguridad
gray_roi = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
nitidez = cv2.Laplacian(gray_roi, cv2.CV_64F).var() # 🧠 SIMETRÍA 2: MOTOR MTCNN Y ALINEACIÓN (Igual que la Base de Datos)
if nitidez < 50.0: try:
cv2.putText(display_frame, f"blur({nitidez:.0f})", res = DeepFace.represent(
(fx, fy-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 165, 255), 1) img_path=roi_mejorado,
continue model_name="ArcFace",
detector_backend="mtcnn", # El mismo que en gestionar_vectores
# 🌙 SIMETRÍA 1: VISIÓN NOCTURNA (CLAHE) AL VIDEO EN VIVO align=True, # Enderezamos la cara
try: enforce_detection=True # Si MTCNN no ve cara clara, aborta
lab = cv2.cvtColor(roi, cv2.COLOR_BGR2LAB) )
l, a, b = cv2.split(lab) emb = np.array(res[0]["embedding"], dtype=np.float32)
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8)) mejor_match, max_sim = buscar_mejor_match(emb, base_datos)
l = clahe.apply(l)
roi_mejorado = cv2.cvtColor(cv2.merge((l, a, b)), cv2.COLOR_LAB2BGR) except Exception:
except Exception: # MTCNN abortó porque la cara estaba de perfil, tapada o no era una cara
roi_mejorado = roi # Respaldo de seguridad cv2.putText(display_frame, "MTCNN Ignorado",
(fx, fy-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)
# 🧠 SIMETRÍA 2: MOTOR MTCNN Y ALINEACIÓN (Igual que la Base de Datos) continue
try:
res = DeepFace.represent( estado = " IDENTIFICADO" if max_sim > UMBRAL_SIM else "DESCONOCIDO"
img_path=roi_mejorado, nombre_d = mejor_match.split('_')[0] if mejor_match else "nadie"
model_name="ArcFace", n_bloques = int(max_sim * 20)
detector_backend="mtcnn", # El mismo que en gestionar_vectores barra = "" * n_bloques + "" * (20 - n_bloques)
align=True, # Enderezamos la cara print(f"[REGISTRO] {estado} | {nombre_d:<14} | {barra} | "
enforce_detection=True # Si MTCNN no ve cara clara, aborta f"{max_sim*100:.1f}% (umbral: {UMBRAL_SIM*100:.0f}%)")
)
emb = np.array(res[0]["embedding"], dtype=np.float32) if max_sim > UMBRAL_SIM and mejor_match:
mejor_match, max_sim = buscar_mejor_match(emb, base_datos) color = (0, 255, 0)
texto = f"{mejor_match.split('_')[0]} ({max_sim:.2f})"
except Exception:
# MTCNN abortó porque la cara estaba de perfil, tapada o no era una cara if mejor_match == persona_actual:
cv2.putText(display_frame, "MTCNN Ignorado", confirmaciones += 1
(fx, fy-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1) else:
continue persona_actual, confirmaciones = mejor_match, 1
estado = " IDENTIFICADO" if max_sim > UMBRAL_SIM else "DESCONOCIDO" if confirmaciones >= 1:
nombre_d = mejor_match.split('_')[0] if mejor_match else "nadie" cv2.rectangle(display_frame, (fx, fy), (fx+fw, fy+fh), (0, 255, 0), 3)
n_bloques = int(max_sim * 20) try:
barra = "" * n_bloques + "" * (20 - n_bloques) analisis = DeepFace.analyze(
print(f"[REGISTRO] {estado} | {nombre_d:<14} | {barra} | " roi_mejorado, actions=['gender'], enforce_detection=False
f"{max_sim*100:.1f}% (umbral: {UMBRAL_SIM*100:.0f}%)") )[0]
genero = analisis['dominant_gender']
if max_sim > UMBRAL_SIM and mejor_match: except Exception:
color = (0, 255, 0) genero = "Man"
texto = f"{mejor_match.split('_')[0]} ({max_sim:.2f})"
threading.Thread(
if mejor_match == persona_actual: target=hilo_bienvenida,
confirmaciones += 1 args=(mejor_match, genero),
else: daemon=True
persona_actual, confirmaciones = mejor_match, 1 ).start()
ultimo_saludo = tiempo_actual
if confirmaciones >= 1: confirmaciones = 0
cv2.rectangle(display_frame, (fx, fy), (fx+fw, fy+fh), (0, 255, 0), 3)
try: else:
analisis = DeepFace.analyze( color = (0, 0, 255)
roi_mejorado, actions=['gender'], enforce_detection=False texto = f"? ({max_sim:.2f})"
)[0] confirmaciones = max(0, confirmaciones - 1)
genero = analisis['dominant_gender']
except Exception: cv2.putText(display_frame, texto,
genero = "Man" (fx, fy - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
threading.Thread( cv2.imshow("Módulo de Registro", display_frame)
target=hilo_bienvenida, key = cv2.waitKey(1) & 0xFF
args=(mejor_match, genero),
daemon=True if key == ord('q'):
).start() break
ultimo_saludo = tiempo_actual
confirmaciones = 0 elif key == ord('r'):
if faces_ultimo_frame:
else: areas = [fw * fh for (fx, fy, fw, fh, _) in faces_ultimo_frame]
color = (0, 0, 255) fx, fy, fw, fh, _ = faces_ultimo_frame[np.argmax(areas)]
texto = f"? ({max_sim:.2f})"
confirmaciones = max(0, confirmaciones - 1) m_x = int(fw * 0.30)
m_y = int(fh * 0.30)
cv2.putText(display_frame, texto, face_roi = frame[max(0, fy-m_y): min(h, fy+fh+m_y),
(fx, fy - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2) max(0, fx-m_x): min(w, fx+fw+m_x)]
cv2.imshow("Módulo de Registro", display_frame) if face_roi.size > 0:
key = cv2.waitKey(1) & 0xFF nom = input("\nNombre de la persona: ").strip()
if nom:
if key == ord('q'): foto_path = os.path.join(DB_PATH, f"{nom}.jpg")
break cv2.imwrite(foto_path, face_roi)
print(f"[OK] Rostro de '{nom}' guardado. Sincronizando...")
elif key == ord('r'): base_datos = gestionar_vectores(actualizar=True)
if faces_ultimo_frame: else:
areas = [fw * fh for (fx, fy, fw, fh, _) in faces_ultimo_frame] print("[!] Registro cancelado.")
fx, fy, fw, fh, _ = faces_ultimo_frame[np.argmax(areas)] else:
print("[!] Recorte vacío. Intenta de nuevo.")
m_x = int(fw * 0.30) else:
m_y = int(fh * 0.30) print("\n[!] No se detectó rostro. Acércate más o mira a la lente.")
face_roi = frame[max(0, fy-m_y): min(h, fy+fh+m_y),
max(0, fx-m_x): min(w, fx+fw+m_x)] cap.release()
cv2.destroyAllWindows()
if face_roi.size > 0:
nom = input("\nNombre de la persona: ").strip() if __name__ == "__main__":
if nom: sistema_interactivo()
foto_path = os.path.join(DB_PATH, f"{nom}.jpg")
cv2.imwrite(foto_path, face_roi)
print(f"[OK] Rostro de '{nom}' guardado. Sincronizando...")
base_datos = gestionar_vectores(actualizar=True)
else:
print("[!] Registro cancelado.")
else:
print("[!] Recorte vacío. Intenta de nuevo.")
else:
print("\n[!] No se detectó rostro. Acércate más o mira a la lente.")
cap.release()
cv2.destroyAllWindows()
if __name__ == "__main__":
sistema_interactivo()

Binary file not shown.

View File

@ -1,96 +1,96 @@
absl-py==2.4.0 absl-py==2.4.0
aiohappyeyeballs==2.6.1 aiohappyeyeballs==2.6.1
aiohttp==3.13.3 aiohttp==3.13.3
aiosignal==1.4.0 aiosignal==1.4.0
astunparse==1.6.3 astunparse==1.6.3
attrs==25.4.0 attrs==25.4.0
beautifulsoup4==4.14.3 beautifulsoup4==4.14.3
blinker==1.9.0 blinker==1.9.0
certifi==2026.1.4 certifi==2026.1.4
charset-normalizer==3.4.4 charset-normalizer==3.4.4
click==8.3.1 click==8.3.1
contourpy==1.3.2 contourpy==1.3.3
cycler==0.12.1 cycler==0.12.1
deepface==0.0.98 deepface==0.0.98
edge-tts==7.2.7 edge-tts==7.2.7
filelock==3.20.0 filelock==3.20.0
fire==0.7.1 fire==0.7.1
Flask==3.1.2 Flask==3.1.2
flask-cors==6.0.2 flask-cors==6.0.2
flatbuffers==25.12.19 flatbuffers==25.12.19
fonttools==4.61.1 fonttools==4.61.1
frozenlist==1.8.0 frozenlist==1.8.0
fsspec==2025.12.0 fsspec==2025.12.0
gast==0.7.0 gast==0.7.0
gdown==5.2.1 gdown==5.2.1
google-pasta==0.2.0 google-pasta==0.2.0
grpcio==1.78.0 grpcio==1.78.0
gunicorn==25.0.3 gunicorn==25.0.3
h5py==3.15.1 h5py==3.15.1
idna==3.11 idna==3.11
itsdangerous==2.2.0 itsdangerous==2.2.0
Jinja2==3.1.6 Jinja2==3.1.6
joblib==1.5.3 joblib==1.5.3
keras==3.12.1 keras==3.13.2
kiwisolver==1.4.9 kiwisolver==1.4.9
lap==0.5.12 lap==0.5.12
libclang==18.1.1 libclang==18.1.1
lightdsa==0.0.3 lightdsa==0.0.3
lightecc==0.0.4 lightecc==0.0.4
lightphe==0.0.20 lightphe==0.0.20
lz4==4.4.5 lz4==4.4.5
Markdown==3.10.2 Markdown==3.10.2
markdown-it-py==4.0.0 markdown-it-py==4.0.0
MarkupSafe==2.1.5 MarkupSafe==2.1.5
matplotlib==3.10.8 matplotlib==3.10.8
mdurl==0.1.2 mdurl==0.1.2
ml_dtypes==0.5.4 ml_dtypes==0.5.4
mpmath==1.3.0 mpmath==1.3.0
mtcnn==1.0.0 mtcnn==1.0.0
multidict==6.7.1 multidict==6.7.1
namex==0.1.0 namex==0.1.0
networkx==3.4.2 networkx==3.6.1
numpy==1.26.4 numpy==1.26.4
onnxruntime==1.23.2 onnxruntime==1.24.2
opencv-python==4.11.0.86 opencv-python==4.11.0.86
opt_einsum==3.4.0 opt_einsum==3.4.0
optree==0.18.0 optree==0.18.0
packaging==26.0 packaging==26.0
pandas==2.3.3 pandas==3.0.0
pillow==12.0.0 pillow==12.0.0
polars==1.38.1 polars==1.38.1
polars-runtime-32==1.38.1 polars-runtime-32==1.38.1
propcache==0.4.1 propcache==0.4.1
protobuf==6.33.5 protobuf==6.33.5
psutil==7.2.2 psutil==7.2.2
Pygments==2.19.2 Pygments==2.19.2
pyparsing==3.3.2 pyparsing==3.3.2
PySocks==1.7.1 PySocks==1.7.1
python-dateutil==2.9.0.post0 python-dateutil==2.9.0.post0
python-dotenv==1.2.1 python-dotenv==1.2.1
PyYAML==6.0.3 PyYAML==6.0.3
requests==2.32.5 requests==2.32.5
retina-face==0.0.17 retina-face==0.0.17
rich==14.3.2 rich==14.3.2
scipy==1.15.3 scipy==1.17.0
six==1.17.0 six==1.17.0
soupsieve==2.8.3 soupsieve==2.8.3
sympy==1.14.0 sympy==1.14.0
tabulate==0.9.0 tabulate==0.9.0
tensorboard==2.20.0 tensorboard==2.20.0
tensorboard-data-server==0.7.2 tensorboard-data-server==0.7.2
tensorflow==2.20.0 tensorflow==2.20.0
tensorflow-io-gcs-filesystem==0.37.1 tensorflow-io-gcs-filesystem==0.37.1
termcolor==3.3.0 termcolor==3.3.0
tf_keras==2.20.1 tf_keras==2.20.1
torch==2.10.0 torch==2.10.0+cpu
torchreid==0.2.5 torchreid==0.2.5
torchvision==0.25.0 torchvision==0.25.0+cpu
tqdm==4.67.3 tqdm==4.67.3
typing_extensions==4.15.0 typing_extensions==4.15.0
ultralytics==8.4.14 ultralytics==8.4.14
ultralytics-thop==2.0.18 ultralytics-thop==2.0.18
urllib3==2.6.3 urllib3==2.6.3
Werkzeug==3.1.5 Werkzeug==3.1.5
wrapt==2.1.1 wrapt==2.1.1
yarl==1.22.0 yarl==1.22.0

Binary file not shown.

Before

Width:  |  Height:  |  Size: 357 KiB

View File

@ -1,409 +0,0 @@
"""
BOCINA INTELIGENTE - CLIENTE PYTHON
====================================
Cliente para enviar audio y comandos a la bocina ESP32 via WebSocket
Uso:
python bocina_client.py
(Luego ingresa la IP y el nombre cuando se solicite)
"""
import asyncio
import websockets
import json
import struct
import math
import sys
import time
import os
from typing import Optional
# ==================== LIMPIAR PANTALLA ====================
def limpiar_pantalla():
"""Limpia la consola según el sistema operativo"""
os.system('cls' if os.name == 'nt' else 'clear')
# ==================== CLASE BOCINA ====================
class BocinaInteligente:
"""Cliente para controlar la bocina inteligente ESP32"""
def __init__(self, ip: str, puerto: int = 81):
self.ip = ip
self.puerto = puerto
self.url = f"ws://{ip}:{puerto}"
self.websocket: Optional[websockets.WebSocketClientProtocol] = None
self.chunk_size = 1024
self.timeout = 5
self.conectado = False
async def conectar(self) -> bool:
"""Conectar al ESP32"""
try:
print(f"🔌 Conectando a {self.url}...")
self.websocket = await websockets.connect(self.url)
# Esperar mensaje de bienvenida
response = await asyncio.wait_for(self.websocket.recv(), timeout=self.timeout)
data = json.loads(response)
if data.get("status") == "ok":
print(f"{data.get('msg')}")
self.conectado = True
return True
except Exception as e:
print(f" ❌ Error: {e}")
return False
return False
async def desconectar(self):
"""Cerrar conexión"""
if self.websocket:
await self.websocket.close()
self.conectado = False
print("🔌 Conexión cerrada")
async def ping(self) -> bool:
"""Probar conexión con el ESP32"""
if not self.websocket:
return False
try:
await self.websocket.send(json.dumps({"cmd": "PING"}))
response = await asyncio.wait_for(self.websocket.recv(), timeout=self.timeout)
data = json.loads(response)
return data.get("status") == "ok"
except:
return False
async def obtener_estado(self) -> dict:
"""Obtener estadísticas del ESP32"""
if not self.websocket:
return {}
try:
await self.websocket.send(json.dumps({"cmd": "STATUS"}))
response = await asyncio.wait_for(self.websocket.recv(), timeout=self.timeout)
return json.loads(response)
except:
return {}
async def detener(self) -> bool:
"""Detener reproducción"""
if not self.websocket:
return False
try:
await self.websocket.send(json.dumps({"cmd": "STOP"}))
response = await asyncio.wait_for(self.websocket.recv(), timeout=self.timeout)
data = json.loads(response)
return data.get("status") == "ok"
except:
return False
async def enviar_audio(self, audio_data: bytes, nombre: str = "audio") -> bool:
"""
Enviar audio al ESP32
Args:
audio_data: Datos de audio en formato PCM (16kHz, 16bits, mono)
nombre: Nombre identificador (para logs)
"""
if not self.websocket:
print("❌ No hay conexión")
return False
total_chunks = (len(audio_data) + self.chunk_size - 1) // self.chunk_size
print(f"📤 Enviando {total_chunks} chunks ({len(audio_data)} bytes) para '{nombre}'")
inicio = time.time()
for i, chunk_start in enumerate(range(0, len(audio_data), self.chunk_size)):
chunk = audio_data[chunk_start:chunk_start + self.chunk_size]
await self.websocket.send(chunk)
# Mostrar progreso cada 10 chunks o al final
if (i + 1) % 10 == 0 or i == total_chunks - 1:
porcentaje = ((i + 1) * 100) // total_chunks
print(f" 📊 Progreso: {porcentaje}% ({i+1}/{total_chunks} chunks)")
# Pequeña pausa para no saturar
await asyncio.sleep(0.005)
elapsed = time.time() - inicio
print(f"✅ Audio enviado en {elapsed:.2f} segundos")
return True
# ==================== GENERADORES DE AUDIO ====================
def generar_tono(frecuencia: int = 440, duracion_ms: int = 2000,
sample_rate: int = 16000, amplitud: int = 16000) -> bytes:
"""
Generar un tono seno en formato PCM
Args:
frecuencia: Frecuencia del tono en Hz
duracion_ms: Duración en milisegundos
sample_rate: Frecuencia de muestreo
amplitud: Amplitud máxima (0-32767)
"""
num_muestras = int(sample_rate * duracion_ms / 1000)
audio = bytearray()
for i in range(num_muestras):
valor = int(amplitud * math.sin(2 * math.pi * frecuencia * i / sample_rate))
audio.extend(struct.pack('<h', valor)) # little-endian, 16 bits
return bytes(audio)
def generar_melodia_bienvenida() -> bytes:
"""Generar melodía de bienvenida (Do-Re-Mi-Fa-Sol)"""
sample_rate = 16000
duracion_nota = 500 # ms por nota
notas = [261, 293, 329, 349, 392, 440] # Do, Re, Mi, Fa, Sol, La
audio = bytearray()
for nota in notas:
nota_audio = generar_tono(nota, duracion_nota, sample_rate, amplitud=12000)
audio.extend(nota_audio)
return bytes(audio)
def generar_saludo_personalizado(nombre: str) -> bytes:
"""
Generar un saludo personalizado (versión simple)
En un caso real, aquí usarías un servicio TTS
"""
# Usar frecuencia diferente según la longitud del nombre
frecuencia_base = 440
frecuencia = frecuencia_base + (len(nombre) * 10)
# Limitar frecuencia máxima
if frecuencia > 800:
frecuencia = 800
return generar_tono(frecuencia, duracion_ms=2000)
# ==================== MENÚ PRINCIPAL ====================
def mostrar_menu():
"""Muestra el menú principal"""
print("\n" + "=" * 50)
print(" 🎵 BOCINA INTELIGENTE - CONTROL")
print("=" * 50)
print("\n📋 Opciones disponibles:")
print(" 1. 🔊 Enviar tono de prueba (440Hz)")
print(" 2. 🎵 Enviar melodía de bienvenida")
print(" 3. 💬 Enviar saludo personalizado")
print(" 4. 📊 Ver estado del ESP32")
print(" 5. 🏓 Probar ping")
print(" 6. 🔇 Detener reproducción")
print(" 7. 🔄 Reconectar")
print(" 8. 🚪 Salir")
print("-" * 50)
# ==================== MODO INTERACTIVO ====================
async def modo_interactivo():
"""Modo interactivo con entrada de IP y nombre"""
# Limpiar pantalla
limpiar_pantalla()
print("\n" + "=" * 50)
print(" 🎵 BOCINA INTELIGENTE")
print("=" * 50)
# Solicitar IP
print("\n📡 Configuración de conexión:")
ip_default = "192.168.15.128"
ip = input(f" IP del ESP32 [{ip_default}]: ").strip()
if not ip:
ip = ip_default
# Solicitar nombre por defecto para saludos
nombre_default = "Visitante"
nombre = input(f" Nombre por defecto [{nombre_default}]: ").strip()
if not nombre:
nombre = nombre_default
# Crear instancia
bocina = BocinaInteligente(ip)
# Conectar
print("\n🔄 Conectando...")
if not await bocina.conectar():
print("\n❌ No se pudo conectar al ESP32")
print(" Verifica que:")
print(f" 1. La IP {ip} sea correcta")
print(" 2. El ESP32 esté encendido")
print(" 3. Estés en la misma red WiFi")
input("\n Presiona Enter para salir...")
return
print("\n✅ ¡Conectado exitosamente!")
print(f" 📡 IP: {ip}")
print(f" 👤 Nombre: {nombre}")
# Bucle principal
while True:
mostrar_menu()
opcion = input("\n👉 Selecciona una opción (1-8): ").strip()
if opcion == "1":
print("\n🔊 Enviando tono de prueba (440Hz)...")
audio = generar_tono(440, 2000)
await bocina.enviar_audio(audio, "Tono 440Hz")
elif opcion == "2":
print("\n🎵 Enviando melodía de bienvenida...")
audio = generar_melodia_bienvenida()
await bocina.enviar_audio(audio, "Melodía")
elif opcion == "3":
# Pedir nombre específico para este saludo
nombre_saludo = input(f" 👤 Nombre (Enter para usar '{nombre}'): ").strip()
if not nombre_saludo:
nombre_saludo = nombre
print(f"\n🔊 Generando saludo para '{nombre_saludo}'...")
audio = generar_saludo_personalizado(nombre_saludo)
await bocina.enviar_audio(audio, nombre_saludo)
elif opcion == "4":
print("\n📊 Obteniendo estado del ESP32...")
estado = await bocina.obtener_estado()
if estado:
print("\n 📡 Estado del sistema:")
print(f" Status: {estado.get('status', 'desconocido')}")
print(f" 📦 Bytes recibidos: {estado.get('bytes_recibidos', 0)}")
print(f" 🔢 Chunks recibidos: {estado.get('chunks_recibidos', 0)}")
print(f" 🎵 Audio activo: {'✅ Sí' if estado.get('audio_activo') else '❌ No'}")
print(f" 📶 WiFi RSSI: {estado.get('wifi_rssi', 0)} dBm")
else:
print(" ❌ No se pudo obtener estado")
elif opcion == "5":
print("\n🏓 Probando ping...")
inicio = time.time()
if await bocina.ping():
latencia = (time.time() - inicio) * 1000
print(f" ✅ PONG recibido (latencia: {latencia:.0f}ms)")
else:
print(" ❌ Sin respuesta - verifica la conexión")
elif opcion == "6":
print("\n🔇 Deteniendo reproducción...")
if await bocina.detener():
print(" ✅ Reproducción detenida")
else:
print(" ⚠️ No se pudo detener o ya estaba detenido")
elif opcion == "7":
print("\n🔄 Reconectando...")
await bocina.desconectar()
await asyncio.sleep(1)
if await bocina.conectar():
print(" ✅ Reconectado exitosamente")
else:
print(" ❌ Error al reconectar")
elif opcion == "8":
print("\n👋 Saliendo...")
break
else:
print("❌ Opción inválida")
# Pequeña pausa antes de volver al menú
await asyncio.sleep(0.5)
# Cerrar conexión
await bocina.desconectar()
print("\n✅ Programa finalizado")
# ==================== MODO RÁPIDO ====================
async def modo_rapido():
"""Modo rápido: pide IP y nombre y envía saludo inmediato"""
limpiar_pantalla()
print("\n" + "=" * 50)
print(" 🎵 BOCINA INTELIGENTE - MODO RÁPIDO")
print("=" * 50)
# Solicitar IP
print("\n📡 Configuración:")
ip_default = "192.168.15.128"
ip = input(f" IP del ESP32 [{ip_default}]: ").strip()
if not ip:
ip = ip_default
# Solicitar nombre
nombre = input(" 👤 Nombre de la persona: ").strip()
if not nombre:
nombre = "Visitante"
# Conectar y enviar
bocina = BocinaInteligente(ip)
print("\n🔄 Conectando...")
if not await bocina.conectar():
print("❌ No se pudo conectar")
input("\nPresiona Enter para salir...")
return
print(f"\n🔊 Enviando saludo para '{nombre}'...")
audio = generar_saludo_personalizado(nombre)
await bocina.enviar_audio(audio, nombre)
# Mostrar estado
await asyncio.sleep(1)
estado = await bocina.obtener_estado()
if estado:
print(f"\n📊 Enviados: {estado.get('bytes_recibidos', 0)} bytes")
await bocina.desconectar()
print("\n✅ Saludo enviado!")
input("\nPresiona Enter para salir...")
# ==================== MAIN ====================
async def main():
"""Función principal"""
limpiar_pantalla()
print("\n" + "=" * 50)
print(" 🎵 BOCINA INTELIGENTE v1.0")
print("=" * 50)
print("\nSelecciona modo de operación:")
print(" 1. 🎮 Modo interactivo (menú completo)")
print(" 2. ⚡ Modo rápido (solo enviar saludo)")
print(" 3. 🚪 Salir")
modo = input("\n👉 Opción (1-3): ").strip()
if modo == "1":
await modo_interactivo()
elif modo == "2":
await modo_rapido()
else:
print("\n👋 Hasta luego!")
return
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\n\n👋 Programa interrumpido")
except Exception as e:
print(f"\n❌ Error: {e}")
input("\nPresiona Enter para salir...")

File diff suppressed because it is too large Load Diff