@@ -29,7 +29,11 @@ import (
2929
3030const DefaultPointerPrefixLength = 14
3131
32- var log = logging .MustGetLogger ("retriever" )
32+ var (
33+ // Initialize a clear pointerList for the DHT on start
34+ pointerList = []string {}
35+ log = logging .MustGetLogger ("retriever" )
36+ )
3337
3438type MRConfig struct {
3539 Db repo.Datastore
@@ -66,6 +70,20 @@ type offlineMessage struct {
6670 env pb.Envelope
6771}
6872
73+ func stringInSlice (str string , list []string ) bool {
74+ for _ , v := range list {
75+ if v == str {
76+ return true
77+ }
78+ }
79+ return false
80+ }
81+
82+ // Reset on startup
83+ func (m * MessageRetriever ) ResetPointerList () {
84+ pointerList = []string {}
85+ }
86+
6987func NewMessageRetriever (cfg MRConfig ) * MessageRetriever {
7088 var client * http.Client
7189 if cfg .Dialer != nil {
@@ -100,8 +118,8 @@ func (m *MessageRetriever) Run() {
100118 peers := time .NewTicker (time .Minute )
101119 defer dht .Stop ()
102120 defer peers .Stop ()
103- go m .fetchPointersFromDHT ()
104121 go m .fetchPointersFromPushNodes ()
122+ go m .fetchPointersFromDHT ()
105123 for {
106124 select {
107125 case <- dht .C :
@@ -159,7 +177,8 @@ func (m *MessageRetriever) downloadMessages(peerOut chan ps.PeerInfo) {
159177 inFlight := make (map [string ]bool )
160178 // Iterate over the pointers, adding 1 to the waitgroup for each pointer found
161179 for p := range peerOut {
162- if len (p .Addrs ) > 0 && ! m .db .OfflineMessages ().Has (p .Addrs [0 ].String ()) && ! inFlight [p .Addrs [0 ].String ()] {
180+ if len (p .Addrs ) > 0 && ! m .db .OfflineMessages ().Has (p .Addrs [0 ].String ()) && ! stringInSlice (p .Addrs [0 ].String (), pointerList ) && ! inFlight [p .Addrs [0 ].String ()] {
181+ pointerList = append (pointerList , p .Addrs [0 ].String ())
163182 inFlight [p .Addrs [0 ].String ()] = true
164183 log .Debugf ("Found pointer with location %s" , p .Addrs [0 ].String ())
165184 // IPFS
0 commit comments