#!/usr/bin/env python3 # MIT License # # Copyright (c) 2026 Sławomir Koszewski # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in all # copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. # This module requires Python 3.8+ and OpenSSL to be installed on the system. import argparse import datetime import json import os import re import subprocess import sys import tempfile OPENSSL = "openssl" class Config: FILE = "simple-ca.json" def __init__(self, ca_dir: str): self._path = os.path.join(ca_dir, self.FILE) self._data: dict = {} if not os.path.isfile(self._path): return try: with open(self._path, "r") as f: self._data = json.load(f) except (json.JSONDecodeError, OSError) as e: print(f"WARNING: could not read {self._path}: {e}", file=sys.stderr) def get(self, key, default=None): return self._data.get(key, default) def update(self, patch: dict): file_missing = not os.path.isfile(self._path) changed = any(self._data.get(k) != v for k, v in patch.items()) if file_missing or changed: self._data.update(patch) self._save() def append_history(self, ca_key: str, entry: dict): history = self._data.get("history", {}) history.setdefault(ca_key, []).append(entry) self._data["history"] = history self._save() def revoke_in_history(self, ca_key: str, serial: str, revoked_at: str): """Mark a certificate as revoked. Returns True if newly revoked, False if already revoked, None if serial not found.""" history = self._data.get("history", {}) for entry in history.get(ca_key, []): if entry.get("serial") == serial: if "revoked" in entry: return False entry["revoked"] = revoked_at self._data["history"] = history self._save() return True return None def _save(self): with open(self._path, "w") as f: json.dump(self._data, f, indent=2) f.write("\n") _config: Config def _now() -> str: return datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") def _err(msg): print(f"ERROR: {msg}", file=sys.stderr) def _rebuild_ca_bundle(ca_dir): bundle_path = os.path.join(ca_dir, "ca_bundle.pem") parts = [] root = os.path.join(ca_dir, "ca_cert.pem") if os.path.isfile(root): with open(root, "rb") as f: parts.append(f.read()) for name in sorted(_config.get("subordinates", [])): sub_cert = os.path.join(ca_dir, name, "ca_cert.pem") if os.path.isfile(sub_cert): with open(sub_cert, "rb") as f: parts.append(f.read()) with open(bundle_path, "wb") as f: f.write(b"".join(parts)) def _read_serial(cert_path) -> str: result = subprocess.run( [OPENSSL, "x509", "-in", cert_path, "-noout", "-serial"], capture_output=True, text=True, ) return result.stdout.strip().split("=", 1)[-1] def _read_expiry(cert_path) -> str: """Return the certificate notAfter date as an ISO 8601 UTC string.""" result = subprocess.run( [OPENSSL, "x509", "-in", cert_path, "-noout", "-enddate"], capture_output=True, text=True, ) raw = result.stdout.strip().split("=", 1)[-1] raw = " ".join(raw.split()) # normalize whitespace ("May 4" → "May 4") dt = datetime.datetime.strptime(raw, "%b %d %H:%M:%S %Y GMT") return dt.strftime("%Y-%m-%dT%H:%M:%SZ") def _iso_to_asn1(iso: str) -> str: """Convert "2026-05-24T14:28:10Z" → "260524142810Z" for OpenSSL index.txt.""" dt = datetime.datetime.strptime(iso, "%Y-%m-%dT%H:%M:%SZ") return dt.strftime("%y%m%d%H%M%SZ") _IP_RE = re.compile(r"^[0-9]{1,3}(\.[0-9]{1,3}){3}$") _DNS_RE = re.compile(r"^[a-z0-9-]+(\.[a-z0-9-]+)*$") def _is_ip(value): return bool(_IP_RE.match(value)) def _is_dns(value): return bool(_DNS_RE.match(value)) def _pipe(cmd1, cmd2): p1 = subprocess.Popen(cmd1, stdout=subprocess.PIPE) p2 = subprocess.Popen(cmd2, stdin=p1.stdout) p1.stdout.close() p2.communicate() p1.wait() return p1.returncode == 0 and p2.returncode == 0 def make_ca(ca_dir, ca_name, days=3650, issuing_ca=None, ca_publish_base_url=None): if issuing_ca == "ca": _err("--issuing-ca cannot be 'ca' as it is reserved for the root CA.") return False if not ca_dir or not os.path.isdir(ca_dir): _err(f"Certificate directory {ca_dir} does not exist.") return False if not ca_name: _err("CA name is required.") return False root_ca_cert_path = os.path.join(ca_dir, "ca_cert.pem") root_ca_key_path = os.path.join(ca_dir, "ca_key.pem") if not os.path.isfile(root_ca_cert_path) or not os.path.isfile(root_ca_key_path): if issuing_ca: _err( f"Cannot create issuing CA '{ca_name}' without existing root CA " "certificate and key. Please create the root CA first." ) return False print(f"Generating CA certificate '{ca_name}' and key...") # Path length constraint of 1: allows one level of issuing CAs. cmd = [ OPENSSL, "req", "-x509", "-newkey", "rsa:4096", "-keyout", root_ca_key_path, "-out", root_ca_cert_path, "-days", str(days), "-noenc", "-subj", f"/CN={ca_name}", "-text", "-addext", "basicConstraints=critical,CA:TRUE,pathlen:1", "-addext", "keyUsage=critical,keyCertSign,cRLSign", ] if subprocess.run(cmd).returncode != 0: _err("Failed to generate CA certificate and key.") return False _rebuild_ca_bundle(ca_dir) patch = {"name": ca_name, "created": _now()} if ca_publish_base_url: patch["ca_publish_base_url"] = ca_publish_base_url _config.update(patch) return True issuing_ca_dir = os.path.join(ca_dir, issuing_ca) issuing_ca_cert = os.path.join(issuing_ca_dir, "ca_cert.pem") issuing_ca_key = os.path.join(issuing_ca_dir, "ca_key.pem") if not os.path.isfile(issuing_ca_cert) or not os.path.isfile(issuing_ca_key): print(f"Generating issuing CA certificate '{ca_name}' and key...") os.makedirs(issuing_ca_dir, exist_ok=True) req_cmd = [ OPENSSL, "req", "-newkey", "rsa:4096", "-keyout", issuing_ca_key, "-noenc", "-subj", f"/CN={ca_name}", "-addext", "basicConstraints=critical,CA:TRUE,pathlen:0", "-addext", "keyUsage=critical,keyCertSign,cRLSign", ] if ca_publish_base_url: req_cmd += [ "-addext", f"authorityInfoAccess=caIssuers;URI:{ca_publish_base_url}/ca_cert.crt", "-addext", f"crlDistributionPoints=URI:{ca_publish_base_url}/crl.pem", ] x509_cmd = [ OPENSSL, "x509", "-req", "-CA", root_ca_cert_path, "-CAkey", root_ca_key_path, "-copy_extensions", "copyall", "-days", str(days), "-text", "-out", issuing_ca_cert, ] if not _pipe(req_cmd, x509_cmd): _err("Failed to generate issuing CA certificate and key.") return False patch = {"ca_publish_base_url": ca_publish_base_url} if ca_publish_base_url else {} subs = _config.get("subordinates", []) if issuing_ca not in subs: patch["subordinates"] = subs + [issuing_ca] _config.update(patch) _rebuild_ca_bundle(ca_dir) return True def make_cert(cert_dir, cert_subject_name, sans=None, ca_dir=None, issuing_ca=None, days=365, ca_publish_base_url=None): if issuing_ca == "ca": _err("--issuing-ca cannot be 'ca' as it is reserved for the root CA.") return False ca_dir = ca_dir or cert_dir if not cert_dir or not os.path.isdir(cert_dir): _err(f"Certificate directory {cert_dir} does not exist.") return False if not ca_dir or not os.path.isdir(ca_dir): _err(f"CA directory {ca_dir} does not exist.") return False if not cert_subject_name: _err("Subject name is required.") return False if not _is_dns(cert_subject_name): _err(f"Invalid subject name '{cert_subject_name}'. Must be a valid DNS name.") return False signing_dir = os.path.join(ca_dir, issuing_ca) if issuing_ca else ca_dir ca_cert_path = os.path.join(signing_dir, "ca_cert.pem") ca_key_path = os.path.join(signing_dir, "ca_key.pem") if not os.path.isfile(ca_cert_path) or not os.path.isfile(ca_key_path): _err( f"Signing CA certificate and key not found in {signing_dir}. " "Please set up a signing CA first." ) return False aia_url = cdp_url = "" if ca_publish_base_url: if issuing_ca: aia_url = f"{ca_publish_base_url}/{issuing_ca}/ca_cert.crt" cdp_url = f"{ca_publish_base_url}/{issuing_ca}/crl.pem" else: aia_url = f"{ca_publish_base_url}/ca_cert.crt" cdp_url = f"{ca_publish_base_url}/crl.pem" cert_name = cert_subject_name.split(".", 1)[0] san_entries = [f"DNS:{cert_subject_name}"] for entry in sans or []: if _is_ip(entry): san_entries.append(f"IP:{entry}") elif _is_dns(entry): san_entries.append(f"DNS:{entry}") else: _err(f"Invalid SAN entry '{entry}'") return False sans_ext = "subjectAltName=" + ",".join(san_entries) print(f"Generating server certificate for '{cert_subject_name}' with SANs:") for san in san_entries: print(f" - {san}") cert_out = os.path.join(cert_dir, f"{cert_name}_cert.pem") key_out = os.path.join(cert_dir, f"{cert_name}_key.pem") if not os.path.isfile(cert_out) or not os.path.isfile(key_out): print("Generating server certificate and key...") req_cmd = [ OPENSSL, "req", "-newkey", "rsa:4096", "-keyout", key_out, "-noenc", "-subj", f"/CN={cert_subject_name}", "-addext", "basicConstraints=critical,CA:FALSE", "-addext", "keyUsage=critical,digitalSignature,keyEncipherment", "-addext", "extendedKeyUsage=serverAuth,clientAuth", "-addext", sans_ext, ] if aia_url: req_cmd += ["-addext", f"authorityInfoAccess=caIssuers;URI:{aia_url}"] if cdp_url: req_cmd += ["-addext", f"crlDistributionPoints=URI:{cdp_url}"] x509_cmd = [ OPENSSL, "x509", "-req", "-CA", ca_cert_path, "-CAkey", ca_key_path, "-copy_extensions", "copyall", "-days", str(days), "-text", "-out", cert_out, ] if not _pipe(req_cmd, x509_cmd): _err("Failed to generate server certificate and key.") return False _config.append_history(issuing_ca or "ca", { "name": cert_subject_name, "serial": _read_serial(cert_out), "created": _now(), "expires": _read_expiry(cert_out), }) return True def make_pfx(cert_path, ca_dir, issuing_ca=None, password=None, apple_openssl=False): if issuing_ca == "ca": _err("--issuing-ca cannot be 'ca' as it is reserved for the root CA.") return False cert_dir = os.path.dirname(cert_path) cert_basename = os.path.basename(cert_path) cert_name = cert_basename[:-len("_cert.pem")] if cert_basename.endswith("_cert.pem") else cert_basename key_path = os.path.join(cert_dir, f"{cert_name}_key.pem") if not cert_dir or not os.path.isdir(cert_dir): _err(f"Certificate directory {cert_dir} does not exist.") return False if not ca_dir or not os.path.isdir(ca_dir): _err(f"CA directory {ca_dir} does not exist.") return False if not os.path.isfile(cert_path) or not os.path.isfile(key_path): _err("Server certificate or key not found.") return False root_ca_cert_path = os.path.join(ca_dir, "ca_cert.pem") root_ca_key_path = os.path.join(ca_dir, "ca_key.pem") if not os.path.isfile(root_ca_cert_path) or not os.path.isfile(root_ca_key_path): _err(f"CA certificate or key not found in {ca_dir}.") return False if issuing_ca: issuing_ca_cert_path = os.path.join(ca_dir, issuing_ca, "ca_cert.pem") if not os.path.isfile(issuing_ca_cert_path): _err(f"Issuing CA certificate not found: {issuing_ca_cert_path}.") return False if not password: password = "changeit" pfx_path = os.path.join(cert_dir, f"{cert_name}.pfx") if os.path.isfile(pfx_path): print("PKCS#12 (PFX) file already exists, aborting generation.") return False print("Generating PKCS#12 (PFX) file...", end="") chain_bytes = b"" with open(root_ca_cert_path, "rb") as f: chain_bytes += f.read() if issuing_ca: with open(issuing_ca_cert_path, "rb") as f: chain_bytes += f.read() chain_fd, chain_file = tempfile.mkstemp() try: with os.fdopen(chain_fd, "wb") as f: f.write(chain_bytes) cmd = [ "/usr/bin/openssl" if apple_openssl else OPENSSL, "pkcs12", "-export", "-out", pfx_path, "-inkey", key_path, "-in", cert_path, "-certfile", chain_file, "-password", f"pass:{password}", ] if subprocess.run(cmd).returncode != 0: _err("Failed to generate PKCS#12 (PFX) file.") return False finally: if os.path.exists(chain_file): os.remove(chain_file) print("done.") return True def make_crl(ca_dir, issuing_ca=None, days=30): signing_dir = os.path.join(ca_dir, issuing_ca) if issuing_ca else ca_dir ca_cert = os.path.join(signing_dir, "ca_cert.pem") ca_key = os.path.join(signing_dir, "ca_key.pem") if not os.path.isfile(ca_cert) or not os.path.isfile(ca_key): _err(f"CA certificate or key not found in {signing_dir}.") return False crl_path = os.path.join(signing_dir, "crl.pem") history_key = issuing_ca or "ca" revoked_entries = [ e for e in _config.get("history", {}).get(history_key, []) if "revoked" in e and "expires" in e ] with tempfile.TemporaryDirectory() as tmp: index_txt = os.path.join(tmp, "index.txt") crlnumber = os.path.join(tmp, "crlnumber") cnf_path = os.path.join(tmp, "openssl.cnf") with open(index_txt, "w") as f: for e in revoked_entries: expires_asn1 = _iso_to_asn1(e["expires"]) revoked_asn1 = _iso_to_asn1(e["revoked"]) f.write(f"R\t{expires_asn1}\t{revoked_asn1}\t{e['serial']}\tunknown\t/CN={e['name']}\n") with open(crlnumber, "w") as f: f.write("01\n") with open(cnf_path, "w") as f: f.write( "[ ca ]\ndefault_ca = CA_default\n\n" "[ CA_default ]\n" f"database = {index_txt}\n" f"crlnumber = {crlnumber}\n" f"certificate = {ca_cert}\n" f"private_key = {ca_key}\n" f"default_crl_days = {days}\n" "default_md = sha256\n\n" "[ crl_ext ]\n" "authorityKeyIdentifier = keyid:always\n" ) if subprocess.run([OPENSSL, "ca", "-gencrl", "-config", cnf_path, "-out", crl_path]).returncode != 0: _err("Failed to generate CRL.") return False print(f"CRL written to {crl_path}") return True def revoke_cert(cert_path, ca_dir, issuing_ca=None): if not os.path.isfile(cert_path): _err(f"Certificate not found: {cert_path}") return False signing_dir = os.path.join(ca_dir, issuing_ca) if issuing_ca else ca_dir if not os.path.isdir(signing_dir): _err(f"CA directory not found: {signing_dir}") return False ca_key = issuing_ca or "ca" serial = _read_serial(cert_path) result = _config.revoke_in_history(ca_key, serial, _now()) if result is None: _err(f"Certificate with serial {serial} not found in history for CA '{ca_key}'.") return False if result is False: print(f"Certificate {cert_path} (serial {serial}) is already revoked.") return True print(f"Certificate {cert_path} (serial {serial}) marked as revoked.") return True def _build_parser(): parser = argparse.ArgumentParser( description="Simple CA for creating and managing test certificates." ) sub = parser.add_subparsers(dest="command", required=True) p_ca = sub.add_parser("make-ca", help="Create a root or issuing CA.") p_ca.add_argument("--days", type=int, default=None, help="Validity period in days (default: 3650)") p_ca.add_argument("--issuing-ca", default=None, help="Specify the issuing CA") p_ca.add_argument("--ca-publish-base-url", default=None, help="Base URL for AIA and CRL distribution point extensions") p_ca.add_argument("--ca-dir", help="Directory to store the CA files") p_ca.add_argument("--openssl", default=None, metavar="PATH", help=f"Path to the openssl binary (default: {OPENSSL})") p_ca.add_argument("ca_name", help="Name of the CA") p_cert = sub.add_parser("make-cert", help="Create a server/client certificate.") p_cert.add_argument("--ca-dir", help="Directory of the CA") p_cert.add_argument("--issuing-ca", default=None, help="Specify the issuing CA") p_cert.add_argument("--days", type=int, default=None, help="Validity period in days (default: 365)") p_cert.add_argument("--cert-dir", help="Directory to store the certificate files") p_cert.add_argument("--openssl", default=None, metavar="PATH", help=f"Path to the openssl binary (default: {OPENSSL})") p_cert.add_argument("subject_name", help="Subject name for the certificate") p_cert.add_argument("sans", nargs="*", help="Subject Alternative Names (SANs) for the certificate") p_pfx = sub.add_parser("make-pfx", help="Create a PKCS#12 (PFX) bundle.") p_pfx.add_argument("--issuing-ca", default=None, help="Specify the issuing CA") p_pfx.add_argument("--ca-dir", help="Directory of the CA") p_pfx.add_argument("--password", help="Password for the PFX file") p_pfx.add_argument("--apple-openssl", action="store_true", default=False, help="Use Apple's bundled /usr/bin/openssl for PKCS12 generation") p_pfx.add_argument("path", help="Path to the certificate file") p_crl = sub.add_parser("make-crl", help="Generate a CRL for a CA.") p_crl.add_argument("--ca-dir", help="Directory of the CA") p_crl.add_argument("--issuing-ca", default=None, help="Generate CRL for this issuing CA") p_crl.add_argument("--days", type=int, default=None, help="CRL validity in days (default: 30)") p_rev = sub.add_parser("revoke-cert", help="Revoke a certificate.") p_rev.add_argument("--ca-dir", help="Directory of the CA") p_rev.add_argument("--issuing-ca", default=None, help="Issuing CA that signed the certificate") p_rev.add_argument("cert_path", help="Path to the certificate file to revoke") return parser def main(argv=None): global OPENSSL, _config parser = _build_parser() args = parser.parse_args(argv) ca_dir = args.ca_dir or os.environ.get("SIMPLE_CA_DIR") or os.getcwd() _config = Config(ca_dir) OPENSSL = getattr(args, "openssl", None) or _config.get("openssl", OPENSSL) issuing_ca = args.issuing_ca or _config.get("issuing_ca") ca_publish_base_url = getattr(args, "ca_publish_base_url", None) or _config.get("ca_publish_base_url") days_cfg = _config.get("days", {}) if args.command == "make-ca": days = args.days or days_cfg.get("ca", 3650) ok = make_ca( ca_dir, args.ca_name, days=days, issuing_ca=issuing_ca, ca_publish_base_url=ca_publish_base_url, ) elif args.command == "make-cert": days = args.days or days_cfg.get("cert", 365) ok = make_cert( args.cert_dir, args.subject_name, sans=args.sans, ca_dir=ca_dir, issuing_ca=issuing_ca, days=days, ca_publish_base_url=ca_publish_base_url, ) elif args.command == "make-pfx": ok = make_pfx( args.path, ca_dir, issuing_ca=issuing_ca, password=args.password, apple_openssl=args.apple_openssl, ) elif args.command == "make-crl": days = args.days or days_cfg.get("crl", 30) ok = make_crl(ca_dir, issuing_ca=issuing_ca, days=days) elif args.command == "revoke-cert": ok = revoke_cert(args.cert_path, ca_dir, issuing_ca=issuing_ca) else: parser.error(f"Unknown command: {args.command}") ok = False return 0 if ok else 1 if __name__ == "__main__": sys.exit(main())