"""Dashboard status endpoint.""" import asyncio from datetime import datetime, timezone from fastapi import APIRouter, Depends from sqlmodel import Session, select from ..database import get_session from ..models import Replica, Setting, SyncRun from ..sync.engine import get_progress router = APIRouter(prefix="/api", tags=["status"]) @router.get("/status") def get_status(session: Session = Depends(get_session)): replicas = session.exec(select(Replica)).all() progress = get_progress() now = datetime.now(timezone.utc) replica_data = [] for r in replicas: lag = None if r.last_sync_ts: ts = r.last_sync_ts if ts.tzinfo is None: ts = ts.replace(tzinfo=timezone.utc) lag = int((now - ts).total_seconds()) if r.suspended_at: status = "suspended" elif progress.running and progress.phase and r.name in progress.phase: status = "syncing" elif r.consecutive_failures > 0: status = "error" elif r.last_sync_ts: status = "synced" else: status = "pending" # Last run stats for this replica last_run = session.exec( select(SyncRun) .where(SyncRun.replica_id == r.id) .order_by(SyncRun.started_at.desc()) # type: ignore[attr-defined] .limit(1) ).first() replica_data.append( { "id": r.id, "name": r.name, "url": r.url, "enabled": r.enabled, "status": status, "lag_seconds": lag, "last_sync_ts": r.last_sync_ts.isoformat() if r.last_sync_ts else None, "consecutive_failures": r.consecutive_failures, "suspended": r.suspended_at is not None, "docs_synced_last_run": last_run.docs_synced if last_run else 0, "docs_failed_last_run": last_run.docs_failed if last_run else 0, } ) last_run = session.exec( select(SyncRun) .order_by(SyncRun.started_at.desc()) # type: ignore[attr-defined] .limit(1) ).first() return { "replicas": replica_data, "sync_progress": { "running": progress.running, "phase": progress.phase, "docs_done": progress.docs_done, "docs_total": progress.docs_total, }, "last_sync_run": { "id": last_run.id, "started_at": last_run.started_at.isoformat() if last_run and last_run.started_at else None, "finished_at": last_run.finished_at.isoformat() if last_run and last_run.finished_at else None, "docs_synced": last_run.docs_synced if last_run else 0, "docs_failed": last_run.docs_failed if last_run else 0, "timed_out": last_run.timed_out if last_run else False, } if last_run else None, } async def _fetch_count(url: str, token: str, path: str = "/api/documents/") -> int | None: import httpx try: async with httpx.AsyncClient( headers={"Authorization": f"Token {token}"}, timeout=8.0 ) as client: r = await client.get(url.rstrip("/") + path, params={"page_size": 1}) r.raise_for_status() return r.json().get("count") except Exception: return None @router.get("/doc-counts") async def doc_counts(session: Session = Depends(get_session)): """Live document counts from master and all replicas (parallel fetch).""" from ..config import get_config from ..crypto import decrypt from ..scheduler import SETTINGS_DEFAULTS config = get_config() rows = session.exec(select(Setting)).all() settings = dict(SETTINGS_DEFAULTS) for row in rows: if row.value is not None: settings[row.key] = row.value master_url = settings.get("master_url", "") master_token_enc = settings.get("master_token", "") master_token = decrypt(master_token_enc, config.secret_key) if master_token_enc else "" replicas = session.exec(select(Replica)).all() # Build tasks: (label, url, token) tasks = [] if master_url and master_token: tasks.append(("__master__", master_url, master_token)) for r in replicas: token = decrypt(r.api_token, config.secret_key) tasks.append((str(r.id), r.url, token)) counts_raw = await asyncio.gather(*[_fetch_count(url, tok) for _, url, tok in tasks]) result = {} for (label, _, _), count in zip(tasks, counts_raw): result[label] = count return result