"""Master promotion: pre-flight checks and atomic promotion transaction."""

import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional

from sqlmodel import Session, select

from ..config import get_config
from ..crypto import decrypt
from ..database import get_engine
from ..models import Log, Replica, Setting, SyncMap, SyncRun


@dataclass
class PreflightResult:
    """Outcome of the read-only pre-flight checks run before a promotion."""

    # True when no blocking condition was found.
    can_promote: bool
    # Machine-readable error code (e.g. "SYNC_RUNNING", "MISSING_DOCS") and
    # its human-readable explanation; both None when can_promote is True.
    error: Optional[str] = None
    detail: Optional[str] = None
    # Number of the replica's sync_map rows per status.
    pending_entries: int = 0
    error_entries: int = 0
    ok_entries: int = 0
    # master count minus replica count; only set on a MISSING_DOCS failure.
    missing_doc_count: int = 0
    # Document counts as returned by _fetch_count (may be None).
    replica_doc_count: Optional[int] = None
    master_doc_count: Optional[int] = None


@dataclass
class PromoteResult:
    """Summary of a completed promotion."""

    ok: bool
    # URL and name of the replica that became the new master.
    new_master_url: Optional[str] = None
    new_master_name: Optional[str] = None
    # Id of the replica row created for the former master.
    old_master_replica_id: Optional[int] = None
    # Number of other replicas whose sync_map was rebuilt.
    replicas_affected: int = 0
    # Rebuilt sync_map rows written / promoted-replica entries that had no
    # counterpart on another replica (summed over all other replicas).
    entries_mapped: int = 0
    entries_skipped: int = 0
    error: Optional[str] = None


async def preflight(replica_id: int) -> PreflightResult:
    """
    Dry-run pre-flight checks. The only network calls are two doc-count fetches.
    Does not modify any state.

    Checks, in order: no sync cycle holds the sync lock, master URL and token
    are configured, the replica exists, the replica is enabled and not
    suspended, and the replica does not hold fewer documents than the master.
    The document-count comparison is skipped when either count is None.

    Args:
        replica_id: Primary key of the replica that should be promoted.

    Returns:
        A PreflightResult. On failure ``can_promote`` is False and ``error``
        is one of "SYNC_RUNNING", "NO_MASTER", "NOT_FOUND",
        "REPLICA_DISABLED" or "MISSING_DOCS".
    """
    from ..api.status import _fetch_count
    from ..scheduler import SETTINGS_DEFAULTS
    from ..sync.engine import _sync_lock

    engine = get_engine()
    config = get_config()

    if _sync_lock.locked():
        return PreflightResult(
            can_promote=False,
            error="SYNC_RUNNING",
            detail="A sync cycle is currently running. Wait for it to complete.",
        )

    with Session(engine) as session:
        # Stored settings override the defaults; NULL values do not.
        settings = dict(SETTINGS_DEFAULTS)
        for row in session.exec(select(Setting)).all():
            if row.value is not None:
                settings[row.key] = row.value

        master_url = settings.get("master_url", "")
        master_token_enc = settings.get("master_token", "")
        if not master_url or not master_token_enc:
            return PreflightResult(
                can_promote=False,
                error="NO_MASTER",
                detail="Master URL or token is not configured.",
            )

        replica = session.get(Replica, replica_id)
        if not replica:
            return PreflightResult(
                can_promote=False, error="NOT_FOUND", detail="Replica not found."
            )
        if not replica.enabled or replica.suspended_at is not None:
            return PreflightResult(
                can_promote=False,
                error="REPLICA_DISABLED",
                detail="Replica must be enabled and not suspended before promotion.",
            )

        all_entries = session.exec(
            select(SyncMap).where(SyncMap.replica_id == replica_id)
        ).all()

        # Copy everything needed later into plain values while the session
        # is still open, so no ORM object is touched after it closes.
        replica_url = replica.url
        replica_token_enc = replica.api_token
        ok_entries = sum(1 for e in all_entries if e.status == "ok")
        pending_entries = sum(1 for e in all_entries if e.status == "pending")
        error_entries = sum(1 for e in all_entries if e.status == "error")

    master_token = decrypt(master_token_enc, config.secret_key)
    replica_token = decrypt(replica_token_enc, config.secret_key)

    # Fetch both document counts concurrently.
    master_count, replica_count = await asyncio.gather(
        _fetch_count(master_url, master_token),
        _fetch_count(replica_url, replica_token),
    )

    if (
        master_count is not None
        and replica_count is not None
        and replica_count < master_count
    ):
        missing = master_count - replica_count
        return PreflightResult(
            can_promote=False,
            error="MISSING_DOCS",
            detail=(
                f"Replica has {replica_count} documents; master has {master_count}. "
                f"Replica is missing {missing} document(s). "
                "Complete the sync before promoting."
            ),
            pending_entries=pending_entries,
            error_entries=error_entries,
            ok_entries=ok_entries,
            missing_doc_count=missing,
            replica_doc_count=replica_count,
            master_doc_count=master_count,
        )

    return PreflightResult(
        can_promote=True,
        pending_entries=pending_entries,
        error_entries=error_entries,
        ok_entries=ok_entries,
        replica_doc_count=replica_count,
        master_doc_count=master_count,
    )


