Source code for kiso.task

"""Main Kiso task implementations."""

# ruff: noqa: ARG001
from __future__ import annotations

import contextlib
import copy
import io
import logging
import shutil
import subprocess
import tempfile
from collections import Counter, defaultdict
from dataclasses import fields
from functools import wraps
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, TypeVar

import enoslib as en
import yaml
from dacite import from_dict
from enoslib.objects import Host, Networks, Roles
from enoslib.task import Environment, enostask
from jsonschema.validators import validator_for
from jsonschema_pyref import RefResolver, ValidationError
from rich.console import Console

import kiso.constants as const
from kiso import display, edge, utils
from kiso.configuration import Deployment, Kiso, Software
from kiso.errors import KisoError
from kiso.log import get_process_pool_executor
from kiso.schema import SCHEMA
from kiso.version import __version__

if TYPE_CHECKING:
    from os import PathLike

    from enoslib.infra.provider import Provider

    from kiso.configuration import ExperimentTypes


T = TypeVar("T")

PROVIDER_MAP: dict[str, tuple[Callable[[dict], Any], Callable[..., Any]]] = {}

log = logging.getLogger("kiso")

console = Console()

if hasattr(en, "Vagrant"):
    log.debug("Vagrant provider is available")
    PROVIDER_MAP["vagrant"] = (en.VagrantConf.from_dictionary, en.Vagrant)
if hasattr(en, "CBM"):
    log.debug("Chameleon Bare Metal provider is available")

    PROVIDER_MAP["chameleon"] = (en.CBMConf.from_dictionary, en.CBM)
if hasattr(en, "ChameleonEdge"):
    log.debug("Chameleon Edge provider is available")

    PROVIDER_MAP["chameleon-edge"] = (
        en.ChameleonEdgeConf.from_dictionary,
        en.ChameleonEdge,
    )
if hasattr(en, "Fabric"):
    log.debug("FABRIC provider is available")

    PROVIDER_MAP["fabric"] = (en.FabricConf.from_dictionary, en.Fabric)


def validate_config(func: Callable[..., T]) -> Callable[..., T]:
    """Decorator to validate the experiment configuration against a predefined schema.

    Validates the experiment configuration by checking it against the Kiso experiment
    configuration schema. Supports configuration passed as a dictionary or a file path.

    :param func: The function to be decorated, which will receive the experiment
    configuration
    :type func: Callable[..., T]
    :return: A wrapped function that validates the configuration before executing the
    original function
    :rtype: Callable[..., T]
    :raises ValidationError: if the configuration is invalid
    """

    @wraps(func)
    def wrapper(experiment_config: PathLike | dict, *args: Any, **kwargs: Any) -> T:  # noqa: ANN401
        log.debug("Check Kiso experiment configuration")
        if isinstance(experiment_config, dict):
            config = experiment_config
            wd = Path.cwd().resolve()
        else:
            wd = Path(experiment_config).parent.resolve()
            with Path(experiment_config).open() as _experiment_config:
                config = yaml.safe_load(_experiment_config)

        try:
            validator_cls = validator_for(SCHEMA)
            validator = validator_cls(SCHEMA, resolver=RefResolver.from_schema(SCHEMA))
            errors = []
            for error in validator.iter_errors(
                _replace_labels_key_with_roles_key(config)
            ):
                log.error(error)
                errors.append(error)
            if errors:
                raise ValidationError("JSON Schema Validation Error", errors)

            # Convert the JSON configuration to a :py:class:`dataclasses.dataclass`
            kiso_config = from_dict(Kiso, config)

            console.rule("[bold green]Check experiment configuration[/bold green]")
            log.debug("Check only one vagrant site is present in the experiment")
            label_to_machines: Roles = _get_defined_machines(kiso_config)

            _check_software(kiso_config.software, label_to_machines)
            _check_deployed_software(kiso_config.deployment, label_to_machines)
            _check_experiments(kiso_config, label_to_machines)
        except ValidationError:
            log.exception("Invalid Kiso experiment config <%s>", experiment_config)
            raise

        log.debug("Kiso experiment configuration is valid")
        return func(kiso_config, *args, wd=wd, **kwargs)

    return wrapper


