Skip to content

Commit 3b5ed16

Browse files
authored
feat(net): optimize block processing logic (#5754)
feat(net): optimize block processing logic
1 parent 45a2af0 commit 3b5ed16

File tree

5 files changed

+38
-15
lines changed

5 files changed

+38
-15
lines changed

framework/src/main/java/org/tron/core/net/messagehandler/BlockMsgHandler.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,14 @@ private void check(PeerConnection peer, BlockMessage msg) throws P2pException {
125125

126126
private void processBlock(PeerConnection peer, BlockCapsule block) throws P2pException {
127127
BlockId blockId = block.getBlockId();
128+
boolean flag = tronNetDelegate.validBlock(block);
129+
if (!flag) {
130+
logger.warn("Receive a bad block from {}, {}, {}",
131+
peer.getInetSocketAddress(), blockId.getString(),
132+
Hex.toHexString(block.getWitnessAddress().toByteArray()));
133+
return;
134+
}
135+
128136
if (!tronNetDelegate.containBlock(block.getParentBlockId())) {
129137
logger.warn("Get unlink block {} from {}, head is {}", blockId.getString(),
130138
peer.getInetAddress(), tronNetDelegate.getHeadBlockId().getString());
@@ -138,16 +146,10 @@ private void processBlock(PeerConnection peer, BlockCapsule block) throws P2pExc
138146
return;
139147
}
140148

141-
boolean flag = tronNetDelegate.validBlock(block);
142-
if (flag) {
143-
broadcast(new BlockMessage(block));
144-
}
149+
broadcast(new BlockMessage(block));
145150

146151
try {
147152
tronNetDelegate.processBlock(block, false);
148-
if (!flag) {
149-
broadcast(new BlockMessage(block));
150-
}
151153

152154
witnessProductBlockService.validWitnessProductTwoBlock(block);
153155

framework/src/main/java/org/tron/core/net/messagehandler/PbftDataSyncHandler.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package org.tron.core.net.messagehandler;
22

3+
import com.google.common.cache.Cache;
4+
import com.google.common.cache.CacheBuilder;
35
import com.google.common.collect.Sets;
46
import com.google.protobuf.ByteString;
57
import com.google.protobuf.InvalidProtocolBufferException;
@@ -8,12 +10,11 @@
810
import java.security.SignatureException;
911
import java.util.ArrayList;
1012
import java.util.List;
11-
import java.util.Map;
1213
import java.util.Set;
1314
import java.util.concurrent.Callable;
14-
import java.util.concurrent.ConcurrentHashMap;
1515
import java.util.concurrent.ExecutorService;
1616
import java.util.concurrent.Future;
17+
import java.util.concurrent.TimeUnit;
1718
import lombok.extern.slf4j.Slf4j;
1819
import org.springframework.beans.factory.annotation.Autowired;
1920
import org.springframework.stereotype.Service;
@@ -37,7 +38,9 @@
3738
@Service
3839
public class PbftDataSyncHandler implements TronMsgHandler, Closeable {
3940

40-
private Map<Long, PbftCommitMessage> pbftCommitMessageCache = new ConcurrentHashMap<>();
41+
private static final Cache<Long, PbftCommitMessage> pbftCommitMessageCache =
42+
CacheBuilder.newBuilder().initialCapacity(100).maximumSize(200)
43+
.expireAfterWrite(10, TimeUnit.MINUTES).build();
4144

4245
private final String esName = "valid-header-pbft-sign";
4346

@@ -51,6 +54,9 @@ public class PbftDataSyncHandler implements TronMsgHandler, Closeable {
5154
public void processMessage(PeerConnection peer, TronMessage msg) throws P2pException {
5255
PbftCommitMessage pbftCommitMessage = (PbftCommitMessage) msg;
5356
try {
57+
if (!chainBaseManager.getDynamicPropertiesStore().allowPBFT()) {
58+
return;
59+
}
5460
Raw raw = Raw.parseFrom(pbftCommitMessage.getPBFTCommitResult().getData());
5561
pbftCommitMessageCache.put(raw.getViewN(), pbftCommitMessage);
5662
} catch (InvalidProtocolBufferException e) {
@@ -64,7 +70,8 @@ public void processPBFTCommitData(BlockCapsule block) {
6470
return;
6571
}
6672
long epoch = 0;
67-
PbftCommitMessage pbftCommitMessage = pbftCommitMessageCache.remove(block.getNum());
73+
PbftCommitMessage pbftCommitMessage = pbftCommitMessageCache.getIfPresent(block.getNum());
74+
pbftCommitMessageCache.invalidate(block.getNum());
6875
long maintenanceTimeInterval = chainBaseManager.getDynamicPropertiesStore()
6976
.getMaintenanceTimeInterval();
7077
if (pbftCommitMessage == null) {
@@ -75,7 +82,8 @@ public void processPBFTCommitData(BlockCapsule block) {
7582
Raw raw = Raw.parseFrom(pbftCommitMessage.getPBFTCommitResult().getData());
7683
epoch = raw.getEpoch();
7784
}
78-
pbftCommitMessage = pbftCommitMessageCache.remove(epoch);
85+
pbftCommitMessage = pbftCommitMessageCache.getIfPresent(epoch);
86+
pbftCommitMessageCache.invalidate(epoch);
7987
if (pbftCommitMessage != null) {
8088
processPBFTCommitMessage(pbftCommitMessage);
8189
}

framework/src/main/java/org/tron/core/net/messagehandler/PbftMsgHandler.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ public class PbftMsgHandler {
3333
private TronNetDelegate tronNetDelegate;
3434

3535
public void processMessage(PeerConnection peer, PbftMessage msg) throws Exception {
36+
if (!tronNetDelegate.allowPBFT()) {
37+
return;
38+
}
3639
if (Param.getInstance().getPbftInterface().isSyncing()) {
3740
return;
3841
}

framework/src/test/java/org/tron/core/net/messagehandler/PbftDataSyncHandlerTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,6 @@ public void testProcessMessage() throws Exception {
3232
Protocol.PBFTMessage.Raw raw = rawBuilder.build();
3333
PbftSignCapsule pbftSignCapsule = new PbftSignCapsule(raw.toByteString(), new ArrayList<>());
3434
PbftCommitMessage pbftCommitMessage = new PbftCommitMessage(pbftSignCapsule);
35-
pbftDataSyncHandler.processMessage(null, pbftCommitMessage);
36-
Assert.assertEquals(Protocol.PBFTMessage.Raw.parseFrom(
37-
pbftCommitMessage.getPBFTCommitResult().getData()).getViewN(), 1);
3835

3936
DynamicPropertiesStore dynamicPropertiesStore = Mockito.mock(DynamicPropertiesStore.class);
4037
PbftSignDataStore pbftSignDataStore = Mockito.mock(PbftSignDataStore.class);
@@ -48,6 +45,10 @@ public void testProcessMessage() throws Exception {
4845
field.setAccessible(true);
4946
field.set(pbftDataSyncHandler, chainBaseManager);
5047

48+
pbftDataSyncHandler.processMessage(null, pbftCommitMessage);
49+
Assert.assertEquals(Protocol.PBFTMessage.Raw.parseFrom(
50+
pbftCommitMessage.getPBFTCommitResult().getData()).getViewN(), 1);
51+
5152
pbftDataSyncHandler.processPBFTCommitData(blockCapsule);
5253
Field field1 = PbftDataSyncHandler.class.getDeclaredField("pbftCommitMessageCache");
5354
field1.setAccessible(true);

framework/src/test/java/org/tron/core/net/messagehandler/PbftMsgHandlerTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.tron.core.net.message.MessageTypes;
3535
import org.tron.core.net.peer.PeerConnection;
3636
import org.tron.core.net.peer.PeerManager;
37+
import org.tron.core.store.DynamicPropertiesStore;
3738
import org.tron.p2p.P2pConfig;
3839
import org.tron.p2p.base.Parameter;
3940
import org.tron.p2p.connection.Channel;
@@ -116,6 +117,14 @@ public void testPbft() throws Exception {
116117
Assert.assertEquals(P2pException.TypeEnum.BAD_MESSAGE, e.getType());
117118
}
118119

120+
DynamicPropertiesStore dynamicPropertiesStore = context.getBean(DynamicPropertiesStore.class);
121+
dynamicPropertiesStore.saveAllowPBFT(1);
122+
try {
123+
context.getBean(PbftMsgHandler.class).processMessage(peer, pbftMessage);
124+
} catch (P2pException e) {
125+
Assert.assertEquals(P2pException.TypeEnum.BAD_MESSAGE, e.getType());
126+
}
127+
119128
Assert.assertEquals(1, PeerManager.getPeers().size());
120129
}
121130
}

0 commit comments

Comments
 (0)