Source code for oduit.operation_sets

"""Operation set parsing, validation, resolution, inspection, and writing."""

from __future__ import annotations

import re
import time
from collections.abc import Mapping, Sequence
from dataclasses import dataclass, field
from pathlib import Path
from types import MappingProxyType
from typing import Any, Literal, cast

from .exceptions import ConfigError

# Closed set of operation kinds a set file may declare through its `kind` key.
OperationSetKind = Literal["install", "update", "test"]

# Schema version assumed when a set file omits `schema_version`.
_DEFAULT_SCHEMA_VERSION = 2
# Shared read-only empty mapping returned by `_empty_mapping()`.
_DEFAULT_EMPTY_MAPPING: MappingProxyType[str, Any] = MappingProxyType({})
# Splits a names string on any run of whitespace and/or commas.
_NAME_SEPARATOR_RE = re.compile(r"[\s,]+")
# Runtime counterpart of `OperationSetKind`, used for membership checks.
_VALID_KINDS = {"install", "update", "test"}
# Keys accepted at the top level of a set file.
_VALID_TOP_LEVEL_KEYS = {
    "schema_version",
    "kind",
    "name",
    "description",
    "metadata",
    "source",
    "install",
    "update",
    "test",
}
# Keys accepted inside the [install] table.
_VALID_INSTALL_KEYS = {
    "addons",
    "with_demo",
    "without_demo",
    "language",
    "max_cron_threads",
    "compact",
    "log_level",
    "verify_state",
    "retry_missing",
    "one_by_one",
    "stop_on_error",
    "skip_installed",
}
# Keys accepted inside the [update] table.
_VALID_UPDATE_KEYS = {
    "addons",
    "without_demo",
    "language",
    "i18n_overwrite",
    "max_cron_threads",
    "compact",
    "log_level",
    "verify_state",
    "retry_missing",
    "one_by_one",
    "stop_on_error",
    "require_installed",
}
# Keys accepted inside the [test] table.
_VALID_TEST_KEYS = {
    "install",
    "update",
    "test_tags",
    "test_files",
    "coverage",
    "compact",
    "stop_on_error",
    "log_level",
    "verify_state",
    "retry_missing",
    "one_by_one",
    "retry_failed_tests",
}


def _empty_mapping() -> Mapping[str, Any]:
    """Return the shared read-only empty mapping (usable as a default factory)."""
    shared_empty = _DEFAULT_EMPTY_MAPPING
    return shared_empty


def _load_toml(path: Path) -> dict[str, Any]:
    if path.suffix not in {"", ".toml"}:
        raise ConfigError("Operation sets currently support TOML files only.")
    try:
        import tomllib  # type: ignore[import-not-found]
    except ModuleNotFoundError:
        import tomli as tomllib
    with path.open("rb") as handle:
        data = tomllib.load(handle)
    if not isinstance(data, dict):
        raise ConfigError(f"Invalid operation set format: {path}")
    return data


def _check_unknown_keys(
    data: Mapping[str, Any],
    valid_keys: set[str],
    *,
    section: str,
    reference: str | None = None,
) -> None:
    unknown = sorted(set(data.keys()) - valid_keys)
    if not unknown:
        return
    prefix = "Operation set"
    if reference:
        prefix += f" '{reference}'"
    prefix += f" has unknown key(s) in [{section}]"
    raise ConfigError(
        f"{prefix}: {', '.join(unknown)}. "
        f"Supported keys: {', '.join(sorted(valid_keys))}."
    )


def _split_names(raw_value: str) -> tuple[str, ...]:
    """Split a whitespace/comma separated string into its non-empty names."""
    pieces = _NAME_SEPARATOR_RE.split(raw_value.strip())
    # Consecutive or leading/trailing separators produce empty pieces; drop them.
    return tuple(filter(None, pieces))


def _string_list(value: Any, *, key: str) -> tuple[str, ...]:
    if value is None:
        return ()
    if isinstance(value, str):
        return _split_names(value)
    if isinstance(value, list):
        result: list[str] = []
        for item in value:
            if not isinstance(item, str):
                raise ConfigError(f"`{key}` must be a string or list of strings.")
            if item.strip():
                result.append(item.strip())
        return tuple(result)
    raise ConfigError(f"`{key}` must be a string or list of strings.")


def _test_tags_list(value: Any, *, key: str) -> tuple[str, ...]:
    """Normalize test tags while preserving tag punctuation."""
    if value is None:
        return ()
    if isinstance(value, str):
        if "," in value:
            return tuple(item.strip() for item in value.split(",") if item.strip())
        return (value.strip(),) if value.strip() else ()
    if isinstance(value, list):
        result: list[str] = []
        for item in value:
            if not isinstance(item, str):
                raise ConfigError(f"`{key}` must be a string or list of strings.")
            if item.strip():
                result.append(item.strip())
        return tuple(result)
    raise ConfigError(f"`{key}` must be a string or list of strings.")


