From 21f121c9fa9f9add4ec4bc0a814246dee4ae00ac Mon Sep 17 00:00:00 2001
From: m1a2st
Date: Sat, 31 Jan 2026 14:23:08 +0800
Subject: [PATCH 1/4] temp version

---
 .../scala/unit/kafka/log/LogCleanerTest.scala | 78 +++++++++++++++++-
 .../kafka/storage/internals/log/Cleaner.java  | 56 +++++++++++--
 .../log/SegmentOverflowException.java         | 31 +++++++
 .../internals/log/CleanerIntegrationTest.java | 80 +++++++++++++++++++
 4 files changed, 238 insertions(+), 7 deletions(-)
 create mode 100644 storage/src/main/java/org/apache/kafka/storage/internals/log/SegmentOverflowException.java
 create mode 100644 storage/src/test/java/org/apache/kafka/storage/internals/log/CleanerIntegrationTest.java

diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index d142a4e64de5c..6cf08743b1f7e 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -29,14 +29,15 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
 import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
 import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanedTransactionMetadata, Cleaner, CleanerConfig, CleanerStats, LocalLog, LogAppendInfo, LogCleaner, LogCleanerManager, LogCleaningAbortedException, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogToClean, OffsetMap, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanedTransactionMetadata, Cleaner, CleanerConfig, CleanerStats, LocalLog, LogAppendInfo, LogCleaner, LogCleanerManager, LogCleaningAbortedException, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogToClean, OffsetMap, ProducerStateManager, ProducerStateManagerConfig, SegmentOverflowException, TransactionIndex, UnifiedLog, VerificationGuard}
 import org.apache.kafka.storage.internals.utils.Throttler
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, Test}
-import org.mockito.ArgumentMatchers
+import org.mockito.{ArgumentMatchers, MockedConstruction, Mockito}
 import org.mockito.ArgumentMatchers.{any, anyString}
 import org.mockito.Mockito.{mockConstruction, times, verify, verifyNoMoreInteractions}
+import org.mockito.invocation.InvocationOnMock
 
 import java.io.{File, RandomAccessFile}
 import java.nio._
@@ -2218,6 +2219,79 @@ class LogCleanerTest extends Logging {
     }
   }
 
