diff --git a/linkerd/app/admin/src/lib.rs b/linkerd/app/admin/src/lib.rs
index 97c66dc513..c48f04d300 100644
--- a/linkerd/app/admin/src/lib.rs
+++ b/linkerd/app/admin/src/lib.rs
@@ -4,5 +4,5 @@
 mod server;
 mod stack;
 
-pub use self::server::{Admin, Latch, Readiness};
+pub use self::server::{Admin, Readiness};
 pub use self::stack::{Config, Task};
diff --git a/linkerd/app/admin/src/server.rs b/linkerd/app/admin/src/server.rs
index d70b53c5a8..41a4d92ee5 100644
--- a/linkerd/app/admin/src/server.rs
+++ b/linkerd/app/admin/src/server.rs
@@ -32,7 +32,7 @@
 mod json;
 mod log;
 mod readiness;
-pub use self::readiness::{Latch, Readiness};
+pub use self::readiness::Readiness;
 
 #[derive(Clone)]
 pub struct Admin<M> {
@@ -61,7 +61,7 @@ impl<M> Admin<M> {
     }
 
     fn ready_rsp(&self) -> Response<Body> {
-        if self.ready.is_ready() {
+        if self.ready.get() {
             Response::builder()
                 .status(StatusCode::OK)
                 .header(http::header::CONTENT_TYPE, "text/plain")
@@ -269,32 +269,32 @@ mod tests {
 
     const TIMEOUT: Duration = Duration::from_secs(1);
 
+    macro_rules! call {
+        ($admin:expr) => {{
+            let r = Request::builder()
+                .method(Method::GET)
+                .uri("http://0.0.0.0/ready")
+                .body(Body::empty())
+                .unwrap();
+            let f = $admin.clone().oneshot(r);
+            timeout(TIMEOUT, f).await.expect("timeout").expect("call")
+        }};
+    }
+
     #[tokio::test]
-    async fn ready_when_latches_dropped() {
-        let (r, l0) = Readiness::new();
-        let l1 = l0.clone();
+    async fn readiness() {
+        let readiness = Readiness::new(false);
         let (_, t) = trace::Settings::default().build();
         let (s, _) = mpsc::unbounded_channel();
-        let admin = Admin::new((), r, s, t);
-
-        macro_rules! call {
-            () => {{
-                let r = Request::builder()
-                    .method(Method::GET)
-                    .uri("http://0.0.0.0/ready")
-                    .body(Body::empty())
-                    .unwrap();
-                let f = admin.clone().oneshot(r);
-                timeout(TIMEOUT, f).await.expect("timeout").expect("call")
-            }};
-        }
-
-        assert_eq!(call!().status(), StatusCode::SERVICE_UNAVAILABLE);
+        let admin = Admin::new((), readiness.clone(), s, t);
+
+        assert_eq!(call!(admin).status(), StatusCode::SERVICE_UNAVAILABLE);
 
-        drop(l0);
-        assert_eq!(call!().status(), StatusCode::SERVICE_UNAVAILABLE);
+        readiness.set(true);
+        assert_eq!(call!(admin).status(), StatusCode::OK);
 
-        drop(l1);
-        assert_eq!(call!().status(), StatusCode::OK);
+        readiness.set(false);
+        assert_eq!(call!(admin).status(), StatusCode::SERVICE_UNAVAILABLE);
     }
 }
diff --git a/linkerd/app/admin/src/server/readiness.rs b/linkerd/app/admin/src/server/readiness.rs
index 917fecef19..ce7908a1dd 100644
--- a/linkerd/app/admin/src/server/readiness.rs
+++ b/linkerd/app/admin/src/server/readiness.rs
@@ -1,36 +1,18 @@
-use std::sync::{Arc, Weak};
+use std::sync::{atomic::AtomicBool, Arc};
 
-/// Tracks the processes's readiness to serve traffic.
-///
-/// Once `is_ready()` returns true, it will never return false.
 #[derive(Clone, Debug)]
-pub struct Readiness(Weak<()>);
-
-/// When all latches are dropped, the process is considered ready.
-#[derive(Clone, Debug)]
-pub struct Latch(Arc<()>);
+pub struct Readiness(Arc<AtomicBool>);
 
 impl Readiness {
-    pub fn new() -> (Readiness, Latch) {
-        let r = Arc::new(());
-        (Readiness(Arc::downgrade(&r)), Latch(r))
-    }
-
-    pub fn is_ready(&self) -> bool {
-        self.0.upgrade().is_none()
+    pub fn new(init: bool) -> Readiness {
+        Readiness(Arc::new(init.into()))
     }
-}
 
-/// ALways ready.
-impl Default for Readiness {
-    fn default() -> Self {
-        Self::new().0
+    pub fn get(&self) -> bool {
+        self.0.load(std::sync::atomic::Ordering::Acquire)
     }
-}
 
-impl Latch {
-    /// Releases this readiness latch.
-    pub fn release(self) {
-        drop(self);
+    pub fn set(&self, ready: bool) {
+        self.0.store(ready, std::sync::atomic::Ordering::Release)
     }
 }
diff --git a/linkerd/app/admin/src/stack.rs b/linkerd/app/admin/src/stack.rs
index 55f364b8a2..4527714d31 100644
--- a/linkerd/app/admin/src/stack.rs
+++ b/linkerd/app/admin/src/stack.rs
@@ -1,3 +1,4 @@
+use crate::Readiness;
 use linkerd_app_core::{
     classify,
     config::ServerConfig,
@@ -24,7 +25,7 @@ pub struct Config {
 
 pub struct Task {
     pub listen_addr: Local<ServerAddr>,
-    pub latch: crate::Latch,
+    pub ready: Readiness,
     pub serve: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
 }
 
@@ -97,8 +98,8 @@
             .get_policy(inbound::policy::LookupAddr(listen_addr.into()))
             .await?;
 
-        let (ready, latch) = crate::server::Readiness::new();
-        let admin = crate::server::Admin::new(report, ready, shutdown, trace);
+        let ready = crate::server::Readiness::new(false);
+        let admin = crate::server::Admin::new(report, ready.clone(), shutdown, trace);
         let admin = svc::stack(move |_| admin.clone())
             .push(metrics.proxy.http_endpoint.to_layer::<classify::Response, _, _>())
             .push_map_target(|(permit, http)| Permitted { permit, http })
@@ -169,7 +170,7 @@
         let serve = Box::pin(serve::serve(listen, admin, drain.signaled()));
         Ok(Task {
             listen_addr,
-            latch,
+            ready,
             serve,
         })
     }
diff --git a/linkerd/app/integration/src/proxy.rs b/linkerd/app/integration/src/proxy.rs
index a2444639f6..1a162254f6 100644
--- a/linkerd/app/integration/src/proxy.rs
+++ b/linkerd/app/integration/src/proxy.rs
@@ -442,7 +442,7 @@
         Poll::Ready(())
     });
 
-    let drain = main.spawn();
+    let (_, drain) = main.spawn();
 
     tokio::select! {
         _ = on_shutdown => {
diff --git a/linkerd/app/src/lib.rs b/linkerd/app/src/lib.rs
index e8e97edf8a..ab48ab8586 100644
--- a/linkerd/app/src/lib.rs
+++ b/linkerd/app/src/lib.rs
@@ -302,7 +302,7 @@ impl App {
         }
     }
 
-    pub fn spawn(self) -> drain::Signal {
+    pub fn spawn(self) -> (admin::Readiness, drain::Signal) {
         let App {
             admin,
             drain,
@@ -312,6 +312,7 @@ impl App {
             tap,
             ..
         } = self;
+        let readiness = admin.ready.clone();
 
         // Run a daemon thread for all administrative tasks.
         //
@@ -348,11 +349,11 @@ impl App {
                 .instrument(info_span!("identity").or_current()),
         );
 
-        let latch = admin.latch;
+        let readiness = admin.ready;
         tokio::spawn(
             ready
                 .map(move |()| {
-                    latch.release();
+                    readiness.set(true);
                     info!(id = %local_id, "Certified identity");
                 })
                 .instrument(info_span!("identity").or_current()),
@@ -387,6 +388,6 @@ impl App {
 
         tokio::spawn(start_proxy);
 
-        drain
+        (readiness, drain)
     }
 }
diff --git a/linkerd/signal/src/lib.rs b/linkerd/signal/src/lib.rs
index 44fa63f98c..4bb4a4dd39 100644
--- a/linkerd/signal/src/lib.rs
+++ b/linkerd/signal/src/lib.rs
@@ -4,37 +4,30 @@
 #![forbid(unsafe_code)]
 
 /// Returns a `Future` that completes when the proxy should start to shutdown.
-pub async fn shutdown() {
+pub async fn shutdown() -> &'static str {
     imp::shutdown().await
 }
 
 #[cfg(unix)]
 mod imp {
     use tokio::signal::unix::{signal, SignalKind};
-    use tracing::info;
 
-    pub(super) async fn shutdown() {
+    pub(super) async fn shutdown() -> &'static str {
         tokio::select! {
             // SIGINT - To allow Ctrl-c to emulate SIGTERM while developing.
-            () = sig(SignalKind::interrupt(), "SIGINT") => {}
+            () = sig(SignalKind::interrupt()) => "SIGINT",
             // SIGTERM - Kubernetes sends this to start a graceful shutdown.
-            () = sig(SignalKind::terminate(), "SIGTERM") => {}
-        };
+            () = sig(SignalKind::terminate()) => "SIGTERM",
+        }
     }
 
-    async fn sig(kind: SignalKind, name: &'static str) {
+    async fn sig(kind: SignalKind) {
         // Create a Future that completes the first
         // time the process receives 'sig'.
         signal(kind)
            .expect("Failed to register signal handler")
            .recv()
            .await;
-        info!(
-            // use target to remove 'imp' from output
-            target: "linkerd_proxy::signal",
-            "received {}, starting shutdown",
-            name,
-        );
     }
 }
 
@@ -42,7 +35,7 @@
 mod imp {
     use tracing::info;
 
-    pub(super) async fn shutdown() {
+    pub(super) async fn shutdown() -> &'static str {
         // On Windows, we don't have all the signals, but Windows also
         // isn't our expected deployment target. This implementation allows
         // developers on Windows to simulate proxy graceful shutdown
@@ -51,10 +44,6 @@
             .expect("Failed to register signal handler")
             .recv()
             .await;
-        info!(
-            // use target to remove 'imp' from output
-            target: "linkerd_proxy::signal",
-            "received Ctrl-C, starting shutdown",
-        );
+        "Ctrl-C"
     }
 }
diff --git a/linkerd2-proxy/src/main.rs b/linkerd2-proxy/src/main.rs
index d106ab910d..d4b8dbafa3 100644
--- a/linkerd2-proxy/src/main.rs
+++ b/linkerd2-proxy/src/main.rs
@@ -111,15 +111,22 @@ fn main() {
         }
     }
 
-    let drain = app.spawn();
-    tokio::select! {
-        _ = signal::shutdown() => {
-            info!("Received shutdown signal");
-        }
-        _ = shutdown_rx.recv() => {
-            info!("Received shutdown via admin interface");
-        }
-    }
+    let (readiness, drain) = app.spawn();
+
+    // When a shutdown signal is received, set the readiness to false so
+    // that probes can discover that the proxy should not receive more traffic.
+    // Then, we do NOTHING. We expect a SIGKILL to come along and finish off
+    // the process.
+    tokio::spawn(async move {
+        let signal = signal::shutdown().await;
+        readiness.set(false);
+        info!("Received {signal}. Waiting to be terminated forcefully.");
+    });
+
+    // If the admin's shutdown channel is used, we gracefully drain open
+    // connections or terminate after a timeout.
+    shutdown_rx.recv().await;
+    info!("Received shutdown via admin interface");
 
     match time::timeout(shutdown_grace_period, drain.drain()).await {
         Ok(()) => debug!("Shutdown completed gracefully"),
        Err(_) => warn!(
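
The pivot of this patch is the new `Readiness` type: the drop-once latch is replaced by a shared atomic flag that can be flipped in both directions, which is what lets `main.rs` mark the proxy unready on SIGTERM and then simply wait to be killed. As a reading aid, here is a self-contained sketch of that pattern; it mirrors the patched `readiness.rs` (with the `Ordering` import hoisted), and the `main` below is purely illustrative, walking the probe lifecycle the patch wires up rather than reproducing the proxy's startup code:

use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

/// Shared readiness flag: clones share one `AtomicBool`, so any holder
/// (the identity task, the shutdown task) can flip the state reported
/// by the admin server's `/ready` endpoint.
#[derive(Clone, Debug)]
pub struct Readiness(Arc<AtomicBool>);

impl Readiness {
    pub fn new(init: bool) -> Readiness {
        Readiness(Arc::new(AtomicBool::new(init)))
    }

    /// `Acquire` pairs with the `Release` store in `set`, so a reader
    /// that observes `true` also observes writes made before `set(true)`.
    pub fn get(&self) -> bool {
        self.0.load(Ordering::Acquire)
    }

    pub fn set(&self, ready: bool) {
        self.0.store(ready, Ordering::Release)
    }
}

fn main() {
    // The lifecycle the patch implements:
    let readiness = Readiness::new(false); // unready at startup...
    assert!(!readiness.get());

    let identity = readiness.clone();
    identity.set(true); // ...ready once identity is certified...
    assert!(readiness.get());

    readiness.set(false); // ...and unready again on SIGTERM, so probes
    assert!(!readiness.get()); // fail while the pod awaits SIGKILL.
}

Acquire/Release suffices here: probes only need to observe the latest flip together with anything written before it, and no read-modify-write ordering is involved.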