-
Notifications
You must be signed in to change notification settings - Fork 9
Improve VirtualBox descriptor parsing #71
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
Schamper
wants to merge
2
commits into
main
Choose a base branch
from
improve-vbox
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,22 +1,226 @@ | ||
| from __future__ import annotations | ||
|
|
||
| from datetime import datetime | ||
| from functools import cached_property | ||
| from typing import TYPE_CHECKING, TextIO | ||
| from uuid import UUID | ||
|
|
||
| from defusedxml import ElementTree | ||
|
|
||
| if TYPE_CHECKING: | ||
| from collections.abc import Iterator | ||
| from xml.etree.ElementTree import Element | ||
|
|
||
| NS = "{http://www.virtualbox.org/}" | ||
|
|
||
|
|
||
| class VBox: | ||
| VBOX_XML_NAMESPACE = "{http://www.virtualbox.org/}" | ||
| """VirtualBox XML descriptor parser. | ||
|
|
||
| Args: | ||
| fh: A file-like object of the VirtualBox XML descriptor. | ||
| """ | ||
|
|
||
| def __init__(self, fh: TextIO): | ||
| self._xml: Element = ElementTree.fromstring(fh.read()) | ||
| if self._xml.tag != f"{NS}VirtualBox": | ||
| raise ValueError("Invalid VirtualBox XML descriptor: root element is not VirtualBox") | ||
|
|
||
| if (machine := self._xml.find(f"./{NS}Machine")) is None: | ||
| raise ValueError("Invalid VirtualBox XML descriptor: no Machine element found") | ||
|
|
||
| if machine.find(f"./{NS}Hardware") is None: | ||
| raise ValueError("Invalid VirtualBox XML descriptor: no Hardware element found") | ||
|
|
||
| self.machine = Machine(self, machine) | ||
|
|
||
| def __repr__(self) -> str: | ||
| return f"<VBox uuid={self.uuid} name={self.name}>" | ||
|
|
||
| @property | ||
| def uuid(self) -> UUID | None: | ||
| """The VM UUID.""" | ||
| return self.machine.uuid | ||
|
|
||
| @property | ||
| def name(self) -> str | None: | ||
| """The VM name.""" | ||
| return self.machine.name | ||
|
|
||
| @property | ||
| def media(self) -> dict[UUID, HardDisk]: | ||
| """The media (disks) registry.""" | ||
| return self.machine.media | ||
|
|
||
| @property | ||
| def hardware(self) -> Hardware: | ||
| """The current machine hardware state.""" | ||
| return self.machine.hardware | ||
|
|
||
| @property | ||
| def snapshots(self) -> dict[UUID, Snapshot]: | ||
| """All snapshots.""" | ||
| return self.machine.snapshots | ||
|
|
||
|
|
||
| class Machine: | ||
| def __init__(self, vbox: VBox, element: Element): | ||
| self.vbox = vbox | ||
| self.element = element | ||
|
|
||
| def __repr__(self) -> str: | ||
| return f"<Machine uuid={self.uuid} name={self.name}>" | ||
|
|
||
| @property | ||
| def uuid(self) -> UUID: | ||
| """The machine UUID.""" | ||
| return UUID(self.element.get("uuid").strip("{}")) | ||
|
|
||
| @property | ||
| def name(self) -> str: | ||
| """The machine name.""" | ||
| return self.element.get("name") | ||
|
|
||
| @property | ||
| def current_snapshot(self) -> UUID | None: | ||
| """The current snapshot UUID.""" | ||
| if (value := self.element.get("currentSnapshot")) is not None: | ||
| return UUID(value.strip("{}")) | ||
| return None | ||
|
|
||
| @cached_property | ||
| def media(self) -> dict[UUID, HardDisk]: | ||
| """The media (disks) registry.""" | ||
| result = {} | ||
|
|
||
| stack = [(None, element) for element in self.element.find(f"./{NS}MediaRegistry/{NS}HardDisks")] | ||
| while stack: | ||
| parent, element = stack.pop() | ||
| hdd = HardDisk(self, element, parent) | ||
| result[hdd.uuid] = hdd | ||
|
|
||
| stack.extend([(hdd, child) for child in element.findall(f"./{NS}HardDisk")]) | ||
|
|
||
| return result | ||
|
|
||
| @cached_property | ||
| def hardware(self) -> Hardware: | ||
| """The machine hardware state.""" | ||
| return Hardware(self.vbox, self.element.find(f"./{NS}Hardware")) | ||
|
|
||
| @cached_property | ||
| def snapshots(self) -> dict[UUID, Snapshot]: | ||
| """All snapshots.""" | ||
| result = {} | ||
|
|
||
| if (element := self.element.find(f"./{NS}Snapshot")) is None: | ||
| return result | ||
|
|
||
| stack = [(None, element)] | ||
| while stack: | ||
| parent, element = stack.pop() | ||
| snapshot = Snapshot(self.vbox, element, parent) | ||
| result[snapshot.uuid] = snapshot | ||
|
|
||
| if (snapshots := element.find(f"./{NS}Snapshots")) is not None: | ||
| stack.extend([(snapshot, child) for child in list(snapshots)]) | ||
|
|
||
| return result | ||
|
|
||
| @property | ||
| def parent(self) -> Snapshot | None: | ||
| if (uuid := self.current_snapshot) is not None: | ||
| return self.vbox.snapshots[uuid] | ||
| return None | ||
|
|
||
|
|
||
| class HardDisk: | ||
| def __init__(self, vbox: VBox, element: Element, parent: HardDisk | None = None): | ||
| self.vbox = vbox | ||
| self.element = element | ||
| self.parent = parent | ||
|
|
||
| def __repr__(self) -> str: | ||
| return f"<HardDisk uuid={self.uuid} location={self.location}>" | ||
|
|
||
| @property | ||
| def uuid(self) -> UUID: | ||
| """The disk UUID.""" | ||
| return UUID(self.element.get("uuid").strip("{}")) | ||
|
|
||
| @property | ||
| def location(self) -> str: | ||
| """The disk location.""" | ||
| return self.element.get("location") | ||
|
|
||
| @property | ||
| def type(self) -> str | None: | ||
| """The disk type.""" | ||
| return self.element.get("type") | ||
|
|
||
| @property | ||
| def format(self) -> str: | ||
| """The disk format.""" | ||
| return self.element.get("format") | ||
|
|
||
| @cached_property | ||
| def properties(self) -> dict[str, str]: | ||
| """The disk properties.""" | ||
| return {prop.get("name"): prop.get("value") for prop in self.element.findall(f"./{NS}Property")} | ||
|
|
||
| @property | ||
| def is_encrypted(self) -> bool: | ||
| """Whether the disk is encrypted.""" | ||
| disk = self | ||
| while disk is not None: | ||
| if "CRYPT/KeyId" in disk.properties or "CRYPT/KeyStore" in disk.properties: | ||
| return True | ||
| disk = disk.parent | ||
|
|
||
| return False | ||
|
|
||
|
|
||
| class Snapshot: | ||
| def __init__(self, vbox: VBox, element: Element, parent: Snapshot | Machine | None = None): | ||
| self.vbox = vbox | ||
| self.element = element | ||
| self.parent = parent | ||
|
|
||
| def __repr__(self) -> str: | ||
| return f"<Snapshot uuid={self.uuid} name={self.name}>" | ||
|
|
||
| @property | ||
| def uuid(self) -> UUID: | ||
| """The snapshot UUID.""" | ||
| return UUID(self.element.get("uuid").strip("{}")) | ||
|
|
||
| @property | ||
| def name(self) -> str: | ||
| """The snapshot name.""" | ||
| return self.element.get("name") | ||
|
|
||
| @property | ||
| def ts(self) -> datetime: | ||
| """The snapshot timestamp.""" | ||
| return datetime.strptime(self.element.get("timeStamp"), "%Y-%m-%dT%H:%M:%S%z") | ||
|
|
||
| @cached_property | ||
| def hardware(self) -> Hardware: | ||
| """The snapshot hardware state.""" | ||
| return Hardware(self.vbox, self.element.find(f"./{NS}Hardware")) | ||
|
|
||
|
|
||
| class Hardware: | ||
| def __init__(self, vbox: VBox, element: Element): | ||
| self.vbox = vbox | ||
| self.element = element | ||
|
|
||
Schamper marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| def __repr__(self) -> str: | ||
| return f"<Hardware disks={len(self.disks)}>" | ||
|
|
||
| def disks(self) -> Iterator[str]: | ||
| for hdd_elem in self._xml.findall(f".//{self.VBOX_XML_NAMESPACE}HardDisk[@location][@type='Normal']"): | ||
| # Allow format specifier to be case-insensitive (i.e. VDI, vdi) | ||
| if (format := hdd_elem.get("format")) and format.lower() == "vdi": | ||
| yield hdd_elem.attrib["location"] | ||
| @property | ||
| def disks(self) -> list[HardDisk]: | ||
| """All attached hard disks.""" | ||
| images = self.element.findall( | ||
| f"./{NS}StorageControllers/{NS}StorageController/{NS}AttachedDevice[@type='HardDisk']/{NS}Image" | ||
| ) | ||
| return [self.vbox.media[UUID(image.get("uuid").strip("{}"))] for image in images] | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would it not be useful to also have the format of the disk available? Not for target-query, but for the package as its own entity
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also noticed that a HardDisk can have an additional
<Property name="CRYPT/KeyID">and<Property name="CRYPT/KeyStore">. These exist when you tell virtualbox that it should encrypt a vm. It creates it for every associated disk in that case.I think it might be useful to note down if the hard disk is encrypted. I can give you an example file if needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you have an example, sure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From how I interpreted it, the encryption holds for all parent disks and their children.
In the case above I added another disk after enabling the "Encrypt all disks" tho it seems it doesn't add a key for that one. Whereas it does add the properties if you enable encryption with multiple disks attached to the vm.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But the second disk is encrypted? Presumably using the same key as the other one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The second disk is not encrypted. Had to verify it to be sure. Tho the wording "encrypt all disks" made me assume otherwise at first.