Skip to content

Commit 200f4d6

Browse files
committed
feat: add sequencer tracing instrumentation
Add OpenTelemetry tracing for the core Sequencer interface. This traces all three main operations: - SubmitBatchTxs: tracks tx count and batch size - GetNextBatch: tracks tx count, forced inclusion count, batch size - VerifyBatch: tracks batch data count and verification result The tracing wrapper can be used with any Sequencer implementation (single, based, etc.) via WithTracingSequencer().
1 parent 73297c1 commit 200f4d6

File tree

3 files changed

+427
-0
lines changed

3 files changed

+427
-0
lines changed

block/components.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
coreexecutor "github.com/evstack/ev-node/core/execution"
1818
coresequencer "github.com/evstack/ev-node/core/sequencer"
1919
"github.com/evstack/ev-node/pkg/config"
20+
"github.com/evstack/ev-node/pkg/telemetry"
2021
"github.com/evstack/ev-node/pkg/genesis"
2122
"github.com/evstack/ev-node/pkg/signer"
2223
"github.com/evstack/ev-node/pkg/store"
@@ -205,6 +206,11 @@ func NewAggregatorComponents(
205206
// error channel for critical failures
206207
errorCh := make(chan error, 1)
207208

209+
// wrap sequencer with tracing if enabled
210+
if config.Instrumentation.IsTracingEnabled() {
211+
sequencer = telemetry.WithTracingSequencer(sequencer)
212+
}
213+
208214
executor, err := executing.NewExecutor(
209215
store,
210216
exec,

pkg/telemetry/sequencer_tracing.go

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
package telemetry
2+
3+
import (
4+
"context"
5+
"encoding/hex"
6+
7+
coresequencer "github.com/evstack/ev-node/core/sequencer"
8+
"go.opentelemetry.io/otel"
9+
"go.opentelemetry.io/otel/attribute"
10+
"go.opentelemetry.io/otel/codes"
11+
"go.opentelemetry.io/otel/trace"
12+
)
13+
14+
var _ coresequencer.Sequencer = (*tracedSequencer)(nil)
15+
16+
// tracedSequencer decorates a Sequencer with OpenTelemetry spans.
17+
type tracedSequencer struct {
18+
inner coresequencer.Sequencer
19+
tracer trace.Tracer
20+
}
21+
22+
// WithTracingSequencer decorates the provided Sequencer with tracing spans.
23+
func WithTracingSequencer(inner coresequencer.Sequencer) coresequencer.Sequencer {
24+
return &tracedSequencer{
25+
inner: inner,
26+
tracer: otel.Tracer("ev-node/sequencer"),
27+
}
28+
}
29+
30+
func (t *tracedSequencer) SubmitBatchTxs(ctx context.Context, req coresequencer.SubmitBatchTxsRequest) (*coresequencer.SubmitBatchTxsResponse, error) {
31+
txCount := 0
32+
totalBytes := 0
33+
if req.Batch != nil {
34+
txCount = len(req.Batch.Transactions)
35+
for _, tx := range req.Batch.Transactions {
36+
totalBytes += len(tx)
37+
}
38+
}
39+
40+
ctx, span := t.tracer.Start(ctx, "Sequencer.SubmitBatchTxs",
41+
trace.WithAttributes(
42+
attribute.String("chain.id", hex.EncodeToString(req.Id)),
43+
attribute.Int("tx.count", txCount),
44+
attribute.Int("batch.size_bytes", totalBytes),
45+
),
46+
)
47+
defer span.End()
48+
49+
res, err := t.inner.SubmitBatchTxs(ctx, req)
50+
if err != nil {
51+
span.RecordError(err)
52+
span.SetStatus(codes.Error, err.Error())
53+
return nil, err
54+
}
55+
56+
return res, nil
57+
}
58+
59+
func (t *tracedSequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextBatchRequest) (*coresequencer.GetNextBatchResponse, error) {
60+
ctx, span := t.tracer.Start(ctx, "Sequencer.GetNextBatch",
61+
trace.WithAttributes(
62+
attribute.String("chain.id", hex.EncodeToString(req.Id)),
63+
attribute.Int64("max_bytes", int64(req.MaxBytes)),
64+
),
65+
)
66+
defer span.End()
67+
68+
res, err := t.inner.GetNextBatch(ctx, req)
69+
if err != nil {
70+
span.RecordError(err)
71+
span.SetStatus(codes.Error, err.Error())
72+
return nil, err
73+
}
74+
75+
if res.Batch != nil {
76+
txCount := len(res.Batch.Transactions)
77+
forcedCount := 0
78+
for _, forced := range res.Batch.ForceIncludedMask {
79+
if forced {
80+
forcedCount++
81+
}
82+
}
83+
totalBytes := 0
84+
for _, tx := range res.Batch.Transactions {
85+
totalBytes += len(tx)
86+
}
87+
88+
span.SetAttributes(
89+
attribute.Int("tx.count", txCount),
90+
attribute.Int("forced_inclusion.count", forcedCount),
91+
attribute.Int("batch.size_bytes", totalBytes),
92+
attribute.Int64("timestamp", res.Timestamp.Unix()),
93+
)
94+
}
95+
96+
return res, nil
97+
}
98+
99+
func (t *tracedSequencer) VerifyBatch(ctx context.Context, req coresequencer.VerifyBatchRequest) (*coresequencer.VerifyBatchResponse, error) {
100+
ctx, span := t.tracer.Start(ctx, "Sequencer.VerifyBatch",
101+
trace.WithAttributes(
102+
attribute.String("chain.id", hex.EncodeToString(req.Id)),
103+
attribute.Int("batch_data.count", len(req.BatchData)),
104+
),
105+
)
106+
defer span.End()
107+
108+
res, err := t.inner.VerifyBatch(ctx, req)
109+
if err != nil {
110+
span.RecordError(err)
111+
span.SetStatus(codes.Error, err.Error())
112+
return nil, err
113+
}
114+
115+
span.SetAttributes(
116+
attribute.Bool("verified", res.Status),
117+
)
118+
119+
return res, nil
120+
}
121+
122+
func (t *tracedSequencer) SetDAHeight(height uint64) {
123+
t.inner.SetDAHeight(height)
124+
}
125+
126+
func (t *tracedSequencer) GetDAHeight() uint64 {
127+
return t.inner.GetDAHeight()
128+
}

0 commit comments

Comments
 (0)