def _test_files_list(
    value: Any,
    *,
    key: str,
    set_file_dir: Path,
    allow_missing: bool = False,
) -> tuple[tuple[Path, ...], tuple[str, ...]]:
    """Resolve test file entries relative to the directory of the set file.

    Args:
        value: Raw config value, normalized with ``_string_list``.
        key: Config key name used in error messages.
        set_file_dir: Directory the entries are joined onto.
        allow_missing: When true, entries that do not exist are still accepted.

    Returns:
        A pair ``(resolved_paths, original_inputs)`` of equal length.

    Raises:
        ConfigError: If an entry does not exist and ``allow_missing`` is false.
    """
    entries = _string_list(value, key=key)
    located: list[Path] = []
    for entry in entries:
        target = (set_file_dir / entry).resolve(strict=False)
        if not (target.exists() or allow_missing):
            raise ConfigError(
                f"Test file not found: {entry} (resolved to {target})"
            )
        located.append(target)
    return tuple(located), tuple(entries)


def _without_demo_value(value: Any) -> bool | str:
    if value in (None, False):
        return False
    if value is True:
        return True
    if isinstance(value, str):
        return value
    raise ConfigError("without_demo must be a boolean or string.")


def _optional_bool(value: Any) -> bool:
    if value is None:
        return False
    if isinstance(value, bool):
        return value
    raise ConfigError("Expected a boolean value.")


def _default_true_bool(value: Any) -> bool:
    if value is None:
        return True
    if isinstance(value, bool):
        return value
    raise ConfigError("Expected a boolean value.")


def _optional_str(value: Any) -> str | None:
    if value is None:
        return None
    if isinstance(value, str):
        return value
    raise ConfigError("Expected a string value.")


def _optional_int(value: Any) -> int | None:
    if value is None:
        return None
    if isinstance(value, int) and not isinstance(value, bool):
        return value
    raise ConfigError("Expected an integer value.")


def _optional_non_negative_int(value: Any, *, key: str) -> int:
    """Return ``value`` as a non-negative integer, defaulting ``None`` to 0.

    Raises:
        ConfigError: If ``value`` is rejected by ``_optional_int`` or is
            negative.
    """
    number = _optional_int(value)
    if number is None:
        return 0
    if number >= 0:
        return number
    raise ConfigError(f"`{key}` must be a non-negative integer, got {number}.")


def _mapping_section(value: Any, *, key: str) -> Mapping[str, Any]:
    if value is None:
        return _empty_mapping()
    if not isinstance(value, dict):
        raise ConfigError(f"`{key}` must be a table/object.")
    return MappingProxyType(dict(value))


def _string_or_none(value: Any, *, key: str) -> str | None:
    if value is None:
        return None
    if isinstance(value, str):
        return value
    raise ConfigError(f"`{key}` must be a string.")


def _normalize_schema_version(value: Any, *, reference: str) -> int:
    if value is None:
        return _DEFAULT_SCHEMA_VERSION
    if isinstance(value, int) and not isinstance(value, bool) and value == 2:
        return value
    raise ConfigError(
        f"Operation set '{reference}' is invalid: schema_version must be 2."
    )


def _normalize_kind(value: Any, *, reference: str) -> OperationSetKind:
    """Validate the required top-level ``kind`` value.

    Raises:
        ConfigError: If ``kind`` is missing, not a string, or not one of the
            valid kinds.
    """
    intro = f"Operation set '{reference}' is invalid: "
    if value is None:
        raise ConfigError(intro + "missing required top-level key 'kind'.")
    if isinstance(value, str) and value in _VALID_KINDS:
        return cast(OperationSetKind, value)
    raise ConfigError(intro + "kind must be one of install, update, test.")


def _validate_kind_sections(
    kind: OperationSetKind,
    *,
    reference: str,
    install_section: InstallSetSection | None,
    update_section: UpdateSetSection | None,
    test_section: TestSetSection | None,
) -> None:
    required_map = {
        "install": install_section is not None,
        "update": update_section is not None,
        "test": test_section is not None,
    }
    if not required_map[kind] or any(
        present
        for section, present in required_map.items()
        if section != kind and present
    ):
        forbidden = ", ".join(
            f"[{section}]"
            for section, present in required_map.items()
            if section != kind and present
        )
        suffix = f" and forbids {forbidden}" if forbidden else ""
        raise ConfigError(
            f"Operation set '{reference}' is invalid: "
            f"kind '{kind}' requires [{kind}]{suffix}."
        )


def _is_explicit_read_reference(value: str) -> bool:
    reference_path = Path(value)
    return reference_path.is_absolute() or "/" in value or "\\" in value


def _is_explicit_write_reference(value: str) -> bool:
    reference_path = Path(value)
    return (
        reference_path.is_absolute()
        or "/" in value
        or "\\" in value
        or reference_path.suffix != ""
    )


def _directory_reference_names(value: str) -> tuple[str, ...]:
    reference_name = Path(value).name
    if reference_name.endswith(".toml"):
        return (reference_name,)
    return (f"{reference_name}.toml", reference_name)


def _unique_paths(paths: Sequence[tuple[Path, str]]) -> list[tuple[Path, str]]:
    seen: set[Path] = set()
    unique: list[tuple[Path, str]] = []
    for path, source in paths:
        normalized = path.resolve(strict=False)
        if normalized in seen:
            continue
        seen.add(normalized)
        unique.append((normalized, source))
    return unique


