# Copyright (C) 2025 The ODUIT Authors.
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at https://mozilla.org/MPL/2.0/.
import re
from datetime import datetime
from typing import Any
from .schemas import (
COMMON_ENVELOPE_KEYS,
ResultEnvelope,
ResultMeta,
infer_read_only,
infer_safety_level,
)
_SIMPLE_ERROR_CODES = {
"ModuleNotFoundError": "module.not_found",
"DuplicateModuleError": "module.duplicate_name",
"ConfirmationRequired": "mutation.confirmation_required",
"MutationForbidden": "mutation.forbidden",
"QueryError": "runtime.query_failed",
"ConnectionError": "runtime.query_failed",
"TestFailure": "runtime.test_failure",
"ModuleOperationError": "runtime.module_operation_failed",
}
[docs]
def infer_error_code(error_type: str | None, error: str | None) -> str | None:
"""Infer a stable machine-facing error code from the payload error fields."""
if not error_type and not error:
return None
normalized_error = (error or "").lower()
if error_type == "ConfigError":
if "addons_path" in normalized_error:
return "config.addons_path_missing"
if "environment" in normalized_error or "configuration" in normalized_error:
return "config.environment_missing"
return "config.invalid"
if error_type in _SIMPLE_ERROR_CODES:
return _SIMPLE_ERROR_CODES[error_type]
if error_type == "ValidationError":
if "json" in normalized_error:
return "input.invalid_json"
return "input.invalid"
if error_type == "ModuleUninstallError":
if "dependent" in normalized_error:
return "runtime.uninstall_dependency_blocked"
if "not installed" in normalized_error:
return "runtime.uninstall_not_installed"
return "runtime.module_uninstall_failed"
if error_type == "CommandError":
if "json" in normalized_error:
return "input.invalid_json"
if "dependency" in normalized_error:
return "runtime.install_dependency_error"
if "failed test" in normalized_error or "test failure" in normalized_error:
return "runtime.test_failure"
return f"error.{(error_type or 'unknown').lower()}"
[docs]
def build_json_payload(
payload_type: str,
data: dict[str, Any] | None = None,
success: bool | None = None,
include_null_values: bool = False,
*,
flatten_data: bool = True,
flatten_meta_aliases: bool = True,
include_generated_at: bool = True,
) -> dict[str, Any]:
"""Build a versioned JSON payload envelope."""
payload_data = dict(data or {})
operation = payload_data.get("operation")
envelope_success = (
success if success is not None else bool(payload_data.get("success"))
)
warnings = list(payload_data.pop("warnings", []))
remediation = list(payload_data.pop("remediation", []))
errors = list(payload_data.pop("errors", []))
error = payload_data.get("error")
error_type = payload_data.get("error_type")
error_code = payload_data.get("error_code") or infer_error_code(error_type, error)
read_only = payload_data.get("read_only")
safety_level = payload_data.get("safety_level")
meta = ResultMeta(
timestamp=(
payload_data.get("timestamp")
or payload_data.get("generated_at")
or datetime.now().isoformat()
),
duration=payload_data.get("duration"),
)
command_data = {
key: value
for key, value in payload_data.items()
if key not in COMMON_ENVELOPE_KEYS
and key not in {"timestamp", "generated_at", "duration"}
}
if error and not errors:
errors = [
{
"message": error,
"error_type": error_type,
"error_code": error_code,
}
]
return ResultEnvelope(
payload_type=payload_type,
success=envelope_success,
operation=operation,
read_only=(
read_only
if isinstance(read_only, bool)
else infer_read_only(operation, payload_type)
),
safety_level=safety_level or infer_safety_level(operation, payload_type),
warnings=warnings,
errors=errors,
remediation=remediation,
error=error,
error_type=error_type,
error_code=error_code,
data=command_data,
meta=meta,
).to_dict(
include_null_values=include_null_values,
flatten_data=flatten_data,
flatten_meta_aliases=flatten_meta_aliases,
include_generated_at=include_generated_at,
)
[docs]
def output_result_to_json(
output: dict[str, Any],
additional_fields: dict[str, Any] | None = None,
exclude_fields: list[str] | None = None,
include_null_values: bool = False,
result_type: str = "result",
*,
flatten_data: bool = True,
flatten_meta_aliases: bool = True,
include_generated_at: bool = True,
) -> dict[str, Any]:
"""Generate JSON output for the operation result
Args:
additional_fields: Extra fields to include in the output
exclude_fields: Fields to exclude from the output
include_null_values: Whether to include fields with None values
Returns:
Dictionary suitable for JSON output
"""
output = output.copy()
payload_type = str(output.pop("type", result_type))
# Add additional fields if provided
if additional_fields:
output.update(additional_fields)
# Remove excluded fields
if exclude_fields:
for field in exclude_fields:
output.pop(field, None)
# Remove null values if requested (default behavior)
output = build_json_payload(
payload_type=payload_type,
data=output,
success=output.get("success", False),
include_null_values=include_null_values,
flatten_data=flatten_data,
flatten_meta_aliases=flatten_meta_aliases,
include_generated_at=include_generated_at,
)
# Remove empty lists/dicts unless they're meaningful for the operation
meaningful_empty_fields = {
"warnings",
"errors",
"remediation",
"failures",
"impact_set",
"unmet_dependencies",
"failed_modules",
"addons",
"models",
"inherit_models",
"base_declarations",
"source_extensions",
"source_extension_modules",
"source_view_extensions",
"installed_fields",
"installed_extension_fields",
"installed_view_extensions",
"installed_extension_modules",
"primary_views",
"extension_views",
"requested_types",
"view_counts",
"install_order",
"impacted_modules",
"candidates",
"tests",
"test_cases",
"languages",
"globs",
"files",
"missing_modules",
"related_files",
"scanned_python_files",
"nodes",
"edges",
"cycles",
"missing_required_keys",
"values",
"requested_modules",
"checks",
"installed_modules",
"not_installed_modules",
"unknown_modules",
"deprecation_warnings",
"source_addon_candidates",
"runtime_source_modules",
"source_candidates",
"executed_operations",
"skipped_operations",
}
output = {
k: v for k, v in output.items() if v != [] or k in meaningful_empty_fields
}
# Remove empty strings for stdout/stderr unless there was actually output
if output.get("stdout") == "":
output.pop("stdout", None)
if output.get("stderr") == "":
output.pop("stderr", None)
return output
[docs]
def validate_addon_name(addon_name: str) -> bool:
"""Validate addon name follows basic Odoo conventions"""
# Check basic format: lowercase letters, numbers, underscores
if not re.match(r"^[a-z][a-z0-9_]*$", addon_name):
return False
# Check length
if len(addon_name) < 2 or len(addon_name) > 50:
return False
# Check doesn't start with odoo
if addon_name.startswith("odoo"):
return False
return True