Skip to content

Commit 1ec3d5d

Browse files
committed
deffer hashing when necessary
1 parent 63da44b commit 1ec3d5d

File tree

7 files changed

+42
-40
lines changed

7 files changed

+42
-40
lines changed

src/main/java/com/instaclustr/esop/impl/AbstractTracker.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.google.common.util.concurrent.ListenableFuture;
2020
import com.google.common.util.concurrent.ListeningExecutorService;
2121

22+
import com.instaclustr.esop.impl.hash.HashService;
2223
import org.slf4j.Logger;
2324
import org.slf4j.LoggerFactory;
2425

@@ -48,7 +49,7 @@ public abstract class AbstractTracker<UNIT extends Unit, SESSION extends Session
4849

4950
protected final ListeningExecutorService finisherExecutorService;
5051
protected final OperationsService operationsService;
51-
protected final HashSpec hashSpec;
52+
protected final HashService hashService;
5253

5354
protected final List<UNIT> units = Collections.synchronizedList(new ArrayList<>());
5455
protected final Set<Session<UNIT>> sessions = Collections.synchronizedSet(new HashSet<>());
@@ -58,10 +59,10 @@ public abstract class AbstractTracker<UNIT extends Unit, SESSION extends Session
5859

5960
public AbstractTracker(final ListeningExecutorService finisherExecutorService,
6061
final OperationsService operationsService,
61-
final HashSpec hashSpec) {
62+
final HashService hashService) {
6263
this.finisherExecutorService = finisherExecutorService;
6364
this.operationsService = operationsService;
64-
this.hashSpec = hashSpec;
65+
this.hashService = hashService;
6566

6667
}
6768

@@ -90,7 +91,7 @@ public abstract UNIT constructUnitToSubmit(final INTERACTOR interactor,
9091
final ManifestEntry manifestEntry,
9192
final AtomicBoolean shouldCancel,
9293
final String snapshotTag,
93-
final HashSpec hashSpec);
94+
final HashService hashService);
9495

9596
public abstract Session<UNIT> constructSession();
9697

@@ -131,7 +132,7 @@ public synchronized Session<UNIT> submit(final INTERACTOR interactor,
131132
}
132133

