#!/usr/bin/env python3 from __future__ import annotations import argparse import ast import json import sys from dataclasses import dataclass from pathlib import Path REPO_ROOT = Path(__file__).resolve().parents[2] PACKAGE_ROOT = REPO_ROOT / "alphapulldown" OMIT_PREFIXES = ( "alphapulldown/analysis_pipeline/af2plots/", ) @dataclass(frozen=True) class FunctionSpan: path: Path qualname: str lineno: int body_lineno: int end_lineno: int class FunctionCollector(ast.NodeVisitor): def __init__(self, path: Path) -> None: self.path = path self.stack: list[str] = [] self.functions: list[FunctionSpan] = [] def visit_ClassDef(self, node: ast.ClassDef) -> None: self.stack.append(node.name) self.generic_visit(node) self.stack.pop() def visit_FunctionDef(self, node: ast.FunctionDef) -> None: self._record(node) def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef) -> None: self._record(node) def _record(self, node: ast.FunctionDef | ast.AsyncFunctionDef) -> None: qualname = ".".join([*self.stack, node.name]) if self.stack else node.name end_lineno = getattr(node, "end_lineno", node.lineno) body_lineno = getattr(node.body[0], "lineno", node.lineno) if node.body else node.lineno self.functions.append( FunctionSpan( path=self.path, qualname=qualname, lineno=node.lineno, body_lineno=body_lineno, end_lineno=end_lineno, ) ) self.stack.append(node.name) self.generic_visit(node) self.stack.pop() def iter_package_functions() -> list[FunctionSpan]: functions: list[FunctionSpan] = [] for path in sorted(PACKAGE_ROOT.rglob("*.py")): rel_path = path.relative_to(REPO_ROOT).as_posix() if any(rel_path.startswith(prefix) for prefix in OMIT_PREFIXES): continue tree = ast.parse(path.read_text(encoding="utf-8")) collector = FunctionCollector(path.relative_to(REPO_ROOT)) collector.visit(tree) functions.extend(collector.functions) return functions def normalize_coverage_paths(payload: dict) -> dict[Path, set[int]]: files = payload.get("files", {}) normalized: dict[Path, set[int]] = {} for raw_path, file_payload in files.items(): path = Path(raw_path) if path.is_absolute(): try: path = path.relative_to(REPO_ROOT) except ValueError: continue executed = set(file_payload.get("executed_lines", [])) normalized[path] = executed return normalized def check_function_coverage(coverage_json: Path, *, report_only: bool = False) -> int: payload = json.loads(coverage_json.read_text(encoding="utf-8")) executed_by_path = normalize_coverage_paths(payload) missing: list[FunctionSpan] = [] for function in iter_package_functions(): executed_lines = executed_by_path.get(function.path, set()) if not any( line in executed_lines for line in range(function.body_lineno, function.end_lineno + 1) ): missing.append(function) if not missing: print("Function coverage check passed: every alphapulldown function was executed at least once.") return 0 status = "report" if report_only else "failed" print(f"Function coverage check {status}. Missing functions:") for function in missing: print(f" {function.path}:{function.lineno} {function.qualname}") print(f"Total uncovered functions: {len(missing)}") return 0 if report_only else 1 def main() -> int: parser = argparse.ArgumentParser(description="Report or fail if alphapulldown functions were not executed.") parser.add_argument( "coverage_json", nargs="?", default="coverage.json", help="Path to coverage.py JSON report generated by pytest-cov.", ) parser.add_argument( "--report-only", action="store_true", help="Print uncovered functions without failing.", ) args = parser.parse_args() return check_function_coverage( Path(args.coverage_json), report_only=args.report_only, ) if __name__ == "__main__": sys.exit(main())