Skip to content

Commit bc20968

Browse files
[SCH-175] Update to buffer and pipeline commands (#16)
1 parent c7ab965 commit bc20968

File tree

5 files changed

+184
-27
lines changed

5 files changed

+184
-27
lines changed

broadcast/broadcast.go

Lines changed: 80 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package broadcast
22

33
import (
44
"encoding/json"
5+
"sync"
56
"time"
67

78
"github.com/bitly/go-nsq"
@@ -30,28 +31,47 @@ type Message struct {
3031

3132
// Options for broadcast.
3233
type Options struct {
33-
Redis RedisPool
34-
Metrics *statsd.Client
35-
Ratelimiter *ratelimit.Ratelimiter
36-
RatelimitKey string
37-
Log *log.Logger
34+
Redis RedisPool
35+
Metrics *statsd.Client
36+
Ratelimiter *ratelimit.Ratelimiter
37+
RatelimitKey string
38+
Log *log.Logger
39+
FlushInterval time.Duration
3840
}
3941

4042
// Broadcast consumer distributes messages to N handlers.
4143
type Broadcast struct {
44+
Done chan struct{}
45+
*Options
46+
4247
handlers []Handler
4348
stats *stats.Stats
44-
*Options
49+
50+
// Single connection, channel and mutex are used when applying a flush interval
51+
conn Conn
52+
mutex sync.Mutex
4553
}
4654

4755
// New broadcast consumer.
4856
func New(o *Options) *Broadcast {
4957
stats := stats.New()
5058
go stats.TickEvery(10 * time.Second)
51-
return &Broadcast{
59+
60+
broadcast := Broadcast{
5261
stats: stats,
5362
Options: o,
63+
Done: make(chan struct{}, 1),
64+
}
65+
66+
if o.FlushInterval < 0 {
67+
panic("FlushInterval must not be a negative duration")
68+
} else if o.FlushInterval > 0 {
69+
conn := NewConn(o.Redis.Get())
70+
broadcast.conn = conn
71+
broadcast.flushOnInterval(conn)
5472
}
73+
74+
return &broadcast
5575
}
5676

5777
// Add handler.
@@ -80,9 +100,8 @@ func (b *Broadcast) HandleMessage(msg *nsq.Message) error {
80100
return nil
81101
}
82102

83-
db := b.Redis.Get()
84-
defer db.Close()
85-
conn := NewConn(db)
103+
conn, done := b.getConn()
104+
defer done()
86105

87106
for _, h := range b.handlers {
88107
err := h.Handle(conn, m)
@@ -91,14 +110,63 @@ func (b *Broadcast) HandleMessage(msg *nsq.Message) error {
91110
}
92111
}
93112

94-
err = conn.Flush()
113+
if b.FlushInterval == 0 {
114+
if err := b.flush(conn); err != nil {
115+
return err
116+
}
117+
}
118+
119+
b.Metrics.Duration("timers.broadcast", time.Since(start))
120+
return nil
121+
}
122+
123+
// Flushes all messages, then sends on the Done channel.
124+
func (b *Broadcast) Stop() {
125+
if b.FlushInterval > 0 {
126+
conn, done := b.getConn()
127+
defer done()
128+
b.flush(conn)
129+
}
130+
131+
b.Done <- struct{}{}
132+
}
133+
134+
func (b *Broadcast) getConn() (conn Conn, done func()) {
135+
if b.FlushInterval == 0 {
136+
db := b.Redis.Get()
137+
conn = NewConn(db)
138+
done = func() {
139+
db.Close()
140+
}
141+
} else {
142+
b.mutex.Lock()
143+
conn = b.conn
144+
done = func() {
145+
b.mutex.Unlock()
146+
}
147+
}
148+
149+
return conn, done
150+
}
151+
152+
func (b *Broadcast) flushOnInterval(conn Conn) {
153+
go func() {
154+
for range time.Tick(b.FlushInterval) {
155+
b.mutex.Lock()
156+
b.flush(conn)
157+
b.mutex.Unlock()
158+
}
159+
}()
160+
}
161+
162+
func (b *Broadcast) flush(conn Conn) error {
163+
err := conn.Flush()
95164
if err != nil {
96165
b.Metrics.Incr("errors.flush")
97166
b.Log.Error("flush: %s", err)
98167
return err
99168
}
100169

101-
b.Metrics.Duration("timers.broadcast", time.Since(start))
102170
return nil
103171
}
104172

broadcast/broadcast_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package broadcast
22

33
import (
44
"io/ioutil"
5+
"sync/atomic"
56
"testing"
67
"time"
78

@@ -84,6 +85,73 @@ func TestBroadcast(t *testing.T) {
8485
h2.AssertExpectations(t)
8586
}
8687

88+
func TestBroadcastInvalidFlushInterval(t *testing.T) {
89+
assert.Panic(t, "FlushInterval must not be a negative duration", func() {
90+
New(&Options{FlushInterval: -1 * time.Hour})
91+
})
92+
}
93+
94+
func TestBroadcastWithoutFlushInterval(t *testing.T) {
95+
pool := getMockPool()
96+
broadcast := New(&Options{
97+
Redis: pool,
98+
Metrics: statsd.NewClient(ioutil.Discard),
99+
Log: log.Log,
100+
})
101+
102+
sendMessages(broadcast)
103+
104+
mockConn := pool.Get().(*mocks.NoOpRedisConn)
105+
assert.Equal(t, 2, int(atomic.LoadUint64(&mockConn.Flushes)))
106+
}
107+
108+
func TestBroadcastWithFlushInterval(t *testing.T) {
109+
pool := getMockPool()
110+
broadcast := New(&Options{
111+
Redis: pool,
112+
Metrics: statsd.NewClient(ioutil.Discard),
113+
Log: log.Log,
114+
FlushInterval: 2 * time.Millisecond,
115+
})
116+
117+
sendMessages(broadcast)
118+
119+
mockConn := pool.Get().(*mocks.NoOpRedisConn)
120+
assert.Equal(t, 0, int(atomic.LoadUint64(&mockConn.Flushes)))
121+
<-time.After(3 * time.Millisecond)
122+
assert.Equal(t, 1, int(atomic.LoadUint64(&mockConn.Flushes)))
123+
}
124+
125+
func TestBroadcastWithFlushIntervalStop(t *testing.T) {
126+
pool := getMockPool()
127+
broadcast := New(&Options{
128+
Redis: pool,
129+
Metrics: statsd.NewClient(ioutil.Discard),
130+
Log: log.Log,
131+
FlushInterval: 10 * time.Second,
132+
})
133+
134+
sendMessages(broadcast)
135+
136+
mockConn := pool.Get().(*mocks.NoOpRedisConn)
137+
assert.Equal(t, 0, int(atomic.LoadUint64(&mockConn.Flushes)))
138+
broadcast.Stop()
139+
<-broadcast.Done
140+
assert.Equal(t, 1, int(atomic.LoadUint64(&mockConn.Flushes)))
141+
}
142+
143+
func getMockPool() RedisPool {
144+
pool := &mockRedisPool{}
145+
pool.On("Get").Return(mocks.NewNoOpRedisConn())
146+
return pool
147+
}
148+
149+
func sendMessages(broadcast *Broadcast) {
150+
nsqMsg := nsq.NewMessage(newNSQMessageId("nsq__message__id"), []byte(`{"projectId":"gy2d"}`))
151+
broadcast.HandleMessage(nsqMsg)
152+
broadcast.HandleMessage(nsqMsg)
153+
}
154+
87155
func newNSQMessageId(id string) nsq.MessageID {
88156
nsqId := [nsq.MsgIDLength]byte{}
89157
copy(nsqId[:], id[:nsq.MsgIDLength])

broadcast/conn.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,5 +41,7 @@ func (c *conn) Flush() error {
4141
}
4242
}
4343

44+
c.pending = 0
45+
4446
return nil
4547
}

broadcast/mocks/RedisConn.go

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,40 @@
11
package mocks
22

3-
import "github.com/garyburd/redigo/redis"
3+
import (
4+
"sync/atomic"
5+
6+
"github.com/garyburd/redigo/redis"
7+
)
48

59
func NewNoOpRedisConn() redis.Conn {
6-
return &noOpRedisConn{}
10+
return &NoOpRedisConn{}
711
}
812

9-
type noOpRedisConn struct{}
13+
type NoOpRedisConn struct {
14+
Flushes uint64
15+
}
1016

11-
func (c *noOpRedisConn) Close() error {
17+
func (c *NoOpRedisConn) Close() error {
1218
return nil
1319
}
1420

15-
func (c *noOpRedisConn) Err() error {
21+
func (c *NoOpRedisConn) Err() error {
1622
return nil
1723
}
1824

19-
func (c *noOpRedisConn) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
25+
func (c *NoOpRedisConn) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
2026
return nil, nil
2127
}
2228

23-
func (c *noOpRedisConn) Send(commandName string, args ...interface{}) error {
29+
func (c *NoOpRedisConn) Send(commandName string, args ...interface{}) error {
2430
return nil
2531
}
2632

27-
func (c *noOpRedisConn) Flush() error {
33+
func (c *NoOpRedisConn) Flush() error {
34+
atomic.AddUint64(&c.Flushes, 1)
2835
return nil
29-
3036
}
3137

32-
func (c *noOpRedisConn) Receive() (reply interface{}, err error) {
38+
func (c *NoOpRedisConn) Receive() (reply interface{}, err error) {
3339
return nil, nil
3440
}

main.go

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ const usage = `
2929
[--lookupd-http-address addr...]
3030
[--nsqd-tcp-address addr...]
3131
[--redis-address addr]
32+
[--flush-interval t]
3233
[--max-idle n]
3334
[--idle-timeout t]
3435
[--list name] [--list-size n]
@@ -47,6 +48,7 @@ const usage = `
4748
--redis-address addr redis address [default: :6379]
4849
--max-attempts n nsq max message attempts [default: 5]
4950
--max-in-flight n nsq messages in-flight [default: 250]
51+
--flush-interval t time to buffer redis commands before flushing [default: 0s]
5052
--max-idle n redis max idle connections [default: 15]
5153
--idle-timeout t idle connection timeout [default: 1m]
5254
--list-size n   redis list size [default: 100]
@@ -88,6 +90,14 @@ func main() {
8890
log.Fatalf("error parsing idle timeout: %s", err)
8991
}
9092

93+
flushInterval, err := time.ParseDuration(args["--flush-interval"].(string))
94+
if err != nil {
95+
log.Fatalf("error parsing flush-interval: %s", err)
96+
}
97+
if flushInterval < 0 {
98+
log.Fatalf("flush-interval must not be a negative value")
99+
}
100+
91101
maxIdle, err := strconv.Atoi(args["--max-idle"].(string))
92102
if err != nil {
93103
log.Fatalf("error parsing max-idle: %s", err)
@@ -105,11 +115,12 @@ func main() {
105115
}
106116

107117
broadcast := broadcast.New(&broadcast.Options{
108-
Redis: pool,
109-
Metrics: metrics,
110-
Log: log.Log,
111-
Ratelimiter: ratelimiter(args),
112-
RatelimitKey: args["--ratelimit-key"].(string),
118+
Redis: pool,
119+
Metrics: metrics,
120+
Log: log.Log,
121+
Ratelimiter: ratelimiter(args),
122+
RatelimitKey: args["--ratelimit-key"].(string),
123+
FlushInterval: flushInterval,
113124
})
114125
config := config(args)
115126

@@ -175,6 +186,8 @@ func main() {
175186
log.Info("stopping")
176187
consumer.Stop()
177188
<-consumer.StopChan
189+
broadcast.Stop()
190+
<-broadcast.Done
178191
log.Info("bye :)")
179192
}
180193

0 commit comments

Comments
 (0)