# Copyright (C) 2025 The ODUIT Authors.
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at https://mozilla.org/MPL/2.0/.
from collections.abc import Collection, Mapping
from graphlib import TopologicalSorter
from pathlib import Path
from typing import Any
from manifestoo_core.core_addons import is_core_ce_addon, is_core_ee_addon
from manifestoo_core.odoo_series import OdooSeries, detect_from_addon_version
from .addons_path_manager import AddonsPathManager
from .cli_types import SortingChoice
from .manifest import (
InvalidManifestError,
Manifest,
ManifestError,
ManifestNotFoundError,
)
from .manifest_collection import ManifestCollection
[docs]
class ModuleManager:
"""Manages Odoo module operations and dependency resolution."""
[docs]
def __init__(self, addons_path: str):
    """Create a manager bound to a set of addon directories.

    Args:
        addons_path: Comma-separated string of addon directory paths
    """
    self.addons_path = addons_path
    # Module and manifest lookups in this class go through the path manager.
    path_manager = AddonsPathManager(addons_path)
    self._path_manager = path_manager
@staticmethod
def _extract_cycle_path(stack: list[str], repeated_module: str) -> list[str]:
"""Return the ordered cycle path from a traversal stack."""
if repeated_module in stack:
start_index = stack.index(repeated_module)
return stack[start_index:] + [repeated_module]
return stack + [repeated_module]
@classmethod
def _format_cycle_error(cls, stack: list[str], repeated_module: str) -> str:
    """Render the circular dependency error message for a traversal stack.

    Args:
        stack: Modules on the current traversal path, outermost first
        repeated_module: Module that was reached a second time

    Returns:
        Message of the form
        ``"Circular dependency detected: a -> b -> a"``
    """
    rendered_path = " -> ".join(cls._extract_cycle_path(stack, repeated_module))
    return "Circular dependency detected: " + rendered_path
[docs]
@staticmethod
def parse_cycle_error(message: str) -> list[str]:
"""Extract the module cycle path from an error message."""
prefix = "Circular dependency detected: "
if not message.startswith(prefix):
return []
return [item.strip() for item in message[len(prefix) :].split("->")]
def _find_cycle_in_graph(
self, graph: Mapping[str, Collection[str]]
) -> list[str] | None:
"""Return the first detected cycle path in a dependency graph."""
visited: set[str] = set()
stack: list[str] = []
active: set[str] = set()
def _visit(module_name: str) -> list[str] | None:
if module_name in active:
return self._extract_cycle_path(stack, module_name)
if module_name in visited:
return None
visited.add(module_name)
active.add(module_name)
stack.append(module_name)
for dependency in sorted(graph.get(module_name, [])):
if dependency not in graph:
continue
cycle = _visit(dependency)
if cycle:
return cycle
stack.pop()
active.remove(module_name)
return None
for module_name in sorted(graph):
cycle = _visit(module_name)
if cycle:
return cycle
return None
@staticmethod
def _normalize_requested_modules(*module_names: str) -> list[str]:
"""Return requested module names with whitespace and duplicates removed."""
normalized: list[str] = []
for module_name in module_names:
cleaned = module_name.strip()
if cleaned and cleaned not in normalized:
normalized.append(cleaned)
return normalized
@staticmethod
def _build_cycle_edges(cycle_path: list[str]) -> list[dict[str, str]]:
"""Return ordered dependency edges for a closed cycle path."""
if len(cycle_path) < 2:
return []
return [
{"from": cycle_path[index], "to": cycle_path[index + 1]}
for index in range(len(cycle_path) - 1)
]
@staticmethod
def _cycle_modules_from_path(cycle_path: list[str]) -> list[str]:
"""Return unique modules involved in a cycle, excluding the closing node."""
if not cycle_path:
return []
if len(cycle_path) > 1 and cycle_path[0] == cycle_path[-1]:
return cycle_path[:-1]
return cycle_path
def _get_addon_type(self, module_name: str, odoo_series: OdooSeries | None) -> str:
    """Classify a module as core Community, core Enterprise or custom.

    Args:
        module_name: Name of the module to classify
        odoo_series: Odoo series used for the core addon lookup; when it is
            None (or otherwise falsy) no lookup is made

    Returns:
        "Odoo CE (Community)", "Odoo EE (Enterprise)" or "Custom"
    """
    if not odoo_series:
        return "Custom"
    # The Community check runs first, matching the original precedence.
    if is_core_ce_addon(module_name, odoo_series):
        return "Odoo CE (Community)"
    if is_core_ee_addon(module_name, odoo_series):
        return "Odoo EE (Enterprise)"
    return "Custom"
