From cdc9407ff39c4c69d78ddb4ffc68bc3c9224ea2a Mon Sep 17 00:00:00 2001
From: domverse
Date: Wed, 25 Mar 2026 21:16:57 +0100
Subject: [PATCH] Implement master promotion feature
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Allows promoting any replica to master with zero document re-downloads.
The sync_map rebuild uses existing DB data only — pure in-memory join.

Changes:
- app/sync/promote.py: preflight() checks (doc count, sync lock, ack
  warnings) and promote() transaction (pause scheduler, rebuild all
  sync_maps, create old-master replica, swap settings, resume scheduler)
- app/api/master.py: GET /api/master/promote/{id}/preflight (dry run)
  and POST /api/master/promote/{id} (execute)
- app/models.py: add promoted_from_master bool field to Replica
- app/database.py: idempotent ALTER TABLE migration for new column
- app/main.py: register master router
- app/templates/replica_detail.html: "Promote to Master" button +
  dialog with pre-flight summary, 3-card stats, ack checkboxes, spinner
- app/ui/routes.py: flash query param on dashboard route
- app/templates/dashboard.html: blue info banner for post-promotion flash

Co-Authored-By: Claude Sonnet 4.6
---
Review notes (this commentary area is ignored by `git am`):
- promote(): guard against the replica row disappearing between the
  awaited preflight and the transaction. Previously this surfaced as an
  AttributeError (HTTP 500); it is now ValueError("NOT_FOUND"), which the
  endpoint maps to HTTP 422.
- promote(): dropped the no-op `except ...: raise` clauses; try/finally
  alone guarantees the scheduler is resumed.
- promote_endpoint(): HTTPException is now chained to the ValueError.
- Per-file line counts are unchanged, so hunk headers and the diffstat
  still hold. The `index` blob ids predate these edits.
- NOTE(review): this file was recovered from whitespace-flattened text.
  Context-line indentation is inferred from syntax and the template hunks
  lost their markup; regenerate the patch from the working tree before
  applying it.

 app/api/master.py                 |  60 ++++++
 app/database.py                   |   7 +
 app/main.py                       |   2 +
 app/models.py                     |   1 +
 app/sync/promote.py               | 334 ++++++++++++++++++++++++++++++
 app/templates/dashboard.html      |   6 +
 app/templates/replica_detail.html | 156 +++++++++++++-
 app/ui/routes.py                  |   3 +-
 8 files changed, 567 insertions(+), 2 deletions(-)
 create mode 100644 app/api/master.py
 create mode 100644 app/sync/promote.py

diff --git a/app/api/master.py b/app/api/master.py
new file mode 100644
index 0000000..96ca1e4
--- /dev/null
+++ b/app/api/master.py
@@ -0,0 +1,60 @@
+"""API endpoints for master promotion."""
+from fastapi import APIRouter, HTTPException
+from pydantic import BaseModel
+
+router = APIRouter(prefix="/api/master", tags=["master"])
+
+
+class PromoteRequest(BaseModel):
+    old_master_name: str
+    acknowledge_pending: bool = False
+    acknowledge_errors: bool = False
+
+
+@router.get("/promote/{replica_id}/preflight")
+async def preflight_endpoint(replica_id: int):
+    """Dry-run pre-flight check. No state is modified."""
+    from ..sync.promote import preflight
+
+    result = await preflight(replica_id)
+    return {
+        "can_promote": result.can_promote,
+        "error": result.error,
+        "detail": result.detail,
+        "pending_entries": result.pending_entries,
+        "error_entries": result.error_entries,
+        "ok_entries": result.ok_entries,
+        "missing_doc_count": result.missing_doc_count,
+        "replica_doc_count": result.replica_doc_count,
+        "master_doc_count": result.master_doc_count,
+    }
+
+
+@router.post("/promote/{replica_id}")
+async def promote_endpoint(replica_id: int, body: PromoteRequest):
+    """Execute promotion. Pauses scheduler, runs transaction, resumes scheduler."""
+    from ..sync.promote import promote
+
+    try:
+        result = await promote(
+            replica_id=replica_id,
+            old_master_name=body.old_master_name,
+            acknowledge_pending=body.acknowledge_pending,
+            acknowledge_errors=body.acknowledge_errors,
+        )
+    except ValueError as exc:
+        error_code = exc.args[0] if exc.args else "UNKNOWN"
+        detail_msg = exc.args[1] if len(exc.args) > 1 else str(exc)
+        status = 409 if error_code == "SYNC_RUNNING" else 422
+        raise HTTPException(status_code=status, detail={"error": error_code, "detail": detail_msg}) from exc
+
+    return {
+        "ok": result.ok,
+        "new_master": {"url": result.new_master_url, "name": result.new_master_name},
+        "old_master_replica_id": result.old_master_replica_id,
+        "sync_map_rebuilt": {
+            "replicas_affected": result.replicas_affected,
+            "entries_mapped": result.entries_mapped,
+            "entries_skipped": result.entries_skipped,
+        },
+    }
diff --git a/app/database.py b/app/database.py
index 2485ac0..263f42d 100644
--- a/app/database.py
+++ b/app/database.py
@@ -56,4 +56,11 @@ def create_db_and_tables() -> None:
             "INSERT INTO logs_fts(logs_fts, rowid, message) "
             "VALUES('delete', old.id, old.message); END"
         )
