Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/relay/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func main() {
inactivityTimeout, // Inactivity timeout
checkInterval, // Check interval
nil, // No custom logger
nil, // No custom config
)

if err != nil {
Expand Down
1 change: 1 addition & 0 deletions cmd/rpcdaemon/cli/httpcfg/http_cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ type HttpCfg struct {
// zkevm
DataStreamPort int
DataStreamHost string
DataStreamStorageType string
DataStreamWriteTimeout time.Duration
DataStreamInactivityTimeout time.Duration
DataStreamInactivityCheckInterval time.Duration
Expand Down
5 changes: 5 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,11 @@ var (
Usage: "Define the host used for the zkevm data stream",
Value: "",
}
DataStreamStorageType = cli.StringFlag{
Name: "zkevm.data-stream-store",
Usage: "Define the storage type used for the zkevm data stream (file, mdbx)",
Value: "file",
}
DataStreamWriteTimeout = cli.DurationFlag{
Name: "zkevm.data-stream-writeTimeout",
Usage: "Define the TCP write timeout when sending data to a datastream client",
Expand Down
4 changes: 2 additions & 2 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ type Ethereum struct {
logger log.Logger

// zk
streamServer server.StreamServer
streamServer server.TcpStreamServer
l1Syncer *syncer.L1Syncer
etherManClients []*etherman.Client
l1Cache *l1_cache.L1Cache
Expand Down Expand Up @@ -1000,7 +1000,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
}

// todo [zkevm] read the stream version from config and figure out what system id is used for
backend.streamServer, err = dataStreamServerFactory.CreateStreamServer(uint16(httpCfg.DataStreamPort), 1, datastreamer.StreamType(1), file, httpCfg.DataStreamWriteTimeout, httpCfg.DataStreamInactivityTimeout, httpCfg.DataStreamInactivityCheckInterval, logConfig)
backend.streamServer, err = dataStreamServerFactory.CreateStreamServer(uint16(httpCfg.DataStreamPort), 1, file, httpCfg.DataStreamWriteTimeout, httpCfg.DataStreamInactivityTimeout, httpCfg.DataStreamInactivityCheckInterval, logConfig, server.StreamStoreType(httpCfg.DataStreamStorageType))
if err != nil {
return nil, err
}
Expand Down
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ module github.com/erigontech/erigon

go 1.23.0

toolchain go1.23.7

require (
github.com/erigontech/mdbx-go v0.27.24
github.com/erigontech/secp256k1 v1.1.0
github.com/erigontech/silkworm-go v0.18.0
github.com/gateway-fm/zkevm-data-streamer v0.2.10-0.20250508001840-99bc56b54bb6
)

replace github.com/erigontech/erigon-lib => ./erigon-lib
Expand Down Expand Up @@ -43,7 +46,6 @@ require (
github.com/erigontech/erigon-lib v1.0.0
github.com/erigontech/erigonwatch v0.1.16
github.com/fjl/gencodec v0.0.0-20220412091415-8bb9e558978c
github.com/gateway-fm/zkevm-data-streamer v0.2.9
github.com/gballet/go-verkle v0.0.0-20221121182333-31427a1f2d35
github.com/gfx-labs/sse v0.0.0-20231226060816-f747e26a9baa
github.com/go-chi/chi/v5 v5.0.12
Expand Down Expand Up @@ -190,7 +192,7 @@ require (
github.com/google/gopacket v1.1.19 // indirect
github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad // indirect
github.com/hermeznetwork/tracerr v0.3.2 // indirect
github.com/ianlancetaylor/cgosymbolizer v0.0.0-20220405231054-a1ae3e4bba26 // indirect
github.com/ianlancetaylor/cgosymbolizer v0.0.0-20241129212102-9c50ad6b591e // indirect
github.com/imdario/mergo v0.3.11 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/ipfs/go-cid v0.4.1 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -315,8 +315,8 @@ github.com/garslo/gogen v0.0.0-20170307003452-d6ebae628c7c h1:uYNKzPntb8c6DKvP9E
github.com/garslo/gogen v0.0.0-20170307003452-d6ebae628c7c/go.mod h1:Q0X6pkwTILDlzrGEckF6HKjXe48EgsY/l7K7vhY4MW8=
github.com/gateway-fm/vectorized-poseidon-gold v1.0.0 h1:Du0ZW+fkZhgRNGx/gAkHnMj3/Rl8uJkAEe+ZDPX3PDw=
github.com/gateway-fm/vectorized-poseidon-gold v1.0.0/go.mod h1:VLGQpyjrOg8+FugH/+d8tfYd/c3z4Xqa+zbUBITygaw=
github.com/gateway-fm/zkevm-data-streamer v0.2.9 h1:ezR6IJakWvtNXFEhWXtwuot0JikKCJIB9rk2wnUsH3A=
github.com/gateway-fm/zkevm-data-streamer v0.2.9/go.mod h1:kKQSu8Ob9yNR+3ocx9dHlWmQ4Gnct3jRtQqKR69CmyM=
github.com/gateway-fm/zkevm-data-streamer v0.2.10-0.20250508001840-99bc56b54bb6 h1:2YhW21cICgqIH3D1JXVv88AtPiuvY+MtM2Ffr9Utqlk=
github.com/gateway-fm/zkevm-data-streamer v0.2.10-0.20250508001840-99bc56b54bb6/go.mod h1:WN8ODhsoWzTz9PkXK/PUJ4st37+yBLhSAyP09N35LYY=
github.com/gballet/go-verkle v0.0.0-20221121182333-31427a1f2d35 h1:I8QswD9gf3VEpr7bpepKKOm7ChxFITIG+oc1I5/S0no=
github.com/gballet/go-verkle v0.0.0-20221121182333-31427a1f2d35/go.mod h1:DMDd04jjQgdynaAwbEgiRERIGpC8fDjx0+y06an7Psg=
github.com/gfx-labs/sse v0.0.0-20231226060816-f747e26a9baa h1:b6fBm4SLM8jywQHNmc3ZCl6zQEhEyZl6bp7is4en72M=
Expand Down Expand Up @@ -503,8 +503,8 @@ github.com/huandu/xstrings v1.4.0 h1:D17IlohoQq4UcpqD7fDk80P7l+lwAmlFaBHgOipl2FU
github.com/huandu/xstrings v1.4.0/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
github.com/huin/goupnp v1.3.0 h1:UvLUlWDNpoUdYzb2TCn+MuTWtcjXKSza2n6CBdQ0xXc=
github.com/huin/goupnp v1.3.0/go.mod h1:gnGPsThkYa7bFi/KWmEysQRf48l2dvR5bxr2OFckNX8=
github.com/ianlancetaylor/cgosymbolizer v0.0.0-20220405231054-a1ae3e4bba26 h1:UT3hQ6+5hwqUT83cKhKlY5I0W/kqsl6lpn3iFb3Gtqs=
github.com/ianlancetaylor/cgosymbolizer v0.0.0-20220405231054-a1ae3e4bba26/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/ianlancetaylor/cgosymbolizer v0.0.0-20241129212102-9c50ad6b591e h1:8AnObPi8WmIgjwcidUxaREhXMSpyUJeeSrIkZTXdabw=
github.com/ianlancetaylor/cgosymbolizer v0.0.0-20241129212102-9c50ad6b591e/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/iden3/go-iden3-crypto v0.0.17 h1:NdkceRLJo/pI4UpcjVah4lN/a3yzxRUGXqxbWcYh9mY=
Expand Down
1 change: 1 addition & 0 deletions turbo/cli/default_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ var DefaultFlags = []cli.Flag{
&utils.GasPriceFactor,
&utils.DataStreamHost,
&utils.DataStreamPort,
&utils.DataStreamStorageType,
&utils.DataStreamWriteTimeout,
&utils.DataStreamInactivityTimeout,
&utils.DataStreamInactivityCheckInterval,
Expand Down
1 change: 1 addition & 0 deletions turbo/cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,7 @@ func setEmbeddedRpcDaemon(ctx *cli.Context, cfg *nodecfg.Config, logger log.Logg

DataStreamPort: ctx.Int(utils.DataStreamPort.Name),
DataStreamHost: ctx.String(utils.DataStreamHost.Name),
DataStreamStorageType: ctx.String(utils.DataStreamStorageType.Name),
DataStreamWriteTimeout: ctx.Duration(utils.DataStreamWriteTimeout.Name),
DataStreamInactivityTimeout: ctx.Duration(utils.DataStreamInactivityTimeout.Name),
DataStreamInactivityCheckInterval: ctx.Duration(utils.DataStreamInactivityCheckInterval.Name),
Expand Down
8 changes: 4 additions & 4 deletions zk/datastream/mocks/data_stream_server_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

58 changes: 48 additions & 10 deletions zk/datastream/server/data_stream_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package server

import (
"fmt"
"github.com/gateway-fm/zkevm-data-streamer/datastreamer"
dslog "github.com/gateway-fm/zkevm-data-streamer/log"
"github.com/c2h5oh/datasize"
"sync"
"time"

"github.com/gateway-fm/zkevm-data-streamer/datastreamer"
dslog "github.com/gateway-fm/zkevm-data-streamer/log"

libcommon "github.com/erigontech/erigon-lib/common"
"github.com/erigontech/erigon-lib/kv"
"github.com/erigontech/erigon-lib/log/v3"
Expand Down Expand Up @@ -45,7 +47,7 @@ const (
)

type ZkEVMDataStreamServer struct {
streamServer StreamServer
streamServer TcpStreamServer
chainId uint64
highestBlockWritten,
highestClosedBatchWritten,
Expand All @@ -65,27 +67,63 @@ type DataStreamEntryProto interface {
type ZkEVMDataStreamServerFactory struct {
}

type StorageType uint8

const (
StorageTypeFile StorageType = iota
StorageTypeMDBX
)

type ServerType uint8

const (
ServerTypeTCP ServerType = iota
ServerTypeGRPC
)

func NewZkEVMDataStreamServerFactory() *ZkEVMDataStreamServerFactory {
return &ZkEVMDataStreamServerFactory{}
}

func (f *ZkEVMDataStreamServerFactory) CreateStreamServer(port uint16, systemID uint64, streamType datastreamer.StreamType, fileName string, writeTimeout time.Duration, inactivityTimeout time.Duration, inactivityCheckInterval time.Duration, cfg *dslog.Config) (StreamServer, error) {
func (f *ZkEVMDataStreamServerFactory) CreateStreamServer(port uint16, systemID uint64, fileName string, writeTimeout time.Duration, inactivityTimeout time.Duration, inactivityCheckInterval time.Duration, cfg *dslog.Config, storageType StreamStoreType) (TcpStreamServer, error) {
// after we moved to protobuff encoding we no longer need to support multiple versions.
const datastreamVersion = 3

var storageFactory func() (datastreamer.StreamStore, error)

if storageType == StreamStoreTypeMDBX {
// Create MDBX-based storage
config := &StreamStoreConfig{
SystemID: systemID,
FilePath: fileName,
MDBXMaxDBS: 3,
MDBXMapSize: int64(3 * datasize.GB),
}

mdbxStore, err := NewMDBXStreamStore(config)
if err != nil {
return nil, fmt.Errorf("failed to create MDBX store: %w", err)
}

storageFactory = func() (datastreamer.StreamStore, error) {
return mdbxStore, nil
}
}

// the library still requires version as a input in it's arguments.
return datastreamer.NewServer(port, datastreamVersion, systemID, streamType, fileName, writeTimeout, inactivityTimeout, inactivityCheckInterval, cfg)
return datastreamer.NewServer(port, datastreamVersion, systemID, 1, fileName, writeTimeout, inactivityTimeout, inactivityCheckInterval, cfg, storageFactory)
}

func (f *ZkEVMDataStreamServerFactory) CreateDataStreamServer(streamServer StreamServer, chainId uint64) DataStreamServer {
func (f *ZkEVMDataStreamServerFactory) CreateDataStreamServer(streamStore TcpStreamServer, chainId uint64) DataStreamServer {
return &ZkEVMDataStreamServer{
streamServer: streamServer,
streamServer: streamStore,
chainId: chainId,
highestBlockWritten: nil,
highestBatchWritten: nil,
}
}

func (srv *ZkEVMDataStreamServer) GetStreamServer() StreamServer {
func (srv *ZkEVMDataStreamServer) GetStreamStore() StreamStore {
return srv.streamServer
}

Expand Down Expand Up @@ -632,12 +670,12 @@ func (srv *ZkEVMDataStreamServer) getLastEntryOfType(entryType datastreamer.Entr
}

type dataStreamServerIterator struct {
stream StreamServer
stream StreamStore
curEntryNum uint64
header uint64
}

func newDataStreamServerIterator(stream StreamServer, start uint64) *dataStreamServerIterator {
func newDataStreamServerIterator(stream StreamStore, start uint64) *dataStreamServerIterator {
return &dataStreamServerIterator{
stream: stream,
curEntryNum: start,
Expand Down
30 changes: 18 additions & 12 deletions zk/datastream/server/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,29 @@ import (

type StreamServer interface {
Start() error
StartAtomicOp() error
AddStreamEntry(etype datastreamer.EntryType, data []byte) (uint64, error)
AddStreamBookmark(bookmark []byte) (uint64, error)
CommitAtomicOp() error
RollbackAtomicOp() error
TruncateFile(entryNum uint64) error
UpdateEntryData(entryNum uint64, etype datastreamer.EntryType, data []byte) error
GetHeader() datastreamer.HeaderEntry
}

type StreamStore interface {
datastreamer.StreamStore

GetEntry(entryNum uint64) (datastreamer.FileEntry, error)
GetBookmark(bookmark []byte) (uint64, error)
GetFirstEventAfterBookmark(bookmark []byte) (datastreamer.FileEntry, error)
GetDataBetweenBookmarks(bookmarkFrom, bookmarkTo []byte) ([]byte, error)
GetBookmark(data []byte) (uint64, error)

UpdateEntryData(entryNum uint64, etype datastreamer.EntryType, data []byte) error

BookmarkPrintDump()
}

type TcpStreamServer interface {
StreamServer
StreamStore
}

type GrpcDataStreamServer interface {
Start() error
}
type DataStreamServer interface {
GetStreamServer() StreamServer
GetStreamStore() StreamStore
GetChainId() uint64
IsLastEntryBatchEnd() (isBatchEnd bool, err error)
GetHighestBlockNumber() (uint64, error)
Expand Down
Loading