From 711e1a0d93d796383da3e818e7ae8e7c30dab07d Mon Sep 17 00:00:00 2001 From: Dustin Black Date: Wed, 10 Sep 2025 15:22:52 +0200 Subject: [PATCH 1/5] feat: Add server-side CPU affinity This commit introduces the ability to pin the benchmark server to a specific CPU core using the --server-affinity command-line argument. This feature is useful for creating a more stable and repeatable testing environment by reducing performance noise from CPU context switching and cache misses. Key changes: - Added 'core_affinity' crate as a dependency. - Added '--server-affinity' and '--client-affinity' CLI flags. - Implemented the logic to spawn the server task on a dedicated thread with the specified CPU affinity. - Added unit tests for argument parsing and the new affinity-spawning logic. - Updated documentation and configuration display to reflect the new options. Note: Client-side affinity is parsed but not yet implemented. AI-assisted-by: Gemini 2.5 Pro feat: Add client-side CPU affinity This commit implements client-side CPU affinity and refactors the test execution logic to support it. Key changes: - Refactored single-threaded test functions to spawn the client logic in a separate task with the specified affinity. - Client tasks now return raw latency data, which is then processed by the main task. - Updated README.md to document the new --server-affinity and --client-affinity flags. - Added documentation to the refactored functions in benchmark.rs. AI-assisted-by: Gemini 2.5 Pro fix(build): Correct task spawning and error handling This commit addresses several compilation errors and warnings that arose from recent changes to the task spawning and affinity logic. The primary issue was a 'moved value' error with the oneshot sender ('tx') used for server readiness signaling. This was resolved by refactoring the server-spawning logic to ensure the sender is only used once. This change introduced a type mismatch in the 'match' statement handling the result of 'spawn_with_affinity', which has also been corrected. Additionally, several instances of an erroneous double '??' operator on non-Result types were fixed, and unused imports were removed. 
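The pinning mechanism at the heart of these commits is small enough to sketch in isolation: enumerate cores with the `core_affinity` crate, pin a dedicated thread, and drive the async work on a single-threaded Tokio runtime owned by that thread so the task never migrates off the core. The sketch below condenses the approach this series implements as `spawn_with_affinity` in `src/utils.rs`; it is an illustration only, and the `run_pinned` name is invented for the example, not taken from the patch.

```rust
use std::future::Future;

// Condensed sketch of the pinning approach; see src/utils.rs in this
// series for the real spawn_with_affinity implementation.
fn run_pinned<F, T>(future: F, core_id: usize) -> anyhow::Result<T>
where
    F: Future<Output = anyhow::Result<T>> + Send + 'static,
    T: Send + 'static,
{
    // get_core_ids() returns None on platforms core_affinity does not support.
    let cores = core_affinity::get_core_ids()
        .ok_or_else(|| anyhow::anyhow!("cannot enumerate CPU cores"))?;
    let core = *cores
        .get(core_id)
        .ok_or_else(|| anyhow::anyhow!("core ID {} out of range", core_id))?;

    std::thread::spawn(move || {
        // Pin this thread, then block on the future with a current-thread
        // runtime so the work stays on the pinned core.
        core_affinity::set_for_current(core);
        tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()?
            .block_on(future)
    })
    .join()
    .map_err(|_| anyhow::anyhow!("pinned thread panicked"))?
}
```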
AI-assisted-by: Gemini 2.5 Pro --- Cargo.lock | 34 ++ Cargo.toml | 1 + README.md | 19 +- src/benchmark.rs | 836 +++++++++++++++++++++++++---------------------- src/cli.rs | 47 +++ src/lib.rs | 2 + src/utils.rs | 132 ++++++++ 7 files changed, 672 insertions(+), 399 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e477442..71bf93d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -311,6 +311,17 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "core_affinity" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a034b3a7b624016c6e13f5df875747cc25f884156aad2abd12b6c46797971342" +dependencies = [ + "libc", + "num_cpus", + "winapi", +] + [[package]] name = "crc32fast" version = "1.5.0" @@ -569,6 +580,7 @@ dependencies = [ "chrono", "clap", "colored", + "core_affinity", "criterion", "crossbeam", "hdrhistogram", @@ -1505,6 +1517,22 @@ dependencies = [ "windows", ] +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + [[package]] name = "winapi-util" version = "0.1.9" @@ -1514,6 +1542,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + [[package]] name = "windows" version = "0.34.0" diff --git a/Cargo.toml b/Cargo.toml index 682c83a..a4b0073 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,6 +41,7 @@ tracing-appender = "0.2.3" time = ">=0.3.34, <0.3.36" # Pinned to a compatible range for MSRV 1.70 parking_lot = "0.12.4" thiserror = "2.0.16" +core_affinity = "0.8.3" [dev-dependencies] criterion = "0.5" diff --git a/README.md b/README.md index be140d5..efbb4a1 100644 --- a/README.md +++ b/README.md @@ -141,6 +141,22 @@ ipc-benchmark -m tcp --host 127.0.0.1 --port 9090 ipc-benchmark -m shm --buffer-size 16384 ``` +### CPU Affinity + +For the most stable and repeatable results, you can pin the server and client processes to specific CPU cores. This minimizes performance noise from OS scheduling, context switching, and cache misses. + +- `--server-affinity <CORE_ID>`: Pins the server process to a specific CPU core. +- `--client-affinity <CORE_ID>`: Pins the client process to a specific CPU core.
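+
+Both flags take a zero-based core ID. While a pinned benchmark runs (for example, the invocation just below), you can confirm the affinity from another terminal with `taskset` from util-linux, which prints a process's current affinity list; this verification step is a suggestion for the reader, not something the benchmark performs itself:
+
+```bash
+# Query the affinity list of the running benchmark process
+taskset -cp "$(pgrep -f ipc-benchmark)"
+```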
+ +```bash +# Pin the server to core 2 and the client to core 3 +ipc-benchmark \ + -m uds \ + -i 100000 \ + --server-affinity 2 \ + --client-affinity 3 +``` + ### Test Configuration Examples #### High-Throughput Testing @@ -563,5 +579,4 @@ Common issues and solutions: - **Single-threaded** (`-c 1`): Most accurate latency measurements - **Simulated concurrency** (`-c 2+`): Good for throughput scaling analysis -- **Shared memory**: Always single-threaded for reliability - +- **Shared memory**: Always single-threaded for reliability \ No newline at end of file diff --git a/src/benchmark.rs b/src/benchmark.rs index 94b740d..67b2d9b 100644 --- a/src/benchmark.rs +++ b/src/benchmark.rs @@ -40,6 +40,7 @@ use crate::{ ipc::{Message, MessageType, TransportConfig, TransportFactory}, metrics::{LatencyType, MetricsCollector, PerformanceMetrics}, results::BenchmarkResults, + utils::spawn_with_affinity, }; use anyhow::Result; use std::time::{Duration, Instant}; @@ -133,6 +134,12 @@ pub struct BenchmarkConfig { /// Base port number that will be modified to ensure uniqueness /// across concurrent tests. Ignored by non-network mechanisms. pub port: u16, + + /// Server CPU affinity + pub server_affinity: Option, + + /// Client CPU affinity + pub client_affinity: Option, } impl BenchmarkConfig { @@ -191,6 +198,8 @@ impl BenchmarkConfig { buffer_size: args.buffer_size, host: args.host.clone(), port: args.port, + server_affinity: args.server_affinity, + client_affinity: args.client_affinity, }) } } @@ -237,6 +246,8 @@ impl BenchmarkConfig { /// # log_file: None, /// # streaming_output_json: None, /// # streaming_output_csv: None, +/// # server_affinity: None, +/// # client_affinity: None, /// # }; /// let config = BenchmarkConfig::from_args(&args)?; /// #[cfg(unix)] @@ -400,24 +411,32 @@ impl BenchmarkRunner { let config = transport_config.clone(); let mechanism = self.mechanism; // Copy mechanism to move into closure let warmup_iterations = self.config.warmup_iterations; + let server_affinity = self.config.server_affinity; tokio::spawn(async move { - let mut transport = TransportFactory::create(&mechanism)?; - if let Err(e) = transport.start_server(&config).await { - let _ = tx.send(Err(e)); - return Err(anyhow::anyhow!("Server failed to start")); - } - - // Signal that server is ready to accept connections - let _ = tx.send(Ok(())); - debug!("Server signaled ready for warmup"); + let server_future = async move { + let mut transport = TransportFactory::create(&mechanism)?; + transport.start_server(&config).await?; + Ok(transport) + }; + + match spawn_with_affinity(server_future, server_affinity).await { + Ok(mut transport) => { + if tx.send(Ok(())).is_err() { + return; // client disconnected + } + debug!("Server signaled ready for warmup"); - // Receive warmup messages without measuring performance - for _ in 0..warmup_iterations { - let _ = transport.receive().await?; + for _ in 0..warmup_iterations { + if transport.receive().await.is_err() { + break; + } + } + let _ = transport.close().await; + } + Err(e) => { + let _ = tx.send(Err(e)); + } } - - transport.close().await?; - Ok::<(), anyhow::Error>(()) }) }; @@ -436,7 +455,7 @@ impl BenchmarkRunner { // Clean up resources client_transport.close().await?; - server_handle.await??; + server_handle.await?; debug!("Warmup completed"); Ok(()) @@ -575,11 +594,18 @@ impl BenchmarkRunner { /// /// ## Execution Flow /// - /// 1. **Server Setup**: Start server and wait for readiness - /// 2. **Client Connection**: Connect client to server - /// 3. 
**Message Transmission**: Send messages with precise timing - /// 4. **Metrics Collection**: Record latency for each message sent - /// 5. **Resource Cleanup**: Close connections and clean up resources + /// 1. **Server Setup**: The server is spawned in a background task, optionally pinned + /// to a specific CPU core if `server_affinity` is set. It signals readiness via a + /// `oneshot` channel. + /// 2. **Client Setup**: The client logic is encapsulated in a future, which is then + /// spawned in a separate background task. If `client_affinity` is set, this task + /// runs on a dedicated thread pinned to the specified CPU core. + /// 3. **Data Collection**: The client task collects raw latency measurements and + /// returns them in a `Vec<Duration>`. + /// 4. **Metrics Processing**: The main task awaits the client's results and then + /// records the latencies into the `MetricsCollector` and streams them to the + /// `ResultsManager`. + /// 5. **Cleanup**: Both client and server tasks are awaited to ensure graceful shutdown. /// /// ## Timing Methodology /// @@ -609,51 +635,57 @@ impl BenchmarkRunner { let mechanism = self.mechanism; let duration = self.config.duration; let msg_count = self.get_msg_count(); - + let server_affinity = self.config.server_affinity; tokio::spawn(async move { - let mut transport = TransportFactory::create(&mechanism)?; - if let Err(e) = transport.start_server(&config).await { - let _ = tx.send(Err(e)); - return Err(anyhow::anyhow!("Server failed to start")); - } - - // Signal that server is ready - let _ = tx.send(Ok(())); - debug!("Server signaled ready for one-way test"); + let server_future = async move { + let mut transport = TransportFactory::create(&mechanism)?; + transport.start_server(&config).await?; + Ok(transport) + }; + + match spawn_with_affinity(server_future, server_affinity).await { + Ok(mut transport) => { + if tx.send(Ok(())).is_err() { + return; // client disconnected + } + debug!("Server signaled ready for one-way test"); - let start_time = Instant::now(); - let mut received = 0; + let start_time = Instant::now(); + let mut received = 0; - // Server receive loop - adapts to duration or message count mode - loop { - // Check if we should stop based on duration or message count - if let Some(dur) = duration { - if start_time.elapsed() >= dur { - break; - } - } else if received >= msg_count { - break; - } + // Server receive loop - adapts to duration or message count mode + loop { + // Check if we should stop based on duration or message count + if let Some(dur) = duration { + if start_time.elapsed() >= dur { + break; + } + } else if received >= msg_count { + break; + } - // Try to receive with a shorter timeout to avoid hanging - match timeout(Duration::from_millis(50), transport.receive()).await { - Ok(Ok(_)) => { - received += 1; - } - Ok(Err(_)) => break, // Transport error - Err(_) => { - // Timeout - check if duration-based test is done - if duration.is_some() { - continue; // Keep waiting for duration-based test - } else { - break; // Message-count-based test with no more messages + // Try to receive with a shorter timeout to avoid hanging + match timeout(Duration::from_millis(50), transport.receive()).await { + Ok(Ok(_)) => { + received += 1; + } + Ok(Err(_)) => break, // Transport error + Err(_) => { + // Timeout - check if duration-based test is done + if duration.is_some() { + continue; // Keep waiting for duration-based test + } else { + break; // Message-count-based test with no more messages + } + } } } + let _ = transport.close().await; + 
} + Err(e) => { + let _ = tx.send(Err(e)); } } - - transport.close().await?; - Ok::<(), anyhow::Error>(()) }) }; @@ -661,83 +693,80 @@ impl BenchmarkRunner { rx.await??; debug!("Client received server ready signal for one-way test"); - // Connect client - client_transport.start_client(transport_config).await?; + let client_config = self.config.clone(); + let transport_config_clone = transport_config.clone(); - // Send messages and measure latency - let payload = vec![0u8; self.config.message_size]; - let start_time = Instant::now(); + let client_future = async move { + let mut latencies = Vec::new(); + // Connect client + client_transport + .start_client(&transport_config_clone) + .await?; - if let Some(duration) = self.config.duration { - // Duration-based test: loop until time expires - let mut i = 0u64; - while start_time.elapsed() < duration { - let send_time = Instant::now(); - let message = Message::new(i, payload.clone(), MessageType::OneWay); - - // Use timeout for individual sends to avoid blocking indefinitely - match tokio::time::timeout( - Duration::from_millis(50), - client_transport.send(&message), - ) - .await - { - Ok(Ok(_)) => { - let latency = send_time.elapsed(); - metrics_collector - .record_message(self.config.message_size, Some(latency))?; - - // Stream individual latency record if enabled - if let Some(ref mut manager) = results_manager { - let record = crate::results::MessageLatencyRecord::new( - i, - self.mechanism, - self.config.message_size, - crate::metrics::LatencyType::OneWay, - latency, - ); - manager.stream_latency_record(&record).await?; + // Send messages and measure latency + let payload = vec![0u8; client_config.message_size]; + let start_time = Instant::now(); + + if let Some(duration) = client_config.duration { + // Duration-based test: loop until time expires + let mut i = 0u64; + while start_time.elapsed() < duration { + let send_time = Instant::now(); + let message = Message::new(i, payload.clone(), MessageType::OneWay); + + // Use timeout for individual sends to avoid blocking indefinitely + match tokio::time::timeout( + Duration::from_millis(50), + client_transport.send(&message), + ) + .await + { + Ok(Ok(_)) => { + latencies.push(send_time.elapsed()); + i += 1; + } + Ok(Err(_)) => break, // Transport error + Err(_) => { + // Send timeout - ring buffer might be full, small delay and continue + sleep(Duration::from_millis(1)).await; + continue; } - - i += 1; - } - Ok(Err(_)) => break, // Transport error - Err(_) => { - // Send timeout - ring buffer might be full, small delay and continue - sleep(Duration::from_millis(1)).await; - continue; } } + } else { + // Message-count-based test: loop for fixed number of messages + let msg_count = client_config.msg_count.unwrap_or_default(); + + for i in 0..msg_count { + let send_time = Instant::now(); + let message = Message::new(i as u64, payload.clone(), MessageType::OneWay); + let _ = client_transport.send(&message).await?; + latencies.push(send_time.elapsed()); + } } - } else { - // Message-count-based test: loop for fixed number of messages - let msg_count = self.get_msg_count(); - for i in 0..msg_count { - let send_time = Instant::now(); - let message = Message::new(i as u64, payload.clone(), MessageType::OneWay); - let _ = client_transport.send(&message).await?; - - let latency = send_time.elapsed(); - metrics_collector.record_message(self.config.message_size, Some(latency))?; - - // Stream individual latency record if enabled - if let Some(ref mut manager) = results_manager { - let record = 
crate::results::MessageLatencyRecord::new( - i as u64, - self.mechanism, - self.config.message_size, - crate::metrics::LatencyType::OneWay, - latency, - ); - manager.stream_latency_record(&record).await?; - } + // Clean up resources + client_transport.close().await?; + Ok(latencies) + }; + + let latencies = spawn_with_affinity(client_future, self.config.client_affinity).await?; + + for (i, latency) in latencies.iter().enumerate() { + metrics_collector.record_message(self.config.message_size, Some(*latency))?; + if let Some(ref mut manager) = results_manager { + let record = crate::results::MessageLatencyRecord::new( + i as u64, + self.mechanism, + self.config.message_size, + crate::metrics::LatencyType::OneWay, + *latency, + ); + manager.stream_latency_record(&record).await?; } } - // Clean up resources - client_transport.close().await?; - server_handle.await??; + server_handle.await?; Ok(()) } @@ -777,60 +806,66 @@ impl BenchmarkRunner { let mechanism = self.mechanism; let duration = self.config.duration; let msg_count = self.get_msg_count(); - + let server_affinity = self.config.server_affinity; tokio::spawn(async move { - let mut transport = TransportFactory::create(&mechanism)?; - if let Err(e) = transport.start_server(&config).await { - let _ = tx.send(Err(e)); - return Err(anyhow::anyhow!("Server failed to start")); - } - - // Signal that server is ready - let _ = tx.send(Ok(())); - debug!("Server signaled ready for round-trip test"); - - let start_time = Instant::now(); - let mut received = 0; - - // Server request-response loop - loop { - // Check if we should stop based on duration or iterations - if let Some(dur) = duration { - if start_time.elapsed() >= dur { - break; + let server_future = async move { + let mut transport = TransportFactory::create(&mechanism)?; + transport.start_server(&config).await?; + Ok(transport) + }; + + match spawn_with_affinity(server_future, server_affinity).await { + Ok(mut transport) => { + if tx.send(Ok(())).is_err() { + return; // client disconnected } - } else if received >= msg_count { - break; - } + debug!("Server signaled ready for round-trip test"); - // Try to receive with a shorter timeout - match timeout(Duration::from_millis(50), transport.receive()).await { - Ok(Ok(request)) => { - received += 1; - // Echo back with modified ID to indicate processing - let response = Message::new( - request.id + 1000000, - request.payload, - MessageType::Response, - ); - if (transport.send(&response).await).is_err() { - break; // Client disconnected + let start_time = Instant::now(); + let mut received = 0; + + // Server request-response loop + loop { + // Check if we should stop based on duration or iterations + if let Some(dur) = duration { + if start_time.elapsed() >= dur { + break; + } + } else if received >= msg_count { + break; } - } - Ok(Err(_)) => break, // Transport error - Err(_) => { - // Timeout - check if duration-based test is done - if duration.is_some() { - continue; // Keep waiting for duration-based test - } else { - break; // Message-count-based test with no more messages + + // Try to receive with a shorter timeout + match timeout(Duration::from_millis(50), transport.receive()).await { + Ok(Ok(request)) => { + received += 1; + // Echo back with modified ID to indicate processing + let response = Message::new( + request.id + 1000000, + request.payload, + MessageType::Response, + ); + if (transport.send(&response).await).is_err() { + break; // Client disconnected + } + } + Ok(Err(_)) => break, // Transport error + Err(_) => { + // 
Timeout - check if duration-based test is done + if duration.is_some() { + continue; // Keep waiting for duration-based test + } else { + break; // Message-count-based test with no more messages + } + } } } + let _ = transport.close().await; + } + Err(e) => { + let _ = tx.send(Err(e)); } } - - transport.close().await?; - Ok::<(), anyhow::Error>(()) }) }; @@ -838,102 +873,101 @@ impl BenchmarkRunner { rx.await??; debug!("Client received server ready signal for round-trip test"); - // Connect client - client_transport.start_client(transport_config).await?; + let client_config = self.config.clone(); + let transport_config_clone = transport_config.clone(); - // Send messages and measure round-trip latency - let payload = vec![0u8; self.config.message_size]; - let start_time = Instant::now(); + let client_future = async move { + let mut latencies = Vec::new(); + // Connect client + client_transport + .start_client(&transport_config_clone) + .await?; - if let Some(duration) = self.config.duration { - // Duration-based test: loop until time expires - let mut i = 0u64; - while start_time.elapsed() < duration { - let send_time = Instant::now(); - let message = Message::new(i, payload.clone(), MessageType::Request); - - // Use timeout for sends and receives to avoid blocking indefinitely - match tokio::time::timeout( - Duration::from_millis(50), - client_transport.send(&message), - ) - .await - { - Ok(Ok(_)) => { - // Send succeeded - increment counter immediately to ensure unique message IDs - i += 1; - - // Try to receive response with timeout - match tokio::time::timeout( - Duration::from_millis(50), - client_transport.receive(), - ) - .await - { - Ok(Ok(_)) => { - let latency = send_time.elapsed(); - metrics_collector - .record_message(self.config.message_size, Some(latency))?; - - // Stream individual latency record if enabled - if let Some(ref mut manager) = results_manager { - let record = crate::results::MessageLatencyRecord::new( - i - 1, // Use the actual message ID that was sent - self.mechanism, - self.config.message_size, - crate::metrics::LatencyType::RoundTrip, - latency, - ); - manager.stream_latency_record(&record).await?; + // Send messages and measure round-trip latency + let payload = vec![0u8; client_config.message_size]; + let start_time = Instant::now(); + + if let Some(duration) = client_config.duration { + // Duration-based test: loop until time expires + let mut i = 0u64; + while start_time.elapsed() < duration { + let send_time = Instant::now(); + let message = Message::new(i, payload.clone(), MessageType::Request); + + // Use timeout for sends and receives to avoid blocking indefinitely + match tokio::time::timeout( + Duration::from_millis(50), + client_transport.send(&message), + ) + .await + { + Ok(Ok(_)) => { + // Send succeeded - increment counter immediately to ensure unique message IDs + i += 1; + + // Try to receive response with timeout + match tokio::time::timeout( + Duration::from_millis(50), + client_transport.receive(), + ) + .await + { + Ok(Ok(_)) => { + latencies.push(send_time.elapsed()); + } + Ok(Err(_)) => break, // Transport error + Err(_) => { + // Receive timeout, but send was successful so we continue + // Message ID was already incremented after successful send + sleep(Duration::from_millis(1)).await; + continue; } - } - Ok(Err(_)) => break, // Transport error - Err(_) => { - // Receive timeout, but send was successful so we continue - // Message ID was already incremented after successful send - sleep(Duration::from_millis(1)).await; - continue; } } 
- } - Ok(Err(_)) => break, // Transport error - Err(_) => { - // Send timeout - ring buffer might be full, small delay and continue - sleep(Duration::from_millis(1)).await; - continue; + Ok(Err(_)) => break, // Transport error + Err(_) => { + // Send timeout - ring buffer might be full, small delay and continue + sleep(Duration::from_millis(1)).await; + continue; + } } } - } - } else { - // Message-count-based test: loop for fixed number of messages - let msg_count = self.get_msg_count(); + } else { + // Message-count-based test: loop for fixed number of messages + let msg_count = client_config.msg_count.unwrap_or_default(); - for i in 0..msg_count { - let send_time = Instant::now(); - let message = Message::new(i as u64, payload.clone(), MessageType::Request); - let _ = client_transport.send(&message).await?; - - let _ = client_transport.receive().await?; - let latency = send_time.elapsed(); - metrics_collector.record_message(self.config.message_size, Some(latency))?; - - // Stream individual latency record if enabled - if let Some(ref mut manager) = results_manager { - let record = crate::results::MessageLatencyRecord::new( - i as u64, - self.mechanism, - self.config.message_size, - crate::metrics::LatencyType::RoundTrip, - latency, - ); - manager.stream_latency_record(&record).await?; + for i in 0..msg_count { + let send_time = Instant::now(); + let message = Message::new(i as u64, payload.clone(), MessageType::Request); + let _ = client_transport.send(&message).await?; + + let _ = client_transport.receive().await?; + latencies.push(send_time.elapsed()); } } + + // Clean up resources + client_transport.close().await?; + Ok(latencies) + }; + + let latencies = spawn_with_affinity(client_future, self.config.client_affinity).await?; + + for (i, latency) in latencies.iter().enumerate() { + metrics_collector.record_message(self.config.message_size, Some(*latency))?; + if let Some(ref mut manager) = results_manager { + let record = crate::results::MessageLatencyRecord::new( + i as u64, + self.mechanism, + self.config.message_size, + crate::metrics::LatencyType::RoundTrip, + *latency, + ); + manager.stream_latency_record(&record).await?; + } } - // Clean up resources - client_transport.close().await?; - server_handle.await??; + server_handle.await?; Ok(()) } @@ -1151,58 +1185,64 @@ impl BenchmarkRunner { let mechanism = self.mechanism; let duration = self.config.duration; let msg_count = self.get_msg_count(); - + let server_affinity = self.config.server_affinity; tokio::spawn(async move { - let mut transport = TransportFactory::create(&mechanism)?; - if let Err(e) = transport.start_server(&config).await { - let _ = tx.send(Err(e)); - return Err(anyhow::anyhow!("Server failed to start")); - } - - // Signal that server is ready - let _ = tx.send(Ok(())); - debug!("Server signaled ready for combined test"); + let server_future = async move { + let mut transport = TransportFactory::create(&mechanism)?; + transport.start_server(&config).await?; + Ok(transport) + }; + + match spawn_with_affinity(server_future, server_affinity).await { + Ok(mut transport) => { + if tx.send(Ok(())).is_err() { + return; // client disconnected + } + debug!("Server signaled ready for combined test"); - let start_time = Instant::now(); - let mut processed = 0; + let start_time = Instant::now(); + let mut processed = 0; - // Server receive and response loop - handles both one-way tracking and round-trip responses - loop { - // Check if we should stop based on duration or iterations - if let Some(dur) = duration { - if 
start_time.elapsed() >= dur { - break; - } - } else if processed >= msg_count { - break; - } + // Server receive and response loop - handles both one-way tracking and round-trip responses + loop { + // Check if we should stop based on duration or iterations + if let Some(dur) = duration { + if start_time.elapsed() >= dur { + break; + } + } else if processed >= msg_count { + break; + } - // Receive message and send response for round-trip timing - match timeout(Duration::from_millis(50), transport.receive()).await { - Ok(Ok(msg)) => { - // Send immediate response for round-trip timing - let response = Message::new( - msg.id + 1000000, // Offset to distinguish response - msg.payload.clone(), - MessageType::Response, - ); - let _ = transport.send(&response).await; - processed += 1; - } - Ok(Err(_)) => break, // Transport error - Err(_) => { - // Timeout - check if duration-based test is done - if duration.is_some() { - continue; // Keep waiting for duration-based test - } else { - break; // Message-count-based test with no more messages + // Receive message and send response for round-trip timing + match timeout(Duration::from_millis(50), transport.receive()).await { + Ok(Ok(msg)) => { + // Send immediate response for round-trip timing + let response = Message::new( + msg.id + 1000000, // Offset to distinguish response + msg.payload.clone(), + MessageType::Response, + ); + let _ = transport.send(&response).await; + processed += 1; + } + Ok(Err(_)) => break, // Transport error + Err(_) => { + // Timeout - check if duration-based test is done + if duration.is_some() { + continue; // Keep waiting for duration-based test + } else { + break; // Message-count-based test with no more messages + } + } } } + let _ = transport.close().await; + } + Err(e) => { + let _ = tx.send(Err(e)); } } - - transport.close().await?; - Ok::<(), anyhow::Error>(()) }) }; @@ -1210,116 +1250,112 @@ impl BenchmarkRunner { rx.await??; debug!("Client received server ready signal for combined test"); - // Connect client - client_transport.start_client(transport_config).await?; + let client_config = self.config.clone(); + let transport_config_clone = transport_config.clone(); - // Create payload for messages - let payload = vec![0u8; self.config.message_size]; - let start_time = Instant::now(); + let client_future = async move { + let mut one_way_latencies = Vec::new(); + let mut round_trip_latencies = Vec::new(); + // Connect client + client_transport + .start_client(&transport_config_clone) + .await?; - if let Some(duration) = self.config.duration { - // Duration-based test: loop until time expires - let mut i = 0u64; - while start_time.elapsed() < duration { - let send_start = Instant::now(); - let message = Message::new(i, payload.clone(), MessageType::Request); - - // Send message and measure one-way latency - match tokio::time::timeout( - Duration::from_millis(50), - client_transport.send(&message), - ) - .await - { - Ok(Ok(_)) => { - let one_way_latency = send_start.elapsed(); - - // Try to receive response and measure round-trip latency - match tokio::time::timeout( - Duration::from_millis(50), - client_transport.receive(), - ) - .await - { - Ok(Ok(_)) => { - let round_trip_latency = send_start.elapsed(); - - // Record both metrics - one_way_metrics.record_message( - self.config.message_size, - Some(one_way_latency), - )?; - round_trip_metrics.record_message( - self.config.message_size, - Some(round_trip_latency), - )?; - - // Stream combined record if enabled - if let Some(ref mut manager) = results_manager { - let 
record = crate::results::MessageLatencyRecord::new_combined( - i, - self.mechanism, - self.config.message_size, - one_way_latency, - round_trip_latency, - ); - manager.write_streaming_record_direct(&record).await?; + // Create payload for messages + let payload = vec![0u8; client_config.message_size]; + let start_time = Instant::now(); + + if let Some(duration) = client_config.duration { + // Duration-based test: loop until time expires + let mut i = 0u64; + while start_time.elapsed() < duration { + let send_start = Instant::now(); + let message = Message::new(i, payload.clone(), MessageType::Request); + + // Send message and measure one-way latency + match tokio::time::timeout( + Duration::from_millis(50), + client_transport.send(&message), + ) + .await + { + Ok(Ok(_)) => { + let one_way_latency = send_start.elapsed(); + + // Try to receive response and measure round-trip latency + match tokio::time::timeout( + Duration::from_millis(50), + client_transport.receive(), + ) + .await + { + Ok(Ok(_)) => { + let round_trip_latency = send_start.elapsed(); + one_way_latencies.push(one_way_latency); + round_trip_latencies.push(round_trip_latency); + i += 1; + } + Ok(Err(_)) => break, // Transport error + Err(_) => { + // Receive timeout - continue without incrementing + sleep(Duration::from_millis(1)).await; + continue; } - - i += 1; - } - Ok(Err(_)) => break, // Transport error - Err(_) => { - // Receive timeout - continue without incrementing - sleep(Duration::from_millis(1)).await; - continue; } } - } - Ok(Err(_)) => break, // Send transport error - Err(_) => { - // Send timeout - continue - sleep(Duration::from_millis(1)).await; - continue; + Ok(Err(_)) => break, // Send transport error + Err(_) => { + // Send timeout - continue + sleep(Duration::from_millis(1)).await; + continue; + } } } - } - } else { - // Message-count-based test: loop for fixed number of messages - let msg_count = self.get_msg_count(); + } else { + // Message-count-based test: loop for fixed number of messages + let msg_count = client_config.msg_count.unwrap_or_default(); - for i in 0..msg_count { - let send_start = Instant::now(); - let message = Message::new(i as u64, payload.clone(), MessageType::Request); - - let _ = client_transport.send(&message).await?; - let one_way_latency = send_start.elapsed(); - - let _ = client_transport.receive().await?; - let round_trip_latency = send_start.elapsed(); - - // Record both metrics - one_way_metrics.record_message(self.config.message_size, Some(one_way_latency))?; - round_trip_metrics - .record_message(self.config.message_size, Some(round_trip_latency))?; - - // Stream combined record if enabled - if let Some(ref mut manager) = results_manager { - let record = crate::results::MessageLatencyRecord::new_combined( - i as u64, - self.mechanism, - self.config.message_size, - one_way_latency, - round_trip_latency, - ); - manager.write_streaming_record_direct(&record).await?; + for i in 0..msg_count { + let send_start = Instant::now(); + let message = Message::new(i as u64, payload.clone(), MessageType::Request); + + let _ = client_transport.send(&message).await?; + let one_way_latency = send_start.elapsed(); + + let _ = client_transport.receive().await?; + let round_trip_latency = send_start.elapsed(); + one_way_latencies.push(one_way_latency); + round_trip_latencies.push(round_trip_latency); } } + + // Clean up resources + client_transport.close().await?; + Ok((one_way_latencies, round_trip_latencies)) + }; + + let (one_way_latencies, round_trip_latencies) = + 
spawn_with_affinity(client_future, self.config.client_affinity).await?; + + for (i, latency) in one_way_latencies.iter().enumerate() { + one_way_metrics.record_message(self.config.message_size, Some(*latency))?; + if let Some(ref mut manager) = results_manager { + let record = crate::results::MessageLatencyRecord::new_combined( + i as u64, + self.mechanism, + self.config.message_size, + *latency, + round_trip_latencies[i], + ); + manager.write_streaming_record_direct(&record).await?; + } } - // Clean up resources - client_transport.close().await?; - server_handle.await??; + for latency in round_trip_latencies { + round_trip_metrics.record_message(self.config.message_size, Some(latency))?; + } + + server_handle.await?; Ok(()) } @@ -1487,6 +1523,8 @@ mod tests { buffer_size: Some(8192), host: "127.0.0.1".to_string(), port: 8080, + server_affinity: None, + client_affinity: None, }; assert_eq!(config.message_size, 1024); @@ -1513,6 +1551,8 @@ mod tests { buffer_size: Some(8192), host: "127.0.0.1".to_string(), port: 8080, + server_affinity: None, + client_affinity: None, }; let runner = BenchmarkRunner::new(config, IpcMechanism::UnixDomainSocket); @@ -1548,6 +1588,8 @@ mod tests { buffer_size: None, host: "127.0.0.1".to_string(), port: 8080, + server_affinity: None, + client_affinity: None, }; // Scenario 1: User-provided buffer size is always respected. diff --git a/src/cli.rs b/src/cli.rs index ed9f80b..6a1ece7 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -144,6 +144,20 @@ pub struct Args { #[arg(short = 'c', long, default_value_t = crate::defaults::CONCURRENCY, help_heading = CONCURRENCY)] pub concurrency: usize, + /// Server CPU affinity. + /// + /// Specifies the CPU core to which the server process should be pinned. + /// Pinning can reduce cache misses and context switching, improving performance. + #[arg(long, help_heading = CONCURRENCY)] + pub server_affinity: Option, + + /// Client CPU affinity. + /// + /// Specifies the CPU core to which the client process should be pinned. + /// Pinning can reduce cache misses and context switching, improving performance. + #[arg(long, help_heading = CONCURRENCY)] + pub client_affinity: Option, + /// Path to the final JSON output file. If used without a path, defaults to 'benchmark_results.json'. /// /// If the flag is not used, no final JSON file will be written, but a summary @@ -462,6 +476,12 @@ pub struct BenchmarkConfiguration { /// Port number for network-based mechanisms pub port: u16, + + /// Server CPU affinity + pub server_affinity: Option, + + /// Client CPU affinity + pub client_affinity: Option, } impl From<&Args> for BenchmarkConfiguration { @@ -505,6 +525,8 @@ impl From<&Args> for BenchmarkConfiguration { buffer_size: args.buffer_size, host: args.host.clone(), port: args.port, + server_affinity: args.server_affinity, + client_affinity: args.client_affinity, } } } @@ -644,6 +666,17 @@ impl fmt::Display for Args { writeln!(f, " Warmup Iterations: {}", self.warmup_iterations)?; writeln!(f, " Test Types: {}", test_types)?; + + // Display CPU affinity settings + let server_affinity_str = self + .server_affinity + .map_or("Not set".to_string(), |c| c.to_string()); + let client_affinity_str = self + .client_affinity + .map_or("Not set".to_string(), |c| c.to_string()); + writeln!(f, " Server Affinity: {}", server_affinity_str)?; + writeln!(f, " Client Affinity: {}", client_affinity_str)?; + // Conditionally display the path for the main JSON output file. 
if let Some(output_dest) = self.output_file.as_ref() { writeln!(f, " Output File: {}", output_dest.display())?; @@ -737,4 +770,18 @@ mod tests { all_mechanisms ); } + + /// Test parsing of CPU affinity arguments + #[test] + fn test_parse_affinity_args() { + let args = Args::parse_from(&[ + "ipc-benchmark", + "--server-affinity", + "2", + "--client-affinity", + "3", + ]); + assert_eq!(args.server_affinity, Some(2)); + assert_eq!(args.client_affinity, Some(3)); + } } diff --git a/src/lib.rs b/src/lib.rs index 23e1879..5a3cb69 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -45,6 +45,8 @@ //! buffer_size: Some(8192), //! host: "127.0.0.1".to_string(), //! port: 8080, +//! server_affinity: None, +//! client_affinity: None, //! }; //! //! let runner = BenchmarkRunner::new(config, IpcMechanism::UnixDomainSocket); diff --git a/src/utils.rs b/src/utils.rs index 2b4b183..af2748a 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -21,8 +21,69 @@ //! - **Performance**: Minimal overhead for frequently called functions //! - **Extensibility**: Easy to add new formatters and validators +use anyhow::{anyhow, Result}; +use std::future::Future; use std::time::{SystemTime, UNIX_EPOCH}; +use tracing::warn; + +/// Spawns a Tokio task on a new thread with a specific CPU affinity. +/// +/// If `core_id` is `Some`, this function will create a new thread, pin it to the +/// specified CPU core, and then spawn the given future on a single-threaded Tokio +/// runtime on that thread. +/// +/// If `core_id` is `None`, it will spawn the future on the default Tokio runtime +/// without any specific affinity. +/// +/// # Arguments +/// +/// * `future` - The future to execute. +/// * `core_id` - The ID of the CPU core to pin the thread to. +/// +/// # Returns +/// +/// The output of the completed future. The spawned task is awaited internally, +/// so the caller receives `Result<T>` directly rather than a `JoinHandle`. +pub async fn spawn_with_affinity<F, T>(future: F, core_id: Option<usize>) -> Result<T> +where + F: Future<Output = Result<T>> + Send + 'static, + T: Send + 'static, +{ + match core_id { + Some(id) => { + let handle = tokio::task::spawn_blocking(move || { + let core_ids = core_affinity::get_core_ids().ok_or_else(|| { + anyhow!("Failed to get core IDs, is this a supported platform?") + })?; + + if core_ids.is_empty() { + return Err(anyhow!("No available CPU cores found.")); + } + + let target_core = core_ids.get(id).ok_or_else(|| { + anyhow!( + "Invalid core ID: {}. System has {} available cores (valid IDs are 0 to {}).", + id, + core_ids.len(), + core_ids.len() - 1 + ) + })?; + + if !core_affinity::set_for_current(*target_core) { + warn!("Failed to set affinity for core ID: {}", id); + } + + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?; + rt.block_on(future) + }); + handle.await? + } + None => future.await, + } } + /// Get current timestamp as nanoseconds since Unix epoch /// /// Provides high-precision timing information for performance measurement @@ -55,3 +116,74 @@ pub fn current_timestamp_ns() -> u64 { .unwrap_or_default() .as_nanos() as u64 } + +#[cfg(test)] +mod tests { + use super::*; + use std::thread; + + /// Test that a future can be spawned with affinity and its result retrieved.
+ #[tokio::test] + async fn test_spawn_with_affinity_retrieves_result() { + let future = async { Ok("Hello from thread") }; + // Spawn on core 0 if available, otherwise fallback to no affinity + let core_id = if core_affinity::get_core_ids().is_some() { + Some(0) + } else { + None + }; + let result = spawn_with_affinity(future, core_id).await.unwrap(); + assert_eq!(result, "Hello from thread"); + } + + /// Test that spawning with a specific core ID runs the future on a different thread. + #[tokio::test] + async fn test_spawn_with_affinity_uses_new_thread() { + let main_thread_id = thread::current().id(); + let future = async move { Ok(thread::current().id()) }; + + // Spawn on core 0 if available, otherwise fallback to no affinity. + // This test is most meaningful when a core is available. + let core_id = if core_affinity::get_core_ids().is_some() { + Some(0) + } else { + None + }; + + if core_id.is_some() { + let future_thread_id = spawn_with_affinity(future, core_id).await.unwrap(); + assert_ne!( + main_thread_id, future_thread_id, + "Future should have run on a different thread" + ); + } else { + // If we can't test with affinity, we can't guarantee a new thread. + // Tokio's default scheduler might run it on the same thread. + // We'll just log a warning that the main part of the test was skipped. + warn!("Could not get core IDs, skipping new-thread-id check in test_spawn_with_affinity_uses_new_thread"); + } + } + + /// Test that an invalid core ID returns a descriptive error message. + #[tokio::test] + async fn test_spawn_with_affinity_invalid_core_id() { + let future = async { Ok(()) }; + // Use an impossibly high core ID + let core_id = Some(9999); + + // This test should only run on systems where we can actually get core IDs + if let Some(cores) = core_affinity::get_core_ids() { + let result = spawn_with_affinity(future, core_id).await; + assert!(result.is_err()); + let error_message = result.err().unwrap().to_string(); + let expected_message = format!( + "Invalid core ID: 9999. System has {} available cores (valid IDs are 0 to {}).", + cores.len(), + cores.len() - 1 + ); + assert_eq!(error_message, expected_message); + } else { + warn!("Could not get core IDs, skipping invalid-core-id check in test_spawn_with_affinity_invalid_core_id"); + } + } +} From 6b76363271df4e3c78fb05f165ed4299c4e450ad Mon Sep 17 00:00:00 2001 From: Dustin Black Date: Wed, 10 Sep 2025 17:33:15 +0200 Subject: [PATCH 2/5] refactor(cli): Move affinity arguments to dedicated help section Improves the clarity of the command-line help text by moving the '--server-affinity' and '--client-affinity' flags from the 'Concurrency' heading to a more appropriate 'Affinity' heading. AI-assisted-by: Gemini 2.5 Pro --- src/cli.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index 6a1ece7..128cd64 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -81,6 +81,7 @@ fn styles() -> Styles { // Define constants for help headings to ensure consistency. const TIMING: &str = "Timing"; const CONCURRENCY: &str = "Concurrency"; +const AFFINITY: &str = "Affinity"; const OUTPUT_AND_LOGGING: &str = "Output and Logging"; const ADVANCED: &str = "Advanced"; @@ -148,14 +149,14 @@ pub struct Args { /// /// Specifies the CPU core to which the server process should be pinned. /// Pinning can reduce cache misses and context switching, improving performance. - #[arg(long, help_heading = CONCURRENCY)] + #[arg(long, help_heading = AFFINITY)] pub server_affinity: Option, /// Client CPU affinity. 
/// /// Specifies the CPU core to which the client process should be pinned. /// Pinning can reduce cache misses and context switching, improving performance. - #[arg(long, help_heading = CONCURRENCY)] + #[arg(long, help_heading = AFFINITY)] pub client_affinity: Option, /// Path to the final JSON output file. If used without a path, defaults to 'benchmark_results.json'. From 3251c3fa110e657b46ab175646c6747a094b7f41 Mon Sep 17 00:00:00 2001 From: Dustin Black Date: Wed, 10 Sep 2025 17:35:19 +0200 Subject: [PATCH 3/5] correct error reported by clippy --- src/cli.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cli.rs b/src/cli.rs index 128cd64..61e7157 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -775,7 +775,7 @@ mod tests { /// Test parsing of CPU affinity arguments #[test] fn test_parse_affinity_args() { - let args = Args::parse_from(&[ + let args = Args::parse_from([ "ipc-benchmark", "--server-affinity", "2", From 22932b9cf391f5568bd1befdab1a02b9d0da17e3 Mon Sep 17 00:00:00 2001 From: Dustin Black Date: Wed, 10 Sep 2025 17:44:25 +0200 Subject: [PATCH 4/5] feat(cli): Display CPU affinity settings in benchmark summary Adds the configured server and client CPU affinity values to the benchmark configuration summary that is printed at the start of a run. This provides immediate visual confirmation that the affinity settings have been applied. AI-assisted-by: Gemini 2.5 Pro --- src/benchmark.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/benchmark.rs b/src/benchmark.rs index 67b2d9b..8e0ed90 100644 --- a/src/benchmark.rs +++ b/src/benchmark.rs @@ -323,6 +323,16 @@ impl BenchmarkRunner { } else { info!(" Message Count: {}", self.get_msg_count()); } + let server_affinity_str = self + .config + .server_affinity + .map_or("Not set".to_string(), |c| c.to_string()); + let client_affinity_str = self + .config + .client_affinity + .map_or("Not set".to_string(), |c| c.to_string()); + info!(" Server Affinity: {}", server_affinity_str); + info!(" Client Affinity: {}", client_affinity_str); info!("-----------------------------------------------------------------"); // Initialize results structure with test configuration From 3cda34109fea8b57d78bb32dd9611ce50a63154c Mon Sep 17 00:00:00 2001 From: Dustin Black Date: Wed, 10 Sep 2025 18:08:42 +0200 Subject: [PATCH 5/5] fix(affinity): Correct intermittent core detection failure Resolves a race condition where `core_affinity::get_core_ids()` would intermittently report only a single available core, causing affinity validation to fail. The fix ensures that the list of available cores is retrieved from the main thread (which has an unrestricted view) before entering the `spawn_blocking` context, where the thread's affinity might already be constrained. This makes the core ID validation robust against the race condition. AI-assisted-by: Gemini 2.5 Pro --- src/utils.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/utils.rs b/src/utils.rs index af2748a..343ba17 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -51,11 +51,11 @@ where { match core_id { Some(id) => { - let handle = tokio::task::spawn_blocking(move || { - let core_ids = core_affinity::get_core_ids().ok_or_else(|| { - anyhow!("Failed to get core IDs, is this a supported platform?") - })?; + // Get the core IDs from the current thread, which is assumed to have an unrestricted view. 
+ let core_ids = core_affinity::get_core_ids() + .ok_or_else(|| anyhow!("Failed to get core IDs, is this a supported platform?"))?; + let handle = tokio::task::spawn_blocking(move || { if core_ids.is_empty() { return Err(anyhow!("No available CPU cores found.")); }
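
For reference, a minimal caller-side sketch of the finished API, modeled on how the refactored test functions in `src/benchmark.rs` invoke it after this series; the future body, the single-sample latency vector, and the core ID below are illustrative placeholders, not code from the patches:

```rust
use std::time::{Duration, Instant};

// Sketch of an in-crate caller; mirrors the client-side pattern in
// src/benchmark.rs where raw latencies are returned for later processing.
async fn sample_pinned_client() -> anyhow::Result<Vec<Duration>> {
    let client_future = async {
        let send_time = Instant::now();
        tokio::time::sleep(Duration::from_millis(1)).await; // stand-in for transport send/receive
        Ok(vec![send_time.elapsed()])
    };
    // Some(3) runs the future on a thread pinned to core 3; None falls
    // back to the default Tokio runtime with no pinning.
    crate::utils::spawn_with_affinity(client_future, Some(3)).await
}
```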