+        # Idempotent column additions for schema evolution (no Alembic)
+        try:
+            conn.exec_driver_sql(
+                "ALTER TABLE replicas ADD COLUMN promoted_from_master INTEGER DEFAULT 0"
+            )
+        except Exception:
+            pass  # column already exists
         conn.commit()
diff --git a/app/main.py b/app/main.py
index ccfd9ba..7e02612 100644
--- a/app/main.py
+++ b/app/main.py
@@ -142,12 +142,14 @@ def create_app() -> FastAPI:
     from .api.logs import router as logs_router
     from .api.settings import router as settings_router
     from .api.status import router as status_router
+    from .api.master import router as master_router
 
     app.include_router(replicas_router)
     app.include_router(sync_router)
     app.include_router(logs_router)
     app.include_router(settings_router)
     app.include_router(status_router)
+    app.include_router(master_router)
 
     # ── Health & Metrics (no auth) ────────────────────────────────────────────
 
diff --git a/app/models.py b/app/models.py
index 1d7982b..61dba09 100644
--- a/app/models.py
+++ b/app/models.py
@@ -18,6 +18,7 @@ class Replica(SQLModel, table=True):
     consecutive_failures: int = 0
     suspended_at: Optional[datetime] = None
     last_alert_at: Optional[datetime] = None
+    promoted_from_master: Optional[bool] = Field(default=False)
     created_at: datetime = Field(default_factory=datetime.utcnow)
 
 
