diff --git a/kernel/run.py b/kernel/run.py index 7f9dae27b..5ac802152 100644 --- a/kernel/run.py +++ b/kernel/run.py @@ -745,7 +745,9 @@ def configure_network(args): for _, ncfg in zip(range(0, args.workers), NETWORK_CONFIG): sudo[tunctl[['-t', ncfg, '-u', user, '-g', group]]]() sudo[ip[['link', 'set', ncfg, 'up']]](retcode=(0, 1)) + sudo[ip[['link', 'set', ncfg, 'txqueuelen', 65536]]]() sudo[brctl[['addif', 'br0', ncfg]]]() + sudo[ip[['link', 'set', 'br0', 'txqueuelen', 65536]]]() sudo[ip[['link', 'set', 'br0', 'up']]](retcode=(0, 1)) diff --git a/kernel/src/arch/x86_64/rackscale/client_state.rs b/kernel/src/arch/x86_64/rackscale/client_state.rs index daf91843a..b405c7141 100644 --- a/kernel/src/arch/x86_64/rackscale/client_state.rs +++ b/kernel/src/arch/x86_64/rackscale/client_state.rs @@ -16,7 +16,7 @@ use rpc::client::Client; use rpc::rpc::RPCError; use crate::arch::kcb::try_per_core_mem; -use crate::arch::rackscale::controller::CONTROLLER_PORT_BASE; +use crate::arch::rackscale::controller::{CONTROLLER_PORT_BASE, MAX_CORES_PER_CLIENT}; use crate::arch::rackscale::fileops::rw::{RW_SHMEM_BUF, RW_SHMEM_BUF_LEN}; use crate::arch::rackscale::FrameCacheBase; use crate::arch::MAX_MACHINES; @@ -25,11 +25,12 @@ use crate::error::{KError, KResult}; use crate::memory::backends::MemManager; use crate::memory::shmem_affinity::{local_shmem_affinity, mid_to_shmem_affinity}; use crate::process::MAX_PROCESSES; +use crate::transport::shmem::NUM_SHMEM_TRANSPORTS; /// This is the state the client records about itself pub(crate) struct ClientState { /// The RPC client used to communicate with the controller - pub(crate) rpc_client: Arc>, + pub(crate) rpc_clients: Arc, { NUM_SHMEM_TRANSPORTS as usize }>>, /// Used to store shmem affinity base pages pub(crate) affinity_base_pages: Arc>, MAX_MACHINES>>, @@ -40,27 +41,39 @@ pub(crate) struct ClientState { impl ClientState { pub(crate) fn new() -> ClientState { - // Create network stack and instantiate RPC Client - let rpc_client = if crate::CMDLINE + let clients = if crate::CMDLINE .get() .map_or(false, |c| c.transport == Transport::Ethernet) { - Arc::new(Mutex::new( - crate::transport::ethernet::init_ethernet_rpc( - smoltcp::wire::IpAddress::v4(172, 31, 0, 11), - CONTROLLER_PORT_BASE + (*crate::environment::MACHINE_ID as u16 - 1), - true, - ) - .expect("Failed to initialize ethernet RPC"), - )) + let num_cores: u64 = atopology::MACHINE_TOPOLOGY.num_threads() as u64; + let mid = *crate::environment::MACHINE_ID; + let port_base = CONTROLLER_PORT_BASE + ((mid as u16 - 1) * MAX_CORES_PER_CLIENT); + + log::debug!( + "Sending ethernet initialization for client with {:?} cores, mid {:?}, and port {:?}", + num_cores, + mid, + port_base, + ); + + crate::transport::ethernet::init_ethernet_rpc( + smoltcp::wire::IpAddress::v4(172, 31, 0, 11), + port_base, + num_cores, + false, + ) + .expect("Failed to initialize ethernet RPC") } else { - // Default is Shmem, even if transport unspecified - Arc::new(Mutex::new( - crate::transport::shmem::init_shmem_rpc(true) - .expect("Failed to initialize shmem RPC"), - )) + crate::transport::shmem::init_shmem_rpc().expect("Failed to initialize shmem RPC") }; + log::warn!("CLIENT READY"); + + let mut rpc_clients = ArrayVec::new(); + for client in clients.into_iter() { + rpc_clients.push(Mutex::new(client)); + } + let mut per_process_base_pages = ArrayVec::new(); for _i in 0..MAX_PROCESSES { // TODO(rackscale): this is a bogus affinity because it should really be "ANY_SHMEM" @@ -76,7 +89,7 @@ impl ClientState { log::debug!("Finished initializing client state"); ClientState { - rpc_client, + rpc_clients: Arc::new(rpc_clients), affinity_base_pages: Arc::new(affinity_base_pages), per_process_base_pages: Arc::new(per_process_base_pages), } diff --git a/kernel/src/arch/x86_64/rackscale/controller.rs b/kernel/src/arch/x86_64/rackscale/controller.rs index 23339f0ef..f87915a81 100644 --- a/kernel/src/arch/x86_64/rackscale/controller.rs +++ b/kernel/src/arch/x86_64/rackscale/controller.rs @@ -18,7 +18,9 @@ use crate::arch::rackscale::dcm::{ use crate::arch::MAX_MACHINES; use crate::cmdline::Transport; use crate::transport::ethernet::ETHERNET_IFACE; -use crate::transport::shmem::create_shmem_transport; +use crate::transport::shmem::{create_shmem_transport, NUM_SHMEM_TRANSPORTS}; + +use crate::arch::rackscale::registration::rpc_servers_to_register; use super::*; @@ -27,50 +29,108 @@ pub(crate) const CONTROLLER_PORT_BASE: u16 = 6970; static ClientReadyCount: AtomicU64 = AtomicU64::new(0); static DCMServerReady: AtomicBool = AtomicBool::new(false); +// Used for port allocation ranges for rpc servers +pub const MAX_CORES_PER_CLIENT: u16 = 24; + /// Controller main method pub(crate) fn run() { + use core::hint::spin_loop; + use core::time::Duration; let mid = *crate::environment::CORE_ID; - // Initialize one server per controller thread - let mut server = if crate::CMDLINE + // TODO: still dependent on NUM_SHMEM_TRANSPORTS, ensure it works for eth too + let mut servers: ArrayVec, { NUM_SHMEM_TRANSPORTS as usize }> = ArrayVec::new(); + + if crate::CMDLINE .get() .map_or(false, |c| c.transport == Transport::Ethernet) { + let port_base = CONTROLLER_PORT_BASE + (mid as u16 - 1) * MAX_CORES_PER_CLIENT; + + log::debug!( + "Initializing transport with mid {:?} on port {:?}", + mid, + port_base + ); let transport = Box::new( - TCPTransport::new( - None, - CONTROLLER_PORT_BASE + mid as u16 - 1, - Arc::clone(ÐERNET_IFACE), - ) - .expect("Failed to create TCP transport"), + TCPTransport::new(None, port_base, Arc::clone(ÐERNET_IFACE)) + .expect("Failed to create TCP transport"), ); let mut server = Server::new(transport); register_rpcs(&mut server); - server + servers.push(server); + + ClientReadyCount.fetch_add(1, Ordering::SeqCst); + while !DCMServerReady.load(Ordering::SeqCst) {} + + servers[0] + .add_client(&CLIENT_REGISTRAR) + .expect("Failed to accept client"); + + log::debug!("Initial RPC server initialized. Learning client topology..."); + + // wait until controller learns about client topology + while (*rpc_servers_to_register.lock() == 0) {} + + log::debug!( + "Received client topology, registering subsequent {:?} cores", + *rpc_servers_to_register.lock() + ); + + // register n-1 servers as we already handled the initial request + // will do nothing if no more servers to register + for i in 0..*rpc_servers_to_register.lock() - 1 { + let transport = Box::new( + TCPTransport::new( + None, + port_base + (i as u16 + 1), + Arc::clone(ÐERNET_IFACE), + ) + .expect("Failed to create TCP transport"), + ); + let mut server = Server::new(transport); + register_rpcs(&mut server); + servers.push(server); + } + + log::debug!("Transports added. Adding clients..."); + + // already registered the first server + // will do nothing if no more servers to register + for s_index in 1..servers.len() { + servers[s_index] + .add_client(&CLIENT_REGISTRAR) + .expect("Failed to accept client"); + } + + log::debug!("Finished registering RPC servers for client"); } else if crate::CMDLINE .get() .map_or(false, |c| c.transport == Transport::Shmem) { - let transport = Box::new( - create_shmem_transport(mid.try_into().unwrap()) - .expect("Failed to create shmem transport"), - ); + let transports = create_shmem_transport(mid.try_into().unwrap()) + .expect("Failed to create shmem transport"); - let mut server = Server::new(transport); - register_rpcs(&mut server); - server - } else { - unreachable!("No supported transport layer specified in kernel argument"); - }; + // let mut servers: ArrayVec, { NUM_SHMEM_TRANSPORTS as usize }> = ArrayVec::new(); + for transport in transports.into_iter() { + let mut server = Server::new(Box::new(transport)); + register_rpcs(&mut server); + servers.push(server); + } - ClientReadyCount.fetch_add(1, Ordering::SeqCst); + ClientReadyCount.fetch_add(1, Ordering::SeqCst); - // Wait for all clients to connect before fulfilling any RPCs. - while !DCMServerReady.load(Ordering::SeqCst) {} + // Wait for all clients to connect before fulfilling any RPCs. + while !DCMServerReady.load(Ordering::SeqCst) {} - server - .add_client(&CLIENT_REGISTRAR) - .expect("Failed to accept client"); + for s_index in 0..servers.len() { + servers[s_index] + .add_client(&CLIENT_REGISTRAR) + .expect("Failed to accept client"); + } + } else { + unreachable!("No supported transport layer specified in kernel argument"); + }; ClientReadyCount.fetch_add(1, Ordering::SeqCst); @@ -114,9 +174,11 @@ pub(crate) fn run() { // Start running the RPC server log::info!("Starting RPC server for client {:?}!", mid); loop { - let _handled = server - .try_handle() - .expect("Controller failed to handle RPC"); + for s_index in 0..servers.len() { + let _handled = servers[s_index] + .try_handle() + .expect("Controller failed to handle RPC"); + } } } diff --git a/kernel/src/arch/x86_64/rackscale/dcm/mod.rs b/kernel/src/arch/x86_64/rackscale/dcm/mod.rs index f8c004c89..b6d9d8356 100644 --- a/kernel/src/arch/x86_64/rackscale/dcm/mod.rs +++ b/kernel/src/arch/x86_64/rackscale/dcm/mod.rs @@ -121,6 +121,10 @@ unsafe_abomonate!(DCMOps); lazy_static! { pub(crate) static ref DCM_CLIENT: Arc> = Arc::new(Mutex::new( - init_ethernet_rpc(IpAddress::v4(172, 31, 0, 20), DCM_CLIENT_PORT, false).unwrap(), + init_ethernet_rpc(IpAddress::v4(172, 31, 0, 20), DCM_CLIENT_PORT, 1, true) + .unwrap() + .into_iter() + .nth(0) + .unwrap(), )); } diff --git a/kernel/src/arch/x86_64/rackscale/fileops/close.rs b/kernel/src/arch/x86_64/rackscale/fileops/close.rs index 76a8424e4..75d9cfcfa 100644 --- a/kernel/src/arch/x86_64/rackscale/fileops/close.rs +++ b/kernel/src/arch/x86_64/rackscale/fileops/close.rs @@ -34,11 +34,13 @@ pub(crate) fn rpc_close(pid: usize, fd: FileDescriptor) -> KResult<(u64, u64)> { let mut res_data = [0u8; core::mem::size_of::>()]; // Call Close() RPC - CLIENT_STATE.rpc_client.lock().call( - KernelRpc::Close as RPCType, - &[&req_data], - &mut [&mut res_data], - )?; + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] + .lock() + .call( + KernelRpc::Close as RPCType, + &[&req_data], + &mut [&mut res_data], + )?; // Decode and return result if let Some((res, remaining)) = unsafe { decode::>(&mut res_data) } { diff --git a/kernel/src/arch/x86_64/rackscale/fileops/delete.rs b/kernel/src/arch/x86_64/rackscale/fileops/delete.rs index 4b4157f65..cf5fa52f5 100644 --- a/kernel/src/arch/x86_64/rackscale/fileops/delete.rs +++ b/kernel/src/arch/x86_64/rackscale/fileops/delete.rs @@ -36,11 +36,13 @@ pub(crate) fn rpc_delete(pid: usize, pathname: String) -> KResult<(u64, u64)> { let mut res_data = [0u8; core::mem::size_of::>()]; // Call RPC - CLIENT_STATE.rpc_client.lock().call( - KernelRpc::Delete as RPCType, - &[&req_data, &pathname.as_bytes()], - &mut [&mut res_data], - )?; + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] + .lock() + .call( + KernelRpc::Delete as RPCType, + &[&req_data, &pathname.as_bytes()], + &mut [&mut res_data], + )?; // Decode result - return result if decoding successful if let Some((res, remaining)) = unsafe { decode::>(&mut res_data) } { diff --git a/kernel/src/arch/x86_64/rackscale/fileops/getinfo.rs b/kernel/src/arch/x86_64/rackscale/fileops/getinfo.rs index 89ff5451c..33a83864d 100644 --- a/kernel/src/arch/x86_64/rackscale/fileops/getinfo.rs +++ b/kernel/src/arch/x86_64/rackscale/fileops/getinfo.rs @@ -34,11 +34,13 @@ pub(crate) fn rpc_getinfo + Debug>(pid: usize, name: P) -> KResul // Construct result buffer and call RPC let mut res_data = [0u8; core::mem::size_of::>()]; - CLIENT_STATE.rpc_client.lock().call( - KernelRpc::GetInfo as RPCType, - &[&req_data, name.as_ref()], - &mut [&mut res_data], - )?; + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] + .lock() + .call( + KernelRpc::GetInfo as RPCType, + &[&req_data, name.as_ref()], + &mut [&mut res_data], + )?; // Decode and return the result if let Some((res, remaining)) = unsafe { decode::>(&mut res_data) } { diff --git a/kernel/src/arch/x86_64/rackscale/fileops/mkdir.rs b/kernel/src/arch/x86_64/rackscale/fileops/mkdir.rs index a64802c18..c8879a3c2 100644 --- a/kernel/src/arch/x86_64/rackscale/fileops/mkdir.rs +++ b/kernel/src/arch/x86_64/rackscale/fileops/mkdir.rs @@ -42,11 +42,13 @@ pub(crate) fn rpc_mkdir + Debug>( let mut res_data = [0u8; core::mem::size_of::>()]; // Call RPC - CLIENT_STATE.rpc_client.lock().call( - KernelRpc::MkDir as RPCType, - &[&req_data, pathname.as_ref()], - &mut [&mut res_data], - )?; + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] + .lock() + .call( + KernelRpc::MkDir as RPCType, + &[&req_data, pathname.as_ref()], + &mut [&mut res_data], + )?; // Parse and return result if let Some((res, remaining)) = unsafe { decode::>(&mut res_data) } { diff --git a/kernel/src/arch/x86_64/rackscale/fileops/open.rs b/kernel/src/arch/x86_64/rackscale/fileops/open.rs index f263acf22..cb21c281c 100644 --- a/kernel/src/arch/x86_64/rackscale/fileops/open.rs +++ b/kernel/src/arch/x86_64/rackscale/fileops/open.rs @@ -54,11 +54,13 @@ fn rpc_open_create + Debug>( let mut res_data = [0u8; core::mem::size_of::>()]; // Call the RPC - CLIENT_STATE.rpc_client.lock().call( - rpc_type, - &[&req_data, pathname.as_ref()], - &mut [&mut res_data], - )?; + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] + .lock() + .call( + rpc_type, + &[&req_data, pathname.as_ref()], + &mut [&mut res_data], + )?; // Decode and return the result if let Some((res, remaining)) = unsafe { decode::>(&mut res_data) } { diff --git a/kernel/src/arch/x86_64/rackscale/fileops/rename.rs b/kernel/src/arch/x86_64/rackscale/fileops/rename.rs index 55cf81faf..ccfbc81f9 100644 --- a/kernel/src/arch/x86_64/rackscale/fileops/rename.rs +++ b/kernel/src/arch/x86_64/rackscale/fileops/rename.rs @@ -44,11 +44,13 @@ pub(crate) fn rpc_rename + Debug>( let mut res_data = [0u8; core::mem::size_of::>()]; // Call the RPC - CLIENT_STATE.rpc_client.lock().call( - KernelRpc::FileRename as RPCType, - &[&req_data, oldname.as_ref(), newname.as_ref()], - &mut [&mut res_data], - )?; + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] + .lock() + .call( + KernelRpc::FileRename as RPCType, + &[&req_data, oldname.as_ref(), newname.as_ref()], + &mut [&mut res_data], + )?; // Parse and return the result if let Some((res, remaining)) = unsafe { decode::>(&mut res_data) } { diff --git a/kernel/src/arch/x86_64/rackscale/fileops/rw.rs b/kernel/src/arch/x86_64/rackscale/fileops/rw.rs index a45ac4a97..81eb3a62b 100644 --- a/kernel/src/arch/x86_64/rackscale/fileops/rw.rs +++ b/kernel/src/arch/x86_64/rackscale/fileops/rw.rs @@ -69,8 +69,7 @@ pub(crate) fn rpc_writeat( } else { KernelRpc::WriteAt as RPCType }; - CLIENT_STATE - .rpc_client + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] .lock() .call(rpc_type, &[&req_data, &data], &mut [&mut res_data])?; @@ -129,11 +128,13 @@ pub(crate) fn rpc_readat( KernelRpc::ReadAt as RPCType }; - CLIENT_STATE.rpc_client.lock().call( - KernelRpc::ReadAt as RPCType, - &[&req_data], - &mut [&mut res_data], - )?; + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] + .lock() + .call( + KernelRpc::ReadAt as RPCType, + &[&req_data], + &mut [&mut res_data], + )?; // Decode result, if successful, return result if let Some((res, remaining)) = unsafe { decode::>(&mut res_data) } { diff --git a/kernel/src/arch/x86_64/rackscale/get_shmem_frames.rs b/kernel/src/arch/x86_64/rackscale/get_shmem_frames.rs index 205d9d803..9dae7c098 100644 --- a/kernel/src/arch/x86_64/rackscale/get_shmem_frames.rs +++ b/kernel/src/arch/x86_64/rackscale/get_shmem_frames.rs @@ -40,7 +40,11 @@ unsafe_abomonate!(ShmemRegion: base, affinity); // This isn't truly a syscall pub(crate) fn rpc_get_shmem_frames(pid: Option, num_frames: usize) -> KResult> { assert!(num_frames > 0); - log::debug!("GetShmemFrames({:?})", num_frames); + log::debug!( + "GetShmemFrames({:?}) core={:?}", + num_frames, + kpi::system::mtid_from_gtid(*crate::environment::CORE_ID) + ); let mid = if pid.is_none() { Some(*crate::environment::MACHINE_ID) @@ -66,8 +70,7 @@ pub(crate) fn rpc_get_shmem_frames(pid: Option, num_frames: usize) -> KResu for i in 0..max_res_size { res_data.push(0u8); } - CLIENT_STATE - .rpc_client + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] .lock() .call( KernelRpc::GetShmemFrames as RPCType, diff --git a/kernel/src/arch/x86_64/rackscale/get_shmem_structure.rs b/kernel/src/arch/x86_64/rackscale/get_shmem_structure.rs index 3c2a5f997..3a4cccf63 100644 --- a/kernel/src/arch/x86_64/rackscale/get_shmem_structure.rs +++ b/kernel/src/arch/x86_64/rackscale/get_shmem_structure.rs @@ -63,8 +63,7 @@ pub(crate) fn rpc_get_shmem_structure( // Make buffer max size of MAX_PROCESS (for NrProcLogs), 1 (for NrLog) let mut res_data = [0u8; core::mem::size_of::<[u64; MAX_PROCESSES]>()]; - CLIENT_STATE - .rpc_client + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] .lock() .call( KernelRpc::GetShmemStructure as RPCType, diff --git a/kernel/src/arch/x86_64/rackscale/processops/allocate_physical.rs b/kernel/src/arch/x86_64/rackscale/processops/allocate_physical.rs index 6c2dce75e..eb4b5f2cb 100644 --- a/kernel/src/arch/x86_64/rackscale/processops/allocate_physical.rs +++ b/kernel/src/arch/x86_64/rackscale/processops/allocate_physical.rs @@ -45,11 +45,13 @@ pub(crate) fn rpc_allocate_physical(pid: Pid, size: u64, affinity: u64) -> KResu // Create result buffer let mut res_data = [0u8; core::mem::size_of::>()]; - CLIENT_STATE.rpc_client.lock().call( - KernelRpc::AllocatePhysical as RPCType, - &[&req_data], - &mut [&mut res_data], - )?; + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] + .lock() + .call( + KernelRpc::AllocatePhysical as RPCType, + &[&req_data], + &mut [&mut res_data], + )?; // Decode result, return result if decoded successfully if let Some((res, remaining)) = unsafe { decode::>(&mut res_data) } { diff --git a/kernel/src/arch/x86_64/rackscale/processops/print.rs b/kernel/src/arch/x86_64/rackscale/processops/print.rs index 283aacf9b..3ceafab85 100644 --- a/kernel/src/arch/x86_64/rackscale/processops/print.rs +++ b/kernel/src/arch/x86_64/rackscale/processops/print.rs @@ -35,11 +35,13 @@ pub(crate) fn rpc_log(msg: String) -> KResult<(u64, u64)> { // Construct result buffer and call RPC let mut res_data = [0u8; core::mem::size_of::>()]; - CLIENT_STATE.rpc_client.lock().call( - KernelRpc::Log as RPCType, - &[&req_data, print_str.as_ref()], - &mut [&mut res_data], - )?; + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] + .lock() + .call( + KernelRpc::Log as RPCType, + &[&req_data, print_str.as_ref()], + &mut [&mut res_data], + )?; // Decode and return the result if let Some((res, remaining)) = unsafe { decode::>(&mut res_data) } { diff --git a/kernel/src/arch/x86_64/rackscale/processops/release_core.rs b/kernel/src/arch/x86_64/rackscale/processops/release_core.rs index e912d0540..586f7b52a 100644 --- a/kernel/src/arch/x86_64/rackscale/processops/release_core.rs +++ b/kernel/src/arch/x86_64/rackscale/processops/release_core.rs @@ -38,11 +38,13 @@ pub(crate) fn rpc_release_core(pid: Pid, gtid: ThreadId) -> KResult<(u64, u64)> // Create result buffer let mut res_data = [0u8; core::mem::size_of::>()]; - CLIENT_STATE.rpc_client.lock().call( - KernelRpc::ReleaseCore as RPCType, - &[&req_data], - &mut [&mut res_data], - )?; + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] + .lock() + .call( + KernelRpc::ReleaseCore as RPCType, + &[&req_data], + &mut [&mut res_data], + )?; // Decode result, return result if decoded successfully if let Some((res, remaining)) = unsafe { decode::>(&mut res_data) } { diff --git a/kernel/src/arch/x86_64/rackscale/processops/release_physical.rs b/kernel/src/arch/x86_64/rackscale/processops/release_physical.rs index 1bc76e7d4..b067125d9 100644 --- a/kernel/src/arch/x86_64/rackscale/processops/release_physical.rs +++ b/kernel/src/arch/x86_64/rackscale/processops/release_physical.rs @@ -54,11 +54,13 @@ pub(crate) fn rpc_release_physical(pid: Pid, frame_id: u64) -> KResult<(u64, u64 // Create result buffer let mut res_data = [0u8; core::mem::size_of::>()]; - CLIENT_STATE.rpc_client.lock().call( - KernelRpc::ReleasePhysical as RPCType, - &[&req_data], - &mut [&mut res_data], - )?; + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] + .lock() + .call( + KernelRpc::ReleasePhysical as RPCType, + &[&req_data], + &mut [&mut res_data], + )?; // Decode result, return result if decoded successfully if let Some((res, remaining)) = unsafe { decode::>(&mut res_data) } { diff --git a/kernel/src/arch/x86_64/rackscale/processops/request_core.rs b/kernel/src/arch/x86_64/rackscale/processops/request_core.rs index bda7182ef..8c83465b1 100644 --- a/kernel/src/arch/x86_64/rackscale/processops/request_core.rs +++ b/kernel/src/arch/x86_64/rackscale/processops/request_core.rs @@ -38,11 +38,13 @@ pub(crate) fn rpc_request_core(pid: Pid, new_pid: bool, entry_point: u64) -> KRe // Construct result buffer and call RPC let mut res_data = [0u8; core::mem::size_of::>()]; - CLIENT_STATE.rpc_client.lock().call( - KernelRpc::RequestCore as RPCType, - &[&req_data], - &mut [&mut res_data], - )?; + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] + .lock() + .call( + KernelRpc::RequestCore as RPCType, + &[&req_data], + &mut [&mut res_data], + )?; // Decode and return the result if let Some((res, remaining)) = unsafe { decode::>(&mut res_data) } { diff --git a/kernel/src/arch/x86_64/rackscale/registration.rs b/kernel/src/arch/x86_64/rackscale/registration.rs index d2dbf390a..4e92ff88a 100644 --- a/kernel/src/arch/x86_64/rackscale/registration.rs +++ b/kernel/src/arch/x86_64/rackscale/registration.rs @@ -7,6 +7,8 @@ use abomonation::{decode, encode, unsafe_abomonate, Abomonation}; use core2::io::Result as IOResult; use core2::io::Write; use fallible_collections::{FallibleVec, FallibleVecGlobal}; +use lazy_static::lazy_static; +use spin::Mutex; use kpi::system::{CpuThread, MachineId}; use rpc::client::Client; @@ -21,29 +23,36 @@ use crate::memory::shmem_affinity::mid_to_shmem_affinity; use crate::memory::{Frame, PAddr, LARGE_PAGE_SIZE}; use crate::transport::shmem::{get_affinity_shmem, get_affinity_shmem_by_mid}; +lazy_static! { + pub static ref rpc_servers_to_register: Mutex = Mutex::new(0); +} + #[derive(Debug, Default)] pub(crate) struct ClientRegistrationRequest { pub(crate) mid: MachineId, pub(crate) shmem_region_base: u64, pub(crate) shmem_region_size: usize, pub(crate) num_cores: u64, + pub(crate) is_first: bool, } unsafe_abomonate!( ClientRegistrationRequest: mid, shmem_region_base, shmem_region_size, - num_cores + num_cores, + is_first ); // Called by client to register client with the controller pub(crate) fn initialize_client( mut client: Client, send_client_data: bool, // This field is used to indicate if init_client() should send ClientRegistrationRequest + is_dcm: bool, ) -> KResult { // Don't modify this line without modifying testutils/rackscale_runner.rs - log::warn!("CLIENT READY"); + // log::warn!("CLIENT READY"); - if send_client_data { + if !is_dcm { // Fetch system information let shmem_region = get_affinity_shmem(); let hwthreads = atopology::MACHINE_TOPOLOGY.threads(); @@ -63,12 +72,15 @@ pub(crate) fn initialize_client( assert!(client_threads.len() == num_threads); log::debug!("client_threads: {:?}", client_threads); + let is_first = if send_client_data { true } else { false }; + // Construct client registration request let req = ClientRegistrationRequest { mid: *crate::environment::MACHINE_ID, shmem_region_base: shmem_region.base.as_u64(), shmem_region_size: shmem_region.size, num_cores: num_threads as u64, + is_first: is_first, }; // Serialize and send the registration request to the controller @@ -103,6 +115,12 @@ pub(crate) fn register_client(hdr: &mut RPCHeader, payload: &mut [u8]) -> Result req.shmem_region_base + req.shmem_region_size as u64 ); + if !req.is_first { + return Ok(()); + } + + *rpc_servers_to_register.lock() = req.num_cores; + // Parse out hw_threads let hw_threads = match unsafe { decode::>(hwthreads_data) } { Some((hw_threads, [])) => hw_threads, diff --git a/kernel/src/arch/x86_64/rackscale/systemops/get_hardware_threads.rs b/kernel/src/arch/x86_64/rackscale/systemops/get_hardware_threads.rs index 8cbcb3aec..6bc5577b2 100644 --- a/kernel/src/arch/x86_64/rackscale/systemops/get_hardware_threads.rs +++ b/kernel/src/arch/x86_64/rackscale/systemops/get_hardware_threads.rs @@ -31,11 +31,13 @@ pub(crate) fn rpc_get_hardware_threads( let mut res_data = [0u8; core::mem::size_of::>() + 5 * 4096]; // Call GetHardwareThreads() RPC - CLIENT_STATE.rpc_client.lock().call( - KernelRpc::GetHardwareThreads as RPCType, - &[&[]], - &mut [&mut res_data], - )?; + CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)] + .lock() + .call( + KernelRpc::GetHardwareThreads as RPCType, + &[&[]], + &mut [&mut res_data], + )?; // Decode and return result if let Some((res, remaining)) = unsafe { decode::>(&mut res_data) } { diff --git a/kernel/src/transport/ethernet.rs b/kernel/src/transport/ethernet.rs index 93658dc7d..22b378991 100644 --- a/kernel/src/transport/ethernet.rs +++ b/kernel/src/transport/ethernet.rs @@ -95,18 +95,47 @@ pub(crate) fn init_network<'a>() -> KResult #[allow(unused)] pub(crate) fn init_ethernet_rpc( server_ip: smoltcp::wire::IpAddress, - server_port: u16, - send_client_data: bool, // This field is used to indicate if init_client() should send ClientRegistrationRequest -) -> KResult { + server_port_base: u16, + num_cores: u64, + is_dcm: bool, +) -> KResult> { use crate::arch::rackscale::registration::initialize_client; use alloc::boxed::Box; + use core::hint::spin_loop; + use core::time::Duration; use rpc::client::Client; use rpc::transport::TCPTransport; - let rpc_transport = Box::new( - TCPTransport::new(Some(server_ip), server_port, Arc::clone(ÐERNET_IFACE)) - .map_err(|err| KError::RackscaleRPCError { err })?, - ); - let mut client = Client::new(rpc_transport); - initialize_client(client, send_client_data) + let mut clients: Vec = Vec::new(); + + for core in 0..num_cores { + let server_port = server_port_base + core as u16; + + let rpc_transport = Box::new( + TCPTransport::new(Some(server_ip), server_port, Arc::clone(ÐERNET_IFACE)) + .map_err(|err| KError::RackscaleRPCError { err })?, + ); + let mut client = Client::new(rpc_transport); + + // only send client data on first client registration request + let send_client_data = if core == 0 { true } else { false }; + + client = initialize_client(client, send_client_data, is_dcm) + .expect("Failed to initialize client"); + + // After sending client initialization, allow time for controller to + // register additional rpc servers before sending more requests + if send_client_data && !is_dcm && num_cores > 1 { + unsafe { + let start = rawtime::Instant::now(); + while start.elapsed() < Duration::from_secs(5) { + spin_loop(); + } + } + } + + clients.push(client); + } + + Ok(clients) } diff --git a/kernel/src/transport/shmem.rs b/kernel/src/transport/shmem.rs index b518a4551..9468d3cc8 100644 --- a/kernel/src/transport/shmem.rs +++ b/kernel/src/transport/shmem.rs @@ -307,13 +307,20 @@ const SHMEM_QUEUE_SIZE: usize = 32; // The total size of two queues(sender and reciever) should be less than the transport size. #[cfg(feature = "rpc")] -const_assert!(2 * SHMEM_QUEUE_SIZE * QUEUE_ENTRY_SIZE <= SHMEM_TRANSPORT_SIZE as usize); +const_assert!(2 * SHMEM_QUEUE_SIZE * QUEUE_ENTRY_SIZE <= SINGLE_SHMEM_TRANSPORT_SIZE as usize); #[cfg(feature = "rpc")] -pub(crate) const SHMEM_TRANSPORT_SIZE: u64 = 2 * 1024 * 1024; +pub(crate) const SINGLE_SHMEM_TRANSPORT_SIZE: u64 = 2 * 1024 * 1024; +// TODO(rackscale, hack): max cores at 24 for now #[cfg(feature = "rpc")] -pub(crate) fn create_shmem_transport(mid: MachineId) -> KResult> { +pub(crate) const NUM_SHMEM_TRANSPORTS: u64 = 24; + +#[cfg(feature = "rpc")] +pub(crate) const SHMEM_TRANSPORT_SIZE: u64 = SINGLE_SHMEM_TRANSPORT_SIZE * NUM_SHMEM_TRANSPORTS; + +#[cfg(feature = "rpc")] +pub(crate) fn create_shmem_transport(mid: MachineId) -> KResult>> { use rpc::transport::shmem::allocator::ShmemAllocator; use rpc::transport::shmem::Queue; use rpc::transport::shmem::{Receiver, Sender}; @@ -326,57 +333,74 @@ pub(crate) fn create_shmem_transport(mid: MachineId) -> KResult= SHMEM_TRANSPORT_SIZE); - let allocator = ShmemAllocator::new(base_addr.as_u64(), SHMEM_TRANSPORT_SIZE); - match crate::CMDLINE.get().map_or(Mode::Native, |c| c.mode) { - Mode::Controller => { - let server_to_client_queue = - Arc::new(Queue::with_capacity_in(true, SHMEM_QUEUE_SIZE, &allocator).unwrap()); - let client_to_server_queue = - Arc::new(Queue::with_capacity_in(true, SHMEM_QUEUE_SIZE, &allocator).unwrap()); - let server_sender = Sender::with_shared_queue(server_to_client_queue.clone()); - let server_receiver = Receiver::with_shared_queue(client_to_server_queue.clone()); - log::info!( - "Controller: Created shared-memory transport for machine {}! size={:?}, base={:?}", + let mut transports = Vec::try_with_capacity(NUM_SHMEM_TRANSPORTS as usize)?; + + for transport_offset in 0..NUM_SHMEM_TRANSPORTS { + let allocator = ShmemAllocator::new( + base_addr.as_u64() + transport_offset * SINGLE_SHMEM_TRANSPORT_SIZE, + SINGLE_SHMEM_TRANSPORT_SIZE, + ); + match crate::CMDLINE.get().map_or(Mode::Native, |c| c.mode) { + Mode::Controller => { + let server_to_client_queue = + Arc::new(Queue::with_capacity_in(true, SHMEM_QUEUE_SIZE, &allocator).unwrap()); + let client_to_server_queue = + Arc::new(Queue::with_capacity_in(true, SHMEM_QUEUE_SIZE, &allocator).unwrap()); + let server_sender = Sender::with_shared_queue(server_to_client_queue.clone()); + let server_receiver = Receiver::with_shared_queue(client_to_server_queue.clone()); + log::info!( + "Controller: Created shared-memory transport for machine {}! size={:?}, base={:x}", mid, - SHMEM_TRANSPORT_SIZE, - base_addr - ); - Ok(ShmemTransport::new(server_receiver, server_sender)) - } - Mode::Client => { - let server_to_client_queue = - Arc::new(Queue::with_capacity_in(false, SHMEM_QUEUE_SIZE, &allocator).unwrap()); - let client_to_server_queue = - Arc::new(Queue::with_capacity_in(false, SHMEM_QUEUE_SIZE, &allocator).unwrap()); - let client_receiver = Receiver::with_shared_queue(server_to_client_queue.clone()); - let client_sender = Sender::with_shared_queue(client_to_server_queue.clone()); - log::info!( - "Client: Created shared-memory transport! size={:?}, base={:?}", - SHMEM_TRANSPORT_SIZE, - base_addr + SINGLE_SHMEM_TRANSPORT_SIZE, + base_addr.as_u64() + transport_offset * SINGLE_SHMEM_TRANSPORT_SIZE ); - Ok(ShmemTransport::new(client_receiver, client_sender)) - } - Mode::Native => { - log::error!("Native mode not supported for shmem"); - Err(KError::InvalidNativeMode) + transports.push(ShmemTransport::new(server_receiver, server_sender)); + } + Mode::Client => { + let server_to_client_queue = + Arc::new(Queue::with_capacity_in(false, SHMEM_QUEUE_SIZE, &allocator).unwrap()); + let client_to_server_queue = + Arc::new(Queue::with_capacity_in(false, SHMEM_QUEUE_SIZE, &allocator).unwrap()); + let client_receiver = Receiver::with_shared_queue(server_to_client_queue.clone()); + let client_sender = Sender::with_shared_queue(client_to_server_queue.clone()); + log::info!( + "Client: Created shared-memory transport! size={:?}, base={:x}", + SINGLE_SHMEM_TRANSPORT_SIZE, + base_addr.as_u64() + transport_offset * SINGLE_SHMEM_TRANSPORT_SIZE + ); + transports.push(ShmemTransport::new(client_receiver, client_sender)); + } + Mode::Native => { + log::error!("Native mode not supported for shmem"); + return Err(KError::InvalidNativeMode); + } } } + Ok(transports) } #[cfg(feature = "rpc")] -pub(crate) fn init_shmem_rpc( - send_client_data: bool, // This field is used to indicate if init_client() should send ClientRegistrationRequest -) -> KResult { +pub(crate) fn init_shmem_rpc() -> KResult> { use crate::arch::rackscale::registration::initialize_client; use rpc::client::Client; // Set up the transport - let transport = Box::try_new(create_shmem_transport(*crate::environment::MACHINE_ID)?)?; - + let transports = create_shmem_transport(*crate::environment::MACHINE_ID)?; + let mut clients = Vec::try_with_capacity(transports.len())?; + let mut first = true; + for transport in transports.into_iter() { + let client = Client::new(Box::new(transport)); + + let client = if first { + first = false; + initialize_client(client, true, false).expect("Failed to initialize client") + } else { + initialize_client(client, false, false).expect("Failed to initialize client") + }; + clients.push(client); + } // Create the client - let client = Client::new(transport); - initialize_client(client, send_client_data) + Ok(clients) } #[cfg(feature = "rackscale")] diff --git a/kernel/tests/s11_rackscale_benchmarks.rs b/kernel/tests/s11_rackscale_benchmarks.rs index 9021981f0..c19ca9e81 100644 --- a/kernel/tests/s11_rackscale_benchmarks.rs +++ b/kernel/tests/s11_rackscale_benchmarks.rs @@ -26,8 +26,8 @@ fn s11_rackscale_shmem_fxmark_benchmark() { rackscale_fxmark_benchmark(RackscaleTransport::Shmem); } -#[test] #[ignore] +#[test] #[cfg(not(feature = "baremetal"))] fn s11_rackscale_ethernet_fxmark_benchmark() { rackscale_fxmark_benchmark(RackscaleTransport::Ethernet); @@ -134,14 +134,16 @@ fn rackscale_fxmark_benchmark(transport: RackscaleTransport) { fn timeout_fn(num_cores: usize) -> u64 { 180_000 + 5_000 * num_cores as u64 } + fn mem_fn(num_cores: usize, is_smoke: bool) -> usize { if is_smoke { 8192 } else { // Memory must also be divisible by number of nodes, which could be 1, 2, 3, or 4 - 2048 * (((num_cores + 3 - 1) / 3) * 3) + 4096 * (((((num_cores + 1) / 2) + 3 - 1) / 3) * 3) } } + let bench = RackscaleBench { test, cmd_fn, @@ -722,7 +724,7 @@ fn rackscale_memcached_dcm(transport: RackscaleTransport, dcm_config: Option { iface: Arc>>,