Source code for topos.mcp.tools.assess

"""
Assessment tool β€” compare current vs. proposed code on the lattice.

This is the main tool for agent refactor loops. When ``filepath`` is provided,
the baseline is evaluated against the cached ``ModuleDependencyGraph`` and the
proposed AST is scored against that same graph (approximating coupling under
the refactor). Anti-gaming guardrail: if scores moved meaningfully while AST
edit distance is near zero, status becomes ``SUSPICIOUS_NO_STRUCTURAL_CHANGE``.
"""

from __future__ import annotations

import difflib

from fastmcp.tools.base import ToolResult

from topos.core.morphism import ProgramMorphism
from topos.evaluation.characteristic_morphism import (
    CharacteristicMorphism,
    ClassificationResult,
)
from topos.evaluation.policies.base import Priority
from topos.functors.probes.cfg.complexity import cyclomatic_complexity
from topos.functors.profunctors.ast.compare import calculate_ast_distance
from topos.graphs.cfg.builder import _collect_callables, build_cfg_from_uast
from topos.graphs.cfg.object import ControlFlowGraph

from ..diagnostics import overlay_for_source
from ..evaluation import (
    classify_code_string,
    classify_morphism,
    gitnexus_warnings,
    load_dep_graph,
    resolve_gitnexus_dir,
)
from ..formatting import to_evaluation_result, to_tool_result
from ..schemas import (
    AgentContract,
    AssessImprovementInput,
    AssessmentResult,
    AssessmentStatus,
    EvaluationResult,
    LatticeElement,
    resolve_priority,
)
from ..security import (
    read_safe_utf8_file,
    resolve_file_root,
    resolve_within_root,
)
from ..server import mcp

_READ_ONLY_ANN = {
    "title": "Topos Refactor Assessment",
    "readOnlyHint": True,
    "destructiveHint": False,
    "idempotentHint": True,
    "openWorldHint": False,
}

# Near-zero edit distance threshold for gaming detection.
_STRUCTURAL_CHANGE_THRESHOLD = 0.02  # normalized distance
_MEANINGFUL_SCORE_DELTA = 3.0  # percentage points

# Cap the function-scoped regression diff so it stays a pinpoint, not a dump.
_REGRESSION_DIFF_MAX_LINES = 40


def _load_baseline(
    params: AssessImprovementInput, priority: Priority
) -> tuple[str, ProgramMorphism, ClassificationResult, bool, list[str], object | None]:
    if params.filepath:
        resolved, err = resolve_within_root(params.filepath)
        if err or resolved is None:
            raise ValueError((err or {}).get("error", "path error"))
        if not resolved.is_file():
            raise ValueError(f"Path is not a file: {resolved}")
        current_src, read_err = read_safe_utf8_file(resolved)
        if read_err or current_src is None:
            raise ValueError((read_err or {}).get("error", "read error"))
        project_root = resolve_file_root()
        gitnexus_dir = resolve_gitnexus_dir(params.gitnexus_dir, project_root)
        dep_graph = load_dep_graph(gitnexus_dir, str(resolved))
        current_morph = ProgramMorphism(source=current_src, language=params.language)
        current_res = classify_morphism(current_morph, priority, dep_graph)
        coupling_for_proposed = dep_graph is not None
        warnings = gitnexus_warnings(
            params.gitnexus_dir,
            project_root,
            gitnexus_dir,
            dep_graph_loaded=dep_graph is not None,
        )
        return (
            current_src,
            current_morph,
            current_res,
            coupling_for_proposed,
            warnings,
            dep_graph,
        )
    elif params.current_code:
        current_src = params.current_code
        current_res = classify_code_string(
            params.current_code, params.language, priority
        )
        current_morph = ProgramMorphism(
            source=params.current_code, language=params.language
        )
        coupling_for_proposed = False
        warnings = [
            "COMPOSABLE not scored β€” current_code mode has no filepath or "
            "ModuleDependencyGraph context."
        ]
        return (
            current_src,
            current_morph,
            current_res,
            coupling_for_proposed,
            warnings,
            None,
        )
    else:
        raise ValueError("Provide either `filepath` or `current_code`.")


