Skip to content

Commit 6e775ff

Browse files
committed
fix e2e tests
1 parent 701023f commit 6e775ff

2 files changed

Lines changed: 261 additions & 7 deletions

File tree

da/cmd/local-da/server.go

Lines changed: 220 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@ package main
22

33
import (
44
"context"
5+
"crypto/sha256"
6+
"encoding/binary"
7+
"fmt"
58
"net"
69
"net/http"
710
"sync/atomic"
@@ -13,13 +16,35 @@ import (
1316
"github.com/evstack/ev-node/da"
1417
)
1518

19+
// Blob represents a Celestia-compatible blob for the blob API
20+
type Blob struct {
21+
Namespace []byte `json:"namespace"`
22+
Data []byte `json:"data"`
23+
ShareVer uint32 `json:"share_version"`
24+
Commitment []byte `json:"commitment"`
25+
Index int `json:"index"`
26+
}
27+
28+
// Proof represents a Celestia-compatible inclusion proof
29+
type Proof struct {
30+
Data []byte `json:"data"`
31+
}
32+
33+
// SubmitOptions contains options for blob submission
34+
type SubmitOptions struct {
35+
Fee float64 `json:"fee,omitempty"`
36+
GasLimit uint64 `json:"gas_limit,omitempty"`
37+
SignerAddress string `json:"signer_address,omitempty"`
38+
}
39+
1640
// Server is a jsonrpc service that serves the LocalDA implementation
1741
type Server struct {
1842
logger zerolog.Logger
1943
srv *http.Server
2044
rpc *jsonrpc.RPCServer
2145
listener net.Listener
2246
daImpl da.DA
47+
localDA *LocalDA // For blob API access to internal data
2348

2449
started atomic.Bool
2550
}
@@ -72,6 +97,184 @@ func (s *serverInternalAPI) SubmitWithOptions(ctx context.Context, blobs []da.Bl
7297
return s.daImpl.SubmitWithOptions(ctx, blobs, gasPrice, ns, options)
7398
}
7499

100+
// blobAPI provides Celestia-compatible Blob API methods
101+
type blobAPI struct {
102+
logger zerolog.Logger
103+
localDA *LocalDA
104+
}
105+
106+
// Submit submits blobs and returns the DA height (Celestia blob API compatible)
107+
func (b *blobAPI) Submit(ctx context.Context, blobs []*Blob, opts *SubmitOptions) (uint64, error) {
108+
b.logger.Debug().Int("num_blobs", len(blobs)).Msg("blob.Submit called")
109+
110+
if len(blobs) == 0 {
111+
return 0, nil
112+
}
113+
114+
ns := blobs[0].Namespace
115+
116+
rawBlobs := make([][]byte, len(blobs))
117+
for i, blob := range blobs {
118+
rawBlobs[i] = blob.Data
119+
}
120+
121+
var gasPrice float64
122+
if opts != nil {
123+
gasPrice = opts.Fee
124+
}
125+
126+
_, err := b.localDA.Submit(ctx, rawBlobs, gasPrice, ns)
127+
if err != nil {
128+
return 0, err
129+
}
130+
131+
b.localDA.mu.Lock()
132+
height := b.localDA.height
133+
b.localDA.mu.Unlock()
134+
135+
b.logger.Info().Uint64("height", height).Int("num_blobs", len(blobs)).Msg("blob.Submit successful")
136+
return height, nil
137+
}
138+
139+
// Get retrieves a single blob by commitment at a given height (Celestia blob API compatible)
140+
func (b *blobAPI) Get(ctx context.Context, height uint64, ns []byte, commitment []byte) (*Blob, error) {
141+
b.logger.Debug().Uint64("height", height).Msg("blob.Get called")
142+
143+
blobs, err := b.GetAll(ctx, height, [][]byte{ns})
144+
if err != nil {
145+
return nil, err
146+
}
147+
148+
for _, blob := range blobs {
149+
if len(commitment) == 0 || bytesEqual(blob.Commitment, commitment) {
150+
return blob, nil
151+
}
152+
}
153+
154+
return nil, nil
155+
}
156+
157+
// GetAll retrieves all blobs at a given height for the specified namespaces (Celestia blob API compatible)
158+
func (b *blobAPI) GetAll(ctx context.Context, height uint64, namespaces [][]byte) ([]*Blob, error) {
159+
b.logger.Debug().Uint64("height", height).Int("num_namespaces", len(namespaces)).Msg("blob.GetAll called")
160+
161+
if len(namespaces) == 0 {
162+
return []*Blob{}, nil
163+
}
164+
165+
ns := namespaces[0]
166+
167+
b.localDA.mu.Lock()
168+
defer b.localDA.mu.Unlock()
169+
170+
if height > b.localDA.height {
171+
b.logger.Debug().Uint64("requested", height).Uint64("current", b.localDA.height).Msg("blob.GetAll: height in future")
172+
return nil, fmt.Errorf("height %d from future, current height is %d", height, b.localDA.height)
173+
}
174+
175+
kvps, ok := b.localDA.data[height]
176+
if !ok {
177+
b.logger.Debug().Uint64("height", height).Msg("blob.GetAll: no data for height")
178+
return []*Blob{}, nil
179+
}
180+
181+
blobs := make([]*Blob, 0, len(kvps))
182+
for i, kv := range kvps {
183+
var commitment []byte
184+
if len(kv.key) > 8 {
185+
commitment = kv.key[8:]
186+
} else {
187+
hash := sha256.Sum256(kv.value)
188+
commitment = hash[:]
189+
}
190+
191+
blobs = append(blobs, &Blob{
192+
Namespace: ns,
193+
Data: kv.value,
194+
ShareVer: 0,
195+
Commitment: commitment,
196+
Index: i,
197+
})
198+
}
199+
200+
b.logger.Debug().Uint64("height", height).Int("num_blobs", len(blobs)).Msg("blob.GetAll successful")
201+
return blobs, nil
202+
}
203+
204+
// GetProof retrieves the inclusion proof for a blob (Celestia blob API compatible)
205+
func (b *blobAPI) GetProof(ctx context.Context, height uint64, ns []byte, commitment []byte) (*Proof, error) {
206+
b.logger.Debug().Uint64("height", height).Msg("blob.GetProof called")
207+
208+
b.localDA.mu.Lock()
209+
defer b.localDA.mu.Unlock()
210+
211+
kvps, ok := b.localDA.data[height]
212+
if !ok {
213+
return nil, nil
214+
}
215+
216+
for _, kv := range kvps {
217+
var blobCommitment []byte
218+
if len(kv.key) > 8 {
219+
blobCommitment = kv.key[8:]
220+
}
221+
222+
if len(commitment) == 0 || bytesEqual(blobCommitment, commitment) {
223+
proof := b.localDA.getProof(kv.key, kv.value)
224+
return &Proof{Data: proof}, nil
225+
}
226+
}
227+
228+
return nil, nil
229+
}
230+
231+
// Included checks whether a blob is included in the DA layer (Celestia blob API compatible)
232+
func (b *blobAPI) Included(ctx context.Context, height uint64, ns []byte, proof *Proof, commitment []byte) (bool, error) {
233+
b.logger.Debug().Uint64("height", height).Msg("blob.Included called")
234+
235+
b.localDA.mu.Lock()
236+
defer b.localDA.mu.Unlock()
237+
238+
kvps, ok := b.localDA.data[height]
239+
if !ok {
240+
return false, nil
241+
}
242+
243+
for _, kv := range kvps {
244+
var blobCommitment []byte
245+
if len(kv.key) > 8 {
246+
blobCommitment = kv.key[8:]
247+
}
248+
249+
if bytesEqual(blobCommitment, commitment) {
250+
return true, nil
251+
}
252+
}
253+
254+
return false, nil
255+
}
256+
257+
// bytesEqual compares two byte slices
258+
func bytesEqual(a, b []byte) bool {
259+
if len(a) != len(b) {
260+
return false
261+
}
262+
for i := range a {
263+
if a[i] != b[i] {
264+
return false
265+
}
266+
}
267+
return true
268+
}
269+
270+
// makeID creates an ID from height and commitment
271+
func makeID(height uint64, commitment []byte) []byte {
272+
id := make([]byte, 8+len(commitment))
273+
binary.LittleEndian.PutUint64(id, height)
274+
copy(id[8:], commitment)
275+
return id
276+
}
277+
75278
func getKnownErrorsMapping() jsonrpc.Errors {
76279
errs := jsonrpc.NewErrors()
77280
errs.Register(jsonrpc.ErrorCode(da.StatusNotFound), &da.ErrBlobNotFound)
@@ -86,25 +289,35 @@ func getKnownErrorsMapping() jsonrpc.Errors {
86289
}
87290

88291
// NewServer creates a new JSON-RPC server for the LocalDA implementation
89-
func NewServer(logger zerolog.Logger, address, port string, daImplementation da.DA) *Server {
292+
// It registers both the legacy "da" namespace and the Celestia-compatible "blob" namespace
293+
func NewServer(logger zerolog.Logger, address, port string, localDA *LocalDA) *Server {
90294
rpc := jsonrpc.NewServer(jsonrpc.WithServerErrors(getKnownErrorsMapping()))
91295
srv := &Server{
92-
rpc: rpc,
93-
logger: logger,
94-
daImpl: daImplementation,
296+
rpc: rpc,
297+
logger: logger,
298+
daImpl: localDA,
299+
localDA: localDA,
95300
srv: &http.Server{
96301
Addr: address + ":" + port,
97302
ReadHeaderTimeout: 2 * time.Second,
98303
},
99304
}
100305
srv.srv.Handler = http.HandlerFunc(rpc.ServeHTTP)
101306

102-
apiHandler := &serverInternalAPI{
307+
// Register legacy "da" namespace API
308+
daAPIHandler := &serverInternalAPI{
103309
logger: logger,
104-
daImpl: daImplementation,
310+
daImpl: localDA,
311+
}
312+
srv.rpc.Register("da", daAPIHandler)
313+
314+
// Register Celestia-compatible "blob" namespace API
315+
blobAPIHandler := &blobAPI{
316+
logger: logger,
317+
localDA: localDA,
105318
}
319+
srv.rpc.Register("blob", blobAPIHandler)
106320

107-
srv.rpc.Register("da", apiHandler)
108321
return srv
109322
}
110323

test/e2e/evm_test_common.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,10 @@ func setupFullNode(t *testing.T, sut *SystemUnderTest, fullNodeHome, sequencerHo
404404
err = os.WriteFile(fullNodeGenesis, genesisData, 0644)
405405
require.NoError(t, err, "failed to write full node genesis file")
406406

407+
// Read namespace from sequencer config to pass to full node
408+
sequencerConfigPath := filepath.Join(sequencerHome, "config", "evnode.yaml")
409+
namespace := extractNamespaceFromConfig(t, sequencerConfigPath)
410+
407411
// Create JWT secret file for full node
408412
fullNodeJwtSecretFile := createJWTSecretFile(t, fullNodeHome, fullNodeJwtSecret)
409413

@@ -418,6 +422,7 @@ func setupFullNode(t *testing.T, sut *SystemUnderTest, fullNodeHome, sequencerHo
418422
"--evm.eth-url", endpoints.GetFullNodeEthURL(),
419423
"--rollkit.da.block_time", DefaultDABlockTime,
420424
"--rollkit.da.address", endpoints.GetDAAddress(),
425+
"--rollkit.da.namespace", namespace, // Use same namespace as sequencer
421426
"--rollkit.rpc.address", endpoints.GetFullNodeRPCListen(),
422427
"--rollkit.p2p.listen_address", endpoints.GetFullNodeP2PAddress(),
423428
}
@@ -427,6 +432,42 @@ func setupFullNode(t *testing.T, sut *SystemUnderTest, fullNodeHome, sequencerHo
427432
sut.AwaitNodeLive(t, endpoints.GetFullNodeRPCAddress(), NodeStartupTimeout)
428433
}
429434

435+
// extractNamespaceFromConfig reads the namespace from a config file
436+
func extractNamespaceFromConfig(t *testing.T, configPath string) string {
437+
t.Helper()
438+
439+
configData, err := os.ReadFile(configPath)
440+
require.NoError(t, err, "failed to read config file")
441+
442+
// Parse YAML - look for "namespace:" under "da:" section
443+
lines := strings.Split(string(configData), "\n")
444+
inDASection := false
445+
for _, line := range lines {
446+
// Check if we're entering the da: section
447+
if strings.TrimSpace(line) == "da:" {
448+
inDASection = true
449+
continue
450+
}
451+
// Check if we're leaving the da: section (new top-level key)
452+
if inDASection && len(line) > 0 && line[0] != ' ' && line[0] != '\t' {
453+
inDASection = false
454+
}
455+
// Look for namespace: inside the da: section
456+
if inDASection {
457+
trimmed := strings.TrimSpace(line)
458+
if strings.HasPrefix(trimmed, "namespace:") {
459+
parts := strings.SplitN(trimmed, ":", 2)
460+
if len(parts) == 2 {
461+
return strings.TrimSpace(parts[1])
462+
}
463+
}
464+
}
465+
}
466+
467+
t.Fatal("namespace not found in config file")
468+
return ""
469+
}
470+
430471
// Global nonce counter to ensure unique nonces across multiple transaction submissions
431472
var globalNonce uint64 = 0
432473

0 commit comments

Comments
 (0)