""" BOCINA CORE - Módulo para controlar la bocina ESP32 """ import asyncio import websockets import json import struct import math from typing import Optional, Dict, Any from .config import config # ==================== CONSTANTES ==================== CHUNK_SIZE = 1024 SAMPLE_RATE = 16000 # ==================== CLASE PRINCIPAL ==================== class BocinaCore: """Clase principal para controlar la bocina ESP32""" def __init__(self, ip: str = None, puerto: int = None): """ Inicializa el controlador de la bocina Args: ip: IP del ESP32 (si es None, usa la del archivo de configuración) puerto: Puerto WebSocket (si es None, usa el del archivo) """ self.ip = ip or config.obtener_ip() self.puerto = puerto or config.obtener_puerto() self.url = f"ws://{self.ip}:{self.puerto}" self.duracion_ms = config.obtener_duracion() self.tono_base = config.obtener_tono_base() self.amplitud = config.obtener_amplitud() self.timeout = config.obtener_timeout() self._websocket = None self._conectado = False # ==================== MÉTODOS PÚBLICOS ==================== def saludar(self, nombre: str, duracion_ms: int = None, tono_personalizado: bool = True) -> bool: """ Envía un saludo a la bocina Args: nombre: Nombre de la persona duracion_ms: Duración del saludo (None = usa config) tono_personalizado: Si True, varía el tono según el nombre """ duracion = duracion_ms or self.duracion_ms return self._ejecutar_async(self._saludar_async(nombre, duracion, tono_personalizado)) def detener(self) -> bool: """Detiene la reproducción actual""" return self._ejecutar_async(self._detener_async()) def estado(self) -> Dict[str, Any]: """Obtiene el estado actual de la bocina""" return self._ejecutar_async(self._estado_async()) def ping(self) -> bool: """Prueba la conexión con la bocina""" return self._ejecutar_async(self._ping_async()) def conectar(self) -> bool: """Establece conexión manual con la bocina""" return self._ejecutar_async(self._conectar_async()) def desconectar(self) -> bool: """Cierra la conexión con la bocina""" return self._ejecutar_async(self._desconectar_async()) # ==================== MÉTODOS INTERNOS ==================== def _ejecutar_async(self, corutina): """Ejecuta una función asíncrona desde código síncrono""" try: loop = asyncio.get_event_loop() if loop.is_running(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) resultado = loop.run_until_complete(corutina) loop.close() return resultado else: return loop.run_until_complete(corutina) except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) resultado = loop.run_until_complete(corutina) loop.close() return resultado async def _conectar_async(self) -> bool: """Conexión asíncrona""" try: self._websocket = await websockets.connect(self.url, timeout=self.timeout) await self._websocket.recv() self._conectado = True return True except Exception: self._conectado = False return False async def _desconectar_async(self) -> bool: """Desconexión asíncrona""" if self._websocket: await self._websocket.close() self._websocket = None self._conectado = False return True async def _asegurar_conexion(self) -> bool: """Asegura que haya una conexión activa""" if not self._conectado or not self._websocket: return await self._conectar_async() return True async def _saludar_async(self, nombre: str, duracion_ms: int, tono_personalizado: bool) -> bool: """Enviar saludo asíncrono""" if not await self._asegurar_conexion(): return False try: audio = self._generar_audio(nombre, duracion_ms, tono_personalizado) for i in range(0, len(audio), CHUNK_SIZE): chunk = audio[i:i + CHUNK_SIZE] await self._websocket.send(chunk) await asyncio.sleep(0.005) return True except Exception: return False async def _detener_async(self) -> bool: """Detener reproducción asíncrono""" if not await self._asegurar_conexion(): return False try: await self._websocket.send(json.dumps({"cmd": "STOP"})) await asyncio.wait_for(self._websocket.recv(), timeout=self.timeout) return True except Exception: return False async def _estado_async(self) -> Dict[str, Any]: """Obtener estado asíncrono""" if not await self._asegurar_conexion(): return {} try: await self._websocket.send(json.dumps({"cmd": "STATUS"})) respuesta = await asyncio.wait_for(self._websocket.recv(), timeout=self.timeout) return json.loads(respuesta) except Exception: return {} async def _ping_async(self) -> bool: """Ping asíncrono""" if not await self._asegurar_conexion(): return False try: await self._websocket.send(json.dumps({"cmd": "PING"})) respuesta = await asyncio.wait_for(self._websocket.recv(), timeout=self.timeout) data = json.loads(respuesta) return data.get("status") == "ok" except Exception: return False def _generar_audio(self, nombre: str, duracion_ms: int, tono_personalizado: bool) -> bytes: """Genera audio PCM para el saludo""" num_muestras = int(SAMPLE_RATE * duracion_ms / 1000) if tono_personalizado: frecuencia = self.tono_base + (len(nombre) * 10) if frecuencia > 800: frecuencia = 800 else: frecuencia = self.tono_base audio = bytearray() for i in range(num_muestras): valor = int(self.amplitud * math.sin(2 * math.pi * frecuencia * i / SAMPLE_RATE)) audio.extend(struct.pack(' bool: """Función rápida para saludar a una persona""" bocina = BocinaCore(ip) return bocina.saludar(nombre) def detener(ip: str = None) -> bool: """Detiene la reproducción""" bocina = BocinaCore(ip) return bocina.detener() def obtener_estado(ip: str = None) -> dict: """Obtiene el estado de la bocina""" bocina = BocinaCore(ip) return bocina.estado() def configurar_ip(nueva_ip: str): """Actualiza la IP en el archivo de configuración""" from .config import config config.actualizar_ip(nueva_ip) def mostrar_configuracion(): """Muestra la configuración actual""" from .config import config config.mostrar_configuracion()