Compare commits

..

3 Commits

Author SHA1 Message Date
ede8d8dda8 Estructura core, configuración de bocina over ip e integración de .gitignore 2026-04-10 13:24:14 -06:00
5a52018b03 preubas 2026-04-08 11:01:55 -06:00
aa2132f3cf Cambios yolo en gpu 2026-04-08 11:00:23 -06:00
22 changed files with 3161 additions and 2207 deletions

4
.gitignore vendored
View File

@ -3,6 +3,10 @@
__pycache__/
*.py[cod]
*$py.class
ven2/
.venv/
__pycache__/
*.pkl
# C extensions
*.so

BIN
bus.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 134 KiB

Binary file not shown.

26
comandos.txt Normal file
View File

@ -0,0 +1,26 @@
# Base estable
pip install numpy==1.26.4
# OpenCV compatible con numpy 1.x
pip install opencv-python==4.8.1.78
pip install torch==2.1.2 torchvision==0.16.2 torchaudio==2.1.2 --index-url https://download.pytorch.org/whl/cu118
pip install ultralytics --no-deps
pip install opencv-python==4.8.1.78 matplotlib pyyaml scipy requests pillow
pip install tensorflow==2.21
pip install tf-keras
pip install deepface
pip install onnxruntime
pip install edge-tts
pip install numpy pandas
sudo apt install libxcb-xinerama0
sudo apt install fonts-dejavu
QT_DEBUG_PLUGINS=0 python fusion.py
pip cache purge
python -c "import torch; print(torch.cuda.is_available())"
python -c "import torch; print(torch.cuda.get_device_name(0))"

View File

@ -0,0 +1,13 @@
[ESP32]
ip = 192.168.15.128
puerto = 81
[Audio]
duracion_ms = 2000
tono_base = 440
amplitud = 16000
[General]
timeout = 5
reconectar = true

78
configurar_bocina.py Normal file
View File

