Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 82 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
This library is designed as an adapter for `futures_util::TryStream`, allowing for easy parsing of an incoming byte stream (such as from an HTTP response) and splitting it into multiple parts (`Part`). It is especially useful for handling `multipart/byteranges` HTTP responses.

A common use case is sending an HTTP Range request to a server and then parsing the resulting `multipart/byteranges` response body.
The example below demonstrates how to use reqwest to download multiple ranges of a file and parse the individual parts using multipart_stream.
The example below demonstrates how to use reqwest to download multiple ranges of a file and parse the individual parts using `multipart_stream`.

```rust
use multipart_async_stream::{LendingIterator, MultipartStream, TryStreamExt, header::CONTENT_TYPE};
Expand Down Expand Up @@ -39,7 +39,86 @@ The output of the program above is:

```bash
{"content-type": "image/png", "content-range": "bytes 0-31/10845"}
b"\x89PNG\r\n\x1a\n\0\0\0\rIHDR\0\0\0\xf4\0\0\0B\x08\x06\0\0\0`\xbc\xfb"
body streaming: b"\x89PNG\r\n\x1a\n\0\0\0\rIHDR\0\0\0\xf4\0\0\0B\x08\x06\0\0\0`\xbc\xfb"
{"content-type": "image/png", "content-range": "bytes 64-127/10845"}
b"L:com.adobe.xmp\0\0\0\0\0<?xpacket begin=\"\xef\xbb\xbf\" id=\"W5M0MpCehiHzreSzNT"
body streaming: b"L:com.adobe.xmp\0\0\0\0"
body streaming: b"\0<?xpacket begin=\"\xef\xbb\xbf\" id=\"W5M0MpCehiHzreSzNT"
```

## Important Usage Note: Consuming the Part Body

When using this library, a critical point is that you must completely consume the body stream of the current Part before requesting the next one.
If you call `m.next().await` before the previous Part's body has been fully read to its end (i.e., the stream returns `None`), `MultipartStream` will return an `Error::BodyNotConsumed` error.
This is because MultipartStream can only begin parsing the boundary and headers for the next part after the current part's body data stream has ended. An instance of Part internally holds a mutable borrow of the main stream, effectively locking its state. Only when this body stream is fully consumed (and the Part is dropped) can the main stream's state advance.

### ❌ Incorrect Example

The following code will trigger a `BodyNotConsumed` error because it gets the first part but immediately tries to get the next one without consuming the first part's body.

```rust
use multipart_async_stream::{MultipartStream, LendingIterator, Error};
// ... other imports ...

#[tokio::test]
async fn test_body_not_consumed_error_example() {
const BOUNDARY: &str = "boundary";
const BODY: &[u8] = b"\
--boundary\r\n\
Content-Disposition: form-data; name=\"field1\"\r\n\
\r\n\
value1\r\n\
--boundary\r\n\
Content-Disposition: form-data; name=\"field2\"\r\n\
\r\n\
value2\r\n\
--boundary--\r\n";

let stream = create_stream_from_chunks(BODY, BODY.len()); // Assuming create_stream_from_chunks is a test helper
let mut m = MultipartStream::new(stream, BOUNDARY.as_bytes());

// Get the first part, but we don't process its body
let _part1 = m.next().await.unwrap().unwrap();

// Immediately try to get the next part while part1's body is not consumed.
// In practice, Rust's borrow checker would prevent this code from even compiling,
// as `_part1` holds an active mutable borrow of `m`. The runtime error
// shown here acts as a logical safeguard.
let result = m.next().await;

// This will fail!
assert!(matches!(result, Some(Err(Error::BodyNotConsumed))));
println!("Received expected error: {:?}", result.unwrap().err().unwrap());
}
```

### ✅ Correct Example

The correct approach is to use a loop to ensure the body stream is fully read. The part object is implicitly dropped at the end of the loop's iteration, which releases the lock on the main stream.

```rust
use multipart_async_stream::{MultipartStream, LendingIterator, TryStreamExt};
// ... other imports ...

