Compare commits

..

No commits in common. "master" and "main" have entirely different histories.
master ... main

27 changed files with 2209 additions and 3583 deletions

352
.gitignore vendored
View File

@ -1,178 +1,174 @@
# ---> Python
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
ven2/
.venv/
__pycache__/
*.pkl
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# ────────────────────────────────────────────────────────
# ENTORNO VIRTUAL DEL PROYECTO
# ────────────────────────────────────────────────────────
ia_env/
# ────────────────────────────────────────────────────────
# MODELOS DE IA (Límite de GitHub: 100 MB)
# ────────────────────────────────────────────────────────
*.pt
*.onnx
# ---> Python
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# ────────────────────────────────────────────────────────
# ENTORNO VIRTUAL DEL PROYECTO
# ────────────────────────────────────────────────────────
ia_env/
# ────────────────────────────────────────────────────────
# MODELOS DE IA (Límite de GitHub: 100 MB)
# ────────────────────────────────────────────────────────
*.pt
*.onnx

122
README.md
View File

@ -1,62 +1,62 @@
# Sistema de Identificación y Seguimiento Inteligente
Este repositorio contiene la arquitectura modular para el seguimiento de personas en múltiples cámaras (Re-ID) y reconocimiento facial asíncrono.
## Arquitectura del Proyecto
El sistema está dividido en tres módulos principales para garantizar la separación de responsabilidades:
* `seguimiento2.py`: Motor matemático de Tracking (Kalman + YOLO) y Re-Identificación (OSNet).
* `reconocimiento2.py`: Motor de biometría facial (YuNet + ArcFace) y síntesis de audio (Edge-TTS).
* `main_fusion.py`: Orquestador principal que fusiona ambos motores mediante procesamiento multihilo.
## Requisitos Previos
1. **Python 3.8 - 3.11** instalado en el sistema.
2. **Reproductor MPV** instalado y agregado al PATH del sistema (requerido para el motor de audio sin bloqueos).
* *Windows:* Descargar de la página oficial o usar `scoop install mpv`.
* *Linux:* `sudo apt install mpv`
* *Mac:* `brew install mpv`
## Guía de Instalación Rápida
**1. Clonar el repositorio**
Abre tu terminal y clona este proyecto:
```bash
git clone <URL_DE_TU_REPOSITORIO_GITEA>
cd IdentificacionIA´´´
**2. Crear un Entorno Virtual (¡Importante!)
Para evitar conflictos de librerías, crea un entorno virtual limpio dentro de la carpeta del proyecto:
python -m venv venv
3. Activar el Entorno Virtual
En Windows:
.\venv\Scripts\activate
En Mac/Linux:
source venv/bin/activate
(Sabrás que está activo si ves un (venv) al inicio de tu línea de comandos).
4. Instalar Dependencias
Con el entorno activado, instala todas las librerías necesarias:
pip install -r requirements.txt
## Archivos y Carpetas Necesarias
yolov8n.pt (Detector de personas)
osnet_x0_25_msmt17.onnx (Extractor de características de ropa)
face_detection_yunet_2023mar.onnx (Detector facial rápido)
Además, debes tener la carpeta db_institucion con las fotografías de los rostros a reconocer.
## Ejecución
Para arrancar el sistema completo con interfaz gráfica y audio, ejecuta:
# Sistema de Identificación y Seguimiento Inteligente
Este repositorio contiene la arquitectura modular para el seguimiento de personas en múltiples cámaras (Re-ID) y reconocimiento facial asíncrono.
## Arquitectura del Proyecto
El sistema está dividido en tres módulos principales para garantizar la separación de responsabilidades:
* `seguimiento2.py`: Motor matemático de Tracking (Kalman + YOLO) y Re-Identificación (OSNet).
* `reconocimiento2.py`: Motor de biometría facial (YuNet + ArcFace) y síntesis de audio (Edge-TTS).
* `main_fusion.py`: Orquestador principal que fusiona ambos motores mediante procesamiento multihilo.
## Requisitos Previos
1. **Python 3.8 - 3.11** instalado en el sistema.
2. **Reproductor MPV** instalado y agregado al PATH del sistema (requerido para el motor de audio sin bloqueos).
* *Windows:* Descargar de la página oficial o usar `scoop install mpv`.
* *Linux:* `sudo apt install mpv`
* *Mac:* `brew install mpv`
## Guía de Instalación Rápida
**1. Clonar el repositorio**
Abre tu terminal y clona este proyecto:
```bash
git clone <URL_DE_TU_REPOSITORIO_GITEA>
cd IdentificacionIA´´´
**2. Crear un Entorno Virtual (¡Importante!)
Para evitar conflictos de librerías, crea un entorno virtual limpio dentro de la carpeta del proyecto:
python -m venv venv
3. Activar el Entorno Virtual
En Windows:
.\venv\Scripts\activate
En Mac/Linux:
source venv/bin/activate
(Sabrás que está activo si ves un (venv) al inicio de tu línea de comandos).
4. Instalar Dependencias
Con el entorno activado, instala todas las librerías necesarias:
pip install -r requirements.txt
## Archivos y Carpetas Necesarias
yolov8n.pt (Detector de personas)
osnet_x0_25_msmt17.onnx (Extractor de características de ropa)
face_detection_yunet_2023mar.onnx (Detector facial rápido)
Además, debes tener la carpeta db_institucion con las fotografías de los rostros a reconocer.
## Ejecución
Para arrancar el sistema completo con interfaz gráfica y audio, ejecuta:
python main_fusion.py

Binary file not shown.

BIN
bus.jpg

Binary file not shown.

Before

Width:  |  Height:  |  Size: 134 KiB

View File

@ -1 +1 @@
{"Emanuel Flores": "Man", "Vikicar Aldana": "Woman", "Rodrigo Cahuantzi C": "Man", "Cristian Hernandez Suarez": "Man", "Omar": "Man", "Oscar Atriano Ponce_1": "Man", "Miguel Angel": "Man", "Carlos Eduardo Cuamatzi": "Man", "Rosa maria": "Woman", "Ximena": "Woman", "Ana Karen Guerrero": "Woman", "Yuriel": "Man", "Diana Laura Tecpa": "Woman", "aridai montiel zistecatl": "Woman", "Aridai montiel": "Woman", "Vikicar": "Woman", "Ian Axel": "Man", "Rafael": "Man", "Rubisela Barrientos": "Woman", "ian axel": "Man", "Adriana Lopez": "Woman", "Oscar Atriano Ponce": "Man", "Xayli Ximena": "Woman", "Victor Manuel Ocampo Mendez": "Man", "Victor": "Man"}
{"Emanuel Flores": "Man", "Vikicar Aldana": "Woman", "Rodrigo Cahuantzi C": "Man", "Cristian Hernandez Suarez": "Man", "Omar": "Man", "Oscar Atriano Ponce_1": "Man", "Miguel Angel": "Man", "Carlos Eduardo Cuamatzi": "Man", "Rosa maria": "Woman", "Ximena": "Woman", "Ana Karen Guerrero": "Woman", "Yuriel": "Man", "Diana Laura": "Woman", "Diana Laura Tecpa": "Woman", "aridai montiel zistecatl": "Woman", "Aridai montiel": "Woman", "Vikicar": "Woman", "Ian Axel": "Man", "Rafael": "Man", "Rubisela Barrientos": "Woman", "ian axel": "Man", "Adriana Lopez": "Woman", "Oscar Atriano Ponce": "Man", "Xayli Ximena": "Woman", "Victor Manuel Ocampo Mendez": "Man", "Victor": "Man"}

Binary file not shown.

View File

@ -1,26 +0,0 @@
# Base estable
pip install numpy==1.26.4
# OpenCV compatible con numpy 1.x
pip install opencv-python==4.8.1.78
pip install torch==2.1.2 torchvision==0.16.2 torchaudio==2.1.2 --index-url https://download.pytorch.org/whl/cu118
pip install ultralytics --no-deps
pip install opencv-python==4.8.1.78 matplotlib pyyaml scipy requests pillow
pip install tensorflow==2.21
pip install tf-keras
pip install deepface
pip install onnxruntime
pip install edge-tts
pip install numpy pandas
sudo apt install libxcb-xinerama0
sudo apt install fonts-dejavu
QT_DEBUG_PLUGINS=0 python fusion.py
pip cache purge
python -c "import torch; print(torch.cuda.is_available())"
python -c "import torch; print(torch.cuda.get_device_name(0))"

View File

@ -1,13 +0,0 @@
[ESP32]
ip = 192.168.15.128
puerto = 81
[Audio]
duracion_ms = 2000
tono_base = 440
amplitud = 16000
[General]
timeout = 5
reconectar = true

View File