def _describe_modules(self, module_names: list[str]) -> dict[str, dict[str, Any]]:
    """Gather filesystem and manifest details for the given modules.

    Args:
        module_names: Names of the modules to describe

    Returns:
        Mapping of module name to a dict with the keys "module",
        "module_path", "source_dir" (name of the directory containing the
        module, or None when the module path is unknown), "depends",
        "version" and "addon_type". Modules without a manifest get an empty
        "depends" list and a None "version".
    """
    if not module_names:
        return {}

    # Detect the series once; it is the same for every module described.
    series = self.detect_odoo_series()
    details: dict[str, dict[str, Any]] = {}
    for name in module_names:
        location = self.find_module_path(name)
        manifest = self.get_manifest(name)
        if manifest:
            depends, version = list(manifest.codependencies), manifest.version
        else:
            depends, version = [], None
        details[name] = {
            "module": name,
            "module_path": location,
            "source_dir": Path(location).parent.name if location else None,
            "depends": depends,
            "version": version,
            "addon_type": self._get_addon_type(name, series),
        }
    return details
def _collect_dependency_graph(
self, module_name: str
) -> tuple[dict[str, list[str]], list[str] | None]:
"""Collect a dependency graph snapshot and the first detected cycle."""
graph: dict[str, list[str]] = {}
visited: set[str] = set()
visiting: set[str] = set()
stack: list[str] = []
cycle_path: list[str] | None = None
def _visit(mod_name: str) -> None:
nonlocal cycle_path
if mod_name in visiting:
if cycle_path is None:
cycle_path = self._extract_cycle_path(stack, mod_name)
return
if mod_name in visited:
return
visiting.add(mod_name)
stack.append(mod_name)
codependencies = self.get_module_codependencies(mod_name)
graph[mod_name] = codependencies
for dependency in codependencies:
_visit(dependency)
stack.pop()
visiting.remove(mod_name)
visited.add(mod_name)
_visit(module_name)
return graph, cycle_path
def _build_combined_dependency_graph(
self, *module_names: str
) -> tuple[dict[str, list[str]], list[str] | None]:
"""Build a combined graph plus the first directly observed cycle path."""
all_graphs: dict[str, list[str]] = {}
cycle_path: list[str] | None = None
for module_name in self._normalize_requested_modules(*module_names):
graph, observed_cycle = self._collect_dependency_graph(module_name)
all_graphs.update(graph)
if cycle_path is None and observed_cycle:
cycle_path = observed_cycle
return all_graphs, cycle_path
[docs]
def find_module_dirs(self, filter_dir: str | None = None) -> list[str]:
"""Return all module directories with __manifest__.py in configured paths
Args:
filter_dir: Optional directory name to filter results.
Only modules in directories with exact basename match
will be returned.
Returns:
Sorted list of module directory names
"""
return self._path_manager.get_module_names(filter_dir)
[docs]
def find_modules(
    self, filter_dir: str | None = None, skip_invalid: bool = False
) -> ManifestCollection:
    """Collect the manifests of all modules in the configured paths.

    Args:
        filter_dir: Optional directory name to filter results.
            Only modules in directories with exact basename match
            will be returned.
        skip_invalid: If True, skip modules with invalid manifests instead of
            raising an exception

    Returns:
        ManifestCollection containing all found modules

    Raises:
        ManifestError: If a manifest is invalid and skip_invalid is False
    """
    # An empty filter string is treated the same as no filter.
    if not filter_dir:
        return self._path_manager.get_all_collections(skip_invalid)
    return self._path_manager.get_collection_by_filter(filter_dir, skip_invalid)
