"""Main Kiso task implementations."""
# ruff: noqa: ARG001
from __future__ import annotations
import contextlib
import copy
import io
import logging
import shutil
import subprocess
import tempfile
from collections import Counter, defaultdict
from dataclasses import fields
from functools import wraps
from ipaddress import IPv4Address, IPv4Interface, IPv6Address, IPv6Interface, ip_address
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, TypeVar
import enoslib as en
import yaml
from dacite import from_dict
from enoslib.objects import DefaultNetwork, Host, Networks, Roles
from enoslib.task import Environment, enostask
from jsonschema.validators import validator_for
from jsonschema_pyref import RefResolver, ValidationError
from rich.console import Console
import kiso.constants as const
from kiso import display, edge, utils
from kiso.configuration import Deployment, Kiso, Software
from kiso.errors import KisoError
from kiso.ip import associate_floating_ip
from kiso.log import get_process_pool_executor
from kiso.schema import SCHEMA
from kiso.version import __version__
if TYPE_CHECKING:
from os import PathLike
from enoslib.infra.enos_chameleonedge.objects import ChameleonDevice
from enoslib.infra.provider import Provider
from kiso.configuration import ExperimentTypes
# Generic return type of the functions wrapped by the decorators in this module.
T = TypeVar("T")

# Maps a site ``kind`` to a pair of callables: the first builds the provider
# configuration from the site dictionary, the second builds the provider from that
# configuration. A provider is registered only if the installed EnOSlib exposes it.
PROVIDER_MAP: dict[str, tuple[Callable[[dict], Any], Callable[..., Any]]] = {}

log = logging.getLogger("kiso")

console = Console()

# Becomes True only when EnOSlib exposes the FABRIC provider. It guards uses of
# ``Fabnetv6NetworkConfiguration``, which is imported only in that case.
has_fabric = False

if hasattr(en, "Vagrant"):
    log.debug("Vagrant provider is available")
    PROVIDER_MAP["vagrant"] = (en.VagrantConf.from_dictionary, en.Vagrant)

if hasattr(en, "CBM"):
    log.debug("Chameleon Bare Metal provider is available")
    PROVIDER_MAP["chameleon"] = (en.CBMConf.from_dictionary, en.CBM)

if hasattr(en, "ChameleonEdge"):
    log.debug("Chameleon Edge provider is available")
    PROVIDER_MAP["chameleon-edge"] = (
        en.ChameleonEdgeConf.from_dictionary,
        en.ChameleonEdge,
    )

if hasattr(en, "Fabric"):
    log.debug("FABRIC provider is available")
    # Imported lazily because the module is only used when FABRIC is available
    from enoslib.infra.enos_fabric.configuration import Fabnetv6NetworkConfiguration

    PROVIDER_MAP["fabric"] = (en.FabricConf.from_dictionary, en.Fabric)
    has_fabric = True
def validate_config(func: Callable[..., T]) -> Callable[..., T]:
    """Decorate a task so its experiment configuration is validated first.

    The wrapper accepts the configuration either as a dictionary or as a path to a
    YAML file. It validates the configuration against the Kiso schema, converts it
    to a :py:class:`kiso.configuration.Kiso` object, runs the software, deployment
    and experiment checks, and finally calls ``func`` with the converted
    configuration and an extra ``wd`` keyword argument (the directory of the
    configuration file, or the current directory for a dictionary).

    :param func: The function to be decorated, which will receive the experiment
        configuration
    :type func: Callable[..., T]
    :return: A wrapped function that validates the configuration before executing
        the original function
    :rtype: Callable[..., T]
    :raises ValidationError: if the configuration is invalid
    """

    @wraps(func)
    def wrapper(experiment_config: PathLike | dict, *args: Any, **kwargs: Any) -> T:  # noqa: ANN401
        log.debug("Check Kiso experiment configuration")
        if not isinstance(experiment_config, dict):
            config_path = Path(experiment_config)
            wd = config_path.parent.resolve()
            with config_path.open() as stream:
                config = yaml.safe_load(stream)
        else:
            config = experiment_config
            wd = Path.cwd().resolve()

        try:
            schema_validator = validator_for(SCHEMA)(
                SCHEMA, resolver=RefResolver.from_schema(SCHEMA)
            )
            # The schema is validated against the `roles` spelling of the config
            candidate = _replace_labels_key_with_roles_key(config)
            violations = []
            for violation in schema_validator.iter_errors(candidate):
                log.error(violation)
                violations.append(violation)

            if violations:
                raise ValidationError("JSON Schema Validation Error", violations)

            # Convert the JSON configuration to a :py:class:`dataclasses.dataclass`
            kiso_config = from_dict(Kiso, config)

            console.rule("[bold green]Check experiment configuration[/bold green]")
            log.debug("Check only one vagrant site is present in the experiment")
            label_to_machines: Roles = _get_defined_machines(kiso_config)
            _check_software(kiso_config.software, label_to_machines)
            _check_deployed_software(kiso_config.deployment, label_to_machines)
            _check_experiments(kiso_config, label_to_machines)
        except ValidationError:
            log.exception("Invalid Kiso experiment config <%s>", experiment_config)
            raise
        else:
            log.debug("Kiso experiment configuration is valid")

        return func(kiso_config, *args, wd=wd, **kwargs)

    return wrapper
