From d478a91d1dc619e71842ba199b6ba59a1018845f Mon Sep 17 00:00:00 2001 From: Luke Zbihlyj Date: Sat, 1 Nov 2025 10:57:01 -0700 Subject: [PATCH 1/4] Read full stream before returning first result to ensure it is not dropped prematurely, fixes #269 --- src/run_query_dsl/utils.rs | 36 +++++++++++++++++++++++------------- 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/src/run_query_dsl/utils.rs b/src/run_query_dsl/utils.rs index 22f5891..f14a268 100644 --- a/src/run_query_dsl/utils.rs +++ b/src/run_query_dsl/utils.rs @@ -3,7 +3,7 @@ use std::pin::Pin; use std::task::{Context, Poll}; use diesel::QueryResult; -use futures_core::{ready, TryFuture, TryStream}; +use futures_core::{TryFuture, TryStream}; use futures_util::{TryFutureExt, TryStreamExt}; // We use a custom future implementation here to erase some lifetimes @@ -80,14 +80,23 @@ where /// Converts a stream into a future, only yielding the first element. /// Based on [`futures_util::stream::StreamFuture`]. -pub struct LoadNext { - stream: Option, +/// +/// Consumes the entire stream to ensure proper cleanup before returning which is +/// required to fix: https://github.com/weiznich/diesel_async/issues/269 +pub struct LoadNext +where + St: TryStream + Unpin, +{ + future: futures_util::stream::TryCollect>, } -impl LoadNext { +impl LoadNext +where + St: TryStream + Unpin, +{ pub(crate) fn new(stream: St) -> Self { Self { - stream: Some(stream), + future: stream.try_collect(), } } } @@ -99,14 +108,15 @@ where type Output = QueryResult; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let first = { - let s = self.stream.as_mut().expect("polling LoadNext twice"); - ready!(s.try_poll_next_unpin(cx)) - }; - self.stream = None; - match first { - Some(first) => Poll::Ready(first), - None => Poll::Ready(Err(diesel::result::Error::NotFound)), + match Pin::new(&mut self.future).poll(cx) { + Poll::Ready(Ok(results)) => { + match results.into_iter().next() { + Some(first) => Poll::Ready(Ok(first)), + None => Poll::Ready(Err(diesel::result::Error::NotFound)), + } + } + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Pending => Poll::Pending, } } } From 86104c2c882c5f54d07ea0ca12049edf516ee453 Mon Sep 17 00:00:00 2001 From: Luke Zbihlyj Date: Sat, 1 Nov 2025 11:07:46 -0700 Subject: [PATCH 2/4] Run cargo fmt to fix styling --- src/run_query_dsl/utils.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/run_query_dsl/utils.rs b/src/run_query_dsl/utils.rs index f14a268..aff9761 100644 --- a/src/run_query_dsl/utils.rs +++ b/src/run_query_dsl/utils.rs @@ -109,12 +109,10 @@ where fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match Pin::new(&mut self.future).poll(cx) { - Poll::Ready(Ok(results)) => { - match results.into_iter().next() { - Some(first) => Poll::Ready(Ok(first)), - None => Poll::Ready(Err(diesel::result::Error::NotFound)), - } - } + Poll::Ready(Ok(results)) => match results.into_iter().next() { + Some(first) => Poll::Ready(Ok(first)), + None => Poll::Ready(Err(diesel::result::Error::NotFound)), + }, Poll::Ready(Err(e)) => Poll::Ready(Err(e)), Poll::Pending => Poll::Pending, } From cd5baabf51fa7e769b756dacaa9ca1df2c4a6e90 Mon Sep 17 00:00:00 2001 From: Luke Zbihlyj Date: Wed, 5 Nov 2025 14:48:14 -0800 Subject: [PATCH 3/4] Follow structure of other util structs --- src/run_query_dsl/utils.rs | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/src/run_query_dsl/utils.rs b/src/run_query_dsl/utils.rs index aff9761..a33e85e 100644 --- a/src/run_query_dsl/utils.rs +++ b/src/run_query_dsl/utils.rs @@ -83,29 +83,27 @@ where /// /// Consumes the entire stream to ensure proper cleanup before returning which is /// required to fix: https://github.com/weiznich/diesel_async/issues/269 -pub struct LoadNext -where - St: TryStream + Unpin, -{ - future: futures_util::stream::TryCollect>, +#[repr(transparent)] +pub struct LoadNext { + future: futures_util::stream::TryCollect>, } -impl LoadNext +impl LoadNext where - St: TryStream + Unpin, + T: TryStream + Unpin, { - pub(crate) fn new(stream: St) -> Self { + pub(crate) fn new(stream: T) -> Self { Self { future: stream.try_collect(), } } } -impl Future for LoadNext +impl Future for LoadNext where - St: TryStream + Unpin, + T: TryStream + Unpin, { - type Output = QueryResult; + type Output = QueryResult; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match Pin::new(&mut self.future).poll(cx) { From d060cbc59189ea95835429969bc64df35dd259fc Mon Sep 17 00:00:00 2001 From: Luke Zbihlyj Date: Wed, 5 Nov 2025 15:09:58 -0800 Subject: [PATCH 4/4] Perform type erasure by returning result as T instead of St::Ok --- src/run_query_dsl/mod.rs | 2 +- src/run_query_dsl/utils.rs | 30 ++++++++++++++++++++---------- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/src/run_query_dsl/mod.rs b/src/run_query_dsl/mod.rs index 18a9fda..9625b43 100644 --- a/src/run_query_dsl/mod.rs +++ b/src/run_query_dsl/mod.rs @@ -146,7 +146,7 @@ pub mod return_futures { /// /// This is essentially `impl Future>` pub type GetResult<'conn, 'query, Q: LoadQuery<'query, Conn, U>, Conn, U> = - utils::AndThen, utils::LoadNext>>>>; + utils::AndThen, utils::LoadNext>>, U>>; } /// Methods used to execute queries. diff --git a/src/run_query_dsl/utils.rs b/src/run_query_dsl/utils.rs index a33e85e..3d63927 100644 --- a/src/run_query_dsl/utils.rs +++ b/src/run_query_dsl/utils.rs @@ -4,6 +4,7 @@ use std::task::{Context, Poll}; use diesel::QueryResult; use futures_core::{TryFuture, TryStream}; +use futures_util::stream::TryCollect; use futures_util::{TryFutureExt, TryStreamExt}; // We use a custom future implementation here to erase some lifetimes @@ -84,29 +85,38 @@ where /// Consumes the entire stream to ensure proper cleanup before returning which is /// required to fix: https://github.com/weiznich/diesel_async/issues/269 #[repr(transparent)] -pub struct LoadNext { - future: futures_util::stream::TryCollect>, +pub struct LoadNext +where + F: TryStream, +{ + future: TryCollect>, } -impl LoadNext +impl LoadNext where - T: TryStream + Unpin, + F: TryStream, { - pub(crate) fn new(stream: T) -> Self { + pub(crate) fn new(stream: F) -> Self { Self { future: stream.try_collect(), } } } -impl Future for LoadNext +impl Future for LoadNext where - T: TryStream + Unpin, + F: TryStream, + TryCollect>: Future, diesel::result::Error>>, { - type Output = QueryResult; + type Output = QueryResult; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match Pin::new(&mut self.future).poll(cx) { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match unsafe { + // SAFETY: This projects pinning to the only inner field + self.map_unchecked_mut(|s| &mut s.future) + } + .poll(cx) + { Poll::Ready(Ok(results)) => match results.into_iter().next() { Some(first) => Poll::Ready(Ok(first)), None => Poll::Ready(Err(diesel::result::Error::NotFound)),