# Copyright (C) 2025 The ODUIT Authors.
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at https://mozilla.org/MPL/2.0/.
# mypy: disable-error-code=import-untyped
"""
Odoo code execution for programmatic use.
This module provides a class that allows executing Python code within an Odoo
environment and capturing the results directly, without printing to console.
It's designed for programmatic use where you want to execute Odoo operations
and get the results back as Python objects.
"""
import io
import signal
import sys
import threading
import traceback
from ast import Expr
from typing import Any, cast
from .config_provider import ConfigProvider
from .odoo_embedded_manager import OdooEmbeddedManager
from .output import print_error, print_info
[docs]
class OdooCodeExecutor:
    """Execute Python code within an Odoo environment and capture results.

    This class provides a way to execute arbitrary Python code within an Odoo
    environment and capture the results directly as Python objects, without
    printing to console. It's intended for programmatic use cases where you
    want to query data, perform operations, and get results back.

    Features:
        - Code runs with ``env``, ``odoo``, ``registry``, ``cr``, ``uid`` and
          ``context`` in scope, plus the ``datetime``, ``json``, ``os`` and
          ``sys`` modules
        - Captures the value of a trailing expression, stdout/stderr output
          and exceptions (returned in a result dictionary, not raised)
        - Support for both single expressions and multi-line code blocks
        - Opens a cursor on the target database for every call
        - Changes are rolled back unless ``commit=True`` is passed and the
          code succeeds
        - The public entry points refuse to run unless ``allow_unsafe=True``
          is passed
        - Timeouts are enforced with ``SIGALRM``: execution is refused on
          Windows and when not running on the main thread

    Note:
        ``sys.stdout`` and ``sys.stderr`` are replaced process-wide while
        code runs, so concurrent use from several threads is not supported.

    Example:
        executor = OdooCodeExecutor(config_provider)
        result = executor.execute_code(
            "env['res.partner'].search([], limit=1).name",
            allow_unsafe=True,
        )
        print(f"Partner name: {result['value']}")
    """
[docs]
def __init__(self, config_provider: ConfigProvider):
    """Store the configuration and create the embedded Odoo manager.

    Args:
        config_provider: ConfigProvider instance with Odoo configuration.
            It is kept on the instance and also handed to the
            OdooEmbeddedManager built here.
    """
    self.config_provider = config_provider
    embedded_manager = OdooEmbeddedManager(config_provider)
    self._embedded_manager = embedded_manager
[docs]
def execute_code(
self,
code: str,
database: str | None = None,
commit: bool = False,
timeout: float = 30.0,
allow_unsafe: bool = False,
) -> dict[str, Any]:
"""Execute trusted Python code within an Odoo environment.
Args:
code: Python code to execute (can be expression or statements)
database: Database name to connect to (uses config default if None)
commit: Whether to commit changes (default: False for safety)
timeout: Maximum execution time in seconds
allow_unsafe: Must be True to allow arbitrary code execution
Returns:
Dictionary with execution results:
- success (bool): True if execution succeeded
- value (Any): Return value if code was an expression
- output (str): Any stdout output from the code
- error (str): Error message if execution failed
- traceback (str): Full traceback if an exception occurred
"""
if not allow_unsafe:
return {
"success": False,
"error": (
"Refusing to execute arbitrary code. "
"Pass allow_unsafe=True for trusted code."
),
}
return self._execute_generated_code(
code,
database=database,
commit=commit,
timeout=timeout,
)
def _prepare_execution(
    self, database: str | None = None
) -> tuple[str | None, dict]:
    """Push connection settings into Odoo's config and pick the database.

    Args:
        database: Explicit database name. When None, the provider's
            ``db_name`` option is used instead.

    Returns:
        ``(db_name, {})`` when a database name could be resolved, otherwise
        ``(None, error_dict)`` where ``error_dict`` carries ``success`` set
        to False, an ``error`` message and, for unexpected failures, a
        ``traceback``.
    """
    try:
        from odoo.tools import config

        provider = self.config_provider
        settings = {
            "db_host": provider.get_optional("db_host", "localhost"),
            "db_port": provider.get_optional("db_port", 5432),
            "db_user": provider.get_optional("db_user"),
            "db_password": provider.get_optional("db_password"),
            "addons_path": provider.get_optional("addons_path"),
            "data_dir": provider.get_optional("data_dir"),
            "db_name": provider.get_optional("db_name"),
            "list_db": False,
            "http_enable": False,
        }
        # Options the provider does not define (None) are not written, so
        # whatever Odoo's config already holds for them stays in place.
        for option, setting in settings.items():
            if setting is None:
                continue
            config[option] = setting

        if database is None:
            candidate = provider.get_optional("db_name")
        else:
            candidate = database

        db_name = self._normalize_db_name(candidate)
        if db_name:
            return db_name, {}

        return None, {
            "success": False,
            "error": (
                "No database specified. Use database parameter or set "
                "db_name in config."
            ),
        }
    except ImportError as exc:
        message = f"Odoo not available for code execution: {exc}"
        print_error(message)
        return None, {"success": False, "error": message}
    except Exception as exc:
        message = f"Failed to initialize Odoo for code execution: {exc}"
        print_error(message)
        return None, {
            "success": False,
            "error": message,
            "traceback": traceback.format_exc(),
        }
