diff --git a/vmm/src/api/http/mod.rs b/vmm/src/api/http/mod.rs index 4dfbf7b9b0..eaa18d311b 100644 --- a/vmm/src/api/http/mod.rs +++ b/vmm/src/api/http/mod.rs @@ -6,21 +6,24 @@ use std::collections::BTreeMap; use std::error::Error; use std::fs::File; +use std::os::fd::AsRawFd; use std::os::unix::io::{IntoRawFd, RawFd}; use std::os::unix::net::UnixListener; use std::panic::AssertUnwindSafe; use std::path::PathBuf; -use std::sync::LazyLock; -use std::sync::mpsc::Sender; +use std::sync::mpsc::{Receiver, Sender, channel, sync_channel}; +use std::sync::{Arc, LazyLock, Mutex}; use std::thread; use hypervisor::HypervisorType; use micro_http::{ - Body, HttpServer, MediaType, Method, Request, Response, ServerError, StatusCode, Version, + Body, HttpServer, MediaType, Method, Request, Response, ServerError, ServerRequest, + ServerResponse, StatusCode, Version, }; use seccompiler::{SeccompAction, apply_filter}; use serde_json::Error as SerdeError; use thiserror::Error; +use vmm_sys_util::epoll::{ControlOperation, Epoll, EpollEvent, EventSet}; use vmm_sys_util::eventfd::EventFd; use self::http_endpoint::{VmActionHandler, VmCreate, VmInfo, VmmPing, VmmShutdown}; @@ -314,6 +317,152 @@ fn handle_http_request( response } +/// Keeps track of the worker threads, and the resources needed to interact +/// with them. +#[derive(Debug)] +struct HttpWorkerThreads { + // The worker threads themselves. + threads: Vec>>, + // An MPSC channel to send server requests to the workers. We put it into + // an option so we can easily drop it in the destructor. + request_tx: Option>, + // An MPSC channel that the workers use to send responses to the HTTP + // server thread. + response_rx: Receiver, + // Workers signal this eventfd when they have a response for the HTTP + // server thread. + response_event: EventFd, +} + +impl HttpWorkerThreads { + fn new( + thread_count: usize, + api_notifier: EventFd, + api_sender: Sender, + seccomp_action: &SeccompAction, + hypervisor_type: HypervisorType, + landlock_enable: bool, + exit_evt: EventFd, + ) -> Result { + let response_event = EventFd::new(libc::EFD_NONBLOCK).map_err(VmmError::EventFdCreate)?; + let (response_tx, response_rx) = sync_channel::(thread_count); + + let mut threads = Vec::new(); + let (request_tx, request_rx) = channel::(); + + let request_rx = Arc::new(Mutex::new(request_rx)); + + // We use the same seccomp filter that we already use for the HTTP server thread. + let api_seccomp_filter = + get_seccomp_filter(seccomp_action, Thread::HttpApi, hypervisor_type) + .map_err(VmmError::CreateSeccompFilter)?; + + for n in 0..thread_count { + let response_event = response_event.try_clone().map_err(VmmError::EventFdClone)?; + + let response_tx = response_tx.clone(); + let request_rx = request_rx.clone(); + + let api_notifier = api_notifier.try_clone().map_err(VmmError::EventFdClone)?; + let api_sender = api_sender.clone(); + + let api_seccomp_filter = api_seccomp_filter.clone(); + let exit_evt = exit_evt.try_clone().map_err(VmmError::EventFdClone)?; + + let thread = thread::Builder::new() + .name(format!("http-worker-{n}").to_string()) + .spawn(move || { + debug!("Spawned HTTP worker thread with id {n}",); + if !api_seccomp_filter.is_empty() { + apply_filter(&api_seccomp_filter) + .map_err(VmmError::ApplySeccompFilter) + .map_err(|e| { + error!("Error applying seccomp filter: {:?}", e); + exit_evt.write(1).ok(); + e + })?; + } + + if landlock_enable { + Landlock::new() + .map_err(VmmError::CreateLandlock)? + .restrict_self() + .map_err(VmmError::ApplyLandlock) + .map_err(|e| { + error!("Error applying landlock to http-worker thread: {:?}", e); + exit_evt.write(1).ok(); + e + })?; + } + + std::panic::catch_unwind(AssertUnwindSafe(move || { + let id = n; + loop { + let request = request_rx.lock().unwrap().recv(); + match request { + Ok(msg) => { + // Process the server request + let response = msg.process(|request| { + handle_http_request(request, &api_notifier, &api_sender) + }); + + // Send the response to the HTTP server thread together with this + // threads id. + if let Err(e) = response_tx.send(response) { + error!( + "HTTP worker thread {id}: error sending response {}", + e + ); + break; + } + + // Notify the HTTP server thread. + response_event.write(1).ok(); + } + Err(e) => { + error!( + "HTTP worker thread {id}: error receiving request {}", + e + ); + break; + } + } + } + })) + .map_err(|_| { + error!("http-worker thread {n} panicked"); + exit_evt.write(1).ok() + }) + .ok(); + + Ok(()) + }) + .map_err(VmmError::HttpThreadSpawn)?; + + threads.push(thread); + } + + Ok(Self { + threads, + request_tx: Some(request_tx), + response_rx, + response_event, + }) + } +} + +impl Drop for HttpWorkerThreads { + fn drop(&mut self) { + // Dropping the Sender side of the request channels to throw the worker + // threads out of their loops. + drop(self.request_tx.take()); + // Now we can join each thread. + self.threads + .drain(..) + .for_each(|thread| thread.join().unwrap().unwrap()); + } +} + fn start_http_thread( mut server: HttpServer, api_notifier: EventFd, @@ -334,6 +483,42 @@ fn start_http_thread( .add_kill_switch(api_shutdown_fd_clone) .map_err(VmmError::CreateApiServer)?; + // We use the epoll mechanism to parallelize this. The epoll tokens are + // attached when registering the FDs with epoll. That way we can later + // check why we were notified. + const HTTP_EPOLL_TOKEN: u64 = 1; + const WORKER_EPOLL_TOKEN: u64 = 2; + + // The epoll instance our HTTP server thread will wait on. + let outer_epoll = Epoll::new().unwrap(); + let worker_threads = HttpWorkerThreads::new( + 2, + api_notifier, + api_sender, + seccomp_action, + hypervisor_type, + landlock_enable, + exit_evt.try_clone().unwrap(), + )?; + + // Register the fd that the worker threads will signal. + outer_epoll + .ctl( + ControlOperation::Add, + worker_threads.response_event.as_raw_fd(), + EpollEvent::new(EventSet::IN, WORKER_EPOLL_TOKEN), + ) + .unwrap(); + + // Register the HttpServer's fd. + outer_epoll + .ctl( + ControlOperation::Add, + server.epoll().as_raw_fd(), + EpollEvent::new(EventSet::IN, HTTP_EPOLL_TOKEN), + ) + .unwrap(); + let thread = thread::Builder::new() .name("http-server".to_string()) .spawn(move || { @@ -361,27 +546,43 @@ fn start_http_thread( } std::panic::catch_unwind(AssertUnwindSafe(move || { + let mut events = vec![EpollEvent::default(); 32]; server.start_server().unwrap(); + loop { - match server.requests() { - Ok(request_vec) => { - for server_request in request_vec { - if let Err(e) = server.respond(server_request.process(|request| { - handle_http_request(request, &api_notifier, &api_sender) - })) { + let n = outer_epoll.wait(-1, &mut events).unwrap(); + for ev in events.iter().take(n) { + match ev.data() { + HTTP_EPOLL_TOKEN => { + // The HttpServer got a request, handle that. + match server.requests() { + Ok(request_vec) => { + for server_request in request_vec { + worker_threads.request_tx.as_ref().unwrap().send(server_request).unwrap(); + } + } + Err(ServerError::ShutdownEvent) => { + server.flush_outgoing_writes(); + return; + } + Err(e) => { + error!( + "HTTP server error on retrieving incoming request. Error: {}", + e + ); + } + } + } + WORKER_EPOLL_TOKEN => { + // One of the worker threads has a response. + // We clear the eventfd first. + let _ = worker_threads.response_event.read().unwrap(); + let response = worker_threads.response_rx.recv().unwrap(); + if let Err(e) = server.respond(response){ error!("HTTP server error on response: {}", e); } } - } - Err(ServerError::ShutdownEvent) => { - server.flush_outgoing_writes(); - return; - } - Err(e) => { - error!( - "HTTP server error on retrieving incoming request. Error: {}", - e - ); + _ => { } } } } diff --git a/vmm/src/seccomp_filters.rs b/vmm/src/seccomp_filters.rs index 44ba48d293..a359a929e5 100644 --- a/vmm/src/seccomp_filters.rs +++ b/vmm/src/seccomp_filters.rs @@ -868,6 +868,7 @@ fn http_api_thread_rules() -> Result)>, BackendError> (libc::SYS_rt_sigprocmask, vec![]), (libc::SYS_getcwd, vec![]), (libc::SYS_clock_nanosleep, vec![]), + (libc::SYS_read, vec![]), ]) }