Conversation
|
Important Review skippedDraft detected. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Path: .coderabbit.yaml Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
📝 WalkthroughWalkthroughIntroduces an async invoice synchronization system for Stripe that replaces synchronous invoice operations with persistent sync plans containing ordered operations executed via events. Includes adapter-based persistence, event-driven handler, locking mechanisms, result reconstruction, and billing service integration. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant App as Invoice<br/>Operation
participant Service as Sync<br/>Service
participant Adapter as Adapter<br/>(DB)
participant Pub as Event<br/>Publisher
participant Handler as Sync<br/>Handler
participant Executor as Executor
participant Stripe as Stripe<br/>API
App->>Service: CreateDraftSyncPlan()
Service->>Service: Generate operations
Service->>Adapter: CreateSyncPlan()
Adapter->>Adapter: Create plan + ops (DB)
Service->>Pub: OnCommit(PublishEvent)
Pub-->>Handler: ExecuteSyncPlanEvent
Handler->>Adapter: GetSyncPlan()
Handler->>Adapter: AcquireLock()
Handler->>Executor: ExecuteNextOperation()
Executor->>Adapter: GetNextPendingOp()
Executor->>Stripe: StripeAPI (Create/Update/etc)
Stripe-->>Executor: Response
Executor->>Adapter: CompleteOperation(response)
Handler->>Adapter: Check if done
alt More ops remain
Handler->>Pub: PublishEvent (next operation)
else All done
Handler->>Adapter: CompletePlan()
Handler->>App: SyncDraftInvoice()
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Suggested labels
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 11
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
openmeter/testutils/pg_driver.go (1)
60-65:⚠️ Potential issue | 🟡 MinorUpdate the local Postgres test example.
The inline command here still points at
POSTGRES_HOST=localhostand skips-tags=dynamic, so it nudges people toward a setup that doesn’t match this repo’s test requirements.As per coding guidelines,
**/*.go: Run tests with-tags=dynamicflag (required for confluent-kafka-go) and usePOSTGRES_HOST=127.0.0.1environment variable when Postgres is needed.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@openmeter/testutils/pg_driver.go` around lines 60 - 65, Update the inline test example in the InitPostgresDB comment to use the repo-required test flags and host: change the example command that currently shows "POSTGRES_HOST=localhost go test ./internal/credit/..." to use POSTGRES_HOST=127.0.0.1 and include the -tags=dynamic flag (e.g., POSTGRES_HOST=127.0.0.1 go test -tags=dynamic ./...) so the comment in InitPostgresDB reflects the correct way to run tests that require Postgres and confluent-kafka-go.
🧹 Nitpick comments (6)
openmeter/app/stripe/invoicesync/app_noop_test.go (1)
12-76: Make unexpected stub calls fail fast.Right now every method returns zero values or
nil, so an accidental dependency call can slip through and make the sync tests fail much later in a less obvious spot. Returning an explicit error for methods this suite shouldn’t touch will make the tests a lot sharper.As per coding guidelines,
**/*_test.go: Make sure the tests are comprehensive and cover the changes. Keep a strong focus on unit tests and in-code integration tests.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@openmeter/app/stripe/invoicesync/app_noop_test.go` around lines 12 - 76, The noopAppService currently returns zero values and nil errors allowing accidental calls to go unnoticed; update noopAppService so any method that tests shouldn't invoke returns an explicit error (e.g., fmt.Errorf or errors.New) containing the method name to fail fast — modify each method on noopAppService (e.g., RegisterMarketplaceListing, GetMarketplaceListing, ListMarketplaceListings, InstallMarketplaceListingWithAPIKey, InstallMarketplaceListing, GetMarketplaceListingOauth2InstallURL, AuthorizeMarketplaceListingOauth2Install, CreateApp, UpdateAppStatus, GetApp, UpdateApp, ListApps, UninstallApp, ListCustomerData, EnsureCustomer, DeleteCustomer) to return a clear "unexpected call to <MethodName>" error instead of nil; keep only those specific stubs that tests legitimately use returning zero values if needed.openmeter/billing/service/stdinvoicestate.go (2)
717-721: Same hardcoded string concern applies here.Similar to the draft sync metadata clearing, consider using the invoicesync constants for the issuing sync keys as well.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@openmeter/billing/service/stdinvoicestate.go` around lines 717 - 721, Replace the hardcoded metadata keys used when clearing issuing sync metadata with the corresponding constants from the invoicesync package instead of string literals; specifically, update the block that checks m.Invoice.Metadata and deletes "openmeter.io/stripe/issuing-sync-completed-at" and "openmeter.io/stripe/issuing-sync-plan-id" to use the invoicesync constants (e.g., invoicesync.IssuingSyncCompletedAtKey and invoicesync.IssuingSyncPlanIDKey or whatever the exact names are) so the deletes reference the centralized keys rather than duplicated strings.
682-689: Import the metadata key constants frominvoicesyncinstead of duplicating hardcoded strings.The invoicesync package already exports these constants in
openmeter/app/stripe/invoicesync/types.go, and other parts of the codebase use them (likeopenmeter/app/stripe/invoicesync/handler.goandopenmeter/app/stripe/entity/app/invoice.go). Using the constants here too keeps things in sync—if those keys ever change, you won't need to hunt through this file to update hardcoded strings.♻️ Suggested refactor
+import "github.com/openmeterio/openmeter/openmeter/app/stripe/invoicesync" + // Clear any previous sync completion metadata so canDraftSyncAdvance returns false // until the new sync plan completes. if m.Invoice.Metadata != nil { - delete(m.Invoice.Metadata, "openmeter.io/stripe/draft-sync-completed-at") - delete(m.Invoice.Metadata, "openmeter.io/stripe/draft-sync-plan-id") + delete(m.Invoice.Metadata, invoicesync.MetadataKeyDraftSyncCompletedAt) + delete(m.Invoice.Metadata, invoicesync.MetadataKeyDraftSyncPlanID) }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@openmeter/billing/service/stdinvoicestate.go` around lines 682 - 689, Replace the hardcoded metadata keys with the exported constants from the invoicesync package: use invoicesync.MetadataKeyDraftSyncCompletedAt and invoicesync.MetadataKeyDraftSyncPlanID instead of the literal strings; add an import for the invoicesync package and update the delete calls on m.Invoice.Metadata to reference those constants (keep the existing nil-check). This ensures the keys remain consistent with invoicesync's definitions.openmeter/app/stripe/invoicesync/types_test.go (1)
48-50: Validate the encoding, not just the digest length.Line 50 only proves the key is 64 characters long, so a non-hex string of that length still passes. Adding an actual hex assertion would make this test enforce the SHA-256 contract it names.
As per coding guidelines, "
**/*_test.go: Make sure the tests are comprehensive and cover the changes."🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@openmeter/app/stripe/invoicesync/types_test.go` around lines 48 - 50, The test only checks length but not that GenerateIdempotencyKey returns a hex-encoded SHA-256 string; update the test for the case in t.Run("key is hex encoded sha256") to assert the key is valid hex (e.g. use encoding/hex.DecodeString or a regexp like ^[0-9a-f]{64}$) in addition to require.Len, referencing GenerateIdempotencyKey and OpTypeInvoiceCreate so the test enforces the SHA-256 hex contract.openmeter/app/stripe/invoicesync/handler_test.go (1)
16-23: Expand this to one case per requiredHandlerConfigfield.An empty config only proves the constructor rejects a missing
Adapter. The rest of the required fields can regress one by one without this test noticing, so the test name currently over-promises a bit.As per coding guidelines, "
**/*_test.go: Make sure the tests are comprehensive and cover the changes."🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@openmeter/app/stripe/invoicesync/handler_test.go` around lines 16 - 23, The current TestNewHandler_AllFieldsRequired only verifies that NewHandler(HandlerConfig{}) fails for a missing Adapter; expand it into one test per required HandlerConfig field by creating a table-driven or subtest loop in TestNewHandler_AllFieldsRequired that constructs configs with exactly one required field set and the rest nil, calls NewHandler, and asserts an error contains the expected message for that missing field (referencing NewHandler and HandlerConfig); ensure you include checks for Adapter, Logger, DB/Store (whichever other required symbols exist in HandlerConfig) and assert the error string mentions each specific required field (e.g., "adapter is required", "logger is required", etc.).openmeter/app/stripe/invoicesync/planner.go (1)
250-281: The "deterministic" ordering still has a couple random edges.
amountDiscountsByIdis iterated as a map,updateLinesnever gets sorted, and the add-path sort key is onlyDescription. Two discounts or lines with the same description can still flip order between runs, which makes the marshaled payloads flaky. Sorting by a stable composite key likeom_line_type+om_line_idwould make the plan reproducible.Also applies to: 337-361, 404-407
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@openmeter/app/stripe/invoicesync/planner.go` around lines 250 - 281, amountDiscountsById is iterated in non-deterministic order and updateLines/addLines are not deterministically sorted (add-path only sorts by Description), causing flaky payload order; before iterating amountDiscountsById sort that slice by a stable composite key (e.g., discount.OmLineType + "|" + discount.OmLineID) and after building updateLines and addLines sort those slices by the same composite key so both update and add operations have a reproducible order; apply the same deterministic sorting approach in the other affected blocks referenced (around the logic that builds add/update lines in the same file, e.g., the areas noted at 337-361 and 404-407) and ensure you use the unique symbols amountDiscountsById, updateLines, addLines, toDiscountAddParams and toDiscountUpdateParams to locate and change the code.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@openmeter/app/stripe/entity/app/invoice.go`:
- Around line 67-76: The code currently blocks sync-plan creation on a live
Stripe read by calling getStripeClient and stripeClient.ListInvoiceLineItems
when invoice.ExternalIDs.Invoicing is present; remove that synchronous remote
call from the transactional/create path in the function that builds the sync
plan (the block using getStripeClient and ListInvoiceLineItems), and instead
record the Stripe ID or a "needs-remote-line-sync" flag in the created plan so
fetching invoice lines is deferred to plan execution (or an async worker) where
you can call getStripeClient and ListInvoiceLineItems; ensure the created sync
plan no longer waits on Stripe and that execution time uses the stored
invoice.ExternalIDs.Invoicing to retrieve line items.
In `@openmeter/app/stripe/invoicesync/adapter/adapter_test.go`:
- Around line 495-500: The test builds a 64KiB blob but only serializes the
first 32 bytes into StripeInvoiceID, so the DB never sees a large payload;
update the test to serialize the full blob into the payload passed to
CreateSyncPlan (or construct a json.RawMessage from the full large byte slice)
so the created RawMessage/InvoiceUpdatePayload contains the entire 64KiB data;
specifically change the construction around invoicesync.InvoiceUpdatePayload /
rawPayload to use string(large) or json.RawMessage(large) (and ensure the
rawPayload variable used by CreateSyncPlan is the full-blob RawMessage) so the
test actually writes a large payload.
In `@openmeter/app/stripe/invoicesync/currency.go`:
- Around line 28-45: FormatAmount currently formats the raw decimal which can
drift from what Stripe actually charges; instead, call the existing rounding
helper (RoundToAmount) to convert the incoming alpacadecimal.Decimal into the
rounded minor-unit amount used for billing, then feed that rounded value into
the integer/minor-unit formatting path (the same path used when
amount.IsInteger()) via the currency calculator from
currencyx.Code(currency).Calculator() so display strings always match billed
subunits; update the non-integer branch in FormatAmount to use RoundToAmount and
then FormatAmount on the resulting minor-unit value.
In `@openmeter/app/stripe/invoicesync/handler.go`:
- Around line 206-213: The publish calls for follow-up events are executed
before the surrounding DB transaction commits, causing race conditions with
advisory locks; wrap both branches that call h.publisher.Publish (the one when
!result.Done and the delete branch around lines noted) in transaction.OnCommit
(or equivalent tx.OnCommit) so the Publish is invoked only after successful
commit, i.e., enqueue a closure that calls h.publisher.Publish(ctx,
ExecuteSyncPlanEvent{...}) inside transaction.OnCommit rather than calling
Publish directly inside the transaction.
In `@openmeter/app/stripe/invoicesync/planner_test.go`:
- Around line 582-587: The test only asserts tax fields for regularLine but
misses the discount line; update the test to also assert on discountLine (the
variable representing the discount invoice line) by checking
discountLine.TaxBehavior is non-nil and equals "exclusive" and
discountLine.TaxCode is non-nil and equals "txcd_10000000" so the discount tax
propagation is covered alongside the regular line assertions.
In `@openmeter/app/stripe/invoicesync/README.md`:
- Line 7: Update each bare fenced code block (``` ) in the README.md to include
a language identifier (use "text") so they read ```text; specifically change all
occurrences of bare fences that render the ASCII diagrams/snippets to fenced
blocks with the "text" language to satisfy markdownlint MD040 and keep make lint
green.
In `@openmeter/app/stripe/invoicesync/service/service.go`:
- Around line 56-97: The cancelAllActivePlans call in CreateDraftSyncPlan,
CreateIssuingSyncPlan and CreateDeleteSyncPlan can race with Handler.Handle
because it runs without the same invoice-scoped lock; change each Create* method
to acquire the same invoice-scoped lock used by Handler.Handle (wrap
cancelAllActivePlans, the Generate* call and the subsequent createAndPublish
within that lock), hold the lock while checking ExternalIDs for deletes so the
existence check and plan creation are atomic, and ensure proper defer/unlock to
avoid deadlocks.
In `@openmeter/billing/adapter/invoiceapp.go`:
- Around line 35-59: The loops update child rows by ID only, which can mutate
rows belonging to other invoices; restrict updates to children owned by the
invoice in in.Invoice. For BillingStandardInvoiceDetailedLine and
BillingInvoiceLineDiscount (the UpdateOneID calls on
tx.db.BillingStandardInvoiceDetailedLine and tx.db.BillingInvoiceLineDiscount),
change the update builder to include a predicate that the child’s parent invoice
equals in.Invoice (e.g. use a Where/Has... predicate or an Update().Where(... ID
== lineID AND invoice_id == in.Invoice) variant) so that if the ID does not
belong to the invoice the operation becomes a no-op/skip rather than updating
another invoice’s row; keep the existing NotFound handling for skipped rows.
In `@openmeter/ent/schema/app_stripe_invoice_sync.go`:
- Around line 46-52: Add validation to the enum-backed string fields so bad
values fail fast: for the fields defined as
field.String("phase").GoType(invoicesync.SyncPlanPhase("")) and
field.String("status").GoType(invoicesync.PlanStatus("")) (and the "type" field
in the operation schema), add .Validate(...) checks that only allow the known
enum values (derived from invoicesync.SyncPlanPhase and invoicesync.PlanStatus
constants) or add a DB CHECK constraint to those columns; update the Plan schema
(phase/status) and the Operation schema ("type") accordingly so persistence will
reject invalid strings instead of allowing silent fallbacks in executor code.
In `@pkg/framework/transaction/transaction.go`:
- Around line 49-65: The post-commit callbacks are being invoked with the same
ctx that still contains the transaction Driver and the post-commit queue marker,
which can cause callback code to reuse the committed tx or append to the
draining queue; fix by creating a tx-free, callback-free context before calling
runPostCommitCallbacks (e.g. add a helper clearTransactionContext(ctx) that
removes the transaction key and the post-commit-queue marker from context and
return that cleaned context), then call runPostCommitCallbacks(cleanCtx) instead
of runPostCommitCallbacks(ctx); reference initPostCommitCallbacks,
runPostCommitCallbacks, manage, and Driver to locate where to clear the context
and ensure nested OnCommit inside callbacks uses a fresh queue.
---
Outside diff comments:
In `@openmeter/testutils/pg_driver.go`:
- Around line 60-65: Update the inline test example in the InitPostgresDB
comment to use the repo-required test flags and host: change the example command
that currently shows "POSTGRES_HOST=localhost go test ./internal/credit/..." to
use POSTGRES_HOST=127.0.0.1 and include the -tags=dynamic flag (e.g.,
POSTGRES_HOST=127.0.0.1 go test -tags=dynamic ./...) so the comment in
InitPostgresDB reflects the correct way to run tests that require Postgres and
confluent-kafka-go.
---
Nitpick comments:
In `@openmeter/app/stripe/invoicesync/app_noop_test.go`:
- Around line 12-76: The noopAppService currently returns zero values and nil
errors allowing accidental calls to go unnoticed; update noopAppService so any
method that tests shouldn't invoke returns an explicit error (e.g., fmt.Errorf
or errors.New) containing the method name to fail fast — modify each method on
noopAppService (e.g., RegisterMarketplaceListing, GetMarketplaceListing,
ListMarketplaceListings, InstallMarketplaceListingWithAPIKey,
InstallMarketplaceListing, GetMarketplaceListingOauth2InstallURL,
AuthorizeMarketplaceListingOauth2Install, CreateApp, UpdateAppStatus, GetApp,
UpdateApp, ListApps, UninstallApp, ListCustomerData, EnsureCustomer,
DeleteCustomer) to return a clear "unexpected call to <MethodName>" error
instead of nil; keep only those specific stubs that tests legitimately use
returning zero values if needed.
In `@openmeter/app/stripe/invoicesync/handler_test.go`:
- Around line 16-23: The current TestNewHandler_AllFieldsRequired only verifies
that NewHandler(HandlerConfig{}) fails for a missing Adapter; expand it into one
test per required HandlerConfig field by creating a table-driven or subtest loop
in TestNewHandler_AllFieldsRequired that constructs configs with exactly one
required field set and the rest nil, calls NewHandler, and asserts an error
contains the expected message for that missing field (referencing NewHandler and
HandlerConfig); ensure you include checks for Adapter, Logger, DB/Store
(whichever other required symbols exist in HandlerConfig) and assert the error
string mentions each specific required field (e.g., "adapter is required",
"logger is required", etc.).
In `@openmeter/app/stripe/invoicesync/planner.go`:
- Around line 250-281: amountDiscountsById is iterated in non-deterministic
order and updateLines/addLines are not deterministically sorted (add-path only
sorts by Description), causing flaky payload order; before iterating
amountDiscountsById sort that slice by a stable composite key (e.g.,
discount.OmLineType + "|" + discount.OmLineID) and after building updateLines
and addLines sort those slices by the same composite key so both update and add
operations have a reproducible order; apply the same deterministic sorting
approach in the other affected blocks referenced (around the logic that builds
add/update lines in the same file, e.g., the areas noted at 337-361 and 404-407)
and ensure you use the unique symbols amountDiscountsById, updateLines,
addLines, toDiscountAddParams and toDiscountUpdateParams to locate and change
the code.
In `@openmeter/app/stripe/invoicesync/types_test.go`:
- Around line 48-50: The test only checks length but not that
GenerateIdempotencyKey returns a hex-encoded SHA-256 string; update the test for
the case in t.Run("key is hex encoded sha256") to assert the key is valid hex
(e.g. use encoding/hex.DecodeString or a regexp like ^[0-9a-f]{64}$) in addition
to require.Len, referencing GenerateIdempotencyKey and OpTypeInvoiceCreate so
the test enforces the SHA-256 hex contract.
In `@openmeter/billing/service/stdinvoicestate.go`:
- Around line 717-721: Replace the hardcoded metadata keys used when clearing
issuing sync metadata with the corresponding constants from the invoicesync
package instead of string literals; specifically, update the block that checks
m.Invoice.Metadata and deletes "openmeter.io/stripe/issuing-sync-completed-at"
and "openmeter.io/stripe/issuing-sync-plan-id" to use the invoicesync constants
(e.g., invoicesync.IssuingSyncCompletedAtKey and
invoicesync.IssuingSyncPlanIDKey or whatever the exact names are) so the deletes
reference the centralized keys rather than duplicated strings.
- Around line 682-689: Replace the hardcoded metadata keys with the exported
constants from the invoicesync package: use
invoicesync.MetadataKeyDraftSyncCompletedAt and
invoicesync.MetadataKeyDraftSyncPlanID instead of the literal strings; add an
import for the invoicesync package and update the delete calls on
m.Invoice.Metadata to reference those constants (keep the existing nil-check).
This ensures the keys remain consistent with invoicesync's definitions.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: d7039ac4-27e9-4544-8e49-21fa86cc85d1
⛔ Files ignored due to path filters (35)
go.sumis excluded by!**/*.sum,!**/*.sumopenmeter/ent/db/appstripeinvoicesyncop.gois excluded by!**/ent/db/**openmeter/ent/db/appstripeinvoicesyncop/appstripeinvoicesyncop.gois excluded by!**/ent/db/**openmeter/ent/db/appstripeinvoicesyncop/where.gois excluded by!**/ent/db/**openmeter/ent/db/appstripeinvoicesyncop_create.gois excluded by!**/ent/db/**openmeter/ent/db/appstripeinvoicesyncop_delete.gois excluded by!**/ent/db/**openmeter/ent/db/appstripeinvoicesyncop_query.gois excluded by!**/ent/db/**openmeter/ent/db/appstripeinvoicesyncop_update.gois excluded by!**/ent/db/**openmeter/ent/db/appstripeinvoicesyncplan.gois excluded by!**/ent/db/**openmeter/ent/db/appstripeinvoicesyncplan/appstripeinvoicesyncplan.gois excluded by!**/ent/db/**openmeter/ent/db/appstripeinvoicesyncplan/where.gois excluded by!**/ent/db/**openmeter/ent/db/appstripeinvoicesyncplan_create.gois excluded by!**/ent/db/**openmeter/ent/db/appstripeinvoicesyncplan_delete.gois excluded by!**/ent/db/**openmeter/ent/db/appstripeinvoicesyncplan_query.gois excluded by!**/ent/db/**openmeter/ent/db/appstripeinvoicesyncplan_update.gois excluded by!**/ent/db/**openmeter/ent/db/billinginvoice.gois excluded by!**/ent/db/**openmeter/ent/db/billinginvoice/billinginvoice.gois excluded by!**/ent/db/**openmeter/ent/db/billinginvoice/where.gois excluded by!**/ent/db/**openmeter/ent/db/billinginvoice_create.gois excluded by!**/ent/db/**openmeter/ent/db/billinginvoice_query.gois excluded by!**/ent/db/**openmeter/ent/db/billinginvoice_update.gois excluded by!**/ent/db/**openmeter/ent/db/client.gois excluded by!**/ent/db/**openmeter/ent/db/cursor.gois excluded by!**/ent/db/**openmeter/ent/db/ent.gois excluded by!**/ent/db/**openmeter/ent/db/entmixinaccessor.gois excluded by!**/ent/db/**openmeter/ent/db/expose.gois excluded by!**/ent/db/**openmeter/ent/db/hook/hook.gois excluded by!**/ent/db/**openmeter/ent/db/migrate/schema.gois excluded by!**/ent/db/**openmeter/ent/db/mutation.gois excluded by!**/ent/db/**openmeter/ent/db/paginate.gois excluded by!**/ent/db/**openmeter/ent/db/predicate/predicate.gois excluded by!**/ent/db/**openmeter/ent/db/runtime.gois excluded by!**/ent/db/**openmeter/ent/db/setorclear.gois excluded by!**/ent/db/**openmeter/ent/db/tx.gois excluded by!**/ent/db/**tools/migrate/migrations/atlas.sumis excluded by!**/*.sum,!**/*.sum
📒 Files selected for processing (51)
.gitignoreapp/common/app.goapp/common/openmeter_billingworker.gocmd/billing-worker/wire_gen.gocmd/jobs/internal/wire_gen.gocmd/server/wire_gen.goopenmeter/app/stripe/entity/app/app.goopenmeter/app/stripe/entity/app/calculator.goopenmeter/app/stripe/entity/app/invoice.goopenmeter/app/stripe/invoicesync/README.mdopenmeter/app/stripe/invoicesync/adapter.goopenmeter/app/stripe/invoicesync/adapter/adapter.goopenmeter/app/stripe/invoicesync/adapter/adapter_test.goopenmeter/app/stripe/invoicesync/app_noop_test.goopenmeter/app/stripe/invoicesync/currency.goopenmeter/app/stripe/invoicesync/currency_test.goopenmeter/app/stripe/invoicesync/events.goopenmeter/app/stripe/invoicesync/events_test.goopenmeter/app/stripe/invoicesync/executor.goopenmeter/app/stripe/invoicesync/executor_test.goopenmeter/app/stripe/invoicesync/handler.goopenmeter/app/stripe/invoicesync/handler_success_test.goopenmeter/app/stripe/invoicesync/handler_test.goopenmeter/app/stripe/invoicesync/planner.goopenmeter/app/stripe/invoicesync/planner_test.goopenmeter/app/stripe/invoicesync/secret_noop_test.goopenmeter/app/stripe/invoicesync/service.goopenmeter/app/stripe/invoicesync/service/service.goopenmeter/app/stripe/invoicesync/types.goopenmeter/app/stripe/invoicesync/types_test.goopenmeter/app/stripe/service/factory.goopenmeter/app/stripe/service/service.goopenmeter/billing/adapter.goopenmeter/billing/adapter/invoiceapp.goopenmeter/billing/errors.goopenmeter/billing/noop.goopenmeter/billing/service.goopenmeter/billing/service/invoiceapp.goopenmeter/billing/service/stdinvoicestate.goopenmeter/billing/stdinvoice.goopenmeter/ent/schema/app_stripe_invoice_sync.goopenmeter/ent/schema/billing.goopenmeter/server/server_test.goopenmeter/testutils/pg_driver.gopkg/framework/transaction/postcommit.gopkg/framework/transaction/transaction.gotest/app/stripe/invoice_test.gotest/app/stripe/invoicesync_test.gotest/app/stripe/testenv.gotools/migrate/migrations/20260329152423_app_stripe_invoice_sync.down.sqltools/migrate/migrations/20260329152423_app_stripe_invoice_sync.up.sql
💤 Files with no reviewable changes (1)
- openmeter/app/stripe/entity/app/calculator.go
| // Build a payload ~64 KiB. | ||
| large := bytes.Repeat([]byte("x"), 64*1024) | ||
| rawPayload, err := json.Marshal(invoicesync.InvoiceUpdatePayload{ | ||
| StripeInvoiceID: string(large[:32]), | ||
| }) | ||
| require.NoError(t, err) |
There was a problem hiding this comment.
This test never stores a large payload.
Only large[:32] is serialized into StripeInvoiceID, so the JSON written to Postgres is tiny. The test stays green even if 64 KiB payloads are truncated. I'd build the RawMessage from the full blob before calling CreateSyncPlan.
🧪 Suggested fix
// Build a payload ~64 KiB.
large := bytes.Repeat([]byte("x"), 64*1024)
- rawPayload, err := json.Marshal(invoicesync.InvoiceUpdatePayload{
- StripeInvoiceID: string(large[:32]),
- })
+ rawPayload, err := json.Marshal(map[string]string{
+ "blob": string(large),
+ })
require.NoError(t, err)As per coding guidelines, "**/*_test.go: Make sure the tests are comprehensive and cover the changes. Keep a strong focus on unit tests and in-code integration tests."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@openmeter/app/stripe/invoicesync/adapter/adapter_test.go` around lines 495 -
500, The test builds a 64KiB blob but only serializes the first 32 bytes into
StripeInvoiceID, so the DB never sees a large payload; update the test to
serialize the full blob into the payload passed to CreateSyncPlan (or construct
a json.RawMessage from the full large byte slice) so the created
RawMessage/InvoiceUpdatePayload contains the entire 64KiB data; specifically
change the construction around invoicesync.InvoiceUpdatePayload / rawPayload to
use string(large) or json.RawMessage(large) (and ensure the rawPayload variable
used by CreateSyncPlan is the full-blob RawMessage) so the test actually writes
a large payload.
| func FormatAmount(amount alpacadecimal.Decimal, currency string) (string, error) { | ||
| calc, err := currencyx.Code(currency).Calculator() | ||
| if err != nil { | ||
| return "", fmt.Errorf("invalid currency %q: %w", currency, err) | ||
| } | ||
|
|
||
| if amount.IsInteger() { | ||
| return calc.Def.FormatAmount(num.MakeAmount(amount.IntPart(), 0)), nil | ||
| } | ||
|
|
||
| am, exact := amount.Float64() | ||
| _ = exact // precision loss possible for display-only formatting; not used for monetary calculations | ||
| decimalPlaces := uint32(0) | ||
| if exp := amount.Exponent(); exp < 0 { | ||
| decimalPlaces = uint32(-exp) | ||
| } | ||
| return calc.Def.FormatAmount(num.AmountFromFloat64(am, decimalPlaces)), nil | ||
| } |
There was a problem hiding this comment.
Format from the rounded minor-unit amount.
For non-integer inputs this renders the raw decimal/exponent, while RoundToAmount rounds to Stripe subunits first. That lets the display string drift from what we actually bill — e.g. 1.235 USD can format as 1.235, but Stripe gets 124 cents, and zero-decimal currencies can show fractional amounts that Stripe can never charge. Reusing the rounded minor-unit path keeps descriptions aligned with the billed value.
💸 Suggested fix
func FormatAmount(amount alpacadecimal.Decimal, currency string) (string, error) {
calc, err := currencyx.Code(currency).Calculator()
if err != nil {
return "", fmt.Errorf("invalid currency %q: %w", currency, err)
}
-
- if amount.IsInteger() {
- return calc.Def.FormatAmount(num.MakeAmount(amount.IntPart(), 0)), nil
- }
-
- am, exact := amount.Float64()
- _ = exact // precision loss possible for display-only formatting; not used for monetary calculations
- decimalPlaces := uint32(0)
- if exp := amount.Exponent(); exp < 0 {
- decimalPlaces = uint32(-exp)
- }
- return calc.Def.FormatAmount(num.AmountFromFloat64(am, decimalPlaces)), nil
+ subunits := uint32(calc.Def.Subunits)
+ multiplier := alpacadecimal.NewFromInt(10).Pow(alpacadecimal.NewFromInt(int64(subunits)))
+ minorUnitAmount := amount.Mul(multiplier).Round(0).IntPart()
+
+ return calc.Def.FormatAmount(num.MakeAmount(minorUnitAmount, subunits)), nil
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| func FormatAmount(amount alpacadecimal.Decimal, currency string) (string, error) { | |
| calc, err := currencyx.Code(currency).Calculator() | |
| if err != nil { | |
| return "", fmt.Errorf("invalid currency %q: %w", currency, err) | |
| } | |
| if amount.IsInteger() { | |
| return calc.Def.FormatAmount(num.MakeAmount(amount.IntPart(), 0)), nil | |
| } | |
| am, exact := amount.Float64() | |
| _ = exact // precision loss possible for display-only formatting; not used for monetary calculations | |
| decimalPlaces := uint32(0) | |
| if exp := amount.Exponent(); exp < 0 { | |
| decimalPlaces = uint32(-exp) | |
| } | |
| return calc.Def.FormatAmount(num.AmountFromFloat64(am, decimalPlaces)), nil | |
| } | |
| func FormatAmount(amount alpacadecimal.Decimal, currency string) (string, error) { | |
| calc, err := currencyx.Code(currency).Calculator() | |
| if err != nil { | |
| return "", fmt.Errorf("invalid currency %q: %w", currency, err) | |
| } | |
| subunits := uint32(calc.Def.Subunits) | |
| multiplier := alpacadecimal.NewFromInt(10).Pow(alpacadecimal.NewFromInt(int64(subunits))) | |
| minorUnitAmount := amount.Mul(multiplier).Round(0).IntPart() | |
| return calc.Def.FormatAmount(num.MakeAmount(minorUnitAmount, subunits)), nil | |
| } |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@openmeter/app/stripe/invoicesync/currency.go` around lines 28 - 45,
FormatAmount currently formats the raw decimal which can drift from what Stripe
actually charges; instead, call the existing rounding helper (RoundToAmount) to
convert the incoming alpacadecimal.Decimal into the rounded minor-unit amount
used for billing, then feed that rounded value into the integer/minor-unit
formatting path (the same path used when amount.IsInteger()) via the currency
calculator from currencyx.Code(currency).Calculator() so display strings always
match billed subunits; update the non-integer branch in FormatAmount to use
RoundToAmount and then FormatAmount on the resulting minor-unit value.
| func (s *Service) CreateDraftSyncPlan(ctx context.Context, input invoicesync.CreateSyncPlanInput) error { | ||
| if err := s.cancelAllActivePlans(ctx, input.Invoice.Namespace, input.Invoice.ID); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| sessionID, ops, err := invoicesync.GenerateDraftSyncPlan(input.GeneratorInput) | ||
| if err != nil { | ||
| return fmt.Errorf("generating draft sync plan: %w", err) | ||
| } | ||
|
|
||
| return s.createAndPublish(ctx, input, invoicesync.SyncPlanPhaseDraft, sessionID, ops) | ||
| } | ||
|
|
||
| func (s *Service) CreateIssuingSyncPlan(ctx context.Context, input invoicesync.CreateSyncPlanInput) error { | ||
| if err := s.cancelAllActivePlans(ctx, input.Invoice.Namespace, input.Invoice.ID); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| sessionID, ops, err := invoicesync.GenerateIssuingSyncPlan(input.GeneratorInput) | ||
| if err != nil { | ||
| return fmt.Errorf("generating issuing sync plan: %w", err) | ||
| } | ||
|
|
||
| return s.createAndPublish(ctx, input, invoicesync.SyncPlanPhaseIssuing, sessionID, ops) | ||
| } | ||
|
|
||
| func (s *Service) CreateDeleteSyncPlan(ctx context.Context, input invoicesync.CreateSyncPlanInput) error { | ||
| if err := s.cancelAllActivePlans(ctx, input.Invoice.Namespace, input.Invoice.ID); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| sessionID, ops, err := invoicesync.GenerateDeleteSyncPlan(input.GeneratorInput) | ||
| if err != nil { | ||
| return fmt.Errorf("generating delete sync plan: %w", err) | ||
| } | ||
|
|
||
| // No-op if the invoice has no Stripe external ID. | ||
| if len(ops) == 0 { | ||
| return nil | ||
| } | ||
|
|
||
| return s.createAndPublish(ctx, input, invoicesync.SyncPlanPhaseDelete, sessionID, ops) |
There was a problem hiding this comment.
Plan creation can race an in-flight Stripe call.
cancelAllActivePlans runs without the same invoice-scoped lock that Handler.Handle uses. A concurrent delete can fail the active draft plan while InvoiceCreate is already in flight, then CreateDeleteSyncPlan sees no ExternalIDs.Invoicing and returns no ops, leaving the just-created Stripe invoice with no cleanup path.
Also applies to: 117-133
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@openmeter/app/stripe/invoicesync/service/service.go` around lines 56 - 97,
The cancelAllActivePlans call in CreateDraftSyncPlan, CreateIssuingSyncPlan and
CreateDeleteSyncPlan can race with Handler.Handle because it runs without the
same invoice-scoped lock; change each Create* method to acquire the same
invoice-scoped lock used by Handler.Handle (wrap cancelAllActivePlans, the
Generate* call and the subsequent createAndPublish within that lock), hold the
lock while checking ExternalIDs for deletes so the existence check and plan
creation are atomic, and ensure proper defer/unlock to avoid deadlocks.
| // Defer publish until after the outermost transaction commits. | ||
| // If we're not inside a transaction, OnCommit executes immediately. | ||
| transaction.OnCommit(ctx, func(ctx context.Context) { | ||
| if err := s.publisher.Publish(ctx, event); err != nil { | ||
| s.logger.ErrorContext(ctx, "failed to publish sync plan event; plan will be picked up by next sync", | ||
| "plan_id", plan.ID, | ||
| "invoice_id", input.Invoice.ID, | ||
| "phase", phase, | ||
| "error", err, | ||
| ) | ||
| } | ||
| }) |
There was a problem hiding this comment.
A publish miss here strands the persisted plan.
Once the plan row is committed, this callback only logs Publish failures. If the broker is temporarily unavailable and nothing else touches the invoice, the plan stays pending forever. This needs a retryable outbox or recovery path, not just a log line.
| // Update detailed line external IDs | ||
| for lineID, externalID := range in.LineExternalIDs { | ||
| _, err := tx.db.BillingStandardInvoiceDetailedLine.UpdateOneID(lineID). | ||
| SetInvoicingAppExternalID(externalID). | ||
| Save(ctx) | ||
| if err != nil { | ||
| if db.IsNotFound(err) { | ||
| // Line may not exist if invoice structure changed; skip | ||
| continue | ||
| } | ||
| return fmt.Errorf("updating line external ID [lineID=%s]: %w", lineID, err) | ||
| } | ||
| } | ||
|
|
||
| // Update discount external IDs | ||
| for discountID, externalID := range in.LineDiscountExternalIDs { | ||
| _, err := tx.db.BillingInvoiceLineDiscount.UpdateOneID(discountID). | ||
| SetInvoicingAppExternalID(externalID). | ||
| Save(ctx) | ||
| if err != nil { | ||
| if db.IsNotFound(err) { | ||
| continue | ||
| } | ||
| return fmt.Errorf("updating discount external ID [discountID=%s]: %w", discountID, err) | ||
| } |
There was a problem hiding this comment.
Scope child-row writes to the invoice being synced.
UpdateOneID(lineID) and UpdateOneID(discountID) trust the incoming IDs completely. If a stale or mismatched SyncExternalIDsInput ever carries a child ID from another invoice, this will mutate that other row instead of turning into a harmless skip. Please tie both updates back to in.Invoice ownership before saving.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@openmeter/billing/adapter/invoiceapp.go` around lines 35 - 59, The loops
update child rows by ID only, which can mutate rows belonging to other invoices;
restrict updates to children owned by the invoice in in.Invoice. For
BillingStandardInvoiceDetailedLine and BillingInvoiceLineDiscount (the
UpdateOneID calls on tx.db.BillingStandardInvoiceDetailedLine and
tx.db.BillingInvoiceLineDiscount), change the update builder to include a
predicate that the child’s parent invoice equals in.Invoice (e.g. use a
Where/Has... predicate or an Update().Where(... ID == lineID AND invoice_id ==
in.Invoice) variant) so that if the ID does not belong to the invoice the
operation becomes a no-op/skip rather than updating another invoice’s row; keep
the existing NotFound handling for skipped rows.
| // Initialize post-commit callback collection for the outermost transaction | ||
| if isOutermost { | ||
| ctx = initPostCommitCallbacks(ctx) | ||
| } | ||
|
|
||
| // Execute the callback and manage the transaction | ||
| return manage(ctx, tx, func(ctx context.Context, tx Driver) (R, error) { | ||
| result, err := manage(ctx, tx, func(ctx context.Context, tx Driver) (R, error) { | ||
| return cb(ctx) | ||
| }) | ||
| if err != nil { | ||
| return def, err | ||
| } | ||
|
|
||
| // Run post-commit callbacks after the outermost transaction commits | ||
| if isOutermost { | ||
| runPostCommitCallbacks(ctx) | ||
| } |
There was a problem hiding this comment.
Run post-commit callbacks on a clean context.
ctx still carries the driver from Line 42 and the post-commit queue marker, so callback code that goes back through transaction-aware services can accidentally reuse a committed transaction. It also means a nested OnCommit inside a callback just appends to a queue that’s already being drained and never runs. Please invoke post-commit work with a tx-free, callback-free context here.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/framework/transaction/transaction.go` around lines 49 - 65, The
post-commit callbacks are being invoked with the same ctx that still contains
the transaction Driver and the post-commit queue marker, which can cause
callback code to reuse the committed tx or append to the draining queue; fix by
creating a tx-free, callback-free context before calling runPostCommitCallbacks
(e.g. add a helper clearTransactionContext(ctx) that removes the transaction key
and the post-commit-queue marker from context and return that cleaned context),
then call runPostCommitCallbacks(cleanCtx) instead of
runPostCommitCallbacks(ctx); reference initPostCommitCallbacks,
runPostCommitCallbacks, manage, and Driver to locate where to clear the context
and ensure nested OnCommit inside callbacks uses a fresh queue.
99b4c9b to
f13af6c
Compare
d1463b8 to
06e8640
Compare
06e8640 to
2876ac7
Compare
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
670aaed to
f30b268
Compare
Summary
Replaces the inline, synchronous Stripe invoice sync logic with a persistent, plan-based execution model. Sync operations are persisted as an ordered plan in the database and executed asynchronously with checkpointing, crash recovery, and idempotency.
SyncExternalIDsfor incremental Stripe ID sync-back, standard invoice state helpers, noop billing service for tests, and post-commit transaction hooksinvoicesyncpackage (openmeter/app/stripe/invoicesync/): Implements the full plan lifecycle — planner generates ordered operations, executor calls Stripe API with advisory locking, handler orchestrates async execution via events, adapter persists plans/ops to Postgres via Ententity/app/invoice.go, update DI across billing-worker/server/jobs entry pointsPlan lifecycle
Each sync plan maps to a billing invoice state machine phase (
draft,issuing,delete). Plans transition throughpending → executing → completed/failed, with individual operations tracked and checkpointed independently.Key design decisions
New DB tables
app_stripe_invoice_sync_plans— tracks sync plan lifecycle per invoiceapp_stripe_invoice_sync_ops— tracks individual operations within a planTest plan
test/app/stripe/invoicesync_test.goTestStripeInvoicingtests updated for the new async pathmake testpassesmake lintpasses🤖 Generated with Claude Code
Summary by CodeRabbit
Release Notes
New Features
Bug Fixes