Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion errors/error.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion model/model.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion services/alert/alert_api/alert_api.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion services/alert/alert_api/alert_api_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion services/asset/asset_api/asset_api.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion services/blockchain/blockchain_api/blockchain_api.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion services/blockvalidation/get_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ func (u *Server) fetchAndStoreSubtree(ctx context.Context, block *model.Block, s
// Only report success after the entire block is validated
// This prevents inflating reputation for peers providing invalid chains
// if u.p2pClient != nil {
// if err := u.p2pClient.ReportValidSubtree(ctx, peerID, subtreeHash.String()); err != nil {
// if err := u.p2pClient.ReportValidSubtreeHandler(ctx, peerID, subtreeHash.String()); err != nil {
// u.logger.Warnf("[fetchAndStoreSubtree][%s] failed to report valid subtree: %v", subtreeHash.String(), err)
// }
// }
Expand Down
2 changes: 1 addition & 1 deletion services/legacy/peer_api/peer_api.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion services/legacy/peer_api/peer_api_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 27 additions & 0 deletions services/p2p/Client.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,33 @@ func (c *Client) ReportValidSubtree(ctx context.Context, peerID string, subtreeH
return nil
}

// ReportInvalidSubtree reports that a subtree was unsuccessfully fetched and validated from a peer.
// Parameters:
// - ctx: Context for the operation
// - peerID: Peer ID that provided the subtree
// - subtreeHash: Hash of the validated subtree
//
// Returns:
// - error: Any error encountered during the operation
func (c *Client) ReportInvalidSubtree(ctx context.Context, peerID string, subtreeHash string, reason string) error {
req := &p2p_api.ReportInvalidSubtreeRequest{
PeerId: peerID,
SubtreeHash: subtreeHash,
Reason: reason,
}

resp, err := c.client.ReportInvalidSubtree(ctx, req)
if err != nil {
return err
}

if resp != nil && !resp.Success {
return errors.NewServiceError("failed to report valid subtree: %s", resp.Message)
}

return nil
}

// ReportValidBlock reports that a block was successfully received and validated from a peer.
// Parameters:
// - ctx: Context for the operation
Expand Down
8 changes: 8 additions & 0 deletions services/p2p/Client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type MockPeerServiceClient struct {
UpdateCatchupErrorFunc func(ctx context.Context, in *p2p_api.UpdateCatchupErrorRequest, opts ...grpc.CallOption) (*p2p_api.UpdateCatchupErrorResponse, error)
ResetReputationFunc func(ctx context.Context, in *p2p_api.ResetReputationRequest, opts ...grpc.CallOption) (*p2p_api.ResetReputationResponse, error)
GetPeersForCatchupFunc func(ctx context.Context, in *p2p_api.GetPeersForCatchupRequest, opts ...grpc.CallOption) (*p2p_api.GetPeersForCatchupResponse, error)
ReportInvalidSubtreeFunc func(ctx context.Context, in *p2p_api.ReportInvalidSubtreeRequest, opts ...grpc.CallOption) (*p2p_api.ReportInvalidSubtreeResponse, error)
ReportValidSubtreeFunc func(ctx context.Context, in *p2p_api.ReportValidSubtreeRequest, opts ...grpc.CallOption) (*p2p_api.ReportValidSubtreeResponse, error)
ReportValidBlockFunc func(ctx context.Context, in *p2p_api.ReportValidBlockRequest, opts ...grpc.CallOption) (*p2p_api.ReportValidBlockResponse, error)
IsPeerMaliciousFunc func(ctx context.Context, in *p2p_api.IsPeerMaliciousRequest, opts ...grpc.CallOption) (*p2p_api.IsPeerMaliciousResponse, error)
Expand Down Expand Up @@ -163,6 +164,13 @@ func (m *MockPeerServiceClient) GetPeersForCatchup(ctx context.Context, in *p2p_
return &p2p_api.GetPeersForCatchupResponse{Peers: []*p2p_api.PeerInfoForCatchup{}}, nil
}

func (m *MockPeerServiceClient) ReportInvalidSubtree(ctx context.Context, in *p2p_api.ReportInvalidSubtreeRequest, opts ...grpc.CallOption) (*p2p_api.ReportInvalidSubtreeResponse, error) {
if m.ReportInvalidSubtreeFunc != nil {
return m.ReportInvalidSubtreeFunc(ctx, in, opts...)
}
return &p2p_api.ReportInvalidSubtreeResponse{Success: true}, nil
}

func (m *MockPeerServiceClient) ReportValidSubtree(ctx context.Context, in *p2p_api.ReportValidSubtreeRequest, opts ...grpc.CallOption) (*p2p_api.ReportValidSubtreeResponse, error) {
if m.ReportValidSubtreeFunc != nil {
return m.ReportValidSubtreeFunc(ctx, in, opts...)
Expand Down
4 changes: 4 additions & 0 deletions services/p2p/Interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ type ClientI interface {
// This increases the peer's reputation score for providing valid data.
ReportValidSubtree(ctx context.Context, peerID string, subtreeHash string) error

// ReportInvalidSubtree reports that a subtree was unsuccessfully fetched and validated from a peer.
// This decreases the peer's reputation score for providing valid data.
ReportInvalidSubtree(ctx context.Context, peerID string, subtreeHash string, reason string) error

// ReportValidBlock reports that a block was successfully received and validated from a peer.
// This increases the peer's reputation score for providing valid blocks.
ReportValidBlock(ctx context.Context, peerID string, blockHash string) error
Expand Down
51 changes: 11 additions & 40 deletions services/p2p/Server.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,8 +690,8 @@ func (s *Server) invalidSubtreeHandler(ctx context.Context) func(msg *kafka.Kafk

s.logger.Infof("[invalidSubtreeHandler] Received invalid subtree notification via Kafka: hash=%s, peerUrl=%s, reason=%s", m.SubtreeHash, m.PeerUrl, m.Reason)

// Use the existing ReportInvalidSubtree method to handle the invalid subtree
err = s.ReportInvalidSubtree(ctx, m.SubtreeHash, m.PeerUrl, m.Reason)
// Use the existing reportInvalidSubtree method to handle the invalid subtree
err = s.reportInvalidSubtree(ctx, m.SubtreeHash, m.PeerUrl, m.Reason)
if err != nil {
// Don't return error here, as we want to continue processing messages
s.logger.Errorf("[invalidSubtreeHandler] Failed to report invalid subtree from Kafka: %v", err)
Expand Down Expand Up @@ -1807,50 +1807,21 @@ func (s *Server) ReportInvalidBlock(ctx context.Context, blockHash string, reaso
return nil
}

// ReportInvalidSubtree handles invalid subtree reports with explicit peer URL
func (s *Server) ReportInvalidSubtree(ctx context.Context, subtreeHash string, peerURL string, reason string) error {
var peerID string

// First try to get peer ID from the subtreePeerMap (for subtrees received via P2P)
peerID, err := s.getPeerFromMap(&s.subtreePeerMap, subtreeHash, "subtree")
if err != nil && peerURL != "" {
// If not found in map and we have a peer URL, look up the peer ID from the URL
peerID = s.getPeerIDFromDataHubURL(peerURL)
if peerID == "" {
s.logger.Warnf("[ReportInvalidSubtree] could not find peer ID for URL %s, subtree %s, reason: %s",
peerURL, subtreeHash, reason)
return nil // Don't return error, just log and continue
}
s.logger.Debugf("[ReportInvalidSubtree] found peer %s from URL %s for subtree %s",
peerID, peerURL, subtreeHash)
}

if peerID == "" {
s.logger.Warnf("[ReportInvalidSubtree] could not determine peer for subtree %s, reason: %s",
subtreeHash, reason)
return nil
}

// reportInvalidSubtree handles invalid subtree reports with explicit peer URL
func (s *Server) reportInvalidSubtree(ctx context.Context, subtreeHash string, peerID string, reason string) error {
// Add ban score to the peer
s.logger.Infof("[ReportInvalidSubtree] adding ban score to peer %s for invalid subtree %s: %s",
s.logger.Infof("[reportInvalidSubtree] adding ban score to peer %s for invalid subtree %s: %s",
peerID, subtreeHash, reason)

// Record as malicious interaction for reputation tracking
s.peerRegistry.RecordMaliciousInteraction(peer.ID(peerID))

// Create the request to add ban score
req := &p2p_api.AddBanScoreRequest{
PeerId: peerID,
Reason: "invalid_subtree",
}

// Call the AddBanScore method
_, err = s.AddBanScore(ctx, req)
peerDecoded, err := peer.Decode(peerID)
if err != nil {
s.logger.Errorf("[ReportInvalidSubtree] error adding ban score to peer %s: %v", peerID, err)
return errors.NewServiceError("error adding ban score to peer %s", peerID, err)
s.logger.Errorf("[reportInvalidSubtree] error decoding peer ID %s: %v", peerID, err)
return errors.NewServiceError("error decoding peer ID %s", peerID, err)
}

// Record as malicious interaction for reputation tracking
s.peerRegistry.RecordMaliciousInteraction(peerDecoded)

// Remove the subtree from the map to avoid memory leaks
s.subtreePeerMap.Delete(subtreeHash)

Expand Down
8 changes: 4 additions & 4 deletions services/p2p/Server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3391,16 +3391,16 @@ func TestReportInvalidSubtreeCoverage(t *testing.T) {
server := createTestServer(t)

// Test with empty hash and required parameters
err := server.ReportInvalidSubtree(ctx, "", "http://test-peer:8080", "test reason")
err := server.reportInvalidSubtree(ctx, "", "http://test-peer:8080", "test reason")
if err != nil {
t.Logf("ReportInvalidSubtree with empty hash failed as expected: %v", err)
t.Logf("reportInvalidSubtree with empty hash failed as expected: %v", err)
}

// Test with valid hash format and all required parameters
testHash := "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
err = server.ReportInvalidSubtree(ctx, testHash, "http://peer:8080", "invalid subtree")
err = server.reportInvalidSubtree(ctx, testHash, "http://peer:8080", "invalid subtree")
if err != nil {
t.Logf("ReportInvalidSubtree may fail in test environment: %v", err)
t.Logf("reportInvalidSubtree may fail in test environment: %v", err)
}

// The function should execute the main logic path regardless of result
Expand Down
41 changes: 38 additions & 3 deletions services/p2p/catchup_metrics_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ func TestReportValidSubtree_GRPCEndpoint(t *testing.T) {
PeerId: testPeerID.String(),
SubtreeHash: "test_subtree_hash_123",
}
resp, err := p2pServer.ReportValidSubtree(ctx, req)
resp, err := p2pServer.ReportValidSubtreeHandler(ctx, req)
require.NoError(t, err)
assert.True(t, resp.Success)
assert.Equal(t, "subtree validation recorded", resp.Message)
Expand All @@ -543,6 +543,41 @@ func TestReportValidSubtree_GRPCEndpoint(t *testing.T) {
assert.Equal(t, int64(1), info.InteractionSuccesses)
}

// TestReportInvalidSubtree_GRPCEndpoint tests the gRPC endpoint validation
func TestReportInvalidSubtree_GRPCEndpoint(t *testing.T) {
ctx := context.Background()

// Create P2P service
p2pRegistry := NewPeerRegistry()
p2pServer := &Server{
peerRegistry: p2pRegistry,
logger: ulogger.TestLogger{},
}

// Create a test peer
testPeerID, err := peer.Decode("12D3KooWBPqTBhshqRZMKZtqb5sfgckM9JYkWDR7eW5kSPEKwKCW")
require.NoError(t, err)

// Add peer to registry
p2pRegistry.Put(testPeerID, "", 0, nil, "")

// Test valid request returns success
req := &p2p_api.ReportInvalidSubtreeRequest{
PeerId: testPeerID.String(),
SubtreeHash: "test_subtree_hash_123",
Reason: "reason",
}
resp, err := p2pServer.ReportInvalidSubtreeHandler(ctx, req)
require.NoError(t, err)
assert.True(t, resp.Success)
assert.Equal(t, "invalid subtree reported", resp.Message)

// Verify peer metrics were updated
info, exists := p2pRegistry.Get(testPeerID)
require.True(t, exists)
assert.Equal(t, int64(1), info.InteractionFailures)
}

// TestReportValidSubtree_MissingHash tests error handling when subtree hash is missing
func TestReportValidSubtree_MissingHash(t *testing.T) {
ctx := context.Background()
Expand All @@ -559,7 +594,7 @@ func TestReportValidSubtree_MissingHash(t *testing.T) {
PeerId: "",
SubtreeHash: "test_hash",
}
resp1, err1 := p2pServer.ReportValidSubtree(ctx, req1)
resp1, err1 := p2pServer.ReportValidSubtreeHandler(ctx, req1)
assert.Error(t, err1)
assert.False(t, resp1.Success)
assert.Contains(t, resp1.Message, "peer ID is required")
Expand All @@ -569,7 +604,7 @@ func TestReportValidSubtree_MissingHash(t *testing.T) {
PeerId: "12D3KooWBPqTBhshqRZMKZtqb5sfgckM9JYkWDR7eW5kSPEKwKCW",
SubtreeHash: "",
}
resp2, err2 := p2pServer.ReportValidSubtree(ctx, req2)
resp2, err2 := p2pServer.ReportValidSubtreeHandler(ctx, req2)
assert.Error(t, err2)
assert.False(t, resp2.Success)
assert.Contains(t, resp2.Message, "subtree hash is required")
Expand Down
Loading
Loading