#!/usr/bin/env python3
"""Simple HTTP server that responds 200 with a test message including client IP.

Behavior:
- GET returns HTTP/1.1 200 with a message including the requester IP
- HEAD returns the same headers as GET but no body
- Displays whether any incoming X-* headers were detected and lists them
- If User-Agent contains "curl" or "wget" (case-insensitive) the server
  responds with Content-Type: text/plain; otherwise Content-Type: text/html.
- --pem path to a PEM bundle (cert chain + private key). When provided,
  the server listens on both HTTP (--port) and HTTPS (--tls-port).
  Without --pem the server listens on plain HTTP only.
- --look controls which HTML variant is returned for non-CLI agents:
    * basic     - plain HTML with no external references
    * nice      - includes Google Font "Noto Sans" (default)
    * bootstrap - Bootstrap 5 layout
    * tailwind  - Tailwind CSS via @tailwindcss/browser@latest
- The URL query parameter "look" (e.g. /?look=nice) overrides the command-line --look
  for that request only. Values are case-insensitive and must be one of
  basic, nice, bootstrap, tailwind.

Usage:
  python3 ok_server.py                              # binds 0.0.0.0:8080, nice look
  python3 ok_server.py --look basic
  python3 ok_server.py -b 127.0.0.1 -p 8080 --look tailwind
  python3 ok_server.py --pem /path/to/bundle.pem    # HTTP :8080 + HTTPS :8443
  python3 ok_server.py --pem /path/to/bundle.pem --tls-port 9443

Test:
  curl -i http://localhost:8080/                    # text/plain for curl
  curl -i "http://localhost:8080/?look=basic"       # request-level override
  curl -I http://localhost:8080/                    # HEAD (headers only)
  wget -S -O - http://localhost:8080/               # text/plain for wget
  open http://localhost:8080/                       # browser gets HTML variant per look
"""

import argparse
import signal
import ssl
import sys
import threading
from html import escape
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Optional, Tuple
from urllib.parse import parse_qs, urlparse

# Plain-text body returned to CLI user agents (curl / wget).
# Filled with str.format(); placeholders:
#   {ip}                  - text describing where the request came from
#   {proxy_headers_block} - listing of X-* request headers, or "" when none
PLAIN_TEMPLATE = (
    "Hello, This is a test HTTP server.\n\n"
    "Your request came from {ip}.\n\n"
    "{proxy_headers_block}"
    "Have a nice day!\n"
)


# "basic" look: plain HTML with no external references.
# Filled with str.format(); placeholders:
#   {ip_html}            - already-escaped HTML describing the requester
#   {proxy_headers_html} - already-escaped HTML listing X-* headers, or ""
HTML_BASIC = """<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Test HTTP server</title>
</head>
<body>
<h1>Hello,</h1>
<p>This is a test HTTP server.</p>
<p>Your request came from {ip_html}.</p>
{proxy_headers_html}
<p>Have a nice day!</p>
</body>
</html>
"""


# "nice" look: a single card styled with the Google Font "Noto Sans".
# Filled with str.format(), so literal CSS braces are doubled ({{ }}).
# Placeholders:
#   {ip_html}            - already-escaped HTML describing the requester
#   {proxy_headers_html} - already-escaped HTML listing X-* headers, or ""
HTML_NICE = """<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Test HTTP server</title>
<!-- Google Font: Noto Sans -->
<link href="https://fonts.googleapis.com/css2?family=Noto+Sans:wght@400;700&display=swap" rel="stylesheet">
<style>
body {{ font-family: "Noto Sans", system-ui, -apple-system, "Segoe UI", Roboto, "Helvetica Neue", Arial, sans-serif; padding: 2rem; background: #f6f7fb; }}
.card {{ background: white; padding: 1.5rem; border-radius: 8px; box-shadow: 0 2px 8px rgba(0,0,0,0.06); max-width: 800px; margin: 2rem auto; }}
h1 {{ margin-top: 0; }}
</style>
</head>
<body>
<div class="card">
<h1>Hello,</h1>
<p>This is a test HTTP server.</p>
<p>Your request came from {ip_html}.</p>
{proxy_headers_html}
<p>Have a nice day!</p>
</div>
</body>
</html>
"""


