Source code for oduit.odoo_code_executor

# Copyright (C) 2025 The ODUIT Authors.
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at https://mozilla.org/MPL/2.0/.

# mypy: disable-error-code=import-untyped

"""
Odoo code execution for programmatic use.

This module provides a class that allows executing Python code within an Odoo
environment and capturing the results directly, without printing to console.
It's designed for programmatic use where you want to execute Odoo operations
and get the results back as Python objects.
"""

import io
import signal
import sys
import threading
import traceback
from ast import Expr
from typing import Any, cast

from .config_provider import ConfigProvider
from .odoo_embedded_manager import OdooEmbeddedManager
from .output import print_error, print_info


[docs] class OdooCodeExecutor: """Execute Python code within an Odoo environment and capture results. This class provides a way to execute arbitrary Python code within an Odoo environment and capture the results directly as Python objects, without printing to console. It's perfect for programmatic use cases where you want to query data, perform operations, and get results back. Features: - Execute code within proper Odoo environment with 'env' variable - Capture return values and exceptions - Support for both single expressions and multi-line code blocks - Automatic database connection and cleanup - Thread-safe execution - Proper transaction handling (read-only by default) Example: executor = OdooCodeExecutor(config_provider) result = executor.execute_code("env['res.partner'].search([],limit=1).name") print(f"Partner name: {result['value']}") """
[docs] def __init__(self, config_provider: ConfigProvider): """Initialize the code executor. Args: config_provider: ConfigProvider instance with Odoo configuration """ self.config_provider = config_provider self._embedded_manager = OdooEmbeddedManager(config_provider)
[docs] def execute_code( self, code: str, database: str | None = None, commit: bool = False, timeout: float = 30.0, allow_unsafe: bool = False, ) -> dict[str, Any]: """Execute trusted Python code within an Odoo environment. Args: code: Python code to execute (can be expression or statements) database: Database name to connect to (uses config default if None) commit: Whether to commit changes (default: False for safety) timeout: Maximum execution time in seconds allow_unsafe: Must be True to allow arbitrary code execution Returns: Dictionary with execution results: - success (bool): True if execution succeeded - value (Any): Return value if code was an expression - output (str): Any stdout output from the code - error (str): Error message if execution failed - traceback (str): Full traceback if an exception occurred """ if not allow_unsafe: return { "success": False, "error": ( "Refusing to execute arbitrary code. " "Pass allow_unsafe=True for trusted code." ), } return self._execute_generated_code( code, database=database, commit=commit, timeout=timeout, )
def _prepare_execution( self, database: str | None = None ) -> tuple[str | None, dict]: """Configure Odoo runtime and resolve the target database.""" try: from odoo.tools import config config_dict = { "db_host": self.config_provider.get_optional("db_host", "localhost"), "db_port": self.config_provider.get_optional("db_port", 5432), "db_user": self.config_provider.get_optional("db_user"), "db_password": self.config_provider.get_optional("db_password"), "addons_path": self.config_provider.get_optional("addons_path"), "data_dir": self.config_provider.get_optional("data_dir"), "db_name": self.config_provider.get_optional("db_name"), "list_db": False, "http_enable": False, } for key, value in config_dict.items(): if value is not None: config[key] = value raw_db_name = ( database if database is not None else self.config_provider.get_optional("db_name") ) db_name = self._normalize_db_name(raw_db_name) if not db_name: return None, { "success": False, "error": ( "No database specified. Use database parameter or set " "db_name in config." ), } return db_name, {} except ImportError as e: error_msg = f"Odoo not available for code execution: {e}" print_error(error_msg) return None, {"success": False, "error": error_msg} except Exception as e: error_msg = f"Failed to initialize Odoo for code execution: {e}" print_error(error_msg) return None, { "success": False, "error": error_msg, "traceback": traceback.format_exc(), } @staticmethod def _normalize_db_name(value: Any) -> str | None: """Normalize db_name values across Odoo series/config formats.""" if value is None: return None if isinstance(value, str): stripped = value.strip() return stripped or None if isinstance(value, list | tuple): for item in value: normalized = OdooCodeExecutor._normalize_db_name(item) if normalized: return normalized return None stripped = str(value).strip() return stripped or None def _execute_generated_code( self, code: str, database: str | None = None, commit: bool = False, timeout: float = 30.0, ) -> dict[str, Any]: """Execute internally generated trusted code without the unsafe opt-in.""" db_name, error = self._prepare_execution(database) if db_name is None: return error return self._execute_with_database(code, db_name, commit, timeout) def _execute_with_database( self, code: str, db_name: str, commit: bool, timeout: float ) -> dict[str, Any]: """Execute code with database connection.""" try: import odoo # Set up threading context cast(Any, threading.current_thread()).dbname = db_name # Get registry and create environment registry = self._resolve_registry(odoo, db_name) odoo_api = self._resolve_api_module(odoo) with registry.cursor() as cr: # Create Odoo environment uid = odoo.SUPERUSER_ID ctx = odoo_api.Environment(cr, uid, {})["res.users"].context_get() env = odoo_api.Environment(cr, uid, ctx) # Set up execution context execution_context = { "env": env, "odoo": odoo, "registry": registry, "cr": cr, "uid": uid, "context": ctx, # Add some commonly used modules "datetime": __import__("datetime"), "json": __import__("json"), "os": __import__("os"), "sys": __import__("sys"), } # Execute the code with timeout and output capture result = self._execute_trusted(code, execution_context, timeout) if result["success"] and commit: cr.commit() print_info(f"Changes committed to database '{db_name}'") else: cr.rollback() return result except Exception as e: error_msg = f"Database execution failed: {e}" return { "success": False, "error": error_msg, "traceback": traceback.format_exc(), } @staticmethod def _resolve_registry(odoo_module: Any, db_name: str) -> Any: """Return a registry object across Odoo series.""" registry_factory = getattr(odoo_module, "registry", None) if callable(registry_factory): return registry_factory(db_name) from odoo.modules.registry import Registry return Registry.new(db_name) @staticmethod def _resolve_api_module(odoo_module: Any) -> Any: """Return the API module across Odoo series.""" api_module = getattr(odoo_module, "api", None) if api_module is not None: return api_module import odoo.api as odoo_api return odoo_api def _execute_trusted( self, code: str, context: dict[str, Any], timeout: float ) -> dict[str, Any]: """Execute trusted code with output capture and enforced timeout.""" import ast # Capture stdout and stderr old_stdout = sys.stdout old_stderr = sys.stderr stdout_capture = io.StringIO() stderr_capture = io.StringIO() result = { "success": False, "value": None, "output": "", "error": "", "traceback": "", } old_handler: Any = None timer_enabled = False try: # Redirect output streams sys.stdout = stdout_capture sys.stderr = stderr_capture if timeout <= 0: result["error"] = "Timeout must be greater than zero" return result if sys.platform == "win32": result["error"] = ( "Timeout-enforced execution is not supported on Windows" ) return result if threading.current_thread() is not threading.main_thread(): result["error"] = ( "Timeout-enforced execution requires running on the main thread" ) return result if not hasattr(signal, "setitimer") or not hasattr(signal, "ITIMER_REAL"): result["error"] = ( "Timeout enforcement is not available on this platform" ) return result def _timeout_handler(signum: int, frame: Any) -> None: raise TimeoutError(f"Execution timed out after {timeout} seconds") old_handler = signal.getsignal(signal.SIGALRM) signal.signal(signal.SIGALRM, _timeout_handler) signal.setitimer(signal.ITIMER_REAL, timeout) timer_enabled = True # Clean the code code_stripped = code.strip() # Try to determine if this ends with an expression using AST try: tree = ast.parse(code_stripped) if tree.body and isinstance(tree.body[-1], ast.Expr): # The last statement is an expression if len(tree.body) > 1: # Execute all but the last statement statements = tree.body[:-1] statements_code = ast.unparse( ast.Module(body=statements, type_ignores=[]) ) exec( compile(statements_code, "<odoo-executor>", "exec"), context ) # Evaluate the last expression expr = tree.body[-1] expr_code = ast.unparse( expr.value ) # Get the expression without ast.Expr wrapper value = eval( compile(expr_code, "<odoo-executor>", "eval"), context ) result["value"] = value result["success"] = True return result else: # Only one statement and it's an expression expr_stmt = tree.body[0] if not isinstance(expr_stmt, Expr): raise TypeError( "Expected final AST node to be an expression" ) expr_code = ast.unparse(expr_stmt.value) value = eval( compile(expr_code, "<odoo-executor>", "eval"), context ) result["value"] = value result["success"] = True return result else: # No expression at the end, execute as statements exec(compile(code, "<odoo-executor>", "exec"), context) result["value"] = None result["success"] = True return result except SyntaxError: # If AST parsing fails, fall back to simple approach pass # Fallback: try as expression first, then as statements try: compiled = compile(code_stripped, "<odoo-executor>", "eval") value = eval(compiled, context) result["value"] = value result["success"] = True except SyntaxError: # Not an expression, compile as statements try: compiled = compile(code, "<odoo-executor>", "exec") exec(compiled, context) result["value"] = None result["success"] = True except SyntaxError as e: result["error"] = f"Syntax error: {e}" result["traceback"] = traceback.format_exc() except TimeoutError as e: result["error"] = str(e) result["traceback"] = traceback.format_exc() except Exception as e: result["error"] = str(e) result["traceback"] = traceback.format_exc() finally: if timer_enabled: signal.setitimer(signal.ITIMER_REAL, 0.0) if old_handler is not None: signal.signal(signal.SIGALRM, old_handler) # Restore original streams and capture output sys.stdout = old_stdout sys.stderr = old_stderr result["output"] = stdout_capture.getvalue() if stderr_capture.getvalue(): if result["error"]: result["error"] = ( str(result["error"]) + f"\nSTDERR: {stderr_capture.getvalue()}" ) else: result["error"] = stderr_capture.getvalue() return result def _safe_execute( self, code: str, context: dict[str, Any], timeout: float ) -> dict[str, Any]: """Backward wrapper for trusted execution path.""" return self._execute_trusted(code, context, timeout)
[docs] def execute_multiple( self, code_blocks: list[str], database: str | None = None, commit: bool = False, stop_on_error: bool = True, timeout: float = 30.0, allow_unsafe: bool = False, ) -> dict[str, Any]: """Execute multiple trusted code blocks within the same transaction. Args: code_blocks: List of Python code strings to execute database: Database name to connect to commit: Whether to commit changes after all blocks succeed stop_on_error: Whether to stop execution if any block fails timeout: Maximum execution time per block in seconds allow_unsafe: Must be True to allow arbitrary code execution Returns: Dictionary with execution results: - success (bool): True if all blocks executed successfully - results (list): List of individual execution results - failed_at (int): Index of failed block (if stop_on_error=True) - error (str): Overall error message """ if not allow_unsafe: return { "success": False, "error": ( "Refusing to execute arbitrary code. " "Pass allow_unsafe=True for trusted code." ), } db_name, error = self._prepare_execution(database) if db_name is None: return error return self._execute_multiple_with_database( code_blocks, db_name, commit, stop_on_error, timeout, )
def _execute_multiple_with_database( self, code_blocks: list[str], db_name: str, commit: bool, stop_on_error: bool, timeout: float, ) -> dict[str, Any]: """Execute multiple code blocks with database connection.""" try: import odoo # Set up threading context cast(Any, threading.current_thread()).dbname = db_name # Get registry and create environment registry = self._resolve_registry(odoo, db_name) odoo_api = self._resolve_api_module(odoo) with registry.cursor() as cr: # Create Odoo environment uid = odoo.SUPERUSER_ID ctx = odoo_api.Environment(cr, uid, {})["res.users"].context_get() env = odoo_api.Environment(cr, uid, ctx) # Set up execution context (shared across all blocks) execution_context = { "env": env, "odoo": odoo, "registry": registry, "cr": cr, "uid": uid, "context": ctx, "datetime": __import__("datetime"), "json": __import__("json"), "os": __import__("os"), "sys": __import__("sys"), } results = [] overall_success = True failed_at = None # Execute each code block for i, code in enumerate(code_blocks): print_info(f"Executing code block {i + 1}/{len(code_blocks)}") result = self._execute_trusted(code, execution_context, timeout) results.append(result) if not result["success"]: overall_success = False failed_at = i if stop_on_error: print_error( f"Code block {i + 1} failed, stopping execution" ) break else: print_error(f"Code block {i + 1} failed, continuing") # Handle transaction if overall_success and commit: cr.commit() print_info(f"All changes committed to database '{db_name}'") else: cr.rollback() if not overall_success: print_info("Changes rolled back due to errors") return { "success": overall_success, "results": results, "failed_at": failed_at, "total_blocks": len(code_blocks), "executed_blocks": len(results), } except Exception as e: error_msg = f"Multiple code execution failed: {e}" return { "success": False, "error": error_msg, "traceback": traceback.format_exc(), }