[docs]
def find_module_path(self, module_name: str) -> str | None:
"""Find the absolute path to a module within addons_path and Odoo base addons
Args:
module_name: Name of the module to find
Returns:
Absolute path to module directory or None if not found
"""
return self._path_manager.find_module_path(module_name)
[docs]
def get_manifest(self, module_name: str) -> Manifest | None:
    """Fetch the manifest object of a module.

    Args:
        module_name: Name of the module to get manifest for

    Returns:
        Manifest instance or None if module not found
    """
    manifest = self._path_manager.get_manifest(module_name)
    return manifest
[docs]
def parse_manifest(self, module_name: str) -> dict[str, Any] | None:
"""Parse and return module's __manifest__.py content.
Args:
module_name: Name of the module to parse manifest for
Returns:
Dictionary containing manifest data or None if not found
Raises:
ValueError: If manifest exists but contains invalid Python syntax
Note:
This method is maintained for backward compatibility.
Consider using get_manifest() for new code.
"""
module_path = self.find_module_path(module_name)
if not module_path:
return None
# Try to create manifest directly to preserve exception behavior
try:
manifest = Manifest(module_path)
return manifest.get_raw_data()
except (ManifestNotFoundError, FileNotFoundError):
return None
except (ManifestError, InvalidManifestError) as e:
# Convert to ValueError for backward compatibility
raise ValueError(str(e)) from e
[docs]
def get_module_codependencies(self, module_name: str) -> list[str]:
    """Return the codependencies declared by a module's manifest.

    Codependencies are modules that this module depends on, meaning changes
    to those modules may impact this module.

    Args:
        module_name: Name of the module to get codependencies for

    Returns:
        The manifest's codependency list, or an empty list when no manifest
        is available for the module
    """
    manifest = self.get_manifest(module_name)
    return manifest.codependencies if manifest else []
[docs]
def get_direct_dependencies(self, *module_names: str) -> list[str]:
    """Get the external modules needed to install a set of modules.

    For every given module the dependency graph is built and each module in
    that graph is collected, unless it is one of the given modules or
    ``base``. Modules whose graph cannot be built (ValueError, e.g. a
    circular dependency) are skipped.

    NOTE(review): the collected graph follows codependencies recursively,
    so the result can contain indirect dependencies as well, despite the
    method name. Confirm which behaviour callers rely on.

    Args:
        *module_names: One or more module names to get dependencies for

    Returns:
        Sorted list of module names outside the provided set

    Example:
        For modules a, b, c where:
        - a depends on ['b', 'c']
        - b depends on ['crm']
        - c depends on ['mail']
        get_direct_dependencies('a', 'b', 'c') returns ['crm', 'mail']
        (assuming crm and mail have no further codependencies)
    """
    if not module_names:
        return []

    excluded = set(module_names) | {"base"}
    external: set[str] = set()
    for name in module_names:
        try:
            graph = self.build_dependency_graph(name)
        except ValueError:
            # Skip modules with errors
            continue
        external.update(member for member in graph if member not in excluded)
    return sorted(external)
[docs]
def build_dependency_graph(self, module_name: str) -> dict[str, list[str]]:
    """Build the dependency graph of a module and all its codependencies.

    Args:
        module_name: Name of the root module to build graph for

    Returns:
        Dictionary mapping each module to its direct codependencies.
        Format: {module_name: [list_of_codependencies]}

    Raises:
        ValueError: If circular dependency is detected
    """
    graph, cycle_path = self._collect_dependency_graph(module_name)
    if not cycle_path:
        return graph
    raise ValueError("Circular dependency detected: " + " -> ".join(cycle_path))