@ -0,0 +1,78 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script para configurar la bocina ESP32
Permite cambiar IP, puerto, duración, etc.
"""
import sys
import os
# Agregar el proyecto al path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from core.speaker_iot import configurar_ip, mostrar_configuracion
from core.speaker_iot.config import config
def main():
print("\n" + "=" * 50)
print(" 🎵 CONFIGURACIÓN DE BOCINA IoT")
print("=" * 50)
mostrar_configuracion()
print("\n" + "-" * 50)
print("¿Qué deseas configurar?")
print(" 1. Cambiar IP")
print(" 2. Cambiar puerto")
print(" 3. Cambiar duración del audio")
print(" 4. Ver configuración actual")
print(" 5. Restaurar valores por defecto")
print(" 6. Salir")
opcion = input("\n👉 Opción (1-6): ").strip()
if opcion == "1":
nueva_ip = input("📡 Nueva IP: ").strip()
if nueva_ip:
config.actualizar_ip(nueva_ip)
print(f"✅ IP actualizada a: {nueva_ip}")
elif opcion == "2":
nuevo_puerto = input("🔌 Nuevo puerto [81]: ").strip()
if nuevo_puerto:
config.config.set("ESP32", "puerto", nuevo_puerto)
with open(config.CONFIG_FILE, 'w') as f:
config.config.write(f)
print(f"✅ Puerto actualizado a: {nuevo_puerto}")
elif opcion == "3":
nueva_duracion = input("⏱️ Nueva duración en ms [2000]: ").strip()
if nueva_duracion:
config.config.set("Audio", "duracion_ms", nueva_duracion)
with open(config.CONFIG_FILE, 'w') as f:
config.config.write(f)
print(f"✅ Duración actualizada a: {nueva_duracion}ms")
elif opcion == "4":
mostrar_configuracion()
elif opcion == "5":
confirmar = input("⚠️ ¿Restaurar configuración por defecto? (s/n): ").strip().lower()
if confirmar == 's':
config._crear_configuracion_default()
print("✅ Configuración restaurada")
mostrar_configuracion()
elif opcion == "6":
print("\n👋 Hasta luego!")
return
else:
print("❌ Opción inválida")
print("\n✅ Configuración guardada!")
input("\nPresiona Enter para salir...")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,21 @@
"""
Speaker IoT - Módulo para controlar bocina ESP32
"""
from .bocina_core import (
BocinaCore,
saludar,
detener,
obtener_estado,
configurar_ip,
mostrar_configuracion
)
__all__ = [
'BocinaCore',
'saludar',
'detener',
'obtener_estado',
'configurar_ip',
'mostrar_configuracion'
]

View File

@ -0,0 +1,224 @@
"""
BOCINA CORE - Módulo para controlar la bocina ESP32
"""
import asyncio
import websockets
import json
import struct
import math
from typing import Optional, Dict, Any
from .config import config
# ==================== CONSTANTES ====================
CHUNK_SIZE = 1024
SAMPLE_RATE = 16000
# ==================== CLASE PRINCIPAL ====================
class BocinaCore:
"""Clase principal para controlar la bocina ESP32"""
def __init__(self, ip: str = None, puerto: int = None):
"""
Inicializa el controlador de la bocina
Args:
ip: IP del ESP32 (si es None, usa la del archivo de configuración)
puerto: Puerto WebSocket (si es None, usa el del archivo)
"""
self.ip = ip or config.obtener_ip()
self.puerto = puerto or config.obtener_puerto()
self.url = f"ws://{self.ip}:{self.puerto}"
self.duracion_ms = config.obtener_duracion()
self.tono_base = config.obtener_tono_base()
self.amplitud = config.obtener_amplitud()
self.timeout = config.obtener_timeout()
self._websocket = None
self._conectado = False
# ==================== MÉTODOS PÚBLICOS ====================
def saludar(self, nombre: str, duracion_ms: int = None, tono_personalizado: bool = True) -> bool:
"""
Envía un saludo a la bocina
Args:
nombre: Nombre de la persona
duracion_ms: Duración del saludo (None = usa config)
tono_personalizado: Si True, varía el tono según el nombre
"""
duracion = duracion_ms or self.duracion_ms
return self._ejecutar_async(self._saludar_async(nombre, duracion, tono_personalizado))
def detener(self) -> bool:
"""Detiene la reproducción actual"""
return self._ejecutar_async(self._detener_async())
def estado(self) -> Dict[str, Any]:
"""Obtiene el estado actual de la bocina"""
return self._ejecutar_async(self._estado_async())
def ping(self) -> bool:
"""Prueba la conexión con la bocina"""
return self._ejecutar_async(self._ping_async())
def conectar(self) -> bool:
"""Establece conexión manual con la bocina"""
return self._ejecutar_async(self._conectar_async())
def desconectar(self) -> bool:
"""Cierra la conexión con la bocina"""
return self._ejecutar_async(self._desconectar_async())
# ==================== MÉTODOS INTERNOS ====================
def _ejecutar_async(self, corutina):
"""Ejecuta una función asíncrona desde código síncrono"""
try:
loop = asyncio.get_event_loop()
if loop.is_running():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
resultado = loop.run_until_complete(corutina)
loop.close()
return resultado
else:
return loop.run_until_complete(corutina)
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
resultado = loop.run_until_complete(corutina)
loop.close()
return resultado
async def _conectar_async(self) -> bool:
"""Conexión asíncrona"""
try:
self._websocket = await websockets.connect(self.url, timeout=self.timeout)
await self._websocket.recv()
self._conectado = True
return True
except Exception:
self._conectado = False
return False
async def _desconectar_async(self) -> bool:
"""Desconexión asíncrona"""
if self._websocket:
await self._websocket.close()
self._websocket = None
self._conectado = False
return True
async def _asegurar_conexion(self) -> bool:
"""Asegura que haya una conexión activa"""
if not self._conectado or not self._websocket:
return await self._conectar_async()
return True
async def _saludar_async(self, nombre: str, duracion_ms: int, tono_personalizado: bool) -> bool:
"""Enviar saludo asíncrono"""
if not await self._asegurar_conexion():
return False
try:
audio = self._generar_audio(nombre, duracion_ms, tono_personalizado)
for i in range(0, len(audio), CHUNK_SIZE):
chunk = audio[i:i + CHUNK_SIZE]
await self._websocket.send(chunk)
await asyncio.sleep(0.005)
return True
except Exception:
return False
async def _detener_async(self) -> bool:
"""Detener reproducción asíncrono"""
if not await self._asegurar_conexion():
return False
try:
await self._websocket.send(json.dumps({"cmd": "STOP"}))
await asyncio.wait_for(self._websocket.recv(), timeout=self.timeout)
return True
except Exception:
return False
async def _estado_async(self) -> Dict[str, Any]:
"""Obtener estado asíncrono"""
if not await self._asegurar_conexion():
return {}
try:
await self._websocket.send(json.dumps({"cmd": "STATUS"}))
respuesta = await asyncio.wait_for(self._websocket.recv(), timeout=self.timeout)
return json.loads(respuesta)
except Exception:
return {}
async def _ping_async(self) -> bool:
"""Ping asíncrono"""
if not await self._asegurar_conexion():
return False
try:
await self._websocket.send(json.dumps({"cmd": "PING"}))
respuesta = await asyncio.wait_for(self._websocket.recv(), timeout=self.timeout)
data = json.loads(respuesta)
return data.get("status") == "ok"
except Exception:
return False
def _generar_audio(self, nombre: str, duracion_ms: int, tono_personalizado: bool) -> bytes:
"""Genera audio PCM para el saludo"""
num_muestras = int(SAMPLE_RATE * duracion_ms / 1000)
if tono_personalizado:
frecuencia = self.tono_base + (len(nombre) * 10)
if frecuencia > 800:
frecuencia = 800
else:
frecuencia = self.tono_base
audio = bytearray()
for i in range(num_muestras):
valor = int(self.amplitud * math.sin(2 * math.pi * frecuencia * i / SAMPLE_RATE))
audio.extend(struct.pack('<h', valor))
return bytes(audio)
# ==================== FUNCIONES SIMPLES (API RÁPIDA) ====================
def saludar(nombre: str, ip: str = None) -> bool:
"""Función rápida para saludar a una persona"""
bocina = BocinaCore(ip)
return bocina.saludar(nombre)
def detener(ip: str = None) -> bool:
"""Detiene la reproducción"""
bocina = BocinaCore(ip)
return bocina.detener()
def obtener_estado(ip: str = None) -> dict:
"""Obtiene el estado de la bocina"""
bocina = BocinaCore(ip)
return bocina.estado()
def configurar_ip(nueva_ip: str):
"""Actualiza la IP en el archivo de configuración"""
from .config import config
config.actualizar_ip(nueva_ip)
def mostrar_configuracion():
"""Muestra la configuración actual"""
from .config import config
config.mostrar_configuracion()

108
core/speaker_iot/config.py Normal file
View File

@ -0,0 +1,108 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Módulo de configuración para la bocina ESP32
"""
import os
import configparser
from pathlib import Path
# ==================== RUTAS ====================
BASE_DIR = Path(__file__).resolve().parent.parent.parent
CONFIG_DIR = BASE_DIR / "config" / "speaker_iot"
CONFIG_FILE = CONFIG_DIR / "settings.ini"
# ==================== CONFIGURACIÓN POR DEFECTO ====================
DEFAULT_CONFIG = {
"ESP32": {
"ip": "192.168.15.128",
"puerto": "81"
},
"Audio": {
"duracion_ms": "2000",
"tono_base": "440",
"amplitud": "16000"
},
"General": {
"timeout": "5",
"reconectar": "true"
}
}
class ConfiguracionBocina:
"""Gestor de configuración para la bocina"""
def __init__(self):
self.config = configparser.ConfigParser()
self._cargar_configuracion()
def _cargar_configuracion(self):
"""Carga la configuración desde el archivo o crea uno por defecto"""
if CONFIG_FILE.exists():
self.config.read(CONFIG_FILE, encoding='utf-8')
else:
self._crear_configuracion_default()
def _crear_configuracion_default(self):
"""Crea archivo de configuración por defecto"""
# Crear directorio si no existe
CONFIG_DIR.mkdir(parents=True, exist_ok=True)
# Cargar valores por defecto
for seccion, valores in DEFAULT_CONFIG.items():
self.config[seccion] = valores
# Guardar archivo
with open(CONFIG_FILE, 'w', encoding='utf-8') as f:
self.config.write(f)
print(f"📝 Configuración creada en: {CONFIG_FILE}")
def obtener_ip(self) -> str:
"""Obtiene la IP de la bocina"""
return self.config.get("ESP32", "ip", fallback="192.168.15.128")
def obtener_puerto(self) -> int:
"""Obtiene el puerto de la bocina"""
return self.config.getint("ESP32", "puerto", fallback=81)
def obtener_duracion(self) -> int:
"""Obtiene duración del audio en ms"""
return self.config.getint("Audio", "duracion_ms", fallback=2000)
def obtener_tono_base(self) -> int:
"""Obtiene frecuencia base del tono"""
return self.config.getint("Audio", "tono_base", fallback=440)
def obtener_amplitud(self) -> int:
"""Obtiene amplitud del audio"""
return self.config.getint("Audio", "amplitud", fallback=16000)
def obtener_timeout(self) -> int:
"""Obtiene timeout en segundos"""
return self.config.getint("General", "timeout", fallback=5)
def reconectar_auto(self) -> bool:
"""Obtiene si debe reconectar automáticamente"""
return self.config.getboolean("General", "reconectar", fallback=True)
def actualizar_ip(self, nueva_ip: str):
"""Actualiza la IP de la bocina"""
self.config.set("ESP32", "ip", nueva_ip)
with open(CONFIG_FILE, 'w', encoding='utf-8') as f:
self.config.write(f)
print(f"✅ IP actualizada a: {nueva_ip}")
def mostrar_configuracion(self):
"""Muestra la configuración actual"""
print("\n📡 Configuración actual:")
print(f" IP: {self.obtener_ip()}:{self.obtener_puerto()}")
print(f" Duración: {self.obtener_duracion()}ms")
print(f" Tono base: {self.obtener_tono_base()}Hz")
print(f" Timeout: {self.obtener_timeout()}s")
# Instancia global
config = ConfiguracionBocina()

