Skip to content

Commit 9dc4862

Browse files
authored
Merge pull request #167 from Dondehip/master
DynamoDB hedging Sample
2 parents fdb2801 + 639c10a commit 9dc4862

File tree

7 files changed

+632
-0
lines changed

7 files changed

+632
-0
lines changed
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
HELP.md
2+
target/
3+
!.mvn/wrapper/maven-wrapper.jar
4+
!**/src/main/**/target/
5+
!**/src/test/**/target/
6+
7+
### STS ###
8+
.apt_generated
9+
.classpath
10+
.factorypath
11+
.project
12+
.settings
13+
.springBeans
14+
.sts4-cache
15+
16+
### IntelliJ IDEA ###
17+
.idea
18+
*.iws
19+
*.iml
20+
*.ipr
21+
22+
### NetBeans ###
23+
/nbproject/private/
24+
/nbbuild/
25+
/dist/
26+
/nbdist/
27+
/.nb-gradle/
28+
build/
29+
!**/src/main/**/build/
30+
!**/src/test/**/build/
31+
32+
### VS Code ###
33+
.vscode/
34+
src/test/.DS_Store
35+
src/test/java/.DS_Store
36+
src/test/java/com/.DS_Store
37+
src/.DS_Store
38+
src/main/.DS_Store
39+
src/main/java/.DS_Store
40+
src/main/java/com/.DS_Store
41+
/**/.DS_Store
42+
mvnw
43+
mvnw.cmd
44+
.mvn/wrapper/maven-wrapper.jar
45+
.mvn/wrapper/maven-wrapper.properties
46+
47+
#### Project specific #####
48+
loadtest/data/**
49+
logs/*.*
50+
51+
*.log
52+
*.code-workspace
53+
/.gradle/
54+
/build/
55+
56+
/.idea/
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
# DynamoDB Hedging Pattern Implementation with Java
2+
3+
This project demonstrates the implementation of the hedging pattern with Amazon DynamoDB using the AWS SDK for Java v2. The hedging pattern helps improve tail latency by sending multiple identical requests and using the first response that arrives.
4+
5+
## Concepts
6+
7+
### What is Request Hedging?
8+
Request hedging is a reliability pattern where multiple identical requests are sent to different replicas of a service. The first successful response is used while other requests are canceled. This pattern helps reduce tail latency at the cost of additional resource usage.
9+
10+
### Benefits
11+
- Reduces p99 latency (tail latency)
12+
- Improves reliability by routing around slow DynamoDB replicas
13+
- Handles transient issues automatically
14+
15+
### Trade-offs
16+
- Increased DynamoDB consumed capacity units
17+
- Higher costs due to multiple requests
18+
- Additional client-side complexity
19+
20+
## Project Structure
21+
22+
```
23+
src/
24+
├── main/
25+
│ └── java/
26+
│ └── com/example/dynamodbhedging/
27+
│ ├── DDBHedgingRequestHandler.java // Generic hedging request handler implementation
28+
│ ├── DynamoDBHedgedQuery.java // Main class demoestrating the use of hedging
29+
│ └── DynamoDBOperations.java // DynamoDB operations with hedging
30+
└── test/
31+
└── java/
32+
└── com/example/dynamodbhedging/
33+
└── DynamoDBHedgedQueryTest.java // Test cases
34+
35+
```
36+
## Getting Started
37+
### Prerequisites
38+
39+
* Java 21 or later
40+
41+
* Maven
42+
43+
* DynamoDB table created with a partitioned key and sample data loaded
44+
45+
### How to use the DDBHedgingRequestHandler class
46+
```java
47+
DDBHedgingRequestHandler<QueryResponse> hedgingHandler = new DDBHedgingRequestHandler();
48+
49+
// create the DynamoDB operation
50+
Map<String, AttributeValue> expressionAttributeValues = new HashMap<>();
51+
expressionAttributeValues.put(":pkValue", AttributeValue.builder().s(partitionKeyValue).build());
52+
53+
// Create the query request
54+
QueryRequest queryRequest = QueryRequest.builder().tableName(tableName).keyConditionExpression(partitionKeyName + " = :pkValue").expressionAttributeValues(expressionAttributeValues).build();
55+
56+
// Create the supplier that will execute the query and pass to the hedgeRequests() method of the DDBHedgingRequestHandler class
57+
return hedgingHandler.hedgeRequests(() -> asyncClient.query(queryRequest), List.of(50) // Delays in milliseconds for hedge requests, you can pass multiple values if you wish to do multiple hedging calls
58+
);
59+
```
60+
61+
### Running the sample:
62+
63+
Maven plugin configuration is provided for running the sample class DynamoDBHedgedQuery.
64+
To run this class update the following configurations to suite your environment.
65+
66+
```xml
67+
<plugin>
68+
<groupId>org.codehaus.mojo</groupId>
69+
<artifactId>exec-maven-plugin</artifactId>
70+
<configuration>
71+
<mainClass>com.example.dynamodbhedging.DynamoDBHedgedQuery</mainClass>
72+
<arguments>
73+
74+
<argument><!-- DynamoDB table name --></argument>
75+
76+
<argument><!-- Name of the Partition Key --></argument>
77+
78+
<argument><!-- Partition Key value for the items to retrieve --></argument>
79+
80+
<argument><!-- Number of iterations for the code to run --></argument>
81+
</arguments>
82+
</configuration>
83+
</plugin>
84+
```
85+
After adding this configuration, you can run your main class using:
86+
87+
```bash
88+
mvn exec:java
89+
```
90+
91+
If you need to pass different arguments at runtime, you can override the configured arguments using the command line:
92+
93+
```bash
94+
mvn exec:java -Dexec.args="DynamoBDTableName Partition_Key Key_value number_of_iterations"
95+
```
96+
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
<groupId>DynamoDBHedgingJ2</groupId>
7+
<artifactId>DynamoDBHedgingJ2</artifactId>
8+
<version>1.0-SNAPSHOT</version>
9+
<properties>
10+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
11+
<java.version>17</java.version>
12+
<maven.compiler.target>17</maven.compiler.target>
13+
<maven.compiler.source>17</maven.compiler.source>
14+
</properties>
15+
<build>
16+
<plugins>
17+
<plugin>
18+
<groupId>org.apache.maven.plugins</groupId>
19+
<artifactId>maven-compiler-plugin</artifactId>
20+
<version>3.1</version>
21+
<configuration>
22+
<source>${java.version}</source>
23+
<target>${java.version}</target>
24+
</configuration>
25+
</plugin>
26+
<plugin>
27+
<groupId>org.apache.maven.plugins</groupId>
28+
<artifactId>maven-surefire-plugin</artifactId>
29+
<version>3.5.2</version>
30+
<configuration>
31+
<argLine>-XX:+EnableDynamicAgentLoading</argLine>
32+
</configuration>
33+
</plugin>
34+
<plugin>
35+
<groupId>org.codehaus.mojo</groupId>
36+
<artifactId>exec-maven-plugin</artifactId>
37+
<version>3.5.0</version>
38+
<configuration>
39+
<mainClass>com.example.dynamodbhedging.DynamoDBHedgedQuery</mainClass>
40+
<arguments>
41+
<!-- DynamoDB table name -->
42+
<argument>hedging-demo-102</argument>
43+
<!-- Name of the Partition Key -->
44+
<argument>PK</argument>
45+
<!-- Partition Key value for the items to retrieve -->
46+
<argument>7343-K7Ws6YE0MTfJwQn</argument>
47+
<!-- Number of iterations for the code to run -->
48+
<argument>100</argument>
49+
</arguments>
50+
</configuration>
51+
</plugin>
52+
</plugins>
53+
</build>
54+
<dependencyManagement>
55+
<dependencies>
56+
<dependency>
57+
<groupId>software.amazon.awssdk</groupId>
58+
<artifactId>bom</artifactId>
59+
<version>2.29.45</version>
60+
<type>pom</type>
61+
<scope>import</scope>
62+
</dependency>
63+
</dependencies>
64+
</dependencyManagement>
65+
<dependencies>
66+
<dependency>
67+
<groupId>software.amazon.awssdk</groupId>
68+
<artifactId>dynamodb-enhanced</artifactId>
69+
</dependency>
70+
<dependency>
71+
<groupId>org.junit.jupiter</groupId>
72+
<artifactId>junit-jupiter</artifactId>
73+
<version>5.11.4</version>
74+
<scope>test</scope>
75+
</dependency>
76+
<dependency>
77+
<groupId>org.slf4j</groupId>
78+
<artifactId>slf4j-log4j12</artifactId>
79+
<version>1.7.36</version>
80+
</dependency>
81+
<dependency>
82+
<groupId>software.amazon.awssdk</groupId>
83+
<artifactId>dynamodb</artifactId>
84+
</dependency>
85+
<dependency>
86+
<groupId>io.reactivex.rxjava3</groupId>
87+
<artifactId>rxjava</artifactId>
88+
<version>3.1.6</version>
89+
</dependency>
90+
<dependency>
91+
<groupId>io.projectreactor</groupId>
92+
<artifactId>reactor-core</artifactId>
93+
<version>3.3.5.RELEASE</version>
94+
</dependency>
95+
<dependency>
96+
<groupId>software.amazon.awssdk</groupId>
97+
<artifactId>sso</artifactId>
98+
</dependency>
99+
<dependency>
100+
<groupId>software.amazon.awssdk</groupId>
101+
<artifactId>ssooidc</artifactId>
102+
</dependency>
103+
<dependency>
104+
<groupId>org.mockito</groupId>
105+
<artifactId>mockito-core</artifactId>
106+
<version>5.11.0</version>
107+
<scope>test</scope>
108+
</dependency>
109+
<dependency>
110+
<groupId>org.mockito</groupId>
111+
<artifactId>mockito-junit-jupiter</artifactId>
112+
<version>5.11.0</version>
113+
<scope>test</scope>
114+
</dependency>
115+
</dependencies>
116+
</project>
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package com.example.dynamodbhedging;
2+
3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
6+
import java.util.ArrayList;
7+
import java.util.List;
8+
import java.util.concurrent.CompletableFuture;
9+
import java.util.concurrent.TimeUnit;
10+
import java.util.function.Supplier;
11+
12+
/**
13+
* A generic request handler that implements hedging pattern for DynamoDB requests.
14+
* Hedging helps improve tail latency by sending multiple identical requests after specified delays
15+
* and using the first successful response.
16+
*
17+
* @param <T> The type of response expected from the requests
18+
*/
19+
public class DDBHedgingRequestHandler<T> {
20+
21+
// Logger instance for tracking request execution and debugging
22+
private static final Logger logger = LoggerFactory.getLogger(DDBHedgingRequestHandler.class);
23+
24+
/**
25+
* Executes a request with hedging strategy. This method sends an initial request and then
26+
* sends additional identical requests (hedged requests) after specified delays if the initial
27+
* request hasn't completed.
28+
*
29+
* @param supplier A supplier that produces the CompletableFuture for the request to be hedged
30+
* @param delaysInMillis A list of delays (in milliseconds) for when to send subsequent hedged requests.
31+
* Each delay represents the time to wait before sending the next request.
32+
* If null or empty, only the initial request will be sent.
33+
* @return A CompletableFuture that completes with the first successful response from any of the requests
34+
*/
35+
public CompletableFuture<T> hedgeRequests(
36+
Supplier<CompletableFuture<T>> supplier,
37+
List<Integer> delaysInMillis) {
38+
39+
// If no delays are specified, just execute a single request without hedging
40+
if (delaysInMillis == null || delaysInMillis.isEmpty()) {
41+
return supplier.get();
42+
}
43+
44+
// Execute the initial request immediately
45+
logger.info("Initiating initial request");
46+
CompletableFuture<T> firstRequest = supplier.get()
47+
.thenApply(response -> response);
48+
49+
// Keep track of all requests (initial + hedged) for management
50+
List<CompletableFuture<T>> allRequests = new ArrayList<>();
51+
allRequests.add(firstRequest);
52+
53+
// Create hedged requests for each delay
54+
for (int i = 0; i < delaysInMillis.size(); i++) {
55+
// Calculate request number for logging (2 onwards, as 1 is the initial request)
56+
final int requestNumber = i + 2;
57+
58+
long delay = delaysInMillis.get(i);
59+
60+
// Create a new hedged request with specified delay
61+
CompletableFuture<T> hedgedRequest = CompletableFuture.supplyAsync(() -> {
62+
logger.info("Check: Before hedged request#{} can be initiated", requestNumber);
63+
64+
// Before executing a new hedged request, check if any previous request has completed
65+
CompletableFuture<T> completedFuture = allRequests.stream()
66+
.filter(CompletableFuture::isDone)
67+
.findFirst()
68+
.orElse(null);
69+
70+
// If a previous request has completed, use its result instead of making a new request
71+
if (completedFuture != null) {
72+
logger.info("Previous request already completed, skipping hedge request#{}", requestNumber);
73+
return completedFuture.join();
74+
}
75+
76+
// Execute the hedged request if no previous request has completed
77+
logger.info("Initiating hedge request#{}", requestNumber);
78+
return supplier.get()
79+
.thenApply(response -> {
80+
// Pass through the successful response
81+
return response;
82+
})
83+
.exceptionally(throwable -> {
84+
// If this hedged request fails, fall back to the result of the first request
85+
logger.warn("Hedged request#{} failed: {}", requestNumber, throwable.getMessage());
86+
return firstRequest.join();
87+
})
88+
.join();
89+
}, CompletableFuture.delayedExecutor(delay, TimeUnit.MILLISECONDS));
90+
91+
// Add the hedged request to the list of all requests
92+
allRequests.add(hedgedRequest);
93+
}
94+
95+
// Return a future that completes when any of the request completes successfully
96+
return CompletableFuture.anyOf(allRequests.toArray(new CompletableFuture[0]))
97+
.thenApply(result -> {
98+
// Cast the result to the expected type
99+
T response = (T) result;
100+
// Clean up by cancelling any remaining pending requests
101+
cancelPendingRequests(allRequests);
102+
return response;
103+
});
104+
}
105+
106+
/**
107+
* Cancels all pending requests that haven't completed yet.
108+
* This is called after receiving the first successful response to avoid unnecessary processing.
109+
*
110+
* @param allRequests List of all CompletableFuture requests that were initiated
111+
*/
112+
private void cancelPendingRequests(List<CompletableFuture<T>> allRequests) {
113+
logger.info("Cancelling pending requests");
114+
// Iterate through all requests and cancel those that haven't completed
115+
allRequests.forEach(request -> {
116+
if (!request.isDone()) {
117+
request.cancel(true);
118+
}
119+
});
120+
}
121+
}

0 commit comments

Comments
 (0)