Skip to content

Commit 331d4f4

Browse files
author
xiemalin
committed
refact code
1 parent 6fe306c commit 331d4f4

File tree

2 files changed

+6
-3
lines changed

2 files changed

+6
-3
lines changed

filefanoutqueue.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -229,9 +229,12 @@ func (q *FileFanoutQueue) Subscribe(fanoutID int64, fn func(int64, []byte, error
229229
return ErrSubscribeExistErr
230230
}
231231

232-
q.doLoopSubscribe(fanoutID, fn)
232+
// do subsrcibe by async way
233+
go func() {
234+
q.doLoopSubscribe(fanoutID, fn)
233235

234-
qf.subscriber = fn
236+
qf.subscriber = fn
237+
}()
235238

236239
return nil
237240
}

filefanoutqueue_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ func TestFanoutQueueSubscribe(t *testing.T) {
252252
So(err, ShouldBeNil)
253253
}
254254
for i := 0; i < 5; i++ {
255-
fq.Dequeue(fanoutID)
255+
fq.Dequeue(fanoutID) // fanoutID dequeue 5 elements directly
256256
}
257257

258258
fq.Subscribe(fanoutID, func(index int64, data []byte, err error) {

0 commit comments

Comments
 (0)