View File

@ -0,0 +1,60 @@
# Solo importas lo que necesitas
from core.speaker_iot import saludar, detener, obtener_estado
# ===== EJEMPLO 1: Cuando detectas una persona =====
def mi_detector():
nombre = "Ana" # Tu IA obtiene el nombre
# Enviar saludo (¡una sola línea!)
saludar(nombre)
# También puedes verificar si funcionó
if saludar(nombre):
print(f"✅ Saludo enviado a {nombre}")
else:
print(f"❌ Error al enviar saludo a {nombre}")
# ===== EJEMPLO 2: Dentro de tu loop principal =====
while True:
persona = detectar_persona() # Tu función de detección
if persona:
nombre = obtener_nombre(persona) # Tu base de datos
saludar(nombre) # Envía el saludo
# ===== EJEMPLO 3: Clase completa =====
class MiSistemaIA:
def __init__(self):
self.bocina_ip = "192.168.15.128" # O usa la del config
def on_persona_detectada(self, persona):
nombre = self.obtener_nombre(persona)
if nombre:
print(f"🎉 Detectada: {nombre}")
saludar(nombre) # ¡Así de simple!
def obtener_nombre(self, persona):
# Tu lógica para obtener nombre
return persona.get("nombre", "Visitante")
# ===== Ejemplo completo de integración =====
class SistemaSeguridad:
def __init__(self):
self.personas_conocidas = ["Ana", "Carlos", "Maria"]
print("✅ Sistema iniciado - Bocina lista")
def detectar(self, nombre):
if nombre in self.personas_conocidas:
print(f"🔔 ¡Bienvenido {nombre}!")
saludar(nombre) # Envía saludo
return True
else:
print(f"⚠️ Persona no registrada: {nombre}")
return False
# Uso
sistema = SistemaSeguridad()
sistema.detectar("Ana") # Reproduce sonido
sistema.detectar("Luis") # No reproduce

