Skip to content

Commit 27f6bdc

Browse files
authored
Merge pull request #19 from L3o-pold/handle_multiqueue_processing
Handle multi queue processing
2 parents eeb63e7 + d3d7975 commit 27f6bdc

File tree

3 files changed

+30
-14
lines changed

3 files changed

+30
-14
lines changed

src/LaravelQueueKafkaServiceProvider.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ protected function registerDependencies()
5252
});
5353

5454
$this->app->bind('queue.kafka.consumer', function ($app, $parameters) {
55-
return new \RdKafka\KafkaConsumer($parameters['conf']);
55+
return new \RdKafka\Consumer($parameters['conf']);
5656
});
5757
}
5858

src/Queue/Jobs/KafkaJob.php

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
use Illuminate\Support\Str;
1212
use Rapide\LaravelQueueKafka\Exceptions\QueueKafkaException;
1313
use Rapide\LaravelQueueKafka\Queue\KafkaQueue;
14+
use RdKafka\ConsumerTopic;
1415
use RdKafka\Message;
1516

1617
class KafkaJob extends Job implements JobContract
@@ -30,22 +31,29 @@ class KafkaJob extends Job implements JobContract
3031
*/
3132
protected $message;
3233

34+
/**
35+
* @var ConsumerTopic
36+
*/
37+
protected $topic;
38+
3339
/**
3440
* KafkaJob constructor.
3541
*
3642
* @param Container $container
3743
* @param KafkaQueue $connection
3844
* @param Message $message
39-
* @param $connectionName
40-
* @param $queue
45+
* @param string $connectionName
46+
* @param string $queue
47+
* @param ConsumerTopic $topic
4148
*/
42-
public function __construct(Container $container, KafkaQueue $connection, Message $message, $connectionName, $queue)
49+
public function __construct(Container $container, KafkaQueue $connection, Message $message, $connectionName, $queue, ConsumerTopic $topic)
4350
{
4451
$this->container = $container;
4552
$this->connection = $connection;
4653
$this->message = $message;
4754
$this->connectionName = $connectionName;
4855
$this->queue = $queue;
56+
$this->topic = $topic;
4957
}
5058

5159
/**
@@ -102,7 +110,7 @@ public function delete()
102110
{
103111
try {
104112
parent::delete();
105-
$this->connection->getConsumer()->commitAsync($this->message);
113+
$this->topic->offsetStore($this->message->partition, $this->message->offset);
106114
} catch (\RdKafka\Exception $exception) {
107115
throw new QueueKafkaException('Could not delete job from the queue', 0, $exception);
108116
}

src/Queue/KafkaQueue.php

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,20 +33,24 @@ class KafkaQueue extends Queue implements QueueContract
3333
*/
3434
private $producer;
3535
/**
36-
* @var \RdKafka\KafkaConsumer
36+
* @var \RdKafka\Consumer
3737
*/
3838
private $consumer;
3939
/**
4040
* @var array
4141
*/
42-
private $subscribedQueueNames = [];
42+
private $topics = [];
43+
/**
44+
* @var array
45+
*/
46+
private $queues = [];
4347

4448
/**
4549
* @param \RdKafka\Producer $producer
4650
* @param \RdKafka\KafkaConsumer $consumer
4751
* @param array $config
4852
*/
49-
public function __construct(\RdKafka\Producer $producer, \RdKafka\KafkaConsumer $consumer, $config)
53+
public function __construct(\RdKafka\Producer $producer, \RdKafka\Consumer $consumer, $config)
5054
{
5155
$this->defaultQueue = $config['queue'];
5256
$this->sleepOnError = isset($config['sleep_on_error']) ? $config['sleep_on_error'] : 5;
@@ -140,12 +144,16 @@ public function pop($queue = null)
140144
{
141145
try {
142146
$queue = $this->getQueueName($queue);
143-
if (!in_array($queue, $this->subscribedQueueNames)) {
144-
$this->subscribedQueueNames[] = $queue;
145-
$this->consumer->subscribe($this->subscribedQueueNames);
147+
if (!array_key_exists($queue, $this->queues)) {
148+
$this->queues[$queue] = $this->consumer->newQueue();
149+
$topicConf = new \RdKafka\TopicConf();
150+
$topicConf->set('auto.offset.reset', 'largest');
151+
152+
$this->topics[$queue] = $this->consumer->newTopic($queue, $topicConf);
153+
$this->topics[$queue]->consumeQueueStart(0, RD_KAFKA_OFFSET_STORED, $this->queues[$queue]);
146154
}
147155

148-
$message = $this->consumer->consume(1000);
156+
$message = $this->queues[$queue]->consume(1000);
149157

150158
if ($message === null) {
151159
return null;
@@ -155,7 +163,7 @@ public function pop($queue = null)
155163
case RD_KAFKA_RESP_ERR_NO_ERROR:
156164
return new KafkaJob(
157165
$this->container, $this, $message,
158-
$this->connectionName, $queue ?: $this->defaultQueue
166+
$this->connectionName, $queue ?: $this->defaultQueue, $this->topics[$queue]
159167
);
160168
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
161169
case RD_KAFKA_RESP_ERR__TIMED_OUT:
@@ -255,7 +263,7 @@ protected function reportConnectionError($action, Exception $e)
255263
}
256264

257265
/**
258-
* @return \RdKafka\KafkaConsumer
266+
* @return \RdKafka\Consumer
259267
*/
260268
public function getConsumer()
261269
{

0 commit comments

Comments
 (0)