From d3796daf8433611043f6680412b27355317896e1 Mon Sep 17 00:00:00 2001
From: Daniel Schierbeck
Date: Wed, 14 Oct 2020 13:05:10 +0200
Subject: [PATCH 01/30] v1.3.0

---
 CHANGELOG.md         | 2 ++
 lib/kafka/version.rb | 2 +-
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1084c6e1d..c1adb26e8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,8 @@ Changes and additions to the library will be listed here.
 
 ## Unreleased
 
+## 1.3.0
+
 - Support custom assignment strategy (#846).
 - Improved Exceptions in TransactionManager (#862).
 
diff --git a/lib/kafka/version.rb b/lib/kafka/version.rb
index 22636a095..523fd37e0 100644
--- a/lib/kafka/version.rb
+++ b/lib/kafka/version.rb
@@ -1,5 +1,5 @@
 # frozen_string_literal: true
 
 module Kafka
-  VERSION = "1.2.0"
+  VERSION = "1.3.0"
 end

From d20b62a2ac7698f7d8a500de7f2726f2e6f74b06 Mon Sep 17 00:00:00 2001
From: Stanislav Meshkov
Date: Wed, 21 Oct 2020 17:04:39 +0300
Subject: [PATCH 02/30] fix Kafka::TransactionManager#send_offsets_to_txn

---
 .../protocol/add_offsets_to_txn_response.rb |  2 +
 .../protocol/txn_offset_commit_response.rb  | 39 +++++++--
 lib/kafka/transaction_manager.rb            | 19 ++++-
 spec/transaction_manager_spec.rb            | 85 ++++++++++++++++++-
 4 files changed, 134 insertions(+), 11 deletions(-)

diff --git a/lib/kafka/protocol/add_offsets_to_txn_response.rb b/lib/kafka/protocol/add_offsets_to_txn_response.rb
index 830613dfb..7ac824cd7 100644
--- a/lib/kafka/protocol/add_offsets_to_txn_response.rb
+++ b/lib/kafka/protocol/add_offsets_to_txn_response.rb
@@ -1,3 +1,5 @@
+# frozen_string_literal: true
+
 module Kafka
   module Protocol
     class AddOffsetsToTxnResponse

diff --git a/lib/kafka/protocol/txn_offset_commit_response.rb b/lib/kafka/protocol/txn_offset_commit_response.rb
index 6395bf030..628af7d66 100644
--- a/lib/kafka/protocol/txn_offset_commit_response.rb
+++ b/lib/kafka/protocol/txn_offset_commit_response.rb
@@ -1,17 +1,46 @@
+# frozen_string_literal: true
+
 module Kafka
   module Protocol
     class TxnOffsetCommitResponse
+      class PartitionError
+        attr_reader :partition, :error_code
+
+        def initialize(partition:, error_code:)
+          @partition = partition
+          @error_code = error_code
+        end
+      end
+
+      class TopicPartitionsError
+        attr_reader :topic, :partitions
+
+        def initialize(topic:, partitions:)
+          @topic = topic
+          @partitions = partitions
+        end
+      end
 
-      attr_reader :error_code
+      attr_reader :errors
 
-      def initialize(error_code:)
-        @error_code = error_code
+      def initialize(errors:)
+        @errors = errors
       end
 
       def self.decode(decoder)
         _throttle_time_ms = decoder.int32
-        error_code = decoder.int16
-        new(error_code: error_code)
+        errors = decoder.array do
+          TopicPartitionsError.new(
+            topic: decoder.string,
+            partitions: decoder.array do
+              PartitionError.new(
+                partition: decoder.int32,
+                error_code: decoder.int16
+              )
+            end
+          )
+        end
+        new(errors: errors)
       end
     end
   end

diff --git a/lib/kafka/transaction_manager.rb b/lib/kafka/transaction_manager.rb
index fd7f9351f..23c8328c6 100644
--- a/lib/kafka/transaction_manager.rb
+++ b/lib/kafka/transaction_manager.rb
@@ -233,14 +233,23 @@ def send_offsets_to_txn(offsets:, group_id:)
       )
       Protocol.handle_error(add_response.error_code)
 
-      send_response = transaction_coordinator.txn_offset_commit(
+      send_response = group_coordinator(group_id: group_id).txn_offset_commit(
        transactional_id: @transactional_id,
        group_id: group_id,
        producer_id: @producer_id,
        producer_epoch: @producer_epoch,
        offsets: offsets
      )
-      Protocol.handle_error(send_response.error_code)
+      send_response.errors.each do |tp|
+        tp.partitions.each do |partition|
+          Protocol.handle_error(partition.error_code)
+        end
+      end
+
+      nil
+    rescue
+      @transaction_state.transition_to!(TransactionStateMachine::ERROR)
+      raise
     end
 
     def in_transaction?
@@ -283,6 +292,12 @@ def transaction_coordinator
       )
     end
 
+    def group_coordinator(group_id:)
+      @cluster.get_group_coordinator(
+        group_id: group_id
+      )
+    end
+
     def complete_transaction
       @transaction_state.transition_to!(TransactionStateMachine::READY)
       @transaction_partitions = {}

diff --git a/spec/transaction_manager_spec.rb b/spec/transaction_manager_spec.rb
index b0fe7ee4c..6305e7e91 100644
--- a/spec/transaction_manager_spec.rb
+++ b/spec/transaction_manager_spec.rb
@@ -4,6 +4,7 @@
   let!(:logger) { LOGGER }
   let!(:cluster) { double(:cluster) }
   let!(:transaction_coordinator) { double(:broker) }
+  let!(:group_coordinator) { double(:broker) }
 
   let!(:manager) do
     described_class.new(logger: logger, cluster: cluster)
@@ -13,6 +14,9 @@
     allow(cluster).to receive(:get_transaction_coordinator).and_return(
       transaction_coordinator
     )
+    allow(cluster).to receive(:get_group_coordinator).and_return(
+      group_coordinator
+    )
     allow(transaction_coordinator).to receive(:init_producer_id).and_return(
       Kafka::Protocol::InitProducerIDResponse.new(
         error_code: 0,
@@ -586,9 +590,10 @@
           error_code: 0
         )
       )
-      allow(transaction_coordinator).to receive(:txn_offset_commit).and_return(
-        Kafka::Protocol::TxnOffsetCommitResponse.new(
-          error_code: 0
+      allow(group_coordinator).to receive(:txn_offset_commit).and_return(
+        txn_offset_commit_response(
+          'hello' => [1],
+          'world' => [2]
        )
      )
    end
@@ -596,7 +601,63 @@
     it 'notifies transaction coordinator' do
       manager.send_offsets_to_txn(offsets: [1, 2], group_id: 1)
       expect(transaction_coordinator).to have_received(:add_offsets_to_txn)
-      expect(transaction_coordinator).to have_received(:txn_offset_commit)
+      expect(group_coordinator).to have_received(:txn_offset_commit)
+    end
+  end
+
+  context 'transaction coordinator returns error' do
+    before do
+      manager.init_transactions
+      manager.begin_transaction
+      allow(transaction_coordinator).to receive(:add_offsets_to_txn).and_return(
+        Kafka::Protocol::AddOffsetsToTxnResponse.new(
+          error_code: 47
+        )
+      )
+    end
+
+    it 'raises exception' do
+      expect do
+        manager.send_offsets_to_txn(offsets: [1, 2], group_id: 1)
+      end.to raise_error(Kafka::InvalidProducerEpochError)
+    end
+
+    it 'changes state to error' do
+      begin
+        manager.send_offsets_to_txn(offsets: [1, 2], group_id: 1)
+      rescue; end
+      expect(manager.error?).to eql(true)
+    end
+  end
+
+  context 'group coordinator returns error' do
+    before do
+      manager.init_transactions
+      manager.begin_transaction
+      allow(transaction_coordinator).to receive(:add_offsets_to_txn).and_return(
+        Kafka::Protocol::AddOffsetsToTxnResponse.new(
+          error_code: 0
+        )
+      )
+      allow(group_coordinator).to receive(:txn_offset_commit).and_return(
+        txn_offset_commit_response(
+          { 'hello' => [1], 'world' => [2] },
+          error_code: 47
+        )
+      )
+    end
+
+    it 'raises exception' do
+      expect do
+        manager.send_offsets_to_txn(offsets: [1, 2], group_id: 1)
+      end.to raise_error(Kafka::InvalidProducerEpochError)
+    end
+
+    it 'changes state to error' do
+      begin
+        manager.send_offsets_to_txn(offsets: [1, 2], group_id: 1)
+      rescue; end
+      expect(manager.error?).to eql(true)
     end
   end
 end
@@ -617,3 +678,19 @@ def success_add_partitions_to_txn_response(topics)
     end
   )
 end
+
+def txn_offset_commit_response(topics, error_code: 0)
+  Kafka::Protocol::TxnOffsetCommitResponse.new(
+    errors: topics.map do |topic, partitions|
+      Kafka::Protocol::TxnOffsetCommitResponse::TopicPartitionsError.new(
+        topic: topic,
+        partitions: partitions.map do |partition|
+          Kafka::Protocol::TxnOffsetCommitResponse::PartitionError.new(
+            partition: partition,
+            error_code: error_code
+          )
+        end
+      )
+    end
+  )
+end
\ No newline at end of file

From 8d25a79ae8b439f3b33395d17f6eedf658da56c3 Mon Sep 17 00:00:00 2001
From: Stanislav Meshkov
Date: Wed, 21 Oct 2020 17:57:18 +0300
Subject: [PATCH 03/30] fix rubocop offense

---
 spec/transaction_manager_spec.rb | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/spec/transaction_manager_spec.rb b/spec/transaction_manager_spec.rb
index 6305e7e91..015beb0aa 100644
--- a/spec/transaction_manager_spec.rb
+++ b/spec/transaction_manager_spec.rb
@@ -693,4 +693,4 @@ def txn_offset_commit_response(topics, error_code: 0)
       )
     end
   )
-end
\ No newline at end of file
+end

From 9da0459cb454c17a307286361c402daafb1d6cfb Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lourens=20Naud=C3=A9?=
Date: Wed, 11 Nov 2020 12:32:20 +0000
Subject: [PATCH 04/30] Add kafka 2.6.0 to Circle CI

---
 .circleci/config.yml | 33 +++++++++++++++++++++++++++++++++
 1 file changed, 33 insertions(+)

diff --git a/.circleci/config.yml b/.circleci/config.yml
index 9b78abdd1..c607764b1 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -299,6 +299,38 @@ jobs:
       - run: bundle install --path vendor/bundle
       - run: bundle exec rspec --profile --tag functional spec/functional
 
+  kafka-2.6:
+    docker:
+      - image: circleci/ruby:2.5.1-node
+        environment:
+          LOG_LEVEL: DEBUG
+      - image: wurstmeister/zookeeper
+      - image: wurstmeister/kafka:2.13-2.6.0
+        environment:
+          KAFKA_ADVERTISED_HOST_NAME: localhost
+          KAFKA_ADVERTISED_PORT: 9092
+          KAFKA_PORT: 9092
+          KAFKA_ZOOKEEPER_CONNECT: localhost:2181
+          KAFKA_DELETE_TOPIC_ENABLE: true
+      - image: wurstmeister/kafka:2.13-2.6.0
+        environment:
+          KAFKA_ADVERTISED_HOST_NAME: localhost
+          KAFKA_ADVERTISED_PORT: 9093
+          KAFKA_PORT: 9093
+          KAFKA_ZOOKEEPER_CONNECT: localhost:2181
+          KAFKA_DELETE_TOPIC_ENABLE: true
+      - image: wurstmeister/kafka:2.13-2.6.0
+        environment:
+          KAFKA_ADVERTISED_HOST_NAME: localhost
+          KAFKA_ADVERTISED_PORT: 9094
+          KAFKA_PORT: 9094
+          KAFKA_ZOOKEEPER_CONNECT: localhost:2181
+          KAFKA_DELETE_TOPIC_ENABLE: true
+    steps:
+      - checkout
+      - run: bundle install --path vendor/bundle
+      - run: bundle exec rspec --profile --tag functional spec/functional
+
 workflows:
   version: 2
   test:
@@ -313,3 +345,4 @@ workflows:
       - kafka-2.3
       - kafka-2.4
       - kafka-2.5
+      - kafka-2.6

From fbf5273317ae29e6d543b3b7d28e20951b57d7ef Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lourens=20Naud=C3=A9?=
Date: Wed, 11 Nov 2020 12:41:44 +0000
Subject: [PATCH 05/30] Add Kafka 2.6 support to README

---
 README.md | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/README.md b/README.md
index 21577ffd5..0326ecbc4 100644
--- a/README.md
+++ b/README.md
@@ -128,6 +128,11 @@ Or install it yourself as:
     <td>Limited support</td>
     <td>Limited support</td>
   </tr>
+  <tr>
+    <th>Kafka 2.6</th>
+    <td>Limited support</td>
+    <td>Limited support</td>
+  </tr>
 </table>
 
 This library is targeting Kafka 0.9 with the v0.4.x series and Kafka 0.10 with the v0.5.x series. There's limited support for Kafka 0.8, and things should work with Kafka 0.11, although there may be performance issues due to changes in the protocol.
@@ -143,6 +148,7 @@ This library is targeting Kafka 0.9 with the v0.4.x series and Kafka 0.10 with t
 - **Kafka 2.3:** Everything that works with Kafka 2.2 should still work, but so far no features specific to Kafka 2.3 have been added.
 - **Kafka 2.4:** Everything that works with Kafka 2.3 should still work, but so far no features specific to Kafka 2.4 have been added.
 - **Kafka 2.5:** Everything that works with Kafka 2.4 should still work, but so far no features specific to Kafka 2.5 have been added.
+- **Kafka 2.6:** Everything that works with Kafka 2.5 should still work, but so far no features specific to Kafka 2.6 have been added.
 
 This library requires Ruby 2.1 or higher.

From d48aab871d5a52c02ad37073e33f03dbf3145f53 Mon Sep 17 00:00:00 2001
From: Stanislav Meshkov
Date: Thu, 12 Nov 2020 16:09:34 +0300
Subject: [PATCH 06/30] add info to changelog

---
 CHANGELOG.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index c1adb26e8..6e31a47d6 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,8 @@ Changes and additions to the library will be listed here.
 
 ## Unreleased
 
+- Fix `Kafka::TransactionManager#send_offsets_to_txn` (#866).
+
 ## 1.3.0
 
 - Support custom assignment strategy (#846).

From 3e6ea9fd03dfe03738ee5aa35647b1f7f339f495 Mon Sep 17 00:00:00 2001
From: abicky
Date: Sat, 12 Dec 2020 20:57:09 +0900
Subject: [PATCH 07/30] Resolve RSpec::Mocks::OutsideOfExampleError

---
 spec/async_producer_spec.rb | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/spec/async_producer_spec.rb b/spec/async_producer_spec.rb
index 7a0ed1351..ff91e9c8a 100644
--- a/spec/async_producer_spec.rb
+++ b/spec/async_producer_spec.rb
@@ -76,6 +76,8 @@ def instrument(name, payload = {})
     sleep 0.2 # wait for worker to call produce
 
     expect(sync_producer).to have_received(:produce)
+
+    async_producer.shutdown
   end
 
   it "retries until configured max_retries" do
@@ -89,6 +91,8 @@ def instrument(name, payload = {})
     metric = instrumenter.metrics_for("error.async_producer").first
     expect(metric.payload[:error]).to be_a(Kafka::BufferOverflow)
     expect(sync_producer).to have_received(:produce).exactly(3).times
+
+    async_producer.shutdown
   end
 
   it "requires `topic` to be a String" do

From ca33e4046995c3c7fd8829ae43c9af54b6c247ed Mon Sep 17 00:00:00 2001
From: abicky
Date: Sat, 12 Dec 2020 21:23:35 +0900
Subject: [PATCH 08/30] Install cmake to install snappy

---
 .circleci/config.yml | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/.circleci/config.yml b/.circleci/config.yml
index c607764b1..fa1a8f04d 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -7,6 +7,7 @@ jobs:
           LOG_LEVEL: DEBUG
     steps:
       - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
       - run: bundle install --path vendor/bundle
       - run: bundle exec rspec
       - run: bundle exec rubocop
@@ -40,6 +41,7 @@ jobs:
           KAFKA_DELETE_TOPIC_ENABLE: true
     steps:
       - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
       - run: bundle install --path vendor/bundle
       - run: bundle exec rspec --profile --tag functional spec/functional
 
@@ -72,6 +74,7 @@ jobs:
           KAFKA_DELETE_TOPIC_ENABLE: true
     steps:
       - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
      - run: bundle install --path vendor/bundle
      - run: bundle exec rspec --profile --tag functional spec/functional
 
@@ -104,6 +107,7 @@ jobs:
           KAFKA_DELETE_TOPIC_ENABLE: true
     steps:
       - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
       - run: bundle install --path vendor/bundle
       - run: bundle exec rspec --profile --tag functional spec/functional
 
@@ -136,6 +140,7 @@ jobs:
           KAFKA_DELETE_TOPIC_ENABLE: true
     steps:
       - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
       - run: bundle install --path vendor/bundle
       - run: bundle exec rspec --profile --tag functional spec/functional
 
@@ -168,6 +173,7 @@ jobs:
           KAFKA_DELETE_TOPIC_ENABLE: true
     steps:
       - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
       - run: bundle install --path vendor/bundle
       - run: bundle exec rspec --profile --tag functional spec/functional
 
@@ -200,6 +206,7 @@ jobs:
           KAFKA_DELETE_TOPIC_ENABLE: true
     steps:
       - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
       - run: bundle install --path vendor/bundle
       - run: bundle exec rspec --profile --tag functional spec/functional
 
@@ -232,6 +239,7 @@ jobs:
           KAFKA_DELETE_TOPIC_ENABLE: true
     steps:
       - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
       - run: bundle install --path vendor/bundle
       - run: bundle exec rspec --profile --tag functional spec/functional
 
@@ -264,6 +272,7 @@ jobs:
           KAFKA_DELETE_TOPIC_ENABLE: true
     steps:
       - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
       - run: bundle install --path vendor/bundle
       - run: bundle exec rspec --profile --tag functional spec/functional
 
@@ -296,6 +305,7 @@ jobs:
           KAFKA_DELETE_TOPIC_ENABLE: true
     steps:
       - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
       - run: bundle install --path vendor/bundle
       - run: bundle exec rspec --profile --tag functional spec/functional
 
@@ -328,6 +338,7 @@ jobs:
           KAFKA_DELETE_TOPIC_ENABLE: true
     steps:
       - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
       - run: bundle install --path vendor/bundle
       - run: bundle exec rspec --profile --tag functional spec/functional

From 6e25c4cfa96892a9b542499c5242733c03e4846f Mon Sep 17 00:00:00 2001
From: abicky
Date: Sat, 12 Dec 2020 21:03:57 +0900
Subject: [PATCH 09/30] Resolve "Passing the keyword argument as ..."
 deprecation warning

---
 spec/spec_helper.rb              | 4 ++--
 spec/transaction_manager_spec.rb | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb
index 32563ab04..1e8a44202 100644
--- a/spec/spec_helper.rb
+++ b/spec/spec_helper.rb
@@ -61,9 +61,9 @@ def generate_topic_name
   "#{RUN_ID}-topic-#{SecureRandom.uuid}"
 end
 
-def create_random_topic(*args)
+def create_random_topic(**args)
   topic = generate_topic_name
-  create_topic(topic, *args)
+  create_topic(topic, **args)
   topic
 end
 
diff --git a/spec/transaction_manager_spec.rb b/spec/transaction_manager_spec.rb
index 015beb0aa..494a02265 100644
--- a/spec/transaction_manager_spec.rb
+++ b/spec/transaction_manager_spec.rb
@@ -591,10 +591,10 @@
         )
       )
       allow(group_coordinator).to receive(:txn_offset_commit).and_return(
-        txn_offset_commit_response(
+        txn_offset_commit_response({
           'hello' => [1],
-          'world' => [2]
-        )
+          'world' => [2]
+        })
       )
     end
 

From fe7bc77212bfdff71cc3705ea68900e328922f9d Mon Sep 17 00:00:00 2001
From: abicky
Date: Sat, 12 Dec 2020 21:23:35 +0900
Subject: [PATCH 10/30] Install cmake to install snappy

---
 .circleci/config.yml | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/.circleci/config.yml b/.circleci/config.yml
index c607764b1..fa1a8f04d 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -7,6 +7,7 @@ jobs:
           LOG_LEVEL: DEBUG
     steps:
       - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
       - run: bundle install --path vendor/bundle
       - run: bundle exec rspec
       - run: bundle exec rubocop
@@ -40,6 +41,7 @@ jobs:
           KAFKA_DELETE_TOPIC_ENABLE: true
     steps:
       - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
       - run: bundle install --path vendor/bundle
       - run: bundle exec rspec --profile --tag functional spec/functional
 
@@ -72,6 +74,7 @@ jobs:
           KAFKA_DELETE_TOPIC_ENABLE: true
     steps:
       - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
       - run: bundle install --path vendor/bundle
       - run: bundle exec rspec --profile --tag functional spec/functional
 
@@ -104,6 +107,7 @@ jobs:
           KAFKA_DELETE_TOPIC_ENABLE: true
     steps:
       - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
       - run: bundle install --path vendor/bundle
       - run: bundle exec rspec --profile --tag functional spec/functional
 
@@ -136,6 +140,7 @@ jobs:
           KAFKA_DELETE_TOPIC_ENABLE: true
     steps:
       - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
       - run: bundle install --path vendor/bundle
       - run: bundle exec rspec --profile --tag functional spec/functional
 
@@ -168,6 +173,7 @@ jobs:
           KAFKA_DELETE_TOPIC_ENABLE: true
     steps:
       - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
       - run: bundle install --path vendor/bundle
       - run: bundle exec rspec --profile --tag functional spec/functional
 
@@ -200,6 +206,7 @@ jobs:
           KAFKA_DELETE_TOPIC_ENABLE: true
     steps:
       - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
       - run: bundle install --path vendor/bundle
       - run: bundle exec rspec --profile --tag functional spec/functional
 
@@ -232,6 +239,7 @@ jobs:
           KAFKA_DELETE_TOPIC_ENABLE: true
     steps:
       - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
       - run: bundle install --path vendor/bundle
       - run: bundle exec rspec --profile --tag functional spec/functional
 
@@ -264,6 +272,7 @@ jobs:
           KAFKA_DELETE_TOPIC_ENABLE: true
     steps:
       - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
       - run: bundle install --path vendor/bundle
       - run: bundle exec rspec --profile --tag functional spec/functional
 
@@ -296,6 +305,7 @@ jobs:
           KAFKA_DELETE_TOPIC_ENABLE: true
     steps:
       - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
       - run: bundle install --path vendor/bundle
       - run: bundle exec rspec --profile --tag functional spec/functional
 
@@ -328,6 +338,7 @@ jobs:
           KAFKA_DELETE_TOPIC_ENABLE: true
     steps:
       - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
       - run: bundle install --path vendor/bundle
       - run: bundle exec rspec --profile --tag functional spec/functional

From 7d1c38443c2f2fe1f2b95b40d2e593de9926d2f4 Mon Sep 17 00:00:00 2001
From: abicky
Date: Sun, 13 Dec 2020 02:36:26 +0900
Subject: [PATCH 11/30] Make "consuming messages with a custom assignment strategy" stable

In the example, one consumer sometimes consumes all messages before
another consumer joins. As a result, the example is unstable. This
commit makes it stable by ensuring all consumers have joined.

---
 spec/functional/consumer_group_spec.rb | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/spec/functional/consumer_group_spec.rb b/spec/functional/consumer_group_spec.rb
index 23cc1525e..4368f63ad 100644
--- a/spec/functional/consumer_group_spec.rb
+++ b/spec/functional/consumer_group_spec.rb
@@ -476,12 +476,21 @@ def call(cluster:, members:, partitions:)
       end
     end
 
+    joinined_consumers = []
     consumers = 2.times.map do |i|
       assignment_strategy = assignment_strategy_class.new(i + 1)
       kafka = Kafka.new(kafka_brokers, client_id: "test", logger: logger)
       consumer = kafka.consumer(group_id: group_id, offset_retention_time: offset_retention_time, assignment_strategy: assignment_strategy)
       consumer.subscribe(topic)
+
+      allow(consumer).to receive(:trigger_heartbeat).and_wrap_original do |m, *args|
+        joinined_consumers |= [consumer]
+        # Wait until all the consumers try to join to prevent one consumer from processing all messages
+        raise Kafka::HeartbeatError if joinined_consumers.size < consumers.size
+        m.call(*args)
+      end
+
       consumer
     end

From cda766766974ff1d97586c6468d7b9182ef02f8f Mon Sep 17 00:00:00 2001
From: abicky
Date: Sat, 12 Dec 2020 21:23:35 +0900
Subject: [PATCH 12/30] Install cmake to install snappy

---
 .circleci/config.yml | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/.circleci/config.yml b/.circleci/config.yml
index c607764b1..fa1a8f04d 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -7,6 +7,7 @@ jobs:
           LOG_LEVEL: DEBUG
     steps:
       - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
       - run: bundle install --path vendor/bundle
       - run: bundle exec rspec
       - run: bundle exec rubocop
@@ -40,6 +41,7 @@ jobs:
           KAFKA_DELETE_TOPIC_ENABLE: true
     steps:
       - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
       - run: bundle install --path vendor/bundle
       - run: bundle exec rspec --profile --tag functional spec/functional
 
@@ -72,6 +74,7 @@ jobs:
           KAFKA_DELETE_TOPIC_ENABLE: true
     steps:
       - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
       - run: bundle install --path vendor/bundle
       - run: bundle exec rspec --profile --tag functional spec/functional
 
@@ -104,6 +107,7 @@ jobs:
           KAFKA_DELETE_TOPIC_ENABLE: true
     steps:
       - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
       - run: bundle install --path vendor/bundle
       - run: bundle exec rspec --profile --tag functional spec/functional
 
@@ -136,6 +140,7 @@ jobs:
           KAFKA_DELETE_TOPIC_ENABLE: true
     steps:
       - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
       - run: bundle install --path vendor/bundle
       - run: bundle exec rspec --profile --tag functional spec/functional
 
@@ -168,6 +173,7 @@ jobs:
           KAFKA_DELETE_TOPIC_ENABLE: true
     steps:
       - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
       - run: bundle install --path vendor/bundle
       - run: bundle exec rspec --profile --tag functional spec/functional
 
@@ -200,6 +206,7 @@ jobs:
           KAFKA_DELETE_TOPIC_ENABLE: true
     steps:
       - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
       - run: bundle install --path vendor/bundle
       - run: bundle exec rspec --profile --tag functional spec/functional
 
@@ -232,6 +239,7 @@ jobs:
           KAFKA_DELETE_TOPIC_ENABLE: true
     steps:
       - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
       - run: bundle install --path vendor/bundle
       - run: bundle exec rspec --profile --tag functional spec/functional
 
@@ -264,6 +272,7 @@ jobs:
           KAFKA_DELETE_TOPIC_ENABLE: true
     steps:
       - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
       - run: bundle install --path vendor/bundle
       - run: bundle exec rspec --profile --tag functional spec/functional
 
@@ -296,6 +305,7 @@ jobs:
           KAFKA_DELETE_TOPIC_ENABLE: true
     steps:
       - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
       - run: bundle install --path vendor/bundle
       - run: bundle exec rspec --profile --tag functional spec/functional
 
@@ -328,6 +338,7 @@ jobs:
           KAFKA_DELETE_TOPIC_ENABLE: true
     steps:
       - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
       - run: bundle install --path vendor/bundle
       - run: bundle exec rspec --profile --tag functional spec/functional

From fe7bc77212bfdff71cc3705ea68900e328922f9d Mon Sep 17 00:00:00 2001
From: Takeshi Arabiki
Date: Thu, 31 Dec 2020 00:15:14 +0900
Subject: [PATCH 13/30] Apply suggestions from code review

Co-authored-by: Daniel Schierbeck

---
 spec/functional/consumer_group_spec.rb | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/spec/functional/consumer_group_spec.rb b/spec/functional/consumer_group_spec.rb
index 4368f63ad..322ab9e63 100644
--- a/spec/functional/consumer_group_spec.rb
+++ b/spec/functional/consumer_group_spec.rb
@@ -476,7 +476,7 @@ def call(cluster:, members:, partitions:)
       end
     end
 
-    joinined_consumers = []
+    joined_consumers = []
     consumers = 2.times.map do |i|
       assignment_strategy = assignment_strategy_class.new(i + 1)
       kafka = Kafka.new(kafka_brokers, client_id: "test", logger: logger)
@@ -485,9 +485,9 @@
       consumer.subscribe(topic)
 
       allow(consumer).to receive(:trigger_heartbeat).and_wrap_original do |m, *args|
-        joinined_consumers |= [consumer]
+        joined_consumers |= [consumer]
         # Wait until all the consumers try to join to prevent one consumer from processing all messages
-        raise Kafka::HeartbeatError if joinined_consumers.size < consumers.size
+        raise Kafka::HeartbeatError if joined_consumers.size < consumers.size
         m.call(*args)
       end

From ad8d8d9c85d7335f930e2d1cf89d4570820e37f1 Mon Sep 17 00:00:00 2001
From: vvuibert
Date: Mon, 4 Jan 2021 14:16:21 -0500
Subject: [PATCH 14/30] kafka 2.7.0

---
 .circleci/config.yml | 34 ++++++++++++++++++++++++++++++++++
 README.md            |  6 ++++++
 2 files changed, 40 insertions(+)
diff --git a/.circleci/config.yml b/.circleci/config.yml
index fa1a8f04d..3cc9f5d0e 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -342,6 +342,39 @@ jobs:
       - run: bundle install --path vendor/bundle
       - run: bundle exec rspec --profile --tag functional spec/functional
 
+  kafka-2.7:
+    docker:
+      - image: circleci/ruby:2.5.1-node
+        environment:
+          LOG_LEVEL: DEBUG
+      - image: wurstmeister/zookeeper
+      - image: wurstmeister/kafka:2.13-2.7.0
+        environment:
+          KAFKA_ADVERTISED_HOST_NAME: localhost
+          KAFKA_ADVERTISED_PORT: 9092
+          KAFKA_PORT: 9092
+          KAFKA_ZOOKEEPER_CONNECT: localhost:2181
+          KAFKA_DELETE_TOPIC_ENABLE: true
+      - image: wurstmeister/kafka:2.13-2.7.0
+        environment:
+          KAFKA_ADVERTISED_HOST_NAME: localhost
+          KAFKA_ADVERTISED_PORT: 9093
+          KAFKA_PORT: 9093
+          KAFKA_ZOOKEEPER_CONNECT: localhost:2181
+          KAFKA_DELETE_TOPIC_ENABLE: true
+      - image: wurstmeister/kafka:2.13-2.7.0
+        environment:
+          KAFKA_ADVERTISED_HOST_NAME: localhost
+          KAFKA_ADVERTISED_PORT: 9094
+          KAFKA_PORT: 9094
+          KAFKA_ZOOKEEPER_CONNECT: localhost:2181
+          KAFKA_DELETE_TOPIC_ENABLE: true
+    steps:
+      - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
+      - run: bundle install --path vendor/bundle
+      - run: bundle exec rspec --profile --tag functional spec/functional
+
 workflows:
   version: 2
   test:
@@ -357,3 +390,4 @@ workflows:
       - kafka-2.4
       - kafka-2.5
       - kafka-2.6
+      - kafka-2.7

diff --git a/README.md b/README.md
index 2de660263..ebe3d8365 100644
--- a/README.md
+++ b/README.md
@@ -134,6 +134,11 @@ Or install it yourself as:
     <td>Limited support</td>
     <td>Limited support</td>
   </tr>
+  <tr>
+    <th>Kafka 2.7</th>
+    <td>Limited support</td>
+    <td>Limited support</td>
+  </tr>
 </table>
 
 This library is targeting Kafka 0.9 with the v0.4.x series and Kafka 0.10 with the v0.5.x series. There's limited support for Kafka 0.8, and things should work with Kafka 0.11, although there may be performance issues due to changes in the protocol.
@@ -150,6 +155,7 @@ This library is targeting Kafka 0.9 with the v0.4.x series and Kafka 0.10 with t
 - **Kafka 2.4:** Everything that works with Kafka 2.3 should still work, but so far no features specific to Kafka 2.4 have been added.
 - **Kafka 2.5:** Everything that works with Kafka 2.4 should still work, but so far no features specific to Kafka 2.5 have been added.
 - **Kafka 2.6:** Everything that works with Kafka 2.5 should still work, but so far no features specific to Kafka 2.6 have been added.
+- **Kafka 2.7:** Everything that works with Kafka 2.6 should still work, but so far no features specific to Kafka 2.7 have been added.
 
 This library requires Ruby 2.1 or higher.
 
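A quick way to exercise one of the broker setups added above is a produce/read-back round trip. The following is an illustrative sketch only: it assumes the three-broker cluster from the CircleCI config is running locally on ports 9092-9094 and that topic auto-creation is enabled; the client ID and topic name are placeholders.

```ruby
require "kafka"

# Connect using the three local brokers assumed by the CI config above.
kafka = Kafka.new(["localhost:9092", "localhost:9093", "localhost:9094"], client_id: "smoke-test")

# Write a single message and read back the latest offset as a cheap liveness check.
kafka.deliver_message("hello", topic: "greetings")
puts kafka.last_offset_for("greetings", 0)
```
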
From 9028ed792e189976b30d7bce31d4ed9cb8dbf113 Mon Sep 17 00:00:00 2001
From: vvuibert
Date: Wed, 13 Jan 2021 12:09:53 -0500
Subject: [PATCH 15/30] fix describe topic test

---
 spec/functional/topic_management_spec.rb | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/spec/functional/topic_management_spec.rb b/spec/functional/topic_management_spec.rb
index d10b5fdd6..3dbdcaec3 100644
--- a/spec/functional/topic_management_spec.rb
+++ b/spec/functional/topic_management_spec.rb
@@ -70,11 +70,10 @@ def with_retry(opts = {}, &block)
     topic = generate_topic_name
     kafka.create_topic(topic, num_partitions: 3)
 
-    configs = kafka.describe_topic(topic, %w(retention.ms retention.bytes non_exists))
+    configs = kafka.describe_topic(topic, %w(retention.ms))
 
-    expect(configs.keys).to eql(%w(retention.ms retention.bytes))
+    expect(configs.keys).to eql(%w(retention.ms))
     expect(configs['retention.ms']).to be_a(String)
-    expect(configs['retention.bytes']).to be_a(String)
   end
 
   example "alter a topic configuration" do
@@ -90,8 +89,9 @@
       'max.message.bytes' => '987654'
     )
 
-    configs = kafka.describe_topic(topic, %w(retention.ms max.message.bytes))
-    expect(configs['retention.ms']).to eql('1234567')
-    expect(configs['max.message.bytes']).to eql('987654')
+    retention_configs = kafka.describe_topic(topic, %w(retention.ms))
+    expect(retention_configs['retention.ms']).to eql('1234567')
+    max_msg_bytes_configs = kafka.describe_topic(topic, %w(max.message.bytes))
+    expect(max_msg_bytes_configs['max.message.bytes']).to eql('987654')
   end
 end

From b323c63e52753b94902a606ebc046556981bfeee Mon Sep 17 00:00:00 2001
From: Steven Diviney
Date: Sun, 24 Jan 2021 20:21:19 -0800
Subject: [PATCH 16/30] Adding murmur2_random partition assignment

---
 lib/kafka/client.rb              |  8 +++-
 lib/kafka/murmur2_partitioner.rb | 35 ++++++++++++++
 lib/kafka/producer.rb            |  1 +
 ruby-kafka.gemspec               |  1 +
 spec/partitioner_spec.rb         | 82 ++++++++++++++++++++++++++------
 5 files changed, 110 insertions(+), 17 deletions(-)
 create mode 100644 lib/kafka/murmur2_partitioner.rb

diff --git a/lib/kafka/client.rb b/lib/kafka/client.rb
index 0525f2bff..52ef9ffab 100644
--- a/lib/kafka/client.rb
+++ b/lib/kafka/client.rb
@@ -65,7 +65,7 @@ class Client
   # @param ssl_ca_certs_from_system [Boolean] whether to use the CA certs from the
   #   system's default certificate store.
   #
-  # @param partitioner [Partitioner, nil] the partitioner that should be used by the client.
+  # @param partitioner [String, nil] the partitioner that should be used by the client.
   #
   # @param sasl_oauth_token_provider [Object, nil] OAuthBearer Token Provider instance that
   #   implements method token. See {Sasl::OAuth#initialize}
@@ -124,7 +124,11 @@ def initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_time
     )
 
     @cluster = initialize_cluster
-    @partitioner = partitioner || Partitioner.new
+    @partitioner = if partitioner
+      Object.const_get(partitioner).new
+    else
+      Partitioner.new
+    end
   end
 
   # Delivers a single message to the Kafka cluster.

diff --git a/lib/kafka/murmur2_partitioner.rb b/lib/kafka/murmur2_partitioner.rb
new file mode 100644
index 000000000..e7161dfb9
--- /dev/null
+++ b/lib/kafka/murmur2_partitioner.rb
@@ -0,0 +1,35 @@
+# frozen_string_literal: true
+
+require 'digest/murmurhash'
+
+module Kafka
+
+  # Java producer compatible message partitioner
+  class Murmur2Partitioner
+    SEED = [0x9747b28c].pack('L')
+
+    # Assigns a partition number based on a partition key. If no explicit
+    # partition key is provided, the message key will be used instead.
+    #
+    # If the key is nil, then a random partition is selected. Otherwise, a hash
+    # of the key is used to deterministically find a partition. As long as the
+    # number of partitions doesn't change, the same key will always be assigned
+    # to the same partition.
+    #
+    # @param partition_count [Integer] the number of partitions in the topic.
+    # @param message [Kafka::PendingMessage] the message that should be assigned
+    #   a partition.
+    # @return [Integer] the partition number.
+    def call(partition_count, message)
+      raise ArgumentError if partition_count == 0
+
+      key = message.partition_key || message.key
+
+      if key.nil?
+        rand(partition_count)
+      else
+        (Digest::MurmurHash2.rawdigest(key, SEED) & 0x7fffffff) % partition_count
+      end
+    end
+  end
+end

diff --git a/lib/kafka/producer.rb b/lib/kafka/producer.rb
index b504dbfdd..0a70f1b41 100644
--- a/lib/kafka/producer.rb
+++ b/lib/kafka/producer.rb
@@ -2,6 +2,7 @@
 
 require "set"
 require "kafka/partitioner"
+require "kafka/murmur2_partitioner"
 require "kafka/message_buffer"
 require "kafka/produce_operation"
 require "kafka/pending_message_queue"

diff --git a/ruby-kafka.gemspec b/ruby-kafka.gemspec
index 2589e97d5..0b0dbbfd6 100644
--- a/ruby-kafka.gemspec
+++ b/ruby-kafka.gemspec
@@ -28,6 +28,7 @@ Gem::Specification.new do |spec|
   spec.require_paths = ["lib"]
 
   spec.add_dependency 'digest-crc'
+  spec.add_dependency "digest-murmurhash"
 
   spec.add_development_dependency "bundler", ">= 1.9.5"
   spec.add_development_dependency "rake", "~> 10.0"

diff --git a/spec/partitioner_spec.rb b/spec/partitioner_spec.rb
index dafe73557..9b779551f 100644
--- a/spec/partitioner_spec.rb
+++ b/spec/partitioner_spec.rb
@@ -1,29 +1,81 @@
 # frozen_string_literal: true
 
 describe Kafka::Partitioner, "#call" do
-  let(:partitioner) { Kafka::Partitioner.new }
   let(:message) { double(:message, key: nil, partition_key: "yolo") }
 
-  it "deterministically returns a partition number for a partition key and partition count" do
-    partition = partitioner.call(3, message)
-    expect(partition).to eq 0
-  end
+  describe "default partitioner" do
+    let(:partitioner) { Kafka::Partitioner.new }
+
+    it "deterministically returns a partition number for a partition key and partition count" do
+      partition = partitioner.call(3, message)
+      expect(partition).to eq 0
+    end
+
+    it "falls back to the message key if no partition key is available" do
+      allow(message).to receive(:partition_key) { nil }
+      allow(message).to receive(:key) { "hey" }
 
-  it "falls back to the message key if no partition key is available" do
-    allow(message).to receive(:partition_key) { nil }
-    allow(message).to receive(:key) { "hey" }
+      partition = partitioner.call(3, message)
 
-    partition = partitioner.call(3, message)
+      expect(partition).to eq 2
+    end
 
-    expect(partition).to eq 2
+    it "randomly picks a partition if the key is nil" do
+      allow(message).to receive(:key) { nil }
+      allow(message).to receive(:partition_key) { nil }
+
+      partitions = 30.times.map { partitioner.call(3, message) }
+
+      expect(partitions.uniq).to contain_exactly(0, 1, 2)
+    end
   end
 
-  it "randomly picks a partition if the key is nil" do
-    allow(message).to receive(:key) { nil }
-    allow(message).to receive(:partition_key) { nil }
+  describe "murmur2 partitioner" do
+    let(:partitioner) { Kafka::Murmur2Partitioner.new }
+    let(:message) { double(:message, key: nil, partition_key: "yolo") }
+
+    it "deterministically returns a partition number for a partition key and partition count" do
+      partition = partitioner.call(3, message)
+      expect(partition).to eq 0
+    end
+
+    it "falls back to the message key if no partition key is available" do
+      allow(message).to receive(:partition_key) { nil }
+      allow(message).to receive(:key) { "hey" }
+
+      partition = partitioner.call(3, message)
+
+      expect(partition).to eq 1
+    end
+
+    it "randomly picks a partition if the key is nil" do
+      allow(message).to receive(:key) { nil }
+      allow(message).to receive(:partition_key) { nil }
 
-    partitions = 30.times.map { partitioner.call(3, message) }
+      partitions = 30.times.map { partitioner.call(3, message) }
 
-    expect(partitions.uniq).to contain_exactly(0, 1, 2)
+      expect(partitions.uniq).to contain_exactly(0, 1, 2)
+    end
+
+    it "picks a Java Kafka compatible partition" do
+      partition_count = 100
+      {
+        # librdkafka test cases taken from tests/0048-partitioner.c
+        "" => 0x106e08d9 % partition_count,
+        "this is another string with more length to it perhaps" => 0x4f7703da % partition_count,
+        "hejsan" => 0x5ec19395 % partition_count,
+        # Java Kafka test cases taken from UtilsTest.java.
+        # The Java tests check the result of murmur2 directly,
+        # so have been ANDd with 0x7fffffff to work here
+        "21" => (-973932308 & 0x7fffffff) % partition_count,
+        "foobar" => (-790332482 & 0x7fffffff) % partition_count,
+        "a-little-bit-long-string" => (-985981536 & 0x7fffffff) % partition_count,
+        "a-little-bit-longer-string" => (-1486304829 & 0x7fffffff) % partition_count,
+        "lkjh234lh9fiuh90y23oiuhsafujhadof229phr9h19h89h8" => (-58897971 & 0x7fffffff) % partition_count
+      }.each do |key, partition|
+        allow(message).to receive(:partition_key) { key }
+        expect(partitioner.call(partition_count, message)).to eq partition
+      end
+    end
   end
 end

From db7f5f4a3d097e72e0140aa0fd332c3d317eb170 Mon Sep 17 00:00:00 2001
From: Steven Diviney
Date: Tue, 26 Jan 2021 20:13:00 -0800
Subject: [PATCH 17/30] Add partitioner_klass as client param

---
 lib/kafka/client.rb | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)

diff --git a/lib/kafka/client.rb b/lib/kafka/client.rb
index 52ef9ffab..3ee8aa39e 100644
--- a/lib/kafka/client.rb
+++ b/lib/kafka/client.rb
@@ -65,7 +65,9 @@ class Client
   # @param ssl_ca_certs_from_system [Boolean] whether to use the CA certs from the
   #   system's default certificate store.
   #
-  # @param partitioner [String, nil] the partitioner that should be used by the client.
+  # @param partitioner [Partitioner, nil] the partitioner that should be used by the client.
+  #
+  # @param partitioner_klass [String, nil] the partitioner klass that should be used by the client if no partitioner is supplied.
   #
   # @param sasl_oauth_token_provider [Object, nil] OAuthBearer Token Provider instance that
   #   implements method token. See {Sasl::OAuth#initialize}
@@ -80,7 +82,7 @@ def initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_time
     ssl_client_cert_key_password: nil, ssl_client_cert_chain: nil, sasl_gssapi_principal: nil,
     sasl_gssapi_keytab: nil, sasl_plain_authzid: '', sasl_plain_username: nil, sasl_plain_password: nil,
     sasl_scram_username: nil, sasl_scram_password: nil, sasl_scram_mechanism: nil,
-    sasl_over_ssl: true, ssl_ca_certs_from_system: false, partitioner: nil, sasl_oauth_token_provider: nil, ssl_verify_hostname: true)
+    sasl_over_ssl: true, ssl_ca_certs_from_system: false, partitioner: nil, partitioner_klass: nil, sasl_oauth_token_provider: nil, ssl_verify_hostname: true)
     @logger = TaggedLogger.new(logger)
     @instrumenter = Instrumenter.new(client_id: client_id)
     @seed_brokers = normalize_seed_brokers(seed_brokers)
@@ -124,11 +126,14 @@ def initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_time
     )
 
     @cluster = initialize_cluster
-    @partitioner = if partitioner
-      Object.const_get(partitioner).new
-    else
-      Partitioner.new
-    end
+    @partitioner =
+      if partitioner
+        partitioner
+      elsif partitioner_klass
+        Object.const_get(partitioner_klass).new
+      else
+        Partitioner.new
+      end
   end
 
   # Delivers a single message to the Kafka cluster.

From 06175734c4d5f3094bfdaaf5b2f8345d39029e37 Mon Sep 17 00:00:00 2001
From: Steven Diviney
Date: Mon, 1 Feb 2021 11:39:43 -0800
Subject: [PATCH 18/30] Changes `digest-murmurhash` to be an optional
 dependency

And all the plumbing to load it

---
 lib/kafka/crc32_hash.rb            | 15 +++++++++++++
 lib/kafka/digest.rb                | 24 ++++++++++++++++++++
 lib/kafka/murmur2_hash.rb          | 17 +++++++++++++++
 lib/kafka/murmur2_partitioner.rb   | 35 ------------------------------
 lib/kafka/partitioner.rb           |  7 ++++--
 lib/kafka/producer.rb              |  1 -
 lib/kafka/protocol/record_batch.rb |  2 +-
 ruby-kafka.gemspec                 |  2 +-
 spec/digest_spec.rb                | 33 ++++++++++++++++++++++++++++
 spec/partitioner_spec.rb           |  2 +-
 10 files changed, 97 insertions(+), 41 deletions(-)
 create mode 100644 lib/kafka/crc32_hash.rb
 create mode 100644 lib/kafka/digest.rb
 create mode 100644 lib/kafka/murmur2_hash.rb
 delete mode 100644 lib/kafka/murmur2_partitioner.rb
 create mode 100644 spec/digest_spec.rb

diff --git a/lib/kafka/crc32_hash.rb b/lib/kafka/crc32_hash.rb
new file mode 100644
index 000000000..af342ff48
--- /dev/null
+++ b/lib/kafka/crc32_hash.rb
@@ -0,0 +1,15 @@
+# frozen_string_literal: true
+
+require "zlib"
+
+module Kafka
+  class Crc32Hash
+
+    # crc32 is part of the gems dependencies
+    def load; end
+
+    def hash(value)
+      Zlib.crc32(value)
+    end
+  end
+end

diff --git a/lib/kafka/digest.rb b/lib/kafka/digest.rb
new file mode 100644
index 000000000..f76885184
--- /dev/null
+++ b/lib/kafka/digest.rb
@@ -0,0 +1,24 @@
+# frozen_string_literal: true
+
+require "kafka/crc32_hash"
+require "kafka/murmur2_hash"
+
+module Kafka
+  module Digest
+    FUNCTIONS_BY_NAME = {
+      :crc32 => Crc32Hash.new,
+      :murmur2 => Murmur2Hash.new
+    }.freeze
+
+    # TODO: Should I just call this `hashing` or something?
+    def self.find_digest(name)
+      digest = FUNCTIONS_BY_NAME.fetch(name) do
+        raise LoadError, "Unknown hash function #{name}"
+      end
+
+      digest.load
+
+      digest
+    end
+  end
+end

diff --git a/lib/kafka/murmur2_hash.rb b/lib/kafka/murmur2_hash.rb
new file mode 100644
index 000000000..a6223b0d6
--- /dev/null
+++ b/lib/kafka/murmur2_hash.rb
@@ -0,0 +1,17 @@
+# frozen_string_literal: true
+
+module Kafka
+  class Murmur2Hash
+    SEED = [0x9747b28c].pack('L')
+
+    def load
+      require 'digest/murmurhash'
+    rescue LoadError
+      raise LoadError, "using murmur2 hashing requires adding a dependency on the `digest-murmurhash` gem to your Gemfile."
+    end
+
+    def hash(value)
+      ::Digest::MurmurHash2.rawdigest(value, SEED) & 0x7fffffff
+    end
+  end
+end

diff --git a/lib/kafka/murmur2_partitioner.rb b/lib/kafka/murmur2_partitioner.rb
deleted file mode 100644
index e7161dfb9..000000000
--- a/lib/kafka/murmur2_partitioner.rb
+++ /dev/null
@@ -1,35 +0,0 @@
-# frozen_string_literal: true
-
-require 'digest/murmurhash'
-
-module Kafka
-
-  # Java producer compatible message partitioner
-  class Murmur2Partitioner
-    SEED = [0x9747b28c].pack('L')
-
-    # Assigns a partition number based on a partition key. If no explicit
-    # partition key is provided, the message key will be used instead.
-    #
-    # If the key is nil, then a random partition is selected. Otherwise, a hash
-    # of the key is used to deterministically find a partition. As long as the
-    # number of partitions doesn't change, the same key will always be assigned
-    # to the same partition.
-    #
-    # @param partition_count [Integer] the number of partitions in the topic.
-    # @param message [Kafka::PendingMessage] the message that should be assigned
-    #   a partition.
-    # @return [Integer] the partition number.
-    def call(partition_count, message)
-      raise ArgumentError if partition_count == 0
-
-      key = message.partition_key || message.key
-
-      if key.nil?
-        rand(partition_count)
-      else
-        (Digest::MurmurHash2.rawdigest(key, SEED) & 0x7fffffff) % partition_count
-      end
-    end
-  end
-end

diff --git a/lib/kafka/partitioner.rb b/lib/kafka/partitioner.rb
index f4fcd2882..14901ca1f 100644
--- a/lib/kafka/partitioner.rb
+++ b/lib/kafka/partitioner.rb
@@ -1,11 +1,14 @@
 # frozen_string_literal: true
 
-require "zlib"
+require "kafka/digest"
 
 module Kafka
 
   # Assigns partitions to messages.
   class Partitioner
+    def initialize(hash_function: nil)
+      @digest = Digest.find_digest(hash_function || :crc32)
+    end
 
     # Assigns a partition number based on a partition key. If no explicit
     # partition key is provided, the message key will be used instead.
@@ -28,7 +31,7 @@ def call(partition_count, message)
       if key.nil?
         rand(partition_count)
       else
-        Zlib.crc32(key) % partition_count
+        @digest.hash(key) % partition_count
       end
     end
   end

diff --git a/lib/kafka/producer.rb b/lib/kafka/producer.rb
index 0a70f1b41..b504dbfdd 100644
--- a/lib/kafka/producer.rb
+++ b/lib/kafka/producer.rb
@@ -2,7 +2,6 @@
 
 require "set"
 require "kafka/partitioner"
-require "kafka/murmur2_partitioner"
 require "kafka/message_buffer"
 require "kafka/produce_operation"
 require "kafka/pending_message_queue"

diff --git a/lib/kafka/protocol/record_batch.rb b/lib/kafka/protocol/record_batch.rb
index 4201cc737..b9f6ea2c9 100644
--- a/lib/kafka/protocol/record_batch.rb
+++ b/lib/kafka/protocol/record_batch.rb
@@ -77,7 +77,7 @@ def encode(encoder)
       record_batch_encoder.write_int8(MAGIC_BYTE)
 
       body = encode_record_batch_body
-      crc = Digest::CRC32c.checksum(body)
+      crc = ::Digest::CRC32c.checksum(body)
 
       record_batch_encoder.write_int32(crc)
       record_batch_encoder.write(body)

diff --git a/ruby-kafka.gemspec b/ruby-kafka.gemspec
index 0b0dbbfd6..d50092dc5 100644
--- a/ruby-kafka.gemspec
+++ b/ruby-kafka.gemspec
@@ -28,12 +28,12 @@ Gem::Specification.new do |spec|
   spec.require_paths = ["lib"]
 
   spec.add_dependency 'digest-crc'
-  spec.add_dependency "digest-murmurhash"
 
   spec.add_development_dependency "bundler", ">= 1.9.5"
   spec.add_development_dependency "rake", "~> 10.0"
   spec.add_development_dependency "rspec"
   spec.add_development_dependency "pry"
+  spec.add_development_dependency "digest-murmurhash"
   spec.add_development_dependency "dotenv"
   spec.add_development_dependency "docker-api"
   spec.add_development_dependency "rspec-benchmark"

diff --git a/spec/digest_spec.rb b/spec/digest_spec.rb
new file mode 100644
index 000000000..c4b98a8bf
--- /dev/null
+++ b/spec/digest_spec.rb
@@ -0,0 +1,33 @@
+# frozen_string_literal: true
+
+describe Kafka::Digest do
+  describe "crc32" do
+    let(:digest) { Kafka::Digest.find_digest(:crc32) }
+
+    it "is supported" do
+      expect(digest).to be_truthy
+    end
+
+    it "produces hash for value" do
+      expect(digest.hash("yolo")).to eq(1623057525)
+    end
+  end
+
+  describe "murmur2" do
+    let(:digest) { Kafka::Digest.find_digest(:murmur2) }
+
+    it "is supported" do
+      expect(digest).to be_truthy
+    end
+
+    it "produces hash for value" do
+      expect(digest.hash("yolo")).to eq(1633766415)
+    end
+  end
+
+  describe "unknown hash function" do
+    it "raises" do
+      expect { Kafka::Digest.find_digest(:yolo) }.to raise_error
+    end
+  end
+end

diff --git a/spec/partitioner_spec.rb b/spec/partitioner_spec.rb
index 9b779551f..547dccd13 100644
--- a/spec/partitioner_spec.rb
+++ b/spec/partitioner_spec.rb
@@ -31,7 +31,7 @@
   end
 
   describe "murmur2 partitioner" do
-    let(:partitioner) { Kafka::Murmur2Partitioner.new }
+    let(:partitioner) { Kafka::Partitioner.new(hash_function: :murmur2) }
     let(:message) { double(:message, key: nil, partition_key: "yolo") }
 
     it "deterministically returns a partition number for a partition key and partition count" do

From 4669c204498565fd8addf7d32907e0532c7f6d13 Mon Sep 17 00:00:00 2001
From: Steven Diviney
Date: Mon, 1 Feb 2021 11:40:01 -0800
Subject: [PATCH 19/30] Revert client API change

---
 lib/kafka/client.rb | 13 ++-----------
 1 file changed, 2 insertions(+), 11 deletions(-)

diff --git a/lib/kafka/client.rb b/lib/kafka/client.rb
index 3ee8aa39e..0525f2bff 100644
--- a/lib/kafka/client.rb
+++ b/lib/kafka/client.rb
@@ -67,8 +67,6 @@ class Client
   #
   # @param partitioner [Partitioner, nil] the partitioner that should be used by the client.
   #
-  # @param partitioner_klass [String, nil] the partitioner klass that should be used by the client if no partitioner is supplied.
-  #
   # @param sasl_oauth_token_provider [Object, nil] OAuthBearer Token Provider instance that
   #   implements method token. See {Sasl::OAuth#initialize}
   #
@@ -82,7 +80,7 @@ def initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_time
     ssl_client_cert_key_password: nil, ssl_client_cert_chain: nil, sasl_gssapi_principal: nil,
     sasl_gssapi_keytab: nil, sasl_plain_authzid: '', sasl_plain_username: nil, sasl_plain_password: nil,
     sasl_scram_username: nil, sasl_scram_password: nil, sasl_scram_mechanism: nil,
-    sasl_over_ssl: true, ssl_ca_certs_from_system: false, partitioner: nil, partitioner_klass: nil, sasl_oauth_token_provider: nil, ssl_verify_hostname: true)
+    sasl_over_ssl: true, ssl_ca_certs_from_system: false, partitioner: nil, sasl_oauth_token_provider: nil, ssl_verify_hostname: true)
     @logger = TaggedLogger.new(logger)
     @instrumenter = Instrumenter.new(client_id: client_id)
     @seed_brokers = normalize_seed_brokers(seed_brokers)
@@ -126,14 +124,7 @@ def initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_time
     )
 
     @cluster = initialize_cluster
-    @partitioner =
-      if partitioner
-        partitioner
-      elsif partitioner_klass
-        Object.const_get(partitioner_klass).new
-      else
-        Partitioner.new
-      end
+    @partitioner = partitioner || Partitioner.new
   end
 
   # Delivers a single message to the Kafka cluster.

From b2f3eedd288ab13a6752a155b216931a3861b332 Mon Sep 17 00:00:00 2001
From: Steven Diviney
Date: Mon, 1 Feb 2021 11:42:34 -0800
Subject: [PATCH 20/30] Small cleanup

---
 lib/kafka/digest.rb | 2 --
 1 file changed, 2 deletions(-)

diff --git a/lib/kafka/digest.rb b/lib/kafka/digest.rb
index f76885184..8ba4cc206 100644
--- a/lib/kafka/digest.rb
+++ b/lib/kafka/digest.rb
@@ -10,14 +10,12 @@ module Digest
       :murmur2 => Murmur2Hash.new
     }.freeze
 
-    # TODO: Should I just call this `hashing` or something?
     def self.find_digest(name)
       digest = FUNCTIONS_BY_NAME.fetch(name) do
         raise LoadError, "Unknown hash function #{name}"
       end
 
       digest.load
-
       digest
     end
   end

From d49e7c4493c130a63fc0951a50bba5496541cb98 Mon Sep 17 00:00:00 2001
From: Steven Diviney
Date: Mon, 1 Feb 2021 20:19:03 -0800
Subject: [PATCH 21/30] small cleanup

---
 lib/kafka/crc32_hash.rb | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/kafka/crc32_hash.rb b/lib/kafka/crc32_hash.rb
index af342ff48..1849008a6 100644
--- a/lib/kafka/crc32_hash.rb
+++ b/lib/kafka/crc32_hash.rb
@@ -5,7 +5,7 @@
 module Kafka
   class Crc32Hash
 
-    # crc32 is part of the gems dependencies
+    # crc32 is supported natively
     def load; end
 
     def hash(value)

From 5a3374e80354c3345c3d8053708b42d4fabf2844 Mon Sep 17 00:00:00 2001
From: Steven Diviney
Date: Mon, 1 Feb 2021 20:19:12 -0800
Subject: [PATCH 22/30] Add murmur2 to readme

---
 README.md | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/README.md b/README.md
index ebe3d8365..a6b963761 100644
--- a/README.md
+++ b/README.md
@@ -382,6 +382,16 @@ partitioner = -> (partition_count, message) { ... }
 Kafka.new(partitioner: partitioner, ...)
 ```
 
+##### Supported partitioning schemes
+
+In order for semantic partitioning to work a `partition_key` must map to the same partition number every time. The general approach, and the one used by this library, is to hash the key and mod it by the number of partitions. There are many different algorithms to calculate a hash. By default `crc32` is used. `murmur2` is also supported for compatibility with Java based Kafka producers.
+
+To use `murmur2` hashing pass it as an argument to `Partitioner`. For example:
+
+```ruby
+Kafka.new(partitioner: Kafka::Partitioner.new(hash_function: :murmur2))
+```
+
 #### Buffering and Error Handling
 
 The producer is designed for resilience in the face of temporary network errors, Kafka broker failovers, and other issues that prevent the client from writing messages to the destination topics. It does this by employing local, in-memory buffers. Only when messages are acknowledged by a Kafka broker will they be removed from the buffer.

From c52c238000dded74097e9853b7a0f08d756719a7 Mon Sep 17 00:00:00 2001
From: Steven Diviney
Date: Mon, 1 Feb 2021 20:19:22 -0800
Subject: [PATCH 23/30] Add murmur2 to changelog

---
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6e31a47d6..e1db8b5aa 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,7 @@ Changes and additions to the library will be listed here.
 ## Unreleased
 
 - Fix `Kafka::TransactionManager#send_offsets_to_txn` (#866).
+- Add support for `murmur2` based partitioning.
 
 ## 1.3.0

From f3e5078d902dbf9e0e09b56fa639f68fb50e9080 Mon Sep 17 00:00:00 2001
From: Steven Diviney
Date: Mon, 1 Feb 2021 20:29:42 -0800
Subject: [PATCH 24/30] Add Partitioner doc

---
 lib/kafka/partitioner.rb | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/lib/kafka/partitioner.rb b/lib/kafka/partitioner.rb
index 14901ca1f..e11052442 100644
--- a/lib/kafka/partitioner.rb
+++ b/lib/kafka/partitioner.rb
@@ -6,6 +6,8 @@ module Kafka
 
   # Assigns partitions to messages.
   class Partitioner
+    # @param hash_function [Symbol, nil] the algorithm used to compute a message's
+    #   destination partition. Default is :crc32
     def initialize(hash_function: nil)
       @digest = Digest.find_digest(hash_function || :crc32)
     end

From fd89412ecad8f250e15670a57faaf479cc7f06f0 Mon Sep 17 00:00:00 2001
From: Steven Diviney
Date: Mon, 1 Feb 2021 20:32:02 -0800
Subject: [PATCH 25/30] Correct readme doc

---
 README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.md b/README.md
index a6b963761..1ff5b616f 100644
--- a/README.md
+++ b/README.md
@@ -384,7 +384,7 @@ Kafka.new(partitioner: partitioner, ...)
 
 ##### Supported partitioning schemes
 
-In order for semantic partitioning to work a `partition_key` must map to the same partition number every time. The general approach, and the one used by this library, is to hash the key and mod it by the number of partitions. There are many different algorithms to calculate a hash. By default `crc32` is used. `murmur2` is also supported for compatibility with Java based Kafka producers.
+In order for semantic partitioning to work a `partition_key` must map to the same partition number every time. The general approach, and the one used by this library, is to hash the key and mod it by the number of partitions. There are many different algorithms that can be used to calculate a hash. By default `crc32` is used. `murmur2` is also supported for compatibility with Java based Kafka producers.
 
 To use `murmur2` hashing pass it as an argument to `Partitioner`. For example:

From 56a0476dda3867916a4190293c502814cbc429ee Mon Sep 17 00:00:00 2001
From: Roko Kruze
Date: Wed, 7 Apr 2021 14:29:03 -0700
Subject: [PATCH 26/30] Updated to ignore all control batches

Control batches can show up even outside of transactions. This change
drops the requirement that you must be in a transaction for a record
to be labeled a control batch.
---
 lib/kafka/protocol/record_batch.rb | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/kafka/protocol/record_batch.rb b/lib/kafka/protocol/record_batch.rb
index b9f6ea2c9..75c861234 100644
--- a/lib/kafka/protocol/record_batch.rb
+++ b/lib/kafka/protocol/record_batch.rb
@@ -213,7 +213,7 @@ def self.decode(decoder)
       end
 
       def mark_control_record
-        if in_transaction && is_control_batch
+        if is_control_batch
           record = @records.first
           record.is_control_record = true unless record.nil?
         end

From 2cf27e4fd81f6e9c53c42e0e75765df10890eefe Mon Sep 17 00:00:00 2001
From: abicky
Date: Sat, 12 Dec 2020 20:39:48 +0900
Subject: [PATCH 27/30] Support seed brokers' hostname with multiple addresses

By default, the Java implementation of a Kafka client can use each
returned IP address from a hostname.
cf. https://kafka.apache.org/documentation/#client.dns.lookup

librdkafka also does:

> If host resolves to multiple addresses librdkafka will
> round-robin the addresses for each connection attempt.

cf. https://github.com/edenhill/librdkafka/blob/v1.5.3/INTRODUCTION.md#brokers

Such a feature helps us to manage seed brokers' IP addresses easily,
so this commit implements it.

---
 README.md            |  6 ++++++
 lib/kafka/client.rb  |  9 +++++++-
 lib/kafka/cluster.rb | 50 +++++++++++++++++++++++++-------------------
 spec/cluster_spec.rb | 43 +++++++++++++++++++++++++++++++++++++
 4 files changed, 85 insertions(+), 23 deletions(-)

diff --git a/README.md b/README.md
index 1ff5b616f..3a6541726 100644
--- a/README.md
+++ b/README.md
@@ -176,6 +176,12 @@ require "kafka"
 kafka = Kafka.new(["kafka1:9092", "kafka2:9092"], client_id: "my-application")
 ```
 
+You can also use a hostname with seed brokers' IP addresses:
+
+```ruby
+kafka = Kafka.new("seed-brokers:9092", client_id: "my-application", resolve_seed_brokers: true)
+```
+
 ### Producing Messages to Kafka
 
 The simplest way to write a message to a Kafka topic is to call `#deliver_message`:

diff --git a/lib/kafka/client.rb b/lib/kafka/client.rb
index 0525f2bff..2b5c46ed2 100644
--- a/lib/kafka/client.rb
+++ b/lib/kafka/client.rb
@@ -74,16 +74,22 @@ class Client
   #   the SSL certificate and the signing chain of the certificate have the correct domains
   #   based on the CA certificate
   #
+  # @param resolve_seed_brokers [Boolean] whether to resolve each hostname of the seed brokers.
+  #   If a broker is resolved to multiple IP addresses, the client tries to connect to each
+  #   of the addresses until it can connect.
+ # # @return [Client] def initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_timeout: nil, socket_timeout: nil, ssl_ca_cert_file_path: nil, ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil, ssl_client_cert_key_password: nil, ssl_client_cert_chain: nil, sasl_gssapi_principal: nil, sasl_gssapi_keytab: nil, sasl_plain_authzid: '', sasl_plain_username: nil, sasl_plain_password: nil, sasl_scram_username: nil, sasl_scram_password: nil, sasl_scram_mechanism: nil, - sasl_over_ssl: true, ssl_ca_certs_from_system: false, partitioner: nil, sasl_oauth_token_provider: nil, ssl_verify_hostname: true) + sasl_over_ssl: true, ssl_ca_certs_from_system: false, partitioner: nil, sasl_oauth_token_provider: nil, ssl_verify_hostname: true, + resolve_seed_brokers: false) @logger = TaggedLogger.new(logger) @instrumenter = Instrumenter.new(client_id: client_id) @seed_brokers = normalize_seed_brokers(seed_brokers) + @resolve_seed_brokers = resolve_seed_brokers ssl_context = SslContext.build( ca_cert_file_path: ssl_ca_cert_file_path, @@ -809,6 +815,7 @@ def initialize_cluster seed_brokers: @seed_brokers, broker_pool: broker_pool, logger: @logger, + resolve_seed_brokers: @resolve_seed_brokers, ) end diff --git a/lib/kafka/cluster.rb b/lib/kafka/cluster.rb index bc3717e8c..f72d1ff70 100644 --- a/lib/kafka/cluster.rb +++ b/lib/kafka/cluster.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true require "kafka/broker_pool" +require "resolv" require "set" module Kafka @@ -18,7 +19,8 @@ class Cluster # @param seed_brokers [Array] # @param broker_pool [Kafka::BrokerPool] # @param logger [Logger] - def initialize(seed_brokers:, broker_pool:, logger:) + # @param resolve_seed_brokers [Boolean] See {Kafka::Client#initialize} + def initialize(seed_brokers:, broker_pool:, logger:, resolve_seed_brokers: false) if seed_brokers.empty? raise ArgumentError, "At least one seed broker must be configured" end @@ -26,6 +28,7 @@ def initialize(seed_brokers:, broker_pool:, logger:) @logger = TaggedLogger.new(logger) @seed_brokers = seed_brokers @broker_pool = broker_pool + @resolve_seed_brokers = resolve_seed_brokers @cluster_info = nil @stale = true @@ -418,32 +421,35 @@ def get_leader_id(topic, partition) # @return [Protocol::MetadataResponse] the cluster metadata. def fetch_cluster_info errors = [] - @seed_brokers.shuffle.each do |node| - @logger.info "Fetching cluster metadata from #{node}" - - begin - broker = @broker_pool.connect(node.hostname, node.port) - cluster_info = broker.fetch_metadata(topics: @target_topics) - - if cluster_info.brokers.empty? - @logger.error "No brokers in cluster" - else - @logger.info "Discovered cluster metadata; nodes: #{cluster_info.brokers.join(', ')}" - - @stale = false - - return cluster_info + (@resolve_seed_brokers ? Resolv.getaddresses(node.hostname).shuffle : [node.hostname]).each do |hostname_or_ip| + node_info = node.to_s + node_info << " (#{hostname_or_ip})" if node.hostname != hostname_or_ip + @logger.info "Fetching cluster metadata from #{node_info}" + + begin + broker = @broker_pool.connect(hostname_or_ip, node.port) + cluster_info = broker.fetch_metadata(topics: @target_topics) + + if cluster_info.brokers.empty? + @logger.error "No brokers in cluster" + else + @logger.info "Discovered cluster metadata; nodes: #{cluster_info.brokers.join(', ')}" + + @stale = false + + return cluster_info + end + rescue Error => e + @logger.error "Failed to fetch metadata from #{node_info}: #{e}" + errors << [node_info, e] + ensure + broker.disconnect unless broker.nil? 
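+ # The ensure above runs once per attempted address, so any half-open
+ # connection is released before the next resolved address (or the next
+ # seed broker) is tried.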
end - rescue Error => e - @logger.error "Failed to fetch metadata from #{node}: #{e}" - errors << [node, e] - ensure - broker.disconnect unless broker.nil? end end - error_description = errors.map {|node, exception| "- #{node}: #{exception}" }.join("\n") + error_description = errors.map {|node_info, exception| "- #{node_info}: #{exception}" }.join("\n") raise ConnectionError, "Could not connect to any of the seed brokers:\n#{error_description}" end diff --git a/spec/cluster_spec.rb b/spec/cluster_spec.rb index acefa1f62..9d1d7d32d 100644 --- a/spec/cluster_spec.rb +++ b/spec/cluster_spec.rb @@ -103,4 +103,47 @@ }.to raise_exception(ArgumentError) end end + + describe "#cluster_info" do + let(:cluster) { + Kafka::Cluster.new( + seed_brokers: [URI("kafka://test1:9092")], + broker_pool: broker_pool, + logger: LOGGER, + resolve_seed_brokers: resolve_seed_brokers, + ) + } + + before do + allow(broker).to receive(:fetch_metadata) { raise Kafka::ConnectionError, "Operation timed out" } + allow(broker).to receive(:disconnect) + end + + context "when resolve_seed_brokers is false" do + let(:resolve_seed_brokers) { false } + + it "tries the seed broker hostnames as is" do + expect(broker_pool).to receive(:connect).with("test1", 9092) { broker } + expect { + cluster.cluster_info + }.to raise_error(Kafka::ConnectionError, %r{kafka://test1:9092: Operation timed out}) + end + end + + context "when resolve_seed_brokers is true" do + let(:resolve_seed_brokers) { true } + + before do + allow(Resolv).to receive(:getaddresses) { ["127.0.0.1", "::1"] } + end + + it "tries all the resolved IP addresses" do + expect(broker_pool).to receive(:connect).with("127.0.0.1", 9092) { broker } + expect(broker_pool).to receive(:connect).with("::1", 9092) { broker } + expect { + cluster.cluster_info + }.to raise_error(Kafka::ConnectionError, %r{kafka://test1:9092 \(127\.0\.0\.1\): Operation timed out}) + end + end + end end From f07b4c122d03dca3c789c452ca36885e3e120b6f Mon Sep 17 00:00:00 2001 From: abicky Date: Thu, 15 Apr 2021 05:09:31 +0900 Subject: [PATCH 28/30] Fix "@param tag has unknown parameter name" This commit resolves the following warnings: ``` % yard doc [warn]: @param tag has unknown parameter name: group_id: in file `lib/kafka/cluster.rb' near line 122 [warn]: @param tag has unknown parameter name: transactional_id: in file `lib/kafka/cluster.rb' near line 132 [warn]: @param tag has unknown parameter name: string in file `lib/kafka/protocol/encoder.rb' near line 131 -- snip -- ``` --- lib/kafka/cluster.rb | 4 ++-- lib/kafka/protocol/encoder.rb | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/kafka/cluster.rb b/lib/kafka/cluster.rb index bc3717e8c..98c813bb9 100644 --- a/lib/kafka/cluster.rb +++ b/lib/kafka/cluster.rb @@ -117,7 +117,7 @@ def get_leader(topic, partition) # Finds the broker acting as the coordinator of the given group. # - # @param group_id: [String] + # @param group_id [String] # @return [Broker] the broker that's currently coordinator. def get_group_coordinator(group_id:) @logger.debug "Getting group coordinator for `#{group_id}`" @@ -127,7 +127,7 @@ def get_group_coordinator(group_id:) # Finds the broker acting as the coordinator of the given transaction. # - # @param transactional_id: [String] + # @param transactional_id [String] # @return [Broker] the broker that's currently coordinator. 
def get_transaction_coordinator(transactional_id:) @logger.debug "Getting transaction coordinator for `#{transactional_id}`" diff --git a/lib/kafka/protocol/encoder.rb b/lib/kafka/protocol/encoder.rb index d4bc7a391..56a584a01 100644 --- a/lib/kafka/protocol/encoder.rb +++ b/lib/kafka/protocol/encoder.rb @@ -126,7 +126,7 @@ def write_varint_string(string) # Writes an integer under varints serializing to the IO object. # https://developers.google.com/protocol-buffers/docs/encoding#varints # - # @param string [Integer] + # @param int [Integer] # @return [nil] def write_varint(int) int = int << 1 From ef7c21c8828f51368f914ebac488cda87918cd0f Mon Sep 17 00:00:00 2001 From: abicky Date: Fri, 16 Apr 2021 23:04:03 +0900 Subject: [PATCH 29/30] Add resolve_seed_brokers option to changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e1db8b5aa..3dcb0e7e8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ Changes and additions to the library will be listed here. - Fix `Kafka::TransactionManager#send_offsets_to_txn` (#866). - Add support for `murmur2` based partitioning. +- Add `resolve_seed_brokers` option to support seed brokers' hostname with multiple addresses (#877). ## 1.3.0 From d9cb8d1758d6d840e42117e25c22be656e18e150 Mon Sep 17 00:00:00 2001 From: Brent Wheeldon Date: Wed, 14 Apr 2021 15:54:03 -0400 Subject: [PATCH 30/30] Handle SyncGroup responses with a non-zero error and no assignments When the error code is non-zero there isn't necessarily (ever?) an assignment. Currently the decoder raises a `TypeError` ("no implicit conversion of nil into String") - this updates the logic to only try to decode the assignment if there are any bytes. --- CHANGELOG.md | 1 + lib/kafka/protocol/sync_group_response.rb | 7 ++++-- spec/protocol/sync_group_response_spec.rb | 28 +++++++++++++++++++++++ 3 files changed, 34 insertions(+), 2 deletions(-) create mode 100644 spec/protocol/sync_group_response_spec.rb diff --git a/CHANGELOG.md b/CHANGELOG.md index 3dcb0e7e8..04ff2d26e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ Changes and additions to the library will be listed here. - Fix `Kafka::TransactionManager#send_offsets_to_txn` (#866). - Add support for `murmur2` based partitioning. - Add `resolve_seed_brokers` option to support seed brokers' hostname with multiple addresses (#877). +- Handle SyncGroup responses with a non-zero error and no assignments (#896). ## 1.3.0 diff --git a/lib/kafka/protocol/sync_group_response.rb b/lib/kafka/protocol/sync_group_response.rb index a1e4aab83..148945095 100644 --- a/lib/kafka/protocol/sync_group_response.rb +++ b/lib/kafka/protocol/sync_group_response.rb @@ -13,9 +13,12 @@ def initialize(error_code:, member_assignment:) end def self.decode(decoder) + error_code = decoder.int16 + member_assignment_bytes = decoder.bytes + new( - error_code: decoder.int16, - member_assignment: MemberAssignment.decode(Decoder.from_string(decoder.bytes)), + error_code: error_code, + member_assignment: member_assignment_bytes ? 
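+ # member_assignment_bytes is nil when the broker returned a non-zero
+ # error code, so only attempt to decode an assignment that is present: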
MemberAssignment.decode(Decoder.from_string(member_assignment_bytes)) : nil
 )
 end
 end
diff --git a/spec/protocol/sync_group_response_spec.rb b/spec/protocol/sync_group_response_spec.rb
new file mode 100644
index 000000000..8979f244c
--- /dev/null
+++ b/spec/protocol/sync_group_response_spec.rb
@@ -0,0 +1,28 @@
+# frozen_string_literal: true
+
+describe Kafka::Protocol::SyncGroupResponse do
+  describe ".decode" do
+    subject(:response) { Kafka::Protocol::SyncGroupResponse.decode(decoder) }
+
+    let(:decoder) { Kafka::Protocol::Decoder.new(buffer) }
+    let(:buffer) { StringIO.new(response_bytes) }
+
+    context "the response is successful" do
+      let(:response_bytes) { "\x00\x00\x00\x00\x007\x00\x00\x00\x00\x00\x01\x00\x1Fsome-topic-f064d6897583eb395896\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01\xFF\xFF\xFF\xFF" }
+
+      it "decodes the response including the member assignment" do
+        expect(response.error_code).to eq 0
+        expect(response.member_assignment.topics).to eq({ "some-topic-f064d6897583eb395896" => [0, 1] })
+      end
+    end
+
+    context "the response is not successful" do
+      let(:response_bytes) { "\x00\x19\xFF\xFF\xFF\xFF" }
+
+      it "decodes the response with a nil member assignment" do
+        expect(response.error_code).to eq 25
+        expect(response.member_assignment).to be_nil
+      end
+    end
+  end
+end
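For calling code, the practical consequence is that `member_assignment` can now be `nil` whenever the broker reports an error, so the error code has to be checked before the assignment is used. A minimal sketch using the same error-response bytes as the spec above (`Kafka::Protocol.handle_error` is the library's existing error-code mapper; everything else is illustrative):

```ruby
require "kafka"
require "stringio"

# Raw SyncGroup response carrying error code 25 (unknown member ID) and
# no assignment bytes, exactly as in the spec above.
bytes = "\x00\x19\xFF\xFF\xFF\xFF"
decoder = Kafka::Protocol::Decoder.new(StringIO.new(bytes))

response = Kafka::Protocol::SyncGroupResponse.decode(decoder)
response.member_assignment  # => nil; decoding no longer raises a TypeError

# A caller should surface the error before touching the assignment;
# this raises the Kafka error class mapped to code 25.
Kafka::Protocol.handle_error(response.error_code)
```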