#!/usr/bin/env python3
"""First-run bootstrap for an OpenLDAP (slapd) directory.

The script starts a temporary slapd that listens only on ``ldapi:///``,
applies the cn=config LDIF templates, loads the base tree and service
accounts, optionally imports accounts from CSV files, touches
``INITIALIZED_FLAG`` and finally stops the temporary slapd again.

Configuration is read from the environment:

* ``LDAP_BASE_DN``  - directory suffix (default ``dc=example,dc=org``)
* ``LDAP_PASSWORD`` - password hashed into the LDIF templates
  (default ``changeit``)
* ``TLS_ENABLED``   - set to ``1`` to also apply ``config-tls.ldif``
"""

import csv
import os
import re
import socket
import subprocess
import time
from pathlib import Path
from typing import Optional

from jinja2 import Environment, FileSystemLoader

LDAPI_SOCKET = "/var/run/slapd/ldapi"
LDIF_DIR = Path("/bootstrap/ldif")
CSV_DIR = Path("/bootstrap/accounts")
INITIALIZED_FLAG = Path("/var/lib/ldap/.initialized")
SLAPD_D = Path("/etc/ldap/slapd.d")

base_dn = os.environ.get("LDAP_BASE_DN") or "dc=example,dc=org"
password = os.environ.get("LDAP_PASSWORD") or "changeit"
tls_enabled = os.environ.get("TLS_ENABLED") == "1"
admin_dn = f"cn=admin,{base_dn}"

# Optional account CSV files, processed in this order.  Each CSV "<stem>.csv"
# is rendered through the template "<stem>.ldif" once per row, and the
# resulting uids are put into the group described here.  Two files may name
# the same group; the second one then appends members to the existing group.
ACCOUNT_FILES = {
    "users.csv": {
        "name": "users",
        "description": "Regular users",
        "fields": ["uid", "gn", "sn", "mail"],
        "group_ou": "groups",
        "member_ou": "users",
    },
    "admins.csv": {
        "name": "admins",
        "description": "Administrators",
        "fields": ["uid", "gn", "sn", "mail"],
        "group_ou": "privileged-groups",
        "member_ou": "users",
    },
    "posix-users.csv": {
        "name": "users",
        "description": "POSIX users",
        "fields": ["uid", "gn", "sn", "mail", "uidNumber", "gidNumber"],
        "group_ou": "groups",
        "member_ou": "users",
    },
}

# RFC 4515 escapes for characters that are special inside a search filter.
_FILTER_ESCAPES = str.maketrans(
    {
        "\\": r"\5c",
        "*": r"\2a",
        "(": r"\28",
        ")": r"\29",
        "\0": r"\00",
    }
)

# Matches the "numEntries: N" summary line printed by ldapsearch.
_NUM_ENTRIES_RE = re.compile(r"numEntries:\s*(\d+)")


def hash_password(password: str) -> str:
    """Return the ``{SSHA}`` hash of *password* as produced by ``slappasswd``.

    Raises:
        subprocess.CalledProcessError: if ``slappasswd`` exits non-zero.
    """
    # NOTE(review): the clear-text password is passed on the command line and
    # may be visible to other processes while slappasswd runs - confirm this
    # is acceptable for the deployment, or switch to a file-based input.
    return subprocess.check_output(
        ["slappasswd", "-h", "{SSHA}", "-s", password],
        text=True,
    ).strip()


def run_command(cmd: list, input: str) -> None:
    """Run *cmd*, feeding *input* on stdin, and echo its output.

    Both captured stdout and stderr are printed to this process's stdout.

    Raises:
        subprocess.CalledProcessError: if the command exits non-zero; the
            captured stdout/stderr are attached to the exception.
    """
    proc = subprocess.run(cmd, input=input, text=True, capture_output=True, check=False)
    if proc.stdout:
        print(proc.stdout, end="")
    if proc.stderr:
        print(proc.stderr, end="")
    if proc.returncode != 0:
        raise subprocess.CalledProcessError(
            proc.returncode, proc.args, output=proc.stdout, stderr=proc.stderr
        )


