From b009e236cfa39c82f0ace0d4b674643e04704d4c Mon Sep 17 00:00:00 2001 From: Schamper <1254028+Schamper@users.noreply.github.com> Date: Tue, 20 Jan 2026 14:30:16 +0100 Subject: [PATCH 1/2] Improve unit tests --- tests/_util.py | 7 ++ tests/conftest.py | 133 --------------------------- tests/descriptor/test_hyperv.py | 109 +++++++++++----------- tests/descriptor/test_vmx.py | 7 +- tests/disk/test_asif.py | 18 ++-- tests/disk/test_hdd.py | 18 ++++ tests/disk/test_qcow2.py | 75 ++++++++------- tests/disk/test_vhd.py | 115 ++++++++++++----------- tests/disk/test_vhdx.py | 158 +++++++++++++++++--------------- tests/disk/test_vmdk.py | 28 +++--- tests/util/test_envelope.py | 21 +++-- tests/util/test_vmtar.py | 37 ++++---- 12 files changed, 324 insertions(+), 402 deletions(-) create mode 100644 tests/_util.py delete mode 100644 tests/conftest.py diff --git a/tests/_util.py b/tests/_util.py new file mode 100644 index 0000000..7ace382 --- /dev/null +++ b/tests/_util.py @@ -0,0 +1,7 @@ +from __future__ import annotations + +from pathlib import Path + + +def absolute_path(filename: str) -> Path: + return Path(__file__).parent.joinpath(filename).resolve() diff --git a/tests/conftest.py b/tests/conftest.py deleted file mode 100644 index 3673181..0000000 --- a/tests/conftest.py +++ /dev/null @@ -1,133 +0,0 @@ -from __future__ import annotations - -import gzip -from pathlib import Path -from typing import TYPE_CHECKING, BinaryIO, TextIO - -import pytest - -if TYPE_CHECKING: - from collections.abc import Iterator - - -def absolute_path(filename: str) -> Path: - return Path(__file__).parent / filename - - -def open_file(name: str, mode: str = "rb") -> Iterator[BinaryIO]: - with absolute_path(name).open(mode) as fh: - yield fh - - -def open_file_gz(name: str, mode: str = "rb") -> Iterator[BinaryIO]: - with gzip.GzipFile(absolute_path(name), mode) as fh: - yield fh - - -@pytest.fixture -def encrypted_vmx() -> Iterator[BinaryIO]: - yield from open_file("_data/descriptor/vmx/encrypted.vmx") - - -@pytest.fixture -def vmcx() -> Iterator[BinaryIO]: - yield from open_file("_data/descriptor/hyperv/test.vmcx") - - -@pytest.fixture -def vmrs() -> Iterator[BinaryIO]: - yield from open_file("_data/descriptor/hyperv/test.VMRS") - - -@pytest.fixture -def fixed_vhd() -> Iterator[BinaryIO]: - yield from open_file_gz("_data/disk/vhd/fixed.vhd.gz") - - -@pytest.fixture -def dynamic_vhd() -> Iterator[BinaryIO]: - yield from open_file_gz("_data/disk/vhd/dynamic.vhd.gz") - - -@pytest.fixture -def fixed_vhdx() -> Iterator[BinaryIO]: - yield from open_file_gz("_data/disk/vhdx/fixed.vhdx.gz") - - -@pytest.fixture -def dynamic_vhdx() -> Iterator[BinaryIO]: - yield from open_file_gz("_data/disk/vhdx/dynamic.vhdx.gz") - - -@pytest.fixture -def differencing_vhdx() -> Iterator[BinaryIO]: - yield from open_file_gz("_data/disk/vhdx/differencing.avhdx.gz") - - -@pytest.fixture -def sesparse_vmdk() -> Iterator[BinaryIO]: - yield from open_file_gz("_data/disk/vmdk/sesparse.vmdk.gz") - - -@pytest.fixture -def plain_hdd() -> Path: - return absolute_path("_data/disk/hdd/plain.hdd") - - -@pytest.fixture -def expanding_hdd() -> Path: - return absolute_path("_data/disk/hdd/expanding.hdd") - - -@pytest.fixture -def split_hdd() -> Path: - return absolute_path("_data/disk/hdd/split.hdd") - - -@pytest.fixture -def basic_qcow2() -> Iterator[BinaryIO]: - yield from open_file_gz("_data/disk/qcow2/basic.qcow2.gz") - - -@pytest.fixture -def basic_zstd_qcow2() -> Iterator[BinaryIO]: - yield from open_file_gz("_data/disk/qcow2/basic-zstd.qcow2.gz") - - -@pytest.fixture -def data_file_qcow2() -> Path: - return absolute_path("_data/disk/qcow2/data-file.qcow2.gz") - - -@pytest.fixture -def backing_chain_qcow2() -> tuple[Path, Path, Path]: - return ( - absolute_path("_data/disk/qcow2/backing-chain-1.qcow2.gz"), - absolute_path("_data/disk/qcow2/backing-chain-2.qcow2.gz"), - absolute_path("_data/disk/qcow2/backing-chain-3.qcow2.gz"), - ) - - -@pytest.fixture -def snapshot_qcow2() -> Iterator[BinaryIO]: - yield from open_file_gz("_data/disk/qcow2/snapshot.qcow2.gz") - - -@pytest.fixture -def basic_asif() -> Iterator[BinaryIO]: - yield from open_file_gz("_data/disk/asif/basic.asif.gz") - - -@pytest.fixture -def envelope() -> Iterator[BinaryIO]: - yield from open_file("_data/util/envelope/local.tgz.ve") - - -@pytest.fixture -def keystore() -> Iterator[TextIO]: - yield from open_file("_data/util/envelope/encryption.info", "r") - - -@pytest.fixture -def vgz() -> Iterator[BinaryIO]: - yield from open_file("_data/util/vmtar/test.vgz") diff --git a/tests/descriptor/test_hyperv.py b/tests/descriptor/test_hyperv.py index ea494f6..adfca34 100644 --- a/tests/descriptor/test_hyperv.py +++ b/tests/descriptor/test_hyperv.py @@ -1,63 +1,64 @@ from __future__ import annotations -from typing import BinaryIO - from dissect.hypervisor.descriptor.hyperv import HyperVFile +from tests._util import absolute_path + + +def test_hyperv_vmcx() -> None: + with absolute_path("_data/descriptor/hyperv/test.vmcx").open("rb") as fh: + hf = HyperVFile(fh) + + assert hf.header is hf.headers[0] + assert hf.version == 0x400 + assert len(hf.replay_logs) == 1 + assert len(hf.object_tables) == 1 + assert len(hf.key_tables) == 8 + + obj = hf.as_dict() + assert set(obj.keys()) == {"configuration"} + assert len(obj["configuration"].keys()) == 27 + assert len(obj["configuration"]["manifest"].keys()) == 39 + assert len(obj["configuration"]["properties"].keys()) == 11 + assert len(obj["configuration"]["settings"].keys()) == 6 + + +def test_hyperv_vmrs() -> None: + with absolute_path("_data/descriptor/hyperv/test.VMRS").open("rb") as fh: + hf = HyperVFile(fh) + assert hf.header is hf.headers[0] + assert hf.version == 0x400 + assert len(hf.replay_logs) == 1 + assert len(hf.object_tables) == 1 + assert len(hf.key_tables) == 2 -def test_hyperv_vmcx(vmcx: BinaryIO) -> None: - hf = HyperVFile(vmcx) - - assert hf.header is hf.headers[0] - assert hf.version == 0x400 - assert len(hf.replay_logs) == 1 - assert len(hf.object_tables) == 1 - assert len(hf.key_tables) == 8 - - obj = hf.as_dict() - assert set(obj.keys()) == {"configuration"} - assert len(obj["configuration"].keys()) == 27 - assert len(obj["configuration"]["manifest"].keys()) == 39 - assert len(obj["configuration"]["properties"].keys()) == 11 - assert len(obj["configuration"]["settings"].keys()) == 6 - - -def test_hyperv_vmrs(vmrs: BinaryIO) -> None: - hf = HyperVFile(vmrs) - - assert hf.header is hf.headers[0] - assert hf.version == 0x400 - assert len(hf.replay_logs) == 1 - assert len(hf.object_tables) == 1 - assert len(hf.key_tables) == 2 - - obj = hf.as_dict() - target = { - "configuration": { - "properties": {"version": 2304}, - "global_settings": { - "metrics": { - "devicetype": { - "guid": "83F8638B-8DCA-4152-9EDA-2CA8B33039B4", - "deviceinstance": { + obj = hf.as_dict() + target = { + "configuration": { + "properties": {"version": 2304}, + "global_settings": { + "metrics": { + "devicetype": { "guid": "83F8638B-8DCA-4152-9EDA-2CA8B33039B4", - "metric": { - "typecode": "4E1D459F-7861-46A4-887C-B64397C97E1B;0\\0\\L", - "value": 0, - "enabled": False, - "starttime": 0, - "lastcomputedtime": 0, - "peaktime": 0, - "poolid": "", - "resourcetypeid": "70BB60D2-A9D3-46AA-B654-3DE53004B4F8", + "deviceinstance": { + "guid": "83F8638B-8DCA-4152-9EDA-2CA8B33039B4", + "metric": { + "typecode": "4E1D459F-7861-46A4-887C-B64397C97E1B;0\\0\\L", + "value": 0, + "enabled": False, + "starttime": 0, + "lastcomputedtime": 0, + "peaktime": 0, + "poolid": "", + "resourcetypeid": "70BB60D2-A9D3-46AA-B654-3DE53004B4F8", + }, }, - }, + } } - } - }, - "_ac6b8dc1-3257-4a70-b1b2-a9c9215659ad_": {"VDEVVersion": 2048}, - "_e51b7ef6-4a7f-4780-aaae-d4b291aacd2e_": {"VDEVVersion": 512}, - "_83f8638b-8dca-4152-9eda-2ca8b33039b4_": {"VDEVVersion": 1792}, + }, + "_ac6b8dc1-3257-4a70-b1b2-a9c9215659ad_": {"VDEVVersion": 2048}, + "_e51b7ef6-4a7f-4780-aaae-d4b291aacd2e_": {"VDEVVersion": 512}, + "_83f8638b-8dca-4152-9eda-2ca8b33039b4_": {"VDEVVersion": 1792}, + } } - } - assert obj == target + assert obj == target diff --git a/tests/descriptor/test_vmx.py b/tests/descriptor/test_vmx.py index 403f13a..11aa6e3 100644 --- a/tests/descriptor/test_vmx.py +++ b/tests/descriptor/test_vmx.py @@ -1,10 +1,9 @@ from __future__ import annotations -from typing import BinaryIO - import pytest from dissect.hypervisor.descriptor.vmx import HAS_PYCRYPTODOME, HAS_PYSTANDALONE, VMX +from tests._util import absolute_path def test_vmx() -> None: @@ -66,8 +65,8 @@ def test_vmx() -> None: @pytest.mark.skipif((not HAS_PYCRYPTODOME and not HAS_PYSTANDALONE), reason="No crypto module available") -def test_vmx_encrypted(encrypted_vmx: BinaryIO) -> None: - vmx = VMX.parse(encrypted_vmx.read().decode()) +def test_vmx_encrypted() -> None: + vmx = VMX.parse(absolute_path("_data/descriptor/vmx/encrypted.vmx").read_text()) assert vmx.encrypted diff --git a/tests/disk/test_asif.py b/tests/disk/test_asif.py index 915e1de..ad7ebdf 100644 --- a/tests/disk/test_asif.py +++ b/tests/disk/test_asif.py @@ -1,17 +1,19 @@ from __future__ import annotations -from typing import BinaryIO +import gzip from dissect.hypervisor.disk.asif import ASIF +from tests._util import absolute_path -def test_asif(basic_asif: BinaryIO) -> None: +def test_asif() -> None: """Test ASIF parsing.""" - asif = ASIF(basic_asif) + with gzip.open(absolute_path("_data/disk/asif/basic.asif.gz"), "rb") as fh: + asif = ASIF(fh) - assert asif.internal_metadata == {"stable uuid": "13db9632-b79f-4e95-aada-835d5ef97bba"} - assert asif.user_metadata == {} + assert asif.internal_metadata == {"stable uuid": "13db9632-b79f-4e95-aada-835d5ef97bba"} + assert asif.user_metadata == {} - with asif.open() as stream: - for i in range(100): - assert stream.read(1024 * 1024).strip(bytes([i])) == b"", f"Mismatch at offset {i * 1024 * 1024:#x}" + with asif.open() as stream: + for i in range(100): + assert stream.read(1024 * 1024).strip(bytes([i])) == b"", f"Mismatch at offset {i * 1024 * 1024:#x}" diff --git a/tests/disk/test_hdd.py b/tests/disk/test_hdd.py index 1c08b74..829a26e 100644 --- a/tests/disk/test_hdd.py +++ b/tests/disk/test_hdd.py @@ -5,11 +5,29 @@ from typing import BinaryIO from unittest.mock import patch +import pytest + from dissect.hypervisor.disk.hdd import HDD +from tests._util import absolute_path Path_open = Path.open +@pytest.fixture +def plain_hdd() -> Path: + return absolute_path("_data/disk/hdd/plain.hdd") + + +@pytest.fixture +def expanding_hdd() -> Path: + return absolute_path("_data/disk/hdd/expanding.hdd") + + +@pytest.fixture +def split_hdd() -> Path: + return absolute_path("_data/disk/hdd/split.hdd") + + def mock_open_gz(self: Path, *args, **kwargs) -> BinaryIO: if self.suffix.lower() != ".hds": return Path_open(self, *args, **kwargs) diff --git a/tests/disk/test_qcow2.py b/tests/disk/test_qcow2.py index 017a919..e9011ca 100644 --- a/tests/disk/test_qcow2.py +++ b/tests/disk/test_qcow2.py @@ -10,32 +10,42 @@ from dissect.hypervisor.disk.qcow2 import QCow2, QCow2Stream from dissect.hypervisor.exceptions import Error +from tests._util import absolute_path def mock_open_gz(self: Path, *args, **kwargs) -> BinaryIO: return gzip.open(self if self.suffix.lower() == ".gz" else self.with_suffix(self.suffix + ".gz")) -@pytest.mark.parametrize("name", ["basic_qcow2", "basic_zstd_qcow2"]) -def test_basic(name: str, request: pytest.FixtureRequest) -> None: - qcow2 = QCow2(request.getfixturevalue(name)) +@pytest.mark.parametrize( + "path", + [ + "_data/disk/qcow2/basic.qcow2.gz", + "_data/disk/qcow2/basic-zstd.qcow2.gz", + ], +) +def test_basic(path: str) -> None: + with gzip.open(absolute_path(path), "rb") as fh: + qcow2 = QCow2(fh) - assert qcow2.backing_file is None - assert qcow2.data_file is qcow2.fh - assert qcow2.size == 536870912 + assert qcow2.backing_file is None + assert qcow2.data_file is qcow2.fh + assert qcow2.size == 536870912 - with qcow2.open() as stream: - for i in range(255): - assert stream.read(1024 * 1024).strip(bytes([i])) == b"", f"Mismatch at offset {i * 1024 * 1024:#x}" + with qcow2.open() as stream: + for i in range(255): + assert stream.read(1024 * 1024).strip(bytes([i])) == b"", f"Mismatch at offset {i * 1024 * 1024:#x}" -def test_data_file(data_file_qcow2: Path) -> None: +def test_data_file() -> None: + path = absolute_path("_data/disk/qcow2/data-file.qcow2.gz") + # Test with file handle - with gzip.open(data_file_qcow2, "rb") as fh: + with gzip.open(path, "rb") as fh: with pytest.raises(Error, match=r"data-file required but not provided \(image_data_file = 'data-file.bin'\)"): QCow2(fh) - with gzip.open(data_file_qcow2.with_name("data-file.bin.gz"), "rb") as fh_bin: + with gzip.open(path.with_name("data-file.bin.gz"), "rb") as fh_bin: qcow2 = QCow2(fh, data_file=fh_bin) assert qcow2.backing_file is None @@ -53,7 +63,7 @@ def test_data_file(data_file_qcow2: Path) -> None: # Test with Path with patch.object(Path, "open", mock_open_gz), patch.object(Path, "exists", return_value=True): - qcow2 = QCow2(data_file_qcow2) + qcow2 = QCow2(path) assert qcow2.backing_file is None assert qcow2.data_file is not qcow2.fh @@ -63,8 +73,10 @@ def test_data_file(data_file_qcow2: Path) -> None: assert stream.read(1024 * 1024).strip(bytes([i])) == b"", f"Mismatch at offset {i * 1024 * 1024:#x}" -def test_backing_file(backing_chain_qcow2: tuple[Path, Path, Path]) -> None: - file1, file2, file3 = backing_chain_qcow2 +def test_backing_file() -> None: + file1 = absolute_path("_data/disk/qcow2/backing-chain-1.qcow2.gz") + file2 = absolute_path("_data/disk/qcow2/backing-chain-2.qcow2.gz") + file3 = absolute_path("_data/disk/qcow2/backing-chain-3.qcow2.gz") # Test with file handle with gzip.open(file1, "rb") as fh1, gzip.open(file2, "rb") as fh2, gzip.open(file3, "rb") as fh3: @@ -125,24 +137,25 @@ def test_backing_file(backing_chain_qcow2: tuple[Path, Path, Path]) -> None: assert stream.read(1024 * 1024).strip(b"\x00") == b"Something here five" -def test_snapshot(snapshot_qcow2: BinaryIO) -> None: - qcow2 = QCow2(snapshot_qcow2) +def test_snapshot() -> None: + with gzip.open(absolute_path("_data/disk/qcow2/snapshot.qcow2.gz"), "rb") as fh: + qcow2 = QCow2(fh) - assert qcow2.backing_file is None - assert qcow2.data_file is qcow2.fh - assert qcow2.size == 536870912 + assert qcow2.backing_file is None + assert qcow2.data_file is qcow2.fh + assert qcow2.size == 536870912 - with qcow2.open() as stream: - assert stream.read(4 * 1024 * 1024).strip(b"\x00") == b"" + with qcow2.open() as stream: + assert stream.read(4 * 1024 * 1024).strip(b"\x00") == b"" - assert len(qcow2.snapshots) == 2 - assert qcow2.snapshots[0].id == "1" - assert qcow2.snapshots[0].name == "you can't see me" - assert qcow2.snapshots[1].id == "2" - assert qcow2.snapshots[1].name == "confused" + assert len(qcow2.snapshots) == 2 + assert qcow2.snapshots[0].id == "1" + assert qcow2.snapshots[0].name == "you can't see me" + assert qcow2.snapshots[1].id == "2" + assert qcow2.snapshots[1].name == "confused" - with qcow2.snapshots[1].open() as stream: - assert hashlib.sha1(stream.read(813857)).hexdigest() == "c97f53aece77ea49099d15e5f53af3af5f62fb54" + with qcow2.snapshots[1].open() as stream: + assert hashlib.sha1(stream.read(813857)).hexdigest() == "c97f53aece77ea49099d15e5f53af3af5f62fb54" - with qcow2.snapshots[0].open() as stream: - assert hashlib.sha1(stream.read(2261577)).hexdigest() == "2c7a6b5f6b5c4739f6d24c11e86c764bdf86096f" + with qcow2.snapshots[0].open() as stream: + assert hashlib.sha1(stream.read(2261577)).hexdigest() == "2c7a6b5f6b5c4739f6d24c11e86c764bdf86096f" diff --git a/tests/disk/test_vhd.py b/tests/disk/test_vhd.py index 4b35713..9a78b2f 100644 --- a/tests/disk/test_vhd.py +++ b/tests/disk/test_vhd.py @@ -1,62 +1,65 @@ from __future__ import annotations -from typing import BinaryIO +import gzip from dissect.hypervisor.disk.vhd import VHD, DynamicDisk, FixedDisk +from tests._util import absolute_path -def test_vhd_fixed(fixed_vhd: BinaryIO) -> None: - vhd = VHD(fixed_vhd) - assert vhd.size == 10485760 - assert isinstance(vhd.disk, FixedDisk) - - assert vhd.read(512) == bytes.fromhex( - "33c08ed0bc007c8ec08ed8be007cbf0006b90002fcf3a450681c06cbfbb90400" - "bdbe07807e00007c0b0f850e0183c510e2f1cd1888560055c6461105c6461000" - "b441bbaa55cd135d720f81fb55aa7509f7c101007403fe46106660807e100074" - "2666680000000066ff760868000068007c680100681000b4428a56008bf4cd13" - "9f83c4109eeb14b80102bb007c8a56008a76018a4e028a6e03cd136661731cfe" - "4e11750c807e00800f848a00b280eb845532e48a5600cd135deb9e813efe7d55" - "aa756eff7600e88d007517fab0d1e664e88300b0dfe660e87c00b0ffe664e875" - "00fbb800bbcd1a6623c0753b6681fb54435041753281f90201722c666807bb00" - "006668000200006668080000006653665366556668000000006668007c000066" - "6168000007cd1a5a32f6ea007c0000cd18a0b707eb08a0b607eb03a0b50732e4" - "0500078bf0ac3c007409bb0700b40ecd10ebf2f4ebfd2bc9e464eb002402e0f8" - "2402c3496e76616c696420706172746974696f6e207461626c65004572726f72" - "206c6f6164696e67206f7065726174696e672073797374656d004d697373696e" - "67206f7065726174696e672073797374656d000000637b9aeb3dad8c00000002" - "030007e525008000000000380000000000000000000000000000000000000000" - "00000000000000000000000000000000000000000000000000000000000055aa" - ) - - vhd.seek(0x200000) - assert vhd.read(512) == b"\xff" * 512 - - -def test_vhd_dynamic(dynamic_vhd: BinaryIO) -> None: - vhd = VHD(dynamic_vhd) - assert vhd.size == 10485760 - assert isinstance(vhd.disk, DynamicDisk) - - assert vhd.read(512) == bytes.fromhex( - "33c08ed0bc007c8ec08ed8be007cbf0006b90002fcf3a450681c06cbfbb90400" - "bdbe07807e00007c0b0f850e0183c510e2f1cd1888560055c6461105c6461000" - "b441bbaa55cd135d720f81fb55aa7509f7c101007403fe46106660807e100074" - "2666680000000066ff760868000068007c680100681000b4428a56008bf4cd13" - "9f83c4109eeb14b80102bb007c8a56008a76018a4e028a6e03cd136661731cfe" - "4e11750c807e00800f848a00b280eb845532e48a5600cd135deb9e813efe7d55" - "aa756eff7600e88d007517fab0d1e664e88300b0dfe660e87c00b0ffe664e875" - "00fbb800bbcd1a6623c0753b6681fb54435041753281f90201722c666807bb00" - "006668000200006668080000006653665366556668000000006668007c000066" - "6168000007cd1a5a32f6ea007c0000cd18a0b707eb08a0b607eb03a0b50732e4" - "0500078bf0ac3c007409bb0700b40ecd10ebf2f4ebfd2bc9e464eb002402e0f8" - "2402c3496e76616c696420706172746974696f6e207461626c65004572726f72" - "206c6f6164696e67206f7065726174696e672073797374656d004d697373696e" - "67206f7065726174696e672073797374656d000000637b9ae13dad8c00000002" - "030007e525008000000000380000000000000000000000000000000000000000" - "00000000000000000000000000000000000000000000000000000000000055aa" - ) - - vhd.seek(0x200000) - assert vhd.read(512) == b"\xff" * 512 - assert vhd.disk.read_sectors(0x3FFF, 16) == (b"\x00" * 512 * 16) +def test_vhd_fixed() -> None: + with gzip.open(absolute_path("_data/disk/vhd/fixed.vhd.gz"), "rb") as fh: + vhd = VHD(fh) + assert vhd.size == 10485760 + assert isinstance(vhd.disk, FixedDisk) + + assert vhd.read(512) == bytes.fromhex( + "33c08ed0bc007c8ec08ed8be007cbf0006b90002fcf3a450681c06cbfbb90400" + "bdbe07807e00007c0b0f850e0183c510e2f1cd1888560055c6461105c6461000" + "b441bbaa55cd135d720f81fb55aa7509f7c101007403fe46106660807e100074" + "2666680000000066ff760868000068007c680100681000b4428a56008bf4cd13" + "9f83c4109eeb14b80102bb007c8a56008a76018a4e028a6e03cd136661731cfe" + "4e11750c807e00800f848a00b280eb845532e48a5600cd135deb9e813efe7d55" + "aa756eff7600e88d007517fab0d1e664e88300b0dfe660e87c00b0ffe664e875" + "00fbb800bbcd1a6623c0753b6681fb54435041753281f90201722c666807bb00" + "006668000200006668080000006653665366556668000000006668007c000066" + "6168000007cd1a5a32f6ea007c0000cd18a0b707eb08a0b607eb03a0b50732e4" + "0500078bf0ac3c007409bb0700b40ecd10ebf2f4ebfd2bc9e464eb002402e0f8" + "2402c3496e76616c696420706172746974696f6e207461626c65004572726f72" + "206c6f6164696e67206f7065726174696e672073797374656d004d697373696e" + "67206f7065726174696e672073797374656d000000637b9aeb3dad8c00000002" + "030007e525008000000000380000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000000000000055aa" + ) + + vhd.seek(0x200000) + assert vhd.read(512) == b"\xff" * 512 + + +def test_vhd_dynamic() -> None: + with gzip.open(absolute_path("_data/disk/vhd/dynamic.vhd.gz"), "rb") as fh: + vhd = VHD(fh) + assert vhd.size == 10485760 + assert isinstance(vhd.disk, DynamicDisk) + + assert vhd.read(512) == bytes.fromhex( + "33c08ed0bc007c8ec08ed8be007cbf0006b90002fcf3a450681c06cbfbb90400" + "bdbe07807e00007c0b0f850e0183c510e2f1cd1888560055c6461105c6461000" + "b441bbaa55cd135d720f81fb55aa7509f7c101007403fe46106660807e100074" + "2666680000000066ff760868000068007c680100681000b4428a56008bf4cd13" + "9f83c4109eeb14b80102bb007c8a56008a76018a4e028a6e03cd136661731cfe" + "4e11750c807e00800f848a00b280eb845532e48a5600cd135deb9e813efe7d55" + "aa756eff7600e88d007517fab0d1e664e88300b0dfe660e87c00b0ffe664e875" + "00fbb800bbcd1a6623c0753b6681fb54435041753281f90201722c666807bb00" + "006668000200006668080000006653665366556668000000006668007c000066" + "6168000007cd1a5a32f6ea007c0000cd18a0b707eb08a0b607eb03a0b50732e4" + "0500078bf0ac3c007409bb0700b40ecd10ebf2f4ebfd2bc9e464eb002402e0f8" + "2402c3496e76616c696420706172746974696f6e207461626c65004572726f72" + "206c6f6164696e67206f7065726174696e672073797374656d004d697373696e" + "67206f7065726174696e672073797374656d000000637b9ae13dad8c00000002" + "030007e525008000000000380000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000000000000055aa" + ) + + vhd.seek(0x200000) + assert vhd.read(512) == b"\xff" * 512 + assert vhd.disk.read_sectors(0x3FFF, 16) == (b"\x00" * 512 * 16) diff --git a/tests/disk/test_vhdx.py b/tests/disk/test_vhdx.py index 1fc03f8..e2289cd 100644 --- a/tests/disk/test_vhdx.py +++ b/tests/disk/test_vhdx.py @@ -1,86 +1,92 @@ from __future__ import annotations -from typing import BinaryIO +import gzip from uuid import UUID import pytest from dissect.hypervisor.disk.vhdx import VHDX, _iter_partial_runs, c_vhdx - - -def test_vhdx_fixed(fixed_vhdx: BinaryIO) -> None: - v = VHDX(fixed_vhdx) - - assert v.size == 0xA00000 - assert v.block_size == 0x200000 - assert v.has_parent == 0 - assert v.sector_size == 0x200 - assert v.bat.chunk_ratio == 0x800 - assert v.id == UUID("4a49d245-db0a-4634-9818-9f93db5ba6c1") - - assert v.read(512) == bytes.fromhex( - "33c08ed0bc007c8ec08ed8be007cbf0006b90002fcf3a450681c06cbfbb90400" - "bdbe07807e00007c0b0f850e0183c510e2f1cd1888560055c6461105c6461000" - "b441bbaa55cd135d720f81fb55aa7509f7c101007403fe46106660807e100074" - "2666680000000066ff760868000068007c680100681000b4428a56008bf4cd13" - "9f83c4109eeb14b80102bb007c8a56008a76018a4e028a6e03cd136661731cfe" - "4e11750c807e00800f848a00b280eb845532e48a5600cd135deb9e813efe7d55" - "aa756eff7600e88d007517fab0d1e664e88300b0dfe660e87c00b0ffe664e875" - "00fbb800bbcd1a6623c0753b6681fb54435041753281f90201722c666807bb00" - "006668000200006668080000006653665366556668000000006668007c000066" - "6168000007cd1a5a32f6ea007c0000cd18a0b707eb08a0b607eb03a0b50732e4" - "0500078bf0ac3c007409bb0700b40ecd10ebf2f4ebfd2bc9e464eb002402e0f8" - "2402c3496e76616c696420706172746974696f6e207461626c65004572726f72" - "206c6f6164696e67206f7065726174696e672073797374656d004d697373696e" - "67206f7065726174696e672073797374656d000000637b9a8a3dad8c00000002" - "030007e525008000000000380000000000000000000000000000000000000000" - "00000000000000000000000000000000000000000000000000000000000055aa" - ) - - v.seek(0x200000) - assert v.read(512) == b"\xff" * 512 - - for block in range(v.bat._pb_count): - block_entry = v.bat.pb(block) - assert block_entry.state == c_vhdx.PAYLOAD_BLOCK_FULLY_PRESENT - - -def test_vhdx_dynamic(dynamic_vhdx: BinaryIO) -> None: - v = VHDX(dynamic_vhdx) - - assert v.size == 0xA00000 - assert v.block_size == 0x2000000 - assert v.has_parent == 0 - assert v.sector_size == 0x200 - assert v.bat.chunk_ratio == 0x80 - assert v.id == UUID("788015f0-5e93-4bd2-a5de-b0cd8459db11") - - assert v.read(512) == bytes.fromhex( - "33c08ed0bc007c8ec08ed8be007cbf0006b90002fcf3a450681c06cbfbb90400" - "bdbe07807e00007c0b0f850e0183c510e2f1cd1888560055c6461105c6461000" - "b441bbaa55cd135d720f81fb55aa7509f7c101007403fe46106660807e100074" - "2666680000000066ff760868000068007c680100681000b4428a56008bf4cd13" - "9f83c4109eeb14b80102bb007c8a56008a76018a4e028a6e03cd136661731cfe" - "4e11750c807e00800f848a00b280eb845532e48a5600cd135deb9e813efe7d55" - "aa756eff7600e88d007517fab0d1e664e88300b0dfe660e87c00b0ffe664e875" - "00fbb800bbcd1a6623c0753b6681fb54435041753281f90201722c666807bb00" - "006668000200006668080000006653665366556668000000006668007c000066" - "6168000007cd1a5a32f6ea007c0000cd18a0b707eb08a0b607eb03a0b50732e4" - "0500078bf0ac3c007409bb0700b40ecd10ebf2f4ebfd2bc9e464eb002402e0f8" - "2402c3496e76616c696420706172746974696f6e207461626c65004572726f72" - "206c6f6164696e67206f7065726174696e672073797374656d004d697373696e" - "67206f7065726174696e672073797374656d000000637b9af93dad8c00000002" - "030007e525008000000000380000000000000000000000000000000000000000" - "00000000000000000000000000000000000000000000000000000000000055aa" - ) - - v.seek(0x200000) - assert v.read(512) == b"\xff" * 512 - - -def test_vhdx_differencing(differencing_vhdx: BinaryIO) -> None: - with pytest.raises(IOError, match="Failed to open parent disk with locator"): - VHDX(differencing_vhdx) +from tests._util import absolute_path + + +def test_vhdx_fixed() -> None: + with gzip.open(absolute_path("_data/disk/vhdx/fixed.vhdx.gz"), "rb") as fh: + v = VHDX(fh) + + assert v.size == 0xA00000 + assert v.block_size == 0x200000 + assert v.has_parent == 0 + assert v.sector_size == 0x200 + assert v.bat.chunk_ratio == 0x800 + assert v.id == UUID("4a49d245-db0a-4634-9818-9f93db5ba6c1") + + assert v.read(512) == bytes.fromhex( + "33c08ed0bc007c8ec08ed8be007cbf0006b90002fcf3a450681c06cbfbb90400" + "bdbe07807e00007c0b0f850e0183c510e2f1cd1888560055c6461105c6461000" + "b441bbaa55cd135d720f81fb55aa7509f7c101007403fe46106660807e100074" + "2666680000000066ff760868000068007c680100681000b4428a56008bf4cd13" + "9f83c4109eeb14b80102bb007c8a56008a76018a4e028a6e03cd136661731cfe" + "4e11750c807e00800f848a00b280eb845532e48a5600cd135deb9e813efe7d55" + "aa756eff7600e88d007517fab0d1e664e88300b0dfe660e87c00b0ffe664e875" + "00fbb800bbcd1a6623c0753b6681fb54435041753281f90201722c666807bb00" + "006668000200006668080000006653665366556668000000006668007c000066" + "6168000007cd1a5a32f6ea007c0000cd18a0b707eb08a0b607eb03a0b50732e4" + "0500078bf0ac3c007409bb0700b40ecd10ebf2f4ebfd2bc9e464eb002402e0f8" + "2402c3496e76616c696420706172746974696f6e207461626c65004572726f72" + "206c6f6164696e67206f7065726174696e672073797374656d004d697373696e" + "67206f7065726174696e672073797374656d000000637b9a8a3dad8c00000002" + "030007e525008000000000380000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000000000000055aa" + ) + + v.seek(0x200000) + assert v.read(512) == b"\xff" * 512 + + for block in range(v.bat._pb_count): + block_entry = v.bat.pb(block) + assert block_entry.state == c_vhdx.PAYLOAD_BLOCK_FULLY_PRESENT + + +def test_vhdx_dynamic() -> None: + with gzip.open(absolute_path("_data/disk/vhdx/dynamic.vhdx.gz"), "rb") as fh: + v = VHDX(fh) + + assert v.size == 0xA00000 + assert v.block_size == 0x2000000 + assert v.has_parent == 0 + assert v.sector_size == 0x200 + assert v.bat.chunk_ratio == 0x80 + assert v.id == UUID("788015f0-5e93-4bd2-a5de-b0cd8459db11") + + assert v.read(512) == bytes.fromhex( + "33c08ed0bc007c8ec08ed8be007cbf0006b90002fcf3a450681c06cbfbb90400" + "bdbe07807e00007c0b0f850e0183c510e2f1cd1888560055c6461105c6461000" + "b441bbaa55cd135d720f81fb55aa7509f7c101007403fe46106660807e100074" + "2666680000000066ff760868000068007c680100681000b4428a56008bf4cd13" + "9f83c4109eeb14b80102bb007c8a56008a76018a4e028a6e03cd136661731cfe" + "4e11750c807e00800f848a00b280eb845532e48a5600cd135deb9e813efe7d55" + "aa756eff7600e88d007517fab0d1e664e88300b0dfe660e87c00b0ffe664e875" + "00fbb800bbcd1a6623c0753b6681fb54435041753281f90201722c666807bb00" + "006668000200006668080000006653665366556668000000006668007c000066" + "6168000007cd1a5a32f6ea007c0000cd18a0b707eb08a0b607eb03a0b50732e4" + "0500078bf0ac3c007409bb0700b40ecd10ebf2f4ebfd2bc9e464eb002402e0f8" + "2402c3496e76616c696420706172746974696f6e207461626c65004572726f72" + "206c6f6164696e67206f7065726174696e672073797374656d004d697373696e" + "67206f7065726174696e672073797374656d000000637b9af93dad8c00000002" + "030007e525008000000000380000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000000000000055aa" + ) + + v.seek(0x200000) + assert v.read(512) == b"\xff" * 512 + + +def test_vhdx_differencing() -> None: + with ( + pytest.raises(IOError, match="Failed to open parent disk with locator"), + gzip.open(absolute_path("_data/disk/vhdx/differencing.avhdx.gz"), "rb") as fh, + ): + VHDX(fh) @pytest.mark.parametrize( diff --git a/tests/disk/test_vmdk.py b/tests/disk/test_vmdk.py index 467a872..851a85c 100644 --- a/tests/disk/test_vmdk.py +++ b/tests/disk/test_vmdk.py @@ -1,29 +1,31 @@ from __future__ import annotations -from typing import BinaryIO +import gzip import pytest from dissect.hypervisor.disk.c_vmdk import c_vmdk from dissect.hypervisor.disk.vmdk import VMDK, DiskDescriptor, ExtentDescriptor +from tests._util import absolute_path -def test_vmdk_sesparse(sesparse_vmdk: BinaryIO) -> None: - vmdk = VMDK(sesparse_vmdk) +def test_vmdk_sesparse() -> None: + with gzip.open(absolute_path("_data/disk/vmdk/sesparse.vmdk.gz"), "rb") as fh: + vmdk = VMDK(fh) - disk = vmdk.disks[0] + disk = vmdk.disks[0] - assert disk.is_sesparse - assert disk._grain_directory_size == 0x20000 - assert disk._grain_table_size == 0x1000 - assert disk._grain_entry_type == c_vmdk.uint64 - assert disk._grain_directory[0] == 0x1000000000000000 + assert disk.is_sesparse + assert disk._grain_directory_size == 0x20000 + assert disk._grain_table_size == 0x1000 + assert disk._grain_entry_type == c_vmdk.uint64 + assert disk._grain_directory[0] == 0x1000000000000000 - header = disk.header - assert header.magic == c_vmdk.SESPARSE_CONST_HEADER_MAGIC - assert header.version == 0x200000001 + header = disk.header + assert header.magic == c_vmdk.SESPARSE_CONST_HEADER_MAGIC + assert header.version == 0x200000001 - assert vmdk.read(0x1000000) == b"a" * 0x1000000 + assert vmdk.read(0x1000000) == b"a" * 0x1000000 @pytest.mark.parametrize( diff --git a/tests/util/test_envelope.py b/tests/util/test_envelope.py index 66bd8dc..c334301 100644 --- a/tests/util/test_envelope.py +++ b/tests/util/test_envelope.py @@ -1,7 +1,6 @@ from __future__ import annotations import hashlib -from typing import BinaryIO import pytest @@ -11,10 +10,11 @@ Envelope, KeyStore, ) +from tests._util import absolute_path -def test_envelope_keystore(keystore: BinaryIO) -> None: - store = KeyStore.from_text(keystore.read()) +def test_envelope_keystore() -> None: + store = KeyStore.from_text(absolute_path("_data/util/envelope/encryption.info").read_text()) assert store.store[".encoding"] == "UTF-8" assert store.store["includeKeyCache"] == "FALSE" @@ -31,8 +31,9 @@ def test_envelope_keystore(keystore: BinaryIO) -> None: assert store._key == bytes.fromhex("ae29634dca8627013f7c7cf2d05b4d5cc444d42cd4e8acbaa4fb815dda3b3066") -def test_envelope(envelope: BinaryIO) -> None: - ev = Envelope(envelope) +def test_envelope() -> None: + with absolute_path("_data/util/envelope/local.tgz.ve").open("rb") as fh: + ev = Envelope(fh) assert ev.key_info == "7e62cec5-6aef-4d7e-838b-cae32eefd251" assert ev.cipher_name == "AES-256-GCM" @@ -43,10 +44,12 @@ def test_envelope(envelope: BinaryIO) -> None: @pytest.mark.skipif((not HAS_PYCRYPTODOME and not HAS_PYSTANDALONE), reason="No crypto module available") -def test_envelope_decrypt(envelope: BinaryIO, keystore: BinaryIO) -> None: - ev = Envelope(envelope) - store = KeyStore.from_text(keystore.read()) +def test_envelope_decrypt() -> None: + with absolute_path("_data/util/envelope/local.tgz.ve").open("rb") as fh: + ev = Envelope(fh) + store = KeyStore.from_text(absolute_path("_data/util/envelope/encryption.info").read_text()) + + decrypted = ev.decrypt(store.key, aad=b"ESXConfiguration") - decrypted = ev.decrypt(store.key, aad=b"ESXConfiguration") assert len(decrypted) == 94293 assert hashlib.sha256(decrypted).hexdigest() == "fe131620351b9fd5fc4aef219bf3211340f3742464c038e1695e7b6667f86952" diff --git a/tests/util/test_vmtar.py b/tests/util/test_vmtar.py index 5fa359b..748d179 100644 --- a/tests/util/test_vmtar.py +++ b/tests/util/test_vmtar.py @@ -1,10 +1,10 @@ from __future__ import annotations -from typing import TYPE_CHECKING, BinaryIO +from typing import TYPE_CHECKING from dissect.hypervisor.tools.vmtar import main as vmtar_main from dissect.hypervisor.util import vmtar -from tests.conftest import absolute_path +from tests._util import absolute_path if TYPE_CHECKING: from pathlib import Path @@ -12,25 +12,26 @@ import pytest -def test_vmtar(vgz: BinaryIO) -> None: - tar = vmtar.open(fileobj=vgz) +def test_vmtar() -> None: + with absolute_path("_data/util/vmtar/test.vgz").open("rb") as fh: + tar = vmtar.open(fileobj=fh) - members = {member.name: member for member in tar.getmembers()} + members = {member.name: member for member in tar.getmembers()} - # The test file has no textPgs/fixUpPgs - assert all(member.is_visor for member in members.values()) - assert set(members.keys()) == { - "test/file1", - "test/file2", - "test/file3", - "test/subdir", - "test/subdir/file4", - } + # The test file has no textPgs/fixUpPgs + assert all(member.is_visor for member in members.values()) + assert set(members.keys()) == { + "test/file1", + "test/file2", + "test/file3", + "test/subdir", + "test/subdir/file4", + } - assert tar.extractfile(members["test/file1"]).read() == (b"a" * 512) + b"\n" - assert tar.extractfile(members["test/file2"]).read() == (b"b" * 1024) + b"\n" - assert tar.extractfile(members["test/file3"]).read() == (b"c" * 2048) + b"\n" - assert tar.extractfile(members["test/subdir/file4"]).read() == (b"f" * 2048) + b"\n" + assert tar.extractfile(members["test/file1"]).read() == (b"a" * 512) + b"\n" + assert tar.extractfile(members["test/file2"]).read() == (b"b" * 1024) + b"\n" + assert tar.extractfile(members["test/file3"]).read() == (b"c" * 2048) + b"\n" + assert tar.extractfile(members["test/subdir/file4"]).read() == (b"f" * 2048) + b"\n" def test_vmtar_tool(tmp_path: Path, monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture) -> None: From aa365d67b93f405ae2a8c471bbf90eedcf7c15d6 Mon Sep 17 00:00:00 2001 From: Schamper <1254028+Schamper@users.noreply.github.com> Date: Tue, 20 Jan 2026 14:30:35 +0100 Subject: [PATCH 2/2] Improve VirtualBox descriptor parsing --- dissect/hypervisor/descriptor/vbox.py | 218 ++++++++++++++++++++- tests/_data/descriptor/vbox/GOAD-DC01.vbox | 201 +++++++++++++++++++ tests/_data/descriptor/vbox/encrypted.vbox | 103 ++++++++++ tests/descriptor/test_vbox.py | 96 +++++---- 4 files changed, 576 insertions(+), 42 deletions(-) create mode 100644 tests/_data/descriptor/vbox/GOAD-DC01.vbox create mode 100644 tests/_data/descriptor/vbox/encrypted.vbox diff --git a/dissect/hypervisor/descriptor/vbox.py b/dissect/hypervisor/descriptor/vbox.py index f79b208..c3d8581 100644 --- a/dissect/hypervisor/descriptor/vbox.py +++ b/dissect/hypervisor/descriptor/vbox.py @@ -1,22 +1,226 @@ from __future__ import annotations +from datetime import datetime +from functools import cached_property from typing import TYPE_CHECKING, TextIO +from uuid import UUID from defusedxml import ElementTree if TYPE_CHECKING: - from collections.abc import Iterator from xml.etree.ElementTree import Element +NS = "{http://www.virtualbox.org/}" + class VBox: - VBOX_XML_NAMESPACE = "{http://www.virtualbox.org/}" + """VirtualBox XML descriptor parser. + + Args: + fh: A file-like object of the VirtualBox XML descriptor. + """ def __init__(self, fh: TextIO): self._xml: Element = ElementTree.fromstring(fh.read()) + if self._xml.tag != f"{NS}VirtualBox": + raise ValueError("Invalid VirtualBox XML descriptor: root element is not VirtualBox") + + if (machine := self._xml.find(f"./{NS}Machine")) is None: + raise ValueError("Invalid VirtualBox XML descriptor: no Machine element found") + + if machine.find(f"./{NS}Hardware") is None: + raise ValueError("Invalid VirtualBox XML descriptor: no Hardware element found") + + self.machine = Machine(self, machine) + + def __repr__(self) -> str: + return f"" + + @property + def uuid(self) -> UUID | None: + """The VM UUID.""" + return self.machine.uuid + + @property + def name(self) -> str | None: + """The VM name.""" + return self.machine.name + + @property + def media(self) -> dict[UUID, HardDisk]: + """The media (disks) registry.""" + return self.machine.media + + @property + def hardware(self) -> Hardware: + """The current machine hardware state.""" + return self.machine.hardware + + @property + def snapshots(self) -> dict[UUID, Snapshot]: + """All snapshots.""" + return self.machine.snapshots + + +class Machine: + def __init__(self, vbox: VBox, element: Element): + self.vbox = vbox + self.element = element + + def __repr__(self) -> str: + return f"" + + @property + def uuid(self) -> UUID: + """The machine UUID.""" + return UUID(self.element.get("uuid").strip("{}")) + + @property + def name(self) -> str: + """The machine name.""" + return self.element.get("name") + + @property + def current_snapshot(self) -> UUID | None: + """The current snapshot UUID.""" + if (value := self.element.get("currentSnapshot")) is not None: + return UUID(value.strip("{}")) + return None + + @cached_property + def media(self) -> dict[UUID, HardDisk]: + """The media (disks) registry.""" + result = {} + + stack = [(None, element) for element in self.element.find(f"./{NS}MediaRegistry/{NS}HardDisks")] + while stack: + parent, element = stack.pop() + hdd = HardDisk(self, element, parent) + result[hdd.uuid] = hdd + + stack.extend([(hdd, child) for child in element.findall(f"./{NS}HardDisk")]) + + return result + + @cached_property + def hardware(self) -> Hardware: + """The machine hardware state.""" + return Hardware(self.vbox, self.element.find(f"./{NS}Hardware")) + + @cached_property + def snapshots(self) -> dict[UUID, Snapshot]: + """All snapshots.""" + result = {} + + if (element := self.element.find(f"./{NS}Snapshot")) is None: + return result + + stack = [(None, element)] + while stack: + parent, element = stack.pop() + snapshot = Snapshot(self.vbox, element, parent) + result[snapshot.uuid] = snapshot + + if (snapshots := element.find(f"./{NS}Snapshots")) is not None: + stack.extend([(snapshot, child) for child in list(snapshots)]) + + return result + + @property + def parent(self) -> Snapshot | None: + if (uuid := self.current_snapshot) is not None: + return self.vbox.snapshots[uuid] + return None + + +class HardDisk: + def __init__(self, vbox: VBox, element: Element, parent: HardDisk | None = None): + self.vbox = vbox + self.element = element + self.parent = parent + + def __repr__(self) -> str: + return f"" + + @property + def uuid(self) -> UUID: + """The disk UUID.""" + return UUID(self.element.get("uuid").strip("{}")) + + @property + def location(self) -> str: + """The disk location.""" + return self.element.get("location") + + @property + def type(self) -> str | None: + """The disk type.""" + return self.element.get("type") + + @property + def format(self) -> str: + """The disk format.""" + return self.element.get("format") + + @cached_property + def properties(self) -> dict[str, str]: + """The disk properties.""" + return {prop.get("name"): prop.get("value") for prop in self.element.findall(f"./{NS}Property")} + + @property + def is_encrypted(self) -> bool: + """Whether the disk is encrypted.""" + disk = self + while disk is not None: + if "CRYPT/KeyId" in disk.properties or "CRYPT/KeyStore" in disk.properties: + return True + disk = disk.parent + + return False + + +class Snapshot: + def __init__(self, vbox: VBox, element: Element, parent: Snapshot | Machine | None = None): + self.vbox = vbox + self.element = element + self.parent = parent + + def __repr__(self) -> str: + return f"" + + @property + def uuid(self) -> UUID: + """The snapshot UUID.""" + return UUID(self.element.get("uuid").strip("{}")) + + @property + def name(self) -> str: + """The snapshot name.""" + return self.element.get("name") + + @property + def ts(self) -> datetime: + """The snapshot timestamp.""" + return datetime.strptime(self.element.get("timeStamp"), "%Y-%m-%dT%H:%M:%S%z") + + @cached_property + def hardware(self) -> Hardware: + """The snapshot hardware state.""" + return Hardware(self.vbox, self.element.find(f"./{NS}Hardware")) + + +class Hardware: + def __init__(self, vbox: VBox, element: Element): + self.vbox = vbox + self.element = element + + def __repr__(self) -> str: + return f"" - def disks(self) -> Iterator[str]: - for hdd_elem in self._xml.findall(f".//{self.VBOX_XML_NAMESPACE}HardDisk[@location][@type='Normal']"): - # Allow format specifier to be case-insensitive (i.e. VDI, vdi) - if (format := hdd_elem.get("format")) and format.lower() == "vdi": - yield hdd_elem.attrib["location"] + @property + def disks(self) -> list[HardDisk]: + """All attached hard disks.""" + images = self.element.findall( + f"./{NS}StorageControllers/{NS}StorageController/{NS}AttachedDevice[@type='HardDisk']/{NS}Image" + ) + return [self.vbox.media[UUID(image.get("uuid").strip("{}"))] for image in images] diff --git a/tests/_data/descriptor/vbox/GOAD-DC01.vbox b/tests/_data/descriptor/vbox/GOAD-DC01.vbox new file mode 100644 index 0000000..d841ae4 --- /dev/null +++ b/tests/_data/descriptor/vbox/GOAD-DC01.vbox @@ -0,0 +1,201 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/tests/_data/descriptor/vbox/encrypted.vbox b/tests/_data/descriptor/vbox/encrypted.vbox new file mode 100644 index 0000000..197020e --- /dev/null +++ b/tests/_data/descriptor/vbox/encrypted.vbox @@ -0,0 +1,103 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/tests/descriptor/test_vbox.py b/tests/descriptor/test_vbox.py index 7702ffd..5c3f5eb 100644 --- a/tests/descriptor/test_vbox.py +++ b/tests/descriptor/test_vbox.py @@ -1,43 +1,69 @@ from __future__ import annotations -from io import StringIO +from uuid import UUID from dissect.hypervisor.descriptor.vbox import VBox +from tests._util import absolute_path -def test_vbox() -> None: - xml = """ - - - - - - - - - - - """ - - with StringIO(xml.strip()) as fh: +def test_vbox_snapshots() -> None: + """Test parsing VirtualBox XML descriptor with snapshots.""" + with absolute_path("_data/descriptor/vbox/GOAD-DC01.vbox").open() as fh: vbox = VBox(fh) - assert next(vbox.disks()) == "os2warp4.vdi" - - -def test_vbox_lowercase_disk_format() -> None: - xml = """ - - - - - - - - - - - """ - - with StringIO(xml.strip()) as fh: + + assert vbox.uuid == UUID("a6277950-3d1b-45d3-b2fd-dc1f385027e1") + assert vbox.name == "GOAD-DC01" + + # Check that the "current" disk is correct + assert len(vbox.machine.hardware.disks) == 1 + assert vbox.machine.hardware.disks[0].uuid == UUID("e6800503-8273-4f16-b584-c7ba2c1df698") + assert vbox.machine.hardware.disks[0].location == "Snapshots/{e6800503-8273-4f16-b584-c7ba2c1df698}.vmdk" + + assert vbox.machine.parent.uuid == UUID("264ccd7e-9ffd-45ba-bd0e-4e1968d3355a") + + # Just to verify that we resolve the snapshot state disks correctly + assert len(vbox.snapshots) == 2 + snapshot = vbox.snapshots[UUID("95b1572d-c893-48f0-9bc9-6a01a0fd2cb6")] + assert snapshot.parent is None + assert snapshot.uuid == UUID("95b1572d-c893-48f0-9bc9-6a01a0fd2cb6") + assert snapshot.name == "push_1766163705_7292" + assert snapshot.ts.isoformat() == "2025-12-19T17:01:46+00:00" + assert len(snapshot.hardware.disks) == 1 + assert snapshot.hardware.disks[0].uuid == UUID("0898f36f-01c3-4b43-8aeb-36ba7adaef95") + assert snapshot.hardware.disks[0].location == "WindowsServer2019-disk001.vmdk" + + snapshot = vbox.snapshots[UUID("264ccd7e-9ffd-45ba-bd0e-4e1968d3355a")] + assert snapshot.parent.uuid == UUID("95b1572d-c893-48f0-9bc9-6a01a0fd2cb6") + assert snapshot.uuid == UUID("264ccd7e-9ffd-45ba-bd0e-4e1968d3355a") + assert snapshot.name == "push_1766170151_8843" + assert snapshot.ts.isoformat() == "2025-12-19T18:49:11+00:00" + assert len(snapshot.hardware.disks) == 1 + assert snapshot.hardware.disks[0].uuid == UUID("3c72ec80-dc73-4448-a63e-97970cdd87e5") + assert snapshot.hardware.disks[0].location == "Snapshots/{3c72ec80-dc73-4448-a63e-97970cdd87e5}.vmdk" + + # Test the tree of disks snapshots + disk = vbox.media[UUID("706a96fe-0e11-4985-af32-7561d26612d4")] + assert disk.parent.uuid == UUID("35bf5129-1caa-4117-b20a-d73868d9d5d2") + assert disk.parent.parent.uuid == UUID("0898f36f-01c3-4b43-8aeb-36ba7adaef95") + assert disk.parent.parent.parent is None + + disk = vbox.media[UUID("e6800503-8273-4f16-b584-c7ba2c1df698")] + assert disk.parent.uuid == UUID("3c72ec80-dc73-4448-a63e-97970cdd87e5") + assert disk.parent.parent.uuid == UUID("0898f36f-01c3-4b43-8aeb-36ba7adaef95") + assert disk.parent.parent.parent is None + + +def test_vbox_encrypted() -> None: + with absolute_path("_data/descriptor/vbox/encrypted.vbox").open() as fh: vbox = VBox(fh) - assert next(vbox.disks()) == "WinDev2407Eval-disk001.vdi" + + disk = vbox.media[UUID("24cdb8e9-35d6-42f2-aa17-c2d78bf1e1de")] + assert disk.properties["CRYPT/KeyId"] == "encrypted test" + assert "CRYPT/KeyStore" in disk.properties + assert disk.is_encrypted + + disk = vbox.media[UUID("742e6d0f-8896-4aa6-97f4-e05d70e73029")] + assert disk.is_encrypted + + disk = vbox.media[UUID("1db0b9fe-36c2-44e8-9b7c-61fa5b6d1462")] + assert not disk.is_encrypted