All checks were successful
Deploy / deploy (push) Successful in 33s
Allows promoting any replica to master with zero document re-downloads.
The sync_map rebuild uses existing DB data only — pure in-memory join.
Changes:
- app/sync/promote.py: preflight() checks (doc count, sync lock, ack
warnings) and promote() transaction (pause scheduler, rebuild all
sync_maps, create old-master replica, swap settings, resume scheduler)
- app/api/master.py: GET /api/master/promote/{id}/preflight (dry run)
and POST /api/master/promote/{id} (execute)
- app/models.py: add promoted_from_master bool field to Replica
- app/database.py: idempotent ALTER TABLE migration for new column
- app/main.py: register master router
- app/templates/replica_detail.html: "Promote to Master" button +
dialog with pre-flight summary, 3-card stats, ack checkboxes, spinner
- app/ui/routes.py: flash query param on dashboard route
- app/templates/dashboard.html: blue info banner for post-promotion flash
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
335 lines
12 KiB
Python
335 lines
12 KiB
Python
"""Master promotion: pre-flight checks and atomic promotion transaction."""
|
|
import asyncio
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone
|
|
from typing import Optional
|
|
|
|
from sqlmodel import Session, select
|
|
|
|
from ..config import get_config
|
|
from ..crypto import decrypt
|
|
from ..database import get_engine
|
|
from ..models import Log, Replica, Setting, SyncMap, SyncRun
|
|
|
|
|
|
@dataclass
class PreflightResult:
    """Outcome of the promotion pre-flight dry run.

    When ``can_promote`` is False, ``error`` holds a machine-readable code
    ("SYNC_RUNNING", "NO_MASTER", "NOT_FOUND", "REPLICA_DISABLED",
    "MISSING_DOCS") and ``detail`` a human-readable explanation for the UI.
    """

    can_promote: bool
    # Machine-readable failure code; None when can_promote is True.
    error: Optional[str] = None
    # Human-readable explanation of the failure; None when can_promote is True.
    detail: Optional[str] = None
    # SyncMap entry counts for the candidate replica, broken down by status.
    pending_entries: int = 0
    error_entries: int = 0
    ok_entries: int = 0
    # How many documents the master has that the replica lacks; only
    # populated on the MISSING_DOCS failure.
    missing_doc_count: int = 0
    # Live document counts fetched over the network; None when a fetch failed.
    replica_doc_count: Optional[int] = None
    master_doc_count: Optional[int] = None
|
|
|
|
|
|
@dataclass
class PromoteResult:
    """Outcome of an executed promotion transaction."""

    ok: bool
    # URL and display name of the replica that is now the master.
    new_master_url: Optional[str] = None
    new_master_name: Optional[str] = None
    # Id of the freshly created Replica row representing the demoted master.
    old_master_replica_id: Optional[int] = None
    # Number of other replicas whose sync_map was rebuilt.
    replicas_affected: int = 0
    # Rebuilt sync_map rows inserted across all other replicas.
    entries_mapped: int = 0
    # Entries skipped because the other replica had no "ok" mapping for
    # that old-master document.
    entries_skipped: int = 0
    error: Optional[str] = None
