#!/usr/bin/env python3 from __future__ import annotations import os from pathlib import Path import pandas as pd import streamlit as st APP_TITLE = "OpenLDAP Instance Setup" ENV_FILE = "openldap.env" ENV_PAGE = "OpenLDAP Config" ENV_VARS = { "LDAP_DOMAIN": "Domain", "LDAP_BASE_DN": "Base DN", "LDAP_ORG": "Organization", "LDAP_PASSWORD": "Password", "LDAP_ADMIN_PASSWORD": "Admin Password", "KERBEROS_ENABLE": "Enable Kerberos", "KRB5_REALM": "Realm", "KRB5_SASL_HOST": "SASL Host", "KRB5_KTNAME": "Keytab Path", } FILES = { "Users": { "filename": "users.csv", "keys": ["uid", "gn", "sn", "mail"], "labels": { "uid": "UID", "gn": "Given Name", "sn": "Surname", "mail": "Email", }, }, "Admins": { "filename": "admins.csv", "keys": ["uid", "gn", "sn", "mail"], "labels": { "uid": "UID", "gn": "Given Name", "sn": "Surname", "mail": "Email", }, }, "POSIX Users": { "filename": "posix-users.csv", "keys": ["uid", "gn", "sn", "mail", "uidNumber", "gidNumber"], "labels": { "uid": "UID", "gn": "Given Name", "sn": "Surname", "mail": "Email", "uidNumber": "POSIX UID", "gidNumber": "POSIX GID", }, }, } def read_csv(path: Path, keys: list[str]) -> pd.DataFrame: if not path.exists(): return pd.DataFrame(columns=keys) df = pd.read_csv( path, header=None, names=keys, comment="#", skip_blank_lines=True, dtype=str, keep_default_na=False, usecols=range(len(keys)), ) return df.apply(lambda col: col.str.strip()) def write_csv(path: Path, keys: list[str], df: pd.DataFrame) -> None: path.parent.mkdir(parents=True, exist_ok=True) cleaned = df.fillna("").astype(str) cleaned = cleaned.apply(lambda col: col.str.strip()) cleaned = cleaned[cleaned.ne("").any(axis=1)] with path.open("w", newline="", encoding="utf-8") as f: f.write(f"# {','.join(keys)}\n") cleaned.to_csv(f, index=False, header=False) def _parse_env_lines(path: Path) -> dict[str, str]: result: dict[str, str] = {} if not path.exists(): return result with path.open(encoding="utf-8") as f: for line in f: line = line.strip() if not line or line.startswith("#"): continue key, _, val = line.partition("=") result[key.strip()] = val.strip() return result def _domain_to_dn(domain: str) -> str: return ",".join(f"dc={part}" for part in domain.strip().split(".") if part) def _on_domain_change(domain_key: str, base_dn_key: str) -> None: domain = st.session_state.get(domain_key, "") if domain: st.session_state[base_dn_key] = _domain_to_dn(domain) def write_env(path: Path, values: dict[str, str]) -> None: path.parent.mkdir(parents=True, exist_ok=True) with path.open("w", encoding="utf-8") as f: for key, val in values.items(): f.write(f"{key}={val}\n") def render_env_page(data_dir: Path) -> None: env_path = data_dir / ENV_FILE saved_key = f"saved-{ENV_PAGE}" gen_key = "env-gen" st.subheader(ENV_PAGE) st.caption(f"File: {env_path}") st.markdown("""""", unsafe_allow_html=True) gen = st.session_state.get(gen_key, 0) if saved_key not in st.session_state: existing = _parse_env_lines(env_path) st.session_state[saved_key] = { var: ("1" if existing.get(var, "0") == "1" else "0") if var == "KERBEROS_ENABLE" else existing.get(var, "") for var in ENV_VARS } def widget_key(var: str) -> str: return f"env-val-{var}-{gen}" for var, label in ENV_VARS.items(): wkey = widget_key(var) if wkey not in st.session_state: raw = st.session_state[saved_key].get(var, "") st.session_state[wkey] = raw == "1" if var == "KERBEROS_ENABLE" else raw col_label, col_input = st.columns([1, 3]) col_label.markdown(label) if var == "KERBEROS_ENABLE": col_input.toggle(label, key=wkey, label_visibility="collapsed") else: extra = ( {"on_change": _on_domain_change, "args": (wkey, widget_key("LDAP_BASE_DN"))} if var == "LDAP_DOMAIN" else {} ) col_input.text_input( label, key=wkey, type="password" if var.endswith("_PASSWORD") else "default", label_visibility="collapsed", **extra, ) def widget_to_str(var: str) -> str: val = st.session_state.get(widget_key(var)) return ("1" if val else "0") if var == "KERBEROS_ENABLE" else (val or "") current = {var: widget_to_str(var) for var in ENV_VARS} is_dirty = current != st.session_state[saved_key] with st.container(horizontal=True): if st.button("Save", type="primary", width=120, key=f"save-{ENV_PAGE}", disabled=not is_dirty): write_env(env_path, current) st.session_state[saved_key] = current.copy() st.session_state[gen_key] = gen + 1 st.toast(f"Saved {ENV_FILE}") st.rerun() if st.button("Reload", width=120, key=f"reload-{ENV_PAGE}"): st.session_state.pop(saved_key, None) st.session_state[gen_key] = gen + 1 st.rerun() def render_page(accounts_dir: Path, page_name: str) -> None: spec = FILES[page_name] csv_path = accounts_dir / spec["filename"] editor_key = f"editor-{page_name}" st.subheader(page_name) st.caption(f"File: {csv_path}") df = read_csv(csv_path, spec["keys"]) column_config = { key: st.column_config.TextColumn(label=spec["labels"].get(key, key), width="medium") for key in spec["keys"] } edited = st.data_editor( df, width="stretch", num_rows="dynamic", column_config=column_config, hide_index=True, key=editor_key, ) delta = st.session_state.get(editor_key, {}) is_dirty = bool(delta.get("edited_rows") or delta.get("added_rows") or delta.get("deleted_rows")) with st.container(horizontal=True): if st.button("Save", type="primary", width=120, key=f"save-{page_name}", disabled=not is_dirty): write_csv(csv_path, spec["keys"], edited) st.session_state.pop(editor_key, None) st.toast(f"Saved {spec['filename']}") st.rerun() if st.button("Reload", width=120, key=f"reload-{page_name}"): st.session_state.pop(editor_key, None) st.rerun() def main() -> None: st.set_page_config(page_title=APP_TITLE, layout="wide") st.title(APP_TITLE) default_dir = os.environ.get("OPENLDAP_DATA_DIR", "~/app-data/openldap") with st.sidebar: st.header("Settings") data_dir_raw = st.text_input("Data root directory", value=default_dir) page_name = st.radio("Page", [ENV_PAGE] + list(FILES.keys())) data_dir = Path(data_dir_raw).expanduser() st.caption("This editor manages the CSV files used by OpenLDAP bootstrap.") if page_name == ENV_PAGE: if not data_dir.exists(): st.warning(f"Directory does not exist yet: {data_dir}") render_env_page(data_dir) else: accounts_dir = data_dir / "accounts" if not accounts_dir.exists(): st.warning(f"Directory does not exist yet: {accounts_dir}") render_page(accounts_dir, page_name) if __name__ == "__main__": main()