diff --git a/extensions/tn_utils/precompiles.go b/extensions/tn_utils/precompiles.go index cf0980d0..70903148 100644 --- a/extensions/tn_utils/precompiles.go +++ b/extensions/tn_utils/precompiles.go @@ -8,6 +8,7 @@ import ( "fmt" "math" "math/big" + "sort" "strings" "github.com/trufnetwork/kwil-db/common" @@ -41,6 +42,8 @@ func buildPrecompile() precompiles.Precompile { getCallerBytesMethod(), getLeaderHexMethod(), getLeaderBytesMethod(), + getValidatorsMethod(), + getValidatorCountMethod(), }, } } @@ -177,6 +180,112 @@ func getLeaderBytesMethod() precompiles.Method { } } +// getValidatorsMethod returns all active validators as a table of (wallet_hex, wallet_bytes, power). +// Results are sorted deterministically by public key bytes for consensus safety. +func getValidatorsMethod() precompiles.Method { + return precompiles.Method{ + Name: "get_validators", + AccessModifiers: []precompiles.Modifier{precompiles.VIEW, precompiles.PUBLIC}, + Parameters: []precompiles.PrecompileValue{}, + Returns: &precompiles.MethodReturn{ + IsTable: true, + Fields: []precompiles.PrecompileValue{ + precompiles.NewPrecompileValue("wallet_hex", types.TextType, false), + precompiles.NewPrecompileValue("wallet_bytes", types.ByteaType, false), + precompiles.NewPrecompileValue("power", types.IntType, false), + }, + }, + Handler: func(ctx *common.EngineContext, app *common.App, inputs []any, resultFn func([]any) error) error { + if app == nil || app.Validators == nil { + return nil // No validators available, return empty table + } + + validators := app.Validators.GetValidators() + if len(validators) == 0 { + return nil + } + + // Defensive shallow copy to avoid mutating the caller's slice + validatorsCopy := make([]*types.Validator, len(validators)) + copy(validatorsCopy, validators) + + // Sort deterministically by pubkey bytes for consensus safety + sort.Slice(validatorsCopy, func(i, j int) bool { + return bytes.Compare(validatorsCopy[i].Identifier, validatorsCopy[j].Identifier) < 0 + }) + + validators = validatorsCopy + + for _, v := range validators { + if v.Power <= 0 { + continue + } + + var walletHex string + var walletBytes []byte + + if v.KeyType == crypto.KeyTypeSecp256k1 { + secp, err := crypto.UnmarshalSecp256k1PublicKey(v.Identifier) + if err != nil { + continue // Skip validators with invalid keys + } + addr := crypto.EthereumAddressFromPubKey(secp) + walletHex = "0x" + hex.EncodeToString(addr) + walletBytes = addr + } else { + // Non-secp256k1 validators: use raw pubkey + walletHex = "0x" + hex.EncodeToString(v.Identifier) + walletBytes = v.Identifier + } + + if err := resultFn([]any{walletHex, walletBytes, v.Power}); err != nil { + return err + } + } + + return nil + }, + } +} + +// getValidatorCountMethod returns the number of active validators (power > 0) as a scalar INT. +// This avoids iterating get_validators() just to count, enabling single-pass distribution loops. +func getValidatorCountMethod() precompiles.Method { + return precompiles.Method{ + Name: "get_validator_count", + AccessModifiers: []precompiles.Modifier{precompiles.VIEW, precompiles.PUBLIC}, + Parameters: []precompiles.PrecompileValue{}, + Returns: &precompiles.MethodReturn{ + IsTable: false, + Fields: []precompiles.PrecompileValue{ + precompiles.NewPrecompileValue("count", types.IntType, false), + }, + }, + Handler: func(ctx *common.EngineContext, app *common.App, inputs []any, resultFn func([]any) error) error { + if app == nil || app.Validators == nil { + return resultFn([]any{int64(0)}) + } + + validators := app.Validators.GetValidators() + var count int64 + for _, v := range validators { + if v.Power <= 0 { + continue + } + // Mirror get_validators() filtering: skip malformed secp256k1 keys + if v.KeyType == crypto.KeyTypeSecp256k1 { + if _, err := crypto.UnmarshalSecp256k1PublicKey(v.Identifier); err != nil { + continue + } + } + count++ + } + + return resultFn([]any{count}) + }, + } +} + // unpackQueryComponentsMethod extracts (dataProvider, streamID, actionID, args) from ABI-encoded bytes. func unpackQueryComponentsMethod() precompiles.Method { return precompiles.Method{ diff --git a/go.mod b/go.mod index d526d0fd..3c937cd3 100644 --- a/go.mod +++ b/go.mod @@ -17,8 +17,8 @@ require ( github.com/spf13/cobra v1.9.1 github.com/stretchr/testify v1.11.1 github.com/testcontainers/testcontainers-go v0.37.0 - github.com/trufnetwork/kwil-db v0.10.3-0.20260303100144-0119418a1a7c - github.com/trufnetwork/kwil-db/core v0.4.3-0.20260303100144-0119418a1a7c + github.com/trufnetwork/kwil-db v0.10.3-0.20260311113716-5415d420dcc6 + github.com/trufnetwork/kwil-db/core v0.4.3-0.20260311113716-5415d420dcc6 github.com/trufnetwork/sdk-go v0.6.4-0.20260224122406-a741343e2f37 go.uber.org/zap v1.27.0 golang.org/x/exp v0.0.0-20250218142911-aa4b98e5adaa diff --git a/go.sum b/go.sum index b082cfa9..eb582471 100644 --- a/go.sum +++ b/go.sum @@ -1244,10 +1244,22 @@ github.com/trufnetwork/kwil-db v0.10.3-0.20260216231327-01b863886682 h1:Gqee9/lN github.com/trufnetwork/kwil-db v0.10.3-0.20260216231327-01b863886682/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo= github.com/trufnetwork/kwil-db v0.10.3-0.20260303100144-0119418a1a7c h1:lvyTdrm1gzLqCmS+sqsg2JZWnoWo0ORKKBL8Z91C/JU= github.com/trufnetwork/kwil-db v0.10.3-0.20260303100144-0119418a1a7c/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo= +github.com/trufnetwork/kwil-db v0.10.3-0.20260311105956-61f1dad27875 h1:dwJejERTLmehMs6lEY7+nnUKfyrEEYIgP5Vlm8FGeQ8= +github.com/trufnetwork/kwil-db v0.10.3-0.20260311105956-61f1dad27875/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo= +github.com/trufnetwork/kwil-db v0.10.3-0.20260311112239-ab4bf677c84b h1:9LnfIK51aIc2Hx+2onVf7gK/ggR95m/1vrsU2FOZp4w= +github.com/trufnetwork/kwil-db v0.10.3-0.20260311112239-ab4bf677c84b/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo= +github.com/trufnetwork/kwil-db v0.10.3-0.20260311113716-5415d420dcc6 h1:WjEGspN5kp6JPpX/p6f6iLCHGyXg9k/UpIlcPLpqfLE= +github.com/trufnetwork/kwil-db v0.10.3-0.20260311113716-5415d420dcc6/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo= github.com/trufnetwork/kwil-db/core v0.4.3-0.20260216231327-01b863886682 h1:iaxXr8D3dU79MBhmS/uCuBhnlc+gbLvCvV6GtAz3ukw= github.com/trufnetwork/kwil-db/core v0.4.3-0.20260216231327-01b863886682/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ= github.com/trufnetwork/kwil-db/core v0.4.3-0.20260303100144-0119418a1a7c h1:O5pyUJqZNNIi/l1vXc9fxycdEU9OxF5z8UQprTn4zZE= github.com/trufnetwork/kwil-db/core v0.4.3-0.20260303100144-0119418a1a7c/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ= +github.com/trufnetwork/kwil-db/core v0.4.3-0.20260311105956-61f1dad27875 h1:KxTKPF7zTedqFRkXO02Gil6r31Ra9I038JR7ZWMViA8= +github.com/trufnetwork/kwil-db/core v0.4.3-0.20260311105956-61f1dad27875/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ= +github.com/trufnetwork/kwil-db/core v0.4.3-0.20260311112239-ab4bf677c84b h1:vNwvWLDjh4s8qvmS1UKISZee67YS9a2vncPC7W555dI= +github.com/trufnetwork/kwil-db/core v0.4.3-0.20260311112239-ab4bf677c84b/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ= +github.com/trufnetwork/kwil-db/core v0.4.3-0.20260311113716-5415d420dcc6 h1:jyPGV1M4VVpvf76zaCKaEYrWz8IHDW4ukFHqL2shJx0= +github.com/trufnetwork/kwil-db/core v0.4.3-0.20260311113716-5415d420dcc6/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ= github.com/trufnetwork/openzeppelin-merkle-tree-go v0.0.2 h1:DCq8MzbWH0wZmICNmMVsSzUHUPl+2vqRhluEABjxl88= github.com/trufnetwork/openzeppelin-merkle-tree-go v0.0.2/go.mod h1:Y0MJpPp9QXU5vC6Gpoilql2NkgmGNcbHm9HYC2v2N8s= github.com/trufnetwork/sdk-go v0.6.4-0.20260224122406-a741343e2f37 h1:VD/GWxLTshaXpLukEc1SXbG7QA9HrFzF8JvxJAJ/x7Q= diff --git a/internal/migrations/033-order-book-settlement.sql b/internal/migrations/033-order-book-settlement.sql index c28aa8a1..64e5ed56 100644 --- a/internal/migrations/033-order-book-settlement.sql +++ b/internal/migrations/033-order-book-settlement.sql @@ -81,6 +81,23 @@ CREATE OR REPLACE ACTION distribute_fees( $infra_share NUMERIC(78, 0) := ($total_fees * 125::NUMERIC(78, 0)) / 1000::NUMERIC(78, 0); $lp_share := $lp_share + ($total_fees - $lp_share - (2::NUMERIC(78, 0) * $infra_share)); + -- Pre-compute block_count and create parent distribution record early + -- so DP and validator detail rows can reference it via FK + $block_count INT := 0; + for $row in SELECT COUNT(DISTINCT block) as cnt FROM ob_rewards WHERE query_id = $query_id { $block_count := $row.cnt; } + + $lp_count INT := 0; + $actual_fees_distributed NUMERIC(78, 0) := '0'::NUMERIC(78, 0); + $distribution_id INT; + for $row in SELECT COALESCE(MAX(id), 0) + 1 as val FROM ob_fee_distributions { $distribution_id := $row.val; } + + -- PRE-INSERT PARENT RECORD with placeholders (updated at end with final values) + INSERT INTO ob_fee_distributions ( + id, query_id, total_fees_distributed, total_dp_fees, total_validator_fees, total_lp_count, block_count, distributed_at + ) VALUES ( + $distribution_id, $query_id, '0'::NUMERIC(78, 0), '0'::NUMERIC(78, 0), '0'::NUMERIC(78, 0), 0, $block_count, @block_timestamp + ); + -- Payout Data Provider $dp_addr BYTEA; for $row in tn_utils.unpack_query_components($query_components) { $dp_addr := $row.data_provider; } @@ -107,61 +124,85 @@ CREATE OR REPLACE ACTION distribute_fees( $next_id_dp INT; for $row in SELECT COALESCE(MAX(id), 0::INT) + 1 as val FROM ob_net_impacts { $next_id_dp := $row.val; } ob_record_net_impact($next_id_dp, $query_id, $dp_pid, $winning_outcome, 0::INT8, $infra_share, FALSE); + + -- Record DP in distribution details so indexer picks it up + INSERT INTO ob_fee_distribution_details (distribution_id, participant_id, wallet_address, reward_amount, total_reward_percent) + VALUES ($distribution_id, $dp_pid, $dp_addr, $infra_share, 12.50::NUMERIC(10, 2)); } } - -- Payout Validator (Leader) + -- Payout Validators (split evenly among all active validators) $actual_validator_fees NUMERIC(78, 0) := '0'::NUMERIC(78, 0); - $val_wallet TEXT := tn_utils.get_leader_hex(); - if $val_wallet != '' AND $infra_share > '0'::NUMERIC(78, 0) { - if $bridge = 'hoodi_tt2' { hoodi_tt2.unlock($val_wallet, $infra_share); } - else if $bridge = 'sepolia_bridge' { sepolia_bridge.unlock($val_wallet, $infra_share); } - else if $bridge = 'ethereum_bridge' { ethereum_bridge.unlock($val_wallet, $infra_share); } - - $actual_validator_fees := $infra_share; - - -- Ensure Validator has a participant record so the fee is tracked in ob_net_impacts - $val_pid INT; - $leader_bytes BYTEA := tn_utils.get_leader_bytes(); - for $p in SELECT id FROM ob_participants WHERE wallet_address = $leader_bytes { $val_pid := $p.id; } - if $val_pid IS NULL AND $leader_bytes IS NOT NULL { - INSERT INTO ob_participants (id, wallet_address) - SELECT COALESCE(MAX(id), 0) + 1, $leader_bytes - FROM ob_participants; - for $p in SELECT id FROM ob_participants WHERE wallet_address = $leader_bytes { $val_pid := $p.id; } - } - if $val_pid IS NOT NULL { - $next_id_val INT; - for $row in SELECT COALESCE(MAX(id), 0::INT) + 1 as val FROM ob_net_impacts { $next_id_val := $row.val; } - ob_record_net_impact($next_id_val, $query_id, $val_pid, $winning_outcome, 0::INT8, $infra_share, FALSE); + + if $infra_share > '0'::NUMERIC(78, 0) { + $validator_count INT := tn_utils.get_validator_count(); + + if $validator_count > 0 { + $per_validator NUMERIC(78, 0) := $infra_share / $validator_count::NUMERIC(78, 0); + $val_remainder NUMERIC(78, 0) := $infra_share - ($per_validator * $validator_count::NUMERIC(78, 0)); + $first_validator BOOL := TRUE; + $val_pct NUMERIC(10, 2) := 12.50::NUMERIC(10, 2) / $validator_count::NUMERIC(10, 2); + $val_pct_remainder NUMERIC(10, 2) := 12.50::NUMERIC(10, 2) - ($val_pct * $validator_count::NUMERIC(10, 2)); + + for $v in tn_utils.get_validators() { + -- Extract row fields to local variables (Kuneiform SQL generator limitation) + $v_wallet_hex TEXT := $v.wallet_hex; + $v_wallet_bytes BYTEA := $v.wallet_bytes; + $v_payout NUMERIC(78, 0) := $per_validator; + $v_pct NUMERIC(10, 2) := $val_pct; + + -- Give remainder to first validator (deterministic: sorted by pubkey) + if $first_validator { + $v_payout := $v_payout + $val_remainder; + $v_pct := $v_pct + $val_pct_remainder; + $first_validator := FALSE; + } + + -- Skip validators with zero payout (e.g., infra_share < validator_count) + if $v_payout > '0'::NUMERIC(78, 0) { + -- Unlock funds via bridge + if $bridge = 'hoodi_tt2' { hoodi_tt2.unlock($v_wallet_hex, $v_payout); } + else if $bridge = 'sepolia_bridge' { sepolia_bridge.unlock($v_wallet_hex, $v_payout); } + else if $bridge = 'ethereum_bridge' { ethereum_bridge.unlock($v_wallet_hex, $v_payout); } + + -- Ensure validator has a participant record + $v_pid INT; + for $p in SELECT id FROM ob_participants WHERE wallet_address = $v_wallet_bytes { $v_pid := $p.id; } + if $v_pid IS NULL { + INSERT INTO ob_participants (id, wallet_address) + SELECT COALESCE(MAX(id), 0) + 1, $v_wallet_bytes + FROM ob_participants; + for $p in SELECT id FROM ob_participants WHERE wallet_address = $v_wallet_bytes { $v_pid := $p.id; } + } + if $v_pid IS NOT NULL { + $next_id_val INT; + for $row in SELECT COALESCE(MAX(id), 0::INT) + 1 as val FROM ob_net_impacts { $next_id_val := $row.val; } + ob_record_net_impact($next_id_val, $query_id, $v_pid, $winning_outcome, 0::INT8, $v_payout, FALSE); + + -- Record validator in distribution details so indexer picks it up + INSERT INTO ob_fee_distribution_details (distribution_id, participant_id, wallet_address, reward_amount, total_reward_percent) + VALUES ($distribution_id, $v_pid, $v_wallet_bytes, $v_payout, $v_pct) + ON CONFLICT (distribution_id, participant_id) DO UPDATE + SET reward_amount = ob_fee_distribution_details.reward_amount + $v_payout, + total_reward_percent = ob_fee_distribution_details.total_reward_percent + $v_pct; + } + + $actual_validator_fees := $actual_validator_fees + $v_payout; + } + } } } - -- LP Distribution Pre-calculation + -- LP Distribution -- NOTE: sample_lp_rewards() is called in process_settlement() BEFORE ob_positions are deleted, -- so the final sample reads the live order book. Do NOT call it here. - $block_count INT := 0; - for $row in SELECT COUNT(DISTINCT block) as cnt FROM ob_rewards WHERE query_id = $query_id { $block_count := $row.cnt; } - - $lp_count INT := 0; - $actual_fees_distributed NUMERIC(78, 0) := '0'::NUMERIC(78, 0); - $distribution_id INT; - for $row in SELECT COALESCE(MAX(id), 0) + 1 as val FROM ob_fee_distributions { $distribution_id := $row.val; } - - -- PRE-INSERT PARENT RECORD to satisfy FK for details - INSERT INTO ob_fee_distributions ( - id, query_id, total_fees_distributed, total_dp_fees, total_validator_fees, total_lp_count, block_count, distributed_at - ) VALUES ( - $distribution_id, $query_id, '0'::NUMERIC(78, 0), $actual_dp_fees, $actual_validator_fees, 0, $block_count, @block_timestamp - ); - if $block_count > 0 { for $row in SELECT MIN(participant_id) as val FROM ob_rewards WHERE query_id = $query_id { $min_pid := $row.val; } -- Calculate remainder $total_calc NUMERIC(78, 0) := '0'::NUMERIC(78, 0); - for $res in + for $res in SELECT participant_id, SUM(reward_percent) as reward_percent FROM ob_rewards WHERE query_id = $query_id GROUP BY participant_id { @@ -187,32 +228,38 @@ CREATE OR REPLACE ACTION distribute_fees( $wallet_hexes := array_append($wallet_hexes, $res.wallet_hex); $amounts := array_append($amounts, $reward_final); $outcomes := array_append($outcomes, $winning_outcome); - - -- Record audit detail (Parent exists now) + + -- Record LP audit detail (skip if percent rounds to 0 at NUMERIC(10,2) precision) $curr_pid INT := $res.participant_id; $curr_wallet BYTEA := $res.wallet_address; $curr_reward NUMERIC(78,0) := $reward_final; $curr_pct NUMERIC(10,2) := $res.reward_percent::NUMERIC(10, 2); - INSERT INTO ob_fee_distribution_details (distribution_id, participant_id, wallet_address, reward_amount, total_reward_percent) - VALUES ($distribution_id, $curr_pid, $curr_wallet, $curr_reward, $curr_pct); + if $curr_pct > 0.00::NUMERIC(10, 2) { + INSERT INTO ob_fee_distribution_details (distribution_id, participant_id, wallet_address, reward_amount, total_reward_percent) + VALUES ($distribution_id, $curr_pid, $curr_wallet, $curr_reward, $curr_pct) + ON CONFLICT (distribution_id, participant_id) DO UPDATE + SET reward_amount = ob_fee_distribution_details.reward_amount + $curr_reward, + total_reward_percent = ob_fee_distribution_details.total_reward_percent + $curr_pct; + } } } if COALESCE(array_length($pids), 0) > 0 { $lp_count := array_length($pids); $actual_fees_distributed := $lp_share; - - -- UPDATE SUMMARY with final values - UPDATE ob_fee_distributions - SET total_fees_distributed = $actual_fees_distributed, - total_lp_count = $lp_count - WHERE id = $distribution_id; - ob_batch_unlock_collateral($query_id, $bridge, $pids, $wallet_hexes, $amounts, $outcomes); } } + -- UPDATE SUMMARY with final values + UPDATE ob_fee_distributions + SET total_fees_distributed = $actual_fees_distributed, + total_dp_fees = $actual_dp_fees, + total_validator_fees = $actual_validator_fees, + total_lp_count = $lp_count + WHERE id = $distribution_id; + if $lp_count > 0 { DELETE FROM ob_rewards WHERE query_id = $query_id; } }; diff --git a/tests/streams/order_book/fee_distribution_audit_test.go b/tests/streams/order_book/fee_distribution_audit_test.go index 26b4493a..d8a6a5bc 100644 --- a/tests/streams/order_book/fee_distribution_audit_test.go +++ b/tests/streams/order_book/fee_distribution_audit_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/require" "github.com/trufnetwork/kwil-db/common" + "github.com/trufnetwork/kwil-db/core/crypto" coreauth "github.com/trufnetwork/kwil-db/core/crypto/auth" kwilTypes "github.com/trufnetwork/kwil-db/core/types" erc20bridge "github.com/trufnetwork/kwil-db/node/exts/erc20-bridge/erc20" @@ -81,9 +82,12 @@ func testAuditRecordCreation(t *testing.T) func(context.Context, *kwilTesting.Pl err = triggerBatchSampling(ctx, platform, 1000) require.NoError(t, err) + // Inject a validator so distribute_fees can pay validator share + valPub, _ := injectTestValidator(t, platform) + // Fund vault and call distribute_fees totalFees := new(big.Int).Mul(big.NewInt(10), big.NewInt(1e18)) // 10 TRUF - err = fundVaultAndDistributeFees(t, ctx, platform, &user1, int(marketID), totalFees, true) + err = fundVaultAndDistributeFees(t, ctx, platform, &user1, int(marketID), totalFees, true, valPub) require.NoError(t, err) // Verify audit summary record exists @@ -164,20 +168,20 @@ func testAuditRecordCreation(t *testing.T) func(context.Context, *kwilTesting.Pl return nil }) require.NoError(t, err) - require.Len(t, detailRows, 2, "Should have 2 LP detail records") + // 3 detail records: 2 LPs + 1 validator (user1's DP fee merged into LP row via ON CONFLICT) + require.Len(t, detailRows, 3, "Should have 3 detail records (2 LPs + 1 validator, DP merged with LP)") - // Verify zero-loss: SUM(reward_amount) = total_lp_fees_distributed - totalLPFeesFromAudit, _ := new(big.Int).SetString(totalLPFeesStr, 10) + // Verify zero-loss: SUM(reward_amount) = total_fees (LP + DP + validator) var totalDistributed big.Int for _, detail := range detailRows { amt, _ := new(big.Int).SetString(detail.rewardAmount, 10) totalDistributed.Add(&totalDistributed, amt) } - require.Equal(t, totalLPFeesFromAudit.String(), totalDistributed.String(), - "Zero-loss audit: SUM(reward_amount) should equal total_lp_fees_distributed") + require.Equal(t, totalFees.String(), totalDistributed.String(), + "Zero-loss audit: SUM(detail reward_amount) should equal total_fees") - t.Logf("✅ Audit record creation verified: %s wei distributed across %d LPs", totalLPFeesFromAudit.String(), lpCount) + t.Logf("✅ Audit record creation verified: %s wei distributed across %d detail records", totalDistributed.String(), len(detailRows)) return nil } @@ -221,9 +225,10 @@ func testAuditMultiBlock(t *testing.T) func(context.Context, *kwilTesting.Platfo err = triggerBatchSampling(ctx, platform, 3000) require.NoError(t, err) - // Distribute fees + // Inject validator and distribute fees + valPub, _ := injectTestValidator(t, platform) totalFees := new(big.Int).Mul(big.NewInt(30), big.NewInt(1e18)) - err = fundVaultAndDistributeFees(t, ctx, platform, &user1, int(marketID), totalFees, true) + err = fundVaultAndDistributeFees(t, ctx, platform, &user1, int(marketID), totalFees, true, valPub) require.NoError(t, err) // Verify audit summary @@ -287,8 +292,9 @@ func testAuditNoLPs(t *testing.T) func(context.Context, *kwilTesting.Platform) e require.NoError(t, err) // Don't sample LP rewards (no LP samples) + valPub, _ := injectTestValidator(t, platform) totalFees := new(big.Int).Mul(big.NewInt(10), big.NewInt(1e18)) - err = fundVaultAndDistributeFees(t, ctx, platform, &user1, int(marketID), totalFees, true) + err = fundVaultAndDistributeFees(t, ctx, platform, &user1, int(marketID), totalFees, true, valPub) require.NoError(t, err) // Verify NO audit summary record @@ -355,8 +361,9 @@ func testAuditZeroFees(t *testing.T) func(context.Context, *kwilTesting.Platform require.NoError(t, err) // Call distribute_fees with $0 fees (early return) + valPub, _ := injectTestValidator(t, platform) zeroFees := big.NewInt(0) - err = fundVaultAndDistributeFees(t, ctx, platform, &user1, int(marketID), zeroFees, true) + err = fundVaultAndDistributeFees(t, ctx, platform, &user1, int(marketID), zeroFees, true, valPub) require.NoError(t, err) // Verify NO audit record (zero fees early return) @@ -431,9 +438,10 @@ func testAuditDataIntegrity(t *testing.T) func(context.Context, *kwilTesting.Pla err = triggerBatchSampling(ctx, platform, 1000) require.NoError(t, err) - // Distribute + // Inject validator and distribute + valPub, _ := injectTestValidator(t, platform) totalFees := new(big.Int).Mul(big.NewInt(10), big.NewInt(1e18)) - err = fundVaultAndDistributeFees(t, ctx, platform, &user1, int(marketID), totalFees, true) + err = fundVaultAndDistributeFees(t, ctx, platform, &user1, int(marketID), totalFees, true, valPub) require.NoError(t, err) // Get final USDC balances @@ -494,7 +502,8 @@ func testAuditDataIntegrity(t *testing.T) func(context.Context, *kwilTesting.Pla return nil }) require.NoError(t, err) - require.Equal(t, 2, detailRowCount, "Should have 2 LP detail records") + // 3 detail records: 2 LPs + 1 validator (user1's DP fee merged into LP row via ON CONFLICT) + require.Equal(t, 3, detailRowCount, "Should have 3 detail records (2 LPs + 1 validator, DP merged with LP)") // Verify audit rewards match actual balance increases // Note: user.Address() already includes "0x" prefix @@ -507,39 +516,29 @@ func testAuditDataIntegrity(t *testing.T) func(context.Context, *kwilTesting.Pla require.NotNil(t, auditReward1, "User1 should have audit reward") require.NotNil(t, auditReward2, "User2 should have audit reward") - // Expected balance increases: - // User1 = auditReward1 (LP) + totalDPFees (DP) [+ totalValFees (if Leader)] - // User2 = auditReward2 (LP) - infraShare, _ := new(big.Int).SetString(totalDPFeesStr, 10) - expectedIncrease1 := new(big.Int).Add(auditReward1, infraShare) - - if increase1.Cmp(expectedIncrease1) != 0 { - // Try adding validator share if User1 is leader - infraShareVal, _ := new(big.Int).SetString(totalValFeesStr, 10) - expectedIncrease1WithVal := new(big.Int).Add(expectedIncrease1, infraShareVal) - if increase1.Cmp(expectedIncrease1WithVal) == 0 { - t.Logf("User1 appears to be the leader, adding Validator share to expectation") - expectedIncrease1 = expectedIncrease1WithVal - } - } - - require.Equal(t, expectedIncrease1.String(), increase1.String(), "User1 balance increase should match LP + DP (+ Leader) audit rewards") + // User1's audit detail includes DP fee merged with LP reward (via ON CONFLICT), + // so the balance increase should match the audit reward directly. + // Note: user1 is never the validator (injectTestValidator generates a fresh keypair). + require.Equal(t, auditReward1.String(), increase1.String(), "User1 balance increase should match audit reward (LP + DP merged)") require.Equal(t, increase2.String(), auditReward2.String(), "User2 balance increase should match LP audit reward") - // Verify zero-loss in audit - totalLPFeesFromAudit, _ := new(big.Int).SetString(totalLPFeesStr, 10) - auditLPSum := new(big.Int).Add(auditReward1, auditReward2) - require.Equal(t, totalLPFeesFromAudit.String(), auditLPSum.String(), "Audit LP rewards should sum to total LP fees") + // Verify zero-loss: SUM(all detail amounts) = total_fees + var totalDetailSum big.Int + for _, reward := range auditRewards { + totalDetailSum.Add(&totalDetailSum, reward) + } + require.Equal(t, totalFees.String(), totalDetailSum.String(), "SUM(detail amounts) should equal total_fees") totalDPFees, _ := new(big.Int).SetString(totalDPFeesStr, 10) totalValFees, _ := new(big.Int).SetString(totalValFeesStr, 10) - totalFeesFromAudit := new(big.Int).Add(totalLPFeesFromAudit, new(big.Int).Add(totalDPFees, totalValFees)) - require.Equal(t, totalFees.String(), totalFeesFromAudit.String(), "Total fees from audit should match input totalFees") + totalLPFeesFromAudit, _ := new(big.Int).SetString(totalLPFeesStr, 10) + totalFeesFromSummary := new(big.Int).Add(totalLPFeesFromAudit, new(big.Int).Add(totalDPFees, totalValFees)) + require.Equal(t, totalFees.String(), totalFeesFromSummary.String(), "Summary totals should match input totalFees") t.Logf("✅ Audit data integrity verified:") - t.Logf(" - Audit User1 LP reward: %s wei (total increase: %s)", auditReward1.String(), increase1.String()) - t.Logf(" - Audit User2 LP reward: %s wei (total increase: %s)", auditReward2.String(), increase2.String()) - t.Logf(" - Audit total: %s wei (zero-loss verified)", totalFeesFromAudit.String()) + t.Logf(" - Audit User1 reward: %s wei (total increase: %s)", auditReward1.String(), increase1.String()) + t.Logf(" - Audit User2 reward: %s wei (total increase: %s)", auditReward2.String(), increase2.String()) + t.Logf(" - Audit total: %s wei (zero-loss verified)", totalDetailSum.String()) return nil } @@ -592,9 +591,10 @@ func setupLPScenario(t *testing.T, ctx context.Context, platform *kwilTesting.Pl // Final midpoint: best bid=-50, lowest sell=51 → midpoint=50, spread=5 } -// fundVaultAndDistributeFees funds the vault and calls distribute_fees +// fundVaultAndDistributeFees funds the vault and calls distribute_fees. +// If proposer is nil, a random proposer key is generated. func fundVaultAndDistributeFees(t *testing.T, ctx context.Context, platform *kwilTesting.Platform, - user *util.EthereumAddress, marketID int, totalFees *big.Int, winningOutcome bool) error { + user *util.EthereumAddress, marketID int, totalFees *big.Int, winningOutcome bool, proposer ...crypto.PublicKey) error { // Fund vault if fees > 0 (use USDC-only since vault doesn't need TRUF) if totalFees.Sign() > 0 { @@ -615,8 +615,13 @@ func fundVaultAndDistributeFees(t *testing.T, ctx context.Context, platform *kwi return err } - // Generate leader key for fee transfers - pub := NewTestProposerPub(t) + // Use provided proposer or generate one + var pub crypto.PublicKey + if len(proposer) > 0 && proposer[0] != nil { + pub = proposer[0] + } else { + pub = NewTestProposerPub(t) + } // Call distribute_fees tx := &common.TxContext{ diff --git a/tests/streams/order_book/fee_distribution_test.go b/tests/streams/order_book/fee_distribution_test.go index c86dbb00..fc78f8a6 100644 --- a/tests/streams/order_book/fee_distribution_test.go +++ b/tests/streams/order_book/fee_distribution_test.go @@ -3,9 +3,11 @@ package order_book import ( + "bytes" "context" "fmt" "math/big" + "sort" "testing" "time" @@ -39,6 +41,8 @@ func TestFeeDistribution(t *testing.T) { testDistribution1LP(t), // DP/Validator auto-participant creation: non-trading DP and Validator get fees testDistributionDPValidatorAutoParticipant(t), + // Multi-validator: fees split evenly among 3 validators + testDistributionMultipleValidators(t), }, }, testutils.GetTestOptionsWithCache()) } @@ -157,7 +161,7 @@ func testDistribution1Block2LPs(t *testing.T) func(context.Context, *kwilTesting require.NoError(t, err) // Generate leader key for fee transfers - pub := NewTestProposerPub(t) + pub, _ := injectTestValidator(t, platform) tx := &common.TxContext{ Ctx: ctx, @@ -362,7 +366,7 @@ func testDistribution3Blocks2LPs(t *testing.T) func(context.Context, *kwilTestin require.NoError(t, err) // Generate leader key for fee transfers - pub := NewTestProposerPub(t) + pub, _ := injectTestValidator(t, platform) tx := &common.TxContext{ Ctx: ctx, @@ -515,7 +519,7 @@ func testDistributionNoSamples(t *testing.T) func(context.Context, *kwilTesting. require.NoError(t, err) // Generate leader key for fee transfers - pub := NewTestProposerPub(t) + pub, _ := injectTestValidator(t, platform) tx := &common.TxContext{ Ctx: ctx, @@ -657,7 +661,7 @@ func testDistributionZeroFees(t *testing.T) func(context.Context, *kwilTesting.P require.NoError(t, err) // Generate leader key for fee transfers - pub := NewTestProposerPub(t) + pub, _ := injectTestValidator(t, platform) tx := &common.TxContext{ Ctx: ctx, @@ -795,7 +799,7 @@ func testDistribution1LP(t *testing.T) func(context.Context, *kwilTesting.Platfo require.NoError(t, err) // Generate leader key for fee transfers - pub := NewTestProposerPub(t) + pub, _ := injectTestValidator(t, platform) tx := &common.TxContext{ Ctx: ctx, @@ -943,7 +947,7 @@ func testDistributionDPValidatorAutoParticipant(t *testing.T) func(context.Conte require.NoError(t, err) // Generate a known proposer key — this becomes the Validator address - pub := NewTestProposerPub(t) + pub, _ := injectTestValidator(t, platform) validatorAddr := fmt.Sprintf("0x%x", crypto.EthereumAddressFromPubKey(pub)) t.Logf("Validator address: %s", validatorAddr) @@ -1110,3 +1114,202 @@ func testDistributionDPValidatorAutoParticipant(t *testing.T) func(context.Conte return nil } } + +// testDistributionMultipleValidators tests that validator fees are split evenly among +// 3 validators, with remainder going to the first (sorted by pubkey). +func testDistributionMultipleValidators(t *testing.T) func(context.Context, *kwilTesting.Platform) error { + return func(ctx context.Context, platform *kwilTesting.Platform) error { + // Reset balance point tracker + lastBalancePoint = nil + lastTrufBalancePoint = nil + + // Initialize ERC20 extension + err := erc20bridge.ForTestingInitializeExtension(ctx, platform) + require.NoError(t, err) + + // Inject 3 validators (capture all pubkeys to determine sort order) + val1Pub, val1Addr := injectTestValidator(t, platform) + val2Pub, val2Addr := injectTestValidator(t, platform) + val3Pub, val3Addr := injectTestValidator(t, platform) + + t.Logf("Validators: %s, %s, %s", val1Addr[:10], val2Addr[:10], val3Addr[:10]) + + // LP users + user1 := util.Unsafe_NewEthereumAddressFromString("0xAABBCCDDEEFF00112233445566778899AABBCCDD") + user2 := util.Unsafe_NewEthereumAddressFromString("0x112233445566778899AABBCCDDEEFF0011223344") + + err = giveBalanceChained(ctx, platform, user1.Address(), "1000000000000000000000") + require.NoError(t, err) + err = giveBalanceChained(ctx, platform, user2.Address(), "1000000000000000000000") + require.NoError(t, err) + + // Create market with user1 as DP + queryComponents, err := encodeQueryComponentsForTests(user1.Address(), "sttest00000000000000000000000071", "get_record", []byte{0x01}) + require.NoError(t, err) + settleTime := time.Now().Add(24 * time.Hour).Unix() + + var marketID int64 + err = callCreateMarket(ctx, platform, &user1, queryComponents, settleTime, 5, 20, func(row *common.Row) error { + marketID = row.Values[0].(int64) + return nil + }) + require.NoError(t, err) + t.Logf("Created market ID: %d", marketID) + + // Setup LP scenario and sample + setupLPScenario(t, ctx, platform, &user1, &user2, int(marketID)) + err = triggerBatchSampling(ctx, platform, 1000) + require.NoError(t, err) + + // Get validator balances before + val1Before, err := getUSDCBalance(ctx, platform, val1Addr) + require.NoError(t, err) + val2Before, err := getUSDCBalance(ctx, platform, val2Addr) + require.NoError(t, err) + val3Before, err := getUSDCBalance(ctx, platform, val3Addr) + require.NoError(t, err) + + // Distribute 31 TRUF fees (produces non-zero remainder when split 3 ways) + totalFees := new(big.Int).Mul(big.NewInt(31), big.NewInt(1e18)) + + err = giveUSDCBalanceChained(ctx, platform, testUSDCEscrow, totalFees.String()) + require.NoError(t, err) + _, err = erc20bridge.ForTestingForceSyncInstance(ctx, platform, testChain, testEscrow, testERC20, 18) + require.NoError(t, err) + + totalFeesDecimal, err := kwilTypes.ParseDecimalExplicit(totalFees.String(), 78, 0) + require.NoError(t, err) + + // Call distribute_fees with val1 as block proposer + tx := &common.TxContext{ + Ctx: ctx, + BlockContext: &common.BlockContext{ + Height: 1, + Timestamp: time.Now().Unix(), + Proposer: val1Pub, + }, + Signer: user1.Bytes(), + Caller: user1.Address(), + TxID: platform.Txid(), + Authenticator: coreauth.EthPersonalSignAuth, + } + engineCtx := &common.EngineContext{TxContext: tx, OverrideAuthz: true} + + res, err := platform.Engine.Call(engineCtx, platform.DB, "", "distribute_fees", + []any{int(marketID), totalFeesDecimal, true}, nil) + require.NoError(t, err) + if res.Error != nil { + t.Fatalf("distribute_fees error: %v", res.Error) + } + + // Get validator balances after + val1After, err := getUSDCBalance(ctx, platform, val1Addr) + require.NoError(t, err) + val2After, err := getUSDCBalance(ctx, platform, val2Addr) + require.NoError(t, err) + val3After, err := getUSDCBalance(ctx, platform, val3Addr) + require.NoError(t, err) + + // Calculate increases + inc1 := new(big.Int).Sub(val1After, val1Before) + inc2 := new(big.Int).Sub(val2After, val2Before) + inc3 := new(big.Int).Sub(val3After, val3Before) + + t.Logf("Validator payouts: V1=%s, V2=%s, V3=%s", inc1.String(), inc2.String(), inc3.String()) + + // Expected: infraShare = 31 * 10^18 * 125 / 1000 = 3.875 * 10^18 + infraShare := new(big.Int).Div(new(big.Int).Mul(totalFees, big.NewInt(125)), big.NewInt(1000)) + + // Total validator payout should equal infraShare + totalValPayout := new(big.Int).Add(inc1, new(big.Int).Add(inc2, inc3)) + require.Equal(t, infraShare.String(), totalValPayout.String(), + "Total validator payout should equal infraShare") + + // All validators should get > 0 + require.True(t, inc1.Sign() > 0, "V1 should get > 0") + require.True(t, inc2.Sign() > 0, "V2 should get > 0") + require.True(t, inc3.Sign() > 0, "V3 should get > 0") + + // Payouts should differ by at most 1 wei (remainder handling). + // NUMERIC(78,0) division rounds to nearest integer, so for division by 3 + // the remainder is at most 1 in absolute value. + vals := []*big.Int{inc1, inc2, inc3} + minVal, maxVal := new(big.Int).Set(vals[0]), new(big.Int).Set(vals[0]) + for _, v := range vals[1:] { + if v.Cmp(minVal) < 0 { + minVal.Set(v) + } + if v.Cmp(maxVal) > 0 { + maxVal.Set(v) + } + } + diff := new(big.Int).Sub(maxVal, minVal) + require.True(t, diff.Cmp(big.NewInt(1)) <= 0, + "Max difference between validators should be <= 1 wei, got %s", diff.String()) + + // Verify that not all validators got the same amount (remainder is non-zero) + require.True(t, diff.Sign() > 0, + "With 31 TRUF, remainder should be non-zero so validators get different amounts") + + // Verify deterministic remainder recipient: the first validator sorted by pubkey + // should receive the remainder (may be +1 or -1 depending on rounding direction). + type valEntry struct { + pubkey []byte + payout *big.Int + } + entries := []valEntry{ + {pubkey: val1Pub.Bytes(), payout: inc1}, + {pubkey: val2Pub.Bytes(), payout: inc2}, + {pubkey: val3Pub.Bytes(), payout: inc3}, + } + sort.Slice(entries, func(i, j int) bool { + return bytes.Compare(entries[i].pubkey, entries[j].pubkey) < 0 + }) + // The first validator (sorted by pubkey) gets the remainder, making its payout + // different from the others. Verify it's the one with a unique payout. + firstPayout := entries[0].payout + othersSame := entries[1].payout.Cmp(entries[2].payout) == 0 + firstDiffers := firstPayout.Cmp(entries[1].payout) != 0 + require.True(t, othersSame && firstDiffers, + "First validator (sorted by pubkey) should be the sole remainder recipient") + + // Verify audit summary shows correct total validator fees + var totalValFeesStr string + _, err = platform.Engine.Call( + &common.EngineContext{TxContext: &common.TxContext{ + Ctx: ctx, BlockContext: &common.BlockContext{Height: 1, Timestamp: time.Now().Unix()}, + Signer: user1.Bytes(), Caller: user1.Address(), TxID: platform.Txid(), + Authenticator: coreauth.EthPersonalSignAuth, + }, OverrideAuthz: true}, + platform.DB, "", "get_distribution_summary", []any{int(marketID)}, + func(row *common.Row) error { + totalValFeesStr = row.Values[3].(*kwilTypes.Decimal).String() + return nil + }) + require.NoError(t, err) + require.Equal(t, infraShare.String(), totalValFeesStr, + "Audit total_validator_fees should equal full infraShare") + + // Verify all 3 validators have ob_participants entries + for _, addr := range []string{val1Addr, val2Addr, val3Addr} { + var count int + err = platform.Engine.Execute( + &common.EngineContext{TxContext: &common.TxContext{Ctx: ctx, BlockContext: &common.BlockContext{Height: 1}, TxID: platform.Txid()}}, + platform.DB, + "SELECT COUNT(*) as cnt FROM ob_participants WHERE wallet_address = decode(substring($wallet, 3, 40), 'hex')", + map[string]any{"$wallet": addr}, + func(row *common.Row) error { + count = int(row.Values[0].(int64)) + return nil + }, + ) + require.NoError(t, err) + require.Equal(t, 1, count, fmt.Sprintf("Validator %s should have participant entry", addr[:10])) + } + + t.Logf("3-validator split verified: total=%s, min=%s, max=%s, diff=%s", + totalValPayout.String(), minVal.String(), maxVal.String(), diff.String()) + + return nil + } +} diff --git a/tests/streams/order_book/pnl_impact_test.go b/tests/streams/order_book/pnl_impact_test.go index bdaf94d2..7660df24 100644 --- a/tests/streams/order_book/pnl_impact_test.go +++ b/tests/streams/order_book/pnl_impact_test.go @@ -176,7 +176,10 @@ func testPnLImpactSettlement(t *testing.T) func(ctx context.Context, platform *k // Use a real key for User A so we can use them as block proposer (Leader) and Data Provider userAPub := NewTestProposerPub(t) userA := util.Unsafe_NewEthereumAddressFromString(fmt.Sprintf("0x%x", crypto.EthereumAddressFromPubKey(userAPub))) - + + // Inject User A as a validator so distribute_fees pays validator share + platform.Validators.ForTestingAddValidator(userAPub.Bytes(), crypto.KeyTypeSecp256k1, 1) + // Setup FIRST: Inject balance err := InjectDualBalance(ctx, platform, userA.Address(), "100000000000000000000") require.NoError(t, err) diff --git a/tests/streams/order_book/test_helpers_orderbook.go b/tests/streams/order_book/test_helpers_orderbook.go index 0e799581..7fa16ba1 100644 --- a/tests/streams/order_book/test_helpers_orderbook.go +++ b/tests/streams/order_book/test_helpers_orderbook.go @@ -117,6 +117,16 @@ func NewTestProposerPub(t require.TestingT) *crypto.Secp256k1PublicKey { return pubGeneric.(*crypto.Secp256k1PublicKey) } +// injectTestValidator adds a validator to the platform's VoteStore for testing. +// Returns the validator's public key (for use as BlockContext.Proposer) and +// its derived Ethereum address. +func injectTestValidator(t require.TestingT, platform *kwilTesting.Platform) (*crypto.Secp256k1PublicKey, string) { + pub := NewTestProposerPub(t) + platform.Validators.ForTestingAddValidator(pub.Bytes(), crypto.KeyTypeSecp256k1, 1) + addr := fmt.Sprintf("0x%x", crypto.EthereumAddressFromPubKey(pub)) + return pub, addr +} + // ensureNonZeroAddress returns the provided address unless it's the zero address, // in which case it returns a fallback non-zero address. func ensureNonZeroAddress(addr string) string {