From 6a9b6f9e0002bd19c84604c15aa0ceae41469878 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Thu, 25 Sep 2025 18:44:58 +1000 Subject: [PATCH 1/4] chore(implementation)!: use Jetty-12 core without servlets Port the invoker to upgrade to Eclipse Jetty-12 version 12. Specifically using the new core APIs of Eclipse Jetty-12 that allow the overhead of a Servlet container to be avoided. BREAKING CHANGE: use Java 17 or above, as required by Eclipse Jetty-12. Signed-off-by: Lachlan Roberts --- invoker/core/pom.xml | 22 +-- .../invoker/BackgroundFunctionExecutor.java | 54 +++--- .../invoker/HttpFunctionExecutor.java | 24 ++- .../invoker/TypedFunctionExecutor.java | 25 ++- .../invoker/gcf/ExecutionIdUtil.java | 8 +- .../invoker/http/HttpRequestImpl.java | 150 +++++++++------ .../invoker/http/HttpResponseImpl.java | 178 +++++++++++++----- .../functions/invoker/http/TimeoutFilter.java | 71 ------- .../invoker/http/TimeoutHandler.java | 89 +++++++++ .../functions/invoker/runner/Invoker.java | 113 ++++++----- .../functions/invoker/IntegrationTest.java | 122 +++++++----- .../functions/invoker/http/HttpTest.java | 155 ++++++++------- .../invoker/testfunctions/BufferedWrites.java | 27 +++ 13 files changed, 629 insertions(+), 409 deletions(-) delete mode 100644 invoker/core/src/main/java/com/google/cloud/functions/invoker/http/TimeoutFilter.java create mode 100644 invoker/core/src/main/java/com/google/cloud/functions/invoker/http/TimeoutHandler.java create mode 100644 invoker/core/src/test/java/com/google/cloud/functions/invoker/testfunctions/BufferedWrites.java diff --git a/invoker/core/pom.xml b/invoker/core/pom.xml index dc17c779..7d13c924 100644 --- a/invoker/core/pom.xml +++ b/invoker/core/pom.xml @@ -20,9 +20,10 @@ UTF-8 5.3.2 - 11 - 11 + 17 + 17 4.0.1 + 12.1.1 @@ -46,11 +47,6 @@ functions-framework-api 1.1.4 - - javax.servlet - javax.servlet-api - 4.0.1 - io.cloudevents cloudevents-core @@ -97,13 +93,13 @@ org.eclipse.jetty - jetty-servlet - 9.4.57.v20241219 + jetty-server + ${jetty.version} - org.eclipse.jetty - jetty-server - 9.4.57.v20241219 + org.slf4j + slf4j-jdk14 + 2.0.9 com.beust @@ -151,7 +147,7 @@ org.eclipse.jetty jetty-client - 9.4.57.v20241219 + ${jetty.version} test diff --git a/invoker/core/src/main/java/com/google/cloud/functions/invoker/BackgroundFunctionExecutor.java b/invoker/core/src/main/java/com/google/cloud/functions/invoker/BackgroundFunctionExecutor.java index 331c9586..cea9396f 100644 --- a/invoker/core/src/main/java/com/google/cloud/functions/invoker/BackgroundFunctionExecutor.java +++ b/invoker/core/src/main/java/com/google/cloud/functions/invoker/BackgroundFunctionExecutor.java @@ -30,25 +30,32 @@ import io.cloudevents.http.HttpMessageFactory; import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStreamReader; import java.io.Reader; import java.lang.reflect.Type; +import java.nio.charset.StandardCharsets; import java.time.OffsetDateTime; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.TreeMap; import java.util.logging.Level; import java.util.logging.Logger; -import javax.servlet.http.HttpServlet; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; +import org.eclipse.jetty.http.HttpField; +import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.io.Content; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Response; +import org.eclipse.jetty.util.Callback; /** Executes the user's background function. */ -public final class BackgroundFunctionExecutor extends HttpServlet { +public final class BackgroundFunctionExecutor extends Handler.Abstract { private static final Logger logger = Logger.getLogger("com.google.cloud.functions.invoker"); private final FunctionExecutor functionExecutor; @@ -177,8 +184,10 @@ static Optional backgroundFunctionTypeArgument( .findFirst(); } - private static Event parseLegacyEvent(HttpServletRequest req) throws IOException { - try (BufferedReader bodyReader = req.getReader()) { + private static Event parseLegacyEvent(Request req) throws IOException { + try (BufferedReader bodyReader = new BufferedReader( + new InputStreamReader(Content.Source.asInputStream(req), + Objects.requireNonNullElse(Request.getCharset(req), StandardCharsets.ISO_8859_1)))) { return parseLegacyEvent(bodyReader); } } @@ -225,7 +234,7 @@ private static Context contextFromCloudEvent(CloudEvent cloudEvent) { * for the various triggers. CloudEvents are ones that follow the standards defined by cloudevents.io. * - * @param the type to be used in the {@link Unmarshallers} call when + * @param the type to be used in the {code Unmarshallers} call when * unmarshalling this event, if it is a CloudEvent. */ private abstract static class FunctionExecutor { @@ -322,23 +331,26 @@ void serviceCloudEvent(CloudEvent cloudEvent) throws Exception { /** Executes the user's background function. This can handle all HTTP methods. */ @Override - public void service(HttpServletRequest req, HttpServletResponse res) throws IOException { - String contentType = req.getContentType(); + public boolean handle(Request req, Response res, Callback callback) throws Exception { + String contentType = req.getHeaders().get(HttpHeader.CONTENT_TYPE); try { executionIdUtil.storeExecutionId(req); if ((contentType != null && contentType.startsWith("application/cloudevents+json")) - || req.getHeader("ce-specversion") != null) { + || req.getHeaders().get("ce-specversion") != null) { serviceCloudEvent(req); } else { serviceLegacyEvent(req); } - res.setStatus(HttpServletResponse.SC_OK); + res.setStatus(HttpStatus.OK_200); + callback.succeeded(); } catch (Throwable t) { - res.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); logger.log(Level.SEVERE, "Failed to execute " + functionExecutor.functionName(), t); + res.setStatus(HttpStatus.INTERNAL_SERVER_ERROR_500); + callback.succeeded(); } finally { executionIdUtil.removeExecutionId(); } + return true; } private enum CloudEventKind { @@ -352,10 +364,11 @@ private enum CloudEventKind { * @param a fake type parameter, which corresponds to the type parameter of {@link * FunctionExecutor}. */ - private void serviceCloudEvent(HttpServletRequest req) throws Exception { + private void serviceCloudEvent(Request req) throws Exception { @SuppressWarnings("unchecked") FunctionExecutor executor = (FunctionExecutor) functionExecutor; - byte[] body = req.getInputStream().readAllBytes(); + + byte[] body = Content.Source.asByteArrayAsync(req, -1).get(); MessageReader reader = HttpMessageFactory.createReaderFromMultimap(headerMap(req), body); // It's important not to set the context ClassLoader earlier, because MessageUtils will use // ServiceLoader.load(EventFormat.class) to find a handler to deserialize a binary CloudEvent @@ -369,17 +382,16 @@ private void serviceCloudEvent(HttpServletRequest req) throws Exce // https://github.com/cloudevents/sdk-java/pull/259. } - private static Map> headerMap(HttpServletRequest req) { + private static Map> headerMap(Request req) { Map> headerMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); - for (String header : Collections.list(req.getHeaderNames())) { - for (String value : Collections.list(req.getHeaders(header))) { - headerMap.computeIfAbsent(header, unused -> new ArrayList<>()).add(value); - } + for (HttpField field : req.getHeaders()) { + headerMap.computeIfAbsent(field.getName(), unused -> new ArrayList<>()) + .addAll(field.getValueList()); } return headerMap; } - private void serviceLegacyEvent(HttpServletRequest req) throws Exception { + private void serviceLegacyEvent(Request req) throws Exception { Event event = parseLegacyEvent(req); runWithContextClassLoader(() -> functionExecutor.serviceLegacyEvent(event)); } diff --git a/invoker/core/src/main/java/com/google/cloud/functions/invoker/HttpFunctionExecutor.java b/invoker/core/src/main/java/com/google/cloud/functions/invoker/HttpFunctionExecutor.java index 01f07e74..d82f83a9 100644 --- a/invoker/core/src/main/java/com/google/cloud/functions/invoker/HttpFunctionExecutor.java +++ b/invoker/core/src/main/java/com/google/cloud/functions/invoker/HttpFunctionExecutor.java @@ -20,12 +20,14 @@ import com.google.cloud.functions.invoker.http.HttpResponseImpl; import java.util.logging.Level; import java.util.logging.Logger; -import javax.servlet.http.HttpServlet; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Response; +import org.eclipse.jetty.util.Callback; /** Executes the user's method. */ -public class HttpFunctionExecutor extends HttpServlet { +public class HttpFunctionExecutor extends Handler.Abstract { private static final Logger logger = Logger.getLogger("com.google.cloud.functions.invoker"); private final HttpFunction function; @@ -65,21 +67,23 @@ public static HttpFunctionExecutor forClass(Class functionClass) { /** Executes the user's method, can handle all HTTP type methods. */ @Override - public void service(HttpServletRequest req, HttpServletResponse res) { - HttpRequestImpl reqImpl = new HttpRequestImpl(req); - HttpResponseImpl respImpl = new HttpResponseImpl(res); + public boolean handle(Request request, Response response, Callback callback) throws Exception { + + HttpRequestImpl reqImpl = new HttpRequestImpl(request); + HttpResponseImpl respImpl = new HttpResponseImpl(response); ClassLoader oldContextLoader = Thread.currentThread().getContextClassLoader(); try { - executionIdUtil.storeExecutionId(req); + executionIdUtil.storeExecutionId(request); Thread.currentThread().setContextClassLoader(function.getClass().getClassLoader()); function.service(reqImpl, respImpl); + respImpl.close(callback); } catch (Throwable t) { logger.log(Level.SEVERE, "Failed to execute " + function.getClass().getName(), t); - res.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + Response.writeError(request, response, callback, HttpStatus.INTERNAL_SERVER_ERROR_500, null, t); } finally { Thread.currentThread().setContextClassLoader(oldContextLoader); executionIdUtil.removeExecutionId(); - respImpl.flush(); } + return true; } } diff --git a/invoker/core/src/main/java/com/google/cloud/functions/invoker/TypedFunctionExecutor.java b/invoker/core/src/main/java/com/google/cloud/functions/invoker/TypedFunctionExecutor.java index a6edfc32..63418705 100644 --- a/invoker/core/src/main/java/com/google/cloud/functions/invoker/TypedFunctionExecutor.java +++ b/invoker/core/src/main/java/com/google/cloud/functions/invoker/TypedFunctionExecutor.java @@ -15,11 +15,13 @@ import java.util.Optional; import java.util.logging.Level; import java.util.logging.Logger; -import javax.servlet.http.HttpServlet; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Response; +import org.eclipse.jetty.util.Callback; -public class TypedFunctionExecutor extends HttpServlet { +public class TypedFunctionExecutor extends Handler.Abstract { private static final String APPLY_METHOD = "apply"; private static final Logger logger = Logger.getLogger("com.google.cloud.functions.invoker"); @@ -94,7 +96,7 @@ static Optional handlerTypeArgument(Class> f /** Executes the user's method, can handle all HTTP type methods. */ @Override - public void service(HttpServletRequest req, HttpServletResponse res) { + public boolean handle(Request req, Response res, Callback callback) throws Exception { HttpRequestImpl reqImpl = new HttpRequestImpl(req); HttpResponseImpl resImpl = new HttpResponseImpl(res); ClassLoader oldContextClassLoader = Thread.currentThread().getContextClassLoader(); @@ -102,10 +104,13 @@ public void service(HttpServletRequest req, HttpServletResponse res) { try { Thread.currentThread().setContextClassLoader(function.getClass().getClassLoader()); handleRequest(reqImpl, resImpl); + resImpl.close(callback); + } catch (Throwable t) { + Response.writeError(req, res, callback, HttpStatus.INTERNAL_SERVER_ERROR_500, null, t); } finally { Thread.currentThread().setContextClassLoader(oldContextClassLoader); - resImpl.flush(); } + return true; } private void handleRequest(HttpRequest req, HttpResponse res) { @@ -114,7 +119,7 @@ private void handleRequest(HttpRequest req, HttpResponse res) { reqObj = format.deserialize(req, argType); } catch (Throwable t) { logger.log(Level.SEVERE, "Failed to parse request for " + function.getClass().getName(), t); - res.setStatusCode(HttpServletResponse.SC_BAD_REQUEST); + res.setStatusCode(HttpStatus.BAD_REQUEST_400); return; } @@ -123,7 +128,7 @@ private void handleRequest(HttpRequest req, HttpResponse res) { resObj = function.apply(reqObj); } catch (Throwable t) { logger.log(Level.SEVERE, "Failed to execute " + function.getClass().getName(), t); - res.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + res.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR_500); return; } @@ -132,7 +137,7 @@ private void handleRequest(HttpRequest req, HttpResponse res) { } catch (Throwable t) { logger.log( Level.SEVERE, "Failed to serialize response for " + function.getClass().getName(), t); - res.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + res.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR_500); return; } } @@ -147,7 +152,7 @@ private static class GsonWireFormat implements TypedFunction.WireFormat { @Override public void serialize(Object object, HttpResponse response) throws Exception { if (object == null) { - response.setStatusCode(HttpServletResponse.SC_NO_CONTENT); + response.setStatusCode(HttpStatus.NO_CONTENT_204); return; } try (BufferedWriter bodyWriter = response.getWriter()) { diff --git a/invoker/core/src/main/java/com/google/cloud/functions/invoker/gcf/ExecutionIdUtil.java b/invoker/core/src/main/java/com/google/cloud/functions/invoker/gcf/ExecutionIdUtil.java index 7987317d..becf2c5c 100644 --- a/invoker/core/src/main/java/com/google/cloud/functions/invoker/gcf/ExecutionIdUtil.java +++ b/invoker/core/src/main/java/com/google/cloud/functions/invoker/gcf/ExecutionIdUtil.java @@ -5,7 +5,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.logging.Handler; import java.util.logging.Logger; -import javax.servlet.http.HttpServletRequest; +import org.eclipse.jetty.server.Request; /** * A helper class that either fetches a unique execution id from request HTTP headers or generates a @@ -23,7 +23,7 @@ public final class ExecutionIdUtil { * Add mapping to root logger from current thread id to execution id. This mapping will be used to * append the execution id to log lines. */ - public void storeExecutionId(HttpServletRequest request) { + public void storeExecutionId(Request request) { if (!executionIdLoggingEnabled()) { return; } @@ -47,8 +47,8 @@ public void removeExecutionId() { } } - private String getOrGenerateExecutionId(HttpServletRequest request) { - String executionId = request.getHeader(EXECUTION_ID_HTTP_HEADER); + private String getOrGenerateExecutionId(Request request) { + String executionId = request.getHeaders().get(EXECUTION_ID_HTTP_HEADER); if (executionId == null) { byte[] array = new byte[EXECUTION_ID_LENGTH]; random.nextBytes(array); diff --git a/invoker/core/src/main/java/com/google/cloud/functions/invoker/http/HttpRequestImpl.java b/invoker/core/src/main/java/com/google/cloud/functions/invoker/http/HttpRequestImpl.java index 2119645a..786661d9 100644 --- a/invoker/core/src/main/java/com/google/cloud/functions/invoker/http/HttpRequestImpl.java +++ b/invoker/core/src/main/java/com/google/cloud/functions/invoker/http/HttpRequestImpl.java @@ -14,33 +14,33 @@ package com.google.cloud.functions.invoker.http; -import static java.util.stream.Collectors.toMap; - import com.google.cloud.functions.HttpRequest; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; -import java.io.UncheckedIOException; -import java.util.AbstractMap.SimpleEntry; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.TreeMap; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import javax.servlet.ServletException; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.Part; +import org.eclipse.jetty.http.*; +import org.eclipse.jetty.http.MultiPart.Part; +import org.eclipse.jetty.io.Content; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.util.Fields; public class HttpRequestImpl implements HttpRequest { - private final HttpServletRequest request; + private final Request request; + private InputStream inputStream; + private BufferedReader reader; - public HttpRequestImpl(HttpServletRequest request) { + public HttpRequestImpl(Request request) { this.request = request; } @@ -51,133 +51,163 @@ public String getMethod() { @Override public String getUri() { - String url = request.getRequestURL().toString(); - if (request.getQueryString() != null) { - url += "?" + request.getQueryString(); - } - return url; + return request.getHttpURI().asString(); } @Override public String getPath() { - return request.getRequestURI(); + return request.getHttpURI().getCanonicalPath(); } @Override public Optional getQuery() { - return Optional.ofNullable(request.getQueryString()); + return Optional.ofNullable(request.getHttpURI().getQuery()); } @Override public Map> getQueryParameters() { - return request.getParameterMap().entrySet().stream() - .collect(toMap(Map.Entry::getKey, e -> Arrays.asList(e.getValue()))); + Fields fields = Request.extractQueryParameters(request); + if (fields.isEmpty()) { + return Collections.emptyMap(); + } + + Map> map = new HashMap<>(); + fields.forEach( + field -> map.put(field.getName(), Collections.unmodifiableList(field.getValues()))); + return Collections.unmodifiableMap(map); } @Override public Map getParts() { - String contentType = request.getContentType(); - if (contentType == null || !request.getContentType().startsWith("multipart/form-data")) { + String contentType = request.getHeaders().get(HttpHeader.CONTENT_TYPE); + if (contentType == null || !contentType.startsWith(MimeTypes.Type.MULTIPART_FORM_DATA.asString())) { throw new IllegalStateException("Content-Type must be multipart/form-data: " + contentType); } - try { - return request.getParts().stream().collect(toMap(Part::getName, HttpPartImpl::new)); - } catch (IOException e) { - throw new UncheckedIOException(e); - } catch (ServletException e) { - throw new RuntimeException(e.getMessage(), e); + + // The multipart parsing is done by the EagerContentHandler, so we just call getParts. + MultiPartFormData.Parts parts = MultiPartFormData.getParts(request); + if (parts == null){ + throw new IllegalStateException(); } + + if (parts.size() == 0) { + return Collections.emptyMap(); + } + + Map map = new HashMap<>(); + parts.forEach(part -> map.put(part.getName(), new HttpPartImpl(part))); + return Collections.unmodifiableMap(map); } @Override public Optional getContentType() { - return Optional.ofNullable(request.getContentType()); + return Optional.ofNullable(request.getHeaders().get(HttpHeader.CONTENT_TYPE)); } @Override public long getContentLength() { - return request.getContentLength(); + return request.getLength(); } @Override public Optional getCharacterEncoding() { - return Optional.ofNullable(request.getCharacterEncoding()); + Charset charset = Request.getCharset(request); + return Optional.ofNullable(charset == null ? null : charset.name()); } @Override public InputStream getInputStream() throws IOException { - return request.getInputStream(); + if (reader != null) { + throw new IllegalStateException("getReader() already called"); + } + if (inputStream == null) { + inputStream = Content.Source.asInputStream(request); + } + return inputStream; } @Override public BufferedReader getReader() throws IOException { - return request.getReader(); + if (reader == null) { + if (inputStream != null) { + throw new IllegalStateException("getInputStream already called"); + } + inputStream = Content.Source.asInputStream(request); + reader = + new BufferedReader( + new InputStreamReader( + getInputStream(), + Objects.requireNonNullElse(Request.getCharset(request), StandardCharsets.UTF_8))); + } + return reader; } @Override public Map> getHeaders() { - return Collections.list(request.getHeaderNames()).stream() - .collect( - toMap( - name -> name, - name -> Collections.list(request.getHeaders(name)), - (a, b) -> b, - () -> new TreeMap<>(String.CASE_INSENSITIVE_ORDER))); + return toStringListMap(request.getHeaders()); + } + + static Map> toStringListMap(HttpFields headers) { + Map> map = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); + for (HttpField field : headers) { + map.computeIfAbsent(field.getName(), key -> new ArrayList<>()).add(field.getValue()); + } + return map; } private static class HttpPartImpl implements HttpPart { private final Part part; + private final String contentType; private HttpPartImpl(Part part) { this.part = part; + contentType = part.getHeaders().get(HttpHeader.CONTENT_TYPE); } @Override public Optional getFileName() { - return Optional.ofNullable(part.getSubmittedFileName()); + return Optional.ofNullable(part.getFileName()); } @Override public Optional getContentType() { - return Optional.ofNullable(part.getContentType()); + return Optional.ofNullable(contentType); } @Override public long getContentLength() { - return part.getSize(); + return part.getLength(); } @Override public Optional getCharacterEncoding() { - String contentType = getContentType().orElse(null); - if (contentType == null) { - return Optional.empty(); - } - Pattern charsetPattern = Pattern.compile("(?i).*;\\s*charset\\s*=([^;\\s]*)\\s*(;|$)"); - Matcher matcher = charsetPattern.matcher(contentType); - return matcher.matches() ? Optional.of(matcher.group(1)) : Optional.empty(); + return Optional.ofNullable(MimeTypes.getCharsetFromContentType(contentType)); } @Override public InputStream getInputStream() throws IOException { - return part.getInputStream(); + // TODO: update with createContentSource when https://github.com/jetty/jetty.project/pull/13610 is released. + Content.Source contentSource = part.newContentSource(null, 0, -1); + return Content.Source.asInputStream(contentSource); } @Override public BufferedReader getReader() throws IOException { - String encoding = getCharacterEncoding().orElse("utf-8"); - return new BufferedReader(new InputStreamReader(getInputStream(), encoding)); + return new BufferedReader( + new InputStreamReader( + getInputStream(), + Objects.requireNonNullElse( + MimeTypes.DEFAULTS.getCharset(contentType), StandardCharsets.UTF_8))); } @Override public Map> getHeaders() { - return part.getHeaderNames().stream() - .map(name -> new SimpleEntry<>(name, list(part.getHeaders(name)))) - .collect(toMap(Map.Entry::getKey, Map.Entry::getValue)); + return HttpRequestImpl.toStringListMap(part.getHeaders()); } - private static List list(Collection collection) { - return (collection instanceof List) ? (List) collection : new ArrayList<>(collection); + @Override + public String toString() { + return "%s{%s}".formatted(super.toString(), part); } } } diff --git a/invoker/core/src/main/java/com/google/cloud/functions/invoker/http/HttpResponseImpl.java b/invoker/core/src/main/java/com/google/cloud/functions/invoker/http/HttpResponseImpl.java index c02246f0..60216514 100644 --- a/invoker/core/src/main/java/com/google/cloud/functions/invoker/http/HttpResponseImpl.java +++ b/invoker/core/src/main/java/com/google/cloud/functions/invoker/http/HttpResponseImpl.java @@ -14,24 +14,33 @@ package com.google.cloud.functions.invoker.http; -import static java.util.stream.Collectors.toMap; - import com.google.cloud.functions.HttpResponse; import java.io.BufferedWriter; import java.io.IOException; import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Collection; +import java.io.Writer; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; -import java.util.TreeMap; -import javax.servlet.http.HttpServletResponse; +import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.io.Content; +import org.eclipse.jetty.io.WriteThroughWriter; +import org.eclipse.jetty.io.content.BufferedContentSink; +import org.eclipse.jetty.io.content.ContentSinkOutputStream; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Response; +import org.eclipse.jetty.util.Callback; public class HttpResponseImpl implements HttpResponse { - private final HttpServletResponse response; + private final Response response; + private ContentSinkOutputStream outputStream; + private BufferedWriter writer; + private Charset charset; - public HttpResponseImpl(HttpServletResponse response) { + public HttpResponseImpl(Response response) { this.response = response; } @@ -43,75 +52,152 @@ public void setStatusCode(int code) { @Override @SuppressWarnings("deprecation") public void setStatusCode(int code, String message) { - response.setStatus(code, message); + response.setStatus(code); } @Override public void setContentType(String contentType) { - response.setContentType(contentType); + response.getHeaders().put(HttpHeader.CONTENT_TYPE, contentType); + charset = response.getRequest().getContext().getMimeTypes().getCharset(contentType); } @Override public Optional getContentType() { - return Optional.ofNullable(response.getContentType()); + return Optional.ofNullable(response.getHeaders().get(HttpHeader.CONTENT_TYPE)); } @Override public void appendHeader(String key, String value) { - response.addHeader(key, value); + if (HttpHeader.CONTENT_TYPE.is(key)) { + setContentType(value); + } else { + response.getHeaders().add(key, value); + } } @Override public Map> getHeaders() { - return response.getHeaderNames().stream() - .collect( - toMap( - name -> name, - name -> new ArrayList<>(response.getHeaders(name)), - (a, b) -> b, - () -> new TreeMap<>(String.CASE_INSENSITIVE_ORDER))); - } - - private static List list(Collection collection) { - return (collection instanceof List) ? (List) collection : new ArrayList<>(collection); + return HttpRequestImpl.toStringListMap(response.getHeaders()); } @Override - public OutputStream getOutputStream() throws IOException { - return response.getOutputStream(); + public OutputStream getOutputStream() { + if (writer != null) { + throw new IllegalStateException("getWriter called"); + } else if (outputStream == null) { + Request request = response.getRequest(); + int outputBufferSize = request.getConnectionMetaData().getHttpConfiguration() + .getOutputBufferSize(); + BufferedContentSink bufferedContentSink = new BufferedContentSink(response, + request.getComponents().getByteBufferPool(), + false, outputBufferSize / 2, outputBufferSize); + outputStream = new ContentSinkOutputStream(bufferedContentSink); + } + return outputStream; } - private BufferedWriter writer; - @Override public synchronized BufferedWriter getWriter() throws IOException { if (writer == null) { - // Unfortunately this means that we get two intermediate objects between the object we return - // and the underlying Writer that response.getWriter() wraps. We could try accessing the - // PrintWriter.out field via reflection, but that sort of access to non-public fields of - // platform classes is now frowned on and may draw warnings or even fail in subsequent - // versions. We could instead wrap the OutputStream, but that would require us to deduce the - // appropriate Charset, using logic like this: - // https://github.com/eclipse/jetty.project/blob/923ec38adf/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java#L731 - // We may end up doing that if performance is an issue. - writer = new BufferedWriter(response.getWriter()); + if (outputStream != null) { + throw new IllegalStateException("getOutputStream called"); + } + + writer = new NonBufferedWriter(WriteThroughWriter.newWriter(getOutputStream(), + Objects.requireNonNullElse(charset, StandardCharsets.UTF_8))); } return writer; } - public void flush() { + /** + * Close the response, flushing all content. + * + * @param callback a {@link Callback} to be completed when the response is closed. + */ + public void close(Callback callback) { try { - // We can't use HttpServletResponse.flushBuffer() because we wrap the - // PrintWriter returned by HttpServletResponse in our own BufferedWriter - // to match our API. So we have to flush whichever of getWriter() or - // getOutputStream() works. - try { - getOutputStream().flush(); - } catch (IllegalStateException e) { - getWriter().flush(); + // The writer has been constructed to do no buffering, so it does not need to be flushed + if (outputStream != null) { + // Do an asynchronous close, so large buffered content may be written without blocking + outputStream.close(callback); + } else { + callback.succeeded(); } } catch (IOException e) { - // Too bad, can't flush. + // Too bad, can't close. + } + } + + /** + * A {@link BufferedWriter} that does not buffer. + * It is generally more efficient to buffer at the {@link Content.Sink} level, + * since frequently total content is smaller than a single buffer and + * the {@link Content.Sink} can turn a close into a last write that will avoid + * chunking the response if at all possible. However, {@link BufferedWriter} + * is in the API for {@link HttpResponse}, so we must return a writer of + * that type. + */ + private static class NonBufferedWriter extends BufferedWriter { + private final Writer writer; + + public NonBufferedWriter(Writer out) { + super(out, 1); + writer = out; + } + + @Override + public void write(int c) throws IOException { + writer.write(c); + } + + @Override + public void write(char[] cbuf) throws IOException { + writer.write(cbuf); + } + + @Override + public void write(char[] cbuf, int off, int len) throws IOException { + writer.write(cbuf, off, len); + } + + @Override + public void write(String str) throws IOException { + writer.write(str); + } + + @Override + public void write(String str, int off, int len) throws IOException { + writer.write(str, off, len); + } + + @Override + public Writer append(CharSequence csq) throws IOException { + return writer.append(csq); + } + + @Override + public Writer append(CharSequence csq, int start, int end) throws IOException { + return writer.append(csq, start, end); + } + + @Override + public Writer append(char c) throws IOException { + return writer.append(c); + } + + @Override + public void flush() throws IOException { + writer.flush(); + } + + @Override + public void close() throws IOException { + writer.close(); + } + + @Override + public void newLine() throws IOException { + writer.write(System.lineSeparator()); } } } diff --git a/invoker/core/src/main/java/com/google/cloud/functions/invoker/http/TimeoutFilter.java b/invoker/core/src/main/java/com/google/cloud/functions/invoker/http/TimeoutFilter.java deleted file mode 100644 index e0577f9b..00000000 --- a/invoker/core/src/main/java/com/google/cloud/functions/invoker/http/TimeoutFilter.java +++ /dev/null @@ -1,71 +0,0 @@ -// Copyright 2024 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.google.cloud.functions.invoker.http; - -import java.io.IOException; -import java.util.Timer; -import java.util.TimerTask; -import java.util.logging.Logger; -import javax.servlet.Filter; -import javax.servlet.FilterChain; -import javax.servlet.ServletException; -import javax.servlet.ServletRequest; -import javax.servlet.ServletResponse; -import javax.servlet.http.HttpServletResponse; - -public class TimeoutFilter implements Filter { - - private static final Logger logger = Logger.getLogger(TimeoutFilter.class.getName()); - private final int timeoutMs; - - public TimeoutFilter(int timeoutSeconds) { - this.timeoutMs = timeoutSeconds * 1000; // Convert seconds to milliseconds - } - - @Override - public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) - throws IOException, ServletException { - Timer timer = new Timer(true); - TimerTask timeoutTask = - new TimerTask() { - @Override - public void run() { - if (response instanceof HttpServletResponse) { - try { - ((HttpServletResponse) response) - .sendError(HttpServletResponse.SC_REQUEST_TIMEOUT, "Request timed out"); - } catch (IOException e) { - logger.warning("Error while sending HTTP response: " + e.toString()); - } - } else { - try { - response.getWriter().write("Request timed out"); - } catch (IOException e) { - logger.warning("Error while writing response: " + e.toString()); - } - } - } - }; - - timer.schedule(timeoutTask, timeoutMs); - - try { - chain.doFilter(request, response); - timeoutTask.cancel(); - } finally { - timer.purge(); - } - } -} diff --git a/invoker/core/src/main/java/com/google/cloud/functions/invoker/http/TimeoutHandler.java b/invoker/core/src/main/java/com/google/cloud/functions/invoker/http/TimeoutHandler.java new file mode 100644 index 00000000..480bc383 --- /dev/null +++ b/invoker/core/src/main/java/com/google/cloud/functions/invoker/http/TimeoutHandler.java @@ -0,0 +1,89 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.cloud.functions.invoker.http; + +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.atomic.AtomicBoolean; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Response; +import org.eclipse.jetty.util.Callback; + +public class TimeoutHandler extends Handler.Wrapper { + private final int timeoutMs; + + public TimeoutHandler(int timeoutSeconds, Handler handler) { + setHandler(handler); + this.timeoutMs = timeoutSeconds * 1000; // Convert seconds to milliseconds + } + + @Override + public boolean handle(Request request, Response response, Callback callback) throws Exception { + // Wrap the callback to ensure it is only called once between the handler and the timeout task. + AtomicBoolean completed = new AtomicBoolean(false); + Callback wrappedCallback = new Callback() { + @Override + public void succeeded() { + if (completed.compareAndSet(false, true)) { + callback.succeeded(); + } + } + + @Override + public void failed(Throwable x) { + if (completed.compareAndSet(false, true)) { + callback.failed(x); + } + } + + @Override + public InvocationType getInvocationType() { + return callback.getInvocationType(); + } + }; + + // TODO: consider wrapping the request/response to throw if they are used after timeout. + // TODO: Use org.eclipse.jetty.io.CyclicTimeouts which is optimized for timeouts which are almost always cancelled. + Timer timer = new Timer(true); + TimerTask timeoutTask = + new TimerTask() { + @Override + public void run() { + // TODO: there is a race between the handler writing response and timeout firing. + // This timeout firing doesn't stop the thread handling the request / response it just writes an error to the response. + Response.writeError( + request, + response, + callback, + HttpStatus.REQUEST_TIMEOUT_408, + "Function execution timed out"); + } + }; + + timer.schedule(timeoutTask, timeoutMs); + + boolean handle; + try { + handle = super.handle(request, response, wrappedCallback); + timeoutTask.cancel(); + } finally { + timer.purge(); + } + + return handle; + } +} diff --git a/invoker/core/src/main/java/com/google/cloud/functions/invoker/runner/Invoker.java b/invoker/core/src/main/java/com/google/cloud/functions/invoker/runner/Invoker.java index da5e72ec..24a7aa6a 100644 --- a/invoker/core/src/main/java/com/google/cloud/functions/invoker/runner/Invoker.java +++ b/invoker/core/src/main/java/com/google/cloud/functions/invoker/runner/Invoker.java @@ -25,7 +25,7 @@ import com.google.cloud.functions.invoker.HttpFunctionExecutor; import com.google.cloud.functions.invoker.TypedFunctionExecutor; import com.google.cloud.functions.invoker.gcf.JsonLogHandler; -import com.google.cloud.functions.invoker.http.TimeoutFilter; +import com.google.cloud.functions.invoker.http.TimeoutHandler; import java.io.File; import java.io.IOException; import java.io.UncheckedIOException; @@ -39,32 +39,25 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.EnumSet; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.logging.Handler; import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Stream; -import javax.servlet.DispatcherType; -import javax.servlet.MultipartConfigElement; -import javax.servlet.ServletException; -import javax.servlet.http.HttpServlet; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.http.HttpStatus; -import org.eclipse.jetty.server.Connector; +import org.eclipse.jetty.http.MultiPartConfig; +import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Response; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; -import org.eclipse.jetty.server.handler.HandlerWrapper; -import org.eclipse.jetty.servlet.FilterHolder; -import org.eclipse.jetty.servlet.ServletContextHandler; -import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.server.handler.EagerContentHandler; +import org.eclipse.jetty.server.handler.ErrorHandler; +import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.thread.QueuedThreadPool; /** @@ -91,7 +84,7 @@ public class Invoker { // if we arrange for them to be formatted using StackDriver's "structured // logging" JSON format. Remove the JDK's standard logger and replace it with // the JSON one. - for (Handler handler : rootLogger.getHandlers()) { + for (java.util.logging.Handler handler : rootLogger.getHandlers()) { rootLogger.removeHandler(handler); } rootLogger.addHandler(new JsonLogHandler(System.out, false)); @@ -242,7 +235,7 @@ ClassLoader getFunctionClassLoader() { * unit or integration test, use {@link #startTestServer()} instead. * * @see #stopServer() - * @throws Exception + * @throws Exception If there was a problem starting the server */ public void startServer() throws Exception { startServer(true); @@ -274,7 +267,7 @@ public void startServer() throws Exception { * } * * @see #stopServer() - * @throws Exception + * @throws Exception If there was a problem starting the server */ public void startTestServer() throws Exception { startServer(false); @@ -287,34 +280,40 @@ private void startServer(boolean join) throws Exception { QueuedThreadPool pool = new QueuedThreadPool(1024); server = new Server(pool); + server.setErrorHandler( + new ErrorHandler() { + @Override + protected void generateResponse(Request request, Response response, int code, String message, Throwable cause, Callback callback) { + // Suppress error body + callback.succeeded(); + } + }); ServerConnector connector = new ServerConnector(server); connector.setPort(port); - server.setConnectors(new Connector[] {connector}); - - ServletContextHandler servletContextHandler = new ServletContextHandler(); - servletContextHandler.setContextPath("/"); - server.setHandler(NotFoundHandler.forServlet(servletContextHandler)); + connector.setReuseAddress(true); + connector.setReusePort(true); + server.addConnector(connector); Class functionClass = loadFunctionClass(); - HttpServlet servlet; + Handler handler; if (functionSignatureType == null) { - servlet = servletForDeducedSignatureType(functionClass); + handler = handlerForDeducedSignatureType(functionClass); } else { switch (functionSignatureType) { case "http": if (TypedFunction.class.isAssignableFrom(functionClass)) { - servlet = TypedFunctionExecutor.forClass(functionClass); + handler = TypedFunctionExecutor.forClass(functionClass); } else { - servlet = HttpFunctionExecutor.forClass(functionClass); + handler = HttpFunctionExecutor.forClass(functionClass); } break; case "event": case "cloudevent": - servlet = BackgroundFunctionExecutor.forClass(functionClass); + handler = BackgroundFunctionExecutor.forClass(functionClass); break; case "typed": - servlet = TypedFunctionExecutor.forClass(functionClass); + handler = TypedFunctionExecutor.forClass(functionClass); break; default: String error = @@ -325,10 +324,18 @@ private void startServer(boolean join) throws Exception { throw new RuntimeException(error); } } - ServletHolder servletHolder = new ServletHolder(servlet); - servletHolder.getRegistration().setMultipartConfig(new MultipartConfigElement("")); - servletContextHandler.addServlet(servletHolder, "/*"); - servletContextHandler = addTimerFilterForRequestTimeout(servletContextHandler); + + // Possibly wrap with TimeoutHandler if CLOUD_RUN_TIMEOUT_SECONDS is set. + handler = addTimerHandlerForRequestTimeout(handler); + server.setHandler(handler); + + // Add a handler to asynchronously parse multipart before invoking the function. + MultiPartConfig config = new MultiPartConfig.Builder().maxMemoryPartSize(-1).build(); + EagerContentHandler.MultiPartContentLoaderFactory factory = + new EagerContentHandler.MultiPartContentLoaderFactory(config); + server.insertHandler(new EagerContentHandler(factory)); + + server.insertHandler(new NotFoundHandler()); server.start(); logServerInfo(); @@ -376,7 +383,7 @@ private Class loadFunctionClass() throws ClassNotFoundException { } } - private HttpServlet servletForDeducedSignatureType(Class functionClass) { + private Handler handlerForDeducedSignatureType(Class functionClass) { if (HttpFunction.class.isAssignableFrom(functionClass)) { return HttpFunctionExecutor.forClass(functionClass); } @@ -398,16 +405,13 @@ private HttpServlet servletForDeducedSignatureType(Class functionClass) { throw new RuntimeException(error); } - private ServletContextHandler addTimerFilterForRequestTimeout( - ServletContextHandler servletContextHandler) { + private Handler addTimerHandlerForRequestTimeout(Handler handler) { String timeoutSeconds = System.getenv("CLOUD_RUN_TIMEOUT_SECONDS"); if (timeoutSeconds == null) { - return servletContextHandler; + return handler; } int seconds = Integer.parseInt(timeoutSeconds); - FilterHolder holder = new FilterHolder(new TimeoutFilter(seconds)); - servletContextHandler.addFilter(holder, "/*", EnumSet.of(DispatcherType.REQUEST)); - return servletContextHandler; + return new TimeoutHandler(seconds, handler); } static URL[] classpathToUrls(String classpath) { @@ -468,32 +472,24 @@ private static boolean isGcf() { /** * Wrapper that intercepts requests for {@code /favicon.ico} and {@code /robots.txt} and causes - * them to produce a 404 status. Otherwise they would be sent to the function code, like any other - * URL, meaning that someone testing their function by using a browser as an HTTP client can see - * two requests, one for {@code /favicon.ico} and one for {@code /} (or whatever). + * them to produce a 404 status. Otherwise, they would be sent to the function code, like any + * other URL, meaning that someone testing their function by using a browser as an HTTP client can + * see two requests, one for {@code /favicon.ico} and one for {@code /} (or whatever). */ - private static class NotFoundHandler extends HandlerWrapper { - static NotFoundHandler forServlet(ServletContextHandler servletHandler) { - NotFoundHandler handler = new NotFoundHandler(); - handler.setHandler(servletHandler); - return handler; - } + private static class NotFoundHandler extends Handler.Wrapper { private static final Set NOT_FOUND_PATHS = new HashSet<>(Arrays.asList("/favicon.ico", "/robots.txt")); @Override - public void handle( - String target, - Request baseRequest, - HttpServletRequest request, - HttpServletResponse response) - throws IOException, ServletException { - if (NOT_FOUND_PATHS.contains(request.getRequestURI())) { - response.sendError(HttpStatus.NOT_FOUND_404, "Not Found"); - return; + public boolean handle(Request request, Response response, Callback callback) throws Exception { + if (NOT_FOUND_PATHS.contains(request.getHttpURI().getCanonicalPath())) { + response.setStatus(HttpStatus.NOT_FOUND_404); + callback.succeeded(); + return true; } - super.handle(target, baseRequest, request, response); + + return super.handle(request, response, callback); } } @@ -522,7 +518,6 @@ private static class OnlyApiClassLoader extends ClassLoader { protected Class findClass(String name) throws ClassNotFoundException { String prefix = "com.google.cloud.functions."; if ((name.startsWith(prefix) && Character.isUpperCase(name.charAt(prefix.length()))) - || name.startsWith("javax.servlet.") || isCloudEventsApiClass(name)) { return runtimeClassLoader.loadClass(name); } diff --git a/invoker/core/src/test/java/com/google/cloud/functions/invoker/IntegrationTest.java b/invoker/core/src/test/java/com/google/cloud/functions/invoker/IntegrationTest.java index 2f1d8bc8..d4ff5260 100644 --- a/invoker/core/src/test/java/com/google/cloud/functions/invoker/IntegrationTest.java +++ b/invoker/core/src/test/java/com/google/cloud/functions/invoker/IntegrationTest.java @@ -47,6 +47,7 @@ import java.net.URI; import java.net.URL; import java.net.URLEncoder; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; @@ -66,16 +67,17 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; +import org.eclipse.jetty.client.ByteBufferRequestContent; +import org.eclipse.jetty.client.ContentResponse; import org.eclipse.jetty.client.HttpClient; -import org.eclipse.jetty.client.api.ContentProvider; -import org.eclipse.jetty.client.api.ContentResponse; -import org.eclipse.jetty.client.api.Request; -import org.eclipse.jetty.client.util.BytesContentProvider; -import org.eclipse.jetty.client.util.MultiPartContentProvider; -import org.eclipse.jetty.client.util.StringContentProvider; +import org.eclipse.jetty.client.MultiPartRequestContent; +import org.eclipse.jetty.client.Request; +import org.eclipse.jetty.client.StringRequestContent; import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http.MultiPart; +import org.eclipse.jetty.http.MultiPart.ContentSourcePart; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -91,7 +93,7 @@ public class IntegrationTest { @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder(); @Rule public final TestName testName = new TestName(); - private static final String SERVER_READY_STRING = "Started ServerConnector"; + private static final String SERVER_READY_STRING = "Started oejs.ServerConnector"; private static final String EXECUTION_ID_HTTP_HEADER = "HTTP_FUNCTION_EXECUTION_ID"; private static final String EXECUTION_ID = "1234abcd"; @@ -167,10 +169,12 @@ abstract static class TestCase { abstract String url(); - abstract ContentProvider requestContent(); + abstract Request.Content requestContent(); abstract int expectedResponseCode(); + abstract Optional> expectedResponseHeaders(); + abstract Optional expectedResponseText(); abstract Optional expectedJson(); @@ -190,6 +194,7 @@ static Builder builder() { .setUrl("/") .setRequestText("") .setExpectedResponseCode(HttpStatus.OK_200) + .setExpectedResponseHeaders(ImmutableMap.of()) .setExpectedResponseText("") .setHttpContentType("text/plain") .setHttpHeaders(ImmutableMap.of()); @@ -200,14 +205,16 @@ abstract static class Builder { abstract Builder setUrl(String x); - abstract Builder setRequestContent(ContentProvider x); + abstract Builder setRequestContent(Request.Content x); Builder setRequestText(String text) { - return setRequestContent(new StringContentProvider(text)); + return setRequestContent(new StringRequestContent(text)); } abstract Builder setExpectedResponseCode(int x); + abstract Builder setExpectedResponseHeaders(Map x); + abstract Builder setExpectedResponseText(String x); abstract Builder setExpectedResponseText(Optional x); @@ -253,37 +260,42 @@ public void helloWorld() throws Exception { testHttpFunction( fullTarget("HelloWorld"), ImmutableList.of( - TestCase.builder().setExpectedResponseText("hello\n").build(), + TestCase.builder() + .setExpectedResponseHeaders(ImmutableMap.of( + "Content-Length", "*")) + .setExpectedResponseText("hello\n") + .build(), FAVICON_TEST_CASE, ROBOTS_TXT_TEST_CASE)); } @Test - public void timeoutHttpSuccess() throws Exception { - testFunction( - SignatureType.HTTP, - fullTarget("TimeoutHttp"), - ImmutableList.of(), + public void bufferedWrites() throws Exception { + // This test checks that writes are buffered and are written + // in an efficient way with known content-length if possible. + testHttpFunction( + fullTarget("BufferedWrites"), ImmutableList.of( TestCase.builder() - .setExpectedResponseText("finished\n") - .setExpectedResponseText(Optional.empty()) - .build()), - ImmutableMap.of("CLOUD_RUN_TIMEOUT_SECONDS", "3")); - } - - @Test - public void timeoutHttpTimesOut() throws Exception { - testFunction( - SignatureType.HTTP, - fullTarget("TimeoutHttp"), - ImmutableList.of(), - ImmutableList.of( + .setUrl("/target?writes=2") + .setExpectedResponseText("write 0\nwrite 1\n") + .setExpectedResponseHeaders(ImmutableMap.of( + "x-write-0", "true", + "x-write-1", "true", + "x-written", "true", + "Content-Length", "16" + )) + .build(), TestCase.builder() - .setExpectedResponseCode(408) - .setExpectedResponseText(Optional.empty()) - .build()), - ImmutableMap.of("CLOUD_RUN_TIMEOUT_SECONDS", "1")); + .setUrl("/target?writes=2&flush=true") + .setExpectedResponseText("write 0\nwrite 1\n") + .setExpectedResponseHeaders(ImmutableMap.of( + "x-write-0", "true", + "x-write-1", "true", + "x-written", "-", + "Transfer-Encoding", "chunked")) + .build() + )); } @Test @@ -291,7 +303,7 @@ public void exceptionHttp() throws Exception { String exceptionExpectedOutput = "\"severity\": \"ERROR\", \"logging.googleapis.com/sourceLocation\": {\"file\":" + " \"com/google/cloud/functions/invoker/HttpFunctionExecutor.java\", \"method\":" - + " \"service\"}, \"execution_id\": \"" + + " \"handle\"}, \"execution_id\": \"" + EXECUTION_ID + "\"," + " \"message\": \"Failed to execute" @@ -302,6 +314,7 @@ public void exceptionHttp() throws Exception { ImmutableList.of( TestCase.builder() .setExpectedResponseCode(500) + .setExpectedResponseText("") .setHttpHeaders(ImmutableMap.of(EXECUTION_ID_HTTP_HEADER, EXECUTION_ID)) .setExpectedOutput(exceptionExpectedOutput) .build())); @@ -312,7 +325,7 @@ public void exceptionBackground() throws Exception { String exceptionExpectedOutput = "\"severity\": \"ERROR\", \"logging.googleapis.com/sourceLocation\": {\"file\":" + " \"com/google/cloud/functions/invoker/BackgroundFunctionExecutor.java\", \"method\":" - + " \"service\"}, \"execution_id\": \"" + + " \"handle\"}, \"execution_id\": \"" + EXECUTION_ID + "\", " + "\"message\": \"Failed to execute" @@ -583,7 +596,7 @@ public void nativeCloudEventException() throws Exception { String exceptionExpectedOutput = "\"severity\": \"ERROR\", \"logging.googleapis.com/sourceLocation\": {\"file\":" + " \"com/google/cloud/functions/invoker/BackgroundFunctionExecutor.java\", \"method\":" - + " \"service\"}, \"execution_id\": \"" + + " \"handle\"}, \"execution_id\": \"" + EXECUTION_ID + "\", " + "\"message\": \"Failed to execute" @@ -632,11 +645,14 @@ public void packageless() throws Exception { @Test public void multipart() throws Exception { - MultiPartContentProvider multiPartProvider = new MultiPartContentProvider(); + MultiPartRequestContent multiPartRequestContent = new MultiPartRequestContent(); byte[] bytes = new byte[17]; - multiPartProvider.addFieldPart("bytes", new BytesContentProvider(bytes), new HttpFields()); - String string = "1234567890"; - multiPartProvider.addFieldPart("string", new StringContentProvider(string), new HttpFields()); + multiPartRequestContent.addPart(new ContentSourcePart("bytes", null, + HttpFields.EMPTY, new ByteBufferRequestContent(ByteBuffer.wrap(bytes)))); + multiPartRequestContent.addPart(new MultiPart.ContentSourcePart("string", null, + HttpFields.EMPTY, new StringRequestContent("1234567890"))); + multiPartRequestContent.close(); + String expectedResponse = "part bytes type application/octet-stream length 17\n" + "part string type text/plain;charset=UTF-8 length 10\n"; @@ -644,8 +660,8 @@ public void multipart() throws Exception { fullTarget("Multipart"), ImmutableList.of( TestCase.builder() - .setHttpContentType(Optional.empty()) - .setRequestContent(multiPartProvider) + .setHttpContentType(multiPartRequestContent.getContentType()) + .setRequestContent(multiPartRequestContent) .setExpectedResponseText(expectedResponse) .build())); } @@ -789,16 +805,28 @@ private void testFunction( testCase.snoopFile().ifPresent(File::delete); String uri = "http://localhost:" + serverPort + testCase.url(); Request request = httpClient.POST(uri); - testCase - .httpContentType() - .ifPresent(contentType -> request.header(HttpHeader.CONTENT_TYPE, contentType)); - testCase.httpHeaders().forEach((header, value) -> request.header(header, value)); - request.content(testCase.requestContent()); + + request.headers(m -> { + testCase.httpContentType().ifPresent(contentType -> m.put(HttpHeader.CONTENT_TYPE, contentType)); + testCase.httpHeaders().forEach(m::put); + }); + request.body(testCase.requestContent()); ContentResponse response = request.send(); expect .withMessage("Response to %s is %s %s", uri, response.getStatus(), response.getReason()) .that(response.getStatus()) .isEqualTo(testCase.expectedResponseCode()); + testCase.expectedResponseHeaders().ifPresent(map -> { + for (Map.Entry entry : map.entrySet()) { + if ("*".equals(entry.getValue())) { + expect.that(response.getHeaders().getFieldNamesCollection()).contains(entry.getKey()); + } else if ("-".equals(entry.getValue())) { + expect.that(response.getHeaders().getFieldNamesCollection()).doesNotContain(entry.getKey()); + } else { + expect.that(response.getHeaders().getValuesList(entry.getKey())).contains(entry.getValue()); + } + } + }); testCase .expectedResponseText() .ifPresent(text -> expect.that(response.getContentAsString()).isEqualTo(text)); diff --git a/invoker/core/src/test/java/com/google/cloud/functions/invoker/http/HttpTest.java b/invoker/core/src/test/java/com/google/cloud/functions/invoker/http/HttpTest.java index e0ca4675..24a57ce5 100644 --- a/invoker/core/src/test/java/com/google/cloud/functions/invoker/http/HttpTest.java +++ b/invoker/core/src/test/java/com/google/cloud/functions/invoker/http/HttpTest.java @@ -27,6 +27,7 @@ import java.io.InputStreamReader; import java.io.OutputStream; import java.net.ServerSocket; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -34,22 +35,19 @@ import java.util.TreeMap; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import javax.servlet.MultipartConfigElement; -import javax.servlet.http.HttpServlet; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; +import org.eclipse.jetty.client.ByteBufferRequestContent; +import org.eclipse.jetty.client.ContentResponse; import org.eclipse.jetty.client.HttpClient; -import org.eclipse.jetty.client.api.ContentResponse; -import org.eclipse.jetty.client.api.Request; -import org.eclipse.jetty.client.util.BytesContentProvider; -import org.eclipse.jetty.client.util.MultiPartContentProvider; -import org.eclipse.jetty.client.util.StringContentProvider; -import org.eclipse.jetty.http.HttpFields; -import org.eclipse.jetty.http.HttpHeader; -import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.client.MultiPartRequestContent; +import org.eclipse.jetty.client.StringRequestContent; +import org.eclipse.jetty.http.*; +import org.eclipse.jetty.http.HttpStatus.Code; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Response; import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.servlet.ServletContextHandler; -import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.server.handler.EagerContentHandler; +import org.eclipse.jetty.util.Callback; import org.junit.BeforeClass; import org.junit.Test; @@ -81,21 +79,22 @@ public static void allocateServerPort() throws IOException { } /** - * Wrapper class that allows us to start a Jetty server with a single servlet for {@code /*} - * within a try-with-resources statement. The servlet will be configured to support multipart + * Wrapper class that allows us to start a Jetty server with a single handler for {@code /*} + * within a try-with-resources statement. The handler will be configured to support multipart * requests. */ private static class SimpleServer implements AutoCloseable { private final Server server; - SimpleServer(HttpServlet servlet) throws Exception { + SimpleServer(Handler handler) throws Exception { this.server = new Server(serverPort); - ServletContextHandler context = new ServletContextHandler(); - context.setContextPath("/"); - server.setHandler(context); - ServletHolder servletHolder = new ServletHolder(servlet); - servletHolder.getRegistration().setMultipartConfig(new MultipartConfigElement("tiddly")); - context.addServlet(servletHolder, "/*"); + server.setHandler(handler); + + MultiPartConfig config = new MultiPartConfig.Builder().maxMemoryPartSize(-1).build(); + EagerContentHandler.MultiPartContentLoaderFactory factory = + new EagerContentHandler.MultiPartContentLoaderFactory(config); + server.insertHandler(new EagerContentHandler(factory)); + server.start(); } @@ -112,16 +111,16 @@ private interface HttpRequestTest { /** * Tests methods on the {@link HttpRequest} object while the request is being serviced. We are not - * guaranteed that the underlying {@link HttpServletRequest} object will still be valid when the + * guaranteed that the underlying {@link Request} object will still be valid when the * request completes, and in fact in Jetty it isn't. So we perform the checks in the context of - * the servlet, and report any exception back to the test method. + * the handler, and report any exception back to the test method. */ @Test public void httpRequestMethods() throws Exception { AtomicReference testReference = new AtomicReference<>(); AtomicReference exceptionReference = new AtomicReference<>(); - HttpRequestServlet testServlet = new HttpRequestServlet(testReference, exceptionReference); - try (SimpleServer server = new SimpleServer(testServlet)) { + HttpRequestHandler testHandler = new HttpRequestHandler(testReference, exceptionReference); + try (SimpleServer server = new SimpleServer(testHandler)) { httpRequestMethods(testReference, exceptionReference); } } @@ -192,14 +191,16 @@ private void httpRequestMethods( }; for (HttpRequestTest test : tests) { testReference.set(test); - Request request = + org.eclipse.jetty.client.Request request = httpClient .POST(uri) - .header(HttpHeader.CONTENT_TYPE, "text/plain; charset=utf-8") - .header("foo", "bar") - .header("foo", "baz") - .header("CaSe-SeNsItIvE", "VaLuE") - .content(new StringContentProvider(TEST_BODY)); + .headers(m -> { + m.add(HttpHeader.CONTENT_TYPE, "text/plain; charset=utf-8"); + m.add("foo", "bar"); + m.add("foo", "baz"); + m.add("CaSe-SeNsItIvE", "VaLuE"); + }) + .body(new StringRequestContent(TEST_BODY)); ContentResponse response = request.send(); assertThat(response.getStatus()).isEqualTo(HttpStatus.OK_200); throwIfNotNull(exceptionReference.get()); @@ -222,8 +223,8 @@ public void emptyRequest() throws Exception { }; AtomicReference exceptionReference = new AtomicReference<>(); AtomicReference testReference = new AtomicReference<>(test); - HttpRequestServlet testServlet = new HttpRequestServlet(testReference, exceptionReference); - try (SimpleServer server = new SimpleServer(testServlet)) { + HttpRequestHandler testHandler = new HttpRequestHandler(testReference, exceptionReference); + try (SimpleServer server = new SimpleServer(testHandler)) { ContentResponse response = httpClient.POST(uri).send(); assertThat(response.getStatus()).isEqualTo(HttpStatus.OK_200); throwIfNotNull(exceptionReference.get()); @@ -239,20 +240,22 @@ private void validateReader(BufferedReader reader) { public void multiPartRequest() throws Exception { AtomicReference testReference = new AtomicReference<>(); AtomicReference exceptionReference = new AtomicReference<>(); - HttpRequestServlet testServlet = new HttpRequestServlet(testReference, exceptionReference); + HttpRequestHandler testHandler = new HttpRequestHandler(testReference, exceptionReference); HttpClient httpClient = new HttpClient(); httpClient.start(); String uri = "http://localhost:" + serverPort + "/"; - MultiPartContentProvider multiPart = new MultiPartContentProvider(); - HttpFields textHttpFields = new HttpFields(); - textHttpFields.add("foo", "bar"); - multiPart.addFieldPart("text", new StringContentProvider(TEST_BODY), textHttpFields); - HttpFields bytesHttpFields = new HttpFields(); - bytesHttpFields.add("foo", "baz"); - bytesHttpFields.add("foo", "buh"); + MultiPartRequestContent multiPart = new MultiPartRequestContent(); + HttpFields textHttpFields = HttpFields.build() + .add("foo", "bar"); + multiPart.addPart(new MultiPart.ContentSourcePart("text", null, textHttpFields, + new StringRequestContent(TEST_BODY))); + HttpFields.Mutable bytesHttpFields = HttpFields.build() + .add("foo", "baz") + .add("foo", "buh"); assertThat(bytesHttpFields.getValuesList("foo")).containsExactly("baz", "buh"); - multiPart.addFilePart( - "binary", "/tmp/binary.x", new BytesContentProvider(RANDOM_BYTES), bytesHttpFields); + multiPart.addPart(new MultiPart.ContentSourcePart("binary", "/tmp/binary.x", + bytesHttpFields, new ByteBufferRequestContent(ByteBuffer.wrap(RANDOM_BYTES)))); + multiPart.close(); HttpRequestTest test = request -> { // The Content-Type header will also have a boundary=something attribute. @@ -271,10 +274,9 @@ public void multiPartRequest() throws Exception { assertThat(bytesPart.getFileName()).hasValue("/tmp/binary.x"); assertThat(bytesPart.getContentLength()).isEqualTo(RANDOM_BYTES.length); assertThat(bytesPart.getContentType()).hasValue("application/octet-stream"); - // We only see ["buh"] here, not ["baz", "buh"], apparently due to a Jetty bug. - // Repeated headers on multi-part content are not a big problem anyway. List foos = bytesPart.getHeaders().get("foo"); - assertThat(foos).contains("buh"); + assertThat(foos).containsExactly("baz", "buh"); + byte[] bytes = new byte[RANDOM_BYTES.length]; try (InputStream inputStream = bytesPart.getInputStream()) { assertThat(inputStream.read(bytes)).isEqualTo(bytes.length); @@ -282,20 +284,22 @@ public void multiPartRequest() throws Exception { assertThat(bytes).isEqualTo(RANDOM_BYTES); } }; - try (SimpleServer server = new SimpleServer(testServlet)) { + try (SimpleServer server = new SimpleServer(testHandler)) { testReference.set(test); - Request request = httpClient.POST(uri).header("foo", "oof").content(multiPart); + org.eclipse.jetty.client.Request request = httpClient.POST(uri) + .headers(m -> m.put("foo", "oof")) + .body(multiPart); ContentResponse response = request.send(); assertThat(response.getStatus()).isEqualTo(HttpStatus.OK_200); throwIfNotNull(exceptionReference.get()); } } - private static class HttpRequestServlet extends HttpServlet { + private static class HttpRequestHandler extends Handler.Abstract { private final AtomicReference testReference; private final AtomicReference exceptionReference; - private HttpRequestServlet( + private HttpRequestHandler( AtomicReference testReference, AtomicReference exceptionReference) { this.testReference = testReference; @@ -303,12 +307,21 @@ private HttpRequestServlet( } @Override - protected void doPost(HttpServletRequest req, HttpServletResponse resp) { + public boolean handle(Request request, Response response, Callback callback) { try { - testReference.get().test(new HttpRequestImpl(req)); + if (!HttpMethod.POST.is(request.getMethod())) { + response.setStatus(HttpStatus.METHOD_NOT_ALLOWED_405); + callback.succeeded(); + return true; + } + + testReference.get().test(new HttpRequestImpl(request)); } catch (Throwable t) { + t.printStackTrace(); exceptionReference.set(t); } + callback.succeeded(); + return true; } } @@ -326,8 +339,8 @@ private interface HttpResponseTest { public void httpResponseSetAndGet() throws Exception { AtomicReference testReference = new AtomicReference<>(); AtomicReference exceptionReference = new AtomicReference<>(); - HttpResponseServlet testServlet = new HttpResponseServlet(testReference, exceptionReference); - try (SimpleServer server = new SimpleServer(testServlet)) { + HttpResponseHandler testHandler = new HttpResponseHandler(testReference, exceptionReference); + try (SimpleServer server = new SimpleServer(testHandler)) { httpResponseSetAndGet(testReference, exceptionReference); } } @@ -349,8 +362,7 @@ private void httpResponseSetAndGet( .containsAtLeast("Content-Type", Arrays.asList("application/octet-stream")); }, response -> { - Map> initialHeaders = response.getHeaders(); - // The servlet spec says this should be empty, but actually we get a Date header here. + // The fields are initialized with a Date header as per the HTTP RFCs. // So we just check that we can add our own headers. response.appendHeader("foo", "bar"); response.appendHeader("wibbly", "wobbly"); @@ -365,18 +377,18 @@ private void httpResponseSetAndGet( HttpClient httpClient = new HttpClient(); httpClient.start(); String uri = "http://localhost:" + serverPort; - Request request = httpClient.POST(uri); + org.eclipse.jetty.client.Request request = httpClient.POST(uri); ContentResponse response = request.send(); assertThat(response.getStatus()).isEqualTo(HttpStatus.OK_200); throwIfNotNull(exceptionReference.get()); } } - private static class HttpResponseServlet extends HttpServlet { + private static class HttpResponseHandler extends Handler.Abstract { private final AtomicReference testReference; private final AtomicReference exceptionReference; - private HttpResponseServlet( + private HttpResponseHandler( AtomicReference testReference, AtomicReference exceptionReference) { this.testReference = testReference; @@ -384,12 +396,18 @@ private HttpResponseServlet( } @Override - protected void doPost(HttpServletRequest req, HttpServletResponse resp) { + public boolean handle(Request request, Response response, Callback callback) { + if (!HttpMethod.POST.is(request.getMethod())) { + return false; + } try { - testReference.get().test(new HttpResponseImpl(resp)); + testReference.get().test(new HttpResponseImpl(response)); + callback.succeeded(); } catch (Throwable t) { exceptionReference.set(t); + Response.writeError(request, response, callback, t); } + return true; } } @@ -416,15 +434,15 @@ private static ResponseTest responseTest( /** * Tests that operations on the {@link HttpResponse} have the appropriate effect on the HTTP * response that ends up being sent. Here, for each check, we have two operations: the operation - * on the {@link HttpResponse}, which happens inside the servlet, and the operation to check the + * on the {@link HttpResponse}, which happens inside the handler, and the operation to check the * HTTP result, which happens in the client thread. */ @Test public void httpResponseEffects() throws Exception { AtomicReference testReference = new AtomicReference<>(); AtomicReference exceptionReference = new AtomicReference<>(); - HttpResponseServlet testServlet = new HttpResponseServlet(testReference, exceptionReference); - try (SimpleServer server = new SimpleServer(testServlet)) { + HttpResponseHandler testHandler = new HttpResponseHandler(testReference, exceptionReference); + try (SimpleServer server = new SimpleServer(testHandler)) { httpResponseEffects(testReference, exceptionReference); } } @@ -444,10 +462,11 @@ private void httpResponseEffects( response -> response.setStatusCode(HttpStatus.IM_A_TEAPOT_418), response -> assertThat(response.getStatus()).isEqualTo(HttpStatus.IM_A_TEAPOT_418)), responseTest( + // reason string cannot be set by application response -> response.setStatusCode(HttpStatus.IM_A_TEAPOT_418, "Je suis une théière"), response -> { assertThat(response.getStatus()).isEqualTo(HttpStatus.IM_A_TEAPOT_418); - assertThat(response.getReason()).isEqualTo("Je suis une théière"); + assertThat(response.getReason()).isEqualTo(Code.IM_A_TEAPOT.getMessage()); }), responseTest( response -> response.setContentType("application/noddy"), @@ -490,7 +509,7 @@ private void httpResponseEffects( HttpClient httpClient = new HttpClient(); httpClient.start(); String uri = "http://localhost:" + serverPort; - Request request = httpClient.POST(uri); + org.eclipse.jetty.client.Request request = httpClient.POST(uri); ContentResponse response = request.send(); throwIfNotNull(exceptionReference.get()); test.responseCheck.test(response); diff --git a/invoker/core/src/test/java/com/google/cloud/functions/invoker/testfunctions/BufferedWrites.java b/invoker/core/src/test/java/com/google/cloud/functions/invoker/testfunctions/BufferedWrites.java new file mode 100644 index 00000000..a7989a74 --- /dev/null +++ b/invoker/core/src/test/java/com/google/cloud/functions/invoker/testfunctions/BufferedWrites.java @@ -0,0 +1,27 @@ +package com.google.cloud.functions.invoker.testfunctions; + +import com.google.cloud.functions.HttpFunction; +import com.google.cloud.functions.HttpRequest; +import com.google.cloud.functions.HttpResponse; +import java.io.BufferedWriter; +import java.util.List; +import java.util.Map; + +public class BufferedWrites implements HttpFunction { + @Override + public void service(HttpRequest request, HttpResponse response) throws Exception { + Map> queryParameters = request.getQueryParameters(); + int writes = Integer.parseInt(request.getFirstQueryParameter("writes").orElse("0")); + boolean flush = Boolean.parseBoolean(request.getFirstQueryParameter("flush").orElse("false")); + + BufferedWriter writer = response.getWriter(); + for (int i = 0; i < writes; i++) { + response.appendHeader("x-write-" + i, "true"); + writer.write("write " + i + "\n"); + } + if (flush) { + writer.flush(); + } + response.appendHeader("x-written", "true"); + } +} From 42ec133f845457aeba58d57912cae048827bae73 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Mon, 29 Sep 2025 14:18:26 +1000 Subject: [PATCH 2/4] chore(implementation)!: use Jetty-12 core without servlets Port the invoker to upgrade to Eclipse Jetty-12 version 12. Specifically using the new core APIs of Eclipse Jetty-12 that allow the overhead of a Servlet container to be avoided. BREAKING CHANGE: use Java 17 or above, as required by Eclipse Jetty-12. Signed-off-by: Lachlan Roberts --- .../google/cloud/functions/invoker/HttpFunctionExecutor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/invoker/core/src/main/java/com/google/cloud/functions/invoker/HttpFunctionExecutor.java b/invoker/core/src/main/java/com/google/cloud/functions/invoker/HttpFunctionExecutor.java index d82f83a9..b414f110 100644 --- a/invoker/core/src/main/java/com/google/cloud/functions/invoker/HttpFunctionExecutor.java +++ b/invoker/core/src/main/java/com/google/cloud/functions/invoker/HttpFunctionExecutor.java @@ -79,7 +79,7 @@ public boolean handle(Request request, Response response, Callback callback) thr respImpl.close(callback); } catch (Throwable t) { logger.log(Level.SEVERE, "Failed to execute " + function.getClass().getName(), t); - Response.writeError(request, response, callback, HttpStatus.INTERNAL_SERVER_ERROR_500, null, t); + Response.writeError(request, response, callback, HttpStatus.INTERNAL_SERVER_ERROR_500, null); } finally { Thread.currentThread().setContextClassLoader(oldContextLoader); executionIdUtil.removeExecutionId(); From 66ec2653f6f9315191fca56e6a327ecc7d448eda Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Wed, 29 Oct 2025 21:32:01 +1100 Subject: [PATCH 3/4] chore(implementation)!: use Jetty-12 core without servlets Port the invoker to upgrade to Eclipse Jetty-12 version 12. Specifically using the new core APIs of Eclipse Jetty-12 that allow the overhead of a Servlet container to be avoided. BREAKING CHANGE: use Java 17 or above, as required by Eclipse Jetty-12. Signed-off-by: Lachlan Roberts --- invoker/core/pom.xml | 2 +- .../invoker/BackgroundFunctionExecutor.java | 3 +- .../invoker/http/HttpRequestImpl.java | 21 +++----- .../invoker/http/HttpResponseImpl.java | 5 +- .../functions/invoker/http/HttpUtil.java | 32 +++++++++++ .../invoker/http/TimeoutHandler.java | 2 +- .../functions/invoker/IntegrationTest.java | 53 ++++++++++++++++--- .../functions/invoker/http/HttpTest.java | 19 +++---- 8 files changed, 96 insertions(+), 41 deletions(-) create mode 100644 invoker/core/src/main/java/com/google/cloud/functions/invoker/http/HttpUtil.java diff --git a/invoker/core/pom.xml b/invoker/core/pom.xml index 7d13c924..738478f8 100644 --- a/invoker/core/pom.xml +++ b/invoker/core/pom.xml @@ -23,7 +23,7 @@ 17 17 4.0.1 - 12.1.1 + 12.1.3 diff --git a/invoker/core/src/main/java/com/google/cloud/functions/invoker/BackgroundFunctionExecutor.java b/invoker/core/src/main/java/com/google/cloud/functions/invoker/BackgroundFunctionExecutor.java index cea9396f..d685aefa 100644 --- a/invoker/core/src/main/java/com/google/cloud/functions/invoker/BackgroundFunctionExecutor.java +++ b/invoker/core/src/main/java/com/google/cloud/functions/invoker/BackgroundFunctionExecutor.java @@ -345,8 +345,7 @@ public boolean handle(Request req, Response res, Callback callback) throws Excep callback.succeeded(); } catch (Throwable t) { logger.log(Level.SEVERE, "Failed to execute " + functionExecutor.functionName(), t); - res.setStatus(HttpStatus.INTERNAL_SERVER_ERROR_500); - callback.succeeded(); + Response.writeError(req, res, callback, HttpStatus.INTERNAL_SERVER_ERROR_500, null); } finally { executionIdUtil.removeExecutionId(); } diff --git a/invoker/core/src/main/java/com/google/cloud/functions/invoker/http/HttpRequestImpl.java b/invoker/core/src/main/java/com/google/cloud/functions/invoker/http/HttpRequestImpl.java index 786661d9..93252fe6 100644 --- a/invoker/core/src/main/java/com/google/cloud/functions/invoker/http/HttpRequestImpl.java +++ b/invoker/core/src/main/java/com/google/cloud/functions/invoker/http/HttpRequestImpl.java @@ -21,16 +21,16 @@ import java.io.InputStreamReader; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.TreeMap; -import org.eclipse.jetty.http.*; +import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.http.MimeTypes; import org.eclipse.jetty.http.MultiPart.Part; +import org.eclipse.jetty.http.MultiPartFormData; import org.eclipse.jetty.io.Content; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.util.Fields; @@ -144,15 +144,7 @@ public BufferedReader getReader() throws IOException { @Override public Map> getHeaders() { - return toStringListMap(request.getHeaders()); - } - - static Map> toStringListMap(HttpFields headers) { - Map> map = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); - for (HttpField field : headers) { - map.computeIfAbsent(field.getName(), key -> new ArrayList<>()).add(field.getValue()); - } - return map; + return HttpUtil.toStringListMap(request.getHeaders()); } private static class HttpPartImpl implements HttpPart { @@ -186,8 +178,7 @@ public Optional getCharacterEncoding() { @Override public InputStream getInputStream() throws IOException { - // TODO: update with createContentSource when https://github.com/jetty/jetty.project/pull/13610 is released. - Content.Source contentSource = part.newContentSource(null, 0, -1); + Content.Source contentSource = part.createContentSource(); return Content.Source.asInputStream(contentSource); } @@ -202,7 +193,7 @@ public BufferedReader getReader() throws IOException { @Override public Map> getHeaders() { - return HttpRequestImpl.toStringListMap(part.getHeaders()); + return HttpUtil.toStringListMap(part.getHeaders()); } @Override diff --git a/invoker/core/src/main/java/com/google/cloud/functions/invoker/http/HttpResponseImpl.java b/invoker/core/src/main/java/com/google/cloud/functions/invoker/http/HttpResponseImpl.java index 60216514..40d9c7e7 100644 --- a/invoker/core/src/main/java/com/google/cloud/functions/invoker/http/HttpResponseImpl.java +++ b/invoker/core/src/main/java/com/google/cloud/functions/invoker/http/HttpResponseImpl.java @@ -77,14 +77,15 @@ public void appendHeader(String key, String value) { @Override public Map> getHeaders() { - return HttpRequestImpl.toStringListMap(response.getHeaders()); + return HttpUtil.toStringListMap(response.getHeaders()); } @Override public OutputStream getOutputStream() { if (writer != null) { throw new IllegalStateException("getWriter called"); - } else if (outputStream == null) { + } + if (outputStream == null) { Request request = response.getRequest(); int outputBufferSize = request.getConnectionMetaData().getHttpConfiguration() .getOutputBufferSize(); diff --git a/invoker/core/src/main/java/com/google/cloud/functions/invoker/http/HttpUtil.java b/invoker/core/src/main/java/com/google/cloud/functions/invoker/http/HttpUtil.java new file mode 100644 index 00000000..042af255 --- /dev/null +++ b/invoker/core/src/main/java/com/google/cloud/functions/invoker/http/HttpUtil.java @@ -0,0 +1,32 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.cloud.functions.invoker.http; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import org.eclipse.jetty.http.HttpField; +import org.eclipse.jetty.http.HttpFields; + +class HttpUtil { + public static Map> toStringListMap(HttpFields headers) { + Map> map = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); + for (HttpField field : headers) { + map.computeIfAbsent(field.getName(), key -> new ArrayList<>()).add(field.getValue()); + } + return map; + } +} diff --git a/invoker/core/src/main/java/com/google/cloud/functions/invoker/http/TimeoutHandler.java b/invoker/core/src/main/java/com/google/cloud/functions/invoker/http/TimeoutHandler.java index 480bc383..af14cb5c 100644 --- a/invoker/core/src/main/java/com/google/cloud/functions/invoker/http/TimeoutHandler.java +++ b/invoker/core/src/main/java/com/google/cloud/functions/invoker/http/TimeoutHandler.java @@ -1,4 +1,4 @@ -// Copyright 2024 Google LLC +// Copyright 2025 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/invoker/core/src/test/java/com/google/cloud/functions/invoker/IntegrationTest.java b/invoker/core/src/test/java/com/google/cloud/functions/invoker/IntegrationTest.java index d4ff5260..c5ac7700 100644 --- a/invoker/core/src/test/java/com/google/cloud/functions/invoker/IntegrationTest.java +++ b/invoker/core/src/test/java/com/google/cloud/functions/invoker/IntegrationTest.java @@ -173,6 +173,13 @@ abstract static class TestCase { abstract int expectedResponseCode(); + /** + * Expected response headers map, header name -> value. + * Value "*" asserts the header is present with any value. + * Value "-" asserts the header is not present. + * + * @return the expected response headers for this test case. + */ abstract Optional> expectedResponseHeaders(); abstract Optional expectedResponseText(); @@ -269,10 +276,39 @@ public void helloWorld() throws Exception { ROBOTS_TXT_TEST_CASE)); } + @Test + public void timeoutHttpSuccess() throws Exception { + testFunction( + SignatureType.HTTP, + fullTarget("TimeoutHttp"), + ImmutableList.of(), + ImmutableList.of( + TestCase.builder() + .setExpectedResponseText("finished\n") + .setExpectedResponseText(Optional.empty()) + .build()), + ImmutableMap.of("CLOUD_RUN_TIMEOUT_SECONDS", "3")); + } + + @Test + public void timeoutHttpTimesOut() throws Exception { + testFunction( + SignatureType.HTTP, + fullTarget("TimeoutHttp"), + ImmutableList.of(), + ImmutableList.of( + TestCase.builder() + .setExpectedResponseCode(408) + .setExpectedResponseText(Optional.empty()) + .build()), + ImmutableMap.of("CLOUD_RUN_TIMEOUT_SECONDS", "1")); + } + @Test public void bufferedWrites() throws Exception { - // This test checks that writes are buffered and are written - // in an efficient way with known content-length if possible. + // This test checks that writes are buffered, and are written + // in an efficient way with known content-length if possible + // instead of doing a chunked response. testHttpFunction( fullTarget("BufferedWrites"), ImmutableList.of( @@ -798,17 +834,17 @@ private void testFunction( throws Exception { ServerProcess serverProcess = startServer(signatureType, target, extraArgs, environmentVariables); + HttpClient httpClient = new HttpClient(); try { - HttpClient httpClient = new HttpClient(); httpClient.start(); for (TestCase testCase : testCases) { testCase.snoopFile().ifPresent(File::delete); String uri = "http://localhost:" + serverPort + testCase.url(); Request request = httpClient.POST(uri); - request.headers(m -> { - testCase.httpContentType().ifPresent(contentType -> m.put(HttpHeader.CONTENT_TYPE, contentType)); - testCase.httpHeaders().forEach(m::put); + request.headers(headers -> { + testCase.httpContentType().ifPresent(contentType -> headers.put(HttpHeader.CONTENT_TYPE, contentType)); + testCase.httpHeaders().forEach(headers::put); }); request.body(testCase.requestContent()); ContentResponse response = request.send(); @@ -816,8 +852,8 @@ private void testFunction( .withMessage("Response to %s is %s %s", uri, response.getStatus(), response.getReason()) .that(response.getStatus()) .isEqualTo(testCase.expectedResponseCode()); - testCase.expectedResponseHeaders().ifPresent(map -> { - for (Map.Entry entry : map.entrySet()) { + testCase.expectedResponseHeaders().ifPresent(expectedResponseHeaders -> { + for (Map.Entry entry : expectedResponseHeaders.entrySet()) { if ("*".equals(entry.getValue())) { expect.that(response.getHeaders().getFieldNamesCollection()).contains(entry.getKey()); } else if ("-".equals(entry.getValue())) { @@ -839,6 +875,7 @@ private void testFunction( } } finally { serverProcess.close(); + httpClient.stop(); } for (TestCase testCase : testCases) { testCase diff --git a/invoker/core/src/test/java/com/google/cloud/functions/invoker/http/HttpTest.java b/invoker/core/src/test/java/com/google/cloud/functions/invoker/http/HttpTest.java index 24a57ce5..d4658a1d 100644 --- a/invoker/core/src/test/java/com/google/cloud/functions/invoker/http/HttpTest.java +++ b/invoker/core/src/test/java/com/google/cloud/functions/invoker/http/HttpTest.java @@ -40,8 +40,12 @@ import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.MultiPartRequestContent; import org.eclipse.jetty.client.StringRequestContent; -import org.eclipse.jetty.http.*; +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.HttpStatus.Code; +import org.eclipse.jetty.http.MultiPart; +import org.eclipse.jetty.http.MultiPartConfig; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Response; @@ -309,16 +313,10 @@ private HttpRequestHandler( @Override public boolean handle(Request request, Response response, Callback callback) { try { - if (!HttpMethod.POST.is(request.getMethod())) { - response.setStatus(HttpStatus.METHOD_NOT_ALLOWED_405); - callback.succeeded(); - return true; - } - testReference.get().test(new HttpRequestImpl(request)); } catch (Throwable t) { - t.printStackTrace(); exceptionReference.set(t); + Response.writeError(request, response, callback, t); } callback.succeeded(); return true; @@ -397,9 +395,6 @@ private HttpResponseHandler( @Override public boolean handle(Request request, Response response, Callback callback) { - if (!HttpMethod.POST.is(request.getMethod())) { - return false; - } try { testReference.get().test(new HttpResponseImpl(response)); callback.succeeded(); @@ -462,10 +457,10 @@ private void httpResponseEffects( response -> response.setStatusCode(HttpStatus.IM_A_TEAPOT_418), response -> assertThat(response.getStatus()).isEqualTo(HttpStatus.IM_A_TEAPOT_418)), responseTest( - // reason string cannot be set by application response -> response.setStatusCode(HttpStatus.IM_A_TEAPOT_418, "Je suis une théière"), response -> { assertThat(response.getStatus()).isEqualTo(HttpStatus.IM_A_TEAPOT_418); + // Reason string cannot be set by the application. assertThat(response.getReason()).isEqualTo(Code.IM_A_TEAPOT.getMessage()); }), responseTest( From 712daf1a6084ec03f1a8f975e70e5324eab73fd8 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Fri, 31 Oct 2025 16:22:24 +1100 Subject: [PATCH 4/4] chore(implementation)!: use Jetty-12 core without servlets Port the invoker to upgrade to Eclipse Jetty-12 version 12. Specifically using the new core APIs of Eclipse Jetty-12 that allow the overhead of a Servlet container to be avoided. BREAKING CHANGE: use Java 17 or above, as required by Eclipse Jetty-12. Signed-off-by: Lachlan Roberts --- .../invoker/BackgroundFunctionExecutor.java | 15 ++- .../invoker/http/HttpRequestImpl.java | 9 +- .../invoker/http/HttpResponseImpl.java | 32 ++++--- .../invoker/http/TimeoutHandler.java | 91 ++++++++----------- .../functions/invoker/runner/Invoker.java | 8 +- 5 files changed, 81 insertions(+), 74 deletions(-) diff --git a/invoker/core/src/main/java/com/google/cloud/functions/invoker/BackgroundFunctionExecutor.java b/invoker/core/src/main/java/com/google/cloud/functions/invoker/BackgroundFunctionExecutor.java index d685aefa..097b9a67 100644 --- a/invoker/core/src/main/java/com/google/cloud/functions/invoker/BackgroundFunctionExecutor.java +++ b/invoker/core/src/main/java/com/google/cloud/functions/invoker/BackgroundFunctionExecutor.java @@ -185,9 +185,12 @@ static Optional backgroundFunctionTypeArgument( } private static Event parseLegacyEvent(Request req) throws IOException { - try (BufferedReader bodyReader = new BufferedReader( - new InputStreamReader(Content.Source.asInputStream(req), - Objects.requireNonNullElse(Request.getCharset(req), StandardCharsets.ISO_8859_1)))) { + try (BufferedReader bodyReader = + new BufferedReader( + new InputStreamReader( + Content.Source.asInputStream(req), + Objects.requireNonNullElse( + Request.getCharset(req), StandardCharsets.ISO_8859_1)))) { return parseLegacyEvent(bodyReader); } } @@ -367,6 +370,9 @@ private void serviceCloudEvent(Request req) throws Exception { @SuppressWarnings("unchecked") FunctionExecutor executor = (FunctionExecutor) functionExecutor; + // Read the entire request body into a byte array. + // TODO: this method is deprecated for removal, use the method introduced by + // https://github.com/jetty/jetty.project/pull/13939 when it is released. byte[] body = Content.Source.asByteArrayAsync(req, -1).get(); MessageReader reader = HttpMessageFactory.createReaderFromMultimap(headerMap(req), body); // It's important not to set the context ClassLoader earlier, because MessageUtils will use @@ -384,7 +390,8 @@ private void serviceCloudEvent(Request req) throws Exception { private static Map> headerMap(Request req) { Map> headerMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); for (HttpField field : req.getHeaders()) { - headerMap.computeIfAbsent(field.getName(), unused -> new ArrayList<>()) + headerMap + .computeIfAbsent(field.getName(), unused -> new ArrayList<>()) .addAll(field.getValueList()); } return headerMap; diff --git a/invoker/core/src/main/java/com/google/cloud/functions/invoker/http/HttpRequestImpl.java b/invoker/core/src/main/java/com/google/cloud/functions/invoker/http/HttpRequestImpl.java index 93252fe6..31ee4ac6 100644 --- a/invoker/core/src/main/java/com/google/cloud/functions/invoker/http/HttpRequestImpl.java +++ b/invoker/core/src/main/java/com/google/cloud/functions/invoker/http/HttpRequestImpl.java @@ -80,13 +80,14 @@ public Map> getQueryParameters() { @Override public Map getParts() { String contentType = request.getHeaders().get(HttpHeader.CONTENT_TYPE); - if (contentType == null || !contentType.startsWith(MimeTypes.Type.MULTIPART_FORM_DATA.asString())) { + if (contentType == null + || !contentType.startsWith(MimeTypes.Type.MULTIPART_FORM_DATA.asString())) { throw new IllegalStateException("Content-Type must be multipart/form-data: " + contentType); } // The multipart parsing is done by the EagerContentHandler, so we just call getParts. MultiPartFormData.Parts parts = MultiPartFormData.getParts(request); - if (parts == null){ + if (parts == null) { throw new IllegalStateException(); } @@ -178,8 +179,8 @@ public Optional getCharacterEncoding() { @Override public InputStream getInputStream() throws IOException { - Content.Source contentSource = part.createContentSource(); - return Content.Source.asInputStream(contentSource); + Content.Source contentSource = part.createContentSource(); + return Content.Source.asInputStream(contentSource); } @Override diff --git a/invoker/core/src/main/java/com/google/cloud/functions/invoker/http/HttpResponseImpl.java b/invoker/core/src/main/java/com/google/cloud/functions/invoker/http/HttpResponseImpl.java index 40d9c7e7..088738f9 100644 --- a/invoker/core/src/main/java/com/google/cloud/functions/invoker/http/HttpResponseImpl.java +++ b/invoker/core/src/main/java/com/google/cloud/functions/invoker/http/HttpResponseImpl.java @@ -87,11 +87,15 @@ public OutputStream getOutputStream() { } if (outputStream == null) { Request request = response.getRequest(); - int outputBufferSize = request.getConnectionMetaData().getHttpConfiguration() - .getOutputBufferSize(); - BufferedContentSink bufferedContentSink = new BufferedContentSink(response, - request.getComponents().getByteBufferPool(), - false, outputBufferSize / 2, outputBufferSize); + int outputBufferSize = + request.getConnectionMetaData().getHttpConfiguration().getOutputBufferSize(); + BufferedContentSink bufferedContentSink = + new BufferedContentSink( + response, + request.getComponents().getByteBufferPool(), + false, + outputBufferSize / 2, + outputBufferSize); outputStream = new ContentSinkOutputStream(bufferedContentSink); } return outputStream; @@ -104,8 +108,10 @@ public synchronized BufferedWriter getWriter() throws IOException { throw new IllegalStateException("getOutputStream called"); } - writer = new NonBufferedWriter(WriteThroughWriter.newWriter(getOutputStream(), - Objects.requireNonNullElse(charset, StandardCharsets.UTF_8))); + writer = + new NonBufferedWriter( + WriteThroughWriter.newWriter( + getOutputStream(), Objects.requireNonNullElse(charset, StandardCharsets.UTF_8))); } return writer; } @@ -130,13 +136,11 @@ public void close(Callback callback) { } /** - * A {@link BufferedWriter} that does not buffer. - * It is generally more efficient to buffer at the {@link Content.Sink} level, - * since frequently total content is smaller than a single buffer and - * the {@link Content.Sink} can turn a close into a last write that will avoid - * chunking the response if at all possible. However, {@link BufferedWriter} - * is in the API for {@link HttpResponse}, so we must return a writer of - * that type. + * A {@link BufferedWriter} that does not buffer. It is generally more efficient to buffer at the + * {@link Content.Sink} level, since frequently total content is smaller than a single buffer and + * the {@link Content.Sink} can turn a close into a last write that will avoid chunking the + * response if at all possible. However, {@link BufferedWriter} is in the API for {@link + * HttpResponse}, so we must return a writer of that type. */ private static class NonBufferedWriter extends BufferedWriter { private final Writer writer; diff --git a/invoker/core/src/main/java/com/google/cloud/functions/invoker/http/TimeoutHandler.java b/invoker/core/src/main/java/com/google/cloud/functions/invoker/http/TimeoutHandler.java index af14cb5c..8e0b1832 100644 --- a/invoker/core/src/main/java/com/google/cloud/functions/invoker/http/TimeoutHandler.java +++ b/invoker/core/src/main/java/com/google/cloud/functions/invoker/http/TimeoutHandler.java @@ -14,76 +14,65 @@ package com.google.cloud.functions.invoker.http; -import java.util.Timer; -import java.util.TimerTask; +import java.time.Duration; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Response; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.thread.Scheduler; public class TimeoutHandler extends Handler.Wrapper { - private final int timeoutMs; + private final Duration timeout; public TimeoutHandler(int timeoutSeconds, Handler handler) { setHandler(handler); - this.timeoutMs = timeoutSeconds * 1000; // Convert seconds to milliseconds + timeout = Duration.ofSeconds(timeoutSeconds); } @Override public boolean handle(Request request, Response response, Callback callback) throws Exception { - // Wrap the callback to ensure it is only called once between the handler and the timeout task. - AtomicBoolean completed = new AtomicBoolean(false); - Callback wrappedCallback = new Callback() { - @Override - public void succeeded() { - if (completed.compareAndSet(false, true)) { - callback.succeeded(); - } - } + // Wrap the callback to ensure it is only completed once between the + // handler and the timeout task. + Callback wrappedCallback = new ProtectedCallback(callback); + Scheduler.Task timeoutTask = + request + .getComponents() + .getScheduler() + .schedule( + () -> wrappedCallback.failed(new TimeoutException("Function execution timed out")), + timeout); - @Override - public void failed(Throwable x) { - if (completed.compareAndSet(false, true)) { - callback.failed(x); - } - } + // Cancel the timeout if the request completes the callback first. + return super.handle(request, response, Callback.from(timeoutTask::cancel, wrappedCallback)); + } - @Override - public InvocationType getInvocationType() { - return callback.getInvocationType(); - } - }; + private static class ProtectedCallback implements Callback { + private final Callback callback; + private final AtomicBoolean completed = new AtomicBoolean(false); - // TODO: consider wrapping the request/response to throw if they are used after timeout. - // TODO: Use org.eclipse.jetty.io.CyclicTimeouts which is optimized for timeouts which are almost always cancelled. - Timer timer = new Timer(true); - TimerTask timeoutTask = - new TimerTask() { - @Override - public void run() { - // TODO: there is a race between the handler writing response and timeout firing. - // This timeout firing doesn't stop the thread handling the request / response it just writes an error to the response. - Response.writeError( - request, - response, - callback, - HttpStatus.REQUEST_TIMEOUT_408, - "Function execution timed out"); - } - }; + public ProtectedCallback(Callback callback) { + this.callback = callback; + } - timer.schedule(timeoutTask, timeoutMs); + @Override + public void succeeded() { + if (completed.compareAndSet(false, true)) { + callback.succeeded(); + } + } - boolean handle; - try { - handle = super.handle(request, response, wrappedCallback); - timeoutTask.cancel(); - } finally { - timer.purge(); + @Override + public void failed(Throwable x) { + if (completed.compareAndSet(false, true)) { + callback.failed(x); + } } - return handle; + @Override + public InvocationType getInvocationType() { + return callback.getInvocationType(); + } } -} +} \ No newline at end of file diff --git a/invoker/core/src/main/java/com/google/cloud/functions/invoker/runner/Invoker.java b/invoker/core/src/main/java/com/google/cloud/functions/invoker/runner/Invoker.java index 24a7aa6a..20c3f248 100644 --- a/invoker/core/src/main/java/com/google/cloud/functions/invoker/runner/Invoker.java +++ b/invoker/core/src/main/java/com/google/cloud/functions/invoker/runner/Invoker.java @@ -283,7 +283,13 @@ private void startServer(boolean join) throws Exception { server.setErrorHandler( new ErrorHandler() { @Override - protected void generateResponse(Request request, Response response, int code, String message, Throwable cause, Callback callback) { + protected void generateResponse( + Request request, + Response response, + int code, + String message, + Throwable cause, + Callback callback) { // Suppress error body callback.succeeded(); }