Skip to content

Commit b29a9f2

Browse files
authored
Merge pull request #159 from bohemima/master
Add support to set queue options, either globally using queue.php or by implementing an interface
2 parents 47c04da + 9fe8378 commit b29a9f2

File tree

4 files changed

+96
-44
lines changed

4 files changed

+96
-44
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ The format is based on [Keep a Changelog][keepachangelog] and this project adher
77
### Added
88

99
- gRPC client support
10+
- Added support to set queue options globally or per job [#158]
1011

1112
## Unreleased
1213

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
<?php
2+
3+
namespace Spiral\RoadRunnerLaravel\Queue\Contract;
4+
5+
use Spiral\RoadRunner\Jobs\OptionsInterface;
6+
7+
interface HasQueueOptions
8+
{
9+
public function queueOptions(): OptionsInterface;
10+
}

src/Queue/RoadRunnerConnector.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ public function connect(array $config): Queue
2626
new Jobs($rpc),
2727
$rpc,
2828
$config['queue'],
29+
$config['options'] ?? [],
2930
);
3031
}
3132
}

src/Queue/RoadRunnerQueue.php

Lines changed: 84 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,22 @@
1212
use RoadRunner\Jobs\DTO\V1\Stats;
1313
use Spiral\Goridge\RPC\RPCInterface;
1414
use Spiral\RoadRunner\Jobs\Jobs;
15+
use Spiral\RoadRunner\Jobs\KafkaOptions;
16+
use Spiral\RoadRunner\Jobs\Options;
17+
use Spiral\RoadRunner\Jobs\OptionsInterface;
18+
use Spiral\RoadRunner\Jobs\Queue\Driver;
1519
use Spiral\RoadRunner\Jobs\QueueInterface;
20+
use Spiral\RoadRunnerLaravel\Queue\Contract\HasQueueOptions;
1621

1722
final class RoadRunnerQueue extends Queue implements QueueContract
1823
{
1924
public function __construct(
2025
private readonly Jobs $jobs,
2126
private readonly RPCInterface $rpc,
2227
private readonly string $default = 'default',
23-
) {}
28+
private readonly array $defaultOptions = [],
29+
) {
30+
}
2431

2532
public function push($job, $data = '', $queue = null): string
2633
{
@@ -29,13 +36,13 @@ public function push($job, $data = '', $queue = null): string
2936
$this->createPayload($job, $queue, $data),
3037
$queue,
3138
null,
32-
fn($payload, $queue) => $this->pushRaw($payload, $queue),
39+
fn($payload, $queue) => $this->pushRaw($payload, $queue, $this->getJobOverrideOptions($job)),
3340
);
3441
}
3542