async fn correct_usage_example() {
// ... (setup for stream and boundary) ...
# const BOUNDARY: &str = "boundary";
# const BODY: &[u8] = b"--boundary\r\n\r\nvalue1\r\n--boundary\r\n\r\nvalue2\r\n--boundary--";
# let stream = futures_util::stream::iter(vec![Ok::<_, std::io::Error>(bytes::Bytes::from_static(BODY))]);
# let mut m = MultipartStream::new(stream, BOUNDARY.as_bytes());

// Use a while let loop to iterate over all parts
while let Some(Ok(part)) = m.next().await {
println!("Headers: {:?}", part.headers());

// Create an inner loop to consume all body chunks of the current part
let mut body = part.body();
while let Ok(Some(chunk)) = body.try_next().await {
// Process the chunk...
println!("Got a body chunk: {:?}", chunk);
}
// When this inner loop finishes, the body stream is exhausted.
// The `part` will be dropped at the end of this main loop's iteration.
// Now it's safe to proceed to the next call of `m.next().await`.
}
}
```
2 changes: 1 addition & 1 deletion examples/demo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ async fn main() {
println!("{:?}", part.headers());
let mut body = part.body();
while let Ok(Some(b)) = body.try_next().await {
println!("{:?}", b);
println!("body streaming: {:?}", b);
}
}
}
85 changes: 69 additions & 16 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,7 @@ where
fn update_scan(&mut self, new_scan: usize) {
use ParserState::*;
match &mut self.state {
Preamble(scan) | ReadingHeaders(scan) | StreamingBody(scan) => {
debug_assert!(new_scan > *scan);
*scan = new_scan
}
Preamble(scan) | ReadingHeaders(scan) | StreamingBody(scan) => *scan = new_scan,
Finished => unreachable!("cannot invoke add_scan on finished state"), /* 几乎不可能会在完成状态继续update
* scan */
}
Expand All @@ -102,7 +99,7 @@ where
use ParserState::*;
use Poll::*;
let pattern_len = self.pattern.len();
let sub_pattern_len = pattern_len - 2;
let sub_pattern_len = pattern_len - 2; // check tail soon
loop {
let prev_buf_len = self.buf.len();
let scan = match self.state {
Expand All @@ -121,7 +118,7 @@ where
Some(CRLF) => {
// multipart 的流没有结束,开始下一个 part headers 的解析,
// 此时立刻调用此函数只会返回 none
self.state = Preamble(0);
self.state = Preamble(0); // 没有立刻清除边界,帮助 preamble 定位
let chunk = self.buf.split_to(pattern_start).freeze();
return Ready(Some(Ok(chunk)));
}
Expand All @@ -135,21 +132,20 @@ where
}
Some(_) => {
// 恰好有和模式一样的内容在 body 中
let new_scan = self.buf.len() - sub_pattern_len + 1;
if new_scan == scan {
return Ready(Some(Err(ParseError::BufferNoChange.into())));
}
self.update_scan(new_scan);
let last_window_end = self.buf.len() - sub_pattern_len;
let chunk = self.buf.split_to(last_window_end).freeze();
self.update_scan(0);
return Ready(Some(Ok(chunk)));
}
// 继续接收来判断后两个字节
None => {}
}
} else {
let new_scan = self.buf.len() - sub_pattern_len + 1;
if new_scan == scan {
return Ready(Some(Err(ParseError::BufferNoChange.into())));
}
self.update_scan(new_scan);
// 返回前面的字节(因为完全匹配不到边界),保留 窗口 -1 字节,然后将 scan 更新到 0
let last_window_end = self.buf.len() - sub_pattern_len + 1;
let chunk = self.buf.split_to(last_window_end).freeze();
self.update_scan(0);
return Ready(Some(Ok(chunk)));
}
}

Expand Down Expand Up @@ -552,4 +548,61 @@ body\r\n\
panic!("Expected a ParseError::Other with InvalidHeaderName");
}
}
#[tokio::test]
async fn test_streaming_body() {
const BOUNDARY: &str = "boundary";
const PART1_BODY: &[u8] = b"This is the first part's body, which is quite long to demonstrate streaming.";
const PART2_BODY: &[u8] = b"This is the second part, which is also streamed.";
// 使用 format! 来构建 body,避免手动拼接字符串可能引入的错误
let body_content = format!(
"--{boundary}\r\nContent-Disposition: form-data; \
name=\"field1\"\r\n\r\n{part1_body}\r\n--{boundary}\r\nContent-Disposition: form-data; \
name=\"field2\"\r\n\r\n{part2_body}\r\n--{boundary}--\r\n",
boundary = BOUNDARY,
part1_body = std::str::from_utf8(PART1_BODY).unwrap(),
part2_body = std::str::from_utf8(PART2_BODY).unwrap(),
);
let body_bytes = body_content.as_bytes();
let chunk_size = 10;
// 使用一个很小的块大小来确保 body 是以流的形式被处理的
let stream = create_stream_from_chunks(body_bytes, chunk_size);
let mut multipart_stream = MultipartStream::new(stream, BOUNDARY.as_bytes());

// --- 处理第一部分 ---
let part1 = multipart_stream.next().await.unwrap().unwrap();
assert_eq!(part1.headers().get("content-disposition").unwrap(), "form-data; name=\"field1\"");

// 手动从 body 流中一块一块地读取数据
let mut body_stream1 = part1.body();
let mut collected_body1 = Vec::new();
let mut i = 0;
while let Some(chunk_result) = body_stream1.try_next().await.unwrap() {
i += 1;
// 断言我们确实收到了非空的数据块
assert!(!chunk_result.is_empty());
collected_body1.extend_from_slice(&chunk_result);
}
drop(body_stream1);
assert_eq!(i, PART1_BODY.len().div_ceil(chunk_size));
// 验证完整接收到的 body 内容是否正确
assert_eq!(collected_body1, PART1_BODY);
i = 0;
// --- 处理第二部分 ---
let part2 = multipart_stream.next().await.unwrap().unwrap();
assert_eq!(part2.headers().get("content-disposition").unwrap(), "form-data; name=\"field2\"");

let mut body_stream2 = part2.body();
let mut collected_body2 = Vec::new();
while let Some(chunk_result) = body_stream2.try_next().await.unwrap() {
i += 1;
assert!(!chunk_result.is_empty());
collected_body2.extend_from_slice(&chunk_result);
}
assert_eq!(collected_body2, PART2_BODY);
drop(body_stream2);
assert_eq!(i, PART2_BODY.len().div_ceil(chunk_size));

// --- 确认流已结束 ---
assert!(multipart_stream.next().await.is_none());
}
}
Loading