Merged
239 changes: 220 additions & 19 deletions vmm/src/api/http/mod.rs
@@ -6,21 +6,24 @@
use std::collections::BTreeMap;
use std::error::Error;
use std::fs::File;
use std::os::fd::AsRawFd;
use std::os::unix::io::{IntoRawFd, RawFd};
use std::os::unix::net::UnixListener;
use std::panic::AssertUnwindSafe;
use std::path::PathBuf;
use std::sync::LazyLock;
use std::sync::mpsc::Sender;
use std::sync::mpsc::{Receiver, Sender, channel, sync_channel};
use std::sync::{Arc, LazyLock, Mutex};
use std::thread;

use hypervisor::HypervisorType;
use micro_http::{
Body, HttpServer, MediaType, Method, Request, Response, ServerError, StatusCode, Version,
Body, HttpServer, MediaType, Method, Request, Response, ServerError, ServerRequest,
ServerResponse, StatusCode, Version,
};
use seccompiler::{SeccompAction, apply_filter};
use serde_json::Error as SerdeError;
use thiserror::Error;
use vmm_sys_util::epoll::{ControlOperation, Epoll, EpollEvent, EventSet};
use vmm_sys_util::eventfd::EventFd;

use self::http_endpoint::{VmActionHandler, VmCreate, VmInfo, VmmPing, VmmShutdown};
@@ -314,6 +317,152 @@ fn handle_http_request(
response
}

/// Keeps track of the worker threads, and the resources needed to interact
/// with them.
#[derive(Debug)]
struct HttpWorkerThreads {
// The worker threads themselves.
threads: Vec<thread::JoinHandle<Result<()>>>,
// An MPSC channel to send server requests to the workers. We put it into
// an option so we can easily drop it in the destructor.
request_tx: Option<Sender<ServerRequest>>,
// An MPSC channel that the workers use to send responses to the HTTP
// server thread.
response_rx: Receiver<ServerResponse>,
// Workers signal this eventfd when they have a response for the HTTP
// server thread.
response_event: EventFd,
Member:
Theoretically we could also get rid of this and use a channel from the workers to the server, right? Then all threads would have the same sender (without the need for Arc<Mutex>, as this works out of the box) and the main thread the receiver.

Thoughts?

Author:

We have a channel from the worker threads to the main thread, but I don't understand why that means we can get rid of this eventfd. The main thread has to monitor two things:

  • Requests from the API, which it has to distribute to the workers
  • Responses from the workers, which it has to forward to the http_server

The only usable mechanism here (that I know of) is epoll, and we need this eventfd for that.

Member:

From offline discussion: it is okay to keep, as we need eventfd handling with the underlying http server anyway.

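For context, here is a minimal, self-contained sketch of the pattern discussed in this thread: a worker sends its result over an mpsc channel and then signals an eventfd, and the main thread waits on a single epoll instance that covers that eventfd (and could cover any other fds). It reuses the vmm-sys-util and libc APIs already imported in this file; the names (`done`, `WORKER_TOKEN`) are illustrative only and not part of the change.

```rust
use std::os::fd::AsRawFd;
use std::sync::mpsc::channel;
use std::thread;

use vmm_sys_util::epoll::{ControlOperation, Epoll, EpollEvent, EventSet};
use vmm_sys_util::eventfd::EventFd;

const WORKER_TOKEN: u64 = 1;

fn main() {
    let (tx, rx) = channel::<String>();
    let done = EventFd::new(libc::EFD_NONBLOCK).unwrap();
    let done_clone = done.try_clone().unwrap();

    // Worker: push the result on the channel, then wake the main thread by
    // writing to the eventfd.
    thread::spawn(move || {
        tx.send(String::from("response")).unwrap();
        done_clone.write(1).unwrap();
    });

    // Main thread: a single epoll wait covers the eventfd; each registered fd
    // carries its own token so we can tell the sources apart.
    let epoll = Epoll::new().unwrap();
    epoll
        .ctl(
            ControlOperation::Add,
            done.as_raw_fd(),
            EpollEvent::new(EventSet::IN, WORKER_TOKEN),
        )
        .unwrap();

    let mut events = vec![EpollEvent::default(); 8];
    let n = epoll.wait(-1, &mut events).unwrap();
    for ev in events.iter().take(n) {
        if ev.data() == WORKER_TOKEN {
            // Clear the eventfd, then drain the channel.
            let _ = done.read();
            println!("got: {}", rx.recv().unwrap());
        }
    }
}
```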
}

impl HttpWorkerThreads {
fn new(
thread_count: usize,
api_notifier: EventFd,
api_sender: Sender<ApiRequest>,
seccomp_action: &SeccompAction,
hypervisor_type: HypervisorType,
landlock_enable: bool,
exit_evt: EventFd,
) -> Result<Self> {
let response_event = EventFd::new(libc::EFD_NONBLOCK).map_err(VmmError::EventFdCreate)?;
let (response_tx, response_rx) = sync_channel::<ServerResponse>(thread_count);

let mut threads = Vec::new();
let (request_tx, request_rx) = channel::<ServerRequest>();

let request_rx = Arc::new(Mutex::new(request_rx));

// We use the same seccomp filter that we already use for the HTTP server thread.
let api_seccomp_filter =
get_seccomp_filter(seccomp_action, Thread::HttpApi, hypervisor_type)
.map_err(VmmError::CreateSeccompFilter)?;

for n in 0..thread_count {
let response_event = response_event.try_clone().map_err(VmmError::EventFdClone)?;

let response_tx = response_tx.clone();
let request_rx = request_rx.clone();

let api_notifier = api_notifier.try_clone().map_err(VmmError::EventFdClone)?;
let api_sender = api_sender.clone();

let api_seccomp_filter = api_seccomp_filter.clone();
let exit_evt = exit_evt.try_clone().map_err(VmmError::EventFdClone)?;

let thread = thread::Builder::new()
.name(format!("http-worker-{n}").to_string())
.spawn(move || {
debug!("Spawned HTTP worker thread with id {n}",);
if !api_seccomp_filter.is_empty() {
apply_filter(&api_seccomp_filter)
.map_err(VmmError::ApplySeccompFilter)
.map_err(|e| {
error!("Error applying seccomp filter: {:?}", e);
exit_evt.write(1).ok();
e
})?;
}

if landlock_enable {
Landlock::new()
.map_err(VmmError::CreateLandlock)?
.restrict_self()
.map_err(VmmError::ApplyLandlock)
.map_err(|e| {
error!("Error applying landlock to http-worker thread: {:?}", e);
exit_evt.write(1).ok();
e
})?;
}

std::panic::catch_unwind(AssertUnwindSafe(move || {
let id = n;
loop {
let request = request_rx.lock().unwrap().recv();
match request {
Ok(msg) => {
// Process the server request
let response = msg.process(|request| {
handle_http_request(request, &api_notifier, &api_sender)
});

// Send the response to the HTTP server thread together with this
// thread's id.
if let Err(e) = response_tx.send(response) {
error!(
"HTTP worker thread {id}: error sending response {}",
e
);
break;
}

// Notify the HTTP server thread.
response_event.write(1).ok();
}
Err(e) => {
error!(
"HTTP worker thread {id}: error receiving request {}",
e
);
break;
}
}
}
}))
.map_err(|_| {
error!("http-worker thread {n} panicked");
exit_evt.write(1).ok()
})
.ok();

Ok(())
})
.map_err(VmmError::HttpThreadSpawn)?;

threads.push(thread);
}

Ok(Self {
threads,
request_tx: Some(request_tx),
response_rx,
response_event,
})
}
}

impl Drop for HttpWorkerThreads {
fn drop(&mut self) {
// Drop the Sender side of the request channel so the worker threads
// fall out of their receive loops.
drop(self.request_tx.take());
// Now we can join each thread.
self.threads
.drain(..)
.for_each(|thread| thread.join().unwrap().unwrap());
}
}

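The Drop impl above relies on a standard std::sync::mpsc property: once the last Sender is dropped, any blocked recv() returns Err, so the workers fall out of their loops and join() can complete. A minimal standalone illustration of that idiom (all names hypothetical):

```rust
use std::sync::mpsc::channel;
use std::thread;

fn main() {
    let (tx, rx) = channel::<u32>();

    let worker = thread::spawn(move || {
        // Loop until the sending side disappears.
        while let Ok(job) = rx.recv() {
            println!("processing job {job}");
        }
        println!("channel closed, worker exiting");
    });

    tx.send(1).unwrap();
    tx.send(2).unwrap();

    // Dropping the last Sender closes the channel...
    drop(tx);
    // ...so the worker's recv() fails and the thread can be joined.
    worker.join().unwrap();
}
```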
fn start_http_thread(
mut server: HttpServer,
api_notifier: EventFd,
@@ -334,6 +483,42 @@ fn start_http_thread(
.add_kill_switch(api_shutdown_fd_clone)
.map_err(VmmError::CreateApiServer)?;

// We use epoll to wait on both event sources at once. The epoll tokens
// are attached when registering the FDs with epoll; that way we can
// later tell why we were notified.
const HTTP_EPOLL_TOKEN: u64 = 1;
const WORKER_EPOLL_TOKEN: u64 = 2;

// The epoll instance our HTTP server thread will wait on.
let outer_epoll = Epoll::new().unwrap();
let worker_threads = HttpWorkerThreads::new(
2,
api_notifier,
api_sender,
seccomp_action,
hypervisor_type,
landlock_enable,
exit_evt.try_clone().unwrap(),
)?;

// Register the fd that the worker threads will signal.
outer_epoll
.ctl(
ControlOperation::Add,
worker_threads.response_event.as_raw_fd(),
EpollEvent::new(EventSet::IN, WORKER_EPOLL_TOKEN),
)
.unwrap();

// Register the HttpServer's fd.
outer_epoll
.ctl(
ControlOperation::Add,
server.epoll().as_raw_fd(),
EpollEvent::new(EventSet::IN, HTTP_EPOLL_TOKEN),
)
.unwrap();

let thread = thread::Builder::new()
.name("http-server".to_string())
.spawn(move || {
@@ -361,27 +546,43 @@
}

std::panic::catch_unwind(AssertUnwindSafe(move || {
let mut events = vec![EpollEvent::default(); 32];
server.start_server().unwrap();

loop {
match server.requests() {
Ok(request_vec) => {
for server_request in request_vec {
if let Err(e) = server.respond(server_request.process(|request| {
handle_http_request(request, &api_notifier, &api_sender)
})) {
let n = outer_epoll.wait(-1, &mut events).unwrap();
for ev in events.iter().take(n) {
match ev.data() {
HTTP_EPOLL_TOKEN => {
// The HttpServer got a request, handle that.
match server.requests() {
Ok(request_vec) => {
for server_request in request_vec {
worker_threads.request_tx.as_ref().unwrap().send(server_request).unwrap();
}
}
Err(ServerError::ShutdownEvent) => {
server.flush_outgoing_writes();
return;
}
Err(e) => {
error!(
"HTTP server error on retrieving incoming request. Error: {}",
e
);
}
}
}
WORKER_EPOLL_TOKEN => {
// One of the worker threads has a response.
// We clear the eventfd first.
let _ = worker_threads.response_event.read().unwrap();
let response = worker_threads.response_rx.recv().unwrap();
if let Err(e) = server.respond(response) {
error!("HTTP server error on response: {}", e);
}
}
}
Err(ServerError::ShutdownEvent) => {
server.flush_outgoing_writes();
return;
}
Err(e) => {
error!(
"HTTP server error on retrieving incoming request. Error: {}",
e
);
_ => { }
}
}
}
1 change: 1 addition & 0 deletions vmm/src/seccomp_filters.rs
@@ -868,6 +868,7 @@ fn http_api_thread_rules() -> Result<Vec<(i64, Vec<SeccompRule>)>, BackendError>
(libc::SYS_rt_sigprocmask, vec![]),
(libc::SYS_getcwd, vec![]),
(libc::SYS_clock_nanosleep, vec![]),
(libc::SYS_read, vec![]),
])
}

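The added SYS_read rule is presumably needed because the HTTP server thread now calls EventFd::read() to clear the workers' notification eventfd, and that call is a plain read(2) on the fd. A small sketch of the eventfd counter semantics involved, using the same vmm-sys-util EventFd API as the PR (not part of the change):

```rust
use vmm_sys_util::eventfd::EventFd;

fn main() {
    let evt = EventFd::new(libc::EFD_NONBLOCK).unwrap();

    // Two notifications before anyone looks at the fd...
    evt.write(1).unwrap();
    evt.write(1).unwrap();

    // ...are collapsed into a single read that returns the summed counter
    // (2 here) and resets it to zero.
    assert_eq!(evt.read().unwrap(), 2);

    // With EFD_NONBLOCK, reading an empty eventfd fails with EAGAIN instead
    // of blocking.
    assert!(evt.read().is_err());
}
```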