Skip to content

Commit 85a219a

Browse files
committed
feat(api): implement routing logic which support dynamically adding/removing broadcast server
1 parent 911517e commit 85a219a

File tree

2 files changed

+28
-6
lines changed

2 files changed

+28
-6
lines changed

scalable-ot-api/src/main/java/com/brotherjing/service/DiscoveryService.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package com.brotherjing.service;
22

33
import java.net.InetSocketAddress;
4-
import java.util.ArrayList;
5-
import java.util.List;
64
import java.util.Set;
75
import java.util.concurrent.ConcurrentHashMap;
86

@@ -48,8 +46,8 @@ public void init() {
4846
});
4947
}
5048

51-
public List<ServerEntity> getAllServers() {
52-
return new ArrayList<>(cache);
49+
Set<ServerEntity> getAllServers() {
50+
return cache;
5351
}
5452

5553
private void addServer(PathChildrenCacheEvent event) {
Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
package com.brotherjing.service;
22

3-
import java.util.List;
3+
import java.util.Set;
4+
5+
import lombok.extern.slf4j.Slf4j;
46

57
import org.springframework.beans.factory.annotation.Autowired;
68
import org.springframework.stereotype.Service;
79

10+
import com.brotherjing.core.dao.RedisDao;
811
import com.brotherjing.core.loadbalance.LoadBalancer;
912
import com.brotherjing.core.loadbalance.ServerEntity;
1013

14+
@Slf4j
1115
@Service
1216
public class RouteService {
1317

@@ -17,8 +21,28 @@ public class RouteService {
1721
@Autowired
1822
private LoadBalancer loadBalancer;
1923

24+
@Autowired
25+
private RedisDao redisDao;
26+
2027
public ServerEntity getRoute(String docId) {
21-
List<ServerEntity> allServers = discoveryService.getAllServers();
28+
Set<ServerEntity> allServers = discoveryService.getAllServers();
29+
30+
// first check cached route
31+
ServerEntity cachedRoute = redisDao.getRoute(docId);
32+
if (cachedRoute != null) {
33+
if (allServers.contains(cachedRoute)) {
34+
// the cached route might differ from the one selected by load balance due to adding new node.
35+
// but the old node may still have clients connected to it,
36+
// so use the cached route as long as the node is up.
37+
return cachedRoute;
38+
} else {
39+
// this can happen when node is down and the cache has not expired
40+
log.warn("Found obsolete route in cache: {}, evicted.", cachedRoute);
41+
redisDao.removeRoute(docId);
42+
}
43+
}
44+
45+
// if no cache, use load balance to select route
2246
return loadBalancer.select(allServers, docId);
2347
}
2448
}

0 commit comments

Comments
 (0)