@@ -476,25 +476,19 @@ private void consumerAdvObjToFetch() {
476476 long now = Time .getCurrentMillis ();
477477 advObjToFetch .values ().stream ().sorted (PriorItem ::compareTo ).forEach (idToFetch -> {
478478 Sha256Hash hash = idToFetch .getHash ();
479- if ( (idToFetch .getType ().equals (InventoryType .TRX ) && TrxCache .getIfPresent (hash ) != null ) ||
480- (idToFetch .getType ().equals (InventoryType .BLOCK ) && BlockCache .getIfPresent (hash ) != null ) ){
481- logger .info ("{} {} Already exist." , idToFetch .getType (), hash );
482- advObjToFetch .remove (hash );
483- return ;
484- }
485479 if (idToFetch .getTime () < now - MSG_CACHE_DURATION_IN_BLOCKS * BLOCK_PRODUCED_INTERVAL ) {
486480 logger .info ("This obj is too late to fetch: " + idToFetch );
487481 advObjToFetch .remove (hash );
488- }else {
489- filterActivePeer .stream ()
490- .filter (peer -> peer .getAdvObjSpreadToUs ().containsKey (hash ) && sendPackage .getSize (peer ) < MAX_TRX_PER_PEER )
491- .sorted (Comparator .comparingInt (peer -> sendPackage .getSize (peer )))
492- .findFirst ().ifPresent (peer -> {
493- sendPackage .add (idToFetch , peer );
494- peer .getAdvObjWeRequested ().put (idToFetch .getItem (), now );
495- advObjToFetch .remove (hash );
496- });
482+ return ;
497483 }
484+ filterActivePeer .stream ()
485+ .filter (peer -> peer .getAdvObjSpreadToUs ().containsKey (hash ) && sendPackage .getSize (peer ) < MAX_TRX_PER_PEER )
486+ .sorted (Comparator .comparingInt (peer -> sendPackage .getSize (peer )))
487+ .findFirst ().ifPresent (peer -> {
488+ sendPackage .add (idToFetch , peer );
489+ peer .getAdvObjWeRequested ().put (idToFetch .getItem (), now );
490+ advObjToFetch .remove (hash );
491+ });
498492 });
499493
500494 sendPackage .sendFetch ();
@@ -653,8 +647,7 @@ public synchronized void disconnectInactive() {
653647
654648 private void onHandleInventoryMessage (PeerConnection peer , InventoryMessage msg ) {
655649 for (Sha256Hash id : msg .getHashList ()){
656- if ( (msg .getInventoryType ().equals (InventoryType .TRX ) && TrxCache .getIfPresent (id ) != null ) ||
657- (msg .getInventoryType ().equals (InventoryType .BLOCK ) && BlockCache .getIfPresent (id ) != null ) ){
650+ if (msg .getInventoryType ().equals (InventoryType .TRX ) && TrxCache .getIfPresent (id ) != null ) {
658651 logger .info ("{} {} from peer {} Already exist." , msg .getInventoryType (), id , peer .getNode ().getHost ());
659652 continue ;
660653 }
@@ -859,7 +852,18 @@ private void cleanUpSyncPeer(PeerConnection peer, ReasonCode reasonCode) {
859852 disconnectPeer (peer , reasonCode );
860853 }
861854
855+ synchronized boolean isExist (TransactionMessage trxMsg ){
856+ if (TrxCache .getIfPresent (trxMsg .getMessageId ()) != null ){
857+ return true ;
858+ }
859+ TrxCache .put (trxMsg .getMessageId (), trxMsg );
860+ return false ;
861+ }
862+
862863 private void onHandleTransactionMessage (PeerConnection peer , TransactionMessage trxMsg ) {
864+ if (isExist (trxMsg )){
865+ return ;
866+ }
863867 //logger.info("on handle transaction message");
864868 try {
865869 Item item = new Item (trxMsg .getMessageId (), InventoryType .TRX );
0 commit comments