68 lines
3.0 KiB
Python
68 lines
3.0 KiB
Python
from os import getenv
|
|
from base64 import b64encode
|
|
from urllib import parse
|
|
from jmespath import search
|
|
import requests
|
|
|
|
MIAB_USERNAME = getenv('MIAB_USERNAME', None) or getenv('MAILINABOX_EMAIL', None)
|
|
MIAB_PASSWORD = getenv('MIAB_PASSWORD', None) or getenv('MAILINABOX_PASSWORD', None)
|
|
MIAB_HOST = getenv('MIAB_HOST', None)
|
|
|
|
try:
|
|
MIAB_HOST = parse.urlparse(getenv('MAILINABOX_BASE_URL', None)).hostname
|
|
except Exception:
|
|
pass
|
|
|
|
# Prepare the Basic Auth header for MIAB API requests
|
|
MIAB_AUTH_HEADER = "Basic " + b64encode(f"{MIAB_USERNAME}:{MIAB_PASSWORD}".encode()).decode()
|
|
|
|
def get_miab_url(domain_name: str, record_type: str = "A") -> str:
    """Build the MIAB custom-DNS endpoint URL for a name and record type.

    Args:
        domain_name: Name appended to the ``/admin/dns/custom/`` path.
        record_type: DNS record type. Exactly ``"A"`` yields the short URL
            without a type segment; any other value is upper-cased and
            appended as an extra path segment.

    Returns:
        The HTTPS URL on MIAB_HOST for the requested record.
    """
    # Guard clause: every type other than the literal "A" needs a type suffix.
    if record_type != "A":
        return f"https://{MIAB_HOST}/admin/dns/custom/{domain_name}/{record_type.upper()}"
    return f"https://{MIAB_HOST}/admin/dns/custom/{domain_name}"
|
|
|
|
def set_custom_dns(domain_name: str, record_type: str = "A", record_value: str | None = None) -> bool:
|
|
if not MIAB_HOST or not MIAB_USERNAME or not MIAB_PASSWORD:
|
|
raise Exception("MIAB configuration is incomplete. Please set MIAB_HOST, MIAB_USERNAME, and MIAB_PASSWORD.")
|
|
|
|
url = get_miab_url(domain_name, record_type)
|
|
payload = record_value if record_value else ""
|
|
response = requests.put(url, data=payload, headers={"Authorization": MIAB_AUTH_HEADER, "Content-Type": "text/plain"})
|
|
return response.status_code == 200
|
|
|
|
def delete_custom_dns(domain_name: str, record_type: str = "A", record_value: str | None = None) -> bool:
|
|
if not MIAB_HOST or not MIAB_USERNAME or not MIAB_PASSWORD:
|
|
raise Exception("MIAB configuration is incomplete. Please set MIAB_HOST, MIAB_USERNAME, and MIAB_PASSWORD.")
|
|
|
|
url = get_miab_url(domain_name, record_type)
|
|
payload = record_value if record_value else ""
|
|
response = requests.delete(url, data=payload, headers={"Authorization": MIAB_AUTH_HEADER, "Content-Type": "text/plain"})
|
|
return response.status_code == 200
|
|
|
|
def add_custom_dns(domain_name: str, record_type: str = "A", record_value: str | None = None) -> bool:
|
|
if not MIAB_HOST or not MIAB_USERNAME or not MIAB_PASSWORD:
|
|
raise Exception("MIAB configuration is incomplete. Please set MIAB_HOST, MIAB_USERNAME, and MIAB_PASSWORD.")
|
|
|
|
url = get_miab_url(domain_name, record_type)
|
|
payload = record_value if record_value else ""
|
|
response = requests.post(url, data=payload, headers={"Authorization": MIAB_AUTH_HEADER, "Content-Type": "text/plain"})
|
|
return response.status_code == 200
|
|
|
|
def list_custom_dns(record_type: str = None):
|
|
response = requests.get(
|
|
f"https://{MIAB_HOST}/admin/dns/custom",
|
|
headers={"Authorization": MIAB_AUTH_HEADER}
|
|
)
|
|
|
|
if response.status_code == 200:
|
|
records = response.json()
|
|
if record_type:
|
|
jmespath_expr = f"[?rtype=='{record_type.upper()}']"
|
|
else:
|
|
jmespath_expr = "[]"
|
|
|
|
return search(jmespath_expr + ".{name: qname, type: rtype, value: value}", records)
|
|
else:
|
|
raise Exception(f"Failed to retrieve DNS records: {response.status_code} {response.text}")
|