diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8d2bb89c1..4df57a539 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -55,6 +55,7 @@ jobs: apps: | [ {"name": "ev-node-evm", "dockerfile": "apps/evm/Dockerfile"}, + {"name": "ev-node-grpc", "dockerfile": "apps/grpc/Dockerfile"}, {"name": "ev-node-testapp", "dockerfile": "apps/testapp/Dockerfile"} ] diff --git a/.golangci.yml b/.golangci.yml index 4df3a98fd..469a31268 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -55,6 +55,7 @@ linters: gosec: excludes: - G115 + - G118 revive: rules: - name: package-comments diff --git a/apps/grpc/Dockerfile b/apps/grpc/Dockerfile index ba69913c9..e926eadfa 100644 --- a/apps/grpc/Dockerfile +++ b/apps/grpc/Dockerfile @@ -10,7 +10,7 @@ WORKDIR /ev-node # Copy go mod files COPY go.mod go.sum ./ COPY apps/grpc/go.mod apps/grpc/go.sum ./apps/grpc/ -COPY core/go.mod ./core/ +COPY core/go.mod core/go.sum ./core/ COPY execution/grpc/go.mod execution/grpc/go.sum ./execution/grpc/ # Download dependencies diff --git a/block/internal/cache/manager.go b/block/internal/cache/manager.go index 229160952..b9070d749 100644 --- a/block/internal/cache/manager.go +++ b/block/internal/cache/manager.go @@ -15,6 +15,9 @@ import ( "github.com/evstack/ev-node/types" ) +// isNotFound is a convenience alias for store.IsNotFound. +var isNotFound = store.IsNotFound + const ( // HeaderDAIncludedPrefix is the store key prefix for header DA inclusion tracking. HeaderDAIncludedPrefix = "cache/header-da-included/" @@ -387,23 +390,45 @@ func (m *implementation) ClearFromStore() error { return nil } +// getMetadataUint64 reads an 8-byte little-endian uint64 from store metadata. +// Returns (0, false, nil) when the key is absent and a non-nil error for +// genuine backend failures or malformed values. +func getMetadataUint64(ctx context.Context, st store.Store, key string) (uint64, bool, error) { + b, err := st.GetMetadata(ctx, key) + if err != nil { + // Key absent — not an error, just missing. + if isNotFound(err) { + return 0, false, nil + } + return 0, false, fmt.Errorf("read metadata %q: %w", key, err) + } + if len(b) != 8 { + return 0, false, fmt.Errorf("invalid metadata length for %q: %d", key, len(b)) + } + return binary.LittleEndian.Uint64(b), true, nil +} + // initDAHeightFromStore seeds maxDAHeight from the HeightToDAHeight metadata // written by the submitter for the last finalized block. This ensures // DaHeight() is non-zero on restart even when the in-flight snapshot is empty. func (m *implementation) initDAHeightFromStore(ctx context.Context) { - daIncludedBytes, err := m.store.GetMetadata(ctx, store.DAIncludedHeightKey) - if err != nil || len(daIncludedBytes) != 8 { + daIncludedHeight, ok, err := getMetadataUint64(ctx, m.store, store.DAIncludedHeightKey) + if err != nil { + m.logger.Error().Err(err).Msg("failed to read DA included height from store") return } - daIncludedHeight := binary.LittleEndian.Uint64(daIncludedBytes) - if daIncludedHeight == 0 { + if !ok || daIncludedHeight == 0 { return } - if b, err := m.store.GetMetadata(ctx, store.GetHeightToDAHeightHeaderKey(daIncludedHeight)); err == nil && len(b) == 8 { - m.headerCache.setMaxDAHeight(binary.LittleEndian.Uint64(b)) + if h, ok, err := getMetadataUint64(ctx, m.store, store.GetHeightToDAHeightHeaderKey(daIncludedHeight)); err != nil { + m.logger.Error().Err(err).Msg("failed to read header DA height from store") + } else if ok { + m.headerCache.setMaxDAHeight(h) } - if b, err := m.store.GetMetadata(ctx, store.GetHeightToDAHeightDataKey(daIncludedHeight)); err == nil && len(b) == 8 { - m.dataCache.setMaxDAHeight(binary.LittleEndian.Uint64(b)) + if h, ok, err := getMetadataUint64(ctx, m.store, store.GetHeightToDAHeightDataKey(daIncludedHeight)); err != nil { + m.logger.Error().Err(err).Msg("failed to read data DA height from store") + } else if ok { + m.dataCache.setMaxDAHeight(h) } } diff --git a/block/internal/cache/manager_test.go b/block/internal/cache/manager_test.go index 3d2840015..fa5aebf34 100644 --- a/block/internal/cache/manager_test.go +++ b/block/internal/cache/manager_test.go @@ -23,17 +23,10 @@ func tempConfig(t *testing.T) config.Config { return cfg } -// helper to make an in-memory store -func memStore(t *testing.T) pkgstore.Store { - ds, err := pkgstore.NewTestInMemoryKVStore() - require.NoError(t, err) - return pkgstore.New(ds) -} - func TestManager_HeaderDataOperations(t *testing.T) { t.Parallel() cfg := tempConfig(t) - st := memStore(t) + st := testMemStore(t) m, err := NewManager(cfg, st, zerolog.Nop()) require.NoError(t, err) @@ -55,7 +48,7 @@ func TestManager_HeaderDataOperations(t *testing.T) { func TestManager_PendingEventsCRUD(t *testing.T) { t.Parallel() cfg := tempConfig(t) - st := memStore(t) + st := testMemStore(t) m, err := NewManager(cfg, st, zerolog.Nop()) require.NoError(t, err) @@ -89,7 +82,7 @@ func TestManager_PendingEventsCRUD(t *testing.T) { func TestManager_SaveAndRestoreFromStore(t *testing.T) { t.Parallel() cfg := tempConfig(t) - st := memStore(t) + st := testMemStore(t) ctx := context.Background() h1, d1 := types.GetRandomBlock(1, 1, "test-chain") @@ -169,7 +162,7 @@ func TestManager_SaveAndRestoreFromStore(t *testing.T) { func TestManager_GetNextPendingEvent_NonExistent(t *testing.T) { t.Parallel() cfg := tempConfig(t) - st := memStore(t) + st := testMemStore(t) m, err := NewManager(cfg, st, zerolog.Nop()) require.NoError(t, err) @@ -181,7 +174,7 @@ func TestManager_GetNextPendingEvent_NonExistent(t *testing.T) { func TestPendingHeadersAndData_Flow(t *testing.T) { t.Parallel() - st := memStore(t) + st := testMemStore(t) ctx := context.Background() logger := zerolog.Nop() @@ -247,7 +240,7 @@ func TestPendingHeadersAndData_Flow(t *testing.T) { func TestManager_TxOperations(t *testing.T) { t.Parallel() cfg := tempConfig(t) - st := memStore(t) + st := testMemStore(t) m, err := NewManager(cfg, st, zerolog.Nop()) require.NoError(t, err) @@ -268,7 +261,7 @@ func TestManager_TxOperations(t *testing.T) { func TestManager_CleanupOldTxs(t *testing.T) { t.Parallel() cfg := tempConfig(t) - st := memStore(t) + st := testMemStore(t) m, err := NewManager(cfg, st, zerolog.Nop()) require.NoError(t, err) @@ -296,7 +289,7 @@ func TestManager_CleanupOldTxs(t *testing.T) { func TestManager_CleanupOldTxs_SelectiveRemoval(t *testing.T) { t.Parallel() cfg := tempConfig(t) - st := memStore(t) + st := testMemStore(t) m, err := NewManager(cfg, st, zerolog.Nop()) require.NoError(t, err) @@ -330,7 +323,7 @@ func TestManager_CleanupOldTxs_SelectiveRemoval(t *testing.T) { func TestManager_CleanupOldTxs_DefaultDuration(t *testing.T) { t.Parallel() cfg := tempConfig(t) - st := memStore(t) + st := testMemStore(t) m, err := NewManager(cfg, st, zerolog.Nop()) require.NoError(t, err) @@ -359,7 +352,7 @@ func TestManager_CleanupOldTxs_DefaultDuration(t *testing.T) { func TestManager_CleanupOldTxs_NoTransactions(t *testing.T) { t.Parallel() cfg := tempConfig(t) - st := memStore(t) + st := testMemStore(t) m, err := NewManager(cfg, st, zerolog.Nop()) require.NoError(t, err) @@ -372,7 +365,7 @@ func TestManager_CleanupOldTxs_NoTransactions(t *testing.T) { func TestManager_TxCache_NotPersistedToStore(t *testing.T) { t.Parallel() cfg := tempConfig(t) - st := memStore(t) + st := testMemStore(t) // Create first manager and add transactions m1, err := NewManager(cfg, st, zerolog.Nop()) @@ -400,7 +393,7 @@ func TestManager_TxCache_NotPersistedToStore(t *testing.T) { func TestManager_DeleteHeight_PreservesTxCache(t *testing.T) { t.Parallel() cfg := tempConfig(t) - st := memStore(t) + st := testMemStore(t) m, err := NewManager(cfg, st, zerolog.Nop()) require.NoError(t, err) @@ -429,7 +422,7 @@ func TestManager_DeleteHeight_PreservesTxCache(t *testing.T) { func TestManager_DAInclusionPersistence(t *testing.T) { t.Parallel() cfg := tempConfig(t) - st := memStore(t) + st := testMemStore(t) ctx := context.Background() // Create blocks and save to store @@ -484,7 +477,7 @@ func TestManager_DaHeightAfterCacheClear(t *testing.T) { t.Parallel() cfg := tempConfig(t) - st := memStore(t) + st := testMemStore(t) ctx := context.Background() // Store a block first @@ -529,7 +522,7 @@ func TestManager_DaHeightFromStoreOnRestore(t *testing.T) { t.Parallel() cfg := tempConfig(t) - st := memStore(t) + st := testMemStore(t) ctx := context.Background() // Store a block first diff --git a/block/internal/cache/pending_data_test.go b/block/internal/cache/pending_data_test.go index 10dc87382..06e1dd992 100644 --- a/block/internal/cache/pending_data_test.go +++ b/block/internal/cache/pending_data_test.go @@ -15,7 +15,7 @@ import ( func TestPendingData_BasicFlow(t *testing.T) { t.Parallel() ctx := context.Background() - store := memStore(t) + store := testMemStore(t) // three blocks with transactions chainID := "pd-basic" @@ -62,7 +62,7 @@ func TestPendingData_BasicFlow(t *testing.T) { func TestPendingData_AdvancesPastEmptyData(t *testing.T) { t.Parallel() ctx := context.Background() - store := memStore(t) + store := testMemStore(t) // Create blocks: non-empty, empty, empty, non-empty chainID := "pd-empty" @@ -108,7 +108,7 @@ func TestPendingData_AdvancesPastEmptyData(t *testing.T) { func TestPendingData_AdvancesPastAllEmptyToEnd(t *testing.T) { t.Parallel() ctx := context.Background() - store := memStore(t) + store := testMemStore(t) // Create blocks: non-empty, empty, empty (all remaining are empty) chainID := "pd-all-empty" @@ -144,7 +144,7 @@ func TestPendingData_AdvancesPastAllEmptyToEnd(t *testing.T) { func TestPendingData_AdvancesPastEmptyAtStart(t *testing.T) { t.Parallel() ctx := context.Background() - store := memStore(t) + store := testMemStore(t) // Create blocks: empty, empty, non-empty chainID := "pd-empty-start" @@ -206,7 +206,7 @@ func TestPendingData_InitFromMetadata(t *testing.T) { func TestPendingData_GetPending_PropagatesFetchError(t *testing.T) { t.Parallel() ctx := context.Background() - store := memStore(t) + store := testMemStore(t) // Set height to 1 but do not save any block data batch, err := store.NewBatch(ctx) diff --git a/block/internal/cache/pending_headers_test.go b/block/internal/cache/pending_headers_test.go index 25c029700..8691f3a57 100644 --- a/block/internal/cache/pending_headers_test.go +++ b/block/internal/cache/pending_headers_test.go @@ -15,7 +15,7 @@ import ( func TestPendingHeaders_BasicFlow(t *testing.T) { t.Parallel() ctx := context.Background() - store := memStore(t) + store := testMemStore(t) // create and persist three blocks chainID := "ph-basic" @@ -67,7 +67,7 @@ func TestPendingHeaders_BasicFlow(t *testing.T) { func TestPendingHeaders_EmptyWhenUpToDate(t *testing.T) { t.Parallel() ctx := context.Background() - store := memStore(t) + store := testMemStore(t) h, d := types.GetRandomBlock(1, 1, "ph-up") batch, err := store.NewBatch(ctx) diff --git a/block/internal/submitting/submitter.go b/block/internal/submitting/submitter.go index aa7c02e71..34fab216d 100644 --- a/block/internal/submitting/submitter.go +++ b/block/internal/submitting/submitter.go @@ -349,17 +349,16 @@ func (s *Submitter) processDAInclusionLoop() { return } + // Persist DA included height before advancing in-memory state + if err := putUint64Metadata(s.ctx, s.store, store.DAIncludedHeightKey, nextHeight); err != nil { + s.logger.Error().Err(err).Uint64("height", nextHeight).Msg("failed to persist DA included height") + break + } + // Update DA included height s.SetDAIncludedHeight(nextHeight) currentDAIncluded = nextHeight - // Persist DA included height - bz := make([]byte, 8) - binary.LittleEndian.PutUint64(bz, nextHeight) - if err := s.store.SetMetadata(s.ctx, store.DAIncludedHeightKey, bz); err != nil { - s.logger.Error().Err(err).Uint64("height", nextHeight).Msg("failed to persist DA included height") - } - // Delete height cache for that height // This can only be performed after the height has been persisted to store s.cache.DeleteHeight(nextHeight) @@ -415,6 +414,13 @@ func (s *Submitter) initializeDAIncludedHeight(ctx context.Context) error { return nil } +// putUint64Metadata encodes val as 8-byte little-endian and writes it to the store. +func putUint64Metadata(ctx context.Context, st store.Store, key string, val uint64) error { + bz := make([]byte, 8) + binary.LittleEndian.PutUint64(bz, val) + return st.SetMetadata(ctx, key, bz) +} + // sendCriticalError sends a critical error to the error channel without blocking func (s *Submitter) sendCriticalError(err error) { if s.errorCh != nil { @@ -431,41 +437,33 @@ func (s *Submitter) sendCriticalError(err error) { func (s *Submitter) setNodeHeightToDAHeight(ctx context.Context, height uint64, data *types.Data, genesisInclusion bool) error { dataHash := data.DACommitment() - headerDaHeightBytes := make([]byte, 8) daHeightForHeader, ok := s.cache.GetHeaderDAIncludedByHeight(height) if !ok { return fmt.Errorf("header for height %d not found in cache", height) } - binary.LittleEndian.PutUint64(headerDaHeightBytes, daHeightForHeader) - if err := s.store.SetMetadata(ctx, store.GetHeightToDAHeightHeaderKey(height), headerDaHeightBytes); err != nil { + if err := putUint64Metadata(ctx, s.store, store.GetHeightToDAHeightHeaderKey(height), daHeightForHeader); err != nil { return err } genesisDAIncludedHeight := daHeightForHeader - dataDaHeightBytes := make([]byte, 8) - // For empty transactions, use the same DA height as the header - if bytes.Equal(dataHash, common.DataHashForEmptyTxs) { - binary.LittleEndian.PutUint64(dataDaHeightBytes, daHeightForHeader) - } else { + // For empty transactions, use the same DA height as the header. + dataDAHeight := daHeightForHeader + if !bytes.Equal(dataHash, common.DataHashForEmptyTxs) { daHeightForData, ok := s.cache.GetDataDAIncludedByHeight(height) if !ok { return fmt.Errorf("data for height %d not found in cache", height) } - binary.LittleEndian.PutUint64(dataDaHeightBytes, daHeightForData) - + dataDAHeight = daHeightForData // if data posted before header, use data da included height for genesis da height genesisDAIncludedHeight = min(daHeightForData, genesisDAIncludedHeight) } - if err := s.store.SetMetadata(ctx, store.GetHeightToDAHeightDataKey(height), dataDaHeightBytes); err != nil { + if err := putUint64Metadata(ctx, s.store, store.GetHeightToDAHeightDataKey(height), dataDAHeight); err != nil { return err } if genesisInclusion { - genesisDAIncludedHeightBytes := make([]byte, 8) - binary.LittleEndian.PutUint64(genesisDAIncludedHeightBytes, genesisDAIncludedHeight) - - if err := s.store.SetMetadata(ctx, store.GenesisDAHeightKey, genesisDAIncludedHeightBytes); err != nil { + if err := putUint64Metadata(ctx, s.store, store.GenesisDAHeightKey, genesisDAIncludedHeight); err != nil { return err } diff --git a/pkg/store/types.go b/pkg/store/types.go index 90b29a51b..b1b1f2bd5 100644 --- a/pkg/store/types.go +++ b/pkg/store/types.go @@ -2,12 +2,19 @@ package store import ( "context" + "errors" ds "github.com/ipfs/go-datastore" "github.com/evstack/ev-node/types" ) +// ErrNotFound is returned when a key is not present in the store. +var ErrNotFound = ds.ErrNotFound + +// IsNotFound reports whether err (or any error in its chain) is a not-found error. +func IsNotFound(err error) bool { return errors.Is(err, ds.ErrNotFound) } + // Batch provides atomic operations for the store type Batch interface { // SaveBlockData atomically saves the block header, data, and signature