-
Notifications
You must be signed in to change notification settings - Fork 27
Xxhash integration #96
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
core/src/main/java/com/instaclustr/esop/impl/hash/HashSpec.java
Outdated
Show resolved
Hide resolved
b732d3a to
517f6fa
Compare
079363e to
06a6fa2
Compare
| public AbstractOperationRequest() { | ||
| // for picocli | ||
| if (concurrentConnections == null) | ||
| concurrentConnections = getDefaultConcurrentConnections(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should validate this, what if I put 0 or negative number? Or number bigger than number of cpus I have? We should check some range the value is allowed to be in. There is "validate" method you might maybe move this validation to?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch, totally missed this by assumption that users should know what they're doing. But esop could be called by some script or any other automation tool, so it worth to validate if value is valid
.../main/java/com/instaclustr/esop/impl/backup/coordination/BaseBackupOperationCoordinator.java
Outdated
Show resolved
Hide resolved
| modules.add(new UploadingModule()); | ||
| modules.add(new DownloadingModule()); | ||
| modules.add(new HashModule(hashSpec)); | ||
| modules.add(new HashModule(hashSpec, request.concurrentConnections)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can't you put concurrent connections into the constructor of HashSpec? Just pass it to hashSpec so you do not need to accommodate the code by passing request everywhere.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if it simplifies things, we use concurrentConnections (--cc) in different places and potentially there could no be HashSpec. Needs deeper investigation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no luck with this?
| public HashModule(final HashSpec hashSpec) { | ||
| public HashModule(final HashSpec hashSpec, final int parallelHashingThreads) { | ||
| this.hashSpec = hashSpec; | ||
| this.parallelHashingThreads = parallelHashingThreads; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you would get this from hashSpec so you do not need to change this constructor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no luck with simplifying this?
| @JsonProperty("retry") | ||
| public RetrySpec retry = new RetrySpec(); | ||
|
|
||
| @Option(names = {"--cc", "--concurrent-connections"}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should add "--parallelism" name here, do not remove already existing because if people use it already we would break it for them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, I also was thinking about that but got drowned by other things 😅
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added --parallelism option
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@worryg0d can you also rename the variable itself to parallelism?
| logger.error(ex.getMessage()); | ||
| corruptedFiles.add(entry.localFile.toString()); | ||
| } | ||
| entriesToVerify.add(entry); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
while this copies the logic which was there before, I am wondering why we continue to verify other files in case we already encountered the verification failure.
If I have 20 10GiB files to verify and I fail on the first one, then I am still verifying the rest (9) completely unnecessarily. It is not like I would continue with the restoration after I verify the rest anyway.
So we might probably just fail and cancel the rest of the tasks prematurely? We are checking if corruptedFiles are empty or not further in the execution flow and throw based on that while we do not even log the corrupted files themselves. (we do for importing phase but no for hardlinking phase, it is not aligned for now).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ha, that's a valid case. Agree we don't need to continue verification as we will fail anyway
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By futher investigation, to interrupt hash computations prematurely we need to check if the current thread is interrupted each time we read data from the disk, and if so throw InterruptedException
| return forkJoinPool.submit(() -> manifestEntries.parallelStream().forEach(entry -> entry.hash = hash(entry))); | ||
| } | ||
|
|
||
| public ForkJoinTask<?> verifyAll(final List<ManifestEntry> manifestEntries, OnFailure onFailure) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we fail fast here? Just cancel the verification of the rest as soon as we see some task failed. I think there is some API in CompletableFuture for achieving that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Answered above
1. value range validation for concurrentConnections option 2. introduced --parallelism alternative for concurrentConnections
…se of restoration
core/src/main/java/com/instaclustr/esop/impl/hash/HashSpec.java
Outdated
Show resolved
Hide resolved
core/src/main/java/com/instaclustr/esop/impl/AbstractOperationRequest.java
Outdated
Show resolved
Hide resolved
Use java nio channels api instead of streams for seamless hash operations interruption during manifest entries verification
No description provided.