From c74c516a786237fb7b51d42f99bb5add7cdafe7c Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Sun, 1 Feb 2026 11:30:22 -0800 Subject: [PATCH 1/7] RateLimiter POC --- .../beam/gradle/BeamModulePlugin.groovy | 1 + examples/java/build.gradle | 8 + .../beam/examples/RateLimiterSimple.java | 114 ++++++++++ .../terraform/envoy-ratelimiter/README.md | 93 ++++++++ sdks/java/io/components/build.gradle | 15 +- .../ratelimiter/EnvoyRateLimiter.java | 40 ++++ .../ratelimiter/EnvoyRateLimiterContext.java | 41 ++++ .../ratelimiter/EnvoyRateLimiterFactory.java | 214 ++++++++++++++++++ .../components/ratelimiter/RateLimiter.java | 41 ++++ .../ratelimiter/RateLimiterClientCache.java | 86 +++++++ .../ratelimiter/RateLimiterContext.java | 27 +++ .../ratelimiter/RateLimiterFactory.java | 57 +++++ .../ratelimiter/RateLimiterOptions.java | 57 +++++ .../io/google-cloud-platform/build.gradle | 1 + .../beam/sdk/io/gcp/spanner/SpannerIO.java | 28 ++- settings.gradle.kts | 1 + 16 files changed, 817 insertions(+), 7 deletions(-) create mode 100644 examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java create mode 100644 sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiter.java create mode 100644 sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterContext.java create mode 100644 sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterFactory.java create mode 100644 sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiter.java create mode 100644 sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterClientCache.java create mode 100644 sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterContext.java create mode 100644 sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterFactory.java create mode 100644 sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterOptions.java diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 7db9e56e7194..260dada6e934 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -727,6 +727,7 @@ class BeamModulePlugin implements Plugin { commons_math3 : "org.apache.commons:commons-math3:3.6.1", dbcp2 : "org.apache.commons:commons-dbcp2:$dbcp2_version", error_prone_annotations : "com.google.errorprone:error_prone_annotations:$errorprone_version", + envoy_control_plane_api : "io.envoyproxy.controlplane:api:1.0.49", failsafe : "dev.failsafe:failsafe:3.3.0", flogger_system_backend : "com.google.flogger:flogger-system-backend:0.7.4", gax : "com.google.api:gax", // google_cloud_platform_libraries_bom sets version diff --git a/examples/java/build.gradle b/examples/java/build.gradle index 5334538cc09f..361df320c848 100644 --- a/examples/java/build.gradle +++ b/examples/java/build.gradle @@ -54,6 +54,7 @@ dependencies { implementation project(":sdks:java:extensions:python") implementation project(":sdks:java:io:google-cloud-platform") implementation project(":sdks:java:io:kafka") + implementation project(":sdks:java:io:components") implementation project(":sdks:java:extensions:ml") implementation library.java.avro implementation library.java.bigdataoss_util @@ -157,3 +158,10 @@ task wordCount(type:JavaExec) { systemProperties = System.getProperties() args = ["--output=/tmp/output.txt"] } + +task exec (type:JavaExec) { + main = System.getProperty("mainClass") + classpath = sourceSets.main.runtimeClasspath + systemProperties System.getProperties() + args System.getProperty("exec.args", "").split() +} \ No newline at end of file diff --git a/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java b/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java new file mode 100644 index 000000000000..75e8219b62e1 --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.examples; + +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.components.ratelimiter.EnvoyRateLimiterContext; +import org.apache.beam.sdk.io.components.ratelimiter.EnvoyRateLimiterFactory; +import org.apache.beam.sdk.io.components.ratelimiter.RateLimiter; +import org.apache.beam.sdk.io.components.ratelimiter.RateLimiterFactory; +import org.apache.beam.sdk.io.components.ratelimiter.RateLimiterOptions; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * A simple example demonstrating how to use the {@link RateLimiter} in a custom {@link DoFn}. + * + *

This pipeline creates a small set of elements and processes them using a DoFn that calls an + * external service (simulated). The processing is rate-limited using an Envoy Rate Limit Service. + * + *

To run this example, you need a running Envoy Rate Limit Service. + */ +public class RateLimiterSimple { + + public interface Options extends PipelineOptions { + @Description("Address of the Envoy Rate Limit Service (e.g., localhost:8081)") + String getRateLimiterAddress(); + + void setRateLimiterAddress(String value); + + @Description("Domain for the Rate Limit Service") + String getRateLimiterDomain(); + + void setRateLimiterDomain(String value); + } + + static class CallExternalServiceFn extends DoFn { + private final String rlsAddress; + private final String rlsDomain; + private transient @Nullable RateLimiter rateLimiter = null; + + public CallExternalServiceFn(String rlsAddress, String rlsDomain) { + this.rlsAddress = rlsAddress; + this.rlsDomain = rlsDomain; + } + + @Setup + public void setup() { + // Create the RateLimiterOptions. + RateLimiterOptions options = RateLimiterOptions.builder().setAddress(rlsAddress).build(); + + // Static RateLimtier with pre-configured domain and descriptors + RateLimiterFactory factory = new EnvoyRateLimiterFactory(options); + EnvoyRateLimiterContext context = + EnvoyRateLimiterContext.create(rlsDomain, ImmutableMap.of("database", "users")); + this.rateLimiter = factory.getLimiter(context); + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + String element = c.element(); + try { + Preconditions.checkNotNull(rateLimiter).allow(1); + } catch (Exception e) { + throw new RuntimeException("Failed to acquire rate limit token", e); + } + + // Simulate external API call + Thread.sleep(100); + System.out.println("Processing: " + element); + c.output("Processed: " + element); + } + } + + public static void main(String[] args) { + Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + Pipeline p = Pipeline.create(options); + + p.apply( + "CreateItems", + Create.of( + IntStream.range(0, 1000000).mapToObj(i -> "item" + i).collect(Collectors.toList()))) + .apply( + "CallExternalService", + ParDo.of( + new CallExternalServiceFn( + options.getRateLimiterAddress(), options.getRateLimiterDomain()))); + + p.run().waitUntilFinish(); + } +} diff --git a/examples/terraform/envoy-ratelimiter/README.md b/examples/terraform/envoy-ratelimiter/README.md index 47d66832487d..1e8e64fb58f9 100644 --- a/examples/terraform/envoy-ratelimiter/README.md +++ b/examples/terraform/envoy-ratelimiter/README.md @@ -38,6 +38,99 @@ Example Beam Pipelines using it: - **StatsD Exporter**: Sidecar container that converts StatsD metrics to Prometheus format, exposed on port `9102`. - **Internal Load Balancer**: A Google Cloud TCP Load Balancer exposing the Rate Limit service internally within the VPC. + +```mermaid +graph TD + %% Styles + classDef gcp fill:#e8f0fe,stroke:#4285f4,stroke-width:2px,rx:5,ry:5; + classDef vpc fill:#f1f3f4,stroke:#9aa0a6,stroke-width:2px,stroke-dasharray: 5 5,rx:5,ry:5; + classDef subnet fill:#ffffff,stroke:#dfe1e5,stroke-width:2px,rx:5,ry:5; + classDef gke fill:#e6f4ea,stroke:#34a853,stroke-width:2px,rx:5,ry:5; + classDef pod fill:#ffffff,stroke:#34a853,stroke-width:1px,rx:5,ry:5; + classDef container fill:#e8f0fe,stroke:#4285f4,stroke-width:1px,rx:3,ry:3; + classDef sidecar fill:#fce8e6,stroke:#d93025,stroke-width:1px,rx:3,ry:3; + classDef svc fill:#34a853,stroke:#ffffff,color:#ffffff,rx:5,ry:5; + classDef lb fill:#34a853,stroke:#ffffff,color:#ffffff,rx:5,ry:5; + classDef client fill:#fbbc04,stroke:#ffffff,color:#000000,rx:5,ry:5,font-weight:bold; + + subgraph GCP["Google Cloud Platform"] + direction TB + subgraph VPC["VPC Network"] + subgraph Subnet["Subnet (Private)"] + + ILB["Internal Load Balancer
(ratelimit-external)"]:::lb + + subgraph GKE["GKE Autopilot Cluster"] + + subgraph K8s["K8s Namespace"] + + ConfigMap["ConfigMap: ratelimit-config"]:::container + + subgraph RedisStack["Redis Infrastructure"] + Redis_SVC["Service: redis"]:::svc + Redis_Pod["Pod: redis"]:::pod + Redis_SVC --> Redis_Pod + end + + subgraph RateLimitStack["Rate Limit Deployment (Autoscaled)"] + RL_SVC["Service: ratelimit"]:::svc + + subgraph Pod1["Pod Replica 1"] + direction TB + Envoy1["Envoy Container"]:::container + StatsD1["StatsD Sidecar"]:::sidecar + Envoy1 -->|localhost:9125 UDP| StatsD1 + end + + subgraph Pod2["Pod Replica 2"] + direction TB + Envoy2["Envoy Container"]:::container + StatsD2["StatsD Sidecar"]:::sidecar + Envoy2 -->|localhost:9125 UDP| StatsD2 + end + + subgraph PodN["Pod Replica ... N"] + direction TB + EnvoyN["Envoy Container"]:::container + StatsDN["StatsD Sidecar"]:::sidecar + EnvoyN -->|localhost:9125 UDP| StatsDN + end + + RL_SVC --> Pod1 + RL_SVC --> Pod2 + RL_SVC ~~~ PodN + end + + HPA["HPA: ratelimit-hpa"]:::container + + ILB --> RL_SVC + Envoy1 & Envoy2 & EnvoyN -->|Redis Protocol| Redis_SVC + Pod1 & Pod2 & PodN -.->|Mount| ConfigMap + + HPA -.->|Monitors CPU/Mem| RateLimitStack + HPA -.->|Scales Replicas| RateLimitStack + end + end + end + end + end + + subgraph Clients["Dataflow Workers (Clients)"] + direction LR + Worker1["Worker 1"]:::client + Worker2["Worker 2"]:::client + WorkerN["Worker ..."]:::client + end + + Worker1 & Worker2 & WorkerN -->|gRPC / HTTP :8081| ILB + + class GCP gcp + class VPC vpc + class Subnet subnet + class GKE gke + class Pod1,Pod2,PodN,Redis_Pod pod +``` + ## Prerequisites: ### Following items need to be setup for Envoy Rate Limiter deployment on GCP: 1. [GCP project](https://cloud.google.com/resource-manager/docs/creating-managing-projects) diff --git a/sdks/java/io/components/build.gradle b/sdks/java/io/components/build.gradle index 25bf95772110..fe9eceab7f1b 100644 --- a/sdks/java/io/components/build.gradle +++ b/sdks/java/io/components/build.gradle @@ -18,15 +18,15 @@ plugins { id 'org.apache.beam.module' } applyJavaNature( - automaticModuleName: 'org.apache.beam.sdk.io.components', + automaticModuleName: 'org.apache.beam.sdk.io.components', ) description = "Apache Beam :: SDKs :: Java :: IO :: Components" ext.summary = "Components for building fully featured IOs" dependencies { - implementation project(path: ":sdks:java:core", configuration: "shadow") - implementation library.java.protobuf_java + implementation project(path: ":sdks:java:core", configuration: "shadow") + implementation library.java.protobuf_java permitUnusedDeclared library.java.protobuf_java // BEAM-11761 implementation library.java.slf4j_api implementation library.java.vendored_guava_32_1_2_jre @@ -34,8 +34,15 @@ dependencies { testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") testImplementation library.java.guava_testlib - testImplementation library.java.junit + testImplementation library.java.junit testImplementation library.java.hamcrest testRuntimeOnly library.java.slf4j_jdk14 testImplementation project(path: ":runners:direct-java", configuration: "shadow") + + // Envoy Rate Limiter Dependencies + implementation library.java.envoy_control_plane_api + implementation library.java.grpc_api + implementation library.java.grpc_stub + implementation library.java.grpc_protobuf + implementation library.java.auto_value_annotations } \ No newline at end of file diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiter.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiter.java new file mode 100644 index 000000000000..6d70a2380cba --- /dev/null +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiter.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.components.ratelimiter; + +import java.io.IOException; + +/** + * A lightweight handle for an Envoy-based rate limiter. + * + *

Delegates checks to the {@link EnvoyRateLimiterFactory} using the baked-in {@link Context}. + */ +public class EnvoyRateLimiter implements RateLimiter { + private final EnvoyRateLimiterFactory factory; + private final EnvoyRateLimiterContext context; + + public EnvoyRateLimiter(EnvoyRateLimiterFactory factory, EnvoyRateLimiterContext context) { + this.factory = factory; + this.context = context; + } + + @Override + public boolean allow(int permits) throws IOException, InterruptedException { + return factory.check(context, permits); + } +} diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterContext.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterContext.java new file mode 100644 index 000000000000..7d04f52ab1f0 --- /dev/null +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterContext.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.components.ratelimiter; + +import com.google.auto.value.AutoValue; +import java.util.Map; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; + +/** + * Context for an Envoy Rate Limiter check. + * + *

Contains the domain and descriptors required to define a specific rate limit bucket. + */ +@DefaultSchema(AutoValueSchema.class) +@AutoValue +public abstract class EnvoyRateLimiterContext implements RateLimiterContext { + + public abstract String getDomain(); + + public abstract Map getDescriptors(); + + public static EnvoyRateLimiterContext create(String domain, Map descriptors) { + return new AutoValue_EnvoyRateLimiterContext(domain, descriptors); + } +} diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterFactory.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterFactory.java new file mode 100644 index 000000000000..1e18e55f4c64 --- /dev/null +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterFactory.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.components.ratelimiter; + +import io.envoyproxy.envoy.extensions.common.ratelimit.v3.RateLimitDescriptor; +import io.envoyproxy.envoy.service.ratelimit.v3.RateLimitRequest; +import io.envoyproxy.envoy.service.ratelimit.v3.RateLimitResponse; +import io.envoyproxy.envoy.service.ratelimit.v3.RateLimitServiceGrpc; +import io.grpc.StatusRuntimeException; +import java.io.IOException; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.sdk.io.components.throttling.ThrottlingSignaler; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Distribution; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.util.Sleeper; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** A {@link RateLimiterFactory} for Envoy Rate Limit Service. */ +public class EnvoyRateLimiterFactory implements RateLimiterFactory { + private static final Logger LOG = LoggerFactory.getLogger(EnvoyRateLimiterFactory.class); + private static final int RPC_RETRY_COUNT = 3; + private static final long RPC_RETRY_DELAY_MILLIS = 5000; + + private final RateLimiterOptions options; + + private transient volatile @Nullable RateLimitServiceGrpc.RateLimitServiceBlockingStub stub; + private final ThrottlingSignaler throttlingSignaler; + + private final Counter requestsTotal; + private final Counter requestsAllowed; + private final Counter requestsThrottled; + private final Counter rpcErrors; + private final Counter rpcRetries; + private final Distribution rpcLatency; + + public EnvoyRateLimiterFactory(RateLimiterOptions options) { + this.options = options; + String namespace = EnvoyRateLimiterFactory.class.getName(); + this.throttlingSignaler = new ThrottlingSignaler(namespace); + this.requestsTotal = Metrics.counter(namespace, "ratelimit-requests-total"); + this.requestsAllowed = Metrics.counter(namespace, "ratelimit-requests-allowed"); + this.requestsThrottled = Metrics.counter(namespace, "ratelimit-requests-throttled"); + this.rpcErrors = Metrics.counter(namespace, "ratelimit-rpc-errors"); + this.rpcRetries = Metrics.counter(namespace, "ratelimit-rpc-retries"); + this.rpcLatency = Metrics.distribution(namespace, "ratelimit-rpc-latency-ms"); + } + + private void init() { + if (stub != null) { + return; + } + synchronized (this) { + if (stub == null) { + RateLimiterClientCache clientCache = + RateLimiterClientCache.getOrCreate(options.getAddress()); + stub = + RateLimitServiceGrpc.newBlockingStub( + Preconditions.checkNotNull(clientCache).getChannel()); + } + } + } + + @Override + public RateLimiter getLimiter(RateLimiterContext context) { + if (!(context instanceof EnvoyRateLimiterContext)) { + throw new IllegalArgumentException( + "EnvoyRateLimiterFactory requires EnvoyRateLimiterContext"); + } + return new EnvoyRateLimiter(this, (EnvoyRateLimiterContext) context); + } + + @Override + public boolean check(RateLimiterContext context, int permits) + throws IOException, InterruptedException { + if (!(context instanceof EnvoyRateLimiterContext)) { + throw new IllegalArgumentException( + "EnvoyRateLimiterFactory requires EnvoyRateLimiterContext, got: " + + context.getClass().getName()); + } + EnvoyRateLimiterContext envoyContext = (EnvoyRateLimiterContext) context; + return callEnvoy(envoyContext, permits); + } + + private boolean callEnvoy(EnvoyRateLimiterContext context, int tokens) + throws IOException, InterruptedException { + + init(); + Sleeper sleeper = Sleeper.DEFAULT; + RateLimitServiceGrpc.RateLimitServiceBlockingStub currentStub = stub; + if (currentStub == null) { + throw new IllegalStateException("RateLimitService stub is null"); + } + + Map descriptors = context.getDescriptors(); + RateLimitDescriptor.Builder descriptorBuilder = RateLimitDescriptor.newBuilder(); + + for (Map.Entry entry : descriptors.entrySet()) { + descriptorBuilder.addEntries( + RateLimitDescriptor.Entry.newBuilder() + .setKey(entry.getKey()) + .setValue(entry.getValue()) + .build()); + } + + RateLimitRequest request = + RateLimitRequest.newBuilder() + .setDomain(context.getDomain()) + .setHitsAddend(tokens) + .addDescriptors(descriptorBuilder.build()) + .build(); + + boolean blockUntilAllowed = options.isBlockUntilAllowed(); + int maxRetries = options.getMaxRetries(); + long timeoutMillis = options.getTimeout().toMillis(); + + requestsTotal.inc(); + int attempt = 0; + while (true) { + if (!blockUntilAllowed && attempt > maxRetries) { + return false; + } + + // RPC Retry Loop + RateLimitResponse response = null; + long startTime = System.currentTimeMillis(); + for (int i = 0; i < RPC_RETRY_COUNT; i++) { + try { + response = + currentStub + .withDeadlineAfter(timeoutMillis, java.util.concurrent.TimeUnit.MILLISECONDS) + .shouldRateLimit(request); + long endTime = System.currentTimeMillis(); + rpcLatency.update(endTime - startTime); + break; + } catch (StatusRuntimeException e) { + rpcErrors.inc(); + if (i == RPC_RETRY_COUNT - 1) { + LOG.error("RateLimitService call failed after {} attempts", RPC_RETRY_COUNT, e); + throw new IOException("Failed to call Rate Limit Service", e); + } + rpcRetries.inc(); + LOG.warn("RateLimitService call failed, retrying", e); + if (sleeper != null) { + sleeper.sleep(RPC_RETRY_DELAY_MILLIS); + } + } + } + + if (response == null) { + throw new IOException("Failed to get response from Rate Limit Service"); + } + + if (response.getOverallCode() == RateLimitResponse.Code.OK) { + requestsAllowed.inc(); + return true; + } else if (response.getOverallCode() == RateLimitResponse.Code.OVER_LIMIT) { + long sleepMillis = 0; + for (RateLimitResponse.DescriptorStatus status : response.getStatusesList()) { + if (status.getCode() == RateLimitResponse.Code.OVER_LIMIT + && status.hasDurationUntilReset()) { + long durationMillis = + status.getDurationUntilReset().getSeconds() * 1000 + + status.getDurationUntilReset().getNanos() / 1_000_000; + if (durationMillis > sleepMillis) { + sleepMillis = durationMillis; + } + } + } + + if (sleepMillis == 0) { + sleepMillis = 1000; + } + + long jitter = + (long) + (java.util.concurrent.ThreadLocalRandom.current().nextDouble() + * (0.01 * sleepMillis)); + sleepMillis += jitter; + + LOG.warn("Throttled by RLS, sleeping for {} ms", sleepMillis); + if (sleeper != null) { + requestsThrottled.inc(); + if (throttlingSignaler != null) { + throttlingSignaler.signalThrottling(sleepMillis); + } + sleeper.sleep(sleepMillis); + } + attempt++; + } else { + throw new IOException( + "Rate Limit Service returned unknown code: " + response.getOverallCode()); + } + } + } +} diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiter.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiter.java new file mode 100644 index 000000000000..6620354fe7cd --- /dev/null +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiter.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.components.ratelimiter; + +import java.io.IOException; +import java.io.Serializable; + +/** + * A RateLimiter allows to fetch permits from a rate limiter service and blocks execution when the + * rate limit is exceeded. + * + *

Implementations must be {@link Serializable} as they are passed to workers. + */ +public interface RateLimiter extends Serializable { + + /** + * Blocks until the specified number of permits are acquired and returns true if the request was + * allowed or false if the request was rejected. + * + * @param permits Number of permits to acquire. + * @return true if the request was allowed, false if it was rejected (and retries exceeded). + * @throws IOException if there is an error communicating with the rate limiter service. + * @throws InterruptedException if the thread is interrupted while waiting. + */ + boolean allow(int permits) throws IOException, InterruptedException; +} diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterClientCache.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterClientCache.java new file mode 100644 index 000000000000..6e212786706a --- /dev/null +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterClientCache.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.components.ratelimiter; + +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A static cache for {@link ManagedChannel}s to Rate Limit Service. + * + *

This class ensures that multiple DoFn instances (threads) in the same Worker sharing the same + * RLS address will share a single {@link ManagedChannel}. + * + *

It uses reference counting to close the channel when it is no longer in use by any RateLimiter + * instance. + */ +public class RateLimiterClientCache { + private static final Logger LOG = LoggerFactory.getLogger(RateLimiterClientCache.class); + private static final Map CACHE = new ConcurrentHashMap<>(); + + private final ManagedChannel channel; + private final String address; + private int refCount = 0; + + private RateLimiterClientCache(String address) { + this.address = address; + LOG.info("Creating new ManagedChannel for RLS at {}", address); + this.channel = ManagedChannelBuilder.forTarget(address).usePlaintext().build(); + } + + /** Gets or creates a cached client for the given address. Increments the reference count. */ + public static synchronized RateLimiterClientCache getOrCreate(String address) { + RateLimiterClientCache client = CACHE.get(address); + if (client == null) { + client = new RateLimiterClientCache(address); + CACHE.put(address, client); + } + client.refCount++; + LOG.debug("Referenced RLS Channel for {}. New RefCount: {}", address, client.refCount); + return client; + } + + public ManagedChannel getChannel() { + return channel; + } + + /** + * Releases the client. Decrements the reference count. If reference count reaches 0, the channel + * is shut down and removed from the cache. + */ + public synchronized void release() { + refCount--; + LOG.debug("Released RLS Channel for {}. New RefCount: {}", address, refCount); + if (refCount <= 0) { + LOG.info("Closing ManagedChannel for RLS at {}", address); + CACHE.remove(address); + channel.shutdown(); + try { + channel.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOG.error("Couldn't gracefully close gRPC channel={}", channel, e); + } + channel.shutdownNow(); + } + } +} diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterContext.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterContext.java new file mode 100644 index 000000000000..6387bf5789e4 --- /dev/null +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterContext.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.components.ratelimiter; + +import java.io.Serializable; + +/** + * A marker interface for context data required to check ratelimit. + * + *

Implementations must be {@link Serializable}. + */ +public interface RateLimiterContext extends Serializable {} diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterFactory.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterFactory.java new file mode 100644 index 000000000000..c1f16404cf5e --- /dev/null +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterFactory.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.components.ratelimiter; + +import java.io.IOException; +import java.io.Serializable; + +/** + * A factory that manages connections to rate limit service and creates lightweight handles. + * + *

Implementations must be {@link Serializable} as they are passed to workers. The factory + * typically manages the heavy connection (e.g. gRPC stub) and is thread-safe. + */ +public interface RateLimiterFactory extends Serializable { + + /** + * Creates a lightweight ratelimiter handle bound to a specific context. + * + *

Use this when passing ratelimiter to IO components, which doesn't need to know about the + * configuration or the underlying ratelimiter service details. This is also useful in DoFns when + * you want to use the ratelimiter in a static way based on the compile time context. + * + * @param context The context for the ratelimit. + * @return A {@link RateLimiter} handle. + */ + RateLimiter getLimiter(RateLimiterContext context); + + /** + * Blocks until the specified number of permits are acquired and returns true if the request was + * allowed or false if the request was rejected. + * + *

Use this for when the ratelimit namespace or descriptors are not known at compile time. + * allows you to use the ratelimiter in a dynamic way based on the runtime data. + * + * @param context The context for the ratelimit. + * @param permits Number of permits to acquire. + * @return true if the request is allowed, false if rejected. + * @throws IOException if there is an error communicating with the ratelimiter service. + * @throws InterruptedException if the thread is interrupted while waiting. + */ + boolean check(RateLimiterContext context, int permits) throws IOException, InterruptedException; +} diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterOptions.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterOptions.java new file mode 100644 index 000000000000..a0aafdc09d89 --- /dev/null +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterOptions.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.components.ratelimiter; + +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import java.time.Duration; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; + +/** Configuration options for {@link RateLimiterFactory}. */ +@DefaultSchema(AutoValueSchema.class) +@AutoValue +public abstract class RateLimiterOptions implements Serializable { + public abstract String getAddress(); + + public abstract Duration getTimeout(); + + public abstract boolean isBlockUntilAllowed(); + + public abstract int getMaxRetries(); + + public static Builder builder() { + return new AutoValue_RateLimiterOptions.Builder() + .setTimeout(Duration.ofSeconds(5)) + .setBlockUntilAllowed(true) + .setMaxRetries(3); + } + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setAddress(String address); + + public abstract Builder setTimeout(Duration timeout); + + public abstract Builder setBlockUntilAllowed(boolean blockUntilAllowed); + + public abstract Builder setMaxRetries(int maxRetries); + + public abstract RateLimiterOptions build(); + } +} diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle index 5dd3f9bb761d..ec92b0c5c7f4 100644 --- a/sdks/java/io/google-cloud-platform/build.gradle +++ b/sdks/java/io/google-cloud-platform/build.gradle @@ -162,6 +162,7 @@ dependencies { testImplementation project(path: ":runners:direct-java", configuration: "shadow") testImplementation project(":sdks:java:managed") testImplementation project(path: ":sdks:java:io:common") + implementation project(":sdks:java:io:components") testImplementation project(path: ":sdks:java:testing:test-utils") testImplementation library.java.commons_math3 testImplementation library.java.google_cloud_bigquery diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index 450710112a1b..78a70cb3b3bb 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -75,6 +75,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.io.components.ratelimiter.RateLimiter; import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics; import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants; import org.apache.beam.sdk.io.gcp.spanner.changestreams.MetadataSpannerConfigFactory; @@ -1289,6 +1290,8 @@ public abstract static class Write extends PTransform, Spa abstract @Nullable PCollectionView getDialectView(); + abstract @Nullable RateLimiter getRateLimiter(); + abstract Builder toBuilder(); @AutoValue.Builder @@ -1310,6 +1313,8 @@ abstract static class Builder { abstract Builder setDialectView(PCollectionView dialect); + abstract Builder setRateLimiter(RateLimiter rateLimiter); + abstract Write build(); } @@ -1393,6 +1398,11 @@ public Write withUsingPlainTextChannel(ValueProvider plainText) { return withSpannerConfig(config.withUsingPlainTextChannel(plainText)); } + /** Specifies the {@link RateLimiter} to use to throttle IO. */ + public Write withRateLimiter(RateLimiter rateLimiter) { + return toBuilder().setRateLimiter(rateLimiter).build(); + } + /** * Specifies whether to use plaintext channel. * @@ -1697,7 +1707,10 @@ public SpannerWriteResult expand(PCollection input) { "Write batches to Spanner", ParDo.of( new WriteToSpannerFn( - spec.getSpannerConfig(), spec.getFailureMode(), FAILED_MUTATIONS_TAG)) + spec.getSpannerConfig(), + spec.getFailureMode(), + FAILED_MUTATIONS_TAG, + spec.getRateLimiter())) .withOutputTags(MAIN_OUT_TAG, TupleTagList.of(FAILED_MUTATIONS_TAG))); return new SpannerWriteResult( @@ -2458,11 +2471,17 @@ static class WriteToSpannerFn extends DoFn, Void> { private transient FluentBackoff bundleWriteBackoff; private transient LoadingCache writeMetricsByTableName; + private final @Nullable RateLimiter rateLimiter; + WriteToSpannerFn( - SpannerConfig spannerConfig, FailureMode failureMode, TupleTag failedTag) { + SpannerConfig spannerConfig, + FailureMode failureMode, + TupleTag failedTag, + @Nullable RateLimiter rateLimiter) { this.spannerConfig = spannerConfig; this.failureMode = failureMode; this.failedTag = failedTag; + this.rateLimiter = rateLimiter; } @Setup @@ -2618,7 +2637,7 @@ private static ServiceCallMetric buildWriteServiceCallMetric( /** Write the Mutations to Spanner, handling DEADLINE_EXCEEDED with backoff/retries. */ private void writeMutations(Iterable mutationIterable) - throws SpannerException, IOException { + throws SpannerException, IOException, InterruptedException { BackOff backoff = bundleWriteBackoff.backoff(); List mutations = ImmutableList.copyOf(mutationIterable); @@ -2626,6 +2645,9 @@ private void writeMutations(Iterable mutationIterable) Stopwatch timer = Stopwatch.createStarted(); // loop is broken on success, timeout backoff/retry attempts exceeded, or other failure. try { + if (rateLimiter != null) { + rateLimiter.allow(1); + } spannerWriteWithRetryIfSchemaChange(mutations); spannerWriteSuccess.inc(); return; diff --git a/settings.gradle.kts b/settings.gradle.kts index 4540fa4b597b..1d6e0e6e20ae 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -212,6 +212,7 @@ include(":sdks:java:io:azure-cosmos") include(":sdks:java:io:cassandra") include(":sdks:java:io:clickhouse") include(":sdks:java:io:common") +include(":sdks:java:io:components") include(":sdks:java:io:contextualtextio") include(":sdks:java:io:debezium") include(":sdks:java:io:debezium:expansion-service") From cb005dae698acc195d257210fd76ffdc36cddd9b Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Mon, 2 Feb 2026 10:22:48 -0800 Subject: [PATCH 2/7] refactor code --- examples/java/build.gradle | 7 -- .../terraform/envoy-ratelimiter/README.md | 92 ------------------- sdks/java/io/components/build.gradle | 18 ++-- .../ratelimiter/EnvoyRateLimiter.java | 4 +- .../ratelimiter/EnvoyRateLimiterContext.java | 2 +- .../ratelimiter/EnvoyRateLimiterFactory.java | 2 +- .../ratelimiter/RateLimiterFactory.java | 2 +- 7 files changed, 14 insertions(+), 113 deletions(-) diff --git a/examples/java/build.gradle b/examples/java/build.gradle index 361df320c848..8ae0b47ef923 100644 --- a/examples/java/build.gradle +++ b/examples/java/build.gradle @@ -158,10 +158,3 @@ task wordCount(type:JavaExec) { systemProperties = System.getProperties() args = ["--output=/tmp/output.txt"] } - -task exec (type:JavaExec) { - main = System.getProperty("mainClass") - classpath = sourceSets.main.runtimeClasspath - systemProperties System.getProperties() - args System.getProperty("exec.args", "").split() -} \ No newline at end of file diff --git a/examples/terraform/envoy-ratelimiter/README.md b/examples/terraform/envoy-ratelimiter/README.md index 1e8e64fb58f9..07d9b6f32c7e 100644 --- a/examples/terraform/envoy-ratelimiter/README.md +++ b/examples/terraform/envoy-ratelimiter/README.md @@ -39,98 +39,6 @@ Example Beam Pipelines using it: - **Internal Load Balancer**: A Google Cloud TCP Load Balancer exposing the Rate Limit service internally within the VPC. -```mermaid -graph TD - %% Styles - classDef gcp fill:#e8f0fe,stroke:#4285f4,stroke-width:2px,rx:5,ry:5; - classDef vpc fill:#f1f3f4,stroke:#9aa0a6,stroke-width:2px,stroke-dasharray: 5 5,rx:5,ry:5; - classDef subnet fill:#ffffff,stroke:#dfe1e5,stroke-width:2px,rx:5,ry:5; - classDef gke fill:#e6f4ea,stroke:#34a853,stroke-width:2px,rx:5,ry:5; - classDef pod fill:#ffffff,stroke:#34a853,stroke-width:1px,rx:5,ry:5; - classDef container fill:#e8f0fe,stroke:#4285f4,stroke-width:1px,rx:3,ry:3; - classDef sidecar fill:#fce8e6,stroke:#d93025,stroke-width:1px,rx:3,ry:3; - classDef svc fill:#34a853,stroke:#ffffff,color:#ffffff,rx:5,ry:5; - classDef lb fill:#34a853,stroke:#ffffff,color:#ffffff,rx:5,ry:5; - classDef client fill:#fbbc04,stroke:#ffffff,color:#000000,rx:5,ry:5,font-weight:bold; - - subgraph GCP["Google Cloud Platform"] - direction TB - subgraph VPC["VPC Network"] - subgraph Subnet["Subnet (Private)"] - - ILB["Internal Load Balancer
(ratelimit-external)"]:::lb - - subgraph GKE["GKE Autopilot Cluster"] - - subgraph K8s["K8s Namespace"] - - ConfigMap["ConfigMap: ratelimit-config"]:::container - - subgraph RedisStack["Redis Infrastructure"] - Redis_SVC["Service: redis"]:::svc - Redis_Pod["Pod: redis"]:::pod - Redis_SVC --> Redis_Pod - end - - subgraph RateLimitStack["Rate Limit Deployment (Autoscaled)"] - RL_SVC["Service: ratelimit"]:::svc - - subgraph Pod1["Pod Replica 1"] - direction TB - Envoy1["Envoy Container"]:::container - StatsD1["StatsD Sidecar"]:::sidecar - Envoy1 -->|localhost:9125 UDP| StatsD1 - end - - subgraph Pod2["Pod Replica 2"] - direction TB - Envoy2["Envoy Container"]:::container - StatsD2["StatsD Sidecar"]:::sidecar - Envoy2 -->|localhost:9125 UDP| StatsD2 - end - - subgraph PodN["Pod Replica ... N"] - direction TB - EnvoyN["Envoy Container"]:::container - StatsDN["StatsD Sidecar"]:::sidecar - EnvoyN -->|localhost:9125 UDP| StatsDN - end - - RL_SVC --> Pod1 - RL_SVC --> Pod2 - RL_SVC ~~~ PodN - end - - HPA["HPA: ratelimit-hpa"]:::container - - ILB --> RL_SVC - Envoy1 & Envoy2 & EnvoyN -->|Redis Protocol| Redis_SVC - Pod1 & Pod2 & PodN -.->|Mount| ConfigMap - - HPA -.->|Monitors CPU/Mem| RateLimitStack - HPA -.->|Scales Replicas| RateLimitStack - end - end - end - end - end - - subgraph Clients["Dataflow Workers (Clients)"] - direction LR - Worker1["Worker 1"]:::client - Worker2["Worker 2"]:::client - WorkerN["Worker ..."]:::client - end - - Worker1 & Worker2 & WorkerN -->|gRPC / HTTP :8081| ILB - - class GCP gcp - class VPC vpc - class Subnet subnet - class GKE gke - class Pod1,Pod2,PodN,Redis_Pod pod -``` - ## Prerequisites: ### Following items need to be setup for Envoy Rate Limiter deployment on GCP: 1. [GCP project](https://cloud.google.com/resource-manager/docs/creating-managing-projects) diff --git a/sdks/java/io/components/build.gradle b/sdks/java/io/components/build.gradle index fe9eceab7f1b..87f93229a9c3 100644 --- a/sdks/java/io/components/build.gradle +++ b/sdks/java/io/components/build.gradle @@ -25,8 +25,8 @@ description = "Apache Beam :: SDKs :: Java :: IO :: Components" ext.summary = "Components for building fully featured IOs" dependencies { - implementation project(path: ":sdks:java:core", configuration: "shadow") - implementation library.java.protobuf_java + implementation project(path: ":sdks:java:core", configuration: "shadow") + implementation library.java.protobuf_java permitUnusedDeclared library.java.protobuf_java // BEAM-11761 implementation library.java.slf4j_api implementation library.java.vendored_guava_32_1_2_jre @@ -34,15 +34,15 @@ dependencies { testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") testImplementation library.java.guava_testlib - testImplementation library.java.junit + testImplementation library.java.junit testImplementation library.java.hamcrest testRuntimeOnly library.java.slf4j_jdk14 testImplementation project(path: ":runners:direct-java", configuration: "shadow") - // Envoy Rate Limiter Dependencies - implementation library.java.envoy_control_plane_api - implementation library.java.grpc_api - implementation library.java.grpc_stub - implementation library.java.grpc_protobuf - implementation library.java.auto_value_annotations + // Envoy Rate Limiter Dependencies + implementation library.java.envoy_control_plane_api + implementation library.java.grpc_api + implementation library.java.grpc_stub + implementation library.java.grpc_protobuf + implementation library.java.auto_value_annotations } \ No newline at end of file diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiter.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiter.java index 6d70a2380cba..6adcef078e3d 100644 --- a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiter.java +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiter.java @@ -22,7 +22,7 @@ /** * A lightweight handle for an Envoy-based rate limiter. * - *

Delegates checks to the {@link EnvoyRateLimiterFactory} using the baked-in {@link Context}. + *

Delegates work to the {@link EnvoyRateLimiterFactory} using the baked-in {@link Context}. */ public class EnvoyRateLimiter implements RateLimiter { private final EnvoyRateLimiterFactory factory; @@ -35,6 +35,6 @@ public EnvoyRateLimiter(EnvoyRateLimiterFactory factory, EnvoyRateLimiterContext @Override public boolean allow(int permits) throws IOException, InterruptedException { - return factory.check(context, permits); + return factory.allow(context, permits); } } diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterContext.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterContext.java index 7d04f52ab1f0..4b88aa3a1b0c 100644 --- a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterContext.java +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterContext.java @@ -23,7 +23,7 @@ import org.apache.beam.sdk.schemas.annotations.DefaultSchema; /** - * Context for an Envoy Rate Limiter check. + * Context for an Envoy Rate Limiter call. * *

Contains the domain and descriptors required to define a specific rate limit bucket. */ diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterFactory.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterFactory.java index 1e18e55f4c64..2adb9e87a980 100644 --- a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterFactory.java +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterFactory.java @@ -89,7 +89,7 @@ public RateLimiter getLimiter(RateLimiterContext context) { } @Override - public boolean check(RateLimiterContext context, int permits) + public boolean allow(RateLimiterContext context, int permits) throws IOException, InterruptedException { if (!(context instanceof EnvoyRateLimiterContext)) { throw new IllegalArgumentException( diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterFactory.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterFactory.java index c1f16404cf5e..6ff1c8316cf4 100644 --- a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterFactory.java +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterFactory.java @@ -53,5 +53,5 @@ public interface RateLimiterFactory extends Serializable { * @throws IOException if there is an error communicating with the ratelimiter service. * @throws InterruptedException if the thread is interrupted while waiting. */ - boolean check(RateLimiterContext context, int permits) throws IOException, InterruptedException; + boolean allow(RateLimiterContext context, int permits) throws IOException, InterruptedException; } From 2ed7704d6f637b0c183b4f2ac71bc901ffb484d3 Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Mon, 2 Feb 2026 10:26:57 -0800 Subject: [PATCH 3/7] clean up --- examples/terraform/envoy-ratelimiter/README.md | 1 - 1 file changed, 1 deletion(-) diff --git a/examples/terraform/envoy-ratelimiter/README.md b/examples/terraform/envoy-ratelimiter/README.md index 07d9b6f32c7e..47d66832487d 100644 --- a/examples/terraform/envoy-ratelimiter/README.md +++ b/examples/terraform/envoy-ratelimiter/README.md @@ -38,7 +38,6 @@ Example Beam Pipelines using it: - **StatsD Exporter**: Sidecar container that converts StatsD metrics to Prometheus format, exposed on port `9102`. - **Internal Load Balancer**: A Google Cloud TCP Load Balancer exposing the Rate Limit service internally within the VPC. - ## Prerequisites: ### Following items need to be setup for Envoy Rate Limiter deployment on GCP: 1. [GCP project](https://cloud.google.com/resource-manager/docs/creating-managing-projects) From caee8018bf16293c876a85c1fe541df056b3b19d Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Tue, 3 Feb 2026 11:58:37 -0800 Subject: [PATCH 4/7] add autocloseable --- .../beam/examples/RateLimiterSimple.java | 23 +++++++++++++--- .../ratelimiter/EnvoyRateLimiter.java | 5 ++++ .../ratelimiter/EnvoyRateLimiterContext.java | 27 ++++++++++++++++--- .../ratelimiter/EnvoyRateLimiterFactory.java | 19 ++++++++----- .../components/ratelimiter/RateLimiter.java | 2 +- .../ratelimiter/RateLimiterFactory.java | 2 +- .../beam/sdk/io/gcp/spanner/SpannerIO.java | 5 +++- 7 files changed, 67 insertions(+), 16 deletions(-) diff --git a/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java b/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java index 75e8219b62e1..5b7225ffcda6 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java +++ b/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java @@ -32,7 +32,6 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.checkerframework.checker.nullness.qual.Nullable; /** @@ -61,6 +60,7 @@ static class CallExternalServiceFn extends DoFn { private final String rlsAddress; private final String rlsDomain; private transient @Nullable RateLimiter rateLimiter = null; + private transient @Nullable RateLimiterFactory factory = null; public CallExternalServiceFn(String rlsAddress, String rlsDomain) { this.rlsAddress = rlsAddress; @@ -73,12 +73,27 @@ public void setup() { RateLimiterOptions options = RateLimiterOptions.builder().setAddress(rlsAddress).build(); // Static RateLimtier with pre-configured domain and descriptors - RateLimiterFactory factory = new EnvoyRateLimiterFactory(options); + EnvoyRateLimiterFactory factory = new EnvoyRateLimiterFactory(options); + this.factory = factory; EnvoyRateLimiterContext context = - EnvoyRateLimiterContext.create(rlsDomain, ImmutableMap.of("database", "users")); + EnvoyRateLimiterContext.builder() + .setDomain(rlsDomain) + .addDescriptor("database", "users") + .build(); this.rateLimiter = factory.getLimiter(context); } + @Teardown + public void teardown() { + if (factory != null) { + try { + factory.close(); + } catch (Exception e) { + throw new RuntimeException("Failed to close RateLimiterFactory", e); + } + } + } + @ProcessElement public void processElement(ProcessContext c) throws Exception { String element = c.element(); @@ -102,7 +117,7 @@ public static void main(String[] args) { p.apply( "CreateItems", Create.of( - IntStream.range(0, 1000000).mapToObj(i -> "item" + i).collect(Collectors.toList()))) + IntStream.range(0, 100).mapToObj(i -> "item" + i).collect(Collectors.toList()))) .apply( "CallExternalService", ParDo.of( diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiter.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiter.java index 6adcef078e3d..93210a9777b9 100644 --- a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiter.java +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiter.java @@ -37,4 +37,9 @@ public EnvoyRateLimiter(EnvoyRateLimiterFactory factory, EnvoyRateLimiterContext public boolean allow(int permits) throws IOException, InterruptedException { return factory.allow(context, permits); } + + @Override + public void close() throws Exception { + factory.close(); + } } diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterContext.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterContext.java index 4b88aa3a1b0c..89b644c165f1 100644 --- a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterContext.java +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterContext.java @@ -18,9 +18,11 @@ package org.apache.beam.sdk.io.components.ratelimiter; import com.google.auto.value.AutoValue; +import edu.umd.cs.findbugs.annotations.NonNull; import java.util.Map; import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; /** * Context for an Envoy Rate Limiter call. @@ -33,9 +35,28 @@ public abstract class EnvoyRateLimiterContext implements RateLimiterContext { public abstract String getDomain(); - public abstract Map getDescriptors(); + public abstract ImmutableMap getDescriptors(); - public static EnvoyRateLimiterContext create(String domain, Map descriptors) { - return new AutoValue_EnvoyRateLimiterContext(domain, descriptors); + public static Builder builder() { + return new AutoValue_EnvoyRateLimiterContext.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setDomain(@NonNull String domain); + + public abstract ImmutableMap.Builder descriptorsBuilder(); + + public Builder addDescriptor(@NonNull String key, @NonNull String value) { + descriptorsBuilder().put(key, value); + return this; + } + + public Builder setDescriptors(@NonNull Map descriptors) { + descriptorsBuilder().putAll(descriptors); + return this; + } + + public abstract EnvoyRateLimiterContext build(); } } diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterFactory.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterFactory.java index 2adb9e87a980..e56dd0a5d990 100644 --- a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterFactory.java +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterFactory.java @@ -30,7 +30,6 @@ import org.apache.beam.sdk.metrics.Distribution; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.util.Sleeper; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,6 +42,7 @@ public class EnvoyRateLimiterFactory implements RateLimiterFactory { private final RateLimiterOptions options; private transient volatile @Nullable RateLimitServiceGrpc.RateLimitServiceBlockingStub stub; + private transient @Nullable RateLimiterClientCache clientCache; private final ThrottlingSignaler throttlingSignaler; private final Counter requestsTotal; @@ -64,17 +64,24 @@ public EnvoyRateLimiterFactory(RateLimiterOptions options) { this.rpcLatency = Metrics.distribution(namespace, "ratelimit-rpc-latency-ms"); } + @Override + public synchronized void close() { + if (clientCache != null) { + clientCache.release(); + clientCache = null; + stub = null; + } + } + private void init() { if (stub != null) { return; } synchronized (this) { if (stub == null) { - RateLimiterClientCache clientCache = - RateLimiterClientCache.getOrCreate(options.getAddress()); - stub = - RateLimitServiceGrpc.newBlockingStub( - Preconditions.checkNotNull(clientCache).getChannel()); + RateLimiterClientCache cache = RateLimiterClientCache.getOrCreate(options.getAddress()); + this.clientCache = cache; + stub = RateLimitServiceGrpc.newBlockingStub(cache.getChannel()); } } } diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiter.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiter.java index 6620354fe7cd..8c02654b3964 100644 --- a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiter.java +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiter.java @@ -26,7 +26,7 @@ * *

Implementations must be {@link Serializable} as they are passed to workers. */ -public interface RateLimiter extends Serializable { +public interface RateLimiter extends Serializable, AutoCloseable { /** * Blocks until the specified number of permits are acquired and returns true if the request was diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterFactory.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterFactory.java index 6ff1c8316cf4..b4330cd53db5 100644 --- a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterFactory.java +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterFactory.java @@ -26,7 +26,7 @@ *

Implementations must be {@link Serializable} as they are passed to workers. The factory * typically manages the heavy connection (e.g. gRPC stub) and is thread-safe. */ -public interface RateLimiterFactory extends Serializable { +public interface RateLimiterFactory extends Serializable, AutoCloseable { /** * Creates a lightweight ratelimiter handle bound to a specific context. diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index 78a70cb3b3bb..1568faf25928 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -2510,8 +2510,11 @@ public ServiceCallMetric load(String tableName) { } @Teardown - public void teardown() { + public void teardown() throws Exception { spannerAccessor.close(); + if (rateLimiter != null) { + rateLimiter.close(); + } } @ProcessElement From f96570e379fe4320bbb228f48418e3a19bde2769 Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Tue, 3 Feb 2026 12:02:51 -0800 Subject: [PATCH 5/7] add autocloseable --- .../java/org/apache/beam/examples/RateLimiterSimple.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java b/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java index 5b7225ffcda6..e7ad40747b13 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java +++ b/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java @@ -60,7 +60,6 @@ static class CallExternalServiceFn extends DoFn { private final String rlsAddress; private final String rlsDomain; private transient @Nullable RateLimiter rateLimiter = null; - private transient @Nullable RateLimiterFactory factory = null; public CallExternalServiceFn(String rlsAddress, String rlsDomain) { this.rlsAddress = rlsAddress; @@ -74,7 +73,6 @@ public void setup() { // Static RateLimtier with pre-configured domain and descriptors EnvoyRateLimiterFactory factory = new EnvoyRateLimiterFactory(options); - this.factory = factory; EnvoyRateLimiterContext context = EnvoyRateLimiterContext.builder() .setDomain(rlsDomain) @@ -85,11 +83,11 @@ public void setup() { @Teardown public void teardown() { - if (factory != null) { + if (rateLimiter != null) { try { - factory.close(); + rateLimiter.close(); } catch (Exception e) { - throw new RuntimeException("Failed to close RateLimiterFactory", e); + throw new RuntimeException("Failed to close RateLimiter", e); } } } From b9be38285db139a0dcb1ddb95c74a7aeef83404a Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Wed, 4 Feb 2026 14:34:44 -0800 Subject: [PATCH 6/7] clean up --- .../beam/examples/RateLimiterSimple.java | 16 ++++++++++------ sdks/java/io/components/build.gradle | 12 +++++------- .../ratelimiter/EnvoyRateLimiter.java | 3 ++- .../ratelimiter/EnvoyRateLimiterContext.java | 2 +- .../ratelimiter/EnvoyRateLimiterFactory.java | 7 +++---- .../ratelimiter/RateLimiterOptions.java | 19 +++++++------------ 6 files changed, 28 insertions(+), 31 deletions(-) diff --git a/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java b/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java index e7ad40747b13..89e2d5d06802 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java +++ b/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java @@ -23,6 +23,7 @@ import org.apache.beam.sdk.io.components.ratelimiter.EnvoyRateLimiterContext; import org.apache.beam.sdk.io.components.ratelimiter.EnvoyRateLimiterFactory; import org.apache.beam.sdk.io.components.ratelimiter.RateLimiter; +import org.apache.beam.sdk.io.components.ratelimiter.RateLimiterContext; import org.apache.beam.sdk.io.components.ratelimiter.RateLimiterFactory; import org.apache.beam.sdk.io.components.ratelimiter.RateLimiterOptions; import org.apache.beam.sdk.options.Description; @@ -33,6 +34,8 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A simple example demonstrating how to use the {@link RateLimiter} in a custom {@link DoFn}. @@ -45,12 +48,12 @@ public class RateLimiterSimple { public interface Options extends PipelineOptions { - @Description("Address of the Envoy Rate Limit Service (e.g., localhost:8081)") + @Description("Address of the Envoy Rate Limit Service(eg:localhost:8081)") String getRateLimiterAddress(); void setRateLimiterAddress(String value); - @Description("Domain for the Rate Limit Service") + @Description("Domain for the Rate Limit Service(eg:mydomain)") String getRateLimiterDomain(); void setRateLimiterDomain(String value); @@ -59,7 +62,8 @@ public interface Options extends PipelineOptions { static class CallExternalServiceFn extends DoFn { private final String rlsAddress; private final String rlsDomain; - private transient @Nullable RateLimiter rateLimiter = null; + private transient @Nullable RateLimiter rateLimiter; + private static final Logger LOG = LoggerFactory.getLogger(CallExternalServiceFn.class); public CallExternalServiceFn(String rlsAddress, String rlsDomain) { this.rlsAddress = rlsAddress; @@ -72,8 +76,8 @@ public void setup() { RateLimiterOptions options = RateLimiterOptions.builder().setAddress(rlsAddress).build(); // Static RateLimtier with pre-configured domain and descriptors - EnvoyRateLimiterFactory factory = new EnvoyRateLimiterFactory(options); - EnvoyRateLimiterContext context = + RateLimiterFactory factory = new EnvoyRateLimiterFactory(options); + RateLimiterContext context = EnvoyRateLimiterContext.builder() .setDomain(rlsDomain) .addDescriptor("database", "users") @@ -102,8 +106,8 @@ public void processElement(ProcessContext c) throws Exception { } // Simulate external API call + LOG.info("Processing: " + element); Thread.sleep(100); - System.out.println("Processing: " + element); c.output("Processed: " + element); } } diff --git a/sdks/java/io/components/build.gradle b/sdks/java/io/components/build.gradle index 87f93229a9c3..b19c58e658c7 100644 --- a/sdks/java/io/components/build.gradle +++ b/sdks/java/io/components/build.gradle @@ -26,6 +26,11 @@ ext.summary = "Components for building fully featured IOs" dependencies { implementation project(path: ":sdks:java:core", configuration: "shadow") + implementation library.java.auto_value_annotations + implementation library.java.envoy_control_plane_api + implementation library.java.grpc_api + implementation library.java.grpc_stub + implementation library.java.grpc_protobuf implementation library.java.protobuf_java permitUnusedDeclared library.java.protobuf_java // BEAM-11761 implementation library.java.slf4j_api @@ -38,11 +43,4 @@ dependencies { testImplementation library.java.hamcrest testRuntimeOnly library.java.slf4j_jdk14 testImplementation project(path: ":runners:direct-java", configuration: "shadow") - - // Envoy Rate Limiter Dependencies - implementation library.java.envoy_control_plane_api - implementation library.java.grpc_api - implementation library.java.grpc_stub - implementation library.java.grpc_protobuf - implementation library.java.auto_value_annotations } \ No newline at end of file diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiter.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiter.java index 93210a9777b9..9fc3da80dca4 100644 --- a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiter.java +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiter.java @@ -22,7 +22,8 @@ /** * A lightweight handle for an Envoy-based rate limiter. * - *

Delegates work to the {@link EnvoyRateLimiterFactory} using the baked-in {@link Context}. + *

Delegates work to the {@link EnvoyRateLimiterFactory} using the baked-in {@link + * EnvoyRateLimiterContext}. */ public class EnvoyRateLimiter implements RateLimiter { private final EnvoyRateLimiterFactory factory; diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterContext.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterContext.java index 89b644c165f1..b710e74b914f 100644 --- a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterContext.java +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterContext.java @@ -18,11 +18,11 @@ package org.apache.beam.sdk.io.components.ratelimiter; import com.google.auto.value.AutoValue; -import edu.umd.cs.findbugs.annotations.NonNull; import java.util.Map; import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.checkerframework.checker.nullness.qual.NonNull; /** * Context for an Envoy Rate Limiter call. diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterFactory.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterFactory.java index e56dd0a5d990..1644101994e5 100644 --- a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterFactory.java +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterFactory.java @@ -80,8 +80,8 @@ private void init() { synchronized (this) { if (stub == null) { RateLimiterClientCache cache = RateLimiterClientCache.getOrCreate(options.getAddress()); - this.clientCache = cache; stub = RateLimitServiceGrpc.newBlockingStub(cache.getChannel()); + this.clientCache = cache; } } } @@ -135,14 +135,13 @@ private boolean callEnvoy(EnvoyRateLimiterContext context, int tokens) .addDescriptors(descriptorBuilder.build()) .build(); - boolean blockUntilAllowed = options.isBlockUntilAllowed(); - int maxRetries = options.getMaxRetries(); + Integer maxRetries = options.getMaxRetries(); long timeoutMillis = options.getTimeout().toMillis(); requestsTotal.inc(); int attempt = 0; while (true) { - if (!blockUntilAllowed && attempt > maxRetries) { + if (maxRetries != null && attempt > maxRetries) { return false; } diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterOptions.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterOptions.java index a0aafdc09d89..95f3bba6200b 100644 --- a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterOptions.java +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterOptions.java @@ -20,6 +20,7 @@ import com.google.auto.value.AutoValue; import java.io.Serializable; import java.time.Duration; +import javax.annotation.Nullable; import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; @@ -29,28 +30,22 @@ public abstract class RateLimiterOptions implements Serializable { public abstract String getAddress(); - public abstract Duration getTimeout(); - - public abstract boolean isBlockUntilAllowed(); + @Nullable + public abstract Integer getMaxRetries(); - public abstract int getMaxRetries(); + public abstract Duration getTimeout(); public static Builder builder() { - return new AutoValue_RateLimiterOptions.Builder() - .setTimeout(Duration.ofSeconds(5)) - .setBlockUntilAllowed(true) - .setMaxRetries(3); + return new AutoValue_RateLimiterOptions.Builder().setTimeout(Duration.ofSeconds(5)); } @AutoValue.Builder public abstract static class Builder { public abstract Builder setAddress(String address); - public abstract Builder setTimeout(Duration timeout); - - public abstract Builder setBlockUntilAllowed(boolean blockUntilAllowed); + public abstract Builder setMaxRetries(Integer maxRetries); - public abstract Builder setMaxRetries(int maxRetries); + public abstract Builder setTimeout(Duration timeout); public abstract RateLimiterOptions build(); } From a163f7827ec27755cf88d4544183074b8f9a0caa Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Thu, 5 Feb 2026 11:07:03 -0800 Subject: [PATCH 7/7] refactor code --- .../ratelimiter/EnvoyRateLimiterFactory.java | 22 ++- .../ratelimiter/RateLimiterClientCache.java | 35 ++-- .../ratelimiter/RateLimiterOptions.java | 14 +- .../ratelimiter/EnvoyRateLimiterTest.java | 168 ++++++++++++++++++ .../RateLimiterClientCacheTest.java | 115 ++++++++++++ .../ratelimiter/RateLimiterOptionsTest.java | 81 +++++++++ 6 files changed, 416 insertions(+), 19 deletions(-) create mode 100644 sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterTest.java create mode 100644 sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterClientCacheTest.java create mode 100644 sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterOptionsTest.java diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterFactory.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterFactory.java index 1644101994e5..5a27c309d4ec 100644 --- a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterFactory.java +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterFactory.java @@ -30,6 +30,8 @@ import org.apache.beam.sdk.metrics.Distribution; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.util.Sleeper; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,6 +46,7 @@ public class EnvoyRateLimiterFactory implements RateLimiterFactory { private transient volatile @Nullable RateLimitServiceGrpc.RateLimitServiceBlockingStub stub; private transient @Nullable RateLimiterClientCache clientCache; private final ThrottlingSignaler throttlingSignaler; + private final Sleeper sleeper; private final Counter requestsTotal; private final Counter requestsAllowed; @@ -53,7 +56,13 @@ public class EnvoyRateLimiterFactory implements RateLimiterFactory { private final Distribution rpcLatency; public EnvoyRateLimiterFactory(RateLimiterOptions options) { + this(options, Sleeper.DEFAULT); + } + + @VisibleForTesting + EnvoyRateLimiterFactory(RateLimiterOptions options, Sleeper sleeper) { this.options = options; + this.sleeper = sleeper; String namespace = EnvoyRateLimiterFactory.class.getName(); this.throttlingSignaler = new ThrottlingSignaler(namespace); this.requestsTotal = Metrics.counter(namespace, "ratelimit-requests-total"); @@ -69,8 +78,8 @@ public synchronized void close() { if (clientCache != null) { clientCache.release(); clientCache = null; - stub = null; } + stub = null; } private void init() { @@ -80,12 +89,17 @@ private void init() { synchronized (this) { if (stub == null) { RateLimiterClientCache cache = RateLimiterClientCache.getOrCreate(options.getAddress()); - stub = RateLimitServiceGrpc.newBlockingStub(cache.getChannel()); this.clientCache = cache; + stub = RateLimitServiceGrpc.newBlockingStub(cache.getChannel()); } } } + @VisibleForTesting + void setStub(RateLimitServiceGrpc.RateLimitServiceBlockingStub stub) { + this.stub = stub; + } + @Override public RateLimiter getLimiter(RateLimiterContext context) { if (!(context instanceof EnvoyRateLimiterContext)) { @@ -104,6 +118,7 @@ public boolean allow(RateLimiterContext context, int permits) + context.getClass().getName()); } EnvoyRateLimiterContext envoyContext = (EnvoyRateLimiterContext) context; + Preconditions.checkArgument(permits >= 0, "Permits must be non-negative"); return callEnvoy(envoyContext, permits); } @@ -111,10 +126,9 @@ private boolean callEnvoy(EnvoyRateLimiterContext context, int tokens) throws IOException, InterruptedException { init(); - Sleeper sleeper = Sleeper.DEFAULT; RateLimitServiceGrpc.RateLimitServiceBlockingStub currentStub = stub; if (currentStub == null) { - throw new IllegalStateException("RateLimitService stub is null"); + throw new IllegalStateException("RateLimitServiceStub is null"); } Map descriptors = context.getDescriptors(); diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterClientCache.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterClientCache.java index 6e212786706a..9857ca9b4bd7 100644 --- a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterClientCache.java +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterClientCache.java @@ -48,7 +48,11 @@ private RateLimiterClientCache(String address) { this.channel = ManagedChannelBuilder.forTarget(address).usePlaintext().build(); } - /** Gets or creates a cached client for the given address. Increments the reference count. */ + /** + * Gets or creates a cached client for the given address. Increments the reference count. + * Synchronized on the class to prevent race conditions when multiple instances call getOrCreate() + * simultaneously + */ public static synchronized RateLimiterClientCache getOrCreate(String address) { RateLimiterClientCache client = CACHE.get(address); if (client == null) { @@ -66,21 +70,24 @@ public ManagedChannel getChannel() { /** * Releases the client. Decrements the reference count. If reference count reaches 0, the channel - * is shut down and removed from the cache. + * is shut down and removed from the cache. Synchronized on the class to prevent race conditions + * when multiple instances call release() simultaneously */ - public synchronized void release() { - refCount--; - LOG.debug("Released RLS Channel for {}. New RefCount: {}", address, refCount); - if (refCount <= 0) { - LOG.info("Closing ManagedChannel for RLS at {}", address); - CACHE.remove(address); - channel.shutdown(); - try { - channel.awaitTermination(10, TimeUnit.SECONDS); - } catch (InterruptedException e) { - LOG.error("Couldn't gracefully close gRPC channel={}", channel, e); + public void release() { + synchronized (RateLimiterClientCache.class) { + refCount--; + LOG.debug("Released RLS Channel for {}. New RefCount: {}", address, refCount); + if (refCount <= 0) { + LOG.info("Closing ManagedChannel for RLS at {}", address); + CACHE.remove(address); + channel.shutdown(); + try { + channel.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOG.error("Couldn't gracefully close gRPC channel={}", channel, e); + } + channel.shutdownNow(); } - channel.shutdownNow(); } } } diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterOptions.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterOptions.java index 95f3bba6200b..3b925609bd70 100644 --- a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterOptions.java +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterOptions.java @@ -23,6 +23,7 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; /** Configuration options for {@link RateLimiterFactory}. */ @DefaultSchema(AutoValueSchema.class) @@ -47,6 +48,17 @@ public abstract static class Builder { public abstract Builder setTimeout(Duration timeout); - public abstract RateLimiterOptions build(); + abstract RateLimiterOptions autoBuild(); + + public RateLimiterOptions build() { + RateLimiterOptions options = autoBuild(); + Preconditions.checkArgument( + options.getTimeout().compareTo(Duration.ZERO) > 0, "Timeout must be positive"); + Integer maxRetries = options.getMaxRetries(); + if (maxRetries != null) { + Preconditions.checkArgument(maxRetries >= 0, "MaxRetries must be non-negative"); + } + return options; + } } } diff --git a/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterTest.java b/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterTest.java new file mode 100644 index 000000000000..e94d0b42eb3c --- /dev/null +++ b/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.components.ratelimiter; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.verify; + +import io.envoyproxy.envoy.service.ratelimit.v3.RateLimitRequest; +import io.envoyproxy.envoy.service.ratelimit.v3.RateLimitResponse; +import io.envoyproxy.envoy.service.ratelimit.v3.RateLimitServiceGrpc; +import io.grpc.ManagedChannel; +import io.grpc.Server; +import io.grpc.Status; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.stub.StreamObserver; +import java.io.IOException; +import org.apache.beam.sdk.util.Sleeper; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** Tests for {@link EnvoyRateLimiterFactory}. */ +@RunWith(JUnit4.class) +public class EnvoyRateLimiterTest { + @Mock private Sleeper sleeper; + + private EnvoyRateLimiterFactory factory; + private RateLimiterOptions options; + private EnvoyRateLimiterContext context; + + private Server server; + private ManagedChannel channel; + private TestRateLimitService service; + + @Before + public void setUp() throws Exception { + MockitoAnnotations.openMocks(this); + options = + RateLimiterOptions.builder() + .setAddress("localhost:8081") + .setTimeout(java.time.Duration.ofSeconds(1)) + .build(); + + String serverName = InProcessServerBuilder.generateName(); + service = new TestRateLimitService(); + server = + InProcessServerBuilder.forName(serverName) + .directExecutor() + .addService(service) + .build() + .start(); + channel = InProcessChannelBuilder.forName(serverName).directExecutor().build(); + + factory = new EnvoyRateLimiterFactory(options, sleeper); + factory.setStub(RateLimitServiceGrpc.newBlockingStub(channel)); + + context = + EnvoyRateLimiterContext.builder() + .setDomain("test-domain") + .addDescriptor("key", "value") + .build(); + } + + @After + public void tearDown() { + if (channel != null) { + channel.shutdownNow(); + } + if (server != null) { + server.shutdownNow(); + } + } + + @Test + public void testAllow_OK() throws Exception { + service.responseToReturn = + RateLimitResponse.newBuilder().setOverallCode(RateLimitResponse.Code.OK).build(); + + assertTrue(factory.allow(context, 1)); + } + + @Test + public void testAllow_OverLimit() throws Exception { + service.responseToReturn = + RateLimitResponse.newBuilder() + .setOverallCode(RateLimitResponse.Code.OVER_LIMIT) + .addStatuses( + RateLimitResponse.DescriptorStatus.newBuilder() + .setCode(RateLimitResponse.Code.OVER_LIMIT) + .setDurationUntilReset( + com.google.protobuf.Duration.newBuilder().setSeconds(1).build()) + .build()) + .build(); + + factory = + new EnvoyRateLimiterFactory( + RateLimiterOptions.builder() + .setAddress("foo") + .setTimeout(java.time.Duration.ofSeconds(1)) + .setMaxRetries(1) + .build(), + sleeper); + factory.setStub(RateLimitServiceGrpc.newBlockingStub(channel)); + + assertFalse(factory.allow(context, 1)); + + // Verify sleep was called. + verify(sleeper, org.mockito.Mockito.atLeastOnce()).sleep(anyLong()); + } + + @Test + public void testAllow_RpcError() throws Exception { + service.errorToThrow = Status.UNAVAILABLE.asRuntimeException(); + assertThrows(IOException.class, () -> factory.allow(context, 1)); + } + + @Test + public void testInvalidContext() { + assertThrows( + IllegalArgumentException.class, () -> factory.allow(new RateLimiterContext() {}, 1)); + } + + static class TestRateLimitService extends RateLimitServiceGrpc.RateLimitServiceImplBase { + volatile RateLimitResponse responseToReturn; + volatile RuntimeException errorToThrow; + + @Override + public void shouldRateLimit( + RateLimitRequest request, StreamObserver responseObserver) { + if (errorToThrow != null) { + responseObserver.onError(errorToThrow); + return; + } + if (responseToReturn != null) { + responseObserver.onNext(responseToReturn); + responseObserver.onCompleted(); + } else { + // Default OK + responseObserver.onNext( + RateLimitResponse.newBuilder().setOverallCode(RateLimitResponse.Code.OK).build()); + responseObserver.onCompleted(); + } + } + } +} diff --git a/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterClientCacheTest.java b/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterClientCacheTest.java new file mode 100644 index 000000000000..4eb61b279c34 --- /dev/null +++ b/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterClientCacheTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.components.ratelimiter; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link RateLimiterClientCache}. */ +@RunWith(JUnit4.class) +public class RateLimiterClientCacheTest { + + @Test + public void testGetOrCreate_SameAddress() { + String address = "addr1"; + RateLimiterClientCache client1 = RateLimiterClientCache.getOrCreate(address); + RateLimiterClientCache client2 = RateLimiterClientCache.getOrCreate(address); + + assertSame(client1, client2); + assertFalse(client1.getChannel().isShutdown()); + + // cleanup + client1.release(); + // client2 is still using the same channel + assertFalse(client1.getChannel().isShutdown()); + client2.release(); + assertTrue(client1.getChannel().isShutdown()); + } + + @Test + public void testGetOrCreate_DifferentAddress_ReturnsDifferentInstances() { + RateLimiterClientCache client1 = RateLimiterClientCache.getOrCreate("addr1"); + RateLimiterClientCache client2 = RateLimiterClientCache.getOrCreate("addr2"); + + assertNotSame(client1, client2); + + assertFalse(client1.getChannel().isShutdown()); + assertFalse(client2.getChannel().isShutdown()); + client1.release(); + assertTrue(client1.getChannel().isShutdown()); + client2.release(); + assertTrue(client2.getChannel().isShutdown()); + } + + @Test + public void testConcurrency() throws InterruptedException, ExecutionException { + int threads = 10; + int iterations = 100; + String address = "concurrent-addr"; + ExecutorService pool = Executors.newFixedThreadPool(threads); + List> futures = new ArrayList<>(); + + for (int i = 0; i < threads; i++) { + futures.add( + pool.submit( + new Callable() { + @Override + public Boolean call() { + for (int j = 0; j < iterations; j++) { + RateLimiterClientCache client = RateLimiterClientCache.getOrCreate(address); + // do some tiny work + try { + Thread.sleep(1); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + client.release(); + } + return true; + } + })); + } + + for (Future f : futures) { + assertTrue(f.get()); + } + + pool.shutdown(); + pool.awaitTermination(5, TimeUnit.SECONDS); + + // After all threads are done, cache should be empty or create new one cleanly + RateLimiterClientCache client = RateLimiterClientCache.getOrCreate(address); + assertFalse(client.getChannel().isShutdown()); + client.release(); + assertTrue(client.getChannel().isShutdown()); + } +} diff --git a/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterOptionsTest.java b/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterOptionsTest.java new file mode 100644 index 000000000000..cb8674b4e502 --- /dev/null +++ b/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterOptionsTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.components.ratelimiter; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; + +import java.time.Duration; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link RateLimiterOptions}. */ +@RunWith(JUnit4.class) +public class RateLimiterOptionsTest { + + @Test + public void testValidOptions() { + RateLimiterOptions options = + RateLimiterOptions.builder() + .setAddress("localhost:8081") + .setTimeout(Duration.ofSeconds(1)) + .setMaxRetries(3) + .build(); + + assertEquals("localhost:8081", options.getAddress()); + assertEquals(Duration.ofSeconds(1), options.getTimeout()); + assertEquals(Integer.valueOf(3), options.getMaxRetries()); + } + + @Test + public void testNegativeTimeout() { + assertThrows( + IllegalArgumentException.class, + () -> + RateLimiterOptions.builder() + .setAddress("localhost:8081") + .setTimeout(Duration.ofSeconds(-1)) + .build()); + } + + @Test + public void testZeroTimeout() { + assertThrows( + IllegalArgumentException.class, + () -> + RateLimiterOptions.builder() + .setAddress("localhost:8081") + .setTimeout(Duration.ZERO) + .build()); + } + + @Test + public void testNegativeMaxRetries() { + assertThrows( + IllegalArgumentException.class, + () -> RateLimiterOptions.builder().setAddress("localhost:8081").setMaxRetries(-1).build()); + } + + @Test + public void testNullMaxRetriesIsAllowed() { + RateLimiterOptions options = + RateLimiterOptions.builder().setAddress("localhost:8081").setMaxRetries(null).build(); + assertEquals(null, options.getMaxRetries()); + } +}