"""Shared helpers for CLI evaluate / inspect — file discovery and formatting."""
from __future__ import annotations
import json
from pathlib import Path
import click
from topos import __version__
from topos.evaluation.characteristic_morphism import ClassificationResult
from topos.graphs.ast.dispatch import language_file_suffixes
from topos.mcp.evaluation import classify_file
from topos.utils.discovery import collect_source_files
_DETAIL_FILE_LIMIT = 5
[docs]
def collect_files(paths: tuple[str, ...], recursive: bool, language: str) -> list[Path]:
"""Collect source files for *language* from *paths* (files or directories)."""
suffixes = tuple(language_file_suffixes(language))
return collect_source_files(paths, suffixes=suffixes, recursive=recursive)
[docs]
def run_classify_file(
filepath: Path,
*,
priority: str,
gitnexus_dir: str | None,
) -> ClassificationResult:
"""Classify one file using the same pipeline as MCP evaluate-file."""
gdir = Path(gitnexus_dir).expanduser() if gitnexus_dir else None
from topos.evaluation.policies import Priority
result, _deps = classify_file(filepath, Priority(priority), gdir)
return result
[docs]
def result_to_row(filepath: Path, result: ClassificationResult) -> dict[str, object]:
"""Shape a :class:`ClassificationResult` for text/JSON CLI output."""
summary = result.summary()
entropy = result.raw_metrics.get("ast.entropy", 0.0)
return {
"file": str(filepath),
"is_parseable": result.is_parseable,
"lattice_element": summary.name,
"lattice_symbol": summary.symbol,
"dimensions": {dim: val.name for dim, val in result.dimensions.items()},
"dimension_symbols": {
dim: val.symbol for dim, val in result.dimensions.items()
},
"scores": {dim: round(s * 100.0, 1) for dim, s in result.scores.items()},
"priority": result.priority.value,
"raw_metrics": result.raw_metrics,
"entropy": entropy,
"valid": result.is_parseable,
"_result": result,
}
_PILLAR_THRESHOLDS = {
"simple": 60.0,
"composable": 60.0,
"secure": 70.0,
}
def _build_gauge(score: float, threshold: float, width: int = 10) -> str:
"""Build a mathematically aligned progress track."""
score_idx = min(width, max(0, round((score / 100.0) * width)))
threshold_idx = min(width, max(0, round((threshold / 100.0) * width)))
# Track characters: use dimmer line
track_char = "─"
if score_idx == threshold_idx:
color = "green" if score >= threshold else "red"
bullet = click.style("◆", fg=color, bold=True)
left = click.style(track_char * score_idx, dim=True)
right = click.style(track_char * (width - score_idx), dim=True)
return f"{left}{bullet}{right}"
elif score_idx < threshold_idx:
bullet = click.style("◆", fg="red", bold=True)
pipe = click.style("│", fg="white", dim=True)
left = click.style(track_char * score_idx, dim=True)
middle = click.style(track_char * (threshold_idx - score_idx - 1), dim=True)
right = click.style(track_char * (width - threshold_idx), dim=True)
return f"{left}{bullet}{middle}{pipe}{right}"
else:
bullet = click.style("◆", fg="green", bold=True)
pipe = click.style("│", fg="white", dim=True)
left = click.style(track_char * threshold_idx, dim=True)
middle = click.style(track_char * (score_idx - threshold_idx - 1), dim=True)
right = click.style(track_char * (width - score_idx), dim=True)
return f"{left}{pipe}{middle}{bullet}{right}"
def _display_score(result: dict[str, object]) -> float:
scores = result["scores"]
if not isinstance(scores, dict) or not scores:
return 0.0
values = [float(value) for value in scores.values()]
return sum(values) / len(values)
def _pillar_order(pillars: set[str]) -> list[str]:
preferred = ["simple", "composable", "secure"]
ordered = [pillar for pillar in preferred if pillar in pillars]
ordered.extend(sorted(pillars - set(preferred)))
return ordered
def _format_file_summary(result: dict[str, object]) -> str:
scores = result["scores"]
score_parts = []
if isinstance(scores, dict):
score_parts = [f"{dim} {float(score):.0f}" for dim, score in scores.items()]
score_text = " " + " ".join(score_parts) if score_parts else ""
medal = _format_medal(result)
return f"{result['file']} {medal} {_display_score(result):.0f}%{score_text}"
def _format_medal(result: dict[str, object]) -> str:
symbol = result.get("lattice_symbol", "")
element = result.get("lattice_element", "SLOP")
return f"[{symbol} {element}]"
def _format_file_row(
rank_str: str, filepath: str, result: dict[str, object], max_file_len: int = 42
) -> str:
# 1. File path truncation from left if it exceeds max_file_len
file_path_str = str(filepath)
if len(file_path_str) > max_file_len:
file_path_str = "..." + file_path_str[-(max_file_len - 3) :]
file_col = f"{file_path_str:<{max_file_len}}"
# 2. Medal formatting
symbol = result.get("lattice_symbol", "")
element = result.get("lattice_element", "SLOP")
medal_text = f"[{symbol} {element}]"
if element == "IDEAL":
medal_col = click.style(f"{medal_text:<16}", fg="green", bold=True)
elif "COMPOSABLE" in element or "SECURE" in element or "SIMPLE" in element:
if symbol == "🥈":
medal_col = click.style(f"{medal_text:<16}", fg="yellow", bold=True)
elif symbol == "🥉":
medal_col = click.style(f"{medal_text:<16}", fg="cyan")
else:
medal_col = click.style(f"{medal_text:<16}", fg="red")
else:
medal_col = click.style(f"{medal_text:<16}", fg="red", dim=True)
# 3. Score formatting (right aligned, padded with spaces)
avg_score = _display_score(result)
score_col = click.style(f"{avg_score:>3.0f}%", bold=True)
# 4. Pillar scores with micro-gauges
scores = result.get("scores", {})
pillar_parts = []
if isinstance(scores, dict):
ordered_pillars = ["simple", "composable", "secure"]
for p in ordered_pillars:
if p in scores:
p_score = float(scores[p])
p_label = p[0].upper() # 'S', 'C', 'S'
gauge = _build_gauge(p_score, _PILLAR_THRESHOLDS[p], width=10)
p_threshold = _PILLAR_THRESHOLDS[p]
score_color = "green" if p_score >= p_threshold else "red"
score_percent_str = click.style(f"{p_score:>3.0f}%", fg=score_color)
label_str = click.style(f"{p_label}:", dim=True)
bracket_left = click.style("[", dim=True)
bracket_right = click.style("]", dim=True)
gauge_str = (
f"{label_str}{score_percent_str} "
f"{bracket_left}{gauge}{bracket_right}"
)
pillar_parts.append(gauge_str)
pillar_col = " ".join(pillar_parts)
return f" {rank_str:<4} {file_col} {medal_col} {score_col} {pillar_col}"
def _output_file_details(results: list[dict[str, object]], verbose: bool) -> None:
for result in results:
symbol = result.get("lattice_symbol", "")
element = result.get("lattice_element", "SLOP")
medal_text = f"[{symbol} {element}]"
if element == "IDEAL":
medal_styled = click.style(medal_text, fg="green", bold=True)
elif symbol == "🥈":
medal_styled = click.style(medal_text, fg="yellow", bold=True)
elif symbol == "🥉":
medal_styled = click.style(medal_text, fg="cyan")
else:
medal_styled = click.style(medal_text, fg="red", dim=True)
avg_score = _display_score(result)
score_styled = click.style(f"{avg_score:.0f}%", bold=True)
click.echo(f" {result['file']} {medal_styled} {score_styled}")
scores = result.get("scores", {})
dimensions = result.get("dimensions", {})
if isinstance(scores, dict):
click.echo(click.style(" Pillar Progress vs Threshold (│)", dim=True))
click.echo(click.style(" ────── ───────────────────────────", dim=True))
for dim in ["simple", "composable", "secure"]:
if dim in scores:
score = float(scores[dim])
threshold = _PILLAR_THRESHOLDS[dim]
status = "PASS" if score >= threshold else "FAIL"
status_styled = click.style(
f"{status:<4}", fg="green" if status == "PASS" else "red"
)
gauge = _build_gauge(score, threshold, width=20)
dim_label = f" {dim:<10}"
bracket_l = click.style("[", dim=True)
bracket_r = click.style("]", dim=True)
score_str = click.style(
f"{score:>3.0f}%", fg="green" if status == "PASS" else "red"
)
req_str = click.style(f"(Req: {threshold:.0f}%)", dim=True)
click.echo(
f"{dim_label} {bracket_l}{gauge}{bracket_r} "
f"{score_str} {req_str} {status_styled}"
)
if not dimensions:
click.echo(click.style(" ⊥ SLOP (parse failure)", fg="red", dim=True))
if verbose:
click.echo(click.style(" Raw Metrics:", dim=True))
for key, value in result["raw_metrics"].items():
click.echo(click.style(f" {key}: {value:.3f}", dim=True))
if "error" in result:
click.echo(click.style(f" Error: {result['error']}", fg="red"))
_render_file_diagnostics(result)
click.echo()
def _render_file_diagnostics(result: dict[str, object]) -> None:
"""Render per-file security findings + suggestions (verbose mode)."""
from topos.cli.diagnostics import (
render_security_findings,
render_suggestions,
render_verdict_line,
)
active = result.get("_active_findings") or []
acknowledged = result.get("_acknowledged") or []
verdict = result.get("_verdict")
suggestions = result.get("_suggestions") or []
render_security_findings(active, acknowledged, indent=" ")
if verdict is not None:
render_verdict_line(verdict, indent=" ")
render_suggestions(suggestions, indent=" ")
def _lowest_hanging_fruit(
results: list[dict[str, object]],
) -> list[dict[str, object]]:
"""Find files whose cheapest failing pillar is closest to passing.
For each file we look at every *measured* pillar that fails its threshold,
take the one with the smallest gap (the easiest win), and rank files
ascending by that gap. Parse failures (no dimensions / SLOP) are skipped —
they belong in "Needs attention", not here. Returns up to the top 5.
"""
fruit: list[dict[str, object]] = []
for result in results:
scores = result.get("scores")
dimensions = result.get("dimensions")
if not isinstance(scores, dict) or not scores:
continue
# Skip parse failures — they have no measured dimensions to improve.
if not isinstance(dimensions, dict) or not dimensions:
continue
best_gap: dict[str, object] | None = None
for pillar, raw_score in scores.items():
threshold = _PILLAR_THRESHOLDS.get(str(pillar))
if threshold is None:
continue
score = float(raw_score)
if score >= threshold:
continue
gap = threshold - score
if best_gap is None or gap < float(best_gap["gap"]):
best_gap = {
"pillar": str(pillar),
"score": score,
"threshold": threshold,
"gap": gap,
}
if best_gap is None:
continue
suggestion = _suggestion_for_pillar(result, str(best_gap["pillar"]))
fruit.append(
{
"file": result["file"],
"pillar": best_gap["pillar"],
"score": best_gap["score"],
"threshold": best_gap["threshold"],
"gap": best_gap["gap"],
"suggestion": suggestion,
}
)
fruit.sort(key=lambda item: (float(item["gap"]), str(item["file"])))
return fruit[:5]
def _suggestion_for_pillar(result: dict[str, object], pillar: str) -> str | None:
"""Return the most relevant suggestion message for *pillar*, or None.
Prefers a "fix" (gate-failure) suggestion over an advisory "improve" one.
"""
suggestions = result.get("_suggestions") or []
matches = [s for s in suggestions if getattr(s, "pillar", None) == pillar]
if not matches:
return None
matches.sort(key=lambda s: 0 if getattr(s, "severity", "") == "fix" else 1)
return getattr(matches[0], "message", None)
[docs]
def output_directory_average(results: list[dict[str, object]]) -> None:
"""Consolidated into output_text. This is now a backward compatible no-op."""
pass
[docs]
def output_overall(overall: dict[str, object]) -> None:
"""Consolidated into output_text. This is now a backward compatible no-op."""
pass
[docs]
def output_text(results: list[dict[str, object]], verbose: bool) -> None:
"""Output results as a compact summary, with detailed rows in verbose mode."""
click.echo(
click.style(
f"Evaluated {len(results)} file{'s' if len(results) != 1 else ''}",
bold=True,
)
)
# Compute overall directory floor verdict
from topos.evaluation.characteristic_morphism import CharacteristicMorphism
classifier = CharacteristicMorphism()
classification_results = [r["_result"] for r in results]
overall = classifier.combine_dimensions(classification_results)
if verbose or len(results) <= _DETAIL_FILE_LIMIT:
click.echo()
click.echo(click.style("Files", fg="cyan", bold=True))
_output_file_details(results, verbose=verbose)
else:
pillars: set[str] = set()
for result in results:
scores = result["scores"]
if isinstance(scores, dict):
pillars.update(str(dim) for dim in scores)
if pillars:
click.echo()
click.echo(click.style("Pillars", fg="cyan", bold=True))
header_line = (
f" {'Pillar':<12} {'Status':<8} {'Avg Score':<11} "
f"{'Min Score':<11} {'Failures':<10} "
f"{'Progress vs Threshold (│)':<25}"
)
click.echo(click.style(header_line, bold=True))
divider = (
f" {'──────':<12} {'──────':<8} {'─────────':<11} "
f"{'─────────':<11} {'────────':<10} "
f"{'─────────────────────────':<25}"
)
click.echo(click.style(divider, dim=True))
for pillar in _pillar_order(pillars):
pillar_scores = [
float(result["scores"][pillar])
for result in results
if isinstance(result["scores"], dict) and pillar in result["scores"]
]
dimensions = [
result["dimensions"].get(pillar)
for result in results
if isinstance(result["dimensions"], dict)
]
avg_score = sum(pillar_scores) / len(pillar_scores)
min_score = min(pillar_scores)
failures = sum(1 for value in dimensions if value == "SLOP")
floor_val = overall.get(pillar)
is_passing = getattr(floor_val, "name", "SLOP") != "SLOP"
status_text = "PASS" if is_passing else "FAIL"
status_styled = click.style(
f"{status_text:<8}", fg="green" if is_passing else "red", bold=True
)
threshold = _PILLAR_THRESHOLDS.get(pillar, 60.0)
gauge = _build_gauge(avg_score, threshold, width=20)
click.echo(
f" {pillar:<12} "
f"{status_styled} "
f"{avg_score:>9.0f}% "
f"{min_score:>9.0f}% "
f"{failures:>3d}/{len(results):<6} "
f"{click.style('[', dim=True)}{gauge}{click.style(']', dim=True)}"
)
average = sum(_display_score(result) for result in results) / len(results)
avg_styled = click.style(f"{average:.0f}%", bold=True)
click.echo(f" Directory Average Score: {avg_styled} (Mean across all files)")
from topos.core.omega import EvaluationValue
meet_mask = 0
if overall.get("simple", EvaluationValue.SLOP) != EvaluationValue.SLOP:
meet_mask |= EvaluationValue.SIMPLE
if overall.get("composable", EvaluationValue.SLOP) != EvaluationValue.SLOP:
meet_mask |= EvaluationValue.COMPOSABLE
if overall.get("secure", EvaluationValue.SLOP) != EvaluationValue.SLOP:
meet_mask |= EvaluationValue.SECURE
meet_val = EvaluationValue(meet_mask)
meet_styled = click.style(
f"{meet_val.symbol} {meet_val.name}",
fg="green" if meet_val == EvaluationValue.IDEAL else "red",
bold=True,
)
click.echo(f" Directory Floor Verdict: {meet_styled} (Pointwise lattice meet)")
click.echo(
click.style(
" [INFO] The Floor Verdict is the minimum category-theoretic level "
"achieved across all files.",
dim=True,
)
)
if not (verbose or len(results) <= _DETAIL_FILE_LIMIT):
ranked = sorted(
results,
key=lambda result: (_display_score(result), result["file"]),
)
click.echo()
click.echo(click.style("Needs attention", fg="cyan", bold=True))
header_files = (
f" {'Rank':<4} {'File':<42} {'Verdict':<16} "
f"{'Score':<5} {'Pillar Scores'}"
)
click.echo(click.style(header_files, bold=True))
divider_files = (
f" {'────':<4} "
f"{'──────────────────────────────────────────':<42} "
f"{'───────':<16} {'─────':<5} {'─────────────'}"
)
click.echo(click.style(divider_files, dim=True))
for idx, result in enumerate(ranked[:5], start=1):
click.echo(_format_file_row(f"{idx}.", result["file"], result))
best = max(
results,
key=lambda result: (_display_score(result), result["file"]),
)
click.echo()
click.echo(click.style("Best file", fg="cyan", bold=True))
click.echo(click.style(header_files, bold=True))
click.echo(click.style(divider_files, dim=True))
click.echo(_format_file_row("-", best["file"], best))
_output_lowest_hanging_fruit(results)
def _output_lowest_hanging_fruit(results: list[dict[str, object]]) -> None:
"""Render the cheapest-win section: failing pillars closest to passing."""
fruit = _lowest_hanging_fruit(results)
click.echo()
click.echo(click.style("Lowest-hanging fruit", fg="cyan", bold=True))
if not fruit:
click.echo(
click.style(
" All measured pillars pass — no near-misses to fix.", dim=True
)
)
return
click.echo(
click.style(" Smallest improvement that flips a failing pillar.", dim=True)
)
for idx, item in enumerate(fruit, start=1):
pillar = str(item["pillar"])
score = float(item["score"])
threshold = float(item["threshold"])
gap = float(item["gap"])
file_str = str(item["file"])
if len(file_str) > 42:
file_str = "..." + file_str[-39:]
gap_str = click.style(
f"{pillar} {score:.0f}% → {threshold:.0f}% (+{gap:.0f} pts)",
fg="yellow",
bold=True,
)
click.echo(f" {idx}. {file_str}")
click.echo(f" {gap_str}")
suggestion = item.get("suggestion")
if suggestion:
click.echo(click.style(f" ↳ {suggestion}", dim=True))
[docs]
def output_json(results: list[dict[str, object]]) -> None:
"""Output results as JSON."""
serialisable = [
{k: v for k, v in r.items() if not k.startswith("_")} for r in results
]
output = {
"version": __version__,
"results": serialisable,
}
click.echo(json.dumps(output, indent=2))