Skip to content

Commit db5390c

Browse files
authored
da: replace go-cnc with celestia-openrpc (#986)
## Overview This PR replaces `go-cnc` with `celestia-openrpc` for the client. Fixes #979 ## Checklist - [x] New and updated code has appropriate documentation - [x] New and updated code has new and/or updated testing - [x] Required CI checks are passing - [x] Visual proof for any user facing features like CLI or documentation updates - [x] Linked issues closed with keywords
1 parent 9a80ef7 commit db5390c

10 files changed

Lines changed: 544 additions & 279 deletions

File tree

da/celestia/celestia.go

Lines changed: 77 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,21 @@ package celestia
22

33
import (
44
"context"
5+
"encoding/hex"
56
"encoding/json"
67
"fmt"
78
"strings"
89
"time"
910

11+
"cosmossdk.io/math"
1012
"github.com/gogo/protobuf/proto"
1113
ds "github.com/ipfs/go-datastore"
1214

13-
"github.com/celestiaorg/go-cnc"
14-
1515
openrpc "github.com/rollkit/celestia-openrpc"
16+
"github.com/rollkit/celestia-openrpc/types/blob"
17+
"github.com/rollkit/celestia-openrpc/types/share"
1618

19+
openrpcns "github.com/rollkit/celestia-openrpc/types/namespace"
1720
"github.com/rollkit/rollkit/da"
1821
"github.com/rollkit/rollkit/log"
1922
"github.com/rollkit/rollkit/types"
@@ -22,30 +25,32 @@ import (
2225

2326
// DataAvailabilityLayerClient use celestia-node public API.
2427
type DataAvailabilityLayerClient struct {
25-
_ *openrpc.Client
26-
client *cnc.Client
28+
rpc *openrpc.Client
2729

28-
namespaceID types.NamespaceID
29-
config Config
30-
logger log.Logger
30+
namespace openrpcns.Namespace
31+
config Config
32+
logger log.Logger
3133
}
3234

3335
var _ da.DataAvailabilityLayerClient = &DataAvailabilityLayerClient{}
3436
var _ da.BlockRetriever = &DataAvailabilityLayerClient{}
3537

3638
// Config stores Celestia DALC configuration parameters.
3739
type Config struct {
38-
BaseURL string `json:"base_url"`
39-
Timeout time.Duration `json:"timeout"`
40-
Fee int64 `json:"fee"`
41-
GasLimit uint64 `json:"gas_limit"`
40+
AuthToken string `json:"auth_token"`
41+
BaseURL string `json:"base_url"`
42+
Timeout time.Duration `json:"timeout"`
43+
Fee int64 `json:"fee"`
44+
GasLimit uint64 `json:"gas_limit"`
4245
}
4346

4447
// Init initializes DataAvailabilityLayerClient instance.
45-
func (c *DataAvailabilityLayerClient) Init(
46-
namespaceID types.NamespaceID, config []byte, kvStore ds.Datastore, logger log.Logger,
47-
) error {
48-
c.namespaceID = namespaceID
48+
func (c *DataAvailabilityLayerClient) Init(namespaceID types.NamespaceID, config []byte, kvStore ds.Datastore, logger log.Logger) error {
49+
namespace, err := share.NewBlobNamespaceV0(namespaceID[:])
50+
if err != nil {
51+
return err
52+
}
53+
c.namespace = namespace.ToAppNamespace()
4954
c.logger = logger
5055

5156
if len(config) > 0 {
@@ -59,7 +64,7 @@ func (c *DataAvailabilityLayerClient) Init(
5964
func (c *DataAvailabilityLayerClient) Start() error {
6065
c.logger.Info("starting Celestia Data Availability Layer Client", "baseURL", c.config.BaseURL)
6166
var err error
62-
c.client, err = cnc.NewClient(c.config.BaseURL, cnc.WithTimeout(c.config.Timeout))
67+
c.rpc, err = openrpc.NewClient(context.Background(), c.config.BaseURL, c.config.AuthToken)
6368
return err
6469
}
6570

@@ -71,7 +76,17 @@ func (c *DataAvailabilityLayerClient) Stop() error {
7176

7277
// SubmitBlock submits a block to DA layer.
7378
func (c *DataAvailabilityLayerClient) SubmitBlock(ctx context.Context, block *types.Block) da.ResultSubmitBlock {
74-
blob, err := block.MarshalBinary()
79+
data, err := block.MarshalBinary()
80+
if err != nil {
81+
return da.ResultSubmitBlock{
82+
BaseResult: da.BaseResult{
83+
Code: da.StatusError,
84+
Message: err.Error(),
85+
},
86+
}
87+
}
88+
89+
blockBlob, err := blob.NewBlobV0(c.namespace.Bytes(), data)
7590
if err != nil {
7691
return da.ResultSubmitBlock{
7792
BaseResult: da.BaseResult{
@@ -81,8 +96,9 @@ func (c *DataAvailabilityLayerClient) SubmitBlock(ctx context.Context, block *ty
8196
}
8297
}
8398

84-
txResponse, err := c.client.SubmitPFB(ctx, c.namespaceID, blob, c.config.Fee, c.config.GasLimit)
99+
blobs := []*blob.Blob{blockBlob}
85100

101+
txResponse, err := c.rpc.State.SubmitPayForBlob(ctx, math.NewInt(c.config.Fee), c.config.GasLimit, blobs)
86102
if err != nil {
87103
return da.ResultSubmitBlock{
88104
BaseResult: da.BaseResult{
@@ -92,6 +108,10 @@ func (c *DataAvailabilityLayerClient) SubmitBlock(ctx context.Context, block *ty
92108
}
93109
}
94110

111+
c.logger.Debug("successfully submitted PayForBlob transaction",
112+
"fee", c.config.Fee, "gasLimit", c.config.GasLimit,
113+
"daHeight", txResponse.Height, "daTxHash", txResponse.TxHash)
114+
95115
if txResponse.Code != 0 {
96116
return da.ResultSubmitBlock{
97117
BaseResult: da.BaseResult{
@@ -111,34 +131,56 @@ func (c *DataAvailabilityLayerClient) SubmitBlock(ctx context.Context, block *ty
111131
}
112132

113133
// CheckBlockAvailability queries DA layer to check data availability of block at given height.
114-
func (c *DataAvailabilityLayerClient) CheckBlockAvailability(
115-
ctx context.Context, dataLayerHeight uint64,
116-
) da.ResultCheckBlock {
117-
shares, err := c.client.NamespacedShares(ctx, c.namespaceID, dataLayerHeight)
118-
code := dataRequestErrorToStatus(err)
119-
if code != da.StatusSuccess {
134+
func (c *DataAvailabilityLayerClient) CheckBlockAvailability(ctx context.Context, dataLayerHeight uint64) da.ResultCheckBlock {
135+
header, err := c.rpc.Header.GetByHeight(ctx, dataLayerHeight)
136+
if err != nil {
137+
return da.ResultCheckBlock{
138+
BaseResult: da.BaseResult{
139+
Code: da.StatusError,
140+
Message: err.Error(),
141+
},
142+
}
143+
}
144+
if header.DAH == nil {
145+
return da.ResultCheckBlock{
146+
BaseResult: da.BaseResult{
147+
Code: da.StatusSuccess,
148+
DAHeight: dataLayerHeight,
149+
},
150+
DataAvailable: false,
151+
}
152+
}
153+
err = c.rpc.Share.SharesAvailable(ctx, header.DAH)
154+
if err != nil {
155+
if strings.Contains(err.Error(), share.ErrNotAvailable.Error()) {
156+
return da.ResultCheckBlock{
157+
BaseResult: da.BaseResult{
158+
Code: da.StatusSuccess,
159+
DAHeight: dataLayerHeight,
160+
},
161+
DataAvailable: false,
162+
}
163+
}
120164
return da.ResultCheckBlock{
121165
BaseResult: da.BaseResult{
122166
Code: da.StatusError,
123167
Message: err.Error(),
124168
},
125169
}
126170
}
127-
128171
return da.ResultCheckBlock{
129172
BaseResult: da.BaseResult{
130173
Code: da.StatusSuccess,
131174
DAHeight: dataLayerHeight,
132175
},
133-
DataAvailable: len(shares) > 0,
176+
DataAvailable: true,
134177
}
135178
}
136179

137180
// RetrieveBlocks gets a batch of blocks from DA layer.
138-
func (c *DataAvailabilityLayerClient) RetrieveBlocks(
139-
ctx context.Context, dataLayerHeight uint64,
140-
) da.ResultRetrieveBlocks {
141-
data, err := c.client.NamespacedData(ctx, c.namespaceID, dataLayerHeight)
181+
func (c *DataAvailabilityLayerClient) RetrieveBlocks(ctx context.Context, dataLayerHeight uint64) da.ResultRetrieveBlocks {
182+
c.logger.Debug("trying to retrieve blob using Blob.GetAll", "daHeight", dataLayerHeight, "namespace", hex.EncodeToString(c.namespace.Bytes()))
183+
blobs, err := c.rpc.Blob.GetAll(ctx, dataLayerHeight, []share.Namespace{c.namespace.Bytes()})
142184
status := dataRequestErrorToStatus(err)
143185
if status != da.StatusSuccess {
144186
return da.ResultRetrieveBlocks{
@@ -149,10 +191,10 @@ func (c *DataAvailabilityLayerClient) RetrieveBlocks(
149191
}
150192
}
151193

152-
blocks := make([]*types.Block, len(data))
153-
for i, msg := range data {
194+
blocks := make([]*types.Block, len(blobs))
195+
for i, blob := range blobs {
154196
var block pb.Block
155-
err = proto.Unmarshal(msg, &block)
197+
err = proto.Unmarshal(blob.Data, &block)
156198
if err != nil {
157199
c.logger.Error("failed to unmarshal block", "daHeight", dataLayerHeight, "position", i, "error", err)
158200
continue
@@ -187,7 +229,8 @@ func dataRequestErrorToStatus(err error) da.StatusCode {
187229
strings.Contains(err.Error(), da.ErrNamespaceNotFound.Error()):
188230
return da.StatusSuccess
189231
case strings.Contains(err.Error(), da.ErrDataNotFound.Error()),
190-
strings.Contains(err.Error(), da.ErrEDSNotFound.Error()):
232+
strings.Contains(err.Error(), da.ErrEDSNotFound.Error()),
233+
strings.Contains(err.Error(), da.ErrBlobNotFound.Error()):
191234
return da.StatusNotFound
192235
default:
193236
return da.StatusError

da/celestia/mock/messages.go

Lines changed: 0 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,8 @@
11
package mock
22

3-
import (
4-
"bytes"
5-
"encoding/binary"
6-
)
7-
83
// This code is extracted from celestia-app. It's here to build shares from messages (serialized blocks).
94
// TODO(tzdybal): if we stop using `/namespaced_shares` we can get rid of this file.
105

11-
const (
12-
shareSize = 256
13-
namespaceSize = 8
14-
msgShareSize = shareSize - namespaceSize
15-
)
16-
17-
// splitMessage breaks the data in a message into the minimum number of
18-
// namespaced shares
19-
func splitMessage(rawData []byte, nid []byte) []NamespacedShare {
20-
shares := make([]NamespacedShare, 0)
21-
firstRawShare := append(append(
22-
make([]byte, 0, shareSize),
23-
nid...),
24-
rawData[:msgShareSize]...,
25-
)
26-
shares = append(shares, NamespacedShare{firstRawShare, nid})
27-
rawData = rawData[msgShareSize:]
28-
for len(rawData) > 0 {
29-
shareSizeOrLen := min(msgShareSize, len(rawData))
30-
rawShare := append(append(
31-
make([]byte, 0, shareSize),
32-
nid...),
33-
rawData[:shareSizeOrLen]...,
34-
)
35-
paddedShare := zeroPadIfNecessary(rawShare, shareSize)
36-
share := NamespacedShare{paddedShare, nid}
37-
shares = append(shares, share)
38-
rawData = rawData[shareSizeOrLen:]
39-
}
40-
return shares
41-
}
42-
436
// Share contains the raw share data without the corresponding namespace.
447
type Share []byte
458

@@ -48,59 +11,3 @@ type NamespacedShare struct {
4811
Share
4912
ID []byte
5013
}
51-
52-
func min(a, b int) int {
53-
if a <= b {
54-
return a
55-
}
56-
return b
57-
}
58-
59-
func zeroPadIfNecessary(share []byte, width int) []byte {
60-
oldLen := len(share)
61-
if oldLen < width {
62-
missingBytes := width - oldLen
63-
padByte := []byte{0}
64-
padding := bytes.Repeat(padByte, missingBytes)
65-
share = append(share, padding...)
66-
return share
67-
}
68-
return share
69-
}
70-
71-
// marshalDelimited marshals the raw data (excluding the namespace) of this
72-
// message and prefixes it with the length of that encoding.
73-
func marshalDelimited(data []byte) ([]byte, error) {
74-
lenBuf := make([]byte, binary.MaxVarintLen64)
75-
length := uint64(len(data))
76-
n := binary.PutUvarint(lenBuf, length)
77-
return append(lenBuf[:n], data...), nil
78-
}
79-
80-
// appendToShares appends raw data as shares.
81-
// Used to build shares from blocks/messages.
82-
func appendToShares(shares []NamespacedShare, nid []byte, rawData []byte) []NamespacedShare {
83-
if len(rawData) <= msgShareSize {
84-
rawShare := append(append(
85-
make([]byte, 0, len(nid)+len(rawData)),
86-
nid...),
87-
rawData...,
88-
)
89-
paddedShare := zeroPadIfNecessary(rawShare, shareSize)
90-
share := NamespacedShare{paddedShare, nid}
91-
shares = append(shares, share)
92-
} else { // len(rawData) > msgShareSize
93-
shares = append(shares, splitMessage(rawData, nid)...)
94-
}
95-
return shares
96-
}
97-
98-
type namespacedSharesResponse struct {
99-
Shares []Share `json:"shares"`
100-
Height uint64 `json:"height"`
101-
}
102-
103-
type namespacedDataResponse struct {
104-
Data [][]byte `json:"data"`
105-
Height uint64 `json:"height"`
106-
}

0 commit comments

Comments
 (0)