diff --git a/app/sync/promote.py b/app/sync/promote.py
new file mode 100644
index 0000000..7fd49c9
--- /dev/null
+++ b/app/sync/promote.py
@@ -0,0 +1,334 @@
+"""Master promotion: pre-flight checks and atomic promotion transaction."""
+import asyncio
+from dataclasses import dataclass
+from datetime import datetime, timezone
+from typing import Optional
+
+from sqlmodel import Session, select
+
+from ..config import get_config
+from ..crypto import decrypt
+from ..database import get_engine
+from ..models import Log, Replica, Setting, SyncMap, SyncRun
+
+
+@dataclass
+class PreflightResult:
+    can_promote: bool
+    error: Optional[str] = None
+    detail: Optional[str] = None
+    pending_entries: int = 0
+    error_entries: int = 0
+    ok_entries: int = 0
+    missing_doc_count: int = 0
+    replica_doc_count: Optional[int] = None
+    master_doc_count: Optional[int] = None
+
+
+@dataclass
+class PromoteResult:
+    ok: bool
+    new_master_url: Optional[str] = None
+    new_master_name: Optional[str] = None
+    old_master_replica_id: Optional[int] = None
+    replicas_affected: int = 0
+    entries_mapped: int = 0
+    entries_skipped: int = 0
+    error: Optional[str] = None
+
+
+async def preflight(replica_id: int) -> PreflightResult:
+    """
+    Dry-run pre-flight checks. The only network calls are two doc-count fetches.
+    Does not modify any state.
+    """
+    from ..api.status import _fetch_count
+    from ..scheduler import SETTINGS_DEFAULTS
+    from ..sync.engine import _sync_lock
+
+    engine = get_engine()
+    config = get_config()
+
+    if _sync_lock.locked():
+        return PreflightResult(
+            can_promote=False,
+            error="SYNC_RUNNING",
+            detail="A sync cycle is currently running. Wait for it to complete.",
+        )
+
+    with Session(engine) as session:
+        rows = session.exec(select(Setting)).all()
+        settings = dict(SETTINGS_DEFAULTS)
+        for row in rows:
+            if row.value is not None:
+                settings[row.key] = row.value
+
+        master_url = settings.get("master_url", "")
+        master_token_enc = settings.get("master_token", "")
+        if not master_url or not master_token_enc:
+            return PreflightResult(
+                can_promote=False,
+                error="NO_MASTER",
+                detail="Master URL or token is not configured.",
+            )
+
+        replica = session.get(Replica, replica_id)
+        if not replica:
+            return PreflightResult(
+                can_promote=False, error="NOT_FOUND", detail="Replica not found."
+            )
+        if not replica.enabled or replica.suspended_at is not None:
+            return PreflightResult(
+                can_promote=False,
+                error="REPLICA_DISABLED",
+                detail="Replica must be enabled and not suspended before promotion.",
+            )
+
+        all_entries = session.exec(
+            select(SyncMap).where(SyncMap.replica_id == replica_id)
+        ).all()
+        replica_url = replica.url
+        replica_token_enc = replica.api_token
+
+    ok_entries = sum(1 for e in all_entries if e.status == "ok")
+    pending_entries = sum(1 for e in all_entries if e.status == "pending")
+    error_entries = sum(1 for e in all_entries if e.status == "error")
+
+    master_token = decrypt(master_token_enc, config.secret_key)
+    replica_token = decrypt(replica_token_enc, config.secret_key)
+
+    master_count, replica_count = await asyncio.gather(
+        _fetch_count(master_url, master_token),
+        _fetch_count(replica_url, replica_token),
+    )
+
+    if master_count is not None and replica_count is not None and replica_count < master_count:
+        return PreflightResult(
+            can_promote=False,
+            error="MISSING_DOCS",
+            detail=(
+                f"Replica has {replica_count} documents; master has {master_count}. "
+                f"Replica is missing {master_count - replica_count} document(s). "
+                "Complete the sync before promoting."
+            ),
+            pending_entries=pending_entries,
+            error_entries=error_entries,
+            ok_entries=ok_entries,
+            missing_doc_count=master_count - replica_count,
+            replica_doc_count=replica_count,
+            master_doc_count=master_count,
+        )
+
+    return PreflightResult(
+        can_promote=True,
+        pending_entries=pending_entries,
+        error_entries=error_entries,
+        ok_entries=ok_entries,
+        replica_doc_count=replica_count,
+        master_doc_count=master_count,
+    )
+
+
+async def promote(
+    replica_id: int,
+    old_master_name: str,
+    acknowledge_pending: bool = False,
+    acknowledge_errors: bool = False,
+) -> PromoteResult:
+    """
+    Execute promotion. Pauses the scheduler, runs a single DB transaction,
+    resumes the scheduler in a finally block.
+    Raises ValueError(error_code, detail) on pre-flight violations.
+    """
+    from .. import envfile
+    from .. import scheduler as _sched_module
+    from ..sync.engine import _sync_lock
+
+    engine = get_engine()
+    config = get_config()
+    now = datetime.now(timezone.utc)
+
+    # Re-run preflight as atomic gate
+    pf = await preflight(replica_id)
+    if not pf.can_promote:
+        raise ValueError(pf.error or "PREFLIGHT_FAILED", pf.detail or "")
+
+    if pf.pending_entries > 0 and not acknowledge_pending:
+        raise ValueError(
+            "ACK_REQUIRED",
+            f"Replica has {pf.pending_entries} pending sync_map entries. "
+            "Set acknowledge_pending=true to proceed.",
+        )
+    if pf.error_entries > 0 and not acknowledge_errors:
+        raise ValueError(
+            "ACK_REQUIRED",
+            f"Replica has {pf.error_entries} error sync_map entries. "
+            "Set acknowledge_errors=true to proceed.",
+        )
+
+    # Pause scheduler before touching the DB
+    if _sched_module._scheduler is not None:
+        _sched_module._scheduler.pause()
+
+    try:
+        with Session(engine) as session:
+            # ── Read old master credentials ───────────────────────────────────
+            def _get_setting(key: str) -> str:
+                s = session.get(Setting, key)
+                return (s.value or "") if s else ""
+
+            old_master_url = _get_setting("master_url")
+            old_master_token_enc = _get_setting("master_token")
+            old_master_token_plain = decrypt(old_master_token_enc, config.secret_key)
+
+            # ── Load Replica A; capture attrs before deletion ────────────────
+            replica_a = session.get(Replica, replica_id)
+            if replica_a is None:
+                # Row vanished while preflight awaited the doc-count fetches.
+                raise ValueError("NOT_FOUND", "Replica not found.")
+            promoted_name = replica_a.name
+            promoted_url = replica_a.url
+            promoted_token_enc = replica_a.api_token
+            promoted_token_plain = decrypt(promoted_token_enc, config.secret_key)
+
+            # ── Build A's ok-entry map: old_master_id → A_doc_id ─────────────
+            a_entries = session.exec(
+                select(SyncMap).where(
+                    SyncMap.replica_id == replica_id,
+                    SyncMap.status == "ok",
+                )
+            ).all()
+            # old_master_doc_id → A's replica_doc_id (= new master doc id)
+            a_doc_id_map: dict[int, int] = {
+                e.master_doc_id: e.replica_doc_id
+                for e in a_entries
+                if e.replica_doc_id is not None
+            }
+
+            # ── Rebuild sync_maps for all other replicas ──────────────────────
+            other_replicas = session.exec(
+                select(Replica).where(Replica.id != replica_id)
+            ).all()
+
+            total_mapped = 0
+            total_skipped = 0
+
+            for rep_b in other_replicas:
+                b_entries = session.exec(
+                    select(SyncMap).where(
+                        SyncMap.replica_id == rep_b.id,
+                        SyncMap.status == "ok",
+                    )
+                ).all()
+                b_doc_id_map: dict[int, int] = {
+                    e.master_doc_id: e.replica_doc_id
+                    for e in b_entries
+                    if e.replica_doc_id is not None
+                }
+                b_checksum_map: dict[int, Optional[str]] = {
+                    e.master_doc_id: e.file_checksum for e in b_entries
+                }
+
+                # Delete all existing rows for B (ok + non-ok)
+                for entry in session.exec(
+                    select(SyncMap).where(SyncMap.replica_id == rep_b.id)
+                ).all():
+                    session.delete(entry)
+
+                # Insert rebuilt rows
+                for old_master_id, a_doc_id in a_doc_id_map.items():
+                    if old_master_id in b_doc_id_map:
+                        session.add(SyncMap(
+                            replica_id=rep_b.id,
+                            master_doc_id=a_doc_id,
+                            replica_doc_id=b_doc_id_map[old_master_id],
+                            status="ok",
+                            last_synced=now,
+                            file_checksum=b_checksum_map.get(old_master_id),
+                        ))
+                        total_mapped += 1
+                    else:
+                        total_skipped += 1
+
+                # Force full validation pass on next sync
+                rep_b.last_sync_ts = None
+                session.add(rep_b)
+
+            # ── Create replica row for old master ─────────────────────────────
+            new_old_master_replica = Replica(
+                name=old_master_name,
+                url=old_master_url,
+                api_token=old_master_token_enc,
+                enabled=True,
+                promoted_from_master=True,
+                last_sync_ts=None,
+            )
+            session.add(new_old_master_replica)
+            session.flush()  # populate .id before inserting SyncMap rows
+
+            for old_master_id, a_doc_id in a_doc_id_map.items():
+                session.add(SyncMap(
+                    replica_id=new_old_master_replica.id,
+                    master_doc_id=a_doc_id,
+                    replica_doc_id=old_master_id,
+                    status="ok",
+                    last_synced=now,
+                ))
+
+            # ── Null out FK references so replica_a can be deleted ───────────
+            # sync_runs and logs keep history; just unlink the replica FK
+            for run in session.exec(
+                select(SyncRun).where(SyncRun.replica_id == replica_id)
+            ).all():
+                run.replica_id = None
+                session.add(run)
+            for log in session.exec(
+                select(Log).where(Log.replica_id == replica_id)
+            ).all():
+                log.replica_id = None
+                session.add(log)
+
+            # ── Delete sync_map + replica A ───────────────────────────────────
+            for entry in session.exec(
+                select(SyncMap).where(SyncMap.replica_id == replica_id)
+            ).all():
+                session.delete(entry)
+            session.delete(replica_a)
+
+            # ── Update settings ───────────────────────────────────────────────
+            def _upsert(key: str, value: str) -> None:
+                s = session.get(Setting, key)
+                if s is None:
+                    s = Setting(key=key, value=value)
+                else:
+                    s.value = value
+                session.add(s)
+
+            _upsert("master_url", promoted_url)
+            _upsert("master_token", promoted_token_enc)
+
+            session.commit()
+            old_master_replica_id = new_old_master_replica.id
+
+        # ── Update envfile (outside DB transaction — file I/O) ────────────────
+        envfile.write({
+            "MASTER_URL": promoted_url,
+            "MASTER_TOKEN": promoted_token_plain,
+        })
+        url_key, token_key = envfile.replica_keys(old_master_name)
+        envfile.write({url_key: old_master_url, token_key: old_master_token_plain})
+
+        return PromoteResult(
+            ok=True,
+            new_master_url=promoted_url,
+            new_master_name=promoted_name,
+            old_master_replica_id=old_master_replica_id,
+            replicas_affected=len(other_replicas),
+            entries_mapped=total_mapped,
+            entries_skipped=total_skipped,
+        )
+
+    finally:
+        # Runs on success and on every error path, so the scheduler never stays paused.
+        if _sched_module._scheduler is not None:
+            _sched_module._scheduler.resume()
diff --git a/app/templates/dashboard.html b/app/templates/dashboard.html
index 459b66f..e120153 100644
--- a/app/templates/dashboard.html
+++ b/app/templates/dashboard.html
@@ -15,6 +15,12 @@ +{% if flash %} +
+ {{ flash }} +
+{% endif %} +

{{ replica.name }}

-
+
{% endif %} + {% if replica.enabled and not replica.suspended_at %} + + {% endif %}
@@ -149,4 +154,153 @@
+ + +
+
+ +

Promote {{ replica.name }} to Master

+

+ {{ replica.url }} +

+
+ +
+ +

Checking pre-flight conditions…

+
+ +
+ + +
+
+
+ + + {% endblock %} diff --git a/app/ui/routes.py b/app/ui/routes.py index 47df97b..bdbe057 100644 --- a/app/ui/routes.py +++ b/app/ui/routes.py @@ -86,7 +86,7 @@ async def doc_counts_fragment(request: Request, session: Session = Depends(get_s @router.get("/", response_class=HTMLResponse) -def dashboard(request: Request, session: Session = Depends(get_session)): +def dashboard(request: Request, session: Session = Depends(get_session), flash: Optional[str] = None): replicas = session.exec(select(Replica)).all() now = datetime.now(timezone.utc) progress = get_progress() @@ -133,6 +133,7 @@ def dashboard(request: Request, session: Session = Depends(get_session)): "replica_rows": replica_rows, "last_run": last_run, "progress": progress, + "flash": flash, }, )