Implement master promotion feature
All checks were successful
Deploy / deploy (push) Successful in 33s
Allows promoting any replica to master with zero document re-downloads.
The sync_map rebuild uses existing DB data only — pure in-memory join.
Changes:
- app/sync/promote.py: preflight() checks (doc count, sync lock, ack
warnings) and promote() transaction (pause scheduler, rebuild all
sync_maps, create old-master replica, swap settings, resume scheduler)
- app/api/master.py: GET /api/master/promote/{id}/preflight (dry run)
and POST /api/master/promote/{id} (execute)
- app/models.py: add promoted_from_master bool field to Replica
- app/database.py: idempotent ALTER TABLE migration for new column
- app/main.py: register master router
- app/templates/replica_detail.html: "Promote to Master" button +
dialog with pre-flight summary, 3-card stats, ack checkboxes, spinner
- app/ui/routes.py: flash query param on dashboard route
- app/templates/dashboard.html: blue info banner for post-promotion flash
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
334
app/sync/promote.py
Normal file
334
app/sync/promote.py
Normal file
@@ -0,0 +1,334 @@
|
||||
"""Master promotion: pre-flight checks and atomic promotion transaction."""
|
||||
import asyncio
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timezone
|
||||
from typing import Optional
|
||||
|
||||
from sqlmodel import Session, select
|
||||
|
||||
from ..config import get_config
|
||||
from ..crypto import decrypt
|
||||
from ..database import get_engine
|
||||
from ..models import Log, Replica, Setting, SyncMap, SyncRun
|
||||
|
||||
|
||||
@dataclass
class PreflightResult:
    """Outcome of a promotion dry-run (see preflight()).

    When can_promote is False, ``error`` holds a machine-readable code and
    ``detail`` a human-readable explanation; the count fields may still be
    populated so callers can display sync statistics alongside the failure.
    """

    can_promote: bool
    # Machine-readable failure code (e.g. "SYNC_RUNNING", "NO_MASTER"); None on success.
    error: Optional[str] = None
    # Human-readable explanation of the failure; None on success.
    detail: Optional[str] = None
    # sync_map entry counts for the candidate replica, grouped by status.
    pending_entries: int = 0
    error_entries: int = 0
    ok_entries: int = 0
    # Documents the replica lacks relative to the master (0 when not missing any).
    missing_doc_count: int = 0
    # Live document counts fetched over the network; None when the fetch failed.
    replica_doc_count: Optional[int] = None
    master_doc_count: Optional[int] = None
|
||||
|
||||
|
||||
@dataclass
class PromoteResult:
    """Outcome of an executed promotion (see promote())."""

    ok: bool
    # URL / display name of the replica that became the new master.
    new_master_url: Optional[str] = None
    new_master_name: Optional[str] = None
    # id of the Replica row created for the demoted old master.
    old_master_replica_id: Optional[int] = None
    # Number of other replicas whose sync_maps were rebuilt.
    replicas_affected: int = 0
    # Across all other replicas: rebuilt sync_map entries kept vs. dropped.
    entries_mapped: int = 0
    entries_skipped: int = 0
    error: Optional[str] = None
