Skip to content

Commit 6d58508

Browse files
authored
Merge pull request #5 from ensi-platform/task-82065
#82065 signals
2 parents 299169e + 327f2b5 commit 6d58508

File tree

4 files changed

+43
-3
lines changed

4 files changed

+43
-3
lines changed

README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,12 @@ class ConsumeMessageJob implements ShouldQueue
124124

125125
```
126126

127+
### Handling signals
128+
129+
`php artisan kafka:consume ...` command can be configured to gracefully stop after receiving some OS signals.
130+
Such signals can be set in the `stop_signals` key of the package config, e.g `'stop_signals' => [SIGINT, SIGQUIT]`.
131+
You can use any of the constants defined by the pcntl extension https://www.php.net/manual/en/pcntl.constants.php
132+
127133
## Testing
128134

129135
```bash

composer.json

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,10 @@
5050
"test-coverage": "vendor/bin/phpunit --coverage-html coverage"
5151
},
5252
"config": {
53-
"sort-packages": true
53+
"sort-packages": true,
54+
"allow-plugins": {
55+
"pestphp/pest-plugin": true
56+
}
5457
},
5558
"extra": {
5659
"laravel": {

src/Commands/KafkaConsumeCommand.php

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@
66
use Ensi\LaravelPhpRdKafkaConsumer\HighLevelConsumer;
77
use Ensi\LaravelPhpRdKafkaConsumer\ProcessorData;
88
use Illuminate\Console\Command;
9+
use Symfony\Component\Console\Command\SignalableCommandInterface;
910
use Throwable;
1011

11-
class KafkaConsumeCommand extends Command
12+
class KafkaConsumeCommand extends Command implements SignalableCommandInterface
1213
{
1314
/**
1415
* The name and signature of the console command.
@@ -26,11 +27,32 @@ class KafkaConsumeCommand extends Command
2627
*/
2728
protected $description = 'Consume concrete topic';
2829

30+
protected ?HighLevelConsumer $consumer = null;
31+
32+
public function getStopSignalsFromConfig(): array
33+
{
34+
return config('kafka-consumer.stop_signals', []);
35+
}
36+
37+
public function getSubscribedSignals(): array
38+
{
39+
return $this->getStopSignalsFromConfig();
40+
}
41+
42+
public function handleSignal(int $signal): void
43+
{
44+
if ($this->consumer && in_array($signal, $this->getStopSignalsFromConfig())) {
45+
$this->line("Stopping the consumer...");
46+
$this->consumer->forceStop();
47+
}
48+
}
49+
2950
/**
3051
* Execute the console command.
3152
*/
3253
public function handle(HighLevelConsumer $highLevelConsumer): int
3354
{
55+
$this->consumer = $highLevelConsumer;
3456
$topic = $this->argument('topic');
3557
$consumer = $this->argument('consumer');
3658
$availableConsumers = array_keys(config('kafka.consumers', []));

src/HighLevelConsumer.php

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ class HighLevelConsumer
1414
{
1515
protected ?KafkaConsumer $consumer;
1616

17+
protected $forceStop = false;
18+
1719
public function __construct(
1820
protected KafkaManager $kafkaManager,
1921
protected Pipeline $pipeline
@@ -29,6 +31,13 @@ public function for(?string $consumerName): static
2931
return $this;
3032
}
3133

34+
public function forceStop(): static
35+
{
36+
$this->forceStop = true;
37+
38+
return $this;
39+
}
40+
3241
/**
3342
* @throws KafkaException
3443
* @throws RdKafkaException
@@ -115,6 +124,6 @@ protected function shouldBeStopped(int|float $startTime, int $eventsProcessed, C
115124
return true;
116125
}
117126

118-
return false;
127+
return $this->forceStop;
119128
}
120129
}

0 commit comments

Comments
 (0)