Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,12 @@ public void tick() {
}
}

// KAFKA-17676: Check for and auto-regenerate configs for inconsistent connectors
// This handles the case where task configs were lost due to topic compaction
if (isLeader()) {
processInconsistentConnectors();
}

// Let the group take any actions it needs to
try {
long nextRequestTimeoutMs = scheduledTick != null ? Math.max(scheduledTick - time.milliseconds(), 0L) : Long.MAX_VALUE;
Expand Down Expand Up @@ -708,6 +714,33 @@ private synchronized boolean updateConfigsWithIncrementalCooperative(AtomicRefer
return retValue;
}

/**
 * KAFKA-17676: Detect connectors whose task configurations are inconsistent
 * (for example, lost to config topic compaction or a partial write) and kick
 * off automatic reconfiguration so their task configs are regenerated.
 * Invoked by the leader on each tick; non-leader workers must not call this.
 */
private void processInconsistentConnectors() {
    for (String connectorName : configState.inconsistentConnectors()) {
        // Guard: skip connectors that have been deleted (no config) or that are
        // not supposed to be running — reconfiguring them would be pointless.
        if (configState.connectorConfig(connectorName) == null
                || configState.targetState(connectorName) != TargetState.STARTED) {
            continue;
        }
        log.warn("Connector '{}' has inconsistent task configurations (KAFKA-17676). " +
            "Triggering automatic reconfiguration to regenerate task configs.", connectorName);
        try {
            reconfigureConnectorTasksWithRetry(time.milliseconds(), connectorName);
        } catch (Exception e) {
            // Best-effort: a failed trigger must not break the tick loop; the
            // connector stays inconsistent and will be retried on a later tick.
            log.error("Failed to trigger reconfiguration for inconsistent connector '{}'. " +
                "Manual intervention may be required.", connectorName, e);
        }
    }
}

private void processConnectorConfigUpdates(Set<String> connectorConfigUpdates) {
// If we only have connector config updates, we can just bounce the updated connectors that are
// currently assigned to this worker.
Expand Down Expand Up @@ -2001,27 +2034,48 @@ private static <T> Collection<T> assignmentDifference(Collection<T> update, Coll

private boolean startTask(ConnectorTaskId taskId) {
log.info("Starting task {}", taskId);
Map<String, String> connProps = configState.connectorConfig(taskId.connector());
String connectorName = taskId.connector();
Map<String, String> connProps = configState.connectorConfig(connectorName);

// KAFKA-17676: Check if task config exists before trying to start task
// Task configs can be lost due to topic compaction
Map<String, String> taskConfig = configState.taskConfig(taskId);
if (taskConfig == null) {
if (worker.isRunning(connectorName)) {
log.warn("Task configuration for {} is missing (possibly due to config topic compaction). "
+ "Requesting reconfiguration to regenerate task configs.", taskId);
requestTaskReconfiguration(connectorName);
throw new ConnectException("Task configuration for " + taskId + " is missing. "
+ "Reconfiguration requested - task will be retried.");
} else {
// Connector not running - task will fail and be retried when connector starts
log.warn("Task configuration for {} is missing and connector {} is not running. "
+ "Task will be retried when connector starts.", taskId, connectorName);
throw new ConnectException("Task configuration for " + taskId + " is missing. "
+ "Waiting for connector " + connectorName + " to start.");
}
}

switch (connectorType(connProps)) {
case SINK:
return worker.startSinkTask(
taskId,
configState,
connProps,
configState.taskConfig(taskId),
taskConfig,
this,
configState.targetState(taskId.connector())
configState.targetState(connectorName)
);
case SOURCE:
if (config.exactlyOnceSourceEnabled()) {
int taskGeneration = configState.taskConfigGeneration(taskId.connector());
int taskGeneration = configState.taskConfigGeneration(connectorName);
return worker.startExactlyOnceSourceTask(
taskId,
configState,
connProps,
configState.taskConfig(taskId),
taskConfig,
this,
configState.targetState(taskId.connector()),
configState.targetState(connectorName),
() -> {
FutureCallback<Void> preflightFencing = new FutureCallback<>();
fenceZombieSourceTasks(taskId, preflightFencing);
Expand All @@ -2041,9 +2095,9 @@ private boolean startTask(ConnectorTaskId taskId) {
taskId,
configState,
connProps,
configState.taskConfig(taskId),
taskConfig,
this,
configState.targetState(taskId.connector())
configState.targetState(connectorName)
);
}
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -544,10 +544,26 @@ public void removeConnectorConfig(String connector) {
log.debug("Removing connector configuration for connector '{}'", connector);
try {
Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
List<ProducerKeyValue> keyValues = List.of(
new ProducerKeyValue(CONNECTOR_KEY(connector), null),
new ProducerKeyValue(TARGET_STATE_KEY(connector), null)
);
List<ProducerKeyValue> keyValues = new ArrayList<>();

// Tombstone connector config and target state
keyValues.add(new ProducerKeyValue(CONNECTOR_KEY(connector), null));
keyValues.add(new ProducerKeyValue(TARGET_STATE_KEY(connector), null));

// KAFKA-17676: Also tombstone all task configs to prevent orphaned records
// This ensures clean deletion and prevents issues when connector is recreated
Integer taskCount = connectorTaskCounts.get(connector);
if (taskCount != null) {
for (int i = 0; i < taskCount; i++) {
ConnectorTaskId taskId = new ConnectorTaskId(connector, i);
log.debug("Tombstoning task configuration for task '{}'", taskId);
keyValues.add(new ProducerKeyValue(TASK_KEY(taskId), null));
}
}

// Also tombstone the commit record to clean up completely
keyValues.add(new ProducerKeyValue(COMMIT_TASKS_KEY(connector), null));

sendPrivileged(keyValues, timer);

configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
Expand Down