|
1 | 1 | package db |
2 | 2 |
|
3 | | -import ( |
4 | | - "context" |
5 | | - "database/sql" |
6 | | - "math" |
7 | | - prand "math/rand" |
8 | | - "time" |
9 | | - |
10 | | - "github.com/lightninglabs/lightning-terminal/db/sqlc" |
11 | | - "github.com/lightningnetwork/lnd/sqldb/v2" |
12 | | -) |
13 | | - |
14 | | -var ( |
15 | | - // DefaultStoreTimeout is the default timeout used for any interaction |
16 | | - // with the storage/database. |
17 | | - DefaultStoreTimeout = time.Second * 10 |
18 | | -) |
19 | | - |
20 | | -const ( |
21 | | - // DefaultNumTxRetries is the default number of times we'll retry a |
22 | | - // transaction if it fails with an error that permits transaction |
23 | | - // repetition. |
24 | | - DefaultNumTxRetries = 10 |
25 | | - |
26 | | - // DefaultInitialRetryDelay is the default initial delay between |
27 | | - // retries. This will be used to generate a random delay between -50% |
28 | | - // and +50% of this value, so 20 to 60 milliseconds. The retry will be |
29 | | - // doubled after each attempt until we reach DefaultMaxRetryDelay. We |
30 | | - // start with a random value to avoid multiple goroutines that are |
31 | | - // created at the same time to effectively retry at the same time. |
32 | | - DefaultInitialRetryDelay = time.Millisecond * 40 |
33 | | - |
34 | | - // DefaultMaxRetryDelay is the default maximum delay between retries. |
35 | | - DefaultMaxRetryDelay = time.Second * 3 |
36 | | -) |
37 | | - |
38 | | -// TxOptions represents a set of options one can use to control what type of |
39 | | -// database transaction is created. Transaction can wither be read or write. |
40 | | -type TxOptions interface { |
41 | | - // ReadOnly returns true if the transaction should be read only. |
42 | | - ReadOnly() bool |
43 | | -} |
44 | | - |
45 | | -// BatchedTx is a generic interface that represents the ability to execute |
46 | | -// several operations to a given storage interface in a single atomic |
47 | | -// transaction. Typically, Q here will be some subset of the main sqlc.Querier |
48 | | -// interface allowing it to only depend on the routines it needs to implement |
49 | | -// any additional business logic. |
50 | | -type BatchedTx[Q any] interface { |
51 | | - // ExecTx will execute the passed txBody, operating upon generic |
52 | | - // parameter Q (usually a storage interface) in a single transaction. |
53 | | - // The set of TxOptions are passed in in order to allow the caller to |
54 | | - // specify if a transaction should be read-only and optionally what |
55 | | - // type of concurrency control should be used. |
56 | | - ExecTx(ctx context.Context, txOptions TxOptions, |
57 | | - txBody func(Q) error) error |
58 | | - |
59 | | - // Backend returns the type of the database backend used. |
60 | | - Backend() sqldb.BackendType |
61 | | -} |
62 | | - |
63 | | -// Tx represents a database transaction that can be committed or rolled back. |
64 | | -type Tx interface { |
65 | | - // Commit commits the database transaction, an error should be returned |
66 | | - // if the commit isn't possible. |
67 | | - Commit() error |
68 | | - |
69 | | - // Rollback rolls back an incomplete database transaction. |
70 | | - // Transactions that were able to be committed can still call this as a |
71 | | - // noop. |
72 | | - Rollback() error |
73 | | -} |
74 | | - |
75 | | -// QueryCreator is a generic function that's used to create a Querier, which is |
76 | | -// a type of interface that implements storage related methods from a database |
77 | | -// transaction. This will be used to instantiate an object callers can use to |
78 | | -// apply multiple modifications to an object interface in a single atomic |
79 | | -// transaction. |
80 | | -type QueryCreator[Q any] func(*sql.Tx) Q |
81 | | - |
82 | | -// BatchedQuerier is a generic interface that allows callers to create a new |
83 | | -// database transaction based on an abstract type that implements the TxOptions |
84 | | -// interface. |
85 | | -type BatchedQuerier interface { |
86 | | - // Querier is the underlying query source, this is in place so we can |
87 | | - // pass a BatchedQuerier implementation directly into objects that |
88 | | - // create a batched version of the normal methods they need. |
89 | | - sqlc.Querier |
90 | | - |
91 | | - // CustomQueries is the set of custom queries that we have manually |
92 | | - // defined in addition to the ones generated by sqlc. |
93 | | - sqlc.CustomQueries |
94 | | - |
95 | | - // BeginTx creates a new database transaction given the set of |
96 | | - // transaction options. |
97 | | - BeginTx(ctx context.Context, options TxOptions) (*sql.Tx, error) |
98 | | -} |
99 | | - |
100 | | -// txExecutorOptions is a struct that holds the options for the transaction |
101 | | -// executor. This can be used to do things like retry a transaction due to an |
102 | | -// error a certain amount of times. |
103 | | -type txExecutorOptions struct { |
104 | | - numRetries int |
105 | | - initialRetryDelay time.Duration |
106 | | - maxRetryDelay time.Duration |
107 | | -} |
108 | | - |
109 | | -// defaultTxExecutorOptions returns the default options for the transaction |
110 | | -// executor. |
111 | | -func defaultTxExecutorOptions() *txExecutorOptions { |
112 | | - return &txExecutorOptions{ |
113 | | - numRetries: DefaultNumTxRetries, |
114 | | - initialRetryDelay: DefaultInitialRetryDelay, |
115 | | - maxRetryDelay: DefaultMaxRetryDelay, |
116 | | - } |
117 | | -} |
118 | | - |
119 | | -// randRetryDelay returns a random retry delay between -50% and +50% |
120 | | -// of the configured delay that is doubled for each attempt and capped at a max |
121 | | -// value. |
122 | | -func (t *txExecutorOptions) randRetryDelay(attempt int) time.Duration { |
123 | | - halfDelay := t.initialRetryDelay / 2 |
124 | | - randDelay := prand.Int63n(int64(t.initialRetryDelay)) //nolint:gosec |
125 | | - |
126 | | - // 50% plus 0%-100% gives us the range of 50%-150%. |
127 | | - initialDelay := halfDelay + time.Duration(randDelay) |
128 | | - |
129 | | - // If this is the first attempt, we just return the initial delay. |
130 | | - if attempt == 0 { |
131 | | - return initialDelay |
132 | | - } |
133 | | - |
134 | | - // For each subsequent delay, we double the initial delay. This still |
135 | | - // gives us a somewhat random delay, but it still increases with each |
136 | | - // attempt. If we double something n times, that's the same as |
137 | | - // multiplying the value with 2^n. We limit the power to 32 to avoid |
138 | | - // overflows. |
139 | | - factor := time.Duration(math.Pow(2, math.Min(float64(attempt), 32))) |
140 | | - actualDelay := initialDelay * factor |
141 | | - |
142 | | - // Cap the delay at the maximum configured value. |
143 | | - if actualDelay > t.maxRetryDelay { |
144 | | - return t.maxRetryDelay |
145 | | - } |
146 | | - |
147 | | - return actualDelay |
148 | | -} |
149 | | - |
150 | | -// TxExecutorOption is a functional option that allows us to pass in optional |
151 | | -// argument when creating the executor. |
152 | | -type TxExecutorOption func(*txExecutorOptions) |
153 | | - |
154 | | -// WithTxRetries is a functional option that allows us to specify the number of |
155 | | -// times a transaction should be retried if it fails with a repeatable error. |
156 | | -func WithTxRetries(numRetries int) TxExecutorOption { |
157 | | - return func(o *txExecutorOptions) { |
158 | | - o.numRetries = numRetries |
159 | | - } |
160 | | -} |
161 | | - |
162 | | -// WithTxRetryDelay is a functional option that allows us to specify the delay |
163 | | -// to wait before a transaction is retried. |
164 | | -func WithTxRetryDelay(delay time.Duration) TxExecutorOption { |
165 | | - return func(o *txExecutorOptions) { |
166 | | - o.initialRetryDelay = delay |
167 | | - } |
168 | | -} |
169 | | - |
170 | | -// TransactionExecutor is a generic struct that abstracts away from the type of |
171 | | -// query a type needs to run under a database transaction, and also the set of |
172 | | -// options for that transaction. The QueryCreator is used to create a query |
173 | | -// given a database transaction created by the BatchedQuerier. |
174 | | -type TransactionExecutor[Query any] struct { |
175 | | - BatchedQuerier |
176 | | - |
177 | | - createQuery QueryCreator[Query] |
178 | | - |
179 | | - opts *txExecutorOptions |
180 | | -} |
181 | | - |
182 | | -// NewTransactionExecutor creates a new instance of a TransactionExecutor given |
183 | | -// a Querier query object and a concrete type for the type of transactions the |
184 | | -// Querier understands. |
185 | | -func NewTransactionExecutor[Querier any](db BatchedQuerier, |
186 | | - createQuery QueryCreator[Querier], |
187 | | - opts ...TxExecutorOption) *TransactionExecutor[Querier] { |
188 | | - |
189 | | - txOpts := defaultTxExecutorOptions() |
190 | | - for _, optFunc := range opts { |
191 | | - optFunc(txOpts) |
192 | | - } |
193 | | - |
194 | | - return &TransactionExecutor[Querier]{ |
195 | | - BatchedQuerier: db, |
196 | | - createQuery: createQuery, |
197 | | - opts: txOpts, |
198 | | - } |
199 | | -} |
200 | | - |
201 | | -// ExecTx is a wrapper for txBody to abstract the creation and commit of a db |
202 | | -// transaction. The db transaction is embedded in a `*Queries` that txBody |
203 | | -// needs to use when executing each one of the queries that need to be applied |
204 | | -// atomically. This can be used by other storage interfaces to parameterize the |
205 | | -// type of query and options run, in order to have access to batched operations |
206 | | -// related to a storage object. |
207 | | -func (t *TransactionExecutor[Q]) ExecTx(ctx context.Context, |
208 | | - txOptions TxOptions, txBody func(Q) error) error { |
209 | | - |
210 | | - waitBeforeRetry := func(attemptNumber int) { |
211 | | - retryDelay := t.opts.randRetryDelay(attemptNumber) |
212 | | - |
213 | | - log.Tracef("Retrying transaction due to tx serialization or "+ |
214 | | - "deadlock error, attempt_number=%v, delay=%v", |
215 | | - attemptNumber, retryDelay) |
216 | | - |
217 | | - // Before we try again, we'll wait with a random backoff based |
218 | | - // on the retry delay. |
219 | | - time.Sleep(retryDelay) |
220 | | - } |
221 | | - |
222 | | - for i := 0; i < t.opts.numRetries; i++ { |
223 | | - // Create the db transaction. |
224 | | - tx, err := t.BatchedQuerier.BeginTx(ctx, txOptions) |
225 | | - if err != nil { |
226 | | - dbErr := MapSQLError(err) |
227 | | - if IsSerializationOrDeadlockError(dbErr) { |
228 | | - // Nothing to roll back here, since we didn't |
229 | | - // even get a transaction yet. |
230 | | - waitBeforeRetry(i) |
231 | | - continue |
232 | | - } |
233 | | - |
234 | | - return dbErr |
235 | | - } |
236 | | - |
237 | | - // Rollback is safe to call even if the tx is already closed, |
238 | | - // so if the tx commits successfully, this is a no-op. |
239 | | - defer func() { |
240 | | - _ = tx.Rollback() |
241 | | - }() |
242 | | - |
243 | | - if err := txBody(t.createQuery(tx)); err != nil { |
244 | | - dbErr := MapSQLError(err) |
245 | | - if IsSerializationOrDeadlockError(dbErr) { |
246 | | - // Roll back the transaction, then pop back up |
247 | | - // to try once again. |
248 | | - _ = tx.Rollback() |
249 | | - |
250 | | - waitBeforeRetry(i) |
251 | | - continue |
252 | | - } |
253 | | - |
254 | | - return dbErr |
255 | | - } |
256 | | - |
257 | | - // Commit transaction. |
258 | | - if err = tx.Commit(); err != nil { |
259 | | - dbErr := MapSQLError(err) |
260 | | - if IsSerializationOrDeadlockError(dbErr) { |
261 | | - // Roll back the transaction, then pop back up |
262 | | - // to try once again. |
263 | | - _ = tx.Rollback() |
264 | | - |
265 | | - waitBeforeRetry(i) |
266 | | - continue |
267 | | - } |
268 | | - |
269 | | - return dbErr |
270 | | - } |
271 | | - |
272 | | - return nil |
273 | | - } |
274 | | - |
275 | | - // If we get to this point, then we weren't able to successfully commit |
276 | | - // a tx given the max number of retries. |
277 | | - return ErrRetriesExceeded |
278 | | -} |
279 | | - |
280 | | -// Backend returns the type of the database backend used. |
281 | | -func (t *TransactionExecutor[Q]) Backend() sqldb.BackendType { |
282 | | - return t.BatchedQuerier.Backend() |
283 | | -} |
284 | | - |
285 | | -// BaseDB is the base database struct that each implementation can embed to |
286 | | -// gain some common functionality. |
287 | | -type BaseDB struct { |
288 | | - *sql.DB |
289 | | - |
290 | | - *sqlc.Queries |
291 | | -} |
292 | | - |
293 | | -// BeginTx wraps the normal sql specific BeginTx method with the TxOptions |
294 | | -// interface. This interface is then mapped to the concrete sql tx options |
295 | | -// struct. |
296 | | -func (s *BaseDB) BeginTx(ctx context.Context, opts TxOptions) (*sql.Tx, error) { |
297 | | - sqlOptions := sql.TxOptions{ |
298 | | - ReadOnly: opts.ReadOnly(), |
299 | | - Isolation: sql.LevelSerializable, |
300 | | - } |
301 | | - return s.DB.BeginTx(ctx, &sqlOptions) |
302 | | -} |
303 | | - |
304 | | -// Backend returns the type of the database backend used. |
305 | | -func (s *BaseDB) Backend() sqldb.BackendType { |
306 | | - return s.Queries.Backend() |
307 | | -} |
308 | | - |
309 | 3 | // QueriesTxOptions defines the set of db txn options the SQLQueries |
310 | 4 | // understands. |
311 | 5 | type QueriesTxOptions struct { |
|
0 commit comments