Compare commits

...

3 Commits

3 changed files with 157 additions and 8 deletions

View File

@ -7,8 +7,10 @@ import (
"io"
"log"
"net/http"
"net/url"
"os"
"os/exec"
"path"
"strings"
)
@ -21,8 +23,10 @@ type NetBoxResponse struct {
Results []IPAddress `json:"results"`
}
func FetchNetboxIPAddresses(apiURL, token string) ([]IPAddress, error) {
req, err := http.NewRequest("GET", apiURL, nil)
func FetchNetboxIPAddresses(apiBaseURL, token string) ([]IPAddress, error) {
u, _ := url.Parse(apiBaseURL)
u.Path = path.Join(u.Path, "ipam/ip-addresses")
req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return nil, err
}
@ -57,7 +61,7 @@ func FetchNetboxIPAddresses(apiURL, token string) ([]IPAddress, error) {
return filtered, nil
}
func CreateDnsMasqConfig() {
func CreateDnsMasqConfig(writeConfig bool) {
token := os.Getenv("NETBOX_TOKEN")
if token == "" {
home := os.Getenv("HOME")
@ -77,13 +81,13 @@ func CreateDnsMasqConfig() {
}
}
ips, err := FetchNetboxIPAddresses(apiURL, token)
ips, err := FetchNetboxIPAddresses(apiBaseURL, token)
if err != nil {
log.Fatalf("Error fetching IP addresses: %v", err)
}
dir := "/etc/dnsmasq.d"
if stat, err := os.Stat(dir); err == nil && stat.IsDir() {
if stat, err := os.Stat(dir); err == nil && stat.IsDir() && !writeConfig {
file, err := os.Create(dir + "/netbox.conf")
if err != nil {
log.Fatalf("Failed to create netbox.conf: %v", err)
@ -99,20 +103,27 @@ func CreateDnsMasqConfig() {
}
}
var apiURL string
var apiBaseURL string
func main() {
listenAddr := flag.String("listen", ":8080", "address and port to listen on (e.g. :8080 or 127.0.0.1:8080)")
flag.StringVar(&apiURL, "api-url", "https://netbox.koszewscy.waw.pl/api/ipam/ip-addresses/", "NetBox API URL to fetch IP addresses")
flag.StringVar(&apiBaseURL, "api-url", "https://netbox.koszewscy.waw.pl/api", "NetBox API URL to fetch IP addresses")
dryRun := flag.Bool("dry-run", false, "if set, do not write to dnsmasq config file, just print the output")
flag.Parse()
if *dryRun {
log.Println("Dry run mode enabled, not writing to dnsmasq config file.")
CreateDnsMasqConfig(true)
return
}
http.HandleFunc("/update-dnsmasq", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
w.WriteHeader(http.StatusMethodNotAllowed)
w.Write([]byte("Method not allowed"))
return
}
CreateDnsMasqConfig()
CreateDnsMasqConfig(false)
cmd := exec.Command("systemctl", "restart", "dnsmasq.service")
if err := cmd.Run(); err != nil {
log.Printf("Failed to restart dnsmasq.service: %v", err)

138
netbox-dns-updater.py Executable file
View File

@ -0,0 +1,138 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
netbox-dns-updater.py
A script to update DNSMasq configuration with IP addresses from NetBox.
It fetches IP addresses from the NetBox API and writes them to a dnsmasq configuration file.
It also provides a simple HTTP server to trigger updates via a web request.
"""
import os
import sys
import argparse
import json
from urllib.parse import urljoin
from urllib.request import Request, urlopen
import logging
import subprocess
from http.server import BaseHTTPRequestHandler, HTTPServer
import threading
logging.basicConfig(level=logging.INFO)
def get_netbox_token():
token = os.environ.get("NETBOX_TOKEN")
if token:
return token.strip()
home = os.environ.get("HOME", "")
paths = [
os.path.join(home, ".netbox", "token"),
"/etc/netbox/token",
]
for path in paths:
try:
with open(path) as f:
return f.read().strip()
except Exception:
continue
logging.error("NETBOX_TOKEN not set and no token file found")
sys.exit(1)
def fetch_netbox_ipaddresses(api_base_url, token):
url = urljoin(api_base_url.rstrip('/') + '/', "ipam/ip-addresses/")
headers = {
"Accept": "application/json",
"Authorization": f"Token {token}",
}
req = Request(url, headers=headers)
try:
with urlopen(req) as resp:
if resp.status != 200:
body = resp.read().decode()
logging.error(f"Unexpected status: {resp.status}, body: {body}")
sys.exit(1)
data = json.load(resp)
except Exception as e:
logging.error(f"HTTP error: {e}")
sys.exit(1)
results = []
for entry in data.get("results", []):
dns_name = entry.get("dns_name", "")
address = entry.get("address", "")
if dns_name and address:
address = address.split("/")[0]
results.append((dns_name, address))
return results
def create_dnsmasq_config(api_base_url, write_config=True):
token = get_netbox_token()
ips = fetch_netbox_ipaddresses(api_base_url, token)
dir_path = "/etc/dnsmasq.d"
lines = [f"address=/{dns}/{ip}\n" for dns, ip in ips]
if os.path.isdir(dir_path) and write_config:
try:
with open(os.path.join(dir_path, "netbox.conf"), "w") as f:
f.writelines(lines)
logging.info("Wrote netbox.conf to /etc/dnsmasq.d/")
except Exception as e:
logging.error(f"Failed to write netbox.conf: {e}")
sys.exit(1)
else:
sys.stdout.writelines(lines)
def restart_dnsmasq():
try:
subprocess.run(["systemctl", "restart", "dnsmasq.service"], check=True)
logging.info("dnsmasq.service restarted.")
except Exception as e:
logging.warning(f"Failed to restart dnsmasq.service: {e}")
class DnsmasqHandler(BaseHTTPRequestHandler):
api_base_url = None
def do_GET(self):
if self.path == "/update-dnsmasq":
create_dnsmasq_config(self.api_base_url, write_config=True)
restart_dnsmasq()
self.send_response(200)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write(b"DNSMasq config updated.")
else:
self.send_response(404)
self.end_headers()
self.wfile.write(b"Not found.")
def run_server(listen_addr, api_base_url):
if ':' in listen_addr:
host, port = listen_addr.split(":", 1)
port = int(port)
else:
host, port = "0.0.0.0", int(listen_addr)
DnsmasqHandler.api_base_url = api_base_url
server = HTTPServer((host, port), DnsmasqHandler)
logging.info(f"Starting web service on {host}:{port} ...")
try:
server.serve_forever()
except KeyboardInterrupt:
pass
finally:
server.server_close()
logging.info("Server stopped.")
def main():
parser = argparse.ArgumentParser(description="NetBox DNSMasq Updater")
parser.add_argument("--listen", default="0.0.0.0:8080", help="address and port to listen on (default: 0.0.0.0:8080)")
parser.add_argument("--api-url", default="https://netbox.koszewscy.waw.pl/api/", help="NetBox API base URL")
parser.add_argument("--dry-run", action="store_true", help="print output instead of writing to dnsmasq config")
args = parser.parse_args()
if args.dry_run:
logging.info("Dry run mode enabled, not writing to dnsmasq config file.")
create_dnsmasq_config(args.api_url, write_config=False)
sys.exit(0)
run_server(args.listen, args.api_url)
if __name__ == "__main__":
main()