|
|
|
|
|
|
async def preflight(replica_id: int) -> PreflightResult:
    """
    Dry-run pre-flight checks for promoting *replica_id* to master.

    The only network calls are two doc-count fetches (master + replica).
    Does not modify any state.
    """
    # Imported lazily to avoid circular imports at module load time.
    from ..api.status import _fetch_count
    from ..scheduler import SETTINGS_DEFAULTS
    from ..sync.engine import _sync_lock

    engine = get_engine()
    config = get_config()

    # Refuse to race a sync cycle that is currently holding the lock.
    if _sync_lock.locked():
        return PreflightResult(
            can_promote=False,
            error="SYNC_RUNNING",
            detail="A sync cycle is currently running. Wait for it to complete.",
        )

    with Session(engine) as session:
        # Effective settings: stored rows override defaults; rows whose
        # value is None keep the default.
        rows = session.exec(select(Setting)).all()
        settings = dict(SETTINGS_DEFAULTS)
        for row in rows:
            if row.value is not None:
                settings[row.key] = row.value

        master_url = settings.get("master_url", "")
        master_token_enc = settings.get("master_token", "")
        if not master_url or not master_token_enc:
            return PreflightResult(
                can_promote=False,
                error="NO_MASTER",
                detail="Master URL or token is not configured.",
            )

        replica = session.get(Replica, replica_id)
        if not replica:
            return PreflightResult(
                can_promote=False, error="NOT_FOUND", detail="Replica not found."
            )
        if not replica.enabled or replica.suspended_at is not None:
            return PreflightResult(
                can_promote=False,
                error="REPLICA_DISABLED",
                detail="Replica must be enabled and not suspended before promotion.",
            )

        all_entries = session.exec(
            select(SyncMap).where(SyncMap.replica_id == replica_id)
        ).all()
        # Capture plain values now so no ORM attribute is touched after
        # the session closes below.
        replica_url = replica.url
        replica_token_enc = replica.api_token

        ok_entries = sum(1 for e in all_entries if e.status == "ok")
        pending_entries = sum(1 for e in all_entries if e.status == "pending")
        error_entries = sum(1 for e in all_entries if e.status == "error")

    # Network I/O happens outside the DB session.
    master_token = decrypt(master_token_enc, config.secret_key)
    replica_token = decrypt(replica_token_enc, config.secret_key)

    master_count, replica_count = await asyncio.gather(
        _fetch_count(master_url, master_token),
        _fetch_count(replica_url, replica_token),
    )

    # _fetch_count yields None on failure; only compare when both succeeded.
    if master_count is not None and replica_count is not None and replica_count < master_count:
        return PreflightResult(
            can_promote=False,
            error="MISSING_DOCS",
            detail=(
                f"Replica has {replica_count} documents; master has {master_count}. "
                f"Replica is missing {master_count - replica_count} document(s). "
                "Complete the sync before promoting."
            ),
            pending_entries=pending_entries,
            error_entries=error_entries,
            ok_entries=ok_entries,
            missing_doc_count=master_count - replica_count,
            replica_doc_count=replica_count,
            master_doc_count=master_count,
        )

    return PreflightResult(
        can_promote=True,
        pending_entries=pending_entries,
        error_entries=error_entries,
        ok_entries=ok_entries,
        replica_doc_count=replica_count,
        master_doc_count=master_count,
    )
