From c2b8f6607b53ba6dd8feb9e43b90ccf3ba256f20 Mon Sep 17 00:00:00 2001 From: Blake Eggleston Date: Wed, 22 Oct 2025 16:55:54 -0700 Subject: [PATCH 1/4] Replace blocking wait with non-blocking delay in paxos repair --- .../cleanup/PaxosCleanupLocalCoordinator.java | 71 ++++++++++++++++--- 1 file changed, 62 insertions(+), 9 deletions(-) diff --git a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupLocalCoordinator.java b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupLocalCoordinator.java index 7e5935f03d4a..fe9677faa792 100644 --- a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupLocalCoordinator.java +++ b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupLocalCoordinator.java @@ -20,14 +20,17 @@ import java.util.Collection; import java.util.Map; +import java.util.Queue; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.Uninterruptibles; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.DecoratedKey; @@ -58,6 +61,18 @@ public class PaxosCleanupLocalCoordinator extends AsyncFuture inflight = new ConcurrentHashMap<>(); + private final Queue delayed = new LinkedBlockingQueue<>(); private final PaxosTableRepairs tableRepairs; private PaxosCleanupLocalCoordinator(SharedContext ctx, UUID session, TableId tableId, Collection> ranges, CloseableIterator uncommittedIter, boolean autoRepair) @@ -125,6 +141,31 @@ public static PaxosCleanupLocalCoordinator createForAutoRepair(SharedContext ctx return new PaxosCleanupLocalCoordinator(ctx, INTERNAL_SESSION, tableId, ranges, iterator, true); } + private boolean maybeDelay(UncommittedPaxosKey uncommitted) + { + if (!DatabaseDescriptor.getPaxosRepairRaceWait()) + return false; + + + long txnTimeoutMillis = Math.max(getCasContentionTimeout(MILLISECONDS), getWriteRpcTimeout(MILLISECONDS)); + long nowMillis = Clock.Global.currentTimeMillis(); + long ballotElapsedMillis = nowMillis - MICROSECONDS.toMillis(uncommitted.ballot().unixMicros()); + + if (ballotElapsedMillis < 0 && Math.abs(ballotElapsedMillis) > SECONDS.toMillis(1)) + logger.warn("Encountered ballot that is more than 1 second in the future, is there a clock sync issue? {}", uncommitted.ballot()); + + if (ballotElapsedMillis >= txnTimeoutMillis) + return false; + + long sleepMillis = txnTimeoutMillis - ballotElapsedMillis; + logger.info("Paxos auto repair encountered a potentially in progress ballot, sleeping {}ms to allow the in flight operation to finish", sleepMillis); + + delayed.add(new DelayedRepair(uncommitted, nowMillis + sleepMillis)); + ScheduledExecutors.scheduledFastTasks.schedule(this::scheduleKeyRepairsOrFinish, sleepMillis, MILLISECONDS); + + return true; + } + /** * Schedule as many key repairs as we can, up to the paralellism limit. If no repairs are scheduled and * none are in flight when the iterator is exhausted, the session will be finished @@ -141,18 +182,33 @@ private void scheduleKeyRepairsOrFinish() return; } - long txnTimeoutMicros = Math.max(getCasContentionTimeout(MICROSECONDS), getWriteRpcTimeout(MICROSECONDS)); - boolean waitForCoordinator = DatabaseDescriptor.getPaxosRepairRaceWait(); - while (inflight.size() < parallelism && uncommittedIter.hasNext()) - repairKey(uncommittedIter.next(), txnTimeoutMicros, waitForCoordinator); - + while (inflight.size() < parallelism && !isDone()) + { + if (!delayed.isEmpty() && delayed.peek().startAfterMillis < Clock.Global.currentTimeMillis()) + { + DelayedRepair delayedRepair = delayed.remove(); + repairKey(delayedRepair.uncommitted); + } + else if (uncommittedIter.hasNext()) + { + UncommittedPaxosKey uncommitted = uncommittedIter.next(); + if (!maybeDelay(uncommitted)) + { + repairKey(uncommitted); + } + } + else + { + break; + } + } } if (inflight.isEmpty()) finish(); } - private boolean repairKey(UncommittedPaxosKey uncommitted, long txnTimeoutMicros, boolean waitForCoordinator) + private boolean repairKey(UncommittedPaxosKey uncommitted) { logger.trace("repairing {}", uncommitted); Preconditions.checkState(!inflight.containsKey(uncommitted.getKey())); @@ -163,9 +219,6 @@ private boolean repairKey(UncommittedPaxosKey uncommitted, long txnTimeoutMicros if (consistency == null) return false; - if (waitForCoordinator) - maybeWaitForOriginalCoordinator(uncommitted, txnTimeoutMicros); - inflight.put(uncommitted.getKey(), tableRepairs.startOrGetOrQueue(uncommitted.getKey(), uncommitted.ballot(), uncommitted.getConsistencyLevel(), table, result -> { if (result.wasSuccessful()) onKeyFinish(uncommitted.getKey()); From 63ebcd7fd2b1d849cea3fb055270bc8daa1df6ac Mon Sep 17 00:00:00 2001 From: Blake Eggleston Date: Thu, 23 Oct 2025 08:54:29 -0700 Subject: [PATCH 2/4] review feedback --- .../cleanup/PaxosCleanupLocalCoordinator.java | 40 +++++++------------ 1 file changed, 15 insertions(+), 25 deletions(-) diff --git a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupLocalCoordinator.java b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupLocalCoordinator.java index fe9677faa792..63b3712b2df5 100644 --- a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupLocalCoordinator.java +++ b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupLocalCoordinator.java @@ -26,7 +26,6 @@ import java.util.concurrent.LinkedBlockingQueue; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.Uninterruptibles; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,12 +63,17 @@ public class PaxosCleanupLocalCoordinator extends AsyncFuture 0; } } @@ -141,6 +145,10 @@ public static PaxosCleanupLocalCoordinator createForAutoRepair(SharedContext ctx return new PaxosCleanupLocalCoordinator(ctx, INTERNAL_SESSION, tableId, ranges, iterator, true); } + /** + * Wait to repair things that are still potentially executing at the original coordinator to avoid + * causing timeouts. This should only have to happen at most a few times when the repair starts + */ private boolean maybeDelay(UncommittedPaxosKey uncommitted) { if (!DatabaseDescriptor.getPaxosRepairRaceWait()) @@ -160,7 +168,7 @@ private boolean maybeDelay(UncommittedPaxosKey uncommitted) long sleepMillis = txnTimeoutMillis - ballotElapsedMillis; logger.info("Paxos auto repair encountered a potentially in progress ballot, sleeping {}ms to allow the in flight operation to finish", sleepMillis); - delayed.add(new DelayedRepair(uncommitted, nowMillis + sleepMillis)); + delayed.add(new DelayedRepair(uncommitted, sleepMillis)); ScheduledExecutors.scheduledFastTasks.schedule(this::scheduleKeyRepairsOrFinish, sleepMillis, MILLISECONDS); return true; @@ -170,7 +178,7 @@ private boolean maybeDelay(UncommittedPaxosKey uncommitted) * Schedule as many key repairs as we can, up to the paralellism limit. If no repairs are scheduled and * none are in flight when the iterator is exhausted, the session will be finished */ - private void scheduleKeyRepairsOrFinish() + private synchronized void scheduleKeyRepairsOrFinish() { int parallelism = DatabaseDescriptor.getPaxosRepairParallelism(); Preconditions.checkArgument(parallelism > 0); @@ -184,7 +192,7 @@ private void scheduleKeyRepairsOrFinish() while (inflight.size() < parallelism && !isDone()) { - if (!delayed.isEmpty() && delayed.peek().startAfterMillis < Clock.Global.currentTimeMillis()) + if (!delayed.isEmpty() && delayed.peek().isRunnable()) { DelayedRepair delayedRepair = delayed.remove(); repairKey(delayedRepair.uncommitted); @@ -228,24 +236,6 @@ private boolean repairKey(UncommittedPaxosKey uncommitted) return true; } - /** - * Wait to repair things that are still potentially executing at the original coordinator to avoid - * causing timeouts. This should only have to happen at most a few times when the repair starts - */ - private static void maybeWaitForOriginalCoordinator(UncommittedPaxosKey uncommitted, long txnTimeoutMicros) - { - long nowMicros = MILLISECONDS.toMicros(Clock.Global.currentTimeMillis()); - long ballotElapsedMicros = nowMicros - uncommitted.ballot().unixMicros(); - if (ballotElapsedMicros < 0 && Math.abs(ballotElapsedMicros) > SECONDS.toMicros(1)) - logger.warn("Encountered ballot that is more than 1 second in the future, is there a clock sync issue? {}", uncommitted.ballot()); - if (ballotElapsedMicros < txnTimeoutMicros) - { - long sleepMicros = txnTimeoutMicros - ballotElapsedMicros; - logger.info("Paxos auto repair encountered a potentially in progress ballot, sleeping {}us to allow the in flight operation to finish", sleepMicros); - Uninterruptibles.sleepUninterruptibly(sleepMicros, MICROSECONDS); - } - } - private synchronized void onKeyFinish(DecoratedKey key) { if (!inflight.containsKey(key)) From bfd6d1e55408c6e70ff38250f4c78bdc165960de Mon Sep 17 00:00:00 2001 From: Blake Eggleston Date: Thu, 23 Oct 2025 09:12:27 -0700 Subject: [PATCH 3/4] fix completion check --- .../service/paxos/cleanup/PaxosCleanupLocalCoordinator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupLocalCoordinator.java b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupLocalCoordinator.java index 63b3712b2df5..8c4ff3ec2067 100644 --- a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupLocalCoordinator.java +++ b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupLocalCoordinator.java @@ -212,7 +212,7 @@ else if (uncommittedIter.hasNext()) } } - if (inflight.isEmpty()) + if (inflight.isEmpty() && delayed.isEmpty()) finish(); } From a47841b4ef977f0e7ac23ba60359fbf91e670980 Mon Sep 17 00:00:00 2001 From: Blake Eggleston Date: Thu, 23 Oct 2025 09:12:33 -0700 Subject: [PATCH 4/4] use priority queue --- .../cleanup/PaxosCleanupLocalCoordinator.java | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupLocalCoordinator.java b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupLocalCoordinator.java index 8c4ff3ec2067..62124a5a20c9 100644 --- a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupLocalCoordinator.java +++ b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupLocalCoordinator.java @@ -19,11 +19,12 @@ package org.apache.cassandra.service.paxos.cleanup; import java.util.Collection; +import java.util.Comparator; import java.util.Map; +import java.util.PriorityQueue; import java.util.Queue; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; import com.google.common.base.Preconditions; import org.slf4j.Logger; @@ -65,6 +66,20 @@ private static class DelayedRepair private final UncommittedPaxosKey uncommitted; private final long scheduledAtNanos; + private static Comparator PRIORITY_COMPARATOR = new Comparator() + { + @Override + public int compare(DelayedRepair o1, DelayedRepair o2) + { + long delta = o1.scheduledAtNanos - o2.scheduledAtNanos; + if (delta > 0) + return 1; + if (delta < 0) + return -1; + return 0; + } + }; + public DelayedRepair(UncommittedPaxosKey uncommitted, long sleepMillis) { this.uncommitted = uncommitted; @@ -88,7 +103,7 @@ public boolean isRunnable() private final boolean autoRepair; private final Map inflight = new ConcurrentHashMap<>(); - private final Queue delayed = new LinkedBlockingQueue<>(); + private final Queue delayed = new PriorityQueue<>(DelayedRepair.PRIORITY_COMPARATOR); private final PaxosTableRepairs tableRepairs; private PaxosCleanupLocalCoordinator(SharedContext ctx, UUID session, TableId tableId, Collection> ranges, CloseableIterator uncommittedIter, boolean autoRepair)