Skip to content
This repository was archived by the owner on Jul 17, 2025. It is now read-only.
2 changes: 2 additions & 0 deletions kernel/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,9 @@ def configure_network(args):
for _, ncfg in zip(range(0, args.workers), NETWORK_CONFIG):
sudo[tunctl[['-t', ncfg, '-u', user, '-g', group]]]()
sudo[ip[['link', 'set', ncfg, 'up']]](retcode=(0, 1))
sudo[ip[['link', 'set', ncfg, 'txqueuelen', 65536]]]()
sudo[brctl[['addif', 'br0', ncfg]]]()
sudo[ip[['link', 'set', 'br0', 'txqueuelen', 65536]]]()
sudo[ip[['link', 'set', 'br0', 'up']]](retcode=(0, 1))


Expand Down
49 changes: 31 additions & 18 deletions kernel/src/arch/x86_64/rackscale/client_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use rpc::client::Client;
use rpc::rpc::RPCError;

use crate::arch::kcb::try_per_core_mem;
use crate::arch::rackscale::controller::CONTROLLER_PORT_BASE;
use crate::arch::rackscale::controller::{CONTROLLER_PORT_BASE, MAX_CORES_PER_CLIENT};
use crate::arch::rackscale::fileops::rw::{RW_SHMEM_BUF, RW_SHMEM_BUF_LEN};
use crate::arch::rackscale::FrameCacheBase;
use crate::arch::MAX_MACHINES;
Expand All @@ -25,11 +25,12 @@ use crate::error::{KError, KResult};
use crate::memory::backends::MemManager;
use crate::memory::shmem_affinity::{local_shmem_affinity, mid_to_shmem_affinity};
use crate::process::MAX_PROCESSES;
use crate::transport::shmem::NUM_SHMEM_TRANSPORTS;

