Skip to content

Commit 35b33da

Browse files
committed
Log actor exit phase
1 parent d2c6554 commit 35b33da

File tree

2 files changed

+76
-40
lines changed

2 files changed

+76
-40
lines changed

quickwit/quickwit-actors/src/envelope.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,18 @@ impl<A: Actor> Envelope<A> {
5050
}
5151
}
5252

53-
/// Execute the captured handle function.
53+
/// Executes the captured handle function.
54+
///
55+
/// When exiting, also returns the message type name.
5456
pub async fn handle_message(
5557
&mut self,
5658
actor: &mut A,
5759
ctx: &ActorContext<A>,
58-
) -> Result<(), ActorExitStatus> {
59-
self.handler_envelope.handle_message(actor, ctx).await?;
60+
) -> Result<(), (ActorExitStatus, &'static str)> {
61+
let handling_res = self.handler_envelope.handle_message(actor, ctx).await;
62+
if let Err(exit_status) = handling_res {
63+
return Err((exit_status, self.handler_envelope.message_type_name()));
64+
}
6065
Ok(())
6166
}
6267
}
@@ -70,6 +75,8 @@ impl<A: Actor> fmt::Debug for Envelope<A> {
7075

7176
#[async_trait]
7277
trait EnvelopeT<A: Actor>: Send {
78+
fn message_type_name(&self) -> &'static str;
79+
7380
fn debug_msg(&self) -> String;
7481

7582
/// Returns the message as a boxed any.
@@ -91,6 +98,10 @@ where
9198
A: DeferableReplyHandler<M>,
9299
M: fmt::Debug + Send + 'static,
93100
{
101+
fn message_type_name(&self) -> &'static str {
102+
std::any::type_name::<M>()
103+
}
104+
94105
fn debug_msg(&self) -> String {
95106
#[allow(clippy::needless_option_take)]
96107
if let Some((_response_tx, msg)) = self.as_ref().take() {

quickwit/quickwit-actors/src/spawn_builder.rs

Lines changed: 62 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::fmt;
1516
use std::time::Duration;
1617

1718
use anyhow::Context;
@@ -216,6 +217,26 @@ impl<A: Actor + Default> SpawnBuilder<A> {
216217
}
217218
}
218219

220+
enum ActorExitPhase {
221+
Initializing,
222+
Handling { message: &'static str },
223+
Running,
224+
OnDrainedMessaged,
225+
Completed,
226+
}
227+
228+
impl fmt::Debug for ActorExitPhase {
229+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
230+
match self {
231+
ActorExitPhase::Initializing => write!(f, "initializing"),
232+
ActorExitPhase::Handling { message } => write!(f, "handling({message})"),
233+
ActorExitPhase::Running => write!(f, "running"),
234+
ActorExitPhase::OnDrainedMessaged => write!(f, "on_drained_messages"),
235+
ActorExitPhase::Completed => write!(f, "completed"),
236+
}
237+
}
238+
}
239+
219240
/// Receives an envelope from either the high priority queue or the low priority queue.
220241
///
221242
/// In the paused state, the actor will only attempt to receive high priority messages.
@@ -250,41 +271,46 @@ impl<A: Actor> ActorExecutionEnv<A> {
250271
self.actor.get_mut().initialize(&self.ctx).await
251272
}
252273

253-
async fn process_messages(&mut self) -> ActorExitStatus {
274+
async fn process_messages(&mut self) -> (ActorExitStatus, ActorExitPhase) {
254275
loop {
255-
if let Err(exit_status) = self.process_all_available_messages().await {
256-
return exit_status;
276+
if let Err((exit_status, exit_phase)) = self.process_all_available_messages().await {
277+
return (exit_status, exit_phase);
257278
}
258279
}
259280
}
260281

261282
async fn process_one_message(
262283
&mut self,
263284
mut envelope: Envelope<A>,
264-
) -> Result<(), ActorExitStatus> {
285+
) -> Result<(), (ActorExitStatus, ActorExitPhase)> {
265286
self.yield_and_check_if_killed().await?;
266287
envelope
267288
.handle_message(self.actor.get_mut(), &self.ctx)
268-
.await?;
289+
.await
290+
.map_err(|(exit_status, message)| {
291+
(exit_status, ActorExitPhase::Handling { message })
292+
})?;
269293
Ok(())
270294
}
271295

272-
async fn yield_and_check_if_killed(&mut self) -> Result<(), ActorExitStatus> {
296+
async fn yield_and_check_if_killed(&mut self) -> Result<(), (ActorExitStatus, ActorExitPhase)> {
273297
if self.ctx.kill_switch().is_dead() {
274-
return Err(ActorExitStatus::Killed);
298+
return Err((ActorExitStatus::Killed, ActorExitPhase::Running));
275299
}
276300
if self.actor.get_mut().yield_after_each_message() {
277301
self.ctx.yield_now().await;
278302
if self.ctx.kill_switch().is_dead() {
279-
return Err(ActorExitStatus::Killed);
303+
return Err((ActorExitStatus::Killed, ActorExitPhase::Running));
280304
}
281305
} else {
282306
self.ctx.record_progress();
283307
}
284308
Ok(())
285309
}
286310

287-
async fn process_all_available_messages(&mut self) -> Result<(), ActorExitStatus> {
311+
async fn process_all_available_messages(
312+
&mut self,
313+
) -> Result<(), (ActorExitStatus, ActorExitPhase)> {
288314
self.yield_and_check_if_killed().await?;
289315
let envelope = recv_envelope(&mut self.inbox, &self.ctx).await;
290316
self.process_one_message(envelope).await?;
@@ -304,7 +330,11 @@ impl<A: Actor> ActorExecutionEnv<A> {
304330
break;
305331
}
306332
}
307-
self.actor.get_mut().on_drained_messages(&self.ctx).await?;
333+
self.actor
334+
.get_mut()
335+
.on_drained_messages(&self.ctx)
336+
.await
337+
.map_err(|exit_status| (exit_status, ActorExitPhase::OnDrainedMessaged))?;
308338
}
309339
if self.ctx.mailbox().is_last_mailbox() {
310340
// We double check here that the mailbox does not contain any messages,
@@ -314,8 +344,7 @@ impl<A: Actor> ActorExecutionEnv<A> {
314344
if self.inbox.is_empty() {
315345
// No one will be able to send us more messages.
316346
// We can exit the actor.
317-
info!(actor = self.ctx.actor_instance_id(), "no more messages");
318-
return Err(ActorExitStatus::Success);
347+
return Err((ActorExitStatus::Success, ActorExitPhase::Completed));
319348
}
320349
}
321350

@@ -340,23 +369,6 @@ impl<A: Actor> ActorExecutionEnv<A> {
340369
}
341370
exit_status
342371
}
343-
344-
fn process_exit_status(&self, exit_status: &ActorExitStatus) {
345-
match &exit_status {
346-
ActorExitStatus::Success
347-
| ActorExitStatus::Quit
348-
| ActorExitStatus::DownstreamClosed
349-
| ActorExitStatus::Killed => {}
350-
ActorExitStatus::Failure(err) => {
351-
error!(cause=?err, exit_status=?exit_status, "actor-failure");
352-
}
353-
ActorExitStatus::Panicked => {
354-
error!(exit_status=?exit_status, "actor-failure");
355-
}
356-
}
357-
info!(actor_id = %self.ctx.actor_instance_id(), exit_status = %exit_status, "actor-exit");
358-
self.ctx.exit(exit_status);
359-
}
360372
}
361373

362374
impl<A: Actor> Drop for ActorExecutionEnv<A> {
@@ -382,19 +394,32 @@ async fn actor_loop<A: Actor>(
382394
let initialize_exit_status_res: Result<(), ActorExitStatus> = actor_env.initialize().await;
383395
drop(no_advance_time_guard);
384396

385-
let after_process_exit_status = if let Err(initialize_exit_status) = initialize_exit_status_res
386-
{
387-
// We do not process messages if initialize yield an error.
388-
// We still call finalize however!
389-
initialize_exit_status
390-
} else {
391-
actor_env.process_messages().await
397+
let (after_process_exit_status, exit_phase) =
398+
if let Err(initialize_exit_status) = initialize_exit_status_res {
399+
// We do not process messages if initialize yield an error.
400+
// We still call finalize however!
401+
(initialize_exit_status, ActorExitPhase::Initializing)
402+
} else {
403+
actor_env.process_messages().await
404+
};
405+
406+
let actor_id = actor_env.ctx.actor_instance_id();
407+
match after_process_exit_status {
408+
ActorExitStatus::Success
409+
| ActorExitStatus::Quit
410+
| ActorExitStatus::DownstreamClosed
411+
| ActorExitStatus::Killed => {
412+
info!(actor_id, phase = ?exit_phase, exit_status = ?after_process_exit_status, "actor-exit");
413+
}
414+
ActorExitStatus::Failure(_) | ActorExitStatus::Panicked => {
415+
error!(actor_id, phase = ?exit_phase, exit_status = ?after_process_exit_status, "actor-exit");
416+
}
392417
};
393418

394419
// TODO the no advance time guard for finalize has a race condition. Ideally we would
395420
// like to have the guard before we drop the last envelope.
396421
let final_exit_status = actor_env.finalize(after_process_exit_status).await;
397422
// The last observation is collected on `ActorExecutionEnv::Drop`.
398-
actor_env.process_exit_status(&final_exit_status);
423+
actor_env.ctx.exit(&final_exit_status);
399424
final_exit_status
400425
}

0 commit comments

Comments
 (0)