Skip to content

Commit e3a0ed3

Browse files
committed
feat: implement sync mode metrics and subscription functionality for DA client
1 parent 41cac58 commit e3a0ed3

File tree

8 files changed

+672
-14
lines changed

8 files changed

+672
-14
lines changed

block/internal/common/metrics.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,11 @@ type Metrics struct {
6969
// Forced inclusion metrics
7070
ForcedInclusionTxsInGracePeriod metrics.Gauge // Number of forced inclusion txs currently in grace period
7171
ForcedInclusionTxsMalicious metrics.Counter // Total number of forced inclusion txs marked as malicious
72+
73+
// Sync mode metrics
74+
SyncMode metrics.Gauge // Current sync mode: 0=catchup, 1=follow
75+
SubscribeErrors metrics.Counter // Number of subscription failures
76+
ModeSwitches metrics.Counter // Number of catchup<->follow mode transitions
7277
}
7378

7479
// PrometheusMetrics returns Metrics built using Prometheus client library
@@ -201,6 +206,28 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
201206
Help: "Total number of forced inclusion transactions marked as malicious (past grace boundary)",
202207
}, labels).With(labelsAndValues...)
203208

209+
// Sync mode metrics
210+
m.SyncMode = prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
211+
Namespace: namespace,
212+
Subsystem: MetricsSubsystem,
213+
Name: "sync_mode",
214+
Help: "Current sync mode: 0=catchup (polling), 1=follow (subscription)",
215+
}, labels).With(labelsAndValues...)
216+
217+
m.SubscribeErrors = prometheus.NewCounterFrom(stdprometheus.CounterOpts{
218+
Namespace: namespace,
219+
Subsystem: MetricsSubsystem,
220+
Name: "subscribe_errors_total",
221+
Help: "Total number of DA subscription failures",
222+
}, labels).With(labelsAndValues...)
223+
224+
m.ModeSwitches = prometheus.NewCounterFrom(stdprometheus.CounterOpts{
225+
Namespace: namespace,
226+
Subsystem: MetricsSubsystem,
227+
Name: "mode_switches_total",
228+
Help: "Total number of sync mode transitions between catchup and follow",
229+
}, labels).With(labelsAndValues...)
230+
204231
// DA Submitter metrics
205232
m.DASubmitterPendingBlobs = prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
206233
Namespace: namespace,
@@ -269,6 +296,11 @@ func NopMetrics() *Metrics {
269296
// Forced inclusion metrics
270297
ForcedInclusionTxsInGracePeriod: discard.NewGauge(),
271298
ForcedInclusionTxsMalicious: discard.NewCounter(),
299+
300+
// Sync mode metrics
301+
SyncMode: discard.NewGauge(),
302+
SubscribeErrors: discard.NewCounter(),
303+
ModeSwitches: discard.NewCounter(),
272304
}
273305

274306
// Initialize maps with no-op metrics

block/internal/da/client.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -442,3 +442,27 @@ func (c *client) Validate(ctx context.Context, ids []datypes.ID, proofs []datype
442442

443443
return results, nil
444444
}
445+
446+
// Subscribe subscribes to blobs in the specified namespace.
447+
// Returns a channel that receives subscription responses as new blobs are included.
448+
func (c *client) Subscribe(ctx context.Context, namespace []byte) (<-chan *blobrpc.SubscriptionResponse, error) {
449+
ns, err := share.NewNamespaceFromBytes(namespace)
450+
if err != nil {
451+
return nil, fmt.Errorf("invalid namespace: %w", err)
452+
}
453+
454+
return c.blobAPI.Subscribe(ctx, ns)
455+
}
456+
457+
// LocalHead returns the height of the locally synced DA head.
458+
func (c *client) LocalHead(ctx context.Context) (uint64, error) {
459+
headCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout)
460+
defer cancel()
461+
462+
header, err := c.headerAPI.LocalHead(headCtx)
463+
if err != nil {
464+
return 0, fmt.Errorf("failed to get local head: %w", err)
465+
}
466+
467+
return header.Height, nil
468+
}

