"""Reconcile mode: match existing replica documents to master without re-uploading.""" import asyncio from datetime import datetime, timezone from sqlmodel import Session, select from ..config import get_config from ..crypto import decrypt from ..database import get_engine from ..logger import emit_log from ..models import Replica, SyncMap from .paperless import PaperlessClient async def run_reconcile(replica_id: int) -> dict: """ Match replica documents to master by ASN / (title + created_date). Populates sync_map without uploading files. Returns {matched, unmatched, errors}. """ config = get_config() engine = get_engine() from ..models import Setting from ..scheduler import SETTINGS_DEFAULTS with Session(engine) as session: settings = {s.key: s.value for s in session.exec(select(Setting)).all()} replica_obj = session.get(Replica, replica_id) if not replica_obj: raise ValueError(f"Replica {replica_id} not found") master_url = settings.get("master_url", "") master_token_enc = settings.get("master_token", "") if not master_url or not master_token_enc: raise ValueError("Master URL or token not configured") master_token = decrypt(master_token_enc, config.secret_key) replica_token = decrypt(replica_obj.api_token, config.secret_key) max_concurrent = int(settings.get("max_concurrent_requests") or SETTINGS_DEFAULTS["max_concurrent_requests"]) master_sem = asyncio.Semaphore(max_concurrent) replica_sem = asyncio.Semaphore(max_concurrent) matched = unmatched = errors = 0 async with PaperlessClient(master_url, master_token, master_sem) as master: async with PaperlessClient(replica_obj.url, replica_token, replica_sem) as replica: # Build replica index: asn → doc, (title, date) → doc emit_log("info", "Reconcile: indexing replica documents", replica=replica_obj.name) replica_docs = await replica.get_all_documents() asn_index: dict[int, dict] = {} title_date_index: dict[tuple, dict] = {} for doc in replica_docs: asn = doc.get("archive_serial_number") if asn is not None: asn_index[int(asn)] = doc title = (doc.get("title", "") or "").strip().lower() created = str(doc.get("created") or doc.get("created_date") or "")[:10] if title: title_date_index[(title, created)] = doc # Walk master documents emit_log("info", "Reconcile: indexing master documents", replica=replica_obj.name) master_docs = await master.get_all_documents() now = datetime.now(timezone.utc) with Session(engine) as session: for mdoc in master_docs: master_id = mdoc["id"] # Skip if already in sync_map existing = session.exec( select(SyncMap).where( SyncMap.replica_id == replica_id, SyncMap.master_doc_id == master_id, ) ).first() if existing: continue # Try to match replica_match: dict | None = None masn = mdoc.get("archive_serial_number") if masn is not None and int(masn) in asn_index: replica_match = asn_index[int(masn)] else: mtitle = (mdoc.get("title", "") or "").strip().lower() mcreated = str( mdoc.get("created") or mdoc.get("created_date") or "" )[:10] if mtitle: replica_match = title_date_index.get((mtitle, mcreated)) if replica_match: try: file_bytes = await master.download_document(master_id) import hashlib checksum = hashlib.sha256(file_bytes).hexdigest() except Exception: checksum = None entry = SyncMap( replica_id=replica_id, master_doc_id=master_id, replica_doc_id=replica_match["id"], status="ok", file_checksum=checksum, last_synced=now, ) session.add(entry) matched += 1 else: unmatched += 1 try: session.commit() except Exception as e: errors += 1 emit_log( "error", f"Reconcile DB commit failed: {e}", replica=replica_obj.name, ) emit_log( "info", f"Reconcile complete: {matched} matched, {unmatched} unmatched, {errors} errors", replica=replica_obj.name, ) return {"matched": matched, "unmatched": unmatched, "errors": errors}