Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions internal/migrations/033-order-book-settlement.sql
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,15 @@ CREATE OR REPLACE ACTION distribute_fees(

$actual_dp_fees := $infra_share;

-- Ensure DP has a participant record so the fee is tracked in ob_net_impacts
$dp_pid INT;
for $p in SELECT id FROM ob_participants WHERE wallet_address = $dp_addr { $dp_pid := $p.id; }
if $dp_pid IS NULL {
INSERT INTO ob_participants (id, wallet_address)
SELECT COALESCE(MAX(id), 0) + 1, $dp_addr
FROM ob_participants;
for $p in SELECT id FROM ob_participants WHERE wallet_address = $dp_addr { $dp_pid := $p.id; }
}
if $dp_pid IS NOT NULL {
$next_id_dp INT;
for $row in SELECT COALESCE(MAX(id), 0::INT) + 1 as val FROM ob_net_impacts { $next_id_dp := $row.val; }
Expand All @@ -113,9 +120,16 @@ CREATE OR REPLACE ACTION distribute_fees(

$actual_validator_fees := $infra_share;

-- Ensure Validator has a participant record so the fee is tracked in ob_net_impacts
$val_pid INT;
$leader_bytes BYTEA := tn_utils.get_leader_bytes();
for $p in SELECT id FROM ob_participants WHERE wallet_address = $leader_bytes { $val_pid := $p.id; }
if $val_pid IS NULL AND $leader_bytes IS NOT NULL {
INSERT INTO ob_participants (id, wallet_address)
SELECT COALESCE(MAX(id), 0) + 1, $leader_bytes
FROM ob_participants;
for $p in SELECT id FROM ob_participants WHERE wallet_address = $leader_bytes { $val_pid := $p.id; }
}
if $val_pid IS NOT NULL {
$next_id_val INT;
for $row in SELECT COALESCE(MAX(id), 0::INT) + 1 as val FROM ob_net_impacts { $next_id_val := $row.val; }
Expand Down
251 changes: 251 additions & 0 deletions tests/streams/order_book/fee_distribution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/stretchr/testify/require"
"github.com/trufnetwork/kwil-db/common"
"github.com/trufnetwork/kwil-db/core/crypto"
coreauth "github.com/trufnetwork/kwil-db/core/crypto/auth"
kwilTypes "github.com/trufnetwork/kwil-db/core/types"
erc20bridge "github.com/trufnetwork/kwil-db/node/exts/erc20-bridge/erc20"
Expand All @@ -36,6 +37,8 @@ func TestFeeDistribution(t *testing.T) {
testDistributionZeroFees(t),
// Single LP scenario: 1 LP gets 100% of fees
testDistribution1LP(t),
// DP/Validator auto-participant creation: non-trading DP and Validator get fees
testDistributionDPValidatorAutoParticipant(t),
},
}, testutils.GetTestOptionsWithCache())
}
Expand Down Expand Up @@ -859,3 +862,251 @@ func testDistribution1LP(t *testing.T) func(context.Context, *kwilTesting.Platfo
return nil
}
}

