diff --git a/Tests/unit/p2p/host_spec.go b/Tests/unit/p2p/host_spec.go
new file mode 100644
index 0000000..a8a9eaf
--- /dev/null
+++ b/Tests/unit/p2p/host_spec.go
@@ -0,0 +1,34 @@
+package p2p_tests
+
+import (
+	"KitsuneSemCalda/SBC/internal/p2p"
+	"github.com/caiolandgraf/gest/gest"
+	"strings"
+)
+
+func init() {
+	s := gest.Describe("P2P Host Core")
+
+	s.It("should detect if a public IP is valid format", func(t *gest.T) {
+		// This might fail if no internet, so we test the logic via helper if available
+		// or just check if it returns something or an error
+		ip, _ := p2p.GetPublicIP()
+		if ip != "" {
+			parts := strings.Split(ip, ".")
+			t.Expect(len(parts)).ToBe(4)
+		}
+	})
+
+	s.It("should include public IP in multiaddrs if set", func(t *gest.T) {
+		cfg := p2p.DefaultConfig()
+		h, err := p2p.NewHost(cfg, nil)
+		t.Expect(err).ToBeNil()
+
+		addrs := h.Addrs()
+		t.Expect(len(addrs)).Not().ToBe(0)
+
+		h.Close()
+	})
+
+	gest.Register(s)
+}
diff --git a/Tests/unit/p2p/logger_spec.go b/Tests/unit/p2p/logger_spec.go
new file mode 100644
index 0000000..e207710
--- /dev/null
+++ b/Tests/unit/p2p/logger_spec.go
@@ -0,0 +1,21 @@
+package p2p_tests
+
+import (
+	"KitsuneSemCalda/SBC/internal/p2p"
+	"github.com/caiolandgraf/gest/gest"
+)
+
+func init() {
+	s := gest.Describe("P2P Logger")
+
+	s.It("should respect log levels", func(t *gest.T) {
+		p2p.SetLogLevel(p2p.LevelError)
+		p2p.Debug("TEST", "This should not appear")
+		p2p.Info("TEST", "This should not appear")
+		p2p.Error("TEST", "This is an error")
+
+		t.Expect(true).ToBeTrue()
+	})
+
+	gest.Register(s)
+}
diff --git a/Tests/unit/p2p/server_spec.go b/Tests/unit/p2p/server_spec.go
new file mode 100644
index 0000000..e68972b
--- /dev/null
+++ b/Tests/unit/p2p/server_spec.go
@@ -0,0 +1,53 @@
+package p2p_tests
+
+import (
+	"context"
+	"KitsuneSemCalda/SBC/internal/blockchain"
+	"KitsuneSemCalda/SBC/internal/p2p"
+	"github.com/caiolandgraf/gest/gest"
+	"time"
+)
+
+func init() {
+	s := gest.Describe("P2P Server Core")
+
+	s.It("should manage failed peers cache", func(t *gest.T) {
+		bc := blockchain.NewBlockchain()
+		cfg := p2p.DefaultConfig()
+		server, _ := p2p.NewServer(cfg, bc)
+
+		// Valid multihash for a peer ID
+		addr := "/ip4/1.1.1.1/tcp/8333/p2p/12D3KooWLGpBKHhshKgGJiiKxKUPKJVBsE3w1td6jSPTquU1xCjk"
+
+		// Attempt connection to non-existent but valid addr (will fail and mark)
+		err := server.ConnectToPeer(addr)
+		t.Expect(err).Not().ToBeNil()
+
+		// Second attempt should return nil quickly (cached failure)
+		start := time.Now()
+		err = server.ConnectToPeer(addr)
+		duration := time.Since(start)
+
+		t.Expect(err).ToBeNil()
+		t.Expect(duration < 100*time.Millisecond).ToBeTrue()
+
+		server.Close()
+	})
+
+	s.It("should start maintenance tasks without crashing", func(t *gest.T) {
+		bc := blockchain.NewBlockchain()
+		cfg := p2p.DefaultConfig()
+		server, _ := p2p.NewServer(cfg, bc)
+
+		ctx, cancel := context.WithCancel(context.Background())
+		server.StartMaintenance(ctx)
+
+		time.Sleep(50 * time.Millisecond)
+		cancel()
+
+		t.Expect(true).ToBeTrue()
+		server.Close()
+	})
+
+	gest.Register(s)
+}
diff --git a/cmd/sbcd/main.go b/cmd/sbcd/main.go
index cbad411..295ee5d 100644
--- a/cmd/sbcd/main.go
+++ b/cmd/sbcd/main.go
@@ -19,12 +19,6 @@ import (
 
 var logger *slog.Logger
 
-const (
-	ValidationInterval = 30 * time.Second
-	SyncInterval       = 60 * time.Second
-	StatsInterval      = 10 * time.Second
-)
-
 type DaemonCallbacks struct {
 	blockchain *blockchain.Blockchain
 	server     *p2p.Server
@@ -96,84 +90,6 @@ func (h *ColoredHandler) Handle(ctx context.Context, r slog.Record) error {
 	return nil
 }
 
-func startMaintenance(ctx context.Context, bc *blockchain.Blockchain, server *p2p.Server) {
-	logger.Info("starting maintenance loops")
-	go validationTask(ctx, bc)
-	go syncTask(ctx, bc, server)
-	go statsTask(ctx, bc, server)
-}
-
-func validationTask(ctx context.Context, bc *blockchain.Blockchain) {
-	ticker := time.NewTicker(ValidationInterval)
-	defer ticker.Stop()
-
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		case <-ticker.C:
-			if bc.IsValid() {
-				logger.Debug("✓ chain validation: OK", "height", bc.Length())
-			} else {
-				logger.Error("✗ CRITICAL: chain is corrupted!")
-			}
-		}
-	}
-}
-
-func syncTask(ctx context.Context, bc *blockchain.Blockchain, server *p2p.Server) {
-	ticker := time.NewTicker(SyncInterval)
-	defer ticker.Stop()
-
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		case <-ticker.C:
-			peers := server.GetPeers()
-			if len(peers) == 0 {
-				continue
-			}
-
-			var maxHeight int
-			var bestPeer peer.ID
-			for pID, p := range peers {
-				if p.BestHeight > maxHeight {
-					maxHeight = p.BestHeight
-					bestPeer = pID
-				}
-			}
-
-			if maxHeight > bc.Length() {
-				logger.Info("sync: peer has longer chain",
-					"peer", bestPeer.String()[:8],
-					"peer_height", maxHeight,
-					"local_height", bc.Length())
-				server.RequestSync()
-			}
-		}
-	}
-}
-
-func statsTask(ctx context.Context, bc *blockchain.Blockchain, server *p2p.Server) {
-	ticker := time.NewTicker(StatsInterval)
-	defer ticker.Stop()
-
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		case <-ticker.C:
-			peers := server.GetPeers()
-			lastBlock := bc.GetLastBlock()
-			logger.Info("stats update",
-				"height", bc.Length(),
-				"peers", len(peers),
-				"last_hash", lastBlock.Hash[:8])
-		}
-	}
-}
-
 func main() {
 	initLogger()
 	logger.Info("starting sbc daemon")
@@ -220,7 +136,7 @@ func main() {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	startMaintenance(ctx, bc, server)
+	server.StartMaintenance(ctx)
 
 	stop := make(chan os.Signal, 1)
 	signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)
diff --git a/internal/p2p/host.go b/internal/p2p/host.go
index b4ee3a2..cba020a 100644
--- a/internal/p2p/host.go
+++ b/internal/p2p/host.go
@@ -4,7 +4,9 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"io"
 	"net"
+	"net/http"
 	"sync"
 	"time"
 
@@ -305,3 +307,18 @@ func (h *Host) Close() error {
 	}
 	return h.host.Close()
 }