block/internal/da/interface.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package da
33
import (
44
"context"
55

6+
blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc"
67
datypes "github.com/evstack/ev-node/pkg/da/types"
78
)
89

@@ -22,6 +23,15 @@ type Client interface {
2223
GetDataNamespace() []byte
2324
GetForcedInclusionNamespace() []byte
2425
HasForcedInclusionNamespace() bool
26+
27+
// Subscribe subscribes to blobs in the specified namespace.
28+
// Returns a channel that receives subscription responses as new blobs are included.
29+
// Used for follow mode to receive real-time blob notifications.
30+
Subscribe(ctx context.Context, namespace []byte) (<-chan *blobrpc.SubscriptionResponse, error)
31+
32+
// LocalHead returns the height of the locally synced DA head.
33+
// Used to determine if the node is caught up with the DA layer.
34+
LocalHead(ctx context.Context) (uint64, error)
2535
}
2636

2737
// Verifier defines the interface for DA proof verification operations.

block/internal/syncing/syncer.go

Lines changed: 234 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,45 @@ import (
2222
"github.com/evstack/ev-node/block/internal/common"
2323
"github.com/evstack/ev-node/block/internal/da"
2424
"github.com/evstack/ev-node/pkg/config"
25+
blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc"
2526
datypes "github.com/evstack/ev-node/pkg/da/types"
2627
"github.com/evstack/ev-node/pkg/genesis"
2728
"github.com/evstack/ev-node/pkg/store"
2829
"github.com/evstack/ev-node/types"
2930
)
3031

32+
// SyncMode represents the current synchronization mode for the DA worker.
type SyncMode int

const (
	// SyncModeCatchup indicates the node is behind the DA chain head and polling aggressively.
	SyncModeCatchup SyncMode = iota
	// SyncModeFollow indicates the node is caught up and using subscription for real-time updates.
	SyncModeFollow
)

