diff --git a/CLAUDE.local.md b/CLAUDE.local.md index 24b1580d7..ea8744d0e 100644 --- a/CLAUDE.local.md +++ b/CLAUDE.local.md @@ -1,5 +1,17 @@ - contract states are commutative monoids, they can be "merged" in any order to arrive at the same result. This may reduce some potential race conditions. +## 📁 Organized Notes System + +All debugging notes have been reorganized into `claude.notes.local/` for better navigation: + +### [→ Go to Main Index](./claude.notes.local/index.md) + +### Quick Links +- **[Current Focus: Connection Stability](./claude.notes.local/active/connection-stability-2025-06-25.md)** +- **[Active Hypotheses](./claude.notes.local/active/hypotheses.md)** +- **[Essential Commands](./claude.notes.local/reference/commands.md)** +- **[Code Locations](./claude.notes.local/reference/code-locations.md)** + ## Important Testing Notes ### Always Use Network Mode for Testing @@ -7,69 +19,10 @@ - Local mode bypasses critical networking components that need to be tested - Always test with `freenet network` to ensure realistic behavior -## Quick Reference - Essential Commands - -### River Development -```bash -# Publish River (use this, not custom scripts) -cd ~/code/freenet/river && RUST_MIN_STACK=16777216 cargo make publish-river-debug - -# Verify River build time (CRITICAL - only way to confirm new version is served) -curl -s http://127.0.0.1:50509/v1/contract/web/BcfxyjCH4snaknrBoCiqhYc9UFvmiJvhsp5d4L5DuvRa/ | grep -o 'Built: [^<]*' | head -1 -``` - -### Freenet Management -```bash -# Start Freenet -./target/release/freenet network > freenet-debug.log 2>&1 & - -# Check status -ps aux | grep freenet | grep -v grep | grep -v tail | grep -v journalctl - -# Monitor logs -tail -f freenet-debug.log -``` - -## Detailed Documentation Files - -### Current Active Debugging -- **Directory**: `freenet-invitation-bug.local/` (consolidated debugging) - - `README.md` - Overview and quick commands - - `river-notes/` - River-specific debugging documentation - - `contract-test/` - Minimal Rust test to reproduce PUT/GET issue - -### River Invitation Bug (2025-01-18) -- **Status**: CONFIRMED - Contract operations hang on live network, work in integration tests -- **Root Cause**: Freenet node receives WebSocket requests but never responds -- **Test Directory**: `freenet-invitation-bug.local/live-network-test/` -- **Confirmed Findings**: - - River correctly sends PUT/GET requests via WebSocket - - Raw WebSocket test: Receives binary error response from server - - freenet-stdlib test: GET request sent but never receives response (2min timeout) - - Integration test `test_put_contract` passes when run in isolation - - Issue affects both PUT and GET operations -- **Current Investigation**: Systematically debugging why Freenet node doesn't respond to contract operations -- **See**: `freenet-invitation-bug.local/river-notes/invitation-bug-analysis-update.md` - -### Historical Analysis (Reference Only) -- **Transport Layer Issues**: See lines 3-145 in previous version of this file (archived) -- **River Testing Procedures**: See lines 97-145 in previous version of this file (archived) - -### CI Tools -- **GitHub CI Monitoring**: `~/code/agent.scripts/wait-for-ci.sh [PR_NUMBER]` - -### Testing Tools -- **Puppeteer Testing Guide**: `puppeteer-testing-guide.local.md` - Essential patterns for testing Dioxus apps with MCP Puppeteer tools -- **Playwright Notes**: `playwright-notes.local.md` - Learnings and patterns for testing River with Playwright MCP tools - -## Key Code Locations -- **River Room Creation**: `/home/ian/code/freenet/river/ui/src/components/room_list/create_room_modal.rs` -- **River Room Synchronizer**: `/home/ian/code/freenet/river/ui/src/components/app/freenet_api/room_synchronizer.rs` -- **River Room Data**: `/home/ian/code/freenet/river/ui/src/room_data.rs` +## Current Priority (2025-06-25) -## Organization Rules -1. **Check this file first** for command reference and active debugging directories -2. **Use standard commands** instead of creating custom scripts -3. **Verify River build timestamps** after publishing -4. **Create timestamped .local directories** for complex debugging sessions -5. **Update this index** when adding new debugging directories or tools \ No newline at end of file +Per discussion with Nacho: +1. **Focus**: Connection stability to gateways +2. **Goal**: 10+ minute stable connections +3. **Method**: Create minimal transport test binary +4. **Avoid**: Getting distracted by River/contract issues \ No newline at end of file diff --git a/apps/freenet-ping/Cargo.lock b/apps/freenet-ping/Cargo.lock index 49de44b5c..e19999a41 100644 --- a/apps/freenet-ping/Cargo.lock +++ b/apps/freenet-ping/Cargo.lock @@ -1292,7 +1292,7 @@ dependencies = [ "directories", "either", "flatbuffers", - "freenet-stdlib 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", + "freenet-stdlib", "futures", "headers", "hickory-resolver", @@ -1344,15 +1344,6 @@ dependencies = [ "syn 2.0.104", ] -[[package]] -name = "freenet-macros" -version = "0.1.2" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.104", -] - [[package]] name = "freenet-ping-app" version = "0.1.11" @@ -1362,7 +1353,7 @@ dependencies = [ "clap", "freenet", "freenet-ping-types", - "freenet-stdlib 0.1.11", + "freenet-stdlib", "futures", "humantime", "once_cell", @@ -1382,7 +1373,7 @@ name = "freenet-ping-contract" version = "0.1.11" dependencies = [ "freenet-ping-types", - "freenet-stdlib 0.1.11", + "freenet-stdlib", "serde_json", ] @@ -1392,42 +1383,12 @@ version = "0.1.11" dependencies = [ "chrono", "clap", - "freenet-stdlib 0.1.11", + "freenet-stdlib", "humantime", "humantime-serde", "serde", ] -[[package]] -name = "freenet-stdlib" -version = "0.1.11" -dependencies = [ - "arbitrary", - "bincode", - "blake3", - "bs58", - "byteorder", - "chrono", - "flatbuffers", - "freenet-macros 0.1.2", - "futures", - "js-sys", - "once_cell", - "semver", - "serde", - "serde-wasm-bindgen 0.6.5", - "serde_bytes", - "serde_json", - "serde_with", - "thiserror 1.0.69", - "tokio", - "tokio-tungstenite 0.26.2", - "tracing", - "tracing-subscriber", - "wasm-bindgen", - "web-sys", -] - [[package]] name = "freenet-stdlib" version = "0.1.11" @@ -1441,7 +1402,7 @@ dependencies = [ "byteorder", "chrono", "flatbuffers", - "freenet-macros 0.1.1", + "freenet-macros", "futures", "js-sys", "once_cell", diff --git a/apps/freenet-ping/app/src/ping_client.rs b/apps/freenet-ping/app/src/ping_client.rs index 3e3cc6e20..60e37f6ec 100644 --- a/apps/freenet-ping/app/src/ping_client.rs +++ b/apps/freenet-ping/app/src/ping_client.rs @@ -169,7 +169,7 @@ pub async fn wait_for_subscribe_response( pub async fn connect_to_host( host: &str, ) -> Result> { - let uri = format!("ws://{}/v1/contract/command?encodingProtocol=native", host); + let uri = format!("ws://{host}/v1/contract/command?encodingProtocol=native"); let (stream, _resp) = tokio_tungstenite::connect_async(&uri).await.map_err(|e| { tracing::error!(err=%e); e diff --git a/apps/freenet-ping/app/tests/common/mod.rs b/apps/freenet-ping/app/tests/common/mod.rs index ec63ad34c..962bdbffb 100644 --- a/apps/freenet-ping/app/tests/common/mod.rs +++ b/apps/freenet-ping/app/tests/common/mod.rs @@ -174,10 +174,7 @@ const WASM_FILE_NAME: &str = "freenet-ping-contract"; pub const APP_TAG: &str = "ping-app"; pub async fn connect_ws_client(ws_port: u16) -> Result { - let uri = format!( - "ws://127.0.0.1:{}/v1/contract/command?encodingProtocol=native", - ws_port - ); + let uri = format!("ws://127.0.0.1:{ws_port}/v1/contract/command?encodingProtocol=native"); let (stream, _) = connect_async(&uri).await?; Ok(WebApi::start(stream)) } @@ -334,7 +331,7 @@ pub(crate) fn pipe_std_streams(mut child: Child) -> anyhow::Result<()> { let reader = std::io::BufReader::new(c_stderr); for line in reader.lines() { let line = line?; - writeln!(stderr, "{}", line)?; + writeln!(stderr, "{line}")?; } Ok(()) }; @@ -345,7 +342,7 @@ pub(crate) fn pipe_std_streams(mut child: Child) -> anyhow::Result<()> { let reader = std::io::BufReader::new(c_stdout); for line in reader.lines() { let line = line?; - writeln!(stdout, "{}", line)?; + writeln!(stdout, "{line}")?; } Ok(()) }; diff --git a/apps/freenet-ping/app/tests/run_app_blocked_peers.rs b/apps/freenet-ping/app/tests/run_app_blocked_peers.rs index 20e5f6876..f8c1d5452 100644 --- a/apps/freenet-ping/app/tests/run_app_blocked_peers.rs +++ b/apps/freenet-ping/app/tests/run_app_blocked_peers.rs @@ -223,17 +223,13 @@ async fn run_blocked_peers_test(config: BlockedPeersConfig) -> TestResult { tokio::time::sleep(config.initial_wait).await; // Connect to nodes via WebSocket - let uri_gw = format!( - "ws://127.0.0.1:{}/v1/contract/command?encodingProtocol=native", - ws_api_port_gw - ); + let uri_gw = + format!("ws://127.0.0.1:{ws_api_port_gw}/v1/contract/command?encodingProtocol=native"); let uri_node1 = format!( - "ws://127.0.0.1:{}/v1/contract/command?encodingProtocol=native", - ws_api_port_node1 + "ws://127.0.0.1:{ws_api_port_node1}/v1/contract/command?encodingProtocol=native" ); let uri_node2 = format!( - "ws://127.0.0.1:{}/v1/contract/command?encodingProtocol=native", - ws_api_port_node2 + "ws://127.0.0.1:{ws_api_port_node2}/v1/contract/command?encodingProtocol=native" ); tracing::info!("Connecting to Gateway at {}", uri_gw); @@ -496,7 +492,7 @@ async fn run_blocked_peers_test(config: BlockedPeersConfig) -> TestResult { ); let mut gw_ping_refresh = Ping::default(); - let gw_refresh_tag = format!("{}-refresh", gw_tag); + let gw_refresh_tag = format!("{gw_tag}-refresh"); gw_ping_refresh.insert(gw_refresh_tag.clone()); tracing::info!("Gateway sending refresh update: {}", gw_refresh_tag); client_gw @@ -509,7 +505,7 @@ async fn run_blocked_peers_test(config: BlockedPeersConfig) -> TestResult { .await?; let mut node1_ping_refresh = Ping::default(); - let node1_refresh_tag = format!("{}-refresh", node1_tag); + let node1_refresh_tag = format!("{node1_tag}-refresh"); node1_ping_refresh.insert(node1_refresh_tag.clone()); tracing::info!("Node1 sending refresh update: {}", node1_refresh_tag); client_node1 @@ -522,7 +518,7 @@ async fn run_blocked_peers_test(config: BlockedPeersConfig) -> TestResult { .await?; let mut node2_ping_refresh = Ping::default(); - let node2_refresh_tag = format!("{}-refresh", node2_tag); + let node2_refresh_tag = format!("{node2_tag}-refresh"); node2_ping_refresh.insert(node2_refresh_tag.clone()); tracing::info!("Node2 sending refresh update: {}", node2_refresh_tag); client_node2 @@ -540,7 +536,7 @@ async fn run_blocked_peers_test(config: BlockedPeersConfig) -> TestResult { for i in 1..=config.update_rounds { // Gateway update let mut gw_ping_refresh = Ping::default(); - let gw_refresh_tag = format!("{}-round{}", gw_tag, i); + let gw_refresh_tag = format!("{gw_tag}-round{i}"); gw_ping_refresh.insert(gw_refresh_tag.clone()); tracing::info!("Gateway sending round {} update: {}", i, gw_refresh_tag); client_gw @@ -554,7 +550,7 @@ async fn run_blocked_peers_test(config: BlockedPeersConfig) -> TestResult { // Node1 update let mut node1_ping_refresh = Ping::default(); - let node1_refresh_tag = format!("{}-round{}", node1_tag, i); + let node1_refresh_tag = format!("{node1_tag}-round{i}"); node1_ping_refresh.insert(node1_refresh_tag.clone()); tracing::info!("Node1 sending round {} update: {}", i, node1_refresh_tag); client_node1 @@ -568,7 +564,7 @@ async fn run_blocked_peers_test(config: BlockedPeersConfig) -> TestResult { // Node2 update let mut node2_ping_refresh = Ping::default(); - let node2_refresh_tag = format!("{}-round{}", node2_tag, i); + let node2_refresh_tag = format!("{node2_tag}-round{i}"); node2_ping_refresh.insert(node2_refresh_tag.clone()); tracing::info!("Node2 sending round {} update: {}", i, node2_refresh_tag); client_node2 @@ -631,7 +627,7 @@ async fn run_blocked_peers_test(config: BlockedPeersConfig) -> TestResult { // Gateway final update let mut gw_ping_final = Ping::default(); - let gw_final_tag = format!("{}-final", gw_tag); + let gw_final_tag = format!("{gw_tag}-final"); gw_ping_final.insert(gw_final_tag.clone()); tracing::info!("Gateway sending final update: {}", gw_final_tag); client_gw @@ -645,7 +641,7 @@ async fn run_blocked_peers_test(config: BlockedPeersConfig) -> TestResult { // Node1 final update let mut node1_ping_final = Ping::default(); - let node1_final_tag = format!("{}-final", node1_tag); + let node1_final_tag = format!("{node1_tag}-final"); node1_ping_final.insert(node1_final_tag.clone()); tracing::info!("Node1 sending final update: {}", node1_final_tag); client_node1 @@ -659,7 +655,7 @@ async fn run_blocked_peers_test(config: BlockedPeersConfig) -> TestResult { // Node2 final update let mut node2_ping_final = Ping::default(); - let node2_final_tag = format!("{}-final", node2_tag); + let node2_final_tag = format!("{node2_tag}-final"); node2_ping_final.insert(node2_final_tag.clone()); tracing::info!("Node2 sending final update: {}", node2_final_tag); client_node2 diff --git a/apps/freenet-ping/types/src/lib.rs b/apps/freenet-ping/types/src/lib.rs index 4fdca297b..a300045d7 100644 --- a/apps/freenet-ping/types/src/lib.rs +++ b/apps/freenet-ping/types/src/lib.rs @@ -501,22 +501,19 @@ mod tests { // Since TTL is 25s, all ping3 entries (4) and most ping2 entries should be included assert_eq!( ping3_count, 4, - "Expected all 4 entries from ping3 (newest), but found {}", - ping3_count + "Expected all 4 entries from ping3 (newest), but found {ping3_count}" ); // Check that we have at least 3 entries from ping2 assert!( ping2_count >= 3, - "Expected at least 3 entries from ping2 (middle), but found {}", - ping2_count + "Expected at least 3 entries from ping2 (middle), but found {ping2_count}" ); // Due to TTL, we expect at most 3 entries from ping1 assert!( ping1_count <= 3, - "Expected at most 3 entries from ping1 (oldest), but got {}", - ping1_count + "Expected at most 3 entries from ping1 (oldest), but got {ping1_count}" ); // Verify total count matches what we found diff --git a/arm-build/arm-freenet.zip b/arm-build/arm-freenet.zip new file mode 100644 index 000000000..9bbbc9f5d Binary files /dev/null and b/arm-build/arm-freenet.zip differ diff --git a/arm-build/freenet b/arm-build/freenet new file mode 100755 index 000000000..ae1faff86 Binary files /dev/null and b/arm-build/freenet differ diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 003edde70..4ef238f1e 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -12,6 +12,10 @@ repository = "https://github.com/freenet/freenet-core" name = "freenet" path = "src/bin/freenet.rs" +[[bin]] +name = "transport-test" +path = "src/bin/transport_test.rs" + [dependencies] ahash = "0.8" anyhow = "1" diff --git a/crates/core/src/bin/handshake_test.rs b/crates/core/src/bin/handshake_test.rs new file mode 100644 index 000000000..95b5395e2 --- /dev/null +++ b/crates/core/src/bin/handshake_test.rs @@ -0,0 +1,210 @@ +#!/usr/bin/env rust +//! Minimal handshake test to isolate where Freenet handshake protocol fails +//! +//! This binary progressively tests handshake steps: +//! 1. Send basic UDP packet (like transport-test) +//! 2. Send packet with Freenet header structure +//! 3. Send actual initial handshake packet +//! +//! Usage: +//! handshake-test --remote 136.62.52.28:31337 # Test against ziggy +//! handshake-test --remote 100.27.151.80:31337 # Test against vega + +use clap::Parser; +use std::net::SocketAddr; +use std::time::{Duration, Instant}; +use tokio::net::UdpSocket; +use tokio::time::{sleep, timeout}; +use tracing::{debug, error, info, warn}; + +#[derive(Parser)] +struct Args { + /// Remote gateway address to test against + #[clap(long)] + remote: SocketAddr, + + /// Local address to bind to + #[clap(long, default_value = "0.0.0.0:0")] + local: SocketAddr, + + /// Enable verbose logging + #[clap(long)] + verbose: bool, +} + +async fn test_basic_udp(socket: &UdpSocket, remote: SocketAddr) -> anyhow::Result<()> { + info!("Phase 1: Testing basic UDP connectivity"); + + let test_packet = b"HANDSHAKE_TEST_BASIC_UDP"; + + for i in 1..=3 { + debug!("Sending basic UDP packet #{}", i); + + let start = Instant::now(); + socket.send_to(test_packet, remote).await?; + + // Try to receive any response (with short timeout) + match timeout( + Duration::from_millis(1000), + socket.recv_from(&mut [0u8; 1024]), + ) + .await + { + Ok(Ok((len, addr))) => { + info!( + "✓ Received response from {}: {} bytes (took {:?})", + addr, + len, + start.elapsed() + ); + } + Ok(Err(e)) => { + warn!("✗ UDP error: {}", e); + } + Err(_) => { + debug!("✗ No response to basic UDP packet #{} (expected)", i); + } + } + + sleep(Duration::from_millis(500)).await; + } + + info!("Phase 1 complete: Basic UDP connectivity tested"); + Ok(()) +} + +async fn test_freenet_header(socket: &UdpSocket, remote: SocketAddr) -> anyhow::Result<()> { + info!("Phase 2: Testing packet with Freenet-like header"); + + // Create a packet that looks somewhat like a Freenet packet structure + // but is clearly a test packet + let mut test_packet = Vec::new(); + test_packet.extend_from_slice(b"FNET"); // Fake Freenet magic + test_packet.extend_from_slice(&1u32.to_be_bytes()); // Version + test_packet.extend_from_slice(&0u32.to_be_bytes()); // Test packet type + test_packet.extend_from_slice(b"HANDSHAKE_TEST_HEADER"); // Payload + + for i in 1..=3 { + debug!("Sending Freenet-header test packet #{}", i); + + let start = Instant::now(); + socket.send_to(&test_packet, remote).await?; + + // Try to receive any response + match timeout( + Duration::from_millis(2000), + socket.recv_from(&mut [0u8; 1024]), + ) + .await + { + Ok(Ok((len, addr))) => { + info!( + "✓ Received response from {}: {} bytes (took {:?})", + addr, + len, + start.elapsed() + ); + return Ok(()); // Got a response, this is interesting! + } + Ok(Err(e)) => { + warn!("✗ UDP error: {}", e); + } + Err(_) => { + debug!("✗ No response to header test packet #{}", i); + } + } + + sleep(Duration::from_millis(500)).await; + } + + info!("Phase 2 complete: Freenet header test (no responses expected)"); + Ok(()) +} + +async fn test_connection_attempt_logging( + socket: &UdpSocket, + remote: SocketAddr, +) -> anyhow::Result<()> { + info!("Phase 3: Testing rapid connection attempts (to trigger handshake errors)"); + + // Send multiple "connection-like" packets quickly to see if we can trigger + // the same handshake errors we see in the full Freenet logs + let test_packet = b"FREENET_CONNECTION_TEST_RAPID_ATTEMPTS"; + + let start_time = Instant::now(); + for i in 1..=10 { + debug!("Sending rapid connection attempt #{}", i); + socket.send_to(test_packet, remote).await?; + + // Very short delay to simulate rapid connection attempts + sleep(Duration::from_millis(100)).await; + } + + // Wait a bit to see if any delayed responses come in + info!("Waiting for any delayed responses..."); + for _ in 0..5 { + match timeout( + Duration::from_millis(1000), + socket.recv_from(&mut [0u8; 1024]), + ) + .await + { + Ok(Ok((len, addr))) => { + info!( + "✓ Delayed response from {}: {} bytes (total time: {:?})", + addr, + len, + start_time.elapsed() + ); + } + Ok(Err(e)) => { + warn!("✗ UDP error: {}", e); + break; + } + Err(_) => { + debug!("No delayed response"); + break; + } + } + } + + info!("Phase 3 complete: Rapid connection attempts tested"); + Ok(()) +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let args = Args::parse(); + + // Initialize logging + let log_level = if args.verbose { + tracing::Level::DEBUG + } else { + tracing::Level::INFO + }; + + tracing_subscriber::fmt() + .with_max_level(log_level) + .with_target(false) + .with_thread_ids(true) + .init(); + + info!("Starting handshake test"); + info!("Local bind: {}", args.local); + info!("Remote target: {}", args.remote); + + // Bind UDP socket + let socket = UdpSocket::bind(args.local).await?; + let local_addr = socket.local_addr()?; + info!("UDP socket bound successfully to {}", local_addr); + + // Run test phases + test_basic_udp(&socket, args.remote).await?; + test_freenet_header(&socket, args.remote).await?; + test_connection_attempt_logging(&socket, args.remote).await?; + + info!("All handshake test phases complete"); + info!("Next step: Monitor gateway logs for any evidence of receiving our test packets"); + + Ok(()) +} diff --git a/crates/core/src/bin/transport_test.rs b/crates/core/src/bin/transport_test.rs new file mode 100644 index 000000000..b01d746b7 --- /dev/null +++ b/crates/core/src/bin/transport_test.rs @@ -0,0 +1,288 @@ +//! Minimal transport test for debugging connection stability +//! +//! This creates a simple UDP client/server to test connection patterns +//! similar to Freenet's transport layer, focusing on reproducing the +//! 30-second timeout issue. + +use std::net::SocketAddr; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use clap::Parser; +use tokio::net::UdpSocket; +use tokio::time::interval; +use tracing::{debug, error, info, warn}; + +#[derive(Parser, Debug)] +#[command(name = "transport-test")] +#[command(about = "Minimal transport layer test for debugging connection stability")] +struct Args { + /// Run as server (listener) or client (connector) + #[arg(long)] + server: bool, + + /// Address to bind to (server) or connect to (client) + #[arg(long, default_value = "0.0.0.0:31337")] + address: String, + + /// For client mode: remote server address + #[arg(long)] + remote: Option, + + /// Keep-alive interval in seconds + #[arg(long, default_value = "10")] + keepalive: u64, + + /// Enable verbose logging + #[arg(long)] + verbose: bool, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let args = Args::parse(); + + // Initialize logging + let filter = if args.verbose { + "transport_test=trace,freenet=trace" + } else { + "transport_test=info,freenet=info" + }; + + tracing_subscriber::fmt() + .with_env_filter(filter) + .with_target(false) + .with_thread_ids(true) + .with_timer(tracing_subscriber::fmt::time::uptime()) + .init(); + + let addr: SocketAddr = args.address.parse()?; + + if args.server { + run_server(addr, args.keepalive).await + } else { + let remote = args + .remote + .ok_or_else(|| anyhow::anyhow!("--remote required in client mode"))?; + let remote_addr: SocketAddr = remote.parse()?; + run_client(addr, remote_addr, args.keepalive).await + } +} + +async fn run_server(bind_addr: SocketAddr, keepalive_interval: u64) -> anyhow::Result<()> { + info!("Starting transport test SERVER on {}", bind_addr); + info!("Keep-alive interval: {}s", keepalive_interval); + + let socket = Arc::new(UdpSocket::bind(bind_addr).await?); + info!("UDP socket bound successfully"); + + // Track connection state + let mut clients = std::collections::HashMap::new(); + let start_time = Instant::now(); + + // Stats + let messages_received = Arc::new(AtomicU64::new(0)); + + // Buffer for receiving + let mut buf = vec![0u8; 65536]; + + loop { + match socket.recv_from(&mut buf).await { + Ok((len, from)) => { + let msg_count = messages_received.fetch_add(1, Ordering::Relaxed) + 1; + let now = Instant::now(); + let uptime = start_time.elapsed().as_secs(); + + // Parse message type + if len >= 4 { + let msg_type = &buf[0..4]; + match msg_type { + b"KEEP" => { + // Track last keep-alive time + let last_seen = clients.entry(from).or_insert(now); + let since_last = now.duration_since(*last_seen).as_secs_f64(); + *last_seen = now; + + info!( + "Keep-alive from {} | Time since last: {:.1}s | Total msgs: {} | Uptime: {}s", + from, since_last, msg_count, uptime + ); + + // Echo back + if let Err(e) = socket.send_to(b"ACKK", from).await { + error!("Failed to send keep-alive ACK: {}", e); + } + } + b"DATA" => { + info!( + "Data message from {} | {} bytes | Uptime: {}s", + from, len, uptime + ); + + // Echo back + if let Err(e) = socket.send_to(b"ACKD", from).await { + error!("Failed to send data ACK: {}", e); + } + } + _ => { + warn!( + "Unknown message type from {}: {:?}", + from, + &buf[0..4.min(len)] + ); + } + } + } + + // Check for timed-out clients (simulate 30s timeout) + let timeout_threshold = Duration::from_secs(30); + clients.retain(|addr, last_seen| { + let elapsed = now.duration_since(*last_seen); + if elapsed > timeout_threshold { + warn!( + "Client {} timed out (no message for {}s)", + addr, + elapsed.as_secs() + ); + false + } else { + true + } + }); + } + Err(e) => { + error!("Error receiving packet: {}", e); + } + } + } +} + +async fn run_client( + bind_addr: SocketAddr, + remote_addr: SocketAddr, + keepalive_interval: u64, +) -> anyhow::Result<()> { + info!("Starting transport test CLIENT"); + info!("Local bind: {}", bind_addr); + info!("Remote server: {}", remote_addr); + info!("Keep-alive interval: {}s", keepalive_interval); + + let socket = Arc::new(UdpSocket::bind(bind_addr).await?); + info!("UDP socket bound successfully"); + + // Connection state + let connected = Arc::new(AtomicBool::new(false)); + let last_received = Arc::new(parking_lot::Mutex::new(Instant::now())); + let start_time = Instant::now(); + + // Stats + let keepalives_sent = Arc::new(AtomicU64::new(0)); + let keepalives_acked = Arc::new(AtomicU64::new(0)); + + // Keep-alive task + let socket_send = socket.clone(); + let ka_sent = keepalives_sent.clone(); + let ka_task = tokio::spawn(async move { + let mut ticker = interval(Duration::from_secs(keepalive_interval)); + + loop { + ticker.tick().await; + + let count = ka_sent.fetch_add(1, Ordering::Relaxed) + 1; + let uptime = start_time.elapsed().as_secs(); + + info!("Sending keep-alive #{} | Uptime: {}s", count, uptime); + + match socket_send.send_to(b"KEEP", remote_addr).await { + Ok(_) => { + debug!("Keep-alive sent successfully"); + } + Err(e) => { + error!("Failed to send keep-alive: {}", e); + } + } + } + }); + + // Receive task + let socket_recv = socket.clone(); + let last_received_clone = last_received.clone(); + let connected_clone = connected.clone(); + let recv_task = tokio::spawn(async move { + let mut buf = vec![0u8; 65536]; + + loop { + match socket_recv.recv_from(&mut buf).await { + Ok((len, from)) => { + if from != remote_addr { + warn!("Received packet from unexpected source: {}", from); + continue; + } + + let now = Instant::now(); + *last_received_clone.lock() = now; + + if !connected_clone.load(Ordering::Relaxed) { + connected_clone.store(true, Ordering::Relaxed); + info!("Connection established with {}", from); + } + + if len >= 4 { + let msg_type = &buf[0..4]; + match msg_type { + b"ACKK" => { + let acked = keepalives_acked.fetch_add(1, Ordering::Relaxed) + 1; + let sent = keepalives_sent.load(Ordering::Relaxed); + let uptime = start_time.elapsed().as_secs(); + + info!( + "Keep-alive ACK received | Sent: {} Acked: {} | Uptime: {}s", + sent, acked, uptime + ); + } + b"ACKD" => { + debug!("Data ACK received"); + } + _ => { + warn!("Unknown message type: {:?}", &buf[0..4.min(len)]); + } + } + } + } + Err(e) => { + error!("Error receiving packet: {}", e); + } + } + } + }); + + // Monitor task - check for timeout + let monitor_task = tokio::spawn(async move { + let mut ticker = interval(Duration::from_secs(1)); + + loop { + ticker.tick().await; + + if connected.load(Ordering::Relaxed) { + let elapsed = last_received.lock().elapsed(); + if elapsed > Duration::from_secs(30) { + error!( + "CONNECTION TIMEOUT: No packet received for {}s (30s threshold exceeded)", + elapsed.as_secs() + ); + connected.store(false, Ordering::Relaxed); + } + } + } + }); + + // Wait for tasks + tokio::select! { + _ = ka_task => warn!("Keep-alive task ended"), + _ = recv_task => warn!("Receive task ended"), + _ = monitor_task => warn!("Monitor task ended"), + } + + Ok(()) +} diff --git a/crates/core/src/config/mod.rs b/crates/core/src/config/mod.rs index 2c09d9265..fe7679a47 100644 --- a/crates/core/src/config/mod.rs +++ b/crates/core/src/config/mod.rs @@ -181,7 +181,7 @@ impl ConfigArgs { } ext => Err(std::io::Error::new( std::io::ErrorKind::InvalidInput, - format!("Invalid configuration file extension: {}", ext), + format!("Invalid configuration file extension: {ext}"), )), } } diff --git a/crates/core/src/contract/handler.rs b/crates/core/src/contract/handler.rs index 783a67075..6c5b5c42e 100644 --- a/crates/core/src/contract/handler.rs +++ b/crates/core/src/contract/handler.rs @@ -390,7 +390,7 @@ impl std::fmt::Display for ContractHandlerEvent { let mut params = String::new(); params.push_str("0x"); for b in contract.params().as_ref().iter().take(8) { - write!(&mut params, "{:02x}", b)?; + write!(&mut params, "{b:02x}")?; } params.push_str("..."); write!(f, "put query {{ {key}, params: {params} }}",) diff --git a/crates/core/src/contract/mod.rs b/crates/core/src/contract/mod.rs index 5e541e7d9..46db75d20 100644 --- a/crates/core/src/contract/mod.rs +++ b/crates/core/src/contract/mod.rs @@ -37,6 +37,9 @@ where key, return_contract_code, } => { + let start = std::time::Instant::now(); + tracing::info!(%key, %return_contract_code, "Starting contract GET execution"); + match contract_handler .executor() .fetch_contract(key, return_contract_code) @@ -44,6 +47,13 @@ where .await { Ok((state, contract)) => { + let elapsed = start.elapsed(); + if elapsed > std::time::Duration::from_millis(10) { + tracing::warn!(%key, elapsed_ms = elapsed.as_millis(), "SLOW contract GET execution blocked message pipeline!"); + } else { + tracing::info!(%key, elapsed_ms = elapsed.as_millis(), "Contract GET execution completed"); + } + tracing::debug!(with_contract_code = %return_contract_code, has_contract = %contract.is_some(), "Fetched contract {key}"); contract_handler .channel() @@ -88,6 +98,9 @@ where related_contracts, contract, } => { + let start = std::time::Instant::now(); + tracing::info!(%key, "Starting contract PUT execution"); + let put_result = contract_handler .executor() .upsert_contract_state( @@ -99,6 +112,13 @@ where .instrument(tracing::info_span!("upsert_contract_state", %key)) .await; + let elapsed = start.elapsed(); + if elapsed > std::time::Duration::from_millis(10) { + tracing::warn!(%key, elapsed_ms = elapsed.as_millis(), "SLOW contract PUT execution blocked message pipeline!"); + } else { + tracing::info!(%key, elapsed_ms = elapsed.as_millis(), "Contract PUT execution completed"); + } + let event_result = match put_result { Ok(UpsertResult::NoChange) => ContractHandlerEvent::PutResponse { new_value: Ok(state), diff --git a/crates/core/src/message.rs b/crates/core/src/message.rs index 1c95f77a2..9aaa2db8e 100644 --- a/crates/core/src/message.rs +++ b/crates/core/src/message.rs @@ -386,7 +386,7 @@ impl Display for NodeEvent { write!(f, "QueryNodeDiagnostics") } NodeEvent::TransactionTimedOut(transaction) => { - write!(f, "Transaction timed out ({})", transaction) + write!(f, "Transaction timed out ({transaction})") } } } @@ -463,7 +463,7 @@ impl Display for NetMessage { Update(msg) => msg.fmt(f)?, Aborted(msg) => msg.fmt(f)?, Unsubscribed { key, from, .. } => { - write!(f, "Unsubscribed {{ key: {}, from: {} }}", key, from)?; + write!(f, "Unsubscribed {{ key: {key}, from: {from} }}")?; } }, }; diff --git a/crates/core/src/node/mod.rs b/crates/core/src/node/mod.rs index 4395ef892..88b2aba1e 100644 --- a/crates/core/src/node/mod.rs +++ b/crates/core/src/node/mod.rs @@ -229,7 +229,7 @@ impl NodeConfig { let hostname = if hostname.ends_with('.') { hostname } else { - Cow::Owned(format!("{}.", hostname)) + Cow::Owned(format!("{hostname}.")) }; let ips = resolver.lookup_ip(hostname.as_ref()).await?; @@ -592,9 +592,23 @@ async fn process_message_v1( .await; } NetMessageV1::Put(ref op) => { + let put_start = std::time::Instant::now(); + tracing::info!( + op_id = %op.id(), + "PUT_START: Beginning PUT operation" + ); + let op_result = handle_op_request::(&op_manager, &mut conn_manager, op).await; + let elapsed = put_start.elapsed(); + tracing::info!( + op_id = %op.id(), + elapsed_ms = elapsed.as_millis(), + completed = is_operation_completed(&op_result), + "PUT_END: Completed PUT operation" + ); + if is_operation_completed(&op_result) { if let Some(ref op_execution_callback) = pending_op_result { let tx_id = *op.id(); diff --git a/crates/core/src/node/network_bridge/handshake.rs b/crates/core/src/node/network_bridge/handshake.rs index a2a441ce6..4db65634f 100644 --- a/crates/core/src/node/network_bridge/handshake.rs +++ b/crates/core/src/node/network_bridge/handshake.rs @@ -279,7 +279,12 @@ impl HandshakeHandler { Ok(Event::OutboundConnectionSuccessful { peer_id, connection }) } Some(Ok(InternalEvent::OutboundGwConnEstablished(id, connection))) => { - tracing::info!(at=?connection.my_address(), from=%connection.remote_addr(), "Outbound gateway connection successful"); + tracing::info!( + at=?connection.my_address(), + from=%connection.remote_addr(), + peer_id=%id.addr, + "Outbound gateway connection successful - peer_id address vs connection address" + ); if let Some(addr) = connection.my_address() { tracing::debug!(%addr, "Attempting setting own peer key"); self.connection_manager.try_set_peer_key(addr); @@ -289,11 +294,22 @@ impl HandshakeHandler { } } tracing::debug!(at=?connection.my_address(), from=%connection.remote_addr(), "Outbound connection to gw successful"); + tracing::info!( + at=?connection.my_address(), + from=%connection.remote_addr(), + "HANDSHAKE_WAIT_GW: Starting wait_for_gw_confirmation" + ); self.wait_for_gw_confirmation(id, connection, Ring::DEFAULT_MAX_HOPS_TO_LIVE).await?; continue; } Some(Ok(InternalEvent::FinishedOutboundConnProcess(tracker))) => { self.connecting.remove(&tracker.gw_peer.peer.addr); + tracing::warn!( + "CONNECTING_MAP: Removed entry for {} (FinishedOutboundConnProcess), map now has {} entries: {:?}", + tracker.gw_peer.peer.addr, + self.connecting.len(), + self.connecting.keys().collect::>() + ); // at this point we are done checking all the accepts inbound from a transient gw conn tracing::debug!(at=?tracker.gw_conn.my_address(), gw=%tracker.gw_conn.remote_addr(), "Done checking, connection not accepted by gw, dropping connection"); Ok(Event::OutboundGatewayConnectionRejected { peer_id: tracker.gw_peer.peer }) @@ -302,6 +318,12 @@ impl HandshakeHandler { tracing::debug!(at=?tracker.gw_conn.my_address(), from=%tracker.gw_conn.remote_addr(), "Outbound connection to gw confirmed"); self.connected.insert(tracker.gw_conn.remote_addr()); self.connecting.remove(&tracker.gw_conn.remote_addr()); + tracing::info!( + "CONNECTING_MAP: Removed entry for {} (OutboundGwConnConfirmed), map now has {} entries: {:?}", + tracker.gw_conn.remote_addr(), + self.connecting.len(), + self.connecting.keys().collect::>() + ); return Ok(Event::OutboundGatewayConnectionSuccessful { peer_id: tracker.gw_peer.peer, connection: tracker.gw_conn, @@ -333,12 +355,40 @@ impl HandshakeHandler { } Some(Ok(InternalEvent::DropInboundConnection(addr))) => { self.connecting.remove(&addr); + tracing::warn!( + "CONNECTING_MAP: Removed entry for {} (DropInboundConnection), map now has {} entries: {:?}", + addr, + self.connecting.len(), + self.connecting.keys().collect::>() + ); self.outbound_messages.remove(&addr); continue; } Some(Err((peer_id, error))) => { tracing::debug!(from=%peer_id.addr, "Outbound connection failed: {error}"); - self.connecting.remove(&peer_id.addr); + + // Only remove from connecting map if this isn't a duplicate connection attempt + // Otherwise we might remove the entry for a connection that's still being established + let should_remove = !matches!(&error, HandshakeError::TransportError(e) + if e.to_string().contains("connection attempt already in progress")); + + if should_remove { + self.connecting.remove(&peer_id.addr); + tracing::warn!( + "CONNECTING_MAP: Removed entry for {} (OutboundConnectionFailed: {}), map now has {} entries: {:?}", + peer_id.addr, + error, + self.connecting.len(), + self.connecting.keys().collect::>() + ); + } else { + tracing::info!( + "CONNECTING_MAP: NOT removing entry for {} (duplicate connection attempt), map still has {} entries", + peer_id.addr, + self.connecting.len() + ); + } + self.outbound_messages.remove(&peer_id.addr); self.connection_manager.prune_alive_connection(&peer_id); Ok(Event::OutboundConnectionFailed { peer_id, error }) @@ -508,6 +558,12 @@ impl HandshakeHandler { Ok(ForwardResult::Rejected) => { self.outbound_messages.remove(&remote); self.connecting.remove(&remote); + tracing::warn!( + "CONNECTING_MAP: Removed entry for {} (ForwardResult::Rejected), map now has {} entries: {:?}", + remote, + self.connecting.len(), + self.connecting.keys().collect::>() + ); return Ok(Event::InboundConnectionRejected { peer_id: joiner }); } Err(e) => { @@ -520,6 +576,12 @@ impl HandshakeHandler { InternalEvent::DropInboundConnection(addr) => { self.outbound_messages.remove(&addr); self.connecting.remove(&addr); + tracing::warn!( + "CONNECTING_MAP: Removed entry for {} (DropInboundConnection in unconfirmed), map now has {} entries: {:?}", + addr, + self.connecting.len(), + self.connecting.keys().collect::>() + ); continue; } other => { @@ -547,11 +609,23 @@ impl HandshakeHandler { self.connected.remove(&peer.addr); self.outbound_messages.remove(&peer.addr); self.connecting.remove(&peer.addr); + tracing::warn!( + "CONNECTING_MAP: Removed entry for {} (ExternConnection::Dropped), map now has {} entries: {:?}", + peer.addr, + self.connecting.len(), + self.connecting.keys().collect::>() + ); } Some(ExternConnection::DropConnectionByAddr(addr)) => { self.connected.remove(&addr); self.outbound_messages.remove(&addr); self.connecting.remove(&addr); + tracing::warn!( + "CONNECTING_MAP: Removed entry for {} (DropConnectionByAddr), map now has {} entries: {:?}", + addr, + self.connecting.len(), + self.connecting.keys().collect::>() + ); } None => return Err(HandshakeError::ChannelClosed), } @@ -663,11 +737,24 @@ impl HandshakeHandler { return Some(Event::RemoveTransaction(tx)); } self.connecting.insert(addr, tx); + tracing::info!( + "CONNECTING_MAP: Added entry for {} in outbound() (tx: {}), map now has {} entries: {:?}", + addr, + tx, + self.connecting.len(), + self.connecting.keys().collect::>() + ); } if alive_conn.send(op).await.is_err() { self.outbound_messages.remove(&addr); self.connecting.remove(&addr); + tracing::warn!( + "CONNECTING_MAP: Removed entry for {} (send failed), map now has {} entries: {:?}", + addr, + self.connecting.len(), + self.connecting.keys().collect::>() + ); } None } else { @@ -723,6 +810,13 @@ impl HandshakeHandler { return; } self.connecting.insert(remote.addr, transaction); + tracing::info!( + "CONNECTING_MAP: Added entry for {} (tx: {}), map now has {} entries: {:?}", + remote.addr, + transaction, + self.connecting.len(), + self.connecting.keys().collect::>() + ); tracing::debug!("Starting outbound connection to {addr}", addr = remote.addr); let f = self .outbound_conn_handler @@ -753,10 +847,29 @@ impl HandshakeHandler { conn: PeerConnection, max_hops_to_live: usize, ) -> Result<()> { + tracing::info!( + "CONNECTING_MAP_CHECK: Looking for gateway {} in connecting map, connection remote is {}, thread: {:?}", + gw_peer_id.addr, + conn.remote_addr(), + std::thread::current().id() + ); let tx = *self .connecting .get(&gw_peer_id.addr) - .ok_or_else(|| HandshakeError::ConnectionClosed(conn.remote_addr()))?; + .ok_or_else(|| { + tracing::warn!( + "HANDSHAKE_FAIL: No connecting entry found for gateway {}, connecting map has {} entries: {:?}", + gw_peer_id.addr, + self.connecting.len(), + self.connecting.keys().collect::>() + ); + HandshakeError::ConnectionClosed(conn.remote_addr()) + })?; + tracing::info!( + "CONNECTING_MAP_CHECK: Successfully found entry for gateway {} with tx: {}, proceeding with confirmation", + gw_peer_id.addr, + tx + ); let this_peer = self.connection_manager.own_location().peer; tracing::debug!(at=?conn.my_address(), %this_peer.addr, from=%conn.remote_addr(), remote_addr = %gw_peer_id, "Waiting for confirmation from gw"); self.ongoing_outbound_connections.push( @@ -994,11 +1107,22 @@ async fn gw_peer_connection_listener( mut conn: PeerConnection, mut outbound: PeerOutboundMessage, ) -> Result<(InternalEvent, PeerOutboundMessage), HandshakeError> { - tracing::debug!(from=%conn.remote_addr(), "Starting gw_peer_connection_listener"); + tracing::info!( + from=%conn.remote_addr(), + at=?conn.my_address(), + "HANDSHAKE_START: Starting gateway peer connection listener" + ); loop { tokio::select! { msg = outbound.0.recv() => { - let Some(msg) = msg else { break Err(HandshakeError::ConnectionClosed(conn.remote_addr())); }; + let Some(msg) = msg else { + tracing::warn!( + at=?conn.my_address(), + from=%conn.remote_addr(), + "HANDSHAKE_FAIL: Outbound message channel closed during handshake" + ); + break Err(HandshakeError::ConnectionClosed(conn.remote_addr())); + }; tracing::debug!(at=?conn.my_address(), from=%conn.remote_addr() ,"Sending message to peer. Msg: {msg}"); conn @@ -1007,8 +1131,13 @@ async fn gw_peer_connection_listener( } msg = conn.recv() => { let Ok(msg) = msg.map_err(|error| { - tracing::error!(at=?conn.my_address(), from=%conn.remote_addr(), "Error while receiving message: {error}"); + tracing::error!(at=?conn.my_address(), from=%conn.remote_addr(), "HANDSHAKE_FAIL: Error while receiving message: {error}"); }) else { + tracing::warn!( + at=?conn.my_address(), + from=%conn.remote_addr(), + "HANDSHAKE_FAIL: Connection recv failed during handshake" + ); break Err(HandshakeError::ConnectionClosed(conn.remote_addr())); }; let net_message = decode_msg(&msg).unwrap(); diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs index 26c9c20e8..277d0f82c 100644 --- a/crates/core/src/node/network_bridge/p2p_protoc.rs +++ b/crates/core/src/node/network_bridge/p2p_protoc.rs @@ -92,15 +92,60 @@ impl NetworkBridge for P2pBridge { } async fn send(&self, target: &PeerId, msg: NetMessage) -> super::ConnResult<()> { + let send_start = std::time::Instant::now(); + let msg_type = match &msg { + NetMessage::V1(inner) => match inner { + crate::message::NetMessageV1::Put(_) => "PUT".to_string(), + crate::message::NetMessageV1::Get(_) => "GET".to_string(), + crate::message::NetMessageV1::Update(_) => "UPDATE".to_string(), + crate::message::NetMessageV1::Subscribe(_) => "SUBSCRIBE".to_string(), + _ => format!("{inner:?}") + .split('(') + .next() + .unwrap_or("Unknown") + .to_string(), + }, + }; + + tracing::info!( + target = %target, + msg_type = %msg_type, + "MESSAGE_SEND_START: Sending message to peer" + ); + self.log_register .register_events(NetEventLog::from_outbound_msg(&msg, &self.op_manager.ring)) .await; self.op_manager.sending_transaction(target, &msg); - self.ev_listener_tx + + let send_result = self + .ev_listener_tx .send(Left((target.clone(), Box::new(msg)))) - .await - .map_err(|_| ConnectionError::SendNotCompleted(target.clone()))?; - Ok(()) + .await; + + let elapsed = send_start.elapsed(); + + match send_result { + Ok(_) => { + tracing::info!( + target = %target, + msg_type = %msg_type, + elapsed_ms = elapsed.as_millis(), + "MESSAGE_SEND_SUCCESS: Message sent successfully" + ); + Ok(()) + } + Err(e) => { + tracing::error!( + target = %target, + msg_type = %msg_type, + elapsed_ms = elapsed.as_millis(), + error = %e, + "MESSAGE_SEND_FAILED: Failed to send message" + ); + Err(ConnectionError::SendNotCompleted(target.clone())) + } + } } } @@ -575,7 +620,7 @@ impl P2pConnManager { NodeEvent::Disconnect { cause } => { tracing::info!( "Disconnecting from network{}", - cause.map(|c| format!(": {}", c)).unwrap_or_default() + cause.map(|c| format!(": {c}")).unwrap_or_default() ); break; } @@ -621,7 +666,7 @@ impl P2pConnManager { } Err(HandshakeError::ChannelClosed) => Ok(EventResult::Event(ConnEvent::ClosedChannel.into())), Err(e) => { - tracing::warn!("Handshake error: {:?}", e); + tracing::warn!("HANDSHAKE_ERROR: Handshake failed with error: {:?}", e); Ok(EventResult::Continue) } } diff --git a/crates/core/src/node/testing_impl.rs b/crates/core/src/node/testing_impl.rs index f373e35f5..96bcb32e2 100644 --- a/crates/core/src/node/testing_impl.rs +++ b/crates/core/src/node/testing_impl.rs @@ -627,7 +627,7 @@ impl SimNetwork { HashSet::from_iter(self.number_of_gateways..num_nodes + self.number_of_gateways); let mut missing: Vec<_> = expected .difference(&connected) - .map(|n| format!("node-{}", n)) + .map(|n| format!("node-{n}")) .collect(); tracing::info!("Number of simulated nodes: {num_nodes}"); diff --git a/crates/core/src/node/testing_impl/network.rs b/crates/core/src/node/testing_impl/network.rs index 4cef1cadc..30debfab6 100644 --- a/crates/core/src/node/testing_impl/network.rs +++ b/crates/core/src/node/testing_impl/network.rs @@ -44,7 +44,7 @@ impl NetworkPeer { .await .expect("Failed to connect to supervisor"); - let config_url = format!("http://127.0.0.1:3000/v1/config/{}", peer_id); + let config_url = format!("http://127.0.0.1:3000/v1/config/{peer_id}"); let response = reqwest::get(&config_url).await?; let peer_config = response.json::().await?; diff --git a/crates/core/src/operations/get.rs b/crates/core/src/operations/get.rs index 4e0f5a93a..bbe59e415 100644 --- a/crates/core/src/operations/get.rs +++ b/crates/core/src/operations/get.rs @@ -169,8 +169,7 @@ impl Display for GetState { } => { write!( f, - "PrepareRequest(key: {}, id: {}, fetch_contract: {}, subscribe: {})", - key, id, fetch_contract, subscribe + "PrepareRequest(key: {key}, id: {id}, fetch_contract: {fetch_contract}, subscribe: {subscribe})" ) } GetState::AwaitingResponse { @@ -183,9 +182,9 @@ impl Display for GetState { alternatives: _, attempts_at_hop: _, } => { - write!(f, "AwaitingResponse(requester: {:?}, fetch_contract: {}, retries: {}, current_hop: {}, subscribe: {})", requester, fetch_contract, retries, current_hop, subscribe) + write!(f, "AwaitingResponse(requester: {requester:?}, fetch_contract: {fetch_contract}, retries: {retries}, current_hop: {current_hop}, subscribe: {subscribe})") } - GetState::Finished { key, .. } => write!(f, "Finished(key: {})", key), + GetState::Finished { key, .. } => write!(f, "Finished(key: {key})"), } } } diff --git a/crates/core/src/operations/mod.rs b/crates/core/src/operations/mod.rs index 285f972dc..93522ed78 100644 --- a/crates/core/src/operations/mod.rs +++ b/crates/core/src/operations/mod.rs @@ -95,6 +95,11 @@ where return Ok(None); } Err(err) => { + tracing::error!( + %tx_id, + error = %err, + "PUT_ERROR: Operation failed with error" + ); if let Some(sender) = sender { network_bridge .send(&sender, NetMessage::V1(NetMessageV1::Aborted(tx_id))) @@ -313,21 +318,30 @@ async fn start_subscription_request( try_get: bool, skip_list: HashSet, ) { + tracing::info!( + %key, + try_get, + skip_list_size = skip_list.len(), + "SUBSCRIPTION_START: Starting subscription request for contract" + ); + let sub_op = subscribe::start_op(key); if let Err(error) = subscribe::request_subscribe(op_manager, sub_op).await { if !try_get { - tracing::warn!(%error, "Error subscribing to contract"); + tracing::warn!(%error, %key, "SUBSCRIPTION_ERROR: Error subscribing to contract"); return; } if let OpError::ContractError(ContractError::ContractNotFound(key)) = &error { - tracing::debug!(%key, "Contract not found, trying to get it first"); + tracing::debug!(%key, "SUBSCRIPTION_RETRY: Contract not found, trying to get it first"); let get_op = get::start_op(*key, true, true); if let Err(error) = get::request_get(op_manager, get_op, skip_list).await { - tracing::warn!(%error, "Error getting contract"); + tracing::warn!(%error, %key, "SUBSCRIPTION_ERROR: Error getting contract"); } } else { - tracing::warn!(%error, "Error subscribing to contract"); + tracing::warn!(%error, %key, "SUBSCRIPTION_ERROR: Error subscribing to contract"); } + } else { + tracing::info!(%key, "SUBSCRIPTION_SUCCESS: Successfully subscribed to contract"); } } diff --git a/crates/core/src/operations/put.rs b/crates/core/src/operations/put.rs index d93c870c8..94dbfa1ef 100644 --- a/crates/core/src/operations/put.rs +++ b/crates/core/src/operations/put.rs @@ -140,6 +140,7 @@ impl Operation for PutOp { let return_msg; let new_state; let stats = self.stats; + let old_state = format!("{:?}", self.state); // Capture before moving match input { PutMsg::RequestPut { @@ -226,6 +227,15 @@ impl Operation for PutOp { let should_seed = op_manager.ring.should_seed(&key); let should_handle_locally = !is_subscribed_contract && should_seed; + tracing::info!( + tx = %id, + %key, + is_subscribed_contract, + should_seed, + should_handle_locally, + "PUT_SEEDING_DECISION: Evaluating if node should cache contract" + ); + tracing::debug!( tx = %id, %key, @@ -431,6 +441,12 @@ impl Operation for PutOp { ); // Subscriber nodes have been notified of the change, the operation is completed + tracing::info!( + tx = %id, + key = %key, + target = %upstream.peer, + "PUT_SUCCESS_SEND: Broadcasting complete, sending SuccessfulPut upstream" + ); return_msg = Some(PutMsg::SuccessfulPut { id: *id, target: upstream.clone(), @@ -440,6 +456,11 @@ impl Operation for PutOp { new_state = None; } PutMsg::SuccessfulPut { id, .. } => { + tracing::info!( + tx = %id, + current_state = ?self.state, + "PUT_SUCCESS_MSG: Received SuccessfulPut message" + ); match self.state { Some(PutState::AwaitingResponse { key, @@ -519,6 +540,12 @@ impl Operation for PutOp { // Forward success message upstream if needed if let Some(upstream) = upstream { + tracing::info!( + tx = %id, + key = %key, + target = %upstream.peer, + "PUT_SUCCESS_SEND: Contract stored, sending SuccessfulPut upstream" + ); return_msg = Some(PutMsg::SuccessfulPut { id: *id, target: upstream, @@ -675,6 +702,14 @@ impl Operation for PutOp { _ => return Err(OpError::UnexpectedOpState), } + tracing::info!( + tx = %self.id, + old_state = %old_state, + new_state = ?new_state, + has_return_msg = return_msg.is_some(), + "PUT_STATE_TRANSITION: PUT operation state change" + ); + build_op_result(self.id, new_state, return_msg, stats) }) } @@ -768,6 +803,12 @@ async fn try_to_broadcast( return Err(OpError::StatePushed); } else { new_state = None; + tracing::info!( + tx = %id, + key = %key, + target = %upstream.peer, + "PUT_SUCCESS_SEND: Final hop complete, sending SuccessfulPut upstream" + ); return_msg = Some(PutMsg::SuccessfulPut { id, target: upstream, @@ -810,6 +851,7 @@ pub(crate) fn start_op( } } +#[derive(Debug)] pub enum PutState { ReceivedRequest, /// Preparing request for put op. @@ -908,6 +950,12 @@ async fn put_contract( related_contracts: RelatedContracts<'static>, contract: &ContractContainer, ) -> Result { + tracing::info!( + %key, + state_size = state.size(), + "PUT_CONTRACT_START: Attempting to cache contract locally" + ); + // after the contract has been cached, push the update query match op_manager .notify_contract_handler(ContractHandlerEvent::PutQuery { @@ -920,16 +968,29 @@ async fn put_contract( { Ok(ContractHandlerEvent::PutResponse { new_value: Ok(new_val), - }) => Ok(new_val), + }) => { + tracing::info!( + %key, + new_state_size = new_val.size(), + "PUT_CONTRACT_SUCCESS: Contract cached successfully" + ); + Ok(new_val) + } Ok(ContractHandlerEvent::PutResponse { new_value: Err(err), }) => { - tracing::error!(%key, "Failed to update contract value: {}", err); + tracing::error!(%key, error = %err, "PUT_CONTRACT_ERROR: Failed to update contract value"); Err(OpError::from(err)) // TODO: not a valid value update, notify back to requester } - Err(err) => Err(err.into()), - Ok(_) => Err(OpError::UnexpectedOpState), + Err(err) => { + tracing::error!(%key, error = %err, "PUT_CONTRACT_ERROR: Contract handler error"); + Err(err.into()) + } + Ok(other) => { + tracing::error!(%key, response = ?other, "PUT_CONTRACT_ERROR: Unexpected contract handler response"); + Err(OpError::UnexpectedOpState) + } } } diff --git a/crates/core/src/ring/location.rs b/crates/core/src/ring/location.rs index 1539838d2..6f8807628 100644 --- a/crates/core/src/ring/location.rs +++ b/crates/core/src/ring/location.rs @@ -287,8 +287,7 @@ mod test { let gap = sorted_locs[i] - sorted_locs[i - 1]; assert!( gap < max_acceptable_gap, - "Found too large gap ({}) between consecutive locations", - gap + "Found too large gap ({gap}) between consecutive locations" ); } @@ -296,8 +295,7 @@ mod test { let wrap_gap = 1.0 - sorted_locs.last().unwrap() + sorted_locs[0]; assert!( wrap_gap < max_acceptable_gap, - "Found too large wrap-around gap ({})", - wrap_gap + "Found too large wrap-around gap ({wrap_gap})" ); } diff --git a/crates/core/src/router/isotonic_estimator.rs b/crates/core/src/router/isotonic_estimator.rs index a14e7d888..9a82ba228 100644 --- a/crates/core/src/router/isotonic_estimator.rs +++ b/crates/core/src/router/isotonic_estimator.rs @@ -265,7 +265,7 @@ mod tests { // Check that the errors are small let average_error = errors.iter().sum::() / errors.len() as f64; - println!("Average error: {}", average_error); + println!("Average error: {average_error}"); assert!(average_error < 0.01); } @@ -302,7 +302,7 @@ mod tests { // Check that the errors are small let average_error = errors.iter().sum::() / errors.len() as f64; - println!("Average error: {}", average_error); + println!("Average error: {average_error}"); assert!(average_error < 0.01); } diff --git a/crates/core/src/server/errors.rs b/crates/core/src/server/errors.rs index f62c79467..fd8827c75 100644 --- a/crates/core/src/server/errors.rs +++ b/crates/core/src/server/errors.rs @@ -34,10 +34,10 @@ impl WebSocketApiError { pub fn error_message(&self) -> String { match self { WebSocketApiError::InvalidParam { error_cause } => { - format!("Invalid request params: {}", error_cause) + format!("Invalid request params: {error_cause}") } - WebSocketApiError::NodeError { error_cause } => format!("Node error: {}", error_cause), - WebSocketApiError::AxumError { error } => format!("Server error: {}", error), + WebSocketApiError::NodeError { error_cause } => format!("Node error: {error_cause}"), + WebSocketApiError::AxumError { error } => format!("Server error: {error}"), WebSocketApiError::MissingContract { key } => format!("Missing contract {key}"), } } diff --git a/crates/core/src/server/http_gateway/v1.rs b/crates/core/src/server/http_gateway/v1.rs index 7dfe934f8..1de4cbf85 100644 --- a/crates/core/src/server/http_gateway/v1.rs +++ b/crates/core/src/server/http_gateway/v1.rs @@ -79,7 +79,7 @@ async fn web_home( async fn web_subpages( Path((key, last_path)): Path<(String, String)>, ) -> Result { - let full_path: String = format!("/v1/contract/web/{}/{}", key, last_path); + let full_path: String = format!("/v1/contract/web/{key}/{last_path}"); path_handlers::variable_content(key, full_path) .await .map_err(|e| *e) diff --git a/crates/core/src/topology/mod.rs b/crates/core/src/topology/mod.rs index 5868322ef..f877256b0 100644 --- a/crates/core/src/topology/mod.rs +++ b/crates/core/src/topology/mod.rs @@ -690,7 +690,7 @@ mod tests { assert_eq!(peers.len(), 1); assert_eq!(peers[0], *worst_peer); } - _ => panic!("Expected to remove a peer, adjustment was {:?}", adjustment), + _ => panic!("Expected to remove a peer, adjustment was {adjustment:?}"), } }); } @@ -739,10 +739,7 @@ mod tests { assert!(locations[0] >= peers[0].location.unwrap()); assert!(locations[0] <= peers[1].location.unwrap()); } - _ => panic!( - "Expected to add a connection, adjustment was {:?}", - adjustment - ), + _ => panic!("Expected to add a connection, adjustment was {adjustment:?}"), } }); } @@ -777,7 +774,7 @@ mod tests { match adjustment { TopologyAdjustment::NoChange => {} - _ => panic!("Expected no adjustment, adjustment was {:?}", adjustment), + _ => panic!("Expected no adjustment, adjustment was {adjustment:?}"), } }); } @@ -822,7 +819,7 @@ mod tests { assert_eq!(location, my_location); } } - _ => panic!("Expected AddConnections, but was: {:?}", adjustment), + _ => panic!("Expected AddConnections, but was: {adjustment:?}"), } }); } diff --git a/crates/core/src/topology/small_world_rand.rs b/crates/core/src/topology/small_world_rand.rs index 787f98374..81d7a5d2c 100644 --- a/crates/core/src/topology/small_world_rand.rs +++ b/crates/core/src/topology/small_world_rand.rs @@ -67,8 +67,7 @@ mod tests { // Check if p_value is above 0.05, indicating that we fail to reject the null hypothesis assert!( p_value > 0.05, - "Chi-squared test failed, p_value = {}", - p_value + "Chi-squared test failed, p_value = {p_value}" ); } } diff --git a/crates/core/src/transport/connection_handler.rs b/crates/core/src/transport/connection_handler.rs index 4ac351ef8..55b229eae 100644 --- a/crates/core/src/transport/connection_handler.rs +++ b/crates/core/src/transport/connection_handler.rs @@ -139,7 +139,11 @@ impl OutboundConnectionHandler { ) -> Result<(Self, mpsc::Receiver), TransportError> { // Channel buffer is one so senders will await until the receiver is ready, important for bandwidth limiting let (conn_handler_sender, conn_handler_receiver) = mpsc::channel(100); - let (new_connection_sender, new_connection_notifier) = mpsc::channel(10); + // Increase buffer size to prevent blocking - gateways can have many connections + let (new_connection_sender, new_connection_notifier) = mpsc::channel(1000); + tracing::info!( + "Creating connection handler with new_connection_notifier buffer size: 1000 (was 10)" + ); // Channel buffer is one so senders will await until the receiver is ready, important for bandwidth limiting let (outbound_sender, outbound_recv) = mpsc::channel(100); @@ -327,6 +331,13 @@ impl UdpPacketsListener { let dropped_count = self.dropped_packets.entry(remote_addr).or_insert(0); *dropped_count += 1; + // INSTRUMENTATION: Log every channel overflow immediately + tracing::warn!( + %remote_addr, + dropped_count = *dropped_count, + "CHANNEL_OVERFLOW: Dropping packet due to full channel (buffer size: 100)" + ); + // Log warning every 10 seconds if packets are being dropped let now = Instant::now(); if now.duration_since(self.last_drop_warning) > Duration::from_secs(10) { @@ -440,12 +451,24 @@ impl UdpPacketsListener { self.remote_connections.insert(remote_addr, inbound_remote_connection); - if self.new_connection_notifier - .send(PeerConnection::new(outbound_remote_conn)) - .await - .is_err() { - tracing::error!(%remote_addr, "gateway connection established but failed to notify new connection"); - break 'outer Err(TransportError::ConnectionClosed(self.this_addr)); + // CRITICAL: Use try_send to avoid blocking the entire UDP listener + match self.new_connection_notifier + .try_send(PeerConnection::new(outbound_remote_conn)) { + Ok(()) => { + tracing::debug!(%remote_addr, "Successfully notified new gateway connection"); + } + Err(mpsc::error::TrySendError::Full(_)) => { + tracing::error!( + %remote_addr, + "CRITICAL: new_connection_notifier channel is FULL (capacity: 10). This blocks all packet processing!" + ); + // TODO: We should handle this better - maybe increase buffer size or process connections differently + break 'outer Err(TransportError::ConnectionClosed(self.this_addr)); + } + Err(mpsc::error::TrySendError::Closed(_)) => { + tracing::error!(%remote_addr, "new_connection_notifier channel is closed"); + break 'outer Err(TransportError::ConnectionClosed(self.this_addr)); + } } sent_tracker.lock().report_sent_packet( @@ -1097,9 +1120,9 @@ mod version_cmp { }; if !flags_str.is_empty() { - format!("{}.{}.{}-{}", major, minor, patch, flags_str) + format!("{major}.{minor}.{patch}-{flags_str}") } else { - format!("{}.{}.{}", major, minor, patch) + format!("{major}.{minor}.{patch}") } } @@ -1122,8 +1145,7 @@ mod version_cmp { // Step 3: Compare the decoded string with the original version string assert_eq!( decoded, version_str, - "Failed for version string '{}', decoded as '{}'", - version_str, decoded + "Failed for version string '{version_str}', decoded as '{decoded}'" ); } diff --git a/crates/core/src/transport/mod.rs b/crates/core/src/transport/mod.rs index 12f21c072..c3b45888a 100644 --- a/crates/core/src/transport/mod.rs +++ b/crates/core/src/transport/mod.rs @@ -131,7 +131,7 @@ mod tests { // Simulate resending packet sent_tracker.report_sent_packet(id, packet); } - _ => panic!("Expected resend action for packet {}", id), + _ => panic!("Expected resend action for packet {id}"), } } } diff --git a/crates/core/src/transport/packet_data.rs b/crates/core/src/transport/packet_data.rs index d14f33853..218f12c00 100644 --- a/crates/core/src/transport/packet_data.rs +++ b/crates/core/src/transport/packet_data.rs @@ -321,7 +321,7 @@ mod tests { // Ensure decrypted data matches original assert_eq!(&decrypted_data.data(), &original_data.data()); } - Err(e) => panic!("Decryption failed with error: {:?}", e), + Err(e) => panic!("Decryption failed with error: {e:?}"), } } diff --git a/crates/core/src/transport/peer_connection.rs b/crates/core/src/transport/peer_connection.rs index 61b73f81d..b25c41ade 100644 --- a/crates/core/src/transport/peer_connection.rs +++ b/crates/core/src/transport/peer_connection.rs @@ -523,10 +523,20 @@ impl PeerConnection { continue; } } + let process_start = std::time::Instant::now(); if let Some(msg) = self.process_inbound(payload).await.map_err(|error| { tracing::error!(%error, %packet_id, remote = %self.remote_conn.remote_addr, "error processing inbound packet"); error })? { + let process_elapsed = process_start.elapsed(); + if process_elapsed > std::time::Duration::from_millis(50) { + tracing::warn!( + %packet_id, + remote = %self.remote_conn.remote_addr, + elapsed_ms = process_elapsed.as_millis(), + "SLOW inbound packet processing!" + ); + } tracing::trace!(%packet_id, "returning full stream message"); return Ok(msg); } diff --git a/crates/core/src/transport/peer_connection/outbound_stream.rs b/crates/core/src/transport/peer_connection/outbound_stream.rs index fa19e1285..41af4909d 100644 --- a/crates/core/src/transport/peer_connection/outbound_stream.rs +++ b/crates/core/src/transport/peer_connection/outbound_stream.rs @@ -266,14 +266,12 @@ mod tests { // But with 8 packets and 1 packet per 10ms batch, actual time is ~70-80ms // Allow margin for processing overhead and timing precision println!( - "Transfer took: {:?}, packets sent: {}, expected: {}", - elapsed, packet_count, expected_packets + "Transfer took: {elapsed:?}, packets sent: {packet_count}, expected: {expected_packets}" ); - println!("Bytes per packet: ~{}", MAX_DATA_SIZE); + println!("Bytes per packet: ~{MAX_DATA_SIZE}"); assert!( elapsed.as_millis() >= 60, - "Transfer completed too quickly: {:?}", - elapsed + "Transfer completed too quickly: {elapsed:?}" ); Ok(()) @@ -338,8 +336,7 @@ mod tests { // Without rate limiting, should complete very quickly (< 50ms) assert!( elapsed.as_millis() < 50, - "Transfer took too long without rate limit: {:?}", - elapsed + "Transfer took too long without rate limit: {elapsed:?}" ); Ok(()) diff --git a/crates/core/src/transport/rate_limiter.rs b/crates/core/src/transport/rate_limiter.rs index eb8a915fd..9e2a08ee2 100644 --- a/crates/core/src/transport/rate_limiter.rs +++ b/crates/core/src/transport/rate_limiter.rs @@ -40,13 +40,58 @@ impl PacketRateLimiter { socket: Arc, ) { tracing::info!("Rate limiter task started"); + let mut packet_count = 0u64; + let mut last_log_time = Instant::now(); + while let Some((socket_addr, packet)) = self.outbound_packets.recv().await { - // tracing::trace!(%socket_addr, packet_len = %packet.len(), "Sending outbound packet"); + packet_count += 1; + + // Log statistics every 10 seconds + if last_log_time.elapsed() > Duration::from_secs(10) { + tracing::info!( + "OUTBOUND_STATS: {} packets sent in last 10s, channel backlog: {}", + packet_count, + self.outbound_packets.len() + ); + packet_count = 0; + last_log_time = Instant::now(); + } + + // INSTRUMENTATION: Track channel depth + let channel_len = self.outbound_packets.len(); + if channel_len > 50 { + tracing::warn!( + %socket_addr, + channel_depth = channel_len, + "CHANNEL_BACKLOG: Outbound packet channel backing up" + ); + } + if let Some(bandwidth_limit) = bandwidth_limit { self.rate_limiting(bandwidth_limit, &*socket, packet, socket_addr) .await; } else { - let _ = socket.send_to(&packet, socket_addr).await; + let send_start = Instant::now(); + match socket.send_to(&packet, socket_addr).await { + Ok(bytes_sent) => { + let elapsed = send_start.elapsed(); + if elapsed > Duration::from_millis(50) { + tracing::warn!( + %socket_addr, + elapsed_ms = elapsed.as_millis(), + bytes_sent, + "SLOW_UDP_SEND: UDP send took longer than 50ms" + ); + } + } + Err(e) => { + tracing::error!( + %socket_addr, + error = %e, + "UDP_SEND_FAILED: Failed to send packet" + ); + } + } } } tracing::debug!("Rate limiter task ended unexpectedly"); diff --git a/crates/core/src/transport/symmetric_message.rs b/crates/core/src/transport/symmetric_message.rs index ddcda62cc..1b52b4230 100644 --- a/crates/core/src/transport/symmetric_message.rs +++ b/crates/core/src/transport/symmetric_message.rs @@ -262,8 +262,7 @@ impl std::fmt::Display for SymmetricMessagePayload { .. } => write!( f, - "StreamFragment: (stream id: {:?}, fragment no: {:?}) ", - stream_id, fragment_number + "StreamFragment: (stream id: {stream_id:?}, fragment no: {fragment_number:?}) " ), SymmetricMessagePayload::NoOp => write!(f, "NoOp"), } diff --git a/crates/core/src/util/mod.rs b/crates/core/src/util/mod.rs index 08c6323ce..c3f95a1fb 100644 --- a/crates/core/src/util/mod.rs +++ b/crates/core/src/util/mod.rs @@ -271,8 +271,7 @@ pub(crate) mod test { .unwrap(); assert!( total > Duration::from_secs(18) && total < Duration::from_secs(20), - "total: {:?}", - total + "total: {total:?}" ); let base = Duration::from_millis(600); diff --git a/crates/core/src/wasm_runtime/tests/contract_metering.rs b/crates/core/src/wasm_runtime/tests/contract_metering.rs index 802aed22f..3597dd562 100644 --- a/crates/core/src/wasm_runtime/tests/contract_metering.rs +++ b/crates/core/src/wasm_runtime/tests/contract_metering.rs @@ -52,7 +52,7 @@ fn validate_state_metering() -> Result<(), Box> { ); let duration = time.elapsed().as_secs_f64(); - println!("Duration: {:.2}s", duration); + println!("Duration: {duration:.2}s"); assert!(duration < 5.0, "Should not timeout"); assert!( @@ -103,7 +103,7 @@ fn test_update_state_metering() -> Result<(), Box> { ); let duration = time.elapsed().as_secs_f64(); - println!("Duration: {:.2}s", duration); + println!("Duration: {duration:.2}s"); assert!(duration < 5.0, "Should not timeout"); assert!( @@ -150,7 +150,7 @@ fn test_summarize_state_metering() -> Result<(), Box> { let result = runtime.summarize_state(&contract_key, &Parameters::from([].as_ref()), &state); let duration = time.elapsed().as_secs_f64(); - println!("Duration: {:.2}s", duration); + println!("Duration: {duration:.2}s"); assert!(duration < 5.0, "Should not timeout"); assert!( @@ -202,7 +202,7 @@ fn test_get_state_delta_metering() -> Result<(), Box> { ); let duration = time.elapsed().as_secs_f64(); - println!("Duration: {:.2}s", duration); + println!("Duration: {duration:.2}s"); assert!(duration < 5.0, "Should not timeout"); assert!( @@ -254,11 +254,7 @@ fn test_timeout_metering() -> Result<(), Box> { ); let duration = time.elapsed().as_secs_f64(); - assert!( - duration < 5.5, - "Took {:.2}s, should timeout before", - duration - ); + assert!(duration < 5.5, "Took {duration:.2}s, should timeout before"); assert!( matches!( diff --git a/crates/fdev/src/commands/v1.rs b/crates/fdev/src/commands/v1.rs index f9970718f..01df01ef7 100644 --- a/crates/fdev/src/commands/v1.rs +++ b/crates/fdev/src/commands/v1.rs @@ -16,8 +16,7 @@ pub(super) async fn start_api_client(cfg: BaseConfig) -> anyhow::Result }; let (stream, _) = tokio_tungstenite::connect_async(&format!( - "ws://{}/v1/contract/command?encodingProtocol=native", - target + "ws://{target}/v1/contract/command?encodingProtocol=native" )) .await .map_err(|e| { diff --git a/crates/fdev/src/diagnostics.rs b/crates/fdev/src/diagnostics.rs index 8c6ae5249..3687be372 100644 --- a/crates/fdev/src/diagnostics.rs +++ b/crates/fdev/src/diagnostics.rs @@ -66,10 +66,10 @@ pub async fn diagnostics(base_cfg: BaseConfig, contract_keys: Vec) -> an } ); if let Some(listening_address) = &node_info.listening_address { - println!(" Listening Address: {}", listening_address); + println!(" Listening Address: {listening_address}"); } if let Some(location) = &node_info.location { - println!(" Location: {}", location); + println!(" Location: {location}"); } println!(); } diff --git a/crates/fdev/src/testing/network.rs b/crates/fdev/src/testing/network.rs index 6bfd6e397..d2f2b7e23 100644 --- a/crates/fdev/src/testing/network.rs +++ b/crates/fdev/src/testing/network.rs @@ -149,8 +149,7 @@ impl SubProcess { .spawn() .map_err(|e| { NetworkSimulationError::SubProcessStartFailure(format!( - "Failed to start subprocess: {}", - e + "Failed to start subprocess: {e}" )) })?; @@ -164,10 +163,7 @@ impl SubProcess { async fn start_supervisor(config: &TestConfig) -> anyhow::Result<(), Error> { let mut network = super::config_sim_network(config).await.map_err(|e| { - NetworkSimulationError::NetworkError(format!( - "Failed to configure simulation network: {}", - e - )) + NetworkSimulationError::NetworkError(format!("Failed to configure simulation network: {e}")) })?; let supervisor = Arc::new(Supervisor::new(&mut network).await); @@ -295,7 +291,7 @@ async fn config_handler( match config.get(&id) { Some(node_config) => axum::response::Json(node_config.clone()).into_response(), None => { - let body = format!("No config found for peer_id: {}", peer_id); + let body = format!("No config found for peer_id: {peer_id}"); let response = Response::builder() .status(StatusCode::NOT_FOUND) .body(Body::from(body)) @@ -368,7 +364,7 @@ async fn handle_incoming_messages( // Handle the received message or log the error. match result { Ok(message) => process_message(message, supervisor).await?, - Err(e) => eprintln!("Error in WebSocket communication: {}", e), + Err(e) => eprintln!("Error in WebSocket communication: {e}"), } } Ok(()) diff --git a/crates/fdev/src/testing/network/v1.rs b/crates/fdev/src/testing/network/v1.rs index 54afb50af..954881254 100644 --- a/crates/fdev/src/testing/network/v1.rs +++ b/crates/fdev/src/testing/network/v1.rs @@ -30,7 +30,7 @@ pub async fn start_server_v1(supervisor: Arc) -> Result<(), NetworkS axum::serve(listener, router) .await - .map_err(|e| NetworkSimulationError::ServerStartFailure(format!("Server error: {}", e))) + .map_err(|e| NetworkSimulationError::ServerStartFailure(format!("Server error: {e}"))) }); startup_receiver diff --git a/crates/fdev/src/wasm_runtime/state/v1.rs b/crates/fdev/src/wasm_runtime/state/v1.rs index d164cf484..a68155f35 100644 --- a/crates/fdev/src/wasm_runtime/state/v1.rs +++ b/crates/fdev/src/wasm_runtime/state/v1.rs @@ -4,8 +4,7 @@ impl AppState { pub async fn new_v1(config: &ExecutorConfig) -> anyhow::Result { let target: SocketAddr = (config.address, config.port).into(); let (stream, _) = tokio_tungstenite::connect_async(&format!( - "ws://{}/v1/contract/command?encodingProtocol=native", - target + "ws://{target}/v1/contract/command?encodingProtocol=native" )) .await .map_err(|e| { diff --git a/deploy-to-ziggy.sh b/deploy-to-ziggy.sh new file mode 100755 index 000000000..e0e74edcf --- /dev/null +++ b/deploy-to-ziggy.sh @@ -0,0 +1,33 @@ +#!/bin/bash +# Deploy instrumented freenet binary to ziggy + +set -e + +echo "Deploying instrumented freenet binary to ziggy..." + +# Stop the service first +echo "Stopping freenet service..." +sudo systemctl stop freenet + +# Wait for process to fully stop +sleep 2 + +# Backup existing binary +sudo cp /usr/local/bin/freenet /usr/local/bin/freenet.backup.$(date +%Y%m%d-%H%M%S) + +# Copy new binary +sudo cp /tmp/freenet-instrumented /usr/local/bin/freenet +sudo chmod +x /usr/local/bin/freenet + +# Start freenet service +echo "Starting freenet service..." +sudo systemctl start freenet + +# Wait for service to start +sleep 5 + +# Check status +sudo systemctl status freenet | head -20 + +echo "Deployment complete! Monitor logs with:" +echo "sudo journalctl -u freenet -f | grep -E '(SLOW|contract|PUT|GET|elapsed_ms)'" \ No newline at end of file diff --git a/monitor-vega-issue.sh b/monitor-vega-issue.sh new file mode 100755 index 000000000..cd5c2f06a --- /dev/null +++ b/monitor-vega-issue.sh @@ -0,0 +1,67 @@ +#!/bin/bash + +# Monitor vega for UDP backlog and contract timing issues +# This script will alert when the issue manifests + +echo "Starting vega monitoring for UDP backlog and contract timing..." +echo "Press Ctrl+C to stop" + +ALERT_FILE="/tmp/vega-issue-detected.log" +rm -f "$ALERT_FILE" + +check_udp_backlog() { + local recv_q=$(ssh freenet@vega.locut.us "ss -lun sport = :31337" 2>/dev/null | awk 'NR==2 {print $2}') + if [[ -n "$recv_q" && "$recv_q" -gt 1000 ]]; then + echo "$(date): ALERT! UDP backlog detected: ${recv_q} bytes" | tee -a "$ALERT_FILE" + return 0 + fi + return 1 +} + +check_contract_timing() { + local slow_ops=$(ssh freenet@vega.locut.us "sudo journalctl -u freenet-gateway --since='1 minute ago' | grep -E 'SLOW.*execution|blocked message pipeline'" 2>/dev/null) + if [[ -n "$slow_ops" ]]; then + echo "$(date): ALERT! Slow contract operations detected:" | tee -a "$ALERT_FILE" + echo "$slow_ops" | tee -a "$ALERT_FILE" + return 0 + fi + return 1 +} + +check_channel_overflow() { + local overflow=$(ssh freenet@vega.locut.us "sudo journalctl -u freenet-gateway --since='1 minute ago' | grep -E 'Channel overflow|channel full|dropping packets'" 2>/dev/null) + if [[ -n "$overflow" ]]; then + echo "$(date): ALERT! Channel overflow detected:" | tee -a "$ALERT_FILE" + echo "$overflow" | tee -a "$ALERT_FILE" + return 0 + fi + return 1 +} + +while true; do + echo -n "$(date '+%H:%M:%S'): Checking... " + + issue_found=false + + if check_udp_backlog; then + issue_found=true + fi + + if check_contract_timing; then + issue_found=true + fi + + if check_channel_overflow; then + issue_found=true + fi + + if [[ "$issue_found" == "false" ]]; then + echo "OK" + else + echo -e "\n$(date): Issue detected! Check $ALERT_FILE for details" + # Also capture full logs when issue is detected + ssh freenet@vega.locut.us "sudo journalctl -u freenet-gateway --since='5 minutes ago'" > "/tmp/vega-issue-logs-$(date +%Y%m%d-%H%M%S).log" + fi + + sleep 30 +done \ No newline at end of file diff --git a/monitor-ziggy-timing.sh b/monitor-ziggy-timing.sh new file mode 100755 index 000000000..1fba29ab7 --- /dev/null +++ b/monitor-ziggy-timing.sh @@ -0,0 +1,22 @@ +#!/bin/bash +# Monitor ziggy for contract execution timing issues + +echo "Starting ziggy monitoring for contract timing..." +echo "This will capture SLOW operations and contract PUT/GET timing" +echo "Press Ctrl+C to stop" + +# Monitor logs for timing issues +ssh ian@ziggy "sudo journalctl -u freenet -f | grep -E '(SLOW|contract PUT|contract GET|elapsed_ms|Starting contract|Timeout)' --line-buffered" | while read -r line; do + timestamp=$(date +"%H:%M:%S") + echo "[$timestamp] $line" + + # Alert on slow operations + if echo "$line" | grep -q "SLOW"; then + echo "*** ALERT: SLOW OPERATION DETECTED ***" + fi + + # Alert on contract operations + if echo "$line" | grep -q "Starting contract"; then + echo ">>> Contract operation started..." + fi +done \ No newline at end of file diff --git a/test-river-invite-vega.js b/test-river-invite-vega.js new file mode 100755 index 000000000..163fedd19 --- /dev/null +++ b/test-river-invite-vega.js @@ -0,0 +1,62 @@ +#!/usr/bin/env node + +const WebSocket = require('ws'); +const crypto = require('crypto'); + +// Test River invitation flow to trigger contract operations on vega +const VEGA_URL = 'ws://vega.locut.us:50509/ws'; +const TEST_CONTRACT_KEY = 'BcfxyjCH4snaknrBoCiqhYc9UFvmiJvhsp5d4L5DuvRa'; + +console.log('Testing River invitation flow to trigger contract operations on vega...'); +console.log(`Connecting to: ${VEGA_URL}`); + +const ws = new WebSocket(VEGA_URL); + +ws.on('open', () => { + console.log('Connected to vega WebSocket'); + + // Send a GET request for River contract to trigger WASM execution + const getMessage = { + type: 'get', + key: TEST_CONTRACT_KEY, + fetch_contract: true, + id: crypto.randomBytes(16).toString('hex') + }; + + console.log('Sending GET request:', getMessage); + ws.send(JSON.stringify(getMessage)); + + // Also send a PUT to trigger more WASM execution + setTimeout(() => { + const putMessage = { + type: 'put', + key: TEST_CONTRACT_KEY, + state: { + data: Buffer.from('test state data').toString('base64') + }, + id: crypto.randomBytes(16).toString('hex') + }; + + console.log('Sending PUT request:', putMessage); + ws.send(JSON.stringify(putMessage)); + }, 1000); +}); + +ws.on('message', (data) => { + console.log('Received:', data.toString()); +}); + +ws.on('error', (error) => { + console.error('WebSocket error:', error); +}); + +ws.on('close', () => { + console.log('WebSocket closed'); +}); + +// Keep connection open for 30 seconds to see if it times out +setTimeout(() => { + console.log('Test complete, closing connection'); + ws.close(); + process.exit(0); +}, 30000); \ No newline at end of file diff --git a/test-river-invite.js b/test-river-invite.js new file mode 100644 index 000000000..3219242bc --- /dev/null +++ b/test-river-invite.js @@ -0,0 +1,118 @@ +// Test script for River room invitation flow +// This script creates a room and tests the member synchronization + +const RIVER_URL = 'http://127.0.0.1:50509/v1/contract/web/BcfxyjCH4snaknrBoCiqhYc9UFvmiJvhsp5d4L5DuvRa/'; + +async function sleep(ms) { + return new Promise(resolve => setTimeout(resolve, ms)); +} + +async function testRiverInvite() { + console.log('Starting River invitation test...'); + + // Step 1: Navigate to River in first tab (room creator) + console.log('1. Opening River for room creator (Alice)...'); + await playwright.navigate({ url: RIVER_URL }); + await sleep(2000); + + // Step 2: Create a room + console.log('2. Creating room...'); + // Click create room button in sidebar + await playwright.click({ selector: '.room-actions button.create' }); + await sleep(500); + + // Fill in room details - target the last modal (most recent) + await playwright.fill({ + selector: '.modal:last-of-type .field:has(label:has-text("Room Name")) input' , + value: 'Test Room ' + Date.now() + }); + await playwright.fill({ + selector: '.modal:last-of-type .field:has(label:has-text("Your Nickname")) input', + value: 'Alice' + }); + + // Click the create button in the last modal + await playwright.click({ selector: '.modal:last-of-type button.is-primary' }); + await sleep(3000); + + console.log('3. Room created, taking screenshot...'); + await playwright.screenshot({ name: 'room-created', fullPage: true }); + + // Step 3: Create invite link + console.log('4. Creating invite link...'); + await playwright.click({ selector: '.member-actions button.invite' }); + await sleep(1000); + + // Get the invite link + const inviteLinkElement = await playwright.evaluate({ + script: `document.querySelector('.modal.is-active input[readonly]')?.value || ''` + }); + + console.log('5. Invite link:', inviteLinkElement); + + if (!inviteLinkElement) { + console.error('Failed to get invite link!'); + return; + } + + // Step 4: Open invite link in new tab + console.log('6. Opening invite link in new tab for Bob...'); + await playwright.clickAndSwitchTab({ selector: 'body' }); // Click body to ensure we can switch tabs + await playwright.navigate({ url: inviteLinkElement }); + await sleep(3000); + + console.log('7. Invite page loaded, taking screenshot...'); + await playwright.screenshot({ name: 'invite-page', fullPage: true }); + + // Step 5: Accept invitation + console.log('8. Accepting invitation...'); + // Fill in nickname + await playwright.fill({ + selector: '.modal.is-active input[type="text"]', + value: 'Bob' + }); + + // Click accept + await playwright.click({ selector: '.modal.is-active button:has-text("Accept")' }); + + console.log('9. Waiting for room to load...'); + await sleep(5000); + + console.log('10. Taking screenshot of Bob\'s view...'); + await playwright.screenshot({ name: 'bob-room-view', fullPage: true }); + + // Step 6: Check member list in Bob's view + const bobMemberList = await playwright.evaluate({ + script: `Array.from(document.querySelectorAll('.member-list-list li')).map(li => li.textContent.trim())` + }); + console.log('11. Members visible to Bob:', bobMemberList); + + // Step 7: Switch back to Alice's tab + console.log('12. Switching back to Alice\'s tab...'); + await playwright.goBack(); + await sleep(2000); + + // Step 8: Check member list in Alice's view + const aliceMemberList = await playwright.evaluate({ + script: `Array.from(document.querySelectorAll('.member-list-list li')).map(li => li.textContent.trim())` + }); + console.log('13. Members visible to Alice:', aliceMemberList); + + console.log('14. Taking final screenshot of Alice\'s view...'); + await playwright.screenshot({ name: 'alice-final-view', fullPage: true }); + + // Analysis + console.log('\n=== Test Results ==='); + console.log('Alice sees members:', aliceMemberList); + console.log('Bob sees members:', bobMemberList); + + if (aliceMemberList.length === 2 && bobMemberList.length === 2) { + console.log('✅ SUCCESS: Both users see both members!'); + } else { + console.log('❌ FAIL: Member lists are not synchronized properly'); + console.log('Expected: Both users should see 2 members (Alice and Bob)'); + } +} + +// Run the test +testRiverInvite().catch(console.error); \ No newline at end of file