Files
pngx-sync/app/sync/promote.py
domverse cdc9407ff3
All checks were successful
Deploy / deploy (push) Successful in 33s
Implement master promotion feature
Allows promoting any replica to master with zero document re-downloads.
The sync_map rebuild uses existing DB data only — pure in-memory join.

Changes:
- app/sync/promote.py: preflight() checks (doc count, sync lock, ack
  warnings) and promote() transaction (pause scheduler, rebuild all
  sync_maps, create old-master replica, swap settings, resume scheduler)
- app/api/master.py: GET /api/master/promote/{id}/preflight (dry run)
  and POST /api/master/promote/{id} (execute)
- app/models.py: add promoted_from_master bool field to Replica
- app/database.py: idempotent ALTER TABLE migration for new column
- app/main.py: register master router
- app/templates/replica_detail.html: "Promote to Master" button +
  dialog with pre-flight summary, 3-card stats, ack checkboxes, spinner
- app/ui/routes.py: flash query param on dashboard route
- app/templates/dashboard.html: blue info banner for post-promotion flash

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-25 21:17:01 +01:00

335 lines
12 KiB
Python

"""Master promotion: pre-flight checks and atomic promotion transaction."""
import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional
from sqlmodel import Session, select
from ..config import get_config
from ..crypto import decrypt
from ..database import get_engine
from ..models import Log, Replica, Setting, SyncMap, SyncRun
@dataclass
class PreflightResult:
    """Outcome of the promotion pre-flight checks (see preflight())."""

    # True when promotion may proceed; otherwise `error`/`detail` explain why.
    can_promote: bool
    # Machine-readable code: SYNC_RUNNING, NO_MASTER, NOT_FOUND,
    # REPLICA_DISABLED, or MISSING_DOCS.
    error: Optional[str] = None
    # Human-readable explanation accompanying `error`.
    detail: Optional[str] = None
    # sync_map entry counts for the candidate replica, broken down by status.
    pending_entries: int = 0
    error_entries: int = 0
    ok_entries: int = 0
    # master_doc_count - replica_doc_count when the replica is behind master.
    missing_doc_count: int = 0
    # Live document counts fetched from each instance; None when a fetch failed.
    replica_doc_count: Optional[int] = None
    master_doc_count: Optional[int] = None
@dataclass
class PromoteResult:
    """Outcome of an executed promotion (see promote())."""

    # True when the promotion transaction committed.
    ok: bool
    # URL and name of the replica that became the new master.
    new_master_url: Optional[str] = None
    new_master_name: Optional[str] = None
    # id of the Replica row created for the demoted old master.
    old_master_replica_id: Optional[int] = None
    # Number of other replicas whose sync_maps were rebuilt.
    replicas_affected: int = 0
    # Rebuilt sync_map rows inserted across other replicas, and rows dropped
    # because the old-master doc id had no "ok" mapping on that replica.
    entries_mapped: int = 0
    entries_skipped: int = 0
    error: Optional[str] = None
async def preflight(replica_id: int) -> PreflightResult:
    """
    Dry-run pre-flight checks. The only network calls are two doc-count fetches.
    Does not modify any state.
    """
    from ..api.status import _fetch_count
    from ..scheduler import SETTINGS_DEFAULTS
    from ..sync.engine import _sync_lock

    engine = get_engine()
    config = get_config()

    # Never race a running sync cycle.
    if _sync_lock.locked():
        return PreflightResult(
            can_promote=False,
            error="SYNC_RUNNING",
            detail="A sync cycle is currently running. Wait for it to complete.",
        )

    with Session(engine) as session:
        # Effective settings = defaults overlaid with non-null DB rows.
        settings = dict(SETTINGS_DEFAULTS)
        for setting_row in session.exec(select(Setting)).all():
            if setting_row.value is not None:
                settings[setting_row.key] = setting_row.value

        master_url = settings.get("master_url", "")
        master_token_enc = settings.get("master_token", "")
        if not (master_url and master_token_enc):
            return PreflightResult(
                can_promote=False,
                error="NO_MASTER",
                detail="Master URL or token is not configured.",
            )

        replica = session.get(Replica, replica_id)
        if replica is None:
            return PreflightResult(
                can_promote=False, error="NOT_FOUND", detail="Replica not found."
            )
        if (not replica.enabled) or replica.suspended_at is not None:
            return PreflightResult(
                can_promote=False,
                error="REPLICA_DISABLED",
                detail="Replica must be enabled and not suspended before promotion.",
            )

        # Tally this replica's sync_map entries by status in one pass.
        status_counts: dict[str, int] = {"ok": 0, "pending": 0, "error": 0}
        for entry in session.exec(
            select(SyncMap).where(SyncMap.replica_id == replica_id)
        ).all():
            if entry.status in status_counts:
                status_counts[entry.status] += 1

        replica_url = replica.url
        replica_token_enc = replica.api_token

        master_token = decrypt(master_token_enc, config.secret_key)
        replica_token = decrypt(replica_token_enc, config.secret_key)

        # The only network I/O: fetch both document counts concurrently.
        master_count, replica_count = await asyncio.gather(
            _fetch_count(master_url, master_token),
            _fetch_count(replica_url, replica_token),
        )

        # Block only when both counts are known AND the replica is behind;
        # an unreachable counter (None) does not fail the pre-flight.
        counts_known = master_count is not None and replica_count is not None
        if counts_known and replica_count < master_count:
            gap = master_count - replica_count
            return PreflightResult(
                can_promote=False,
                error="MISSING_DOCS",
                detail=(
                    f"Replica has {replica_count} documents; master has {master_count}. "
                    f"Replica is missing {gap} document(s). "
                    "Complete the sync before promoting."
                ),
                pending_entries=status_counts["pending"],
                error_entries=status_counts["error"],
                ok_entries=status_counts["ok"],
                missing_doc_count=gap,
                replica_doc_count=replica_count,
                master_doc_count=master_count,
            )

        return PreflightResult(
            can_promote=True,
            pending_entries=status_counts["pending"],
            error_entries=status_counts["error"],
            ok_entries=status_counts["ok"],
            replica_doc_count=replica_count,
            master_doc_count=master_count,
        )