def _is_suspicious(
    status: AssessmentStatus, distance: float | None, score_deltas: dict[str, float]
) -> bool:
    if distance is None:
        return False
    if distance >= _STRUCTURAL_CHANGE_THRESHOLD:
        return False
    if status not in (AssessmentStatus.IMPROVEMENT, AssessmentStatus.IMPROVEMENT_SCORE):
        return False
    return any(abs(d) >= _MEANINGFUL_SCORE_DELTA for d in score_deltas.values())


def _determine_lattice_status(
    cur_summary, prop_summary, score_deltas
) -> AssessmentStatus:
    lattice = CharacteristicMorphism().omega
    if cur_summary == prop_summary:
        score_improved = any(d > 0 for d in score_deltas.values())
        score_regressed = any(d < 0 for d in score_deltas.values())
        if score_improved and not score_regressed:
            return AssessmentStatus.IMPROVEMENT_SCORE
        if score_regressed and not score_improved:
            return AssessmentStatus.REGRESSION_SCORE
        return AssessmentStatus.LATERAL_MOVE

    if lattice.leq(cur_summary, prop_summary):
        return AssessmentStatus.IMPROVEMENT
    if lattice.leq(prop_summary, cur_summary):
        return AssessmentStatus.REGRESSION
    return AssessmentStatus.LATERAL_MOVE


def _determine_assessment_status(
    current_res, proposed_res, score_deltas, distance
) -> tuple[AssessmentStatus, str | None]:
    cur_summary = current_res.summary()
    prop_summary = proposed_res.summary()
    status = _determine_lattice_status(cur_summary, prop_summary, score_deltas)

    suspicion = None
    if _is_suspicious(status, distance, score_deltas):
        status = AssessmentStatus.SUSPICIOUS_NO_STRUCTURAL_CHANGE
        suspicion = (
            f"Scores improved (deltas={score_deltas}) but normalized AST edit "
            f"distance is only {distance:.3f} β€” the tree barely changed. Either "
            "the refactor is trivially cosmetic (comment/whitespace shuffle) "
            "or the scoring is oscillating. Re-verify with a concrete "
            "structural change."
        )
    return status, suspicion


def _evaluate_proposed(
    proposed_src: str,
    dep_graph,
    priority: Priority,
    language: str,
) -> tuple[ClassificationResult, ProgramMorphism]:
    proposed_morph = ProgramMorphism(source=proposed_src, language=language)
    proposed_res = classify_morphism(proposed_morph, priority, dep_graph)
    return proposed_res, proposed_morph


def _calculate_deltas(
    current_eval: EvaluationResult,
    proposed_eval: EvaluationResult,
    current_res: ClassificationResult,
    proposed_res: ClassificationResult,
) -> tuple[dict[str, float], dict[str, float]]:
    all_dims = set(current_eval.scores) | set(proposed_eval.scores)
    score_deltas = {
        dim: round(
            proposed_eval.scores.get(dim, 0.0) - current_eval.scores.get(dim, 0.0), 1
        )
        for dim in all_dims
    }

    all_metrics = set(current_res.raw_metrics) | set(proposed_res.raw_metrics)
    metric_deltas = {
        m: round(
            proposed_res.raw_metrics.get(m, 0.0) - current_res.raw_metrics.get(m, 0.0),
            3,
        )
        for m in all_metrics
    }
    return score_deltas, metric_deltas


