# Copyright (C) 2025 The ODUIT Authors.
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at https://mozilla.org/MPL/2.0/.
import getpass
import os
import re
import select
import signal
import subprocess
import sys
from collections.abc import Generator
from typing import TYPE_CHECKING, Any
from . import output
from .base_process_manager import BaseProcessManager
from .output import print_error, print_info, print_warning
from .utils import build_json_payload
if TYPE_CHECKING:
from .builders import CommandOperation
# Platform detection
IS_WINDOWS = sys.platform == "win32"
IS_UNIX = not IS_WINDOWS
# Platform-specific imports with fallbacks
# PTY support is required for proper interactive shell handling on Unix systems
HAS_PTY = False
pty = None
termios = None
tty = None
if IS_UNIX:
try:
import pty # type: ignore[assignment]
import termios # type: ignore[assignment]
import tty # type: ignore[assignment]
# Verify PTY functionality is actually available
# Some Unix systems may have the modules but lack PTY support
try:
master_fd, slave_fd = pty.openpty() # type: ignore[attr-defined]
os.close(master_fd)
os.close(slave_fd)
HAS_PTY = True
except (OSError, AttributeError):
# PTY creation failed - modules exist but functionality is limited
HAS_PTY = False
except ImportError:
# PTY modules not available on this system
HAS_PTY = False
pty = None
termios = None
tty = None
[docs]
class ProcessManager(BaseProcessManager):
"""Cross-platform process execution manager for Odoo operations.
Provides comprehensive process management functionality including:
- Command execution with output streaming and error handling
- Cross-platform sudo authentication via stdin
- Interactive shell support with pseudo-terminal (PTY) handling
- Real-time output colorization and formatting
- JSON-structured logging for programmatic consumption
- Signal-safe process termination and cleanup
This class handles the complexities of running Odoo commands across
different operating systems while providing consistent interfaces
for both interactive and automated usage scenarios.
Attributes:
_sudo_password: Cached sudo password for repeated operations
Example:
>>> pm = ProcessManager()
>>> result = pm.run_command(['echo', 'hello world'])
>>> print(result['success']) # True
>>> print(result['output']) # 'hello world\\n'
>>> # Interactive shell
>>> pm.run_interactive_shell(['bash'])
"""
_sudo_password: str | None
[docs]
def __init__(self) -> None:
self._sudo_password = None
[docs]
def clear_sudo_password(self) -> None:
"""Clear cached sudo password from memory."""
self._sudo_password = None
def _get_sudo_password(self) -> str:
"""Get sudo password from cache, env var, or interactive prompt."""
if self._sudo_password is not None:
return self._sudo_password
env_password = os.environ.get("SUDO_PASSWORD")
if env_password:
self._sudo_password = env_password
return self._sudo_password
self._sudo_password = getpass.getpass("Sudo password: ")
return self._sudo_password
@staticmethod
def _write_process_stdin(process: Any, stdin_data: str | None) -> None:
"""Write optional stdin data to process and close stdin."""
if stdin_data is None or process.stdin is None:
return
process.stdin.write(stdin_data)
process.stdin.flush()
process.stdin.close()
[docs]
def run_operation(
self,
command_operation: "CommandOperation",
verbose: bool = False,
suppress_output: bool = False,
) -> dict[str, Any]:
"""Execute a CommandOperation directly.
For regular ProcessManager, this builds the command from the operation
and executes it normally with enhanced result processing.
Args:
command_operation: Structured command operation with metadata
verbose: Enable verbose output
suppress_output: Suppress output to console
Returns:
Dict containing execution results
"""
from .operation_result import OperationResult
if verbose and not suppress_output:
print_info(f"Executing {command_operation.operation_type} operation")
# Create OperationResult from CommandOperation
result_builder = OperationResult.from_operation(command_operation)
try:
# Execute the command using the regular process manager
process_result = self.run_command(
command_operation.command,
verbose=verbose,
suppress_output=suppress_output,
)
# Use the enhanced result processing
if process_result:
# Get output for parsing
output = process_result.get("output", "")
return_code = process_result.get("return_code")
if return_code is None:
return_code = 0 if process_result.get("success", False) else 1
# Set basic result info
result_builder.set_success(
process_result.get("success", False),
return_code,
).set_output(
process_result.get("stdout", output),
process_result.get("stderr", ""),
)
# Add warnings if captured
if "warnings" in process_result and process_result["warnings"]:
result_builder.set_custom_data(warnings=process_result["warnings"])
# Apply automatic parsing based on operation metadata
result_builder.process_with_parsers(output)
if "error" in process_result:
result_builder.set_error(process_result["error"])
else:
result_builder.set_error("Operation execution failed", "ExecutionError")
except Exception as e:
result_builder.set_error(
f"Failed to execute operation: {str(e)}", "OperationError"
)
return result_builder.finalize()
def _spawn_process_with_optional_sudo(
self, cmd: list[str]
) -> tuple[Any, str | None]:
"""Spawn subprocess handling sudo -S via stdin.
Returns (process, stdin_data).
"""
is_sudo_command = bool(cmd) and cmd[0] == "sudo" and "-S" in cmd
if is_sudo_command:
sudo_password = self._get_sudo_password()
process = self._create_subprocess(cmd, stdin_pipe=True)
return process, f"{sudo_password}\n"
# Non-sudo command
process = self._create_subprocess(cmd)
return process, None
def _get_process_kwargs(self) -> dict[str, Any]:
"""Get platform-appropriate process creation kwargs"""
if IS_WINDOWS:
return {"creationflags": 0x00000200} # CREATE_NEW_PROCESS_GROUP
else:
return {"preexec_fn": os.setsid}
def _create_subprocess(
self,
cmd: list[str],
env: dict[str, str] | None = None,
stdin_pipe: bool = False,
) -> Any:
"""Create subprocess with platform-appropriate settings"""
kwargs = self._get_process_kwargs()
if env:
kwargs["env"] = env
stdin_target = subprocess.PIPE if stdin_pipe else None
return subprocess.Popen(
cmd,
stdin=stdin_target,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
**kwargs,
)
def _handle_line_output(
self, line: str, should_show_line: bool, compact: bool, suppress_output: bool
) -> None:
"""Handle output for a single line"""
if suppress_output:
return
if output._formatter.format_type == "json":
if should_show_line or not compact:
self._parse_and_output_odoo_log(line)
else:
if should_show_line:
print(self._colorize_log_line(line), end="")
def _collect_error_context(
self, process: Any, suppress_output: bool, info_pattern: Any
) -> list[str]:
"""Collect additional lines for error context"""
if IS_WINDOWS:
return []
context_lines = []
remaining = 20
while remaining > 0:
try:
r, _, _ = select.select([process.stdout], [], [], 0.5)
except Exception:
break
if not r:
break
try:
next_line = process.stdout.readline()
except Exception:
break
if not next_line:
break
context_lines.append(next_line)
# In error context, always show lines regardless of compact mode
# But still respect suppress_output mode
if not suppress_output:
if output._formatter.format_type == "json":
self._parse_and_output_odoo_log(next_line)
else:
print(self._colorize_log_line(next_line), end="")
# Stop if we encounter an INFO line (error message is over)
if info_pattern.search(next_line):
break
remaining -= 1
return context_lines
def _terminate_process_on_error(self, process: Any, suppress_output: bool) -> None:
"""Terminate process when error is detected"""
if not suppress_output:
print_error("Failure detected in output. Aborting...")
self._terminate_process_cross_platform(process)
def _terminate_process_cross_platform(self, process: Any) -> None:
"""Terminate process with platform-appropriate method"""
try:
if IS_WINDOWS:
# Windows: Terminate process tree using taskkill
subprocess.run(
["taskkill", "/F", "/T", "/PID", str(process.pid)],
capture_output=True,
)
else:
# Unix: Kill process group
os.killpg(os.getpgid(process.pid), signal.SIGTERM)
except Exception:
# Fallback to simple terminate
process.terminate()
def _parse_and_output_odoo_log(self, line: str) -> None:
"""Parse Odoo log line and output using OutputFormatter."""
import json
from datetime import datetime
# Skip if not in JSON mode
if output._formatter.format_type != "json":
return
# Try to parse as Odoo log line using OutputFormatter
if output._formatter._is_odoo_log_line(line):
parsed_log = output._formatter._parse_odoo_log_line(line)
if parsed_log:
print(json.dumps(build_json_payload("log", parsed_log, success=True)))
return
# Try to parse as coverage report line
if output._formatter._is_coverage_report_line(line):
parsed_coverage = output._formatter._parse_coverage_report_line(line)
if parsed_coverage:
print(
json.dumps(
build_json_payload(
parsed_coverage.get("type", "file_coverage"),
parsed_coverage,
success=True,
)
)
)
return
# For non-matching lines (like other process output), output as structured data
output_data = build_json_payload(
"log",
{
"source": "process",
"level": "info",
"message": line.strip(),
"timestamp": datetime.now().isoformat(),
},
success=True,
)
print(json.dumps(output_data))
def _stream_output_and_maybe_abort(
self,
process: Any,
stop_on_error: bool,
compact: bool = False,
suppress_output: bool = False,
warnings: list[str] | None = None,
) -> list[str]:
"""Stream stdout lines and abort on first failure pattern if requested.
Returns collected output lines.
"""
# Pattern indicating a test failure line in Odoo output
failure_patterns = [
re.compile(r"\\bFAIL:\\s", re.IGNORECASE),
re.compile(r"\\bERROR:\\s", re.IGNORECASE),
]
# Pattern indicating an INFO line (end of error message)
info_pattern = re.compile(r"\\bINFO:\\s", re.IGNORECASE)
# Important warning patterns to capture
warning_patterns = [
re.compile(r"invalid module names, ignored:"),
re.compile(r"module.*: not installable, skipped"),
re.compile(r"Some modules are not loaded"),
]
if not process.stdout:
return []
collected_output = []
for line in process.stdout:
collected_output.append(line)
# Capture important warnings
if warnings is not None:
for pattern in warning_patterns:
if pattern.search(line):
# Extract the warning message
warning_msg = line.strip()
if warning_msg not in warnings:
warnings.append(warning_msg)
# Check if we should show this line in compact mode
should_show_line = True
if compact:
should_show_line = self._should_show_line_in_compact(
line,
)
# Handle output
self._handle_line_output(line, should_show_line, compact, suppress_output)
if stop_on_error and any(p.search(line) for p in failure_patterns):
# Collect error context
context_lines = self._collect_error_context(
process, suppress_output, info_pattern
)
collected_output.extend(context_lines)
# Terminate process
self._terminate_process_on_error(process, suppress_output)
break
return collected_output
[docs]
def run_command(
self,
cmd: list[str],
stop_on_error: bool = False,
compact: bool = False,
verbose: bool = False,
suppress_output: bool = False,
) -> dict[str, Any]:
"""Execute a command with comprehensive output handling and error management.
This method provides a unified interface for running system commands with
proper error handling, output streaming, and optional sudo support.
Args:
cmd: Command to execute as list of strings (e.g., ['ls', '-la'])
stop_on_error: If True, terminate on first error encountered
compact: If True, use compact output formatting
verbose: If True, print the command being executed
suppress_output: If True, suppress all output to console
Returns:
Dict containing execution results with keys:
- success (bool): True if command executed successfully
- return_code (int): Process exit code (0 for success)
- output (str): Combined stdout/stderr output
- command (str): The executed command as string
- error (str): Error message if execution failed
Raises:
KeyboardInterrupt: Re-raised after proper process cleanup
Examples:
>>> pm = ProcessManager()
>>> result = pm.run_command(['echo', 'hello'])
>>> print(result['success']) # True
>>> print(result['output']) # 'hello\\n'
>>> result = pm.run_command(['false']) # Command that fails
>>> print(result['success']) # False
>>> print(result['return_code']) # 1
"""
if verbose and not suppress_output:
print_info(f"Running command: {' '.join(cmd)}")
process = None
stdin_data = None
output_lines = []
warnings: list[str] = []
try:
process, stdin_data = self._spawn_process_with_optional_sudo(cmd)
self._write_process_stdin(process, stdin_data)
output_lines = self._stream_output_and_maybe_abort(
process, stop_on_error, compact, suppress_output, warnings
)
process.wait()
if process.returncode != 0:
output_text = "".join(output_lines)
if not suppress_output:
print_error(f"Command exited with code {process.returncode}")
return {
"success": False,
"return_code": process.returncode,
"output": output_text,
"stdout": output_text,
"stderr": "",
"command": " ".join(cmd),
"warnings": warnings,
}
output_text = "".join(output_lines)
return {
"success": True,
"return_code": 0,
"output": output_text,
"stdout": output_text,
"stderr": "",
"command": " ".join(cmd),
"warnings": warnings,
}
except KeyboardInterrupt:
if not suppress_output:
print_error("Interrupted by user. Terminating subprocess...")
if process:
self._terminate_process_cross_platform(process)
return {
"success": False,
"return_code": 130,
"error": "Interrupted by user",
"output": "".join(output_lines),
"stdout": "".join(output_lines),
"stderr": "",
"command": " ".join(cmd),
"warnings": warnings,
}
except FileNotFoundError as e:
error_msg = f"Command not successful: {cmd[0]} due to {e}"
if not suppress_output:
print_error(error_msg)
return {
"success": False,
"return_code": 127,
"error": error_msg,
"output": "",
"stdout": "",
"stderr": "",
"command": " ".join(cmd),
"warnings": warnings,
}
except Exception as e:
error_msg = f"Error running command: {e}"
if not suppress_output:
print_error(error_msg)
return {
"success": False,
"return_code": 1,
"error": error_msg,
"output": "".join(output_lines),
"stdout": "".join(output_lines),
"stderr": "",
"command": " ".join(cmd),
"warnings": warnings,
}
def _stream_output_yielding(
self,
process: Any,
stop_on_error: bool,
compact: bool = False,
suppress_output: bool = False,
) -> Generator[dict[str, Any], None, None]:
"""Generator version of _stream_output_and_maybe_abort that yields lines
Yields:
dict: For each line: {
'line': str, # Raw line content
'formatted': str, # Colorized/formatted line
'should_show': bool, # Whether line should be shown in compact mode
'is_error': bool, # Whether line matches error patterns
'process_running': bool # Whether process is still active
'is_context': bool # Whether this is error context (optional)
}
"""
# Pattern indicating a test failure line in Odoo output
failure_patterns = [
re.compile(r"\\bFAIL:\\s", re.IGNORECASE),
re.compile(r"\\bERROR:\\s", re.IGNORECASE),
]
# Pattern indicating an INFO line (end of error message)
info_pattern = re.compile(r"\\bINFO:\\s", re.IGNORECASE)
if not process.stdout:
return
for line in process.stdout:
# Check if we should show this line in compact mode
should_show_line = True
if compact:
should_show_line = self._should_show_line_in_compact(
line,
)
is_error = any(p.search(line) for p in failure_patterns)
# Yield line information
yield {
"line": line,
"formatted": self._colorize_log_line(line),
"should_show": should_show_line,
"is_error": is_error,
"process_running": True,
}
# Handle output if not suppress_output
self._handle_line_output(line, should_show_line, compact, suppress_output)
if stop_on_error and is_error:
# Yield error context lines
context_lines = self._collect_error_context(
process, suppress_output, info_pattern
)
for context_line in context_lines:
yield {
"line": context_line,
"formatted": self._colorize_log_line(context_line),
"should_show": True,
"is_error": False,
"process_running": True,
"is_context": True,
}
# Terminate process
self._terminate_process_on_error(process, suppress_output)
break
[docs]
def run_command_yielding(
self,
cmd: list[str],
stop_on_error: bool = False,
compact: bool = False,
verbose: bool = False,
suppress_output: bool = False,
) -> Generator[dict[str, Any], None, None]:
"""Generator version of run_command that yields lines as they arrive
Args:
cmd: Command to execute
stop_on_error: Stop execution on first error pattern
compact: Only show relevant lines (dots, errors, warnings)
verbose: Print command before execution
suppress_output: Don't print lines to console (only yield them)
Yields:
For each line:
- dict: {'line': str, 'formatted': str, 'should_show': bool,
'is_error': bool, 'process_running': bool}
Final yield:
- dict: {'result': dict, 'process_running': False}
"""
if verbose and not suppress_output:
print_info(f"Running command: {' '.join(cmd)}")
process = None
stdin_data = None
output_lines = []
try:
process, stdin_data = self._spawn_process_with_optional_sudo(cmd)
self._write_process_stdin(process, stdin_data)
# Yield from the streaming generator
for item in self._stream_output_yielding(
process, stop_on_error, compact, suppress_output
):
line = item.get("line")
if line is not None:
output_lines.append(line)
yield item
process.wait()
if process.returncode != 0:
if not suppress_output:
print_error(f"Command exited with code {process.returncode}")
yield {
"result": {
"success": False,
"return_code": process.returncode,
"output": "".join(output_lines),
"command": " ".join(cmd),
},
"process_running": False,
}
else:
yield {
"result": {
"success": True,
"return_code": 0,
"output": "".join(output_lines),
"command": " ".join(cmd),
},
"process_running": False,
}
except KeyboardInterrupt:
if not suppress_output:
print_error("Interrupted by user. Terminating subprocess...")
if process:
self._terminate_process_cross_platform(process)
yield {
"result": {
"success": False,
"error": "Interrupted by user",
"output": "".join(output_lines),
"command": " ".join(cmd),
},
"process_running": False,
}
except FileNotFoundError:
error_msg = f"Command not found: {cmd[0]}"
if not suppress_output:
print_error(error_msg)
yield {
"result": {
"success": False,
"error": error_msg,
"command": " ".join(cmd),
},
"process_running": False,
}
except Exception as e:
error_msg = f"Error running command: {e}"
if not suppress_output:
print_error(error_msg)
yield {
"result": {
"success": False,
"error": error_msg,
"output": "".join(output_lines),
"command": " ".join(cmd),
},
"process_running": False,
}
[docs]
def run_shell_command(
self,
cmd: list[str] | str,
verbose: bool = False,
capture_output: bool = False,
allow_shell: bool = False,
input_data: str | None = None,
) -> dict[str, Any]:
"""Run a shell command that may receive piped input
Args:
cmd: Either a list of command arguments or a string to be evaluated by shell
verbose: Print command before execution
capture_output: Capture stdout/stderr instead of inheriting
allow_shell: Allow shell string evaluation when cmd is a string
input_data: Optional data to send to stdin
"""
# Determine if cmd is a string (shell evaluation) or list (direct execution)
use_shell = isinstance(cmd, str)
command_text = cmd if use_shell else " ".join(cmd)
def _build_result(
*,
success: bool,
return_code: int,
stdout: str = "",
stderr: str = "",
error: str | None = None,
) -> dict[str, Any]:
result: dict[str, Any] = {
"success": success,
"return_code": return_code,
"stdout": stdout,
"stderr": stderr,
"output": stdout + stderr,
"command": command_text,
}
if error is not None:
result["error"] = error
return result
if use_shell and not allow_shell:
error_msg = "String commands require allow_shell=True"
print_error(error_msg)
return _build_result(success=False, return_code=1, error=error_msg)
if (
not use_shell
and isinstance(cmd, list)
and cmd
and cmd[0] == "sudo"
and "-S" in cmd
and input_data is None
):
input_data = f"{self._get_sudo_password()}\n"
stdin_target = subprocess.PIPE if input_data is not None else None
if verbose:
if use_shell:
print_info(f"Running shell command: {cmd}")
else:
print_info(f"Running shell command: {' '.join(cmd)}")
process = None
try:
if capture_output:
# For JSON format, capture output instead of inheriting
process = subprocess.Popen(
cmd,
shell=use_shell,
stdin=stdin_target,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
**self._get_process_kwargs(),
)
# Wait for completion and capture output
stdout, stderr = process.communicate(input=input_data)
return_code = process.returncode
result = {
"success": return_code == 0,
"return_code": return_code,
"stdout": stdout,
"stderr": stderr,
"output": stdout + stderr,
"command": command_text,
}
if return_code != 0:
result["error"] = f"Shell command exited with code {return_code}"
return result
else:
# For shell commands, we want stdin to be inherited and
# stdout/stderr to go directly to the terminal
process = subprocess.Popen(
cmd,
shell=use_shell,
stdin=stdin_target,
stdout=None,
stderr=None,
text=True,
**self._get_process_kwargs(),
)
# Wait for the process to complete
if input_data is not None:
process.communicate(input=input_data)
return_code = process.returncode
else:
return_code = process.wait()
if return_code != 0:
print_error(f"Shell command exited with code {return_code}")
return _build_result(
success=False,
return_code=return_code,
error=f"Shell command exited with code {return_code}",
)
return _build_result(success=True, return_code=0)
except KeyboardInterrupt:
print_error("Interrupted by user. Terminating subprocess...")
if process:
self._terminate_process_cross_platform(process)
return {
"success": False,
"return_code": 130,
"error": "Interrupted by user",
"stdout": "",
"stderr": "",
"output": "",
"command": command_text,
}
except FileNotFoundError:
if use_shell:
error_msg = f"Shell command failed: {cmd}"
else:
error_msg = f"Command not found: {cmd[0]}"
print_error(error_msg)
return _build_result(success=False, return_code=127, error=error_msg)
except Exception as e:
error_msg = f"Error running shell command: {e}"
print_error(error_msg)
return _build_result(success=False, return_code=1, error=error_msg)
[docs]
@staticmethod
def run_interactive_shell(cmd: list[str]) -> int:
"""Run an interactive shell command with proper cross-platform handling"""
print_info(f"Running interactive shell: {' '.join(cmd)}")
if IS_WINDOWS:
# Windows: Simple subprocess without PTY
try:
process = subprocess.Popen(
cmd,
stdin=sys.stdin,
stdout=sys.stdout,
stderr=sys.stderr,
creationflags=0x00000200, # CREATE_NEW_PROCESS_GROUP
)
return process.wait()
except Exception as e:
print_error(f"Error running interactive shell: {e}")
return 1
elif not HAS_PTY:
# Unix without PTY support - fallback
try:
process = subprocess.Popen(
cmd,
stdin=sys.stdin,
stdout=sys.stdout,
stderr=sys.stderr,
preexec_fn=os.setsid,
)
return process.wait()
except Exception as e:
print_error(f"Error running interactive shell: {e}")
return 1
else:
# Unix with PTY support - original implementation
import pty
import termios
import tty
old_tty = None
try:
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())
except (termios.error, AttributeError) as e:
print_warning(f"Could not configure terminal: {e}")
old_tty = None
# Open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()
try:
# Use os.setsid to make it run in a new process group
p = subprocess.Popen(
cmd,
preexec_fn=os.setsid,
stdin=slave_fd,
stdout=slave_fd,
stderr=slave_fd,
universal_newlines=True,
)
os.close(slave_fd) # Close slave once p is using it
# Interactive loop to handle I/O
while p.poll() is None:
try:
r, w, exc = select.select([sys.stdin, master_fd], [], [], 0.1)
if sys.stdin in r:
try:
data = os.read(sys.stdin.fileno(), 10240)
if data:
os.write(master_fd, data)
except OSError:
# PTY closed or process terminated
break
elif master_fd in r:
try:
data = os.read(master_fd, 10240)
if data:
os.write(sys.stdout.fileno(), data)
else:
# EOF from process
break
except OSError:
# PTY closed or process terminated
break
except OSError:
# Select error or PTY issues
break
# Drain any remaining output
try:
while True:
r, w, exc = select.select([master_fd], [], [], 0.1)
if master_fd in r:
data = os.read(master_fd, 10240)
if data:
os.write(sys.stdout.fileno(), data)
else:
break
else:
break
except OSError:
pass
return p.returncode or 0
finally:
# Restore terminal settings
if old_tty and termios:
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
os.close(master_fd)