Skip to content

Commit 8f9983c

Browse files
authored
Add option to equally likely choose node to read between primary and all replicas (#254)
1 parent 9a6132b commit 8f9983c

File tree

7 files changed

+207
-1
lines changed

7 files changed

+207
-1
lines changed

README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ gem 'redis-cluster-client'
2424
| --- | --- | --- | --- |
2525
| `:nodes` | String or Hash or Array<String, Hash> | `['redis://127.0.0.1:6379']` | node addresses for startup connection |
2626
| `:replica` | Boolean | `false` | `true` if client should use scale read feature |
27-
| `:replica_affinity` | Symbol or String | `:random` | scale reading strategy, `:random` or `:latency` are valid |
27+
| `:replica_affinity` | Symbol or String | `:random` | scale reading strategy, `:random`, `random_with_primary` or `:latency` are valid |
2828
| `:fixed_hostname` | String | `nil` | required if client should connect to single endpoint with SSL |
2929

3030
Also, [the other generic options](https://github.com/redis-rb/redis-client#configuration) can be passed.
@@ -44,6 +44,10 @@ RedisClient.cluster.new_client
4444
RedisClient.cluster(replica: true).new_client
4545
#=> #<RedisClient::Cluster 172.20.0.2:6379, 172.20.0.3:6379, 172.20.0.4:6379, 172.20.0.5:6379, 172.20.0.6:6379, 172.20.0.7:6379>
4646

47+
# To connect to all nodes to use scale reading feature + make reads equally likely from replicas and primary
48+
RedisClient.cluster(replica: true, replica_affinity: :random_with_primary).new_client
49+
#=> #<RedisClient::Cluster 172.20.0.2:6379, 172.20.0.3:6379, 172.20.0.4:6379, 172.20.0.5:6379, 172.20.0.6:6379, 172.20.0.7:6379>
50+
4751
# To connect to all nodes to use scale reading feature prioritizing low-latency replicas
4852
RedisClient.cluster(replica: true, replica_affinity: :latency).new_client
4953
#=> #<RedisClient::Cluster 172.20.0.2:6379, 172.20.0.3:6379, 172.20.0.4:6379, 172.20.0.5:6379, 172.20.0.6:6379, 172.20.0.7:6379>

lib/redis_client/cluster/node.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
require 'redis_client/cluster/errors'
66
require 'redis_client/cluster/node/primary_only'
77
require 'redis_client/cluster/node/random_replica'
8+
require 'redis_client/cluster/node/random_replica_or_primary'
89
require 'redis_client/cluster/node/latency_replica'
910

1011
class RedisClient
@@ -284,6 +285,8 @@ def update_slot(slot, node_key)
284285
def make_topology_class(with_replica, replica_affinity)
285286
if with_replica && replica_affinity == :random
286287
::RedisClient::Cluster::Node::RandomReplica
288+
elsif with_replica && replica_affinity == :random_with_primary
289+
::RedisClient::Cluster::Node::RandomReplicaOrPrimary
287290
elsif with_replica && replica_affinity == :latency
288291
::RedisClient::Cluster::Node::LatencyReplica
289292
else
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
# frozen_string_literal: true
2+
3+
require 'redis_client/cluster/node/replica_mixin'
4+
5+
class RedisClient
6+
class Cluster
7+
class Node
8+
class RandomReplicaOrPrimary
9+
include ::RedisClient::Cluster::Node::ReplicaMixin
10+
11+
def replica_clients
12+
keys = @replications.values.filter_map(&:sample)
13+
@clients.select { |k, _| keys.include?(k) }
14+
end
15+
16+
def clients_for_scanning(seed: nil)
17+
random = seed.nil? ? Random : Random.new(seed)
18+
keys = @replications.map do |primary_node_key, replica_node_keys|
19+
decide_use_primary?(random, replica_node_keys.size) ? primary_node_key : replica_node_keys.sample(random: random)
20+
end
21+
22+
clients.select { |k, _| keys.include?(k) }
23+
end
24+
25+
def find_node_key_of_replica(primary_node_key, seed: nil)
26+
random = seed.nil? ? Random : Random.new(seed)
27+
28+
replica_node_keys = @replications.fetch(primary_node_key, EMPTY_ARRAY)
29+
if decide_use_primary?(random, replica_node_keys.size)
30+
primary_node_key
31+
else
32+
replica_node_keys.sample(random: random) || primary_node_key
33+
end
34+
end
35+
36+
def any_replica_node_key(seed: nil)
37+
random = seed.nil? ? Random : Random.new(seed)
38+
@replica_node_keys.sample(random: random) || any_primary_node_key(seed: seed)
39+
end
40+
41+
private
42+
43+
# Randomly equally likely choose node to read between primary and all replicas
44+
# e.g. 1 primary + 1 replica = 50% probability to read from primary
45+
# e.g. 1 primary + 2 replica = 33% probability to read from primary
46+
# e.g. 1 primary + 0 replica = 100% probability to read from primary
47+
def decide_use_primary?(random, replica_nodes)
48+
primary_nodes = 1.0
49+
total = primary_nodes + replica_nodes
50+
random.rand < primary_nodes / total
51+
end
52+
end
53+
end
54+
end
55+
end

test/bench_command.rb

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,22 @@ def new_test_client
3434
end
3535
end
3636

37+
class ScaleReadRandomWithPrimary < BenchmarkWrapper
38+
include BenchmarkMixin
39+
40+
private
41+
42+
def new_test_client
43+
::RedisClient.cluster(
44+
nodes: TEST_NODE_URIS,
45+
replica: true,
46+
replica_affinity: :random_with_primary,
47+
fixed_hostname: TEST_FIXED_HOSTNAME,
48+
**TEST_GENERIC_OPTIONS
49+
).new_client
50+
end
51+
end
52+
3753
class ScaleReadLatency < BenchmarkWrapper
3854
include BenchmarkMixin
3955

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
# frozen_string_literal: true
2+
3+
require 'testing_helper'
4+
require 'redis_client/cluster/node/testing_topology_mixin'
5+
6+
class RedisClient
7+
class Cluster
8+
class Node
9+
class TestRandomReplicaWithPrimary < TestingWrapper
10+
include TestingTopologyMixin
11+
12+
def test_clients_with_redis_client
13+
got = @test_topology.clients
14+
got.each_value { |client| assert_instance_of(::RedisClient, client) }
15+
assert_equal(%w[master slave], got.map { |_, v| v.call('ROLE').first }.uniq.sort)
16+
end
17+
18+
def test_clients_with_pooled_redis_client
19+
test_topology = ::RedisClient::Cluster::Node::RandomReplicaOrPrimary.new(
20+
@replications,
21+
@options,
22+
{ timeout: 3, size: 2 },
23+
**TEST_GENERIC_OPTIONS
24+
)
25+
26+
got = test_topology.clients
27+
got.each_value { |client| assert_instance_of(::RedisClient::Pooled, client) }
28+
assert_equal(%w[master slave], got.map { |_, v| v.call('ROLE').first }.uniq.sort)
29+
ensure
30+
test_topology&.clients&.each_value(&:close)
31+
end
32+
33+
def test_primary_clients
34+
got = @test_topology.primary_clients
35+
got.each_value do |client|
36+
assert_instance_of(::RedisClient, client)
37+
assert_equal('master', client.call('ROLE').first)
38+
end
39+
end
40+
41+
def test_replica_clients
42+
got = @test_topology.replica_clients
43+
got.each_value do |client|
44+
assert_instance_of(::RedisClient, client)
45+
assert_equal('slave', client.call('ROLE').first)
46+
end
47+
end
48+
49+
def test_clients_for_scanning
50+
got = @test_topology.clients_for_scanning
51+
got.each_value { |client| assert_instance_of(::RedisClient, client) }
52+
assert_equal(TEST_SHARD_SIZE, got.size)
53+
end
54+
55+
def test_find_node_key_of_replica
56+
want = 'dummy_key'
57+
got = @test_topology.find_node_key_of_replica('dummy_key')
58+
assert_equal(want, got)
59+
60+
primary_key = @replications.keys.first
61+
replica_keys = @replications.fetch(primary_key)
62+
got = @test_topology.find_node_key_of_replica(primary_key)
63+
assert_includes(replica_keys + [primary_key], got)
64+
end
65+
66+
def test_any_primary_node_key
67+
got = @test_topology.any_primary_node_key
68+
assert_includes(@replications.keys, got)
69+
end
70+
71+
def test_any_replica_node_key
72+
got = @test_topology.any_replica_node_key
73+
assert_includes(@replications.values.flatten, got)
74+
end
75+
76+
private
77+
78+
def topology_class
79+
::RedisClient::Cluster::Node::RandomReplicaOrPrimary
80+
end
81+
end
82+
end
83+
end
84+
end

test/redis_client/test_cluster.rb

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -462,6 +462,21 @@ def new_test_client
462462
end
463463
end
464464

465+
class ScaleReadRandomWithPrimary < TestingWrapper
466+
include Mixin
467+
468+
def new_test_client
469+
config = ::RedisClient::ClusterConfig.new(
470+
nodes: TEST_NODE_URIS,
471+
replica: true,
472+
replica_affinity: :random_with_primary,
473+
fixed_hostname: TEST_FIXED_HOSTNAME,
474+
**TEST_GENERIC_OPTIONS
475+
)
476+
::RedisClient::Cluster.new(config)
477+
end
478+
end
479+
465480
class ScaleReadLatency < TestingWrapper
466481
include Mixin
467482

test/test_against_cluster_state.rb

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,35 @@ def new_test_client
129129
end
130130
end
131131

132+
class ScaleReadRandomWithPrimary < TestingWrapper
133+
include Mixin
134+
135+
def test_the_state_of_cluster_resharding
136+
keys = nil
137+
do_resharding_test { |ks| keys = ks }
138+
keys.each { |key| assert_equal(key, @client.call('GET', key), "Case: GET: #{key}") }
139+
end
140+
141+
def test_the_state_of_cluster_resharding_with_pipelining
142+
keys = nil
143+
do_resharding_test { |ks| keys = ks }
144+
values = @client.pipelined { |pipeline| keys.each { |key| pipeline.call('GET', key) } }
145+
keys.each_with_index { |key, i| assert_equal(key, values[i], "Case: GET: #{key}") }
146+
end
147+
148+
private
149+
150+
def new_test_client
151+
::RedisClient.cluster(
152+
nodes: TEST_NODE_URIS,
153+
replica: true,
154+
replica_affinity: :random_with_primary,
155+
fixed_hostname: TEST_FIXED_HOSTNAME,
156+
**TEST_GENERIC_OPTIONS
157+
).new_client
158+
end
159+
end
160+
132161
class ScaleReadLatency < TestingWrapper
133162
include Mixin
134163

0 commit comments

Comments
 (0)