def _active_config_sets_dir(context: OperationSetLocationContext) -> Path | None:
    if (
        context.active_config_source == "local"
        and context.active_config_path is not None
    ):
        return context.active_config_path.parent / ".oduit" / "sets"
    if (
        context.active_config_source in {"env", "demo"}
        and context.global_config_dir is not None
    ):
        return context.global_config_dir / "sets"
    return None


def _serialize_mapping(value: Mapping[str, Any]) -> dict[str, Any]:
    return {key: _serialize_value(item) for key, item in value.items()}


def _serialize_value(value: Any) -> Any:
    if isinstance(value, Mapping):
        return _serialize_mapping(value)
    if isinstance(value, Path):
        return str(value)
    if isinstance(value, tuple):
        return [_serialize_value(item) for item in value]
    if isinstance(value, list):
        return [_serialize_value(item) for item in value]
    return value


def _dump_toml(document: Mapping[str, Any]) -> str:
    """Render ``document`` as TOML text using ``tomli_w``.

    Raises:
        ConfigError: If ``tomli_w`` cannot be imported.
    """
    try:
        import tomli_w as toml_writer
    except ModuleNotFoundError as exc:
        raise ConfigError("Operation set writing requires tomli_w.") from exc
    plain_document = _serialize_mapping(document)
    return toml_writer.dumps(plain_document)


def _operation_set_section_addons(
    operation_set: OperationSet,
) -> list[tuple[str, tuple[str, ...]]]:
    addon_lists: list[tuple[str, tuple[str, ...]]] = []
    if operation_set.install is not None:
        addon_lists.append(("install.addons", operation_set.install.addons))
    if operation_set.update is not None:
        addon_lists.append(("update.addons", operation_set.update.addons))
    if operation_set.test is not None:
        addon_lists.append(("test.install", operation_set.test.install))
        addon_lists.append(("test.update", operation_set.test.update))
        if operation_set.test.coverage:
            addon_lists.append(("test.coverage", (operation_set.test.coverage,)))
    return addon_lists


def _addon_roles(operation_set: OperationSet) -> dict[str, list[str]]:
    """Map each role label to its addon list, omitting roles with no addons."""
    return {
        label: list(addons)
        for label, addons in _operation_set_section_addons(operation_set)
        if addons
    }


def _operation_set_addon_count(operation_set: OperationSet) -> int:
    """Count the distinct addon names referenced across all sections."""
    distinct = {
        addon
        for _label, addons in _operation_set_section_addons(operation_set)
        for addon in addons
    }
    return len(distinct)


@dataclass(frozen=True)
class InstallSetSection:
    """Immutable values of an operation set's ``[install]`` table.

    Every field has a default, so an instance can be built from a partial
    table.
    """

    # Addon names listed for the install operation.
    addons: tuple[str, ...] = ()
    with_demo: bool = False
    # Either a boolean or a string value, kept as given.
    without_demo: bool | str = False
    language: str | None = None
    max_cron_threads: int | None = None
    compact: bool = False
    log_level: str | None = None
    verify_state: bool = True
    retry_missing: int = 0
    one_by_one: bool = False
    stop_on_error: bool = True
    skip_installed: bool = True
@dataclass(frozen=True)
class UpdateSetSection:
    """Immutable values of an operation set's ``[update]`` table.

    Every field has a default, so an instance can be built from a partial
    table.
    """

    # Addon names listed for the update operation.
    addons: tuple[str, ...] = ()
    # Either a boolean or a string value, kept as given.
    without_demo: bool | str = False
    language: str | None = None
    i18n_overwrite: bool = False
    max_cron_threads: int | None = None
    compact: bool = False
    log_level: str | None = None
    verify_state: bool = True
    retry_missing: int = 0
    one_by_one: bool = False
    stop_on_error: bool = True
    require_installed: bool = True
@dataclass(frozen=True)
class TestSetSection:
    """Immutable values of an operation set's ``[test]`` table.

    Every field has a default, so an instance can be built from a partial
    table.
    """

    # Addons to install / update as part of the test operation.
    install: tuple[str, ...] = ()
    update: tuple[str, ...] = ()
    test_tags: tuple[str, ...] = ()
    # Resolved test file paths and the raw strings they were given as.
    test_files: tuple[Path, ...] = ()
    test_file_inputs: tuple[str, ...] = ()
    coverage: str | None = None
    compact: bool = False
    # Unlike the install/update sections, this defaults to False.
    stop_on_error: bool = False
    log_level: str | None = None
    verify_state: bool = True
    retry_missing: int = 0
    one_by_one: bool = False
    retry_failed_tests: int = 0
@dataclass(frozen=True)
class OperationSetLocationContext:
    """Immutable inputs used to locate operation set files."""

    # Working directory that relative references are resolved against.
    cwd: Path
    # Path and source label of the active configuration, when known.
    active_config_path: Path | None = None
    active_config_source: str | None = None
    # Global configuration directory, when known.
    global_config_dir: Path | None = None
@dataclass(frozen=True)
class OperationSetResolution:
    """Immutable outcome of resolving an operation set reference."""

    # Reference string associated with this resolution.
    reference: str
    # Path the reference resolved to.
    path: Path
    # Label of the location that produced the path.
    source: str
    # Paths recorded as tried during resolution.
    attempted: tuple[Path, ...]
