Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
- **Enhanced `enableMultipleCatalogSupport` behavior**: When this parameter is disabled (`enableMultipleCatalogSupport=0`), metadata operations (such as `getSchemas()`, `getTables()`, `getColumns()`, etc.) now return results only when the catalog parameter is either `null` or matches the current catalog. For any other catalog name, an empty result set is returned. This ensures metadata queries are restricted to the current catalog context. When enabled (`enableMultipleCatalogSupport=1`), metadata operations continue to work across all accessible catalogs.

### Fixed
- Fixed telemetry push to retry on partial failures with exponential backoff. When the telemetry service returns a partial success response, the client now automatically retries up to 3 times instead of silently dropping the failed events.
- Fixed timeout exception handling to throw `SQLTimeoutException` instead of `DatabricksHttpException` when queries time out during the result fetching phase. This completes the timeout exception fix to handle both the query execution polling and result fetching phases.
- Fixed `getTypeInfo()` and `getClientInfoProperties()` to return fresh ResultSet instances on each call instead of shared static instances. This resolves issues where calling these methods multiple times would fail due to exhausted cursor state (Issue #1178).
- Fixed complex data type metadata support when retrieving 0 rows in Arrow format.
Expand Down
25 changes: 25 additions & 0 deletions src/main/java/com/databricks/jdbc/common/util/RetryUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.databricks.jdbc.common.util;

/**
 * Static helpers shared by retry loops: a capped exponential backoff calculation and a blocking
 * sleep that preserves the thread's interrupt status.
 *
 * <p>This class is not instantiable.
 */
public final class RetryUtil {

  /** Base of the exponential growth applied per execution. */
  private static final int DEFAULT_BACKOFF_FACTOR = 2;

  /** Backoff returned for execution count 0, in milliseconds. */
  private static final int MIN_BACKOFF_INTERVAL = 1000;

  /** Upper bound on any computed backoff, in milliseconds. */
  private static final int MAX_RETRY_INTERVAL = 10 * 1000;

  private RetryUtil() {}

  /**
   * Computes {@code MIN_BACKOFF_INTERVAL * DEFAULT_BACKOFF_FACTOR^executionCount}, capped at {@code
   * MAX_RETRY_INTERVAL}.
   *
   * <p>The cap is applied to the multiplier before multiplying, so very large execution counts
   * cannot overflow {@code long} and produce a negative or zero delay. A negative execution count
   * yields a fractional power that truncates to a multiplier of 0, so the result is 0.
   *
   * @param executionCount zero-based number of attempts already made
   * @return the delay in milliseconds, never greater than {@code MAX_RETRY_INTERVAL}
   */
  public static long calculateExponentialBackoff(int executionCount) {
    // The double-to-long cast saturates at Long.MAX_VALUE, so the multiplier itself is always
    // well defined; only the subsequent multiplication could overflow.
    long multiplier = (long) Math.pow(DEFAULT_BACKOFF_FACTOR, executionCount);

    // Short-circuit before multiplying: beyond this point the product would exceed the cap anyway
    // (and, for large exponents, would wrap around).
    if (multiplier > MAX_RETRY_INTERVAL / MIN_BACKOFF_INTERVAL) {
      return MAX_RETRY_INTERVAL;
    }
    return Math.min(MIN_BACKOFF_INTERVAL * multiplier, MAX_RETRY_INTERVAL);
  }

  /**
   * Blocks the calling thread for the given number of milliseconds.
   *
   * @param delayMillis time to sleep in milliseconds
   * @throws IllegalArgumentException if {@code delayMillis} is negative (raised by {@link
   *     Thread#sleep(long)})
   * @throws RuntimeException if the thread is interrupted while sleeping; the interrupt flag is
   *     restored before the exception is thrown and the {@link InterruptedException} is kept as
   *     the cause
   */
  public static void doSleepForDelay(long delayMillis) {
    try {
      Thread.sleep(delayMillis);
    } catch (InterruptedException e) {
      // Restore the interrupt status so callers further up the stack can observe it.
      Thread.currentThread().interrupt();
      throw new RuntimeException("Sleep interrupted", e);
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static com.databricks.jdbc.common.DatabricksJdbcConstants.THRIFT_ERROR_MESSAGE_HEADER;

import com.databricks.jdbc.api.internal.IDatabricksConnectionContext;
import com.databricks.jdbc.common.util.RetryUtil;
import com.databricks.jdbc.exception.DatabricksRetryHandlerException;
import com.databricks.jdbc.log.JdbcLogger;
import com.databricks.jdbc.log.JdbcLoggerFactory;
Expand Down Expand Up @@ -31,9 +32,6 @@ public class DatabricksHttpRetryHandler
private static final String RATE_LIMIT_ACCUMULATED_TIME_KEY = "rateLimitAccumulatedTime";
private static final String API_CODES_ACCUMULATED_TIME_KEY = "apiCodesAccumulatedTime";
static final String RETRY_AFTER_HEADER = "Retry-After";
private static final int DEFAULT_BACKOFF_FACTOR = 2; // Exponential factor
private static final int MIN_BACKOFF_INTERVAL = 1000; // 1s
private static final int MAX_RETRY_INTERVAL = 10 * 1000; // 10s

private final IDatabricksConnectionContext connectionContext;

Expand Down Expand Up @@ -242,10 +240,9 @@ static long calculateDelayInMillis(int errorCode, int executionCount, int retryI
}
}

@VisibleForTesting
static long calculateExponentialBackoff(int executionCount) {
return Math.min(
MIN_BACKOFF_INTERVAL * (long) Math.pow(DEFAULT_BACKOFF_FACTOR, executionCount),
MAX_RETRY_INTERVAL);
return RetryUtil.calculateExponentialBackoff(executionCount);
}

static int getErrorCodeFromException(IOException exception) {
Expand Down Expand Up @@ -273,13 +270,8 @@ private static long getAccumulatedTime(HttpContext context, String key) {
}

@VisibleForTesting
protected void doSleepForDelay(long delayMillis) {
try {
Thread.sleep(delayMillis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Restore the interrupt status
throw new RuntimeException("Sleep interrupted", e);
}
void doSleepForDelay(long delayMillis) {
RetryUtil.doSleepForDelay(delayMillis);
}

/** Check if the request is retryable based on the status code and any connection preferences. */
Expand Down
132 changes: 85 additions & 47 deletions src/main/java/com/databricks/jdbc/telemetry/TelemetryPushClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.databricks.jdbc.common.DatabricksJdbcConstants;
import com.databricks.jdbc.common.HttpClientType;
import com.databricks.jdbc.common.util.HttpUtil;
import com.databricks.jdbc.common.util.RetryUtil;
import com.databricks.jdbc.dbclient.IDatabricksHttpClient;
import com.databricks.jdbc.dbclient.impl.http.DatabricksHttpClientFactory;
import com.databricks.jdbc.dbclient.impl.sqlexec.PathConstants;
Expand All @@ -29,73 +30,110 @@ public class TelemetryPushClient implements ITelemetryPushClient {
private static final JdbcLogger LOGGER = JdbcLoggerFactory.getLogger(TelemetryPushClient.class);

private static final String REQUEST_ID_HEADER = "x-request-id";
private static final int DEFAULT_MAX_RETRIES = 3;

private final boolean isAuthenticated;
private final IDatabricksConnectionContext connectionContext;
private final DatabricksConfig databricksConfig;
private final int maxRetries;

/**
 * Creates a push client that uses {@code DEFAULT_MAX_RETRIES} as its retry limit.
 *
 * @param isAuthenticated whether pushes go to the authenticated telemetry path with auth headers
 * @param connectionContext connection context used to resolve the host URL and HTTP client
 * @param databricksConfig configuration used to obtain authentication headers
 */
public TelemetryPushClient(
boolean isAuthenticated,
IDatabricksConnectionContext connectionContext,
DatabricksConfig databricksConfig) {
this(isAuthenticated, connectionContext, databricksConfig, DEFAULT_MAX_RETRIES);
}

/**
 * Creates a push client with an explicit retry limit.
 *
 * @param isAuthenticated whether pushes go to the authenticated telemetry path with auth headers
 * @param connectionContext connection context used to resolve the host URL and HTTP client
 * @param databricksConfig configuration used to obtain authentication headers
 * @param maxRetries initial retry budget that {@code pushEvent} counts down when the telemetry
 *     service reports fewer successful events than were sent
 */
public TelemetryPushClient(
    boolean isAuthenticated,
    IDatabricksConnectionContext connectionContext,
    DatabricksConfig databricksConfig,
    int maxRetries) {
  // Plain field capture; the assignments are independent of one another.
  this.maxRetries = maxRetries;
  this.databricksConfig = databricksConfig;
  this.connectionContext = connectionContext;
  this.isAuthenticated = isAuthenticated;
}

@Override
public void pushEvent(TelemetryRequest request) throws Exception {
IDatabricksHttpClient httpClient =
DatabricksHttpClientFactory.getInstance()
.getClient(connectionContext, HttpClientType.TELEMETRY);
String path =
isAuthenticated
? PathConstants.TELEMETRY_PATH
: PathConstants.TELEMETRY_PATH_UNAUTHENTICATED;
String uri = new URIBuilder(connectionContext.getHostUrl()).setPath(path).toString();
HttpPost post = new HttpPost(uri);
post.setEntity(
new StringEntity(getTelemetryMapper().writeValueAsString(request), StandardCharsets.UTF_8));
DatabricksJdbcConstants.JSON_HTTP_HEADERS.forEach(post::addHeader);
Map<String, String> authHeaders =
isAuthenticated ? databricksConfig.authenticate() : Collections.emptyMap();
authHeaders.forEach(post::addHeader);
try (CloseableHttpResponse response = httpClient.execute(post)) {
// TODO: check response and add retry for partial failures
if (!HttpUtil.isSuccessfulHttpResponse(response)) {
int remainingRetries = maxRetries;
while (true) {
IDatabricksHttpClient httpClient =
DatabricksHttpClientFactory.getInstance()
.getClient(connectionContext, HttpClientType.TELEMETRY);
String path =
isAuthenticated
? PathConstants.TELEMETRY_PATH
: PathConstants.TELEMETRY_PATH_UNAUTHENTICATED;
String uri = new URIBuilder(connectionContext.getHostUrl()).setPath(path).toString();
HttpPost post = new HttpPost(uri);
post.setEntity(
new StringEntity(
getTelemetryMapper().writeValueAsString(request), StandardCharsets.UTF_8));
DatabricksJdbcConstants.JSON_HTTP_HEADERS.forEach(post::addHeader);
Map<String, String> authHeaders =
isAuthenticated ? databricksConfig.authenticate() : Collections.emptyMap();
authHeaders.forEach(post::addHeader);

try (CloseableHttpResponse response = httpClient.execute(post)) {
if (!HttpUtil.isSuccessfulHttpResponse(response)) {
LOGGER.trace(
"Failed to push telemetry logs with error response: {}", response.getStatusLine());
if (connectionContext.isTelemetryCircuitBreakerEnabled()) {
throw new DatabricksTelemetryException(
"Telemetry push failed with response: " + response.getStatusLine());
} else {
return;
}
}
TelemetryResponse telResponse =
getTelemetryMapper()
.readValue(EntityUtils.toString(response.getEntity()), TelemetryResponse.class);
LOGGER.trace(
"Failed to push telemetry logs with error response: {}", response.getStatusLine());
if (connectionContext.isTelemetryCircuitBreakerEnabled()) {
throw new DatabricksTelemetryException(
"Telemetry push failed with response: " + response.getStatusLine());
"Pushed Telemetry logs with request-Id {} with events {} with error count {}",
response.getFirstHeader(REQUEST_ID_HEADER),
telResponse.getNumProtoSuccess(),
telResponse.getErrors().size());
if (!telResponse.getErrors().isEmpty()) {
LOGGER.trace("Failed to push telemetry logs with error: {}", telResponse.getErrors());
}

if (request.getProtoLogs().size() != telResponse.getNumProtoSuccess()) {
LOGGER.debug(
"Partial failure while pushing telemetry logs: request count: {}, upload count: {}, remaining retries: {}",
request.getProtoLogs().size(),
telResponse.getNumProtoSuccess(),
remainingRetries);

if (remainingRetries > 0) {
long backoffMs = RetryUtil.calculateExponentialBackoff(maxRetries - remainingRetries);
LOGGER.debug(
"Retrying telemetry push after {}ms ({} retries remaining)",
backoffMs,
remainingRetries);
RetryUtil.doSleepForDelay(backoffMs);
remainingRetries--;
} else {
LOGGER.debug(
"Max retries exhausted for telemetry push. Dropping {} events",
request.getProtoLogs().size() - telResponse.getNumProtoSuccess());
return;
}
} else {
return;
}
}
TelemetryResponse telResponse =
getTelemetryMapper()
.readValue(EntityUtils.toString(response.getEntity()), TelemetryResponse.class);
LOGGER.trace(
"Pushed Telemetry logs with request-Id {} with events {} with error count {}",
response.getFirstHeader(REQUEST_ID_HEADER),
telResponse.getNumProtoSuccess(),
telResponse.getErrors().size());
if (!telResponse.getErrors().isEmpty()) {
LOGGER.trace("Failed to push telemetry logs with error: {}", telResponse.getErrors());
}
if (request.getProtoLogs().size() != telResponse.getNumProtoSuccess()) {
} catch (DatabricksTelemetryException e) {
throw e;
} catch (Exception e) {
LOGGER.debug(
"Partial failure while pushing telemetry logs with error response: {}, request count: {}, upload count: {}",
telResponse.getErrors(),
request.getProtoLogs().size(),
telResponse.getNumProtoSuccess());
}
} catch (Exception e) {
LOGGER.debug(
"Failed to push telemetry logs with error: {}, request: {}",
e.getMessage(),
getTelemetryMapper().writeValueAsString(request));
if (connectionContext.isTelemetryCircuitBreakerEnabled()) {
throw new DatabricksTelemetryException("Exception while pushing telemetry logs", e);
"Failed to push telemetry logs with error: {}, request: {}",
e.getMessage(),
getTelemetryMapper().writeValueAsString(request));
if (connectionContext.isTelemetryCircuitBreakerEnabled()) {
throw new DatabricksTelemetryException("Exception while pushing telemetry logs", e);
}
return;
}
}
}
Expand Down
54 changes: 54 additions & 0 deletions src/test/java/com/databricks/jdbc/common/util/RetryUtilTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.databricks.jdbc.common.util;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.junit.jupiter.api.Test;

/**
 * Unit tests for {@link RetryUtil}.
 *
 * <p>Sleep timing is measured with the monotonic {@link System#nanoTime()} clock and checked
 * against a lower bound plus a generous upper bound, so the tests do not fail spuriously on slow
 * or heavily loaded build hosts.
 */
class RetryUtilTest {

  private static final long NANOS_PER_MILLI = 1_000_000L;

  /** Upper bound used only to catch a sleep that is wildly longer than requested. */
  private static final long GENEROUS_UPPER_BOUND_MILLIS = 2_000L;

  @Test
  void calculateExponentialBackoff_firstAttempt() {
    assertEquals(1000, RetryUtil.calculateExponentialBackoff(0));
  }

  @Test
  void calculateExponentialBackoff_secondAttempt() {
    assertEquals(2000, RetryUtil.calculateExponentialBackoff(1));
  }

  @Test
  void calculateExponentialBackoff_thirdAttempt() {
    assertEquals(4000, RetryUtil.calculateExponentialBackoff(2));
  }

  @Test
  void calculateExponentialBackoff_exceedsMax() {
    assertEquals(10000, RetryUtil.calculateExponentialBackoff(10));
  }

  @Test
  void doSleepForDelay_negativeDelay() {
    assertThrows(IllegalArgumentException.class, () -> RetryUtil.doSleepForDelay(-1));
  }

  @Test
  void doSleepForDelay_zeroDelay() {
    long elapsedMillis = timeSleepMillis(0);
    assertTrue(
        elapsedMillis < GENEROUS_UPPER_BOUND_MILLIS,
        "zero delay should return promptly but took " + elapsedMillis + "ms");
  }

  @Test
  void doSleepForDelay_smallDelay() {
    long elapsedMillis = timeSleepMillis(50);
    // Small tolerance below the requested delay to absorb timer granularity.
    assertTrue(elapsedMillis >= 40, "slept only " + elapsedMillis + "ms, expected about 50ms");
    assertTrue(
        elapsedMillis < GENEROUS_UPPER_BOUND_MILLIS,
        "sleep took unexpectedly long: " + elapsedMillis + "ms");
  }

  @Test
  void doSleepForDelay_interruptedRestoresFlagAndThrows() {
    // A pending interrupt makes Thread.sleep throw immediately, so this test does not wait.
    Thread.currentThread().interrupt();
    try {
      assertThrows(RuntimeException.class, () -> RetryUtil.doSleepForDelay(1000));
      assertTrue(
          Thread.currentThread().isInterrupted(),
          "interrupt status should be restored after the sleep is interrupted");
    } finally {
      // Clear the flag so it cannot leak into other tests running on this thread.
      Thread.interrupted();
    }
  }

  /** Runs {@link RetryUtil#doSleepForDelay(long)} and returns the elapsed time in milliseconds. */
  private static long timeSleepMillis(long delayMillis) {
    long startNanos = System.nanoTime();
    RetryUtil.doSleepForDelay(delayMillis);
    return (System.nanoTime() - startNanos) / NANOS_PER_MILLI;
  }
}
Loading