View File

@ -0,0 +1,409 @@
"""
BOCINA INTELIGENTE - CLIENTE PYTHON
====================================
Cliente para enviar audio y comandos a la bocina ESP32 via WebSocket
Uso:
python bocina_client.py
(Luego ingresa la IP y el nombre cuando se solicite)
"""
import asyncio
import websockets
import json
import struct
import math
import sys
import time
import os
from typing import Optional
# ==================== LIMPIAR PANTALLA ====================
def limpiar_pantalla():
"""Limpia la consola según el sistema operativo"""
os.system('cls' if os.name == 'nt' else 'clear')
# ==================== CLASE BOCINA ====================
class BocinaInteligente:
"""Cliente para controlar la bocina inteligente ESP32"""
def __init__(self, ip: str, puerto: int = 81):
self.ip = ip
self.puerto = puerto
self.url = f"ws://{ip}:{puerto}"
self.websocket: Optional[websockets.WebSocketClientProtocol] = None
self.chunk_size = 1024
self.timeout = 5
self.conectado = False
async def conectar(self) -> bool:
"""Conectar al ESP32"""
try:
print(f"🔌 Conectando a {self.url}...")
self.websocket = await websockets.connect(self.url)
# Esperar mensaje de bienvenida
response = await asyncio.wait_for(self.websocket.recv(), timeout=self.timeout)
data = json.loads(response)
if data.get("status") == "ok":
print(f"{data.get('msg')}")
self.conectado = True
return True
except Exception as e:
print(f" ❌ Error: {e}")
return False
return False
async def desconectar(self):
"""Cerrar conexión"""
if self.websocket:
await self.websocket.close()
self.conectado = False
print("🔌 Conexión cerrada")
async def ping(self) -> bool:
"""Probar conexión con el ESP32"""
if not self.websocket:
return False
try:
await self.websocket.send(json.dumps({"cmd": "PING"}))
response = await asyncio.wait_for(self.websocket.recv(), timeout=self.timeout)
data = json.loads(response)
return data.get("status") == "ok"
except:
return False
async def obtener_estado(self) -> dict:
"""Obtener estadísticas del ESP32"""
if not self.websocket:
return {}
try:
await self.websocket.send(json.dumps({"cmd": "STATUS"}))
response = await asyncio.wait_for(self.websocket.recv(), timeout=self.timeout)
return json.loads(response)
except:
return {}
async def detener(self) -> bool:
"""Detener reproducción"""
if not self.websocket:
return False
try:
await self.websocket.send(json.dumps({"cmd": "STOP"}))
response = await asyncio.wait_for(self.websocket.recv(), timeout=self.timeout)
data = json.loads(response)
return data.get("status") == "ok"
except:
return False
async def enviar_audio(self, audio_data: bytes, nombre: str = "audio") -> bool:
"""
Enviar audio al ESP32
Args:
audio_data: Datos de audio en formato PCM (16kHz, 16bits, mono)
nombre: Nombre identificador (para logs)
"""
if not self.websocket:
print("❌ No hay conexión")
return False
total_chunks = (len(audio_data) + self.chunk_size - 1) // self.chunk_size
print(f"📤 Enviando {total_chunks} chunks ({len(audio_data)} bytes) para '{nombre}'")
inicio = time.time()
for i, chunk_start in enumerate(range(0, len(audio_data), self.chunk_size)):
chunk = audio_data[chunk_start:chunk_start + self.chunk_size]
await self.websocket.send(chunk)
# Mostrar progreso cada 10 chunks o al final
if (i + 1) % 10 == 0 or i == total_chunks - 1:
porcentaje = ((i + 1) * 100) // total_chunks
print(f" 📊 Progreso: {porcentaje}% ({i+1}/{total_chunks} chunks)")
# Pequeña pausa para no saturar
await asyncio.sleep(0.005)
elapsed = time.time() - inicio
print(f"✅ Audio enviado en {elapsed:.2f} segundos")
return True
# ==================== GENERADORES DE AUDIO ====================
def generar_tono(frecuencia: int = 440, duracion_ms: int = 2000,
sample_rate: int = 16000, amplitud: int = 16000) -> bytes:
"""
Generar un tono seno en formato PCM
Args:
frecuencia: Frecuencia del tono en Hz
duracion_ms: Duración en milisegundos
sample_rate: Frecuencia de muestreo
amplitud: Amplitud máxima (0-32767)
"""
num_muestras = int(sample_rate * duracion_ms / 1000)
audio = bytearray()
for i in range(num_muestras):
valor = int(amplitud * math.sin(2 * math.pi * frecuencia * i / sample_rate))
audio.extend(struct.pack('<h', valor)) # little-endian, 16 bits
return bytes(audio)
def generar_melodia_bienvenida() -> bytes:
"""Generar melodía de bienvenida (Do-Re-Mi-Fa-Sol)"""
sample_rate = 16000
duracion_nota = 500 # ms por nota
notas = [261, 293, 329, 349, 392, 440] # Do, Re, Mi, Fa, Sol, La
audio = bytearray()
for nota in notas:
nota_audio = generar_tono(nota, duracion_nota, sample_rate, amplitud=12000)
audio.extend(nota_audio)
return bytes(audio)
def generar_saludo_personalizado(nombre: str) -> bytes:
"""
Generar un saludo personalizado (versión simple)
En un caso real, aquí usarías un servicio TTS
"""
# Usar frecuencia diferente según la longitud del nombre
frecuencia_base = 440
frecuencia = frecuencia_base + (len(nombre) * 10)
# Limitar frecuencia máxima
if frecuencia > 800:
frecuencia = 800
return generar_tono(frecuencia, duracion_ms=2000)
# ==================== MENÚ PRINCIPAL ====================
def mostrar_menu():
"""Muestra el menú principal"""
print("\n" + "=" * 50)
print(" 🎵 BOCINA INTELIGENTE - CONTROL")
print("=" * 50)
print("\n📋 Opciones disponibles:")
print(" 1. 🔊 Enviar tono de prueba (440Hz)")
print(" 2. 🎵 Enviar melodía de bienvenida")
print(" 3. 💬 Enviar saludo personalizado")
print(" 4. 📊 Ver estado del ESP32")
print(" 5. 🏓 Probar ping")
print(" 6. 🔇 Detener reproducción")
print(" 7. 🔄 Reconectar")
print(" 8. 🚪 Salir")
print("-" * 50)
# ==================== MODO INTERACTIVO ====================
async def modo_interactivo():
"""Modo interactivo con entrada de IP y nombre"""
# Limpiar pantalla
limpiar_pantalla()
print("\n" + "=" * 50)
print(" 🎵 BOCINA INTELIGENTE")
print("=" * 50)
# Solicitar IP
print("\n📡 Configuración de conexión:")
ip_default = "192.168.15.128"
ip = input(f" IP del ESP32 [{ip_default}]: ").strip()
if not ip:
ip = ip_default
# Solicitar nombre por defecto para saludos
nombre_default = "Visitante"
nombre = input(f" Nombre por defecto [{nombre_default}]: ").strip()
if not nombre:
nombre = nombre_default
# Crear instancia
bocina = BocinaInteligente(ip)
# Conectar
print("\n🔄 Conectando...")
if not await bocina.conectar():
print("\n❌ No se pudo conectar al ESP32")
print(" Verifica que:")
print(f" 1. La IP {ip} sea correcta")
print(" 2. El ESP32 esté encendido")
print(" 3. Estés en la misma red WiFi")
input("\n Presiona Enter para salir...")
return
print("\n✅ ¡Conectado exitosamente!")
print(f" 📡 IP: {ip}")
print(f" 👤 Nombre: {nombre}")
# Bucle principal
while True:
mostrar_menu()
opcion = input("\n👉 Selecciona una opción (1-8): ").strip()
if opcion == "1":
print("\n🔊 Enviando tono de prueba (440Hz)...")
audio = generar_tono(440, 2000)
await bocina.enviar_audio(audio, "Tono 440Hz")
elif opcion == "2":
print("\n🎵 Enviando melodía de bienvenida...")
audio = generar_melodia_bienvenida()
await bocina.enviar_audio(audio, "Melodía")
elif opcion == "3":
# Pedir nombre específico para este saludo
nombre_saludo = input(f" 👤 Nombre (Enter para usar '{nombre}'): ").strip()
if not nombre_saludo:
nombre_saludo = nombre
print(f"\n🔊 Generando saludo para '{nombre_saludo}'...")
audio = generar_saludo_personalizado(nombre_saludo)
await bocina.enviar_audio(audio, nombre_saludo)
elif opcion == "4":
print("\n📊 Obteniendo estado del ESP32...")
estado = await bocina.obtener_estado()
if estado:
print("\n 📡 Estado del sistema:")
print(f" Status: {estado.get('status', 'desconocido')}")
print(f" 📦 Bytes recibidos: {estado.get('bytes_recibidos', 0)}")
print(f" 🔢 Chunks recibidos: {estado.get('chunks_recibidos', 0)}")
print(f" 🎵 Audio activo: {'✅ Sí' if estado.get('audio_activo') else '❌ No'}")
print(f" 📶 WiFi RSSI: {estado.get('wifi_rssi', 0)} dBm")
else:
print(" ❌ No se pudo obtener estado")
elif opcion == "5":
print("\n🏓 Probando ping...")
inicio = time.time()
if await bocina.ping():
latencia = (time.time() - inicio) * 1000
print(f" ✅ PONG recibido (latencia: {latencia:.0f}ms)")
else:
print(" ❌ Sin respuesta - verifica la conexión")
elif opcion == "6":
print("\n🔇 Deteniendo reproducción...")
if await bocina.detener():
print(" ✅ Reproducción detenida")
else:
print(" ⚠️ No se pudo detener o ya estaba detenido")
elif opcion == "7":
print("\n🔄 Reconectando...")
await bocina.desconectar()
await asyncio.sleep(1)
if await bocina.conectar():
print(" ✅ Reconectado exitosamente")
else:
print(" ❌ Error al reconectar")
elif opcion == "8":
print("\n👋 Saliendo...")
break
else:
print("❌ Opción inválida")
# Pequeña pausa antes de volver al menú
await asyncio.sleep(0.5)
# Cerrar conexión
await bocina.desconectar()
print("\n✅ Programa finalizado")
# ==================== MODO RÁPIDO ====================
async def modo_rapido():
"""Modo rápido: pide IP y nombre y envía saludo inmediato"""
limpiar_pantalla()
print("\n" + "=" * 50)
print(" 🎵 BOCINA INTELIGENTE - MODO RÁPIDO")
print("=" * 50)
# Solicitar IP
print("\n📡 Configuración:")
ip_default = "192.168.15.128"
ip = input(f" IP del ESP32 [{ip_default}]: ").strip()
if not ip:
ip = ip_default
# Solicitar nombre
nombre = input(" 👤 Nombre de la persona: ").strip()
if not nombre:
nombre = "Visitante"
# Conectar y enviar
bocina = BocinaInteligente(ip)
print("\n🔄 Conectando...")
if not await bocina.conectar():
print("❌ No se pudo conectar")
input("\nPresiona Enter para salir...")
return
print(f"\n🔊 Enviando saludo para '{nombre}'...")
audio = generar_saludo_personalizado(nombre)
await bocina.enviar_audio(audio, nombre)
# Mostrar estado
await asyncio.sleep(1)
estado = await bocina.obtener_estado()
if estado:
print(f"\n📊 Enviados: {estado.get('bytes_recibidos', 0)} bytes")
await bocina.desconectar()
print("\n✅ Saludo enviado!")
input("\nPresiona Enter para salir...")
# ==================== MAIN ====================
async def main():
"""Función principal"""
limpiar_pantalla()
print("\n" + "=" * 50)
print(" 🎵 BOCINA INTELIGENTE v1.0")
print("=" * 50)
print("\nSelecciona modo de operación:")
print(" 1. 🎮 Modo interactivo (menú completo)")
print(" 2. ⚡ Modo rápido (solo enviar saludo)")
print(" 3. 🚪 Salir")
modo = input("\n👉 Opción (1-3): ").strip()
if modo == "1":
await modo_interactivo()
elif modo == "2":
await modo_rapido()
else:
print("\n👋 Hasta luego!")
return
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\n\n👋 Programa interrumpido")
except Exception as e:
print(f"\n❌ Error: {e}")
input("\nPresiona Enter para salir...")

