From ede8d8dda85b2ba266c7a64a1bb708103516d71e Mon Sep 17 00:00:00 2001 From: ecuellar Date: Fri, 10 Apr 2026 13:24:14 -0600 Subject: [PATCH] =?UTF-8?q?Estructura=20core,=20configuraci=C3=B3n=20de=20?= =?UTF-8?q?bocina=20over=20ip=20e=20integraci=C3=B3n=20de=20.gitignore?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 6 +- config/speaker_iot/settings.ini | 13 + configurar_bocina.py | 78 ++++++ core/speaker_iot/__init__.py | 21 ++ core/speaker_iot/bocina_core.py | 224 +++++++++++++++++ core/speaker_iot/config.py | 108 +++++++++ core/speaker_iot/ejemplo.py | 60 +++++ core/speaker_iot/settings.ini | 0 core/speaker_iot/test_esp32.py | 409 ++++++++++++++++++++++++++++++++ 9 files changed, 918 insertions(+), 1 deletion(-) create mode 100644 config/speaker_iot/settings.ini create mode 100644 configurar_bocina.py create mode 100644 core/speaker_iot/__init__.py create mode 100644 core/speaker_iot/bocina_core.py create mode 100644 core/speaker_iot/config.py create mode 100644 core/speaker_iot/ejemplo.py create mode 100644 core/speaker_iot/settings.ini create mode 100644 core/speaker_iot/test_esp32.py diff --git a/.gitignore b/.gitignore index d214f2e..0e2db4e 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,10 @@ __pycache__/ *.py[cod] *$py.class +ven2/ +.venv/ +__pycache__/ +*.pkl # C extensions *.so @@ -171,4 +175,4 @@ ia_env/ # ──────────────────────────────────────────────────────── *.pt - *.onnx \ No newline at end of file + *.onnx diff --git a/config/speaker_iot/settings.ini b/config/speaker_iot/settings.ini new file mode 100644 index 0000000..4196b2d --- /dev/null +++ b/config/speaker_iot/settings.ini @@ -0,0 +1,13 @@ +[ESP32] +ip = 192.168.15.128 +puerto = 81 + +[Audio] +duracion_ms = 2000 +tono_base = 440 +amplitud = 16000 + +[General] +timeout = 5 +reconectar = true + diff --git a/configurar_bocina.py b/configurar_bocina.py new file mode 100644 index 0000000..2c5842f --- /dev/null +++ b/configurar_bocina.py @@ -0,0 +1,78 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Script para configurar la bocina ESP32 +Permite cambiar IP, puerto, duración, etc. +""" + +import sys +import os + +# Agregar el proyecto al path +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from core.speaker_iot import configurar_ip, mostrar_configuracion +from core.speaker_iot.config import config + +def main(): + print("\n" + "=" * 50) + print(" 🎵 CONFIGURACIÓN DE BOCINA IoT") + print("=" * 50) + + mostrar_configuracion() + + print("\n" + "-" * 50) + print("¿Qué deseas configurar?") + print(" 1. Cambiar IP") + print(" 2. Cambiar puerto") + print(" 3. Cambiar duración del audio") + print(" 4. Ver configuración actual") + print(" 5. Restaurar valores por defecto") + print(" 6. Salir") + + opcion = input("\n👉 Opción (1-6): ").strip() + + if opcion == "1": + nueva_ip = input("📡 Nueva IP: ").strip() + if nueva_ip: + config.actualizar_ip(nueva_ip) + print(f"✅ IP actualizada a: {nueva_ip}") + + elif opcion == "2": + nuevo_puerto = input("🔌 Nuevo puerto [81]: ").strip() + if nuevo_puerto: + config.config.set("ESP32", "puerto", nuevo_puerto) + with open(config.CONFIG_FILE, 'w') as f: + config.config.write(f) + print(f"✅ Puerto actualizado a: {nuevo_puerto}") + + elif opcion == "3": + nueva_duracion = input("⏱️ Nueva duración en ms [2000]: ").strip() + if nueva_duracion: + config.config.set("Audio", "duracion_ms", nueva_duracion) + with open(config.CONFIG_FILE, 'w') as f: + config.config.write(f) + print(f"✅ Duración actualizada a: {nueva_duracion}ms") + + elif opcion == "4": + mostrar_configuracion() + + elif opcion == "5": + confirmar = input("⚠️ ¿Restaurar configuración por defecto? (s/n): ").strip().lower() + if confirmar == 's': + config._crear_configuracion_default() + print("✅ Configuración restaurada") + mostrar_configuracion() + + elif opcion == "6": + print("\n👋 Hasta luego!") + return + + else: + print("❌ Opción inválida") + + print("\n✅ Configuración guardada!") + input("\nPresiona Enter para salir...") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/core/speaker_iot/__init__.py b/core/speaker_iot/__init__.py new file mode 100644 index 0000000..2fc1e11 --- /dev/null +++ b/core/speaker_iot/__init__.py @@ -0,0 +1,21 @@ +""" +Speaker IoT - Módulo para controlar bocina ESP32 +""" + +from .bocina_core import ( + BocinaCore, + saludar, + detener, + obtener_estado, + configurar_ip, + mostrar_configuracion +) + +__all__ = [ + 'BocinaCore', + 'saludar', + 'detener', + 'obtener_estado', + 'configurar_ip', + 'mostrar_configuracion' +] \ No newline at end of file diff --git a/core/speaker_iot/bocina_core.py b/core/speaker_iot/bocina_core.py new file mode 100644 index 0000000..a915dce --- /dev/null +++ b/core/speaker_iot/bocina_core.py @@ -0,0 +1,224 @@ +""" +BOCINA CORE - Módulo para controlar la bocina ESP32 +""" + +import asyncio +import websockets +import json +import struct +import math +from typing import Optional, Dict, Any + +from .config import config + +# ==================== CONSTANTES ==================== +CHUNK_SIZE = 1024 +SAMPLE_RATE = 16000 + + +# ==================== CLASE PRINCIPAL ==================== +class BocinaCore: + """Clase principal para controlar la bocina ESP32""" + + def __init__(self, ip: str = None, puerto: int = None): + """ + Inicializa el controlador de la bocina + + Args: + ip: IP del ESP32 (si es None, usa la del archivo de configuración) + puerto: Puerto WebSocket (si es None, usa el del archivo) + """ + self.ip = ip or config.obtener_ip() + self.puerto = puerto or config.obtener_puerto() + self.url = f"ws://{self.ip}:{self.puerto}" + self.duracion_ms = config.obtener_duracion() + self.tono_base = config.obtener_tono_base() + self.amplitud = config.obtener_amplitud() + self.timeout = config.obtener_timeout() + + self._websocket = None + self._conectado = False + + # ==================== MÉTODOS PÚBLICOS ==================== + + def saludar(self, nombre: str, duracion_ms: int = None, tono_personalizado: bool = True) -> bool: + """ + Envía un saludo a la bocina + + Args: + nombre: Nombre de la persona + duracion_ms: Duración del saludo (None = usa config) + tono_personalizado: Si True, varía el tono según el nombre + """ + duracion = duracion_ms or self.duracion_ms + return self._ejecutar_async(self._saludar_async(nombre, duracion, tono_personalizado)) + + def detener(self) -> bool: + """Detiene la reproducción actual""" + return self._ejecutar_async(self._detener_async()) + + def estado(self) -> Dict[str, Any]: + """Obtiene el estado actual de la bocina""" + return self._ejecutar_async(self._estado_async()) + + def ping(self) -> bool: + """Prueba la conexión con la bocina""" + return self._ejecutar_async(self._ping_async()) + + def conectar(self) -> bool: + """Establece conexión manual con la bocina""" + return self._ejecutar_async(self._conectar_async()) + + def desconectar(self) -> bool: + """Cierra la conexión con la bocina""" + return self._ejecutar_async(self._desconectar_async()) + + # ==================== MÉTODOS INTERNOS ==================== + + def _ejecutar_async(self, corutina): + """Ejecuta una función asíncrona desde código síncrono""" + try: + loop = asyncio.get_event_loop() + if loop.is_running(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + resultado = loop.run_until_complete(corutina) + loop.close() + return resultado + else: + return loop.run_until_complete(corutina) + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + resultado = loop.run_until_complete(corutina) + loop.close() + return resultado + + async def _conectar_async(self) -> bool: + """Conexión asíncrona""" + try: + self._websocket = await websockets.connect(self.url, timeout=self.timeout) + await self._websocket.recv() + self._conectado = True + return True + except Exception: + self._conectado = False + return False + + async def _desconectar_async(self) -> bool: + """Desconexión asíncrona""" + if self._websocket: + await self._websocket.close() + self._websocket = None + self._conectado = False + return True + + async def _asegurar_conexion(self) -> bool: + """Asegura que haya una conexión activa""" + if not self._conectado or not self._websocket: + return await self._conectar_async() + return True + + async def _saludar_async(self, nombre: str, duracion_ms: int, tono_personalizado: bool) -> bool: + """Enviar saludo asíncrono""" + if not await self._asegurar_conexion(): + return False + + try: + audio = self._generar_audio(nombre, duracion_ms, tono_personalizado) + + for i in range(0, len(audio), CHUNK_SIZE): + chunk = audio[i:i + CHUNK_SIZE] + await self._websocket.send(chunk) + await asyncio.sleep(0.005) + + return True + except Exception: + return False + + async def _detener_async(self) -> bool: + """Detener reproducción asíncrono""" + if not await self._asegurar_conexion(): + return False + + try: + await self._websocket.send(json.dumps({"cmd": "STOP"})) + await asyncio.wait_for(self._websocket.recv(), timeout=self.timeout) + return True + except Exception: + return False + + async def _estado_async(self) -> Dict[str, Any]: + """Obtener estado asíncrono""" + if not await self._asegurar_conexion(): + return {} + + try: + await self._websocket.send(json.dumps({"cmd": "STATUS"})) + respuesta = await asyncio.wait_for(self._websocket.recv(), timeout=self.timeout) + return json.loads(respuesta) + except Exception: + return {} + + async def _ping_async(self) -> bool: + """Ping asíncrono""" + if not await self._asegurar_conexion(): + return False + + try: + await self._websocket.send(json.dumps({"cmd": "PING"})) + respuesta = await asyncio.wait_for(self._websocket.recv(), timeout=self.timeout) + data = json.loads(respuesta) + return data.get("status") == "ok" + except Exception: + return False + + def _generar_audio(self, nombre: str, duracion_ms: int, tono_personalizado: bool) -> bytes: + """Genera audio PCM para el saludo""" + num_muestras = int(SAMPLE_RATE * duracion_ms / 1000) + + if tono_personalizado: + frecuencia = self.tono_base + (len(nombre) * 10) + if frecuencia > 800: + frecuencia = 800 + else: + frecuencia = self.tono_base + + audio = bytearray() + for i in range(num_muestras): + valor = int(self.amplitud * math.sin(2 * math.pi * frecuencia * i / SAMPLE_RATE)) + audio.extend(struct.pack(' bool: + """Función rápida para saludar a una persona""" + bocina = BocinaCore(ip) + return bocina.saludar(nombre) + + +def detener(ip: str = None) -> bool: + """Detiene la reproducción""" + bocina = BocinaCore(ip) + return bocina.detener() + + +def obtener_estado(ip: str = None) -> dict: + """Obtiene el estado de la bocina""" + bocina = BocinaCore(ip) + return bocina.estado() + + +def configurar_ip(nueva_ip: str): + """Actualiza la IP en el archivo de configuración""" + from .config import config + config.actualizar_ip(nueva_ip) + + +def mostrar_configuracion(): + """Muestra la configuración actual""" + from .config import config + config.mostrar_configuracion() \ No newline at end of file diff --git a/core/speaker_iot/config.py b/core/speaker_iot/config.py new file mode 100644 index 0000000..569cee7 --- /dev/null +++ b/core/speaker_iot/config.py @@ -0,0 +1,108 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Módulo de configuración para la bocina ESP32 +""" + +import os +import configparser +from pathlib import Path + +# ==================== RUTAS ==================== +BASE_DIR = Path(__file__).resolve().parent.parent.parent +CONFIG_DIR = BASE_DIR / "config" / "speaker_iot" +CONFIG_FILE = CONFIG_DIR / "settings.ini" + +# ==================== CONFIGURACIÓN POR DEFECTO ==================== +DEFAULT_CONFIG = { + "ESP32": { + "ip": "192.168.15.128", + "puerto": "81" + }, + "Audio": { + "duracion_ms": "2000", + "tono_base": "440", + "amplitud": "16000" + }, + "General": { + "timeout": "5", + "reconectar": "true" + } +} + + +class ConfiguracionBocina: + """Gestor de configuración para la bocina""" + + def __init__(self): + self.config = configparser.ConfigParser() + self._cargar_configuracion() + + def _cargar_configuracion(self): + """Carga la configuración desde el archivo o crea uno por defecto""" + if CONFIG_FILE.exists(): + self.config.read(CONFIG_FILE, encoding='utf-8') + else: + self._crear_configuracion_default() + + def _crear_configuracion_default(self): + """Crea archivo de configuración por defecto""" + # Crear directorio si no existe + CONFIG_DIR.mkdir(parents=True, exist_ok=True) + + # Cargar valores por defecto + for seccion, valores in DEFAULT_CONFIG.items(): + self.config[seccion] = valores + + # Guardar archivo + with open(CONFIG_FILE, 'w', encoding='utf-8') as f: + self.config.write(f) + + print(f"📝 Configuración creada en: {CONFIG_FILE}") + + def obtener_ip(self) -> str: + """Obtiene la IP de la bocina""" + return self.config.get("ESP32", "ip", fallback="192.168.15.128") + + def obtener_puerto(self) -> int: + """Obtiene el puerto de la bocina""" + return self.config.getint("ESP32", "puerto", fallback=81) + + def obtener_duracion(self) -> int: + """Obtiene duración del audio en ms""" + return self.config.getint("Audio", "duracion_ms", fallback=2000) + + def obtener_tono_base(self) -> int: + """Obtiene frecuencia base del tono""" + return self.config.getint("Audio", "tono_base", fallback=440) + + def obtener_amplitud(self) -> int: + """Obtiene amplitud del audio""" + return self.config.getint("Audio", "amplitud", fallback=16000) + + def obtener_timeout(self) -> int: + """Obtiene timeout en segundos""" + return self.config.getint("General", "timeout", fallback=5) + + def reconectar_auto(self) -> bool: + """Obtiene si debe reconectar automáticamente""" + return self.config.getboolean("General", "reconectar", fallback=True) + + def actualizar_ip(self, nueva_ip: str): + """Actualiza la IP de la bocina""" + self.config.set("ESP32", "ip", nueva_ip) + with open(CONFIG_FILE, 'w', encoding='utf-8') as f: + self.config.write(f) + print(f"✅ IP actualizada a: {nueva_ip}") + + def mostrar_configuracion(self): + """Muestra la configuración actual""" + print("\n📡 Configuración actual:") + print(f" IP: {self.obtener_ip()}:{self.obtener_puerto()}") + print(f" Duración: {self.obtener_duracion()}ms") + print(f" Tono base: {self.obtener_tono_base()}Hz") + print(f" Timeout: {self.obtener_timeout()}s") + + +# Instancia global +config = ConfiguracionBocina() \ No newline at end of file diff --git a/core/speaker_iot/ejemplo.py b/core/speaker_iot/ejemplo.py new file mode 100644 index 0000000..258ae36 --- /dev/null +++ b/core/speaker_iot/ejemplo.py @@ -0,0 +1,60 @@ + +# Solo importas lo que necesitas +from core.speaker_iot import saludar, detener, obtener_estado + +# ===== EJEMPLO 1: Cuando detectas una persona ===== +def mi_detector(): + nombre = "Ana" # Tu IA obtiene el nombre + + # Enviar saludo (¡una sola línea!) + saludar(nombre) + + # También puedes verificar si funcionó + if saludar(nombre): + print(f"✅ Saludo enviado a {nombre}") + else: + print(f"❌ Error al enviar saludo a {nombre}") + +# ===== EJEMPLO 2: Dentro de tu loop principal ===== +while True: + persona = detectar_persona() # Tu función de detección + + if persona: + nombre = obtener_nombre(persona) # Tu base de datos + saludar(nombre) # Envía el saludo + +# ===== EJEMPLO 3: Clase completa ===== +class MiSistemaIA: + def __init__(self): + self.bocina_ip = "192.168.15.128" # O usa la del config + + def on_persona_detectada(self, persona): + nombre = self.obtener_nombre(persona) + if nombre: + print(f"🎉 Detectada: {nombre}") + saludar(nombre) # ¡Así de simple! + + def obtener_nombre(self, persona): + # Tu lógica para obtener nombre + return persona.get("nombre", "Visitante") + +# ===== Ejemplo completo de integración ===== + +class SistemaSeguridad: + def __init__(self): + self.personas_conocidas = ["Ana", "Carlos", "Maria"] + print("✅ Sistema iniciado - Bocina lista") + + def detectar(self, nombre): + if nombre in self.personas_conocidas: + print(f"🔔 ¡Bienvenido {nombre}!") + saludar(nombre) # Envía saludo + return True + else: + print(f"⚠️ Persona no registrada: {nombre}") + return False + +# Uso +sistema = SistemaSeguridad() +sistema.detectar("Ana") # Reproduce sonido +sistema.detectar("Luis") # No reproduce \ No newline at end of file diff --git a/core/speaker_iot/settings.ini b/core/speaker_iot/settings.ini new file mode 100644 index 0000000..e69de29 diff --git a/core/speaker_iot/test_esp32.py b/core/speaker_iot/test_esp32.py new file mode 100644 index 0000000..3f73b7b --- /dev/null +++ b/core/speaker_iot/test_esp32.py @@ -0,0 +1,409 @@ + +""" +BOCINA INTELIGENTE - CLIENTE PYTHON +==================================== +Cliente para enviar audio y comandos a la bocina ESP32 via WebSocket + +Uso: + python bocina_client.py + (Luego ingresa la IP y el nombre cuando se solicite) +""" + +import asyncio +import websockets +import json +import struct +import math +import sys +import time +import os +from typing import Optional + +# ==================== LIMPIAR PANTALLA ==================== +def limpiar_pantalla(): + """Limpia la consola según el sistema operativo""" + os.system('cls' if os.name == 'nt' else 'clear') + +# ==================== CLASE BOCINA ==================== +class BocinaInteligente: + """Cliente para controlar la bocina inteligente ESP32""" + + def __init__(self, ip: str, puerto: int = 81): + self.ip = ip + self.puerto = puerto + self.url = f"ws://{ip}:{puerto}" + self.websocket: Optional[websockets.WebSocketClientProtocol] = None + self.chunk_size = 1024 + self.timeout = 5 + self.conectado = False + + async def conectar(self) -> bool: + """Conectar al ESP32""" + try: + print(f"🔌 Conectando a {self.url}...") + self.websocket = await websockets.connect(self.url) + + # Esperar mensaje de bienvenida + response = await asyncio.wait_for(self.websocket.recv(), timeout=self.timeout) + data = json.loads(response) + + if data.get("status") == "ok": + print(f" ✅ {data.get('msg')}") + self.conectado = True + return True + + except Exception as e: + print(f" ❌ Error: {e}") + return False + + return False + + async def desconectar(self): + """Cerrar conexión""" + if self.websocket: + await self.websocket.close() + self.conectado = False + print("🔌 Conexión cerrada") + + async def ping(self) -> bool: + """Probar conexión con el ESP32""" + if not self.websocket: + return False + + try: + await self.websocket.send(json.dumps({"cmd": "PING"})) + response = await asyncio.wait_for(self.websocket.recv(), timeout=self.timeout) + data = json.loads(response) + return data.get("status") == "ok" + except: + return False + + async def obtener_estado(self) -> dict: + """Obtener estadísticas del ESP32""" + if not self.websocket: + return {} + + try: + await self.websocket.send(json.dumps({"cmd": "STATUS"})) + response = await asyncio.wait_for(self.websocket.recv(), timeout=self.timeout) + return json.loads(response) + except: + return {} + + async def detener(self) -> bool: + """Detener reproducción""" + if not self.websocket: + return False + + try: + await self.websocket.send(json.dumps({"cmd": "STOP"})) + response = await asyncio.wait_for(self.websocket.recv(), timeout=self.timeout) + data = json.loads(response) + return data.get("status") == "ok" + except: + return False + + async def enviar_audio(self, audio_data: bytes, nombre: str = "audio") -> bool: + """ + Enviar audio al ESP32 + + Args: + audio_data: Datos de audio en formato PCM (16kHz, 16bits, mono) + nombre: Nombre identificador (para logs) + """ + if not self.websocket: + print("❌ No hay conexión") + return False + + total_chunks = (len(audio_data) + self.chunk_size - 1) // self.chunk_size + print(f"📤 Enviando {total_chunks} chunks ({len(audio_data)} bytes) para '{nombre}'") + + inicio = time.time() + + for i, chunk_start in enumerate(range(0, len(audio_data), self.chunk_size)): + chunk = audio_data[chunk_start:chunk_start + self.chunk_size] + await self.websocket.send(chunk) + + # Mostrar progreso cada 10 chunks o al final + if (i + 1) % 10 == 0 or i == total_chunks - 1: + porcentaje = ((i + 1) * 100) // total_chunks + print(f" 📊 Progreso: {porcentaje}% ({i+1}/{total_chunks} chunks)") + + # Pequeña pausa para no saturar + await asyncio.sleep(0.005) + + elapsed = time.time() - inicio + print(f"✅ Audio enviado en {elapsed:.2f} segundos") + return True + + +# ==================== GENERADORES DE AUDIO ==================== +def generar_tono(frecuencia: int = 440, duracion_ms: int = 2000, + sample_rate: int = 16000, amplitud: int = 16000) -> bytes: + """ + Generar un tono seno en formato PCM + + Args: + frecuencia: Frecuencia del tono en Hz + duracion_ms: Duración en milisegundos + sample_rate: Frecuencia de muestreo + amplitud: Amplitud máxima (0-32767) + """ + num_muestras = int(sample_rate * duracion_ms / 1000) + audio = bytearray() + + for i in range(num_muestras): + valor = int(amplitud * math.sin(2 * math.pi * frecuencia * i / sample_rate)) + audio.extend(struct.pack(' bytes: + """Generar melodía de bienvenida (Do-Re-Mi-Fa-Sol)""" + sample_rate = 16000 + duracion_nota = 500 # ms por nota + notas = [261, 293, 329, 349, 392, 440] # Do, Re, Mi, Fa, Sol, La + audio = bytearray() + + for nota in notas: + nota_audio = generar_tono(nota, duracion_nota, sample_rate, amplitud=12000) + audio.extend(nota_audio) + + return bytes(audio) + + +def generar_saludo_personalizado(nombre: str) -> bytes: + """ + Generar un saludo personalizado (versión simple) + En un caso real, aquí usarías un servicio TTS + """ + # Usar frecuencia diferente según la longitud del nombre + frecuencia_base = 440 + frecuencia = frecuencia_base + (len(nombre) * 10) + # Limitar frecuencia máxima + if frecuencia > 800: + frecuencia = 800 + return generar_tono(frecuencia, duracion_ms=2000) + + +# ==================== MENÚ PRINCIPAL ==================== +def mostrar_menu(): + """Muestra el menú principal""" + print("\n" + "=" * 50) + print(" 🎵 BOCINA INTELIGENTE - CONTROL") + print("=" * 50) + print("\n📋 Opciones disponibles:") + print(" 1. 🔊 Enviar tono de prueba (440Hz)") + print(" 2. 🎵 Enviar melodía de bienvenida") + print(" 3. 💬 Enviar saludo personalizado") + print(" 4. 📊 Ver estado del ESP32") + print(" 5. 🏓 Probar ping") + print(" 6. 🔇 Detener reproducción") + print(" 7. 🔄 Reconectar") + print(" 8. 🚪 Salir") + print("-" * 50) + + +# ==================== MODO INTERACTIVO ==================== +async def modo_interactivo(): + """Modo interactivo con entrada de IP y nombre""" + + # Limpiar pantalla + limpiar_pantalla() + + print("\n" + "=" * 50) + print(" 🎵 BOCINA INTELIGENTE") + print("=" * 50) + + # Solicitar IP + print("\n📡 Configuración de conexión:") + ip_default = "192.168.15.128" + ip = input(f" IP del ESP32 [{ip_default}]: ").strip() + if not ip: + ip = ip_default + + # Solicitar nombre por defecto para saludos + nombre_default = "Visitante" + nombre = input(f" Nombre por defecto [{nombre_default}]: ").strip() + if not nombre: + nombre = nombre_default + + # Crear instancia + bocina = BocinaInteligente(ip) + + # Conectar + print("\n🔄 Conectando...") + if not await bocina.conectar(): + print("\n❌ No se pudo conectar al ESP32") + print(" Verifica que:") + print(f" 1. La IP {ip} sea correcta") + print(" 2. El ESP32 esté encendido") + print(" 3. Estés en la misma red WiFi") + input("\n Presiona Enter para salir...") + return + + print("\n✅ ¡Conectado exitosamente!") + print(f" 📡 IP: {ip}") + print(f" 👤 Nombre: {nombre}") + + # Bucle principal + while True: + mostrar_menu() + + opcion = input("\n👉 Selecciona una opción (1-8): ").strip() + + if opcion == "1": + print("\n🔊 Enviando tono de prueba (440Hz)...") + audio = generar_tono(440, 2000) + await bocina.enviar_audio(audio, "Tono 440Hz") + + elif opcion == "2": + print("\n🎵 Enviando melodía de bienvenida...") + audio = generar_melodia_bienvenida() + await bocina.enviar_audio(audio, "Melodía") + + elif opcion == "3": + # Pedir nombre específico para este saludo + nombre_saludo = input(f" 👤 Nombre (Enter para usar '{nombre}'): ").strip() + if not nombre_saludo: + nombre_saludo = nombre + + print(f"\n🔊 Generando saludo para '{nombre_saludo}'...") + audio = generar_saludo_personalizado(nombre_saludo) + await bocina.enviar_audio(audio, nombre_saludo) + + elif opcion == "4": + print("\n📊 Obteniendo estado del ESP32...") + estado = await bocina.obtener_estado() + if estado: + print("\n 📡 Estado del sistema:") + print(f" Status: {estado.get('status', 'desconocido')}") + print(f" 📦 Bytes recibidos: {estado.get('bytes_recibidos', 0)}") + print(f" 🔢 Chunks recibidos: {estado.get('chunks_recibidos', 0)}") + print(f" 🎵 Audio activo: {'✅ Sí' if estado.get('audio_activo') else '❌ No'}") + print(f" 📶 WiFi RSSI: {estado.get('wifi_rssi', 0)} dBm") + else: + print(" ❌ No se pudo obtener estado") + + elif opcion == "5": + print("\n🏓 Probando ping...") + inicio = time.time() + if await bocina.ping(): + latencia = (time.time() - inicio) * 1000 + print(f" ✅ PONG recibido (latencia: {latencia:.0f}ms)") + else: + print(" ❌ Sin respuesta - verifica la conexión") + + elif opcion == "6": + print("\n🔇 Deteniendo reproducción...") + if await bocina.detener(): + print(" ✅ Reproducción detenida") + else: + print(" ⚠️ No se pudo detener o ya estaba detenido") + + elif opcion == "7": + print("\n🔄 Reconectando...") + await bocina.desconectar() + await asyncio.sleep(1) + if await bocina.conectar(): + print(" ✅ Reconectado exitosamente") + else: + print(" ❌ Error al reconectar") + + elif opcion == "8": + print("\n👋 Saliendo...") + break + + else: + print("❌ Opción inválida") + + # Pequeña pausa antes de volver al menú + await asyncio.sleep(0.5) + + # Cerrar conexión + await bocina.desconectar() + print("\n✅ Programa finalizado") + + +# ==================== MODO RÁPIDO ==================== +async def modo_rapido(): + """Modo rápido: pide IP y nombre y envía saludo inmediato""" + + limpiar_pantalla() + + print("\n" + "=" * 50) + print(" 🎵 BOCINA INTELIGENTE - MODO RÁPIDO") + print("=" * 50) + + # Solicitar IP + print("\n📡 Configuración:") + ip_default = "192.168.15.128" + ip = input(f" IP del ESP32 [{ip_default}]: ").strip() + if not ip: + ip = ip_default + + # Solicitar nombre + nombre = input(" 👤 Nombre de la persona: ").strip() + if not nombre: + nombre = "Visitante" + + # Conectar y enviar + bocina = BocinaInteligente(ip) + + print("\n🔄 Conectando...") + if not await bocina.conectar(): + print("❌ No se pudo conectar") + input("\nPresiona Enter para salir...") + return + + print(f"\n🔊 Enviando saludo para '{nombre}'...") + audio = generar_saludo_personalizado(nombre) + await bocina.enviar_audio(audio, nombre) + + # Mostrar estado + await asyncio.sleep(1) + estado = await bocina.obtener_estado() + if estado: + print(f"\n📊 Enviados: {estado.get('bytes_recibidos', 0)} bytes") + + await bocina.desconectar() + + print("\n✅ Saludo enviado!") + input("\nPresiona Enter para salir...") + + +# ==================== MAIN ==================== +async def main(): + """Función principal""" + + limpiar_pantalla() + + print("\n" + "=" * 50) + print(" 🎵 BOCINA INTELIGENTE v1.0") + print("=" * 50) + + print("\nSelecciona modo de operación:") + print(" 1. 🎮 Modo interactivo (menú completo)") + print(" 2. ⚡ Modo rápido (solo enviar saludo)") + print(" 3. 🚪 Salir") + + modo = input("\n👉 Opción (1-3): ").strip() + + if modo == "1": + await modo_interactivo() + elif modo == "2": + await modo_rapido() + else: + print("\n👋 Hasta luego!") + return + + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("\n\n👋 Programa interrumpido") + except Exception as e: + print(f"\n❌ Error: {e}") + input("\nPresiona Enter para salir...") \ No newline at end of file