@dataclass(frozen=True)
class OperationSetWriteResult:
    """Immutable summary of a written operation set file."""

    # Path the set was written to.
    path: Path
    # Reference string the write was requested with.
    reference: str
    kind: OperationSetKind
    # Number of addons reported for the written set.
    addon_count: int
    # Whether the write was recorded as replacing an existing file.
    overwritten: bool
@dataclass(frozen=True)
class OperationSet:
    """Immutable, validated representation of one operation set file."""

    # File the set was loaded from and the reference the caller supplied.
    path: Path
    requested_value: str
    kind: OperationSetKind
    schema_version: int = _DEFAULT_SCHEMA_VERSION
    name: str | None = None
    description: str | None = None
    # Free-form tables; they default to the shared read-only empty mapping.
    metadata: Mapping[str, Any] = field(default_factory=_empty_mapping)
    source: Mapping[str, Any] = field(default_factory=_empty_mapping)
    # Parsed kind-specific sections; ``None`` when the table is absent.
    install: InstallSetSection | None = None
    update: UpdateSetSection | None = None
    test: TestSetSection | None = None
    # Label of the location the set file was resolved from.
    resolution_source: str | None = None
def build_operation_set_location_context(
    *,
    cwd: Path,
    config_path: str | None,
    config_source: str | None,
    config_dir: str | None,
) -> OperationSetLocationContext:
    """Build a location context from raw working-directory and config values.

    Args:
        cwd: Working directory; stored resolved (non-strict).
        config_path: Active config file path, or a falsy value when unknown.
        config_source: Source label of the active config, stored unchanged.
        config_dir: Global config directory, or a falsy value when unknown.

    Returns:
        A context whose paths are user-expanded and resolved; falsy path
        inputs are stored as ``None``.
    """
    active_config_path = (
        Path(config_path).expanduser().resolve(strict=False) if config_path else None
    )
    global_config_dir = (
        Path(config_dir).expanduser().resolve(strict=False) if config_dir else None
    )
    return OperationSetLocationContext(
        cwd=cwd.resolve(strict=False),
        active_config_path=active_config_path,
        active_config_source=config_source,
        global_config_dir=global_config_dir,
    )
def resolve_operation_set_path(
    value: str,
    *,
    context: OperationSetLocationContext | None = None,
    base_dir: Path | None = None,
) -> OperationSetResolution:
    """Resolve a user-provided set reference to an absolute path.

    The reference is first tried as a file path (relative paths are joined
    onto the context's working directory). If that is not a file and the
    reference is not path-like, the sets directories are searched in order:
    active config, ``<cwd>/.oduit/sets``, then the global config directory.

    Args:
        value: Reference supplied by the user (a path or a bare set name).
        context: Location context; when omitted, one is built from
            ``base_dir`` or the current working directory.
        base_dir: Working directory used only when ``context`` is omitted.

    Returns:
        The resolution, including every path that was tried.

    Raises:
        ConfigError: If no candidate is an existing file.
    """
    if context is None:
        context = OperationSetLocationContext(cwd=(base_dir or Path.cwd()).resolve())
    cwd = context.cwd

    direct_candidate = Path(value)
    if not direct_candidate.is_absolute():
        direct_candidate = (cwd / direct_candidate).resolve(strict=False)
    attempted: list[Path] = [direct_candidate]

    def unresolved() -> ConfigError:
        # Built lazily so the message lists every path tried so far.
        return ConfigError(
            f"Cannot resolve operation set '{value}'. Tried:\n "
            + "\n ".join(str(path) for path in attempted)
        )

    if direct_candidate.is_file():
        return OperationSetResolution(
            reference=value,
            path=direct_candidate.resolve(),
            source="direct",
            attempted=tuple(attempted),
        )
    # A path-like reference is never looked up in the sets directories.
    if _is_explicit_read_reference(value):
        raise unresolved()

    candidate_dirs: list[tuple[Path, str]] = []
    active_dir = _active_config_sets_dir(context)
    if active_dir is not None:
        candidate_dirs.append((active_dir, "active_config"))
    candidate_dirs.append((cwd / ".oduit" / "sets", "local_project"))
    if context.global_config_dir is not None:
        candidate_dirs.append((context.global_config_dir / "sets", "global_config"))

    for directory, source in _unique_paths(candidate_dirs):
        for filename in _directory_reference_names(value):
            candidate = (directory / filename).resolve(strict=False)
            attempted.append(candidate)
            if candidate.is_file():
                return OperationSetResolution(
                    reference=value,
                    path=candidate.resolve(),
                    source=source,
                    attempted=tuple(attempted),
                )
    raise unresolved()