@ -1,78 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script para configurar la bocina ESP32
Permite cambiar IP, puerto, duración, etc.
"""
import sys
import os
# Agregar el proyecto al path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from core.speaker_iot import configurar_ip, mostrar_configuracion
from core.speaker_iot.config import config
def main():
print("\n" + "=" * 50)
print(" 🎵 CONFIGURACIÓN DE BOCINA IoT")
print("=" * 50)
mostrar_configuracion()
print("\n" + "-" * 50)
print("¿Qué deseas configurar?")
print(" 1. Cambiar IP")
print(" 2. Cambiar puerto")
print(" 3. Cambiar duración del audio")
print(" 4. Ver configuración actual")
print(" 5. Restaurar valores por defecto")
print(" 6. Salir")
opcion = input("\n👉 Opción (1-6): ").strip()
if opcion == "1":
nueva_ip = input("📡 Nueva IP: ").strip()
if nueva_ip:
config.actualizar_ip(nueva_ip)
print(f"✅ IP actualizada a: {nueva_ip}")
elif opcion == "2":
nuevo_puerto = input("🔌 Nuevo puerto [81]: ").strip()
if nuevo_puerto:
config.config.set("ESP32", "puerto", nuevo_puerto)
with open(config.CONFIG_FILE, 'w') as f:
config.config.write(f)
print(f"✅ Puerto actualizado a: {nuevo_puerto}")
elif opcion == "3":
nueva_duracion = input("⏱️ Nueva duración en ms [2000]: ").strip()
if nueva_duracion:
config.config.set("Audio", "duracion_ms", nueva_duracion)
with open(config.CONFIG_FILE, 'w') as f:
config.config.write(f)
print(f"✅ Duración actualizada a: {nueva_duracion}ms")
elif opcion == "4":
mostrar_configuracion()
elif opcion == "5":
confirmar = input("⚠️ ¿Restaurar configuración por defecto? (s/n): ").strip().lower()
if confirmar == 's':
config._crear_configuracion_default()
print("✅ Configuración restaurada")
mostrar_configuracion()
elif opcion == "6":
print("\n👋 Hasta luego!")
return
else:
print("❌ Opción inválida")
print("\n✅ Configuración guardada!")
input("\nPresiona Enter para salir...")
if __name__ == "__main__":
main()

View File

@ -1,21 +0,0 @@
"""
Speaker IoT - Módulo para controlar bocina ESP32
"""
from .bocina_core import (
BocinaCore,
saludar,
detener,
obtener_estado,
configurar_ip,
mostrar_configuracion
)
__all__ = [
'BocinaCore',
'saludar',
'detener',
'obtener_estado',
'configurar_ip',
'mostrar_configuracion'
]

View File

@ -1,224 +0,0 @@
"""
BOCINA CORE - Módulo para controlar la bocina ESP32
"""
import asyncio
import websockets
import json
import struct
import math
from typing import Optional, Dict, Any
from .config import config
# ==================== CONSTANTES ====================
CHUNK_SIZE = 1024
SAMPLE_RATE = 16000
# ==================== CLASE PRINCIPAL ====================
class BocinaCore:
"""Clase principal para controlar la bocina ESP32"""
def __init__(self, ip: str = None, puerto: int = None):
"""
Inicializa el controlador de la bocina
Args:
ip: IP del ESP32 (si es None, usa la del archivo de configuración)
puerto: Puerto WebSocket (si es None, usa el del archivo)
"""
self.ip = ip or config.obtener_ip()
self.puerto = puerto or config.obtener_puerto()
self.url = f"ws://{self.ip}:{self.puerto}"
self.duracion_ms = config.obtener_duracion()
self.tono_base = config.obtener_tono_base()
self.amplitud = config.obtener_amplitud()
self.timeout = config.obtener_timeout()
self._websocket = None
self._conectado = False
# ==================== MÉTODOS PÚBLICOS ====================
def saludar(self, nombre: str, duracion_ms: int = None, tono_personalizado: bool = True) -> bool:
"""
Envía un saludo a la bocina
Args:
nombre: Nombre de la persona
duracion_ms: Duración del saludo (None = usa config)
tono_personalizado: Si True, varía el tono según el nombre
"""
duracion = duracion_ms or self.duracion_ms
return self._ejecutar_async(self._saludar_async(nombre, duracion, tono_personalizado))
def detener(self) -> bool:
"""Detiene la reproducción actual"""
return self._ejecutar_async(self._detener_async())
def estado(self) -> Dict[str, Any]:
"""Obtiene el estado actual de la bocina"""
return self._ejecutar_async(self._estado_async())
def ping(self) -> bool:
"""Prueba la conexión con la bocina"""
return self._ejecutar_async(self._ping_async())
def conectar(self) -> bool:
"""Establece conexión manual con la bocina"""
return self._ejecutar_async(self._conectar_async())
def desconectar(self) -> bool:
"""Cierra la conexión con la bocina"""
return self._ejecutar_async(self._desconectar_async())
# ==================== MÉTODOS INTERNOS ====================
def _ejecutar_async(self, corutina):
"""Ejecuta una función asíncrona desde código síncrono"""
try:
loop = asyncio.get_event_loop()
if loop.is_running():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
resultado = loop.run_until_complete(corutina)
loop.close()
return resultado
else:
return loop.run_until_complete(corutina)
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
resultado = loop.run_until_complete(corutina)
loop.close()
return resultado
async def _conectar_async(self) -> bool:
"""Conexión asíncrona"""
try:
self._websocket = await websockets.connect(self.url, timeout=self.timeout)
await self._websocket.recv()
self._conectado = True
return True
except Exception:
self._conectado = False
return False
async def _desconectar_async(self) -> bool:
"""Desconexión asíncrona"""
if self._websocket:
await self._websocket.close()
self._websocket = None
self._conectado = False
return True
async def _asegurar_conexion(self) -> bool:
"""Asegura que haya una conexión activa"""
if not self._conectado or not self._websocket:
return await self._conectar_async()
return True
async def _saludar_async(self, nombre: str, duracion_ms: int, tono_personalizado: bool) -> bool:
"""Enviar saludo asíncrono"""
if not await self._asegurar_conexion():
return False
try:
audio = self._generar_audio(nombre, duracion_ms, tono_personalizado)
for i in range(0, len(audio), CHUNK_SIZE):
chunk = audio[i:i + CHUNK_SIZE]
await self._websocket.send(chunk)
await asyncio.sleep(0.005)
return True
except Exception:
return False
async def _detener_async(self) -> bool:
"""Detener reproducción asíncrono"""
if not await self._asegurar_conexion():
return False
try:
await self._websocket.send(json.dumps({"cmd": "STOP"}))
await asyncio.wait_for(self._websocket.recv(), timeout=self.timeout)
return True
except Exception:
return False
async def _estado_async(self) -> Dict[str, Any]:
"""Obtener estado asíncrono"""
if not await self._asegurar_conexion():
return {}
try:
await self._websocket.send(json.dumps({"cmd": "STATUS"}))
respuesta = await asyncio.wait_for(self._websocket.recv(), timeout=self.timeout)
return json.loads(respuesta)
except Exception:
return {}
async def _ping_async(self) -> bool:
"""Ping asíncrono"""
if not await self._asegurar_conexion():
return False
try:
await self._websocket.send(json.dumps({"cmd": "PING"}))
respuesta = await asyncio.wait_for(self._websocket.recv(), timeout=self.timeout)
data = json.loads(respuesta)
return data.get("status") == "ok"
except Exception:
return False
def _generar_audio(self, nombre: str, duracion_ms: int, tono_personalizado: bool) -> bytes:
"""Genera audio PCM para el saludo"""
num_muestras = int(SAMPLE_RATE * duracion_ms / 1000)
if tono_personalizado:
frecuencia = self.tono_base + (len(nombre) * 10)
if frecuencia > 800:
frecuencia = 800
else:
frecuencia = self.tono_base
audio = bytearray()
for i in range(num_muestras):
valor = int(self.amplitud * math.sin(2 * math.pi * frecuencia * i / SAMPLE_RATE))
audio.extend(struct.pack('<h', valor))
return bytes(audio)
# ==================== FUNCIONES SIMPLES (API RÁPIDA) ====================
def saludar(nombre: str, ip: str = None) -> bool:
"""Función rápida para saludar a una persona"""
bocina = BocinaCore(ip)
return bocina.saludar(nombre)
def detener(ip: str = None) -> bool:
"""Detiene la reproducción"""
bocina = BocinaCore(ip)
return bocina.detener()
def obtener_estado(ip: str = None) -> dict:
"""Obtiene el estado de la bocina"""
bocina = BocinaCore(ip)
return bocina.estado()
def configurar_ip(nueva_ip: str):
"""Actualiza la IP en el archivo de configuración"""
from .config import config
config.actualizar_ip(nueva_ip)
def mostrar_configuracion():
"""Muestra la configuración actual"""
from .config import config
config.mostrar_configuracion()

View File

@ -1,108 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Módulo de configuración para la bocina ESP32
"""
import os
import configparser
from pathlib import Path
# ==================== RUTAS ====================
BASE_DIR = Path(__file__).resolve().parent.parent.parent
CONFIG_DIR = BASE_DIR / "config" / "speaker_iot"
CONFIG_FILE = CONFIG_DIR / "settings.ini"
# ==================== CONFIGURACIÓN POR DEFECTO ====================
DEFAULT_CONFIG = {
"ESP32": {
"ip": "192.168.15.128",
"puerto": "81"
},
"Audio": {
"duracion_ms": "2000",
"tono_base": "440",
"amplitud": "16000"
},
"General": {
"timeout": "5",
"reconectar": "true"
}
}
class ConfiguracionBocina:
"""Gestor de configuración para la bocina"""
def __init__(self):
self.config = configparser.ConfigParser()
self._cargar_configuracion()
def _cargar_configuracion(self):
"""Carga la configuración desde el archivo o crea uno por defecto"""
if CONFIG_FILE.exists():
self.config.read(CONFIG_FILE, encoding='utf-8')
else:
self._crear_configuracion_default()
def _crear_configuracion_default(self):
"""Crea archivo de configuración por defecto"""
# Crear directorio si no existe
CONFIG_DIR.mkdir(parents=True, exist_ok=True)
# Cargar valores por defecto
for seccion, valores in DEFAULT_CONFIG.items():
self.config[seccion] = valores
# Guardar archivo
with open(CONFIG_FILE, 'w', encoding='utf-8') as f:
self.config.write(f)
print(f"📝 Configuración creada en: {CONFIG_FILE}")
def obtener_ip(self) -> str:
"""Obtiene la IP de la bocina"""
return self.config.get("ESP32", "ip", fallback="192.168.15.128")
def obtener_puerto(self) -> int:
"""Obtiene el puerto de la bocina"""
return self.config.getint("ESP32", "puerto", fallback=81)
def obtener_duracion(self) -> int:
"""Obtiene duración del audio en ms"""
return self.config.getint("Audio", "duracion_ms", fallback=2000)
def obtener_tono_base(self) -> int:
"""Obtiene frecuencia base del tono"""
return self.config.getint("Audio", "tono_base", fallback=440)
def obtener_amplitud(self) -> int:
"""Obtiene amplitud del audio"""
return self.config.getint("Audio", "amplitud", fallback=16000)
def obtener_timeout(self) -> int:
"""Obtiene timeout en segundos"""
return self.config.getint("General", "timeout", fallback=5)
def reconectar_auto(self) -> bool:
"""Obtiene si debe reconectar automáticamente"""
return self.config.getboolean("General", "reconectar", fallback=True)
def actualizar_ip(self, nueva_ip: str):
"""Actualiza la IP de la bocina"""
self.config.set("ESP32", "ip", nueva_ip)
with open(CONFIG_FILE, 'w', encoding='utf-8') as f:
self.config.write(f)
print(f"✅ IP actualizada a: {nueva_ip}")
def mostrar_configuracion(self):
"""Muestra la configuración actual"""
print("\n📡 Configuración actual:")
print(f" IP: {self.obtener_ip()}:{self.obtener_puerto()}")
print(f" Duración: {self.obtener_duracion()}ms")
print(f" Tono base: {self.obtener_tono_base()}Hz")
print(f" Timeout: {self.obtener_timeout()}s")
# Instancia global
config = ConfiguracionBocina()

View File

@ -1,60 +0,0 @@
# Solo importas lo que necesitas
from core.speaker_iot import saludar, detener, obtener_estado
# ===== EJEMPLO 1: Cuando detectas una persona =====
def mi_detector():
nombre = "Ana" # Tu IA obtiene el nombre
# Enviar saludo (¡una sola línea!)
saludar(nombre)
# También puedes verificar si funcionó
if saludar(nombre):
print(f"✅ Saludo enviado a {nombre}")
else:
print(f"❌ Error al enviar saludo a {nombre}")
# ===== EJEMPLO 2: Dentro de tu loop principal =====
while True:
persona = detectar_persona() # Tu función de detección
if persona:
nombre = obtener_nombre(persona) # Tu base de datos
saludar(nombre) # Envía el saludo
# ===== EJEMPLO 3: Clase completa =====
class MiSistemaIA:
def __init__(self):
self.bocina_ip = "192.168.15.128" # O usa la del config
def on_persona_detectada(self, persona):
nombre = self.obtener_nombre(persona)
if nombre:
print(f"🎉 Detectada: {nombre}")
saludar(nombre) # ¡Así de simple!
def obtener_nombre(self, persona):
# Tu lógica para obtener nombre
return persona.get("nombre", "Visitante")
# ===== Ejemplo completo de integración =====
class SistemaSeguridad:
def __init__(self):
self.personas_conocidas = ["Ana", "Carlos", "Maria"]
print("✅ Sistema iniciado - Bocina lista")
def detectar(self, nombre):
if nombre in self.personas_conocidas:
print(f"🔔 ¡Bienvenido {nombre}!")
saludar(nombre) # Envía saludo
return True
else:
print(f"⚠️ Persona no registrada: {nombre}")
return False
# Uso
sistema = SistemaSeguridad()
sistema.detectar("Ana") # Reproduce sonido
sistema.detectar("Luis") # No reproduce

View File

