Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions plugins/listener/src/actors/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub struct SourceState {
_silence_stream_tx: Option<std::sync::mpsc::Sender<()>>,
_device_event_thread: Option<std::thread::JoinHandle<()>>,
current_mode: ChannelMode,
initialization_complete: Arc<AtomicBool>,
}

pub struct SourceActor;
Expand All @@ -65,6 +66,8 @@ impl Actor for SourceActor {
let device_monitor_handle = DeviceMonitor::spawn(event_tx);

let myself_clone = myself.clone();
let initialization_complete = Arc::new(AtomicBool::new(false));
let initialization_complete_clone = initialization_complete.clone();

let device_event_thread = std::thread::spawn(move || {
use std::sync::mpsc::RecvTimeoutError;
Expand All @@ -77,6 +80,11 @@ impl Actor for SourceActor {
Ok(event) => match event {
DeviceEvent::DefaultInputChanged { .. }
| DeviceEvent::DefaultOutputChanged { .. } => {
if !initialization_complete_clone.load(Ordering::Relaxed) {
tracing::info!(event = ?event, "device_event_ignored_during_init");
continue;
}

tracing::info!(event = ?event, "device_event_outer");

loop {
Expand Down Expand Up @@ -121,6 +129,7 @@ impl Actor for SourceActor {
_silence_stream_tx: silence_stream_tx,
_device_event_thread: Some(device_event_thread),
current_mode: ChannelMode::Dual,
initialization_complete,
};

start_source_loop(&myself, &mut st).await?;
Expand Down Expand Up @@ -149,6 +158,7 @@ impl Actor for SourceActor {
}
SourceMsg::SetMicDevice(dev) => {
st.mic_device = dev;
st.initialization_complete.store(false, Ordering::Relaxed);

if let Some(cancel_token) = st.stream_cancel_token.take() {
cancel_token.cancel();
Expand Down Expand Up @@ -193,6 +203,7 @@ async fn start_source_loop(
let token = st.token.clone();
let mic_muted = st.mic_muted.clone();
let mic_device = st.mic_device.clone();
let initialization_complete = st.initialization_complete.clone();

let stream_cancel_token = CancellationToken::new();
st.stream_cancel_token = Some(stream_cancel_token.clone());
Expand Down Expand Up @@ -250,6 +261,9 @@ async fn start_source_loop(
tokio::pin!(mic_stream);
tokio::pin!(spk_stream);

initialization_complete.store(true, Ordering::Relaxed);
tracing::info!("audio_streams_initialized");

loop {
let Some(cell) = registry::where_is(ProcessorActor::name()) else {
tracing::warn!("processor_actor_not_found");
Expand Down Expand Up @@ -312,6 +326,9 @@ async fn start_source_loop(
tokio::pin!(mic_stream);
tokio::pin!(spk_stream);

initialization_complete.store(true, Ordering::Relaxed);
tracing::info!("audio_streams_initialized");

loop {
let Some(cell) = registry::where_is(ProcessorActor::name()) else {
tracing::warn!("processor_actor_not_found");
Expand Down