def resolve_operation_set_write_path(
    value: str,
    *,
    context: OperationSetLocationContext | None = None,
    base_dir: Path | None = None,
    overwrite: bool = False,
) -> OperationSetResolution:
    """Resolve the target path for writing an operation set.

    A path-like reference (absolute, containing a separator, or carrying a
    suffix) is used as the target itself. A bare name is written as
    ``<name>.toml`` into the first available directory: active config sets
    directory, global config ``sets`` directory, then ``<cwd>/.oduit/sets``.

    Args:
        value: Reference supplied by the user.
        context: Location context; when omitted, one is built from
            ``base_dir`` or the current working directory.
        base_dir: Working directory used only when ``context`` is omitted.
        overwrite: Allow the target to exist already.

    Raises:
        ConfigError: If an explicit target has a suffix other than
            ``.toml``, or the target exists and ``overwrite`` is false.
    """
    if context is None:
        context = OperationSetLocationContext(cwd=(base_dir or Path.cwd()).resolve())
    cwd = context.cwd

    if _is_explicit_write_reference(value):
        path = Path(value)
        if not path.is_absolute():
            path = (cwd / path).resolve(strict=False)
        if path.suffix not in {"", ".toml"}:
            raise ConfigError("Operation sets currently support TOML files only.")
        source = "explicit"
    else:
        active_dir = _active_config_sets_dir(context)
        if active_dir is not None:
            target_dir = active_dir
            source = "active_config"
        elif context.global_config_dir is not None:
            target_dir = context.global_config_dir / "sets"
            source = "global_config"
        else:
            target_dir = cwd / ".oduit" / "sets"
            source = "local_project"
        path = (target_dir / f"{value}.toml").resolve(strict=False)

    if path.exists() and not overwrite:
        raise ConfigError(
            f"Refusing to overwrite existing operation set: {path}. Use --overwrite."
        )
    return OperationSetResolution(
        reference=value,
        path=path,
        source=source,
        attempted=(path,),
    )
def _parse_install_section(
    data: dict[str, Any], *, reference: str
) -> InstallSetSection:
    """Validate the ``[install]`` table and build its section object.

    Raises:
        ConfigError: If ``data`` is not a table, contains unknown keys, has
            a value of the wrong type, or activates both ``with_demo`` and
            ``without_demo``.
    """
    # A scalar such as `install = "base"` would otherwise fail on `.keys()`
    # with an AttributeError instead of a configuration error.
    if not isinstance(data, dict):
        raise ConfigError("`install` must be a table/object.")
    _check_unknown_keys(
        data, _VALID_INSTALL_KEYS, section="install", reference=reference
    )
    with_demo = _optional_bool(data.get("with_demo"))
    without_demo = _without_demo_value(data.get("without_demo"))
    if with_demo and without_demo:
        raise ConfigError(
            "with_demo and without_demo must not both be active "
            "in the same install section."
        )
    return InstallSetSection(
        addons=_string_list(data.get("addons"), key="addons"),
        with_demo=with_demo,
        without_demo=without_demo,
        language=_optional_str(data.get("language")),
        max_cron_threads=_optional_int(data.get("max_cron_threads")),
        compact=_optional_bool(data.get("compact")),
        log_level=_optional_str(data.get("log_level")),
        verify_state=_default_true_bool(data.get("verify_state")),
        retry_missing=_optional_non_negative_int(
            data.get("retry_missing"), key="retry_missing"
        ),
        one_by_one=_optional_bool(data.get("one_by_one")),
        stop_on_error=_default_true_bool(data.get("stop_on_error")),
        skip_installed=_default_true_bool(data.get("skip_installed")),
    )


def _parse_update_section(data: dict[str, Any], *, reference: str) -> UpdateSetSection:
    """Validate the ``[update]`` table and build its section object.

    Raises:
        ConfigError: If ``data`` is not a table, contains unknown keys, or
            has a value of the wrong type.
    """
    # Reject non-table values up front (see `_parse_install_section`).
    if not isinstance(data, dict):
        raise ConfigError("`update` must be a table/object.")
    _check_unknown_keys(data, _VALID_UPDATE_KEYS, section="update", reference=reference)
    return UpdateSetSection(
        addons=_string_list(data.get("addons"), key="addons"),
        without_demo=_without_demo_value(data.get("without_demo")),
        language=_optional_str(data.get("language")),
        i18n_overwrite=_optional_bool(data.get("i18n_overwrite")),
        max_cron_threads=_optional_int(data.get("max_cron_threads")),
        compact=_optional_bool(data.get("compact")),
        log_level=_optional_str(data.get("log_level")),
        verify_state=_default_true_bool(data.get("verify_state")),
        retry_missing=_optional_non_negative_int(
            data.get("retry_missing"), key="retry_missing"
        ),
        one_by_one=_optional_bool(data.get("one_by_one")),
        stop_on_error=_default_true_bool(data.get("stop_on_error")),
        require_installed=_default_true_bool(data.get("require_installed")),
    )