[docs]
def get_dependency_tree(
self, module_name: str, max_depth: int | None = None
) -> dict[str, Any]:
"""Get hierarchical dependency tree for a module.
Args:
module_name: Name of the module to get dependency tree for
max_depth: Maximum depth to traverse (None for unlimited)
Returns:
Nested dictionary representing the dependency tree.
Format: {module_name: {codependency1: {subdeps...}, codependency2: {}}}
Raises:
ValueError: If circular dependency is detected
"""
visited: set[str] = set()
visiting: set[str] = set()
stack: list[str] = []
def _build_tree_recursive(
mod_name: str, current_depth: int = 0
) -> dict[str, Any]:
if mod_name in visiting:
raise ValueError(self._format_cycle_error(stack, mod_name))
if mod_name in visited:
# Already processed module, return empty to avoid infinite recursion
return {}
# Check if we've reached max depth
if max_depth is not None and current_depth >= max_depth:
return {}
visiting.add(mod_name)
stack.append(mod_name)
# Get codependencies for current module
codependencies = self.get_module_codependencies(mod_name)
tree = {}
# Build subtree for each codependency
for dep in codependencies:
tree[dep] = _build_tree_recursive(dep, current_depth + 1)
stack.pop()
visiting.remove(mod_name)
visited.add(mod_name)
return tree
return {module_name: _build_tree_recursive(module_name)}
[docs]
def get_dependencies_at_depth(
self, module_names: list[str], max_depth: int | None = None
) -> list[str]:
"""Get all dependencies up to a specified depth for a list of modules.
Args:
module_names: List of module names to get dependencies for
max_depth: Maximum depth to traverse (None for unlimited)
Returns:
Sorted list of unique dependency names (excluding input modules)
"""
module_set = set(module_names)
all_deps = set()
for module_name in module_names:
dep_tree = self.get_dependency_tree(module_name, max_depth=max_depth)
# Flatten the tree to get all dependencies
def _flatten_tree(tree: dict[str, Any]) -> set[str]:
deps = set()
for key, subtree in tree.items():
if key not in module_set and key != "base":
deps.add(key)
if isinstance(subtree, dict) and subtree:
deps.update(_flatten_tree(subtree))
return deps
all_deps.update(_flatten_tree(dep_tree))
return sorted(all_deps - module_set)
[docs]
def get_install_order(self, *module_names: str) -> list[str]:
    """Get the proper installation order for one or more modules and
    their codependencies.

    Uses topological sorting to determine the correct order for installing
    modules such that all codependencies are installed before the modules
    that depend on them.

    Args:
        *module_names: One or more module names to get install order for

    Returns:
        List of module names in the order they should be installed.
        Codependencies come first, then modules that depend on them.

    Raises:
        ValueError: If no module name is provided, or if a circular
            dependency is detected
    """
    requested_modules = self._normalize_requested_modules(*module_names)
    if not requested_modules:
        raise ValueError("At least one module name must be provided")

    all_graphs, direct_cycle = self._build_combined_dependency_graph(
        *requested_modules
    )
    if direct_cycle:
        raise ValueError(
            f"Circular dependency detected: {' -> '.join(direct_cycle)}"
        )
    if not all_graphs:
        # If no modules were found, return empty list
        return []

    # Kahn's algorithm. in_degree is the number of distinct codependencies
    # still to be installed; dependents is the reverse adjacency map.
    in_degree: dict[str, int] = {}
    dependents: dict[str, list[str]] = {}
    for module, codependencies in all_graphs.items():
        # A dependency listed twice is still one edge; counting it twice
        # would leave the module permanently blocked.
        unique_codependencies = dict.fromkeys(codependencies)
        in_degree[module] = len(unique_codependencies)
        for dependency in unique_codependencies:
            dependents.setdefault(dependency, []).append(module)

    # result doubles as the FIFO work queue: entries before head have been
    # processed, entries from head onwards are ready but not yet processed.
    result = [module for module, degree in in_degree.items() if degree == 0]
    head = 0
    while head < len(result):
        current = result[head]
        head += 1
        for module in dependents.get(current, ()):
            in_degree[module] -= 1
            if in_degree[module] == 0:
                result.append(module)

    # If we haven't processed all nodes, there's a cycle
    if len(result) != len(all_graphs):
        processed = set(result)
        remaining_modules = [
            module for module in all_graphs if module not in processed
        ]
        remaining_set = set(remaining_modules)
        remaining_graph = {
            module: [dep for dep in all_graphs[module] if dep in remaining_set]
            for module in remaining_modules
        }
        cycle = self._find_cycle_in_graph(remaining_graph)
        if not cycle:
            cycle = self._find_cycle_in_graph(all_graphs)
        if cycle:
            raise ValueError(f"Circular dependency detected: {' -> '.join(cycle)}")
        raise ValueError(
            "Topological sort failed - circular dependency suspected among: "
            + ", ".join(sorted(remaining_modules))
        )

    return result