// testDistributionDPValidatorAutoParticipant tests that a DP and Validator who never placed
// orders get auto-created as ob_participants and receive their 12.5% fee shares.
// Scenario: DP is a separate address that never trades. LP users place orders.
// After distribute_fees, DP should have a participant entry and received USDC.
func testDistributionDPValidatorAutoParticipant(t *testing.T) func(context.Context, *kwilTesting.Platform) error {
return func(ctx context.Context, platform *kwilTesting.Platform) error {
// Reset balance point tracker
lastBalancePoint = nil
lastTrufBalancePoint = nil

// Initialize ERC20 extension
err := erc20bridge.ForTestingInitializeExtension(ctx, platform)
require.NoError(t, err)

// DP address: does NOT place any orders (no ob_participants entry before distribution)
dpAddress := util.Unsafe_NewEthereumAddressFromString("0x5555555555555555555555555555555555555555")

// LP users: these place orders and will have ob_participants entries
user1 := util.Unsafe_NewEthereumAddressFromString("0x6666666666666666666666666666666666666666")
user2 := util.Unsafe_NewEthereumAddressFromString("0x7777777777777777777777777777777777777777")

// Give LP users balance (DP doesn't need balance — they receive fees, not place orders)
err = giveBalanceChained(ctx, platform, user1.Address(), "1000000000000000000000")
require.NoError(t, err)
err = giveBalanceChained(ctx, platform, user2.Address(), "1000000000000000000000")
require.NoError(t, err)

// Create market with dpAddress as the Data Provider (NOT user1)
queryComponents, err := encodeQueryComponentsForTests(dpAddress.Address(), "sttest00000000000000000000000070", "get_record", []byte{0x01})
require.NoError(t, err)
settleTime := time.Now().Add(24 * time.Hour).Unix()

var marketID int64
err = callCreateMarket(ctx, platform, &user1, queryComponents, settleTime, 5, 20, func(row *common.Row) error {
marketID = row.Values[0].(int64)
return nil
})
require.NoError(t, err)
t.Logf("Created market ID: %d (DP=%s)", marketID, dpAddress.Address()[:10])

// Verify DP does NOT have a participant entry yet
var dpParticipantCount int
err = platform.Engine.Execute(
&common.EngineContext{TxContext: &common.TxContext{Ctx: ctx, BlockContext: &common.BlockContext{Height: 1}, TxID: platform.Txid()}},
platform.DB,
"SELECT COUNT(*) as cnt FROM ob_participants WHERE wallet_address = decode(substring($wallet, 3, 40), 'hex')",
map[string]any{"$wallet": dpAddress.Address()},
func(row *common.Row) error {
dpParticipantCount = int(row.Values[0].(int64))
return nil
},
)
require.NoError(t, err)
require.Equal(t, 0, dpParticipantCount, "DP should NOT have a participant entry before distribution")

// Setup LP scenario: user1 and user2 place LP pair orders
setupLPScenario(t, ctx, platform, &user1, &user2, int(marketID))

// Sample LP rewards
err = triggerBatchSampling(ctx, platform, 1000)
require.NoError(t, err)

// Get DP USDC balance before distribution
dpBalanceBefore, err := getUSDCBalance(ctx, platform, dpAddress.Address())
require.NoError(t, err)
t.Logf("DP USDC balance before: %s", dpBalanceBefore.String())

// Distribute 10 TRUF fees using a known proposer key (Validator)
totalFees := new(big.Int).Mul(big.NewInt(10), big.NewInt(1e18))

// Fund vault
err = giveUSDCBalanceChained(ctx, platform, testUSDCEscrow, totalFees.String())
require.NoError(t, err)
_, err = erc20bridge.ForTestingForceSyncInstance(ctx, platform, testChain, testEscrow, testERC20, 18)
require.NoError(t, err)

totalFeesDecimal, err := kwilTypes.ParseDecimalExplicit(totalFees.String(), 78, 0)
require.NoError(t, err)

// Generate a known proposer key — this becomes the Validator address
pub := NewTestProposerPub(t)
validatorAddr := fmt.Sprintf("0x%x", crypto.EthereumAddressFromPubKey(pub))
t.Logf("Validator address: %s", validatorAddr)

// Call distribute_fees with user1 as caller but dpAddress as DP
tx := &common.TxContext{
Ctx: ctx,
BlockContext: &common.BlockContext{
Height: 1,
Timestamp: time.Now().Unix(),
Proposer: pub,
},
Signer: user1.Bytes(),
Caller: user1.Address(),
TxID: platform.Txid(),
Authenticator: coreauth.EthPersonalSignAuth,
}
engineCtx := &common.EngineContext{TxContext: tx, OverrideAuthz: true}

res, err := platform.Engine.Call(
engineCtx,
platform.DB,
"",
"distribute_fees",
[]any{int(marketID), totalFeesDecimal, true},
nil,
)
require.NoError(t, err)
if res.Error != nil {
t.Fatalf("distribute_fees error: %v", res.Error)
}

// =====================================================================
// Verify 1: DP was auto-created as participant
// =====================================================================
err = platform.Engine.Execute(
&common.EngineContext{TxContext: &common.TxContext{Ctx: ctx, BlockContext: &common.BlockContext{Height: 1}, TxID: platform.Txid()}},
platform.DB,
"SELECT COUNT(*) as cnt FROM ob_participants WHERE wallet_address = decode(substring($wallet, 3, 40), 'hex')",
map[string]any{"$wallet": dpAddress.Address()},
func(row *common.Row) error {
dpParticipantCount = int(row.Values[0].(int64))
return nil
},
)
require.NoError(t, err)
require.Equal(t, 1, dpParticipantCount, "DP should be auto-created as participant after distribution")
t.Logf("DP auto-created in ob_participants")

// =====================================================================
// Verify 2: Validator was auto-created as participant
// =====================================================================
var valParticipantCount int
err = platform.Engine.Execute(
&common.EngineContext{TxContext: &common.TxContext{Ctx: ctx, BlockContext: &common.BlockContext{Height: 1}, TxID: platform.Txid()}},
platform.DB,
"SELECT COUNT(*) as cnt FROM ob_participants WHERE wallet_address = decode(substring($wallet, 3, 40), 'hex')",
map[string]any{"$wallet": validatorAddr},
func(row *common.Row) error {
valParticipantCount = int(row.Values[0].(int64))
return nil
},
)
require.NoError(t, err)
require.Equal(t, 1, valParticipantCount, "Validator should be auto-created as participant after distribution")
t.Logf("Validator auto-created in ob_participants")

// =====================================================================
// Verify 3: DP received 12.5% fee share via USDC balance increase
// =====================================================================
dpBalanceAfter, err := getUSDCBalance(ctx, platform, dpAddress.Address())
require.NoError(t, err)

dpIncrease := new(big.Int).Sub(dpBalanceAfter, dpBalanceBefore)
expectedInfraShare := new(big.Int).Div(new(big.Int).Mul(totalFees, big.NewInt(125)), big.NewInt(1000))

require.Equal(t, expectedInfraShare.String(), dpIncrease.String(),
"DP should receive exactly 12.5%% of total fees")
t.Logf("DP received %s wei (12.5%% of %s)", dpIncrease.String(), totalFees.String())

// =====================================================================
// Verify 4: DP and Validator have ob_net_impacts entries
// =====================================================================
var dpPID int
err = platform.Engine.Execute(
&common.EngineContext{TxContext: &common.TxContext{Ctx: ctx, BlockContext: &common.BlockContext{Height: 1}, TxID: platform.Txid()}},
platform.DB,
"SELECT id FROM ob_participants WHERE wallet_address = decode(substring($wallet, 3, 40), 'hex')",
map[string]any{"$wallet": dpAddress.Address()},
func(row *common.Row) error {
dpPID = int(row.Values[0].(int64))
return nil
},
)
require.NoError(t, err)

var dpImpactCount int
err = platform.Engine.Execute(
&common.EngineContext{TxContext: &common.TxContext{Ctx: ctx, BlockContext: &common.BlockContext{Height: 1}, TxID: platform.Txid()}},
platform.DB,
"SELECT COUNT(*) as cnt FROM ob_net_impacts WHERE query_id = $query_id AND participant_id = $pid",
map[string]any{"$query_id": int(marketID), "$pid": dpPID},
func(row *common.Row) error {
dpImpactCount = int(row.Values[0].(int64))
return nil
},
)
require.NoError(t, err)
require.Equal(t, 1, dpImpactCount, "DP should have 1 net impact entry (fee reward)")

var valPID int
err = platform.Engine.Execute(
&common.EngineContext{TxContext: &common.TxContext{Ctx: ctx, BlockContext: &common.BlockContext{Height: 1}, TxID: platform.Txid()}},
platform.DB,
"SELECT id FROM ob_participants WHERE wallet_address = decode(substring($wallet, 3, 40), 'hex')",
map[string]any{"$wallet": validatorAddr},
func(row *common.Row) error {
valPID = int(row.Values[0].(int64))
return nil
},
)
require.NoError(t, err)

var valImpactCount int
err = platform.Engine.Execute(
&common.EngineContext{TxContext: &common.TxContext{Ctx: ctx, BlockContext: &common.BlockContext{Height: 1}, TxID: platform.Txid()}},
platform.DB,
"SELECT COUNT(*) as cnt FROM ob_net_impacts WHERE query_id = $query_id AND participant_id = $pid",
map[string]any{"$query_id": int(marketID), "$pid": valPID},
func(row *common.Row) error {
valImpactCount = int(row.Values[0].(int64))
return nil
},
)
require.NoError(t, err)
require.Equal(t, 1, valImpactCount, "Validator should have 1 net impact entry (fee reward)")

t.Logf("DP (pid=%d) and Validator (pid=%d) both have ob_net_impacts entries", dpPID, valPID)

// =====================================================================
// Verify 5: Audit summary shows DP and Validator fees
// =====================================================================
var totalDPFeesStr string
var totalValFeesStr string

_, err = platform.Engine.Call(
&common.EngineContext{TxContext: &common.TxContext{
Ctx: ctx, BlockContext: &common.BlockContext{Height: 1, Timestamp: time.Now().Unix()},
Signer: user1.Bytes(), Caller: user1.Address(), TxID: platform.Txid(),
Authenticator: coreauth.EthPersonalSignAuth,
}, OverrideAuthz: true},
platform.DB, "", "get_distribution_summary", []any{int(marketID)},
func(row *common.Row) error {
totalDPFeesStr = row.Values[2].(*kwilTypes.Decimal).String()
totalValFeesStr = row.Values[3].(*kwilTypes.Decimal).String()
return nil
})
require.NoError(t, err)

require.Equal(t, expectedInfraShare.String(), totalDPFeesStr, "Audit should show DP fees")
require.Equal(t, expectedInfraShare.String(), totalValFeesStr, "Audit should show Validator fees")

t.Logf("Audit verified: DP fees=%s, Validator fees=%s", totalDPFeesStr, totalValFeesStr)

return nil
}
}
Loading