// String returns a human-readable representation of the sync mode.
func (m SyncMode) String() string {
	if m == SyncModeCatchup {
		return "catchup"
	}
	if m == SyncModeFollow {
		return "follow"
	}
	return "unknown"
}
53+
54+
const (
	// catchupThreshold is the number of DA blocks behind the local DA head
	// that is tolerated before switching from follow to catchup mode.
	// The small buffer avoids flapping between modes on every new DA block.
	catchupThreshold = 2

	// followWatchdogMultiplier scales the DA BlockTime to produce the
	// subscription watchdog timeout: with no subscription event for
	// BlockTime*followWatchdogMultiplier the sync mode is re-evaluated.
	followWatchdogMultiplier = 3
)
63+
3164
// forcedInclusionGracePeriodConfig contains internal configuration for forced inclusion grace periods.
3265
type forcedInclusionGracePeriodConfig struct {
3366
// basePeriod is the base number of additional epochs allowed for including forced inclusion transactions
@@ -118,6 +151,9 @@ type Syncer struct {
118151

119152
// P2P wait coordination
120153
p2pWaitState atomic.Value // stores p2pWaitState
154+
155+
// Sync mode tracking
156+
currentSyncMode atomic.Int32 // stores SyncMode as int32
121157
}
122158

123159
// pendingForcedInclusionTx represents a forced inclusion transaction that hasn't been included yet
@@ -318,28 +354,213 @@ func (s *Syncer) daWorkerLoop() {
318354
defer s.logger.Info().Msg("DA worker stopped")
319355

320356
for {
321-
err := s.fetchDAUntilCaughtUp()
357+
select {
358+
case <-s.ctx.Done():
359+
return
360+
default:
361+
}
322362

323-
var backoff time.Duration
324-
if err == nil {
325-
// No error, means we are caught up.
326-
backoff = s.config.DA.BlockTime.Duration
327-
} else {
328-
// Error, back off for a shorter duration.
329-
backoff = s.config.DA.BlockTime.Duration
330-
if backoff <= 0 {
331-
backoff = 2 * time.Second
332-
}
363+
mode := s.determineSyncMode()
364+
previousMode := SyncMode(s.currentSyncMode.Load())
365+
366+
// Track mode switches
367+
if mode != previousMode {
368+
s.currentSyncMode.Store(int32(mode))
369+
s.metrics.ModeSwitches.Add(1)
370+
s.logger.Info().
371+
Str("from", previousMode.String()).
372+
Str("to", mode.String()).
373+
Msg("sync mode changed")
374+
}
375+
376+
switch mode {
377+
case SyncModeCatchup:
378+
s.runCatchupMode()
379+
case SyncModeFollow:
380+
s.runFollowMode()
333381
}
382+
}
383+
}
334384

385+
// determineSyncMode checks the current DA sync status and returns the appropriate mode.
386+
func (s *Syncer) determineSyncMode() SyncMode {
387+
// If DA client is nil (e.g., in tests), default to catchup mode
388+
if s.daClient == nil {
389+
return SyncModeCatchup
390+
}
391+
392+
localHead, err := s.daClient.LocalHead(s.ctx)
393+
if err != nil {
394+
// Default to catchup on error - safer to poll than assume we're caught up
395+
s.logger.Debug().Err(err).Msg("failed to get local DA head, defaulting to catchup mode")
396+
return SyncModeCatchup
397+
}
398+
399+
currentDAHeight := s.daRetrieverHeight.Load()
400+
401+
// Consider "caught up" if within catchupThreshold blocks of local head
402+
if currentDAHeight+catchupThreshold >= localHead {
403+
return SyncModeFollow
404+
}
405+
return SyncModeCatchup
406+
}
407+
408+
// runCatchupMode runs the catchup sync mode - aggressive polling until caught up.
409+
func (s *Syncer) runCatchupMode() {
410+
s.logger.Debug().Msg("running catchup mode")
411+
s.metrics.SyncMode.Set(float64(SyncModeCatchup))
412+
413+
err := s.fetchDAUntilCaughtUp()
414+
if errors.Is(err, context.Canceled) {
415+
return
416+
}
417+
418+
// Back off before next iteration:
419+
// - On error: wait before retrying to avoid hammering a failing DA layer
420+
// - On success (caught up): wait for new DA blocks to appear
421+
backoff := s.config.DA.BlockTime.Duration
422+
if backoff <= 0 {
423+
backoff = 2 * time.Second
424+
}
425+
426+
if err != nil {
427+
s.logger.Debug().Err(err).Msg("catchup failed, backing off before retry")
428+
} else {
429+
s.logger.Debug().Msg("caught up with DA, backing off before next check")
430+
}
431+
432+
s.sleepOrDone(backoff)
433+
}
434+
435+
// runFollowMode runs the follow sync mode - subscription-based real-time updates.
436+
func (s *Syncer) runFollowMode() {
437+
s.logger.Debug().Msg("running follow mode")
438+
s.metrics.SyncMode.Set(float64(SyncModeFollow))
439+
440+
err := s.subscribeAndFollow()
441+
if err != nil && !errors.Is(err, context.Canceled) {
442+
s.metrics.SubscribeErrors.Add(1)
443+
s.logger.Warn().Err(err).Msg("subscribe failed, falling back to catchup")
444+
// Don't sleep - go straight to catchup mode to recover
445+
}
446+
}
447+
448+
// subscribeAndFollow uses the DA subscription API to receive real-time blob notifications.
449+
// It subscribes to both header and data namespaces and processes incoming blobs.
450+
// Returns when subscription fails, context is cancelled, or node falls behind.
451+
func (s *Syncer) subscribeAndFollow() error {
452+
// Get namespaces
453+
headerNS := s.daClient.GetHeaderNamespace()
454+
dataNS := s.daClient.GetDataNamespace()
455+
456+
// Create subscription context with cancellation
457+
subCtx, cancel := context.WithCancel(s.ctx)
458+
defer cancel()
459+
460+
// Subscribe to header namespace
461+
headerCh, err := s.daClient.Subscribe(subCtx, headerNS)
462+
if err != nil {
463+
return fmt.Errorf("failed to subscribe to header namespace: %w", err)
464+
}
465+
466+
// Subscribe to data namespace (only if different from header namespace)
467+
var dataCh <-chan *blobrpc.SubscriptionResponse
468+
if !bytes.Equal(headerNS, dataNS) {
469+
dataCh, err = s.daClient.Subscribe(subCtx, dataNS)
470+
if err != nil {
471+
return fmt.Errorf("failed to subscribe to data namespace: %w", err)
472+
}
473+
}
474+
475+
s.logger.Info().Msg("subscribed to DA namespaces for follow mode")
476+
477+
// Calculate watchdog timeout
478+
watchdogTimeout := s.config.DA.BlockTime.Duration * followWatchdogMultiplier
479+
if watchdogTimeout <= 0 {
480+
watchdogTimeout = 30 * time.Second
481+
}
482+
483+
// Process subscription events
484+
for {
335485
select {
336486
case <-s.ctx.Done():
337-
return
338-
case <-time.After(backoff):
487+
return s.ctx.Err()
488+
489+
case resp, ok := <-headerCh:
490+
if !ok {
491+
return errors.New("header subscription closed")
492+
}
493+
if err := s.processSubscriptionResponse(resp); err != nil {
494+
s.logger.Error().Err(err).Uint64("height", resp.Height).Msg("failed to process header subscription")
495+
}
496+
497+
case resp, ok := <-dataCh:
498+
if dataCh == nil {
499+
// Data channel not used (same namespace), continue
500+
continue
501+
}
502+
if !ok {
503+
return errors.New("data subscription closed")
504+
}
505+
if err := s.processSubscriptionResponse(resp); err != nil {
506+
s.logger.Error().Err(err).Uint64("height", resp.Height).Msg("failed to process data subscription")
507+
}
508+
509+
case <-time.After(watchdogTimeout):
510+
// Watchdog: if no events for watchdogTimeout, recheck mode
511+
// Might have fallen behind due to network issues
512+
s.logger.Debug().Dur("timeout", watchdogTimeout).Msg("subscription watchdog triggered, checking sync mode")
513+
if s.determineSyncMode() == SyncModeCatchup {
514+
return errors.New("fell behind, switching to catchup")
515+
}
339516
}
340517
}
341518
}
342519

520+
// processSubscriptionResponse processes a subscription response and sends events to the processing channel.
521+
func (s *Syncer) processSubscriptionResponse(resp *blobrpc.SubscriptionResponse) error {
522+
if resp == nil || len(resp.Blobs) == 0 {
523+
return nil
524+
}
525+
526+
s.logger.Debug().
527+
Uint64("da_height", resp.Height).
528+
Int("blobs", len(resp.Blobs)).
529+
Msg("processing subscription response")
530+
531+
// Convert blobs to raw byte slices for processing
532+
blobs := make([][]byte, len(resp.Blobs))
533+
for i, blob := range resp.Blobs {
534+
blobs[i] = blob.Data()
535+
}
536+
537+
// Process blobs using the DA retriever's processBlobs method
538+
events := s.daRetriever.(*daRetriever).processBlobs(s.ctx, blobs, resp.Height)
539+
540+
// Send events to the processing channel
541+
for _, event := range events {
542+
select {
543+
case s.heightInCh <- event:
544+
s.logger.Debug().
545+
Uint64("height", event.Header.Height()).
546+
Uint64("da_height", event.DaHeight).
547+
Msg("sent subscription event to processing")
548+
default:
549+
s.cache.SetPendingEvent(event.Header.Height(), &event)
550+
s.logger.Debug().
551+
Uint64("height", event.Header.Height()).
552+
Msg("subscription event queued as pending")
553+
}
554+
}
555+
556+
// Update retriever height
557+
if resp.Height >= s.daRetrieverHeight.Load() {
558+
s.daRetrieverHeight.Store(resp.Height + 1)
559+
}
560+
561+
return nil
562+
}
563+
343564
func (s *Syncer) fetchDAUntilCaughtUp() error {
344565
for {
345566
select {

0 commit comments

Comments
 (0)