Skip to content

Commit 8c900c0

Browse files
authored
Merge pull request #770 from tronprotocol/p2p_msg
P2p: mdf P2pHandler hasping to volatile
2 parents 39b71e5 + 2e0da22 commit 8c900c0

File tree

4 files changed

+31
-14
lines changed

4 files changed

+31
-14
lines changed

src/main/java/org/tron/common/overlay/server/MessageQueue.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public class MessageQueue {
2626

2727
private volatile boolean sendMsgFlag = false;
2828

29-
private long sendTime;
29+
private volatile long sendTime;
3030

3131
private Thread sendMsgThread;
3232

src/main/java/org/tron/common/overlay/server/P2pHandler.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@
2626
import java.util.concurrent.ScheduledExecutorService;
2727
import java.util.concurrent.ScheduledFuture;
2828
import java.util.concurrent.TimeUnit;
29-
import org.slf4j.Logger;
30-
import org.slf4j.LoggerFactory;
3129
import org.springframework.context.annotation.Scope;
3230
import org.springframework.stereotype.Component;
3331
import org.tron.common.overlay.message.DisconnectMessage;
@@ -38,8 +36,6 @@
3836
@Scope("prototype")
3937
public class P2pHandler extends SimpleChannelInboundHandler<P2pMessage> {
4038

41-
private final static Logger logger = LoggerFactory.getLogger("P2pHandler");
42-
4339
private static ScheduledExecutorService pingTimer =
4440
Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "P2pPingTimer"));
4541

@@ -49,9 +45,9 @@ public class P2pHandler extends SimpleChannelInboundHandler<P2pMessage> {
4945

5046
private ScheduledFuture<?> pingTask;
5147

52-
private boolean hasPing = false;
48+
private volatile boolean hasPing = false;
5349

54-
private long sendPingTime;
50+
private volatile long sendPingTime;
5551

5652
private ChannelHandlerContext ctx;
5753

src/main/java/org/tron/common/overlay/server/SyncPool.java

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@
2727
import java.util.concurrent.ScheduledExecutorService;
2828
import java.util.concurrent.TimeUnit;
2929
import java.util.function.Predicate;
30+
31+
import com.google.common.cache.Cache;
32+
import com.google.common.cache.CacheBuilder;
3033
import org.slf4j.Logger;
3134
import org.slf4j.LoggerFactory;
3235
import org.springframework.beans.factory.annotation.Autowired;
@@ -36,7 +39,9 @@
3639
import org.tron.common.overlay.discover.Node;
3740
import org.tron.common.overlay.discover.NodeHandler;
3841
import org.tron.common.overlay.discover.NodeManager;
42+
import org.tron.common.utils.Sha256Hash;
3943
import org.tron.core.config.args.Args;
44+
import org.tron.core.net.message.TransactionMessage;
4045
import org.tron.core.net.peer.PeerConnection;
4146
import org.tron.core.net.peer.PeerConnectionDelegate;
4247

@@ -49,6 +54,9 @@ public class SyncPool {
4954

5055
private final List<PeerConnection> activePeers = Collections.synchronizedList(new ArrayList<PeerConnection>());
5156

57+
private Cache<NodeHandler, Long> nodeHandlerCache = CacheBuilder.newBuilder()
58+
.maximumSize(1000).expireAfterWrite(120, TimeUnit.SECONDS).recordStats().build();
59+
5260
@Autowired
5361
private NodeManager nodeManager;
5462

@@ -102,7 +110,10 @@ private void fillUp() {
102110
nodesInUse.add(nodeManager.getPublicHomeNode().getHexId());
103111

104112
List<NodeHandler> newNodes = nodeManager.getNodes(new NodeSelector(nodesInUse), lackSize);
105-
newNodes.forEach(n -> peerClient.connectAsync(n, false));
113+
newNodes.forEach(n -> {
114+
peerClient.connectAsync(n, false);
115+
nodeHandlerCache.put(n, System.currentTimeMillis());
116+
});
106117
}
107118

108119
// for test only
@@ -112,17 +123,17 @@ public void addActivePeers(PeerConnection p) {
112123

113124

114125
synchronized void logActivePeers() {
126+
115127
logger.info("-------- active node {}", nodeManager.dumpActiveNodes().size());
116-
nodeManager.dumpActiveNodes().forEach(handler -> {
128+
nodeManager.dumpActiveNodes().forEach(handler -> {
117129
if (handler.getNode().getPort() == 18888) {
118130
logger.info("address: {}:{}, ID:{} {}",
119-
handler.getNode().getHost(), handler.getNode().getPort(),
120-
handler.getNode().getHexIdShort(), handler.getNodeStatistics().toString());
131+
handler.getNode().getHost(), handler.getNode().getPort(),
132+
handler.getNode().getHexIdShort(), handler.getNodeStatistics().toString());
121133
}
122-
});
134+
});
123135

124-
logger.info("-------- active channel {}, node in user size {}", channelManager.getActivePeers().size(),
125-
channelManager.nodesInUse().size());
136+
logger.info("-------- active channel {}", channelManager.getActivePeers().size());
126137
for (Channel channel: channelManager.getActivePeers()){
127138
logger.info(channel.toString());
128139
}
@@ -203,6 +214,10 @@ public boolean test(NodeHandler handler) {
203214
return false;
204215
}
205216

217+
if (nodeHandlerCache.getIfPresent(handler) != null){
218+
return false;
219+
}
220+
206221
if (handler.getNodeStatistics().getReputation() < 100) {
207222
return false;
208223
}

src/main/java/org/tron/core/net/message/TransactionMessage.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,12 @@ public TransactionMessage(Transaction trx) {
2020
this.data = trx.toByteArray();
2121
}
2222

23+
@Override
24+
public String toString(){
25+
return new StringBuilder().append(super.toString())
26+
.append("messageId: ").append(super.getMessageId()).toString();
27+
}
28+
2329
@Override
2430
public Class<?> getAnswerMessage() {
2531
return null;

0 commit comments

Comments
 (0)