[docs]
def analyze_dependency_cycle(self, *module_names: str) -> dict[str, Any]:
    """Return structured diagnostics for the first detected dependency cycle.

    Args:
        *module_names: One or more module names whose dependency graphs are
            inspected

    Returns:
        Dictionary with the keys "requested_modules", "graph" (the combined
        graph restricted to the cycle's modules), "cycle_path",
        "cycle_length", "cycle_edges", "cycle_modules" and "modules"
        (per-module details). When no cycle is found the cycle path is
        empty.

    Raises:
        ValueError: If no module name is provided
    """
    requested_modules = self._normalize_requested_modules(*module_names)
    if not requested_modules:
        raise ValueError("At least one module name must be provided")

    combined_graph, observed_cycle = self._build_combined_dependency_graph(
        *requested_modules
    )
    # Prefer the cycle seen during traversal, then search the merged graph.
    cycle_path = observed_cycle or self._find_cycle_in_graph(combined_graph) or []
    cycle_modules = self._cycle_modules_from_path(cycle_path)
    members = set(cycle_modules)

    cycle_graph: dict[str, list[str]] = {}
    for module in cycle_modules:
        cycle_graph[module] = [
            dependency
            for dependency in combined_graph.get(module, [])
            if dependency in members
        ]

    return {
        "requested_modules": requested_modules,
        "graph": cycle_graph,
        "cycle_path": cycle_path,
        "cycle_length": len(cycle_modules),
        "cycle_edges": self._build_cycle_edges(cycle_path),
        "cycle_modules": cycle_modules,
        "modules": self._describe_modules(cycle_modules),
    }
[docs]
def find_missing_dependencies(self, module_name: str) -> list[str]:
    """Find codependencies that are not available in the addons_path.

    Args:
        module_name: Name of the module to check codependencies for

    Returns:
        Sorted list of modules in the dependency graph for which no module
        path could be found. Empty list if all of them are available.

    Raises:
        ValueError: If circular dependency is detected during graph traversal
    """
    try:
        # The graph covers the module and everything it depends on.
        graph = self.build_dependency_graph(module_name)
        return sorted(
            member for member in graph if self.find_module_path(member) is None
        )
    except ValueError as error:
        # Any other ValueError: report the root module itself as missing.
        if "Circular dependency" not in str(error):
            return [module_name]
        raise
