Skip to content

Commit 5efdf16

Browse files
committed
feat(broadcast): register docId -> broadcast server route when init connection with client
1 parent 29b7c00 commit 5efdf16

File tree

3 files changed

+53
-17
lines changed

3 files changed

+53
-17
lines changed

scalable-ot-broadcast/src/main/java/com/brotherjing/broadcast/handler/WebSocketHandler.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,15 @@
1212
import lombok.extern.slf4j.Slf4j;
1313

1414
import org.apache.dubbo.config.annotation.Service;
15+
import org.springframework.beans.factory.annotation.Autowired;
1516
import org.springframework.stereotype.Component;
1617
import org.springframework.util.CollectionUtils;
1718
import org.springframework.web.socket.BinaryMessage;
1819
import org.springframework.web.socket.WebSocketSession;
1920
import org.springframework.web.socket.handler.BinaryWebSocketHandler;
2021

2122
import com.brotherjing.api.Broadcast;
23+
import com.brotherjing.broadcast.service.RegistryService;
2224
import com.brotherjing.core.util.Converter;
2325
import com.brotherjing.proto.BaseProto;
2426

@@ -27,6 +29,9 @@
2729
@Service(interfaceClass = Broadcast.class)
2830
public class WebSocketHandler extends BinaryWebSocketHandler implements Broadcast {
2931

32+
@Autowired
33+
private RegistryService registryService;
34+
3035
private Map<String, Map<String, WebSocketSession>> clientsByDocId = new ConcurrentHashMap<>();
3136

3237
@Override
@@ -104,5 +109,8 @@ public void sendTo(String sid, List<byte[]> commandsPayload) {
104109
private void register(String docId, String sid, WebSocketSession client) {
105110
clientsByDocId.putIfAbsent(docId, new ConcurrentHashMap<>());
106111
clientsByDocId.get(docId).put(sid, client);
112+
113+
// also register the route in redis
114+
registryService.registerRoute(docId);
107115
}
108116
}

scalable-ot-broadcast/src/main/java/com/brotherjing/broadcast/service/RegistryService.java

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,30 @@
11
package com.brotherjing.broadcast.service;
22

3+
import java.net.InetAddress;
4+
import java.net.UnknownHostException;
5+
36
import javax.annotation.PostConstruct;
47

58
import org.springframework.beans.factory.annotation.Autowired;
69
import org.springframework.beans.factory.annotation.Value;
710
import org.springframework.stereotype.Service;
811

9-
import com.brotherjing.Const;
1012
import com.brotherjing.config.ZookeeperConfig;
13+
import com.brotherjing.core.dao.RedisDao;
14+
import com.brotherjing.core.loadbalance.ServerEntity;
1115
import com.brotherjing.core.zookeeper.ZookeeperRegistry;
1216

1317
@Service
1418
public class RegistryService {
1519

1620
@Value(value = "${server.port}")
17-
private String serverPort;
21+
private int serverPort;
1822

1923
@Value(value = "${dubbo.protocol.port}")
20-
private String dubboPort;
24+
private int dubboPort;
25+
26+
@Autowired
27+
private RedisDao redisDao;
2128

2229
@Autowired
2330
private ZookeeperConfig zookeeperConfig;
@@ -31,10 +38,38 @@ public void init() {
3138
register();
3239
}
3340

41+
/**
42+
* Register this node in discovery service
43+
*/
3444
private void register() {
35-
boolean success = registry.register(Const.BROADCAST_REGISTRY_PATH, serverPort, dubboPort);
45+
boolean success = registry.register(getServerEntity());
3646
if (!success) {
3747
throw new RuntimeException("Failed to register");
3848
}
3949
}
50+
51+
/**
52+
* Register a specific route from docId to node.
53+
*/
54+
public void registerRoute(String docId) {
55+
redisDao.addRoute(docId, getServerEntity());
56+
}
57+
58+
private ServerEntity getServerEntity() {
59+
return ServerEntity.builder()
60+
.host(getAddress())
61+
.serverPort(serverPort)
62+
.dubboPort(dubboPort)
63+
.build();
64+
}
65+
66+
private String getAddress() {
67+
InetAddress inetAddress;
68+
try {
69+
inetAddress = InetAddress.getLocalHost();
70+
} catch (UnknownHostException e) {
71+
throw new RuntimeException("Failed to get local address", e);
72+
}
73+
return inetAddress.getHostAddress();
74+
}
4075
}

scalable-ot-core/src/main/java/com/brotherjing/core/zookeeper/ZookeeperRegistry.java

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package com.brotherjing.core.zookeeper;
22

3-
import java.net.InetAddress;
4-
import java.net.UnknownHostException;
53
import java.util.concurrent.CountDownLatch;
64
import java.util.function.BiConsumer;
75

@@ -15,7 +13,9 @@
1513
import org.apache.zookeeper.CreateMode;
1614
import org.apache.zookeeper.data.Stat;
1715

16+
import com.brotherjing.Const;
1817
import com.brotherjing.config.ZookeeperConfig;
18+
import com.brotherjing.core.loadbalance.ServerEntity;
1919

2020
@Slf4j
2121
public class ZookeeperRegistry {
@@ -79,21 +79,14 @@ public boolean discover(String path, BiConsumer<CuratorFramework, PathChildrenCa
7979
* Dubbo port is put in path, while server port is put in node data.
8080
* For example /root/127.0.0.1:20880 with data 127.0.0.1:8080
8181
*/
82-
public boolean register(String root, String serverPort, String dubboPort) {
83-
InetAddress inetAddress;
84-
try {
85-
inetAddress = InetAddress.getLocalHost();
86-
} catch (UnknownHostException e) {
87-
throw new RuntimeException("Failed to get local address", e);
88-
}
89-
String address = inetAddress.getHostAddress();
90-
String path = root + "/" + address + ":" + dubboPort;
91-
String serverAddress = address + ":" + serverPort;
82+
public boolean register(ServerEntity serverEntity) {
83+
String path = Const.BROADCAST_REGISTRY_PATH + "/" + serverEntity.getDubboAddress();
84+
String data = serverEntity.getServerAddress();
9285
try {
9386
curator.create()
9487
.creatingParentsIfNeeded()
9588
.withMode(CreateMode.EPHEMERAL)
96-
.forPath(path, serverAddress.getBytes());
89+
.forPath(path, data.getBytes());
9790
log.info("Registered path in zookeeper: {}", path);
9891
} catch (Exception e) {
9992
log.error("Failed to register path {}, {}", path, e);

0 commit comments

Comments
 (0)