4444import java .util .SortedMap ;
4545import java .util .TreeMap ;
4646import java .util .concurrent .ConcurrentLinkedQueue ;
47+ import java .util .concurrent .Future ;
4748import java .util .concurrent .TimeUnit ;
4849import java .util .concurrent .atomic .AtomicReference ;
4950
@@ -114,6 +115,9 @@ public final class MemcachedConnection extends SpyObject {
114115 private final DelayedSwitchoverGroups delayedSwitchoverGroups =
115116 new DelayedSwitchoverGroups (DELAYED_SWITCHOVER_TIMEOUT_MILLISECONDS );
116117 /* ENABLE_REPLICATION end */
118+ private final HashRingUpdateService hashUpdateService = new HashRingUpdateService ();
119+
120+ private Future <Boolean > hashUpdateResult ;
117121
118122 /**
119123 * Construct a memcached connection.
@@ -312,7 +316,7 @@ public void handleIO() throws IOException {
312316 }
313317 }
314318
315- private void handleNodesToRemove (final List <MemcachedNode > nodesToRemove ) {
319+ void handleNodesToRemove (final List <MemcachedNode > nodesToRemove ) {
316320 for (MemcachedNode node : nodesToRemove ) {
317321 getLogger ().info ("old memcached node removed %s" , node );
318322 reconnectQueue .remove (node );
@@ -339,10 +343,9 @@ private void handleNodesToRemove(final List<MemcachedNode> nodesToRemove) {
339343 }
340344 }
341345
342- private void updateConnections (List <InetSocketAddress > addrs ) throws IOException {
343- List <MemcachedNode > attachNodes = new ArrayList <MemcachedNode >();
344- List <MemcachedNode > removeNodes = new ArrayList <MemcachedNode >();
345-
346+ private void getUpdateNodes (List <InetSocketAddress > addrs ,
347+ List <MemcachedNode > attachNodes ,
348+ List <MemcachedNode > removeNodes ) throws IOException {
346349 for (MemcachedNode node : locator .getAll ()) {
347350 if (addrs .contains (node .getSocketAddress ())) {
348351 addrs .remove (node .getSocketAddress ());
@@ -355,12 +358,6 @@ private void updateConnections(List<InetSocketAddress> addrs) throws IOException
355358 for (SocketAddress sa : addrs ) {
356359 attachNodes .add (attachMemcachedNode (sa ));
357360 }
358-
359- // Update the hash.
360- locator .update (attachNodes , removeNodes );
361-
362- // Remove the unavailable nodes.
363- handleNodesToRemove (removeNodes );
364361 }
365362
366363 /* ENABLE_REPLICATION if */
@@ -703,7 +700,12 @@ void handleCacheNodesChange() throws IOException {
703700 return ;
704701 }
705702 /* ENABLE_REPLICATION end */
706- updateConnections (AddrUtil .getAddresses (cacheList ));
703+ List <MemcachedNode > attachNodes = new ArrayList <MemcachedNode >();
704+ List <MemcachedNode > removeNodes = new ArrayList <MemcachedNode >();
705+ getUpdateNodes (AddrUtil .getAddresses (cacheList ), attachNodes , removeNodes );
706+ // Update the hash.
707+ CacheListUpdateTask task = new CacheListUpdateTask (this , attachNodes , removeNodes );
708+ hashUpdateResult = hashUpdateService .updateHashes (task );
707709 }
708710 /* ENABLE_MIGRATION if */
709711 if (arcusMigrEnabled && alterList != null ) {
@@ -724,6 +726,17 @@ void handleCacheNodesChange() throws IOException {
724726 /* ENABLE_MIGRATION end */
725727 }
726728
729+ // Called By MemcachedConnectionTest.
730+ boolean getHashUpdateResult () {
731+ try {
732+ hashUpdateResult .get ();
733+ } catch (Exception e ) {
734+ getLogger ().warn ("Failed to update hash." , e );
735+ return false ;
736+ }
737+ return true ;
738+ }
739+
727740 // Called by CacheManger to add the memcached server group.
728741 public void setCacheNodesChange (String addrs ) {
729742 String old = cacheNodesChange .getAndSet (addrs );
@@ -1509,6 +1522,7 @@ public void shutdown() throws IOException {
15091522 }
15101523 }
15111524 selector .close ();
1525+ hashUpdateService .shutdown ();
15121526 getLogger ().debug ("Shut down selector %s" , selector );
15131527 }
15141528
0 commit comments