Introduce a framework for automated VM tests.

This commit is contained in:
r-vdp 2023-04-23 03:00:58 +02:00 committed by Ramses
parent 2f8a9ba967
commit 22684b6ed6
17 changed files with 2194 additions and 350 deletions

View file

@ -118,6 +118,7 @@
{
packages = {
inherit system-manager;
default = self.packages.${system}.system-manager;
};
@ -191,6 +192,10 @@
system-manager
system-manager-clippy
system-manager-test;
basicTest = import ./test/nix/modules {
inherit system;
system-manager = self;
};
};
}));
}

View file

@ -13,13 +13,11 @@ in
,
}:
let
pkgs = nixpkgs.legacyPackages.${system};
inherit (self.packages.${system}) system-manager;
# Module that sets additional module arguments
extraArgsModule = { lib, config, pkgs, ... }: {
_file = "lib.nix: extraArgsModule";
_module.args = {
pkgs = nixpkgs.legacyPackages.${system};
pkgs = nixpkgs.legacyPackages.${config.nixpkgs.hostPlatform};
utils = import "${nixos}/lib/utils.nix" {
inherit lib config pkgs;
};
@ -34,7 +32,10 @@ in
] ++ modules;
}).config;
# Get the system as it was defined in the modules.
system = config.nixpkgs.hostPlatform;
pkgs = nixpkgs.legacyPackages.${system};
inherit (self.packages.${system}) system-manager;
returnIfNoAssertions = drv:
let
@ -173,4 +174,229 @@ in
(linkFarmBinEntryFromDrv preActivationAssertionScript)
]
);
images = {
ubuntu_22_10_cloudimg = {
name = "ubuntu-22.10-server-cloudimg-amd64.img";
release = "20230302";
hash = "sha256-9hjGjlUQoXZfAYTwsEjHE3Zawd6qqrVc6VXshthNS44=";
};
ubuntu_20_04_cloudimg = {
name = "ubuntu-20.04-server-cloudimg-amd64.img";
release = "";
hash = "";
};
};
# Careful since we do not have the nix store yet when this service runs,
# so we cannot use pkgs.writeTest or pkgs.writeShellScript for instance,
# since their results would refer to the store
mount_store = { pkgs }:
pkgs.writeText "mount-store.service" ''
[Service]
Type = oneshot
ExecStart = mkdir -p /nix/.ro-store
ExecStart = mount -t 9p -o defaults,trans=virtio,version=9p2000.L,cache=loose nix-store /nix/.ro-store
ExecStart = mkdir -p -m 0755 /nix/.rw-store/ /nix/store
ExecStart = mount -t tmpfs tmpfs /nix/.rw-store
ExecStart = mkdir -p -m 0755 /nix/.rw-store/store /nix/.rw-store/work
ExecStart = mount -t overlay overlay /nix/store -o lowerdir=/nix/.ro-store,upperdir=/nix/.rw-store/store,workdir=/nix/.rw-store/work
[Install]
WantedBy = multi-user.target
'';
# Backdoor service that exposes a root shell through a socket to the test instrumentation framework
backdoor = { pkgs }:
pkgs.writeText "backdoor.service" ''
[Unit]
Requires = dev-hvc0.device dev-ttyS0.device mount-store.service
After = dev-hvc0.device dev-ttyS0.device mount-store.service
[Service]
ExecStart = ${pkgs.writeShellScript "backdoor-start-script" ''
set -euo pipefail
export USER=root
export HOME=/root
export DISPLAY=:0.0
# TODO: do we actually need to source /etc/profile ?
# Unbound vars cause the service to crash
#source /etc/profile
# Don't use a pager when executing backdoor
# actions. Because we use a tty, commands like systemctl
# or nix-store get confused into thinking they're running
# interactively.
export PAGER=
cd /tmp
exec < /dev/hvc0 > /dev/hvc0
while ! exec 2> /dev/ttyS0; do sleep 0.1; done
echo "connecting to host..." >&2
stty -F /dev/hvc0 raw -echo # prevent nl -> cr/nl conversion
# This line is essential since it signals to the test driver that the
# shell is ready.
# See: the connect method in the Machine class.
echo "Spawning backdoor root shell..."
# Passing the terminal device makes bash run non-interactively.
# Otherwise we get errors on the terminal because bash tries to
# setup things like job control.
PS1= exec /usr/bin/env bash --norc /dev/hvc0
''}
KillSignal = SIGHUP
[Install]
WantedBy = multi-user.target
'';
prepareUbuntuImage = { hostPkgs, nodeConfig, image }:
let
pkgs = hostPkgs;
img = pkgs.fetchurl {
inherit (image) hash;
url = "https://cloud-images.ubuntu.com/releases/kinetic/release-${image.release}/${image.name}";
};
in
pkgs.runCommand "configure-vm" { } ''
# We will modify the VM image, so we need a mutable copy
install -m777 ${img} ./img.qcow2
# Copy the service files here, since otherwise they end up in the VM
# wwith their paths including the nix hash
cp ${self.lib.backdoor { inherit pkgs; }} backdoor.service
cp ${self.lib.mount_store { inherit pkgs; }} mount-store.service
${lib.concatStringsSep " \\\n" [
"${pkgs.guestfs-tools}/bin/virt-customize"
"-a ./img.qcow2"
"--smp 2"
"--memsize 256"
"--no-network"
"--copy-in backdoor.service:/etc/systemd/system"
"--copy-in mount-store.service:/etc/systemd/system"
''--link ${nodeConfig.systemConfig}:/system-manager-profile''
"--run"
(pkgs.writeShellScript "run-script" ''
# Clear the root password
passwd -d root
# Don't spawn ttys on these devices, they are used for test instrumentation
systemctl mask serial-getty@ttyS0.service
systemctl mask serial-getty@hvc0.service
# Speed up the boot process
systemctl mask snapd.service
systemctl mask snapd.socket
systemctl mask snapd.seeded.service
systemctl enable backdoor.service
'')
]};
cp ./img.qcow2 $out
'';
make-vm-test =
{ system
, modules
}:
let
hostPkgs = nixpkgs.legacyPackages.${system};
config = (lib.evalModules {
specialArgs = { system-manager = self; };
modules = [
../test/nix/test-driver/modules
{
_file = "inline module in lib.nix";
inherit hostPkgs;
}
] ++ modules;
}).config;
nodes = map runVmScript (lib.attrValues config.nodes);
runVmScript = node:
# The test driver extracts the name of the node from the name of the
# VM script, so it's important here to stick to the naming scheme expected
# by the test driver.
hostPkgs.writeShellScript "run-${node.system.name}-vm" ''
set -eo pipefail
export PATH=${lib.makeBinPath [ hostPkgs.coreutils ]}''${PATH:+:}$PATH
# Create a directory for storing temporary data of the running VM.
if [ -z "$TMPDIR" ] || [ -z "$USE_TMPDIR" ]; then
TMPDIR=$(mktemp -d nix-vm.XXXXXXXXXX --tmpdir)
fi
# Create a directory for exchanging data with the VM.
mkdir -p "$TMPDIR/xchg"
cd "$TMPDIR"
# Start QEMU.
# We might need to be smarter about the QEMU binary to run when we want to
# support architectures other than x86_64.
# See qemu-common.nix in nixpkgs.
${lib.concatStringsSep "\\\n " [
"exec ${lib.getBin hostPkgs.qemu_test}/bin/qemu-kvm"
"-device virtio-rng-pci"
"-cpu max"
"-name ${node.system.name}"
"-m ${toString node.virtualisation.memorySize}"
"-smp ${toString node.virtualisation.cpus}"
"-drive file=${node.virtualisation.rootImage},format=qcow2"
"-device virtio-net-pci,netdev=net0"
"-netdev user,id=net0"
"-virtfs local,security_model=passthrough,id=fsdev1,path=/nix/store,readonly=on,mount_tag=nix-store"
(lib.concatStringsSep "\\\n "
(lib.mapAttrsToList
(tag: share: "-virtfs local,path=${share.source},security_model=none,mount_tag=${tag}")
node.virtualisation.sharedDirectories))
"-snapshot"
"-nographic"
"$QEMU_OPTS"
"$@"
]};
'';
# We vendor the test-driver until github.com/NixOS/nixpkgs#228220 gets merged
#test-driver = pkgs.callPackage "${nixpkgs}/nixos/lib/test-driver" { };
test-driver = hostPkgs.callPackage ../test/nixos-test-driver { };
runTest = { nodes, vlans, testScript, extraDriverArgs }: ''
${lib.getBin test-driver}/bin/nixos-test-driver \
${extraDriverArgs} \
--start-scripts ${lib.concatStringsSep " " nodes} \
--vlans ${lib.concatStringsSep " " vlans} \
-- ${hostPkgs.writeText "test-script" config.testScript}
'';
defaultTest = { extraDriverArgs ? "" }: runTest {
inherit extraDriverArgs nodes;
inherit (config) testScript;
vlans = [ "1" ];
};
in
hostPkgs.stdenv.mkDerivation (finalAttrs: {
name = "system-manager-vm-test";
requiredSystemFeatures = [ "kvm" "nixos-test" ];
buildCommand = ''
${defaultTest {}}
touch $out
'';
passthru = {
runVM = hostPkgs.writeShellScriptBin "run-vm"
(defaultTest {
extraDriverArgs = "--interactive";
});
};
});
}

