"""Jinja2 HTML routes for the web UI."""
from datetime import datetime, timezone
from typing import Optional
from fastapi import APIRouter, Depends, Request
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from sqlmodel import Session, select
from ..database import get_session
from ..models import Log, Replica, SyncRun
from ..sync.engine import get_progress
# Router collecting the HTML page routes defined in this module (tagged "ui").
router = APIRouter(tags=["ui"])
# Template engine shared by all routes; stays None until setup_templates() runs.
templates: Optional[Jinja2Templates] = None
def setup_templates(t: Jinja2Templates) -> None:
    """Store *t* in the module-level ``templates`` global that ``_tmpl`` renders with."""
    global templates
    templates = t
def _tmpl(name: str, request: Request, ctx: dict) -> HTMLResponse:
    """Render template *name* with *ctx* plus the current request.

    Args:
        name: Template name handed to the template engine.
        request: Incoming request; added to the context under ``"request"``.
        ctx: Page-specific template variables (merged after ``"request"``).

    Returns:
        The response produced by ``templates.TemplateResponse``.

    Raises:
        RuntimeError: If ``setup_templates`` has not been called yet.
    """
    # Explicit check rather than ``assert``: assertions are stripped under
    # ``python -O``, which would surface here as an opaque AttributeError.
    if templates is None:
        raise RuntimeError("templates not configured; call setup_templates() first")
    return templates.TemplateResponse(name, {"request": request, **ctx})
def _lag_str(ts: datetime | None) -> str:
if not ts:
return "never"
if ts.tzinfo is None:
ts = ts.replace(tzinfo=timezone.utc)
seconds = int((datetime.now(timezone.utc) - ts).total_seconds())
if seconds < 120:
return f"{seconds}s ago"
if seconds < 7200:
return f"{seconds // 60}m ago"
if seconds < 172800:
return f"{seconds // 3600}h ago"
return f"{seconds // 86400}d ago"
@router.get("/ui/doc-counts", response_class=HTMLResponse)
async def doc_counts_fragment(request: Request, session: Session = Depends(get_session)):
    """Return an HTML fragment listing document counts for master and replicas.

    Effective settings are the scheduler defaults overlaid with every stored,
    non-null ``Setting`` row. The master is queried only when both its URL and
    token are configured; every replica in the database is always queried.
    All counts are fetched concurrently.

    Args:
        request: Incoming request (unused; kept for the route signature).
        session: Database session injected by FastAPI.

    Returns:
        An ``HTMLResponse`` with one "<label>: <count> docs" entry per target
        joined by " · ", or an empty body when there is nothing to query.
        A count of ``None`` is rendered as "?".
    """
    # Function-scope imports, matching the other handlers in this module.
    import asyncio
    from html import escape

    from ..api.status import _fetch_count
    from ..config import get_config
    from ..crypto import decrypt
    from ..models import Setting
    from ..scheduler import SETTINGS_DEFAULTS

    config = get_config()

    settings = dict(SETTINGS_DEFAULTS)
    for row in session.exec(select(Setting)).all():
        if row.value is not None:
            settings[row.key] = row.value

    master_url = settings.get("master_url", "")
    master_token_enc = settings.get("master_token", "")
    # The master token is stored encrypted; only decrypt when one is present.
    master_token = decrypt(master_token_enc, config.secret_key) if master_token_enc else ""

    # (label, url, token) for every instance whose count should be shown.
    targets = []
    if master_url and master_token:
        targets.append(("Master", master_url, master_token))
    for r in session.exec(select(Replica)).all():
        targets.append((r.name, r.url, decrypt(r.api_token, config.secret_key)))

    counts = await asyncio.gather(*(_fetch_count(url, tok) for _, url, tok in targets))

    parts = []
    for (label, _, _), count in zip(targets, counts):
        shown = "?" if count is None else str(count)
        # Replica names are user-supplied, so escape them before they are
        # embedded in the HTML fragment.
        parts.append(f"{escape(str(label))}: {escape(shown)} docs")

    if not parts:
        return HTMLResponse("")
    # NOTE(review): the original wrapper markup was not recoverable from the
    # source as received; a bare <div> is used here. Restore the original
    # element/attributes (classes, styles) if they are known.
    return HTMLResponse("<div>" + " · ".join(parts) + "</div>")
def _replica_status(replica: Replica, progress) -> str:
    """Classify *replica* for the dashboard; the first matching state wins."""
    if replica.suspended_at:
        return "suspended"
    # A replica counts as syncing while a sync is running and its name occurs
    # in the current phase text.
    # NOTE(review): this is a substring match, so a replica whose name is
    # contained in another replica's name could also match — confirm the
    # format of progress.phase.
    if progress.running and replica.name in (progress.phase or ""):
        return "syncing"
    if replica.consecutive_failures > 0:
        return "error"
    if replica.last_sync_ts:
        return "synced"
    return "pending"


