From c9058df5b5333684e9239310581375428edf8064 Mon Sep 17 00:00:00 2001
From: "Darby.Han"
Date: Wed, 16 Oct 2019 22:02:46 +0900
Subject: [PATCH] Add Kafka header support

- KafkaMirrorMaker: support Kafka headers
---
 build.gradle                                      |  1 +
 .../datastream/common/BrooklinEnvelope.java       | 44 ++++++++++++++++---
 .../connectors/kafka/KafkaConnectorTask.java      |  2 +-
 .../KafkaMirrorMakerConnectorTask.java            |  2 +-
 .../kafka/KafkaTransportProvider.java             | 10 ++++-
 5 files changed, 49 insertions(+), 10 deletions(-)

diff --git a/build.gradle b/build.gradle
index 8df961ba8..51b942958 100644
--- a/build.gradle
+++ b/build.gradle
@@ -139,6 +139,7 @@ project(':datastream-common') {
     compile "com.linkedin.pegasus:restli-server:$pegasusVersion"
     compile "com.intellij:annotations:$intellijAnnotationsVersion"
     compile "com.google.guava:guava:$guavaVersion"
+    compile "org.apache.kafka:kafka-clients:$kafkaVersion"
     testCompile "org.mockito:mockito-core:$mockitoVersion"
   }
 }
diff --git a/datastream-common/src/main/java/com/linkedin/datastream/common/BrooklinEnvelope.java b/datastream-common/src/main/java/com/linkedin/datastream/common/BrooklinEnvelope.java
index d572e4755..092080610 100644
--- a/datastream-common/src/main/java/com/linkedin/datastream/common/BrooklinEnvelope.java
+++ b/datastream-common/src/main/java/com/linkedin/datastream/common/BrooklinEnvelope.java
@@ -11,6 +11,7 @@
 
 import org.apache.avro.reflect.Nullable;
 import org.apache.commons.lang.Validate;
+import org.apache.kafka.common.header.Headers;
 
 
 /**
@@ -29,6 +30,8 @@ public class BrooklinEnvelope {
 
   private Map<String, String> _metadata;
 
+  private Headers _headers;
+
   /**
    * Construct a BrooklinEnvelope using record key, value, and metadata
    * @param key The record key (e.g. primary key)
@@ -36,7 +39,7 @@ public class BrooklinEnvelope {
    * @param metadata Additional metadata to associate with the change event
    */
   public BrooklinEnvelope(Object key, Object value, Map<String, String> metadata) {
-    this(key, value, null, metadata);
+    this(key, value, null, metadata, null);
   }
 
   /**
@@ -45,14 +48,28 @@ public BrooklinEnvelope(Object key, Object value, Map<String, String> metadata)
    * @param previousValue The old record value
    * @param value The new record value
    * @param metadata Additional metadata to associate with the change event
+   * @param headers The Kafka headers to associate with the change event
    */
   public BrooklinEnvelope(@Nullable Object key, @Nullable Object value, @Nullable Object previousValue,
-      Map<String, String> metadata) {
+      Map<String, String> metadata, @Nullable Headers headers) {
     Validate.notNull(metadata, "metadata cannot be null");
     setKey(key);
     setValue(value);
     setPreviousValue(previousValue);
     setMetadata(metadata);
+    setHeaders(headers);
+  }
+
+  /**
+   * Construct a BrooklinEnvelope using record key, old and new values, and metadata
+   * @param key The record key (e.g. primary key)
+   * @param previousValue The old record value
+   * @param value The new record value
+   * @param metadata Additional metadata to associate with the change event
+   */
+  public BrooklinEnvelope(@Nullable Object key, @Nullable Object value, @Nullable Object previousValue,
+      Map<String, String> metadata) {
+    this(key, value, previousValue, metadata, null);
   }
 
   /**
@@ -129,6 +146,21 @@ public void setMetadata(Map<String, String> metadata) {
     _metadata = metadata;
   }
 
+
+  /**
+   * Get the Kafka headers
+   */
+  public Headers getHeaders() {
+    return _headers;
+  }
+
+  /**
+   * Set the Kafka headers
+   */
+  public void setHeaders(Headers headers) {
+    _headers = headers;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -139,17 +171,17 @@ public boolean equals(Object o) {
     }
     BrooklinEnvelope task = (BrooklinEnvelope) o;
     return Objects.equals(_previousValue, task._previousValue) && Objects.equals(_key, task._key) && Objects.equals(
-        _value, task._value) && Objects.equals(_metadata, task._metadata);
+        _value, task._value) && Objects.equals(_metadata, task._metadata) && Objects.equals(_headers, task._headers);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(_key, _value, _previousValue, _metadata);
+    return Objects.hash(_key, _value, _previousValue, _metadata, _headers);
   }
 
   @Override
   public String toString() {
-    return String.format("Key:(%s), Value:(%s), PreviousValue:(%s), Metadata=(%s)", _key, _value, _previousValue,
-        _metadata);
+    return String.format("Key:(%s), Value:(%s), PreviousValue:(%s), Metadata=(%s), Headers=(%s)", _key, _value, _previousValue,
+        _metadata, _headers);
   }
 }
diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/KafkaConnectorTask.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/KafkaConnectorTask.java
index c276a490b..d705def72 100644
--- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/KafkaConnectorTask.java
+++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/KafkaConnectorTask.java
@@ -144,7 +144,7 @@ protected DatastreamProducerRecord translate(ConsumerRecord<?, ?> fromKafka, Ins
       eventsSourceTimestamp = fromKafka.timestamp();
     }
 
-    BrooklinEnvelope envelope = new BrooklinEnvelope(fromKafka.key(), fromKafka.value(), null, metadata);
+    BrooklinEnvelope envelope = new BrooklinEnvelope(fromKafka.key(), fromKafka.value(), null, metadata, fromKafka.headers());
     DatastreamProducerRecordBuilder builder = new DatastreamProducerRecordBuilder();
     builder.addEvent(envelope);
     builder.setEventsSourceTimestamp(eventsSourceTimestamp);
diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java
index f21f323ad..8562077b7 100644
--- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java
+++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java
@@ -228,7 +228,7 @@ protected DatastreamProducerRecord translate(ConsumerRecord<?, ?> fromKafka, Ins
     String offsetStr = String.valueOf(offset);
     metadata.put(KAFKA_ORIGIN_OFFSET, offsetStr);
     metadata.put(BrooklinEnvelopeMetadataConstants.EVENT_TIMESTAMP, String.valueOf(eventsSourceTimestamp));
-    BrooklinEnvelope envelope = new BrooklinEnvelope(fromKafka.key(), fromKafka.value(), null, metadata);
+    BrooklinEnvelope envelope = new BrooklinEnvelope(fromKafka.key(), fromKafka.value(), null, metadata, fromKafka.headers());
     DatastreamProducerRecordBuilder builder = new DatastreamProducerRecordBuilder();
     builder.addEvent(envelope);
     builder.setEventsSourceTimestamp(eventsSourceTimestamp);
diff --git a/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/KafkaTransportProvider.java b/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/KafkaTransportProvider.java
index 868bfec88..0752dadfb 100644
--- a/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/KafkaTransportProvider.java
+++ b/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/KafkaTransportProvider.java
@@ -17,6 +17,7 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.Headers;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -99,6 +100,7 @@ private ProducerRecord<byte[], byte[]> convertToProducerRecord(String topicName,
 
     byte[] keyValue = new byte[0];
     byte[] payloadValue = new byte[0];
+    Headers recordHeader = null;
     if (event instanceof BrooklinEnvelope) {
       BrooklinEnvelope envelope = (BrooklinEnvelope) event;
       if (envelope.key().isPresent() && envelope.key().get() instanceof byte[]) {
@@ -108,19 +110,23 @@ private ProducerRecord<byte[], byte[]> convertToProducerRecord(String topicName,
       if (envelope.value().isPresent() && envelope.value().get() instanceof byte[]) {
         payloadValue = (byte[]) envelope.value().get();
       }
+
+      if (envelope.getHeaders() != null) {
+        recordHeader = envelope.getHeaders();
+      }
     } else if (event instanceof byte[]) {
       payloadValue = (byte[]) event;
     }
 
     if (partition.isPresent() && partition.get() >= 0) {
       // If the partition is specified. We send the record to the specific partition
-      return new ProducerRecord<>(topicName, partition.get(), keyValue, payloadValue);
+      return new ProducerRecord<>(topicName, partition.get(), keyValue, payloadValue, recordHeader);
     } else {
       // If the partition is not specified. We use the partitionKey as the key. Kafka will use the hash of that
       // to determine the partition. If partitionKey does not exist, use the key value.
       keyValue = record.getPartitionKey().isPresent()
           ? record.getPartitionKey().get().getBytes(StandardCharsets.UTF_8) : null;
-      return new ProducerRecord<>(topicName, keyValue, payloadValue);
+      return new ProducerRecord<>(topicName, null, keyValue, payloadValue, recordHeader);
     }
   }
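
Reviewer note: below is a minimal, hypothetical usage sketch, not part of the patch (the class name HeaderEnvelopeSketch and the header key "trace-id" are made up for illustration). It exercises the new five-argument BrooklinEnvelope constructor with the standard kafka-clients RecordHeaders implementation, mirroring what the connector tasks now do with fromKafka.headers() before KafkaTransportProvider copies the headers onto the outgoing ProducerRecord.

    import java.nio.charset.StandardCharsets;
    import java.util.Collections;

    import org.apache.kafka.common.header.Headers;
    import org.apache.kafka.common.header.internals.RecordHeaders;

    import com.linkedin.datastream.common.BrooklinEnvelope;

    public class HeaderEnvelopeSketch {
      public static void main(String[] args) {
        // Headers as a Kafka consumer would supply them via ConsumerRecord#headers()
        Headers headers = new RecordHeaders();
        headers.add("trace-id", "abc-123".getBytes(StandardCharsets.UTF_8));

        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        byte[] value = "payload".getBytes(StandardCharsets.UTF_8);

        // The new five-argument constructor introduced by this patch: headers
        // ride along with key/value/metadata inside the envelope until the
        // transport provider attaches them to the outgoing ProducerRecord.
        BrooklinEnvelope envelope =
            new BrooklinEnvelope(key, value, null, Collections.<String, String>emptyMap(), headers);

        // The headers survive the round trip through the envelope.
        System.out.println(envelope.getHeaders().lastHeader("trace-id") != null); // prints: true
      }
    }

Passing null headers remains valid: the pre-existing constructors delegate with null, and ProducerRecord accepts a null Headers argument, so existing callers are unaffected.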