Add dns-config module for Mail-in-a-Box DNS management and Azure integration
This commit is contained in:
67
dns-config/miab.py
Normal file
67
dns-config/miab.py
Normal file
@@ -0,0 +1,67 @@
|
||||
from os import getenv
|
||||
from base64 import b64encode
|
||||
from urllib import parse
|
||||
from jmespath import search
|
||||
import requests
|
||||
|
||||
MIAB_USERNAME = getenv('MIAB_USERNAME', None) or getenv('MAILINABOX_EMAIL', None)
|
||||
MIAB_PASSWORD = getenv('MIAB_PASSWORD', None) or getenv('MAILINABOX_PASSWORD', None)
|
||||
MIAB_HOST = getenv('MIAB_HOST', None)
|
||||
|
||||
try:
|
||||
MIAB_HOST = parse.urlparse(getenv('MAILINABOX_BASE_URL', None)).hostname
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Prepare the Basic Auth header for MIAB API requests
|
||||
MIAB_AUTH_HEADER = "Basic " + b64encode(f"{MIAB_USERNAME}:{MIAB_PASSWORD}".encode()).decode()
|
||||
|
||||
def get_miab_url(domain_name: str, record_type: str = "A") -> str:
|
||||
if record_type == "A":
|
||||
return f"https://{MIAB_HOST}/admin/dns/custom/{domain_name}"
|
||||
else:
|
||||
return f"https://{MIAB_HOST}/admin/dns/custom/{domain_name}/{record_type.upper()}"
|
||||
|
||||
def set_custom_dns(domain_name: str, record_type: str = "A", record_value: str | None = None) -> bool:
|
||||
if not MIAB_HOST or not MIAB_USERNAME or not MIAB_PASSWORD:
|
||||
raise Exception("MIAB configuration is incomplete. Please set MIAB_HOST, MIAB_USERNAME, and MIAB_PASSWORD.")
|
||||
|
||||
url = get_miab_url(domain_name, record_type)
|
||||
payload = record_value if record_value else ""
|
||||
response = requests.put(url, data=payload, headers={"Authorization": MIAB_AUTH_HEADER, "Content-Type": "text/plain"})
|
||||
return response.status_code == 200
|
||||
|
||||
def delete_custom_dns(domain_name: str, record_type: str = "A", record_value: str | None = None) -> bool:
|
||||
if not MIAB_HOST or not MIAB_USERNAME or not MIAB_PASSWORD:
|
||||
raise Exception("MIAB configuration is incomplete. Please set MIAB_HOST, MIAB_USERNAME, and MIAB_PASSWORD.")
|
||||
|
||||
url = get_miab_url(domain_name, record_type)
|
||||
payload = record_value if record_value else ""
|
||||
response = requests.delete(url, data=payload, headers={"Authorization": MIAB_AUTH_HEADER, "Content-Type": "text/plain"})
|
||||
return response.status_code == 200
|
||||
|
||||
def add_custom_dns(domain_name: str, record_type: str = "A", record_value: str | None = None) -> bool:
|
||||
if not MIAB_HOST or not MIAB_USERNAME or not MIAB_PASSWORD:
|
||||
raise Exception("MIAB configuration is incomplete. Please set MIAB_HOST, MIAB_USERNAME, and MIAB_PASSWORD.")
|
||||
|
||||
url = get_miab_url(domain_name, record_type)
|
||||
payload = record_value if record_value else ""
|
||||
response = requests.post(url, data=payload, headers={"Authorization": MIAB_AUTH_HEADER, "Content-Type": "text/plain"})
|
||||
return response.status_code == 200
|
||||
|
||||
def list_custom_dns(record_type: str = None):
|
||||
response = requests.get(
|
||||
f"https://{MIAB_HOST}/admin/dns/custom",
|
||||
headers={"Authorization": MIAB_AUTH_HEADER}
|
||||
)
|
||||
|
||||
if response.status_code == 200:
|
||||
records = response.json()
|
||||
if record_type:
|
||||
jmespath_expr = f"[?rtype=='{record_type.upper()}']"
|
||||
else:
|
||||
jmespath_expr = "[]"
|
||||
|
||||
return search(jmespath_expr + ".{name: qname, type: rtype, value: value}", records)
|
||||
else:
|
||||
raise Exception(f"Failed to retrieve DNS records: {response.status_code} {response.text}")
|
||||
Reference in New Issue
Block a user