def apply_ldif(path: Path, env: Environment, command: str = "ldapadd", **ctx) -> None:
    """Render an LDIF template and pipe it to *command* over ``ldapi:///``.

    Only ``path.name`` is used: the template is looked up by that name through
    *env*'s loader, so the directory part of *path* is informational.

    Args:
        path: Path whose file name identifies the template.
        env: Jinja2 environment used to load and render the template.
        command: LDAP client binary to invoke (SASL EXTERNAL bind).
        **ctx: Variables made available to the template.

    Raises:
        subprocess.CalledProcessError: if *command* exits non-zero.
    """
    print(f"Applying LDIF: {path.name}")
    rendered = env.get_template(path.name).render(**ctx)
    run_command([command, "-Q", "-Y", "EXTERNAL", "-H", "ldapi:///"], rendered)
    print(f"Finished LDIF: {path.name}")


def wait_for_slapd(timeout: int = 30, proc: Optional[subprocess.Popen] = None) -> None:
    """Block until the slapd LDAPI socket accepts a connection.

    Makes up to *timeout* connection attempts, sleeping one second after each
    failed attempt.

    Args:
        timeout: Maximum number of attempts (roughly seconds).
        proc: Optional handle of the slapd process; when given, an early exit
            of that process aborts the wait immediately instead of running
            out the full timeout.

    Raises:
        RuntimeError: if slapd exited early or never became reachable.
    """
    for _ in range(timeout):
        if proc is not None and proc.poll() is not None:
            raise RuntimeError(f"slapd exited prematurely with status {proc.returncode}")
        try:
            # The context manager closes the socket on failure as well, so
            # failed attempts do not leak file descriptors.
            with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
                sock.connect(LDAPI_SOCKET)
        except OSError:
            time.sleep(1)
        else:
            return
    raise RuntimeError("slapd did not become ready in time")


def stop_slapd(proc: subprocess.Popen, timeout: int = 30) -> None:
    """Terminate *proc*, escalating to kill if it ignores SIGTERM for *timeout* seconds."""
    proc.terminate()
    try:
        proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait()


def _escape_filter_value(value: str) -> str:
    """Escape *value* for literal use inside an LDAP search filter (RFC 4515)."""
    return value.translate(_FILTER_ESCAPES)


def group_exists(name: str, group_ou: str) -> bool:
    """Return True if a ``groupOfNames`` entry ``cn=<name>`` exists directly under ``ou=<group_ou>``.

    A failing ``ldapsearch`` (for example because the OU does not exist) is
    reported as "group does not exist" rather than raised.
    """
    # fmt: off
    result = subprocess.run(
        [
            "ldapsearch", "-Q", "-Y", "EXTERNAL", "-H", "ldapi:///",
            "-b", f"ou={group_ou},{base_dn}",
            "-s", "one",
            f"(&(objectClass=groupOfNames)(cn={_escape_filter_value(name)}))",
            "cn",
        ],
        capture_output=True,
        text=True,
    )
    # fmt: on
    # Parse the count instead of substring-matching "numEntries: 1", which
    # would also match 10-19 and miss any other positive count.
    found = _NUM_ENTRIES_RE.search(result.stdout)
    return found is not None and int(found.group(1)) > 0


def read_users(path: Path, fields: list[str]) -> list[dict]:
    """Parse an account CSV into a list of ``{field: value}`` dicts.

    Empty rows and rows whose first cell starts with ``#`` are skipped. Cell
    values are stripped of surrounding whitespace. Cells are paired with
    *fields* positionally; surplus cells are ignored and a short row simply
    yields a dict without the trailing fields.
    """
    users = []
    # utf-8-sig decodes plain UTF-8 and also drops a leading BOM, which would
    # otherwise stick to the first cell and defeat the "#" comment check.
    with open(path, newline="", encoding="utf-8-sig") as f:
        for row in csv.reader(f):
            if not row or row[0].strip().startswith("#"):
                continue
            users.append(dict(zip(fields, (c.strip() for c in row))))
    return users