[docs] @mcp.tool( name="topos_assess_improvement", tags={"assess", "workflow"}, annotations=_READ_ONLY_ANN, ) def topos_assess_improvement(params: AssessImprovementInput) -> ToolResult: """Compare proposed code against the current baseline. **Preferred usage** β€” pass ``filepath`` (code loaded from disk + coupling scored against the cached ``ModuleDependencyGraph``). The proposed code is parsed, but coupling is an approximation: it uses the *current* dep graph for the target file, so inbound edges from other files reflect the pre-refactor state. That's fine for tight iteration loops. **Legacy usage** β€” pass ``current_code`` + ``proposed_code``. Coupling is NOT computed (AST-only). Anti-gaming: when scores move meaningfully but AST edit distance is near zero, status becomes ``SUSPICIOUS_NO_STRUCTURAL_CHANGE`` and ``suspicion_reason`` is populated. """ priority, priority_source = resolve_priority(params.preferences) # ---- load baseline ---- try: ( current_src, current_morph, current_res, coupling_for_proposed, warnings, dep_graph, ) = _load_baseline(params, priority) except ValueError as exc: return _err_assessment(params, str(exc)) proposed_src, proposed_err = _load_proposed_source(params) if proposed_err or proposed_src is None: return _err_assessment( params, proposed_err or "Unable to load proposed source." ) # ---- evaluate proposed & findings ---- proposed_res, proposed_morph = _evaluate_proposed( proposed_src, dep_graph, priority, params.language, ) prefs = params.preferences.to_preferences() if params.preferences else None file_path = None if params.filepath: file_path, _ = resolve_within_root(params.filepath) current_overlay = overlay_for_source( current_src, params.language, current_res, file_path=file_path, allows=params.allow, include_security_findings=params.include_security_findings, ) proposed_overlay = overlay_for_source( proposed_src, params.language, proposed_res, file_path=file_path, allows=params.allow, include_security_findings=params.include_security_findings, ) # Warnings live on the top-level AssessmentResult only; the nested # current/proposed evals would otherwise duplicate the identical list. current_eval = to_evaluation_result( current_res, coupling_available=dep_graph is not None, preferences=prefs, priority_source=priority_source, include_agent_contract=False, **_overlay_kwargs(current_overlay), ) proposed_eval = to_evaluation_result( proposed_res, coupling_available=coupling_for_proposed, preferences=prefs, priority_source=priority_source, include_agent_contract=False, **_overlay_kwargs(proposed_overlay), ) # ---- score & metric deltas ---- score_deltas, metric_deltas = _calculate_deltas( current_eval, proposed_eval, current_res, proposed_res ) # ---- structural distance ---- distance = None similarity = None if current_res.is_parseable and proposed_res.is_parseable: dist = calculate_ast_distance(current_morph.ast, proposed_morph.ast) distance = dist.normalized_distance similarity = 1.0 - dist.normalized_distance # ---- status classification & anti-gaming ---- status, suspicion = _determine_assessment_status( current_res, proposed_res, score_deltas, distance ) # ---- regression pinpoint ---- # On a regression/suspicious verdict, give the agent a function-scoped diff # of the single worst function instead of forcing a full metric-tree diff. regression_diff = None if status in _REGRESSION_STATUSES: regression_diff = _regression_diff(current_src, proposed_src, params.language) model = AssessmentResult( status=status, priority=priority, priority_source=priority_source, current=current_eval, proposed=proposed_eval, score_deltas=score_deltas, metric_deltas=metric_deltas, structural_distance=distance, similarity=similarity, coupling_available_for_proposed=coupling_for_proposed, warnings=warnings, agent_contract=_assessment_contract(status, warnings, proposed_eval), suspicion_reason=suspicion, regression_diff=regression_diff, ) return to_tool_result(model, render_assessment_md(model))
def _err_assessment(params: AssessImprovementInput, msg: str) -> ToolResult: priority, priority_source = resolve_priority(params.preferences) empty = EvaluationResult( is_parseable=False, lattice_element=LatticeElement.SLOP, lattice_symbol="βŠ₯", lattice_description="not evaluated", dimensions={}, scores={}, priority=priority, priority_source=priority_source, guidance="", coupling_available=False, ) model = AssessmentResult( status=AssessmentStatus.LATERAL_MOVE, priority=priority, priority_source=priority_source, current=empty, proposed=empty, score_deltas={}, structural_distance=None, similarity=None, coupling_available_for_proposed=False, agent_contract=AgentContract( blocked_by=["assessment_error"], risk_flags=["assessment_error"], ), error=msg, ) return to_tool_result(model, render_assessment_md(model)) def _load_proposed_source( params: AssessImprovementInput, ) -> tuple[str | None, str | None]: if params.proposed_code is not None: return params.proposed_code, None if params.proposed_filepath is None: return None, "Provide exactly one of `proposed_code` or `proposed_filepath`." source, err = read_safe_utf8_file(params.proposed_filepath) if err: return None, err["error"] return source, None def _overlay_kwargs(overlay): if overlay is None: return {} return { "security_findings": overlay.active_findings, "acknowledged_risks": overlay.acknowledged_risks, "adjusted_verdict": overlay.verdict, } def _assessment_contract( status: AssessmentStatus, warnings: list[str], proposed_eval: EvaluationResult, ) -> AgentContract: risk_flags: list[str] = [] blocked_by: list[str] = [] next_actions: list[str] = [] if warnings: risk_flags.append("warnings") if proposed_eval.grade_capped: risk_flags.append("grade_capped") if proposed_eval.security_findings: risk_flags.append("active_security_findings") if status == AssessmentStatus.SUSPICIOUS_NO_STRUCTURAL_CHANGE: blocked_by.append("suspicious_no_structural_change") risk_flags.append("metric_gaming_risk") next_tool = "topos_inspect_code" next_actions.append("make a real structural change before reassessing") elif status in _REGRESSION_STATUSES: blocked_by.append("regression") risk_flags.append("regression") next_tool = "topos_inspect_code" next_actions.append("discard or revise the proposed change") elif status in (AssessmentStatus.IMPROVEMENT, AssessmentStatus.IMPROVEMENT_SCORE): next_tool = "topos_evaluate_project" next_actions.append("run project rollup and behavior checks before accepting") else: next_tool = "topos_inspect_code" next_actions.append("try a different focused structural change") return AgentContract( next_tool=next_tool, next_actions=next_actions, blocked_by=blocked_by, verification_gates=[ "assessment status is IMPROVEMENT or IMPROVEMENT_SCORE", "assessment status is not SUSPICIOUS_NO_STRUCTURAL_CHANGE", "behavior tests or type/lint checks pass when available", ], risk_flags=risk_flags, ) # --------------------------------------------------------------------------- # Regression pinpoint β€” function-scoped unified diff # --------------------------------------------------------------------------- # Statuses that warrant a targeted regression diff. _REGRESSION_STATUSES = frozenset( { AssessmentStatus.REGRESSION, AssessmentStatus.REGRESSION_SCORE, AssessmentStatus.SUSPICIOUS_NO_STRUCTURAL_CHANGE, } ) def _span_text(source_bytes: bytes, span) -> str: """Slice a UAST byte span out of the UTF-8-encoded source. UAST offsets are byte offsets, so we must index the encoded bytes, not the code-point-indexed str. Bounds-guarded like ``cpg/object.py`` in case the span refers to a different revision than ``source_bytes``. """ if span.end_byte > len(source_bytes): return "" return source_bytes[span.start_byte : span.end_byte].decode( "utf-8", errors="replace" ) def _function_complexities( source: str, language: str ) -> dict[str, tuple[int, list[str]]]: """Map function name -> (cyclomatic_complexity, source_lines). Mirrors the callable-collection pattern in ``inspect.py``. Source lines are sliced by the UAST byte span so they round-trip exactly into difflib. """ out: dict[str, tuple[int, list[str]]] = {} morph = ProgramMorphism(source=source, language=language) if not (morph.ast and morph.ast.uast_root): return out # UAST spans are UTF-8 byte offsets; encode ONCE and slice the bytes so # non-ASCII source (β†’, β€”, emoji) doesn't shift names/bodies. See _span_text. source_bytes = morph.source.encode("utf-8") try: callables = _collect_callables(morph.ast.uast_root) except Exception: return out for c in callables: name = c.attributes.get("name") if not name: for child in c.children: if child.kind == "Identifier": name = _span_text(source_bytes, child.span) break if not name: name = c.attributes.get("scope") or "anonymous" if name in out: # Overloads / duplicate names: skip rather than guess which moved. continue try: blocks, edges, entry_id, exit_id = build_cfg_from_uast(c) cfg = ControlFlowGraph( blocks=blocks, edges=edges, entry_id=entry_id, exit_id=exit_id ) complexity = cyclomatic_complexity(cfg) except Exception: continue body = _span_text(source_bytes, c.span) # No keepends: difflib + lineterm="" then a "\n".join keeps lines clean. out[name] = (complexity, body.splitlines()) return out def _regression_diff(current_src: str, proposed_src: str, language: str) -> str | None: """Unified diff of the single function with the worst complexity increase. Returns ``None`` (rather than a whole-file diff) when no function got more complex, or when function matching is ambiguous β€” keeps the output lean and actionable. stdlib ``difflib`` only. """ cur = _function_complexities(current_src, language) prop = _function_complexities(proposed_src, language) if not cur or not prop: return None # Match by name; find the largest ADVERSE complexity increase. worst_name: str | None = None worst_delta = 0 for name, (prop_cx, _) in prop.items(): if name not in cur: # Rename/add β€” don't dump a whole-function diff. Fallback: None. continue delta = prop_cx - cur[name][0] if delta > worst_delta: worst_delta = delta worst_name = name if worst_name is None: return None cur_cx, cur_lines = cur[worst_name] prop_cx, prop_lines = prop[worst_name] diff_lines = list( difflib.unified_diff( cur_lines, prop_lines, fromfile=f"{worst_name} (current)", tofile=f"{worst_name} (proposed)", lineterm="", ) ) if not diff_lines: return None header = ( f"# regression in `{worst_name}`: cyclomatic complexity " f"{cur_cx} -> {prop_cx} ({prop_cx - cur_cx:+d})" ) body = diff_lines if len(body) > _REGRESSION_DIFF_MAX_LINES: hidden = len(body) - _REGRESSION_DIFF_MAX_LINES body = body[:_REGRESSION_DIFF_MAX_LINES] body.append(f"# ... (truncated, {hidden} more lines)") return "\n".join([header, *body]) # --------------------------------------------------------------------------- # Markdown renderer (rendered into ToolResult.content) # --------------------------------------------------------------------------- _STATUS_MEANING: dict[AssessmentStatus, str] = { AssessmentStatus.IMPROVEMENT: "moved up the lattice", AssessmentStatus.IMPROVEMENT_SCORE: "same verdict, scores improved", AssessmentStatus.LATERAL_MOVE: "no verdict or score movement", AssessmentStatus.REGRESSION: "moved down the lattice", AssessmentStatus.REGRESSION_SCORE: "same verdict, scores regressed", AssessmentStatus.SUSPICIOUS_NO_STRUCTURAL_CHANGE: ( "scores moved but the AST barely changed" ), } def _render_deltas(r: AssessmentResult) -> list[str]: lines = [] if r.score_deltas: deltas = ", ".join(f"{k}={v:+.1f}" for k, v in sorted(r.score_deltas.items())) lines.append(f"**Score deltas:** {deltas}") moved = {m: d for m, d in r.metric_deltas.items() if d != 0.0} if moved: md = ", ".join(f"`{m}`={d:+.3f}" for m, d in sorted(moved.items())) lines.append(f"**Metric deltas:** {md}") return lines
[docs] def render_assessment_md(r: AssessmentResult) -> str: """Compact markdown for a refactor assessment. Summarizes current vs. proposed rather than dumping both full evaluations; the structured_content channel still carries everything. """ if r.error: return f"**Error:** {r.error}" meaning = _STATUS_MEANING.get(r.status, "") lines = [f"**Status:** {r.status.value} β€” {meaning}"] lines.append(f"**Priority:** `{r.priority.value}`") lines.append( f"**Verdict:** {r.current.lattice_element.value} β†’ " f"{r.proposed.lattice_element.value}" ) if r.structural_distance is not None: sim = f", similarity {r.similarity:.3f}" if r.similarity is not None else "" lines.append(f"**Structural distance:** {r.structural_distance:.3f}{sim}") if r.agent_contract is not None and ( r.agent_contract.next_tool or r.agent_contract.next_actions or r.agent_contract.blocked_by ): lines.append("") lines.append("## Agent Contract") if r.agent_contract.next_tool: lines.append(f"- **Next tool:** `{r.agent_contract.next_tool}`") for action in r.agent_contract.next_actions: lines.append(f"- **Action:** {action}") for blocked in r.agent_contract.blocked_by: lines.append(f"- **Blocked by:** `{blocked}`") lines.extend(_render_deltas(r)) if r.suspicion_reason: lines.append(f"> ⚠️ {r.suspicion_reason}") if r.regression_diff: lines.append("") lines.append("## Regression diff") lines.append("```diff") lines.append(r.regression_diff) lines.append("```") return "\n".join(lines)