66use std:: collections:: BTreeMap ;
77use std:: error:: Error ;
88use std:: fs:: File ;
9+ use std:: os:: fd:: AsRawFd ;
910use std:: os:: unix:: io:: { IntoRawFd , RawFd } ;
1011use std:: os:: unix:: net:: UnixListener ;
1112use std:: panic:: AssertUnwindSafe ;
1213use std:: path:: PathBuf ;
13- use std:: sync:: LazyLock ;
1414use std:: sync:: mpsc:: Sender ;
15+ use std:: sync:: { Arc , LazyLock , Mutex } ;
1516use std:: thread;
1617
1718use hypervisor:: HypervisorType ;
1819use micro_http:: {
19- Body , HttpServer , MediaType , Method , Request , Response , ServerError , StatusCode , Version ,
20+ Body , HttpServer , MediaType , Method , Request , Response , ServerError , ServerRequest ,
21+ ServerResponse , StatusCode , Version ,
2022} ;
2123use seccompiler:: { SeccompAction , apply_filter} ;
2224use serde_json:: Error as SerdeError ;
2325use thiserror:: Error ;
26+ use vmm_sys_util:: epoll:: { ControlOperation , Epoll , EpollEvent , EventSet } ;
2427use vmm_sys_util:: eventfd:: EventFd ;
2528
2629use self :: http_endpoint:: { VmActionHandler , VmCreate , VmInfo , VmmPing , VmmShutdown } ;
@@ -314,6 +317,153 @@ fn handle_http_request(
314317 response
315318}
316319
320+ /// Keeps track of the worker threads, and the resources needed to interact
321+ /// with them.
322+ #[ derive( Debug ) ]
323+ struct HttpWorkerThreads {
324+ // The worker threads themselves.
325+ threads : Vec < thread:: JoinHandle < Result < ( ) > > > ,
326+ // An MPSC channel to send server requests to the workers. We put it into
327+ // an option so we can easily drop it in the destructor.
328+ request_tx : Option < std:: sync:: mpsc:: Sender < ServerRequest > > ,
329+ // An MPSC channel that the workers use to send responses to the HTTP
330+ // server thread.
331+ response_rx : std:: sync:: mpsc:: Receiver < ServerResponse > ,
332+ // Workers signal this eventfd when they have a response for the HTTP
333+ // server thread.
334+ response_event : EventFd ,
335+ }
336+
337+ impl HttpWorkerThreads {
338+ fn new (
339+ thread_count : usize ,
340+ api_notifier : EventFd ,
341+ api_sender : Sender < ApiRequest > ,
342+ seccomp_action : & SeccompAction ,
343+ hypervisor_type : HypervisorType ,
344+ landlock_enable : bool ,
345+ exit_evt : EventFd ,
346+ ) -> Result < Self > {
347+ let response_event = EventFd :: new ( libc:: EFD_NONBLOCK ) . map_err ( VmmError :: EventFdCreate ) ?;
348+ let ( response_tx, response_rx) =
349+ std:: sync:: mpsc:: sync_channel :: < ServerResponse > ( thread_count) ;
350+
351+ let mut threads = Vec :: new ( ) ;
352+ let ( request_tx, request_rx) = std:: sync:: mpsc:: channel :: < ServerRequest > ( ) ;
353+
354+ let request_rx = Arc :: new ( Mutex :: new ( request_rx) ) ;
355+
356+ // We use the same seccomp filter that we already use for the HTTP server thread.
357+ let api_seccomp_filter =
358+ get_seccomp_filter ( seccomp_action, Thread :: HttpApi , hypervisor_type)
359+ . map_err ( VmmError :: CreateSeccompFilter ) ?;
360+
361+ for n in 0 ..thread_count {
362+ let response_event = response_event. try_clone ( ) . map_err ( VmmError :: EventFdClone ) ?;
363+
364+ let response_tx = response_tx. clone ( ) ;
365+ let request_rx = request_rx. clone ( ) ;
366+
367+ let api_notifier = api_notifier. try_clone ( ) . map_err ( VmmError :: EventFdClone ) ?;
368+ let api_sender = api_sender. clone ( ) ;
369+
370+ let api_seccomp_filter = api_seccomp_filter. clone ( ) ;
371+ let exit_evt = exit_evt. try_clone ( ) . map_err ( VmmError :: EventFdClone ) ?;
372+
373+ let thread = thread:: Builder :: new ( )
374+ . name ( format ! ( "http-worker-{n}" ) . to_string ( ) )
375+ . spawn ( move || {
376+ info ! ( "Spawned HTTP worker thread with id {n}" , ) ;
377+ if !api_seccomp_filter. is_empty ( ) {
378+ apply_filter ( & api_seccomp_filter)
379+ . map_err ( VmmError :: ApplySeccompFilter )
380+ . map_err ( |e| {
381+ error ! ( "Error applying seccomp filter: {:?}" , e) ;
382+ exit_evt. write ( 1 ) . ok ( ) ;
383+ e
384+ } ) ?;
385+ }
386+
387+ if landlock_enable {
388+ Landlock :: new ( )
389+ . map_err ( VmmError :: CreateLandlock ) ?
390+ . restrict_self ( )
391+ . map_err ( VmmError :: ApplyLandlock )
392+ . map_err ( |e| {
393+ error ! ( "Error applying landlock to http-worker thread: {:?}" , e) ;
394+ exit_evt. write ( 1 ) . ok ( ) ;
395+ e
396+ } ) ?;
397+ }
398+
399+ std:: panic:: catch_unwind ( AssertUnwindSafe ( move || {
400+ let id = n;
401+ loop {
402+ let request = request_rx. lock ( ) . unwrap ( ) . recv ( ) ;
403+ match request {
404+ Ok ( msg) => {
405+ // Process the server request
406+ let response = msg. process ( |request| {
407+ handle_http_request ( request, & api_notifier, & api_sender)
408+ } ) ;
409+
410+ // Send the response to the HTTP server thread together with this
411+ // threads id.
412+ if let Err ( e) = response_tx. send ( response) {
413+ error ! (
414+ "HTTP worker thread {id}: error sending response {}" ,
415+ e
416+ ) ;
417+ break ;
418+ }
419+
420+ // Notify the HTTP server thread.
421+ response_event. write ( 1 ) . ok ( ) ;
422+ }
423+ Err ( e) => {
424+ error ! (
425+ "HTTP worker thread {id}: error receiving request {}" ,
426+ e
427+ ) ;
428+ break ;
429+ }
430+ }
431+ }
432+ } ) )
433+ . map_err ( |_| {
434+ error ! ( "http-worker thread {n} panicked" ) ;
435+ exit_evt. write ( 1 ) . ok ( )
436+ } )
437+ . ok ( ) ;
438+
439+ Ok ( ( ) )
440+ } )
441+ . map_err ( VmmError :: HttpThreadSpawn ) ?;
442+
443+ threads. push ( thread) ;
444+ }
445+
446+ Ok ( Self {
447+ threads,
448+ request_tx : Some ( request_tx) ,
449+ response_rx,
450+ response_event,
451+ } )
452+ }
453+ }
454+
455+ impl Drop for HttpWorkerThreads {
456+ fn drop ( & mut self ) {
457+ // Dropping the Sender side of the request channels to throw the worker
458+ // threads out of their loops.
459+ drop ( self . request_tx . take ( ) ) ;
460+ // Now we can join each thread.
461+ self . threads
462+ . drain ( ..)
463+ . for_each ( |thread| thread. join ( ) . unwrap ( ) . unwrap ( ) ) ;
464+ }
465+ }
466+
317467fn start_http_thread (
318468 mut server : HttpServer ,
319469 api_notifier : EventFd ,
@@ -334,6 +484,42 @@ fn start_http_thread(
334484 . add_kill_switch ( api_shutdown_fd_clone)
335485 . map_err ( VmmError :: CreateApiServer ) ?;
336486
487+ // We use the epoll mechanism to parallelize this. The epoll tokens are
488+ // attached when registering the FDs with epoll. That way we can later
489+ // check why we were notified.
490+ const HTTP_EPOLL_TOKEN : u64 = 1 ;
491+ const WORKER_EPOLL_TOKEN : u64 = 2 ;
492+
493+ // The epoll instance our HTTP server thread will wait on.
494+ let outer_epoll = Epoll :: new ( ) . unwrap ( ) ;
495+ let worker_threads = HttpWorkerThreads :: new (
496+ 2 ,
497+ api_notifier,
498+ api_sender,
499+ seccomp_action,
500+ hypervisor_type,
501+ landlock_enable,
502+ exit_evt. try_clone ( ) . unwrap ( ) ,
503+ ) ?;
504+
505+ // Register the fd that the worker threads will signal.
506+ outer_epoll
507+ . ctl (
508+ ControlOperation :: Add ,
509+ worker_threads. response_event . as_raw_fd ( ) ,
510+ EpollEvent :: new ( EventSet :: IN , WORKER_EPOLL_TOKEN ) ,
511+ )
512+ . unwrap ( ) ;
513+
514+ // Register the HttpServer's fd.
515+ outer_epoll
516+ . ctl (
517+ ControlOperation :: Add ,
518+ server. epoll ( ) . as_raw_fd ( ) ,
519+ EpollEvent :: new ( EventSet :: IN , HTTP_EPOLL_TOKEN ) ,
520+ )
521+ . unwrap ( ) ;
522+
337523 let thread = thread:: Builder :: new ( )
338524 . name ( "http-server" . to_string ( ) )
339525 . spawn ( move || {
@@ -361,27 +547,43 @@ fn start_http_thread(
361547 }
362548
363549 std:: panic:: catch_unwind ( AssertUnwindSafe ( move || {
550+ let mut events = vec ! [ EpollEvent :: default ( ) ; 32 ] ;
364551 server. start_server ( ) . unwrap ( ) ;
552+
365553 loop {
366- match server. requests ( ) {
367- Ok ( request_vec) => {
368- for server_request in request_vec {
369- if let Err ( e) = server. respond ( server_request. process ( |request| {
370- handle_http_request ( request, & api_notifier, & api_sender)
371- } ) ) {
554+ let n = outer_epoll. wait ( -1 , & mut events) . unwrap ( ) ;
555+ for ev in events. iter ( ) . take ( n) {
556+ match ev. data ( ) {
557+ HTTP_EPOLL_TOKEN => {
558+ // The HttpServer got a request, handle that.
559+ match server. requests ( ) {
560+ Ok ( request_vec) => {
561+ for server_request in request_vec {
562+ worker_threads. request_tx . as_ref ( ) . unwrap ( ) . send ( server_request) . unwrap ( ) ;
563+ }
564+ }
565+ Err ( ServerError :: ShutdownEvent ) => {
566+ server. flush_outgoing_writes ( ) ;
567+ return ;
568+ }
569+ Err ( e) => {
570+ error ! (
571+ "HTTP server error on retrieving incoming request. Error: {}" ,
572+ e
573+ ) ;
574+ }
575+ }
576+ }
577+ WORKER_EPOLL_TOKEN => {
578+ // One of the worker threads has a response.
579+ // We clear the eventfd first.
580+ let _ = worker_threads. response_event . read ( ) . unwrap ( ) ;
581+ let response = worker_threads. response_rx . recv ( ) . unwrap ( ) ;
582+ if let Err ( e) = server. respond ( response) {
372583 error ! ( "HTTP server error on response: {}" , e) ;
373584 }
374585 }
375- }
376- Err ( ServerError :: ShutdownEvent ) => {
377- server. flush_outgoing_writes ( ) ;
378- return ;
379- }
380- Err ( e) => {
381- error ! (
382- "HTTP server error on retrieving incoming request. Error: {}" ,
383- e
384- ) ;
586+ _ => { }
385587 }
386588 }
387589 }
0 commit comments