diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md index 499d04dca7..301d098f55 100644 --- a/NEXT_CHANGELOG.md +++ b/NEXT_CHANGELOG.md @@ -18,6 +18,7 @@ - **Enhanced `enableMultipleCatalogSupport` behavior**: When this parameter is disabled (`enableMultipleCatalogSupport=0`), metadata operations (such as `getSchemas()`, `getTables()`, `getColumns()`, etc.) now return results only when the catalog parameter is either `null` or matches the current catalog. For any other catalog name, an empty result set is returned. This ensures metadata queries are restricted to the current catalog context. When enabled (`enableMultipleCatalogSupport=1`), metadata operations continue to work across all accessible catalogs. ### Fixed +- Fixed telemetry push to retry on partial failures with exponential backoff. When the telemetry service returns a partial success response, the client now automatically retries up to 3 times instead of silently dropping the failed events. - Fixed timeout exception handling to throw `SQLTimeoutException` instead of `DatabricksHttpException` when queries timeout during result fetching phase. This completes the timeout exception fix to handle both query execution polling and result fetching phases. - Fixed `getTypeInfo()` and `getClientInfoProperties()` to return fresh ResultSet instances on each call instead of shared static instances. This resolves issues where calling these methods multiple times would fail due to exhausted cursor state (Issue #1178). 
- Fixed complex data type metadata support when retrieving 0 rows in Arrow format
diff --git a/src/main/java/com/databricks/jdbc/common/util/RetryUtil.java b/src/main/java/com/databricks/jdbc/common/util/RetryUtil.java
new file mode 100644
index 0000000000..bc0ea7abf7
--- /dev/null
+++ b/src/main/java/com/databricks/jdbc/common/util/RetryUtil.java
@@ -0,0 +1,48 @@
+package com.databricks.jdbc.common.util;
+
+/** Helpers shared by the driver's retry paths for computing and applying backoff delays. */
+public final class RetryUtil {
+
+  private static final int DEFAULT_BACKOFF_FACTOR = 2;
+  private static final int MIN_BACKOFF_INTERVAL = 1000;
+  private static final int MAX_RETRY_INTERVAL = 10 * 1000;
+
+  private RetryUtil() {}
+
+  /**
+   * Returns {@code MIN_BACKOFF_INTERVAL * DEFAULT_BACKOFF_FACTOR^executionCount} milliseconds,
+   * capped at {@code MAX_RETRY_INTERVAL}.
+   *
+   * <p>The cap is applied to the multiplier before the multiplication. For large counts {@code
+   * (long) Math.pow(...)} saturates at {@code Long.MAX_VALUE}, and multiplying that by the base
+   * interval would wrap around (for example to a negative delay, which {@code Thread.sleep}
+   * rejects).
+   *
+   * @param executionCount exponent for the backoff; 0 yields the minimum interval and a negative
+   *     value yields 0
+   * @return delay in milliseconds, between 0 and {@code MAX_RETRY_INTERVAL} inclusive
+   */
+  public static long calculateExponentialBackoff(int executionCount) {
+    long multiplier = (long) Math.pow(DEFAULT_BACKOFF_FACTOR, executionCount);
+    if (multiplier > MAX_RETRY_INTERVAL / MIN_BACKOFF_INTERVAL) {
+      return MAX_RETRY_INTERVAL;
+    }
+    return MIN_BACKOFF_INTERVAL * multiplier;
+  }
+
+  /**
+   * Blocks the calling thread for {@code delayMillis} milliseconds.
+   *
+   * @param delayMillis time to sleep, in milliseconds
+   * @throws RuntimeException if the thread is interrupted while sleeping; the interrupt status is
+   *     restored before the exception is thrown
+   */
+  public static void doSleepForDelay(long delayMillis) {
+    try {
+      Thread.sleep(delayMillis);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Sleep interrupted", e);
+    }
+  }
+}
diff --git a/src/main/java/com/databricks/jdbc/dbclient/impl/http/DatabricksHttpRetryHandler.java b/src/main/java/com/databricks/jdbc/dbclient/impl/http/DatabricksHttpRetryHandler.java
index 94650ef19c..b4e168ede9 100644
--- a/src/main/java/com/databricks/jdbc/dbclient/impl/http/DatabricksHttpRetryHandler.java
+++ b/src/main/java/com/databricks/jdbc/dbclient/impl/http/DatabricksHttpRetryHandler.java
@@ -3,6 +3,7 @@
 import static com.databricks.jdbc.common.DatabricksJdbcConstants.THRIFT_ERROR_MESSAGE_HEADER;
 
 import com.databricks.jdbc.api.internal.IDatabricksConnectionContext;
