Changes from all commits (73 commits)
d3796da
v1.3.0
dasch Oct 14, 2020
d20b62a
fix Kafka::TransactionManager#send_offsets_to_txn
stasCF Oct 21, 2020
8d25a79
fix rubocop offense
stasCF Oct 21, 2020
9da0459
Add kafka 2.6.0 to Circle CI
methodmissing Nov 11, 2020
fbf5273
Add Kafka 2.6 support to README
methodmissing Nov 11, 2020
d48aab8
add info to changelog
stasCF Nov 12, 2020
1f72b1f
Merge pull request #866 from stasCF/fix-send-offsets-to-txn
dasch Nov 12, 2020
8e00bea
Merge pull request #869 from Shopify/kafka-2.6.0
dasch Nov 12, 2020
3e6ea9f
Resolve RSpec::Mocks::OutsideOfExampleError
abicky Dec 12, 2020
ca33e40
Install cmake to install snappy
abicky Dec 12, 2020
6e25c4c
Resolve "Passing the keyword argument as ..." deprecation warning
abicky Dec 12, 2020
fe2fb3d
Install cmake to install snappy
abicky Dec 12, 2020
7d1c384
Make "consuming messages with a custom assignment strategy" stable
abicky Dec 12, 2020
cda7667
Install cmake to install snappy
abicky Dec 12, 2020
fc5c69d
Merge pull request #875 from abicky/resolve-deprecation-warning
dasch Dec 30, 2020
6d80b33
Merge pull request #874 from abicky/resolve-outside-of-example-error
dasch Dec 30, 2020
fe7bc77
Apply suggestions from code review
abicky Dec 30, 2020
49c6194
Merge pull request #876 from abicky/make-custom-assignment-strategy-e…
dasch Dec 30, 2020
ad8d8d9
kafka 2.7.0
vvuibert Jan 4, 2021
9028ed7
fix describe topic test
vvuibert Jan 13, 2021
0d6c451
Merge pull request #880 from Shopify/kafka-2.7.0
dasch Jan 14, 2021
b323c63
Adding murmur2_random partition assignment
Jan 25, 2021
db7f5f4
Add partitioner_klass as client param
Jan 27, 2021
0617573
Changes `digest-murmurhash` to be an optional dependency
Feb 1, 2021
4669c20
Revert client API change
Feb 1, 2021
b2f3eed
Small cleanup
Feb 1, 2021
d49e7c4
small cleanup
Feb 2, 2021
5a3374e
Add murmur2 to readme
Feb 2, 2021
c52c238
Add murmur2 to changelog
Feb 2, 2021
f3e5078
Add Partitioner doc
Feb 2, 2021
fd89412
Correcr readme doc
Feb 2, 2021
a56d16b
Merge pull request #884 from zendesk/divo/murmur2
dasch Feb 3, 2021
56a0476
Updated to ignore all control batches
rkruze Apr 7, 2021
2cf27e4
Support seed brokers' hostname with multiple addresses
abicky Dec 12, 2020
f07b4c1
Fix "@param tag has unknown parameter name"
abicky Apr 14, 2021
ef7c21c
Add resolve_seed_brokers option to changelog
abicky Apr 16, 2021
e766e2c
Merge pull request #877 from abicky/support-seed-broker-with-multiple…
dasch Apr 19, 2021
1e28e9b
Merge pull request #897 from abicky/fix-yard-annotation
dasch Apr 19, 2021
733a47d
Merge pull request #893 from rkruze/record_batch
dasch Apr 19, 2021
d9cb8d1
Handle SyncGroup responses with a non-zero error and no assignments
Apr 14, 2021
36e6a4b
Merge pull request #896 from BrentWheeldon/BrentWheeldon/group-sync-f…
dasch Apr 19, 2021
4568074
Refresh metadata if necessary on deliver_message
wamaral May 3, 2021
4b4f6bf
Update CHANGELOG
wamaral May 17, 2021
65e4af5
Merge pull request #901 from qonto/deliver-refresh-metadata
dasch May 18, 2021
93d6463
Make ssl_ca_cert_file_path support an array of files
syedriko Jun 28, 2021
374608f
ISSUE-525/764: Add multi subscription round robin assignment strategy.
Jun 8, 2021
ad688de
ISSUE-525/764: Local @topics variable does not contain all the subscr…
Jun 8, 2021
fa5e4ad
Add entry to the change log
Jul 26, 2021
7aadb07
Merge pull request #903 from eduardopoleoflipp/multiple_topic_subscri…
dasch Jul 26, 2021
e49b4b3
This spec stopped working at some point
dasch Jul 28, 2021
7a55dd0
Merge pull request #905 from syedriko/ssl_ca_cert_file_path_array
dasch Jul 28, 2021
8a18f9e
Merge pull request #909 from zendesk/dasch-remove-broken-spec
dasch Jul 28, 2021
68b8761
Fix multiple `[Producer name]` tags on failures
ojab Jul 30, 2021
56b5713
Merge pull request #910 from ojab/fixup_infinite_tags
dasch Aug 25, 2021
d5def61
v1.4.0
dasch Aug 25, 2021
edc50ac
AWS IAM SASL auth for MSK
jobeus Jul 20, 2021
29a4433
Some fixes, changelog, readme
jobeus Nov 22, 2021
750169c
oops bracket got deleted
jobeus Nov 22, 2021
ca53165
Sync Time.now.utc through the methods
jobeus Dec 10, 2021
5bd7d56
Missed a couple time_now places
jobeus Feb 3, 2022
f274edf
Add missing keyword args
kirkokada Mar 2, 2022
38f85c3
Add session token to iam mechanism
kirkokada Mar 10, 2022
efbe3c9
Add session token to canonical query
kirkokada Mar 10, 2022
bb6d655
Remove security token param if nil
kirkokada Mar 15, 2022
624475e
Add space
kirkokada Mar 17, 2022
9d44e51
Undo unnecessary chagne
kirkokada Mar 21, 2022
327da87
use bitnami/kafka:2.7.0 image
kirkokada Apr 1, 2022
9c89990
Merge pull request #937 from kirkokada/aws-iam-auth
dasch Apr 19, 2022
555281e
Merge pull request #939 from kirkokada/update-ci-test-suite
dasch Apr 19, 2022
7a97502
v1.5.0
leonmaia May 25, 2022
3b15941
Avoid unnecessary Hash merges
dasch Jun 13, 2022
292f2d5
Merge pull request #950 from zendesk/dasch-avoid-noop-merge
dasch Jun 14, 2022
5549a1b
Merge branch 'master' of https://github.com/zendesk/ruby-kafka into c…
AndreyNenashev Jul 8, 2022
80 changes: 80 additions & 0 deletions .circleci/config.yml
@@ -7,6 +7,7 @@ jobs:
LOG_LEVEL: DEBUG
steps:
- checkout
- run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
- run: bundle install --path vendor/bundle
- run: bundle exec rspec
- run: bundle exec rubocop
@@ -40,6 +41,7 @@ jobs:
KAFKA_DELETE_TOPIC_ENABLE: true
steps:
- checkout
- run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
- run: bundle install --path vendor/bundle
- run: bundle exec rspec --profile --tag functional spec/functional

@@ -72,6 +74,7 @@ jobs:
KAFKA_DELETE_TOPIC_ENABLE: true
steps:
- checkout
- run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
- run: bundle install --path vendor/bundle
- run: bundle exec rspec --profile --tag functional spec/functional

@@ -104,6 +107,7 @@ jobs:
KAFKA_DELETE_TOPIC_ENABLE: true
steps:
- checkout
- run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
- run: bundle install --path vendor/bundle
- run: bundle exec rspec --profile --tag functional spec/functional

@@ -136,6 +140,7 @@ jobs:
KAFKA_DELETE_TOPIC_ENABLE: true
steps:
- checkout
- run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
- run: bundle install --path vendor/bundle
- run: bundle exec rspec --profile --tag functional spec/functional

@@ -168,6 +173,7 @@ jobs:
KAFKA_DELETE_TOPIC_ENABLE: true
steps:
- checkout
- run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
- run: bundle install --path vendor/bundle
- run: bundle exec rspec --profile --tag functional spec/functional

@@ -200,6 +206,7 @@ jobs:
KAFKA_DELETE_TOPIC_ENABLE: true
steps:
- checkout
- run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
- run: bundle install --path vendor/bundle
- run: bundle exec rspec --profile --tag functional spec/functional

@@ -232,6 +239,7 @@ jobs:
KAFKA_DELETE_TOPIC_ENABLE: true
steps:
- checkout
- run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
- run: bundle install --path vendor/bundle
- run: bundle exec rspec --profile --tag functional spec/functional

@@ -264,6 +272,7 @@ jobs:
KAFKA_DELETE_TOPIC_ENABLE: true
steps:
- checkout
- run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
- run: bundle install --path vendor/bundle
- run: bundle exec rspec --profile --tag functional spec/functional

@@ -296,6 +305,75 @@ jobs:
KAFKA_DELETE_TOPIC_ENABLE: true
steps:
- checkout
- run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
- run: bundle install --path vendor/bundle
- run: bundle exec rspec --profile --tag functional spec/functional

kafka-2.6:
docker:
- image: circleci/ruby:2.5.1-node
environment:
LOG_LEVEL: DEBUG
- image: wurstmeister/zookeeper
- image: wurstmeister/kafka:2.13-2.6.0
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ADVERTISED_PORT: 9092
KAFKA_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: localhost:2181
KAFKA_DELETE_TOPIC_ENABLE: true
- image: wurstmeister/kafka:2.13-2.6.0
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ADVERTISED_PORT: 9093
KAFKA_PORT: 9093
KAFKA_ZOOKEEPER_CONNECT: localhost:2181
KAFKA_DELETE_TOPIC_ENABLE: true
- image: wurstmeister/kafka:2.13-2.6.0
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ADVERTISED_PORT: 9094
KAFKA_PORT: 9094
KAFKA_ZOOKEEPER_CONNECT: localhost:2181
KAFKA_DELETE_TOPIC_ENABLE: true
steps:
- checkout
- run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
- run: bundle install --path vendor/bundle
- run: bundle exec rspec --profile --tag functional spec/functional

kafka-2.7:
docker:
- image: circleci/ruby:2.5.1-node
environment:
LOG_LEVEL: DEBUG
- image: bitnami/zookeeper
environment:
ALLOW_ANONYMOUS_LOGIN: yes
- image: bitnami/kafka:2.7.0
environment:
ALLOW_PLAINTEXT_LISTENER: yes
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_CFG_ZOOKEEPER_CONNECT: localhost:2181
KAFKA_DELETE_TOPIC_ENABLE: true
- image: bitnami/kafka:2.7.0
environment:
ALLOW_PLAINTEXT_LISTENER: yes
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9093
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093
KAFKA_CFG_ZOOKEEPER_CONNECT: localhost:2181
KAFKA_DELETE_TOPIC_ENABLE: true
- image: bitnami/kafka:2.7.0
environment:
ALLOW_PLAINTEXT_LISTENER: yes
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9094
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9094
KAFKA_CFG_ZOOKEEPER_CONNECT: localhost:2181
KAFKA_DELETE_TOPIC_ENABLE: true
steps:
- checkout
- run: sudo apt-get update && sudo apt-get install -y cmake # For installing snappy
- run: bundle install --path vendor/bundle
- run: bundle exec rspec --profile --tag functional spec/functional

@@ -313,3 +391,5 @@ workflows:
- kafka-2.3
- kafka-2.4
- kafka-2.5
- kafka-2.6
- kafka-2.7
15 changes: 15 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,21 @@ Changes and additions to the library will be listed here.

## Unreleased

## 1.5.0
- Add support for AWS IAM Authentication to an MSK cluster (#907).
- Add session token to the IAM mechanism, which is necessary for authentication via temporary credentials (#937).

## 1.4.0

- Refresh a stale cluster's metadata if necessary on `Kafka::Client#deliver_message` (#901).
- Fix `Kafka::TransactionManager#send_offsets_to_txn` (#866).
- Add support for `murmur2` based partitioning.
- Add `resolve_seed_brokers` option to support seed brokers' hostname with multiple addresses (#877).
- Handle SyncGroup responses with a non-zero error and no assignments (#896).
- Add support for non-identical topic subscriptions within the same consumer group (#525 / #764).

## 1.3.0

- Support custom assignment strategy (#846).
- Improved Exceptions in TransactionManager (#862).

42 changes: 42 additions & 0 deletions README.md
@@ -129,6 +129,16 @@ Or install it yourself as:
<td>Limited support</td>
<td>Limited support</td>
</tr>
<tr>
<th>Kafka 2.6</th>
<td>Limited support</td>
<td>Limited support</td>
</tr>
<tr>
<th>Kafka 2.7</th>
<td>Limited support</td>
<td>Limited support</td>
</tr>
</table>

This library is targeting Kafka 0.9 with the v0.4.x series and Kafka 0.10 with the v0.5.x series. There's limited support for Kafka 0.8, and things should work with Kafka 0.11, although there may be performance issues due to changes in the protocol.
@@ -144,6 +154,8 @@ This library is targeting Kafka 0.9 with the v0.4.x series and Kafka 0.10 with t
- **Kafka 2.3:** Everything that works with Kafka 2.2 should still work, but so far no features specific to Kafka 2.3 have been added.
- **Kafka 2.4:** Everything that works with Kafka 2.3 should still work, but so far no features specific to Kafka 2.4 have been added.
- **Kafka 2.5:** Everything that works with Kafka 2.4 should still work, but so far no features specific to Kafka 2.5 have been added.
- **Kafka 2.6:** Everything that works with Kafka 2.5 should still work, but so far no features specific to Kafka 2.6 have been added.
- **Kafka 2.7:** Everything that works with Kafka 2.6 should still work, but so far no features specific to Kafka 2.7 have been added.

This library requires Ruby 2.1 or higher.

@@ -164,6 +176,12 @@ require "kafka"
kafka = Kafka.new(["kafka1:9092", "kafka2:9092"], client_id: "my-application")
```

You can also use a hostname with seed brokers' IP addresses:

```ruby
kafka = Kafka.new("seed-brokers:9092", client_id: "my-application", resolve_seed_brokers: true)
```

### Producing Messages to Kafka

The simplest way to write a message to a Kafka topic is to call `#deliver_message`:
@@ -370,6 +388,16 @@ partitioner = -> (partition_count, message) { ... }
Kafka.new(partitioner: partitioner, ...)
```

##### Supported partitioning schemes

In order for semantic partitioning to work, a `partition_key` must map to the same partition number every time. The general approach, and the one used by this library, is to hash the key and mod it by the number of partitions. Many different hashing algorithms can be used to calculate the hash; by default `crc32` is used. `murmur2` is also supported, for compatibility with Java-based Kafka producers.

To use `murmur2` hashing, pass it as an argument to `Partitioner`. For example:

```ruby
Kafka.new(partitioner: Kafka::Partitioner.new(hash_function: :murmur2))
```
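The hash-mod scheme described above can be sketched in plain Ruby. This is an illustrative approximation, not ruby-kafka's exact implementation; the function name is made up for the example:

```ruby
require "zlib"

# Illustrative sketch of hash-mod partitioning: hash the partition key
# (CRC32 here, the default scheme), then reduce it modulo the number of
# partitions so a given key always lands on the same partition.
def partition_for(partition_key, partition_count)
  Zlib.crc32(partition_key) % partition_count
end

# The same key always yields the same partition, within the valid range.
partition_for("user-42", 8)
```

Swapping in a different hash function (such as murmur2) changes which partition a key maps to, which is why producers in mixed-language deployments should agree on one scheme.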

#### Buffering and Error Handling

The producer is designed for resilience in the face of temporary network errors, Kafka broker failovers, and other issues that prevent the client from writing messages to the destination topics. It does this by employing local, in-memory buffers. Only when messages are acknowledged by a Kafka broker will they be removed from the buffer.
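The buffer-until-acknowledged idea described above can be sketched schematically; the class and method names below are illustrative and are not ruby-kafka's real internals:

```ruby
# Schematic sketch of buffer-until-acknowledged delivery: messages stay in
# a local in-memory buffer and are removed only once the (simulated) broker
# acknowledges them, so unacknowledged messages remain eligible for retry.
class MessageBuffer
  def initialize
    @pending = []
  end

  def write(message)
    @pending << message
  end

  # broker_send returns true for each message the broker acknowledges.
  def deliver(&broker_send)
    acked = @pending.select { |msg| broker_send.call(msg) }
    @pending -= acked # only acknowledged messages leave the buffer
    @pending.size     # messages still buffered for a later retry
  end
end

buffer = MessageBuffer.new
buffer.write("a")
buffer.write("b")
# Pretend the broker acknowledges only "a"; "b" stays buffered for retry.
remaining = buffer.deliver { |msg| msg == "a" }
```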
@@ -1093,6 +1121,20 @@ kafka = Kafka.new(
)
```

##### AWS MSK (IAM)
In order to authenticate using IAM with an AWS MSK cluster, set your access key, secret key, and region when initializing the Kafka client:

```ruby
k = Kafka.new(
["kafka1:9092"],
sasl_aws_msk_iam_access_key_id: 'iam_access_key',
sasl_aws_msk_iam_secret_key_id: 'iam_secret_key',
sasl_aws_msk_iam_aws_region: 'us-west-2',
ssl_ca_certs_from_system: true,
# ...
)
```

##### PLAIN
In order to authenticate using PLAIN, you must set your username and password when initializing the Kafka client:

68 changes: 37 additions & 31 deletions lib/kafka/async_producer.rb
@@ -212,31 +212,45 @@ def run
@logger.push_tags(@producer.to_s)
@logger.info "Starting async producer in the background..."

do_loop
rescue Exception => e
@logger.error "Unexpected Kafka error #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}"
@logger.error "Async producer crashed!"
ensure
@producer.shutdown
@logger.pop_tags
end

private

def do_loop
loop do
operation, payload = @queue.pop

case operation
when :produce
produce(payload[0], **payload[1])
deliver_messages if threshold_reached?
when :deliver_messages
deliver_messages
when :shutdown
begin
# Deliver any pending messages first.
@producer.deliver_messages
rescue Error => e
@logger.error("Failed to deliver messages during shutdown: #{e.message}")

@instrumenter.instrument("drop_messages.async_producer", {
message_count: @producer.buffer_size + @queue.size,
})
begin
operation, payload = @queue.pop

case operation
when :produce
produce(payload[0], **payload[1])
deliver_messages if threshold_reached?
when :deliver_messages
deliver_messages
when :shutdown
begin
# Deliver any pending messages first.
@producer.deliver_messages
rescue Error => e
@logger.error("Failed to deliver messages during shutdown: #{e.message}")

@instrumenter.instrument("drop_messages.async_producer", {
message_count: @producer.buffer_size + @queue.size,
})
end

# Stop the run loop.
break
else
raise "Unknown operation #{operation.inspect}"
end

# Stop the run loop.
break
else
raise "Unknown operation #{operation.inspect}"
end
end
rescue Kafka::Error => e
@@ -245,16 +259,8 @@ def run

sleep 10
retry
rescue Exception => e
@logger.error "Unexpected Kafka error #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}"
@logger.error "Async producer crashed!"
ensure
@producer.shutdown
@logger.pop_tags
end

private

def produce(value, **kwargs)
retries = 0
begin
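The restructured run loop in this diff follows a common queue-driven worker pattern: operations arrive on a queue and are dispatched until a `:shutdown` operation breaks the loop. A standalone sketch of that pattern (names and the log array are illustrative, not the async producer's real internals):

```ruby
require "thread"

# Minimal sketch of a queue-driven worker loop: pop an [operation, payload]
# pair, dispatch on the operation, and break on :shutdown. Unknown
# operations raise, mirroring the defensive `else` branch in the diff.
def run_worker(queue, log = [])
  loop do
    operation, payload = queue.pop
    case operation
    when :produce
      log << "produced #{payload}"
    when :deliver_messages
      log << "delivered"
    when :shutdown
      log << "shutting down"
      break
    else
      raise "Unknown operation #{operation.inspect}"
    end
  end
  log
end

queue = Queue.new
queue << [:produce, "msg-1"]
queue << [:deliver_messages, nil]
queue << [:shutdown, nil]
run_worker(queue) # => ["produced msg-1", "delivered", "shutting down"]
```

Wrapping the body of each iteration in its own `begin`/`rescue`, as the diff does, keeps a single failed operation from tearing down the whole loop.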