From c51340570880f783a31fbbfd91578fe6f6db7c9d Mon Sep 17 00:00:00 2001 From: TengYao Chi Date: Wed, 4 Feb 2026 13:44:40 +0000 Subject: [PATCH 1/3] KAFKA-20120: Create HeadersSerializer and HeadersDeserializer --- .../state/internals/HeadersDeserializer.java | 87 ++++++++++ .../state/internals/HeadersSerializer.java | 85 ++++++++++ .../internals/HeadersDeserializerTest.java | 127 ++++++++++++++ .../internals/HeadersSerializerTest.java | 159 ++++++++++++++++++ 4 files changed, 458 insertions(+) create mode 100644 streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersDeserializer.java create mode 100644 streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersDeserializerTest.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersDeserializer.java new file mode 100644 index 0000000000000..9d1de4696ad6b --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersDeserializer.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.utils.ByteUtils; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; + +/** + * Deserializer for Kafka Headers. + * + * Deserialization format (per KIP-1271): + * [NumHeaders(varint)][Header1][Header2]... + * + * Each header: + * [KeyLength(varint)][KeyBytes(UTF-8)][ValueLength(varint)][ValueBytes] + * + * Note: ValueLength is -1 for null values (encoded as varint). + * All integers are decoded from varints (signed varint encoding). + * + * This deserializer expects the headersBytes portion without a size prefix. + * The size prefix is handled by the outer deserializer (e.g., ValueTimestampHeadersDeserializer). + * + * This is used by KIP-1271 to deserialize headers from state stores. + */ +public class HeadersDeserializer { + + /** + * Deserializes headers from a byte array using varint encoding per KIP-1271. + *

+ * The input format is [count][header1][header2]... without a size prefix. + * + * @param data the serialized byte array (can be null) + * @return the deserialized headers + */ + public Headers deserialize(final byte[] data) { + if (data == null || data.length == 0) { + return new RecordHeaders(); + } + + final ByteBuffer buffer = ByteBuffer.wrap(data); + final int headerCount = ByteUtils.readVarint(buffer); + + if (headerCount == 0) { + return new RecordHeaders(); + } + + final RecordHeaders headers = new RecordHeaders(); + + for (int i = 0; i < headerCount; i++) { + final int keyLength = ByteUtils.readVarint(buffer); + final byte[] keyBytes = new byte[keyLength]; + buffer.get(keyBytes); + final String key = new String(keyBytes, StandardCharsets.UTF_8); + + final int valueLength = ByteUtils.readVarint(buffer); + final byte[] value; + if (valueLength == -1) { + value = null; + } else { + value = new byte[valueLength]; + buffer.get(value); + } + + headers.add(key, value); + } + + return headers; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java new file mode 100644 index 0000000000000..543f953745e5e --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.utils.ByteUtils; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +/** + * Serializer for Kafka Headers. + *

+ * Serialization format (per KIP-1271): + * [NumHeaders(varint)][Header1][Header2]... + *

+ * Each header: + * [KeyLength(varint)][KeyBytes(UTF-8)][ValueLength(varint)][ValueBytes] + *

+ * Note: ValueLength is -1 for null values (encoded as varint). + * All integers are encoded as varints (signed varint encoding). + *

+ * This serializer produces the headersBytes portion. The headersSize prefix + * is added by the outer serializer (e.g., ValueTimestampHeadersSerializer). + *

+ * This is used by KIP-1271 to serialize headers for storage in state stores. + */ +public class HeadersSerializer { + + /** + * Serializes headers into a byte array using varint encoding per KIP-1271. + *

+ * The output format is [count][header1][header2]... without a size prefix. + * The size prefix is added by the outer serializer that uses this. + * + * @param headers the headers to serialize (can be null) + * @return the serialized byte array + */ + public byte[] serialize(final Headers headers) { + try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final DataOutputStream out = new DataOutputStream(baos)) { + + final Header[] headerArray = (headers == null) ? new Header[0] : headers.toArray(); + ByteUtils.writeVarint(headerArray.length, out); + + for (final Header header : headerArray) { + final byte[] keyBytes = header.key().getBytes(StandardCharsets.UTF_8); + final byte[] valueBytes = header.value(); + + ByteUtils.writeVarint(keyBytes.length, out); + out.write(keyBytes); + + // Write value length and value bytes (varint + raw bytes) + // null is represented as -1, encoded as varint + if (valueBytes == null) { + ByteUtils.writeVarint(-1, out); + } else { + ByteUtils.writeVarint(valueBytes.length, out); + out.write(valueBytes); + } + } + + return baos.toByteArray(); + } catch (IOException e) { + throw new RuntimeException("Failed to serialize headers", e); + } + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersDeserializerTest.java new file mode 100644 index 0000000000000..8762a77a30b6e --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersDeserializerTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class HeadersDeserializerTest { + + private final HeadersSerializer serializer = new HeadersSerializer(); + private final HeadersDeserializer deserializer = new HeadersDeserializer(); + + @Test + public void shouldDeserializeNullData() { + final Headers headers = deserializer.deserialize(null); + + assertNotNull(headers); + assertEquals(0, headers.toArray().length); + } + + @Test + public void shouldDeserializeEmptyData() { + final Headers headers = deserializer.deserialize(new byte[0]); + + assertNotNull(headers); + assertEquals(0, headers.toArray().length); + } + + @Test + public void shouldRoundTripEmptyHeaders() { + final Headers original = new RecordHeaders(); + final byte[] serialized = serializer.serialize(original); + final Headers deserialized = deserializer.deserialize(serialized); + + assertNotNull(deserialized); + assertEquals(0, deserialized.toArray().length); + } + + @Test + public void shouldRoundTripSingleHeader() { + final Headers original = new RecordHeaders() + .add("key1", "value1".getBytes()); + final byte[] serialized = serializer.serialize(original); + final Headers deserialized = deserializer.deserialize(serialized); + + assertNotNull(deserialized); + assertEquals(1, deserialized.toArray().length); + + final Header header = deserialized.lastHeader("key1"); + assertNotNull(header); + assertEquals("key1", header.key()); + assertArrayEquals("value1".getBytes(), header.value()); + } + + @Test + public void shouldRoundTripMultipleHeaders() { + final Headers original = new RecordHeaders() + .add("key0", "value0".getBytes()) + .add("key1", "value1".getBytes()) + .add("key2", "value2".getBytes()); + final byte[] serialized = serializer.serialize(original); + final Headers deserialized = deserializer.deserialize(serialized); + assertNotNull(deserialized); + + final Header[] headerArray = deserialized.toArray(); + assertEquals(3, headerArray.length); + for (int i = 0; i < headerArray.length; i++) { + Header next = headerArray[i]; + assertEquals("key" + i, next.key()); + assertArrayEquals(("value" + i).getBytes(), next.value()); + } + } + + @Test + public void shouldRoundTripHeaderWithNullValue() { + final Headers original = new RecordHeaders() + .add("key1", null); + final byte[] serialized = serializer.serialize(original); + final Headers deserialized = deserializer.deserialize(serialized); + + assertNotNull(deserialized); + assertEquals(1, deserialized.toArray().length); + + final Header header = deserialized.lastHeader("key1"); + assertNotNull(header); + assertEquals("key1", header.key()); + assertNull(header.value()); + } + + @Test + public void shouldRoundTripHeaderWithEmptyValue() { + final Headers original = new RecordHeaders() + .add("key1", new byte[0]); + final byte[] serialized = serializer.serialize(original); + final Headers deserialized = deserializer.deserialize(serialized); + + assertNotNull(deserialized); + assertEquals(1, deserialized.toArray().length); + + final Header header = deserialized.lastHeader("key1"); + assertNotNull(header); + assertEquals("key1", header.key()); + assertArrayEquals(new byte[0], header.value()); + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java new file mode 100644 index 0000000000000..dd8e130fb6d1e --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class HeadersSerializerTest { + + private final HeadersSerializer serializer = new HeadersSerializer(); + private final HeadersDeserializer deserializer = new HeadersDeserializer(); + + @Test + public void shouldSerializeNullHeaders() { + final byte[] serialized = serializer.serialize(null); + + assertNotNull(serialized); + assertEquals(1, serialized.length, "Null headers should have 1 byte to indicate headers count is 0"); + assertEquals(0, serialized[0], "The byte should be 0 (varint encoding of 0)"); + } + + @Test + public void shouldSerializeEmptyHeaders() { + final Headers headers = new RecordHeaders(); + final byte[] serialized = serializer.serialize(headers); + + assertNotNull(serialized); + assertEquals(1, serialized.length, "Empty headers should have 1 byte to indicate headers count is 0"); + assertEquals(0, serialized[0], "The byte should be 0 (varint encoding of 0)"); + } + + @Test + public void shouldSerializeSingleHeader() { + final Headers headers = new RecordHeaders() + .add("key1", "value1".getBytes()); + final byte[] serialized = serializer.serialize(headers); + + assertNotNull(serialized); + assertTrue(serialized.length > 0); + + final Headers deserialized = deserializer.deserialize(serialized); + assertNotNull(deserialized); + assertEquals(1, deserialized.toArray().length); + + final Header header = deserialized.lastHeader("key1"); + assertNotNull(header); + assertEquals("key1", header.key()); + assertArrayEquals("value1".getBytes(), header.value()); + } + + @Test + public void shouldSerializeMultipleHeaders() { + final Headers headers = new RecordHeaders() + .add("key0", "value0".getBytes()) + .add("key1", "value1".getBytes()) + .add("key2", "value2".getBytes()); + final byte[] serialized = serializer.serialize(headers); + + assertNotNull(serialized); + assertTrue(serialized.length > 0); + + final Headers deserialized = deserializer.deserialize(serialized); + assertNotNull(deserialized); + assertEquals(3, deserialized.toArray().length); + + final Header[] headerArray = deserialized.toArray(); + for (int i = 0; i < headerArray.length; i++) { + final Header header = headerArray[i]; + assertEquals("key" + i, header.key()); + assertArrayEquals(("value" + i).getBytes(), header.value()); + } + } + + @Test + public void shouldSerializeHeaderWithNullValue() { + final Headers headers = new RecordHeaders() + .add("key1", null); + final byte[] serialized = serializer.serialize(headers); + + assertNotNull(serialized); + assertTrue(serialized.length > 0); + + final Headers deserialized = deserializer.deserialize(serialized); + assertNotNull(deserialized); + assertEquals(1, deserialized.toArray().length); + + final Header header = deserialized.lastHeader("key1"); + assertNotNull(header); + assertEquals("key1", header.key()); + assertNull(header.value()); + } + + @Test + public void shouldSerializeHeadersWithEmptyValue() { + final Headers headers = new RecordHeaders() + .add("key1", new byte[0]); + final byte[] serialized = serializer.serialize(headers); + + assertNotNull(serialized); + assertTrue(serialized.length > 0); + + final Headers deserialized = deserializer.deserialize(serialized); + assertNotNull(deserialized); + assertEquals(1, deserialized.toArray().length); + + final Header header = deserialized.lastHeader("key1"); + assertNotNull(header); + assertEquals("key1", header.key()); + assertArrayEquals(new byte[0], header.value()); + } + + @Test + public void shouldSerializeHeadersWithSpecialCharacters() { + final Headers headers = new RecordHeaders() + .add("key-with-dash", "value".getBytes()) + .add("key.with.dots", "value".getBytes()) + .add("key_with_underscores", "value".getBytes()); + final byte[] serialized = serializer.serialize(headers); + + assertNotNull(serialized); + assertTrue(serialized.length > 0); + + final Headers deserialized = deserializer.deserialize(serialized); + assertNotNull(deserialized); + assertEquals(3, deserialized.toArray().length); + + assertNotNull(deserialized.lastHeader("key-with-dash")); + assertArrayEquals("value".getBytes(), deserialized.lastHeader("key-with-dash").value()); + + assertNotNull(deserialized.lastHeader("key.with.dots")); + assertArrayEquals("value".getBytes(), deserialized.lastHeader("key.with.dots").value()); + + assertNotNull(deserialized.lastHeader("key_with_underscores")); + assertArrayEquals("value".getBytes(), deserialized.lastHeader("key_with_underscores").value()); + } +} From 5da224cde42d68233472ca5009f645af07c079d5 Mon Sep 17 00:00:00 2001 From: TengYao Chi Date: Wed, 4 Feb 2026 22:18:03 +0000 Subject: [PATCH 2/3] implement interfaces --- .../state/internals/HeadersDeserializer.java | 6 +++-- .../state/internals/HeadersSerializer.java | 7 ++++-- .../internals/HeadersDeserializerTest.java | 24 +++++++++---------- .../internals/HeadersSerializerTest.java | 24 +++++++++---------- 4 files changed, 33 insertions(+), 28 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersDeserializer.java index 9d1de4696ad6b..3e9622cbeb682 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersDeserializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersDeserializer.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.utils.ByteUtils; import java.nio.ByteBuffer; @@ -40,17 +41,18 @@ * * This is used by KIP-1271 to deserialize headers from state stores. */ -public class HeadersDeserializer { +public class HeadersDeserializer implements Deserializer { /** * Deserializes headers from a byte array using varint encoding per KIP-1271. *

* The input format is [count][header1][header2]... without a size prefix. * + * @param topic topic associated with the data * @param data the serialized byte array (can be null) * @return the deserialized headers */ - public Headers deserialize(final byte[] data) { + public Headers deserialize(final String topic, final byte[] data) { if (data == null || data.length == 0) { return new RecordHeaders(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java index 543f953745e5e..94772ce700c26 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.ByteUtils; import java.io.ByteArrayOutputStream; @@ -42,7 +43,7 @@ *

* This is used by KIP-1271 to serialize headers for storage in state stores. */ -public class HeadersSerializer { +public class HeadersSerializer implements Serializer { /** * Serializes headers into a byte array using varint encoding per KIP-1271. @@ -50,10 +51,12 @@ public class HeadersSerializer { * The output format is [count][header1][header2]... without a size prefix. * The size prefix is added by the outer serializer that uses this. * + * @param topic topic associated with data * @param headers the headers to serialize (can be null) * @return the serialized byte array */ - public byte[] serialize(final Headers headers) { + @Override + public byte[] serialize(final String topic, final Headers headers) { try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final DataOutputStream out = new DataOutputStream(baos)) { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersDeserializerTest.java index 8762a77a30b6e..2a36d34c56449 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersDeserializerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersDeserializerTest.java @@ -34,7 +34,7 @@ public class HeadersDeserializerTest { @Test public void shouldDeserializeNullData() { - final Headers headers = deserializer.deserialize(null); + final Headers headers = deserializer.deserialize("", null); assertNotNull(headers); assertEquals(0, headers.toArray().length); @@ -42,7 +42,7 @@ public void shouldDeserializeNullData() { @Test public void shouldDeserializeEmptyData() { - final Headers headers = deserializer.deserialize(new byte[0]); + final Headers headers = deserializer.deserialize("", new byte[0]); assertNotNull(headers); assertEquals(0, headers.toArray().length); @@ -51,8 +51,8 @@ public void shouldDeserializeEmptyData() { @Test public void shouldRoundTripEmptyHeaders() { final Headers original = new RecordHeaders(); - final byte[] serialized = serializer.serialize(original); - final Headers deserialized = deserializer.deserialize(serialized); + final byte[] serialized = serializer.serialize("", original); + final Headers deserialized = deserializer.deserialize("", serialized); assertNotNull(deserialized); assertEquals(0, deserialized.toArray().length); @@ -62,8 +62,8 @@ public void shouldRoundTripEmptyHeaders() { public void shouldRoundTripSingleHeader() { final Headers original = new RecordHeaders() .add("key1", "value1".getBytes()); - final byte[] serialized = serializer.serialize(original); - final Headers deserialized = deserializer.deserialize(serialized); + final byte[] serialized = serializer.serialize("", original); + final Headers deserialized = deserializer.deserialize("", serialized); assertNotNull(deserialized); assertEquals(1, deserialized.toArray().length); @@ -80,8 +80,8 @@ public void shouldRoundTripMultipleHeaders() { .add("key0", "value0".getBytes()) .add("key1", "value1".getBytes()) .add("key2", "value2".getBytes()); - final byte[] serialized = serializer.serialize(original); - final Headers deserialized = deserializer.deserialize(serialized); + final byte[] serialized = serializer.serialize("", original); + final Headers deserialized = deserializer.deserialize("", serialized); assertNotNull(deserialized); final Header[] headerArray = deserialized.toArray(); @@ -97,8 +97,8 @@ public void shouldRoundTripMultipleHeaders() { public void shouldRoundTripHeaderWithNullValue() { final Headers original = new RecordHeaders() .add("key1", null); - final byte[] serialized = serializer.serialize(original); - final Headers deserialized = deserializer.deserialize(serialized); + final byte[] serialized = serializer.serialize("", original); + final Headers deserialized = deserializer.deserialize("", serialized); assertNotNull(deserialized); assertEquals(1, deserialized.toArray().length); @@ -113,8 +113,8 @@ public void shouldRoundTripHeaderWithNullValue() { public void shouldRoundTripHeaderWithEmptyValue() { final Headers original = new RecordHeaders() .add("key1", new byte[0]); - final byte[] serialized = serializer.serialize(original); - final Headers deserialized = deserializer.deserialize(serialized); + final byte[] serialized = serializer.serialize("", original); + final Headers deserialized = deserializer.deserialize("", serialized); assertNotNull(deserialized); assertEquals(1, deserialized.toArray().length); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java index dd8e130fb6d1e..c6e59208720ee 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java @@ -35,7 +35,7 @@ public class HeadersSerializerTest { @Test public void shouldSerializeNullHeaders() { - final byte[] serialized = serializer.serialize(null); + final byte[] serialized = serializer.serialize("", null); assertNotNull(serialized); assertEquals(1, serialized.length, "Null headers should have 1 byte to indicate headers count is 0"); @@ -45,7 +45,7 @@ public void shouldSerializeNullHeaders() { @Test public void shouldSerializeEmptyHeaders() { final Headers headers = new RecordHeaders(); - final byte[] serialized = serializer.serialize(headers); + final byte[] serialized = serializer.serialize("", headers); assertNotNull(serialized); assertEquals(1, serialized.length, "Empty headers should have 1 byte to indicate headers count is 0"); @@ -56,12 +56,12 @@ public void shouldSerializeEmptyHeaders() { public void shouldSerializeSingleHeader() { final Headers headers = new RecordHeaders() .add("key1", "value1".getBytes()); - final byte[] serialized = serializer.serialize(headers); + final byte[] serialized = serializer.serialize("", headers); assertNotNull(serialized); assertTrue(serialized.length > 0); - final Headers deserialized = deserializer.deserialize(serialized); + final Headers deserialized = deserializer.deserialize("", serialized); assertNotNull(deserialized); assertEquals(1, deserialized.toArray().length); @@ -77,12 +77,12 @@ public void shouldSerializeMultipleHeaders() { .add("key0", "value0".getBytes()) .add("key1", "value1".getBytes()) .add("key2", "value2".getBytes()); - final byte[] serialized = serializer.serialize(headers); + final byte[] serialized = serializer.serialize("", headers); assertNotNull(serialized); assertTrue(serialized.length > 0); - final Headers deserialized = deserializer.deserialize(serialized); + final Headers deserialized = deserializer.deserialize("", serialized); assertNotNull(deserialized); assertEquals(3, deserialized.toArray().length); @@ -98,12 +98,12 @@ public void shouldSerializeMultipleHeaders() { public void shouldSerializeHeaderWithNullValue() { final Headers headers = new RecordHeaders() .add("key1", null); - final byte[] serialized = serializer.serialize(headers); + final byte[] serialized = serializer.serialize("", headers); assertNotNull(serialized); assertTrue(serialized.length > 0); - final Headers deserialized = deserializer.deserialize(serialized); + final Headers deserialized = deserializer.deserialize("", serialized); assertNotNull(deserialized); assertEquals(1, deserialized.toArray().length); @@ -117,12 +117,12 @@ public void shouldSerializeHeaderWithNullValue() { public void shouldSerializeHeadersWithEmptyValue() { final Headers headers = new RecordHeaders() .add("key1", new byte[0]); - final byte[] serialized = serializer.serialize(headers); + final byte[] serialized = serializer.serialize("", headers); assertNotNull(serialized); assertTrue(serialized.length > 0); - final Headers deserialized = deserializer.deserialize(serialized); + final Headers deserialized = deserializer.deserialize("", serialized); assertNotNull(deserialized); assertEquals(1, deserialized.toArray().length); @@ -138,12 +138,12 @@ public void shouldSerializeHeadersWithSpecialCharacters() { .add("key-with-dash", "value".getBytes()) .add("key.with.dots", "value".getBytes()) .add("key_with_underscores", "value".getBytes()); - final byte[] serialized = serializer.serialize(headers); + final byte[] serialized = serializer.serialize("", headers); assertNotNull(serialized); assertTrue(serialized.length > 0); - final Headers deserialized = deserializer.deserialize(serialized); + final Headers deserialized = deserializer.deserialize("", serialized); assertNotNull(deserialized); assertEquals(3, deserialized.toArray().length); From a6a9270f08da3cd1a2a6b05f3a26413c72aaa7fe Mon Sep 17 00:00:00 2001 From: TengYao Chi Date: Thu, 5 Feb 2026 14:08:28 +0000 Subject: [PATCH 3/3] address comments --- .../kafka/streams/state/internals/HeadersDeserializer.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersDeserializer.java index 3e9622cbeb682..70a1107d98116 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersDeserializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersDeserializer.java @@ -58,15 +58,15 @@ public Headers deserialize(final String topic, final byte[] data) { } final ByteBuffer buffer = ByteBuffer.wrap(data); - final int headerCount = ByteUtils.readVarint(buffer); + final int headersCount = ByteUtils.readVarint(buffer); - if (headerCount == 0) { + if (headersCount == 0) { return new RecordHeaders(); } final RecordHeaders headers = new RecordHeaders(); - for (int i = 0; i < headerCount; i++) { + for (int i = 0; i < headersCount; i++) { final int keyLength = ByteUtils.readVarint(buffer); final byte[] keyBytes = new byte[keyLength]; buffer.get(keyBytes);