Files
openldap/scripts/accounts_editor.py
T

253 lines
7.5 KiB
Python
Executable File

#!/usr/bin/env python3
from __future__ import annotations
import os
from pathlib import Path
import pandas as pd
import streamlit as st
APP_TITLE = "OpenLDAP Instance Setup"
ENV_FILE = "openldap.env"
ENV_PAGE = "OpenLDAP Config"
ENV_VARS = {
"LDAP_DOMAIN": "Domain",
"LDAP_BASE_DN": "Base DN",
"LDAP_ORG": "Organization",
"LDAP_PASSWORD": "Password",
"LDAP_ADMIN_PASSWORD": "Admin Password",
"KERBEROS_ENABLE": "Enable Kerberos",
"KRB5_REALM": "Realm",
"KRB5_KDC_HOST": "KDC Host",
"LDAP_HOSTNAME": "LDAP Hostname",
"KRB5_KTNAME": "Keytab Path",
}
FILES = {
"Users": {
"filename": "users.csv",
"keys": ["uid", "gn", "sn", "mail"],
"labels": {
"uid": "UID",
"gn": "Given Name",
"sn": "Surname",
"mail": "Email",
},
},
"Admins": {
"filename": "admins.csv",
"keys": ["uid", "gn", "sn", "mail"],
"labels": {
"uid": "UID",
"gn": "Given Name",
"sn": "Surname",
"mail": "Email",
},
},
"POSIX Users": {
"filename": "posix-users.csv",
"keys": ["uid", "gn", "sn", "mail", "uidNumber", "gidNumber"],
"labels": {
"uid": "UID",
"gn": "Given Name",
"sn": "Surname",
"mail": "Email",
"uidNumber": "POSIX UID",
"gidNumber": "POSIX GID",
},
},
}
def read_csv(path: Path, keys: list[str]) -> pd.DataFrame:
if not path.exists():
return pd.DataFrame(columns=keys)
df = pd.read_csv(
path,
header=None,
names=keys,
comment="#",
skip_blank_lines=True,
dtype=str,
keep_default_na=False,
usecols=range(len(keys)),
)
return df.apply(lambda col: col.str.strip())
def write_csv(path: Path, keys: list[str], df: pd.DataFrame) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
cleaned = df.fillna("").astype(str)
cleaned = cleaned.apply(lambda col: col.str.strip())
cleaned = cleaned[cleaned.ne("").any(axis=1)]
with path.open("w", newline="", encoding="utf-8") as f:
f.write(f"# {','.join(keys)}\n")
cleaned.to_csv(f, index=False, header=False)
def _parse_env_lines(path: Path) -> dict[str, str]:
result: dict[str, str] = {}
if not path.exists():
return result
with path.open(encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
key, _, val = line.partition("=")
result[key.strip()] = val.strip()
return result
def _domain_to_dn(domain: str) -> str:
return ",".join(f"dc={part}" for part in domain.strip().split(".") if part)
def _on_domain_change(domain_key: str, base_dn_key: str) -> None:
domain = st.session_state.get(domain_key, "")
if domain:
st.session_state[base_dn_key] = _domain_to_dn(domain)
def write_env(path: Path, values: dict[str, str]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", encoding="utf-8") as f:
for key, val in values.items():
f.write(f"{key}={val}\n")
def render_env_page(data_dir: Path) -> None:
env_path = data_dir / ENV_FILE
saved_key = f"saved-{ENV_PAGE}"
gen_key = "env-gen"
st.subheader(ENV_PAGE)
st.caption(f"File: {env_path}")
gen = st.session_state.get(gen_key, 0)
if saved_key not in st.session_state:
existing = _parse_env_lines(env_path)
st.session_state[saved_key] = {
var: ("1" if existing.get(var, "0") == "1" else "0") if var == "KERBEROS_ENABLE"
else existing.get(var, "")
for var in ENV_VARS
}
def widget_key(var: str) -> str:
return f"env-val-{var}-{gen}"
for var, label in ENV_VARS.items():
wkey = widget_key(var)
if wkey not in st.session_state:
raw = st.session_state[saved_key].get(var, "")
st.session_state[wkey] = raw == "1" if var == "KERBEROS_ENABLE" else raw
if var == "KERBEROS_ENABLE":
st.toggle(label, key=wkey, width=480)
else:
extra = (
{"on_change": _on_domain_change, "args": (wkey, widget_key("LDAP_BASE_DN"))}
if var == "LDAP_DOMAIN" else {}
)
st.text_input(
label,
key=wkey,
type="password" if var.endswith("_PASSWORD") else "default",
width=480,
**extra,
)
def widget_to_str(var: str) -> str:
val = st.session_state.get(widget_key(var))
return ("1" if val else "0") if var == "KERBEROS_ENABLE" else (val or "")
current = {var: widget_to_str(var) for var in ENV_VARS}
is_dirty = current != st.session_state[saved_key]
with st.container(horizontal=True):
if st.button("Save", type="primary", width=120, key=f"save-{ENV_PAGE}", disabled=not is_dirty):
write_env(env_path, current)
st.session_state[saved_key] = current.copy()
st.session_state[gen_key] = gen + 1
st.toast(f"Saved {ENV_FILE}")
st.rerun()
if st.button("Reload", width=120, key=f"reload-{ENV_PAGE}"):
st.session_state.pop(saved_key, None)
st.session_state[gen_key] = gen + 1
st.rerun()
def render_page(accounts_dir: Path, page_name: str) -> None:
spec = FILES[page_name]
csv_path = accounts_dir / spec["filename"]
editor_key = f"editor-{page_name}"
st.subheader(page_name)
st.caption(f"File: {csv_path}")
df = read_csv(csv_path, spec["keys"])
column_config = {
key: st.column_config.TextColumn(label=spec["labels"].get(key, key), width="medium")
for key in spec["keys"]
}
edited = st.data_editor(
df,
num_rows="dynamic",
column_config=column_config,
hide_index=True,
key=editor_key,
)
delta = st.session_state.get(editor_key, {})
is_dirty = bool(delta.get("edited_rows") or delta.get("added_rows") or delta.get("deleted_rows"))
with st.container(horizontal=True):
if st.button("Save", type="primary", width=120, key=f"save-{page_name}", disabled=not is_dirty):
write_csv(csv_path, spec["keys"], edited)
st.session_state.pop(editor_key, None)
st.toast(f"Saved {spec['filename']}")
st.rerun()
if st.button("Reload", width=120, key=f"reload-{page_name}"):
st.session_state.pop(editor_key, None)
st.rerun()
def main() -> None:
st.set_page_config(page_title=APP_TITLE, layout="wide")
st.title(APP_TITLE)
default_dir = os.environ.get("OPENLDAP_DATA_DIR", "~/app-data/openldap")
with st.sidebar:
st.header("Settings")
data_dir_raw = st.text_input("Data root directory", value=default_dir)
page_name = st.radio("Page", [ENV_PAGE] + list(FILES.keys()))
data_dir = Path(data_dir_raw).expanduser()
st.caption("This editor manages configuration files used by OpenLDAP bootstrap.")
if page_name == ENV_PAGE:
if not data_dir.exists():
st.warning(f"Directory does not exist yet: {data_dir}")
render_env_page(data_dir)
else:
accounts_dir = data_dir / "accounts"
if not accounts_dir.exists():
st.warning(f"Directory does not exist yet: {accounts_dir}")
render_page(accounts_dir, page_name)
if __name__ == "__main__":
main()