diff --git a/README.md b/README.md index 26b1b4c..73c2f6c 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ This library is designed as an adapter for `futures_util::TryStream`, allowing for easy parsing of an incoming byte stream (such as from an HTTP response) and splitting it into multiple parts (`Part`). It is especially useful for handling `multipart/byteranges` HTTP responses. A common use case is sending an HTTP Range request to a server and then parsing the resulting `multipart/byteranges` response body. -The example below demonstrates how to use reqwest to download multiple ranges of a file and parse the individual parts using multipart_stream. +The example below demonstrates how to use reqwest to download multiple ranges of a file and parse the individual parts using `multipart_stream`. ```rust use multipart_async_stream::{LendingIterator, MultipartStream, TryStreamExt, header::CONTENT_TYPE}; @@ -39,7 +39,86 @@ The output of the program above is: ```bash {"content-type": "image/png", "content-range": "bytes 0-31/10845"} -b"\x89PNG\r\n\x1a\n\0\0\0\rIHDR\0\0\0\xf4\0\0\0B\x08\x06\0\0\0`\xbc\xfb" +body streaming: b"\x89PNG\r\n\x1a\n\0\0\0\rIHDR\0\0\0\xf4\0\0\0B\x08\x06\0\0\0`\xbc\xfb" {"content-type": "image/png", "content-range": "bytes 64-127/10845"} -b"L:com.adobe.xmp\0\0\0\0\0(bytes::Bytes::from_static(BODY))]); + # let mut m = MultipartStream::new(stream, BOUNDARY.as_bytes()); + + // Use a while let loop to iterate over all parts + while let Some(Ok(part)) = m.next().await { + println!("Headers: {:?}", part.headers()); + + // Create an inner loop to consume all body chunks of the current part + let mut body = part.body(); + while let Ok(Some(chunk)) = body.try_next().await { + // Process the chunk... + println!("Got a body chunk: {:?}", chunk); + } + // When this inner loop finishes, the body stream is exhausted. + // The `part` will be dropped at the end of this main loop's iteration. + // Now it's safe to proceed to the next call of `m.next().await`. + } +} ``` \ No newline at end of file diff --git a/examples/demo.rs b/examples/demo.rs index d39c360..8102b47 100644 --- a/examples/demo.rs +++ b/examples/demo.rs @@ -19,7 +19,7 @@ async fn main() { println!("{:?}", part.headers()); let mut body = part.body(); while let Ok(Some(b)) = body.try_next().await { - println!("{:?}", b); + println!("body streaming: {:?}", b); } } } diff --git a/src/lib.rs b/src/lib.rs index a929f05..62d6325 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -88,10 +88,7 @@ where fn update_scan(&mut self, new_scan: usize) { use ParserState::*; match &mut self.state { - Preamble(scan) | ReadingHeaders(scan) | StreamingBody(scan) => { - debug_assert!(new_scan > *scan); - *scan = new_scan - } + Preamble(scan) | ReadingHeaders(scan) | StreamingBody(scan) => *scan = new_scan, Finished => unreachable!("cannot invoke add_scan on finished state"), /* 几乎不可能会在完成状态继续update * scan */ } @@ -102,7 +99,7 @@ where use ParserState::*; use Poll::*; let pattern_len = self.pattern.len(); - let sub_pattern_len = pattern_len - 2; + let sub_pattern_len = pattern_len - 2; // check tail soon loop { let prev_buf_len = self.buf.len(); let scan = match self.state { @@ -121,7 +118,7 @@ where Some(CRLF) => { // multipart 的流没有结束,开始下一个 part headers 的解析, // 此时立刻调用此函数只会返回 none - self.state = Preamble(0); + self.state = Preamble(0); // 没有立刻清除边界,帮助 preamble 定位 let chunk = self.buf.split_to(pattern_start).freeze(); return Ready(Some(Ok(chunk))); } @@ -135,21 +132,20 @@ where } Some(_) => { // 恰好有和模式一样的内容在 body 中 - let new_scan = self.buf.len() - sub_pattern_len + 1; - if new_scan == scan { - return Ready(Some(Err(ParseError::BufferNoChange.into()))); - } - self.update_scan(new_scan); + let last_window_end = self.buf.len() - sub_pattern_len; + let chunk = self.buf.split_to(last_window_end).freeze(); + self.update_scan(0); + return Ready(Some(Ok(chunk))); } // 继续接收来判断后两个字节 None => {} } } else { - let new_scan = self.buf.len() - sub_pattern_len + 1; - if new_scan == scan { - return Ready(Some(Err(ParseError::BufferNoChange.into()))); - } - self.update_scan(new_scan); + // 返回前面的字节(因为完全匹配不到边界),保留 窗口 -1 字节,然后将 scan 更新到 0 + let last_window_end = self.buf.len() - sub_pattern_len + 1; + let chunk = self.buf.split_to(last_window_end).freeze(); + self.update_scan(0); + return Ready(Some(Ok(chunk))); } } @@ -552,4 +548,61 @@ body\r\n\ panic!("Expected a ParseError::Other with InvalidHeaderName"); } } + #[tokio::test] + async fn test_streaming_body() { + const BOUNDARY: &str = "boundary"; + const PART1_BODY: &[u8] = b"This is the first part's body, which is quite long to demonstrate streaming."; + const PART2_BODY: &[u8] = b"This is the second part, which is also streamed."; + // 使用 format! 来构建 body,避免手动拼接字符串可能引入的错误 + let body_content = format!( + "--{boundary}\r\nContent-Disposition: form-data; \ + name=\"field1\"\r\n\r\n{part1_body}\r\n--{boundary}\r\nContent-Disposition: form-data; \ + name=\"field2\"\r\n\r\n{part2_body}\r\n--{boundary}--\r\n", + boundary = BOUNDARY, + part1_body = std::str::from_utf8(PART1_BODY).unwrap(), + part2_body = std::str::from_utf8(PART2_BODY).unwrap(), + ); + let body_bytes = body_content.as_bytes(); + let chunk_size = 10; + // 使用一个很小的块大小来确保 body 是以流的形式被处理的 + let stream = create_stream_from_chunks(body_bytes, chunk_size); + let mut multipart_stream = MultipartStream::new(stream, BOUNDARY.as_bytes()); + + // --- 处理第一部分 --- + let part1 = multipart_stream.next().await.unwrap().unwrap(); + assert_eq!(part1.headers().get("content-disposition").unwrap(), "form-data; name=\"field1\""); + + // 手动从 body 流中一块一块地读取数据 + let mut body_stream1 = part1.body(); + let mut collected_body1 = Vec::new(); + let mut i = 0; + while let Some(chunk_result) = body_stream1.try_next().await.unwrap() { + i += 1; + // 断言我们确实收到了非空的数据块 + assert!(!chunk_result.is_empty()); + collected_body1.extend_from_slice(&chunk_result); + } + drop(body_stream1); + assert_eq!(i, PART1_BODY.len().div_ceil(chunk_size)); + // 验证完整接收到的 body 内容是否正确 + assert_eq!(collected_body1, PART1_BODY); + i = 0; + // --- 处理第二部分 --- + let part2 = multipart_stream.next().await.unwrap().unwrap(); + assert_eq!(part2.headers().get("content-disposition").unwrap(), "form-data; name=\"field2\""); + + let mut body_stream2 = part2.body(); + let mut collected_body2 = Vec::new(); + while let Some(chunk_result) = body_stream2.try_next().await.unwrap() { + i += 1; + assert!(!chunk_result.is_empty()); + collected_body2.extend_from_slice(&chunk_result); + } + assert_eq!(collected_body2, PART2_BODY); + drop(body_stream2); + assert_eq!(i, PART2_BODY.len().div_ceil(chunk_size)); + + // --- 确认流已结束 --- + assert!(multipart_stream.next().await.is_none()); + } }