Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,8 @@ && isErrorOperationState(statusResp.getOperationState())) {
LOGGER.error(errorMsg);

String sqlState = statusResp.getSqlState();
if (QUERY_EXECUTION_TIMEOUT_SQLSTATE.equals(sqlState)) {
if (QUERY_EXECUTION_TIMEOUT_SQLSTATE.equals(sqlState)
|| statusResp.getOperationState() == TOperationState.TIMEDOUT_STATE) {
throw new DatabricksTimeoutException(
errorMsg, null, DatabricksDriverErrorCode.OPERATION_TIMEOUT_ERROR);
}
Expand Down Expand Up @@ -805,7 +806,9 @@ private boolean isErrorStatusCode(TStatus status) {
}

private boolean isErrorOperationState(TOperationState state) {
return state == TOperationState.ERROR_STATE || state == TOperationState.CLOSED_STATE;
return state == TOperationState.ERROR_STATE
|| state == TOperationState.CLOSED_STATE
|| state == TOperationState.TIMEDOUT_STATE;
}

private boolean isPendingOperationState(TOperationState state) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,69 @@ void testServerSideTimeoutThrowsTimeoutException() throws TException, SQLExcepti
() -> accessor.execute(request, parentStatement, session, StatementType.SQL));
}

@Test
void testTimedOutStateInDirectResultsThrowsTimeoutException()
throws TException, SQLException, DatabricksValidationException {
// Reproduces the interactive cluster Case B: server enforces query timeout and returns
// TIMEDOUT_STATE directly in directResults before the client polling loop starts.
// Previously isErrorOperationState excluded TIMEDOUT_STATE, causing the driver to fall
// through to executeFetchRequest and throw DatabricksHttpException instead.
setup(true);

TExecuteStatementReq request = new TExecuteStatementReq();
TSparkDirectResults timedOutDirectResults =
new TSparkDirectResults()
.setOperationStatus(
new TGetOperationStatusResp()
.setStatus(new TStatus().setStatusCode(TStatusCode.SUCCESS_STATUS))
.setOperationState(TOperationState.TIMEDOUT_STATE)
.setErrorMessage("Query timed out after 1 seconds"));
TExecuteStatementResp tExecuteStatementResp =
new TExecuteStatementResp()
.setOperationHandle(tOperationHandle)
.setStatus(new TStatus().setStatusCode(TStatusCode.SUCCESS_STATUS))
.setDirectResults(timedOutDirectResults);
when(thriftClient.ExecuteStatement(request)).thenReturn(tExecuteStatementResp);

Statement statement = mock(Statement.class);
when(parentStatement.getStatement()).thenReturn(statement);
when(statement.getQueryTimeout()).thenReturn(300); // Long client timeout — server fires first

assertThrows(
DatabricksTimeoutException.class,
() -> accessor.execute(request, parentStatement, session, StatementType.SQL));
}

@Test
void testTimedOutStateDuringPollingThrowsTimeoutException()
throws TException, SQLException, DatabricksValidationException {
// Server returns RUNNING_STATE initially, then TIMEDOUT_STATE during polling —
// e.g. cluster enforces its own max query duration while client timeout is longer.
setup(true);

TExecuteStatementReq request = new TExecuteStatementReq();
TExecuteStatementResp tExecuteStatementResp =
new TExecuteStatementResp()
.setOperationHandle(tOperationHandle)
.setStatus(new TStatus().setStatusCode(TStatusCode.SUCCESS_STATUS));
when(thriftClient.ExecuteStatement(request)).thenReturn(tExecuteStatementResp);

TGetOperationStatusResp timedOutStatusResp =
new TGetOperationStatusResp()
.setStatus(new TStatus().setStatusCode(TStatusCode.SUCCESS_STATUS))
.setOperationState(TOperationState.TIMEDOUT_STATE)
.setErrorMessage("Query timed out after 1 seconds");
when(thriftClient.GetOperationStatus(operationStatusReq)).thenReturn(timedOutStatusResp);

Statement statement = mock(Statement.class);
when(parentStatement.getStatement()).thenReturn(statement);
when(statement.getQueryTimeout()).thenReturn(300); // Long client timeout — server fires first

assertThrows(
DatabricksTimeoutException.class,
() -> accessor.execute(request, parentStatement, session, StatementType.SQL));
}

@Test
void testFetchResultsWithCustomMaxRowsPerBlock()
throws TException, SQLException, DatabricksValidationException {
Expand Down
Loading