def check_provisioned(func: Callable[..., T]) -> Callable[..., T]:
    """Decorate a task so it only runs when resources were provisioned.

    The wrapper looks at the ``env`` keyword argument and requires it to hold a
    truthy ``providers`` entry; otherwise the task is not executed.

    :param func: The function to be decorated, which will receive the experiment
        configuration
    :type func: Callable[..., T]
    :return: A wrapped function that checks for provisioned resources before
        executing the original function
    :rtype: Callable[..., T]
    :raises KisoError: if ``env`` is missing or has no providers
    """

    @wraps(func)
    def wrapper(experiment_config: Kiso, *args: Any, **kwargs: Any) -> T:  # noqa: ANN401
        env = kwargs.get("env")
        if not (env and env.get("providers")):
            raise KisoError(
                "No providers found, resources were not provisioned. "
                "Suggestion: Run `kiso up` first."
            )

        return func(experiment_config, *args, **kwargs)

    return wrapper
def _replace_labels_key_with_roles_key(experiment_config: Kiso | dict) -> dict:
"""Replace labels with roles in the experiment configuration."""
experiment_config = copy.deepcopy(experiment_config)
sites = (
experiment_config["sites"]
if isinstance(experiment_config, dict)
else experiment_config.sites
)
for site in sites:
for machine in site["resources"]["machines"]:
machine["roles"] = machine["labels"]
del machine["labels"]
for network in site["resources"].get("networks", []):
if isinstance(network, str):
continue
network["roles"] = network["labels"]
del network["labels"]
return experiment_config
# [docs]
@validate_config
def check(experiment_config: Kiso, **kwargs: dict) -> None:
    """Check the experiment configuration and the EnOSlib installation.

    The configuration itself (schema, labels, software, deployments, experiments)
    is validated by the ``validate_config`` decorator before this body runs. The
    body then blanks EnOSlib's ``MOTD`` and ``INFO`` texts and runs EnOSlib's own
    checks for the Vagrant, Fabric, Chameleon and ChameleonEdge platforms.

    :param experiment_config: The validated experiment configuration
    :type experiment_config: Kiso
    :param kwargs: Additional keyword arguments
    :type kwargs: dict
    """
    log.debug("Check EnOSlib")
    en.MOTD = ""
    en.INFO = ""
    platforms = ["Vagrant", "Fabric", "Chameleon", "ChameleonEdge"]
    en.check(platform_filter=platforms)
def _get_defined_machines(experiment_config: Kiso) -> Roles:
    """Get the machines defined in the experiment configuration, grouped by label.

    A placeholder :py:class:`Host` is created for every machine instance declared
    in the ``sites`` section. Each placeholder is registered under the kind of its
    site and under every label of its machine definition. For every label with N
    machines, the additional labels ``kiso.<label>.1`` .. ``kiso.<label>.N`` are
    generated, each holding exactly one machine.

    :param experiment_config: Experiment configuration containing site and resource
        definitions
    :type experiment_config: Kiso
    :raises ValueError: If multiple Vagrant sites are detected
    :return: A mapping of labels to the set of placeholder machines carrying them
    :rtype: Roles
    """
    vagrant_sites = 0
    label_to_machines: Roles = defaultdict(set)

    for site_index, site in enumerate(experiment_config.sites):
        kind = site["kind"]
        if kind == "vagrant":
            vagrant_sites += 1

        for machine_index, machine in enumerate(site["resources"]["machines"]):
            # Looking the groups up front registers the site kind and every label
            # even when the machine definition yields no instances
            groups = [label_to_machines[kind]]
            groups.extend(label_to_machines[label] for label in machine["labels"])
            for index in range(machine.get("number", 1)):
                machine_key = Host(
                    f"site-{site_index}-machine-{machine_index}-index-{index}"
                )
                for group in groups:
                    group.add(machine_key)

    if vagrant_sites > 1:
        raise ValueError("Multiple vagrant sites are not supported")

    # Number the machines actually registered under each label. Counting them this
    # way (instead of summing `number` per label occurrence) cannot run past the
    # end when a label is repeated on a machine or is equal to the site kind.
    # Iterate over a snapshot because new keys are added while looping.
    for label, machines in list(label_to_machines.items()):
        for index, machine_key in enumerate(machines, 1):
            label_to_machines[f"kiso.{label}.{index}"].add(machine_key)

    return label_to_machines
def _check_software(softwares: Software, label_to_machines: dict[str, set]) -> None:
    """Check the configuration of every configured software.

    :param softwares: The ``software`` section of the experiment configuration,
        may be None
    :type softwares: Software
    :param label_to_machines: Mapping of labels to the machines carrying them
    :type label_to_machines: dict[str, set]
    """
    if softwares is None:
        return

    for field in fields(Software):
        # The field name doubles as the name of the software's EntryPoint
        name = field.name
        settings = getattr(softwares, name, None)
        if settings is not None:
            installer_cls = utils.get_software(name)
            installer = installer_cls(settings)
            installer.check(label_to_machines)
def _check_deployed_software(
    deployments: Deployment, label_to_machines: dict[str, set]
) -> None:
    """Check the configuration of every configured deployment.

    :param deployments: The ``deployment`` section of the experiment configuration,
        may be None
    :type deployments: Deployment
    :param label_to_machines: Mapping of labels to the machines carrying them
    :type label_to_machines: dict[str, set]
    """
    if deployments is None:
        return

    for field in fields(Deployment):
        # The field name doubles as the name of the deployment's EntryPoint
        name = field.name
        settings = getattr(deployments, name, None)
        if settings is not None:
            installer_cls = utils.get_deployment(name)
            installer = installer_cls(settings)
            installer.check(label_to_machines)
