Skip to content

Commit 45ebca1

Browse files
Change around synchronization primitives
Instead of the lock around the reactor: - use dashmaps on the future tasks - use mutex for the poll tasks - use integrated tests for testing functionality
1 parent b1e00aa commit 45ebca1

File tree

5 files changed

+100
-44
lines changed

5 files changed

+100
-44
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[package]
2-
name = "wasm-runtime"
2+
name = "tiny-wasm-runtime"
33
version = "0.1.0"
44
edition = "2021"
55

src/engine.rs

Lines changed: 50 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use crate::Timer;
22
use crate::{bindings::wasi::io::poll::Pollable, poll_tasks::PollTasks};
33
use crate::{io::timer::TIMERS, poll_tasks::EventWithWaker};
44
use dashmap::DashMap;
5-
use futures::pin_mut;
65
use lazy_static::lazy_static;
76
use std::{
87
future::Future,
@@ -13,9 +12,10 @@ use std::{
1312
},
1413
task::{Context, Wake, Waker},
1514
};
15+
use uuid::Uuid;
1616
lazy_static! {
1717
/// The global reactor for this runtime
18-
pub static ref REACTOR: Mutex<Reactor<'static>> = Mutex::new(Reactor::default());
18+
pub static ref REACTOR: Reactor<'static> = Reactor::default();
1919

2020
}
2121
/// The async engine instance
@@ -35,19 +35,19 @@ impl<'a> Task<'a> {
3535
}
3636
#[derive(Default)]
3737
pub struct Reactor<'a> {
38-
events: PollTasks,
39-
future_tasks: Vec<Task<'a>>, //right now the engine holds the tasks but depending
38+
events: Mutex<PollTasks>,
39+
future_tasks: DashMap<Uuid, Mutex<Task<'a>>>, //right now the engine holds the tasks but depending
4040
timers: DashMap<String, EventWithWaker<Timer>>,
4141
}
4242

