# Copyright (C) 2025 The ODUIT Authors.
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at https://mozilla.org/MPL/2.0/.
import os
import sys
from configparser import ConfigParser, SectionProxy
from dataclasses import dataclass
from typing import Any
import yaml
from .config_provider import ConfigProvider
from .mutation_policy import raise_if_legacy_db_risk_level
CANONICAL_CONFIG_SHAPE = "sectioned"
CANONICAL_CONFIG_SHAPE_VERSION = "1.0"
_BINARY_CONFIG_KEYS = frozenset({"python_bin", "odoo_bin", "coverage_bin"})
_HANDLED_ODOO_CONF_OPTION_KEYS = frozenset(
{
"addons_path",
"admin_passwd",
"csv_internal_sep",
"data_dir",
"db_host",
"db_maxconn",
"db_name",
"db_password",
"db_port",
"db_user",
"email_from",
"from_filter",
"gevent_port",
"http_port",
"limit_memory_hard",
"limit_memory_soft",
"limit_request",
"limit_time_cpu",
"limit_time_real",
"limit_time_real_cron",
"list_db",
"log_level",
"logfile",
"max_cron_threads",
"osv_memory_age_limit",
"osv_memory_count_limit",
"pg_path",
"pidfile",
"proxy_mode",
"reportgz",
"server_wide_modules",
"smtp_password",
"smtp_port",
"smtp_server",
"smtp_ssl",
"smtp_user",
"syslog",
"workers",
"xmlrpc_interface",
}
)
[docs]
@dataclass(frozen=True)
class LoadedConfigDetails:
"""Normalized configuration plus shape metadata."""
config: dict[str, Any]
canonical_config: dict[str, Any]
raw_shape: str
normalized_shape: str
shape_version: str
format_type: str
config_path: str | None
deprecation_warnings: tuple[str, ...]
[docs]
@dataclass(frozen=True)
class ImportedOdooConfDetails:
"""Imported Odoo conf plus metadata useful for migration UX."""
config: dict[str, Any]
handled_option_keys: tuple[str, ...]
unknown_option_keys: tuple[str, ...]
odoo_bin_candidates: tuple[str, ...]
[docs]
class ConfigLoader:
"""Handles loading and managing Odoo environment configurations."""
[docs]
def __init__(self, config_dir: str | None = None):
"""Initialize ConfigLoader with optional custom config directory."""
self.config_dir = config_dir or os.path.expanduser("~/.config/oduit")
def _import_toml_libs(self) -> tuple[Any, Any]:
"""Import TOML libraries with fallback handling."""
tomllib = None
tomli_w = None
try:
if sys.version_info >= (3, 11):
tomllib = __import__("tomllib")
else:
tomllib = __import__("tomli")
except ImportError:
pass
try:
tomli_w = __import__("tomli_w")
except ImportError:
pass
return tomllib, tomli_w
def _normalize_sectioned_config(self, raw_config: dict[str, Any]) -> dict[str, Any]:
"""Convert sectioned config format to flat format for backward compatibility.
Supports both:
1. Legacy flat format (direct keys at root level)
2. New sectioned format with 'binaries' and 'odoo_params' sections
Args:
raw_config: Raw configuration dictionary from YAML/TOML
Returns:
Normalized flat configuration dictionary
"""
# Check if config uses new sectioned format
if "binaries" in raw_config or "odoo_params" in raw_config:
# New sectioned format
normalized = {}
# Add binaries section
if "binaries" in raw_config and isinstance(raw_config["binaries"], dict):
normalized.update(raw_config["binaries"])
# Add odoo_params section
if "odoo_params" in raw_config and isinstance(
raw_config["odoo_params"], dict
):
normalized.update(raw_config["odoo_params"])
# Add any other top-level keys (for compatibility)
for key, value in raw_config.items():
if key not in ("binaries", "odoo_params"):
normalized[key] = value
return normalized
else:
# Legacy flat format - return as-is
return raw_config
def _normalize_addons_path(self, config: dict[str, Any]) -> dict[str, Any]:
"""Normalize ``addons_path`` into the canonical comma-separated string."""
normalized = dict(config)
if isinstance(normalized.get("addons_path"), list):
normalized["addons_path"] = ",".join(normalized["addons_path"])
return normalized
def _detect_raw_config_shape(self, raw_config: dict[str, Any]) -> str:
"""Classify the user-facing config shape before normalization."""
has_sections = "binaries" in raw_config or "odoo_params" in raw_config
if not has_sections:
return "legacy_flat"
compatibility_keys = [
key
for key in raw_config
if key not in {"binaries", "odoo_params"} and key in _BINARY_CONFIG_KEYS
]
compatibility_keys.extend(
key
for key in raw_config
if key not in {"binaries", "odoo_params"} and key not in _BINARY_CONFIG_KEYS
)
return "mixed_compat" if compatibility_keys else "sectioned"
def _build_loaded_config_details(
self,
*,
raw_config: dict[str, Any],
format_type: str,
config_path: str | None,
raw_shape: str | None = None,
) -> LoadedConfigDetails:
"""Build normalized config details from a raw config dictionary."""
normalized_config = self._normalize_addons_path(
self._normalize_sectioned_config(raw_config)
)
raise_if_legacy_db_risk_level(normalized_config)
canonical_config = ConfigProvider(normalized_config).to_sectioned_dict()
detected_shape = raw_shape or self._detect_raw_config_shape(raw_config)
deprecation_warnings: list[str] = []
if detected_shape in {"legacy_flat", "mixed_compat"}:
deprecation_warnings.append(
"Legacy flat config keys are deprecated; prefer sectioned TOML "
"with [binaries] and [odoo_params]."
)
return LoadedConfigDetails(
config=normalized_config,
canonical_config=canonical_config,
raw_shape=detected_shape,
normalized_shape=CANONICAL_CONFIG_SHAPE,
shape_version=CANONICAL_CONFIG_SHAPE_VERSION,
format_type=format_type,
config_path=config_path,
deprecation_warnings=tuple(deprecation_warnings),
)
def _load_raw_config_from_path(
self, config_path: str, format_type: str
) -> tuple[dict[str, Any], str]:
"""Load the raw config dictionary and its source shape metadata."""
if format_type == "toml":
tomllib, _ = self._import_toml_libs()
if tomllib is None:
raise ImportError(
"TOML support not available. "
"Install with: pip install tomli tomli-w"
)
with open(config_path, "rb") as f:
raw_config = tomllib.load(f)
if not isinstance(raw_config, dict):
raise ValueError(f"Invalid config format in: {config_path}")
raw_shape = self._detect_raw_config_shape(raw_config)
elif format_type == "conf":
raw_config = self.import_odoo_conf(config_path, sectioned=True)
raw_shape = "odoo_conf_import"
else:
with open(config_path) as f:
raw_config = yaml.safe_load(f)
if not isinstance(raw_config, dict):
raise ValueError(f"Invalid config format in: {config_path}")
raw_shape = self._detect_raw_config_shape(raw_config)
return raw_config, raw_shape
def _get_boolean_value(self, options: dict | SectionProxy, key: str) -> bool:
"""Helper to get boolean value from both dict and SectionProxy."""
if isinstance(options, SectionProxy):
# SectionProxy
result = options.getboolean(key)
return result if result is not None else False
else:
# Regular dict - convert manually
value = options[key]
if isinstance(value, bool):
return value
elif isinstance(value, str):
return value.lower() in ("true", "1", "yes", "on")
else:
return bool(value)
[docs]
def get_config_path(self, env_name: str, format_type: str = "yaml") -> str:
"""Returns full path to environment-specific config."""
# If env_name is already an absolute path to a config file, use it directly
if os.path.isabs(env_name) and os.path.isfile(env_name):
return env_name
# If env_name is a relative path to an existing file, use it directly
if os.path.isfile(env_name):
return os.path.abspath(env_name)
# Default behavior: use config_dir
if format_type == "toml":
return os.path.join(self.config_dir, f"{env_name}.toml")
elif format_type == "conf":
return os.path.join(self.config_dir, f"{env_name}.conf")
return os.path.join(self.config_dir, f"{env_name}.yaml")
def _detect_config_format(self, env_name: str) -> tuple[str, str]:
"""Detect config format and return (path, format_type)."""
# If env_name is already a path to an existing file, detect format
# from extension
if os.path.isfile(env_name):
abs_path = os.path.abspath(env_name)
if env_name.endswith(".toml"):
return abs_path, "toml"
elif env_name.endswith((".yaml", ".yml")):
return abs_path, "yaml"
elif env_name.endswith(".conf"):
return abs_path, "conf"
else:
# Default to YAML for files without clear extension
return abs_path, "yaml"
# Default behavior: check in config_dir
toml_path = self.get_config_path(env_name, "toml")
yaml_path = self.get_config_path(env_name, "yaml")
conf_path = self.get_config_path(env_name, "conf")
if os.path.exists(toml_path):
return toml_path, "toml"
elif os.path.exists(yaml_path):
return yaml_path, "yaml"
elif os.path.exists(conf_path):
return conf_path, "conf"
else:
# Default to YAML for backward compatibility
return yaml_path, "yaml"
[docs]
def has_local_config(self) -> bool:
"""Check if a local .oduit.toml file exists in current directory."""
return os.path.exists(".oduit.toml")
[docs]
def get_local_config_path(self) -> str:
"""Return the absolute path to the local .oduit.toml file."""
return os.path.abspath(".oduit.toml")
[docs]
def resolve_config_path(self, env_name: str) -> tuple[str, str]:
"""Resolve the config path and format for an environment name."""
return self._detect_config_format(env_name)
[docs]
def load_local_config(self) -> dict[str, Any]:
"""Load config from .oduit.toml in current directory."""
return self.load_local_config_details().config
[docs]
def load_local_config_details(self) -> LoadedConfigDetails:
"""Load local config plus canonical normalized metadata."""
config_path = ".oduit.toml"
if not os.path.exists(config_path):
raise FileNotFoundError(
f"Local configuration file not found: {config_path}"
)
raw_config, raw_shape = self._load_raw_config_from_path(config_path, "toml")
return self._build_loaded_config_details(
raw_config=raw_config,
format_type="toml",
config_path=os.path.abspath(config_path),
raw_shape=raw_shape,
)
def _parse_database_config(
self, options: dict | SectionProxy, oduit_config: dict[str, Any]
) -> None:
"""Parse database-related configuration from Odoo conf."""
if "db_name" in options and options["db_name"] != "False":
oduit_config["db_name"] = options["db_name"]
if "db_user" in options:
oduit_config["db_user"] = options["db_user"]
if "db_password" in options:
oduit_config["db_password"] = options["db_password"]
if "db_host" in options:
oduit_config["db_host"] = options["db_host"]
if "db_port" in options:
oduit_config["db_port"] = int(options["db_port"])
if "db_maxconn" in options:
oduit_config["db_maxconn"] = int(options["db_maxconn"])
def _parse_server_config(
self, options: dict | SectionProxy, oduit_config: dict[str, Any]
) -> None:
"""Parse server-related configuration from Odoo conf."""
if "http_port" in options:
oduit_config["http_port"] = int(options["http_port"])
if "gevent_port" in options:
oduit_config["gevent_port"] = int(options["gevent_port"])
if "workers" in options:
oduit_config["workers"] = int(options["workers"])
if "max_cron_threads" in options:
oduit_config["max_cron_threads"] = int(options["max_cron_threads"])
if "proxy_mode" in options:
oduit_config["proxy_mode"] = self._get_boolean_value(options, "proxy_mode")
if "xmlrpc_interface" in options:
oduit_config["xmlrpc_interface"] = options["xmlrpc_interface"]
if "pidfile" in options and options["pidfile"]:
oduit_config["pidfile"] = options["pidfile"]
def _parse_limits_config(
self, options: dict | SectionProxy, oduit_config: dict[str, Any]
) -> None:
"""Parse performance limits configuration from Odoo conf."""
if "limit_memory_hard" in options:
oduit_config["limit_memory_hard"] = int(options["limit_memory_hard"])
if "limit_memory_soft" in options:
oduit_config["limit_memory_soft"] = int(options["limit_memory_soft"])
if "limit_time_cpu" in options:
oduit_config["limit_time_cpu"] = int(options["limit_time_cpu"])
if "limit_time_real" in options:
oduit_config["limit_time_real"] = int(options["limit_time_real"])
if (
"limit_time_real_cron" in options
and options["limit_time_real_cron"] != "-1"
):
oduit_config["limit_time_real_cron"] = int(options["limit_time_real_cron"])
if "limit_request" in options:
oduit_config["limit_request"] = int(options["limit_request"])
def _parse_misc_config(
self, options: dict | SectionProxy, oduit_config: dict[str, Any]
) -> None:
"""Parse miscellaneous configuration from Odoo conf."""
self._parse_paths_config(options, oduit_config)
self._parse_logging_config(options, oduit_config)
self._parse_security_config(options, oduit_config)
self._parse_memory_config(options, oduit_config)
self._parse_email_config(options, oduit_config)
def _parse_paths_config(
self, options: dict | SectionProxy, oduit_config: dict[str, Any]
) -> None:
"""Parse paths configuration from Odoo conf."""
if "addons_path" in options:
oduit_config["addons_path"] = options["addons_path"]
if "data_dir" in options:
oduit_config["data_dir"] = options["data_dir"]
if "pg_path" in options and options["pg_path"]:
oduit_config["pg_path"] = options["pg_path"]
def _parse_logging_config(
self, options: dict | SectionProxy, oduit_config: dict[str, Any]
) -> None:
"""Parse logging configuration from Odoo conf."""
if "logfile" in options:
oduit_config["logfile"] = options["logfile"]
if "log_level" in options:
oduit_config["log_level"] = options["log_level"]
if "syslog" in options:
oduit_config["syslog"] = self._get_boolean_value(options, "syslog")
def _parse_security_config(
self, options: dict | SectionProxy, oduit_config: dict[str, Any]
) -> None:
"""Parse security configuration from Odoo conf."""
if "admin_passwd" in options:
oduit_config["admin_passwd"] = options["admin_passwd"]
if "list_db" in options:
oduit_config["list_db"] = self._get_boolean_value(options, "list_db")
if "server_wide_modules" in options:
oduit_config["server_wide_modules"] = options["server_wide_modules"]
def _parse_memory_config(
self, options: dict | SectionProxy, oduit_config: dict[str, Any]
) -> None:
"""Parse memory and performance configuration from Odoo conf."""
if (
"osv_memory_count_limit" in options
and options["osv_memory_count_limit"] != "False"
):
oduit_config["osv_memory_count_limit"] = int(
options["osv_memory_count_limit"]
)
if (
"osv_memory_age_limit" in options
and options["osv_memory_age_limit"] != "False"
):
oduit_config["osv_memory_age_limit"] = int(options["osv_memory_age_limit"])
if "csv_internal_sep" in options:
oduit_config["csv_internal_sep"] = options["csv_internal_sep"]
if "reportgz" in options:
oduit_config["reportgz"] = self._get_boolean_value(options, "reportgz")
def _parse_email_config(
self, options: dict | SectionProxy, oduit_config: dict[str, Any]
) -> None:
"""Parse email configuration from Odoo conf."""
if "email_from" in options and options["email_from"] != "False":
oduit_config["email_from"] = options["email_from"]
if "from_filter" in options and options["from_filter"] != "False":
oduit_config["from_filter"] = options["from_filter"]
if "smtp_server" in options:
oduit_config["smtp_server"] = options["smtp_server"]
if "smtp_port" in options:
oduit_config["smtp_port"] = int(options["smtp_port"])
if "smtp_ssl" in options:
oduit_config["smtp_ssl"] = self._get_boolean_value(options, "smtp_ssl")
if "smtp_user" in options and options["smtp_user"] != "False":
oduit_config["smtp_user"] = options["smtp_user"]
if "smtp_password" in options and options["smtp_password"] != "False":
oduit_config["smtp_password"] = options["smtp_password"]
[docs]
def inspect_odoo_conf_import(
self, conf_path: str, sectioned: bool = False
) -> ImportedOdooConfDetails:
"""Import Odoo configuration and return migration-friendly metadata."""
if not os.path.exists(conf_path):
raise FileNotFoundError(f"Odoo configuration file not found: {conf_path}")
config = ConfigParser()
config.read(conf_path)
if "options" not in config:
raise ValueError(
f"Invalid Odoo config format - missing [options] section: {conf_path}"
)
options = config["options"]
oduit_config = {}
# Set required oduit defaults that don't exist in Odoo conf files
# Use python3 (more likely to work with virtual envs) instead of python
oduit_config["python_bin"] = "python3"
oduit_config["coverage_bin"] = "coverage"
oduit_config["odoo_bin"] = "odoo" # Default fallback
# Parse config sections first to get addons_path
self._parse_database_config(options, oduit_config)
self._parse_server_config(options, oduit_config)
self._parse_limits_config(options, oduit_config)
self._parse_misc_config(options, oduit_config)
# Try to find odoo-bin based on addons_path
odoo_bin_candidates: tuple[str, ...] = ()
if "addons_path" in oduit_config:
candidates = self._find_odoo_bin_candidates_from_addons_path(
oduit_config["addons_path"]
)
odoo_bin_candidates = tuple(candidates)
if candidates:
oduit_config["odoo_bin"] = candidates[0]
# Add config_file reference to original conf
oduit_config["config_file"] = conf_path
handled_option_keys = tuple(
sorted(set(options.keys()) & _HANDLED_ODOO_CONF_OPTION_KEYS)
)
unknown_option_keys = tuple(
sorted(set(options.keys()) - _HANDLED_ODOO_CONF_OPTION_KEYS)
)
if sectioned:
# Convert to sectioned format using ConfigProvider
provider = ConfigProvider(oduit_config)
oduit_config = provider.to_sectioned_dict()
return ImportedOdooConfDetails(
config=oduit_config,
handled_option_keys=handled_option_keys,
unknown_option_keys=unknown_option_keys,
odoo_bin_candidates=odoo_bin_candidates,
)
[docs]
def import_odoo_conf(
self, conf_path: str, sectioned: bool = False
) -> dict[str, Any]:
"""Import Odoo configuration from .conf file and convert to oduit format.
Args:
conf_path: Path to the Odoo .conf file
sectioned: If True, return configuration in sectioned format
Returns:
Dictionary with oduit-compatible configuration
Raises:
FileNotFoundError: If conf file doesn't exist
ValueError: If conf file format is invalid
"""
return self.inspect_odoo_conf_import(conf_path, sectioned=sectioned).config
def _find_odoo_bin_candidates_from_addons_path(self, addons_path: str) -> list[str]:
"""Find executable odoo-bin candidates relative to addon paths.
Args:
addons_path: Comma-separated list of addon paths
Returns:
Candidate paths in discovery order
"""
if not addons_path:
return []
# Split addons_path and check each directory
addon_dirs = [path.strip() for path in addons_path.split(",")]
candidates: list[str] = []
seen: set[str] = set()
for addon_dir in addon_dirs:
if not addon_dir:
continue
# Convert to absolute path for consistency
addon_dir = os.path.abspath(addon_dir)
parent_dir = os.path.dirname(addon_dir)
for base_dir in (addon_dir, parent_dir):
for relative_path in (
"odoo-bin",
os.path.join("odoo", "odoo-bin"),
os.path.join("..", "odoo-bin"),
os.path.join("..", "odoo", "odoo-bin"),
):
potential_odoo_bin = os.path.normpath(
os.path.join(base_dir, relative_path)
)
if potential_odoo_bin in seen:
continue
if os.path.exists(potential_odoo_bin) and os.access(
potential_odoo_bin, os.X_OK
):
seen.add(potential_odoo_bin)
candidates.append(potential_odoo_bin)
return candidates
def _find_odoo_bin_from_addons_path(self, addons_path: str) -> str | None:
"""Return the first discovered odoo-bin candidate, if any."""
candidates = self._find_odoo_bin_candidates_from_addons_path(addons_path)
if candidates:
return candidates[0]
return None
[docs]
def load_config(self, env_name: str) -> dict[str, Any]:
"""Load config from ~/.config/oduit/<env>.(yaml|toml|conf) or from a
direct file path"""
return self.load_config_details(env_name).config
[docs]
def load_config_details(self, env_name: str) -> LoadedConfigDetails:
"""Load config plus canonical normalized metadata."""
config_path, format_type = self._detect_config_format(env_name)
if not os.path.exists(config_path):
if os.path.isfile(env_name) or os.path.isabs(env_name):
raise FileNotFoundError(f"Configuration file not found: {env_name}")
else:
raise FileNotFoundError(f"Configuration file not found: {config_path}")
raw_config, raw_shape = self._load_raw_config_from_path(
config_path, format_type
)
return self._build_loaded_config_details(
raw_config=raw_config,
format_type=format_type,
config_path=config_path,
raw_shape=raw_shape,
)
[docs]
def get_available_environments(self) -> list[str]:
"""Return a list of available environment names based on config files."""
if not os.path.exists(self.config_dir):
return []
env_files = [
f
for f in os.listdir(self.config_dir)
if f.endswith((".yaml", ".toml", ".conf"))
]
environments = [os.path.splitext(f)[0] for f in env_files]
return sorted(list(set(environments)))
[docs]
def load_demo_config(self, sectioned: bool = False) -> dict[str, Any]:
"""Load a demo configuration for testing without a real Odoo server
Args:
sectioned: If True, return configuration in sectioned format
Returns:
Dictionary with demo configuration including demo_mode=True flag
"""
config = {
"python_bin": "/usr/bin/python3",
"odoo_bin": "/demo/odoo-bin",
"config_file": "/demo/odoo.conf",
"addons_path": "/demo/addons,/demo/enterprise",
"coverage_bin": "/usr/bin/coverage", # Add missing coverage_bin
"db_name": "demo_db",
"db_host": "localhost",
"db_port": 5432,
"db_user": "odoo",
"db_password": "demo",
"without_demo": False,
"log_level": "warn", # Optional log level for testing
"demo_mode": True, # Key flag to enable demo behavior
"available_modules": [
"module_ok", # Always succeeds
"module_error", # Always fails
"module_warning", # Succeeds with warnings
"module_slow", # Takes longer to process
"test_module_pass", # All tests pass
"test_module_one_fail", # One test fails
"test_module_multi_fail", # Multiple tests fail
"sale",
"purchase",
"stock",
"account", # Standard modules
],
}
if sectioned:
# Convert to sectioned format using ConfigProvider
provider = ConfigProvider(config)
return provider.to_sectioned_dict()
return config