Skip to content

Commit dafcccf

Browse files
committed
fix: improve reconnection with sleeping and tunnel
1 parent 2b1b80f commit dafcccf

File tree

24 files changed

+466
-333
lines changed

24 files changed

+466
-333
lines changed

Cargo.lock

Lines changed: 24 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ regex = "1.4"
5959
rstest = "0.26.1"
6060
rustls-pemfile = "2.2.0"
6161
rustyline = "15.0.0"
62+
scc = "3.3.2"
6263
serde_bare = "0.5.0"
6364
serde_html_form = "0.2.7"
6465
serde_yaml = "0.9.34"

engine/artifacts/errors/guard.websocket_pending_limit_reached.json

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/packages/epoxy/src/ops/kv/get_local.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,12 @@ pub async fn epoxy_kv_get_local(ctx: &OperationCtx, input: &Input) -> Result<Out
3030
let packed_key = packed_key.clone();
3131
let kv_key = kv_key.clone();
3232
async move {
33-
(async move {
34-
let value = tx.get(&packed_key, Serializable).await?;
35-
if let Some(v) = value {
36-
Ok(Some(kv_key.deserialize(&v)?))
37-
} else {
38-
Ok(None)
39-
}
40-
})
41-
.await
33+
let value = tx.get(&packed_key, Serializable).await?;
34+
if let Some(v) = value {
35+
Ok(Some(kv_key.deserialize(&v)?))
36+
} else {
37+
Ok(None)
38+
}
4239
}
4340
})
4441
.custom_instrument(tracing::info_span!("get_local_tx"))

engine/packages/epoxy/src/ops/kv/get_optimistic.rs

Lines changed: 22 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -52,29 +52,26 @@ pub async fn epoxy_kv_get_optimistic(ctx: &OperationCtx, input: &Input) -> Resul
5252
let kv_key = kv_key.clone();
5353
let cache_key = cache_key.clone();
5454
async move {
55-
(async move {
56-
let (value, cache_value) = tokio::try_join!(
57-
async {
58-
let v = tx.get(&packed_key, Serializable).await?;
59-
if let Some(ref bytes) = v {
60-
Ok(Some(kv_key.deserialize(bytes)?))
61-
} else {
62-
Ok(None)
63-
}
64-
},
65-
async {
66-
let v = tx.get(&packed_cache_key, Serializable).await?;
67-
if let Some(ref bytes) = v {
68-
Ok(Some(cache_key.deserialize(bytes)?))
69-
} else {
70-
Ok(None)
71-
}
55+
let (value, cache_value) = tokio::try_join!(
56+
async {
57+
let v = tx.get(&packed_key, Serializable).await?;
58+
if let Some(ref bytes) = v {
59+
Ok(Some(kv_key.deserialize(bytes)?))
60+
} else {
61+
Ok(None)
7262
}
73-
)?;
63+
},
64+
async {
65+
let v = tx.get(&packed_cache_key, Serializable).await?;
66+
if let Some(ref bytes) = v {
67+
Ok(Some(cache_key.deserialize(bytes)?))
68+
} else {
69+
Ok(None)
70+
}
71+
}
72+
)?;
7473

75-
Ok(value.or(cache_value))
76-
})
77-
.await
74+
Ok(value.or(cache_value))
7875
}
7976
})
8077
.custom_instrument(tracing::info_span!("get_optimistic_tx"))
@@ -134,13 +131,11 @@ pub async fn epoxy_kv_get_optimistic(ctx: &OperationCtx, input: &Input) -> Resul
134131
let packed_cache_key = packed_cache_key.clone();
135132
let cache_key = cache_key.clone();
136133
let value_to_cache = value.clone();
134+
137135
async move {
138-
(async move {
139-
let serialized = cache_key.serialize(value_to_cache)?;
140-
tx.set(&packed_cache_key, &serialized);
141-
Ok(())
142-
})
143-
.await
136+
let serialized = cache_key.serialize(value_to_cache)?;
137+
tx.set(&packed_cache_key, &serialized);
138+
Ok(())
144139
}
145140
})
146141
.custom_instrument(tracing::info_span!("cache_value_tx"))