4343
impl<'a> Reactor<'a> {
4444
//adds event to the queue
45-
pub fn register(&mut self, event_name: String, pollable: EventWithWaker<Arc<Pollable>>) {
46-
self.events.push(event_name, pollable);
45+
pub fn register(&self, event_name: String, pollable: EventWithWaker<Arc<Pollable>>) {
46+
self.events.lock().unwrap().push(event_name, pollable);
4747
}
4848
//checks if descriptor has been added to the polling queue
4949
pub fn is_pollable(&self, key: &str) -> bool {
50-
self.events.contains(key)
50+
self.events.lock().unwrap().contains(key)
5151
}
5252

5353
//checks if timer is pollable
@@ -56,17 +56,17 @@ impl<'a> Reactor<'a> {
5656
}
5757

5858
//polls event queue to see if any of the events are readycar
59-
pub fn wait_for_io(&mut self) {
60-
self.events.wait_for_pollables();
59+
pub fn wait_for_io(&self) {
60+
self.events.lock().unwrap().wait_for_pollables();
6161
}
6262

6363
//checks if event is ready
64-
pub fn check_ready(&mut self, event_name: &str) -> bool {
65-
self.events.check_if_ready(event_name)
64+
pub fn check_ready(&self, event_name: &str) -> bool {
65+
self.events.lock().unwrap().check_if_ready(event_name)
6666
}
6767

6868
pub fn is_empty(&self) -> bool {
69-
self.events.is_empty() && self.future_tasks.is_empty()
69+
self.events.lock().unwrap().is_empty() && self.future_tasks.is_empty()
7070
}
7171

7272
pub(crate) fn update_timers(&self) {
@@ -85,47 +85,58 @@ impl<'a> Reactor<'a> {
8585
.unwrap_or_default()
8686
}
8787

88-
pub(crate) fn register_timer(&mut self, timer_name: String, event: EventWithWaker<Timer>) {
88+
pub(crate) fn register_timer(&self, timer_name: String, event: EventWithWaker<Timer>) {
8989
self.timers.insert(timer_name, event);
9090
}
91-
}
9291

93-
impl WasmRuntimeAsyncEngine {
94-
/// function to execute futures
95-
pub fn block_on<K, F: Future<Output = K> + Send, Fun: FnOnce() -> F>(async_closure: Fun) {
96-
let future = async_closure();
97-
pin_mut!(future);
92+
pub(crate) fn push_task<K, F: Future<Output = K> + Send + 'static>(&self, future: F) -> Uuid {
9893
let task = Task::new(Box::pin(async move {
9994
let _result = future.await;
10095
}));
101-
let mut future_tasks = Vec::new();
102-
future_tasks.push(task);
96+
let id = Uuid::new_v4();
97+
self.future_tasks.insert(id, Mutex::new(task));
98+
id
99+
}
100+
}
101+
102+
impl WasmRuntimeAsyncEngine {
103+
/// function to execute futures
104+
pub fn block_on<K, F: Future<Output = K> + Send + 'static>(future: F) {
105+
let reactor = &REACTOR;
106+
reactor.push_task(future);
103107
loop {
104-
let mut reactor = REACTOR.lock().unwrap();
105108
reactor.update_timers();
106109
reactor.wait_for_io();
107-
reactor.future_tasks.retain_mut(|task_info| {
108-
if task_info.waker.should_wake() {
109-
task_info.waker.reset();
110-
let waker: Waker = task_info.waker.clone().into();
110+
111+
reactor.future_tasks.retain(|_, task_info| {
112+
let waker = task_info.get_mut().unwrap().waker.clone();
113+
if waker.should_wake() {
114+
waker.reset();
115+
let waker: Waker = waker.into();
111116
let mut context = Context::from_waker(&waker);
112-
if task_info.task.as_mut().poll(&mut context).is_ready() {
117+
118+
if task_info
119+
.get_mut()
120+
.unwrap()
121+
.task
122+
.as_mut()
123+
.poll(&mut context)
124+
.is_ready()
125+
{
113126
return false;
114127
}
115128
}
116129
true
117130
});
131+
118132
if TIMERS.is_empty() && reactor.is_empty() {
119133
break;
120134
}
121135
}
122136
}
123137

124138
pub fn spawn<K, F: Future<Output = ()> + Send + 'static>(future: F) {
125-
let task = Task::new(Box::pin(async move {
126-
let _result = future.await;
127-
}));
128-
REACTOR.lock().unwrap().future_tasks.push(task);
139+
REACTOR.push_task(future);
129140
}
130141
}
131142

@@ -188,14 +199,15 @@ mod test {
188199
#[test]
189200
fn test_enqueue() {
190201
let count_future = CountFuture { max: 3, min: 0 };
191-
let mut reactor = Reactor::default();
192-
reactor.future_tasks.push(Task::new(Box::pin(async move {
202+
let reactor = Reactor::default();
203+
let id = reactor.push_task(async move {
193204
count_future.await;
194-
})));
195-
let task = reactor.future_tasks.first_mut().unwrap();
196-
let fut_waker = task.waker.clone();
205+
});
206+
let mut future_task = reactor.future_tasks.get_mut(&id).unwrap();
207+
let task = future_task.value_mut();
208+
let fut_waker = task.lock().unwrap().waker.clone();
197209
let waker: Waker = fut_waker.into();
198-
let count_future = &mut task.task;
210+
let count_future = &mut task.lock().unwrap().task;
199211
let mut context = Context::from_waker(&waker);
200212
futures::pin_mut!(count_future);
201213
let _ = count_future.as_mut().poll(&mut context);
@@ -205,6 +217,6 @@ mod test {
205217
fn test_block_on() {
206218
let count_future = CountFuture { max: 3, min: 0 };
207219

208-
WasmRuntimeAsyncEngine::block_on(|| async move { assert_eq!(count_future.await, 3) });
220+
WasmRuntimeAsyncEngine::block_on(async move { assert_eq!(count_future.await, 3) });
209221
}
210222
}

src/io/net.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -157,16 +157,16 @@ impl<'a> Future for ConnectionFuture<'a> {
157157
cx: &mut std::task::Context<'_>,
158158
) -> std::task::Poll<Self::Output> {
159159
let this = self.get_mut();
160-
if !REACTOR.lock().unwrap().is_pollable(&this.async_key) {
160+
if !REACTOR.is_pollable(&this.async_key) {
161161
this.stream.start_connect(this.address, this.port)?;
162-
REACTOR.lock().unwrap().register(
162+
REACTOR.register(
163163
this.async_key.clone(),
164164
(this.stream.pollable.clone(), cx.waker().clone()),
165165
);
166166
}
167167

168168
//A PLACE TO CHECK IF THE REACTOR UPDATED THIS KEY
169-
if REACTOR.lock().unwrap().check_ready(&this.async_key) {
169+
if REACTOR.check_ready(&this.async_key) {
170170
this.stream.finish_connecting()?;
171171
Poll::Ready(Ok(()))
172172
} else {

src/io/timer.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ impl Timer {
3434
}
3535

3636
pub async fn timeout<K, F: Future<Output = K>>(
37-
&self,
3837
fut: F,
3938
deadline: std::time::Duration,
4039
) -> std::io::Result<K> {
@@ -75,7 +74,7 @@ impl Future for TimeFuture {
7574
cx: &mut std::task::Context<'_>,
7675
) -> std::task::Poll<Self::Output> {
7776
let this = self.get_mut();
78-
let mut reactor = REACTOR.lock().unwrap();
77+
let reactor = &REACTOR;
7978
if reactor.is_timer_pollable(&this.timer_key) {
8079
let has_elapsed = reactor.timer_has_elapsed(&this.timer_key);
8180
if has_elapsed {

tests/engine.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
use std::sync::atomic::{AtomicBool, Ordering};
2+
use std::sync::Arc;
3+
use tiny_wasm_runtime::{Timer, WasmRuntimeAsyncEngine};
4+
5+
pub async fn test_timers_with_assertions() {
6+
let task_a_done: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
7+
let task_b_done = Arc::new(AtomicBool::new(false));
8+
9+
let task_a_done_clone = task_a_done.clone();
10+
let task_b_done_clone = task_b_done.clone();
11+
println!("spaw goes here");
12+
WasmRuntimeAsyncEngine::spawn::<(), _>(async move {
13+
println!("sleep happens here");
14+
Timer::sleep(std::time::Duration::from_secs(1)).await;
15+
task_a_done_clone.store(true, Ordering::SeqCst);
16+
});
17+
18+
println!("spaw goes here 2");
19+
20+
WasmRuntimeAsyncEngine::spawn::<(), _>(async move {
21+
Timer::sleep(std::time::Duration::from_millis(500)).await;
22+
task_b_done_clone.store(true, Ordering::SeqCst);
23+
});
24+
25+
// Do main timeout
26+
let slow_future = async {
27+
Timer::sleep(std::time::Duration::from_secs(2)).await;
28+
"completed"
29+
};
30+
31+
let res = Timer::timeout(slow_future, std::time::Duration::from_secs(1)).await;
32+
33+
assert!(res.is_err(), "Expected the main future to time out");
34+
assert!(
35+
task_a_done.load(Ordering::SeqCst) || task_b_done.load(Ordering::SeqCst),
36+
"At least one background task should have completed by now"
37+
);
38+
}
39+
40+
#[test]
41+
fn test_full_engine_runtime() {
42+
WasmRuntimeAsyncEngine::block_on(async {
43+
test_timers_with_assertions().await;
44+
});
45+
}

0 commit comments

Comments
 (0)