|
1 | | -use std::collections::BTreeMap; |
| 1 | +use std::collections::{BTreeMap, HashMap}; |
2 | 2 |
|
3 | 3 | use async_trait::async_trait; |
| 4 | +use graph::components::subgraph::ProofOfIndexingFinisher; |
4 | 5 | use graph::data::query::Trace; |
5 | 6 | use graph::data::store::Id; |
6 | 7 | use graph::prelude::alloy::primitives::Address; |
@@ -454,6 +455,237 @@ where |
454 | 455 | Ok(r::Value::List(public_poi_results)) |
455 | 456 | } |
456 | 457 |
|
    /// Searches the block range `[startBlock, endBlock)` of a deployment for a
    /// block whose proof of indexing (PoI) equals `targetPoi`, scanning
    /// backwards from `endBlock` in chunks of `CHUNK_SIZE` blocks.
    ///
    /// On a match, returns a `PoiSearchResult` object containing the
    /// deployment id, the matching block (hash + number), and the hex-encoded
    /// PoI. Returns `r::Value::Null` when no match exists in the range or when
    /// any lookup step fails (failures are logged, not surfaced as errors);
    /// an `Err` is only produced when a spawned task panics or a prefetched
    /// chunk fetch fails.
    ///
    /// NOTE(review): `endBlock` is treated as *exclusive* throughout (every
    /// range here is `start..end`), so the block at `endBlock` itself is never
    /// checked — confirm the public API intends exclusive semantics.
    async fn resolve_block_for_poi(
        &self,
        field: &a::Field,
    ) -> Result<r::Value, QueryExecutionError> {
        // Number of block pointers fetched from the chain store per round trip.
        const CHUNK_SIZE: i32 = 1_000_000;

        // Required arguments. NOTE(review): the `expect`s assume GraphQL schema
        // validation has already guaranteed presence and type of these fields
        // (matching the style of the sibling resolvers) — confirm the schema
        // marks them all non-nullable, otherwise a bad query panics here.
        let deployment_id = field
            .get_required::<DeploymentHash>("subgraph")
            .expect("Valid subgraph required");
        let target_poi_hash = field
            .get_required::<BlockHash>("targetPoi")
            .expect("Valid targetPoi required");
        let start_block = field
            .get_required::<BlockNumber>("startBlock")
            .expect("Valid startBlock required");
        let end_block = field
            .get_required::<BlockNumber>("endBlock")
            .expect("Valid endBlock required");

        let indexer = Some(
            field
                .get_required::<Address>("indexer")
                .expect("Valid indexer required"),
        );

        // Empty or inverted range: nothing to search.
        if end_block <= start_block {
            return Ok(r::Value::Null);
        }

        // A PoI is exactly 32 bytes; reject anything else up front so the hot
        // comparison inside the rayon closure can use a fixed-size array.
        let target_bytes: [u8; 32] = match target_poi_hash.as_slice().try_into() {
            Ok(bytes) => bytes,
            Err(_) => {
                error!(
                    self.logger,
                    "Invalid targetPoi: expected 32 bytes";
                    "got_bytes" => target_poi_hash.as_slice().len()
                );
                return Ok(r::Value::Null);
            }
        };

        // Resolve the network for this deployment
        let network = match self.store.network_for_deployment(&deployment_id).await {
            Ok(n) => n,
            Err(e) => {
                error!(
                    self.logger,
                    "Failed to resolve network for deployment";
                    "subgraph" => &deployment_id,
                    "error" => format!("{:?}", e)
                );
                return Ok(r::Value::Null);
            }
        };

        // Fetch the full digest history for the block range
        let history = match self
            .store
            .get_poi_digest_history(&deployment_id, start_block..end_block)
            .await
        {
            Ok(Some(h)) => h,
            Ok(None) => return Ok(r::Value::Null),
            Err(e) => {
                error!(
                    self.logger,
                    "Failed to fetch POI digest history";
                    "subgraph" => &deployment_id,
                    "error" => format!("{:?}", e)
                );
                return Ok(r::Value::Null);
            }
        };

        let poi_version = history.poi_version;

        // Build a lookup structure: for each causality region id, a sorted
        // vec of (start_block, end_block, digest) for binary search.
        // NOTE(review): the containment check below treats each entry's range
        // as half-open `[start_block, end_block)` — confirm that matches how
        // the store records digest validity ranges.
        let mut region_entries: HashMap<Id, Vec<(BlockNumber, BlockNumber, Vec<u8>)>> =
            HashMap::new();
        for entry in history.entries {
            region_entries.entry(entry.id).or_default().push((
                entry.start_block,
                entry.end_block,
                entry.digest,
            ));
        }
        for entries in region_entries.values_mut() {
            entries.sort_by_key(|(start, _, _)| *start);
        }

        // Share across rayon threads
        let region_entries = Arc::new(region_entries);

        let chain_store = match self.store.block_store().chain_store(&network).await {
            Some(cs) => cs,
            None => {
                error!(
                    self.logger,
                    "Chain store not found for network";
                    "network" => &network
                );
                return Ok(r::Value::Null);
            }
        };

        // Search backwards from end_block (the match is likely near the top).
        // Pipeline: fetch the next chunk while computing POIs for the current one.
        let mut chunk_end = end_block;
        // Plain subtraction may go negative for small end_block; `max` clamps
        // the result back up to start_block, so no underflow issue for i32
        // block numbers in practice.
        let chunk_start = std::cmp::max(chunk_end - CHUNK_SIZE, start_block);

        // Fetch first chunk
        let block_numbers: Vec<BlockNumber> = (chunk_start..chunk_end).collect();
        let mut current_ptrs = match chain_store
            .cheap_clone()
            .block_ptrs_by_numbers(block_numbers)
            .await
        {
            Ok(ptrs) => ptrs,
            Err(e) => {
                error!(
                    self.logger,
                    "Failed to fetch block hashes";
                    "range" => format!("{}..{}", chunk_start, chunk_end),
                    "error" => format!("{:?}", e)
                );
                return Ok(r::Value::Null);
            }
        };
        // The next (older) chunk ends where this one started.
        chunk_end = chunk_start;

        loop {
            // Start prefetching the next chunk while we process the current one
            let next_chunk_end = chunk_end;
            let next_chunk_start = std::cmp::max(next_chunk_end - CHUNK_SIZE, start_block);
            let prefetch = if next_chunk_start < next_chunk_end {
                let cs = chain_store.cheap_clone();
                let numbers: Vec<BlockNumber> = (next_chunk_start..next_chunk_end).collect();
                Some(tokio::spawn(async move {
                    cs.block_ptrs_by_numbers(numbers).await
                }))
            } else {
                // Reached start_block: this is the last chunk to process.
                None
            };

            // Collect blocks with unambiguous hashes for parallel search.
            // NOTE(review): block numbers with zero or multiple candidate
            // hashes (e.g. unresolved reorgs) are silently skipped — a match
            // at such a block will be missed without any log line.
            let blocks_to_check: Vec<(BlockNumber, BlockHash)> = current_ptrs
                .iter()
                .filter_map(|(num, ptrs)| {
                    if ptrs.len() == 1 {
                        Some((*num, ptrs[0].hash.clone()))
                    } else {
                        None
                    }
                })
                .collect();

            // Parallel POI computation across all cores via rayon.
            // The closure captures `indexer`, `poi_version` and `target_bytes`
            // by copy, and `re`/`did` as per-iteration clones, so the loop can
            // move them into `spawn_blocking_allow_panic` each time around.
            let re = region_entries.clone();
            let did = deployment_id.clone();
            let result = graph::spawn_blocking_allow_panic(move || {
                use rayon::prelude::*;
                // find_map_any stops early once any worker finds a match; if
                // several blocks match, which one is returned is unspecified.
                blocks_to_check
                    .par_iter()
                    .find_map_any(|(block_num, block_hash)| {
                        let block_ptr = BlockPtr::new(block_hash.clone(), *block_num);
                        let mut finisher =
                            ProofOfIndexingFinisher::new(&block_ptr, &did, &indexer, poi_version);

                        for (region_id, entries) in re.as_ref() {
                            // Binary search: index of the first entry whose
                            // start exceeds block_num; the candidate entry is
                            // the one just before it.
                            let idx = entries.partition_point(|(start, _, _)| *start <= *block_num);
                            if idx == 0 {
                                // All entries start after this block; region
                                // contributes nothing here.
                                continue;
                            }
                            let (start, end, ref digest) = entries[idx - 1];
                            if *block_num >= start && *block_num < end {
                                finisher.add_causality_region(region_id, digest);
                            }
                        }

                        let computed = finisher.finish();
                        if computed == target_bytes {
                            Some((*block_num, block_hash.clone(), computed))
                        } else {
                            None
                        }
                    })
            })
            .await
            // JoinError here means the blocking task panicked; surface it.
            // NOTE(review): early `?` return leaves any in-flight prefetch
            // task running detached (it is only aborted on the success path).
            .map_err(|e| QueryExecutionError::Panic(e.to_string()))?;

            if let Some((block_num, block_hash, computed_poi)) = result {
                // Found it - abort any in-flight prefetch
                if let Some(handle) = prefetch {
                    handle.abort();
                }
                return Ok(object! {
                    __typename: "PoiSearchResult",
                    deployment: deployment_id.to_string(),
                    block: object! {
                        hash: block_hash.hash_hex(),
                        number: block_num,
                    },
                    proofOfIndexing: format!("0x{}", hex::encode(computed_poi)),
                });
            }

            // Move to the next chunk
            match prefetch {
                Some(handle) => {
                    // Unlike the pre-loop fetch (which returns Null on error),
                    // a failed prefetch is surfaced as a StoreError after
                    // logging — asymmetric by construction.
                    current_ptrs = handle
                        .await
                        .map_err(|e| QueryExecutionError::Panic(e.to_string()))?
                        .map_err(|e| {
                            error!(
                                self.logger,
                                "Failed to fetch block hashes";
                                "range" => format!("{}..{}", next_chunk_start, next_chunk_end),
                                "error" => format!("{:?}", e)
                            );
                            QueryExecutionError::StoreError(e.into())
                        })?;
                    chunk_end = next_chunk_start;
                }
                None => break,
            }
        }

        // Exhausted the whole range without a match.
        Ok(r::Value::Null)
    }
| 688 | + |
457 | 689 | async fn resolve_indexing_status_for_version( |
458 | 690 | &self, |
459 | 691 | field: &a::Field, |
@@ -858,6 +1090,7 @@ where |
858 | 1090 | // The top-level `subgraphVersions` field |
859 | 1091 | (None, "apiVersions") => self.resolve_api_versions(field), |
860 | 1092 | (None, "version") => self.version(), |
| 1093 | + (None, "blockForPoi") => self.resolve_block_for_poi(field).await, |
861 | 1094 |
|
862 | 1095 | // Resolve fields of `Object` values (e.g. the `latestBlock` field of `EthereumBlock`) |
863 | 1096 | (value, _) => Ok(value.unwrap_or(r::Value::Null)), |
|
0 commit comments