def _parse_test_section(
    data: dict[str, Any],
    *,
    reference: str,
    set_file_dir: Path,
    allow_missing_test_files: bool = False,
) -> TestSetSection:
    """Validate the ``[test]`` table and build its section object.

    Args:
        data: Parsed ``[test]`` table.
        reference: Set reference quoted in unknown-key errors.
        set_file_dir: Directory that ``test_files`` entries are resolved
            against.
        allow_missing_test_files: Accept ``test_files`` entries that do not
            exist.

    Raises:
        ConfigError: If ``data`` is not a table, contains unknown keys, has
            a value of the wrong type, or names a missing test file while
            ``allow_missing_test_files`` is false.
    """
    # Reject non-table values up front (see `_parse_install_section`).
    if not isinstance(data, dict):
        raise ConfigError("`test` must be a table/object.")
    _check_unknown_keys(data, _VALID_TEST_KEYS, section="test", reference=reference)
    test_files, test_file_inputs = _test_files_list(
        data.get("test_files"),
        key="test_files",
        set_file_dir=set_file_dir,
        allow_missing=allow_missing_test_files,
    )
    return TestSetSection(
        install=_string_list(data.get("install"), key="install"),
        update=_string_list(data.get("update"), key="update"),
        test_tags=_test_tags_list(data.get("test_tags"), key="test_tags"),
        test_files=test_files,
        test_file_inputs=test_file_inputs,
        coverage=_optional_str(data.get("coverage")),
        compact=_optional_bool(data.get("compact")),
        # Defaults to False here, unlike the install/update sections.
        stop_on_error=_optional_bool(data.get("stop_on_error")),
        log_level=_optional_str(data.get("log_level")),
        verify_state=_default_true_bool(data.get("verify_state")),
        retry_missing=_optional_non_negative_int(
            data.get("retry_missing"), key="retry_missing"
        ),
        one_by_one=_optional_bool(data.get("one_by_one")),
        retry_failed_tests=_optional_non_negative_int(
            data.get("retry_failed_tests"), key="retry_failed_tests"
        ),
    )
def load_operation_set(
    value: str,
    *,
    context: OperationSetLocationContext | None = None,
    base_dir: Path | None = None,
    allow_missing_test_files: bool = False,
) -> OperationSet:
    """Load and validate an operation set from a user-provided reference.

    Args:
        value: Reference supplied by the user (a path or a bare set name).
        context: Location context forwarded to path resolution.
        base_dir: Working directory forwarded to path resolution.
        allow_missing_test_files: Accept ``test_files`` entries that do not
            exist.

    Returns:
        The parsed operation set, including where it was resolved from.

    Raises:
        ConfigError: If the reference cannot be resolved or the file
            content fails validation.
    """
    resolution = resolve_operation_set_path(value, context=context, base_dir=base_dir)
    data = _load_toml(resolution.path)
    _check_unknown_keys(
        data, _VALID_TOP_LEVEL_KEYS, section="top level", reference=value
    )
    schema_version = _normalize_schema_version(
        data.get("schema_version"), reference=value
    )
    kind = _normalize_kind(data.get("kind"), reference=value)

    # Each section is parsed only when its table is present in the file.
    install_section = (
        _parse_install_section(data["install"], reference=value)
        if "install" in data
        else None
    )
    update_section = (
        _parse_update_section(data["update"], reference=value)
        if "update" in data
        else None
    )
    test_section = (
        _parse_test_section(
            data["test"],
            reference=value,
            # Test file entries are resolved relative to the set file.
            set_file_dir=resolution.path.parent,
            allow_missing_test_files=allow_missing_test_files,
        )
        if "test" in data
        else None
    )
    _validate_kind_sections(
        kind,
        reference=value,
        install_section=install_section,
        update_section=update_section,
        test_section=test_section,
    )
    return OperationSet(
        path=resolution.path,
        requested_value=value,
        kind=kind,
        schema_version=schema_version,
        name=_string_or_none(data.get("name"), key="name"),
        description=_string_or_none(data.get("description"), key="description"),
        metadata=_mapping_section(data.get("metadata"), key="metadata"),
        source=_mapping_section(data.get("source"), key="source"),
        install=install_section,
        update=update_section,
        test=test_section,
        resolution_source=resolution.source,
    )
def find_missing_operation_set_addons(
    operation_set: OperationSet,
    *,
    addons_path: str,
    module_manager_cls: type | None = None,
) -> list[str]:
    """Return the addons of the set that the module manager cannot locate.

    Args:
        operation_set: The set whose addon names are checked.
        addons_path: Passed to the module manager constructor.
        module_manager_cls: Manager class to instantiate; defaults to
            ``ModuleManager`` from the sibling ``module_manager`` module.

    Returns:
        Missing addon names, each once, in order of first appearance.
    """
    if module_manager_cls is None:
        from .module_manager import ModuleManager

        module_manager_cls = ModuleManager
    manager = module_manager_cls(addons_path)

    missing: list[str] = []
    checked: set[str] = set()
    for _label, addons in _operation_set_section_addons(operation_set):
        for addon in addons:
            # An addon can appear in several roles; look it up only once.
            if addon in checked:
                continue
            checked.add(addon)
            if not manager.find_module_path(addon):
                missing.append(addon)
    return missing
def validate_operation_set_addons(
    operation_set: OperationSet,
    *,
    addons_path: str,
    module_manager_cls: type | None = None,
) -> None:
    """Validate that all addon names in the set exist in the addons path.

    Args:
        operation_set: The set whose addon names are checked.
        addons_path: Passed to the module manager constructor.
        module_manager_cls: Optional manager class forwarded to
            ``find_missing_operation_set_addons``.

    Raises:
        ConfigError: If at least one addon cannot be located; the message
            lists the missing names.
    """
    missing = find_missing_operation_set_addons(
        operation_set,
        addons_path=addons_path,
        module_manager_cls=module_manager_cls,
    )
    if missing:
        raise ConfigError(
            "Operation set contains unknown addon(s): "
            f"{', '.join(missing)}. Check addons_path or the set file."
        )