View file

@ -20,6 +20,7 @@
hostPlatform = lib.mkOption {
type = types.str;
example = "x86_64-linux";
default = throw "the option nixpkgs.hostPlatform needs to be set.";
};
};
@ -131,5 +132,19 @@
'';
};
};
# Can we make sure that this does not get relaunched when activating a new profile?
# Otherwise we get an infinite loop.
systemd.services.reactivate-system-manager = {
enable = false;
# TODO should we activate earlier?
wantedBy = [ "multi-user.target" ];
serviceConfig = {
Type = "oneshot";
};
script = ''
/nix/var/nix/profiles/system-manager-profiles/system-manager/bin/activate
'';
};
};
}

View file

@ -125,6 +125,10 @@ in
config = {
systemd = {
targets.system-manager = {
wantedBy = [ "default.target" ];
};
timers =
lib.mapAttrs
(name: service:

256
test/nix/flake.lock generated
View file

@ -1,256 +0,0 @@
{
"nodes": {
"crane": {
"inputs": {
"flake-compat": [
"system-manager",
"pre-commit-hooks",
"flake-compat"
],
"flake-utils": [
"system-manager",
"flake-utils"
],
"nixpkgs": [
"system-manager",
"nixpkgs"
],
"rust-overlay": [
"system-manager",
"rust-overlay"
]
},
"locked": {
"lastModified": 1679811681,
"narHash": "sha256-gc0lwuQxtKnUzdyfmHT6wKEuSdvn5KlRcXNkWT1/4hs=",
"owner": "ipetkov",
"repo": "crane",
"rev": "72fa29510a9ce61ea7455b4469507808684f5673",
"type": "github"
},
"original": {
"owner": "ipetkov",
"repo": "crane",
"type": "github"
}
},
"devshell": {
"inputs": {
"flake-utils": [
"system-manager",
"flake-utils"
],
"nixpkgs": [
"system-manager",
"nixpkgs"
]
},
"locked": {
"lastModified": 1678957337,
"narHash": "sha256-Gw4nVbuKRdTwPngeOZQOzH/IFowmz4LryMPDiJN/ah4=",
"owner": "numtide",
"repo": "devshell",
"rev": "3e0e60ab37cd0bf7ab59888f5c32499d851edb47",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "devshell",
"type": "github"
}
},
"flake-compat": {
"flake": false,
"locked": {
"lastModified": 1673956053,
"narHash": "sha256-4gtG9iQuiKITOjNQQeQIpoIB6b16fm+504Ch3sNKLd8=",
"owner": "edolstra",
"repo": "flake-compat",
"rev": "35bb57c0c8d8b62bbfd284272c928ceb64ddbde9",
"type": "github"
},
"original": {
"owner": "edolstra",
"repo": "flake-compat",
"type": "github"
}
},
"flake-utils": {
"locked": {
"lastModified": 1678901627,
"narHash": "sha256-U02riOqrKKzwjsxc/400XnElV+UtPUQWpANPlyazjH0=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "93a2b84fc4b70d9e089d029deacc3583435c2ed6",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "flake-utils",
"type": "github"
}
},
"gitignore": {
"inputs": {
"nixpkgs": [
"system-manager",
"pre-commit-hooks",
"nixpkgs"
]
},
"locked": {
"lastModified": 1660459072,
"narHash": "sha256-8DFJjXG8zqoONA1vXtgeKXy68KdJL5UaXR8NtVMUbx8=",
"owner": "hercules-ci",
"repo": "gitignore.nix",
"rev": "a20de23b925fd8264fd7fad6454652e142fd7f73",
"type": "github"
},
"original": {
"owner": "hercules-ci",
"repo": "gitignore.nix",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1679944645,
"narHash": "sha256-e5Qyoe11UZjVfgRfwNoSU57ZeKuEmjYb77B9IVW7L/M=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "4bb072f0a8b267613c127684e099a70e1f6ff106",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"nixpkgs-stable": {
"locked": {
"lastModified": 1678872516,
"narHash": "sha256-/E1YwtMtFAu2KUQKV/1+KFuReYPANM2Rzehk84VxVoc=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "9b8e5abb18324c7fe9f07cb100c3cd4a29cda8b8",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-22.11",
"repo": "nixpkgs",
"type": "github"
}
},
"pre-commit-hooks": {
"inputs": {
"flake-compat": "flake-compat",
"flake-utils": [
"system-manager",
"flake-utils"
],
"gitignore": "gitignore",
"nixpkgs": [
"system-manager",
"nixpkgs"
],
"nixpkgs-stable": "nixpkgs-stable"
},
"locked": {
"lastModified": 1678976941,
"narHash": "sha256-skNr08frCwN9NO+7I77MjOHHAw+L410/37JknNld+W4=",
"owner": "cachix",
"repo": "pre-commit-hooks.nix",
"rev": "32b1dbedfd77892a6e375737ef04d8efba634e9e",
"type": "github"
},
"original": {
"owner": "cachix",
"repo": "pre-commit-hooks.nix",
"type": "github"
}
},
"root": {
"inputs": {
"nixpkgs": "nixpkgs",
"system-manager": "system-manager"
}
},
"rust-overlay": {
"inputs": {
"flake-utils": [
"system-manager",
"flake-utils"
],
"nixpkgs": [
"system-manager",
"nixpkgs"
]
},
"locked": {
"lastModified": 1679970108,
"narHash": "sha256-8OfySbY1hhBzj0Iz90k4se6oFCGS3+ke31vkd0d4k/o=",
"owner": "oxalica",
"repo": "rust-overlay",
"rev": "26ef1a2029239e204e51ab3402f8aae5aa1187ed",
"type": "github"
},
"original": {
"owner": "oxalica",
"repo": "rust-overlay",
"type": "github"
}
},
"system-manager": {
"inputs": {
"crane": "crane",
"devshell": "devshell",
"flake-utils": "flake-utils",
"nixpkgs": [
"nixpkgs"
],
"pre-commit-hooks": "pre-commit-hooks",
"rust-overlay": "rust-overlay",
"treefmt-nix": "treefmt-nix"
},
"locked": {
"lastModified": 1680080819,
"narHash": "sha256-3p/85AhtTgS3DIctGMExHvf3ozStujge7E8/N1rxJQc=",
"owner": "numtide",
"repo": "system-manager",
"rev": "01073b251d1fc565174ff8d89324a5709dafda97",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "system-manager",
"type": "github"
}
},
"treefmt-nix": {
"inputs": {
"nixpkgs": [
"system-manager",
"nixpkgs"
]
},
"locked": {
"lastModified": 1679588014,
"narHash": "sha256-URkRSunu8HAp2vH2KgLogjAXckiufCDFrBs59g9uiLY=",
"owner": "numtide",
"repo": "treefmt-nix",
"rev": "af75d6efe437858f9ca5535e622cfbedad1ba717",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "treefmt-nix",
"type": "github"
}
}
},
"root": "root",
"version": 7
}

View file

@ -1,19 +0,0 @@
{
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
system-manager = {
url = "github:numtide/system-manager";
inputs.nixpkgs.follows = "nixpkgs";
};
};
outputs = { system-manager, ... }:
{
systemConfigs.default = system-manager.lib.makeSystemConfig {
modules = [
./modules
];
};
};
}

View file

@ -1,8 +1,9 @@
{ lib
, pkgs
, ...
{ system-manager
, system
}:
{
let
testConfig = { lib, pkgs, ... }: {
config = {
nixpkgs.hostPlatform = "x86_64-linux";
@ -62,18 +63,13 @@
wants = [ "network-online.target" ];
after = [
"network-online.target"
"avahi-daemon.service"
"chrony.service"
"nss-lookup.target"
"tinc.service"
"pulseaudio.service"
];
serviceConfig = {
Type = "oneshot";
RemainAfterExit = true;
ExecReload = "${lib.getBin pkgs.coreutils}/bin/true";
};
wantedBy = [ "multi-user.target" ];
wantedBy = [ "system-manager.target" ];
requiredBy = lib.mkIf (ix > 5) [ "service-0.service" ];
script = ''
sleep ${if ix > 5 then "2" else "1"}
@ -81,4 +77,45 @@
})
);
};
};
in
system-manager.lib.make-vm-test {
inherit system;
modules = [
({ config, ... }:
let
inherit (config) hostPkgs;
in
{
nodes = {
node1 = { config, ... }: {
modules = [
testConfig
];
virtualisation.rootImage = system-manager.lib.prepareUbuntuImage {
inherit hostPkgs;
nodeConfig = config;
image = system-manager.lib.images.ubuntu_22_10_cloudimg;
};
};
};
testScript = ''
# Start all machines in parallel
start_all()
node1.wait_for_unit("default.target")
node1.execute("/system-manager-profile/bin/activate")
node1.wait_for_unit("system-manager.target")
node1.wait_for_unit("service-9.service")
node1.wait_for_file("/etc/baz/bar/foo2")
node1.wait_for_file("/etc/foo.conf")
node1.succeed("grep -F 'launch_the_rockets = true' /etc/foo.conf")
node1.fail("grep -F 'launch_the_rockets = false' /etc/foo.conf")
'';
})
];
}

View file

@ -0,0 +1,107 @@
{ lib, system-manager, ... }:
let
inherit (lib) types;
pkgsType = lib.mkOptionType {
name = "nixpkgs";
description = "An evaluation of Nixpkgs; the top level attribute set of packages";
check = builtins.isAttrs;
};
nodeOptions = { config, name, ... }: {
options = {
system.name = lib.mkOption {
type = types.str;
default = name;
};
modules = lib.mkOption {
# TODO: can we give a better type here?
# We want a list of system-manager modules
type = types.listOf types.raw;
};
systemConfig = lib.mkOption {
# TODO figure out correct type
type = types.raw;
internal = true;
readOnly = true;
};
virtualisation = {
rootImage = lib.mkOption {
# TODO: figure out the correct type.
type = types.raw;
};
memorySize = lib.mkOption {
type = types.ints.between 256 (1024 * 128);
default = 1024;
};
cpus = lib.mkOption {
type = types.ints.between 1 1024;
default = 2;
};
vlans = lib.mkOption {
type = types.ints.between 1 1024;
default = 1;
};
sharedDirectories = lib.mkOption {
type = types.attrsOf
(types.submodule {
options = {
source = lib.mkOption {
type = types.str;
};
target = lib.mkOption {
type = types.str;
};
};
});
default = { };
};
};
};
config = {
# Include these shared directories by default, they are used by the test driver.
virtualisation.sharedDirectories = {
xchg = {
source = ''"$TMPDIR"/xchg'';
target = "/tmp/xchg";
};
shared = {
source = ''"''${SHARED_DIR:-$TMPDIR/xchg}"'';
target = "/tmp/shared";
};
};
systemConfig = system-manager.lib.makeSystemConfig {
inherit (config) modules;
};
};
};
in
{
options = {
# TODO: figure out correct type
hostPkgs = lib.mkOption {
type = pkgsType;
};
nodes = lib.mkOption {
type = types.attrsOf (types.submodule nodeOptions);
default = { };
};
testScript = lib.mkOption {
type = types.str;
};
};
}

View file

@ -0,0 +1,44 @@
{ lib
, python3Packages
, enableOCR ? false
, qemu_pkg ? qemu_test
, coreutils
, imagemagick_light
, libtiff
, netpbm
, qemu_test
, socat
, tesseract4
, vde2
, extraPythonPackages ? (_: [ ])
}:
python3Packages.buildPythonApplication rec {
pname = "nixos-test-driver";
version = "1.1";
src = ./.;
propagatedBuildInputs = [
coreutils
netpbm
python3Packages.colorama
python3Packages.ptpython
qemu_pkg
socat
vde2
]
++ (lib.optionals enableOCR [ imagemagick_light tesseract4 ])
++ extraPythonPackages python3Packages;
doCheck = true;
nativeCheckInputs = with python3Packages; [ mypy pylint black ];
checkPhase = ''
mypy --disallow-untyped-defs \
--no-implicit-optional \
--pretty \
--no-color-output \
--ignore-missing-imports ${src}/test_driver
pylint --errors-only --enable=unused-import ${src}/test_driver
black --check --diff ${src}/test_driver
'';
}

View file

@ -0,0 +1,14 @@
from setuptools import setup, find_packages
setup(
name="nixos-test-driver",
version="1.1",
packages=find_packages(),
package_data={"test_driver": ["py.typed"]},
entry_points={
"console_scripts": [
"nixos-test-driver=test_driver:main",
"generate-driver-symbols=test_driver:generate_driver_symbols",
]
},
)

View file

@ -0,0 +1,126 @@
from pathlib import Path
import argparse
import ptpython.repl
import os
import time
from test_driver.logger import rootlog
from test_driver.driver import Driver
class EnvDefault(argparse.Action):
"""An argpars Action that takes values from the specified
environment variable as the flags default value.
"""
def __init__(self, envvar, required=False, default=None, nargs=None, **kwargs): # type: ignore
if not default and envvar:
if envvar in os.environ:
if nargs is not None and (nargs.isdigit() or nargs in ["*", "+"]):
default = os.environ[envvar].split()
else:
default = os.environ[envvar]
kwargs["help"] = (
kwargs["help"] + f" (default from environment: {default})"
)
if required and default:
required = False
super(EnvDefault, self).__init__(
default=default, required=required, nargs=nargs, **kwargs
)
def __call__(self, parser, namespace, values, option_string=None): # type: ignore
setattr(namespace, self.dest, values)
def writeable_dir(arg: str) -> Path:
"""Raises an ArgumentTypeError if the given argument isn't a writeable directory
Note: We want to fail as early as possible if a directory isn't writeable,
since an executed nixos-test could fail (very late) because of the test-driver
writing in a directory without proper permissions.
"""
path = Path(arg)
if not path.is_dir():
raise argparse.ArgumentTypeError(f"{path} is not a directory")
if not os.access(path, os.W_OK):
raise argparse.ArgumentTypeError(f"{path} is not a writeable directory")
return path
def main() -> None:
arg_parser = argparse.ArgumentParser(prog="nixos-test-driver")
arg_parser.add_argument(
"-K",
"--keep-vm-state",
help="re-use a VM state coming from a previous run",
action="store_true",
)
arg_parser.add_argument(
"-I",
"--interactive",
help="drop into a python repl and run the tests interactively",
action=argparse.BooleanOptionalAction,
)
arg_parser.add_argument(
"--start-scripts",
metavar="START-SCRIPT",
action=EnvDefault,
envvar="startScripts",
nargs="*",
help="start scripts for participating virtual machines",
)
arg_parser.add_argument(
"--vlans",
metavar="VLAN",
action=EnvDefault,
envvar="vlans",
nargs="*",
help="vlans to span by the driver",
)
arg_parser.add_argument(
"-o",
"--output_directory",
help="""The path to the directory where outputs copied from the VM will be placed.
By e.g. Machine.copy_from_vm or Machine.screenshot""",
default=Path.cwd(),
type=writeable_dir,
)
arg_parser.add_argument(
"testscript",
action=EnvDefault,
envvar="testScript",
help="the test script to run",
type=Path,
)
args = arg_parser.parse_args()
if not args.keep_vm_state:
rootlog.info("Machine state will be reset. To keep it, pass --keep-vm-state")
with Driver(
args.start_scripts,
args.vlans,
args.testscript.read_text(),
args.output_directory.resolve(),
args.keep_vm_state,
) as driver:
if args.interactive:
ptpython.repl.embed(driver.test_symbols(), {})
else:
tic = time.time()
driver.run_tests()
toc = time.time()
rootlog.info(f"test script finished in {(toc-tic):.2f}s")
def generate_driver_symbols() -> None:
"""
This generates a file with symbols of the test-driver code that can be used
in user's test scripts. That list is then used by pyflakes to lint those
scripts.
"""
d = Driver([], [], "", Path())
test_symbols = d.test_symbols()
with open("driver-symbols", "w") as fp:
fp.write(",".join(test_symbols.keys()))

View file

@ -0,0 +1,235 @@
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Dict, Iterator, List, Union, Optional, Callable, ContextManager
import os
import tempfile
from test_driver.logger import rootlog
from test_driver.machine import Machine, NixStartScript, retry
from test_driver.vlan import VLan
from test_driver.polling_condition import PollingCondition
def get_tmp_dir() -> Path:
"""Returns a temporary directory that is defined by TMPDIR, TEMP, TMP or CWD
Raises an exception in case the retrieved temporary directory is not writeable
See https://docs.python.org/3/library/tempfile.html#tempfile.gettempdir
"""
tmp_dir = Path(tempfile.gettempdir())
tmp_dir.mkdir(mode=0o700, exist_ok=True)
if not tmp_dir.is_dir():
raise NotADirectoryError(
f"The directory defined by TMPDIR, TEMP, TMP or CWD: {tmp_dir} is not a directory"
)
if not os.access(tmp_dir, os.W_OK):
raise PermissionError(
f"The directory defined by TMPDIR, TEMP, TMP, or CWD: {tmp_dir} is not writeable"
)
return tmp_dir
class Driver:
"""A handle to the driver that sets up the environment
and runs the tests"""
tests: str
vlans: List[VLan]
machines: List[Machine]
polling_conditions: List[PollingCondition]
def __init__(
self,
start_scripts: List[str],
vlans: List[int],
tests: str,
out_dir: Path,
keep_vm_state: bool = False,
):
self.tests = tests
self.out_dir = out_dir
tmp_dir = get_tmp_dir()
with rootlog.nested("start all VLans"):
vlans = list(set(vlans))
self.vlans = [VLan(nr, tmp_dir) for nr in vlans]
def cmd(scripts: List[str]) -> Iterator[NixStartScript]:
for s in scripts:
yield NixStartScript(s)
self.polling_conditions = []
self.machines = [
Machine(
start_command=cmd,
keep_vm_state=keep_vm_state,
name=cmd.machine_name,
tmp_dir=tmp_dir,
callbacks=[self.check_polling_conditions],
out_dir=self.out_dir,
)
for cmd in cmd(start_scripts)
]
def __enter__(self) -> "Driver":
return self
def __exit__(self, *_: Any) -> None:
with rootlog.nested("cleanup"):
for machine in self.machines:
machine.release()
def subtest(self, name: str) -> Iterator[None]:
"""Group logs under a given test name"""
with rootlog.nested("subtest: " + name):
try:
yield
return True
except Exception as e:
rootlog.error(f'Test "{name}" failed with error: "{e}"')
raise e
def test_symbols(self) -> Dict[str, Any]:
@contextmanager
def subtest(name: str) -> Iterator[None]:
return self.subtest(name)
general_symbols = dict(
start_all=self.start_all,
test_script=self.test_script,
machines=self.machines,
vlans=self.vlans,
driver=self,
log=rootlog,
os=os,
create_machine=self.create_machine,
subtest=subtest,
run_tests=self.run_tests,
join_all=self.join_all,
retry=retry,
serial_stdout_off=self.serial_stdout_off,
serial_stdout_on=self.serial_stdout_on,
polling_condition=self.polling_condition,
Machine=Machine, # for typing
)
machine_symbols = {m.name: m for m in self.machines}
# If there's exactly one machine, make it available under the name
# "machine", even if it's not called that.
if len(self.machines) == 1:
(machine_symbols["machine"],) = self.machines
vlan_symbols = {
f"vlan{v.nr}": self.vlans[idx] for idx, v in enumerate(self.vlans)
}
print(
"additionally exposed symbols:\n "
+ ", ".join(map(lambda m: m.name, self.machines))
+ ",\n "
+ ", ".join(map(lambda v: f"vlan{v.nr}", self.vlans))
+ ",\n "
+ ", ".join(list(general_symbols.keys()))
)
return {**general_symbols, **machine_symbols, **vlan_symbols}
def test_script(self) -> None:
"""Run the test script"""
with rootlog.nested("run the VM test script"):
symbols = self.test_symbols() # call eagerly
exec(self.tests, symbols, None)
def run_tests(self) -> None:
"""Run the test script (for non-interactive test runs)"""
self.test_script()
# TODO: Collect coverage data
for machine in self.machines:
if machine.is_up():
machine.execute("sync")
def start_all(self) -> None:
"""Start all machines"""
with rootlog.nested("start all VMs"):
for machine in self.machines:
machine.start()
def join_all(self) -> None:
"""Wait for all machines to shut down"""
with rootlog.nested("wait for all VMs to finish"):
for machine in self.machines:
machine.wait_for_shutdown()
def create_machine(self, args: Dict[str, Any]) -> Machine:
rootlog.warning(
"Using legacy create_machine(), please instantiate the"
"Machine class directly, instead"
)
tmp_dir = get_tmp_dir()
if args.get("startCommand"):
start_command: str = args.get("startCommand", "")
cmd = NixStartScript(start_command)
name = args.get("name", cmd.machine_name)
else:
cmd = Machine.create_startcommand(args) # type: ignore
name = args.get("name", "machine")
return Machine(
tmp_dir=tmp_dir,
out_dir=self.out_dir,
start_command=cmd,
name=name,
keep_vm_state=args.get("keep_vm_state", False),
)
def serial_stdout_on(self) -> None:
rootlog._print_serial_logs = True
def serial_stdout_off(self) -> None:
rootlog._print_serial_logs = False
def check_polling_conditions(self) -> None:
for condition in self.polling_conditions:
condition.maybe_raise()
def polling_condition(
self,
fun_: Optional[Callable] = None,
*,
seconds_interval: float = 2.0,
description: Optional[str] = None,
) -> Union[Callable[[Callable], ContextManager], ContextManager]:
driver = self
class Poll:
def __init__(self, fun: Callable):
self.condition = PollingCondition(
fun,
seconds_interval,
description,
)
def __enter__(self) -> None:
driver.polling_conditions.append(self.condition)
def __exit__(self, a, b, c) -> None: # type: ignore
res = driver.polling_conditions.pop()
assert res is self.condition
def wait(self, timeout: int = 900) -> None:
def condition(last: bool) -> bool:
if last:
rootlog.info(f"Last chance for {self.condition.description}")
ret = self.condition.check(force=True)
if not ret and not last:
rootlog.info(
f"({self.condition.description} failure not fatal yet)"
)
return ret
with rootlog.nested(f"waiting for {self.condition.description}"):
retry(condition, timeout=timeout)
if fun_ is None:
return Poll
else:
return Poll(fun_)

View file

@ -0,0 +1,103 @@
from colorama import Style, Fore
from contextlib import contextmanager
from typing import Any, Dict, Iterator
from queue import Queue, Empty
from xml.sax.saxutils import XMLGenerator
import codecs
import os
import sys
import time
import unicodedata
class Logger:
def __init__(self) -> None:
self.logfile = os.environ.get("LOGFILE", "/dev/null")
self.logfile_handle = codecs.open(self.logfile, "wb")
self.xml = XMLGenerator(self.logfile_handle, encoding="utf-8")
self.queue: "Queue[Dict[str, str]]" = Queue()
self.xml.startDocument()
self.xml.startElement("logfile", attrs={})
self._print_serial_logs = True
@staticmethod
def _eprint(*args: object, **kwargs: Any) -> None:
print(*args, file=sys.stderr, **kwargs)
def close(self) -> None:
self.xml.endElement("logfile")
self.xml.endDocument()
self.logfile_handle.close()
def sanitise(self, message: str) -> str:
return "".join(ch for ch in message if unicodedata.category(ch)[0] != "C")
def maybe_prefix(self, message: str, attributes: Dict[str, str]) -> str:
if "machine" in attributes:
return f"{attributes['machine']}: {message}"
return message
def log_line(self, message: str, attributes: Dict[str, str]) -> None:
self.xml.startElement("line", attributes)
self.xml.characters(message)
self.xml.endElement("line")
def info(self, *args, **kwargs) -> None: # type: ignore
self.log(*args, **kwargs)
def warning(self, *args, **kwargs) -> None: # type: ignore
self.log(*args, **kwargs)
def error(self, *args, **kwargs) -> None: # type: ignore
self.log(*args, **kwargs)
sys.exit(1)
def log(self, message: str, attributes: Dict[str, str] = {}) -> None:
self._eprint(self.maybe_prefix(message, attributes))
self.drain_log_queue()
self.log_line(message, attributes)
def log_serial(self, message: str, machine: str) -> None:
self.enqueue({"msg": message, "machine": machine, "type": "serial"})
if self._print_serial_logs:
self._eprint(Style.DIM + f"{machine} # {message}" + Style.RESET_ALL)
def enqueue(self, item: Dict[str, str]) -> None:
self.queue.put(item)
def drain_log_queue(self) -> None:
try:
while True:
item = self.queue.get_nowait()
msg = self.sanitise(item["msg"])
del item["msg"]
self.log_line(msg, item)
except Empty:
pass
@contextmanager
def nested(self, message: str, attributes: Dict[str, str] = {}) -> Iterator[None]:
self._eprint(
self.maybe_prefix(
Style.BRIGHT + Fore.GREEN + message + Style.RESET_ALL, attributes
)
)
self.xml.startElement("nest", attrs={})
self.xml.startElement("head", attributes)
self.xml.characters(message)
self.xml.endElement("head")
tic = time.time()
self.drain_log_queue()
yield
self.drain_log_queue()
toc = time.time()
self.log(f"(finished: {message}, in {toc - tic:.2f} seconds)")
self.xml.endElement("nest")
rootlog = Logger()

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,92 @@
from typing import Callable, Optional
from math import isfinite
import time
from .logger import rootlog
class PollingConditionFailed(Exception):
pass
class PollingCondition:
condition: Callable[[], bool]
seconds_interval: float
description: Optional[str]
last_called: float
entry_count: int
def __init__(
self,
condition: Callable[[], Optional[bool]],
seconds_interval: float = 2.0,
description: Optional[str] = None,
):
self.condition = condition # type: ignore
self.seconds_interval = seconds_interval
if description is None:
if condition.__doc__:
self.description = condition.__doc__
else:
self.description = condition.__name__
else:
self.description = str(description)
self.last_called = float("-inf")
self.entry_count = 0
def check(self, force: bool = False) -> bool:
if (self.entered or not self.overdue) and not force:
return True
with self, rootlog.nested(self.nested_message):
time_since_last = time.monotonic() - self.last_called
last_message = (
f"Time since last: {time_since_last:.2f}s"
if isfinite(time_since_last)
else "(not called yet)"
)
rootlog.info(last_message)
try:
res = self.condition() # type: ignore
except Exception:
res = False
res = res is None or res
rootlog.info(self.status_message(res))
return res
def maybe_raise(self) -> None:
if not self.check():
raise PollingConditionFailed(self.status_message(False))
def status_message(self, status: bool) -> str:
return f"Polling condition {'succeeded' if status else 'failed'}: {self.description}"
@property
def nested_message(self) -> str:
nested_message = ["Checking polling condition"]
if self.description is not None:
nested_message.append(repr(self.description))
return " ".join(nested_message)
@property
def overdue(self) -> bool:
return self.last_called + self.seconds_interval < time.monotonic()
@property
def entered(self) -> bool:
# entry_count should never dip *below* zero
assert self.entry_count >= 0
return self.entry_count > 0
def __enter__(self) -> None:
self.entry_count += 1
def __exit__(self, exc_type, exc_value, traceback) -> None: # type: ignore
assert self.entered
self.entry_count -= 1
self.last_called = time.monotonic()

View file

@ -0,0 +1,62 @@
from pathlib import Path
import io
import os
import pty
import subprocess
from test_driver.logger import rootlog
class VLan:
"""This class handles a VLAN that the run-vm scripts identify via its
number handles. The network's lifetime equals the object's lifetime.
"""
nr: int
socket_dir: Path
process: subprocess.Popen
pid: int
fd: io.TextIOBase
def __repr__(self) -> str:
return f"<Vlan Nr. {self.nr}>"
def __init__(self, nr: int, tmp_dir: Path):
self.nr = nr
self.socket_dir = tmp_dir / f"vde{self.nr}.ctl"
# TODO: don't side-effect environment here
os.environ[f"QEMU_VDE_SOCKET_{self.nr}"] = str(self.socket_dir)
rootlog.info("start vlan")
pty_master, pty_slave = pty.openpty()
# The --hub is required for the scenario determined by
# nixos/tests/networking.nix vlan-ping.
# VLAN Tagged traffic (802.1Q) seams to be blocked if a vde_switch is
# used without the hub mode (flood packets to all ports).
self.process = subprocess.Popen(
["vde_switch", "-s", self.socket_dir, "--dirmode", "0700", "--hub"],
stdin=pty_slave,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=False,
)
self.pid = self.process.pid
self.fd = os.fdopen(pty_master, "w")
self.fd.write("version\n")
# TODO: perl version checks if this can be read from
# an if not, dies. we could hang here forever. Fix it.
assert self.process.stdout is not None
self.process.stdout.readline()
if not (self.socket_dir / "ctl").exists():
rootlog.error("cannot start vde_switch")
rootlog.info(f"running vlan (pid {self.pid}; ctl {self.socket_dir})")
def __del__(self) -> None:
rootlog.info(f"kill vlan (pid {self.pid})")
self.fd.close()
self.process.terminate()