+
+// GetPublicIP returns this node's public IP address as reported by the
+// external ipify service. Requires outbound internet access; bounded by a timeout.
+func GetPublicIP() (string, error) {
+	resp, err := (&http.Client{Timeout: 10 * time.Second}).Get("https://api.ipify.org")
+	if err != nil {
+		return "", err
+	}
+	defer resp.Body.Close()
+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return "", err
+	}
+	return string(body), nil
+}
diff --git a/internal/p2p/logger.go b/internal/p2p/logger.go
new file mode 100644
index 0000000..eaddaec
--- /dev/null
+++ b/internal/p2p/logger.go
@@ -0,0 +1,72 @@
+package p2p
+
+import (
+	"fmt"
+	"io"
+	"os"
+	"sync"
+	"time"
+)
+
+type Level int
+
+const (
+	LevelDebug Level = iota
+	LevelInfo
+	LevelWarn
+	LevelError
+	LevelNone
+)
+
+type Logger struct {
+	level  Level
+	output io.Writer
+	mu     sync.Mutex
+}
+
+var defaultLogger = &Logger{
+	level:  LevelInfo,
+	output: os.Stdout,
+}
+
+func (l *Logger) SetLevel(level Level) {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+	l.level = level
+}
+
+func (l *Logger) log(level Level, prefix string, format string, v ...any) {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+	if level < l.level {
+		return
+	}
+
+	msg := fmt.Sprintf(format, v...)
+	timestamp := time.Now().Format("15:04:05")
+
+	var colorCode string
+	switch level {
+	case LevelDebug:
+		colorCode = "\033[36m" // Cyan
+	case LevelInfo:
+		colorCode = "\033[32m" // Green
+	case LevelWarn:
+		colorCode = "\033[33m" // Yellow
+	case LevelError:
+		colorCode = "\033[31m" // Red
+	}
+
+	if colorCode != "" {
+		fmt.Fprintf(l.output, "%s[%s] %s %s\033[0m\n", colorCode, prefix, timestamp, msg)
+	} else {
+		fmt.Fprintf(l.output, "[%s] %s %s\n", prefix, timestamp, msg)
+	}
+}
+
+func Debug(prefix, format string, v ...any) { defaultLogger.log(LevelDebug, prefix, format, v...) }
+func Info(prefix, format string, v ...any)  { defaultLogger.log(LevelInfo, prefix, format, v...) }
+func Warn(prefix, format string, v ...any)  { defaultLogger.log(LevelWarn, prefix, format, v...) }
+func Error(prefix, format string, v ...any) { defaultLogger.log(LevelError, prefix, format, v...) }
+
+func SetLogLevel(level Level) { defaultLogger.SetLevel(level) }
diff --git a/internal/p2p/server.go b/internal/p2p/server.go
index 96da67d..b0884d3 100644
--- a/internal/p2p/server.go
+++ b/internal/p2p/server.go
@@ -32,6 +32,9 @@ type Server struct {
 
 	processedMutex  sync.RWMutex
 	processedBlocks map[string]time.Time
+
+	failedMutex sync.RWMutex
+	failedPeers map[string]time.Time
 }
 
 func NewServer(cfg *Config, bc *blockchain.Blockchain) (*Server, error) {
@@ -40,6 +43,7 @@ func NewServer(cfg *Config, bc *blockchain.Blockchain) (*Server, error) {
 		peers:           make(map[peer.ID]*Peer),
 		config:          cfg,
 		processedBlocks: make(map[string]time.Time),
+		failedPeers:     make(map[string]time.Time),
 	}
 
 	host, err := NewHost(cfg, s)
@@ -67,10 +71,27 @@ func NewServer(cfg *Config, bc *blockchain.Blockchain) (*Server, error) {
 	})
 
 	go s.cleanupProcessedBlocks()
