All checks were successful
Deploy / deploy (push) Successful in 14s
Needed to safely recover missing docs: run reconcile first (populates sync_map for existing docs), then reset-ts, then sync (only missing docs get uploaded, existing ones get metadata patch only). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
212 lines
6.7 KiB
Python
212 lines
6.7 KiB
Python
import asyncio
|
|
from typing import Optional
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException
|
|
from pydantic import BaseModel
|
|
from sqlmodel import Session, select
|
|
|
|
from ..config import get_config
|
|
from ..crypto import decrypt, encrypt
|
|
from ..database import get_session
|
|
from ..models import Replica, SyncMap
|
|
from ..sync.paperless import PaperlessClient
|
|
|
|
# All replica-management endpoints in this module hang off /api/replicas.
router = APIRouter(prefix="/api/replicas", tags=["replicas"])
|
|
|
|
|
|
class ReplicaCreate(BaseModel):
    """Request body for registering a new replica Paperless instance."""

    name: str  # human-readable label; also used to derive env-file keys (see create_replica)
    url: str  # base URL of the replica's Paperless API
    api_token: str  # plaintext API token; encrypted before it is persisted
    enabled: bool = True  # whether this replica participates in syncing
    sync_interval_seconds: Optional[int] = None  # None presumably falls back to a global default — confirm in scheduler
|
|
|
|
|
|
class ReplicaUpdate(BaseModel):
    """Partial-update body for PUT /api/replicas/{id}; None means "leave unchanged"."""

    name: Optional[str] = None
    url: Optional[str] = None
    api_token: Optional[str] = None  # plaintext; re-encrypted on save
    enabled: Optional[bool] = None
    sync_interval_seconds: Optional[int] = None
|
|
|
|
|
|
def _serialize(r: Replica) -> dict:
    """Build a JSON-safe view of a replica row.

    The (encrypted) API token is deliberately omitted; datetimes become
    ISO-8601 strings, absent timestamps stay None.
    """

    def _iso(value):
        return value.isoformat() if value else None

    return {
        "id": r.id,
        "name": r.name,
        "url": r.url,
        "enabled": r.enabled,
        "sync_interval_seconds": r.sync_interval_seconds,
        "last_sync_ts": _iso(r.last_sync_ts),
        "consecutive_failures": r.consecutive_failures,
        "suspended_at": _iso(r.suspended_at),
        "created_at": _iso(r.created_at),
    }
|
|
|
|
|
|
async def _test_conn(url: str, token: str) -> dict:
    """Open a short-lived Paperless client and run its connection self-test.

    Returns the dict produced by ``PaperlessClient.test_connection()``
    (keys used elsewhere in this module: ``ok``, ``error``, ``doc_count``).
    """
    # The client API takes a semaphore; one slot suffices for a one-off probe.
    limiter = asyncio.Semaphore(1)
    async with PaperlessClient(url, token, limiter) as probe:
        return await probe.test_connection()
|
|
|
|
|
|
@router.get("")
def list_replicas(session: Session = Depends(get_session)):
    """GET /api/replicas — every configured replica, tokens never included."""
    rows = session.exec(select(Replica)).all()
    return list(map(_serialize, rows))
|
|
|
|
|
|
@router.post("", status_code=201)
async def create_replica(
    body: ReplicaCreate, session: Session = Depends(get_session)
):
    """POST /api/replicas — validate connectivity, persist, mirror to env file.

    Raises 422 if the target Paperless instance is unreachable. The response
    is the serialized replica plus the probe's ``doc_count``.
    """
    # Refuse to register a replica we cannot actually reach.
    probe = await _test_conn(body.url, body.api_token)
    if not probe["ok"]:
        raise HTTPException(422, detail=f"Connection test failed: {probe['error']}")

    cfg = get_config()
    replica = Replica(
        name=body.name,
        url=body.url,
        # Only the encrypted form ever touches the database.
        api_token=encrypt(body.api_token, cfg.secret_key),
        enabled=body.enabled,
        sync_interval_seconds=body.sync_interval_seconds,
    )
    session.add(replica)
    session.commit()
    session.refresh(replica)

    # Mirror url/token into the env file (the plaintext token lives only there).
    from .. import envfile

    url_key, token_key = envfile.replica_keys(replica.name)
    envfile.write({url_key: replica.url, token_key: body.api_token})

    payload = _serialize(replica)
    payload["doc_count"] = probe["doc_count"]
    return payload
