Added a convertion of Bash script to Python.
This commit is contained in:
435
simple-ca.py
Normal file
435
simple-ca.py
Normal file
@@ -0,0 +1,435 @@
|
||||
# MIT License
|
||||
#
|
||||
# Copyright (c) 2026 Sławomir Koszewski
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
# of this software and associated documentation files (the "Software"), to deal
|
||||
# in the Software without restriction, including without limitation the rights
|
||||
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
# copies of the Software, and to permit persons to whom the Software is
|
||||
# furnished to do so, subject to the following conditions:
|
||||
#
|
||||
# The above copyright notice and this permission notice shall be included in all
|
||||
# copies or substantial portions of the Software.
|
||||
#
|
||||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
# SOFTWARE.
|
||||
|
||||
# This module requires Python 3.8+ and OpenSSL to be installed on the system.
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
|
||||
def _err(msg):
|
||||
print(f"ERROR: {msg}", file=sys.stderr)
|
||||
|
||||
|
||||
def make_hash_link(cert_path):
|
||||
if not os.path.isfile(cert_path):
|
||||
_err(f"Certificate file {cert_path} does not exist.")
|
||||
return False
|
||||
|
||||
cert_dir = os.path.dirname(cert_path)
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["openssl", "x509", "-in", cert_path, "-noout", "-hash"],
|
||||
capture_output=True, text=True, check=True,
|
||||
)
|
||||
except subprocess.CalledProcessError:
|
||||
_err(f"Failed to calculate hash for certificate {cert_path}.")
|
||||
return False
|
||||
|
||||
cert_hash = result.stdout.strip()
|
||||
if not cert_hash:
|
||||
_err(f"Failed to calculate hash for certificate {cert_path}.")
|
||||
return False
|
||||
|
||||
link_path = os.path.join(cert_dir, f"{cert_hash}.0")
|
||||
target = os.path.basename(cert_path)
|
||||
if os.path.islink(link_path) or os.path.exists(link_path):
|
||||
os.remove(link_path)
|
||||
os.symlink(target, link_path)
|
||||
return True
|
||||
|
||||
|
||||
def make_ca(ca_dir, ca_name, days=3650, issuing_ca=None, aia_base_url=None):
|
||||
if issuing_ca == "ca":
|
||||
_err("--issuing-ca cannot be 'ca' as it is reserved for the root CA.")
|
||||
return False
|
||||
|
||||
if not ca_dir or not os.path.isdir(ca_dir):
|
||||
_err(f"Certificate directory {ca_dir} does not exist.")
|
||||
return False
|
||||
|
||||
if not ca_name:
|
||||
_err("CA name is required.")
|
||||
return False
|
||||
|
||||
aia_file = os.path.join(ca_dir, "aia_base_url.txt")
|
||||
if not aia_base_url and os.path.isfile(aia_file):
|
||||
with open(aia_file, "r") as f:
|
||||
aia_base_url = f.read().strip()
|
||||
|
||||
ca_file_prefix = issuing_ca or "ca"
|
||||
root_ca_cert = "ca_cert.pem"
|
||||
root_ca_key = "ca_key.pem"
|
||||
ca_cert = f"{ca_file_prefix}_cert.pem"
|
||||
ca_key = f"{ca_file_prefix}_key.pem"
|
||||
|
||||
root_ca_cert_path = os.path.join(ca_dir, root_ca_cert)
|
||||
root_ca_key_path = os.path.join(ca_dir, root_ca_key)
|
||||
ca_cert_path = os.path.join(ca_dir, ca_cert)
|
||||
ca_key_path = os.path.join(ca_dir, ca_key)
|
||||
|
||||
if not os.path.isfile(root_ca_cert_path) or not os.path.isfile(root_ca_key_path):
|
||||
if root_ca_cert != ca_cert:
|
||||
_err(
|
||||
f"Cannot create issuing CA '{ca_name}' without existing root CA "
|
||||
"certificate and key. Please create the root CA first."
|
||||
)
|
||||
return False
|
||||
|
||||
print(f"Generating CA certificate '{ca_name}' and key...")
|
||||
# Path length constraint of 1 is set for the root CA to allow creating
|
||||
# one level of issuing CAs, but prevent a longer chain which is not
|
||||
# supported by this script.
|
||||
cmd = [
|
||||
"openssl", "req",
|
||||
"-x509",
|
||||
"-newkey", "rsa:4096",
|
||||
"-keyout", root_ca_key_path,
|
||||
"-out", root_ca_cert_path,
|
||||
"-days", str(days),
|
||||
"-noenc",
|
||||
"-subj", f"/CN={ca_name}",
|
||||
"-text",
|
||||
"-addext", "basicConstraints=critical,CA:TRUE,pathlen:1",
|
||||
"-addext", "keyUsage=critical,keyCertSign,cRLSign",
|
||||
]
|
||||
if subprocess.run(cmd).returncode != 0:
|
||||
_err("Failed to generate CA certificate and key.")
|
||||
return False
|
||||
|
||||
make_hash_link(root_ca_cert_path)
|
||||
|
||||
if aia_base_url:
|
||||
with open(aia_file, "w") as f:
|
||||
f.write(aia_base_url)
|
||||
|
||||
return True
|
||||
|
||||
if not os.path.isfile(ca_cert_path) or not os.path.isfile(ca_key_path):
|
||||
print(f"Generating issuing CA certificate '{ca_name}' and key...")
|
||||
req_cmd = [
|
||||
"openssl", "req",
|
||||
"-newkey", "rsa:4096",
|
||||
"-keyout", ca_key_path,
|
||||
"-noenc",
|
||||
"-subj", f"/CN={ca_name}",
|
||||
"-addext", "basicConstraints=critical,CA:TRUE,pathlen:0",
|
||||
"-addext", "keyUsage=critical,keyCertSign,cRLSign",
|
||||
]
|
||||
if aia_base_url:
|
||||
req_cmd += [
|
||||
"-addext",
|
||||
f"authorityInfoAccess=caIssuers;URI:{aia_base_url}/ca_cert.crt",
|
||||
]
|
||||
x509_cmd = [
|
||||
"openssl", "x509",
|
||||
"-req",
|
||||
"-CA", root_ca_cert_path,
|
||||
"-CAkey", root_ca_key_path,
|
||||
"-copy_extensions", "copyall",
|
||||
"-days", str(days),
|
||||
"-text",
|
||||
"-out", ca_cert_path,
|
||||
]
|
||||
if not _pipe(req_cmd, x509_cmd):
|
||||
_err("Failed to generate issuing CA certificate and key.")
|
||||
return False
|
||||
|
||||
make_hash_link(ca_cert_path)
|
||||
|
||||
if aia_base_url:
|
||||
with open(aia_file, "w") as f:
|
||||
f.write(aia_base_url)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
_IP_RE = re.compile(r"^[0-9]{1,3}(\.[0-9]{1,3}){3}$")
|
||||
_DNS_RE = re.compile(r"^[a-z0-9-]+(\.[a-z0-9-]+)*$")
|
||||
|
||||
|
||||
def _is_ip(value):
|
||||
return bool(_IP_RE.match(value))
|
||||
|
||||
|
||||
def _is_dns(value):
|
||||
return bool(_DNS_RE.match(value))
|
||||
|
||||
|
||||
def _pipe(cmd1, cmd2):
|
||||
p1 = subprocess.Popen(cmd1, stdout=subprocess.PIPE)
|
||||
p2 = subprocess.Popen(cmd2, stdin=p1.stdout)
|
||||
p1.stdout.close()
|
||||
p2.communicate()
|
||||
p1.wait()
|
||||
return p1.returncode == 0 and p2.returncode == 0
|
||||
|
||||
|
||||
def make_cert(cert_dir, cert_subject_name, sans=None, ca_dir=None,
|
||||
issuing_ca=None, days=365):
|
||||
if issuing_ca == "ca":
|
||||
_err("--issuing-ca cannot be 'ca' as it is reserved for the root CA.")
|
||||
return False
|
||||
|
||||
ca_file_prefix = issuing_ca or "ca"
|
||||
ca_dir = ca_dir or cert_dir
|
||||
|
||||
aia_base_url_file = os.path.join(ca_dir, "aia_base_url.txt")
|
||||
aia_url = ""
|
||||
if os.path.isfile(aia_base_url_file):
|
||||
with open(aia_base_url_file, "r") as f:
|
||||
aia_url = f"{f.read().strip()}/{ca_file_prefix}_cert.crt"
|
||||
|
||||
ca_cert = f"{ca_file_prefix}_cert.pem"
|
||||
ca_key = f"{ca_file_prefix}_key.pem"
|
||||
|
||||
if not cert_dir or not os.path.isdir(cert_dir):
|
||||
_err(f"Certificate directory {cert_dir} does not exist.")
|
||||
return False
|
||||
|
||||
if not ca_dir or not os.path.isdir(ca_dir):
|
||||
_err(f"CA directory {ca_dir} does not exist.")
|
||||
return False
|
||||
|
||||
if not cert_subject_name:
|
||||
_err("Subject name is required.")
|
||||
return False
|
||||
|
||||
if not _is_dns(cert_subject_name):
|
||||
_err(f"Invalid subject name '{cert_subject_name}'. Must be a valid DNS name.")
|
||||
return False
|
||||
|
||||
ca_cert_path = os.path.join(ca_dir, ca_cert)
|
||||
ca_key_path = os.path.join(ca_dir, ca_key)
|
||||
if not os.path.isfile(ca_cert_path) or not os.path.isfile(ca_key_path):
|
||||
_err(
|
||||
f"Signing CA certificate and key not found in {ca_dir}. "
|
||||
"Please call setup a signing CA first."
|
||||
)
|
||||
return False
|
||||
|
||||
# "account" name from the subject: hostname part before the first dot
|
||||
cert_name = cert_subject_name.split(".", 1)[0]
|
||||
|
||||
san_entries = [f"DNS:{cert_subject_name}"]
|
||||
for entry in sans or []:
|
||||
if _is_ip(entry):
|
||||
san_entries.append(f"IP:{entry}")
|
||||
elif _is_dns(entry):
|
||||
san_entries.append(f"DNS:{entry}")
|
||||
else:
|
||||
_err(f"Invalid SAN entry '{entry}'")
|
||||
return False
|
||||
|
||||
sans_ext = "subjectAltName=" + ",".join(san_entries)
|
||||
|
||||
print(f"Generating server certificate for '{cert_subject_name}' with SANs:")
|
||||
for san in san_entries:
|
||||
print(f" - {san}")
|
||||
|
||||
cert_out = os.path.join(cert_dir, f"{cert_name}_cert.pem")
|
||||
key_out = os.path.join(cert_dir, f"{cert_name}_key.pem")
|
||||
|
||||
if not os.path.isfile(cert_out) or not os.path.isfile(key_out):
|
||||
print("Generating server certificate and key...")
|
||||
req_cmd = [
|
||||
"openssl", "req",
|
||||
"-newkey", "rsa:4096",
|
||||
"-keyout", key_out,
|
||||
"-noenc",
|
||||
"-subj", f"/CN={cert_subject_name}",
|
||||
"-addext", "basicConstraints=critical,CA:FALSE",
|
||||
"-addext", "keyUsage=critical,digitalSignature,keyEncipherment",
|
||||
"-addext", "extendedKeyUsage=serverAuth,clientAuth",
|
||||
"-addext", sans_ext,
|
||||
]
|
||||
if aia_url:
|
||||
req_cmd += ["-addext", f"authorityInfoAccess=caIssuers;URI:{aia_url}"]
|
||||
x509_cmd = [
|
||||
"openssl", "x509",
|
||||
"-req",
|
||||
"-CA", ca_cert_path,
|
||||
"-CAkey", ca_key_path,
|
||||
"-copy_extensions", "copyall",
|
||||
"-days", str(days),
|
||||
"-text",
|
||||
"-out", cert_out,
|
||||
]
|
||||
if not _pipe(req_cmd, x509_cmd):
|
||||
_err("Failed to generate server certificate and key.")
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def make_pfx(cert_path, ca_dir, issuing_ca=None, password=None):
|
||||
if issuing_ca == "ca":
|
||||
_err("--issuing-ca cannot be 'ca' as it is reserved for the root CA.")
|
||||
return False
|
||||
|
||||
root_ca_cert = "ca_cert.pem"
|
||||
root_ca_key = "ca_key.pem"
|
||||
ca_file_prefix = issuing_ca or "ca"
|
||||
ca_cert = f"{ca_file_prefix}_cert.pem"
|
||||
ca_key = f"{ca_file_prefix}_key.pem"
|
||||
|
||||
cert_dir = os.path.dirname(cert_path)
|
||||
cert_basename = os.path.basename(cert_path)
|
||||
if cert_basename.endswith("_cert.pem"):
|
||||
cert_name = cert_basename[: -len("_cert.pem")]
|
||||
else:
|
||||
cert_name = cert_basename
|
||||
key_path = os.path.join(cert_dir, f"{cert_name}_key.pem")
|
||||
|
||||
if not cert_dir or not os.path.isdir(cert_dir):
|
||||
_err(f"Certificate directory {cert_dir} does not exist.")
|
||||
return False
|
||||
|
||||
if not ca_dir or not os.path.isdir(ca_dir):
|
||||
_err(f"CA directory {ca_dir} does not exist.")
|
||||
return False
|
||||
|
||||
if not os.path.isfile(cert_path) or not os.path.isfile(key_path):
|
||||
_err("Server certificate or key not found.")
|
||||
return False
|
||||
|
||||
root_ca_cert_path = os.path.join(ca_dir, root_ca_cert)
|
||||
root_ca_key_path = os.path.join(ca_dir, root_ca_key)
|
||||
if not os.path.isfile(root_ca_cert_path) or not os.path.isfile(root_ca_key_path):
|
||||
_err(f"CA certificate or key not found in {ca_dir}.")
|
||||
return False
|
||||
|
||||
ca_cert_path = os.path.join(ca_dir, ca_cert)
|
||||
ca_key_path = os.path.join(ca_dir, ca_key)
|
||||
if issuing_ca:
|
||||
if not os.path.isfile(ca_cert_path) or not os.path.isfile(ca_key_path):
|
||||
_err(f"Issuing CA certificate or key not found in {ca_dir}.")
|
||||
return False
|
||||
|
||||
if not password:
|
||||
password = "changeit"
|
||||
|
||||
pfx_path = os.path.join(cert_dir, f"{cert_name}.pfx")
|
||||
if os.path.isfile(pfx_path):
|
||||
print("PKCS#12 (PFX) file already exists, aborting generation.")
|
||||
return False
|
||||
|
||||
print("Generating PKCS#12 (PFX) file...", end="")
|
||||
|
||||
chain_bytes = b""
|
||||
with open(root_ca_cert_path, "rb") as f:
|
||||
chain_bytes += f.read()
|
||||
if issuing_ca:
|
||||
with open(ca_cert_path, "rb") as f:
|
||||
chain_bytes += f.read()
|
||||
|
||||
import tempfile
|
||||
chain_fd, chain_file = tempfile.mkstemp()
|
||||
try:
|
||||
with os.fdopen(chain_fd, "wb") as f:
|
||||
f.write(chain_bytes)
|
||||
cmd = [
|
||||
"openssl", "pkcs12",
|
||||
"-export", "-out", pfx_path,
|
||||
"-inkey", key_path,
|
||||
"-in", cert_path,
|
||||
"-certfile", chain_file,
|
||||
"-password", f"pass:{password}",
|
||||
]
|
||||
if subprocess.run(cmd).returncode != 0:
|
||||
_err("Failed to generate PKCS#12 (PFX) file.")
|
||||
return False
|
||||
finally:
|
||||
if os.path.exists(chain_file):
|
||||
os.remove(chain_file)
|
||||
|
||||
print("done.")
|
||||
return True
|
||||
|
||||
|
||||
def _build_parser():
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Simple CA for creating and managing test certificates."
|
||||
)
|
||||
sub = parser.add_subparsers(dest="command", required=True)
|
||||
|
||||
p_ca = sub.add_parser("make-ca", help="Create a root or issuing CA.")
|
||||
p_ca.add_argument("--days", type=int, default=3650)
|
||||
p_ca.add_argument("--issuing-ca")
|
||||
p_ca.add_argument("--aia-base-url")
|
||||
p_ca.add_argument("ca_dir")
|
||||
p_ca.add_argument("ca_name")
|
||||
|
||||
p_cert = sub.add_parser("make-cert", help="Create a server/client certificate.")
|
||||
p_cert.add_argument("--ca-dir")
|
||||
p_cert.add_argument("--issuing-ca")
|
||||
p_cert.add_argument("--days", type=int, default=365)
|
||||
p_cert.add_argument("cert_dir")
|
||||
p_cert.add_argument("subject_name")
|
||||
p_cert.add_argument("sans", nargs="*")
|
||||
|
||||
p_pfx = sub.add_parser("make-pfx", help="Create a PKCS#12 (PFX) bundle.")
|
||||
p_pfx.add_argument("--ca-dir", required=True)
|
||||
p_pfx.add_argument("--issuing-ca")
|
||||
p_pfx.add_argument("--path", required=True, dest="cert_path")
|
||||
p_pfx.add_argument("--password")
|
||||
|
||||
return parser
|
||||
|
||||
|
||||
def main(argv=None):
|
||||
parser = _build_parser()
|
||||
args = parser.parse_args(argv)
|
||||
|
||||
if args.command == "make-ca":
|
||||
ok = make_ca(
|
||||
args.ca_dir, args.ca_name,
|
||||
days=args.days,
|
||||
issuing_ca=args.issuing_ca,
|
||||
aia_base_url=args.aia_base_url,
|
||||
)
|
||||
elif args.command == "make-cert":
|
||||
ok = make_cert(
|
||||
args.cert_dir, args.subject_name,
|
||||
sans=args.sans,
|
||||
ca_dir=args.ca_dir,
|
||||
issuing_ca=args.issuing_ca,
|
||||
days=args.days,
|
||||
)
|
||||
elif args.command == "make-pfx":
|
||||
ok = make_pfx(
|
||||
args.cert_path, args.ca_dir,
|
||||
issuing_ca=args.issuing_ca,
|
||||
password=args.password,
|
||||
)
|
||||
else:
|
||||
parser.error(f"Unknown command: {args.command}")
|
||||
ok = False
|
||||
|
||||
return 0 if ok else 1
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
Reference in New Issue
Block a user