From 602cf0829defefbff5c718522380f2ba2671b38f Mon Sep 17 00:00:00 2001 From: Stephen Eckels Date: Tue, 8 Oct 2024 20:57:34 +0000 Subject: [PATCH] Port virtualbox scripts to VBoxManage CLI Port `vbox-adapter-check.py`, `vbox-clean-snapshots.py`, and `vbox-export-snapshots.py` from using the VirtualBox API (via the `virtualbox` Python library) to use VBoxManage CLI. --- .gitignore | 5 + virtualbox/README.md | 2 +- virtualbox/vbox-adapter-check.py | 188 +++++++++++++++++++--------- virtualbox/vbox-clean-snapshots.py | 132 +++++++++++++------ virtualbox/vbox-export-snapshots.py | 150 +++++++++++++++++----- virtualbox/vboxcommon.py | 138 ++++++++++++++++++++ 6 files changed, 486 insertions(+), 129 deletions(-) create mode 100644 virtualbox/vboxcommon.py diff --git a/.gitignore b/.gitignore index 4d5a0f7..af9d94d 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ *.class *.o *.so +__pycache__/ # Packages # ############ @@ -35,3 +36,7 @@ Thumbs.db # Pycharm artifacts ################### .idea + +# vscode +# ################# +.vscode/ diff --git a/virtualbox/README.md b/virtualbox/README.md index f4f07de..3209a60 100644 --- a/virtualbox/README.md +++ b/virtualbox/README.md @@ -73,7 +73,7 @@ Cleaning FLARE-VM.20240604 šŸ«§ Snapshots to delete: VM state: Paused āš ļø Snapshot deleting is slower in a running VM and may fail in a changing state -Confirm deletion ('y'):y +Confirm deletion (press 'y'):y Deleting... (this may take some time, go for an šŸ¦!) šŸ«§ DELETED 'Snapshot 1' diff --git a/virtualbox/vbox-adapter-check.py b/virtualbox/vbox-adapter-check.py index 7eb5a30..5c6cc28 100755 --- a/virtualbox/vbox-adapter-check.py +++ b/virtualbox/vbox-adapter-check.py @@ -1,57 +1,120 @@ #!/usr/bin/python3 +import argparse +import re import sys import textwrap -import argparse -import virtualbox -from virtualbox.library import NetworkAttachmentType as NetType -from gi.repository import Notify - -DYNAMIC_VM_NAME = '.dynamic' -DISABLED_ADAPTER_TYPE = NetType.host_only -ALLOWED_ADAPTER_TYPES = (NetType.host_only, NetType.internal, NetType.null) - -ENABLED_STRS = ('Disabled','Enabled ') - -def check_and_disable_internet_access(session, machine_name, max_adapters, skip_disabled, do_not_modify): - """ - Checks if a VM's network adapter is set to an internet-accessible mode - and disables it if necessary, showing a warning popup. - - Args: - session: The session of the virtual machine to check. - """ - adapters_with_internet = [] - for i in range(max_adapters): - adapter = session.machine.get_network_adapter(i) - - if skip_disabled and not adapter.enabled: - continue - print(f"{machine_name} {i+1}: {ENABLED_STRS[adapter.enabled]} {adapter.attachment_type}") +import gi - if DYNAMIC_VM_NAME in machine_name and adapter.attachment_type not in ALLOWED_ADAPTER_TYPES: - adapters_with_internet.append(i) - if not do_not_modify: - # Disable the adapter - adapter.attachment_type = DISABLED_ADAPTER_TYPE +gi.require_version("Notify", "0.7") +from gi.repository import Notify - if adapters_with_internet: - adapters_str = ", ".join(str(i+1) for i in adapters_with_internet) - if do_not_modify: - message = f"{machine_name} may be connected to the internet on adapter(s): {adapters_str}. Please double check your VMs settings." +from vboxcommon import * + +DYNAMIC_VM_NAME = ".dynamic" +DISABLED_ADAPTER_TYPE = "hostonly" +ALLOWED_ADAPTER_TYPES = ("hostonly", "intnet", "none") + + +def get_vm_uuids(dynamic_only): + """Gets the machine UUID(s) for a given VM name using 'VBoxManage list vms'.""" + machine_guids = [] + try: + # regex VM name and extract the GUID + # "FLARE-VM.testing" {b76d628b-737f-40a3-9a16-c5f66ad2cfcc} + vms_output = run_vboxmanage(["list", "vms"]) + pattern = r'"(.*?)" \{(.*?)\}' + matches = re.findall(pattern, vms_output) + for match in matches: + vm_name = match[0] + machine_guid = match[1] + # either get all vms if dynamic_only false, or just the dynamic vms if true + if (not dynamic_only) or DYNAMIC_VM_NAME in vm_name: + machine_guids.append((vm_name, machine_guid)) + except Exception as e: + raise Exception(f"Error finding machines UUIDs") from e + return machine_guids + + +def change_network_adapters_to_hostonly( + machine_guid, vm_name, hostonly_ifname, do_not_modify +): + """Verify all adapters are in an allowed configuration. Must be poweredoff""" + try: + # gather adapters in incorrect configurations + nics_with_internet = [] + invalid_nics_msg = "" + + # nic1="hostonly" + # nictype1="82540EM" + # nicspeed1="0" + # nic2="none" + # nic3="none" + # nic4="none" + # nic5="none" + # nic6="none" + # nic7="none" + # nic8="none" + + vminfo = run_vboxmanage(["showvminfo", machine_guid, "--machinereadable"]) + for nic_number, nic_value in re.findall( + '^nic(\d+)="(\S+)"', vminfo, flags=re.M + ): + if nic_value not in ALLOWED_ADAPTER_TYPES: + nics_with_internet.append(f"nic{nic_number}") + invalid_nics_msg += f"{nic_number} " + + # modify the invalid adapters if allowed + if nics_with_internet: + for nic in nics_with_internet: + if do_not_modify: + message = f"{vm_name} may be connected to the internet on adapter(s): {nic}. Please double check your VMs settings." + else: + message = f"{vm_name} may be connected to the internet on adapter(s): {nic}. The network adapter(s) have been disabled automatically to prevent an undesired internet connectivity. Please double check your VMs settings." + # different commands are necessary if the machine is running. + if get_vm_state(machine_guid) == "poweroff": + run_vboxmanage( + [ + "modifyvm", + machine_guid, + f"--{nic}", + DISABLED_ADAPTER_TYPE, + ] + ) + else: + run_vboxmanage( + [ + "controlvm", + machine_guid, + nic, + "hostonly", + hostonly_ifname, + ] + ) + print(f"Set VM {vm_name} adaper {nic} to hostonly") + + if do_not_modify: + message = f"{vm_name} may be connected to the internet on adapter(s): {invalid_nics_msg}. Please double check your VMs settings." + else: + message = f"{vm_name} may be connected to the internet on adapter(s): {invalid_nics_msg}. The network adapter(s) have been disabled automatically to prevent an undesired internet connectivity. Please double check your VMs settings." + + # Show notification using PyGObject + Notify.init("VirtualBox adapter check") + notification = Notify.Notification.new( + f"INTERNET IN VM: {vm_name}", message, "dialog-error" + ) + # Set highest priority + notification.set_urgency(2) + notification.show() + print(f"{vm_name} network configuration not ok, sent notifaction") + return else: - message = f"{machine_name} may be connected to the internet on adapter(s): {adapters_str}. The network adapter(s) have been disabled automatically to prevent an undesired internet connectivity. Please double check your VMs settings." - - # Show notification using PyGObject - Notify.init("VirtualBox adapter check") - notification = Notify.Notification.new(f"INTERNET IN VM: {machine_name}", message, "dialog-error") - # Set highest priority - notification.set_urgency(2) - notification.show() + print(f"{vm_name} network configuration is ok") + return - session.machine.save_settings() - session.unlock_machine() + except Exception as e: + raise Exception("Failed to verify VM adapter configuration") from e def main(argv=None): @@ -66,12 +129,6 @@ def main(argv=None): # Print status of all internet adapters without modifying any of them vbox-adapter-check.vm --do_not_modify - - # Print status of enabled internet adapters and disabled the enabled adapters with internet access in VMs with {DYNAMIC_VM_NAME} in the name - vbox-adapter-check.vm --skip_disabled - - # # Print status of enabled internet adapters without modifying any of them - vbox-adapter-check.vm --skip_disabled --do_not_modify """ ) parser = argparse.ArgumentParser( @@ -79,15 +136,30 @@ def main(argv=None): epilog=epilog, formatter_class=argparse.RawDescriptionHelpFormatter, ) - parser.add_argument("--do_not_modify", action="store_true", help="Only print the status of the internet adapters without modifying them.") - parser.add_argument("--skip_disabled", action="store_true", help="Skip the disabled adapters.") + parser.add_argument( + "--do_not_modify", + action="store_true", + help="Only print the status of the internet adapters without modifying them.", + ) + parser.add_argument( + "--dynamic_only", + action="store_true", + help="Only scan VMs with .dynamic in the name", + ) args = parser.parse_args(args=argv) - vbox = virtualbox.VirtualBox() - for machine in vbox.machines: - session = machine.create_session() - max_adapters = vbox.system_properties.get_max_network_adapters(machine.chipset_type) - check_and_disable_internet_access(session, machine.name, max_adapters, args.skip_disabled, args.do_not_modify) + try: + hostonly_ifname = ensure_hostonlyif_exists() + machine_guids = get_vm_uuids(args.dynamic_only) + if len(machine_guids) > 0: + for vm_name, machine_guid in machine_guids: + change_network_adapters_to_hostonly( + machine_guid, vm_name, hostonly_ifname, args.do_not_modify + ) + else: + print(f"[Warning āš ļø] No VMs found") + except Exception as e: + print(f"Error verifying dynamic VM hostonly configuration: {e}") if __name__ == "__main__": diff --git a/virtualbox/vbox-clean-snapshots.py b/virtualbox/vbox-clean-snapshots.py index fa3148c..f78a38a 100755 --- a/virtualbox/vbox-clean-snapshots.py +++ b/virtualbox/vbox-clean-snapshots.py @@ -1,51 +1,105 @@ #!/usr/bin/python3 -import sys import argparse +import re +import sys import textwrap -import virtualbox -from virtualbox.library import MachineState - - -TO_DELETE = [] - -def get_snapshots_to_delete(snapshot, protected_snapshots): - for child in snapshot.children: - get_snapshots_to_delete(child, protected_snapshots) - snapshot_name = snapshot.name.lower() - for protected_str in protected_snapshots: - if protected_str.lower() in snapshot_name: - return - TO_DELETE.append((snapshot.name, snapshot.id_p)) +from vboxcommon import * + + +def get_snapshot_children(vm_name, root_snapshot_name, protected_snapshots): + """Recursively gets the children of a snapshot using 'VBoxManage showvminfo'. + + Args: + vm_name: The name of the VM. + root_snapshot_name: The name of the root snapshot we want the children of. + protected_snapshots: snapshots we ignore and do not include in the returned list + + Returns: + A list of snapshot names that are children of the given snapshot. The list is ordered by dependent relationships. + """ + try: + # SnapshotName="Fresh" + # SnapshotUUID="8da3571a-1c66-4c3e-8a22-a87973253ae8" + # SnapshotName-1="FLARE-VM" + # SnapshotUUID-1="23d7b5f3-2e9a-41ef-a908-89b9ac873033" + # SnapshotName-1-1="Child2Snapshot" + # SnapshotUUID-1-1="adf91b7d-403f-478b-9bb4-89c477081dd6" + # SnapshotName-2="Child1SnapshotTesting" + # SnapshotUUID-2="db50b1e9-f51c-4308-b577-da5a41e01068" + # Fresh + # ā”œā”€ FLARE-VM + # ā”‚ ā””ā”€ Child2Snapshot + # ā””ā”€ Child1SnapshotTesting + # Current State + + vminfo = run_vboxmanage(["showvminfo", vm_name, "--machinereadable"]) + # Find all snapshot names + snapshot_regex = rf"(SnapshotName(?:-\d+)*)=\"(.*?)\"" + snapshots = re.findall(snapshot_regex, vminfo, flags=re.M) + + children = [] + + # find the root SnapshotName by matching the name + root_snapshotid = None + for snapshotid, snapshot_name in snapshots: + if snapshot_name.lower() == root_snapshot_name.lower() and ( + not any(p.lower() in snapshot_name.lower() for p in protected_snapshots) + ): + root_snapshotid = snapshotid + + if not root_snapshotid: + print("Failed to find root snapshot") + raise Exception(f"Failed to find root snapshot {snapshot_name}") + + # children of that snapshot share the same prefix id + dependant_child = False + for snapshotid, snapshot_name in snapshots: + if snapshotid.startswith(root_snapshotid): + if not any( + p.lower() in snapshot_name.lower() for p in protected_snapshots + ): + children.append((snapshotid, snapshot_name)) + else: + dependant_child = True + + # remove the root snapshot if any children are protected OR it's the current snapshot + if dependant_child: + print("Root snapshot cannot be deleted as a child snapshot is protected") + children = [ + snapshot for snapshot in children if snapshot[0] != root_snapshotid + ] + return children + except Exception as e: + raise Exception(f"Could not get snapshot children for '{vm_name}'") from e def delete_snapshot_and_children(vm_name, snapshot_name, protected_snapshots): - vbox = virtualbox.VirtualBox() - vm = vbox.find_machine(vm_name) - snapshot = vm.find_snapshot(snapshot_name) - get_snapshots_to_delete(snapshot, protected_snapshots) + snaps_to_delete = get_snapshot_children(vm_name, snapshot_name, protected_snapshots) - if TO_DELETE: + if snaps_to_delete: print(f"\nCleaning {vm_name} šŸ«§ Snapshots to delete:") - for name, _ in TO_DELETE: - print(f" {name}") + for snapshotid, snapshot_name in snaps_to_delete: + print(f" {snapshot_name}") - if vm.state not in (MachineState.powered_off, MachineState.saved): - print(f"\nVM state: {vm.state}\nāš ļø Snapshot deleting is slower in a running VM and may fail in a changing state") + vm_state = get_vm_state(vm_name) + if vm_state not in ("poweroff", "saved"): + print( + f"\nVM state: {vm_state}\nāš ļø Snapshot deleting is slower in a running VM and may fail in a changing state" + ) - answer = input("\nConfirm deletion ('y'):") + answer = input("\nConfirm deletion (press 'y'):") if answer.lower() == "y": print("\nDeleting... (this may take some time, go for an šŸ¦!)") - session = vm.create_session() - for name, uuid in TO_DELETE: + for snapshotid, snapshot_name in reversed( + snaps_to_delete + ): # delete in reverse order to avoid issues with child snapshots try: - progress = session.machine.delete_snapshot(uuid) - progress.wait_for_completion(-1) - print(f" šŸ«§ DELETED '{name}'") + run_vboxmanage(["snapshot", vm_name, "delete", snapshot_name]) + print(f" šŸ«§ DELETED '{snapshot_name}'") except Exception as e: - print(f" āŒ ERROR '{name}': {e}") - session.unlock_machine() + print(f" āŒ ERROR '{snapshot_name}'\n{e}") else: print(f"\n{vm_name} is clean šŸ«§") @@ -67,10 +121,10 @@ def main(argv=None): # Delete the 'CLEAN with IDA 8.4' children snapshots recursively skipping the ones that include 'clean' or 'done' in the name (case insensitive) in the 'FLARE-VM.20240604' VM # NOTE: the 'CLEAN with IDA 8.4' root snapshot is skipped in this case - vbox-clean-snapshots.py FLARE-VM.20240604 --root_snapshot 'CLEAN with IDA 8.4' + vbox-clean-snapshots.py FLARE-VM.20240604 --root_snapshot CLEAN with IDA 8.4 # Delete the 'Snapshot 3' snapshot and its children recursively skipping the ones that include 'clean' or 'done' in the name (case insensitive) in the 'FLARE-VM.20240604' VM - vbox-clean-snapshots.py FLARE-VM.20240604 --root_snapshot 'Snapshot 3' + vbox-clean-snapshots.py FLARE-VM.20240604 --root_snapshot Snapshot 3 # Delete all snapshots in the 'FLARE-VM.20240604' VM vbox-clean-snapshots.py FLARE-VM.20240604 --protected_snapshots "" @@ -82,7 +136,11 @@ def main(argv=None): formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument("vm_name", help="Name of the VM to clean up") - parser.add_argument("--root_snapshot", default="", help="Snapshot to delete (and its children recursively). Leave empty to clean all snapshots in the VM.") + parser.add_argument( + "--root_snapshot", + default="", + help="Snapshot to delete (and its children recursively). Leave empty to clean all snapshots in the VM.", + ) parser.add_argument( "--protected_snapshots", default="clean,done", @@ -91,7 +149,9 @@ def main(argv=None): ) args = parser.parse_args(args=argv) - delete_snapshot_and_children(args.vm_name, args.root_snapshot, args.protected_snapshots) + delete_snapshot_and_children( + args.vm_name, args.root_snapshot, args.protected_snapshots + ) if __name__ == "__main__": diff --git a/virtualbox/vbox-export-snapshots.py b/virtualbox/vbox-export-snapshots.py index 8defebe..f98ce70 100755 --- a/virtualbox/vbox-export-snapshots.py +++ b/virtualbox/vbox-export-snapshots.py @@ -5,16 +5,16 @@ The exported VM names start with "FLARE-VM.{date}". """ -import os import hashlib -import virtualbox -from virtualbox.library import VirtualSystemDescriptionType as DescType -from virtualbox.library import NetworkAttachmentType as NetType -from virtualbox.library import ExportOptions as ExportOps +import os +import re from datetime import datetime +from vboxcommon import * + # Base name of the exported VMs EXPORTED_VM_NAME = "FLARE-VM" + # Name of the VM to export the snapshots from VM_NAME = f"{EXPORTED_VM_NAME}.testing" @@ -27,9 +27,21 @@ # - VM name extension (exported VM name: "FLARE-VM..