From 8a4e1d3d03d8194b6f95cb95280c93e4835a11b1 Mon Sep 17 00:00:00 2001
From: Ayala Markowits
Date: Tue, 3 Feb 2026 11:53:09 +0200
Subject: [PATCH] KAFKA-17676: Fix task config loss due to topic compaction
 with auto-recovery

This commit adds three fixes for the NPE/task-failure issues that occur when
task configs are lost due to connect-configs topic compaction:

Fix 1: Leader periodic check for inconsistent connectors
(processInconsistentConnectors)
- The leader checks configState.inconsistentConnectors() on each tick
- Automatically triggers reconfigureConnectorTasksWithRetry() for any
  connector with incomplete task configs
- This proactively recovers connectors before tasks fail to start

Fix 2: Auto-recovery in startTask() when the task config is missing
- Checks whether taskConfig is null before attempting to start the task
- If the connector is running, calls requestTaskReconfiguration() to
  regenerate configs
- Throws ConnectException so the task will be retried after configs are
  regenerated
- This provides fallback recovery if the periodic check misses the issue

Fix 3: Proper cleanup in removeConnectorConfig()
- When a connector is deleted, also tombstone all task configs and the
  commit record
- Prevents orphaned task configs that could cause issues when the connector
  is recreated
- The original code only tombstoned the connector config and target state

These fixes ensure Kafka Connect can automatically recover from task config
loss due to topic compaction without manual intervention.
--- .../distributed/DistributedHerder.java | 70 ++++++++++++++++--- .../storage/KafkaConfigBackingStore.java | 24 +++++-- 2 files changed, 82 insertions(+), 12 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index 6c4bed311d36e..ab7424f7b8b8a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -557,6 +557,12 @@ public void tick() { } } + // KAFKA-17676: Check for and auto-regenerate configs for inconsistent connectors + // This handles the case where task configs were lost due to topic compaction + if (isLeader()) { + processInconsistentConnectors(); + } + // Let the group take any actions it needs to try { long nextRequestTimeoutMs = scheduledTick != null ? Math.max(scheduledTick - time.milliseconds(), 0L) : Long.MAX_VALUE; @@ -708,6 +714,33 @@ private synchronized boolean updateConfigsWithIncrementalCooperative(AtomicRefer return retValue; } + /** + * KAFKA-17676: Check for connectors with inconsistent task configurations + * (due to topic compaction or partial writes) and automatically trigger reconfiguration. + * This is called by the leader on each tick to ensure inconsistent connectors are recovered. + */ + private void processInconsistentConnectors() { + Set inconsistent = configState.inconsistentConnectors(); + if (inconsistent.isEmpty()) { + return; + } + + for (String connectorName : inconsistent) { + // Only reconfigure if the connector config exists and connector is supposed to be started + if (configState.connectorConfig(connectorName) != null + && configState.targetState(connectorName) == TargetState.STARTED) { + log.warn("Connector '{}' has inconsistent task configurations (KAFKA-17676). 
" + + "Triggering automatic reconfiguration to regenerate task configs.", connectorName); + try { + reconfigureConnectorTasksWithRetry(time.milliseconds(), connectorName); + } catch (Exception e) { + log.error("Failed to trigger reconfiguration for inconsistent connector '{}'. " + + "Manual intervention may be required.", connectorName, e); + } + } + } + } + private void processConnectorConfigUpdates(Set connectorConfigUpdates) { // If we only have connector config updates, we can just bounce the updated connectors that are // currently assigned to this worker. @@ -2001,27 +2034,48 @@ private static Collection assignmentDifference(Collection update, Coll private boolean startTask(ConnectorTaskId taskId) { log.info("Starting task {}", taskId); - Map connProps = configState.connectorConfig(taskId.connector()); + String connectorName = taskId.connector(); + Map connProps = configState.connectorConfig(connectorName); + + // KAFKA-17676: Check if task config exists before trying to start task + // Task configs can be lost due to topic compaction + Map taskConfig = configState.taskConfig(taskId); + if (taskConfig == null) { + if (worker.isRunning(connectorName)) { + log.warn("Task configuration for {} is missing (possibly due to config topic compaction). " + + "Requesting reconfiguration to regenerate task configs.", taskId); + requestTaskReconfiguration(connectorName); + throw new ConnectException("Task configuration for " + taskId + " is missing. " + + "Reconfiguration requested - task will be retried."); + } else { + // Connector not running - task will fail and be retried when connector starts + log.warn("Task configuration for {} is missing and connector {} is not running. " + + "Task will be retried when connector starts.", taskId, connectorName); + throw new ConnectException("Task configuration for " + taskId + " is missing. 
" + + "Waiting for connector " + connectorName + " to start."); + } + } + switch (connectorType(connProps)) { case SINK: return worker.startSinkTask( taskId, configState, connProps, - configState.taskConfig(taskId), + taskConfig, this, - configState.targetState(taskId.connector()) + configState.targetState(connectorName) ); case SOURCE: if (config.exactlyOnceSourceEnabled()) { - int taskGeneration = configState.taskConfigGeneration(taskId.connector()); + int taskGeneration = configState.taskConfigGeneration(connectorName); return worker.startExactlyOnceSourceTask( taskId, configState, connProps, - configState.taskConfig(taskId), + taskConfig, this, - configState.targetState(taskId.connector()), + configState.targetState(connectorName), () -> { FutureCallback preflightFencing = new FutureCallback<>(); fenceZombieSourceTasks(taskId, preflightFencing); @@ -2041,9 +2095,9 @@ private boolean startTask(ConnectorTaskId taskId) { taskId, configState, connProps, - configState.taskConfig(taskId), + taskConfig, this, - configState.targetState(taskId.connector()) + configState.targetState(connectorName) ); } default: diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java index 0e425301c1111..cffbb2b2de60e 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java @@ -544,10 +544,26 @@ public void removeConnectorConfig(String connector) { log.debug("Removing connector configuration for connector '{}'", connector); try { Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS); - List keyValues = List.of( - new ProducerKeyValue(CONNECTOR_KEY(connector), null), - new ProducerKeyValue(TARGET_STATE_KEY(connector), null) - ); + List keyValues = new ArrayList<>(); + + // Tombstone connector config and 
target state + keyValues.add(new ProducerKeyValue(CONNECTOR_KEY(connector), null)); + keyValues.add(new ProducerKeyValue(TARGET_STATE_KEY(connector), null)); + + // KAFKA-17676: Also tombstone all task configs to prevent orphaned records + // This ensures clean deletion and prevents issues when connector is recreated + Integer taskCount = connectorTaskCounts.get(connector); + if (taskCount != null) { + for (int i = 0; i < taskCount; i++) { + ConnectorTaskId taskId = new ConnectorTaskId(connector, i); + log.debug("Tombstoning task configuration for task '{}'", taskId); + keyValues.add(new ProducerKeyValue(TASK_KEY(taskId), null)); + } + } + + // Also tombstone the commit record to clean up completely + keyValues.add(new ProducerKeyValue(COMMIT_TASKS_KEY(connector), null)); + sendPrivileged(keyValues, timer); configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);