"""Helpers for managing custom DNS records through a Mail-in-a-Box admin API.

Connection settings are read once, at import time, from the environment:

* ``MIAB_USERNAME`` (or ``MAILINABOX_EMAIL``)
* ``MIAB_PASSWORD`` (or ``MAILINABOX_PASSWORD``)
* ``MAILINABOX_BASE_URL`` (only its hostname is used) or ``MIAB_HOST``
"""
from base64 import b64encode
from os import getenv
from urllib import parse

from jmespath import search
import requests

# Upper bound, in seconds, for every HTTP call so that an unreachable box
# cannot block the caller forever (requests applies no timeout by default).
_REQUEST_TIMEOUT_SECONDS = 30

_CONFIG_ERROR = (
    "MIAB configuration is incomplete. "
    "Please set MIAB_HOST, MIAB_USERNAME, and MIAB_PASSWORD."
)


def _resolve_miab_host() -> str | None:
    """Return the admin host name taken from the environment, if any.

    The hostname of ``MAILINABOX_BASE_URL`` wins when it can be parsed;
    otherwise ``MIAB_HOST`` is used.  ``urlparse`` does not raise for a
    missing or scheme-less URL -- it just yields ``hostname is None`` -- so
    the parsed value must be checked explicitly instead of relying on an
    exception, or a valid ``MIAB_HOST`` would be overwritten with ``None``.
    """
    base_url = getenv('MAILINABOX_BASE_URL', None)
    if base_url:
        try:
            hostname = parse.urlparse(base_url).hostname
        except ValueError:
            # Malformed URL (e.g. a broken IPv6 literal): fall back below.
            hostname = None
        if hostname:
            return hostname
    return getenv('MIAB_HOST', None)


MIAB_USERNAME = getenv('MIAB_USERNAME', None) or getenv('MAILINABOX_EMAIL', None)
MIAB_PASSWORD = getenv('MIAB_PASSWORD', None) or getenv('MAILINABOX_PASSWORD', None)
MIAB_HOST = _resolve_miab_host()

# Prepare the Basic Auth header for MIAB API requests
MIAB_AUTH_HEADER = "Basic " + b64encode(f"{MIAB_USERNAME}:{MIAB_PASSWORD}".encode()).decode()


def _require_config() -> None:
    """Raise ``Exception`` unless host, username and password are all set."""
    if not MIAB_HOST or not MIAB_USERNAME or not MIAB_PASSWORD:
        raise Exception(_CONFIG_ERROR)


def get_miab_url(domain_name: str, record_type: str = "A") -> str:
    """Build the custom-DNS endpoint URL for *domain_name*.

    For the exact string ``"A"`` the record type is left out of the path;
    any other value is appended upper-cased as a trailing path segment.
    """
    base = f"https://{MIAB_HOST}/admin/dns/custom/{domain_name}"
    if record_type == "A":
        return base
    return f"{base}/{record_type.upper()}"


def _send_custom_dns(
    http_call,
    domain_name: str,
    record_type: str,
    record_value: str | None,
) -> bool:
    """Send one custom-DNS write request and report whether it returned 200.

    *http_call* is the ``requests`` function to use (``requests.put``,
    ``requests.post`` or ``requests.delete``).  A missing *record_value* is
    sent as an empty ``text/plain`` body.

    Raises:
        Exception: if the MIAB configuration is incomplete.
    """
    _require_config()
    response = http_call(
        get_miab_url(domain_name, record_type),
        data=record_value or "",
        headers={"Authorization": MIAB_AUTH_HEADER, "Content-Type": "text/plain"},
        timeout=_REQUEST_TIMEOUT_SECONDS,
    )
    return response.status_code == 200


def set_custom_dns(domain_name: str, record_type: str = "A", record_value: str | None = None) -> bool:
    """PUT *record_value* to the record's endpoint; return True on HTTP 200.

    Raises:
        Exception: if the MIAB configuration is incomplete.
    """
    return _send_custom_dns(requests.put, domain_name, record_type, record_value)


def delete_custom_dns(domain_name: str, record_type: str = "A", record_value: str | None = None) -> bool:
    """Send a DELETE for the record's endpoint; return True on HTTP 200.

    Raises:
        Exception: if the MIAB configuration is incomplete.
    """
    return _send_custom_dns(requests.delete, domain_name, record_type, record_value)


def add_custom_dns(domain_name: str, record_type: str = "A", record_value: str | None = None) -> bool:
    """POST *record_value* to the record's endpoint; return True on HTTP 200.

    Raises:
        Exception: if the MIAB configuration is incomplete.
    """
    return _send_custom_dns(requests.post, domain_name, record_type, record_value)


def list_custom_dns(record_type: str | None = None):
    """Fetch the custom DNS records as ``{"name", "type", "value"}`` dicts.

    Args:
        record_type: when given, only records whose type equals its
            upper-cased form are returned.

    Returns:
        The projected records (whatever the JMESPath projection yields for
        the response body -- a list for a JSON array).

    Raises:
        Exception: if the MIAB configuration is incomplete, or if the server
            answers with a status other than 200.
    """
    _require_config()
    response = requests.get(
        f"https://{MIAB_HOST}/admin/dns/custom",
        headers={"Authorization": MIAB_AUTH_HEADER},
        timeout=_REQUEST_TIMEOUT_SECONDS,
    )
    if response.status_code != 200:
        raise Exception(f"Failed to retrieve DNS records: {response.status_code} {response.text}")

    records = search("[].{name: qname, type: rtype, value: value}", response.json())
    if record_type and records:
        # Filter in Python rather than splicing the caller's value into the
        # JMESPath expression, where a quote character would break parsing.
        wanted = record_type.upper()
        records = [record for record in records if record["type"] == wanted]
    return records