Added Python version of the program.
This commit is contained in:
138
netbox-dns-updater.py
Executable file
138
netbox-dns-updater.py
Executable file
@ -0,0 +1,138 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
"""
|
||||||
|
netbox-dns-updater.py
|
||||||
|
A script to update DNSMasq configuration with IP addresses from NetBox.
|
||||||
|
It fetches IP addresses from the NetBox API and writes them to a dnsmasq configuration file.
|
||||||
|
It also provides a simple HTTP server to trigger updates via a web request.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import argparse
|
||||||
|
import json
|
||||||
|
from urllib.parse import urljoin
|
||||||
|
from urllib.request import Request, urlopen
|
||||||
|
import logging
|
||||||
|
import subprocess
|
||||||
|
from http.server import BaseHTTPRequestHandler, HTTPServer
|
||||||
|
import threading
|
||||||
|
|
||||||
|
logging.basicConfig(level=logging.INFO)
|
||||||
|
|
||||||
|
def get_netbox_token():
|
||||||
|
token = os.environ.get("NETBOX_TOKEN")
|
||||||
|
if token:
|
||||||
|
return token.strip()
|
||||||
|
home = os.environ.get("HOME", "")
|
||||||
|
paths = [
|
||||||
|
os.path.join(home, ".netbox", "token"),
|
||||||
|
"/etc/netbox/token",
|
||||||
|
]
|
||||||
|
for path in paths:
|
||||||
|
try:
|
||||||
|
with open(path) as f:
|
||||||
|
return f.read().strip()
|
||||||
|
except Exception:
|
||||||
|
continue
|
||||||
|
logging.error("NETBOX_TOKEN not set and no token file found")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
def fetch_netbox_ipaddresses(api_base_url, token):
|
||||||
|
url = urljoin(api_base_url.rstrip('/') + '/', "ipam/ip-addresses/")
|
||||||
|
headers = {
|
||||||
|
"Accept": "application/json",
|
||||||
|
"Authorization": f"Token {token}",
|
||||||
|
}
|
||||||
|
req = Request(url, headers=headers)
|
||||||
|
try:
|
||||||
|
with urlopen(req) as resp:
|
||||||
|
if resp.status != 200:
|
||||||
|
body = resp.read().decode()
|
||||||
|
logging.error(f"Unexpected status: {resp.status}, body: {body}")
|
||||||
|
sys.exit(1)
|
||||||
|
data = json.load(resp)
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"HTTP error: {e}")
|
||||||
|
sys.exit(1)
|
||||||
|
results = []
|
||||||
|
for entry in data.get("results", []):
|
||||||
|
dns_name = entry.get("dns_name", "")
|
||||||
|
address = entry.get("address", "")
|
||||||
|
if dns_name and address:
|
||||||
|
address = address.split("/")[0]
|
||||||
|
results.append((dns_name, address))
|
||||||
|
return results
|
||||||
|
|
||||||
|
def create_dnsmasq_config(api_base_url, write_config=True):
|
||||||
|
token = get_netbox_token()
|
||||||
|
ips = fetch_netbox_ipaddresses(api_base_url, token)
|
||||||
|
dir_path = "/etc/dnsmasq.d"
|
||||||
|
lines = [f"address=/{dns}/{ip}\n" for dns, ip in ips]
|
||||||
|
if os.path.isdir(dir_path) and write_config:
|
||||||
|
try:
|
||||||
|
with open(os.path.join(dir_path, "netbox.conf"), "w") as f:
|
||||||
|
f.writelines(lines)
|
||||||
|
logging.info("Wrote netbox.conf to /etc/dnsmasq.d/")
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"Failed to write netbox.conf: {e}")
|
||||||
|
sys.exit(1)
|
||||||
|
else:
|
||||||
|
sys.stdout.writelines(lines)
|
||||||
|
|
||||||
|
def restart_dnsmasq():
|
||||||
|
try:
|
||||||
|
subprocess.run(["systemctl", "restart", "dnsmasq.service"], check=True)
|
||||||
|
logging.info("dnsmasq.service restarted.")
|
||||||
|
except Exception as e:
|
||||||
|
logging.warning(f"Failed to restart dnsmasq.service: {e}")
|
||||||
|
|
||||||
|
class DnsmasqHandler(BaseHTTPRequestHandler):
|
||||||
|
api_base_url = None
|
||||||
|
|
||||||
|
def do_GET(self):
|
||||||
|
if self.path == "/update-dnsmasq":
|
||||||
|
create_dnsmasq_config(self.api_base_url, write_config=True)
|
||||||
|
restart_dnsmasq()
|
||||||
|
self.send_response(200)
|
||||||
|
self.send_header('Content-type', 'text/plain')
|
||||||
|
self.end_headers()
|
||||||
|
self.wfile.write(b"DNSMasq config updated.")
|
||||||
|
else:
|
||||||
|
self.send_response(404)
|
||||||
|
self.end_headers()
|
||||||
|
self.wfile.write(b"Not found.")
|
||||||
|
|
||||||
|
def run_server(listen_addr, api_base_url):
|
||||||
|
if ':' in listen_addr:
|
||||||
|
host, port = listen_addr.split(":", 1)
|
||||||
|
port = int(port)
|
||||||
|
else:
|
||||||
|
host, port = "0.0.0.0", int(listen_addr)
|
||||||
|
DnsmasqHandler.api_base_url = api_base_url
|
||||||
|
server = HTTPServer((host, port), DnsmasqHandler)
|
||||||
|
logging.info(f"Starting web service on {host}:{port} ...")
|
||||||
|
try:
|
||||||
|
server.serve_forever()
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
server.server_close()
|
||||||
|
logging.info("Server stopped.")
|
||||||
|
|
||||||
|
def main():
|
||||||
|
parser = argparse.ArgumentParser(description="NetBox DNSMasq Updater")
|
||||||
|
parser.add_argument("--listen", default="0.0.0.0:8080", help="address and port to listen on (default: 0.0.0.0:8080)")
|
||||||
|
parser.add_argument("--api-url", default="https://netbox.koszewscy.waw.pl/api/", help="NetBox API base URL")
|
||||||
|
parser.add_argument("--dry-run", action="store_true", help="print output instead of writing to dnsmasq config")
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
if args.dry_run:
|
||||||
|
logging.info("Dry run mode enabled, not writing to dnsmasq config file.")
|
||||||
|
create_dnsmasq_config(args.api_url, write_config=False)
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
|
run_server(args.listen, args.api_url)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
Reference in New Issue
Block a user