3643
public function pushRaw($payload, $queue = null, array $options = []): string
3744
{
38-
$queue = $this->getQueue($queue);
45+
$queue = $this->getQueue($queue, $options);
3946

4047
$task = $queue->dispatch(
4148
$queue
@@ -45,40 +52,74 @@ public function pushRaw($payload, $queue = null, array $options = []): string
4552
return $task->getId();
4653
}
4754

48-
public function later($delay, $job, $data = '', $queue = null): string
55+
private function getQueue(?string $queue = null, array $options = []): QueueInterface
4956
{
50-
return $this->enqueueUsing(
51-
$job,
52-
$this->createPayload($job, $queue, $data),
53-
$queue,
54-
$delay,
55-
fn($payload, $queue) => $this->laterRaw($delay, $payload, $queue),
56-
);
57+
$queue = $this->jobs->connect($queue ?? $this->default, $this->getQueueOptions($options));
58+
59+
if (!$this->getStats($queue->getName())->getReady()) {
60+
$queue->resume();
61+
}
62+
63+
return $queue;
5764
}
5865

59-
public function pop($queue = null): void
66+
private function getQueueOptions(array $overrides = []): OptionsInterface
6067
{
61-
throw new \BadMethodCallException('Pop is not supported');
68+
$config = array_merge($this->defaultOptions, $overrides);
69+
$options = new Options(
70+
$config['delay'] ?? OptionsInterface::DEFAULT_DELAY,
71+
$config['priority'] ?? OptionsInterface::DEFAULT_PRIORITY,
72+
$config['auto_ack'] ?? OptionsInterface::DEFAULT_AUTO_ACK,
73+
);
74+
75+
return match ($config['driver'] ?? null) {
76+
Driver::Kafka => KafkaOptions::from($options)
77+
->withTopic($config['topic'] ?? ($this->defaultOptions['topic'] ?? '')),
78+
default => $options,
79+
};
6280
}
6381

64-
public function size($queue = null): int
82+
private function getStats(?string $queue = null): Stat
6583
{
66-
$stats = $this->getStats($queue);
84+
$queue ??= $this->default;
6785

68-
return $stats->getActive() + $stats->getDelayed();
86+
$stats = $this->rpc->call('jobs.Stat', new Stats(), Stats::class)->getStats();
87+
88+
/** @var Stat $stat */
89+
foreach ($stats as $stat) {
90+
if ($stat->getPipeline() === $queue) {
91+
return $stat;
92+
}
93+
}
94+
95+
return new Stat();
6996
}
7097

71-
/**
72-
* Get the "available at" UNIX timestamp.
73-
* @param mixed $delay
74-
*/
75-
protected function availableAt($delay = 0): int
98+
private function getJobOverrideOptions(string|object $job): array
7699
{
77-
$delay = $this->parseDateInterval($delay);
100+
if (is_string($job) && class_exists($job)) {
101+
$job = app($job);
102+
}
78103

79-
return $delay instanceof \DateTimeInterface
80-
? Carbon::parse($delay)->diffInSeconds()
81-
: $delay;
104+
if ($job instanceof HasQueueOptions) {
105+
$options = $job->queueOptions();
106+
if ($options instanceof Options) {
107+
return $options->toArray();
108+
}
109+
}
110+
111+
return [];
112+
}
113+
114+
public function later($delay, $job, $data = '', $queue = null): string
115+
{
116+
return $this->enqueueUsing(
117+
$job,
118+
$this->createPayload($job, $queue, $data),
119+
$queue,
120+
$delay,
121+
fn($payload, $queue) => $this->laterRaw($delay, $payload, $queue, $this->getJobOverrideOptions($job)),
122+
);
82123
}
83124

84125
/**
@@ -88,8 +129,9 @@ private function laterRaw(
88129
\DateTimeInterface|\DateInterval|int $delay,
89130
array $payload,
90131
?string $queue = null,
132+
array $options = []
91133
): string {
92-
$queue = $this->getQueue($queue);
134+
$queue = $this->getQueue($queue, $options);
93135

94136
$task = $queue->dispatch(
95137
$queue
@@ -101,30 +143,28 @@ private function laterRaw(
101143
return $task->getId();
102144
}
103145

104-
private function getQueue(?string $queue = null): QueueInterface
146+
/**
147+
* Get the "available at" UNIX timestamp.
148+
* @param mixed $delay
149+
*/
150+
protected function availableAt($delay = 0): int
105151
{
106-
$queue = $this->jobs->connect($queue ?? $this->default);
107-
108-
if (!$this->getStats($queue->getName())->getReady()) {
109-
$queue->resume();
110-
}
152+
$delay = $this->parseDateInterval($delay);
111153

112-
return $queue;
154+
return $delay instanceof \DateTimeInterface
155+
? Carbon::parse($delay)->diffInSeconds()
156+
: $delay;
113157
}
114158

115-
private function getStats(?string $queue = null): Stat
159+
public function pop($queue = null): void
116160
{
117-
$queue ??= $this->default;
118-
119-
$stats = $this->rpc->call('jobs.Stat', new Stats(), Stats::class)->getStats();
161+
throw new \BadMethodCallException('Pop is not supported');
162+
}
120163

121-
/** @var Stat $stat */
122-
foreach ($stats as $stat) {
123-
if ($stat->getPipeline() === $queue) {
124-
return $stat;
125-
}
126-
}
164+
public function size($queue = null): int
165+
{
166+
$stats = $this->getStats($queue);
127167

128-
return new Stat();
168+
return $stats->getActive() + $stats->getDelayed();
129169
}
130170
}

0 commit comments

Comments
 (0)