# "bootstrap" look: Bootstrap 5 layout loaded from the jsDelivr CDN.
# Filled with str.format(); placeholders:
#   {ip_html}            - already-escaped HTML describing the requester
#   {proxy_headers_html} - already-escaped HTML listing X-* headers, or ""
HTML_BOOTSTRAP = """<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Test HTTP server</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.2/dist/css/bootstrap.min.css" rel="stylesheet" crossorigin="anonymous">
</head>
<body class="bg-light">
<nav class="navbar navbar-expand-lg navbar-dark bg-primary mb-4">
<div class="container">
<a class="navbar-brand" href="#">Test HTTP server</a>
</div>
</nav>

<main class="container">
<div class="row justify-content-center">
<div class="col-md-8">
<div class="card shadow-sm">
<div class="card-body">
<h1 class="card-title">Hello,</h1>
<p class="card-text">This is a test HTTP server.</p>
<p class="card-text">Your request came from {ip_html}.</p>
{proxy_headers_html}
<hr>
<p class="mb-0">Have a nice day!</p>
</div>
</div>
</div>
</div>
</main>

<footer class="text-center mt-4 mb-4">
<small class="text-muted">Simple status page for firewall/testing</small>
</footer>
</body>
</html>
"""


# "tailwind" look: Tailwind CSS via the @tailwindcss/browser script (latest).
# Filled with str.format(); placeholders:
#   {ip_html}            - already-escaped HTML describing the requester
#   {proxy_headers_html} - already-escaped HTML listing X-* headers, or ""
HTML_TAILWIND = """<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Test HTTP server</title>
<script src="https://cdn.jsdelivr.net/npm/@tailwindcss/browser@latest"></script>
</head>
<body class="min-h-screen bg-slate-100 text-slate-900">
<div class="mx-auto max-w-3xl px-4 py-10 sm:py-16">
<div class="overflow-hidden rounded-2xl border border-slate-200 bg-white shadow-lg">
<div class="bg-slate-900 px-6 py-4 text-slate-100">
<h1 class="text-xl font-semibold">Test HTTP server</h1>
<p class="mt-1 text-sm text-slate-300">Simple status page for firewall/testing</p>
</div>
<main class="space-y-4 px-6 py-6">
<p class="text-2xl font-bold">Hello,</p>
<p>This is a test HTTP server.</p>
<p>Your request came from {ip_html}.</p>
{proxy_headers_html}
<p class="pt-2">Have a nice day!</p>
</main>
</div>
</div>
</body>
</html>
"""


# Accepted values for --look and for the "look" URL query parameter.
# Order matters: the command-line default is taken from index 1 ("nice").
VALID_LOOKS = ("basic", "nice", "bootstrap", "tailwind")


