Skip to content

Commit 1dee052

Browse files
committed
feat(dataplane): make datplane consume memfd
With this commit the dataplane now consumes and acts on the memfd created by dataplane init from the previous commit. Signed-off-by: Daniel Noland <daniel@githedgehog.com>
1 parent f6f0b71 commit 1dee052

File tree

5 files changed

+88
-82
lines changed

5 files changed

+88
-82
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Dockerfile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ FROM $BASE AS dataplane
33
ARG ARTIFACT
44
ARG ARTIFACT_CLI
55
COPY --link --chown=0:0 "${ARTIFACT}" /dataplane
6+
COPY --link --chown=0:0 "${ARTIFACT}" /dataplane-init
67
COPY --link --chown=0:0 "${ARTIFACT_CLI}" /dataplane-cli
78
WORKDIR /
8-
ENTRYPOINT ["/dataplane"]
9+
ENTRYPOINT ["/dataplane-init"]

dataplane/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ ordermap = { workspace = true, features = ["std"] }
3131
parking_lot = { workspace = true }
3232
pipeline = { workspace = true }
3333
pkt-meta = { workspace = true }
34+
rkyv = { workspace = true, features = [] }
3435
routing = { workspace = true }
3536
serde = { workspace = true, features = ["derive"] }
3637
stats = { workspace = true }

dataplane/src/drivers/dpdk.rs

Lines changed: 37 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ use dpdk::queue::tx::{TxQueueConfig, TxQueueIndex};
1414
use dpdk::{dev, eal, socket};
1515
use tracing::{debug, error, info, trace, warn};
1616

17-
use crate::CmdArgs;
1817
use net::buffer::PacketBufferMut;
1918
use net::packet::Packet;
2019
use pipeline::sample_nfs::Passthrough;
@@ -96,34 +95,42 @@ fn start_rte_workers(devices: &[Dev], setup_pipeline: &(impl Sync + Fn() -> DynP
9695
info!("Starting RTE Worker on {lcore_id:?}");
9796
WorkerThread::launch(lcore_id, move || {
9897
let mut pipeline = setup_pipeline();
99-
let rx_queue = devices[0]
100-
.rx_queue(RxQueueIndex(u16::try_from(i).unwrap()))
101-
.unwrap();
102-
let tx_queue = devices[0]
103-
.tx_queue(TxQueueIndex(u16::try_from(i).unwrap()))
104-
.unwrap();
98+
let queues: Vec<_> = devices
99+
.iter()
100+
.map(|device| {
101+
let rx_queue = device
102+
.rx_queue(RxQueueIndex(u16::try_from(i).unwrap()))
103+
.unwrap();
104+
let tx_queue = device
105+
.tx_queue(TxQueueIndex(u16::try_from(i).unwrap()))
106+
.unwrap();
107+
(rx_queue, tx_queue)
108+
})
109+
.collect();
105110
loop {
106-
let mbufs = rx_queue.receive();
107-
let pkts = mbufs.filter_map(|mbuf| match Packet::new(mbuf) {
108-
Ok(pkt) => {
109-
debug!("packet: {pkt:?}");
110-
Some(pkt)
111-
}
112-
Err(e) => {
113-
trace!("Failed to parse packet: {e:?}");
114-
None
115-
}
116-
});
111+
for (rx_queue, tx_queue) in &queues {
112+
let mbufs = rx_queue.receive();
113+
let pkts = mbufs.filter_map(|mbuf| match Packet::new(mbuf) {
114+
Ok(pkt) => {
115+
debug!("packet: {pkt:?}");
116+
Some(pkt)
117+
}
118+
Err(e) => {
119+
trace!("Failed to parse packet: {e:?}");
120+
None
121+
}
122+
});
117123

118-
let pkts_out = pipeline.process(pkts);
119-
let buffers = pkts_out.filter_map(|pkt| match pkt.serialize() {
120-
Ok(buf) => Some(buf),
121-
Err(e) => {
122-
error!("{e:?}");
123-
None
124-
}
125-
});
126-
tx_queue.transmit(buffers);
124+
let pkts_out = pipeline.process(pkts);
125+
let buffers = pkts_out.filter_map(|pkt| match pkt.serialize() {
126+
Ok(buf) => Some(buf),
127+
Err(e) => {
128+
error!("{e:?}");
129+
None
130+
}
131+
});
132+
tx_queue.transmit(buffers);
133+
}
127134
}
128135
});
129136
});
@@ -135,9 +142,10 @@ impl DriverDpdk {
135142
pub fn start(
136143
args: impl IntoIterator<Item = impl AsRef<str>>,
137144
setup_pipeline: &(impl Sync + Fn() -> DynPipeline<Mbuf>),
138-
) {
139-
let eal = init_eal(args);
145+
) -> (Eal, Vec<Dev>) {
146+
let eal = eal::init(args);
140147
let devices = init_devices(&eal);
141148
start_rte_workers(&devices, setup_pipeline);
149+
(eal, devices)
142150
}
143151
}

dataplane/src/main.rs

Lines changed: 47 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ mod statistics;
1111

1212
use crate::packet_processor::start_router;
1313
use crate::statistics::MetricsServer;
14-
use args::{CmdArgs, Parser};
14+
use args::{LaunchConfiguration, TracingConfigSection};
1515

1616
use drivers::dpdk::DriverDpdk;
1717
use drivers::kernel::DriverKernel;
@@ -53,71 +53,72 @@ fn setup_pipeline<Buf: PacketBufferMut>() -> DynPipeline<Buf> {
5353
}
5454
}
5555