async def promote(
    replica_id: int,
    old_master_name: str,
    acknowledge_pending: bool = False,
    acknowledge_errors: bool = False,
) -> PromoteResult:
    """
    Execute promotion of replica A (``replica_id``) to master.

    Pauses the scheduler, runs a single DB transaction, resumes the scheduler
    in a finally block. The sync_map rebuild is a pure in-memory join over
    existing DB rows — no documents are re-downloaded.

    Args:
        replica_id: id of the replica to promote.
        old_master_name: name for the Replica row created for the demoted
            old master.
        acknowledge_pending: must be True when A has pending sync_map entries.
        acknowledge_errors: must be True when A has error sync_map entries.

    Raises ValueError(error_code, detail) on pre-flight violations.
    """
    # NOTE(review): dropped the unused `from ..sync.engine import _sync_lock`
    # import — the lock check happens inside preflight(), called below.
    from .. import envfile
    from .. import scheduler as _sched_module

    engine = get_engine()
    config = get_config()
    now = datetime.now(timezone.utc)

    # Re-run preflight as atomic gate (also covers the sync-lock check)
    pf = await preflight(replica_id)
    if not pf.can_promote:
        raise ValueError(pf.error or "PREFLIGHT_FAILED", pf.detail or "")
    if pf.pending_entries > 0 and not acknowledge_pending:
        raise ValueError(
            "ACK_REQUIRED",
            f"Replica has {pf.pending_entries} pending sync_map entries. "
            "Set acknowledge_pending=true to proceed.",
        )
    if pf.error_entries > 0 and not acknowledge_errors:
        raise ValueError(
            "ACK_REQUIRED",
            f"Replica has {pf.error_entries} error sync_map entries. "
            "Set acknowledge_errors=true to proceed.",
        )

    # Pause scheduler before touching the DB so no sync cycle starts mid-swap
    if _sched_module._scheduler is not None:
        _sched_module._scheduler.pause()
    try:
        with Session(engine) as session:
            # ── Read old master credentials ───────────────────────────────────
            def _get_setting(key: str) -> str:
                s = session.get(Setting, key)
                return (s.value or "") if s else ""

            old_master_url = _get_setting("master_url")
            old_master_token_enc = _get_setting("master_token")
            old_master_token_plain = decrypt(old_master_token_enc, config.secret_key)

            # ── Load Replica A; capture attrs before deletion ────────────────
            replica_a = session.get(Replica, replica_id)
            if replica_a is None:
                # Deleted between preflight and this transaction; surface it
                # as a pre-flight violation instead of an AttributeError.
                raise ValueError("NOT_FOUND", "Replica not found.")
            promoted_name = replica_a.name
            promoted_url = replica_a.url
            promoted_token_enc = replica_a.api_token
            promoted_token_plain = decrypt(promoted_token_enc, config.secret_key)

            # ── Build A's ok-entry map: old_master_id → A_doc_id ─────────────
            a_entries = session.exec(
                select(SyncMap).where(
                    SyncMap.replica_id == replica_id,
                    SyncMap.status == "ok",
                )
            ).all()
            # old_master_doc_id → A's replica_doc_id (= new master doc id)
            a_doc_id_map: dict[int, int] = {
                e.master_doc_id: e.replica_doc_id
                for e in a_entries
                if e.replica_doc_id is not None
            }

            # ── Rebuild sync_maps for all other replicas ──────────────────────
            other_replicas = session.exec(
                select(Replica).where(Replica.id != replica_id)
            ).all()
            total_mapped = 0
            total_skipped = 0
            for rep_b in other_replicas:
                b_entries = session.exec(
                    select(SyncMap).where(
                        SyncMap.replica_id == rep_b.id,
                        SyncMap.status == "ok",
                    )
                ).all()
                b_doc_id_map: dict[int, int] = {
                    e.master_doc_id: e.replica_doc_id
                    for e in b_entries
                    if e.replica_doc_id is not None
                }
                b_checksum_map: dict[int, Optional[str]] = {
                    e.master_doc_id: e.file_checksum for e in b_entries
                }
                # Delete all existing rows for B (ok + non-ok)
                for entry in session.exec(
                    select(SyncMap).where(SyncMap.replica_id == rep_b.id)
                ).all():
                    session.delete(entry)
                # Insert rebuilt rows: join A's and B's maps on the old master
                # doc id; entries B never fully synced are dropped (skipped).
                for old_master_id, a_doc_id in a_doc_id_map.items():
                    if old_master_id in b_doc_id_map:
                        session.add(SyncMap(
                            replica_id=rep_b.id,
                            master_doc_id=a_doc_id,
                            replica_doc_id=b_doc_id_map[old_master_id],
                            status="ok",
                            last_synced=now,
                            file_checksum=b_checksum_map.get(old_master_id),
                        ))
                        total_mapped += 1
                    else:
                        total_skipped += 1
                # Force full validation pass on next sync
                rep_b.last_sync_ts = None
                session.add(rep_b)

            # ── Create replica row for old master ─────────────────────────────
            new_old_master_replica = Replica(
                name=old_master_name,
                url=old_master_url,
                api_token=old_master_token_enc,
                enabled=True,
                promoted_from_master=True,
                last_sync_ts=None,
            )
            session.add(new_old_master_replica)
            session.flush()  # populate .id before inserting SyncMap rows
            for old_master_id, a_doc_id in a_doc_id_map.items():
                session.add(SyncMap(
                    replica_id=new_old_master_replica.id,
                    master_doc_id=a_doc_id,
                    replica_doc_id=old_master_id,
                    status="ok",
                    last_synced=now,
                ))

            # ── Null out FK references so replica_a can be deleted ───────────
            # sync_runs and logs keep history; just unlink the replica FK
            for run in session.exec(
                select(SyncRun).where(SyncRun.replica_id == replica_id)
            ).all():
                run.replica_id = None
                session.add(run)
            for log in session.exec(
                select(Log).where(Log.replica_id == replica_id)
            ).all():
                log.replica_id = None
                session.add(log)

            # ── Delete sync_map + replica A ───────────────────────────────────
            for entry in session.exec(
                select(SyncMap).where(SyncMap.replica_id == replica_id)
            ).all():
                session.delete(entry)
            session.delete(replica_a)

            # ── Update settings ───────────────────────────────────────────────
            def _upsert(key: str, value: str) -> None:
                s = session.get(Setting, key)
                if s is None:
                    s = Setting(key=key, value=value)
                else:
                    s.value = value
                session.add(s)

            _upsert("master_url", promoted_url)
            _upsert("master_token", promoted_token_enc)
            session.commit()
            # Read while the session is still open (attrs expire on commit).
            old_master_replica_id = new_old_master_replica.id

        # ── Update envfile (outside DB transaction — file I/O) ────────────────
        envfile.write({
            "MASTER_URL": promoted_url,
            "MASTER_TOKEN": promoted_token_plain,
        })
        url_key, token_key = envfile.replica_keys(old_master_name)
        envfile.write({url_key: old_master_url, token_key: old_master_token_plain})

        return PromoteResult(
            ok=True,
            new_master_url=promoted_url,
            new_master_name=promoted_name,
            old_master_replica_id=old_master_replica_id,
            replicas_affected=len(other_replicas),
            entries_mapped=total_mapped,
            entries_skipped=total_skipped,
        )
    # NOTE(review): removed the redundant `except ValueError: raise` and
    # `except Exception as exc: raise exc` handlers — both were no-ops over
    # the default propagation, which still runs the finally block below.
    finally:
        if _sched_module._scheduler is not None:
            _sched_module._scheduler.resume()