9 changes: 9 additions & 0 deletions alex_questions.md
@@ -0,0 +1,9 @@
# direct TX

- What format can we expect for the direct TX on the DA? Some wrapper type would be useful to unpack it and distinguish it from random bytes.
- Should we always include direct TXs even though they may duplicate mempool TXs? Spike result: yes.
- Should we fill the block space with direct TXs when possible, or reserve space for mempool TXs?
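One option for the wrapper question above, sketched with hypothetical names: a magic-prefixed, length-checked envelope so direct-TX blobs can be told apart from random bytes. This is an illustration only, not the project's actual wire format.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
)

// directTxMagic marks a blob as a direct-TX envelope; anything without
// this prefix can be skipped as noise. All names here are illustrative.
var directTxMagic = []byte{0xD1, 0x7C}

var errNotDirectTx = errors.New("not a direct TX envelope")

// WrapDirectTx prefixes the payload with the magic marker and its length.
func WrapDirectTx(tx []byte) []byte {
	buf := make([]byte, 0, len(directTxMagic)+4+len(tx))
	buf = append(buf, directTxMagic...)
	buf = binary.BigEndian.AppendUint32(buf, uint32(len(tx)))
	return append(buf, tx...)
}

// UnwrapDirectTx rejects blobs that lack the marker or whose declared
// length does not match the payload.
func UnwrapDirectTx(blob []byte) ([]byte, error) {
	if !bytes.HasPrefix(blob, directTxMagic) {
		return nil, errNotDirectTx
	}
	rest := blob[len(directTxMagic):]
	if len(rest) < 4 {
		return nil, errNotDirectTx
	}
	if n := binary.BigEndian.Uint32(rest[:4]); uint32(len(rest)-4) != n {
		return nil, errNotDirectTx
	}
	return rest[4:], nil
}

func main() {
	tx, err := UnwrapDirectTx(WrapDirectTx([]byte("transfer")))
	fmt.Println(string(tx), err)
}
```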

Contributor Author:

What should we do when a sequencer is not able to add a direct TX within the time window?

## Smarter sequencer
Build blocks up to the max bytes from the request rather than returning the batch previously added by the syncer.
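The idea above can be sketched as a greedy fill against the request's byte budget. The types here are simplified stand-ins, not the actual sequencer API.

```go
package main

import "fmt"

// buildBatch greedily packs queued txs until adding the next one would
// exceed maxBytes, returning the batch and the leftover queue.
func buildBatch(queue [][]byte, maxBytes int) (batch [][]byte, rest [][]byte) {
	used := 0
	for i, tx := range queue {
		if used+len(tx) > maxBytes {
			return batch, queue[i:]
		}
		used += len(tx)
		batch = append(batch, tx)
	}
	return batch, nil
}

func main() {
	// txs of 2, 3, and 1 bytes against a 5-byte budget
	batch, rest := buildBatch([][]byte{{1, 2}, {3, 4, 5}, {6}}, 5)
	fmt.Println(len(batch), len(rest)) // first two fit, one left over
}
```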
5 changes: 4 additions & 1 deletion apps/evm/based/cmd/run.go
@@ -19,6 +19,7 @@ import (
"github.com/evstack/ev-node/pkg/p2p/key"
"github.com/evstack/ev-node/pkg/store"
"github.com/evstack/ev-node/sequencers/based"
"github.com/evstack/ev-node/sequencers/single"

"github.com/spf13/cobra"
)
@@ -169,11 +170,13 @@ func NewExtendedRunNodeCmd(ctx context.Context) *cobra.Command {
return fmt.Errorf("failed to create P2P client: %w", err)
}

directTXSeq := single.NewDirectTxSequencer(sequencer, logger, datastore, 100) // todo (Alex): what is a good max value
Contributor (medium):

The maxSize for the DirectTxSequencer is hardcoded to 100. This value should be configurable rather than being a magic number. This will allow for easier tuning and adjustment in different environments without changing the code. Consider adding this to a configuration struct.


// Pass the raw rollDA implementation to StartNode.
// StartNode might need adjustment if it strictly requires coreda.Client methods.
// For now, assume it can work with coreda.DA or will be adjusted later.
// We also need to pass the namespace config for rollDA.
return rollcmd.StartNode(logger, cmd, executor, sequencer, rollDA, p2pClient, datastore, nodeConfig, node.NodeOptions{})
return rollcmd.StartNode(logger, cmd, executor, directTXSeq, rollDA, p2pClient, datastore, nodeConfig, node.NodeOptions{})
},
}

289 changes: 145 additions & 144 deletions apps/evm/based/go.mod

Large diffs are not rendered by default.

615 changes: 300 additions & 315 deletions apps/evm/based/go.sum

Large diffs are not rendered by default.

11 changes: 9 additions & 2 deletions apps/evm/single/cmd/run.go
@@ -38,7 +38,6 @@ var RunCmd = &cobra.Command{
}

logger := rollcmd.SetupLogger(nodeConfig.Log)

daJrpc, err := jsonrpc.NewClient(context.Background(), logger, nodeConfig.DA.Address, nodeConfig.DA.AuthToken, nodeConfig.DA.Namespace)
if err != nil {
return err
@@ -68,6 +67,14 @@ var RunCmd = &cobra.Command{
return err
}

directTXSequencer := single.NewDirectTxSequencer(
sequencer,
logger,
datastore,
100, // todo (Alex): what is a good value?
Contributor (medium):

The maxSize for the DirectTxSequencer is hardcoded to 100. This value should be made configurable to allow for tuning based on the environment. Using hardcoded values makes the system less flexible.

nodeConfig.ForcedInclusion,
)

nodeKey, err := key.LoadNodeKey(filepath.Dir(nodeConfig.ConfigPath()))
if err != nil {
return err
@@ -78,7 +85,7 @@ var RunCmd = &cobra.Command{
return err
}

return rollcmd.StartNode(logger, cmd, executor, sequencer, &daJrpc.DA, p2pClient, datastore, nodeConfig, node.NodeOptions{})
return rollcmd.StartNode(logger, cmd, executor, directTXSequencer, &daJrpc.DA, p2pClient, datastore, nodeConfig, node.NodeOptions{})
},
}

2 changes: 1 addition & 1 deletion apps/evm/single/go.mod
@@ -54,7 +54,7 @@ require (
github.com/buger/goterm v1.0.4 // indirect
github.com/celestiaorg/go-header v0.6.6 // indirect
github.com/celestiaorg/go-libp2p-messenger v0.2.2 // indirect
github.com/celestiaorg/go-square/v2 v2.2.0 // indirect
github.com/celestiaorg/go-square/v2 v2.3.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/compose-spec/compose-go/v2 v2.6.0 // indirect
4 changes: 2 additions & 2 deletions apps/evm/single/go.sum
@@ -107,8 +107,8 @@ github.com/celestiaorg/go-header v0.6.6 h1:17GvSXU/w8L1YWHZP4pYm9/4YHA8iy5Ku2wTE
github.com/celestiaorg/go-header v0.6.6/go.mod h1:RdnlTmsyuNerztNiJiQE5G/EGEH+cErhQ83xNjuGcaQ=
github.com/celestiaorg/go-libp2p-messenger v0.2.2 h1:osoUfqjss7vWTIZrrDSy953RjQz+ps/vBFE7bychLEc=
github.com/celestiaorg/go-libp2p-messenger v0.2.2/go.mod h1:oTCRV5TfdO7V/k6nkx7QjQzGrWuJbupv+0o1cgnY2i4=
github.com/celestiaorg/go-square/v2 v2.2.0 h1:zJnUxCYc65S8FgUfVpyG/osDcsnjzo/JSXw/Uwn8zp4=
github.com/celestiaorg/go-square/v2 v2.2.0/go.mod h1:j8kQUqJLYtcvCQMQV6QjEhUdaF7rBTXF74g8LbkR0Co=
github.com/celestiaorg/go-square/v2 v2.3.1 h1:CDdiQ+QkKPOQEcyDPODgP/PbAEzqUcftsohCPcbvsnw=
github.com/celestiaorg/go-square/v2 v2.3.1/go.mod h1:6M2txj0j6dkoE+cgwyG0EqrEPhbZpM2R1lsWEopMIBc=
github.com/celestiaorg/utils v0.1.0 h1:WsP3O8jF7jKRgLNFmlDCwdThwOFMFxg0MnqhkLFVxPo=
github.com/celestiaorg/utils v0.1.0/go.mod h1:vQTh7MHnvpIeCQZ2/Ph+w7K1R2UerDheZbgJEJD2hSU=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
4 changes: 2 additions & 2 deletions apps/testapp/cmd/run.go
@@ -88,12 +88,12 @@ var RunCmd = &cobra.Command{
if err != nil {
return err
}

directTXSeq := single.NewDirectTxSequencer(sequencer, logger, datastore, 100) // todo (Alex): find a good default
Contributor (medium):

The maxSize for the DirectTxSequencer is hardcoded to 100. This should be exposed as a configuration parameter to avoid magic numbers in the code and allow for easier adjustments.

p2pClient, err := p2p.NewClient(nodeConfig, nodeKey, datastore, logger, p2p.NopMetrics())
if err != nil {
return err
}

return rollcmd.StartNode(logger, cmd, executor, sequencer, &daJrpc.DA, p2pClient, datastore, nodeConfig, node.NodeOptions{})
return rollcmd.StartNode(logger, cmd, executor, directTXSeq, &daJrpc.DA, p2pClient, datastore, nodeConfig, node.NodeOptions{})
},
}
58 changes: 58 additions & 0 deletions block/direct_tx.go
@@ -0,0 +1,58 @@
package block

import (
"context"
"crypto/sha256"
"errors"
"github.com/evstack/ev-node/types"
)

type DirectTransaction struct {
TxHash types.Hash
FirstSeenDAHeight uint64 // DA height at which the tx was first seen
Included bool // Whether it has been included in a block
IncludedAt uint64 // Height at which it was included
TX []byte
}

func (m *Manager) handlePotentialDirectTXs(ctx context.Context, bz []byte, daHeight uint64) bool {
var unsignedData types.Data // todo (Alex): we need some type to separate from noise
err := unsignedData.UnmarshalBinary(bz)
if err != nil {
m.logger.Debug("failed to unmarshal unsigned data", "error", err)
return false
}
if len(unsignedData.Txs) == 0 {
m.logger.Debug("ignoring empty unsigned data", "daHeight", daHeight)
return false
}
if unsignedData.Metadata.ChainID != m.genesis.ChainID {
m.logger.Debug("ignoring unsigned data from different chain", "daHeight", daHeight)
return false
}
//Early validation to reject junk data
//if !m.isValidSignedData(&unsignedData) {
// m.logger.Debug("invalid data signature, daHeight: ", daHeight)
// return false
//}
h := m.headerCache.GetItem(daHeight)
if h == nil {
panic("header not found in cache") // todo (Alex): not sure if headers are always available before data. better assume not
Contributor (critical):

Using panic here can crash the entire node if a header is not found in the cache. While the todo comment acknowledges this, it's a critical issue. The function should handle this case gracefully by logging an error and returning false, especially since it's possible for data to be available before its corresponding header.

Suggested change
panic("header not found in cache") // todo (Alex): not sure if headers are always available before data. better assume not
m.logger.Debug("header not found in cache, height:", daHeight)
return false

//m.logger.Debug("header not found in cache, height:", daHeight)
//return false
}
for _, tx := range unsignedData.Txs {
txHash := sha256.New().Sum(tx)
Contributor (critical):

The transaction hashing logic is incorrect. sha256.New().Sum(tx) computes the hash of an empty input and appends the result to tx, which is not the intended behavior. To compute the SHA256 hash of the transaction, you should use sha256.Sum256(tx).

txHashBytes := sha256.Sum256(tx)
txHash := txHashBytes[:]

d := DirectTransaction{
TxHash: txHash,
TX: unsignedData.Txs[0],
Contributor (critical):

There is a bug in this loop. The code is iterating over unsignedData.Txs with the loop variable tx, but inside the DirectTransaction struct, it's always assigning unsignedData.Txs[0]. This means every transaction in the DirectTransaction list will have the content of the first transaction. It should use the loop variable tx instead.

TX:                tx,

FirstSeenDAHeight: daHeight,
Included: false,
IncludedAt: 0,
}
_ = d
}
return true
}

var ErrMissingDirectTx = errors.New("missing direct tx")
177 changes: 177 additions & 0 deletions block/direct_tx_reaper.go
@@ -0,0 +1,177 @@
package block

import (
"context"
"fmt"
"strings"
"sync/atomic"
"time"

"github.com/evstack/ev-node/core/da"
coreda "github.com/evstack/ev-node/core/da"
"github.com/evstack/ev-node/core/sequencer"
"github.com/evstack/ev-node/types"
ds "github.com/ipfs/go-datastore"
kt "github.com/ipfs/go-datastore/keytransform"
"github.com/ipfs/go-log/v2"
)

const (
keyPrefixDirTXSeen = "dTX"
)

// DirectTxReaper is responsible for periodically retrieving direct transactions from the DA layer,
// filtering out already seen transactions, and submitting new transactions to the sequencer.
type DirectTxReaper struct {
da da.DA
sequencer sequencer.DirectTxSequencer
chainID string
interval time.Duration
logger log.EventLogger
ctx context.Context
seenStore ds.Batching
manager *Manager
daHeight *atomic.Uint64
}

// NewDirectTxReaper creates a new DirectTxReaper instance with persistent seenTx storage.
func NewDirectTxReaper(
ctx context.Context,
da coreda.DA,
sequencer sequencer.DirectTxSequencer,
manager *Manager,
chainID string,
interval time.Duration,
logger log.EventLogger,
store ds.Batching,
daStartHeight uint64,
) *DirectTxReaper {
if daStartHeight == 0 {
daStartHeight = 1
}
if interval <= 0 {
interval = 100 * time.Millisecond
}
daHeight := new(atomic.Uint64)
daHeight.Store(daStartHeight)
return &DirectTxReaper{
da: da,
sequencer: sequencer,
chainID: chainID,
interval: interval,
logger: logger,
ctx: ctx,
seenStore: kt.Wrap(store, &kt.PrefixTransform{
Prefix: ds.NewKey(keyPrefixDirTXSeen),
}),
manager: manager,
daHeight: daHeight,
}
}

// Start begins the reaping process at the specified interval.
func (r *DirectTxReaper) Start(ctx context.Context) {
r.ctx = ctx
ticker := time.NewTicker(r.interval)
defer ticker.Stop()

r.logger.Info("DirectTxReaper started", "interval", r.interval)

for {
select {
case <-ctx.Done():
r.logger.Info("DirectTxReaper stopped")
return
case <-ticker.C:
daHeight := r.daHeight.Load()
if err := r.retrieveDirectTXs(daHeight); err != nil {
if strings.Contains(err.Error(), coreda.ErrHeightFromFuture.Error()) {
r.logger.Debug("DA height is from the future, retrying", "height", daHeight)
} else {
r.logger.Error("Submit direct txs to sequencer", "error", err)
}
continue
}
r.daHeight.Store(daHeight + 1)

}
}
}

// retrieveDirectTXs retrieves direct transactions from the DA layer and submits them to the sequencer.
func (r *DirectTxReaper) retrieveDirectTXs(daHeight uint64) error {
// Get all blob IDs at the current DA height
result, err := r.da.GetIDs(r.ctx, daHeight, nil)
if err != nil {
return fmt.Errorf("get IDs from DA: %w", err)
}
if result == nil || len(result.IDs) == 0 {
r.logger.Debug("No blobs at current DA height", "height", daHeight)
return nil
}
r.logger.Debug("IDs at current DA height", "height", daHeight, "count", len(result.IDs))

// Get the blobs for all IDs
blobs, err := r.da.Get(r.ctx, result.IDs, nil)
if err != nil {
return fmt.Errorf("get blobs from DA: %w", err)
}
r.logger.Debug("Blobs found at height", "height", daHeight, "count", len(blobs))

var newTxs []sequencer.DirectTX
for _, blob := range blobs {
r.logger.Debug("Processing blob data")

// Process each blob to extract direct transactions
var data types.Data
err := data.UnmarshalBinary(blob)
if err != nil {
r.logger.Debug("Unexpected payload, skipping", "error", err)
continue
}

// Skip blobs from different chains
if data.Metadata.ChainID != r.chainID {
r.logger.Debug("Ignoring data from different chain", "chainID", data.Metadata.ChainID, "expectedChainID", r.chainID)
continue
}

// Process each transaction in the blob
for i, tx := range data.Txs {
txHash := hashTx(tx)
has, err := r.seenStore.Has(r.ctx, ds.NewKey(txHash))
if err != nil {
return fmt.Errorf("check seenStore: %w", err)
}
if !has {
newTxs = append(newTxs, sequencer.DirectTX{
TX: tx,
ID: result.IDs[i],
Contributor (critical):

There's a critical bug in how the blob ID is being assigned to direct transactions. The code uses the transaction's index within a blob (i) to look up an ID from result.IDs, which is a list of blob IDs. This is incorrect because all transactions within a single blob should share the same blob ID. Furthermore, if a blob contains more transactions than there are total blob IDs for that height, this will cause a panic due to an index out-of-range error.

The logic should be to iterate through blobs with their index to get the corresponding blob ID, and then assign that same blob ID to all transactions found within that blob.
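That corrected pairing can be sketched with simplified stand-in types (the real code iterates `blobs` and `result.IDs` from the DA client):

```go
package main

import "fmt"

type directTX struct {
	TX []byte
	ID []byte
}

// pairTxsWithBlobIDs assigns each tx the ID of the blob it came from:
// the outer index selects the blob ID, and every tx in that blob shares it.
func pairTxsWithBlobIDs(blobs [][][]byte, ids [][]byte) []directTX {
	var out []directTX
	for blobIdx, txs := range blobs {
		for _, tx := range txs {
			out = append(out, directTX{TX: tx, ID: ids[blobIdx]})
		}
	}
	return out
}

func main() {
	blobs := [][][]byte{{{1}, {2}}, {{3}}} // blob 0 has 2 txs, blob 1 has 1
	ids := [][]byte{{0xA}, {0xB}}
	for _, d := range pairTxsWithBlobIDs(blobs, ids) {
		fmt.Println(d.TX[0], d.ID[0])
	}
}
```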

FirstSeenHeight: daHeight,
FirstSeenTime: result.Timestamp.Unix(),
})
}
}
}

if len(newTxs) == 0 {
r.logger.Debug("No new direct txs to submit")
return nil
}

r.logger.Debug("Submitting direct txs to sequencer", "txCount", len(newTxs))
err = r.sequencer.SubmitDirectTxs(r.ctx, newTxs...)
if err != nil {
return fmt.Errorf("submit direct txs to sequencer: %w", err)
}
// Mark the transactions as seen
for _, v := range newTxs {
txHash := hashTx(v.TX)
if err := r.seenStore.Put(r.ctx, ds.NewKey(txHash), []byte{1}); err != nil {
return fmt.Errorf("persist seen tx: %w", err)
}
}
r.logger.Debug("Successfully submitted direct txs")
return nil
}