def _find_account_csv(filename: str) -> Optional[Path]:
    """Return the first existing location of *filename*, or None if absent."""
    # The second directory is a backward-compatible fallback for the older
    # bind-mount layout.
    for directory in (CSV_DIR, Path("/bootstrap")):
        candidate = directory / filename
        if candidate.exists():
            return candidate
    return None


def _apply_config(env: Environment) -> None:
    """Apply the cn=config LDIF templates (modules, overlays, indexes, ACLs, TLS)."""
    apply_ldif(LDIF_DIR / "config-memberof-module.ldif", env)
    apply_ldif(LDIF_DIR / "config-memberof-overlay.ldif", env)
    apply_ldif(LDIF_DIR / "config-indexes.ldif", env)
    apply_ldif(LDIF_DIR / "config-performance.ldif", env)
    apply_ldif(LDIF_DIR / "config-acl.ldif", env, base_dn=base_dn, admin_dn=admin_dn)
    if tls_enabled:
        apply_ldif(LDIF_DIR / "config-tls.ldif", env)
    print("cn=config updated.")


def _import_accounts(env: Environment, filename: str, group: dict, password_hash: str) -> None:
    """Import the users of one CSV file and add them to the file's group."""
    path = _find_account_csv(filename)
    if path is None:
        return

    print(f' Reading {group["name"]} from {filename}...')
    users = read_users(path, group["fields"])
    if not users:
        return

    ldif = Path(filename).stem + ".ldif"
    for user in users:
        print(f' Adding user: {user["uid"]}')
        apply_ldif(
            LDIF_DIR / ldif,
            env,
            base_dn=base_dn,
            password_hash=password_hash,
            **user,
        )

    membership = dict(
        base_dn=base_dn,
        name=group["name"],
        group_ou=group["group_ou"],
        member_ou=group["member_ou"],
        uids=[u["uid"] for u in users],
    )
    # An earlier CSV may already have created this group; append in that case.
    if group_exists(group["name"], group["group_ou"]):
        apply_ldif(LDIF_DIR / "add-member.ldif", env, **membership)
    else:
        apply_ldif(
            LDIF_DIR / "group.ldif",
            env,
            description=group["description"],
            **membership,
        )


def main() -> None:
    """Run the complete bootstrap sequence against a temporary slapd.

    Raises:
        RuntimeError: if slapd does not become reachable.
        subprocess.CalledProcessError: if any LDAP client command fails.
    """
    print("First run — starting initialisation...")

    env = Environment(loader=FileSystemLoader(str(LDIF_DIR)), keep_trailing_newline=True)

    # fmt: off
    proc = subprocess.Popen([
        "slapd", "-d", "0",
        "-F", str(SLAPD_D),
        "-u", "openldap", "-g", "openldap",
        "-h", "ldapi:///",
    ])
    # fmt: on

    # Whatever happens below, never leave the temporary slapd running.
    try:
        wait_for_slapd(proc=proc)
        print("slapd is ready.")

        _apply_config(env)

        password_hash = hash_password(password)
        ctx = dict(
            base_dn=base_dn,
            password_hash=password_hash,
        )
        apply_ldif(LDIF_DIR / "base.ldif", env, **ctx)
        apply_ldif(LDIF_DIR / "service-accounts.ldif", env, **ctx)

        # Process optional account CSV files.
        for filename, group in ACCOUNT_FILES.items():
            _import_accounts(env, filename, group, password_hash)

        # Only reached when every step above succeeded.
        INITIALIZED_FLAG.touch()
    finally:
        stop_slapd(proc)

    print("Bootstrap complete.")


if __name__ == "__main__":
    main()