+	go s.cleanupFailedPeers()
 
 	return s, nil
 }
 
+// NOTE(review): this loop has no stop signal and outlives Close(); consider a quit channel or ctx.
+func (s *Server) cleanupFailedPeers() {
+	ticker := time.NewTicker(5 * time.Minute)
+	defer ticker.Stop()
+	for range ticker.C {
+		now := time.Now()
+		s.failedMutex.Lock()
+		for addr, timestamp := range s.failedPeers {
+			if now.Sub(timestamp) > 10*time.Minute {
+				delete(s.failedPeers, addr)
+			}
+		}
+		s.failedMutex.Unlock()
+	}
+}
+
 func (s *Server) cleanupProcessedBlocks() {
 	ticker := time.NewTicker(5 * time.Minute)
 	defer ticker.Stop()
@@ -411,6 +432,14 @@
 }
 
 func (s *Server) ConnectToPeer(addr string) error {
+	s.failedMutex.RLock()
+	lastFail, exists := s.failedPeers[addr]
+	if exists && time.Since(lastFail) < 1*time.Minute {
+		s.failedMutex.RUnlock()
+		return nil
+	}
+	s.failedMutex.RUnlock()
+
 	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
 	defer cancel()
 
@@ -421,6 +450,9 @@
 
 	err = s.host.Connect(ctx, ma)
 	if err != nil {
+		s.failedMutex.Lock()
+		s.failedPeers[addr] = time.Now()
+		s.failedMutex.Unlock()
 		return err
 	}
 
@@ -712,3 +744,88 @@ func (s *Server) fetchSeedsFromHTTP(seedURL string) {
 		return
 	}
 }