+  @Test
+  def testSegmentWithCompactionDataOverflow(): Unit = {
+    val cleaner = makeCleaner(Int.MaxValue)
+
+    val logProps = new Properties()
+    logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
+    val config = LogConfig.fromProps(logConfig.originals, logProps)
+    val log = makeLog(config = config)
+    val dir = log.parentDir
+
+    log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes, key = "test".getBytes), 0)
+
+    val segments = log.logSegments.asScala.toList
+    val segmentToClean = segments.head
+    val topicPartition = log.topicPartition()
+
+    val mockTxnIndex = Mockito.mock(classOf[TransactionIndex])
+    Mockito.when(mockTxnIndex.file()).thenReturn(new File(dir, "mock.txnindex"))
+
+    val appendCallCount = new java.util.concurrent.atomic.AtomicInteger(0)
+
+    val mockLogSegmentCtor = Mockito.mockConstruction(
+      classOf[LogSegment],
+      new MockedConstruction.MockInitializer[LogSegment] {
+        override def prepare(mock: LogSegment, context: MockedConstruction.Context): Unit = {
+          Mockito.when(mock.txnIndex()).thenReturn(mockTxnIndex)
+          Mockito.when(mock.baseOffset()).thenReturn(segmentToClean.baseOffset())
+
+          Mockito.when(mock.readNextOffset()).thenReturn(segmentToClean.baseOffset() + 1)
+          Mockito.doNothing().when(mock).onBecomeInactiveSegment()
+          Mockito.doNothing().when(mock).flush()
+          Mockito.doNothing().when(mock).setLastModified(ArgumentMatchers.anyLong())
+
+          Mockito.doAnswer((invocation: InvocationOnMock) => {
+            if (appendCallCount.incrementAndGet() == 1) {
+              // first time, it should throw SegmentOverflowException
+              throw new SegmentOverflowException(mock)
+            } else {
+              // second time, it should work fine
+              null
+            }
+          }).when(mock).append(
+            ArgumentMatchers.anyLong(),
+            ArgumentMatchers.any(classOf[MemoryRecords])
+          )
+        }
+      }
+    )
+
+    try {
+      assertThrows(classOf[LogCleaningAbortedException], () =>
+        cleaner.cleanSegments(log, util.List.of(segmentToClean),
+          cleaner.offsetMap, 0L,
+          new CleanerStats(Time.SYSTEM),
+          new CleanedTransactionMetadata(),
+          -1, log.logEndOffset)
+      )
+
+      assertTrue(cleaner.segmentOverflowPartitions().containsKey(topicPartition))
+      val segmentRatio = cleaner.segmentOverflowPartitions().get(topicPartition)
+      assertEquals(0.9, segmentRatio)
+
+      val cleanable = new LogToClean(log, 0L, log.activeSegment.baseOffset, true)
+      cleaner.doClean(cleanable, time.milliseconds())
+
+      assertFalse(cleaner.segmentOverflowPartitions().containsKey(topicPartition))
+
+    } finally {
+      mockLogSegmentCtor.close()
+      log.close()
+    }
+  }
+
   private def writeToLog(log: UnifiedLog, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {
     for (((key, value), offset) <- keysAndValues.zip(offsetSeq))
       yield log.appendAsFollower(messageWithOffset(key, value, offset), Int.MaxValue).lastOffset

diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java
index e17809d988ece..b5c9c701b0b79 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java
@@ -38,6 +38,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -60,6 +61,11 @@ public class Cleaner {
     private final Time time;
     private final Consumer<TopicPartition> checkDone;
 
+    /**
+     * The topic partitions that have segment overflow history mapped to their segment size ratio
+     */
+    private final Map<TopicPartition, Double> segmentOverflowPartitions = new HashMap<>();
+
     /**
      * Buffer used for read i/o
      */
@@ -169,9 +175,18 @@ public Map.Entry<Long, CleanerStats> doClean(LogToClean cleanable, long currentT
             log.name(), new Date(cleanableHorizonMs), new Date(legacyDeleteHorizonMs));
         CleanedTransactionMetadata transactionMetadata = new CleanedTransactionMetadata();
 
+        double sizeRatio = 1.0;
+        if (segmentOverflowPartitions.containsKey(log.topicPartition())) {
+            sizeRatio = segmentOverflowPartitions.get(log.topicPartition());
+            logger.info("Partition {} has overflow history. " + "Reducing effective segment size to {}% for this round.",
+                log.topicPartition(), sizeRatio * 100);
+        }
+
+        int effectiveMaxSize = (int) (log.config().segmentSize() * sizeRatio);
+
         List<List<LogSegment>> groupedSegments = groupSegmentsBySize(
             log.logSegments(0, endOffset),
-            log.config().segmentSize(),
+            effectiveMaxSize,
             log.config().maxIndexSize,
             cleanable.firstUncleanableOffset()
         );
@@ -180,6 +195,13 @@ public Map.Entry<Long, CleanerStats> doClean(LogToClean cleanable, long currentT
             cleanSegments(log, group, offsetMap, currentTime, stats, transactionMetadata, legacyDeleteHorizonMs, upperBoundOffset);
         }
 
+        if (segmentOverflowPartitions.containsKey(log.topicPartition())) {
+            segmentOverflowPartitions.remove(log.topicPartition());
+            logger.info("Successfully cleaned log {} with degraded size (ratio: {}%). " +
+                "Cleared overflow marker. Next cleaning will use normal size.",
+                log.name(), sizeRatio * 100);
+        }
+
         // record buffer utilization
         stats.bufferUtilization = offsetMap.utilization();
 
@@ -254,6 +276,20 @@ public void cleanSegments(UnifiedLog log,
                     stats,
                     currentTime
                 );
+            } catch (SegmentOverflowException e) {
+                if (segmentOverflowPartitions.containsKey(log.topicPartition())) {
+                    Double segmentRatio = segmentOverflowPartitions.get(log.topicPartition());
+                    segmentOverflowPartitions.put(log.topicPartition(), segmentRatio * 0.9);
+                    logger.warn("Repeated segment overflow for partition {}: {}. " +
+                        "Further degrading to {}% size in next cleaning round.",
+                        log.topicPartition(), e.getMessage(), segmentRatio * 100);
+                } else {
+                    segmentOverflowPartitions.put(log.topicPartition(), 0.9);
+                    logger.warn("Segment overflow detected for partition {}: {}. " +
+                        "Marked for degradation to 90% size in next cleaning round.",
+                        log.topicPartition(), e.getMessage());
+                }
+                throw new LogCleaningAbortedException();
             } catch (LogSegmentOffsetOverflowException e) {
                 // Split the current segment. It's also safest to abort the current cleaning process, so that we retry from
                 // scratch once the split is complete.
@@ -400,10 +436,15 @@ public boolean shouldRetainRecord(RecordBatch batch, Record record) {
         if (outputBuffer.position() > 0) {
             outputBuffer.flip();
             MemoryRecords retained = MemoryRecords.readableRecords(outputBuffer);
-            // it's OK not to hold the Log's lock in this case, because this segment is only accessed by other threads
-            // after `Log.replaceSegments` (which acquires the lock) is called
-            dest.append(result.maxOffset(), retained);
-            throttler.maybeThrottle(outputBuffer.limit());
+            try {
+                // it's OK not to hold the Log's lock in this case, because this segment is only accessed by other threads
+                // after `Log.replaceSegments` (which acquires the lock) is called
+                dest.append(result.maxOffset(), retained);
+                throttler.maybeThrottle(outputBuffer.limit());
+            } catch (IllegalArgumentException e) {
+                // this indicates that we have an offset overflow in the destination segment
+                throw new SegmentOverflowException(dest);
+            }
         }
 
         // if we read bytes but didn't get even one complete batch, our I/O buffer is too small, grow it and try again
@@ -763,4 +804,9 @@ private boolean buildOffsetMapForSegment(TopicPartition topicPartition,
 
         return false;
     }
+
+    // only for testing
+    public Map<TopicPartition, Double> segmentOverflowPartitions() {
+        return Map.copyOf(segmentOverflowPartitions);
+    }
 }

diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/SegmentOverflowException.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/SegmentOverflowException.java
new file mode 100644
index 0000000000000..a999146757818
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/SegmentOverflowException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.KafkaException;
+
+/**
+ * Exception thrown when segment size would overflow during compaction
+ */
+public class SegmentOverflowException extends KafkaException {
+    public final LogSegment segment;
+
+    public SegmentOverflowException(LogSegment segment) {
+        super("Segment size would overflow during compaction for segment " + segment);
+        this.segment = segment;
+    }
+}

diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/CleanerIntegrationTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/CleanerIntegrationTest.java
new file mode 100644
index 0000000000000..c0432a96e5da8
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/CleanerIntegrationTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import com.yammer.metrics.core.Gauge;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.IntStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+public class CleanerIntegrationTest {
+
+    @Timeout(300)
+    @ClusterTest(types = Type.CO_KRAFT)
+    public void testCleanerSegmentCompactionOverflow(ClusterInstance cluster) throws Exception {
+        String topic = "compaction-overflow-test";
+        try (Admin admin = cluster.admin()) {
+            NewTopic newTopic = new NewTopic(topic, 1, (short) 1);
+            newTopic.configs(Map.of(
+                "cleanup.policy", "compact",
+                "compression.type", "lz4",
+                "segment.bytes", String.valueOf(Integer.MAX_VALUE - 1),
+                "min.cleanable.dirty.ratio", "0.01"
+            ));
+            admin.createTopics(List.of(newTopic)).all().get();
+            cluster.waitTopicCreation(topic, 1);
+        }
+        var valueBytes = new byte[10240];
+        var random = new Random();
+        random.nextBytes(valueBytes);
+        var producers = IntStream.range(0, 5).mapToObj(__ -> CompletableFuture.runAsync(() -> {
+            try (var producer = cluster.producer(Map.of(
+                ProducerConfig.COMPRESSION_LZ4_LEVEL_CONFIG, "17",
+                ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"))) {
+                for (int i = 0; i < 60_000; i++) {
+                    byte[] data = Uuid.randomUuid().toString().getBytes();
+                    if (Math.random() < 0.1)
+                        producer.send(new ProducerRecord<>(topic, data, null));
+                    else
+                        producer.send(new ProducerRecord<>(topic, data, valueBytes));
+                }
+            }
+        })).toList();
+        producers.forEach(CompletableFuture::join);
+        var metrics = KafkaYammerMetrics.defaultRegistry().allMetrics();
+        metrics.forEach((name, metric) -> {
+            if (name.getName().contains("uncleanable-partitions-count")) {
+                Gauge<Integer> value = (Gauge<Integer>) metric;
+                assertEquals(1, value.value());
+            }
+        });
+    }
+}

From 23c7bf07540518cbf6c471740abddb890e41d0fd Mon Sep 17 00:00:00 2001
From: m1a2st
Date: Sat, 31 Jan 2026 15:04:30 +0800
Subject: [PATCH 2/4] update the test

---
 checkstyle/suppressions.xml                   |  2 +
 .../internals/log/CleanerIntegrationTest.java | 64 +++++++++++--------
 2 files changed, 40 insertions(+), 26 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index e82b3e57fabf6..c752b0edfd38e 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -368,6 +368,8 @@
+
+
diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/CleanerIntegrationTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/CleanerIntegrationTest.java
index c0432a96e5da8..ea9143b3c5f16 100644
--- a/storage/src/test/java/org/apache/kafka/storage/internals/log/CleanerIntegrationTest.java
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/CleanerIntegrationTest.java
@@ -17,16 +17,15 @@
 package org.apache.kafka.storage.internals.log;
 
 import com.yammer.metrics.core.Gauge;
-import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.test.ClusterInstance;
 import org.apache.kafka.common.test.api.ClusterTest;
 import org.apache.kafka.common.test.api.Type;
 import org.apache.kafka.server.metrics.KafkaYammerMetrics;
-import org.junit.jupiter.api.Timeout;
 
 import java.util.List;
 import java.util.Map;
@@ -35,45 +34,58 @@
 import java.util.stream.IntStream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
 public class CleanerIntegrationTest {
 
-    @Timeout(300)
+    @SuppressWarnings("unchecked")
     @ClusterTest(types = Type.CO_KRAFT)
     public void testCleanerSegmentCompactionOverflow(ClusterInstance cluster) throws Exception {
         String topic = "compaction-overflow-test";
-        try (Admin admin = cluster.admin()) {
+        try (var admin = cluster.admin()) {
             NewTopic newTopic = new NewTopic(topic, 1, (short) 1);
             newTopic.configs(Map.of(
-                "cleanup.policy", "compact",
-                "compression.type", "lz4",
-                "segment.bytes", String.valueOf(Integer.MAX_VALUE - 1),
-                "min.cleanable.dirty.ratio", "0.01"
+                    "cleanup.policy", "compact",
+                    "compression.type", "lz4",
+                    "segment.bytes", String.valueOf(Integer.MAX_VALUE - 1),
+                    "min.cleanable.dirty.ratio", "0.01"
             ));
             admin.createTopics(List.of(newTopic)).all().get();
             cluster.waitTopicCreation(topic, 1);
-        }
-        var valueBytes = new byte[10240];
-        var random = new Random();
-        random.nextBytes(valueBytes);
-        var producers = IntStream.range(0, 5).mapToObj(__ -> CompletableFuture.runAsync(() -> {
-            try (var producer = cluster.producer(Map.of(
-                ProducerConfig.COMPRESSION_LZ4_LEVEL_CONFIG, "17",
-                ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"))) {
-                for (int i = 0; i < 60_000; i++) {
-                    byte[] data = Uuid.randomUuid().toString().getBytes();
-                    if (Math.random() < 0.1)
-                        producer.send(new ProducerRecord<>(topic, data, null));
-                    else
-                        producer.send(new ProducerRecord<>(topic, data, valueBytes));
+
+            var data = new byte[10240];
+            var random = new Random();
+            random.nextBytes(data);
+            var producers = IntStream.range(0, 5).mapToObj(__ -> CompletableFuture.runAsync(() -> {
+                try (var producer = cluster.producer(Map.of(
+                        ProducerConfig.COMPRESSION_LZ4_LEVEL_CONFIG, "17",
+                        ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"))) {
+                    for (int i = 0; i < 60_000; i++) {
+                        byte[] key = Uuid.randomUuid().toString().getBytes();
+                        if (Math.random() < 0.1)
+                            producer.send(new ProducerRecord<>(topic, key, null));
+                        else
+                            producer.send(new ProducerRecord<>(topic, key, data));
+                    }
                 }
-            }
-        })).toList();
-        producers.forEach(CompletableFuture::join);
+            })).toList();
+            producers.forEach(CompletableFuture::join);
+
+            var ids = admin.describeCluster().nodes().get().stream().map(Node::id).toList();
+            var size = admin.describeLogDirs(ids).allDescriptions().get().entrySet()
+                    .stream()
+                    .flatMap(e -> e.getValue().values()
+                            .stream()
+                            .flatMap(v -> v.replicaInfos().entrySet().stream()))
+                    .filter(v -> v.getKey().topic().equals(topic))
+                    .mapToLong(v -> v.getValue().size()).sum();
+            assertTrue(Integer.MAX_VALUE < size, "log size should exceed Integer.MAX_VALUE to trigger overflow");
+        }
         var metrics = KafkaYammerMetrics.defaultRegistry().allMetrics();
         metrics.forEach((name, metric) -> {
             if (name.getName().contains("uncleanable-partitions-count")) {
                 Gauge<Integer> value = (Gauge<Integer>) metric;
-                assertEquals(1, value.value());
+                assertEquals(0, value.value(), "there should be no uncleanable partitions due to segment overflow");
             }
         });
     }

From 9ad125d25e281c3f7ac5bd89bf6ea3f263ce14d3 Mon Sep 17 00:00:00 2001
From: m1a2st
Date: Sat, 31 Jan 2026 15:37:27 +0800
Subject: [PATCH 3/4] spotless apply

---
 .../kafka/storage/internals/log/CleanerIntegrationTest.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/CleanerIntegrationTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/CleanerIntegrationTest.java
index ea9143b3c5f16..de7a928e1746c 100644
--- a/storage/src/test/java/org/apache/kafka/storage/internals/log/CleanerIntegrationTest.java
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/CleanerIntegrationTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.storage.internals.log;
 
-import com.yammer.metrics.core.Gauge;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -27,6 +26,8 @@
 import org.apache.kafka.common.test.api.Type;
 import org.apache.kafka.server.metrics.KafkaYammerMetrics;
 
+import com.yammer.metrics.core.Gauge;
+
 import java.util.List;
 import java.util.Map;
 import java.util.Random;

From 9b95afed8fa3da91c0f024c9ac4dffa287298a18 Mon Sep 17 00:00:00 2001
From: m1a2st
Date: Sun, 1 Feb 2026 09:00:10 +0800
Subject: [PATCH 4/4] addressed by comments

---
 .../kafka/storage/internals/log/Cleaner.java  | 25 ++++++++-----------
 .../log/SegmentOverflowException.java         |  2 --
 2 files changed, 11 insertions(+), 16 deletions(-)

diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java
index b5c9c701b0b79..ceb98b8a15bc0 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java
@@ -175,9 +175,8 @@ public Map.Entry<Long, CleanerStats> doClean(LogToClean cleanable, long currentT
             log.name(), new Date(cleanableHorizonMs), new Date(legacyDeleteHorizonMs));
         CleanedTransactionMetadata transactionMetadata = new CleanedTransactionMetadata();
 
-        double sizeRatio = 1.0;
-        if (segmentOverflowPartitions.containsKey(log.topicPartition())) {
-            sizeRatio = segmentOverflowPartitions.get(log.topicPartition());
+        double sizeRatio = segmentOverflowPartitions.getOrDefault(log.topicPartition(), 1.0);
+        if (sizeRatio != 1.0) {
             logger.info("Partition {} has overflow history. " + "Reducing effective segment size to {}% for this round.",
                 log.topicPartition(), sizeRatio * 100);
         }
@@ -195,8 +194,7 @@ public Map.Entry<Long, CleanerStats> doClean(LogToClean cleanable, long currentT
             cleanSegments(log, group, offsetMap, currentTime, stats, transactionMetadata, legacyDeleteHorizonMs, upperBoundOffset);
         }
 
-        if (segmentOverflowPartitions.containsKey(log.topicPartition())) {
-            segmentOverflowPartitions.remove(log.topicPartition());
+        if (segmentOverflowPartitions.remove(log.topicPartition()) != null) {
             logger.info("Successfully cleaned log {} with degraded size (ratio: {}%). " +
                 "Cleared overflow marker. Next cleaning will use normal size.",
                 log.name(), sizeRatio * 100);
@@ -277,17 +275,16 @@ public void cleanSegments(UnifiedLog log,
                     currentTime
                 );
             } catch (SegmentOverflowException e) {
-                if (segmentOverflowPartitions.containsKey(log.topicPartition())) {
-                    Double segmentRatio = segmentOverflowPartitions.get(log.topicPartition());
-                    segmentOverflowPartitions.put(log.topicPartition(), segmentRatio * 0.9);
-                    logger.warn("Repeated segment overflow for partition {}: {}. " +
-                        "Further degrading to {}% size in next cleaning round.",
-                        log.topicPartition(), e.getMessage(), segmentRatio * 100);
-                } else {
-                    segmentOverflowPartitions.put(log.topicPartition(), 0.9);
+                var previousRatio = segmentOverflowPartitions.put(log.topicPartition(),
+                    segmentOverflowPartitions.getOrDefault(log.topicPartition(), 1.0) * 0.9);
+                if (previousRatio == null) {
                     logger.warn("Segment overflow detected for partition {}: {}. " +
                         "Marked for degradation to 90% size in next cleaning round.",
                         log.topicPartition(), e.getMessage());
+                } else {
+                    logger.warn("Repeated segment overflow for partition {}: {}. " +
+                        "Further degrading to {}% size in next cleaning round.",
+                        log.topicPartition(), e.getMessage(), previousRatio * 0.9 * 100);
                 }
                 throw new LogCleaningAbortedException();
             } catch (LogSegmentOffsetOverflowException e) {
@@ -440,12 +437,12 @@ public boolean shouldRetainRecord(RecordBatch batch, Record record) {
                 // it's OK not to hold the Log's lock in this case, because this segment is only accessed by other threads
                 // after `Log.replaceSegments` (which acquires the lock) is called
                 dest.append(result.maxOffset(), retained);
-                throttler.maybeThrottle(outputBuffer.limit());
             } catch (IllegalArgumentException e) {
                 // this indicates that we have an offset overflow in the destination segment
                 throw new SegmentOverflowException(dest);
            }
         }
+        throttler.maybeThrottle(outputBuffer.limit());
 
         // if we read bytes but didn't get even one complete batch, our I/O buffer is too small, grow it and try again
         // `result.bytesRead` contains bytes from `messagesRead` and any discarded batches.

diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/SegmentOverflowException.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/SegmentOverflowException.java
index a999146757818..dd64df02ec851 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/SegmentOverflowException.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/SegmentOverflowException.java
@@ -22,10 +22,8 @@
  * Exception thrown when segment size would overflow during compaction
  */
 public class SegmentOverflowException extends KafkaException {
-    public final LogSegment segment;
 
     public SegmentOverflowException(LogSegment segment) {
         super("Segment size would overflow during compaction for segment " + segment);
-        this.segment = segment;
     }
 }