class OkHandler(BaseHTTPRequestHandler):
    """Answer every GET/HEAD request with a 200 status page.

    CLI user agents (curl/wget) receive a plain-text body; every other agent
    receives one of the HTML "looks". The look comes from the ``look`` query
    parameter when it is valid, otherwise from the ``look`` attribute set on
    the server object (falling back to "nice").
    """

    protocol_version = "HTTP/1.1"

    def _collect_x_headers(self):
        """Return the request's X-* headers as (name, value) pairs.

        The result is sorted by header name, ignoring case.
        """
        x_headers = [
            (name, value)
            for name, value in self.headers.items()
            if name.lower().startswith("x-")
        ]
        return sorted(x_headers, key=lambda item: item[0].lower())

    def _proxy_markup(self, x_headers):
        """Render the X-* header listing for both output formats.

        Returns:
            A ``(plain_text_block, html_fragment)`` tuple; both parts are
            empty strings when ``x_headers`` is empty.
        """
        if not x_headers:
            return "", ""

        plain_lines = [f" - {name}: {value}" for name, value in x_headers]
        proxy_headers_block = (
            "Reverse proxy condition: detected via X-* headers.\n"
            "X-* headers:\n" + "\n".join(plain_lines) + "\n\n"
        )

        # Header names and values come from the client, so escape them
        # before embedding them in HTML.
        html_items = "".join(
            f"<li><code>{escape(name)}</code>: {escape(value)}</li>"
            for name, value in x_headers
        )
        proxy_headers_html = (
            "<p>Reverse proxy condition: <strong>detected via X-* headers</strong>.</p>"
            "<p>X-* headers:</p><ul>" + html_items + "</ul>"
        )
        return proxy_headers_block, proxy_headers_html

    def _is_cli_agent(self, ua: str) -> bool:
        """Return True when the User-Agent mentions curl or wget (any case)."""
        if not ua:
            return False
        ua_l = ua.lower()
        return "curl" in ua_l or "wget" in ua_l

    def _get_request_look(self) -> Optional[str]:
        """Return the look requested via the URL query, or None.

        Only the first ``look`` value is considered. It is stripped and
        lower-cased, and must be one of VALID_LOOKS to be accepted.
        """
        query = parse_qs(urlparse(self.path).query)
        look_vals = query.get("look")
        if not look_vals:
            return None
        look = look_vals[0].strip().lower()
        return look if look in VALID_LOOKS else None

    def _html_for_look(self, look: str, ip_html: str, proxy_headers_html: str) -> str:
        """Render the HTML page for ``look``; unknown looks use the tailwind page."""
        templates = {
            "basic": HTML_BASIC,
            "nice": HTML_NICE,
            "bootstrap": HTML_BOOTSTRAP,
        }
        template = templates.get(look, HTML_TAILWIND)
        return template.format(
            ip_html=ip_html,
            proxy_headers_html=proxy_headers_html,
        )

    @staticmethod
    def _parse_forwarded_ip(xff: str) -> str:
        """Extract the first address from an X-Forwarded-For header value.

        A port suffix is removed: ``[::1]:port`` -> ``::1`` and
        ``1.2.3.4:port`` -> ``1.2.3.4``. A bare IPv6 address (more than one
        colon, no brackets) is returned unchanged. Returns "" when the header
        is empty or its first entry is blank.
        """
        if not xff:
            return ""
        first = xff.split(",")[0].strip()
        if first.startswith("["):
            # Bracketed IPv6, optionally followed by :port.
            end = first.find("]")
            return first[1:end] if end != -1 else first[1:]
        if first.count(":") == 1:
            # Exactly one colon can only be host:port, never bare IPv6.
            return first.split(":")[0]
        return first

    def _make_body_and_type(self, client_ip: str, user_agent: str) -> Tuple[bytes, str]:
        """Build the response body and its Content-Type.

        Args:
            client_ip: Address of the directly connected peer.
            user_agent: Value of the User-Agent header ("" when absent).

        Returns:
            ``(body_bytes, content_type)``, where the body is UTF-8 encoded.
        """
        x_headers = self._collect_x_headers()
        proxy_headers_block, proxy_headers_html = self._proxy_markup(x_headers)

        real_ip = self._parse_forwarded_ip(self.headers.get("X-Forwarded-For", ""))
        if real_ip:
            ip_text = f"{real_ip} (forwarded by proxy {client_ip})"
            ip_html = (
                f"<strong>{escape(real_ip)}</strong>"
                f" (forwarded by proxy <strong>{escape(client_ip)}</strong>)"
            )
        else:
            # No usable forwarded address (header absent or first hop blank):
            # report the direct peer instead of printing an empty address.
            ip_text = client_ip
            ip_html = f"<strong>{escape(client_ip)}</strong>"

        if self._is_cli_agent(user_agent):
            body = PLAIN_TEMPLATE.format(
                ip=ip_text,
                proxy_headers_block=proxy_headers_block,
            ).encode("utf-8")
            return body, "text/plain; charset=utf-8"

        # A valid request-level override wins over the server-level default.
        look = self._get_request_look() or getattr(self.server, "look", "nice")
        html = self._html_for_look(look, ip_html, proxy_headers_html)
        return html.encode("utf-8"), "text/html; charset=utf-8"

    def _send_ok_headers(self, content_type: str, body_len: int):
        """Send the 200 status line plus Content-Type and Content-Length."""
        self.send_response(200)
        self.send_header("Content-Type", content_type)
        self.send_header("Content-Length", str(body_len))
        self.end_headers()

    def _respond(self, include_body: bool) -> None:
        """Shared GET/HEAD implementation; HEAD omits the body."""
        client_ip = self.client_address[0]
        user_agent = self.headers.get("User-Agent", "")
        body, ctype = self._make_body_and_type(client_ip, user_agent)
        try:
            # Content-Length always reflects the GET body so that HEAD
            # advertises the same headers.
            self._send_ok_headers(ctype, len(body))
            if include_body:
                self.wfile.write(body)
        except (BrokenPipeError, ConnectionResetError):
            # Client closed the connection early; nothing left to do on it.
            self.close_connection = True

    def do_GET(self):
        """Handle GET: send headers and body."""
        self._respond(include_body=True)

    def do_HEAD(self):
        """Handle HEAD: send the same headers as GET but no body."""
        self._respond(include_body=False)

    # minimal single-line log to stderr
    def log_message(self, format, *args):
        """Write one access-log line to stderr (overrides the base class)."""
        sys.stderr.write("%s - - [%s] %s\n" %
                         (self.client_address[0], self.log_date_time_string(), format % args))