+
+const (
+	ValidationInterval = 30 * time.Second
+	SyncInterval       = 60 * time.Second
+	StatsInterval      = 10 * time.Second
+)
+
+// StartMaintenance launches the periodic validation, sync and stats loops; they stop when ctx is cancelled.
+func (s *Server) StartMaintenance(ctx context.Context) {
+	log.Println("Starting maintenance loops")
+	go s.validationTask(ctx)
+	go s.syncTask(ctx)
+	go s.statsTask(ctx)
+}
+
+func (s *Server) validationTask(ctx context.Context) {
+	ticker := time.NewTicker(ValidationInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			if !s.blockchain.IsValid() {
+				log.Println("✗ CRITICAL: chain is corrupted!")
+			}
+		}
+	}
+}
+
+func (s *Server) syncTask(ctx context.Context) {
+	ticker := time.NewTicker(SyncInterval)
+	defer ticker.Stop()

+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			peers := s.GetPeers()
+			if len(peers) == 0 {
+				continue
+			}
+
+			var maxHeight int
+			var bestPeer peer.ID
+			for pID, p := range peers {
+				if p.BestHeight > maxHeight {
+					maxHeight = p.BestHeight
+					bestPeer = pID
+				}
+			}
+
+			if maxHeight > s.blockchain.Length() {
+				log.Printf("Sync: peer %s has longer chain (%d vs %d)",
+					bestPeer.String()[:8], maxHeight, s.blockchain.Length())
+				s.RequestSync()
+			}
+		}
+	}
+}
+
+func (s *Server) statsTask(ctx context.Context) {
+	ticker := time.NewTicker(StatsInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			peers := s.GetPeers()
+			// NOTE(review): assumes GetLastBlock never returns nil on a running chain — confirm.
+			lastBlock := s.blockchain.GetLastBlock()
+			// Guard the hash preview: a short/empty hash must not panic the stats loop.
+			hash := lastBlock.Hash
+			if len(hash) > 8 {
+				hash = hash[:8]
+			}
+			log.Printf("Stats update: height=%d, peers=%d, last_hash=%s",
+				s.blockchain.Length(), len(peers), hash)
+		}
+	}
+}