@staticmethod
def _normalize_db_name(value: Any) -> str | None:
"""Normalize db_name values across Odoo series/config formats."""
if value is None:
return None
if isinstance(value, str):
stripped = value.strip()
return stripped or None
if isinstance(value, list | tuple):
for item in value:
normalized = OdooCodeExecutor._normalize_db_name(item)
if normalized:
return normalized
return None
stripped = str(value).strip()
return stripped or None
def _execute_generated_code(
self,
code: str,
database: str | None = None,
commit: bool = False,
timeout: float = 30.0,
) -> dict[str, Any]:
"""Execute internally generated trusted code without the unsafe opt-in."""
db_name, error = self._prepare_execution(database)
if db_name is None:
return error
return self._execute_with_database(code, db_name, commit, timeout)
def _execute_with_database(
    self, code: str, db_name: str, commit: bool, timeout: float
) -> dict[str, Any]:
    """Open a cursor on ``db_name`` and run ``code`` as the superuser.

    Args:
        code: Python code to execute.
        db_name: Name of the database to open a cursor on.
        commit: Commit when the code succeeds; otherwise roll back.
        timeout: Maximum execution time in seconds.

    Returns:
        The result dictionary from ``_execute_trusted``. If setting up the
        environment raises, a dictionary with ``success`` False, an
        ``error`` message and a ``traceback`` is returned instead.
    """
    try:
        import odoo

        # Tag the current thread with the database name.
        current = cast(Any, threading.current_thread())
        current.dbname = db_name

        registry = self._resolve_registry(odoo, db_name)
        api = self._resolve_api_module(odoo)

        with registry.cursor() as cr:
            uid = odoo.SUPERUSER_ID
            # A bare environment is only needed to look up the user context.
            bootstrap_env = api.Environment(cr, uid, {})
            ctx = bootstrap_env["res.users"].context_get()

            # Names visible to the executed code.
            namespace = {
                "env": api.Environment(cr, uid, ctx),
                "odoo": odoo,
                "registry": registry,
                "cr": cr,
                "uid": uid,
                "context": ctx,
                # Commonly used modules
                "datetime": __import__("datetime"),
                "json": __import__("json"),
                "os": __import__("os"),
                "sys": __import__("sys"),
            }

            outcome = self._execute_trusted(code, namespace, timeout)

            if not (outcome["success"] and commit):
                # Failure, or the caller did not ask to persist anything.
                cr.rollback()
                return outcome

            cr.commit()
            print_info(f"Changes committed to database '{db_name}'")
            return outcome

    except Exception as exc:
        return {
            "success": False,
            "error": f"Database execution failed: {exc}",
            "traceback": traceback.format_exc(),
        }
