@@ -63,6 +63,11 @@ final class KafkaConsumerBuilder implements KafkaConsumerBuilderInterface
6363 */
6464 private $ consumeCallback ;
6565
66+ /**
67+ * @var callable
68+ */
69+ private $ logCallback ;
70+
6671 /**
6772 * @var callable
6873 */
@@ -247,6 +252,20 @@ public function withConsumeCallback(callable $consumeCallback): KafkaConsumerBui
247252 return $ that ;
248253 }
249254
255+ /**
256+ * Callback for log related events
257+ *
258+ * @param callable $logCallback
259+ * @return KafkaConsumerBuilderInterface
260+ */
261+ public function withLogCallback (callable $ logCallback ): KafkaConsumerBuilderInterface
262+ {
263+ $ that = clone $ this ;
264+ $ that ->logCallback = $ logCallback ;
265+
266+ return $ that ;
267+ }
268+
250269 /**
251270 * Set callback that is being called on offset commits
252271 *
@@ -293,7 +312,6 @@ public function build(): KafkaConsumerInterface
293312
294313 //set additional config
295314 $ this ->config ['group.id ' ] = $ this ->consumerGroup ;
296- $ this ->config ['enable.auto.offset.store ' ] = false ;
297315
298316 //create config
299317 $ kafkaConfig = new KafkaConfiguration (
@@ -309,6 +327,7 @@ public function build(): KafkaConsumerInterface
309327 //create RdConsumer
310328
311329 if (self ::CONSUMER_TYPE_LOW_LEVEL === $ this ->consumerType ) {
330+ $ this ->config ['enable.auto.offset.store ' ] = false ;
312331 if (null !== $ this ->consumeCallback ) {
313332 throw new KafkaConsumerBuilderException (
314333 sprintf (
@@ -328,6 +347,8 @@ public function build(): KafkaConsumerInterface
328347 );
329348 }
330349
350+ $ this ->config ['enable.auto.commit ' ] = false ;
351+
331352 $ rdKafkaConsumer = new RdKafkaHighLevelConsumer ($ kafkaConfig );
332353
333354 return new KafkaHighLevelConsumer ($ rdKafkaConsumer , $ kafkaConfig , $ this ->decoder );
@@ -349,6 +370,10 @@ private function registerCallbacks(KafkaConfiguration $conf): void
349370 $ conf ->setConsumeCb ($ this ->consumeCallback );
350371 }
351372
373+ if (null !== $ this ->logCallback ) {
374+ $ conf ->setLogCb ($ this ->logCallback );
375+ }
376+
352377 if (null !== $ this ->offsetCommitCallback ) {
353378 $ conf ->setOffsetCommitCb ($ this ->rebalanceCallback );
354379 }
0 commit comments