Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,7 @@ class BeamModulePlugin implements Plugin<Project> {
commons_math3 : "org.apache.commons:commons-math3:3.6.1",
dbcp2 : "org.apache.commons:commons-dbcp2:$dbcp2_version",
error_prone_annotations : "com.google.errorprone:error_prone_annotations:$errorprone_version",
envoy_control_plane_api : "io.envoyproxy.controlplane:api:1.0.49",
failsafe : "dev.failsafe:failsafe:3.3.0",
flogger_system_backend : "com.google.flogger:flogger-system-backend:0.7.4",
gax : "com.google.api:gax", // google_cloud_platform_libraries_bom sets version
Expand Down
1 change: 1 addition & 0 deletions examples/java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ dependencies {
implementation project(":sdks:java:extensions:python")
implementation project(":sdks:java:io:google-cloud-platform")
implementation project(":sdks:java:io:kafka")
implementation project(":sdks:java:io:components")
implementation project(":sdks:java:extensions:ml")
implementation library.java.avro
implementation library.java.bigdataoss_util
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.examples;

import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.components.ratelimiter.EnvoyRateLimiterContext;
import org.apache.beam.sdk.io.components.ratelimiter.EnvoyRateLimiterFactory;
import org.apache.beam.sdk.io.components.ratelimiter.RateLimiter;
import org.apache.beam.sdk.io.components.ratelimiter.RateLimiterContext;
import org.apache.beam.sdk.io.components.ratelimiter.RateLimiterFactory;
import org.apache.beam.sdk.io.components.ratelimiter.RateLimiterOptions;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A simple example demonstrating how to use the {@link RateLimiter} in a custom {@link DoFn}.
*
* <p>This pipeline creates a small set of elements and processes them using a DoFn that calls an
* external service (simulated). The processing is rate-limited using an Envoy Rate Limit Service.
*
* <p>To run this example, you need a running Envoy Rate Limit Service.
*/
public class RateLimiterSimple {

public interface Options extends PipelineOptions {
@Description("Address of the Envoy Rate Limit Service(eg:localhost:8081)")
String getRateLimiterAddress();

void setRateLimiterAddress(String value);

@Description("Domain for the Rate Limit Service(eg:mydomain)")
String getRateLimiterDomain();

void setRateLimiterDomain(String value);
}

static class CallExternalServiceFn extends DoFn<String, String> {
private final String rlsAddress;
private final String rlsDomain;
private transient @Nullable RateLimiter rateLimiter;
private static final Logger LOG = LoggerFactory.getLogger(CallExternalServiceFn.class);

public CallExternalServiceFn(String rlsAddress, String rlsDomain) {
this.rlsAddress = rlsAddress;
this.rlsDomain = rlsDomain;
}

@Setup
public void setup() {
// Create the RateLimiterOptions.
RateLimiterOptions options = RateLimiterOptions.builder().setAddress(rlsAddress).build();

// Static RateLimtier with pre-configured domain and descriptors
RateLimiterFactory factory = new EnvoyRateLimiterFactory(options);
RateLimiterContext context =
EnvoyRateLimiterContext.builder()
.setDomain(rlsDomain)
.addDescriptor("database", "users")
.build();
this.rateLimiter = factory.getLimiter(context);
}

@Teardown
public void teardown() {
if (rateLimiter != null) {
try {
rateLimiter.close();
} catch (Exception e) {
throw new RuntimeException("Failed to close RateLimiter", e);
}
}
}

@ProcessElement
public void processElement(ProcessContext c) throws Exception {
String element = c.element();
try {
Preconditions.checkNotNull(rateLimiter).allow(1);
} catch (Exception e) {
throw new RuntimeException("Failed to acquire rate limit token", e);
}

// Simulate external API call
LOG.info("Processing: " + element);
Thread.sleep(100);
c.output("Processed: " + element);
}
}

public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
Pipeline p = Pipeline.create(options);

p.apply(
"CreateItems",
Create.of(
IntStream.range(0, 100).mapToObj(i -> "item" + i).collect(Collectors.toList())))
.apply(
"CallExternalService",
ParDo.of(
new CallExternalServiceFn(
options.getRateLimiterAddress(), options.getRateLimiterDomain())));

p.run().waitUntilFinish();
}
}
7 changes: 6 additions & 1 deletion sdks/java/io/components/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,19 @@

plugins { id 'org.apache.beam.module' }
applyJavaNature(
automaticModuleName: 'org.apache.beam.sdk.io.components',
automaticModuleName: 'org.apache.beam.sdk.io.components',
)

description = "Apache Beam :: SDKs :: Java :: IO :: Components"
ext.summary = "Components for building fully featured IOs"

dependencies {
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation library.java.auto_value_annotations
implementation library.java.envoy_control_plane_api
implementation library.java.grpc_api
implementation library.java.grpc_stub
implementation library.java.grpc_protobuf
implementation library.java.protobuf_java
permitUnusedDeclared library.java.protobuf_java // BEAM-11761
implementation library.java.slf4j_api
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.components.ratelimiter;

import java.io.IOException;

/**
* A lightweight handle for an Envoy-based rate limiter.
*
* <p>Delegates work to the {@link EnvoyRateLimiterFactory} using the baked-in {@link
* EnvoyRateLimiterContext}.
*/
public class EnvoyRateLimiter implements RateLimiter {
private final EnvoyRateLimiterFactory factory;
private final EnvoyRateLimiterContext context;

public EnvoyRateLimiter(EnvoyRateLimiterFactory factory, EnvoyRateLimiterContext context) {
this.factory = factory;
this.context = context;
}

@Override
public boolean allow(int permits) throws IOException, InterruptedException {
return factory.allow(context, permits);
}

@Override
public void close() throws Exception {
factory.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.components.ratelimiter;

import com.google.auto.value.AutoValue;
import java.util.Map;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.checkerframework.checker.nullness.qual.NonNull;

/**
* Context for an Envoy Rate Limiter call.
*
* <p>Contains the domain and descriptors required to define a specific rate limit bucket.
*/
@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract class EnvoyRateLimiterContext implements RateLimiterContext {

public abstract String getDomain();

public abstract ImmutableMap<String, String> getDescriptors();

public static Builder builder() {
return new AutoValue_EnvoyRateLimiterContext.Builder();
}

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setDomain(@NonNull String domain);

public abstract ImmutableMap.Builder<String, String> descriptorsBuilder();

public Builder addDescriptor(@NonNull String key, @NonNull String value) {
descriptorsBuilder().put(key, value);
return this;
}

public Builder setDescriptors(@NonNull Map<String, String> descriptors) {
descriptorsBuilder().putAll(descriptors);
return this;
}

public abstract EnvoyRateLimiterContext build();
}
}
Loading
Loading