@staticmethod
def _resolve_registry(odoo_module: Any, db_name: str) -> Any:
"""Return a registry object across Odoo series."""
registry_factory = getattr(odoo_module, "registry", None)
if callable(registry_factory):
return registry_factory(db_name)
from odoo.modules.registry import Registry
return Registry.new(db_name)
@staticmethod
def _resolve_api_module(odoo_module: Any) -> Any:
"""Return the API module across Odoo series."""
api_module = getattr(odoo_module, "api", None)
if api_module is not None:
return api_module
import odoo.api as odoo_api
return odoo_api
def _execute_trusted(
    self, code: str, context: dict[str, Any], timeout: float
) -> dict[str, Any]:
    """Execute trusted code with output capture and enforced timeout.

    The code is parsed once. All statements are executed, and when the
    final statement is a bare expression its value is returned in
    ``value``. The code is executed exactly once: a ``SyntaxError`` raised
    *by* the running code (for example from a nested ``compile``/``eval``)
    is reported like any other exception and never triggers a second run.

    SECURITY: this calls ``exec``/``eval`` on ``code``. It must only ever
    receive trusted input.

    Args:
        code: Python source. Surrounding whitespace is stripped before
            parsing.
        context: Globals dictionary the code runs in. It is mutated by the
            executed code, so names persist across calls that share it.
        timeout: Maximum run time in seconds, enforced with ``SIGALRM``.
            Must be greater than zero.

    Returns:
        Dictionary with the keys:
            - success (bool): True when the code ran without raising
            - value (Any): value of the trailing expression, else None
            - output (str): everything written to stdout
            - error (str): failure message; stderr output is appended as
              ``STDERR: ...`` (or stored as-is when there is no other
              error, which can happen even when ``success`` is True)
            - traceback (str): formatted traceback for raised exceptions

        Execution is refused (``success`` False) on Windows, off the main
        thread, when ``signal.setitimer`` is unavailable, or when
        ``timeout`` is not positive.

    Raises:
        KeyboardInterrupt: propagated to the caller after streams and the
            signal handler have been restored.
    """
    import ast

    source_name = "<odoo-executor>"
    result: dict[str, Any] = {
        "success": False,
        "value": None,
        "output": "",
        "error": "",
        "traceback": "",
    }

    old_stdout = sys.stdout
    old_stderr = sys.stderr
    stdout_capture = io.StringIO()
    stderr_capture = io.StringIO()
    old_handler: Any = None
    timer_enabled = False

    try:
        # Redirect output streams (process-wide) for the duration of the run.
        sys.stdout = stdout_capture
        sys.stderr = stderr_capture

        # The SIGALRM based timeout has hard preconditions; refuse to run
        # rather than execute without a timeout.
        if timeout <= 0:
            refusal = "Timeout must be greater than zero"
        elif sys.platform == "win32":
            refusal = "Timeout-enforced execution is not supported on Windows"
        elif threading.current_thread() is not threading.main_thread():
            refusal = (
                "Timeout-enforced execution requires running on the main thread"
            )
        elif not hasattr(signal, "setitimer") or not hasattr(
            signal, "ITIMER_REAL"
        ):
            refusal = "Timeout enforcement is not available on this platform"
        else:
            refusal = ""
        if refusal:
            result["error"] = refusal
            return result

        def _timeout_handler(signum: int, frame: Any) -> None:
            raise TimeoutError(f"Execution timed out after {timeout} seconds")

        old_handler = signal.getsignal(signal.SIGALRM)
        signal.signal(signal.SIGALRM, _timeout_handler)
        signal.setitimer(signal.ITIMER_REAL, timeout)
        timer_enabled = True

        # Parse separately from execution so that only genuine syntax
        # errors in the submitted source are reported as such.
        try:
            tree = ast.parse(code.strip(), filename=source_name)
        except SyntaxError as e:
            result["error"] = f"Syntax error: {e}"
            result["traceback"] = traceback.format_exc()
            return result

        # Split off a trailing bare expression so its value can be returned.
        final_expr = None
        last_node = tree.body[-1] if tree.body else None
        if isinstance(last_node, ast.Expr):
            tree.body.pop()
            final_expr = last_node

        # Compiling the AST directly keeps the original line numbers in
        # tracebacks.
        if tree.body:
            exec(compile(tree, source_name, "exec"), context)
        if final_expr is not None:
            expression = ast.Expression(body=final_expr.value)
            result["value"] = eval(
                compile(expression, source_name, "eval"), context
            )
        result["success"] = True
    except SystemExit as e:
        # Executed code must not be able to terminate the host process.
        result["error"] = (
            f"Executed code attempted to exit the interpreter: "
            f"SystemExit({e.code!r})"
        )
        result["traceback"] = traceback.format_exc()
    except Exception as e:
        # Includes the TimeoutError raised by the alarm handler.
        result["error"] = str(e)
        result["traceback"] = traceback.format_exc()
    finally:
        # Disarm the timer before restoring the previous handler so a late
        # alarm cannot reach it.
        if timer_enabled:
            signal.setitimer(signal.ITIMER_REAL, 0.0)
        if old_handler is not None:
            signal.signal(signal.SIGALRM, old_handler)

        # Restore original streams and collect what was captured.
        sys.stdout = old_stdout
        sys.stderr = old_stderr
        result["output"] = stdout_capture.getvalue()
        stderr_text = stderr_capture.getvalue()
        if stderr_text:
            if result["error"]:
                result["error"] = (
                    str(result["error"]) + f"\nSTDERR: {stderr_text}"
                )
            else:
                result["error"] = stderr_text

    # Returning after (not inside) ``finally`` lets KeyboardInterrupt
    # propagate instead of being silently discarded.
    return result
