Skip to content
This repository was archived by the owner on Aug 2, 2021. It is now read-only.

Commit da10f70

Browse files
committed
swap, swap/chain: add transaction queue
1 parent 090b81e commit da10f70

File tree

12 files changed

+1169
-175
lines changed

12 files changed

+1169
-175
lines changed

swap/cashout.go

Lines changed: 90 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,9 @@ import (
3030
// CashChequeBeneficiaryTransactionCost is the expected gas cost of a CashChequeBeneficiary transaction
3131
const CashChequeBeneficiaryTransactionCost = 50000
3232

33-
// CashoutProcessor holds all relevant fields needed for processing cashouts
34-
type CashoutProcessor struct {
35-
backend chain.Backend // ethereum backend to use
36-
privateKey *ecdsa.PrivateKey // private key to use
33+
var CashoutRequestTypeID = chain.TransactionRequestTypeID{
34+
Handler: "cashout",
35+
RequestType: "CashoutRequest",
3736
}
3837

3938
// CashoutRequest represents a request for a cashout operation
@@ -42,42 +41,106 @@ type CashoutRequest struct {
4241
Destination common.Address // destination for the payout
4342
}
4443

45-
// ActiveCashout stores the necessary information for a cashout in progess
46-
type ActiveCashout struct {
47-
Request CashoutRequest // the request that caused this cashout
48-
TransactionHash common.Hash // the hash of the current transaction for this request
44+
// CashoutProcessor holds all relevant fields needed for processing cashouts
45+
type CashoutProcessor struct {
46+
backend chain.Backend // ethereum backend to use
47+
txScheduler chain.TxScheduler // transaction queue to use
48+
cashoutDone chan *CashoutRequest
4949
}
5050

5151
// newCashoutProcessor creates a new instance of CashoutProcessor
52-
func newCashoutProcessor(backend chain.Backend, privateKey *ecdsa.PrivateKey) *CashoutProcessor {
53-
return &CashoutProcessor{
54-
backend: backend,
55-
privateKey: privateKey,
52+
func newCashoutProcessor(txScheduler chain.TxScheduler, backend chain.Backend, privateKey *ecdsa.PrivateKey) *CashoutProcessor {
53+
c := &CashoutProcessor{
54+
backend: backend,
55+
txScheduler: txScheduler,
5656
}
57+
58+
txScheduler.SetHandlers(CashoutRequestTypeID, &chain.TransactionRequestHandlers{
59+
Send: func(id uint64, backend chain.Backend, opts *bind.TransactOpts) (common.Hash, error) {
60+
var request CashoutRequest
61+
if err := c.txScheduler.GetRequest(id, &request); err != nil {
62+
return common.Hash{}, err
63+
}
64+
65+
cheque := request.Cheque
66+
67+
otherSwap, err := contract.InstanceAt(cheque.Contract, backend)
68+
if err != nil {
69+
return common.Hash{}, err
70+
}
71+
72+
tx, err := otherSwap.CashChequeBeneficiaryStart(opts, request.Destination, cheque.CumulativePayout, cheque.Signature)
73+
if err != nil {
74+
return common.Hash{}, err
75+
}
76+
return tx.Hash(), nil
77+
},
78+
NotifyReceipt: func(id uint64, notification *chain.TransactionReceiptNotification) error {
79+
var request *CashoutRequest
80+
err := c.txScheduler.GetRequest(id, &request)
81+
if err != nil {
82+
return err
83+
}
84+
85+
otherSwap, err := contract.InstanceAt(request.Cheque.Contract, c.backend)
86+
if err != nil {
87+
return err
88+
}
89+
90+
receipt := &notification.Receipt
91+
if receipt.Status == 0 {
92+
swapLog.Error("cheque cashing transaction reverted", "tx", receipt.TxHash)
93+
return nil
94+
}
95+
96+
result := otherSwap.CashChequeBeneficiaryResult(receipt)
97+
98+
metrics.GetOrRegisterCounter("swap.cheques.cashed.honey", nil).Inc(result.TotalPayout.Int64())
99+
100+
if result.Bounced {
101+
metrics.GetOrRegisterCounter("swap.cheques.cashed.bounced", nil).Inc(1)
102+
swapLog.Warn("cheque bounced", "tx", receipt.TxHash)
103+
}
104+
105+
swapLog.Info("cheque cashed", "honey", request.Cheque.Honey)
106+
107+
select {
108+
case c.cashoutDone <- request:
109+
default:
110+
}
111+
112+
return nil
113+
},
114+
})
115+
return c
57116
}
58117

59-
// cashCheque tries to cash the cheque specified in the request
60-
// after the transaction is sent it waits on its success
61-
func (c *CashoutProcessor) cashCheque(ctx context.Context, request *CashoutRequest) error {
62-
cheque := request.Cheque
63-
opts := bind.NewKeyedTransactor(c.privateKey)
64-
opts.Context = ctx
118+
func (c *CashoutProcessor) setCashoutDoneChan(cashoutDone chan *CashoutRequest) {
119+
c.cashoutDone = cashoutDone
120+
}
65121

66-
otherSwap, err := contract.InstanceAt(cheque.Contract, c.backend)
122+
func (c *CashoutProcessor) submitCheque(request *CashoutRequest) {
123+
expectedPayout, transactionCosts, err := c.estimatePayout(context.TODO(), &request.Cheque)
67124
if err != nil {
68-
return err
125+
swapLog.Error("could not estimate payout", "error", err)
126+
return
69127
}
70128

71-
tx, err := otherSwap.CashChequeBeneficiaryStart(opts, request.Destination, cheque.CumulativePayout, cheque.Signature)
129+
costsMultiplier := uint256.FromUint64(2)
130+
costThreshold, err := uint256.New().Mul(transactionCosts, costsMultiplier)
72131
if err != nil {
73-
return err
132+
swapLog.Error("overflow in transaction fee", "error", err)
133+
return
74134
}
75135

76-
// this blocks until the cashout has been successfully processed
77-
return c.waitForAndProcessActiveCashout(&ActiveCashout{
78-
Request: *request,
79-
TransactionHash: tx.Hash(),
80-
})
136+
// do a payout transaction if we get 2 times the gas costs
137+
if expectedPayout.Cmp(costThreshold) == 1 {
138+
_, err := c.txScheduler.ScheduleRequest(CashoutRequestTypeID, request)
139+
if err != nil {
140+
metrics.GetOrRegisterCounter("swap.cheques.cashed.errors", nil).Inc(1)
141+
swapLog.Error("cashing cheque:", "error", err)
142+
}
143+
}
81144
}
82145

83146
// estimatePayout estimates the payout for a given cheque as well as the transaction cost
@@ -123,31 +186,3 @@ func (c *CashoutProcessor) estimatePayout(ctx context.Context, cheque *Cheque) (
123186

124187
return expectedPayout, transactionCosts, nil
125188
}
126-
127-
// waitForAndProcessActiveCashout waits for activeCashout to complete
128-
func (c *CashoutProcessor) waitForAndProcessActiveCashout(activeCashout *ActiveCashout) error {
129-
ctx, cancel := context.WithTimeout(context.Background(), DefaultTransactionTimeout)
130-
defer cancel()
131-
132-
receipt, err := chain.WaitMined(ctx, c.backend, activeCashout.TransactionHash)
133-
if err != nil {
134-
return err
135-
}
136-
137-
otherSwap, err := contract.InstanceAt(activeCashout.Request.Cheque.Contract, c.backend)
138-
if err != nil {
139-
return err
140-
}
141-
142-
result := otherSwap.CashChequeBeneficiaryResult(receipt)
143-
144-
metrics.GetOrRegisterCounter("swap.cheques.cashed.honey", nil).Inc(result.TotalPayout.Int64())
145-
146-
if result.Bounced {
147-
metrics.GetOrRegisterCounter("swap.cheques.cashed.bounced", nil).Inc(1)
148-
swapLog.Warn("cheque bounced", "tx", receipt.TxHash)
149-
}
150-
151-
swapLog.Info("cheque cashed", "honey", activeCashout.Request.Cheque.Honey)
152-
return nil
153-
}

swap/cashout_test.go

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@ package swap
1919
import (
2020
"context"
2121
"testing"
22+
"time"
2223

2324
"github.com/ethereum/go-ethereum/accounts/abi/bind"
2425
"github.com/ethereum/go-ethereum/log"
26+
"github.com/ethersphere/swarm/state"
2527
"github.com/ethersphere/swarm/swap/chain"
2628
"github.com/ethersphere/swarm/uint256"
2729
)
@@ -33,8 +35,7 @@ import (
3335
// afterwards it attempts to cash-in a bouncing cheque
3436
func TestContractIntegration(t *testing.T) {
3537
backend := newTestBackend(t)
36-
reset := setupContractTest()
37-
defer reset()
38+
defer backend.Close()
3839

3940
payout := uint256.FromUint64(42)
4041
chequebook, err := testDeployWithPrivateKey(context.Background(), backend, ownerKey, ownerAddress, payout)
@@ -116,11 +117,15 @@ func TestContractIntegration(t *testing.T) {
116117
// TestCashCheque creates a valid cheque and feeds it to cashoutProcessor.cashCheque
117118
func TestCashCheque(t *testing.T) {
118119
backend := newTestBackend(t)
119-
reset := setupContractTest()
120-
defer reset()
120+
defer backend.Close()
121121

122-
cashoutProcessor := newCashoutProcessor(backend, ownerKey)
123-
payout := uint256.FromUint64(42)
122+
store := state.NewInmemoryStore()
123+
defer store.Close()
124+
transactionQueue := chain.NewTxQueue(store, "queue", backend, ownerKey)
125+
transactionQueue.Start()
126+
defer transactionQueue.Stop()
127+
cashoutProcessor := newCashoutProcessor(transactionQueue, backend, ownerKey)
128+
payout := uint256.FromUint64(CashChequeBeneficiaryTransactionCost*2 + 1)
124129

125130
chequebook, err := testDeployWithPrivateKey(context.Background(), backend, ownerKey, ownerAddress, payout)
126131
if err != nil {
@@ -132,12 +137,18 @@ func TestCashCheque(t *testing.T) {
132137
t.Fatal(err)
133138
}
134139

135-
err = cashoutProcessor.cashCheque(context.Background(), &CashoutRequest{
140+
cashChequeDone := make(chan *CashoutRequest)
141+
defer close(cashChequeDone)
142+
cashoutProcessor.setCashoutDoneChan(cashChequeDone)
143+
144+
cashoutProcessor.submitCheque(&CashoutRequest{
136145
Cheque: *testCheque,
137146
Destination: ownerAddress,
138147
})
139-
if err != nil {
140-
t.Fatal(err)
148+
149+
select {
150+
case <-cashChequeDone:
151+
case <-time.After(5 * time.Second):
141152
}
142153

143154
paidOut, err := chequebook.PaidOut(nil, ownerAddress)
@@ -154,10 +165,14 @@ func TestCashCheque(t *testing.T) {
154165
// TestEstimatePayout creates a valid cheque and feeds it to cashoutProcessor.estimatePayout
155166
func TestEstimatePayout(t *testing.T) {
156167
backend := newTestBackend(t)
157-
reset := setupContractTest()
158-
defer reset()
159-
160-
cashoutProcessor := newCashoutProcessor(backend, ownerKey)
168+
defer backend.Close()
169+
170+
store := state.NewInmemoryStore()
171+
defer store.Close()
172+
transactionQueue := chain.NewTxQueue(store, "queue", backend, ownerKey)
173+
transactionQueue.Start()
174+
defer transactionQueue.Stop()
175+
cashoutProcessor := newCashoutProcessor(transactionQueue, backend, ownerKey)
161176
payout := uint256.FromUint64(42)
162177

163178
chequebook, err := testDeployWithPrivateKey(context.Background(), backend, ownerKey, ownerAddress, payout)

swap/chain/backend.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package chain
22

33
import (
44
"context"
5-
"errors"
65
"time"
76

87
"github.com/ethereum/go-ethereum/log"
@@ -12,11 +11,6 @@ import (
1211
"github.com/ethereum/go-ethereum/core/types"
1312
)
1413

15-
var (
16-
// ErrTransactionReverted is given when the transaction that cashes a cheque is reverted
17-
ErrTransactionReverted = errors.New("Transaction reverted")
18-
)
19-
2014
// Backend is the minimum amount of functionality required by the underlying ethereum backend
2115
type Backend interface {
2216
bind.ContractBackend
@@ -36,9 +30,6 @@ func WaitMined(ctx context.Context, b Backend, hash common.Hash) (*types.Receipt
3630
log.Trace("Receipt retrieval failed", "err", err)
3731
}
3832
if receipt != nil {
39-
if receipt.Status != types.ReceiptStatusSuccessful {
40-
return nil, ErrTransactionReverted
41-
}
4233
return receipt, nil
4334
}
4435

swap/chain/persistentqueue.go

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
package chain
2+
3+
import (
4+
"context"
5+
"encoding"
6+
"encoding/json"
7+
"strconv"
8+
"strings"
9+
"sync"
10+
"time"
11+
12+
"github.com/ethersphere/swarm/state"
13+
)
14+
15+
// PersistentQueue represents a queue stored in a state store
16+
type PersistentQueue struct {
17+
lock sync.Mutex
18+
store state.Store
19+
prefix string
20+
trigger chan struct{}
21+
nonce uint64
22+
}
23+
24+
// NewPersistentQueue creates a structure to interact with a queue with the given prefix
25+
func NewPersistentQueue(store state.Store, prefix string) *PersistentQueue {
26+
return &PersistentQueue{
27+
store: store,
28+
prefix: prefix,
29+
trigger: make(chan struct{}, 1),
30+
nonce: 0,
31+
}
32+
}
33+
34+
// Queue puts the necessary database operations for the queueing into the supplied batch
35+
// it returns the generated key and a trigger function which must be called if the batch was successfully written
36+
// this only returns an error if the encoding fails which is an unrecoverable error
37+
func (pq *PersistentQueue) Queue(b *state.StoreBatch, v interface{}) (key string, trigger func(), err error) {
38+
pq.lock.Lock()
39+
defer pq.lock.Unlock()
40+
41+
pq.nonce++
42+
key = time.Now().String() + "_" + strconv.FormatUint(pq.nonce, 10)
43+
if err = b.Put(pq.prefix+key, v); err != nil {
44+
return "", nil, err
45+
}
46+
47+
return key, func() {
48+
select {
49+
case pq.trigger <- struct{}{}:
50+
default:
51+
}
52+
}, nil
53+
}
54+
55+
// Peek looks at the next item in the queue
56+
// the error returned is either an decode or an io error
57+
func (pq *PersistentQueue) Peek(i interface{}) (key string, exists bool, err error) {
58+
err = pq.store.Iterate(pq.prefix, func(k, data []byte) (bool, error) {
59+
key = string(k)
60+
unmarshaler, ok := i.(encoding.BinaryUnmarshaler)
61+
if !ok {
62+
return true, json.Unmarshal(data, i)
63+
}
64+
return true, unmarshaler.UnmarshalBinary(data)
65+
})
66+
if err != nil {
67+
return "", false, err
68+
}
69+
if key == "" {
70+
return "", false, nil
71+
}
72+
return strings.TrimPrefix(key, pq.prefix), true, nil
73+
}
74+
75+
// Next looks at the next item in the queue and blocks until an item is available if there is none
76+
// the error returned is either an decode error, an io error or a cancelled context
77+
func (pq *PersistentQueue) Next(ctx context.Context, i interface{}) (key string, err error) {
78+
key, exists, err := pq.Peek(i)
79+
if err != nil {
80+
return "", err
81+
}
82+
if exists {
83+
return key, nil
84+
}
85+
86+
for {
87+
select {
88+
case <-pq.trigger:
89+
key, exists, err = pq.Peek(i)
90+
if err != nil {
91+
return "", err
92+
}
93+
if exists {
94+
return key, nil
95+
}
96+
case <-ctx.Done():
97+
return "", ctx.Err()
98+
}
99+
}
100+
}
101+
102+
// Delete adds the batch operation to delete the queue element with the given key
103+
func (pq *PersistentQueue) Delete(b *state.StoreBatch, key string) {
104+
b.Delete(pq.prefix + key)
105+
}

0 commit comments

Comments
 (0)