Files
simple-ca/simple-ca.py
T
slawek 935167ca8c
/ test-shell (push) Successful in 11s
/ test-python (push) Successful in 25s
/ test-go (push) Successful in 41s
Refactor simple-ca: Remove JSON config and streamline AIA URL handling
- Removed the JSON configuration structure and related functions.
- Introduced plain text file for AIA base URL management.
- Updated CA and certificate creation functions to directly read/write AIA URL.
- Simplified CA bundle rebuilding logic by directly reading subdirectories.
- Enhanced test coverage for CA and certificate creation, including PFX generation.
- Adjusted test cases to reflect changes in directory structure and file handling.
2026-05-24 21:40:06 +02:00

612 lines
22 KiB
Python
Executable File

#!/usr/bin/env python3
# MIT License
#
# Copyright (c) 2026 Sławomir Koszewski
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# This module requires Python 3.8+ and OpenSSL to be installed on the system.
import argparse
import datetime
import json
import os
import re
import subprocess
import sys
import tempfile
OPENSSL = "openssl"
class Config:
FILE = "simple-ca.json"
def __init__(self, ca_dir: str):
self._path = os.path.join(ca_dir, self.FILE)
self._data: dict = {}
if not os.path.isfile(self._path):
return
try:
with open(self._path, "r") as f:
self._data = json.load(f)
except (json.JSONDecodeError, OSError) as e:
print(f"WARNING: could not read {self._path}: {e}", file=sys.stderr)
def get(self, key, default=None):
return self._data.get(key, default)
def update(self, patch: dict):
file_missing = not os.path.isfile(self._path)
changed = any(self._data.get(k) != v for k, v in patch.items())
if file_missing or changed:
self._data.update(patch)
self._save()
def append_history(self, ca_key: str, entry: dict):
history = self._data.get("history", {})
history.setdefault(ca_key, []).append(entry)
self._data["history"] = history
self._save()
def revoke_in_history(self, ca_key: str, serial: str, revoked_at: str):
"""Mark a certificate as revoked. Returns True if newly revoked,
False if already revoked, None if serial not found."""
history = self._data.get("history", {})
for entry in history.get(ca_key, []):
if entry.get("serial") == serial:
if "revoked" in entry:
return False
entry["revoked"] = revoked_at
self._data["history"] = history
self._save()
return True
return None
def _save(self):
with open(self._path, "w") as f:
json.dump(self._data, f, indent=2)
f.write("\n")
_config: Config
def _now() -> str:
return datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def _err(msg):
print(f"ERROR: {msg}", file=sys.stderr)
def _rebuild_ca_bundle(ca_dir):
bundle_path = os.path.join(ca_dir, "ca_bundle.pem")
parts = []
root = os.path.join(ca_dir, "ca_cert.pem")
if os.path.isfile(root):
with open(root, "rb") as f:
parts.append(f.read())
for name in sorted(_config.get("subordinates", [])):
sub_cert = os.path.join(ca_dir, name, "ca_cert.pem")
if os.path.isfile(sub_cert):
with open(sub_cert, "rb") as f:
parts.append(f.read())
with open(bundle_path, "wb") as f:
f.write(b"".join(parts))
def _read_serial(cert_path) -> str:
result = subprocess.run(
[OPENSSL, "x509", "-in", cert_path, "-noout", "-serial"],
capture_output=True, text=True,
)
return result.stdout.strip().split("=", 1)[-1]
def _read_expiry(cert_path) -> str:
"""Return the certificate notAfter date as an ISO 8601 UTC string."""
result = subprocess.run(
[OPENSSL, "x509", "-in", cert_path, "-noout", "-enddate"],
capture_output=True, text=True,
)
raw = result.stdout.strip().split("=", 1)[-1]
raw = " ".join(raw.split()) # normalize whitespace ("May 4" → "May 4")
dt = datetime.datetime.strptime(raw, "%b %d %H:%M:%S %Y GMT")
return dt.strftime("%Y-%m-%dT%H:%M:%SZ")
def _iso_to_asn1(iso: str) -> str:
"""Convert "2026-05-24T14:28:10Z""260524142810Z" for OpenSSL index.txt."""
dt = datetime.datetime.strptime(iso, "%Y-%m-%dT%H:%M:%SZ")
return dt.strftime("%y%m%d%H%M%SZ")
_IP_RE = re.compile(r"^[0-9]{1,3}(\.[0-9]{1,3}){3}$")
_DNS_RE = re.compile(r"^[a-z0-9-]+(\.[a-z0-9-]+)*$")
def _is_ip(value):
return bool(_IP_RE.match(value))
def _is_dns(value):
return bool(_DNS_RE.match(value))
def _pipe(cmd1, cmd2):
p1 = subprocess.Popen(cmd1, stdout=subprocess.PIPE)
p2 = subprocess.Popen(cmd2, stdin=p1.stdout)
p1.stdout.close()
p2.communicate()
p1.wait()
return p1.returncode == 0 and p2.returncode == 0
def make_ca(ca_dir, ca_name, days=3650, issuing_ca=None, ca_publish_base_url=None):
if issuing_ca == "ca":
_err("--issuing-ca cannot be 'ca' as it is reserved for the root CA.")
return False
if not ca_dir or not os.path.isdir(ca_dir):
_err(f"Certificate directory {ca_dir} does not exist.")
return False
if not ca_name:
_err("CA name is required.")
return False
root_ca_cert_path = os.path.join(ca_dir, "ca_cert.pem")
root_ca_key_path = os.path.join(ca_dir, "ca_key.pem")
if not os.path.isfile(root_ca_cert_path) or not os.path.isfile(root_ca_key_path):
if issuing_ca:
_err(
f"Cannot create issuing CA '{ca_name}' without existing root CA "
"certificate and key. Please create the root CA first."
)
return False
print(f"Generating CA certificate '{ca_name}' and key...")
# Path length constraint of 1: allows one level of issuing CAs.
cmd = [
OPENSSL, "req",
"-x509",
"-newkey", "rsa:4096",
"-keyout", root_ca_key_path,
"-out", root_ca_cert_path,
"-days", str(days),
"-noenc",
"-subj", f"/CN={ca_name}",
"-text",
"-addext", "basicConstraints=critical,CA:TRUE,pathlen:1",
"-addext", "keyUsage=critical,keyCertSign,cRLSign",
]
if subprocess.run(cmd).returncode != 0:
_err("Failed to generate CA certificate and key.")
return False
_rebuild_ca_bundle(ca_dir)
patch = {"name": ca_name, "created": _now()}
if ca_publish_base_url:
patch["ca_publish_base_url"] = ca_publish_base_url
_config.update(patch)
return True
issuing_ca_dir = os.path.join(ca_dir, issuing_ca)
issuing_ca_cert = os.path.join(issuing_ca_dir, "ca_cert.pem")
issuing_ca_key = os.path.join(issuing_ca_dir, "ca_key.pem")
if not os.path.isfile(issuing_ca_cert) or not os.path.isfile(issuing_ca_key):
print(f"Generating issuing CA certificate '{ca_name}' and key...")
os.makedirs(issuing_ca_dir, exist_ok=True)
req_cmd = [
OPENSSL, "req",
"-newkey", "rsa:4096",
"-keyout", issuing_ca_key,
"-noenc",
"-subj", f"/CN={ca_name}",
"-addext", "basicConstraints=critical,CA:TRUE,pathlen:0",
"-addext", "keyUsage=critical,keyCertSign,cRLSign",
]
if ca_publish_base_url:
req_cmd += [
"-addext", f"authorityInfoAccess=caIssuers;URI:{ca_publish_base_url}/ca_cert.crt",
"-addext", f"crlDistributionPoints=URI:{ca_publish_base_url}/crl.pem",
]
x509_cmd = [
OPENSSL, "x509",
"-req",
"-CA", root_ca_cert_path,
"-CAkey", root_ca_key_path,
"-copy_extensions", "copyall",
"-days", str(days),
"-text",
"-out", issuing_ca_cert,
]
if not _pipe(req_cmd, x509_cmd):
_err("Failed to generate issuing CA certificate and key.")
return False
patch = {"ca_publish_base_url": ca_publish_base_url} if ca_publish_base_url else {}
subs = _config.get("subordinates", [])
if issuing_ca not in subs:
patch["subordinates"] = subs + [issuing_ca]
_config.update(patch)
_rebuild_ca_bundle(ca_dir)
return True
def make_cert(cert_subject_name, sans=None, ca_dir=None, cert_dir=None,
issuing_ca=None, days=365, ca_publish_base_url=None):
if issuing_ca == "ca":
_err("--issuing-ca cannot be 'ca' as it is reserved for the root CA.")
return False
if not ca_dir or not os.path.isdir(ca_dir):
_err(f"CA directory {ca_dir} does not exist.")
return False
if not cert_subject_name:
_err("Subject name is required.")
return False
if not _is_dns(cert_subject_name):
_err(f"Invalid subject name '{cert_subject_name}'. Must be a valid DNS name.")
return False
signing_dir = os.path.join(ca_dir, issuing_ca) if issuing_ca else ca_dir
cert_dir = cert_dir or signing_dir
if not os.path.isdir(cert_dir):
_err(f"Certificate directory {cert_dir} does not exist.")
return False
ca_cert_path = os.path.join(signing_dir, "ca_cert.pem")
ca_key_path = os.path.join(signing_dir, "ca_key.pem")
if not os.path.isfile(ca_cert_path) or not os.path.isfile(ca_key_path):
_err(
f"Signing CA certificate and key not found in {signing_dir}. "
"Please set up a signing CA first."
)
return False
aia_url = cdp_url = ""
if ca_publish_base_url:
if issuing_ca:
aia_url = f"{ca_publish_base_url}/{issuing_ca}/ca_cert.crt"
cdp_url = f"{ca_publish_base_url}/{issuing_ca}/crl.pem"
else:
aia_url = f"{ca_publish_base_url}/ca_cert.crt"
cdp_url = f"{ca_publish_base_url}/crl.pem"
cert_name = cert_subject_name.split(".", 1)[0]
san_entries = [f"DNS:{cert_subject_name}"]
for entry in sans or []:
if _is_ip(entry):
san_entries.append(f"IP:{entry}")
elif _is_dns(entry):
san_entries.append(f"DNS:{entry}")
else:
_err(f"Invalid SAN entry '{entry}'")
return False
sans_ext = "subjectAltName=" + ",".join(san_entries)
print(f"Generating server certificate for '{cert_subject_name}' with SANs:")
for san in san_entries:
print(f" - {san}")
cert_out = os.path.join(cert_dir, f"{cert_name}_cert.pem")
key_out = os.path.join(cert_dir, f"{cert_name}_key.pem")
if not os.path.isfile(cert_out) or not os.path.isfile(key_out):
print("Generating server certificate and key...")
req_cmd = [
OPENSSL, "req",
"-newkey", "rsa:4096",
"-keyout", key_out,
"-noenc",
"-subj", f"/CN={cert_subject_name}",
"-addext", "basicConstraints=critical,CA:FALSE",
"-addext", "keyUsage=critical,digitalSignature,keyEncipherment",
"-addext", "extendedKeyUsage=serverAuth,clientAuth",
"-addext", sans_ext,
]
if aia_url:
req_cmd += ["-addext", f"authorityInfoAccess=caIssuers;URI:{aia_url}"]
if cdp_url:
req_cmd += ["-addext", f"crlDistributionPoints=URI:{cdp_url}"]
x509_cmd = [
OPENSSL, "x509",
"-req",
"-CA", ca_cert_path,
"-CAkey", ca_key_path,
"-copy_extensions", "copyall",
"-days", str(days),
"-text",
"-out", cert_out,
]
if not _pipe(req_cmd, x509_cmd):
_err("Failed to generate server certificate and key.")
return False
_config.append_history(issuing_ca or "ca", {
"name": cert_subject_name,
"serial": _read_serial(cert_out),
"created": _now(),
"expires": _read_expiry(cert_out),
})
return True
def make_pfx(cert_path, ca_dir, issuing_ca=None, password=None, apple_openssl=False):
if issuing_ca == "ca":
_err("--issuing-ca cannot be 'ca' as it is reserved for the root CA.")
return False
cert_dir = os.path.dirname(cert_path)
cert_basename = os.path.basename(cert_path)
cert_name = cert_basename[:-len("_cert.pem")] if cert_basename.endswith("_cert.pem") else cert_basename
key_path = os.path.join(cert_dir, f"{cert_name}_key.pem")
if not cert_dir or not os.path.isdir(cert_dir):
_err(f"Certificate directory {cert_dir} does not exist.")
return False
if not ca_dir or not os.path.isdir(ca_dir):
_err(f"CA directory {ca_dir} does not exist.")
return False
if not os.path.isfile(cert_path) or not os.path.isfile(key_path):
_err("Server certificate or key not found.")
return False
root_ca_cert_path = os.path.join(ca_dir, "ca_cert.pem")
root_ca_key_path = os.path.join(ca_dir, "ca_key.pem")
if not os.path.isfile(root_ca_cert_path) or not os.path.isfile(root_ca_key_path):
_err(f"CA certificate or key not found in {ca_dir}.")
return False
if issuing_ca:
issuing_ca_cert_path = os.path.join(ca_dir, issuing_ca, "ca_cert.pem")
if not os.path.isfile(issuing_ca_cert_path):
_err(f"Issuing CA certificate not found: {issuing_ca_cert_path}.")
return False
if not password:
password = "changeit"
pfx_path = os.path.join(cert_dir, f"{cert_name}.pfx")
if os.path.isfile(pfx_path):
print("PKCS#12 (PFX) file already exists, aborting generation.")
return False
print("Generating PKCS#12 (PFX) file...", end="")
chain_bytes = b""
with open(root_ca_cert_path, "rb") as f:
chain_bytes += f.read()
if issuing_ca:
with open(issuing_ca_cert_path, "rb") as f:
chain_bytes += f.read()
chain_fd, chain_file = tempfile.mkstemp()
try:
with os.fdopen(chain_fd, "wb") as f:
f.write(chain_bytes)
cmd = [
"/usr/bin/openssl" if apple_openssl else OPENSSL, "pkcs12",
"-export", "-out", pfx_path,
"-inkey", key_path,
"-in", cert_path,
"-certfile", chain_file,
"-password", f"pass:{password}",
]
if subprocess.run(cmd).returncode != 0:
_err("Failed to generate PKCS#12 (PFX) file.")
return False
finally:
if os.path.exists(chain_file):
os.remove(chain_file)
print("done.")
return True
def make_crl(ca_dir, issuing_ca=None, days=30):
signing_dir = os.path.join(ca_dir, issuing_ca) if issuing_ca else ca_dir
ca_cert = os.path.join(signing_dir, "ca_cert.pem")
ca_key = os.path.join(signing_dir, "ca_key.pem")
if not os.path.isfile(ca_cert) or not os.path.isfile(ca_key):
_err(f"CA certificate or key not found in {signing_dir}.")
return False
crl_path = os.path.join(signing_dir, "crl.pem")
history_key = issuing_ca or "ca"
revoked_entries = [
e for e in _config.get("history", {}).get(history_key, [])
if "revoked" in e and "expires" in e
]
with tempfile.TemporaryDirectory() as tmp:
index_txt = os.path.join(tmp, "index.txt")
crlnumber = os.path.join(tmp, "crlnumber")
cnf_path = os.path.join(tmp, "openssl.cnf")
with open(index_txt, "w") as f:
for e in revoked_entries:
expires_asn1 = _iso_to_asn1(e["expires"])
revoked_asn1 = _iso_to_asn1(e["revoked"])
f.write(f"R\t{expires_asn1}\t{revoked_asn1}\t{e['serial']}\tunknown\t/CN={e['name']}\n")
with open(crlnumber, "w") as f:
f.write("01\n")
with open(cnf_path, "w") as f:
f.write(
"[ ca ]\ndefault_ca = CA_default\n\n"
"[ CA_default ]\n"
f"database = {index_txt}\n"
f"crlnumber = {crlnumber}\n"
f"certificate = {ca_cert}\n"
f"private_key = {ca_key}\n"
f"default_crl_days = {days}\n"
"default_md = sha256\n\n"
"[ crl_ext ]\n"
"authorityKeyIdentifier = keyid:always\n"
)
if subprocess.run([OPENSSL, "ca", "-gencrl", "-config", cnf_path, "-out", crl_path]).returncode != 0:
_err("Failed to generate CRL.")
return False
print(f"CRL written to {crl_path}")
return True
def revoke_cert(cert_path, ca_dir, issuing_ca=None):
if not os.path.isfile(cert_path):
_err(f"Certificate not found: {cert_path}")
return False
signing_dir = os.path.join(ca_dir, issuing_ca) if issuing_ca else ca_dir
if not os.path.isdir(signing_dir):
_err(f"CA directory not found: {signing_dir}")
return False
ca_key = issuing_ca or "ca"
serial = _read_serial(cert_path)
result = _config.revoke_in_history(ca_key, serial, _now())
if result is None:
_err(f"Certificate with serial {serial} not found in history for CA '{ca_key}'.")
return False
if result is False:
print(f"Certificate {cert_path} (serial {serial}) is already revoked.")
return True
print(f"Certificate {cert_path} (serial {serial}) marked as revoked.")
return True
def _build_parser():
parser = argparse.ArgumentParser(
description="Simple CA for creating and managing test certificates."
)
sub = parser.add_subparsers(dest="command", required=True)
p_ca = sub.add_parser("make-ca", help="Create a root or issuing CA.")
p_ca.add_argument("--days", type=int, default=None, help="Validity period in days (default: 3650)")
p_ca.add_argument("--issuing-ca", default=None, help="Specify the issuing CA")
p_ca.add_argument("--ca-publish-base-url", default=None,
help="Base URL for AIA and CRL distribution point extensions")
p_ca.add_argument("--ca-dir", help="Directory to store the CA files")
p_ca.add_argument("--openssl", default=None, metavar="PATH",
help=f"Path to the openssl binary (default: {OPENSSL})")
p_ca.add_argument("ca_name", help="Name of the CA")
p_cert = sub.add_parser("make-cert", help="Create a server/client certificate.")
p_cert.add_argument("--ca-dir", help="Directory of the CA")
p_cert.add_argument("--issuing-ca", default=None, help="Specify the issuing CA")
p_cert.add_argument("--days", type=int, default=None, help="Validity period in days (default: 365)")
p_cert.add_argument("--cert-dir", help="Directory to store the certificate files")
p_cert.add_argument("--openssl", default=None, metavar="PATH",
help=f"Path to the openssl binary (default: {OPENSSL})")
p_cert.add_argument("subject_name", help="Subject name for the certificate")
p_cert.add_argument("sans", nargs="*", help="Subject Alternative Names (SANs) for the certificate")
p_pfx = sub.add_parser("make-pfx", help="Create a PKCS#12 (PFX) bundle.")
p_pfx.add_argument("--issuing-ca", default=None, help="Specify the issuing CA")
p_pfx.add_argument("--ca-dir", help="Directory of the CA")
p_pfx.add_argument("--password", help="Password for the PFX file")
p_pfx.add_argument("--apple-openssl", action="store_true", default=False,
help="Use Apple's bundled /usr/bin/openssl for PKCS12 generation")
p_pfx.add_argument("path", help="Path to the certificate file")
p_crl = sub.add_parser("make-crl", help="Generate a CRL for a CA.")
p_crl.add_argument("--ca-dir", help="Directory of the CA")
p_crl.add_argument("--issuing-ca", default=None, help="Generate CRL for this issuing CA")
p_crl.add_argument("--days", type=int, default=None, help="CRL validity in days (default: 30)")
p_rev = sub.add_parser("revoke-cert", help="Revoke a certificate.")
p_rev.add_argument("--ca-dir", help="Directory of the CA")
p_rev.add_argument("--issuing-ca", default=None, help="Issuing CA that signed the certificate")
p_rev.add_argument("cert_path", help="Path to the certificate file to revoke")
return parser
def main(argv=None):
global OPENSSL, _config
parser = _build_parser()
args = parser.parse_args(argv)
ca_dir = args.ca_dir or os.environ.get("SIMPLE_CA_DIR") or os.getcwd()
_config = Config(ca_dir)
OPENSSL = getattr(args, "openssl", None) or _config.get("openssl", OPENSSL)
issuing_ca = args.issuing_ca or _config.get("issuing_ca")
ca_publish_base_url = getattr(args, "ca_publish_base_url", None) or _config.get("ca_publish_base_url")
days_cfg = _config.get("days", {})
if args.command == "make-ca":
days = args.days or days_cfg.get("ca", 3650)
ok = make_ca(
ca_dir, args.ca_name,
days=days,
issuing_ca=issuing_ca,
ca_publish_base_url=ca_publish_base_url,
)
elif args.command == "make-cert":
days = args.days or days_cfg.get("cert", 365)
ok = make_cert(
args.subject_name,
sans=args.sans,
ca_dir=ca_dir,
cert_dir=getattr(args, "cert_dir", None),
issuing_ca=issuing_ca,
days=days,
ca_publish_base_url=ca_publish_base_url,
)
elif args.command == "make-pfx":
ok = make_pfx(
args.path, ca_dir,
issuing_ca=issuing_ca,
password=args.password,
apple_openssl=args.apple_openssl,
)
elif args.command == "make-crl":
days = args.days or days_cfg.get("crl", 30)
ok = make_crl(ca_dir, issuing_ca=issuing_ca, days=days)
elif args.command == "revoke-cert":
ok = revoke_cert(args.cert_path, ca_dir, issuing_ca=issuing_ca)
else:
parser.error(f"Unknown command: {args.command}")
ok = False
return 0 if ok else 1
if __name__ == "__main__":
sys.exit(main())