/// This is the state the client records about itself
pub(crate) struct ClientState {
/// The RPC client used to communicate with the controller
pub(crate) rpc_client: Arc<Mutex<Client>>,
pub(crate) rpc_clients: Arc<ArrayVec<Mutex<Client>, { NUM_SHMEM_TRANSPORTS as usize }>>,

/// Used to store shmem affinity base pages
pub(crate) affinity_base_pages: Arc<ArrayVec<Mutex<Box<dyn MemManager + Send>>, MAX_MACHINES>>,
Expand All @@ -40,27 +41,39 @@ pub(crate) struct ClientState {

impl ClientState {
pub(crate) fn new() -> ClientState {
// Create network stack and instantiate RPC Client
let rpc_client = if crate::CMDLINE
let clients = if crate::CMDLINE
.get()
.map_or(false, |c| c.transport == Transport::Ethernet)
{
Arc::new(Mutex::new(
crate::transport::ethernet::init_ethernet_rpc(
smoltcp::wire::IpAddress::v4(172, 31, 0, 11),
CONTROLLER_PORT_BASE + (*crate::environment::MACHINE_ID as u16 - 1),
true,
)
.expect("Failed to initialize ethernet RPC"),
))
let num_cores: u64 = atopology::MACHINE_TOPOLOGY.num_threads() as u64;
let mid = *crate::environment::MACHINE_ID;
let port_base = CONTROLLER_PORT_BASE + ((mid as u16 - 1) * MAX_CORES_PER_CLIENT);

log::debug!(
"Sending ethernet initialization for client with {:?} cores, mid {:?}, and port {:?}",
num_cores,
mid,
port_base,
);

crate::transport::ethernet::init_ethernet_rpc(
smoltcp::wire::IpAddress::v4(172, 31, 0, 11),
port_base,
num_cores,
false,
)
.expect("Failed to initialize ethernet RPC")
} else {
// Default is Shmem, even if transport unspecified
Arc::new(Mutex::new(
crate::transport::shmem::init_shmem_rpc(true)
.expect("Failed to initialize shmem RPC"),
))
crate::transport::shmem::init_shmem_rpc().expect("Failed to initialize shmem RPC")
};

log::warn!("CLIENT READY");

let mut rpc_clients = ArrayVec::new();
for client in clients.into_iter() {
rpc_clients.push(Mutex::new(client));
}

let mut per_process_base_pages = ArrayVec::new();
for _i in 0..MAX_PROCESSES {
// TODO(rackscale): this is a bogus affinity because it should really be "ANY_SHMEM"
Expand All @@ -76,7 +89,7 @@ impl ClientState {

log::debug!("Finished initializing client state");
ClientState {
rpc_client,
rpc_clients: Arc::new(rpc_clients),
affinity_base_pages: Arc::new(affinity_base_pages),
per_process_base_pages: Arc::new(per_process_base_pages),
}
Expand Down
120 changes: 91 additions & 29 deletions kernel/src/arch/x86_64/rackscale/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ use crate::arch::rackscale::dcm::{
use crate::arch::MAX_MACHINES;
use crate::cmdline::Transport;
use crate::transport::ethernet::ETHERNET_IFACE;
use crate::transport::shmem::create_shmem_transport;
use crate::transport::shmem::{create_shmem_transport, NUM_SHMEM_TRANSPORTS};

use crate::arch::rackscale::registration::rpc_servers_to_register;

use super::*;

Expand All @@ -27,50 +29,108 @@ pub(crate) const CONTROLLER_PORT_BASE: u16 = 6970;
static ClientReadyCount: AtomicU64 = AtomicU64::new(0);
static DCMServerReady: AtomicBool = AtomicBool::new(false);

// Used for port allocation ranges for rpc servers
pub const MAX_CORES_PER_CLIENT: u16 = 24;

/// Controller main method
pub(crate) fn run() {
use core::hint::spin_loop;
use core::time::Duration;
let mid = *crate::environment::CORE_ID;

// Initialize one server per controller thread
let mut server = if crate::CMDLINE
// TODO: still dependent on NUM_SHMEM_TRANSPORTS, ensure it works for eth too
let mut servers: ArrayVec<Server<'_>, { NUM_SHMEM_TRANSPORTS as usize }> = ArrayVec::new();

if crate::CMDLINE
.get()
.map_or(false, |c| c.transport == Transport::Ethernet)
{
let port_base = CONTROLLER_PORT_BASE + (mid as u16 - 1) * MAX_CORES_PER_CLIENT;

log::debug!(
"Initializing transport with mid {:?} on port {:?}",
mid,
port_base
);
let transport = Box::new(
TCPTransport::new(
None,
CONTROLLER_PORT_BASE + mid as u16 - 1,
Arc::clone(&ETHERNET_IFACE),
)
.expect("Failed to create TCP transport"),
TCPTransport::new(None, port_base, Arc::clone(&ETHERNET_IFACE))
.expect("Failed to create TCP transport"),
);
let mut server = Server::new(transport);
register_rpcs(&mut server);
server
servers.push(server);

ClientReadyCount.fetch_add(1, Ordering::SeqCst);
while !DCMServerReady.load(Ordering::SeqCst) {}

servers[0]
.add_client(&CLIENT_REGISTRAR)
.expect("Failed to accept client");

log::debug!("Initial RPC server initialized. Learning client topology...");

// wait until controller learns about client topology
while (*rpc_servers_to_register.lock() == 0) {}

log::debug!(
"Received client topology, registering subsequent {:?} cores",
*rpc_servers_to_register.lock()
);

// register n-1 servers as we already handled the initial request
// will do nothing if no more servers to register
for i in 0..*rpc_servers_to_register.lock() - 1 {
let transport = Box::new(
TCPTransport::new(
None,
port_base + (i as u16 + 1),
Arc::clone(&ETHERNET_IFACE),
)
.expect("Failed to create TCP transport"),
);
let mut server = Server::new(transport);
register_rpcs(&mut server);
servers.push(server);
}

log::debug!("Transports added. Adding clients...");

// already registered the first server
// will do nothing if no more servers to register
for s_index in 1..servers.len() {
servers[s_index]
.add_client(&CLIENT_REGISTRAR)
.expect("Failed to accept client");
}

log::debug!("Finished registering RPC servers for client");
} else if crate::CMDLINE
.get()
.map_or(false, |c| c.transport == Transport::Shmem)
{
let transport = Box::new(
create_shmem_transport(mid.try_into().unwrap())
.expect("Failed to create shmem transport"),
);
let transports = create_shmem_transport(mid.try_into().unwrap())
.expect("Failed to create shmem transport");

let mut server = Server::new(transport);
register_rpcs(&mut server);
server
} else {
unreachable!("No supported transport layer specified in kernel argument");
};
// let mut servers: ArrayVec<Server<'_>, { NUM_SHMEM_TRANSPORTS as usize }> = ArrayVec::new();
for transport in transports.into_iter() {
let mut server = Server::new(Box::new(transport));
register_rpcs(&mut server);
servers.push(server);
}

ClientReadyCount.fetch_add(1, Ordering::SeqCst);
ClientReadyCount.fetch_add(1, Ordering::SeqCst);

// Wait for all clients to connect before fulfilling any RPCs.
while !DCMServerReady.load(Ordering::SeqCst) {}
// Wait for all clients to connect before fulfilling any RPCs.
while !DCMServerReady.load(Ordering::SeqCst) {}

server
.add_client(&CLIENT_REGISTRAR)
.expect("Failed to accept client");
for s_index in 0..servers.len() {
servers[s_index]
.add_client(&CLIENT_REGISTRAR)
.expect("Failed to accept client");
}
} else {
unreachable!("No supported transport layer specified in kernel argument");
};

ClientReadyCount.fetch_add(1, Ordering::SeqCst);

Expand Down Expand Up @@ -114,9 +174,11 @@ pub(crate) fn run() {
// Start running the RPC server
log::info!("Starting RPC server for client {:?}!", mid);
loop {
let _handled = server
.try_handle()
.expect("Controller failed to handle RPC");
for s_index in 0..servers.len() {
let _handled = servers[s_index]
.try_handle()
.expect("Controller failed to handle RPC");
}
}
}

Expand Down
6 changes: 5 additions & 1 deletion kernel/src/arch/x86_64/rackscale/dcm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ unsafe_abomonate!(DCMOps);

lazy_static! {
pub(crate) static ref DCM_CLIENT: Arc<Mutex<Client>> = Arc::new(Mutex::new(
init_ethernet_rpc(IpAddress::v4(172, 31, 0, 20), DCM_CLIENT_PORT, false).unwrap(),
init_ethernet_rpc(IpAddress::v4(172, 31, 0, 20), DCM_CLIENT_PORT, 1, true)
.unwrap()
.into_iter()
.nth(0)
.unwrap(),
));
}
12 changes: 7 additions & 5 deletions kernel/src/arch/x86_64/rackscale/fileops/close.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ pub(crate) fn rpc_close(pid: usize, fd: FileDescriptor) -> KResult<(u64, u64)> {
let mut res_data = [0u8; core::mem::size_of::<KResult<(u64, u64)>>()];

// Call Close() RPC
CLIENT_STATE.rpc_client.lock().call(
KernelRpc::Close as RPCType,
&[&req_data],
&mut [&mut res_data],
)?;
CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)]
.lock()
.call(
KernelRpc::Close as RPCType,
&[&req_data],
&mut [&mut res_data],
)?;

// Decode and return result
if let Some((res, remaining)) = unsafe { decode::<KResult<(u64, u64)>>(&mut res_data) } {
Expand Down
12 changes: 7 additions & 5 deletions kernel/src/arch/x86_64/rackscale/fileops/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@ pub(crate) fn rpc_delete(pid: usize, pathname: String) -> KResult<(u64, u64)> {
let mut res_data = [0u8; core::mem::size_of::<KResult<(u64, u64)>>()];

// Call RPC
CLIENT_STATE.rpc_client.lock().call(
KernelRpc::Delete as RPCType,
&[&req_data, &pathname.as_bytes()],
&mut [&mut res_data],
)?;
CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)]
.lock()
.call(
KernelRpc::Delete as RPCType,
&[&req_data, &pathname.as_bytes()],
&mut [&mut res_data],
)?;

// Decode result - return result if decoding successful
if let Some((res, remaining)) = unsafe { decode::<KResult<(u64, u64)>>(&mut res_data) } {
Expand Down
12 changes: 7 additions & 5 deletions kernel/src/arch/x86_64/rackscale/fileops/getinfo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ pub(crate) fn rpc_getinfo<P: AsRef<[u8]> + Debug>(pid: usize, name: P) -> KResul

// Construct result buffer and call RPC
let mut res_data = [0u8; core::mem::size_of::<KResult<(u64, u64)>>()];
CLIENT_STATE.rpc_client.lock().call(
KernelRpc::GetInfo as RPCType,
&[&req_data, name.as_ref()],
&mut [&mut res_data],
)?;
CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)]
.lock()
.call(
KernelRpc::GetInfo as RPCType,
&[&req_data, name.as_ref()],
&mut [&mut res_data],
)?;

// Decode and return the result
if let Some((res, remaining)) = unsafe { decode::<KResult<(u64, u64)>>(&mut res_data) } {
Expand Down
12 changes: 7 additions & 5 deletions kernel/src/arch/x86_64/rackscale/fileops/mkdir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@ pub(crate) fn rpc_mkdir<P: AsRef<[u8]> + Debug>(
let mut res_data = [0u8; core::mem::size_of::<KResult<(u64, u64)>>()];

// Call RPC
CLIENT_STATE.rpc_client.lock().call(
KernelRpc::MkDir as RPCType,
&[&req_data, pathname.as_ref()],
&mut [&mut res_data],
)?;
CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)]
.lock()
.call(
KernelRpc::MkDir as RPCType,
&[&req_data, pathname.as_ref()],
&mut [&mut res_data],
)?;

// Parse and return result
if let Some((res, remaining)) = unsafe { decode::<KResult<(u64, u64)>>(&mut res_data) } {
Expand Down
12 changes: 7 additions & 5 deletions kernel/src/arch/x86_64/rackscale/fileops/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,13 @@ fn rpc_open_create<P: AsRef<[u8]> + Debug>(
let mut res_data = [0u8; core::mem::size_of::<KResult<(u64, u64)>>()];

// Call the RPC
CLIENT_STATE.rpc_client.lock().call(
rpc_type,
&[&req_data, pathname.as_ref()],
&mut [&mut res_data],
)?;
CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)]
.lock()
.call(
rpc_type,
&[&req_data, pathname.as_ref()],
&mut [&mut res_data],
)?;

// Decode and return the result
if let Some((res, remaining)) = unsafe { decode::<KResult<(u64, u64)>>(&mut res_data) } {
Expand Down
12 changes: 7 additions & 5 deletions kernel/src/arch/x86_64/rackscale/fileops/rename.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@ pub(crate) fn rpc_rename<P: AsRef<[u8]> + Debug>(
let mut res_data = [0u8; core::mem::size_of::<KResult<(u64, u64)>>()];

// Call the RPC
CLIENT_STATE.rpc_client.lock().call(
KernelRpc::FileRename as RPCType,
&[&req_data, oldname.as_ref(), newname.as_ref()],
&mut [&mut res_data],
)?;
CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)]
.lock()
.call(
KernelRpc::FileRename as RPCType,
&[&req_data, oldname.as_ref(), newname.as_ref()],
&mut [&mut res_data],
)?;

// Parse and return the result
if let Some((res, remaining)) = unsafe { decode::<KResult<(u64, u64)>>(&mut res_data) } {
Expand Down
Loading