From 6949773dd3133d604c639017f58ecc99f24f1381 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Fri, 19 Sep 2025 08:20:22 +0000 Subject: [PATCH 1/2] Refactor audio interface to properly handle start/stop and threads Co-authored-by: angelo --- .../default_audio_interface.py | 36 ++++++++++++------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/src/elevenlabs/conversational_ai/default_audio_interface.py b/src/elevenlabs/conversational_ai/default_audio_interface.py index e3bd9ad8..e97a3de9 100644 --- a/src/elevenlabs/conversational_ai/default_audio_interface.py +++ b/src/elevenlabs/conversational_ai/default_audio_interface.py @@ -16,6 +16,8 @@ def __init__(self): except ImportError: raise ImportError("To use DefaultAudioInterface you must install pyaudio.") self.pyaudio = pyaudio + self.should_stop = threading.Event() + self.output_thread = None def start(self, input_callback: Callable[[bytes], None]): # Audio input is using callbacks from pyaudio which we simply pass through. @@ -24,7 +26,7 @@ def start(self, input_callback: Callable[[bytes], None]): # Audio output is buffered so we can handle interruptions. # Start a separate thread to handle writing to the output stream. self.output_queue: queue.Queue[bytes] = queue.Queue() - self.should_stop = threading.Event() + self.should_stop.clear() # Reset the event in case start is called multiple times self.output_thread = threading.Thread(target=self._output_thread) self.p = self.pyaudio.PyAudio() @@ -50,11 +52,15 @@ def start(self, input_callback: Callable[[bytes], None]): def stop(self): self.should_stop.set() - self.output_thread.join() - self.in_stream.stop_stream() - self.in_stream.close() - self.out_stream.close() - self.p.terminate() + if self.output_thread and self.output_thread.is_alive(): + self.output_thread.join() + if hasattr(self, 'in_stream'): + self.in_stream.stop_stream() + self.in_stream.close() + if hasattr(self, 'out_stream'): + self.out_stream.close() + if hasattr(self, 'p'): + self.p.terminate() def output(self, audio: bytes): self.output_queue.put(audio) @@ -94,6 +100,8 @@ def __init__(self): except ImportError: raise ImportError("To use AsyncDefaultAudioInterface you must install pyaudio.") self.pyaudio = pyaudio + self.should_stop = asyncio.Event() + self.output_task = None async def start(self, input_callback: Callable[[bytes], Awaitable[None]]): # Audio input is using callbacks from pyaudio which we adapt to async @@ -102,7 +110,7 @@ async def start(self, input_callback: Callable[[bytes], Awaitable[None]]): # Audio output is buffered so we can handle interruptions. # Start a separate task to handle writing to the output stream. self.output_queue: asyncio.Queue[bytes] = asyncio.Queue() - self.should_stop = asyncio.Event() + self.should_stop.clear() # Reset the event in case start is called multiple times self.p = self.pyaudio.PyAudio() self.in_stream = self.p.open( @@ -128,11 +136,15 @@ async def start(self, input_callback: Callable[[bytes], Awaitable[None]]): async def stop(self): self.should_stop.set() - await self.output_task - self.in_stream.stop_stream() - self.in_stream.close() - self.out_stream.close() - self.p.terminate() + if self.output_task and not self.output_task.done(): + await self.output_task + if hasattr(self, 'in_stream'): + self.in_stream.stop_stream() + self.in_stream.close() + if hasattr(self, 'out_stream'): + self.out_stream.close() + if hasattr(self, 'p'): + self.p.terminate() async def output(self, audio: bytes): await self.output_queue.put(audio) From fdd74598075ed6d6adfee0a90aaa9bcab0649263 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Fri, 19 Sep 2025 08:32:11 +0000 Subject: [PATCH 2/2] Refactor audio interfaces to properly manage resources Initialize and clean up audio streams and PyAudio instances. Add a _started flag to prevent double starts and ensure proper cleanup. Co-authored-by: angelo --- .../default_audio_interface.py | 112 +++++++++++++----- 1 file changed, 84 insertions(+), 28 deletions(-) diff --git a/src/elevenlabs/conversational_ai/default_audio_interface.py b/src/elevenlabs/conversational_ai/default_audio_interface.py index e97a3de9..ef8adf17 100644 --- a/src/elevenlabs/conversational_ai/default_audio_interface.py +++ b/src/elevenlabs/conversational_ai/default_audio_interface.py @@ -18,14 +18,24 @@ def __init__(self): self.pyaudio = pyaudio self.should_stop = threading.Event() self.output_thread = None + self.output_queue = None + self.in_stream = None + self.out_stream = None + self.p = None + self.input_callback = None + self._started = False def start(self, input_callback: Callable[[bytes], None]): + if self._started: + # If already started, stop first to avoid resource leaks + self.stop() + # Audio input is using callbacks from pyaudio which we simply pass through. self.input_callback = input_callback # Audio output is buffered so we can handle interruptions. # Start a separate thread to handle writing to the output stream. - self.output_queue: queue.Queue[bytes] = queue.Queue() + self.output_queue = queue.Queue() self.should_stop.clear() # Reset the event in case start is called multiple times self.output_thread = threading.Thread(target=self._output_thread) @@ -49,32 +59,50 @@ def start(self, input_callback: Callable[[bytes], None]): ) self.output_thread.start() + self._started = True def stop(self): + if not self._started: + return # Nothing to stop + self.should_stop.set() + if self.output_thread and self.output_thread.is_alive(): self.output_thread.join() - if hasattr(self, 'in_stream'): + + if self.in_stream: self.in_stream.stop_stream() self.in_stream.close() - if hasattr(self, 'out_stream'): + self.in_stream = None + + if self.out_stream: self.out_stream.close() - if hasattr(self, 'p'): + self.out_stream = None + + if self.p: self.p.terminate() + self.p = None + + self.output_thread = None + self.output_queue = None + self.input_callback = None + self._started = False def output(self, audio: bytes): - self.output_queue.put(audio) + if self.output_queue: + self.output_queue.put(audio) def interrupt(self): # Clear the output queue to stop any audio that is currently playing. # Note: We can't atomically clear the whole queue, but we are doing # it from the message handling thread so no new audio will be added # while we are clearing. - try: - while True: - _ = self.output_queue.get(block=False) - except queue.Empty: - pass + if self.output_queue: + try: + while True: + _ = self.output_queue.get(block=False) + except queue.Empty: + pass def _output_thread(self): while not self.should_stop.is_set(): @@ -102,14 +130,24 @@ def __init__(self): self.pyaudio = pyaudio self.should_stop = asyncio.Event() self.output_task = None + self.output_queue = None + self.in_stream = None + self.out_stream = None + self.p = None + self.input_callback = None + self._started = False async def start(self, input_callback: Callable[[bytes], Awaitable[None]]): + if self._started: + # If already started, stop first to avoid resource leaks + await self.stop() + # Audio input is using callbacks from pyaudio which we adapt to async self.input_callback = input_callback # Audio output is buffered so we can handle interruptions. # Start a separate task to handle writing to the output stream. - self.output_queue: asyncio.Queue[bytes] = asyncio.Queue() + self.output_queue = asyncio.Queue() self.should_stop.clear() # Reset the event in case start is called multiple times self.p = self.pyaudio.PyAudio() @@ -133,37 +171,55 @@ async def start(self, input_callback: Callable[[bytes], Awaitable[None]]): # Start the output task self.output_task = asyncio.create_task(self._output_task()) + self._started = True async def stop(self): + if not self._started: + return # Nothing to stop + self.should_stop.set() + if self.output_task and not self.output_task.done(): await self.output_task - if hasattr(self, 'in_stream'): + + if self.in_stream: self.in_stream.stop_stream() self.in_stream.close() - if hasattr(self, 'out_stream'): + self.in_stream = None + + if self.out_stream: self.out_stream.close() - if hasattr(self, 'p'): + self.out_stream = None + + if self.p: self.p.terminate() + self.p = None + + self.output_task = None + self.output_queue = None + self.input_callback = None + self._started = False async def output(self, audio: bytes): - await self.output_queue.put(audio) + if self.output_queue: + await self.output_queue.put(audio) async def interrupt(self): # Clear the output queue to stop any audio that is currently playing. - try: - while True: - try: - _ = self.output_queue.get_nowait() - except asyncio.QueueEmpty: - break - except AttributeError: - # In Python 3.8, it's asyncio.QueueEmpty, in 3.10+ it's asyncio.QueueEmpty - while not self.output_queue.empty(): - try: - _ = self.output_queue.get_nowait() - except: - break + if self.output_queue: + try: + while True: + try: + _ = self.output_queue.get_nowait() + except asyncio.QueueEmpty: + break + except AttributeError: + # In Python 3.8, it's asyncio.QueueEmpty, in 3.10+ it's asyncio.QueueEmpty + while not self.output_queue.empty(): + try: + _ = self.output_queue.get_nowait() + except: + break async def _output_task(self): while not self.should_stop.is_set():