@ -1,409 +0,0 @@
"""
BOCINA INTELIGENTE - CLIENTE PYTHON
====================================
Cliente para enviar audio y comandos a la bocina ESP32 via WebSocket
Uso:
python bocina_client.py
(Luego ingresa la IP y el nombre cuando se solicite)
"""
import asyncio
import websockets
import json
import struct
import math
import sys
import time
import os
from typing import Optional
# ==================== LIMPIAR PANTALLA ====================
def limpiar_pantalla():
"""Limpia la consola según el sistema operativo"""
os.system('cls' if os.name == 'nt' else 'clear')
# ==================== CLASE BOCINA ====================
class BocinaInteligente:
"""Cliente para controlar la bocina inteligente ESP32"""
def __init__(self, ip: str, puerto: int = 81):
self.ip = ip
self.puerto = puerto
self.url = f"ws://{ip}:{puerto}"
self.websocket: Optional[websockets.WebSocketClientProtocol] = None
self.chunk_size = 1024
self.timeout = 5
self.conectado = False
async def conectar(self) -> bool:
"""Conectar al ESP32"""
try:
print(f"🔌 Conectando a {self.url}...")
self.websocket = await websockets.connect(self.url)
# Esperar mensaje de bienvenida
response = await asyncio.wait_for(self.websocket.recv(), timeout=self.timeout)
data = json.loads(response)
if data.get("status") == "ok":
print(f"{data.get('msg')}")
self.conectado = True
return True
except Exception as e:
print(f" ❌ Error: {e}")
return False
return False
async def desconectar(self):
"""Cerrar conexión"""
if self.websocket:
await self.websocket.close()
self.conectado = False
print("🔌 Conexión cerrada")
async def ping(self) -> bool:
"""Probar conexión con el ESP32"""
if not self.websocket:
return False
try:
await self.websocket.send(json.dumps({"cmd": "PING"}))
response = await asyncio.wait_for(self.websocket.recv(), timeout=self.timeout)
data = json.loads(response)
return data.get("status") == "ok"
except:
return False
async def obtener_estado(self) -> dict:
"""Obtener estadísticas del ESP32"""
if not self.websocket:
return {}
try:
await self.websocket.send(json.dumps({"cmd": "STATUS"}))
response = await asyncio.wait_for(self.websocket.recv(), timeout=self.timeout)
return json.loads(response)
except:
return {}
async def detener(self) -> bool:
"""Detener reproducción"""
if not self.websocket:
return False
try:
await self.websocket.send(json.dumps({"cmd": "STOP"}))
response = await asyncio.wait_for(self.websocket.recv(), timeout=self.timeout)
data = json.loads(response)
return data.get("status") == "ok"
except:
return False
async def enviar_audio(self, audio_data: bytes, nombre: str = "audio") -> bool:
"""
Enviar audio al ESP32
Args:
audio_data: Datos de audio en formato PCM (16kHz, 16bits, mono)
nombre: Nombre identificador (para logs)
"""
if not self.websocket:
print("❌ No hay conexión")
return False
total_chunks = (len(audio_data) + self.chunk_size - 1) // self.chunk_size
print(f"📤 Enviando {total_chunks} chunks ({len(audio_data)} bytes) para '{nombre}'")
inicio = time.time()
for i, chunk_start in enumerate(range(0, len(audio_data), self.chunk_size)):
chunk = audio_data[chunk_start:chunk_start + self.chunk_size]
await self.websocket.send(chunk)
# Mostrar progreso cada 10 chunks o al final
if (i + 1) % 10 == 0 or i == total_chunks - 1:
porcentaje = ((i + 1) * 100) // total_chunks
print(f" 📊 Progreso: {porcentaje}% ({i+1}/{total_chunks} chunks)")
# Pequeña pausa para no saturar
await asyncio.sleep(0.005)
elapsed = time.time() - inicio
print(f"✅ Audio enviado en {elapsed:.2f} segundos")
return True
# ==================== GENERADORES DE AUDIO ====================
def generar_tono(frecuencia: int = 440, duracion_ms: int = 2000,
sample_rate: int = 16000, amplitud: int = 16000) -> bytes:
"""
Generar un tono seno en formato PCM
Args:
frecuencia: Frecuencia del tono en Hz
duracion_ms: Duración en milisegundos
sample_rate: Frecuencia de muestreo
amplitud: Amplitud máxima (0-32767)
"""
num_muestras = int(sample_rate * duracion_ms / 1000)
audio = bytearray()
for i in range(num_muestras):
valor = int(amplitud * math.sin(2 * math.pi * frecuencia * i / sample_rate))
audio.extend(struct.pack('<h', valor)) # little-endian, 16 bits
return bytes(audio)
def generar_melodia_bienvenida() -> bytes:
"""Generar melodía de bienvenida (Do-Re-Mi-Fa-Sol)"""
sample_rate = 16000
duracion_nota = 500 # ms por nota
notas = [261, 293, 329, 349, 392, 440] # Do, Re, Mi, Fa, Sol, La
audio = bytearray()
for nota in notas:
nota_audio = generar_tono(nota, duracion_nota, sample_rate, amplitud=12000)
audio.extend(nota_audio)
return bytes(audio)
def generar_saludo_personalizado(nombre: str) -> bytes:
"""
Generar un saludo personalizado (versión simple)
En un caso real, aquí usarías un servicio TTS
"""
# Usar frecuencia diferente según la longitud del nombre
frecuencia_base = 440
frecuencia = frecuencia_base + (len(nombre) * 10)
# Limitar frecuencia máxima
if frecuencia > 800:
frecuencia = 800
return generar_tono(frecuencia, duracion_ms=2000)
# ==================== MENÚ PRINCIPAL ====================
def mostrar_menu():
"""Muestra el menú principal"""
print("\n" + "=" * 50)
print(" 🎵 BOCINA INTELIGENTE - CONTROL")
print("=" * 50)
print("\n📋 Opciones disponibles:")
print(" 1. 🔊 Enviar tono de prueba (440Hz)")
print(" 2. 🎵 Enviar melodía de bienvenida")
print(" 3. 💬 Enviar saludo personalizado")
print(" 4. 📊 Ver estado del ESP32")
print(" 5. 🏓 Probar ping")
print(" 6. 🔇 Detener reproducción")
print(" 7. 🔄 Reconectar")
print(" 8. 🚪 Salir")
print("-" * 50)
# ==================== MODO INTERACTIVO ====================
async def modo_interactivo():
"""Modo interactivo con entrada de IP y nombre"""
# Limpiar pantalla
limpiar_pantalla()
print("\n" + "=" * 50)
print(" 🎵 BOCINA INTELIGENTE")
print("=" * 50)
# Solicitar IP
print("\n📡 Configuración de conexión:")
ip_default = "192.168.15.128"
ip = input(f" IP del ESP32 [{ip_default}]: ").strip()
if not ip:
ip = ip_default
# Solicitar nombre por defecto para saludos
nombre_default = "Visitante"
nombre = input(f" Nombre por defecto [{nombre_default}]: ").strip()
if not nombre:
nombre = nombre_default
# Crear instancia
bocina = BocinaInteligente(ip)
# Conectar
print("\n🔄 Conectando...")
if not await bocina.conectar():
print("\n❌ No se pudo conectar al ESP32")
print(" Verifica que:")
print(f" 1. La IP {ip} sea correcta")
print(" 2. El ESP32 esté encendido")
print(" 3. Estés en la misma red WiFi")
input("\n Presiona Enter para salir...")
return
print("\n✅ ¡Conectado exitosamente!")
print(f" 📡 IP: {ip}")
print(f" 👤 Nombre: {nombre}")
# Bucle principal
while True:
mostrar_menu()
opcion = input("\n👉 Selecciona una opción (1-8): ").strip()
if opcion == "1":
print("\n🔊 Enviando tono de prueba (440Hz)...")
audio = generar_tono(440, 2000)
await bocina.enviar_audio(audio, "Tono 440Hz")
elif opcion == "2":
print("\n🎵 Enviando melodía de bienvenida...")
audio = generar_melodia_bienvenida()
await bocina.enviar_audio(audio, "Melodía")
elif opcion == "3":
# Pedir nombre específico para este saludo
nombre_saludo = input(f" 👤 Nombre (Enter para usar '{nombre}'): ").strip()
if not nombre_saludo:
nombre_saludo = nombre
print(f"\n🔊 Generando saludo para '{nombre_saludo}'...")
audio = generar_saludo_personalizado(nombre_saludo)
await bocina.enviar_audio(audio, nombre_saludo)
elif opcion == "4":
print("\n📊 Obteniendo estado del ESP32...")
estado = await bocina.obtener_estado()
if estado:
print("\n 📡 Estado del sistema:")
print(f" Status: {estado.get('status', 'desconocido')}")
print(f" 📦 Bytes recibidos: {estado.get('bytes_recibidos', 0)}")
print(f" 🔢 Chunks recibidos: {estado.get('chunks_recibidos', 0)}")
print(f" 🎵 Audio activo: {'✅ Sí' if estado.get('audio_activo') else '❌ No'}")
print(f" 📶 WiFi RSSI: {estado.get('wifi_rssi', 0)} dBm")
else:
print(" ❌ No se pudo obtener estado")
elif opcion == "5":
print("\n🏓 Probando ping...")
inicio = time.time()
if await bocina.ping():
latencia = (time.time() - inicio) * 1000
print(f" ✅ PONG recibido (latencia: {latencia:.0f}ms)")
else:
print(" ❌ Sin respuesta - verifica la conexión")
elif opcion == "6":
print("\n🔇 Deteniendo reproducción...")
if await bocina.detener():
print(" ✅ Reproducción detenida")
else:
print(" ⚠️ No se pudo detener o ya estaba detenido")
elif opcion == "7":
print("\n🔄 Reconectando...")
await bocina.desconectar()
await asyncio.sleep(1)
if await bocina.conectar():
print(" ✅ Reconectado exitosamente")
else:
print(" ❌ Error al reconectar")
elif opcion == "8":
print("\n👋 Saliendo...")
break
else:
print("❌ Opción inválida")
# Pequeña pausa antes de volver al menú
await asyncio.sleep(0.5)
# Cerrar conexión
await bocina.desconectar()
print("\n✅ Programa finalizado")
# ==================== MODO RÁPIDO ====================
async def modo_rapido():
"""Modo rápido: pide IP y nombre y envía saludo inmediato"""
limpiar_pantalla()
print("\n" + "=" * 50)
print(" 🎵 BOCINA INTELIGENTE - MODO RÁPIDO")
print("=" * 50)
# Solicitar IP
print("\n📡 Configuración:")
ip_default = "192.168.15.128"
ip = input(f" IP del ESP32 [{ip_default}]: ").strip()
if not ip:
ip = ip_default
# Solicitar nombre
nombre = input(" 👤 Nombre de la persona: ").strip()
if not nombre:
nombre = "Visitante"
# Conectar y enviar
bocina = BocinaInteligente(ip)
print("\n🔄 Conectando...")
if not await bocina.conectar():
print("❌ No se pudo conectar")
input("\nPresiona Enter para salir...")
return
print(f"\n🔊 Enviando saludo para '{nombre}'...")
audio = generar_saludo_personalizado(nombre)
await bocina.enviar_audio(audio, nombre)
# Mostrar estado
await asyncio.sleep(1)
estado = await bocina.obtener_estado()
if estado:
print(f"\n📊 Enviados: {estado.get('bytes_recibidos', 0)} bytes")
await bocina.desconectar()
print("\n✅ Saludo enviado!")
input("\nPresiona Enter para salir...")
# ==================== MAIN ====================
async def main():
"""Función principal"""
limpiar_pantalla()
print("\n" + "=" * 50)
print(" 🎵 BOCINA INTELIGENTE v1.0")
print("=" * 50)
print("\nSelecciona modo de operación:")
print(" 1. 🎮 Modo interactivo (menú completo)")
print(" 2. ⚡ Modo rápido (solo enviar saludo)")
print(" 3. 🚪 Salir")
modo = input("\n👉 Opción (1-3): ").strip()
if modo == "1":
await modo_interactivo()
elif modo == "2":
await modo_rapido()
else:
print("\n👋 Hasta luego!")
return
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\n\n👋 Programa interrumpido")
except Exception as e:
print(f"\n❌ Error: {e}")
input("\nPresiona Enter para salir...")

Binary file not shown.

After

Width:  |  Height:  |  Size: 4.7 KiB

BIN
db_institucion/Ian Axel.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.0 KiB

View File

@ -224,7 +224,7 @@ def main():
threading.Thread(target=worker_rostros, args=(global_mem,), daemon=True).start()
cv2.namedWindow("SmartSoft", cv2.WINDOW_NORMAL)
cv2.namedWindow("SmartSoft", cv2.WINDOW_AUTOSIZE)
idx = 0
while True:

View File

@ -10,12 +10,6 @@ from queue import Queue
from deepface import DeepFace
from ultralytics import YOLO
import warnings
import torch
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Usando dispositivo: {device}")
import torch
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Usando dispositivo: {device}")
warnings.filterwarnings("ignore")
@ -301,7 +295,7 @@ def dibujar_track_fusion(frame_show, trk, global_mem):
def main():
print("\nIniciando Sistema")
model = YOLO("yolov8n.pt").to("cuda")
model = YOLO("yolov8n.pt")
global_mem = GlobalMemory()
managers = {str(c): CamManager(c, global_mem) for c in SECUENCIA}
cams = [CamStream(u) for u in URLS]
@ -309,7 +303,7 @@ def main():
for _ in range(2):
threading.Thread(target=worker_rostros, args=(global_mem,), daemon=True).start()
cv2.namedWindow("SmartSoft", cv2.WINDOW_NORMAL)
cv2.namedWindow("SmartSoft", cv2.WINDOW_AUTOSIZE)
idx = 0
while True:

