Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 102 additions & 34 deletions src/elevenlabs/conversational_ai/default_audio_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,27 @@ def __init__(self):
except ImportError:
raise ImportError("To use DefaultAudioInterface you must install pyaudio.")
self.pyaudio = pyaudio
self.should_stop = threading.Event()
self.output_thread = None
self.output_queue = None
self.in_stream = None
self.out_stream = None
self.p = None
self.input_callback = None
self._started = False

def start(self, input_callback: Callable[[bytes], None]):
if self._started:
# If already started, stop first to avoid resource leaks
self.stop()

# Audio input is using callbacks from pyaudio which we simply pass through.
self.input_callback = input_callback

# Audio output is buffered so we can handle interruptions.
# Start a separate thread to handle writing to the output stream.
self.output_queue: queue.Queue[bytes] = queue.Queue()
self.should_stop = threading.Event()
self.output_queue = queue.Queue()
self.should_stop.clear() # Reset the event in case start is called multiple times
self.output_thread = threading.Thread(target=self._output_thread)

self.p = self.pyaudio.PyAudio()
Expand All @@ -47,28 +59,50 @@ def start(self, input_callback: Callable[[bytes], None]):
)

self.output_thread.start()
self._started = True

def stop(self):
    """Shut down audio I/O and release the resources held on this instance.

    Does nothing when the interface is not started, so repeated calls are
    safe. Otherwise the output thread is signalled and joined, the input
    stream is stopped and closed, the output stream is closed and the
    PyAudio instance is terminated.

    Every teardown step is attempted even if an earlier one fails, and the
    instance is always returned to its "not started" state, so a failing
    stream does not leak the remaining handles or leave the object
    half-stopped.

    Raises:
        Exception: The first error raised by a teardown step, re-raised
            after all remaining steps have run and the state was reset.
    """
    if not self._started:
        return  # Nothing to stop

    # Signal first: the output thread loops until this event is set, so it
    # must be set before we wait for the thread to finish.
    self.should_stop.set()

    worker = self.output_thread
    mic = self.in_stream
    speaker = self.out_stream
    audio = self.p

    # Collect the teardown steps in the order they must run.
    teardown = []
    if worker is not None and worker.is_alive():
        teardown.append(worker.join)
    if mic is not None:
        teardown.extend((mic.stop_stream, mic.close))
    if speaker is not None:
        teardown.append(speaker.close)
    if audio is not None:
        teardown.append(audio.terminate)

    # Run every step; remember only the first failure so that one broken
    # handle cannot prevent the others from being released.
    first_error = None
    for step in teardown:
        try:
            step()
        except Exception as exc:
            if first_error is None:
                first_error = exc

    self.output_thread = None
    self.in_stream = None
    self.out_stream = None
    self.p = None
    self.output_queue = None
    self.input_callback = None
    self._started = False

    if first_error is not None:
        raise first_error

def output(self, audio: bytes):
    """Queue a chunk of audio for playback.

    The chunk is silently dropped when there is no output queue, i.e.
    before ``start`` or after ``stop``.

    Args:
        audio: Raw audio bytes to append to the output queue.
    """
    # Read the attribute once: ``stop`` resets it to None, so checking and
    # then re-reading ``self.output_queue`` could hit None in between.
    pending = self.output_queue
    if pending is not None:
        pending.put(audio)

def interrupt(self):
    """Discard all buffered output audio.

    Clears the output queue to stop any audio that is currently playing.
    Does nothing when there is no output queue (before ``start`` or after
    ``stop``).
    """
    # Read the attribute once so a concurrent ``stop`` cannot swap it for
    # None between the check and the drain loop.
    pending = self.output_queue
    if pending is None:
        return

    # Note: We can't atomically clear the whole queue, but we are doing
    # it from the message handling thread so no new audio will be added
    # while we are clearing.
    while True:
        try:
            pending.get(block=False)
        except queue.Empty:
            return

