Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 22 additions & 17 deletions codex-rs/core/src/codex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ use crate::protocol::PatchApplyEndEvent;
use crate::protocol::ReviewDecision;
use crate::protocol::SandboxPolicy;
use crate::protocol::SessionConfiguredEvent;
use crate::protocol::StreamRetryEvent;
use crate::protocol::Submission;
use crate::protocol::TaskCompleteEvent;
use crate::protocol::TurnDiffEvent;
Expand Down Expand Up @@ -1501,16 +1502,17 @@ async fn run_turn(
"stream disconnected - retrying turn ({retries}/{max_retries} in {delay:?})...",
);

// Surface retry information to any UI/front‑end so the
// user understands what is happening instead of staring
// at a seemingly frozen screen.
sess.notify_background_event(
&sub_id,
format!(
"stream error: {e}; retrying {retries}/{max_retries} in {delay:?}…"
),
)
.await;
// Surface retry information to any UI/front‑end in a structured way.
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::StreamRetry(StreamRetryEvent {
attempt: retries as u32,
max_attempts: max_retries as u32,
delay,
cause: e.to_string(),
}),
};
let _ = sess.tx_event.send(event).await;

tokio::time::sleep(delay).await;
} else {
Expand Down Expand Up @@ -1739,13 +1741,16 @@ async fn run_compact_task(
if retries < max_retries {
retries += 1;
let delay = backoff(retries);
sess.notify_background_event(
&sub_id,
format!(
"stream error: {e}; retrying {retries}/{max_retries} in {delay:?}…"
),
)
.await;
let event = Event {
id: sub_id.clone(),
msg: EventMsg::StreamRetry(StreamRetryEvent {
attempt: retries as u32,
max_attempts: max_retries as u32,
delay,
cause: e.to_string(),
}),
};
sess.send_event(event).await;
tokio::time::sleep(delay).await;
continue;
} else {
Expand Down
16 changes: 16 additions & 0 deletions codex-rs/exec/src/event_processor_with_human_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use codex_core::protocol::McpToolCallEndEvent;
use codex_core::protocol::PatchApplyBeginEvent;
use codex_core::protocol::PatchApplyEndEvent;
use codex_core::protocol::SessionConfiguredEvent;
use codex_core::protocol::StreamRetryEvent;
use codex_core::protocol::TaskCompleteEvent;
use codex_core::protocol::TurnAbortReason;
use codex_core::protocol::TurnDiffEvent;
Expand Down Expand Up @@ -174,6 +175,21 @@ impl EventProcessor for EventProcessorWithHumanOutput {
EventMsg::BackgroundEvent(BackgroundEventEvent { message }) => {
ts_println!(self, "{}", message.style(self.dimmed));
}
EventMsg::StreamRetry(StreamRetryEvent {
attempt,
max_attempts,
delay,
cause,
}) => {
ts_println!(
self,
"{}",
format!(
"stream error: {cause}; retrying {attempt}/{max_attempts} in {delay:?}…"
)
.style(self.dimmed)
);
}
EventMsg::TaskStarted => {
// Ignore.
}
Expand Down
1 change: 1 addition & 0 deletions codex-rs/mcp-server/src/codex_tool_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ async fn run_codex_tool_session_inner(
| EventMsg::ExecCommandOutputDelta(_)
| EventMsg::ExecCommandEnd(_)
| EventMsg::BackgroundEvent(_)
| EventMsg::StreamRetry(_)
| EventMsg::PatchApplyBegin(_)
| EventMsg::PatchApplyEnd(_)
| EventMsg::TurnDiff(_)
Expand Down
11 changes: 11 additions & 0 deletions codex-rs/protocol/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,9 @@ pub enum EventMsg {

ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent),

/// Notification that a streaming response failed transiently and will be retried.
StreamRetry(StreamRetryEvent),

BackgroundEvent(BackgroundEventEvent),

/// Notification that the agent is about to apply a code patch. Mirrors
Expand Down Expand Up @@ -681,6 +684,14 @@ pub struct ApplyPatchApprovalRequestEvent {
pub grant_root: Option<PathBuf>,
}

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct StreamRetryEvent {
pub attempt: u32,
pub max_attempts: u32,
pub delay: Duration,
pub cause: String,
}

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct BackgroundEventEvent {
pub message: String,
Expand Down
21 changes: 21 additions & 0 deletions codex-rs/tui/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ use crate::slash_command::SlashCommand;
use crate::tui;
use codex_core::ConversationManager;
use codex_core::config::Config;
use codex_core::protocol::BackgroundEventEvent;
use codex_core::protocol::Event;
use codex_core::protocol::EventMsg;
use codex_core::protocol::Op;
use color_eyre::eyre::Result;
use crossterm::SynchronizedUpdate;
Expand Down Expand Up @@ -274,6 +276,25 @@ impl App<'_> {
}
AppEvent::KeyEvent(key_event) => {
match key_event {
KeyEvent {
code: KeyCode::Char('e'),
modifiers: crossterm::event::KeyModifiers::CONTROL,
kind: KeyEventKind::Press,
..
} => {
let env = std::env::var("MANUALLY_DEBUG_TUI_BACKGROUND_RETRY")
.unwrap_or_default();
if env == "1" {
Comment on lines +284 to +287
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
} => {
let env = std::env::var("MANUALLY_DEBUG_TUI_BACKGROUND_RETRY")
.unwrap_or_default();
if env == "1" {
} if std::env::var("MANUALLY_DEBUG_TUI_BACKGROUND_RETRY").is_ok_and(|v| v == "1") => {
// ...

then you don't have to repeat the dispatch_key_event logic.

self.app_event_tx.send(AppEvent::CodexEvent(Event {
id: "manual".to_string(),
msg: EventMsg::BackgroundEvent(BackgroundEventEvent {
message: "stream error: stream disconnected before completion: idle timeout waiting for SSE; retrying 1/5 in 200ms…".to_string(),
}),
}));
} else {
self.dispatch_key_event(key_event);
}
}
KeyEvent {
code: KeyCode::Char('c'),
modifiers: crossterm::event::KeyModifiers::CONTROL,
Expand Down
13 changes: 13 additions & 0 deletions codex-rs/tui/src/chatwidget.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use codex_core::protocol::McpToolCallBeginEvent;
use codex_core::protocol::McpToolCallEndEvent;
use codex_core::protocol::Op;
use codex_core::protocol::PatchApplyBeginEvent;
use codex_core::protocol::StreamRetryEvent;
use codex_core::protocol::TaskCompleteEvent;
use codex_core::protocol::TokenUsage;
use codex_core::protocol::TurnDiffEvent;
Expand Down Expand Up @@ -297,6 +298,7 @@ impl ChatWidget<'_> {

fn on_background_event(&mut self, message: String) {
debug!("BackgroundEvent: {message}");
self.add_to_history(&history_cell::new_background_event(message));
}
/// Periodic tick to commit at most one queued line to history with a small delay,
/// animating the output.
Expand Down Expand Up @@ -652,6 +654,17 @@ impl ChatWidget<'_> {
EventMsg::BackgroundEvent(BackgroundEventEvent { message }) => {
self.on_background_event(message)
}
EventMsg::StreamRetry(StreamRetryEvent {
attempt,
max_attempts,
delay,
cause,
}) => {
let text = format!(
"stream error: {cause}; retrying {attempt}/{max_attempts} in {delay:?}…"
);
self.on_background_event(text)
}
}
// Coalesce redraws: issue at most one after handling the event
if self.needs_redraw {
Expand Down
116 changes: 115 additions & 1 deletion codex-rs/tui/src/chatwidget/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ fn open_fixture(name: &str) -> std::fs::File {
return f;
}
}
// 2) Fallback to parent (workspace root)
// 2) Fallback to parent (workspace crate root)
{
let mut p = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
p.push("..");
Expand Down Expand Up @@ -979,3 +979,117 @@ fn deltas_then_same_final_message_are_rendered_snapshot() {
.collect::<String>();
assert_snapshot!(combined);
}

#[tokio::test(flavor = "current_thread")]
async fn timeout_session_transcript_shows_background_errors() {
let (mut chat, rx, _op_rx) = make_chatwidget_manual();

// Set up a VT100 test terminal to capture ANSI visual output
let width: u16 = 80;
let height: u16 = 2000;
let viewport = ratatui::layout::Rect::new(0, height - 1, width, 1);
let backend = ratatui::backend::TestBackend::new(width, height);
let mut terminal = crate::custom_terminal::Terminal::with_options(backend)
.expect("failed to construct terminal");
terminal.set_viewport_area(viewport);

// Replay the recorded session into the widget and collect transcript
let file = open_fixture("timeout-session-log.jsonl");
let reader = BufReader::new(file);
let mut ansi: Vec<u8> = Vec::new();

for line in reader.lines() {
let line = line.expect("read line");
if line.trim().is_empty() || line.starts_with('#') {
continue;
}
let Ok(v): Result<serde_json::Value, _> = serde_json::from_str(&line) else {
continue;
};
let Some(dir) = v.get("dir").and_then(|d| d.as_str()) else {
continue;
};
if dir != "to_tui" {
continue;
}
let Some(kind) = v.get("kind").and_then(|k| k.as_str()) else {
continue;
};

match kind {
"codex_event" => {
if let Some(payload) = v.get("payload") {
let ev: Event = serde_json::from_value(payload.clone()).expect("parse");
chat.handle_codex_event(ev);
while let Ok(app_ev) = rx.try_recv() {
if let AppEvent::InsertHistory(lines) = app_ev {
crate::insert_history::insert_history_lines_to_writer(
&mut terminal,
&mut ansi,
lines,
);
}
}
}
}
"app_event" => {
if let Some(variant) = v.get("variant").and_then(|s| s.as_str()) {
if variant == "CommitTick" {
chat.on_commit_tick();
while let Ok(app_ev) = rx.try_recv() {
if let AppEvent::InsertHistory(lines) = app_ev {
crate::insert_history::insert_history_lines_to_writer(
&mut terminal,
&mut ansi,
lines,
);
}
}
}
}
}
_ => {}
}
}

// Build the final VT100 visual by parsing the ANSI stream. Trim trailing spaces per line
// and drop trailing empty lines for stable comparisons.
let mut parser = vt100::Parser::new(height, width, 0);
parser.process(&ansi);
let mut lines: Vec<String> = Vec::with_capacity(height as usize);
for row in 0..height {
let mut s = String::with_capacity(width as usize);
for col in 0..width {
if let Some(cell) = parser.screen().cell(row, col) {
if let Some(ch) = cell.contents().chars().next() {
s.push(ch);
} else {
s.push(' ');
}
} else {
s.push(' ');
}
}
lines.push(s.trim_end().to_string());
}
while lines.last().is_some_and(|l| l.is_empty()) {
lines.pop();
}

let visible_after = lines.join("\n");
let visible_flat = visible_after.replace('\n', " ");

// Assertions: ensure background events are visible and contain timeout info.
assert!(
visible_flat.contains("stream error:"),
"missing 'stream error:' in vt100 output:\n{visible_after}"
);
assert!(
visible_flat.contains("idle timeout waiting for SSE"),
"missing timeout detail in vt100 output:\n{visible_after}"
);
assert!(
visible_flat.contains("retrying 1/"),
"missing retry indicator in vt100 output:\n{visible_after}"
);
}
Loading
Loading