def _check_experiments(
    experiment_config: Kiso, label_to_machines: dict[str, set]
) -> None:
    """Check the configuration of every experiment.

    For each experiment, the runner registered for the experiment's ``kind`` is
    instantiated and its ``check`` method is called.

    :param experiment_config: The experiment configuration
    :type experiment_config: Kiso
    :param label_to_machines: Mapping of labels to the machines carrying them
    :type label_to_machines: dict[str, set]
    """
    experiments = experiment_config.experiments
    if experiments is None:
        return

    # The second positional argument of copy.deepcopy is its memo dictionary, not a
    # default value, so fall back to an empty mapping explicitly.
    variables = copy.deepcopy(experiment_config.variables or {})
    for index, experiment in enumerate(experiments):
        # Locate the EntryPoint for the runner `kind` of experiment and load it
        cls = utils.get_runner(experiment.kind)

        # Instantiate the runner class.
        runner = cls(
            experiment,
            index,
            variables=variables,  # Variables defined globally for the experiment
        )
        runner.check(experiment_config, label_to_machines)
# [docs]
@validate_config
@enostask(new=True, symlink=False)
def up(
    experiment_config: Kiso,
    force: bool = False,
    env: Environment = None,
    **kwargs: Any,  # noqa: ANN401
) -> None:
    """Create and set up resources for running an experiment.

    Records the Kiso version and the local and remote working directories in the
    environment, then initializes the sites, installs the common components and
    installs the configured software and deployments.

    :param experiment_config: The validated experiment configuration
    :type experiment_config: Kiso
    :param force: Force recreation of resources, defaults to False
    :type force: bool, optional
    :param env: Environment context for the experiment, defaults to None
    :type env: Environment, optional
    :param kwargs: Additional keyword arguments; ``wd`` is used as the local working
        directory and defaults to the current directory
    :type kwargs: Any
    """
    console.rule(
        "[bold green]Create and set up resources for the experiments[/bold green]"
    )

    local_wd = kwargs.get("wd", Path.cwd())
    env["version"] = __version__
    env["wd"] = str(local_wd)
    # The remote working directory has the same name, under the kiso user's home
    env["remote_wd"] = str(Path("~kiso") / Path(env["wd"]).name)

    roles_config = _replace_labels_key_with_roles_key(experiment_config)
    _init_sites(roles_config, env, force)
    _install_commons(env)
    _install_software(roles_config, env)
    _install_deployed_software(roles_config, env)
def _init_sites(
    experiment_config: Kiso, env: Environment, force: bool = False
) -> tuple[list[Provider], Roles, Networks]:
    """Initialize sites for an experiment.

    Submits one ``_init_site`` call per site to the executor returned by
    ``get_process_pool_executor`` and merges the results.

    Performs the following key tasks:
    - Initializes providers for each site concurrently
    - Aggregates labels and networks from initialized sites
    - Stores the providers, labels and networks in ``env``
    - Extends labels with daemon-to-site mappings
    - Determines public IP requirements
    - Associates floating IPs and selects preferred IPs for nodes

    :param experiment_config: Configuration dictionary containing site definitions
    :type experiment_config: Kiso
    :param env: Environment context for the experiment
    :type env: Environment
    :param force: Force recreation of resources, defaults to False
    :type force: bool, optional
    :return: A tuple of providers, labels, and networks for the experiment
    :rtype: tuple[list[Provider], Roles, Networks]
    """
    log.debug("Initializing sites")

    providers = []
    labels = Roles()
    networks = Networks()

    with get_process_pool_executor() as executor:
        futures = [
            executor.submit(_init_site, site_index, site, force)
            for site_index, site in enumerate(experiment_config.sites)
        ]

        # Results are collected in submission order, i.e. in the order of the sites
        for future in futures:
            provider, _labels, _networks = future.result()

            providers.append(provider)
            labels.extend(_labels)
            networks.extend(_networks)

    providers = en.Providers(providers)
    env["providers"] = providers
    env["labels"] = labels
    env["networks"] = networks

    daemon_to_site = _extend_labels(experiment_config, labels)

    # TODO(mayani): Kiso should not have to detect and associate public IPs with nodes
    # Kiso should not be aware if a software or deployment requires public IPs, it
    # should be handled by the installer of the software or deployment or throw an
    # error requiring the user to provision IPs during provisioning
    # NOTE(review): the preferred-IP loop below is assumed to be part of this
    # HTCondor-only branch, since `is_public_ip_required` is only bound here.
    # Confirm against the original indentation.
    if experiment_config.deployment and experiment_config.deployment.htcondor:
        is_public_ip_required = _is_public_ip_required(daemon_to_site)
        env["is_public_ip_required"] = is_public_ip_required

        for node in labels.all():
            # The large default priority makes a node without any address count
            # as having no public IP
            preferred_ip, priority = None, 1000
            addresses = _get_ips(node)
            if addresses:
                # Priority is,
                # 0 for a public IPv4 address
                # 1 for a public IPv6 address
                # 2 for a private IPv4 address
                # 3 for a private IPv6 address
                preferred_ip, priority = addresses[0]
                log.debug(
                    "Preferred IP <%s> with priority <%d>", preferred_ip, priority
                )

            # Only central manager and submit nodes get a floating IP, and only
            # when their best address is not already public
            if (
                is_public_ip_required
                and priority > 1
                and (node.extra["is_central_manager"] or node.extra["is_submit"])
            ):
                preferred_ip = associate_floating_ip(node)

            node.extra["kiso_preferred_ip"] = str(preferred_ip)

    return providers, labels, networks
