Skip to content

Commit dcf45a9

Browse files
authored
Merge pull request #5461 from cloudflare/jasnell/replace-body-stream-take-2
2 parents 78fce87 + 91b3650 commit dcf45a9

File tree

4 files changed

+32
-15
lines changed

4 files changed

+32
-15
lines changed

src/workerd/api/http.c++

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -715,10 +715,21 @@ Body::ExtractedBody Body::extractBody(jsg::Lock& js, Initializer init) {
715715
}
716716
}
717717

718-
auto bodyStream = kj::heap<BodyBufferInputStream>(buffer.clone(js));
718+
auto buf = buffer.clone(js);
719719

720-
return {js.alloc<ReadableStream>(IoContext::current(), kj::mv(bodyStream)), kj::mv(buffer),
721-
kj::mv(contentType)};
720+
if (util::Autogate::isEnabled(util::AutogateKey::BODY_BUFFER_INPUT_STREAM_REPLACEMENT)) {
721+
auto memStream = newMemoryInputStream(buf.view, kj::heap(kj::mv(buf.ownBytes)));
722+
auto rs = newSystemStream(kj::mv(memStream), StreamEncoding::IDENTITY);
723+
724+
return {js.alloc<ReadableStream>(IoContext::current(), kj::mv(rs)), kj::mv(buffer),
725+
kj::mv(contentType)};
726+
} else {
727+
// TODO(cleanup): Remove once the Autogate is removed.
728+
auto bodyStream = kj::heap<BodyBufferInputStream>(kj::mv(buf));
729+
730+
return {js.alloc<ReadableStream>(IoContext::current(), kj::mv(bodyStream)), kj::mv(buffer),
731+
kj::mv(contentType)};
732+
}
722733
}
723734

724735
Body::Body(jsg::Lock& js, kj::Maybe<ExtractedBody> init, Headers& headers)
@@ -765,8 +776,15 @@ void Body::rewindBody(jsg::Lock& js) {
765776

766777
KJ_IF_SOME(i, impl) {
767778
auto bufferCopy = KJ_ASSERT_NONNULL(i.buffer).clone(js);
768-
auto bodyStream = kj::heap<BodyBufferInputStream>(kj::mv(bufferCopy));
769-
i.stream = js.alloc<ReadableStream>(IoContext::current(), kj::mv(bodyStream));
779+
if (util::Autogate::isEnabled(util::AutogateKey::BODY_BUFFER_INPUT_STREAM_REPLACEMENT)) {
780+
auto memStream = newMemoryInputStream(bufferCopy.view, kj::heap(kj::mv(bufferCopy.ownBytes)));
781+
auto rs = newSystemStream(kj::mv(memStream), StreamEncoding::IDENTITY);
782+
i.stream = js.alloc<ReadableStream>(IoContext::current(), kj::mv(rs));
783+
} else {
784+
// TODO(cleanup): Remove once the Autogate is removed.
785+
auto bodyStream = kj::heap<BodyBufferInputStream>(kj::mv(bufferCopy));
786+
i.stream = js.alloc<ReadableStream>(IoContext::current(), kj::mv(bodyStream));
787+
}
770788
}
771789
}
772790

src/workerd/util/autogate.c++

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ kj::StringPtr KJ_STRINGIFY(AutogateKey key) {
2525
return "streaming-tail-worker"_kj;
2626
case AutogateKey::TAIL_STREAM_REFACTOR:
2727
return "tail-stream-refactor"_kj;
28+
case AutogateKey::BODY_BUFFER_INPUT_STREAM_REPLACEMENT:
29+
return "body-buffer-input-stream-replacement"_kj;
2830
case AutogateKey::NumOfKeys:
2931
KJ_FAIL_ASSERT("NumOfKeys should not be used in getName");
3032
}

src/workerd/util/autogate.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ enum class AutogateKey {
2020
STREAMING_TAIL_WORKER,
2121
// Enable refactor used to consolidate the different tail worker stream implementations.
2222
TAIL_STREAM_REFACTOR,
23+
// Enable the BodyBufferInputStream replacement
24+
BODY_BUFFER_INPUT_STREAM_REPLACEMENT,
2325
NumOfKeys // Reserved for iteration.
2426
};
2527

src/workerd/util/stream-utils.c++

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ class MemoryInputStream final: public kj::AsyncInputStream {
3737
kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
3838
auto ptr = kj::arrayPtr<kj::byte>(static_cast<kj::byte*>(buffer), maxBytes);
3939
size_t toRead = kj::min(data.size(), ptr.size());
40-
KJ_DEFER(advance(toRead));
4140
if (toRead == 0) return toRead;
4241
ptr.first(toRead).copyFrom(data.first(toRead));
42+
data = data.slice(toRead);
4343
return toRead;
4444
}
4545

@@ -51,8 +51,11 @@ class MemoryInputStream final: public kj::AsyncInputStream {
5151
// An optimized pumpTo... we know we have all the data right here. We can
5252
// just write it all at once up to `amount`.
5353
uint64_t toRead = kj::min(data.size(), amount);
54-
KJ_DEFER(advance(toRead));
54+
if (toRead == 0) {
55+
co_return toRead;
56+
}
5557
co_await output.write(data.first(toRead));
58+
data = data.slice(toRead);
5659
co_return toRead;
5760
}
5861

@@ -74,14 +77,6 @@ class MemoryInputStream final: public kj::AsyncInputStream {
7477
private:
7578
kj::ArrayPtr<const kj::byte> data;
7679
kj::Maybe<kj::Rc<OwnedBacking>> ownedBacking;
77-
78-
void advance(size_t amount) {
79-
data = data.slice(amount);
80-
// If we've consumed all the data, drop our reference to the backing storage eagerly.
81-
if (data.size() == 0) {
82-
ownedBacking = kj::none;
83-
}
84-
}
8580
};
8681

8782
class NeuterableInputStreamImpl final: public NeuterableInputStream {

0 commit comments

Comments
 (0)