Skip to content

Commit 67e1908

Browse files
committed
FEAT: add support for shard key in ketama hashing
1 parent 261bbdc commit 67e1908

File tree

8 files changed

+212
-21
lines changed

8 files changed

+212
-21
lines changed

src/main/java/net/spy/memcached/ArcusKetamaNodeLocator.java

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -58,17 +58,20 @@ public final class ArcusKetamaNodeLocator extends SpyObject implements NodeLocat
5858
private final ArcusKetamaNodeLocatorConfiguration config;
5959

6060
private final Lock lock = new ReentrantLock();
61+
private final boolean enableShardKey;
6162

62-
public ArcusKetamaNodeLocator(List<MemcachedNode> nodes) {
63-
this(nodes, new ArcusKetamaNodeLocatorConfiguration());
63+
public ArcusKetamaNodeLocator(List<MemcachedNode> nodes, boolean enableShardKey) {
64+
this(nodes, new ArcusKetamaNodeLocatorConfiguration(), enableShardKey);
6465
}
6566

6667
public ArcusKetamaNodeLocator(List<MemcachedNode> nodes,
67-
ArcusKetamaNodeLocatorConfiguration conf) {
68+
ArcusKetamaNodeLocatorConfiguration conf,
69+
boolean shardKey) {
6870
super();
6971
allNodes = nodes;
7072
ketamaNodes = new TreeMap<>();
7173
config = conf;
74+
enableShardKey = shardKey;
7275

7376
int numReps = config.getNodeRepetitions();
7477
// Ketama does some special work with md5 where it reuses chunks.
@@ -89,11 +92,13 @@ public ArcusKetamaNodeLocator(List<MemcachedNode> nodes,
8992

9093
private ArcusKetamaNodeLocator(TreeMap<Long, SortedSet<MemcachedNode>> smn,
9194
Collection<MemcachedNode> an,
92-
ArcusKetamaNodeLocatorConfiguration conf) {
95+
ArcusKetamaNodeLocatorConfiguration conf,
96+
boolean shardKey) {
9397
super();
9498
ketamaNodes = smn;
9599
allNodes = an;
96100
config = conf;
101+
enableShardKey = shardKey;
97102

98103
/* ENABLE_MIGRATION if */
99104
existNodes = new HashSet<>();
@@ -151,7 +156,29 @@ public SortedMap<Long, SortedSet<MemcachedNode>> getKetamaNodes() {
151156
}
152157

153158
public MemcachedNode getPrimary(final String k) {
154-
return getNodeForKey(hashAlg.hash(k));
159+
String shardKey = getShardKey(k);
160+
return getNodeForKey(hashAlg.hash(shardKey));
161+
}
162+
163+
String getShardKey(String key) {
164+
if (!enableShardKey) {
165+
return key;
166+
}
167+
168+
if (key == null) {
169+
return null;
170+
}
171+
172+
int left = key.indexOf('{');
173+
if (left == -1) {
174+
return key;
175+
}
176+
int right = key.indexOf('}', left + 1);
177+
if (right == -1 || right == left + 1) {
178+
return key;
179+
}
180+
181+
return key.substring(left + 1, right);
155182
}
156183

157184
MemcachedNode getNodeForKey(long hash) {
@@ -193,7 +220,7 @@ public NodeLocator getReadonlyCopy() {
193220
nodesCopy.add(new MemcachedNodeROImpl(node));
194221
}
195222

196-
return new ArcusKetamaNodeLocator(ketamaCopy, nodesCopy, config);
223+
return new ArcusKetamaNodeLocator(ketamaCopy, nodesCopy, config, enableShardKey);
197224
} finally {
198225
lock.unlock();
199226
}
@@ -565,9 +592,10 @@ class KetamaIterator implements Iterator<MemcachedNode> {
565592

566593
public KetamaIterator(final String k, final int t) {
567594
super();
568-
hashVal = hashAlg.hash(k);
595+
String shardKey = getShardKey(k);
596+
hashVal = hashAlg.hash(shardKey);
569597
remainingTries = t;
570-
key = k;
598+
key = shardKey;
571599
}
572600

573601
private void nextHash() {

src/main/java/net/spy/memcached/ArcusReplKetamaNodeLocator.java

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,14 @@ public final class ArcusReplKetamaNodeLocator extends SpyObject implements NodeL
6363
= new ArcusReplKetamaNodeLocatorConfiguration();
6464

6565
private final Lock lock = new ReentrantLock();
66+
private final boolean enableShardKey;
6667

67-
public ArcusReplKetamaNodeLocator(List<MemcachedNode> nodes) {
68+
public ArcusReplKetamaNodeLocator(List<MemcachedNode> nodes, boolean shardKey) {
6869
super();
6970
allNodes = nodes;
7071
ketamaGroups = new TreeMap<>();
7172
allGroups = new ConcurrentHashMap<>();
73+
enableShardKey = shardKey;
7274

7375
// create all memcached replica group
7476
for (MemcachedNode node : nodes) {
@@ -105,12 +107,13 @@ public ArcusReplKetamaNodeLocator(List<MemcachedNode> nodes) {
105107

106108
private ArcusReplKetamaNodeLocator(TreeMap<Long, SortedSet<MemcachedReplicaGroup>> kg,
107109
ConcurrentHashMap<String, MemcachedReplicaGroup> ag,
108-
Collection<MemcachedNode> an) {
110+
Collection<MemcachedNode> an, boolean shardKey) {
109111
super();
110112
ketamaGroups = kg;
111113
allGroups = ag;
112114
allNodes = an;
113115
toDeleteGroups = new HashSet<>();
116+
enableShardKey = shardKey;
114117

115118
/* ENABLE_MIGRATION if */
116119
alterNodes = new HashSet<>();
@@ -172,11 +175,13 @@ public Collection<MemcachedNode> getMasterNodes() {
172175
}
173176

174177
public MemcachedNode getPrimary(final String k) {
175-
return getNodeForKey(hashAlg.hash(k), ReplicaPick.MASTER);
178+
String shardKey = getShardKey(k);
179+
return getNodeForKey(hashAlg.hash(shardKey), ReplicaPick.MASTER);
176180
}
177181

178182
public MemcachedNode getPrimary(final String k, ReplicaPick pick) {
179-
return getNodeForKey(hashAlg.hash(k), pick);
183+
String shardKey = getShardKey(k);
184+
return getNodeForKey(hashAlg.hash(shardKey), pick);
180185
}
181186

182187
private MemcachedNode getNodeForKey(long hash, ReplicaPick pick) {
@@ -231,7 +236,7 @@ public NodeLocator getReadonlyCopy() {
231236
nodesCopy.add(new MemcachedNodeROImpl(node));
232237
}
233238

234-
return new ArcusReplKetamaNodeLocator(ketamaCopy, groupsCopy, nodesCopy);
239+
return new ArcusReplKetamaNodeLocator(ketamaCopy, groupsCopy, nodesCopy, enableShardKey);
235240
} finally {
236241
lock.unlock();
237242
}
@@ -297,6 +302,27 @@ public void switchoverReplGroup(MemcachedReplicaGroup group) {
297302
lock.unlock();
298303
}
299304

305+
String getShardKey(String key) {
306+
if (!enableShardKey) {
307+
return key;
308+
}
309+
310+
if (key == null) {
311+
return null;
312+
}
313+
314+
int left = key.indexOf('{');
315+
if (left == -1) {
316+
return key;
317+
}
318+
int right = key.indexOf('}', left + 1);
319+
if (right == -1 || right == left + 1) {
320+
return key;
321+
}
322+
323+
return key.substring(left + 1, right);
324+
}
325+
300326
private void insertNodeIntoGroup(MemcachedNode node) {
301327
/* ENABLE_MIGRATION if */
302328
if (migrationInProgress) {
@@ -716,9 +742,11 @@ private class ReplKetamaIterator implements Iterator<MemcachedNode> {
716742

717743
public ReplKetamaIterator(final String k, ReplicaPick p, final int t) {
718744
super();
719-
hashVal = hashAlg.hash(k);
745+
746+
String shardKey = getShardKey(k);
747+
hashVal = hashAlg.hash(shardKey);
720748
remainingTries = t;
721-
key = k;
749+
key = shardKey;
722750
pick = p;
723751
}
724752

src/main/java/net/spy/memcached/ConnectionFactory.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,11 @@ MemcachedNode createMemcachedNode(String name,
120120
*/
121121
boolean getDnsCacheTtlCheck();
122122

123+
/**
124+
* If true, the shard key logic will be used for hashing.
125+
*/
126+
boolean isShardKeyEnabled();
127+
123128
/**
124129
* Observers that should be established at the time of connection
125130
* instantiation.

src/main/java/net/spy/memcached/ConnectionFactoryBuilder.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ public class ConnectionFactoryBuilder {
6161
private boolean useNagle = false;
6262
private boolean keepAlive = false;
6363
private boolean dnsCacheTtlCheck = true;
64+
private boolean enableShardKey = false;
6465
private long maxReconnectDelay = 1;
6566

6667
private int readBufSize = -1;
@@ -498,6 +499,11 @@ public ConnectionFactoryBuilder setDnsCacheTtlCheck(boolean dnsCacheTtlCheck) {
498499
return this;
499500
}
500501

502+
public ConnectionFactoryBuilder enableShardKey(boolean shardKey) {
503+
this.enableShardKey = shardKey;
504+
return this;
505+
}
506+
501507
/**
502508
* Get the ConnectionFactory set up with the provided parameters.
503509
*/
@@ -552,10 +558,10 @@ public NodeLocator createLocator(List<MemcachedNode> nodes) {
552558
// This locator uses ArcusReplKetamaNodeLocatorConfiguration
553559
// which builds keys off the server's group name, not
554560
// its ip:port.
555-
return new ArcusReplKetamaNodeLocator(nodes);
561+
return new ArcusReplKetamaNodeLocator(nodes, isShardKeyEnabled());
556562
}
557563
/* ENABLE_REPLICATION end */
558-
return new ArcusKetamaNodeLocator(nodes);
564+
return new ArcusKetamaNodeLocator(nodes, isShardKeyEnabled());
559565
default:
560566
throw new IllegalStateException(
561567
"Unhandled locator type: " + locator);
@@ -632,6 +638,11 @@ public boolean getDnsCacheTtlCheck() {
632638
return dnsCacheTtlCheck;
633639
}
634640

641+
@Override
642+
public boolean isShardKeyEnabled() {
643+
return enableShardKey;
644+
}
645+
635646
@Override
636647
public long getMaxReconnectDelay() {
637648
return maxReconnectDelay;

src/main/java/net/spy/memcached/DefaultConnectionFactory.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,11 @@ public boolean getKeepAlive() {
311311
return false;
312312
}
313313

314+
@Override
315+
public boolean isShardKeyEnabled() {
316+
return false;
317+
}
318+
314319
@Override
315320
public boolean getDnsCacheTtlCheck() {
316321
return true;

src/test/java/net/spy/memcached/ArcusKetamaHashingTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,8 @@ private void runThisManyNodes(List<String> stringNode1, List<String> stringNode2
7575
MemcachedNode oddManOut = larger.get(larger.size() - 1);
7676
assertFalse(smaller.contains(oddManOut));
7777

78-
ArcusKetamaNodeLocator lgLocator = new ArcusKetamaNodeLocator(larger);
79-
ArcusKetamaNodeLocator smLocator = new ArcusKetamaNodeLocator(smaller);
78+
ArcusKetamaNodeLocator lgLocator = new ArcusKetamaNodeLocator(larger, false);
79+
ArcusKetamaNodeLocator smLocator = new ArcusKetamaNodeLocator(smaller, false);
8080

8181
SortedMap<Long, SortedSet<MemcachedNode>> lgMap = lgLocator.getKetamaNodes();
8282
SortedMap<Long, SortedSet<MemcachedNode>> smMap = smLocator.getKetamaNodes();

src/test/java/net/spy/memcached/ArcusKetamaNodeLocatorTest.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ protected void setupNodes(int n) {
5050
}));
5151
}
5252

53-
locator = new ArcusKetamaNodeLocator(Arrays.asList(nodes));
53+
locator = new ArcusKetamaNodeLocator(Arrays.asList(nodes), false);
5454
}
5555

5656
@Test
@@ -154,6 +154,25 @@ private MemcachedNode[] mockNodes(String servers[]) {
154154
return nodes;
155155
}
156156

157+
@Test
158+
void testShardKeyBasic() {
159+
setupNodes(10);
160+
161+
locator = new ArcusKetamaNodeLocator(Arrays.asList(nodes), true);
162+
MemcachedNode node1 = locator.getPrimary("user:{100}:data1");
163+
MemcachedNode node2 = locator.getPrimary("user:{100}:data2");
164+
assertSame(node1, node2);
165+
}
166+
167+
@Test
168+
void testShardKeyDistribution() {
169+
setupNodes(10);
170+
171+
locator = new ArcusKetamaNodeLocator(Arrays.asList(nodes), true);
172+
MemcachedNode nodeA = locator.getPrimary("user:{100}:data");
173+
MemcachedNode nodeB = locator.getPrimary("user:{200}:data");
174+
}
175+
157176
@Test
158177
void testLibKetamaCompatTwo() {
159178
String servers[] = {
@@ -166,7 +185,7 @@ void testLibKetamaCompatTwo() {
166185
"10.0.1.7:11211",
167186
"10.0.1.8:11211"
168187
};
169-
locator = new ArcusKetamaNodeLocator(Arrays.asList(mockNodes(servers)));
188+
locator = new ArcusKetamaNodeLocator(Arrays.asList(mockNodes(servers)), false);
170189

171190
String[][] exp = {
172191
{"0", "10.0.1.1:11211"},

0 commit comments

Comments
 (0)