def _output_thread(self):
while not self.should_stop.is_set():
Expand All @@ -94,15 +128,27 @@ def __init__(self):
except ImportError:
raise ImportError("To use AsyncDefaultAudioInterface you must install pyaudio.")
self.pyaudio = pyaudio
self.should_stop = asyncio.Event()
self.output_task = None
self.output_queue = None
self.in_stream = None
self.out_stream = None
self.p = None
self.input_callback = None
self._started = False

async def start(self, input_callback: Callable[[bytes], Awaitable[None]]):
if self._started:
# If already started, stop first to avoid resource leaks
await self.stop()

# Audio input is using callbacks from pyaudio which we adapt to async
self.input_callback = input_callback

# Audio output is buffered so we can handle interruptions.
# Start a separate task to handle writing to the output stream.
self.output_queue: asyncio.Queue[bytes] = asyncio.Queue()
self.should_stop = asyncio.Event()
self.output_queue = asyncio.Queue()
self.should_stop.clear() # Reset the event in case start is called multiple times

self.p = self.pyaudio.PyAudio()
self.in_stream = self.p.open(
Expand All @@ -125,33 +171,55 @@ async def start(self, input_callback: Callable[[bytes], Awaitable[None]]):

# Start the output task
self.output_task = asyncio.create_task(self._output_task())
self._started = True

async def stop(self):
    """Shut down audio I/O and release the resources held on this instance.

    Does nothing when the interface is not started, so repeated calls are
    safe. Otherwise the output task is signalled and awaited, the input
    stream is stopped and closed, the output stream is closed and the
    PyAudio instance is terminated.

    Every teardown step is attempted even if an earlier one fails, and the
    instance is then returned to its "not started" state, so a failing
    stream does not leak the remaining handles or leave the object
    half-stopped.

    Raises:
        Exception: The first error raised while awaiting the output task
            or running a teardown step, re-raised after all remaining
            steps have run and the state was reset.
    """
    if not self._started:
        return  # Nothing to stop

    # Signal first: the output task loops until this event is set, so it
    # must be set before we wait for the task to finish.
    self.should_stop.set()

    first_error = None

    task = self.output_task
    if task is not None and not task.done():
        try:
            await task
        except Exception as exc:
            # Keep going: the audio handles still have to be released.
            first_error = exc

    mic = self.in_stream
    speaker = self.out_stream
    audio = self.p

    # Collect the remaining teardown steps in the order they must run.
    teardown = []
    if mic is not None:
        teardown.extend((mic.stop_stream, mic.close))
    if speaker is not None:
        teardown.append(speaker.close)
    if audio is not None:
        teardown.append(audio.terminate)

    # Run every step; remember only the first failure so that one broken
    # handle cannot prevent the others from being released.
    for step in teardown:
        try:
            step()
        except Exception as exc:
            if first_error is None:
                first_error = exc

    self.output_task = None
    self.in_stream = None
    self.out_stream = None
    self.p = None
    self.output_queue = None
    self.input_callback = None
    self._started = False

    if first_error is not None:
        raise first_error

async def output(self, audio: bytes):
    """Queue a chunk of audio for playback.

    The chunk is silently dropped when there is no output queue, i.e.
    before ``start`` or after ``stop``.

    Args:
        audio: Raw audio bytes to append to the output queue.
    """
    # Read the attribute once: ``stop`` resets it to None, so re-reading
    # ``self.output_queue`` after the check could hit None.
    pending = self.output_queue
    if pending is not None:
        await pending.put(audio)

async def interrupt(self):
    """Discard all buffered output audio.

    Clears the output queue to stop any audio that is currently playing.
    Does nothing when there is no output queue (before ``start`` or after
    ``stop``).
    """
    pending = self.output_queue
    if pending is None:
        return

    # ``asyncio.QueueEmpty`` is the documented signal that ``get_nowait``
    # found nothing left, so a single drain loop is sufficient; no
    # version-specific fallback or catch-all handler is required.
    while True:
        try:
            pending.get_nowait()
        except asyncio.QueueEmpty:
            return

async def _output_task(self):
while not self.should_stop.is_set():
Expand Down