+import com.databricks.jdbc.common.util.RetryUtil;
 import com.databricks.jdbc.exception.DatabricksRetryHandlerException;
 import com.databricks.jdbc.log.JdbcLogger;
 import com.databricks.jdbc.log.JdbcLoggerFactory;
@@ -31,9 +32,6 @@ public class DatabricksHttpRetryHandler
   private static final String RATE_LIMIT_ACCUMULATED_TIME_KEY =
"rateLimitAccumulatedTime";
   private static final String API_CODES_ACCUMULATED_TIME_KEY = "apiCodesAccumulatedTime";
   static final String RETRY_AFTER_HEADER = "Retry-After";
-  private static final int DEFAULT_BACKOFF_FACTOR = 2; // Exponential factor
-  private static final int MIN_BACKOFF_INTERVAL = 1000; // 1s
-  private static final int MAX_RETRY_INTERVAL = 10 * 1000; // 10s
 
   private final IDatabricksConnectionContext connectionContext;
 
@@ -242,10 +240,10 @@ static long calculateDelayInMillis(int errorCode, int executionCount, int retryI
     }
   }
 
+  /** Delegates to {@link RetryUtil#calculateExponentialBackoff(int)}. */
+  @VisibleForTesting
   static long calculateExponentialBackoff(int executionCount) {
-    return Math.min(
-        MIN_BACKOFF_INTERVAL * (long) Math.pow(DEFAULT_BACKOFF_FACTOR, executionCount),
-        MAX_RETRY_INTERVAL);
+    return RetryUtil.calculateExponentialBackoff(executionCount);
   }
 
   static int getErrorCodeFromException(IOException exception) {
@@ -273,13 +271,9 @@ private static long getAccumulatedTime(HttpContext context, String key) {
   }
 
   @VisibleForTesting
   protected void doSleepForDelay(long delayMillis) {
-    try {
-      Thread.sleep(delayMillis);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt(); // Restore the interrupt status
-      throw new RuntimeException("Sleep interrupted", e);
-    }
+    // Stays protected, as before this change, so subclasses can keep overriding the sleep.
+    RetryUtil.doSleepForDelay(delayMillis);
   }
 
   /** Check if the request is retryable based on the status code and any connection preferences.
*/
diff --git a/src/main/java/com/databricks/jdbc/telemetry/TelemetryPushClient.java b/src/main/java/com/databricks/jdbc/telemetry/TelemetryPushClient.java
index 1befdec99f..0cdac985c5 100644
--- a/src/main/java/com/databricks/jdbc/telemetry/TelemetryPushClient.java
+++ b/src/main/java/com/databricks/jdbc/telemetry/TelemetryPushClient.java
@@ -6,6 +6,7 @@
 import com.databricks.jdbc.common.DatabricksJdbcConstants;
 import com.databricks.jdbc.common.HttpClientType;
 import com.databricks.jdbc.common.util.HttpUtil;
+import com.databricks.jdbc.common.util.RetryUtil;
 import com.databricks.jdbc.dbclient.IDatabricksHttpClient;
 import com.databricks.jdbc.dbclient.impl.http.DatabricksHttpClientFactory;
 import com.databricks.jdbc.dbclient.impl.sqlexec.PathConstants;
@@ -29,73 +30,125 @@ public class TelemetryPushClient implements ITelemetryPushClient {
 
   private static final JdbcLogger LOGGER = JdbcLoggerFactory.getLogger(TelemetryPushClient.class);
   private static final String REQUEST_ID_HEADER = "x-request-id";
+  // Retries allowed after the first attempt when the caller does not pass a limit.
+  private static final int DEFAULT_MAX_RETRIES = 3;
+
   private final boolean isAuthenticated;
   private final IDatabricksConnectionContext connectionContext;
   private final DatabricksConfig databricksConfig;
+  private final int maxRetries;
 
   public TelemetryPushClient(
       boolean isAuthenticated,
       IDatabricksConnectionContext connectionContext,
       DatabricksConfig databricksConfig) {
+    this(isAuthenticated, connectionContext, databricksConfig, DEFAULT_MAX_RETRIES);
+  }
+
+  /**
+   * @param maxRetries how many times {@link #pushEvent} re-sends a request after a partial
+   *     success; a value of 0 or less disables retries
+   */
+  public TelemetryPushClient(
+      boolean isAuthenticated,
+      IDatabricksConnectionContext connectionContext,
+      DatabricksConfig databricksConfig,
+      int maxRetries) {
     this.isAuthenticated = isAuthenticated;
     this.connectionContext = connectionContext;
     this.databricksConfig = databricksConfig;
+    this.maxRetries = maxRetries;
   }
 
+  /**
+   * Posts the telemetry request and, when the service reports fewer successes than events sent,
+   * posts it again after an exponential backoff, at most {@code maxRetries} more times.
+   */
   @Override
   public void pushEvent(TelemetryRequest request) throws Exception {
-    IDatabricksHttpClient httpClient =
-        DatabricksHttpClientFactory.getInstance()
-            .getClient(connectionContext, HttpClientType.TELEMETRY);
-    String path =
-        isAuthenticated
-            ? PathConstants.TELEMETRY_PATH
-            : PathConstants.TELEMETRY_PATH_UNAUTHENTICATED;
-    String uri = new URIBuilder(connectionContext.getHostUrl()).setPath(path).toString();
-    HttpPost post = new HttpPost(uri);
-    post.setEntity(
-        new StringEntity(getTelemetryMapper().writeValueAsString(request), StandardCharsets.UTF_8));
-    DatabricksJdbcConstants.JSON_HTTP_HEADERS.forEach(post::addHeader);
-    Map authHeaders =
-        isAuthenticated ? databricksConfig.authenticate() : Collections.emptyMap();
-    authHeaders.forEach(post::addHeader);
-    try (CloseableHttpResponse response = httpClient.execute(post)) {
-      // TODO: check response and add retry for partial failures
-      if (!HttpUtil.isSuccessfulHttpResponse(response)) {
+    int remainingRetries = maxRetries;
+    // Every pass builds and sends the complete request; the loop ends only via return or throw.
+    while (true) {
+      IDatabricksHttpClient httpClient =
+          DatabricksHttpClientFactory.getInstance()
+              .getClient(connectionContext, HttpClientType.TELEMETRY);
+      String path =
+          isAuthenticated
+              ? PathConstants.TELEMETRY_PATH
+              : PathConstants.TELEMETRY_PATH_UNAUTHENTICATED;
+      String uri = new URIBuilder(connectionContext.getHostUrl()).setPath(path).toString();
+      HttpPost post = new HttpPost(uri);
+      post.setEntity(
+          new StringEntity(
+              getTelemetryMapper().writeValueAsString(request), StandardCharsets.UTF_8));
+      DatabricksJdbcConstants.JSON_HTTP_HEADERS.forEach(post::addHeader);
+      Map authHeaders =
+          isAuthenticated ? databricksConfig.authenticate() : Collections.emptyMap();
+      authHeaders.forEach(post::addHeader);
+
+      try (CloseableHttpResponse response = httpClient.execute(post)) {
+        if (!HttpUtil.isSuccessfulHttpResponse(response)) {
+          LOGGER.trace(
+              "Failed to push telemetry logs with error response: {}", response.getStatusLine());
+          if (connectionContext.isTelemetryCircuitBreakerEnabled()) {
+            throw new DatabricksTelemetryException(
+                "Telemetry push failed with response: " + response.getStatusLine());
+          } else {
+            return;
+          }
+        }
+        TelemetryResponse telResponse =
+            getTelemetryMapper()
+                .readValue(EntityUtils.toString(response.getEntity()), TelemetryResponse.class);
         LOGGER.trace(
-            "Failed to push telemetry logs with error response: {}", response.getStatusLine());
-        if (connectionContext.isTelemetryCircuitBreakerEnabled()) {
-          throw new DatabricksTelemetryException(
-              "Telemetry push failed with response: " + response.getStatusLine());
+            "Pushed Telemetry logs with request-Id {} with events {} with error count {}",
+            response.getFirstHeader(REQUEST_ID_HEADER),
+            telResponse.getNumProtoSuccess(),
+            telResponse.getErrors().size());
+        if (!telResponse.getErrors().isEmpty()) {
+          LOGGER.trace("Failed to push telemetry logs with error: {}", telResponse.getErrors());
+        }
+
+        if (request.getProtoLogs().size() != telResponse.getNumProtoSuccess()) {
+          LOGGER.debug(
+              "Partial failure while pushing telemetry logs: request count: {}, upload count: {}, remaining retries: {}",
+              request.getProtoLogs().size(),
+              telResponse.getNumProtoSuccess(),
+              remainingRetries);
+
+          // NOTE(review): the retry below re-sends the whole request, including events the service
+          // already counted as successful; confirm the endpoint de-duplicates or that duplicates
+          // are acceptable.
+          if (remainingRetries > 0) {
+            // The backoff exponent is the number of retries already made (0 for the first retry).
+            long backoffMs = RetryUtil.calculateExponentialBackoff(maxRetries - remainingRetries);
+            LOGGER.debug(
+                "Retrying telemetry push after {}ms ({} retries remaining)",
+                backoffMs,
+                remainingRetries);
+            RetryUtil.doSleepForDelay(backoffMs);
+            remainingRetries--;
+          } else {
+            LOGGER.debug(
+                "Max retries exhausted for telemetry push. Dropping {} events",
+                request.getProtoLogs().size() - telResponse.getNumProtoSuccess());
+            return;
+          }
         } else {
           return;
         }
-      }
-      TelemetryResponse telResponse =
-          getTelemetryMapper()
-              .readValue(EntityUtils.toString(response.getEntity()), TelemetryResponse.class);
-      LOGGER.trace(
-          "Pushed Telemetry logs with request-Id {} with events {} with error count {}",
-          response.getFirstHeader(REQUEST_ID_HEADER),
-          telResponse.getNumProtoSuccess(),
-          telResponse.getErrors().size());
-      if (!telResponse.getErrors().isEmpty()) {
-        LOGGER.trace("Failed to push telemetry logs with error: {}", telResponse.getErrors());
-      }
-      if (request.getProtoLogs().size() != telResponse.getNumProtoSuccess()) {
+      } catch (DatabricksTelemetryException e) {
+        // Already the telemetry exception type; rethrow as-is instead of wrapping it below.
+        throw e;
+      } catch (Exception e) {
         LOGGER.debug(
-            "Partial failure while pushing telemetry logs with error response: {}, request count: {}, upload count: {}",
-            telResponse.getErrors(),
-            request.getProtoLogs().size(),
-            telResponse.getNumProtoSuccess());
-      }
-    } catch (Exception e) {
-      LOGGER.debug(
-          "Failed to push telemetry logs with error: {}, request: {}",
-          e.getMessage(),
-          getTelemetryMapper().writeValueAsString(request));
-      if (connectionContext.isTelemetryCircuitBreakerEnabled()) {
-        throw new DatabricksTelemetryException("Exception while pushing telemetry logs", e);
+            "Failed to push telemetry logs with error: {}, request: {}",
+            e.getMessage(),
+            getTelemetryMapper().writeValueAsString(request));
+        if (connectionContext.isTelemetryCircuitBreakerEnabled()) {
+          throw new DatabricksTelemetryException("Exception while pushing telemetry logs", e);
+        }
+        return;
       }
     }
   }
diff --git a/src/test/java/com/databricks/jdbc/common/util/RetryUtilTest.java b/src/test/java/com/databricks/jdbc/common/util/RetryUtilTest.java
new file mode 100644
index 0000000000..1320e44a24
--- /dev/null
+++ b/src/test/java/com/databricks/jdbc/common/util/RetryUtilTest.java
@@ -0,0 +1,54 @@
+package com.databricks.jdbc.common.util;
+
+import static
org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.junit.jupiter.api.Test;
+
+class RetryUtilTest {
+
+  @Test
+  void calculateExponentialBackoff_firstAttempt() {
+    assertEquals(1000, RetryUtil.calculateExponentialBackoff(0));
+  }
+
+  @Test
+  void calculateExponentialBackoff_secondAttempt() {
+    assertEquals(2000, RetryUtil.calculateExponentialBackoff(1));
+  }
+
+  @Test
+  void calculateExponentialBackoff_thirdAttempt() {
+    assertEquals(4000, RetryUtil.calculateExponentialBackoff(2));
+  }
+
+  @Test
+  void calculateExponentialBackoff_exceedsMax() {
+    assertEquals(10000, RetryUtil.calculateExponentialBackoff(10));
+  }
+
+  @Test
+  void doSleepForDelay_negativeDelay() {
+    assertThrows(IllegalArgumentException.class, () -> RetryUtil.doSleepForDelay(-1));
+  }
+
+  @Test
+  void doSleepForDelay_zeroDelay() {
+    // Only a bound far above scheduler jitter is asserted; expecting ~0 ms of wall-clock time
+    // makes the test fail on a loaded host.
+    long startNanos = System.nanoTime();
+    RetryUtil.doSleepForDelay(0);
+    assertTrue(System.nanoTime() - startNanos < 1_000_000_000L, "zero delay must return promptly");
+  }
+
+  @Test
+  void doSleepForDelay_smallDelay() {
+    // Lower bound only, with slack for timer granularity: a sleep may overshoot by any amount
+    // under load, so an upper bound would be flaky.
+    long startNanos = System.nanoTime();
+    RetryUtil.doSleepForDelay(50);
+    long elapsedMillis = (System.nanoTime() - startNanos) / 1_000_000L;
+    assertTrue(elapsedMillis >= 45, "slept only " + elapsedMillis + "ms");
+  }
+}
diff --git a/src/test/java/com/databricks/jdbc/telemetry/TelemetryPushClientTest.java b/src/test/java/com/databricks/jdbc/telemetry/TelemetryPushClientTest.java
index 8b89204c76..de8f4e31e5 100644
--- a/src/test/java/com/databricks/jdbc/telemetry/TelemetryPushClientTest.java
+++ b/src/test/java/com/databricks/jdbc/telemetry/TelemetryPushClientTest.java
@@ -2,9 +2,10 @@
 
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.mockito.Mockito.any;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.eq;
 import static
org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -14,8 +15,11 @@
 import com.databricks.jdbc.dbclient.impl.http.DatabricksHttpClientFactory;
 import com.databricks.jdbc.exception.DatabricksTelemetryException;
 import com.databricks.jdbc.model.telemetry.TelemetryRequest;
+import java.util.Arrays;
+import org.apache.http.HttpEntity;
 import org.apache.http.StatusLine;
 import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpUriRequest;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -164,4 +168,100 @@ public void pushEvent_doesNotThrow_onErrorCodes_whenCBDisabled(int statusCode) t
       assertDoesNotThrow(() -> client.pushEvent(new TelemetryRequest()));
     }
   }
+
+  // The response acknowledges both proto logs, so exactly one HTTP call is expected.
+  @Test
+  public void pushEvent_noRetryOnFullSuccess() throws Exception {
+    try (MockedStatic factoryMocked =
+        org.mockito.Mockito.mockStatic(DatabricksHttpClientFactory.class)) {
+      DatabricksHttpClientFactory mockFactory = mock(DatabricksHttpClientFactory.class);
+      factoryMocked.when(DatabricksHttpClientFactory::getInstance).thenReturn(mockFactory);
+
+      IDatabricksHttpClient mockHttpClient = mock(IDatabricksHttpClient.class);
+      when(mockFactory.getClient(any(), any())).thenReturn(mockHttpClient);
+
+      TelemetryRequest request = new TelemetryRequest();
+      request.setProtoLogs(Arrays.asList("log1", "log2"));
+
+      CloseableHttpResponse successResponse = mock(CloseableHttpResponse.class);
+      StatusLine successStatusLine = mock(StatusLine.class);
+      when(successResponse.getStatusLine()).thenReturn(successStatusLine);
+      when(successStatusLine.getStatusCode()).thenReturn(200);
+
+      HttpEntity successEntity = mock(HttpEntity.class);
+      when(successResponse.getEntity()).thenReturn(successEntity);
+      when(successEntity.getContent())
+          .thenReturn(
+              new java.io.ByteArrayInputStream("{\"numProtoSuccess\":2,\"errors\":[]}".getBytes()));
+
+      when(mockHttpClient.execute(any(HttpUriRequest.class))).thenReturn(successResponse);
+
+      IDatabricksConnectionContext mockContext = mock(IDatabricksConnectionContext.class);
+      when(mockContext.getHostUrl()).thenReturn("https://example.com");
+      when(mockContext.isTelemetryCircuitBreakerEnabled()).thenReturn(false);
+
+      TelemetryPushClient client =
+          new TelemetryPushClient(false /* isAuthenticated */, mockContext, null);
+
+      assertDoesNotThrow(() -> client.pushEvent(request));
+
+      verify(mockHttpClient, times(1)).execute(any(HttpUriRequest.class));
+    }
+  }
+
+  // The first response acknowledges 1 of 3 logs and the second all 3: expect two HTTP calls.
+  // NOTE: the client backs off before the retry, so this test blocks in a real sleep
+  // (RetryUtil.calculateExponentialBackoff(0) is 1000 ms).
+  @Test
+  public void pushEvent_retriesOnPartialSuccess() throws Exception {
+    try (MockedStatic factoryMocked =
+        org.mockito.Mockito.mockStatic(DatabricksHttpClientFactory.class)) {
+      DatabricksHttpClientFactory mockFactory = mock(DatabricksHttpClientFactory.class);
+      factoryMocked.when(DatabricksHttpClientFactory::getInstance).thenReturn(mockFactory);
+
+      IDatabricksHttpClient mockHttpClient = mock(IDatabricksHttpClient.class);
+      when(mockFactory.getClient(any(), any())).thenReturn(mockHttpClient);
+
+      TelemetryRequest request = new TelemetryRequest();
+      request.setProtoLogs(Arrays.asList("log1", "log2", "log3"));
+
+      CloseableHttpResponse response1 = mock(CloseableHttpResponse.class);
+      StatusLine statusLine1 = mock(StatusLine.class);
+      when(response1.getStatusLine()).thenReturn(statusLine1);
+      when(statusLine1.getStatusCode()).thenReturn(200);
+
+      HttpEntity entity1 = mock(HttpEntity.class);
+      when(response1.getEntity()).thenReturn(entity1);
+      when(entity1.getContent())
+          .thenReturn(
+              new java.io.ByteArrayInputStream("{\"numProtoSuccess\":1,\"errors\":[]}".getBytes()));
+
+      CloseableHttpResponse response2 = mock(CloseableHttpResponse.class);
+      StatusLine statusLine2 = mock(StatusLine.class);
+      when(response2.getStatusLine()).thenReturn(statusLine2);
+      when(statusLine2.getStatusCode()).thenReturn(200);
+
+      HttpEntity entity2 = mock(HttpEntity.class);
+      when(response2.getEntity()).thenReturn(entity2);
+      when(entity2.getContent())
+          .thenReturn(
+              new java.io.ByteArrayInputStream("{\"numProtoSuccess\":3,\"errors\":[]}".getBytes()));
+
+      when(mockHttpClient.execute(any(HttpUriRequest.class)))
+          .thenReturn(response1)
+          .thenReturn(response2);
+
+      IDatabricksConnectionContext mockContext = mock(IDatabricksConnectionContext.class);
+      when(mockContext.getHostUrl()).thenReturn("https://example.com");
+      when(mockContext.isTelemetryCircuitBreakerEnabled()).thenReturn(false);
+
+      TelemetryPushClient client =
+          new TelemetryPushClient(
+              false /* isAuthenticated */, mockContext, null, 3 /* maxRetries */);
+
+      assertDoesNotThrow(() -> client.pushEvent(request));
+
+      verify(mockHttpClient, times(2)).execute(any(HttpUriRequest.class));
+    }
+  }
 }