|
|
|
|
|
|
@router.put("/{replica_id}")
async def update_replica(
    replica_id: int,
    body: ReplicaUpdate,
    session: Session = Depends(get_session),
):
    """Partially update a replica; fields left as None are untouched.

    When the url or token changes, the effective (new-or-current) pair is
    connection-tested first; failure raises 422 and nothing is saved.
    A successful update also rewrites the replica's env-file entries.
    """
    replica = session.get(Replica, replica_id)
    if not replica:
        raise HTTPException(404)

    config = get_config()
    url_changed = body.url is not None and body.url != replica.url
    token_changed = body.api_token is not None

    if url_changed or token_changed:
        # Validate the combination that would be in effect after the update.
        new_url = body.url or replica.url
        new_token = body.api_token or decrypt(replica.api_token, config.secret_key)
        result = await _test_conn(new_url, new_token)
        if not result["ok"]:
            raise HTTPException(422, detail=f"Connection test failed: {result['error']}")

    if body.name is not None:
        replica.name = body.name
    if body.url is not None:
        replica.url = body.url
    if body.api_token is not None:
        # Stored encrypted; the plaintext goes only to the env file below.
        replica.api_token = encrypt(body.api_token, config.secret_key)
    if body.enabled is not None:
        replica.enabled = body.enabled
    if body.sync_interval_seconds is not None:
        replica.sync_interval_seconds = body.sync_interval_seconds

    session.add(replica)
    session.commit()
    session.refresh(replica)

    # Mirror the current url (and token, when supplied) into the env file,
    # keyed by the possibly-renamed replica name.
    # NOTE(review): if the name changed, entries under the old name's keys are
    # left behind in the env file — confirm whether stale keys need cleanup.
    from .. import envfile

    url_key, token_key = envfile.replica_keys(replica.name)
    env_write: dict[str, str] = {url_key: replica.url}
    if body.api_token:
        env_write[token_key] = body.api_token
    envfile.write(env_write)

    return _serialize(replica)
|
|
|
|
|
|
@router.delete("/{replica_id}", status_code=204)
def delete_replica(replica_id: int, session: Session = Depends(get_session)):
    """Remove a replica along with its per-document sync bookkeeping."""
    replica = session.get(Replica, replica_id)
    if not replica:
        raise HTTPException(404)
    # Delete sync_map rows ourselves rather than relying on SQLite FK cascade.
    stale = session.exec(select(SyncMap).where(SyncMap.replica_id == replica_id)).all()
    for row in stale:
        session.delete(row)
    session.delete(replica)
    session.commit()
|
|
|
|
|
|
@router.post("/{replica_id}/test")
async def test_replica(replica_id: int, session: Session = Depends(get_session)):
    """Run a live connection check using a stored replica's credentials."""
    replica = session.get(Replica, replica_id)
    if not replica:
        raise HTTPException(404)
    # Token is stored encrypted; decrypt just for the probe.
    plaintext = decrypt(replica.api_token, get_config().secret_key)
    return await _test_conn(replica.url, plaintext)
|
|
|
|
|
|
@router.post("/{replica_id}/reconcile")
async def reconcile_replica(replica_id: int, session: Session = Depends(get_session)):
    """Run reconciliation for one replica (404 if the id is unknown)."""
    if session.get(Replica, replica_id) is None:
        raise HTTPException(404)

    # Imported lazily, mirroring the module's pattern for sync-package imports.
    from ..sync.reconcile import run_reconcile

    return await run_reconcile(replica_id)
|
|
|
|
|
|
@router.post("/{replica_id}/unsuspend")
def unsuspend_replica(replica_id: int, session: Session = Depends(get_session)):
    """Clear the suspension timestamp and failure counter for a replica."""
    replica = session.get(Replica, replica_id)
    if not replica:
        raise HTTPException(404)
    replica.consecutive_failures = 0
    replica.suspended_at = None
    session.add(replica)
    session.commit()
    return _serialize(replica)
|
|
|
|
|
|
@router.post("/{replica_id}/reset-ts")
def reset_replica_sync_ts(replica_id: int, session: Session = Depends(get_session)):
    """Reset last_sync_ts to None so the next sync fetches all master docs. Does NOT touch sync_map."""
    replica = session.get(Replica, replica_id)
    if not replica:
        raise HTTPException(404)

    replica.last_sync_ts = None  # forget the incremental cursor only
    session.add(replica)
    session.commit()

    return {"ok": True, "replica_id": replica_id}
|
|
|
|
|
|
@router.post("/{replica_id}/resync")
async def resync_replica(replica_id: int, session: Session = Depends(get_session)):
    """Wipe sync_map, reset last_sync_ts, and trigger a full resync."""
    replica = session.get(Replica, replica_id)
    if not replica:
        raise HTTPException(404)

    # Forget the incremental cursor so every master document is considered.
    replica.last_sync_ts = None
    session.add(replica)

    # Drop all per-document mappings for this replica.
    mappings = session.exec(
        select(SyncMap).where(SyncMap.replica_id == replica_id)
    ).all()
    for mapping in mappings:
        session.delete(mapping)
    session.commit()

    # Imported lazily, mirroring the module's pattern for sync-package imports.
    from ..sync.engine import run_sync_cycle

    started = await run_sync_cycle(triggered_by="manual", replica_id=replica_id)
    return {"started": started}
|