def _init_site(
    index: int, site: dict[Any, Any], force: bool = False
) -> tuple[Provider, Roles, Networks]:
    """Initialize a site for provisioning resources.

    Configures and initializes a site based on its provider type, handling specific
    requirements for different cloud providers like Chameleon. Performs the following
    key tasks:
    - Validates the site's provider type
    - Configures exposed ports for containers
    - Initializes provider resources and networks
    - Adds metadata to nodes about their provisioning context
    - Handles region-specific configurations

    :param index: The index of the site in the configuration
    :type index: int
    :param site: Site configuration dictionary
    :type site: dict[Any, Any]
    :param force: Force recreation of resources, defaults to False
    :type force: bool, optional
    :raises TypeError: If an invalid site provider type is specified
    :return: A tuple containing the provider, labels, and networks for the site
    :rtype: tuple[Provider, Roles, Networks]
    """
    kind = site["kind"]
    if kind not in PROVIDER_MAP:
        raise TypeError(f"Invalid site.type <{kind}> for site <{index}>")

    # There is no firewall on ChameleonEdge containers, but to reach HTCondor
    # daemons the port(s) still need to be exposed
    if kind == "chameleon-edge":
        for container in site["resources"]["machines"]:
            container = container["container"]
            exposed_ports = set(container.get("exposed_ports", []))
            exposed_ports.add(str(const.HTCONDOR_PORT))
            container["exposed_ports"] = list(exposed_ports)

    # PROVIDER_MAP holds (configuration factory, provider factory) pairs
    conf = PROVIDER_MAP[kind][0](site)
    provider = PROVIDER_MAP[kind][1](conf)

    _labels, _networks = provider.init(force_deploy=force)
    _deduplicate_hosts(_labels)
    # Every node and network of the site is also reachable under the site kind
    _labels[kind] = _labels.all()
    _networks[kind] = _networks.all()

    # For Chameleon site, the region name is important as each region will act like
    # a different site
    region_name = kind
    if kind.startswith("chameleon"):
        region_name = _get_region_name(site["rc_file"])

    _labels[region_name] = _labels.all()
    _networks[region_name] = _networks.all()

    # To each node we add a tag to identify what site/region it was provisioned on
    for node in _labels.all():
        # ChameleonDevice object does not have an attribute named extra
        if kind == "chameleon-edge":
            attr = "extra"
            setattr(node, attr, {})
        elif kind == "chameleon" or kind == "fabric":
            # Used to copy this file to Chameleon VMs, so we can use the Openstack
            # client to get a floating IP
            node.extra["rc_file"] = str(Path(conf.rc_file).resolve())

        node.extra["kind"] = kind
        # An existing `site` entry in `extra` is kept as is
        node.extra.setdefault("site", region_name)
        if kind == "fabric":
            _labels[f"fabric.{node.extra['site']}"] += [node]

    if kind != "chameleon-edge":
        _labels = en.sync_info(_labels, _networks)
    else:
        # Because zunclient.v1.containers.Container is not pickleable
        provider.client.concrete_resources = []

    return provider, _labels, _networks
def _deduplicate_hosts(labels: Roles) -> None:
"""Deduplicate_hosts _summary_.
_extended_summary_
:param labels: _description_
:type labels: Roles
"""
dedup = {}
for _, nodes in labels.items():
update = set()
for node in nodes:
if node not in dedup:
dedup[node] = node
else:
update.add(dedup[node])
for node in update:
nodes.remove(node)
nodes.extend(update)
def _get_region_name(rc_file: str) -> str | None:
"""Extract the OpenStack region name from a given RC file.
Parses the provided RC file to find the OS_REGION_NAME environment variable
and returns its value. Raises a ValueError if the region name cannot be found.
:param rc_file: Path to the OpenStack RC file containing environment variables
:type rc_file: str
:raises ValueError: If OS_REGION_NAME is not found in the RC file
:return: The name of the OpenStack region
:rtype: str | None
"""
region_name = None
with Path(rc_file).open() as env_file:
for env_var in env_file:
if "OS_REGION_NAME" in env_var:
parts = env_var.split("=")
region_name = parts[1].strip("\n\"'")
break
else:
raise ValueError(f"Unable to get region name from the rc_file <{rc_file}>")
return region_name
def _extend_labels(experiment_config: Kiso, labels: Roles) -> dict[str, set]:
    """Add unique per-node labels and HTCondor daemon flags to the nodes.

    Processes the given labels and experiment configuration to:
    - Add, for every node of a label, a unique label ``kiso.<label>.<index>``
    - Flag each node with the HTCondor daemons it will run (``is_central_manager``,
      ``is_submit``, ``is_execute``, ``is_personal``); a flag already set on a node
      is never cleared
    - Track the sites on which execute, submit and central manager daemons run

    :param experiment_config: Configuration for the experiment
    :type experiment_config: Kiso
    :param labels: Mapping of labels to their nodes, extended in place
    :type labels: Roles
    :return: A mapping of HTCondor daemon types to their sites
    :rtype: dict[str, set]
    """
    central_manager_labels, submit_labels, execute_labels, personal_labels = (
        _get_condor_daemon_labels(experiment_config)
    )
    unique_labels: dict[str, set] = defaultdict(set)
    daemon_to_site = defaultdict(set)

    for label, nodes in labels.items():
        daemon_flags = {
            "is_central_manager": label in central_manager_labels,
            "is_submit": label in submit_labels,
            "is_execute": label in execute_labels,
            "is_personal": label in personal_labels,
        }

        for position, node in enumerate(nodes, 1):
            # EnOSlib resources.machines.number can be greater than 1, so every
            # host also gets a unique label of the form kiso.<label>.<index>
            unique_labels[f"kiso.{label}.{position}"].add(node)

            # Flags identify what HTCondor daemons will run on the node
            for flag, enabled in daemon_flags.items():
                node.extra[flag] = node.extra.get(flag, False) or enabled

            # All FABRIC nodes count as one site, whatever their FABRIC site is
            if node.extra["kind"] == "fabric":
                site_name = "fabric"
            else:
                site_name = node.extra["site"]

            if daemon_flags["is_execute"]:
                daemon_to_site["execute"].add(site_name)
            if daemon_flags["is_submit"]:
                daemon_to_site["submit"].add(site_name)
            if daemon_flags["is_central_manager"]:
                daemon_to_site["central-manager"].add(site_name)

    labels.update(unique_labels)

    return daemon_to_site