133134
if (alreadySubmitted == null) {
134-
final UNIT unit = constructUnitToSubmit(interactor, entry, operation.getShouldCancel(), snapshotTag, hashSpec);
135+
final UNIT unit = constructUnitToSubmit(interactor, entry, operation.getShouldCancel(), snapshotTag, hashService);
135136

136137
units.add(unit);
137138
futures.put(executorService.submit(unit), unit);
@@ -226,7 +227,7 @@ public static abstract class Unit implements java.util.concurrent.Callable<Void>
226227
@JsonIgnore
227228
protected String snapshotTag;
228229
@JsonIgnore
229-
protected HashSpec hashSpec;
230+
protected HashService hashService;
230231
protected final ManifestEntry manifestEntry;
231232
protected volatile State state = NOT_STARTED;
232233
protected Throwable throwable = null;
@@ -235,10 +236,10 @@ public static abstract class Unit implements java.util.concurrent.Callable<Void>
235236

236237
public Unit(final ManifestEntry manifestEntry,
237238
final AtomicBoolean shouldCancel,
238-
final HashSpec hashSpec) {
239+
final HashService hashService) {
239240
this.manifestEntry = manifestEntry;
240241
this.shouldCancel = shouldCancel;
241-
this.hashSpec = hashSpec;
242+
this.hashService = hashService;
242243
}
243244

244245
public enum State {

src/main/java/com/instaclustr/esop/impl/SSTableUtils.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ public class SSTableUtils {
4444
private static final int SSTABLE_PREFIX_IDX = 1;
4545
private static final int SSTABLE_GENERATION_IDX = 2;
4646
private static final Pattern CHECKSUM_RE = Pattern.compile("^([a-zA-Z0-9]+).*");
47-
private static final HashService hashService = new HashServiceImpl(new HashSpec());
4847

4948
public static String sstableHash(Path path) throws IOException {
5049
final Matcher matcher = SSTABLE_RE.matcher(path.getFileName().toString());
@@ -105,14 +104,11 @@ public static String calculateChecksum(final Path filePath) throws IOException {
105104
public static Map<String, List<ManifestEntry>> getSSTables(String keyspace,
106105
String table,
107106
Path snapshotDirectory,
108-
Path tableBackupPath,
109-
HashSpec hashSpec) throws IOException {
107+
Path tableBackupPath) throws IOException {
110108
if (!Files.exists(snapshotDirectory)) {
111109
return Collections.emptyMap();
112110
}
113111

114-
final HashService hashService = new HashServiceImpl(hashSpec);
115-
116112
return Files.list(snapshotDirectory)
117113
.flatMap(path -> {
118114
if (isCassandra22SecIndex(path)) {
@@ -151,12 +147,11 @@ public static Map<String, List<ManifestEntry>> getSSTables(String keyspace,
151147
}
152148

153149
backupPath = backupPath.resolve(hash).resolve(manifestComponentFileName.getFileName());
154-
final String hashOfFile = hashService.hash(sstableComponent);
155150

156151
entries.add(new ManifestEntry(backupPath,
157152
sstableComponent,
158153
ManifestEntry.Type.FILE,
159-
hashOfFile,
154+
null, // don't hash on listing, make it faster
160155
new KeyspaceTable(keyspace, table),
161156
null));
162157
}

src/main/java/com/instaclustr/esop/impl/Snapshots.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -571,7 +571,7 @@ public static Table parse(final String keyspace, final String table, final List<
571571
final Path tablePath = Paths.get("data").resolve(Paths.get(keyspace, table));
572572

573573
for (final Path path : value) {
574-
tb.sstables.putAll(SSTableUtils.getSSTables(keyspace, table, path, tablePath, Snapshots.hashSpec));
574+
tb.sstables.putAll(SSTableUtils.getSSTables(keyspace, table, path, tablePath));
575575
}
576576

577577
final Optional<Path> schemaPath = value.stream().map(p -> p.resolve("schema.cql")).filter(Files::exists).findFirst();

src/main/java/com/instaclustr/esop/impl/backup/UploadTracker.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
import com.google.common.util.concurrent.ListeningExecutorService;
1313
import com.google.common.util.concurrent.RateLimiter;
1414

15+
import com.instaclustr.esop.impl.hash.HashService;
16+
import com.instaclustr.esop.impl.hash.HashServiceImpl;
1517
import org.slf4j.Logger;
1618
import org.slf4j.LoggerFactory;
1719

@@ -45,17 +47,17 @@ public class UploadTracker extends AbstractTracker<UploadUnit, UploadSession, Ba
4547
@Inject
4648
public UploadTracker(final @UploadingFinisher ListeningExecutorService finisherExecutorService,
4749
final OperationsService operationsService,
48-
final HashSpec hashSpec) {
49-
super(finisherExecutorService, operationsService, hashSpec);
50+
final HashService hashService) {
51+
super(finisherExecutorService, operationsService, hashService);
5052
}
5153

5254
@Override
5355
public UploadUnit constructUnitToSubmit(final Backuper backuper,
5456
final ManifestEntry manifestEntry,
5557
final AtomicBoolean shouldCancel,
5658
final String snapshotTag,
57-
final HashSpec hashSpec) {
58-
return new UploadUnit(backuper, manifestEntry, shouldCancel, snapshotTag, hashSpec);
59+
final HashService hashService) {
60+
return new UploadUnit(backuper, manifestEntry, shouldCancel, snapshotTag, hashService);
5961
}
6062

6163
@Override
@@ -96,8 +98,8 @@ public UploadUnit(final Backuper backuper,
9698
final ManifestEntry manifestEntry,
9799
final AtomicBoolean shouldCancel,
98100
final String snapshotTag,
99-
final HashSpec hashSpec) {
100-
super(manifestEntry, shouldCancel, hashSpec);
101+
final HashService hashService) {
102+
super(manifestEntry, shouldCancel, hashService);
101103
this.backuper = backuper;
102104
this.snapshotTag = snapshotTag;
103105
}
@@ -127,7 +129,7 @@ public Void call() {
127129
};
128130

129131
if (manifestEntry.type != MANIFEST_FILE && getRetrier(backuper.request.retry).submit(condition)) {
130-
logger.info("{}skipping the upload of alredy uploaded file {}",
132+
logger.info("{}skipping the upload of already uploaded file {}",
131133
snapshotTag != null ? "Snapshot " + snapshotTag + " - " : "",
132134
ref.canonicalPath);
133135

@@ -144,6 +146,9 @@ public Void call() {
144146
snapshotTag != null ? "Snapshot " + snapshotTag + " - " : "",
145147
manifestEntry.objectKey,
146148
DataSize.bytesToHumanReadable(manifestEntry.size)));
149+
150+
manifestEntry.hash = hashService.hash(manifestEntry.localFile);
151+
147152
// never encrypt manifest
148153
if (manifestEntry.type == MANIFEST_FILE) {
149154
backuper.uploadFile(manifestEntry, rateLimitedStream, ref);

src/main/java/com/instaclustr/esop/impl/restore/DownloadTracker.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import com.google.common.util.concurrent.ListeningExecutorService;
99

10+
import com.instaclustr.esop.impl.hash.HashService;
1011
import org.slf4j.Logger;
1112
import org.slf4j.LoggerFactory;
1213

@@ -18,7 +19,6 @@
1819
import com.instaclustr.esop.impl.RemoteObjectReference;
1920
import com.instaclustr.esop.impl.hash.HashService.HashVerificationException;
2021
import com.instaclustr.esop.impl.hash.HashServiceImpl;
21-
import com.instaclustr.esop.impl.hash.HashSpec;
2222
import com.instaclustr.esop.impl.restore.DownloadTracker.DownloadSession;
2323
import com.instaclustr.esop.impl.restore.DownloadTracker.DownloadUnit;
2424
import com.instaclustr.esop.impl.restore.RestoreModules.DownloadingFinisher;
@@ -36,17 +36,17 @@ public class DownloadTracker extends AbstractTracker<DownloadUnit, DownloadSessi
3636
@Inject
3737
public DownloadTracker(final @DownloadingFinisher ListeningExecutorService finisherExecutorService,
3838
final OperationsService operationsService,
39-
final HashSpec hashSpec) {
40-
super(finisherExecutorService, operationsService, hashSpec);
39+
final HashService hashService) {
40+
super(finisherExecutorService, operationsService, hashService);
4141
}
4242

4343
@Override
4444
public DownloadUnit constructUnitToSubmit(final Restorer restorer,
4545
final ManifestEntry manifestEntry,
4646
final AtomicBoolean shouldCancel,
4747
final String snapshotTag,
48-
final HashSpec hashSpec) {
49-
return new DownloadUnit(restorer, manifestEntry, shouldCancel, snapshotTag, hashSpec);
48+
final HashService hashService) {
49+
return new DownloadUnit(restorer, manifestEntry, shouldCancel, snapshotTag, hashService);
5050
}
5151

5252
@Override
@@ -80,8 +80,8 @@ public DownloadUnit(final Restorer restorer,
8080
final ManifestEntry manifestEntry,
8181
final AtomicBoolean shouldCancel,
8282
final String snapshotTag,
83-
final HashSpec hashSpec) {
84-
super(manifestEntry, shouldCancel, hashSpec);
83+
final HashService hashService) {
84+
super(manifestEntry, shouldCancel, hashService);
8585
this.restorer = restorer;
8686
super.snapshotTag = snapshotTag;
8787
}
@@ -106,7 +106,7 @@ public Void call() {
106106
// hash upon downloading
107107
try {
108108
if (manifestEntry.type == Type.FILE) {
109-
new HashServiceImpl(hashSpec).verify(localPath, manifestEntry.hash);
109+
hashService.verify(localPath, manifestEntry.hash);
110110
}
111111
} catch (final HashVerificationException ex) {
112112
// delete it if has is wrong so on the next try, it will be missing and we will download it again
@@ -123,7 +123,7 @@ public Void call() {
123123
logger.info(String.format("Skipping download of file %s to %s, file already exists locally.",
124124
remoteObjectReference.getObjectKey(), manifestEntry.localFile));
125125
// if it exists, verify its hash to be sure it was not altered
126-
new HashServiceImpl(hashSpec).verify(localPath, manifestEntry.hash);
126+
hashService.verify(localPath, manifestEntry.hash);
127127
state = FINISHED;
128128
} else {
129129
// if it exists and manifest does not have hash field, consider it to be finished without any check

src/test/java/com/instaclustr/esop/backup/BackupRestoreTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import com.instaclustr.cassandra.CassandraVersion;
1818
import com.instaclustr.esop.impl.ManifestEntry;
1919
import com.instaclustr.esop.impl.SSTableUtils;
20-
import com.instaclustr.esop.impl.hash.HashSpec;
2120
import org.testng.annotations.AfterClass;
2221
import org.testng.annotations.BeforeClass;
2322
import org.testng.annotations.BeforeTest;
@@ -85,11 +84,11 @@ public void testSSTableLister() throws Exception {
8584
final String keyspace = "keyspace1";
8685
final String table1 = "table1";
8786
final Path table1Path = tempDirs.get(testFileConfig.cassandraVersion.toString()).resolve("data/" + keyspace + "/" + table1);
88-
Map<String, List<ManifestEntry>> sstables = SSTableUtils.getSSTables(keyspace, table1, table1Path, backupRoot.resolve(table1Path.getFileName()), new HashSpec());
87+
Map<String, List<ManifestEntry>> sstables = SSTableUtils.getSSTables(keyspace, table1, table1Path, backupRoot.resolve(table1Path.getFileName()));
8988

9089
final String table2 = "table2";
9190
final Path table2Path = tempDirs.get(testFileConfig.cassandraVersion.toString()).resolve("data/" + keyspace + "/" + table2);
92-
sstables.putAll(SSTableUtils.getSSTables(keyspace, table2, table2Path, backupRoot.resolve(table2Path.getFileName()), new HashSpec()));
91+
sstables.putAll(SSTableUtils.getSSTables(keyspace, table2, table2Path, backupRoot.resolve(table2Path.getFileName())));
9392

9493
Map<Path, Path> manifestMap = new HashMap<>();
9594
for (ManifestEntry e : sstables.values().stream().flatMap(Collection::stream).collect(Collectors.toList())) {

src/test/java/com/instaclustr/esop/backup/embedded/UploadTrackerTest.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import com.instaclustr.esop.impl.backup.UploadTracker;
3838
import com.instaclustr.esop.impl.backup.coordination.ClearSnapshotOperation;
3939
import com.instaclustr.esop.impl.backup.coordination.TakeSnapshotOperation;
40+
import com.instaclustr.esop.impl.hash.HashService;
41+
import com.instaclustr.esop.impl.hash.HashServiceImpl;
4042
import com.instaclustr.esop.impl.hash.HashSpec;
4143
import com.instaclustr.esop.local.LocalFileBackuper;
4244
import com.instaclustr.esop.local.LocalFileModule;
@@ -122,15 +124,15 @@ public void testUploadTracker() throws Exception {
122124

123125
final ListeningExecutorService finisher = new Executors.FixedTasksExecutorSupplier().get(10);
124126

125-
uploadTracker = new UploadTracker(finisher, operationsService, new HashSpec()) {
127+
uploadTracker = new UploadTracker(finisher, operationsService, new HashServiceImpl(new HashSpec())) {
126128
// override for testing purposes
127129
@Override
128130
public UploadUnit constructUnitToSubmit(final Backuper backuper,
129131
final ManifestEntry manifestEntry,
130132
final AtomicBoolean shouldCancel,
131133
final String snapshotTag,
132-
final HashSpec hashSpec) {
133-
return new TestingUploadUnit(wait, backuper, manifestEntry, shouldCancel, snapshotTag, hashSpec);
134+
final HashService hashService) {
135+
return new TestingUploadUnit(wait, backuper, manifestEntry, shouldCancel, snapshotTag, hashService);
134136
}
135137
};
136138

@@ -277,8 +279,8 @@ public TestingUploadUnit(final AtomicBoolean wait,
277279
final ManifestEntry manifestEntry,
278280
final AtomicBoolean shouldCancel,
279281
final String snapshotTag,
280-
final HashSpec hashSpec) {
281-
super(backuper, manifestEntry, shouldCancel, snapshotTag, hashSpec);
282+
final HashService hashService) {
283+
super(backuper, manifestEntry, shouldCancel, snapshotTag, hashService);
282284
this.wait = wait;
283285
}
284286

0 commit comments

Comments
 (0)