Skip to content

Commit 27de828

Browse files
committed
Handle timeout cases properly: do nothing
1 parent 648b09f commit 27de828

File tree

3 files changed

+6
-14
lines changed

3 files changed

+6
-14
lines changed

src/Commands/KafkaConsumeCommand.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public function handle(): int
5050
return 1;
5151
}
5252

53-
$consumeTimeout = $processorData['consume_timeout'] ?? 5000;
53+
$consumeTimeout = $processorData['consume_timeout'] ?? 20000;
5454

5555
$supportedProcessorTypes = ['action', 'job'];
5656
$processorType = $processorData['type'] ?? 'action';

src/Exceptions/KafkaConsumerTimedOutException.php

Lines changed: 0 additions & 10 deletions
This file was deleted.

src/HighLevelConsumer.php

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
use Greensight\LaravelPhpRdKafka\KafkaManager;
66
use Greensight\LaravelPhpRdKafkaConsumer\Exceptions\KafkaConsumerException;
7-
use Greensight\LaravelPhpRdKafkaConsumer\Exceptions\KafkaConsumerTimedOutException;
87
use RdKafka\Exception as RdKafkaException;
98
use RdKafka\KafkaConsumer;
109
use RdKafka\Message;
@@ -17,7 +16,7 @@ class HighLevelConsumer
1716
public function __construct(
1817
protected string $topicName,
1918
?string $consumerName,
20-
protected int $consumeTimeout = 5000,
19+
protected int $consumeTimeout = 20000,
2120
)
2221
{
2322
$manager = resolve(KafkaManager::class);
@@ -37,13 +36,16 @@ public function listen(string $processorClassName, string $processorType, string
3736
$message = $this->consumer->consume($this->consumeTimeout);
3837

3938
switch ($message->err) {
39+
4040
case RD_KAFKA_RESP_ERR_NO_ERROR:
4141
$this->executeProcessor($processorClassName, $processorType, $processorQueue, $message);
4242
$this->consumer->commitAsync($message);
4343
break;
4444

4545
case RD_KAFKA_RESP_ERR__TIMED_OUT:
46-
throw new KafkaConsumerTimedOutException('Kafka error: ' . $message->errstr());
46+
// This also happens when there is no new messages in the topic after the specified timeout: https://github.com/arnaud-lb/php-rdkafka/issues/343
47+
// We cannot differentiate broker timeout, poll timeout and eof timeout and are forced to keep on polling as a result.
48+
// When kafka broker goes back online the connection will mostly likely be reestablished.
4749
break;
4850

4951
default:

0 commit comments

Comments
 (0)