diff --git a/src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContext.java b/src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContext.java index eafbefde82..df141319e4 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContext.java +++ b/src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContext.java @@ -1193,11 +1193,6 @@ public boolean isTokenFederationEnabled() { return getParameter(DatabricksJdbcUrlParams.ENABLE_TOKEN_FEDERATION, "1").equals("1"); } - @Override - public boolean isStreamingChunkProviderEnabled() { - return getParameter(DatabricksJdbcUrlParams.ENABLE_STREAMING_CHUNK_PROVIDER).equals("1"); - } - @Override public int getLinkPrefetchWindow() { return Integer.parseInt(getParameter(DatabricksJdbcUrlParams.LINK_PREFETCH_WINDOW)); diff --git a/src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java b/src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java index 1d7067a3e8..947020acc7 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java +++ b/src/main/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResult.java @@ -103,42 +103,31 @@ private static ChunkProvider createRemoteChunkProvider( IDatabricksConnectionContext connectionContext = session.getConnectionContext(); - if (connectionContext.isStreamingChunkProviderEnabled()) { - LOGGER.info( - "Using StreamingChunkProvider for statementId: {}", statementId.toSQLExecStatementId()); - - ChunkLinkFetcher linkFetcher = new SeaChunkLinkFetcher(session, statementId); - CompressionCodec compressionCodec = resultManifest.getResultCompression(); - int maxChunksInMemory = connectionContext.getCloudFetchThreadPoolSize(); - int linkPrefetchWindow = connectionContext.getLinkPrefetchWindow(); - int chunkReadyTimeoutSeconds = connectionContext.getChunkReadyTimeoutSeconds(); - double cloudFetchSpeedThreshold = connectionContext.getCloudFetchSpeedThreshold(); - - // Convert ExternalLinks to ChunkLinkFetchResult for the provider - ChunkLinkFetchResult initialLinks = - convertToChunkLinkFetchResult( - resultData.getExternalLinks(), resultManifest.getTotalChunkCount()); - - return new StreamingChunkProvider( - linkFetcher, - httpClient, - compressionCodec, - statementId, - maxChunksInMemory, - linkPrefetchWindow, - chunkReadyTimeoutSeconds, - cloudFetchSpeedThreshold, - initialLinks); - } else { - // Use the original RemoteChunkProvider - return new RemoteChunkProvider( - statementId, - resultManifest, - resultData, - session, - httpClient, - connectionContext.getCloudFetchThreadPoolSize()); - } + LOGGER.info( + "Using StreamingChunkProvider for statementId: {}", statementId.toSQLExecStatementId()); + + ChunkLinkFetcher linkFetcher = new SeaChunkLinkFetcher(session, statementId); + CompressionCodec compressionCodec = resultManifest.getResultCompression(); + int maxChunksInMemory = connectionContext.getCloudFetchThreadPoolSize(); + int linkPrefetchWindow = connectionContext.getLinkPrefetchWindow(); + int chunkReadyTimeoutSeconds = connectionContext.getChunkReadyTimeoutSeconds(); + double cloudFetchSpeedThreshold = connectionContext.getCloudFetchSpeedThreshold(); + + // Convert ExternalLinks to ChunkLinkFetchResult for the provider + ChunkLinkFetchResult initialLinks = + convertToChunkLinkFetchResult( + resultData.getExternalLinks(), resultManifest.getTotalChunkCount()); + + return new StreamingChunkProvider( + linkFetcher, + httpClient, + compressionCodec, + statementId, + maxChunksInMemory, + linkPrefetchWindow, + chunkReadyTimeoutSeconds, + cloudFetchSpeedThreshold, + initialLinks); } public ArrowStreamResult( @@ -193,39 +182,28 @@ private static ChunkProvider createThriftRemoteChunkProvider( CompressionCodec compressionCodec = CompressionCodec.getCompressionMapping(resultsResp.getResultSetMetadata()); - if (connectionContext.isStreamingChunkProviderEnabled()) { - StatementId statementId = parentStatement.getStatementId(); - LOGGER.info("Using StreamingChunkProvider for Thrift statementId: {}", statementId); - - ChunkLinkFetcher linkFetcher = new ThriftChunkLinkFetcher(session, statementId); - int maxChunksInMemory = connectionContext.getCloudFetchThreadPoolSize(); - int linkPrefetchWindow = connectionContext.getLinkPrefetchWindow(); - int chunkReadyTimeoutSeconds = connectionContext.getChunkReadyTimeoutSeconds(); - double cloudFetchSpeedThreshold = connectionContext.getCloudFetchSpeedThreshold(); - - // Convert initial Thrift links to ChunkLinkFetchResult - ChunkLinkFetchResult initialLinks = convertThriftLinksToChunkLinkFetchResult(resultsResp); - - return new StreamingChunkProvider( - linkFetcher, - httpClient, - compressionCodec, - statementId, - maxChunksInMemory, - linkPrefetchWindow, - chunkReadyTimeoutSeconds, - cloudFetchSpeedThreshold, - initialLinks); - } else { - // Use the original RemoteChunkProvider - return new RemoteChunkProvider( - parentStatement, - resultsResp, - session, - httpClient, - connectionContext.getCloudFetchThreadPoolSize(), - compressionCodec); - } + StatementId statementId = parentStatement.getStatementId(); + LOGGER.info("Using StreamingChunkProvider for Thrift statementId: {}", statementId); + + ChunkLinkFetcher linkFetcher = new ThriftChunkLinkFetcher(session, statementId); + int maxChunksInMemory = connectionContext.getCloudFetchThreadPoolSize(); + int linkPrefetchWindow = connectionContext.getLinkPrefetchWindow(); + int chunkReadyTimeoutSeconds = connectionContext.getChunkReadyTimeoutSeconds(); + double cloudFetchSpeedThreshold = connectionContext.getCloudFetchSpeedThreshold(); + + // Convert initial Thrift links to ChunkLinkFetchResult + ChunkLinkFetchResult initialLinks = convertThriftLinksToChunkLinkFetchResult(resultsResp); + + return new StreamingChunkProvider( + linkFetcher, + httpClient, + compressionCodec, + statementId, + maxChunksInMemory, + linkPrefetchWindow, + chunkReadyTimeoutSeconds, + cloudFetchSpeedThreshold, + initialLinks); } public List getArrowMetadata() throws DatabricksSQLException { diff --git a/src/main/java/com/databricks/jdbc/api/impl/arrow/ChunkDownloadTask.java b/src/main/java/com/databricks/jdbc/api/impl/arrow/ChunkDownloadTask.java deleted file mode 100644 index 007af228e9..0000000000 --- a/src/main/java/com/databricks/jdbc/api/impl/arrow/ChunkDownloadTask.java +++ /dev/null @@ -1,129 +0,0 @@ -package com.databricks.jdbc.api.impl.arrow; - -import com.databricks.jdbc.api.internal.IDatabricksConnectionContext; -import com.databricks.jdbc.common.util.DatabricksThreadContextHolder; -import com.databricks.jdbc.dbclient.IDatabricksHttpClient; -import com.databricks.jdbc.exception.DatabricksSQLException; -import com.databricks.jdbc.log.JdbcLogger; -import com.databricks.jdbc.log.JdbcLoggerFactory; -import com.databricks.jdbc.model.core.ExternalLink; -import com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode; -import java.io.IOException; -import java.util.Arrays; -import java.util.concurrent.ExecutionException; - -/** Task class to manage download for a single chunk. */ -class ChunkDownloadTask implements DatabricksCallableTask { - - private static final JdbcLogger LOGGER = JdbcLoggerFactory.getLogger(ChunkDownloadTask.class); - public static final int MAX_RETRIES = 5; - private static final long RETRY_DELAY_MS = 1500; // 1.5 seconds - private final ArrowResultChunk chunk; - private final IDatabricksHttpClient httpClient; - private final ChunkDownloadManager chunkDownloader; - private final IDatabricksConnectionContext connectionContext; - private final String statementId; - private final ChunkLinkDownloadService linkDownloadService; - Throwable uncaughtException = null; - - ChunkDownloadTask( - ArrowResultChunk chunk, - IDatabricksHttpClient httpClient, - ChunkDownloadManager chunkDownloader, - ChunkLinkDownloadService linkDownloadService) { - this.chunk = chunk; - this.httpClient = httpClient; - this.chunkDownloader = chunkDownloader; - this.connectionContext = DatabricksThreadContextHolder.getConnectionContext(); - this.statementId = DatabricksThreadContextHolder.getStatementId(); - this.linkDownloadService = linkDownloadService; - } - - @Override - public Void call() throws DatabricksSQLException, ExecutionException, InterruptedException { - int retries = 0; - boolean downloadSuccessful = false; - - // Sets context in the newly spawned thread - DatabricksThreadContextHolder.setConnectionContext(this.connectionContext); - DatabricksThreadContextHolder.setStatementId(this.statementId); - - try { - DatabricksThreadContextHolder.setRetryCount(retries); - while (!downloadSuccessful) { - try { - if (chunk.isChunkLinkInvalid()) { - ExternalLink link = - linkDownloadService - .getLinkForChunk(chunk.getChunkIndex()) - .get(); // Block until link is available - chunk.setChunkLink(link); - } - - chunk.downloadData( - httpClient, - chunkDownloader.getCompressionCodec(), - connectionContext != null ? connectionContext.getCloudFetchSpeedThreshold() : 0.1); - downloadSuccessful = true; - } catch (IOException | DatabricksSQLException e) { - retries++; - if (retries >= MAX_RETRIES) { - LOGGER.error( - e, - "Failed to download chunk after %d attempts. Chunk index: %d, Error: %s", - MAX_RETRIES, - chunk.getChunkIndex(), - e.getMessage()); - chunk.setStatus(ChunkStatus.DOWNLOAD_FAILED); - throw new DatabricksSQLException( - "Failed to download chunk after multiple attempts", - e, - statementId, - chunk.getChunkIndex(), - DatabricksDriverErrorCode.CHUNK_DOWNLOAD_ERROR.name()); - } else { - LOGGER.warn( - String.format( - "Retry attempt %d for chunk index: %d, Error: %s", - retries, chunk.getChunkIndex(), e.getMessage())); - chunk.setStatus(ChunkStatus.DOWNLOAD_RETRY); - try { - Thread.sleep(RETRY_DELAY_MS); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new DatabricksSQLException( - "Chunk download was interrupted", - ie, - DatabricksDriverErrorCode.THREAD_INTERRUPTED_ERROR); - } - } - } - } - } catch (Throwable t) { - uncaughtException = t; - throw t; - } finally { - if (downloadSuccessful) { - chunk.getChunkReadyFuture().complete(null); // complete the void future successfully - } else { - LOGGER.info( - "Uncaught exception during chunk download. Chunk index: {}, Error: {}", - chunk.getChunkIndex(), - Arrays.toString(uncaughtException.getStackTrace())); - // Status is set to DOWNLOAD_SUCCEEDED in the happy path. For any failure case, - // explicitly set status to DOWNLOAD_FAILED here to ensure consistent error handling - chunk.setStatus(ChunkStatus.DOWNLOAD_FAILED); - chunk - .getChunkReadyFuture() - .completeExceptionally( - new DatabricksSQLException( - "Download failed for chunk index " + chunk.getChunkIndex(), - DatabricksDriverErrorCode.CHUNK_DOWNLOAD_ERROR)); - } - - DatabricksThreadContextHolder.clearAllContext(); - } - - return null; - } -} diff --git a/src/main/java/com/databricks/jdbc/api/impl/arrow/RemoteChunkProvider.java b/src/main/java/com/databricks/jdbc/api/impl/arrow/RemoteChunkProvider.java deleted file mode 100644 index f8ddf0a2b5..0000000000 --- a/src/main/java/com/databricks/jdbc/api/impl/arrow/RemoteChunkProvider.java +++ /dev/null @@ -1,145 +0,0 @@ -package com.databricks.jdbc.api.impl.arrow; - -import com.databricks.jdbc.api.internal.IDatabricksSession; -import com.databricks.jdbc.api.internal.IDatabricksStatementInternal; -import com.databricks.jdbc.common.CompressionCodec; -import com.databricks.jdbc.common.util.DatabricksThreadContextHolder; -import com.databricks.jdbc.dbclient.IDatabricksHttpClient; -import com.databricks.jdbc.dbclient.impl.common.StatementId; -import com.databricks.jdbc.exception.DatabricksSQLException; -import com.databricks.jdbc.model.client.thrift.generated.TFetchResultsResp; -import com.databricks.jdbc.model.client.thrift.generated.TSparkArrowResultLink; -import com.databricks.jdbc.model.core.ResultData; -import com.databricks.jdbc.model.core.ResultManifest; -import com.databricks.sdk.service.sql.BaseChunkInfo; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicInteger; - -public class RemoteChunkProvider extends AbstractRemoteChunkProvider { - private static final String CHUNKS_DOWNLOADER_THREAD_POOL_PREFIX = - "databricks-jdbc-chunks-downloader-"; - private ExecutorService chunkDownloaderExecutorService; - - RemoteChunkProvider( - StatementId statementId, - ResultManifest resultManifest, - ResultData resultData, - IDatabricksSession session, - IDatabricksHttpClient httpClient, - int chunksDownloaderThreadPoolSize) - throws DatabricksSQLException { - super( - statementId, - resultManifest, - resultData, - session, - httpClient, - chunksDownloaderThreadPoolSize, - resultManifest.getResultCompression()); - } - - RemoteChunkProvider( - IDatabricksStatementInternal parentStatement, - TFetchResultsResp resultsResp, - IDatabricksSession session, - IDatabricksHttpClient httpClient, - int chunksDownloaderThreadPoolSize, - CompressionCodec compressionCodec) - throws DatabricksSQLException { - super( - parentStatement, - resultsResp, - session, - httpClient, - chunksDownloaderThreadPoolSize, - compressionCodec); - } - - /** {@inheritDoc} */ - @Override - protected ArrowResultChunk createChunk( - StatementId statementId, long chunkIndex, BaseChunkInfo chunkInfo) - throws DatabricksSQLException { - return ArrowResultChunk.builder() - .withStatementId(statementId) - .withChunkInfo(chunkInfo) - .withChunkReadyTimeoutSeconds(chunkReadyTimeoutSeconds) - .build(); - } - - /** {@inheritDoc} */ - @Override - protected ArrowResultChunk createChunk( - StatementId statementId, long chunkIndex, TSparkArrowResultLink resultLink) - throws DatabricksSQLException { - return ArrowResultChunk.builder() - .withStatementId(statementId) - .withThriftChunkInfo(chunkIndex, resultLink) - .withChunkReadyTimeoutSeconds(chunkReadyTimeoutSeconds) - .build(); - } - - /** - * {@inheritDoc} - * - *

Downloads the next set of available chunks asynchronously using a thread pool executor. This - * method: - * - *

    - *
  • Initializes a thread pool executor if not already created - *
  • Submits chunk download tasks to the executor while: - *
      - *
    • The provider is not closed - *
    • There are more chunks available to download - *
    • The number of chunks in memory is below the allowed limit - *
    - *
  • Tracks the total chunks in memory and the next chunk to download - *
- * - * Each chunk download is handled by a separate {@link ChunkDownloadTask} running in the executor - * service. This implementation provides non-blocking downloads using a custom thread pool for - * chunk downloads. - */ - @Override - public void downloadNextChunks() { - if (chunkDownloaderExecutorService == null) { - chunkDownloaderExecutorService = createChunksDownloaderExecutorService(); - } - - while (!isClosed - && nextChunkToDownload < chunkCount - && totalChunksInMemory < allowedChunksInMemory) { - ArrowResultChunk chunk = chunkIndexToChunksMap.get(nextChunkToDownload); - chunkDownloaderExecutorService.submit( - new ChunkDownloadTask(chunk, httpClient, this, linkDownloadService)); - totalChunksInMemory++; - nextChunkToDownload++; - } - } - - /** {@inheritDoc} */ - @Override - protected void doClose() { - isClosed = true; - chunkDownloaderExecutorService.shutdownNow(); - chunkIndexToChunksMap.values().forEach(ArrowResultChunk::releaseChunk); - DatabricksThreadContextHolder.clearStatementInfo(); - } - - private ExecutorService createChunksDownloaderExecutorService() { - ThreadFactory threadFactory = - new ThreadFactory() { - private final AtomicInteger threadCount = new AtomicInteger(1); - - public Thread newThread(final Runnable r) { - final Thread thread = new Thread(r); - thread.setName(CHUNKS_DOWNLOADER_THREAD_POOL_PREFIX + threadCount.getAndIncrement()); - thread.setDaemon(true); - return thread; - } - }; - return Executors.newFixedThreadPool(maxParallelChunkDownloadsPerQuery, threadFactory); - } -} diff --git a/src/main/java/com/databricks/jdbc/api/internal/IDatabricksConnectionContext.java b/src/main/java/com/databricks/jdbc/api/internal/IDatabricksConnectionContext.java index f3a8c09114..dc9617de1e 100644 --- a/src/main/java/com/databricks/jdbc/api/internal/IDatabricksConnectionContext.java +++ b/src/main/java/com/databricks/jdbc/api/internal/IDatabricksConnectionContext.java @@ -423,9 +423,6 @@ public interface IDatabricksConnectionContext { /** Returns whether token federation is enabled for authentication. */ boolean isTokenFederationEnabled(); - /** Returns whether streaming chunk provider is enabled for result fetching. */ - boolean isStreamingChunkProviderEnabled(); - /** * Returns the number of chunk links to prefetch ahead of consumption. * diff --git a/src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java b/src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java index ddbec923f8..599f557d50 100644 --- a/src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java +++ b/src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java @@ -188,10 +188,6 @@ public enum DatabricksJdbcUrlParams { "1"), ENABLE_TOKEN_FEDERATION( "EnableTokenFederation", "Enable token federation for authentication", "1"), - ENABLE_STREAMING_CHUNK_PROVIDER( - "EnableStreamingChunkProvider", - "Enable streaming chunk provider for result fetching (experimental)", - "0"), LINK_PREFETCH_WINDOW( "LinkPrefetchWindow", "Number of chunk links to prefetch ahead of consumption. " diff --git a/src/test/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResultTest.java b/src/test/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResultTest.java index 6e32861fec..5156b14561 100644 --- a/src/test/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResultTest.java +++ b/src/test/java/com/databricks/jdbc/api/impl/arrow/ArrowStreamResultTest.java @@ -477,17 +477,12 @@ public void testNullComplexTypeWithComplexDatatypeSupportDisabled() throws Excep // ==================== StreamingChunkProvider Instantiation Tests ==================== @Test - public void testStreamingChunkProviderEnabledForSeaResult() throws Exception { - // Enable StreamingChunkProvider via connection property + public void testStreamingChunkProviderForSeaResult() throws Exception { + // StreamingChunkProvider is always used for SEA results Properties props = new Properties(); - props.setProperty("EnableStreamingChunkProvider", "1"); IDatabricksConnectionContext connectionContext = DatabricksConnectionContextFactory.create(JDBC_URL, props); - assertTrue( - connectionContext.isStreamingChunkProviderEnabled(), - "StreamingChunkProvider should be enabled via property"); - DatabricksSession localSession = new DatabricksSession(connectionContext, mockedSdkClient); // Setup result manifest with external links (triggers remote chunk provider path) @@ -517,55 +512,12 @@ public void testStreamingChunkProviderEnabledForSeaResult() throws Exception { } @Test - public void testStreamingChunkProviderDisabledUsesRemoteChunkProvider() throws Exception { - // Default properties - StreamingChunkProvider disabled - Properties props = new Properties(); - IDatabricksConnectionContext connectionContext = - DatabricksConnectionContextFactory.create(JDBC_URL, props); - - assertFalse( - connectionContext.isStreamingChunkProviderEnabled(), - "StreamingChunkProvider should be disabled by default"); - - DatabricksSession localSession = new DatabricksSession(connectionContext, mockedSdkClient); - - ResultManifest resultManifest = - new ResultManifest() - .setTotalChunkCount(1L) - .setTotalRowCount(110L) - .setTotalByteCount(1000L) - .setResultCompression(CompressionCodec.NONE) - .setChunks(this.chunkInfos.subList(0, 1)) - .setSchema(new ResultSchema().setColumns(new ArrayList<>()).setColumnCount(0L)); - - ResultData localResultData = new ResultData().setExternalLinks(getChunkLinks(0L, 0L, true)); - - setupMockResponse(); - when(mockHttpClient.execute(isA(HttpUriRequest.class), eq(true))).thenReturn(httpResponse); - - ArrowStreamResult result = - new ArrowStreamResult( - resultManifest, localResultData, STATEMENT_ID, localSession, mockHttpClient); - - // Verify result was created successfully with RemoteChunkProvider - assertNotNull(result); - assertTrue(result.hasNext(), "Result should have data"); - assertTrue(result.next()); - assertDoesNotThrow(result::close); - } - - @Test - public void testStreamingChunkProviderEnabledForThriftResult() throws Exception { - // Enable StreamingChunkProvider via connection property + public void testStreamingChunkProviderForThriftResult() throws Exception { + // StreamingChunkProvider is always used for Thrift results Properties props = new Properties(); - props.setProperty("EnableStreamingChunkProvider", "1"); IDatabricksConnectionContext connectionContext = DatabricksConnectionContextFactory.create(JDBC_URL, props); - assertTrue( - connectionContext.isStreamingChunkProviderEnabled(), - "StreamingChunkProvider should be enabled via property"); - when(session.getConnectionContext()).thenReturn(connectionContext); when(metadataResp.getSchema()).thenReturn(TEST_TABLE_SCHEMA); @@ -617,14 +569,10 @@ private Object[][] createTestData(Schema schema, int rows) { @Test public void testSeaEmptyLinksWithZeroChunkCountReturnsEndOfStream() throws Exception { - // Enable StreamingChunkProvider Properties props = new Properties(); - props.setProperty("EnableStreamingChunkProvider", "1"); IDatabricksConnectionContext connectionContext = DatabricksConnectionContextFactory.create(JDBC_URL, props); - assertTrue(connectionContext.isStreamingChunkProviderEnabled()); - DatabricksSession localSession = new DatabricksSession(connectionContext, mockedSdkClient); // Create result manifest with zero chunks and empty external links @@ -649,14 +597,10 @@ public void testSeaEmptyLinksWithZeroChunkCountReturnsEndOfStream() throws Excep @Test public void testSeaNullLinksWithZeroChunkCountReturnsEndOfStream() throws Exception { - // Enable StreamingChunkProvider Properties props = new Properties(); - props.setProperty("EnableStreamingChunkProvider", "1"); IDatabricksConnectionContext connectionContext = DatabricksConnectionContextFactory.create(JDBC_URL, props); - assertTrue(connectionContext.isStreamingChunkProviderEnabled()); - DatabricksSession localSession = new DatabricksSession(connectionContext, mockedSdkClient); // Create result manifest with zero chunks and null external links diff --git a/src/test/java/com/databricks/jdbc/api/impl/arrow/ChunkDownloadTaskTest.java b/src/test/java/com/databricks/jdbc/api/impl/arrow/ChunkDownloadTaskTest.java deleted file mode 100644 index b10336f1e3..0000000000 --- a/src/test/java/com/databricks/jdbc/api/impl/arrow/ChunkDownloadTaskTest.java +++ /dev/null @@ -1,222 +0,0 @@ -package com.databricks.jdbc.api.impl.arrow; - -import static org.junit.jupiter.api.Assertions.*; -import static org.mockito.Mockito.*; - -import com.databricks.jdbc.common.CompressionCodec; -import com.databricks.jdbc.dbclient.IDatabricksHttpClient; -import com.databricks.jdbc.dbclient.impl.common.StatementId; -import com.databricks.jdbc.exception.DatabricksParsingException; -import com.databricks.jdbc.exception.DatabricksSQLException; -import com.databricks.jdbc.model.core.ExternalLink; -import com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode; -import com.databricks.sdk.service.sql.BaseChunkInfo; -import java.io.ByteArrayInputStream; -import java.io.InputStream; -import java.net.SocketException; -import java.time.Instant; -import java.time.temporal.ChronoUnit; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.http.HttpEntity; -import org.apache.http.StatusLine; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpGet; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; -import org.mockito.junit.jupiter.MockitoExtension; - -@ExtendWith(MockitoExtension.class) -public class ChunkDownloadTaskTest { - @Mock ArrowResultChunk chunk; - @Mock IDatabricksHttpClient httpClient; - @Mock RemoteChunkProvider remoteChunkProvider; - @Mock ChunkLinkDownloadService chunkLinkDownloadService; - private ChunkDownloadTask chunkDownloadTask; - private CompletableFuture downloadFuture; - - @BeforeEach - void setUp() { - MockitoAnnotations.openMocks(this); - downloadFuture = new CompletableFuture<>(); - chunkDownloadTask = - new ChunkDownloadTask(chunk, httpClient, remoteChunkProvider, chunkLinkDownloadService); - } - - @Test - void testRetryLogicWithSocketException() throws Exception { - when(chunk.getChunkReadyFuture()).thenReturn(downloadFuture); - when(chunk.isChunkLinkInvalid()).thenReturn(false); - when(chunk.getChunkIndex()).thenReturn(7L); - when(remoteChunkProvider.getCompressionCodec()).thenReturn(CompressionCodec.NONE); - DatabricksParsingException throwableError = - new DatabricksParsingException( - "Connection reset", - new SocketException("Connection reset"), - DatabricksDriverErrorCode.INVALID_STATE); - - // Simulate SocketException for the first two attempts, then succeed - doThrow(throwableError) - .doThrow(throwableError) - .doNothing() - .when(chunk) - .downloadData(httpClient, CompressionCodec.NONE, 0.1); - - chunkDownloadTask.call(); - - verify(chunk, times(3)).downloadData(httpClient, CompressionCodec.NONE, 0.1); - assertTrue(downloadFuture.isDone()); - assertDoesNotThrow(() -> downloadFuture.get()); - } - - @Test - void testRetryLogicExhaustedWithSocketException() throws Exception { - when(chunk.getChunkReadyFuture()).thenReturn(downloadFuture); - when(chunk.isChunkLinkInvalid()).thenReturn(false); - when(chunk.getChunkIndex()).thenReturn(7L); - when(remoteChunkProvider.getCompressionCodec()).thenReturn(CompressionCodec.NONE); - - // Simulate SocketException for all attempts - doThrow( - new DatabricksParsingException( - "Connection reset", - new SocketException("Connection reset"), - DatabricksDriverErrorCode.INVALID_STATE)) - .when(chunk) - .downloadData(httpClient, CompressionCodec.NONE, 0.1); - - assertThrows(DatabricksSQLException.class, () -> chunkDownloadTask.call()); - verify(chunk, times(ChunkDownloadTask.MAX_RETRIES)) - .downloadData(httpClient, CompressionCodec.NONE, 0.1); - assertTrue(downloadFuture.isDone()); - ExecutionException executionException = - assertThrows(ExecutionException.class, () -> downloadFuture.get()); - assertInstanceOf(DatabricksSQLException.class, executionException.getCause()); - } - - @Test - void testRetryLogicWithRealChunkAndStatusTransitions() throws Exception { - StatementId statementId = new StatementId("test-statement-123"); - ExternalLink mockExternalLink = mock(ExternalLink.class); - when(mockExternalLink.getExternalLink()).thenReturn("https://test-url.com/chunk"); - when(mockExternalLink.getHttpHeaders()).thenReturn(Collections.emptyMap()); - - Instant expiry = Instant.now().plus(1, ChronoUnit.DAYS); - when(mockExternalLink.getExpiration()).thenReturn(expiry.toString()); - - ArrowResultChunk realChunk = - ArrowResultChunk.builder() - .withStatementId(statementId) - .withChunkInfo(createMockBaseChunkInfo(7L, 100L, 0L)) - .withChunkStatus(ChunkStatus.URL_FETCHED) - .withChunkReadyTimeoutSeconds(30) - .build(); - - // Set the chunk link after creation - realChunk.setChunkLink(mockExternalLink); - - // Mock HTTP client and responses - CloseableHttpResponse mockResponse = mock(CloseableHttpResponse.class); - HttpEntity mockEntity = mock(HttpEntity.class); - StatusLine mockStatusLine = mock(StatusLine.class); - - when(mockResponse.getStatusLine()).thenReturn(mockStatusLine); - when(mockStatusLine.getStatusCode()).thenReturn(200); - when(mockResponse.getEntity()).thenReturn(mockEntity); - - // Track status changes by wrapping the real chunk - List statusHistory = new ArrayList<>(); - ArrowResultChunk spiedChunk = spy(realChunk); - doAnswer( - invocation -> { - ChunkStatus status = invocation.getArgument(0); - statusHistory.add(status); - // Call the real setStatus method - invocation.callRealMethod(); - return null; - }) - .when(spiedChunk) - .setStatus(any(ChunkStatus.class)); - - // Mock the initializeData method to avoid Arrow parsing issues - doAnswer( - invocation -> { - // Simulate successful data initialization - spiedChunk.setStatus(ChunkStatus.PROCESSING_SUCCEEDED); - return null; - }) - .when(spiedChunk) - .initializeData(any(InputStream.class)); - - // Create a valid Arrow stream for successful response - byte[] validArrowData = createValidArrowStreamData(); - - // Mock HTTP client to fail twice, then succeed - AtomicInteger httpCallCount = new AtomicInteger(0); - when(httpClient.execute(any(HttpGet.class), eq(true))) - .thenAnswer( - invocation -> { - int callNumber = httpCallCount.incrementAndGet(); - if (callNumber <= 2) { - // First two calls throw SocketException - throw new SocketException("Connection reset by peer"); - } else { - // Third call succeeds - when(mockEntity.getContent()).thenReturn(new ByteArrayInputStream(validArrowData)); - return mockResponse; - } - }); - - when(remoteChunkProvider.getCompressionCodec()).thenReturn(CompressionCodec.NONE); - - // Create task with the spied chunk - ChunkDownloadTask task = - new ChunkDownloadTask( - spiedChunk, httpClient, remoteChunkProvider, chunkLinkDownloadService); - - // Execute the task - assertDoesNotThrow(task::call); - - // Verify HTTP client was called 3 times (2 failures + 1 success) - verify(httpClient, times(3)).execute(any(HttpGet.class), eq(true)); - - // Verify status progression: DOWNLOAD_FAILED -> DOWNLOAD_RETRY -> DOWNLOAD_FAILED -> - // DOWNLOAD_RETRY -> DOWNLOAD_SUCCEEDED -> PROCESSING_SUCCEEDED - assertEquals(6, statusHistory.size()); - assertEquals(ChunkStatus.DOWNLOAD_FAILED, statusHistory.get(0)); - assertEquals(ChunkStatus.DOWNLOAD_RETRY, statusHistory.get(1)); // First retry - assertEquals(ChunkStatus.DOWNLOAD_FAILED, statusHistory.get(2)); - assertEquals(ChunkStatus.DOWNLOAD_RETRY, statusHistory.get(3)); // Second retry - assertEquals(ChunkStatus.DOWNLOAD_SUCCEEDED, statusHistory.get(4)); // Download success - - // The chunk should eventually reach PROCESSING_SUCCEEDED after parsing the Arrow data - assertEquals(ChunkStatus.PROCESSING_SUCCEEDED, spiedChunk.getStatus()); - - // Verify the future completed successfully - CompletableFuture chunkFuture = spiedChunk.getChunkReadyFuture(); - assertTrue(chunkFuture.isDone()); - assertDoesNotThrow(() -> chunkFuture.get()); - - // Verify initializeData was called once (on successful download) - verify(spiedChunk, times(1)).initializeData(any(InputStream.class)); - } - - private BaseChunkInfo createMockBaseChunkInfo(long chunkIndex, long rowCount, long rowOffset) { - BaseChunkInfo mockChunkInfo = mock(BaseChunkInfo.class); - when(mockChunkInfo.getChunkIndex()).thenReturn(chunkIndex); - when(mockChunkInfo.getRowCount()).thenReturn(rowCount); - when(mockChunkInfo.getRowOffset()).thenReturn(rowOffset); - return mockChunkInfo; - } - - private byte[] createValidArrowStreamData() { - return new byte[] {1, 2, 3, 4, 5}; - } -} diff --git a/src/test/java/com/databricks/jdbc/api/impl/arrow/RemoteChunkProviderTest.java b/src/test/java/com/databricks/jdbc/api/impl/arrow/RemoteChunkProviderTest.java deleted file mode 100644 index 21cbad1f99..0000000000 --- a/src/test/java/com/databricks/jdbc/api/impl/arrow/RemoteChunkProviderTest.java +++ /dev/null @@ -1,41 +0,0 @@ -package com.databricks.jdbc.api.impl.arrow; - -import static org.junit.jupiter.api.Assertions.*; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import com.databricks.jdbc.api.internal.IDatabricksConnectionContext; -import com.databricks.jdbc.api.internal.IDatabricksSession; -import com.databricks.jdbc.common.DatabricksClientType; -import com.databricks.jdbc.dbclient.impl.common.StatementId; -import com.databricks.jdbc.model.core.ResultData; -import com.databricks.jdbc.model.core.ResultManifest; -import com.databricks.jdbc.model.core.ResultSchema; -import java.util.ArrayList; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; - -@ExtendWith(MockitoExtension.class) -public class RemoteChunkProviderTest { - - private static final StatementId STATEMENT_ID = new StatementId("statement_id"); - @Mock private IDatabricksSession mockSession; - - @Test - public void testInitEmptyChunkDownloader() { - ResultManifest resultManifest = - new ResultManifest() - .setTotalChunkCount(0L) - .setTotalRowCount(0L) - .setSchema(new ResultSchema().setColumns(new ArrayList<>())); - ResultData resultData = new ResultData().setExternalLinks(new ArrayList<>()); - when(mockSession.getConnectionContext()).thenReturn(mock(IDatabricksConnectionContext.class)); - when(mockSession.getConnectionContext().getClientType()).thenReturn(DatabricksClientType.SEA); - assertDoesNotThrow( - () -> - new RemoteChunkProvider( - STATEMENT_ID, resultManifest, resultData, mockSession, null, 4)); - } -} diff --git a/src/test/java/com/databricks/jdbc/integration/e2e/ThriftCloudFetchTests.java b/src/test/java/com/databricks/jdbc/integration/e2e/ThriftCloudFetchTests.java index 1e166b3eaa..da95ea63b6 100644 --- a/src/test/java/com/databricks/jdbc/integration/e2e/ThriftCloudFetchTests.java +++ b/src/test/java/com/databricks/jdbc/integration/e2e/ThriftCloudFetchTests.java @@ -1,7 +1,6 @@ package com.databricks.jdbc.integration.e2e; import static com.databricks.jdbc.integration.IntegrationTestUtil.getValidJDBCConnection; -import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -9,23 +8,17 @@ import com.databricks.jdbc.api.impl.DatabricksConnection; import com.databricks.jdbc.api.impl.DatabricksResultSet; import com.databricks.jdbc.api.impl.DatabricksStatement; -import com.databricks.jdbc.api.impl.arrow.AbstractRemoteChunkProvider; -import com.databricks.jdbc.api.impl.arrow.ArrowResultChunk; import com.databricks.jdbc.api.impl.arrow.ChunkProvider; import com.databricks.jdbc.api.internal.IDatabricksSession; import com.databricks.jdbc.dbclient.IDatabricksClient; import com.databricks.jdbc.dbclient.impl.common.StatementId; -import com.databricks.jdbc.exception.DatabricksSQLException; import com.databricks.jdbc.log.JdbcLogger; import com.databricks.jdbc.log.JdbcLoggerFactory; import com.databricks.jdbc.model.core.ExternalLink; import java.sql.Connection; import java.sql.ResultSet; import java.sql.Statement; -import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; -import java.util.List; import java.util.Optional; import java.util.Properties; import org.junit.jupiter.api.AfterEach; @@ -60,16 +53,15 @@ void tearDown() throws Exception { } /** - * Test refetching CloudFetch links from various startRowOffsets. + * Test CloudFetch with multiple chunks using StreamingChunkProvider. * *

This test: * *

    *
  1. Executes a query that generates multiple CloudFetch chunks - *
  2. Extracts the chunk provider to get chunk metadata - *
  3. Refetches from 3 different offsets: start (0), middle, and end - *
  4. Verifies each refetch returns correct subset of links - *
  5. Verifies link properties match (chunkIndex, rowOffset, rowCount, byteCount) + *
  6. Verifies the chunk provider is created successfully + *
  7. Iterates through chunks to verify they can be accessed + *
  8. Tests link refetching from the server *
*/ @Test @@ -84,7 +76,7 @@ void testCloudFetchLinksRefetchAtStartRowOffset() throws Exception { LOGGER.info("Query executed, extracting chunks..."); - // Step 2: Extract the chunks that were created by initializeChunksMap + // Step 2: Extract the chunks that were created DatabricksStatement dbStatement = (DatabricksStatement) stmt; StatementId statementId = dbStatement.getStatementId(); assertNotNull(statementId, "StatementId should be set after execution"); @@ -93,9 +85,7 @@ void testCloudFetchLinksRefetchAtStartRowOffset() throws Exception { assertTrue( chunkProviderOptional.isPresent(), "Chunk provider should exist for CloudFetch result set"); - @SuppressWarnings("unchecked") - AbstractRemoteChunkProvider chunkProvider = - (AbstractRemoteChunkProvider) chunkProviderOptional.get(); + ChunkProvider chunkProvider = chunkProviderOptional.get(); long totalChunks = chunkProvider.getChunkCount(); @@ -103,42 +93,32 @@ void testCloudFetchLinksRefetchAtStartRowOffset() throws Exception { totalChunks > 2, "Should have at least 3 chunks for this test, got: " + totalChunks); LOGGER.info("Total chunks: " + totalChunks); - // Step 3: Test refetching from various chunk indices. + // Step 3: Test refetching links from the server DatabricksConnection dbConnection = (DatabricksConnection) connection.unwrap(DatabricksConnection.class); IDatabricksSession session = dbConnection.getSession(); IDatabricksClient client = session.getDatabricksClient(); - long end = totalChunks - 1; - long mid = totalChunks / 2; - List chunkIndicesToTest = new ArrayList<>(List.of(0L, 1L, mid - 1, mid, mid + 1, end)); - // Randomize the order to make sure there is no sequence to the responses. We use FETCH_NEXT - // when fetching the next set of links. - Collections.shuffle(chunkIndicesToTest); - - for (long chunkIndex : chunkIndicesToTest) { - ArrowResultChunk targetChunk = chunkProvider.getChunkByIndex(chunkIndex); - assertNotNull(targetChunk, "Target chunk should exist at index " + chunkIndex); - - long chunkStartRowOffset = targetChunk.getStartRowOffset(); - LOGGER.info( - "Refetching from chunk index " - + chunkIndex - + " with startRowOffset: " - + chunkStartRowOffset); - - testRefetchLinks(statementId, chunkIndex, chunkStartRowOffset, chunkProvider, client); + // Test refetching from the beginning (chunk 0, row offset 0) + testRefetchLinks(statementId, 0L, 0L, client); + + // Iterate through chunks to verify they can be accessed + int chunksProcessed = 0; + while (chunkProvider.hasNextChunk() && chunksProcessed < 5) { + assertTrue(chunkProvider.next(), "Should be able to advance to next chunk"); + assertNotNull(chunkProvider.getChunk(), "Chunk should not be null"); + chunksProcessed++; + LOGGER.info("Successfully accessed chunk " + chunksProcessed); } + + assertTrue(chunksProcessed > 0, "Should have processed at least one chunk"); + LOGGER.info("Total chunks processed: " + chunksProcessed); } } private void testRefetchLinks( - StatementId statementId, - long chunkIndex, - long chunkStartRowOffset, - AbstractRemoteChunkProvider chunkProvider, - IDatabricksClient client) - throws DatabricksSQLException { + StatementId statementId, long chunkIndex, long chunkStartRowOffset, IDatabricksClient client) + throws Exception { // Fetch from the startRowOffset of the target chunk. Collection refetchedLinks = client.getResultChunks(statementId, chunkIndex, chunkStartRowOffset).getChunkLinks(); @@ -148,40 +128,10 @@ private void testRefetchLinks( LOGGER.info("Refetched " + refetchedLinks.size() + " links from chunk index " + chunkIndex); - // Convert refetched links to a list for comparison - List refetchedLinksList = new ArrayList<>(refetchedLinks); - - // Compare each refetched link with the corresponding original link. - for (int i = 0; i < refetchedLinksList.size(); i++) { - long originalChunkIndex = chunkIndex + i; - ArrowResultChunk originalChunk = chunkProvider.getChunkByIndex(originalChunkIndex); - assertNotNull(originalChunk, "Original chunk should exist at index " + originalChunkIndex); - - ExternalLink originalLink = originalChunk.getChunkLink(); - ExternalLink refetchedLink = refetchedLinksList.get(i); - - assertEquals( - originalLink.getChunkIndex(), - refetchedLink.getChunkIndex(), - "Chunk index should match for chunk " + originalChunkIndex); - - assertEquals( - originalLink.getRowOffset(), - refetchedLink.getRowOffset(), - "Start row offset should match for chunk " + originalChunkIndex); - - assertEquals( - originalLink.getRowCount(), - refetchedLink.getRowCount(), - "Row count should match for chunk " + originalChunkIndex); - - assertEquals( - originalLink.getByteCount(), - refetchedLink.getByteCount(), - "Byte count should match for chunk " + originalChunkIndex); - - assertNotNull(originalLink.getExternalLink(), "Original file link should not be null"); - assertNotNull(refetchedLink.getExternalLink(), "Refetched file link should not be null"); + // Verify each link has valid properties + for (ExternalLink link : refetchedLinks) { + assertNotNull(link.getExternalLink(), "Link should have a file URL"); + assertTrue(link.getRowCount() > 0, "Link should have positive row count"); } } } diff --git a/src/test/java/com/databricks/jdbc/integration/fakeservice/tests/ThriftCloudFetchFakeIntegrationTests.java b/src/test/java/com/databricks/jdbc/integration/fakeservice/tests/ThriftCloudFetchFakeIntegrationTests.java index d138d3e828..47e7e57171 100644 --- a/src/test/java/com/databricks/jdbc/integration/fakeservice/tests/ThriftCloudFetchFakeIntegrationTests.java +++ b/src/test/java/com/databricks/jdbc/integration/fakeservice/tests/ThriftCloudFetchFakeIntegrationTests.java @@ -1,7 +1,6 @@ package com.databricks.jdbc.integration.fakeservice.tests; import static com.databricks.jdbc.integration.IntegrationTestUtil.getValidJDBCConnection; -import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -9,21 +8,16 @@ import com.databricks.jdbc.api.impl.DatabricksConnection; import com.databricks.jdbc.api.impl.DatabricksResultSet; import com.databricks.jdbc.api.impl.DatabricksStatement; -import com.databricks.jdbc.api.impl.arrow.AbstractRemoteChunkProvider; -import com.databricks.jdbc.api.impl.arrow.ArrowResultChunk; import com.databricks.jdbc.api.impl.arrow.ChunkProvider; import com.databricks.jdbc.api.internal.IDatabricksSession; import com.databricks.jdbc.dbclient.IDatabricksClient; import com.databricks.jdbc.dbclient.impl.common.StatementId; -import com.databricks.jdbc.exception.DatabricksSQLException; import com.databricks.jdbc.integration.fakeservice.AbstractFakeServiceIntegrationTests; import com.databricks.jdbc.model.core.ExternalLink; import java.sql.Connection; import java.sql.ResultSet; import java.sql.Statement; -import java.util.ArrayList; import java.util.Collection; -import java.util.List; import java.util.Optional; import java.util.Properties; import org.apache.logging.log4j.LogManager; @@ -32,7 +26,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -/** Integration test to test CloudFetch link refetching using Thrift client. */ +/** Integration test to test CloudFetch using StreamingChunkProvider with Thrift client. */ public class ThriftCloudFetchFakeIntegrationTests extends AbstractFakeServiceIntegrationTests { /** Connection to the Databricks cluster. */ private Connection connection; @@ -62,7 +56,17 @@ void tearDown() throws Exception { } } - /** Test refetching CloudFetch links from various startRowOffsets. */ + /** + * Test CloudFetch with multiple chunks using StreamingChunkProvider. + * + *

This test verifies: + * + *

    + *
  1. StreamingChunkProvider is created for CloudFetch results + *
  2. Chunks can be iterated through successfully + *
  3. Links can be refetched from the server + *
+ */ @Test void testCloudFetchLinksRefetchAtStartRowOffset() throws Exception { final int maxRows = 6_000_000; // Generate many chunk links. @@ -76,7 +80,7 @@ void testCloudFetchLinksRefetchAtStartRowOffset() throws Exception { LOGGER.info("Query executed, extracting chunks ..."); - // Extract the chunks that were created by initializeChunksMap + // Extract the chunks that were created DatabricksStatement dbStatement = (DatabricksStatement) stmt; StatementId statementId = dbStatement.getStatementId(); assertNotNull(statementId, "StatementId should be set after execution"); @@ -85,9 +89,7 @@ void testCloudFetchLinksRefetchAtStartRowOffset() throws Exception { assertTrue( chunkProviderOptional.isPresent(), "Chunk provider should exist for CloudFetch result set"); - @SuppressWarnings("unchecked") - AbstractRemoteChunkProvider chunkProvider = - (AbstractRemoteChunkProvider) chunkProviderOptional.get(); + ChunkProvider chunkProvider = chunkProviderOptional.get(); long totalChunks = chunkProvider.getChunkCount(); assertTrue( @@ -99,35 +101,26 @@ void testCloudFetchLinksRefetchAtStartRowOffset() throws Exception { IDatabricksSession session = dbConnection.getSession(); IDatabricksClient client = session.getDatabricksClient(); - // Test refetching from various chunk indices: start (0), middle, and end. - long middle = totalChunks / 2; - long end = totalChunks - 1; + // Test refetching from the beginning (chunk 0, row offset 0) + testRefetchLinks(statementId, 0L, 0L, client); - // Go through some corner cases. - List chunkIndices = List.of(middle, 0L, middle + 1, end, middle - 1, 1L); - - for (long chunkIndex : chunkIndices) { - ArrowResultChunk targetChunk = chunkProvider.getChunkByIndex(chunkIndex); - assertNotNull(targetChunk, "Target chunk should exist at index " + chunkIndex); - - long chunkStartRowOffset = targetChunk.getStartRowOffset(); - LOGGER.info( - "Refetching from chunk index {} with startRowOffset: {}", - chunkIndex, - chunkStartRowOffset); - - testRefetchLinks(statementId, chunkIndex, chunkStartRowOffset, chunkProvider, client); + // Iterate through chunks to verify they can be accessed + int chunksProcessed = 0; + while (chunkProvider.hasNextChunk() && chunksProcessed < 5) { + assertTrue(chunkProvider.next(), "Should be able to advance to next chunk"); + assertNotNull(chunkProvider.getChunk(), "Chunk should not be null"); + chunksProcessed++; + LOGGER.info("Successfully accessed chunk {}", chunksProcessed); } + + assertTrue(chunksProcessed > 0, "Should have processed at least one chunk"); + LOGGER.info("Total chunks processed: {}", chunksProcessed); } } private void testRefetchLinks( - StatementId statementId, - long chunkIndex, - long chunkStartRowOffset, - AbstractRemoteChunkProvider chunkProvider, - IDatabricksClient client) - throws DatabricksSQLException { + StatementId statementId, long chunkIndex, long chunkStartRowOffset, IDatabricksClient client) + throws Exception { // Fetch from the startRowOffset of the target chunk Collection refetchedLinks = @@ -136,42 +129,12 @@ private void testRefetchLinks( assertNotNull(refetchedLinks, "Refetched links should not be null"); assertFalse(refetchedLinks.isEmpty(), "Refetched links should not be empty"); - LOGGER.info("Refetched " + refetchedLinks.size() + " links from chunk index " + chunkIndex); - - // Convert to list for comparison - List refetchedLinksList = new ArrayList<>(refetchedLinks); - - // Compare each refetched link with the corresponding original link - for (int i = 0; i < refetchedLinksList.size(); i++) { - long originalChunkIndex = chunkIndex + i; - ArrowResultChunk originalChunk = chunkProvider.getChunkByIndex(originalChunkIndex); - assertNotNull(originalChunk, "Original chunk should exist at index " + originalChunkIndex); - - ExternalLink originalLink = originalChunk.getChunkLink(); - ExternalLink refetchedLink = refetchedLinksList.get(i); - - assertEquals( - originalLink.getChunkIndex(), - refetchedLink.getChunkIndex(), - "Chunk index should match for chunk " + originalChunkIndex); - - assertEquals( - originalLink.getRowOffset(), - refetchedLink.getRowOffset(), - "Start row offset should match for chunk " + originalChunkIndex); - - assertEquals( - originalLink.getRowCount(), - refetchedLink.getRowCount(), - "Row count should match for chunk " + originalChunkIndex); - - assertEquals( - originalLink.getByteCount(), - refetchedLink.getByteCount(), - "Byte count should match for chunk " + originalChunkIndex); + LOGGER.info("Refetched {} links from chunk index {}", refetchedLinks.size(), chunkIndex); - assertNotNull(originalLink.getExternalLink(), "Original file link should not be null"); - assertNotNull(refetchedLink.getExternalLink(), "Refetched file link should not be null"); + // Verify each link has valid properties + for (ExternalLink link : refetchedLinks) { + assertNotNull(link.getExternalLink(), "Link should have a file URL"); + assertTrue(link.getRowCount() > 0, "Link should have positive row count"); } } }