diff --git a/.gitignore b/.gitignore
index dfc6122dc..45a7fffc4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,7 @@
target/
.idea/
+.idx/
+.vscode/
*.iml
__pycache__/
.DS_Store
diff --git a/.kokoro/nightly/integration-named-db.cfg b/.kokoro/nightly/integration-named-db.cfg
index 7d0afbaed..1d3f31f63 100644
--- a/.kokoro/nightly/integration-named-db.cfg
+++ b/.kokoro/nightly/integration-named-db.cfg
@@ -13,7 +13,7 @@ env_vars: {
env_vars: {
key: "INTEGRATION_TEST_ARGS"
- value: "-DFIRESTORE_NAMED_DATABASE=test-db"
+ value: "-DFIRESTORE_NAMED_DATABASE=test-db -DFIRESTORE_EDITION=standard"
}
# TODO: remove this after we've migrated all tests and scripts
diff --git a/.kokoro/nightly/integration.cfg b/.kokoro/nightly/integration.cfg
index f60aebf4b..5d2dffad2 100644
--- a/.kokoro/nightly/integration.cfg
+++ b/.kokoro/nightly/integration.cfg
@@ -10,6 +10,12 @@ env_vars: {
key: "JOB_TYPE"
value: "integration"
}
+
+env_vars: {
+ key: "INTEGRATION_TEST_ARGS"
+ value: "-DFIRESTORE_EDITION=standard"
+}
+
# TODO: remove this after we've migrated all tests and scripts
env_vars: {
key: "GCLOUD_PROJECT"
diff --git a/.kokoro/presubmit/graalvm-a.cfg b/.kokoro/presubmit/graalvm-a.cfg
index 35e57f557..11c421440 100644
--- a/.kokoro/presubmit/graalvm-a.cfg
+++ b/.kokoro/presubmit/graalvm-a.cfg
@@ -11,6 +11,11 @@ env_vars: {
value: "graalvm"
}
+env_vars: {
+ key: "INTEGRATION_TEST_ARGS"
+ value: "-DFIRESTORE_EDITION=standard"
+}
+
# TODO: remove this after we've migrated all tests and scripts
env_vars: {
key: "GCLOUD_PROJECT"
@@ -35,4 +40,4 @@ env_vars: {
env_vars: {
key: "IT_SERVICE_ACCOUNT_EMAIL"
value: "it-service-account@gcloud-devel.iam.gserviceaccount.com"
-}
\ No newline at end of file
+}
diff --git a/.kokoro/presubmit/graalvm-b.cfg b/.kokoro/presubmit/graalvm-b.cfg
index 24accde47..2fb516651 100644
--- a/.kokoro/presubmit/graalvm-b.cfg
+++ b/.kokoro/presubmit/graalvm-b.cfg
@@ -11,6 +11,11 @@ env_vars: {
value: "graalvm"
}
+env_vars: {
+ key: "INTEGRATION_TEST_ARGS"
+ value: "-DFIRESTORE_EDITION=standard"
+}
+
# TODO: remove this after we've migrated all tests and scripts
env_vars: {
key: "GCLOUD_PROJECT"
@@ -35,4 +40,4 @@ env_vars: {
env_vars: {
key: "IT_SERVICE_ACCOUNT_EMAIL"
value: "it-service-account@gcloud-devel.iam.gserviceaccount.com"
-}
\ No newline at end of file
+}
diff --git a/.kokoro/presubmit/graalvm-c.cfg b/.kokoro/presubmit/graalvm-c.cfg
index 01407e173..1554918af 100644
--- a/.kokoro/presubmit/graalvm-c.cfg
+++ b/.kokoro/presubmit/graalvm-c.cfg
@@ -11,6 +11,11 @@ env_vars: {
value: "graalvm"
}
+env_vars: {
+ key: "INTEGRATION_TEST_ARGS"
+ value: "-DFIRESTORE_EDITION=standard"
+}
+
# TODO: remove this after we've migrated all tests and scripts
env_vars: {
key: "GCLOUD_PROJECT"
@@ -35,4 +40,4 @@ env_vars: {
env_vars: {
key: "IT_SERVICE_ACCOUNT_EMAIL"
value: "it-service-account@gcloud-devel.iam.gserviceaccount.com"
-}
\ No newline at end of file
+}
diff --git a/.kokoro/presubmit/graalvm-native-a.cfg b/.kokoro/presubmit/graalvm-native-a.cfg
index d2cba4c76..1e6e21370 100644
--- a/.kokoro/presubmit/graalvm-native-a.cfg
+++ b/.kokoro/presubmit/graalvm-native-a.cfg
@@ -11,6 +11,11 @@ env_vars: {
value: "graalvm"
}
+env_vars: {
+ key: "INTEGRATION_TEST_ARGS"
+ value: "-DFIRESTORE_EDITION=standard"
+}
+
env_vars: {
key: "GOOGLE_CLOUD_PROJECT"
value: "java-review"
@@ -24,4 +29,4 @@ env_vars: {
env_vars: {
key: "SECRET_MANAGER_KEYS"
value: "java-review_firestore-java-it"
-}
\ No newline at end of file
+}
diff --git a/.kokoro/presubmit/graalvm-native-b.cfg b/.kokoro/presubmit/graalvm-native-b.cfg
index eb0fc34c6..1f79b0698 100644
--- a/.kokoro/presubmit/graalvm-native-b.cfg
+++ b/.kokoro/presubmit/graalvm-native-b.cfg
@@ -11,6 +11,11 @@ env_vars: {
value: "graalvm"
}
+env_vars: {
+ key: "INTEGRATION_TEST_ARGS"
+ value: "-DFIRESTORE_EDITION=standard"
+}
+
env_vars: {
key: "GOOGLE_CLOUD_PROJECT"
value: "java-review"
@@ -24,4 +29,4 @@ env_vars: {
env_vars: {
key: "SECRET_MANAGER_KEYS"
value: "java-review_firestore-java-it"
-}
\ No newline at end of file
+}
diff --git a/.kokoro/presubmit/graalvm-native-c.cfg b/.kokoro/presubmit/graalvm-native-c.cfg
index a48dfa802..5dba283ef 100644
--- a/.kokoro/presubmit/graalvm-native-c.cfg
+++ b/.kokoro/presubmit/graalvm-native-c.cfg
@@ -11,6 +11,11 @@ env_vars: {
value: "graalvm"
}
+env_vars: {
+ key: "INTEGRATION_TEST_ARGS"
+ value: "-DFIRESTORE_EDITION=standard"
+}
+
env_vars: {
key: "GOOGLE_CLOUD_PROJECT"
value: "java-review"
@@ -24,4 +29,4 @@ env_vars: {
env_vars: {
key: "SECRET_MANAGER_KEYS"
value: "java-review_firestore-java-it"
-}
\ No newline at end of file
+}
diff --git a/.kokoro/presubmit/integration-named-db.cfg b/.kokoro/presubmit/integration-named-db.cfg
index b66392e83..81bb77849 100644
--- a/.kokoro/presubmit/integration-named-db.cfg
+++ b/.kokoro/presubmit/integration-named-db.cfg
@@ -13,7 +13,7 @@ env_vars: {
env_vars: {
key: "INTEGRATION_TEST_ARGS"
- value: "-DFIRESTORE_NAMED_DATABASE=test-db"
+ value: "-DFIRESTORE_NAMED_DATABASE=test-db -DFIRESTORE_EDITION=standard"
}
env_vars: {
diff --git a/.kokoro/presubmit/integration.cfg b/.kokoro/presubmit/integration.cfg
index 40ef5968d..accd9db7e 100644
--- a/.kokoro/presubmit/integration.cfg
+++ b/.kokoro/presubmit/integration.cfg
@@ -11,6 +11,11 @@ env_vars: {
value: "integration"
}
+env_vars: {
+ key: "INTEGRATION_TEST_ARGS"
+ value: "-DFIRESTORE_EDITION=standard"
+}
+
env_vars: {
key: "GCLOUD_PROJECT"
value: "java-review"
diff --git a/README.md b/README.md
index 976e6e3ee..485c87f7d 100644
--- a/README.md
+++ b/README.md
@@ -106,6 +106,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/java-firestore/tre
| Sample | Source Code | Try it |
| --------------------------- | --------------------------------- | ------ |
+| Pipeline Snippets | [source code](https://github.com/googleapis/java-firestore/blob/main/samples/preview-snippets/src/main/java/com/example/firestore/PipelineSnippets.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-firestore&page=editor&open_in_editor=samples/preview-snippets/src/main/java/com/example/firestore/PipelineSnippets.java) |
| Quickstart | [source code](https://github.com/googleapis/java-firestore/blob/main/samples/snippets/src/main/java/com/example/firestore/Quickstart.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-firestore&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/firestore/Quickstart.java) |
| Example Firestore Beam Read | [source code](https://github.com/googleapis/java-firestore/blob/main/samples/snippets/src/main/java/com/example/firestore/beam/ExampleFirestoreBeamRead.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-firestore&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/firestore/beam/ExampleFirestoreBeamRead.java) |
| Example Firestore Beam Write | [source code](https://github.com/googleapis/java-firestore/blob/main/samples/snippets/src/main/java/com/example/firestore/beam/ExampleFirestoreBeamWrite.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-firestore&page=editor&open_in_editor=samples/snippets/src/main/java/com/example/firestore/beam/ExampleFirestoreBeamWrite.java) |
diff --git a/google-cloud-firestore/clirr-ignored-differences.xml b/google-cloud-firestore/clirr-ignored-differences.xml
index a1064b15c..6685f0d8a 100644
--- a/google-cloud-firestore/clirr-ignored-differences.xml
+++ b/google-cloud-firestore/clirr-ignored-differences.xml
@@ -346,4 +346,26 @@
void internalStream(*)*
+
+
+
+ 7012
+ com/google/cloud/firestore/Firestore
+ com.google.cloud.firestore.PipelineSource pipeline()
+
+
+ 7013
+ com/google/cloud/firestore/Transaction
+ com.google.api.core.ApiFuture execute(com.google.cloud.firestore.Pipeline)
+
+
+ 7013
+ com/google/cloud/firestore/Transaction
+ com.google.api.core.ApiFuture execute(com.google.cloud.firestore.Pipeline, com.google.cloud.firestore.pipeline.stages.PipelineExecuteOptions)
+
+
+ 7012
+ com/google/cloud/firestore/spi/v1/FirestoreRpc
+ com.google.api.gax.rpc.ServerStreamingCallable executePipelineCallable()
+
diff --git a/google-cloud-firestore/pom.xml b/google-cloud-firestore/pom.xml
index e2f86c6a7..f00aab94e 100644
--- a/google-cloud-firestore/pom.xml
+++ b/google-cloud-firestore/pom.xml
@@ -306,6 +306,51 @@
+
+ maven-assembly-plugin
+
+
+ jar-with-dependencies
+
+
+
+
+ make-assembly
+ package
+
+ single
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+
+ default-compile
+ none
+
+
+ default-testCompile
+ none
+
+
+ compile
+ compile
+
+ compile
+
+
+
+ testCompile
+ test-compile
+
+ testCompile
+
+
+
+
diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/AggregateQuery.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/AggregateQuery.java
index 89702e423..830d3bc61 100644
--- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/AggregateQuery.java
+++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/AggregateQuery.java
@@ -16,6 +16,7 @@
package com.google.cloud.firestore;
+import static com.google.cloud.firestore.pipeline.expressions.Expression.and;
import static com.google.cloud.firestore.telemetry.TelemetryConstants.METHOD_NAME_RUN_AGGREGATION_QUERY;
import static com.google.cloud.firestore.telemetry.TraceUtil.ATTRIBUTE_KEY_ATTEMPT;
@@ -27,6 +28,8 @@
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.StreamController;
import com.google.cloud.Timestamp;
+import com.google.cloud.firestore.pipeline.expressions.AliasedAggregate;
+import com.google.cloud.firestore.pipeline.expressions.BooleanExpression;
import com.google.cloud.firestore.telemetry.MetricsUtil.MetricsContext;
import com.google.cloud.firestore.telemetry.TelemetryConstants;
import com.google.cloud.firestore.telemetry.TelemetryConstants.MetricType;
@@ -50,6 +53,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -84,6 +88,30 @@ public Query getQuery() {
return query;
}
+ Pipeline pipeline() {
+ Pipeline pipeline = getQuery().pipeline();
+
+ List existsExprs =
+ this.aggregateFieldList.stream()
+ .map(PipelineUtils::toPipelineExistsExpr)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
+ if (existsExprs.size() == 1) {
+ pipeline = pipeline.where(existsExprs.get(0));
+ } else if (existsExprs.size() > 1) {
+ pipeline =
+ pipeline.where(
+ and(
+ existsExprs.get(0),
+ existsExprs.subList(1, existsExprs.size()).toArray(new BooleanExpression[0])));
+ }
+
+ return pipeline.aggregate(
+ this.aggregateFieldList.stream()
+ .map(PipelineUtils::toPipelineAggregatorTarget)
+ .toArray(AliasedAggregate[]::new));
+ }
+
/**
* Executes this query.
*
diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/AggregateQuerySnapshot.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/AggregateQuerySnapshot.java
index ff7b99906..be98e61d2 100644
--- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/AggregateQuerySnapshot.java
+++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/AggregateQuerySnapshot.java
@@ -16,6 +16,7 @@
package com.google.cloud.firestore;
+import com.google.api.core.InternalApi;
import com.google.api.core.InternalExtensionOnly;
import com.google.cloud.Timestamp;
import com.google.firestore.v1.Value;
@@ -189,4 +190,9 @@ public boolean equals(Object object) {
public int hashCode() {
return Objects.hash(query, data);
}
+
+ @InternalApi
+ Map getData() {
+ return data;
+ }
}
diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/ExplainStats.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/ExplainStats.java
new file mode 100644
index 000000000..9d255ac73
--- /dev/null
+++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/ExplainStats.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.firestore;
+
+import com.google.api.core.BetaApi;
+import com.google.protobuf.Any;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.StringValue;
+import javax.annotation.Nonnull;
+
+/**
+ * A wrapper object to access explain stats if explain or analyze was enabled for the Pipeline query
+ * execution.
+ */
+@BetaApi
+public final class ExplainStats {
+
+ private final Any explainStatsData;
+
+ /**
+ * @hideconstructor
+ * @param explainStatsData The raw proto message of the explain stats.
+ */
+ ExplainStats(@Nonnull Any explainStatsData) {
+ this.explainStatsData = explainStatsData;
+ }
+
+ /**
+ * Returns the explain stats in an encoded proto format, as returned from the Firestore backend.
+ * The caller is responsible for unpacking this proto message.
+ */
+ @Nonnull
+ public Any getRawData() {
+ return explainStatsData;
+ }
+
+ private StringValue decode() {
+ try {
+ return explainStatsData.unpack(StringValue.class);
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(
+ "Unable to decode explain stats. Did you request an output format that returns a string"
+ + " value, such as 'text' or 'json'?",
+ e);
+ }
+ }
+
+ /**
+ * When explain stats were requested with `outputFormat = 'text'`, this returns the explain stats
+ * string verbatim as returned from the Firestore backend.
+ *
+ *
If explain stats were requested with `outputFormat = 'json'`, this returns the explain stats
+ * as stringified JSON, which was returned from the Firestore backend.
+ */
+ @Nonnull
+ public String getText() {
+ return decode().getValue();
+ }
+}
diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FieldPath.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FieldPath.java
index e6939c445..c5b9a8173 100644
--- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FieldPath.java
+++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FieldPath.java
@@ -16,6 +16,7 @@
package com.google.cloud.firestore;
+import com.google.api.core.InternalApi;
import com.google.auto.value.AutoValue;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@@ -79,7 +80,8 @@ static boolean isDocumentId(String path) {
}
/** Returns a field path from a dot separated string. Does not support escaping. */
- static FieldPath fromDotSeparatedString(String field) {
+ @InternalApi
+ public static FieldPath fromDotSeparatedString(String field) {
if (PROHIBITED_CHARACTERS.matcher(field).matches()) {
throw new IllegalArgumentException("Use FieldPath.of() for field names containing '˜*/[]'.");
}
diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Firestore.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Firestore.java
index 5bbb1164a..a5a4caad5 100644
--- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Firestore.java
+++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Firestore.java
@@ -66,6 +66,37 @@ public interface Firestore extends Service, AutoCloseable {
*/
CollectionGroup collectionGroup(@Nonnull String collectionId);
+ /**
+ * Creates a new {@link PipelineSource} to build and execute a data pipeline.
+ *
+ *
A pipeline is composed of a sequence of stages. Each stage processes the output from the
+ * previous one, and the final stage's output is the result of the pipeline's execution.
+ *
+ *
Note on Execution: The stages are conceptual. The Firestore backend may optimize
+ * execution (e.g., reordering or merging stages) as long as the final result remains the same.
+ *
+ *
Important Limitations:
+ *
+ *
+ *
Pipelines operate on a request/response basis only.
+ *
They do not utilize or update the local SDK cache.
+ *
They do not support realtime snapshot listeners.
+ *
+ *
+ * @return A {@code PipelineSource} to begin defining the pipeline's stages.
+ */
+ PipelineSource pipeline();
+
/**
* Executes the given updateFunction and then attempts to commit the changes applied within the
* transaction. If any document read within the transaction has changed, the updateFunction will
diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java
index 4d532b459..ce0fd6736 100644
--- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java
+++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java
@@ -21,6 +21,7 @@
import com.google.api.core.ApiClock;
import com.google.api.core.ApiFuture;
+import com.google.api.core.BetaApi;
import com.google.api.core.NanoClock;
import com.google.api.core.ObsoleteApi;
import com.google.api.core.SettableApiFuture;
@@ -416,6 +417,13 @@ public CollectionGroup collectionGroup(@Nonnull final String collectionId) {
return new CollectionGroup(this, collectionId);
}
+ @Nonnull
+ @Override
+ @BetaApi
+ public PipelineSource pipeline() {
+ return new PipelineSource(this);
+ }
+
@Nonnull
@Override
public ApiFuture runTransaction(@Nonnull final Transaction.Function updateFunction) {
diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Pipeline.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Pipeline.java
new file mode 100644
index 000000000..8bb00b7b4
--- /dev/null
+++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Pipeline.java
@@ -0,0 +1,1387 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.firestore;
+
+import static com.google.cloud.firestore.pipeline.expressions.Expression.field;
+import static com.google.cloud.firestore.telemetry.TraceUtil.ATTRIBUTE_KEY_DOC_COUNT;
+import static com.google.cloud.firestore.telemetry.TraceUtil.ATTRIBUTE_KEY_IS_TRANSACTIONAL;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.BetaApi;
+import com.google.api.core.InternalApi;
+import com.google.api.core.InternalExtensionOnly;
+import com.google.api.core.SettableApiFuture;
+import com.google.api.gax.rpc.ApiStreamObserver;
+import com.google.api.gax.rpc.ResponseObserver;
+import com.google.api.gax.rpc.StreamController;
+import com.google.cloud.Timestamp;
+import com.google.cloud.firestore.pipeline.expressions.AggregateFunction;
+import com.google.cloud.firestore.pipeline.expressions.AliasedAggregate;
+import com.google.cloud.firestore.pipeline.expressions.AliasedExpression;
+import com.google.cloud.firestore.pipeline.expressions.BooleanExpression;
+import com.google.cloud.firestore.pipeline.expressions.Expression;
+import com.google.cloud.firestore.pipeline.expressions.Field;
+import com.google.cloud.firestore.pipeline.expressions.FunctionExpression;
+import com.google.cloud.firestore.pipeline.expressions.Ordering;
+import com.google.cloud.firestore.pipeline.expressions.Selectable;
+import com.google.cloud.firestore.pipeline.stages.AddFields;
+import com.google.cloud.firestore.pipeline.stages.Aggregate;
+import com.google.cloud.firestore.pipeline.stages.AggregateOptions;
+import com.google.cloud.firestore.pipeline.stages.Distinct;
+import com.google.cloud.firestore.pipeline.stages.FindNearest;
+import com.google.cloud.firestore.pipeline.stages.FindNearestOptions;
+import com.google.cloud.firestore.pipeline.stages.Limit;
+import com.google.cloud.firestore.pipeline.stages.Offset;
+import com.google.cloud.firestore.pipeline.stages.PipelineExecuteOptions;
+import com.google.cloud.firestore.pipeline.stages.RawStage;
+import com.google.cloud.firestore.pipeline.stages.RemoveFields;
+import com.google.cloud.firestore.pipeline.stages.ReplaceWith;
+import com.google.cloud.firestore.pipeline.stages.Sample;
+import com.google.cloud.firestore.pipeline.stages.Select;
+import com.google.cloud.firestore.pipeline.stages.Sort;
+import com.google.cloud.firestore.pipeline.stages.Stage;
+import com.google.cloud.firestore.pipeline.stages.StageUtils;
+import com.google.cloud.firestore.pipeline.stages.Union;
+import com.google.cloud.firestore.pipeline.stages.Unnest;
+import com.google.cloud.firestore.pipeline.stages.UnnestOptions;
+import com.google.cloud.firestore.pipeline.stages.Where;
+import com.google.cloud.firestore.telemetry.MetricsUtil.MetricsContext;
+import com.google.cloud.firestore.telemetry.TelemetryConstants;
+import com.google.cloud.firestore.telemetry.TelemetryConstants.MetricType;
+import com.google.cloud.firestore.telemetry.TraceUtil;
+import com.google.cloud.firestore.telemetry.TraceUtil.Scope;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.firestore.v1.Document;
+import com.google.firestore.v1.ExecutePipelineRequest;
+import com.google.firestore.v1.ExecutePipelineResponse;
+import com.google.firestore.v1.StructuredPipeline;
+import com.google.firestore.v1.Value;
+import com.google.protobuf.ByteString;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+/**
+ * The Pipeline class provides a flexible and expressive framework for building complex data
+ * transformation and query pipelines for Firestore.
+ *
+ *
A pipeline takes data sources, such as Firestore collections or collection groups, and applies
+ * a series of stages that are chained together. Each stage takes the output from the previous stage
+ * (or the data source) and produces an output for the next stage (or as the final output of the
+ * pipeline).
+ *
+ *
Expressions from {@link com.google.cloud.firestore.pipeline.expressions} can be used within
+ * each stages to filter and transform data through the stage.
+ *
+ *
NOTE: The chained stages do not prescribe exactly how Firestore will execute the pipeline.
+ * Instead, Firestore only guarantees that the result is the same as if the chained stages were
+ * executed in order.
+ *
+ *
Usage Examples:
+ *
+ *
{@code
+ * Firestore firestore; // A valid firestore instance.
+ *
+ * // Example 1: Select specific fields and rename 'rating' to 'bookRating'
+ * Snapshot results1 = firestore.pipeline()
+ * .collection("books")
+ * .select(field("title"), field("author"), field("rating").as("bookRating"))
+ * .execute()
+ * .get();
+ *
+ * // Example 2: Filter documents where 'genre' is "Science Fiction" and 'published' is after 1950
+ * Snapshot results2 = firestore.pipeline()
+ * .collection("books")
+ * .where(and(eq("genre", "Science Fiction"), gt("published", 1950)))
+ * .execute()
+ * .get();
+ * // Same as above but using methods on expressions as opposed to static functions.
+ * results2 = firestore.pipeline()
+ * .collection("books")
+ * .where(and(field("genre").eq("Science Fiction"), field("published").gt(1950)))
+ * .execute()
+ * .get();
+ *
+ * // Example 3: Calculate the average rating of books published after 1980
+ * Snapshot results3 = firestore.pipeline()
+ * .collection("books")
+ * .where(gt("published", 1980))
+ * .aggregate(avg("rating").as("averageRating"))
+ * .execute()
+ * .get();
+ * }
+ */
+@BetaApi
+public final class Pipeline {
+ /**
+ * A Snapshot contains the results of a pipeline execution. It can be used to access the
+ * documents, execution time, and explain stats.
+ */
+ @BetaApi
+ public static final class Snapshot {
+
+ private final Pipeline pipeline;
+ private final Timestamp executionTime;
+ private final List results;
+ private final ExplainStats explainStats;
+
+ Snapshot(
+ @Nonnull Pipeline pipeline,
+ @Nonnull List results,
+ @Nonnull Timestamp executionTime,
+ @Nullable ExplainStats explainStats) {
+ this.pipeline = pipeline;
+ this.results = results;
+ this.executionTime = executionTime;
+ this.explainStats = explainStats;
+ }
+
+ /**
+ * The Pipeline on which you called `execute()` in order to get this `Snapshot`.
+ *
+ * @return The pipeline that was executed.
+ */
+ @Nonnull
+ Pipeline getPipeline() {
+ return pipeline;
+ }
+
+ /**
+ * An array of all the results in the `Snapshot`.
+ *
+ * @return The list of results.
+ */
+ @Nonnull
+ public List getResults() {
+ return results;
+ }
+
+ /**
+ * The time at which the pipeline producing this result is executed.
+ *
+ * @return The execution time of the pipeline.
+ */
+ @Nonnull
+ public Timestamp getExecutionTime() {
+ return executionTime;
+ }
+
+ /**
+ * Return stats from query explain.
+ *
+ *
If `explainOptions.mode` was set to `execute` or left unset, then this returns `null`.
+ *
+ * @return The explain stats, or `null` if not available.
+ */
+ @Nullable
+ public ExplainStats getExplainStats() {
+ return explainStats;
+ }
+ }
+
+ private static Logger logger = Logger.getLogger(Pipeline.class.getName());
+ private final FluentIterable stages;
+ private final FirestoreRpcContext> rpcContext;
+
+ private Pipeline(FirestoreRpcContext> rpcContext, FluentIterable stages) {
+ this.rpcContext = rpcContext;
+ this.stages = stages;
+ }
+
+ @InternalApi
+ Pipeline(FirestoreRpcContext> rpcContext, Stage stage) {
+ this(rpcContext, FluentIterable.of(stage));
+ }
+
+ private Pipeline append(Stage stage) {
+ return new Pipeline(this.rpcContext, stages.append(stage));
+ }
+
+ /**
+ * Adds new fields to outputs from previous stages.
+ *
+ *
This stage allows you to compute values on-the-fly based on existing data from previous
+ * stages or constants. You can use this to create new fields or overwrite existing ones (if there
+ * is name overlaps).
+ *
+ *
The added fields are defined using {@link Selectable} expressions, which can be:
+ *
+ *
+ *
{@link Field}: References an existing document field.
+ *
{@link FunctionExpression}: Performs a calculation using functions like `add`, `multiply`
+ * with assigned aliases using {@link Expression#as(String)}.
+ *
+ *
+ * @param fields The fields to add to the documents, specified as {@link Selectable} expressions.
+ * @return A new Pipeline object with this stage appended to the stage list.
+ */
+ @BetaApi
+ public Pipeline addFields(Selectable... fields) {
+ return append(new AddFields(PipelineUtils.selectablesToMap(fields)));
+ }
+
+ /**
+ * Remove fields from outputs of previous stages.
+ *
+ *
+ *
+ * @param fields The fields to remove.
+ * @return A new Pipeline object with this stage appended to the stage list.
+ */
+ @BetaApi
+ public Pipeline removeFields(Field... fields) {
+ return append(
+ new RemoveFields(
+ ImmutableList.builder().addAll(Arrays.stream(fields).iterator()).build()));
+ }
+
+ /**
+ * Selects or creates a set of fields from the outputs of previous stages.
+ *
+ *
The selected fields are defined using {@link Selectable} expressions, which can be:
+ *
+ *
+ *
{@link Field}: References an existing document field.
+ *
{@link FunctionExpression}: Represents the result of a function with an assigned alias
+ * name using {@link Expression#as(String)}
+ *
+ *
+ *
If no selections are provided, the output of this stage is empty. Use {@link
+ * com.google.cloud.firestore.Pipeline#addFields(Selectable...)} instead if only additions are
+ * desired.
+ *
+ *
+ *
+ * @param selections The fields to include in the output documents, specified as {@link
+ * Selectable} expressions.
+ * @return A new Pipeline object with this stage appended to the stage list.
+ */
+ @BetaApi
+ public Pipeline select(Selectable... selections) {
+ return append(new Select(PipelineUtils.selectablesToMap(selections)));
+ }
+
+ /**
+ * Selects a set of fields from the outputs of previous stages.
+ *
+ *
If no selections are provided, the output of this stage is empty. Use {@link
+ * com.google.cloud.firestore.Pipeline#addFields(Selectable...)} instead if only additions are
+ * desired.
+ *
+ *
Example:
+ *
+ *
{@code
+ * firestore.collection("books")
+ * .select("name", "address");
+ *
+ * // The above is a shorthand of this:
+ * firestore.pipeline().collection("books")
+ * .select(field("name"), field("address"));
+ * }
+ *
+ * @param fields The name of the fields to include in the output documents.
+ * @return A new Pipeline object with this stage appended to the stage list.
+ */
+ @BetaApi
+ public Pipeline select(String... fields) {
+ return append(new Select(PipelineUtils.fieldNamesToMap(fields)));
+ }
+
+ /**
+ * Filters the documents from previous stages to only include those matching the specified {@link
+ * BooleanExpression}.
+ *
+ *
This stage allows you to apply conditions to the data, similar to a "WHERE" clause in SQL.
+ * You can filter documents based on their field values, using implementions of {@link
+ * BooleanExpression}, typically including but not limited to:
+ *
+ *
+ *
field comparators: {@link FunctionExpression#equal}, {@link FunctionExpression#lessThan}
+ * (less than), {@link FunctionExpression#greaterThan} (greater than), etc.
+ *
+ *
+ * @param condition The {@link BooleanExpression} to apply.
+ * @return A new Pipeline object with this stage appended to the stage list.
+ */
+ @BetaApi
+ public Pipeline where(BooleanExpression condition) {
+ return append(new Where(condition));
+ }
+
+ /**
+ * Skips the first `offset` number of documents from the results of previous stages.
+ *
+ *
This stage is useful for implementing pagination in your pipelines, allowing you to retrieve
+ * results in chunks. It is typically used in conjunction with {@link #limit(int)} to control the
+ * size of each page.
+ *
+ *
Example:
+ *
+ *
{@code
+ * // Retrieve the second page of 20 results
+ * firestore.pipeline().collection("books")
+ * .sort(field("published").descending())
+ * .offset(20) // Skip the first 20 results
+ * .limit(20); // Take the next 20 results
+ * }
+ *
+ * @param offset The number of documents to skip.
+ * @return A new Pipeline object with this stage appended to the stage list.
+ */
+ @BetaApi
+ public Pipeline offset(int offset) {
+ return append(new Offset(offset));
+ }
+
+ /**
+ * Limits the maximum number of documents returned by previous stages to `limit`.
+ *
+ *
This stage is particularly useful when you want to retrieve a controlled subset of data from
+ * a potentially large result set. It's often used for:
+ *
+ *
+ *
**Pagination:** In combination with {@link #offset(int)} to retrieve specific pages of
+ * results.
+ *
**Limiting Data Retrieval:** To prevent excessive data transfer and improve performance,
+ * especially when dealing with large collections.
+ *
+ *
+ *
Example:
+ *
+ *
{@code
+ * // Limit the results to the top 10 highest-rated books
+ * firestore.pipeline().collection("books")
+ * .sort(field("rating").descending())
+ * .limit(10);
+ * }
+ *
+ * @param limit The maximum number of documents to return.
+ * @return A new Pipeline object with this stage appended to the stage list.
+ */
+ @BetaApi
+ public Pipeline limit(int limit) {
+ return append(new Limit(limit));
+ }
+
+ /**
+ * Performs aggregation operations on the documents from previous stages.
+ *
+ *
This stage allows you to calculate aggregate values over a set of documents. You define the
+ * aggregations to perform using {@link AliasedExpression} expressions which are typically results
+ * of calling {@link Expression#as(String)} on {@link AggregateFunction} instances.
+ *
+ *
Example:
+ *
+ *
{@code
+ * // Calculate the average rating and the total number of books
+ * firestore.pipeline().collection("books")
+ * .aggregate(
+ * field("rating").avg().as("averageRating"),
+ * countAll().as("totalBooks")
+ * );
+ * }
+ *
+ * @param accumulators The {@link AliasedExpression} expressions, each wrapping an {@link
+ * AggregateFunction} and provide a name for the accumulated results.
+ * @return A new Pipeline object with this stage appended to the stage list.
+ */
+ @BetaApi
+ public Pipeline aggregate(AliasedAggregate... accumulators) {
+ return append(Aggregate.withAccumulators(accumulators));
+ }
+
+ /**
+ * Performs optionally grouped aggregation operations on the documents from previous stages.
+ *
+ *
This stage allows you to calculate aggregate values over a set of documents, optionally
+ * grouped by one or more fields or functions. You can specify:
+ *
+ *
+ *
**Grouping Fields or Functions:** One or more fields or functions to group the documents
+ * by. For each distinct combination of values in these fields, a separate group is created.
+ * If no grouping fields are provided, a single group containing all documents is used. Not
+ * specifying groups is the same as putting the entire inputs into one group.
+ *
**Accumulators:** One or more accumulation operations to perform within each group. These
+ * are defined using {@link AliasedExpression} expressions, which are typically created by
+ * calling {@link Expression#as(String)} on {@link AggregateFunction} instances. Each
+ * aggregation calculates a value (e.g., sum, average, count) based on the documents within
+ * its group.
+ *
+ *
+ *
Example:
+ *
+ *
{@code
+ * // Calculate the average rating for each genre.
+ * firestore.pipeline().collection("books")
+ * .aggregate(
+ * Aggregate
+ * .withAccumulators(avg("rating").as("avg_rating"))
+ * .withGroups("genre"));
+ * }
+ *
+ * @param aggregate An {@link Aggregate} object that specifies the grouping fields (if any) and
+ * the aggregation operations to perform.
+ * @return A new {@code Pipeline} object with this stage appended to the stage list.
+ */
+ @BetaApi
+ public Pipeline aggregate(Aggregate aggregate) {
+ return append(aggregate);
+ }
+
+ @BetaApi
+ public Pipeline aggregate(Aggregate aggregate, AggregateOptions options) {
+ return append(aggregate.withOptions(options));
+ }
+
+ /**
+ * Returns a set of distinct field values from the inputs to this stage.
+ *
+ *
This stage run through the results from previous stages to include only results with unique
+ * combinations of values for the specified fields and produce these fields as the output.
+ *
+ *
Example:
+ *
+ *
{@code
+ * // Get a list of unique genres.
+ * firestore.pipeline().collection("books")
+ * .distinct("genre");
+ * }
+ *
+ * @param fields The fields to consider when determining distinct values.
+ * @return A new {@code Pipeline} object with this stage appended to the stage list.
+ */
+ @BetaApi
+ public Pipeline distinct(String... fields) {
+ return append(new Distinct(PipelineUtils.fieldNamesToMap(fields)));
+ }
+
+ /**
+ * Returns a set of distinct {@link Expression} values from the inputs to this stage.
+ *
+ *
This stage run through the results from previous stages to include only results with unique
+ * combinations of {@link Expression} values ({@link Field}, {@link FunctionExpression}, etc).
+ *
+ *
The parameters to this stage are defined using {@link Selectable} expressions, which can be:
+ *
+ *
+ *
{@link Field}: References an existing document field.
+ *
{@link FunctionExpression}: Represents the result of a function with an assigned alias
+ * name using {@link Expression#as(String)}
+ *
+ *
+ *
Example:
+ *
+ *
{@code
+ * // Get a list of unique author names in uppercase and genre combinations.
+ * firestore.pipeline().collection("books")
+ * .distinct(toUppercase(field("author")).as("authorName"), field("genre"))
+ * .select("authorName");
+ * }
+ *
+ * @param selectables The {@link Selectable} expressions to consider when determining distinct
+ * value combinations.
+ * @return A new {@code Pipeline} object with this stage appended to the stage list.
+ */
+ @BetaApi
+ public Pipeline distinct(Selectable... selectables) {
+ return append(new Distinct(PipelineUtils.selectablesToMap(selectables)));
+ }
+
+ /**
+ * Performs vector distance (similarity) search with given parameters to the stage inputs.
+ *
+ *
This stage adds a "nearest neighbor search" capability to your pipelines. Given a field that
+ * stores vectors and a target vector, this stage will identify and return the inputs whose vector
+ * field is closest to the target vector, using the parameters specified in `options`.
+ *
+ *
Example:
+ *
+ *
{@code
+ * // Find books with similar "topicVectors" to the given targetVector
+ * firestore.pipeline().collection("books")
+ * .findNearest("topicVectors", targetVector, FindNearest.DistanceMeasure.COSINE,
+ * new FindNearestOptions()
+ * .withLimit(10)
+ * .withDistanceField("distance"));
+ * }
+ *
+ * @param fieldName The name of the field containing the vector data. This field should store
+ * {@link VectorValue}.
+ * @param vector The target vector to compare against.
+ * @param distanceMeasure The distance measure to use: cosine, euclidean, etc.
+ * @param options Configuration options for the nearest neighbor search, such as limit and output
+ * distance field name.
+ * @return A new {@code Pipeline} object with this stage appended to the stage list.
+ */
+ @BetaApi
+ public Pipeline findNearest(
+ String fieldName,
+ double[] vector,
+ FindNearest.DistanceMeasure distanceMeasure,
+ FindNearestOptions options) {
+ return findNearest(field(fieldName), vector, distanceMeasure, options);
+ }
+
+ /**
+ * Performs vector distance (similarity) search with given parameters to the stage inputs.
+ *
+ *
This stage adds a "nearest neighbor search" capability to your pipelines. Given an
+ * expression that evaluates to a vector and a target vector, this stage will identify and return
+ * the inputs whose vector expression is closest to the target vector, using the parameters
+ * specified in `options`.
+ *
+ *
Example:
+ *
+ *
{@code
+ * // Find books with similar "topicVectors" to the given targetVector
+ * firestore.pipeline().collection("books")
+ * .findNearest(
+ * field("topicVectors"),
+ * targetVector,
+ * FindNearest.DistanceMeasure.COSINE,
+ * new FindNearestOptions()
+ * .withLimit(10)
+ * .withDistanceField("distance"));
+ * }
+ *
+ * @param property The expression that evaluates to a vector value using the stage inputs.
+ * @param vector The target vector to compare against.
+ * @param distanceMeasure The distance measure to use: cosine, euclidean, etc.
+ * @param options Configuration options for the nearest neighbor search, such as limit and output
+ * distance field name.
+ * @return A new {@code Pipeline} object with this stage appended to the stage list.
+ */
+ @BetaApi
+ public Pipeline findNearest(
+ Expression property,
+ double[] vector,
+ FindNearest.DistanceMeasure distanceMeasure,
+ FindNearestOptions options) {
+ // Implementation for findNearest (add the FindNearest stage if needed)
+ return append(new FindNearest(property, new VectorValue(vector), distanceMeasure, options));
+ }
+
+ /**
+ * Sorts the documents from previous stages based on one or more {@link Ordering} criteria.
+ *
+ *
This stage allows you to order the results of your pipeline. You can specify multiple {@link
+ * Ordering} instances to sort by multiple fields in ascending or descending order. If documents
+ * have the same value for a field used for sorting, the next specified ordering will be used. If
+ * all orderings result in equal comparison, the documents are considered equal and the order is
+ * unspecified.
+ *
+ *
Example:
+ *
+ *
{@code
+ * // Sort books by rating in descending order, and then by title in ascending order for books with the same rating
+ * firestore.pipeline().collection("books")
+ * .sort(
+ * Ordering.of("rating").descending(),
+ * Ordering.of("title") // Ascending order is the default
+ * );
+ * }
+ *
+ * @param orders One or more {@link Ordering} instances specifying the sorting criteria.
+ * @return A new {@code Pipeline} object with this stage appended to the stage list.
+ */
+ @BetaApi
+ public Pipeline sort(Ordering... orders) {
+ return append(new Sort(ImmutableList.copyOf(orders)));
+ }
+
+ /**
+ * Fully overwrites all fields in a document with those coming from a nested map.
+ *
+ *
This stage allows you to emit a map value as a document. Each key of the map becomes a field
+ * on the document that contains the corresponding value.
+ *
+ *
+ *
+ * @param fieldName The name of the field containing the nested map.
+ * @return A new {@code Pipeline} object with this stage appended to the stage list.
+ */
+ @BetaApi
+ public Pipeline replaceWith(String fieldName) {
+ return replaceWith(field(fieldName));
+ }
+
+ /**
+ * Fully overwrites all fields in a document with those coming from a nested map.
+ *
+ *
This stage allows you to emit a map value as a document. Each key of the map becomes a field
+ * on the document that contains the corresponding value.
+ *
+ *
+ *
+ * @param expr The {@link Expression} field containing the nested map.
+ * @return A new {@code Pipeline} object with this stage appended to the stage list.
+ */
+ @BetaApi
+ public Pipeline replaceWith(Expression expr) {
+ return append(new ReplaceWith(expr));
+ }
+
+ /**
+ * Performs a pseudo-random sampling of the documents from the previous stage.
+ *
+ *
This stage will filter documents pseudo-randomly. The 'limit' parameter specifies the number
+ * of documents to emit from this stage, but if there are fewer documents from previous stage than
+ * the 'limit' parameter, then no filtering will occur and all documents will pass through.
+ *
+ *
+ *
+ * @param limit The number of documents to emit, if possible.
+ * @return A new {@code Pipeline} object with this stage appended to the stage list.
+ */
+ @BetaApi
+ public Pipeline sample(int limit) {
+ return sample(Sample.withDocLimit(limit));
+ }
+
+ /**
+ * Performs a pseudo-random sampling of the documents from the previous stage.
+ *
+ *
This stage will filter documents pseudo-randomly. The 'options' parameter specifies how
+ * sampling will be performed. See {@code SampleOptions} for more information.
+ *
+ *
+ *
+ * @param sample The {@code Sample} specifies how sampling is performed.
+ * @return A new {@code Pipeline} object with this stage appended to the stage list.
+ */
+ @BetaApi
+ public Pipeline sample(Sample sample) {
+ return append(sample);
+ }
+
+ /**
+ * Performs union of all documents from two pipelines, including duplicates.
+ *
+ *
This stage will pass through documents from previous stage, and also pass through documents
+ * from previous stage of the `other` {@code Pipeline} given in parameter. The order of documents
+ * emitted from this stage is undefined.
+ *
+ *
+ *
+ * @param other The other {@code Pipeline} that is part of union.
+ * @return A new {@code Pipeline} object with this stage appended to the stage list.
+ */
+ @BetaApi
+ public Pipeline union(Pipeline other) {
+ return append(new Union(other));
+ }
+
+ /**
+ * Produces a document for each element in array found in previous stage document.
+ *
+ *
For each previous stage document, this stage will emit zero or more augmented documents. The
+ * input array found in the previous stage document field specified by the `fieldName` parameter,
+ * will for each input array element produce an augmented document. The input array element will
+ * augment the previous stage document by replacing the field specified by `fieldName` parameter
+ * with the element value.
+ *
+ *
In other words, the field containing the input array will be removed from the augmented
+ * document and replaced by the corresponding array element.
+ *
+ *
Example:
+ *
+ *
{@code
+ * // Input:
+ * // { "title": "The Hitchhiker's Guide to the Galaxy", "tags": [ "comedy", "space", "adventure" ], ... }
+ *
+ * // Emit a book document for each tag of the book.
+ * firestore.pipeline().collection("books")
+ * .unnest("tags", "tag");
+ *
+ * // Output:
+ * // { "title": "The Hitchhiker's Guide to the Galaxy", "tag": "comedy", ... }
+ * // { "title": "The Hitchhiker's Guide to the Galaxy", "tag": "space", ... }
+ * // { "title": "The Hitchhiker's Guide to the Galaxy", "tag": "adventure", ... }
+ * }
+ *
+ * @param fieldName The name of the field containing the array.
+ * @return A new {@code Pipeline} object with this stage appended to the stage list.
+ */
+ @BetaApi
+ public Pipeline unnest(String fieldName, String alias) {
+ // return unnest(field(fieldName));
+ return append(new Unnest(field(fieldName), alias));
+ }
+
+ // /**
+ // * Produces a document for each element in array found in previous stage document.
+ // *
+ // *
For each previous stage document, this stage will emit zero or more augmented documents.
+ // * The input array found in the specified by {@code Selectable} expression parameter, will for
+ // * each input array element produce an augmented document. The input array element will augment
+ // * the previous stage document by assigning the {@code Selectable} alias the element value.
+ // *
+ // *
Example:
+ // *
+ // *
{@code
+ // * // Input:
+ // * // { "title": "The Hitchhiker's Guide to the Galaxy", "tags": [ "comedy", "space",
+ // "adventure" ], ... }
+ // *
+ // * // Emit a book document for each tag of the book.
+ // * firestore.pipeline().collection("books")
+ // * .unnest(field("tags").as("tag"));
+ // *
+ // * // Output:
+ // * // { "title": "The Hitchhiker's Guide to the Galaxy", "tag": "comedy", "tags": [ "comedy",
+ // "space", "adventure" ], ... }
+ // * // { "title": "The Hitchhiker's Guide to the Galaxy", "tag": "space", "tags": [ "comedy",
+ // "space", "adventure" ], ... }
+ // * // { "title": "The Hitchhiker's Guide to the Galaxy", "tag": "adventure", "tags": [
+ // "comedy", "space", "adventure" ], ... }
+ // * }
+ // *
+ // * @param field The expression that evaluates to the input array.
+ // * @return A new {@code Pipeline} object with this stage appended to the stage list.
+ // */
+ // @BetaApi
+ // public Pipeline unnest(Selectable field) {
+ // return append(new Unnest(field));
+ // }
+
+ /**
+ * Produces a document for each element in array found in previous stage document.
+ *
+ *
For each previous stage document, this stage will emit zero or more augmented documents. The
+ * input array found in the previous stage document field specified by the `fieldName` parameter,
+ * will for each input array element produce an augmented document. The input array element will
+ * augment the previous stage document by replacing the field specified by `fieldName` parameter
+ * with the element value.
+ *
+ *
In other words, the field containing the input array will be removed from the augmented
+ * document and replaced by the corresponding array element.
+ *
+ *
Example:
+ *
+ *
{@code
+ * // Input:
+ * // { "title": "The Hitchhiker's Guide to the Galaxy", "tags": [ "comedy", "space", "adventure" ], ... }
+ *
+ * // Emit a book document for each tag of the book.
+ * firestore.pipeline().collection("books")
+ * .unnest("tags", "tag", new UnnestOptions().withIndexField("tagIndex"));
+ *
+ * // Output:
+ * // { "title": "The Hitchhiker's Guide to the Galaxy", "tagIndex": 0, "tag": "comedy", ... }
+ * // { "title": "The Hitchhiker's Guide to the Galaxy", "tagIndex": 1, "tag": "space", ... }
+ * // { "title": "The Hitchhiker's Guide to the Galaxy", "tagIndex": 2, "tag": "adventure", ... }
+ * }
+ *
+ * @param fieldName The name of the field containing the array.
+ * @param options The {@code UnnestOptions} options.
+ * @return A new {@code Pipeline} object with this stage appended to the stage list.
+ */
+ @BetaApi
+ public Pipeline unnest(String fieldName, String alias, UnnestOptions options) {
+ return append(new Unnest(field(fieldName), alias, options));
+ }
+
+ /**
+ * Produces a document for each element in array found in previous stage document.
+ *
+ *
For each previous stage document, this stage will emit zero or more augmented documents. The
+ * input array found in the previous stage document field specified by the `fieldName` parameter,
+ * will for each input array element produce an augmented document. The input array element will
+ * augment the previous stage document by replacing the field specified by `fieldName` parameter
+ * with the element value.
+ *
+ *
In other words, the field containing the input array will be removed from the augmented
+ * document and replaced by the corresponding array element.
+ *
+ *
Example:
+ *
+ *
{@code
+ * // Input:
+ * // { "title": "The Hitchhiker's Guide to the Galaxy", "tags": [ "comedy", "space", "adventure" ], ... }
+ *
+ * // Emit a book document for each tag of the book.
+ * firestore.pipeline().collection("books")
+ * .unnest(field("tags").as("tag"));
+ *
+ * // Output:
+ * // { "title": "The Hitchhiker's Guide to the Galaxy", "tagIndex": 0, "tag": "comedy", ... }
+ * // { "title": "The Hitchhiker's Guide to the Galaxy", "tagIndex": 1, "tag": "space", ... }
+ * // { "title": "The Hitchhiker's Guide to the Galaxy", "tagIndex": 2, "tag": "adventure", ... }
+ * }
+ *
+ * @param expr The name of the expression containing the array.
+ * @return A new {@code Pipeline} object with this stage appended to the stage list.
+ */
+ @BetaApi
+ public Pipeline unnest(Selectable expr) {
+ return append(new Unnest(expr));
+ }
+
+ // /**
+ // * Produces a document for each element in array found in previous stage document.
+ // *
+ // *
For each previous stage document, this stage will emit zero or more augmented documents.
+ // * The input array found in the specified by {@code Selectable} expression parameter, will for
+ // * each input array element produce an augmented document. The input array element will augment
+ // * the previous stage document by assigning the {@code Selectable} alias the element value.
+ // *
+ // *
Example:
+ // *
+ // *
{@code
+ // * // Input:
+ // * // { "title": "The Hitchhiker's Guide to the Galaxy", "tags": [ "comedy", "space",
+ // "adventure" ], ... }
+ // *
+ // * // Emit a book document for each tag of the book.
+ // * firestore.pipeline().collection("books")
+ // * .unnest(field("tags").as("tag"), UnnestOptions.indexField("tagIndex"));
+ // *
+ // * // Output:
+ // * // { "title": "The Hitchhiker's Guide to the Galaxy", "tagIndex": 0, "tag": "comedy",
+ // "tags": [ "comedy", "space", "adventure" ], ... }
+ // * // { "title": "The Hitchhiker's Guide to the Galaxy", "tagIndex": 1, "tag": "space", "tags":
+ // [ "comedy", "space", "adventure" ], ... }
+ // * // { "title": "The Hitchhiker's Guide to the Galaxy", "tagIndex": 2, "tag": "adventure",
+ // "tags": [ "comedy", "space", "adventure" ], ... }
+ // * }
+ // *
+ // * @param field The expression that evaluates to the input array.
+ // * @param options The {@code UnnestOptions} options.
+ // * @return A new {@code Pipeline} object with this stage appended to the stage list.
+ // */
+ // @BetaApi
+ // public Pipeline unnest(Selectable field, UnnestOptions options) {
+ // return append(new Unnest(field, options));
+ // }
+
+ /**
+ * Adds a generic stage to the pipeline.
+ *
+ *
This method provides a flexible way to extend the pipeline's functionality by adding custom
+ * stages. Each generic stage is defined by a unique `name` and a set of `params` that control its
+ * behavior.
+ *
+ *
Example (Assuming there is no "where" stage available in SDK):
+ *
+ *
{@code
+ * // Assume we don't have a built-in "where" stage
+ * Map whereParams = new HashMap<>();
+ * whereParams.put("condition", field("published").lt(1900));
+ *
+ * firestore.pipeline().collection("books")
+ * .genericStage("where", Lists.newArrayList(field("published").lt(1900)), new RawOptions()) // Custom "where" stage
+ * .select("title", "author");
+ * }
+ *
+ * @return A new {@code Pipeline} object with this stage appended to the stage list.
+ */
+ @BetaApi
+ public Pipeline rawStage(RawStage stage) {
+ return append(stage);
+ }
+
+ /**
+ * Executes this pipeline and returns a future to represent the asynchronous operation.
+ *
+ *
The returned {@link ApiFuture} can be used to track the progress of the pipeline execution
+ * and retrieve the results (or handle any errors) asynchronously.
+ *
+ *
The pipeline results are returned as a list of {@link PipelineResult} objects. Each {@link
+ * PipelineResult} typically represents a single key/value map that has passed through all the
+ * stages of the pipeline, however this might differ depends on the stages involved in the
+ * pipeline. For example:
+ *
+ *
+ *
If there are no stages or only transformation stages, each {@link PipelineResult}
+ * represents a single document.
+ *
If there is an aggregation, only a single {@link PipelineResult} is returned,
+ * representing the aggregated results over the entire dataset .
+ *
If there is an aggregation stage with grouping, each {@link PipelineResult} represents a
+ * distinct group and its associated aggregated values.
+ *
+ *
+ * @return An {@link ApiFuture} representing the asynchronous pipeline execution.
+ */
+ @BetaApi
+ public ApiFuture execute() {
+ return execute(new PipelineExecuteOptions(), null, null);
+ }
+
+ @BetaApi
+ public ApiFuture execute(PipelineExecuteOptions options) {
+ return execute(options, null, null);
+ }
+
+ MetricsContext createMetricsContext(String methodName) {
+ return rpcContext.getFirestore().getOptions().getMetricsUtil().createMetricsContext(methodName);
+ }
+
+ /**
+ * Executes this pipeline, providing results to the given {@link ApiStreamObserver} as they become
+ * available.
+ *
+ *
This method allows you to process pipeline results in a streaming fashion, rather than
+ * waiting for the entire pipeline execution to complete. The provided {@link ApiStreamObserver}
+ * will receive:
+ *
+ *
+ *
**onNext(PipelineResult):** Called for each {@link PipelineResult} produced by the
+ * pipeline. Each {@link PipelineResult} typically represents a single key/value map that
+ * has passed through all the stages. However, the exact structure might differ based on the
+ * stages involved in the pipeline (as described in {@link #execute()}).
+ *
**onError(Throwable):** Called if an error occurs during pipeline execution.
+ *
**onCompleted():** Called when the pipeline has finished processing all documents.
+ *
+ *
+ * @param observer The {@link ApiStreamObserver} to receive pipeline results and events.
+ */
+ @BetaApi
+ public void execute(ApiStreamObserver observer) {
+ MetricsContext metricsContext =
+ createMetricsContext(TelemetryConstants.METHOD_NAME_EXECUTE_PIPELINE_EXECUTE);
+
+ executeInternal(
+ new PipelineExecuteOptions(),
+ null,
+ null,
+ new PipelineResultObserver() {
+ @Override
+ public void onNext(PipelineResult result) {
+ observer.onNext(result);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ observer.onError(t);
+ }
+
+ @Override
+ public void onCompleted() {
+ observer.onCompleted();
+ }
+ },
+ metricsContext);
+ }
+
+ ApiFuture execute(
+ @Nonnull PipelineExecuteOptions options,
+ @Nullable final ByteString transactionId,
+ @Nullable com.google.protobuf.Timestamp readTime) {
+ TraceUtil.Span span =
+ rpcContext
+ .getFirestore()
+ .getOptions()
+ .getTraceUtil()
+ .startSpan(TelemetryConstants.METHOD_NAME_PIPELINE_EXECUTE);
+
+ MetricsContext metricsContext =
+ createMetricsContext(TelemetryConstants.METHOD_NAME_EXECUTE_PIPELINE_EXECUTE);
+
+ try (Scope ignored = span.makeCurrent()) {
+ SettableApiFuture futureResult = SettableApiFuture.create();
+
+ executeInternal(
+ options,
+ transactionId,
+ readTime,
+ new PipelineResultObserver() {
+ final List results = new ArrayList<>();
+
+ @Override
+ public void onCompleted() {
+ futureResult.set(
+ new Snapshot(Pipeline.this, results, getExecutionTime(), getExplainStats()));
+ }
+
+ @Override
+ public void onNext(PipelineResult result) {
+ results.add(result);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ futureResult.setException(t);
+ }
+ },
+ metricsContext);
+
+ span.endAtFuture(futureResult);
+ return futureResult;
+ } catch (Exception error) {
+ span.end(error);
+ metricsContext.recordLatency(MetricType.END_TO_END_LATENCY, error);
+ throw error;
+ }
+ }
+
+ void executeInternal(
+ @Nonnull PipelineExecuteOptions options,
+ @Nullable final ByteString transactionId,
+ @Nullable com.google.protobuf.Timestamp readTime,
+ PipelineResultObserver observer,
+ MetricsContext metricsContext) {
+ ExecutePipelineRequest.Builder request =
+ ExecutePipelineRequest.newBuilder()
+ .setDatabase(rpcContext.getDatabaseName())
+ .setStructuredPipeline(
+ StructuredPipeline.newBuilder()
+ .setPipeline(toProto())
+ .putAllOptions(StageUtils.toMap(options))
+ .build());
+
+ if (transactionId != null) {
+ request.setTransaction(transactionId);
+ }
+
+ if (readTime != null) {
+ request.setReadTime(readTime);
+ }
+
+ pipelineInternalStream(
+ request.build(),
+ new PipelineResultObserver() {
+ @Override
+ public void onCompleted() {
+ observer.setExplainStats(getExplainStats());
+ observer.setExecutionTime(getExecutionTime());
+ observer.onCompleted();
+ }
+
+ @Override
+ public void onNext(PipelineResult result) {
+ observer.onNext(result);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ observer.onError(t);
+ }
+ },
+ metricsContext);
+ }
+
+ @InternalApi
+ private com.google.firestore.v1.Pipeline toProto() {
+ return com.google.firestore.v1.Pipeline.newBuilder()
+ .addAllStages(stages.transform(StageUtils::toStageProto))
+ .build();
+ }
+
+ @InternalApi
+ public com.google.firestore.v1.Value toProtoValue() {
+ return Value.newBuilder().setPipelineValue(toProto()).build();
+ }
+
+ private void pipelineInternalStream(
+ ExecutePipelineRequest request,
+ PipelineResultObserver resultObserver,
+ MetricsContext metricsContext) {
+ TraceUtil traceUtil = rpcContext.getFirestore().getOptions().getTraceUtil();
+
+ // To reduce the size of traces, we only register one event for every 100 responses
+ // that we receive from the server.
+ final int NUM_RESPONSES_PER_TRACE_EVENT = 100;
+
+ TraceUtil.Span currentSpan = traceUtil.currentSpan();
+ currentSpan.addEvent(
+ TelemetryConstants.METHOD_NAME_EXECUTE_PIPELINE,
+ new ImmutableMap.Builder()
+ .put(ATTRIBUTE_KEY_IS_TRANSACTIONAL, request.hasTransaction())
+ .build());
+
+ ResponseObserver observer =
+ new ResponseObserver() {
+ Timestamp executionTime = null;
+ boolean firstResponse = false;
+ int numDocuments = 0;
+ boolean hasCompleted = false;
+
+ @Override
+ public void onStart(StreamController controller) {
+ // No action needed in onStart
+ }
+
+ @Override
+ public void onResponse(ExecutePipelineResponse response) {
+ if (!firstResponse) {
+ firstResponse = true;
+ currentSpan.addEvent(
+ TelemetryConstants.METHOD_NAME_EXECUTE_PIPELINE + ": First Response");
+ metricsContext.recordLatency(MetricType.FIRST_RESPONSE_LATENCY);
+ }
+
+ if (response.hasExplainStats()) {
+ resultObserver.setExplainStats(
+ new ExplainStats(response.getExplainStats().getData()));
+ }
+
+ if (response.hasExecutionTime()) {
+ executionTime = Timestamp.fromProto(response.getExecutionTime());
+ }
+
+ if (response.getResultsCount() > 0) {
+ numDocuments += response.getResultsCount();
+ if (numDocuments % NUM_RESPONSES_PER_TRACE_EVENT == 0) {
+ currentSpan.addEvent(
+ TelemetryConstants.METHOD_NAME_EXECUTE_PIPELINE
+ + ": Received "
+ + numDocuments
+ + " results");
+ }
+
+ for (Document doc : response.getResultsList()) {
+ resultObserver.onNext(PipelineResult.fromDocument(rpcContext, executionTime, doc));
+ }
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ currentSpan.addEvent(
+ TelemetryConstants.METHOD_NAME_EXECUTE_PIPELINE + ": Error",
+ ImmutableMap.of("error.message", throwable.toString()));
+ metricsContext.recordLatency(MetricType.END_TO_END_LATENCY, throwable);
+ resultObserver.onError(throwable);
+ }
+
+ @Override
+ public void onComplete() {
+ if (hasCompleted) {
+ return;
+ }
+ hasCompleted = true;
+
+ metricsContext.recordLatency(MetricType.END_TO_END_LATENCY);
+
+ currentSpan.addEvent(
+ TelemetryConstants.METHOD_NAME_EXECUTE_PIPELINE + ": Completed",
+ ImmutableMap.of(ATTRIBUTE_KEY_DOC_COUNT, numDocuments));
+ resultObserver.onCompleted(executionTime);
+ }
+ };
+
+ logger.log(Level.FINEST, "Sending pipeline request: " + request.getStructuredPipeline());
+
+ rpcContext.streamRequest(request, observer, rpcContext.getClient().executePipelineCallable());
+ }
+
+ private interface ResultObserver extends ApiStreamObserver {
+ void onCompleted(Timestamp executionTime);
+
+ void setExplainStats(ExplainStats explainStats);
+
+ void setExecutionTime(Timestamp executionTime);
+ }
+
+ @InternalExtensionOnly
+ abstract static class PipelineResultObserver implements ResultObserver {
+ private Timestamp executionTime;
+ private ExplainStats explainStats;
+
+ @Override
+ public void onCompleted(Timestamp executionTime) {
+ this.executionTime = executionTime;
+ this.onCompleted();
+ }
+
+ public Timestamp getExecutionTime() {
+ return executionTime;
+ }
+
+ public ExplainStats getExplainStats() {
+ return explainStats;
+ }
+
+ @Override
+ public void setExplainStats(ExplainStats explainStats) {
+ this.explainStats = explainStats;
+ }
+
+ @Override
+ public void setExecutionTime(Timestamp executionTime) {
+ this.executionTime = executionTime;
+ }
+ }
+}
diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/PipelineResult.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/PipelineResult.java
new file mode 100644
index 000000000..166dd4a0f
--- /dev/null
+++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/PipelineResult.java
@@ -0,0 +1,456 @@
+/*
+ * Copyright 2017 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.firestore;
+
+import com.google.api.core.BetaApi;
+import com.google.api.core.InternalExtensionOnly;
+import com.google.cloud.Timestamp;
+import com.google.cloud.firestore.encoding.CustomClassMapper;
+import com.google.common.base.Preconditions;
+import com.google.firestore.v1.Document;
+import com.google.firestore.v1.Value;
+import com.google.firestore.v1.Write;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+/**
+ * A PipelineResult contains data read from a Firestore Pipeline. The data can be extracted with the
+ * {@link #getData()} or {@link #get(String)} methods.
+ *
+ *
If the PipelineResult represents a non-document result, getReference() will return a null
+ * value.
+ *
+ *
Subclassing Note: Firestore classes are not meant to be subclassed except for use in
+ * test mocks. Subclassing is not supported in production code and new SDK releases may break code
+ * that does so.
+ */
+@InternalExtensionOnly
+@BetaApi
+public final class PipelineResult {
+
+ private final FirestoreRpcContext> rpcContext;
+ @Nullable private final DocumentReference docRef;
+ @Nullable private final Map fields;
+ @Nonnull private final Timestamp executionTime;
+ @Nullable private final Timestamp updateTime;
+ @Nullable private final Timestamp createTime;
+
+ PipelineResult(
+ FirestoreRpcContext> rpcContext,
+ @Nullable DocumentReference docRef,
+ @Nullable Map fields,
+ @Nonnull Timestamp executionTime,
+ @Nullable Timestamp updateTime,
+ @Nullable Timestamp createTime) { // Elevated access level for mocking.
+ this.rpcContext = rpcContext;
+ this.docRef = docRef;
+ this.fields = fields;
+ this.executionTime = executionTime;
+ if (updateTime != null && updateTime.equals(Timestamp.ofTimeMicroseconds(0))) {
+ updateTime = null;
+ }
+ this.updateTime = updateTime;
+
+ if (createTime != null && createTime.equals(Timestamp.ofTimeMicroseconds(0))) {
+ createTime = null;
+ }
+ this.createTime = createTime;
+ }
+
+ /**
+ * Returns the ID of the document represented by this result. Returns null if this result is not
+ * corresponding to a Firestore document.
+ */
+ @Nullable
+ @BetaApi
+ public String getId() {
+ return docRef.getId();
+ }
+
+ static PipelineResult fromDocument(
+ FirestoreRpcContext> rpcContext, Timestamp executionTime, Document document) {
+ return new PipelineResult(
+ rpcContext,
+ document.getName().isEmpty()
+ ? null
+ : new DocumentReference(rpcContext, ResourcePath.create(document.getName())),
+ document.getFieldsMap(),
+ executionTime,
+ Timestamp.fromProto(document.getUpdateTime()),
+ Timestamp.fromProto(document.getCreateTime()));
+ }
+
+ /** Returns the time at which the pipeline producing this result is executed. */
+ @Nullable
+ @BetaApi
+ public Timestamp getExecutionTime() {
+ return executionTime;
+ }
+
+ /**
+ * Returns the time at which this document was last updated. Returns null if this result is not
+ * corresponding to a Firestore document.
+ */
+ @Nullable
+ @BetaApi
+ public Timestamp getUpdateTime() {
+ return updateTime;
+ }
+
+ /**
+ * Returns the time at which this document was created. Returns null if this result is not
+ * corresponding to a Firestore document.
+ */
+ @Nullable
+ @BetaApi
+ public Timestamp getCreateTime() {
+ return createTime;
+ }
+
+ /**
+ * Returns whether or not the field exists in the document. Returns false if the document does not
+ * exist.
+ *
+ * @return whether the document existed in this snapshot.
+ */
+ @BetaApi
+ public boolean exists() {
+ return fields != null;
+ }
+
+ /**
+ * Returns the fields of the result as a Map or null if the result doesn't exist. Field values
+ * will be converted to their native Java representation.
+ *
+ * @return The fields of the document as a Map or null if the result doesn't exist.
+ */
+ @Nonnull
+ @BetaApi
+ public Map getData() {
+ if (fields == null) {
+ return null;
+ }
+
+ Map decodedFields = new HashMap<>();
+ for (Map.Entry entry : fields.entrySet()) {
+ Object decodedValue = UserDataConverter.decodeValue(rpcContext, entry.getValue());
+ decodedFields.put(entry.getKey(), decodedValue);
+ }
+ return decodedFields;
+ }
+
+ /**
+ * Returns the contents of the document converted to a POJO or null if the result doesn't exist.
+ *
+ * @param valueType The Java class to create
+ * @return The contents of the document in an object of type T or null if the result doesn't
+ * exist.
+ */
+ @Nullable
+ @BetaApi
+ T toObject(@Nonnull Class valueType) {
+ Map data = getData();
+ return data == null ? null : CustomClassMapper.convertToCustomClass(data, valueType, docRef);
+ }
+
+ /**
+ * Returns whether or not the field exists in the document. Returns false if the result does not
+ * exist.
+ *
+ * @param field the path to the field.
+ * @return true iff the field exists.
+ */
+ @BetaApi
+ public boolean contains(@Nonnull String field) {
+ return contains(FieldPath.fromDotSeparatedString(field));
+ }
+
+ /**
+ * Returns whether or not the field exists in the document. Returns false if the result does not
+ * exist.
+ *
+ * @param fieldPath the path to the field.
+ * @return true iff the field exists.
+ */
+ @BetaApi
+ public boolean contains(@Nonnull FieldPath fieldPath) {
+ return this.extractField(fieldPath) != null;
+ }
+
+ /**
+ * Returns the value at the field or null if the field doesn't exist.
+ *
+ * @param field The path to the field.
+ * @return The value at the given field or null.
+ */
+ @Nullable
+ @BetaApi
+ public Object get(@Nonnull String field) {
+ return get(FieldPath.fromDotSeparatedString(field));
+ }
+
+ /**
+ * Returns the value at the field, converted to a POJO, or null if the field or result doesn't
+ * exist.
+ *
+ * @param field The path to the field
+ * @param valueType The Java class to convert the field value to.
+ * @return The value at the given field or null.
+ */
+ @Nullable
+ @BetaApi
+ public T get(@Nonnull String field, @Nonnull Class valueType) {
+ return get(FieldPath.fromDotSeparatedString(field), valueType);
+ }
+
+ /**
+ * Returns the value at the field or null if the field doesn't exist.
+ *
+ * @param fieldPath The path to the field.
+ * @return The value at the given field or null.
+ */
+ @Nullable
+ @BetaApi
+ public Object get(@Nonnull FieldPath fieldPath) {
+ Value value = extractField(fieldPath);
+
+ if (value == null) {
+ return null;
+ }
+
+ return UserDataConverter.decodeValue(rpcContext, value);
+ }
+
+ /**
+ * Returns the value at the field, converted to a POJO, or null if the field or result doesn't
+ * exist.
+ *
+ * @param fieldPath The path to the field
+ * @param valueType The Java class to convert the field value to.
+ * @return The value at the given field or null.
+ */
+ @Nullable
+ @BetaApi
+ public T get(@Nonnull FieldPath fieldPath, Class valueType) {
+ Object data = get(fieldPath);
+ return data == null ? null : CustomClassMapper.convertToCustomClass(data, valueType, docRef);
+ }
+
+ /** Returns the Value Proto at 'fieldPath'. Returns null if the field was not found. */
+ @Nullable
+ Value extractField(@Nonnull FieldPath fieldPath) {
+ Value value = null;
+
+ if (fields != null) {
+ Iterator components = fieldPath.getSegments().iterator();
+ value = fields.get(components.next());
+
+ while (value != null && components.hasNext()) {
+ if (value.getValueTypeCase() != Value.ValueTypeCase.MAP_VALUE) {
+ return null;
+ }
+ value = value.getMapValue().getFieldsOrDefault(components.next(), null);
+ }
+ }
+
+ return value;
+ }
+
+ /**
+ * Returns the value of the field as a boolean.
+ *
+ * @param field The path to the field.
+ * @throws RuntimeException if the value is not a Boolean.
+ * @return The value of the field.
+ */
+ @Nullable
+ @BetaApi
+ public Boolean getBoolean(@Nonnull String field) {
+ return (Boolean) get(field);
+ }
+
+ /**
+ * Returns the value of the field as a double.
+ *
+ * @param field The path to the field.
+ * @throws RuntimeException if the value is not a Number.
+ * @return The value of the field.
+ */
+ @Nullable
+ @BetaApi
+ public Double getDouble(@Nonnull String field) {
+ Number number = (Number) get(field);
+ return number == null ? null : number.doubleValue();
+ }
+
+ /**
+ * Returns the value of the field as a String.
+ *
+ * @param field The path to the field.
+ * @throws RuntimeException if the value is not a String.
+ * @return The value of the field.
+ */
+ @Nullable
+ @BetaApi
+ public String getString(@Nonnull String field) {
+ return (String) get(field);
+ }
+
+ /**
+ * Returns the value of the field as a long.
+ *
+ * @param field The path to the field.
+ * @throws RuntimeException if the value is not a Number.
+ * @return The value of the field.
+ */
+ @Nullable
+ @BetaApi
+ public Long getLong(@Nonnull String field) {
+ Number number = (Number) get(field);
+ return number == null ? null : number.longValue();
+ }
+
+ /**
+ * Returns the value of the field as a Date.
+ *
+ * @param field The path to the field.
+ * @throws RuntimeException if the value is not a Date.
+ * @return The value of the field.
+ */
+ @Nullable
+ @BetaApi
+ public Date getDate(@Nonnull String field) {
+ Timestamp timestamp = getTimestamp(field);
+ return timestamp == null ? null : timestamp.toDate();
+ }
+
+ /**
+ * Returns the value of the field as a {@link Timestamp}.
+ *
+ * @param field The path to the field.
+ * @throws RuntimeException if the value is not a Date.
+ * @return The value of the field.
+ */
+ @Nullable
+ @BetaApi
+ public Timestamp getTimestamp(@Nonnull String field) {
+ return (Timestamp) get(field);
+ }
+
+ /**
+ * Returns the value of the field as a Blob.
+ *
+ * @param field The path to the field.
+ * @throws RuntimeException if the value is not a Blob.
+ * @return The value of the field.
+ */
+ @Nullable
+ @BetaApi
+ public Blob getBlob(@Nonnull String field) {
+ return (Blob) get(field);
+ }
+
+ /**
+ * Returns the value of the field as a GeoPoint.
+ *
+ * @param field The path to the field.
+ * @throws RuntimeException if the value is not a GeoPoint.
+ * @return The value of the field.
+ */
+ @Nullable
+ @BetaApi
+ public GeoPoint getGeoPoint(@Nonnull String field) {
+ return (GeoPoint) get(field);
+ }
+
+ /**
+ * Gets the reference to the document.
+ *
+ * @return The reference to the document.
+ */
+ @BetaApi
+ public DocumentReference getReference() {
+ return docRef;
+ }
+
+ /** Checks whether this DocumentSnapshot contains any fields. */
+ boolean isEmpty() {
+ return fields == null || fields.isEmpty();
+ }
+
+ Map getProtoFields() {
+ return fields;
+ }
+
+ Write.Builder toPb() {
+ Preconditions.checkState(exists(), "Can't call toDocument() on a document that doesn't exist");
+ Write.Builder write = Write.newBuilder();
+ Document.Builder document = write.getUpdateBuilder();
+ document.setName(docRef.getName());
+ document.putAllFields(fields);
+ return write;
+ }
+
+ Document.Builder toDocumentPb() {
+ Preconditions.checkState(exists(), "Can't call toDocument() on a document that doesn't exist");
+ Document.Builder document = Document.newBuilder();
+ return document
+ .setName(docRef.getName())
+ .putAllFields(fields)
+ .setCreateTime(createTime.toProto())
+ .setUpdateTime(updateTime.toProto());
+ }
+
+ /**
+ * Returns true if the document's data and path in this DocumentSnapshot equals the provided
+ * snapshot.
+ *
+ * @param obj The object to compare against.
+ * @return Whether this DocumentSnapshot is equal to the provided object.
+ */
+ @Override
+ @BetaApi
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || !(obj instanceof PipelineResult)) {
+ return false;
+ }
+ PipelineResult that = (PipelineResult) obj;
+ return Objects.equals(rpcContext, that.rpcContext)
+ && Objects.equals(docRef, that.docRef)
+ && Objects.equals(fields, that.fields);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(rpcContext, docRef, fields);
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "%s{doc=%s, fields=%s, executionTime=%s, updateTime=%s, createTime=%s}",
+ getClass().getSimpleName(), docRef, fields, executionTime, updateTime, createTime);
+ }
+}
diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/PipelineSource.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/PipelineSource.java
new file mode 100644
index 000000000..1c4d51255
--- /dev/null
+++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/PipelineSource.java
@@ -0,0 +1,166 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.firestore;
+
+import com.google.api.core.BetaApi;
+import com.google.api.core.InternalApi;
+import com.google.cloud.firestore.pipeline.stages.Collection;
+import com.google.cloud.firestore.pipeline.stages.CollectionGroup;
+import com.google.cloud.firestore.pipeline.stages.CollectionGroupOptions;
+import com.google.cloud.firestore.pipeline.stages.CollectionOptions;
+import com.google.cloud.firestore.pipeline.stages.Database;
+import com.google.cloud.firestore.pipeline.stages.Documents;
+import com.google.common.base.Preconditions;
+import javax.annotation.Nonnull;
+
+/**
+ * A factory for creating {@link Pipeline} instances, which provide a framework for building data
+ * transformation and query pipelines for Firestore.
+ *
+ *
Start by calling {@link Firestore#pipeline()} to obtain an instance of {@code PipelineSource}.
+ * From there, you can use the provided methods (like {@link #collection(String)}) to specify the
+ * data source for your pipeline.
+ *
+ *
This class is typically used to start building Firestore pipelines. It allows you to define
+ * the initial data source for a pipeline.
+ *
+ *
Example Usage:
+ *
+ *
{@code
+ * firestore.pipeline() // Get a PipelineSource instance
+ * .collection("users") // Create a pipeline that operates on a collection
+ * .select("name"); // Add stages to the pipeline
+ * }
+ */
+@BetaApi
+public final class PipelineSource {
+ private final FirestoreRpcContext> rpcContext;
+
+ @InternalApi
+ PipelineSource(FirestoreRpcContext> rpcContext) {
+ this.rpcContext = rpcContext;
+ }
+
+ /**
+ * Creates a new {@link Pipeline} that operates on the specified Firestore collection.
+ *
+ * @param path The path to the Firestore collection (e.g., "users").
+ * @return A new {@code Pipeline} instance targeting the specified collection.
+ */
+ @Nonnull
+ @BetaApi
+ public Pipeline collection(@Nonnull String path) {
+ return collection(path, new CollectionOptions());
+ }
+
+ @Nonnull
+ @BetaApi
+ public Pipeline collection(@Nonnull String path, CollectionOptions options) {
+ return new Pipeline(this.rpcContext, new Collection(path, options));
+ }
+
+ @Nonnull
+ @BetaApi
+ public Pipeline collection(@Nonnull CollectionReference ref) {
+ if (!this.rpcContext.getFirestore().equals(ref.getFirestore())) {
+ throw new IllegalArgumentException(
+ "Invalid CollectionReference. The Firestore instance of the CollectionReference must"
+ + " match the Firestore instance of the PipelineSource.");
+ }
+
+ return collection(ref.getPath(), new CollectionOptions());
+ }
+
+ /**
+ * Creates a new {@link Pipeline} that operates on all documents in a collection group.
+ *
+ *
A collection group consists of all collections with the same ID. For example, if you have
+ * collections named "users" under different documents, you can query them together using a
+ * collection group query.
+ *
+ * @param collectionId The ID of the collection group.
+ * @return A new {@code Pipeline} instance targeting the specified collection group.
+ */
+ @Nonnull
+ @BetaApi
+ public Pipeline collectionGroup(@Nonnull String collectionId) {
+ return collectionGroup(collectionId, new CollectionGroupOptions());
+ }
+
+ @Nonnull
+ @BetaApi
+ public Pipeline collectionGroup(@Nonnull String collectionId, CollectionGroupOptions options) {
+ Preconditions.checkArgument(
+ !collectionId.contains("/"),
+ "Invalid collectionId '%s'. Collection IDs must not contain '/'.",
+ collectionId);
+ return new Pipeline(this.rpcContext, new CollectionGroup(collectionId, options));
+ }
+
+ /**
+ * Creates a new {@link Pipeline} that operates on all documents in the Firestore database.
+ *
+ *
Use this method with caution as it can lead to very large result sets. It is usually only
+ * useful at development stage.
+ *
+ * @return A new {@code Pipeline} instance targeting all documents in the database.
+ */
+ @Nonnull
+ @BetaApi
+ public Pipeline database() {
+ return new Pipeline(this.rpcContext, new Database());
+ }
+
+ /**
+ * Creates a new {@link Pipeline} that operates on a specific set of Firestore documents.
+ *
+ * @param docs The {@link DocumentReference} instances representing the documents to include in
+ * the pipeline.
+ * @return A new {@code Pipeline} instance targeting the specified documents.
+ */
+ @Nonnull
+ @BetaApi
+ public Pipeline documents(DocumentReference... docs) {
+ return new Pipeline(this.rpcContext, Documents.of(docs));
+ }
+
+ /**
+ * Creates a new {@link Pipeline} from the given {@link Query}. Under the hood, this will
+ * translate the query semantics (order by document ID, etc.) to an equivalent pipeline.
+ *
+ * @param query The {@link Query} to translate into the resulting pipeline.
+ * @return A new {@code Pipeline} that is equivalent to the given query.
+ */
+ @Nonnull
+ @BetaApi
+ public Pipeline createFrom(Query query) {
+ return query.pipeline();
+ }
+
+ /**
+ * Creates a new {@link Pipeline} from the given {@link AggregateQuery}. Under the hood, this will
+ * translate the query semantics (order by document ID, etc.) to an equivalent pipeline.
+ *
+ * @param query The {@link AggregateQuery} to translate into the resulting pipeline.
+ * @return A new {@code Pipeline} that is equivalent to the given query.
+ */
+ @Nonnull
+ @BetaApi
+ public Pipeline createFrom(AggregateQuery query) {
+ return query.pipeline();
+ }
+}
diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/PipelineUtils.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/PipelineUtils.java
new file mode 100644
index 000000000..f056ba6f0
--- /dev/null
+++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/PipelineUtils.java
@@ -0,0 +1,248 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.firestore;
+
+import static com.google.cloud.firestore.pipeline.expressions.AggregateFunction.countAll;
+import static com.google.cloud.firestore.pipeline.expressions.Expression.and;
+import static com.google.cloud.firestore.pipeline.expressions.Expression.arrayContainsAny;
+import static com.google.cloud.firestore.pipeline.expressions.Expression.field;
+import static com.google.cloud.firestore.pipeline.expressions.Expression.not;
+import static com.google.cloud.firestore.pipeline.expressions.Expression.nullValue;
+import static com.google.cloud.firestore.pipeline.expressions.Expression.or;
+import static com.google.cloud.firestore.pipeline.expressions.FunctionUtils.aggregateFunctionToValue;
+import static com.google.cloud.firestore.pipeline.expressions.FunctionUtils.exprToValue;
+
+import com.google.api.core.InternalApi;
+import com.google.cloud.firestore.Query.ComparisonFilterInternal;
+import com.google.cloud.firestore.Query.CompositeFilterInternal;
+import com.google.cloud.firestore.Query.FilterInternal;
+import com.google.cloud.firestore.Query.LimitType;
+import com.google.cloud.firestore.Query.UnaryFilterInternal;
+import com.google.cloud.firestore.pipeline.expressions.AggregateFunction;
+import com.google.cloud.firestore.pipeline.expressions.AliasedAggregate;
+import com.google.cloud.firestore.pipeline.expressions.AliasedExpression;
+import com.google.cloud.firestore.pipeline.expressions.BooleanExpression;
+import com.google.cloud.firestore.pipeline.expressions.Expression;
+import com.google.cloud.firestore.pipeline.expressions.Field;
+import com.google.cloud.firestore.pipeline.expressions.Selectable;
+import com.google.common.collect.Lists;
+import com.google.firestore.v1.Cursor;
+import com.google.firestore.v1.MapValue;
+import com.google.firestore.v1.Value;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+@InternalApi
+public class PipelineUtils {
+ @InternalApi
+ public static Value encodeValue(Object value) {
+ return UserDataConverter.encodeValue(FieldPath.empty(), value, UserDataConverter.ARGUMENT);
+ }
+
+ @InternalApi
+ public static Value encodeValue(Expression value) {
+ return exprToValue(value);
+ }
+
+ @InternalApi
+ public static Value encodeValue(AggregateFunction value) {
+ return aggregateFunctionToValue(value);
+ }
+
+ @InternalApi
+ public static Value encodeValue(String value) {
+ return Value.newBuilder().setStringValue(value).build();
+ }
+
+ @InternalApi
+ public static Value encodeValue(boolean value) {
+ return Value.newBuilder().setBooleanValue(value).build();
+ }
+
+ @InternalApi
+ public static Value encodeValue(long value) {
+ return Value.newBuilder().setIntegerValue(value).build();
+ }
+
+ @InternalApi
+ public static Value encodeValue(double value) {
+ return Value.newBuilder().setDoubleValue(value).build();
+ }
+
+ @InternalApi
+ public static Value encodeValue(Map options) {
+ return Value.newBuilder()
+ .setMapValue(MapValue.newBuilder().putAllFields(options).build())
+ .build();
+ }
+
+ @InternalApi
+ static BooleanExpression toPipelineBooleanExpr(FilterInternal f) {
+ if (f instanceof ComparisonFilterInternal) {
+ ComparisonFilterInternal comparisonFilter = (ComparisonFilterInternal) f;
+ Field field = Field.ofServerPath(comparisonFilter.fieldReference.getFieldPath());
+ Value value = comparisonFilter.value;
+ switch (comparisonFilter.operator) {
+ case LESS_THAN:
+ return and(field.exists(), field.lessThan(value));
+ case LESS_THAN_OR_EQUAL:
+ return and(field.exists(), field.lessThanOrEqual(value));
+ case GREATER_THAN:
+ return and(field.exists(), field.greaterThan(value));
+ case GREATER_THAN_OR_EQUAL:
+ return and(field.exists(), field.greaterThanOrEqual(value));
+ case EQUAL:
+ return and(field.exists(), field.equal(value));
+ case NOT_EQUAL:
+ return and(field.exists(), field.notEqual(value));
+ case ARRAY_CONTAINS:
+ return and(field.exists(), field.arrayContains(value));
+ case IN:
+ List valuesList = value.getArrayValue().getValuesList();
+ return and(field.exists(), Expression.equalAny(field, Lists.newArrayList(valuesList)));
+ case ARRAY_CONTAINS_ANY:
+ List valuesListAny = value.getArrayValue().getValuesList();
+ return and(field.exists(), arrayContainsAny(field, Lists.newArrayList(valuesListAny)));
+ case NOT_IN:
+ List notInValues = value.getArrayValue().getValuesList();
+ return and(
+ field.exists(), not(Expression.equalAny(field, Lists.newArrayList(notInValues))));
+ default:
+ // Handle OPERATOR_UNSPECIFIED and UNRECOGNIZED cases as needed
+ throw new IllegalArgumentException("Unsupported operator: " + comparisonFilter.operator);
+ }
+ } else if (f instanceof CompositeFilterInternal) {
+ CompositeFilterInternal compositeFilter = (CompositeFilterInternal) f;
+ switch (compositeFilter.getOperator()) {
+ case AND:
+ List conditions =
+ compositeFilter.getFilters().stream()
+ .map(PipelineUtils::toPipelineBooleanExpr)
+ .collect(Collectors.toList());
+ return and(
+ conditions.get(0),
+ conditions.subList(1, conditions.size()).toArray(new BooleanExpression[0]));
+ case OR:
+ List orConditions =
+ compositeFilter.getFilters().stream()
+ .map(PipelineUtils::toPipelineBooleanExpr)
+ .collect(Collectors.toList());
+ return or(
+ orConditions.get(0),
+ orConditions.subList(1, orConditions.size()).toArray(new BooleanExpression[0]));
+ default:
+ // Handle OPERATOR_UNSPECIFIED and UNRECOGNIZED cases as needed
+ throw new IllegalArgumentException(
+ "Unsupported operator: " + compositeFilter.getOperator());
+ }
+ } else if (f instanceof UnaryFilterInternal) {
+ UnaryFilterInternal unaryFilter = (UnaryFilterInternal) f;
+ Field field = Field.ofServerPath(unaryFilter.fieldReference.getFieldPath());
+ switch (unaryFilter.getOperator()) {
+ case IS_NAN:
+ return and(field.exists(), field.equal(Double.NaN));
+ case IS_NULL:
+ return and(field.exists(), field.equal(nullValue()));
+ case IS_NOT_NAN:
+ return and(field.exists(), field.notEqual(Double.NaN));
+ case IS_NOT_NULL:
+ return and(field.exists(), field.notEqual(nullValue()));
+ default:
+ // Handle OPERATOR_UNSPECIFIED and UNRECOGNIZED cases as needed
+ throw new IllegalArgumentException("Unsupported operator: " + unaryFilter.getOperator());
+ }
+ } else {
+ // Handle other FilterInternal types as needed
+ throw new IllegalArgumentException("Unsupported filter type: " + f.getClass().getName());
+ }
+ }
+
+ @InternalApi
+ static Pipeline toPaginatedPipeline(
+ Pipeline pipeline,
+ Cursor start,
+ Cursor end,
+ Integer limit,
+ LimitType limitType,
+ Integer offset) {
+ throw new UnsupportedOperationException(
+ "Converting to pagination pipeline is not support yet.");
+ }
+
+ @InternalApi
+ static AliasedAggregate toPipelineAggregatorTarget(AggregateField f) {
+ String operator = f.getOperator();
+ String fieldPath = f.getFieldPath();
+
+ switch (operator) {
+ case "sum":
+ return Field.ofServerPath(fieldPath).sum().as(f.getAlias());
+
+ case "count":
+ return countAll().as(f.getAlias());
+ case "average":
+ return Field.ofServerPath(fieldPath).average().as(f.getAlias());
+ default:
+ // Handle the 'else' case appropriately in your Java code
+ throw new IllegalArgumentException("Unsupported operator: " + operator);
+ }
+ }
+
+ @InternalApi
+ static BooleanExpression toPipelineExistsExpr(AggregateField f) {
+ String fieldPath = f.getFieldPath();
+
+ if (fieldPath.isEmpty()) {
+ return null;
+ }
+ return Field.ofServerPath(fieldPath).exists();
+ }
+
+ @InternalApi
+ public static Map selectablesToMap(Selectable... selectables) {
+ Map projMap = new HashMap<>();
+ for (Selectable proj : selectables) {
+ if (proj instanceof Field) {
+ Field fieldProj = (Field) proj;
+ if (projMap.containsKey(fieldProj.getPath().getEncodedPath())) {
+ throw new IllegalArgumentException(
+ "Duplicate alias or field name: " + fieldProj.getPath().getEncodedPath());
+ }
+ projMap.put(fieldProj.getPath().getEncodedPath(), fieldProj);
+ } else if (proj instanceof AliasedExpression) {
+ AliasedExpression aliasedExpr = (AliasedExpression) proj;
+ if (projMap.containsKey(aliasedExpr.getAlias())) {
+ throw new IllegalArgumentException(
+ "Duplicate alias or field name: " + aliasedExpr.getAlias());
+ }
+ projMap.put(aliasedExpr.getAlias(), aliasedExpr.getExpr());
+ }
+ }
+ return projMap;
+ }
+
+ @InternalApi
+ public static Map fieldNamesToMap(String... fields) {
+ Map projMap = new HashMap<>();
+ for (String field : fields) {
+ projMap.put(field, field(field));
+ }
+ return projMap;
+ }
+}
diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Query.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Query.java
index 2bf9e98f9..295a99d95 100644
--- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Query.java
+++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Query.java
@@ -16,6 +16,10 @@
package com.google.cloud.firestore;
+import static com.google.cloud.firestore.PipelineUtils.toPipelineBooleanExpr;
+import static com.google.cloud.firestore.pipeline.expressions.Expression.and;
+import static com.google.cloud.firestore.pipeline.expressions.Expression.or;
+import static com.google.cloud.firestore.telemetry.TraceUtil.*;
import static com.google.firestore.v1.StructuredQuery.FieldFilter.Operator.ARRAY_CONTAINS;
import static com.google.firestore.v1.StructuredQuery.FieldFilter.Operator.ARRAY_CONTAINS_ANY;
import static com.google.firestore.v1.StructuredQuery.FieldFilter.Operator.EQUAL;
@@ -35,6 +39,11 @@
import com.google.cloud.Timestamp;
import com.google.cloud.firestore.Query.QueryOptions.Builder;
import com.google.cloud.firestore.encoding.CustomClassMapper;
+import com.google.cloud.firestore.pipeline.expressions.BooleanExpression;
+import com.google.cloud.firestore.pipeline.expressions.Expression;
+import com.google.cloud.firestore.pipeline.expressions.Field;
+import com.google.cloud.firestore.pipeline.expressions.Ordering;
+import com.google.cloud.firestore.pipeline.expressions.Selectable;
import com.google.cloud.firestore.telemetry.MetricsUtil.MetricsContext;
import com.google.cloud.firestore.telemetry.TelemetryConstants;
import com.google.common.base.Preconditions;
@@ -51,6 +60,7 @@
import com.google.firestore.v1.StructuredQuery.FieldReference;
import com.google.firestore.v1.StructuredQuery.Filter;
import com.google.firestore.v1.StructuredQuery.Order;
+import com.google.firestore.v1.StructuredQuery.UnaryFilter;
import com.google.firestore.v1.Value;
import com.google.protobuf.ByteString;
import com.google.protobuf.Int32Value;
@@ -161,6 +171,11 @@ public List getFilters() {
return filters;
}
+ @Nonnull
+ CompositeFilter.Operator getOperator() {
+ return this.operator;
+ }
+
@Nullable
@Override
public FieldReference getFirstInequalityField() {
@@ -226,7 +241,7 @@ public List getFlattenedFilters() {
}
}
- private static class UnaryFilterInternal extends FieldFilterInternal {
+ static class UnaryFilterInternal extends FieldFilterInternal {
private final StructuredQuery.UnaryFilter.Operator operator;
@@ -253,6 +268,11 @@ Filter toProto() {
return result.build();
}
+ @Nonnull
+ UnaryFilter.Operator getOperator() {
+ return this.operator;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -2003,6 +2023,144 @@ public VectorQuery findNearest(
this, vectorField, queryVector, limit, distanceMeasure, vectorQueryOptions);
}
+ Pipeline pipeline() {
+ // From
+ Pipeline ppl =
+ this.options.getAllDescendants()
+ ? new PipelineSource(this.rpcContext).collectionGroup(this.options.getCollectionId())
+ : new PipelineSource(this.rpcContext)
+ .collection(
+ this.options.getParentPath().append(this.options.getCollectionId()).getPath());
+
+ // Filters
+ for (FilterInternal f : this.options.getFilters()) {
+ ppl = ppl.where(toPipelineBooleanExpr(f));
+ }
+
+ // Projections
+ if (this.options.getFieldProjections() != null
+ && !this.options.getFieldProjections().isEmpty()) {
+ ppl =
+ ppl.select(
+ this.options.getFieldProjections().stream()
+ .map(fieldReference -> Field.ofServerPath(fieldReference.getFieldPath()))
+ .toArray(Selectable[]::new));
+ }
+
+ // Orders
+ List normalizedOrderBy = createImplicitOrderBy();
+ int size = normalizedOrderBy.size();
+ List fields = new ArrayList<>(size);
+ List orderings = new ArrayList<>(size);
+ for (FieldOrder order : normalizedOrderBy) {
+ Field field = Field.ofServerPath(order.fieldReference.getFieldPath());
+ fields.add(field);
+ if (order.direction == Direction.ASCENDING) {
+ orderings.add(field.ascending());
+ } else {
+ orderings.add(field.descending());
+ }
+ }
+
+ if (fields.size() == 1) {
+ ppl = ppl.where(fields.get(0).exists());
+ } else {
+ ppl =
+ ppl.where(
+ and(
+ fields.get(0).exists(),
+ fields.subList(1, fields.size()).stream()
+ .map((Field field) -> field.exists())
+ .toArray(BooleanExpression[]::new)));
+ }
+
+ // Cursors, Limit, Offset
+ if (this.options.getStartCursor() != null) {
+ ppl = ppl.where(whereConditionsFromCursor(options.getStartCursor(), orderings, true));
+ }
+
+ if (this.options.getEndCursor() != null) {
+ ppl = ppl.where(whereConditionsFromCursor(options.getEndCursor(), orderings, false));
+ }
+
+ if (options.getLimit() != null) {
+ // TODO: Handle situation where user enters limit larger than integer.
+ if (options.getLimitType() == LimitType.First) {
+ ppl = ppl.sort(orderings.toArray(new Ordering[0]));
+ ppl = ppl.limit(options.getLimit());
+ } else {
+ if (options.getFieldOrders().isEmpty()) {
+ throw new IllegalStateException(
+ "limitToLast() queries require specifying at least one orderBy() clause");
+ }
+
+ List reversedOrderings = new ArrayList<>();
+ for (Ordering ordering : orderings) {
+ reversedOrderings.add(reverseOrdering(ordering));
+ }
+ ppl = ppl.sort(reversedOrderings.toArray(new Ordering[0]));
+ ppl = ppl.limit(options.getLimit());
+ ppl = ppl.sort(orderings.toArray(new Ordering[0]));
+ }
+ } else {
+ ppl = ppl.sort(orderings.toArray(new Ordering[0]));
+ }
+
+ return ppl;
+ }
+
+ private static Ordering reverseOrdering(Ordering ordering) {
+ if (ordering.getDir() == Ordering.Direction.ASCENDING) {
+ return ordering.getExpr().descending();
+ } else {
+ return ordering.getExpr().ascending();
+ }
+ }
+
+ private static BooleanExpression getCursorExclusiveCondition(
+ boolean isStart, Ordering ordering, Value value) {
+ if (isStart && ordering.getDir() == Ordering.Direction.ASCENDING
+ || !isStart && ordering.getDir() == Ordering.Direction.DESCENDING) {
+ return ordering.getExpr().greaterThan(value);
+ } else {
+ return ordering.getExpr().lessThan(value);
+ }
+ }
+
+ private static BooleanExpression whereConditionsFromCursor(
+ Cursor bound, List orderings, boolean isStart) {
+ List boundPosition = bound.getValuesList();
+ int size = boundPosition.size();
+ if (size > orderings.size()) {
+ throw new IllegalArgumentException("Bound positions must not exceed order fields.");
+ }
+
+ int last = size - 1;
+ BooleanExpression condition =
+ getCursorExclusiveCondition(isStart, orderings.get(last), boundPosition.get(last));
+ if (isBoundInclusive(bound, isStart)) {
+ condition =
+ or(condition, Expression.equal(orderings.get(last).getExpr(), boundPosition.get(last)));
+ }
+ for (int i = size - 2; i >= 0; i--) {
+ final Ordering ordering = orderings.get(i);
+ final Value value = boundPosition.get(i);
+ condition =
+ or(
+ getCursorExclusiveCondition(isStart, ordering, value),
+ and(ordering.getExpr().equal(value), condition));
+ }
+ return condition;
+ }
+
+ private static boolean isBoundInclusive(Cursor bound, boolean isStart) {
+ if (isStart) {
+ return bound.getBefore();
+ } else {
+ return !bound.getBefore();
+ }
+ }
+
/**
* Returns true if this Query is equal to the provided object.
*
diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/ReadTimeTransaction.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/ReadTimeTransaction.java
index 8780e9c63..4c30caa4d 100644
--- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/ReadTimeTransaction.java
+++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/ReadTimeTransaction.java
@@ -18,6 +18,7 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
+import com.google.cloud.firestore.pipeline.stages.PipelineExecuteOptions;
import com.google.cloud.firestore.telemetry.TelemetryConstants;
import com.google.cloud.firestore.telemetry.TraceUtil;
import com.google.common.base.Preconditions;
@@ -129,6 +130,21 @@ public ApiFuture get(@Nonnull AggregateQuery query) {
}
}
+ @Nonnull
+ @Override
+ public ApiFuture execute(@Nonnull Pipeline pipeline) {
+ return execute(pipeline, new PipelineExecuteOptions());
+ }
+
+ @Nonnull
+ @Override
+ public ApiFuture execute(
+ @Nonnull Pipeline pipeline, @Nonnull PipelineExecuteOptions options) {
+ try (TraceUtil.Scope ignored = transactionTraceContext.makeCurrent()) {
+ return pipeline.execute(options, null, readTime);
+ }
+ }
+
@Nonnull
@Override
public Transaction create(
diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/ServerSideTransaction.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/ServerSideTransaction.java
index 40dd64ce8..1c89b2461 100644
--- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/ServerSideTransaction.java
+++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/ServerSideTransaction.java
@@ -19,6 +19,7 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.firestore.TransactionOptions.TransactionOptionsType;
+import com.google.cloud.firestore.pipeline.stages.PipelineExecuteOptions;
import com.google.cloud.firestore.telemetry.TelemetryConstants;
import com.google.cloud.firestore.telemetry.TraceUtil;
import com.google.common.base.Preconditions;
@@ -266,4 +267,19 @@ public ApiFuture get(@Nonnull AggregateQuery query) {
return query.get(transactionId, null);
}
}
+
+ @Nonnull
+ @Override
+ public ApiFuture execute(@Nonnull Pipeline pipeline) {
+ return execute(pipeline, new PipelineExecuteOptions());
+ }
+
+ @Nonnull
+ @Override
+ public ApiFuture execute(
+ @Nonnull Pipeline pipeline, @Nonnull PipelineExecuteOptions options) {
+ try (TraceUtil.Scope ignored = transactionTraceContext.makeCurrent()) {
+ return pipeline.execute(new PipelineExecuteOptions(), transactionId, null);
+ }
+ }
}
diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Transaction.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Transaction.java
index ff1b7e2d6..7404c4e69 100644
--- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Transaction.java
+++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Transaction.java
@@ -17,7 +17,9 @@
package com.google.cloud.firestore;
import com.google.api.core.ApiFuture;
+import com.google.api.core.BetaApi;
import com.google.api.core.InternalExtensionOnly;
+import com.google.cloud.firestore.pipeline.stages.PipelineExecuteOptions;
import com.google.cloud.firestore.telemetry.MetricsUtil;
import com.google.cloud.firestore.telemetry.TraceUtil;
import com.google.cloud.firestore.telemetry.TraceUtil.Context;
@@ -141,4 +143,19 @@ public abstract ApiFuture> getAll(
*/
@Nonnull
public abstract ApiFuture get(@Nonnull AggregateQuery query);
+
+ /**
+ * @return The result of the aggregation.
+ */
+ @Nonnull
+ @BetaApi
+ public abstract ApiFuture execute(@Nonnull Pipeline pipeline);
+
+ /**
+ * @return The result of the aggregation.
+ */
+ @Nonnull
+ @BetaApi
+ public abstract ApiFuture execute(
+ @Nonnull Pipeline pipeline, @Nonnull PipelineExecuteOptions options);
}
diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/UserDataConverter.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/UserDataConverter.java
index 45f2a6627..23673c348 100644
--- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/UserDataConverter.java
+++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/UserDataConverter.java
@@ -16,7 +16,12 @@
package com.google.cloud.firestore;
+import static com.google.cloud.firestore.pipeline.expressions.FunctionUtils.aggregateFunctionToValue;
+import static com.google.cloud.firestore.pipeline.expressions.FunctionUtils.exprToValue;
+
import com.google.cloud.Timestamp;
+import com.google.cloud.firestore.pipeline.expressions.AggregateFunction;
+import com.google.cloud.firestore.pipeline.expressions.Expression;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -38,6 +43,9 @@
/** Converts user input into the Firestore Value representation. */
class UserDataConverter {
+
+ static final Value NULL_VALUE = Value.newBuilder().setNullValue(NullValue.NULL_VALUE).build();
+
private static final Logger LOGGER = Logger.getLogger(UserDataConverter.class.getName());
/** Controls the behavior for field deletes. */
@@ -117,8 +125,9 @@ static Value encodeValue(
+ " as an argument at field '%s'.",
path);
return null;
+
} else if (sanitizedObject == null) {
- return Value.newBuilder().setNullValue(NullValue.NULL_VALUE).build();
+ return NULL_VALUE;
} else if (sanitizedObject instanceof String) {
return Value.newBuilder().setStringValue((String) sanitizedObject).build();
} else if (sanitizedObject instanceof Integer) {
@@ -159,6 +168,10 @@ static Value encodeValue(
} else if (sanitizedObject instanceof Blob) {
Blob blob = (Blob) sanitizedObject;
return Value.newBuilder().setBytesValue(blob.toByteString()).build();
+ } else if (sanitizedObject instanceof Expression) {
+ return exprToValue((Expression) sanitizedObject);
+ } else if (sanitizedObject instanceof AggregateFunction) {
+ return aggregateFunctionToValue((AggregateFunction) sanitizedObject);
} else if (sanitizedObject instanceof Value) {
return (Value) sanitizedObject;
} else if (sanitizedObject instanceof DocumentReference) {
diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/pipeline/expressions/AggregateFunction.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/pipeline/expressions/AggregateFunction.java
new file mode 100644
index 000000000..04e2c73f2
--- /dev/null
+++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/pipeline/expressions/AggregateFunction.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.firestore.pipeline.expressions;
+
+import com.google.api.core.BetaApi;
+import com.google.common.collect.ImmutableList;
+import com.google.firestore.v1.Value;
+import java.util.stream.Collectors;
+
+@BetaApi
+public class AggregateFunction {
+ private final String name;
+ private final ImmutableList params;
+
+ private AggregateFunction(String name, Expression... params) {
+ this.name = name;
+ this.params = ImmutableList.copyOf(params);
+ }
+
+ private AggregateFunction(String name, String fieldName) {
+ this(name, Expression.field(fieldName));
+ }
+
+ @BetaApi
+ public static AggregateFunction generic(String name, Expression... expr) {
+ return new AggregateFunction(name, expr);
+ }
+
+ @BetaApi
+ public static AggregateFunction countAll() {
+ return new AggregateFunction("count");
+ }
+
+ @BetaApi
+ public static AggregateFunction count(String fieldName) {
+ return new AggregateFunction("count", fieldName);
+ }
+
+ @BetaApi
+ public static AggregateFunction count(Expression expression) {
+ return new AggregateFunction("count", expression);
+ }
+
+ @BetaApi
+ public static AggregateFunction countDistinct(String fieldName) {
+ return new AggregateFunction("count_distinct", fieldName);
+ }
+
+ @BetaApi
+ public static AggregateFunction countDistinct(Expression expression) {
+ return new AggregateFunction("count_distinct", expression);
+ }
+
+ @BetaApi
+ public static AggregateFunction countIf(BooleanExpression condition) {
+ return new AggregateFunction("count_if", condition);
+ }
+
+ @BetaApi
+ public static AggregateFunction sum(String fieldName) {
+ return new AggregateFunction("sum", fieldName);
+ }
+
+ @BetaApi
+ public static AggregateFunction sum(Expression expression) {
+ return new AggregateFunction("sum", expression);
+ }
+
+ @BetaApi
+ public static AggregateFunction average(String fieldName) {
+ return new AggregateFunction("average", fieldName);
+ }
+
+ @BetaApi
+ public static AggregateFunction average(Expression expression) {
+ return new AggregateFunction("average", expression);
+ }
+
+ @BetaApi
+ public static AggregateFunction minimum(String fieldName) {
+ return new AggregateFunction("minimum", fieldName);
+ }
+
+ @BetaApi
+ public static AggregateFunction minimum(Expression expression) {
+ return new AggregateFunction("minimum", expression);
+ }
+
+ @BetaApi
+ public static AggregateFunction maximum(String fieldName) {
+ return new AggregateFunction("maximum", fieldName);
+ }
+
+ @BetaApi
+ public static AggregateFunction maximum(Expression expression) {
+ return new AggregateFunction("maximum", expression);
+ }
+
+ @BetaApi
+ public AliasedAggregate as(String alias) {
+ return new AliasedAggregate(alias, this);
+ }
+
+ Value toProto() {
+ return Value.newBuilder()
+ .setFunctionValue(
+ com.google.firestore.v1.Function.newBuilder()
+ .setName(this.name)
+ .addAllArgs(
+ this.params.stream()
+ .map(FunctionUtils::exprToValue)
+ .collect(Collectors.toList())))
+ .build();
+ }
+}
diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/pipeline/expressions/AliasedAggregate.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/pipeline/expressions/AliasedAggregate.java
new file mode 100644
index 000000000..573785972
--- /dev/null
+++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/pipeline/expressions/AliasedAggregate.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.firestore.pipeline.expressions;
+
+import com.google.api.core.BetaApi;
+
+@BetaApi
+public class AliasedAggregate {
+ private final String alias;
+ private final AggregateFunction expr;
+
+ AliasedAggregate(String alias, AggregateFunction expr) {
+ this.alias = alias;
+ this.expr = expr;
+ }
+
+ public String getAlias() {
+ return alias;
+ }
+
+ public AggregateFunction getExpr() {
+ return expr;
+ }
+}
diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/pipeline/expressions/AliasedExpression.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/pipeline/expressions/AliasedExpression.java
new file mode 100644
index 000000000..f5ab70f21
--- /dev/null
+++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/pipeline/expressions/AliasedExpression.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.firestore.pipeline.expressions;
+
+import com.google.api.core.InternalApi;
+import com.google.firestore.v1.Value;
+
+@InternalApi
+public final class AliasedExpression implements Selectable {
+
+ private final String alias;
+ private final Expression expr;
+
+ @InternalApi
+ AliasedExpression(Expression expr, String alias) {
+ this.expr = expr;
+ this.alias = alias;
+ }
+
+ @InternalApi
+ public String getAlias() {
+ return alias;
+ }
+
+ @InternalApi
+ public Expression getExpr() {
+ return expr;
+ }
+
+ public Selectable as(String alias) {
+ return new AliasedExpression(this.expr, alias);
+ }
+
+ Value toProto() {
+ return expr.toProto();
+ }
+}
diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/pipeline/expressions/BooleanExpression.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/pipeline/expressions/BooleanExpression.java
new file mode 100644
index 000000000..1a674631d
--- /dev/null
+++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/pipeline/expressions/BooleanExpression.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.firestore.pipeline.expressions;
+
+import com.google.api.core.BetaApi;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+@BetaApi
+public class BooleanExpression extends FunctionExpression {
+ BooleanExpression(String name, Expression... params) {
+ super(name, Lists.newArrayList(params));
+ }
+
+ BooleanExpression(String name, ImmutableList params) {
+ super(name, params);
+ }
+
+ /**
+ * Creates a conditional expression that evaluates to a {@code thenExpr} expression if this
+ * condition is true or an {@code elseExpr} expression if the condition is false.
+ *
+ * @param thenExpr The expression to evaluate if the condition is true.
+ * @param elseExpr The expression to evaluate if the condition is false.
+ * @return A new {@link Expression} representing the conditional operation.
+ */
+ @BetaApi
+ public final Expression conditional(Expression thenExpr, Expression elseExpr) {
+ return conditional((BooleanExpression) this, thenExpr, elseExpr);
+ }
+
+ /**
+ * Creates a conditional expression that evaluates to a {@code thenValue} if this condition is
+ * true or an {@code elseValue} if the condition is false.
+ *
+ * @param thenValue Value if the condition is true.
+ * @param elseValue Value if the condition is false.
+ * @return A new {@link Expression} representing the conditional operation.
+ */
+ @BetaApi
+ public final Expression conditional(Object thenValue, Object elseValue) {
+ return conditional((BooleanExpression) this, thenValue, elseValue);
+ }
+
+ /**
+ * Creates an expression that returns the {@code catchExpr} argument if there is an error, else
+ * return the result of this expression.
+ *
+ * @param catchExpr The catch expression that will be evaluated and returned if the this
+ * expression produces an error.
+ * @return A new {@link Expression} representing the ifError operation.
+ */
+ @BetaApi
+ public final BooleanExpression ifError(BooleanExpression catchExpr) {
+ return ifError(this, catchExpr);
+ }
+
+ /**
+ * Creates an expression that negates this boolean expression.
+ *
+ * @return A new {@link BooleanExpression} representing the not operation.
+ */
+ @BetaApi
+ public final BooleanExpression not() {
+ return not(this);
+ }
+
+ /**
+ * Creates a 'raw' boolean function expression. This is useful if the expression is available in
+ * the backend, but not yet in the current version of the SDK yet.
+ *
+ * @param name The name of the raw function.
+ * @param params The expressions to be passed as arguments to the function.
+ * @return A new [BooleanExpression] representing the raw function.
+ */
+ public static BooleanExpression rawFunction(String name, Expression... params) {
+ return new BooleanExpression(name, params);
+ }
+}
diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/pipeline/expressions/Constant.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/pipeline/expressions/Constant.java
new file mode 100644
index 000000000..94a5a73ad
--- /dev/null
+++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/pipeline/expressions/Constant.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.firestore.pipeline.expressions;
+
+import static com.google.cloud.firestore.PipelineUtils.encodeValue;
+
+import com.google.api.core.BetaApi;
+import com.google.firestore.v1.Value;
+
+@BetaApi
+final class Constant extends Expression {
+
+ static final Constant NULL = new Constant(null);
+
+ private final Object value;
+
+ Constant(Object value) {
+ this.value = value;
+ }
+
+ @Override
+ Value toProto() {
+ return encodeValue(value);
+ }
+}
diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/pipeline/expressions/Expression.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/pipeline/expressions/Expression.java
new file mode 100644
index 000000000..3142ab95b
--- /dev/null
+++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/pipeline/expressions/Expression.java
@@ -0,0 +1,4663 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.firestore.pipeline.expressions;
+
+import com.google.api.core.BetaApi;
+import com.google.api.core.InternalApi;
+import com.google.cloud.Timestamp;
+import com.google.cloud.firestore.Blob;
+import com.google.cloud.firestore.DocumentReference;
+import com.google.cloud.firestore.FieldPath;
+import com.google.cloud.firestore.FieldValue;
+import com.google.cloud.firestore.GeoPoint;
+import com.google.cloud.firestore.VectorValue;
+import com.google.common.collect.ImmutableList;
+import com.google.firestore.v1.Value;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents an expression that can be evaluated to a value within the execution of a {@link
+ * com.google.cloud.firestore.Pipeline}.
+ *
+ *
Expressions are the building blocks for creating complex queries and transformations in
+ * Firestore pipelines. They can represent:
+ *
+ *
+ *
**Field references:** Access values from document fields.
+ *
**Function calls:** Apply functions to one or more expressions.
+ *
+ *
+ *
The `Expression` class provides a fluent API for building expressions. You can chain together
+ * method calls to create complex expressions.
+ */
+@BetaApi
+public abstract class Expression {
+
+ /** Constructor is package-private to prevent extension. */
+ Expression() {}
+
+ private static Expression toExprOrConstant(Object o) {
+ return o instanceof Expression ? (Expression) o : new Constant(o);
+ }
+
+ private static ImmutableList toArrayOfExprOrConstant(Object... others) {
+ return Arrays.stream(others)
+ .map(Expression::toExprOrConstant)
+ .collect(ImmutableList.toImmutableList());
+ }
+
+ @InternalApi
+ abstract Value toProto();
+
+ // Constants
+ /**
+ * Create a constant for a {@link String} value.
+ *
+ * @param value The {@link String} value.
+ * @return A new {@link Expression} constant instance.
+ */
+ @BetaApi
+ public static Expression constant(String value) {
+ return new Constant(value);
+ }
+
+ /**
+ * Create a constant for a {@link Number} value.
+ *
+ * @param value The {@link Number} value.
+ * @return A new {@link Expression} constant instance.
+ */
+ @BetaApi
+ public static Expression constant(Number value) {
+ return new Constant(value);
+ }
+
+ /**
+ * Create a constant for a {@link Date} value.
+ *
+ * @param value The {@link Date} value.
+ * @return A new {@link Expression} constant instance.
+ */
+ @BetaApi
+ public static Expression constant(Date value) {
+ return new Constant(value);
+ }
+
+ /**
+ * Create a constant for a {@link Timestamp} value.
+ *
+ * @param value The {@link Timestamp} value.
+ * @return A new {@link Expression} constant instance.
+ */
+ @BetaApi
+ public static Expression constant(Timestamp value) {
+ return new Constant(value);
+ }
+
+ /**
+ * Create a constant for a {@link Boolean} value.
+ *
+ * @param value The {@link Boolean} value.
+ * @return A new {@link BooleanExpression} constant instance.
+ */
+ @BetaApi
+ public static BooleanExpression constant(Boolean value) {
+ return equal(new Constant(value), true);
+ }
+
+ /**
+ * Create a constant for a {@link GeoPoint} value.
+ *
+ * @param value The {@link GeoPoint} value.
+ * @return A new {@link Expression} constant instance.
+ */
+ @BetaApi
+ public static Expression constant(GeoPoint value) {
+ return new Constant(value);
+ }
+
+ /**
+ * Create a constant for a {@link Blob} value.
+ *
+ * @param value The {@link Blob} value.
+ * @return A new {@link Expression} constant instance.
+ */
+ @BetaApi
+ public static Expression constant(Blob value) {
+ return new Constant(value);
+ }
+
+ /**
+ * Create a constant for a {@link DocumentReference} value.
+ *
+ * @param value The {@link DocumentReference} value.
+ * @return A new {@link Expression} constant instance.
+ */
+ @BetaApi
+ public static Expression constant(DocumentReference value) {
+ return new Constant(value);
+ }
+
+ /**
+ * Create a constant for a bytes value.
+ *
+ * @param value The bytes value.
+ * @return A new {@link Expression} constant instance.
+ */
+ @BetaApi
+ public static Expression constant(byte[] value) {
+ return new Constant(value);
+ }
+
+ /**
+ * Create a constant for a {@link VectorValue} value.
+ *
+ * @param value The {@link VectorValue} value.
+ * @return A new {@link Expression} constant instance.
+ */
+ @BetaApi
+ public static Expression constant(VectorValue value) {
+ return new Constant(value);
+ }
+
+ /**
+ * Constant for a null value.
+ *
+ * @return An {@link Expression} constant instance.
+ */
+ @BetaApi
+ public static Expression nullValue() {
+ return Constant.NULL;
+ }
+
+ /**
+ * Create a vector constant for a {@code double[]} value.
+ *
+ * @param value The {@code double[]} value.
+ * @return An {@link Expression} constant instance.
+ */
+ @BetaApi
+ public static Expression vector(double[] value) {
+ return new Constant(FieldValue.vector(value));
+ }
+
+ /**
+ * Create a vector constant for a {@link VectorValue} value.
+ *
+ * @param value The {@link VectorValue} value.
+ * @return An {@link Expression} constant instance.
+ */
+ @BetaApi
+ public static Expression vector(VectorValue value) {
+ return new Constant(value);
+ }
+
+ // Field Reference
+ /**
+ * Creates a {@link Field} instance representing the field at the given path.
+ *
+ *
The path can be a simple field name (e.g., "name") or a dot-separated path to a nested field
+ * (e.g., "address.city").
+ *
+ * @param path The path to the field.
+ * @return A new {@link Field} instance representing the specified path.
+ */
+ @BetaApi
+ public static Field field(String path) {
+ return Field.ofUserPath(path);
+ }
+
+ /**
+ * Creates a {@link Field} instance representing the field at the given path.
+ *
+ *
The path can be a simple field name (e.g., "name") or a dot-separated path to a nested field
+ * (e.g., "address.city").
+ *
+ * @param fieldPath The {@link FieldPath} to the field.
+ * @return A new {@link Field} instance representing the specified path.
+ */
+ @BetaApi
+ public static Field field(FieldPath fieldPath) {
+ return Field.ofUserPath(fieldPath.toString());
+ }
+
+ /**
+ * Creates an expression that returns the current timestamp.
+ *
+ * @return A new {@link Expression} representing the current timestamp.
+ */
+ @BetaApi
+ public static Expression currentTimestamp() {
+ return new FunctionExpression("current_timestamp", ImmutableList.of());
+ }
+
+ /**
+ * Creates an expression that returns a default value if an expression evaluates to an absent
+ * value.
+ *
+ * @param ifExpr The expression to check.
+ * @param elseExpr The default value.
+ * @return A new {@link Expression} representing the ifAbsent operation.
+ */
+ @BetaApi
+ public static Expression ifAbsent(Expression ifExpr, Expression elseExpr) {
+ return new FunctionExpression("if_absent", ImmutableList.of(ifExpr, elseExpr));
+ }
+
+ /**
+ * Creates an expression that returns a default value if an expression evaluates to an absent
+ * value.
+ *
+ * @param ifExpr The expression to check.
+ * @param elseValue The default value.
+ * @return A new {@link Expression} representing the ifAbsent operation.
+ */
+ @BetaApi
+ public static Expression ifAbsent(Expression ifExpr, Object elseValue) {
+ return ifAbsent(ifExpr, toExprOrConstant(elseValue));
+ }
+
+ /**
+ * Creates an expression that returns a default value if a field is absent.
+ *
+ * @param ifFieldName The field to check.
+ * @param elseExpr The default value.
+ * @return A new {@link Expression} representing the ifAbsent operation.
+ */
+ @BetaApi
+ public static Expression ifAbsent(String ifFieldName, Expression elseExpr) {
+ return ifAbsent(field(ifFieldName), elseExpr);
+ }
+
+ /**
+ * Creates an expression that returns a default value if a field is absent.
+ *
+ * @param ifFieldName The field to check.
+ * @param elseValue The default value.
+ * @return A new {@link Expression} representing the ifAbsent operation.
+ */
+ @BetaApi
+ public static Expression ifAbsent(String ifFieldName, Object elseValue) {
+ return ifAbsent(field(ifFieldName), toExprOrConstant(elseValue));
+ }
+
+ /**
+ * Creates an expression that joins the elements of an array into a string.
+ *
+ * @param arrayExpression The expression representing the array.
+ * @param delimiter The delimiter to use.
+ * @return A new {@link Expression} representing the join operation.
+ */
+ @BetaApi
+ public static Expression join(Expression arrayExpression, String delimiter) {
+ return new FunctionExpression("join", ImmutableList.of(arrayExpression, constant(delimiter)));
+ }
+
+ /**
+ * Creates an expression that joins the elements of an array into a string.
+ *
+ * @param arrayExpression The expression representing the array.
+ * @param delimiterExpression The expression representing the delimiter.
+ * @return A new {@link Expression} representing the join operation.
+ */
+ @BetaApi
+ public static Expression join(Expression arrayExpression, Expression delimiterExpression) {
+ return new FunctionExpression("join", ImmutableList.of(arrayExpression, delimiterExpression));
+ }
+
+ /**
+ * Creates an expression that joins the elements of an array into a string.
+ *
+ * @param arrayFieldName The field name of the array.
+ * @param delimiter The delimiter to use.
+ * @return A new {@link Expression} representing the join operation.
+ */
+ @BetaApi
+ public static Expression join(String arrayFieldName, String delimiter) {
+ return join(field(arrayFieldName), constant(delimiter));
+ }
+
+ /**
+ * Creates an expression that joins the elements of an array into a string.
+ *
+ * @param arrayFieldName The field name of the array.
+ * @param delimiterExpression The expression representing the delimiter.
+ * @return A new {@link Expression} representing the join operation.
+ */
+ @BetaApi
+ public static Expression join(String arrayFieldName, Expression delimiterExpression) {
+ return join(field(arrayFieldName), delimiterExpression);
+ }
+
+ // Generic Function
+ /**
+ * Creates a generic function expression that is not yet implemented.
+ *
+ * @param name The name of the generic function.
+ * @param expr The expressions to be passed as arguments to the function.
+ * @return A new {@link Expression} representing the generic function.
+ */
+ @BetaApi
+ public static Expression generic(String name, Expression... expr) {
+ return new FunctionExpression(name, ImmutableList.copyOf(expr));
+ }
+
+ // Logical Operators
+ /**
+ * Creates an expression that performs a logical 'AND' operation.
+ *
+ * @param condition The first {@link BooleanExpression}.
+ * @param conditions Additional {@link BooleanExpression}s.
+ * @return A new {@link BooleanExpression} representing the logical 'AND' operation.
+ */
+ @BetaApi
+ public static BooleanExpression and(
+ BooleanExpression condition, BooleanExpression... conditions) {
+ ImmutableList.Builder builder = ImmutableList.builder();
+ builder.add(condition);
+ builder.add(conditions);
+ return new BooleanExpression("and", builder.build());
+ }
+
+ /**
+ * Creates an expression that performs a logical 'OR' operation.
+ *
+ * @param condition The first {@link BooleanExpression}.
+ * @param conditions Additional {@link BooleanExpression}s.
+ * @return A new {@link BooleanExpression} representing the logical 'OR' operation.
+ */
+ @BetaApi
+ public static BooleanExpression or(BooleanExpression condition, BooleanExpression... conditions) {
+ ImmutableList.Builder builder = ImmutableList.builder();
+ builder.add(condition);
+ builder.add(conditions);
+ return new BooleanExpression("or", builder.build());
+ }
+
+ /**
+ * Creates an expression that performs a logical 'XOR' operation.
+ *
+ * @param condition The first {@link BooleanExpression}.
+ * @param conditions Additional {@link BooleanExpression}s.
+ * @return A new {@link BooleanExpression} representing the logical 'XOR' operation.
+ */
+ @BetaApi
+ public static BooleanExpression xor(
+ BooleanExpression condition, BooleanExpression... conditions) {
+ ImmutableList.Builder builder = ImmutableList.builder();
+ builder.add(condition);
+ builder.add(conditions);
+ return new BooleanExpression("xor", builder.build());
+ }
+
+ /**
+ * Creates an expression that negates a boolean expression.
+ *
+ * @param condition The boolean expression to negate.
+ * @return A new {@link BooleanExpression} representing the not operation.
+ */
+ @BetaApi
+ public static BooleanExpression not(BooleanExpression condition) {
+ return new BooleanExpression("not", condition);
+ }
+
+ // Arithmetic Operators
+ /**
+ * Creates an expression that adds numeric expressions.
+ *
+ * @param first Numeric expression to add.
+ * @param second Numeric expression to add.
+ * @return A new {@link Expression} representing the addition operation.
+ */
+ @BetaApi
+ public static Expression add(Expression first, Expression second) {
+ return new FunctionExpression("add", ImmutableList.of(first, second));
+ }
+
+ /**
+ * Creates an expression that adds numeric expressions with a constant.
+ *
+ * @param first Numeric expression to add.
+ * @param second Constant to add.
+ * @return A new {@link Expression} representing the addition operation.
+ */
+ @BetaApi
+ public static Expression add(Expression first, Number second) {
+ return add(first, constant(second));
+ }
+
+ /**
+ * Creates an expression that adds a numeric field with a numeric expression.
+ *
+ * @param fieldName Numeric field to add.
+ * @param second Numeric expression to add to field value.
+ * @return A new {@link Expression} representing the addition operation.
+ */
+ @BetaApi
+ public static Expression add(String fieldName, Expression second) {
+ return add(field(fieldName), second);
+ }
+
+ /**
+ * Creates an expression that adds a numeric field with constant.
+ *
+ * @param fieldName Numeric field to add.
+ * @param second Constant to add.
+ * @return A new {@link Expression} representing the addition operation.
+ */
+ @BetaApi
+ public static Expression add(String fieldName, Number second) {
+ return add(field(fieldName), constant(second));
+ }
+
+ /**
+ * Creates an expression that subtracts two expressions.
+ *
+ * @param minuend Numeric expression to subtract from.
+ * @param subtrahend Numeric expression to subtract.
+ * @return A new {@link Expression} representing the subtract operation.
+ */
+ @BetaApi
+ public static Expression subtract(Expression minuend, Expression subtrahend) {
+ return new FunctionExpression("subtract", ImmutableList.of(minuend, subtrahend));
+ }
+
+ /**
+ * Creates an expression that subtracts a constant value from a numeric expression.
+ *
+ * @param minuend Numeric expression to subtract from.
+ * @param subtrahend Constant to subtract.
+ * @return A new {@link Expression} representing the subtract operation.
+ */
+ @BetaApi
+ public static Expression subtract(Expression minuend, Number subtrahend) {
+ return subtract(minuend, constant(subtrahend));
+ }
+
+ /**
+ * Creates an expression that subtracts a numeric expressions from numeric field.
+ *
+ * @param fieldName Numeric field to subtract from.
+ * @param subtrahend Numeric expression to subtract.
+ * @return A new {@link Expression} representing the subtract operation.
+ */
+ @BetaApi
+ public static Expression subtract(String fieldName, Expression subtrahend) {
+ return subtract(field(fieldName), subtrahend);
+ }
+
+ /**
+ * Creates an expression that subtracts a constant from numeric field.
+ *
+ * @param fieldName Numeric field to subtract from.
+ * @param subtrahend Constant to subtract.
+ * @return A new {@link Expression} representing the subtract operation.
+ */
+ @BetaApi
+ public static Expression subtract(String fieldName, Number subtrahend) {
+ return subtract(field(fieldName), constant(subtrahend));
+ }
+
+ /**
+ * Creates an expression that multiplies numeric expressions.
+ *
+ * @param first Numeric expression to multiply.
+ * @param second Numeric expression to multiply.
+ * @return A new {@link Expression} representing the multiplication operation.
+ */
+ @BetaApi
+ public static Expression multiply(Expression first, Expression second) {
+ return new FunctionExpression("multiply", ImmutableList.of(first, second));
+ }
+
+ /**
+ * Creates an expression that multiplies numeric expressions with a constant.
+ *
+ * @param first Numeric expression to multiply.
+ * @param second Constant to multiply.
+ * @return A new {@link Expression} representing the multiplication operation.
+ */
+ @BetaApi
+ public static Expression multiply(Expression first, Number second) {
+ return multiply(first, constant(second));
+ }
+
+ /**
+ * Creates an expression that multiplies a numeric field with a numeric expression.
+ *
+ * @param fieldName Numeric field to multiply.
+ * @param second Numeric expression to multiply.
+ * @return A new {@link Expression} representing the multiplication operation.
+ */
+ @BetaApi
+ public static Expression multiply(String fieldName, Expression second) {
+ return multiply(field(fieldName), second);
+ }
+
+ /**
+ * Creates an expression that multiplies a numeric field with a constant.
+ *
+ * @param fieldName Numeric field to multiply.
+ * @param second Constant to multiply.
+ * @return A new {@link Expression} representing the multiplication operation.
+ */
+ @BetaApi
+ public static Expression multiply(String fieldName, Number second) {
+ return multiply(field(fieldName), constant(second));
+ }
+
+ /**
+ * Creates an expression that divides two numeric expressions.
+ *
+ * @param dividend The numeric expression to be divided.
+ * @param divisor The numeric expression to divide by.
+ * @return A new {@link Expression} representing the division operation.
+ */
+ @BetaApi
+ public static Expression divide(Expression dividend, Expression divisor) {
+ return new FunctionExpression("divide", ImmutableList.of(dividend, divisor));
+ }
+
+ /**
+ * Creates an expression that divides a numeric expression by a constant.
+ *
+ * @param dividend The numeric expression to be divided.
+ * @param divisor The constant to divide by.
+ * @return A new {@link Expression} representing the division operation.
+ */
+ @BetaApi
+ public static Expression divide(Expression dividend, Number divisor) {
+ return divide(dividend, constant(divisor));
+ }
+
+ /**
+ * Creates an expression that divides numeric field by a numeric expression.
+ *
+ * @param fieldName The numeric field name to be divided.
+ * @param divisor The numeric expression to divide by.
+ * @return A new {@link Expression} representing the divide operation.
+ */
+ @BetaApi
+ public static Expression divide(String fieldName, Expression divisor) {
+ return divide(field(fieldName), divisor);
+ }
+
+ /**
+ * Creates an expression that divides a numeric field by a constant.
+ *
+ * @param fieldName The numeric field name to be divided.
+ * @param divisor The constant to divide by.
+ * @return A new {@link Expression} representing the divide operation.
+ */
+ @BetaApi
+ public static Expression divide(String fieldName, Number divisor) {
+ return divide(field(fieldName), constant(divisor));
+ }
+
+ /**
+ * Creates an expression that calculates the modulo (remainder) of dividing two numeric
+ * expressions.
+ *
+ * @param dividend The numeric expression to be divided.
+ * @param divisor The numeric expression to divide by.
+ * @return A new {@link Expression} representing the modulo operation.
+ */
+ @BetaApi
+ public static Expression mod(Expression dividend, Expression divisor) {
+ return new FunctionExpression("mod", ImmutableList.of(dividend, divisor));
+ }
+
+ /**
+ * Creates an expression that calculates the modulo (remainder) of dividing a numeric expression
+ * by a constant.
+ *
+ * @param dividend The numeric expression to be divided.
+ * @param divisor The constant to divide by.
+ * @return A new {@link Expression} representing the modulo operation.
+ */
+ @BetaApi
+ public static Expression mod(Expression dividend, Number divisor) {
+ return mod(dividend, constant(divisor));
+ }
+
+ /**
+ * Creates an expression that calculates the modulo (remainder) of dividing a numeric field by a
+ * constant.
+ *
+ * @param fieldName The numeric field name to be divided.
+ * @param divisor The numeric expression to divide by.
+ * @return A new {@link Expression} representing the modulo operation.
+ */
+ @BetaApi
+ public static Expression mod(String fieldName, Expression divisor) {
+ return mod(field(fieldName), divisor);
+ }
+
+ /**
+ * Creates an expression that calculates the modulo (remainder) of dividing a numeric field by a
+ * constant.
+ *
+ * @param fieldName The numeric field name to be divided.
+ * @param divisor The constant to divide by.
+ * @return A new {@link Expression} representing the modulo operation.
+ */
+ @BetaApi
+ public static Expression mod(String fieldName, Number divisor) {
+ return mod(field(fieldName), constant(divisor));
+ }
+
+ // Comparison Operators
+ /**
+ * Creates an expression that checks if two expressions are equal.
+ *
+ * @param left The first expression.
+ * @param right The second expression.
+ * @return A new {@link BooleanExpression} representing the equality comparison.
+ */
+ @BetaApi
+ public static BooleanExpression equal(Expression left, Expression right) {
+ return new BooleanExpression("equal", left, right);
+ }
+
+ /**
+ * Creates an expression that checks if an expression is equal to a constant value.
+ *
+ * @param left The expression.
+ * @param right The constant value.
+ * @return A new {@link BooleanExpression} representing the equality comparison.
+ */
+ @BetaApi
+ public static BooleanExpression equal(Expression left, Object right) {
+ return new BooleanExpression("equal", left, toExprOrConstant(right));
+ }
+
+ /**
+ * Creates an expression that checks if a field is equal to an expression.
+ *
+ * @param fieldName The field name.
+ * @param right The expression.
+ * @return A new {@link BooleanExpression} representing the equality comparison.
+ */
+ @BetaApi
+ public static BooleanExpression equal(String fieldName, Expression right) {
+ return equal(field(fieldName), right);
+ }
+
+ /**
+ * Creates an expression that checks if a field is equal to a constant value.
+ *
+ * @param fieldName The field name.
+ * @param right The constant value.
+ * @return A new {@link BooleanExpression} representing the equality comparison.
+ */
+ @BetaApi
+ public static BooleanExpression equal(String fieldName, Object right) {
+ return equal(field(fieldName), toExprOrConstant(right));
+ }
+
+ /**
+ * Creates an expression that checks if two expressions are not equal.
+ *
+ * @param left The first expression.
+ * @param right The second expression.
+ * @return A new {@link BooleanExpression} representing the inequality comparison.
+ */
+ @BetaApi
+ public static BooleanExpression notEqual(Expression left, Expression right) {
+ return new BooleanExpression("not_equal", left, right);
+ }
+
+ /**
+ * Creates an expression that checks if an expression is not equal to a constant value.
+ *
+ * @param left The expression.
+ * @param right The constant value.
+ * @return A new {@link BooleanExpression} representing the inequality comparison.
+ */
+ @BetaApi
+ public static BooleanExpression notEqual(Expression left, Object right) {
+ return new BooleanExpression("not_equal", left, toExprOrConstant(right));
+ }
+
+ /**
+ * Creates an expression that checks if a field is not equal to an expression.
+ *
+ * @param fieldName The field name.
+ * @param right The expression.
+ * @return A new {@link BooleanExpression} representing the inequality comparison.
+ */
+ @BetaApi
+ public static BooleanExpression notEqual(String fieldName, Expression right) {
+ return notEqual(field(fieldName), right);
+ }
+
+ /**
+ * Creates an expression that checks if a field is not equal to a constant value.
+ *
+ * @param fieldName The field name.
+ * @param right The constant value.
+ * @return A new {@link BooleanExpression} representing the inequality comparison.
+ */
+ @BetaApi
+ public static BooleanExpression notEqual(String fieldName, Object right) {
+ return notEqual(field(fieldName), toExprOrConstant(right));
+ }
+
+ /**
+ * Creates an expression that checks if the first expression is greater than the second
+ * expression.
+ *
+ * @param left The first expression.
+ * @param right The second expression.
+ * @return A new {@link BooleanExpression} representing the greater than comparison.
+ */
+ @BetaApi
+ public static BooleanExpression greaterThan(Expression left, Expression right) {
+ return new BooleanExpression("greater_than", left, right);
+ }
+
+ /**
+ * Creates an expression that checks if an expression is greater than a constant value.
+ *
+ * @param left The expression.
+ * @param right The constant value.
+ * @return A new {@link BooleanExpression} representing the greater than comparison.
+ */
+ @BetaApi
+ public static BooleanExpression greaterThan(Expression left, Object right) {
+ return new BooleanExpression("greater_than", left, toExprOrConstant(right));
+ }
+
+ /**
+ * Creates an expression that checks if a field is greater than an expression.
+ *
+ * @param fieldName The field name.
+ * @param right The expression.
+ * @return A new {@link BooleanExpression} representing the greater than comparison.
+ */
+ @BetaApi
+ public static BooleanExpression greaterThan(String fieldName, Expression right) {
+ return greaterThan(field(fieldName), right);
+ }
+
+ /**
+ * Creates an expression that checks if a field is greater than a constant value.
+ *
+ * @param fieldName The field name.
+ * @param right The constant value.
+ * @return A new {@link BooleanExpression} representing the greater than comparison.
+ */
+ @BetaApi
+ public static BooleanExpression greaterThan(String fieldName, Object right) {
+ return greaterThan(field(fieldName), toExprOrConstant(right));
+ }
+
+ /**
+ * Creates an expression that checks if the first expression is greater than or equal to the
+ * second expression.
+ *
+ * @param left The first expression.
+ * @param right The second expression.
+ * @return A new {@link BooleanExpression} representing the greater than or equal to comparison.
+ */
+ @BetaApi
+ public static BooleanExpression greaterThanOrEqual(Expression left, Expression right) {
+ return new BooleanExpression("greater_than_or_equal", left, right);
+ }
+
+ /**
+ * Creates an expression that checks if an expression is greater than or equal to a constant
+ * value.
+ *
+ * @param left The expression.
+ * @param right The constant value.
+ * @return A new {@link BooleanExpression} representing the greater than or equal to comparison.
+ */
+ @BetaApi
+ public static BooleanExpression greaterThanOrEqual(Expression left, Object right) {
+ return new BooleanExpression("greater_than_or_equal", left, toExprOrConstant(right));
+ }
+
+ /**
+ * Creates an expression that checks if a field is greater than or equal to an expression.
+ *
+ * @param fieldName The field name.
+ * @param right The expression.
+ * @return A new {@link BooleanExpression} representing the greater than or equal to comparison.
+ */
+ @BetaApi
+ public static BooleanExpression greaterThanOrEqual(String fieldName, Expression right) {
+ return greaterThanOrEqual(field(fieldName), right);
+ }
+
+ /**
+ * Creates an expression that checks if a field is greater than or equal to a constant value.
+ *
+ * @param fieldName The field name.
+ * @param right The constant value.
+ * @return A new {@link BooleanExpression} representing the greater than or equal to comparison.
+ */
+ @BetaApi
+ public static BooleanExpression greaterThanOrEqual(String fieldName, Object right) {
+ return greaterThanOrEqual(field(fieldName), toExprOrConstant(right));
+ }
+
+ /**
+ * Creates an expression that checks if the first expression is less than the second expression.
+ *
+ * @param left The first expression.
+ * @param right The second expression.
+ * @return A new {@link BooleanExpression} representing the less than comparison.
+ */
+ @BetaApi
+ public static BooleanExpression lessThan(Expression left, Expression right) {
+ return new BooleanExpression("less_than", left, right);
+ }
+
+ /**
+ * Creates an expression that checks if an expression is less than a constant value.
+ *
+ * @param left The expression.
+ * @param right The constant value.
+ * @return A new {@link BooleanExpression} representing the less than comparison.
+ */
+ @BetaApi
+ public static BooleanExpression lessThan(Expression left, Object right) {
+ return new BooleanExpression("less_than", left, toExprOrConstant(right));
+ }
+
+ /**
+ * Creates an expression that checks if a field is less than an expression.
+ *
+ * @param fieldName The field name.
+ * @param right The expression.
+ * @return A new {@link BooleanExpression} representing the less than comparison.
+ */
+ @BetaApi
+ public static BooleanExpression lessThan(String fieldName, Expression right) {
+ return lessThan(field(fieldName), right);
+ }
+
+ /**
+ * Creates an expression that checks if a field is less than a constant value.
+ *
+ * @param fieldName The field name.
+ * @param right The constant value.
+ * @return A new {@link BooleanExpression} representing the less than comparison.
+ */
+ @BetaApi
+ public static BooleanExpression lessThan(String fieldName, Object right) {
+ return lessThan(field(fieldName), toExprOrConstant(right));
+ }
+
+ /**
+ * Creates an expression that checks if the first expression is less than or equal to the second
+ * expression.
+ *
+ * @param left The first expression.
+ * @param right The second expression.
+ * @return A new {@link BooleanExpression} representing the less than or equal to comparison.
+ */
+ @BetaApi
+ public static BooleanExpression lessThanOrEqual(Expression left, Expression right) {
+ return new BooleanExpression("less_than_or_equal", left, right);
+ }
+
+ /**
+ * Creates an expression that checks if an expression is less than or equal to a constant value.
+ *
+ * @param left The expression.
+ * @param right The constant value.
+ * @return A new {@link BooleanExpression} representing the less than or equal to comparison.
+ */
+ @BetaApi
+ public static BooleanExpression lessThanOrEqual(Expression left, Object right) {
+ return new BooleanExpression("less_than_or_equal", left, toExprOrConstant(right));
+ }
+
+ /**
+ * Creates an expression that checks if a field is less than or equal to an expression.
+ *
+ * @param fieldName The field name.
+ * @param right The expression.
+ * @return A new {@link BooleanExpression} representing the less than or equal to comparison.
+ */
+ @BetaApi
+ public static BooleanExpression lessThanOrEqual(String fieldName, Expression right) {
+ return lessThanOrEqual(field(fieldName), right);
+ }
+
+ /**
+ * Creates an expression that checks if a field is less than or equal to a constant value.
+ *
+ * @param fieldName The field name.
+ * @param right The constant value.
+ * @return A new {@link BooleanExpression} representing the less than or equal to comparison.
+ */
+ @BetaApi
+ public static BooleanExpression lessThanOrEqual(String fieldName, Object right) {
+ return lessThanOrEqual(field(fieldName), toExprOrConstant(right));
+ }
+
+ /**
+ * Creates an expression that checks if an {@code expression}, when evaluated, is equal to any of
+ * the provided {@code values}.
+ *
+ * @param expression The expression whose results to compare.
+ * @param values The values to check against.
+ * @return A new {@link BooleanExpression} representing the 'IN' comparison.
+ */
+ @BetaApi
+ public static BooleanExpression equalAny(Expression expression, List