Binary file not shown.

Before

Width:  |  Height:  |  Size: 4.7 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 3.0 KiB

View File

@ -10,6 +10,9 @@ from queue import Queue
from deepface import DeepFace
from ultralytics import YOLO
import warnings
import torch
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Usando dispositivo: {device}")
warnings.filterwarnings("ignore")
@ -295,7 +298,7 @@ def dibujar_track_fusion(frame_show, trk, global_mem):
def main():
print("\nIniciando Sistema")
model = YOLO("yolov8n.pt")
model = YOLO("yolov8n.pt").to("cuda")
global_mem = GlobalMemory()
managers = {str(c): CamManager(c, global_mem) for c in SECUENCIA}
cams = [CamStream(u) for u in URLS]
@ -303,7 +306,7 @@ def main():
for _ in range(2):
threading.Thread(target=worker_rostros, args=(global_mem,), daemon=True).start()
cv2.namedWindow("SmartSoft", cv2.WINDOW_AUTOSIZE)
cv2.namedWindow("SmartSoft", cv2.WINDOW_NORMAL)
idx = 0
while True:

View File

@ -14,6 +14,14 @@ import subprocess
from datetime import datetime
import warnings
import urllib.request
import torch
if torch.cuda.is_available():
device = "cuda"
print("GPU detectada → usando GPU 🚀")
else:
device = "cpu"
print("GPU no disponible → usando CPU ⚠️")
warnings.filterwarnings("ignore")
@ -229,8 +237,8 @@ def gestionar_vectores(actualizar=False):
res = DeepFace.represent(
img_path=img_mejorada,
model_name="ArcFace",
detector_backend="mtcnn",
align=True,
detector_backend="opencv",
align=False,
enforce_detection=True
)
emb = np.array(res[0]["embedding"], dtype=np.float32)

Binary file not shown.

After

Width:  |  Height:  |  Size: 357 KiB