Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 7 additions & 11 deletions invoker/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<cloudevents.sdk.version>4.0.1</cloudevents.sdk.version>
<jetty.version>12.1.3</jetty.version>
</properties>

<licenses>
Expand All @@ -46,11 +47,6 @@
<artifactId>functions-framework-api</artifactId>
<version>1.1.4</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<version>4.0.1</version>
</dependency>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-core</artifactId>
Expand Down Expand Up @@ -97,13 +93,13 @@
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<version>9.4.58.v20250814</version>
<artifactId>jetty-server</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>9.4.58.v20250814</version>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
<version>2.0.9</version>
</dependency>
<dependency>
<groupId>com.beust</groupId>
Expand Down Expand Up @@ -151,7 +147,7 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-client</artifactId>
<version>9.4.58.v20250814</version>
<version>${jetty.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,32 @@
import io.cloudevents.http.HttpMessageFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.TreeMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.Callback;

/** Executes the user's background function. */
public final class BackgroundFunctionExecutor extends HttpServlet {
public final class BackgroundFunctionExecutor extends Handler.Abstract {
private static final Logger logger = Logger.getLogger("com.google.cloud.functions.invoker");

private final FunctionExecutor<?> functionExecutor;
Expand Down Expand Up @@ -177,8 +184,13 @@ static Optional<Type> backgroundFunctionTypeArgument(
.findFirst();
}

private static Event parseLegacyEvent(HttpServletRequest req) throws IOException {
try (BufferedReader bodyReader = req.getReader()) {
private static Event parseLegacyEvent(Request req) throws IOException {
try (BufferedReader bodyReader =
new BufferedReader(
new InputStreamReader(
Content.Source.asInputStream(req),
Objects.requireNonNullElse(
Request.getCharset(req), StandardCharsets.ISO_8859_1)))) {
return parseLegacyEvent(bodyReader);
}
}
Expand Down Expand Up @@ -225,7 +237,7 @@ private static Context contextFromCloudEvent(CloudEvent cloudEvent) {
* for the various triggers. CloudEvents are ones that follow the standards defined by <a
* href="https://cloudevents.io">cloudevents.io</a>.
*
* @param <CloudEventDataT> the type to be used in the {@link Unmarshallers} call when
* @param <CloudEventDataT> the type to be used in the {code Unmarshallers} call when
* unmarshalling this event, if it is a CloudEvent.
*/
private abstract static class FunctionExecutor<CloudEventDataT> {
Expand Down Expand Up @@ -322,23 +334,25 @@ void serviceCloudEvent(CloudEvent cloudEvent) throws Exception {

/** Executes the user's background function. This can handle all HTTP methods. */
@Override
public void service(HttpServletRequest req, HttpServletResponse res) throws IOException {
String contentType = req.getContentType();
public boolean handle(Request req, Response res, Callback callback) throws Exception {
String contentType = req.getHeaders().get(HttpHeader.CONTENT_TYPE);
try {
executionIdUtil.storeExecutionId(req);
if ((contentType != null && contentType.startsWith("application/cloudevents+json"))
|| req.getHeader("ce-specversion") != null) {
|| req.getHeaders().get("ce-specversion") != null) {
serviceCloudEvent(req);
} else {
serviceLegacyEvent(req);
}
res.setStatus(HttpServletResponse.SC_OK);
res.setStatus(HttpStatus.OK_200);
callback.succeeded();
} catch (Throwable t) {
res.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
logger.log(Level.SEVERE, "Failed to execute " + functionExecutor.functionName(), t);
Response.writeError(req, res, callback, HttpStatus.INTERNAL_SERVER_ERROR_500, null);
} finally {
executionIdUtil.removeExecutionId();
}
return true;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment here or at the top of the method explaining the return value? It seems confusing to me at face value

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is specified in the javadoc of handle

Returns:
True if and only if the request will be handled, a response generated and the callback eventually called. This may occur within the scope of the call to this method, or asynchronously some time later. If false is returned, then this method must not generate a response, nor complete the callback.

In this case we are always handling the request so we should always return true.

}

private enum CloudEventKind {
Expand All @@ -352,10 +366,14 @@ private enum CloudEventKind {
* @param <CloudEventT> a fake type parameter, which corresponds to the type parameter of {@link
* FunctionExecutor}.
*/
private <CloudEventT> void serviceCloudEvent(HttpServletRequest req) throws Exception {
private <CloudEventT> void serviceCloudEvent(Request req) throws Exception {
@SuppressWarnings("unchecked")
FunctionExecutor<CloudEventT> executor = (FunctionExecutor<CloudEventT>) functionExecutor;
byte[] body = req.getInputStream().readAllBytes();

// Read the entire request body into a byte array.
// TODO: this method is deprecated for removal, use the method introduced by
// https://github.com/jetty/jetty.project/pull/13939 when it is released.
byte[] body = Content.Source.asByteArrayAsync(req, -1).get();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain this array manipulation in a comment? why the '-1'?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This asByteArrayAsync method is now deprecated without replacement, I have opened jetty/jetty.project#13939 for an replacement method as Content.Source.asInputStream(req).readAllBytes() could be much less efficient.

But this is simply reading the entire content of the request into a byte[]. The second argument was the length of the content with -1 meaning read until EOF.

MessageReader reader = HttpMessageFactory.createReaderFromMultimap(headerMap(req), body);
// It's important not to set the context ClassLoader earlier, because MessageUtils will use
// ServiceLoader.load(EventFormat.class) to find a handler to deserialize a binary CloudEvent
Expand All @@ -369,17 +387,17 @@ private <CloudEventT> void serviceCloudEvent(HttpServletRequest req) throws Exce
// https://github.com/cloudevents/sdk-java/pull/259.
}

private static Map<String, List<String>> headerMap(HttpServletRequest req) {
private static Map<String, List<String>> headerMap(Request req) {
Map<String, List<String>> headerMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
for (String header : Collections.list(req.getHeaderNames())) {
for (String value : Collections.list(req.getHeaders(header))) {
headerMap.computeIfAbsent(header, unused -> new ArrayList<>()).add(value);
}
for (HttpField field : req.getHeaders()) {
headerMap
.computeIfAbsent(field.getName(), unused -> new ArrayList<>())
.addAll(field.getValueList());
}
return headerMap;
}

private void serviceLegacyEvent(HttpServletRequest req) throws Exception {
private void serviceLegacyEvent(Request req) throws Exception {
Event event = parseLegacyEvent(req);
runWithContextClassLoader(() -> functionExecutor.serviceLegacyEvent(event));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
import com.google.cloud.functions.invoker.http.HttpResponseImpl;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.Callback;

/** Executes the user's method. */
public class HttpFunctionExecutor extends HttpServlet {
public class HttpFunctionExecutor extends Handler.Abstract {
private static final Logger logger = Logger.getLogger("com.google.cloud.functions.invoker");

private final HttpFunction function;
Expand Down Expand Up @@ -65,21 +67,23 @@ public static HttpFunctionExecutor forClass(Class<?> functionClass) {

/** Executes the user's method, can handle all HTTP type methods. */
@Override
public void service(HttpServletRequest req, HttpServletResponse res) {
HttpRequestImpl reqImpl = new HttpRequestImpl(req);
HttpResponseImpl respImpl = new HttpResponseImpl(res);
public boolean handle(Request request, Response response, Callback callback) throws Exception {

HttpRequestImpl reqImpl = new HttpRequestImpl(request);
HttpResponseImpl respImpl = new HttpResponseImpl(response);
ClassLoader oldContextLoader = Thread.currentThread().getContextClassLoader();
try {
executionIdUtil.storeExecutionId(req);
executionIdUtil.storeExecutionId(request);
Thread.currentThread().setContextClassLoader(function.getClass().getClassLoader());
function.service(reqImpl, respImpl);
respImpl.close(callback);
} catch (Throwable t) {
logger.log(Level.SEVERE, "Failed to execute " + function.getClass().getName(), t);
res.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
Response.writeError(request, response, callback, HttpStatus.INTERNAL_SERVER_ERROR_500, null);
} finally {
Thread.currentThread().setContextClassLoader(oldContextLoader);
executionIdUtil.removeExecutionId();
respImpl.flush();
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.Callback;

public class TypedFunctionExecutor extends HttpServlet {
public class TypedFunctionExecutor extends Handler.Abstract {
private static final String APPLY_METHOD = "apply";
private static final Logger logger = Logger.getLogger("com.google.cloud.functions.invoker");

Expand Down Expand Up @@ -94,18 +96,21 @@ static Optional<Type> handlerTypeArgument(Class<? extends TypedFunction<?, ?>> f

/** Executes the user's method, can handle all HTTP type methods. */
@Override
public void service(HttpServletRequest req, HttpServletResponse res) {
public boolean handle(Request req, Response res, Callback callback) throws Exception {
HttpRequestImpl reqImpl = new HttpRequestImpl(req);
HttpResponseImpl resImpl = new HttpResponseImpl(res);
ClassLoader oldContextClassLoader = Thread.currentThread().getContextClassLoader();

try {
Thread.currentThread().setContextClassLoader(function.getClass().getClassLoader());
handleRequest(reqImpl, resImpl);
resImpl.close(callback);
} catch (Throwable t) {
Response.writeError(req, res, callback, HttpStatus.INTERNAL_SERVER_ERROR_500, null, t);
} finally {
Thread.currentThread().setContextClassLoader(oldContextClassLoader);
resImpl.flush();
}
return true;
}

private void handleRequest(HttpRequest req, HttpResponse res) {
Expand All @@ -114,7 +119,7 @@ private void handleRequest(HttpRequest req, HttpResponse res) {
reqObj = format.deserialize(req, argType);
} catch (Throwable t) {
logger.log(Level.SEVERE, "Failed to parse request for " + function.getClass().getName(), t);
res.setStatusCode(HttpServletResponse.SC_BAD_REQUEST);
res.setStatusCode(HttpStatus.BAD_REQUEST_400);
return;
}

Expand All @@ -123,7 +128,7 @@ private void handleRequest(HttpRequest req, HttpResponse res) {
resObj = function.apply(reqObj);
} catch (Throwable t) {
logger.log(Level.SEVERE, "Failed to execute " + function.getClass().getName(), t);
res.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
res.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR_500);
return;
}

Expand All @@ -132,7 +137,7 @@ private void handleRequest(HttpRequest req, HttpResponse res) {
} catch (Throwable t) {
logger.log(
Level.SEVERE, "Failed to serialize response for " + function.getClass().getName(), t);
res.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
res.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR_500);
return;
}
}
Expand All @@ -147,7 +152,7 @@ private static class GsonWireFormat implements TypedFunction.WireFormat {
@Override
public void serialize(Object object, HttpResponse response) throws Exception {
if (object == null) {
response.setStatusCode(HttpServletResponse.SC_NO_CONTENT);
response.setStatusCode(HttpStatus.NO_CONTENT_204);
return;
}
try (BufferedWriter bodyWriter = response.getWriter()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.logging.Handler;
import java.util.logging.Logger;
import javax.servlet.http.HttpServletRequest;
import org.eclipse.jetty.server.Request;

/**
* A helper class that either fetches a unique execution id from request HTTP headers or generates a
Expand All @@ -23,7 +23,7 @@ public final class ExecutionIdUtil {
* Add mapping to root logger from current thread id to execution id. This mapping will be used to
* append the execution id to log lines.
*/
public void storeExecutionId(HttpServletRequest request) {
public void storeExecutionId(Request request) {
if (!executionIdLoggingEnabled()) {
return;
}
Expand All @@ -47,8 +47,8 @@ public void removeExecutionId() {
}
}

private String getOrGenerateExecutionId(HttpServletRequest request) {
String executionId = request.getHeader(EXECUTION_ID_HTTP_HEADER);
private String getOrGenerateExecutionId(Request request) {
String executionId = request.getHeaders().get(EXECUTION_ID_HTTP_HEADER);
if (executionId == null) {
byte[] array = new byte[EXECUTION_ID_LENGTH];
random.nextBytes(array);
Expand Down
Loading
Loading