IdentificacionIA/test_esp32.py

409 lines
13 KiB
Python
Raw Normal View History

"""
BOCINA INTELIGENTE - CLIENTE PYTHON
====================================
Cliente para enviar audio y comandos a la bocina ESP32 via WebSocket
Uso:
python bocina_client.py
(Luego ingresa la IP y el nombre cuando se solicite)
"""
import asyncio
import websockets
import json
import struct
import math
import sys
import time
import os
from typing import Optional
# ==================== LIMPIAR PANTALLA ====================
def limpiar_pantalla():
"""Limpia la consola según el sistema operativo"""
os.system('cls' if os.name == 'nt' else 'clear')
# ==================== CLASE BOCINA ====================
class BocinaInteligente:
"""Cliente para controlar la bocina inteligente ESP32"""
def __init__(self, ip: str, puerto: int = 81):
self.ip = ip
self.puerto = puerto
self.url = f"ws://{ip}:{puerto}"
self.websocket: Optional[websockets.WebSocketClientProtocol] = None
self.chunk_size = 1024
self.timeout = 5
self.conectado = False
async def conectar(self) -> bool:
"""Conectar al ESP32"""
try:
print(f"🔌 Conectando a {self.url}...")
self.websocket = await websockets.connect(self.url)
# Esperar mensaje de bienvenida
response = await asyncio.wait_for(self.websocket.recv(), timeout=self.timeout)
data = json.loads(response)
if data.get("status") == "ok":
print(f"{data.get('msg')}")
self.conectado = True
return True
except Exception as e:
print(f" ❌ Error: {e}")
return False
return False
async def desconectar(self):
"""Cerrar conexión"""
if self.websocket:
await self.websocket.close()
self.conectado = False
print("🔌 Conexión cerrada")
async def ping(self) -> bool:
"""Probar conexión con el ESP32"""
if not self.websocket:
return False
try:
await self.websocket.send(json.dumps({"cmd": "PING"}))
response = await asyncio.wait_for(self.websocket.recv(), timeout=self.timeout)
data = json.loads(response)
return data.get("status") == "ok"
except:
return False
async def obtener_estado(self) -> dict:
"""Obtener estadísticas del ESP32"""
if not self.websocket:
return {}
try:
await self.websocket.send(json.dumps({"cmd": "STATUS"}))
response = await asyncio.wait_for(self.websocket.recv(), timeout=self.timeout)
return json.loads(response)
except:
return {}
async def detener(self) -> bool:
"""Detener reproducción"""
if not self.websocket:
return False
try:
await self.websocket.send(json.dumps({"cmd": "STOP"}))
response = await asyncio.wait_for(self.websocket.recv(), timeout=self.timeout)
data = json.loads(response)
return data.get("status") == "ok"
except:
return False
async def enviar_audio(self, audio_data: bytes, nombre: str = "audio") -> bool:
"""
Enviar audio al ESP32
Args:
audio_data: Datos de audio en formato PCM (16kHz, 16bits, mono)
nombre: Nombre identificador (para logs)
"""
if not self.websocket:
print("❌ No hay conexión")
return False
total_chunks = (len(audio_data) + self.chunk_size - 1) // self.chunk_size
print(f"📤 Enviando {total_chunks} chunks ({len(audio_data)} bytes) para '{nombre}'")
inicio = time.time()
for i, chunk_start in enumerate(range(0, len(audio_data), self.chunk_size)):
chunk = audio_data[chunk_start:chunk_start + self.chunk_size]
await self.websocket.send(chunk)
# Mostrar progreso cada 10 chunks o al final
if (i + 1) % 10 == 0 or i == total_chunks - 1:
porcentaje = ((i + 1) * 100) // total_chunks
print(f" 📊 Progreso: {porcentaje}% ({i+1}/{total_chunks} chunks)")
# Pequeña pausa para no saturar
await asyncio.sleep(0.005)
elapsed = time.time() - inicio
print(f"✅ Audio enviado en {elapsed:.2f} segundos")
return True
# ==================== GENERADORES DE AUDIO ====================
def generar_tono(frecuencia: int = 440, duracion_ms: int = 2000,
sample_rate: int = 16000, amplitud: int = 16000) -> bytes:
"""
Generar un tono seno en formato PCM
Args:
frecuencia: Frecuencia del tono en Hz
duracion_ms: Duración en milisegundos
sample_rate: Frecuencia de muestreo
amplitud: Amplitud máxima (0-32767)
"""
num_muestras = int(sample_rate * duracion_ms / 1000)
audio = bytearray()
for i in range(num_muestras):
valor = int(amplitud * math.sin(2 * math.pi * frecuencia * i / sample_rate))
audio.extend(struct.pack('<h', valor)) # little-endian, 16 bits
return bytes(audio)
def generar_melodia_bienvenida() -> bytes:
"""Generar melodía de bienvenida (Do-Re-Mi-Fa-Sol)"""
sample_rate = 16000
duracion_nota = 500 # ms por nota
notas = [261, 293, 329, 349, 392, 440] # Do, Re, Mi, Fa, Sol, La
audio = bytearray()
for nota in notas:
nota_audio = generar_tono(nota, duracion_nota, sample_rate, amplitud=12000)
audio.extend(nota_audio)
return bytes(audio)
def generar_saludo_personalizado(nombre: str) -> bytes:
"""
Generar un saludo personalizado (versión simple)
En un caso real, aquí usarías un servicio TTS
"""
# Usar frecuencia diferente según la longitud del nombre
frecuencia_base = 440
frecuencia = frecuencia_base + (len(nombre) * 10)
# Limitar frecuencia máxima
if frecuencia > 800:
frecuencia = 800
return generar_tono(frecuencia, duracion_ms=2000)
# ==================== MENÚ PRINCIPAL ====================
def mostrar_menu():
"""Muestra el menú principal"""
print("\n" + "=" * 50)
print(" 🎵 BOCINA INTELIGENTE - CONTROL")
print("=" * 50)
print("\n📋 Opciones disponibles:")
print(" 1. 🔊 Enviar tono de prueba (440Hz)")
print(" 2. 🎵 Enviar melodía de bienvenida")
print(" 3. 💬 Enviar saludo personalizado")
print(" 4. 📊 Ver estado del ESP32")
print(" 5. 🏓 Probar ping")
print(" 6. 🔇 Detener reproducción")
print(" 7. 🔄 Reconectar")
print(" 8. 🚪 Salir")
print("-" * 50)
# ==================== MODO INTERACTIVO ====================
async def modo_interactivo():
"""Modo interactivo con entrada de IP y nombre"""
# Limpiar pantalla
limpiar_pantalla()
print("\n" + "=" * 50)
print(" 🎵 BOCINA INTELIGENTE")
print("=" * 50)
# Solicitar IP
print("\n📡 Configuración de conexión:")
ip_default = "192.168.15.128"
ip = input(f" IP del ESP32 [{ip_default}]: ").strip()
if not ip:
ip = ip_default
# Solicitar nombre por defecto para saludos
nombre_default = "Visitante"
nombre = input(f" Nombre por defecto [{nombre_default}]: ").strip()
if not nombre:
nombre = nombre_default
# Crear instancia
bocina = BocinaInteligente(ip)
# Conectar
print("\n🔄 Conectando...")
if not await bocina.conectar():
print("\n❌ No se pudo conectar al ESP32")
print(" Verifica que:")
print(f" 1. La IP {ip} sea correcta")
print(" 2. El ESP32 esté encendido")
print(" 3. Estés en la misma red WiFi")
input("\n Presiona Enter para salir...")
return
print("\n✅ ¡Conectado exitosamente!")
print(f" 📡 IP: {ip}")
print(f" 👤 Nombre: {nombre}")
# Bucle principal
while True:
mostrar_menu()
opcion = input("\n👉 Selecciona una opción (1-8): ").strip()
if opcion == "1":
print("\n🔊 Enviando tono de prueba (440Hz)...")
audio = generar_tono(440, 2000)
await bocina.enviar_audio(audio, "Tono 440Hz")
elif opcion == "2":
print("\n🎵 Enviando melodía de bienvenida...")
audio = generar_melodia_bienvenida()
await bocina.enviar_audio(audio, "Melodía")
elif opcion == "3":
# Pedir nombre específico para este saludo
nombre_saludo = input(f" 👤 Nombre (Enter para usar '{nombre}'): ").strip()
if not nombre_saludo:
nombre_saludo = nombre
print(f"\n🔊 Generando saludo para '{nombre_saludo}'...")
audio = generar_saludo_personalizado(nombre_saludo)
await bocina.enviar_audio(audio, nombre_saludo)
elif opcion == "4":
print("\n📊 Obteniendo estado del ESP32...")
estado = await bocina.obtener_estado()
if estado:
print("\n 📡 Estado del sistema:")
print(f" Status: {estado.get('status', 'desconocido')}")
print(f" 📦 Bytes recibidos: {estado.get('bytes_recibidos', 0)}")
print(f" 🔢 Chunks recibidos: {estado.get('chunks_recibidos', 0)}")
print(f" 🎵 Audio activo: {'✅ Sí' if estado.get('audio_activo') else '❌ No'}")
print(f" 📶 WiFi RSSI: {estado.get('wifi_rssi', 0)} dBm")
else:
print(" ❌ No se pudo obtener estado")
elif opcion == "5":
print("\n🏓 Probando ping...")
inicio = time.time()
if await bocina.ping():
latencia = (time.time() - inicio) * 1000
print(f" ✅ PONG recibido (latencia: {latencia:.0f}ms)")
else:
print(" ❌ Sin respuesta - verifica la conexión")
elif opcion == "6":
print("\n🔇 Deteniendo reproducción...")
if await bocina.detener():
print(" ✅ Reproducción detenida")
else:
print(" ⚠️ No se pudo detener o ya estaba detenido")
elif opcion == "7":
print("\n🔄 Reconectando...")
await bocina.desconectar()
await asyncio.sleep(1)
if await bocina.conectar():
print(" ✅ Reconectado exitosamente")
else:
print(" ❌ Error al reconectar")
elif opcion == "8":
print("\n👋 Saliendo...")
break
else:
print("❌ Opción inválida")
# Pequeña pausa antes de volver al menú
await asyncio.sleep(0.5)
# Cerrar conexión
await bocina.desconectar()
print("\n✅ Programa finalizado")
# ==================== MODO RÁPIDO ====================
async def modo_rapido():
"""Modo rápido: pide IP y nombre y envía saludo inmediato"""
limpiar_pantalla()
print("\n" + "=" * 50)
print(" 🎵 BOCINA INTELIGENTE - MODO RÁPIDO")
print("=" * 50)
# Solicitar IP
print("\n📡 Configuración:")
ip_default = "192.168.15.128"
ip = input(f" IP del ESP32 [{ip_default}]: ").strip()
if not ip:
ip = ip_default
# Solicitar nombre
nombre = input(" 👤 Nombre de la persona: ").strip()
if not nombre:
nombre = "Visitante"
# Conectar y enviar
bocina = BocinaInteligente(ip)
print("\n🔄 Conectando...")
if not await bocina.conectar():
print("❌ No se pudo conectar")
input("\nPresiona Enter para salir...")
return
print(f"\n🔊 Enviando saludo para '{nombre}'...")
audio = generar_saludo_personalizado(nombre)
await bocina.enviar_audio(audio, nombre)
# Mostrar estado
await asyncio.sleep(1)
estado = await bocina.obtener_estado()
if estado:
print(f"\n📊 Enviados: {estado.get('bytes_recibidos', 0)} bytes")
await bocina.desconectar()
print("\n✅ Saludo enviado!")
input("\nPresiona Enter para salir...")
# ==================== MAIN ====================
async def main():
"""Función principal"""
limpiar_pantalla()
print("\n" + "=" * 50)
print(" 🎵 BOCINA INTELIGENTE v1.0")
print("=" * 50)
print("\nSelecciona modo de operación:")
print(" 1. 🎮 Modo interactivo (menú completo)")
print(" 2. ⚡ Modo rápido (solo enviar saludo)")
print(" 3. 🚪 Salir")
modo = input("\n👉 Opción (1-3): ").strip()
if modo == "1":
await modo_interactivo()
elif modo == "2":
await modo_rapido()
else:
print("\n👋 Hasta luego!")
return
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\n\n👋 Programa interrumpido")
except Exception as e:
print(f"\n❌ Error: {e}")
input("\nPresiona Enter para salir...")