diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java index 62c16a746ffce..839e9f863b621 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java @@ -1811,20 +1811,83 @@ private boolean remoteLogEnabledAndRemoteCopyEnabled() { return remoteLogEnabled() && !config().remoteLogCopyDisable(); } - private boolean isSegmentEligibleForDeletion(Optional nextSegmentOpt, long upperBoundOffset) { + private boolean isSegmentEligibleForDeletion(LogSegment segment, Optional nextSegmentOpt, long upperBoundOffset) { boolean allowDeletionDueToLogStartOffsetIncremented = nextSegmentOpt.isPresent() && logStartOffset >= nextSegmentOpt.get().baseOffset(); // Segments are eligible for deletion when: // 1. they are uploaded to the remote storage // 2. log-start-offset was incremented higher than the largest offset in the candidate segment + // 3. they have exceeded the remote retention time (retention.ms), even if upload failed + // 4. only local segments size exceeds the remote retention size (retention.bytes), even if upload failed // Note: when remote log copy is disabled, we will fall back to local log check using retention.ms/bytes if (remoteLogEnabledAndRemoteCopyEnabled()) { - return (upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage()) || - allowDeletionDueToLogStartOffsetIncremented; + // Check if segment is uploaded to remote storage + boolean uploadedToRemote = upperBoundOffset > 0 && upperBoundOffset - 1 <= highestOffsetInRemoteStorage(); + + // Check if log-start-offset was incremented + if (uploadedToRemote || allowDeletionDueToLogStartOffsetIncremented) { + return true; + } + + if (isSegmentEligibleForDeletionByRemoteRetentionTime(segment)) { + return true; + } + + if (isSegmentEligibleForDeletionByRemoteRetentionSize(segment)) { + return true; + } + + return false; } else { return true; } } + private boolean isSegmentEligibleForDeletionByRemoteRetentionTime(LogSegment segment) { + long remoteRetentionMs = config().retentionMs; + if (remoteRetentionMs < 0) { + return false; + } + + try { + long currentTime = time().milliseconds(); + long segmentTimestamp = segment.largestTimestamp(); + if (currentTime - segmentTimestamp > remoteRetentionMs) { + logger.info("Segment {} is eligible for deletion due to remote retention time {}ms breach, " + + "even though it hasn't been uploaded to remote storage yet. " + + "This prevents local disk from filling up when remote storage is unavailable.", + segment, remoteRetentionMs); + return true; + } + } catch (IOException e) { + logger.warn("Failed to read largest timestamp for segment {}, skipping time-based retention check. " + + "Error: {}", segment, e.getMessage()); + } + return false; + } + + private boolean isSegmentEligibleForDeletionByRemoteRetentionSize(LogSegment segment) { + long remoteRetentionSize = config().retentionSize; + if (remoteRetentionSize < 0) { + return false; + } + + long onlyLocalSize; + if (highestOffsetInRemoteStorage() < 0) { + onlyLocalSize = size(); + } else { + onlyLocalSize = onlyLocalLogSegmentsSize(); + } + + if (onlyLocalSize > remoteRetentionSize) { + logger.info("Segment {} is eligible for deletion due to remote retention size {} bytes breach. " + + "Only local segments size {} exceeds the limit, even though it hasn't been uploaded to remote storage yet. " + + "This prevents local disk from filling up when remote storage is unavailable.", + segment, remoteRetentionSize, onlyLocalSize); + return true; + } + return false; + } + /** * Find segments starting from the oldest until the user-supplied predicate is false. * A final segment that is empty will never be returned. @@ -1855,7 +1918,7 @@ public List deletableSegments(DeletionCondition predicate) throws IO if (predicateResult && remoteLogEnabled() && nextSegmentOpt.isEmpty() && segment.size() > 0) { shouldRoll = true; } - if (predicateResult && !isLastSegmentAndEmpty && isSegmentEligibleForDeletion(nextSegmentOpt, upperBoundOffset)) { + if (predicateResult && !isLastSegmentAndEmpty && isSegmentEligibleForDeletion(segment, nextSegmentOpt, upperBoundOffset)) { deletable.add(segment); segmentOpt = nextSegmentOpt; } else {