diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 6c4bed311d36e..ab7424f7b8b8a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -557,6 +557,12 @@ public void tick() {
             }
         }
 
+        // KAFKA-17676: Check for and auto-regenerate configs for inconsistent connectors
+        // This handles the case where task configs were lost due to topic compaction
+        if (isLeader()) {
+            processInconsistentConnectors();
+        }
+
         // Let the group take any actions it needs to
         try {
             long nextRequestTimeoutMs = scheduledTick != null ? Math.max(scheduledTick - time.milliseconds(), 0L) : Long.MAX_VALUE;
@@ -708,6 +714,33 @@ private synchronized boolean updateConfigsWithIncrementalCooperative(AtomicRefer
         return retValue;
     }
 
+    /**
+     * KAFKA-17676: Check for connectors with inconsistent task configurations
+     * (due to topic compaction or partial writes) and automatically trigger reconfiguration.
+     * This is called by the leader on each tick to ensure inconsistent connectors are recovered.
+     */
+    private void processInconsistentConnectors() {
+        Set<String> inconsistent = configState.inconsistentConnectors();
+        if (inconsistent.isEmpty()) {
+            return;
+        }
+
+        for (String connectorName : inconsistent) {
+            // Only reconfigure if the connector config exists and connector is supposed to be started
+            if (configState.connectorConfig(connectorName) != null
+                    && configState.targetState(connectorName) == TargetState.STARTED) {
+                log.warn("Connector '{}' has inconsistent task configurations (KAFKA-17676). " +
+                        "Triggering automatic reconfiguration to regenerate task configs.", connectorName);
+                try {
+                    reconfigureConnectorTasksWithRetry(time.milliseconds(), connectorName);
+                } catch (Exception e) {
+                    log.error("Failed to trigger reconfiguration for inconsistent connector '{}'. " +
+                            "Manual intervention may be required.", connectorName, e);
+                }
+            }
+        }
+    }
+
     private void processConnectorConfigUpdates(Set<String> connectorConfigUpdates) {
         // If we only have connector config updates, we can just bounce the updated connectors that are
         // currently assigned to this worker.
@@ -2001,27 +2034,48 @@ private static Collection assignmentDifference(Collection update, Coll
 
     private boolean startTask(ConnectorTaskId taskId) {
         log.info("Starting task {}", taskId);
-        Map<String, String> connProps = configState.connectorConfig(taskId.connector());
+        String connectorName = taskId.connector();
+        Map<String, String> connProps = configState.connectorConfig(connectorName);
+
+        // KAFKA-17676: Check if task config exists before trying to start task
+        // Task configs can be lost due to topic compaction
+        Map<String, String> taskConfig = configState.taskConfig(taskId);
+        if (taskConfig == null) {
+            if (worker.isRunning(connectorName)) {
+                log.warn("Task configuration for {} is missing (possibly due to config topic compaction). " +
+                        "Requesting reconfiguration to regenerate task configs.", taskId);
+                requestTaskReconfiguration(connectorName);
+                throw new ConnectException("Task configuration for " + taskId + " is missing. " +
+                        "Reconfiguration requested - task will be retried.");
+            } else {
+                // Connector not running - task will fail and be retried when connector starts
+                log.warn("Task configuration for {} is missing and connector {} is not running. " +
+                        "Task will be retried when connector starts.", taskId, connectorName);
+                throw new ConnectException("Task configuration for " + taskId + " is missing. " +
+                        "Waiting for connector " + connectorName + " to start.");
+            }
+        }
+
         switch (connectorType(connProps)) {
             case SINK:
                 return worker.startSinkTask(
                         taskId,
                         configState,
                         connProps,
-                        configState.taskConfig(taskId),
+                        taskConfig,
                         this,
-                        configState.targetState(taskId.connector())
+                        configState.targetState(connectorName)
                 );
             case SOURCE:
                 if (config.exactlyOnceSourceEnabled()) {
-                    int taskGeneration = configState.taskConfigGeneration(taskId.connector());
+                    int taskGeneration = configState.taskConfigGeneration(connectorName);
                     return worker.startExactlyOnceSourceTask(
                             taskId,
                             configState,
                             connProps,
-                            configState.taskConfig(taskId),
+                            taskConfig,
                             this,
-                            configState.targetState(taskId.connector()),
+                            configState.targetState(connectorName),
                             () -> {
                                 FutureCallback<Void> preflightFencing = new FutureCallback<>();
                                 fenceZombieSourceTasks(taskId, preflightFencing);
@@ -2041,9 +2095,9 @@ private boolean startTask(ConnectorTaskId taskId) {
                             taskId,
                             configState,
                             connProps,
-                            configState.taskConfig(taskId),
+                            taskConfig,
                             this,
-                            configState.targetState(taskId.connector())
+                            configState.targetState(connectorName)
                     );
                 }
             default:
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index 0e425301c1111..cffbb2b2de60e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -544,10 +544,26 @@ public void removeConnectorConfig(String connector) {
         log.debug("Removing connector configuration for connector '{}'", connector);
         try {
             Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
-            List<ProducerKeyValue> keyValues = List.of(
-                    new ProducerKeyValue(CONNECTOR_KEY(connector), null),
-                    new ProducerKeyValue(TARGET_STATE_KEY(connector), null)
-            );
+            List<ProducerKeyValue> keyValues = new ArrayList<>();
+
+            // Tombstone connector config and target state
+            keyValues.add(new ProducerKeyValue(CONNECTOR_KEY(connector), null));
+            keyValues.add(new ProducerKeyValue(TARGET_STATE_KEY(connector), null));
+
+            // KAFKA-17676: Also tombstone all task configs to prevent orphaned records
+            // This ensures clean deletion and prevents issues when connector is recreated
+            Integer taskCount = connectorTaskCounts.get(connector);
+            if (taskCount != null) {
+                for (int i = 0; i < taskCount; i++) {
+                    ConnectorTaskId taskId = new ConnectorTaskId(connector, i);
+                    log.debug("Tombstoning task configuration for task '{}'", taskId);
+                    keyValues.add(new ProducerKeyValue(TASK_KEY(taskId), null));
+                }
+            }
+
+            // Also tombstone the commit record to clean up completely
+            keyValues.add(new ProducerKeyValue(COMMIT_TASKS_KEY(connector), null));
+
             sendPrivileged(keyValues, timer);
             configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);