Skip to content

Commit b1e00aa

Browse files
Add timeout functionality
- Added timeout future funcitonality - changes variable names
1 parent c5ed091 commit b1e00aa

File tree

2 files changed

+55
-6
lines changed

2 files changed

+55
-6
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ dashmap = "6.1.0"
1111
lazy_static = "1.5.0"
1212
wit-bindgen-rt = { version = "0.30.0", features = ["bitflags"] }
1313
autoincrement = { version = "1", features = ["derive", "async"] }
14+
pin-project-lite = "0.2.16"
1415
[dependencies.uuid]
1516
version = "1.17.0"
1617
# Lets you generate random UUIDs

src/io/timer.rs

Lines changed: 54 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
use crate::engine::REACTOR;
22
use crate::poll_tasks::EventWithWaker;
33
use dashmap::DashMap;
4+
use futures::FutureExt;
45
use lazy_static::lazy_static;
56
use std::future::Future;
7+
use std::task::Poll;
68
use std::time::Duration;
79
use std::time::Instant;
810
use uuid::Uuid;
@@ -13,28 +15,45 @@ lazy_static! {
1315
#[derive(Debug, Clone)]
1416
pub struct Timer {
1517
at: Instant,
16-
until: Duration,
18+
deadline: Duration,
1719
elapsed: bool,
1820
}
1921

2022
impl Timer {
2123
/// create a timer that resolves once it elapses
2224
pub async fn sleep(until: std::time::Duration) {
23-
let timeout = TimeoutFuture {
25+
let timeout = TimeFuture {
2426
timer_key: format!("sleep-{}", Uuid::new_v4()),
2527
timer: Self {
2628
at: Instant::now(),
27-
until,
29+
deadline: until,
2830
elapsed: false,
2931
},
3032
};
3133
timeout.await
3234
}
35+
36+
pub async fn timeout<K, F: Future<Output = K>>(
37+
&self,
38+
fut: F,
39+
deadline: std::time::Duration,
40+
) -> std::io::Result<K> {
41+
let timer_future = TimeFuture {
42+
timer_key: format!("timeout-{}", Uuid::new_v4()),
43+
timer: Self {
44+
at: Instant::now(),
45+
deadline,
46+
elapsed: false,
47+
},
48+
};
49+
let timeout_future = TimeoutFuture { timer_future, fut };
50+
timeout_future.await
51+
}
3352
pub fn update_elapsed(&mut self) {
3453
let new_now = Instant::now();
3554
let elapsed = new_now
3655
.checked_duration_since(self.at)
37-
.map(|s| s > self.until)
56+
.map(|s| s > self.deadline)
3857
.unwrap_or_default();
3958
self.elapsed = elapsed;
4059
}
@@ -44,12 +63,12 @@ impl Timer {
4463
}
4564
}
4665

47-
struct TimeoutFuture {
66+
struct TimeFuture {
4867
timer_key: String,
4968
timer: Timer,
5069
}
5170

52-
impl Future for TimeoutFuture {
71+
impl Future for TimeFuture {
5372
type Output = ();
5473
fn poll(
5574
self: std::pin::Pin<&mut Self>,
@@ -72,3 +91,32 @@ impl Future for TimeoutFuture {
7291
std::task::Poll::Pending
7392
}
7493
}
94+
95+
pin_project_lite::pin_project! {
96+
pub struct TimeoutFuture<K,F:Future<Output = K>>
97+
{
98+
#[pin]
99+
fut:F,
100+
#[pin]
101+
timer_future:TimeFuture
102+
}
103+
}
104+
105+
impl<K, F: Future<Output = K>> Future for TimeoutFuture<K, F> {
106+
type Output = Result<K, std::io::Error>;
107+
fn poll(
108+
self: std::pin::Pin<&mut Self>,
109+
cx: &mut std::task::Context<'_>,
110+
) -> std::task::Poll<Self::Output> {
111+
let mut this = self.project();
112+
if this.timer_future.poll_unpin(cx).is_pending() {
113+
match this.fut.poll_unpin(cx) {
114+
Poll::Ready(ready) => Poll::Ready(Ok(ready)),
115+
Poll::Pending => Poll::Pending,
116+
}
117+
} else {
118+
let error = std::io::Error::new(std::io::ErrorKind::TimedOut, "Timer has elapsed");
119+
Poll::Ready(Err(error))
120+
}
121+
}
122+
}

0 commit comments

Comments
 (0)