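"""Self-service "unban" front-end for a CrowdSec LAPI.

Read paths (listing decisions) authenticate with the bouncer API key;
write paths (deleting decisions) authenticate with a machine JWT obtained
from ``/v1/watchers/login`` and cached in-process.
"""
import html
import ipaddress
import logging
import os
import time

import requests
from flask import Flask, abort, jsonify, render_template, request

LAPI_URL = os.environ["LAPI_URL"].rstrip("/")
MACHINE_ID = os.environ["LAPI_MACHINE_ID"]
MACHINE_PW = os.environ["LAPI_MACHINE_PASSWORD"]
BOUNCER_KEY = os.environ["LAPI_BOUNCER_KEY"]
# Number of reverse proxies in front of this app, each of which appends the
# address of its own peer to X-Forwarded-For.  0 disables the header entirely.
TRUSTED_HOPS = int(os.environ.get("TRUSTED_PROXY_HOPS", "1"))
# Optional comma-separated CIDRs of those proxies.  When set, X-Forwarded-For
# is honoured only for connections that originate inside one of them; when
# unset the header is trusted from any peer (the original behaviour), which is
# only safe if nothing but the proxies can reach this app.
TRUSTED_PROXY_NETS = [
    ipaddress.ip_network(c.strip(), strict=False)
    for c in os.environ.get("TRUSTED_PROXY_CIDRS", "").split(",")
    if c.strip()
]
REQ_TIMEOUT = 5
# Seconds a freshly issued machine JWT is reused before logging in again.
JWT_TTL = 60 * 13
# How many bytes of an upstream LAPI error body are echoed to the browser.
ERR_SNIPPET = 200

# LAPI splits read vs write:
#   GET    /v1/decisions → bouncer X-Api-Key
#   DELETE /v1/decisions → machine Bearer JWT (from /v1/watchers/login)

app = Flask(__name__)
logging.basicConfig(level=logging.INFO)
log = app.logger

if TRUSTED_HOPS > 0 and not TRUSTED_PROXY_NETS:
    log.warning(
        "TRUSTED_PROXY_CIDRS is unset: X-Forwarded-For will be trusted from any "
        "peer; make sure only the reverse proxy can reach this service"
    )

# Cached machine credential: the JWT string and the epoch time after which
# it is treated as expired.  Shared across requests in this process.
_token = {"jwt": None, "exp": 0.0}


def _login():
    """Log in as the configured machine and cache the returned JWT.

    Returns:
        The JWT string.

    Raises:
        requests.HTTPError: when LAPI rejects the login.
        KeyError: when the login response carries no ``token`` field.
    """
    resp = requests.post(
        f"{LAPI_URL}/v1/watchers/login",
        json={"machine_id": MACHINE_ID, "password": MACHINE_PW},
        timeout=REQ_TIMEOUT,
    )
    resp.raise_for_status()
    token = resp.json()["token"]
    _token["jwt"] = token
    # Expire locally a little before LAPI does so a stale token is rarely sent.
    _token["exp"] = time.time() + JWT_TTL
    return token


def _jwt():
    """Return a usable machine JWT, logging in when none is cached or it expired."""
    expired = time.time() >= _token["exp"]
    if not _token["jwt"] or expired:
        return _login()
    return _token["jwt"]


def _bouncer(method, path, **kw):
    """Send a bouncer-authenticated request to LAPI.

    Args:
        method: HTTP method name.
        path: LAPI path, joined onto ``LAPI_URL``.
        **kw: passed through to ``requests.request``; a ``headers`` entry is
            copied so the caller's dict is not mutated.

    Returns:
        The ``requests.Response``.
    """
    headers = dict(kw.pop("headers", None) or {})
    headers["X-Api-Key"] = BOUNCER_KEY
    return requests.request(
        method, f"{LAPI_URL}{path}", headers=headers, timeout=REQ_TIMEOUT, **kw
    )


def _machine(method, path, **kw):
    """Send a machine-authenticated (Bearer JWT) request to LAPI.

    A 401 is retried exactly once with a freshly obtained token, so a token
    revoked or expired on the LAPI side is transparent to the caller.

    Args:
        method: HTTP method name.
        path: LAPI path, joined onto ``LAPI_URL``.
        **kw: passed through to ``requests.request``; a ``headers`` entry is
            copied so the caller's dict is not mutated.

    Returns:
        The ``requests.Response`` of the last attempt.
    """
    headers = dict(kw.pop("headers", None) or {})
    url = f"{LAPI_URL}{path}"
    headers["Authorization"] = f"Bearer {_jwt()}"
    resp = requests.request(method, url, headers=headers, timeout=REQ_TIMEOUT, **kw)
    if resp.status_code == 401:
        # Force a re-login on the next _jwt() call and retry once.
        _token["exp"] = 0.0
        headers["Authorization"] = f"Bearer {_jwt()}"
        resp = requests.request(method, url, headers=headers, timeout=REQ_TIMEOUT, **kw)
    return resp


def _peer_is_trusted_proxy(peer):
    """Return True if the direct TCP peer may supply X-Forwarded-For.

    With ``TRUSTED_PROXY_CIDRS`` unset every peer is trusted (legacy
    behaviour); otherwise the peer must fall inside one of the networks.
    """
    if not TRUSTED_PROXY_NETS:
        return True
    try:
        addr = ipaddress.ip_address(peer)
    except (ValueError, TypeError):
        return False
    return any(addr in net for net in TRUSTED_PROXY_NETS)


def caller_ip():
    """Return the validated IP address of the browser behind the current request.

    X-Forwarded-For is consulted only when ``TRUSTED_HOPS`` > 0, the direct
    peer is an allowed proxy, and the header holds at least ``TRUSTED_HOPS``
    entries; the entry just before the last ``TRUSTED_HOPS`` ones is the
    client as seen by the first trusted proxy.  In every other case the
    socket peer address is used, since a shorter chain cannot be attributed
    to a proxy and may be entirely client-supplied.

    Aborts with 400 when no syntactically valid address can be determined.
    """
    peer = request.remote_addr
    candidate = peer
    if TRUSTED_HOPS > 0 and _peer_is_trusted_proxy(peer):
        xff = request.headers.get("X-Forwarded-For", "")
        chain = [hop.strip() for hop in xff.split(",") if hop.strip()]
        if len(chain) >= TRUSTED_HOPS:
            candidate = chain[len(chain) - TRUSTED_HOPS]
        elif chain:
            log.warning(
                "X-Forwarded-For has %d entries but TRUSTED_PROXY_HOPS=%d; "
                "falling back to peer address",
                len(chain),
                TRUSTED_HOPS,
            )
    if not valid_ip(candidate):
        abort(400, "could not determine caller ip")
    return candidate


def valid_ip(s):
    """Return True if *s* parses as an IPv4 or IPv6 address, else False."""
    try:
        ipaddress.ip_address(s)
    except (ValueError, TypeError):
        return False
    return True


def _lapi_error(resp):
    """Build the human-readable upstream error string shown for a failed LAPI call."""
    return f"LAPI {resp.status_code}: {resp.text[:ERR_SNIPPET]}"


@app.get("/")
def index():
    """Render the landing page, showing the caller its own IP."""
    return render_template("index.html", my_ip=caller_ip())


@app.get("/decisions")
def list_decisions():
    """Render the decisions fragment, optionally filtered by the ``ip`` query param.

    Returns 400 for a malformed ``ip`` and 502 when LAPI does not answer 200.
    """
    query = request.args.get("ip", "").strip()
    params = {}
    if query:
        if not valid_ip(query):
            return render_template("_decisions.html", error="invalid IP", decisions=[]), 400
        params["ip"] = query
    resp = _bouncer("GET", "/v1/decisions", params=params)
    if resp.status_code != 200:
        # Template autoescaping is relied on for the upstream text here.
        return render_template("_decisions.html", error=_lapi_error(resp), decisions=[]), 502
    decisions = resp.json() or []
    return render_template("_decisions.html", decisions=decisions, error=None)


@app.post("/unban")
def unban():
    """Delete a decision by numeric ``id`` or every decision for ``ip`` (form fields).

    NOTE(review): this route deletes arbitrary decisions and carries no
    authentication or CSRF token; access must be restricted at the network /
    proxy layer, and a same-origin check would be a cheap second line of
    defence — confirm the proxy forwards Host before adding one.

    Returns 400 on bad input, 502 when LAPI rejects the delete, otherwise the
    refreshed decisions fragment.
    """
    # Resolve the actor first so a bad X-Forwarded-For yields a 400 before
    # anything is deleted rather than after.
    actor = caller_ip()
    ip = request.form.get("ip", "").strip()
    decision_id = request.form.get("id", "").strip()
    if decision_id:
        # str.isdigit() alone also accepts non-ASCII digits; restrict to [0-9].
        if not (decision_id.isascii() and decision_id.isdigit()):
            return "invalid id", 400
        resp = _machine("DELETE", f"/v1/decisions/{decision_id}")
    elif ip:
        if not valid_ip(ip):
            return "invalid IP", 400
        resp = _machine("DELETE", "/v1/decisions", params={"ip": ip})
    else:
        return "need id or ip", 400
    if resp.status_code not in (200, 204):
        # Bare string responses are served as text/html: escape upstream text.
        return html.escape(_lapi_error(resp)), 502
    log.info("unbanned by=%s ip=%s id=%s", actor, ip, decision_id)
    return list_decisions()


@app.post("/unban-me")
def unban_me():
    """Delete every decision for the caller's own (proxy-derived) IP.

    Returns 502 when LAPI rejects the delete; otherwise renders the
    ``_unban_me.html`` fragment with LAPI's JSON body (``{}`` when empty or
    not JSON).
    """
    ip = caller_ip()
    resp = _machine("DELETE", "/v1/decisions", params={"ip": ip})
    if resp.status_code not in (200, 204):
        return html.escape(_lapi_error(resp)), 502
    log.info("unban-me by=%s", ip)
    try:
        result = resp.json() if resp.text else {}
    except ValueError:
        # A successful delete with a non-JSON body should not turn into a 500.
        log.warning("unban-me: non-JSON body from LAPI (status %s)", resp.status_code)
        result = {}
    return render_template("_unban_me.html", ip=ip, result=result)


@app.get("/healthz")
def healthz():
    """Report whether LAPI answers a bouncer read; 200 when it does, 503 otherwise."""
    try:
        resp = _bouncer("GET", "/v1/decisions", params={"limit": 1})
    except Exception:
        # Exception text from requests embeds the LAPI URL; keep it in the log only.
        log.exception("healthz: LAPI unreachable")
        return jsonify(ok=False, error="lapi unreachable"), 503
    healthy = resp.status_code == 200
    return jsonify(ok=healthy, lapi_status=resp.status_code), 200 if healthy else 503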