# Copyright (C) 2025 The ODUIT Authors.
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at https://mozilla.org/MPL/2.0/.
"""Structured runtime inspection helpers built on top of OdooCodeExecutor."""
from __future__ import annotations
import re
from textwrap import dedent
from typing import Any
from .config_provider import ConfigProvider
from .odoo_code_executor import OdooCodeExecutor
from .odoo_query import OdooQuery
from .schemas import (
CONTROLLED_RUNTIME_MUTATION,
SAFE_READ_ONLY,
UNSAFE_ARBITRARY_EXECUTION,
)
[docs]
class OdooInspector:
"""Expose structured Odoo inspection workflows for CLI and Python callers."""
_MODEL_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_.]*$")
_NAME_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
_XMLID_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*\.[A-Za-z_][A-Za-z0-9_.]*$")
[docs]
def __init__(self, config: ConfigProvider | dict[str, Any]):
if isinstance(config, ConfigProvider):
self.config_provider = config
else:
self.config_provider = ConfigProvider(config)
self._query = OdooQuery(self.config_provider)
self._executor = OdooCodeExecutor(self.config_provider)
[docs]
def execute_code(
self,
code: str,
*,
database: str | None = None,
commit: bool = False,
timeout: float = 30.0,
) -> dict[str, Any]:
"""Execute trusted arbitrary Python within Odoo via the embedded runtime."""
if not isinstance(code, str) or not code.strip():
return self._validation_error(
"execute_code",
"code must be a non-empty string",
read_only=False,
safety_level=UNSAFE_ARBITRARY_EXECUTION,
)
database_error = self._validate_database(database)
if database_error:
return self._validation_error(
"execute_code",
database_error,
database=database,
read_only=False,
safety_level=UNSAFE_ARBITRARY_EXECUTION,
)
timeout_value, timeout_error = self._validate_timeout(timeout)
if timeout_error:
return self._validation_error(
"execute_code",
timeout_error,
database=database,
read_only=False,
safety_level=UNSAFE_ARBITRARY_EXECUTION,
)
result = self._executor.execute_code(
code,
database=database,
commit=commit,
timeout=timeout_value,
allow_unsafe=True,
)
return self._finalize_result(
"execute_code",
result,
database=database,
commit=commit,
read_only=False,
safety_level=UNSAFE_ARBITRARY_EXECUTION,
)
[docs]
def inspect_ref(
self,
xmlid: str,
*,
database: str | None = None,
timeout: float = 30.0,
) -> dict[str, Any]:
"""Resolve one XMLID and return stable record metadata."""
xmlid_error = self._validate_xmlid(xmlid)
if xmlid_error:
return self._validation_error("inspect_ref", xmlid_error, xmlid=xmlid)
result = self._execute_generated(
"inspect_ref",
dedent(
f"""
_oduit_xmlid = {xmlid!r}
try:
_oduit_record = env.ref(_oduit_xmlid)
except ValueError:
_oduit_result = {{
"xmlid": _oduit_xmlid,
"exists": False,
"model": None,
"res_id": None,
"display_name": None,
}}
else:
_oduit_result = {{
"xmlid": _oduit_xmlid,
"exists": True,
"model": _oduit_record._name,
"res_id": _oduit_record.id,
"display_name": getattr(
_oduit_record,
"display_name",
str(_oduit_record),
),
}}
_oduit_result
"""
).strip(),
database=database,
timeout=timeout,
xmlid=xmlid,
)
return self._require_exists(
result,
field_name="xmlid",
value=xmlid,
message=f"XMLID {xmlid!r} was not found",
)
[docs]
def inspect_cron(
self,
xmlid: str,
*,
trigger: bool = False,
database: str | None = None,
timeout: float = 30.0,
) -> dict[str, Any]:
"""Inspect one cron job and optionally trigger it."""
xmlid_error = self._validate_xmlid(xmlid)
if xmlid_error:
return self._validation_error(
"inspect_cron",
xmlid_error,
xmlid=xmlid,
read_only=not trigger,
safety_level=(
SAFE_READ_ONLY if not trigger else CONTROLLED_RUNTIME_MUTATION
),
)
result = self._execute_generated(
"inspect_cron",
dedent(
f"""
_oduit_xmlid = {xmlid!r}
_oduit_trigger = {trigger!r}
try:
_oduit_record = env.ref(_oduit_xmlid)
except ValueError:
_oduit_result = {{
"xmlid": _oduit_xmlid,
"exists": False,
"is_cron": False,
"trigger_requested": _oduit_trigger,
"triggered": False,
}}
else:
_oduit_is_cron = _oduit_record._name == "ir.cron"
_oduit_result = {{
"xmlid": _oduit_xmlid,
"exists": True,
"is_cron": _oduit_is_cron,
"model": _oduit_record._name,
"res_id": _oduit_record.id,
"name": getattr(_oduit_record, "name", None),
"active": bool(getattr(_oduit_record, "active", False)),
"interval_number": getattr(
_oduit_record,
"interval_number",
None,
),
"interval_type": getattr(_oduit_record, "interval_type", None),
"numbercall": getattr(_oduit_record, "numbercall", None),
"doall": bool(getattr(_oduit_record, "doall", False)),
"user_id": (
[
getattr(_oduit_record.user_id, "id", None),
getattr(_oduit_record.user_id, "name", None),
]
if getattr(_oduit_record, "user_id", None)
else None
),
"nextcall": (
str(getattr(_oduit_record, "nextcall"))
if getattr(_oduit_record, "nextcall", None) else None
),
"state": getattr(_oduit_record, "state", None),
"code": getattr(_oduit_record, "code", None),
"trigger_requested": _oduit_trigger,
"triggered": False,
}}
if _oduit_is_cron and _oduit_trigger:
_oduit_record.method_direct_trigger()
_oduit_result["triggered"] = True
_oduit_result
"""
).strip(),
database=database,
timeout=timeout,
commit=trigger,
read_only=not trigger,
safety_level=(
SAFE_READ_ONLY if not trigger else CONTROLLED_RUNTIME_MUTATION
),
xmlid=xmlid,
trigger_requested=trigger,
)
result = self._require_exists(
result,
field_name="xmlid",
value=xmlid,
message=f"Cron XMLID {xmlid!r} was not found",
)
if result.get("success") and not result.get("is_cron", False):
result["success"] = False
result["error"] = f"XMLID {xmlid!r} does not reference an ir.cron record"
result["error_type"] = "ValidationError"
return result
[docs]
def inspect_modules(
self,
*,
state: str | None = None,
names_only: bool = False,
database: str | None = None,
timeout: float = 30.0,
) -> dict[str, Any]:
"""List runtime addon inventory from ``ir.module.module``."""
state_filter: list[str] | None = None
if state is not None:
name_error = self._validate_name(state, "state")
if name_error:
return self._validation_error(
"inspect_modules",
name_error,
state=state,
)
state_filter = [state]
result = self._query.query_model(
"ir.module.module",
domain=[["state", "in", state_filter]] if state_filter else [],
fields=["name", "state", "shortdesc", "application", "auto_install"],
limit=500,
database=database,
timeout=timeout,
)
final_result = self._finalize_result(
"inspect_modules",
result,
database=database,
state=state,
names_only=names_only,
)
if not final_result.get("success"):
return final_result
modules = sorted(
[
{
"name": str(record.get("name", "")),
"state": str(record.get("state", "")),
"shortdesc": record.get("shortdesc"),
"application": self._normalize_optional_bool(
record.get("application")
),
"auto_install": self._normalize_optional_bool(
record.get("auto_install")
),
}
for record in final_result.get("records", [])
if record.get("name")
],
key=lambda record: record["name"],
)
final_result["modules"] = modules
final_result["names"] = [record["name"] for record in modules]
final_result["total"] = len(modules)
return final_result
[docs]
def inspect_subtypes(
self,
model: str,
*,
database: str | None = None,
timeout: float = 30.0,
) -> dict[str, Any]:
"""List ``mail.message.subtype`` records for one model."""
model_error = self._validate_model(model)
if model_error:
return self._validation_error("inspect_subtypes", model_error, model=model)
result = self._query.query_model(
"mail.message.subtype",
domain=[["res_model", "=", model]],
fields=[
"name",
"description",
"default",
"internal",
"hidden",
"sequence",
"parent_id",
"relation_field",
"res_model",
],
limit=500,
database=database,
timeout=timeout,
)
final_result = self._finalize_result(
"inspect_subtypes",
result,
database=database,
model=model,
)
if not final_result.get("success"):
return final_result
subtypes = sorted(
final_result.get("records", []),
key=lambda record: (
int(record.get("sequence", 0) or 0),
str(record.get("name", "")),
),
)
final_result["subtypes"] = subtypes
final_result["total"] = len(subtypes)
return final_result
[docs]
def inspect_model(
self,
model: str,
*,
database: str | None = None,
timeout: float = 30.0,
) -> dict[str, Any]:
"""Inspect model registration metadata."""
model_error = self._validate_model(model)
if model_error:
return self._validation_error("inspect_model", model_error, model=model)
result = self._execute_generated(
"inspect_model",
dedent(
f"""
_oduit_model_name = {model!r}
_oduit_model_class = registry.models.get(_oduit_model_name)
if _oduit_model_class is None:
_oduit_result = {{
"model": _oduit_model_name,
"exists": False,
}}
else:
_oduit_model = env[_oduit_model_name]
_oduit_inherits = getattr(_oduit_model, "_inherits", {{}})
_oduit_fields = getattr(_oduit_model, "_fields", {{}})
_oduit_inherit = getattr(_oduit_model, "_inherit", None)
if isinstance(_oduit_inherit, (list, tuple)):
_oduit_inherit_list = list(_oduit_inherit)
elif _oduit_inherit:
_oduit_inherit_list = [_oduit_inherit]
else:
_oduit_inherit_list = []
_oduit_result = {{
"model": _oduit_model_name,
"exists": True,
"registered_name": getattr(
_oduit_model,
"_name",
_oduit_model_name,
),
"table": getattr(_oduit_model, "_table", None),
"transient": bool(getattr(_oduit_model, "_transient", False)),
"abstract": bool(getattr(_oduit_model, "_abstract", False)),
"auto": bool(getattr(_oduit_model, "_auto", True)),
"rec_name": getattr(_oduit_model, "_rec_name", None),
"inherit": _oduit_inherit_list,
"inherits": sorted(_oduit_inherits.keys()),
"field_count": len(_oduit_fields),
"field_names": sorted(_oduit_fields.keys()),
}}
_oduit_result
"""
).strip(),
database=database,
timeout=timeout,
model=model,
)
return self._require_exists(
result,
field_name="model",
value=model,
message=f"Model {model!r} is not registered in the active Odoo runtime",
)
[docs]
def inspect_field(
self,
model: str,
field: str,
*,
with_db: bool = False,
database: str | None = None,
timeout: float = 30.0,
) -> dict[str, Any]:
"""Inspect one ORM field, optionally including DB-level metadata."""
model_error = self._validate_model(model)
if model_error:
return self._validation_error(
"inspect_field",
model_error,
model=model,
field=field,
)
field_error = self._validate_name(field, "field")
if field_error:
return self._validation_error(
"inspect_field",
field_error,
model=model,
field=field,
)
result = self._execute_generated(
"inspect_field",
dedent(
f"""
_oduit_model_name = {model!r}
_oduit_field_name = {field!r}
_oduit_with_db = {with_db!r}
_oduit_model_class = registry.models.get(_oduit_model_name)
if _oduit_model_class is None:
_oduit_result = {{
"model": _oduit_model_name,
"field": _oduit_field_name,
"model_exists": False,
"exists": False,
}}
else:
_oduit_model = env[_oduit_model_name]
_oduit_fields = getattr(_oduit_model, "_fields", {{}})
_oduit_field = _oduit_fields.get(_oduit_field_name)
if _oduit_field is None:
_oduit_result = {{
"model": _oduit_model_name,
"field": _oduit_field_name,
"model_exists": True,
"exists": False,
}}
else:
_oduit_field_type = getattr(_oduit_field, "type", None)
_oduit_field_relation = getattr(_oduit_field, "relation", None)
_oduit_selection = getattr(_oduit_field, "selection", None)
if callable(_oduit_selection):
_oduit_selection = None
_oduit_related = (
list(getattr(_oduit_field, "related", []) or []) or None
)
_oduit_selection_list = (
list(_oduit_selection)
if isinstance(_oduit_selection, (list, tuple))
else None
)
_oduit_model_table = getattr(_oduit_model, "_table", None)
_oduit_result = {{
"model": _oduit_model_name,
"field": _oduit_field_name,
"model_exists": True,
"exists": True,
"field_type": _oduit_field_type,
"relation": _oduit_field_relation,
"comodel_name": getattr(_oduit_field, "comodel_name", None),
"required": bool(getattr(_oduit_field, "required", False)),
"readonly": bool(getattr(_oduit_field, "readonly", False)),
"store": bool(getattr(_oduit_field, "store", False)),
"index": bool(getattr(_oduit_field, "index", False)),
"translate": bool(
getattr(_oduit_field, "translate", False)
),
"company_dependent": bool(
getattr(_oduit_field, "company_dependent", False)
),
"compute": getattr(_oduit_field, "compute", None),
"inverse": getattr(_oduit_field, "inverse", None),
"related": _oduit_related,
"selection": _oduit_selection_list,
"db_table_name": _oduit_model_table,
"db_column_name": None,
"db_column_found": None,
"db_column_type": None,
"db_data_type": None,
"db_nullable": None,
"m2m_relation_table": (
_oduit_field_relation
if _oduit_field_type == "many2many"
else None
),
"m2m_column1": getattr(_oduit_field, "column1", None),
"m2m_column2": getattr(_oduit_field, "column2", None),
"relation_table_exists": None,
}}
if _oduit_with_db and bool(
getattr(_oduit_model, "_auto", True)
):
if _oduit_field_type == "many2many":
if _oduit_field_relation:
cr.execute(
"SELECT EXISTS("
"SELECT 1 FROM information_schema.tables "
"WHERE table_schema = 'public' "
"AND table_name = %s"
")",
(_oduit_field_relation,),
)
_oduit_result["relation_table_exists"] = bool(
(cr.fetchone() or [False])[0]
)
elif bool(getattr(_oduit_field, "store", False)):
_oduit_column_name = (
getattr(_oduit_field, "column", None)
or _oduit_field_name
)
_oduit_result["db_column_name"] = _oduit_column_name
cr.execute(
'''
SELECT data_type, udt_name, is_nullable
FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = %s
AND column_name = %s
''',
(_oduit_model_table, _oduit_column_name),
)
_oduit_column_row = cr.fetchone()
_oduit_result["db_column_found"] = bool(
_oduit_column_row
)
if _oduit_column_row:
_oduit_result["db_data_type"] = _oduit_column_row[0]
_oduit_result["db_column_type"] = (
_oduit_column_row[1]
)
_oduit_result["db_nullable"] = (
_oduit_column_row[2] == "YES"
)
_oduit_result
"""
).strip(),
database=database,
timeout=timeout,
model=model,
field=field,
with_db=with_db,
)
result = self._require_exists(
result,
field_name="model",
value=model,
exists_field="model_exists",
message=f"Model {model!r} is not registered in the active Odoo runtime",
)
if result.get("success") and not result.get("exists", False):
result["success"] = False
result["error"] = f"Field {field!r} was not found on model {model!r}"
result["error_type"] = "NotFoundError"
return result
[docs]
def inspect_recordset(
self,
expression: str,
*,
database: str | None = None,
timeout: float = 30.0,
) -> dict[str, Any]:
"""Execute a trusted recordset expression as an escape hatch."""
result = self.execute_code(
expression,
database=database,
commit=False,
timeout=timeout,
)
result["operation"] = "inspect_recordset"
result["expression"] = expression
return result
[docs]
def describe_table(
self,
table_name: str,
*,
database: str | None = None,
timeout: float = 30.0,
) -> dict[str, Any]:
"""Describe columns for one PostgreSQL table."""
table_error = self._validate_name(table_name, "table_name")
if table_error:
return self._validation_error(
"describe_table",
table_error,
table_name=table_name,
)
result = self._execute_generated(
"describe_table",
dedent(
f"""
_oduit_table_name = {table_name!r}
cr.execute(
'''
SELECT
column_name,
data_type,
udt_name,
is_nullable,
column_default,
ordinal_position
FROM information_schema.columns
WHERE table_schema = 'public' AND table_name = %s
ORDER BY ordinal_position
''',
(_oduit_table_name,),
)
_oduit_rows = cr.fetchall()
_oduit_result = {{
"table_name": _oduit_table_name,
"exists": bool(_oduit_rows),
"columns": [
{{
"column_name": row[0],
"data_type": row[1],
"udt_name": row[2],
"nullable": row[3] == "YES",
"default": row[4],
"ordinal_position": row[5],
}}
for row in _oduit_rows
],
"column_count": len(_oduit_rows),
}}
_oduit_result
"""
).strip(),
database=database,
timeout=timeout,
table_name=table_name,
)
return self._require_exists(
result,
field_name="table_name",
value=table_name,
message=f"Table {table_name!r} was not found",
)
[docs]
def describe_column(
self,
table_name: str,
column_name: str,
*,
database: str | None = None,
timeout: float = 30.0,
) -> dict[str, Any]:
"""Describe one PostgreSQL column."""
table_error = self._validate_name(table_name, "table_name")
if table_error:
return self._validation_error(
"describe_column",
table_error,
table_name=table_name,
column_name=column_name,
)
column_error = self._validate_name(column_name, "column_name")
if column_error:
return self._validation_error(
"describe_column",
column_error,
table_name=table_name,
column_name=column_name,
)
result = self._execute_generated(
"describe_column",
dedent(
f"""
_oduit_table_name = {table_name!r}
_oduit_column_name = {column_name!r}
cr.execute(
'''
SELECT
data_type,
udt_name,
is_nullable,
column_default,
ordinal_position
FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = %s
AND column_name = %s
''',
(_oduit_table_name, _oduit_column_name),
)
_oduit_row = cr.fetchone()
_oduit_result = {{
"table_name": _oduit_table_name,
"column_name": _oduit_column_name,
"exists": bool(_oduit_row),
"column": (
{{
"column_name": _oduit_column_name,
"data_type": _oduit_row[0],
"udt_name": _oduit_row[1],
"nullable": _oduit_row[2] == "YES",
"default": _oduit_row[3],
"ordinal_position": _oduit_row[4],
}}
if _oduit_row else None
),
}}
_oduit_result
"""
).strip(),
database=database,
timeout=timeout,
table_name=table_name,
column_name=column_name,
)
return self._require_exists(
result,
field_name="column_name",
value=column_name,
message=f"Column {column_name!r} was not found on table {table_name!r}",
)
[docs]
def list_constraints(
self,
table_name: str,
*,
database: str | None = None,
timeout: float = 30.0,
) -> dict[str, Any]:
"""List PostgreSQL constraints for one table."""
table_error = self._validate_name(table_name, "table_name")
if table_error:
return self._validation_error(
"list_constraints",
table_error,
table_name=table_name,
)
result = self._execute_generated(
"list_constraints",
dedent(
f"""
_oduit_table_name = {table_name!r}
cr.execute(
"SELECT EXISTS("
"SELECT 1 FROM information_schema.tables "
"WHERE table_schema = 'public' "
"AND table_name = %s"
")",
(_oduit_table_name,),
)
_oduit_exists = bool((cr.fetchone() or [False])[0])
cr.execute(
'''
SELECT c.conname, c.contype, pg_get_constraintdef(c.oid)
FROM pg_catalog.pg_constraint c
JOIN pg_catalog.pg_class t ON t.oid = c.conrelid
JOIN pg_catalog.pg_namespace n ON n.oid = t.relnamespace
WHERE n.nspname = 'public'
AND t.relname = %s
ORDER BY c.conname
''',
(_oduit_table_name,),
)
_oduit_type_names = {{
"p": "primary_key",
"f": "foreign_key",
"u": "unique",
"c": "check",
"x": "exclusion",
}}
_oduit_rows = cr.fetchall()
_oduit_result = {{
"table_name": _oduit_table_name,
"exists": _oduit_exists,
"constraints": [
{{
"name": row[0],
"constraint_type": _oduit_type_names.get(row[1], row[1]),
"definition": row[2],
}}
for row in _oduit_rows
],
"total": len(_oduit_rows),
}}
_oduit_result
"""
).strip(),
database=database,
timeout=timeout,
table_name=table_name,
)
return self._require_exists(
result,
field_name="table_name",
value=table_name,
message=f"Table {table_name!r} was not found",
)
[docs]
def list_tables(
self,
pattern: str | None = None,
*,
database: str | None = None,
timeout: float = 30.0,
) -> dict[str, Any]:
"""List PostgreSQL tables, optionally filtered by an ILIKE pattern."""
if pattern is not None and (
not isinstance(pattern, str) or not pattern.strip()
):
return self._validation_error(
"list_tables",
"pattern must be a non-empty string when provided",
pattern=pattern,
)
result = self._execute_generated(
"list_tables",
dedent(
f"""
_oduit_pattern = {pattern!r}
_oduit_like = f"%{{_oduit_pattern}}%" if _oduit_pattern else None
_oduit_query = [
"SELECT table_name",
"FROM information_schema.tables",
"WHERE table_schema = 'public'",
" AND table_type = 'BASE TABLE'",
]
_oduit_params = []
if _oduit_like is not None:
_oduit_query.append(" AND table_name ILIKE %s")
_oduit_params.append(_oduit_like)
_oduit_query.append("ORDER BY table_name")
cr.execute("\\n".join(_oduit_query), tuple(_oduit_params))
_oduit_rows = cr.fetchall()
{{
"pattern": _oduit_pattern,
"tables": [row[0] for row in _oduit_rows],
"total": len(_oduit_rows),
}}
"""
).strip(),
database=database,
timeout=timeout,
pattern=pattern,
)
return result
[docs]
def inspect_m2m(
self,
model: str,
field: str,
*,
database: str | None = None,
timeout: float = 30.0,
) -> dict[str, Any]:
"""Inspect Many2many relation-table metadata for one ORM field."""
field_result = self.inspect_field(
model,
field,
with_db=True,
database=database,
timeout=timeout,
)
final_result = self._finalize_result(
"inspect_m2m",
field_result,
model=model,
field=field,
database=database,
)
if not final_result.get("success"):
return final_result
if final_result.get("field_type") != "many2many":
final_result["success"] = False
final_result["error"] = (
f"Field {field!r} on model {model!r} is not a many2many field"
)
final_result["error_type"] = "ValidationError"
return final_result
final_result["relation_table"] = final_result.get("m2m_relation_table")
final_result["column1"] = final_result.get("m2m_column1")
final_result["column2"] = final_result.get("m2m_column2")
return final_result
def _execute_generated(
self,
operation: str,
code: str,
*,
database: str | None = None,
timeout: float = 30.0,
commit: bool = False,
read_only: bool = True,
safety_level: str = SAFE_READ_ONLY,
**metadata: Any,
) -> dict[str, Any]:
database_error = self._validate_database(database)
if database_error:
return self._validation_error(
operation,
database_error,
read_only=read_only,
safety_level=safety_level,
**metadata,
)
timeout_value, timeout_error = self._validate_timeout(timeout)
if timeout_error:
return self._validation_error(
operation,
timeout_error,
read_only=read_only,
safety_level=safety_level,
**metadata,
)
result = self._executor._execute_generated_code(
code,
database=database,
commit=commit,
timeout=timeout_value,
)
return self._finalize_result(
operation,
result,
database=database,
read_only=read_only,
safety_level=safety_level,
**metadata,
)
@staticmethod
def _normalize_optional_bool(value: Any) -> bool | None:
if value is None:
return None
if isinstance(value, bool):
return value
if isinstance(value, str):
return value.strip().lower() in {"1", "true", "yes", "on"}
return bool(value)
def _validation_error(
self,
operation: str,
message: str,
*,
read_only: bool = True,
safety_level: str = SAFE_READ_ONLY,
**details: Any,
) -> dict[str, Any]:
result = {
"success": False,
"operation": operation,
"error": message,
"error_type": "ValidationError",
"read_only": read_only,
"safety_level": safety_level,
}
for key, value in details.items():
if value is not None:
result[key] = value
return result
def _finalize_result(
self,
operation: str,
result: dict[str, Any],
*,
read_only: bool = True,
safety_level: str = SAFE_READ_ONLY,
**metadata: Any,
) -> dict[str, Any]:
final_result = result.copy()
final_result["operation"] = operation
final_result["read_only"] = read_only
final_result["safety_level"] = safety_level
for key, value in metadata.items():
if value is not None:
final_result[key] = value
if final_result.get("success") and isinstance(final_result.get("value"), dict):
final_result.update(final_result["value"])
return final_result
@staticmethod
def _require_exists(
result: dict[str, Any],
*,
field_name: str,
value: str,
message: str,
exists_field: str = "exists",
) -> dict[str, Any]:
if result.get("success") and not result.get(exists_field, False):
result["success"] = False
result["error"] = message
result["error_type"] = "NotFoundError"
result[field_name] = value
return result
def _validate_model(self, value: str) -> str | None:
return self._validate_pattern(
value,
pattern=self._MODEL_RE,
field_name="model",
invalid_message="model contains invalid characters",
)
def _validate_name(self, value: str, field_name: str) -> str | None:
return self._validate_pattern(
value,
pattern=self._NAME_RE,
field_name=field_name,
invalid_message=f"{field_name} contains invalid characters",
)
def _validate_xmlid(self, value: str) -> str | None:
return self._validate_pattern(
value,
pattern=self._XMLID_RE,
field_name="xmlid",
invalid_message="xmlid must use the form module.record_name",
)
@staticmethod
def _validate_pattern(
value: str,
*,
pattern: re.Pattern[str],
field_name: str,
invalid_message: str,
) -> str | None:
if not isinstance(value, str) or not value.strip():
return f"{field_name} must be a non-empty string"
if not pattern.match(value):
return invalid_message
return None
@staticmethod
def _validate_database(database: str | None) -> str | None:
if database is None:
return None
if not isinstance(database, str) or not database.strip():
return "database must be a non-empty string when provided"
return None
@staticmethod
def _validate_timeout(timeout: float) -> tuple[float, str | None]:
if isinstance(timeout, bool) or not isinstance(timeout, int | float):
return 0.0, "timeout must be a number"
if timeout <= 0:
return 0.0, "timeout must be greater than zero"
return float(timeout), None
@staticmethod
def _validate_limit(limit: int) -> tuple[int, str | None]:
if isinstance(limit, bool) or not isinstance(limit, int):
return 0, "limit must be an integer"
if limit <= 0:
return 0, "limit must be greater than zero"
if limit > 500:
return 0, "limit must be less than or equal to 500"
return limit, None