-
Notifications
You must be signed in to change notification settings - Fork 2.2k
[Part 6|*] Thread Contexts through payment methods Part 2 #10308
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: elle-payment-sql-series-new
Are you sure you want to change the base?
Changes from all commits
ae9c31f
d11b762
91fd00a
ea33726
abcd834
6aefe5b
b60e698
aae5a33
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -128,7 +128,7 @@ const ( | |
| // results is sent back. then process its result here. When there's no need to | ||
| // wait for results, the method will exit with `stepExit` such that the payment | ||
| // lifecycle loop will terminate. | ||
| func (p *paymentLifecycle) decideNextStep( | ||
| func (p *paymentLifecycle) decideNextStep(ctx context.Context, | ||
| payment paymentsdb.DBMPPayment) (stateStep, error) { | ||
|
|
||
| // Check whether we could make new HTLC attempts. | ||
|
|
@@ -168,7 +168,7 @@ func (p *paymentLifecycle) decideNextStep( | |
| // stepSkip and move to the next lifecycle iteration, which will | ||
| // refresh the payment and wait for the next attempt result, if | ||
| // any. | ||
| _, err := p.handleAttemptResult(r.attempt, r.result) | ||
| _, err := p.handleAttemptResult(ctx, r.attempt, r.result) | ||
|
|
||
| // We would only get a DB-related error here, which will cause | ||
| // us to abort the payment flow. | ||
|
|
@@ -192,6 +192,13 @@ func (p *paymentLifecycle) resumePayment(ctx context.Context) ([32]byte, | |
|
|
||
| // We need to make sure we can still do db operations after the context | ||
| // is cancelled. | ||
| // | ||
| // TODO(ziggie): This is a workaround to avoid a greater refactor of the | ||
| // payment lifecycle. We can currently not rely on the parent context | ||
| // because this method is also collecting the results of inflight HTLCs | ||
| // after the context is cancelled. So we need to make sure we only use | ||
| // the current context to stop creating new attempts but use this | ||
| // cleanupCtx to do all the db operations. | ||
| cleanupCtx := context.WithoutCancel(ctx) | ||
|
|
||
| // When the payment lifecycle loop exits, we make sure to signal any | ||
|
|
@@ -202,7 +209,7 @@ func (p *paymentLifecycle) resumePayment(ctx context.Context) ([32]byte, | |
| // If we had any existing attempts outstanding, we'll start by spinning | ||
| // up goroutines that'll collect their results and deliver them to the | ||
| // lifecycle loop below. | ||
| payment, err := p.reloadInflightAttempts() | ||
| payment, err := p.reloadInflightAttempts(ctx) | ||
| if err != nil { | ||
| return [32]byte{}, nil, err | ||
| } | ||
|
|
@@ -243,7 +250,7 @@ lifecycle: | |
| } | ||
|
|
||
| // We update the payment state on every iteration. | ||
| currentPayment, ps, err := p.reloadPayment() | ||
| currentPayment, ps, err := p.reloadPayment(cleanupCtx) | ||
| if err != nil { | ||
| return exitWithErr(err) | ||
| } | ||
|
|
@@ -264,7 +271,7 @@ lifecycle: | |
| // | ||
|
|
||
| // Now decide the next step of the current lifecycle. | ||
| step, err := p.decideNextStep(payment) | ||
| step, err := p.decideNextStep(cleanupCtx, payment) | ||
| if err != nil { | ||
| return exitWithErr(err) | ||
| } | ||
|
|
@@ -288,7 +295,7 @@ lifecycle: | |
| } | ||
|
|
||
| // Now request a route to be used to create our HTLC attempt. | ||
| rt, err := p.requestRoute(ps) | ||
| rt, err := p.requestRoute(cleanupCtx, ps) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We still have this downside such that, when the user cancels the operation, the path finding won't be stopped, but this is already a pre-existing issue, tho this will likely come up again when adding ctx to
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. agree, but luckily we are only talking probably of 1 sec waisted time and as described before we cannot fail this call because we depend on this for loop to collect potential inflight attempts for this payment. |
||
| if err != nil { | ||
| return exitWithErr(err) | ||
| } | ||
|
|
@@ -307,13 +314,15 @@ lifecycle: | |
| log.Tracef("Found route: %s", lnutils.SpewLogClosure(rt.Hops)) | ||
|
|
||
| // We found a route to try, create a new HTLC attempt to try. | ||
| attempt, err := p.registerAttempt(rt, ps.RemainingAmt) | ||
| attempt, err := p.registerAttempt( | ||
| cleanupCtx, rt, ps.RemainingAmt, | ||
| ) | ||
| if err != nil { | ||
| return exitWithErr(err) | ||
| } | ||
|
|
||
| // Once the attempt is created, send it to the htlcswitch. | ||
| result, err := p.sendAttempt(attempt) | ||
| result, err := p.sendAttempt(cleanupCtx, attempt) | ||
| if err != nil { | ||
| return exitWithErr(err) | ||
| } | ||
|
|
@@ -399,11 +408,9 @@ func (p *paymentLifecycle) checkContext(ctx context.Context) error { | |
|
|
||
| // requestRoute is responsible for finding a route to be used to create an HTLC | ||
| // attempt. | ||
| func (p *paymentLifecycle) requestRoute( | ||
| func (p *paymentLifecycle) requestRoute(ctx context.Context, | ||
| ps *paymentsdb.MPPaymentState) (*route.Route, error) { | ||
|
|
||
| ctx := context.TODO() | ||
|
|
||
| remainingFees := p.calcFeeBudget(ps.FeesPaid) | ||
|
|
||
| // Query our payment session to construct a route. | ||
|
|
@@ -598,11 +605,9 @@ func (p *paymentLifecycle) collectResult( | |
| // registerAttempt is responsible for creating and saving an HTLC attempt in db | ||
| // by using the route info provided. The `remainingAmt` is used to decide | ||
| // whether this is the last attempt. | ||
| func (p *paymentLifecycle) registerAttempt(rt *route.Route, | ||
| func (p *paymentLifecycle) registerAttempt(ctx context.Context, rt *route.Route, | ||
| remainingAmt lnwire.MilliSatoshi) (*paymentsdb.HTLCAttempt, error) { | ||
|
|
||
| ctx := context.TODO() | ||
|
|
||
| // If this route will consume the last remaining amount to send | ||
| // to the receiver, this will be our last shard (for now). | ||
| isLastAttempt := rt.ReceiverAmt() == remainingAmt | ||
|
|
@@ -676,7 +681,7 @@ func (p *paymentLifecycle) createNewPaymentAttempt(rt *route.Route, | |
| // sendAttempt attempts to send the current attempt to the switch to complete | ||
| // the payment. If this attempt fails, then we'll continue on to the next | ||
| // available route. | ||
| func (p *paymentLifecycle) sendAttempt( | ||
| func (p *paymentLifecycle) sendAttempt(ctx context.Context, | ||
| attempt *paymentsdb.HTLCAttempt) (*attemptResult, error) { | ||
|
|
||
| log.Debugf("Sending HTLC attempt(id=%v, total_amt=%v, first_hop_amt=%d"+ | ||
|
|
@@ -708,7 +713,7 @@ func (p *paymentLifecycle) sendAttempt( | |
| "payment=%v, err:%v", attempt.AttemptID, | ||
| p.identifier, err) | ||
|
|
||
| return p.failAttempt(attempt.AttemptID, err) | ||
| return p.failAttempt(ctx, attempt.AttemptID, err) | ||
| } | ||
|
|
||
| htlcAdd.OnionBlob = onionBlob | ||
|
|
@@ -722,7 +727,7 @@ func (p *paymentLifecycle) sendAttempt( | |
| log.Errorf("Failed sending attempt %d for payment %v to "+ | ||
| "switch: %v", attempt.AttemptID, p.identifier, err) | ||
|
|
||
| return p.handleSwitchErr(attempt, err) | ||
| return p.handleSwitchErr(ctx, attempt, err) | ||
| } | ||
|
|
||
| log.Debugf("Attempt %v for payment %v successfully sent to switch, "+ | ||
|
|
@@ -813,12 +818,10 @@ func (p *paymentLifecycle) amendFirstHopData(rt *route.Route) error { | |
|
|
||
| // failAttemptAndPayment fails both the payment and its attempt via the | ||
| // router's control tower, which marks the payment as failed in db. | ||
| func (p *paymentLifecycle) failPaymentAndAttempt( | ||
| func (p *paymentLifecycle) failPaymentAndAttempt(ctx context.Context, | ||
| attemptID uint64, reason *paymentsdb.FailureReason, | ||
| sendErr error) (*attemptResult, error) { | ||
|
|
||
| ctx := context.TODO() | ||
|
|
||
| log.Errorf("Payment %v failed: final_outcome=%v, raw_err=%v", | ||
| p.identifier, *reason, sendErr) | ||
|
|
||
|
|
@@ -836,7 +839,7 @@ func (p *paymentLifecycle) failPaymentAndAttempt( | |
| } | ||
|
|
||
| // Fail the attempt. | ||
| return p.failAttempt(attemptID, sendErr) | ||
| return p.failAttempt(ctx, attemptID, sendErr) | ||
| } | ||
|
|
||
| // handleSwitchErr inspects the given error from the Switch and determines | ||
|
|
@@ -847,7 +850,8 @@ func (p *paymentLifecycle) failPaymentAndAttempt( | |
| // the error type, the error is either the final outcome of the payment or we | ||
| // need to continue with an alternative route. A final outcome is indicated by | ||
| // a non-nil reason value. | ||
| func (p *paymentLifecycle) handleSwitchErr(attempt *paymentsdb.HTLCAttempt, | ||
| func (p *paymentLifecycle) handleSwitchErr(ctx context.Context, | ||
| attempt *paymentsdb.HTLCAttempt, | ||
| sendErr error) (*attemptResult, error) { | ||
|
|
||
| internalErrorReason := paymentsdb.FailureReasonError | ||
|
|
@@ -874,11 +878,11 @@ func (p *paymentLifecycle) handleSwitchErr(attempt *paymentsdb.HTLCAttempt, | |
| // Fail the attempt only if there's no reason. | ||
| if reason == nil { | ||
| // Fail the attempt. | ||
| return p.failAttempt(attemptID, sendErr) | ||
| return p.failAttempt(ctx, attemptID, sendErr) | ||
| } | ||
|
|
||
| // Otherwise fail both the payment and the attempt. | ||
| return p.failPaymentAndAttempt(attemptID, reason, sendErr) | ||
| return p.failPaymentAndAttempt(ctx, attemptID, reason, sendErr) | ||
| } | ||
|
|
||
| // If this attempt ID is unknown to the Switch, it means it was never | ||
|
|
@@ -889,7 +893,7 @@ func (p *paymentLifecycle) handleSwitchErr(attempt *paymentsdb.HTLCAttempt, | |
| log.Warnf("Failing attempt=%v for payment=%v as it's not "+ | ||
| "found in the Switch", attempt.AttemptID, p.identifier) | ||
|
|
||
| return p.failAttempt(attemptID, sendErr) | ||
| return p.failAttempt(ctx, attemptID, sendErr) | ||
| } | ||
|
|
||
| if errors.Is(sendErr, htlcswitch.ErrUnreadableFailureMessage) { | ||
|
|
@@ -911,7 +915,7 @@ func (p *paymentLifecycle) handleSwitchErr(attempt *paymentsdb.HTLCAttempt, | |
| ok := errors.As(sendErr, &rtErr) | ||
| if !ok { | ||
| return p.failPaymentAndAttempt( | ||
| attemptID, &internalErrorReason, sendErr, | ||
| ctx, attemptID, &internalErrorReason, sendErr, | ||
| ) | ||
| } | ||
|
|
||
|
|
@@ -937,7 +941,7 @@ func (p *paymentLifecycle) handleSwitchErr(attempt *paymentsdb.HTLCAttempt, | |
| ) | ||
| if err != nil { | ||
| return p.failPaymentAndAttempt( | ||
| attemptID, &internalErrorReason, sendErr, | ||
| ctx, attemptID, &internalErrorReason, sendErr, | ||
| ) | ||
| } | ||
|
|
||
|
|
@@ -1021,11 +1025,9 @@ func (p *paymentLifecycle) handleFailureMessage(rt *route.Route, | |
| } | ||
|
|
||
| // failAttempt calls control tower to fail the current payment attempt. | ||
| func (p *paymentLifecycle) failAttempt(attemptID uint64, | ||
| func (p *paymentLifecycle) failAttempt(ctx context.Context, attemptID uint64, | ||
| sendError error) (*attemptResult, error) { | ||
|
|
||
| ctx := context.TODO() | ||
|
|
||
| log.Warnf("Attempt %v for payment %v failed: %v", attemptID, | ||
| p.identifier, sendError) | ||
|
|
||
|
|
@@ -1136,10 +1138,8 @@ func (p *paymentLifecycle) patchLegacyPaymentHash( | |
| // reloadInflightAttempts is called when the payment lifecycle is resumed after | ||
| // a restart. It reloads all inflight attempts from the control tower and | ||
| // collects the results of the attempts that have been sent before. | ||
| func (p *paymentLifecycle) reloadInflightAttempts() (paymentsdb.DBMPPayment, | ||
| error) { | ||
|
|
||
| ctx := context.TODO() | ||
| func (p *paymentLifecycle) reloadInflightAttempts( | ||
| ctx context.Context) (paymentsdb.DBMPPayment, error) { | ||
|
|
||
| payment, err := p.router.cfg.Control.FetchPayment(ctx, p.identifier) | ||
| if err != nil { | ||
|
|
@@ -1163,11 +1163,10 @@ func (p *paymentLifecycle) reloadInflightAttempts() (paymentsdb.DBMPPayment, | |
| } | ||
|
|
||
| // reloadPayment returns the latest payment found in the db (control tower). | ||
| func (p *paymentLifecycle) reloadPayment() (paymentsdb.DBMPPayment, | ||
| func (p *paymentLifecycle) reloadPayment( | ||
| ctx context.Context) (paymentsdb.DBMPPayment, | ||
| *paymentsdb.MPPaymentState, error) { | ||
|
|
||
| ctx := context.TODO() | ||
|
|
||
| // Read the db to get the latest state of the payment. | ||
| payment, err := p.router.cfg.Control.FetchPayment(ctx, p.identifier) | ||
| if err != nil { | ||
|
|
@@ -1186,15 +1185,14 @@ func (p *paymentLifecycle) reloadPayment() (paymentsdb.DBMPPayment, | |
|
|
||
| // handleAttemptResult processes the result of an HTLC attempt returned from | ||
| // the htlcswitch. | ||
| func (p *paymentLifecycle) handleAttemptResult(attempt *paymentsdb.HTLCAttempt, | ||
| func (p *paymentLifecycle) handleAttemptResult(ctx context.Context, | ||
| attempt *paymentsdb.HTLCAttempt, | ||
| result *htlcswitch.PaymentResult) (*attemptResult, error) { | ||
|
|
||
| ctx := context.TODO() | ||
|
|
||
| // If the result has an error, we need to further process it by failing | ||
| // the attempt and maybe fail the payment. | ||
| if result.Error != nil { | ||
| return p.handleSwitchErr(attempt, result.Error) | ||
| return p.handleSwitchErr(ctx, attempt, result.Error) | ||
| } | ||
|
|
||
| // We got an attempt settled result back from the switch. | ||
|
|
@@ -1237,13 +1235,13 @@ func (p *paymentLifecycle) handleAttemptResult(attempt *paymentsdb.HTLCAttempt, | |
| // available from the Switch, then records the attempt outcome with the control | ||
| // tower. An attemptResult is returned, indicating the final outcome of this | ||
| // HTLC attempt. | ||
| func (p *paymentLifecycle) collectAndHandleResult( | ||
| func (p *paymentLifecycle) collectAndHandleResult(ctx context.Context, | ||
| attempt *paymentsdb.HTLCAttempt) (*attemptResult, error) { | ||
|
|
||
| result, err := p.collectResult(attempt) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| return p.handleAttemptResult(attempt, result) | ||
| return p.handleAttemptResult(ctx, attempt, result) | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
similar to
reloadInflightAttempts, we should also usectxhere to cancel the read operation?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no we can't use the ctx context here, because we rely on this function recollect all INFLIGHT results, if we do use the other context we will basically abort as soon as we timeout.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried to describe it in the todo where I introduced the new context: