diff --git a/quickwit/quickwit-actors/src/envelope.rs b/quickwit/quickwit-actors/src/envelope.rs index 0bdf504b519..a3ca6d6b9ad 100644 --- a/quickwit/quickwit-actors/src/envelope.rs +++ b/quickwit/quickwit-actors/src/envelope.rs @@ -50,13 +50,18 @@ impl Envelope { } } - /// Execute the captured handle function. + /// Executes the captured handle function. + /// + /// When exiting, also returns the message type name. pub async fn handle_message( &mut self, actor: &mut A, ctx: &ActorContext, - ) -> Result<(), ActorExitStatus> { - self.handler_envelope.handle_message(actor, ctx).await?; + ) -> Result<(), (ActorExitStatus, &'static str)> { + let handling_res = self.handler_envelope.handle_message(actor, ctx).await; + if let Err(exit_status) = handling_res { + return Err((exit_status, self.handler_envelope.message_type_name())); + } Ok(()) } } @@ -70,6 +75,8 @@ impl fmt::Debug for Envelope { #[async_trait] trait EnvelopeT: Send { + fn message_type_name(&self) -> &'static str; + fn debug_msg(&self) -> String; /// Returns the message as a boxed any. @@ -91,6 +98,10 @@ where A: DeferableReplyHandler, M: fmt::Debug + Send + 'static, { + fn message_type_name(&self) -> &'static str { + std::any::type_name::() + } + fn debug_msg(&self) -> String { #[allow(clippy::needless_option_take)] if let Some((_response_tx, msg)) = self.as_ref().take() { diff --git a/quickwit/quickwit-actors/src/spawn_builder.rs b/quickwit/quickwit-actors/src/spawn_builder.rs index 8cabe17ea57..6dfc1aa9155 100644 --- a/quickwit/quickwit-actors/src/spawn_builder.rs +++ b/quickwit/quickwit-actors/src/spawn_builder.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::fmt; use std::time::Duration; use anyhow::Context; @@ -216,6 +217,26 @@ impl SpawnBuilder { } } +enum ActorExitPhase { + Initializing, + Handling { message: &'static str }, + Running, + OnDrainedMessaged, + Completed, +} + +impl fmt::Debug for ActorExitPhase { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ActorExitPhase::Initializing => write!(f, "initializing"), + ActorExitPhase::Handling { message } => write!(f, "handling({message})"), + ActorExitPhase::Running => write!(f, "running"), + ActorExitPhase::OnDrainedMessaged => write!(f, "on_drained_messages"), + ActorExitPhase::Completed => write!(f, "completed"), + } + } +} + /// Receives an envelope from either the high priority queue or the low priority queue. /// /// In the paused state, the actor will only attempt to receive high priority messages. @@ -250,10 +271,10 @@ impl ActorExecutionEnv { self.actor.get_mut().initialize(&self.ctx).await } - async fn process_messages(&mut self) -> ActorExitStatus { + async fn process_messages(&mut self) -> (ActorExitStatus, ActorExitPhase) { loop { - if let Err(exit_status) = self.process_all_available_messages().await { - return exit_status; + if let Err((exit_status, exit_phase)) = self.process_all_available_messages().await { + return (exit_status, exit_phase); } } } @@ -261,22 +282,25 @@ impl ActorExecutionEnv { async fn process_one_message( &mut self, mut envelope: Envelope, - ) -> Result<(), ActorExitStatus> { + ) -> Result<(), (ActorExitStatus, ActorExitPhase)> { self.yield_and_check_if_killed().await?; envelope .handle_message(self.actor.get_mut(), &self.ctx) - .await?; + .await + .map_err(|(exit_status, message)| { + (exit_status, ActorExitPhase::Handling { message }) + })?; Ok(()) } - async fn yield_and_check_if_killed(&mut self) -> Result<(), ActorExitStatus> { + async fn yield_and_check_if_killed(&mut self) -> Result<(), (ActorExitStatus, ActorExitPhase)> { if self.ctx.kill_switch().is_dead() { - return Err(ActorExitStatus::Killed); + return Err((ActorExitStatus::Killed, ActorExitPhase::Running)); } if self.actor.get_mut().yield_after_each_message() { self.ctx.yield_now().await; if self.ctx.kill_switch().is_dead() { - return Err(ActorExitStatus::Killed); + return Err((ActorExitStatus::Killed, ActorExitPhase::Running)); } } else { self.ctx.record_progress(); @@ -284,7 +308,9 @@ impl ActorExecutionEnv { Ok(()) } - async fn process_all_available_messages(&mut self) -> Result<(), ActorExitStatus> { + async fn process_all_available_messages( + &mut self, + ) -> Result<(), (ActorExitStatus, ActorExitPhase)> { self.yield_and_check_if_killed().await?; let envelope = recv_envelope(&mut self.inbox, &self.ctx).await; self.process_one_message(envelope).await?; @@ -304,7 +330,11 @@ impl ActorExecutionEnv { break; } } - self.actor.get_mut().on_drained_messages(&self.ctx).await?; + self.actor + .get_mut() + .on_drained_messages(&self.ctx) + .await + .map_err(|exit_status| (exit_status, ActorExitPhase::OnDrainedMessaged))?; } if self.ctx.mailbox().is_last_mailbox() { // We double check here that the mailbox does not contain any messages, @@ -314,8 +344,7 @@ impl ActorExecutionEnv { if self.inbox.is_empty() { // No one will be able to send us more messages. // We can exit the actor. - info!(actor = self.ctx.actor_instance_id(), "no more messages"); - return Err(ActorExitStatus::Success); + return Err((ActorExitStatus::Success, ActorExitPhase::Completed)); } } @@ -340,23 +369,6 @@ impl ActorExecutionEnv { } exit_status } - - fn process_exit_status(&self, exit_status: &ActorExitStatus) { - match &exit_status { - ActorExitStatus::Success - | ActorExitStatus::Quit - | ActorExitStatus::DownstreamClosed - | ActorExitStatus::Killed => {} - ActorExitStatus::Failure(err) => { - error!(cause=?err, exit_status=?exit_status, "actor-failure"); - } - ActorExitStatus::Panicked => { - error!(exit_status=?exit_status, "actor-failure"); - } - } - info!(actor_id = %self.ctx.actor_instance_id(), exit_status = %exit_status, "actor-exit"); - self.ctx.exit(exit_status); - } } impl Drop for ActorExecutionEnv { @@ -382,19 +394,32 @@ async fn actor_loop( let initialize_exit_status_res: Result<(), ActorExitStatus> = actor_env.initialize().await; drop(no_advance_time_guard); - let after_process_exit_status = if let Err(initialize_exit_status) = initialize_exit_status_res - { - // We do not process messages if initialize yield an error. - // We still call finalize however! - initialize_exit_status - } else { - actor_env.process_messages().await + let (after_process_exit_status, exit_phase) = + if let Err(initialize_exit_status) = initialize_exit_status_res { + // We do not process messages if initialize yield an error. + // We still call finalize however! + (initialize_exit_status, ActorExitPhase::Initializing) + } else { + actor_env.process_messages().await + }; + + let actor_id = actor_env.ctx.actor_instance_id(); + match after_process_exit_status { + ActorExitStatus::Success + | ActorExitStatus::Quit + | ActorExitStatus::DownstreamClosed + | ActorExitStatus::Killed => { + info!(actor_id, phase = ?exit_phase, exit_status = ?after_process_exit_status, "actor-exit"); + } + ActorExitStatus::Failure(_) | ActorExitStatus::Panicked => { + error!(actor_id, phase = ?exit_phase, exit_status = ?after_process_exit_status, "actor-exit"); + } }; // TODO the no advance time guard for finalize has a race condition. Ideally we would // like to have the guard before we drop the last envelope. let final_exit_status = actor_env.finalize(after_process_exit_status).await; // The last observation is collected on `ActorExecutionEnv::Drop`. - actor_env.process_exit_status(&final_exit_status); + actor_env.ctx.exit(&final_exit_status); final_exit_status }