224 lines
7.4 KiB
Python
224 lines
7.4 KiB
Python
"""
|
|
BOCINA CORE - Módulo para controlar la bocina ESP32
|
|
"""
|
|
|
|
import asyncio
|
|
import websockets
|
|
import json
|
|
import struct
|
|
import math
|
|
from typing import Optional, Dict, Any
|
|
|
|
from .config import config
|
|
|
|
# ==================== CONSTANTES ====================
|
|
CHUNK_SIZE = 1024
|
|
SAMPLE_RATE = 16000
|
|
|
|
|
|
# ==================== CLASE PRINCIPAL ====================
|
|
class BocinaCore:
|
|
"""Clase principal para controlar la bocina ESP32"""
|
|
|
|
def __init__(self, ip: str = None, puerto: int = None):
|
|
"""
|
|
Inicializa el controlador de la bocina
|
|
|
|
Args:
|
|
ip: IP del ESP32 (si es None, usa la del archivo de configuración)
|
|
puerto: Puerto WebSocket (si es None, usa el del archivo)
|
|
"""
|
|
self.ip = ip or config.obtener_ip()
|
|
self.puerto = puerto or config.obtener_puerto()
|
|
self.url = f"ws://{self.ip}:{self.puerto}"
|
|
self.duracion_ms = config.obtener_duracion()
|
|
self.tono_base = config.obtener_tono_base()
|
|
self.amplitud = config.obtener_amplitud()
|
|
self.timeout = config.obtener_timeout()
|
|
|
|
self._websocket = None
|
|
self._conectado = False
|
|
|
|
# ==================== MÉTODOS PÚBLICOS ====================
|
|
|
|
def saludar(self, nombre: str, duracion_ms: int = None, tono_personalizado: bool = True) -> bool:
|
|
"""
|
|
Envía un saludo a la bocina
|
|
|
|
Args:
|
|
nombre: Nombre de la persona
|
|
duracion_ms: Duración del saludo (None = usa config)
|
|
tono_personalizado: Si True, varía el tono según el nombre
|
|
"""
|
|
duracion = duracion_ms or self.duracion_ms
|
|
return self._ejecutar_async(self._saludar_async(nombre, duracion, tono_personalizado))
|
|
|
|
def detener(self) -> bool:
|
|
"""Detiene la reproducción actual"""
|
|
return self._ejecutar_async(self._detener_async())
|
|
|
|
def estado(self) -> Dict[str, Any]:
|
|
"""Obtiene el estado actual de la bocina"""
|
|
return self._ejecutar_async(self._estado_async())
|
|
|
|
def ping(self) -> bool:
|
|
"""Prueba la conexión con la bocina"""
|
|
return self._ejecutar_async(self._ping_async())
|
|
|
|
def conectar(self) -> bool:
|
|
"""Establece conexión manual con la bocina"""
|
|
return self._ejecutar_async(self._conectar_async())
|
|
|
|
def desconectar(self) -> bool:
|
|
"""Cierra la conexión con la bocina"""
|
|
return self._ejecutar_async(self._desconectar_async())
|
|
|
|
# ==================== MÉTODOS INTERNOS ====================
|
|
|
|
def _ejecutar_async(self, corutina):
|
|
"""Ejecuta una función asíncrona desde código síncrono"""
|
|
try:
|
|
loop = asyncio.get_event_loop()
|
|
if loop.is_running():
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
resultado = loop.run_until_complete(corutina)
|
|
loop.close()
|
|
return resultado
|
|
else:
|
|
return loop.run_until_complete(corutina)
|
|
except RuntimeError:
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
resultado = loop.run_until_complete(corutina)
|
|
loop.close()
|
|
return resultado
|
|
|
|
async def _conectar_async(self) -> bool:
|
|
"""Conexión asíncrona"""
|
|
try:
|
|
self._websocket = await websockets.connect(self.url, timeout=self.timeout)
|
|
await self._websocket.recv()
|
|
self._conectado = True
|
|
return True
|
|
except Exception:
|
|
self._conectado = False
|
|
return False
|
|
|
|
async def _desconectar_async(self) -> bool:
|
|
"""Desconexión asíncrona"""
|
|
if self._websocket:
|
|
await self._websocket.close()
|
|
self._websocket = None
|
|
self._conectado = False
|
|
return True
|
|
|
|
async def _asegurar_conexion(self) -> bool:
|
|
"""Asegura que haya una conexión activa"""
|
|
if not self._conectado or not self._websocket:
|
|
return await self._conectar_async()
|
|
return True
|
|
|
|
async def _saludar_async(self, nombre: str, duracion_ms: int, tono_personalizado: bool) -> bool:
|
|
"""Enviar saludo asíncrono"""
|
|
if not await self._asegurar_conexion():
|
|
return False
|
|
|
|
try:
|
|
audio = self._generar_audio(nombre, duracion_ms, tono_personalizado)
|
|
|
|
for i in range(0, len(audio), CHUNK_SIZE):
|
|
chunk = audio[i:i + CHUNK_SIZE]
|
|
await self._websocket.send(chunk)
|
|
await asyncio.sleep(0.005)
|
|
|
|
return True
|
|
except Exception:
|
|
return False
|
|
|
|
async def _detener_async(self) -> bool:
|
|
"""Detener reproducción asíncrono"""
|
|
if not await self._asegurar_conexion():
|
|
return False
|
|
|
|
try:
|
|
await self._websocket.send(json.dumps({"cmd": "STOP"}))
|
|
await asyncio.wait_for(self._websocket.recv(), timeout=self.timeout)
|
|
return True
|
|
except Exception:
|
|
return False
|
|
|
|
async def _estado_async(self) -> Dict[str, Any]:
|
|
"""Obtener estado asíncrono"""
|
|
if not await self._asegurar_conexion():
|
|
return {}
|
|
|
|
try:
|
|
await self._websocket.send(json.dumps({"cmd": "STATUS"}))
|
|
respuesta = await asyncio.wait_for(self._websocket.recv(), timeout=self.timeout)
|
|
return json.loads(respuesta)
|
|
except Exception:
|
|
return {}
|
|
|
|
async def _ping_async(self) -> bool:
|
|
"""Ping asíncrono"""
|
|
if not await self._asegurar_conexion():
|
|
return False
|
|
|
|
try:
|
|
await self._websocket.send(json.dumps({"cmd": "PING"}))
|
|
respuesta = await asyncio.wait_for(self._websocket.recv(), timeout=self.timeout)
|
|
data = json.loads(respuesta)
|
|
return data.get("status") == "ok"
|
|
except Exception:
|
|
return False
|
|
|
|
def _generar_audio(self, nombre: str, duracion_ms: int, tono_personalizado: bool) -> bytes:
|
|
"""Genera audio PCM para el saludo"""
|
|
num_muestras = int(SAMPLE_RATE * duracion_ms / 1000)
|
|
|
|
if tono_personalizado:
|
|
frecuencia = self.tono_base + (len(nombre) * 10)
|
|
if frecuencia > 800:
|
|
frecuencia = 800
|
|
else:
|
|
frecuencia = self.tono_base
|
|
|
|
audio = bytearray()
|
|
for i in range(num_muestras):
|
|
valor = int(self.amplitud * math.sin(2 * math.pi * frecuencia * i / SAMPLE_RATE))
|
|
audio.extend(struct.pack('<h', valor))
|
|
|
|
return bytes(audio)
|
|
|
|
|
|
# ==================== FUNCIONES SIMPLES (API RÁPIDA) ====================
|
|
|
|
def saludar(nombre: str, ip: str = None) -> bool:
|
|
"""Función rápida para saludar a una persona"""
|
|
bocina = BocinaCore(ip)
|
|
return bocina.saludar(nombre)
|
|
|
|
|
|
def detener(ip: str = None) -> bool:
|
|
"""Detiene la reproducción"""
|
|
bocina = BocinaCore(ip)
|
|
return bocina.detener()
|
|
|
|
|
|
def obtener_estado(ip: str = None) -> dict:
|
|
"""Obtiene el estado de la bocina"""
|
|
bocina = BocinaCore(ip)
|
|
return bocina.estado()
|
|
|
|
|
|
def configurar_ip(nueva_ip: str):
|
|
"""Actualiza la IP en el archivo de configuración"""
|
|
from .config import config
|
|
config.actualizar_ip(nueva_ip)
|
|
|
|
|
|
def mostrar_configuracion():
|
|
"""Muestra la configuración actual"""
|
|
from .config import config
|
|
config.mostrar_configuracion() |