From 839d3978d3eaaefcdca474f374c62cfcdb4314f6 Mon Sep 17 00:00:00 2001 From: Slawomir Koszewski Date: Wed, 10 Jun 2026 18:26:01 +0200 Subject: [PATCH] Add script to generate Markdown reports from Terraform plans using OpenAI --- examples/generate_tfplan_report.py | 413 +++++++++++++++++++++++++++++ 1 file changed, 413 insertions(+) create mode 100755 examples/generate_tfplan_report.py diff --git a/examples/generate_tfplan_report.py b/examples/generate_tfplan_report.py new file mode 100755 index 0000000..8b0724d --- /dev/null +++ b/examples/generate_tfplan_report.py @@ -0,0 +1,413 @@ +#!/usr/bin/env python3 +""" +Generate a Markdown report from Terraform plan using OpenAI compatible endpoint. +Loads tfplan.json, extracts resources to be created, and generates a report via local LLM. +""" + +import json +import re +import sys +import os +import argparse +from pathlib import Path +from datetime import datetime +import time +from typing import Any +from openai import OpenAI + +# Default Configuration +PLAN_FILE_DEFAULT = "tfplan.json" +MODEL_DEFAULT = os.getenv("TFPLAN_REPORT_MODEL", "local-model") +OPENAI_ENDPOINT_DEFAULT = os.getenv("TFPLAN_REPORT_ENDPOINT", "http://127.0.0.1:1234/v1") +OPENAI_API_KEY_DEFAULT = os.getenv("TFPLAN_REPORT_API_KEY") + + +def load_plan(plan_file: Path) -> dict[str, Any]: + """Load and parse the Terraform plan JSON file.""" + with open(plan_file, "r") as f: + return json.load(f) + + +# Action labels for display +ACTION_LABELS = { + "create": "create", + "delete": "delete", + "update": "update", + "replace": "replace (delete+create)", + "no-op": "no-op", + "read": "read", +} + + +def classify_actions(actions: list[str]) -> str: + """Normalise a list of actions into a single change category.""" + action_set = set(actions) + if action_set == {"create"}: + return "create" + if action_set == {"delete"}: + return "delete" + if action_set == {"update"}: + return "update" + if "create" in action_set and "delete" in action_set: + return "replace" + if action_set == {"no-op"}: + return "no-op" + if action_set == {"read"}: + return "read" + return "+".join(sorted(action_set)) + + +def extract_resource_changes(plan: dict[str, Any]) -> dict[str, list[dict[str, Any]]]: + """Extract all non-no-op resource changes grouped by action category.""" + categorised: dict[str, list[dict[str, Any]]] = {} + for change in plan.get("resource_changes", []): + actions = change.get("change", {}).get("actions", []) + category = classify_actions(actions) + if category in ("no-op", "read"): + continue + entry = { + "address": change.get("address"), + "type": change.get("type"), + "name": change.get("name"), + "action": category, + "change": change.get("change", {}), + } + categorised.setdefault(category, []).append(entry) + return categorised + + +def get_module_path(address: str) -> str: + """Extract the module path from a resource address, or '(root)' for root resources.""" + match = re.match(r'^((?:module\.[^.[]+(?:\[[^\]]*\])?\.)+)', address) + return match.group(1).rstrip('.') if match else "(root)" + + +def group_by_module_and_type(resources: list[dict[str, Any]]) -> dict[str, dict[str, list[str]]]: + """Group resources by module path, then by resource type.""" + grouped: dict[str, dict[str, list[str]]] = {} + for resource in resources: + module = get_module_path(resource["address"]) + grouped.setdefault(module, {}).setdefault(resource["type"], []).append(resource["address"]) + return grouped + + +def _sorted_modules(grouped: dict[str, dict[str, list[str]]]) -> list[str]: + """Return module keys sorted: named modules first (alphabetically), root last.""" + return sorted(grouped.keys(), key=lambda m: (m == "(root)", m)) + + +def _render_grouped_section(resources: list[dict[str, Any]], heading: str, lines: list[str]) -> None: + """Append a headed section of resources grouped by module and type to lines.""" + grouped = group_by_module_and_type(resources) + lines.append(heading) + lines.append("") + for module in _sorted_modules(grouped): + by_type = grouped[module] + module_count = sum(len(v) for v in by_type.values()) + lines.append(f"#### {module} ({module_count})") + lines.append("") + for rtype in sorted(by_type.keys()): + for address in by_type[rtype]: + lines.append(f"- `{address}`") + lines.append("") + + +ACTION_ORDER = ["delete", "replace", "update", "create"] +ACTION_HEADINGS = { + "delete": "### Deleted resources", + "replace": "### Replaced resources (destroy + recreate)", + "update": "### Updated resources", + "create": "### Created resources", +} + + +def generate_plain_report(categorised: dict[str, list[dict[str, Any]]], plan: dict[str, Any]) -> str: + """Generate a Markdown report directly from plan data without AI.""" + total = sum(len(v) for v in categorised.values()) + lines = [ + "# Terraform Plan Report", + "", + f"- **Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", + f"- **Terraform version:** {plan.get('terraform_version', 'unknown')}", + f"- **Total changes:** {total}", + ] + for action in ACTION_ORDER: + if action in categorised: + lines.append(f"- **{ACTION_LABELS[action]}:** {len(categorised[action])}") + lines.append("") + lines.append("## Changes") + lines.append("") + for action in ACTION_ORDER: + if action not in categorised: + continue + _render_grouped_section(categorised[action], ACTION_HEADINGS[action], lines) + return "\n".join(lines) + + +def format_changes_for_llm(categorised: dict[str, list[dict[str, Any]]]) -> str: + """Format all changes grouped by action, then module, then type.""" + lines = [] + for action in ACTION_ORDER: + if action not in categorised: + continue + resources = categorised[action] + grouped = group_by_module_and_type(resources) + lines.append(f"### {ACTION_LABELS[action].upper()} ({len(resources)})") + for module in _sorted_modules(grouped): + by_type = grouped[module] + lines.append(f"#### {module}") + for rtype in sorted(by_type.keys()): + for address in by_type[rtype]: + lines.append(f"- `{address}`") + lines.append("") + return "\n".join(lines) + + +def generate_report( + categorised: dict[str, list[dict[str, Any]]], + plan: dict[str, Any], + endpoint: str, + api_key: str | None, + model: str, + azure_endpoint: str | None = None, + azure_api_version: str = "2024-10-21", + use_gemini: bool = False, + gcp_project: str | None = None, + gcp_location: str | None = "us-central1" +) -> str: + """Generate an AI-powered Markdown report using OpenAI compatible endpoint or Google Gemini.""" + + changes_text = format_changes_for_llm(categorised) + total = sum(len(v) for v in categorised.values()) + summary_lines = [f"- {ACTION_LABELS[a]}: {len(categorised[a])}" for a in ACTION_ORDER if a in categorised] + summary_text = "\n".join(summary_lines) + + prompt = f"""Review the following Terraform plan changes and produce a Markdown report to help the deployment team verify alignment with intent. + +**Plan Summary:** +- Total changes: {total} +{summary_text} +- Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} + +**Planned changes (grouped by action, module and type):** + +{changes_text} + +Guidelines: +- The list of changes above must be reproduced verbatim in the report under a "Planned Changes" section, preserving the grouping. +- Focus exclusively on what is changing and whether it is expected. +- Deletions and replacements are highest priority — analyse each one individually: why might it be deleted or replaced, and is that intentional? +- For updates and creates, note anything surprising in scope or naming. +- Do not include implementation advice — the plan is already the product of implementation. +- Do not pad with generic statements. + +Structure: +1. **Change Summary** — one-paragraph overview of the overall scope +2. **Planned Changes** — verbatim list from above +3. **Deletions & Replacements Analysis** — only if any exist; individual analysis per resource +4. **Scope Review** — does the set of changes look coherent and complete? flag anomalies""" + + system_instruction = "You are a senior infrastructure engineer reviewing a Terraform plan before it is applied. Your job is to identify whether the changes match the deployment intent and surface anything worth scrutinising." + + if use_gemini: + try: + from google import genai + from google.genai import types + except ImportError: + print("❌ Error: google-genai package is not installed. Please install it to use Gemini.", file=sys.stderr) + sys.exit(1) + + if not gcp_project: + print("❌ Error: GCP project must be specified via --gcp-project or GOOGLE_CLOUD_PROJECT environment variable for Gemini.", file=sys.stderr) + sys.exit(1) + + client = genai.Client(vertexai=True, project=gcp_project, location=gcp_location) + response = client.models.generate_content( + model=model, + contents=prompt, + config=types.GenerateContentConfig( + temperature=0.4, + max_output_tokens=2000, + system_instruction=system_instruction + ) + ) + return response.text + + elif azure_endpoint: + from openai import AzureOpenAI + if api_key: + client = AzureOpenAI( + azure_endpoint=azure_endpoint, + api_key=api_key, + api_version=azure_api_version, + ) + else: + from azure.identity import DefaultAzureCredential, get_bearer_token_provider + token_provider = get_bearer_token_provider( + DefaultAzureCredential(), + "https://cognitiveservices.azure.com/.default", + ) + client = AzureOpenAI( + azure_endpoint=azure_endpoint, + azure_ad_token_provider=token_provider, + api_version=azure_api_version, + ) + else: + client = OpenAI( + base_url=endpoint, + api_key=api_key or "not-needed", + ) + + response = client.chat.completions.create( + model=model, + messages=[ + {"role": "system", "content": system_instruction}, + {"role": "user", "content": prompt} + ], + temperature=0.4, + max_tokens=2000, + ) + + return response.choices[0].message.content + + +def main(): + """Main entry point.""" + parser = argparse.ArgumentParser( + description="Generate a Markdown report from Terraform plan using OpenAI compatible endpoint." + ) + parser.add_argument( + "plan", + nargs="?", + default=PLAN_FILE_DEFAULT, + help=f"Terraform plan file (default: {PLAN_FILE_DEFAULT})" + ) + parser.add_argument( + "-o", "--output", + help="Output filename for the report (default: derived from plan filename)" + ) + parser.add_argument( + "--ai", + action="store_true", + help="Use AI to generate an analytical report (requires OpenAI compatible endpoint)" + ) + parser.add_argument( + "--model", + default=MODEL_DEFAULT, + help=f"Model to use for AI report generation (default: {MODEL_DEFAULT})" + ) + parser.add_argument( + "--endpoint", + default=OPENAI_ENDPOINT_DEFAULT, + help=f"OpenAI API endpoint (default: {OPENAI_ENDPOINT_DEFAULT})" + ) + parser.add_argument( + "--api-key", + default=OPENAI_API_KEY_DEFAULT, + help="OpenAI or Azure OpenAI API key. When omitted with --azure-endpoint, Entra ID authentication (DefaultAzureCredential) is used." + ) + parser.add_argument( + "--azure-endpoint", + default=None, + help="Azure OpenAI endpoint URL (e.g. https://oai-name.openai.azure.com/). When set, uses AzureOpenAI client and --endpoint is ignored." + ) + parser.add_argument( + "--azure-api-version", + default="2024-10-21", + help="Azure OpenAI API version (default: 2024-10-21)" + ) + parser.add_argument( + "--gemini", + action="store_true", + help="Use Google Gemini (Vertex AI) for AI report generation" + ) + parser.add_argument( + "--gcp-project", + default=os.getenv("GOOGLE_CLOUD_PROJECT"), + help="Google Cloud Project ID for Gemini" + ) + parser.add_argument( + "--gcp-location", + default=os.getenv("GOOGLE_CLOUD_LOCATION", "us-central1"), + help="Google Cloud Location for Gemini (default: us-central1)" + ) + args = parser.parse_args() + + # Determine the plan file path (relative to current working directory if not absolute) + plan_file = Path(args.plan) + if not plan_file.is_absolute(): + plan_file = Path.cwd() / plan_file + + if not plan_file.exists(): + print(f"❌ Error: {plan_file} not found", file=sys.stderr) + sys.exit(1) + + print(f"📋 Loading plan from {plan_file}...", file=sys.stderr) + plan = load_plan(plan_file) + + print("🔍 Extracting resource changes...", file=sys.stderr) + categorised = extract_resource_changes(plan) + total = sum(len(v) for v in categorised.values()) + + if not categorised: + print("⚠️ No resource changes found in the plan", file=sys.stderr) + sys.exit(1) + + for action in ACTION_ORDER: + count = len(categorised.get(action, [])) + if count: + print(f" {ACTION_LABELS[action]}: {count}", file=sys.stderr) + print(f"✅ Total changes: {total}", file=sys.stderr) + + # Generate report + if args.ai: + if args.gemini: + if args.model == MODEL_DEFAULT: + args.model = "gemini-2.5-flash" + print(f"🚀 Generating AI report with Gemini ({args.model})...", file=sys.stderr) + elif args.azure_endpoint: + print(f"🚀 Generating AI report with Azure OpenAI ({args.model})...", file=sys.stderr) + else: + print(f"🚀 Generating AI report with OpenAI endpoint ({args.model})...", file=sys.stderr) + + report_start = time.time() + report = generate_report( + categorised, + plan, + args.endpoint, + args.api_key, + args.model, + args.azure_endpoint, + args.azure_api_version, + args.gemini, + args.gcp_project, + args.gcp_location + ) + report_elapsed = time.time() - report_start + print(f" ✓ Completed in {report_elapsed:.2f}s", file=sys.stderr) + else: + print("📄 Generating plain report...", file=sys.stderr) + report = generate_plain_report(categorised, plan) + + # Determine output filename + if args.output: + output_file = Path(args.output) + if not output_file.is_absolute(): + output_file = plan_file.parent / output_file + else: + # Swap .json for .md + output_filename = plan_file.stem + ".md" + output_file = plan_file.parent / output_filename + + with open(output_file, "w") as f: + f.write(report) + + print(f"📄 Report written to {output_file}", file=sys.stderr) + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print("\n⚠️ Interrupted by user", file=sys.stderr) + sys.exit(1)