Refactor CA management and certificate generation
- Removed the legacy simple-ca.sh script, consolidating functionality into Go code. - Introduced a JSON configuration file (simple-ca.json) to manage CA settings, including validity periods and AIA base URLs. - Enhanced the makeCA function to utilize configuration values for CA creation and AIA URL management. - Updated makeCert and makePFX functions to support configuration-driven behavior. - Improved error handling and user feedback throughout the CA and certificate generation processes. - Added support for using Apple's OpenSSL for PKCS#12 file generation.
This commit is contained in:
+81
-79
@@ -30,50 +30,57 @@ import re
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
OPENSSL = "/usr/bin/openssl"
|
||||
OPENSSL = "openssl"
|
||||
|
||||
CONFIG_FILE = "simple-ca.json"
|
||||
class Config:
|
||||
FILE = "simple-ca.json"
|
||||
|
||||
def __init__(self, ca_dir: str):
|
||||
self._path = os.path.join(ca_dir, self.FILE)
|
||||
self._data: dict = {}
|
||||
if not os.path.isfile(self._path):
|
||||
return
|
||||
try:
|
||||
with open(self._path, "r") as f:
|
||||
self._data = json.load(f)
|
||||
except (json.JSONDecodeError, OSError) as e:
|
||||
print(f"WARNING: could not read {self._path}: {e}", file=sys.stderr)
|
||||
|
||||
def get(self, key, default=None):
|
||||
return self._data.get(key, default)
|
||||
|
||||
def update(self, patch: dict):
|
||||
file_missing = not os.path.isfile(self._path)
|
||||
changed = any(self._data.get(k) != v for k, v in patch.items())
|
||||
if file_missing or changed:
|
||||
self._data.update(patch)
|
||||
self._save()
|
||||
|
||||
def _save(self):
|
||||
with open(self._path, "w") as f:
|
||||
json.dump(self._data, f, indent=2)
|
||||
f.write("\n")
|
||||
|
||||
|
||||
_config: Config
|
||||
|
||||
|
||||
def _err(msg):
|
||||
print(f"ERROR: {msg}", file=sys.stderr)
|
||||
|
||||
|
||||
def _load_config(ca_dir) -> dict:
|
||||
path = os.path.join(ca_dir, CONFIG_FILE)
|
||||
if not os.path.isfile(path):
|
||||
return {}
|
||||
try:
|
||||
with open(path, "r") as f:
|
||||
return json.load(f)
|
||||
except (json.JSONDecodeError, OSError) as e:
|
||||
print(f"WARNING: could not read {path}: {e}", file=sys.stderr)
|
||||
return {}
|
||||
|
||||
|
||||
def _save_config(ca_dir, patch: dict):
|
||||
path = os.path.join(ca_dir, CONFIG_FILE)
|
||||
cfg = _load_config(ca_dir)
|
||||
cfg.update(patch)
|
||||
with open(path, "w") as f:
|
||||
json.dump(cfg, f, indent=2)
|
||||
f.write("\n")
|
||||
|
||||
|
||||
def _rebuild_ca_bundle(ca_dir):
|
||||
"""Write ca_bundle.pem = root cert + any issuing CA certs in this dir."""
|
||||
"""Write ca_bundle.pem = root cert + registered subordinate CA certs."""
|
||||
bundle_path = os.path.join(ca_dir, "ca_bundle.pem")
|
||||
parts = []
|
||||
root = os.path.join(ca_dir, "ca_cert.pem")
|
||||
if os.path.isfile(root):
|
||||
with open(root, "rb") as f:
|
||||
parts.append(f.read())
|
||||
for name in sorted(os.listdir(ca_dir)):
|
||||
if name == "ca_cert.pem" or not name.endswith("_cert.pem"):
|
||||
continue
|
||||
path = os.path.join(ca_dir, name)
|
||||
if os.path.isfile(path):
|
||||
with open(path, "rb") as f:
|
||||
for name in sorted(_config.get("subordinates", [])):
|
||||
sub_cert = os.path.join(ca_dir, name, "ca_cert.pem")
|
||||
if os.path.isfile(sub_cert):
|
||||
with open(sub_cert, "rb") as f:
|
||||
parts.append(f.read())
|
||||
with open(bundle_path, "wb") as f:
|
||||
f.write(b"".join(parts))
|
||||
@@ -92,19 +99,11 @@ def make_ca(ca_dir, ca_name, days=3650, issuing_ca=None, aia_base_url=None):
|
||||
_err("CA name is required.")
|
||||
return False
|
||||
|
||||
ca_file_prefix = issuing_ca or "ca"
|
||||
root_ca_cert = "ca_cert.pem"
|
||||
root_ca_key = "ca_key.pem"
|
||||
ca_cert = f"{ca_file_prefix}_cert.pem"
|
||||
ca_key = f"{ca_file_prefix}_key.pem"
|
||||
|
||||
root_ca_cert_path = os.path.join(ca_dir, root_ca_cert)
|
||||
root_ca_key_path = os.path.join(ca_dir, root_ca_key)
|
||||
ca_cert_path = os.path.join(ca_dir, ca_cert)
|
||||
ca_key_path = os.path.join(ca_dir, ca_key)
|
||||
root_ca_cert_path = os.path.join(ca_dir, "ca_cert.pem")
|
||||
root_ca_key_path = os.path.join(ca_dir, "ca_key.pem")
|
||||
|
||||
if not os.path.isfile(root_ca_cert_path) or not os.path.isfile(root_ca_key_path):
|
||||
if root_ca_cert != ca_cert:
|
||||
if issuing_ca:
|
||||
_err(
|
||||
f"Cannot create issuing CA '{ca_name}' without existing root CA "
|
||||
"certificate and key. Please create the root CA first."
|
||||
@@ -133,15 +132,20 @@ def make_ca(ca_dir, ca_name, days=3650, issuing_ca=None, aia_base_url=None):
|
||||
return False
|
||||
|
||||
_rebuild_ca_bundle(ca_dir)
|
||||
_save_config(ca_dir, {"aia_base_url": aia_base_url} if aia_base_url else {})
|
||||
_config.update({"aia_base_url": aia_base_url} if aia_base_url else {})
|
||||
return True
|
||||
|
||||
if not os.path.isfile(ca_cert_path) or not os.path.isfile(ca_key_path):
|
||||
issuing_ca_dir = os.path.join(ca_dir, issuing_ca)
|
||||
issuing_ca_cert = os.path.join(issuing_ca_dir, "ca_cert.pem")
|
||||
issuing_ca_key = os.path.join(issuing_ca_dir, "ca_key.pem")
|
||||
|
||||
if not os.path.isfile(issuing_ca_cert) or not os.path.isfile(issuing_ca_key):
|
||||
print(f"Generating issuing CA certificate '{ca_name}' and key...")
|
||||
os.makedirs(issuing_ca_dir, exist_ok=True)
|
||||
req_cmd = [
|
||||
OPENSSL, "req",
|
||||
"-newkey", "rsa:4096",
|
||||
"-keyout", ca_key_path,
|
||||
"-keyout", issuing_ca_key,
|
||||
"-noenc",
|
||||
"-subj", f"/CN={ca_name}",
|
||||
"-addext", "basicConstraints=critical,CA:TRUE,pathlen:0",
|
||||
@@ -160,14 +164,18 @@ def make_ca(ca_dir, ca_name, days=3650, issuing_ca=None, aia_base_url=None):
|
||||
"-copy_extensions", "copyall",
|
||||
"-days", str(days),
|
||||
"-text",
|
||||
"-out", ca_cert_path,
|
||||
"-out", issuing_ca_cert,
|
||||
]
|
||||
if not _pipe(req_cmd, x509_cmd):
|
||||
_err("Failed to generate issuing CA certificate and key.")
|
||||
return False
|
||||
|
||||
patch = {"aia_base_url": aia_base_url} if aia_base_url else {}
|
||||
subs = _config.get("subordinates", [])
|
||||
if issuing_ca not in subs:
|
||||
patch["subordinates"] = subs + [issuing_ca]
|
||||
_config.update(patch)
|
||||
_rebuild_ca_bundle(ca_dir)
|
||||
_save_config(ca_dir, {"aia_base_url": aia_base_url} if aia_base_url else {})
|
||||
return True
|
||||
|
||||
|
||||
@@ -198,14 +206,8 @@ def make_cert(cert_dir, cert_subject_name, sans=None, ca_dir=None,
|
||||
_err("--issuing-ca cannot be 'ca' as it is reserved for the root CA.")
|
||||
return False
|
||||
|
||||
ca_file_prefix = issuing_ca or "ca"
|
||||
ca_dir = ca_dir or cert_dir
|
||||
|
||||
aia_url = f"{aia_base_url}/{ca_file_prefix}_cert.crt" if aia_base_url else ""
|
||||
|
||||
ca_cert = f"{ca_file_prefix}_cert.pem"
|
||||
ca_key = f"{ca_file_prefix}_key.pem"
|
||||
|
||||
if not cert_dir or not os.path.isdir(cert_dir):
|
||||
_err(f"Certificate directory {cert_dir} does not exist.")
|
||||
return False
|
||||
@@ -222,15 +224,21 @@ def make_cert(cert_dir, cert_subject_name, sans=None, ca_dir=None,
|
||||
_err(f"Invalid subject name '{cert_subject_name}'. Must be a valid DNS name.")
|
||||
return False
|
||||
|
||||
ca_cert_path = os.path.join(ca_dir, ca_cert)
|
||||
ca_key_path = os.path.join(ca_dir, ca_key)
|
||||
signing_dir = os.path.join(ca_dir, issuing_ca) if issuing_ca else ca_dir
|
||||
ca_cert_path = os.path.join(signing_dir, "ca_cert.pem")
|
||||
ca_key_path = os.path.join(signing_dir, "ca_key.pem")
|
||||
if not os.path.isfile(ca_cert_path) or not os.path.isfile(ca_key_path):
|
||||
_err(
|
||||
f"Signing CA certificate and key not found in {ca_dir}. "
|
||||
"Please call setup a signing CA first."
|
||||
f"Signing CA certificate and key not found in {signing_dir}. "
|
||||
"Please set up a signing CA first."
|
||||
)
|
||||
return False
|
||||
|
||||
if aia_base_url:
|
||||
aia_url = f"{aia_base_url}/{issuing_ca}/ca_cert.crt" if issuing_ca else f"{aia_base_url}/ca_cert.crt"
|
||||
else:
|
||||
aia_url = ""
|
||||
|
||||
# "account" name from the subject: hostname part before the first dot
|
||||
cert_name = cert_subject_name.split(".", 1)[0]
|
||||
|
||||
@@ -285,17 +293,11 @@ def make_cert(cert_dir, cert_subject_name, sans=None, ca_dir=None,
|
||||
return True
|
||||
|
||||
|
||||
def make_pfx(cert_path, ca_dir, issuing_ca=None, password=None):
|
||||
def make_pfx(cert_path, ca_dir, issuing_ca=None, password=None, apple_openssl=False):
|
||||
if issuing_ca == "ca":
|
||||
_err("--issuing-ca cannot be 'ca' as it is reserved for the root CA.")
|
||||
return False
|
||||
|
||||
root_ca_cert = "ca_cert.pem"
|
||||
root_ca_key = "ca_key.pem"
|
||||
ca_file_prefix = issuing_ca or "ca"
|
||||
ca_cert = f"{ca_file_prefix}_cert.pem"
|
||||
ca_key = f"{ca_file_prefix}_key.pem"
|
||||
|
||||
cert_dir = os.path.dirname(cert_path)
|
||||
cert_basename = os.path.basename(cert_path)
|
||||
if cert_basename.endswith("_cert.pem"):
|
||||
@@ -316,17 +318,16 @@ def make_pfx(cert_path, ca_dir, issuing_ca=None, password=None):
|
||||
_err("Server certificate or key not found.")
|
||||
return False
|
||||
|
||||
root_ca_cert_path = os.path.join(ca_dir, root_ca_cert)
|
||||
root_ca_key_path = os.path.join(ca_dir, root_ca_key)
|
||||
root_ca_cert_path = os.path.join(ca_dir, "ca_cert.pem")
|
||||
root_ca_key_path = os.path.join(ca_dir, "ca_key.pem")
|
||||
if not os.path.isfile(root_ca_cert_path) or not os.path.isfile(root_ca_key_path):
|
||||
_err(f"CA certificate or key not found in {ca_dir}.")
|
||||
return False
|
||||
|
||||
ca_cert_path = os.path.join(ca_dir, ca_cert)
|
||||
ca_key_path = os.path.join(ca_dir, ca_key)
|
||||
if issuing_ca:
|
||||
if not os.path.isfile(ca_cert_path) or not os.path.isfile(ca_key_path):
|
||||
_err(f"Issuing CA certificate or key not found in {ca_dir}.")
|
||||
issuing_ca_cert_path = os.path.join(ca_dir, issuing_ca, "ca_cert.pem")
|
||||
if not os.path.isfile(issuing_ca_cert_path):
|
||||
_err(f"Issuing CA certificate not found: {issuing_ca_cert_path}.")
|
||||
return False
|
||||
|
||||
if not password:
|
||||
@@ -343,7 +344,7 @@ def make_pfx(cert_path, ca_dir, issuing_ca=None, password=None):
|
||||
with open(root_ca_cert_path, "rb") as f:
|
||||
chain_bytes += f.read()
|
||||
if issuing_ca:
|
||||
with open(ca_cert_path, "rb") as f:
|
||||
with open(issuing_ca_cert_path, "rb") as f:
|
||||
chain_bytes += f.read()
|
||||
|
||||
import tempfile
|
||||
@@ -352,7 +353,7 @@ def make_pfx(cert_path, ca_dir, issuing_ca=None, password=None):
|
||||
with os.fdopen(chain_fd, "wb") as f:
|
||||
f.write(chain_bytes)
|
||||
cmd = [
|
||||
OPENSSL, "pkcs12",
|
||||
"/usr/bin/openssl" if apple_openssl else OPENSSL, "pkcs12",
|
||||
"-export", "-out", pfx_path,
|
||||
"-inkey", key_path,
|
||||
"-in", cert_path,
|
||||
@@ -399,27 +400,27 @@ def _build_parser():
|
||||
p_pfx.add_argument("--issuing-ca", default=None, help="Specify the issuing CA")
|
||||
p_pfx.add_argument("--ca-dir", help="Directory of the CA")
|
||||
p_pfx.add_argument("--password", help="Password for the PFX file")
|
||||
p_pfx.add_argument("--openssl", default=None, metavar="PATH",
|
||||
help=f"Path to the openssl binary (default: {OPENSSL})")
|
||||
p_pfx.add_argument("--apple-openssl", action="store_true", default=False,
|
||||
help="Use Apple's bundled /usr/bin/openssl for PKCS12 generation")
|
||||
p_pfx.add_argument("path", help="Path to the certificate file")
|
||||
|
||||
return parser
|
||||
|
||||
|
||||
def main(argv=None):
|
||||
global OPENSSL
|
||||
global OPENSSL, _config
|
||||
|
||||
parser = _build_parser()
|
||||
args = parser.parse_args(argv)
|
||||
ca_dir = args.ca_dir or os.environ.get("SIMPLE_CA_DIR") or os.getcwd()
|
||||
|
||||
cfg = _load_config(ca_dir)
|
||||
_config = Config(ca_dir)
|
||||
|
||||
OPENSSL = args.openssl or cfg.get("openssl", OPENSSL)
|
||||
issuing_ca = args.issuing_ca or cfg.get("issuing_ca")
|
||||
aia_base_url = getattr(args, "aia_base_url", None) or cfg.get("aia_base_url")
|
||||
OPENSSL = getattr(args, "openssl", None) or _config.get("openssl", OPENSSL)
|
||||
issuing_ca = args.issuing_ca or _config.get("issuing_ca")
|
||||
aia_base_url = getattr(args, "aia_base_url", None) or _config.get("aia_base_url")
|
||||
|
||||
days_cfg = cfg.get("days", {})
|
||||
days_cfg = _config.get("days", {})
|
||||
|
||||
if args.command == "make-ca":
|
||||
days = args.days or days_cfg.get("ca", 3650)
|
||||
@@ -444,6 +445,7 @@ def main(argv=None):
|
||||
args.path, ca_dir,
|
||||
issuing_ca=issuing_ca,
|
||||
password=args.password,
|
||||
apple_openssl=args.apple_openssl,
|
||||
)
|
||||
else:
|
||||
parser.error(f"Unknown command: {args.command}")
|
||||
|
||||
Reference in New Issue
Block a user