diff --git a/astra/pom.xml b/astra/pom.xml
index 9cefee51e7..1d12cba654 100644
--- a/astra/pom.xml
+++ b/astra/pom.xml
@@ -29,7 +29,7 @@
     2.11.1
     5.8.0
     2.24.3
-    2.31.63
+    2.35.6
     2.38.0
     5.13.1
@@ -357,7 +357,7 @@
     <dependency>
      <groupId>software.amazon.awssdk.crt</groupId>
      <artifactId>aws-crt</artifactId>
-     <version>0.38.5</version>
+     <version>0.39.0</version>
    </dependency>
    <dependency>
      <groupId>software.amazon.awssdk</groupId>
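// Illustrative sketch, not part of this diff: a quick way to confirm which AWS SDK and
// aws-crt versions actually land on the runtime classpath after the bumps above. Uses
// only jar manifest metadata; may print null for shaded or repackaged jars.
class PrintAwsVersions {
  public static void main(String[] args) {
    System.out.println(
        "s3 sdk: "
            + software.amazon.awssdk.services.s3.S3AsyncClient.class
                .getPackage()
                .getImplementationVersion());
    System.out.println(
        "aws-crt: "
            + software.amazon.awssdk.crt.CRT.class.getPackage().getImplementationVersion());
  }
}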
diff --git a/astra/src/main/java/com/slack/astra/blobfs/BlobStore.java b/astra/src/main/java/com/slack/astra/blobfs/BlobStore.java
index 6c800229af..d238338932 100644
--- a/astra/src/main/java/com/slack/astra/blobfs/BlobStore.java
+++ b/astra/src/main/java/com/slack/astra/blobfs/BlobStore.java
@@ -7,12 +7,18 @@
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 import org.slf4j.Logger;
@@ -21,6 +27,7 @@
 import software.amazon.awssdk.core.async.AsyncRequestBody;
 import software.amazon.awssdk.core.async.AsyncResponseTransformer;
 import software.amazon.awssdk.core.internal.async.ByteArrayAsyncResponseTransformer;
+import software.amazon.awssdk.crt.s3.ChecksumAlgorithm;
 import software.amazon.awssdk.services.s3.S3AsyncClient;
 import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
 import software.amazon.awssdk.services.s3.model.Delete;
@@ -41,6 +48,7 @@
 import software.amazon.awssdk.transfer.s3.model.DownloadDirectoryRequest;
 import software.amazon.awssdk.transfer.s3.model.DownloadRequest;
 import software.amazon.awssdk.transfer.s3.model.UploadDirectoryRequest;
+import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;
 
 /**
  * Blob store abstraction for basic operations on chunk/snapshots on remote storage. All operations
@@ -68,6 +76,69 @@ public BlobStore(S3AsyncClient s3AsyncClient, String bucketName) {
    * @throws IllegalStateException Thrown if any files fail to upload
    * @throws RuntimeException Thrown when error is considered generally non-retryable
    */
+  public void uploadSequentially(String prefix, Path directoryToUpload) {
+    assert prefix != null && !prefix.isEmpty();
+    assert directoryToUpload.toFile().isDirectory();
+    assert Objects.requireNonNull(directoryToUpload.toFile().listFiles()).length > 0;
+
+    try {
+      // Get all files and sort by size (largest first for better progress visibility)
+      List<Path> allFiles;
+      try (Stream<Path> pathStream = Files.walk(directoryToUpload)) {
+        allFiles =
+            pathStream
+                .filter(Files::isRegularFile)
+                .sorted((a, b) -> Long.compare(b.toFile().length(), a.toFile().length()))
+                .collect(Collectors.toList());
+      }
+
+      List<String> failedUploads = new ArrayList<>();
+      int completedCount = 0;
+
+      for (Path file : allFiles) {
+        String relativePath = directoryToUpload.relativize(file).toString();
+        String s3Key = prefix + "/" + relativePath;
+
+        try {
+          // Upload a single file with checksum validation
+          transferManager
+              .uploadFile(
+                  UploadFileRequest.builder()
+                      .putObjectRequest(
+                          req ->
+                              req.bucket(bucketName)
+                                  .key(s3Key)
+                                  // SHA256 checksum - critical for integrity
+                                  .checksumAlgorithm(String.valueOf(ChecksumAlgorithm.SHA256)))
+                      .source(file)
+                      .build())
+              .completionFuture()
+              .get(); // Wait for completion
+
+          completedCount++;
+        } catch (Exception e) {
+          LOG.error("Failed to upload file: {} for prefix: {}", relativePath, prefix, e);
+          failedUploads.add(relativePath);
+        }
+      }
+
+      // Handle failures the same way upload() does
+      if (!failedUploads.isEmpty()) {
+        throw new IllegalStateException(
+            String.format(
+                "Some files failed to upload for prefix %s - attempted to upload %s files, failed %s.",
+                prefix, allFiles.size(), failedUploads.size()));
+      }
+      LOG.info(
+          "Successfully uploaded all {} files sequentially for prefix {}", completedCount, prefix);
+
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to walk directory", e);
+    }
+  }
+
   public void upload(String prefix, Path directoryToUpload) {
     assert prefix != null && !prefix.isEmpty();
     assert directoryToUpload.toFile().isDirectory();
@@ -132,14 +203,17 @@ public void download(String prefix, Path destinationDirectory) {
         download
             .failedTransfers()
             .forEach(
-                failedFileUpload ->
-                    LOG.error(
-                        "Error attempting to download file from S3", failedFileUpload.exception()));
+                failedFileDownload -> {
+                  LOG.error(
+                      "Error attempting to download file from S3 for prefix {}",
+                      prefix,
+                      failedFileDownload.exception());
+                });
         throw new IllegalStateException(
             String.format(
-                "Some files failed to download - failed to download %s files.",
-                download.failedTransfers().size()));
+                "Some files failed to download for prefix %s - failed to download %s files.",
+                prefix, download.failedTransfers().size()));
       }
     } catch (ExecutionException | InterruptedException e) {
       throw new RuntimeException(e);
@@ -195,6 +269,28 @@ public List<String> listFiles(String prefix) {
     return filesList;
   }
 
+  public Map<String, Long> listFilesWithSize(String prefix) {
+    assert prefix != null && !prefix.isEmpty();
+
+    ListObjectsV2Request listRequest = builder().bucket(bucketName).prefix(prefix).build();
+    ListObjectsV2Publisher asyncPaginatedListResponse =
+        s3AsyncClient.listObjectsV2Paginator(listRequest);
+
+    Map<String, Long> filesListWithSize = new HashMap<>();
+    try {
+      asyncPaginatedListResponse
+          .subscribe(
+              listResponse ->
+                  listResponse
+                      .contents()
+                      .forEach(s3Object -> filesListWithSize.put(s3Object.key(), s3Object.size())))
+          .get();
+    } catch (InterruptedException | ExecutionException e) {
+      throw new RuntimeException(e);
+    }
+    return filesListWithSize;
+  }
+
   /**
    * Deletes a chunk off of object storage by chunk id. If object was not found returns false.
    *
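// Illustrative sketch, not part of this diff: how the two new BlobStore methods above are
// meant to compose. The chunkId and chunkDir values are placeholders; the BlobStore
// constructor signature matches the context lines in the diff.
import java.nio.file.Path;
import java.util.Map;

class BlobStoreUsage {
  static void snapshotAndVerify(BlobStore blobStore, String chunkId, Path chunkDir) {
    // One TransferManager uploadFile() call per file, largest first, SHA256 attached;
    // throws IllegalStateException if any file fails to upload.
    blobStore.uploadSequentially(chunkId, chunkDir);

    // Key -> size listing, suitable for comparing against the local directory contents.
    Map<String, Long> remote = blobStore.listFilesWithSize(chunkId);
    remote.forEach((key, size) -> System.out.printf("%s -> %d bytes%n", key, size));
  }
}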
diff --git a/astra/src/main/java/com/slack/astra/blobfs/S3AsyncUtil.java b/astra/src/main/java/com/slack/astra/blobfs/S3AsyncUtil.java
index beba2ef778..cb5de3a6e2 100644
--- a/astra/src/main/java/com/slack/astra/blobfs/S3AsyncUtil.java
+++ b/astra/src/main/java/com/slack/astra/blobfs/S3AsyncUtil.java
@@ -1,5 +1,7 @@
 package com.slack.astra.blobfs;
 
+import static software.amazon.awssdk.core.checksums.ResponseChecksumValidation.WHEN_SUPPORTED;
+
 import com.google.common.base.Preconditions;
 import com.slack.astra.proto.config.AstraConfigs;
 import java.net.URI;
@@ -58,7 +60,9 @@ public static S3AsyncClient initS3Client(AstraConfigs.S3Config config) {
               .targetThroughputInGbps(config.getS3TargetThroughputGbps())
               .region(Region.of(region))
               .maxNativeMemoryLimitInBytes(maxNativeMemoryLimitBytes)
-              .credentialsProvider(awsCredentialsProvider);
+              .credentialsProvider(awsCredentialsProvider)
+              .responseChecksumValidation(WHEN_SUPPORTED)
+              .minimumPartSizeInBytes(64 * 1024 * 1024L); // 64MiB minimum part size
 
       // We add a healthcheck to prevent an error with the CRT client, where it will
       // continue to attempt to read data from a socket that is no longer returning data
diff --git a/astra/src/main/java/com/slack/astra/chunk/ChunkValidationUtils.java b/astra/src/main/java/com/slack/astra/chunk/ChunkValidationUtils.java
new file mode 100644
index 0000000000..9ddbc2772a
--- /dev/null
+++ b/astra/src/main/java/com/slack/astra/chunk/ChunkValidationUtils.java
@@ -0,0 +1,17 @@
+package com.slack.astra.chunk;
+
+import java.nio.file.Path;
+import org.apache.lucene.index.CheckIndex;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.NoLockFactory;
+
+public class ChunkValidationUtils {
+
+  public static boolean isChunkClean(Path path) throws Exception {
+    try (FSDirectory existingDir = FSDirectory.open(path, NoLockFactory.INSTANCE);
+        CheckIndex checker = new CheckIndex(existingDir)) {
+      CheckIndex.Status status = checker.checkIndex();
+      return status.clean;
+    }
+  }
+}
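// Illustrative sketch, not part of this diff: invoking the new ChunkValidationUtils helper
// standalone against an on-disk index. The path is a placeholder; isChunkClean is declared
// `throws Exception`, so callers must handle failures.
import java.nio.file.Path;

class CheckChunk {
  public static void main(String[] args) throws Exception {
    Path indexDir = Path.of(args.length > 0 ? args[0] : "/tmp/astra-chunk-example");
    // CheckIndex walks every segment; status.clean is false on any detected corruption
    boolean clean = com.slack.astra.chunk.ChunkValidationUtils.isChunkClean(indexDir);
    System.out.println("index clean: " + clean);
  }
}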
diff --git a/astra/src/main/java/com/slack/astra/chunk/ReadOnlyChunkImpl.java b/astra/src/main/java/com/slack/astra/chunk/ReadOnlyChunkImpl.java
index 050ce4333b..4bd221e72d 100644
--- a/astra/src/main/java/com/slack/astra/chunk/ReadOnlyChunkImpl.java
+++ b/astra/src/main/java/com/slack/astra/chunk/ReadOnlyChunkImpl.java
@@ -1,5 +1,6 @@
 package com.slack.astra.chunk;
 
+import static com.slack.astra.chunk.ChunkValidationUtils.isChunkClean;
 import static com.slack.astra.chunkManager.CachingChunkManager.ASTRA_NG_DYNAMIC_CHUNK_SIZES_FLAG;
 import static com.slack.astra.server.AstraConfig.DEFAULT_ZK_TIMEOUT_SECS;
 
@@ -28,9 +29,11 @@
 import com.slack.astra.proto.metadata.Metadata;
 import io.micrometer.core.instrument.MeterRegistry;
 import io.micrometer.core.instrument.Timer;
+import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.time.Instant;
 import java.util.EnumSet;
 import java.util.List;
@@ -228,6 +231,62 @@ public CacheNodeAssignment getCacheNodeAssignment() {
     return assignment;
   }
 
+  private boolean validateS3vsLocalDownload() {
+    // check if the number of files in S3 matches the local directory
+    Map<String, Long> filesWithSizeInS3 =
+        blobStore.listFilesWithSize(snapshotMetadata.snapshotId);
+
+    Map<String, Long> localFilesSizeMap;
+    List<String> mismatchFiles = new java.util.ArrayList<>();
+    try (Stream<Path> fileList = Files.list(dataDirectory)) {
+      localFilesSizeMap =
+          fileList
+              .filter(Files::isRegularFile)
+              .collect(
+                  Collectors.toMap(
+                      path ->
+                          dataDirectory.relativize(path).toString().replace(File.separator, "/"),
+                      path -> path.toFile().length()));
+    } catch (IOException e) {
+      throw new RuntimeException(
+          String.format("Error reading local files in directory %s", dataDirectory), e);
+    }
+    if (localFilesSizeMap.size() != filesWithSizeInS3.size()) {
+      LOG.error(
+          "Mismatch in number of files in S3 ({}) and local directory ({}) for snapshot {}",
+          filesWithSizeInS3.size(),
+          localFilesSizeMap.size(),
+          snapshotMetadata.toString());
+      return false;
+    }
+
+    for (Map.Entry<String, Long> entry : filesWithSizeInS3.entrySet()) {
+      String s3Path = entry.getKey();
+      long s3Size = entry.getValue();
+      String fileName = Paths.get(s3Path).getFileName().toString();
+
+      if (!localFilesSizeMap.containsKey(fileName)
+          || !localFilesSizeMap.get(fileName).equals(s3Size)) {
+        mismatchFiles.add(
+            String.format(
+                "%s (S3Size: %s, LocalSize: %s)",
+                fileName, s3Size, localFilesSizeMap.get(fileName)));
+      }
+    }
+    if (!mismatchFiles.isEmpty()) {
+      String mismatchFilesAndSize = String.join(", ", mismatchFiles);
+      LOG.error(
+          "Mismatch in file sizes between S3 and local directory for snapshot {}. Mismatch files: {}",
+          snapshotMetadata.toString(),
+          mismatchFilesAndSize);
+      return false;
+    }
+    return true;
+  }
+
   public void downloadChunkData() {
     Timer.Sample assignmentTimer = Timer.start(meterRegistry);
     // lock
@@ -265,7 +324,24 @@ public void downloadChunkData() {
               "No files found on blob storage, released slot for re-assignment");
         }
       }
+      // validate if the number of files in S3 matches the local directory
+      if (!validateS3vsLocalDownload()) {
+        String errorString =
+            String.format(
+                "Mismatch in number or size of files in S3 and local directory for snapshot %s",
+                snapshotMetadata);
+        throw new RuntimeException(errorString);
+      }
+
+      // check if lucene index is valid and not corrupted
+      boolean luceneStatus = isChunkClean(dataDirectory);
+      if (!luceneStatus) {
+        throw new IOException(
+            String.format(
+                "Lucene index is not clean. Found issues for snapshot: %s.", snapshotMetadata));
+      }
+
+      // check if schema file exists
       Path schemaPath = Path.of(dataDirectory.toString(), ReadWriteChunk.SCHEMA_FILE_NAME);
       if (!Files.exists(schemaPath)) {
         throw new RuntimeException("We expect a schema.json file to exist within the index");
@@ -305,7 +381,7 @@ public void downloadChunkData() {
       // disregarding any errors
       setAssignmentState(
           getCacheNodeAssignment(), Metadata.CacheNodeAssignment.CacheNodeAssignmentState.EVICT);
-      LOG.error("Error handling chunk assignment", e);
+      LOG.error("Error handling chunk assignment for snapshot: {}", snapshotMetadata, e);
       assignmentTimer.stop(chunkAssignmentTimerFailure);
     } finally {
       chunkAssignmentLock.unlock();
@@ -538,6 +614,7 @@ private void cleanDirectory() {
     if (dataDirectory != null) {
       try {
         FileUtils.cleanDirectory(dataDirectory.toFile());
+        FileUtils.deleteDirectory(dataDirectory.toFile());
       } catch (Exception e) {
         LOG.error("Error removing files {}", dataDirectory.toString(), e);
       }
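// Illustrative sketch, not part of this diff: the validation above reduces to comparing two
// name -> size maps. A dependency-free version of the same idea (hypothetical helper class,
// all names are placeholders):
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class SizeMismatchCheck {
  // Returns the file names whose S3 size disagrees with (or is missing from) the local map.
  static List<String> findSizeMismatches(Map<String, Long> s3Sizes, Map<String, Long> localSizes) {
    List<String> mismatches = new ArrayList<>();
    for (Map.Entry<String, Long> entry : s3Sizes.entrySet()) {
      // S3 keys look like "<snapshotId>/<fileName>"; compare on the file name
      String fileName = Paths.get(entry.getKey()).getFileName().toString();
      Long localSize = localSizes.get(fileName);
      if (!entry.getValue().equals(localSize)) { // localSize == null means missing locally
        mismatches.add(fileName);
      }
    }
    return mismatches;
  }

  public static void main(String[] args) {
    Map<String, Long> s3 = Map.of("snap-1/_0.cfs", 100L, "snap-1/segments_1", 20L);
    Map<String, Long> local = Map.of("_0.cfs", 100L, "segments_1", 19L);
    System.out.println(findSizeMismatches(s3, local)); // prints [segments_1]
  }
}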
Found issues for snapshot: %s.", snapshotMetadata)); + } + // check if schema file exists Path schemaPath = Path.of(dataDirectory.toString(), ReadWriteChunk.SCHEMA_FILE_NAME); if (!Files.exists(schemaPath)) { throw new RuntimeException("We expect a schema.json file to exist within the index"); @@ -305,7 +381,7 @@ public void downloadChunkData() { // disregarding any errors setAssignmentState( getCacheNodeAssignment(), Metadata.CacheNodeAssignment.CacheNodeAssignmentState.EVICT); - LOG.error("Error handling chunk assignment", e); + LOG.error("Error handling chunk assignment for snapshot: {}", snapshotMetadata, e); assignmentTimer.stop(chunkAssignmentTimerFailure); } finally { chunkAssignmentLock.unlock(); @@ -538,6 +614,7 @@ private void cleanDirectory() { if (dataDirectory != null) { try { FileUtils.cleanDirectory(dataDirectory.toFile()); + FileUtils.deleteDirectory(dataDirectory.toFile()); } catch (Exception e) { LOG.error("Error removing files {}", dataDirectory.toString(), e); } diff --git a/astra/src/main/java/com/slack/astra/chunk/ReadWriteChunk.java b/astra/src/main/java/com/slack/astra/chunk/ReadWriteChunk.java index 6f32d9f14d..97c8b11cc0 100644 --- a/astra/src/main/java/com/slack/astra/chunk/ReadWriteChunk.java +++ b/astra/src/main/java/com/slack/astra/chunk/ReadWriteChunk.java @@ -1,6 +1,7 @@ package com.slack.astra.chunk; import static com.slack.astra.chunk.ChunkInfo.toSnapshotMetadata; +import static com.slack.astra.chunk.ChunkValidationUtils.isChunkClean; import static com.slack.astra.writer.SpanFormatter.isValidTimestamp; import com.google.common.annotations.VisibleForTesting; @@ -27,11 +28,13 @@ import java.nio.file.Path; import java.time.Instant; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import org.apache.commons.io.FileUtils; import org.apache.lucene.index.IndexCommit; import org.slf4j.Logger; @@ -74,6 +77,7 @@ public abstract class ReadWriteChunk implements Chunk { private final LogStore logStore; private final String kafkaPartitionId; private final Logger logger; + private Path dataDirectory; private LogIndexSearcher logSearcher; private final Counter fileUploadAttempts; private final MeterRegistry meterRegistry; @@ -249,16 +253,47 @@ public boolean snapshotToS3(BlobStore blobStore) { totalBytes += sizeOfFile; logger.debug("File name is {} ({} bytes)", fileName, sizeOfFile); } + // check if lucene index is valid and not corrupted + boolean luceneStatus = isChunkClean(dirPath); + if (!luceneStatus) { + logger.error( + "Lucene index is not clean before upload. Found issues for chunk: {}.", chunkInfo); + return false; + } + this.fileUploadAttempts.increment(filesToUpload.size()); Timer.Sample snapshotTimer = Timer.start(meterRegistry); // blobstore.upload uploads everything in the directory, including write.lock if it exists. blobStore.upload(chunkInfo.chunkId, dirPath); - snapshotTimer.stop(meterRegistry.timer(SNAPSHOT_TIMER)); + long durationNanos = snapshotTimer.stop(meterRegistry.timer(SNAPSHOT_TIMER)); chunkInfo.setSizeInBytesOnDisk(totalBytes); - List filesUploaded = blobStore.listFiles(chunkInfo.chunkId); + // check if lucene index is valid and not corrupted + luceneStatus = isChunkClean(dirPath); + if (!luceneStatus) { + logger.error( + "Lucene index is not clean after upload. 
Found issues for chunk: {}.", chunkInfo); + return false; + } + + dataDirectory = Path.of(String.format("%s/astra-chunk-%s", "/tmp", chunkInfo.chunkId)); + + blobStore.download(chunkInfo.chunkId, dataDirectory); + + // check if lucene index is valid and not corrupted + luceneStatus = isChunkClean(dataDirectory); + if (!luceneStatus) { + logger.error( + "Lucene index is not clean when downloaded on Index node. Found issues for chunk: {}", + chunkInfo); + return false; + } + FileUtils.deleteDirectory(dataDirectory.toFile()); + + Map filesWithSizeInS3 = blobStore.listFilesWithSize(chunkInfo.chunkId); + List filesUploaded = new ArrayList<>(filesWithSizeInS3.keySet().stream().toList()); filesUploaded.removeIf(file -> file.endsWith("write.lock")); // check here that all files are uploaded @@ -273,15 +308,45 @@ public boolean snapshotToS3(BlobStore blobStore) { filesUploaded); return false; } + + // validate the size of the uploaded files + Map mismatchedFilesMap = new HashMap<>(); + for (String fileName : filesToUpload) { + String s3Path = String.format("%s/%s", chunkInfo.chunkId, fileName); + long sizeOfFile = Files.size(Path.of(String.format("%s/%s", dirPath, fileName))); + if (!filesWithSizeInS3.containsKey(s3Path) + || !filesWithSizeInS3.get(s3Path).equals(sizeOfFile)) { + mismatchedFilesMap.put(fileName, sizeOfFile); + } + } + if (!mismatchedFilesMap.isEmpty()) { + String mismatchFilesAndSize = + mismatchedFilesMap.keySet().stream() + .map( + aLong -> + String.format( + "%s (S3Size: %s, LocalSize: %s)", + aLong, filesWithSizeInS3.get(aLong), mismatchedFilesMap.get(aLong))) + .collect(Collectors.joining(", ")); + logger.error( + "Mismatch in file sizes between S3 and local directory for chunk {}. Mismatch files: {}", + chunkInfo.chunkId, + mismatchFilesAndSize); + return false; + } + // and schema file exists in s3 if (!filesUploaded.contains(chunkInfo.chunkId + "/" + SCHEMA_FILE_NAME)) { logger.error("Schema file was not uploaded to S3: {}", SCHEMA_FILE_NAME); return false; } - logger.info("Finished RW chunk snapshot to S3 {}.", chunkInfo); + logger.info( + "Finished RW chunk snapshot to S3 {} in {} secs.", + chunkInfo, + TimeUnit.SECONDS.convert(durationNanos, TimeUnit.NANOSECONDS)); return true; } catch (Exception e) { - logger.error("Exception when copying RW chunk " + chunkInfo + " to S3.", e); + logger.error("Exception when copying RW chunk {} to S3.", chunkInfo, e); return false; } finally { logStore.releaseIndexCommit(indexCommit); diff --git a/astra/src/main/java/com/slack/astra/logstore/LuceneIndexStoreImpl.java b/astra/src/main/java/com/slack/astra/logstore/LuceneIndexStoreImpl.java index 2a9f43869d..73b39ef2ea 100644 --- a/astra/src/main/java/com/slack/astra/logstore/LuceneIndexStoreImpl.java +++ b/astra/src/main/java/com/slack/astra/logstore/LuceneIndexStoreImpl.java @@ -156,7 +156,7 @@ public LuceneIndexStoreImpl( try { refresh(); } catch (Exception e) { - LOG.error("Error running scheduled commit", e); + LOG.error("Error running scheduled refresh", e); } }, config.refreshDuration.toMillis(), diff --git a/astra/src/main/java/com/slack/astra/server/Astra.java b/astra/src/main/java/com/slack/astra/server/Astra.java index f5dbec58ff..ca71c949cc 100644 --- a/astra/src/main/java/com/slack/astra/server/Astra.java +++ b/astra/src/main/java/com/slack/astra/server/Astra.java @@ -557,6 +557,13 @@ void shutdown() { // stopping timed out LOG.error("ServiceManager shutdown timed out", e); } + if (s3Client != null) { + try { + s3Client.close(); + } catch (Exception e) { + LOG.error("Error 
while closing s3Client ", e); + } + } try { curatorFramework.unwrap().close(); } catch (Exception e) { diff --git a/astra/src/test/java/com/slack/astra/blobfs/BlobStoreTest.java b/astra/src/test/java/com/slack/astra/blobfs/BlobStoreTest.java index f8f61b6960..04389e26e9 100644 --- a/astra/src/test/java/com/slack/astra/blobfs/BlobStoreTest.java +++ b/astra/src/test/java/com/slack/astra/blobfs/BlobStoreTest.java @@ -12,6 +12,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.UUID; import java.util.concurrent.ExecutionException; @@ -218,6 +219,35 @@ void testListFilesNonExistingPrefix() { assertThat(blobStore.listFiles(chunkId).size()).isEqualTo(0); } + @Test + void testListFilesWithSize() throws IOException { + BlobStore blobStore = new BlobStore(s3Client, TEST_BUCKET); + String chunkId = UUID.randomUUID().toString(); + + assertThat(blobStore.listFiles(chunkId).size()).isEqualTo(0); + + Path directoryUpload = Files.createTempDirectory(""); + Path foo = Files.createTempFile(directoryUpload, "", ""); + try (FileWriter fileWriter = new FileWriter(foo.toFile())) { + fileWriter.write("Example test 1"); + } + Path bar = Files.createTempFile(directoryUpload, "", ""); + try (FileWriter fileWriter = new FileWriter(bar.toFile())) { + fileWriter.write("Example test 2"); + } + blobStore.upload(chunkId, directoryUpload); + + Map filesWithSize = blobStore.listFilesWithSize(chunkId); + assertThat(filesWithSize.size()).isEqualTo(2); + assertThat(filesWithSize) + .containsExactlyInAnyOrderEntriesOf( + Map.of( + String.format("%s/%s", chunkId, foo.getFileName().toString()), + Files.size(foo), + String.format("%s/%s", chunkId, bar.getFileName().toString()), + Files.size(bar))); + } + @Test public void testCompressDecompressJsonData() throws Exception { // Arrange diff --git a/astra/src/test/java/com/slack/astra/chunk/ReadOnlyChunkImplTest.java b/astra/src/test/java/com/slack/astra/chunk/ReadOnlyChunkImplTest.java index 3733f53491..213a088004 100644 --- a/astra/src/test/java/com/slack/astra/chunk/ReadOnlyChunkImplTest.java +++ b/astra/src/test/java/com/slack/astra/chunk/ReadOnlyChunkImplTest.java @@ -547,11 +547,6 @@ public void closeShouldCleanupLiveChunkCorrectly() throws Exception { assertThat(logMessageSearchResultEmpty).isEqualTo(SearchResult.empty()); assertThat(readOnlyChunk.info()).isNull(); - // verify that the directory has been cleaned up - try (var files = java.nio.file.Files.list(readOnlyChunk.getDataDirectory())) { - assertThat(files.findFirst().isPresent()).isFalse(); - } - curatorFramework.unwrap().close(); } @@ -672,11 +667,6 @@ public void shouldHandleDynamicChunkSizeLifecycle() throws Exception { // simulate eviction readOnlyChunk.evictChunk(cacheNodeAssignmentStore.findSync(assignmentId)); - // verify that the directory has been cleaned up - try (var files = java.nio.file.Files.list(readOnlyChunk.getDataDirectory())) { - assertThat(files.findFirst().isPresent()).isFalse(); - } - curatorFramework.unwrap().close(); } @@ -876,11 +866,6 @@ public void shouldEvictChunkOnAssignmentFailure() throws Exception { assertThat(meterRegistry.get(CHUNK_ASSIGNMENT_TIMER).tag("successful", "false").timer().count()) .isEqualTo(1); - // verify that the directory has been cleaned up - try (var files = java.nio.file.Files.list(readOnlyChunk.getDataDirectory())) { - assertThat(files.findFirst().isPresent()).isFalse(); - } - curatorFramework.unwrap().close(); }