|
||||
|
||||
|
||||
async def preflight(replica_id: int) -> PreflightResult:
    """
    Dry-run pre-flight checks. The only network calls are two doc-count fetches.
    Does not modify any state.
    """
    # Local imports — presumably to avoid circular imports at module load
    # time; TODO confirm against the package layout.
    from ..api.status import _fetch_count
    from ..scheduler import SETTINGS_DEFAULTS
    from ..sync.engine import _sync_lock

    engine = get_engine()
    config = get_config()

    # Refuse to run while a sync cycle holds the lock: counts and sync_map
    # rows would be moving targets.
    if _sync_lock.locked():
        return PreflightResult(
            can_promote=False,
            error="SYNC_RUNNING",
            detail="A sync cycle is currently running. Wait for it to complete.",
        )

    with Session(engine) as session:
        # Merge stored settings over the defaults; NULL values keep the default.
        rows = session.exec(select(Setting)).all()
        settings = dict(SETTINGS_DEFAULTS)
        for row in rows:
            if row.value is not None:
                settings[row.key] = row.value

        master_url = settings.get("master_url", "")
        master_token_enc = settings.get("master_token", "")
        if not master_url or not master_token_enc:
            return PreflightResult(
                can_promote=False,
                error="NO_MASTER",
                detail="Master URL or token is not configured.",
            )

        replica = session.get(Replica, replica_id)
        if not replica:
            return PreflightResult(
                can_promote=False, error="NOT_FOUND", detail="Replica not found."
            )
        if not replica.enabled or replica.suspended_at is not None:
            return PreflightResult(
                can_promote=False,
                error="REPLICA_DISABLED",
                detail="Replica must be enabled and not suspended before promotion.",
            )

        all_entries = session.exec(
            select(SyncMap).where(SyncMap.replica_id == replica_id)
        ).all()
        # Capture plain attributes so the session can close before any awaits.
        replica_url = replica.url
        replica_token_enc = replica.api_token

        ok_entries = sum(1 for e in all_entries if e.status == "ok")
        pending_entries = sum(1 for e in all_entries if e.status == "pending")
        error_entries = sum(1 for e in all_entries if e.status == "error")

    master_token = decrypt(master_token_enc, config.secret_key)
    replica_token = decrypt(replica_token_enc, config.secret_key)

    # The two doc-count fetches run concurrently.
    master_count, replica_count = await asyncio.gather(
        _fetch_count(master_url, master_token),
        _fetch_count(replica_url, replica_token),
    )

    # NOTE(review): when either fetch yields None the comparison below is
    # skipped and promotion is allowed — looks like deliberate best-effort
    # behavior on count-fetch failure; confirm.
    if master_count is not None and replica_count is not None and replica_count < master_count:
        return PreflightResult(
            can_promote=False,
            error="MISSING_DOCS",
            detail=(
                f"Replica has {replica_count} documents; master has {master_count}. "
                f"Replica is missing {master_count - replica_count} document(s). "
                "Complete the sync before promoting."
            ),
            pending_entries=pending_entries,
            error_entries=error_entries,
            ok_entries=ok_entries,
            missing_doc_count=master_count - replica_count,
            replica_doc_count=replica_count,
            master_doc_count=master_count,
        )

    return PreflightResult(
        can_promote=True,
        pending_entries=pending_entries,
        error_entries=error_entries,
        ok_entries=ok_entries,
        replica_doc_count=replica_count,
        master_doc_count=master_count,
    )