engine/packages/gasoline/src/ctx/standalone.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,9 @@ impl StandaloneCtx {
4646
) -> WorkflowResult<Self> {
4747
let ts = rivet_util::timestamp::now();
4848

49-
let span = tracing::Span::current();
50-
span.record("req_id", req_id.to_string());
51-
span.record("ray_id", ray_id.to_string());
49+
tracing::Span::current()
50+
.record("req_id", req_id.to_string())
51+
.record("ray_id", ray_id.to_string());
5252

5353
let msg_ctx = MessageCtx::new(&config, &pools, &cache, ray_id)?;
5454

engine/packages/guard-core/src/custom_serve.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use async_trait::async_trait;
33
use bytes::Bytes;
44
use http_body_util::Full;
55
use hyper::{Request, Response};
6+
use uuid::Uuid;
67

78
use crate::WebSocketHandle;
89
use crate::proxy_service::ResponseBody;
@@ -25,5 +26,7 @@ pub trait CustomServeTrait: Send + Sync {
2526
headers: &hyper::HeaderMap,
2627
path: &str,
2728
request_context: &mut RequestContext,
29+
// Identifies the websocket across retries.
30+
unique_request_id: Uuid,
2831
) -> Result<()>;
2932
}

engine/packages/guard-core/src/proxy_service.rs

Lines changed: 68 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use tokio_tungstenite::tungstenite::{
2828
};
2929
use tracing::Instrument;
3030
use url::Url;
31+
use uuid::Uuid;
3132

