Skip to content

Commit ffb975e

Browse files
authored
Merge pull request #749 from tronprotocol/sync_msg
sync: mdf fetch item logic
2 parents d149c4e + 5dbd94d commit ffb975e

File tree

1 file changed

+22
-30
lines changed

1 file changed

+22
-30
lines changed

src/main/java/org/tron/core/net/node/NodeImpl.java

Lines changed: 22 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ public void setPool(SyncPool pool) {
335335
public void broadcast(Message msg) {
336336
InventoryType type;
337337
if (msg instanceof BlockMessage) {
338-
logger.info("Ready to broadcast a block, Its hash is " + msg.getMessageId());
338+
logger.info("Ready to broadcast block {}", ((BlockMessage) msg).getBlockId());
339339
freshBlockId.offer(((BlockMessage) msg).getBlockId());
340340
BlockCache.put(msg.getMessageId(), (BlockMessage) msg);
341341
type = InventoryType.BLOCK;
@@ -473,25 +473,22 @@ private void consumerAdvObjToFetch() {
473473
}
474474
}
475475
InvToSend sendPackage = new InvToSend();
476-
//AtomicLong batchFecthResponseSize = new AtomicLong(0);
477-
478-
advObjToFetch.values().stream()
479-
.sorted(PriorItem::compareTo)
480-
.forEach(idToFetch ->
481-
filterActivePeer.stream()
482-
.filter(peer -> peer.getAdvObjSpreadToUs().containsKey(idToFetch.getHash())
483-
&& sendPackage.getSize(peer) < MAX_TRX_PER_PEER)
484-
.sorted(Comparator.comparingInt(peer -> sendPackage.getSize(peer)))
485-
.findFirst().ifPresent(peer -> {
486-
long now = Time.getCurrentMillis();
487-
if (idToFetch.getTime() > now - MSG_CACHE_DURATION_IN_BLOCKS * BLOCK_PRODUCED_INTERVAL) {
476+
long now = Time.getCurrentMillis();
477+
advObjToFetch.values().stream().sorted(PriorItem::compareTo).forEach(idToFetch -> {
478+
if (idToFetch.getTime() < now - MSG_CACHE_DURATION_IN_BLOCKS * BLOCK_PRODUCED_INTERVAL) {
479+
logger.info("This obj is too late to fetch: " + idToFetch);
480+
advObjToFetch.remove(idToFetch.getHash());
481+
}else {
482+
filterActivePeer.stream()
483+
.filter(peer -> peer.getAdvObjSpreadToUs().containsKey(idToFetch.getHash()) && sendPackage.getSize(peer) < MAX_TRX_PER_PEER)
484+
.sorted(Comparator.comparingInt(peer -> sendPackage.getSize(peer)))
485+
.findFirst().ifPresent(peer -> {
488486
sendPackage.add(idToFetch, peer);
489487
peer.getAdvObjWeRequested().put(idToFetch.getItem(), now);
490-
} else {
491-
logger.info("This obj is too late to fetch: " + idToFetch);
492-
}
493-
advObjToFetch.remove(idToFetch.getHash());
494-
}));
488+
advObjToFetch.remove(idToFetch.getHash());
489+
});
490+
}
491+
});
495492

496493
sendPackage.sendFetch();
497494
}
@@ -548,22 +545,17 @@ private synchronized void handleSyncBlock() {
548545
pool.forEach(msg -> {
549546
final boolean[] isFound = {false};
550547
getActivePeer().stream()
551-
.filter(peer ->
552-
!peer.getSyncBlockToFetch().isEmpty()
553-
&& peer.getSyncBlockToFetch().peek().equals(msg.getBlockId()))
554-
.forEach(peer -> {
555-
peer.getSyncBlockToFetch().pop();
556-
peer.getBlockInProc().add(msg.getBlockId());
557-
isFound[0] = true;
558-
});
548+
.filter(peer -> !peer.getSyncBlockToFetch().isEmpty() && peer.getSyncBlockToFetch().peek().equals(msg.getBlockId()))
549+
.forEach(peer -> {
550+
peer.getSyncBlockToFetch().pop();
551+
peer.getBlockInProc().add(msg.getBlockId());
552+
isFound[0] = true;
553+
});
559554

560555
if (isFound[0]) {
561556
if (!freshBlockId.contains(msg.getBlockId())) {
562557
blockWaitToProc.remove(msg);
563-
//TODO: blockWaitToProc and handle thread.
564-
BlockCapsule block = msg.getBlockCapsule();
565-
//handleBackLogBlocksPool.execute(() -> processSyncBlock(block));
566-
processSyncBlock(block);
558+
processSyncBlock(msg.getBlockCapsule());
567559
isBlockProc[0] = true;
568560
}
569561
}

0 commit comments

Comments
 (0)