From b5b0a23e1651dc06076143c2ca1993314991ab45 Mon Sep 17 00:00:00 2001 From: Jian Date: Mon, 26 Jan 2026 20:52:37 +0800 Subject: [PATCH 01/12] KAFKA-19952: Don't upload to remote if the segment had already expired according to remote retention configuration Signed-off-by: Jian --- .../log/remote/storage/RemoteLogManager.java | 63 +++++++++++++++---- 1 file changed, 50 insertions(+), 13 deletions(-) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java index 443f955a5fb63..d72e7dca2ddc6 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java @@ -76,6 +76,7 @@ import org.apache.kafka.storage.internals.log.RemoteStorageThreadPool; import org.apache.kafka.storage.internals.log.TransactionIndex; import org.apache.kafka.storage.internals.log.TxnIndexSearchResult; +import org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason; import org.apache.kafka.storage.internals.log.UnifiedLog; import org.apache.kafka.storage.log.metrics.BrokerTopicStats; @@ -907,28 +908,64 @@ private void maybeUpdateCopiedOffset(UnifiedLog log) throws RemoteStorageExcepti } } + /** + * Check if segment is already expired based on global retention time. + */ + private boolean isSegmentExpiredByTime(LogSegment segment, UnifiedLog log) throws IOException { + long retentionMs = log.config().retentionMs; + if (retentionMs <= 0) { + return false; + } + return time.milliseconds() - segment.largestTimestamp() >= retentionMs; + } + + /** + * Check if segment should be skipped due to global retention size. 
+ */ + private boolean isSegmentExpiredBySize(LogSegment segment, UnifiedLog log, long accumulatedSkippedSize) { + long retentionBytes = log.config().retentionSize; + if (retentionBytes <= 0) { + return false; + } + long excessSize = log.size() - retentionBytes; + return excessSize - accumulatedSkippedSize >= segment.size(); + } + /** * Segments which match the following criteria are eligible for copying to remote storage: * 1) Segment is not the active segment and * 2) Segment end-offset is less than the last-stable-offset as remote storage should contain only * committed/acked messages - * @param log The log from which the segments are to be copied - * @param fromOffset The offset from which the segments are to be copied - * @param lastStableOffset The last stable offset of the log - * @return candidate log segments to be copied to remote storage + * + * Additionally, if a segment is already expired globally (based on retention.ms or retention.bytes), + * it will be skipped from upload and logStartOffset will be updated to allow local deletion. 
*/ - List candidateLogSegments(UnifiedLog log, Long fromOffset, Long lastStableOffset) { + List candidateLogSegments(UnifiedLog log, Long fromOffset, Long lastStableOffset) throws IOException { List candidateLogSegments = new ArrayList<>(); List segments = log.logSegments(fromOffset, Long.MAX_VALUE); - if (!segments.isEmpty()) { - for (int idx = 1; idx < segments.size(); idx++) { - LogSegment previousSeg = segments.get(idx - 1); - LogSegment currentSeg = segments.get(idx); - if (currentSeg.baseOffset() <= lastStableOffset) { - candidateLogSegments.add(new EnrichedLogSegment(previousSeg, currentSeg.baseOffset())); - } + if (segments.isEmpty()) { + return candidateLogSegments; + } + + long accumulatedSkippedSize = 0; + for (int idx = 1; idx < segments.size(); idx++) { + LogSegment previousSeg = segments.get(idx - 1); + LogSegment currentSeg = segments.get(idx); + if (currentSeg.baseOffset() > lastStableOffset) { + continue; + } + + if (isSegmentExpiredByTime(previousSeg, log) || + isSegmentExpiredBySize(previousSeg, log, accumulatedSkippedSize)) { + long newLogStartOffset = currentSeg.baseOffset(); + log.maybeIncrementLogStartOffset(newLogStartOffset, LogStartOffsetIncrementReason.SegmentDeletion); + logger.info("Segment {} is already expired globally. 
Skipping upload and incrementing logStartOffset to {} to allow local deletion.", + previousSeg, newLogStartOffset); + accumulatedSkippedSize += previousSeg.size(); + continue; } - // Discard the last active segment + + candidateLogSegments.add(new EnrichedLogSegment(previousSeg, currentSeg.baseOffset())); } return candidateLogSegments; } From 15d5568699887fee1d24061f8dd47494178d6cd3 Mon Sep 17 00:00:00 2001 From: Jian Date: Mon, 26 Jan 2026 20:55:44 +0800 Subject: [PATCH 02/12] KAFKA-19952: Fix code style Signed-off-by: Jian --- .../kafka/server/log/remote/storage/RemoteLogManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java index d72e7dca2ddc6..806a6f0ab5c0b 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java @@ -67,6 +67,7 @@ import org.apache.kafka.storage.internals.log.FetchDataInfo; import org.apache.kafka.storage.internals.log.LogOffsetMetadata; import org.apache.kafka.storage.internals.log.LogSegment; +import org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason; import org.apache.kafka.storage.internals.log.OffsetIndex; import org.apache.kafka.storage.internals.log.OffsetPosition; import org.apache.kafka.storage.internals.log.OffsetResultHolder; @@ -76,7 +77,6 @@ import org.apache.kafka.storage.internals.log.RemoteStorageThreadPool; import org.apache.kafka.storage.internals.log.TransactionIndex; import org.apache.kafka.storage.internals.log.TxnIndexSearchResult; -import org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason; import org.apache.kafka.storage.internals.log.UnifiedLog; import org.apache.kafka.storage.log.metrics.BrokerTopicStats; From 2bfb775cd1202e53ff7d85fb0e64a11921791471 Mon 
Sep 17 00:00:00 2001 From: Jian Date: Mon, 26 Jan 2026 21:11:02 +0800 Subject: [PATCH 03/12] KAFKA-19952: Rollback the java doc Signed-off-by: Jian --- .../kafka/server/log/remote/storage/RemoteLogManager.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java index 806a6f0ab5c0b..995abbf0413b8 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java @@ -939,6 +939,11 @@ private boolean isSegmentExpiredBySize(LogSegment segment, UnifiedLog log, long * * Additionally, if a segment is already expired globally (based on retention.ms or retention.bytes), * it will be skipped from upload and logStartOffset will be updated to allow local deletion. + * + * @param log The log from which the segments are to be copied + * @param fromOffset The offset from which the segments are to be copied + * @param lastStableOffset The last stable offset of the log + * @return candidate log segments to be copied to remote storage */ List candidateLogSegments(UnifiedLog log, Long fromOffset, Long lastStableOffset) throws IOException { List candidateLogSegments = new ArrayList<>(); From 93ebe5a137be21de504e48573120b6f7d9174555 Mon Sep 17 00:00:00 2001 From: Jian Date: Mon, 26 Jan 2026 22:40:20 +0800 Subject: [PATCH 04/12] KAFKA-19952: Fix the unit test Signed-off-by: Jian --- .../kafka/server/log/remote/storage/RemoteLogManagerTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java index c8efe9884d77b..4b1c0d85b023f 100644 --- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java @@ -2113,7 +2113,7 @@ public void testRemoteSegmentWithinLeaderEpochsForOverlappingSegments() { } @Test - public void testCandidateLogSegmentsSkipsActiveSegment() { + public void testCandidateLogSegmentsSkipsActiveSegment() throws IOException { UnifiedLog log = mock(UnifiedLog.class); LogSegment segment1 = mock(LogSegment.class); LogSegment segment2 = mock(LogSegment.class); @@ -2136,7 +2136,7 @@ public void testCandidateLogSegmentsSkipsActiveSegment() { } @Test - public void testCandidateLogSegmentsSkipsSegmentsAfterLastStableOffset() { + public void testCandidateLogSegmentsSkipsSegmentsAfterLastStableOffset() throws IOException { UnifiedLog log = mock(UnifiedLog.class); LogSegment segment1 = mock(LogSegment.class); LogSegment segment2 = mock(LogSegment.class); From a852feab3bda2a3f5d63b93737ef232a3aa300b8 Mon Sep 17 00:00:00 2001 From: Jian Date: Tue, 27 Jan 2026 16:27:35 +0800 Subject: [PATCH 05/12] KAFKA-19952: refactor the naming Signed-off-by: Jian --- .../log/remote/storage/RemoteLogManager.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java index 995abbf0413b8..0f49d9c163d0c 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java @@ -909,26 +909,26 @@ private void maybeUpdateCopiedOffset(UnifiedLog log) throws RemoteStorageExcepti } /** - * Check if segment is already expired based on global retention time. + * Check if segment has already expired based on remote storage's retention time. 
*/ - private boolean isSegmentExpiredByTime(LogSegment segment, UnifiedLog log) throws IOException { + private boolean isSegmentExpiredByTimeForRemoteStorage(LogSegment segment, UnifiedLog log) throws IOException { long retentionMs = log.config().retentionMs; if (retentionMs <= 0) { return false; } - return time.milliseconds() - segment.largestTimestamp() >= retentionMs; + return time.milliseconds() - segment.largestTimestamp() > retentionMs; } /** - * Check if segment should be skipped due to global retention size. + * Check if segment has already expired based on remote storage‘s retention size. */ - private boolean isSegmentExpiredBySize(LogSegment segment, UnifiedLog log, long accumulatedSkippedSize) { + private boolean isSegmentExpiredBySizeForRemoteStorage(LogSegment segment, UnifiedLog log, long accumulatedSkippedSize) { long retentionBytes = log.config().retentionSize; if (retentionBytes <= 0) { return false; } long excessSize = log.size() - retentionBytes; - return excessSize - accumulatedSkippedSize >= segment.size(); + return excessSize - accumulatedSkippedSize > segment.size(); } /** @@ -937,7 +937,7 @@ private boolean isSegmentExpiredBySize(LogSegment segment, UnifiedLog log, long * 2) Segment end-offset is less than the last-stable-offset as remote storage should contain only * committed/acked messages * - * Additionally, if a segment is already expired globally (based on retention.ms or retention.bytes), + * Additionally, if a segment has already expired based on remote storage's retention configuration, * it will be skipped from upload and logStartOffset will be updated to allow local deletion. 
* * @param log The log from which the segments are to be copied @@ -960,11 +960,11 @@ List candidateLogSegments(UnifiedLog log, Long fromOffset, L continue; } - if (isSegmentExpiredByTime(previousSeg, log) || - isSegmentExpiredBySize(previousSeg, log, accumulatedSkippedSize)) { + if (isSegmentExpiredByTimeForRemoteStorage(previousSeg, log) || + isSegmentExpiredBySizeForRemoteStorage(previousSeg, log, accumulatedSkippedSize)) { long newLogStartOffset = currentSeg.baseOffset(); log.maybeIncrementLogStartOffset(newLogStartOffset, LogStartOffsetIncrementReason.SegmentDeletion); - logger.info("Segment {} is already expired globally. Skipping upload and incrementing logStartOffset to {} to allow local deletion.", + logger.info("Segment {} has already expired based on remote storage retention configuration. Skipping upload and incrementing logStartOffset to {} to allow local deletion.", previousSeg, newLogStartOffset); accumulatedSkippedSize += previousSeg.size(); continue; From 22d2ffd2351582c376ca1ff2178a69efde25fcb8 Mon Sep 17 00:00:00 2001 From: Jian Date: Tue, 27 Jan 2026 17:00:28 +0800 Subject: [PATCH 06/12] KAFKA-19952: improve the java doc Signed-off-by: Jian --- .../kafka/server/log/remote/storage/RemoteLogManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java index 0f49d9c163d0c..85721b2990de7 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java @@ -964,7 +964,7 @@ List candidateLogSegments(UnifiedLog log, Long fromOffset, L isSegmentExpiredBySizeForRemoteStorage(previousSeg, log, accumulatedSkippedSize)) { long newLogStartOffset = currentSeg.baseOffset(); log.maybeIncrementLogStartOffset(newLogStartOffset, 
LogStartOffsetIncrementReason.SegmentDeletion); - logger.info("Segment {} has already expired based on remote storage retention configuration. Skipping upload and incrementing logStartOffset to {} to allow local deletion.", + logger.info("Segment {} has already expired based on remote storage's retention configuration. Skipping upload and incrementing logStartOffset to {} to allow local deletion.", previousSeg, newLogStartOffset); accumulatedSkippedSize += previousSeg.size(); continue; From e93b0fe829f0422aff016373c2a2541d073e5217 Mon Sep 17 00:00:00 2001 From: Jian Date: Tue, 27 Jan 2026 17:06:25 +0800 Subject: [PATCH 07/12] KAFKA-19952: add reason for segment deleting Signed-off-by: Jian --- .../kafka/server/log/remote/storage/RemoteLogManager.java | 2 +- .../storage/internals/log/LogStartOffsetIncrementReason.java | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java index 85721b2990de7..264f6af5ba9e2 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java @@ -963,7 +963,7 @@ List candidateLogSegments(UnifiedLog log, Long fromOffset, L if (isSegmentExpiredByTimeForRemoteStorage(previousSeg, log) || isSegmentExpiredBySizeForRemoteStorage(previousSeg, log, accumulatedSkippedSize)) { long newLogStartOffset = currentSeg.baseOffset(); - log.maybeIncrementLogStartOffset(newLogStartOffset, LogStartOffsetIncrementReason.SegmentDeletion); + log.maybeIncrementLogStartOffset(newLogStartOffset, LogStartOffsetIncrementReason.SegmentExpired); logger.info("Segment {} has already expired based on remote storage's retention configuration. 
Skipping upload and incrementing logStartOffset to {} to allow local deletion.", previousSeg, newLogStartOffset); accumulatedSkippedSize += previousSeg.size(); diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogStartOffsetIncrementReason.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogStartOffsetIncrementReason.java index 3c9881d0f94d2..c2cb2264671b2 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogStartOffsetIncrementReason.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogStartOffsetIncrementReason.java @@ -20,6 +20,7 @@ public enum LogStartOffsetIncrementReason { LeaderOffsetIncremented("leader offset increment"), SegmentDeletion("segment deletion"), + SegmentExpired("segment expired"), ClientRecordDeletion("client delete records request"), SnapshotGenerated("snapshot generated"); From 32e56d2a95db14e40aea28e16d045b705530ce96 Mon Sep 17 00:00:00 2001 From: Jian Date: Tue, 27 Jan 2026 17:35:11 +0800 Subject: [PATCH 08/12] KAFKA-19952: improvement Signed-off-by: Jian --- .../kafka/server/log/remote/storage/RemoteLogManager.java | 2 +- .../storage/internals/log/LogStartOffsetIncrementReason.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java index 264f6af5ba9e2..9f1221f87d26a 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java @@ -963,7 +963,7 @@ List candidateLogSegments(UnifiedLog log, Long fromOffset, L if (isSegmentExpiredByTimeForRemoteStorage(previousSeg, log) || isSegmentExpiredBySizeForRemoteStorage(previousSeg, log, accumulatedSkippedSize)) { long newLogStartOffset = currentSeg.baseOffset(); - 
log.maybeIncrementLogStartOffset(newLogStartOffset, LogStartOffsetIncrementReason.SegmentExpired); + log.maybeIncrementLogStartOffset(newLogStartOffset, LogStartOffsetIncrementReason.LocalSegmentExpiredByRemoteRetention); logger.info("Segment {} has already expired based on remote storage's retention configuration. Skipping upload and incrementing logStartOffset to {} to allow local deletion.", previousSeg, newLogStartOffset); accumulatedSkippedSize += previousSeg.size(); diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogStartOffsetIncrementReason.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogStartOffsetIncrementReason.java index c2cb2264671b2..1211ce50da3bd 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogStartOffsetIncrementReason.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogStartOffsetIncrementReason.java @@ -20,7 +20,7 @@ public enum LogStartOffsetIncrementReason { LeaderOffsetIncremented("leader offset increment"), SegmentDeletion("segment deletion"), - SegmentExpired("segment expired"), + LocalSegmentExpiredByRemoteRetention("local segment has already expired based on remote storage retention policy"), ClientRecordDeletion("client delete records request"), SnapshotGenerated("snapshot generated"); From f38e19b965cf4b002330eaa7b96e2361920e1f53 Mon Sep 17 00:00:00 2001 From: Jian Date: Tue, 27 Jan 2026 18:48:06 +0800 Subject: [PATCH 09/12] KAFKA-19952: refactor for performance Signed-off-by: Jian --- .../log/remote/storage/RemoteLogManager.java | 20 +++++++++---------- .../log/LogStartOffsetIncrementReason.java | 2 +- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java index 9f1221f87d26a..18ca37269234b 100644 --- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java @@ -911,8 +911,7 @@ private void maybeUpdateCopiedOffset(UnifiedLog log) throws RemoteStorageExcepti /** * Check if segment has already expired based on remote storage's retention time. */ - private boolean isSegmentExpiredByTimeForRemoteStorage(LogSegment segment, UnifiedLog log) throws IOException { - long retentionMs = log.config().retentionMs; + private boolean isSegmentExpiredByTimeForRemoteStorage(LogSegment segment, long retentionMs) throws IOException { if (retentionMs <= 0) { return false; } @@ -922,13 +921,11 @@ private boolean isSegmentExpiredByTimeForRemoteStorage(LogSegment segment, Unifi /** * Check if segment has already expired based on remote storage‘s retention size. */ - private boolean isSegmentExpiredBySizeForRemoteStorage(LogSegment segment, UnifiedLog log, long accumulatedSkippedSize) { - long retentionBytes = log.config().retentionSize; + private boolean isSegmentExpiredBySizeForRemoteStorage(LogSegment segment, long retentionBytes, long logSize, long accumulatedSkippedSize) { if (retentionBytes <= 0) { return false; } - long excessSize = log.size() - retentionBytes; - return excessSize - accumulatedSkippedSize > segment.size(); + return (logSize - retentionBytes - accumulatedSkippedSize) > segment.size(); } /** @@ -951,7 +948,10 @@ List candidateLogSegments(UnifiedLog log, Long fromOffset, L if (segments.isEmpty()) { return candidateLogSegments; } - + long retentionMs = log.config().retentionMs; + long retentionSize = log.config().retentionSize; + // Compute log.size() once when retention is size-based; skip when not needed to avoid wasted work. + long logSize = retentionSize > 0 ? 
log.size() : -1; long accumulatedSkippedSize = 0; for (int idx = 1; idx < segments.size(); idx++) { LogSegment previousSeg = segments.get(idx - 1); @@ -960,10 +960,10 @@ List candidateLogSegments(UnifiedLog log, Long fromOffset, L continue; } - if (isSegmentExpiredByTimeForRemoteStorage(previousSeg, log) || - isSegmentExpiredBySizeForRemoteStorage(previousSeg, log, accumulatedSkippedSize)) { + if (isSegmentExpiredByTimeForRemoteStorage(previousSeg, retentionMs) || + isSegmentExpiredBySizeForRemoteStorage(previousSeg, retentionSize, logSize, accumulatedSkippedSize)) { long newLogStartOffset = currentSeg.baseOffset(); - log.maybeIncrementLogStartOffset(newLogStartOffset, LogStartOffsetIncrementReason.LocalSegmentExpiredByRemoteRetention); + log.maybeIncrementLogStartOffset(newLogStartOffset, LogStartOffsetIncrementReason.SegmentExpiredByRemoteRetention); logger.info("Segment {} has already expired based on remote storage's retention configuration. Skipping upload and incrementing logStartOffset to {} to allow local deletion.", previousSeg, newLogStartOffset); accumulatedSkippedSize += previousSeg.size(); diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogStartOffsetIncrementReason.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogStartOffsetIncrementReason.java index 1211ce50da3bd..210f04119ca81 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogStartOffsetIncrementReason.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogStartOffsetIncrementReason.java @@ -20,7 +20,7 @@ public enum LogStartOffsetIncrementReason { LeaderOffsetIncremented("leader offset increment"), SegmentDeletion("segment deletion"), - LocalSegmentExpiredByRemoteRetention("local segment has already expired based on remote storage retention policy"), + SegmentExpiredByRemoteRetention("segment has already expired based on remote storage retention policy"), ClientRecordDeletion("client delete records 
request"), SnapshotGenerated("snapshot generated"); From 64cddab2c99e158e6d71e9838ae6a85535700d83 Mon Sep 17 00:00:00 2001 From: Jian Date: Tue, 27 Jan 2026 19:02:49 +0800 Subject: [PATCH 10/12] KAFKA-19952: fix the unit test Signed-off-by: Jian --- .../kafka/server/log/remote/storage/RemoteLogManager.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java index 18ca37269234b..dc49e11ef1ae1 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java @@ -948,8 +948,8 @@ List candidateLogSegments(UnifiedLog log, Long fromOffset, L if (segments.isEmpty()) { return candidateLogSegments; } - long retentionMs = log.config().retentionMs; - long retentionSize = log.config().retentionSize; + long retentionMs = log.config() != null? log.config().retentionMs : -1; + long retentionSize = log.config() != null? log.config().retentionSize : -1; // Compute log.size() once when retention is size-based; skip when not needed to avoid wasted work. long logSize = retentionSize > 0 ? 
log.size() : -1; long accumulatedSkippedSize = 0; From 84b8821ecd9961a0439223b2cd920709f49dce87 Mon Sep 17 00:00:00 2001 From: Jian Date: Tue, 27 Jan 2026 19:12:26 +0800 Subject: [PATCH 11/12] KAFKA-19952: fix the check style Signed-off-by: Jian --- .../kafka/server/log/remote/storage/RemoteLogManager.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java index dc49e11ef1ae1..7d26df0c175ef 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java @@ -948,8 +948,8 @@ List candidateLogSegments(UnifiedLog log, Long fromOffset, L if (segments.isEmpty()) { return candidateLogSegments; } - long retentionMs = log.config() != null? log.config().retentionMs : -1; - long retentionSize = log.config() != null? log.config().retentionSize : -1; + long retentionMs = log.config() != null ? log.config().retentionMs : -1; + long retentionSize = log.config() != null ? log.config().retentionSize : -1; // Compute log.size() once when retention is size-based; skip when not needed to avoid wasted work. long logSize = retentionSize > 0 ? 
log.size() : -1; long accumulatedSkippedSize = 0; From b0299f2e924fabd518c843f727a77475635795ac Mon Sep 17 00:00:00 2001 From: stroller Date: Thu, 29 Jan 2026 15:29:31 +0800 Subject: [PATCH 12/12] fix the issue for time-based retention Signed-off-by: stroller --- .../log/remote/storage/RemoteLogManager.java | 35 +++++++++++++++---- 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java index 7d26df0c175ef..481fa55e2d5b0 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java @@ -936,6 +936,9 @@ private boolean isSegmentExpiredBySizeForRemoteStorage(LogSegment segment, long * * Additionally, if a segment has already expired based on remote storage's retention configuration, * it will be skipped from upload and logStartOffset will be updated to allow local deletion. + * However, we can only skip and advance logStartOffset for a CONTIGUOUS sequence of expired segments + * from the beginning. Once we encounter a non-expired segment, we must stop skipping to ensure + * data consistency (similar to local segment deletion logic). * * @param log The log from which the segments are to be copied * @param fromOffset The offset from which the segments are to be copied @@ -953,6 +956,11 @@ List candidateLogSegments(UnifiedLog log, Long fromOffset, L // Compute log.size() once when retention is size-based; skip when not needed to avoid wasted work. long logSize = retentionSize > 0 ? log.size() : -1; long accumulatedSkippedSize = 0; + // Flag to track if we can still skip expired segments. + // We can only skip contiguous expired segments from the beginning. + // Once we encounter a non-expired segment, we must stop skipping. 
+ boolean canSkipExpiredSegments = true; + for (int idx = 1; idx < segments.size(); idx++) { LogSegment previousSeg = segments.get(idx - 1); LogSegment currentSeg = segments.get(idx); @@ -960,16 +968,29 @@ List candidateLogSegments(UnifiedLog log, Long fromOffset, L continue; } - if (isSegmentExpiredByTimeForRemoteStorage(previousSeg, retentionMs) || - isSegmentExpiredBySizeForRemoteStorage(previousSeg, retentionSize, logSize, accumulatedSkippedSize)) { - long newLogStartOffset = currentSeg.baseOffset(); - log.maybeIncrementLogStartOffset(newLogStartOffset, LogStartOffsetIncrementReason.SegmentExpiredByRemoteRetention); - logger.info("Segment {} has already expired based on remote storage's retention configuration. Skipping upload and incrementing logStartOffset to {} to allow local deletion.", - previousSeg, newLogStartOffset); - accumulatedSkippedSize += previousSeg.size(); + boolean isExpired = isSegmentExpiredByTimeForRemoteStorage(previousSeg, retentionMs) || + isSegmentExpiredBySizeForRemoteStorage(previousSeg, retentionSize, logSize, accumulatedSkippedSize); + + if (isExpired) { + if (canSkipExpiredSegments) { + // Can skip and advance logStartOffset for contiguous expired segments at the beginning + long newLogStartOffset = currentSeg.baseOffset(); + log.maybeIncrementLogStartOffset(newLogStartOffset, LogStartOffsetIncrementReason.SegmentExpiredByRemoteRetention); + logger.info("Segment {} has already expired based on remote storage's retention configuration. Skipping upload and incrementing logStartOffset to {} to allow local deletion.", + previousSeg, newLogStartOffset); + accumulatedSkippedSize += previousSeg.size(); + } else { + // Expired segment after a non-expired segment - cannot skip. + // Add to candidates to maintain order. Will be handled after earlier segments are uploaded. 
+ logger.debug("Segment {} is expired but must be processed because earlier non-expired segments are pending upload.", + previousSeg); + candidateLogSegments.add(new EnrichedLogSegment(previousSeg, currentSeg.baseOffset())); + } continue; } + // Found a non-expired segment - stop allowing skip-and-advance for subsequent expired segments + canSkipExpiredSegments = false; candidateLogSegments.add(new EnrichedLogSegment(previousSeg, currentSeg.baseOffset())); } return candidateLogSegments;