Skip to content

Commit 9a6132b

Browse files
authored
fix: Pub/Sub with performance enhancement and changing error handling (#255)
1 parent 26bdba4 commit 9a6132b

File tree

2 files changed

+29
-31
lines changed

2 files changed

+29
-31
lines changed

lib/redis_client/cluster/pub_sub.rb

Lines changed: 26 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -6,44 +6,44 @@ class RedisClient
66
class Cluster
77
class PubSub
88
class State
9-
def initialize(client)
9+
def initialize(client, queue)
1010
@client = client
1111
@worker = nil
12+
@queue = queue
1213
end
1314

1415
def call(command)
1516
@client.call_v(command)
1617
end
1718

19+
def ensure_worker
20+
@worker = spawn_worker(@client, @queue) unless @worker&.alive?
21+
end
22+
1823
def close
1924
@worker.exit if @worker&.alive?
2025
@client.close
2126
end
2227

23-
def take_message(timeout)
24-
@worker = subscribe(@client, timeout) if @worker.nil?
25-
return if @worker.alive?
26-
27-
message = @worker.value
28-
@worker = nil
29-
message
30-
end
31-
3228
private
3329

34-
def subscribe(client, timeout)
35-
Thread.new(client, timeout) do |pubsub, to|
36-
pubsub.next_event(to)
37-
rescue StandardError => e
38-
e
30+
def spawn_worker(client, queue)
31+
Thread.new(client, queue) do |pubsub, q|
32+
loop do
33+
q << pubsub.next_event
34+
rescue StandardError => e
35+
q << e
36+
end
3937
end
4038
end
4139
end
4240

41+
BUF_SIZE = Integer(ENV.fetch('REDIS_CLIENT_PUBSUB_BUF_SIZE', 1024))
42+
4343
def initialize(router, command_builder)
4444
@router = router
4545
@command_builder = command_builder
46-
@state_list = []
46+
@queue = SizedQueue.new(BUF_SIZE)
4747
@state_dict = {}
4848
end
4949

@@ -56,26 +56,25 @@ def call_v(command)
5656
end
5757

5858
def close
59-
@state_list.each(&:close)
60-
@state_list.clear
59+
@state_dict.each_value(&:close)
6160
@state_dict.clear
61+
@queue.clear
6262
end
6363

6464
def next_event(timeout = nil)
65-
return if @state_list.empty?
66-
67-
@state_list.shuffle!
65+
@state_dict.each_value(&:ensure_worker)
6866
max_duration = calc_max_duration(timeout)
6967
starting = obtain_current_time
68+
7069
loop do
7170
break if max_duration > 0 && obtain_current_time - starting > max_duration
7271

73-
@state_list.each do |pubsub|
74-
message = pubsub.take_message(timeout)
75-
return message if message
72+
case event = @queue.pop(true)
73+
when StandardError then raise event
74+
when Array then break event
7675
end
77-
78-
sleep 0.001
76+
rescue ThreadError
77+
sleep 0.005
7978
end
8079
end
8180

@@ -100,8 +99,7 @@ def try_call(node_key, command, retry_count: 1)
10099
def add_state(node_key)
101100
return @state_dict[node_key] if @state_dict.key?(node_key)
102101

103-
state = State.new(@router.find_node(node_key).pubsub)
104-
@state_list << state
102+
state = State.new(@router.find_node(node_key).pubsub, @queue)
105103
@state_dict[node_key] = state
106104
end
107105

test/redis_client/test_cluster.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -183,16 +183,16 @@ def test_pipelined_with_many_commands
183183

184184
def test_pubsub_without_subscription
185185
pubsub = @client.pubsub
186-
assert_nil(pubsub.next_event(TEST_TIMEOUT_SEC))
186+
assert_nil(pubsub.next_event(0.01))
187187
pubsub.close
188188
end
189189

190190
def test_pubsub_with_wrong_command
191191
pubsub = @client.pubsub
192192
assert_nil(pubsub.call('SUBWAY'))
193193
assert_nil(pubsub.call_v(%w[SUBSCRIBE]))
194-
assert_instance_of(::RedisClient::CommandError, pubsub.next_event, 'unknown command')
195-
assert_instance_of(::RedisClient::CommandError, pubsub.next_event, 'wrong number of arguments')
194+
assert_raises(::RedisClient::CommandError, 'unknown command') { pubsub.next_event }
195+
assert_raises(::RedisClient::CommandError, 'wrong number of arguments') { pubsub.next_event }
196196
pubsub.close
197197
end
198198

0 commit comments

Comments
 (0)