All checks were successful
Deploy / deploy (push) Successful in 22s
Mirror cscli alerts list: new /alerts endpoint hits LAPI machine auth with since/ip/scenario/origin filters, renders ID, scope:value, reason, country, AS, events, decisions, created_at. Decisions table gains a Created column derived from until - duration (LAPI does not expose created_at on /v1/decisions). Both views format timestamps locally and append a relative "x ago" via Intl.RelativeTimeFormat, refreshed every 30s on the decisions view.
259 lines
7.9 KiB
Python
259 lines
7.9 KiB
Python
import os
|
|
import re
|
|
import time
|
|
import ipaddress
|
|
import logging
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
import requests
|
|
from flask import Flask, render_template, request, jsonify, abort
|
|
|
|
LAPI_URL = os.environ["LAPI_URL"].rstrip("/")
|
|
MACHINE_ID = os.environ["LAPI_MACHINE_ID"]
|
|
MACHINE_PW = os.environ["LAPI_MACHINE_PASSWORD"]
|
|
BOUNCER_KEY = os.environ["LAPI_BOUNCER_KEY"]
|
|
TRUSTED_HOPS = int(os.environ.get("TRUSTED_PROXY_HOPS", "1"))
|
|
REQ_TIMEOUT = 5
|
|
|
|
# LAPI splits read vs write:
|
|
# GET /v1/decisions → bouncer X-Api-Key
|
|
# DELETE /v1/decisions → machine Bearer JWT (from /v1/watchers/login)
|
|
|
|
app = Flask(__name__)
|
|
logging.basicConfig(level=logging.INFO)
|
|
log = app.logger
|
|
|
|
_token = {"jwt": None, "exp": 0.0}
|
|
|
|
|
|
def _login():
|
|
r = requests.post(
|
|
f"{LAPI_URL}/v1/watchers/login",
|
|
json={"machine_id": MACHINE_ID, "password": MACHINE_PW},
|
|
timeout=REQ_TIMEOUT,
|
|
)
|
|
r.raise_for_status()
|
|
_token["jwt"] = r.json()["token"]
|
|
_token["exp"] = time.time() + 60 * 13
|
|
return _token["jwt"]
|
|
|
|
|
|
def _jwt():
|
|
if not _token["jwt"] or time.time() >= _token["exp"]:
|
|
return _login()
|
|
return _token["jwt"]
|
|
|
|
|
|
_GO_DURATION_RE = re.compile(
|
|
r"^(?:(\d+)h)?(?:(\d+)m)?(?:(\d+(?:\.\d+)?)s)?(?:(\d+(?:\.\d+)?)ms)?(?:(\d+(?:\.\d+)?)(?:us|µs))?(?:(\d+)ns)?$"
|
|
)
|
|
|
|
|
|
def _parse_go_duration(s):
|
|
if not s:
|
|
return None
|
|
m = _GO_DURATION_RE.match(s.strip())
|
|
if not m or not any(m.groups()):
|
|
return None
|
|
h, mn, sc, ms, us, ns = m.groups()
|
|
return timedelta(
|
|
hours=int(h or 0),
|
|
minutes=int(mn or 0),
|
|
seconds=float(sc or 0),
|
|
milliseconds=float(ms or 0),
|
|
microseconds=float(us or 0) + (float(ns or 0) / 1000.0),
|
|
)
|
|
|
|
|
|
def _parse_iso(s):
|
|
if not s:
|
|
return None
|
|
# CrowdSec emits RFC3339 w/ nanos; Python fromisoformat tolerates up to micros only.
|
|
s = s.strip()
|
|
if s.endswith("Z"):
|
|
s = s[:-1] + "+00:00"
|
|
# Trim fractional seconds to 6 digits.
|
|
s = re.sub(r"(\.\d{6})\d+", r"\1", s)
|
|
try:
|
|
return datetime.fromisoformat(s)
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
def _enrich(decisions):
|
|
for d in decisions:
|
|
until = _parse_iso(d.get("until"))
|
|
dur = _parse_go_duration(d.get("duration"))
|
|
if until and dur:
|
|
created = until - dur
|
|
d["created_at"] = created.astimezone(timezone.utc).isoformat()
|
|
return decisions
|
|
|
|
|
|
def _bouncer(method, path, **kw):
|
|
headers = kw.pop("headers", {})
|
|
headers["X-Api-Key"] = BOUNCER_KEY
|
|
return requests.request(method, f"{LAPI_URL}{path}", headers=headers, timeout=REQ_TIMEOUT, **kw)
|
|
|
|
|
|
def _machine(method, path, **kw):
|
|
headers = kw.pop("headers", {})
|
|
headers["Authorization"] = f"Bearer {_jwt()}"
|
|
r = requests.request(method, f"{LAPI_URL}{path}", headers=headers, timeout=REQ_TIMEOUT, **kw)
|
|
if r.status_code == 401:
|
|
_token["exp"] = 0
|
|
headers["Authorization"] = f"Bearer {_jwt()}"
|
|
r = requests.request(method, f"{LAPI_URL}{path}", headers=headers, timeout=REQ_TIMEOUT, **kw)
|
|
return r
|
|
|
|
|
|
def caller_ip():
|
|
xff = request.headers.get("X-Forwarded-For", "")
|
|
chain = [p.strip() for p in xff.split(",") if p.strip()]
|
|
if chain and TRUSTED_HOPS > 0:
|
|
idx = max(0, len(chain) - TRUSTED_HOPS)
|
|
candidate = chain[idx]
|
|
else:
|
|
candidate = request.remote_addr
|
|
try:
|
|
ipaddress.ip_address(candidate)
|
|
except (ValueError, TypeError):
|
|
abort(400, "could not determine caller ip")
|
|
return candidate
|
|
|
|
|
|
def valid_ip(s):
|
|
try:
|
|
ipaddress.ip_address(s)
|
|
return True
|
|
except (ValueError, TypeError):
|
|
return False
|
|
|
|
|
|
@app.get("/")
|
|
def index():
|
|
return render_template("index.html", my_ip=caller_ip())
|
|
|
|
|
|
DEFAULT_LIMIT = 200
|
|
MAX_LIMIT = 2000
|
|
|
|
|
|
@app.get("/decisions")
|
|
def list_decisions():
|
|
ip = request.args.get("ip", "").strip()
|
|
reason = request.args.get("reason", "").strip()
|
|
origin = request.args.get("origin", "").strip()
|
|
try:
|
|
limit = max(1, min(MAX_LIMIT, int(request.args.get("limit", DEFAULT_LIMIT))))
|
|
except ValueError:
|
|
limit = DEFAULT_LIMIT
|
|
|
|
params = {"limit": limit}
|
|
if ip:
|
|
if not valid_ip(ip):
|
|
return render_template("_decisions.html", error="invalid IP", decisions=[], total=0, limit=limit), 400
|
|
params["ip"] = ip
|
|
if reason:
|
|
params["scenarios_containing"] = reason
|
|
if origin:
|
|
params["origins"] = origin
|
|
|
|
r = _bouncer("GET", "/v1/decisions", params=params)
|
|
if r.status_code != 200:
|
|
return render_template("_decisions.html", error=f"LAPI {r.status_code}: {r.text[:200]}", decisions=[], total=0, limit=limit), 502
|
|
decisions = _enrich(r.json() or [])
|
|
return render_template("_decisions.html", decisions=decisions, error=None, total=len(decisions), limit=limit)
|
|
|
|
|
|
ALERT_SINCE_DEFAULT = "1h"
|
|
ALERT_SINCE_ALLOWED = {"15m", "1h", "6h", "24h", "168h", "720h"}
|
|
|
|
|
|
@app.get("/alerts")
|
|
def list_alerts():
|
|
since = request.args.get("since", ALERT_SINCE_DEFAULT).strip()
|
|
if since not in ALERT_SINCE_ALLOWED:
|
|
since = ALERT_SINCE_DEFAULT
|
|
ip = request.args.get("ip", "").strip()
|
|
scenario = request.args.get("scenario", "").strip()
|
|
origin = request.args.get("origin", "").strip()
|
|
try:
|
|
limit = max(1, min(MAX_LIMIT, int(request.args.get("limit", DEFAULT_LIMIT))))
|
|
except ValueError:
|
|
limit = DEFAULT_LIMIT
|
|
|
|
params = {"limit": limit, "since": since}
|
|
if ip:
|
|
if not valid_ip(ip):
|
|
return render_template("_alerts.html", error="invalid IP", alerts=[], total=0, limit=limit, since=since), 400
|
|
params["ip"] = ip
|
|
if scenario:
|
|
params["scenario"] = scenario
|
|
if origin:
|
|
params["origin"] = origin
|
|
|
|
r = _machine("GET", "/v1/alerts", params=params)
|
|
if r.status_code != 200:
|
|
return render_template("_alerts.html", error=f"LAPI {r.status_code}: {r.text[:200]}", alerts=[], total=0, limit=limit, since=since), 502
|
|
alerts = r.json() or []
|
|
return render_template("_alerts.html", alerts=alerts, error=None, total=len(alerts), limit=limit, since=since)
|
|
|
|
|
|
@app.post("/unban")
|
|
def unban():
|
|
ip = request.form.get("ip", "").strip()
|
|
decision_id = request.form.get("id", "").strip()
|
|
if decision_id:
|
|
if not decision_id.isdigit():
|
|
return "invalid id", 400
|
|
r = _machine("DELETE", f"/v1/decisions/{decision_id}")
|
|
elif ip:
|
|
if not valid_ip(ip):
|
|
return "invalid IP", 400
|
|
r = _machine("DELETE", "/v1/decisions", params={"ip": ip})
|
|
else:
|
|
return "need id or ip", 400
|
|
if r.status_code not in (200, 204):
|
|
return f"LAPI {r.status_code}: {r.text[:200]}", 502
|
|
log.info("unbanned by=%s ip=%s id=%s", caller_ip(), ip, decision_id)
|
|
return list_decisions()
|
|
|
|
|
|
@app.post("/unban-bulk")
|
|
def unban_bulk():
|
|
ids = [i for i in request.form.getlist("id") if i.isdigit()]
|
|
if not ids:
|
|
return list_decisions()
|
|
deleted = 0
|
|
errors = []
|
|
for did in ids:
|
|
r = _machine("DELETE", f"/v1/decisions/{did}")
|
|
if r.status_code in (200, 204):
|
|
deleted += 1
|
|
else:
|
|
errors.append(f"{did}:{r.status_code}")
|
|
log.info("unban-bulk by=%s count=%d errors=%s", caller_ip(), deleted, errors)
|
|
if errors:
|
|
return f"deleted {deleted}/{len(ids)}; errors: {', '.join(errors[:5])}", 502
|
|
return list_decisions()
|
|
|
|
|
|
@app.post("/unban-me")
|
|
def unban_me():
|
|
ip = caller_ip()
|
|
r = _machine("DELETE", "/v1/decisions", params={"ip": ip})
|
|
if r.status_code not in (200, 204):
|
|
return f"LAPI {r.status_code}: {r.text[:200]}", 502
|
|
log.info("unban-me by=%s", ip)
|
|
return render_template("_unban_me.html", ip=ip, result=r.json() if r.text else {})
|
|
|
|
|
|
@app.get("/healthz")
|
|
def healthz():
|
|
try:
|
|
r = _bouncer("GET", "/v1/decisions", params={"limit": 1})
|
|
return jsonify(ok=r.status_code == 200, lapi_status=r.status_code), 200 if r.status_code == 200 else 503
|
|
except Exception as e:
|
|
return jsonify(ok=False, error=str(e)), 503
|