def run(bind: str, port: int, tls_port: int, look: str, pem: str | None = None):
    """Serve plain HTTP, plus HTTPS when a PEM bundle is given, until interrupted.

    Args:
        bind: Address to bind both listeners to.
        port: Plain-HTTP port.
        tls_port: HTTPS port; only used when ``pem`` is provided.
        look: Default HTML look stored on each server for the handler to read.
        pem: Path to a PEM bundle (cert chain + private key), or None for
            HTTP only.

    Exits the process with status 1 when the PEM bundle cannot be loaded.
    Any other server error is reported on stderr and re-raised.
    """
    ssl_ctx = None
    if pem:
        try:
            ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
            ssl_ctx.load_cert_chain(pem)
        except (ssl.SSLError, OSError) as e:
            print(f"Cannot load PEM file '{pem}': {e}", file=sys.stderr)
            sys.exit(1)

    http_server = ThreadingHTTPServer((bind, port), OkHandler)
    http_server.look = look
    https_server = None
    https_thread = None

    def _handle_termination(_signum, _frame):
        # Only unwind the main thread here. The HTTPS server is shut down in
        # the finally block, because shutdown() deadlocks when its
        # serve_forever() loop was never started.
        raise KeyboardInterrupt

    try:
        if ssl_ctx:
            # Bound inside the try so a failure here (e.g. port in use) still
            # closes the already-bound HTTP listener.
            https_server = ThreadingHTTPServer((bind, tls_port), OkHandler)
            https_server.look = look
            https_server.socket = ssl_ctx.wrap_socket(https_server.socket, server_side=True)

        signal.signal(signal.SIGINT, _handle_termination)
        signal.signal(signal.SIGTERM, _handle_termination)

        print(f"Serving on http://{bind}:{port} (default look={look})")
        if https_server:
            print(f"Serving on https://{bind}:{tls_port} (default look={look})")
            https_thread = threading.Thread(target=https_server.serve_forever, daemon=True)
            https_thread.start()
        http_server.serve_forever()
    except KeyboardInterrupt:
        print("\nShutting down server")
    except Exception as e:
        print("Server error:", e, file=sys.stderr)
        raise
    finally:
        if https_thread is not None and https_thread.is_alive():
            # Safe: serve_forever() is running (or about to) in another thread.
            https_server.shutdown()
        http_server.server_close()
        if https_server:
            https_server.server_close()


def main() -> None:
    """Parse command-line options and start the server."""
    parser = argparse.ArgumentParser(description="HTTP OK test server")
    parser.add_argument("-b", "--bind", default="0.0.0.0",
                        help="Address to bind to (default: 0.0.0.0)")
    parser.add_argument("-p", "--port", type=int, default=8080,
                        help="Port to listen on (default: 8080)")
    parser.add_argument("--look", choices=VALID_LOOKS, default=VALID_LOOKS[1],
                        help="Default HTML look for non-cli agents (basic, nice, bootstrap, tailwind). Default: nice")
    parser.add_argument("--tls-port", type=int, default=8443, dest="tls_port",
                        help="TLS port to listen on when --pem is provided (default: 8443)")
    parser.add_argument("--pem", default=None, metavar="PEMFILE",
                        help="PEM bundle file (cert chain + private key); enables HTTPS on --tls-port alongside HTTP on --port")
    args = parser.parse_args()
    run(args.bind, args.port, args.tls_port, args.look, args.pem)


if __name__ == "__main__":
    main()