Add script to generate Markdown reports from Terraform plans using OpenAI

This commit is contained in:
2026-06-10 18:26:01 +02:00
parent 3577a87948
commit 839d3978d3
+413
View File
@@ -0,0 +1,413 @@
#!/usr/bin/env python3
"""
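Generate a Markdown report from a Terraform plan, optionally using an LLM.
Loads the plan JSON (tfplan.json by default), extracts the planned resource changes
(create, update, replace, delete), and writes either a plain report or, with --ai,
an analytical report produced by an OpenAI-compatible, Azure OpenAI or Google Gemini endpoint.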
"""
import json
import re
import sys
import os
import argparse
from pathlib import Path
from datetime import datetime
import time
from typing import Any
from openai import OpenAI
# Default configuration. The model, endpoint and API key can be preset through
# the TFPLAN_REPORT_* environment variables; command-line flags override them.
PLAN_FILE_DEFAULT = "tfplan.json"
MODEL_DEFAULT = os.environ.get("TFPLAN_REPORT_MODEL", "local-model")
OPENAI_ENDPOINT_DEFAULT = os.environ.get(
    "TFPLAN_REPORT_ENDPOINT",
    "http://127.0.0.1:1234/v1",
)
# None when the variable is unset.
OPENAI_API_KEY_DEFAULT = os.environ.get("TFPLAN_REPORT_API_KEY")
def load_plan(plan_file: Path) -> dict[str, Any]:
    """Load and parse the Terraform plan JSON file.

    Args:
        plan_file: Path to the JSON rendering of a Terraform plan.

    Returns:
        The decoded JSON document.

    Raises:
        OSError: If the file cannot be opened or read.
        json.JSONDecodeError: If the content is not valid JSON.
        UnicodeDecodeError: If the bytes are not valid UTF-8/16/32 text.
    """
    # Read bytes rather than text: the json module then detects UTF-8, UTF-16
    # or UTF-32 (including a BOM) itself, so the result does not depend on the
    # platform's locale encoding or on how the file was redirected to disk.
    with open(plan_file, "rb") as f:
        return json.load(f)
# Human-readable label for each change category. Every category is displayed
# under its own name except "replace", which spells out what it involves.
ACTION_LABELS = {
    action: "replace (delete+create)" if action == "replace" else action
    for action in ("create", "delete", "update", "replace", "no-op", "read")
}
def classify_actions(actions: list[str]) -> str:
    """Collapse a resource's action list into a single change category.

    Any list containing both "create" and "delete" is a "replace", whatever
    the order. A list with one distinct action ("create", "delete", "update",
    "no-op", "read", ...) maps to that action. Any other combination is
    returned as its distinct actions sorted and joined with "+"; an empty
    list therefore yields an empty string.
    """
    distinct = set(actions)
    if {"create", "delete"}.issubset(distinct):
        return "replace"
    # For a single distinct action the join is just that action's name.
    return "+".join(sorted(distinct))
def extract_resource_changes(plan: dict[str, Any]) -> dict[str, list[dict[str, Any]]]:
    """Collect the plan's effective resource changes, keyed by change category.

    Walks ``plan["resource_changes"]`` (treated as empty when absent),
    classifies each entry with ``classify_actions`` and drops the "no-op"
    and "read" categories. Each kept entry records the resource address,
    type, name, its category and the raw ``change`` object.
    """
    ignored = {"no-op", "read"}
    buckets: dict[str, list[dict[str, Any]]] = {}
    for resource_change in plan.get("resource_changes", []):
        detail = resource_change.get("change", {})
        kind = classify_actions(detail.get("actions", []))
        if kind in ignored:
            continue
        if kind not in buckets:
            buckets[kind] = []
        buckets[kind].append(
            {
                "address": resource_change.get("address"),
                "type": resource_change.get("type"),
                "name": resource_change.get("name"),
                "action": kind,
                "change": detail,
            }
        )
    return buckets
def get_module_path(address: str) -> str:
    """Return the leading module path of a resource address.

    The path is the run of ``module.<name>`` segments (each optionally
    followed by an ``[index]``) at the start of the address, without the
    trailing dot. Addresses that do not start with such a segment yield
    ``"(root)"``.
    """
    module_prefix = re.compile(r'^((?:module\.[^.[]+(?:\[[^\]]*\])?\.)+)')
    found = module_prefix.match(address)
    if found is None:
        return "(root)"
    # The captured prefix ends with the dot that separates it from the resource.
    return found.group(1).rstrip('.')
def group_by_module_and_type(resources: list[dict[str, Any]]) -> dict[str, dict[str, list[str]]]:
    """Build a ``module path -> resource type -> [addresses]`` index.

    Each resource dict must provide ``"address"`` and ``"type"``; the module
    path is derived from the address with ``get_module_path``. Addresses keep
    the order in which the resources were supplied.
    """
    index: dict[str, dict[str, list[str]]] = {}
    for item in resources:
        per_type = index.setdefault(get_module_path(item["address"]), {})
        addresses = per_type.setdefault(item["type"], [])
        addresses.append(item["address"])
    return index
def _sorted_modules(grouped: dict[str, dict[str, list[str]]]) -> list[str]:
    """List the module keys alphabetically, with the "(root)" entry moved to the end."""
    ordered = sorted(name for name in grouped if name != "(root)")
    if "(root)" in grouped:
        ordered.append("(root)")
    return ordered
def _render_grouped_section(resources: list[dict[str, Any]], heading: str, lines: list[str]) -> None:
    """Append one Markdown section for ``resources`` to ``lines`` (in place).

    Emits ``heading`` and a blank line, then for every module (named modules
    first, root last) a ``####`` sub-heading with the module's resource
    count, a blank line, one bullet per resource address ordered by resource
    type, and a closing blank line.
    """
    by_module = group_by_module_and_type(resources)
    lines.extend([heading, ""])
    for module_name in _sorted_modules(by_module):
        types = by_module[module_name]
        resource_count = sum(map(len, types.values()))
        lines.extend([f"#### {module_name} ({resource_count})", ""])
        lines.extend(
            f"- `{address}`"
            for type_name in sorted(types)
            for address in types[type_name]
        )
        # Blank line separates this module's list from the next heading.
        lines.append("")
# Report sections in the order they are presented, each paired with its
# Markdown heading. ACTION_ORDER and ACTION_HEADINGS are both derived from
# this single table so they cannot drift apart.
_REPORT_SECTIONS = (
    ("delete", "### Deleted resources"),
    ("replace", "### Replaced resources (destroy + recreate)"),
    ("update", "### Updated resources"),
    ("create", "### Created resources"),
)
ACTION_ORDER = [action for action, _heading in _REPORT_SECTIONS]
ACTION_HEADINGS = dict(_REPORT_SECTIONS)
def generate_plain_report(categorised: dict[str, list[dict[str, Any]]], plan: dict[str, Any]) -> str:
    """Generate a Markdown report directly from plan data without AI.

    Args:
        categorised: Resource changes keyed by change category, as produced
            by ``extract_resource_changes``.
        plan: The decoded plan; only ``terraform_version`` is read.

    Returns:
        The report as a single Markdown string: a header with the generation
        time, Terraform version and per-category counts, followed by one
        section per category listing the affected resources by module.
    """
    # Known categories come first in their fixed order. Any other category
    # present in the plan (for example a "forget" action, or an unexpected
    # action combination) is appended afterwards so that every change counted
    # in the total is also listed in the report.
    ordered = [action for action in ACTION_ORDER if action in categorised]
    ordered += sorted(action for action in categorised if action not in ACTION_ORDER)

    total = sum(len(v) for v in categorised.values())
    lines = [
        "# Terraform Plan Report",
        "",
        f"- **Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        f"- **Terraform version:** {plan.get('terraform_version', 'unknown')}",
        f"- **Total changes:** {total}",
    ]
    for action in ordered:
        # An empty category name comes from a change with no actions at all.
        label = ACTION_LABELS.get(action, action or "unknown")
        lines.append(f"- **{label}:** {len(categorised[action])}")
    lines.append("")
    lines.append("## Changes")
    lines.append("")
    for action in ordered:
        heading = ACTION_HEADINGS.get(action, f"### Other changes ({action or 'unknown'})")
        _render_grouped_section(categorised[action], heading, lines)
    return "\n".join(lines)
def format_changes_for_llm(categorised: dict[str, list[dict[str, Any]]]) -> str:
    """Format all changes grouped by action, then module, then type.

    Args:
        categorised: Resource changes keyed by change category, as produced
            by ``extract_resource_changes``.

    Returns:
        A Markdown fragment with one ``###`` heading per category (upper-cased
        label and count), a ``####`` heading per module and one bullet per
        resource address.
    """
    # Known categories first in their fixed order, then any other category
    # found in the plan, so the listing matches the total that is reported
    # alongside it instead of silently omitting unrecognised actions.
    ordered = [action for action in ACTION_ORDER if action in categorised]
    ordered += sorted(action for action in categorised if action not in ACTION_ORDER)

    lines: list[str] = []
    for action in ordered:
        resources = categorised[action]
        grouped = group_by_module_and_type(resources)
        label = ACTION_LABELS.get(action, action or "unknown")
        lines.append(f"### {label.upper()} ({len(resources)})")
        for module in _sorted_modules(grouped):
            by_type = grouped[module]
            lines.append(f"#### {module}")
            for rtype in sorted(by_type):
                lines.extend(f"- `{address}`" for address in by_type[rtype])
            # Blank line closes each module's list.
            lines.append("")
    return "\n".join(lines)
def generate_report(
    categorised: dict[str, list[dict[str, Any]]],
    plan: dict[str, Any],
    endpoint: str,
    api_key: str | None,
    model: str,
    azure_endpoint: str | None = None,
    azure_api_version: str = "2024-10-21",
    use_gemini: bool = False,
    gcp_project: str | None = None,
    gcp_location: str | None = "us-central1"
) -> str:
    """Generate an AI-powered Markdown report using OpenAI compatible endpoint or Google Gemini.

    Backend selection, in priority order: Gemini on Vertex AI when
    ``use_gemini`` is set, Azure OpenAI when ``azure_endpoint`` is given,
    otherwise a plain OpenAI-compatible server at ``endpoint``.

    Args:
        categorised: Resource changes keyed by change category.
        plan: The decoded plan. Accepted for interface compatibility; it is
            not read by this function.
        endpoint: Base URL of the OpenAI-compatible server.
        api_key: API key. For Azure, when omitted, DefaultAzureCredential
            is used; for a plain endpoint a placeholder key is sent.
        model: Model (or deployment) name passed to the backend.
        azure_endpoint: Azure OpenAI endpoint URL; enables the Azure client.
        azure_api_version: Azure OpenAI API version.
        use_gemini: Use Google Gemini through Vertex AI.
        gcp_project: Google Cloud project, required with ``use_gemini``.
        gcp_location: Google Cloud location used with ``use_gemini``.

    Returns:
        The Markdown report text returned by the model.

    Raises:
        RuntimeError: If the model returns no text.
        SystemExit: If Gemini is requested but the google-genai package or
            the GCP project is missing.
    """
    changes_text = format_changes_for_llm(categorised)
    total = sum(len(v) for v in categorised.values())
    # Known categories first, then any others, so the summary adds up to the total.
    ordered = [a for a in ACTION_ORDER if a in categorised]
    ordered += sorted(a for a in categorised if a not in ACTION_ORDER)
    summary_lines = [
        f"- {ACTION_LABELS.get(a, a or 'unknown')}: {len(categorised[a])}" for a in ordered
    ]
    summary_text = "\n".join(summary_lines)
    prompt = f"""Review the following Terraform plan changes and produce a Markdown report to help the deployment team verify alignment with intent.
**Plan Summary:**
- Total changes: {total}
{summary_text}
- Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
**Planned changes (grouped by action, module and type):**
{changes_text}
Guidelines:
- The list of changes above must be reproduced verbatim in the report under a "Planned Changes" section, preserving the grouping.
- Focus exclusively on what is changing and whether it is expected.
- Deletions and replacements are highest priority — analyse each one individually: why might it be deleted or replaced, and is that intentional?
- For updates and creates, note anything surprising in scope or naming.
- Do not include implementation advice — the plan is already the product of implementation.
- Do not pad with generic statements.
Structure:
1. **Change Summary** — one-paragraph overview of the overall scope
2. **Planned Changes** — verbatim list from above
3. **Deletions & Replacements Analysis** — only if any exist; individual analysis per resource
4. **Scope Review** — does the set of changes look coherent and complete? flag anomalies"""
    system_instruction = "You are a senior infrastructure engineer reviewing a Terraform plan before it is applied. Your job is to identify whether the changes match the deployment intent and surface anything worth scrutinising."

    if use_gemini:
        # Imported lazily so the dependency is only needed when Gemini is used.
        try:
            from google import genai
            from google.genai import types
        except ImportError:
            print("❌ Error: google-genai package is not installed. Please install it to use Gemini.", file=sys.stderr)
            sys.exit(1)
        if not gcp_project:
            print("❌ Error: GCP project must be specified via --gcp-project or GOOGLE_CLOUD_PROJECT environment variable for Gemini.", file=sys.stderr)
            sys.exit(1)
        client = genai.Client(vertexai=True, project=gcp_project, location=gcp_location)
        response = client.models.generate_content(
            model=model,
            contents=prompt,
            config=types.GenerateContentConfig(
                temperature=0.4,
                max_output_tokens=2000,
                system_instruction=system_instruction
            )
        )
        text = response.text
    else:
        if azure_endpoint:
            from openai import AzureOpenAI
            if api_key:
                client = AzureOpenAI(
                    azure_endpoint=azure_endpoint,
                    api_key=api_key,
                    api_version=azure_api_version,
                )
            else:
                # No key supplied: authenticate with Entra ID credentials instead.
                from azure.identity import DefaultAzureCredential, get_bearer_token_provider
                token_provider = get_bearer_token_provider(
                    DefaultAzureCredential(),
                    "https://cognitiveservices.azure.com/.default",
                )
                client = AzureOpenAI(
                    azure_endpoint=azure_endpoint,
                    azure_ad_token_provider=token_provider,
                    api_version=azure_api_version,
                )
        else:
            # The client requires some key value even when the server ignores it.
            client = OpenAI(
                base_url=endpoint,
                api_key=api_key or "not-needed",
            )
        response = client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": system_instruction},
                {"role": "user", "content": prompt}
            ],
            temperature=0.4,
            max_tokens=2000,
        )
        text = response.choices[0].message.content if response.choices else None

    # Either backend may answer without any text. Fail with a clear message
    # here rather than returning None from a function declared to return str.
    if not text:
        raise RuntimeError(
            f"Model '{model}' returned an empty response; no report was generated."
        )
    return text
def main() -> None:
    """Command-line entry point.

    Parses the arguments, loads the plan JSON, extracts the resource changes,
    generates either the plain report or (with ``--ai``) the AI report, and
    writes it next to the plan file unless ``--output`` is absolute. Progress
    messages go to stderr.

    Exits with status 1 when the plan file is missing, is not valid JSON, or
    contains no resource changes.
    """
    parser = argparse.ArgumentParser(
        description="Generate a Markdown report from Terraform plan using OpenAI compatible endpoint."
    )
    parser.add_argument(
        "plan",
        nargs="?",
        default=PLAN_FILE_DEFAULT,
        help=f"Terraform plan file (default: {PLAN_FILE_DEFAULT})"
    )
    parser.add_argument(
        "-o", "--output",
        help="Output filename for the report (default: derived from plan filename)"
    )
    parser.add_argument(
        "--ai",
        action="store_true",
        help="Use AI to generate an analytical report (requires OpenAI compatible endpoint)"
    )
    parser.add_argument(
        "--model",
        default=MODEL_DEFAULT,
        help=f"Model to use for AI report generation (default: {MODEL_DEFAULT})"
    )
    parser.add_argument(
        "--endpoint",
        default=OPENAI_ENDPOINT_DEFAULT,
        help=f"OpenAI API endpoint (default: {OPENAI_ENDPOINT_DEFAULT})"
    )
    parser.add_argument(
        "--api-key",
        default=OPENAI_API_KEY_DEFAULT,
        help="OpenAI or Azure OpenAI API key. When omitted with --azure-endpoint, Entra ID authentication (DefaultAzureCredential) is used."
    )
    parser.add_argument(
        "--azure-endpoint",
        default=None,
        help="Azure OpenAI endpoint URL (e.g. https://oai-name.openai.azure.com/). When set, uses AzureOpenAI client and --endpoint is ignored."
    )
    parser.add_argument(
        "--azure-api-version",
        default="2024-10-21",
        help="Azure OpenAI API version (default: 2024-10-21)"
    )
    parser.add_argument(
        "--gemini",
        action="store_true",
        help="Use Google Gemini (Vertex AI) for AI report generation"
    )
    parser.add_argument(
        "--gcp-project",
        default=os.getenv("GOOGLE_CLOUD_PROJECT"),
        help="Google Cloud Project ID for Gemini"
    )
    parser.add_argument(
        "--gcp-location",
        default=os.getenv("GOOGLE_CLOUD_LOCATION", "us-central1"),
        help="Google Cloud Location for Gemini (default: us-central1)"
    )
    args = parser.parse_args()

    # Determine the plan file path (relative to current working directory if not absolute)
    plan_file = Path(args.plan)
    if not plan_file.is_absolute():
        plan_file = Path.cwd() / plan_file
    if not plan_file.exists():
        print(f"❌ Error: {plan_file} not found", file=sys.stderr)
        sys.exit(1)

    print(f"📋 Loading plan from {plan_file}...", file=sys.stderr)
    try:
        plan = load_plan(plan_file)
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        # Typically the binary plan file was passed instead of its JSON rendering.
        print(
            f"❌ Error: {plan_file} is not valid JSON ({exc}). "
            "Create it with 'terraform show -json <planfile>'.",
            file=sys.stderr,
        )
        sys.exit(1)

    print("🔍 Extracting resource changes...", file=sys.stderr)
    categorised = extract_resource_changes(plan)
    total = sum(len(v) for v in categorised.values())
    if not categorised:
        print("⚠️ No resource changes found in the plan", file=sys.stderr)
        sys.exit(1)

    # Known categories first, then any others, so the printed counts add up to the total.
    summary_order = list(ACTION_ORDER) + sorted(a for a in categorised if a not in ACTION_ORDER)
    for action in summary_order:
        count = len(categorised.get(action, []))
        if count:
            label = ACTION_LABELS.get(action, action or "unknown")
            print(f" {label}: {count}", file=sys.stderr)
    print(f"✅ Total changes: {total}", file=sys.stderr)

    # Generate report
    if args.ai:
        if args.gemini:
            # The generic default model name is replaced by a Gemini model.
            if args.model == MODEL_DEFAULT:
                args.model = "gemini-2.5-flash"
            print(f"🚀 Generating AI report with Gemini ({args.model})...", file=sys.stderr)
        elif args.azure_endpoint:
            print(f"🚀 Generating AI report with Azure OpenAI ({args.model})...", file=sys.stderr)
        else:
            print(f"🚀 Generating AI report with OpenAI endpoint ({args.model})...", file=sys.stderr)
        report_start = time.time()
        report = generate_report(
            categorised,
            plan,
            args.endpoint,
            args.api_key,
            args.model,
            args.azure_endpoint,
            args.azure_api_version,
            args.gemini,
            args.gcp_project,
            args.gcp_location
        )
        report_elapsed = time.time() - report_start
        print(f" ✓ Completed in {report_elapsed:.2f}s", file=sys.stderr)
    else:
        print("📄 Generating plain report...", file=sys.stderr)
        report = generate_plain_report(categorised, plan)

    # Determine output filename
    if args.output:
        output_file = Path(args.output)
        # A relative output path is resolved against the plan file's directory.
        if not output_file.is_absolute():
            output_file = plan_file.parent / output_file
    else:
        # Swap .json for .md
        output_filename = plan_file.stem + ".md"
        output_file = plan_file.parent / output_filename

    # Write UTF-8 explicitly: the report can contain non-ASCII characters that
    # a locale-dependent default encoding may be unable to encode.
    with open(output_file, "w", encoding="utf-8") as f:
        f.write(report)
    print(f"📄 Report written to {output_file}", file=sys.stderr)
if __name__ == "__main__":
    # Run the CLI only when executed as a script. Ctrl-C ends the run with a
    # short notice and exit status 1 instead of a traceback.
    try:
        main()
    except KeyboardInterrupt:
        print("\n⚠️ Interrupted by user", file=sys.stderr)
        raise SystemExit(1)