From c29537c9e6011a5fb8a572fbf50b6395dde178fc Mon Sep 17 00:00:00 2001 From: Slawomir Koszewski Date: Fri, 24 Apr 2026 23:02:57 +0200 Subject: [PATCH] Added a convertion of Bash script to Python. --- simple-ca.py | 435 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 435 insertions(+) create mode 100644 simple-ca.py diff --git a/simple-ca.py b/simple-ca.py new file mode 100644 index 0000000..ed393a4 --- /dev/null +++ b/simple-ca.py @@ -0,0 +1,435 @@ +# MIT License +# +# Copyright (c) 2026 Sławomir Koszewski +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +# This module requires Python 3.8+ and OpenSSL to be installed on the system. + +import argparse +import os +import re +import subprocess +import sys + + +def _err(msg): + print(f"ERROR: {msg}", file=sys.stderr) + + +def make_hash_link(cert_path): + if not os.path.isfile(cert_path): + _err(f"Certificate file {cert_path} does not exist.") + return False + + cert_dir = os.path.dirname(cert_path) + try: + result = subprocess.run( + ["openssl", "x509", "-in", cert_path, "-noout", "-hash"], + capture_output=True, text=True, check=True, + ) + except subprocess.CalledProcessError: + _err(f"Failed to calculate hash for certificate {cert_path}.") + return False + + cert_hash = result.stdout.strip() + if not cert_hash: + _err(f"Failed to calculate hash for certificate {cert_path}.") + return False + + link_path = os.path.join(cert_dir, f"{cert_hash}.0") + target = os.path.basename(cert_path) + if os.path.islink(link_path) or os.path.exists(link_path): + os.remove(link_path) + os.symlink(target, link_path) + return True + + +def make_ca(ca_dir, ca_name, days=3650, issuing_ca=None, aia_base_url=None): + if issuing_ca == "ca": + _err("--issuing-ca cannot be 'ca' as it is reserved for the root CA.") + return False + + if not ca_dir or not os.path.isdir(ca_dir): + _err(f"Certificate directory {ca_dir} does not exist.") + return False + + if not ca_name: + _err("CA name is required.") + return False + + aia_file = os.path.join(ca_dir, "aia_base_url.txt") + if not aia_base_url and os.path.isfile(aia_file): + with open(aia_file, "r") as f: + aia_base_url = f.read().strip() + + ca_file_prefix = issuing_ca or "ca" + root_ca_cert = "ca_cert.pem" + root_ca_key = "ca_key.pem" + ca_cert = f"{ca_file_prefix}_cert.pem" + ca_key = f"{ca_file_prefix}_key.pem" + + root_ca_cert_path = os.path.join(ca_dir, root_ca_cert) + root_ca_key_path = os.path.join(ca_dir, root_ca_key) + ca_cert_path = os.path.join(ca_dir, ca_cert) + ca_key_path = os.path.join(ca_dir, ca_key) + + if not os.path.isfile(root_ca_cert_path) or not os.path.isfile(root_ca_key_path): + if root_ca_cert != ca_cert: + _err( + f"Cannot create issuing CA '{ca_name}' without existing root CA " + "certificate and key. Please create the root CA first." + ) + return False + + print(f"Generating CA certificate '{ca_name}' and key...") + # Path length constraint of 1 is set for the root CA to allow creating + # one level of issuing CAs, but prevent a longer chain which is not + # supported by this script. + cmd = [ + "openssl", "req", + "-x509", + "-newkey", "rsa:4096", + "-keyout", root_ca_key_path, + "-out", root_ca_cert_path, + "-days", str(days), + "-noenc", + "-subj", f"/CN={ca_name}", + "-text", + "-addext", "basicConstraints=critical,CA:TRUE,pathlen:1", + "-addext", "keyUsage=critical,keyCertSign,cRLSign", + ] + if subprocess.run(cmd).returncode != 0: + _err("Failed to generate CA certificate and key.") + return False + + make_hash_link(root_ca_cert_path) + + if aia_base_url: + with open(aia_file, "w") as f: + f.write(aia_base_url) + + return True + + if not os.path.isfile(ca_cert_path) or not os.path.isfile(ca_key_path): + print(f"Generating issuing CA certificate '{ca_name}' and key...") + req_cmd = [ + "openssl", "req", + "-newkey", "rsa:4096", + "-keyout", ca_key_path, + "-noenc", + "-subj", f"/CN={ca_name}", + "-addext", "basicConstraints=critical,CA:TRUE,pathlen:0", + "-addext", "keyUsage=critical,keyCertSign,cRLSign", + ] + if aia_base_url: + req_cmd += [ + "-addext", + f"authorityInfoAccess=caIssuers;URI:{aia_base_url}/ca_cert.crt", + ] + x509_cmd = [ + "openssl", "x509", + "-req", + "-CA", root_ca_cert_path, + "-CAkey", root_ca_key_path, + "-copy_extensions", "copyall", + "-days", str(days), + "-text", + "-out", ca_cert_path, + ] + if not _pipe(req_cmd, x509_cmd): + _err("Failed to generate issuing CA certificate and key.") + return False + + make_hash_link(ca_cert_path) + + if aia_base_url: + with open(aia_file, "w") as f: + f.write(aia_base_url) + + return True + + +_IP_RE = re.compile(r"^[0-9]{1,3}(\.[0-9]{1,3}){3}$") +_DNS_RE = re.compile(r"^[a-z0-9-]+(\.[a-z0-9-]+)*$") + + +def _is_ip(value): + return bool(_IP_RE.match(value)) + + +def _is_dns(value): + return bool(_DNS_RE.match(value)) + + +def _pipe(cmd1, cmd2): + p1 = subprocess.Popen(cmd1, stdout=subprocess.PIPE) + p2 = subprocess.Popen(cmd2, stdin=p1.stdout) + p1.stdout.close() + p2.communicate() + p1.wait() + return p1.returncode == 0 and p2.returncode == 0 + + +def make_cert(cert_dir, cert_subject_name, sans=None, ca_dir=None, + issuing_ca=None, days=365): + if issuing_ca == "ca": + _err("--issuing-ca cannot be 'ca' as it is reserved for the root CA.") + return False + + ca_file_prefix = issuing_ca or "ca" + ca_dir = ca_dir or cert_dir + + aia_base_url_file = os.path.join(ca_dir, "aia_base_url.txt") + aia_url = "" + if os.path.isfile(aia_base_url_file): + with open(aia_base_url_file, "r") as f: + aia_url = f"{f.read().strip()}/{ca_file_prefix}_cert.crt" + + ca_cert = f"{ca_file_prefix}_cert.pem" + ca_key = f"{ca_file_prefix}_key.pem" + + if not cert_dir or not os.path.isdir(cert_dir): + _err(f"Certificate directory {cert_dir} does not exist.") + return False + + if not ca_dir or not os.path.isdir(ca_dir): + _err(f"CA directory {ca_dir} does not exist.") + return False + + if not cert_subject_name: + _err("Subject name is required.") + return False + + if not _is_dns(cert_subject_name): + _err(f"Invalid subject name '{cert_subject_name}'. Must be a valid DNS name.") + return False + + ca_cert_path = os.path.join(ca_dir, ca_cert) + ca_key_path = os.path.join(ca_dir, ca_key) + if not os.path.isfile(ca_cert_path) or not os.path.isfile(ca_key_path): + _err( + f"Signing CA certificate and key not found in {ca_dir}. " + "Please call setup a signing CA first." + ) + return False + + # "account" name from the subject: hostname part before the first dot + cert_name = cert_subject_name.split(".", 1)[0] + + san_entries = [f"DNS:{cert_subject_name}"] + for entry in sans or []: + if _is_ip(entry): + san_entries.append(f"IP:{entry}") + elif _is_dns(entry): + san_entries.append(f"DNS:{entry}") + else: + _err(f"Invalid SAN entry '{entry}'") + return False + + sans_ext = "subjectAltName=" + ",".join(san_entries) + + print(f"Generating server certificate for '{cert_subject_name}' with SANs:") + for san in san_entries: + print(f" - {san}") + + cert_out = os.path.join(cert_dir, f"{cert_name}_cert.pem") + key_out = os.path.join(cert_dir, f"{cert_name}_key.pem") + + if not os.path.isfile(cert_out) or not os.path.isfile(key_out): + print("Generating server certificate and key...") + req_cmd = [ + "openssl", "req", + "-newkey", "rsa:4096", + "-keyout", key_out, + "-noenc", + "-subj", f"/CN={cert_subject_name}", + "-addext", "basicConstraints=critical,CA:FALSE", + "-addext", "keyUsage=critical,digitalSignature,keyEncipherment", + "-addext", "extendedKeyUsage=serverAuth,clientAuth", + "-addext", sans_ext, + ] + if aia_url: + req_cmd += ["-addext", f"authorityInfoAccess=caIssuers;URI:{aia_url}"] + x509_cmd = [ + "openssl", "x509", + "-req", + "-CA", ca_cert_path, + "-CAkey", ca_key_path, + "-copy_extensions", "copyall", + "-days", str(days), + "-text", + "-out", cert_out, + ] + if not _pipe(req_cmd, x509_cmd): + _err("Failed to generate server certificate and key.") + return False + + return True + + +def make_pfx(cert_path, ca_dir, issuing_ca=None, password=None): + if issuing_ca == "ca": + _err("--issuing-ca cannot be 'ca' as it is reserved for the root CA.") + return False + + root_ca_cert = "ca_cert.pem" + root_ca_key = "ca_key.pem" + ca_file_prefix = issuing_ca or "ca" + ca_cert = f"{ca_file_prefix}_cert.pem" + ca_key = f"{ca_file_prefix}_key.pem" + + cert_dir = os.path.dirname(cert_path) + cert_basename = os.path.basename(cert_path) + if cert_basename.endswith("_cert.pem"): + cert_name = cert_basename[: -len("_cert.pem")] + else: + cert_name = cert_basename + key_path = os.path.join(cert_dir, f"{cert_name}_key.pem") + + if not cert_dir or not os.path.isdir(cert_dir): + _err(f"Certificate directory {cert_dir} does not exist.") + return False + + if not ca_dir or not os.path.isdir(ca_dir): + _err(f"CA directory {ca_dir} does not exist.") + return False + + if not os.path.isfile(cert_path) or not os.path.isfile(key_path): + _err("Server certificate or key not found.") + return False + + root_ca_cert_path = os.path.join(ca_dir, root_ca_cert) + root_ca_key_path = os.path.join(ca_dir, root_ca_key) + if not os.path.isfile(root_ca_cert_path) or not os.path.isfile(root_ca_key_path): + _err(f"CA certificate or key not found in {ca_dir}.") + return False + + ca_cert_path = os.path.join(ca_dir, ca_cert) + ca_key_path = os.path.join(ca_dir, ca_key) + if issuing_ca: + if not os.path.isfile(ca_cert_path) or not os.path.isfile(ca_key_path): + _err(f"Issuing CA certificate or key not found in {ca_dir}.") + return False + + if not password: + password = "changeit" + + pfx_path = os.path.join(cert_dir, f"{cert_name}.pfx") + if os.path.isfile(pfx_path): + print("PKCS#12 (PFX) file already exists, aborting generation.") + return False + + print("Generating PKCS#12 (PFX) file...", end="") + + chain_bytes = b"" + with open(root_ca_cert_path, "rb") as f: + chain_bytes += f.read() + if issuing_ca: + with open(ca_cert_path, "rb") as f: + chain_bytes += f.read() + + import tempfile + chain_fd, chain_file = tempfile.mkstemp() + try: + with os.fdopen(chain_fd, "wb") as f: + f.write(chain_bytes) + cmd = [ + "openssl", "pkcs12", + "-export", "-out", pfx_path, + "-inkey", key_path, + "-in", cert_path, + "-certfile", chain_file, + "-password", f"pass:{password}", + ] + if subprocess.run(cmd).returncode != 0: + _err("Failed to generate PKCS#12 (PFX) file.") + return False + finally: + if os.path.exists(chain_file): + os.remove(chain_file) + + print("done.") + return True + + +def _build_parser(): + parser = argparse.ArgumentParser( + description="Simple CA for creating and managing test certificates." + ) + sub = parser.add_subparsers(dest="command", required=True) + + p_ca = sub.add_parser("make-ca", help="Create a root or issuing CA.") + p_ca.add_argument("--days", type=int, default=3650) + p_ca.add_argument("--issuing-ca") + p_ca.add_argument("--aia-base-url") + p_ca.add_argument("ca_dir") + p_ca.add_argument("ca_name") + + p_cert = sub.add_parser("make-cert", help="Create a server/client certificate.") + p_cert.add_argument("--ca-dir") + p_cert.add_argument("--issuing-ca") + p_cert.add_argument("--days", type=int, default=365) + p_cert.add_argument("cert_dir") + p_cert.add_argument("subject_name") + p_cert.add_argument("sans", nargs="*") + + p_pfx = sub.add_parser("make-pfx", help="Create a PKCS#12 (PFX) bundle.") + p_pfx.add_argument("--ca-dir", required=True) + p_pfx.add_argument("--issuing-ca") + p_pfx.add_argument("--path", required=True, dest="cert_path") + p_pfx.add_argument("--password") + + return parser + + +def main(argv=None): + parser = _build_parser() + args = parser.parse_args(argv) + + if args.command == "make-ca": + ok = make_ca( + args.ca_dir, args.ca_name, + days=args.days, + issuing_ca=args.issuing_ca, + aia_base_url=args.aia_base_url, + ) + elif args.command == "make-cert": + ok = make_cert( + args.cert_dir, args.subject_name, + sans=args.sans, + ca_dir=args.ca_dir, + issuing_ca=args.issuing_ca, + days=args.days, + ) + elif args.command == "make-pfx": + ok = make_pfx( + args.cert_path, args.ca_dir, + issuing_ca=args.issuing_ca, + password=args.password, + ) + else: + parser.error(f"Unknown command: {args.command}") + ok = False + + return 0 if ok else 1 + + +if __name__ == "__main__": + sys.exit(main())