56-
fn process_tracing_cmds(args: &CmdArgs) {
57-
if let Some(tracing) = args.tracing()
56+
fn process_tracing_cmds(cfg: &TracingConfigSection) {
57+
if let Some(tracing) = &cfg.config
5858
&& let Err(e) = get_trace_ctl().setup_from_string(tracing)
5959
{
6060
error!("Invalid tracing configuration: {e}");
6161
panic!("Invalid tracing configuration: {e}");
6262
}
63-
if args.show_tracing_tags() {
64-
let out = get_trace_ctl()
65-
.as_string_by_tag()
66-
.unwrap_or_else(|e| e.to_string());
67-
println!("{out}");
68-
std::process::exit(0);
63+
match cfg.show.tags {
64+
args::TracingDisplayOption::Hide => {}
65+
args::TracingDisplayOption::Show => {
66+
let out = get_trace_ctl()
67+
.as_string_by_tag()
68+
.unwrap_or_else(|e| e.to_string());
69+
println!("{out}");
70+
std::process::exit(0);
71+
}
6972
}
70-
if args.show_tracing_targets() {
73+
if cfg.show.targets == args::TracingDisplayOption::Show {
7174
let out = get_trace_ctl()
7275
.as_string()
7376
.unwrap_or_else(|e| e.to_string());
7477
println!("{out}");
7578
std::process::exit(0);
7679
}
77-
if args.tracing_config_generate() {
78-
let out = get_trace_ctl()
79-
.as_config_string()
80-
.unwrap_or_else(|e| e.to_string());
81-
println!("{out}");
82-
std::process::exit(0);
83-
}
80+
// if args.tracing_config_generate() {
81+
// let out = get_trace_ctl()
82+
// .as_config_string()
83+
// .unwrap_or_else(|e| e.to_string());
84+
// println!("{out}");
85+
// std::process::exit(0);
86+
// }
8487
}
8588

8689
fn main() {
90+
let launch_config = LaunchConfiguration::inherit();
8791
init_logging();
88-
let args = CmdArgs::parse();
89-
process_tracing_cmds(&args);
92+
info!("launch config: {launch_config:?}");
93+
process_tracing_cmds(&launch_config.tracing);
9094

9195
info!("Starting gateway process...");
9296

9397
let (stop_tx, stop_rx) = std::sync::mpsc::channel();
9498
ctrlc::set_handler(move || stop_tx.send(()).expect("Error sending SIGINT signal"))
9599
.expect("failed to set SIGINT handler");
96100

97-
let grpc_addr = match args.get_grpc_address() {
98-
Ok(addr) => addr,
99-
Err(e) => {
100-
error!("Invalid gRPC address configuration: {e}");
101-
panic!("Management service configuration error. Aborting...");
102-
}
103-
};
101+
let grpc_addr = launch_config.config_server.address;
104102

105103
/* router parameters */
106-
let Ok(config) = RouterParamsBuilder::default()
107-
.metrics_addr(args.metrics_address())
108-
.cli_sock_path(args.cli_sock_path())
109-
.cpi_sock_path(args.cpi_sock_path())
110-
.frr_agent_path(args.frr_agent_path())
104+
let config = match RouterParamsBuilder::default()
105+
.metrics_addr(launch_config.metrics.address)
106+
.cli_sock_path(launch_config.cli.cli_sock_path)
107+
.cpi_sock_path(launch_config.routing.control_plane_socket)
108+
.frr_agent_path(launch_config.routing.frr_agent_socket)
111109
.build()
112-
else {
113-
error!("Bad router configuration");
114-
panic!("Bad router configuration");
110+
{
111+
Ok(config) => config,
112+
Err(e) => {
113+
error!("error building router parameters: {e}");
114+
panic!("error building router parameters: {e}");
115+
}
115116
};
116117

117118
// start the router; returns control-plane handles and a pipeline factory (Arc<... Fn() -> DynPipeline<_> >)
118119
let setup = start_router(config).expect("failed to start router");
119120

120-
MetricsServer::new(args.metrics_address(), setup.stats);
121+
let _metrics_server = MetricsServer::new(launch_config.metrics.address, setup.stats);
121122

122123
/* pipeline builder */
123124
let pipeline_factory = setup.pipeline;
@@ -135,25 +136,19 @@ fn main() {
135136
.expect("Failed to start gRPC server");
136137

137138
/* start driver with the provided pipeline builder */
138-
match args.get_driver_name() {
139-
"dpdk" => {
140-
info!("Using driver DPDK...");
141-
DriverDpdk::start(args.eal_params(), &setup_pipeline);
142-
}
143-
"kernel" => {
144-
info!("Using driver kernel...");
145-
DriverKernel::start(
146-
args.kernel_interfaces(),
147-
args.kernel_num_workers(),
148-
&pipeline_factory,
149-
);
139+
let _keep = match &launch_config.driver {
140+
args::DriverConfigSection::Dpdk(dpdk_driver_config) => {
141+
let (eal, devices) =
142+
DriverDpdk::start(dpdk_driver_config.eal_args.clone(), &setup_pipeline);
143+
info!("Now using driver DPDK...");
144+
Some((eal, devices))
150145
}
151-
other => {
152-
error!("Unknown driver '{other}'. Aborting...");
153-
panic!("Packet processing pipeline failed to start. Aborting...");
146+
args::DriverConfigSection::Kernel(kernel_driver_config) => {
147+
DriverKernel::start(&kernel_driver_config.interfaces, 2, &pipeline_factory);
148+
info!("Now using driver kernel...");
149+
None
154150
}
155-
}
156-
151+
};
157152
stop_rx.recv().expect("failed to receive stop signal");
158153
info!("Shutting down dataplane");
159154
std::process::exit(0);

0 commit comments

Comments
 (0)