async def promote(
    replica_id: int,
    old_master_name: str,
    acknowledge_pending: bool = False,
    acknowledge_errors: bool = False,
) -> PromoteResult:
    """
    Execute promotion. Pauses the scheduler, runs a single DB transaction,
    resumes the scheduler in a finally block.

    Inside the transaction: the sync_map of every other replica is rebuilt
    against the promoted replica's document ids, a replica row (plus
    sync_map) is created for the old master, the promoted replica and its
    sync_map are deleted, and the master settings are replaced. After the
    commit the env file is updated with the new master and the old master's
    replica credentials.

    Args:
        replica_id: Primary key of the replica to promote to master.
        old_master_name: Name given to the replica row created for the
            former master.
        acknowledge_pending: Must be True to proceed when the replica has
            pending sync_map entries.
        acknowledge_errors: Must be True to proceed when the replica has
            error sync_map entries.

    Returns:
        A PromoteResult with ``ok=True`` describing what was changed.

    Raises:
        ValueError: ``ValueError(error_code, detail)`` on pre-flight
            violations (any preflight error code, or "PREFLIGHT_FAILED"),
            "ACK_REQUIRED" when an acknowledgement flag is missing,
            "SYNC_RUNNING" when a sync started after the pre-flight gate,
            and "NOT_FOUND" when the replica vanished after the gate.
    """
    from .. import envfile
    from .. import scheduler as _sched_module
    from ..sync.engine import _sync_lock

    engine = get_engine()
    config = get_config()
    now = datetime.now(timezone.utc)

    # Re-run preflight as the gate for this promotion.
    pf = await preflight(replica_id)
    if not pf.can_promote:
        raise ValueError(pf.error or "PREFLIGHT_FAILED", pf.detail or "")
    if pf.pending_entries > 0 and not acknowledge_pending:
        raise ValueError(
            "ACK_REQUIRED",
            f"Replica has {pf.pending_entries} pending sync_map entries. "
            "Set acknowledge_pending=true to proceed.",
        )
    if pf.error_entries > 0 and not acknowledge_errors:
        raise ValueError(
            "ACK_REQUIRED",
            f"Replica has {pf.error_entries} error sync_map entries. "
            "Set acknowledge_errors=true to proceed.",
        )

    # Pause scheduler before touching the DB
    if _sched_module._scheduler is not None:
        _sched_module._scheduler.pause()

    try:
        # preflight() awaited network calls, so a sync cycle may have started
        # between its lock check and the pause above. Re-check before
        # rewriting sync_map rows.
        if _sync_lock.locked():
            raise ValueError(
                "SYNC_RUNNING",
                "A sync cycle is currently running. Wait for it to complete.",
            )

        with Session(engine) as session:
            # ── Read old master credentials ───────────────────────────────
            def _get_setting(key: str) -> str:
                s = session.get(Setting, key)
                return (s.value or "") if s else ""

            old_master_url = _get_setting("master_url")
            old_master_token_enc = _get_setting("master_token")
            old_master_token_plain = decrypt(old_master_token_enc, config.secret_key)

            # ── Load Replica A; capture attrs before deletion ─────────────
            replica_a = session.get(Replica, replica_id)
            if replica_a is None:
                # The row existed during preflight but is gone now; report it
                # with the documented error shape rather than AttributeError.
                raise ValueError("NOT_FOUND", "Replica not found.")
            promoted_name = replica_a.name
            promoted_url = replica_a.url
            promoted_token_enc = replica_a.api_token
            promoted_token_plain = decrypt(promoted_token_enc, config.secret_key)

            # ── Build A's ok-entry map: old_master_id → A_doc_id ──────────
            a_entries = session.exec(
                select(SyncMap).where(
                    SyncMap.replica_id == replica_id,
                    SyncMap.status == "ok",
                )
            ).all()
            # old_master_doc_id → A's replica_doc_id (= new master doc id)
            a_doc_id_map: dict[int, int] = {
                e.master_doc_id: e.replica_doc_id
                for e in a_entries
                if e.replica_doc_id is not None
            }

            # ── Rebuild sync_maps for all other replicas ──────────────────
            other_replicas = session.exec(
                select(Replica).where(Replica.id != replica_id)
            ).all()

            total_mapped = 0
            total_skipped = 0

            for rep_b in other_replicas:
                b_entries = session.exec(
                    select(SyncMap).where(
                        SyncMap.replica_id == rep_b.id,
                        SyncMap.status == "ok",
                    )
                ).all()
                b_doc_id_map: dict[int, int] = {
                    e.master_doc_id: e.replica_doc_id
                    for e in b_entries
                    if e.replica_doc_id is not None
                }
                b_checksum_map: dict[int, Optional[str]] = {
                    e.master_doc_id: e.file_checksum for e in b_entries
                }

                # Delete all existing rows for B (ok + non-ok)
                for entry in session.exec(
                    select(SyncMap).where(SyncMap.replica_id == rep_b.id)
                ).all():
                    session.delete(entry)
                # Emit the DELETEs before the INSERTs below. Within a single
                # flush the ORM issues INSERTs ahead of DELETEs, which would
                # collide if sync_map has a uniqueness rule on
                # (replica_id, master_doc_id).
                # NOTE(review): the schema is not visible here — confirm.
                session.flush()

                # Insert rebuilt rows: only documents that both A and B hold
                # can be re-keyed to the new master's document ids.
                for old_master_id, a_doc_id in a_doc_id_map.items():
                    if old_master_id in b_doc_id_map:
                        session.add(SyncMap(
                            replica_id=rep_b.id,
                            master_doc_id=a_doc_id,
                            replica_doc_id=b_doc_id_map[old_master_id],
                            status="ok",
                            last_synced=now,
                            file_checksum=b_checksum_map.get(old_master_id),
                        ))
                        total_mapped += 1
                    else:
                        total_skipped += 1

                # Force full validation pass on next sync
                rep_b.last_sync_ts = None
                session.add(rep_b)

            # ── Create replica row for old master ─────────────────────────
            new_old_master_replica = Replica(
                name=old_master_name,
                url=old_master_url,
                api_token=old_master_token_enc,
                enabled=True,
                promoted_from_master=True,
                last_sync_ts=None,
            )
            session.add(new_old_master_replica)
            session.flush()  # populate .id before inserting SyncMap rows

            # The old master's own doc ids become its replica_doc_ids.
            for old_master_id, a_doc_id in a_doc_id_map.items():
                session.add(SyncMap(
                    replica_id=new_old_master_replica.id,
                    master_doc_id=a_doc_id,
                    replica_doc_id=old_master_id,
                    status="ok",
                    last_synced=now,
                ))

            # ── Null out FK references so replica_a can be deleted ────────
            # sync_runs and logs keep history; just unlink the replica FK
            for run in session.exec(
                select(SyncRun).where(SyncRun.replica_id == replica_id)
            ).all():
                run.replica_id = None
                session.add(run)

            for log in session.exec(
                select(Log).where(Log.replica_id == replica_id)
            ).all():
                log.replica_id = None
                session.add(log)

            # ── Delete sync_map + replica A ───────────────────────────────
            for entry in session.exec(
                select(SyncMap).where(SyncMap.replica_id == replica_id)
            ).all():
                session.delete(entry)
            session.delete(replica_a)

            # ── Update settings ───────────────────────────────────────────
            def _upsert(key: str, value: str) -> None:
                s = session.get(Setting, key)
                if s is None:
                    s = Setting(key=key, value=value)
                else:
                    s.value = value
                session.add(s)

            _upsert("master_url", promoted_url)
            _upsert("master_token", promoted_token_enc)

            session.commit()
            # Read the id while the session is still open.
            old_master_replica_id = new_old_master_replica.id

        # ── Update envfile (outside DB transaction — file I/O) ────────────
        envfile.write({
            "MASTER_URL": promoted_url,
            "MASTER_TOKEN": promoted_token_plain,
        })
        url_key, token_key = envfile.replica_keys(old_master_name)
        envfile.write({url_key: old_master_url, token_key: old_master_token_plain})

        return PromoteResult(
            ok=True,
            new_master_url=promoted_url,
            new_master_name=promoted_name,
            old_master_replica_id=old_master_replica_id,
            replicas_affected=len(other_replicas),
            entries_mapped=total_mapped,
            entries_skipped=total_skipped,
        )
    finally:
        # Always resume, whether the promotion succeeded or raised.
        if _sched_module._scheduler is not None:
            _sched_module._scheduler.resume()