Skip to content

Commit c5ed091

Browse files
Removed NextID
- replace i with UUID - remove the need to increment everytime - depend on key
1 parent 4e137e9 commit c5ed091

File tree

3 files changed

+51
-35
lines changed

3 files changed

+51
-35
lines changed

src/engine.rs

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
1+
use crate::Timer;
12
use crate::{bindings::wasi::io::poll::Pollable, poll_tasks::PollTasks};
23
use crate::{io::timer::TIMERS, poll_tasks::EventWithWaker};
4+
use dashmap::DashMap;
35
use futures::pin_mut;
46
use lazy_static::lazy_static;
57
use std::{
68
future::Future,
79
pin::Pin,
810
sync::{
9-
atomic::{AtomicBool, AtomicU32, Ordering},
11+
atomic::{AtomicBool, Ordering},
1012
Arc, Mutex,
1113
},
1214
task::{Context, Wake, Waker},
@@ -16,8 +18,6 @@ lazy_static! {
1618
pub static ref REACTOR: Mutex<Reactor<'static>> = Mutex::new(Reactor::default());
1719

1820
}
19-
20-
pub(crate) static NEXT_ID: AtomicU32 = AtomicU32::new(0);
2121
/// The async engine instance
2222
pub struct WasmRuntimeAsyncEngine;
2323

@@ -37,6 +37,7 @@ impl<'a> Task<'a> {
3737
pub struct Reactor<'a> {
3838
events: PollTasks,
3939
future_tasks: Vec<Task<'a>>, //right now the engine holds the tasks but depending
40+
timers: DashMap<String, EventWithWaker<Timer>>,
4041
}
4142

4243
impl<'a> Reactor<'a> {
@@ -49,8 +50,13 @@ impl<'a> Reactor<'a> {
4950
self.events.contains(key)
5051
}
5152

53+
//checks if timer is pollable
54+
pub fn is_timer_pollable(&self, key: &str) -> bool {
55+
self.timers.contains_key(key)
56+
}
57+
5258
//polls event queue to see if any of the events are readycar
53-
pub fn wait(&mut self) {
59+
pub fn wait_for_io(&mut self) {
5460
self.events.wait_for_pollables();
5561
}
5662

@@ -62,6 +68,26 @@ impl<'a> Reactor<'a> {
6268
pub fn is_empty(&self) -> bool {
6369
self.events.is_empty() && self.future_tasks.is_empty()
6470
}
71+
72+
pub(crate) fn update_timers(&self) {
73+
self.timers.iter_mut().for_each(|mut cell| {
74+
cell.0.update_elapsed();
75+
if cell.0.elapsed() {
76+
cell.1.wake_by_ref();
77+
}
78+
});
79+
}
80+
81+
pub(crate) fn timer_has_elapsed(&self, timer_key: &str) -> bool {
82+
self.timers
83+
.get(timer_key)
84+
.map(|s| s.0.elapsed())
85+
.unwrap_or_default()
86+
}
87+
88+
pub(crate) fn register_timer(&mut self, timer_name: String, event: EventWithWaker<Timer>) {
89+
self.timers.insert(timer_name, event);
90+
}
6591
}
6692

6793
impl WasmRuntimeAsyncEngine {
@@ -75,14 +101,9 @@ impl WasmRuntimeAsyncEngine {
75101
let mut future_tasks = Vec::new();
76102
future_tasks.push(task);
77103
loop {
78-
TIMERS.iter_mut().for_each(|mut cell| {
79-
cell.0.update_elapsed();
80-
if cell.0.elapsed() {
81-
cell.1.wake_by_ref();
82-
}
83-
});
84104
let mut reactor = REACTOR.lock().unwrap();
85-
reactor.wait();
105+
reactor.update_timers();
106+
reactor.wait_for_io();
86107
reactor.future_tasks.retain_mut(|task_info| {
87108
if task_info.waker.should_wake() {
88109
task_info.waker.reset();

src/io/net.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use uuid::Uuid;
2+
13
use crate::{
24
bindings::wasi::{
35
io::{
@@ -11,7 +13,7 @@ use crate::{
1113
tcp_create_socket::{create_tcp_socket, ErrorCode},
1214
},
1315
},
14-
engine::{NEXT_ID, REACTOR},
16+
engine::REACTOR,
1517
};
1618
use std::io::ErrorKind;
1719
use std::net::IpAddr;
@@ -59,10 +61,7 @@ impl TcpStream {
5961
pub async fn connect<T: Into<IpAddress>>(&mut self, address: T, port: u16) -> IOResult<()> {
6062
let connect_future = ConnectionFuture {
6163
stream: self,
62-
async_key: format!(
63-
"socket-connection={}",
64-
NEXT_ID.load(std::sync::atomic::Ordering::Relaxed)
65-
),
64+
async_key: format!("socket-connection={}", Uuid::new_v4()),
6665
address: address.into(),
6766
port,
6867
};
@@ -160,7 +159,6 @@ impl<'a> Future for ConnectionFuture<'a> {
160159
let this = self.get_mut();
161160
if !REACTOR.lock().unwrap().is_pollable(&this.async_key) {
162161
this.stream.start_connect(this.address, this.port)?;
163-
NEXT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
164162
REACTOR.lock().unwrap().register(
165163
this.async_key.clone(),
166164
(this.stream.pollable.clone(), cx.waker().clone()),

src/io/timer.rs

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
1+
use crate::engine::REACTOR;
2+
use crate::poll_tasks::EventWithWaker;
13
use dashmap::DashMap;
24
use lazy_static::lazy_static;
35
use std::future::Future;
46
use std::time::Duration;
57
use std::time::Instant;
6-
7-
use crate::engine::NEXT_ID;
8-
use crate::poll_tasks::EventWithWaker;
8+
use uuid::Uuid;
99
lazy_static! {
1010
pub static ref TIMERS: DashMap<u32, EventWithWaker<Timer>> = DashMap::new();
1111
}
@@ -21,7 +21,7 @@ impl Timer {
2121
/// create a timer that resolves once it elapses
2222
pub async fn sleep(until: std::time::Duration) {
2323
let timeout = TimeoutFuture {
24-
id: None,
24+
timer_key: format!("sleep-{}", Uuid::new_v4()),
2525
timer: Self {
2626
at: Instant::now(),
2727
until,
@@ -36,7 +36,6 @@ impl Timer {
3636
.checked_duration_since(self.at)
3737
.map(|s| s > self.until)
3838
.unwrap_or_default();
39-
println!("elapsed {elapsed}");
4039
self.elapsed = elapsed;
4140
}
4241

@@ -46,7 +45,7 @@ impl Timer {
4645
}
4746

4847
struct TimeoutFuture {
49-
id: Option<u32>,
48+
timer_key: String,
5049
timer: Timer,
5150
}
5251

@@ -57,19 +56,17 @@ impl Future for TimeoutFuture {
5756
cx: &mut std::task::Context<'_>,
5857
) -> std::task::Poll<Self::Output> {
5958
let this = self.get_mut();
60-
match this.id {
61-
Some(id) => {
62-
let has_elapsed = TIMERS.get(&id).map(|s| s.0.elapsed()).unwrap_or_default();
63-
if has_elapsed {
64-
return std::task::Poll::Ready(());
65-
}
66-
}
67-
None => {
68-
let id = NEXT_ID.load(std::sync::atomic::Ordering::Relaxed);
69-
TIMERS.insert(id, (this.timer.clone(), cx.waker().clone()));
70-
NEXT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
71-
let _ = this.id.insert(id);
59+
let mut reactor = REACTOR.lock().unwrap();
60+
if reactor.is_timer_pollable(&this.timer_key) {
61+
let has_elapsed = reactor.timer_has_elapsed(&this.timer_key);
62+
if has_elapsed {
63+
return std::task::Poll::Ready(());
7264
}
65+
} else {
66+
reactor.register_timer(
67+
this.timer_key.clone(),
68+
(this.timer.clone(), cx.waker().clone()),
69+
);
7370
}
7471

7572
std::task::Poll::Pending

0 commit comments

Comments
 (0)