def _is_public_ip_required(daemon_to_site: dict[str, set]) -> bool:
"""Determine if a public IP address is required for the HTCondor cluster configuration.
Checks if public IP addresses are needed based on the distribution of HTCondor
daemons
across different sites. A public IP is required under the following conditions:
- Execute nodes are spread across multiple sites
- Submit nodes are spread across multiple sites
- Execute and submit nodes are on different sites
- Submit nodes are on a different site from the central manager
:param daemon_to_site: A dictionary mapping HTCondor daemon types to their sites
:type daemon_to_site: dict[str, set]
:return: True if a public IP is required, False otherwise
:rtype: bool
""" # noqa: E501
is_public_ip_required = False
central_manager = daemon_to_site["central-manager"]
submit = daemon_to_site["submit"]
execute = daemon_to_site["execute"]
# A public IP is required if,
# 1. If execute nodes are on multiple sites
# 2. If submit nodes are on multiple sites
# 3. If all execute nodes and submit nodes are on one site, but not the same one
# 4. If submit nodes are on one site, but not the same one as the central manager
if (central_manager or submit or execute) and (
len(execute) > 1
or len(submit) > 1
or execute != submit
or submit - central_manager
):
is_public_ip_required = True
return is_public_ip_required
def _get_ips(
    machine: Host | ChameleonDevice, is_public_ip_required: bool = False
) -> list[tuple[IPv4Address | IPv6Address, int]]:
    """Get the usable IP addresses of a machine, best address first.

    For a :py:class:`Host`, the addresses attached to a ``DefaultNetwork`` are
    collected from its network devices. For any other machine (a ChameleonDevice),
    ``machine.address`` is used. Addresses listed under ``floating-ips`` in
    ``machine.extra`` are added as well. Multicast, reserved, loopback and
    link-local addresses are skipped.

    :param machine: The machine to get the IP addresses for
    :type machine: Host | ChameleonDevice
    :param is_public_ip_required: Unused, kept for backward compatibility
    :type is_public_ip_required: bool, optional
    :return: List of tuples of an IP address and its priority, sorted by priority.
        Priority is 0 for a public IPv4 address, 1 for a public IPv6 address,
        2 for a private IPv4 address, and 3 for a private IPv6 address.
    :rtype: list[tuple[IPv4Address | IPv6Address, int]]
    """

    def _is_unusable(ip: IPv4Address | IPv6Address) -> bool:
        """Tell whether the address can never be used to reach the machine."""
        return ip.is_multicast or ip.is_reserved or ip.is_loopback or ip.is_link_local

    def _priority(ip: IPv4Address | IPv6Address, is_private: bool) -> int:
        """Prioritize public over private IPs and prioritize IPv4 over IPv6."""
        if isinstance(ip, IPv4Address):
            return 2 if is_private else 0
        return 3 if is_private else 1

    addresses = []

    # Shape of `net_devices` as observed per provider (abridged),
    #
    # Vagrant Host
    #   NetDevice(name='eth1', addresses={
    #       IPAddress(network=None,
    #                 ip=IPv6Interface('fe80::a00:27ff:fe6f:87e4/64')),
    #       IPAddress(network=<enoslib.infra.enos_vagrant.provider.VagrantNetwork ..>,
    #                 ip=IPv4Interface('172.16.255.243/24'))})
    #
    # Chameleon Host
    #   NetDevice(name='eno12419', addresses=set())
    #   NetDevice(name='enp161s0f0', addresses={
    #       IPAddress(network=<enoslib.infra.enos_openstack.objects.OSNetwork ..>,
    #                 ip=IPv4Interface('10.52.3.205/22')),
    #       IPAddress(network=None,
    #                 ip=IPv6Interface('fe80::3680:dff:feed:50f4/64'))})
    #   NetDevice(name='lo', addresses={
    #       IPAddress(network=None, ip=IPv4Interface('127.0.0.1/8')),
    #       IPAddress(network=None, ip=IPv6Interface('::1/128'))})
    #
    # Fabric Host, 1 for Management, 1 for add_fabnet, and 1 for loopback
    #   NetDevice(name="lo", addresses={127.0.0.1/8, ::1/128})
    #   NetDevice(name="eth0", addresses={
    #       IPAddress(network=None, ip=IPv4Interface("10.20.4.136/23")),
    #       IPAddress(network=None,
    #                 ip=IPv6Interface("fe80::f816:3eff:fecd:a657/64"))})
    #   NetDevice(name="eth1", addresses={
    #       IPAddress(network=None, ip=IPv4Interface("10.134.142.2/24")),
    #       IPAddress(network=None,
    #                 ip=IPv6Interface("fe80::8117:f69:a883:76c5/64"))})
    if isinstance(machine, Host):
        for net_device in machine.net_devices:
            for address in net_device.addresses:
                if not (
                    isinstance(address.network, DefaultNetwork)
                    and isinstance(address.ip, (IPv4Interface, IPv6Interface))
                ):
                    continue

                ip = address.ip.ip
                if _is_unusable(ip):
                    continue

                # FABRIC uses the same IPRange (2602:FCFB::/36) for both IPv6
                # and IPv6External networks, so we check if the IPv6 address
                # assigned by FABRIC is public or private.
                is_private = ip.is_private or (
                    has_fabric
                    and isinstance(
                        address.network.config, Fabnetv6NetworkConfiguration
                    )
                )
                addresses.append((ip, _priority(ip, is_private)))
    else:
        address = ip_address(machine.address)
        # NOTE(review): this ranks a private address 1, not 2/3 as documented.
        # Kept as is because callers compare the priority against 1 - confirm.
        priority = 1 if address.is_private else 0
        addresses.append((address, priority))

    for floating_ip in machine.extra.get("floating-ips", []):
        # Floating IPs are stored as plain values, so each one is parsed and
        # classified on its own.
        ip = ip_address(floating_ip)
        if _is_unusable(ip):
            continue

        addresses.append((ip, _priority(ip, ip.is_private)))

    # The sort is stable, addresses of equal priority keep their discovery order
    addresses = sorted(addresses, key=lambda v: v[1])
    log.debug("Addresses <%s>", addresses)

    return addresses
