66use std:: collections:: BTreeMap ;
77use std:: error:: Error ;
88use std:: fs:: File ;
9+ use std:: os:: fd:: AsRawFd ;
910use std:: os:: unix:: io:: { IntoRawFd , RawFd } ;
1011use std:: os:: unix:: net:: UnixListener ;
1112use std:: panic:: AssertUnwindSafe ;
@@ -16,11 +17,13 @@ use std::thread;
1617
1718use hypervisor:: HypervisorType ;
1819use micro_http:: {
19- Body , HttpServer , MediaType , Method , Request , Response , ServerError , StatusCode , Version ,
20+ Body , HttpServer , MediaType , Method , Request , Response , ServerError , ServerRequest ,
21+ ServerResponse , StatusCode , Version ,
2022} ;
2123use seccompiler:: { SeccompAction , apply_filter} ;
2224use serde_json:: Error as SerdeError ;
2325use thiserror:: Error ;
26+ use vmm_sys_util:: epoll:: { ControlOperation , Epoll , EpollEvent , EventSet } ;
2427use vmm_sys_util:: eventfd:: EventFd ;
2528
2629use self :: http_endpoint:: { VmActionHandler , VmCreate , VmInfo , VmmPing , VmmShutdown } ;
@@ -314,6 +317,188 @@ fn handle_http_request(
314317 response
315318}
316319
320+ // This answer is returned by the worker threads to the HTTP Server thread. The
321+ // ID is used to mark the worker that just sent the response as ready.
322+ struct WorkerAnswer {
323+ id : usize ,
324+ response : ServerResponse ,
325+ }
326+ struct HttpWorkerThreads {
327+ // The worker threads themselves.
328+ threads : Vec < thread:: JoinHandle < Result < ( ) > > > ,
329+ // Workers that are ready for more work are marked with true.
330+ threads_ready : Vec < bool > ,
331+ // An MPSC channel for every worker. We use this to send server requests
332+ // to the workers.
333+ request_txs : Vec < std:: sync:: mpsc:: SyncSender < ServerRequest > > ,
334+ // If all workers are busy, we queue server requests here.
335+ request_queue : Vec < ServerRequest > ,
336+ // A counter to select the workers round-robin.
337+ next_request : usize ,
338+ // An MPSC channel that the workers use to send responses to the HTTP
339+ // server thread.
340+ response_rx : std:: sync:: mpsc:: Receiver < WorkerAnswer > ,
341+ // Workers signal this eventfd when they have a response for the HTTP
342+ // server thread.
343+ response_event : EventFd ,
344+ }
345+
346+ impl HttpWorkerThreads {
347+ fn new (
348+ thread_count : usize ,
349+ api_notifier : EventFd ,
350+ api_sender : Sender < ApiRequest > ,
351+ seccomp_action : & SeccompAction ,
352+ hypervisor_type : HypervisorType ,
353+ landlock_enable : bool ,
354+ exit_evt : EventFd ,
355+ ) -> Result < Self > {
356+ let response_event = EventFd :: new ( libc:: EFD_NONBLOCK ) . map_err ( VmmError :: EventFdCreate ) ?;
357+ let ( response_tx, response_rx) =
358+ std:: sync:: mpsc:: sync_channel :: < WorkerAnswer > ( thread_count) ;
359+
360+ let mut threads = Vec :: new ( ) ;
361+ let mut threads_ready = Vec :: new ( ) ;
362+ let mut request_txs = Vec :: new ( ) ;
363+
364+ // We use the same seccomp filter that we already use for the HTTP server thread.
365+ let api_seccomp_filter =
366+ get_seccomp_filter ( seccomp_action, Thread :: HttpApi , hypervisor_type)
367+ . map_err ( VmmError :: CreateSeccompFilter ) ?;
368+
369+ for n in 0 ..thread_count {
370+ let response_event = response_event. try_clone ( ) . map_err ( VmmError :: EventFdClone ) ?;
371+
372+ let response_tx = response_tx. clone ( ) ;
373+ let ( request_tx, request_rx) = std:: sync:: mpsc:: sync_channel :: < ServerRequest > ( 1 ) ;
374+
375+ let api_notifier = api_notifier. try_clone ( ) . map_err ( VmmError :: EventFdClone ) ?;
376+ let api_sender = api_sender. clone ( ) ;
377+
378+ let api_seccomp_filter = api_seccomp_filter. clone ( ) ;
379+ let exit_evt = exit_evt. try_clone ( ) . map_err ( VmmError :: EventFdClone ) ?;
380+
381+ let thread = thread:: Builder :: new ( )
382+ . name ( format ! ( "http-worker-{n}" ) . to_string ( ) )
383+ . spawn ( move || {
384+ info ! ( "Spawned HTTP worker thread with id {n}" , ) ;
385+ if !api_seccomp_filter. is_empty ( ) {
386+ apply_filter ( & api_seccomp_filter)
387+ . map_err ( VmmError :: ApplySeccompFilter )
388+ . map_err ( |e| {
389+ error ! ( "Error applying seccomp filter: {:?}" , e) ;
390+ exit_evt. write ( 1 ) . ok ( ) ;
391+ e
392+ } ) ?;
393+ }
394+
395+ if landlock_enable {
396+ Landlock :: new ( )
397+ . map_err ( VmmError :: CreateLandlock ) ?
398+ . restrict_self ( )
399+ . map_err ( VmmError :: ApplyLandlock )
400+ . map_err ( |e| {
401+ error ! ( "Error applying landlock to http-worker thread: {:?}" , e) ;
402+ exit_evt. write ( 1 ) . ok ( ) ;
403+ e
404+ } ) ?;
405+ }
406+
407+ std:: panic:: catch_unwind ( AssertUnwindSafe ( move || {
408+ loop {
409+ let id = n;
410+ match request_rx. recv ( ) {
411+ Ok ( msg) => {
412+ // Process the server request
413+ let response = msg. process ( |request| {
414+ handle_http_request ( request, & api_notifier, & api_sender)
415+ } ) ;
416+
417+ // Send the response to the HTTP server thread together with this
418+ // threads id.
419+ let response = WorkerAnswer { id, response } ;
420+ if let Err ( e) = response_tx. send ( response) {
421+ error ! (
422+ "HTTP worker thread {id}: error sending response {}" ,
423+ e
424+ ) ;
425+ break ;
426+ }
427+
428+ // Notify the HTTP server thread.
429+ response_event. write ( 1 ) . ok ( ) ;
430+ }
431+ Err ( e) => {
432+ error ! (
433+ "HTTP worker thread {id}: error receiving request {}" ,
434+ e
435+ ) ;
436+ break ;
437+ }
438+ }
439+ }
440+ } ) )
441+ . map_err ( |_| {
442+ error ! ( "http-worker thread {n} panicked" ) ;
443+ exit_evt. write ( 1 ) . ok ( )
444+ } )
445+ . ok ( ) ;
446+
447+ Ok ( ( ) )
448+ } )
449+ . map_err ( VmmError :: HttpThreadSpawn ) ?;
450+
451+ threads. push ( thread) ;
452+ threads_ready. push ( true ) ;
453+ request_txs. push ( request_tx) ;
454+ }
455+
456+ Ok ( Self {
457+ threads,
458+ threads_ready,
459+ request_txs,
460+ request_queue : Vec :: new ( ) ,
461+ next_request : 0 ,
462+ response_rx,
463+ response_event,
464+ } )
465+ }
466+
467+ /// Returns a channel to a worker thread that is ready for more work, if
468+ /// any workers are ready.
469+ fn next_worker ( & mut self ) -> std:: option:: Option < & std:: sync:: mpsc:: SyncSender < ServerRequest > > {
470+ let next: usize = self . next_request ;
471+ self . next_request = next. wrapping_add ( 1 ) ;
472+
473+ // Check if any worker is ready.
474+ if !self . threads_ready . iter ( ) . any ( |v| * v == true ) {
475+ return None ;
476+ }
477+
478+ let worker = next % self . threads . len ( ) ;
479+ if self . threads_ready [ worker] {
480+ // Happy path: the next worker is ready for more work.
481+ self . threads_ready [ worker] = false ;
482+ return Some ( & self . request_txs [ worker] ) ;
483+ }
484+
485+ // If the selected worker is not ready, check the next worker.
486+ self . next_worker ( )
487+ }
488+ }
489+
490+ impl Drop for HttpWorkerThreads {
491+ fn drop ( & mut self ) {
492+ // Dropping the Sender side of the request channels to throw the worker
493+ // threads out of their loops.
494+ self . request_txs . clear ( ) ;
495+ // Now we can join each thread.
496+ self . threads
497+ . drain ( ..)
498+ . for_each ( |thread| thread. join ( ) . unwrap ( ) . unwrap ( ) ) ;
499+ }
500+ }
501+
317502fn start_http_thread (
318503 mut server : HttpServer ,
319504 api_notifier : EventFd ,
@@ -334,6 +519,42 @@ fn start_http_thread(
334519 . add_kill_switch ( api_shutdown_fd_clone)
335520 . map_err ( VmmError :: CreateApiServer ) ?;
336521
522+ // We use the epoll mechanism to parallelize this. The epoll tokens are
523+ // attached when registering the FDs with epoll. That way we can later
524+ // check why we were notified.
525+ const HTTP_EPOLL_TOKEN : u64 = 1 ;
526+ const WORKER_EPOLL_TOKEN : u64 = 2 ;
527+
528+ // The epoll instance our HTTP server thread will wait on.
529+ let outer_epoll = Epoll :: new ( ) . unwrap ( ) ;
530+ let mut worker_threads = HttpWorkerThreads :: new (
531+ 2 ,
532+ api_notifier,
533+ api_sender,
534+ seccomp_action,
535+ hypervisor_type,
536+ landlock_enable,
537+ exit_evt. try_clone ( ) . unwrap ( ) ,
538+ ) ?;
539+
540+ // Register the fd that the worker threads will signal.
541+ outer_epoll
542+ . ctl (
543+ ControlOperation :: Add ,
544+ worker_threads. response_event . as_raw_fd ( ) ,
545+ EpollEvent :: new ( EventSet :: IN , WORKER_EPOLL_TOKEN ) ,
546+ )
547+ . unwrap ( ) ;
548+
549+ // Register the HttpServer's fd.
550+ outer_epoll
551+ . ctl (
552+ ControlOperation :: Add ,
553+ server. epoll ( ) . as_raw_fd ( ) ,
554+ EpollEvent :: new ( EventSet :: IN , HTTP_EPOLL_TOKEN ) ,
555+ )
556+ . unwrap ( ) ;
557+
337558 let thread = thread:: Builder :: new ( )
338559 . name ( "http-server" . to_string ( ) )
339560 . spawn ( move || {
@@ -361,27 +582,60 @@ fn start_http_thread(
361582 }
362583
363584 std:: panic:: catch_unwind ( AssertUnwindSafe ( move || {
585+ let mut events = vec ! [ EpollEvent :: default ( ) ; 32 ] ;
364586 server. start_server ( ) . unwrap ( ) ;
587+
365588 loop {
366- match server. requests ( ) {
367- Ok ( request_vec) => {
368- for server_request in request_vec {
369- if let Err ( e) = server. respond ( server_request. process ( |request| {
370- handle_http_request ( request, & api_notifier, & api_sender)
371- } ) ) {
589+ let n = outer_epoll. wait ( -1 , & mut events) . unwrap ( ) ;
590+ for ev in events. iter ( ) . take ( n) {
591+ match ev. data ( ) {
592+ HTTP_EPOLL_TOKEN => {
593+ // The HttpServer got a request, handle that.
594+ match server. requests ( ) {
595+ Ok ( request_vec) => {
596+ for server_request in request_vec {
597+ match worker_threads. next_worker ( ) {
598+ Some ( worker_tx) => worker_tx. send ( server_request) . unwrap ( ) ,
599+ None => worker_threads. request_queue . push ( server_request) ,
600+ }
601+ }
602+ }
603+ Err ( ServerError :: ShutdownEvent ) => {
604+ server. flush_outgoing_writes ( ) ;
605+ return ;
606+ }
607+ Err ( e) => {
608+ error ! (
609+ "HTTP server error on retrieving incoming request. Error: {}" ,
610+ e
611+ ) ;
612+ }
613+ }
614+ }
615+ WORKER_EPOLL_TOKEN => {
616+ // One of the worker threads has a response.
617+ // We clear the eventfd first.
618+ let _ = worker_threads. response_event . read ( ) . unwrap ( ) ;
619+ let response = worker_threads. response_rx . recv ( ) . unwrap ( ) ;
620+ if let Err ( e) = server. respond ( response. response ) {
372621 error ! ( "HTTP server error on response: {}" , e) ;
373622 }
623+
624+ // We mark that worker thread as ready for more work.
625+ worker_threads. threads_ready [ response. id ] = true ;
626+ // We check whether a request arrived while we no workers were ready.
627+ if !worker_threads. request_queue . is_empty ( ) {
628+ let request = worker_threads. request_queue . pop ( ) . unwrap ( ) ;
629+ match worker_threads. next_worker ( ) {
630+ Some ( worker_tx) => worker_tx. send ( request) . unwrap ( ) ,
631+ None => {
632+ // This should not happen, because we just marked a worker as ready.
633+ warn ! ( "HTTP Server: no worker ready, but one worker was just marked as ready." ) ;
634+ worker_threads. request_queue . push ( request) } ,
635+ }
636+ }
374637 }
375- }
376- Err ( ServerError :: ShutdownEvent ) => {
377- server. flush_outgoing_writes ( ) ;
378- return ;
379- }
380- Err ( e) => {
381- error ! (
382- "HTTP server error on retrieving incoming request. Error: {}" ,
383- e
384- ) ;
638+ _ => { }
385639 }
386640 }
387641 }
0 commit comments