@router.get("/", response_class=HTMLResponse)
def dashboard(request: Request, session: Session = Depends(get_session)):
    """Render the dashboard with one status row per replica.

    Args:
        request: Incoming request, forwarded to the template.
        session: Database session injected by FastAPI.

    Returns:
        The rendered ``dashboard.html`` with ``replica_rows`` (replica,
        status, lag and latest run for each replica), the most recent
        ``SyncRun`` overall as ``last_run``, and the current sync ``progress``.
    """
    replicas = session.exec(select(Replica)).all()
    progress = get_progress()

    replica_rows = []
    for r in replicas:
        # Latest run for this replica only (one query per replica).
        replica_last_run = session.exec(
            select(SyncRun)
            .where(SyncRun.replica_id == r.id)
            .order_by(SyncRun.started_at.desc())  # type: ignore[attr-defined]
            .limit(1)
        ).first()
        replica_rows.append(
            {
                "replica": r,
                "status": _replica_status(r, progress),
                "lag": _lag_str(r.last_sync_ts),
                "last_run": replica_last_run,
            }
        )

    # Most recent run across all replicas, shown in the page header context.
    last_run = session.exec(
        select(SyncRun)
        .order_by(SyncRun.started_at.desc())  # type: ignore[attr-defined]
        .limit(1)
    ).first()
    return _tmpl(
        "dashboard.html",
        request,
        {
            "replica_rows": replica_rows,
            "last_run": last_run,
            "progress": progress,
        },
    )
@router.get("/replicas", response_class=HTMLResponse)
def replicas_page(request: Request, session: Session = Depends(get_session)):
    """Render the replica list plus env-defined replicas not yet in the DB.

    Env entries are recognised by keys of the form ``REPLICA_<NAME>_URL``;
    the matching token is looked up under ``REPLICA_<NAME>_TOKEN``.
    """
    from .. import envfile

    prefix, suffix = "REPLICA_", "_URL"

    query = select(Replica).order_by(Replica.created_at)  # type: ignore[attr-defined]
    replicas = session.exec(query).all()
    env = envfile.read()

    # Names to skip: DB replica names normalised to env-key spelling
    # (upper case, spaces and dashes as underscores), plus every env name
    # already collected below.
    skip = {r.name.upper().replace(" ", "_").replace("-", "_") for r in replicas}

    env_replicas: list[dict] = []
    for key, url in env.items():
        if not (key.startswith(prefix) and key.endswith(suffix)):
            continue
        safe = key[len(prefix):-len(suffix)]
        if safe in skip:
            continue
        skip.add(safe)
        token = env.get(f"REPLICA_{safe}_TOKEN", "")
        env_replicas.append({"safe": safe, "url": url, "token": token})

    context = {"replicas": replicas, "env_replicas": env_replicas}
    return _tmpl("replicas.html", request, context)
@router.get("/replicas/{replica_id}", response_class=HTMLResponse)
def replica_detail(
    replica_id: int, request: Request, session: Session = Depends(get_session)
):
    """Render the detail page for one replica.

    Responds with a plain "Not found" body and status 404 when no replica
    has the given id. Otherwise the page receives the replica, its 20 most
    recently started sync runs, its 50 most recently synced map entries and
    a human-readable lag string.
    """
    from ..models import SyncMap

    replica = session.get(Replica, replica_id)
    if not replica:
        return HTMLResponse("Not found", status_code=404)

    runs_query = (
        select(SyncRun)
        .where(SyncRun.replica_id == replica_id)
        .order_by(SyncRun.started_at.desc())  # type: ignore[attr-defined]
        .limit(20)
    )
    recent_runs = session.exec(runs_query).all()

    map_query = (
        select(SyncMap)
        .where(SyncMap.replica_id == replica_id)
        .order_by(SyncMap.last_synced.desc())  # type: ignore[attr-defined]
        .limit(50)
    )
    sync_map_page = session.exec(map_query).all()

    context = {
        "replica": replica,
        "recent_runs": recent_runs,
        "sync_map_page": sync_map_page,
        "lag": _lag_str(replica.last_sync_ts),
    }
    return _tmpl("replica_detail.html", request, context)
@router.get("/logs", response_class=HTMLResponse)
def logs_page(request: Request, session: Session = Depends(get_session)):
    """Render the logs page with all replicas and the 100 newest log rows."""
    replicas = session.exec(select(Replica)).all()

    newest_first = (
        select(Log)
        .order_by(Log.created_at.desc())  # type: ignore[attr-defined]
        .limit(100)
    )
    recent_logs = session.exec(newest_first).all()

    context = {"replicas": replicas, "logs": recent_logs}
    return _tmpl("logs.html", request, context)
@router.get("/settings", response_class=HTMLResponse)
def settings_page(request: Request, session: Session = Depends(get_session)):
    """Render the settings form.

    Effective settings are the scheduler defaults overlaid with every stored,
    non-null ``Setting`` row. ``master_url`` falls back to the env file's
    ``MASTER_URL`` when unset. Secrets that already have a value are replaced
    by a fixed mask; for a secret without one, the env-file value (if any) is
    passed separately so the form can pre-fill it.
    """
    from .. import envfile
    from ..models import Setting
    from ..scheduler import SETTINGS_DEFAULTS

    stored = session.exec(select(Setting)).all()
    settings = dict(SETTINGS_DEFAULTS)
    settings.update({row.key: row.value for row in stored if row.value is not None})

    env = envfile.read()

    # Use the env-file master URL only when none is configured.
    if not settings.get("master_url") and env.get("MASTER_URL"):
        settings["master_url"] = env["MASTER_URL"]

    # For each secret: mask it when a value exists, otherwise offer the
    # env-file value for pre-filling the form.
    secret_env_names = (
        ("master_token", "MASTER_TOKEN"),
        ("alert_target_token", "ALERT_TARGET_TOKEN"),
    )
    prefill = {}
    for key, env_name in secret_env_names:
        if settings.get(key):
            settings[key] = "••••••••"
            prefill[key] = ""
        else:
            prefill[key] = env.get(env_name, "")

    context = {
        "settings": settings,
        "env_master_token": prefill["master_token"],
        "env_alert_token": prefill["alert_target_token"],
    }
    return _tmpl("settings.html", request, context)