Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 75 additions & 11 deletions ipfsspec/async_ipfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,19 +69,82 @@ def _raise_requests_too_quick(response):
def __str__(self):
    """Return a short label for this gateway in the form ``GW(<url>)``."""
    return "GW({})".format(self.url)

@staticmethod
def _verify_merkle_path(path, blocks):
    """
    Verify that blocks form a valid chain from root CID through path segments.

    According to the trustless gateway spec, dag-scope=block returns blocks
    needed to verify path segments. This function validates the chain of
    custody by checking that each parent block contains a PBLink whose Name
    equals the next path segment, and that the linked child block is present
    in the CAR response.

    NOTE(review): only link structure and block presence are checked here;
    block bytes are not re-hashed against their CID in this function --
    confirm that this happens elsewhere (e.g. in the CAR reader).

    Args:
        path: Full path like "bafy/dir/file". Empty segments produced by
            leading, trailing or doubled slashes are ignored, so
            "bafy/dir/" resolves exactly like "bafy/dir".
        blocks: Dict mapping CID -> block data from CAR response

    Returns:
        Final CID at the end of the path

    Raises:
        FileNotFoundError: If path cannot be verified through the chain
    """
    # Drop empty segments: an empty string is not a meaningful entry name
    # and would otherwise either fail the lookup or match a link whose
    # Name is the empty string.
    segments = [segment for segment in path.split("/") if segment]
    if not segments:
        raise FileNotFoundError(f"Invalid root CID in path: {path}")

    # First segment must be the root CID
    root_segment = segments[0]
    try:
        current_cid = CID.decode(root_segment)
    except Exception as e:
        # Any decoding failure means the path cannot be resolved at all.
        raise FileNotFoundError(f"Invalid root CID in path: {root_segment}") from e

    # Verify root block exists in CAR
    if current_cid not in blocks:
        raise FileNotFoundError(f"Root block {current_cid} not found in CAR response")

    # Walk through path segments, validating each link
    for segment in segments[1:]:
        # Only DAG-PB nodes carry named links that can be traversed.
        if current_cid.codec != DagPbCodec:
            raise FileNotFoundError(f"Cannot traverse path through non-DAG-PB block: {current_cid}")

        # current_cid is always present in blocks at this point (checked
        # for the root above and for every child before it is adopted).
        node = unixfsv1.PBNode.loads(blocks[current_cid])

        # Find the first link matching this path segment
        matching_link = next(
            (link for link in node.Links if link.Name == segment),
            None,
        )
        if matching_link is None:
            raise FileNotFoundError(f"Path segment '{segment}' not found in directory {current_cid}")

        # Decode the child CID from the link's Hash
        try:
            child_cid = CID.decode(matching_link.Hash)
        except Exception as e:
            raise FileNotFoundError(f"Invalid CID in link '{segment}'") from e

        # Verify child block exists in CAR
        if child_cid not in blocks:
            raise FileNotFoundError(f"Child block {child_cid} for path segment '{segment}' not found in CAR response")

        current_cid = child_cid

    return current_cid

async def info(self, path, session):
res = await self.get(path, session, headers={"Accept": "application/vnd.ipld.car"}, params={"format": "car", "dag-scope": "block"})
self._raise_not_found_for_status(res, path)

roots = res.headers["X-Ipfs-Roots"].split(",")
if len(roots) != len(path.split("/")):
raise FileNotFoundError(path)

cid = CID.decode(roots[-1])
resdata = await res.read()

_, blocks = read_car(resdata) # roots should be ignored by https://specs.ipfs.tech/http-gateways/trustless-gateway/
blocks = {cid: data for cid, data, _ in blocks}

# Verify the merkle proof from root CID through path segments
cid = self._verify_merkle_path(path, blocks)
block = blocks[cid]

if cid.codec == RawCodec:
Expand Down Expand Up @@ -146,17 +209,18 @@ async def iter_chunked(self, path, session, chunk_size):
async def ls(self, path, session, detail=False):
res = await self.get(path, session, headers={"Accept": "application/vnd.ipld.car"}, params={"format": "car", "dag-scope": "block"})
self._raise_not_found_for_status(res, path)
roots = res.headers["X-Ipfs-Roots"].split(",")
if len(roots) != len(path.split("/")):
raise FileNotFoundError(path)

cid = CID.decode(roots[-1])
assert cid.codec == DagPbCodec, "this is not a directory"

resdata = await res.read()

_, blocks = read_car(resdata) # roots should be ignored by https://specs.ipfs.tech/http-gateways/trustless-gateway/
blocks = {cid: data for cid, data, _ in blocks}

# Verify the chain of custody from root CID through path segments
cid = self._verify_merkle_path(path, blocks)

if cid.codec != DagPbCodec:
raise NotADirectoryError(f"Path {path} does not resolve to a directory")

node = unixfsv1.PBNode.loads(blocks[cid])
data = unixfsv1.Data.loads(node.Data)
if data.Type != unixfsv1.DataType.Directory:
Expand Down
128 changes: 128 additions & 0 deletions test/test_merkle_verification.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
"""Test merkle path verification logic using real CAR test data"""

import pytest
from pathlib import Path
from multiformats import CID
from ipfsspec.async_ipfs import AsyncIPFSGateway
from ipfsspec.car import read_car


