Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ public DatabricksResultSet executeStatement(
try {
Thread.sleep(connectionContext.getAsyncExecPollInterval());
} catch (InterruptedException e) {
// Cancel the statement on the server before throwing - it may still be running
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have cancel being fired from checkTimeout above also. We can keep a check that if cancel was already attempted, don't do cancel again.

tryCancelStatement(typedStatementId, "thread interruption");
String timeoutErrorMessage =
String.format(
"Thread interrupted due to statement timeout. StatementID %s", statementId);
Comment on lines 254 to 261
Copy link

Copilot AI Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The InterruptedException catch block does not restore the thread interrupt flag. Please call Thread.currentThread().interrupt() before proceeding (even if you cancel and throw), so upstream code can observe the interruption consistently.

Copilot uses AI. Check for mistakes.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix

Expand All @@ -268,6 +270,8 @@ public DatabricksResultSet executeStatement(
req.withHeaders(getHeaders("getStatement"));
response = wrapGetStatementResponse(apiClient.execute(req, GetStatementResponse.class));
} catch (IOException e) {
// Cancel the statement on the server before throwing - it may still be running
tryCancelStatement(typedStatementId, "polling error");
String errorMessage = "Error while processing the get statement response";
LOGGER.error(errorMessage, e);
throw new DatabricksSQLException(
Expand Down Expand Up @@ -428,6 +432,33 @@ public void cancelStatement(StatementId typedStatementId) throws DatabricksSQLEx
}
}

/**
* Attempts to cancel a statement, logging any errors but not throwing.
*
* <p>This method is used during error handling to ensure statements are cancelled on the server
* when the client encounters transient errors during polling. Since the statement may still be
* running on the server, we attempt to cancel it to avoid resource leaks.
*
* @param statementId The statement ID to cancel
* @param reason The reason for cancellation (for logging)
*/
private void tryCancelStatement(StatementId statementId, String reason) {
try {
LOGGER.debug(
"Attempting to cancel statement {} due to {}",
statementId.toSQLExecStatementId(),
reason);
cancelStatement(statementId);
LOGGER.debug("Successfully cancelled statement {} due to {}", statementId, reason);
} catch (Exception e) {
LOGGER.warn(
"Failed to cancel statement {} after {}: {}",
statementId.toSQLExecStatementId(),
reason,
e.getMessage());
Copy link

Copilot AI Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The warning log drops the exception stack trace by logging only e.getMessage(). Please include the throwable in the log call (e.g., as the last argument) so cancellation failures can be debugged when needed.

Suggested change
e.getMessage());
e.getMessage(),
e);

Copilot uses AI. Check for mistakes.
}
}

@Override
public ChunkLinkFetchResult getResultChunks(
StatementId typedStatementId, long chunkIndex, long chunkStartRowOffset)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,36 @@ TCancelOperationResp cancelOperation(TCancelOperationReq req) throws DatabricksH
}
}

/**
* Attempts to cancel an operation, logging any errors but not throwing.
*
* <p>This method is used during error handling to ensure operations are cancelled on the server
* when the client encounters transient errors during polling. Since the operation may still be
* running on the server, we attempt to cancel it to avoid resource leaks.
*
* @param operationHandle The operation handle to cancel
* @param statementId The statement ID (for logging)
* @param reason The reason for cancellation (for logging)
*/
private void tryCancelOperation(
TOperationHandle operationHandle, StatementId statementId, String reason) {
try {
LOGGER.debug(
"Attempting to cancel operation for statement {} due to {}",
statementId.toSQLExecStatementId(),
reason);
cancelOperation(new TCancelOperationReq().setOperationHandle(operationHandle));
LOGGER.debug(
"Successfully cancelled operation for statement {} due to {}", statementId, reason);
} catch (Exception e) {
LOGGER.warn(
"Failed to cancel operation for statement {} after {}: {}",
statementId.toSQLExecStatementId(),
reason,
e.getMessage());
Comment on lines +195 to +198
Copy link

Copilot AI Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same logging issue here: only e.getMessage() is logged, which loses the stack trace/context. Please pass the exception into the logger so the full failure cause is captured.

Suggested change
"Failed to cancel operation for statement {} after {}: {}",
statementId.toSQLExecStatementId(),
reason,
e.getMessage());
"Failed to cancel operation for statement {} after {}",
statementId.toSQLExecStatementId(),
reason,
e);

Copilot uses AI. Check for mistakes.
}
}

