56 commits
97a5a2c
Adding log statement + removing eviction on exception for debugging p…
zarna1parekh Jul 30, 2025
7443840
Formatting
zarna1parekh Jul 30, 2025
30ef18c
adding cache eviction back
zarna1parekh Jul 30, 2025
11de483
printing only relevant fields
zarna1parekh Jul 30, 2025
4294bc1
printing specific fields
zarna1parekh Jul 30, 2025
ac66b41
Checking Lucene Index status before upload
zarna1parekh Aug 12, 2025
f909840
post download check on cache nodes
zarna1parekh Aug 6, 2025
5c4b07a
Addressing directory locking issue
zarna1parekh Aug 12, 2025
c975fe3
Fixing test cases
zarna1parekh Aug 13, 2025
99ae402
Updating logging statement
zarna1parekh Aug 13, 2025
b09f9e0
lucene check after download on cache nodes
zarna1parekh Aug 13, 2025
4809591
log clean up
zarna1parekh Aug 13, 2025
808160a
Code Refactor
zarna1parekh Aug 13, 2025
fd4744c
S3 Exception logging
zarna1parekh Aug 16, 2025
31a054c
exposing the s3 exception on failure
zarna1parekh Aug 16, 2025
dbfdfe0
Check if download is failing from S3
zarna1parekh Aug 17, 2025
2c2ccec
adding more log statements
zarna1parekh Aug 17, 2025
5d41271
log lines
zarna1parekh Aug 17, 2025
6dc0c76
log line
zarna1parekh Aug 17, 2025
8acd25f
Increasing retries for S3
zarna1parekh Aug 17, 2025
e5abb7b
Checksum validation
zarna1parekh Aug 17, 2025
98dceab
Checksum only when required
zarna1parekh Aug 17, 2025
48c83b2
logging cache node id
zarna1parekh Aug 18, 2025
e171c05
logs
zarna1parekh Aug 18, 2025
f6cb74b
crc32 checksum validation
zarna1parekh Aug 19, 2025
52f394e
changing log level to info
zarna1parekh Aug 19, 2025
d2d5093
Enriching log statement
zarna1parekh Sep 10, 2025
e3e3b56
fmt
zarna1parekh Sep 10, 2025
cfadbdc
Directory cleanup
zarna1parekh Sep 11, 2025
17b0dfb
Do not delete the error dir
zarna1parekh Sep 12, 2025
4540e97
fmt
zarna1parekh Sep 12, 2025
6b72251
reverting unwanted changes
zarna1parekh Sep 12, 2025
961c8d5
Updating async client to have more Native Memory
zarna1parekh Sep 13, 2025
dd68d77
fmt
zarna1parekh Sep 13, 2025
f041267
Increasing Native memory for multipart upload
zarna1parekh Sep 14, 2025
bcd0970
fmt
zarna1parekh Sep 14, 2025
175de27
Conservative on Native memory
zarna1parekh Sep 15, 2025
b17a16c
Upload sequentially instead of dir upload
zarna1parekh Sep 16, 2025
2787a34
build failure
zarna1parekh Sep 16, 2025
11691ed
handling wildcard imports
zarna1parekh Sep 17, 2025
67d541d
fixing build issue
zarna1parekh Sep 17, 2025
7575911
Cleanup Chunk Manager
zarna1parekh Sep 17, 2025
b75d892
cleanup debug code
zarna1parekh Sep 19, 2025
5b879f8
Fixing test cases
zarna1parekh Sep 22, 2025
2db12ce
Verbose log line
zarna1parekh Sep 22, 2025
15e76f4
upload time taken
zarna1parekh Sep 26, 2025
d497e6f
Closing searcher and writer before upload begins
zarna1parekh Oct 2, 2025
414bd33
Close scheduled jobs
zarna1parekh Oct 3, 2025
6c14cbe
rollback searcher changes
zarna1parekh Oct 5, 2025
ca6fb0e
index writer close + lucene check after upload + local download
zarna1parekh Oct 11, 2025
e45a2de
closing searcher before snapshot upload
zarna1parekh Oct 14, 2025
4e658fd
Updating AWS SDK version
zarna1parekh Oct 14, 2025
3f0b897
Updating aws crt version
zarna1parekh Oct 14, 2025
4920c58
Reverting javac to 21
zarna1parekh Oct 14, 2025
f5270f4
Fixing test cases
zarna1parekh Oct 15, 2025
76809e2
Java version 21
zarna1parekh Oct 15, 2025
4 changes: 2 additions & 2 deletions astra/pom.xml
@@ -29,7 +29,7 @@
<opensearch.version>2.11.1</opensearch.version>
<curator.version>5.8.0</curator.version>
<log4j.version>2.24.3</log4j.version>
<aws.sdk.version>2.31.63</aws.sdk.version>
<aws.sdk.version>2.35.6</aws.sdk.version>
<error.prone.version>2.38.0</error.prone.version>
<junit.jupiter.version>5.13.1</junit.jupiter.version>
</properties>
@@ -357,7 +357,7 @@
<dependency>
<groupId>software.amazon.awssdk.crt</groupId>
<artifactId>aws-crt</artifactId>
<version>0.38.5</version>
<version>0.39.0</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
106 changes: 101 additions & 5 deletions astra/src/main/java/com/slack/astra/blobfs/BlobStore.java
@@ -7,12 +7,18 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import org.slf4j.Logger;
@@ -21,6 +27,7 @@
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.internal.async.ByteArrayAsyncResponseTransformer;
import software.amazon.awssdk.crt.s3.ChecksumAlgorithm;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
import software.amazon.awssdk.services.s3.model.Delete;
@@ -41,6 +48,7 @@
import software.amazon.awssdk.transfer.s3.model.DownloadDirectoryRequest;
import software.amazon.awssdk.transfer.s3.model.DownloadRequest;
import software.amazon.awssdk.transfer.s3.model.UploadDirectoryRequest;
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;

/**
* Blob store abstraction for basic operations on chunk/snapshots on remote storage. All operations
@@ -68,6 +76,69 @@ public BlobStore(S3AsyncClient s3AsyncClient, String bucketName) {
* @throws IllegalStateException Thrown if any files fail to upload
* @throws RuntimeException Thrown when error is considered generally non-retryable
*/
public void uploadSequentially(String prefix, Path directoryToUpload) {
assert prefix != null && !prefix.isEmpty();
assert directoryToUpload.toFile().isDirectory();
assert Objects.requireNonNull(directoryToUpload.toFile().listFiles()).length > 0;

try {
// Get all files and sort by size (largest first for better progress visibility)
List<Path> allFiles;
try (Stream<Path> pathStream = Files.walk(directoryToUpload)) {
allFiles =
pathStream
.filter(Files::isRegularFile)
.sorted((a, b) -> Long.compare(b.toFile().length(), a.toFile().length()))
.collect(Collectors.toList());
}

List<String> failedUploads = new ArrayList<>();
int completedCount = 0;

for (Path file : allFiles) {
String relativePath = directoryToUpload.relativize(file).toString();
String s3Key = prefix + "/" + relativePath;

try {
// Upload single file with checksum validation
transferManager
.uploadFile(
UploadFileRequest.builder()
.putObjectRequest(
req ->
req.bucket(bucketName)
.key(s3Key)
.checksumAlgorithm(
String.valueOf(
ChecksumAlgorithm.SHA256)) // Critical for integrity
)
.source(file)
.build())
.completionFuture()
.get(); // Wait for completion

completedCount++;
} catch (Exception e) {
LOG.error("Failed to upload file: {} for prefix: {}", relativePath, prefix, e);
failedUploads.add(relativePath);
}
}

// Handle failures same as original code
if (!failedUploads.isEmpty()) {
throw new IllegalStateException(
String.format(
"Some files failed to upload for prefix %s - attempted to upload %s files, failed %s.",
prefix, allFiles.size(), failedUploads.size()));
}
LOG.info(
"Successfully uploaded all {} files sequentially for prefix {}", completedCount, prefix);

} catch (IOException e) {
throw new RuntimeException("Failed to walk directory", e);
}
}

public void upload(String prefix, Path directoryToUpload) {
assert prefix != null && !prefix.isEmpty();
assert directoryToUpload.toFile().isDirectory();
@@ -132,14 +203,17 @@ public void download(String prefix, Path destinationDirectory) {
download
.failedTransfers()
.forEach(
failedFileUpload ->
LOG.error(
"Error attempting to download file from S3", failedFileUpload.exception()));
failedFileDownload -> {
LOG.error(
"Error attempting to download file from S3 for prefix {}",
prefix,
failedFileDownload.exception());
});

throw new IllegalStateException(
String.format(
"Some files failed to download - failed to download %s files.",
download.failedTransfers().size()));
"Some files failed to download for prefix %s - failed to download %s files.",
prefix, download.failedTransfers().size()));
}
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
@@ -195,6 +269,28 @@ public List<String> listFiles(String prefix) {
return filesList;
}

public Map<String, Long> listFilesWithSize(String prefix) {
assert prefix != null && !prefix.isEmpty();

ListObjectsV2Request listRequest = builder().bucket(bucketName).prefix(prefix).build();
ListObjectsV2Publisher asyncPaginatedListResponse =
s3AsyncClient.listObjectsV2Paginator(listRequest);

Map<String, Long> filesListWithSize = new HashMap<>();
try {
asyncPaginatedListResponse
.subscribe(
listResponse ->
listResponse
.contents()
.forEach(s3Object -> filesListWithSize.put(s3Object.key(), s3Object.size())))
.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
return filesListWithSize;
Comment on lines +273 to +291

you could extract a helper method that would be used by both listFiles... methods that takes a prefix and a Consumer. Then this could look like the following.

Also, the block passed to subscribe could be called in multiple threads, so this should use a storage class that is safe wrt concurrent modifications.

Suggested change
assert prefix != null && !prefix.isEmpty();
ListObjectsV2Request listRequest = builder().bucket(bucketName).prefix(prefix).build();
ListObjectsV2Publisher asyncPaginatedListResponse =
s3AsyncClient.listObjectsV2Paginator(listRequest);
Map<String, Long> filesListWithSize = new HashMap<>();
try {
asyncPaginatedListResponse
.subscribe(
listResponse ->
listResponse
.contents()
.forEach(s3Object -> filesListWithSize.put(s3Object.key(), s3Object.size())))
.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
return filesListWithSize;
Map<String, Long> filesWithSize = new ConcurrentHashMap<>();
listFilesAndDo(prefix, s3Object -> filesWithSize.put(s3Object.key(), s3Object.size()));
return filesWithSize;

}
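
A minimal sketch of the shared helper described in the review comment above (the name listFilesAndDo comes from the suggestion; it is not part of this diff and assumes the existing s3AsyncClient and bucketName fields on BlobStore, plus imports of java.util.function.Consumer and software.amazon.awssdk.services.s3.model.S3Object):

// Sketch only: shared pagination helper used by both listFiles... methods.
// The consumer may be invoked from SDK callback threads, so callers should
// only mutate thread-safe state (e.g. a ConcurrentHashMap) inside it.
private void listFilesAndDo(String prefix, Consumer<S3Object> consumer) {
  assert prefix != null && !prefix.isEmpty();

  ListObjectsV2Request listRequest = builder().bucket(bucketName).prefix(prefix).build();
  ListObjectsV2Publisher asyncPaginatedListResponse =
      s3AsyncClient.listObjectsV2Paginator(listRequest);

  try {
    asyncPaginatedListResponse
        .subscribe(listResponse -> listResponse.contents().forEach(consumer))
        .get();
  } catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
  }
}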

/**
* Deletes a chunk off of object storage by chunk id. If object was not found returns false.
*
6 changes: 5 additions & 1 deletion astra/src/main/java/com/slack/astra/blobfs/S3AsyncUtil.java
@@ -1,5 +1,7 @@
package com.slack.astra.blobfs;

import static software.amazon.awssdk.core.checksums.ResponseChecksumValidation.WHEN_SUPPORTED;

import com.google.common.base.Preconditions;
import com.slack.astra.proto.config.AstraConfigs;
import java.net.URI;
@@ -58,7 +60,9 @@ public static S3AsyncClient initS3Client(AstraConfigs.S3Config config) {
.targetThroughputInGbps(config.getS3TargetThroughputGbps())
.region(Region.of(region))
.maxNativeMemoryLimitInBytes(maxNativeMemoryLimitBytes)
.credentialsProvider(awsCredentialsProvider);
.credentialsProvider(awsCredentialsProvider)
.responseChecksumValidation(WHEN_SUPPORTED)
.minimumPartSizeInBytes(64 * 1024 * 1024L); // 64MiB minimum part size

// We add a healthcheck to prevent an error with the CRT client, where it will
// continue to attempt to read data from a socket that is no longer returning data
17 changes: 17 additions & 0 deletions astra/src/main/java/com/slack/astra/chunk/ChunkValidationUtils.java
@@ -0,0 +1,17 @@
package com.slack.astra.chunk;

import java.nio.file.Path;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.NoLockFactory;

public class ChunkValidationUtils {

public static boolean isChunkClean(Path path) throws Exception {
FSDirectory existingDir = FSDirectory.open(path, NoLockFactory.INSTANCE);
try (CheckIndex checker = new CheckIndex(existingDir)) {
CheckIndex.Status status = checker.checkIndex();
return status.clean;
}
}
}
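
A minimal usage sketch for this new utility (hypothetical caller and path, not part of this diff; the real call site is isChunkClean(dataDirectory) in ReadOnlyChunkImpl below):

// Hypothetical example: validate a downloaded chunk directory before serving it.
// isChunkClean declares `throws Exception`, so the caller handles or propagates it.
Path chunkDir = Path.of("/data/astra/chunks/snapshot-123"); // example path only
if (!ChunkValidationUtils.isChunkClean(chunkDir)) {
  throw new IOException("Lucene CheckIndex reported problems for " + chunkDir);
}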
79 changes: 78 additions & 1 deletion astra/src/main/java/com/slack/astra/chunk/ReadOnlyChunkImpl.java
@@ -1,5 +1,6 @@
package com.slack.astra.chunk;

import static com.slack.astra.chunk.ChunkValidationUtils.isChunkClean;
import static com.slack.astra.chunkManager.CachingChunkManager.ASTRA_NG_DYNAMIC_CHUNK_SIZES_FLAG;
import static com.slack.astra.server.AstraConfig.DEFAULT_ZK_TIMEOUT_SECS;

@@ -28,9 +29,11 @@
import com.slack.astra.proto.metadata.Metadata;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.EnumSet;
import java.util.List;
@@ -228,6 +231,62 @@ public CacheNodeAssignment getCacheNodeAssignment() {
return assignment;
}

private boolean validateS3vsLocalDownLoad() {
// check if the number of files in S3 matches the local directory
Map<String, Long> filesWithSizeInS3 = blobStore.listFilesWithSize(snapshotMetadata.snapshotId);

Map<String, Long> localFilesSizeMap;
List<String> mismatchFiles = new java.util.ArrayList<>();
try (Stream<Path> fileList = Files.list(dataDirectory)) {
localFilesSizeMap =
fileList
.filter(Files::isRegularFile)
.collect(
Collectors.toMap(
path ->
dataDirectory.relativize(path).toString().replace(File.separator, "/"),

Hm. I think the replace isn't necessary since Files.list() only returns files in the current directory. Although, maybe you should use Path#getFileName().toString() here, which would align with calling Paths.get(s3Path).getFileName().toString() on the s3 entries below.

path -> path.toFile().length()));
} catch (IOException e) {
throw new RuntimeException(
String.format("Error reading local files in directory %s", dataDirectory), e);
}
if (localFilesSizeMap.size() != filesWithSizeInS3.size()) {
LOG.error(
"Mismatch in number of files in S3 ({}) and local directory ({}) for snapshot {}",
filesWithSizeInS3.size(),
localFilesSizeMap.size(),
snapshotMetadata.toString());
return false;
}

for (Map.Entry<String, Long> entry : filesWithSizeInS3.entrySet()) {
String s3Path = entry.getKey();
long s3Size = entry.getValue();
String fileName = Paths.get(s3Path).getFileName().toString();

if (!localFilesSizeMap.containsKey(fileName)
|| !localFilesSizeMap.get(fileName).equals(s3Size)) {
mismatchFiles.add(fileName);
}
}
if (!mismatchFiles.isEmpty()) {
String mismatchFilesAndSize =
mismatchFiles.stream()
.map(
e ->
String.format(
"%s (S3Size: %s, LocalSize: %s)",
e, filesWithSizeInS3.get(e), localFilesSizeMap.get(e)))
.collect(Collectors.joining(", "));
LOG.error(
"Mismatch in file sizes between S3 and local directory for snapshot {}. Mismatch files: {}",
snapshotMetadata.toString(),
mismatchFilesAndSize);
return false;
}
return true;
}
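
Per the review comment above about Path#getFileName(), a hedged sketch of that alternative key mapping (not part of this diff) would be:

// Key local files by bare file name, aligning with
// Paths.get(s3Path).getFileName().toString() used for the S3 entries below.
localFilesSizeMap =
    fileList
        .filter(Files::isRegularFile)
        .collect(
            Collectors.toMap(
                path -> path.getFileName().toString(),
                path -> path.toFile().length()));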

public void downloadChunkData() {
Timer.Sample assignmentTimer = Timer.start(meterRegistry);
// lock
@@ -265,7 +324,24 @@ public void downloadChunkData() {
"No files found on blob storage, released slot for re-assignment");
}
}
// validate if the number of files in S3 matches the local directory
if (!validateS3vsLocalDownLoad()) {
String errorString =
String.format(
"Mismatch in number or size of files in S3 and local directory for snapshot %s",
snapshotMetadata);
throw new RuntimeException(errorString);
}

// check if lucene index is valid and not corrupted
boolean luceneStatus = isChunkClean(dataDirectory);
if (!luceneStatus) {
throw new IOException(
String.format(
"Lucene index is not clean. Found issues for snapshot: %s.", snapshotMetadata));
}

// check if schema file exists
Path schemaPath = Path.of(dataDirectory.toString(), ReadWriteChunk.SCHEMA_FILE_NAME);
if (!Files.exists(schemaPath)) {
throw new RuntimeException("We expect a schema.json file to exist within the index");
@@ -305,7 +381,7 @@ public void downloadChunkData() {
// disregarding any errors
setAssignmentState(
getCacheNodeAssignment(), Metadata.CacheNodeAssignment.CacheNodeAssignmentState.EVICT);
LOG.error("Error handling chunk assignment", e);
LOG.error("Error handling chunk assignment for snapshot: {}", snapshotMetadata, e);
assignmentTimer.stop(chunkAssignmentTimerFailure);
} finally {
chunkAssignmentLock.unlock();
@@ -538,6 +614,7 @@ private void cleanDirectory() {
if (dataDirectory != null) {
try {
FileUtils.cleanDirectory(dataDirectory.toFile());
FileUtils.deleteDirectory(dataDirectory.toFile());
} catch (Exception e) {
LOG.error("Error removing files {}", dataDirectory.toString(), e);
}