Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
92cb350
docs(client-query): rust-document run-indexer-queries
suchapalaver Jun 19, 2025
d7ff2dc
build: import tap-graph with horizon support
suchapalaver Jun 19, 2025
17ee080
feat: support horizon
suchapalaver Jun 26, 2025
7d56492
chore: add debug logging to receipt generation
suchapalaver Aug 14, 2025
fdb8c84
chore: remove dead v1 code
suchapalaver Aug 14, 2025
815e4d0
chore: fix query routing
suchapalaver Aug 14, 2025
ce3d3f3
chore: implement horizon subgraph queries
suchapalaver Aug 14, 2025
c2433ea
chore: add debug logging to subgraph client
suchapalaver Aug 14, 2025
4fbd482
chore: use full trusted indexer url
suchapalaver Aug 14, 2025
497ddca
fix: update subgraph querying
suchapalaver Aug 14, 2025
2f1ccfb
feat: inject what 'too far behind' means
suchapalaver Aug 15, 2025
c9b642e
fix: fix tap receipt header
suchapalaver Aug 15, 2025
e6ce396
build: update tap-aggregator to 0.5.8
suchapalaver Aug 15, 2025
c4b4a4c
chore(network): add debug logging
suchapalaver Aug 15, 2025
76c4118
build: install protobuf compiler
suchapalaver Aug 15, 2025
774fafe
chore(client_query): add query entrypoint log
suchapalaver Aug 15, 2025
ca10bef
fix: fix collection id derivation
suchapalaver Aug 16, 2025
ecf7926
fix: use correct addresses in the EIP712 message hash
suchapalaver Aug 16, 2025
f465800
chore: add more debug logging
suchapalaver Aug 16, 2025
20fc916
build: update cargo dependencies (tap-aggregator)
suchapalaver Aug 17, 2025
8067302
fix: use correct arguments for horizon receipt creation
tmigone Aug 18, 2025
1c50656
chore: bump tap-aggregator dependency to 0.6.0
tmigone Aug 21, 2025
13e63b6
chore: format
tmigone Aug 21, 2025
98e9b09
Tmigone/horizon fixes (#1123)
tmigone Sep 1, 2025
c59483f
chore: update tap-aggergator to latest version
Maikol Sep 1, 2025
08f8f30
remove excessive logging
Theodus Sep 2, 2025
7c979cf
remove more excessive logging
Theodus Sep 2, 2025
29e2be1
remove dependency on tap_aggregator
Theodus Sep 2, 2025
8ba418b
remove unnecessary struct
Theodus Sep 2, 2025
83ba7b9
remove unrelated comment
Theodus Sep 2, 2025
0a1b30a
remove unnecessary argument for receipt creation
Theodus Sep 2, 2025
72646e7
move subgraph service signing context
Theodus Sep 2, 2025
ce5d5ed
test: remove unnecessary tests
tmigone Sep 4, 2025
80faf65
docs: update README.md
tmigone Sep 4, 2025
6f53195
Merge branch 'main' into tmigone/semiotic-main
tmigone Sep 4, 2025
8e05474
fix: receipt serialization missing outer signed wrapper
tmigone Sep 4, 2025
e514f3f
fix: use correct address for tap receipt payer (#1129)
tmigone Sep 8, 2025
15b78b0
Merge branch 'main' into tmigone/semiotic-main
tmigone Sep 9, 2025
7d0aca9
fix: remove duplicate definition merge conflict
tmigone Sep 9, 2025
aca9232
feat: support both legacy and horizon dispute managers for attestatio…
tmigone Oct 29, 2025
5d4d4a5
chore: make clippy happy
tmigone Oct 29, 2025
3bf5d23
Merge branch 'main' into tmigone/semiotic-main
tmigone Oct 29, 2025
652edb0
Merge branch 'tmigone/dual-dispute-manager' into tmigone/semiotic-main
tmigone Oct 29, 2025
84f682d
fix: issues after merge
tmigone Oct 29, 2025
88062be
chore: create v1 or v2 receipts depending on allocation legacy (#1148)
Maikol Nov 14, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
340 changes: 229 additions & 111 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ axum = { version = "0.8.1", default-features = false, features = [
"tokio",
"http1",
] }
base64 = "0.22.1"
custom_debug = "0.6.1"
faster-hex = "0.10.0"
futures = "0.3"
Expand Down Expand Up @@ -46,8 +47,8 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0.116", features = ["raw_value"] }
serde_with = "3.8.1"
snmalloc-rs = "0.3"
tap_graph = "0.3.3"
thegraph-core = { version = "0.15", features = [
tap_graph = { version = "0.3.4", features = ["v2"] }
thegraph-core = { version = "0.15.1", features = [
"alloy-contract",
"alloy-signer-local",
"attestation",
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ client query. Indexer fees are clamped to a maximum of the gateway's budget.

For an overview of TAP see https://github.com/semiotic-ai/timeline-aggregation-protocol.

The gateway acts as a TAP sender, where each indexer request is sent with a TAP receipt. The gateway
This is a horizon-ready gateway that generates TAP v2 receipts exclusively. The gateway acts as a TAP sender, where each indexer request is sent with a TAP v2 receipt. The gateway
operator is expected to run 2 additional services:

- [tap-aggregator](https://github.com/semiotic-ai/timeline-aggregation-protocol/tree/main/tap_aggregator):
Expand All @@ -113,7 +113,7 @@ operator is expected to run 2 additional services:
The gateway operator is also expected to manage at least 2 wallets:

- sender: requires ETH for transaction gas and GRT to allocate into TAP escrow balances for paying indexers
- authorized signer: used by the gateway and tap-aggregator to sign receipts and RAVs
- authorized signer: used by the gateway and tap-aggregator to sign v2 receipts and RAVs

## operational notes

Expand Down
28 changes: 24 additions & 4 deletions src/client_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,12 @@ async fn run_indexer_queries(
let min_fee = *(min_fee.0 * grt_per_usd * one_grt) / selections.len() as f64;
let indexer_fee = selection.fee.as_f64() * budget as f64;
let fee = indexer_fee.max(min_fee) as u128;
let receipt = match ctx.receipt_signer.create_receipt(largest_allocation, fee) {
let receipt = match ctx.receipt_signer.create_receipt(
largest_allocation,
*indexer,
fee,
selection.data.largest_allocation_is_legacy,
) {
Ok(receipt) => receipt,
Err(err) => {
tracing::error!(?indexer, %deployment, error=?err, "failed to create receipt");
Expand All @@ -303,7 +308,11 @@ async fn run_indexer_queries(
let start_time = Instant::now();
// URL checked: ref df8e647b-1e6e-422a-8846-dc9ee7e0dcc2
let deployment_url = url.join(&format!("subgraphs/id/{deployment}")).unwrap();
let auth = IndexerAuth::Paid(&receipt, ctx.attestation_domain);
let auth = IndexerAuth::Paid(
&receipt,
ctx.attestation_domain,
ctx.legacy_attestation_domain,
);
let result = indexer_client
.query_indexer(deployment_url, auth, &indexer_query)
.in_current_span()
Expand Down Expand Up @@ -464,6 +473,7 @@ struct CandidateMetadata {
#[debug(with = std::fmt::Display::fmt)]
url: Url,
largest_allocation: AllocationId,
largest_allocation_is_legacy: bool,
}

/// Given a list of indexings, build a list of candidates that are within the required block range
Expand Down Expand Up @@ -572,6 +582,7 @@ fn build_candidates_list(
deployment,
url: indexing.indexer.url.clone(),
largest_allocation: indexing.largest_allocation,
largest_allocation_is_legacy: indexing.largest_allocation_is_legacy,
},
perf: perf.response,
fee: Normalized::new(indexing.fee as f64 / budget as f64).unwrap_or(Normalized::ONE),
Expand Down Expand Up @@ -682,7 +693,12 @@ pub async fn handle_indexer_query(
let fee = *(ctx.budgeter.query_fees_target.0 * grt_per_usd * one_grt) as u128;

let allocation = indexing.largest_allocation;
let receipt = match ctx.receipt_signer.create_receipt(allocation, fee) {
let receipt = match ctx.receipt_signer.create_receipt(
allocation,
*indexer,
fee,
indexing.largest_allocation_is_legacy,
) {
Ok(receipt) => receipt,
Err(err) => {
return Err(Error::Internal(anyhow!("failed to create receipt: {err}")));
Expand All @@ -695,7 +711,11 @@ pub async fn handle_indexer_query(
.url
.join(&format!("subgraphs/id/{deployment}"))
.unwrap();
let indexer_auth = IndexerAuth::Paid(&receipt, ctx.attestation_domain);
let indexer_auth = IndexerAuth::Paid(
&receipt,
ctx.attestation_domain,
ctx.legacy_attestation_domain,
);

let indexer_start_time = Instant::now();
let result = ctx
Expand Down
1 change: 1 addition & 0 deletions src/client_query/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@ pub struct Context {
pub network: NetworkService,
pub indexing_perf: IndexingPerformance,
pub attestation_domain: &'static Eip712Domain,
pub legacy_attestation_domain: &'static Eip712Domain,
pub reporter: mpsc::UnboundedSender<reports::ClientRequest>,
}
11 changes: 9 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub struct Config {
/// Minimum indexer-service version that will receive queries
#[serde_as(as = "DisplayFromStr")]
pub min_indexer_version: Version,
/// Indexers used to query the network subgraph
/// Trusted indexers that can serve the network subgraph for free
pub trusted_indexers: Vec<TrustedIndexer>,
/// Maximum acceptable lag (in seconds) for network subgraph responses (default: 120)
#[serde(default = "default_network_subgraph_max_lag_seconds")]
Expand All @@ -61,6 +61,8 @@ pub struct Config {
#[serde(deserialize_with = "deserialize_not_nan_f64")]
pub query_fees_target: NotNan<f64>,
pub receipts: Receipts,
/// Address for the Subgraph Service
pub subgraph_service: Address,
}

/// Default network subgraph max lag threshold (120 seconds)
Expand Down Expand Up @@ -128,6 +130,7 @@ pub enum BlocklistEntry {
pub struct AttestationConfig {
pub chain_id: String,
pub dispute_manager: Address,
pub legacy_dispute_manager: Address,
}

/// The exchange rate provider.
Expand Down Expand Up @@ -183,10 +186,14 @@ impl From<KafkaConfig> for rdkafka::config::ClientConfig {
pub struct Receipts {
/// TAP verifier contract chain
pub chain_id: U256,
/// TAP payer address
pub payer: Address,
/// TAP signer key
pub signer: B256,
/// TAP verifier contract address
/// TAP verifier contract address (v2)
pub verifier: Address,
/// Legacy TAP verifier contract address (v1)
pub legacy_verifier: Address,
}

/// Load the configuration from a JSON file.
Expand Down
18 changes: 13 additions & 5 deletions src/indexer_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub struct IndexerClient {
}

pub enum IndexerAuth<'a> {
Paid(&'a Receipt, &'a Eip712Domain),
Paid(&'a Receipt, &'a Eip712Domain, &'a Eip712Domain),
Free(&'a str),
}

Expand All @@ -47,7 +47,7 @@ impl IndexerClient {
query: &str,
) -> Result<IndexerResponse, IndexerError> {
let (auth_key, auth_value) = match auth {
IndexerAuth::Paid(receipt, _) => ("Tap-Receipt", receipt.serialize()),
IndexerAuth::Paid(receipt, _, _) => ("tap-receipt", receipt.serialize()),
IndexerAuth::Free(token) => (AUTHORIZATION.as_str(), format!("Bearer {token}")),
};

Expand Down Expand Up @@ -113,18 +113,26 @@ impl IndexerClient {
return Err(BadResponse(format!("unattestable response: {error}")));
}

if let IndexerAuth::Paid(receipt, attestation_domain) = auth {
if let IndexerAuth::Paid(receipt, attestation_domain, legacy_attestation_domain) = auth {
match &payload.attestation {
Some(attestation) => {
let allocation = receipt.allocation();
if let Err(err) = attestation::verify(
if let Err(legacy_err) = attestation::verify(
legacy_attestation_domain,
attestation,
&allocation,
query,
&original_response,
) && let Err(err) = attestation::verify(
attestation_domain,
attestation,
&allocation,
query,
&original_response,
) {
return Err(BadResponse(format!("bad attestation: {err}")));
return Err(BadResponse(format!(
"bad attestation: {legacy_err} - {err}"
)));
}
}
None => {
Expand Down
13 changes: 13 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,15 @@ async fn main() {
conf.attestations.dispute_manager,
)));

let legacy_attestation_domain: &'static Eip712Domain =
Box::leak(Box::new(attestation::eip712_domain(
conf.attestations
.chain_id
.parse::<ChainId>()
.expect("failed to parse attestation domain chain_id"),
conf.attestations.legacy_dispute_manager,
)));

let indexer_client = IndexerClient {
client: http_client.clone(),
};
Expand Down Expand Up @@ -134,9 +143,12 @@ async fn main() {
network.wait_until_ready().await;

let receipt_signer: &'static ReceiptSigner = Box::leak(Box::new(ReceiptSigner::new(
conf.receipts.payer,
receipt_signer,
conf.receipts.chain_id,
conf.receipts.verifier,
conf.receipts.legacy_verifier,
conf.subgraph_service,
)));

let auth_service = init_auth_service(
Expand Down Expand Up @@ -171,6 +183,7 @@ async fn main() {
indexing_perf,
network,
attestation_domain,
legacy_attestation_domain,
reporter,
};

Expand Down
8 changes: 8 additions & 0 deletions src/network/indexer_processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ pub struct IndexerInfo<I> {
pub struct IndexingRawInfo {
/// The largest allocation.
pub largest_allocation: AllocationId,
/// Whether the largest allocation is a legacy allocation
pub largest_allocation_is_legacy: bool,
}

/// Internal representation of the fetched indexer's indexing information.
Expand All @@ -65,6 +67,9 @@ pub struct IndexingInfo<P, C> {
/// The largest allocation.
pub largest_allocation: AllocationId,

/// Whether the largest allocation is a legacy allocation
pub largest_allocation_is_legacy: bool,

/// The indexing progress information
///
/// See [`IndexingProgress`] for more information.
Expand All @@ -78,6 +83,7 @@ impl From<IndexingRawInfo> for IndexingInfo<(), ()> {
fn from(raw: IndexingRawInfo) -> Self {
Self {
largest_allocation: raw.largest_allocation,
largest_allocation_is_legacy: raw.largest_allocation_is_legacy,
progress: (),
fee: (),
}
Expand All @@ -93,6 +99,7 @@ impl IndexingInfo<(), ()> {
) -> IndexingInfo<IndexingProgress, ()> {
IndexingInfo {
largest_allocation: self.largest_allocation,
largest_allocation_is_legacy: self.largest_allocation_is_legacy,
progress,
fee: self.fee,
}
Expand All @@ -105,6 +112,7 @@ impl IndexingInfo<IndexingProgress, ()> {
fn with_fee(self, fee: u128) -> IndexingInfo<IndexingProgress, u128> {
IndexingInfo {
largest_allocation: self.largest_allocation,
largest_allocation_is_legacy: self.largest_allocation_is_legacy,
progress: self.progress,
fee,
}
Expand Down
42 changes: 26 additions & 16 deletions src/network/pre_processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub fn into_internal_indexers_raw_info<'a>(
) -> HashMap<IndexerId, IndexerRawInfo> {
let mut indexer_indexing_largest_allocation: HashMap<
(IndexerId, DeploymentId),
(AllocationId, u128),
(AllocationId, u128, bool),
> = HashMap::new();

data.flat_map(|subgraph| {
Expand Down Expand Up @@ -53,32 +53,42 @@ pub fn into_internal_indexers_raw_info<'a>(
};

// Update the indexer's indexings largest allocations table
let indexing_largest_allocation = match indexer_indexing_largest_allocation
.entry((indexer_id, deployment_id))
{
Entry::Vacant(entry) => {
entry.insert((allocation.id, allocation.allocated_tokens));
allocation.id
}
Entry::Occupied(entry) => {
let (largest_allocation_address, largest_allocation_amount) = entry.into_mut();
if allocation.allocated_tokens > *largest_allocation_amount {
*largest_allocation_address = allocation.id;
*largest_allocation_amount = allocation.allocated_tokens;
let (indexing_largest_allocation, indexing_largest_allocation_is_legacy) =
match indexer_indexing_largest_allocation.entry((indexer_id, deployment_id)) {
Entry::Vacant(entry) => {
entry.insert((
allocation.id,
allocation.allocated_tokens,
allocation.is_legacy,
));
(allocation.id, allocation.is_legacy)
}
*largest_allocation_address
}
};
Entry::Occupied(entry) => {
let (
largest_allocation_address,
largest_allocation_amount,
largest_allocation_is_legacy,
) = entry.into_mut();
if allocation.allocated_tokens > *largest_allocation_amount {
*largest_allocation_address = allocation.id;
*largest_allocation_amount = allocation.allocated_tokens;
*largest_allocation_is_legacy = allocation.is_legacy;
}
(*largest_allocation_address, *largest_allocation_is_legacy)
}
};

// Update the indexer's indexings info
let indexing = indexer
.indexings
.entry(deployment_id)
.or_insert(IndexingRawInfo {
largest_allocation: allocation.id,
largest_allocation_is_legacy: allocation.is_legacy,
});

indexing.largest_allocation = indexing_largest_allocation;
indexing.largest_allocation_is_legacy = indexing_largest_allocation_is_legacy;
}

acc
Expand Down
3 changes: 3 additions & 0 deletions src/network/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ pub struct Indexing {
/// This is, among all allocations associated with the indexer and deployment, the address
/// with the largest amount of allocated tokens.
pub largest_allocation: AllocationId,
/// Whether the largest allocation is a legacy allocation
pub largest_allocation_is_legacy: bool,
/// The indexer
pub indexer: Arc<Indexer>,
/// The indexing progress.
Expand Down Expand Up @@ -365,6 +367,7 @@ fn construct_indexings_table_row(
let indexing = Indexing {
id: indexing_id,
largest_allocation: indexing_largest_allocation_addr,
largest_allocation_is_legacy: indexing_info.largest_allocation_is_legacy,
indexer: Arc::clone(indexer),
progress: IndexingProgress {
latest_block: indexing_progress.latest_block,
Expand Down
4 changes: 3 additions & 1 deletion src/network/subgraph_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ pub mod types {
pub id: AllocationId,
#[serde_as(as = "serde_with::DisplayFromStr")]
pub allocated_tokens: u128,
pub is_legacy: bool,
pub indexer: Indexer,
}

Expand All @@ -94,7 +95,7 @@ pub mod types {
#[serde_as]
#[derive(Clone, CustomDebug, Deserialize)]
pub struct TrustedIndexer {
/// network subgraph endpoint
/// Complete network subgraph endpoint URL (e.g., http://indexer:7601/subgraphs/id/Qmc2Cb...)
#[debug(with = std::fmt::Display::fmt)]
#[serde_as(as = "serde_with::DisplayFromStr")]
pub url: Url,
Expand Down Expand Up @@ -180,6 +181,7 @@ impl Client {
) {
id
allocatedTokens
isLegacy
indexer {
id
url
Expand Down
Loading