def _get_condor_daemon_labels(
experiment_config: Kiso,
) -> tuple[set[str], set[str], set[str], set[str]]:
"""Get labels for different HTCondor daemon types from an experiment configuration.
Parses the HTCondor configuration to extract labels for central manager, submit,
execute, and personal daemon types. Validates daemon types and raises an error for
invalid types.
:param experiment_config: Dictionary containing HTCondor cluster configuration
:type experiment_config: Kiso
:raises ValueError: If an invalid HTCondor daemon type is encountered
:return: Tuple of label sets for central manager, submit, execute, and personal
daemons
:rtype: tuple[set[str], set[str], set[str], set[str]]
"""
condor_cluster = (
experiment_config.deployment and experiment_config.deployment.htcondor
)
central_manager_labels = set()
submit_labels = set()
execute_labels = set()
personal_labels = set()
if condor_cluster:
for config in condor_cluster:
if config.kind[0] == "c": # central-manager
central_manager_labels.update(config.labels)
elif config.kind[0] == "s": # submit
submit_labels.update(config.labels)
elif config.kind[0] == "e": # execute
execute_labels.update(config.labels)
elif config.kind[0] == "p": # personal
personal_labels.update(config.labels)
else:
raise ValueError(
f"Invalid HTCondor daemon <{config.kind}> in configuration"
)
return central_manager_labels, submit_labels, execute_labels, personal_labels
def _install_commons(env: Environment) -> None:
    """Install components needed to run a Kiso experiment.

    Virtual machines are set up with the ``commons/main.yml`` Ansible playbook and
    containers with the ``commons/init.sh`` script. Both receive the generated
    /etc/hosts content. According to the original notes, the setup performs,

    1. Disable SELinux on EL-based systems.
    2. Disable Firewall.
    3. Install dependencies, like sudo, curl, etc.
    4. Create a kiso group and a user.
    5. Allow passwordless sudo for kiso.
    6. Copy .ssh dir to ~kiso/.ssh dir.

    :param env: Environment context for the installation
    :type env: Environment
    """
    log.debug("Install Commons")
    console.rule("[bold green]Installing Commons[/bold green]")

    labels = env["labels"]
    # Special case here. Do not pass (labels, labels) to split_labels. Since the
    # Roles object is like a dictionary, so labels - labels["<key>"] and
    # labels & labels["<key>"] doesn't work.
    vms, containers = utils.split_labels(labels.all(), labels)

    etc_hosts_content = _generate_etc_hosts(env)
    log.debug("/etc/hosts content <%s>", etc_hosts_content)

    commons_dir = Path(__file__).parent / "commons"
    results = []
    if vms:
        playbook_results = utils.run_ansible(
            [commons_dir / "main.yml"],
            roles=vms,
            extra_vars={"etc_hosts_content": etc_hosts_content},
        )
        results.extend(playbook_results)

    if containers:
        results.extend(
            utils.run_script(
                container,
                commons_dir / "init.sh",
                "--hosts",
                etc_hosts_content,
                "--no-dry-run",
                timeout=-1,
            )
            for container in containers
        )

    display.commons(console, results)
