Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 109 additions & 0 deletions extensions/tn_utils/precompiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"math"
"math/big"
"sort"
"strings"

"github.com/trufnetwork/kwil-db/common"
Expand Down Expand Up @@ -41,6 +42,8 @@ func buildPrecompile() precompiles.Precompile {
getCallerBytesMethod(),
getLeaderHexMethod(),
getLeaderBytesMethod(),
getValidatorsMethod(),
getValidatorCountMethod(),
},
}
}
Expand Down Expand Up @@ -177,6 +180,112 @@ func getLeaderBytesMethod() precompiles.Method {
}
}

// getValidatorsMethod returns all active validators as a table of (wallet_hex, wallet_bytes, power).
// Results are sorted deterministically by public key bytes for consensus safety.
func getValidatorsMethod() precompiles.Method {
return precompiles.Method{
Name: "get_validators",
AccessModifiers: []precompiles.Modifier{precompiles.VIEW, precompiles.PUBLIC},
Parameters: []precompiles.PrecompileValue{},
Returns: &precompiles.MethodReturn{
IsTable: true,
Fields: []precompiles.PrecompileValue{
precompiles.NewPrecompileValue("wallet_hex", types.TextType, false),
precompiles.NewPrecompileValue("wallet_bytes", types.ByteaType, false),
precompiles.NewPrecompileValue("power", types.IntType, false),
},
},
Handler: func(ctx *common.EngineContext, app *common.App, inputs []any, resultFn func([]any) error) error {
if app == nil || app.Validators == nil {
return nil // No validators available, return empty table
}

validators := app.Validators.GetValidators()
if len(validators) == 0 {
return nil
}

// Defensive shallow copy to avoid mutating the caller's slice
validatorsCopy := make([]*types.Validator, len(validators))
copy(validatorsCopy, validators)

// Sort deterministically by pubkey bytes for consensus safety
sort.Slice(validatorsCopy, func(i, j int) bool {
return bytes.Compare(validatorsCopy[i].Identifier, validatorsCopy[j].Identifier) < 0
})

validators = validatorsCopy

for _, v := range validators {
if v.Power <= 0 {
continue
}

var walletHex string
var walletBytes []byte

if v.KeyType == crypto.KeyTypeSecp256k1 {
secp, err := crypto.UnmarshalSecp256k1PublicKey(v.Identifier)
if err != nil {
continue // Skip validators with invalid keys
}
addr := crypto.EthereumAddressFromPubKey(secp)
walletHex = "0x" + hex.EncodeToString(addr)
walletBytes = addr
} else {
// Non-secp256k1 validators: use raw pubkey
walletHex = "0x" + hex.EncodeToString(v.Identifier)
walletBytes = v.Identifier
}

if err := resultFn([]any{walletHex, walletBytes, v.Power}); err != nil {
return err
}
}

return nil
},
}
}

// getValidatorCountMethod returns the number of active validators (power > 0) as a scalar INT.
// This avoids iterating get_validators() just to count, enabling single-pass distribution loops.
func getValidatorCountMethod() precompiles.Method {
return precompiles.Method{
Name: "get_validator_count",
AccessModifiers: []precompiles.Modifier{precompiles.VIEW, precompiles.PUBLIC},
Parameters: []precompiles.PrecompileValue{},
Returns: &precompiles.MethodReturn{
IsTable: false,
Fields: []precompiles.PrecompileValue{
precompiles.NewPrecompileValue("count", types.IntType, false),
},
},
Handler: func(ctx *common.EngineContext, app *common.App, inputs []any, resultFn func([]any) error) error {
if app == nil || app.Validators == nil {
return resultFn([]any{int64(0)})
}

validators := app.Validators.GetValidators()
var count int64
for _, v := range validators {
if v.Power <= 0 {
continue
}
// Mirror get_validators() filtering: skip malformed secp256k1 keys
if v.KeyType == crypto.KeyTypeSecp256k1 {
if _, err := crypto.UnmarshalSecp256k1PublicKey(v.Identifier); err != nil {
continue
}
}
count++
}

return resultFn([]any{count})
},
}
}