|
||||
|
||||
|
||||
async def promote(
    replica_id: int,
    old_master_name: str,
    acknowledge_pending: bool = False,
    acknowledge_errors: bool = False,
) -> PromoteResult:
    """
    Execute promotion. Pauses the scheduler, runs a single DB transaction,
    resumes the scheduler in a finally block.
    Raises ValueError(error_code, detail) on pre-flight violations.
    """
    # Local imports — presumably to avoid circular imports; TODO confirm.
    from .. import envfile
    from .. import scheduler as _sched_module
    from ..sync.engine import _sync_lock  # NOTE(review): imported but never used in this function

    engine = get_engine()
    config = get_config()
    # One timestamp reused for every rebuilt sync_map row.
    now = datetime.now(timezone.utc)

    # Re-run preflight as atomic gate
    pf = await preflight(replica_id)
    if not pf.can_promote:
        raise ValueError(pf.error or "PREFLIGHT_FAILED", pf.detail or "")

    # Pending/error entries are allowed only with explicit acknowledgement.
    if pf.pending_entries > 0 and not acknowledge_pending:
        raise ValueError(
            "ACK_REQUIRED",
            f"Replica has {pf.pending_entries} pending sync_map entries. "
            "Set acknowledge_pending=true to proceed.",
        )
    if pf.error_entries > 0 and not acknowledge_errors:
        raise ValueError(
            "ACK_REQUIRED",
            f"Replica has {pf.error_entries} error sync_map entries. "
            "Set acknowledge_errors=true to proceed.",
        )

    # Pause scheduler before touching the DB
    if _sched_module._scheduler is not None:
        _sched_module._scheduler.pause()

    try:
        with Session(engine) as session:
            # ── Read old master credentials ───────────────────────────────────
            def _get_setting(key: str) -> str:
                # Missing row and NULL value both read as "".
                s = session.get(Setting, key)
                return (s.value or "") if s else ""

            old_master_url = _get_setting("master_url")
            old_master_token_enc = _get_setting("master_token")
            old_master_token_plain = decrypt(old_master_token_enc, config.secret_key)

            # ── Load Replica A; capture attrs before deletion ────────────────
            # Preflight already guaranteed the replica exists.
            replica_a = session.get(Replica, replica_id)
            promoted_name = replica_a.name  # type: ignore[union-attr]
            promoted_url = replica_a.url  # type: ignore[union-attr]
            promoted_token_enc = replica_a.api_token  # type: ignore[union-attr]
            promoted_token_plain = decrypt(promoted_token_enc, config.secret_key)

            # ── Build A's ok-entry map: old_master_id → A_doc_id ─────────────
            a_entries = session.exec(
                select(SyncMap).where(
                    SyncMap.replica_id == replica_id,
                    SyncMap.status == "ok",
                )
            ).all()
            # old_master_doc_id → A's replica_doc_id (= new master doc id)
            a_doc_id_map: dict[int, int] = {
                e.master_doc_id: e.replica_doc_id
                for e in a_entries
                if e.replica_doc_id is not None
            }

            # ── Rebuild sync_maps for all other replicas ──────────────────────
            other_replicas = session.exec(
                select(Replica).where(Replica.id != replica_id)
            ).all()

            total_mapped = 0
            total_skipped = 0

            for rep_b in other_replicas:
                b_entries = session.exec(
                    select(SyncMap).where(
                        SyncMap.replica_id == rep_b.id,
                        SyncMap.status == "ok",
                    )
                ).all()
                # old_master_doc_id → B's replica_doc_id
                b_doc_id_map: dict[int, int] = {
                    e.master_doc_id: e.replica_doc_id
                    for e in b_entries
                    if e.replica_doc_id is not None
                }
                # old_master_doc_id → B's last known file checksum (may be None)
                b_checksum_map: dict[int, Optional[str]] = {
                    e.master_doc_id: e.file_checksum for e in b_entries
                }

                # Delete all existing rows for B (ok + non-ok)
                for entry in session.exec(
                    select(SyncMap).where(SyncMap.replica_id == rep_b.id)
                ).all():
                    session.delete(entry)

                # Insert rebuilt rows: join A's map and B's map through the
                # old master doc id — pure in-memory, no downloads.
                for old_master_id, a_doc_id in a_doc_id_map.items():
                    if old_master_id in b_doc_id_map:
                        session.add(SyncMap(
                            replica_id=rep_b.id,
                            master_doc_id=a_doc_id,
                            replica_doc_id=b_doc_id_map[old_master_id],
                            status="ok",
                            last_synced=now,
                            file_checksum=b_checksum_map.get(old_master_id),
                        ))
                        total_mapped += 1
                    else:
                        # B never had this doc under the old master; the next
                        # sync will pick it up fresh.
                        total_skipped += 1

                # Force full validation pass on next sync
                rep_b.last_sync_ts = None
                session.add(rep_b)

            # ── Create replica row for old master ─────────────────────────────
            new_old_master_replica = Replica(
                name=old_master_name,
                url=old_master_url,
                api_token=old_master_token_enc,
                enabled=True,
                promoted_from_master=True,
                last_sync_ts=None,
            )
            session.add(new_old_master_replica)
            session.flush()  # populate .id before inserting SyncMap rows

            # The old master already holds every doc A has mapped, so its
            # sync_map starts fully "ok": new-master id → old-master id.
            for old_master_id, a_doc_id in a_doc_id_map.items():
                session.add(SyncMap(
                    replica_id=new_old_master_replica.id,
                    master_doc_id=a_doc_id,
                    replica_doc_id=old_master_id,
                    status="ok",
                    last_synced=now,
                ))

            # ── Null out FK references so replica_a can be deleted ───────────
            # sync_runs and logs keep history; just unlink the replica FK
            for run in session.exec(
                select(SyncRun).where(SyncRun.replica_id == replica_id)
            ).all():
                run.replica_id = None
                session.add(run)
            for log in session.exec(
                select(Log).where(Log.replica_id == replica_id)
            ).all():
                log.replica_id = None
                session.add(log)

            # ── Delete sync_map + replica A ───────────────────────────────────
            for entry in session.exec(
                select(SyncMap).where(SyncMap.replica_id == replica_id)
            ).all():
                session.delete(entry)
            session.delete(replica_a)

            # ── Update settings ───────────────────────────────────────────────
            def _upsert(key: str, value: str) -> None:
                # Insert-or-update a Setting row by primary key.
                s = session.get(Setting, key)
                if s is None:
                    s = Setting(key=key, value=value)
                else:
                    s.value = value
                session.add(s)

            _upsert("master_url", promoted_url)
            _upsert("master_token", promoted_token_enc)

            session.commit()
            # Read .id while the session is still open (commit may expire attrs).
            old_master_replica_id = new_old_master_replica.id

        # ── Update envfile (outside DB transaction — file I/O) ────────────────
        envfile.write({
            "MASTER_URL": promoted_url,
            "MASTER_TOKEN": promoted_token_plain,
        })
        url_key, token_key = envfile.replica_keys(old_master_name)
        envfile.write({url_key: old_master_url, token_key: old_master_token_plain})

        return PromoteResult(
            ok=True,
            new_master_url=promoted_url,
            new_master_name=promoted_name,
            old_master_replica_id=old_master_replica_id,
            replicas_affected=len(other_replicas),
            entries_mapped=total_mapped,
            entries_skipped=total_skipped,
        )

    except ValueError:
        # Pre-flight / acknowledgement violations propagate unchanged.
        raise
    except Exception as exc:
        # NOTE(review): `raise exc` is equivalent to a bare `raise` here —
        # consider simplifying or wrapping in a domain error.
        raise exc
    finally:
        # Always resume the scheduler, even when promotion failed mid-way.
        if _sched_module._scheduler is not None:
            _sched_module._scheduler.resume()
|
||||
Reference in New Issue
Block a user