Skip to content

Commit 3a75ede

Browse files
committed
ENHANCE: Create hashUpdateThreadPool to decrease IO thread overhead.
1 parent e912341 commit 3a75ede

File tree

5 files changed

+145
-12
lines changed

5 files changed

+145
-12
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package net.spy.memcached;
2+
3+
import java.util.List;
4+
import java.util.concurrent.Callable;
5+
6+
public class CacheListUpdateTask implements Callable<Boolean> {
7+
8+
private final MemcachedConnection conn;
9+
private final List<MemcachedNode> attachNodes;
10+
private final List<MemcachedNode> removeNodes;
11+
12+
public CacheListUpdateTask(MemcachedConnection conn,
13+
List<MemcachedNode> attachNodes,
14+
List<MemcachedNode> removeNodes) {
15+
16+
this.conn = conn;
17+
this.attachNodes = attachNodes;
18+
this.removeNodes = removeNodes;
19+
}
20+
21+
@Override
22+
public Boolean call() {
23+
conn.getLocator().update(attachNodes, removeNodes);
24+
25+
// Remove the unavailable nodes.
26+
conn.handleNodesToRemove(removeNodes);
27+
28+
return true;
29+
}
30+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package net.spy.memcached;
2+
3+
import java.util.concurrent.ExecutorService;
4+
import java.util.concurrent.Executors;
5+
import java.util.concurrent.Future;
6+
7+
public class HashRingUpdateService {
8+
private final ExecutorService pool;
9+
10+
public HashRingUpdateService() {
11+
pool = Executors.newSingleThreadExecutor();
12+
}
13+
14+
public void updateHashes(CacheListUpdateTask task) {
15+
pool.submit(task);
16+
}
17+
18+
public Future<Boolean> updateHashesWithResult(CacheListUpdateTask task) {
19+
return pool.submit(task);
20+
}
21+
22+
public void shutdown() {
23+
pool.shutdown();
24+
}
25+
}

src/main/java/net/spy/memcached/MemcachedConnection.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ public final class MemcachedConnection extends SpyObject {
114114
private final DelayedSwitchoverGroups delayedSwitchoverGroups =
115115
new DelayedSwitchoverGroups(DELAYED_SWITCHOVER_TIMEOUT_MILLISECONDS);
116116
/* ENABLE_REPLICATION end */
117+
private final HashRingUpdateService hashUpdateService = new HashRingUpdateService();
117118

118119
/**
119120
* Construct a memcached connection.
@@ -312,7 +313,7 @@ public void handleIO() throws IOException {
312313
}
313314
}
314315

315-
private void handleNodesToRemove(final List<MemcachedNode> nodesToRemove) {
316+
void handleNodesToRemove(final List<MemcachedNode> nodesToRemove) {
316317
for (MemcachedNode node : nodesToRemove) {
317318
getLogger().info("old memcached node removed %s", node);
318319
reconnectQueue.remove(node);
@@ -357,10 +358,7 @@ private void updateConnections(List<InetSocketAddress> addrs) throws IOException
357358
}
358359

359360
// Update the hash.
360-
locator.update(attachNodes, removeNodes);
361-
362-
// Remove the unavailable nodes.
363-
handleNodesToRemove(removeNodes);
361+
hashUpdateService.updateHashes(new CacheListUpdateTask(this, attachNodes, removeNodes));
364362
}
365363

366364
/* ENABLE_REPLICATION if */
@@ -1509,6 +1507,7 @@ public void shutdown() throws IOException {
15091507
}
15101508
}
15111509
selector.close();
1510+
hashUpdateService.shutdown();
15121511
getLogger().debug("Shut down selector %s", selector);
15131512
}
15141513

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package net.spy.memcached;
2+
3+
import java.net.InetSocketAddress;
4+
import java.util.ArrayList;
5+
import java.util.Collection;
6+
import java.util.List;
7+
import java.util.concurrent.ExecutionException;
8+
9+
import junit.framework.TestCase;
10+
11+
import org.junit.Assert;
12+
13+
public class HashRingUpdateServiceTest extends TestCase {
14+
15+
private HashRingUpdateService service;
16+
private MemcachedConnection conn;
17+
18+
@Override
19+
public void setUp() throws Exception {
20+
super.setUp();
21+
service = new HashRingUpdateService();
22+
23+
ConnectionFactoryBuilder cfb = new ConnectionFactoryBuilder().setReadBufferSize(1024);
24+
ConnectionFactory cf = cfb.build();
25+
conn = new MemcachedConnection("connection test", cf, new ArrayList<InetSocketAddress>(),
26+
cf.getInitialObservers(), cf.getFailureMode(), cf.getOperationFactory());
27+
}
28+
29+
@Override
30+
protected void tearDown() throws Exception {
31+
conn.shutdown();
32+
super.tearDown();
33+
}
34+
35+
public void testSubmitAttachTask() throws InterruptedException, ExecutionException {
36+
List<MemcachedNode> attach = new ArrayList<MemcachedNode>();
37+
38+
for (int i = 0; i < 5; i++) {
39+
MockMemcachedNode node
40+
= new MockMemcachedNode(new InetSocketAddress("localhost", 11211 + i));
41+
attach.add(node);
42+
}
43+
44+
service.updateHashesWithResult(new CacheListUpdateTask(conn, attach,
45+
new ArrayList<MemcachedNode>())).get();
46+
47+
Collection<MemcachedNode> all = conn.getLocator().getAll();
48+
Assert.assertEquals(all.size(), attach.size());
49+
}
50+
51+
public void testSubmitRemoveTask() throws InterruptedException {
52+
List<MemcachedNode> attach = new ArrayList<MemcachedNode>();
53+
List<MemcachedNode> remove = new ArrayList<MemcachedNode>();
54+
55+
for (int i = 0; i < 5; i++) {
56+
MockMemcachedNode node
57+
= new MockMemcachedNode(new InetSocketAddress("localhost", 11211 + i));
58+
attach.add(node);
59+
remove.add(node);
60+
}
61+
62+
try {
63+
service.updateHashesWithResult(new CacheListUpdateTask(conn, attach, remove)).get();
64+
} catch (ExecutionException e) {
65+
// Removing node also removes included Operations.
66+
// MockMemcachedNode does not have any Operations.
67+
// So, ExecutionException occurs.
68+
e.printStackTrace();
69+
}
70+
71+
Collection<MemcachedNode> all = conn.getLocator().getAll();
72+
Assert.assertEquals(all.size(), 0);
73+
}
74+
}

