diff --git a/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java b/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java index dd3911740..9fb57fca5 100644 --- a/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java +++ b/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java @@ -254,6 +254,8 @@ public DatabricksResultSet executeStatement( try { Thread.sleep(connectionContext.getAsyncExecPollInterval()); } catch (InterruptedException e) { + // Cancel the statement on the server before throwing - it may still be running + tryCancelStatement(typedStatementId, "thread interruption"); String timeoutErrorMessage = String.format( "Thread interrupted due to statement timeout. StatementID %s", statementId); @@ -268,6 +270,8 @@ public DatabricksResultSet executeStatement( req.withHeaders(getHeaders("getStatement")); response = wrapGetStatementResponse(apiClient.execute(req, GetStatementResponse.class)); } catch (IOException e) { + // Cancel the statement on the server before throwing - it may still be running + tryCancelStatement(typedStatementId, "polling error"); String errorMessage = "Error while processing the get statement response"; LOGGER.error(errorMessage, e); throw new DatabricksSQLException( @@ -428,6 +432,33 @@ public void cancelStatement(StatementId typedStatementId) throws DatabricksSQLEx } } + /** + * Attempts to cancel a statement, logging any errors but not throwing. + * + *

This method is used during error handling to ensure statements are cancelled on the server + * when the client encounters transient errors during polling. Since the statement may still be + * running on the server, we attempt to cancel it to avoid resource leaks. + * + * @param statementId The statement ID to cancel + * @param reason The reason for cancellation (for logging) + */ + private void tryCancelStatement(StatementId statementId, String reason) { + try { + LOGGER.debug( + "Attempting to cancel statement {} due to {}", + statementId.toSQLExecStatementId(), + reason); + cancelStatement(statementId); + LOGGER.debug("Successfully cancelled statement {} due to {}", statementId, reason); + } catch (Exception e) { + LOGGER.warn( + "Failed to cancel statement {} after {}: {}", + statementId.toSQLExecStatementId(), + reason, + e.getMessage()); + } + } + @Override public ChunkLinkFetchResult getResultChunks( StatementId typedStatementId, long chunkIndex, long chunkStartRowOffset) diff --git a/src/main/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftAccessor.java b/src/main/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftAccessor.java index 14af79bcb..9681e445a 100644 --- a/src/main/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftAccessor.java +++ b/src/main/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftAccessor.java @@ -169,6 +169,36 @@ TCancelOperationResp cancelOperation(TCancelOperationReq req) throws DatabricksH } } + /** + * Attempts to cancel an operation, logging any errors but not throwing. + * + *

This method is used during error handling to ensure operations are cancelled on the server + * when the client encounters transient errors during polling. Since the operation may still be + * running on the server, we attempt to cancel it to avoid resource leaks. + * + * @param operationHandle The operation handle to cancel + * @param statementId The statement ID (for logging) + * @param reason The reason for cancellation (for logging) + */ + private void tryCancelOperation( + TOperationHandle operationHandle, StatementId statementId, String reason) { + try { + LOGGER.debug( + "Attempting to cancel operation for statement {} due to {}", + statementId.toSQLExecStatementId(), + reason); + cancelOperation(new TCancelOperationReq().setOperationHandle(operationHandle)); + LOGGER.debug( + "Successfully cancelled operation for statement {} due to {}", statementId, reason); + } catch (Exception e) { + LOGGER.warn( + "Failed to cancel operation for statement {} after {}: {}", + statementId.toSQLExecStatementId(), + reason, + e.getMessage()); + } + } + TCloseOperationResp closeOperation(TCloseOperationReq req) throws DatabricksHttpException { try { return getThriftClient().CloseOperation(req); @@ -310,7 +340,14 @@ private TGetOperationStatusResp pollTillOperationFinished( timeoutHandler.checkTimeout(); // Polling for operation status - statusResp = getOperationStatus(statusReq, statementId); + try { + statusResp = getOperationStatus(statusReq, statementId); + } catch (TException e) { + // Cancel the statement on the server before re-throwing. Unlikely the cancel will succeed, + // at least it will print an error message indicating the operation is still running. + tryCancelOperation(response.getOperationHandle(), statementId, "polling error"); + throw e; + } checkOperationStatusForErrors(statusResp, statementId.toSQLExecStatementId()); // Save some time if sleep isn't required by breaking. if (!shouldContinuePolling(statusResp)) { @@ -320,8 +357,7 @@ private TGetOperationStatusResp pollTillOperationFinished( TimeUnit.MILLISECONDS.sleep(asyncPollIntervalMillis); } catch (InterruptedException e) { Thread.currentThread().interrupt(); // Restore interrupt flag - cancelOperation( - new TCancelOperationReq().setOperationHandle(response.getOperationHandle())); + tryCancelOperation(response.getOperationHandle(), statementId, "thread interruption"); throw new DatabricksSQLException( "Query execution interrupted", e, DatabricksDriverErrorCode.THREAD_INTERRUPTED_ERROR); } diff --git a/src/test/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClientTest.java b/src/test/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClientTest.java index 98c91b9b9..2236f937e 100644 --- a/src/test/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClientTest.java +++ b/src/test/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClientTest.java @@ -886,4 +886,67 @@ public void testExecuteStatementWithClosedStatusAndNoParentStatement() throws Ex connection.getSession(), null)); } + + @Test + public void testPollingErrorCancelsStatement() throws Exception { + // Test that when polling fails with an IOException, the statement is cancelled + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksSdkClient databricksSdkClient = + new DatabricksSdkClient(connectionContext, statementExecutionService, apiClient); + DatabricksConnection connection = + new DatabricksConnection(connectionContext, databricksSdkClient); + + // Mock session creation + CreateSessionResponse sessionResponse = new CreateSessionResponse().setSessionId(SESSION_ID); + when(apiClient.execute(any(Request.class), eq(CreateSessionResponse.class))) + .thenReturn(sessionResponse); + connection.open(); + + DatabricksStatement statement = new DatabricksStatement(connection); + + // Create initial response with RUNNING status (requires polling) + StatementStatus runningStatus = new StatementStatus().setState(StatementState.RUNNING); + ExecuteStatementResponse executeResponse = + new ExecuteStatementResponse() + .setStatementId(STATEMENT_ID.toSQLExecStatementId()) + .setStatus(runningStatus); + + // Track cancel calls + final boolean[] cancelCalled = {false}; + + when(apiClient.execute(any(Request.class), any())) + .thenAnswer( + invocation -> { + Request req = invocation.getArgument(0); + if (req.getUrl().equals(STATEMENT_PATH)) { + return executeResponse; + } else if (req.getUrl().equals(SESSION_PATH)) { + return sessionResponse; + } else if (req.getUrl().contains("/statements/") + && req.getUrl().contains("/cancel")) { + cancelCalled[0] = true; + return null; + } else if (req.getUrl().contains("/statements/")) { + // Simulate polling error - throw IOException + throw new IOException("Network error during polling"); + } + return null; + }); + + // Execute should throw due to polling error + assertThrows( + DatabricksSQLException.class, + () -> + databricksSdkClient.executeStatement( + STATEMENT, + warehouse, + new HashMap<>(), + StatementType.QUERY, + connection.getSession(), + statement)); + + // Verify that cancel was called + assertTrue(cancelCalled[0], "Cancel should have been called when polling failed"); + } } diff --git a/src/test/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftAccessorTest.java b/src/test/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftAccessorTest.java index f67e0e497..65e5213e9 100644 --- a/src/test/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftAccessorTest.java +++ b/src/test/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftAccessorTest.java @@ -953,4 +953,38 @@ private TFetchResultsReq getFetchResultsRequest(boolean includeMetadata) } return request; } + + @Test + public void testPollingErrorCancelsOperation() throws Exception { + // Test that when GetOperationStatus fails with TException, the operation is cancelled + TExecuteStatementReq request = new TExecuteStatementReq(); + + // Response with RUNNING state - requires polling + TExecuteStatementResp tExecuteStatementResp = + new TExecuteStatementResp() + .setOperationHandle(tOperationHandle) + .setStatus(new TStatus().setStatusCode(TStatusCode.SUCCESS_STATUS)); + + Statement statement = mock(Statement.class); + when(parentStatement.getStatement()).thenReturn(statement); + when(statement.getQueryTimeout()).thenReturn(0); + when(thriftClient.ExecuteStatement(request)).thenReturn(tExecuteStatementResp); + + // First GetOperationStatus returns RUNNING, second throws TException + when(thriftClient.GetOperationStatus(operationStatusReq)) + .thenReturn(operationStatusRunningResp) + .thenThrow(new TException("Network error during polling")); + + // Mock CancelOperation to succeed + when(thriftClient.CancelOperation(any(TCancelOperationReq.class))) + .thenReturn(new TCancelOperationResp()); + + // Execute should throw due to polling error + assertThrows( + DatabricksHttpException.class, + () -> accessor.execute(request, parentStatement, session, StatementType.SQL)); + + // Verify that CancelOperation was called + verify(thriftClient).CancelOperation(any(TCancelOperationReq.class)); + } }