Skip to content

Commit ef202f4

Browse files
committed
chore: adding da retreiver syncing
1 parent 1ef693a commit ef202f4

File tree

3 files changed

+183
-0
lines changed

3 files changed

+183
-0
lines changed
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package syncing
2+
3+
import (
4+
"context"
5+
6+
"go.opentelemetry.io/otel"
7+
"go.opentelemetry.io/otel/attribute"
8+
"go.opentelemetry.io/otel/codes"
9+
"go.opentelemetry.io/otel/trace"
10+
11+
"github.com/evstack/ev-node/block/internal/common"
12+
)
13+
14+
var _ DARetriever = (*tracedDARetriever)(nil)
15+
16+
// tracedDARetriever wraps a DARetriever with OpenTelemetry tracing.
17+
type tracedDARetriever struct {
18+
inner DARetriever
19+
tracer trace.Tracer
20+
}
21+
22+
// WithTracingDARetriever wraps a DARetriever with OpenTelemetry tracing.
23+
func WithTracingDARetriever(inner DARetriever) DARetriever {
24+
return &tracedDARetriever{
25+
inner: inner,
26+
tracer: otel.Tracer("ev-node/da-retriever"),
27+
}
28+
}
29+
30+
func (t *tracedDARetriever) RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) {
31+
ctx, span := t.tracer.Start(ctx, "DARetriever.RetrieveFromDA",
32+
trace.WithAttributes(
33+
attribute.Int64("da.height", int64(daHeight)),
34+
),
35+
)
36+
defer span.End()
37+
38+
events, err := t.inner.RetrieveFromDA(ctx, daHeight)
39+
if err != nil {
40+
span.RecordError(err)
41+
span.SetStatus(codes.Error, err.Error())
42+
return events, err
43+
}
44+
45+
span.SetAttributes(attribute.Int("event.count", len(events)))
46+
47+
// add block heights from events
48+
if len(events) > 0 {
49+
heights := make([]int64, len(events))
50+
for i, event := range events {
51+
heights[i] = int64(event.Header.Height())
52+
}
53+
span.SetAttributes(attribute.Int64Slice("block.heights", heights))
54+
}
55+
56+
return events, nil
57+
}
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
package syncing
2+
3+
import (
4+
"context"
5+
"errors"
6+
"testing"
7+
8+
"github.com/stretchr/testify/require"
9+
"go.opentelemetry.io/otel"
10+
"go.opentelemetry.io/otel/codes"
11+
sdktrace "go.opentelemetry.io/otel/sdk/trace"
12+
"go.opentelemetry.io/otel/sdk/trace/tracetest"
13+
14+
"github.com/evstack/ev-node/block/internal/common"
15+
"github.com/evstack/ev-node/pkg/telemetry/testutil"
16+
"github.com/evstack/ev-node/types"
17+
)
18+
19+
type mockDARetriever struct {
20+
retrieveFromDAFn func(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error)
21+
}
22+
23+
func (m *mockDARetriever) RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) {
24+
if m.retrieveFromDAFn != nil {
25+
return m.retrieveFromDAFn(ctx, daHeight)
26+
}
27+
return nil, nil
28+
}
29+
30+
func setupDARetrieverTrace(t *testing.T, inner DARetriever) (DARetriever, *tracetest.SpanRecorder) {
31+
t.Helper()
32+
sr := tracetest.NewSpanRecorder()
33+
tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr))
34+
t.Cleanup(func() { _ = tp.Shutdown(context.Background()) })
35+
otel.SetTracerProvider(tp)
36+
return WithTracingDARetriever(inner), sr
37+
}
38+
39+
func TestTracedDARetriever_RetrieveFromDA_Success(t *testing.T) {
40+
mock := &mockDARetriever{
41+
retrieveFromDAFn: func(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) {
42+
return []common.DAHeightEvent{
43+
{
44+
Header: &types.SignedHeader{
45+
Header: types.Header{
46+
BaseHeader: types.BaseHeader{Height: 100},
47+
},
48+
},
49+
DaHeight: daHeight,
50+
Source: common.SourceDA,
51+
},
52+
{
53+
Header: &types.SignedHeader{
54+
Header: types.Header{
55+
BaseHeader: types.BaseHeader{Height: 101},
56+
},
57+
},
58+
DaHeight: daHeight,
59+
Source: common.SourceDA,
60+
},
61+
}, nil
62+
},
63+
}
64+
retriever, sr := setupDARetrieverTrace(t, mock)
65+
ctx := context.Background()
66+
67+
events, err := retriever.RetrieveFromDA(ctx, 50)
68+
require.NoError(t, err)
69+
require.Len(t, events, 2)
70+
71+
spans := sr.Ended()
72+
require.Len(t, spans, 1)
73+
span := spans[0]
74+
require.Equal(t, "DARetriever.RetrieveFromDA", span.Name())
75+
require.Equal(t, codes.Unset, span.Status().Code)
76+
77+
attrs := span.Attributes()
78+
testutil.RequireAttribute(t, attrs, "da.height", int64(50))
79+
testutil.RequireAttribute(t, attrs, "event.count", 2)
80+
}
81+
82+
func TestTracedDARetriever_RetrieveFromDA_NoEvents(t *testing.T) {
83+
mock := &mockDARetriever{
84+
retrieveFromDAFn: func(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) {
85+
return []common.DAHeightEvent{}, nil
86+
},
87+
}
88+
retriever, sr := setupDARetrieverTrace(t, mock)
89+
ctx := context.Background()
90+
91+
events, err := retriever.RetrieveFromDA(ctx, 50)
92+
require.NoError(t, err)
93+
require.Empty(t, events)
94+
95+
spans := sr.Ended()
96+
require.Len(t, spans, 1)
97+
span := spans[0]
98+
require.Equal(t, codes.Unset, span.Status().Code)
99+
100+
attrs := span.Attributes()
101+
testutil.RequireAttribute(t, attrs, "event.count", 0)
102+
}
103+
104+
func TestTracedDARetriever_RetrieveFromDA_Error(t *testing.T) {
105+
expectedErr := errors.New("DA retrieval failed")
106+
mock := &mockDARetriever{
107+
retrieveFromDAFn: func(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) {
108+
return nil, expectedErr
109+
},
110+
}
111+
retriever, sr := setupDARetrieverTrace(t, mock)
112+
ctx := context.Background()
113+
114+
_, err := retriever.RetrieveFromDA(ctx, 50)
115+
require.Error(t, err)
116+
require.Equal(t, expectedErr, err)
117+
118+
spans := sr.Ended()
119+
require.Len(t, spans, 1)
120+
span := spans[0]
121+
require.Equal(t, codes.Error, span.Status().Code)
122+
require.Equal(t, expectedErr.Error(), span.Status().Description)
123+
}

block/internal/syncing/syncer.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,9 @@ func (s *Syncer) Start(ctx context.Context) error {
201201

202202
// Initialize handlers
203203
s.daRetriever = NewDARetriever(s.daClient, s.cache, s.genesis, s.logger)
204+
if s.config.Instrumentation.IsTracingEnabled() {
205+
s.daRetriever = WithTracingDARetriever(s.daRetriever)
206+
}
204207
s.fiRetriever = da.NewForcedInclusionRetriever(s.daClient, s.logger, s.config, s.genesis.DAStartHeight, s.genesis.DAEpochForcedInclusion)
205208
s.p2pHandler = NewP2PHandler(s.headerStore.Store(), s.dataStore.Store(), s.cache, s.genesis, s.logger)
206209
if currentHeight, err := s.store.Height(s.ctx); err != nil {

0 commit comments

Comments
 (0)