66use std:: collections:: BTreeMap ;
77use std:: error:: Error ;
88use std:: fs:: File ;
9+ use std:: os:: fd:: AsRawFd ;
910use std:: os:: unix:: io:: { IntoRawFd , RawFd } ;
1011use std:: os:: unix:: net:: UnixListener ;
1112use std:: panic:: AssertUnwindSafe ;
1213use std:: path:: PathBuf ;
13- use std:: sync:: LazyLock ;
14- use std:: sync:: mpsc :: Sender ;
14+ use std:: sync:: mpsc :: { Receiver , Sender , channel , sync_channel } ;
15+ use std:: sync:: { Arc , LazyLock , Mutex } ;
1516use std:: thread;
1617
1718use hypervisor:: HypervisorType ;
1819use micro_http:: {
19- Body , HttpServer , MediaType , Method , Request , Response , ServerError , StatusCode , Version ,
20+ Body , HttpServer , MediaType , Method , Request , Response , ServerError , ServerRequest ,
21+ ServerResponse , StatusCode , Version ,
2022} ;
2123use seccompiler:: { SeccompAction , apply_filter} ;
2224use serde_json:: Error as SerdeError ;
2325use thiserror:: Error ;
26+ use vmm_sys_util:: epoll:: { ControlOperation , Epoll , EpollEvent , EventSet } ;
2427use vmm_sys_util:: eventfd:: EventFd ;
2528
2629use self :: http_endpoint:: { VmActionHandler , VmCreate , VmInfo , VmmPing , VmmShutdown } ;
@@ -314,6 +317,152 @@ fn handle_http_request(
314317 response
315318}
316319
320+ /// Keeps track of the worker threads, and the resources needed to interact
321+ /// with them.
322+ #[ derive( Debug ) ]
323+ struct HttpWorkerThreads {
324+ // The worker threads themselves.
325+ threads : Vec < thread:: JoinHandle < Result < ( ) > > > ,
326+ // An MPSC channel to send server requests to the workers. We put it into
327+ // an option so we can easily drop it in the destructor.
328+ request_tx : Option < Sender < ServerRequest > > ,
329+ // An MPSC channel that the workers use to send responses to the HTTP
330+ // server thread.
331+ response_rx : Receiver < ServerResponse > ,
332+ // Workers signal this eventfd when they have a response for the HTTP
333+ // server thread.
334+ response_event : EventFd ,
335+ }
336+
337+ impl HttpWorkerThreads {
338+ fn new (
339+ thread_count : usize ,
340+ api_notifier : EventFd ,
341+ api_sender : Sender < ApiRequest > ,
342+ seccomp_action : & SeccompAction ,
343+ hypervisor_type : HypervisorType ,
344+ landlock_enable : bool ,
345+ exit_evt : EventFd ,
346+ ) -> Result < Self > {
347+ let response_event = EventFd :: new ( libc:: EFD_NONBLOCK ) . map_err ( VmmError :: EventFdCreate ) ?;
348+ let ( response_tx, response_rx) = sync_channel :: < ServerResponse > ( thread_count) ;
349+
350+ let mut threads = Vec :: new ( ) ;
351+ let ( request_tx, request_rx) = channel :: < ServerRequest > ( ) ;
352+
353+ let request_rx = Arc :: new ( Mutex :: new ( request_rx) ) ;
354+
355+ // We use the same seccomp filter that we already use for the HTTP server thread.
356+ let api_seccomp_filter =
357+ get_seccomp_filter ( seccomp_action, Thread :: HttpApi , hypervisor_type)
358+ . map_err ( VmmError :: CreateSeccompFilter ) ?;
359+
360+ for n in 0 ..thread_count {
361+ let response_event = response_event. try_clone ( ) . map_err ( VmmError :: EventFdClone ) ?;
362+
363+ let response_tx = response_tx. clone ( ) ;
364+ let request_rx = request_rx. clone ( ) ;
365+
366+ let api_notifier = api_notifier. try_clone ( ) . map_err ( VmmError :: EventFdClone ) ?;
367+ let api_sender = api_sender. clone ( ) ;
368+
369+ let api_seccomp_filter = api_seccomp_filter. clone ( ) ;
370+ let exit_evt = exit_evt. try_clone ( ) . map_err ( VmmError :: EventFdClone ) ?;
371+
372+ let thread = thread:: Builder :: new ( )
373+ . name ( format ! ( "http-worker-{n}" ) . to_string ( ) )
374+ . spawn ( move || {
375+ debug ! ( "Spawned HTTP worker thread with id {n}" , ) ;
376+ if !api_seccomp_filter. is_empty ( ) {
377+ apply_filter ( & api_seccomp_filter)
378+ . map_err ( VmmError :: ApplySeccompFilter )
379+ . map_err ( |e| {
380+ error ! ( "Error applying seccomp filter: {:?}" , e) ;
381+ exit_evt. write ( 1 ) . ok ( ) ;
382+ e
383+ } ) ?;
384+ }
385+
386+ if landlock_enable {
387+ Landlock :: new ( )
388+ . map_err ( VmmError :: CreateLandlock ) ?
389+ . restrict_self ( )
390+ . map_err ( VmmError :: ApplyLandlock )
391+ . map_err ( |e| {
392+ error ! ( "Error applying landlock to http-worker thread: {:?}" , e) ;
393+ exit_evt. write ( 1 ) . ok ( ) ;
394+ e
395+ } ) ?;
396+ }
397+
398+ std:: panic:: catch_unwind ( AssertUnwindSafe ( move || {
399+ let id = n;
400+ loop {
401+ let request = request_rx. lock ( ) . unwrap ( ) . recv ( ) ;
402+ match request {
403+ Ok ( msg) => {
404+ // Process the server request
405+ let response = msg. process ( |request| {
406+ handle_http_request ( request, & api_notifier, & api_sender)
407+ } ) ;
408+
409+ // Send the response to the HTTP server thread together with this
410+ // threads id.
411+ if let Err ( e) = response_tx. send ( response) {
412+ error ! (
413+ "HTTP worker thread {id}: error sending response {}" ,
414+ e
415+ ) ;
416+ break ;
417+ }
418+
419+ // Notify the HTTP server thread.
420+ response_event. write ( 1 ) . ok ( ) ;
421+ }
422+ Err ( e) => {
423+ error ! (
424+ "HTTP worker thread {id}: error receiving request {}" ,
425+ e
426+ ) ;
427+ break ;
428+ }
429+ }
430+ }
431+ } ) )
432+ . map_err ( |_| {
433+ error ! ( "http-worker thread {n} panicked" ) ;
434+ exit_evt. write ( 1 ) . ok ( )
435+ } )
436+ . ok ( ) ;
437+
438+ Ok ( ( ) )
439+ } )
440+ . map_err ( VmmError :: HttpThreadSpawn ) ?;
441+
442+ threads. push ( thread) ;
443+ }
444+
445+ Ok ( Self {
446+ threads,
447+ request_tx : Some ( request_tx) ,
448+ response_rx,
449+ response_event,
450+ } )
451+ }
452+ }
453+
454+ impl Drop for HttpWorkerThreads {
455+ fn drop ( & mut self ) {
456+ // Dropping the Sender side of the request channels to throw the worker
457+ // threads out of their loops.
458+ drop ( self . request_tx . take ( ) ) ;
459+ // Now we can join each thread.
460+ self . threads
461+ . drain ( ..)
462+ . for_each ( |thread| thread. join ( ) . unwrap ( ) . unwrap ( ) ) ;
463+ }
464+ }
465+
317466fn start_http_thread (
318467 mut server : HttpServer ,
319468 api_notifier : EventFd ,
@@ -334,6 +483,42 @@ fn start_http_thread(
334483 . add_kill_switch ( api_shutdown_fd_clone)
335484 . map_err ( VmmError :: CreateApiServer ) ?;
336485
486+ // We use the epoll mechanism to parallelize this. The epoll tokens are
487+ // attached when registering the FDs with epoll. That way we can later
488+ // check why we were notified.
489+ const HTTP_EPOLL_TOKEN : u64 = 1 ;
490+ const WORKER_EPOLL_TOKEN : u64 = 2 ;
491+
492+ // The epoll instance our HTTP server thread will wait on.
493+ let outer_epoll = Epoll :: new ( ) . unwrap ( ) ;
494+ let worker_threads = HttpWorkerThreads :: new (
495+ 2 ,
496+ api_notifier,
497+ api_sender,
498+ seccomp_action,
499+ hypervisor_type,
500+ landlock_enable,
501+ exit_evt. try_clone ( ) . unwrap ( ) ,
502+ ) ?;
503+
504+ // Register the fd that the worker threads will signal.
505+ outer_epoll
506+ . ctl (
507+ ControlOperation :: Add ,
508+ worker_threads. response_event . as_raw_fd ( ) ,
509+ EpollEvent :: new ( EventSet :: IN , WORKER_EPOLL_TOKEN ) ,
510+ )
511+ . unwrap ( ) ;
512+
513+ // Register the HttpServer's fd.
514+ outer_epoll
515+ . ctl (
516+ ControlOperation :: Add ,
517+ server. epoll ( ) . as_raw_fd ( ) ,
518+ EpollEvent :: new ( EventSet :: IN , HTTP_EPOLL_TOKEN ) ,
519+ )
520+ . unwrap ( ) ;
521+
337522 let thread = thread:: Builder :: new ( )
338523 . name ( "http-server" . to_string ( ) )
339524 . spawn ( move || {
@@ -361,24 +546,42 @@ fn start_http_thread(
361546 }
362547
363548 std:: panic:: catch_unwind ( AssertUnwindSafe ( move || {
549+ let mut events = vec ! [ EpollEvent :: default ( ) ; 32 ] ;
364550 server. start_server ( ) . unwrap ( ) ;
551+
365552 loop {
366- match server. requests ( ) {
367- Ok ( request_vec) => {
368- for server_request in request_vec {
369- if let Err ( e) = server. respond ( server_request. process ( |request| {
370- handle_http_request ( request, & api_notifier, & api_sender)
371- } ) ) {
553+ let n = outer_epoll. wait ( -1 , & mut events) . unwrap ( ) ;
554+ for ev in events. iter ( ) . take ( n) {
555+ match ev. data ( ) {
556+ HTTP_EPOLL_TOKEN => {
557+ // The HttpServer got a request, handle that.
558+ match server. requests ( ) {
559+ Ok ( request_vec) => {
560+ for server_request in request_vec {
561+ worker_threads. request_tx . as_ref ( ) . unwrap ( ) . send ( server_request) . unwrap ( ) ;
562+ }
563+ }
564+ Err ( ServerError :: ShutdownEvent ) => {
565+ server. flush_outgoing_writes ( ) ;
566+ return ;
567+ }
568+ Err ( e) => {
569+ error ! (
570+ "HTTP server error on retrieving incoming request. Error: {e}"
571+ ) ;
572+ }
573+ }
574+ }
575+ WORKER_EPOLL_TOKEN => {
576+ // One of the worker threads has a response.
577+ // We clear the eventfd first.
578+ let _ = worker_threads. response_event . read ( ) . unwrap ( ) ;
579+ let response = worker_threads. response_rx . recv ( ) . unwrap ( ) ;
580+ if let Err ( e) = server. respond ( response) {
372581 error ! ( "HTTP server error on response: {e}" ) ;
373582 }
374583 }
375- }
376- Err ( ServerError :: ShutdownEvent ) => {
377- server. flush_outgoing_writes ( ) ;
378- return ;
379- }
380- Err ( e) => {
381- error ! ( "HTTP server error on retrieving incoming request. Error: {e}" ) ;
584+ _ => { }
382585 }
383586 }
384587 }
0 commit comments