Conversation
Introduces a new gRPC server-side streaming RPC that emits real-time events for VHTLC and VTXO lifecycle changes: - VHTLC_CREATED: New VHTLC address generated - VHTLC_FUNDED: Funds received at VHTLC address - VHTLC_CLAIMED: VHTLC claimed with preimage (includes preimage) - VHTLC_REFUNDED: VHTLC refunded after timeout - VTXO_RECEIVED: New VTXOs at subscribed addresses - VTXO_SPENT: VTXOs spent from subscribed addresses Event structure uses oneofs for clean separation: - VhtlcEventData: id, txid, preimage (for claims) - VtxoEventData: full Vtxo objects, txid Includes unit tests for event conversion and listener handling. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
📝 WalkthroughWalkthroughThis PR adds VHTLC event streaming capability by introducing a new event system with EventType enum and VhtlcEvent types, updating the application service to emit lifecycle events, implementing a new GetVhtlcEvents gRPC endpoint, adding database migrations to track VHTLC locking scripts, and bumping the Go toolchain to 1.25.7 across the codebase. Changes
Sequence Diagram(s)sequenceDiagram
actor Client
participant gRPC Handler
participant App Service
participant Event Channel
participant Database
Client->>gRPC Handler: GetVhtlcEvents()
gRPC Handler->>gRPC Handler: Create listener<br/>(vhtlcEventsListenerHandler)
gRPC Handler->>App Service: Background listenToEvents()<br/>subscribes to service.GetEvents()
Note over Client,Database: VHTLC Lifecycle Operations
Client->>gRPC Handler: CreateVHTLC request
gRPC Handler->>App Service: GetSwapVHTLC()
App Service->>Event Channel: emitEvent(VhtlcCreated)
Event Channel->>gRPC Handler: listenToEvents receives event
gRPC Handler->>gRPC Handler: Broadcast to all listeners
gRPC Handler->>Client: Stream VhtlcEvent(CREATED)
App Service->>Database: Query on-chain scripts
Database-->>App Service: Script matches
App Service->>Event Channel: emitEvent(VhtlcFunded)
Event Channel->>gRPC Handler: listenToEvents receives event
gRPC Handler->>Client: Stream VhtlcEvent(FUNDED)
Client->>gRPC Handler: ClaimVHTLC request
App Service->>Event Channel: emitEvent(VhtlcClaimed, txid, preimage)
Event Channel->>gRPC Handler: listenToEvents receives event
gRPC Handler->>Client: Stream VhtlcEvent(CLAIMED)
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Tip Issue Planner is now in beta. Read the docs and try it out! Share your feedback on Discord. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 7
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/core/application/service.go (1)
908-932:⚠️ Potential issue | 🟡 MinorVHTLC DB persistence and event emission run in a detached goroutine — caller receives success before the VHTLC is actually persisted.
GetSwapVHTLCreturns(encodedAddr, vhtlcId, vHTLCScript, nil)at line 934 while the DB write and event emission happen asynchronously in the goroutine (lines 908–932). If the DB write fails (line 910), the caller has already received a "success" response with a vhtlcId that doesn't exist in the database.Additionally, the vhtlc subscription at line 920 uses
context.Background()which is fine for a fire-and-forget goroutine, but the error handling via nestedif/else if/elseis a bit fragile — a subscription failure after a successful DB write means the VHTLC won't receiveFundedevents.This appears to be a pre-existing pattern (the goroutine was likely there before this PR), so flagging as a minor concern.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/core/application/service.go` around lines 908 - 932, GetSwapVHTLC currently returns success before the background goroutine runs s.dbSvc.VHTLC().Add and subscription, which can lead to returning a vhtlcId that was never persisted; change the flow so the DB insert (s.dbSvc.VHTLC().Add) and the subscription (s.vhtlcSubscription.subscribe) run synchronously using the caller context (instead of a detached goroutine and context.Background()), return any error from Add or subscribe to the caller, and only call s.emitEvent after both operations succeed; if subscription fails after a successful Add consider either rolling back the DB insert or clearly document/handle the partial state (e.g., attempt cleanup via s.dbSvc.VHTLC().Delete or update status) so callers only get success when persistence and subscription are confirmed.
🧹 Nitpick comments (11)
internal/test/e2e/vhtlc_test.go (1)
170-207: Goroutine-per-iteration pattern works but leaks on timeout.Each loop iteration spawns a goroutine for
stream.Recv(). This is sequentially correct since each goroutine completes before the next iteration (theselectconsumes fromeventCh/errChfirst). However, on timeout the blockedRecv()goroutine leaks. For test code this is acceptable, but if you want to tighten it up, you could use a single long-lived reader goroutine feeding a shared channel outside the loop.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/test/e2e/vhtlc_test.go` around lines 170 - 207, The helper waitForVhtlcEvent spawns a new goroutine each loop that calls stream.Recv(), which can leak if the context times out because the blocked Recv() remains; change the approach to start one long-lived reader goroutine outside the for loop that repeatedly calls stream.Recv() and forwards results/errors into shared channels (e.g., eventCh and errCh), and then have the loop select on those channels and ctx.Done(); ensure the reader goroutine exits when ctx is canceled (listen on ctx or close the stream) so no Recv() goroutines are left hanging.internal/test/e2e/utils_test.go (1)
26-42: Consider extracting the shared gRPC connection setup.Both
newFulmineClientandnewNotificationClientduplicate the connection setup. A small helper likedialInsecure(url) (*grpc.ClientConn, error)would reduce duplication and make it easier to add connection cleanup (theconnis currently leaked in both helpers).🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/test/e2e/utils_test.go` around lines 26 - 42, Extract the duplicated insecure gRPC connection setup into a helper, e.g. dialInsecure(url string) (*grpc.ClientConn, error), that calls grpc.WithTransportCredentials(insecure.NewCredentials()) and grpc.Dial (or grpc.DialContext) to return the *grpc.ClientConn; then update newFulmineClient and newNotificationClient to call dialInsecure, create their pb.NewServiceClient(conn) / pb.NewNotificationServiceClient(conn) from the returned conn, and ensure the connection is not leaked (either return the conn alongside the client or add a Close method/ensure callers defer conn.Close()). Reference: newFulmineClient, newNotificationClient, and the new dialInsecure helper.internal/infrastructure/db/badger/vhtlc_repo.go (2)
95-121: Silently skipping locking-script derivation errors hides data problems.Lines 113-114 silently
continuewhengetVhtlcLockingScriptfails. This means corrupted or invalid VHTLC records are quietly excluded from results with no log trail. The same pattern appears inGetScripts(lines 133-134). Consider logging a warning so operators can detect data issues.Suggested fix
+ log "github.com/sirupsen/logrus" ... for _, v := range vhtlcs { lockingScript, err := getVhtlcLockingScript(v) if err != nil { + log.WithError(err).Warnf("failed to derive locking script for vhtlc %s", v.Id) continue }Apply the same pattern in
GetScripts.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/infrastructure/db/badger/vhtlc_repo.go` around lines 95 - 121, In GetByScripts (and likewise in GetScripts) avoid silently dropping records when getVhtlcLockingScript returns an error: replace the bare "continue" with a warning log that includes the VHTLC identifier (e.g. v.ID or other unique field), the error returned by getVhtlcLockingScript, and context (function name and the problematic lockingScript derivation), so operators can detect corrupted/invalid VHTLCs; use the repository's logger (e.g. r.logger or existing logging facility) to emit the warning and then continue to the next entry.
95-96: Add a doc comment onGetByScripts.
GetScripts(line 123) has a doc comment butGetByScriptsdoes not. As per coding guidelines, exported functions should be documented with comments in Go code.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/infrastructure/db/badger/vhtlc_repo.go` around lines 95 - 96, Add a Go doc comment for the exported method GetByScripts on type vhtlcRepository describing its purpose, inputs and return values similar to the existing GetScripts comment; place the comment immediately above the GetByScripts function declaration and mention that it retrieves Vhtlc records matching the provided script slice and returns a slice of domain.Vhtlc and an error.internal/infrastructure/db/migrations.go (2)
26-30: Migrations run in slice order, not necessarily chronological order.The doc comment states "Migrations are applied in slice order (chronological by version)" but there's no validation that versions are in ascending order. If someone accidentally registers migrations out of order, they'll silently run in the wrong order. Consider adding a monotonicity check during the validation loop.
Suggested addition in the validation loop (after line 49)
seenVersions := make(map[string]struct{}, len(migrations)) + var prevVersion int64 for _, m := range migrations { if m.Version == "" { return fmt.Errorf("go migration has empty version") } - if _, err := strconv.ParseInt(m.Version, 10, 64); err != nil { + v, err := strconv.ParseInt(m.Version, 10, 64) + if err != nil { return fmt.Errorf("go migration %q has invalid version format: %w", m.Version, err) } + if v <= prevVersion { + return fmt.Errorf("go migration %s is not in ascending order (prev: %d)", m.Version, prevVersion) + } + prevVersion = v if _, exists := seenVersions[m.Version]; exists { return fmt.Errorf("duplicate go migration version %s", m.Version) } seenVersions[m.Version] = struct{}{} }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/infrastructure/db/migrations.go` around lines 26 - 30, The comment claims migrations run chronologically but ApplyGoMigrations currently applies them in slice order without checking monotonicity; update the validation loop inside ApplyGoMigrations to enforce that each GoMigration's Version (field Version on type GoMigration) is strictly increasing compared to the previous entry in the migrations slice (e.g., keep a prevVersion variable and for each migration check prevVersion < m.Version), and if not return an error (using fmt.Errorf or your project's error pattern) that identifies the offending migration/version; also run or add tests to cover out-of-order registration.
61-64:SELECT version FROM schema_migrations LIMIT 1may behave unexpectedly if table is empty.If
schema_migrationsis empty (e.g., a code path where Go migrations are called before SQL migrations), this returnssql.ErrNoRows. The error message "read schema migration version" is a bit opaque. This may be intentional (SQL migrations must run first), but a more descriptive error for this specific case would improve debuggability.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/infrastructure/db/migrations.go` around lines 61 - 64, The current read of schemaVersion using db.QueryRowContext can return sql.ErrNoRows when the schema_migrations table is empty; update the block that calls db.QueryRowContext/Scan (the schemaVersion variable read) to explicitly handle sql.ErrNoRows and return a clearer error message (e.g., "schema_migrations table empty: SQL migrations not applied" or similar) instead of the opaque "read schema migration version" message, and ensure database/sql is imported to reference sql.ErrNoRows.internal/infrastructure/db/sqlite/vhtlc_migration.go (1)
450-458: Rows that fail script derivation permanently stay at'[]'and are re-processed every startup.When
LockingScriptHexFromOptsfails (line 451), the script is set back to"[]"(line 453). TheWHERE script = '[]'clause at line 354 will pick it up again on every startup. If the data genuinely can't produce a valid script, this creates a recurring (albeit harmless) warning on each start.Consider using a distinct sentinel (e.g.,
"error") for permanently un-derivable rows so they are excluded from future backfill attempts, or log at a higher level so operators can clean up the underlying data.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/infrastructure/db/sqlite/vhtlc_migration.go` around lines 450 - 458, When LockingScriptHexFromOpts fails you currently write back the sentinel "[]" which keeps the row selected by the existing WHERE script = '[]' backfill query; change the failure-handling path (the vhtlc.LockingScriptHexFromOpts error branch where stmt.ExecContext(ctx, "[]", id) is called) to write a distinct permanent sentinel such as "error" instead of "[]", and update the selection logic that uses WHERE script = '[]' so it only targets truly unprocessed rows (or explicitly exclude rows with script = 'error') so permanently un-derivable rows are not reprocessed on every startup. Ensure you update any associated tests/log messages to reflect the new sentinel.internal/interface/grpc/handlers/notification_handler.go (1)
98-100: Add a doc comment to the exported-via-interfaceGetVhtlcEventsmethod.As per coding guidelines, exported functions should be documented. A brief comment describing the streaming behavior would be helpful.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/interface/grpc/handlers/notification_handler.go` around lines 98 - 100, Add a doc comment immediately above the GetVhtlcEvents method on the notificationHandler type describing its behavior: explain that GetVhtlcEvents is a server-side gRPC streaming handler that continuously sends VHTLC events to the provided pb.NotificationService_GetVhtlcEventsServer stream, note what kinds of events are streamed and any concurrency/close/context semantics (e.g., it returns when the stream is closed or an error occurs), and document the parameters and error return briefly so readers know the method’s contract.internal/infrastructure/db/service_vhtlc_migration_test.go (1)
211-251:seedVhtlcRowWithoutScriptduplicatesinsertNewSchemaRowWithoutScript.This helper is nearly identical to
insertNewSchemaRowWithoutScriptininternal/infrastructure/db/sqlite/vhtlc_migration_test.go. Since they're in different test packages (db_testvssqlitedb_test), sharing directly isn't trivial, but consider extracting a sharedtestutilpackage if more helpers accumulate.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/infrastructure/db/service_vhtlc_migration_test.go` around lines 211 - 251, The test helper seedVhtlcRowWithoutScript duplicates insertNewSchemaRowWithoutScript; extract the shared logic into a new test helper in a test-only package (e.g., internal/testutil or internal/testhelpers) and have both test packages call that helper to avoid duplication. Move the common code that builds preimage, keys, id and executes the INSERT into a function (e.g., CreateVhtlcRow/dbSeedVhtlc) exposed from the new testutil package, update imports in both test files to use that helper, and keep package-specific test assertions local to each test package.internal/infrastructure/db/sqlite/vhtlc_repo.go (2)
119-125: Sentinel value"[]"is an unusual choice for "no script".Lines 121 and 219 use
"[]"(a JSON-looking string) as a sentinel for missing/failed script derivation. An empty string""is the natural zero value for Go strings and is already checked alongside"[]"inGetScripts. Using only""(and ensuringtoVhtlcRowsetsScript = ""on failure, or better, returns an error) would simplify the filtering logic and avoid confusion with actual data.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/infrastructure/db/sqlite/vhtlc_repo.go` around lines 119 - 125, The code uses the string literal "[]" as a sentinel for missing/failed script derivation which is confusing; instead make the absence of a script the empty string and remove the "[]" check. Update toVhtlcRow to either return an error on failed script derivation or explicitly set row.Script = "" on failure, and then change the filtering in GetScripts (the loop building allScripts over vhtlcs / variable allScripts) to only skip when vhtlc.Script == "" (remove the vhtlc.Script == "[]" condition). Ensure callers handle an error return from toVhtlcRow if you choose the error route.
83-111:GetByScriptsloads all rows and filters in-memory instead of using a SQLWHERE script IN (...)clause.For a wallet-side app the dataset is likely small, but this still means every call to
GetByScriptsperforms a full table scan and deserializes all VHTLCs. If the table grows, this becomes wasteful. Consider adding a sqlc query with aWHERE script IN (...)clause (sqlc supportssqlc.slice(...)for this pattern) to push filtering to the database.The same applies to
GetScripts(lines 113–128), which also callsListVHTLCjust to collect non-empty scripts.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/infrastructure/db/sqlite/vhtlc_repo.go` around lines 83 - 111, GetByScripts currently calls ListVHTLC and filters results in-memory causing full table scans; add a new sqlc query (e.g. ListVHTLCByScripts) that uses WHERE script IN (sqlc.slice(?)) to push filtering into SQL, then update GetByScripts to call the new querier method (preserve the early return for empty scripts), iterate only returned rows and convert them with toVhtlc; do the same refactor for GetScripts (replace ListVHTLC usage with a targeted query that returns only non-empty scripts) so the DB performs the filtering and we avoid deserializing the entire table.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/core/application/service.go`:
- Line 101: The unbuffered events channel (events chan VhtlcEvent) combined with
emitEvent spawning a goroutine that does s.events <- event leaks goroutines when
no consumer is reading; change the channel allocation from make(chan VhtlcEvent)
to a buffered channel (e.g., make(chan VhtlcEvent, <reasonable_capacity>)) where
the channel is created (constructor/new service init), remove the
fire-and-forget goroutine from emitEvent and replace the send with a
non-blocking select send (case s.events <- event: ... default: log a
warning/drop the event using the service logger like s.logger.Warnf or
processLogger.Warnf) so events are queued while consumers exist and dropped
safely when the buffer is full.
- Around line 1755-1780: The DB error in handleVhtlcScriptEvent is logged at
Debug which can hide real failures; update the logging call that handles the
error from s.dbSvc.VHTLC().GetByScripts (currently using
log.WithError(err).Debug("failed to get vhtlcs for scripts")) to use a higher
severity (log.WithError(err).Warn or log.WithError(err).Error — prefer Error) so
database query failures are visible to operators; keep the same message and
error attachment otherwise.
In `@internal/infrastructure/db/sqlite/vhtlc_migration_test.go`:
- Around line 195-198: Update the doc comment for
insertNewSchemaRowWithoutScript to match the function name and behavior: change
the function name in the comment to insertNewSchemaRowWithoutScript and clarify
that although the table has a script column, this function deliberately omits
the script value from the INSERT so the column uses its default ('[]'), rather
than suggesting it inserts a script; reference the function
insertNewSchemaRowWithoutScript and the behavior of using the default script
value in the comment.
In `@internal/infrastructure/db/sqlite/vhtlc_migration.go`:
- Around line 362-370: The defer currently conditionally rollbacks based on the
outer err variable, which can be shadowed by inner := errors and cause a
transaction leak; modify the defer associated with tx from db.BeginTx so it
always calls tx.Rollback() (ignoring the rollback error), allowing Commit to
succeed later (Rollback after Commit is a no-op). Locate the tx variable and the
existing defer func() { if err != nil { _ = tx.Rollback() } } and replace it
with an unconditional _ = tx.Rollback() inside the defer or alternatively use a
named return error variable pattern so inner := usages don't shadow err.
In `@internal/infrastructure/db/sqlite/vhtlc_repo.go`:
- Around line 195-221: toVhtlcRow currently swallows errors from
LockingScriptHexFromOpts and returns a bogus Script "[]", causing corrupted DB
rows and no programmatic error; change toVhtlcRow to return
(queries.InsertVHTLCParams, error) and return the actual error when
LockingScriptHexFromOpts fails instead of constructing a row, then update its
caller Add to handle that error (propagate or abort insert) so no row is
persisted with an invalid Script and GetByScripts will work correctly; ensure
all other callers of toVhtlcRow are updated to handle the new error return.
In `@internal/interface/grpc/handlers/notification_handler.go`:
- Around line 98-123: The GetVhtlcEvents handler currently calls
close(listener.ch) when stream.Context().Done() which can race with stop()
(which also closes listener channels) and cause a double-close panic; update the
code in GetVhtlcEvents to stop explicitly closing listener.ch — simply remove
the close(listener.ch) call and rely on
vhtlcEventsListenerHandler.removeListener(listener.id) and the global stop() to
close channels, or alternately wrap channel close logic in a single-close guard
(e.g., a sync.Once) on the listener struct so close(listener.ch) is executed
exactly once; pay attention to the listener type, its ch field, the
GetVhtlcEvents method, and the stop() that closes all listener channels when
applying the fix.
- Around line 161-177: listenToEvents currently reads
h.vhtlcEventsListenerHandler.listeners without taking the
vhtlcEventsListenerHandler.lock and launches goroutines that may send on a
channel closed by GetVhtlcEvents, causing a race and possible panic; fix by
acquiring the lock in listenToEvents just long enough to copy the listeners
slice (h.vhtlcEventsListenerHandler.lock and
h.vhtlcEventsListenerHandler.listeners), then release the lock and iterate the
copied slice, and for each listener launch a goroutine that wraps the send to
l.ch with a defer-recover (or otherwise guards against send-on-closed-channel)
so a concurrent channel close from GetVhtlcEvents won't panic; keep
pushListener/removeListener unchanged but reference them as the mutating
callers.
---
Outside diff comments:
In `@internal/core/application/service.go`:
- Around line 908-932: GetSwapVHTLC currently returns success before the
background goroutine runs s.dbSvc.VHTLC().Add and subscription, which can lead
to returning a vhtlcId that was never persisted; change the flow so the DB
insert (s.dbSvc.VHTLC().Add) and the subscription
(s.vhtlcSubscription.subscribe) run synchronously using the caller context
(instead of a detached goroutine and context.Background()), return any error
from Add or subscribe to the caller, and only call s.emitEvent after both
operations succeed; if subscription fails after a successful Add consider either
rolling back the DB insert or clearly document/handle the partial state (e.g.,
attempt cleanup via s.dbSvc.VHTLC().Delete or update status) so callers only get
success when persistence and subscription are confirmed.
---
Nitpick comments:
In `@internal/infrastructure/db/badger/vhtlc_repo.go`:
- Around line 95-121: In GetByScripts (and likewise in GetScripts) avoid
silently dropping records when getVhtlcLockingScript returns an error: replace
the bare "continue" with a warning log that includes the VHTLC identifier (e.g.
v.ID or other unique field), the error returned by getVhtlcLockingScript, and
context (function name and the problematic lockingScript derivation), so
operators can detect corrupted/invalid VHTLCs; use the repository's logger (e.g.
r.logger or existing logging facility) to emit the warning and then continue to
the next entry.
- Around line 95-96: Add a Go doc comment for the exported method GetByScripts
on type vhtlcRepository describing its purpose, inputs and return values similar
to the existing GetScripts comment; place the comment immediately above the
GetByScripts function declaration and mention that it retrieves Vhtlc records
matching the provided script slice and returns a slice of domain.Vhtlc and an
error.
In `@internal/infrastructure/db/migrations.go`:
- Around line 26-30: The comment claims migrations run chronologically but
ApplyGoMigrations currently applies them in slice order without checking
monotonicity; update the validation loop inside ApplyGoMigrations to enforce
that each GoMigration's Version (field Version on type GoMigration) is strictly
increasing compared to the previous entry in the migrations slice (e.g., keep a
prevVersion variable and for each migration check prevVersion < m.Version), and
if not return an error (using fmt.Errorf or your project's error pattern) that
identifies the offending migration/version; also run or add tests to cover
out-of-order registration.
- Around line 61-64: The current read of schemaVersion using db.QueryRowContext
can return sql.ErrNoRows when the schema_migrations table is empty; update the
block that calls db.QueryRowContext/Scan (the schemaVersion variable read) to
explicitly handle sql.ErrNoRows and return a clearer error message (e.g.,
"schema_migrations table empty: SQL migrations not applied" or similar) instead
of the opaque "read schema migration version" message, and ensure database/sql
is imported to reference sql.ErrNoRows.
In `@internal/infrastructure/db/service_vhtlc_migration_test.go`:
- Around line 211-251: The test helper seedVhtlcRowWithoutScript duplicates
insertNewSchemaRowWithoutScript; extract the shared logic into a new test helper
in a test-only package (e.g., internal/testutil or internal/testhelpers) and
have both test packages call that helper to avoid duplication. Move the common
code that builds preimage, keys, id and executes the INSERT into a function
(e.g., CreateVhtlcRow/dbSeedVhtlc) exposed from the new testutil package, update
imports in both test files to use that helper, and keep package-specific test
assertions local to each test package.
In `@internal/infrastructure/db/sqlite/vhtlc_migration.go`:
- Around line 450-458: When LockingScriptHexFromOpts fails you currently write
back the sentinel "[]" which keeps the row selected by the existing WHERE script
= '[]' backfill query; change the failure-handling path (the
vhtlc.LockingScriptHexFromOpts error branch where stmt.ExecContext(ctx, "[]",
id) is called) to write a distinct permanent sentinel such as "error" instead of
"[]", and update the selection logic that uses WHERE script = '[]' so it only
targets truly unprocessed rows (or explicitly exclude rows with script =
'error') so permanently un-derivable rows are not reprocessed on every startup.
Ensure you update any associated tests/log messages to reflect the new sentinel.
In `@internal/infrastructure/db/sqlite/vhtlc_repo.go`:
- Around line 119-125: The code uses the string literal "[]" as a sentinel for
missing/failed script derivation which is confusing; instead make the absence of
a script the empty string and remove the "[]" check. Update toVhtlcRow to either
return an error on failed script derivation or explicitly set row.Script = "" on
failure, and then change the filtering in GetScripts (the loop building
allScripts over vhtlcs / variable allScripts) to only skip when vhtlc.Script ==
"" (remove the vhtlc.Script == "[]" condition). Ensure callers handle an error
return from toVhtlcRow if you choose the error route.
- Around line 83-111: GetByScripts currently calls ListVHTLC and filters results
in-memory causing full table scans; add a new sqlc query (e.g.
ListVHTLCByScripts) that uses WHERE script IN (sqlc.slice(?)) to push filtering
into SQL, then update GetByScripts to call the new querier method (preserve the
early return for empty scripts), iterate only returned rows and convert them
with toVhtlc; do the same refactor for GetScripts (replace ListVHTLC usage with
a targeted query that returns only non-empty scripts) so the DB performs the
filtering and we avoid deserializing the entire table.
In `@internal/interface/grpc/handlers/notification_handler.go`:
- Around line 98-100: Add a doc comment immediately above the GetVhtlcEvents
method on the notificationHandler type describing its behavior: explain that
GetVhtlcEvents is a server-side gRPC streaming handler that continuously sends
VHTLC events to the provided pb.NotificationService_GetVhtlcEventsServer stream,
note what kinds of events are streamed and any concurrency/close/context
semantics (e.g., it returns when the stream is closed or an error occurs), and
document the parameters and error return briefly so readers know the method’s
contract.
In `@internal/test/e2e/utils_test.go`:
- Around line 26-42: Extract the duplicated insecure gRPC connection setup into
a helper, e.g. dialInsecure(url string) (*grpc.ClientConn, error), that calls
grpc.WithTransportCredentials(insecure.NewCredentials()) and grpc.Dial (or
grpc.DialContext) to return the *grpc.ClientConn; then update newFulmineClient
and newNotificationClient to call dialInsecure, create their
pb.NewServiceClient(conn) / pb.NewNotificationServiceClient(conn) from the
returned conn, and ensure the connection is not leaked (either return the conn
alongside the client or add a Close method/ensure callers defer conn.Close()).
Reference: newFulmineClient, newNotificationClient, and the new dialInsecure
helper.
In `@internal/test/e2e/vhtlc_test.go`:
- Around line 170-207: The helper waitForVhtlcEvent spawns a new goroutine each
loop that calls stream.Recv(), which can leak if the context times out because
the blocked Recv() remains; change the approach to start one long-lived reader
goroutine outside the for loop that repeatedly calls stream.Recv() and forwards
results/errors into shared channels (e.g., eventCh and errCh), and then have the
loop select on those channels and ctx.Done(); ensure the reader goroutine exits
when ctx is canceled (listen on ctx or close the stream) so no Recv() goroutines
are left hanging.
|
|
||
| // Notification channels | ||
| notifications chan Notification | ||
| events chan VhtlcEvent |
There was a problem hiding this comment.
Unbuffered events channel + fire-and-forget goroutine in emitEvent leaks goroutines when no consumer is connected.
events is an unbuffered channel (make(chan VhtlcEvent)). Every call to emitEvent spawns a goroutine that blocks on s.events <- event indefinitely if nobody is reading (e.g., no gRPC stream is active). Over the VHTLC lifecycle (created → funded → claimed/refunded), these goroutines accumulate and never terminate.
Options:
- Use a buffered channel with a reasonable capacity and drop events when full (select-with-default).
- Track active consumers and skip emission when none are listening.
Proposed fix: buffered channel with non-blocking send
- events: make(chan VhtlcEvent),
+ events: make(chan VhtlcEvent, 64), func (s *Service) emitEvent(event VhtlcEvent) {
- go func() {
- s.events <- event
- }()
+ select {
+ case s.events <- event:
+ default:
+ log.Warn("events channel full, dropping vhtlc event")
+ }
}This removes the goroutine leak entirely. With a buffered channel, events queue up. If the buffer fills (no consumer), the event is dropped with a warning rather than leaking a goroutine.
Also applies to: 1198-1206
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/core/application/service.go` at line 101, The unbuffered events
channel (events chan VhtlcEvent) combined with emitEvent spawning a goroutine
that does s.events <- event leaks goroutines when no consumer is reading; change
the channel allocation from make(chan VhtlcEvent) to a buffered channel (e.g.,
make(chan VhtlcEvent, <reasonable_capacity>)) where the channel is created
(constructor/new service init), remove the fire-and-forget goroutine from
emitEvent and replace the send with a non-blocking select send (case s.events <-
event: ... default: log a warning/drop the event using the service logger like
s.logger.Warnf or processLogger.Warnf) so events are queued while consumers
exist and dropped safely when the buffer is full.
| // handleVhtlcScriptEvent handles indexer script events for VHTLCs. | ||
| // This is called by the vhtlcSubscription handler when a VHTLC script is detected on-chain. | ||
| func (s *Service) handleVhtlcScriptEvent(event *indexer.ScriptEvent) { | ||
| ctx := context.Background() | ||
|
|
||
| if event == nil || len(event.Scripts) == 0 { | ||
| log.Debug("received nil or empty script event") | ||
| return | ||
| } | ||
|
|
||
| log.Debugf("received vhtlc script event with %d scripts in tx %s", len(event.Scripts), event.Txid) | ||
|
|
||
| vhtlcs, err := s.dbSvc.VHTLC().GetByScripts(ctx, event.Scripts) | ||
| if err != nil { | ||
| log.WithError(err).Debug("failed to get vhtlcs for scripts") | ||
| return | ||
| } | ||
| for _, v := range vhtlcs { | ||
| s.emitEvent(VhtlcEvent{ | ||
| ID: v.Id, | ||
| Txid: event.Txid, | ||
| Type: EventTypeVhtlcFunded, | ||
| Timestamp: time.Now(), | ||
| }) | ||
| } | ||
| } |
There was a problem hiding this comment.
Log level for DB error in handleVhtlcScriptEvent should be Warn or Error, not Debug.
Line 1769: a failure to query VHTLCs from the database is logged at Debug level. This is a real error that could cause funded events to be silently dropped. It should be Warn or Error so operators can detect it.
Proposed fix
- log.WithError(err).Debug("failed to get vhtlcs for scripts")
+ log.WithError(err).Error("failed to get vhtlcs for scripts")📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| // handleVhtlcScriptEvent handles indexer script events for VHTLCs. | |
| // This is called by the vhtlcSubscription handler when a VHTLC script is detected on-chain. | |
| func (s *Service) handleVhtlcScriptEvent(event *indexer.ScriptEvent) { | |
| ctx := context.Background() | |
| if event == nil || len(event.Scripts) == 0 { | |
| log.Debug("received nil or empty script event") | |
| return | |
| } | |
| log.Debugf("received vhtlc script event with %d scripts in tx %s", len(event.Scripts), event.Txid) | |
| vhtlcs, err := s.dbSvc.VHTLC().GetByScripts(ctx, event.Scripts) | |
| if err != nil { | |
| log.WithError(err).Debug("failed to get vhtlcs for scripts") | |
| return | |
| } | |
| for _, v := range vhtlcs { | |
| s.emitEvent(VhtlcEvent{ | |
| ID: v.Id, | |
| Txid: event.Txid, | |
| Type: EventTypeVhtlcFunded, | |
| Timestamp: time.Now(), | |
| }) | |
| } | |
| } | |
| // handleVhtlcScriptEvent handles indexer script events for VHTLCs. | |
| // This is called by the vhtlcSubscription handler when a VHTLC script is detected on-chain. | |
| func (s *Service) handleVhtlcScriptEvent(event *indexer.ScriptEvent) { | |
| ctx := context.Background() | |
| if event == nil || len(event.Scripts) == 0 { | |
| log.Debug("received nil or empty script event") | |
| return | |
| } | |
| log.Debugf("received vhtlc script event with %d scripts in tx %s", len(event.Scripts), event.Txid) | |
| vhtlcs, err := s.dbSvc.VHTLC().GetByScripts(ctx, event.Scripts) | |
| if err != nil { | |
| log.WithError(err).Error("failed to get vhtlcs for scripts") | |
| return | |
| } | |
| for _, v := range vhtlcs { | |
| s.emitEvent(VhtlcEvent{ | |
| ID: v.Id, | |
| Txid: event.Txid, | |
| Type: EventTypeVhtlcFunded, | |
| Timestamp: time.Now(), | |
| }) | |
| } | |
| } |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/core/application/service.go` around lines 1755 - 1780, The DB error
in handleVhtlcScriptEvent is logged at Debug which can hide real failures;
update the logging call that handles the error from s.dbSvc.VHTLC().GetByScripts
(currently using log.WithError(err).Debug("failed to get vhtlcs for scripts"))
to use a higher severity (log.WithError(err).Warn or log.WithError(err).Error —
prefer Error) so database query failures are visible to operators; keep the same
message and error attachment otherwise.
| // insertNewSchemaRowWithScript inserts a VHTLC row into the new-schema table | ||
| // (which has a script column). Uses real secp256k1 keys so BackfillVhtlcScripts | ||
| // can derive a valid taproot script for the row. | ||
| func insertNewSchemaRowWithoutScript(t *testing.T, dbh *sql.DB) (id string) { |
There was a problem hiding this comment.
Doc comment names wrong function.
The comment on line 195 says insertNewSchemaRowWithScript but the actual function is insertNewSchemaRowWithoutScript. The comment body also says "which has a script column" — the table may have the column, but the function deliberately omits the script value from the INSERT so the default '[]' is used.
Suggested fix
-// insertNewSchemaRowWithScript inserts a VHTLC row into the new-schema table
-// (which has a script column). Uses real secp256k1 keys so BackfillVhtlcScripts
+// insertNewSchemaRowWithoutScript inserts a VHTLC row into the new-schema table
+// without specifying a script value (relying on the DEFAULT '[]'). Uses real secp256k1 keys so BackfillVhtlcScripts
// can derive a valid taproot script for the row.
func insertNewSchemaRowWithoutScript(t *testing.T, dbh *sql.DB) (id string) {📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| // insertNewSchemaRowWithScript inserts a VHTLC row into the new-schema table | |
| // (which has a script column). Uses real secp256k1 keys so BackfillVhtlcScripts | |
| // can derive a valid taproot script for the row. | |
| func insertNewSchemaRowWithoutScript(t *testing.T, dbh *sql.DB) (id string) { | |
| // insertNewSchemaRowWithoutScript inserts a VHTLC row into the new-schema table | |
| // without specifying a script value (relying on the DEFAULT '[]'). Uses real secp256k1 keys so BackfillVhtlcScripts | |
| // can derive a valid taproot script for the row. | |
| func insertNewSchemaRowWithoutScript(t *testing.T, dbh *sql.DB) (id string) { |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/infrastructure/db/sqlite/vhtlc_migration_test.go` around lines 195 -
198, Update the doc comment for insertNewSchemaRowWithoutScript to match the
function name and behavior: change the function name in the comment to
insertNewSchemaRowWithoutScript and clarify that although the table has a script
column, this function deliberately omits the script value from the INSERT so the
column uses its default ('[]'), rather than suggesting it inserts a script;
reference the function insertNewSchemaRowWithoutScript and the behavior of using
the default script value in the comment.
| tx, err := db.BeginTx(ctx, nil) | ||
| if err != nil { | ||
| return fmt.Errorf("begin transaction: %w", err) | ||
| } | ||
| defer func() { | ||
| if err != nil { | ||
| _ = tx.Rollback() | ||
| } | ||
| }() |
There was a problem hiding this comment.
Deferred rollback won't fire on mid-loop errors — transaction leak.
The deferred rollback checks the err variable declared at line 362. After a successful rows.Scan (line 394, err = ...), this variable is nil. Subsequent errors in the loop body (lines 400–461) are declared with := in inner scopes—they shadow the outer err and don't update it. If any of those return an error, the deferred rollback sees err == nil and skips the rollback, leaking the open transaction.
The simplest fix: always rollback in the defer (rollback after commit is a documented no-op in database/sql).
Suggested fix
defer func() {
- if err != nil {
- _ = tx.Rollback()
- }
+ _ = tx.Rollback() // no-op after successful Commit
}()📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| tx, err := db.BeginTx(ctx, nil) | |
| if err != nil { | |
| return fmt.Errorf("begin transaction: %w", err) | |
| } | |
| defer func() { | |
| if err != nil { | |
| _ = tx.Rollback() | |
| } | |
| }() | |
| tx, err := db.BeginTx(ctx, nil) | |
| if err != nil { | |
| return fmt.Errorf("begin transaction: %w", err) | |
| } | |
| defer func() { | |
| _ = tx.Rollback() // no-op after successful Commit | |
| }() |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/infrastructure/db/sqlite/vhtlc_migration.go` around lines 362 - 370,
The defer currently conditionally rollbacks based on the outer err variable,
which can be shadowed by inner := errors and cause a transaction leak; modify
the defer associated with tx from db.BeginTx so it always calls tx.Rollback()
(ignoring the rollback error), allowing Commit to succeed later (Rollback after
Commit is a no-op). Locate the tx variable and the existing defer func() { if
err != nil { _ = tx.Rollback() } } and replace it with an unconditional _ =
tx.Rollback() inside the defer or alternatively use a named return error
variable pattern so inner := usages don't shadow err.
| func toVhtlcRow(v domain.Vhtlc) queries.InsertVHTLCParams { | ||
| preimageHash := v.PreimageHash | ||
| sender := v.Sender.SerializeCompressed() | ||
| receiver := v.Receiver.SerializeCompressed() | ||
| server := hex.EncodeToString(v.Server.SerializeCompressed()) | ||
|
|
||
| vhtlcId := domain.GetVhtlcId(preimageHash, sender, receiver) | ||
|
|
||
| lockingScriptHex, err := vhtlc.LockingScriptHexFromOpts(v.Opts) | ||
| if err != nil { | ||
| log.WithError(err).Error("failed to derive taproot locking script for vhtlc") | ||
| return queries.InsertVHTLCParams{ | ||
| ID: vhtlcId, | ||
| PreimageHash: hex.EncodeToString(preimageHash), | ||
| Sender: hex.EncodeToString(sender), | ||
| Receiver: hex.EncodeToString(receiver), | ||
| Server: server, | ||
| RefundLocktime: int64(v.RefundLocktime), | ||
| UnilateralClaimDelayType: int64(v.UnilateralClaimDelay.Type), | ||
| UnilateralClaimDelayValue: int64(v.UnilateralClaimDelay.Value), | ||
| UnilateralRefundDelayType: int64(v.UnilateralRefundDelay.Type), | ||
| UnilateralRefundDelayValue: int64(v.UnilateralRefundDelay.Value), | ||
| UnilateralRefundWithoutReceiverDelayType: int64(v.UnilateralRefundWithoutReceiverDelay.Type), | ||
| UnilateralRefundWithoutReceiverDelayValue: int64(v.UnilateralRefundWithoutReceiverDelay.Value), | ||
| Script: "[]", | ||
| } | ||
| } |
There was a problem hiding this comment.
toVhtlcRow silently degrades on locking-script derivation failure, persisting "[]" as a sentinel.
When LockingScriptHexFromOpts fails, the function logs the error but still returns a complete InsertVHTLCParams with Script: "[]". The caller (Add) will insert this row into the database with a bogus script value. This means:
- The VHTLC is persisted with corrupted data that can never be matched by
GetByScripts. - The error is only visible in logs — no programmatic signal reaches the caller.
If deriving the script is essential (it's needed for event subscriptions and GetByScripts), consider returning an error from toVhtlcRow so the caller can decide whether to abort or proceed.
Proposed fix: propagate the error
-func toVhtlcRow(v domain.Vhtlc) queries.InsertVHTLCParams {
+func toVhtlcRow(v domain.Vhtlc) (queries.InsertVHTLCParams, error) {
preimageHash := v.PreimageHash
sender := v.Sender.SerializeCompressed()
receiver := v.Receiver.SerializeCompressed()
server := hex.EncodeToString(v.Server.SerializeCompressed())
vhtlcId := domain.GetVhtlcId(preimageHash, sender, receiver)
lockingScriptHex, err := vhtlc.LockingScriptHexFromOpts(v.Opts)
if err != nil {
- log.WithError(err).Error("failed to derive taproot locking script for vhtlc")
- return queries.InsertVHTLCParams{
- ...
- Script: "[]",
- }
+ return queries.InsertVHTLCParams{}, fmt.Errorf("failed to derive taproot locking script: %w", err)
}
- return queries.InsertVHTLCParams{
+ return queries.InsertVHTLCParams{
...
Script: lockingScriptHex,
- }
+ }, nil
}Then update Add to handle the error:
func (r *vhtlcRepository) Add(ctx context.Context, vhtlc domain.Vhtlc) error {
- optsParams := toVhtlcRow(vhtlc)
+ optsParams, err := toVhtlcRow(vhtlc)
+ if err != nil {
+ return fmt.Errorf("failed to convert vhtlc to row: %w", err)
+ }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/infrastructure/db/sqlite/vhtlc_repo.go` around lines 195 - 221,
toVhtlcRow currently swallows errors from LockingScriptHexFromOpts and returns a
bogus Script "[]", causing corrupted DB rows and no programmatic error; change
toVhtlcRow to return (queries.InsertVHTLCParams, error) and return the actual
error when LockingScriptHexFromOpts fails instead of constructing a row, then
update its caller Add to handle that error (propagate or abort insert) so no row
is persisted with an invalid Script and GetByScripts will work correctly; ensure
all other callers of toVhtlcRow are updated to handle the new error return.
| func (h *notificationHandler) GetVhtlcEvents( | ||
| _ *pb.GetVhtlcEventsRequest, stream pb.NotificationService_GetVhtlcEventsServer, | ||
| ) error { | ||
| listener := &listener[*pb.GetVhtlcEventsResponse]{ | ||
| id: uuid.NewString(), | ||
| ch: make(chan *pb.GetVhtlcEventsResponse), | ||
| } | ||
|
|
||
| h.vhtlcEventsListenerHandler.pushListener(listener) | ||
| defer h.vhtlcEventsListenerHandler.removeListener(listener.id) | ||
|
|
||
| for { | ||
| select { | ||
| case <-stream.Context().Done(): | ||
| close(listener.ch) | ||
| return nil | ||
| case ev, ok := <-listener.ch: | ||
| if !ok { | ||
| return nil | ||
| } | ||
| if err := stream.Send(ev); err != nil { | ||
| return err | ||
| } | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Double-close panic risk on listener.ch.
When stream.Context().Done() fires, close(listener.ch) is called (line 112). But if stopCh fires around the same time, stop() (line 217) also closes all listener channels. This will panic on a double close. The same pattern exists in GetVtxoNotifications (line 79), but since GetVhtlcEvents is new code, it should be addressed.
Consider removing the explicit close(listener.ch) here, and only letting stop() close channels on shutdown. Alternatively, use a sync.Once wrapper to ensure the channel is closed exactly once.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/interface/grpc/handlers/notification_handler.go` around lines 98 -
123, The GetVhtlcEvents handler currently calls close(listener.ch) when
stream.Context().Done() which can race with stop() (which also closes listener
channels) and cause a double-close panic; update the code in GetVhtlcEvents to
stop explicitly closing listener.ch — simply remove the close(listener.ch) call
and rely on vhtlcEventsListenerHandler.removeListener(listener.id) and the
global stop() to close channels, or alternately wrap channel close logic in a
single-close guard (e.g., a sync.Once) on the listener struct so
close(listener.ch) is executed exactly once; pay attention to the listener type,
its ch field, the GetVhtlcEvents method, and the stop() that closes all listener
channels when applying the fix.
| func (h *notificationHandler) listenToEvents() { | ||
| for { | ||
| select { | ||
| case event := <-h.svc.GetEvents(context.Background()): | ||
| for _, l := range h.vhtlcEventsListenerHandler.listeners { | ||
| go func(l *listener[*pb.GetVhtlcEventsResponse]) { | ||
| l.ch <- &pb.GetVhtlcEventsResponse{ | ||
| Event: toVhtlcEventProto(event), | ||
| } | ||
| }(l) | ||
| } | ||
| case <-h.stopCh: | ||
| h.vhtlcEventsListenerHandler.stop() | ||
| return | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Data race: listeners slice read without holding the lock.
listenToEvents iterates h.vhtlcEventsListenerHandler.listeners (line 165) without acquiring h.vhtlcEventsListenerHandler.lock, while pushListener/removeListener mutate the slice under that lock. This is a data race.
Additionally, the go func on line 166 sends to l.ch, but GetVhtlcEvents (line 112) closes that channel when the stream context is cancelled. This creates a send-on-closed-channel panic risk since the goroutine may be mid-flight when the channel is closed.
Both issues also exist in the pre-existing listenToNotifications/GetVtxoNotifications pair, but since listenToEvents/GetVhtlcEvents is new code, it should be fixed here.
Suggested approach for listenToEvents
func (h *notificationHandler) listenToEvents() {
for {
select {
case event := <-h.svc.GetEvents(context.Background()):
- for _, l := range h.vhtlcEventsListenerHandler.listeners {
+ h.vhtlcEventsListenerHandler.lock.Lock()
+ snapshot := make([]*listener[*pb.GetVhtlcEventsResponse], len(h.vhtlcEventsListenerHandler.listeners))
+ copy(snapshot, h.vhtlcEventsListenerHandler.listeners)
+ h.vhtlcEventsListenerHandler.lock.Unlock()
+ for _, l := range snapshot {
go func(l *listener[*pb.GetVhtlcEventsResponse]) {
- l.ch <- &pb.GetVhtlcEventsResponse{
- Event: toVhtlcEventProto(event),
+ defer func() { recover() }() // guard against send on closed channel
+ select {
+ case l.ch <- &pb.GetVhtlcEventsResponse{
+ Event: toVhtlcEventProto(event),
+ }:
+ default:
}
}(l)
}
case <-h.stopCh:📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| func (h *notificationHandler) listenToEvents() { | |
| for { | |
| select { | |
| case event := <-h.svc.GetEvents(context.Background()): | |
| for _, l := range h.vhtlcEventsListenerHandler.listeners { | |
| go func(l *listener[*pb.GetVhtlcEventsResponse]) { | |
| l.ch <- &pb.GetVhtlcEventsResponse{ | |
| Event: toVhtlcEventProto(event), | |
| } | |
| }(l) | |
| } | |
| case <-h.stopCh: | |
| h.vhtlcEventsListenerHandler.stop() | |
| return | |
| } | |
| } | |
| } | |
| func (h *notificationHandler) listenToEvents() { | |
| for { | |
| select { | |
| case event := <-h.svc.GetEvents(context.Background()): | |
| h.vhtlcEventsListenerHandler.lock.Lock() | |
| snapshot := make([]*listener[*pb.GetVhtlcEventsResponse], len(h.vhtlcEventsListenerHandler.listeners)) | |
| copy(snapshot, h.vhtlcEventsListenerHandler.listeners) | |
| h.vhtlcEventsListenerHandler.lock.Unlock() | |
| for _, l := range snapshot { | |
| go func(l *listener[*pb.GetVhtlcEventsResponse]) { | |
| defer func() { recover() }() // guard against send on closed channel | |
| select { | |
| case l.ch <- &pb.GetVhtlcEventsResponse{ | |
| Event: toVhtlcEventProto(event), | |
| }: | |
| default: | |
| } | |
| }(l) | |
| } | |
| case <-h.stopCh: | |
| h.vhtlcEventsListenerHandler.stop() | |
| return | |
| } | |
| } | |
| } |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/interface/grpc/handlers/notification_handler.go` around lines 161 -
177, listenToEvents currently reads h.vhtlcEventsListenerHandler.listeners
without taking the vhtlcEventsListenerHandler.lock and launches goroutines that
may send on a channel closed by GetVhtlcEvents, causing a race and possible
panic; fix by acquiring the lock in listenToEvents just long enough to copy the
listeners slice (h.vhtlcEventsListenerHandler.lock and
h.vhtlcEventsListenerHandler.listeners), then release the lock and iterate the
copied slice, and for each listener launch a goroutine that wraps the send to
l.ch with a defer-recover (or otherwise guards against send-on-closed-channel)
so a concurrent channel close from GetVhtlcEvents won't panic; keep
pushListener/removeListener unchanged but reference them as the mutating
callers.
There was a problem hiding this comment.
arkanaai[bot] PR Review — fulmine #373
Reviewed: GetVhtlcStream RPC — VHTLC lifecycle event streaming + DB migration for VHTLC script column
🔴 Security concern: preimage exposed in streaming RPC
VhtlcEvent.Preimage is populated and streamed to all GetVhtlcEvents clients on EVENT_TYPE_VHTLC_CLAIMED:
// service.go
s.emitEvent(VhtlcEvent{
Type: EventTypeVhtlcClaimed,
Preimage: hex.EncodeToString(preimage),
...
})The preimage is a payment secret. If GetVhtlcEvents has no authentication or is accessible from the local gRPC port without TLS, anyone who can connect to Fulmine can receive preimages for all claims. This is the same risk as exposing an LN preimage over an unauthenticated local socket.
Questions:
- Is
NotificationService.GetVhtlcEventsauthentication-gated (same as other notification RPCs)? - Is the preimage needed in the event, or can callers use the
txidto resolve it from their own state?
If the preimage is required for downstream automation (e.g., Boltz cooperative claim), it should only be included when the caller has authenticated and is the owner of the VHTLC. If no auth is required, remove preimage from the public event or exclude it from the stream response.
🟡 emitEvent channel behavior under load
events chan VhtlcEvent is unbuffered (created with make(chan VhtlcEvent)). emitEvent likely blocks if no one is consuming GetVhtlcEvents. If Fulmine has no active RPC client, event emissions during ClaimVHTLC / RefundVHTLC could block the goroutine calling those handlers.
Verify that emitEvent uses a non-blocking send (select { case s.events <- ev: default: }) or uses a buffered channel.
✅ Subscription handler refactor
newReadOnlySubscriptionHandler with optional withScriptsMutator is a clean separation. VHTLC scripts use read-only subscriptions (no add/delete persistence) since the scripts are dynamically registered at swap creation time via vhtlcSubscription.subscribe(...). This is correct.
✅ DB migration (SQLite)
add_vhtlc_script_column.up.sql adds a script column to the VHTLC table. The vhtlc_migration.go backfills existing rows by re-deriving scripts from stored VHTLC data. The migration tests (migrations_test.go, service_vhtlc_migration_test.go) appear comprehensive — good coverage of the backfill path.
🟡 VtxoReceived/VtxoSpent event types defined but not emitted
EventTypeVtxoReceived and EventTypeVtxoSpent are in the Go enum but not in the proto. Either add them to proto now (for completeness) or remove them from the Go enum to avoid future confusion. Stub definitions without wire representation are confusing.
🔍 Breaking: TransactionInfo.settled field removed
settled = 7 is removed from types.proto. This is a breaking proto change for any client that reads TransactionInfo.settled. Is this intentional? If clients rely on this field, they'll silently get false (proto3 default) without a serialization error.
Summary: Core functionality looks correct. The blocking concern is the preimage in the public event stream. Needs authentication audit before merge.
|
👋 Hey @sekulicd — friendly nudge! This PR has had changes requested since Feb 22 (~23 days ago). Are you still planning to address the feedback, or should this be closed/deprioritized? Let us know if you need any help. |
🔍 Arkana PR Review —
|
|
test |
|
👋 @sekulicd — changes were requested on this PR back in February. Any updates, or is this one blocked on something? Happy to help prioritize if needed. |
|
Hey @sekulicd — changes were requested on this PR and it hasn't been updated in a couple of days. Worth a look when you get a chance? |
|
Hey @sekulicd — this PR has had changes requested for about 5 weeks with no new commits. Still planning to address the review feedback? Would love to see this land. |
|
👋 Hey @sekulicd — friendly nudge! This PR had changes requested back on Feb 22 and hasn't had an update since. Is this still in progress, or would it help to discuss the feedback? Just checking in. |
|
Hi @sekulicd — this PR has been waiting since the CHANGES_REQUESTED on Feb 22 (37 days). The main blocker flagged in review is the preimage exposed in the streaming RPC — specifically whether |
|
Hey @sekulicd — just checking in on this one. There's been a CHANGES_REQUESTED review here since Feb 22 with no update since. Is this still in progress, blocked on something, or should it be marked draft? 🙏 |
|
👋 Hey @sekulicd — friendly nudge! The CHANGES_REQUESTED review from Feb 22 flagged a preimage security concern in the VHTLC event stream (preimage exposed to unauthenticated GetVhtlcEvents clients). This has been open for ~5 weeks now. Could you take a look when you get a chance? Happy to help clarify anything. |
Title
Standardize VHTLC events + script migration flow; update go-sdk integration
Summary
This PR consolidates VHTLC event streaming and script migration behavior, then updates integrations to the latest go-sdk changes.
Why
GetVhtlcEvents, VHTLC-only payloads), and docs/proto/generated code needed to stay aligned.What changed
api-spec/protobuf/fulmine/v1/notification.protoapi-spec/protobuf/fulmine/v1/types.protointernal/interface/grpc/handlers/notification_handler.gointernal/interface/grpc/handlers/utils.gointernal/interface/grpc/handlers/notification_handler_test.gointernal/core/application/service.gointernal/core/application/subscription.gointernal/core/domain/vhtlc.goscriptcolumn:internal/infrastructure/db/sqlite/migration/20260216162009_add_vhtlc_script_column.up.sqlinternal/infrastructure/db/sqlite/migration/20260216162009_add_vhtlc_script_column.down.sqlinternal/infrastructure/db/migrations.gointernal/infrastructure/db/migrations_test.gointernal/infrastructure/db/service_vhtlc_migration_test.gointernal/infrastructure/db/service.gointernal/infrastructure/db/sqlite/vhtlc_migration.gointernal/infrastructure/db/sqlite/vhtlc_migration_test.gointernal/infrastructure/db/sqlite/vhtlc_migration_whitebox_test.gointernal/test/e2e/vhtlc_test.gointernal/test/e2e/utils_test.gogo.mod,go.sumpkg/swap/go.mod,pkg/swap/go.suminternal/interface/grpc/handlers/service_handler.gointernal/interface/web/handlers.goDockerfilearkd.Dockerfilearkdwallet.DockerfileTesting
Note that this is continuations on @Kukks Draft PR
@altafan @Kukks please review
Summary by CodeRabbit
New Features
API Changes
Chores