diff --git a/cmd/relay/main.go b/cmd/relay/main.go index 15844e4a039..c62f14640d9 100644 --- a/cmd/relay/main.go +++ b/cmd/relay/main.go @@ -56,6 +56,7 @@ func main() { inactivityTimeout, // Inactivity timeout checkInterval, // Check interval nil, // No custom logger + nil, // No custom config ) if err != nil { diff --git a/cmd/rpcdaemon/cli/httpcfg/http_cfg.go b/cmd/rpcdaemon/cli/httpcfg/http_cfg.go index 93385156514..ad84de2b3ea 100644 --- a/cmd/rpcdaemon/cli/httpcfg/http_cfg.go +++ b/cmd/rpcdaemon/cli/httpcfg/http_cfg.go @@ -100,6 +100,7 @@ type HttpCfg struct { // zkevm DataStreamPort int DataStreamHost string + DataStreamStorageType string DataStreamWriteTimeout time.Duration DataStreamInactivityTimeout time.Duration DataStreamInactivityCheckInterval time.Duration diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 1969d5751b4..34167d73c05 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -683,6 +683,11 @@ var ( Usage: "Define the host used for the zkevm data stream", Value: "", } + DataStreamStorageType = cli.StringFlag{ + Name: "zkevm.data-stream-store", + Usage: "Define the storage type used for the zkevm data stream (file, mdbx)", + Value: "file", + } DataStreamWriteTimeout = cli.DurationFlag{ Name: "zkevm.data-stream-writeTimeout", Usage: "Define the TCP write timeout when sending data to a datastream client", diff --git a/eth/backend.go b/eth/backend.go index f8e124ae5e9..2e503c7e2ac 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -224,7 +224,7 @@ type Ethereum struct { logger log.Logger // zk - streamServer server.StreamServer + streamServer server.TcpStreamServer l1Syncer *syncer.L1Syncer etherManClients []*etherman.Client l1Cache *l1_cache.L1Cache @@ -1000,7 +1000,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger } // todo [zkevm] read the stream version from config and figure out what system id is used for - backend.streamServer, err = 
dataStreamServerFactory.CreateStreamServer(uint16(httpCfg.DataStreamPort), 1, datastreamer.StreamType(1), file, httpCfg.DataStreamWriteTimeout, httpCfg.DataStreamInactivityTimeout, httpCfg.DataStreamInactivityCheckInterval, logConfig) + backend.streamServer, err = dataStreamServerFactory.CreateStreamServer(uint16(httpCfg.DataStreamPort), 1, file, httpCfg.DataStreamWriteTimeout, httpCfg.DataStreamInactivityTimeout, httpCfg.DataStreamInactivityCheckInterval, logConfig, server.StreamStoreType(httpCfg.DataStreamStorageType)) if err != nil { return nil, err } diff --git a/go.mod b/go.mod index 1eca4da763a..1f2a35f8544 100644 --- a/go.mod +++ b/go.mod @@ -2,10 +2,13 @@ module github.com/erigontech/erigon go 1.23.0 +toolchain go1.23.7 + require ( github.com/erigontech/mdbx-go v0.27.24 github.com/erigontech/secp256k1 v1.1.0 github.com/erigontech/silkworm-go v0.18.0 + github.com/gateway-fm/zkevm-data-streamer v0.2.10-0.20250508001840-99bc56b54bb6 ) replace github.com/erigontech/erigon-lib => ./erigon-lib @@ -43,7 +46,6 @@ require ( github.com/erigontech/erigon-lib v1.0.0 github.com/erigontech/erigonwatch v0.1.16 github.com/fjl/gencodec v0.0.0-20220412091415-8bb9e558978c - github.com/gateway-fm/zkevm-data-streamer v0.2.9 github.com/gballet/go-verkle v0.0.0-20221121182333-31427a1f2d35 github.com/gfx-labs/sse v0.0.0-20231226060816-f747e26a9baa github.com/go-chi/chi/v5 v5.0.12 @@ -190,7 +192,7 @@ require ( github.com/google/gopacket v1.1.19 // indirect github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad // indirect github.com/hermeznetwork/tracerr v0.3.2 // indirect - github.com/ianlancetaylor/cgosymbolizer v0.0.0-20220405231054-a1ae3e4bba26 // indirect + github.com/ianlancetaylor/cgosymbolizer v0.0.0-20241129212102-9c50ad6b591e // indirect github.com/imdario/mergo v0.3.11 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/ipfs/go-cid v0.4.1 // indirect diff --git a/go.sum b/go.sum index 64403a67efd..ecdc46a8369 100644 --- a/go.sum +++ 
b/go.sum @@ -315,8 +315,8 @@ github.com/garslo/gogen v0.0.0-20170307003452-d6ebae628c7c h1:uYNKzPntb8c6DKvP9E github.com/garslo/gogen v0.0.0-20170307003452-d6ebae628c7c/go.mod h1:Q0X6pkwTILDlzrGEckF6HKjXe48EgsY/l7K7vhY4MW8= github.com/gateway-fm/vectorized-poseidon-gold v1.0.0 h1:Du0ZW+fkZhgRNGx/gAkHnMj3/Rl8uJkAEe+ZDPX3PDw= github.com/gateway-fm/vectorized-poseidon-gold v1.0.0/go.mod h1:VLGQpyjrOg8+FugH/+d8tfYd/c3z4Xqa+zbUBITygaw= -github.com/gateway-fm/zkevm-data-streamer v0.2.9 h1:ezR6IJakWvtNXFEhWXtwuot0JikKCJIB9rk2wnUsH3A= -github.com/gateway-fm/zkevm-data-streamer v0.2.9/go.mod h1:kKQSu8Ob9yNR+3ocx9dHlWmQ4Gnct3jRtQqKR69CmyM= +github.com/gateway-fm/zkevm-data-streamer v0.2.10-0.20250508001840-99bc56b54bb6 h1:2YhW21cICgqIH3D1JXVv88AtPiuvY+MtM2Ffr9Utqlk= +github.com/gateway-fm/zkevm-data-streamer v0.2.10-0.20250508001840-99bc56b54bb6/go.mod h1:WN8ODhsoWzTz9PkXK/PUJ4st37+yBLhSAyP09N35LYY= github.com/gballet/go-verkle v0.0.0-20221121182333-31427a1f2d35 h1:I8QswD9gf3VEpr7bpepKKOm7ChxFITIG+oc1I5/S0no= github.com/gballet/go-verkle v0.0.0-20221121182333-31427a1f2d35/go.mod h1:DMDd04jjQgdynaAwbEgiRERIGpC8fDjx0+y06an7Psg= github.com/gfx-labs/sse v0.0.0-20231226060816-f747e26a9baa h1:b6fBm4SLM8jywQHNmc3ZCl6zQEhEyZl6bp7is4en72M= @@ -503,8 +503,8 @@ github.com/huandu/xstrings v1.4.0 h1:D17IlohoQq4UcpqD7fDk80P7l+lwAmlFaBHgOipl2FU github.com/huandu/xstrings v1.4.0/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= github.com/huin/goupnp v1.3.0 h1:UvLUlWDNpoUdYzb2TCn+MuTWtcjXKSza2n6CBdQ0xXc= github.com/huin/goupnp v1.3.0/go.mod h1:gnGPsThkYa7bFi/KWmEysQRf48l2dvR5bxr2OFckNX8= -github.com/ianlancetaylor/cgosymbolizer v0.0.0-20220405231054-a1ae3e4bba26 h1:UT3hQ6+5hwqUT83cKhKlY5I0W/kqsl6lpn3iFb3Gtqs= -github.com/ianlancetaylor/cgosymbolizer v0.0.0-20220405231054-a1ae3e4bba26/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg= +github.com/ianlancetaylor/cgosymbolizer v0.0.0-20241129212102-9c50ad6b591e h1:8AnObPi8WmIgjwcidUxaREhXMSpyUJeeSrIkZTXdabw= 
+github.com/ianlancetaylor/cgosymbolizer v0.0.0-20241129212102-9c50ad6b591e/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/iden3/go-iden3-crypto v0.0.17 h1:NdkceRLJo/pI4UpcjVah4lN/a3yzxRUGXqxbWcYh9mY= diff --git a/turbo/cli/default_flags.go b/turbo/cli/default_flags.go index cc1fb875f71..00cb123bb20 100644 --- a/turbo/cli/default_flags.go +++ b/turbo/cli/default_flags.go @@ -246,6 +246,7 @@ var DefaultFlags = []cli.Flag{ &utils.GasPriceFactor, &utils.DataStreamHost, &utils.DataStreamPort, + &utils.DataStreamStorageType, &utils.DataStreamWriteTimeout, &utils.DataStreamInactivityTimeout, &utils.DataStreamInactivityCheckInterval, diff --git a/turbo/cli/flags.go b/turbo/cli/flags.go index 4ef1e00faf1..9c7cbc8123b 100644 --- a/turbo/cli/flags.go +++ b/turbo/cli/flags.go @@ -553,6 +553,7 @@ func setEmbeddedRpcDaemon(ctx *cli.Context, cfg *nodecfg.Config, logger log.Logg DataStreamPort: ctx.Int(utils.DataStreamPort.Name), DataStreamHost: ctx.String(utils.DataStreamHost.Name), + DataStreamStorageType: ctx.String(utils.DataStreamStorageType.Name), DataStreamWriteTimeout: ctx.Duration(utils.DataStreamWriteTimeout.Name), DataStreamInactivityTimeout: ctx.Duration(utils.DataStreamInactivityTimeout.Name), DataStreamInactivityCheckInterval: ctx.Duration(utils.DataStreamInactivityCheckInterval.Name), diff --git a/zk/datastream/mocks/data_stream_server_mock.go b/zk/datastream/mocks/data_stream_server_mock.go index 85674134a43..e88922f9841 100644 --- a/zk/datastream/mocks/data_stream_server_mock.go +++ b/zk/datastream/mocks/data_stream_server_mock.go @@ -241,17 +241,17 @@ func (c *MockDataStreamServerGetHighestClosedBatchNoCacheCall) DoAndReturn(f fun } // GetStreamServer mocks base method. 
-func (m *MockDataStreamServer) GetStreamServer() server.StreamServer { +func (m *MockDataStreamServer) GetStreamStore() server.StreamStore { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetStreamServer") - ret0, _ := ret[0].(server.StreamServer) + ret := m.ctrl.Call(m, "GetStreamStore") + ret0, _ := ret[0].(server.StreamStore) return ret0 } // GetStreamServer indicates an expected call of GetStreamServer. func (mr *MockDataStreamServerMockRecorder) GetStreamServer() *MockDataStreamServerGetStreamServerCall { mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetStreamServer", reflect.TypeOf((*MockDataStreamServer)(nil).GetStreamServer)) + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetStreamStore", reflect.TypeOf((*MockDataStreamServer)(nil).GetStreamStore)) return &MockDataStreamServerGetStreamServerCall{Call: call} } diff --git a/zk/datastream/server/data_stream_server.go b/zk/datastream/server/data_stream_server.go index 84c26d45a3b..09981a1d95b 100644 --- a/zk/datastream/server/data_stream_server.go +++ b/zk/datastream/server/data_stream_server.go @@ -2,11 +2,13 @@ package server import ( "fmt" - "github.com/gateway-fm/zkevm-data-streamer/datastreamer" - dslog "github.com/gateway-fm/zkevm-data-streamer/log" + "github.com/c2h5oh/datasize" "sync" "time" + "github.com/gateway-fm/zkevm-data-streamer/datastreamer" + dslog "github.com/gateway-fm/zkevm-data-streamer/log" + libcommon "github.com/erigontech/erigon-lib/common" "github.com/erigontech/erigon-lib/kv" "github.com/erigontech/erigon-lib/log/v3" @@ -45,7 +47,7 @@ const ( ) type ZkEVMDataStreamServer struct { - streamServer StreamServer + streamServer TcpStreamServer chainId uint64 highestBlockWritten, highestClosedBatchWritten, @@ -65,27 +67,63 @@ type DataStreamEntryProto interface { type ZkEVMDataStreamServerFactory struct { } +type StorageType uint8 + +const ( + StorageTypeFile StorageType = iota + StorageTypeMDBX +) + +type ServerType uint8 + +const ( + 
ServerTypeTCP ServerType = iota + ServerTypeGRPC +) + func NewZkEVMDataStreamServerFactory() *ZkEVMDataStreamServerFactory { return &ZkEVMDataStreamServerFactory{} } -func (f *ZkEVMDataStreamServerFactory) CreateStreamServer(port uint16, systemID uint64, streamType datastreamer.StreamType, fileName string, writeTimeout time.Duration, inactivityTimeout time.Duration, inactivityCheckInterval time.Duration, cfg *dslog.Config) (StreamServer, error) { +func (f *ZkEVMDataStreamServerFactory) CreateStreamServer(port uint16, systemID uint64, fileName string, writeTimeout time.Duration, inactivityTimeout time.Duration, inactivityCheckInterval time.Duration, cfg *dslog.Config, storageType StreamStoreType) (TcpStreamServer, error) { // after we moved to protobuff encoding we no longer need to support multiple versions. const datastreamVersion = 3 + + var storageFactory func() (datastreamer.StreamStore, error) + + if storageType == StreamStoreTypeMDBX { + // Create MDBX-based storage + config := &StreamStoreConfig{ + SystemID: systemID, + FilePath: fileName, + MDBXMaxDBS: 3, + MDBXMapSize: int64(3 * datasize.GB), + } + + mdbxStore, err := NewMDBXStreamStore(config) + if err != nil { + return nil, fmt.Errorf("failed to create MDBX store: %w", err) + } + + storageFactory = func() (datastreamer.StreamStore, error) { + return mdbxStore, nil + } + } + // the library still requires version as a input in it's arguments. 
- return datastreamer.NewServer(port, datastreamVersion, systemID, streamType, fileName, writeTimeout, inactivityTimeout, inactivityCheckInterval, cfg) + return datastreamer.NewServer(port, datastreamVersion, systemID, 1, fileName, writeTimeout, inactivityTimeout, inactivityCheckInterval, cfg, storageFactory) } -func (f *ZkEVMDataStreamServerFactory) CreateDataStreamServer(streamServer StreamServer, chainId uint64) DataStreamServer { +func (f *ZkEVMDataStreamServerFactory) CreateDataStreamServer(streamStore TcpStreamServer, chainId uint64) DataStreamServer { return &ZkEVMDataStreamServer{ - streamServer: streamServer, + streamServer: streamStore, chainId: chainId, highestBlockWritten: nil, highestBatchWritten: nil, } } -func (srv *ZkEVMDataStreamServer) GetStreamServer() StreamServer { +func (srv *ZkEVMDataStreamServer) GetStreamStore() StreamStore { return srv.streamServer } @@ -632,12 +670,12 @@ func (srv *ZkEVMDataStreamServer) getLastEntryOfType(entryType datastreamer.Entr } type dataStreamServerIterator struct { - stream StreamServer + stream StreamStore curEntryNum uint64 header uint64 } -func newDataStreamServerIterator(stream StreamServer, start uint64) *dataStreamServerIterator { +func newDataStreamServerIterator(stream StreamStore, start uint64) *dataStreamServerIterator { return &dataStreamServerIterator{ stream: stream, curEntryNum: start, diff --git a/zk/datastream/server/interfaces.go b/zk/datastream/server/interfaces.go index e5ea310c67b..7f1edc5d562 100644 --- a/zk/datastream/server/interfaces.go +++ b/zk/datastream/server/interfaces.go @@ -18,23 +18,29 @@ import ( type StreamServer interface { Start() error - StartAtomicOp() error - AddStreamEntry(etype datastreamer.EntryType, data []byte) (uint64, error) - AddStreamBookmark(bookmark []byte) (uint64, error) - CommitAtomicOp() error - RollbackAtomicOp() error - TruncateFile(entryNum uint64) error - UpdateEntryData(entryNum uint64, etype datastreamer.EntryType, data []byte) error - GetHeader() 
datastreamer.HeaderEntry +} + +type StreamStore interface { + datastreamer.StreamStore + GetEntry(entryNum uint64) (datastreamer.FileEntry, error) - GetBookmark(bookmark []byte) (uint64, error) - GetFirstEventAfterBookmark(bookmark []byte) (datastreamer.FileEntry, error) - GetDataBetweenBookmarks(bookmarkFrom, bookmarkTo []byte) ([]byte, error) + GetBookmark(data []byte) (uint64, error) + + UpdateEntryData(entryNum uint64, etype datastreamer.EntryType, data []byte) error + BookmarkPrintDump() } +type TcpStreamServer interface { + StreamServer + StreamStore +} + +type GrpcDataStreamServer interface { + Start() error +} type DataStreamServer interface { - GetStreamServer() StreamServer + GetStreamStore() StreamStore GetChainId() uint64 IsLastEntryBatchEnd() (isBatchEnd bool, err error) GetHighestBlockNumber() (uint64, error) diff --git a/zk/datastream/server/mdbx_stream_store.go b/zk/datastream/server/mdbx_stream_store.go new file mode 100644 index 00000000000..546078088c8 --- /dev/null +++ b/zk/datastream/server/mdbx_stream_store.go @@ -0,0 +1,786 @@ +package server + +import ( + "context" + "encoding/binary" + "errors" + "fmt" + "sync" + + "github.com/erigontech/erigon-lib/kv" + "github.com/erigontech/erigon-lib/kv/mdbx" + "github.com/erigontech/erigon-lib/log/v3" + "github.com/erigontech/erigon/zk/datastream/types" + "github.com/gateway-fm/zkevm-data-streamer/datastreamer" +) + +// MDBXStreamStore implements StreamStore using kv.RwDB interface +type MDBXStreamStore struct { + db kv.RwDB + header datastreamer.HeaderEntry + mutex sync.RWMutex + inTransaction bool + currentTx kv.RwTx + streamChannel chan datastreamer.StreamAO + atomicOp datastreamer.StreamAO + ctx context.Context + logger log.Logger +} + +// These constants define the MDBX tables we'll use +const ( + // Main tables + TableEntries = "entries" // Store all entries sequentially + TableBookmarks = "bookmarks" // Store bookmarks for fast seeking + TableMetadata = "metadata" // Store header information +) 
+ +const ( + // Fixed StreamType value + StreamTypeValue = 1 // Always use StreamType 1 (sequencer) in this implementation +) + +const ( + // Atomic operation Status + aoNone datastreamer.AOStatus = iota + 1 + aoStarted + aoCommitting + aoRollbacking +) + +// NewMDBXStreamStore creates a new MDBX-based stream store +func NewMDBXStreamStore(config *StreamStoreConfig) (StreamStore, error) { + ctx := context.Background() + + // Use the logger from the config + logger := config.Logger + if logger == nil { + logger = log.New() // Use default logger if none provided + } + + // Configure database + opts := mdbx.NewMDBX(logger). + Path(config.FilePath + ".mdbx") + + // Open database + db, err := opts.Open(ctx) + if err != nil { + return nil, fmt.Errorf("failed to open MDBX database: %w", err) + } + + // Initialize store + store := &MDBXStreamStore{ + db: db, + header: datastreamer.NewHeader(3, config.SystemID, StreamTypeValue), + ctx: ctx, + logger: logger, + } + + // Create tables if they don't exist + err = db.Update(ctx, func(tx kv.RwTx) error { + // Create necessary buckets + for _, table := range []string{TableEntries, TableBookmarks, TableMetadata} { + if err := tx.CreateBucket(table); err != nil { + return fmt.Errorf("failed to create bucket %s: %w", table, err) + } + } + + // Try to load existing header + headerVal, err := tx.GetOne(TableMetadata, []byte("header")) + if err == nil && len(headerVal) > 0 { + existingHeader, err := decodeHeader(headerVal) + if err == nil { + store.header = *existingHeader + } + // If there's an error decoding, we'll keep the default header + } + + return nil + }) + + if err != nil { + db.Close() + return nil, fmt.Errorf("failed to initialize database: %w", err) + } + + return store, nil +} + +// SetStreamChannel sets the channel for atomic operation notifications +func (ms *MDBXStreamStore) SetStreamChannel(ch chan datastreamer.StreamAO) { + ms.mutex.Lock() + defer ms.mutex.Unlock() + ms.streamChannel = ch +} + +func (ms 
*MDBXStreamStore) GetNextEntry() uint64 { + return ms.header.TotalEntries +} + +func (ms *MDBXStreamStore) PrintDumpBookmarks() error { + return ms.db.View(ms.ctx, func(tx kv.Tx) error { + cursor, err := tx.Cursor(TableBookmarks) + if err != nil { + return err + } + defer cursor.Close() + + // Walk through all bookmarks + for k, v, err := cursor.First(); k != nil; k, v, err = cursor.Next() { + if err != nil { + return err + } + entryNum := binary.BigEndian.Uint64(v) + fmt.Printf("Bookmark: %X -> Entry %d\n", k, entryNum) + } + + return nil + }) +} + +// addToStream handles common entry storage logic +func (ms *MDBXStreamStore) addToStream(entryType datastreamer.EntryType, data []byte) (uint64, error) { + // Create entry + entryNum := ms.header.TotalEntries + entry := datastreamer.NewFileEntry(datastreamer.PtData, entryType, entryNum, data) + + // Encode and store entry + keyBytes := make([]byte, 8) + binary.BigEndian.PutUint64(keyBytes, entryNum) + + entryBytes, err := encodeFileEntry(entry) + if err != nil { + return 0, err + } + + if err := ms.currentTx.Put(TableEntries, keyBytes, entryBytes); err != nil { + return 0, err + } + + // Save the entry in the server's atomic operation tracking + ms.atomicOp.Entries = append(ms.atomicOp.Entries, entry) + + // Update header (will be saved on commit) + ms.header.TotalEntries++ + ms.header.TotalLength += uint64(entry.Length) + + return entryNum, nil +} + +// AddStreamEntry adds a new entry to the stream +func (ms *MDBXStreamStore) AddStreamEntry(entryType datastreamer.EntryType, data []byte) (uint64, error) { + ms.mutex.Lock() + defer ms.mutex.Unlock() + + if !ms.inTransaction { + return 0, errors.New("must be in transaction to add entries") + } + + return ms.addToStream(entryType, data) +} + +// AddStreamBookmark adds a new bookmark to the stream +func (ms *MDBXStreamStore) AddStreamBookmark(data []byte) (uint64, error) { + ms.mutex.Lock() + defer ms.mutex.Unlock() + + if !ms.inTransaction { + return 0, errors.New("must 
be in transaction to add bookmarks") + } + + entryNum, err := ms.addToStream(datastreamer.EntryType(types.BookmarkEntryType), data) + if err != nil { + return 0, err + } + + // Also store in bookmark table for quick lookup + entryNumBytes := make([]byte, 8) + binary.BigEndian.PutUint64(entryNumBytes, entryNum) + if err := ms.currentTx.Put(TableBookmarks, data, entryNumBytes); err != nil { + return 0, err + } + + return entryNum, nil +} + +// GetEntry retrieves an entry from the stream +func (ms *MDBXStreamStore) GetEntry(entryNum uint64) (datastreamer.FileEntry, error) { + // Create key + keyBytes := make([]byte, 8) + binary.BigEndian.PutUint64(keyBytes, entryNum) + + // Define the read function + readFn := func() (interface{}, error) { + var entry datastreamer.FileEntry + err := ms.db.View(ms.ctx, func(tx kv.Tx) error { + // Get from db + entryBytes, err := tx.GetOne(TableEntries, keyBytes) + if err != nil { + return err + } + if entryBytes == nil { + return fmt.Errorf("entry not found: %d", entryNum) + } + + // Decode entry + decodedEntry, err := decodeFileEntry(entryBytes) + if err != nil { + return err + } + + entry = decodedEntry + return nil + }) + return entry, err + } + + // If we're in a transaction, run in separate goroutine + if ms.inTransaction { + result, err := ms.runReaderInSeparateGoroutine(readFn) + if err != nil { + return datastreamer.FileEntry{}, err + } + return result.(datastreamer.FileEntry), nil + } + + // Otherwise, run directly + result, err := readFn() + if err != nil { + return datastreamer.FileEntry{}, err + } + return result.(datastreamer.FileEntry), nil +} + +// GetBookmark retrieves a bookmark from the stream +func (ms *MDBXStreamStore) GetBookmark(key []byte) (uint64, error) { + // Define the read function + readFn := func() (interface{}, error) { + var entryNum uint64 + err := ms.db.View(ms.ctx, func(tx kv.Tx) error { + // Get from db + entryNumBytes, err := tx.GetOne(TableBookmarks, key) + if err != nil { + return err + } + if 
entryNumBytes == nil { + return fmt.Errorf("bookmark not found") + } + + entryNum = binary.BigEndian.Uint64(entryNumBytes) + return nil + }) + return entryNum, err + } + + // If we're in a transaction, run in separate goroutine + if ms.inTransaction { + result, err := ms.runReaderInSeparateGoroutine(readFn) + if err != nil { + return 0, err + } + return result.(uint64), nil + } + + // Otherwise, run directly + result, err := readFn() + if err != nil { + return 0, err + } + return result.(uint64), nil +} + +// StartAtomicOp starts a transaction +func (ms *MDBXStreamStore) StartAtomicOp() error { + ms.mutex.Lock() + defer ms.mutex.Unlock() + + if ms.inTransaction { + return fmt.Errorf("already in transaction") + } + + // Begin a new write transaction + tx, err := ms.db.BeginRw(ms.ctx) + if err != nil { + return err + } + + ms.currentTx = tx + ms.inTransaction = true + + // Reset atomic operation + ms.atomicOp = datastreamer.StreamAO{ + Status: aoStarted, + StartEntry: ms.GetNextEntry(), + Entries: []datastreamer.FileEntry{}, + } + + return nil +} + +// CommitAtomicOp commits the current transaction +func (ms *MDBXStreamStore) CommitAtomicOp() error { + ms.mutex.Lock() + defer ms.mutex.Unlock() + + if !ms.inTransaction { + return fmt.Errorf("not in transaction") + } + + // Set the status to committing + ms.atomicOp.Status = aoCommitting + + // Save the header + if err := ms.WriteHeaderEntry(); err != nil { + ms.currentTx.Rollback() + ms.inTransaction = false + ms.currentTx = nil + return err + } + + // Commit the transaction + if err := ms.currentTx.Commit(); err != nil { + ms.currentTx.Rollback() + ms.inTransaction = false + ms.currentTx = nil + return err + } + + if ms.streamChannel != nil { + // Do broadcast of the committed atomic operation to the stream clients + atomic := datastreamer.StreamAO{ + Status: ms.atomicOp.Status, + StartEntry: ms.atomicOp.StartEntry, + } + atomic.Entries = make([]datastreamer.FileEntry, len(ms.atomicOp.Entries)) + copy(atomic.Entries, 
ms.atomicOp.Entries) + + ms.streamChannel <- atomic + } + ms.atomicOp.Entries = ms.atomicOp.Entries[:0] + ms.atomicOp.Status = aoNone + ms.inTransaction = false + ms.currentTx = nil + return nil +} + +// RollbackAtomicOp aborts the current transaction +func (ms *MDBXStreamStore) RollbackAtomicOp() error { + ms.mutex.Lock() + defer ms.mutex.Unlock() + + if !ms.inTransaction { + return fmt.Errorf("not in transaction") + } + + ms.currentTx.Rollback() + ms.inTransaction = false + ms.currentTx = nil + return nil +} + +// GetHeader returns the current header +func (ms *MDBXStreamStore) GetHeader() datastreamer.HeaderEntry { + return ms.header +} + +// TruncateFile truncates the stream to the specified entry number +func (ms *MDBXStreamStore) TruncateFile(entryNum uint64) error { + ms.mutex.Lock() + defer ms.mutex.Unlock() + + if ms.inTransaction { + return fmt.Errorf("cannot truncate while in transaction") + } + + // We need to get the old header first to calculate total length reduction + oldHeader := ms.header + newTotalLength := uint64(0) + + // Start a transaction + err := ms.db.Update(ms.ctx, func(tx kv.RwTx) error { + // Read all entries up to entryNum to calculate new total length + for i := uint64(0); i < entryNum; i++ { + keyBytes := make([]byte, 8) + binary.BigEndian.PutUint64(keyBytes, i) + + entryBytes, err := tx.GetOne(TableEntries, keyBytes) + if err != nil { + continue // Skip missing entries + } + + entry, err := decodeFileEntry(entryBytes) + if err != nil { + continue // Skip invalid entries + } + + newTotalLength += uint64(entry.Length) + } + + // Delete entries from entryNum onwards + cursor, err := tx.RwCursor(TableEntries) + if err != nil { + return err + } + defer cursor.Close() + + for i := entryNum; i < oldHeader.TotalEntries; i++ { + keyBytes := make([]byte, 8) + binary.BigEndian.PutUint64(keyBytes, i) + if err := cursor.Delete(keyBytes); err != nil { + // Skip errors for missing entries + continue + } + } + + // Delete bookmarks that point to 
deleted entries + bookmarkCursor, err := tx.RwCursor(TableBookmarks) + if err != nil { + return err + } + defer bookmarkCursor.Close() + + for k, v, err := bookmarkCursor.First(); k != nil; k, v, err = bookmarkCursor.Next() { + if err != nil { + continue // Skip errors + } + + bookmarkEntryNum := binary.BigEndian.Uint64(v) + if bookmarkEntryNum >= entryNum { + if err := bookmarkCursor.Delete(k); err != nil { + // Skip errors + continue + } + } + } + + // Update header + ms.header.TotalEntries = entryNum + ms.header.TotalLength = newTotalLength + + // Save updated header + headerBytes, err := encodeHeader(&ms.header) + if err != nil { + return err + } + + return tx.Put(TableMetadata, []byte("header"), headerBytes) + }) + + return err +} + +// IteratorFrom returns an iterator starting from the specified entry +func (ms *MDBXStreamStore) IteratorFrom(entryNum uint64, includeBookmarks bool) (*MDBXStreamStoreIterator, error) { + return newMDBXStreamStoreIterator(ms, entryNum, includeBookmarks), nil +} + +// UpdateEntryData updates the data for an existing entry +func (ms *MDBXStreamStore) UpdateEntryData(entryNum uint64, entryType datastreamer.EntryType, data []byte) error { + if err := ms.StartAtomicOp(); err != nil { + return err + } + + // Get existing entry + entry, err := ms.GetEntry(entryNum) + if err != nil { + ms.RollbackAtomicOp() + return err + } + + // Update entry + entry.Type = entryType + entry.Data = data + entry.Length = uint32(len(data) + 17) // 17 is the header size + + // Store updated entry + keyBytes := make([]byte, 8) + binary.BigEndian.PutUint64(keyBytes, entryNum) + + entryBytes, err := encodeFileEntry(entry) + if err != nil { + ms.RollbackAtomicOp() + return err + } + + if err := ms.currentTx.Put(TableEntries, keyBytes, entryBytes); err != nil { + ms.RollbackAtomicOp() + return err + } + + return ms.CommitAtomicOp() +} + +// GetIterator returns a file iterator for the stream store +func (ms *MDBXStreamStore) GetIterator(entryNum uint64, readOnly 
bool) (datastreamer.StorageIterator, error) { + // Create a real iterator using our existing implementation + iterator, err := ms.IteratorFrom(entryNum, true) // Include bookmarks for compatibility + if err != nil { + return nil, err + } + + // Return the iterator as the required interface type + return iterator, nil +} + +// AddFileEntry adds a file entry directly to the stream +func (ms *MDBXStreamStore) AddFileEntry(e datastreamer.FileEntry) error { + ms.mutex.Lock() + defer ms.mutex.Unlock() + + if !ms.inTransaction { + return errors.New("must be in transaction to add entries") + } + + // Encode and store entry + keyBytes := make([]byte, 8) + binary.BigEndian.PutUint64(keyBytes, e.Number) + + entryBytes, err := encodeFileEntry(e) + if err != nil { + return err + } + + if err := ms.currentTx.Put(TableEntries, keyBytes, entryBytes); err != nil { + return err + } + + // Only update the header if this is a new entry + if e.Number >= ms.header.TotalEntries { + ms.header.TotalEntries = e.Number + 1 + ms.header.TotalLength += uint64(e.Length) + } + + return nil +} + +// WriteHeaderEntry writes the current header to storage +func (ms *MDBXStreamStore) WriteHeaderEntry() error { + if !ms.inTransaction { + return errors.New("must be in transaction to write header") + } + + headerBytes, err := encodeHeader(&ms.header) + if err != nil { + return err + } + + return ms.currentTx.Put(TableMetadata, []byte("header"), headerBytes) +} + +// BookmarkPrintDump prints debug information about bookmarks +func (ms *MDBXStreamStore) BookmarkPrintDump() { + // This is a no-op implementation + // Only needed to satisfy the StreamStore interface +} + +// Close closes the stream store +func (ms *MDBXStreamStore) Close() error { + ms.mutex.Lock() + defer ms.mutex.Unlock() + + if ms.inTransaction { + ms.currentTx.Rollback() + ms.inTransaction = false + ms.currentTx = nil + } + + // Close the database + ms.db.Close() + return nil +} + +// MDBXStreamStoreIterator implements the 
datastreamer.StorageIterator interface +type MDBXStreamStoreIterator struct { + store *MDBXStreamStore + currentEntryNum uint64 + maxEntryNum uint64 + includeBookmarks bool + tx kv.Tx + currentEntry datastreamer.FileEntry + hasCurrentEntry bool + err error +} + +// Next advances the iterator to the next item +func (it *MDBXStreamStoreIterator) Next() (bool, error) { + if it.currentEntryNum > it.maxEntryNum { + it.hasCurrentEntry = false + return false, nil + } + + for { + entry, err := it.store.GetEntry(it.currentEntryNum) + it.currentEntryNum++ + + if err != nil { + // Skip missing entries + if it.currentEntryNum > it.maxEntryNum { + it.hasCurrentEntry = false + return false, nil + } + continue + } + + // Skip bookmarks if not including them + if !it.includeBookmarks && entry.Type == 176 { // 176 is bookmark type + if it.currentEntryNum > it.maxEntryNum { + it.hasCurrentEntry = false + return false, nil + } + continue + } + + it.currentEntry = entry + it.hasCurrentEntry = true + return true, nil + } +} + +// End cleans up iterator resources +func (it *MDBXStreamStoreIterator) End() { + // Nothing to clean up for this implementation + it.hasCurrentEntry = false +} + +// GetEntry returns the current entry +func (it *MDBXStreamStoreIterator) GetEntry() datastreamer.FileEntry { + return it.currentEntry +} + +// newMDBXStreamStoreIterator creates a new iterator for the MDBX-based store +func newMDBXStreamStoreIterator(store *MDBXStreamStore, startEntryNum uint64, includeBookmarks bool) *MDBXStreamStoreIterator { + // Get max entry num + maxEntryNum := store.GetHeader().TotalEntries - 1 + + return &MDBXStreamStoreIterator{ + store: store, + currentEntryNum: startEntryNum, + maxEntryNum: maxEntryNum, + includeBookmarks: includeBookmarks, + } +} + +// GetEntryNumberLimit returns the maximum entry number in the store +func (it *MDBXStreamStoreIterator) GetEntryNumberLimit() uint64 { + return it.maxEntryNum + 1 +} + +// NextFileEntry returns 
the next file entry from the iterator +func (it *MDBXStreamStoreIterator) NextFileEntry() (*types.FileEntry, error) { + hasNext, err := it.Next() + if err != nil { + return nil, err + } + + if !hasNext { + return nil, nil + } + + // Convert from datastreamer.FileEntry to types.FileEntry for compatibility + dsEntry := it.GetEntry() + return &types.FileEntry{ + PacketType: uint8(dsEntry.Type), + Length: dsEntry.Length, + EntryType: types.EntryType(dsEntry.Type), + EntryNum: dsEntry.Number, + Data: dsEntry.Data, + }, nil +} + +// Close closes the iterator and frees associated resources +func (it *MDBXStreamStoreIterator) Close() { + it.End() +} + +// Helper functions + +// encodeHeader encodes a HeaderEntry into bytes +func encodeHeader(header *datastreamer.HeaderEntry) ([]byte, error) { + // Version(8) + SystemID(8) + TotalEntries(8) + TotalLength(8) = 32 bytes + result := make([]byte, 32) + + // Write Version (bytes 0-8) + binary.LittleEndian.PutUint64(result[0:8], uint64(header.Version)) + + // Write SystemID (bytes 8-16) + binary.LittleEndian.PutUint64(result[8:16], header.SystemID) + + // Write TotalEntries (bytes 16-24) + binary.LittleEndian.PutUint64(result[16:24], header.TotalEntries) + + // Write TotalLength (bytes 24-32) + binary.LittleEndian.PutUint64(result[24:32], header.TotalLength) + + return result, nil +} + +// decodeHeader decodes bytes into a HeaderEntry +func decodeHeader(data []byte) (*datastreamer.HeaderEntry, error) { + if len(data) < 32 { + return nil, fmt.Errorf("header data too short: got %d bytes, expected at least 32", len(data)) + } + + header := datastreamer.NewHeader( + uint8(binary.LittleEndian.Uint64(data[0:8])), + binary.LittleEndian.Uint64(data[8:16]), + datastreamer.StreamType(1), // Always use StreamType 1 (sequencer) + ) + header.TotalEntries = binary.LittleEndian.Uint64(data[16:24]) + header.TotalLength = binary.LittleEndian.Uint64(data[24:32]) + + return &header, nil +} + +// encodeFileEntry encodes a FileEntry to bytes +func 
encodeFileEntry(entry datastreamer.FileEntry) ([]byte, error) { + result := make([]byte, 17+len(entry.Data)) + result[0] = 2 // PacketType (2 for data) + binary.BigEndian.PutUint32(result[1:5], entry.Length) + binary.BigEndian.PutUint32(result[5:9], uint32(entry.Type)) + binary.BigEndian.PutUint64(result[9:17], entry.Number) + copy(result[17:], entry.Data) + return result, nil +} + +// decodeFileEntry decodes bytes to a FileEntry +func decodeFileEntry(data []byte) (datastreamer.FileEntry, error) { + if len(data) < 17 { + return datastreamer.FileEntry{}, errors.New("invalid file entry data") + } + + length := binary.BigEndian.Uint32(data[1:5]) + entryType := datastreamer.EntryType(binary.BigEndian.Uint32(data[5:9])) + number := binary.BigEndian.Uint64(data[9:17]) + entryData := data[17:] + + decoded := datastreamer.NewFileEntry(data[0], entryType, number, entryData) + decoded.Length = length + return decoded, nil +} + +// runReaderInSeparateGoroutine executes a read operation in a separate goroutine +// This is useful when we're in a write transaction and need to perform a read operation +// which would otherwise fail due to MDBX's thread-specific transaction restrictions +func (ms *MDBXStreamStore) runReaderInSeparateGoroutine(readFn func() (interface{}, error)) (interface{}, error) { + resultCh := make(chan interface{}, 1) + errCh := make(chan error, 1) + + go func() { + result, err := readFn() + if err != nil { + errCh <- err + return + } + resultCh <- result + }() + + // Wait for the result or error + select { + case result := <-resultCh: + return result, nil + case err := <-errCh: + return nil, err + case <-ms.ctx.Done(): + return nil, ms.ctx.Err() + } +} diff --git a/zk/datastream/server/store_interfaces.go b/zk/datastream/server/store_interfaces.go new file mode 100644 index 00000000000..31688df7f45 --- /dev/null +++ b/zk/datastream/server/store_interfaces.go @@ -0,0 +1,29 @@ +package server + +import ( + "github.com/erigontech/erigon-lib/log/v3" +) + +// 
// StreamStoreType identifies the underlying storage implementation used for
// the zkevm data stream.
type StreamStoreType string

const (
	// StreamStoreTypeFile represents the legacy file-based storage.
	StreamStoreTypeFile StreamStoreType = "file"

	// StreamStoreTypeMDBX represents the MDBX-based storage.
	StreamStoreTypeMDBX StreamStoreType = "mdbx"
)

// StreamStoreConfig contains configuration for stream stores. Only the fields
// relevant to the selected StreamStoreType need to be populated.
type StreamStoreConfig struct {
	// Common config
	SystemID uint64 // stream system identifier written into the store header
	FilePath string // backing path (stream file, or MDBX database directory)
	Logger   log.Logger

	// MDBX specific options — presumably forwarded to the MDBX environment;
	// ignored by the file-based store. TODO confirm against the MDBX store
	// constructor.
	MDBXMapSize int64
	MDBXMaxDBS  int
	MDBXFlags   uint
}
Getting the header has some performance overhead, so // we only want to do this when we know the previous progress is 0. - header := srv.GetStreamServer().GetHeader() + header := srv.GetStreamStore().GetHeader() if header.TotalEntries == 0 { genesis, err := rawdb.ReadBlockByNumber(tx, 0) if err != nil {