def check_provisioned(func: Callable[..., T]) -> Callable[..., T]:
    """Decorator to check that the resources were provisioned.

    :param func: The function to be decorated, which will receive the experiment
    configuration
    :type func: Callable[..., T]
    :return: A wrapped function that validates the configuration before executing the
    original function
    :rtype: Callable[..., T]
    """

    @wraps(func)
    def wrapper(experiment_config: Kiso, *args: Any, **kwargs: Any) -> T:  # noqa: ANN401
        is_provisioned = False
        env = kwargs.get("env")
        if env and env.get("providers"):
            is_provisioned = True

        if is_provisioned is False:
            raise KisoError(
                "No providers found, resources were not provisioned. "
                "Suggestion: Run `kiso up` first."
            )

        return func(experiment_config, *args, **kwargs)

    return wrapper


def _replace_labels_key_with_roles_key(experiment_config: Kiso | dict) -> dict:
    """Replace labels with roles in the experiment configuration."""
    experiment_config = copy.deepcopy(experiment_config)
    sites = (
        experiment_config["sites"]
        if isinstance(experiment_config, dict)
        else experiment_config.sites
    )
    for site in sites:
        for machine in site["resources"]["machines"]:
            machine["roles"] = machine["labels"]
            del machine["labels"]

        for network in site["resources"].get("networks", []):
            if isinstance(network, str):
                continue

            network["roles"] = network["labels"]
            del network["labels"]

    return experiment_config


