@@ -476,25 +476,19 @@ private void consumerAdvObjToFetch() {
476476 long now = Time .getCurrentMillis ();
477477 advObjToFetch .values ().stream ().sorted (PriorItem ::compareTo ).forEach (idToFetch -> {
478478 Sha256Hash hash = idToFetch .getHash ();
479- if ( (idToFetch .getType ().equals (InventoryType .TRX ) && TrxCache .getIfPresent (hash ) != null ) ||
480- (idToFetch .getType ().equals (InventoryType .BLOCK ) && BlockCache .getIfPresent (hash ) != null ) ){
481- logger .info ("{} {} Already exist." , idToFetch .getType (), hash );
482- advObjToFetch .remove (hash );
483- return ;
484- }
485479 if (idToFetch .getTime () < now - MSG_CACHE_DURATION_IN_BLOCKS * BLOCK_PRODUCED_INTERVAL ) {
486480 logger .info ("This obj is too late to fetch: " + idToFetch );
487481 advObjToFetch .remove (hash );
488- }else {
489- filterActivePeer .stream ()
490- .filter (peer -> peer .getAdvObjSpreadToUs ().containsKey (hash ) && sendPackage .getSize (peer ) < MAX_TRX_PER_PEER )
491- .sorted (Comparator .comparingInt (peer -> sendPackage .getSize (peer )))
492- .findFirst ().ifPresent (peer -> {
493- sendPackage .add (idToFetch , peer );
494- peer .getAdvObjWeRequested ().put (idToFetch .getItem (), now );
495- advObjToFetch .remove (hash );
496- });
482+ return ;
497483 }
484+ filterActivePeer .stream ()
485+ .filter (peer -> peer .getAdvObjSpreadToUs ().containsKey (hash ) && sendPackage .getSize (peer ) < MAX_TRX_PER_PEER )
486+ .sorted (Comparator .comparingInt (peer -> sendPackage .getSize (peer )))
487+ .findFirst ().ifPresent (peer -> {
488+ sendPackage .add (idToFetch , peer );
489+ peer .getAdvObjWeRequested ().put (idToFetch .getItem (), now );
490+ advObjToFetch .remove (hash );
491+ });
498492 });
499493
500494 sendPackage .sendFetch ();
@@ -648,8 +642,7 @@ public synchronized void disconnectInactive() {
648642
649643 private void onHandleInventoryMessage (PeerConnection peer , InventoryMessage msg ) {
650644 for (Sha256Hash id : msg .getHashList ()){
651- if ( (msg .getInventoryType ().equals (InventoryType .TRX ) && TrxCache .getIfPresent (id ) != null ) ||
652- (msg .getInventoryType ().equals (InventoryType .BLOCK ) && BlockCache .getIfPresent (id ) != null ) ){
645+ if (msg .getInventoryType ().equals (InventoryType .TRX ) && TrxCache .getIfPresent (id ) != null ) {
653646 logger .info ("{} {} from peer {} Already exist." , msg .getInventoryType (), id , peer .getNode ().getHost ());
654647 continue ;
655648 }
@@ -854,17 +847,27 @@ private void cleanUpSyncPeer(PeerConnection peer, ReasonCode reasonCode) {
854847 disconnectPeer (peer , reasonCode );
855848 }
856849
850+ synchronized boolean isTrxExist (TransactionMessage trxMsg ){
851+ if (TrxCache .getIfPresent (trxMsg .getMessageId ()) != null ){
852+ return true ;
853+ }
854+ TrxCache .put (trxMsg .getMessageId (), trxMsg );
855+ return false ;
856+ }
857+
857858 private void onHandleTransactionMessage (PeerConnection peer , TransactionMessage trxMsg ) {
858- //logger.info("on handle transaction message");
859859 try {
860860 Item item = new Item (trxMsg .getMessageId (), InventoryType .TRX );
861861 if (!peer .getAdvObjWeRequested ().containsKey (item )) {
862862 throw new TraitorPeerException ("We don't send fetch request to" + peer );
863- } else {
864- peer .getAdvObjWeRequested ().remove (item );
865- del .handleTransaction (trxMsg .getTransactionCapsule ());
866- broadcast (trxMsg );
867863 }
864+ peer .getAdvObjWeRequested ().remove (item );
865+ if (isTrxExist (trxMsg )){
866+ logger .info ("Trx {} from Peer {} already processed." , trxMsg .getMessageId (), peer .getNode ().getHost ());
867+ return ;
868+ }
869+ del .handleTransaction (trxMsg .getTransactionCapsule ());
870+ broadcast (trxMsg );
868871 } catch (TraitorPeerException e ) {
869872 logger .error (e .getMessage ());
870873 banTraitorPeer (peer , ReasonCode .BAD_PROTOCOL );
0 commit comments