diff --git a/micronaut/pom.xml b/micronaut/pom.xml
index 38ab3ae..eaf0ede 100644
--- a/micronaut/pom.xml
+++ b/micronaut/pom.xml
@@ -4,7 +4,7 @@
com.reforge
sdk-parent
- 0.3.26
+ 0.3.27
sdk-micronaut-extension
diff --git a/pom.xml b/pom.xml
index 7a5d08f..62a3335 100644
--- a/pom.xml
+++ b/pom.xml
@@ -11,7 +11,7 @@
com.reforge
sdk-parent
- 0.3.26
+ 0.3.27
pom
Reforge SDK Parent POM
Parent POM for Reforge SDK modules providing feature flags, configuration management, and A/B testing capabilities
diff --git a/sdk/pom.xml b/sdk/pom.xml
index d066bf6..311cdf5 100644
--- a/sdk/pom.xml
+++ b/sdk/pom.xml
@@ -4,7 +4,7 @@
com.reforge
sdk-parent
- 0.3.26
+ 0.3.27
sdk
diff --git a/sdk/src/main/java/com/reforge/sdk/internal/HttpClient.java b/sdk/src/main/java/com/reforge/sdk/internal/HttpClient.java
index b86fba2..80fe1fa 100644
--- a/sdk/src/main/java/com/reforge/sdk/internal/HttpClient.java
+++ b/sdk/src/main/java/com/reforge/sdk/internal/HttpClient.java
@@ -275,6 +275,10 @@ private CompletableFuture>> requestConfigs
// Build a synthetic response for the 200 case.
Supplier supplier = () -> {
try {
+ if (bodyBytes.length == 0) {
+ LOG.warn("Rejecting zero-byte config data from HTTP response");
+ throw new IllegalArgumentException("Zero-byte config data is not valid");
+ }
return Prefab.Configs.parseFrom(bodyBytes);
} catch (IOException e) {
throw new UncheckedIOException(e);
@@ -287,6 +291,10 @@ private CompletableFuture>> requestConfigs
// For other status codes, simply wrap the response.
Supplier supplier = () -> {
try (ByteArrayInputStream bais = new ByteArrayInputStream(response.body())) {
+ if (response.body().length == 0) {
+ LOG.warn("Rejecting zero-byte config data from HTTP response");
+ throw new IllegalArgumentException("Zero-byte config data is not valid");
+ }
return Prefab.Configs.parseFrom(bais);
} catch (IOException e) {
throw new UncheckedIOException(e);
@@ -324,6 +332,10 @@ private HttpResponse> createCachedHitResponse(
) {
Supplier supplier = () -> {
try {
+ if (entry.data.length == 0) {
+ LOG.warn("Rejecting zero-byte config data from cache");
+ throw new IllegalArgumentException("Zero-byte config data is not valid");
+ }
return Prefab.Configs.parseFrom(entry.data);
} catch (IOException e) {
throw new UncheckedIOException(e);
diff --git a/sdk/src/main/java/com/reforge/sdk/internal/SseConfigStreamingSubscriber.java b/sdk/src/main/java/com/reforge/sdk/internal/SseConfigStreamingSubscriber.java
index 2ada95e..bcbff6b 100644
--- a/sdk/src/main/java/com/reforge/sdk/internal/SseConfigStreamingSubscriber.java
+++ b/sdk/src/main/java/com/reforge/sdk/internal/SseConfigStreamingSubscriber.java
@@ -117,13 +117,16 @@ public void onNext(Event item) {
hasReceivedData.set(true);
String dataPayload = dataEvent.getData().trim();
if (!dataPayload.isEmpty()) {
- Prefab.Configs configs = Prefab.Configs.parseFrom(
- Base64.getDecoder().decode(dataPayload)
- );
- if (!configs.hasConfigServicePointer()) {
- LOG.debug("Ignoring empty config keep-alive");
+ byte[] decodedData = Base64.getDecoder().decode(dataPayload);
+ if (decodedData.length == 0) {
+ LOG.warn("Ignoring zero-byte config data from SSE stream");
} else {
- configConsumer.accept(configs);
+ Prefab.Configs configs = Prefab.Configs.parseFrom(decodedData);
+ if (!configs.hasConfigServicePointer()) {
+ LOG.debug("Ignoring empty config keep-alive");
+ } else {
+ configConsumer.accept(configs);
+ }
}
}
} catch (InvalidProtocolBufferException e) {
diff --git a/sdk/src/test/java/com/reforge/sdk/internal/HttpClientTest.java b/sdk/src/test/java/com/reforge/sdk/internal/HttpClientTest.java
index 8927145..3f081e2 100644
--- a/sdk/src/test/java/com/reforge/sdk/internal/HttpClientTest.java
+++ b/sdk/src/test/java/com/reforge/sdk/internal/HttpClientTest.java
@@ -48,7 +48,12 @@ void setup() {
@Test
void testFailoverForConfigFetch() throws Exception {
// Use byte[]–based mocks since requestConfigsFromURI uses BodyHandlers.ofByteArray().
- Prefab.Configs dummyConfigs = Prefab.Configs.newBuilder().build();
+ Prefab.Configs dummyConfigs = Prefab.Configs
+ .newBuilder()
+ .setConfigServicePointer(
+ Prefab.ConfigServicePointer.newBuilder().setProjectId(123L)
+ )
+ .build();
byte[] dummyBytes = dummyConfigs.toByteArray();
HttpResponse failureResponse = mock(HttpResponse.class);
@@ -128,7 +133,12 @@ void testFailoverForSSEConnection() throws Exception {
@Test
void testBasicCaching() throws Exception {
- Prefab.Configs dummyConfigs = Prefab.Configs.newBuilder().build();
+ Prefab.Configs dummyConfigs = Prefab.Configs
+ .newBuilder()
+ .setConfigServicePointer(
+ Prefab.ConfigServicePointer.newBuilder().setProjectId(123L)
+ )
+ .build();
byte[] dummyBytes = dummyConfigs.toByteArray();
HttpResponse httpResponse200 = mock(HttpResponse.class);
@@ -175,7 +185,12 @@ void testBasicCaching() throws Exception {
@Test
void testConditionalGet304() throws Exception {
// In order to trigger a conditional GET, we insert a cached entry that is expired.
- Prefab.Configs dummyConfigs = Prefab.Configs.newBuilder().build();
+ Prefab.Configs dummyConfigs = Prefab.Configs
+ .newBuilder()
+ .setConfigServicePointer(
+ Prefab.ConfigServicePointer.newBuilder().setProjectId(123L)
+ )
+ .build();
byte[] dummyBytes = dummyConfigs.toByteArray();
// Use a time far enough in the past to ensure expiration.
long past = System.currentTimeMillis() - 10_000;
@@ -225,7 +240,12 @@ void testConditionalGet304() throws Exception {
@Test
void testClearCache() throws Exception {
- Prefab.Configs dummyConfigs = Prefab.Configs.newBuilder().build();
+ Prefab.Configs dummyConfigs = Prefab.Configs
+ .newBuilder()
+ .setConfigServicePointer(
+ Prefab.ConfigServicePointer.newBuilder().setProjectId(123L)
+ )
+ .build();
byte[] dummyBytes = dummyConfigs.toByteArray();
HttpResponse httpResponse200 = mock(HttpResponse.class);
@@ -280,7 +300,12 @@ void testClearCache() throws Exception {
@Test
void testNoCacheResponseAlwaysRevalidates() throws Exception {
// Create a valid Prefab.Configs instance and its serialized form.
- Prefab.Configs dummyConfigs = Prefab.Configs.newBuilder().build();
+ Prefab.Configs dummyConfigs = Prefab.Configs
+ .newBuilder()
+ .setConfigServicePointer(
+ Prefab.ConfigServicePointer.newBuilder().setProjectId(123L)
+ )
+ .build();
byte[] dummyBytes = dummyConfigs.toByteArray();
// Simulate a 200 response with Cache-Control: no-cache and an ETag.
@@ -365,4 +390,170 @@ void testNoCacheResponseAlwaysRevalidates() throws Exception {
assertThat(sentRequest.headers().firstValue("If-None-Match"))
.contains("etag-no-cache");
}
+
+ @Test
+ void testZeroByteConfigRejectionFromHttpResponse() throws Exception {
+ // Mock a 200 response that returns zero bytes
+ byte[] zeroBytes = new byte[0];
+ HttpResponse zeroByteResponse = mock(HttpResponse.class);
+ when(zeroByteResponse.statusCode()).thenReturn(200);
+ when(zeroByteResponse.body()).thenReturn(zeroBytes);
+ when(zeroByteResponse.headers()).thenReturn(HttpHeaders.of(Map.of(), (k, v) -> true));
+
+ CompletableFuture> futureZeroBytes = CompletableFuture.completedFuture(
+ zeroByteResponse
+ );
+ when(
+ mockHttpClient.sendAsync(
+ any(HttpRequest.class),
+ any(HttpResponse.BodyHandler.class)
+ )
+ )
+ .thenReturn(futureZeroBytes);
+
+ // Request configs - this should eventually fail after retries
+ CompletableFuture>> result = prefabHttpClient.requestConfigs(
+ 0L
+ );
+
+ HttpResponse> response = result.get();
+ assertThat(response.statusCode()).isEqualTo(200);
+
+ // Try to get the body - this should throw IllegalArgumentException
+ try {
+ response.body().get();
+ assertThat(false)
+ .as("Expected IllegalArgumentException for zero-byte config")
+ .isTrue();
+ } catch (IllegalArgumentException e) {
+ // Should get IllegalArgumentException from zero-byte rejection
+ assertThat(e.getMessage()).contains("Zero-byte config data is not valid");
+ }
+ }
+
+ @Test
+ void testZeroByteConfigRejectionFromNon200Response() throws Exception {
+ // Mock a 404 response that returns zero bytes
+ byte[] zeroBytes = new byte[0];
+ HttpResponse zeroByteResponse = mock(HttpResponse.class);
+ when(zeroByteResponse.statusCode()).thenReturn(404);
+ when(zeroByteResponse.body()).thenReturn(zeroBytes);
+ when(zeroByteResponse.headers()).thenReturn(HttpHeaders.of(Map.of(), (k, v) -> true));
+
+ CompletableFuture> futureZeroBytes = CompletableFuture.completedFuture(
+ zeroByteResponse
+ );
+ when(
+ mockHttpClient.sendAsync(
+ any(HttpRequest.class),
+ any(HttpResponse.BodyHandler.class)
+ )
+ )
+ .thenReturn(futureZeroBytes);
+
+ // Request configs - this should eventually fail after retries
+ CompletableFuture>> result = prefabHttpClient.requestConfigs(
+ 0L
+ );
+
+ HttpResponse> response = result.get();
+ assertThat(response.statusCode()).isEqualTo(404);
+
+ // Try to get the body - this should throw IllegalArgumentException
+ try {
+ response.body().get();
+ assertThat(false)
+ .as("Expected IllegalArgumentException for zero-byte config")
+ .isTrue();
+ } catch (IllegalArgumentException e) {
+ // Should get IllegalArgumentException from zero-byte rejection
+ assertThat(e.getMessage()).contains("Zero-byte config data is not valid");
+ }
+ }
+
+ @Test
+ void testZeroByteConfigRejectionFromCache() throws Exception {
+ // Insert zero-byte data directly into cache
+ URI uri = URI.create("http://a.example.com/api/v2/configs/0");
+ Field cacheField = HttpClient.class.getDeclaredField("configCache");
+ cacheField.setAccessible(true);
+ @SuppressWarnings("unchecked")
+ Cache cache = (Cache) cacheField.get(
+ prefabHttpClient
+ );
+
+ // Create a cache entry with zero-byte data that is still fresh
+ long future = System.currentTimeMillis() + 60_000;
+ HttpClient.CacheEntry zeroByteCacheEntry = new HttpClient.CacheEntry(
+ new byte[0],
+ "zero-byte-etag",
+ future
+ );
+ cache.put(uri, zeroByteCacheEntry);
+
+ // Request configs - should return cached response but fail when accessing body
+ CompletableFuture>> result = prefabHttpClient.requestConfigs(
+ 0L
+ );
+
+ HttpResponse> response = result.get();
+ assertThat(response.statusCode()).isEqualTo(200);
+ assertThat(response.headers().firstValue("X-Cache")).contains("HIT");
+
+ // Try to get the body - this should throw IllegalArgumentException
+ try {
+ response.body().get();
+ assertThat(false)
+ .as("Expected IllegalArgumentException for zero-byte cached config")
+ .isTrue();
+ } catch (IllegalArgumentException e) {
+ // Should get IllegalArgumentException from zero-byte rejection
+ assertThat(e.getMessage()).contains("Zero-byte config data is not valid");
+ }
+ }
+
+ @Test
+ void testValidConfigProcessingAfterZeroByteRejectionImplementation() throws Exception {
+ // This test verifies that valid configs can still be processed normally
+ // even when zero-byte rejection is in place
+
+ Prefab.Configs validConfigs = Prefab.Configs
+ .newBuilder()
+ .setConfigServicePointer(
+ Prefab.ConfigServicePointer.newBuilder().setProjectId(456L)
+ )
+ .build();
+ byte[] validBytes = validConfigs.toByteArray();
+ HttpResponse validResponse = mock(HttpResponse.class);
+ when(validResponse.statusCode()).thenReturn(200);
+ when(validResponse.body()).thenReturn(validBytes);
+ when(validResponse.headers()).thenReturn(HttpHeaders.of(Map.of(), (k, v) -> true));
+
+ CompletableFuture> futureValidBytes = CompletableFuture.completedFuture(
+ validResponse
+ );
+
+ when(
+ mockHttpClient.sendAsync(
+ any(HttpRequest.class),
+ any(HttpResponse.BodyHandler.class)
+ )
+ )
+ .thenReturn(futureValidBytes);
+
+ // Request configs - should succeed with valid data
+ CompletableFuture>> result = prefabHttpClient.requestConfigs(
+ 0L
+ );
+
+ HttpResponse> response = result.get();
+ assertThat(response.statusCode()).isEqualTo(200);
+
+ // Should be able to get valid configs without exception
+ Prefab.Configs configs = response.body().get();
+ assertThat(configs).isEqualTo(validConfigs);
+
+ // Should have called sendAsync once
+ verify(mockHttpClient, times(1)).sendAsync(any(), any());
+ }
}
diff --git a/sdk/src/test/java/com/reforge/sdk/internal/SseConfigStreamingSubscriberTest.java b/sdk/src/test/java/com/reforge/sdk/internal/SseConfigStreamingSubscriberTest.java
new file mode 100644
index 0000000..c52f444
--- /dev/null
+++ b/sdk/src/test/java/com/reforge/sdk/internal/SseConfigStreamingSubscriberTest.java
@@ -0,0 +1,203 @@
+package com.reforge.sdk.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.*;
+
+import cloud.prefab.domain.Prefab;
+import cloud.prefab.sse.events.DataEvent;
+import java.util.Base64;
+import java.util.concurrent.Flow;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class SseConfigStreamingSubscriberTest {
+
+ @Mock
+ HttpClient mockHttpClient;
+
+ @Mock
+ Supplier mockHighwaterMarkSupplier;
+
+ @Mock
+ Consumer mockConfigsConsumer;
+
+ @Mock
+ ScheduledExecutorService mockScheduledExecutorService;
+
+ SseConfigStreamingSubscriber.FlowSubscriber flowSubscriber;
+
+ @BeforeEach
+ void setup() {
+ // Setup method - mocks will be configured in individual tests as needed
+ }
+
+ @Test
+ void testZeroByteDataEventIgnored() {
+ // Create a FlowSubscriber directly to test zero-byte handling
+ Consumer mockRestartHandler = mock(Consumer.class);
+ flowSubscriber =
+ new SseConfigStreamingSubscriber.FlowSubscriber(
+ mockConfigsConsumer,
+ mockRestartHandler
+ );
+
+ // Mock the subscription
+ Flow.Subscription mockSubscription = mock(Flow.Subscription.class);
+ flowSubscriber.onSubscribe(mockSubscription);
+
+ // Create a DataEvent with zero-byte base64 data (empty string encodes to zero bytes)
+ String emptyBase64 = Base64.getEncoder().encodeToString(new byte[0]);
+ DataEvent zeroByteDataEvent = new DataEvent("config", emptyBase64, null);
+
+ // Process the zero-byte data event
+ flowSubscriber.onNext(zeroByteDataEvent);
+
+ // Verify that the config consumer was never called (zero bytes should be ignored)
+ verify(mockConfigsConsumer, never()).accept(any(Prefab.Configs.class));
+
+ // Verify that hasReceivedData is still true (we did receive data, just ignored it)
+ assertThat(flowSubscriber.getHasReceivedData()).isTrue();
+
+ // Verify that subscription.request(1) was called twice (once for onSubscribe, once for onNext)
+ verify(mockSubscription, times(2)).request(1);
+ }
+
+ @Test
+ void testValidDataEventProcessed() throws Exception {
+ // Create a FlowSubscriber directly to test valid data handling
+ Consumer mockRestartHandler = mock(Consumer.class);
+ flowSubscriber =
+ new SseConfigStreamingSubscriber.FlowSubscriber(
+ mockConfigsConsumer,
+ mockRestartHandler
+ );
+
+ // Mock the subscription
+ Flow.Subscription mockSubscription = mock(Flow.Subscription.class);
+ flowSubscriber.onSubscribe(mockSubscription);
+
+ // Create valid config data
+ Prefab.Configs validConfigs = Prefab.Configs
+ .newBuilder()
+ .setConfigServicePointer(
+ Prefab.ConfigServicePointer.newBuilder().setProjectId(123L).setProjectEnvId(456L)
+ )
+ .build();
+ byte[] validBytes = validConfigs.toByteArray();
+ String validBase64 = Base64.getEncoder().encodeToString(validBytes);
+
+ DataEvent validDataEvent = new DataEvent("config", validBase64, null);
+
+ // Process the valid data event
+ flowSubscriber.onNext(validDataEvent);
+
+ // Verify that the config consumer was called with the parsed configs
+ ArgumentCaptor configsCaptor = ArgumentCaptor.forClass(
+ Prefab.Configs.class
+ );
+ verify(mockConfigsConsumer, times(1)).accept(configsCaptor.capture());
+
+ Prefab.Configs capturedConfigs = configsCaptor.getValue();
+ assertThat(capturedConfigs.getConfigServicePointer().getProjectId()).isEqualTo(123L);
+ assertThat(capturedConfigs.getConfigServicePointer().getProjectEnvId())
+ .isEqualTo(456L);
+
+ // Verify that hasReceivedData is true
+ assertThat(flowSubscriber.getHasReceivedData()).isTrue();
+ }
+
+ @Test
+ void testEmptyConfigKeepAliveIgnored() throws Exception {
+ // Create a FlowSubscriber directly to test keep-alive handling
+ Consumer mockRestartHandler = mock(Consumer.class);
+ flowSubscriber =
+ new SseConfigStreamingSubscriber.FlowSubscriber(
+ mockConfigsConsumer,
+ mockRestartHandler
+ );
+
+ // Mock the subscription
+ Flow.Subscription mockSubscription = mock(Flow.Subscription.class);
+ flowSubscriber.onSubscribe(mockSubscription);
+
+ // Create empty config data (no ConfigServicePointer)
+ Prefab.Configs emptyConfigs = Prefab.Configs.newBuilder().build();
+ byte[] emptyBytes = emptyConfigs.toByteArray();
+ String emptyBase64 = Base64.getEncoder().encodeToString(emptyBytes);
+
+ DataEvent keepAliveEvent = new DataEvent("config", emptyBase64, null);
+
+ // Process the keep-alive event
+ flowSubscriber.onNext(keepAliveEvent);
+
+ // Verify that the config consumer was never called (keep-alive should be ignored)
+ verify(mockConfigsConsumer, never()).accept(any(Prefab.Configs.class));
+
+ // Verify that hasReceivedData is true (we did receive data)
+ assertThat(flowSubscriber.getHasReceivedData()).isTrue();
+ }
+
+ @Test
+ void testEmptyDataPayloadIgnored() {
+ // Create a FlowSubscriber directly to test empty payload handling
+ Consumer mockRestartHandler = mock(Consumer.class);
+ flowSubscriber =
+ new SseConfigStreamingSubscriber.FlowSubscriber(
+ mockConfigsConsumer,
+ mockRestartHandler
+ );
+
+ // Mock the subscription
+ Flow.Subscription mockSubscription = mock(Flow.Subscription.class);
+ flowSubscriber.onSubscribe(mockSubscription);
+
+ // Create a DataEvent with empty data payload
+ DataEvent emptyDataEvent = new DataEvent("config", "", null);
+
+ // Process the empty data event
+ flowSubscriber.onNext(emptyDataEvent);
+
+ // Verify that the config consumer was never called
+ verify(mockConfigsConsumer, never()).accept(any(Prefab.Configs.class));
+
+ // Verify that hasReceivedData is still true (we did receive data, just ignored it)
+ assertThat(flowSubscriber.getHasReceivedData()).isTrue();
+ }
+
+ @Test
+ void testWhitespaceOnlyDataPayloadIgnored() {
+ // Create a FlowSubscriber directly to test whitespace-only payload handling
+ Consumer mockRestartHandler = mock(Consumer.class);
+ flowSubscriber =
+ new SseConfigStreamingSubscriber.FlowSubscriber(
+ mockConfigsConsumer,
+ mockRestartHandler
+ );
+
+ // Mock the subscription
+ Flow.Subscription mockSubscription = mock(Flow.Subscription.class);
+ flowSubscriber.onSubscribe(mockSubscription);
+
+ // Create a DataEvent with whitespace-only data payload
+ DataEvent whitespaceDataEvent = new DataEvent("config", " \n\t ", null);
+
+ // Process the whitespace data event
+ flowSubscriber.onNext(whitespaceDataEvent);
+
+ // Verify that the config consumer was never called
+ verify(mockConfigsConsumer, never()).accept(any(Prefab.Configs.class));
+
+ // Verify that hasReceivedData is still true (we did receive data, just ignored it)
+ assertThat(flowSubscriber.getHasReceivedData()).isTrue();
+ }
+}