def inspect_operation_set(
    operation_set: OperationSet,
    *,
    addons_path: str | None = None,
    module_manager_cls: type | None = None,
) -> dict[str, Any]:
    """Describe an operation set as a plain dictionary.

    Args:
        operation_set: The set to describe.
        addons_path: When truthy, addon names are checked against it;
            otherwise the status is reported as ``not_checked``.
        module_manager_cls: Optional manager class forwarded to
            ``find_missing_operation_set_addons``.

    Returns:
        A dictionary with identification fields, addon counts and roles,
        missing addons with their check status, per-file test file details
        and the serialized ``metadata`` / ``source`` tables.
    """
    missing_addons: list[str] = []
    validation_status = "not_checked"
    if addons_path:
        missing_addons = find_missing_operation_set_addons(
            operation_set,
            addons_path=addons_path,
            module_manager_cls=module_manager_cls,
        )
        validation_status = "ok" if not missing_addons else "missing_addons"

    test_files: list[dict[str, Any]] = []
    if operation_set.test is not None:
        inputs = operation_set.test.test_file_inputs
        for index, resolved_path in enumerate(operation_set.test.test_files):
            # Fall back to the resolved path when no raw input is recorded
            # at this position.
            input_value = inputs[index] if index < len(inputs) else str(resolved_path)
            test_files.append(
                {
                    "input": input_value,
                    "path": str(resolved_path),
                    "exists": resolved_path.exists(),
                }
            )

    return {
        "reference": operation_set.requested_value,
        "path": str(operation_set.path),
        "resolution_source": operation_set.resolution_source,
        "schema_version": operation_set.schema_version,
        "kind": operation_set.kind,
        "name": operation_set.name,
        "description": operation_set.description,
        "addon_count": _operation_set_addon_count(operation_set),
        "addons_by_role": _addon_roles(operation_set),
        "missing_addons": missing_addons,
        "addons_path_status": validation_status,
        "test_files": test_files,
        "metadata": _serialize_mapping(operation_set.metadata),
        "source": _serialize_mapping(operation_set.source),
    }
def build_operation_set_document(
    *,
    kind: OperationSetKind,
    addons: Sequence[str],
    name: str | None = None,
    description: str | None = None,
    metadata: Mapping[str, Any] | None = None,
    source: Mapping[str, Any] | None = None,
    options: Mapping[str, Any] | None = None,
) -> dict[str, Any]:
    """Build the plain-dict document for a new operation set.

    Args:
        kind: Operation kind; also the name of the section that is written.
        addons: Addon names; falsy entries are dropped.
        name: Optional set name, written only when truthy.
        description: Optional description, written only when truthy.
        metadata: Optional ``metadata`` table, written only when truthy.
        source: Optional ``source`` table, written only when truthy.
        options: Extra section keys; they are applied last and therefore
            override the generated ones.

    Returns:
        A dictionary with ``schema_version``, ``kind``, the optional
        top-level entries and one section named after ``kind``.
    """
    document: dict[str, Any] = {
        "schema_version": _DEFAULT_SCHEMA_VERSION,
        "kind": kind,
    }
    if name:
        document["name"] = name
    if description:
        document["description"] = description
    if metadata:
        document["metadata"] = _serialize_mapping(metadata)
    if source:
        document["source"] = _serialize_mapping(source)

    addon_list = [addon for addon in addons if addon]
    section: dict[str, Any] = {}
    if kind in ("install", "update"):
        section["addons"] = addon_list
    else:
        # Test sets install the addons and select one "/<addon>" tag each.
        section["install"] = addon_list
        section["test_tags"] = [f"/{addon}" for addon in addon_list]
    if options:
        section.update(_serialize_mapping(options))
    document[kind] = section
    return document
def write_operation_set(
    reference: str,
    *,
    kind: OperationSetKind,
    addons: Sequence[str],
    context: OperationSetLocationContext | None = None,
    base_dir: Path | None = None,
    overwrite: bool = False,
    name: str | None = None,
    description: str | None = None,
    metadata: Mapping[str, Any] | None = None,
    source: Mapping[str, Any] | None = None,
    options: Mapping[str, Any] | None = None,
) -> OperationSetWriteResult:
    """Write a new operation set file and report what was written.

    Args:
        reference: Target reference (a path or a bare set name).
        kind: Operation kind of the set.
        addons: Addon names; falsy entries are ignored.
        context: Location context forwarded to write-path resolution.
        base_dir: Working directory forwarded to write-path resolution.
        overwrite: Allow replacing an existing file.
        name: Optional set name.
        description: Optional set description.
        metadata: Optional ``metadata`` table.
        source: Optional ``source`` table.
        options: Extra keys merged into the kind section.

    Returns:
        The written path, the reference, the kind, the number of truthy
        addons, and whether the target existed just before writing.

    Raises:
        ConfigError: If the target path is rejected by
            ``resolve_operation_set_write_path`` or the document cannot be
            rendered as TOML.
    """
    resolution = resolve_operation_set_write_path(
        reference,
        context=context,
        base_dir=base_dir,
        overwrite=overwrite,
    )
    document = build_operation_set_document(
        kind=kind,
        addons=addons,
        name=name,
        description=description,
        metadata=metadata,
        source=source,
        options=options,
    )
    target = resolution.path
    target.parent.mkdir(parents=True, exist_ok=True)
    # Recorded before writing so it reflects the pre-existing state.
    overwritten = target.exists()
    target.write_text(_dump_toml(document), encoding="utf-8")
    return OperationSetWriteResult(
        path=target,
        reference=reference,
        kind=kind,
        addon_count=sum(1 for addon in addons if addon),
        overwritten=overwritten,
    )
