Files
crowdsec-admin/app/app.py
domverse 9c8b4ca0cc
All checks were successful
Deploy / deploy (push) Successful in 39s
feat: initial scaffold of CrowdSec admin webapp
Flask + htmx mini-app to list and delete CrowdSec decisions from a browser,
gated behind Authentik. Talks to host LAPI via host.docker.internal:8080
using machine JWT auth (bouncer X-Api-Key is read-only).

Gitea Actions CI/CD on push to main: runner rebuilds image and brings the
stack up via docker compose on the host (same pattern as flight-radar).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-16 23:39:20 +02:00

135 lines
4.0 KiB
Python

import os
import time
import ipaddress
import logging
from functools import wraps
import requests
from flask import Flask, render_template, request, jsonify, abort
# --- Module configuration, evaluated once at import time ---------------------
# Base URL of the LAPI. The trailing "/" is stripped because request paths are
# appended verbatim (f"{LAPI_URL}{path}"). Required: a missing variable raises
# KeyError at import.
LAPI_URL = os.environ["LAPI_URL"].rstrip("/")
# Machine credentials posted to /v1/watchers/login. Both are required.
MACHINE_ID = os.environ["LAPI_MACHINE_ID"]
MACHINE_PW = os.environ["LAPI_MACHINE_PASSWORD"]
# Number of X-Forwarded-For entries (counted from the right) that caller_ip()
# skips back over; defaults to 1. A non-integer value raises ValueError here.
TRUSTED_HOPS = int(os.environ.get("TRUSTED_PROXY_HOPS", "1"))
# Value passed as `timeout=` to every outgoing `requests` call.
REQ_TIMEOUT = 5
app = Flask(__name__)
logging.basicConfig(level=logging.INFO)
# Short alias for the Flask application logger.
log = app.logger
# In-process token cache: "jwt" holds the current bearer token (None until the
# first login) and "exp" the time.time() value at or after which _jwt() logs
# in again.
_token = {"jwt": None, "exp": 0.0}
def _login():
    """Log in to LAPI with the machine credentials and cache the JWT.

    Posts ``MACHINE_ID`` / ``MACHINE_PW`` to ``/v1/watchers/login``, stores
    the returned token in the module-level ``_token`` cache together with a
    local expiry 13 minutes from now, and returns the token.

    Returns:
        The JWT string to be sent as a ``Bearer`` credential.

    Raises:
        requests.HTTPError: if LAPI answers with an error status.
        RuntimeError: if the response body contains no usable string token.
            The cache is left untouched in that case.
    """
    r = requests.post(
        f"{LAPI_URL}/v1/watchers/login",
        json={"machine_id": MACHINE_ID, "password": MACHINE_PW},
        timeout=REQ_TIMEOUT,
    )
    r.raise_for_status()
    data = r.json()

    # Accept the token from "code" when that field is a string, otherwise
    # from "token". Only non-empty strings qualify: a non-string "code"
    # (the CrowdSec swagger documents it as an integer status) must never be
    # cached and sent as "Bearer 200".
    jwt = None
    if isinstance(data, dict):
        for key in ("code", "token"):
            value = data.get(key)
            if isinstance(value, str) and value:
                jwt = value
                break
    if jwt is None:
        raise RuntimeError("LAPI login response did not contain a token")

    _token["jwt"] = jwt
    # Local refresh deadline; presumably chosen to be shorter than the
    # server-side token lifetime -- TODO confirm against the LAPI config.
    _token["exp"] = time.time() + 60 * 13
    return jwt
def _jwt():
    """Return the cached JWT, logging in again when it is absent or expired."""
    cached = _token["jwt"]
    if cached and time.time() < _token["exp"]:
        return cached
    # Nothing cached yet, or the local expiry has passed: fetch a new token.
    return _login()
def _lapi(method, path, **kw):
    """Send an authenticated request to LAPI, retrying once after a 401.

    Args:
        method: HTTP method name handed to ``requests.request``.
        path: Path appended verbatim to ``LAPI_URL``.
        **kw: Extra keyword arguments forwarded to ``requests.request``.
            A ``headers`` mapping is copied rather than modified, and a
            ``timeout`` overrides the ``REQ_TIMEOUT`` default.

    Returns:
        The ``requests.Response`` of the last attempt; a second 401 is
        returned to the caller as-is.
    """
    # Copy so the Authorization header is never written into a dict owned by
    # the caller.
    headers = dict(kw.pop("headers", None) or {})
    kw.setdefault("timeout", REQ_TIMEOUT)
    url = f"{LAPI_URL}{path}"

    def _send():
        # _jwt() is evaluated per attempt so a retry picks up the new token.
        headers["Authorization"] = f"Bearer {_jwt()}"
        return requests.request(method, url, headers=headers, **kw)

    r = _send()
    if r.status_code == 401:
        # Token rejected: mark the cache as expired to force a new login,
        # then retry exactly once.
        _token["exp"] = 0
        r = _send()
    return r
