From 79d07901840403cca44a16f8236cf98e1aad894a Mon Sep 17 00:00:00 2001 From: Anupam Alok Date: Wed, 4 Feb 2026 19:07:03 +0530 Subject: [PATCH] Preserve user-provided acks config without mutation --- .../clients/producer/ProducerConfig.java | 35 ++++++++++++++----- .../clients/producer/ProducerConfigTest.java | 21 +++++++++++ 2 files changed, 47 insertions(+), 9 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 313648497bab1..eccc6f5b73f96 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -590,12 +590,13 @@ private void maybeOverrideClientId(final Map configs) { private void postProcessAndValidateIdempotenceConfigs(final Map configs) { final Map originalConfigs = this.originals(); - final String acksStr = parseAcks(this.getString(ACKS_CONFIG)); - configs.put(ACKS_CONFIG, acksStr); + String acksConfig = this.getString(ACKS_CONFIG); + final Short acks = parseAcks(acksConfig); + configs.put(ACKS_CONFIG, Short.toString(acks)); final boolean userConfiguredIdempotence = this.originals().containsKey(ENABLE_IDEMPOTENCE_CONFIG); boolean idempotenceEnabled = this.getBoolean(ENABLE_IDEMPOTENCE_CONFIG); boolean shouldDisableIdempotence = false; - + // For idempotence producers, values for `retries` and `acks` and `max.in.flight.requests.per.connection` need validation if (idempotenceEnabled) { final int retries = this.getInt(RETRIES_CONFIG); @@ -607,7 +608,7 @@ private void postProcessAndValidateIdempotenceConfigs(final Map shouldDisableIdempotence = true; } - final short acks = Short.parseShort(acksStr); + if (acks != (short) -1) { if (userConfiguredIdempotence) { throw new ConfigException("Must set " + ACKS_CONFIG + " to all in order to use the idempotent " + @@ -648,16 +649,32 @@ private void postProcessAndValidateIdempotenceConfigs(final Map " is set to true. Transactions will not expire with two-phase commit enabled." ); } + log.info("Producer configs after post-processing: {}", configs); + } - private static String parseAcks(String acksString) { - try { - return acksString.trim().equalsIgnoreCase("all") ? "-1" : Short.parseShort(acksString.trim()) + ""; - } catch (NumberFormatException e) { - throw new ConfigException("Invalid configuration value for 'acks': " + acksString); + private static short parseAcks(String acksString) { + if (acksString == null) { + throw new ConfigException("acks must be set"); + } + + String value = acksString.trim(); + + if (value.equalsIgnoreCase("all") || value.equals("-1")) { + return (short) -1; } + + if (value.equals("0") || value.equals("1")) { + return Short.parseShort(value); + } + + throw new ConfigException( + "Invalid value for 'acks': " + acksString + + ". Valid values are '0', '1', '-1', or 'all'." + ); } + static Map appendSerializerToConfig(Map configs, Serializer keySerializer, Serializer valueSerializer) { diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java index 5fd9ab727e046..6446f71374216 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java @@ -202,4 +202,25 @@ public void testValidateConfigPropertiesFile() { } } } + /** + * Verifies that KafkaProducer does not mutate the user-provided configurationwhen normalizing the acks setting + */ + @Test + public void shouldNotMutateUserProvidedAcksConfig() { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.ACKS_CONFIG, "all"); + + assertEquals("all", props.get(ProducerConfig.ACKS_CONFIG)); + + assertDoesNotThrow(() -> new KafkaProducer<>(props)); + + assertEquals( + "all", + props.get(ProducerConfig.ACKS_CONFIG), + "KafkaProducer must not rewrite user-provided acks config" + ); + } }