|
|
|
|
|
|
async def promote(
    replica_id: int,
    old_master_name: str,
    acknowledge_pending: bool = False,
    acknowledge_errors: bool = False,
) -> PromoteResult:
    """
    Execute promotion of *replica_id* to master.

    Pauses the scheduler, runs a single DB transaction (rebuild every other
    replica's sync_map, create a Replica row for the demoted master, swap
    the master settings), updates the envfile, and resumes the scheduler in
    a finally block.

    Args:
        replica_id: Replica to promote.
        old_master_name: Display name for the Replica row created for the
            demoted master.
        acknowledge_pending: Must be True when the replica has pending
            sync_map entries.
        acknowledge_errors: Must be True when the replica has error
            sync_map entries.

    Raises:
        ValueError(error_code, detail) on pre-flight violations.
    """
    # Imported lazily to avoid circular imports at module load time.
    from .. import envfile
    from .. import scheduler as _sched_module

    engine = get_engine()
    config = get_config()
    now = datetime.now(timezone.utc)

    # Re-run preflight as atomic gate
    pf = await preflight(replica_id)
    if not pf.can_promote:
        raise ValueError(pf.error or "PREFLIGHT_FAILED", pf.detail or "")

    if pf.pending_entries > 0 and not acknowledge_pending:
        raise ValueError(
            "ACK_REQUIRED",
            f"Replica has {pf.pending_entries} pending sync_map entries. "
            "Set acknowledge_pending=true to proceed.",
        )
    if pf.error_entries > 0 and not acknowledge_errors:
        raise ValueError(
            "ACK_REQUIRED",
            f"Replica has {pf.error_entries} error sync_map entries. "
            "Set acknowledge_errors=true to proceed.",
        )

    # Pause scheduler before touching the DB
    if _sched_module._scheduler is not None:
        _sched_module._scheduler.pause()

    # Only `finally` matters here: exceptions propagate to the caller
    # unchanged, but the scheduler must always be resumed.
    try:
        with Session(engine) as session:
            # ── Read old master credentials ───────────────────────────────────
            def _get_setting(key: str) -> str:
                s = session.get(Setting, key)
                return (s.value or "") if s else ""

            old_master_url = _get_setting("master_url")
            old_master_token_enc = _get_setting("master_token")
            old_master_token_plain = decrypt(old_master_token_enc, config.secret_key)

            # ── Load Replica A; capture attrs before deletion ────────────────
            replica_a = session.get(Replica, replica_id)
            promoted_name = replica_a.name  # type: ignore[union-attr]
            promoted_url = replica_a.url  # type: ignore[union-attr]
            promoted_token_enc = replica_a.api_token  # type: ignore[union-attr]
            promoted_token_plain = decrypt(promoted_token_enc, config.secret_key)

            # ── Build A's ok-entry map: old_master_id → A_doc_id ─────────────
            a_entries = session.exec(
                select(SyncMap).where(
                    SyncMap.replica_id == replica_id,
                    SyncMap.status == "ok",
                )
            ).all()
            # old_master_doc_id → A's replica_doc_id (= new master doc id)
            a_doc_id_map: dict[int, int] = {
                e.master_doc_id: e.replica_doc_id
                for e in a_entries
                if e.replica_doc_id is not None
            }

            # ── Rebuild sync_maps for all other replicas ──────────────────────
            other_replicas = session.exec(
                select(Replica).where(Replica.id != replica_id)
            ).all()

            total_mapped = 0
            total_skipped = 0

            for rep_b in other_replicas:
                b_entries = session.exec(
                    select(SyncMap).where(
                        SyncMap.replica_id == rep_b.id,
                        SyncMap.status == "ok",
                    )
                ).all()
                b_doc_id_map: dict[int, int] = {
                    e.master_doc_id: e.replica_doc_id
                    for e in b_entries
                    if e.replica_doc_id is not None
                }
                b_checksum_map: dict[int, Optional[str]] = {
                    e.master_doc_id: e.file_checksum for e in b_entries
                }

                # Delete all existing rows for B (ok + non-ok)
                for entry in session.exec(
                    select(SyncMap).where(SyncMap.replica_id == rep_b.id)
                ).all():
                    session.delete(entry)

                # Insert rebuilt rows: join A's and B's maps on the old
                # master doc id; A's doc id becomes the new master doc id.
                for old_master_id, a_doc_id in a_doc_id_map.items():
                    if old_master_id in b_doc_id_map:
                        session.add(SyncMap(
                            replica_id=rep_b.id,
                            master_doc_id=a_doc_id,
                            replica_doc_id=b_doc_id_map[old_master_id],
                            status="ok",
                            last_synced=now,
                            file_checksum=b_checksum_map.get(old_master_id),
                        ))
                        total_mapped += 1
                    else:
                        total_skipped += 1

                # Force full validation pass on next sync
                rep_b.last_sync_ts = None
                session.add(rep_b)

            # ── Create replica row for old master ─────────────────────────────
            new_old_master_replica = Replica(
                name=old_master_name,
                url=old_master_url,
                api_token=old_master_token_enc,
                enabled=True,
                promoted_from_master=True,
                last_sync_ts=None,
            )
            session.add(new_old_master_replica)
            session.flush()  # populate .id before inserting SyncMap rows

            # The old master already holds every document A has mapped;
            # its rows invert A's map (replica_doc_id = old master doc id).
            for old_master_id, a_doc_id in a_doc_id_map.items():
                session.add(SyncMap(
                    replica_id=new_old_master_replica.id,
                    master_doc_id=a_doc_id,
                    replica_doc_id=old_master_id,
                    status="ok",
                    last_synced=now,
                ))

            # ── Null out FK references so replica_a can be deleted ───────────
            # sync_runs and logs keep history; just unlink the replica FK
            for run in session.exec(
                select(SyncRun).where(SyncRun.replica_id == replica_id)
            ).all():
                run.replica_id = None
                session.add(run)
            for log in session.exec(
                select(Log).where(Log.replica_id == replica_id)
            ).all():
                log.replica_id = None
                session.add(log)

            # ── Delete sync_map + replica A ───────────────────────────────────
            for entry in session.exec(
                select(SyncMap).where(SyncMap.replica_id == replica_id)
            ).all():
                session.delete(entry)
            session.delete(replica_a)

            # ── Update settings ───────────────────────────────────────────────
            def _upsert(key: str, value: str) -> None:
                s = session.get(Setting, key)
                if s is None:
                    s = Setting(key=key, value=value)
                else:
                    s.value = value
                session.add(s)

            _upsert("master_url", promoted_url)
            _upsert("master_token", promoted_token_enc)

            session.commit()
            # Read after commit while the session is still open so the
            # expired attribute can be refreshed.
            old_master_replica_id = new_old_master_replica.id

        # ── Update envfile (outside DB transaction — file I/O) ────────────────
        envfile.write({
            "MASTER_URL": promoted_url,
            "MASTER_TOKEN": promoted_token_plain,
        })
        url_key, token_key = envfile.replica_keys(old_master_name)
        envfile.write({url_key: old_master_url, token_key: old_master_token_plain})

        return PromoteResult(
            ok=True,
            new_master_url=promoted_url,
            new_master_name=promoted_name,
            old_master_replica_id=old_master_replica_id,
            replicas_affected=len(other_replicas),
            entries_mapped=total_mapped,
            entries_skipped=total_skipped,
        )

    finally:
        # Always resume, even when the transaction above raised.
        if _sched_module._scheduler is not None:
            _sched_module._scheduler.resume()
|