Imported sources.
This commit is contained in:
132
scripts/accounts_editor.py
Normal file
132
scripts/accounts_editor.py
Normal file
@@ -0,0 +1,132 @@
|
||||
#!/usr/bin/env python3
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
import pandas as pd
|
||||
import streamlit as st
|
||||
|
||||
|
||||
APP_TITLE = "OpenLDAP Accounts CSV Editor"
|
||||
|
||||
|
||||
FILES = {
|
||||
"Users": {
|
||||
"filename": "users.csv",
|
||||
"keys": ["uid", "gn", "sn", "mail"],
|
||||
"labels": {
|
||||
"uid": "UID",
|
||||
"gn": "Given Name",
|
||||
"sn": "Surname",
|
||||
"mail": "Email",
|
||||
},
|
||||
},
|
||||
"Admins": {
|
||||
"filename": "admins.csv",
|
||||
"keys": ["uid", "gn", "sn", "mail"],
|
||||
"labels": {
|
||||
"uid": "UID",
|
||||
"gn": "Given Name",
|
||||
"sn": "Surname",
|
||||
"mail": "Email",
|
||||
},
|
||||
},
|
||||
"POSIX Users": {
|
||||
"filename": "posix-users.csv",
|
||||
"keys": ["uid", "gn", "sn", "mail", "uidNumber", "gidNumber"],
|
||||
"labels": {
|
||||
"uid": "UID",
|
||||
"gn": "Given Name",
|
||||
"sn": "Surname",
|
||||
"mail": "Email",
|
||||
"uidNumber": "POSIX UID",
|
||||
"gidNumber": "POSIX GID",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def read_csv(path: Path, keys: list[str]) -> pd.DataFrame:
|
||||
if not path.exists():
|
||||
return pd.DataFrame(columns=keys)
|
||||
|
||||
df = pd.read_csv(
|
||||
path,
|
||||
header=None,
|
||||
names=keys,
|
||||
comment="#",
|
||||
skip_blank_lines=True,
|
||||
dtype=str,
|
||||
keep_default_na=False,
|
||||
usecols=range(len(keys)),
|
||||
)
|
||||
return df.apply(lambda col: col.str.strip())
|
||||
|
||||
|
||||
def write_csv(path: Path, keys: list[str], df: pd.DataFrame) -> None:
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
cleaned = df.fillna("").astype(str)
|
||||
cleaned = cleaned.apply(lambda col: col.str.strip())
|
||||
cleaned = cleaned[cleaned.ne("").any(axis=1)]
|
||||
|
||||
with path.open("w", newline="", encoding="utf-8") as f:
|
||||
f.write(f"# {','.join(keys)}\n")
|
||||
cleaned.to_csv(f, index=False, header=False)
|
||||
|
||||
|
||||
def render_page(accounts_dir: Path, page_name: str) -> None:
|
||||
spec = FILES[page_name]
|
||||
csv_path = accounts_dir / spec["filename"]
|
||||
|
||||
st.subheader(page_name)
|
||||
st.caption(f"File: {csv_path}")
|
||||
|
||||
df = read_csv(csv_path, spec["keys"])
|
||||
|
||||
column_config = {
|
||||
key: st.column_config.TextColumn(label=spec["labels"].get(key, key), width="medium")
|
||||
for key in spec["keys"]
|
||||
}
|
||||
|
||||
edited = st.data_editor(
|
||||
df,
|
||||
width="stretch",
|
||||
num_rows="dynamic",
|
||||
column_config=column_config,
|
||||
hide_index=True,
|
||||
key=f"editor-{page_name}",
|
||||
)
|
||||
|
||||
with st.container(horizontal=True):
|
||||
if st.button("Save", type="primary", width=120, key=f"save-{page_name}"):
|
||||
write_csv(csv_path, spec["keys"], edited)
|
||||
st.success(f"Saved {spec['filename']}")
|
||||
|
||||
if st.button("Reload", width=120, key=f"reload-{page_name}"):
|
||||
st.rerun()
|
||||
|
||||
|
||||
def main() -> None:
|
||||
st.set_page_config(page_title=APP_TITLE, layout="wide")
|
||||
st.title(APP_TITLE)
|
||||
|
||||
default_dir = os.environ.get("OPENLDAP_ACCOUNTS_DIR", "~/app-data/openldap/accounts")
|
||||
|
||||
with st.sidebar:
|
||||
st.header("Settings")
|
||||
accounts_dir_raw = st.text_input("Accounts directory", value=default_dir)
|
||||
page_name = st.radio("Page", list(FILES.keys()))
|
||||
|
||||
accounts_dir = Path(accounts_dir_raw).expanduser()
|
||||
st.caption("This editor manages the CSV files used by OpenLDAP bootstrap.")
|
||||
|
||||
if not accounts_dir.exists():
|
||||
st.warning(f"Directory does not exist yet: {accounts_dir}")
|
||||
|
||||
render_page(accounts_dir, page_name)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
26
scripts/login.py
Normal file
26
scripts/login.py
Normal file
@@ -0,0 +1,26 @@
|
||||
from getpass import getpass
|
||||
from ldap3 import Server, Connection, ALL
|
||||
from ldap3 import ObjectDef, AttrDef, Reader, Writer, Entry, Attribute, OperationalAttribute
|
||||
import keyring
|
||||
import argparse
|
||||
|
||||
parser = argparse.ArgumentParser(description="LDAP Test Script")
|
||||
|
||||
# Generic LDAP connection parameters
|
||||
parser.add_argument("--host", "-H", default="gra-01.koszewscy.waw.pl", help="LDAP server host")
|
||||
parser.add_argument("--port", "-p", type=int, default=389, help="LDAP server port")
|
||||
parser.add_argument("--base-dn", "-b", default="dc=koszewscy,dc=waw,dc=pl", help="Base DN for LDAP operations")
|
||||
parser.add_argument("--user", "-u", default="cn=admin,dc=koszewscy,dc=waw,dc=pl", help="Bind DN for LDAP connection")
|
||||
|
||||
# Specific parameters
|
||||
parser.add_argument("--object-class", "-c", nargs='*', default=["inetOrgPerson"], help="Object class(es) to work with")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
server = Server(args.host, port=args.port, get_info=ALL)
|
||||
password = keyring.get_password("Home Lab Password", "admin") or getpass("Password: ")
|
||||
|
||||
conn = Connection(server, user=args.user, password=password, auto_bind=True)
|
||||
person = ObjectDef(args.object_class, conn)
|
||||
print(person)
|
||||
conn.unbind()
|
||||
31
scripts/search.py
Normal file
31
scripts/search.py
Normal file
@@ -0,0 +1,31 @@
|
||||
from ldap3 import Server, Connection, ALL
|
||||
import streamlit as st
|
||||
|
||||
BASE_DN = "dc=koszewscy,dc=waw,dc=pl"
|
||||
LDAP_HOST = "gra-01.koszewscy.waw.pl"
|
||||
LDAP_PORT = 389
|
||||
|
||||
st.set_page_config(page_title="LDAP Search", layout="wide")
|
||||
|
||||
host = st.text_input("Host", LDAP_HOST)
|
||||
port = st.number_input("Port", value=LDAP_PORT, step=1)
|
||||
bind_dn = st.text_input("Bind DN", f"cn=admin,{BASE_DN}")
|
||||
password = st.text_input("Password", type="password")
|
||||
base_dn = st.text_input("Base DN", BASE_DN)
|
||||
search_filter = st.text_input("Filter", "(objectClass=*)")
|
||||
|
||||
if st.button("Search"):
|
||||
server = Server(host, port=port, get_info=ALL)
|
||||
conn = Connection(server, user=bind_dn, password=password, auto_bind=True)
|
||||
ok = conn.search(
|
||||
search_base=base_dn,
|
||||
search_filter=search_filter,
|
||||
attributes=["*"]
|
||||
)
|
||||
if ok:
|
||||
st.write(f"Entries: {len(conn.entries)}")
|
||||
for e in conn.entries:
|
||||
st.code(e.entry_to_ldif())
|
||||
else:
|
||||
st.error(conn.result)
|
||||
conn.unbind()
|
||||
Reference in New Issue
Block a user