Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ readme = "README.md"
description = "Experimental structured concurrency support for async Rust (similar to trio's nurseries). Works with non-Send futures on stable Rust."

[dependencies]
futures = "0.3.21"
async-trait = "0.1.56"
pin-project = "1.1.5"
futures-channel = { version = "0.3.21", default-features = false, features = ["alloc"] }
futures-core = { version = "0.3.21", default-features = false, features = ["alloc"] }
futures-util = { version = "0.3.21", default-features = false, features = ["alloc"] }
pin-project-lite = "0.2.14"

[dev-dependencies]
futures = "0.3.21"
anyhow = "1"
tokio = { version = "1.17.0", features = ["full"] }
58 changes: 29 additions & 29 deletions src/body.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,36 @@
use std::{pin::Pin, rc::Rc, task::Poll};

use futures::Future;
use pin_project::{pin_project, pinned_drop};
use futures_core::Future;
use pin_project_lite::pin_project;

use crate::scope::Scope;

/// The future for a scope's "body".
///
/// It is not considered complete until (a) the body is done and (b) any spawned futures are done.
/// Its result is whatever the body returned.
///
/// # Unsafe contract
///
/// - `body_future` and `result` will be dropped BEFORE `scope`.
#[pin_project(PinnedDrop)]
pub(crate) struct Body<'scope, 'env: 'scope, R, F>
where
R: 'env,
{
#[pin]
body_future: Option<F>,
result: Option<R>,
scope: Rc<Scope<'scope, 'env, R>>,
pin_project! {
/// The future for a scope's "body".
///
/// It is not considered complete until (a) the body is done and (b) any spawned futures are done.
/// Its result is whatever the body returned.
///
/// # Unsafe contract
///
/// - `body_future` and `result` will be dropped BEFORE `scope`.
pub(crate) struct Body<'scope, 'env: 'scope, R, F>
where
R: 'env,
{
#[pin]
body_future: Option<F>,
result: Option<R>,
scope: Rc<Scope<'scope, 'env, R>>,
}

impl<R, F> PinnedDrop for Body<'_, '_, R, F> {
fn drop(this: Pin<&mut Self>) {
// Fulfill our unsafe contract and ensure we drop other fields
// before we drop scope.
this.clear();
}
}
}

impl<'scope, 'env, R, F> Body<'scope, 'env, R, F> {
Expand All @@ -44,15 +53,6 @@ impl<'scope, 'env, R, F> Body<'scope, 'env, R, F> {
}
}

#[pinned_drop]
impl<'scope, 'env, R, F> PinnedDrop for Body<'scope, 'env, R, F> {
fn drop(self: Pin<&mut Self>) {
// Fulfill our unsafe contract and ensure we drop other fields
// before we drop scope.
self.clear();
}
}

impl<'scope, 'env, R, F> Future for Body<'scope, 'env, R, F>
where
F: Future<Output = R>,
Expand Down Expand Up @@ -80,7 +80,7 @@ where
// so forward that result. Otherwise, the `result` from our body future
// should be available, so return that.
match ready!(this.scope.poll_jobs(cx)) {
Some(v) => return Poll::Ready(v),
Some(v) => Poll::Ready(v),
None => match this.result.take() {
None => Poll::Pending,
Some(v) => Poll::Ready(v),
Expand Down
12 changes: 6 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ mod spawned;
///
/// ```rust
/// # futures::executor::block_on(async {
/// let scope = moro::async_scope!(|scope| {/* ... */}).await;
/// let scope = moro_local::async_scope!(|scope| {/* ... */}).await;
/// # });
/// ```
///
Expand All @@ -50,7 +50,7 @@ mod spawned;
/// ```rust
/// # futures::executor::block_on(async {
/// let r = 22;
/// let scope = moro::async_scope!(|scope| {
/// let scope = moro_local::async_scope!(|scope| {
/// // OK to refer to `r` here
/// scope.spawn(async { r }).await
/// });
Expand All @@ -65,7 +65,7 @@ mod spawned;
///
/// ```rust,compile_fail,E0373
/// # futures::executor::block_on(async {
/// let scope = moro::async_scope!(|scope| {
/// let scope = moro_local::async_scope!(|scope| {
/// let r = 22;
//
/// // NOT ok to refer to `r` now, because `r`
Expand All @@ -88,7 +88,7 @@ mod spawned;
/// ```rust
/// # futures::executor::block_on(async {
/// let v = vec![1, 2, 3, 5];
/// let scope = moro::async_scope!(|scope| {
/// let scope = moro_local::async_scope!(|scope| {
/// let job = scope.spawn(async {
/// let r: i32 = v.iter().sum();
/// r
Expand All @@ -108,7 +108,7 @@ mod spawned;
///
/// ```rust
/// # futures::executor::block_on(async {
/// let scope = moro::async_scope!(|scope| -> Result<(), u32> {
/// let scope = moro_local::async_scope!(|scope| -> Result<(), u32> {
/// Err(22) // Ok type would otherwise be unconstrained
/// });
/// let result = scope.await;
Expand Down Expand Up @@ -139,7 +139,7 @@ macro_rules! async_scope {
}};
}

use futures::future::LocalBoxFuture;
use futures_core::future::LocalBoxFuture;

pub use self::scope::Scope;
pub use self::scope_body::ScopeBody;
Expand Down
8 changes: 5 additions & 3 deletions src/scope.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::{cell::RefCell, marker::PhantomData, pin::Pin, rc::Rc, task::Poll};

use futures::{channel::oneshot, future::LocalBoxFuture, stream::FuturesUnordered, Future, Stream};
use futures_channel::oneshot;
use futures_core::{future::LocalBoxFuture, Future, Stream};
use futures_util::stream::FuturesUnordered;

use crate::Spawned;

Expand Down Expand Up @@ -80,7 +82,7 @@ impl<'scope, 'env, R> Scope<'scope, 'env, R> {
///
/// ```rust
/// # futures::executor::block_on(async {
/// let result = moro::async_scope!(|scope| {
/// let result = moro_local::async_scope!(|scope| {
/// scope.spawn(async { /* ... */ });
///
/// // Calling `scope.terminate` here will terminate the async
Expand All @@ -98,7 +100,7 @@ impl<'scope, 'env, R> Scope<'scope, 'env, R> {
T: 'scope,
{
if self.terminated.borrow().is_none() {
self.terminated.replace(Some(value.into()));
self.terminated.replace(Some(value));
}

// The code below will never run
Expand Down
19 changes: 10 additions & 9 deletions src/scope_body.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
use std::pin::Pin;

use futures::Future;
use pin_project::pin_project;
use futures_core::Future;
use pin_project_lite::pin_project;

use crate::body::Body;

#[pin_project]
pub struct ScopeBody<'env, R: 'env, F>
where
F: Future<Output = R>,
{
#[pin]
body: Body<'env, 'env, R, F>,
pin_project! {
pub struct ScopeBody<'env, R: 'env, F>
where
F: Future<Output = R>,
{
#[pin]
body: Body<'env, 'env, R, F>,
}
}

impl<'env, R, F> ScopeBody<'env, R, F>
Expand Down
2 changes: 1 addition & 1 deletion src/spawned.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::pin::Pin;

use crate::prelude::*;
use crate::Scope;
use futures::Future;
use futures_core::Future;

pub struct Spawned<F> {
f: F,
Expand Down