[docs] @validate_config def check(experiment_config: Kiso, **kwargs: dict) -> None: """Check the experiment configuration for various validation criteria. This function performs multiple validation checks on the experiment configuration, including: - Verifying vagrant site constraints - Validating label definitions - Checking docker and HTCondor configurations - Ensuring proper node configurations - Validating input file locations - Performing EnOSlib platform checks :param experiment_config: The experiment configuration dictionary :type experiment_config: Kiso :param kwargs: Additional keyword arguments :type kwargs: dict """ log.debug("Check EnOSlib") en.MOTD = en.INFO = "" en.check(platform_filter=["Vagrant", "Fabric", "Chameleon", "ChameleonEdge"]) _show_rysnc_warning(experiment_config.sites)
def _get_defined_machines(experiment_config: Kiso) -> Roles: """Get the defined machines from the experiment configuration. Extracts and counts labels defined in the sites section of the experiment configuration. Validates that only one Vagrant site is present and generates additional label variants. :param experiment_config: Configuration dictionary containing site and resource definitions :type experiment_config: Kiso :raises ValueError: If multiple Vagrant sites are detected :return: A counter of defined labels with their counts :rtype: Roles """ vagrant_sites = 0 def_labels: Counter = Counter() label_to_machines: Roles = defaultdict(set) for site_index, site in enumerate(experiment_config.sites): if site["kind"] == "vagrant": vagrant_sites += 1 for machine_index, machine in enumerate(site["resources"]["machines"]): def_labels.update({site["kind"]: machine.get("number", 1)}) for label in machine["labels"]: def_labels.update({label: machine.get("number", 1)}) for index in range(machine.get("number", 1)): machine_key = Host( f"site-{site_index}-machine-{machine_index}-index-{index}" ) label_to_machines[site["kind"]].add(machine_key) for label in machine["labels"]: label_to_machines[label].add(machine_key) else: if vagrant_sites > 1: raise ValueError("Multiple vagrant sites are not supported") extra_labels = {} for label, count in def_labels.items(): machines = list(label_to_machines[label]) for index in range(1, count + 1): extra_labels[f"kiso.{label}.{index}"] = 1 label_to_machines[f"kiso.{label}.{index}"].add(machines[index - 1]) return label_to_machines def _check_software(softwares: Software, label_to_machines: dict[str, set]) -> None: """Check software configuration.""" if softwares is None: return for software in fields(Software): config = getattr(softwares, software.name, None) if config is None: continue # Get the `name` of the software name = software.name # Locate the EntryPoint for the software `name` and load it cls = utils.get_software(name) # Instantiate the installer class. obj = cls( config # Software configuration ) obj.check(label_to_machines) def _check_deployed_software( deployments: Deployment, label_to_machines: dict[str, set] ) -> None: """Check software deployment configuration.""" if deployments is None: return for deployment in fields(Deployment): config = getattr(deployments, deployment.name, None) if config is None: continue # Get the `name` of the software name = deployment.name # Locate the EntryPoint for the software `name` and load it cls = utils.get_deployment(name) # Instantiate the installer class. obj = cls( config # Deployment configuration ) obj.check(label_to_machines) def _check_experiments( experiment_config: Kiso, label_to_machines: dict[str, set] ) -> None: """Check the experiment configurations defined in the experiment config.""" experiments = experiment_config.experiments if experiments is None: return variables = copy.deepcopy(experiment_config.variables, {}) for index, experiment in enumerate(experiments): # Get the `kind` of experiment kind = experiment.kind # Locate the EntryPoint for the runner `kind` of experiment and load it cls = utils.get_runner(kind) # Instantiate the runner class. runner = cls( experiment, index, variables=variables, # Variables defined globally for the experiment ) runner.check(experiment_config, label_to_machines)
[docs] @validate_config @enostask(new=True, symlink=False) def up( experiment_config: Kiso, force: bool = False, env: Environment = None, **kwargs: Any, # noqa: ANN401 ) -> None: """Create and set up resources for running an experiment. Initializes the experiment environment, sets up working directories, and prepares infrastructure by initializing sites, installing Docker, Apptainer, and HTCondor across specified labels. :param experiment_config: Configuration dictionary defining experiment parameters :type experiment_config: Kiso :param force: Force recreation of resources, defaults to False :type force: bool, optional :param env: Optional environment context for the experiment, defaults to None :type env: Environment, optional """ console.rule( "[bold green]Create and set up resources for the experiments[/bold green]" ) env["version"] = __version__ env["wd"] = str(kwargs.get("wd", Path.cwd())) env["remote_wd"] = str(Path("~kiso") / Path(env["wd"]).name) experiment_config = _replace_labels_key_with_roles_key(experiment_config) _init_sites(experiment_config, env, force) _install_commons(env) _install_software(experiment_config, env) _install_deployed_software(experiment_config, env) _show_rysnc_warning(experiment_config.sites)
def _init_sites( experiment_config: Kiso, env: Environment, force: bool = False ) -> tuple[list[Provider], Roles, Networks]: """Initialize sites for an experiment. Initializes and configures sites from the experiment configuration using parallel processing. Performs the following key tasks: - Initializes providers for each site concurrently - Aggregates labels and networks from initialized sites - Extends labels with daemon-to-site mappings - Determines public IP requirements :param experiment_config: Configuration dictionary containing site definitions :type experiment_config: Kiso :param env: Environment context for the experiment :type env: Environment :param force: Force recreation of resources, defaults to False :type force: bool, optional :return: A tuple of providers, labels, and networks for the experiment :rtype: tuple[list[Provider], Roles, Networks] """ log.debug("Initializing sites") providers = [] labels = Roles() networks = Networks() with get_process_pool_executor() as executor: futures = [ executor.submit(_init_site, site_index, site, force) for site_index, site in enumerate(experiment_config.sites) ] for future in futures: provider, _labels, _networks = future.result() providers.append(provider) labels.extend(_labels) networks.extend(_networks) providers = en.Providers(providers) env["providers"] = providers env["labels"] = labels env["networks"] = networks _extend_labels(labels) for node in labels.all(): preferred_ip, priority = None, 1000 addresses = utils.get_ips(node) if addresses: preferred_ip, priority = addresses[0] log.debug("Preferred IP <%s> with priority <%d>", preferred_ip, priority) node.extra["kiso_preferred_ip"] = str(preferred_ip) node.extra["is_kiso_preferred_ip_private"] = priority > 1 return providers, labels, networks def _init_site( index: int, site: dict[Any, Any], force: bool = False ) -> tuple[Provider, Roles, Networks]: """Initialize a site for provisioning resources. Configures and initializes a site based on its provider type, handling specific requirements for different cloud providers like Chameleon. Performs the following key tasks: - Validates the site's provider type - Configures exposed ports for containers - Initializes provider resources and networks - Adds metadata to nodes about their provisioning context - Handles region-specific configurations :param index: The index of the site in the configuration :type index: int :param site: Site configuration dictionary :type site: dict[Any, Any] :param force: Force recreation of resources, defaults to False :type force: bool, optional :raises TypeError: If an invalid site provider type is specified :return: A tuple containing the provider, labels, and networks for the site :rtype: tuple[Provider, Roles, Networks] """ kind = site["kind"] if kind not in PROVIDER_MAP: raise TypeError(f"Invalid site.type <{kind}> for site <{index}>") # There is no firewall on ChameleonEdge containers, but to reach HTCondor # daemons the port(s) still need to be exposed if kind == "chameleon-edge": for container in site["resources"]["machines"]: container = container["container"] exposed_ports = set(container.get("exposed_ports", [])) exposed_ports.add(str(const.HTCONDOR_PORT)) container["exposed_ports"] = list(exposed_ports) conf = PROVIDER_MAP[kind][0](site) provider = PROVIDER_MAP[kind][1](conf) _labels, _networks = provider.init(force_deploy=force) _deduplicate_hosts(_labels) _labels[kind] = _labels.all() _networks[kind] = _networks.all() # For Chameleon site, the region name is important as each region will act like # a different site region_name = kind if kind.startswith("chameleon"): region_name = _get_region_name(site["rc_file"]) _labels[region_name] = _labels.all() _networks[region_name] = _networks.all() # To each node we add a tag to identify what site/region it was provisioned on for node in _labels.all(): # ChameleonDevice object does not have an attribute named extra if kind == "chameleon-edge": attr = "extra" setattr(node, attr, {}) elif kind == "chameleon" or kind == "fabric": # Used to copy this file to Chameleon VMs, so we can use the Openstack # client to get a floating IP node.extra["rc_file"] = str(Path(conf.rc_file).resolve()) node.extra["kind"] = kind node.extra.setdefault("site", region_name) if kind == "fabric": _labels[f"fabric.{node.extra['site']}"] += [node] if kind != "chameleon-edge": _labels = en.sync_info(_labels, _networks) else: # Because zunclient.v1.containers.Container is not pickleable provider.client.concrete_resources = [] return provider, _labels, _networks def _deduplicate_hosts(labels: Roles) -> None: """Deduplicate host objects across label groups to a single canonical instance. When a host appears in multiple label groups it may be represented by different Python objects. This function replaces duplicate objects with the first-seen canonical instance so that identity comparisons across groups remain consistent. :param labels: Label-to-host mapping to deduplicate in place :type labels: Roles """ dedup = {} for _, nodes in labels.items(): update = set() for node in nodes: if node not in dedup: dedup[node] = node else: update.add(dedup[node]) for node in update: nodes.remove(node) nodes.extend(update) def _get_region_name(rc_file: str) -> str | None: """Extract the OpenStack region name from a given RC file. Parses the provided RC file to find the OS_REGION_NAME environment variable and returns its value. Raises a ValueError if the region name cannot be found. :param rc_file: Path to the OpenStack RC file containing environment variables :type rc_file: str :raises ValueError: If OS_REGION_NAME is not found in the RC file :return: The name of the OpenStack region :rtype: str | None """ region_name = None with Path(rc_file).open() as env_file: for env_var in env_file: if "OS_REGION_NAME" in env_var: parts = env_var.split("=") region_name = parts[1].strip("\n\"'") break else: raise ValueError(f"Unable to get region name from the rc_file <{rc_file}>") return region_name def _extend_labels(labels: Roles) -> None: """Create unique labels for each node based on their original label. :param labels: Dictionary of labels and their associated nodes :type labels: Roles """ extra: dict[str, set] = defaultdict(set) for label, nodes in labels.items(): for index, node in enumerate(nodes, 1): # EnOSlib resources.machines.number can be greater than 1, so we add the # host with a new unique label of the form kiso.<label>.<index> _label = f"kiso.{label}.{index}" extra[_label].add(node) labels.update(extra) def _show_rysnc_warning(sites: list[dict[str, Any]]) -> None: """Show a warning if default rsync package is in use on the host. The site kind is FABRIC The host OS is macOS The rsync available is the default one """ import sys from rich.markdown import Markdown for site in sites: if site["kind"] == "fabric": break else: return rsync = shutil.which("rsync") if sys.platform == "darwin" and rsync and rsync == "/usr/bin/rsync": console.print(Markdown("# ⚠️ Warning ⚠️"), style="yellow blink") console.print( "macOS' rsync does not work as expected when a host has an IPv6 address " "and a gateway host is used in between. Install the rsync package from " "Homebrew before invoking kiso run", style="yellow", ) def _install_commons(env: Environment) -> None: """Install components needed to run a Kiso experiment. 1. Disable SELinux on EL-based systems. 2. Disable Firewall. 3. Install dependencies, like sudo, curl, etc. 4. Create a kiso group and a user. 5. Allow passwordless sudo for kiso. 6. Copy .ssh dir to ~kiso/.ssh dir. :param env: Environment context for the installation :type env: Environment """ log.debug("Install Commons") console.rule("[bold green]Installing Commons[/bold green]") labels = env["labels"] # Special case here. Do not pass (labels, labels) to split_labels. Since the Roles # object is like a dictionary, so labels - labels["<key>"] and # labels & labels["<key>"] doesn't work. vms, containers = utils.split_labels(labels.all(), labels) results = [] etc_hosts_content = _generate_etc_hosts(env) log.debug("/etc/hosts content <%s>", etc_hosts_content) if vms: results.extend( utils.run_ansible( [Path(__file__).parent / "commons/main.yml"], roles=vms, extra_vars={"etc_hosts_content": etc_hosts_content}, ) ) if containers: for container in containers: results.append( edge.run_script( container, Path(__file__).parent / "commons/init.sh", "--hosts", etc_hosts_content, "--no-dry-run", timeout=-1, ) ) display.commons(console, results) def _generate_etc_hosts(env: Environment) -> str: """Generate /etc/hosts file for the experiment.""" labels = env["labels"] content = io.StringIO() host_to_labels: dict[str, set[str]] = defaultdict(set) for label, machines in labels.items(): if len(machines) == 1: host_to_labels[ machines[0].extra.get("kiso_preferred_ip", machines[0].address) ].add(label) content.write("# Kiso: Start\n") for address, labels in host_to_labels.items(): content.write(f"{address} {' '.join(labels)}\n") content.write("# Kiso: End\n") return content.getvalue() def _install_software(experiment_config: Kiso, env: Environment) -> None: """Install software on specified labels in an experiment configuration.""" softwares = experiment_config.software if softwares is None: return for software in fields(Software): config = getattr(softwares, software.name, None) if config is None: continue # Get the `name` of the software name = software.name # Locate the EntryPoint for the software `name` and load it cls = utils.get_software(name) # Instantiate the installer class. obj = cls( config # Software configuration ) obj(env) def _install_deployed_software(experiment_config: Kiso, env: Environment) -> None: """Install software for deployments on specified labels in an experiment configuration.""" # noqa: E501 deployments = experiment_config.deployment if deployments is None: return for deployment in fields(Deployment): config = getattr(deployments, deployment.name, None) if config is None: continue # Get the `name` of the deployment name = deployment.name # Locate the EntryPoint for the software `name` and load it cls = utils.get_deployment(name) # Instantiate the installer class. obj = cls( config # Deployment configuration ) obj(env)
[docs] @validate_config @enostask() @check_provisioned def run( experiment_config: Kiso, force: bool = False, env: Environment = None, **kwargs: Any, # noqa: ANN401 ) -> None: """Run the defined experiments. Executes a series of experiments by performing the following steps: - Copies experiment directory to remote labels - Executes experiment :param experiment_config: Configuration dictionary containing experiment details :type experiment_config: Kiso :param force: Force rerunning of experiments, defaults to False :type force: bool, optional :param env: Environment configuration containing providers, labels, and networks :type env: Environment, optional :param kwargs: Additional keyword arguments :type kwargs: dict """ log.debug("Run Kiso experiments") console.rule("[bold green]Run experiments[/bold green]") experiments = experiment_config.experiments variables = copy.deepcopy(experiment_config.variables, {}) env.setdefault("experiments", {}) if force is True: env["experiments"] = {} _copy_experiment_dir(env) for experiment_index, experiment in enumerate(experiments): env["experiments"].setdefault(experiment_index, {}) _run_experiments(experiment_index, experiment, variables, env)
def _copy_experiment_dir(env: Environment) -> None: """Copy experiment directory to remote labels. Copies the experiment directory from the local working directory to the remote working directory for specified submit node labels. Supports copying to both virtual machines and containers. :param env: Environment configuration containing labels and working directory information :type env: Environment :raises Exception: If directory copy fails for any label """ log.debug("Copy experiment directory to remote nodes") console.print("Copying experiment directory to remote nodes") labels = env["labels"] # Special case here. Do not pass (labels, labels) to split_labels. Since the Roles # object is like a dictionary, so labels - labels["<key>"] and # labels & labels["<key>"] doesn't work. vms, containers = utils.split_labels(labels.all(), labels) try: kiso_state = env["experiments"] if kiso_state.get("copy-experiment-directory") == const.STATUS_OK: return kiso_state["copy-experiment-directory"] = const.STATUS_STARTED src = Path(env["wd"]) dst = Path(env["remote_wd"]).parent if vms: with utils.actions(roles=vms, strategy="free") as p: # macOS's rsync does not work as expected when the host # has an IPv6 address and a gateway host is used in between. So we # create a temp SSH config file with the Host and HostName directives # and use it to run rsync tmpfile = tempfile.NamedTemporaryFile(delete=False) # noqa: SIM115 p.copy( dest=f"{tmpfile.name}-{{{{ansible_host}}}}", content=""" Host pegasusvm HostName {{ansible_host}} """, delegate_to="localhost", ) p.shell( f"rsync -auzv -e 'ssh -F {tmpfile.name}-{{{{ansible_host}}}} " "{{ansible_ssh_common_args}} " "{% if ansible_port is defined %}-p {{ansible_port}} " "{% endif %}{% if ansible_ssh_private_key_file is defined %}-i " "{{ansible_ssh_private_key_file}}{% endif %}' " f"{src} kiso@pegasusvm:{dst}", delegate_to="localhost", task_name="Copy experiment dir", ) p.file( path=f"{tmpfile.name}-{{{{ansible_host}}}}", state="absent", delegate_to="localhost", ) tmpfile.close() if containers: for container in containers: edge.upload(container, src, dst, user=const.KISO_USER) except Exception: kiso_state["copy-experiment-directory"] = const.STATUS_FAILED raise else: kiso_state["copy-experiment-directory"] = const.STATUS_OK def _run_experiments( index: int, experiment: ExperimentTypes, variables: dict, env: Environment ) -> None: """Run multiple workflow instances for a specific experiment. Generates and executes workflows for each instance of an experiment. :param index: The overall experiment index :type index: int :param experiment: Configuration dictionary for the experiment :type experiment: dict :param env: Environment context containing workflow and execution details :type env: Environment """ # Get the `kind` of experiment kind = experiment.kind # Locate the EntryPoint for the runner `kind` of experiment and load it cls = utils.get_runner(kind) # Instantiate the runner class. The runner class to use is defined in the # runner's `RUNNER` attribute runner = cls( experiment, index, variables=variables, # Variables defined globally for the experiment ) # Run the experiment runner( env["wd"], # Local experiment working directory env["remote_wd"], # Remote experiment working directory env["resultdir"], # Local results directory env["labels"], # Provisioned resources env["experiments"][index], # Store to maintain the state of the experiment )
[docs] @validate_config @enostask() @check_provisioned def down(experiment_config: Kiso, env: Environment = None, **kwargs: dict) -> None: # noqa: C901 """Destroy the resources provisioned for the experiments. This function is responsible for tearing down and cleaning up resources associated with an experiment configuration using the specified providers. :param experiment_config: Configuration dictionary for the experiment :type experiment_config: Kiso :param env: Environment object containing provider information :type env: Environment, optional :param kwargs: Additional keyword arguments :type kwargs: dict """ log.debug("Destroy the resources provisioned for the experiments") console.rule( "[bold green]Destroy resources created for the experiments[/bold green]" ) if "providers" not in env: log.debug("No providers found, skipping") console.rule( "No providers found. Either resources were not provisioned or the output " "directory was removed" ) return vagrant_dir = Path(env["wd"]) / ".vagrant" vagrant_file = Path(env["wd"]) / "Vagrantfile" providers = env["providers"] del env["providers"] has_vagrant = hasattr(en, "Vagrant") has_chameleon_edge = hasattr(en, "ChameleonEdge") for provider in providers.providers: if has_vagrant and isinstance(provider, en.Vagrant) and vagrant_dir.exists(): ssh_add = shutil.which("ssh-add") if ssh_add: for key in vagrant_dir.glob("**/private_key"): result = subprocess.run([ssh_add, "-d", str(key)]) # noqa: S603 if result.returncode != 0: log.debug("Failed to remove SSH key <%s> from ssh-agent", key) elif has_chameleon_edge and isinstance(provider, en.ChameleonEdge): import chi for container in env["labels"]["chameleon-edge"]: with contextlib.suppress(BaseException): chi.container.destroy_container(container.uuid) providers.destroy() if has_vagrant and vagrant_dir.exists(): shutil.rmtree(vagrant_dir) if has_vagrant and vagrant_file.exists(): vagrant_file.unlink()