@@ -17,13 +17,15 @@ import (
1717// SubscriberHandler is the callback interface for subscription consumers.
1818// Implementations drive the consumer-specific logic (caching, piping events, etc.).
1919type SubscriberHandler interface {
20- // HandleEvent processes a subscription event inline (fast path).
21- // Called on the followLoop goroutine for each subscription event.
22- HandleEvent (ctx context.Context , ev datypes.SubscriptionEvent )
20+ // HandleEvent processes a subscription event.
21+ // isInline is true if the subscriber successfully claimed this height (via CAS).
22+ // Returning an error when isInline is true instructs the Subscriber to roll back the localDAHeight.
23+ HandleEvent (ctx context.Context , ev datypes.SubscriptionEvent , isInline bool ) error
2324
2425 // HandleCatchup is called for each height during sequential catchup.
25- // The subscriber advances localDAHeight only after this returns nil.
26- // Returning an error triggers a backoff retry.
26+ // The subscriber advances localDAHeight only after this returns (true, nil).
27+ // Returning (false, nil) rolls back localDAHeight without triggering a backoff.
28+ // Returning an error rolls back localDAHeight and triggers a backoff retry.
2729 HandleCatchup (ctx context.Context , height uint64 ) error
2830}
2931
@@ -36,6 +38,8 @@ type SubscriberConfig struct {
3638 Handler SubscriberHandler
3739 // Deprecated: Remove with https://github.com/evstack/ev-node/issues/3142
3840 FetchBlockTimestamp bool // the timestamp comes with an extra api call before Celestia v0.29.1-mocha.
41+
42+ StartHeight uint64 // initial localDAHeight
3943}
4044
4145// Subscriber is a shared DA subscription primitive that encapsulates the
@@ -87,17 +91,15 @@ func NewSubscriber(cfg SubscriberConfig) *Subscriber {
8791 daBlockTime : cfg .DABlockTime ,
8892 fetchBlockTimestamp : cfg .FetchBlockTimestamp ,
8993 }
94+ s .localDAHeight .Store (cfg .StartHeight )
95+ s .catchupSignal <- struct {}{}
96+
9097 if len (s .namespaces ) == 0 {
9198 s .logger .Warn ().Msg ("no namespaces configured, subscriber will stay idle" )
9299 }
93100 return s
94101}
95102
96- // SetStartHeight sets the initial local DA height before Start is called.
97- func (s * Subscriber ) SetStartHeight (height uint64 ) {
98- s .localDAHeight .Store (height )
99- }
100-
101103// Start begins the follow and catchup goroutines.
102104func (s * Subscriber ) Start (ctx context.Context ) error {
103105 if len (s .namespaces ) == 0 {
@@ -108,6 +110,7 @@ func (s *Subscriber) Start(ctx context.Context) error {
108110 s .wg .Add (2 )
109111 go s .followLoop (ctx )
110112 go s .catchupLoop (ctx )
113+
111114 return nil
112115}
113116
@@ -134,29 +137,6 @@ func (s *Subscriber) HasReachedHead() bool {
134137 return s .headReached .Load ()
135138}
136139
137- // CompareAndSwapLocalHeight attempts a CAS on localDAHeight.
138- // Used by handlers that want to claim exclusive processing of a height.
139- func (s * Subscriber ) CompareAndSwapLocalHeight (old , new uint64 ) bool {
140- return s .localDAHeight .CompareAndSwap (old , new )
141- }
142-
143- // SetLocalHeight stores a new localDAHeight value.
144- func (s * Subscriber ) SetLocalHeight (height uint64 ) {
145- s .localDAHeight .Store (height )
146- }
147-
148- // UpdateHighestForTest directly sets the highest seen DA height and signals catchup.
149- // Only for use in tests that bypass the subscription loop.
150- func (s * Subscriber ) UpdateHighestForTest (height uint64 ) {
151- s .updateHighest (height )
152- }
153-
154- // RunCatchupForTest runs a single catchup pass. Only for use in tests that
155- // bypass the catchup loop's signal-wait mechanism.
156- func (s * Subscriber ) RunCatchupForTest (ctx context.Context ) {
157- s .runCatchup (ctx )
158- }
159-
160140// ---------------------------------------------------------------------------
161141// Follow loop
162142// ---------------------------------------------------------------------------
@@ -215,7 +195,21 @@ func (s *Subscriber) runSubscription(ctx context.Context) error {
215195 return errors .New ("subscription channel closed" )
216196 }
217197 s .updateHighest (ev .Height )
218- s .handler .HandleEvent (ctx , ev )
198+
199+ local := s .localDAHeight .Load ()
200+ isInline := ev .Height == local && s .localDAHeight .CompareAndSwap (local , local + 1 )
201+
202+ err = s .handler .HandleEvent (ctx , ev , isInline )
203+ if isInline {
204+ if err != nil {
205+ s .localDAHeight .Store (local )
206+ s .logger .Debug ().Err (err ).Uint64 ("da_height" , ev .Height ).
207+ Msg ("inline processing skipped/failed, rolling back" )
208+ } else {
209+ s .headReached .Store (true )
210+ }
211+ }
212+
219213 watchdog .Reset (watchdogTimeout )
220214 case <- watchdog .C :
221215 return errors .New ("subscription watchdog: no events received, reconnecting" )
@@ -325,9 +319,7 @@ func (s *Subscriber) runCatchup(ctx context.Context) {
325319 }
326320
327321 local := s .localDAHeight .Load ()
328- highest := s .highestSeenDAHeight .Load ()
329-
330- if local > highest {
322+ if local > s .highestSeenDAHeight .Load () {
331323 s .headReached .Store (true )
332324 return
333325 }
0 commit comments