diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 27a3eb08..5e91b274 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -44,6 +44,9 @@ jobs: - name: Run Compile for Kafka 1.x.x, Java ${{ matrix.java }} and Scala ${{ matrix.scala }} run: sbt -J-Xmx6144m ";++${{ matrix.scala }} ;clean ;kafka1x/compile" + - name: Run Compile for Kafka 4.x.x, Java ${{ matrix.java }} and Scala ${{ matrix.scala }} + run: sbt -J-Xmx6144m ";++${{ matrix.scala }} ;clean ;kafka4x/compile" + tests: name: scala-${{ matrix.scala }} jdk-${{ matrix.java }} tests runs-on: ubuntu-latest diff --git a/README.md b/README.md index 5026496d..31fed72e 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,20 @@ Work in progress! 8. [Maintainers](#maintainers) 9. [License](#license) +## Getting Started with Kafka 4.0.0 or above + +In SBT: + +```scala +libraryDependencies += "io.monix" %% "monix-kafka-4x" % "" +``` + +Also add a dependency override: + +```scala +dependencyOverrides += "org.apache.kafka" % "kafka-clients" % "4.0.0" +``` + ## Getting Started with Kafka 1.0.x or above In SBT: diff --git a/build.sbt b/build.sbt index ad2dfc67..967ae112 100644 --- a/build.sbt +++ b/build.sbt @@ -151,13 +151,21 @@ lazy val commonDependencies = Seq( "org.scalacheck" %% "scalacheck" % "1.15.2" % "test") ) -ThisBuild / scalaVersion := "2.13.8" -ThisBuild / crossScalaVersions := List("2.12.15", "2.13.8") +ThisBuild / scalaVersion := "2.13.15" +ThisBuild / crossScalaVersions := List("2.12.15", "2.13.15") lazy val monixKafka = project.in(file(".")) .settings(sharedSettings) .settings(doNotPublishArtifact) - .aggregate(kafka1x, kafka11, kafka10) + .aggregate(kafka4x, kafka1x, kafka11, kafka10) + +lazy val kafka4x = project.in(file("kafka-4.0.x")) + .settings(commonDependencies) + .settings(mimaSettings("monix-kafka-4x")) + .settings( + name := "monix-kafka-4x", + libraryDependencies += "org.apache.kafka" % "kafka-clients" % "4.0.0" exclude("org.slf4j","slf4j-log4j12") exclude("log4j", "log4j") + ) lazy val kafka1x = project.in(file("kafka-1.0.x")) .settings(commonDependencies) diff --git a/kafka-4.0.x/src/main/resources/monix/kafka/default.conf b/kafka-4.0.x/src/main/resources/monix/kafka/default.conf new file mode 100644 index 00000000..91aefa9c --- /dev/null +++ b/kafka-4.0.x/src/main/resources/monix/kafka/default.conf @@ -0,0 +1,81 @@ +kafka { + bootstrap.servers = "localhost:9092" + client.id = "" + + # E.g. 
"org.apache.kafka.clients.producer.internals.DefaultPartitioner" + partitioner.class = null + + acks = "1" + buffer.memory = 33554432 + compression.type = "none" + retries = 0 + max.in.flight.requests.per.connection = 5 + + ssl.key.password = null + ssl.keystore.password = null + ssl.keystore.location = null + ssl.truststore.password = null + ssl.truststore.location = null + + batch.size = 16384 + connections.max.idle.ms = 540000 + linger.ms = 0 + max.block.ms = 60000 + max.request.size = 1048576 + + receive.buffer.bytes = 32768 + request.timeout.ms = 40000 + + sasl.kerberos.service.name = null + sasl.mechanism = "GSSAPI" + + security.protocol = "PLAINTEXT" + send.buffer.bytes = 131072 + ssl.enabled.protocols = "TLSv1.2,TLSv1.1,TLSv1" + ssl.keystore.type = "JKS" + ssl.protocol = "TLS" + ssl.provider = null + ssl.truststore.type = "JKS" + + reconnect.backoff.ms = 50 + retry.backoff.ms = 100 + + metadata.max.age.ms = 300000 + + metric.reporters = "" + metrics.num.samples = 2 + metrics.sample.window.ms = 30000 + + # Consumer specific settings + client.rack = "" + fetch.min.bytes = 1 + fetch.max.bytes = 52428800 + group.id = "" + heartbeat.interval.ms = 3000 + max.partition.fetch.bytes = 1048576 + auto.offset.reset = "latest" + # Disabled to use back-pressure or manual commits instead + enable.auto.commit = false + exclude.internal.topics = true + receive.buffer.bytes = 65536 + check.crcs = true + fetch.max.wait.ms = 500 + # Default values for polling + # See https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread + session.timeout.ms = 10000 + max.poll.records = 500 + max.poll.interval.ms = 300000 + + # Monix specific settings + + # Number of requests that KafkaProducerSink + # can push in parallel + monix.producer.sink.parallelism = 100 + # Triggers either seekToEnd or seektoBeginning when the observable starts + # Possible values: end, beginning, no-seek + monix.observable.seek.onStart = "no-seek" + # Possible values: sync, async + monix.observable.commit.type = "sync" + # Possible values: before-ack, after-ack or no-ack + monix.observable.commit.order = "after-ack" +} diff --git a/kafka-4.0.x/src/main/scala/monix/kafka/Commit.scala b/kafka-4.0.x/src/main/scala/monix/kafka/Commit.scala new file mode 100644 index 00000000..36b804b8 --- /dev/null +++ b/kafka-4.0.x/src/main/scala/monix/kafka/Commit.scala @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2014-2022 by The Monix Project Developers. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package monix.kafka + +import monix.eval.Task +import org.apache.kafka.common.TopicPartition + +/** Callback for batched commit realized as closure in [[KafkaConsumerObservable]] context. 
+ */ +trait Commit { + def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] + def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Unit] +} + +private[kafka] object Commit { + + val empty: Commit = new Commit { + override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit + override def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit + } +} diff --git a/kafka-4.0.x/src/main/scala/monix/kafka/CommittableMessage.scala b/kafka-4.0.x/src/main/scala/monix/kafka/CommittableMessage.scala new file mode 100644 index 00000000..253f52d5 --- /dev/null +++ b/kafka-4.0.x/src/main/scala/monix/kafka/CommittableMessage.scala @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2014-2022 by The Monix Project Developers. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package monix.kafka + +import org.apache.kafka.clients.consumer.ConsumerRecord + +/** Represents data consumed from Kafka and [[CommittableOffset]] built from it + */ +final case class CommittableMessage[K, V](record: ConsumerRecord[K, V], committableOffset: CommittableOffset) diff --git a/kafka-4.0.x/src/main/scala/monix/kafka/CommittableOffset.scala b/kafka-4.0.x/src/main/scala/monix/kafka/CommittableOffset.scala new file mode 100644 index 00000000..60974e4f --- /dev/null +++ b/kafka-4.0.x/src/main/scala/monix/kafka/CommittableOffset.scala @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2014-2022 by The Monix Project Developers. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package monix.kafka + +import monix.eval.Task +import org.apache.kafka.common.TopicPartition + +/** Represents offset for specified topic and partition that can be + * committed synchronously by [[commitSync]] method call or asynchronously by one of commitAsync methods. + * To achieve good performance it is recommended to use batched commit with + * [[CommittableOffsetBatch]] class. + * + * @param topicPartition is the topic and partition identifier + * + * @param offset is the offset to be committed + * + * @param commitCallback is the set of callbacks for batched commit realized as closures + * in [[KafkaConsumerObservable]] context. + */ +final class CommittableOffset private[kafka] ( + val topicPartition: TopicPartition, + val offset: Long, + private[kafka] val commitCallback: Commit) { + + /** Synchronously commits [[offset]] for the [[topicPartition]] to Kafka. It is recommended + * to use batched commit with [[CommittableOffsetBatch]] class. 
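+  *
+  * For example, instead of committing each offset individually, the offsets
+  * collected downstream (here a placeholder `offsets: Seq[CommittableOffset]`)
+  * can be committed in a single call:
+  * {{{
+  *   CommittableOffsetBatch(offsets).commitSync()
+  * }}}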
+ */ + def commitSync(): Task[Unit] = commitCallback.commitBatchSync(Map(topicPartition -> offset)) + + /** Asynchronously commits [[offset]] to Kafka. It is recommended + * to use batched commit with [[CommittableOffsetBatch]] class. + */ + def commitAsync(): Task[Unit] = commitCallback.commitBatchAsync(Map(topicPartition -> offset)) +} + +object CommittableOffset { + + private[kafka] def apply(topicPartition: TopicPartition, offset: Long, commitCallback: Commit): CommittableOffset = + new CommittableOffset(topicPartition, offset, commitCallback) +} diff --git a/kafka-4.0.x/src/main/scala/monix/kafka/CommittableOffsetBatch.scala b/kafka-4.0.x/src/main/scala/monix/kafka/CommittableOffsetBatch.scala new file mode 100644 index 00000000..750a2149 --- /dev/null +++ b/kafka-4.0.x/src/main/scala/monix/kafka/CommittableOffsetBatch.scala @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2014-2022 by The Monix Project Developers. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package monix.kafka + +import monix.eval.Task +import org.apache.kafka.common.TopicPartition + +/** Batch of Kafka offsets which can be committed together. + * Can be built from offsets sequence by [[CommittableOffsetBatch#apply]] method. + * You can also use [[CommittableOffsetBatch#empty]] method to create empty batch and + * add offsets to it using [[updated]] method. + * + * WARNING: Order of the offsets is important. Only the last added offset + * for topic and partition will be committed to Kafka. + * + * @param offsets is the offsets batch for a provided topics and partitions. + * Make sure that each of them was received from one [[KafkaConsumerObservable]]. + * + * @param commitCallback is the set of callbacks for batched commit realized as closure + * in [[KafkaConsumerObservable]] context. This parameter is obtained from the last [[CommittableOffset]] + * added to batch. + */ +final class CommittableOffsetBatch private[kafka] (val offsets: Map[TopicPartition, Long], commitCallback: Commit) { + + /** Synchronously commits [[offsets]] to Kafka + */ + def commitSync(): Task[Unit] = commitCallback.commitBatchSync(offsets) + + /** Asynchronously commits [[offsets]] to Kafka + */ + def commitAsync(): Task[Unit] = commitCallback.commitBatchAsync(offsets) + + /** Adds new [[CommittableOffset]] to batch. Added offset replaces previous one specified + * for same topic and partition. + */ + def updated(committableOffset: CommittableOffset): CommittableOffsetBatch = + new CommittableOffsetBatch( + offsets.updated(committableOffset.topicPartition, committableOffset.offset), + committableOffset.commitCallback + ) +} + +object CommittableOffsetBatch { + + /** Creates empty [[CommittableOffsetBatch]]. Can be used as neutral element in fold: + * {{{ + * offsets.foldLeft(CommittableOffsetBatch.empty)(_ updated _) + * }}} + */ + val empty: CommittableOffsetBatch = new CommittableOffsetBatch(Map.empty, Commit.empty) + + /** Builds [[CommittableOffsetBatch]] from offsets sequence. Be careful with + * sequence order. 
If there is more than once offset for a topic and partition in the + * sequence then the last one will remain. + */ + def apply(offsets: Seq[CommittableOffset]): CommittableOffsetBatch = + if (offsets.nonEmpty) { + val aggregatedOffsets = offsets.foldLeft(Map.empty[TopicPartition, Long]) { (acc, o) => + acc.updated(o.topicPartition, o.offset) + } + new CommittableOffsetBatch(aggregatedOffsets, offsets.head.commitCallback) + } else { + empty + } + + /** Builds [[CommittableOffsetBatch]] list from offsets sequence by merging the offsets + * that have the same commit callback. This will help when the committable offsets are + * from different consumers. + * {{{ + * CommittableOffsetBatch.mergeByCommitCallback(offsets) + * }}} + */ + def mergeByCommitCallback(committableOffsets: Seq[CommittableOffset]): List[CommittableOffsetBatch] = { + if (committableOffsets.nonEmpty) { + committableOffsets + .groupBy(_.commitCallback) + .values + .map(CommittableOffsetBatch(_)) + .toList + } else { + List.empty + } + } +} diff --git a/kafka-4.0.x/src/main/scala/monix/kafka/Deserializer.scala b/kafka-4.0.x/src/main/scala/monix/kafka/Deserializer.scala new file mode 100644 index 00000000..e5aa8148 --- /dev/null +++ b/kafka-4.0.x/src/main/scala/monix/kafka/Deserializer.scala @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2014-2022 by The Monix Project Developers. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package monix.kafka + +import java.nio.ByteBuffer +import org.apache.kafka.common.serialization._ +import org.apache.kafka.common.serialization.{Deserializer => KafkaDeserializer} +import org.apache.kafka.common.utils.Bytes + +/** Wraps a Kafka `Deserializer`, provided for + * convenience, since it can be implicitly fetched + * from the context. + * + * @param className is the full package path to the Kafka `Deserializer` + * + * @param classType is the `java.lang.Class` for [[className]] + * + * @param constructor creates an instance of [[classType]]. + * This is defaulted with a `Deserializer.Constructor[A]` function that creates a + * new instance using an assumed empty constructor. + * Supplying this parameter allows for manual provision of the `Deserializer`. + */ +final case class Deserializer[A]( + className: String, + classType: Class[_ <: KafkaDeserializer[A]], + constructor: Deserializer.Constructor[A] = (d: Deserializer[A]) => d.classType.getDeclaredConstructor().newInstance()) { + + /** Creates a new instance. */ + def create(): KafkaDeserializer[A] = + constructor(this) +} + +object Deserializer { + + implicit def fromKafkaDeserializer[A](implicit des: KafkaDeserializer[A]): Deserializer[A] = + Deserializer[A]( + className = des.getClass.getName, + classType = des.getClass, + constructor = _ => des + ) + + /** Alias for the function that provides an instance of + * the Kafka `Deserializer`. 
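+  *
+  * A sketch of supplying a custom constructor, for cases where the
+  * deserializer cannot be created via an empty constructor (the values below
+  * are illustrative):
+  * {{{
+  *   val custom: Deserializer[String] = Deserializer[String](
+  *     className = "org.apache.kafka.common.serialization.StringDeserializer",
+  *     classType = classOf[StringDeserializer],
+  *     constructor = _ => new StringDeserializer()
+  *   )
+  * }}}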
+ */ + type Constructor[A] = (Deserializer[A]) => KafkaDeserializer[A] + + implicit val forStrings: Deserializer[String] = + Deserializer[String]( + className = "org.apache.kafka.common.serialization.StringDeserializer", + classType = classOf[StringDeserializer] + ) + + implicit val forByteArray: Deserializer[Array[Byte]] = + Deserializer[Array[Byte]]( + className = "org.apache.kafka.common.serialization.ByteArrayDeserializer", + classType = classOf[ByteArrayDeserializer] + ) + + implicit val forByteBuffer: Deserializer[ByteBuffer] = + Deserializer[ByteBuffer]( + className = "org.apache.kafka.common.serialization.ByteBufferDeserializer", + classType = classOf[ByteBufferDeserializer] + ) + + implicit val forBytes: Deserializer[Bytes] = + Deserializer[Bytes]( + className = "org.apache.kafka.common.serialization.BytesDeserializer", + classType = classOf[BytesDeserializer] + ) + + implicit val forJavaDouble: Deserializer[java.lang.Double] = + Deserializer[java.lang.Double]( + className = "org.apache.kafka.common.serialization.DoubleDeserializer", + classType = classOf[DoubleDeserializer] + ) + + implicit val forJavaInteger: Deserializer[java.lang.Integer] = + Deserializer[java.lang.Integer]( + className = "org.apache.kafka.common.serialization.IntegerDeserializer", + classType = classOf[IntegerDeserializer] + ) + + implicit val forJavaLong: Deserializer[java.lang.Long] = + Deserializer[java.lang.Long]( + className = "org.apache.kafka.common.serialization.LongDeserializer", + classType = classOf[LongDeserializer] + ) +} diff --git a/kafka-4.0.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala b/kafka-4.0.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala new file mode 100644 index 00000000..c9fe3801 --- /dev/null +++ b/kafka-4.0.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala @@ -0,0 +1,458 @@ +/* + * Copyright (c) 2014-2022 by The Monix Project Developers. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package monix.kafka + +import java.io.File +import java.util.Properties +import com.typesafe.config.{Config, ConfigFactory} +import monix.execution.internal.InternalApi +import monix.kafka.config._ + +import scala.jdk.CollectionConverters._ +import scala.concurrent.duration._ + +/** Configuration for Kafka Consumer. + * + * For the official documentation on the available configuration + * options, see + * [[https://kafka.apache.org/documentation.html#consumerconfigs Consumer Configs]] + * on `kafka.apache.org`. + * + * @param bootstrapServers is the `bootstrap.servers` setting, + * a list of host/port pairs to use for establishing + * the initial connection to the Kafka cluster. + * + * @param fetchMinBytes is the `fetch.min.bytes` setting, + * the minimum amount of data the server should return + * for a fetch request. + * + * @param fetchMaxBytes is the `fetch.max.bytes` setting, + * the maximum amount of data the server should return + * for a fetch request. 
+ * + * @param groupId is the `group.id` setting, a unique string + * that identifies the consumer group this consumer + * belongs to. + * + * @param heartbeatInterval is the `heartbeat.interval.ms` setting, + * the expected time between heartbeats to the consumer coordinator + * when using Kafka's group management facilities. + * + * @param maxPartitionFetchBytes is the `max.partition.fetch.bytes` + * setting, the maximum amount of data per-partition the + * server will return. + * + * @param sessionTimeout is the `session.timeout.ms` setting, + * the timeout used to detect failures when using Kafka's + * group management facilities. + * + * @param sslKeyPassword is the `ssl.key.password` setting and represents + * the password of the private key in the key store file. + * This is optional for client. + * + * @param sslKeyStorePassword is the `ssl.keystore.password` setting, + * being the password of the private key in the key store file. + * This is optional for client. + * + * @param sslKeyStoreLocation is the `ssl.keystore.location` setting and + * represents the location of the key store file. This is optional + * for client and can be used for two-way authentication for client. + * + * @param sslTrustStoreLocation is the `ssl.truststore.location` setting + * and is the location of the trust store file. + * + * @param sslTrustStorePassword is the `ssl.truststore.password` setting + * and is the password for the trust store file. + * + * @param autoOffsetReset is the `auto.offset.reset` setting, + * specifying what to do when there is no initial offset in + * Kafka or if the current offset does not exist any more + * on the server (e.g. because that data has been deleted). + * + * @param connectionsMaxIdleTime is the `connections.max.idle.ms` setting + * and specifies how much time to wait before closing idle connections. + * + * @param enableAutoCommit is the `enable.auto.commit` setting. + * If true the consumer's offset will be periodically committed + * in the background. + * + * @param excludeInternalTopics is the `exclude.internal.topics` setting. + * Whether records from internal topics (such as offsets) should be + * exposed to the consumer. If set to true the only way to receive + * records from an internal topic is subscribing to it. + * + * @param maxPollRecords is the `max.poll.records` setting, the + * maximum number of records returned in a single call to poll(). + * + * @param receiveBufferInBytes is the `receive.buffer.bytes` setting, + * the size of the TCP receive buffer (SO_RCVBUF) to use + * when reading data. + * + * @param requestTimeout is the `request.timeout.ms` setting, + * The configuration controls the maximum amount of time + * the client will wait for the response of a request. + * If the response is not received before the timeout elapses + * the client will resend the request if necessary or fail the + * request if retries are exhausted. + * + * @param saslKerberosServiceName is the `sasl.kerberos.service.name` setting, + * being the Kerberos principal name that Kafka runs as. + * + * @param saslMechanism is the `sasl.mechanism` setting, being the SASL + * mechanism used for client connections. This may be any mechanism + * for which a security provider is available. + * + * @param securityProtocol is the `security.protocol` setting, + * being the protocol used to communicate with brokers. + * + * @param sendBufferInBytes is the `send.buffer.bytes` setting, + * being the size of the TCP send buffer (SO_SNDBUF) to use + * when sending data. 
+ * + * @param sslEnabledProtocols is the `ssl.enabled.protocols` setting, + * being the list of protocols enabled for SSL connections. + * + * @param sslKeystoreType is the `ssl.keystore.type` setting, + * being the file format of the key store file. + * + * @param sslProtocol is the `ssl.protocol` setting, + * being the SSL protocol used to generate the SSLContext. + * Default setting is TLS, which is fine for most cases. + * Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, + * SSLv2 and SSLv3 may be supported in older JVMs, but their usage + * is discouraged due to known security vulnerabilities. + * + * @param sslProvider is the `ssl.provider` setting, + * being the name of the security provider used for SSL connections. + * Default value is the default security provider of the JVM. + * + * @param sslTruststoreType is the `ssl.truststore.type` setting, being + * the file format of the trust store file. + * + * @param checkCRCs is the `check.crcs` setting, specifying to + * automatically check the CRC32 of the records consumed. + * This ensures no on-the-wire or on-disk corruption to the + * messages occurred. This check adds some overhead, so it may + * be disabled in cases seeking extreme performance. + * + * @param clientId is the `client.id` setting, + * an id string to pass to the server when making requests. + * The purpose of this is to be able to track the source of + * requests beyond just ip/port by allowing a logical application + * name to be included in server-side request logging. + * + * @param clientRack is the `client.rack` setting. + * A rack identifier for this client. + * This can be any string value which indicates where this client is physically located. + * It corresponds with the broker config 'broker.rack' + * + * @param fetchMaxWaitTime is the `fetch.max.wait.ms` setting, + * the maximum amount of time the server will block before + * answering the fetch request if there isn't sufficient data to + * immediately satisfy the requirement given by fetch.min.bytes. + * + * @param metadataMaxAge is the `metadata.max.age.ms` setting. + * The period of time in milliseconds after which we force a + * refresh of metadata even if we haven't seen any partition + * leadership changes to proactively discover any new brokers + * or partitions. + * + * @param metricReporters is the `metric.reporters` setting. + * A list of classes to use as metrics reporters. Implementing the + * `MetricReporter` interface allows plugging in classes that will + * be notified of new metric creation. The JmxReporter is always + * included to register JMX statistics + * + * @param metricsNumSamples is the `metrics.num.samples` setting. + * The number of samples maintained to compute metrics. + * + * @param metricsSampleWindow is the `metrics.sample.window.ms` setting. + * The metrics system maintains a configurable number of samples over + * a fixed window size. This configuration controls the size of the + * window. For example we might maintain two samples each measured + * over a 30 second period. When a window expires we erase and + * overwrite the oldest window. + * + * @param reconnectBackoffTime is the `reconnect.backoff.ms` setting. + * The amount of time to wait before attempting to reconnect to a + * given host. This avoids repeatedly connecting to a host in a + * tight loop. This backoff applies to all requests sent by the + * consumer to the broker. + * + * @param retryBackoffTime is the `retry.backoff.ms` setting. 
+ * The amount of time to wait before attempting to retry a failed + * request to a given topic partition. This avoids repeatedly + * sending requests in a tight loop under some failure scenarios. + * + * @param observableCommitType is the `monix.observable.commit.type` setting. + * Represents the type of commit to make when the [[enableAutoCommit]] + * setting is set to `false`, in which case the observable has to + * commit on every batch. + * + * @param observableCommitOrder is the `monix.observable.commit.order` setting. + * Specifies when the commit should happen, like before we receive the + * acknowledgement from downstream, or afterwards. + * + * @param properties map of other properties that will be passed to + * the underlying kafka client. Any properties not explicitly handled + * by this object can be set via the map, but in case of a duplicate + * a value set on the case class will overwrite value set via properties. + */ +case class KafkaConsumerConfig( + bootstrapServers: List[String], + fetchMinBytes: Int, + fetchMaxBytes: Int, + groupId: String, + heartbeatInterval: FiniteDuration, + maxPartitionFetchBytes: Int, + sessionTimeout: FiniteDuration, + sslKeyPassword: Option[String], + sslKeyStorePassword: Option[String], + sslKeyStoreLocation: Option[String], + sslTrustStoreLocation: Option[String], + sslTrustStorePassword: Option[String], + autoOffsetReset: AutoOffsetReset, + connectionsMaxIdleTime: FiniteDuration, + enableAutoCommit: Boolean, + excludeInternalTopics: Boolean, + maxPollRecords: Int, + maxPollInterval: FiniteDuration, + receiveBufferInBytes: Int, + requestTimeout: FiniteDuration, + saslKerberosServiceName: Option[String], + saslMechanism: String, + securityProtocol: SecurityProtocol, + sendBufferInBytes: Int, + sslEnabledProtocols: List[SSLProtocol], + sslKeystoreType: String, + sslProtocol: SSLProtocol, + sslProvider: Option[String], + sslTruststoreType: String, + checkCRCs: Boolean, + clientId: String, + clientRack: String, + fetchMaxWaitTime: FiniteDuration, + metadataMaxAge: FiniteDuration, + metricReporters: List[String], + metricsNumSamples: Int, + metricsSampleWindow: FiniteDuration, + reconnectBackoffTime: FiniteDuration, + retryBackoffTime: FiniteDuration, + observableCommitType: ObservableCommitType, + observableCommitOrder: ObservableCommitOrder, + observableSeekOnStart: ObservableSeekOnStart, + properties: Map[String, String]) { + + def toMap: Map[String, String] = properties ++ Map( + "bootstrap.servers" -> bootstrapServers.mkString(","), + "fetch.min.bytes" -> fetchMinBytes.toString, + "fetch.max.bytes" -> fetchMaxBytes.toString, + "group.id" -> groupId, + "heartbeat.interval.ms" -> heartbeatInterval.toMillis.toString, + "max.partition.fetch.bytes" -> maxPartitionFetchBytes.toString, + "session.timeout.ms" -> sessionTimeout.toMillis.toString, + "ssl.key.password" -> sslKeyPassword.orNull, + "ssl.keystore.password" -> sslKeyStorePassword.orNull, + "ssl.keystore.location" -> sslKeyStoreLocation.orNull, + "ssl.truststore.password" -> sslTrustStorePassword.orNull, + "ssl.truststore.location" -> sslTrustStoreLocation.orNull, + "auto.offset.reset" -> autoOffsetReset.id, + "connections.max.idle.ms" -> connectionsMaxIdleTime.toMillis.toString, + "enable.auto.commit" -> enableAutoCommit.toString, + "exclude.internal.topics" -> excludeInternalTopics.toString, + "max.poll.records" -> maxPollRecords.toString, + "max.poll.interval.ms" -> maxPollInterval.toMillis.toString, + "receive.buffer.bytes" -> receiveBufferInBytes.toString, + "request.timeout.ms" -> 
requestTimeout.toMillis.toString, + "sasl.kerberos.service.name" -> saslKerberosServiceName.orNull, + "sasl.mechanism" -> saslMechanism, + "security.protocol" -> securityProtocol.id, + "send.buffer.bytes" -> sendBufferInBytes.toString, + "ssl.enabled.protocols" -> sslEnabledProtocols.map(_.id).mkString(","), + "ssl.keystore.type" -> sslKeystoreType, + "ssl.protocol" -> sslProtocol.id, + "ssl.provider" -> sslProvider.orNull, + "ssl.truststore.type" -> sslTruststoreType, + "check.crcs" -> checkCRCs.toString, + "client.id" -> clientId, + "client.rack" -> clientRack, + "fetch.max.wait.ms" -> fetchMaxWaitTime.toMillis.toString, + "metadata.max.age.ms" -> metadataMaxAge.toMillis.toString, + "metric.reporters" -> metricReporters.mkString(","), + "metrics.num.samples" -> metricsNumSamples.toString, + "metrics.sample.window.ms" -> metricsSampleWindow.toMillis.toString, + "reconnect.backoff.ms" -> reconnectBackoffTime.toMillis.toString, + "retry.backoff.ms" -> retryBackoffTime.toMillis.toString + ) + + private[kafka] var pollHeartbeatRate: FiniteDuration = 15.millis + + @InternalApi + private[kafka] def withPollHeartBeatRate(interval: FiniteDuration): KafkaConsumerConfig = { + pollHeartbeatRate = interval + this + } + + def toJavaMap: java.util.Map[String, Object] = + toMap.filter(_._2 != null).map { case (a, b) => (a, b.asInstanceOf[AnyRef]) }.asJava + + def toProperties: Properties = { + val props = new Properties() + for ((k, v) <- toMap; if v != null) props.put(k, v) + props + } +} + +object KafkaConsumerConfig { + private val defaultRootPath = "kafka" + + lazy private val defaultConf: Config = + ConfigFactory.load("monix/kafka/default.conf").getConfig(defaultRootPath) + + /** Returns the default configuration, specified the `monix-kafka` project + * in `monix/kafka/default.conf`. + */ + lazy val default: KafkaConsumerConfig = + apply(defaultConf, includeDefaults = false) + + /** Loads the [[KafkaConsumerConfig]] either from a file path or + * from a resource, if `config.file` or `config.resource` are + * defined, or otherwise returns the default config. + * + * If you want to specify a `config.file`, you can configure the + * Java process on execution like so: + * {{{ + * java -Dconfig.file=/path/to/application.conf + * }}} + * + * Or if you want to specify a `config.resource` to be loaded + * from the executable's distributed JAR or classpath: + * {{{ + * java -Dconfig.resource=com/company/mySpecial.conf + * }}} + * + * In case neither of these are specified, then the configuration + * loaded is the default one, from the `monix-kafka` project, specified + * in `monix/kafka/default.conf`. + */ + def load(): KafkaConsumerConfig = + Option(System.getProperty("config.file")).map(f => new File(f)) match { + case Some(file) if file.exists() => + loadFile(file) + case _ => + Option(System.getProperty("config.resource")) match { + case Some(resource) => + loadResource(resource) + case None => + default + } + } + + /** Loads a [[KafkaConsumerConfig]] from a project resource. + * + * @param resourceBaseName is the resource from where to load the config + * @param rootPath is the config root path (e.g. 
`kafka`) + * @param includeDefaults should be `true` in case you want to fallback + * to the default values provided by the `monix-kafka` library + * in `monix/kafka/default.conf` + */ + def loadResource( + resourceBaseName: String, + rootPath: String = defaultRootPath, + includeDefaults: Boolean = true): KafkaConsumerConfig = + apply(ConfigFactory.load(resourceBaseName).getConfig(rootPath), includeDefaults) + + /** Loads a [[KafkaConsumerConfig]] from a specified file. + * + * @param file is the configuration path from where to load the config + * @param rootPath is the config root path (e.g. `kafka`) + * @param includeDefaults should be `true` in case you want to fallback + * to the default values provided by the `monix-kafka` library + * in `monix/kafka/default.conf` + */ + def loadFile(file: File, rootPath: String = defaultRootPath, includeDefaults: Boolean = true): KafkaConsumerConfig = + apply(ConfigFactory.parseFile(file).resolve().getConfig(rootPath), includeDefaults) + + /** Loads the [[KafkaConsumerConfig]] from a parsed + * `com.typesafe.config.Config` reference. + * + * NOTE that this method doesn't assume any path prefix for loading the + * configuration settings, so it does NOT assume a root path like `kafka`. + * In case case you need that, you can always do: + * + * {{{ + * KafkaConsumerConfig(globalConfig.getConfig("kafka")) + * }}} + * + * @param source is the typesafe `Config` object to read from + * @param includeDefaults should be `true` in case you want to fallback + * to the default values provided by the `monix-kafka` library + * in `monix/kafka/default.conf` + */ + def apply(source: Config, includeDefaults: Boolean = true): KafkaConsumerConfig = { + val config = if (!includeDefaults) source else source.withFallback(defaultConf) + def getOptString(path: String): Option[String] = + if (config.hasPath(path)) Option(config.getString(path)) + else None + + KafkaConsumerConfig( + bootstrapServers = config.getString("bootstrap.servers").trim.split("\\s*,\\s*").toList, + fetchMinBytes = config.getInt("fetch.min.bytes"), + fetchMaxBytes = config.getInt("fetch.max.bytes"), + groupId = config.getString("group.id"), + heartbeatInterval = config.getInt("heartbeat.interval.ms").millis, + maxPartitionFetchBytes = config.getInt("max.partition.fetch.bytes"), + sessionTimeout = config.getInt("session.timeout.ms").millis, + sslKeyPassword = getOptString("ssl.key.password"), + sslKeyStorePassword = getOptString("ssl.keystore.password"), + sslKeyStoreLocation = getOptString("ssl.keystore.location"), + sslTrustStorePassword = getOptString("ssl.truststore.password"), + sslTrustStoreLocation = getOptString("ssl.truststore.location"), + autoOffsetReset = AutoOffsetReset(config.getString("auto.offset.reset")), + connectionsMaxIdleTime = config.getInt("connections.max.idle.ms").millis, + enableAutoCommit = config.getBoolean("enable.auto.commit"), + excludeInternalTopics = config.getBoolean("exclude.internal.topics"), + maxPollRecords = config.getInt("max.poll.records"), + maxPollInterval = config.getInt("max.poll.interval.ms").millis, + receiveBufferInBytes = config.getInt("receive.buffer.bytes"), + requestTimeout = config.getInt("request.timeout.ms").millis, + saslKerberosServiceName = getOptString("sasl.kerberos.service.name"), + saslMechanism = config.getString("sasl.mechanism"), + securityProtocol = SecurityProtocol(config.getString("security.protocol")), + sendBufferInBytes = config.getInt("send.buffer.bytes"), + sslEnabledProtocols = 
config.getString("ssl.enabled.protocols").split("\\s*,\\s*").map(SSLProtocol.apply).toList, + sslKeystoreType = config.getString("ssl.keystore.type"), + sslProtocol = SSLProtocol(config.getString("ssl.protocol")), + sslProvider = getOptString("ssl.provider"), + sslTruststoreType = config.getString("ssl.truststore.type"), + checkCRCs = config.getBoolean("check.crcs"), + clientId = config.getString("client.id"), + clientRack = config.getString("client.rack"), + fetchMaxWaitTime = config.getInt("fetch.max.wait.ms").millis, + metadataMaxAge = config.getInt("metadata.max.age.ms").millis, + metricReporters = config.getString("metric.reporters").trim.split("\\s*,\\s*").toList, + metricsNumSamples = config.getInt("metrics.num.samples"), + metricsSampleWindow = config.getInt("metrics.sample.window.ms").millis, + reconnectBackoffTime = config.getInt("reconnect.backoff.ms").millis, + retryBackoffTime = config.getInt("retry.backoff.ms").millis, + observableCommitType = ObservableCommitType(config.getString("monix.observable.commit.type")), + observableCommitOrder = ObservableCommitOrder(config.getString("monix.observable.commit.order")), + observableSeekOnStart = ObservableSeekOnStart(config.getString("monix.observable.seek.onStart")), + properties = Map.empty + ) + } +} diff --git a/kafka-4.0.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala b/kafka-4.0.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala new file mode 100644 index 00000000..f2392860 --- /dev/null +++ b/kafka-4.0.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala @@ -0,0 +1,290 @@ +/* + * Copyright (c) 2014-2022 by The Monix Project Developers. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package monix.kafka + +import monix.eval.Task +import monix.execution.Ack.{Continue, Stop} +import monix.execution.{Ack, Callback, Cancelable, Scheduler} +import monix.kafka.config.ObservableCommitOrder +import monix.reactive.Observable +import monix.reactive.observers.Subscriber +import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, KafkaConsumer} + +import scala.jdk.CollectionConverters._ +import scala.concurrent.blocking +import scala.util.matching.Regex +import scala.concurrent.duration._ + +/** Exposes an `Observable` that consumes a Kafka stream by + * means of a Kafka Consumer client. + * + * In order to get initialized, it needs a configuration. See the + * [[KafkaConsumerConfig]] needed and see `monix/kafka/default.conf`, + * (in the resource files) that is exposing all default values. 
+ */ +trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] { + + protected def config: KafkaConsumerConfig + protected def consumerT: Task[Consumer[K, V]] + + @volatile + protected var isAcked = true + + /** Creates a task that polls the source, then feeds the downstream + * subscriber, returning the resulting acknowledgement + */ + protected def ackTask(consumer: Consumer[K, V], out: Subscriber[Out]): Task[Ack] + + override final def unsafeSubscribeFn(out: Subscriber[Out]): Cancelable = { + import out.scheduler + + val callback = new Callback[Throwable, Unit] { + def onSuccess(value: Unit): Unit = + out.onComplete() + + def onError(ex: Throwable): Unit = + out.onError(ex) + } + + feedTask(out).runAsync(callback) + } + + private def feedTask(out: Subscriber[Out]): Task[Unit] = { + Task.create { (scheduler, cb) => + implicit val s = scheduler + val startConsuming = + consumerT.bracket { c => + // Skipping all available messages on all partitions + if (config.observableSeekOnStart.isSeekEnd) c.seekToEnd(Nil.asJavaCollection) + else if (config.observableSeekOnStart.isSeekBeginning) c.seekToBeginning(Nil.asJavaCollection) + Task.race(runLoop(c, out), pollHeartbeat(c).loopForever).void + } { consumer => + // Forced asynchronous boundary + Task.evalAsync(consumer.synchronized(blocking(consumer.close()))) + } + startConsuming.runAsync(cb) + } + } + + /** Returns a task that continuously polls the `KafkaConsumer` for + * new messages and feeds the given subscriber. + * + * Creates an asynchronous boundary on every poll. + */ + private def runLoop(consumer: Consumer[K, V], out: Subscriber[Out]): Task[Unit] = { + ackTask(consumer, out).flatMap { + case Stop => Task.unit + case Continue => runLoop(consumer, out) + } + } + + /** Returns task that constantly polls the `KafkaConsumer` in case subscriber + * is still processing last fed batch. + * This allows producer process commit calls and also keeps consumer alive even + * with long batch processing. + * + * If polling fails the error is reported to the subscriber through the scheduler. + * + * @see [[https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread]] + */ + private def pollHeartbeat(consumer: Consumer[K, V])(implicit scheduler: Scheduler): Task[Unit] = { + Task.sleep(config.pollHeartbeatRate) >> + Task.eval { + if (!isAcked) { + consumer.synchronized { + // needed in order to ensure that the consumer assignment + // is paused, meaning that no messages will get lost. + val assignment = consumer.assignment() + consumer.pause(assignment) + val records = blocking(consumer.poll(java.time.Duration.ZERO)) + if (!records.isEmpty) { + val errorMsg = s"Received ${records.count()} unexpected messages." + throw new IllegalStateException(errorMsg) + } + } + } + }.onErrorHandleWith { ex => + Task.now(scheduler.reportFailure(ex)) >> + Task.sleep(1.seconds) + } + } + +} + +object KafkaConsumerObservable { + + /** Builds a [[KafkaConsumerObservable]] instance. + * + * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the + * consumer; also make sure to see `monix/kafka/default.conf` for + * the default values being used. 
+ * @param consumer is a factory for the + * `org.apache.kafka.clients.consumer.KafkaConsumer` + * instance to use for consuming from Kafka + */ + def apply[K, V]( + cfg: KafkaConsumerConfig, + consumer: Task[Consumer[K, V]]): KafkaConsumerObservable[K, V, ConsumerRecord[K, V]] = + new KafkaConsumerObservableAutoCommit[K, V](cfg, consumer) + + /** Builds a [[KafkaConsumerObservable]] instance. + * + * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the + * consumer; also make sure to see `monix/kafka/default.conf` for + * the default values being used. + * @param topics is the list of Kafka topics to subscribe to. + */ + def apply[K, V](cfg: KafkaConsumerConfig, topics: List[String])(implicit + K: Deserializer[K], + V: Deserializer[V]): KafkaConsumerObservable[K, V, ConsumerRecord[K, V]] = { + + val consumer = createConsumer[K, V](cfg, topics) + apply(cfg, consumer) + } + + /** Builds a [[KafkaConsumerObservable]] instance. + * + * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the + * consumer; also make sure to see `monix/kafka/default.conf` for + * the default values being used. + * @param topicsRegex is the pattern of Kafka topics to subscribe to. + */ + def apply[K, V](cfg: KafkaConsumerConfig, topicsRegex: Regex)(implicit + K: Deserializer[K], + V: Deserializer[V]): KafkaConsumerObservable[K, V, ConsumerRecord[K, V]] = { + + val consumer = createConsumer[K, V](cfg, topicsRegex) + apply(cfg, consumer) + } + + /** Builds a [[KafkaConsumerObservable]] instance with ability to manual commit offsets + * and forcibly disables auto commits in configuration. + * Such instances emit [[CommittableMessage]] instead of Kafka's ConsumerRecord. + * + * Usage example: + * {{{ + * KafkaConsumerObservable.manualCommit[String,String](consumerCfg, List(topicName)) + * .map(message => message.record.value() -> message.committableOffset) + * .mapEval { case (value, offset) => performBusinessLogic(value).map(_ => offset) } + * .bufferTimedAndCounted(1.second, 1000) + * .mapEval(offsets => CommittableOffsetBatch(offsets).commitSync()) + * .subscribe() + * }}} + * + * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the + * consumer; also make sure to see `monix/kafka/default.conf` for + * the default values being used. Auto commit will disabled and + * observable commit order will turned to [[monix.kafka.config.ObservableCommitOrder.NoAck NoAck]] forcibly! + * @param consumer is a factory for the + * `org.apache.kafka.clients.consumer.KafkaConsumer` + * instance to use for consuming from Kafka + */ + def manualCommit[K, V]( + cfg: KafkaConsumerConfig, + consumer: Task[Consumer[K, V]]): KafkaConsumerObservable[K, V, CommittableMessage[K, V]] = { + + val manualCommitConfig = cfg + .copy(observableCommitOrder = ObservableCommitOrder.NoAck, enableAutoCommit = false) + .withPollHeartBeatRate(cfg.pollHeartbeatRate) + new KafkaConsumerObservableManualCommit[K, V](manualCommitConfig, consumer) + } + + /** Builds a [[KafkaConsumerObservable]] instance with ability to manual commit offsets + * and forcibly disables auto commits in configuration. + * Such instances emit [[CommittableMessage]] instead of Kafka's ConsumerRecord. 
+ * + * Usage example: + * {{{ + * KafkaConsumerObservable.manualCommit[String,String](consumerCfg, List(topicName)) + * .map(message => message.record.value() -> message.committableOffset) + * .mapEval { case (value, offset) => performBusinessLogic(value).map(_ => offset) } + * .bufferTimedAndCounted(1.second, 1000) + * .mapEval(offsets => CommittableOffsetBatch(offsets).commitSync()) + * .subscribe() + * }}} + * + * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the + * consumer; also make sure to see `monix/kafka/default.conf` for + * the default values being used. Auto commit will disabled and + * observable commit order will turned to [[monix.kafka.config.ObservableCommitOrder.NoAck NoAck]] forcibly! + * @param topics is the list of Kafka topics to subscribe to. + */ + def manualCommit[K, V](cfg: KafkaConsumerConfig, topics: List[String])(implicit + K: Deserializer[K], + V: Deserializer[V]): KafkaConsumerObservable[K, V, CommittableMessage[K, V]] = { + + val consumer = createConsumer[K, V](cfg, topics) + manualCommit(cfg, consumer) + } + + /** Builds a [[KafkaConsumerObservable]] instance with ability to manual commit offsets + * and forcibly disables auto commits in configuration. + * Such instances emit [[CommittableMessage]] instead of Kafka's ConsumerRecord. + * + * Usage example: + * {{{ + * KafkaConsumerObservable.manualCommit[String,String](consumerCfg, List(topicName)) + * .map(message => message.record.value() -> message.committableOffset) + * .mapEval { case (value, offset) => performBusinessLogic(value).map(_ => offset) } + * .bufferTimedAndCounted(1.second, 1000) + * .mapEval(offsets => CommittableOffsetBatch(offsets).commitSync()) + * .subscribe() + * }}} + * + * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the + * consumer; also make sure to see `monix/kafka/default.conf` for + * the default values being used. Auto commit will disabled and + * observable commit order will turned to [[monix.kafka.config.ObservableCommitOrder.NoAck NoAck]] forcibly! + * @param topicsRegex is the pattern of Kafka topics to subscribe to. + */ + def manualCommit[K, V](cfg: KafkaConsumerConfig, topicsRegex: Regex)(implicit + K: Deserializer[K], + V: Deserializer[V]): KafkaConsumerObservable[K, V, CommittableMessage[K, V]] = { + + val consumer = createConsumer[K, V](cfg, topicsRegex) + manualCommit(cfg, consumer) + } + + /** Returns a `Task` for creating a consumer instance given list of topics. */ + def createConsumer[K, V](config: KafkaConsumerConfig, topics: List[String])(implicit + K: Deserializer[K], + V: Deserializer[V]): Task[Consumer[K, V]] = { + + Task.evalAsync { + val configMap = config.toJavaMap + blocking { + val consumer = new KafkaConsumer[K, V](configMap, K.create(), V.create()) + consumer.subscribe(topics.asJava) + consumer + } + } + } + + /** Returns a `Task` for creating a consumer instance given topics regex. 
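+    *
+    * For example (the pattern and `config` below are placeholders):
+    * {{{
+    *   KafkaConsumerObservable.createConsumer[String, String](config, "events-.*".r)
+    * }}}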
*/ + def createConsumer[K, V](config: KafkaConsumerConfig, topicsRegex: Regex)(implicit + K: Deserializer[K], + V: Deserializer[V]): Task[Consumer[K, V]] = { + Task.evalAsync { + val configMap = config.toJavaMap + blocking { + val consumer = new KafkaConsumer[K, V](configMap, K.create(), V.create()) + consumer.subscribe(topicsRegex.pattern) + consumer + } + } + } +} diff --git a/kafka-4.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableAutoCommit.scala b/kafka-4.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableAutoCommit.scala new file mode 100644 index 00000000..ecb52244 --- /dev/null +++ b/kafka-4.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableAutoCommit.scala @@ -0,0 +1,120 @@ +/* + * Copyright (c) 2014-2022 by The Monix Project Developers. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package monix.kafka + +import monix.eval.Task +import monix.execution.Ack.Stop +import monix.execution.cancelables.BooleanCancelable +import monix.execution.{Ack, Callback} +import monix.kafka.config.ObservableCommitType +import monix.reactive.Observer +import monix.reactive.observers.Subscriber +import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord} + +import scala.jdk.CollectionConverters._ +import scala.concurrent.{blocking, Future} +import scala.util.control.NonFatal +import scala.util.{Failure, Success} + +/** KafkaConsumerObservable implementation which commits offsets itself. + */ +final class KafkaConsumerObservableAutoCommit[K, V] private[kafka] ( + override protected val config: KafkaConsumerConfig, + override protected val consumerT: Task[Consumer[K, V]]) + extends KafkaConsumerObservable[K, V, ConsumerRecord[K, V]] { + + /* Based on the [[KafkaConsumerConfig.observableCommitType]] it + * chooses to commit the offsets in the consumer, by doing either + * a `commitSync` or a `commitAsync`. + * + * MUST BE synchronized by `consumer`. 
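+   *
+   * For example, with `monix.observable.commit.type = "sync"` in the
+   * configuration this results in a blocking `consumer.commitSync()` call,
+   * while `"async"` results in `consumer.commitAsync()`.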
+ */ + private def consumerCommit(consumer: Consumer[K, V]): Unit = + config.observableCommitType match { + case ObservableCommitType.Sync => + blocking(consumer.commitSync()) + case ObservableCommitType.Async => + blocking(consumer.commitAsync()) + } + + // Caching value to save CPU cycles + private val pollTimeoutMillis = config.fetchMaxWaitTime.toMillis + // Boolean value indicating that we should trigger a commit before downstream ack + private val shouldCommitBefore = !config.enableAutoCommit && config.observableCommitOrder.isBefore + // Boolean value indicating that we should trigger a commit after downstream ack + private val shouldCommitAfter = !config.enableAutoCommit && config.observableCommitOrder.isAfter + + override protected def ackTask(consumer: Consumer[K, V], out: Subscriber[ConsumerRecord[K, V]]): Task[Ack] = + Task.create { (scheduler, cb) => + implicit val s = scheduler + val asyncCb = Callback.forked(cb) + val cancelable = BooleanCancelable() + + // Forced asynchronous boundary (on the I/O scheduler) + s.execute { () => + val ackFuture = + try consumer.synchronized { + val assignment = consumer.assignment() + if (cancelable.isCanceled) Stop + else { + consumer.resume(assignment) + val next = blocking(consumer.poll(java.time.Duration.ofMillis(pollTimeoutMillis))) + consumer.pause(assignment) + if (shouldCommitBefore) consumerCommit(consumer) + // Feeding the observer happens on the Subscriber's scheduler + // if any asynchronous boundaries happen + isAcked = false + Observer.feed(out, next.asScala)(out.scheduler) + } + } catch { + case NonFatal(ex) => + Future.failed(ex) + } + + ackFuture.syncOnComplete { + case Success(ack) => + isAcked = true + // The `streamError` flag protects against contract violations + // (i.e. onSuccess/onError should happen only once). + // Not really required, but we don't want to depend on the + // scheduler implementation. + var streamErrors = true + try consumer.synchronized { + // In case the task has been cancelled, there's no point + // in continuing to do anything else + if (cancelable.isCanceled) { + streamErrors = false + asyncCb.onSuccess(Stop) + } else { + if (shouldCommitAfter) consumerCommit(consumer) + streamErrors = false + asyncCb.onSuccess(ack) + } + } catch { + case NonFatal(ex) => + if (streamErrors) asyncCb.onError(ex) + else s.reportFailure(ex) + } + + case Failure(ex) => + isAcked = true + asyncCb.onError(ex) + } + } + cancelable + } +} diff --git a/kafka-4.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala b/kafka-4.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala new file mode 100644 index 00000000..eeae924d --- /dev/null +++ b/kafka-4.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala @@ -0,0 +1,143 @@ +/* + * Copyright (c) 2014-2022 by The Monix Project Developers. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package monix.kafka + +import monix.eval.Task +import monix.execution.Ack.Stop +import monix.execution.cancelables.BooleanCancelable +import monix.execution.{Ack, Callback} +import monix.reactive.Observer +import monix.reactive.observers.Subscriber +import org.apache.kafka.clients.consumer.{Consumer, OffsetAndMetadata, OffsetCommitCallback} +import org.apache.kafka.common.TopicPartition + +import java.util +import scala.concurrent.{blocking, Future} +import scala.util.control.NonFatal +import scala.util.{Failure, Success} +import scala.jdk.CollectionConverters._ + +/** KafkaConsumerObservable with ability to manual commit offsets + * and forcibly disables auto commits in configuration. + * Such instances emit [[CommittableMessage]] instead of Kafka's ConsumerRecord. + */ +final class KafkaConsumerObservableManualCommit[K, V] private[kafka] ( + override protected val config: KafkaConsumerConfig, + override protected val consumerT: Task[Consumer[K, V]]) + extends KafkaConsumerObservable[K, V, CommittableMessage[K, V]] { + + // Caching value to save CPU cycles + private val pollTimeoutMillis = config.fetchMaxWaitTime.toMillis + + case class CommitWithConsumer(consumer: Consumer[K, V]) extends Commit { + + override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] = + Task(blocking(consumer.synchronized(consumer.commitSync(batch.map { case (k, v) => + k -> new OffsetAndMetadata(v) + }.asJava)))) + + override def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Unit] = { + Task + .async0[Unit] { (s, cb) => + val asyncCb = Callback.forked(cb)(s) + s.execute { () => + val offsets = batch.map { case (k, v) => k -> new OffsetAndMetadata(v) }.asJava + val offsetCommitCallback = new OffsetCommitCallback { + def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], ex: Exception): Unit = + if (ex != null && !cb.tryOnError(ex)) { s.reportFailure(ex) } + else { cb.tryOnSuccess(()) } + } + try { + consumer.synchronized(consumer.commitAsync(offsets, offsetCommitCallback)) + } catch { + case NonFatal(ex) => + if (!asyncCb.tryOnError(ex)) { s.reportFailure(ex) } + } + } + } + } + } + + override protected def ackTask(consumer: Consumer[K, V], out: Subscriber[CommittableMessage[K, V]]): Task[Ack] = + Task.create { (scheduler, cb) => + implicit val s = scheduler + val asyncCb = Callback.forked(cb) + val cancelable = BooleanCancelable() + + val commit: Commit = CommitWithConsumer(consumer) + + // Forced asynchronous boundary (on the I/O scheduler) + s.execute { () => + val ackFuture: Future[Ack] = + if (cancelable.isCanceled) Stop + else { + try consumer.synchronized { + val assignment = consumer.assignment() + consumer.resume(assignment) + val next = blocking(consumer.poll(java.time.Duration.ofMillis(pollTimeoutMillis))) + consumer.pause(assignment) + val result = next.asScala.map { record => + CommittableMessage( + record, + CommittableOffset( + new TopicPartition(record.topic(), record.partition()), + record.offset() + 1, + commit)) + } + // Feeding the observer happens on the Subscriber's scheduler + // if any asynchronous boundaries happen + isAcked = false + Observer.feed(out, result)(out.scheduler) + } catch { + case NonFatal(ex) => + Future.failed(ex) + } + + } + + ackFuture.syncOnComplete { + case Success(ack) => + isAcked = true + // The `streamError` flag protects against contract violations + // (i.e. onSuccess/onError should happen only once). + // Not really required, but we don't want to depend on the + // scheduler implementation. 
+ var streamErrors = true + try { + // In case the task has been cancelled, there's no point + // in continuing to do anything else + if (cancelable.isCanceled) { + streamErrors = false + asyncCb.onSuccess(Stop) + } else { + streamErrors = false + asyncCb.onSuccess(ack) + } + } catch { + case NonFatal(ex) => + if (streamErrors) asyncCb.onError(ex) + else s.reportFailure(ex) + } + + case Failure(ex) => + isAcked = true + asyncCb.onError(ex) + } + } + cancelable + } +} diff --git a/kafka-4.0.x/src/main/scala/monix/kafka/KafkaProducer.scala b/kafka-4.0.x/src/main/scala/monix/kafka/KafkaProducer.scala new file mode 100644 index 00000000..ef9d4024 --- /dev/null +++ b/kafka-4.0.x/src/main/scala/monix/kafka/KafkaProducer.scala @@ -0,0 +1,170 @@ +/* + * Copyright (c) 2014-2022 by The Monix Project Developers. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package monix.kafka + +import com.typesafe.scalalogging.StrictLogging +import monix.eval.{Coeval, Task} +import monix.execution.atomic.Atomic +import monix.execution.cancelables.{SingleAssignCancelable, StackedCancelable} +import monix.execution.{Callback, Cancelable, Scheduler} +import org.apache.kafka.clients.producer.{ + ProducerRecord, + RecordMetadata, + Callback => KafkaCallback, + Producer => ApacheProducer, + KafkaProducer => ApacheKafkaProducer +} + +import scala.util.control.NonFatal + +/** Wraps the Kafka Producer. + * + * Calling `producer.send` returns a `Task[Option[RecordMetadata]]` + * which can then be run and transformed into a `Future`. + * + * If the `Task` completes with `None` it means that `producer.send` + * method was called after the producer was closed and that + * the message wasn't successfully acknowledged by the Kafka broker. + * In case of the failure of the underlying Kafka client the producer + * will bubble up the exception and fail the `Task`. + * + * All successfully delivered messages will complete with `Some[RecordMetadata]`. + */ +trait KafkaProducer[K, V] extends Serializable { + def underlying: Task[ApacheProducer[K, V]] + def send(topic: String, value: V): Task[Option[RecordMetadata]] + def send(topic: String, key: K, value: V): Task[Option[RecordMetadata]] + def send(record: ProducerRecord[K, V]): Task[Option[RecordMetadata]] + def close(): Task[Unit] +} + +object KafkaProducer { + + /** Builds a [[KafkaProducer]] instance. */ + def apply[K, V](config: KafkaProducerConfig, sc: Scheduler)(implicit + K: Serializer[K], + V: Serializer[V]): KafkaProducer[K, V] = { + val producerRef: Coeval[ApacheProducer[K, V]] = Coeval { + val keySerializer = K.create() + val valueSerializer = V.create() + val configJavaMap = config.toJavaMap + keySerializer.configure(configJavaMap, true) + valueSerializer.configure(configJavaMap, false) + new ApacheKafkaProducer[K, V](configJavaMap, keySerializer, valueSerializer) + } + new Implementation[K, V](config, sc, producerRef) + } + + /** Builds a [[KafkaProducer]] instance with provided Apache Producer. 
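+   *
+   * A hypothetical sketch of wiring in a pre-built producer (assumes a
+   * `config: KafkaProducerConfig` and a `scheduler: Scheduler` in scope;
+   * the explicit `StringSerializer` instances are illustrative only):
+   * {{{
+   *   import org.apache.kafka.common.serialization.StringSerializer
+   *
+   *   val producerRef = Coeval(
+   *     new ApacheKafkaProducer[String, String](
+   *       config.toJavaMap, new StringSerializer, new StringSerializer))
+   *
+   *   val producer = KafkaProducer[String, String](config, scheduler, producerRef)
+   * }}}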
*/ + def apply[K, V](config: KafkaProducerConfig, sc: Scheduler, producerRef: Coeval[ApacheProducer[K, V]])(implicit + K: Serializer[K], + V: Serializer[V]): KafkaProducer[K, V] = { + new Implementation[K, V](config, sc, producerRef) + } + + private final class Implementation[K, V]( + config: KafkaProducerConfig, + sc: Scheduler, + producer: Coeval[ApacheProducer[K, V]])(implicit K: Serializer[K], V: Serializer[V]) + extends KafkaProducer[K, V] with StrictLogging { + + private val isCanceled = Atomic(false) + + lazy val producerRef = producer.value() + + def underlying: Task[ApacheProducer[K, V]] = + Task.eval(producerRef) + + def send(topic: String, value: V): Task[Option[RecordMetadata]] = + send(new ProducerRecord[K, V](topic, value)) + + def send(topic: String, key: K, value: V): Task[Option[RecordMetadata]] = + send(new ProducerRecord[K, V](topic, key, value)) + + def send(record: ProducerRecord[K, V]): Task[Option[RecordMetadata]] = + Task.create[Option[RecordMetadata]] { (s, cb) => + val asyncCb = Callback.forked(cb)(s) + val connection = StackedCancelable() + // Forcing asynchronous boundary + sc.execute(() => { + if (isCanceled.get()) { + asyncCb.onSuccess(None) + } else { + val isActive = Atomic(true) + val cancelable = SingleAssignCancelable() + try { + // Using asynchronous API + val future = producerRef.send( + record, + new KafkaCallback { + def onCompletion(meta: RecordMetadata, exception: Exception): Unit = + if (isActive.getAndSet(false)) { + connection.pop() + if (exception != null) + asyncCb.onError(exception) + else + asyncCb.onSuccess(Option(meta)) + } else if (exception != null) { + s.reportFailure(exception) + } + } + ) + + cancelable := Cancelable(() => future.cancel(false)) + connection.push(cancelable) + } catch { + case NonFatal(ex) => + // Needs synchronization, otherwise we are violating the contract + if (isActive.compareAndSet(expect = true, update = false)) { + connection.pop() + ex match { + case _: IllegalStateException if isCanceled.get() => + asyncCb.onSuccess(None) + case _ => + asyncCb.onError(ex) + } + } else { + s.reportFailure(ex) + } + } + } + }) + connection + } + + def close(): Task[Unit] = + Task.create { (s, cb) => + val asyncCb = Callback.forked(cb)(s) + // Forcing asynchronous boundary + sc.execute { () => + { + if (!isCanceled.compareAndSet(expect = false, update = true)) { + asyncCb.onSuccess(()) + } else { + try { + producerRef.close() + asyncCb.onSuccess(()) + } catch { + case NonFatal(ex) => + asyncCb.onError(ex) + } + } + } + } + } + } +} diff --git a/kafka-4.0.x/src/main/scala/monix/kafka/KafkaProducerConfig.scala b/kafka-4.0.x/src/main/scala/monix/kafka/KafkaProducerConfig.scala new file mode 100644 index 00000000..6ad7d1a7 --- /dev/null +++ b/kafka-4.0.x/src/main/scala/monix/kafka/KafkaProducerConfig.scala @@ -0,0 +1,416 @@ +/* + * Copyright (c) 2014-2022 by The Monix Project Developers. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package monix.kafka + +import java.io.File +import java.util.Properties + +import com.typesafe.config.{Config, ConfigFactory} +import monix.kafka.config._ + +import scala.jdk.CollectionConverters._ +import scala.concurrent.duration._ + +/** The Kafka Producer config. + * + * For the official documentation on the available configuration + * options, see + * [[https://kafka.apache.org/documentation.html#producerconfigs Producer Configs]] + * on `kafka.apache.org`. + * + * @param bootstrapServers is the `bootstrap.servers` setting + * and represents the list of servers to connect to. + * + * @param acks is the `acks` setting and represents + * the number of acknowledgments the producer requires the leader to + * have received before considering a request complete. + * See [[monix.kafka.config.Acks Acks]]. + * + * @param bufferMemoryInBytes is the `buffer.memory` setting and + * represents the total bytes of memory the producer + * can use to buffer records waiting to be sent to the server. + * + * @param compressionType is the `compression.type` setting and specifies + * what compression algorithm to apply to all the generated data + * by the producer. The default is none (no compression applied). + * + * @param retries is the `retries` setting. A value greater than zero will + * cause the client to resend any record whose send fails with + * a potentially transient error. + * + * @param sslKeyPassword is the `ssl.key.password` setting and represents + * the password of the private key in the key store file. + * This is optional for client. + * + * @param sslKeyStorePassword is the `ssl.keystore.password` setting, + * being the password of the private key in the key store file. + * This is optional for client. + * + * @param sslKeyStoreLocation is the `ssl.keystore.location` setting and + * represents the location of the key store file. This is optional + * for client and can be used for two-way authentication for client. + * + * @param sslTrustStoreLocation is the `ssl.truststore.location` setting + * and is the location of the trust store file. + * + * @param sslTrustStorePassword is the `ssl.truststore.password` setting + * and is the password for the trust store file. + * + * @param batchSizeInBytes is the `batch.size` setting. + * The producer will attempt to batch records together into fewer + * requests whenever multiple records are being sent to the + * same partition. This setting specifies the maximum number of + * records to batch together. + * + * @param clientId is the `client.id` setting, + * an id string to pass to the server when making requests. + * The purpose of this is to be able to track the source of + * requests beyond just ip/port by allowing a logical application + * name to be included in server-side request logging. + * + * @param connectionsMaxIdleTime is the `connections.max.idle.ms` setting + * and specifies how much time to wait before closing idle connections. + * + * @param lingerTime is the `linger.ms` setting + * and specifies to buffer records for more efficient batching, + * up to the maximum batch size or for the maximum `lingerTime`. + * If zero, then no buffering will happen, but if different + * from zero, then records will be delayed in absence of load. + * + * @param maxBlockTime is the `max.block.ms` setting. + * The configuration controls how long `KafkaProducer.send()` and + * `KafkaProducer.partitionsFor()` will block. These methods can be + * blocked either because the buffer is full or metadata unavailable. 
+ * + * @param maxRequestSizeInBytes is the `max.request.size` setting + * and represents the maximum size of a request in bytes. + * This is also effectively a cap on the maximum record size. + * + * @param maxInFlightRequestsPerConnection is the `max.in.flight.requests.per.connection` setting + * and represents the maximum number of unacknowledged request the client will send + * on a single connection before blocking. + * If this setting is set to be greater than 1 and there are failed sends, + * there is a risk of message re-ordering due to retries (if enabled). + * + * @param partitionerClass is the `partitioner.class` setting + * and represents a class that implements the + * `org.apache.kafka.clients.producer.Partitioner` interface. + * + * @param receiveBufferInBytes is the `receive.buffer.bytes` setting + * being the size of the TCP receive buffer (SO_RCVBUF) to use + * when reading data. + * + * @param requestTimeout is the `request.timeout.ms` setting, + * a configuration the controls the maximum amount of time + * the client will wait for the response of a request. + * + * @param saslKerberosServiceName is the `sasl.kerberos.service.name` setting, + * being the Kerberos principal name that Kafka runs as. + * + * @param saslMechanism is the `sasl.mechanism` setting, being the SASL + * mechanism used for client connections. This may be any mechanism + * for which a security provider is available. + * + * @param securityProtocol is the `security.protocol` setting, + * being the protocol used to communicate with brokers. + * + * @param sendBufferInBytes is the `send.buffer.bytes` setting, + * being the size of the TCP send buffer (SO_SNDBUF) to use + * when sending data. + * + * @param sslEnabledProtocols is the `ssl.enabled.protocols` setting, + * being the list of protocols enabled for SSL connections. + * + * @param sslKeystoreType is the `ssl.keystore.type` setting, + * being the file format of the key store file. + * + * @param sslProtocol is the `ssl.protocol` setting, + * being the SSL protocol used to generate the SSLContext. + * Default setting is TLS, which is fine for most cases. + * Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, + * SSLv2 and SSLv3 may be supported in older JVMs, but their usage + * is discouraged due to known security vulnerabilities. + * + * @param sslProvider is the `ssl.provider` setting, + * being the name of the security provider used for SSL connections. + * Default value is the default security provider of the JVM. + * + * @param sslTruststoreType is the `ssl.truststore.type` setting, being + * the file format of the trust store file. + * + * @param reconnectBackoffTime is the `reconnect.backoff.ms` setting. + * The amount of time to wait before attempting to reconnect to a + * given host. This avoids repeatedly connecting to a host in a + * tight loop. This backoff applies to all requests sent by the + * consumer to the broker. + * + * @param retryBackoffTime is the `retry.backoff.ms` setting. + * The amount of time to wait before attempting to retry a failed + * request to a given topic partition. This avoids repeatedly + * sending requests in a tight loop under some failure scenarios. + * + * @param metadataMaxAge is the `metadata.max.age.ms` setting. + * The period of time in milliseconds after which we force a + * refresh of metadata even if we haven't seen any partition + * leadership changes to proactively discover any new brokers + * or partitions. + * + * @param metricReporters is the `metric.reporters` setting. 
+ * A list of classes to use as metrics reporters. Implementing the + * `MetricReporter` interface allows plugging in classes that will + * be notified of new metric creation. The JmxReporter is always + * included to register JMX statistics + * + * @param metricsNumSamples is the `metrics.num.samples` setting. + * The number of samples maintained to compute metrics. + * + * @param metricsSampleWindow is the `metrics.sample.window.ms` setting. + * The metrics system maintains a configurable number of samples over + * a fixed window size. This configuration controls the size of the + * window. For example we might maintain two samples each measured + * over a 30 second period. When a window expires we erase and + * overwrite the oldest window. + * + * @param monixSinkParallelism is the `monix.producer.sink.parallelism` + * setting indicating how many requests the [[KafkaProducerSink]] + * can execute in parallel. + * + * @param properties map of other properties that will be passed to + * the underlying kafka client. Any properties not explicitly handled + * by this object can be set via the map, but in case of a duplicate + * a value set on the case class will overwrite value set via properties. + */ +final case class KafkaProducerConfig( + bootstrapServers: List[String], + acks: Acks, + bufferMemoryInBytes: Int, + compressionType: CompressionType, + retries: Int, + sslKeyPassword: Option[String], + sslKeyStorePassword: Option[String], + sslKeyStoreLocation: Option[String], + sslTrustStoreLocation: Option[String], + sslTrustStorePassword: Option[String], + batchSizeInBytes: Int, + clientId: String, + connectionsMaxIdleTime: FiniteDuration, + lingerTime: FiniteDuration, + maxBlockTime: FiniteDuration, + maxRequestSizeInBytes: Int, + maxInFlightRequestsPerConnection: Int, + partitionerClass: Option[PartitionerName], + receiveBufferInBytes: Int, + requestTimeout: FiniteDuration, + saslKerberosServiceName: Option[String], + saslMechanism: String, + securityProtocol: SecurityProtocol, + sendBufferInBytes: Int, + sslEnabledProtocols: List[SSLProtocol], + sslKeystoreType: String, + sslProtocol: SSLProtocol, + sslProvider: Option[String], + sslTruststoreType: String, + reconnectBackoffTime: FiniteDuration, + retryBackoffTime: FiniteDuration, + metadataMaxAge: FiniteDuration, + metricReporters: List[String], + metricsNumSamples: Int, + metricsSampleWindow: FiniteDuration, + monixSinkParallelism: Int, + properties: Map[String, String]) { + + def toMap: Map[String, String] = properties ++ Map( + "bootstrap.servers" -> bootstrapServers.mkString(","), + "acks" -> acks.id, + "buffer.memory" -> bufferMemoryInBytes.toString, + "compression.type" -> compressionType.id, + "retries" -> retries.toString, + "ssl.key.password" -> sslKeyPassword.orNull, + "ssl.keystore.password" -> sslKeyStorePassword.orNull, + "ssl.keystore.location" -> sslKeyStoreLocation.orNull, + "ssl.truststore.password" -> sslTrustStorePassword.orNull, + "ssl.truststore.location" -> sslTrustStoreLocation.orNull, + "batch.size" -> batchSizeInBytes.toString, + "client.id" -> clientId, + "connections.max.idle.ms" -> connectionsMaxIdleTime.toMillis.toString, + "linger.ms" -> lingerTime.toMillis.toString, + "max.block.ms" -> maxBlockTime.toMillis.toString, + "max.request.size" -> maxRequestSizeInBytes.toString, + "max.in.flight.requests.per.connection" -> maxInFlightRequestsPerConnection.toString, + "partitioner.class" -> partitionerClass.map(_.className).orNull, + "receive.buffer.bytes" -> receiveBufferInBytes.toString, + "request.timeout.ms" 
-> requestTimeout.toMillis.toString, + "sasl.kerberos.service.name" -> saslKerberosServiceName.orNull, + "sasl.mechanism" -> saslMechanism, + "security.protocol" -> securityProtocol.id, + "send.buffer.bytes" -> sendBufferInBytes.toString, + "ssl.enabled.protocols" -> sslEnabledProtocols.map(_.id).mkString(","), + "ssl.keystore.type" -> sslKeystoreType, + "ssl.protocol" -> sslProtocol.id, + "ssl.provider" -> sslProvider.orNull, + "ssl.truststore.type" -> sslTruststoreType, + "reconnect.backoff.ms" -> reconnectBackoffTime.toMillis.toString, + "retry.backoff.ms" -> retryBackoffTime.toMillis.toString, + "metadata.max.age.ms" -> metadataMaxAge.toMillis.toString, + "metric.reporters" -> metricReporters.mkString(","), + "metrics.num.samples" -> metricsNumSamples.toString, + "metrics.sample.window.ms" -> metricsSampleWindow.toMillis.toString + ) + + def toJavaMap: java.util.Map[String, Object] = + toMap.filter(_._2 != null).map { case (a, b) => (a, b.asInstanceOf[AnyRef]) }.asJava + + def toProperties: Properties = { + val props = new Properties() + for ((k, v) <- toMap; if v != null) props.put(k, v) + props + } +} + +object KafkaProducerConfig { + private val defaultRootPath = "kafka" + + lazy private val defaultConf: Config = + ConfigFactory.load("monix/kafka/default.conf").getConfig(defaultRootPath) + + /** Returns the default configuration, specified the `monix-kafka` project + * in `monix/kafka/default.conf`. + */ + lazy val default: KafkaProducerConfig = + apply(defaultConf, includeDefaults = false) + + /** Loads the [[KafkaProducerConfig]] either from a file path or + * from a resource, if `config.file` or `config.resource` are + * defined, or otherwise returns the default config. + * + * If you want to specify a `config.file`, you can configure the + * Java process on execution like so: + * {{{ + * java -Dconfig.file=/path/to/application.conf + * }}} + * + * Or if you want to specify a `config.resource` to be loaded + * from the executable's distributed JAR or classpath: + * {{{ + * java -Dconfig.resource=com/company/mySpecial.conf + * }}} + * + * In case neither of these are specified, then the configuration + * loaded is the default one, from the `monix-kafka` project, specified + * in `monix/kafka/default.conf`. + */ + def load(): KafkaProducerConfig = + Option(System.getProperty("config.file")).map(f => new File(f)) match { + case Some(file) if file.exists() => + loadFile(file) + case _ => + Option(System.getProperty("config.resource")) match { + case Some(resource) => + loadResource(resource) + case None => + default + } + } + + /** Loads a [[KafkaProducerConfig]] from a project resource. + * + * @param resourceBaseName is the resource from where to load the config + * @param rootPath is the config root path (e.g. `kafka`) + * @param includeDefaults should be `true` in case you want to fallback + * to the default values provided by the `monix-kafka` library + * in `monix/kafka/default.conf` + */ + def loadResource( + resourceBaseName: String, + rootPath: String = defaultRootPath, + includeDefaults: Boolean = true): KafkaProducerConfig = + apply(ConfigFactory.load(resourceBaseName).getConfig(rootPath), includeDefaults) + + /** Loads a [[KafkaProducerConfig]] from a specified file. + * + * @param file is the configuration path from where to load the config + * @param rootPath is the config root path (e.g. 
`kafka`) + * @param includeDefaults should be `true` in case you want to fallback + * to the default values provided by the `monix-kafka` library + * in `monix/kafka/default.conf` + */ + def loadFile(file: File, rootPath: String = defaultRootPath, includeDefaults: Boolean = true): KafkaProducerConfig = + apply(ConfigFactory.parseFile(file).resolve().getConfig(rootPath), includeDefaults) + + /** Loads the [[KafkaProducerConfig]] from a parsed + * `com.typesafe.config.Config` reference. + * + * NOTE that this method doesn't assume any path prefix for loading the + * configuration settings, so it does NOT assume a root path like `kafka`. + * In case case you need that, you can always do: + * + * {{{ + * KafkaProducerConfig(globalConfig.getConfig("kafka")) + * }}} + * + * @param source is the typesafe `Config` object to read from + * @param includeDefaults should be `true` in case you want to fallback + * to the default values provided by the `monix-kafka` library + * in `monix/kafka/default.conf` + */ + def apply(source: Config, includeDefaults: Boolean = true): KafkaProducerConfig = { + val config = if (!includeDefaults) source else source.withFallback(defaultConf) + def getOptString(path: String): Option[String] = + if (config.hasPath(path)) Option(config.getString(path)) + else None + + KafkaProducerConfig( + bootstrapServers = config.getString("bootstrap.servers").trim.split("\\s*,\\s*").toList, + acks = Acks(config.getString("acks")), + bufferMemoryInBytes = config.getInt("buffer.memory"), + compressionType = CompressionType(config.getString("compression.type")), + retries = config.getInt("retries"), + sslKeyPassword = getOptString("ssl.key.password"), + sslKeyStorePassword = getOptString("ssl.keystore.password"), + sslKeyStoreLocation = getOptString("ssl.keystore.location"), + sslTrustStorePassword = getOptString("ssl.truststore.password"), + sslTrustStoreLocation = getOptString("ssl.truststore.location"), + batchSizeInBytes = config.getInt("batch.size"), + clientId = config.getString("client.id"), + connectionsMaxIdleTime = config.getInt("connections.max.idle.ms").millis, + lingerTime = config.getInt("linger.ms").millis, + maxBlockTime = config.getInt("max.block.ms").millis, + maxRequestSizeInBytes = config.getInt("max.request.size"), + maxInFlightRequestsPerConnection = config.getInt("max.in.flight.requests.per.connection"), + partitionerClass = getOptString("partitioner.class").filter(_.nonEmpty).map(PartitionerName.apply), + receiveBufferInBytes = config.getInt("receive.buffer.bytes"), + requestTimeout = config.getInt("request.timeout.ms").millis, + saslKerberosServiceName = getOptString("sasl.kerberos.service.name"), + saslMechanism = config.getString("sasl.mechanism"), + securityProtocol = SecurityProtocol(config.getString("security.protocol")), + sendBufferInBytes = config.getInt("send.buffer.bytes"), + sslEnabledProtocols = config.getString("ssl.enabled.protocols").split("\\s*,\\s*").map(SSLProtocol.apply).toList, + sslKeystoreType = config.getString("ssl.keystore.type"), + sslProtocol = SSLProtocol(config.getString("ssl.protocol")), + sslProvider = getOptString("ssl.provider"), + sslTruststoreType = config.getString("ssl.truststore.type"), + reconnectBackoffTime = config.getInt("reconnect.backoff.ms").millis, + retryBackoffTime = config.getInt("retry.backoff.ms").millis, + metadataMaxAge = config.getInt("metadata.max.age.ms").millis, + metricReporters = config.getString("metric.reporters").trim.split("\\s*,\\s*").toList, + metricsNumSamples = 
config.getInt("metrics.num.samples"), + metricsSampleWindow = config.getInt("metrics.sample.window.ms").millis, + monixSinkParallelism = config.getInt("monix.producer.sink.parallelism"), + properties = Map.empty + ) + } +} diff --git a/kafka-4.0.x/src/main/scala/monix/kafka/KafkaProducerSink.scala b/kafka-4.0.x/src/main/scala/monix/kafka/KafkaProducerSink.scala new file mode 100644 index 00000000..c38a396e --- /dev/null +++ b/kafka-4.0.x/src/main/scala/monix/kafka/KafkaProducerSink.scala @@ -0,0 +1,135 @@ +/* + * Copyright (c) 2014-2022 by The Monix Project Developers. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package monix.kafka + +import com.typesafe.scalalogging.StrictLogging +import monix.eval.{Coeval, Task} +import monix.execution.Ack.{Continue, Stop} +import monix.execution.cancelables.AssignableCancelable +import monix.execution.{Ack, Callback, Scheduler} +import monix.reactive.Consumer +import monix.reactive.observers.Subscriber +import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata} + +import scala.concurrent.Future +import scala.util.{Failure, Success} + +/** A `monix.reactive.Consumer` that pushes incoming messages into + * a [[KafkaProducer]]. + * + * You can customize behavior in case of an errors when sending messages to Kafka + * with `onSendError`. The `Task` should return one of: + * - `Continue` to ignore the errors and try again with the next batch + * - `Stop` to stop the stream gracefully which will also commit latest batch if using [[KafkaConsumerObservableAutoCommit]] + * - An error with `Task.raiseError` which will finish the stream with an error. 
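+ *
+ * A hypothetical sketch of a custom `onSendError` that stops the stream on the
+ * first failed send (assumes a `producerCfg` and an `io` scheduler in scope;
+ * purely illustrative):
+ * {{{
+ *   val sink = KafkaProducerSink[String, String](
+ *     producerCfg, io, onSendError = _ => Task.pure(Stop))
+ * }}}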
+ */ +final class KafkaProducerSink[K, V] private ( + producer: Coeval[KafkaProducer[K, V]], + shouldTerminate: Boolean, + parallelism: Int, + onSendError: Throwable => Task[Ack]) + extends Consumer[Seq[ProducerRecord[K, V]], Unit] with StrictLogging with Serializable { + + require(parallelism >= 1, "parallelism >= 1") + + def createSubscriber( + cb: Callback[Throwable, Unit], + s: Scheduler): (Subscriber[Seq[ProducerRecord[K, V]]], AssignableCancelable.Multi) = { + val out = new Subscriber[Seq[ProducerRecord[K, V]]] { self => + implicit val scheduler: Scheduler = s + private[this] val p = producer.memoize + private[this] var isActive = true + + def onNext(list: Seq[ProducerRecord[K, V]]): Future[Ack] = + self.synchronized { + if (!isActive) Stop + else { + val sendTask: Task[Seq[Option[RecordMetadata]]] = + if (parallelism == 1) + Task.traverse(list)(p.value().send(_)) + else { + Task.parTraverseN(parallelism)(list)(r => p.value().send(r)) + } + + val recovered = sendTask.redeemWith( + ex => onSendError(ex), + _ => Task.pure(Continue) + ) + + recovered.runToFuture + } + } + + def terminate(cb: => Unit): Unit = + self.synchronized { + if (isActive) { + isActive = false + + if (!shouldTerminate) cb + else + Task(p.value().close()).flatten.materialize.foreach { + case Success(_) => cb + case Failure(ex) => + logger.error("Unexpected error in KafkaProducerSink", ex) + cb + } + } + } + + def onError(ex: Throwable): Unit = + terminate(cb.onError(ex)) + def onComplete(): Unit = + terminate(cb.onSuccess(())) + } + + (out, AssignableCancelable.dummy) + } +} + +object KafkaProducerSink extends StrictLogging { + + private[this] val onSendErrorDefault = (ex: Throwable) => + Task { + logger.error("Unexpected error in KafkaProducerSink", ex) + Continue + } + + /** Builder for [[KafkaProducerSink]]. */ + def apply[K, V](config: KafkaProducerConfig, sc: Scheduler)(implicit + K: Serializer[K], + V: Serializer[V]): KafkaProducerSink[K, V] = apply(config, sc, onSendErrorDefault) + + /** Builder for [[KafkaProducerSink]]. */ + def apply[K, V](config: KafkaProducerConfig, sc: Scheduler, onSendError: Throwable => Task[Ack])(implicit + K: Serializer[K], + V: Serializer[V]): KafkaProducerSink[K, V] = { + + val producer = Coeval(KafkaProducer[K, V](config, sc)) + new KafkaProducerSink(producer, shouldTerminate = true, parallelism = config.monixSinkParallelism, onSendError) + } + + /** Builder for [[KafkaProducerSink]]. */ + def apply[K, V](producer: Coeval[KafkaProducer[K, V]], parallelism: Int): KafkaProducerSink[K, V] = + apply(producer, parallelism, onSendErrorDefault) + + /** Builder for [[KafkaProducerSink]]. */ + def apply[K, V]( + producer: Coeval[KafkaProducer[K, V]], + parallelism: Int, + onSendError: Throwable => Task[Ack]): KafkaProducerSink[K, V] = + new KafkaProducerSink(producer, shouldTerminate = false, parallelism = parallelism, onSendError) +} diff --git a/kafka-4.0.x/src/main/scala/monix/kafka/Serializer.scala b/kafka-4.0.x/src/main/scala/monix/kafka/Serializer.scala new file mode 100644 index 00000000..367b8517 --- /dev/null +++ b/kafka-4.0.x/src/main/scala/monix/kafka/Serializer.scala @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2014-2022 by The Monix Project Developers. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package monix.kafka + +import java.nio.ByteBuffer +import org.apache.kafka.common.serialization._ +import org.apache.kafka.common.serialization.{Serializer => KafkaSerializer} +import org.apache.kafka.common.utils.Bytes + +/** Wraps a Kafka `Serializer`, provided for + * convenience, since it can be implicitly fetched + * from the context. + * + * @param className is the full package path to the Kafka `Serializer` + * + * @param classType is the `java.lang.Class` for [[className]] + * + * @param constructor creates an instance of [[classType]]. + * This is defaulted with a `Serializer.Constructor[A]` function that creates a + * new instance using an assumed empty constructor. + * Supplying this parameter allows for manual provision of the `Serializer`. + */ +final case class Serializer[A]( + className: String, + classType: Class[_ <: KafkaSerializer[A]], + constructor: Serializer.Constructor[A] = (s: Serializer[A]) => s.classType.getDeclaredConstructor().newInstance()) { + + /** Creates a new instance. */ + def create(): KafkaSerializer[A] = + constructor(this) +} + +object Serializer { + + implicit def fromKafkaSerializer[A](implicit ser: KafkaSerializer[A]): Serializer[A] = + Serializer[A]( + className = ser.getClass.getName, + classType = ser.getClass, + constructor = _ => ser + ) + + /** Alias for the function that provides an instance of + * the Kafka `Serializer`. 
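+   *
+   * For instance, a `Serializer` can be wired with an explicit constructor
+   * instead of relying on reflective instantiation (an illustrative sketch):
+   * {{{
+   *   Serializer[String](
+   *     className = "org.apache.kafka.common.serialization.StringSerializer",
+   *     classType = classOf[StringSerializer],
+   *     constructor = _ => new StringSerializer)
+   * }}}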
+ */ + type Constructor[A] = (Serializer[A]) => KafkaSerializer[A] + + implicit val forStrings: Serializer[String] = + Serializer[String]( + className = "org.apache.kafka.common.serialization.StringSerializer", + classType = classOf[StringSerializer] + ) + + implicit val forByteArray: Serializer[Array[Byte]] = + Serializer[Array[Byte]]( + className = "org.apache.kafka.common.serialization.ByteArraySerializer", + classType = classOf[ByteArraySerializer] + ) + + implicit val forByteBuffer: Serializer[ByteBuffer] = + Serializer[ByteBuffer]( + className = "org.apache.kafka.common.serialization.ByteBufferSerializer", + classType = classOf[ByteBufferSerializer] + ) + + implicit val forBytes: Serializer[Bytes] = + Serializer[Bytes]( + className = "org.apache.kafka.common.serialization.BytesSerializer", + classType = classOf[BytesSerializer] + ) + + implicit val forJavaDouble: Serializer[java.lang.Double] = + Serializer[java.lang.Double]( + className = "org.apache.kafka.common.serialization.DoubleSerializer", + classType = classOf[DoubleSerializer] + ) + + implicit val forJavaInteger: Serializer[java.lang.Integer] = + Serializer[java.lang.Integer]( + className = "org.apache.kafka.common.serialization.IntegerSerializer", + classType = classOf[IntegerSerializer] + ) + + implicit val forJavaLong: Serializer[java.lang.Long] = + Serializer[java.lang.Long]( + className = "org.apache.kafka.common.serialization.LongSerializer", + classType = classOf[LongSerializer] + ) +} diff --git a/kafka-4.0.x/src/main/scala/monix/kafka/config/Acks.scala b/kafka-4.0.x/src/main/scala/monix/kafka/config/Acks.scala new file mode 100644 index 00000000..87661b2e --- /dev/null +++ b/kafka-4.0.x/src/main/scala/monix/kafka/config/Acks.scala @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2014-2022 by The Monix Project Developers. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package monix.kafka.config + +import com.typesafe.config.ConfigException.BadValue + +/** Enumeration for specifying the `acks` setting in + * [[monix.kafka.KafkaProducerConfig KafkaProducerConfig]]. + * + * Represents the number of acknowledgments the producer requires + * the leader to have received before considering a request complete. + * This controls the durability of records that are sent. + * + * For the available options see: + * + * - [[Acks.Zero]] + * - [[Acks.NonZero]] + * - [[Acks.All]] + */ +sealed trait Acks extends Product with Serializable { + def id: String +} + +object Acks { + + @throws(classOf[BadValue]) + def apply(id: String): Acks = + id match { + case All.id => All + case Number(nrStr) => + val nr = nrStr.toInt + if (nr == 0) Zero else NonZero(nr) + case _ => + throw new BadValue("kafka.acks", s"Invalid value: $id") + } + + /** If set to zero then the producer will not wait + * for any acknowledgment from the server at all. + * + * The record will be immediately added to the socket buffer and + * considered sent. 
No guarantee can be made that the server has received + * the record in this case, and the retries configuration will not + * take effect (as the client won't generally know of any failures). + * The offset given back for each record will always be set to -1. + */ + case object Zero extends Acks { + val id = "0" + } + + /** This will mean the leader will write the record to its local + * log but will respond without awaiting full acknowledgement + * from all followers. + * + * In this case should the leader fail immediately after acknowledging + * the record but before the followers have replicated it then the + * record will be lost. + */ + final case class NonZero(nr: Int) extends Acks { + require(nr > 0, "nr > 0") + val id = nr.toString + } + + /** This means the leader will wait for the + * full set of in-sync replicas to acknowledge the record. + * + * This guarantees that the record will not be lost as long as + * at least one in-sync replica remains alive. This is the strongest + * available guarantee. + */ + case object All extends Acks { + val id = "all" + } + + // Regular expression for parsing IDs + private val Number = """^(\d+)$""".r +} diff --git a/kafka-4.0.x/src/main/scala/monix/kafka/config/AutoOffsetReset.scala b/kafka-4.0.x/src/main/scala/monix/kafka/config/AutoOffsetReset.scala new file mode 100644 index 00000000..1f01689e --- /dev/null +++ b/kafka-4.0.x/src/main/scala/monix/kafka/config/AutoOffsetReset.scala @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2014-2022 by The Monix Project Developers. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package monix.kafka.config + +import com.typesafe.config.ConfigException.BadValue + +/** What to do when there is no initial offset in Kafka or if the + * current offset does not exist any more on the server + * (e.g. because that data has been deleted). + * + * Available choices: + * + * - [[AutoOffsetReset.Earliest]] + * - [[AutoOffsetReset.Latest]] + * - [[AutoOffsetReset.Throw]] + */ +sealed trait AutoOffsetReset extends Serializable { + def id: String +} + +object AutoOffsetReset { + + @throws(classOf[BadValue]) + def apply(id: String): AutoOffsetReset = + id.trim.toLowerCase match { + case Earliest.id => Earliest + case Latest.id => Latest + case Throw.id => Throw + case _ => + throw new BadValue("kafka.auto.offset.reset", s"Invalid value: $id") + } + + /** Automatically reset the offset to the earliest offset. */ + case object Earliest extends AutoOffsetReset { + val id = "earliest" + } + + /** Automatically reset the offset to the latest offset. */ + case object Latest extends AutoOffsetReset { + val id = "latest" + } + + /** Throw exception to the consumer if no previous offset + * is found for the consumer's group. 
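+   * This corresponds to Kafka's `auto.offset.reset = "none"` setting.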
+ */ + case object Throw extends AutoOffsetReset { + val id = "none" + } +} diff --git a/kafka-4.0.x/src/main/scala/monix/kafka/config/ClassName.scala b/kafka-4.0.x/src/main/scala/monix/kafka/config/ClassName.scala new file mode 100644 index 00000000..51ef7d5a --- /dev/null +++ b/kafka-4.0.x/src/main/scala/monix/kafka/config/ClassName.scala @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2014-2022 by The Monix Project Developers. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package monix.kafka.config + +import scala.reflect.ClassTag + +abstract class ClassName[T](implicit T: ClassTag[T]) extends Serializable { + def className: String + + val classType: Class[_ <: T] = + Class.forName(className).asInstanceOf[Class[_ <: T]] + + require( + findClass(classType :: Nil, T.runtimeClass), + s"Given type $className does not implement ${T.runtimeClass}" + ) + + private def findClass(stack: List[Class[_]], searched: Class[_]): Boolean = + stack match { + case Nil => false + case x :: xs => + if (x == searched) true + else { + val superClass: List[Class[_]] = Option(x.getSuperclass).toList + val rest = superClass ::: x.getInterfaces.toList ::: xs + findClass(rest, searched) + } + } +} diff --git a/kafka-4.0.x/src/main/scala/monix/kafka/config/CompressionType.scala b/kafka-4.0.x/src/main/scala/monix/kafka/config/CompressionType.scala new file mode 100644 index 00000000..2b07c1cf --- /dev/null +++ b/kafka-4.0.x/src/main/scala/monix/kafka/config/CompressionType.scala @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2014-2022 by The Monix Project Developers. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package monix.kafka.config + +import com.typesafe.config.ConfigException.BadValue + +/** The compression type for all data generated by the producer, + * the `compression.type` from the Kafka Producer configuration. + * + * The default is none (i.e. no compression). Compression is of full + * batches of data, so the efficacy of batching will also impact + * the compression ratio (more batching means better compression). 
+ * + * Valid values: + * + * - [[CompressionType.Uncompressed]] + * - [[CompressionType.Gzip]] + * - [[CompressionType.Snappy]] + * - [[CompressionType.Lz4]] + */ +sealed trait CompressionType extends Serializable { + def id: String +} + +object CompressionType { + + @throws(classOf[BadValue]) + def apply(id: String): CompressionType = + id match { + case Uncompressed.id => Uncompressed + case Gzip.id => Gzip + case Snappy.id => Snappy + case Lz4.id => Lz4 + case _ => + throw new BadValue("kafka.compression.type", s"Invalid value: $id") + } + + case object Uncompressed extends CompressionType { + val id = "none" + } + + case object Gzip extends CompressionType { + val id = "gzip" + } + + case object Snappy extends CompressionType { + val id = "snappy" + } + + case object Lz4 extends CompressionType { + val id = "lz4" + } +} diff --git a/kafka-4.0.x/src/main/scala/monix/kafka/config/ObservableCommitOrder.scala b/kafka-4.0.x/src/main/scala/monix/kafka/config/ObservableCommitOrder.scala new file mode 100644 index 00000000..e586acc6 --- /dev/null +++ b/kafka-4.0.x/src/main/scala/monix/kafka/config/ObservableCommitOrder.scala @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2014-2022 by The Monix Project Developers. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package monix.kafka.config + +import com.typesafe.config.ConfigException.BadValue + +/** Specifies the consumer commit order, to use by the + * [[monix.kafka.KafkaConsumerObservable KafkaConsumerObservable]] + * in case `kafka.enable.auto.commit` is set to `false`. + * + * Available options: + * + * - [[ObservableCommitOrder.BeforeAck]] specifies to do a commit + * before acknowledgement is received from downstream + * - [[ObservableCommitOrder.AfterAck]] specifies to do a commit + * after acknowledgement is received from downstream + * - [[ObservableCommitOrder.NoAck]] specifies to skip committing + */ +sealed trait ObservableCommitOrder extends Serializable { + def id: String + + def isBefore: Boolean = + this match { + case ObservableCommitOrder.BeforeAck => true + case _ => false + } + + def isAfter: Boolean = + this match { + case ObservableCommitOrder.AfterAck => true + case _ => false + } +} + +object ObservableCommitOrder { + + @throws(classOf[BadValue]) + def apply(id: String): ObservableCommitOrder = + id match { + case BeforeAck.id => BeforeAck + case AfterAck.id => AfterAck + case NoAck.id => NoAck + case _ => + throw new BadValue("kafka.monix.observable.commit.order", s"Invalid value: $id") + } + + /** Do a `commit` in the Kafka Consumer before + * receiving an acknowledgement from downstream. + */ + case object BeforeAck extends ObservableCommitOrder { + val id = "before-ack" + } + + /** Do a `commit` in the Kafka Consumer after + * receiving an acknowledgement from downstream. + */ + case object AfterAck extends ObservableCommitOrder { + val id = "after-ack" + } + + /** Do not `commit` in the Kafka Consumer. 
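+   * Useful when offsets are committed by other means (e.g. Kafka's auto-commit)
+   * or intentionally not committed at all.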
+ */ + case object NoAck extends ObservableCommitOrder { + val id = "no-ack" + } +} diff --git a/kafka-4.0.x/src/main/scala/monix/kafka/config/ObservableCommitType.scala b/kafka-4.0.x/src/main/scala/monix/kafka/config/ObservableCommitType.scala new file mode 100644 index 00000000..ddbb6c79 --- /dev/null +++ b/kafka-4.0.x/src/main/scala/monix/kafka/config/ObservableCommitType.scala @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2014-2022 by The Monix Project Developers. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package monix.kafka.config + +import com.typesafe.config.ConfigException.BadValue + +/** Specifies the consumer commit type, to use by the + * [[monix.kafka.KafkaConsumerObservable KafkaConsumerObservable]] + * in case `kafka.enable.auto.commit` is set to `false`. + * + * Available options: + * + * - [[ObservableCommitType.Sync]] + * - [[ObservableCommitType.Async]] + */ +sealed trait ObservableCommitType extends Serializable { + def id: String +} + +object ObservableCommitType { + + @throws(classOf[BadValue]) + def apply(id: String): ObservableCommitType = + id match { + case Sync.id => Sync + case Async.id => Async + case _ => + throw new BadValue("kafka.monix.observable.commit.type", s"Invalid value: $id") + } + + /** Uses `consumer.commitSync()` after each batch + * if `enable.auto.commit` is `false`. + */ + case object Sync extends ObservableCommitType { + val id = "sync" + } + + /** Uses `consumer.commitAsync()` after each batch + * if `enable.auto.commit` is `false`. + */ + case object Async extends ObservableCommitType { + val id = "async" + } +} diff --git a/kafka-4.0.x/src/main/scala/monix/kafka/config/ObservableSeekOnStart.scala b/kafka-4.0.x/src/main/scala/monix/kafka/config/ObservableSeekOnStart.scala new file mode 100644 index 00000000..29dd8acf --- /dev/null +++ b/kafka-4.0.x/src/main/scala/monix/kafka/config/ObservableSeekOnStart.scala @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2014-2022 by The Monix Project Developers. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package monix.kafka.config + +import com.typesafe.config.ConfigException.BadValue + +/** Specifies whether to call `seekToEnd` or `seekToBeginning` when starting + * [[monix.kafka.KafkaConsumerObservable KafkaConsumerObservable]] + * + * Available options: + * + * - [[ObservableSeekOnStart.End]] + * - [[ObservableSeekOnStart.Beginning]] + * - [[ObservableSeekOnStart.NoSeek]] + */ +sealed trait ObservableSeekOnStart extends Serializable { + def id: String + + def isSeekBeginning: Boolean = + this match { + case ObservableSeekOnStart.Beginning => true + case _ => false + } + + def isSeekEnd: Boolean = + this match { + case ObservableSeekOnStart.End => true + case _ => false + } +} + +object ObservableSeekOnStart { + + @throws(classOf[BadValue]) + def apply(id: String): ObservableSeekOnStart = + id match { + case End.id => End + case Beginning.id => Beginning + case NoSeek.id => NoSeek + case _ => + throw new BadValue("kafka.monix.observable.seek.onStart", s"Invalid value: $id") + } + + /** Calls `consumer.seekToEnd()` when starting consumer. + */ + case object End extends ObservableSeekOnStart { + val id = "end" + } + + /** Calls `consumer.seekToBeginning()` when starting consumer. + */ + case object Beginning extends ObservableSeekOnStart { + val id = "beginning" + } + + /** Does not call neither `consumer.seekToEnd()` nor `consumer.seekToBeginning` + * when starting consumer. + */ + case object NoSeek extends ObservableSeekOnStart { + val id = "no-seek" + } +} diff --git a/kafka-4.0.x/src/main/scala/monix/kafka/config/PartitionerName.scala b/kafka-4.0.x/src/main/scala/monix/kafka/config/PartitionerName.scala new file mode 100644 index 00000000..a6813237 --- /dev/null +++ b/kafka-4.0.x/src/main/scala/monix/kafka/config/PartitionerName.scala @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2014-2022 by The Monix Project Developers. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package monix.kafka.config + +import org.apache.kafka.clients.producer.Partitioner +import scala.reflect.ClassTag + +final case class PartitionerName(className: String) extends ClassName[Partitioner] { + + /** Creates a new instance of the referenced `Serializer`. */ + def createInstance(): Partitioner = + classType.getDeclaredConstructor().newInstance() +} + +object PartitionerName { + + /** Builds a [[PartitionerName]], given a class. */ + def apply[C <: Partitioner](implicit C: ClassTag[C]): PartitionerName = + PartitionerName(C.runtimeClass.getCanonicalName) + + /** Returns the default `Partitioner` instance. */ + val default: PartitionerName = + PartitionerName("org.apache.kafka.clients.producer.internals.DefaultPartitioner") +} diff --git a/kafka-4.0.x/src/main/scala/monix/kafka/config/SSLProtocol.scala b/kafka-4.0.x/src/main/scala/monix/kafka/config/SSLProtocol.scala new file mode 100644 index 00000000..836f3587 --- /dev/null +++ b/kafka-4.0.x/src/main/scala/monix/kafka/config/SSLProtocol.scala @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2014-2022 by The Monix Project Developers. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package monix.kafka.config + +import java.security.NoSuchAlgorithmException +import javax.net.ssl.SSLContext + +import com.typesafe.config.ConfigException.BadValue + +/** Represents the available protocols to use for + * SSL connections. + * + * Available values: + * + * - [[SSLProtocol.TLSv12]] + * - [[SSLProtocol.TLSv11]] + * - [[SSLProtocol.TLSv1]] + * - [[SSLProtocol.TLS]] + * - [[SSLProtocol.SSLv3]] (prefer only for older JVMs) + * - [[SSLProtocol.SSLv2]] (prefer only for older JVMs, no longer available for Java 8) + * - [[SSLProtocol.SSL]] (prefer only for older JVMs) + */ +sealed trait SSLProtocol extends Serializable { + def id: String + + def getInstance(): Option[SSLContext] = + try Some(SSLContext.getInstance(id)) + catch { + case _: NoSuchAlgorithmException => + None + } +} + +object SSLProtocol { + + @throws(classOf[BadValue]) + def apply(id: String): SSLProtocol = { + val algorithm = id match { + case TLSv12.id => TLSv12 + case TLSv11.id => TLSv11 + case TLSv1.id => TLSv1 + case TLS.id => TLS + case SSLv3.id => SSLv3 + case SSLv2.id => SSLv2 + case SSL.id => SSL + case _ => + throw new BadValue("kafka.ssl.enabled.protocols", s"Invalid value: $id") + } + + algorithm.getInstance() match { + case Some(_) => algorithm + case None => + throw new BadValue("kafka.ssl.enabled.protocols", s"Unsupported SSL protocol: $id") + } + } + + case object TLSv12 extends SSLProtocol { + val id = "TLSv1.2" + } + + case object TLSv11 extends SSLProtocol { + val id = "TLSv1.1" + } + + case object TLSv1 extends SSLProtocol { + val id = "TLSv1" + } + + case object TLS extends SSLProtocol { + val id = "TLS" + } + + /** WARNING: deprecated, might not work on recent versions + * of the JVM. Prefer TLS. + */ + case object SSLv3 extends SSLProtocol { + val id = "SSLv3" + } + + /** WARNING: deprecated, might not work on recent versions + * of the JVM. Prefer TLS. + */ + case object SSLv2 extends SSLProtocol { + val id = "SSLv2" + } + + /** WARNING: deprecated, might not work on recent versions + * of the JVM. Prefer TLS. + */ + case object SSL extends SSLProtocol { + val id = "SSL" + } +} diff --git a/kafka-4.0.x/src/main/scala/monix/kafka/config/SecurityProtocol.scala b/kafka-4.0.x/src/main/scala/monix/kafka/config/SecurityProtocol.scala new file mode 100644 index 00000000..39ccda9e --- /dev/null +++ b/kafka-4.0.x/src/main/scala/monix/kafka/config/SecurityProtocol.scala @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2014-2022 by The Monix Project Developers. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package monix.kafka.config + +import com.typesafe.config.ConfigException.BadValue + +/** The `security.protocol` setting for the Kafka Producer. + * + * Represents the protocol used to communicate with brokers. + * + * Valid values are: + * + * - [[SecurityProtocol.PLAINTEXT]] + * - [[SecurityProtocol.SSL]] + * - [[SecurityProtocol.SASL_PLAINTEXT]] + * - [[SecurityProtocol.SASL_SSL]] + */ +sealed trait SecurityProtocol extends Serializable { + def id: String +} + +object SecurityProtocol { + + @throws(classOf[BadValue]) + def apply(id: String): SecurityProtocol = + id match { + case PLAINTEXT.id => PLAINTEXT + case SSL.id => SSL + case SASL_PLAINTEXT.id => SASL_PLAINTEXT + case SASL_SSL.id => SASL_SSL + case _ => + throw new BadValue("kafka.security.protocol", s"Invalid value: $id") + } + + case object PLAINTEXT extends SecurityProtocol { + val id = "PLAINTEXT" + } + + case object SSL extends SecurityProtocol { + val id = "SSL" + } + + case object SASL_PLAINTEXT extends SecurityProtocol { + val id = "SASL_PLAINTEXT" + } + + case object SASL_SSL extends SecurityProtocol { + val id = "SASL_SSL" + } +} diff --git a/kafka-4.0.x/src/test/resources/logback-test.xml b/kafka-4.0.x/src/test/resources/logback-test.xml new file mode 100644 index 00000000..1e022eb2 --- /dev/null +++ b/kafka-4.0.x/src/test/resources/logback-test.xml @@ -0,0 +1,25 @@ + + + + %d{yyyyMMdd-HH:mm:ss.SSSZ} [%thread] %-5level %logger - %msg%n + + + + + + + + + + + + + + + + + + + + + diff --git a/kafka-4.0.x/src/test/scala/monix/kafka/ConfigTest.scala b/kafka-4.0.x/src/test/scala/monix/kafka/ConfigTest.scala new file mode 100644 index 00000000..ec1565bf --- /dev/null +++ b/kafka-4.0.x/src/test/scala/monix/kafka/ConfigTest.scala @@ -0,0 +1,37 @@ +package monix.kafka + +import org.scalatest.FunSuite + +class ConfigTest extends FunSuite { + test("overwrite properties with values from producer config") { + val config = + KafkaProducerConfig.default + .copy(bootstrapServers = List("localhost:9092"), properties = Map("bootstrap.servers" -> "127.0.0.1:9092")) + + assert( + config.toProperties.getProperty("bootstrap.servers") == "localhost:9092" + ) + } + + test("overwrite properties with values from consumer config") { + val config = + KafkaConsumerConfig.default + .copy(bootstrapServers = List("localhost:9092"), properties = Map("bootstrap.servers" -> "127.0.0.1:9092")) + + assert( + config.toProperties.getProperty("bootstrap.servers") == "localhost:9092" + ) + } + + test("convert to Java map from producer config and filter null values") { + val config = KafkaProducerConfig.default.toJavaMap + + assert(!config.containsValue(null)) + } + + test("convert to Java map from consumer config and filter null values") { + val config = KafkaConsumerConfig.default.toJavaMap + + assert(!config.containsValue(null)) + } +} diff --git a/kafka-4.0.x/src/test/scala/monix/kafka/KafkaTestKit.scala b/kafka-4.0.x/src/test/scala/monix/kafka/KafkaTestKit.scala new file mode 100644 index 00000000..72c0b577 --- /dev/null +++ b/kafka-4.0.x/src/test/scala/monix/kafka/KafkaTestKit.scala @@ -0,0 +1,11 @@ +package monix.kafka + +import net.manub.embeddedkafka.EmbeddedKafka +import org.scalatest.Suite + +trait KafkaTestKit extends EmbeddedKafka { self: Suite => + + sys.addShutdownHook { + EmbeddedKafka.stop() + } +} diff --git a/kafka-4.0.x/src/test/scala/monix/kafka/MergeByCommitCallbackTest.scala 
b/kafka-4.0.x/src/test/scala/monix/kafka/MergeByCommitCallbackTest.scala new file mode 100644 index 00000000..56cdf168 --- /dev/null +++ b/kafka-4.0.x/src/test/scala/monix/kafka/MergeByCommitCallbackTest.scala @@ -0,0 +1,84 @@ +package monix.kafka + +import monix.eval.Task +import monix.kafka.config.AutoOffsetReset +import monix.reactive.Observable +import org.apache.kafka.clients.producer.ProducerRecord +import org.scalatest.{FunSuite, Matchers} + +import scala.concurrent.duration._ +import scala.concurrent.Await +import monix.execution.Scheduler.Implicits.global +import org.apache.kafka.common.TopicPartition +import org.scalacheck.Gen +import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks + +class MergeByCommitCallbackTest extends FunSuite with KafkaTestKit with ScalaCheckDrivenPropertyChecks with Matchers { + + val commitCallbacks: List[Commit] = List.fill(4)(new Commit { + override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit + + override def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Unit] = + Task.unit + }) + + val committableOffsetsGen: Gen[CommittableOffset] = for { + partition <- Gen.posNum[Int] + offset <- Gen.posNum[Long] + commit <- Gen.oneOf(commitCallbacks) + } yield CommittableOffset(new TopicPartition("topic", partition), offset, commit) + + test("merge by commit callback works") { + forAll(Gen.nonEmptyListOf(committableOffsetsGen)) { offsets => + val partitions = offsets.map(_.topicPartition) + val received: List[CommittableOffsetBatch] = CommittableOffsetBatch.mergeByCommitCallback(offsets) + + received.foreach { batch => partitions should contain allElementsOf batch.offsets.keys } + + received.size should be <= 4 + } + } + + test("merge by commit callback for multiple consumers") { + withRunningKafka { + val count = 10000 + val topicName = "monix-kafka-merge-by-commit" + + val producerCfg = KafkaProducerConfig.default.copy( + bootstrapServers = List("127.0.0.1:6001"), + clientId = "monix-kafka-1-0-producer-test" + ) + + val producer = KafkaProducerSink[String, String](producerCfg, io) + + val pushT = Observable + .range(0, count) + .map(msg => new ProducerRecord(topicName, "obs", msg.toString)) + .bufferIntrospective(1024) + .consumeWith(producer) + + val listT = Observable + .range(0, 4) + .mergeMap(i => createConsumer(i.toInt, topicName).take(500)) + .bufferTumbling(2000) + .map(CommittableOffsetBatch.mergeByCommitCallback) + .map { offsetBatches => assert(offsetBatches.length == 4) } + .completedL + + Await.result(Task.parZip2(listT, pushT).runToFuture, 60.seconds) + } + } + + private def createConsumer(i: Int, topicName: String): Observable[CommittableOffset] = { + val cfg = KafkaConsumerConfig.default.copy( + bootstrapServers = List("127.0.0.1:6001"), + groupId = s"kafka-tests-$i", + autoOffsetReset = AutoOffsetReset.Earliest + ) + + KafkaConsumerObservable + .manualCommit[String, String](cfg, List(topicName)) + .executeOn(io) + .map(_.committableOffset) + } +} diff --git a/kafka-4.0.x/src/test/scala/monix/kafka/MonixKafkaTopicListTest.scala b/kafka-4.0.x/src/test/scala/monix/kafka/MonixKafkaTopicListTest.scala new file mode 100644 index 00000000..bcf4fa67 --- /dev/null +++ b/kafka-4.0.x/src/test/scala/monix/kafka/MonixKafkaTopicListTest.scala @@ -0,0 +1,157 @@ +/* + * Copyright (c) 2014-2019 by its authors. Some rights reserved. 
+ * See the project homepage at: https://github.com/monix/monix-kafka + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package monix.kafka + +import monix.eval.Task +import monix.execution.Scheduler.Implicits.global +import monix.kafka.config.AutoOffsetReset +import monix.reactive.Observable +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.clients.producer.ProducerRecord +import org.scalatest.FunSuite + +import scala.collection.JavaConverters._ +import scala.concurrent.Await +import scala.concurrent.duration._ + +class MonixKafkaTopicListTest extends FunSuite with KafkaTestKit { + val topicName = "monix-kafka-tests" + + val producerCfg = KafkaProducerConfig.default.copy( + bootstrapServers = List("127.0.0.1:6001"), + clientId = "monix-kafka-1-0-producer-test" + ) + + val consumerCfg = KafkaConsumerConfig.default.copy( + bootstrapServers = List("127.0.0.1:6001"), + groupId = "kafka-tests", + clientId = "monix-kafka-1-0-consumer-test", + autoOffsetReset = AutoOffsetReset.Earliest + ) + + test("publish one message when subscribed to topics list") { + + withRunningKafka { + val producer = KafkaProducer[String, String](producerCfg, io) + + val consumerTask = + KafkaConsumerObservable.createConsumer[String, String](consumerCfg, List(topicName)).executeOn(io) + val consumer = Await.result(consumerTask.runToFuture, 60.seconds) + + try { + // Publishing one message + val send = producer.send(topicName, "my-message") + Await.result(send.runToFuture, 30.seconds) + + val records = consumer.poll(java.time.Duration.ofMillis(10.seconds.toMillis)).asScala.map(_.value()).toList + assert(records === List("my-message")) + } finally { + Await.result(producer.close().runToFuture, Duration.Inf) + consumer.close() + } + } + } + + test("listen for one message when subscribed to topics list") { + withRunningKafka { + val producer = KafkaProducer[String, String](producerCfg, io) + val consumer = KafkaConsumerObservable[String, String](consumerCfg, List(topicName)).executeOn(io) + try { + // Publishing one message + val send = producer.send(topicName, "test-message") + Await.result(send.runToFuture, 30.seconds) + + val first = consumer.take(1).map(_.value()).firstL + val result = Await.result(first.runToFuture, 30.seconds) + assert(result === "test-message") + } finally { + Await.result(producer.close().runToFuture, Duration.Inf) + } + } + } + + test("full producer/consumer test when subscribed to topics list") { + withRunningKafka { + val count = 10000 + + val producer = KafkaProducerSink[String, String](producerCfg, io) + val consumer = KafkaConsumerObservable[String, String](consumerCfg, List(topicName)).executeOn(io).take(count) + + val pushT = Observable + .range(0, count) + .map(msg => new ProducerRecord(topicName, "obs", msg.toString)) + .bufferIntrospective(1024) + .consumeWith(producer) + + val listT = consumer + .map(_.value()) + .toListL + + val (result, _) = Await.result(Task.parZip2(listT, pushT).runToFuture, 60.seconds) + assert(result.map(_.toInt).sum === (0 until 
count).sum) + } + } + + test("manual commit consumer test when subscribed to topics list") { + withRunningKafka { + + val count = 10000 + val topicName = "monix-kafka-manual-commit-tests" + + val producer = KafkaProducerSink[String, String](producerCfg, io) + val consumer = KafkaConsumerObservable.manualCommit[String, String](consumerCfg, List(topicName)) + + val pushT = Observable + .range(0, count) + .map(msg => new ProducerRecord(topicName, "obs", msg.toString)) + .bufferIntrospective(1024) + .consumeWith(producer) + + val listT = consumer + .executeOn(io) + .bufferTumbling(count) + .map { messages => messages.map(_.record.value()) -> CommittableOffsetBatch(messages.map(_.committableOffset)) } + .mapEval { case (values, batch) => Task.shift *> batch.commitSync().map(_ => values -> batch.offsets) } + .headL + + val ((result, offsets), _) = + Await.result(Task.parZip2(listT, pushT).runToFuture, 60.seconds) + + val properOffsets = Map(new TopicPartition(topicName, 0) -> 10000) + assert(result.map(_.toInt).sum === (0 until count).sum && offsets === properOffsets) + } + } + + test("publish to closed producer when subscribed to topics list") { + withRunningKafka { + val producer = KafkaProducer[String, String](producerCfg, io) + val sendTask = producer.send(topicName, "test-message") + + val result = for { + //Force creation of producer + s1 <- producer.send(topicName, "test-message-1") + res <- Task.parZip2(producer.close(), Task.parSequence(List.fill(10)(sendTask)).attempt) + (_, s2) = res + s3 <- sendTask + } yield (s1, s2, s3) + + val (first, second, third) = Await.result(result.runToFuture, 60.seconds) + assert(first.isDefined && second.isRight && third.isEmpty) + } + } +} diff --git a/kafka-4.0.x/src/test/scala/monix/kafka/MonixKafkaTopicRegexTest.scala b/kafka-4.0.x/src/test/scala/monix/kafka/MonixKafkaTopicRegexTest.scala new file mode 100644 index 00000000..419ab5b3 --- /dev/null +++ b/kafka-4.0.x/src/test/scala/monix/kafka/MonixKafkaTopicRegexTest.scala @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2014-2019 by its authors. Some rights reserved. + * See the project homepage at: https://github.com/monix/monix-kafka + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package monix.kafka + +import monix.eval.Task +import monix.execution.Scheduler.Implicits.global +import monix.kafka.config.AutoOffsetReset +import monix.reactive.Observable +import org.apache.kafka.clients.producer.ProducerRecord +import org.scalatest.FunSuite + +import scala.collection.JavaConverters._ +import scala.concurrent.Await +import scala.concurrent.duration._ + +class MonixKafkaTopicRegexTest extends FunSuite with KafkaTestKit { + val topicsRegex = "monix-kafka-tests-.*".r + val topicMatchingRegex = "monix-kafka-tests-anything" + + val producerCfg = KafkaProducerConfig.default.copy( + bootstrapServers = List("127.0.0.1:6001"), + clientId = "monix-kafka-1-0-producer-test" + ) + + val consumerCfg = KafkaConsumerConfig.default.copy( + bootstrapServers = List("127.0.0.1:6001"), + groupId = "kafka-tests", + clientId = "monix-kafka-1-0-consumer-test", + autoOffsetReset = AutoOffsetReset.Earliest + ) + + test("publish one message when subscribed to topics regex") { + withRunningKafka { + + val producer = KafkaProducer[String, String](producerCfg, io) + val consumerTask = KafkaConsumerObservable.createConsumer[String, String](consumerCfg, topicsRegex).executeOn(io) + val consumer = Await.result(consumerTask.runToFuture, 60.seconds) + + try { + // Publishing one message + val send = producer.send(topicMatchingRegex, "my-message") + Await.result(send.runToFuture, 30.seconds) + + val records = consumer.poll(java.time.Duration.ofMillis(10.seconds.toMillis)).asScala.map(_.value()).toList + assert(records === List("my-message")) + } finally { + Await.result(producer.close().runToFuture, Duration.Inf) + consumer.close() + } + } + } + + test("listen for one message when subscribed to topics regex") { + withRunningKafka { + val producer = KafkaProducer[String, String](producerCfg, io) + val consumer = KafkaConsumerObservable[String, String](consumerCfg, topicsRegex).executeOn(io) + try { + // Publishing one message + val send = producer.send(topicMatchingRegex, "test-message") + Await.result(send.runToFuture, 30.seconds) + + val first = consumer.take(1).map(_.value()).firstL + val result = Await.result(first.runToFuture, 30.seconds) + assert(result === "test-message") + } finally { + Await.result(producer.close().runToFuture, Duration.Inf) + } + } + } + + test("full producer/consumer test when subscribed to topics regex") { + withRunningKafka { + val count = 10000 + + val producer = KafkaProducerSink[String, String](producerCfg, io) + val consumer = KafkaConsumerObservable[String, String](consumerCfg, topicsRegex).executeOn(io).take(count) + + val pushT = Observable + .range(0, count) + .map(msg => new ProducerRecord(topicMatchingRegex, "obs", msg.toString)) + .bufferIntrospective(1024) + .consumeWith(producer) + + val listT = consumer + .map(_.value()) + .toListL + + val (result, _) = Await.result(Task.parZip2(listT, pushT).runToFuture, 60.seconds) + assert(result.map(_.toInt).sum === (0 until count).sum) + } + } +} diff --git a/kafka-4.0.x/src/test/scala/monix/kafka/PollHeartbeatTest.scala b/kafka-4.0.x/src/test/scala/monix/kafka/PollHeartbeatTest.scala new file mode 100644 index 00000000..2be9232e --- /dev/null +++ b/kafka-4.0.x/src/test/scala/monix/kafka/PollHeartbeatTest.scala @@ -0,0 +1,236 @@ +package monix.kafka + +import monix.eval.Task +import monix.kafka.config.AutoOffsetReset +import monix.reactive.Observable +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.TopicPartition +import monix.execution.Scheduler.Implicits.global +import 
org.scalactic.source +import org.scalatest.FunSuite +import org.scalatest.concurrent.ScalaFutures + +import scala.concurrent.duration._ + +class PollHeartbeatTest extends FunSuite with KafkaTestKit with ScalaFutures { + + val topicName = "monix-kafka-tests" + + override implicit val patienceConfig: PatienceConfig = PatienceConfig(30.seconds, 100.milliseconds) + + val producerCfg: KafkaProducerConfig = KafkaProducerConfig.default.copy( + bootstrapServers = List("127.0.0.1:6001"), + clientId = "monix-kafka-1-0-producer-test" + ) + + val consumerCfg: KafkaConsumerConfig = KafkaConsumerConfig.default.copy( + bootstrapServers = List("127.0.0.1:6001"), + groupId = "kafka-tests", + clientId = "monix-kafka-1-0-consumer-test", + autoOffsetReset = AutoOffsetReset.Earliest + ) + + test("slow downstream with small poll heartbeat and manual async commit keeps the consumer assignment") { + withRunningKafka { + + val count = 250 + val topicName = "monix-kafka-manual-commit-tests" + val delay = 200.millis + val pollHeartbeat = 2.millis + val fastPollHeartbeatConfig = + consumerCfg.copy(maxPollInterval = 200.millis, maxPollRecords = 1).withPollHeartBeatRate(pollHeartbeat) + + val producer = KafkaProducer[String, String](producerCfg, io) + val consumer = KafkaConsumerObservable.manualCommit[String, String](fastPollHeartbeatConfig, List(topicName)) + + val pushT = Observable + .fromIterable(1 to count) + .map(msg => new ProducerRecord(topicName, "obs", msg.toString)) + .mapEval(producer.send) + .lastL + + val listT = consumer + .executeOn(io) + .doOnNextF { committableMessage => + Task.sleep(delay) *> + CommittableOffsetBatch(Seq(committableMessage.committableOffset)).commitAsync().executeAsync + } + .take(count) + .toListL + + val (committableMessages, _) = Task.parZip2(listT.executeAsync, pushT.executeAsync).runSyncUnsafe(100.seconds) + val CommittableMessage(lastRecord, lastCommittableOffset) = committableMessages.last + assert(pollHeartbeat * 10 < delay) + assert((1 to count).sum === committableMessages.map(_.record.value().toInt).sum) + assert(lastRecord.value().toInt === count) + assert(count === lastCommittableOffset.offset) + assert(new TopicPartition(topicName, 0) === lastCommittableOffset.topicPartition) + } + } + + test("auto committable consumer with slow processing doesn't cause rebalancing") { + withRunningKafka { + val count = 10000 + + val consumerConfig = consumerCfg.copy( + maxPollInterval = 200.millis, + heartbeatInterval = 10.millis, + maxPollRecords = 1 + ) + + val producer = KafkaProducerSink[String, String](producerCfg, io) + val consumer = KafkaConsumerObservable[String, String](consumerConfig, List(topicName)).executeOn(io) + + val pushT = Observable + .range(0, count) + .map(msg => new ProducerRecord(topicName, "obs", msg.toString)) + .bufferIntrospective(1024) + .consumeWith(producer) + + val listT = consumer + .take(count) + .map(_.value()) + .bufferTumbling(count / 4) + .mapEval(s => Task.sleep(2.second) >> Task.delay(s)) + .flatMap(Observable.fromIterable) + .toListL + + val (result, _) = Task.parZip2(listT.executeAsync, pushT.delayExecution(1.second).executeAsync).runSyncUnsafe() + assert(result.map(_.toInt).sum === (0 until count).sum) + } + } + + test("slow committable downstream with small poll heartbeat does not cause rebalancing") { + withRunningKafka { + val totalRecords = 1000 + val topicName = "monix-kafka-manual-commit-tests" + val downstreamLatency = 40.millis + val pollHeartbeat = 1.millis + val maxPollInterval = 10.millis + val maxPollRecords = 1 + val 
fastPollHeartbeatConfig = + consumerCfg + .copy(maxPollInterval = 200.millis, maxPollRecords = maxPollRecords) + .withPollHeartBeatRate(pollHeartbeat) + + val producer = KafkaProducer[String, String](producerCfg, io) + val consumer = KafkaConsumerObservable.manualCommit[String, String](fastPollHeartbeatConfig, List(topicName)) + + val pushT = Observable + .fromIterable(1 to totalRecords) + .map(msg => new ProducerRecord(topicName, "obs", msg.toString)) + .mapEval(producer.send) + .lastL + + val listT = consumer + .executeOn(io) + .mapEvalF { committableMessage => + val manualCommit = Task + .defer(committableMessage.committableOffset.commitAsync()) + .as(committableMessage) + Task.sleep(downstreamLatency) *> manualCommit + } + .take(totalRecords) + .toListL + + val (committableMessages, _) = + Task.parZip2(listT.executeAsync, pushT.delayExecution(100.millis).executeAsync).runSyncUnsafe() + val CommittableMessage(lastRecord, lastCommittableOffset) = committableMessages.last + assert(pollHeartbeat * 10 < downstreamLatency) + assert(pollHeartbeat < maxPollInterval) + assert(maxPollInterval < downstreamLatency) + assert((1 to totalRecords).sum === committableMessages.map(_.record.value().toInt).sum) + assert(lastRecord.value().toInt === totalRecords) + assert(totalRecords === lastCommittableOffset.offset) + assert(new TopicPartition(topicName, 0) === lastCommittableOffset.topicPartition) + } + } + + //unhappy scenario + test("slow committable downstream with small `maxPollInterval` and high `pollHeartBeat` causes consumer rebalance") { + withRunningKafka { + val totalRecords = 200 + val topicName = "monix-kafka-manual-commit-tests" + val downstreamLatency = 2.seconds + val pollHeartbeat = 15.seconds + val maxPollInterval = 100.millis + val fastPollHeartbeatConfig = + consumerCfg.copy(maxPollInterval = maxPollInterval, maxPollRecords = 1).withPollHeartBeatRate(pollHeartbeat) + + val producer = KafkaProducer[String, String](producerCfg, io) + val consumer = KafkaConsumerObservable.manualCommit[String, String](fastPollHeartbeatConfig, List(topicName)) + + val pushT = Observable + .fromIterable(1 to totalRecords) + .map(msg => new ProducerRecord(topicName, "obs", msg.toString)) + .mapEval(producer.send) + .lastL + + val listT = consumer + .executeOn(io) + .mapEvalF { committableMessage => + val manualCommit = Task + .defer(committableMessage.committableOffset.commitAsync()) + .as(committableMessage) + Task.sleep(downstreamLatency) *> manualCommit + } + .take(totalRecords) + .toListL + + assert(pollHeartbeat > downstreamLatency) + assert(maxPollInterval < downstreamLatency) + assert(fastPollHeartbeatConfig.pollHeartbeatRate === pollHeartbeat) + + val t = Task.parZip2(listT.executeAsync, pushT.executeAsync).map(_._1) + whenReady(t.runToFuture.failed) { ex => + assert(ex.getMessage.contains("the group has already rebalanced and assigned the partitions to another member")) + } + + } + } + + test("super slow committable downstream causes consumer rebalance") { + withRunningKafka { + val totalRecords = 3 + val topicName = "monix-kafka-manual-commit-tests" + val downstreamLatency = 55.seconds + val pollHeartbeat = 5.seconds + val maxPollInterval = 4.seconds + // the downstreamLatency is much higher than the `maxPollInterval`, + // and the `pollHeartBeat` is also bigger than the `maxPollInterval`, + // so kafka will trigger a rebalance + // and the consumer will be kicked out of the consumer group.
+ val fastPollHeartbeatConfig = + consumerCfg.copy(maxPollInterval = maxPollInterval, maxPollRecords = 1).withPollHeartBeatRate(pollHeartbeat) + + val producer = KafkaProducer[String, String](producerCfg, io) + val consumer = KafkaConsumerObservable.manualCommit[String, String](fastPollHeartbeatConfig, List(topicName)) + + val pushT = Observable + .fromIterable(1 to totalRecords) + .map(msg => new ProducerRecord(topicName, "obs", msg.toString)) + .mapEval(producer.send) + .lastL + + val listT = consumer + .executeOn(io) + .doOnNextF { committableMessage => + val manualCommit = Task.defer(committableMessage.committableOffset.commitAsync()) + Task.sleep(downstreamLatency) *> manualCommit + } + .take(totalRecords) + .toListL + + assert(pollHeartbeat * 10 < downstreamLatency) + assert(maxPollInterval * 10 < downstreamLatency) + assert(fastPollHeartbeatConfig.pollHeartbeatRate === pollHeartbeat) + + implicit val patienceConfig: PatienceConfig = PatienceConfig(30.seconds, 100.milliseconds) + + val t = Task.parZip2(listT.executeAsync, pushT.executeAsync).map(_._1) + whenReady(t.runToFuture.failed) { ex => + assert(ex.getMessage.contains("the group has already rebalanced and assigned the partitions to another member")) + }(PatienceConfig(200.seconds, 1.seconds), source.Position.here) + } + } + +} diff --git a/kafka-4.0.x/src/test/scala/monix/kafka/SerializationTest.scala b/kafka-4.0.x/src/test/scala/monix/kafka/SerializationTest.scala new file mode 100644 index 00000000..1a80c91d --- /dev/null +++ b/kafka-4.0.x/src/test/scala/monix/kafka/SerializationTest.scala @@ -0,0 +1,135 @@ +package monix.kafka + +import java.util + +import monix.eval.Task +import monix.kafka.config.AutoOffsetReset +import monix.reactive.Observable +import org.apache.kafka.clients.producer.ProducerRecord +import org.scalatest.FunSuite +import org.apache.kafka.common.serialization.{Serializer => KafkaSerializer} +import org.apache.kafka.common.serialization.{Deserializer => KafkaDeserializer} + +import scala.concurrent.duration._ +import scala.concurrent.Await +import monix.execution.Scheduler.Implicits.global +import monix.execution.exceptions.DummyException + +class SerializationTest extends FunSuite with KafkaTestKit { + + val producerCfg = KafkaProducerConfig.default.copy( + bootstrapServers = List("127.0.0.1:6001"), + clientId = "monix-kafka-1-0-serialization-test" + ) + + val consumerCfg = KafkaConsumerConfig.default.copy( + bootstrapServers = List("127.0.0.1:6001"), + groupId = "kafka-tests", + clientId = "monix-kafka-1-0-serialization-test", + autoOffsetReset = AutoOffsetReset.Earliest + ) + + test("serialization/deserialization using kafka.common.serialization") { + withRunningKafka { + val topicName = "monix-kafka-serialization-tests" + val count = 10000 + + implicit val serializer: KafkaSerializer[A] = new ASerializer + implicit val deserializer: KafkaDeserializer[A] = new ADeserializer + + val producer = KafkaProducerSink[String, A](producerCfg, io) + val consumer = KafkaConsumerObservable[String, A](consumerCfg, List(topicName)).executeOn(io).take(count) + + val pushT = Observable + .range(0, count) + .map(msg => new ProducerRecord(topicName, "obs", A(msg.toString))) + .bufferIntrospective(1024) + .consumeWith(producer) + + val listT = consumer + .map(_.value()) + .toListL + + val (result, _) = Await.result(Task.parZip2(listT, pushT).runToFuture, 60.seconds) + assert(result.map(_.value.toInt).sum === (0 until count).sum) + } + } + + test("allow to fail the stream if serialization throws") { + withRunningKafka { + 
val topicName = "monix-kafka-serialization-failing-tests" + val dummy = DummyException("boom") + + implicit val serializer: KafkaSerializer[A] = new AFailingSerializer + + val producer = KafkaProducerSink[String, A](producerCfg, io, (_: Throwable) => Task.raiseError(dummy)) + + val pushT = Observable + .evalOnce(new ProducerRecord(topicName, "obs", A(1.toString))) + .bufferIntrospective(1024) + .consumeWith(producer) + + assertThrows[DummyException] { + Await.result(pushT.runToFuture, 60.seconds) + } + } + } + + test("allow to recover from serialization errors") { + withRunningKafka { + val topicName = "monix-kafka-serialization-continuing-tests" + val count = 100 + + implicit val serializer: KafkaSerializer[A] = new AHalfFailingSerializer + implicit val deserializer: KafkaDeserializer[A] = new ADeserializer + + val producer = KafkaProducerSink[String, A](producerCfg, io) + val consumer = KafkaConsumerObservable[String, A](consumerCfg, List(topicName)).executeOn(io).take(count / 2) + + val pushT = Observable + .range(0, count) + .map(msg => new ProducerRecord(topicName, "obs", A(msg.toString))) + .bufferIntrospective(1024) + .consumeWith(producer) + + val listT = consumer + .map(_.value()) + .toListL + + val (result, _) = Await.result(Task.parZip2(listT, pushT).runToFuture, 70.seconds) + assert(result.map(_.value.toInt).sum === (0 until count).filter(_ % 2 == 0).sum) + } + } + +} + +case class A(value: String) + +class ASerializer extends KafkaSerializer[A] { + override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = () + + override def serialize(topic: String, data: A): Array[Byte] = + if (data == null) null else data.value.getBytes + + override def close(): Unit = () +} + +class ADeserializer extends KafkaDeserializer[A] { + override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = () + + override def deserialize(topic: String, data: Array[Byte]): A = if (data == null) null else A(new String(data)) + + override def close(): Unit = () +} + +class AFailingSerializer extends ASerializer { + override def serialize(topic: String, data: A): Array[Byte] = throw new RuntimeException("fail") +} + +class AHalfFailingSerializer extends ASerializer { + + override def serialize(topic: String, data: A): Array[Byte] = { + if (data.value.toInt % 2 == 0) super.serialize(topic, data) + else throw new RuntimeException("fail") + } +} diff --git a/kafka-4.0.x/src/test/scala/monix/kafka/package.scala b/kafka-4.0.x/src/test/scala/monix/kafka/package.scala new file mode 100644 index 00000000..b7b91f4d --- /dev/null +++ b/kafka-4.0.x/src/test/scala/monix/kafka/package.scala @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2014-2019 by its authors. Some rights reserved. + * See the project homepage at: https://github.com/monix/monix-kafka + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package monix + +import monix.execution.Scheduler + +package object kafka { + + /** I/O scheduler meant for tests. */ + lazy val io = Scheduler.io("monix-kafka-tests") +}
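A note on the two config ADTs added under `monix/kafka/config`: both companion objects translate the raw strings taken from `security.protocol` and `ssl.enabled.protocols` into typed values and reject anything else with `ConfigException.BadValue`, the SSL variant additionally validating the protocol against `SSLContext.getInstance`. The snippet below is a minimal sketch of that behaviour; the object name is made up for illustration and is not part of the patch.

```scala
import com.typesafe.config.ConfigException.BadValue
import monix.kafka.config.{SSLProtocol, SecurityProtocol}

object ConfigAdtExample extends App {
  // Valid identifiers resolve to the corresponding case objects.
  val security = SecurityProtocol("SASL_SSL") // SecurityProtocol.SASL_SSL
  val ssl      = SSLProtocol("TLSv1.2")       // SSLProtocol.TLSv12, checked via SSLContext.getInstance

  // Unknown identifiers are rejected with a BadValue pointing at the offending config key.
  try SecurityProtocol("PLAIN")
  catch { case e: BadValue => println(e.getMessage) }

  println(s"security=${security.id}, ssl=${ssl.id}")
}
```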
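For quick orientation, here is a produce/consume round trip condensed from `MonixKafkaTopicListTest` above, wired against a standalone broker instead of the embedded one used in the tests; the broker address, topic, group id and the object/scheduler names are illustrative placeholders rather than anything defined by this patch.

```scala
import monix.eval.Task
import monix.execution.Scheduler
import monix.kafka._
import monix.kafka.config.AutoOffsetReset
import monix.reactive.Observable
import org.apache.kafka.clients.producer.ProducerRecord

object Kafka4xRoundTrip extends App {
  // Blocking-friendly scheduler, mirroring the `io` scheduler used by the test suite.
  implicit val io: Scheduler = Scheduler.io("kafka-4x-example")

  val producerCfg = KafkaProducerConfig.default.copy(bootstrapServers = List("127.0.0.1:9092"))
  val consumerCfg = KafkaConsumerConfig.default.copy(
    bootstrapServers = List("127.0.0.1:9092"),
    groupId = "kafka-4x-example",
    autoOffsetReset = AutoOffsetReset.Earliest
  )

  // Producer side: buffer the stream and push it through the sink.
  val push = Observable
    .range(0, 10)
    .map(n => new ProducerRecord("example-topic", "key", n.toString))
    .bufferIntrospective(1024)
    .consumeWith(KafkaProducerSink[String, String](producerCfg, io))

  // Consumer side: read the ten values back as a list.
  val read = KafkaConsumerObservable[String, String](consumerCfg, List("example-topic"))
    .executeOn(io)
    .take(10)
    .map(_.value())
    .toListL

  val (values, _) = Task.parZip2(read, push).runSyncUnsafe()
  println(values)
}
```

The same pattern, extended with manual commits via `KafkaConsumerObservable.manualCommit` and `CommittableOffsetBatch`, is exercised in the manual-commit and poll-heartbeat tests above.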