diff --git a/src/main/java/io/vertx/httpproxy/impl/HttpClientResponseBody.java b/src/main/java/io/vertx/httpproxy/impl/HttpClientResponseBody.java new file mode 100644 index 0000000..0084960 --- /dev/null +++ b/src/main/java/io/vertx/httpproxy/impl/HttpClientResponseBody.java @@ -0,0 +1,162 @@ +package io.vertx.httpproxy.impl; + +import io.vertx.codegen.annotations.Nullable; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.MultiMap; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpClientResponse; +import io.vertx.core.http.HttpServerResponse; +import io.vertx.core.streams.Pipe; +import io.vertx.core.streams.ReadStream; +import io.vertx.core.streams.WriteStream; +import io.vertx.httpproxy.Body; + + +/** + * created by wang007 on 2025/9/15 + */ +public class HttpClientResponseBody implements Body, ReadStream { + + private final HttpClientResponse response; + private final String mediaType; + private final long length; + + private volatile Handler endHandler; + + private volatile HttpServerResponse dst; + + public HttpClientResponseBody(HttpClientResponse response, long length, String mediaType) { + this.response = response; + this.mediaType = mediaType; + this.length = length; + } + + @Override + public String mediaType() { + return mediaType; + } + + @Override + public long length() { + return length; + } + + @Override + public ReadStream stream() { + return this; + } + + + @Override + public ReadStream exceptionHandler(@Nullable Handler handler) { + response.exceptionHandler(handler); + return this; + } + + @Override + public ReadStream handler(@Nullable Handler handler) { + response.handler(handler); + return this; + } + + @Override + public ReadStream pause() { + response.pause(); + return this; + } + + @Override + public ReadStream resume() { + response.resume(); + return this; + } + + @Override + public ReadStream fetch(long amount) { + response.fetch(amount); + return this; + } + + @Override + public ReadStream endHandler(@Nullable Handler endHandler) { + if (endHandler == null) { + response.endHandler(null); + return this; + } + Handler current = this.endHandler; + this.endHandler = endHandler; + if (current != null) { + return this; + } + + response.endHandler(v -> { + try { + MultiMap trailers = response.trailers(); + if (trailers.isEmpty()) { + return; + } + HttpServerResponse dst = this.dst; + if (dst == null) { + return; + } + MultiMap dstTrailers = dst.trailers(); + dstTrailers.addAll(trailers); + } finally { + Handler h = this.endHandler; + if (h != null) { + h.handle(null); + } + } + }); + + return this; + } + + + @Override + public Future pipeTo(WriteStream dst) { + if (dst instanceof HttpServerResponse) { + this.dst = (HttpServerResponse) dst; + } + return ReadStream.super.pipeTo(dst); + } + + @Override + public Pipe pipe() { + Pipe pipe = ReadStream.super.pipe(); + return new Pipe<>() { + @Override + public Pipe endOnFailure(boolean end) { + pipe.endOnFailure(end); + return this; + } + + @Override + public Pipe endOnSuccess(boolean end) { + pipe.endOnSuccess(end); + return this; + } + + @Override + public Pipe endOnComplete(boolean end) { + pipe.endOnComplete(end); + return this; + } + + @Override + public Future to(WriteStream dst) { + if (dst instanceof HttpServerResponse) { + HttpClientResponseBody.this.dst = (HttpServerResponse) dst; + } + return pipe.to(dst); + } + + @Override + public void close() { + pipe.close(); + } + }; + + } +} diff --git a/src/main/java/io/vertx/httpproxy/impl/ProxiedResponse.java b/src/main/java/io/vertx/httpproxy/impl/ProxiedResponse.java index 2d255ee..50f3e2b 100644 --- a/src/main/java/io/vertx/httpproxy/impl/ProxiedResponse.java +++ b/src/main/java/io/vertx/httpproxy/impl/ProxiedResponse.java @@ -74,7 +74,7 @@ class ProxiedResponse implements ProxyResponse { this.proxiedResponse = proxiedResponse; this.statusCode = response.statusCode(); this.statusMessage = response.statusMessage(); - this.body = Body.body(response, contentLength, contentType); + this.body = new HttpClientResponseBody(response, contentLength, contentType); long maxAge = -1; boolean publicCacheControl = false; diff --git a/src/test/java/io/vertx/tests/ProxyRequestTest.java b/src/test/java/io/vertx/tests/ProxyRequestTest.java index d341f35..fb611c7 100644 --- a/src/test/java/io/vertx/tests/ProxyRequestTest.java +++ b/src/test/java/io/vertx/tests/ProxyRequestTest.java @@ -121,6 +121,32 @@ public void testChunkedFrontendRequest(TestContext ctx) { .onComplete(ctx.asyncAssertSuccess()); } + @Test + public void testResponseTrailer(TestContext ctx) { + runHttpTest(ctx, req -> { + String te = req.getHeader(HttpHeaders.TRANSFER_ENCODING); + if (te == null || !te.equalsIgnoreCase("chunked")) { + ctx.fail("got non chunked request"); + } + HttpServerResponse response = req.response(); + response.setChunked(true); + response.trailers().add("marked", "1"); + response.end("Hello World"); + }, ctx.asyncAssertSuccess()); + httpClient = vertx.createHttpClient(); + httpClient + .request(HttpMethod.POST, 8080, "localhost", "/somepath") + .compose(req -> req + .setChunked(true) + .send("chunk") + .andThen(ctx.asyncAssertSuccess(resp -> ctx.assertEquals(200, resp.statusCode()))) + .compose(response -> response.end() + .map(response) + .andThen(ctx.asyncAssertSuccess(resp -> ctx.assertEquals("1", resp.getTrailer("marked")))))) + .onComplete(ctx.asyncAssertSuccess()); + } + + @Test public void testNonChunkedFrontendRequest(TestContext ctx) { runHttpTest(ctx, req -> {