View File

@ -1,110 +1,110 @@
import cv2
import os
import json
import numpy as np
# ⚡ Importamos tus motores exactos para no romper la simetría
from reconocimiento2 import detectar_rostros_yunet, gestionar_vectores
def registrar_desde_webcam():
print("\n" + "="*50)
print("📸 MÓDULO DE REGISTRO LIMPIO (WEBCAM LOCAL)")
print("Alinea tu rostro, mira a la cámara con buena luz.")
print("Presiona [R] para capturar | [Q] para salir")
print("="*50 + "\n")
DB_PATH = "db_institucion"
CACHE_PATH = "cache_nombres"
os.makedirs(DB_PATH, exist_ok=True)
os.makedirs(CACHE_PATH, exist_ok=True)
# 0 es la cámara por defecto de tu laptop
cap = cv2.VideoCapture(0)
if not cap.isOpened():
print("[!] Error: No se pudo abrir la webcam local.")
return
while True:
ret, frame = cap.read()
if not ret: continue
# Espejamos la imagen para que actúe como un espejo natural
frame = cv2.flip(frame, 1)
display_frame = frame.copy()
# Usamos YuNet para garantizar que estamos capturando una cara válida
faces = detectar_rostros_yunet(frame)
mejor_rostro = None
max_area = 0
for (fx, fy, fw, fh, score) in faces:
area = fw * fh
cv2.rectangle(display_frame, (fx, fy), (fx+fw, fy+fh), (0, 255, 0), 2)
if area > max_area:
max_area = area
h_frame, w_frame = frame.shape[:2]
# Mismo margen del 30% que requiere MTCNN para alinear correctamente
m_x, m_y = int(fw * 0.30), int(fh * 0.30)
y1 = max(0, fy - m_y)
y2 = min(h_frame, fy + fh + m_y)
x1 = max(0, fx - m_x)
x2 = min(w_frame, fx + fw + m_x)
mejor_rostro = frame[y1:y2, x1:x2]
cv2.putText(display_frame, "Alineate y presiona [R]", (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
cv2.imshow("Registro Webcam Local", display_frame)
key = cv2.waitKey(1) & 0xFF
if key == ord('q'):
break
elif key == ord('r'):
if mejor_rostro is not None and mejor_rostro.size > 0:
cv2.imshow("Captura Congelada", mejor_rostro)
cv2.waitKey(1)
print("\n--- NUEVO REGISTRO ---")
nom = input("Escribe el nombre exacto de la persona: ").strip()
if nom:
gen_input = input("¿Es Hombre (h) o Mujer (m)?: ").strip().lower()
genero_guardado = "Woman" if gen_input == 'm' else "Man"
# 1. Guardamos la foto pura
foto_path = os.path.join(DB_PATH, f"{nom}.jpg")
cv2.imwrite(foto_path, mejor_rostro)
# 2. Actualizamos el caché de géneros sin usar IA
ruta_generos = os.path.join(CACHE_PATH, "generos.json")
dic_generos = {}
if os.path.exists(ruta_generos):
try:
with open(ruta_generos, 'r') as f:
dic_generos = json.load(f)
except Exception: pass
dic_generos[nom] = genero_guardado
with open(ruta_generos, 'w') as f:
json.dump(dic_generos, f)
print(f"\n[OK] Foto guardada. Generando punto de gravedad matemático...")
# 3. Forzamos la creación del vector en la base de datos
gestionar_vectores(actualizar=True)
print(" Registro inyectado exitosamente en el sistema principal.")
else:
print("[!] Registro cancelado por nombre vacío.")
cv2.destroyWindow("Captura Congelada")
else:
print("[!] No se detectó ningún rostro claro. Acércate más a la luz.")
cap.release()
cv2.destroyAllWindows()
if __name__ == "__main__":
import cv2
import os
import json
import numpy as np
# ⚡ Importamos tus motores exactos para no romper la simetría
from reconocimiento2 import detectar_rostros_yunet, gestionar_vectores
def registrar_desde_webcam():
print("\n" + "="*50)
print("📸 MÓDULO DE REGISTRO LIMPIO (WEBCAM LOCAL)")
print("Alinea tu rostro, mira a la cámara con buena luz.")
print("Presiona [R] para capturar | [Q] para salir")
print("="*50 + "\n")
DB_PATH = "db_institucion"
CACHE_PATH = "cache_nombres"
os.makedirs(DB_PATH, exist_ok=True)
os.makedirs(CACHE_PATH, exist_ok=True)
# 0 es la cámara por defecto de tu laptop
cap = cv2.VideoCapture(0)
if not cap.isOpened():
print("[!] Error: No se pudo abrir la webcam local.")
return
while True:
ret, frame = cap.read()
if not ret: continue
# Espejamos la imagen para que actúe como un espejo natural
frame = cv2.flip(frame, 1)
display_frame = frame.copy()
# Usamos YuNet para garantizar que estamos capturando una cara válida
faces = detectar_rostros_yunet(frame)
mejor_rostro = None
max_area = 0
for (fx, fy, fw, fh, score) in faces:
area = fw * fh
cv2.rectangle(display_frame, (fx, fy), (fx+fw, fy+fh), (0, 255, 0), 2)
if area > max_area:
max_area = area
h_frame, w_frame = frame.shape[:2]
# Mismo margen del 30% que requiere MTCNN para alinear correctamente
m_x, m_y = int(fw * 0.30), int(fh * 0.30)
y1 = max(0, fy - m_y)
y2 = min(h_frame, fy + fh + m_y)
x1 = max(0, fx - m_x)
x2 = min(w_frame, fx + fw + m_x)
mejor_rostro = frame[y1:y2, x1:x2]
cv2.putText(display_frame, "Alineate y presiona [R]", (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
cv2.imshow("Registro Webcam Local", display_frame)
key = cv2.waitKey(1) & 0xFF
if key == ord('q'):
break
elif key == ord('r'):
if mejor_rostro is not None and mejor_rostro.size > 0:
cv2.imshow("Captura Congelada", mejor_rostro)
cv2.waitKey(1)
print("\n--- NUEVO REGISTRO ---")
nom = input("Escribe el nombre exacto de la persona: ").strip()
if nom:
gen_input = input("¿Es Hombre (h) o Mujer (m)?: ").strip().lower()
genero_guardado = "Woman" if gen_input == 'm' else "Man"
# 1. Guardamos la foto pura
foto_path = os.path.join(DB_PATH, f"{nom}.jpg")
cv2.imwrite(foto_path, mejor_rostro)
# 2. Actualizamos el caché de géneros sin usar IA
ruta_generos = os.path.join(CACHE_PATH, "generos.json")
dic_generos = {}
if os.path.exists(ruta_generos):
try:
with open(ruta_generos, 'r') as f:
dic_generos = json.load(f)
except Exception: pass
dic_generos[nom] = genero_guardado
with open(ruta_generos, 'w') as f:
json.dump(dic_generos, f)
print(f"\n[OK] Foto guardada. Generando punto de gravedad matemático...")
# 3. Forzamos la creación del vector en la base de datos
gestionar_vectores(actualizar=True)
print(" Registro inyectado exitosamente en el sistema principal.")
else:
print("[!] Registro cancelado por nombre vacío.")
cv2.destroyWindow("Captura Congelada")
else:
print("[!] No se detectó ningún rostro claro. Acércate más a la luz.")
cap.release()
cv2.destroyAllWindows()
if __name__ == "__main__":
registrar_desde_webcam()

View File

@ -1,43 +1,43 @@
import cv2
import time
from ultralytics import YOLO
from seguimiento2 import GlobalMemory, CamManager, dibujar_track
def test_video(video_path):
print(f"Iniciando Benchmark de Video: {video_path}")
model = YOLO("yolov8n.pt")
global_mem = GlobalMemory()
manager = CamManager("TEST_CAM", global_mem)
cap = cv2.VideoCapture(video_path)
cv2.namedWindow("Benchmark TT", cv2.WINDOW_AUTOSIZE)
while cap.isOpened():
ret, frame = cap.read()
if not ret: break
now = time.time()
frame_show = cv2.resize(frame, (480, 270))
# Inferencia frame por frame sin hilos (sincrónico)
res = model.predict(frame_show, conf=0.40, iou=0.50, classes=[0], verbose=False, imgsz=480, device='cpu')
boxes = res[0].boxes.xyxy.cpu().numpy().tolist() if res[0].boxes else []
tracks = manager.update(boxes, frame_show, now, turno_activo=True)
for trk in tracks:
if trk.time_since_update == 0:
dibujar_track(frame_show, trk)
cv2.imshow("Benchmark TT", frame_show)
# Si presionas espacio se pausa, con 'q' sales
key = cv2.waitKey(30) & 0xFF
if key == ord('q'): break
elif key == ord(' '): cv2.waitKey(-1)
cap.release()
cv2.destroyAllWindows()
if __name__ == "__main__":
import cv2
import time
from ultralytics import YOLO
from seguimiento2 import GlobalMemory, CamManager, dibujar_track
def test_video(video_path):
print(f"Iniciando Benchmark de Video: {video_path}")
model = YOLO("yolov8n.pt")
global_mem = GlobalMemory()
manager = CamManager("TEST_CAM", global_mem)
cap = cv2.VideoCapture(video_path)
cv2.namedWindow("Benchmark TT", cv2.WINDOW_AUTOSIZE)
while cap.isOpened():
ret, frame = cap.read()
if not ret: break
now = time.time()
frame_show = cv2.resize(frame, (480, 270))
# Inferencia frame por frame sin hilos (sincrónico)
res = model.predict(frame_show, conf=0.40, iou=0.50, classes=[0], verbose=False, imgsz=480, device='cpu')
boxes = res[0].boxes.xyxy.cpu().numpy().tolist() if res[0].boxes else []
tracks = manager.update(boxes, frame_show, now, turno_activo=True)
for trk in tracks:
if trk.time_since_update == 0:
dibujar_track(frame_show, trk)
cv2.imshow("Benchmark TT", frame_show)
# Si presionas espacio se pausa, con 'q' sales
key = cv2.waitKey(30) & 0xFF
if key == ord('q'): break
elif key == ord(' '): cv2.waitKey(-1)
cap.release()
cv2.destroyAllWindows()
if __name__ == "__main__":
test_video("video.mp4") # Pon aquí el nombre de tu video

View File

@ -1,481 +1,465 @@
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
import cv2
import numpy as np
from deepface import DeepFace
import pickle
import time
import threading
import asyncio
import edge_tts
import subprocess
from datetime import datetime
import warnings
import urllib.request
import torch
if torch.cuda.is_available():
device = "cuda"
print("GPU detectada → usando GPU 🚀")
else:
device = "cpu"
print("GPU no disponible → usando CPU ⚠️")
import torch
if torch.cuda.is_available():
device = "cuda"
print("GPU detectada → usando GPU 🚀")
else:
device = "cpu"
print("GPU no disponible → usando CPU ⚠️")
warnings.filterwarnings("ignore")
# ──────────────────────────────────────────────────────────────────────────────
# CONFIGURACIÓN
# ──────────────────────────────────────────────────────────────────────────────
DB_PATH = "db_institucion"
CACHE_PATH = "cache_nombres"
VECTORS_FILE = "base_datos_rostros.pkl"
TIMESTAMPS_FILE = "representaciones_timestamps.pkl"
UMBRAL_SIM = 0.42 # Por encima → identificado. Por debajo → desconocido.
COOLDOWN_TIME = 15 # Segundos entre saludos
USUARIO, PASSWORD, IP_DVR = "admin", "TCA200503", "192.168.1.244"
RTSP_URL = f"rtsp://{USUARIO}:{PASSWORD}@{IP_DVR}:554/Streaming/Channels/702"
for path in [DB_PATH, CACHE_PATH]:
os.makedirs(path, exist_ok=True)
# ──────────────────────────────────────────────────────────────────────────────
# YUNET — Detector facial rápido en CPU
# ──────────────────────────────────────────────────────────────────────────────
YUNET_MODEL_PATH = "face_detection_yunet_2023mar.onnx"
if not os.path.exists(YUNET_MODEL_PATH):
print(f"Descargando YuNet ({YUNET_MODEL_PATH})...")
url = ("https://github.com/opencv/opencv_zoo/raw/main/models/"
"face_detection_yunet/face_detection_yunet_2023mar.onnx")
urllib.request.urlretrieve(url, YUNET_MODEL_PATH)
print("YuNet descargado.")
# Detector estricto para ROIs grandes (persona cerca)
detector_yunet = cv2.FaceDetectorYN.create(
model=YUNET_MODEL_PATH, config="",
input_size=(320, 320),
score_threshold=0.70,
nms_threshold=0.3,
top_k=5000
)
# Detector permisivo para ROIs pequeños (persona lejos)
detector_yunet_lejano = cv2.FaceDetectorYN.create(
model=YUNET_MODEL_PATH, config="",
input_size=(320, 320),
score_threshold=0.45,
nms_threshold=0.3,
top_k=5000
)
def detectar_rostros_yunet(roi, lock=None):
"""
Elige automáticamente el detector según el tamaño del ROI.
"""
h_roi, w_roi = roi.shape[:2]
area = w_roi * h_roi
det = detector_yunet if area > 8000 else detector_yunet_lejano
try:
if lock:
with lock:
det.setInputSize((w_roi, h_roi))
_, faces = det.detect(roi)
else:
det.setInputSize((w_roi, h_roi))
_, faces = det.detect(roi)
except Exception:
return []
if faces is None:
return []
resultado = []
for face in faces:
try:
fx, fy, fw, fh = map(int, face[:4])
score = float(face[14]) if len(face) > 14 else 1.0
resultado.append((fx, fy, fw, fh, score))
except (ValueError, OverflowError, TypeError):
continue
return resultado
# ──────────────────────────────────────────────────────────────────────────────
# SISTEMA DE AUDIO
# ──────────────────────────────────────────────────────────────────────────────
def obtener_audios_humanos(genero):
hora = datetime.now().hour
es_mujer = genero.lower() == 'woman'
suffix = "_m.mp3" if es_mujer else "_h.mp3"
if 5 <= hora < 12:
intro = "dias.mp3"
elif 12 <= hora < 19:
intro = "tarde.mp3"
else:
intro = "noches.mp3"
cierre = ("fin_noche" if (hora >= 19 or hora < 5) else "fin_dia") + suffix
return intro, cierre
async def sintetizar_nombre(nombre, ruta):
nombre_limpio = nombre.replace('_', ' ')
try:
comunicador = edge_tts.Communicate(nombre_limpio, "es-MX-DaliaNeural", rate="+10%")
await comunicador.save(ruta)
except Exception:
pass
def reproducir(archivo):
if os.path.exists(archivo):
subprocess.Popen(
["mpv", "--no-video", "--volume=100", archivo],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
def hilo_bienvenida(nombre, genero):
archivo_nombre = os.path.join(CACHE_PATH, f"nombre_{nombre}.mp3")
if not os.path.exists(archivo_nombre):
try:
asyncio.run(sintetizar_nombre(nombre, archivo_nombre))
except Exception:
pass
intro, cierre = obtener_audios_humanos(genero)
archivos = [f for f in [intro, archivo_nombre, cierre] if os.path.exists(f)]
if archivos:
subprocess.Popen(
["mpv", "--no-video", "--volume=100"] + archivos,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
# ──────────────────────────────────────────────────────────────────────────────
# GESTIÓN DE BASE DE DATOS (AHORA CON RETINAFACE Y ALINEACIÓN)
# ──────────────────────────────────────────────────────────────────────────────
def gestionar_vectores(actualizar=False):
import json # ⚡ Asegúrate de tener importado json
vectores_actuales = {}
if os.path.exists(VECTORS_FILE):
try:
with open(VECTORS_FILE, 'rb') as f:
vectores_actuales = pickle.load(f)
except Exception:
vectores_actuales = {}
if not actualizar:
return vectores_actuales
timestamps = {}
if os.path.exists(TIMESTAMPS_FILE):
try:
with open(TIMESTAMPS_FILE, 'rb') as f:
timestamps = pickle.load(f)
except Exception:
timestamps = {}
# ──────────────────────────────────────────────────────────
# CARGA DEL CACHÉ DE GÉNEROS
# ──────────────────────────────────────────────────────────
ruta_generos = os.path.join(CACHE_PATH, "generos.json")
dic_generos = {}
if os.path.exists(ruta_generos):
try:
with open(ruta_generos, 'r') as f:
dic_generos = json.load(f)
except Exception:
pass
print("\nACTUALIZANDO BASE DE DATOS (Alineación y Caché de Géneros)...")
imagenes = [f for f in os.listdir(DB_PATH) if f.lower().endswith(('.jpg', '.png'))]
nombres_en_disco = set()
hubo_cambios = False
cambio_generos = False # Bandera para saber si actualizamos el JSON
for archivo in imagenes:
nombre_archivo = os.path.splitext(archivo)[0]
ruta_img = os.path.join(DB_PATH, archivo)
nombres_en_disco.add(nombre_archivo)
ts_actual = os.path.getmtime(ruta_img)
ts_guardado = timestamps.get(nombre_archivo, 0)
# Si ya tenemos el vector pero NO tenemos su género en el JSON, forzamos el procesamiento
falta_genero = nombre_archivo not in dic_generos
if nombre_archivo in vectores_actuales and ts_actual == ts_guardado and not falta_genero:
continue
try:
img_db = cv2.imread(ruta_img)
lab = cv2.cvtColor(img_db, cv2.COLOR_BGR2LAB)
l, a, b = cv2.split(lab)
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
l = clahe.apply(l)
img_mejorada = cv2.cvtColor(cv2.merge((l, a, b)), cv2.COLOR_LAB2BGR)
# IA DE GÉNERO (Solo se ejecuta 1 vez por persona en toda la vida del sistema)
if falta_genero:
try:
analisis = DeepFace.analyze(img_mejorada, actions=['gender'], enforce_detection=False)[0]
dic_generos[nombre_archivo] = analisis.get('dominant_gender', 'Man')
except Exception:
dic_generos[nombre_archivo] = "Man" # Respaldo
cambio_generos = True
# Extraemos el vector
res = DeepFace.represent(
img_path=img_mejorada,
model_name="ArcFace",
detector_backend="opencv",
align=False,
enforce_detection=True
)
emb = np.array(res[0]["embedding"], dtype=np.float32)
norma = np.linalg.norm(emb)
if norma > 0:
emb = emb / norma
vectores_actuales[nombre_archivo] = emb
timestamps[nombre_archivo] = ts_actual
hubo_cambios = True
print(f" Procesado y alineado: {nombre_archivo} | Género: {dic_generos.get(nombre_archivo)}")
except Exception as e:
print(f" Rostro no válido en '{archivo}', omitido. Error: {e}")
# Limpieza de eliminados
for nombre in list(vectores_actuales.keys()):
if nombre not in nombres_en_disco:
del vectores_actuales[nombre]
timestamps.pop(nombre, None)
if nombre in dic_generos:
del dic_generos[nombre]
cambio_generos = True
hubo_cambios = True
print(f" Eliminado (sin foto): {nombre}")
# Guardado de la memoria
if hubo_cambios:
with open(VECTORS_FILE, 'wb') as f:
pickle.dump(vectores_actuales, f)
with open(TIMESTAMPS_FILE, 'wb') as f:
pickle.dump(timestamps, f)
# Guardado del JSON de géneros si hubo descubrimientos nuevos
if cambio_generos:
with open(ruta_generos, 'w') as f:
json.dump(dic_generos, f)
if hubo_cambios or cambio_generos:
print(" Sincronización terminada.\n")
else:
print(" Sin cambios. Base de datos al día.\n")
return vectores_actuales
# ──────────────────────────────────────────────────────────────────────────────
# BÚSQUEDA BLINDADA (Similitud Coseno estricta)
# ──────────────────────────────────────────────────────────────────────────────
def buscar_mejor_match(emb_consulta, base_datos):
# ⚡ MAGIA 3: Normalización L2 del vector entrante
norma = np.linalg.norm(emb_consulta)
if norma > 0:
emb_consulta = emb_consulta / norma
mejor_match, max_sim = None, -1.0
for nombre, vec in base_datos.items():
# Como ambos están normalizados, esto es Similitud Coseno pura (-1.0 a 1.0)
sim = float(np.dot(emb_consulta, vec))
if sim > max_sim:
max_sim = sim
mejor_match = nombre
return mejor_match, max_sim
# ──────────────────────────────────────────────────────────────────────────────
# LOOP DE PRUEBA Y REGISTRO (CON SIMETRÍA ESTRICTA)
# ──────────────────────────────────────────────────────────────────────────────
def sistema_interactivo():
base_datos = gestionar_vectores(actualizar=False)
cap = cv2.VideoCapture(RTSP_URL)
ultimo_saludo = 0
persona_actual = None
confirmaciones = 0
print("\n" + "=" * 50)
print(" MÓDULO DE REGISTRO Y DEPURACIÓN ESTRICTO")
print(" [R] Registrar nuevo rostro | [Q] Salir")
print("=" * 50 + "\n")
faces_ultimo_frame = []
while True:
ret, frame = cap.read()
if not ret:
time.sleep(2)
cap.open(RTSP_URL)
continue
h, w = frame.shape[:2]
display_frame = frame.copy()
tiempo_actual = time.time()
faces_raw = detectar_rostros_yunet(frame)
faces_ultimo_frame = faces_raw
for (fx, fy, fw, fh, score_yunet) in faces_raw:
fx = max(0, fx); fy = max(0, fy)
fw = min(w - fx, fw); fh = min(h - fy, fh)
if fw <= 0 or fh <= 0:
continue
cv2.rectangle(display_frame, (fx, fy), (fx+fw, fy+fh), (255, 200, 0), 2)
cv2.putText(display_frame, f"YN:{score_yunet:.2f}",
(fx, fy - 25), cv2.FONT_HERSHEY_SIMPLEX, 0.45, (255, 200, 0), 1)
if (tiempo_actual - ultimo_saludo) <= COOLDOWN_TIME:
continue
m = int(fw * 0.15)
roi = frame[max(0, fy-m): min(h, fy+fh+m),
max(0, fx-m): min(w, fx+fw+m)]
# 🛡️ FILTRO DE TAMAÑO FÍSICO
if roi.size == 0 or roi.shape[0] < 40 or roi.shape[1] < 40:
cv2.putText(display_frame, "muy pequeno",
(fx, fy-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 100, 255), 1)
continue
# 🛡️ FILTRO DE NITIDEZ
gray_roi = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
nitidez = cv2.Laplacian(gray_roi, cv2.CV_64F).var()
if nitidez < 50.0:
cv2.putText(display_frame, f"blur({nitidez:.0f})",
(fx, fy-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 165, 255), 1)
continue
# 🌙 SIMETRÍA 1: VISIÓN NOCTURNA (CLAHE) AL VIDEO EN VIVO
try:
lab = cv2.cvtColor(roi, cv2.COLOR_BGR2LAB)
l, a, b = cv2.split(lab)
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
l = clahe.apply(l)
roi_mejorado = cv2.cvtColor(cv2.merge((l, a, b)), cv2.COLOR_LAB2BGR)
except Exception:
roi_mejorado = roi # Respaldo de seguridad
# 🧠 SIMETRÍA 2: MOTOR MTCNN Y ALINEACIÓN (Igual que la Base de Datos)
try:
res = DeepFace.represent(
img_path=roi_mejorado,
model_name="ArcFace",
detector_backend="mtcnn", # El mismo que en gestionar_vectores
align=True, # Enderezamos la cara
enforce_detection=True # Si MTCNN no ve cara clara, aborta
)
emb = np.array(res[0]["embedding"], dtype=np.float32)
mejor_match, max_sim = buscar_mejor_match(emb, base_datos)
except Exception:
# MTCNN abortó porque la cara estaba de perfil, tapada o no era una cara
cv2.putText(display_frame, "MTCNN Ignorado",
(fx, fy-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)
continue
estado = " IDENTIFICADO" if max_sim > UMBRAL_SIM else "DESCONOCIDO"
nombre_d = mejor_match.split('_')[0] if mejor_match else "nadie"
n_bloques = int(max_sim * 20)
barra = "" * n_bloques + "" * (20 - n_bloques)
print(f"[REGISTRO] {estado} | {nombre_d:<14} | {barra} | "
f"{max_sim*100:.1f}% (umbral: {UMBRAL_SIM*100:.0f}%)")
if max_sim > UMBRAL_SIM and mejor_match:
color = (0, 255, 0)
texto = f"{mejor_match.split('_')[0]} ({max_sim:.2f})"
if mejor_match == persona_actual:
confirmaciones += 1
else:
persona_actual, confirmaciones = mejor_match, 1
if confirmaciones >= 1:
cv2.rectangle(display_frame, (fx, fy), (fx+fw, fy+fh), (0, 255, 0), 3)
try:
analisis = DeepFace.analyze(
roi_mejorado, actions=['gender'], enforce_detection=False
)[0]
genero = analisis['dominant_gender']
except Exception:
genero = "Man"
threading.Thread(
target=hilo_bienvenida,
args=(mejor_match, genero),
daemon=True
).start()
ultimo_saludo = tiempo_actual
confirmaciones = 0
else:
color = (0, 0, 255)
texto = f"? ({max_sim:.2f})"
confirmaciones = max(0, confirmaciones - 1)
cv2.putText(display_frame, texto,
(fx, fy - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
cv2.imshow("Módulo de Registro", display_frame)
key = cv2.waitKey(1) & 0xFF
if key == ord('q'):
break
elif key == ord('r'):
if faces_ultimo_frame:
areas = [fw * fh for (fx, fy, fw, fh, _) in faces_ultimo_frame]
fx, fy, fw, fh, _ = faces_ultimo_frame[np.argmax(areas)]
m_x = int(fw * 0.30)
m_y = int(fh * 0.30)
face_roi = frame[max(0, fy-m_y): min(h, fy+fh+m_y),
max(0, fx-m_x): min(w, fx+fw+m_x)]
if face_roi.size > 0:
nom = input("\nNombre de la persona: ").strip()
if nom:
foto_path = os.path.join(DB_PATH, f"{nom}.jpg")
cv2.imwrite(foto_path, face_roi)
print(f"[OK] Rostro de '{nom}' guardado. Sincronizando...")
base_datos = gestionar_vectores(actualizar=True)
else:
print("[!] Registro cancelado.")
else:
print("[!] Recorte vacío. Intenta de nuevo.")
else:
print("\n[!] No se detectó rostro. Acércate más o mira a la lente.")
cap.release()
cv2.destroyAllWindows()
if __name__ == "__main__":
sistema_interactivo()
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
import cv2
import numpy as np
from deepface import DeepFace
import pickle
import time
import threading
import asyncio
import edge_tts
import subprocess
from datetime import datetime
import warnings
import urllib.request
warnings.filterwarnings("ignore")
# ──────────────────────────────────────────────────────────────────────────────
# CONFIGURACIÓN
# ──────────────────────────────────────────────────────────────────────────────
DB_PATH = "db_institucion"
CACHE_PATH = "cache_nombres"
VECTORS_FILE = "base_datos_rostros.pkl"
TIMESTAMPS_FILE = "representaciones_timestamps.pkl"
UMBRAL_SIM = 0.42 # Por encima → identificado. Por debajo → desconocido.
COOLDOWN_TIME = 15 # Segundos entre saludos
USUARIO, PASSWORD, IP_DVR = "admin", "TCA200503", "192.168.1.244"
RTSP_URL = f"rtsp://{USUARIO}:{PASSWORD}@{IP_DVR}:554/Streaming/Channels/702"
for path in [DB_PATH, CACHE_PATH]:
os.makedirs(path, exist_ok=True)
# ──────────────────────────────────────────────────────────────────────────────
# YUNET — Detector facial rápido en CPU
# ──────────────────────────────────────────────────────────────────────────────
YUNET_MODEL_PATH = "face_detection_yunet_2023mar.onnx"
if not os.path.exists(YUNET_MODEL_PATH):
print(f"Descargando YuNet ({YUNET_MODEL_PATH})...")
url = ("https://github.com/opencv/opencv_zoo/raw/main/models/"
"face_detection_yunet/face_detection_yunet_2023mar.onnx")
urllib.request.urlretrieve(url, YUNET_MODEL_PATH)
print("YuNet descargado.")
# Detector estricto para ROIs grandes (persona cerca)
detector_yunet = cv2.FaceDetectorYN.create(
model=YUNET_MODEL_PATH, config="",
input_size=(320, 320),
score_threshold=0.70,
nms_threshold=0.3,
top_k=5000
)
# Detector permisivo para ROIs pequeños (persona lejos)
detector_yunet_lejano = cv2.FaceDetectorYN.create(
model=YUNET_MODEL_PATH, config="",
input_size=(320, 320),
score_threshold=0.45,
nms_threshold=0.3,
top_k=5000
)
def detectar_rostros_yunet(roi, lock=None):
"""
Elige automáticamente el detector según el tamaño del ROI.
"""
h_roi, w_roi = roi.shape[:2]
area = w_roi * h_roi
det = detector_yunet if area > 8000 else detector_yunet_lejano
try:
if lock:
with lock:
det.setInputSize((w_roi, h_roi))
_, faces = det.detect(roi)
else:
det.setInputSize((w_roi, h_roi))
_, faces = det.detect(roi)
except Exception:
return []
if faces is None:
return []
resultado = []
for face in faces:
try:
fx, fy, fw, fh = map(int, face[:4])
score = float(face[14]) if len(face) > 14 else 1.0
resultado.append((fx, fy, fw, fh, score))
except (ValueError, OverflowError, TypeError):
continue
return resultado
# ──────────────────────────────────────────────────────────────────────────────
# SISTEMA DE AUDIO
# ──────────────────────────────────────────────────────────────────────────────
def obtener_audios_humanos(genero):
hora = datetime.now().hour
es_mujer = genero.lower() == 'woman'
suffix = "_m.mp3" if es_mujer else "_h.mp3"
if 5 <= hora < 12:
intro = "dias.mp3"
elif 12 <= hora < 19:
intro = "tarde.mp3"
else:
intro = "noches.mp3"
cierre = ("fin_noche" if (hora >= 19 or hora < 5) else "fin_dia") + suffix
return intro, cierre
async def sintetizar_nombre(nombre, ruta):
nombre_limpio = nombre.replace('_', ' ')
try:
comunicador = edge_tts.Communicate(nombre_limpio, "es-MX-DaliaNeural", rate="+10%")
await comunicador.save(ruta)
except Exception:
pass
def reproducir(archivo):
if os.path.exists(archivo):
subprocess.Popen(
["mpv", "--no-video", "--volume=100", archivo],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
def hilo_bienvenida(nombre, genero):
archivo_nombre = os.path.join(CACHE_PATH, f"nombre_{nombre}.mp3")
if not os.path.exists(archivo_nombre):
try:
asyncio.run(sintetizar_nombre(nombre, archivo_nombre))
except Exception:
pass
intro, cierre = obtener_audios_humanos(genero)
archivos = [f for f in [intro, archivo_nombre, cierre] if os.path.exists(f)]
if archivos:
subprocess.Popen(
["mpv", "--no-video", "--volume=100"] + archivos,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
# ──────────────────────────────────────────────────────────────────────────────
# GESTIÓN DE BASE DE DATOS (AHORA CON RETINAFACE Y ALINEACIÓN)
# ──────────────────────────────────────────────────────────────────────────────
def gestionar_vectores(actualizar=False):
import json # ⚡ Asegúrate de tener importado json
vectores_actuales = {}
if os.path.exists(VECTORS_FILE):
try:
with open(VECTORS_FILE, 'rb') as f:
vectores_actuales = pickle.load(f)
except Exception:
vectores_actuales = {}
if not actualizar:
return vectores_actuales
timestamps = {}
if os.path.exists(TIMESTAMPS_FILE):
try:
with open(TIMESTAMPS_FILE, 'rb') as f:
timestamps = pickle.load(f)
except Exception:
timestamps = {}
# ──────────────────────────────────────────────────────────
# CARGA DEL CACHÉ DE GÉNEROS
# ──────────────────────────────────────────────────────────
ruta_generos = os.path.join(CACHE_PATH, "generos.json")
dic_generos = {}
if os.path.exists(ruta_generos):
try:
with open(ruta_generos, 'r') as f:
dic_generos = json.load(f)
except Exception:
pass
print("\nACTUALIZANDO BASE DE DATOS (Alineación y Caché de Géneros)...")
imagenes = [f for f in os.listdir(DB_PATH) if f.lower().endswith(('.jpg', '.png'))]
nombres_en_disco = set()
hubo_cambios = False
cambio_generos = False # Bandera para saber si actualizamos el JSON
for archivo in imagenes:
nombre_archivo = os.path.splitext(archivo)[0]
ruta_img = os.path.join(DB_PATH, archivo)
nombres_en_disco.add(nombre_archivo)
ts_actual = os.path.getmtime(ruta_img)
ts_guardado = timestamps.get(nombre_archivo, 0)
# Si ya tenemos el vector pero NO tenemos su género en el JSON, forzamos el procesamiento
falta_genero = nombre_archivo not in dic_generos
if nombre_archivo in vectores_actuales and ts_actual == ts_guardado and not falta_genero:
continue
try:
img_db = cv2.imread(ruta_img)
lab = cv2.cvtColor(img_db, cv2.COLOR_BGR2LAB)
l, a, b = cv2.split(lab)
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
l = clahe.apply(l)
img_mejorada = cv2.cvtColor(cv2.merge((l, a, b)), cv2.COLOR_LAB2BGR)
# IA DE GÉNERO (Solo se ejecuta 1 vez por persona en toda la vida del sistema)
if falta_genero:
try:
analisis = DeepFace.analyze(img_mejorada, actions=['gender'], enforce_detection=False)[0]
dic_generos[nombre_archivo] = analisis.get('dominant_gender', 'Man')
except Exception:
dic_generos[nombre_archivo] = "Man" # Respaldo
cambio_generos = True
# Extraemos el vector
res = DeepFace.represent(
img_path=img_mejorada,
model_name="ArcFace",
detector_backend="mtcnn",
align=True,
enforce_detection=True
)
emb = np.array(res[0]["embedding"], dtype=np.float32)
norma = np.linalg.norm(emb)
if norma > 0:
emb = emb / norma
vectores_actuales[nombre_archivo] = emb
timestamps[nombre_archivo] = ts_actual
hubo_cambios = True
print(f" Procesado y alineado: {nombre_archivo} | Género: {dic_generos.get(nombre_archivo)}")
except Exception as e:
print(f" Rostro no válido en '{archivo}', omitido. Error: {e}")
# Limpieza de eliminados
for nombre in list(vectores_actuales.keys()):
if nombre not in nombres_en_disco:
del vectores_actuales[nombre]
timestamps.pop(nombre, None)
if nombre in dic_generos:
del dic_generos[nombre]
cambio_generos = True
hubo_cambios = True
print(f" Eliminado (sin foto): {nombre}")
# Guardado de la memoria
if hubo_cambios:
with open(VECTORS_FILE, 'wb') as f:
pickle.dump(vectores_actuales, f)
with open(TIMESTAMPS_FILE, 'wb') as f:
pickle.dump(timestamps, f)
# Guardado del JSON de géneros si hubo descubrimientos nuevos
if cambio_generos:
with open(ruta_generos, 'w') as f:
json.dump(dic_generos, f)
if hubo_cambios or cambio_generos:
print(" Sincronización terminada.\n")
else:
print(" Sin cambios. Base de datos al día.\n")
return vectores_actuales
# ──────────────────────────────────────────────────────────────────────────────
# BÚSQUEDA BLINDADA (Similitud Coseno estricta)
# ──────────────────────────────────────────────────────────────────────────────
def buscar_mejor_match(emb_consulta, base_datos):
# ⚡ MAGIA 3: Normalización L2 del vector entrante
norma = np.linalg.norm(emb_consulta)
if norma > 0:
emb_consulta = emb_consulta / norma
mejor_match, max_sim = None, -1.0
for nombre, vec in base_datos.items():
# Como ambos están normalizados, esto es Similitud Coseno pura (-1.0 a 1.0)
sim = float(np.dot(emb_consulta, vec))
if sim > max_sim:
max_sim = sim
mejor_match = nombre
return mejor_match, max_sim
# ──────────────────────────────────────────────────────────────────────────────
# LOOP DE PRUEBA Y REGISTRO (CON SIMETRÍA ESTRICTA)
# ──────────────────────────────────────────────────────────────────────────────
def sistema_interactivo():
base_datos = gestionar_vectores(actualizar=False)
cap = cv2.VideoCapture(RTSP_URL)
ultimo_saludo = 0
persona_actual = None
confirmaciones = 0
print("\n" + "=" * 50)
print(" MÓDULO DE REGISTRO Y DEPURACIÓN ESTRICTO")
print(" [R] Registrar nuevo rostro | [Q] Salir")
print("=" * 50 + "\n")
faces_ultimo_frame = []
while True:
ret, frame = cap.read()
if not ret:
time.sleep(2)
cap.open(RTSP_URL)
continue
h, w = frame.shape[:2]
display_frame = frame.copy()
tiempo_actual = time.time()
faces_raw = detectar_rostros_yunet(frame)
faces_ultimo_frame = faces_raw
for (fx, fy, fw, fh, score_yunet) in faces_raw:
fx = max(0, fx); fy = max(0, fy)
fw = min(w - fx, fw); fh = min(h - fy, fh)
if fw <= 0 or fh <= 0:
continue
cv2.rectangle(display_frame, (fx, fy), (fx+fw, fy+fh), (255, 200, 0), 2)
cv2.putText(display_frame, f"YN:{score_yunet:.2f}",
(fx, fy - 25), cv2.FONT_HERSHEY_SIMPLEX, 0.45, (255, 200, 0), 1)
if (tiempo_actual - ultimo_saludo) <= COOLDOWN_TIME:
continue
m = int(fw * 0.15)
roi = frame[max(0, fy-m): min(h, fy+fh+m),
max(0, fx-m): min(w, fx+fw+m)]
# 🛡️ FILTRO DE TAMAÑO FÍSICO
if roi.size == 0 or roi.shape[0] < 40 or roi.shape[1] < 40:
cv2.putText(display_frame, "muy pequeno",
(fx, fy-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 100, 255), 1)
continue
# 🛡️ FILTRO DE NITIDEZ
gray_roi = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
nitidez = cv2.Laplacian(gray_roi, cv2.CV_64F).var()
if nitidez < 50.0:
cv2.putText(display_frame, f"blur({nitidez:.0f})",
(fx, fy-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 165, 255), 1)
continue
# 🌙 SIMETRÍA 1: VISIÓN NOCTURNA (CLAHE) AL VIDEO EN VIVO
try:
lab = cv2.cvtColor(roi, cv2.COLOR_BGR2LAB)
l, a, b = cv2.split(lab)
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
l = clahe.apply(l)
roi_mejorado = cv2.cvtColor(cv2.merge((l, a, b)), cv2.COLOR_LAB2BGR)
except Exception:
roi_mejorado = roi # Respaldo de seguridad
# 🧠 SIMETRÍA 2: MOTOR MTCNN Y ALINEACIÓN (Igual que la Base de Datos)
try:
res = DeepFace.represent(
img_path=roi_mejorado,
model_name="ArcFace",
detector_backend="mtcnn", # El mismo que en gestionar_vectores
align=True, # Enderezamos la cara
enforce_detection=True # Si MTCNN no ve cara clara, aborta
)
emb = np.array(res[0]["embedding"], dtype=np.float32)
mejor_match, max_sim = buscar_mejor_match(emb, base_datos)
except Exception:
# MTCNN abortó porque la cara estaba de perfil, tapada o no era una cara
cv2.putText(display_frame, "MTCNN Ignorado",
(fx, fy-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)
continue
estado = " IDENTIFICADO" if max_sim > UMBRAL_SIM else "DESCONOCIDO"
nombre_d = mejor_match.split('_')[0] if mejor_match else "nadie"
n_bloques = int(max_sim * 20)
barra = "" * n_bloques + "" * (20 - n_bloques)
print(f"[REGISTRO] {estado} | {nombre_d:<14} | {barra} | "
f"{max_sim*100:.1f}% (umbral: {UMBRAL_SIM*100:.0f}%)")
if max_sim > UMBRAL_SIM and mejor_match:
color = (0, 255, 0)
texto = f"{mejor_match.split('_')[0]} ({max_sim:.2f})"
if mejor_match == persona_actual:
confirmaciones += 1
else:
persona_actual, confirmaciones = mejor_match, 1
if confirmaciones >= 1:
cv2.rectangle(display_frame, (fx, fy), (fx+fw, fy+fh), (0, 255, 0), 3)
try:
analisis = DeepFace.analyze(
roi_mejorado, actions=['gender'], enforce_detection=False
)[0]
genero = analisis['dominant_gender']
except Exception:
genero = "Man"
threading.Thread(
target=hilo_bienvenida,
args=(mejor_match, genero),
daemon=True
).start()
ultimo_saludo = tiempo_actual
confirmaciones = 0
else:
color = (0, 0, 255)
texto = f"? ({max_sim:.2f})"
confirmaciones = max(0, confirmaciones - 1)
cv2.putText(display_frame, texto,
(fx, fy - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
cv2.imshow("Módulo de Registro", display_frame)
key = cv2.waitKey(1) & 0xFF
if key == ord('q'):
break
elif key == ord('r'):
if faces_ultimo_frame:
areas = [fw * fh for (fx, fy, fw, fh, _) in faces_ultimo_frame]
fx, fy, fw, fh, _ = faces_ultimo_frame[np.argmax(areas)]
m_x = int(fw * 0.30)
m_y = int(fh * 0.30)
face_roi = frame[max(0, fy-m_y): min(h, fy+fh+m_y),
max(0, fx-m_x): min(w, fx+fw+m_x)]
if face_roi.size > 0:
nom = input("\nNombre de la persona: ").strip()
if nom:
foto_path = os.path.join(DB_PATH, f"{nom}.jpg")
cv2.imwrite(foto_path, face_roi)
print(f"[OK] Rostro de '{nom}' guardado. Sincronizando...")
base_datos = gestionar_vectores(actualizar=True)
else:
print("[!] Registro cancelado.")
else:
print("[!] Recorte vacío. Intenta de nuevo.")
else:
print("\n[!] No se detectó rostro. Acércate más o mira a la lente.")
cap.release()
cv2.destroyAllWindows()
if __name__ == "__main__":
sistema_interactivo()

Binary file not shown.

View File

@ -1,96 +1,96 @@
absl-py==2.4.0
aiohappyeyeballs==2.6.1
aiohttp==3.13.3
aiosignal==1.4.0
astunparse==1.6.3
attrs==25.4.0
beautifulsoup4==4.14.3
blinker==1.9.0
certifi==2026.1.4
charset-normalizer==3.4.4
click==8.3.1
contourpy==1.3.2
cycler==0.12.1
deepface==0.0.98
edge-tts==7.2.7
filelock==3.20.0
fire==0.7.1
Flask==3.1.2
flask-cors==6.0.2
flatbuffers==25.12.19
fonttools==4.61.1
frozenlist==1.8.0
fsspec==2025.12.0
gast==0.7.0
gdown==5.2.1
google-pasta==0.2.0
grpcio==1.78.0
gunicorn==25.0.3
h5py==3.15.1
idna==3.11
itsdangerous==2.2.0
Jinja2==3.1.6
joblib==1.5.3
keras==3.12.1
kiwisolver==1.4.9
lap==0.5.12
libclang==18.1.1
lightdsa==0.0.3
lightecc==0.0.4
lightphe==0.0.20
lz4==4.4.5
Markdown==3.10.2
markdown-it-py==4.0.0
MarkupSafe==2.1.5
matplotlib==3.10.8
mdurl==0.1.2
ml_dtypes==0.5.4
mpmath==1.3.0
mtcnn==1.0.0
multidict==6.7.1
namex==0.1.0
networkx==3.4.2
numpy==1.26.4
onnxruntime==1.23.2
opencv-python==4.11.0.86
opt_einsum==3.4.0
optree==0.18.0
packaging==26.0
pandas==2.3.3
pillow==12.0.0
polars==1.38.1
polars-runtime-32==1.38.1
propcache==0.4.1
protobuf==6.33.5
psutil==7.2.2
Pygments==2.19.2
pyparsing==3.3.2
PySocks==1.7.1
python-dateutil==2.9.0.post0
python-dotenv==1.2.1
PyYAML==6.0.3
requests==2.32.5
retina-face==0.0.17
rich==14.3.2
scipy==1.15.3
six==1.17.0
soupsieve==2.8.3
sympy==1.14.0
tabulate==0.9.0
tensorboard==2.20.0
tensorboard-data-server==0.7.2
tensorflow==2.20.0
tensorflow-io-gcs-filesystem==0.37.1
termcolor==3.3.0
tf_keras==2.20.1
torch==2.10.0
torchreid==0.2.5
torchvision==0.25.0
tqdm==4.67.3
typing_extensions==4.15.0
ultralytics==8.4.14
ultralytics-thop==2.0.18
urllib3==2.6.3
Werkzeug==3.1.5
wrapt==2.1.1
yarl==1.22.0
absl-py==2.4.0
aiohappyeyeballs==2.6.1
aiohttp==3.13.3
aiosignal==1.4.0
astunparse==1.6.3
attrs==25.4.0
beautifulsoup4==4.14.3
blinker==1.9.0
certifi==2026.1.4
charset-normalizer==3.4.4
click==8.3.1
contourpy==1.3.3
cycler==0.12.1
deepface==0.0.98
edge-tts==7.2.7
filelock==3.20.0
fire==0.7.1
Flask==3.1.2
flask-cors==6.0.2
flatbuffers==25.12.19
fonttools==4.61.1
frozenlist==1.8.0
fsspec==2025.12.0
gast==0.7.0
gdown==5.2.1
google-pasta==0.2.0
grpcio==1.78.0
gunicorn==25.0.3
h5py==3.15.1
idna==3.11
itsdangerous==2.2.0
Jinja2==3.1.6
joblib==1.5.3
keras==3.13.2
kiwisolver==1.4.9
lap==0.5.12
libclang==18.1.1
lightdsa==0.0.3
lightecc==0.0.4
lightphe==0.0.20
lz4==4.4.5
Markdown==3.10.2
markdown-it-py==4.0.0
MarkupSafe==2.1.5
matplotlib==3.10.8
mdurl==0.1.2
ml_dtypes==0.5.4
mpmath==1.3.0
mtcnn==1.0.0
multidict==6.7.1
namex==0.1.0
networkx==3.6.1
numpy==1.26.4
onnxruntime==1.24.2
opencv-python==4.11.0.86
opt_einsum==3.4.0
optree==0.18.0
packaging==26.0
pandas==3.0.0
pillow==12.0.0
polars==1.38.1
polars-runtime-32==1.38.1
propcache==0.4.1
protobuf==6.33.5
psutil==7.2.2
Pygments==2.19.2
pyparsing==3.3.2
PySocks==1.7.1
python-dateutil==2.9.0.post0
python-dotenv==1.2.1
PyYAML==6.0.3
requests==2.32.5
retina-face==0.0.17
rich==14.3.2
scipy==1.17.0
six==1.17.0
soupsieve==2.8.3
sympy==1.14.0
tabulate==0.9.0
tensorboard==2.20.0
tensorboard-data-server==0.7.2
tensorflow==2.20.0
tensorflow-io-gcs-filesystem==0.37.1
termcolor==3.3.0
tf_keras==2.20.1
torch==2.10.0+cpu
torchreid==0.2.5
torchvision==0.25.0+cpu
tqdm==4.67.3
typing_extensions==4.15.0
ultralytics==8.4.14
ultralytics-thop==2.0.18
urllib3==2.6.3
Werkzeug==3.1.5
wrapt==2.1.1
yarl==1.22.0

Binary file not shown.

Before

Width:  |  Height:  |  Size: 357 KiB

View File

@ -1,409 +0,0 @@
"""
BOCINA INTELIGENTE - CLIENTE PYTHON
====================================
Cliente para enviar audio y comandos a la bocina ESP32 via WebSocket
Uso:
python bocina_client.py
(Luego ingresa la IP y el nombre cuando se solicite)
"""
import asyncio
import websockets
import json
import struct
import math
import sys
import time
import os
from typing import Optional
# ==================== LIMPIAR PANTALLA ====================
def limpiar_pantalla():
"""Limpia la consola según el sistema operativo"""
os.system('cls' if os.name == 'nt' else 'clear')
# ==================== CLASE BOCINA ====================
class BocinaInteligente:
"""Cliente para controlar la bocina inteligente ESP32"""
def __init__(self, ip: str, puerto: int = 81):
self.ip = ip
self.puerto = puerto
self.url = f"ws://{ip}:{puerto}"
self.websocket: Optional[websockets.WebSocketClientProtocol] = None
self.chunk_size = 1024
self.timeout = 5
self.conectado = False
async def conectar(self) -> bool:
"""Conectar al ESP32"""
try:
print(f"🔌 Conectando a {self.url}...")
self.websocket = await websockets.connect(self.url)
# Esperar mensaje de bienvenida
response = await asyncio.wait_for(self.websocket.recv(), timeout=self.timeout)
data = json.loads(response)
if data.get("status") == "ok":
print(f"{data.get('msg')}")
self.conectado = True
return True
except Exception as e:
print(f" ❌ Error: {e}")
return False
return False
async def desconectar(self):
"""Cerrar conexión"""
if self.websocket:
await self.websocket.close()
self.conectado = False
print("🔌 Conexión cerrada")
async def ping(self) -> bool:
"""Probar conexión con el ESP32"""
if not self.websocket:
return False
try:
await self.websocket.send(json.dumps({"cmd": "PING"}))
response = await asyncio.wait_for(self.websocket.recv(), timeout=self.timeout)
data = json.loads(response)
return data.get("status") == "ok"
except:
return False
async def obtener_estado(self) -> dict:
"""Obtener estadísticas del ESP32"""
if not self.websocket:
return {}
try:
await self.websocket.send(json.dumps({"cmd": "STATUS"}))
response = await asyncio.wait_for(self.websocket.recv(), timeout=self.timeout)
return json.loads(response)
except:
return {}
async def detener(self) -> bool:
"""Detener reproducción"""
if not self.websocket:
return False
try:
await self.websocket.send(json.dumps({"cmd": "STOP"}))
response = await asyncio.wait_for(self.websocket.recv(), timeout=self.timeout)
data = json.loads(response)
return data.get("status") == "ok"
except:
return False
async def enviar_audio(self, audio_data: bytes, nombre: str = "audio") -> bool:
"""
Enviar audio al ESP32
Args:
audio_data: Datos de audio en formato PCM (16kHz, 16bits, mono)
nombre: Nombre identificador (para logs)
"""
if not self.websocket:
print("❌ No hay conexión")
return False
total_chunks = (len(audio_data) + self.chunk_size - 1) // self.chunk_size
print(f"📤 Enviando {total_chunks} chunks ({len(audio_data)} bytes) para '{nombre}'")
inicio = time.time()
for i, chunk_start in enumerate(range(0, len(audio_data), self.chunk_size)):
chunk = audio_data[chunk_start:chunk_start + self.chunk_size]
await self.websocket.send(chunk)
# Mostrar progreso cada 10 chunks o al final
if (i + 1) % 10 == 0 or i == total_chunks - 1:
porcentaje = ((i + 1) * 100) // total_chunks
print(f" 📊 Progreso: {porcentaje}% ({i+1}/{total_chunks} chunks)")
# Pequeña pausa para no saturar
await asyncio.sleep(0.005)
elapsed = time.time() - inicio
print(f"✅ Audio enviado en {elapsed:.2f} segundos")
return True
# ==================== GENERADORES DE AUDIO ====================
def generar_tono(frecuencia: int = 440, duracion_ms: int = 2000,
sample_rate: int = 16000, amplitud: int = 16000) -> bytes:
"""
Generar un tono seno en formato PCM
Args:
frecuencia: Frecuencia del tono en Hz
duracion_ms: Duración en milisegundos
sample_rate: Frecuencia de muestreo
amplitud: Amplitud máxima (0-32767)
"""
num_muestras = int(sample_rate * duracion_ms / 1000)
audio = bytearray()
for i in range(num_muestras):
valor = int(amplitud * math.sin(2 * math.pi * frecuencia * i / sample_rate))
audio.extend(struct.pack('<h', valor)) # little-endian, 16 bits
return bytes(audio)
def generar_melodia_bienvenida() -> bytes:
"""Generar melodía de bienvenida (Do-Re-Mi-Fa-Sol)"""
sample_rate = 16000
duracion_nota = 500 # ms por nota
notas = [261, 293, 329, 349, 392, 440] # Do, Re, Mi, Fa, Sol, La
audio = bytearray()
for nota in notas:
nota_audio = generar_tono(nota, duracion_nota, sample_rate, amplitud=12000)
audio.extend(nota_audio)
return bytes(audio)
def generar_saludo_personalizado(nombre: str) -> bytes:
"""
Generar un saludo personalizado (versión simple)
En un caso real, aquí usarías un servicio TTS
"""
# Usar frecuencia diferente según la longitud del nombre
frecuencia_base = 440
frecuencia = frecuencia_base + (len(nombre) * 10)
# Limitar frecuencia máxima
if frecuencia > 800:
frecuencia = 800
return generar_tono(frecuencia, duracion_ms=2000)
# ==================== MENÚ PRINCIPAL ====================
def mostrar_menu():
"""Muestra el menú principal"""
print("\n" + "=" * 50)
print(" 🎵 BOCINA INTELIGENTE - CONTROL")
print("=" * 50)
print("\n📋 Opciones disponibles:")
print(" 1. 🔊 Enviar tono de prueba (440Hz)")
print(" 2. 🎵 Enviar melodía de bienvenida")
print(" 3. 💬 Enviar saludo personalizado")
print(" 4. 📊 Ver estado del ESP32")
print(" 5. 🏓 Probar ping")
print(" 6. 🔇 Detener reproducción")
print(" 7. 🔄 Reconectar")
print(" 8. 🚪 Salir")
print("-" * 50)
# ==================== MODO INTERACTIVO ====================
async def modo_interactivo():
"""Modo interactivo con entrada de IP y nombre"""
# Limpiar pantalla
limpiar_pantalla()
print("\n" + "=" * 50)
print(" 🎵 BOCINA INTELIGENTE")
print("=" * 50)
# Solicitar IP
print("\n📡 Configuración de conexión:")
ip_default = "192.168.15.128"
ip = input(f" IP del ESP32 [{ip_default}]: ").strip()
if not ip:
ip = ip_default
# Solicitar nombre por defecto para saludos
nombre_default = "Visitante"
nombre = input(f" Nombre por defecto [{nombre_default}]: ").strip()
if not nombre:
nombre = nombre_default
# Crear instancia
bocina = BocinaInteligente(ip)
# Conectar
print("\n🔄 Conectando...")
if not await bocina.conectar():
print("\n❌ No se pudo conectar al ESP32")
print(" Verifica que:")
print(f" 1. La IP {ip} sea correcta")
print(" 2. El ESP32 esté encendido")
print(" 3. Estés en la misma red WiFi")
input("\n Presiona Enter para salir...")
return
print("\n✅ ¡Conectado exitosamente!")
print(f" 📡 IP: {ip}")
print(f" 👤 Nombre: {nombre}")
# Bucle principal
while True:
mostrar_menu()
opcion = input("\n👉 Selecciona una opción (1-8): ").strip()
if opcion == "1":
print("\n🔊 Enviando tono de prueba (440Hz)...")
audio = generar_tono(440, 2000)
await bocina.enviar_audio(audio, "Tono 440Hz")
elif opcion == "2":
print("\n🎵 Enviando melodía de bienvenida...")
audio = generar_melodia_bienvenida()
await bocina.enviar_audio(audio, "Melodía")
elif opcion == "3":
# Pedir nombre específico para este saludo
nombre_saludo = input(f" 👤 Nombre (Enter para usar '{nombre}'): ").strip()
if not nombre_saludo:
nombre_saludo = nombre
print(f"\n🔊 Generando saludo para '{nombre_saludo}'...")
audio = generar_saludo_personalizado(nombre_saludo)
await bocina.enviar_audio(audio, nombre_saludo)
elif opcion == "4":
print("\n📊 Obteniendo estado del ESP32...")
estado = await bocina.obtener_estado()
if estado:
print("\n 📡 Estado del sistema:")
print(f" Status: {estado.get('status', 'desconocido')}")
print(f" 📦 Bytes recibidos: {estado.get('bytes_recibidos', 0)}")
print(f" 🔢 Chunks recibidos: {estado.get('chunks_recibidos', 0)}")
print(f" 🎵 Audio activo: {'✅ Sí' if estado.get('audio_activo') else '❌ No'}")
print(f" 📶 WiFi RSSI: {estado.get('wifi_rssi', 0)} dBm")
else:
print(" ❌ No se pudo obtener estado")
elif opcion == "5":
print("\n🏓 Probando ping...")
inicio = time.time()
if await bocina.ping():
latencia = (time.time() - inicio) * 1000
print(f" ✅ PONG recibido (latencia: {latencia:.0f}ms)")
else:
print(" ❌ Sin respuesta - verifica la conexión")
elif opcion == "6":
print("\n🔇 Deteniendo reproducción...")
if await bocina.detener():
print(" ✅ Reproducción detenida")
else:
print(" ⚠️ No se pudo detener o ya estaba detenido")
elif opcion == "7":
print("\n🔄 Reconectando...")
await bocina.desconectar()
await asyncio.sleep(1)
if await bocina.conectar():
print(" ✅ Reconectado exitosamente")
else:
print(" ❌ Error al reconectar")
elif opcion == "8":
print("\n👋 Saliendo...")
break
else:
print("❌ Opción inválida")
# Pequeña pausa antes de volver al menú
await asyncio.sleep(0.5)
# Cerrar conexión
await bocina.desconectar()
print("\n✅ Programa finalizado")
# ==================== MODO RÁPIDO ====================
async def modo_rapido():
"""Modo rápido: pide IP y nombre y envía saludo inmediato"""
limpiar_pantalla()
print("\n" + "=" * 50)
print(" 🎵 BOCINA INTELIGENTE - MODO RÁPIDO")
print("=" * 50)
# Solicitar IP
print("\n📡 Configuración:")
ip_default = "192.168.15.128"
ip = input(f" IP del ESP32 [{ip_default}]: ").strip()
if not ip:
ip = ip_default
# Solicitar nombre
nombre = input(" 👤 Nombre de la persona: ").strip()
if not nombre:
nombre = "Visitante"
# Conectar y enviar
bocina = BocinaInteligente(ip)
print("\n🔄 Conectando...")
if not await bocina.conectar():
print("❌ No se pudo conectar")
input("\nPresiona Enter para salir...")
return
print(f"\n🔊 Enviando saludo para '{nombre}'...")
audio = generar_saludo_personalizado(nombre)
await bocina.enviar_audio(audio, nombre)
# Mostrar estado
await asyncio.sleep(1)
estado = await bocina.obtener_estado()
if estado:
print(f"\n📊 Enviados: {estado.get('bytes_recibidos', 0)} bytes")
await bocina.desconectar()
print("\n✅ Saludo enviado!")
input("\nPresiona Enter para salir...")
# ==================== MAIN ====================
async def main():
"""Función principal"""
limpiar_pantalla()
print("\n" + "=" * 50)
print(" 🎵 BOCINA INTELIGENTE v1.0")
print("=" * 50)
print("\nSelecciona modo de operación:")
print(" 1. 🎮 Modo interactivo (menú completo)")
print(" 2. ⚡ Modo rápido (solo enviar saludo)")
print(" 3. 🚪 Salir")
modo = input("\n👉 Opción (1-3): ").strip()
if modo == "1":
await modo_interactivo()
elif modo == "2":
await modo_rapido()
else:
print("\n👋 Hasta luego!")
return
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\n\n👋 Programa interrumpido")
except Exception as e:
print(f"\n❌ Error: {e}")
input("\nPresiona Enter para salir...")

File diff suppressed because it is too large Load Diff