From d663b5d211fd5e6a2639a04f51c3e135c81aa193 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mathieu=20Dupr=C3=A9?= Date: Mon, 9 Mar 2026 18:19:02 +0100 Subject: [PATCH 1/2] tests: add pytest integration tests for cluster mode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.6 Signed-off-by: Mathieu Dupré --- .github/workflows/ci.yml | 2 +- tests/test_vm_manager_cluster.py | 623 +++++++++++++++++++++++++++++++ 2 files changed, 624 insertions(+), 1 deletion(-) create mode 100644 tests/test_vm_manager_cluster.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d6d84e1..92ad6d8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -54,7 +54,7 @@ jobs: run: pip install ".[test]" - name: Run tests - run: sg libvirt -c "pytest tests/ -v --tb=short" + run: sg libvirt -c "pytest tests/ -v --tb=short --ignore=tests/test_vm_manager_cluster.py" - name: Install documentation dependencies run: pip install ".[docs]" diff --git a/tests/test_vm_manager_cluster.py b/tests/test_vm_manager_cluster.py new file mode 100644 index 0000000..26c083b --- /dev/null +++ b/tests/test_vm_manager_cluster.py @@ -0,0 +1,623 @@ +# Copyright (C) 2025, RTE (http://www.rte-france.com) +# Copyright (C) 2026 Savoir-faire Linux Inc. +# SPDX-License-Identifier: Apache-2.0 + +import os +import secrets +import subprocess +import tempfile +import xml.etree.ElementTree as ElementTree + +import pytest + +from vm_manager import vm_manager_cluster as vmc +from vm_manager.helpers.libvirt import LibVirtManager + +TESTDATA_XML_PATH = os.path.join( + os.path.dirname(__file__), + "..", + "vm_manager", + "testdata", + "vm.xml", +) + + +def _read_test_xml(): + with open(TESTDATA_XML_PATH) as f: + return f.read() + + +@pytest.fixture +def vm_name(): + """Generate a unique VM name and ensure cleanup after test.""" + name = "testvm" + secrets.token_hex(4) + yield name + # Cleanup: remove VM from all subsystems + try: + vmc.remove(name) + except Exception: + pass + + +@pytest.fixture +def second_vm_name(): + """Generate a second unique VM name for clone tests.""" + name = "testvm" + secrets.token_hex(4) + yield name + try: + vmc.remove(name) + except Exception: + pass + + +@pytest.fixture +def qcow2_image(): + """Create a small temporary qcow2 image for testing.""" + with tempfile.NamedTemporaryFile(suffix=".qcow2", delete=False) as f: + path = f.name + subprocess.run( + ["qemu-img", "create", "-f", "qcow2", path, "64M"], + check=True, + capture_output=True, + ) + yield path + try: + os.unlink(path) + except OSError: + pass + + +@pytest.fixture +def created_vm(vm_name, qcow2_image): + """Create a VM and return its name. Enables by default.""" + xml = _read_test_xml() + vmc.create( + { + "name": vm_name, + "image": qcow2_image, + "base_xml": xml, + } + ) + return vm_name + + +@pytest.fixture +def disabled_vm(vm_name, qcow2_image): + """Create a disabled VM and return its name.""" + xml = _read_test_xml() + vmc.create( + { + "name": vm_name, + "image": qcow2_image, + "base_xml": xml, + "enable": False, + } + ) + return vm_name + + +# ── _check_name ────────────────────────────────────────────────────── + + +class TestCheckName: + def test_valid_alphanumeric(self): + vmc._check_name("myvm1") + + def test_reserved_name_raises(self): + with pytest.raises(ValueError, match="reserved word"): + vmc._check_name("xml") + + def test_empty_string_raises(self): + with pytest.raises(ValueError): + vmc._check_name("") + + def test_special_chars_raises(self): + with pytest.raises(ValueError): + vmc._check_name("my-vm") + + def test_spaces_raises(self): + with pytest.raises(ValueError): + vmc._check_name("my vm") + + def test_underscore_raises(self): + with pytest.raises(ValueError): + vmc._check_name("my_vm") + + def test_non_string_raises(self): + with pytest.raises(ValueError): + vmc._check_name(123) + + def test_none_raises(self): + with pytest.raises(ValueError): + vmc._check_name(None) + + +# ── list_vms ───────────────────────────────────────────────────────── + + +class TestListVms: + def test_returns_list(self): + result = vmc.list_vms() + assert isinstance(result, list) + + def test_returns_list_enabled(self): + result = vmc.list_vms(enabled=True) + assert isinstance(result, list) + + def test_created_vm_in_list(self, created_vm): + assert created_vm in vmc.list_vms() + + def test_created_vm_in_enabled_list(self, created_vm): + assert created_vm in vmc.list_vms(enabled=True) + + def test_disabled_vm_in_all_list(self, disabled_vm): + assert disabled_vm in vmc.list_vms() + + def test_disabled_vm_not_in_enabled_list(self, disabled_vm): + assert disabled_vm not in vmc.list_vms(enabled=True) + + +# ── create ─────────────────────────────────────────────────────────── + + +class TestCreate: + def test_create_vm(self, created_vm): + assert created_vm in vmc.list_vms() + + def test_create_invalid_name_raises(self, qcow2_image): + with pytest.raises(ValueError): + vmc.create( + { + "name": "test-vm", + "image": qcow2_image, + "base_xml": _read_test_xml(), + } + ) + + def test_create_missing_image_raises(self): + with pytest.raises(IOError): + vmc.create( + { + "name": "testvm", + "image": "/nonexistent/disk.qcow2", + "base_xml": _read_test_xml(), + } + ) + + def test_create_invalid_metadata_type_raises(self, qcow2_image): + with pytest.raises(ValueError, match="metadata"): + vmc.create( + { + "name": "testvm", + "image": qcow2_image, + "base_xml": _read_test_xml(), + "metadata": "not_a_dict", + } + ) + + def test_create_disabled(self, disabled_vm): + assert vmc.status(disabled_vm) == "Disabled" + + def test_create_already_exists_raises(self, created_vm, qcow2_image): + with pytest.raises(Exception, match="already exists"): + vmc.create( + { + "name": created_vm, + "image": qcow2_image, + "base_xml": _read_test_xml(), + } + ) + + def test_create_with_force(self, created_vm, qcow2_image): + vmc.create( + { + "name": created_vm, + "image": qcow2_image, + "base_xml": _read_test_xml(), + "force": True, + } + ) + assert created_vm in vmc.list_vms() + + def test_create_with_metadata(self, vm_name, qcow2_image): + vmc.create( + { + "name": vm_name, + "image": qcow2_image, + "base_xml": _read_test_xml(), + "metadata": {"mykey": "myvalue"}, + } + ) + assert vmc.get_metadata(vm_name, "mykey") == "myvalue" + + def test_create_nones_filtered(self, vm_name, qcow2_image): + vmc.create( + { + "name": vm_name, + "image": qcow2_image, + "base_xml": _read_test_xml(), + "pinned_host": None, + "preferred_host": None, + } + ) + assert vm_name in vmc.list_vms() + + +# ── _create_xml ────────────────────────────────────────────────────── + + +class TestCreateXml: + def test_name_replaced(self): + xml = _read_test_xml() + result = vmc._create_xml(xml, "myvm") + assert "myvm" in result + + def test_uuid_replaced(self): + xml = _read_test_xml() + result = vmc._create_xml(xml, "myvm") + assert "7b48b1fe-066a-41a6-aef4-f0a9c028f719" not in result + assert "" in result + + def test_rbd_disk_added(self): + xml = _read_test_xml() + result = vmc._create_xml(xml, "myvm") + assert "rbd/system_myvm" in result + assert 'type="network"' in result + + def test_custom_disk_bus(self): + xml = _read_test_xml() + result = vmc._create_xml(xml, "myvm", target_disk_bus="scsi") + assert 'bus="scsi"' in result + + +# ── status ─────────────────────────────────────────────────────────── + + +class TestStatus: + def test_undefined_vm(self): + assert vmc.status("nonexistentvmxyz") == "Undefined" + + def test_disabled_vm(self, disabled_vm): + assert vmc.status(disabled_vm) == "Disabled" + + def test_started_vm(self, created_vm): + assert vmc.status(created_vm) == "Started" + + +# ── is_enabled ─────────────────────────────────────────────────────── + + +class TestIsEnabled: + def test_enabled(self, created_vm): + assert vmc.is_enabled(created_vm) is True + + def test_disabled(self, disabled_vm): + assert vmc.is_enabled(disabled_vm) is False + + def test_nonexistent(self): + assert vmc.is_enabled("nonexistentvmxyz") is False + + +# ── start / stop ───────────────────────────────────────────────────── + + +class TestStartStop: + def test_stop_running_vm(self, created_vm): + vmc.stop(created_vm) + assert vmc.status(created_vm) == "Stopped (disabled)" + + def test_start_stopped_vm(self, created_vm): + vmc.stop(created_vm) + vmc.start(created_vm) + assert vmc.status(created_vm) == "Started" + + def test_start_already_started(self, created_vm): + # Should not raise + vmc.start(created_vm) + assert vmc.status(created_vm) == "Started" + + def test_stop_already_stopped(self, created_vm): + vmc.stop(created_vm) + # Should not raise + vmc.stop(created_vm) + assert vmc.status(created_vm) == "Stopped (disabled)" + + def test_start_not_enabled_raises(self, disabled_vm): + with pytest.raises(Exception, match="not on the cluster"): + vmc.start(disabled_vm) + + def test_stop_not_enabled_raises(self, disabled_vm): + with pytest.raises(Exception, match="not on the cluster"): + vmc.stop(disabled_vm) + + +# ── enable_vm / disable_vm ─────────────────────────────────────────── + + +class TestEnableDisable: + def test_enable_disabled_vm(self, disabled_vm): + vmc.enable_vm(disabled_vm) + assert vmc.is_enabled(disabled_vm) is True + assert vmc.status(disabled_vm) == "Started" + + def test_enable_with_nostart(self, disabled_vm): + vmc.enable_vm(disabled_vm, nostart=True) + assert vmc.is_enabled(disabled_vm) is True + assert vmc.status(disabled_vm) == "Stopped (disabled)" + + def test_disable_running_vm(self, created_vm): + vmc.disable_vm(created_vm) + assert vmc.is_enabled(created_vm) is False + assert vmc.status(created_vm) == "Disabled" + + def test_disable_already_disabled(self, disabled_vm): + # Should not raise + vmc.disable_vm(disabled_vm) + + def test_enable_already_enabled(self, created_vm): + # Should not raise + vmc.enable_vm(created_vm) + assert vmc.status(created_vm) == "Started" + + +# ── remove ─────────────────────────────────────────────────────────── + + +class TestRemove: + def test_remove_enabled_vm(self, vm_name, qcow2_image): + vmc.create( + { + "name": vm_name, + "image": qcow2_image, + "base_xml": _read_test_xml(), + } + ) + vmc.remove(vm_name) + assert vmc.status(vm_name) == "Undefined" + + def test_remove_disabled_vm(self, vm_name, qcow2_image): + vmc.create( + { + "name": vm_name, + "image": qcow2_image, + "base_xml": _read_test_xml(), + "enable": False, + } + ) + vmc.remove(vm_name) + assert vmc.status(vm_name) == "Undefined" + + +# ── Snapshots ──────────────────────────────────────────────────────── + + +class TestSnapshots: + def test_create_snapshot(self, created_vm): + vmc.create_snapshot(created_vm, "snap1") + assert "snap1" in vmc.list_snapshots(created_vm) + + def test_remove_snapshot(self, created_vm): + vmc.create_snapshot(created_vm, "snap1") + vmc.remove_snapshot(created_vm, "snap1") + assert "snap1" not in vmc.list_snapshots(created_vm) + + def test_list_snapshots_empty(self, created_vm): + result = vmc.list_snapshots(created_vm) + assert isinstance(result, list) + + def test_create_duplicate_snapshot_raises(self, created_vm): + vmc.create_snapshot(created_vm, "snap1") + with pytest.raises(Exception, match="already exists"): + vmc.create_snapshot(created_vm, "snap1") + + def test_invalid_snapshot_name_raises(self, created_vm): + with pytest.raises(ValueError): + vmc.create_snapshot(created_vm, "snap-1") + + def test_rollback_snapshot(self, created_vm): + vmc.create_snapshot(created_vm, "snap1") + vmc.rollback_snapshot(created_vm, "snap1") + + def test_rollback_nonexistent_raises(self, created_vm): + with pytest.raises(Exception, match="does not exist"): + vmc.rollback_snapshot(created_vm, "nosuchsnap") + + def test_purge_all_snapshots(self, created_vm): + vmc.create_snapshot(created_vm, "snap1") + vmc.create_snapshot(created_vm, "snap2") + vmc.purge_image(created_vm) + assert len(vmc.list_snapshots(created_vm)) == 0 + + def test_purge_by_number(self, created_vm): + vmc.create_snapshot(created_vm, "snap1") + vmc.create_snapshot(created_vm, "snap2") + vmc.create_snapshot(created_vm, "snap3") + vmc.purge_image(created_vm, number=2) + snaps = vmc.list_snapshots(created_vm) + assert len(snaps) == 1 + + def test_purge_invalid_args(self): + with pytest.raises(ValueError, match="not datetime"): + vmc.purge_image("vm", date="2025-01-01") + + with pytest.raises(ValueError, match="positive integer"): + vmc.purge_image("vm", number=-1) + + +# ── Metadata ───────────────────────────────────────────────────────── + + +class TestMetadata: + def test_set_and_get_metadata(self, created_vm): + vmc.set_metadata(created_vm, "mykey", "myvalue") + assert vmc.get_metadata(created_vm, "mykey") == "myvalue" + + def test_list_metadata(self, created_vm): + vmc.set_metadata(created_vm, "key1", "val1") + metadata = vmc.list_metadata(created_vm) + assert isinstance(metadata, list) + assert "key1" in metadata + + def test_set_metadata_invalid_name_raises(self, created_vm): + with pytest.raises(ValueError): + vmc.set_metadata(created_vm, "bad-key", "value1") + + +# ── clone ──────────────────────────────────────────────────────────── + + +class TestClone: + def test_clone_vm(self, created_vm, second_vm_name): + vmc.clone( + { + "name": created_vm, + "dst_name": second_vm_name, + } + ) + assert second_vm_name in vmc.list_vms() + + def test_clone_same_name_raises(self): + with pytest.raises(ValueError, match="same name"): + vmc.clone({"name": "myvm", "dst_name": "myvm"}) + + def test_clone_invalid_dst_name_raises(self): + with pytest.raises(ValueError): + vmc.clone({"name": "myvm", "dst_name": "bad-name"}) + + def test_clone_invalid_metadata_type_raises(self): + with pytest.raises(ValueError, match="metadata"): + vmc.clone( + { + "name": "srcvm", + "dst_name": "dstvm", + "metadata": "not_a_dict", + } + ) + + +# ── add_to_cluster ─────────────────────────────────────────────────── + + +@pytest.fixture +def libvirt_vm(vm_name, qcow2_image): + """Define a VM in libvirt with a disk, for import tests.""" + xml_template = _read_test_xml() + root = ElementTree.fromstring(xml_template) + root.find("name").text = vm_name + # Add a disk device pointing to the qcow2 image + devices = root.find("devices") + disk = ElementTree.SubElement(devices, "disk", type="file", device="disk") + ElementTree.SubElement(disk, "driver", name="qemu", type="qcow2") + ElementTree.SubElement(disk, "source", file=qcow2_image) + ElementTree.SubElement(disk, "target", dev="vda", bus="virtio") + vm_xml = ElementTree.tostring(root, encoding="unicode") + with LibVirtManager() as lvm: + lvm.define(vm_xml) + yield vm_name + # Cleanup: undefine from libvirt if still present + try: + with LibVirtManager() as lvm: + if vm_name in lvm.list(): + lvm.undefine(vm_name) + except Exception: + pass + + +class TestAddToCluster: + def test_add_to_cluster(self, libvirt_vm, qcow2_image): + """Add a libvirt VM to the cluster.""" + vmc.add_to_cluster( + { + "name": libvirt_vm, + "image": qcow2_image, + } + ) + assert libvirt_vm in vmc.list_vms() + + def test_add_to_cluster_auto_disk(self, libvirt_vm): + """Add a libvirt VM without specifying image path.""" + vmc.add_to_cluster({"name": libvirt_vm}) + assert libvirt_vm in vmc.list_vms() + + def test_add_to_cluster_with_new_name( + self, libvirt_vm, second_vm_name, qcow2_image + ): + """Add a libvirt VM with a different target name.""" + vmc.add_to_cluster( + { + "name": libvirt_vm, + "new_name": second_vm_name, + "image": qcow2_image, + } + ) + assert second_vm_name in vmc.list_vms() + + def test_add_to_cluster_disabled(self, libvirt_vm, qcow2_image): + """Add a libvirt VM as disabled.""" + vmc.add_to_cluster( + { + "name": libvirt_vm, + "image": qcow2_image, + "enable": False, + } + ) + assert vmc.status(libvirt_vm) == "Disabled" + + def test_add_nonexistent_vm_raises(self): + """Add a VM that doesn't exist in libvirt.""" + with pytest.raises(Exception, match="does not exist"): + vmc.add_to_cluster( + { + "name": "nonexistentvmxyz", + "image": "/tmp/fake.qcow2", + } + ) + + def test_add_invalid_name_raises(self, libvirt_vm, qcow2_image): + """Add with an invalid target name.""" + with pytest.raises(ValueError): + vmc.add_to_cluster( + { + "name": libvirt_vm, + "new_name": "bad-name", + "image": qcow2_image, + } + ) + + def test_add_multi_disk_vm_raises(self, vm_name, qcow2_image): + """Add a VM with multiple disks raises an exception.""" + xml_template = _read_test_xml() + root = ElementTree.fromstring(xml_template) + root.find("name").text = vm_name + devices = root.find("devices") + for dev_name in ("vda", "vdb"): + disk = ElementTree.SubElement( + devices, "disk", type="file", device="disk" + ) + ElementTree.SubElement(disk, "driver", name="qemu", type="qcow2") + ElementTree.SubElement(disk, "source", file=qcow2_image) + ElementTree.SubElement(disk, "target", dev=dev_name, bus="virtio") + vm_xml = ElementTree.tostring(root, encoding="unicode") + with LibVirtManager() as lvm: + lvm.define(vm_xml) + try: + with pytest.raises(Exception, match="more than one disk"): + vmc.add_to_cluster( + { + "name": vm_name, + "image": qcow2_image, + } + ) + finally: + with LibVirtManager() as lvm: + if vm_name in lvm.list(): + lvm.undefine(vm_name) + + +# ── console ────────────────────────────────────────────────────────── + + +class TestConsole: + def test_console_nonexistent_vm(self): + with pytest.raises(SystemExit): + vmc.console("nonexistentvmxyz") From e98ed68e71cf20d8fe6ef03d0dd9f97b2b443916 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mathieu=20Dupr=C3=A9?= Date: Wed, 11 Mar 2026 15:16:09 +0100 Subject: [PATCH 2/2] README: add sections about tests and documentation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Mathieu Dupré --- README.md | 50 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/README.md b/README.md index 94d39bd..57942f7 100644 --- a/README.md +++ b/README.md @@ -24,3 +24,53 @@ sudo dnf install python3 python3-libvirt python3-rbd python3-flask-wtf ``` All theses dependencies are satisfied on a SEAPATH machine. + +## Tests + +The test suite uses pytest and requires libvirt/QEMU to be available on the host. + +### Install test dependencies + +```bash +pip install .[test] +``` + +### Run all tests + +```bash +pytest tests/ +``` + +### Run only standalone tests (no cluster required) + +The cluster tests (`test_vm_manager_cluster.py`) require a running Pacemaker/Ceph +cluster. To run only the standalone libvirt tests, exclude that file: + +```bash +pytest tests/ --ignore=tests/test_vm_manager_cluster.py +``` + +## Documentation + +The HTML documentation is generated with Sphinx. + +### Install documentation dependencies + +```bash +pip install .[docs] +``` + +### Build the documentation + +```bash +sphinx-build -b html docs docs/_build/html +``` + +The generated documentation is available at `docs/_build/html/index.html`. + +Alternatively, using cqfd (Docker-based build wrapper): + +```bash +cqfd init +cqfd -b docs +```