From 2f027619e3b32544ed24b43db3306f56857e9ed4 Mon Sep 17 00:00:00 2001 From: Carl Lambert Date: Thu, 24 Apr 2025 21:45:49 +0100 Subject: [PATCH 1/5] Initial refactor to support MDBX as a data stream store. file based appears to work as before. MDBX is not fully implemented. --- cmd/rpcdaemon/cli/httpcfg/http_cfg.go | 1 + cmd/utils/flags.go | 5 + eth/backend.go | 4 +- turbo/cli/default_flags.go | 1 + turbo/cli/flags.go | 1 + .../mocks/data_stream_server_mock.go | 8 +- zk/datastream/server/data_stream_server.go | 59 +- zk/datastream/server/interfaces.go | 28 +- zk/datastream/server/store_file.go | 123 +++ zk/datastream/server/store_interfaces.go | 41 + zk/datastream/server/store_mdbx.go | 881 ++++++++++++++++++ zk/debug_tools/datastream-host/main.go | 3 +- zk/stages/stage_data_stream_catch_up.go | 2 +- 13 files changed, 1128 insertions(+), 29 deletions(-) create mode 100644 zk/datastream/server/store_file.go create mode 100644 zk/datastream/server/store_interfaces.go create mode 100644 zk/datastream/server/store_mdbx.go diff --git a/cmd/rpcdaemon/cli/httpcfg/http_cfg.go b/cmd/rpcdaemon/cli/httpcfg/http_cfg.go index 93385156514..ad84de2b3ea 100644 --- a/cmd/rpcdaemon/cli/httpcfg/http_cfg.go +++ b/cmd/rpcdaemon/cli/httpcfg/http_cfg.go @@ -100,6 +100,7 @@ type HttpCfg struct { // zkevm DataStreamPort int DataStreamHost string + DataStreamStorageType string DataStreamWriteTimeout time.Duration DataStreamInactivityTimeout time.Duration DataStreamInactivityCheckInterval time.Duration diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 1969d5751b4..fca3d107799 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -683,6 +683,11 @@ var ( Usage: "Define the host used for the zkevm data stream", Value: "", } + DataStreamStorageType = cli.StringFlag{ + Name: "zkevm.data-stream-store", + Usage: "Define the storage type used for the zkevm data stream", + Value: "file", + } DataStreamWriteTimeout = cli.DurationFlag{ Name: "zkevm.data-stream-writeTimeout", Usage: 
"Define the TCP write timeout when sending data to a datastream client", diff --git a/eth/backend.go b/eth/backend.go index f8e124ae5e9..2e503c7e2ac 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -224,7 +224,7 @@ type Ethereum struct { logger log.Logger // zk - streamServer server.StreamServer + streamServer server.TcpStreamServer l1Syncer *syncer.L1Syncer etherManClients []*etherman.Client l1Cache *l1_cache.L1Cache @@ -1000,7 +1000,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger } // todo [zkevm] read the stream version from config and figure out what system id is used for - backend.streamServer, err = dataStreamServerFactory.CreateStreamServer(uint16(httpCfg.DataStreamPort), 1, datastreamer.StreamType(1), file, httpCfg.DataStreamWriteTimeout, httpCfg.DataStreamInactivityTimeout, httpCfg.DataStreamInactivityCheckInterval, logConfig) + backend.streamServer, err = dataStreamServerFactory.CreateStreamServer(uint16(httpCfg.DataStreamPort), 1, file, httpCfg.DataStreamWriteTimeout, httpCfg.DataStreamInactivityTimeout, httpCfg.DataStreamInactivityCheckInterval, logConfig, server.StreamStoreType(httpCfg.DataStreamStorageType)) if err != nil { return nil, err } diff --git a/turbo/cli/default_flags.go b/turbo/cli/default_flags.go index cc1fb875f71..00cb123bb20 100644 --- a/turbo/cli/default_flags.go +++ b/turbo/cli/default_flags.go @@ -246,6 +246,7 @@ var DefaultFlags = []cli.Flag{ &utils.GasPriceFactor, &utils.DataStreamHost, &utils.DataStreamPort, + &utils.DataStreamStorageType, &utils.DataStreamWriteTimeout, &utils.DataStreamInactivityTimeout, &utils.DataStreamInactivityCheckInterval, diff --git a/turbo/cli/flags.go b/turbo/cli/flags.go index 4ef1e00faf1..9c7cbc8123b 100644 --- a/turbo/cli/flags.go +++ b/turbo/cli/flags.go @@ -553,6 +553,7 @@ func setEmbeddedRpcDaemon(ctx *cli.Context, cfg *nodecfg.Config, logger log.Logg DataStreamPort: ctx.Int(utils.DataStreamPort.Name), DataStreamHost: ctx.String(utils.DataStreamHost.Name), + 
DataStreamStorageType: ctx.String(utils.DataStreamStorageType.Name), DataStreamWriteTimeout: ctx.Duration(utils.DataStreamWriteTimeout.Name), DataStreamInactivityTimeout: ctx.Duration(utils.DataStreamInactivityTimeout.Name), DataStreamInactivityCheckInterval: ctx.Duration(utils.DataStreamInactivityCheckInterval.Name), diff --git a/zk/datastream/mocks/data_stream_server_mock.go b/zk/datastream/mocks/data_stream_server_mock.go index 85674134a43..e88922f9841 100644 --- a/zk/datastream/mocks/data_stream_server_mock.go +++ b/zk/datastream/mocks/data_stream_server_mock.go @@ -241,17 +241,17 @@ func (c *MockDataStreamServerGetHighestClosedBatchNoCacheCall) DoAndReturn(f fun } // GetStreamServer mocks base method. -func (m *MockDataStreamServer) GetStreamServer() server.StreamServer { +func (m *MockDataStreamServer) GetStreamStore() server.StreamStore { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetStreamServer") - ret0, _ := ret[0].(server.StreamServer) + ret := m.ctrl.Call(m, "GetStreamStore") + ret0, _ := ret[0].(server.StreamStore) return ret0 } // GetStreamServer indicates an expected call of GetStreamServer. 
func (mr *MockDataStreamServerMockRecorder) GetStreamServer() *MockDataStreamServerGetStreamServerCall { mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetStreamServer", reflect.TypeOf((*MockDataStreamServer)(nil).GetStreamServer)) + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetStreamStore", reflect.TypeOf((*MockDataStreamServer)(nil).GetStreamStore)) return &MockDataStreamServerGetStreamServerCall{Call: call} } diff --git a/zk/datastream/server/data_stream_server.go b/zk/datastream/server/data_stream_server.go index 84c26d45a3b..c5d1a1f2a99 100644 --- a/zk/datastream/server/data_stream_server.go +++ b/zk/datastream/server/data_stream_server.go @@ -2,11 +2,12 @@ package server import ( "fmt" - "github.com/gateway-fm/zkevm-data-streamer/datastreamer" - dslog "github.com/gateway-fm/zkevm-data-streamer/log" "sync" "time" + "github.com/gateway-fm/zkevm-data-streamer/datastreamer" + dslog "github.com/gateway-fm/zkevm-data-streamer/log" + libcommon "github.com/erigontech/erigon-lib/common" "github.com/erigontech/erigon-lib/kv" "github.com/erigontech/erigon-lib/log/v3" @@ -45,7 +46,7 @@ const ( ) type ZkEVMDataStreamServer struct { - streamServer StreamServer + streamServer TcpStreamServer chainId uint64 highestBlockWritten, highestClosedBatchWritten, @@ -65,27 +66,65 @@ type DataStreamEntryProto interface { type ZkEVMDataStreamServerFactory struct { } +type StorageType uint8 + +const ( + StorageTypeFile StorageType = iota + StorageTypeMDBX +) + +type ServerType uint8 + +const ( + ServerTypeTCP ServerType = iota + ServerTypeGRPC +) + func NewZkEVMDataStreamServerFactory() *ZkEVMDataStreamServerFactory { return &ZkEVMDataStreamServerFactory{} } -func (f *ZkEVMDataStreamServerFactory) CreateStreamServer(port uint16, systemID uint64, streamType datastreamer.StreamType, fileName string, writeTimeout time.Duration, inactivityTimeout time.Duration, inactivityCheckInterval time.Duration, cfg *dslog.Config) (StreamServer, error) 
{ +func (f *ZkEVMDataStreamServerFactory) CreateStreamServer(port uint16, systemID uint64, fileName string, writeTimeout time.Duration, inactivityTimeout time.Duration, inactivityCheckInterval time.Duration, cfg *dslog.Config, storageType StreamStoreType) (TcpStreamServer, error) { // after we moved to protobuff encoding we no longer need to support multiple versions. const datastreamVersion = 3 + + var storageFactory func() (datastreamer.StreamStore, error) + + if storageType == StreamStoreTypeMDBX { + // Create MDBX-based storage + config := &StreamStoreConfig{ + SystemID: systemID, + StreamType: 1, // Always use sequencer type + FilePath: fileName, + StoreType: StreamStoreTypeMDBX, + DatastreamVersion: datastreamVersion, + MDBXMaxDBS: 3, + } + + mdbxStore, err := NewMDBXStreamStore(config) + if err != nil { + return nil, fmt.Errorf("failed to create MDBX store: %w", err) + } + + storageFactory = func() (datastreamer.StreamStore, error) { + return mdbxStore, nil + } + } + // the library still requires version as a input in it's arguments. 
- return datastreamer.NewServer(port, datastreamVersion, systemID, streamType, fileName, writeTimeout, inactivityTimeout, inactivityCheckInterval, cfg) + return datastreamer.NewServer(port, datastreamVersion, systemID, 1, fileName, writeTimeout, inactivityTimeout, inactivityCheckInterval, cfg, storageFactory) } -func (f *ZkEVMDataStreamServerFactory) CreateDataStreamServer(streamServer StreamServer, chainId uint64) DataStreamServer { +func (f *ZkEVMDataStreamServerFactory) CreateDataStreamServer(streamStore TcpStreamServer, chainId uint64) DataStreamServer { return &ZkEVMDataStreamServer{ - streamServer: streamServer, + streamServer: streamStore, chainId: chainId, highestBlockWritten: nil, highestBatchWritten: nil, } } -func (srv *ZkEVMDataStreamServer) GetStreamServer() StreamServer { +func (srv *ZkEVMDataStreamServer) GetStreamStore() StreamStore { return srv.streamServer } @@ -632,12 +671,12 @@ func (srv *ZkEVMDataStreamServer) getLastEntryOfType(entryType datastreamer.Entr } type dataStreamServerIterator struct { - stream StreamServer + stream StreamStore curEntryNum uint64 header uint64 } -func newDataStreamServerIterator(stream StreamServer, start uint64) *dataStreamServerIterator { +func newDataStreamServerIterator(stream StreamStore, start uint64) *dataStreamServerIterator { return &dataStreamServerIterator{ stream: stream, curEntryNum: start, diff --git a/zk/datastream/server/interfaces.go b/zk/datastream/server/interfaces.go index e5ea310c67b..7a85a41ad32 100644 --- a/zk/datastream/server/interfaces.go +++ b/zk/datastream/server/interfaces.go @@ -18,23 +18,31 @@ import ( type StreamServer interface { Start() error - StartAtomicOp() error - AddStreamEntry(etype datastreamer.EntryType, data []byte) (uint64, error) - AddStreamBookmark(bookmark []byte) (uint64, error) - CommitAtomicOp() error - RollbackAtomicOp() error - TruncateFile(entryNum uint64) error - UpdateEntryData(entryNum uint64, etype datastreamer.EntryType, data []byte) error - GetHeader() 
datastreamer.HeaderEntry +} + +type StreamStore interface { + datastreamer.StreamStore + GetEntry(entryNum uint64) (datastreamer.FileEntry, error) - GetBookmark(bookmark []byte) (uint64, error) + GetBookmark(data []byte) (uint64, error) GetFirstEventAfterBookmark(bookmark []byte) (datastreamer.FileEntry, error) GetDataBetweenBookmarks(bookmarkFrom, bookmarkTo []byte) ([]byte, error) + + UpdateEntryData(entryNum uint64, etype datastreamer.EntryType, data []byte) error + BookmarkPrintDump() } +type TcpStreamServer interface { + StreamServer + StreamStore +} + +type GrpcDataStreamServer interface { + Start() error +} type DataStreamServer interface { - GetStreamServer() StreamServer + GetStreamStore() StreamStore GetChainId() uint64 IsLastEntryBatchEnd() (isBatchEnd bool, err error) GetHighestBlockNumber() (uint64, error) diff --git a/zk/datastream/server/store_file.go b/zk/datastream/server/store_file.go new file mode 100644 index 00000000000..8fadb2fe56b --- /dev/null +++ b/zk/datastream/server/store_file.go @@ -0,0 +1,123 @@ +package server + +import ( + "time" + + "github.com/gateway-fm/zkevm-data-streamer/datastreamer" +) + +// FileStreamStore wraps the legacy datastreamer.Server to implement the StreamStore interface +type FileStreamStore struct { + server *datastreamer.StreamServer +} + +// NewFileStreamStore creates a new file-based stream store +func NewFileStreamStore(config *StreamStoreConfig) (*FileStreamStore, error) { + // Default values for backward compatibility + inactivityCheckInterval := time.Second * 10 + writeTimeout := time.Second * 3 + inactivityTimeout := time.Second * 120 + + server, err := datastreamer.NewServer( + 0, // port is not required for store + config.DatastreamVersion, + config.SystemID, + config.StreamType, + config.FilePath, + writeTimeout, + inactivityTimeout, + inactivityCheckInterval, + nil, // logging config + nil, // store + ) + + if err != nil { + return nil, err + } + + return &FileStreamStore{ + server: server, + }, nil +} 
+ +// AddStreamEntry adds a new entry to the stream +func (fs *FileStreamStore) AddStreamEntry(entryType datastreamer.EntryType, data []byte) (uint64, error) { + return fs.server.AddStreamEntry(entryType, data) +} + +// AddStreamBookmark adds a new bookmark to the stream +func (fs *FileStreamStore) AddStreamBookmark(data []byte) (uint64, error) { + return fs.server.AddStreamBookmark(data) +} + +// GetEntry retrieves an entry from the stream +func (fs *FileStreamStore) GetEntry(entryNum uint64) (datastreamer.FileEntry, error) { + return fs.server.GetEntry(entryNum) +} + +// GetBookmark retrieves a bookmark from the stream +func (fs *FileStreamStore) GetBookmark(data []byte) (uint64, error) { + return fs.server.GetBookmark(data) +} + +// StartAtomicOp starts an atomic operation +func (fs *FileStreamStore) StartAtomicOp() error { + return nil +} + +// CommitAtomicOp commits an atomic operation +func (fs *FileStreamStore) CommitAtomicOp() error { + return nil +} + +// RollbackAtomicOp rolls back an atomic operation +func (fs *FileStreamStore) RollbackAtomicOp() error { + return fs.server.RollbackAtomicOp() +} + +// GetHeader retrieves the header from the stream +func (fs *FileStreamStore) GetHeader() datastreamer.HeaderEntry { + return fs.server.GetHeader() +} + +// TruncateToEntry truncates the stream to the specified entry +func (fs *FileStreamStore) TruncateToEntry(entryNum uint64) error { + return fs.server.TruncateFile(entryNum) +} + +// UpdateEntryData updates the data for an entry +func (fs *FileStreamStore) UpdateEntryData(entryNum uint64, entryType datastreamer.EntryType, data []byte) error { + return fs.server.UpdateEntryData(entryNum, entryType, data) +} + +// GetFirstEventAfterBookmark gets the first event after a bookmark +func (fs *FileStreamStore) GetFirstEventAfterBookmark(bookmark []byte) (datastreamer.FileEntry, error) { + return fs.server.GetFirstEventAfterBookmark(bookmark) +} + +// GetDataBetweenBookmarks gets data between two bookmarks +func (fs 
*FileStreamStore) GetDataBetweenBookmarks(bookmarkFrom, bookmarkTo []byte) ([]byte, error) { + return fs.server.GetDataBetweenBookmarks(bookmarkFrom, bookmarkTo) +} + +// Start starts the stream store +func (fs *FileStreamStore) Start() error { + return fs.server.Start() +} + +// Stop stops the stream store +func (fs *FileStreamStore) Stop() error { + // The original datastreamer.Server doesn't have a Stop method + // This is a no-op for compatibility + return nil +} + +// BookmarkPrintDump prints debug information about bookmarks +func (fs *FileStreamStore) BookmarkPrintDump() { + fs.server.BookmarkPrintDump() +} + +// TruncateFile truncates the stream to the specified entry +func (fs *FileStreamStore) TruncateFile(entryNum uint64) error { + return fs.server.TruncateFile(entryNum) +} diff --git a/zk/datastream/server/store_interfaces.go b/zk/datastream/server/store_interfaces.go new file mode 100644 index 00000000000..8afce4e03e0 --- /dev/null +++ b/zk/datastream/server/store_interfaces.go @@ -0,0 +1,41 @@ +package server + +import ( + "github.com/gateway-fm/zkevm-data-streamer/datastreamer" +) + +// StreamStoreType identifies the underlying storage implementation +type StreamStoreType string + +const ( + // StreamStoreTypeFile represents the legacy file-based storage + StreamStoreTypeFile StreamStoreType = "file" + + // StreamStoreTypeMDBX represents the MDBX-based storage + StreamStoreTypeMDBX StreamStoreType = "mdbx" +) + +// StreamStoreConfig contains configuration for stream stores +type StreamStoreConfig struct { + // Common config + SystemID uint64 + StreamType datastreamer.StreamType + FilePath string + + // Implementation selection + StoreType StreamStoreType + + // MDBX specific options + MDBXMapSize int64 + MDBXMaxDBS int + MDBXFlags uint + + // File specific options + DatastreamVersion uint8 +} + +// StreamStoreFactory creates stream stores based on configuration +type StreamStoreFactory interface { + // CreateStore creates a new stream store based on 
the provided configuration + CreateStore(config *StreamStoreConfig) (StreamStore, error) +} diff --git a/zk/datastream/server/store_mdbx.go b/zk/datastream/server/store_mdbx.go new file mode 100644 index 00000000000..2f98893d799 --- /dev/null +++ b/zk/datastream/server/store_mdbx.go @@ -0,0 +1,881 @@ +package server + +import ( + "encoding/binary" + "errors" + "fmt" + "os" + "path/filepath" + "sync" + + "github.com/erigontech/erigon/zk/datastream/types" + "github.com/erigontech/mdbx-go/mdbx" + "github.com/gateway-fm/zkevm-data-streamer/datastreamer" +) + +// These constants define the MDBX tables we'll use +const ( + // Main tables + TableEntries = "entries" // Store all entries sequentially + TableBookmarks = "bookmarks" // Store bookmarks for fast seeking + TableMetadata = "metadata" // Store header information + + // Index tables + TableBlockIndex = "block_index" // Index blocks by number + TableBatchIndex = "batch_index" // Index batches by number +) + +// MDBX flags and options +const ( + + // Flags for Env.Open + NoTLS = 0x200000 // Don't use thread-local storage + + // Flags for Txn.OpenDBI + Create = 0x40000 // Create DB if not already existing + + // Fixed StreamType value + StreamTypeValue = 1 // Always use StreamType 1 (sequencer) in this implementation +) + +// MDBXStreamStore implements StreamStore using MDBX +type MDBXStreamStore struct { + env *mdbx.Env + dbi mdbx.DBI + bookmarksDbi mdbx.DBI + metadataDbi mdbx.DBI + header datastreamer.HeaderEntry + mutex sync.RWMutex + inTransaction bool + txn *mdbx.Txn // Current transaction +} + +func (ms *MDBXStreamStore) SetStreamChannel(chan datastreamer.StreamAO) { + //TODO implement me + panic("implement me") +} + +func (ms *MDBXStreamStore) GetNextEntry() uint64 { + //TODO implement me + panic("implement me") +} + +func (ms *MDBXStreamStore) PrintDumpBookmarks() error { + //TODO implement me + panic("implement me") +} + +// NewMDBXStreamStore creates a new MDBX-based stream store +func 
NewMDBXStreamStore(config *StreamStoreConfig) (*MDBXStreamStore, error) { + + // Create environment + env, err := mdbx.NewEnv() + if err != nil { + return nil, err + } + + // Configure MDBX + if err := env.SetOption(mdbx.OptMaxDB, uint64(config.MDBXMaxDBS)); err != nil { + env.Close() + return nil, fmt.Errorf("failed to set maxDBs: %w", err) + } + + const pageSize = 4096 + err = env.SetGeometry(-1, -1, 64*1024*pageSize, -1, -1, pageSize) + if err != nil { + env.Close() + return nil, fmt.Errorf("failed to set geometry: %w", err) + } + + file := config.FilePath + ".mdbx" + // create directory if it doesn't exist + if err := os.MkdirAll(filepath.Dir(file), 0755); err != nil { + env.Close() + return nil, fmt.Errorf("failed to create data-stream directory: %w", err) + } + + if err := env.Open(file, mdbx.Create, 0644); err != nil { + env.Close() + return nil, err + } + + // Initialize store + store := &MDBXStreamStore{ + env: env, + header: datastreamer.HeaderEntry{ + Version: config.DatastreamVersion, + SystemID: config.SystemID, + TotalEntries: 0, + TotalLength: 0, + }, + } + + // Open DBIs in a transaction + txn, err := env.BeginTxn(nil, 0) + if err != nil { + env.Close() + return nil, err + } + + // Create DBIs + store.dbi, err = txn.OpenDBISimple(TableEntries, Create) + if err != nil { + txn.Abort() + env.Close() + return nil, err + } + + store.bookmarksDbi, err = txn.OpenDBISimple(TableBookmarks, Create) + if err != nil { + txn.Abort() + env.Close() + return nil, err + } + + store.metadataDbi, err = txn.OpenDBISimple(TableMetadata, Create) + if err != nil { + txn.Abort() + env.Close() + return nil, err + } + + // Try to load existing header + headerVal, err := txn.Get(store.metadataDbi, []byte("header")) + if err == nil && len(headerVal) > 0 { + existingHeader, err := decodeHeader(headerVal) + if err == nil { + store.header = *existingHeader + } + // If there's an error decoding, we'll keep the default header + } + + // Commit transaction + commit, err := txn.Commit() + 
if err != nil { + env.Close() + return nil, err + } + + // Ignore commit latency value: TODO add this to metrics + _ = commit + + return store, nil +} + +// AddStreamEntry adds a new entry to the stream +func (ms *MDBXStreamStore) AddStreamEntry(entryType datastreamer.EntryType, data []byte) (uint64, error) { + ms.mutex.Lock() + defer ms.mutex.Unlock() + + if !ms.inTransaction { + return 0, errors.New("must be in transaction to add entries") + } + + // Create entry + entryNum := ms.header.TotalEntries + entry := datastreamer.FileEntry{ + Type: entryType, + Length: uint32(len(data) + 17), // 17 is the header size + Number: entryNum, + Data: data, + } + + // Encode and store entry + keyBytes := make([]byte, 8) + binary.BigEndian.PutUint64(keyBytes, entryNum) + + entryBytes, err := encodeFileEntry(entry) + if err != nil { + return 0, err + } + + if err := ms.txn.Put(ms.dbi, keyBytes, entryBytes, 0); err != nil { + return 0, err + } + + // Update header (will be saved on commit) + ms.header.TotalEntries++ + ms.header.TotalLength += uint64(entry.Length) + + return entryNum, nil +} + +// AddStreamBookmark adds a new bookmark to the stream +func (ms *MDBXStreamStore) AddStreamBookmark(data []byte) (uint64, error) { + ms.mutex.Lock() + defer ms.mutex.Unlock() + + if !ms.inTransaction { + return 0, errors.New("must be in transaction to add bookmarks") + } + + // Create bookmark entry + entryNum := ms.header.TotalEntries + entry := datastreamer.FileEntry{ + Type: 176, // Bookmark type + Length: uint32(len(data) + 17), // 17 is the header size + Number: entryNum, + Data: data, + } + + // Store entry in main table + entryKeyBytes := make([]byte, 8) + binary.BigEndian.PutUint64(entryKeyBytes, entryNum) + + entryBytes, err := encodeFileEntry(entry) + if err != nil { + return 0, err + } + + if err := ms.txn.Put(ms.dbi, entryKeyBytes, entryBytes, 0); err != nil { + return 0, err + } + + // Also store in bookmark table for quick lookup + entryNumBytes := make([]byte, 8) + 
binary.BigEndian.PutUint64(entryNumBytes, entryNum) + if err := ms.txn.Put(ms.bookmarksDbi, data, entryNumBytes, 0); err != nil { + return 0, err + } + + // Update header (will be saved on commit) + ms.header.TotalEntries++ + ms.header.TotalLength += uint64(entry.Length) + + return entryNum, nil +} + +// GetEntry retrieves an entry from the stream +func (ms *MDBXStreamStore) GetEntry(entryNum uint64) (datastreamer.FileEntry, error) { + ms.mutex.RLock() + defer ms.mutex.RUnlock() + + // Create read transaction if not in a transaction + var txn *mdbx.Txn + var err error + var shouldAbort bool + + if ms.inTransaction { + txn = ms.txn + } else { + txn, err = ms.env.BeginTxn(nil, mdbx.Readonly) + if err != nil { + return datastreamer.FileEntry{}, err + } + shouldAbort = true + defer func() { + if shouldAbort { + txn.Abort() + } + }() + } + + // Create key + keyBytes := make([]byte, 8) + binary.BigEndian.PutUint64(keyBytes, entryNum) + + // Get from db + entryBytes, err := txn.Get(ms.dbi, keyBytes) + if err != nil { + if err == mdbx.NotFound { + return datastreamer.FileEntry{}, fmt.Errorf("entry not found: %d", entryNum) + } + return datastreamer.FileEntry{}, err + } + + // Decode entry + entry, err := decodeFileEntry(entryBytes) + if err != nil { + return datastreamer.FileEntry{}, err + } + + // If we created a transaction, we need to abort it now + shouldAbort = false + if !ms.inTransaction { + txn.Abort() + } + + return entry, nil +} + +// GetBookmark retrieves a bookmark from the stream +func (ms *MDBXStreamStore) GetBookmark(data []byte) (uint64, error) { + ms.mutex.RLock() + defer ms.mutex.RUnlock() + + // Create read transaction if not in a transaction + var txn *mdbx.Txn + var err error + var shouldAbort bool + + if ms.inTransaction { + txn = ms.txn + } else { + txn, err = ms.env.BeginTxn(nil, mdbx.Readonly) + if err != nil { + return 0, err + } + shouldAbort = true + defer func() { + if shouldAbort { + txn.Abort() + } + }() + } + + // Get from db + 
entryNumBytes, err := txn.Get(ms.bookmarksDbi, data) + if err != nil { + if err == mdbx.NotFound { + return 0, fmt.Errorf("bookmark not found") + } + return 0, err + } + + // If we created a transaction, we need to abort it now + shouldAbort = false + if !ms.inTransaction { + txn.Abort() + } + + return binary.BigEndian.Uint64(entryNumBytes), nil +} + +// StartAtomicOp starts a transaction +func (ms *MDBXStreamStore) StartAtomicOp() error { + ms.mutex.Lock() + defer ms.mutex.Unlock() + + if ms.inTransaction { + return errors.New("transaction already in progress") + } + + // Begin a new transaction + txn, err := ms.env.BeginTxn(nil, 0) + if err != nil { + return err + } + + ms.txn = txn + ms.inTransaction = true + + return nil +} + +// CommitAtomicOp commits a transaction +func (ms *MDBXStreamStore) CommitAtomicOp() error { + ms.mutex.Lock() + defer ms.mutex.Unlock() + + if !ms.inTransaction { + return errors.New("no transaction in progress") + } + + // Save header + headerBytes, err := encodeHeader(&ms.header) + if err != nil { + ms.txn.Abort() + ms.txn = nil + ms.inTransaction = false + return fmt.Errorf("failed to encode header: %w", err) + } + + if err := ms.txn.Put(ms.metadataDbi, []byte("header"), headerBytes, 0); err != nil { + ms.txn.Abort() + ms.txn = nil + ms.inTransaction = false + return fmt.Errorf("failed to save header: %w", err) + } + + // Commit transaction + commit, err := ms.txn.Commit() + if err != nil { + ms.txn.Abort() + ms.txn = nil + ms.inTransaction = false + return fmt.Errorf("failed to commit transaction: %w", err) + } + + // Ignore commit latency: TODO add this to metrics + _ = commit + + // Clean up + ms.txn = nil + ms.inTransaction = false + + return nil +} + +// RollbackAtomicOp rolls back a transaction +func (ms *MDBXStreamStore) RollbackAtomicOp() error { + ms.mutex.Lock() + defer ms.mutex.Unlock() + + if !ms.inTransaction { + return errors.New("no transaction in progress") + } + + // Abort transaction + ms.txn.Abort() + ms.txn = nil + 
ms.inTransaction = false + + return nil +} + +// GetHeader retrieves the header from the stream +func (ms *MDBXStreamStore) GetHeader() datastreamer.HeaderEntry { + ms.mutex.RLock() + defer ms.mutex.RUnlock() + + // Copy of header struct + return ms.header +} + +// TruncateFile truncates the stream to the specified entry +func (ms *MDBXStreamStore) TruncateFile(entryNum uint64) error { + if err := ms.StartAtomicOp(); err != nil { + return err + } + + // TODO: Implement truncation logic + // This will require: + // 1. Getting the current header + // 2. Deleting all entries above entryNum using cursor to walk the database + // 3. Updating the header + + // For now, just return error + ms.RollbackAtomicOp() + return errors.New("truncate not implemented yet") +} + +// IteratorFrom creates an iterator starting from the specified entry +func (ms *MDBXStreamStore) IteratorFrom(entryNum uint64, includeBookmarks bool) (*MDBXStreamStoreIterator, error) { + return newMDBXStreamStoreIterator(ms, entryNum, includeBookmarks), nil +} + +// Helper functions + +// encodeHeader encodes a HeaderEntry into bytes +func encodeHeader(header *datastreamer.HeaderEntry) ([]byte, error) { + // Version(8) + SystemID(8) + TotalEntries(8) + TotalLength(8) = 32 bytes + result := make([]byte, 32) + + // Write Version (bytes 0-8) + binary.LittleEndian.PutUint64(result[0:8], uint64(header.Version)) + + // Write SystemID (bytes 8-16) + binary.LittleEndian.PutUint64(result[8:16], header.SystemID) + + // Write TotalEntries (bytes 16-24) + binary.LittleEndian.PutUint64(result[16:24], header.TotalEntries) + + // Write TotalLength (bytes 24-32) + binary.LittleEndian.PutUint64(result[24:32], header.TotalLength) + + return result, nil +} + +// decodeHeader decodes bytes into a HeaderEntry +func decodeHeader(data []byte) (*datastreamer.HeaderEntry, error) { + if len(data) < 32 { + return nil, fmt.Errorf("header data too short: got %d bytes, expected at least 32", len(data)) + } + + header := 
&datastreamer.HeaderEntry{ + // Read from encoded data + Version: uint8(binary.LittleEndian.Uint64(data[0:8])), + SystemID: binary.LittleEndian.Uint64(data[8:16]), + TotalEntries: binary.LittleEndian.Uint64(data[16:24]), + TotalLength: binary.LittleEndian.Uint64(data[24:32]), + } + + return header, nil +} + +// encodeFileEntry encodes a FileEntry to bytes +func encodeFileEntry(entry datastreamer.FileEntry) ([]byte, error) { + result := make([]byte, 17+len(entry.Data)) + result[0] = 2 // PacketType (2 for data) + binary.BigEndian.PutUint32(result[1:5], entry.Length) + binary.BigEndian.PutUint32(result[5:9], uint32(entry.Type)) + binary.BigEndian.PutUint64(result[9:17], entry.Number) + copy(result[17:], entry.Data) + return result, nil +} + +// decodeFileEntry decodes bytes to a FileEntry +func decodeFileEntry(data []byte) (datastreamer.FileEntry, error) { + if len(data) < 17 { + return datastreamer.FileEntry{}, errors.New("invalid file entry data") + } + + length := binary.BigEndian.Uint32(data[1:5]) + entryType := datastreamer.EntryType(binary.BigEndian.Uint32(data[5:9])) + number := binary.BigEndian.Uint64(data[9:17]) + entryData := data[17:] + + return datastreamer.FileEntry{ + Type: entryType, + Length: length, + Number: number, + Data: entryData, + }, nil +} + +// MDBXStreamStoreIterator implements the datastreamer.StorageIterator interface +type MDBXStreamStoreIterator struct { + store *MDBXStreamStore + currentEntryNum uint64 + maxEntryNum uint64 + includeBookmarks bool + txn *mdbx.Txn + cursor *mdbx.Cursor + currentEntry datastreamer.FileEntry + hasCurrentEntry bool + err error +} + +// IteratorNext advances the iterator to the next item +func (it *MDBXStreamStoreIterator) Next() (bool, error) { + if it.currentEntryNum > it.maxEntryNum { + it.hasCurrentEntry = false + return false, nil + } + + // Initialize transaction and cursor if needed + if it.txn == nil { + var err error + it.txn, err = it.store.env.BeginTxn(nil, mdbx.Readonly) + if err != nil { + 
it.hasCurrentEntry = false + it.err = err + return false, err + } + + it.cursor, err = it.txn.OpenCursor(it.store.dbi) + if err != nil { + it.txn.Abort() + it.txn = nil + it.hasCurrentEntry = false + it.err = err + return false, err + } + } + + // Create key for current entry + keyBytes := make([]byte, 8) + binary.BigEndian.PutUint64(keyBytes, it.currentEntryNum) + + // Get entry from db + entryBytes, err := it.txn.Get(it.store.dbi, keyBytes) + if err != nil { + if err == mdbx.NotFound { + // Skip to next entry + it.currentEntryNum++ + return it.Next() + } + it.cursor.Close() + it.txn.Abort() + it.txn = nil + it.cursor = nil + it.hasCurrentEntry = false + it.err = err + return false, err + } + + // Decode entry + entry, err := decodeFileEntry(entryBytes) + if err != nil { + it.cursor.Close() + it.txn.Abort() + it.txn = nil + it.cursor = nil + it.hasCurrentEntry = false + it.err = err + return false, err + } + + it.currentEntryNum++ + + // Skip bookmarks if not including them + if !it.includeBookmarks && entry.Type == 176 { // 176 is bookmark type + return it.Next() + } + + it.currentEntry = entry + it.hasCurrentEntry = true + return true, nil +} + +// IteratorEnd cleans up iterator resources +func (it *MDBXStreamStoreIterator) End() { + if it.cursor != nil { + it.cursor.Close() + it.cursor = nil + } + + if it.txn != nil { + it.txn.Abort() + it.txn = nil + } + + it.hasCurrentEntry = false +} + +// GetEntry returns the current entry +func (it *MDBXStreamStoreIterator) GetEntry() datastreamer.FileEntry { + if !it.hasCurrentEntry { + panic("No current entry exists, call iteratorNext first") + } + return it.currentEntry +} + +// newMDBXStreamStoreIterator creates a new iterator for the MDBX-based store +func newMDBXStreamStoreIterator(store *MDBXStreamStore, startEntryNum uint64, includeBookmarks bool) *MDBXStreamStoreIterator { + // Get max entry num + maxEntryNum := store.GetHeader().TotalEntries - 1 + + // We initialize txn and cursor on first use to avoid issues 
with transaction lifetimes + return &MDBXStreamStoreIterator{ + store: store, + currentEntryNum: startEntryNum, + maxEntryNum: maxEntryNum, + includeBookmarks: includeBookmarks, + } +} + +// GetEntryNumberLimit returns the maximum entry number in the store +func (it *MDBXStreamStoreIterator) GetEntryNumberLimit() uint64 { + return it.maxEntryNum + 1 +} + +// NextFileEntry returns the next file entry from the iterator +func (it *MDBXStreamStoreIterator) NextFileEntry() (*types.FileEntry, error) { + hasNext, err := it.Next() + if err != nil { + return nil, err + } + + if !hasNext { + return nil, nil + } + + // Convert from datastreamer.FileEntry to types.FileEntry for compatibility + dsEntry := it.GetEntry() + return &types.FileEntry{ + PacketType: uint8(dsEntry.Type), + Length: dsEntry.Length, + EntryType: types.EntryType(dsEntry.Type), + EntryNum: dsEntry.Number, + Data: dsEntry.Data, + }, nil +} + +// Close closes the iterator and frees associated resources +func (it *MDBXStreamStoreIterator) Close() { + it.End() +} + +// BookmarkPrintDump prints debug information about bookmarks +func (ms *MDBXStreamStore) BookmarkPrintDump() { + // This is a no-op for MDBX implementation + // Only needed to satisfy the StreamStore interface +} + +// GetFirstEventAfterBookmark gets the first event after a bookmark +func (ms *MDBXStreamStore) GetFirstEventAfterBookmark(bookmark []byte) (datastreamer.FileEntry, error) { + // Get bookmark entry number + entryNum, err := ms.GetBookmark(bookmark) + if err != nil { + return datastreamer.FileEntry{}, err + } + + // Create iterator + iter, err := ms.IteratorFrom(entryNum, false) + if err != nil { + return datastreamer.FileEntry{}, err + } + + // Skip the bookmark entry itself + _, err = iter.NextFileEntry() + if err != nil { + return datastreamer.FileEntry{}, err + } + + // Get the next entry + entry, err := iter.NextFileEntry() + if err != nil { + return datastreamer.FileEntry{}, err + } + + if entry == nil { + return 
datastreamer.FileEntry{}, errors.New("no entries after bookmark") + } + + // Convert back to datastreamer.FileEntry + return datastreamer.FileEntry{ + Type: datastreamer.EntryType(entry.EntryType), + Length: entry.Length, + Number: entry.EntryNum, + Data: entry.Data, + }, nil +} + +// GetDataBetweenBookmarks gets data between two bookmarks +func (ms *MDBXStreamStore) GetDataBetweenBookmarks(bookmarkFrom, bookmarkTo []byte) ([]byte, error) { + // Get bookmark entry numbers + fromEntryNum, err := ms.GetBookmark(bookmarkFrom) + if err != nil { + return nil, err + } + + toEntryNum, err := ms.GetBookmark(bookmarkTo) + if err != nil { + return nil, err + } + + // Create an iterator + iter, err := ms.IteratorFrom(fromEntryNum, false) + if err != nil { + return nil, err + } + + // Collect all data between bookmarks + var result []byte + for { + entry, err := iter.NextFileEntry() + if err != nil { + return nil, err + } + + if entry == nil || entry.EntryNum > toEntryNum { + break + } + + result = append(result, entry.Data...) 
+ } + + return result, nil +} + +// UpdateEntryData updates the data for an entry +func (ms *MDBXStreamStore) UpdateEntryData(entryNum uint64, entryType datastreamer.EntryType, data []byte) error { + if err := ms.StartAtomicOp(); err != nil { + return err + } + + // Get existing entry + entry, err := ms.GetEntry(entryNum) + if err != nil { + ms.RollbackAtomicOp() + return err + } + + // Update entry + entry.Type = entryType + entry.Data = data + entry.Length = uint32(len(data) + 17) // 17 is the header size + + // Store updated entry + keyBytes := make([]byte, 8) + binary.BigEndian.PutUint64(keyBytes, entryNum) + + entryBytes, err := encodeFileEntry(entry) + if err != nil { + ms.RollbackAtomicOp() + return err + } + + if err := ms.txn.Put(ms.dbi, keyBytes, entryBytes, 0); err != nil { + ms.RollbackAtomicOp() + return err + } + + return ms.CommitAtomicOp() +} + +// GetIterator returns a file iterator for the MDBX stream store +func (ms *MDBXStreamStore) GetIterator(entryNum uint64, readOnly bool) (datastreamer.StorageIterator, error) { + // Create a real iterator using our existing implementation + iterator, err := ms.IteratorFrom(entryNum, true) // Include bookmarks for compatibility + if err != nil { + return nil, err + } + + // Return the iterator as an interface that can be type-asserted by the caller + return iterator, nil +} + +// AddFileEntry adds a file entry directly to the stream +func (ms *MDBXStreamStore) AddFileEntry(e datastreamer.FileEntry) error { + ms.mutex.Lock() + defer ms.mutex.Unlock() + + if !ms.inTransaction { + return errors.New("must be in transaction to add entries") + } + + // Encode and store entry + keyBytes := make([]byte, 8) + binary.BigEndian.PutUint64(keyBytes, e.Number) + + entryBytes, err := encodeFileEntry(e) + if err != nil { + return err + } + + if err := ms.txn.Put(ms.dbi, keyBytes, entryBytes, 0); err != nil { + return err + } + + // Only update the header if this is a new entry + if e.Number >= ms.header.TotalEntries { + 
ms.header.TotalEntries = e.Number + 1 + ms.header.TotalLength += uint64(e.Length) + } + + return nil +} + +// WriteHeaderEntry writes the current header to storage +func (ms *MDBXStreamStore) WriteHeaderEntry() error { + if !ms.inTransaction { + if err := ms.StartAtomicOp(); err != nil { + return err + } + defer ms.CommitAtomicOp() + } + + headerBytes, err := encodeHeader(&ms.header) + if err != nil { + return fmt.Errorf("failed to encode header: %w", err) + } + + if err := ms.txn.Put(ms.metadataDbi, []byte("header"), headerBytes, 0); err != nil { + return fmt.Errorf("failed to save header: %w", err) + } + + return nil +} + +// Close closes the MDBX environment and releases resources +func (ms *MDBXStreamStore) Close() error { + ms.mutex.Lock() + defer ms.mutex.Unlock() + + // If there's an active transaction, abort it + if ms.inTransaction && ms.txn != nil { + ms.txn.Abort() + ms.txn = nil + ms.inTransaction = false + } + + // Close environment + if ms.env != nil { + ms.env.Close() + ms.env = nil + } + + return nil +} diff --git a/zk/debug_tools/datastream-host/main.go b/zk/debug_tools/datastream-host/main.go index 6550577e548..5176d9a7bba 100644 --- a/zk/debug_tools/datastream-host/main.go +++ b/zk/debug_tools/datastream-host/main.go @@ -8,7 +8,6 @@ import ( "time" "github.com/erigontech/erigon/zk/datastream/server" - "github.com/gateway-fm/zkevm-data-streamer/datastreamer" log2 "github.com/gateway-fm/zkevm-data-streamer/log" ) @@ -27,7 +26,7 @@ func main() { Outputs: []string{"stdout"}, } - stream, err := dataStreamServerFactory.CreateStreamServer(uint16(6900), 1, datastreamer.StreamType(1), file, 5*time.Second, 10*time.Second, 60*time.Second, logConfig) + stream, err := dataStreamServerFactory.CreateStreamServer(uint16(6900), 1, file, 5*time.Second, 10*time.Second, 60*time.Second, logConfig, server.StreamStoreTypeFile) if err != nil { fmt.Println("Error creating datastream server:", err) return diff --git a/zk/stages/stage_data_stream_catch_up.go 
b/zk/stages/stage_data_stream_catch_up.go index 83ea650c0ef..051ad086c3d 100644 --- a/zk/stages/stage_data_stream_catch_up.go +++ b/zk/stages/stage_data_stream_catch_up.go @@ -113,7 +113,7 @@ func CatchupDatastream(ctx context.Context, logPrefix string, tx kv.RwTx, srv se // a quick check that we haven't written anything to the stream yet. Stage progress is a little misleading // for genesis as we are in fact at block 0 here! Getting the header has some performance overhead, so // we only want to do this when we know the previous progress is 0. - header := srv.GetStreamServer().GetHeader() + header := srv.GetStreamStore().GetHeader() if header.TotalEntries == 0 { genesis, err := rawdb.ReadBlockByNumber(tx, 0) if err != nil { From d94665f109715279d0903f72a9784c38c0e30492 Mon Sep 17 00:00:00 2001 From: Carl Lambert Date: Mon, 28 Apr 2025 10:53:08 +0100 Subject: [PATCH 2/5] WIP: MDBX implemented, but still not stable; issues remain to be ironed out. Will not currently build due to unpublished dependency changes. 
--- cmd/utils/flags.go | 2 +- zk/datastream/server/data_stream_server.go | 13 +- zk/datastream/server/store_file.go | 4 +- zk/datastream/server/store_interfaces.go | 14 +- zk/datastream/server/store_mdbx.go | 155 +++-- zk/datastream/server/store_mdbx_rwdb.go | 741 +++++++++++++++++++++ 6 files changed, 865 insertions(+), 64 deletions(-) create mode 100644 zk/datastream/server/store_mdbx_rwdb.go diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index fca3d107799..34167d73c05 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -685,7 +685,7 @@ var ( } DataStreamStorageType = cli.StringFlag{ Name: "zkevm.data-stream-store", - Usage: "Define the storage type used for the zkevm data stream", + Usage: "Define the storage type used for the zkevm data stream (file, mdbx)", Value: "file", } DataStreamWriteTimeout = cli.DurationFlag{ diff --git a/zk/datastream/server/data_stream_server.go b/zk/datastream/server/data_stream_server.go index c5d1a1f2a99..dbef4e2002e 100644 --- a/zk/datastream/server/data_stream_server.go +++ b/zk/datastream/server/data_stream_server.go @@ -2,6 +2,7 @@ package server import ( "fmt" + "github.com/c2h5oh/datasize" "sync" "time" @@ -93,15 +94,13 @@ func (f *ZkEVMDataStreamServerFactory) CreateStreamServer(port uint16, systemID if storageType == StreamStoreTypeMDBX { // Create MDBX-based storage config := &StreamStoreConfig{ - SystemID: systemID, - StreamType: 1, // Always use sequencer type - FilePath: fileName, - StoreType: StreamStoreTypeMDBX, - DatastreamVersion: datastreamVersion, - MDBXMaxDBS: 3, + SystemID: systemID, + FilePath: fileName, + MDBXMaxDBS: 3, + MDBXMapSize: int64(3 * datasize.GB), } - mdbxStore, err := NewMDBXStreamStore(config) + mdbxStore, err := NewMDBXRwDBStreamStore(config) if err != nil { return nil, fmt.Errorf("failed to create MDBX store: %w", err) } diff --git a/zk/datastream/server/store_file.go b/zk/datastream/server/store_file.go index 8fadb2fe56b..384ddf3a011 100644 --- a/zk/datastream/server/store_file.go 
+++ b/zk/datastream/server/store_file.go @@ -20,9 +20,9 @@ func NewFileStreamStore(config *StreamStoreConfig) (*FileStreamStore, error) { server, err := datastreamer.NewServer( 0, // port is not required for store - config.DatastreamVersion, + 3, config.SystemID, - config.StreamType, + 1, config.FilePath, writeTimeout, inactivityTimeout, diff --git a/zk/datastream/server/store_interfaces.go b/zk/datastream/server/store_interfaces.go index 8afce4e03e0..2d8b42384d6 100644 --- a/zk/datastream/server/store_interfaces.go +++ b/zk/datastream/server/store_interfaces.go @@ -1,7 +1,7 @@ package server import ( - "github.com/gateway-fm/zkevm-data-streamer/datastreamer" + "github.com/erigontech/erigon-lib/log/v3" ) // StreamStoreType identifies the underlying storage implementation @@ -18,20 +18,14 @@ const ( // StreamStoreConfig contains configuration for stream stores type StreamStoreConfig struct { // Common config - SystemID uint64 - StreamType datastreamer.StreamType - FilePath string - - // Implementation selection - StoreType StreamStoreType + SystemID uint64 + FilePath string + Logger log.Logger // MDBX specific options MDBXMapSize int64 MDBXMaxDBS int MDBXFlags uint - - // File specific options - DatastreamVersion uint8 } // StreamStoreFactory creates stream stores based on configuration diff --git a/zk/datastream/server/store_mdbx.go b/zk/datastream/server/store_mdbx.go index 2f98893d799..614ef65a230 100644 --- a/zk/datastream/server/store_mdbx.go +++ b/zk/datastream/server/store_mdbx.go @@ -48,21 +48,56 @@ type MDBXStreamStore struct { mutex sync.RWMutex inTransaction bool txn *mdbx.Txn // Current transaction + streamChannel chan datastreamer.StreamAO + atomicOp datastreamer.StreamAO } -func (ms *MDBXStreamStore) SetStreamChannel(chan datastreamer.StreamAO) { - //TODO implement me - panic("implement me") +// SetStreamChannel sets the channel for atomic operation notifications +func (ms *MDBXStreamStore) SetStreamChannel(ch chan datastreamer.StreamAO) { + 
ms.mutex.Lock() + defer ms.mutex.Unlock() + ms.streamChannel = ch } func (ms *MDBXStreamStore) GetNextEntry() uint64 { - //TODO implement me - panic("implement me") + ms.mutex.RLock() + defer ms.mutex.RUnlock() + return ms.header.TotalEntries } func (ms *MDBXStreamStore) PrintDumpBookmarks() error { - //TODO implement me - panic("implement me") + ms.mutex.RLock() + defer ms.mutex.RUnlock() + + // Create read transaction + txn, err := ms.env.BeginTxn(nil, mdbx.Readonly) + if err != nil { + return err + } + defer txn.Abort() + + // Create cursor for bookmarks + cursor, err := txn.OpenCursor(ms.bookmarksDbi) + if err != nil { + return err + } + defer cursor.Close() + + // Walk through all bookmarks + key, val, err := cursor.Get(nil, nil, mdbx.First) + for ; err == nil; key, val, err = cursor.Get(nil, nil, mdbx.Next) { + if key == nil { + break + } + entryNum := binary.BigEndian.Uint64(val) + fmt.Printf("Bookmark: %X -> Entry %d\n", key, entryNum) + } + + if err != nil && err != mdbx.NotFound { + return err + } + + return nil } // NewMDBXStreamStore creates a new MDBX-based stream store @@ -103,7 +138,7 @@ func NewMDBXStreamStore(config *StreamStoreConfig) (*MDBXStreamStore, error) { store := &MDBXStreamStore{ env: env, header: datastreamer.HeaderEntry{ - Version: config.DatastreamVersion, + Version: 3, SystemID: config.SystemID, TotalEntries: 0, TotalLength: 0, @@ -250,25 +285,12 @@ func (ms *MDBXStreamStore) GetEntry(entryNum uint64) (datastreamer.FileEntry, er ms.mutex.RLock() defer ms.mutex.RUnlock() - // Create read transaction if not in a transaction - var txn *mdbx.Txn - var err error - var shouldAbort bool - - if ms.inTransaction { - txn = ms.txn - } else { - txn, err = ms.env.BeginTxn(nil, mdbx.Readonly) - if err != nil { - return datastreamer.FileEntry{}, err - } - shouldAbort = true - defer func() { - if shouldAbort { - txn.Abort() - } - }() + // Create read transaction, setting parent if one exists + txn, err := ms.env.BeginTxn(ms.txn, mdbx.Readonly) + if 
err != nil { + return datastreamer.FileEntry{}, err } + defer txn.Abort() // Ensure the transaction is aborted if not committed // Create key keyBytes := make([]byte, 8) @@ -289,12 +311,6 @@ func (ms *MDBXStreamStore) GetEntry(entryNum uint64) (datastreamer.FileEntry, er return datastreamer.FileEntry{}, err } - // If we created a transaction, we need to abort it now - shouldAbort = false - if !ms.inTransaction { - txn.Abort() - } - return entry, nil } @@ -403,6 +419,18 @@ func (ms *MDBXStreamStore) CommitAtomicOp() error { ms.txn = nil ms.inTransaction = false + if ms.streamChannel != nil { + // Do broadcast of the committed atomic operation to the stream clients + atomic := datastreamer.StreamAO{ + Status: ms.atomicOp.Status, + StartEntry: ms.atomicOp.StartEntry, + } + atomic.Entries = make([]datastreamer.FileEntry, len(ms.atomicOp.Entries)) + copy(atomic.Entries, ms.atomicOp.Entries) + + ms.streamChannel <- atomic + } + return nil } @@ -438,15 +466,57 @@ func (ms *MDBXStreamStore) TruncateFile(entryNum uint64) error { return err } - // TODO: Implement truncation logic - // This will require: - // 1. Getting the current header - // 2. Deleting all entries above entryNum using cursor to walk the database - // 3. 
Updating the header + // Get the current header + currentTotal := ms.header.TotalEntries + + if entryNum >= currentTotal { + // Nothing to truncate + ms.RollbackAtomicOp() + return nil + } + + // Create cursor to iterate through entries to delete + cursor, err := ms.txn.OpenCursor(ms.dbi) + if err != nil { + ms.RollbackAtomicOp() + return err + } + defer cursor.Close() + + // Calculate total length adjustment + var lengthToSubtract uint64 = 0 + + // Delete entries from entryNum to the end + for i := entryNum; i < currentTotal; i++ { + keyBytes := make([]byte, 8) + binary.BigEndian.PutUint64(keyBytes, i) + + // Get the entry to calculate length adjustment + entryBytes, err := ms.txn.Get(ms.dbi, keyBytes) + if err == nil { + entry, err := decodeFileEntry(entryBytes) + if err == nil { + lengthToSubtract += uint64(entry.Length) + + // If it's a bookmark, also remove from bookmarks table + if entry.Type == 176 { // Bookmark type + ms.txn.Del(ms.bookmarksDbi, entry.Data, nil) + } + } + } + + // Delete the entry + if err := ms.txn.Del(ms.dbi, keyBytes, nil); err != nil { + ms.RollbackAtomicOp() + return err + } + } + + // Update header + ms.header.TotalEntries = entryNum + ms.header.TotalLength -= lengthToSubtract - // For now, just return error - ms.RollbackAtomicOp() - return errors.New("truncate not implemented yet") + return ms.CommitAtomicOp() } // IteratorFrom creates an iterator starting from the specified entry @@ -625,9 +695,6 @@ func (it *MDBXStreamStoreIterator) End() { // GetEntry returns the current entry func (it *MDBXStreamStoreIterator) GetEntry() datastreamer.FileEntry { - if !it.hasCurrentEntry { - panic("No current entry exists, call iteratorNext first") - } return it.currentEntry } @@ -803,7 +870,7 @@ func (ms *MDBXStreamStore) GetIterator(entryNum uint64, readOnly bool) (datastre return nil, err } - // Return the iterator as an interface that can be type-asserted by the caller + // Return the iterator as the required interface type return iterator, 
nil } diff --git a/zk/datastream/server/store_mdbx_rwdb.go b/zk/datastream/server/store_mdbx_rwdb.go new file mode 100644 index 00000000000..12aca7d87bc --- /dev/null +++ b/zk/datastream/server/store_mdbx_rwdb.go @@ -0,0 +1,741 @@ +package server + +import ( + "context" + "encoding/binary" + "errors" + "fmt" + "sync" + + "github.com/erigontech/erigon-lib/kv" + "github.com/erigontech/erigon-lib/kv/mdbx" + "github.com/erigontech/erigon-lib/log/v3" + "github.com/erigontech/erigon/zk/datastream/types" + "github.com/gateway-fm/zkevm-data-streamer/datastreamer" +) + +// MDBXRwDBStreamStore implements StreamStore using kv.RwDB interface +type MDBXRwDBStreamStore struct { + db kv.RwDB + header datastreamer.HeaderEntry + mutex sync.RWMutex + inTransaction bool + currentTx kv.RwTx + streamChannel chan datastreamer.StreamAO + atomicOp datastreamer.StreamAO + ctx context.Context + logger log.Logger +} + +const ( + // Atomic operation Status + aoNone datastreamer.AOStatus = iota + 1 + aoStarted + aoCommitting + aoRollbacking +) + +// NewMDBXRwDBStreamStore creates a new kv.RwDB-based stream store +func NewMDBXRwDBStreamStore(config *StreamStoreConfig) (*MDBXRwDBStreamStore, error) { + ctx := context.Background() + + // Use the logger from the config + logger := config.Logger + if logger == nil { + logger = log.New() // Use default logger if none provided + } + + // Configure database + opts := mdbx.NewMDBX(logger). 
+ Path(config.FilePath + ".mdbx") + + // Open database + db, err := opts.Open(ctx) + if err != nil { + return nil, fmt.Errorf("failed to open MDBX database: %w", err) + } + + // Initialize store + store := &MDBXRwDBStreamStore{ + db: db, + header: datastreamer.NewHeader(3, config.SystemID, StreamTypeValue), + ctx: ctx, + logger: logger, + } + + // Create tables if they don't exist + err = db.Update(ctx, func(tx kv.RwTx) error { + // Create necessary buckets + for _, table := range []string{TableEntries, TableBookmarks, TableMetadata} { + if err := tx.CreateBucket(table); err != nil { + return fmt.Errorf("failed to create bucket %s: %w", table, err) + } + } + + // Try to load existing header + headerVal, err := tx.GetOne(TableMetadata, []byte("header")) + if err == nil && len(headerVal) > 0 { + existingHeader, err := decodeHeader(headerVal) + if err == nil { + store.header = *existingHeader + } + // If there's an error decoding, we'll keep the default header + } + + return nil + }) + + if err != nil { + db.Close() + return nil, fmt.Errorf("failed to initialize database: %w", err) + } + + return store, nil +} + +// SetStreamChannel sets the channel for atomic operation notifications +func (ms *MDBXRwDBStreamStore) SetStreamChannel(ch chan datastreamer.StreamAO) { + ms.mutex.Lock() + defer ms.mutex.Unlock() + ms.streamChannel = ch +} + +func (ms *MDBXRwDBStreamStore) GetNextEntry() uint64 { + return ms.header.TotalEntries +} + +func (ms *MDBXRwDBStreamStore) PrintDumpBookmarks() error { + ms.mutex.RLock() + defer ms.mutex.RUnlock() + + return ms.db.View(ms.ctx, func(tx kv.Tx) error { + cursor, err := tx.Cursor(TableBookmarks) + if err != nil { + return err + } + defer cursor.Close() + + // Walk through all bookmarks + for k, v, err := cursor.First(); k != nil; k, v, err = cursor.Next() { + if err != nil { + return err + } + entryNum := binary.BigEndian.Uint64(v) + fmt.Printf("Bookmark: %X -> Entry %d\n", k, entryNum) + } + + return nil + }) +} + +// addToStream 
handles common entry storage logic +func (ms *MDBXRwDBStreamStore) addToStream(entryType datastreamer.EntryType, data []byte) (uint64, error) { + // Create entry + entryNum := ms.header.TotalEntries + entry := datastreamer.NewFileEntry(datastreamer.PtData, entryType, entryNum, data) + + // Encode and store entry + keyBytes := make([]byte, 8) + binary.BigEndian.PutUint64(keyBytes, entryNum) + + entryBytes, err := encodeFileEntry(entry) + if err != nil { + return 0, err + } + + if err := ms.currentTx.Put(TableEntries, keyBytes, entryBytes); err != nil { + return 0, err + } + + // Save the entry in the server's atomic operation tracking + ms.atomicOp.Entries = append(ms.atomicOp.Entries, entry) + + // Update header (will be saved on commit) + ms.header.TotalEntries++ + ms.header.TotalLength += uint64(entry.Length) + + return entryNum, nil +} + +// AddStreamEntry adds a new entry to the stream +func (ms *MDBXRwDBStreamStore) AddStreamEntry(entryType datastreamer.EntryType, data []byte) (uint64, error) { + ms.mutex.Lock() + defer ms.mutex.Unlock() + + if !ms.inTransaction { + return 0, errors.New("must be in transaction to add entries") + } + + return ms.addToStream(entryType, data) +} + +// AddStreamBookmark adds a new bookmark to the stream +func (ms *MDBXRwDBStreamStore) AddStreamBookmark(data []byte) (uint64, error) { + ms.mutex.Lock() + defer ms.mutex.Unlock() + + if !ms.inTransaction { + return 0, errors.New("must be in transaction to add bookmarks") + } + + entryNum, err := ms.addToStream(176, data) // 176 is bookmark type + if err != nil { + return 0, err + } + + // Also store in bookmark table for quick lookup + entryNumBytes := make([]byte, 8) + binary.BigEndian.PutUint64(entryNumBytes, entryNum) + if err := ms.currentTx.Put(TableBookmarks, data, entryNumBytes); err != nil { + return 0, err + } + + return entryNum, nil +} + +// GetEntry retrieves an entry from the stream +func (ms *MDBXRwDBStreamStore) GetEntry(entryNum uint64) (datastreamer.FileEntry, error) 
{ + ms.mutex.RLock() + defer ms.mutex.RUnlock() + + var entry datastreamer.FileEntry + + // Function to get the entry using a transaction + getEntryFn := func(tx kv.Tx) error { + // Create key + keyBytes := make([]byte, 8) + binary.BigEndian.PutUint64(keyBytes, entryNum) + + // Get from db + entryBytes, err := tx.GetOne(TableEntries, keyBytes) + if err != nil { + return err + } + if entryBytes == nil { + return fmt.Errorf("entry not found: %d", entryNum) + } + + // Decode entry + decodedEntry, err := decodeFileEntry(entryBytes) + if err != nil { + return err + } + + entry = decodedEntry + return nil + } + + // If we're in a transaction, use the current transaction + if ms.inTransaction { + err := getEntryFn(ms.currentTx) + return entry, err + } + + // Otherwise, start a new read transaction + err := ms.db.View(ms.ctx, func(tx kv.Tx) error { + return getEntryFn(tx) + }) + + return entry, err +} + +// GetBookmark retrieves a bookmark from the stream +func (ms *MDBXRwDBStreamStore) GetBookmark(data []byte) (uint64, error) { + ms.mutex.RLock() + defer ms.mutex.RUnlock() + + var entryNum uint64 + + // Function to get the bookmark using a transaction + getBookmarkFn := func(tx kv.Tx) error { + // Get from db + entryNumBytes, err := tx.GetOne(TableBookmarks, data) + if err != nil { + return err + } + if entryNumBytes == nil { + return fmt.Errorf("bookmark not found") + } + + entryNum = binary.BigEndian.Uint64(entryNumBytes) + return nil + } + + // If we're in a transaction, use the current transaction + if ms.inTransaction { + err := getBookmarkFn(ms.currentTx) + return entryNum, err + } + + // Otherwise, start a new read transaction + err := ms.db.View(ms.ctx, func(tx kv.Tx) error { + return getBookmarkFn(tx) + }) + + return entryNum, err +} + +// StartAtomicOp starts a transaction +func (ms *MDBXRwDBStreamStore) StartAtomicOp() error { + ms.mutex.Lock() + defer ms.mutex.Unlock() + + if ms.inTransaction { + return fmt.Errorf("already in transaction") + } + + // Begin a 
new write transaction + tx, err := ms.db.BeginRw(ms.ctx) + if err != nil { + return err + } + + ms.currentTx = tx + ms.inTransaction = true + + // Reset atomic operation + ms.atomicOp = datastreamer.StreamAO{ + Status: aoStarted, + StartEntry: ms.GetNextEntry(), + Entries: []datastreamer.FileEntry{}, + } + + return nil +} + +// CommitAtomicOp commits the current transaction +func (ms *MDBXRwDBStreamStore) CommitAtomicOp() error { + ms.mutex.Lock() + defer ms.mutex.Unlock() + + if !ms.inTransaction { + return fmt.Errorf("not in transaction") + } + + // Save the header + if err := ms.WriteHeaderEntry(); err != nil { + ms.currentTx.Rollback() + ms.inTransaction = false + ms.currentTx = nil + return err + } + + // Commit the transaction + if err := ms.currentTx.Commit(); err != nil { + ms.currentTx.Rollback() + ms.inTransaction = false + ms.currentTx = nil + return err + } + + if ms.streamChannel != nil { + // Do broadcast of the committed atomic operation to the stream clients + atomic := datastreamer.StreamAO{ + Status: ms.atomicOp.Status, + StartEntry: ms.atomicOp.StartEntry, + } + atomic.Entries = make([]datastreamer.FileEntry, len(ms.atomicOp.Entries)) + copy(atomic.Entries, ms.atomicOp.Entries) + + ms.streamChannel <- atomic + } + ms.atomicOp.Entries = ms.atomicOp.Entries[:0] + ms.atomicOp.Status = aoNone + ms.inTransaction = false + ms.currentTx = nil + return nil +} + +// RollbackAtomicOp aborts the current transaction +func (ms *MDBXRwDBStreamStore) RollbackAtomicOp() error { + ms.mutex.Lock() + defer ms.mutex.Unlock() + + if !ms.inTransaction { + return fmt.Errorf("not in transaction") + } + + ms.currentTx.Rollback() + ms.inTransaction = false + ms.currentTx = nil + return nil +} + +// GetHeader returns the current header +func (ms *MDBXRwDBStreamStore) GetHeader() datastreamer.HeaderEntry { + return ms.header +} + +// TruncateFile truncates the stream to the specified entry number +func (ms *MDBXRwDBStreamStore) TruncateFile(entryNum uint64) error { + 
ms.mutex.Lock() + defer ms.mutex.Unlock() + + if ms.inTransaction { + return fmt.Errorf("cannot truncate while in transaction") + } + + // We need to get the old header first to calculate total length reduction + oldHeader := ms.header + newTotalLength := uint64(0) + + // Start a transaction + err := ms.db.Update(ms.ctx, func(tx kv.RwTx) error { + // Read all entries up to entryNum to calculate new total length + for i := uint64(0); i < entryNum; i++ { + keyBytes := make([]byte, 8) + binary.BigEndian.PutUint64(keyBytes, i) + + entryBytes, err := tx.GetOne(TableEntries, keyBytes) + if err != nil { + continue // Skip missing entries + } + + entry, err := decodeFileEntry(entryBytes) + if err != nil { + continue // Skip invalid entries + } + + newTotalLength += uint64(entry.Length) + } + + // Delete entries from entryNum onwards + cursor, err := tx.RwCursor(TableEntries) + if err != nil { + return err + } + defer cursor.Close() + + for i := entryNum; i < oldHeader.TotalEntries; i++ { + keyBytes := make([]byte, 8) + binary.BigEndian.PutUint64(keyBytes, i) + if err := cursor.Delete(keyBytes); err != nil { + // Skip errors for missing entries + continue + } + } + + // Delete bookmarks that point to deleted entries + bookmarkCursor, err := tx.RwCursor(TableBookmarks) + if err != nil { + return err + } + defer bookmarkCursor.Close() + + for k, v, err := bookmarkCursor.First(); k != nil; k, v, err = bookmarkCursor.Next() { + if err != nil { + continue // Skip errors + } + + bookmarkEntryNum := binary.BigEndian.Uint64(v) + if bookmarkEntryNum >= entryNum { + if err := bookmarkCursor.Delete(k); err != nil { + // Skip errors + continue + } + } + } + + // Update header + ms.header.TotalEntries = entryNum + ms.header.TotalLength = newTotalLength + + // Save updated header + headerBytes, err := encodeHeader(&ms.header) + if err != nil { + return err + } + + return tx.Put(TableMetadata, []byte("header"), headerBytes) + }) + + return err +} + +// IteratorFrom returns an iterator 
starting from the specified entry +func (ms *MDBXRwDBStreamStore) IteratorFrom(entryNum uint64, includeBookmarks bool) (*MDBXRwDBStreamStoreIterator, error) { + return newMDBXRwDBStreamStoreIterator(ms, entryNum, includeBookmarks), nil +} + +// GetFirstEventAfterBookmark gets the first event after a bookmark +func (ms *MDBXRwDBStreamStore) GetFirstEventAfterBookmark(bookmark []byte) (datastreamer.FileEntry, error) { + // Get entry number from bookmark + entryNum, err := ms.GetBookmark(bookmark) + if err != nil { + return datastreamer.FileEntry{}, err + } + + // Create iterator from that entry + iterator, err := ms.IteratorFrom(entryNum, false) // Skip bookmarks + if err != nil { + return datastreamer.FileEntry{}, err + } + defer iterator.Close() + + // Get first event entry after bookmark + hasNext, err := iterator.Next() + if err != nil { + return datastreamer.FileEntry{}, err + } + + if !hasNext { + return datastreamer.FileEntry{}, fmt.Errorf("no events after bookmark") + } + + return iterator.GetEntry(), nil +} + +// GetDataBetweenBookmarks gets all data between two bookmarks +func (ms *MDBXRwDBStreamStore) GetDataBetweenBookmarks(bookmarkFrom, bookmarkTo []byte) ([]byte, error) { + // Get entry numbers from bookmarks + fromEntryNum, err := ms.GetBookmark(bookmarkFrom) + if err != nil { + return nil, err + } + + toEntryNum, err := ms.GetBookmark(bookmarkTo) + if err != nil { + return nil, err + } + + if fromEntryNum >= toEntryNum { + return nil, fmt.Errorf("invalid bookmark range") + } + + // Collect all data in the range + var data []byte + for i := fromEntryNum + 1; i < toEntryNum; i++ { + entry, err := ms.GetEntry(i) + if err != nil { + continue // Skip errors + } + + // Skip bookmarks + if entry.Type == 176 { // Bookmark type + continue + } + + data = append(data, entry.Data...) 
+ } + + return data, nil +} + +// UpdateEntryData updates the data for an existing entry +func (ms *MDBXRwDBStreamStore) UpdateEntryData(entryNum uint64, entryType datastreamer.EntryType, data []byte) error { + if err := ms.StartAtomicOp(); err != nil { + return err + } + + // Get existing entry + entry, err := ms.GetEntry(entryNum) + if err != nil { + ms.RollbackAtomicOp() + return err + } + + // Update entry + entry.Type = entryType + entry.Data = data + entry.Length = uint32(len(data) + 17) // 17 is the header size + + // Store updated entry + keyBytes := make([]byte, 8) + binary.BigEndian.PutUint64(keyBytes, entryNum) + + entryBytes, err := encodeFileEntry(entry) + if err != nil { + ms.RollbackAtomicOp() + return err + } + + if err := ms.currentTx.Put(TableEntries, keyBytes, entryBytes); err != nil { + ms.RollbackAtomicOp() + return err + } + + return ms.CommitAtomicOp() +} + +// GetIterator returns a file iterator for the stream store +func (ms *MDBXRwDBStreamStore) GetIterator(entryNum uint64, readOnly bool) (datastreamer.StorageIterator, error) { + // Create a real iterator using our existing implementation + iterator, err := ms.IteratorFrom(entryNum, true) // Include bookmarks for compatibility + if err != nil { + return nil, err + } + + // Return the iterator as the required interface type + return iterator, nil +} + +// AddFileEntry adds a file entry directly to the stream +func (ms *MDBXRwDBStreamStore) AddFileEntry(e datastreamer.FileEntry) error { + ms.mutex.Lock() + defer ms.mutex.Unlock() + + if !ms.inTransaction { + return errors.New("must be in transaction to add entries") + } + + // Encode and store entry + keyBytes := make([]byte, 8) + binary.BigEndian.PutUint64(keyBytes, e.Number) + + entryBytes, err := encodeFileEntry(e) + if err != nil { + return err + } + + if err := ms.currentTx.Put(TableEntries, keyBytes, entryBytes); err != nil { + return err + } + + // Only update the header if this is a new entry + if e.Number >= ms.header.TotalEntries { 
+ ms.header.TotalEntries = e.Number + 1 + ms.header.TotalLength += uint64(e.Length) + } + + return nil +} + +// WriteHeaderEntry writes the current header to storage +func (ms *MDBXRwDBStreamStore) WriteHeaderEntry() error { + if !ms.inTransaction { + return errors.New("must be in transaction to write header") + } + + headerBytes, err := encodeHeader(&ms.header) + if err != nil { + return err + } + + return ms.currentTx.Put(TableMetadata, []byte("header"), headerBytes) +} + +// BookmarkPrintDump prints debug information about bookmarks +func (ms *MDBXRwDBStreamStore) BookmarkPrintDump() { + // This is a no-op implementation + // Only needed to satisfy the StreamStore interface +} + +// Close closes the stream store +func (ms *MDBXRwDBStreamStore) Close() error { + ms.mutex.Lock() + defer ms.mutex.Unlock() + + if ms.inTransaction { + ms.currentTx.Rollback() + ms.inTransaction = false + ms.currentTx = nil + } + + // Close the database + ms.db.Close() + return nil +} + +// MDBXRwDBStreamStoreIterator implements the datastreamer.StorageIterator interface +type MDBXRwDBStreamStoreIterator struct { + store *MDBXRwDBStreamStore + currentEntryNum uint64 + maxEntryNum uint64 + includeBookmarks bool + tx kv.Tx + currentEntry datastreamer.FileEntry + hasCurrentEntry bool + err error +} + +// Next advances the iterator to the next item +func (it *MDBXRwDBStreamStoreIterator) Next() (bool, error) { + if it.currentEntryNum > it.maxEntryNum { + it.hasCurrentEntry = false + return false, nil + } + + for { + // Get entry at current position + entry, err := it.store.GetEntry(it.currentEntryNum) + it.currentEntryNum++ + + if err != nil { + // Skip missing entries + if it.currentEntryNum > it.maxEntryNum { + it.hasCurrentEntry = false + return false, nil + } + continue + } + + // Skip bookmarks if not including them + if !it.includeBookmarks && entry.Type == 176 { // 176 is bookmark type + if it.currentEntryNum > it.maxEntryNum { + it.hasCurrentEntry = false + return false, nil + } + 
continue + } + + it.currentEntry = entry + it.hasCurrentEntry = true + return true, nil + } +} + +// End cleans up iterator resources +func (it *MDBXRwDBStreamStoreIterator) End() { + // Nothing to clean up for this implementation + it.hasCurrentEntry = false +} + +// GetEntry returns the current entry +func (it *MDBXRwDBStreamStoreIterator) GetEntry() datastreamer.FileEntry { + return it.currentEntry +} + +// newMDBXRwDBStreamStoreIterator creates a new iterator for the kv.RwDB-based store +func newMDBXRwDBStreamStoreIterator(store *MDBXRwDBStreamStore, startEntryNum uint64, includeBookmarks bool) *MDBXRwDBStreamStoreIterator { + // Get max entry num + maxEntryNum := store.GetHeader().TotalEntries - 1 + + return &MDBXRwDBStreamStoreIterator{ + store: store, + currentEntryNum: startEntryNum, + maxEntryNum: maxEntryNum, + includeBookmarks: includeBookmarks, + } +} + +// GetEntryNumberLimit returns the maximum entry number in the store +func (it *MDBXRwDBStreamStoreIterator) GetEntryNumberLimit() uint64 { + return it.maxEntryNum + 1 +} + +// NextFileEntry returns the next file entry from the iterator +func (it *MDBXRwDBStreamStoreIterator) NextFileEntry() (*types.FileEntry, error) { + hasNext, err := it.Next() + if err != nil { + return nil, err + } + + if !hasNext { + return nil, nil + } + + // Convert from datastreamer.FileEntry to types.FileEntry for compatibility + dsEntry := it.GetEntry() + return &types.FileEntry{ + PacketType: uint8(dsEntry.Type), + Length: dsEntry.Length, + EntryType: types.EntryType(dsEntry.Type), + EntryNum: dsEntry.Number, + Data: dsEntry.Data, + }, nil +} + +// Close closes the iterator and frees associated resources +func (it *MDBXRwDBStreamStoreIterator) Close() { + it.End() +} From c9c6cccddcb24da6ca5da6e6718bed64fa13e0b8 Mon Sep 17 00:00:00 2001 From: Carl Lambert Date: Wed, 7 May 2025 14:19:20 +0100 Subject: [PATCH 3/5] WIP: rename MDBXRwDBStreamStore to MDBXStreamStore fix a number of transactional related issues with MDBX fix 
decoding bug missing packetType WIP: rename MDBXRwDBStreamStore to MDBXStreamStore fix a number of transactional related issues with MDBX fix decoding bug missing packetType --- zk/datastream/server/data_stream_server.go | 2 +- zk/datastream/server/store_file.go | 123 --- zk/datastream/server/store_mdbx.go | 948 --------------------- zk/datastream/server/store_mdbx_rwdb.go | 445 +++++++--- 4 files changed, 329 insertions(+), 1189 deletions(-) delete mode 100644 zk/datastream/server/store_file.go delete mode 100644 zk/datastream/server/store_mdbx.go diff --git a/zk/datastream/server/data_stream_server.go b/zk/datastream/server/data_stream_server.go index dbef4e2002e..09981a1d95b 100644 --- a/zk/datastream/server/data_stream_server.go +++ b/zk/datastream/server/data_stream_server.go @@ -100,7 +100,7 @@ func (f *ZkEVMDataStreamServerFactory) CreateStreamServer(port uint16, systemID MDBXMapSize: int64(3 * datasize.GB), } - mdbxStore, err := NewMDBXRwDBStreamStore(config) + mdbxStore, err := NewMDBXStreamStore(config) if err != nil { return nil, fmt.Errorf("failed to create MDBX store: %w", err) } diff --git a/zk/datastream/server/store_file.go b/zk/datastream/server/store_file.go deleted file mode 100644 index 384ddf3a011..00000000000 --- a/zk/datastream/server/store_file.go +++ /dev/null @@ -1,123 +0,0 @@ -package server - -import ( - "time" - - "github.com/gateway-fm/zkevm-data-streamer/datastreamer" -) - -// FileStreamStore wraps the legacy datastreamer.Server to implement the StreamStore interface -type FileStreamStore struct { - server *datastreamer.StreamServer -} - -// NewFileStreamStore creates a new file-based stream store -func NewFileStreamStore(config *StreamStoreConfig) (*FileStreamStore, error) { - // Default values for backward compatibility - inactivityCheckInterval := time.Second * 10 - writeTimeout := time.Second * 3 - inactivityTimeout := time.Second * 120 - - server, err := datastreamer.NewServer( - 0, // port is not required for store - 3, - 
config.SystemID, - 1, - config.FilePath, - writeTimeout, - inactivityTimeout, - inactivityCheckInterval, - nil, // logging config - nil, // store - ) - - if err != nil { - return nil, err - } - - return &FileStreamStore{ - server: server, - }, nil -} - -// AddStreamEntry adds a new entry to the stream -func (fs *FileStreamStore) AddStreamEntry(entryType datastreamer.EntryType, data []byte) (uint64, error) { - return fs.server.AddStreamEntry(entryType, data) -} - -// AddStreamBookmark adds a new bookmark to the stream -func (fs *FileStreamStore) AddStreamBookmark(data []byte) (uint64, error) { - return fs.server.AddStreamBookmark(data) -} - -// GetEntry retrieves an entry from the stream -func (fs *FileStreamStore) GetEntry(entryNum uint64) (datastreamer.FileEntry, error) { - return fs.server.GetEntry(entryNum) -} - -// GetBookmark retrieves a bookmark from the stream -func (fs *FileStreamStore) GetBookmark(data []byte) (uint64, error) { - return fs.server.GetBookmark(data) -} - -// StartAtomicOp starts an atomic operation -func (fs *FileStreamStore) StartAtomicOp() error { - return nil -} - -// CommitAtomicOp commits an atomic operation -func (fs *FileStreamStore) CommitAtomicOp() error { - return nil -} - -// RollbackAtomicOp rolls back an atomic operation -func (fs *FileStreamStore) RollbackAtomicOp() error { - return fs.server.RollbackAtomicOp() -} - -// GetHeader retrieves the header from the stream -func (fs *FileStreamStore) GetHeader() datastreamer.HeaderEntry { - return fs.server.GetHeader() -} - -// TruncateToEntry truncates the stream to the specified entry -func (fs *FileStreamStore) TruncateToEntry(entryNum uint64) error { - return fs.server.TruncateFile(entryNum) -} - -// UpdateEntryData updates the data for an entry -func (fs *FileStreamStore) UpdateEntryData(entryNum uint64, entryType datastreamer.EntryType, data []byte) error { - return fs.server.UpdateEntryData(entryNum, entryType, data) -} - -// GetFirstEventAfterBookmark gets the first event 
after a bookmark -func (fs *FileStreamStore) GetFirstEventAfterBookmark(bookmark []byte) (datastreamer.FileEntry, error) { - return fs.server.GetFirstEventAfterBookmark(bookmark) -} - -// GetDataBetweenBookmarks gets data between two bookmarks -func (fs *FileStreamStore) GetDataBetweenBookmarks(bookmarkFrom, bookmarkTo []byte) ([]byte, error) { - return fs.server.GetDataBetweenBookmarks(bookmarkFrom, bookmarkTo) -} - -// Start starts the stream store -func (fs *FileStreamStore) Start() error { - return fs.server.Start() -} - -// Stop stops the stream store -func (fs *FileStreamStore) Stop() error { - // The original datastreamer.Server doesn't have a Stop method - // This is a no-op for compatibility - return nil -} - -// BookmarkPrintDump prints debug information about bookmarks -func (fs *FileStreamStore) BookmarkPrintDump() { - fs.server.BookmarkPrintDump() -} - -// TruncateFile truncates the stream to the specified entry -func (fs *FileStreamStore) TruncateFile(entryNum uint64) error { - return fs.server.TruncateFile(entryNum) -} diff --git a/zk/datastream/server/store_mdbx.go b/zk/datastream/server/store_mdbx.go deleted file mode 100644 index 614ef65a230..00000000000 --- a/zk/datastream/server/store_mdbx.go +++ /dev/null @@ -1,948 +0,0 @@ -package server - -import ( - "encoding/binary" - "errors" - "fmt" - "os" - "path/filepath" - "sync" - - "github.com/erigontech/erigon/zk/datastream/types" - "github.com/erigontech/mdbx-go/mdbx" - "github.com/gateway-fm/zkevm-data-streamer/datastreamer" -) - -// These constants define the MDBX tables we'll use -const ( - // Main tables - TableEntries = "entries" // Store all entries sequentially - TableBookmarks = "bookmarks" // Store bookmarks for fast seeking - TableMetadata = "metadata" // Store header information - - // Index tables - TableBlockIndex = "block_index" // Index blocks by number - TableBatchIndex = "batch_index" // Index batches by number -) - -// MDBX flags and options -const ( - - // Flags for Env.Open - 
NoTLS = 0x200000 // Don't use thread-local storage - - // Flags for Txn.OpenDBI - Create = 0x40000 // Create DB if not already existing - - // Fixed StreamType value - StreamTypeValue = 1 // Always use StreamType 1 (sequencer) in this implementation -) - -// MDBXStreamStore implements StreamStore using MDBX -type MDBXStreamStore struct { - env *mdbx.Env - dbi mdbx.DBI - bookmarksDbi mdbx.DBI - metadataDbi mdbx.DBI - header datastreamer.HeaderEntry - mutex sync.RWMutex - inTransaction bool - txn *mdbx.Txn // Current transaction - streamChannel chan datastreamer.StreamAO - atomicOp datastreamer.StreamAO -} - -// SetStreamChannel sets the channel for atomic operation notifications -func (ms *MDBXStreamStore) SetStreamChannel(ch chan datastreamer.StreamAO) { - ms.mutex.Lock() - defer ms.mutex.Unlock() - ms.streamChannel = ch -} - -func (ms *MDBXStreamStore) GetNextEntry() uint64 { - ms.mutex.RLock() - defer ms.mutex.RUnlock() - return ms.header.TotalEntries -} - -func (ms *MDBXStreamStore) PrintDumpBookmarks() error { - ms.mutex.RLock() - defer ms.mutex.RUnlock() - - // Create read transaction - txn, err := ms.env.BeginTxn(nil, mdbx.Readonly) - if err != nil { - return err - } - defer txn.Abort() - - // Create cursor for bookmarks - cursor, err := txn.OpenCursor(ms.bookmarksDbi) - if err != nil { - return err - } - defer cursor.Close() - - // Walk through all bookmarks - key, val, err := cursor.Get(nil, nil, mdbx.First) - for ; err == nil; key, val, err = cursor.Get(nil, nil, mdbx.Next) { - if key == nil { - break - } - entryNum := binary.BigEndian.Uint64(val) - fmt.Printf("Bookmark: %X -> Entry %d\n", key, entryNum) - } - - if err != nil && err != mdbx.NotFound { - return err - } - - return nil -} - -// NewMDBXStreamStore creates a new MDBX-based stream store -func NewMDBXStreamStore(config *StreamStoreConfig) (*MDBXStreamStore, error) { - - // Create environment - env, err := mdbx.NewEnv() - if err != nil { - return nil, err - } - - // Configure MDBX - if err := 
env.SetOption(mdbx.OptMaxDB, uint64(config.MDBXMaxDBS)); err != nil { - env.Close() - return nil, fmt.Errorf("failed to set maxDBs: %w", err) - } - - const pageSize = 4096 - err = env.SetGeometry(-1, -1, 64*1024*pageSize, -1, -1, pageSize) - if err != nil { - env.Close() - return nil, fmt.Errorf("failed to set geometry: %w", err) - } - - file := config.FilePath + ".mdbx" - // create directory if it doesn't exist - if err := os.MkdirAll(filepath.Dir(file), 0755); err != nil { - env.Close() - return nil, fmt.Errorf("failed to data-stream directory: %w", err) - } - - if err := env.Open(file, mdbx.Create, 0644); err != nil { - env.Close() - return nil, err - } - - // Initialize store - store := &MDBXStreamStore{ - env: env, - header: datastreamer.HeaderEntry{ - Version: 3, - SystemID: config.SystemID, - TotalEntries: 0, - TotalLength: 0, - }, - } - - // Open DBIs in a transaction - txn, err := env.BeginTxn(nil, 0) - if err != nil { - env.Close() - return nil, err - } - - // Create DBIs - store.dbi, err = txn.OpenDBISimple(TableEntries, Create) - if err != nil { - txn.Abort() - env.Close() - return nil, err - } - - store.bookmarksDbi, err = txn.OpenDBISimple(TableBookmarks, Create) - if err != nil { - txn.Abort() - env.Close() - return nil, err - } - - store.metadataDbi, err = txn.OpenDBISimple(TableMetadata, Create) - if err != nil { - txn.Abort() - env.Close() - return nil, err - } - - // Try to load existing header - headerVal, err := txn.Get(store.metadataDbi, []byte("header")) - if err == nil && len(headerVal) > 0 { - existingHeader, err := decodeHeader(headerVal) - if err == nil { - store.header = *existingHeader - } - // If there's an error decoding, we'll keep the default header - } - - // Commit transaction - commit, err := txn.Commit() - if err != nil { - env.Close() - return nil, err - } - - // Ignore commit latency value: TODO add this to metrics - _ = commit - - return store, nil -} - -// AddStreamEntry adds a new entry to the stream -func (ms 
*MDBXStreamStore) AddStreamEntry(entryType datastreamer.EntryType, data []byte) (uint64, error) { - ms.mutex.Lock() - defer ms.mutex.Unlock() - - if !ms.inTransaction { - return 0, errors.New("must be in transaction to add entries") - } - - // Create entry - entryNum := ms.header.TotalEntries - entry := datastreamer.FileEntry{ - Type: entryType, - Length: uint32(len(data) + 17), // 17 is the header size - Number: entryNum, - Data: data, - } - - // Encode and store entry - keyBytes := make([]byte, 8) - binary.BigEndian.PutUint64(keyBytes, entryNum) - - entryBytes, err := encodeFileEntry(entry) - if err != nil { - return 0, err - } - - if err := ms.txn.Put(ms.dbi, keyBytes, entryBytes, 0); err != nil { - return 0, err - } - - // Update header (will be saved on commit) - ms.header.TotalEntries++ - ms.header.TotalLength += uint64(entry.Length) - - return entryNum, nil -} - -// AddStreamBookmark adds a new bookmark to the stream -func (ms *MDBXStreamStore) AddStreamBookmark(data []byte) (uint64, error) { - ms.mutex.Lock() - defer ms.mutex.Unlock() - - if !ms.inTransaction { - return 0, errors.New("must be in transaction to add bookmarks") - } - - // Create bookmark entry - entryNum := ms.header.TotalEntries - entry := datastreamer.FileEntry{ - Type: 176, // Bookmark type - Length: uint32(len(data) + 17), // 17 is the header size - Number: entryNum, - Data: data, - } - - // Store entry in main table - entryKeyBytes := make([]byte, 8) - binary.BigEndian.PutUint64(entryKeyBytes, entryNum) - - entryBytes, err := encodeFileEntry(entry) - if err != nil { - return 0, err - } - - if err := ms.txn.Put(ms.dbi, entryKeyBytes, entryBytes, 0); err != nil { - return 0, err - } - - // Also store in bookmark table for quick lookup - entryNumBytes := make([]byte, 8) - binary.BigEndian.PutUint64(entryNumBytes, entryNum) - if err := ms.txn.Put(ms.bookmarksDbi, data, entryNumBytes, 0); err != nil { - return 0, err - } - - // Update header (will be saved on commit) - 
ms.header.TotalEntries++ - ms.header.TotalLength += uint64(entry.Length) - - return entryNum, nil -} - -// GetEntry retrieves an entry from the stream -func (ms *MDBXStreamStore) GetEntry(entryNum uint64) (datastreamer.FileEntry, error) { - ms.mutex.RLock() - defer ms.mutex.RUnlock() - - // Create read transaction, setting parent if one exists - txn, err := ms.env.BeginTxn(ms.txn, mdbx.Readonly) - if err != nil { - return datastreamer.FileEntry{}, err - } - defer txn.Abort() // Ensure the transaction is aborted if not committed - - // Create key - keyBytes := make([]byte, 8) - binary.BigEndian.PutUint64(keyBytes, entryNum) - - // Get from db - entryBytes, err := txn.Get(ms.dbi, keyBytes) - if err != nil { - if err == mdbx.NotFound { - return datastreamer.FileEntry{}, fmt.Errorf("entry not found: %d", entryNum) - } - return datastreamer.FileEntry{}, err - } - - // Decode entry - entry, err := decodeFileEntry(entryBytes) - if err != nil { - return datastreamer.FileEntry{}, err - } - - return entry, nil -} - -// GetBookmark retrieves a bookmark from the stream -func (ms *MDBXStreamStore) GetBookmark(data []byte) (uint64, error) { - ms.mutex.RLock() - defer ms.mutex.RUnlock() - - // Create read transaction if not in a transaction - var txn *mdbx.Txn - var err error - var shouldAbort bool - - if ms.inTransaction { - txn = ms.txn - } else { - txn, err = ms.env.BeginTxn(nil, mdbx.Readonly) - if err != nil { - return 0, err - } - shouldAbort = true - defer func() { - if shouldAbort { - txn.Abort() - } - }() - } - - // Get from db - entryNumBytes, err := txn.Get(ms.bookmarksDbi, data) - if err != nil { - if err == mdbx.NotFound { - return 0, fmt.Errorf("bookmark not found") - } - return 0, err - } - - // If we created a transaction, we need to abort it now - shouldAbort = false - if !ms.inTransaction { - txn.Abort() - } - - return binary.BigEndian.Uint64(entryNumBytes), nil -} - -// StartAtomicOp starts a transaction -func (ms *MDBXStreamStore) StartAtomicOp() error { - 
ms.mutex.Lock() - defer ms.mutex.Unlock() - - if ms.inTransaction { - return errors.New("transaction already in progress") - } - - // Begin a new transaction - txn, err := ms.env.BeginTxn(nil, 0) - if err != nil { - return err - } - - ms.txn = txn - ms.inTransaction = true - - return nil -} - -// CommitAtomicOp commits a transaction -func (ms *MDBXStreamStore) CommitAtomicOp() error { - ms.mutex.Lock() - defer ms.mutex.Unlock() - - if !ms.inTransaction { - return errors.New("no transaction in progress") - } - - // Save header - headerBytes, err := encodeHeader(&ms.header) - if err != nil { - ms.txn.Abort() - ms.txn = nil - ms.inTransaction = false - return fmt.Errorf("failed to encode header: %w", err) - } - - if err := ms.txn.Put(ms.metadataDbi, []byte("header"), headerBytes, 0); err != nil { - ms.txn.Abort() - ms.txn = nil - ms.inTransaction = false - return fmt.Errorf("failed to save header: %w", err) - } - - // Commit transaction - commit, err := ms.txn.Commit() - if err != nil { - ms.txn.Abort() - ms.txn = nil - ms.inTransaction = false - return fmt.Errorf("failed to commit transaction: %w", err) - } - - // Ignore commit latency: TODO add this to metrics - _ = commit - - // Clean up - ms.txn = nil - ms.inTransaction = false - - if ms.streamChannel != nil { - // Do broadcast of the committed atomic operation to the stream clients - atomic := datastreamer.StreamAO{ - Status: ms.atomicOp.Status, - StartEntry: ms.atomicOp.StartEntry, - } - atomic.Entries = make([]datastreamer.FileEntry, len(ms.atomicOp.Entries)) - copy(atomic.Entries, ms.atomicOp.Entries) - - ms.streamChannel <- atomic - } - - return nil -} - -// RollbackAtomicOp rolls back a transaction -func (ms *MDBXStreamStore) RollbackAtomicOp() error { - ms.mutex.Lock() - defer ms.mutex.Unlock() - - if !ms.inTransaction { - return errors.New("no transaction in progress") - } - - // Abort transaction - ms.txn.Abort() - ms.txn = nil - ms.inTransaction = false - - return nil -} - -// GetHeader retrieves the 
header from the stream -func (ms *MDBXStreamStore) GetHeader() datastreamer.HeaderEntry { - ms.mutex.RLock() - defer ms.mutex.RUnlock() - - // Copy of header struct - return ms.header -} - -// TruncateFile truncates the stream to the specified entry -func (ms *MDBXStreamStore) TruncateFile(entryNum uint64) error { - if err := ms.StartAtomicOp(); err != nil { - return err - } - - // Get the current header - currentTotal := ms.header.TotalEntries - - if entryNum >= currentTotal { - // Nothing to truncate - ms.RollbackAtomicOp() - return nil - } - - // Create cursor to iterate through entries to delete - cursor, err := ms.txn.OpenCursor(ms.dbi) - if err != nil { - ms.RollbackAtomicOp() - return err - } - defer cursor.Close() - - // Calculate total length adjustment - var lengthToSubtract uint64 = 0 - - // Delete entries from entryNum to the end - for i := entryNum; i < currentTotal; i++ { - keyBytes := make([]byte, 8) - binary.BigEndian.PutUint64(keyBytes, i) - - // Get the entry to calculate length adjustment - entryBytes, err := ms.txn.Get(ms.dbi, keyBytes) - if err == nil { - entry, err := decodeFileEntry(entryBytes) - if err == nil { - lengthToSubtract += uint64(entry.Length) - - // If it's a bookmark, also remove from bookmarks table - if entry.Type == 176 { // Bookmark type - ms.txn.Del(ms.bookmarksDbi, entry.Data, nil) - } - } - } - - // Delete the entry - if err := ms.txn.Del(ms.dbi, keyBytes, nil); err != nil { - ms.RollbackAtomicOp() - return err - } - } - - // Update header - ms.header.TotalEntries = entryNum - ms.header.TotalLength -= lengthToSubtract - - return ms.CommitAtomicOp() -} - -// IteratorFrom creates an iterator starting from the specified entry -func (ms *MDBXStreamStore) IteratorFrom(entryNum uint64, includeBookmarks bool) (*MDBXStreamStoreIterator, error) { - return newMDBXStreamStoreIterator(ms, entryNum, includeBookmarks), nil -} - -// Helper functions - -// encodeHeader encodes a HeaderEntry into bytes -func encodeHeader(header 
*datastreamer.HeaderEntry) ([]byte, error) { - // Version(8) + SystemID(8) + TotalEntries(8) + TotalLength(8) = 32 bytes - result := make([]byte, 32) - - // Write Version (bytes 0-8) - binary.LittleEndian.PutUint64(result[0:8], uint64(header.Version)) - - // Write SystemID (bytes 8-16) - binary.LittleEndian.PutUint64(result[8:16], header.SystemID) - - // Write TotalEntries (bytes 16-24) - binary.LittleEndian.PutUint64(result[16:24], header.TotalEntries) - - // Write TotalLength (bytes 24-32) - binary.LittleEndian.PutUint64(result[24:32], header.TotalLength) - - return result, nil -} - -// decodeHeader decodes bytes into a HeaderEntry -func decodeHeader(data []byte) (*datastreamer.HeaderEntry, error) { - if len(data) < 32 { - return nil, fmt.Errorf("header data too short: got %d bytes, expected at least 32", len(data)) - } - - header := &datastreamer.HeaderEntry{ - // Read from encoded data - Version: uint8(binary.LittleEndian.Uint64(data[0:8])), - SystemID: binary.LittleEndian.Uint64(data[8:16]), - TotalEntries: binary.LittleEndian.Uint64(data[16:24]), - TotalLength: binary.LittleEndian.Uint64(data[24:32]), - } - - return header, nil -} - -// encodeFileEntry encodes a FileEntry to bytes -func encodeFileEntry(entry datastreamer.FileEntry) ([]byte, error) { - result := make([]byte, 17+len(entry.Data)) - result[0] = 2 // PacketType (2 for data) - binary.BigEndian.PutUint32(result[1:5], entry.Length) - binary.BigEndian.PutUint32(result[5:9], uint32(entry.Type)) - binary.BigEndian.PutUint64(result[9:17], entry.Number) - copy(result[17:], entry.Data) - return result, nil -} - -// decodeFileEntry decodes bytes to a FileEntry -func decodeFileEntry(data []byte) (datastreamer.FileEntry, error) { - if len(data) < 17 { - return datastreamer.FileEntry{}, errors.New("invalid file entry data") - } - - length := binary.BigEndian.Uint32(data[1:5]) - entryType := datastreamer.EntryType(binary.BigEndian.Uint32(data[5:9])) - number := binary.BigEndian.Uint64(data[9:17]) - entryData := 
data[17:] - - return datastreamer.FileEntry{ - Type: entryType, - Length: length, - Number: number, - Data: entryData, - }, nil -} - -// MDBXStreamStoreIterator implements the datastreamer.StorageIterator interface -type MDBXStreamStoreIterator struct { - store *MDBXStreamStore - currentEntryNum uint64 - maxEntryNum uint64 - includeBookmarks bool - txn *mdbx.Txn - cursor *mdbx.Cursor - currentEntry datastreamer.FileEntry - hasCurrentEntry bool - err error -} - -// IteratorNext advances the iterator to the next item -func (it *MDBXStreamStoreIterator) Next() (bool, error) { - if it.currentEntryNum > it.maxEntryNum { - it.hasCurrentEntry = false - return false, nil - } - - // Initialize transaction and cursor if needed - if it.txn == nil { - var err error - it.txn, err = it.store.env.BeginTxn(nil, mdbx.Readonly) - if err != nil { - it.hasCurrentEntry = false - it.err = err - return false, err - } - - it.cursor, err = it.txn.OpenCursor(it.store.dbi) - if err != nil { - it.txn.Abort() - it.txn = nil - it.hasCurrentEntry = false - it.err = err - return false, err - } - } - - // Create key for current entry - keyBytes := make([]byte, 8) - binary.BigEndian.PutUint64(keyBytes, it.currentEntryNum) - - // Get entry from db - entryBytes, err := it.txn.Get(it.store.dbi, keyBytes) - if err != nil { - if err == mdbx.NotFound { - // Skip to next entry - it.currentEntryNum++ - return it.Next() - } - it.cursor.Close() - it.txn.Abort() - it.txn = nil - it.cursor = nil - it.hasCurrentEntry = false - it.err = err - return false, err - } - - // Decode entry - entry, err := decodeFileEntry(entryBytes) - if err != nil { - it.cursor.Close() - it.txn.Abort() - it.txn = nil - it.cursor = nil - it.hasCurrentEntry = false - it.err = err - return false, err - } - - it.currentEntryNum++ - - // Skip bookmarks if not including them - if !it.includeBookmarks && entry.Type == 176 { // 176 is bookmark type - return it.Next() - } - - it.currentEntry = entry - it.hasCurrentEntry = true - return true, 
nil -} - -// IteratorEnd cleans up iterator resources -func (it *MDBXStreamStoreIterator) End() { - if it.cursor != nil { - it.cursor.Close() - it.cursor = nil - } - - if it.txn != nil { - it.txn.Abort() - it.txn = nil - } - - it.hasCurrentEntry = false -} - -// GetEntry returns the current entry -func (it *MDBXStreamStoreIterator) GetEntry() datastreamer.FileEntry { - return it.currentEntry -} - -// newMDBXStreamStoreIterator creates a new iterator for the MDBX-based store -func newMDBXStreamStoreIterator(store *MDBXStreamStore, startEntryNum uint64, includeBookmarks bool) *MDBXStreamStoreIterator { - // Get max entry num - maxEntryNum := store.GetHeader().TotalEntries - 1 - - // We initialize txn and cursor on first use to avoid issues with transaction lifetimes - return &MDBXStreamStoreIterator{ - store: store, - currentEntryNum: startEntryNum, - maxEntryNum: maxEntryNum, - includeBookmarks: includeBookmarks, - } -} - -// GetEntryNumberLimit returns the maximum entry number in the store -func (it *MDBXStreamStoreIterator) GetEntryNumberLimit() uint64 { - return it.maxEntryNum + 1 -} - -// NextFileEntry returns the next file entry from the iterator -func (it *MDBXStreamStoreIterator) NextFileEntry() (*types.FileEntry, error) { - hasNext, err := it.Next() - if err != nil { - return nil, err - } - - if !hasNext { - return nil, nil - } - - // Convert from datastreamer.FileEntry to types.FileEntry for compatibility - dsEntry := it.GetEntry() - return &types.FileEntry{ - PacketType: uint8(dsEntry.Type), - Length: dsEntry.Length, - EntryType: types.EntryType(dsEntry.Type), - EntryNum: dsEntry.Number, - Data: dsEntry.Data, - }, nil -} - -// Close closes the iterator and frees associated resources -func (it *MDBXStreamStoreIterator) Close() { - it.End() -} - -// BookmarkPrintDump prints debug information about bookmarks -func (ms *MDBXStreamStore) BookmarkPrintDump() { - // This is a no-op for MDBX implementation - // Only needed to satisfy the StreamStore interface -} - 
-// GetFirstEventAfterBookmark gets the first event after a bookmark -func (ms *MDBXStreamStore) GetFirstEventAfterBookmark(bookmark []byte) (datastreamer.FileEntry, error) { - // Get bookmark entry number - entryNum, err := ms.GetBookmark(bookmark) - if err != nil { - return datastreamer.FileEntry{}, err - } - - // Create iterator - iter, err := ms.IteratorFrom(entryNum, false) - if err != nil { - return datastreamer.FileEntry{}, err - } - - // Skip the bookmark entry itself - _, err = iter.NextFileEntry() - if err != nil { - return datastreamer.FileEntry{}, err - } - - // Get the next entry - entry, err := iter.NextFileEntry() - if err != nil { - return datastreamer.FileEntry{}, err - } - - if entry == nil { - return datastreamer.FileEntry{}, errors.New("no entries after bookmark") - } - - // Convert back to datastreamer.FileEntry - return datastreamer.FileEntry{ - Type: datastreamer.EntryType(entry.EntryType), - Length: entry.Length, - Number: entry.EntryNum, - Data: entry.Data, - }, nil -} - -// GetDataBetweenBookmarks gets data between two bookmarks -func (ms *MDBXStreamStore) GetDataBetweenBookmarks(bookmarkFrom, bookmarkTo []byte) ([]byte, error) { - // Get bookmark entry numbers - fromEntryNum, err := ms.GetBookmark(bookmarkFrom) - if err != nil { - return nil, err - } - - toEntryNum, err := ms.GetBookmark(bookmarkTo) - if err != nil { - return nil, err - } - - // Create an iterator - iter, err := ms.IteratorFrom(fromEntryNum, false) - if err != nil { - return nil, err - } - - // Collect all data between bookmarks - var result []byte - for { - entry, err := iter.NextFileEntry() - if err != nil { - return nil, err - } - - if entry == nil || entry.EntryNum > toEntryNum { - break - } - - result = append(result, entry.Data...) 
- } - - return result, nil -} - -// UpdateEntryData updates the data for an entry -func (ms *MDBXStreamStore) UpdateEntryData(entryNum uint64, entryType datastreamer.EntryType, data []byte) error { - if err := ms.StartAtomicOp(); err != nil { - return err - } - - // Get existing entry - entry, err := ms.GetEntry(entryNum) - if err != nil { - ms.RollbackAtomicOp() - return err - } - - // Update entry - entry.Type = entryType - entry.Data = data - entry.Length = uint32(len(data) + 17) // 17 is the header size - - // Store updated entry - keyBytes := make([]byte, 8) - binary.BigEndian.PutUint64(keyBytes, entryNum) - - entryBytes, err := encodeFileEntry(entry) - if err != nil { - ms.RollbackAtomicOp() - return err - } - - if err := ms.txn.Put(ms.dbi, keyBytes, entryBytes, 0); err != nil { - ms.RollbackAtomicOp() - return err - } - - return ms.CommitAtomicOp() -} - -// GetIterator returns a file iterator for the MDBX stream store -func (ms *MDBXStreamStore) GetIterator(entryNum uint64, readOnly bool) (datastreamer.StorageIterator, error) { - // Create a real iterator using our existing implementation - iterator, err := ms.IteratorFrom(entryNum, true) // Include bookmarks for compatibility - if err != nil { - return nil, err - } - - // Return the iterator as the required interface type - return iterator, nil -} - -// AddFileEntry adds a file entry directly to the stream -func (ms *MDBXStreamStore) AddFileEntry(e datastreamer.FileEntry) error { - ms.mutex.Lock() - defer ms.mutex.Unlock() - - if !ms.inTransaction { - return errors.New("must be in transaction to add entries") - } - - // Encode and store entry - keyBytes := make([]byte, 8) - binary.BigEndian.PutUint64(keyBytes, e.Number) - - entryBytes, err := encodeFileEntry(e) - if err != nil { - return err - } - - if err := ms.txn.Put(ms.dbi, keyBytes, entryBytes, 0); err != nil { - return err - } - - // Only update the header if this is a new entry - if e.Number >= ms.header.TotalEntries { - ms.header.TotalEntries = 
e.Number + 1 - ms.header.TotalLength += uint64(e.Length) - } - - return nil -} - -// WriteHeaderEntry writes the current header to storage -func (ms *MDBXStreamStore) WriteHeaderEntry() error { - if !ms.inTransaction { - if err := ms.StartAtomicOp(); err != nil { - return err - } - defer ms.CommitAtomicOp() - } - - headerBytes, err := encodeHeader(&ms.header) - if err != nil { - return fmt.Errorf("failed to encode header: %w", err) - } - - if err := ms.txn.Put(ms.metadataDbi, []byte("header"), headerBytes, 0); err != nil { - return fmt.Errorf("failed to save header: %w", err) - } - - return nil -} - -// Close closes the MDBX environment and releases resources -func (ms *MDBXStreamStore) Close() error { - ms.mutex.Lock() - defer ms.mutex.Unlock() - - // If there's an active transaction, abort it - if ms.inTransaction && ms.txn != nil { - ms.txn.Abort() - ms.txn = nil - ms.inTransaction = false - } - - // Close environment - if ms.env != nil { - ms.env.Close() - ms.env = nil - } - - return nil -} diff --git a/zk/datastream/server/store_mdbx_rwdb.go b/zk/datastream/server/store_mdbx_rwdb.go index 12aca7d87bc..854ada9b686 100644 --- a/zk/datastream/server/store_mdbx_rwdb.go +++ b/zk/datastream/server/store_mdbx_rwdb.go @@ -14,8 +14,8 @@ import ( "github.com/gateway-fm/zkevm-data-streamer/datastreamer" ) -// MDBXRwDBStreamStore implements StreamStore using kv.RwDB interface -type MDBXRwDBStreamStore struct { +// MDBXStreamStore implements StreamStore using kv.RwDB interface +type MDBXStreamStore struct { db kv.RwDB header datastreamer.HeaderEntry mutex sync.RWMutex @@ -27,6 +27,19 @@ type MDBXRwDBStreamStore struct { logger log.Logger } +// These constants define the MDBX tables we'll use +const ( + // Main tables + TableEntries = "entries" // Store all entries sequentially + TableBookmarks = "bookmarks" // Store bookmarks for fast seeking + TableMetadata = "metadata" // Store header information +) + +const ( + // Fixed StreamType value + StreamTypeValue = 1 // Always 
use StreamType 1 (sequencer) in this implementation +) + const ( // Atomic operation Status aoNone datastreamer.AOStatus = iota + 1 @@ -35,8 +48,8 @@ const ( aoRollbacking ) -// NewMDBXRwDBStreamStore creates a new kv.RwDB-based stream store -func NewMDBXRwDBStreamStore(config *StreamStoreConfig) (*MDBXRwDBStreamStore, error) { +// NewMDBXStreamStore creates a new MDBX-based stream store +func NewMDBXStreamStore(config *StreamStoreConfig) (StreamStore, error) { ctx := context.Background() // Use the logger from the config @@ -56,7 +69,7 @@ func NewMDBXRwDBStreamStore(config *StreamStoreConfig) (*MDBXRwDBStreamStore, er } // Initialize store - store := &MDBXRwDBStreamStore{ + store := &MDBXStreamStore{ db: db, header: datastreamer.NewHeader(3, config.SystemID, StreamTypeValue), ctx: ctx, @@ -94,20 +107,17 @@ func NewMDBXRwDBStreamStore(config *StreamStoreConfig) (*MDBXRwDBStreamStore, er } // SetStreamChannel sets the channel for atomic operation notifications -func (ms *MDBXRwDBStreamStore) SetStreamChannel(ch chan datastreamer.StreamAO) { +func (ms *MDBXStreamStore) SetStreamChannel(ch chan datastreamer.StreamAO) { ms.mutex.Lock() defer ms.mutex.Unlock() ms.streamChannel = ch } -func (ms *MDBXRwDBStreamStore) GetNextEntry() uint64 { +func (ms *MDBXStreamStore) GetNextEntry() uint64 { return ms.header.TotalEntries } -func (ms *MDBXRwDBStreamStore) PrintDumpBookmarks() error { - ms.mutex.RLock() - defer ms.mutex.RUnlock() - +func (ms *MDBXStreamStore) PrintDumpBookmarks() error { return ms.db.View(ms.ctx, func(tx kv.Tx) error { cursor, err := tx.Cursor(TableBookmarks) if err != nil { @@ -129,7 +139,7 @@ func (ms *MDBXRwDBStreamStore) PrintDumpBookmarks() error { } // addToStream handles common entry storage logic -func (ms *MDBXRwDBStreamStore) addToStream(entryType datastreamer.EntryType, data []byte) (uint64, error) { +func (ms *MDBXStreamStore) addToStream(entryType datastreamer.EntryType, data []byte) (uint64, error) { // Create entry entryNum := 
ms.header.TotalEntries entry := datastreamer.NewFileEntry(datastreamer.PtData, entryType, entryNum, data) @@ -158,7 +168,7 @@ func (ms *MDBXRwDBStreamStore) addToStream(entryType datastreamer.EntryType, dat } // AddStreamEntry adds a new entry to the stream -func (ms *MDBXRwDBStreamStore) AddStreamEntry(entryType datastreamer.EntryType, data []byte) (uint64, error) { +func (ms *MDBXStreamStore) AddStreamEntry(entryType datastreamer.EntryType, data []byte) (uint64, error) { ms.mutex.Lock() defer ms.mutex.Unlock() @@ -170,7 +180,7 @@ func (ms *MDBXRwDBStreamStore) AddStreamEntry(entryType datastreamer.EntryType, } // AddStreamBookmark adds a new bookmark to the stream -func (ms *MDBXRwDBStreamStore) AddStreamBookmark(data []byte) (uint64, error) { +func (ms *MDBXStreamStore) AddStreamBookmark(data []byte) (uint64, error) { ms.mutex.Lock() defer ms.mutex.Unlock() @@ -178,7 +188,7 @@ func (ms *MDBXRwDBStreamStore) AddStreamBookmark(data []byte) (uint64, error) { return 0, errors.New("must be in transaction to add bookmarks") } - entryNum, err := ms.addToStream(176, data) // 176 is bookmark type + entryNum, err := ms.addToStream(datastreamer.EntryType(types.BookmarkEntryType), data) if err != nil { return 0, err } @@ -194,89 +204,93 @@ func (ms *MDBXRwDBStreamStore) AddStreamBookmark(data []byte) (uint64, error) { } // GetEntry retrieves an entry from the stream -func (ms *MDBXRwDBStreamStore) GetEntry(entryNum uint64) (datastreamer.FileEntry, error) { - ms.mutex.RLock() - defer ms.mutex.RUnlock() +func (ms *MDBXStreamStore) GetEntry(entryNum uint64) (datastreamer.FileEntry, error) { + // Create key + keyBytes := make([]byte, 8) + binary.BigEndian.PutUint64(keyBytes, entryNum) - var entry datastreamer.FileEntry + // Define the read function + readFn := func() (interface{}, error) { + var entry datastreamer.FileEntry + err := ms.db.View(ms.ctx, func(tx kv.Tx) error { + // Get from db + entryBytes, err := tx.GetOne(TableEntries, keyBytes) + if err != nil { + return err + 
} + if entryBytes == nil { + return fmt.Errorf("entry not found: %d", entryNum) + } - // Function to get the entry using a transaction - getEntryFn := func(tx kv.Tx) error { - // Create key - keyBytes := make([]byte, 8) - binary.BigEndian.PutUint64(keyBytes, entryNum) + // Decode entry + decodedEntry, err := decodeFileEntry(entryBytes) + if err != nil { + return err + } - // Get from db - entryBytes, err := tx.GetOne(TableEntries, keyBytes) - if err != nil { - return err - } - if entryBytes == nil { - return fmt.Errorf("entry not found: %d", entryNum) - } + entry = decodedEntry + return nil + }) + return entry, err + } - // Decode entry - decodedEntry, err := decodeFileEntry(entryBytes) + // If we're in a transaction, run in separate goroutine + if ms.inTransaction { + result, err := ms.runReaderInSeparateGoroutine(readFn) if err != nil { - return err + return datastreamer.FileEntry{}, err } - - entry = decodedEntry - return nil + return result.(datastreamer.FileEntry), nil } - // If we're in a transaction, use the current transaction - if ms.inTransaction { - err := getEntryFn(ms.currentTx) - return entry, err + // Otherwise, run directly + result, err := readFn() + if err != nil { + return datastreamer.FileEntry{}, err } - - // Otherwise, start a new read transaction - err := ms.db.View(ms.ctx, func(tx kv.Tx) error { - return getEntryFn(tx) - }) - - return entry, err + return result.(datastreamer.FileEntry), nil } // GetBookmark retrieves a bookmark from the stream -func (ms *MDBXRwDBStreamStore) GetBookmark(data []byte) (uint64, error) { - ms.mutex.RLock() - defer ms.mutex.RUnlock() +func (ms *MDBXStreamStore) GetBookmark(key []byte) (uint64, error) { + // Define the read function + readFn := func() (interface{}, error) { + var entryNum uint64 + err := ms.db.View(ms.ctx, func(tx kv.Tx) error { + // Get from db + entryNumBytes, err := tx.GetOne(TableBookmarks, key) + if err != nil { + return err + } + if entryNumBytes == nil { + return fmt.Errorf("bookmark not 
found") + } - var entryNum uint64 + entryNum = binary.BigEndian.Uint64(entryNumBytes) + return nil + }) + return entryNum, err + } - // Function to get the bookmark using a transaction - getBookmarkFn := func(tx kv.Tx) error { - // Get from db - entryNumBytes, err := tx.GetOne(TableBookmarks, data) + // If we're in a transaction, run in separate goroutine + if ms.inTransaction { + result, err := ms.runReaderInSeparateGoroutine(readFn) if err != nil { - return err + return 0, err } - if entryNumBytes == nil { - return fmt.Errorf("bookmark not found") - } - - entryNum = binary.BigEndian.Uint64(entryNumBytes) - return nil + return result.(uint64), nil } - // If we're in a transaction, use the current transaction - if ms.inTransaction { - err := getBookmarkFn(ms.currentTx) - return entryNum, err + // Otherwise, run directly + result, err := readFn() + if err != nil { + return 0, err } - - // Otherwise, start a new read transaction - err := ms.db.View(ms.ctx, func(tx kv.Tx) error { - return getBookmarkFn(tx) - }) - - return entryNum, err + return result.(uint64), nil } // StartAtomicOp starts a transaction -func (ms *MDBXRwDBStreamStore) StartAtomicOp() error { +func (ms *MDBXStreamStore) StartAtomicOp() error { ms.mutex.Lock() defer ms.mutex.Unlock() @@ -304,7 +318,7 @@ func (ms *MDBXRwDBStreamStore) StartAtomicOp() error { } // CommitAtomicOp commits the current transaction -func (ms *MDBXRwDBStreamStore) CommitAtomicOp() error { +func (ms *MDBXStreamStore) CommitAtomicOp() error { ms.mutex.Lock() defer ms.mutex.Unlock() @@ -347,7 +361,7 @@ func (ms *MDBXRwDBStreamStore) CommitAtomicOp() error { } // RollbackAtomicOp aborts the current transaction -func (ms *MDBXRwDBStreamStore) RollbackAtomicOp() error { +func (ms *MDBXStreamStore) RollbackAtomicOp() error { ms.mutex.Lock() defer ms.mutex.Unlock() @@ -362,12 +376,12 @@ func (ms *MDBXRwDBStreamStore) RollbackAtomicOp() error { } // GetHeader returns the current header -func (ms *MDBXRwDBStreamStore) GetHeader() 
datastreamer.HeaderEntry { +func (ms *MDBXStreamStore) GetHeader() datastreamer.HeaderEntry { return ms.header } // TruncateFile truncates the stream to the specified entry number -func (ms *MDBXRwDBStreamStore) TruncateFile(entryNum uint64) error { +func (ms *MDBXStreamStore) TruncateFile(entryNum uint64) error { ms.mutex.Lock() defer ms.mutex.Unlock() @@ -453,12 +467,89 @@ func (ms *MDBXRwDBStreamStore) TruncateFile(entryNum uint64) error { } // IteratorFrom returns an iterator starting from the specified entry -func (ms *MDBXRwDBStreamStore) IteratorFrom(entryNum uint64, includeBookmarks bool) (*MDBXRwDBStreamStoreIterator, error) { - return newMDBXRwDBStreamStoreIterator(ms, entryNum, includeBookmarks), nil +func (ms *MDBXStreamStore) IteratorFrom(entryNum uint64, includeBookmarks bool) (*MDBXStreamStoreIterator, error) { + return newMDBXStreamStoreIterator(ms, entryNum, includeBookmarks), nil } // GetFirstEventAfterBookmark gets the first event after a bookmark -func (ms *MDBXRwDBStreamStore) GetFirstEventAfterBookmark(bookmark []byte) (datastreamer.FileEntry, error) { +func (ms *MDBXStreamStore) GetFirstEventAfterBookmark(bookmark []byte) (datastreamer.FileEntry, error) { + // If we're in a transaction, we need a specialized approach + if ms.inTransaction { + // Define a read function that gets the bookmark and finds the first event + readFn := func() (interface{}, error) { + // Get the bookmark and find first event in a single transaction + var resultEntry datastreamer.FileEntry + err := ms.db.View(ms.ctx, func(tx kv.Tx) error { + // Get bookmark + entryNumBytes, err := tx.GetOne(TableBookmarks, bookmark) + if err != nil { + return err + } + if entryNumBytes == nil { + return fmt.Errorf("bookmark not found") + } + + entryNum := binary.BigEndian.Uint64(entryNumBytes) + currentEntryNum := entryNum + + // Get maxEntryNum by retrieving the header + headerBytes, err := tx.GetOne(TableMetadata, []byte("header")) + if err != nil { + return err + } + if headerBytes 
== nil { + return fmt.Errorf("header not found") + } + + header, err := decodeHeader(headerBytes) + if err != nil { + return err + } + + maxEntryNum := header.TotalEntries - 1 + + // Search for first non-bookmark entry + for currentEntryNum <= maxEntryNum { + keyBytes := make([]byte, 8) + binary.BigEndian.PutUint64(keyBytes, currentEntryNum) + + entryBytes, err := tx.GetOne(TableEntries, keyBytes) + if err != nil || entryBytes == nil { + currentEntryNum++ + continue // Skip missing entries + } + + entry, err := decodeFileEntry(entryBytes) + if err != nil { + currentEntryNum++ + continue // Skip invalid entries + } + + // Skip bookmarks + if entry.Type == 176 { // Bookmark type + currentEntryNum++ + continue + } + + resultEntry = entry + return nil + } + + return fmt.Errorf("no events after bookmark") + }) + + return resultEntry, err + } + + // Run in separate goroutine + result, err := ms.runReaderInSeparateGoroutine(readFn) + if err != nil { + return datastreamer.FileEntry{}, err + } + return result.(datastreamer.FileEntry), nil + } + + // Standard approach for non-transaction case // Get entry number from bookmark entryNum, err := ms.GetBookmark(bookmark) if err != nil { @@ -486,7 +577,7 @@ func (ms *MDBXRwDBStreamStore) GetFirstEventAfterBookmark(bookmark []byte) (data } // GetDataBetweenBookmarks gets all data between two bookmarks -func (ms *MDBXRwDBStreamStore) GetDataBetweenBookmarks(bookmarkFrom, bookmarkTo []byte) ([]byte, error) { +func (ms *MDBXStreamStore) GetDataBetweenBookmarks(bookmarkFrom, bookmarkTo []byte) ([]byte, error) { // Get entry numbers from bookmarks fromEntryNum, err := ms.GetBookmark(bookmarkFrom) if err != nil { @@ -502,27 +593,55 @@ func (ms *MDBXRwDBStreamStore) GetDataBetweenBookmarks(bookmarkFrom, bookmarkTo return nil, fmt.Errorf("invalid bookmark range") } - // Collect all data in the range - var data []byte - for i := fromEntryNum + 1; i < toEntryNum; i++ { - entry, err := ms.GetEntry(i) - if err != nil { - continue // Skip 
errors - } + // Define the read function to collect data + readFn := func() (interface{}, error) { + var data []byte + err := ms.db.View(ms.ctx, func(tx kv.Tx) error { + for i := fromEntryNum + 1; i < toEntryNum; i++ { + keyBytes := make([]byte, 8) + binary.BigEndian.PutUint64(keyBytes, i) - // Skip bookmarks - if entry.Type == 176 { // Bookmark type - continue - } + entryBytes, err := tx.GetOne(TableEntries, keyBytes) + if err != nil || entryBytes == nil { + continue // Skip missing entries + } + + entry, err := decodeFileEntry(entryBytes) + if err != nil { + continue // Skip invalid entries + } + + // Skip bookmarks + if entry.Type == 176 { // Bookmark type + continue + } - data = append(data, entry.Data...) + data = append(data, entry.Data...) + } + return nil + }) + return data, err } - return data, nil + // If we're in a transaction, run in separate goroutine + if ms.inTransaction { + result, err := ms.runReaderInSeparateGoroutine(readFn) + if err != nil { + return nil, err + } + return result.([]byte), nil + } + + // Otherwise, run directly + result, err := readFn() + if err != nil { + return nil, err + } + return result.([]byte), nil } // UpdateEntryData updates the data for an existing entry -func (ms *MDBXRwDBStreamStore) UpdateEntryData(entryNum uint64, entryType datastreamer.EntryType, data []byte) error { +func (ms *MDBXStreamStore) UpdateEntryData(entryNum uint64, entryType datastreamer.EntryType, data []byte) error { if err := ms.StartAtomicOp(); err != nil { return err } @@ -558,7 +677,7 @@ func (ms *MDBXRwDBStreamStore) UpdateEntryData(entryNum uint64, entryType datast } // GetIterator returns a file iterator for the stream store -func (ms *MDBXRwDBStreamStore) GetIterator(entryNum uint64, readOnly bool) (datastreamer.StorageIterator, error) { +func (ms *MDBXStreamStore) GetIterator(entryNum uint64, readOnly bool) (datastreamer.StorageIterator, error) { // Create a real iterator using our existing implementation iterator, err := 
ms.IteratorFrom(entryNum, true) // Include bookmarks for compatibility if err != nil { @@ -570,7 +689,7 @@ func (ms *MDBXRwDBStreamStore) GetIterator(entryNum uint64, readOnly bool) (data } // AddFileEntry adds a file entry directly to the stream -func (ms *MDBXRwDBStreamStore) AddFileEntry(e datastreamer.FileEntry) error { +func (ms *MDBXStreamStore) AddFileEntry(e datastreamer.FileEntry) error { ms.mutex.Lock() defer ms.mutex.Unlock() @@ -601,7 +720,7 @@ func (ms *MDBXRwDBStreamStore) AddFileEntry(e datastreamer.FileEntry) error { } // WriteHeaderEntry writes the current header to storage -func (ms *MDBXRwDBStreamStore) WriteHeaderEntry() error { +func (ms *MDBXStreamStore) WriteHeaderEntry() error { if !ms.inTransaction { return errors.New("must be in transaction to write header") } @@ -615,13 +734,13 @@ func (ms *MDBXRwDBStreamStore) WriteHeaderEntry() error { } // BookmarkPrintDump prints debug information about bookmarks -func (ms *MDBXRwDBStreamStore) BookmarkPrintDump() { +func (ms *MDBXStreamStore) BookmarkPrintDump() { // This is a no-op implementation // Only needed to satisfy the StreamStore interface } // Close closes the stream store -func (ms *MDBXRwDBStreamStore) Close() error { +func (ms *MDBXStreamStore) Close() error { ms.mutex.Lock() defer ms.mutex.Unlock() @@ -636,9 +755,9 @@ func (ms *MDBXRwDBStreamStore) Close() error { return nil } -// MDBXRwDBStreamStoreIterator implements the datastreamer.StorageIterator interface -type MDBXRwDBStreamStoreIterator struct { - store *MDBXRwDBStreamStore +// MDBXStreamStoreIterator implements the datastreamer.StorageIterator interface +type MDBXStreamStoreIterator struct { + store *MDBXStreamStore currentEntryNum uint64 maxEntryNum uint64 includeBookmarks bool @@ -649,14 +768,13 @@ type MDBXRwDBStreamStoreIterator struct { } // Next advances the iterator to the next item -func (it *MDBXRwDBStreamStoreIterator) Next() (bool, error) { +func (it *MDBXStreamStoreIterator) Next() (bool, error) { if 
it.currentEntryNum > it.maxEntryNum { it.hasCurrentEntry = false return false, nil } for { - // Get entry at current position entry, err := it.store.GetEntry(it.currentEntryNum) it.currentEntryNum++ @@ -680,27 +798,27 @@ func (it *MDBXRwDBStreamStoreIterator) Next() (bool, error) { it.currentEntry = entry it.hasCurrentEntry = true - return true, nil + return it.currentEntryNum >= it.maxEntryNum, nil } } // End cleans up iterator resources -func (it *MDBXRwDBStreamStoreIterator) End() { +func (it *MDBXStreamStoreIterator) End() { // Nothing to clean up for this implementation it.hasCurrentEntry = false } // GetEntry returns the current entry -func (it *MDBXRwDBStreamStoreIterator) GetEntry() datastreamer.FileEntry { +func (it *MDBXStreamStoreIterator) GetEntry() datastreamer.FileEntry { return it.currentEntry } -// newMDBXRwDBStreamStoreIterator creates a new iterator for the kv.RwDB-based store -func newMDBXRwDBStreamStoreIterator(store *MDBXRwDBStreamStore, startEntryNum uint64, includeBookmarks bool) *MDBXRwDBStreamStoreIterator { +// newMDBXStreamStoreIterator creates a new iterator for the MDBX-based store +func newMDBXStreamStoreIterator(store *MDBXStreamStore, startEntryNum uint64, includeBookmarks bool) *MDBXStreamStoreIterator { // Get max entry num maxEntryNum := store.GetHeader().TotalEntries - 1 - return &MDBXRwDBStreamStoreIterator{ + return &MDBXStreamStoreIterator{ store: store, currentEntryNum: startEntryNum, maxEntryNum: maxEntryNum, @@ -709,12 +827,12 @@ func newMDBXRwDBStreamStoreIterator(store *MDBXRwDBStreamStore, startEntryNum ui } // GetEntryNumberLimit returns the maximum entry number in the store -func (it *MDBXRwDBStreamStoreIterator) GetEntryNumberLimit() uint64 { +func (it *MDBXStreamStoreIterator) GetEntryNumberLimit() uint64 { return it.maxEntryNum + 1 } // NextFileEntry returns the next file entry from the iterator -func (it *MDBXRwDBStreamStoreIterator) NextFileEntry() (*types.FileEntry, error) { +func (it *MDBXStreamStoreIterator) 
NextFileEntry() (*types.FileEntry, error) { hasNext, err := it.Next() if err != nil { return nil, err @@ -736,6 +854,99 @@ func (it *MDBXRwDBStreamStoreIterator) NextFileEntry() (*types.FileEntry, error) } // Close closes the iterator and frees associated resources -func (it *MDBXRwDBStreamStoreIterator) Close() { +func (it *MDBXStreamStoreIterator) Close() { it.End() } + +// Helper functions + +// encodeHeader encodes a HeaderEntry into bytes +func encodeHeader(header *datastreamer.HeaderEntry) ([]byte, error) { + // Version(8) + SystemID(8) + TotalEntries(8) + TotalLength(8) = 32 bytes + result := make([]byte, 32) + + // Write Version (bytes 0-8) + binary.LittleEndian.PutUint64(result[0:8], uint64(header.Version)) + + // Write SystemID (bytes 8-16) + binary.LittleEndian.PutUint64(result[8:16], header.SystemID) + + // Write TotalEntries (bytes 16-24) + binary.LittleEndian.PutUint64(result[16:24], header.TotalEntries) + + // Write TotalLength (bytes 24-32) + binary.LittleEndian.PutUint64(result[24:32], header.TotalLength) + + return result, nil +} + +// decodeHeader decodes bytes into a HeaderEntry +func decodeHeader(data []byte) (*datastreamer.HeaderEntry, error) { + if len(data) < 32 { + return nil, fmt.Errorf("header data too short: got %d bytes, expected at least 32", len(data)) + } + + header := datastreamer.NewHeader( + uint8(binary.LittleEndian.Uint64(data[0:8])), + binary.LittleEndian.Uint64(data[8:16]), + datastreamer.StreamType(1), // Always use StreamType 1 (sequencer) + ) + header.TotalEntries = binary.LittleEndian.Uint64(data[16:24]) + header.TotalLength = binary.LittleEndian.Uint64(data[24:32]) + + return &header, nil +} + +// encodeFileEntry encodes a FileEntry to bytes +func encodeFileEntry(entry datastreamer.FileEntry) ([]byte, error) { + result := make([]byte, 17+len(entry.Data)) + result[0] = 2 // PacketType (2 for data) + binary.BigEndian.PutUint32(result[1:5], entry.Length) + binary.BigEndian.PutUint32(result[5:9], uint32(entry.Type)) + 
binary.BigEndian.PutUint64(result[9:17], entry.Number) + copy(result[17:], entry.Data) + return result, nil +} + +// decodeFileEntry decodes bytes to a FileEntry +func decodeFileEntry(data []byte) (datastreamer.FileEntry, error) { + if len(data) < 17 { + return datastreamer.FileEntry{}, errors.New("invalid file entry data") + } + + length := binary.BigEndian.Uint32(data[1:5]) + entryType := datastreamer.EntryType(binary.BigEndian.Uint32(data[5:9])) + number := binary.BigEndian.Uint64(data[9:17]) + entryData := data[17:] + + decoded := datastreamer.NewFileEntry(data[0], entryType, number, entryData) + decoded.Length = length + return decoded, nil +} + +// runReaderInSeparateGoroutine executes a read operation in a separate goroutine +// This is useful when we're in a write transaction and need to perform a read operation +// which would otherwise fail due to MDBX's thread-specific transaction restrictions +func (ms *MDBXStreamStore) runReaderInSeparateGoroutine(readFn func() (interface{}, error)) (interface{}, error) { + resultCh := make(chan interface{}, 1) + errCh := make(chan error, 1) + + go func() { + result, err := readFn() + if err != nil { + errCh <- err + return + } + resultCh <- result + }() + + // Wait for the result or error + select { + case result := <-resultCh: + return result, nil + case err := <-errCh: + return nil, err + case <-ms.ctx.Done(): + return nil, ms.ctx.Err() + } +} From 8ffee5d3d294007fa91ebfd6b35780f486159d77 Mon Sep 17 00:00:00 2001 From: Carl Lambert Date: Thu, 8 May 2025 01:36:08 +0100 Subject: [PATCH 4/5] WIP: reference rc1 ZDS package minor tidy up --- go.mod | 6 +- go.sum | 8 +- zk/datastream/server/interfaces.go | 2 - ...tore_mdbx_rwdb.go => mdbx_stream_store.go} | 172 +----------------- zk/datastream/server/store_interfaces.go | 6 - 5 files changed, 11 insertions(+), 183 deletions(-) rename zk/datastream/server/{store_mdbx_rwdb.go => mdbx_stream_store.go} (82%) diff --git a/go.mod b/go.mod index 1eca4da763a..1f2a35f8544 100644 
--- a/go.mod +++ b/go.mod @@ -2,10 +2,13 @@ module github.com/erigontech/erigon go 1.23.0 +toolchain go1.23.7 + require ( github.com/erigontech/mdbx-go v0.27.24 github.com/erigontech/secp256k1 v1.1.0 github.com/erigontech/silkworm-go v0.18.0 + github.com/gateway-fm/zkevm-data-streamer v0.2.10-0.20250508001840-99bc56b54bb6 ) replace github.com/erigontech/erigon-lib => ./erigon-lib @@ -43,7 +46,6 @@ require ( github.com/erigontech/erigon-lib v1.0.0 github.com/erigontech/erigonwatch v0.1.16 github.com/fjl/gencodec v0.0.0-20220412091415-8bb9e558978c - github.com/gateway-fm/zkevm-data-streamer v0.2.9 github.com/gballet/go-verkle v0.0.0-20221121182333-31427a1f2d35 github.com/gfx-labs/sse v0.0.0-20231226060816-f747e26a9baa github.com/go-chi/chi/v5 v5.0.12 @@ -190,7 +192,7 @@ require ( github.com/google/gopacket v1.1.19 // indirect github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad // indirect github.com/hermeznetwork/tracerr v0.3.2 // indirect - github.com/ianlancetaylor/cgosymbolizer v0.0.0-20220405231054-a1ae3e4bba26 // indirect + github.com/ianlancetaylor/cgosymbolizer v0.0.0-20241129212102-9c50ad6b591e // indirect github.com/imdario/mergo v0.3.11 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/ipfs/go-cid v0.4.1 // indirect diff --git a/go.sum b/go.sum index 64403a67efd..ecdc46a8369 100644 --- a/go.sum +++ b/go.sum @@ -315,8 +315,8 @@ github.com/garslo/gogen v0.0.0-20170307003452-d6ebae628c7c h1:uYNKzPntb8c6DKvP9E github.com/garslo/gogen v0.0.0-20170307003452-d6ebae628c7c/go.mod h1:Q0X6pkwTILDlzrGEckF6HKjXe48EgsY/l7K7vhY4MW8= github.com/gateway-fm/vectorized-poseidon-gold v1.0.0 h1:Du0ZW+fkZhgRNGx/gAkHnMj3/Rl8uJkAEe+ZDPX3PDw= github.com/gateway-fm/vectorized-poseidon-gold v1.0.0/go.mod h1:VLGQpyjrOg8+FugH/+d8tfYd/c3z4Xqa+zbUBITygaw= -github.com/gateway-fm/zkevm-data-streamer v0.2.9 h1:ezR6IJakWvtNXFEhWXtwuot0JikKCJIB9rk2wnUsH3A= -github.com/gateway-fm/zkevm-data-streamer v0.2.9/go.mod 
h1:kKQSu8Ob9yNR+3ocx9dHlWmQ4Gnct3jRtQqKR69CmyM= +github.com/gateway-fm/zkevm-data-streamer v0.2.10-0.20250508001840-99bc56b54bb6 h1:2YhW21cICgqIH3D1JXVv88AtPiuvY+MtM2Ffr9Utqlk= +github.com/gateway-fm/zkevm-data-streamer v0.2.10-0.20250508001840-99bc56b54bb6/go.mod h1:WN8ODhsoWzTz9PkXK/PUJ4st37+yBLhSAyP09N35LYY= github.com/gballet/go-verkle v0.0.0-20221121182333-31427a1f2d35 h1:I8QswD9gf3VEpr7bpepKKOm7ChxFITIG+oc1I5/S0no= github.com/gballet/go-verkle v0.0.0-20221121182333-31427a1f2d35/go.mod h1:DMDd04jjQgdynaAwbEgiRERIGpC8fDjx0+y06an7Psg= github.com/gfx-labs/sse v0.0.0-20231226060816-f747e26a9baa h1:b6fBm4SLM8jywQHNmc3ZCl6zQEhEyZl6bp7is4en72M= @@ -503,8 +503,8 @@ github.com/huandu/xstrings v1.4.0 h1:D17IlohoQq4UcpqD7fDk80P7l+lwAmlFaBHgOipl2FU github.com/huandu/xstrings v1.4.0/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= github.com/huin/goupnp v1.3.0 h1:UvLUlWDNpoUdYzb2TCn+MuTWtcjXKSza2n6CBdQ0xXc= github.com/huin/goupnp v1.3.0/go.mod h1:gnGPsThkYa7bFi/KWmEysQRf48l2dvR5bxr2OFckNX8= -github.com/ianlancetaylor/cgosymbolizer v0.0.0-20220405231054-a1ae3e4bba26 h1:UT3hQ6+5hwqUT83cKhKlY5I0W/kqsl6lpn3iFb3Gtqs= -github.com/ianlancetaylor/cgosymbolizer v0.0.0-20220405231054-a1ae3e4bba26/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg= +github.com/ianlancetaylor/cgosymbolizer v0.0.0-20241129212102-9c50ad6b591e h1:8AnObPi8WmIgjwcidUxaREhXMSpyUJeeSrIkZTXdabw= +github.com/ianlancetaylor/cgosymbolizer v0.0.0-20241129212102-9c50ad6b591e/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/iden3/go-iden3-crypto v0.0.17 h1:NdkceRLJo/pI4UpcjVah4lN/a3yzxRUGXqxbWcYh9mY= diff --git a/zk/datastream/server/interfaces.go b/zk/datastream/server/interfaces.go index 7a85a41ad32..7f1edc5d562 100644 --- 
a/zk/datastream/server/interfaces.go +++ b/zk/datastream/server/interfaces.go @@ -25,8 +25,6 @@ type StreamStore interface { GetEntry(entryNum uint64) (datastreamer.FileEntry, error) GetBookmark(data []byte) (uint64, error) - GetFirstEventAfterBookmark(bookmark []byte) (datastreamer.FileEntry, error) - GetDataBetweenBookmarks(bookmarkFrom, bookmarkTo []byte) ([]byte, error) UpdateEntryData(entryNum uint64, etype datastreamer.EntryType, data []byte) error diff --git a/zk/datastream/server/store_mdbx_rwdb.go b/zk/datastream/server/mdbx_stream_store.go similarity index 82% rename from zk/datastream/server/store_mdbx_rwdb.go rename to zk/datastream/server/mdbx_stream_store.go index 854ada9b686..546078088c8 100644 --- a/zk/datastream/server/store_mdbx_rwdb.go +++ b/zk/datastream/server/mdbx_stream_store.go @@ -326,6 +326,9 @@ func (ms *MDBXStreamStore) CommitAtomicOp() error { return fmt.Errorf("not in transaction") } + // Set the status to committing + ms.atomicOp.Status = aoCommitting + // Save the header if err := ms.WriteHeaderEntry(); err != nil { ms.currentTx.Rollback() @@ -471,175 +474,6 @@ func (ms *MDBXStreamStore) IteratorFrom(entryNum uint64, includeBookmarks bool) return newMDBXStreamStoreIterator(ms, entryNum, includeBookmarks), nil } -// GetFirstEventAfterBookmark gets the first event after a bookmark -func (ms *MDBXStreamStore) GetFirstEventAfterBookmark(bookmark []byte) (datastreamer.FileEntry, error) { - // If we're in a transaction, we need a specialized approach - if ms.inTransaction { - // Define a read function that gets the bookmark and finds the first event - readFn := func() (interface{}, error) { - // Get the bookmark and find first event in a single transaction - var resultEntry datastreamer.FileEntry - err := ms.db.View(ms.ctx, func(tx kv.Tx) error { - // Get bookmark - entryNumBytes, err := tx.GetOne(TableBookmarks, bookmark) - if err != nil { - return err - } - if entryNumBytes == nil { - return fmt.Errorf("bookmark not found") - } - - 
entryNum := binary.BigEndian.Uint64(entryNumBytes) - currentEntryNum := entryNum - - // Get maxEntryNum by retrieving the header - headerBytes, err := tx.GetOne(TableMetadata, []byte("header")) - if err != nil { - return err - } - if headerBytes == nil { - return fmt.Errorf("header not found") - } - - header, err := decodeHeader(headerBytes) - if err != nil { - return err - } - - maxEntryNum := header.TotalEntries - 1 - - // Search for first non-bookmark entry - for currentEntryNum <= maxEntryNum { - keyBytes := make([]byte, 8) - binary.BigEndian.PutUint64(keyBytes, currentEntryNum) - - entryBytes, err := tx.GetOne(TableEntries, keyBytes) - if err != nil || entryBytes == nil { - currentEntryNum++ - continue // Skip missing entries - } - - entry, err := decodeFileEntry(entryBytes) - if err != nil { - currentEntryNum++ - continue // Skip invalid entries - } - - // Skip bookmarks - if entry.Type == 176 { // Bookmark type - currentEntryNum++ - continue - } - - resultEntry = entry - return nil - } - - return fmt.Errorf("no events after bookmark") - }) - - return resultEntry, err - } - - // Run in separate goroutine - result, err := ms.runReaderInSeparateGoroutine(readFn) - if err != nil { - return datastreamer.FileEntry{}, err - } - return result.(datastreamer.FileEntry), nil - } - - // Standard approach for non-transaction case - // Get entry number from bookmark - entryNum, err := ms.GetBookmark(bookmark) - if err != nil { - return datastreamer.FileEntry{}, err - } - - // Create iterator from that entry - iterator, err := ms.IteratorFrom(entryNum, false) // Skip bookmarks - if err != nil { - return datastreamer.FileEntry{}, err - } - defer iterator.Close() - - // Get first event entry after bookmark - hasNext, err := iterator.Next() - if err != nil { - return datastreamer.FileEntry{}, err - } - - if !hasNext { - return datastreamer.FileEntry{}, fmt.Errorf("no events after bookmark") - } - - return iterator.GetEntry(), nil -} - -// GetDataBetweenBookmarks gets all data 
between two bookmarks -func (ms *MDBXStreamStore) GetDataBetweenBookmarks(bookmarkFrom, bookmarkTo []byte) ([]byte, error) { - // Get entry numbers from bookmarks - fromEntryNum, err := ms.GetBookmark(bookmarkFrom) - if err != nil { - return nil, err - } - - toEntryNum, err := ms.GetBookmark(bookmarkTo) - if err != nil { - return nil, err - } - - if fromEntryNum >= toEntryNum { - return nil, fmt.Errorf("invalid bookmark range") - } - - // Define the read function to collect data - readFn := func() (interface{}, error) { - var data []byte - err := ms.db.View(ms.ctx, func(tx kv.Tx) error { - for i := fromEntryNum + 1; i < toEntryNum; i++ { - keyBytes := make([]byte, 8) - binary.BigEndian.PutUint64(keyBytes, i) - - entryBytes, err := tx.GetOne(TableEntries, keyBytes) - if err != nil || entryBytes == nil { - continue // Skip missing entries - } - - entry, err := decodeFileEntry(entryBytes) - if err != nil { - continue // Skip invalid entries - } - - // Skip bookmarks - if entry.Type == 176 { // Bookmark type - continue - } - - data = append(data, entry.Data...) 
- } - return nil - }) - return data, err - } - - // If we're in a transaction, run in separate goroutine - if ms.inTransaction { - result, err := ms.runReaderInSeparateGoroutine(readFn) - if err != nil { - return nil, err - } - return result.([]byte), nil - } - - // Otherwise, run directly - result, err := readFn() - if err != nil { - return nil, err - } - return result.([]byte), nil -} - // UpdateEntryData updates the data for an existing entry func (ms *MDBXStreamStore) UpdateEntryData(entryNum uint64, entryType datastreamer.EntryType, data []byte) error { if err := ms.StartAtomicOp(); err != nil { diff --git a/zk/datastream/server/store_interfaces.go b/zk/datastream/server/store_interfaces.go index 2d8b42384d6..31688df7f45 100644 --- a/zk/datastream/server/store_interfaces.go +++ b/zk/datastream/server/store_interfaces.go @@ -27,9 +27,3 @@ type StreamStoreConfig struct { MDBXMaxDBS int MDBXFlags uint } - -// StreamStoreFactory creates stream stores based on configuration -type StreamStoreFactory interface { - // CreateStore creates a new stream store based on the provided configuration - CreateStore(config *StreamStoreConfig) (StreamStore, error) -} From 61aaa2b676d85922eff26cd2d034dceeebf5df3f Mon Sep 17 00:00:00 2001 From: Carl Lambert Date: Thu, 8 May 2025 21:00:09 +0100 Subject: [PATCH 5/5] fix: post rebase correction --- cmd/relay/main.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/relay/main.go b/cmd/relay/main.go index 15844e4a039..c62f14640d9 100644 --- a/cmd/relay/main.go +++ b/cmd/relay/main.go @@ -56,6 +56,7 @@ func main() { inactivityTimeout, // Inactivity timeout checkInterval, // Check interval nil, // No custom logger + nil, // No custom config ) if err != nil {