[docs]
def get_reverse_dependencies(self, target_module: str) -> list[str]:
    """Get all modules that directly or indirectly depend on the target module.

    Every available module has its dependency graph built; a module is
    reported when the target appears in that graph.

    Args:
        target_module: Name of the module to find reverse dependencies for

    Returns:
        Sorted list of module names that depend on the target module.
        Empty list if no modules depend on the target.
    """
    dependents: list[str] = []
    for candidate in self.find_module_dirs():
        try:
            graph = self.build_dependency_graph(candidate)
        except ValueError:
            # Skip modules with circular dependencies or other errors
            continue
        # The target itself is never reported as its own dependent.
        if candidate != target_module and target_module in graph:
            dependents.append(candidate)
    dependents.sort()
    return dependents
[docs]
def detect_odoo_series(self) -> OdooSeries | None:
    """Detect the Odoo series from available modules.

    Goes through the available modules and returns the first series that
    can be derived from a manifest's version string.

    Returns:
        OdooSeries if detected, None if unable to detect
    """
    for module_name in self.find_module_dirs():
        manifest = self.get_manifest(module_name)
        # Modules without a manifest or a version cannot identify the series.
        if not manifest or not manifest.version:
            continue
        series = detect_from_addon_version(manifest.version)
        if series:
            return series
    return None
[docs]
def get_module_version_display(
    self, module_name: str, odoo_series: OdooSeries | None = None
) -> str:
    """Get formatted version string for display in dependency trees.

    Args:
        module_name: Name of the module
        odoo_series: Detected Odoo series (if None, will try to detect)

    Returns:
        Formatted version string:
        - "16.0+ce" for core CE addons
        - "16.0+ee" for core EE addons
        - "1.0.2" for custom addons (actual version)
        - "✘ not installed" for missing addons
    """
    manifest = self.get_manifest(module_name)
    if not manifest:
        return "✘ not installed"

    series = odoo_series if odoo_series is not None else self.detect_odoo_series()
    if series:
        # Core addons are labelled with the series rather than their version.
        if is_core_ce_addon(module_name, series):
            return f"{series.value}+ce"
        if is_core_ee_addon(module_name, series):
            return f"{series.value}+ee"

    return manifest.version
[docs]
def sort_modules(
    self,
    module_names: list[str],
    sorting: SortingChoice | str = SortingChoice.ALPHABETICAL,
) -> list[str]:
    """Sort module names according to the specified sorting method.

    Args:
        module_names: List of module names to sort
        sorting: Sorting method - either 'alphabetical' or 'topological'

    Returns:
        Sorted list of module names

    Raises:
        ValueError: If circular dependency is detected in topological sort
    """
    # Plain strings are converted to the enum before dispatching.
    choice = SortingChoice(sorting) if isinstance(sorting, str) else sorting

    if choice == SortingChoice.ALPHABETICAL:
        return sorted(module_names)
    if choice == SortingChoice.TOPOLOGICAL:
        return self._sort_topological(module_names)
def _sort_topological(self, module_names: list[str]) -> list[str]:
    """Sort modules topologically based on their dependencies.

    Only dependencies between the given modules are considered. Modules
    without a manifest are treated as having no dependencies.

    Args:
        module_names: List of module names to sort

    Returns:
        Topologically sorted list of module names (dependencies first)

    Raises:
        ValueError: If circular dependency is detected
    """
    if not module_names:
        return []

    module_set = set(module_names)
    graph: dict[str, list[str]] = {}
    for module_name in module_names:
        manifest = self.get_manifest(module_name)
        codependencies = manifest.codependencies if manifest else []
        # Keep the manifest's order and drop repeats. An unordered set here
        # would make the resulting order depend on string hash
        # randomisation and so differ between interpreter runs.
        graph[module_name] = list(
            dict.fromkeys(dep for dep in codependencies if dep in module_set)
        )

    try:
        return list(TopologicalSorter(graph).static_order())
    except ValueError as e:
        # graphlib reports cycles with CycleError, a ValueError subclass.
        cycle = self._find_cycle_in_graph(graph)
        if cycle:
            raise ValueError(
                f"Circular dependency detected: {' -> '.join(cycle)}"
            ) from e
        raise ValueError(f"Circular dependency detected: {e}") from e