Skip to content

Commit 77bc171

Browse files
committed
updates
1 parent 2800a75 commit 77bc171

File tree

13 files changed

+697
-98
lines changed

13 files changed

+697
-98
lines changed

apps/evm/cmd/run.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -194,13 +194,8 @@ func createSequencer(
194194
return nil, fmt.Errorf("failed to create single sequencer: %w", err)
195195
}
196196

197-
// Configure DA transaction filter if executor supports it
198-
if filter, ok := executor.(execution.DATransactionFilter); ok {
199-
if infoProvider, ok := executor.(execution.ExecutionInfoProvider); ok {
200-
sequencer.SetDATransactionFilter(filter, infoProvider)
201-
logger.Info().Msg("DA transaction filter configured for gas-based filtering")
202-
}
203-
}
197+
// Configure executor for DA transaction gas-based filtering
198+
sequencer.SetExecutor(executor)
204199

205200
logger.Info().
206201
Str("forced_inclusion_namespace", nodeConfig.DA.GetForcedInclusionNamespace()).

apps/testapp/kv/kvexecutor.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"strings"
99
"time"
1010

11+
"github.com/evstack/ev-node/core/execution"
1112
"github.com/evstack/ev-node/pkg/store"
1213
ds "github.com/ipfs/go-datastore"
1314
"github.com/ipfs/go-datastore/query"
@@ -422,3 +423,30 @@ func (k *KVExecutor) Rollback(ctx context.Context, height uint64) error {
422423
func getTxKey(height uint64, txKey string) ds.Key {
423424
return heightKeyPrefix.Child(ds.NewKey(fmt.Sprintf("%d/%s", height, txKey)))
424425
}
426+
427+
// GetExecutionInfo returns execution layer parameters.
428+
// For KVExecutor, returns MaxGas=0 indicating no gas-based filtering.
429+
func (k *KVExecutor) GetExecutionInfo(ctx context.Context, height uint64) (execution.ExecutionInfo, error) {
430+
return execution.ExecutionInfo{MaxGas: 0}, nil
431+
}
432+
433+
// FilterDATransactions validates and filters force-included transactions from DA.
434+
// For KVExecutor, all transactions are considered valid (no gas-based filtering).
435+
// Invalid transactions (not in key=value format) are filtered out.
436+
func (k *KVExecutor) FilterDATransactions(ctx context.Context, txs [][]byte, maxGas uint64) ([][]byte, [][]byte, error) {
437+
// KVExecutor doesn't do gas filtering but does basic validation
438+
validTxs := make([][]byte, 0, len(txs))
439+
for _, tx := range txs {
440+
if len(tx) == 0 {
441+
continue // Skip empty transactions
442+
}
443+
// Basic format validation: must be key=value
444+
parts := strings.SplitN(string(tx), "=", 2)
445+
if len(parts) != 2 || strings.TrimSpace(parts[0]) == "" {
446+
continue // Filter out malformed transactions
447+
}
448+
validTxs = append(validTxs, tx)
449+
}
450+
// No gas-based filtering, so no remaining transactions
451+
return validTxs, nil, nil
452+
}

core/execution/execution.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,3 +142,19 @@ type HeightProvider interface {
142142
// - error: Any errors during height retrieval
143143
GetLatestHeight(ctx context.Context) (uint64, error)
144144
}
145+
146+
// ExecutionInfoProvider is an interface for components that can provide execution layer parameters.
147+
// This is useful for type assertions when an Executor implementation supports gas-based filtering.
148+
type ExecutionInfoProvider interface {
149+
// GetExecutionInfo returns current execution layer parameters.
150+
// See Executor.GetExecutionInfo for full documentation.
151+
GetExecutionInfo(ctx context.Context, height uint64) (ExecutionInfo, error)
152+
}
153+
154+
// DATransactionFilter is an interface for components that can filter DA transactions.
155+
// This is useful for type assertions when an Executor implementation supports gas-based filtering.
156+
type DATransactionFilter interface {
157+
// FilterDATransactions validates and filters force-included transactions from DA.
158+
// See Executor.FilterDATransactions for full documentation.
159+
FilterDATransactions(ctx context.Context, txs [][]byte, maxGas uint64) (validTxs [][]byte, remainingTxs [][]byte, err error)
160+
}

execution/grpc/client.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,3 +112,41 @@ func (c *Client) SetFinal(ctx context.Context, blockHeight uint64) error {
112112

113113
return nil
114114
}
115+
116+
// GetExecutionInfo returns current execution layer parameters.
117+
//
118+
// This method retrieves execution parameters such as the block gas limit
119+
// from the remote execution service.
120+
func (c *Client) GetExecutionInfo(ctx context.Context, height uint64) (execution.ExecutionInfo, error) {
121+
req := connect.NewRequest(&pb.GetExecutionInfoRequest{
122+
Height: height,
123+
})
124+
125+
resp, err := c.client.GetExecutionInfo(ctx, req)
126+
if err != nil {
127+
return execution.ExecutionInfo{}, fmt.Errorf("grpc client: failed to get execution info: %w", err)
128+
}
129+
130+
return execution.ExecutionInfo{
131+
MaxGas: resp.Msg.MaxGas,
132+
}, nil
133+
}
134+
135+
// FilterDATransactions validates and filters force-included transactions from DA.
136+
//
137+
// This method sends DA transactions to the remote execution service for validation
138+
// and gas-based filtering. It returns transactions that are valid and fit within
139+
// the gas limit, plus any remaining valid transactions for re-queuing.
140+
func (c *Client) FilterDATransactions(ctx context.Context, txs [][]byte, maxGas uint64) ([][]byte, [][]byte, error) {
141+
req := connect.NewRequest(&pb.FilterDATransactionsRequest{
142+
Txs: txs,
143+
MaxGas: maxGas,
144+
})
145+
146+
resp, err := c.client.FilterDATransactions(ctx, req)
147+
if err != nil {
148+
return nil, nil, fmt.Errorf("grpc client: failed to filter DA transactions: %w", err)
149+
}
150+
151+
return resp.Msg.ValidTxs, resp.Msg.RemainingTxs, nil
152+
}

execution/grpc/server.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,3 +136,39 @@ func (s *Server) SetFinal(
136136

137137
return connect.NewResponse(&pb.SetFinalResponse{}), nil
138138
}
139+
140+
// GetExecutionInfo handles the GetExecutionInfo RPC request.
141+
//
142+
// It returns current execution layer parameters such as the block gas limit.
143+
func (s *Server) GetExecutionInfo(
144+
ctx context.Context,
145+
req *connect.Request[pb.GetExecutionInfoRequest],
146+
) (*connect.Response[pb.GetExecutionInfoResponse], error) {
147+
info, err := s.executor.GetExecutionInfo(ctx, req.Msg.Height)
148+
if err != nil {
149+
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("failed to get execution info: %w", err))
150+
}
151+
152+
return connect.NewResponse(&pb.GetExecutionInfoResponse{
153+
MaxGas: info.MaxGas,
154+
}), nil
155+
}
156+
157+
// FilterDATransactions handles the FilterDATransactions RPC request.
158+
//
159+
// It validates and filters force-included transactions from DA, returning
160+
// transactions that are valid and fit within the gas limit.
161+
func (s *Server) FilterDATransactions(
162+
ctx context.Context,
163+
req *connect.Request[pb.FilterDATransactionsRequest],
164+
) (*connect.Response[pb.FilterDATransactionsResponse], error) {
165+
validTxs, remainingTxs, err := s.executor.FilterDATransactions(ctx, req.Msg.Txs, req.Msg.MaxGas)
166+
if err != nil {
167+
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("failed to filter DA transactions: %w", err))
168+
}
169+
170+
return connect.NewResponse(&pb.FilterDATransactionsResponse{
171+
ValidTxs: validTxs,
172+
RemainingTxs: remainingTxs,
173+
}), nil
174+
}

