File tree Expand file tree Collapse file tree 1 file changed +8
-4
lines changed Expand file tree Collapse file tree 1 file changed +8
-4
lines changed Original file line number Diff line number Diff line change 77 "math/rand"
88 "sort"
99 "sync"
10- "sync/atomic"
1110)
1211
1312// The Balancer interface provides an abstraction of the message distribution
@@ -42,8 +41,10 @@ func (f BalancerFunc) Balance(msg Message, partitions ...int) int {
4241type RoundRobin struct {
4342 ChunkSize int
4443 // Use a 32 bits integer so RoundRobin values don't need to be aligned to
45- // apply atomic increments.
44+ // apply increments.
4645 counter uint32
46+
47+ mutex sync.Mutex
4748}
4849
4950// Balance satisfies the Balancer interface.
@@ -52,14 +53,17 @@ func (rr *RoundRobin) Balance(msg Message, partitions ...int) int {
5253}
5354
5455func (rr * RoundRobin ) balance (partitions []int ) int {
56+ rr .mutex .Lock ()
57+ defer rr .mutex .Unlock ()
58+
5559 if rr .ChunkSize < 1 {
5660 rr .ChunkSize = 1
5761 }
5862
5963 length := len (partitions )
60- counterNow := atomic . LoadUint32 ( & rr .counter )
64+ counterNow := rr .counter
6165 offset := int (counterNow / uint32 (rr .ChunkSize ))
62- atomic . AddUint32 ( & rr .counter , 1 )
66+ rr .counter ++
6367 return partitions [offset % length ]
6468}
6569
You can’t perform that action at this time.
0 commit comments