# Test data from test/testdata.car
# Root: QmW3CrGFuFyF3VH1wvrap4Jend5NRTgtESDjuQ7QhHD5dd (directory)
# Children:
# - default: QmZsn2gmGC6yBs6TWPiRspXfTJ3K4DEtWUePVqBJ84YkU8
# - multi: QmaSgZFgGWWuV27GG1QtZuqTXrdWM5yLLdtyr5SSutmJFr
# - raw: bafkreibauudqsswbcktzrs5bwozj3cllhme56jlj23op4lwgmsucpv222q
# - raw_multi: QmeMPrSpm7q5bjczEJLPRHiSDdwEPWt16phrBUx2YY4E8g
# - write: QmUHyXsVBDM9qkj4aaBrqcm12eFYPWva2jmAMD5TJfp2Qh


@pytest.fixture
def test_car():
    """Provide ``(root_cid, blocks)`` read from ``testdata.car`` next to this module.

    ``blocks`` maps each CID yielded by ``read_car`` to its block data.
    """
    root = CID.decode("QmW3CrGFuFyF3VH1wvrap4Jend5NRTgtESDjuQ7QhHD5dd")
    raw_car = (Path(__file__).parent / "testdata.car").read_bytes()

    # The roots returned by read_car are not needed; the root CID is fixed above.
    _, entries = read_car(raw_car)
    block_map = {}
    for block_cid, block_data, _ in entries:
        block_map[block_cid] = block_data
    return root, block_map


def test_verify_merkle_path_single_cid(test_car):
    """A path consisting only of the root CID resolves to that same CID."""
    root, car_blocks = test_car

    # No segments follow the root, so nothing has to be traversed.
    resolved = AsyncIPFSGateway._verify_merkle_path(str(root), car_blocks)
    assert resolved == root


def test_verify_merkle_path_valid_child(test_car):
    """The "default" entry under the root resolves to its known CID."""
    root, car_blocks = test_car
    child_path = f"{root}/default"

    resolved = AsyncIPFSGateway._verify_merkle_path(child_path, car_blocks)

    assert resolved == CID.decode("QmZsn2gmGC6yBs6TWPiRspXfTJ3K4DEtWUePVqBJ84YkU8")


def test_verify_merkle_path_valid_cidv1_child(test_car):
    """The "raw" entry, whose link target is a CIDv1, resolves to that CID."""
    root, car_blocks = test_car
    raw_path = f"{root}/raw"

    resolved = AsyncIPFSGateway._verify_merkle_path(raw_path, car_blocks)

    assert resolved == CID.decode(
        "bafkreibauudqsswbcktzrs5bwozj3cllhme56jlj23op4lwgmsucpv222q"
    )


def test_verify_merkle_path_all_children(test_car):
    """Every known entry of the root directory resolves to its expected CID."""
    root, car_blocks = test_car

    # Entry name -> textual CID the entry is expected to resolve to.
    known_children = {
        "default": "QmZsn2gmGC6yBs6TWPiRspXfTJ3K4DEtWUePVqBJ84YkU8",
        "multi": "QmaSgZFgGWWuV27GG1QtZuqTXrdWM5yLLdtyr5SSutmJFr",
        "raw": "bafkreibauudqsswbcktzrs5bwozj3cllhme56jlj23op4lwgmsucpv222q",
        "raw_multi": "QmeMPrSpm7q5bjczEJLPRHiSDdwEPWt16phrBUx2YY4E8g",
        "write": "QmUHyXsVBDM9qkj4aaBrqcm12eFYPWva2jmAMD5TJfp2Qh",
    }

    for entry_name, cid_text in known_children.items():
        wanted = CID.decode(cid_text)
        resolved = AsyncIPFSGateway._verify_merkle_path(
            f"{root}/{entry_name}", car_blocks
        )
        assert resolved == wanted, f"Failed for entry '{entry_name}'"


def test_verify_merkle_path_missing_root():
    """A root CID absent from the block mapping raises FileNotFoundError."""
    absent_root = CID.decode("QmW3CrGFuFyF3VH1wvrap4Jend5NRTgtESDjuQ7QhHD5dd")
    no_blocks = {}  # deliberately empty: the root block is not available

    with pytest.raises(FileNotFoundError, match="Root block .* not found"):
        AsyncIPFSGateway._verify_merkle_path(str(absent_root), no_blocks)


def test_verify_merkle_path_invalid_root_cid():
    """A first segment that is not a decodable CID raises FileNotFoundError."""
    bad_path = "not-a-valid-cid/path"

    with pytest.raises(FileNotFoundError, match="Invalid root CID"):
        AsyncIPFSGateway._verify_merkle_path(bad_path, {})


def test_verify_merkle_path_nonexistent_path_segment(test_car):
    """A segment with no matching link in the root raises FileNotFoundError."""
    root, car_blocks = test_car
    unknown_path = f"{root}/nonexistent"

    with pytest.raises(FileNotFoundError, match="Path segment 'nonexistent' not found"):
        AsyncIPFSGateway._verify_merkle_path(unknown_path, car_blocks)


def test_verify_merkle_path_wrong_segment_name(test_car):
    """A near-miss entry name is rejected: link names must match exactly."""
    root, car_blocks = test_car

    # "defaults" differs from the real entry "default" by one trailing letter.
    misspelled_path = f"{root}/defaults"

    with pytest.raises(FileNotFoundError, match="Path segment 'defaults' not found"):
        AsyncIPFSGateway._verify_merkle_path(misspelled_path, car_blocks)


def test_verify_merkle_path_missing_intermediate_block(test_car):
    """A linked child whose block is absent from the mapping raises FileNotFoundError."""
    root, all_blocks = test_car

    # Keep only the root block so the link to "default" points at a missing block.
    root_only = {root: all_blocks[root]}

    with pytest.raises(FileNotFoundError, match="Child block .* not found"):
        AsyncIPFSGateway._verify_merkle_path(f"{root}/default", root_only)