pkg/sequencers/single/sequencer.go

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,10 @@ type Sequencer struct {
5252
// Cached forced inclusion transactions from the current epoch
5353
cachedForcedInclusionTxs [][]byte
5454

55-
// DA transaction filtering support (optional)
55+
// Executor for DA transaction filtering (optional)
5656
// When set, forced inclusion transactions are filtered by gas limit
57-
txFilter execution.DATransactionFilter
58-
infoProvider execution.ExecutionInfoProvider
57+
// using the executor's GetExecutionInfo and FilterDATransactions methods
58+
executor execution.Executor
5959
}
6060

6161
// NewSequencer creates a new Single Sequencer
@@ -122,13 +122,12 @@ func NewSequencer(
122122
return s, nil
123123
}
124124

125-
// SetDATransactionFilter sets the optional DA transaction filter and execution info provider.
125+
// SetExecutor sets the optional executor for DA transaction filtering.
126126
// When set, forced inclusion transactions will be filtered by gas limit before being included in batches.
127127
// This should be called after NewSequencer and before Start if filtering is desired.
128-
func (c *Sequencer) SetDATransactionFilter(filter execution.DATransactionFilter, infoProvider execution.ExecutionInfoProvider) {
129-
c.txFilter = filter
130-
c.infoProvider = infoProvider
131-
c.logger.Info().Msg("DA transaction filter configured for gas-based filtering")
128+
func (c *Sequencer) SetExecutor(executor execution.Executor) {
129+
c.executor = executor
130+
c.logger.Info().Msg("Executor configured for DA transaction gas-based filtering")
132131
}
133132

134133
// getInitialDAStartHeight retrieves the DA height of the first included chain height from store.
@@ -216,18 +215,18 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB
216215
// Process forced inclusion transactions from checkpoint position
217216
forcedTxs := c.processForcedInclusionTxsFromCheckpoint(req.MaxBytes)
218217

219-
// Apply gas-based filtering if filter is configured
218+
// Apply gas-based filtering if executor is configured
220219
var filteredForcedTxs [][]byte
221220
var remainingGasFilteredTxs [][]byte
222-
if c.txFilter != nil && c.infoProvider != nil && len(forcedTxs) > 0 {
221+
if c.executor != nil && len(forcedTxs) > 0 {
223222
// Get current gas limit from execution layer
224-
info, err := c.infoProvider.GetExecutionInfo(ctx, 0) // 0 = latest/next block
223+
info, err := c.executor.GetExecutionInfo(ctx, 0) // 0 = latest/next block
225224
if err != nil {
226225
c.logger.Warn().Err(err).Msg("failed to get execution info for gas filtering, proceeding without gas filter")
227226
filteredForcedTxs = forcedTxs
228227
} else if info.MaxGas > 0 {
229228
// Filter by gas limit
230-
filteredForcedTxs, remainingGasFilteredTxs, err = c.txFilter.FilterDATransactions(ctx, forcedTxs, info.MaxGas)
229+
filteredForcedTxs, remainingGasFilteredTxs, err = c.executor.FilterDATransactions(ctx, forcedTxs, info.MaxGas)
231230
if err != nil {
232231
c.logger.Warn().Err(err).Msg("failed to filter DA transactions by gas, proceeding without gas filter")
233232
filteredForcedTxs = forcedTxs

pkg/sequencers/single/sequencer_test.go

Lines changed: 39 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1010,26 +1010,40 @@ func TestSequencer_GetNextBatch_EmptyDABatch_IncreasesDAHeight(t *testing.T) {
10101010
assert.Equal(t, uint64(0), seq.checkpoint.TxIndex)
10111011
}
10121012

1013-
// mockDATransactionFilter is a mock implementation of execution.DATransactionFilter
1014-
type mockDATransactionFilter struct {
1015-
filterFunc func(ctx context.Context, txs [][]byte, maxGas uint64) ([][]byte, [][]byte, error)
1013+
// mockExecutor is a mock implementation of execution.Executor for testing gas filtering
1014+
type mockExecutor struct {
1015+
maxGas uint64
1016+
getInfoErr error
1017+
filterFunc func(ctx context.Context, txs [][]byte, maxGas uint64) ([][]byte, [][]byte, error)
1018+
filterCallCount int
10161019
}
10171020

1018-
func (m *mockDATransactionFilter) FilterDATransactions(ctx context.Context, txs [][]byte, maxGas uint64) ([][]byte, [][]byte, error) {
1019-
if m.filterFunc != nil {
1020-
return m.filterFunc(ctx, txs, maxGas)
1021-
}
1022-
return txs, nil, nil
1021+
func (m *mockExecutor) InitChain(ctx context.Context, genesisTime time.Time, initialHeight uint64, chainID string) ([]byte, error) {
1022+
return []byte("state-root"), nil
1023+
}
1024+
1025+
func (m *mockExecutor) GetTxs(ctx context.Context) ([][]byte, error) {
1026+
return nil, nil
10231027
}
10241028

1025-
// mockExecutionInfoProvider is a mock implementation of execution.ExecutionInfoProvider
1026-
type mockExecutionInfoProvider struct {
1027-
maxGas uint64
1028-
err error
1029+
func (m *mockExecutor) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight uint64, timestamp time.Time, prevStateRoot []byte) ([]byte, error) {
1030+
return []byte("new-state-root"), nil
10291031
}
10301032

1031-
func (m *mockExecutionInfoProvider) GetExecutionInfo(ctx context.Context, height uint64) (execution.ExecutionInfo, error) {
1032-
return execution.ExecutionInfo{MaxGas: m.maxGas}, m.err
1033+
func (m *mockExecutor) SetFinal(ctx context.Context, blockHeight uint64) error {
1034+
return nil
1035+
}
1036+
1037+
func (m *mockExecutor) GetExecutionInfo(ctx context.Context, height uint64) (execution.ExecutionInfo, error) {
1038+
return execution.ExecutionInfo{MaxGas: m.maxGas}, m.getInfoErr
1039+
}
1040+
1041+
func (m *mockExecutor) FilterDATransactions(ctx context.Context, txs [][]byte, maxGas uint64) ([][]byte, [][]byte, error) {
1042+
m.filterCallCount++
1043+
if m.filterFunc != nil {
1044+
return m.filterFunc(ctx, txs, maxGas)
1045+
}
1046+
return txs, nil, nil
10331047
}
10341048

10351049
func TestSequencer_GetNextBatch_WithGasFiltering(t *testing.T) {
@@ -1070,11 +1084,10 @@ func TestSequencer_GetNextBatch_WithGasFiltering(t *testing.T) {
10701084
)
10711085
require.NoError(t, err)
10721086

1073-
// Configure the gas filter mock
1074-
filterCallCount := 0
1075-
mockFilter := &mockDATransactionFilter{
1087+
// Configure the executor mock
1088+
mockExec := &mockExecutor{
1089+
maxGas: 1000000, // 1M gas limit
10761090
filterFunc: func(ctx context.Context, txs [][]byte, maxGas uint64) ([][]byte, [][]byte, error) {
1077-
filterCallCount++
10781091
// Simulate: first 2 txs fit, third one doesn't
10791092
if len(txs) >= 3 {
10801093
return txs[:2], txs[2:], nil
@@ -1083,12 +1096,8 @@ func TestSequencer_GetNextBatch_WithGasFiltering(t *testing.T) {
10831096
},
10841097
}
10851098

1086-
mockInfoProvider := &mockExecutionInfoProvider{
1087-
maxGas: 1000000, // 1M gas limit
1088-
}
1089-
1090-
// Set the filter
1091-
seq.SetDATransactionFilter(mockFilter, mockInfoProvider)
1099+
// Set the executor
1100+
seq.SetExecutor(mockExec)
10921101

10931102
// Manually set up cached forced txs to simulate DA fetch
10941103
seq.cachedForcedInclusionTxs = forcedTxs
@@ -1118,11 +1127,10 @@ func TestSequencer_GetNextBatch_WithGasFiltering(t *testing.T) {
11181127
assert.Equal(t, uint64(0), seq.checkpoint.TxIndex) // Reset because we replaced the cache
11191128

11201129
// Filter should have been called
1121-
assert.Equal(t, 1, filterCallCount)
1130+
assert.Equal(t, 1, mockExec.filterCallCount)
11221131

11231132
// Second call should return the remaining tx
1124-
mockFilter.filterFunc = func(ctx context.Context, txs [][]byte, maxGas uint64) ([][]byte, [][]byte, error) {
1125-
filterCallCount++
1133+
mockExec.filterFunc = func(ctx context.Context, txs [][]byte, maxGas uint64) ([][]byte, [][]byte, error) {
11261134
// Now all remaining txs fit
11271135
return txs, nil, nil
11281136
}
@@ -1167,18 +1175,15 @@ func TestSequencer_GetNextBatch_GasFilterError(t *testing.T) {
11671175
)
11681176
require.NoError(t, err)
11691177

1170-
// Configure filter that returns error
1171-
mockFilter := &mockDATransactionFilter{
1178+
// Configure executor that returns filter error
1179+
mockExec := &mockExecutor{
1180+
maxGas: 1000000,
11721181
filterFunc: func(ctx context.Context, txs [][]byte, maxGas uint64) ([][]byte, [][]byte, error) {
11731182
return nil, nil, errors.New("filter error")
11741183
},
11751184
}
11761185

1177-
mockInfoProvider := &mockExecutionInfoProvider{
1178-
maxGas: 1000000,
1179-
}
1180-
1181-
seq.SetDATransactionFilter(mockFilter, mockInfoProvider)
1186+
seq.SetExecutor(mockExec)
11821187

11831188
// Set up cached txs
11841189
forcedTxs := [][]byte{[]byte("tx1"), []byte("tx2")}

pkg/telemetry/executor_tracing.go

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -106,18 +106,14 @@ func (t *tracedExecutor) GetLatestHeight(ctx context.Context) (uint64, error) {
106106
return height, err
107107
}
108108

109-
// If the underlying executor implements ExecutionInfoProvider, forward it while tracing.
109+
// GetExecutionInfo forwards to the inner executor with tracing.
110110
func (t *tracedExecutor) GetExecutionInfo(ctx context.Context, height uint64) (coreexec.ExecutionInfo, error) {
111-
eip, ok := t.inner.(coreexec.ExecutionInfoProvider)
112-
if !ok {
113-
return coreexec.ExecutionInfo{}, nil
114-
}
115111
ctx, span := t.tracer.Start(ctx, "Executor.GetExecutionInfo",
116112
trace.WithAttributes(attribute.Int64("height", int64(height))),
117113
)
118114
defer span.End()
119115

120-
info, err := eip.GetExecutionInfo(ctx, height)
116+
info, err := t.inner.GetExecutionInfo(ctx, height)
121117
if err != nil {
122118
span.RecordError(err)
123119
span.SetStatus(codes.Error, err.Error())
@@ -127,13 +123,8 @@ func (t *tracedExecutor) GetExecutionInfo(ctx context.Context, height uint64) (c
127123
return info, err
128124
}
129125

130-
// If the underlying executor implements DATransactionFilter, forward it while tracing.
126+
// FilterDATransactions forwards to the inner executor with tracing.
131127
func (t *tracedExecutor) FilterDATransactions(ctx context.Context, txs [][]byte, maxGas uint64) ([][]byte, [][]byte, error) {
132-
filter, ok := t.inner.(coreexec.DATransactionFilter)
133-
if !ok {
134-
// If not implemented, return all transactions as valid (no filtering)
135-
return txs, nil, nil
136-
}
137128
ctx, span := t.tracer.Start(ctx, "Executor.FilterDATransactions",
138129
trace.WithAttributes(
139130
attribute.Int("input_tx_count", len(txs)),
@@ -142,7 +133,7 @@ func (t *tracedExecutor) FilterDATransactions(ctx context.Context, txs [][]byte,
142133
)
143134
defer span.End()
144135

145-
validTxs, remainingTxs, err := filter.FilterDATransactions(ctx, txs, maxGas)
136+
validTxs, remainingTxs, err := t.inner.FilterDATransactions(ctx, txs, maxGas)
146137
if err != nil {
147138
span.RecordError(err)
148139
span.SetStatus(codes.Error, err.Error())

0 commit comments

Comments
 (0)