[WIP] Introduce HTLC event stream #376

Open
sekulicd wants to merge 4 commits into master from feat/htlc-event-stream

Conversation

sekulicd (Contributor) commented Mar 9, 2026

Summary

  • Adds GetEventStream server-side streaming gRPC RPC to Service that emits wallet and VHTLC lifecycle events
  • 5 event types: tx_associated, htlc_created, htlc_funded, htlc_spent, htlc_refundable
  • Follows the ChainSwapEventCallback pattern (Chain Swap, PR #369) with a buffered channel (1024) and non-blocking sends
  • Uses existing listenerHandler[T] fan-out pattern for concurrent stream consumers
  • REST streaming via grpc-gateway at GET /v1/events (chunked JSON)
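The buffered-channel, non-blocking emit pattern summarized above can be sketched as follows. This is a minimal illustration only: the names `emitHtlcEvent`, `GetHtlcEvents`, and the 1024 buffer come from the PR summary, while the `Service` and `HtlcEvent` shapes here are simplified stand-ins, not the repository's actual types.

```go
package main

import "fmt"

// HtlcEvent is a simplified stand-in for the domain type described in the PR.
type HtlcEvent struct {
	Type string
	Txid string
}

// Service holds the buffered event channel.
type Service struct {
	htlcEvents chan HtlcEvent
}

func NewService() *Service {
	// Buffer of 1024, as stated in the PR summary.
	return &Service{htlcEvents: make(chan HtlcEvent, 1024)}
}

// emitHtlcEvent never blocks: if the buffer is full, the event is dropped,
// so swap flows can never stall on a slow event consumer.
func (s *Service) emitHtlcEvent(e HtlcEvent) bool {
	select {
	case s.htlcEvents <- e:
		return true
	default:
		return false // drop-on-full
	}
}

// GetHtlcEvents exposes the channel to the gRPC layer.
func (s *Service) GetHtlcEvents() <-chan HtlcEvent { return s.htlcEvents }

func main() {
	s := NewService()
	s.emitHtlcEvent(HtlcEvent{Type: "htlc_created", Txid: "abc"})
	fmt.Println((<-s.GetHtlcEvents()).Type)
}
```

The trade-off is visible in `emitHtlcEvent`: delivery is best-effort, which is why the PR's tests include a drop-on-full case.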

Changes

Proto (service.proto)

  • GetEventStream RPC returning stream GetEventStreamResponse
  • GetEventStreamResponse with oneof event containing 5 event message types
  • Each event type carries relevant fields (vhtlc_id, txid, amount, spend_type, etc.)

Application Layer (internal/core/application/)

  • New htlc_events.go: HtlcEvent domain type with HtlcEventType and SpendType constants
  • service.go: htlcEvents buffered channel, emitHtlcEvent() non-blocking helper, GetHtlcEvents() accessor
  • Event emission at all VHTLC lifecycle points:
    • htlc_created in GetSwapVHTLC() after DB persistence
    • htlc_funded in postProcess closures (GetInvoice, IncreaseInboundCapacity, PayInvoice, PayOffer)
    • htlc_spent (claimed) in ClaimVHTLC(), SettleVHTLCWithClaimPath()
    • htlc_spent (refunded) in RefundVHTLC(), SettleVHTLCWithRefundPath(), scheduled refunds
    • htlc_refundable in scheduleSwapRefund() and scheduleChainSwapRefund() before unilateral attempt

gRPC Handler (internal/interface/grpc/handlers/)

  • New event_handler.go: eventHandler with listenerHandler[T] fan-out, listenToHtlcEvents() goroutine, GetEventStream() implementation
  • notification_handler.go: forwards VTXO notifications as TxAssociatedEvent to event stream consumers
  • service_handler.go: delegates GetEventStream to eventHandler
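The fan-out idea behind `listenerHandler[T]` can be sketched like this. This is an illustrative generic handler, not the repository's actual implementation; the buffered listener channels with drop-on-full used here are one way to keep broadcasts from blocking on slow consumers.

```go
package main

import (
	"fmt"
	"sync"
)

// listenerHandler fans one event stream out to many stream consumers.
type listenerHandler[T any] struct {
	mu        sync.Mutex
	listeners map[int]chan T
	nextID    int
}

func newListenerHandler[T any]() *listenerHandler[T] {
	return &listenerHandler[T]{listeners: make(map[int]chan T)}
}

// addListener registers a new consumer and returns its id and channel.
func (h *listenerHandler[T]) addListener() (int, <-chan T) {
	h.mu.Lock()
	defer h.mu.Unlock()
	id := h.nextID
	h.nextID++
	ch := make(chan T, 8) // small buffer so broadcast never blocks
	h.listeners[id] = ch
	return id, ch
}

// removeListener unregisters a consumer, e.g. when its stream is cancelled.
func (h *listenerHandler[T]) removeListener(id int) {
	h.mu.Lock()
	defer h.mu.Unlock()
	if ch, ok := h.listeners[id]; ok {
		close(ch)
		delete(h.listeners, id)
	}
}

// broadcast delivers ev to every registered listener, dropping on full buffers.
func (h *listenerHandler[T]) broadcast(ev T) {
	h.mu.Lock()
	defer h.mu.Unlock()
	for _, ch := range h.listeners {
		select {
		case ch <- ev:
		default: // slow consumer: drop rather than block
		}
	}
}

func main() {
	h := newListenerHandler[string]()
	id1, ch1 := h.addListener()
	_, ch2 := h.addListener()
	h.broadcast("htlc_spent")
	fmt.Println(<-ch1, <-ch2)
	h.removeListener(id1)
}
```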

Tests (12 new)

  • 4 application-layer tests: channel behavior, all event types, drop-on-full
  • 8 handler-layer tests: proto conversion, listener lifecycle, fan-out, tx_associated

Test plan

  • buf lint passes
  • go build ./... compiles cleanly
  • go vet ./... clean
  • 12/12 new unit tests pass
  • make proto && make build && make test (requires Docker)
  • Manual gRPC stream test with grpcurl
  • E2E swap lifecycle verifying event sequence

Closes #343

Summary by CodeRabbit

  • New Features

    • Real-time streaming API (GET /v1/events) for wallet, VHTLC, and chain-swap lifecycle events, including timestamps, IDs, amounts, addresses, and transaction references.
    • Added Tx-associated event payloads and expanded chain-swap events (create, user/server locked, claimed, refunded, failed) with refund-kind and spend-type semantics.
    • Server-side listener support to fan out events to multiple consumers.
  • Tests

    • New unit and extensive e2e tests covering event mapping, ordering, delivery to multiple listeners, and payload correctness.

Add a server-side streaming gRPC RPC `GetEventStream` to the Service
that emits wallet and VHTLC lifecycle events:
- tx_associated: any transaction touching the fulmine wallet
- htlc_created: new VHTLC created
- htlc_funded: VHTLC received funds via submarine swap
- htlc_spent: VHTLC claimed or refunded
- htlc_refundable: refund locktime reached

The implementation follows the ChainSwapEventCallback pattern (PR #369)
with a buffered channel (1024) and non-blocking sends. The gRPC handler
uses the existing listenerHandler[T] fan-out pattern for concurrent
stream consumers.

Closes #343

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
coderabbitai bot (Contributor) commented Mar 9, 2026

📝 Walkthrough

Adds a streaming GetEventStream API (gRPC + OpenAPI) for wallet, VHTLC, and chain-swap lifecycle events; introduces application HtlcEvent and ChainSwapEvent models and channels; wires event emission across service flows; implements listener fan‑out in gRPC handlers; and adds unit and e2e tests for streaming behavior.

Changes

Cohort — File(s) — Summary

  • API Specs — api-spec/openapi/swagger/fulmine/v1/service.swagger.json, api-spec/protobuf/fulmine/v1/service.proto: Added GET /v1/events streaming endpoint and protobuf/openapi types/messages: GetEventStreamRequest/Response, streaming wrappers, TxAssociatedEvent, Htlc* events, ChainSwap* events, and related enums.
  • Application event model — internal/core/application/htlc_events.go, internal/core/application/chainswap_events.go: New HtlcEvent and ChainSwapEvent types, enums for event kinds and refund/spend types, and structs modeling event payloads.
  • Service integration — internal/core/application/service.go: Added buffered htlcEvents and chainSwapEvents channels (1024), GetHtlcEvents() / GetChainSwapEvents() accessors, and non-blocking emitHtlcEvent() / emitChainSwapEvent() helpers; instrumented many VHTLC and chain-swap flows to emit events.
  • Application tests — internal/core/application/htlc_events_test.go, internal/core/application/chainswap_events_test.go: Unit tests for event channel behavior, buffering/non-blocking emits, and verification across all HTLC and ChainSwap event variants.
  • gRPC event streaming — internal/interface/grpc/handlers/event_handler.go, internal/interface/grpc/handlers/event_handler_test.go: Implemented server-streaming GetEventStream, per-connection listener registration, fan-out, conversion helpers (toEventStreamResponse, toChainSwapEventStreamResponse, toTxAssociatedResponse), and tests validating mappings and multi-consumer delivery.
  • Handler wiring & constructors — internal/interface/grpc/handlers/service_handler.go, internal/interface/grpc/handlers/notification_handler.go, internal/interface/grpc/service.go: Added NewEventListenerHandler factory; extended NewServiceHandler and NewNotificationHandler signatures to accept eventListenerHandler and stopCh; notification handler fans out TxAssociated events to listeners.
  • E2E tests — internal/test/e2e/event_stream_test.go, internal/test/e2e/main_test.go: Extensive end-to-end tests exercising GetEventStream across the VHTLC lifecycle, submarine swaps, refunds, and comprehensive chain-swap scenarios; minor whitespace tweak in main_test.
  • Test infra — test.docker-compose.yml: Small config key rename in the test compose file (currencies.lnd → currencies.lnds).

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant GRPC as gRPC Handler
    participant ListenerMgr as Listener Manager
    participant AppSvc as Application Service
    participant EventCh as Event Channel

    Client->>GRPC: GetEventStream()
    activate GRPC

    GRPC->>ListenerMgr: Register listener
    ListenerMgr->>ListenerMgr: add listener entry

    AppSvc->>EventCh: emitHtlcEvent / emitChainSwapEvent
    EventCh-->>ListenerMgr: event received
    ListenerMgr->>GRPC: push GetEventStreamResponse
    GRPC->>Client: Stream GetEventStreamResponse

    alt Client cancels
        Client->>GRPC: Cancel
        GRPC->>ListenerMgr: Unregister listener
    end
    deactivate GRPC

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Possibly related PRs

  • Chain Swap #369: Adds HTLC and ChainSwap event types and streaming RPCs — directly related to API and event model additions in this PR.
  • Recover VHTLC #359: Modifies VHTLC settlement flows in Service — overlaps where emit points were added for spent/refunded events.
  • Swap pkg #235: Refactors swap/pay flows and PayOffer — related to code paths instrumented to emit funded/spent events.

Suggested labels

enhancement

Suggested reviewers

  • altafan
  • bordalix

Poem

🐰 I nudge a stream with twitching whiskers bright,
HTLCs hop past in the soft moonlight,
Created, funded, claimed or returned,
I watch each hop and happily churn,
A tiny rabbit cheering every event in flight.

🚥 Pre-merge checks | ✅ 2 | ❌ 3

❌ Failed checks (2 warnings, 1 inconclusive)

  • Out of Scope Changes check — ⚠️ Warning. The PR includes chain swap event streaming (chainswap_events.go, related service changes, handler implementations), which extends beyond the linked issue #343 that specifically requests HTLC events only. While beneficial, chain swap events represent scope expansion not covered in the linked requirements. Resolution: either update linked issue #343 to include chain swap event requirements, or move the chain swap event implementation to a separate PR for focused review and scope management.
  • Docstring Coverage — ⚠️ Warning. Docstring coverage is 22.86%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.
  • Title check — ❓ Inconclusive. The title '[WIP]Introduce HTLC event stream' is partially related to the changeset but does not capture the full scope: the PR introduces not only HTLC event streams but also chain swap event streams and comprehensive event handling infrastructure. Resolution: consider a title such as 'Introduce HTLC and chain swap event streams' or 'Introduce event streaming for HTLC and chain swap lifecycle' to better represent all changes included.
✅ Passed checks (2 passed)

  • Description Check — ✅ Passed. Check skipped: CodeRabbit's high-level summary is enabled.
  • Linked Issues check — ✅ Passed. The PR successfully implements all requirements from issue #343: tx_associated events, htlc_created, htlc_funded, htlc_spent with spend_type metadata, and htlc_refundable events, with proper implementation across the application layer, gRPC handlers, and comprehensive testing.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


coderabbitai bot (Contributor) left a comment


Actionable comments posted: 7

🧹 Nitpick comments (2)
internal/interface/grpc/handlers/notification_handler.go (1)

29-29: Prefer named fields in struct initialization for maintainability.

Positional struct initialization is fragile—if the field order in notificationHandler changes, this line will silently compile with incorrect assignments.

♻️ Suggested fix
-	svc := &notificationHandler{appSvc, handler, eventListenerHandler, stopCh}
+	svc := &notificationHandler{
+		svc:                         appSvc,
+		notificationListenerHandler: handler,
+		eventListenerHandler:        eventListenerHandler,
+		stopCh:                      stopCh,
+	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/interface/grpc/handlers/notification_handler.go` at line 29, The
struct is being initialized positionally in svc := &notificationHandler{appSvc,
handler, eventListenerHandler, stopCh} which is fragile; change it to a
named-field composite literal using the notificationHandler field names (e.g.,
AppSvc:, Handler:, EventListenerHandler:, StopCh:) so each field is explicitly
set—locate the notificationHandler type and update this initialization to use
those named fields for clarity and maintainability.
internal/interface/grpc/handlers/event_handler.go (1)

109-112: Default case returns a response with nil Event.

When an unknown event type is encountered, the returned GetEventStreamResponse has a timestamp but no Event set. Clients consuming the stream should handle this gracefully. The warning log is helpful for debugging. This is acceptable, but consider whether skipping broadcast entirely for unknown types might be cleaner.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/interface/grpc/handlers/event_handler.go` around lines 109 - 112,
The default branch currently logs unknown htlc types and returns a
GetEventStreamResponse with a timestamp but no Event, which causes clients to
receive empty events; update the default case in the event switch (the code that
constructs GetEventStreamResponse for e.Type) to skip broadcasting entirely for
unknown event types instead of returning a response with a nil Event — i.e., log
the warning via log.Warnf("unknown htlc event type: %s", e.Type) and then return
nil or otherwise avoid sending a GetEventStreamResponse (containing Event) to
the stream so callers do not receive empty Event objects; ensure any caller of
the function that expects GetEventStreamResponse handles the new nil/skip
behavior.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@api-spec/protobuf/fulmine/v1/service.proto`:
- Around line 556-558: The HtlcRefundableEvent message's refund_locktime (used
by scheduleSwapRefund()/scheduleChainSwapRefund() and determined via
IsSeconds()) is ambiguous; change the proto to make the unit explicit — e.g.
replace refund_locktime with either a oneof { uint64 block_height = X; uint64
unix_timestamp = Y } or add an enum LocktimeUnit { HEIGHT, TIMESTAMP } plus a
uint64 locktime field — and update all serialization/deserialization and any
producers/consumers that set/read HtlcRefundableEvent (referencing
HtlcRefundableEvent, refund_locktime, scheduleSwapRefund(),
scheduleChainSwapRefund(), and IsSeconds()) so emitted messages unambiguously
indicate whether the value is a block height or a Unix timestamp.

In `@internal/core/application/service.go`:
- Around line 2304-2309: The htlc_refundable event is emitted too early; move
the emitHtlcEvent call that creates an HtlcEvent{Type: HtlcEventRefundable, ...}
so it runs only after GetVHTLCFunds(...) confirms the HTLC is unspent (check
Spent == false) and immediately before calling RefundSwap(...). Update both
locations (the one around emitHtlcEvent with vhtlcId and RefundLocktime and the
second occurrence at the other block) to ensure the emission happens after the
unspent check passes, using the same fields (vhtlcId,
uint64(opts.RefundLocktime), Timestamp) so the stream reflects the current
state.

In `@internal/interface/grpc/handlers/event_handler.go`:
- Around line 54-58: The current loop in event_handler.go spawns goroutines that
send pbEvent into each listener's unbuffered channel
(eventListenerHandler.listeners / listener.ch) and can block forever if a
listener is slow or gone; change the send to a non-blocking pattern (use a
select with a default case or a select with a timeout) inside the goroutine so
the send to l.ch either succeeds or is dropped/handled without blocking; ensure
you reference the listener type (listener[*pb.GetEventStreamResponse]), the l.ch
channel and the pbEvent variable when making the change and optionally log or
count dropped sends rather than blocking.
- Around line 117-129: The toTxAssociatedResponse function currently sets
Timestamp using time.Now().Unix(), causing inconsistency with
toEventStreamResponse which uses the event's own timestamp; add a timestamp
field to application.Notification (e.g., Timestamp int64) and populate it in
service.go at the point where the Notification is constructed from the indexer
event, then modify toTxAssociatedResponse to use n.Timestamp instead of
time.Now().Unix() so all GetEventStreamResponse variants use the original event
time for consistent ordering.

In `@internal/interface/grpc/handlers/notification_handler.go`:
- Around line 118-123: The fan-out goroutines sending to listener channels (the
anonymous goroutine that does l.ch <-
&pb.GetVtxoNotificationsResponse{Notification: toNotificationProto(event)}) can
block forever if a listener stops consuming; change the send to be non-blocking
(use a select with default) or switch listener.ch to a buffered channel and drop
the message on full to avoid goroutine leaks, and apply the same change to the
equivalent send used at the event stream fan-out (the other goroutine around
lines 131–134); ensure removeListener logic remains unchanged but now
lost/dropped messages are safe and do not block publishers.

In `@internal/interface/grpc/handlers/service_handler.go`:
- Around line 34-45: The HTLC fan-out spawns a goroutine per listener send in
listenToHtlcEvents while GetEventStream registers listeners with unbuffered
channels (listenerHanlder / ch), which leaks goroutines under backpressure; fix
by creating the listener channel with a small bounded buffer (e.g., make(chan
*pb.GetEventStreamResponse, 4)) when listeners are created in GetEventStream or
modify the send path in listenToHtlcEvents to use non-blocking sends (select
with default) and drop or unregister slow listeners when the channel is full so
blocked sends cannot accumulate.

In `@internal/interface/grpc/service.go`:
- Around line 120-125: The shutdown uses a single send on appStopCh so only one
goroutine wakes; change Stop() to broadcast the shutdown by closing the channel
(close(appStopCh)) or switch to a cancellable context (context.WithCancel()) and
propagate the cancel function to all handlers; update uses where appStopCh is
created and passed into handlers.NewServiceHandler,
handlers.NewNotificationHandler, handlers.NewEventListenerHandler and change any
single-send logic in Stop() to use close(appStopCh) (or call cancel()) so every
goroutine selecting on appStopCh receives the shutdown signal.

---

Nitpick comments:
In `@internal/interface/grpc/handlers/event_handler.go`:
- Around line 109-112: The default branch currently logs unknown htlc types and
returns a GetEventStreamResponse with a timestamp but no Event, which causes
clients to receive empty events; update the default case in the event switch
(the code that constructs GetEventStreamResponse for e.Type) to skip
broadcasting entirely for unknown event types instead of returning a response
with a nil Event — i.e., log the warning via log.Warnf("unknown htlc event type:
%s", e.Type) and then return nil or otherwise avoid sending a
GetEventStreamResponse (containing Event) to the stream so callers do not
receive empty Event objects; ensure any caller of the function that expects
GetEventStreamResponse handles the new nil/skip behavior.

In `@internal/interface/grpc/handlers/notification_handler.go`:
- Line 29: The struct is being initialized positionally in svc :=
&notificationHandler{appSvc, handler, eventListenerHandler, stopCh} which is
fragile; change it to a named-field composite literal using the
notificationHandler field names (e.g., AppSvc:, Handler:, EventListenerHandler:,
StopCh:) so each field is explicitly set—locate the notificationHandler type and
update this initialization to use those named fields for clarity and
maintainability.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: f3d731a9-8696-444e-be60-e35e22a9dde8

📥 Commits

Reviewing files that changed from the base of the PR and between 193e617 and 3fc9191.

⛔ Files ignored due to path filters (3)
  • api-spec/protobuf/gen/go/fulmine/v1/service.pb.go is excluded by !**/*.pb.go, !**/gen/**
  • api-spec/protobuf/gen/go/fulmine/v1/service.pb.gw.go is excluded by !**/*.pb.gw.go, !**/gen/**
  • api-spec/protobuf/gen/go/fulmine/v1/service_grpc.pb.go is excluded by !**/*.pb.go, !**/gen/**
📒 Files selected for processing (10)
  • api-spec/openapi/swagger/fulmine/v1/service.swagger.json
  • api-spec/protobuf/fulmine/v1/service.proto
  • internal/core/application/htlc_events.go
  • internal/core/application/htlc_events_test.go
  • internal/core/application/service.go
  • internal/interface/grpc/handlers/event_handler.go
  • internal/interface/grpc/handlers/event_handler_test.go
  • internal/interface/grpc/handlers/notification_handler.go
  • internal/interface/grpc/handlers/service_handler.go
  • internal/interface/grpc/service.go

Comment on lines +556 to +558
message HtlcRefundableEvent {
string vhtlc_id = 1;
uint64 refund_locktime = 2;

⚠️ Potential issue | 🟠 Major

Make refund_locktime self-describing.

The application emits both height-based and second-based absolute locktimes here (scheduleSwapRefund() / scheduleChainSwapRefund() branch on IsSeconds()), but this message only exposes a bare uint64. Clients cannot tell whether a value like 800000 is a block height or a Unix timestamp, so the field is ambiguous to consume. Add a locktime type/unit enum, or split this into explicit height/time variants.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api-spec/protobuf/fulmine/v1/service.proto` around lines 556 - 558, The
HtlcRefundableEvent message's refund_locktime (used by
scheduleSwapRefund()/scheduleChainSwapRefund() and determined via IsSeconds())
is ambiguous; change the proto to make the unit explicit — e.g. replace
refund_locktime with either a oneof { uint64 block_height = X; uint64
unix_timestamp = Y } or add an enum LocktimeUnit { HEIGHT, TIMESTAMP } plus a
uint64 locktime field — and update all serialization/deserialization and any
producers/consumers that set/read HtlcRefundableEvent (referencing
HtlcRefundableEvent, refund_locktime, scheduleSwapRefund(),
scheduleChainSwapRefund(), and IsSeconds()) so emitted messages unambiguously
indicate whether the value is a block height or a Unix timestamp.
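On the Go side, the unit enum this comment suggests could look roughly like the following. This is illustrative only: the type and field names here are assumptions for the sketch, not the project's actual code.

```go
package main

import "fmt"

// LocktimeUnit makes the locktime value self-describing, as the review suggests.
type LocktimeUnit int

const (
	LocktimeHeight    LocktimeUnit = iota // value is a block height
	LocktimeTimestamp                     // value is a Unix timestamp (seconds)
)

// HtlcRefundableEvent is a hypothetical Go mirror of the proto message
// extended with an explicit unit field.
type HtlcRefundableEvent struct {
	VhtlcID  string
	Locktime uint64
	Unit     LocktimeUnit
}

// describe shows how a consumer can now interpret the value unambiguously.
func describe(e HtlcRefundableEvent) string {
	switch e.Unit {
	case LocktimeHeight:
		return fmt.Sprintf("refundable at block %d", e.Locktime)
	default:
		return fmt.Sprintf("refundable at unix time %d", e.Locktime)
	}
}

func main() {
	fmt.Println(describe(HtlcRefundableEvent{VhtlcID: "v1", Locktime: 800000, Unit: LocktimeHeight}))
}
```

Without the unit field, a consumer seeing the bare value 800000 cannot tell which branch of `describe` applies, which is exactly the ambiguity the comment flags.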

Comment on lines +2304 to +2309
s.emitHtlcEvent(HtlcEvent{
Type: HtlcEventRefundable,
Timestamp: time.Now().Unix(),
VhtlcId: vhtlcId,
RefundLocktime: uint64(opts.RefundLocktime),
})

⚠️ Potential issue | 🟠 Major

Only emit htlc_refundable after confirming the HTLC is still unspent.

Both callbacks publish the event before GetVHTLCFunds() and the Spent check. On recovery paths, an HTLC that was already claimed/refunded elsewhere will still be advertised as refundable, which makes the stream lie about the current state. Move this emission to just before RefundSwap(...), after the unspent check passes.

Also applies to: 2390-2395

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/core/application/service.go` around lines 2304 - 2309, The
htlc_refundable event is emitted too early; move the emitHtlcEvent call that
creates an HtlcEvent{Type: HtlcEventRefundable, ...} so it runs only after
GetVHTLCFunds(...) confirms the HTLC is unspent (check Spent == false) and
immediately before calling RefundSwap(...). Update both locations (the one
around emitHtlcEvent with vhtlcId and RefundLocktime and the second occurrence
at the other block) to ensure the emission happens after the unspent check
passes, using the same fields (vhtlcId, uint64(opts.RefundLocktime), Timestamp)
so the stream reflects the current state.

Comment on lines +54 to +58
for _, l := range h.eventListenerHandler.listeners {
go func(l *listener[*pb.GetEventStreamResponse]) {
l.ch <- pbEvent
}(l)
}

⚠️ Potential issue | 🟡 Minor

Same goroutine leak risk as noted in notification_handler.go.

The spawned goroutines send to unbuffered channels without a timeout or non-blocking select. If a listener is slow or disconnected, these goroutines block indefinitely. Apply the same non-blocking send pattern here.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/interface/grpc/handlers/event_handler.go` around lines 54 - 58, The
current loop in event_handler.go spawns goroutines that send pbEvent into each
listener's unbuffered channel (eventListenerHandler.listeners / listener.ch) and
can block forever if a listener is slow or gone; change the send to a
non-blocking pattern (use a select with a default case or a select with a
timeout) inside the goroutine so the send to l.ch either succeeds or is
dropped/handled without blocking; ensure you reference the listener type
(listener[*pb.GetEventStreamResponse]), the l.ch channel and the pbEvent
variable when making the change and optionally log or count dropped sends rather
than blocking.

Comment on lines +117 to +129
func toTxAssociatedResponse(n application.Notification) *pb.GetEventStreamResponse {
return &pb.GetEventStreamResponse{
Timestamp: time.Now().Unix(),
Event: &pb.GetEventStreamResponse_TxAssociated{
TxAssociated: &pb.TxAssociatedEvent{
Txid: n.Txid,
Tx: n.Tx,
NewVtxos: toVtxosProto(n.NewVtxos),
SpentVtxos: toVtxosProto(n.SpentVtxos),
},
},
}
}

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check if application.Notification has a Timestamp field
ast-grep --pattern 'type Notification struct {
  $$$
}'

Repository: ArkLabsHQ/fulmine

Length of output: 2021


🏁 Script executed:

#!/bin/bash
# Find the definition of indexer.TxData
rg "type TxData struct" --type go -A 10

Repository: ArkLabsHQ/fulmine

Length of output: 917


🏁 Script executed:

#!/bin/bash
# Find the toEventStreamResponse function to compare timestamp handling
rg "func toEventStreamResponse" -A 20 --type go

Repository: ArkLabsHQ/fulmine

Length of output: 1783


🏁 Script executed:

#!/bin/bash
# Check all event handler conversion functions in the file
cat -n internal/interface/grpc/handlers/event_handler.go | head -150

Repository: ArkLabsHQ/fulmine

Length of output: 4636


🏁 Script executed:

#!/bin/bash
# Find application.HtlcEvent definition
rg "type HtlcEvent struct" --type go -A 15

Repository: ArkLabsHQ/fulmine

Length of output: 833


🏁 Script executed:

#!/bin/bash
# Find where toTxAssociatedResponse is called
rg "toTxAssociatedResponse" --type go -B 3 -A 3

Repository: ArkLabsHQ/fulmine

Length of output: 2088


🏁 Script executed:

#!/bin/bash
# Find where Notification is created to understand the event emission context
rg "application.Notification{" --type go -B 5 -A 5

Repository: ArkLabsHQ/fulmine

Length of output: 174


🏁 Script executed:

#!/bin/bash
# Find where Notification is created - search for "Notification struct" in service.go
rg "Notification\{" --type go -B 5 -A 10 | head -100

Repository: ArkLabsHQ/fulmine

Length of output: 6973


🏁 Script executed:

#!/bin/bash
# Check the context where toTxAssociatedResponse is called in notification_handler.go
cat -n internal/interface/grpc/handlers/notification_handler.go | grep -A 30 "toTxAssociatedResponse"

Repository: ArkLabsHQ/fulmine

Length of output: 944


🏁 Script executed:

#!/bin/bash
# Check the indexer.ScriptEvent to see if it has timestamp information
rg "type ScriptEvent struct" --type go -A 15

Repository: ArkLabsHQ/fulmine

Length of output: 43


🏁 Script executed:

#!/bin/bash
# Search for ScriptEvent in all file types
rg "ScriptEvent" --type go -l | head -5

Repository: ArkLabsHQ/fulmine

Length of output: 139


🏁 Script executed:

#!/bin/bash
# Check where ScriptEvent comes from in service.go
grep -n "ScriptEvent" internal/core/application/service.go | head -10

Repository: ArkLabsHQ/fulmine

Length of output: 257


🏁 Script executed:

#!/bin/bash
# Find indexer package and ScriptEvent definition
find . -name "*.go" -type f | xargs grep -l "type ScriptEvent" | head -1

Repository: ArkLabsHQ/fulmine

Length of output: 43


🏁 Script executed:

#!/bin/bash
# Get the full path and check ScriptEvent in indexer
rg "type ScriptEvent" --type go -A 20

Repository: ArkLabsHQ/fulmine

Length of output: 43


🏁 Script executed:

#!/bin/bash
# Check imports in service.go to find where indexer package comes from
head -30 internal/core/application/service.go | grep -E "import|indexer"

Repository: ArkLabsHQ/fulmine

Length of output: 69


🏁 Script executed:

#!/bin/bash
# Search for where indexer is defined or imported
rg "import.*indexer" --type go | head -5

Repository: ArkLabsHQ/fulmine

Length of output: 43


🏁 Script executed:

#!/bin/bash
# Check if ScriptEvent might have timestamp in subscription.go
rg "ScriptEvent" internal/core/application/subscription.go -A 5 -B 5 | head -50

Repository: ArkLabsHQ/fulmine

Length of output: 574


🏁 Script executed:

#!/bin/bash
# Get full imports section from service.go
sed -n '1,50p' internal/core/application/service.go

Repository: ArkLabsHQ/fulmine

Length of output: 1661


🏁 Script executed:

#!/bin/bash
# Check if indexer is a local package or external by looking at go.mod or directory structure
find . -name "indexer" -type d 2>/dev/null | head -5

Repository: ArkLabsHQ/fulmine

Length of output: 43


🏁 Script executed:

#!/bin/bash
# Look at the full test to understand expectations
cat -n internal/interface/grpc/handlers/event_handler_test.go | sed -n '200,250p'

Repository: ArkLabsHQ/fulmine

Length of output: 43


🏁 Script executed:

#!/bin/bash
# Check if there are any other tests related to timestamp ordering
rg "Timestamp" internal/interface/grpc/handlers/event_handler_test.go -B 3 -A 3

Repository: ArkLabsHQ/fulmine

Length of output: 1839


Timestamp sourced differently between event types in same stream.

toEventStreamResponse preserves the event's own Timestamp, while toTxAssociatedResponse uses time.Now().Unix(). This inconsistency may cause ordering confusion when clients correlate events from the same stream.

Since application.Notification does not have a timestamp field, consider adding one to capture the original event time at the emission site in service.go where the notification is created from the indexer event.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/interface/grpc/handlers/event_handler.go` around lines 117 - 129,
The toTxAssociatedResponse function currently sets Timestamp using
time.Now().Unix(), causing inconsistency with toEventStreamResponse which uses
the event's own timestamp; add a timestamp field to application.Notification
(e.g., Timestamp int64) and populate it in service.go at the point where the
Notification is constructed from the indexer event, then modify
toTxAssociatedResponse to use n.Timestamp instead of time.Now().Unix() so all
GetEventStreamResponse variants use the original event time for consistent
ordering.

Comment on lines 118 to 123
go func(l *listener[*pb.GetVtxoNotificationsResponse]) {
l.ch <- &pb.GetVtxoNotificationsResponse{
Notification: toNotificationProto(event),
}
}(l)
}

⚠️ Potential issue | 🟡 Minor

Fan-out goroutines can block indefinitely on slow/dead listeners.

The unbuffered channel sends (l.ch <- ...) in the spawned goroutines will block forever if a listener stops consuming (e.g., client disconnects before removeListener runs). This causes goroutine leaks.

Consider a non-blocking send with a select/default or a buffered channel with drop semantics.

♻️ Non-blocking send alternative
 go func(l *listener[*pb.GetVtxoNotificationsResponse]) {
-    l.ch <- &pb.GetVtxoNotificationsResponse{
-        Notification: toNotificationProto(event),
-    }
+    select {
+    case l.ch <- &pb.GetVtxoNotificationsResponse{
+        Notification: toNotificationProto(event),
+    }:
+    default:
+        // Listener is not consuming; drop event to avoid goroutine leak
+    }
 }(l)

Apply similarly for the event stream fan-out below.

Also applies to: 131-134

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/interface/grpc/handlers/notification_handler.go` around lines 118 -
123, The fan-out goroutines sending to listener channels (the anonymous
goroutine that does l.ch <- &pb.GetVtxoNotificationsResponse{Notification:
toNotificationProto(event)}) can block forever if a listener stops consuming;
change the send to be non-blocking (use a select with default) or switch
listener.ch to a buffered channel and drop the message on full to avoid
goroutine leaks, and apply the same change to the equivalent send used at the
event stream fan-out (the other goroutine around lines 131–134); ensure
removeListener logic remains unchanged but now lost/dropped messages are safe
and do not block publishers.
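The non-blocking fan-out described above can be sketched as follows. This is a self-contained illustration, not the repository's code: `fanOut`, `listener`, and `event` here are hypothetical simplifications of the handler's generics:

```go
package main

import "fmt"

type event struct{ Txid string }

type listener struct {
	id string
	ch chan *event
}

// fanOut performs a non-blocking send to every listener; a listener
// whose buffer is full simply misses the event instead of pinning a
// sender goroutine forever.
func fanOut(listeners []*listener, ev *event) (delivered, dropped int) {
	for _, l := range listeners {
		select {
		case l.ch <- ev:
			delivered++
		default:
			dropped++ // slow consumer: drop rather than block
		}
	}
	return
}

func main() {
	fast := &listener{id: "fast", ch: make(chan *event, 1)}
	stuck := &listener{id: "stuck", ch: make(chan *event)} // nobody reads
	d, x := fanOut([]*listener{fast, stuck}, &event{Txid: "tx1"})
	fmt.Println(d, x) // → 1 1
}
```

Giving each listener a small buffer keeps bursts intact for healthy clients, while the `default` branch guarantees a dead client can never stall the publisher.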

Comment on lines +34 to +45
func NewServiceHandler(
svc *application.Service,
stopCh <-chan struct{},
eventListenerHandler *listenerHanlder[*pb.GetEventStreamResponse],
) pb.ServiceServer {
handler := &serviceHandler{
svc: svc,
eventListenerHandler: eventListenerHandler,
stopCh: stopCh,
}
go handler.listenToHtlcEvents()
return handler

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Verify that GetEventStream uses unbuffered listener channels and
# listenToHtlcEvents performs goroutine-per-send fan-out.
rg -n -C3 'func \(h \*serviceHandler\) GetEventStream|ch: make\(chan \*pb\.GetEventStreamResponse\)|func \(h \*serviceHandler\) listenToHtlcEvents|go func\(l \*listener\[\*pb\.GetEventStreamResponse\]\)|l\.ch <- pbEvent'

Repository: ArkLabsHQ/fulmine

Length of output: 3817


This HTLC fan-out path will leak goroutines under backpressure.

NewServiceHandler starts listenToHtlcEvents(), which fans out HTLC events by spawning go func { l.ch <- pbEvent } for each listener. However, GetEventStream() creates listener channels with no buffer (ch: make(chan *pb.GetEventStreamResponse)). A slow or disconnected client leaves blocked sender goroutines behind for every event that arrives.

Replace unbuffered channels with a small bounded buffer (e.g., make(chan *pb.GetEventStreamResponse, 4)), or handle full channels explicitly by dropping the send or the listener to prevent goroutine accumulation.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/interface/grpc/handlers/service_handler.go` around lines 34 - 45,
The HTLC fan-out spawns a goroutine per listener send in listenToHtlcEvents
while GetEventStream registers listeners with unbuffered channels
(listenerHanlder / ch), which leaks goroutines under backpressure; fix by
creating the listener channel with a small bounded buffer (e.g., make(chan
*pb.GetEventStreamResponse, 4)) when listeners are created in GetEventStream or
modify the send path in listenToHtlcEvents to use non-blocking sends (select
with default) and drop or unregister slow listeners when the channel is full so
blocked sends cannot accumulate.

Comment on lines +120 to +125
eventListenerHandler := handlers.NewEventListenerHandler()

serviceHandler := handlers.NewServiceHandler(appSvc, appStopCh, eventListenerHandler)
pb.RegisterServiceServer(grpcServer, serviceHandler)

notificationHandler := handlers.NewNotificationHandler(appSvc, appStopCh)
notificationHandler := handlers.NewNotificationHandler(appSvc, appStopCh, eventListenerHandler)

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Verify how appStopCh is created, shared, and consumed.
rg -n -C3 '\bappStopCh\b|func NewServiceHandler|func NewNotificationHandler|<-\s*[A-Za-z0-9_\.]*stopCh'

Repository: ArkLabsHQ/fulmine

Length of output: 5851


Use close(appStopCh) instead of sending a single stop token to broadcast shutdown to all handlers.

Multiple handlers receive the same appStopCh, but Stop() at line 349 performs a single send to a buffered channel. Only one goroutine will observe this value; any other goroutine selecting on appStopCh will remain blocked. Replace the send with close(appStopCh) or switch to context.WithCancel() to ensure the shutdown signal reaches all consumers.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/interface/grpc/service.go` around lines 120 - 125, The shutdown uses
a single send on appStopCh so only one goroutine wakes; change Stop() to
broadcast the shutdown by closing the channel (close(appStopCh)) or switch to a
cancellable context (context.WithCancel()) and propagate the cancel function to
all handlers; update uses where appStopCh is created and passed into
handlers.NewServiceHandler, handlers.NewNotificationHandler,
handlers.NewEventListenerHandler and change any single-send logic in Stop() to
use close(appStopCh) (or call cancel()) so every goroutine selecting on
appStopCh receives the shutdown signal.

Three tests verify the event stream works during VHTLC lifecycle:
- TestEventStreamVHTLCLifecycle: create → fund → claim, verifies
  htlc_created and htlc_spent (CLAIMED) events with correct ordering
- TestEventStreamSubmarineSwap: PayInvoice via Boltz, verifies
  htlc_funded event (skips gracefully if Boltz routing unavailable)
- TestEventStreamRefundVHTLC: create → fund → refund, verifies
  htlc_created and htlc_spent (REFUNDED) events with correct ordering

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@sekulicd sekulicd changed the title Introduce HTLC event stream [WIP]Introduce HTLC event stream Mar 9, 2026

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (2)
internal/test/e2e/event_stream_test.go (2)

260-269: Don't couple the Boltz skip path to one exact error string.

This test is trying to isolate stream behavior, but it only skips when err.Error() contains could not find route. Other transient Boltz/LN failures will still fail the suite, and matching on gRPC error text is brittle. Prefer a helper that classifies skip-worthy upstream failures from structured status/details if the server exposes them.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/test/e2e/event_stream_test.go` around lines 260 - 269, The current
skip logic in the event stream test is brittle because it only checks
err.Error() for "could not find route"; replace that ad-hoc string match with a
small helper like isSkipWorthError(err error) and call it before
streamCancel/streamDone and t.Skipf; implement the helper to use grpc/status
(status.FromError) and classify upstream/transient errors (e.g.,
codes.Unavailable, codes.ResourceExhausted, codes.FailedPrecondition or any
server-defined error details) so the test skips for a broader set of Boltz/LN
transient errors instead of relying on a single text match, and keep the final
require.NoError(t, err) for non-skip cases.

18-208: Factor the claim/refund lifecycle cases into a shared table-driven harness.

TestEventStreamVHTLCLifecycle and TestEventStreamRefundVHTLC repeat the same stream setup, collection, filtering, and ordering checks, differing mainly in the settle step and expected spend type. A table with subtests would remove a lot of duplication and make future lifecycle event cases cheaper to add. As per coding guidelines "Write table-driven tests when possible and use subtests for clarity in Go testing".

Also applies to: 329-481

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/test/e2e/event_stream_test.go` around lines 18 - 208, The two tests
TestEventStreamVHTLCLifecycle and TestEventStreamRefundVHTLC duplicate stream
setup, background collection, filtering by vhtlc id, and ordering/assertion
logic; refactor by extracting a shared table-driven harness: create a helper
(e.g., runVHTLCLifecycleSubtest or a closure) that performs the common steps
(open stream via GetEventStream, spawn receiver goroutine, create preimage/hash,
CreateVHTLC, SendOffChain, collect/filter events, and perform ordering/timestamp
assertions) and accept parameters per case (settle action: ClaimVHTLC vs Refund
flow, expected spend type like pb.HtlcSpentEvent_SPEND_TYPE_CLAIMED/REFUNDED,
any different waits). Replace the two tests with a table of cases that calls the
helper in t.Run subtests, keeping unique assertions (expected spend type and
settle call) configurable via the case fields.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@internal/test/e2e/event_stream_test.go`:
- Around line 39-45: Tests currently rely on time.Sleep to wait for the gRPC
event stream subscription created by GetEventStream(), which is racy because
event listeners are only registered inside GetEventStream() and missed events
are not replayed; replace these sleeps with a deterministic barrier: after
calling f.GetEventStream(streamCtx, &pb.GetEventStreamRequest{}) use stream.Recv
(or a small loop that calls stream.Recv with a timeout) to wait for either an
explicit "subscription ready" marker event or the first expected event before
issuing mutations, and likewise wait (by receiving until you observe the
expected htlc_spent event or a confirmation marker) before cancelling
streamCancel; alternatively add a ready signal in event_handler.go (e.g., emit a
subscription-established event) and have the test block on that signal instead
of sleeping so assertions for htlc_created/htlc_spent become deterministic.
- Around line 304-324: The loop currently accepts the first non-empty
htlc_funded from the global events stream, which can be unrelated; capture a
stable identifier produced by this test's flow (e.g., the vhtlc id or
payment/invoice hash returned by your PayInvoice/GetSwapVHTLC call) and filter
events by that identifier before asserting. Concretely: obtain the expected id
from the PayInvoice/GetSwapVHTLC response, then change the loop that checks
ev.GetHtlcFunded() to also verify funded.GetVhtlcId() (or
fundingTxid/paymentHash) equals the expected value, only then perform
NotEmpty/NotZero assertions and set foundFunded. Use the same symbol names shown
(PayInvoice, GetSwapVHTLC, ev.GetHtlcFunded, funded.GetVhtlcId) so the test
targets the swap under test.

---

Nitpick comments:
In `@internal/test/e2e/event_stream_test.go`:
- Around line 260-269: The current skip logic in the event stream test is
brittle because it only checks err.Error() for "could not find route"; replace
that ad-hoc string match with a small helper like isSkipWorthError(err error)
and call it before streamCancel/streamDone and t.Skipf; implement the helper to
use grpc/status (status.FromError) and classify upstream/transient errors (e.g.,
codes.Unavailable, codes.ResourceExhausted, codes.FailedPrecondition or any
server-defined error details) so the test skips for a broader set of Boltz/LN
transient errors instead of relying on a single text match, and keep the final
require.NoError(t, err) for non-skip cases.
- Around line 18-208: The two tests TestEventStreamVHTLCLifecycle and
TestEventStreamRefundVHTLC duplicate stream setup, background collection,
filtering by vhtlc id, and ordering/assertion logic; refactor by extracting a
shared table-driven harness: create a helper (e.g., runVHTLCLifecycleSubtest or
a closure) that performs the common steps (open stream via GetEventStream, spawn
receiver goroutine, create preimage/hash, CreateVHTLC, SendOffChain,
collect/filter events, and perform ordering/timestamp assertions) and accept
parameters per case (settle action: ClaimVHTLC vs Refund flow, expected spend
type like pb.HtlcSpentEvent_SPEND_TYPE_CLAIMED/REFUNDED, any different waits).
Replace the two tests with a table of cases that calls the helper in t.Run
subtests, keeping unique assertions (expected spend type and settle call)
configurable via the case fields.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 1fbc929e-0bf1-4cdf-ab67-a26787d447ae

📥 Commits

Reviewing files that changed from the base of the PR and between 3fc9191 and d036e74.

📒 Files selected for processing (1)
  • internal/test/e2e/event_stream_test.go

Comment on lines +39 to +45
// Open the event stream before performing any VHTLC operations so we
// are guaranteed to capture all events.
streamCtx, streamCancel := context.WithCancel(ctx)
defer streamCancel()

stream, err := f.GetEventStream(streamCtx, &pb.GetEventStreamRequest{})
require.NoError(t, err)

⚠️ Potential issue | 🟠 Major

Use a deterministic stream barrier instead of wall-clock sleeps.

internal/interface/grpc/handlers/event_handler.go only registers listeners inside GetEventStream() and does not replay missed events. These time.Sleep calls therefore do not guarantee the subscription was active before the first mutation or that the last event arrived before cancellation, so the htlc_created / htlc_spent assertions can flake under CI load. Please wait on concrete stream conditions (or add a ready signal) before issuing mutations and before closing the stream.

Also applies to: 97-99, 118-123, 226-231, 274-279, 341-346, 398-423

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/test/e2e/event_stream_test.go` around lines 39 - 45, Tests currently
rely on time.Sleep to wait for the gRPC event stream subscription created by
GetEventStream(), which is racy because event listeners are only registered
inside GetEventStream() and missed events are not replayed; replace these sleeps
with a deterministic barrier: after calling f.GetEventStream(streamCtx,
&pb.GetEventStreamRequest{}) use stream.Recv (or a small loop that calls
stream.Recv with a timeout) to wait for either an explicit "subscription ready"
marker event or the first expected event before issuing mutations, and likewise
wait (by receiving until you observe the expected htlc_spent event or a
confirmation marker) before cancelling streamCancel; alternatively add a ready
signal in event_handler.go (e.g., emit a subscription-established event) and
have the test block on that signal instead of sleeping so assertions for
htlc_created/htlc_spent become deterministic.

Comment on lines +304 to +324
// During a submarine swap, PayInvoice internally:
// 1. Creates a VHTLC via Boltz (htlc_created emitted from GetSwapVHTLC)
// 2. Funds the VHTLC (htlc_funded emitted from PayInvoice)
// We check for htlc_funded because that confirms the swap pipeline emitted
// the event. htlc_created may or may not arrive depending on timing
// (it's emitted in a goroutine).
var foundFunded bool
for _, ev := range events {
if funded := ev.GetHtlcFunded(); funded != nil {
foundFunded = true
require.NotEmpty(t, funded.GetVhtlcId(), "htlc_funded vhtlc_id should not be empty")
require.NotEmpty(t, funded.GetFundingTxid(), "htlc_funded funding_txid should not be empty")
require.NotZero(t, funded.GetAmount(), "htlc_funded amount should not be zero")
require.NotZero(t, ev.GetTimestamp(), "htlc_funded timestamp should not be zero")
t.Logf("verified htlc_funded: vhtlc_id=%s, funding_txid=%s, amount=%d",
funded.GetVhtlcId(), funded.GetFundingTxid(), funded.GetAmount())
break
}
}
require.True(t, foundFunded,
"expected at least one htlc_funded event during submarine swap")

⚠️ Potential issue | 🟠 Major

Correlate htlc_funded to the swap under test.

The stream is global, and this loop passes on the first htlc_funded event with non-empty fields. Unrelated activity after subscription can satisfy that, so the test can go green even if this PayInvoice call never emitted its own funded event. Filter on a stable identifier from this flow before asserting.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/test/e2e/event_stream_test.go` around lines 304 - 324, The loop
currently accepts the first non-empty htlc_funded from the global events stream,
which can be unrelated; capture a stable identifier produced by this test's flow
(e.g., the vhtlc id or payment/invoice hash returned by your
PayInvoice/GetSwapVHTLC call) and filter events by that identifier before
asserting. Concretely: obtain the expected id from the PayInvoice/GetSwapVHTLC
response, then change the loop that checks ev.GetHtlcFunded() to also verify
funded.GetVhtlcId() (or fundingTxid/paymentHash) equals the expected value, only
then perform NotEmpty/NotZero assertions and set foundFunded. Use the same
symbol names shown (PayInvoice, GetSwapVHTLC, ev.GetHtlcFunded,
funded.GetVhtlcId) so the test targets the swap under test.
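The correlation step amounts to filtering the collected events by the id this test's flow produced. A self-contained sketch (`htlcFunded` and `findFundedFor` are hypothetical stand-ins for the generated proto types):

```go
package main

import "fmt"

type htlcFunded struct {
	VhtlcId     string
	FundingTxid string
	Amount      uint64
}

// findFundedFor returns the htlc_funded event belonging to the swap
// under test, ignoring unrelated activity on the shared stream.
func findFundedFor(events []*htlcFunded, wantVhtlcId string) (*htlcFunded, bool) {
	for _, ev := range events {
		if ev != nil && ev.VhtlcId == wantVhtlcId {
			return ev, true
		}
	}
	return nil, false
}

func main() {
	events := []*htlcFunded{
		{VhtlcId: "other", FundingTxid: "txA", Amount: 10},    // unrelated swap
		{VhtlcId: "mine", FundingTxid: "txB", Amount: 21000}, // the swap under test
	}
	ev, ok := findFundedFor(events, "mine")
	fmt.Println(ok, ev.FundingTxid) // → true txB
}
```

Only after this lookup succeeds should the test run its `NotEmpty`/`NotZero` assertions, so a green run genuinely proves this `PayInvoice` call emitted its own funded event.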


@coderabbitai coderabbitai bot left a comment


♻️ Duplicate comments (2)
internal/interface/grpc/handlers/event_handler.go (2)

19-21: ⚠️ Potential issue | 🔴 Critical

Remove the detached send goroutines from the fan-out path.

Line 56 sends into an unbuffered listener.ch from a goroutine. If a client disconnects, Line 25 only removes the listener from the registry, so that goroutine can block forever; if stop() closes the channel first, it can panic with send on closed channel. A buffered per-listener channel plus a non-blocking send under eventListenerHandler.lock avoids both failure modes.

Suggested direction
 	listener := &listener[*pb.GetEventStreamResponse]{
 		id: uuid.NewString(),
-		ch: make(chan *pb.GetEventStreamResponse),
+		ch: make(chan *pb.GetEventStreamResponse, 16),
 	}
@@
 			pbEvent := toEventStreamResponse(event)
 			h.eventListenerHandler.lock.Lock()
 			for _, l := range h.eventListenerHandler.listeners {
-				go func(l *listener[*pb.GetEventStreamResponse]) {
-					l.ch <- pbEvent
-				}(l)
+				select {
+				case l.ch <- pbEvent:
+				default:
+					log.Warnf("dropping event for slow listener %s", l.id)
+				}
 			}
 			h.eventListenerHandler.lock.Unlock()

Also applies to: 24-25, 53-57

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/interface/grpc/handlers/event_handler.go` around lines 19 - 21, The
fan-out code creates a listener with an unbuffered channel (listener.ch) and
spawns detached goroutines that send into it, which can block or panic if the
listener is removed or stopped; change listener.ch to a buffered channel (e.g.,
size 1 or small N) when constructing the listener in the
listener[*pb.GetEventStreamResponse] creation, remove the detached send
goroutines, and instead perform a non-blocking send to listener.ch while holding
eventListenerHandler.lock (so sends never race with stop() closing the channel);
ensure stop() still closes the channel and registry removal simply deletes the
listener without leaving any blocked goroutines.

118-120: ⚠️ Potential issue | 🟠 Major

Preserve the original event time for tx_associated.

Line 120 timestamps the protobuf at conversion time, not when the wallet event happened. That makes tx_associated the only variant in this stream whose Timestamp can drift, so mixed wallet/HTLC events can be mis-ordered client-side. Please carry the timestamp on application.Notification from the emission site and use that value here instead of time.Now().Unix().

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/interface/grpc/handlers/event_handler.go` around lines 118 - 120,
toTxAssociatedResponse currently sets Timestamp to the conversion time
(time.Now().Unix()), causing reorderable events; change the function to read the
original event time from the application.Notification passed in (in
toTxAssociatedResponse) and set pb.GetEventStreamResponse.Timestamp to that
value instead, converting to Unix seconds if the Notification timestamp is a
time.Time (e.g., n.Time.Unix()) or using n.Timestamp directly if it is already
an int64; update toTxAssociatedResponse to use that field rather than
time.Now().Unix().
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@internal/interface/grpc/handlers/event_handler.go`:
- Around line 19-21: The fan-out code creates a listener with an unbuffered
channel (listener.ch) and spawns detached goroutines that send into it, which
can block or panic if the listener is removed or stopped; change listener.ch to
a buffered channel (e.g., size 1 or small N) when constructing the listener in
the listener[*pb.GetEventStreamResponse] creation, remove the detached send
goroutines, and instead perform a non-blocking send to listener.ch while holding
eventListenerHandler.lock (so sends never race with stop() closing the channel);
ensure stop() still closes the channel and registry removal simply deletes the
listener without leaving any blocked goroutines.
- Around line 118-120: toTxAssociatedResponse currently sets Timestamp to the
conversion time (time.Now().Unix()), causing reorderable events; change the
function to read the original event time from the application.Notification
passed in (in toTxAssociatedResponse) and set
pb.GetEventStreamResponse.Timestamp to that value instead, converting to Unix
seconds if the Notification timestamp is a time.Time (e.g., n.Time.Unix()) or
using n.Timestamp directly if it is already an int64; update
toTxAssociatedResponse to use that field rather than time.Now().Unix().

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 67bb16b7-052c-4a1f-af3a-79869f3f401d

📥 Commits

Reviewing files that changed from the base of the PR and between d036e74 and 7e65c42.

📒 Files selected for processing (2)
  • internal/interface/grpc/handlers/event_handler.go
  • internal/test/e2e/main_test.go
✅ Files skipped from review due to trivial changes (1)
  • internal/test/e2e/main_test.go


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

♻️ Duplicate comments (4)
internal/core/application/service.go (1)

2413-2418: ⚠️ Potential issue | 🟠 Major

Emit htlc_refundable only after the spend check passes.

These callbacks still publish the event before GetVHTLCFunds() / Spent verification. On recovery paths, an HTLC that was already claimed or refunded elsewhere is advertised as refundable, so the stream can lie about current state. Move the emit to just before RefundSwap(...) after the Spent guard.

Also applies to: 2499-2504

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/core/application/service.go` around lines 2413 - 2418, The code
currently emits an HtlcEvent with Type HtlcEventRefundable (via s.emitHtlcEvent)
before verifying GetVHTLCFunds()/Spent, which can advertise refunds for
already-spent HTLCs; move the emitHtlcEvent call so it runs only after the Spent
guard succeeds and just before calling RefundSwap(...). Update both occurrences
(the block that creates HtlcEvent with VhtlcId/vhtlcId and RefundLocktime
uint64(opts.RefundLocktime)) to emit after the GetVHTLCFunds()/Spent check and
immediately prior to invoking RefundSwap to ensure refundable events reflect the
verified state.
api-spec/protobuf/fulmine/v1/service.proto (1)

562-565: ⚠️ Potential issue | 🟠 Major

Make refund_locktime self-describing.

scheduleSwapRefund() and scheduleChainSwapRefund() emit both block-height and unix-second absolute locktimes, but this message only exposes a bare uint64. Clients cannot tell which unit a value like 800000 uses. Split the field or add a locktime-unit enum/oneof.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api-spec/protobuf/fulmine/v1/service.proto` around lines 562 - 565,
HtlcRefundableEvent’s refund_locktime is ambiguous; update the proto so clients
can tell units by replacing refund_locktime with a self-describing alternative
(e.g. a oneof or separate fields plus an enum) — for example add a LocktimeUnit
enum and either (a) change refund_locktime into two fields like
refund_locktime_blocks and refund_locktime_seconds (only one set) or (b)
introduce oneof { uint64 refund_locktime_blocks; uint64 refund_locktime_seconds;
} or add enum LocktimeUnit { BLOCK_HEIGHT, UNIX_SECONDS } alongside the existing
field and include the unit; adjust any serializers/deserializers and usages in
scheduleSwapRefund and scheduleChainSwapRefund to populate the new fields
accordingly.
internal/interface/grpc/handlers/event_handler.go (2)

206-208: ⚠️ Potential issue | 🟠 Major

Preserve the notification timestamp instead of stamping relay time here.

Line 208 uses time.Now().Unix(), while HTLC and chain-swap events forward their source timestamps. That makes cross-event ordering inconsistent whenever notification delivery is delayed. Add a timestamp to application.Notification at emission time and use it here.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/interface/grpc/handlers/event_handler.go` around lines 206 - 208,
The handler currently stamps relay time in toTxAssociatedResponse (function
toTxAssociatedResponse) via time.Now().Unix(); instead, add a timestamp field to
application.Notification and set that field at emission time in the code paths
that create HTLC and chain-swap notifications, then change
toTxAssociatedResponse to use n.Timestamp (or n.GetTimestamp()) so the original
event time is preserved for ordering; also handle a zero/fallback value if
needed (e.g., fall back to time.Now().Unix() only when n.Timestamp is unset).

19-21: ⚠️ Potential issue | 🟠 Major

This fan-out path still leaks goroutines under backpressure.

Line 21 creates an unbuffered listener channel, and Lines 55-57 / 129-131 still dispatch via blocking goroutines. A slow or disconnected stream leaves one blocked sender goroutine per event behind. Use a small bounded listener buffer and a non-blocking send/drop-or-evict policy instead.

Also applies to: 53-58, 127-132

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/interface/grpc/handlers/event_handler.go` around lines 19 - 21, The
current fan-out leaks one blocked sender goroutine per slow client because
listener.ch is unbuffered and dispatch spawns blocking goroutines; change the
listener creation in listener[*pb.GetEventStreamResponse] to use a small bounded
buffer (e.g., make(chan *pb.GetEventStreamResponse, 4)) and remove the blocking
goroutine sends (the sites that spawn goroutines to send into listener.ch).
Replace those sends with a non-blocking send using select: try to send into
listener.ch, and on default either drop the new event or evict the oldest queued
event (read one from the channel to make room then send), so no sender goroutine
can block indefinitely.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@internal/core/application/service.go`:
- Around line 1274-1285: The code dereferences swapDetails.Opts before checking
it; change the logic to first guard that swapDetails.Opts != nil and only then
call domain.NewVhtlc(*swapDetails.Opts) and use the returned vHTLC when emitting
events; update the same pattern at the other occurrences where NewVhtlc and
vHTLC are created/used (the blocks using domain.NewVhtlc, vHTLC.Id and
s.emitHtlcEvent/HtlcEvent) so no dereference happens unless Opts is non-nil, and
ensure the alternative path returns or handles the missing opts appropriately.
- Around line 1016-1021: The HtlcEvent emitted by emitHtlcEvent for
HtlcEventCreated omits Amount (so proto3 shows 0); update the creation flow to
populate Amount: either retrieve the amount from the created VHTLC object (e.g.,
via CreateVHTLC's returned vhtlc or vhtlc.Amount) and set HtlcEvent.Amount when
constructing HtlcEvent, or if the service lacks an amount source, change the
public proto/contract to make amount optional/nullable (or remove it) and
regenerate bindings; modify CreateVHTLC (or the caller that has the amount) to
accept/return the amount and ensure emitHtlcEvent(HtlcEvent{... Amount: amount,
...}) is used.

---

Duplicate comments:
In `@api-spec/protobuf/fulmine/v1/service.proto`:
- Around line 562-565: HtlcRefundableEvent’s refund_locktime is ambiguous;
update the proto so clients can tell units by replacing refund_locktime with a
self-describing alternative (e.g. a oneof or separate fields plus an enum) — for
example add a LocktimeUnit enum and either (a) change refund_locktime into two
fields like refund_locktime_blocks and refund_locktime_seconds (only one set) or
(b) introduce oneof { uint64 refund_locktime_blocks; uint64
refund_locktime_seconds; } or add enum LocktimeUnit { BLOCK_HEIGHT, UNIX_SECONDS
} alongside the existing field and include the unit; adjust any
serializers/deserializers and usages in scheduleSwapRefund and
scheduleChainSwapRefund to populate the new fields accordingly.

In `@internal/core/application/service.go`:
- Around line 2413-2418: The code currently emits an HtlcEvent with Type
HtlcEventRefundable (via s.emitHtlcEvent) before verifying
GetVHTLCFunds()/Spent, which can advertise refunds for already-spent HTLCs; move
the emitHtlcEvent call so it runs only after the Spent guard succeeds and just
before calling RefundSwap(...). Update both occurrences (the block that creates
HtlcEvent with VhtlcId/vhtlcId and RefundLocktime uint64(opts.RefundLocktime))
to emit after the GetVHTLCFunds()/Spent check and immediately prior to invoking
RefundSwap to ensure refundable events reflect the verified state.

In `@internal/interface/grpc/handlers/event_handler.go`:
- Around line 206-208: The handler currently stamps relay time in
toTxAssociatedResponse (function toTxAssociatedResponse) via time.Now().Unix();
instead, add a timestamp field to application.Notification and set that field at
emission time in the code paths that create HTLC and chain-swap notifications,
then change toTxAssociatedResponse to use n.Timestamp (or n.GetTimestamp()) so
the original event time is preserved for ordering; also handle a zero/fallback
value if needed (e.g., fall back to time.Now().Unix() only when n.Timestamp is
unset).
- Around line 19-21: The current fan-out leaks one blocked sender goroutine per
slow client because listener.ch is unbuffered and dispatch spawns blocking
goroutines; change the listener creation in listener[*pb.GetEventStreamResponse]
to use a small bounded buffer (e.g., make(chan *pb.GetEventStreamResponse, 4))
and remove the blocking goroutine sends (the sites that spawn goroutines to send
into listener.ch). Replace those sends with a non-blocking send using select:
try to send into listener.ch, and on default either drop the new event or evict
the oldest queued event (read one from the channel to make room then send), so
no sender goroutine can block indefinitely.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 5395a1cb-c969-47fc-9334-7583376795f5

📥 Commits

Reviewing files that changed from the base of the PR and between 7e65c42 and 8262f88.

⛔ Files ignored due to path filters (1)
  • api-spec/protobuf/gen/go/fulmine/v1/service.pb.go is excluded by !**/*.pb.go, !**/gen/**
📒 Files selected for processing (10)
  • api-spec/openapi/swagger/fulmine/v1/service.swagger.json
  • api-spec/protobuf/fulmine/v1/service.proto
  • internal/core/application/chainswap_events.go
  • internal/core/application/chainswap_events_test.go
  • internal/core/application/service.go
  • internal/interface/grpc/handlers/event_handler.go
  • internal/interface/grpc/handlers/event_handler_test.go
  • internal/interface/grpc/handlers/service_handler.go
  • internal/test/e2e/event_stream_test.go
  • test.docker-compose.yml

Comment on lines +1016 to +1021
	s.emitHtlcEvent(HtlcEvent{
		Type:      HtlcEventCreated,
		Timestamp: time.Now().Unix(),
		VhtlcId:   vhtlcId,
		Address:   encodedAddr,
	})

⚠️ Potential issue | 🟠 Major

htlc_created.amount is never populated.

Line 1016 emits HtlcEventCreated without Amount, and CreateVHTLC does not have an amount input to supply here. Because this is a proto3 scalar, clients will see amount=0 rather than “unknown” for real htlc_created events. Either populate it from the creation source or remove/make-optional the field in the public contract.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/core/application/service.go` around lines 1016 - 1021, The HtlcEvent
emitted by emitHtlcEvent for HtlcEventCreated omits Amount (so proto3 shows 0);
update the creation flow to populate Amount: either retrieve the amount from the
created VHTLC object (e.g., via CreateVHTLC's returned vhtlc or vhtlc.Amount)
and set HtlcEvent.Amount when constructing HtlcEvent, or if the service lacks an
amount source, change the public proto/contract to make amount optional/nullable
(or remove it) and regenerate bindings; modify CreateVHTLC (or the caller that
has the amount) to accept/return the amount and ensure
emitHtlcEvent(HtlcEvent{... Amount: amount, ...}) is used.

Comment on lines 1274 to +1285
	swapStatus := domain.SwapStatus(swapDetails.Status)
	vHTLC := domain.NewVhtlc(*swapDetails.Opts)

	if swapDetails.TxId != "" && swapDetails.Opts != nil {
		s.emitHtlcEvent(HtlcEvent{
			Type:      HtlcEventFunded,
			Timestamp: time.Now().Unix(),
			VhtlcId:   vHTLC.Id,
			TxId:      swapDetails.TxId,
			Amount:    swapDetails.Amount,
		})
	}

⚠️ Potential issue | 🔴 Critical

Guard swapDetails.Opts before dereferencing it.

Line 1275 dereferences swapDetails.Opts before the nil guard on Line 1277, and the same pattern repeats on Lines 1437 and 1511 with no guard at all. If the swap handler ever returns a response without opts, these RPCs panic instead of returning an error.

Also applies to: 1436-1447, 1510-1521

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/core/application/service.go` around lines 1274 - 1285, The code
dereferences swapDetails.Opts before checking it; change the logic to first
guard that swapDetails.Opts != nil and only then call
domain.NewVhtlc(*swapDetails.Opts) and use the returned vHTLC when emitting
events; update the same pattern at the other occurrences where NewVhtlc and
vHTLC are created/used (the blocks using domain.NewVhtlc, vHTLC.Id and
s.emitHtlcEvent/HtlcEvent) so no dereference happens unless Opts is non-nil, and
ensure the alternative path returns or handles the missing opts appropriately.

@arkanaai

arkanaai bot commented Mar 27, 2026

📝 This is marked WIP and has been open since Mar 9. Is it still in progress? If ready, remove [WIP] and request review.

@arkanaai

arkanaai bot commented Mar 27, 2026

Review

[WIP] HTLC event stream — server-side streaming ✓

Scope: gRPC + REST streaming for wallet/VHTLC lifecycle events

Key observations:

  1. Event types — 5 core types (tx_associated, htlc_created, htlc_funded, htlc_spent, htlc_refundable) cover essential state transitions. Ensures clients can detect funding, settlement, and refund readiness.

  2. Buffered channel pattern — 1024-buffer with non-blocking sends prevents goroutine leaks if a consumer falls behind. Follows ChainSwapEventCallback pattern correctly.

  3. Fan-out via listenerHandler[T] — Concurrent stream consumers supported; serialization of events is properly ordered per listener.

  4. REST gateway — grpc-gateway chunked JSON streaming is standard; ensure timeouts and connection pooling are configured for long-lived connections.

Minor items:

  • Are events guaranteed in order? (critical for detecting double-spends or replay)
  • Add heartbeat event or keep-alive ping for long-idle streams (HTTP proxies may kill idle connections)
  • WIP tag suggests incomplete — expected completion timeline?

Pending full code review due to WIP status.

@arkanaai

arkanaai bot commented Mar 27, 2026

Review: [WIP] Introduce HTLC event stream

Summary

Server-side event streaming for wallet and VHTLC lifecycle events. Comprehensive proto-first design with 5 event types, fan-out listener pattern, and REST streaming via grpc-gateway. Strong coverage with 12 unit tests + 1075-line E2E test suite.

✅ Strengths

  • Event taxonomy is well-thought-out: tx_associated, htlc_created, htlc_funded, htlc_spent, htlc_refundable cover the full VHTLC lifecycle. Clear semantics with SpendType (claimed/refunded) and RefundKind distinctions.
  • Non-blocking emission pattern: Buffered channel (1024) with non-blocking sends + drop-on-full is correct for high-volume event sources. Prevents event loops from blocking application flow.
  • Fan-out listener architecture: listenerHandler[T] pattern for concurrent consumers is sound. Multiple streams can subscribe to same event source without interference.
  • Proto-first design: gRPC + REST (grpc-gateway) parity. OpenAPI spec generated correctly.
  • Test breadth: 12 unit tests covering channel behavior, all event types, listener lifecycle. 1075-line E2E test is serious validation.

🔍 Critical Review Points

  1. Event ordering guarantee: The PR doesn't explicitly document if events are delivered in order across multiple event types. With concurrent emission points (e.g., htlc_created in one goroutine, htlc_funded in another), could a listener see them out-of-order?

    • If ordering matters (e.g., for state reconstruction), consider a monotonic event ID or single emission goroutine.
  2. Drop-on-full behavior: Buffered channel drops events silently when full (capacity 1024). This is documented in tests, but what's the failure mode?

    • What if a listener relies on event completeness (e.g., for audit logs)? A dropped htlc_spent event could be silently lost.
    • Should there be metrics/logging when events are dropped?
  3. Timestamp semantics: Each event has a timestamp. Is this time.Now() at emission, or at proto marshaling? Matters for precise ordering and causality.

    • If events are emitted concurrently across multiple goroutines, timestamps might not reflect true causality.
  4. gRPC flow control: With long-running streams and 1024 buffer, what happens if a slow consumer can't keep up?

    • gRPC has backpressure mechanisms. Are they properly configured? Or will the channel fill and start dropping?
  5. Refund scheduling: htlc_refundable is emitted in scheduleSwapRefund() and scheduleChainSwapRefund() before the unilateral attempt. Why pre-emit instead of post-settle?

    • If the refund attempt fails (e.g., fee spike), a listener might have acted on a refundable event that never completed.
  6. State consistency: Is there a guarantee that if a listener sees htlc_funded, all prior events (htlc_created) have been delivered?

    • This is important for SDKs rebuilding state from the stream.

🎯 Code Quality Notes

  • Excellent test structure (both unit and E2E).
  • Proto definitions are clean and extensible.
  • Application layer event types are well-separated from gRPC/proto concerns.

⚠️ Recommendations

  1. Document event ordering semantics in the service.proto comment. Is it guaranteed? If not, SDKs need to handle reordering.
  2. Add metrics for dropped events (when channel is full). At minimum, log a warning.
  3. Clarify timestamp meaning: Is it emission time or proto time? Add a comment.
  4. Test concurrent emission: Current E2E suite should have a test that emits events from multiple goroutines and verifies ordering guarantees (or documents why they don't exist).
  5. Consider backpressure strategy: If slow consumers are expected, document how gRPC flow control + buffered channel interact.

Status

  • Not blocking, but these clarifications should be addressed before merging to main.
  • The foundation is solid; these are refinements for production confidence.
  • Tag this for discussion with @sekulicd on ordering and drop semantics.

Recommendation: Address documentation + testing questions, then good to merge.

Development

Successfully merging this pull request may close these issues.

Introduce htlc event stream