3233
use crate::{
3334
WebSocketHandle, custom_serve::CustomServeTrait, errors, metrics,
@@ -1171,7 +1172,7 @@ impl ProxyService {
11711172
}
11721173

11731174
// Handle WebSocket upgrade properly with hyper_tungstenite
1174-
tracing::debug!("Upgrading client connection to WebSocket");
1175+
tracing::debug!(%req_path, "Upgrading client connection to WebSocket");
11751176
let (client_response, client_ws) = match hyper_tungstenite::upgrade(req, None) {
11761177
Ok(x) => {
11771178
tracing::debug!("Client WebSocket upgrade successful");
@@ -1782,18 +1783,20 @@ impl ProxyService {
17821783
}
17831784
ResolveRouteOutput::Response(_) => unreachable!(),
17841785
ResolveRouteOutput::CustomServe(mut handlers) => {
1785-
tracing::debug!("Spawning task to handle WebSocket communication");
1786+
tracing::debug!(%req_path, "Spawning task to handle WebSocket communication");
17861787
let mut request_context = request_context.clone();
17871788
let req_headers = req_headers.clone();
17881789
let req_path = req_path.clone();
17891790
let req_host = req_host.clone();
17901791

1791-
// TODO: Handle errors here, the error message is lost
17921792
tokio::spawn(
17931793
async move {
1794+
let request_id = Uuid::new_v4();
17941795
let mut attempts = 0u32;
17951796

1796-
let ws_handle = WebSocketHandle::new(client_ws);
1797+
let ws_handle = WebSocketHandle::new(client_ws)
1798+
.await
1799+
.context("failed initiating websocket handle")?;
17971800

17981801
loop {
17991802
match handlers
@@ -1802,6 +1805,7 @@ impl ProxyService {
18021805
&req_headers,
18031806
&req_path,
18041807
&mut request_context,
1808+
request_id,
18051809
)
18061810
.await
18071811
{
@@ -1825,13 +1829,17 @@ impl ProxyService {
18251829
break;
18261830
}
18271831
Err(err) => {
1832+
tracing::debug!(?err, "websocket handler error");
1833+
18281834
attempts += 1;
18291835
if attempts > max_attempts || !is_retryable_ws_error(&err) {
1836+
tracing::debug!(?attempts, "WebSocket failed to reconnect");
1837+
18301838
// Close WebSocket with error
18311839
ws_handle
1832-
.accept_and_send(to_hyper_close(Some(
1833-
err_to_close_frame(err, ray_id),
1834-
)))
1840+
.send(to_hyper_close(Some(err_to_close_frame(
1841+
err, ray_id,
1842+
))))
18351843
.await?;
18361844

18371845
// Flush to ensure close frame is sent
@@ -1846,6 +1854,13 @@ impl ProxyService {
18461854
attempts,
18471855
initial_interval,
18481856
);
1857+
let backoff = Duration::from_millis(100);
1858+
1859+
tracing::debug!(
1860+
?backoff,
1861+
"WebSocket attempt {attempts} failed (service unavailable)"
1862+
);
1863+
18491864
tokio::time::sleep(backoff).await;
18501865

18511866
match state
@@ -1864,11 +1879,9 @@ impl ProxyService {
18641879
}
18651880
Ok(ResolveRouteOutput::Response(response)) => {
18661881
ws_handle
1867-
.accept_and_send(to_hyper_close(Some(
1868-
str_to_close_frame(
1869-
response.message.as_ref(),
1870-
),
1871-
)))
1882+
.send(to_hyper_close(Some(str_to_close_frame(
1883+
response.message.as_ref(),
1884+
))))
18721885
.await?;
18731886

18741887
// Flush to ensure close frame is sent
@@ -1879,12 +1892,10 @@ impl ProxyService {
18791892
}
18801893
Ok(ResolveRouteOutput::Target(_)) => {
18811894
ws_handle
1882-
.accept_and_send(to_hyper_close(Some(
1883-
err_to_close_frame(
1884-
errors::WebSocketTargetChanged.build(),
1885-
ray_id,
1886-
),
1887-
)))
1895+
.send(to_hyper_close(Some(err_to_close_frame(
1896+
errors::WebSocketTargetChanged.build(),
1897+
ray_id,
1898+
))))
18881899
.await?;
18891900

18901901
// Flush to ensure close frame is sent
@@ -1897,9 +1908,9 @@ impl ProxyService {
18971908
}
18981909
Err(err) => {
18991910
ws_handle
1900-
.accept_and_send(to_hyper_close(Some(
1901-
err_to_close_frame(err, ray_id),
1902-
)))
1911+
.send(to_hyper_close(Some(err_to_close_frame(
1912+
err, ray_id,
1913+
))))
19031914
.await?;
19041915

19051916
// Flush to ensure close frame is sent
@@ -1947,13 +1958,17 @@ impl ProxyService {
19471958

19481959
impl ProxyService {
19491960
// Process an individual request
1950-
#[tracing::instrument(name = "guard_request", skip_all)]
1961+
#[tracing::instrument(name = "guard_request", skip_all, fields(ray_id, req_id))]
19511962
pub async fn process(&self, mut req: Request<BodyIncoming>) -> Result<Response<ResponseBody>> {
19521963
let start_time = Instant::now();
19531964

19541965
let request_ids = RequestIds::new(self.state.config.dc_label());
19551966
req.extensions_mut().insert(request_ids);
19561967

1968+
tracing::Span::current()
1969+
.record("req_id", request_ids.req_id.to_string())
1970+
.record("ray_id", request_ids.ray_id.to_string());
1971+
19571972
// Create request context for analytics tracking
19581973
let mut request_context =
19591974
RequestContext::new(self.state.clickhouse_inserter.clone(), request_ids);
@@ -2063,35 +2078,50 @@ impl ProxyService {
20632078

20642079
// If we receive an error during a websocket request, we attempt to open the websocket anyway
20652080
// so we can send the error via websocket instead of http. Most websocket clients don't handle
2066-
// HTTP errors in a meaningful way for the user resulting in unhelpful errors
2081+
// HTTP errors in a meaningful way resulting in unhelpful errors for the user
20672082
if is_websocket {
20682083
tracing::debug!("Upgrading client connection to WebSocket for error proxy");
20692084
match hyper_tungstenite::upgrade(mock_req, None) {
20702085
Ok((client_response, client_ws)) => {
20712086
tracing::debug!("Client WebSocket upgrade for error proxy successful");
20722087

2073-
tokio::spawn(async move {
2074-
let ws_handle = WebSocketHandle::new(client_ws);
2075-
let frame = err_to_close_frame(err, Some(request_ids.ray_id));
2088+
tokio::spawn(
2089+
async move {
2090+
let ws_handle = match WebSocketHandle::new(client_ws).await {
2091+
Ok(ws_handle) => ws_handle,
2092+
Err(err) => {
2093+
tracing::debug!(
2094+
?err,
2095+
"failed initiating websocket handle for error proxy"
2096+
);
2097+
return;
2098+
}
2099+
};
2100+
let frame = err_to_close_frame(err, Some(request_ids.ray_id));
20762101

2077-
// Manual conversion to handle different tungstenite versions
2078-
let code_num: u16 = frame.code.into();
2079-
let reason = frame.reason.clone();
2102+
// Manual conversion to handle different tungstenite versions
2103+
let code_num: u16 = frame.code.into();
2104+
let reason = frame.reason.clone();
20802105

2081-
if let Err(err) = ws_handle
2082-
.accept_and_send(
2083-
tokio_tungstenite::tungstenite::Message::Close(Some(
2106+
if let Err(err) = ws_handle
2107+
.send(tokio_tungstenite::tungstenite::Message::Close(Some(
20842108
tokio_tungstenite::tungstenite::protocol::CloseFrame {
20852109
code: code_num.into(),
20862110
reason,
20872111
},
2088-
)),
2089-
)
2090-
.await
2091-
{
2092-
tracing::debug!(?err, "failed sending error proxy");
2112+
)))
2113+
.await
2114+
{
2115+
tracing::debug!(
2116+
?err,
2117+
"failed sending websocket error proxy"
2118+
);
2119+
}
20932120
}
2094-
});
2121+
.instrument(
2122+
tracing::info_span!("ws_error_proxy_task", ?request_ids.ray_id),
2123+
),
2124+
);
20952125

20962126
// Return the response that will upgrade the client connection
20972127
// For proper WebSocket handshaking, we need to preserve the original response

0 commit comments

Comments
 (0)