def _generate_etc_hosts(env: Environment) -> str:
"""Generate /etc/hosts file for the experiment."""
labels = env["labels"]
content = io.StringIO()
host_to_labels: dict[str, set[str]] = defaultdict(set)
for label, machines in labels.items():
if len(machines) == 1:
host_to_labels[
machines[0].extra.get("kiso_preferred_ip", machines[0].address)
].add(label)
content.write("# Kiso: Start\n")
for address, labels in host_to_labels.items():
content.write(f"{address} {' '.join(labels)}\n")
content.write("# Kiso: End\n")
return content.getvalue()
def _install_software(experiment_config: Kiso, env: Environment) -> None:
    """Install every configured software.

    :param experiment_config: The experiment configuration
    :type experiment_config: Kiso
    :param env: Environment context passed to each installer
    :type env: Environment
    """
    softwares = experiment_config.software
    if softwares is None:
        return

    for field in fields(Software):
        # The field name doubles as the name of the software's EntryPoint
        name = field.name
        settings = getattr(softwares, name, None)
        if settings is not None:
            installer_cls = utils.get_software(name)
            installer = installer_cls(settings)
            installer(env)
def _install_deployed_software(experiment_config: Kiso, env: Environment) -> None:
    """Install the software of every configured deployment.

    :param experiment_config: The experiment configuration
    :type experiment_config: Kiso
    :param env: Environment context passed to each installer
    :type env: Environment
    """
    deployments = experiment_config.deployment
    if deployments is None:
        return

    for field in fields(Deployment):
        # The field name doubles as the name of the deployment's EntryPoint
        name = field.name
        settings = getattr(deployments, name, None)
        if settings is not None:
            installer_cls = utils.get_deployment(name)
            installer = installer_cls(settings)
            installer(env)
# [docs]
@validate_config
@enostask()
@check_provisioned
def run(
    experiment_config: Kiso,
    force: bool = False,
    env: Environment = None,
    **kwargs: Any,  # noqa: ANN401
) -> None:
    """Run the defined experiments.

    Executes a series of experiments by performing the following steps:
    - Copies experiment directory to remote labels
    - Executes each experiment, keeping its state under ``env["experiments"]``

    A configuration without experiments only copies the experiment directory.

    :param experiment_config: Configuration containing experiment details
    :type experiment_config: Kiso
    :param force: Force rerunning of experiments, defaults to False
    :type force: bool, optional
    :param env: Environment configuration containing providers, labels, and networks
    :type env: Environment, optional
    :param kwargs: Additional keyword arguments
    :type kwargs: dict
    """
    log.debug("Run Kiso experiments")
    console.rule("[bold green]Run experiments[/bold green]")

    # `experiments` is optional in the configuration
    experiments = experiment_config.experiments or []
    # The second positional argument of copy.deepcopy is its memo dictionary, not a
    # default value, so fall back to an empty mapping explicitly.
    variables = copy.deepcopy(experiment_config.variables or {})

    env.setdefault("experiments", {})
    if force is True:
        # Forget the state of earlier runs so every step is executed again
        env["experiments"] = {}

    _copy_experiment_dir(env)
    for experiment_index, experiment in enumerate(experiments):
        env["experiments"].setdefault(experiment_index, {})
        _run_experiments(experiment_index, experiment, variables, env)
def _copy_experiment_dir(env: Environment) -> None:
    """Copy experiment directory to remote labels.

    Copies the experiment directory from the local working directory to the parent
    of the remote working directory. Virtual machines are reached with ``rsync``
    run on localhost; containers are reached with ``edge.upload``.

    Progress is recorded under the ``copy-experiment-directory`` key of
    ``env["experiments"]``. If that key already holds ``const.STATUS_OK`` the copy
    is skipped.

    :param env: Environment configuration containing labels and working directory
        information
    :type env: Environment
    :raises Exception: If directory copy fails for any label; the state is set to
        ``const.STATUS_FAILED`` before the error is re-raised
    """
    log.debug("Copy experiment directory to remote nodes")
    console.print("Copying experiment directory to remote nodes")
    labels = env["labels"]
    # Special case here. Do not pass (labels, labels) to split_labels. Since the Roles
    # object is like a dictionary, so labels - labels["<key>"] and
    # labels & labels["<key>"] doesn't work.
    vms, containers = utils.split_labels(labels.all(), labels)

    # Resolved outside the `try` so the `except` handler below can always record the
    # failure instead of tripping over an unbound name.
    kiso_state = env["experiments"]
    if kiso_state.get("copy-experiment-directory") == const.STATUS_OK:
        return

    try:
        kiso_state["copy-experiment-directory"] = const.STATUS_STARTED
        src = Path(env["wd"])
        dst = Path(env["remote_wd"]).parent
        if vms:
            # macOS's rsync does not work as expected when the host
            # has an IPv6 address and a gateway host is used in between. So we
            # create a temp SSH config file with the Host and HostName directives
            # and use it to run rsync.
            #
            # Only the unique *name* of the temporary file is used, as a prefix for
            # the per-host SSH config files. The context manager closes and deletes
            # the placeholder file itself, even when a task fails.
            with tempfile.NamedTemporaryFile() as tmpfile:
                ssh_config = f"{tmpfile.name}-{{{{ansible_host}}}}"
                with utils.actions(roles=vms, strategy="free") as p:
                    p.copy(
                        dest=ssh_config,
                        content="""
Host pegasusvm
HostName {{ansible_host}}
""",
                        delegate_to="localhost",
                    )
                    p.shell(
                        f"rsync -auzv -e 'ssh -F {ssh_config} "
                        "{{ansible_ssh_common_args}} "
                        "{% if ansible_port is defined %}-p {{ansible_port}} "
                        "{% endif %}{% if ansible_ssh_private_key_file is defined %}-i "
                        "{{ansible_ssh_private_key_file}}{% endif %}' "
                        f"{src} kiso@pegasusvm:{dst}",
                        delegate_to="localhost",
                        task_name="Copy experiment dir",
                    )
                    # Remove the per-host SSH config file once rsync is done
                    p.file(
                        path=ssh_config,
                        state="absent",
                        delegate_to="localhost",
                    )

        if containers:
            for container in containers:
                edge.upload(container, src, dst, user=const.KISO_USER)
    except Exception:
        kiso_state["copy-experiment-directory"] = const.STATUS_FAILED
        raise
    else:
        kiso_state["copy-experiment-directory"] = const.STATUS_OK