TCloseOperationResp closeOperation(TCloseOperationReq req) throws DatabricksHttpException {
try {
return getThriftClient().CloseOperation(req);
Expand Down Expand Up @@ -310,7 +340,14 @@ private TGetOperationStatusResp pollTillOperationFinished(
timeoutHandler.checkTimeout();

// Polling for operation status
statusResp = getOperationStatus(statusReq, statementId);
try {
statusResp = getOperationStatus(statusReq, statementId);
} catch (TException e) {
// Cancel the statement on the server before re-throwing. Unlikely the cancel will succeed,
// at least it will print an error message indicating the operation is still running.
tryCancelOperation(response.getOperationHandle(), statementId, "polling error");
throw e;
}
checkOperationStatusForErrors(statusResp, statementId.toSQLExecStatementId());
// Save some time if sleep isn't required by breaking.
if (!shouldContinuePolling(statusResp)) {
Expand All @@ -320,8 +357,7 @@ private TGetOperationStatusResp pollTillOperationFinished(
TimeUnit.MILLISECONDS.sleep(asyncPollIntervalMillis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Restore interrupt flag
Copy link
Collaborator

@gopalldb gopalldb Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

restore interrupt flag -> can we add this to sdkClient handling as well?

cancelOperation(
new TCancelOperationReq().setOperationHandle(response.getOperationHandle()));
tryCancelOperation(response.getOperationHandle(), statementId, "thread interruption");
throw new DatabricksSQLException(
"Query execution interrupted", e, DatabricksDriverErrorCode.THREAD_INTERRUPTED_ERROR);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -886,4 +886,67 @@ public void testExecuteStatementWithClosedStatusAndNoParentStatement() throws Ex
connection.getSession(),
null));
}

@Test
public void testPollingErrorCancelsStatement() throws Exception {
// Test that when polling fails with an IOException, the statement is cancelled
IDatabricksConnectionContext connectionContext =
DatabricksConnectionContext.parse(JDBC_URL, new Properties());
DatabricksSdkClient databricksSdkClient =
new DatabricksSdkClient(connectionContext, statementExecutionService, apiClient);
DatabricksConnection connection =
new DatabricksConnection(connectionContext, databricksSdkClient);

// Mock session creation
CreateSessionResponse sessionResponse = new CreateSessionResponse().setSessionId(SESSION_ID);
when(apiClient.execute(any(Request.class), eq(CreateSessionResponse.class)))
.thenReturn(sessionResponse);
connection.open();

DatabricksStatement statement = new DatabricksStatement(connection);

// Create initial response with RUNNING status (requires polling)
StatementStatus runningStatus = new StatementStatus().setState(StatementState.RUNNING);
ExecuteStatementResponse executeResponse =
new ExecuteStatementResponse()
.setStatementId(STATEMENT_ID.toSQLExecStatementId())
.setStatus(runningStatus);

// Track cancel calls
final boolean[] cancelCalled = {false};

when(apiClient.execute(any(Request.class), any()))
.thenAnswer(
invocation -> {
Request req = invocation.getArgument(0);
if (req.getUrl().equals(STATEMENT_PATH)) {
return executeResponse;
} else if (req.getUrl().equals(SESSION_PATH)) {
return sessionResponse;
} else if (req.getUrl().contains("/statements/")
&& req.getUrl().contains("/cancel")) {
cancelCalled[0] = true;
return null;
} else if (req.getUrl().contains("/statements/")) {
// Simulate polling error - throw IOException
throw new IOException("Network error during polling");
}
return null;
});

// Execute should throw due to polling error
assertThrows(
DatabricksSQLException.class,
() ->
databricksSdkClient.executeStatement(
STATEMENT,
warehouse,
new HashMap<>(),
StatementType.QUERY,
connection.getSession(),
statement));

// Verify that cancel was called
assertTrue(cancelCalled[0], "Cancel should have been called when polling failed");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -953,4 +953,38 @@ private TFetchResultsReq getFetchResultsRequest(boolean includeMetadata)
}
return request;
}

@Test
public void testPollingErrorCancelsOperation() throws Exception {
// Test that when GetOperationStatus fails with TException, the operation is cancelled
TExecuteStatementReq request = new TExecuteStatementReq();

// Response with RUNNING state - requires polling
TExecuteStatementResp tExecuteStatementResp =
new TExecuteStatementResp()
.setOperationHandle(tOperationHandle)
.setStatus(new TStatus().setStatusCode(TStatusCode.SUCCESS_STATUS));

Statement statement = mock(Statement.class);
when(parentStatement.getStatement()).thenReturn(statement);
when(statement.getQueryTimeout()).thenReturn(0);
when(thriftClient.ExecuteStatement(request)).thenReturn(tExecuteStatementResp);

// First GetOperationStatus returns RUNNING, second throws TException
when(thriftClient.GetOperationStatus(operationStatusReq))
.thenReturn(operationStatusRunningResp)
.thenThrow(new TException("Network error during polling"));

// Mock CancelOperation to succeed
when(thriftClient.CancelOperation(any(TCancelOperationReq.class)))
.thenReturn(new TCancelOperationResp());
Copy link

Copilot AI Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This stub returns an empty TCancelOperationResp. If production code inspects the response status (common in Thrift patterns), an empty response can cause unexpected exceptions and make the test pass for the wrong reason (because tryCancelOperation swallows exceptions). Consider returning a response populated with a SUCCESS status to better reflect a real successful cancellation.

Suggested change
.thenReturn(new TCancelOperationResp());
.thenReturn(
new TCancelOperationResp()
.setStatus(new TStatus().setStatusCode(TStatusCode.SUCCESS_STATUS)));

Copilot uses AI. Check for mistakes.

// Execute should throw due to polling error
assertThrows(
DatabricksHttpException.class,
() -> accessor.execute(request, parentStatement, session, StatementType.SQL));

// Verify that CancelOperation was called
verify(thriftClient).CancelOperation(any(TCancelOperationReq.class));
}
}
Loading