From e293f23136e525932b9f761632791dd47993dac9 Mon Sep 17 00:00:00 2001 From: Erika Hunhoff Date: Wed, 18 Oct 2023 13:11:28 +0000 Subject: [PATCH 01/10] prepare for fxmark bench --- kernel/run.py | 2 ++ kernel/tests/s11_rackscale_benchmarks.rs | 7 ++++--- lib/rpc/src/transport/smoltcp.rs | 4 ++-- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/kernel/run.py b/kernel/run.py index 7f9dae27b..5ac802152 100644 --- a/kernel/run.py +++ b/kernel/run.py @@ -745,7 +745,9 @@ def configure_network(args): for _, ncfg in zip(range(0, args.workers), NETWORK_CONFIG): sudo[tunctl[['-t', ncfg, '-u', user, '-g', group]]]() sudo[ip[['link', 'set', ncfg, 'up']]](retcode=(0, 1)) + sudo[ip[['link', 'set', ncfg, 'txqueuelen', 65536]]]() sudo[brctl[['addif', 'br0', ncfg]]]() + sudo[ip[['link', 'set', 'br0', 'txqueuelen', 65536]]]() sudo[ip[['link', 'set', 'br0', 'up']]](retcode=(0, 1)) diff --git a/kernel/tests/s11_rackscale_benchmarks.rs b/kernel/tests/s11_rackscale_benchmarks.rs index 9021981f0..b0f8221b7 100644 --- a/kernel/tests/s11_rackscale_benchmarks.rs +++ b/kernel/tests/s11_rackscale_benchmarks.rs @@ -27,7 +27,6 @@ fn s11_rackscale_shmem_fxmark_benchmark() { } #[test] -#[ignore] #[cfg(not(feature = "baremetal"))] fn s11_rackscale_ethernet_fxmark_benchmark() { rackscale_fxmark_benchmark(RackscaleTransport::Ethernet); @@ -134,14 +133,16 @@ fn rackscale_fxmark_benchmark(transport: RackscaleTransport) { fn timeout_fn(num_cores: usize) -> u64 { 180_000 + 5_000 * num_cores as u64 } + fn mem_fn(num_cores: usize, is_smoke: bool) -> usize { if is_smoke { 8192 } else { // Memory must also be divisible by number of nodes, which could be 1, 2, 3, or 4 - 2048 * (((num_cores + 3 - 1) / 3) * 3) + 4096 * (((((num_cores + 1) / 2) + 3 - 1) / 3) * 3) } } + let bench = RackscaleBench { test, cmd_fn, @@ -722,7 +723,7 @@ fn rackscale_memcached_dcm(transport: RackscaleTransport, dcm_config: Option { iface: Arc>>, From 2ab2732bd275920920434f0c0f312092db8dcdce Mon Sep 17 00:00:00 2001 From: Erika Hunhoff Date: Thu, 19 Oct 2023 09:23:20 -0700 Subject: [PATCH 02/10] One shmem transport per client core --- .../src/arch/x86_64/rackscale/client_state.rs | 16 ++- .../src/arch/x86_64/rackscale/controller.rs | 34 ++++-- .../arch/x86_64/rackscale/fileops/close.rs | 12 +- .../arch/x86_64/rackscale/fileops/delete.rs | 12 +- .../arch/x86_64/rackscale/fileops/getinfo.rs | 12 +- .../arch/x86_64/rackscale/fileops/mkdir.rs | 12 +- .../src/arch/x86_64/rackscale/fileops/open.rs | 12 +- .../arch/x86_64/rackscale/fileops/rename.rs | 12 +- .../src/arch/x86_64/rackscale/fileops/rw.rs | 15 +-- .../arch/x86_64/rackscale/get_shmem_frames.rs | 9 +- .../x86_64/rackscale/get_shmem_structure.rs | 3 +- .../rackscale/processops/allocate_physical.rs | 12 +- .../arch/x86_64/rackscale/processops/print.rs | 12 +- .../rackscale/processops/release_core.rs | 12 +- .../rackscale/processops/release_physical.rs | 12 +- .../rackscale/processops/request_core.rs | 12 +- .../systemops/get_hardware_threads.rs | 12 +- kernel/src/transport/shmem.rs | 107 +++++++++++------- kernel/tests/s06_rackscale_tests.rs | 4 + kernel/tests/s11_rackscale_benchmarks.rs | 1 + 20 files changed, 207 insertions(+), 126 deletions(-) diff --git a/kernel/src/arch/x86_64/rackscale/client_state.rs b/kernel/src/arch/x86_64/rackscale/client_state.rs index daf91843a..466fc2ca8 100644 --- a/kernel/src/arch/x86_64/rackscale/client_state.rs +++ b/kernel/src/arch/x86_64/rackscale/client_state.rs @@ -25,11 +25,12 @@ use crate::error::{KError, KResult}; use crate::memory::backends::MemManager; use crate::memory::shmem_affinity::{local_shmem_affinity, mid_to_shmem_affinity}; use crate::process::MAX_PROCESSES; +use crate::transport::shmem::NUM_SHMEM_TRANSPORTS; /// This is the state the client records about itself pub(crate) struct ClientState { /// The RPC client used to communicate with the controller - pub(crate) rpc_client: Arc>, + pub(crate) rpc_clients: Arc, { NUM_SHMEM_TRANSPORTS as usize }>>, /// Used to store shmem affinity base pages pub(crate) affinity_base_pages: Arc>, MAX_MACHINES>>, @@ -41,7 +42,9 @@ pub(crate) struct ClientState { impl ClientState { pub(crate) fn new() -> ClientState { // Create network stack and instantiate RPC Client - let rpc_client = if crate::CMDLINE + // TODO(rackscale, hack): only allow shmem for now + /* + let rpc_clients = if crate::CMDLINE .get() .map_or(false, |c| c.transport == Transport::Ethernet) { @@ -60,6 +63,13 @@ impl ClientState { .expect("Failed to initialize shmem RPC"), )) }; + */ + let clients = + crate::transport::shmem::init_shmem_rpc(true).expect("Failed to initialize shmem RPC"); + let mut rpc_clients = ArrayVec::new(); + for client in clients.into_iter() { + rpc_clients.push(Mutex::new(client)); + } let mut per_process_base_pages = ArrayVec::new(); for _i in 0..MAX_PROCESSES { @@ -76,7 +86,7 @@ impl ClientState { log::debug!("Finished initializing client state"); ClientState { - rpc_client, + rpc_clients: Arc::new(rpc_clients), affinity_base_pages: Arc::new(affinity_base_pages), per_process_base_pages: Arc::new(per_process_base_pages), } diff --git a/kernel/src/arch/x86_64/rackscale/controller.rs b/kernel/src/arch/x86_64/rackscale/controller.rs index 23339f0ef..d14c9d837 100644 --- a/kernel/src/arch/x86_64/rackscale/controller.rs +++ b/kernel/src/arch/x86_64/rackscale/controller.rs @@ -18,7 +18,7 @@ use crate::arch::rackscale::dcm::{ use crate::arch::MAX_MACHINES; use crate::cmdline::Transport; use crate::transport::ethernet::ETHERNET_IFACE; -use crate::transport::shmem::create_shmem_transport; +use crate::transport::shmem::{create_shmem_transport, NUM_SHMEM_TRANSPORTS}; use super::*; @@ -32,6 +32,8 @@ pub(crate) fn run() { let mid = *crate::environment::CORE_ID; // Initialize one server per controller thread + // TODO(rackscale, hack): only support shmem for now + /* let mut server = if crate::CMDLINE .get() .map_or(false, |c| c.transport == Transport::Ethernet) @@ -51,26 +53,34 @@ pub(crate) fn run() { .get() .map_or(false, |c| c.transport == Transport::Shmem) { - let transport = Box::new( - create_shmem_transport(mid.try_into().unwrap()) - .expect("Failed to create shmem transport"), - ); + */ + let transports = + create_shmem_transport(mid.try_into().unwrap()).expect("Failed to create shmem transport"); - let mut server = Server::new(transport); + let mut servers: ArrayVec, { NUM_SHMEM_TRANSPORTS as usize }> = ArrayVec::new(); + for transport in transports.into_iter() { + let mut server = Server::new(Box::new(transport)); register_rpcs(&mut server); - server + servers.push(server); + } + + /* } else { unreachable!("No supported transport layer specified in kernel argument"); }; + */ ClientReadyCount.fetch_add(1, Ordering::SeqCst); // Wait for all clients to connect before fulfilling any RPCs. while !DCMServerReady.load(Ordering::SeqCst) {} - server + // TODO(rackscale, hack): only register core 0 + //for s_index in 0..servers.len() { + servers[0] .add_client(&CLIENT_REGISTRAR) .expect("Failed to accept client"); + //} ClientReadyCount.fetch_add(1, Ordering::SeqCst); @@ -114,9 +124,11 @@ pub(crate) fn run() { // Start running the RPC server log::info!("Starting RPC server for client {:?}!", mid); loop { - let _handled = server - .try_handle() - .expect("Controller failed to handle RPC"); + for s_index in 0..servers.len() { + let _handled = servers[s_index] + .try_handle() + .expect("Controller failed to handle RPC"); + } } } diff --git a/kernel/src/arch/x86_64/rackscale/fileops/close.rs b/kernel/src/arch/x86_64/rackscale/fileops/close.rs index 76a8424e4..75d9cfcfa 100644 --- a/kernel/src/arch/x86_64/rackscale/fileops/close.rs +++ b/kernel/src/arch/x86_64/rackscale/fileops/close.rs @@ -34,11 +34,13 @@ pub(crate) fn rpc_close(pid: usize, fd: FileDescriptor) -> KResult<(u64, u64)> { let mut res_data = [0u8; core::mem::size_of::>()]; // Call Close() RPC - CLIENT_STATE.rpc_client.lock().call( - KernelRpc::Close as RPCType, - &[&req_data], - &mut [&mut res_data], - )?; + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] + .lock() + .call( + KernelRpc::Close as RPCType, + &[&req_data], + &mut [&mut res_data], + )?; // Decode and return result if let Some((res, remaining)) = unsafe { decode::>(&mut res_data) } { diff --git a/kernel/src/arch/x86_64/rackscale/fileops/delete.rs b/kernel/src/arch/x86_64/rackscale/fileops/delete.rs index 4b4157f65..cf5fa52f5 100644 --- a/kernel/src/arch/x86_64/rackscale/fileops/delete.rs +++ b/kernel/src/arch/x86_64/rackscale/fileops/delete.rs @@ -36,11 +36,13 @@ pub(crate) fn rpc_delete(pid: usize, pathname: String) -> KResult<(u64, u64)> { let mut res_data = [0u8; core::mem::size_of::>()]; // Call RPC - CLIENT_STATE.rpc_client.lock().call( - KernelRpc::Delete as RPCType, - &[&req_data, &pathname.as_bytes()], - &mut [&mut res_data], - )?; + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] + .lock() + .call( + KernelRpc::Delete as RPCType, + &[&req_data, &pathname.as_bytes()], + &mut [&mut res_data], + )?; // Decode result - return result if decoding successful if let Some((res, remaining)) = unsafe { decode::>(&mut res_data) } { diff --git a/kernel/src/arch/x86_64/rackscale/fileops/getinfo.rs b/kernel/src/arch/x86_64/rackscale/fileops/getinfo.rs index 89ff5451c..33a83864d 100644 --- a/kernel/src/arch/x86_64/rackscale/fileops/getinfo.rs +++ b/kernel/src/arch/x86_64/rackscale/fileops/getinfo.rs @@ -34,11 +34,13 @@ pub(crate) fn rpc_getinfo + Debug>(pid: usize, name: P) -> KResul // Construct result buffer and call RPC let mut res_data = [0u8; core::mem::size_of::>()]; - CLIENT_STATE.rpc_client.lock().call( - KernelRpc::GetInfo as RPCType, - &[&req_data, name.as_ref()], - &mut [&mut res_data], - )?; + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] + .lock() + .call( + KernelRpc::GetInfo as RPCType, + &[&req_data, name.as_ref()], + &mut [&mut res_data], + )?; // Decode and return the result if let Some((res, remaining)) = unsafe { decode::>(&mut res_data) } { diff --git a/kernel/src/arch/x86_64/rackscale/fileops/mkdir.rs b/kernel/src/arch/x86_64/rackscale/fileops/mkdir.rs index a64802c18..c8879a3c2 100644 --- a/kernel/src/arch/x86_64/rackscale/fileops/mkdir.rs +++ b/kernel/src/arch/x86_64/rackscale/fileops/mkdir.rs @@ -42,11 +42,13 @@ pub(crate) fn rpc_mkdir + Debug>( let mut res_data = [0u8; core::mem::size_of::>()]; // Call RPC - CLIENT_STATE.rpc_client.lock().call( - KernelRpc::MkDir as RPCType, - &[&req_data, pathname.as_ref()], - &mut [&mut res_data], - )?; + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] + .lock() + .call( + KernelRpc::MkDir as RPCType, + &[&req_data, pathname.as_ref()], + &mut [&mut res_data], + )?; // Parse and return result if let Some((res, remaining)) = unsafe { decode::>(&mut res_data) } { diff --git a/kernel/src/arch/x86_64/rackscale/fileops/open.rs b/kernel/src/arch/x86_64/rackscale/fileops/open.rs index f263acf22..cb21c281c 100644 --- a/kernel/src/arch/x86_64/rackscale/fileops/open.rs +++ b/kernel/src/arch/x86_64/rackscale/fileops/open.rs @@ -54,11 +54,13 @@ fn rpc_open_create + Debug>( let mut res_data = [0u8; core::mem::size_of::>()]; // Call the RPC - CLIENT_STATE.rpc_client.lock().call( - rpc_type, - &[&req_data, pathname.as_ref()], - &mut [&mut res_data], - )?; + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] + .lock() + .call( + rpc_type, + &[&req_data, pathname.as_ref()], + &mut [&mut res_data], + )?; // Decode and return the result if let Some((res, remaining)) = unsafe { decode::>(&mut res_data) } { diff --git a/kernel/src/arch/x86_64/rackscale/fileops/rename.rs b/kernel/src/arch/x86_64/rackscale/fileops/rename.rs index 55cf81faf..ccfbc81f9 100644 --- a/kernel/src/arch/x86_64/rackscale/fileops/rename.rs +++ b/kernel/src/arch/x86_64/rackscale/fileops/rename.rs @@ -44,11 +44,13 @@ pub(crate) fn rpc_rename + Debug>( let mut res_data = [0u8; core::mem::size_of::>()]; // Call the RPC - CLIENT_STATE.rpc_client.lock().call( - KernelRpc::FileRename as RPCType, - &[&req_data, oldname.as_ref(), newname.as_ref()], - &mut [&mut res_data], - )?; + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] + .lock() + .call( + KernelRpc::FileRename as RPCType, + &[&req_data, oldname.as_ref(), newname.as_ref()], + &mut [&mut res_data], + )?; // Parse and return the result if let Some((res, remaining)) = unsafe { decode::>(&mut res_data) } { diff --git a/kernel/src/arch/x86_64/rackscale/fileops/rw.rs b/kernel/src/arch/x86_64/rackscale/fileops/rw.rs index a45ac4a97..81eb3a62b 100644 --- a/kernel/src/arch/x86_64/rackscale/fileops/rw.rs +++ b/kernel/src/arch/x86_64/rackscale/fileops/rw.rs @@ -69,8 +69,7 @@ pub(crate) fn rpc_writeat( } else { KernelRpc::WriteAt as RPCType }; - CLIENT_STATE - .rpc_client + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] .lock() .call(rpc_type, &[&req_data, &data], &mut [&mut res_data])?; @@ -129,11 +128,13 @@ pub(crate) fn rpc_readat( KernelRpc::ReadAt as RPCType }; - CLIENT_STATE.rpc_client.lock().call( - KernelRpc::ReadAt as RPCType, - &[&req_data], - &mut [&mut res_data], - )?; + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] + .lock() + .call( + KernelRpc::ReadAt as RPCType, + &[&req_data], + &mut [&mut res_data], + )?; // Decode result, if successful, return result if let Some((res, remaining)) = unsafe { decode::>(&mut res_data) } { diff --git a/kernel/src/arch/x86_64/rackscale/get_shmem_frames.rs b/kernel/src/arch/x86_64/rackscale/get_shmem_frames.rs index 205d9d803..9dae7c098 100644 --- a/kernel/src/arch/x86_64/rackscale/get_shmem_frames.rs +++ b/kernel/src/arch/x86_64/rackscale/get_shmem_frames.rs @@ -40,7 +40,11 @@ unsafe_abomonate!(ShmemRegion: base, affinity); // This isn't truly a syscall pub(crate) fn rpc_get_shmem_frames(pid: Option, num_frames: usize) -> KResult> { assert!(num_frames > 0); - log::debug!("GetShmemFrames({:?})", num_frames); + log::debug!( + "GetShmemFrames({:?}) core={:?}", + num_frames, + kpi::system::mtid_from_gtid(*crate::environment::CORE_ID) + ); let mid = if pid.is_none() { Some(*crate::environment::MACHINE_ID) @@ -66,8 +70,7 @@ pub(crate) fn rpc_get_shmem_frames(pid: Option, num_frames: usize) -> KResu for i in 0..max_res_size { res_data.push(0u8); } - CLIENT_STATE - .rpc_client + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] .lock() .call( KernelRpc::GetShmemFrames as RPCType, diff --git a/kernel/src/arch/x86_64/rackscale/get_shmem_structure.rs b/kernel/src/arch/x86_64/rackscale/get_shmem_structure.rs index 3c2a5f997..3a4cccf63 100644 --- a/kernel/src/arch/x86_64/rackscale/get_shmem_structure.rs +++ b/kernel/src/arch/x86_64/rackscale/get_shmem_structure.rs @@ -63,8 +63,7 @@ pub(crate) fn rpc_get_shmem_structure( // Make buffer max size of MAX_PROCESS (for NrProcLogs), 1 (for NrLog) let mut res_data = [0u8; core::mem::size_of::<[u64; MAX_PROCESSES]>()]; - CLIENT_STATE - .rpc_client + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] .lock() .call( KernelRpc::GetShmemStructure as RPCType, diff --git a/kernel/src/arch/x86_64/rackscale/processops/allocate_physical.rs b/kernel/src/arch/x86_64/rackscale/processops/allocate_physical.rs index 6c2dce75e..eb4b5f2cb 100644 --- a/kernel/src/arch/x86_64/rackscale/processops/allocate_physical.rs +++ b/kernel/src/arch/x86_64/rackscale/processops/allocate_physical.rs @@ -45,11 +45,13 @@ pub(crate) fn rpc_allocate_physical(pid: Pid, size: u64, affinity: u64) -> KResu // Create result buffer let mut res_data = [0u8; core::mem::size_of::>()]; - CLIENT_STATE.rpc_client.lock().call( - KernelRpc::AllocatePhysical as RPCType, - &[&req_data], - &mut [&mut res_data], - )?; + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] + .lock() + .call( + KernelRpc::AllocatePhysical as RPCType, + &[&req_data], + &mut [&mut res_data], + )?; // Decode result, return result if decoded successfully if let Some((res, remaining)) = unsafe { decode::>(&mut res_data) } { diff --git a/kernel/src/arch/x86_64/rackscale/processops/print.rs b/kernel/src/arch/x86_64/rackscale/processops/print.rs index 283aacf9b..3ceafab85 100644 --- a/kernel/src/arch/x86_64/rackscale/processops/print.rs +++ b/kernel/src/arch/x86_64/rackscale/processops/print.rs @@ -35,11 +35,13 @@ pub(crate) fn rpc_log(msg: String) -> KResult<(u64, u64)> { // Construct result buffer and call RPC let mut res_data = [0u8; core::mem::size_of::>()]; - CLIENT_STATE.rpc_client.lock().call( - KernelRpc::Log as RPCType, - &[&req_data, print_str.as_ref()], - &mut [&mut res_data], - )?; + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] + .lock() + .call( + KernelRpc::Log as RPCType, + &[&req_data, print_str.as_ref()], + &mut [&mut res_data], + )?; // Decode and return the result if let Some((res, remaining)) = unsafe { decode::>(&mut res_data) } { diff --git a/kernel/src/arch/x86_64/rackscale/processops/release_core.rs b/kernel/src/arch/x86_64/rackscale/processops/release_core.rs index e912d0540..586f7b52a 100644 --- a/kernel/src/arch/x86_64/rackscale/processops/release_core.rs +++ b/kernel/src/arch/x86_64/rackscale/processops/release_core.rs @@ -38,11 +38,13 @@ pub(crate) fn rpc_release_core(pid: Pid, gtid: ThreadId) -> KResult<(u64, u64)> // Create result buffer let mut res_data = [0u8; core::mem::size_of::>()]; - CLIENT_STATE.rpc_client.lock().call( - KernelRpc::ReleaseCore as RPCType, - &[&req_data], - &mut [&mut res_data], - )?; + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] + .lock() + .call( + KernelRpc::ReleaseCore as RPCType, + &[&req_data], + &mut [&mut res_data], + )?; // Decode result, return result if decoded successfully if let Some((res, remaining)) = unsafe { decode::>(&mut res_data) } { diff --git a/kernel/src/arch/x86_64/rackscale/processops/release_physical.rs b/kernel/src/arch/x86_64/rackscale/processops/release_physical.rs index 1bc76e7d4..b067125d9 100644 --- a/kernel/src/arch/x86_64/rackscale/processops/release_physical.rs +++ b/kernel/src/arch/x86_64/rackscale/processops/release_physical.rs @@ -54,11 +54,13 @@ pub(crate) fn rpc_release_physical(pid: Pid, frame_id: u64) -> KResult<(u64, u64 // Create result buffer let mut res_data = [0u8; core::mem::size_of::>()]; - CLIENT_STATE.rpc_client.lock().call( - KernelRpc::ReleasePhysical as RPCType, - &[&req_data], - &mut [&mut res_data], - )?; + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] + .lock() + .call( + KernelRpc::ReleasePhysical as RPCType, + &[&req_data], + &mut [&mut res_data], + )?; // Decode result, return result if decoded successfully if let Some((res, remaining)) = unsafe { decode::>(&mut res_data) } { diff --git a/kernel/src/arch/x86_64/rackscale/processops/request_core.rs b/kernel/src/arch/x86_64/rackscale/processops/request_core.rs index bda7182ef..8c83465b1 100644 --- a/kernel/src/arch/x86_64/rackscale/processops/request_core.rs +++ b/kernel/src/arch/x86_64/rackscale/processops/request_core.rs @@ -38,11 +38,13 @@ pub(crate) fn rpc_request_core(pid: Pid, new_pid: bool, entry_point: u64) -> KRe // Construct result buffer and call RPC let mut res_data = [0u8; core::mem::size_of::>()]; - CLIENT_STATE.rpc_client.lock().call( - KernelRpc::RequestCore as RPCType, - &[&req_data], - &mut [&mut res_data], - )?; + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] + .lock() + .call( + KernelRpc::RequestCore as RPCType, + &[&req_data], + &mut [&mut res_data], + )?; // Decode and return the result if let Some((res, remaining)) = unsafe { decode::>(&mut res_data) } { diff --git a/kernel/src/arch/x86_64/rackscale/systemops/get_hardware_threads.rs b/kernel/src/arch/x86_64/rackscale/systemops/get_hardware_threads.rs index 8cbcb3aec..6bc5577b2 100644 --- a/kernel/src/arch/x86_64/rackscale/systemops/get_hardware_threads.rs +++ b/kernel/src/arch/x86_64/rackscale/systemops/get_hardware_threads.rs @@ -31,11 +31,13 @@ pub(crate) fn rpc_get_hardware_threads( let mut res_data = [0u8; core::mem::size_of::>() + 5 * 4096]; // Call GetHardwareThreads() RPC - CLIENT_STATE.rpc_client.lock().call( - KernelRpc::GetHardwareThreads as RPCType, - &[&[]], - &mut [&mut res_data], - )?; + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] + .lock() + .call( + KernelRpc::GetHardwareThreads as RPCType, + &[&[]], + &mut [&mut res_data], + )?; // Decode and return result if let Some((res, remaining)) = unsafe { decode::>(&mut res_data) } { diff --git a/kernel/src/transport/shmem.rs b/kernel/src/transport/shmem.rs index b518a4551..f2b5c6126 100644 --- a/kernel/src/transport/shmem.rs +++ b/kernel/src/transport/shmem.rs @@ -307,13 +307,20 @@ const SHMEM_QUEUE_SIZE: usize = 32; // The total size of two queues(sender and reciever) should be less than the transport size. #[cfg(feature = "rpc")] -const_assert!(2 * SHMEM_QUEUE_SIZE * QUEUE_ENTRY_SIZE <= SHMEM_TRANSPORT_SIZE as usize); +const_assert!(2 * SHMEM_QUEUE_SIZE * QUEUE_ENTRY_SIZE <= SINGLE_SHMEM_TRANSPORT_SIZE as usize); #[cfg(feature = "rpc")] -pub(crate) const SHMEM_TRANSPORT_SIZE: u64 = 2 * 1024 * 1024; +pub(crate) const SINGLE_SHMEM_TRANSPORT_SIZE: u64 = 2 * 1024 * 1024; +// TODO(rackscale, hack): max cores at 24 for now #[cfg(feature = "rpc")] -pub(crate) fn create_shmem_transport(mid: MachineId) -> KResult> { +pub(crate) const NUM_SHMEM_TRANSPORTS: u64 = 24; + +#[cfg(feature = "rpc")] +pub(crate) const SHMEM_TRANSPORT_SIZE: u64 = SINGLE_SHMEM_TRANSPORT_SIZE * NUM_SHMEM_TRANSPORTS; + +#[cfg(feature = "rpc")] +pub(crate) fn create_shmem_transport(mid: MachineId) -> KResult>> { use rpc::transport::shmem::allocator::ShmemAllocator; use rpc::transport::shmem::Queue; use rpc::transport::shmem::{Receiver, Sender}; @@ -326,57 +333,77 @@ pub(crate) fn create_shmem_transport(mid: MachineId) -> KResult= SHMEM_TRANSPORT_SIZE); - let allocator = ShmemAllocator::new(base_addr.as_u64(), SHMEM_TRANSPORT_SIZE); - match crate::CMDLINE.get().map_or(Mode::Native, |c| c.mode) { - Mode::Controller => { - let server_to_client_queue = - Arc::new(Queue::with_capacity_in(true, SHMEM_QUEUE_SIZE, &allocator).unwrap()); - let client_to_server_queue = - Arc::new(Queue::with_capacity_in(true, SHMEM_QUEUE_SIZE, &allocator).unwrap()); - let server_sender = Sender::with_shared_queue(server_to_client_queue.clone()); - let server_receiver = Receiver::with_shared_queue(client_to_server_queue.clone()); - log::info!( - "Controller: Created shared-memory transport for machine {}! size={:?}, base={:?}", + let mut transports = Vec::try_with_capacity(NUM_SHMEM_TRANSPORTS as usize)?; + + for transport_offset in 0..NUM_SHMEM_TRANSPORTS { + let allocator = ShmemAllocator::new( + base_addr.as_u64() + transport_offset * SINGLE_SHMEM_TRANSPORT_SIZE, + SINGLE_SHMEM_TRANSPORT_SIZE, + ); + match crate::CMDLINE.get().map_or(Mode::Native, |c| c.mode) { + Mode::Controller => { + let server_to_client_queue = + Arc::new(Queue::with_capacity_in(true, SHMEM_QUEUE_SIZE, &allocator).unwrap()); + let client_to_server_queue = + Arc::new(Queue::with_capacity_in(true, SHMEM_QUEUE_SIZE, &allocator).unwrap()); + let server_sender = Sender::with_shared_queue(server_to_client_queue.clone()); + let server_receiver = Receiver::with_shared_queue(client_to_server_queue.clone()); + log::info!( + "Controller: Created shared-memory transport for machine {}! size={:?}, base={:x}", mid, - SHMEM_TRANSPORT_SIZE, - base_addr - ); - Ok(ShmemTransport::new(server_receiver, server_sender)) - } - Mode::Client => { - let server_to_client_queue = - Arc::new(Queue::with_capacity_in(false, SHMEM_QUEUE_SIZE, &allocator).unwrap()); - let client_to_server_queue = - Arc::new(Queue::with_capacity_in(false, SHMEM_QUEUE_SIZE, &allocator).unwrap()); - let client_receiver = Receiver::with_shared_queue(server_to_client_queue.clone()); - let client_sender = Sender::with_shared_queue(client_to_server_queue.clone()); - log::info!( - "Client: Created shared-memory transport! size={:?}, base={:?}", - SHMEM_TRANSPORT_SIZE, - base_addr + SINGLE_SHMEM_TRANSPORT_SIZE, + base_addr.as_u64() + transport_offset * SINGLE_SHMEM_TRANSPORT_SIZE ); - Ok(ShmemTransport::new(client_receiver, client_sender)) - } - Mode::Native => { - log::error!("Native mode not supported for shmem"); - Err(KError::InvalidNativeMode) + transports.push(ShmemTransport::new(server_receiver, server_sender)); + } + Mode::Client => { + let server_to_client_queue = + Arc::new(Queue::with_capacity_in(false, SHMEM_QUEUE_SIZE, &allocator).unwrap()); + let client_to_server_queue = + Arc::new(Queue::with_capacity_in(false, SHMEM_QUEUE_SIZE, &allocator).unwrap()); + let client_receiver = Receiver::with_shared_queue(server_to_client_queue.clone()); + let client_sender = Sender::with_shared_queue(client_to_server_queue.clone()); + log::info!( + "Client: Created shared-memory transport! size={:?}, base={:x}", + SINGLE_SHMEM_TRANSPORT_SIZE, + base_addr.as_u64() + transport_offset * SINGLE_SHMEM_TRANSPORT_SIZE + ); + transports.push(ShmemTransport::new(client_receiver, client_sender)); + } + Mode::Native => { + log::error!("Native mode not supported for shmem"); + return Err(KError::InvalidNativeMode); + } } } + Ok(transports) } #[cfg(feature = "rpc")] pub(crate) fn init_shmem_rpc( send_client_data: bool, // This field is used to indicate if init_client() should send ClientRegistrationRequest -) -> KResult { +) -> KResult> { use crate::arch::rackscale::registration::initialize_client; use rpc::client::Client; // Set up the transport - let transport = Box::try_new(create_shmem_transport(*crate::environment::MACHINE_ID)?)?; - + let transports = create_shmem_transport(*crate::environment::MACHINE_ID)?; + let mut clients = Vec::try_with_capacity(transports.len())?; + let mut first = true; + for transport in transports.into_iter() { + let client = Client::new(Box::new(transport)); + + let client = if first { + first = false; + initialize_client(client, send_client_data).expect("Failed to initialize client") + } else { + //initialize_client(client, false).expect("Failed to initialize client") + client + }; + clients.push(client); + } // Create the client - let client = Client::new(transport); - initialize_client(client, send_client_data) + Ok(clients) } #[cfg(feature = "rackscale")] diff --git a/kernel/tests/s06_rackscale_tests.rs b/kernel/tests/s06_rackscale_tests.rs index 0954febdb..8274a6914 100644 --- a/kernel/tests/s06_rackscale_tests.rs +++ b/kernel/tests/s06_rackscale_tests.rs @@ -22,6 +22,7 @@ fn s06_rackscale_shmem_userspace_smoke_test() { rackscale_userspace_smoke_test(RackscaleTransport::Shmem); } +#[ignore] #[cfg(not(feature = "baremetal"))] #[test] fn s06_rackscale_ethernet_userspace_smoke_test() { @@ -132,6 +133,7 @@ fn s06_rackscale_shmem_fs_test() { rackscale_fs_test(RackscaleTransport::Shmem); } +#[ignore] #[cfg(not(feature = "baremetal"))] #[test] fn s06_rackscale_ethernet_fs_test() { @@ -237,6 +239,7 @@ fn s06_rackscale_shmem_userspace_multicore_test() { rackscale_userspace_multicore_test(RackscaleTransport::Shmem); } +#[ignore] #[cfg(not(feature = "baremetal"))] #[test] fn s06_rackscale_ethernet_userspace_multicore_test() { @@ -278,6 +281,7 @@ fn rackscale_userspace_multicore_test(transport: RackscaleTransport) { test_run.run_rackscale(); } +#[ignore] #[cfg(not(feature = "baremetal"))] #[test] fn s06_rackscale_ethernet_userspace_multicore_multiclient() { diff --git a/kernel/tests/s11_rackscale_benchmarks.rs b/kernel/tests/s11_rackscale_benchmarks.rs index b0f8221b7..c19ca9e81 100644 --- a/kernel/tests/s11_rackscale_benchmarks.rs +++ b/kernel/tests/s11_rackscale_benchmarks.rs @@ -26,6 +26,7 @@ fn s11_rackscale_shmem_fxmark_benchmark() { rackscale_fxmark_benchmark(RackscaleTransport::Shmem); } +#[ignore] #[test] #[cfg(not(feature = "baremetal"))] fn s11_rackscale_ethernet_fxmark_benchmark() { From a57bbe80d4b37149c0db57afb1ff0e044260fab0 Mon Sep 17 00:00:00 2001 From: zmckevitt Date: Tue, 21 Nov 2023 01:15:16 -0500 Subject: [PATCH 03/10] Added flag for initial connection to shmem client registration request --- kernel/src/arch/x86_64/rackscale/controller.rs | 11 +++++------ kernel/src/arch/x86_64/rackscale/registration.rs | 14 ++++++++++++-- kernel/src/transport/ethernet.rs | 2 +- kernel/src/transport/shmem.rs | 7 +++---- 4 files changed, 21 insertions(+), 13 deletions(-) diff --git a/kernel/src/arch/x86_64/rackscale/controller.rs b/kernel/src/arch/x86_64/rackscale/controller.rs index d14c9d837..da206b587 100644 --- a/kernel/src/arch/x86_64/rackscale/controller.rs +++ b/kernel/src/arch/x86_64/rackscale/controller.rs @@ -75,12 +75,11 @@ pub(crate) fn run() { // Wait for all clients to connect before fulfilling any RPCs. while !DCMServerReady.load(Ordering::SeqCst) {} - // TODO(rackscale, hack): only register core 0 - //for s_index in 0..servers.len() { - servers[0] - .add_client(&CLIENT_REGISTRAR) - .expect("Failed to accept client"); - //} + for s_index in 0..servers.len() { + servers[s_index] + .add_client(&CLIENT_REGISTRAR) + .expect("Failed to accept client"); + } ClientReadyCount.fetch_add(1, Ordering::SeqCst); diff --git a/kernel/src/arch/x86_64/rackscale/registration.rs b/kernel/src/arch/x86_64/rackscale/registration.rs index d2dbf390a..5262d7e2c 100644 --- a/kernel/src/arch/x86_64/rackscale/registration.rs +++ b/kernel/src/arch/x86_64/rackscale/registration.rs @@ -27,23 +27,26 @@ pub(crate) struct ClientRegistrationRequest { pub(crate) shmem_region_base: u64, pub(crate) shmem_region_size: usize, pub(crate) num_cores: u64, + pub(crate) is_first: bool, } unsafe_abomonate!( ClientRegistrationRequest: mid, shmem_region_base, shmem_region_size, - num_cores + num_cores, + is_first ); // Called by client to register client with the controller pub(crate) fn initialize_client( mut client: Client, send_client_data: bool, // This field is used to indicate if init_client() should send ClientRegistrationRequest + is_shmem: bool, ) -> KResult { // Don't modify this line without modifying testutils/rackscale_runner.rs log::warn!("CLIENT READY"); - if send_client_data { + if is_shmem { // Fetch system information let shmem_region = get_affinity_shmem(); let hwthreads = atopology::MACHINE_TOPOLOGY.threads(); @@ -63,12 +66,15 @@ pub(crate) fn initialize_client( assert!(client_threads.len() == num_threads); log::debug!("client_threads: {:?}", client_threads); + let is_first = if send_client_data { true } else { false }; + // Construct client registration request let req = ClientRegistrationRequest { mid: *crate::environment::MACHINE_ID, shmem_region_base: shmem_region.base.as_u64(), shmem_region_size: shmem_region.size, num_cores: num_threads as u64, + is_first: is_first, }; // Serialize and send the registration request to the controller @@ -103,6 +109,10 @@ pub(crate) fn register_client(hdr: &mut RPCHeader, payload: &mut [u8]) -> Result req.shmem_region_base + req.shmem_region_size as u64 ); + if !req.is_first { + return Ok(()); + } + // Parse out hw_threads let hw_threads = match unsafe { decode::>(hwthreads_data) } { Some((hw_threads, [])) => hw_threads, diff --git a/kernel/src/transport/ethernet.rs b/kernel/src/transport/ethernet.rs index 93658dc7d..15cdd0723 100644 --- a/kernel/src/transport/ethernet.rs +++ b/kernel/src/transport/ethernet.rs @@ -108,5 +108,5 @@ pub(crate) fn init_ethernet_rpc( .map_err(|err| KError::RackscaleRPCError { err })?, ); let mut client = Client::new(rpc_transport); - initialize_client(client, send_client_data) + initialize_client(client, send_client_data, false) } diff --git a/kernel/src/transport/shmem.rs b/kernel/src/transport/shmem.rs index f2b5c6126..5063a5947 100644 --- a/kernel/src/transport/shmem.rs +++ b/kernel/src/transport/shmem.rs @@ -381,7 +381,7 @@ pub(crate) fn create_shmem_transport(mid: MachineId) -> KResult KResult> { use crate::arch::rackscale::registration::initialize_client; use rpc::client::Client; @@ -395,10 +395,9 @@ pub(crate) fn init_shmem_rpc( let client = if first { first = false; - initialize_client(client, send_client_data).expect("Failed to initialize client") + initialize_client(client, true, true).expect("Failed to initialize client") } else { - //initialize_client(client, false).expect("Failed to initialize client") - client + initialize_client(client, false, true).expect("Failed to initialize client") }; clients.push(client); } From 01a710151ca05c83b04356ebfa0bb429aff4e5c0 Mon Sep 17 00:00:00 2001 From: Erika Hunhoff Date: Tue, 21 Nov 2023 16:11:05 +0000 Subject: [PATCH 04/10] Remove commented code --- .../src/arch/x86_64/rackscale/client_state.rs | 23 ------------------- .../src/arch/x86_64/rackscale/controller.rs | 23 ------------------- 2 files changed, 46 deletions(-) diff --git a/kernel/src/arch/x86_64/rackscale/client_state.rs b/kernel/src/arch/x86_64/rackscale/client_state.rs index 466fc2ca8..763c8312e 100644 --- a/kernel/src/arch/x86_64/rackscale/client_state.rs +++ b/kernel/src/arch/x86_64/rackscale/client_state.rs @@ -41,29 +41,6 @@ pub(crate) struct ClientState { impl ClientState { pub(crate) fn new() -> ClientState { - // Create network stack and instantiate RPC Client - // TODO(rackscale, hack): only allow shmem for now - /* - let rpc_clients = if crate::CMDLINE - .get() - .map_or(false, |c| c.transport == Transport::Ethernet) - { - Arc::new(Mutex::new( - crate::transport::ethernet::init_ethernet_rpc( - smoltcp::wire::IpAddress::v4(172, 31, 0, 11), - CONTROLLER_PORT_BASE + (*crate::environment::MACHINE_ID as u16 - 1), - true, - ) - .expect("Failed to initialize ethernet RPC"), - )) - } else { - // Default is Shmem, even if transport unspecified - Arc::new(Mutex::new( - crate::transport::shmem::init_shmem_rpc(true) - .expect("Failed to initialize shmem RPC"), - )) - }; - */ let clients = crate::transport::shmem::init_shmem_rpc(true).expect("Failed to initialize shmem RPC"); let mut rpc_clients = ArrayVec::new(); diff --git a/kernel/src/arch/x86_64/rackscale/controller.rs b/kernel/src/arch/x86_64/rackscale/controller.rs index da206b587..026188f24 100644 --- a/kernel/src/arch/x86_64/rackscale/controller.rs +++ b/kernel/src/arch/x86_64/rackscale/controller.rs @@ -31,29 +31,6 @@ static DCMServerReady: AtomicBool = AtomicBool::new(false); pub(crate) fn run() { let mid = *crate::environment::CORE_ID; - // Initialize one server per controller thread - // TODO(rackscale, hack): only support shmem for now - /* - let mut server = if crate::CMDLINE - .get() - .map_or(false, |c| c.transport == Transport::Ethernet) - { - let transport = Box::new( - TCPTransport::new( - None, - CONTROLLER_PORT_BASE + mid as u16 - 1, - Arc::clone(ÐERNET_IFACE), - ) - .expect("Failed to create TCP transport"), - ); - let mut server = Server::new(transport); - register_rpcs(&mut server); - server - } else if crate::CMDLINE - .get() - .map_or(false, |c| c.transport == Transport::Shmem) - { - */ let transports = create_shmem_transport(mid.try_into().unwrap()).expect("Failed to create shmem transport"); From d4c3a326ecb0227676a9441ad7db4e368ac83a0a Mon Sep 17 00:00:00 2001 From: Erika Hunhoff Date: Tue, 21 Nov 2023 16:11:20 +0000 Subject: [PATCH 05/10] Re-enable ethernet tests --- kernel/tests/s06_rackscale_tests.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/kernel/tests/s06_rackscale_tests.rs b/kernel/tests/s06_rackscale_tests.rs index 8274a6914..0954febdb 100644 --- a/kernel/tests/s06_rackscale_tests.rs +++ b/kernel/tests/s06_rackscale_tests.rs @@ -22,7 +22,6 @@ fn s06_rackscale_shmem_userspace_smoke_test() { rackscale_userspace_smoke_test(RackscaleTransport::Shmem); } -#[ignore] #[cfg(not(feature = "baremetal"))] #[test] fn s06_rackscale_ethernet_userspace_smoke_test() { @@ -133,7 +132,6 @@ fn s06_rackscale_shmem_fs_test() { rackscale_fs_test(RackscaleTransport::Shmem); } -#[ignore] #[cfg(not(feature = "baremetal"))] #[test] fn s06_rackscale_ethernet_fs_test() { @@ -239,7 +237,6 @@ fn s06_rackscale_shmem_userspace_multicore_test() { rackscale_userspace_multicore_test(RackscaleTransport::Shmem); } -#[ignore] #[cfg(not(feature = "baremetal"))] #[test] fn s06_rackscale_ethernet_userspace_multicore_test() { @@ -281,7 +278,6 @@ fn rackscale_userspace_multicore_test(transport: RackscaleTransport) { test_run.run_rackscale(); } -#[ignore] #[cfg(not(feature = "baremetal"))] #[test] fn s06_rackscale_ethernet_userspace_multicore_multiclient() { From 8f301ef180b3e7dcac8e40d0366ebdb7a38eba7e Mon Sep 17 00:00:00 2001 From: zmckevitt Date: Tue, 21 Nov 2023 16:26:05 -0500 Subject: [PATCH 06/10] remove unused argument from shmem initialization function --- kernel/src/arch/x86_64/rackscale/client_state.rs | 2 +- kernel/src/transport/shmem.rs | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/kernel/src/arch/x86_64/rackscale/client_state.rs b/kernel/src/arch/x86_64/rackscale/client_state.rs index 763c8312e..8229be060 100644 --- a/kernel/src/arch/x86_64/rackscale/client_state.rs +++ b/kernel/src/arch/x86_64/rackscale/client_state.rs @@ -42,7 +42,7 @@ pub(crate) struct ClientState { impl ClientState { pub(crate) fn new() -> ClientState { let clients = - crate::transport::shmem::init_shmem_rpc(true).expect("Failed to initialize shmem RPC"); + crate::transport::shmem::init_shmem_rpc().expect("Failed to initialize shmem RPC"); let mut rpc_clients = ArrayVec::new(); for client in clients.into_iter() { rpc_clients.push(Mutex::new(client)); diff --git a/kernel/src/transport/shmem.rs b/kernel/src/transport/shmem.rs index 5063a5947..b86351fd4 100644 --- a/kernel/src/transport/shmem.rs +++ b/kernel/src/transport/shmem.rs @@ -380,9 +380,7 @@ pub(crate) fn create_shmem_transport(mid: MachineId) -> KResult KResult> { +pub(crate) fn init_shmem_rpc() -> KResult> { use crate::arch::rackscale::registration::initialize_client; use rpc::client::Client; From 22da29b43bc66e3d35bdba269696388054b217fa Mon Sep 17 00:00:00 2001 From: zmckevitt Date: Wed, 22 Nov 2023 00:50:37 -0500 Subject: [PATCH 07/10] Preliminary code for multi-server rpc over ethernet --- .../src/arch/x86_64/rackscale/controller.rs | 65 +++++++++++++------ kernel/src/arch/x86_64/rackscale/dcm/mod.rs | 6 +- kernel/src/transport/ethernet.rs | 18 +++-- 3 files changed, 65 insertions(+), 24 deletions(-) diff --git a/kernel/src/arch/x86_64/rackscale/controller.rs b/kernel/src/arch/x86_64/rackscale/controller.rs index 026188f24..b26468f01 100644 --- a/kernel/src/arch/x86_64/rackscale/controller.rs +++ b/kernel/src/arch/x86_64/rackscale/controller.rs @@ -27,36 +27,63 @@ pub(crate) const CONTROLLER_PORT_BASE: u16 = 6970; static ClientReadyCount: AtomicU64 = AtomicU64::new(0); static DCMServerReady: AtomicBool = AtomicBool::new(false); +// Used for port allocation ranges for rpc servers +const MAX_CORES_PER_CLIENT: u16 = 24; + /// Controller main method pub(crate) fn run() { let mid = *crate::environment::CORE_ID; - let transports = - create_shmem_transport(mid.try_into().unwrap()).expect("Failed to create shmem transport"); - + // TODO: still dependent on NUM_SHMEM_TRANSPORTS, ensure it works for eth too let mut servers: ArrayVec, { NUM_SHMEM_TRANSPORTS as usize }> = ArrayVec::new(); - for transport in transports.into_iter() { - let mut server = Server::new(Box::new(transport)); + + if crate::CMDLINE + .get() + .map_or(false, |c| c.transport == Transport::Ethernet) + { + let transport = Box::new( + TCPTransport::new( + None, + CONTROLLER_PORT_BASE + (mid as u16 - 1) * MAX_CORES_PER_CLIENT, + Arc::clone(ÐERNET_IFACE), + ) + .expect("Failed to create TCP transport"), + ); + let mut server = Server::new(transport); register_rpcs(&mut server); servers.push(server); - } + ClientReadyCount.fetch_add(1, Ordering::SeqCst); + while !DCMServerReady.load(Ordering::SeqCst) {} + servers[0] + .add_client(&CLIENT_REGISTRAR) + .expect("Failed to accept client"); + } else if crate::CMDLINE + .get() + .map_or(false, |c| c.transport == Transport::Shmem) + { + let transports = create_shmem_transport(mid.try_into().unwrap()) + .expect("Failed to create shmem transport"); + + // let mut servers: ArrayVec, { NUM_SHMEM_TRANSPORTS as usize }> = ArrayVec::new(); + for transport in transports.into_iter() { + let mut server = Server::new(Box::new(transport)); + register_rpcs(&mut server); + servers.push(server); + } + + ClientReadyCount.fetch_add(1, Ordering::SeqCst); - /* + // Wait for all clients to connect before fulfilling any RPCs. + while !DCMServerReady.load(Ordering::SeqCst) {} + + for s_index in 0..servers.len() { + servers[s_index] + .add_client(&CLIENT_REGISTRAR) + .expect("Failed to accept client"); + } } else { unreachable!("No supported transport layer specified in kernel argument"); }; - */ - - ClientReadyCount.fetch_add(1, Ordering::SeqCst); - - // Wait for all clients to connect before fulfilling any RPCs. - while !DCMServerReady.load(Ordering::SeqCst) {} - - for s_index in 0..servers.len() { - servers[s_index] - .add_client(&CLIENT_REGISTRAR) - .expect("Failed to accept client"); - } ClientReadyCount.fetch_add(1, Ordering::SeqCst); diff --git a/kernel/src/arch/x86_64/rackscale/dcm/mod.rs b/kernel/src/arch/x86_64/rackscale/dcm/mod.rs index f8c004c89..daba36978 100644 --- a/kernel/src/arch/x86_64/rackscale/dcm/mod.rs +++ b/kernel/src/arch/x86_64/rackscale/dcm/mod.rs @@ -121,6 +121,10 @@ unsafe_abomonate!(DCMOps); lazy_static! { pub(crate) static ref DCM_CLIENT: Arc> = Arc::new(Mutex::new( - init_ethernet_rpc(IpAddress::v4(172, 31, 0, 20), DCM_CLIENT_PORT, false).unwrap(), + init_ethernet_rpc(IpAddress::v4(172, 31, 0, 20), DCM_CLIENT_PORT, 1) + .unwrap() + .into_iter() + .nth(0) + .unwrap(), )); } diff --git a/kernel/src/transport/ethernet.rs b/kernel/src/transport/ethernet.rs index 15cdd0723..651788e6c 100644 --- a/kernel/src/transport/ethernet.rs +++ b/kernel/src/transport/ethernet.rs @@ -95,18 +95,28 @@ pub(crate) fn init_network<'a>() -> KResult #[allow(unused)] pub(crate) fn init_ethernet_rpc( server_ip: smoltcp::wire::IpAddress, - server_port: u16, - send_client_data: bool, // This field is used to indicate if init_client() should send ClientRegistrationRequest -) -> KResult { + server_port_base: u16, + num_cores: u16, +) -> KResult> { use crate::arch::rackscale::registration::initialize_client; use alloc::boxed::Box; use rpc::client::Client; use rpc::transport::TCPTransport; + let mut clients: Vec = Vec::new(); + + let offset = 0; + let server_port = server_port_base + offset; + let rpc_transport = Box::new( TCPTransport::new(Some(server_ip), server_port, Arc::clone(ÐERNET_IFACE)) .map_err(|err| KError::RackscaleRPCError { err })?, ); let mut client = Client::new(rpc_transport); - initialize_client(client, send_client_data, false) + + // DCM sets send_client_data (second param) false + client = initialize_client(client, false, false).expect("Failed to initialize client"); + clients.push(client); + + Ok(clients) } From 13012e754c0ce894c08cd0c3e8f8391c92bfdeba Mon Sep 17 00:00:00 2001 From: zmckevitt Date: Wed, 22 Nov 2023 20:22:37 -0700 Subject: [PATCH 08/10] Updated ethernet RPC initialization on client and controller, synchronization still required --- .../src/arch/x86_64/rackscale/client_state.rs | 22 ++++++++-- .../src/arch/x86_64/rackscale/controller.rs | 43 +++++++++++++++++- kernel/src/arch/x86_64/rackscale/dcm/mod.rs | 2 +- .../src/arch/x86_64/rackscale/registration.rs | 12 ++++- kernel/src/transport/ethernet.rs | 44 ++++++++++++++----- kernel/src/transport/shmem.rs | 4 +- 6 files changed, 105 insertions(+), 22 deletions(-) diff --git a/kernel/src/arch/x86_64/rackscale/client_state.rs b/kernel/src/arch/x86_64/rackscale/client_state.rs index 8229be060..a4cef78fc 100644 --- a/kernel/src/arch/x86_64/rackscale/client_state.rs +++ b/kernel/src/arch/x86_64/rackscale/client_state.rs @@ -16,7 +16,7 @@ use rpc::client::Client; use rpc::rpc::RPCError; use crate::arch::kcb::try_per_core_mem; -use crate::arch::rackscale::controller::CONTROLLER_PORT_BASE; +use crate::arch::rackscale::controller::{CONTROLLER_PORT_BASE, MAX_CORES_PER_CLIENT}; use crate::arch::rackscale::fileops::rw::{RW_SHMEM_BUF, RW_SHMEM_BUF_LEN}; use crate::arch::rackscale::FrameCacheBase; use crate::arch::MAX_MACHINES; @@ -41,8 +41,24 @@ pub(crate) struct ClientState { impl ClientState { pub(crate) fn new() -> ClientState { - let clients = - crate::transport::shmem::init_shmem_rpc().expect("Failed to initialize shmem RPC"); + let clients = if crate::CMDLINE + .get() + .map_or(false, |c| c.transport == Transport::Ethernet) + { + let num_cores: u64 = atopology::MACHINE_TOPOLOGY.num_threads() as u64; + + crate::transport::ethernet::init_ethernet_rpc( + smoltcp::wire::IpAddress::v4(172, 31, 0, 11), + CONTROLLER_PORT_BASE + + (*crate::environment::MACHINE_ID as u16 - 1) * MAX_CORES_PER_CLIENT, + num_cores, + false, + ) + .expect("Failed to initialize ethernet RPC") + } else { + crate::transport::shmem::init_shmem_rpc().expect("Failed to initialize shmem RPC") + }; + let mut rpc_clients = ArrayVec::new(); for client in clients.into_iter() { rpc_clients.push(Mutex::new(client)); diff --git a/kernel/src/arch/x86_64/rackscale/controller.rs b/kernel/src/arch/x86_64/rackscale/controller.rs index b26468f01..95ea43ace 100644 --- a/kernel/src/arch/x86_64/rackscale/controller.rs +++ b/kernel/src/arch/x86_64/rackscale/controller.rs @@ -20,6 +20,8 @@ use crate::cmdline::Transport; use crate::transport::ethernet::ETHERNET_IFACE; use crate::transport::shmem::{create_shmem_transport, NUM_SHMEM_TRANSPORTS}; +use crate::arch::rackscale::registration::rpc_servers_to_register; + use super::*; pub(crate) const CONTROLLER_PORT_BASE: u16 = 6970; @@ -28,10 +30,12 @@ static ClientReadyCount: AtomicU64 = AtomicU64::new(0); static DCMServerReady: AtomicBool = AtomicBool::new(false); // Used for port allocation ranges for rpc servers -const MAX_CORES_PER_CLIENT: u16 = 24; +pub const MAX_CORES_PER_CLIENT: u16 = 24; /// Controller main method pub(crate) fn run() { + use core::hint::spin_loop; + use core::time::Duration; let mid = *crate::environment::CORE_ID; // TODO: still dependent on NUM_SHMEM_TRANSPORTS, ensure it works for eth too @@ -53,10 +57,45 @@ pub(crate) fn run() { register_rpcs(&mut server); servers.push(server); ClientReadyCount.fetch_add(1, Ordering::SeqCst); - while !DCMServerReady.load(Ordering::SeqCst) {} + // while !DCMServerReady.load(Ordering::SeqCst) {} servers[0] .add_client(&CLIENT_REGISTRAR) .expect("Failed to accept client"); + + // wait until controller learns about client topology + while (*rpc_servers_to_register.lock() == 0) { + let start = rawtime::Instant::now(); + while start.elapsed() < Duration::from_secs(1) { + spin_loop(); + } + } + + for i in 0..*rpc_servers_to_register.lock() { + let transport = Box::new( + TCPTransport::new( + None, + CONTROLLER_PORT_BASE + ((mid as u16 - 1) * MAX_CORES_PER_CLIENT) + i as u16 + 1, + Arc::clone(ÐERNET_IFACE), + ) + .expect("Failed to create TCP transport"), + ); + let mut server = Server::new(transport); + register_rpcs(&mut server); + servers.push(server); + *rpc_servers_to_register.lock() -= 1; + } + + ClientReadyCount.fetch_add(1, Ordering::SeqCst); + + // Wait for all clients to connect before fulfilling any RPCs. + while !DCMServerReady.load(Ordering::SeqCst) {} + + // already registered the first server + for s_index in 1..servers.len() { + servers[s_index] + .add_client(&CLIENT_REGISTRAR) + .expect("Failed to accept client"); + } } else if crate::CMDLINE .get() .map_or(false, |c| c.transport == Transport::Shmem) diff --git a/kernel/src/arch/x86_64/rackscale/dcm/mod.rs b/kernel/src/arch/x86_64/rackscale/dcm/mod.rs index daba36978..b6d9d8356 100644 --- a/kernel/src/arch/x86_64/rackscale/dcm/mod.rs +++ b/kernel/src/arch/x86_64/rackscale/dcm/mod.rs @@ -121,7 +121,7 @@ unsafe_abomonate!(DCMOps); lazy_static! { pub(crate) static ref DCM_CLIENT: Arc> = Arc::new(Mutex::new( - init_ethernet_rpc(IpAddress::v4(172, 31, 0, 20), DCM_CLIENT_PORT, 1) + init_ethernet_rpc(IpAddress::v4(172, 31, 0, 20), DCM_CLIENT_PORT, 1, true) .unwrap() .into_iter() .nth(0) diff --git a/kernel/src/arch/x86_64/rackscale/registration.rs b/kernel/src/arch/x86_64/rackscale/registration.rs index 5262d7e2c..26bdb195e 100644 --- a/kernel/src/arch/x86_64/rackscale/registration.rs +++ b/kernel/src/arch/x86_64/rackscale/registration.rs @@ -7,6 +7,8 @@ use abomonation::{decode, encode, unsafe_abomonate, Abomonation}; use core2::io::Result as IOResult; use core2::io::Write; use fallible_collections::{FallibleVec, FallibleVecGlobal}; +use lazy_static::lazy_static; +use spin::Mutex; use kpi::system::{CpuThread, MachineId}; use rpc::client::Client; @@ -21,6 +23,10 @@ use crate::memory::shmem_affinity::mid_to_shmem_affinity; use crate::memory::{Frame, PAddr, LARGE_PAGE_SIZE}; use crate::transport::shmem::{get_affinity_shmem, get_affinity_shmem_by_mid}; +lazy_static! { + pub static ref rpc_servers_to_register: Mutex = Mutex::new(0); +} + #[derive(Debug, Default)] pub(crate) struct ClientRegistrationRequest { pub(crate) mid: MachineId, @@ -41,12 +47,12 @@ unsafe_abomonate!( pub(crate) fn initialize_client( mut client: Client, send_client_data: bool, // This field is used to indicate if init_client() should send ClientRegistrationRequest - is_shmem: bool, + is_dcm: bool, ) -> KResult { // Don't modify this line without modifying testutils/rackscale_runner.rs log::warn!("CLIENT READY"); - if is_shmem { + if !is_dcm { // Fetch system information let shmem_region = get_affinity_shmem(); let hwthreads = atopology::MACHINE_TOPOLOGY.threads(); @@ -113,6 +119,8 @@ pub(crate) fn register_client(hdr: &mut RPCHeader, payload: &mut [u8]) -> Result return Ok(()); } + *rpc_servers_to_register.lock() = req.num_cores - 1; + // Parse out hw_threads let hw_threads = match unsafe { decode::>(hwthreads_data) } { Some((hw_threads, [])) => hw_threads, diff --git a/kernel/src/transport/ethernet.rs b/kernel/src/transport/ethernet.rs index 651788e6c..3b9b3cd03 100644 --- a/kernel/src/transport/ethernet.rs +++ b/kernel/src/transport/ethernet.rs @@ -96,27 +96,47 @@ pub(crate) fn init_network<'a>() -> KResult pub(crate) fn init_ethernet_rpc( server_ip: smoltcp::wire::IpAddress, server_port_base: u16, - num_cores: u16, + num_cores: u64, + is_dcm: bool, ) -> KResult> { use crate::arch::rackscale::registration::initialize_client; use alloc::boxed::Box; + use core::hint::spin_loop; + use core::time::Duration; use rpc::client::Client; use rpc::transport::TCPTransport; let mut clients: Vec = Vec::new(); - let offset = 0; - let server_port = server_port_base + offset; - - let rpc_transport = Box::new( - TCPTransport::new(Some(server_ip), server_port, Arc::clone(ÐERNET_IFACE)) - .map_err(|err| KError::RackscaleRPCError { err })?, - ); - let mut client = Client::new(rpc_transport); + for i in 0..num_cores { + let offset = i; + let server_port = server_port_base + offset as u16; + + let rpc_transport = Box::new( + TCPTransport::new(Some(server_ip), server_port, Arc::clone(ÐERNET_IFACE)) + .map_err(|err| KError::RackscaleRPCError { err })?, + ); + let mut client = Client::new(rpc_transport); + + // only send client data on first client registration request + let send_client_data = if i == 0 { true } else { false }; + + client = initialize_client(client, send_client_data, is_dcm) + .expect("Failed to initialize client"); + + // After sending client initialization, allow time for controller to + // register additional rpc servers before sending more requests + if send_client_data && !is_dcm { + unsafe { + let start = rawtime::Instant::now(); + while start.elapsed() < Duration::from_secs(5) { + spin_loop(); + } + } + } - // DCM sets send_client_data (second param) false - client = initialize_client(client, false, false).expect("Failed to initialize client"); - clients.push(client); + clients.push(client); + } Ok(clients) } diff --git a/kernel/src/transport/shmem.rs b/kernel/src/transport/shmem.rs index b86351fd4..9468d3cc8 100644 --- a/kernel/src/transport/shmem.rs +++ b/kernel/src/transport/shmem.rs @@ -393,9 +393,9 @@ pub(crate) fn init_shmem_rpc() -> KResult> { let client = if first { first = false; - initialize_client(client, true, true).expect("Failed to initialize client") + initialize_client(client, true, false).expect("Failed to initialize client") } else { - initialize_client(client, false, true).expect("Failed to initialize client") + initialize_client(client, false, false).expect("Failed to initialize client") }; clients.push(client); } From 4acf55763300bda64b469045d058f1ad93ad3a10 Mon Sep 17 00:00:00 2001 From: zmckevitt Date: Tue, 28 Nov 2023 03:47:00 -0500 Subject: [PATCH 09/10] Fixed ethernet rpc connection issues on single core client --- .../src/arch/x86_64/rackscale/client_state.rs | 12 ++++- .../src/arch/x86_64/rackscale/controller.rs | 49 +++++++++++-------- .../src/arch/x86_64/rackscale/registration.rs | 2 +- kernel/src/transport/ethernet.rs | 7 ++- 4 files changed, 43 insertions(+), 27 deletions(-) diff --git a/kernel/src/arch/x86_64/rackscale/client_state.rs b/kernel/src/arch/x86_64/rackscale/client_state.rs index a4cef78fc..f76167712 100644 --- a/kernel/src/arch/x86_64/rackscale/client_state.rs +++ b/kernel/src/arch/x86_64/rackscale/client_state.rs @@ -46,11 +46,19 @@ impl ClientState { .map_or(false, |c| c.transport == Transport::Ethernet) { let num_cores: u64 = atopology::MACHINE_TOPOLOGY.num_threads() as u64; + let mid = *crate::environment::MACHINE_ID; + let port_base = CONTROLLER_PORT_BASE + ((mid as u16 - 1) * MAX_CORES_PER_CLIENT); + + log::debug!( + "Sending ethernet initialization for client with {:?} cores, mid {:?}, and port {:?}", + num_cores, + mid, + port_base, + ); crate::transport::ethernet::init_ethernet_rpc( smoltcp::wire::IpAddress::v4(172, 31, 0, 11), - CONTROLLER_PORT_BASE - + (*crate::environment::MACHINE_ID as u16 - 1) * MAX_CORES_PER_CLIENT, + port_base, num_cores, false, ) diff --git a/kernel/src/arch/x86_64/rackscale/controller.rs b/kernel/src/arch/x86_64/rackscale/controller.rs index 95ea43ace..e3201b1af 100644 --- a/kernel/src/arch/x86_64/rackscale/controller.rs +++ b/kernel/src/arch/x86_64/rackscale/controller.rs @@ -45,36 +45,45 @@ pub(crate) fn run() { .get() .map_or(false, |c| c.transport == Transport::Ethernet) { + let port_base = CONTROLLER_PORT_BASE + (mid as u16 - 1) * MAX_CORES_PER_CLIENT; + + log::debug!( + "Initializing transport with mid {:?} on port {:?}", + mid, + port_base + ); let transport = Box::new( - TCPTransport::new( - None, - CONTROLLER_PORT_BASE + (mid as u16 - 1) * MAX_CORES_PER_CLIENT, - Arc::clone(ÐERNET_IFACE), - ) - .expect("Failed to create TCP transport"), + TCPTransport::new(None, port_base, Arc::clone(ÐERNET_IFACE)) + .expect("Failed to create TCP transport"), ); let mut server = Server::new(transport); register_rpcs(&mut server); servers.push(server); + ClientReadyCount.fetch_add(1, Ordering::SeqCst); - // while !DCMServerReady.load(Ordering::SeqCst) {} + while !DCMServerReady.load(Ordering::SeqCst) {} + servers[0] .add_client(&CLIENT_REGISTRAR) .expect("Failed to accept client"); + log::debug!("Initial RPC server initialized. Learning client topology..."); + // wait until controller learns about client topology - while (*rpc_servers_to_register.lock() == 0) { - let start = rawtime::Instant::now(); - while start.elapsed() < Duration::from_secs(1) { - spin_loop(); - } - } + while (*rpc_servers_to_register.lock() == 0) {} + + log::debug!( + "Received client topology, registering subsequent {:?} cores", + *rpc_servers_to_register.lock() + ); - for i in 0..*rpc_servers_to_register.lock() { + // register n-1 servers as we already handled the initial request + // will do nothing if no more servers to register + for i in 0..*rpc_servers_to_register.lock() - 1 { let transport = Box::new( TCPTransport::new( None, - CONTROLLER_PORT_BASE + ((mid as u16 - 1) * MAX_CORES_PER_CLIENT) + i as u16 + 1, + port_base + (i as u16 + 1), Arc::clone(ÐERNET_IFACE), ) .expect("Failed to create TCP transport"), @@ -82,20 +91,20 @@ pub(crate) fn run() { let mut server = Server::new(transport); register_rpcs(&mut server); servers.push(server); - *rpc_servers_to_register.lock() -= 1; } + *rpc_servers_to_register.lock() = 0; - ClientReadyCount.fetch_add(1, Ordering::SeqCst); - - // Wait for all clients to connect before fulfilling any RPCs. - while !DCMServerReady.load(Ordering::SeqCst) {} + log::debug!("Transports added. Adding clients..."); // already registered the first server + // will do nothing if no more servers to register for s_index in 1..servers.len() { servers[s_index] .add_client(&CLIENT_REGISTRAR) .expect("Failed to accept client"); } + + log::debug!("Finished registering RPC servers for client"); } else if crate::CMDLINE .get() .map_or(false, |c| c.transport == Transport::Shmem) diff --git a/kernel/src/arch/x86_64/rackscale/registration.rs b/kernel/src/arch/x86_64/rackscale/registration.rs index 26bdb195e..53629cd3c 100644 --- a/kernel/src/arch/x86_64/rackscale/registration.rs +++ b/kernel/src/arch/x86_64/rackscale/registration.rs @@ -119,7 +119,7 @@ pub(crate) fn register_client(hdr: &mut RPCHeader, payload: &mut [u8]) -> Result return Ok(()); } - *rpc_servers_to_register.lock() = req.num_cores - 1; + *rpc_servers_to_register.lock() = req.num_cores; // Parse out hw_threads let hw_threads = match unsafe { decode::>(hwthreads_data) } { diff --git a/kernel/src/transport/ethernet.rs b/kernel/src/transport/ethernet.rs index 3b9b3cd03..d6ef51f98 100644 --- a/kernel/src/transport/ethernet.rs +++ b/kernel/src/transport/ethernet.rs @@ -108,9 +108,8 @@ pub(crate) fn init_ethernet_rpc( let mut clients: Vec = Vec::new(); - for i in 0..num_cores { - let offset = i; - let server_port = server_port_base + offset as u16; + for core in 0..num_cores { + let server_port = server_port_base + core as u16; let rpc_transport = Box::new( TCPTransport::new(Some(server_ip), server_port, Arc::clone(ÐERNET_IFACE)) @@ -119,7 +118,7 @@ pub(crate) fn init_ethernet_rpc( let mut client = Client::new(rpc_transport); // only send client data on first client registration request - let send_client_data = if i == 0 { true } else { false }; + let send_client_data = if core == 0 { true } else { false }; client = initialize_client(client, send_client_data, is_dcm) .expect("Failed to initialize client"); From 3a4841549e6df4669e8a9b04fb4d40167cf06333 Mon Sep 17 00:00:00 2001 From: zmckevitt Date: Fri, 1 Dec 2023 17:31:36 -0500 Subject: [PATCH 10/10] Client initialization only waits when there is more than one core to register --- kernel/src/arch/x86_64/rackscale/client_state.rs | 2 ++ kernel/src/arch/x86_64/rackscale/controller.rs | 1 - kernel/src/arch/x86_64/rackscale/registration.rs | 2 +- kernel/src/transport/ethernet.rs | 2 +- 4 files changed, 4 insertions(+), 3 deletions(-) diff --git a/kernel/src/arch/x86_64/rackscale/client_state.rs b/kernel/src/arch/x86_64/rackscale/client_state.rs index f76167712..b405c7141 100644 --- a/kernel/src/arch/x86_64/rackscale/client_state.rs +++ b/kernel/src/arch/x86_64/rackscale/client_state.rs @@ -67,6 +67,8 @@ impl ClientState { crate::transport::shmem::init_shmem_rpc().expect("Failed to initialize shmem RPC") }; + log::warn!("CLIENT READY"); + let mut rpc_clients = ArrayVec::new(); for client in clients.into_iter() { rpc_clients.push(Mutex::new(client)); diff --git a/kernel/src/arch/x86_64/rackscale/controller.rs b/kernel/src/arch/x86_64/rackscale/controller.rs index e3201b1af..f87915a81 100644 --- a/kernel/src/arch/x86_64/rackscale/controller.rs +++ b/kernel/src/arch/x86_64/rackscale/controller.rs @@ -92,7 +92,6 @@ pub(crate) fn run() { register_rpcs(&mut server); servers.push(server); } - *rpc_servers_to_register.lock() = 0; log::debug!("Transports added. Adding clients..."); diff --git a/kernel/src/arch/x86_64/rackscale/registration.rs b/kernel/src/arch/x86_64/rackscale/registration.rs index 53629cd3c..4e92ff88a 100644 --- a/kernel/src/arch/x86_64/rackscale/registration.rs +++ b/kernel/src/arch/x86_64/rackscale/registration.rs @@ -50,7 +50,7 @@ pub(crate) fn initialize_client( is_dcm: bool, ) -> KResult { // Don't modify this line without modifying testutils/rackscale_runner.rs - log::warn!("CLIENT READY"); + // log::warn!("CLIENT READY"); if !is_dcm { // Fetch system information diff --git a/kernel/src/transport/ethernet.rs b/kernel/src/transport/ethernet.rs index d6ef51f98..22b378991 100644 --- a/kernel/src/transport/ethernet.rs +++ b/kernel/src/transport/ethernet.rs @@ -125,7 +125,7 @@ pub(crate) fn init_ethernet_rpc( // After sending client initialization, allow time for controller to // register additional rpc servers before sending more requests - if send_client_data && !is_dcm { + if send_client_data && !is_dcm && num_cores > 1 { unsafe { let start = rawtime::Instant::now(); while start.elapsed() < Duration::from_secs(5) {