def _safe_execute(
    self, code: str, context: dict[str, Any], timeout: float
) -> dict[str, Any]:
    """Backward-compatible alias that forwards to ``_execute_trusted``.

    Args:
        code: Python code to execute.
        context: Globals dictionary the code runs in.
        timeout: Maximum execution time in seconds.

    Returns:
        Whatever ``_execute_trusted`` returns for the same arguments.
    """
    runner = self._execute_trusted
    return runner(code, context, timeout)
[docs]
def execute_multiple(
self,
code_blocks: list[str],
database: str | None = None,
commit: bool = False,
stop_on_error: bool = True,
timeout: float = 30.0,
allow_unsafe: bool = False,
) -> dict[str, Any]:
"""Execute multiple trusted code blocks within the same transaction.
Args:
code_blocks: List of Python code strings to execute
database: Database name to connect to
commit: Whether to commit changes after all blocks succeed
stop_on_error: Whether to stop execution if any block fails
timeout: Maximum execution time per block in seconds
allow_unsafe: Must be True to allow arbitrary code execution
Returns:
Dictionary with execution results:
- success (bool): True if all blocks executed successfully
- results (list): List of individual execution results
- failed_at (int): Index of failed block (if stop_on_error=True)
- error (str): Overall error message
"""
if not allow_unsafe:
return {
"success": False,
"error": (
"Refusing to execute arbitrary code. "
"Pass allow_unsafe=True for trusted code."
),
}
db_name, error = self._prepare_execution(database)
if db_name is None:
return error
return self._execute_multiple_with_database(
code_blocks,
db_name,
commit,
stop_on_error,
timeout,
)
def _execute_multiple_with_database(
    self,
    code_blocks: list[str],
    db_name: str,
    commit: bool,
    stop_on_error: bool,
    timeout: float,
) -> dict[str, Any]:
    """Run every code block on one cursor, sharing a single namespace.

    Args:
        code_blocks: Python code strings, executed in order.
        db_name: Name of the database to open a cursor on.
        commit: Commit only when every executed block succeeded.
        stop_on_error: Stop at the first failing block when True; otherwise
            keep going (``failed_at`` then holds the last failing index).
        timeout: Maximum execution time per block in seconds.

    Returns:
        Dictionary with ``success``, ``results``, ``failed_at``,
        ``total_blocks`` and ``executed_blocks``. If setting up the
        environment raises, a dictionary with ``success`` False, an
        ``error`` message and a ``traceback`` is returned instead.
    """
    try:
        import odoo

        # Tag the current thread with the database name.
        current = cast(Any, threading.current_thread())
        current.dbname = db_name

        registry = self._resolve_registry(odoo, db_name)
        api = self._resolve_api_module(odoo)

        with registry.cursor() as cr:
            uid = odoo.SUPERUSER_ID
            # A bare environment is only needed to look up the user context.
            bootstrap_env = api.Environment(cr, uid, {})
            ctx = bootstrap_env["res.users"].context_get()

            # One namespace for all blocks, so later blocks see names
            # defined by earlier ones.
            namespace = {
                "env": api.Environment(cr, uid, ctx),
                "odoo": odoo,
                "registry": registry,
                "cr": cr,
                "uid": uid,
                "context": ctx,
                "datetime": __import__("datetime"),
                "json": __import__("json"),
                "os": __import__("os"),
                "sys": __import__("sys"),
            }

            outcomes = []
            all_ok = True
            failed_index = None

            for index, snippet in enumerate(code_blocks):
                print_info(f"Executing code block {index + 1}/{len(code_blocks)}")
                outcome = self._execute_trusted(snippet, namespace, timeout)
                outcomes.append(outcome)

                if outcome["success"]:
                    continue

                all_ok = False
                failed_index = index
                if not stop_on_error:
                    print_error(f"Code block {index + 1} failed, continuing")
                    continue
                print_error(f"Code block {index + 1} failed, stopping execution")
                break

            # Persist only a fully successful run that asked for a commit.
            if all_ok and commit:
                cr.commit()
                print_info(f"All changes committed to database '{db_name}'")
            else:
                cr.rollback()
                if not all_ok:
                    print_info("Changes rolled back due to errors")

            return {
                "success": all_ok,
                "results": outcomes,
                "failed_at": failed_index,
                "total_blocks": len(code_blocks),
                "executed_blocks": len(outcomes),
            }

    except Exception as exc:
        return {
            "success": False,
            "error": f"Multiple code execution failed: {exc}",
            "traceback": traceback.format_exc(),
        }