def list_operation_sets(
    *,
    context: OperationSetLocationContext,
) -> list[OperationSetResolution]:
    """List the operation set files found in the known sets directories.

    Directories are scanned in the order active config,
    ``<cwd>/.oduit/sets``, global config. Within a directory, entries are
    visited in sorted order; only files with no suffix or a ``.toml``
    suffix are reported, and a file reachable through several directories
    is reported once.

    Returns:
        One resolution per file; the reference is the file stem for
        ``.toml`` files and the file name otherwise.
    """
    candidate_dirs: list[tuple[Path, str]] = []
    active_dir = _active_config_sets_dir(context)
    if active_dir is not None:
        candidate_dirs.append((active_dir, "active_config"))
    candidate_dirs.append((context.cwd / ".oduit" / "sets", "local_project"))
    if context.global_config_dir is not None:
        candidate_dirs.append((context.global_config_dir / "sets", "global_config"))

    results: list[OperationSetResolution] = []
    seen_paths: set[Path] = set()
    for directory, source in _unique_paths(candidate_dirs):
        if not directory.is_dir():
            continue
        for candidate in sorted(directory.iterdir()):
            if not candidate.is_file() or candidate.suffix not in {"", ".toml"}:
                continue
            resolved = candidate.resolve()
            if resolved in seen_paths:
                continue
            seen_paths.add(resolved)
            reference = (
                candidate.stem if candidate.suffix == ".toml" else candidate.name
            )
            results.append(
                OperationSetResolution(
                    reference=reference,
                    path=resolved,
                    source=source,
                    attempted=(resolved,),
                )
            )
    return results
def build_operation_set_result(
    operation_set: OperationSet,
    *,
    results: list[dict[str, Any]],
    failures: list[dict[str, Any]],
    started_at: float,
) -> dict[str, Any]:
    """Build the aggregate result for an operation set execution.

    Args:
        operation_set: The executed set.
        results: Per-operation result dictionaries.
        failures: Recorded failure dictionaries.
        started_at: Start timestamp on the ``time.time()`` clock.

    Returns:
        A dictionary with the overall ``success`` flag, set identification,
        the per-operation results and failures, a ``summary`` of counters
        and the ``duration`` rounded to two decimals.
    """
    # The run fails if any failure was recorded or any result is not
    # explicitly marked successful.
    any_failure = len(failures) > 0 or any(
        not result.get("success", False) for result in results
    )
    duration = time.time() - started_at
    kind = operation_set.kind

    # Compute enriched summary fields from results
    total_skipped = sum(len(r.get("skipped_addons", [])) for r in results)
    # Every attempt beyond the first one counts as a retry.
    total_retried = sum(max(0, len(r.get("attempts", [])) - 1) for r in results)
    total_missing = sum(len(r.get("missing_addons", [])) for r in results)

    execution_modes = {r.get("execution_mode", "batch") for r in results}
    if len(execution_modes) > 1:
        execution_mode = "mixed"
    elif execution_modes:
        execution_mode = execution_modes.pop()
    else:
        execution_mode = "batch"

    return {
        "success": not any_failure,
        "operation": f"operation_set_{kind}",
        "set_name": operation_set.name,
        "set_description": operation_set.description,
        "set_path": str(operation_set.path),
        "set_reference": operation_set.requested_value,
        "kind": kind,
        "executed_operations": results,
        "failures": failures,
        "summary": {
            "kind": kind,
            "addon_count": _operation_set_addon_count(operation_set),
            "executed": len(results),
            "failed": len(failures),
            "skipped": total_skipped,
            "retried": total_retried,
            "missing": total_missing,
            "verification_enabled": any(
                r.get("verification", {}).get("enabled", False)
                or r.get("execution_mode") == "one_by_one"
                for r in results
            ),
            "execution_mode": execution_mode,
        },
        "duration": round(duration, 2),
    }
def sanitize_operation_result(
    result: dict[str, Any],
    *,
    include_command: bool,
    include_stdout: bool,
) -> dict[str, Any]:
    """Remove sensitive or verbose fields from a per-operation result.

    ``stderr`` is always removed; ``command`` and ``stdout`` are removed
    unless the matching flag is true.

    Args:
        result: Per-operation result dictionary; it is not modified.
        include_command: Keep the ``command`` entry.
        include_stdout: Keep the ``stdout`` entry.

    Returns:
        A new dictionary without the blocked keys.
    """
    blocked: set[str] = {"stderr"}
    if not include_command:
        blocked.add("command")
    if not include_stdout:
        blocked.add("stdout")
    return {key: value for key, value in result.items() if key not in blocked}