Skip to content

Commit 8911a40

Browse files
authored
Ensure node.Close and node.RemoveWorker properly stop local jobs (#15)
Before they get requeued.
1 parent 4c3370b commit 8911a40

File tree

6 files changed

+42
-27
lines changed

6 files changed

+42
-27
lines changed

pool/README.md

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ Pulse uses the [Jump Consistent Hash](https://arxiv.org/abs/1406.2294) algorithm
2020
to assign jobs to workers which provides a good balance between load balancing
2121
and worker assignment stability.
2222

23-
2423
```mermaid
2524
%%{init: {'themeVariables': { 'edgeLabelBackground': '#7A7A7A'}}}%%
2625
flowchart LR
@@ -216,12 +215,12 @@ flowchart TD
216215
end
217216
pr --1. DispatchJob--> no
218217
no --2. Add Job--> js
219-
js -.3. Job.-> ps
218+
js --3. Job--> ps
220219
ps --4. Add Job--> ws
221-
ws -.5. Job.-> r
220+
ws --5. Job--> r
222221
r --6. Start Job--> u
223222
r --7. Add Ack--> rs
224-
rs -.7. Ack.-> nr
223+
rs --7. Ack--> nr
225224
nr --8. Ack Add Job Event--> js
226225
227226
classDef userCode fill:#9A6D1F, stroke:#D9B871, stroke-width:2px, color:#FFF2CC;

pool/node.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -526,7 +526,7 @@ func (node *Node) ackWorkerEvent(ctx context.Context, ev *streaming.Event) {
526526
}
527527

528528
// returnDispatchStatus returns the start job result to the caller.
529-
func (node *Node) returnDispatchStatus(ctx context.Context, ev *streaming.Event) {
529+
func (node *Node) returnDispatchStatus(_ context.Context, ev *streaming.Event) {
530530
node.lock.Lock()
531531
defer node.lock.Unlock()
532532

@@ -738,7 +738,7 @@ func (node *Node) deleteWorker(ctx context.Context, id string) error {
738738

739739
// workerStream retrieves the stream for a worker. It caches the result in the
740740
// workerStreams map. Caller is responsible for locking.
741-
func (node *Node) workerStream(ctx context.Context, id string) (*streaming.Stream, error) {
741+
func (node *Node) workerStream(_ context.Context, id string) (*streaming.Stream, error) {
742742
stream, ok := node.workerStreams[id]
743743
if !ok {
744744
s, err := streaming.NewStream(workerStreamName(id), node.rdb, soptions.WithStreamLogger(node.logger))

pool/node_test.go

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,13 @@ func TestRemoveWorkerThenShutdown(t *testing.T) {
7676
rdb = redis.NewClient(&redis.Options{Addr: "localhost:6379", Password: redisPwd})
7777
node = newTestNode(t, ctx, rdb, testName)
7878
worker = newTestWorker(t, ctx, node)
79+
handler = worker.handler.(*mockHandler)
7980
)
8081
defer cleanup(t, rdb, true, testName)
82+
assert.NoError(t, node.DispatchJob(ctx, testName, []byte("payload")))
83+
assert.Eventually(t, func() bool { return len(handler.jobs) == 1 }, max, delay)
8184
assert.NoError(t, node.RemoveWorker(ctx, worker))
85+
assert.Eventually(t, func() bool { return len(handler.jobs) == 0 }, max, delay)
8286
assert.NoError(t, node.Shutdown(ctx))
8387
}
8488

@@ -88,9 +92,14 @@ func TestClose(t *testing.T) {
8892
testName = strings.Replace(t.Name(), "/", "_", -1)
8993
rdb = redis.NewClient(&redis.Options{Addr: "localhost:6379", Password: redisPwd})
9094
node = newTestNode(t, ctx, rdb, testName)
95+
worker = newTestWorker(t, ctx, node)
96+
handler = worker.handler.(*mockHandler)
9197
)
9298
defer cleanup(t, rdb, false, testName)
99+
assert.NoError(t, node.DispatchJob(ctx, testName, []byte("payload")))
100+
assert.Eventually(t, func() bool { return len(handler.jobs) == 1 }, max, delay)
93101
assert.NoError(t, node.Close(ctx))
102+
assert.Eventually(t, func() bool { return len(handler.jobs) == 0 }, max, delay)
94103
}
95104

96105
func newTestNode(t *testing.T, ctx context.Context, rdb *redis.Client, name string) *Node {
@@ -106,11 +115,11 @@ func newTestNode(t *testing.T, ctx context.Context, rdb *redis.Client, name stri
106115

107116
func newTestWorker(t *testing.T, ctx context.Context, node *Node) *Worker {
108117
t.Helper()
109-
wm := &workerMock{jobs: make(map[string]*Job)}
110-
wm.startFunc = func(job *Job) error { wm.jobs[job.Key] = job; return nil }
111-
wm.stopFunc = func(key string) error { delete(wm.jobs, key); return nil }
112-
wm.notifyFunc = func(payload []byte) error { return nil }
113-
worker, err := node.AddWorker(ctx, wm)
118+
handler := &mockHandler{jobs: make(map[string]*Job)}
119+
handler.startFunc = func(job *Job) error { handler.jobs[job.Key] = job; return nil }
120+
handler.stopFunc = func(key string) error { delete(handler.jobs, key); return nil }
121+
handler.notifyFunc = func(payload []byte) error { return nil }
122+
worker, err := node.AddWorker(ctx, handler)
114123
require.NoError(t, err)
115124
return worker
116125
}
@@ -150,16 +159,16 @@ func cleanup(t *testing.T, rdb *redis.Client, checkClean bool, testName string)
150159
assert.NoError(t, rdb.FlushDB(ctx).Err())
151160
}
152161

153-
type workerMock struct {
162+
type mockHandler struct {
154163
startFunc func(job *Job) error
155164
stopFunc func(key string) error
156165
notifyFunc func(payload []byte) error
157166
jobs map[string]*Job
158167
}
159168

160-
func (w *workerMock) Start(job *Job) error { return w.startFunc(job) }
161-
func (w *workerMock) Stop(key string) error { return w.stopFunc(key) }
162-
func (w *workerMock) Notify(p []byte) error { return w.notifyFunc(p) }
169+
func (w *mockHandler) Start(job *Job) error { return w.startFunc(job) }
170+
func (w *mockHandler) Stop(key string) error { return w.stopFunc(key) }
171+
func (w *mockHandler) Notify(p []byte) error { return w.notifyFunc(p) }
163172

164173
// buffer is a goroutine safe bytes.Buffer
165174
type buffer struct {

pool/scheduler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ func (sched *scheduler) stopJobs(ctx context.Context, plan *JobPlan) error {
170170
}
171171

172172
// handleStop handles the scheduler stop signal.
173-
func (sched *scheduler) handleStop(ctx context.Context) {
173+
func (sched *scheduler) handleStop(_ context.Context) {
174174
ch := sched.jobMap.Subscribe()
175175
for ev := range ch {
176176
if ev == rmap.EventReset {

pool/worker.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,9 @@ func (w *Worker) handleEvents(c <-chan *streaming.Event) {
163163
case evStartJob:
164164
err = w.startJob(ctx, unmarshalJob(payload))
165165
case evStopJob:
166+
w.lock.Lock()
166167
err = w.stopJob(ctx, unmarshalJobKey(payload))
168+
w.lock.Unlock()
167169
case evNotify:
168170
key, payload := unmarshalNotification(payload)
169171
err = w.notify(ctx, key, payload)
@@ -229,7 +231,7 @@ func (w *Worker) stopAndWait(ctx context.Context) {
229231
}
230232

231233
// startJob starts a job.
232-
func (w *Worker) startJob(ctx context.Context, job *Job) error {
234+
func (w *Worker) startJob(_ context.Context, job *Job) error {
233235
w.lock.Lock()
234236
defer w.lock.Unlock()
235237
if w.stopped {
@@ -246,12 +248,8 @@ func (w *Worker) startJob(ctx context.Context, job *Job) error {
246248
}
247249

248250
// stopJob stops a job.
249-
func (w *Worker) stopJob(ctx context.Context, key string) error {
250-
w.lock.Lock()
251-
defer w.lock.Unlock()
252-
if w.stopped {
253-
return nil
254-
}
251+
// worker.lock must be held when calling this method.
252+
func (w *Worker) stopJob(_ context.Context, key string) error {
255253
if _, ok := w.jobs[key]; !ok {
256254
return fmt.Errorf("job %s not found", key)
257255
}
@@ -264,7 +262,7 @@ func (w *Worker) stopJob(ctx context.Context, key string) error {
264262
}
265263

266264
// notify notifies the worker with the given payload.
267-
func (w *Worker) notify(ctx context.Context, key string, payload []byte) error {
265+
func (w *Worker) notify(_ context.Context, key string, payload []byte) error {
268266
w.lock.Lock()
269267
defer w.lock.Unlock()
270268
if w.stopped {
@@ -335,6 +333,9 @@ func (w *Worker) requeueJobs(ctx context.Context) {
335333
w.lock.Lock()
336334
defer w.lock.Unlock()
337335
for _, job := range w.jobs {
336+
if err := w.stopJob(ctx, job.Key); err != nil {
337+
w.logger.Error(fmt.Errorf("failed to stop job %q: %w", job.Key, err))
338+
}
338339
if _, err := w.Node.poolStream.Add(ctx, evStartJob, marshalJob(job)); err != nil {
339340
w.logger.Error(fmt.Errorf("failed to requeue job %q: %w", job.Key, err))
340341
}

scripts/test

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,15 @@ pushd ${GIT_ROOT}
99

1010
staticcheck ./...
1111

12+
# If --force is passed, add --count=1 to the go test commands so cached results are bypassed and every test re-runs
13+
if [[ "$1" == "--force" ]]; then
14+
shift
15+
OPTIONS="--count=1"
16+
fi
17+
1218
# Run tests one package at a time to avoid Redis race conditions
13-
go test -race goa.design/pulse/rmap/...
14-
go test -race goa.design/pulse/streaming/...
15-
go test -race goa.design/pulse/pool/...
19+
go test -race goa.design/pulse/rmap/... $OPTIONS
20+
go test -race goa.design/pulse/streaming/... $OPTIONS
21+
go test -race goa.design/pulse/pool/... $OPTIONS
1622

1723
popd

0 commit comments

Comments
 (0)