diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
index 443f955a5fb63..481fa55e2d5b0 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
@@ -67,6 +67,7 @@
 import org.apache.kafka.storage.internals.log.FetchDataInfo;
 import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
 import org.apache.kafka.storage.internals.log.LogSegment;
+import org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason;
 import org.apache.kafka.storage.internals.log.OffsetIndex;
 import org.apache.kafka.storage.internals.log.OffsetPosition;
 import org.apache.kafka.storage.internals.log.OffsetResultHolder;
@@ -907,28 +908,90 @@ private void maybeUpdateCopiedOffset(UnifiedLog log) throws RemoteStorageExcepti
         }
     }
 
+    /**
+     * Checks whether the segment has already expired based on remote storage's retention time.
+     */
+    private boolean isSegmentExpiredByTimeForRemoteStorage(LogSegment segment, long retentionMs) throws IOException {
+        if (retentionMs <= 0) {
+            return false;
+        }
+        return time.milliseconds() - segment.largestTimestamp() > retentionMs;
+    }
+
+    /**
+     * Checks whether the segment has already expired based on remote storage's retention size.
+     */
+    private boolean isSegmentExpiredBySizeForRemoteStorage(LogSegment segment, long retentionBytes, long logSize, long accumulatedSkippedSize) {
+        if (retentionBytes <= 0) {
+            return false;
+        }
+        return (logSize - retentionBytes - accumulatedSkippedSize) > segment.size();
+    }
+
     /**
      * Segments which match the following criteria are eligible for copying to remote storage:
      * 1) Segment is not the active segment and
      * 2) Segment end-offset is less than the last-stable-offset as remote storage should contain only
      *    committed/acked messages
+     *
+     * Additionally, if a segment has already expired based on remote storage's retention configuration,
+     * it is skipped for upload and the logStartOffset is advanced to allow local deletion.
+     * However, we can only skip and advance the logStartOffset for a CONTIGUOUS sequence of expired
+     * segments from the beginning. Once we encounter a non-expired segment, we must stop skipping to
+     * ensure data consistency (similar to the local segment deletion logic).
+     *
      * @param log The log from which the segments are to be copied
      * @param fromOffset The offset from which the segments are to be copied
      * @param lastStableOffset The last stable offset of the log
      * @return candidate log segments to be copied to remote storage
      */
-    List<EnrichedLogSegment> candidateLogSegments(UnifiedLog log, Long fromOffset, Long lastStableOffset) {
+    List<EnrichedLogSegment> candidateLogSegments(UnifiedLog log, Long fromOffset, Long lastStableOffset) throws IOException {
         List<EnrichedLogSegment> candidateLogSegments = new ArrayList<>();
         List<LogSegment> segments = log.logSegments(fromOffset, Long.MAX_VALUE);
-        if (!segments.isEmpty()) {
-            for (int idx = 1; idx < segments.size(); idx++) {
-                LogSegment previousSeg = segments.get(idx - 1);
-                LogSegment currentSeg = segments.get(idx);
-                if (currentSeg.baseOffset() <= lastStableOffset) {
+        if (segments.isEmpty()) {
+            return candidateLogSegments;
+        }
+        long retentionMs = log.config() != null ? log.config().retentionMs : -1;
+        long retentionSize = log.config() != null ? log.config().retentionSize : -1;
+        // Compute log.size() once when retention is size-based; skip it otherwise to avoid wasted work.
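+        // (logSize stays -1 when size-based retention is disabled; that sentinel is never read,
+        // because isSegmentExpiredBySizeForRemoteStorage() short-circuits on retentionBytes <= 0.)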
+        long logSize = retentionSize > 0 ? log.size() : -1;
+        long accumulatedSkippedSize = 0;
+        // Flag to track whether we can still skip expired segments.
+        // We can only skip contiguous expired segments from the beginning.
+        // Once we encounter a non-expired segment, we must stop skipping.
+        boolean canSkipExpiredSegments = true;
+
+        for (int idx = 1; idx < segments.size(); idx++) {
+            LogSegment previousSeg = segments.get(idx - 1);
+            LogSegment currentSeg = segments.get(idx);
+            if (currentSeg.baseOffset() > lastStableOffset) {
+                continue;
+            }
+
+            boolean isExpired = isSegmentExpiredByTimeForRemoteStorage(previousSeg, retentionMs) ||
+                    isSegmentExpiredBySizeForRemoteStorage(previousSeg, retentionSize, logSize, accumulatedSkippedSize);
+
+            if (isExpired) {
+                if (canSkipExpiredSegments) {
+                    // Skip and advance the logStartOffset for contiguous expired segments at the beginning.
+                    long newLogStartOffset = currentSeg.baseOffset();
+                    log.maybeIncrementLogStartOffset(newLogStartOffset, LogStartOffsetIncrementReason.SegmentExpiredByRemoteRetention);
+                    logger.info("Segment {} has already expired based on remote storage's retention configuration. Skipping upload and incrementing logStartOffset to {} to allow local deletion.",
+                            previousSeg, newLogStartOffset);
+                    accumulatedSkippedSize += previousSeg.size();
+                } else {
+                    // An expired segment that follows a non-expired one cannot be skipped.
+                    // Add it to the candidates to maintain order; it is handled once the earlier segments are uploaded.
+                    logger.debug("Segment {} is expired but must be processed because earlier non-expired segments are pending upload.",
+                            previousSeg);
                     candidateLogSegments.add(new EnrichedLogSegment(previousSeg, currentSeg.baseOffset()));
                 }
+                continue;
             }
-            // Discard the last active segment
+
+            // Found a non-expired segment: stop allowing skip-and-advance for subsequent expired segments.
+            canSkipExpiredSegments = false;
+            candidateLogSegments.add(new EnrichedLogSegment(previousSeg, currentSeg.baseOffset()));
         }
         return candidateLogSegments;
     }
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogStartOffsetIncrementReason.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogStartOffsetIncrementReason.java
index 3c9881d0f94d2..210f04119ca81 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogStartOffsetIncrementReason.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogStartOffsetIncrementReason.java
@@ -20,6 +20,7 @@
 public enum LogStartOffsetIncrementReason {
     LeaderOffsetIncremented("leader offset increment"),
     SegmentDeletion("segment deletion"),
+    SegmentExpiredByRemoteRetention("segment expired by remote storage retention policy"),
     ClientRecordDeletion("client delete records request"),
     SnapshotGenerated("snapshot generated");
 
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
index c8efe9884d77b..4b1c0d85b023f 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
@@ -2113,7 +2113,7 @@ public void testRemoteSegmentWithinLeaderEpochsForOverlappingSegments() {
     }
 
     @Test
-    public void testCandidateLogSegmentsSkipsActiveSegment() {
+    public void testCandidateLogSegmentsSkipsActiveSegment() throws IOException {
         UnifiedLog log = mock(UnifiedLog.class);
         LogSegment segment1 = mock(LogSegment.class);
         LogSegment segment2 = mock(LogSegment.class);
@@ -2136,7 +2136,7 @@ public void testCandidateLogSegmentsSkipsActiveSegment() {
     }
 
     @Test
-    public void testCandidateLogSegmentsSkipsSegmentsAfterLastStableOffset() {
+    public void testCandidateLogSegmentsSkipsSegmentsAfterLastStableOffset() throws IOException {
         UnifiedLog log = mock(UnifiedLog.class);
         LogSegment segment1 = mock(LogSegment.class);
         LogSegment segment2 = mock(LogSegment.class);
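Reviewer note: the contiguous-skip rule in candidateLogSegments() is easy to sanity-check in isolation. The sketch below replays the same scan over fake segments. It is not part of the patch; FakeSegment, the offsets, and the timestamps are hypothetical stand-ins for LogSegment and the retention checks, and the last-stable-offset and active-segment handling are deliberately omitted.

import java.util.ArrayList;
import java.util.List;

// Standalone sketch of the contiguous-skip rule. All names and values here are
// hypothetical; isExpired mirrors only the time-based retention check.
public class ContiguousSkipSketch {

    record FakeSegment(long baseOffset, long largestTimestampMs) { }

    public static void main(String[] args) {
        long nowMs = 10_000;
        long retentionMs = 1_000;

        // The first two segments are expired, the third is not, the fourth is expired again.
        List<FakeSegment> segments = List.of(
                new FakeSegment(0, 8_000),
                new FakeSegment(10, 8_500),
                new FakeSegment(20, 9_800),  // not expired: ends the skippable prefix
                new FakeSegment(30, 8_200)); // expired, but after a non-expired segment

        List<FakeSegment> candidates = new ArrayList<>();
        boolean canSkipExpiredSegments = true;
        for (FakeSegment seg : segments) {
            boolean isExpired = nowMs - seg.largestTimestampMs() > retentionMs;
            if (isExpired && canSkipExpiredSegments) {
                // Contiguous expired prefix: in the real code this advances the
                // logStartOffset instead of uploading the segment.
                System.out.println("skip segment at offset " + seg.baseOffset());
                continue;
            }
            // The first non-expired segment disables skipping for everything after it.
            canSkipExpiredSegments = false;
            candidates.add(seg);
        }
        candidates.forEach(s -> System.out.println("upload segment at offset " + s.baseOffset()));
    }
}

Running it prints skips for offsets 0 and 10, then upload candidates at 20 and 30: the segment at offset 30 is expired but still uploaded, because the non-expired segment at offset 20 ended the contiguous expired prefix.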