// unpackQueryComponentsMethod extracts (dataProvider, streamID, actionID, args) from ABI-encoded bytes.
func unpackQueryComponentsMethod() precompiles.Method {
return precompiles.Method{
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ require (
github.com/spf13/cobra v1.9.1
github.com/stretchr/testify v1.11.1
github.com/testcontainers/testcontainers-go v0.37.0
github.com/trufnetwork/kwil-db v0.10.3-0.20260303100144-0119418a1a7c
github.com/trufnetwork/kwil-db/core v0.4.3-0.20260303100144-0119418a1a7c
github.com/trufnetwork/kwil-db v0.10.3-0.20260311113716-5415d420dcc6
github.com/trufnetwork/kwil-db/core v0.4.3-0.20260311113716-5415d420dcc6
github.com/trufnetwork/sdk-go v0.6.4-0.20260224122406-a741343e2f37
go.uber.org/zap v1.27.0
golang.org/x/exp v0.0.0-20250218142911-aa4b98e5adaa
Expand Down
12 changes: 12 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1244,10 +1244,22 @@ github.com/trufnetwork/kwil-db v0.10.3-0.20260216231327-01b863886682 h1:Gqee9/lN
github.com/trufnetwork/kwil-db v0.10.3-0.20260216231327-01b863886682/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo=
github.com/trufnetwork/kwil-db v0.10.3-0.20260303100144-0119418a1a7c h1:lvyTdrm1gzLqCmS+sqsg2JZWnoWo0ORKKBL8Z91C/JU=
github.com/trufnetwork/kwil-db v0.10.3-0.20260303100144-0119418a1a7c/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo=
github.com/trufnetwork/kwil-db v0.10.3-0.20260311105956-61f1dad27875 h1:dwJejERTLmehMs6lEY7+nnUKfyrEEYIgP5Vlm8FGeQ8=
github.com/trufnetwork/kwil-db v0.10.3-0.20260311105956-61f1dad27875/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo=
github.com/trufnetwork/kwil-db v0.10.3-0.20260311112239-ab4bf677c84b h1:9LnfIK51aIc2Hx+2onVf7gK/ggR95m/1vrsU2FOZp4w=
github.com/trufnetwork/kwil-db v0.10.3-0.20260311112239-ab4bf677c84b/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo=
github.com/trufnetwork/kwil-db v0.10.3-0.20260311113716-5415d420dcc6 h1:WjEGspN5kp6JPpX/p6f6iLCHGyXg9k/UpIlcPLpqfLE=
github.com/trufnetwork/kwil-db v0.10.3-0.20260311113716-5415d420dcc6/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo=
github.com/trufnetwork/kwil-db/core v0.4.3-0.20260216231327-01b863886682 h1:iaxXr8D3dU79MBhmS/uCuBhnlc+gbLvCvV6GtAz3ukw=
github.com/trufnetwork/kwil-db/core v0.4.3-0.20260216231327-01b863886682/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ=
github.com/trufnetwork/kwil-db/core v0.4.3-0.20260303100144-0119418a1a7c h1:O5pyUJqZNNIi/l1vXc9fxycdEU9OxF5z8UQprTn4zZE=
github.com/trufnetwork/kwil-db/core v0.4.3-0.20260303100144-0119418a1a7c/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ=
github.com/trufnetwork/kwil-db/core v0.4.3-0.20260311105956-61f1dad27875 h1:KxTKPF7zTedqFRkXO02Gil6r31Ra9I038JR7ZWMViA8=
github.com/trufnetwork/kwil-db/core v0.4.3-0.20260311105956-61f1dad27875/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ=
github.com/trufnetwork/kwil-db/core v0.4.3-0.20260311112239-ab4bf677c84b h1:vNwvWLDjh4s8qvmS1UKISZee67YS9a2vncPC7W555dI=
github.com/trufnetwork/kwil-db/core v0.4.3-0.20260311112239-ab4bf677c84b/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ=
github.com/trufnetwork/kwil-db/core v0.4.3-0.20260311113716-5415d420dcc6 h1:jyPGV1M4VVpvf76zaCKaEYrWz8IHDW4ukFHqL2shJx0=
github.com/trufnetwork/kwil-db/core v0.4.3-0.20260311113716-5415d420dcc6/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ=
github.com/trufnetwork/openzeppelin-merkle-tree-go v0.0.2 h1:DCq8MzbWH0wZmICNmMVsSzUHUPl+2vqRhluEABjxl88=
github.com/trufnetwork/openzeppelin-merkle-tree-go v0.0.2/go.mod h1:Y0MJpPp9QXU5vC6Gpoilql2NkgmGNcbHm9HYC2v2N8s=
github.com/trufnetwork/sdk-go v0.6.4-0.20260224122406-a741343e2f37 h1:VD/GWxLTshaXpLukEc1SXbG7QA9HrFzF8JvxJAJ/x7Q=
Expand Down
149 changes: 98 additions & 51 deletions internal/migrations/033-order-book-settlement.sql
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,23 @@ CREATE OR REPLACE ACTION distribute_fees(
$infra_share NUMERIC(78, 0) := ($total_fees * 125::NUMERIC(78, 0)) / 1000::NUMERIC(78, 0);
$lp_share := $lp_share + ($total_fees - $lp_share - (2::NUMERIC(78, 0) * $infra_share));

-- Pre-compute block_count and create parent distribution record early
-- so DP and validator detail rows can reference it via FK
$block_count INT := 0;
for $row in SELECT COUNT(DISTINCT block) as cnt FROM ob_rewards WHERE query_id = $query_id { $block_count := $row.cnt; }

$lp_count INT := 0;
$actual_fees_distributed NUMERIC(78, 0) := '0'::NUMERIC(78, 0);
$distribution_id INT;
for $row in SELECT COALESCE(MAX(id), 0) + 1 as val FROM ob_fee_distributions { $distribution_id := $row.val; }

-- PRE-INSERT PARENT RECORD with placeholders (updated at end with final values)
INSERT INTO ob_fee_distributions (
id, query_id, total_fees_distributed, total_dp_fees, total_validator_fees, total_lp_count, block_count, distributed_at
) VALUES (
$distribution_id, $query_id, '0'::NUMERIC(78, 0), '0'::NUMERIC(78, 0), '0'::NUMERIC(78, 0), 0, $block_count, @block_timestamp
);

-- Payout Data Provider
$dp_addr BYTEA;
for $row in tn_utils.unpack_query_components($query_components) { $dp_addr := $row.data_provider; }
Expand All @@ -107,61 +124,85 @@ CREATE OR REPLACE ACTION distribute_fees(
$next_id_dp INT;
for $row in SELECT COALESCE(MAX(id), 0::INT) + 1 as val FROM ob_net_impacts { $next_id_dp := $row.val; }
ob_record_net_impact($next_id_dp, $query_id, $dp_pid, $winning_outcome, 0::INT8, $infra_share, FALSE);

-- Record DP in distribution details so indexer picks it up
INSERT INTO ob_fee_distribution_details (distribution_id, participant_id, wallet_address, reward_amount, total_reward_percent)
VALUES ($distribution_id, $dp_pid, $dp_addr, $infra_share, 12.50::NUMERIC(10, 2));
}
}

-- Payout Validator (Leader)
-- Payout Validators (split evenly among all active validators)
$actual_validator_fees NUMERIC(78, 0) := '0'::NUMERIC(78, 0);
$val_wallet TEXT := tn_utils.get_leader_hex();
if $val_wallet != '' AND $infra_share > '0'::NUMERIC(78, 0) {
if $bridge = 'hoodi_tt2' { hoodi_tt2.unlock($val_wallet, $infra_share); }
else if $bridge = 'sepolia_bridge' { sepolia_bridge.unlock($val_wallet, $infra_share); }
else if $bridge = 'ethereum_bridge' { ethereum_bridge.unlock($val_wallet, $infra_share); }

$actual_validator_fees := $infra_share;

-- Ensure Validator has a participant record so the fee is tracked in ob_net_impacts
$val_pid INT;
$leader_bytes BYTEA := tn_utils.get_leader_bytes();
for $p in SELECT id FROM ob_participants WHERE wallet_address = $leader_bytes { $val_pid := $p.id; }
if $val_pid IS NULL AND $leader_bytes IS NOT NULL {
INSERT INTO ob_participants (id, wallet_address)
SELECT COALESCE(MAX(id), 0) + 1, $leader_bytes
FROM ob_participants;
for $p in SELECT id FROM ob_participants WHERE wallet_address = $leader_bytes { $val_pid := $p.id; }
}
if $val_pid IS NOT NULL {
$next_id_val INT;
for $row in SELECT COALESCE(MAX(id), 0::INT) + 1 as val FROM ob_net_impacts { $next_id_val := $row.val; }
ob_record_net_impact($next_id_val, $query_id, $val_pid, $winning_outcome, 0::INT8, $infra_share, FALSE);

if $infra_share > '0'::NUMERIC(78, 0) {
$validator_count INT := tn_utils.get_validator_count();

if $validator_count > 0 {
$per_validator NUMERIC(78, 0) := $infra_share / $validator_count::NUMERIC(78, 0);
$val_remainder NUMERIC(78, 0) := $infra_share - ($per_validator * $validator_count::NUMERIC(78, 0));
$first_validator BOOL := TRUE;
$val_pct NUMERIC(10, 2) := 12.50::NUMERIC(10, 2) / $validator_count::NUMERIC(10, 2);
$val_pct_remainder NUMERIC(10, 2) := 12.50::NUMERIC(10, 2) - ($val_pct * $validator_count::NUMERIC(10, 2));

for $v in tn_utils.get_validators() {
-- Extract row fields to local variables (Kuneiform SQL generator limitation)
$v_wallet_hex TEXT := $v.wallet_hex;
$v_wallet_bytes BYTEA := $v.wallet_bytes;
$v_payout NUMERIC(78, 0) := $per_validator;
$v_pct NUMERIC(10, 2) := $val_pct;

-- Give remainder to first validator (deterministic: sorted by pubkey)
if $first_validator {
$v_payout := $v_payout + $val_remainder;
$v_pct := $v_pct + $val_pct_remainder;
$first_validator := FALSE;
}

-- Skip validators with zero payout (e.g., infra_share < validator_count)
if $v_payout > '0'::NUMERIC(78, 0) {
-- Unlock funds via bridge
if $bridge = 'hoodi_tt2' { hoodi_tt2.unlock($v_wallet_hex, $v_payout); }
else if $bridge = 'sepolia_bridge' { sepolia_bridge.unlock($v_wallet_hex, $v_payout); }
else if $bridge = 'ethereum_bridge' { ethereum_bridge.unlock($v_wallet_hex, $v_payout); }

-- Ensure validator has a participant record
$v_pid INT;
for $p in SELECT id FROM ob_participants WHERE wallet_address = $v_wallet_bytes { $v_pid := $p.id; }
if $v_pid IS NULL {
INSERT INTO ob_participants (id, wallet_address)
SELECT COALESCE(MAX(id), 0) + 1, $v_wallet_bytes
FROM ob_participants;
for $p in SELECT id FROM ob_participants WHERE wallet_address = $v_wallet_bytes { $v_pid := $p.id; }
}
if $v_pid IS NOT NULL {
$next_id_val INT;
for $row in SELECT COALESCE(MAX(id), 0::INT) + 1 as val FROM ob_net_impacts { $next_id_val := $row.val; }
ob_record_net_impact($next_id_val, $query_id, $v_pid, $winning_outcome, 0::INT8, $v_payout, FALSE);

-- Record validator in distribution details so indexer picks it up
INSERT INTO ob_fee_distribution_details (distribution_id, participant_id, wallet_address, reward_amount, total_reward_percent)
VALUES ($distribution_id, $v_pid, $v_wallet_bytes, $v_payout, $v_pct)
ON CONFLICT (distribution_id, participant_id) DO UPDATE
SET reward_amount = ob_fee_distribution_details.reward_amount + $v_payout,
total_reward_percent = ob_fee_distribution_details.total_reward_percent + $v_pct;
}

$actual_validator_fees := $actual_validator_fees + $v_payout;
}
}
}
}

-- LP Distribution Pre-calculation
-- LP Distribution
-- NOTE: sample_lp_rewards() is called in process_settlement() BEFORE ob_positions are deleted,
-- so the final sample reads the live order book. Do NOT call it here.

$block_count INT := 0;
for $row in SELECT COUNT(DISTINCT block) as cnt FROM ob_rewards WHERE query_id = $query_id { $block_count := $row.cnt; }

$lp_count INT := 0;
$actual_fees_distributed NUMERIC(78, 0) := '0'::NUMERIC(78, 0);
$distribution_id INT;
for $row in SELECT COALESCE(MAX(id), 0) + 1 as val FROM ob_fee_distributions { $distribution_id := $row.val; }

-- PRE-INSERT PARENT RECORD to satisfy FK for details
INSERT INTO ob_fee_distributions (
id, query_id, total_fees_distributed, total_dp_fees, total_validator_fees, total_lp_count, block_count, distributed_at
) VALUES (
$distribution_id, $query_id, '0'::NUMERIC(78, 0), $actual_dp_fees, $actual_validator_fees, 0, $block_count, @block_timestamp
);

if $block_count > 0 {
for $row in SELECT MIN(participant_id) as val FROM ob_rewards WHERE query_id = $query_id { $min_pid := $row.val; }

-- Calculate remainder
$total_calc NUMERIC(78, 0) := '0'::NUMERIC(78, 0);
for $res in
for $res in
SELECT participant_id, SUM(reward_percent) as reward_percent
FROM ob_rewards WHERE query_id = $query_id GROUP BY participant_id
{
Expand All @@ -187,32 +228,38 @@ CREATE OR REPLACE ACTION distribute_fees(
$wallet_hexes := array_append($wallet_hexes, $res.wallet_hex);
$amounts := array_append($amounts, $reward_final);
$outcomes := array_append($outcomes, $winning_outcome);
-- Record audit detail (Parent exists now)

-- Record LP audit detail (skip if percent rounds to 0 at NUMERIC(10,2) precision)
$curr_pid INT := $res.participant_id;
$curr_wallet BYTEA := $res.wallet_address;
$curr_reward NUMERIC(78,0) := $reward_final;
$curr_pct NUMERIC(10,2) := $res.reward_percent::NUMERIC(10, 2);

INSERT INTO ob_fee_distribution_details (distribution_id, participant_id, wallet_address, reward_amount, total_reward_percent)
VALUES ($distribution_id, $curr_pid, $curr_wallet, $curr_reward, $curr_pct);
if $curr_pct > 0.00::NUMERIC(10, 2) {
INSERT INTO ob_fee_distribution_details (distribution_id, participant_id, wallet_address, reward_amount, total_reward_percent)
VALUES ($distribution_id, $curr_pid, $curr_wallet, $curr_reward, $curr_pct)
ON CONFLICT (distribution_id, participant_id) DO UPDATE
SET reward_amount = ob_fee_distribution_details.reward_amount + $curr_reward,
total_reward_percent = ob_fee_distribution_details.total_reward_percent + $curr_pct;
}
}
}

if COALESCE(array_length($pids), 0) > 0 {
$lp_count := array_length($pids);
$actual_fees_distributed := $lp_share;

-- UPDATE SUMMARY with final values
UPDATE ob_fee_distributions
SET total_fees_distributed = $actual_fees_distributed,
total_lp_count = $lp_count
WHERE id = $distribution_id;

ob_batch_unlock_collateral($query_id, $bridge, $pids, $wallet_hexes, $amounts, $outcomes);
}
}

-- UPDATE SUMMARY with final values
UPDATE ob_fee_distributions
SET total_fees_distributed = $actual_fees_distributed,
total_dp_fees = $actual_dp_fees,
total_validator_fees = $actual_validator_fees,
total_lp_count = $lp_count
WHERE id = $distribution_id;

if $lp_count > 0 { DELETE FROM ob_rewards WHERE query_id = $query_id; }
};

Expand Down
Loading
Loading