src/test/java/net/spy/memcached/MemcachedConnectionTest.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,8 @@ public void testNodesChangeQueue() throws Exception {
7272
conn.handleCacheNodesChange();
7373

7474
// then
75-
assertTrue(1 == locator.getAll().size());
75+
Thread.sleep(500);
76+
assertEquals(1, locator.getAll().size());
7677

7778
// when
7879
conn.setCacheNodesChange("0.0.0.0:11211,0.0.0.0:11212,0.0.0.0:11213");
@@ -81,7 +82,8 @@ public void testNodesChangeQueue() throws Exception {
8182
conn.handleCacheNodesChange();
8283

8384
// then
84-
assertTrue(3 == locator.getAll().size());
85+
Thread.sleep(500);
86+
assertEquals(3, locator.getAll().size());
8587

8688
// when
8789
conn.setCacheNodesChange("0.0.0.0:11212");
@@ -90,7 +92,8 @@ public void testNodesChangeQueue() throws Exception {
9092
conn.handleCacheNodesChange();
9193

9294
// then
93-
assertTrue(1 == locator.getAll().size());
95+
Thread.sleep(500);
96+
assertEquals(1, locator.getAll().size());
9497
}
9598

9699
public void testNodesChangeQueue_empty() throws Exception {
@@ -101,7 +104,7 @@ public void testNodesChangeQueue_empty() throws Exception {
101104
conn.handleCacheNodesChange();
102105

103106
// then
104-
assertTrue(0 == locator.getAll().size());
107+
assertEquals(0, locator.getAll().size());
105108
}
106109

107110
public void testNodesChangeQueue_invalid_addr() {
@@ -120,15 +123,16 @@ public void testNodesChangeQueue_invalid_addr() {
120123
}
121124
}
122125

123-
public void testNodesChangeQueue_redundant() throws Exception {
126+
public void testNodesChangeQueue_redundent() throws Exception {
124127
// when
125128
conn.setCacheNodesChange("0.0.0.0:11211,0.0.0.0:11211");
126129

127130
// test
128131
conn.handleCacheNodesChange();
129132

130133
// then
131-
assertTrue(2 == locator.getAll().size());
134+
Thread.sleep(500);
135+
assertEquals(2, locator.getAll().size());
132136
}
133137

134138
public void testNodesChangeQueue_twice() throws Exception {
@@ -140,7 +144,8 @@ public void testNodesChangeQueue_twice() throws Exception {
140144
conn.handleCacheNodesChange();
141145

142146
// then
143-
assertTrue(1 == locator.getAll().size());
147+
Thread.sleep(500);
148+
assertEquals(1, locator.getAll().size());
144149
}
145150

146151
public void testAddOperations() throws Exception {

0 commit comments

Comments
 (0)