409 lines
13 KiB
Python
409 lines
13 KiB
Python
|
|
|
||
|
|
"""
|
||
|
|
BOCINA INTELIGENTE - CLIENTE PYTHON
|
||
|
|
====================================
|
||
|
|
Cliente para enviar audio y comandos a la bocina ESP32 via WebSocket
|
||
|
|
|
||
|
|
Uso:
|
||
|
|
python bocina_client.py
|
||
|
|
(Luego ingresa la IP y el nombre cuando se solicite)
|
||
|
|
"""
|
||
|
|
|
||
|
|
import asyncio
|
||
|
|
import websockets
|
||
|
|
import json
|
||
|
|
import struct
|
||
|
|
import math
|
||
|
|
import sys
|
||
|
|
import time
|
||
|
|
import os
|
||
|
|
from typing import Optional
|
||
|
|
|
||
|
|
# ==================== LIMPIAR PANTALLA ====================
|
||
|
|
def limpiar_pantalla():
|
||
|
|
"""Limpia la consola según el sistema operativo"""
|
||
|
|
os.system('cls' if os.name == 'nt' else 'clear')
|
||
|
|
|
||
|
|
# ==================== CLASE BOCINA ====================
|
||
|
|
class BocinaInteligente:
|
||
|
|
"""Cliente para controlar la bocina inteligente ESP32"""
|
||
|
|
|
||
|
|
def __init__(self, ip: str, puerto: int = 81):
|
||
|
|
self.ip = ip
|
||
|
|
self.puerto = puerto
|
||
|
|
self.url = f"ws://{ip}:{puerto}"
|
||
|
|
self.websocket: Optional[websockets.WebSocketClientProtocol] = None
|
||
|
|
self.chunk_size = 1024
|
||
|
|
self.timeout = 5
|
||
|
|
self.conectado = False
|
||
|
|
|
||
|
|
async def conectar(self) -> bool:
|
||
|
|
"""Conectar al ESP32"""
|
||
|
|
try:
|
||
|
|
print(f"🔌 Conectando a {self.url}...")
|
||
|
|
self.websocket = await websockets.connect(self.url)
|
||
|
|
|
||
|
|
# Esperar mensaje de bienvenida
|
||
|
|
response = await asyncio.wait_for(self.websocket.recv(), timeout=self.timeout)
|
||
|
|
data = json.loads(response)
|
||
|
|
|
||
|
|
if data.get("status") == "ok":
|
||
|
|
print(f" ✅ {data.get('msg')}")
|
||
|
|
self.conectado = True
|
||
|
|
return True
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
print(f" ❌ Error: {e}")
|
||
|
|
return False
|
||
|
|
|
||
|
|
return False
|
||
|
|
|
||
|
|
async def desconectar(self):
|
||
|
|
"""Cerrar conexión"""
|
||
|
|
if self.websocket:
|
||
|
|
await self.websocket.close()
|
||
|
|
self.conectado = False
|
||
|
|
print("🔌 Conexión cerrada")
|
||
|
|
|
||
|
|
async def ping(self) -> bool:
|
||
|
|
"""Probar conexión con el ESP32"""
|
||
|
|
if not self.websocket:
|
||
|
|
return False
|
||
|
|
|
||
|
|
try:
|
||
|
|
await self.websocket.send(json.dumps({"cmd": "PING"}))
|
||
|
|
response = await asyncio.wait_for(self.websocket.recv(), timeout=self.timeout)
|
||
|
|
data = json.loads(response)
|
||
|
|
return data.get("status") == "ok"
|
||
|
|
except:
|
||
|
|
return False
|
||
|
|
|
||
|
|
async def obtener_estado(self) -> dict:
|
||
|
|
"""Obtener estadísticas del ESP32"""
|
||
|
|
if not self.websocket:
|
||
|
|
return {}
|
||
|
|
|
||
|
|
try:
|
||
|
|
await self.websocket.send(json.dumps({"cmd": "STATUS"}))
|
||
|
|
response = await asyncio.wait_for(self.websocket.recv(), timeout=self.timeout)
|
||
|
|
return json.loads(response)
|
||
|
|
except:
|
||
|
|
return {}
|
||
|
|
|
||
|
|
async def detener(self) -> bool:
|
||
|
|
"""Detener reproducción"""
|
||
|
|
if not self.websocket:
|
||
|
|
return False
|
||
|
|
|
||
|
|
try:
|
||
|
|
await self.websocket.send(json.dumps({"cmd": "STOP"}))
|
||
|
|
response = await asyncio.wait_for(self.websocket.recv(), timeout=self.timeout)
|
||
|
|
data = json.loads(response)
|
||
|
|
return data.get("status") == "ok"
|
||
|
|
except:
|
||
|
|
return False
|
||
|
|
|
||
|
|
async def enviar_audio(self, audio_data: bytes, nombre: str = "audio") -> bool:
|
||
|
|
"""
|
||
|
|
Enviar audio al ESP32
|
||
|
|
|
||
|
|
Args:
|
||
|
|
audio_data: Datos de audio en formato PCM (16kHz, 16bits, mono)
|
||
|
|
nombre: Nombre identificador (para logs)
|
||
|
|
"""
|
||
|
|
if not self.websocket:
|
||
|
|
print("❌ No hay conexión")
|
||
|
|
return False
|
||
|
|
|
||
|
|
total_chunks = (len(audio_data) + self.chunk_size - 1) // self.chunk_size
|
||
|
|
print(f"📤 Enviando {total_chunks} chunks ({len(audio_data)} bytes) para '{nombre}'")
|
||
|
|
|
||
|
|
inicio = time.time()
|
||
|
|
|
||
|
|
for i, chunk_start in enumerate(range(0, len(audio_data), self.chunk_size)):
|
||
|
|
chunk = audio_data[chunk_start:chunk_start + self.chunk_size]
|
||
|
|
await self.websocket.send(chunk)
|
||
|
|
|
||
|
|
# Mostrar progreso cada 10 chunks o al final
|
||
|
|
if (i + 1) % 10 == 0 or i == total_chunks - 1:
|
||
|
|
porcentaje = ((i + 1) * 100) // total_chunks
|
||
|
|
print(f" 📊 Progreso: {porcentaje}% ({i+1}/{total_chunks} chunks)")
|
||
|
|
|
||
|
|
# Pequeña pausa para no saturar
|
||
|
|
await asyncio.sleep(0.005)
|
||
|
|
|
||
|
|
elapsed = time.time() - inicio
|
||
|
|
print(f"✅ Audio enviado en {elapsed:.2f} segundos")
|
||
|
|
return True
|
||
|
|
|
||
|
|
|
||
|
|
# ==================== GENERADORES DE AUDIO ====================
|
||
|
|
def generar_tono(frecuencia: int = 440, duracion_ms: int = 2000,
|
||
|
|
sample_rate: int = 16000, amplitud: int = 16000) -> bytes:
|
||
|
|
"""
|
||
|
|
Generar un tono seno en formato PCM
|
||
|
|
|
||
|
|
Args:
|
||
|
|
frecuencia: Frecuencia del tono en Hz
|
||
|
|
duracion_ms: Duración en milisegundos
|
||
|
|
sample_rate: Frecuencia de muestreo
|
||
|
|
amplitud: Amplitud máxima (0-32767)
|
||
|
|
"""
|
||
|
|
num_muestras = int(sample_rate * duracion_ms / 1000)
|
||
|
|
audio = bytearray()
|
||
|
|
|
||
|
|
for i in range(num_muestras):
|
||
|
|
valor = int(amplitud * math.sin(2 * math.pi * frecuencia * i / sample_rate))
|
||
|
|
audio.extend(struct.pack('<h', valor)) # little-endian, 16 bits
|
||
|
|
|
||
|
|
return bytes(audio)
|
||
|
|
|
||
|
|
|
||
|
|
def generar_melodia_bienvenida() -> bytes:
|
||
|
|
"""Generar melodía de bienvenida (Do-Re-Mi-Fa-Sol)"""
|
||
|
|
sample_rate = 16000
|
||
|
|
duracion_nota = 500 # ms por nota
|
||
|
|
notas = [261, 293, 329, 349, 392, 440] # Do, Re, Mi, Fa, Sol, La
|
||
|
|
audio = bytearray()
|
||
|
|
|
||
|
|
for nota in notas:
|
||
|
|
nota_audio = generar_tono(nota, duracion_nota, sample_rate, amplitud=12000)
|
||
|
|
audio.extend(nota_audio)
|
||
|
|
|
||
|
|
return bytes(audio)
|
||
|
|
|
||
|
|
|
||
|
|
def generar_saludo_personalizado(nombre: str) -> bytes:
|
||
|
|
"""
|
||
|
|
Generar un saludo personalizado (versión simple)
|
||
|
|
En un caso real, aquí usarías un servicio TTS
|
||
|
|
"""
|
||
|
|
# Usar frecuencia diferente según la longitud del nombre
|
||
|
|
frecuencia_base = 440
|
||
|
|
frecuencia = frecuencia_base + (len(nombre) * 10)
|
||
|
|
# Limitar frecuencia máxima
|
||
|
|
if frecuencia > 800:
|
||
|
|
frecuencia = 800
|
||
|
|
return generar_tono(frecuencia, duracion_ms=2000)
|
||
|
|
|
||
|
|
|
||
|
|
# ==================== MENÚ PRINCIPAL ====================
|
||
|
|
def mostrar_menu():
|
||
|
|
"""Muestra el menú principal"""
|
||
|
|
print("\n" + "=" * 50)
|
||
|
|
print(" 🎵 BOCINA INTELIGENTE - CONTROL")
|
||
|
|
print("=" * 50)
|
||
|
|
print("\n📋 Opciones disponibles:")
|
||
|
|
print(" 1. 🔊 Enviar tono de prueba (440Hz)")
|
||
|
|
print(" 2. 🎵 Enviar melodía de bienvenida")
|
||
|
|
print(" 3. 💬 Enviar saludo personalizado")
|
||
|
|
print(" 4. 📊 Ver estado del ESP32")
|
||
|
|
print(" 5. 🏓 Probar ping")
|
||
|
|
print(" 6. 🔇 Detener reproducción")
|
||
|
|
print(" 7. 🔄 Reconectar")
|
||
|
|
print(" 8. 🚪 Salir")
|
||
|
|
print("-" * 50)
|
||
|
|
|
||
|
|
|
||
|
|
# ==================== MODO INTERACTIVO ====================
|
||
|
|
async def modo_interactivo():
|
||
|
|
"""Modo interactivo con entrada de IP y nombre"""
|
||
|
|
|
||
|
|
# Limpiar pantalla
|
||
|
|
limpiar_pantalla()
|
||
|
|
|
||
|
|
print("\n" + "=" * 50)
|
||
|
|
print(" 🎵 BOCINA INTELIGENTE")
|
||
|
|
print("=" * 50)
|
||
|
|
|
||
|
|
# Solicitar IP
|
||
|
|
print("\n📡 Configuración de conexión:")
|
||
|
|
ip_default = "192.168.15.128"
|
||
|
|
ip = input(f" IP del ESP32 [{ip_default}]: ").strip()
|
||
|
|
if not ip:
|
||
|
|
ip = ip_default
|
||
|
|
|
||
|
|
# Solicitar nombre por defecto para saludos
|
||
|
|
nombre_default = "Visitante"
|
||
|
|
nombre = input(f" Nombre por defecto [{nombre_default}]: ").strip()
|
||
|
|
if not nombre:
|
||
|
|
nombre = nombre_default
|
||
|
|
|
||
|
|
# Crear instancia
|
||
|
|
bocina = BocinaInteligente(ip)
|
||
|
|
|
||
|
|
# Conectar
|
||
|
|
print("\n🔄 Conectando...")
|
||
|
|
if not await bocina.conectar():
|
||
|
|
print("\n❌ No se pudo conectar al ESP32")
|
||
|
|
print(" Verifica que:")
|
||
|
|
print(f" 1. La IP {ip} sea correcta")
|
||
|
|
print(" 2. El ESP32 esté encendido")
|
||
|
|
print(" 3. Estés en la misma red WiFi")
|
||
|
|
input("\n Presiona Enter para salir...")
|
||
|
|
return
|
||
|
|
|
||
|
|
print("\n✅ ¡Conectado exitosamente!")
|
||
|
|
print(f" 📡 IP: {ip}")
|
||
|
|
print(f" 👤 Nombre: {nombre}")
|
||
|
|
|
||
|
|
# Bucle principal
|
||
|
|
while True:
|
||
|
|
mostrar_menu()
|
||
|
|
|
||
|
|
opcion = input("\n👉 Selecciona una opción (1-8): ").strip()
|
||
|
|
|
||
|
|
if opcion == "1":
|
||
|
|
print("\n🔊 Enviando tono de prueba (440Hz)...")
|
||
|
|
audio = generar_tono(440, 2000)
|
||
|
|
await bocina.enviar_audio(audio, "Tono 440Hz")
|
||
|
|
|
||
|
|
elif opcion == "2":
|
||
|
|
print("\n🎵 Enviando melodía de bienvenida...")
|
||
|
|
audio = generar_melodia_bienvenida()
|
||
|
|
await bocina.enviar_audio(audio, "Melodía")
|
||
|
|
|
||
|
|
elif opcion == "3":
|
||
|
|
# Pedir nombre específico para este saludo
|
||
|
|
nombre_saludo = input(f" 👤 Nombre (Enter para usar '{nombre}'): ").strip()
|
||
|
|
if not nombre_saludo:
|
||
|
|
nombre_saludo = nombre
|
||
|
|
|
||
|
|
print(f"\n🔊 Generando saludo para '{nombre_saludo}'...")
|
||
|
|
audio = generar_saludo_personalizado(nombre_saludo)
|
||
|
|
await bocina.enviar_audio(audio, nombre_saludo)
|
||
|
|
|
||
|
|
elif opcion == "4":
|
||
|
|
print("\n📊 Obteniendo estado del ESP32...")
|
||
|
|
estado = await bocina.obtener_estado()
|
||
|
|
if estado:
|
||
|
|
print("\n 📡 Estado del sistema:")
|
||
|
|
print(f" Status: {estado.get('status', 'desconocido')}")
|
||
|
|
print(f" 📦 Bytes recibidos: {estado.get('bytes_recibidos', 0)}")
|
||
|
|
print(f" 🔢 Chunks recibidos: {estado.get('chunks_recibidos', 0)}")
|
||
|
|
print(f" 🎵 Audio activo: {'✅ Sí' if estado.get('audio_activo') else '❌ No'}")
|
||
|
|
print(f" 📶 WiFi RSSI: {estado.get('wifi_rssi', 0)} dBm")
|
||
|
|
else:
|
||
|
|
print(" ❌ No se pudo obtener estado")
|
||
|
|
|
||
|
|
elif opcion == "5":
|
||
|
|
print("\n🏓 Probando ping...")
|
||
|
|
inicio = time.time()
|
||
|
|
if await bocina.ping():
|
||
|
|
latencia = (time.time() - inicio) * 1000
|
||
|
|
print(f" ✅ PONG recibido (latencia: {latencia:.0f}ms)")
|
||
|
|
else:
|
||
|
|
print(" ❌ Sin respuesta - verifica la conexión")
|
||
|
|
|
||
|
|
elif opcion == "6":
|
||
|
|
print("\n🔇 Deteniendo reproducción...")
|
||
|
|
if await bocina.detener():
|
||
|
|
print(" ✅ Reproducción detenida")
|
||
|
|
else:
|
||
|
|
print(" ⚠️ No se pudo detener o ya estaba detenido")
|
||
|
|
|
||
|
|
elif opcion == "7":
|
||
|
|
print("\n🔄 Reconectando...")
|
||
|
|
await bocina.desconectar()
|
||
|
|
await asyncio.sleep(1)
|
||
|
|
if await bocina.conectar():
|
||
|
|
print(" ✅ Reconectado exitosamente")
|
||
|
|
else:
|
||
|
|
print(" ❌ Error al reconectar")
|
||
|
|
|
||
|
|
elif opcion == "8":
|
||
|
|
print("\n👋 Saliendo...")
|
||
|
|
break
|
||
|
|
|
||
|
|
else:
|
||
|
|
print("❌ Opción inválida")
|
||
|
|
|
||
|
|
# Pequeña pausa antes de volver al menú
|
||
|
|
await asyncio.sleep(0.5)
|
||
|
|
|
||
|
|
# Cerrar conexión
|
||
|
|
await bocina.desconectar()
|
||
|
|
print("\n✅ Programa finalizado")
|
||
|
|
|
||
|
|
|
||
|
|
# ==================== MODO RÁPIDO ====================
|
||
|
|
async def modo_rapido():
|
||
|
|
"""Modo rápido: pide IP y nombre y envía saludo inmediato"""
|
||
|
|
|
||
|
|
limpiar_pantalla()
|
||
|
|
|
||
|
|
print("\n" + "=" * 50)
|
||
|
|
print(" 🎵 BOCINA INTELIGENTE - MODO RÁPIDO")
|
||
|
|
print("=" * 50)
|
||
|
|
|
||
|
|
# Solicitar IP
|
||
|
|
print("\n📡 Configuración:")
|
||
|
|
ip_default = "192.168.15.128"
|
||
|
|
ip = input(f" IP del ESP32 [{ip_default}]: ").strip()
|
||
|
|
if not ip:
|
||
|
|
ip = ip_default
|
||
|
|
|
||
|
|
# Solicitar nombre
|
||
|
|
nombre = input(" 👤 Nombre de la persona: ").strip()
|
||
|
|
if not nombre:
|
||
|
|
nombre = "Visitante"
|
||
|
|
|
||
|
|
# Conectar y enviar
|
||
|
|
bocina = BocinaInteligente(ip)
|
||
|
|
|
||
|
|
print("\n🔄 Conectando...")
|
||
|
|
if not await bocina.conectar():
|
||
|
|
print("❌ No se pudo conectar")
|
||
|
|
input("\nPresiona Enter para salir...")
|
||
|
|
return
|
||
|
|
|
||
|
|
print(f"\n🔊 Enviando saludo para '{nombre}'...")
|
||
|
|
audio = generar_saludo_personalizado(nombre)
|
||
|
|
await bocina.enviar_audio(audio, nombre)
|
||
|
|
|
||
|
|
# Mostrar estado
|
||
|
|
await asyncio.sleep(1)
|
||
|
|
estado = await bocina.obtener_estado()
|
||
|
|
if estado:
|
||
|
|
print(f"\n📊 Enviados: {estado.get('bytes_recibidos', 0)} bytes")
|
||
|
|
|
||
|
|
await bocina.desconectar()
|
||
|
|
|
||
|
|
print("\n✅ Saludo enviado!")
|
||
|
|
input("\nPresiona Enter para salir...")
|
||
|
|
|
||
|
|
|
||
|
|
# ==================== MAIN ====================
|
||
|
|
async def main():
|
||
|
|
"""Función principal"""
|
||
|
|
|
||
|
|
limpiar_pantalla()
|
||
|
|
|
||
|
|
print("\n" + "=" * 50)
|
||
|
|
print(" 🎵 BOCINA INTELIGENTE v1.0")
|
||
|
|
print("=" * 50)
|
||
|
|
|
||
|
|
print("\nSelecciona modo de operación:")
|
||
|
|
print(" 1. 🎮 Modo interactivo (menú completo)")
|
||
|
|
print(" 2. ⚡ Modo rápido (solo enviar saludo)")
|
||
|
|
print(" 3. 🚪 Salir")
|
||
|
|
|
||
|
|
modo = input("\n👉 Opción (1-3): ").strip()
|
||
|
|
|
||
|
|
if modo == "1":
|
||
|
|
await modo_interactivo()
|
||
|
|
elif modo == "2":
|
||
|
|
await modo_rapido()
|
||
|
|
else:
|
||
|
|
print("\n👋 Hasta luego!")
|
||
|
|
return
|
||
|
|
|
||
|
|
|
||
|
|
if __name__ == "__main__":
|
||
|
|
try:
|
||
|
|
asyncio.run(main())
|
||
|
|
except KeyboardInterrupt:
|
||
|
|
print("\n\n👋 Programa interrumpido")
|
||
|
|
except Exception as e:
|
||
|
|
print(f"\n❌ Error: {e}")
|
||
|
|
input("\nPresiona Enter para salir...")
|