def caller_ip():
    """Return the requesting client's IP address as a string.

    When ``X-Forwarded-For`` has entries and ``TRUSTED_HOPS`` is positive,
    the entry ``TRUSTED_HOPS`` positions from the right is used (clamped to
    the first entry); otherwise ``request.remote_addr`` is used. Aborts the
    request with HTTP 400 if the chosen value is not a valid IP address.
    """
    forwarded = request.headers.get("X-Forwarded-For", "")
    stripped = (part.strip() for part in forwarded.split(","))
    hops = [entry for entry in stripped if entry]

    if not hops or TRUSTED_HOPS <= 0:
        addr = request.remote_addr
    else:
        position = len(hops) - TRUSTED_HOPS
        if position < 0:
            # Fewer entries than trusted hops: fall back to the first one.
            position = 0
        addr = hops[position]

    try:
        ipaddress.ip_address(addr)
    except (ValueError, TypeError):
        abort(400, "could not determine caller ip")
    return addr
def valid_ip(s):
    """Return True if *s* is accepted by ``ipaddress.ip_address``, else False."""
    try:
        ipaddress.ip_address(s)
    except (ValueError, TypeError):
        return False
    else:
        return True
@app.get("/")
def index():
    """Render ``index.html`` with the caller's IP exposed as ``my_ip``."""
    visitor = caller_ip()
    return render_template("index.html", my_ip=visitor)
@app.get("/decisions")
def list_decisions():
    """Render the decisions partial, optionally filtered by ``?ip=``.

    Responds 400 with an error partial when the ``ip`` query value is not a
    valid IP address, and 502 when LAPI answers with anything other than 200.
    """
    query_ip = request.args.get("ip", "").strip()
    if query_ip and not valid_ip(query_ip):
        return render_template("_decisions.html", error="invalid IP", decisions=[]), 400

    filters = {"ip": query_ip} if query_ip else {}
    resp = _lapi("GET", "/v1/decisions", params=filters)
    if resp.status_code == 200:
        # A falsy JSON body (e.g. null) is rendered as an empty list.
        found = resp.json() or []
        return render_template("_decisions.html", decisions=found, error=None)

    message = f"LAPI {resp.status_code}: {resp.text[:200]}"
    return render_template("_decisions.html", error=message, decisions=[]), 502
@app.post("/unban")
def unban():
    """Delete decisions by id or by IP, then re-render the decisions list.

    Reads ``id`` and ``ip`` from the submitted form. ``id`` takes precedence
    when both are present.

    Returns:
        The result of ``list_decisions()`` on success; ``(message, 400)``
        when the input is missing or invalid; ``(message, 502)`` when LAPI
        answers with a status other than 200 or 204.
    """
    ip = request.form.get("ip", "").strip()
    decision_id = request.form.get("id", "").strip()

    if decision_id:
        # str.isdigit() alone also accepts non-ASCII digits (superscripts,
        # Arabic-Indic digits, ...). The id is interpolated into the URL
        # path, so only plain ASCII digits are allowed.
        if not (decision_id.isascii() and decision_id.isdigit()):
            return "invalid id", 400
        r = _lapi("DELETE", f"/v1/decisions/{decision_id}")
    elif ip:
        if not valid_ip(ip):
            return "invalid IP", 400
        r = _lapi("DELETE", "/v1/decisions", params={"ip": ip})
    else:
        return "need id or ip", 400

    if r.status_code not in (200, 204):
        return f"LAPI {r.status_code}: {r.text[:200]}", 502

    # Audit trail: who removed which decision.
    log.info("unbanned by=%s ip=%s id=%s", caller_ip(), ip, decision_id)
    return list_decisions()
@app.post("/unban-me")
def unban_me():
    """Delete every decision for the caller's own IP and render the result.

    Responds 502 with a short message when LAPI answers with a status other
    than 200 or 204.
    """
    me = caller_ip()
    resp = _lapi("DELETE", "/v1/decisions", params={"ip": me})
    if resp.status_code in (200, 204):
        log.info("unban-me by=%s", me)
        # An empty response body is rendered as an empty result.
        outcome = resp.json() if resp.text else {}
        return render_template("_unban_me.html", ip=me, result=outcome)
    return f"LAPI {resp.status_code}: {resp.text[:200]}", 502
@app.get("/healthz")
def healthz():
    """Report health by probing LAPI with a one-item decisions query.

    Returns JSON with HTTP 200 when LAPI answers 200, otherwise HTTP 503.
    Any exception raised during the probe is reported as ``error`` with 503.
    """
    try:
        probe = _lapi("GET", "/v1/decisions", params={"limit": 1})
        lapi_status = probe.status_code
        healthy = lapi_status == 200
        http_status = 200 if healthy else 503
        return jsonify(ok=healthy, lapi_status=lapi_status), http_status
    except Exception as exc:
        return jsonify(ok=False, error=str(exc)), 503