diff --git a/.circleci/config.yml b/.circleci/config.yml
index 9b78abdd1..b3fca3f82 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -7,6 +7,7 @@ jobs:
           LOG_LEVEL: DEBUG
     steps:
       - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
      - run: bundle install --path vendor/bundle
      - run: bundle exec rspec
      - run: bundle exec rubocop
@@ -40,6 +41,7 @@ jobs:
          KAFKA_DELETE_TOPIC_ENABLE: true
    steps:
      - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
      - run: bundle install --path vendor/bundle
      - run: bundle exec rspec --profile --tag functional spec/functional
@@ -72,6 +74,7 @@ jobs:
          KAFKA_DELETE_TOPIC_ENABLE: true
    steps:
      - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
      - run: bundle install --path vendor/bundle
      - run: bundle exec rspec --profile --tag functional spec/functional
@@ -104,6 +107,7 @@ jobs:
          KAFKA_DELETE_TOPIC_ENABLE: true
    steps:
      - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
      - run: bundle install --path vendor/bundle
      - run: bundle exec rspec --profile --tag functional spec/functional
@@ -136,6 +140,7 @@ jobs:
          KAFKA_DELETE_TOPIC_ENABLE: true
    steps:
      - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
      - run: bundle install --path vendor/bundle
      - run: bundle exec rspec --profile --tag functional spec/functional
@@ -168,6 +173,7 @@ jobs:
          KAFKA_DELETE_TOPIC_ENABLE: true
    steps:
      - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
      - run: bundle install --path vendor/bundle
      - run: bundle exec rspec --profile --tag functional spec/functional
@@ -200,6 +206,7 @@ jobs:
          KAFKA_DELETE_TOPIC_ENABLE: true
    steps:
      - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
      - run: bundle install --path vendor/bundle
      - run: bundle exec rspec --profile --tag functional spec/functional
@@ -232,6 +239,7 @@ jobs:
          KAFKA_DELETE_TOPIC_ENABLE: true
    steps:
      - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
      - run: bundle install --path vendor/bundle
      - run: bundle exec rspec --profile --tag functional spec/functional
@@ -264,6 +272,7 @@ jobs:
          KAFKA_DELETE_TOPIC_ENABLE: true
    steps:
      - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
      - run: bundle install --path vendor/bundle
      - run: bundle exec rspec --profile --tag functional spec/functional
@@ -296,6 +305,75 @@ jobs:
          KAFKA_DELETE_TOPIC_ENABLE: true
    steps:
      - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
      - run: bundle install --path vendor/bundle
      - run: bundle exec rspec --profile --tag functional spec/functional
+
+  kafka-2.6:
+    docker:
+      - image: circleci/ruby:2.5.1-node
+        environment:
+          LOG_LEVEL: DEBUG
+      - image: wurstmeister/zookeeper
+      - image: wurstmeister/kafka:2.13-2.6.0
+        environment:
+          KAFKA_ADVERTISED_HOST_NAME: localhost
+          KAFKA_ADVERTISED_PORT: 9092
+          KAFKA_PORT: 9092
+          KAFKA_ZOOKEEPER_CONNECT: localhost:2181
+          KAFKA_DELETE_TOPIC_ENABLE: true
+      - image: wurstmeister/kafka:2.13-2.6.0
+        environment:
+          KAFKA_ADVERTISED_HOST_NAME: localhost
+          KAFKA_ADVERTISED_PORT: 9093
+          KAFKA_PORT: 9093
+          KAFKA_ZOOKEEPER_CONNECT: localhost:2181
+          KAFKA_DELETE_TOPIC_ENABLE: true
+      - image: wurstmeister/kafka:2.13-2.6.0
+        environment:
+          KAFKA_ADVERTISED_HOST_NAME: localhost
+          KAFKA_ADVERTISED_PORT: 9094
+          KAFKA_PORT: 9094
+          KAFKA_ZOOKEEPER_CONNECT: localhost:2181
+          KAFKA_DELETE_TOPIC_ENABLE: true
+    steps:
+      - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
+      - run: bundle install --path vendor/bundle
+      - run: bundle exec rspec --profile --tag functional spec/functional
+
+  kafka-2.7:
+    docker:
+      - image: circleci/ruby:2.5.1-node
+        environment:
+          LOG_LEVEL: DEBUG
+      - image: bitnami/zookeeper
+        environment:
+          ALLOW_ANONYMOUS_LOGIN: yes
+      - image: bitnami/kafka:2.7.0
+        environment:
+          ALLOW_PLAINTEXT_LISTENER: yes
+          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
+          KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
+          KAFKA_CFG_ZOOKEEPER_CONNECT: localhost:2181
+          KAFKA_DELETE_TOPIC_ENABLE: true
+      - image: bitnami/kafka:2.7.0
+        environment:
+          ALLOW_PLAINTEXT_LISTENER: yes
+          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9093
+          KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093
+          KAFKA_CFG_ZOOKEEPER_CONNECT: localhost:2181
+          KAFKA_DELETE_TOPIC_ENABLE: true
+      - image: bitnami/kafka:2.7.0
+        environment:
+          ALLOW_PLAINTEXT_LISTENER: yes
+          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9094
+          KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9094
+          KAFKA_CFG_ZOOKEEPER_CONNECT: localhost:2181
+          KAFKA_DELETE_TOPIC_ENABLE: true
+    steps:
+      - checkout
+      - run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
+      - run: bundle install --path vendor/bundle
+      - run: bundle exec rspec --profile --tag functional spec/functional
@@ -313,3 +391,5 @@ workflows:
      - kafka-2.3
      - kafka-2.4
      - kafka-2.5
+      - kafka-2.6
+      - kafka-2.7
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1084c6e1d..33caef010 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,21 @@ Changes and additions to the library will be listed here.
 
 ## Unreleased
 
+## 1.5.0
+- Add support for AWS IAM Authentication to an MSK cluster (#907).
+- Add session token to the IAM mechanism, which is necessary for auth via temporary credentials (#937).
+
+## 1.4.0
+
+- Refresh a stale cluster's metadata if necessary on `Kafka::Client#deliver_message` (#901).
+- Fix `Kafka::TransactionManager#send_offsets_to_txn` (#866).
+- Add support for `murmur2` based partitioning.
+- Add `resolve_seed_brokers` option to support seed brokers' hostname with multiple addresses (#877).
+- Handle SyncGroup responses with a non-zero error and no assignments (#896).
+- Add support for non-identical topic subscriptions within the same consumer group (#525 / #764).
+
+## 1.3.0
+
 - Support custom assignment strategy (#846).
 - Improved Exceptions in TransactionManager (#862).
diff --git a/README.md b/README.md
index e91f5874d..27ba31332 100644
--- a/README.md
+++ b/README.md
@@ -129,6 +129,16 @@ Or install it yourself as:
     <td>Limited support</td>
     <td>Limited support</td>
   </tr>
+  <tr>
+    <th>Kafka 2.6</th>
+    <td>Limited support</td>
+    <td>Limited support</td>
+  </tr>
+  <tr>
+    <th>Kafka 2.7</th>
+    <td>Limited support</td>
+    <td>Limited support</td>
+  </tr>
 </table>
 
 This library is targeting Kafka 0.9 with the v0.4.x series and Kafka 0.10 with the v0.5.x series. There's limited support for Kafka 0.8, and things should work with Kafka 0.11, although there may be performance issues due to changes in the protocol.
@@ -144,6 +154,8 @@ This library is targeting Kafka 0.9 with the v0.4.x series and Kafka 0.10 with t
 - **Kafka 2.3:** Everything that works with Kafka 2.2 should still work, but so far no features specific to Kafka 2.3 have been added.
 - **Kafka 2.4:** Everything that works with Kafka 2.3 should still work, but so far no features specific to Kafka 2.4 have been added.
 - **Kafka 2.5:** Everything that works with Kafka 2.4 should still work, but so far no features specific to Kafka 2.5 have been added.
+- **Kafka 2.6:** Everything that works with Kafka 2.5 should still work, but so far no features specific to Kafka 2.6 have been added.
+- **Kafka 2.7:** Everything that works with Kafka 2.6 should still work, but so far no features specific to Kafka 2.7 have been added.
 
 This library requires Ruby 2.1 or higher.
 
@@ -164,6 +176,12 @@
 require "kafka"
 
 kafka = Kafka.new(["kafka1:9092", "kafka2:9092"], client_id: "my-application")
 ```
 
+You can also use a single hostname that resolves to the IP addresses of several seed brokers:
+
+```ruby
+kafka = Kafka.new("seed-brokers:9092", client_id: "my-application", resolve_seed_brokers: true)
+```
+
 ### Producing Messages to Kafka
 
 The simplest way to write a message to a Kafka topic is to call `#deliver_message`:
@@ -370,6 +388,16 @@
 partitioner = -> (partition_count, message) { ... }
 
 Kafka.new(partitioner: partitioner, ...)
 ```
 
+##### Supported partitioning schemes
+
+In order for semantic partitioning to work, a `partition_key` must map to the same partition number every time. The general approach, and the one used by this library, is to hash the key and mod it by the number of partitions. There are many different algorithms that can be used to calculate a hash. By default, `crc32` is used. `murmur2` is also supported for compatibility with Java-based Kafka producers.
+
+To use `murmur2` hashing, pass it as an argument to `Partitioner`. For example:
+
+```ruby
+Kafka.new(partitioner: Kafka::Partitioner.new(hash_function: :murmur2))
+```
+
 #### Buffering and Error Handling
 
 The producer is designed for resilience in the face of temporary network errors, Kafka broker failovers, and other issues that prevent the client from writing messages to the destination topics. It does this by employing local, in-memory buffers. Only when messages are acknowledged by a Kafka broker will they be removed from the buffer.
@@ -1093,6 +1121,20 @@ kafka = Kafka.new(
 )
 ```
 
+##### AWS MSK (IAM)
+
+In order to authenticate using IAM with an AWS MSK cluster, set your access key, secret key, and region when initializing the Kafka client:
+
+```ruby
+kafka = Kafka.new(
+  ["kafka1:9092"],
+  sasl_aws_msk_iam_access_key_id: 'iam_access_key',
+  sasl_aws_msk_iam_secret_key_id: 'iam_secret_key',
+  sasl_aws_msk_iam_aws_region: 'us-west-2',
+  ssl_ca_certs_from_system: true,
+  # ...
+)
+```
+
 ##### PLAIN
 
 In order to authenticate using PLAIN, you must set your username and password when initializing the Kafka client:
diff --git a/lib/kafka/async_producer.rb b/lib/kafka/async_producer.rb
index a07aed54b..8c87e9d99 100644
--- a/lib/kafka/async_producer.rb
+++ b/lib/kafka/async_producer.rb
@@ -212,31 +212,45 @@ def run
      @logger.push_tags(@producer.to_s)
      @logger.info "Starting async producer in the background..."
 
+      do_loop
+    rescue Exception => e
+      @logger.error "Unexpected Kafka error #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}"
+      @logger.error "Async producer crashed!"
+    ensure
+      @producer.shutdown
+      @logger.pop_tags
+    end
+
+    private
+
+    def do_loop
      loop do
-        operation, payload = @queue.pop
-
-        case operation
-        when :produce
-          produce(payload[0], **payload[1])
-          deliver_messages if threshold_reached?
-        when :deliver_messages
-          deliver_messages
-        when :shutdown
-          begin
-            # Deliver any pending messages first.
-            @producer.deliver_messages
-          rescue Error => e
-            @logger.error("Failed to deliver messages during shutdown: #{e.message}")
-
-            @instrumenter.instrument("drop_messages.async_producer", {
-              message_count: @producer.buffer_size + @queue.size,
-            })
+        begin
+          operation, payload = @queue.pop
+
+          case operation
+          when :produce
+            produce(payload[0], **payload[1])
+            deliver_messages if threshold_reached?
+          when :deliver_messages
+            deliver_messages
+          when :shutdown
+            begin
+              # Deliver any pending messages first.
+              @producer.deliver_messages
+            rescue Error => e
+              @logger.error("Failed to deliver messages during shutdown: #{e.message}")
+
+              @instrumenter.instrument("drop_messages.async_producer", {
+                message_count: @producer.buffer_size + @queue.size,
+              })
+            end
+
+            # Stop the run loop.
+            break
+          else
+            raise "Unknown operation #{operation.inspect}"
          end
-
-          # Stop the run loop.
-          break
-        else
-          raise "Unknown operation #{operation.inspect}"
        end
      end
    rescue Kafka::Error => e
@@ -245,16 +259,8 @@ def run
 
      sleep 10
      retry
-    rescue Exception => e
-      @logger.error "Unexpected Kafka error #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}"
-      @logger.error "Async producer crashed!"
-    ensure
-      @producer.shutdown
-      @logger.pop_tags
    end
 
-    private
-
    def produce(value, **kwargs)
      retries = 0
      begin
diff --git a/lib/kafka/client.rb b/lib/kafka/client.rb
index 23829e0ed..28b3d01ea 100644
--- a/lib/kafka/client.rb
+++ b/lib/kafka/client.rb
@@ -1,3 +1,4 @@
+# coding: utf-8
 # frozen_string_literal: true
 
 require "kafka/ssl_context"
@@ -38,8 +39,8 @@ class Client
  # @param ssl_ca_cert [String, Array, nil] a PEM encoded CA cert, or an Array of
  #   PEM encoded CA certs, to use with an SSL connection.
  #
-  # @param ssl_ca_cert_file_path [String, nil] a path on the filesystem to a PEM encoded CA cert
-  #   to use with an SSL connection.
+  # @param ssl_ca_cert_file_path [String, Array, nil] a path on the filesystem, or an
+  #   Array of paths, to PEM encoded CA cert(s) to use with an SSL connection.
  #
  # @param ssl_client_cert [String, nil] a PEM encoded client cert to use with an
  #   SSL connection. Must be used in combination with ssl_client_cert_key.
@@ -74,16 +75,26 @@ class Client
  #   the SSL certificate and the signing chain of the certificate have the correct domains
  #   based on the CA certificate
  #
+  # @param resolve_seed_brokers [Boolean] whether to resolve each hostname of the seed brokers.
+  #   If a broker is resolved to multiple IP addresses, the client tries to connect to each
+  #   of the addresses until it can connect.
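+  #   (For illustration, with a hypothetical hostname: `Kafka.new("seed-brokers:9092",
+  #   resolve_seed_brokers: true)` resolves `seed-brokers` and tries each returned
+  #   address in shuffled order until one connection succeeds.)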
+  #
  # @return [Client]
  def initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_timeout: nil, socket_timeout: nil,
                 ssl_ca_cert_file_path: nil, ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil,
                 ssl_client_cert_key_password: nil, ssl_client_cert_chain: nil, sasl_gssapi_principal: nil,
                 sasl_gssapi_keytab: nil, sasl_plain_authzid: '', sasl_plain_username: nil, sasl_plain_password: nil,
                 sasl_scram_username: nil, sasl_scram_password: nil, sasl_scram_mechanism: nil,
-                 sasl_over_ssl: true, ssl_ca_certs_from_system: false, partitioner: nil, sasl_oauth_token_provider: nil, ssl_verify_hostname: true)
+                 sasl_aws_msk_iam_access_key_id: nil,
+                 sasl_aws_msk_iam_secret_key_id: nil,
+                 sasl_aws_msk_iam_aws_region: nil,
+                 sasl_aws_msk_iam_session_token: nil,
+                 sasl_over_ssl: true, ssl_ca_certs_from_system: false, partitioner: nil, sasl_oauth_token_provider: nil, ssl_verify_hostname: true,
+                 resolve_seed_brokers: false)
    @logger = TaggedLogger.new(logger)
    @instrumenter = Instrumenter.new(client_id: client_id)
    @seed_brokers = normalize_seed_brokers(seed_brokers)
+    @resolve_seed_brokers = resolve_seed_brokers
 
    ssl_context = SslContext.build(
      ca_cert_file_path: ssl_ca_cert_file_path,
@@ -105,6 +116,10 @@
      sasl_scram_username: sasl_scram_username,
      sasl_scram_password: sasl_scram_password,
      sasl_scram_mechanism: sasl_scram_mechanism,
+      sasl_aws_msk_iam_access_key_id: sasl_aws_msk_iam_access_key_id,
+      sasl_aws_msk_iam_secret_key_id: sasl_aws_msk_iam_secret_key_id,
+      sasl_aws_msk_iam_aws_region: sasl_aws_msk_iam_aws_region,
+      sasl_aws_msk_iam_session_token: sasl_aws_msk_iam_session_token,
      sasl_oauth_token_provider: sasl_oauth_token_provider,
      logger: @logger
    )
@@ -204,6 +219,8 @@ def deliver_message(value, key: nil, headers: {}, topic:, partition: nil, partit
    attempt = 1
 
    begin
+      @cluster.refresh_metadata_if_necessary!
+
      operation.execute
 
      unless buffer.empty?
@@ -811,6 +828,7 @@ def initialize_cluster
      seed_brokers: @seed_brokers,
      broker_pool: broker_pool,
      logger: @logger,
+      resolve_seed_brokers: @resolve_seed_brokers,
    )
  end
diff --git a/lib/kafka/cluster.rb b/lib/kafka/cluster.rb
index bc3717e8c..8aaf19154 100644
--- a/lib/kafka/cluster.rb
+++ b/lib/kafka/cluster.rb
@@ -1,6 +1,7 @@
 # frozen_string_literal: true
 
 require "kafka/broker_pool"
+require "resolv"
 require "set"
 
 module Kafka
@@ -18,7 +19,8 @@ class Cluster
  # @param seed_brokers [Array]
  # @param broker_pool [Kafka::BrokerPool]
  # @param logger [Logger]
-  def initialize(seed_brokers:, broker_pool:, logger:)
+  # @param resolve_seed_brokers [Boolean] See {Kafka::Client#initialize}
+  def initialize(seed_brokers:, broker_pool:, logger:, resolve_seed_brokers: false)
    if seed_brokers.empty?
      raise ArgumentError, "At least one seed broker must be configured"
    end
@@ -26,6 +28,7 @@ def initialize(seed_brokers:, broker_pool:, logger:)
    @logger = TaggedLogger.new(logger)
    @seed_brokers = seed_brokers
    @broker_pool = broker_pool
+    @resolve_seed_brokers = resolve_seed_brokers
    @cluster_info = nil
    @stale = true
 
@@ -117,7 +120,7 @@ def get_leader(topic, partition)
 
  # Finds the broker acting as the coordinator of the given group.
  #
-  # @param group_id: [String]
+  # @param group_id [String]
  # @return [Broker] the broker that's currently coordinator.
  def get_group_coordinator(group_id:)
    @logger.debug "Getting group coordinator for `#{group_id}`"
@@ -127,7 +130,7 @@ def get_group_coordinator(group_id:)
 
  # Finds the broker acting as the coordinator of the given transaction.
  #
-  # @param transactional_id: [String]
+  # @param transactional_id [String]
  # @return [Broker] the broker that's currently coordinator.
  def get_transaction_coordinator(transactional_id:)
    @logger.debug "Getting transaction coordinator for `#{transactional_id}`"
@@ -418,32 +421,35 @@ def get_leader_id(topic, partition)
  # @return [Protocol::MetadataResponse] the cluster metadata.
  def fetch_cluster_info
    errors = []
 
    @seed_brokers.shuffle.each do |node|
-      @logger.info "Fetching cluster metadata from #{node}"
-
-      begin
-        broker = @broker_pool.connect(node.hostname, node.port)
-        cluster_info = broker.fetch_metadata(topics: @target_topics)
-
-        if cluster_info.brokers.empty?
-          @logger.error "No brokers in cluster"
-        else
-          @logger.info "Discovered cluster metadata; nodes: #{cluster_info.brokers.join(', ')}"
-
-          @stale = false
-
-          return cluster_info
+      (@resolve_seed_brokers ? Resolv.getaddresses(node.hostname).shuffle : [node.hostname]).each do |hostname_or_ip|
+        node_info = node.to_s
+        node_info << " (#{hostname_or_ip})" if node.hostname != hostname_or_ip
+        @logger.info "Fetching cluster metadata from #{node_info}"
+
+        begin
+          broker = @broker_pool.connect(hostname_or_ip, node.port)
+          cluster_info = broker.fetch_metadata(topics: @target_topics)
+
+          if cluster_info.brokers.empty?
+            @logger.error "No brokers in cluster"
+          else
+            @logger.info "Discovered cluster metadata; nodes: #{cluster_info.brokers.join(', ')}"
+
+            @stale = false
+
+            return cluster_info
+          end
+        rescue Error => e
+          @logger.error "Failed to fetch metadata from #{node_info}: #{e}"
+          errors << [node_info, e]
+        ensure
+          broker.disconnect unless broker.nil?
        end
-      rescue Error => e
-        @logger.error "Failed to fetch metadata from #{node}: #{e}"
-        errors << [node, e]
-      ensure
-        broker.disconnect unless broker.nil?
      end
    end
 
-    error_description = errors.map {|node, exception| "- #{node}: #{exception}" }.join("\n")
+    error_description = errors.map {|node_info, exception| "- #{node_info}: #{exception}" }.join("\n")
 
    raise ConnectionError, "Could not connect to any of the seed brokers:\n#{error_description}"
  end
diff --git a/lib/kafka/consumer_group.rb b/lib/kafka/consumer_group.rb
index 1ad1855f8..ac25626fd 100644
--- a/lib/kafka/consumer_group.rb
+++ b/lib/kafka/consumer_group.rb
@@ -189,9 +189,14 @@ def synchronize
    if group_leader?
      @logger.info "Chosen as leader of group `#{@group_id}`"
 
+      topics = Set.new
+      @members.each do |_member, metadata|
+        metadata.topics.each { |t| topics.add(t) }
+      end
+
      group_assignment = @assignor.assign(
        members: @members,
-        topics: @topics,
+        topics: topics,
      )
    end
diff --git a/lib/kafka/crc32_hash.rb b/lib/kafka/crc32_hash.rb
new file mode 100644
index 000000000..1849008a6
--- /dev/null
+++ b/lib/kafka/crc32_hash.rb
@@ -0,0 +1,15 @@
+# frozen_string_literal: true
+
+require "zlib"
+
+module Kafka
+  class Crc32Hash
+
+    # crc32 is supported natively
+    def load; end
+
+    def hash(value)
+      Zlib.crc32(value)
+    end
+  end
+end
diff --git a/lib/kafka/datadog.rb b/lib/kafka/datadog.rb
index a80e9302b..70ee762dc 100644
--- a/lib/kafka/datadog.rb
+++ b/lib/kafka/datadog.rb
@@ -288,13 +288,13 @@ def produce_message(event)
      }
 
      # This gets us the write rate.
-      increment("producer.produce.messages", tags: tags.merge(topic: topic))
+      increment("producer.produce.messages", tags: tags)
 
      # Information about typical/average/95p message size.
-      histogram("producer.produce.message_size", message_size, tags: tags.merge(topic: topic))
+      histogram("producer.produce.message_size", message_size, tags: tags)
 
      # Aggregate message size.
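      # (The `tags` hash built above already appears to carry the topic, which is
      # why the per-metric `tags.merge(topic: topic)` calls are being removed.)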
-      count("producer.produce.message_size.sum", message_size, tags: tags.merge(topic: topic))
+      count("producer.produce.message_size.sum", message_size, tags: tags)
 
      # This gets us the avg/max buffer size per producer.
      histogram("producer.buffer.size", buffer_size, tags: tags)
diff --git a/lib/kafka/digest.rb b/lib/kafka/digest.rb
new file mode 100644
index 000000000..8ba4cc206
--- /dev/null
+++ b/lib/kafka/digest.rb
@@ -0,0 +1,22 @@
+# frozen_string_literal: true
+
+require "kafka/crc32_hash"
+require "kafka/murmur2_hash"
+
+module Kafka
+  module Digest
+    FUNCTIONS_BY_NAME = {
+      :crc32 => Crc32Hash.new,
+      :murmur2 => Murmur2Hash.new
+    }.freeze
+
+    def self.find_digest(name)
+      digest = FUNCTIONS_BY_NAME.fetch(name) do
+        raise LoadError, "Unknown hash function #{name}"
+      end
+
+      digest.load
+      digest
+    end
+  end
+end
diff --git a/lib/kafka/murmur2_hash.rb b/lib/kafka/murmur2_hash.rb
new file mode 100644
index 000000000..a6223b0d6
--- /dev/null
+++ b/lib/kafka/murmur2_hash.rb
@@ -0,0 +1,17 @@
+# frozen_string_literal: true
+
+module Kafka
+  class Murmur2Hash
+    SEED = [0x9747b28c].pack('L')
+
+    def load
+      require 'digest/murmurhash'
+    rescue LoadError
+      raise LoadError, "using murmur2 hashing requires adding a dependency on the `digest-murmurhash` gem to your Gemfile."
+    end
+
+    def hash(value)
+      ::Digest::MurmurHash2.rawdigest(value, SEED) & 0x7fffffff
+    end
+  end
+end
diff --git a/lib/kafka/partitioner.rb b/lib/kafka/partitioner.rb
index f4fcd2882..e11052442 100644
--- a/lib/kafka/partitioner.rb
+++ b/lib/kafka/partitioner.rb
@@ -1,11 +1,16 @@
 # frozen_string_literal: true
 
-require "zlib"
+require "kafka/digest"
 
 module Kafka
 
  # Assigns partitions to messages.
  class Partitioner
+    # @param hash_function [Symbol, nil] the algorithm used to compute a message's
+    #   destination partition. Default is :crc32.
+    def initialize(hash_function: nil)
+      @digest = Digest.find_digest(hash_function || :crc32)
+    end
 
    # Assigns a partition number based on a partition key. If no explicit
    # partition key is provided, the message key will be used instead.
@@ -28,7 +33,7 @@ def call(partition_count, message)
      if key.nil?
        rand(partition_count)
      else
-        Zlib.crc32(key) % partition_count
+        @digest.hash(key) % partition_count
      end
    end
  end
diff --git a/lib/kafka/protocol/add_offsets_to_txn_response.rb b/lib/kafka/protocol/add_offsets_to_txn_response.rb
index 830613dfb..7ac824cd7 100644
--- a/lib/kafka/protocol/add_offsets_to_txn_response.rb
+++ b/lib/kafka/protocol/add_offsets_to_txn_response.rb
@@ -1,3 +1,5 @@
+# frozen_string_literal: true
+
 module Kafka
   module Protocol
     class AddOffsetsToTxnResponse
diff --git a/lib/kafka/protocol/encoder.rb b/lib/kafka/protocol/encoder.rb
index d4bc7a391..56a584a01 100644
--- a/lib/kafka/protocol/encoder.rb
+++ b/lib/kafka/protocol/encoder.rb
@@ -126,7 +126,7 @@ def write_varint_string(string)
    # Writes a varint-encoded integer to the IO object.
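    # The value is ZigZag-encoded first, so integers of small magnitude stay
    # small on the wire: 0 → 0, -1 → 1, 1 → 2, -2 → 3.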
    # https://developers.google.com/protocol-buffers/docs/encoding#varints
    #
-    # @param string [Integer]
+    # @param int [Integer]
    # @return [nil]
    def write_varint(int)
      int = int << 1
diff --git a/lib/kafka/protocol/record_batch.rb b/lib/kafka/protocol/record_batch.rb
index 4201cc737..75c861234 100644
--- a/lib/kafka/protocol/record_batch.rb
+++ b/lib/kafka/protocol/record_batch.rb
@@ -77,7 +77,7 @@ def encode(encoder)
      record_batch_encoder.write_int8(MAGIC_BYTE)
 
      body = encode_record_batch_body
-      crc = Digest::CRC32c.checksum(body)
+      crc = ::Digest::CRC32c.checksum(body)
 
      record_batch_encoder.write_int32(crc)
      record_batch_encoder.write(body)
@@ -213,7 +213,7 @@ def self.decode(decoder)
    end
 
    def mark_control_record
-      if in_transaction && is_control_batch
+      if is_control_batch
        record = @records.first
        record.is_control_record = true unless record.nil?
      end
diff --git a/lib/kafka/protocol/sasl_handshake_request.rb b/lib/kafka/protocol/sasl_handshake_request.rb
index 9cf9087ab..5d6bdff74 100644
--- a/lib/kafka/protocol/sasl_handshake_request.rb
+++ b/lib/kafka/protocol/sasl_handshake_request.rb
@@ -8,7 +8,7 @@ module Protocol
 
    class SaslHandshakeRequest
 
-      SUPPORTED_MECHANISMS = %w(GSSAPI PLAIN SCRAM-SHA-256 SCRAM-SHA-512 OAUTHBEARER)
+      SUPPORTED_MECHANISMS = %w(AWS_MSK_IAM GSSAPI PLAIN SCRAM-SHA-256 SCRAM-SHA-512 OAUTHBEARER)
 
      def initialize(mechanism)
        unless SUPPORTED_MECHANISMS.include?(mechanism)
diff --git a/lib/kafka/protocol/sync_group_response.rb b/lib/kafka/protocol/sync_group_response.rb
index a1e4aab83..148945095 100644
--- a/lib/kafka/protocol/sync_group_response.rb
+++ b/lib/kafka/protocol/sync_group_response.rb
@@ -13,9 +13,12 @@ def initialize(error_code:, member_assignment:)
    end
 
    def self.decode(decoder)
+      error_code = decoder.int16
+      member_assignment_bytes = decoder.bytes
+
      new(
-        error_code: decoder.int16,
-        member_assignment: MemberAssignment.decode(Decoder.from_string(decoder.bytes)),
+        error_code: error_code,
+        member_assignment: member_assignment_bytes ? MemberAssignment.decode(Decoder.from_string(member_assignment_bytes)) : nil
      )
    end
  end
diff --git a/lib/kafka/protocol/txn_offset_commit_response.rb b/lib/kafka/protocol/txn_offset_commit_response.rb
index 5bd3363fc..628af7d66 100644
--- a/lib/kafka/protocol/txn_offset_commit_response.rb
+++ b/lib/kafka/protocol/txn_offset_commit_response.rb
@@ -1,3 +1,5 @@
+# frozen_string_literal: true
+
 module Kafka
   module Protocol
     class TxnOffsetCommitResponse
diff --git a/lib/kafka/round_robin_assignment_strategy.rb b/lib/kafka/round_robin_assignment_strategy.rb
index 490929daf..2d46ad824 100644
--- a/lib/kafka/round_robin_assignment_strategy.rb
+++ b/lib/kafka/round_robin_assignment_strategy.rb
@@ -1,9 +1,9 @@
-# frozen_string_literal: true
-
 module Kafka
 
-  # A consumer group partition assignment strategy that assigns partitions to
-  # consumers in a round-robin fashion.
+  # A round robin assignment strategy inspired by the
+  # original Java client's round robin assignor. It's capable
+  # of handling identical as well as different topic subscriptions
+  # across the same consumer group.
  class RoundRobinAssignmentStrategy
    def protocol_name
      "roundrobin"
@@ -19,13 +19,34 @@ def protocol_name
    # @return [Hash] a hash
    #   mapping member ids to partitions.
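+    #
+    # @example Sketch of the expected shape (hypothetical doubles, for illustration only)
+    #   members    = { "member1" => double(topics: ["topic1"]) }
+    #   partitions = [double(topic: "topic1", partition_id: 0)]
+    #   strategy.call(cluster: nil, members: members, partitions: partitions)
+    #   # => { "member1" => [<partition 0 of topic1>] }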
    def call(cluster:, members:, partitions:)
-      member_ids = members.keys
      partitions_per_member = Hash.new {|h, k| h[k] = [] }
-      partitions.each_with_index do |partition, index|
-        partitions_per_member[member_ids[index % member_ids.count]] << partition
+      relevant_partitions = valid_sorted_partitions(members, partitions)
+      members_ids = members.keys
+      iterator = (0...members.size).cycle
+      idx = iterator.next
+
+      relevant_partitions.each do |partition|
+        topic = partition.topic
+
+        while !members[members_ids[idx]].topics.include?(topic)
+          idx = iterator.next
+        end
+
+        partitions_per_member[members_ids[idx]] << partition
+        idx = iterator.next
      end
 
      partitions_per_member
    end
+
+    def valid_sorted_partitions(members, partitions)
+      subscribed_topics = members.map do |id, metadata|
+        metadata && metadata.topics
+      end.flatten.compact
+
+      partitions
+        .select { |partition| subscribed_topics.include?(partition.topic) }
+        .sort_by { |partition| partition.topic }
+    end
  end
 end
diff --git a/lib/kafka/sasl/awsmskiam.rb b/lib/kafka/sasl/awsmskiam.rb
new file mode 100644
index 000000000..b56c90011
--- /dev/null
+++ b/lib/kafka/sasl/awsmskiam.rb
@@ -0,0 +1,133 @@
+# frozen_string_literal: true
+
+require 'securerandom'
+require 'base64'
+require 'json'
+
+module Kafka
+  module Sasl
+    class AwsMskIam
+      AWS_MSK_IAM = "AWS_MSK_IAM"
+
+      def initialize(aws_region:, access_key_id:, secret_key_id:, session_token: nil, logger:)
+        @semaphore = Mutex.new
+
+        @aws_region = aws_region
+        @access_key_id = access_key_id
+        @secret_key_id = secret_key_id
+        @session_token = session_token
+        @logger = TaggedLogger.new(logger)
+      end
+
+      def ident
+        AWS_MSK_IAM
+      end
+
+      def configured?
+        @aws_region && @access_key_id && @secret_key_id
+      end
+
+      def authenticate!(host, encoder, decoder)
+        @logger.debug "Authenticating #{@access_key_id} with SASL #{AWS_MSK_IAM}"
+
+        host_without_port = host.split(':', -1).first
+
+        time_now = Time.now.utc
+
+        msg = authentication_payload(host: host_without_port, time_now: time_now)
+        @logger.debug "Sending first client SASL AWS_MSK_IAM message:"
+        @logger.debug msg
+        encoder.write_bytes(msg)
+
+        begin
+          @server_first_message = decoder.bytes
+          @logger.debug "Received first server SASL AWS_MSK_IAM message: #{@server_first_message}"
+
+          raise Kafka::Error, "SASL AWS_MSK_IAM authentication failed: unknown error" unless @server_first_message
+        rescue Errno::ETIMEDOUT, EOFError => e
+          raise Kafka::Error, "SASL AWS_MSK_IAM authentication failed: #{e.message}"
+        end
+
+        @logger.debug "SASL #{AWS_MSK_IAM} authentication successful"
+      end
+
+      private
+
+      def bin_to_hex(s)
+        s.each_byte.map { |b| b.to_s(16).rjust(2, '0') }.join
+      end
+
+      def digest
+        @digest ||= OpenSSL::Digest::SHA256.new
+      end
+
+      def authentication_payload(host:, time_now:)
+        {
+          'version' => "2020_10_22",
+          'host' => host,
+          'user-agent' => "ruby-kafka",
+          'action' => "kafka-cluster:Connect",
+          'x-amz-algorithm' => "AWS4-HMAC-SHA256",
+          'x-amz-credential' => @access_key_id + "/" + time_now.strftime("%Y%m%d") + "/" + @aws_region + "/kafka-cluster/aws4_request",
+          'x-amz-date' => time_now.strftime("%Y%m%dT%H%M%SZ"),
+          'x-amz-signedheaders' => "host",
+          'x-amz-expires' => "900",
+          'x-amz-security-token' => @session_token,
+          'x-amz-signature' => signature(host: host, time_now: time_now)
+        }.delete_if { |_, v| v.nil? }.to_json
+      end
+
+      def canonical_request(host:, time_now:)
+        "GET\n" +
+          "/\n" +
+          canonical_query_string(time_now: time_now) + "\n" +
+          canonical_headers(host: host) + "\n" +
+          signed_headers + "\n" +
+          hashed_payload
+      end
+
+      def canonical_query_string(time_now:)
+        params = {
+          "Action" => "kafka-cluster:Connect",
+          "X-Amz-Algorithm" => "AWS4-HMAC-SHA256",
+          "X-Amz-Credential" => @access_key_id + "/" + time_now.strftime("%Y%m%d") + "/" + @aws_region + "/kafka-cluster/aws4_request",
+          "X-Amz-Date" => time_now.strftime("%Y%m%dT%H%M%SZ"),
+          "X-Amz-Expires" => "900",
+          "X-Amz-Security-Token" => @session_token,
+          "X-Amz-SignedHeaders" => "host"
+        }.delete_if { |_, v| v.nil? }
+
+        URI.encode_www_form(params)
+      end
+
+      def canonical_headers(host:)
+        "host" + ":" + host + "\n"
+      end
+
+      def signed_headers
+        "host"
+      end
+
+      def hashed_payload
+        bin_to_hex(digest.digest(""))
+      end
+
+      def string_to_sign(host:, time_now:)
+        "AWS4-HMAC-SHA256" + "\n" +
+          time_now.strftime("%Y%m%dT%H%M%SZ") + "\n" +
+          time_now.strftime("%Y%m%d") + "/" + @aws_region + "/kafka-cluster/aws4_request" + "\n" +
+          bin_to_hex(digest.digest(canonical_request(host: host, time_now: time_now)))
+      end
+
+      def signature(host:, time_now:)
+        date_key = OpenSSL::HMAC.digest("SHA256", "AWS4" + @secret_key_id, time_now.strftime("%Y%m%d"))
+        date_region_key = OpenSSL::HMAC.digest("SHA256", date_key, @aws_region)
+        date_region_service_key = OpenSSL::HMAC.digest("SHA256", date_region_key, "kafka-cluster")
+        signing_key = OpenSSL::HMAC.digest("SHA256", date_region_service_key, "aws4_request")
+        signature = bin_to_hex(OpenSSL::HMAC.digest("SHA256", signing_key, string_to_sign(host: host, time_now: time_now)))
+
+        signature
+      end
+    end
+  end
+end
diff --git a/lib/kafka/sasl_authenticator.rb b/lib/kafka/sasl_authenticator.rb
index 2a0533302..10acf74b8 100644
--- a/lib/kafka/sasl_authenticator.rb
+++ b/lib/kafka/sasl_authenticator.rb
@@ -4,13 +4,18 @@
 require 'kafka/sasl/gssapi'
 require 'kafka/sasl/scram'
 require 'kafka/sasl/oauth'
+require 'kafka/sasl/awsmskiam'
 
 module Kafka
   class SaslAuthenticator
     def initialize(logger:, sasl_gssapi_principal:, sasl_gssapi_keytab:,
                    sasl_plain_authzid:, sasl_plain_username:, sasl_plain_password:,
                    sasl_scram_username:, sasl_scram_password:, sasl_scram_mechanism:,
-                   sasl_oauth_token_provider:)
+                   sasl_oauth_token_provider:,
+                   sasl_aws_msk_iam_access_key_id:,
+                   sasl_aws_msk_iam_secret_key_id:,
+                   sasl_aws_msk_iam_aws_region:,
+                   sasl_aws_msk_iam_session_token: nil)
      @logger = TaggedLogger.new(logger)
 
      @plain = Sasl::Plain.new(
@@ -33,12 +38,20 @@ def initialize(logger:, sasl_gssapi_principal:, sasl_gssapi_keytab:,
        logger: @logger,
      )
 
+      @aws_msk_iam = Sasl::AwsMskIam.new(
+        access_key_id: sasl_aws_msk_iam_access_key_id,
+        secret_key_id: sasl_aws_msk_iam_secret_key_id,
+        aws_region: sasl_aws_msk_iam_aws_region,
+        session_token: sasl_aws_msk_iam_session_token,
+        logger: @logger,
+      )
+
      @oauth = Sasl::OAuth.new(
        token_provider: sasl_oauth_token_provider,
        logger: @logger,
      )
 
-      @mechanism = [@gssapi, @plain, @scram, @oauth].find(&:configured?)
+      @mechanism = [@gssapi, @plain, @scram, @oauth, @aws_msk_iam].find(&:configured?)
    end
 
    def enabled?
diff --git a/lib/kafka/ssl_context.rb b/lib/kafka/ssl_context.rb
index 1276b4948..c5eac4542 100644
--- a/lib/kafka/ssl_context.rb
+++ b/lib/kafka/ssl_context.rb
@@ -47,8 +47,8 @@ def self.build(ca_cert_file_path: nil, ca_cert: nil, client_cert: nil, client_ce
        Array(ca_cert).each do |cert|
          store.add_cert(OpenSSL::X509::Certificate.new(cert))
        end
-        if ca_cert_file_path
-          store.add_file(ca_cert_file_path)
+        Array(ca_cert_file_path).each do |cert_file_path|
+          store.add_file(cert_file_path)
        end
        if ca_certs_from_system
          store.set_default_paths
diff --git a/lib/kafka/version.rb b/lib/kafka/version.rb
index 22636a095..715f24411 100644
--- a/lib/kafka/version.rb
+++ b/lib/kafka/version.rb
@@ -1,5 +1,5 @@
 # frozen_string_literal: true
 
 module Kafka
-  VERSION = "1.2.0"
+  VERSION = "1.5.0"
 end
diff --git a/ruby-kafka.gemspec b/ruby-kafka.gemspec
index 2589e97d5..d50092dc5 100644
--- a/ruby-kafka.gemspec
+++ b/ruby-kafka.gemspec
@@ -33,6 +33,7 @@ Gem::Specification.new do |spec|
  spec.add_development_dependency "rake", "~> 10.0"
  spec.add_development_dependency "rspec"
  spec.add_development_dependency "pry"
+  spec.add_development_dependency "digest-murmurhash"
  spec.add_development_dependency "dotenv"
  spec.add_development_dependency "docker-api"
  spec.add_development_dependency "rspec-benchmark"
diff --git a/spec/async_producer_spec.rb b/spec/async_producer_spec.rb
index 7a0ed1351..ff91e9c8a 100644
--- a/spec/async_producer_spec.rb
+++ b/spec/async_producer_spec.rb
@@ -76,6 +76,8 @@ def instrument(name, payload = {})
    sleep 0.2 # wait for worker to call produce
 
    expect(sync_producer).to have_received(:produce)
+
+    async_producer.shutdown
  end
 
  it "retries until configured max_retries" do
@@ -89,6 +91,8 @@ def instrument(name, payload = {})
    metric = instrumenter.metrics_for("error.async_producer").first
    expect(metric.payload[:error]).to be_a(Kafka::BufferOverflow)
    expect(sync_producer).to have_received(:produce).exactly(3).times
+
+    async_producer.shutdown
  end
 
  it "requires `topic` to be a String" do
diff --git a/spec/cluster_spec.rb b/spec/cluster_spec.rb
index acefa1f62..9d1d7d32d 100644
--- a/spec/cluster_spec.rb
+++ b/spec/cluster_spec.rb
@@ -103,4 +103,47 @@
      }.to raise_exception(ArgumentError)
    end
  end
+
+  describe "#cluster_info" do
+    let(:cluster) {
+      Kafka::Cluster.new(
+        seed_brokers: [URI("kafka://test1:9092")],
+        broker_pool: broker_pool,
+        logger: LOGGER,
+        resolve_seed_brokers: resolve_seed_brokers,
+      )
+    }
+
+    before do
+      allow(broker).to receive(:fetch_metadata) { raise Kafka::ConnectionError, "Operation timed out" }
+      allow(broker).to receive(:disconnect)
+    end
+
+    context "when resolve_seed_brokers is false" do
+      let(:resolve_seed_brokers) { false }
+
+      it "tries the seed broker hostnames as is" do
+        expect(broker_pool).to receive(:connect).with("test1", 9092) { broker }
+        expect {
+          cluster.cluster_info
+        }.to raise_error(Kafka::ConnectionError, %r{kafka://test1:9092: Operation timed out})
+      end
+    end
+
+    context "when resolve_seed_brokers is true" do
+      let(:resolve_seed_brokers) { true }
+
+      before do
+        allow(Resolv).to receive(:getaddresses) { ["127.0.0.1", "::1"] }
+      end
+
+      it "tries all the resolved IP addresses" do
+        expect(broker_pool).to receive(:connect).with("127.0.0.1", 9092) { broker }
+        expect(broker_pool).to receive(:connect).with("::1", 9092) { broker }
+        expect {
+          cluster.cluster_info
+        }.to raise_error(Kafka::ConnectionError, %r{kafka://test1:9092 \(127\.0\.0\.1\): Operation timed out})
+      end
+    end
+  end
 end
diff --git a/spec/digest_spec.rb b/spec/digest_spec.rb
new file mode 100644
index 000000000..c4b98a8bf
--- /dev/null
+++ b/spec/digest_spec.rb
@@ -0,0 +1,33 @@
+# frozen_string_literal: true
+
+describe Kafka::Digest do
+  describe "crc32" do
+    let(:digest) { Kafka::Digest.find_digest(:crc32) }
+
+    it "is supported" do
+      expect(digest).to be_truthy
+    end
+
+    it "produces hash for value" do
+      expect(digest.hash("yolo")).to eq(1623057525)
+    end
+  end
+
+  describe "murmur2" do
+    let(:digest) { Kafka::Digest.find_digest(:murmur2) }
+
+    it "is supported" do
+      expect(digest).to be_truthy
+    end
+
+    it "produces hash for value" do
+      expect(digest.hash("yolo")).to eq(1633766415)
+    end
+  end
+
+  describe "unknown hash function" do
+    it "raises" do
+      expect { Kafka::Digest.find_digest(:yolo) }.to raise_error(LoadError)
+    end
+  end
+end
diff --git a/spec/functional/client_spec.rb b/spec/functional/client_spec.rb
index eec4f4627..cdd65498c 100644
--- a/spec/functional/client_spec.rb
+++ b/spec/functional/client_spec.rb
@@ -156,31 +156,6 @@
    expect(message.key).to eq "xoxo"
  end
 
-  example "delivering a message to a topic that doesn't yet exist" do
-    topic = "unknown-topic-#{SecureRandom.uuid}"
-    now = Time.now
-
-    expect {
-      Timecop.freeze(now) do
-        kafka.deliver_message("yolo", topic: topic, key: "xoxo", partition: 0, headers: { hello: 'World' })
-      end
-    }.to raise_exception(Kafka::DeliveryFailed) {|exception|
-      expect(exception.failed_messages).to eq [
-        Kafka::PendingMessage.new(
-          value: "yolo",
-          key: "xoxo",
-          headers: {
-            hello: "World",
-          },
-          topic: topic,
-          partition: 0,
-          partition_key: nil,
-          create_time: now
-        )
-      ]
-    }
-  end
-
  example "enumerating the messages in a topic" do
    values = (1..10).to_a
 
diff --git a/spec/functional/consumer_group_spec.rb b/spec/functional/consumer_group_spec.rb
index 23cc1525e..c8e35af3a 100644
--- a/spec/functional/consumer_group_spec.rb
+++ b/spec/functional/consumer_group_spec.rb
@@ -476,12 +476,21 @@ def call(cluster:, members:, partitions:)
      end
    end
 
+    joined_consumers = []
    consumers = 2.times.map do |i|
      assignment_strategy = assignment_strategy_class.new(i + 1)
 
      kafka = Kafka.new(kafka_brokers, client_id: "test", logger: logger)
      consumer = kafka.consumer(group_id: group_id, offset_retention_time: offset_retention_time, assignment_strategy: assignment_strategy)
      consumer.subscribe(topic)
+
+      allow(consumer).to receive(:trigger_heartbeat).and_wrap_original do |m, *args|
+        joined_consumers |= [consumer]
+        # Wait until all the consumers try to join to prevent one consumer from processing all messages
+        raise Kafka::HeartbeatError if joined_consumers.size < consumers.size
+        m.call(*args)
+      end
+
      consumer
    end
 
@@ -509,6 +518,68 @@ def call(cluster:, members:, partitions:)
    expect(received_messages.values.map(&:count)).to match_array [messages.count / 3, messages.count / 3 * 2]
  end
 
+  example "subscribing to different topics while in the same consumer group" do
+    topic1 = create_random_topic(num_partitions: 1)
+    topic2 = create_random_topic(num_partitions: 1)
+    messages = (1..500).to_a
+
+    begin
+      kafka = Kafka.new(kafka_brokers, client_id: "test")
+      producer = kafka.producer
+
+      messages[0..249].each do |i|
+        producer.produce(i.to_s, topic: topic1, partition: 0)
+      end
+
+      messages[250..500].each do |i|
+        producer.produce(i.to_s, topic: topic2, partition: 0)
+      end
+
+      producer.deliver_messages
+    end
+
+    group_id = "test#{rand(1000)}"
+
+    mutex = Mutex.new
+    received_messages = []
+
+    assignment_strategy_class = Kafka::RoundRobinAssignmentStrategy
+
+    consumers = [topic1, topic2].map do |topic|
+      assignment_strategy = assignment_strategy_class.new
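+      # Each consumer subscribes to a different one of the two topics, so the
+      # round robin assignor must balance the group across both subscriptions.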
+      kafka = Kafka.new(kafka_brokers, client_id: "test", logger: logger)
+      consumer = kafka.consumer(
+        group_id: group_id,
+        offset_retention_time: offset_retention_time,
+        assignment_strategy: assignment_strategy
+      )
+      consumer.subscribe(topic)
+      consumer
+    end
+
+    threads = consumers.map do |consumer|
+      t = Thread.new do
+        consumer.each_message do |message|
+          mutex.synchronize do
+            received_messages << message
+
+            if received_messages.count == messages.count
+              consumers.each(&:stop)
+            end
+          end
+        end
+      end
+
+      t.abort_on_exception = true
+
+      t
+    end
+
+    threads.each(&:join)
+
+    expect(received_messages.map(&:value).map(&:to_i)).to match_array messages
+  end
+
  def wait_until(timeout:)
    Timeout.timeout(timeout) do
      sleep 0.5 until yield
diff --git a/spec/functional/topic_management_spec.rb b/spec/functional/topic_management_spec.rb
index d10b5fdd6..3dbdcaec3 100644
--- a/spec/functional/topic_management_spec.rb
+++ b/spec/functional/topic_management_spec.rb
@@ -70,11 +70,10 @@ def with_retry(opts = {}, &block)
    topic = generate_topic_name
    kafka.create_topic(topic, num_partitions: 3)
 
-    configs = kafka.describe_topic(topic, %w(retention.ms retention.bytes non_exists))
+    configs = kafka.describe_topic(topic, %w(retention.ms))
 
-    expect(configs.keys).to eql(%w(retention.ms retention.bytes))
+    expect(configs.keys).to eql(%w(retention.ms))
    expect(configs['retention.ms']).to be_a(String)
-    expect(configs['retention.bytes']).to be_a(String)
  end
 
  example "alter a topic configuration" do
@@ -90,8 +89,9 @@ def with_retry(opts = {}, &block)
      'max.message.bytes' => '987654'
    )
 
-    configs = kafka.describe_topic(topic, %w(retention.ms max.message.bytes))
-    expect(configs['retention.ms']).to eql('1234567')
-    expect(configs['max.message.bytes']).to eql('987654')
+    retention_configs = kafka.describe_topic(topic, %w(retention.ms))
+    expect(retention_configs['retention.ms']).to eql('1234567')
+    max_msg_bytes_configs = kafka.describe_topic(topic, %w(max.message.bytes))
+    expect(max_msg_bytes_configs['max.message.bytes']).to eql('987654')
  end
 end
diff --git a/spec/partitioner_spec.rb b/spec/partitioner_spec.rb
index dafe73557..547dccd13 100644
--- a/spec/partitioner_spec.rb
+++ b/spec/partitioner_spec.rb
@@ -1,29 +1,81 @@
 # frozen_string_literal: true
 
 describe Kafka::Partitioner, "#call" do
-  let(:partitioner) { Kafka::Partitioner.new }
  let(:message) { double(:message, key: nil, partition_key: "yolo") }
 
-  it "deterministically returns a partition number for a partition key and partition count" do
-    partition = partitioner.call(3, message)
-    expect(partition).to eq 0
-  end
-
-  it "falls back to the message key if no partition key is available" do
-    allow(message).to receive(:partition_key) { nil }
-    allow(message).to receive(:key) { "hey" }
-
-    partition = partitioner.call(3, message)
-
-    expect(partition).to eq 2
-  end
-
-  it "randomly picks a partition if the key is nil" do
-    allow(message).to receive(:key) { nil }
-    allow(message).to receive(:partition_key) { nil }
-
-    partitions = 30.times.map { partitioner.call(3, message) }
-
-    expect(partitions.uniq).to contain_exactly(0, 1, 2)
-  end
+  describe "default partitioner" do
+    let(:partitioner) { Kafka::Partitioner.new }
+
+    it "deterministically returns a partition number for a partition key and partition count" do
+      partition = partitioner.call(3, message)
+      expect(partition).to eq 0
+    end
+
+    it "falls back to the message key if no partition key is available" do
+      allow(message).to receive(:partition_key) { nil }
+      allow(message).to receive(:key) { "hey" }
+
+      partition = partitioner.call(3, message)
+
+      expect(partition).to eq 2
+    end
+
+    it "randomly picks a partition if the key is nil" do
+      allow(message).to receive(:key) { nil }
+      allow(message).to receive(:partition_key) { nil }
+
+      partitions = 30.times.map { partitioner.call(3, message) }
+
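+      # With neither key nor partition key, each call picks a random partition,
+      # so 30 draws are expected to cover all three partitions.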
+      expect(partitions.uniq).to contain_exactly(0, 1, 2)
+    end
+  end
 
+  describe "murmur2 partitioner" do
+    let(:partitioner) { Kafka::Partitioner.new(hash_function: :murmur2) }
+    let(:message) { double(:message, key: nil, partition_key: "yolo") }
+
+    it "deterministically returns a partition number for a partition key and partition count" do
+      partition = partitioner.call(3, message)
+      expect(partition).to eq 0
+    end
+
+    it "falls back to the message key if no partition key is available" do
+      allow(message).to receive(:partition_key) { nil }
+      allow(message).to receive(:key) { "hey" }
+
+      partition = partitioner.call(3, message)
+
+      expect(partition).to eq 1
+    end
+
+    it "randomly picks a partition if the key is nil" do
+      allow(message).to receive(:key) { nil }
+      allow(message).to receive(:partition_key) { nil }
+
+      partitions = 30.times.map { partitioner.call(3, message) }
+
+      expect(partitions.uniq).to contain_exactly(0, 1, 2)
+    end
+
+    it "picks a Java Kafka compatible partition" do
+      partition_count = 100
+      {
+        # librdkafka test cases taken from tests/0048-partitioner.c
+        "" => 0x106e08d9 % partition_count,
+        "this is another string with more length to it perhaps" => 0x4f7703da % partition_count,
+        "hejsan" => 0x5ec19395 % partition_count,
+        # Java Kafka test cases taken from UtilsTest.java.
+        # The Java tests check the result of murmur2 directly,
+        # so have been ANDed with 0x7fffffff to work here
+        "21" => (-973932308 & 0x7fffffff) % partition_count,
+        "foobar" => (-790332482 & 0x7fffffff) % partition_count,
+        "a-little-bit-long-string" => (-985981536 & 0x7fffffff) % partition_count,
+        "a-little-bit-longer-string" => (-1486304829 & 0x7fffffff) % partition_count,
+        "lkjh234lh9fiuh90y23oiuhsafujhadof229phr9h19h89h8" => (-58897971 & 0x7fffffff) % partition_count
+      }.each do |key, partition|
+        allow(message).to receive(:partition_key) { key }
+        expect(partitioner.call(partition_count, message)).to eq partition
+      end
+    end
+  end
 end
diff --git a/spec/protocol/sync_group_response_spec.rb b/spec/protocol/sync_group_response_spec.rb
new file mode 100644
index 000000000..8979f244c
--- /dev/null
+++ b/spec/protocol/sync_group_response_spec.rb
@@ -0,0 +1,28 @@
+# frozen_string_literal: true
+
+describe Kafka::Protocol::SyncGroupResponse do
+  describe ".decode" do
+    subject(:response) { Kafka::Protocol::SyncGroupResponse.decode(decoder) }
+
+    let(:decoder) { Kafka::Protocol::Decoder.new(buffer) }
+    let(:buffer) { StringIO.new(response_bytes) }
+
+    context "the response is successful" do
+      let(:response_bytes) { "\x00\x00\x00\x00\x007\x00\x00\x00\x00\x00\x01\x00\x1Fsome-topic-f064d6897583eb395896\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01\xFF\xFF\xFF\xFF" }
+
+      it "decodes the response including the member assignment" do
+        expect(response.error_code).to eq 0
+        expect(response.member_assignment.topics).to eq({ "some-topic-f064d6897583eb395896" => [0, 1] })
+      end
+    end
+
+    context "the response is not successful" do
+      let(:response_bytes) { "\x00\x19\xFF\xFF\xFF\xFF" }
+
+      it "decodes the error code and leaves the member assignment nil" do
+        expect(response.error_code).to eq 25
+        expect(response.member_assignment).to be_nil
+      end
+    end
+  end
+end
diff --git a/spec/round_robin_assignment_strategy_spec.rb b/spec/round_robin_assignment_strategy_spec.rb
index c76b9c053..af4aaf4e4 100644
--- a/spec/round_robin_assignment_strategy_spec.rb
+++ b/spec/round_robin_assignment_strategy_spec.rb
@@ -4,7 +4,7 @@
  let(:strategy) { described_class.new }
 
  it "assigns all partitions" do
-    members = Hash[(0...10).map {|i| ["member#{i}", nil] }]
+    members = Hash[(0...10).map {|i| ["member#{i}", double(topics: ['greetings'])] }]
    partitions = (0...30).map {|i| double(:"partition#{i}", topic: "greetings", partition_id: i) }
 
    assignments = strategy.call(cluster: nil, members: members, partitions: partitions)
@@ -21,8 +21,8 @@
  end
 
  it "spreads all partitions between members" do
-    members = Hash[(0...10).map {|i| ["member#{i}", nil] }]
    topics = ["topic1", "topic2"]
+    members = Hash[(0...10).map {|i| ["member#{i}", double(topics: topics)] }]
    partitions = topics.product((0...5).to_a).map {|topic, i|
      double(:"partition#{i}", topic: topic, partition_id: i)
    }
@@ -46,36 +46,50 @@
    expect(num_partitions_assigned).to all eq(1)
  end
 
+  Metadata = Struct.new(:topics)
  [
    {
      name: "uneven topics",
      topics: { "topic1" => [0], "topic2" => (0..50).to_a },
-      members: { "member1" => nil, "member2" => nil },
+      members: {
+        "member1" => Metadata.new(["topic1", "topic2"]),
+        "member2" => Metadata.new(["topic1", "topic2"])
+      },
    },
    {
      name: "only one partition",
      topics: { "topic1" => [0] },
-      members: { "member1" => nil, "member2" => nil },
+      members: {
+        "member1" => Metadata.new(["topic1"]),
+        "member2" => Metadata.new(["topic1"])
+      },
    },
    {
      name: "lots of partitions",
      topics: { "topic1" => (0..100).to_a },
-      members: { "member1" => nil },
+      members: { "member1" => Metadata.new(["topic1"]) },
    },
    {
      name: "lots of members",
      topics: { "topic1" => (0..10).to_a, "topic2" => (0..10).to_a },
-      members: Hash[(0..50).map { |i| ["member#{i}", nil] }]
+      members: Hash[(0..50).map { |i| ["member#{i}", Metadata.new(["topic1", "topic2"])] }]
    },
    {
      name: "odd number of partitions",
      topics: { "topic1" => (0..14).to_a },
-      members: { "member1" => nil, "member2" => nil },
+      members: {
+        "member1" => Metadata.new(["topic1"]),
+        "member2" => Metadata.new(["topic1"])
+      },
    },
    {
      name: "five topics, 10 partitions, 3 consumers",
      topics: { "topic1" => [0, 1], "topic2" => [0, 1], "topic3" => [0, 1], "topic4" => [0, 1], "topic5" => [0, 1] },
-      members: { "member1" => nil, "member2" => nil, "member3" => nil },
+      members: {
+        "member1" => Metadata.new(["topic1", "topic2", "topic3", "topic4", "topic5"]),
+        "member2" => Metadata.new(["topic1", "topic2", "topic3", "topic4", "topic5"]),
+        "member3" => Metadata.new(["topic1", "topic2", "topic3", "topic4", "topic5"])
+      },
    }
  ].each do |options|
    name, topics, members = options[:name], options[:topics], options[:members]
@@ -113,4 +127,208 @@ def expect_even_assignments(topics, assignments)
      expect(num_assigned).to be_within(1).of(num_partitions.to_f / assignments.count)
    end
  end
+
+  context 'one consumer, no subscriptions or topics / partitions' do
+    it 'returns empty assignments' do
+      members = { 'member1' => nil }
+      partitions = []
+
+      assignments = strategy.call(cluster: nil, members: members, partitions: partitions)
+
+      expect(assignments).to eq({})
+    end
+  end
+
+  context 'one consumer with subscription but no matching topic partition' do
+    it 'returns empty assignments' do
+      members = { 'member1' => double(topics: ['topic1']) }
+      partitions = []
+
+      assignments = strategy.call(cluster: nil, members: members, partitions: partitions)
+
+      expect(assignments).to eq({})
+    end
+  end
+
+  context 'one consumer subscribed to one topic with one partition' do
+    it 'assigns the partition to the consumer' do
+      members = { 'member1' => double(topics: ['topic1']) }
+      partitions = [
+        t1p0 = double(:"t1p0", topic: "topic1", partition_id: 0),
+      ]
+
+      assignments = strategy.call(cluster: nil, members: members, partitions: partitions)
+
+      expect(assignments).to eq({
+        'member1' => [t1p0]
+      })
+    end
+  end
+
+  context 'one consumer subscribed to one topic with multiple partitions' do
+    it 'assigns all partitions to the consumer' do
+      members = { 'member1' => double(topics: ['topic1']) }
+      partitions = [
+        t1p0 = double(:"t1p0", topic: "topic1", partition_id: 0),
+        t1p1 = double(:"t1p1", topic: "topic1", partition_id: 1),
+      ]
+
+      assignments = strategy.call(cluster: nil, members: members, partitions: partitions)
+
+      expect(assignments).to eq({
+        'member1' => [t1p0, t1p1]
+      })
+    end
+  end
+
+  context 'one consumer subscribed to one topic but with multiple different topic partitions' do
+    it 'only assigns partitions for the subscribed topic' do
+      members = { 'member1' => double(topics: ['topic1']) }
+      partitions = [
+        t1p0 = double(:"t1p0", topic: "topic1", partition_id: 0),
+        t1p1 = double(:"t1p1", topic: "topic1", partition_id: 1),
+        t2p0 = double(:"t2p0", topic: "topic2", partition_id: 0),
+      ]
+
+      assignments = strategy.call(cluster: nil, members: members, partitions: partitions)
+
+      expect(assignments).to eq({
+        'member1' => [t1p0, t1p1]
+      })
+    end
+  end
+
+  context 'one consumer subscribed to multiple topics' do
+    it "assigns all the topics' partitions to the consumer" do
+      members = { 'member1' => double(topics: ['topic1', 'topic2']) }
+
+      partitions = [
+        t1p0 = double(:"t1p0", topic: "topic1", partition_id: 0),
+        t1p1 = double(:"t1p1", topic: "topic1", partition_id: 1),
+        t2p0 = double(:"t2p0", topic: "topic2", partition_id: 0),
+      ]
+
+      assignments = strategy.call(cluster: nil, members: members, partitions: partitions)
+
+      expect(assignments).to eq({
+        'member1' => [t1p0, t1p1, t2p0]
+      })
+    end
+  end
+
+  context 'two consumers with one topic and only one partition' do
+    it 'only assigns the partition to one consumer' do
+      members = {
+        'member1' => double(topics: ['topic1']),
+        'member2' => double(topics: ['topic1'])
+      }
+      partitions = [
+        t1p0 = double(:"t1p0", topic: "topic1", partition_id: 0),
+      ]
+
+      assignments = strategy.call(cluster: nil, members: members, partitions: partitions)
+
+      expect(assignments).to eq({
+        'member1' => [t1p0]
+      })
+    end
+  end
+
+  context 'two consumers subscribed to one topic with two partitions' do
+    it 'assigns a partition to each consumer' do
+      members = {
+        'member1' => double(topics: ['topic1']),
+        'member2' => double(topics: ['topic1'])
+      }
+      partitions = [
+        t1p0 = double(:"t1p0", topic: "topic1", partition_id: 0),
+        t1p1 = double(:"t1p1", topic: "topic1", partition_id: 1),
+      ]
+
+      assignments = strategy.call(cluster: nil, members: members, partitions: partitions)
+
+      expect(assignments).to eq({
+        'member1' => [t1p0],
+        'member2' => [t1p1]
+      })
+    end
+  end
+
+  context 'multiple consumers with mixed topic subscriptions' do
+    it 'creates a balanced assignment' do
+      members = {
+        'member1' => double(topics: ['topic1']),
+        'member2' => double(topics: ['topic1', 'topic2']),
+        'member3' => double(topics: ['topic1'])
+      }
+      partitions = [
+        t1p0 = double(:"t1p0", topic: "topic1", partition_id: 0),
+        t1p1 = double(:"t1p1", topic: "topic1", partition_id: 1),
+        t1p2 = double(:"t1p2", topic: "topic1", partition_id: 2),
+        t2p0 = double(:"t2p0", topic: "topic2", partition_id: 0),
+        t2p1 = double(:"t2p1", topic: "topic2", partition_id: 1),
+      ]
+
+      assignments = strategy.call(cluster: nil, members: members, partitions: partitions)
+
+      expect(assignments).to eq({
+        'member1' => [t1p0],
+        'member2' => [t1p1, t2p0, t2p1],
+        'member3' => [t1p2]
+      })
+    end
+  end
+
+  context 'two consumers subscribed to two topics with three partitions each' do
+    it 'creates a balanced assignment' do
+      members = {
+        'member1' => double(topics: ['topic1', 'topic2']),
+        'member2' => double(topics: ['topic1', 'topic2'])
+      }
+      partitions = [
+        t1p0 = double(:"t1p0", topic: "topic1", partition_id: 0),
+        t1p1 = double(:"t1p1", topic: "topic1", partition_id: 1),
+        t1p2 = double(:"t1p2", topic: "topic1", partition_id: 2),
+        t2p0 = double(:"t2p0", topic: "topic2", partition_id: 0),
+        t2p1 = double(:"t2p1", topic: "topic2", partition_id: 1),
+        t2p2 = double(:"t2p2", topic: "topic2", partition_id: 2),
+      ]
+
+      assignments = strategy.call(cluster: nil, members: members, partitions: partitions)
+
+      expect(assignments).to eq({
+        'member1' => [t1p0, t1p2, t2p1],
+        'member2' => [t1p1, t2p0, t2p2]
+      })
+    end
+  end
+
+  context 'many consumers subscribed to one topic with partitions given out of order' do
+    it 'produces balanced assignments' do
+      members = {
+        'member1' => double(topics: ['topic1']),
+        'member2' => double(topics: ['topic1']),
+        'member3' => double(topics: ['topic2']),
+      }
+
+      partitions = [
+        t2p0 = double(:"t2p0", topic: "topic2", partition_id: 0),
+        t1p0 = double(:"t1p0", topic: "topic1", partition_id: 0),
+        t2p1 = double(:"t2p1", topic: "topic2", partition_id: 1),
+        t1p1 = double(:"t1p1", topic: "topic1", partition_id: 1),
+      ]
+
+      assignments = strategy.call(cluster: nil, members: members, partitions: partitions)
+
+      # Without sorting the partitions by topic this input would produce an unbalanced assignment:
+      # member1 => [t1p0, t1p1]
+      # member2 => []
+      # member3 => [t2p0, t2p1]
+      expect(assignments).to eq({
+        'member1' => [t1p0],
+        'member2' => [t1p1],
+        'member3' => [t2p0, t2p1]
+      })
+    end
+  end
 end
diff --git a/spec/sasl_authenticator_spec.rb b/spec/sasl_authenticator_spec.rb
index 9bddf855a..9d4668d0c 100644
--- a/spec/sasl_authenticator_spec.rb
+++ b/spec/sasl_authenticator_spec.rb
@@ -41,6 +41,9 @@
      sasl_scram_password: nil,
      sasl_scram_mechanism: nil,
      sasl_oauth_token_provider: nil,
+      sasl_aws_msk_iam_access_key_id: nil,
+      sasl_aws_msk_iam_secret_key_id: nil,
+      sasl_aws_msk_iam_aws_region: nil
    }
  }
diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb
index 32563ab04..1e8a44202 100644
--- a/spec/spec_helper.rb
+++ b/spec/spec_helper.rb
@@ -61,9 +61,9 @@ def generate_topic_name
    "#{RUN_ID}-topic-#{SecureRandom.uuid}"
  end
 
-  def create_random_topic(*args)
+  def create_random_topic(**args)
    topic = generate_topic_name
-    create_topic(topic, *args)
+    create_topic(topic, **args)
    topic
  end
diff --git a/spec/ssl_context_spec.rb b/spec/ssl_context_spec.rb
index 7ce09ce2f..ccadaa80f 100644
--- a/spec/ssl_context_spec.rb
+++ b/spec/ssl_context_spec.rb
@@ -31,6 +31,12 @@
    }.to raise_exception(ArgumentError)
  end
 
+  it "raises an OpenSSL::X509::StoreError if an array of non-existing files is passed for ca_cert_file_path" do
+    expect {
+      Kafka::SslContext.build(ca_cert_file_path: ["no_such_file", "no_such_file_either"])
+    }.to raise_exception(OpenSSL::X509::StoreError)
+  end
+
  context "with self signed cert fixtures" do
    # How the certificates were generated, they are not actually in a chain
    # openssl req -newkey rsa:2048 -nodes -keyout spec/fixtures/client_cert_key.pem -x509 -days 365 -out spec/fixtures/client_cert.pem
diff --git a/spec/transaction_manager_spec.rb b/spec/transaction_manager_spec.rb
index 9895fd1d5..494a02265 100644
--- a/spec/transaction_manager_spec.rb
+++ b/spec/transaction_manager_spec.rb
@@ -591,10 +591,10 @@
        )
      )
      allow(group_coordinator).to receive(:txn_offset_commit).and_return(
-        txn_offset_commit_response(
+        txn_offset_commit_response({
          'hello' => [1],
          'world' => [2]
-        )
+        })
      )
    end
@@ -680,12 +680,12 @@ def success_add_partitions_to_txn_response(topics)
  end
 
  def txn_offset_commit_response(topics, error_code: 0)
-    Kafka::Protocol::AddPartitionsToTxnResponse.new(
+    Kafka::Protocol::TxnOffsetCommitResponse.new(
      errors: topics.map do |topic, partitions|
-        Kafka::Protocol::AddPartitionsToTxnResponse::TopicPartitionsError.new(
+        Kafka::Protocol::TxnOffsetCommitResponse::TopicPartitionsError.new(
          topic: topic,
          partitions: partitions.map do |partition|
-            Kafka::Protocol::AddPartitionsToTxnResponse::PartitionError.new(
+            Kafka::Protocol::TxnOffsetCommitResponse::PartitionError.new(
              partition: partition,
              error_code: error_code
            )