def _run_experiments(
    index: int, experiment: ExperimentTypes, variables: dict, env: Environment
) -> None:
    """Run one experiment with the runner that matches its kind.

    The runner class is looked up from ``experiment.kind``, built from the
    experiment, its index, and the shared variables, and then called with the
    directories, labels, and per-experiment state taken from ``env``.

    :param index: The overall experiment index
    :type index: int
    :param experiment: Configuration of the experiment to run
    :type experiment: ExperimentTypes
    :param variables: Variables defined globally for the experiment
    :type variables: dict
    :param env: Environment providing ``wd``, ``remote_wd``, ``resultdir``,
        ``labels``, and the ``experiments`` state store
    :type env: Environment
    """
    # The experiment's `kind` selects which runner implementation gets loaded
    runner_cls = utils.get_runner(experiment.kind)
    execute = runner_cls(experiment, index, variables=variables)

    # Positional order: local working directory, remote working directory, local
    # results directory, provisioned resources, state store of this experiment
    execute(
        env["wd"],
        env["remote_wd"],
        env["resultdir"],
        env["labels"],
        env["experiments"][index],
    )
[docs]
@validate_config
@enostask()
@check_provisioned
def down(experiment_config: Kiso, env: Environment = None, **kwargs: dict) -> None:  # noqa: C901
    """Destroy the resources provisioned for the experiments.

    Tears down the resources recorded under ``env["providers"]``:

    - For Vagrant, private keys found under ``.vagrant`` are removed from the
      ssh-agent when ``ssh-add`` is available
    - For Chameleon Edge, each container under the ``chameleon-edge`` label is
      destroyed on a best-effort basis; failures are logged and skipped
    - All providers are destroyed, then the ``.vagrant`` directory and the
      ``Vagrantfile`` in the working directory are deleted if present

    :param experiment_config: Configuration dictionary for the experiment
    :type experiment_config: Kiso
    :param env: Environment object containing provider information
    :type env: Environment, optional
    :param kwargs: Additional keyword arguments
    :type kwargs: dict
    """
    log.debug("Destroy the resources provisioned for the experiments")
    console.rule(
        "[bold green]Destroy resources created for the experiments[/bold green]"
    )

    if "providers" not in env:
        log.debug("No providers found, skipping")
        console.rule(
            "No providers found. Either resources were not provisioned or the output "
            "directory was removed"
        )
        return

    vagrant_dir = Path(env["wd"]) / ".vagrant"
    vagrant_file = Path(env["wd"]) / "Vagrantfile"
    # NOTE(review): the providers entry is dropped before `destroy()` runs, so a
    # failing `destroy()` presumably leaves no reference to retry with — confirm
    # this is intended.
    providers = env["providers"]
    del env["providers"]

    # Provider classes are optional in EnOSlib (the module checks for them with
    # `hasattr` at import time), so resolve them defensively instead of letting a
    # missing attribute abort the teardown.
    vagrant_cls = getattr(en, "Vagrant", None)
    edge_cls = getattr(en, "ChameleonEdge", None)

    for provider in providers.providers:
        if (
            vagrant_cls is not None
            and isinstance(provider, vagrant_cls)
            and vagrant_dir.exists()
        ):
            ssh_add = shutil.which("ssh-add")
            if ssh_add:
                for key in vagrant_dir.glob("**/private_key"):
                    result = subprocess.run([ssh_add, "-d", str(key)])  # noqa: S603
                    if result.returncode != 0:
                        log.debug("Failed to remove SSH key <%s> from ssh-agent", key)
        elif edge_cls is not None and isinstance(provider, edge_cls):
            import chi

            for container in env["labels"]["chameleon-edge"]:
                # Best effort: one container failing must not stop the teardown, but
                # KeyboardInterrupt/SystemExit are allowed to propagate.
                try:
                    chi.container.destroy_container(container.uuid)
                except Exception:
                    log.debug(
                        "Failed to destroy Chameleon Edge container <%s>",
                        container,
                        exc_info=True,
                    )

    providers.destroy()

    if vagrant_dir.exists():
        shutil.rmtree(vagrant_dir)
    if vagrant_file.exists():
        vagrant_file.unlink()