diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..cb46fbf --- /dev/null +++ b/.dockerignore @@ -0,0 +1,12 @@ +.git +.claude +.env +data/ +__pycache__ +*.pyc +*.pyo +.pytest_cache +.mypy_cache +*.egg-info +dist/ +build/ diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..f68f5d6 --- /dev/null +++ b/.env.example @@ -0,0 +1,10 @@ +# Required: Fernet key for encrypting API tokens at rest +# Generate with: python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())" +SECRET_KEY= + +# SQLite database path +DATABASE_URL=sqlite:////data/db.sqlite3 + +# Optional: seed settings on first boot +MASTER_URL=http://100.x.x.x:8000 +MASTER_TOKEN=your-paperless-api-token diff --git a/.gitea/workflows/deploy.yml b/.gitea/workflows/deploy.yml new file mode 100644 index 0000000..9134c01 --- /dev/null +++ b/.gitea/workflows/deploy.yml @@ -0,0 +1,62 @@ +name: Deploy + +on: + push: + branches: + - main + workflow_dispatch: + inputs: + reason: + description: "Reason for manual deploy" + required: false + default: "manual" + +env: + COMPOSE_PROJECT: pngx-controller + COMPOSE_FILE: docker-compose.yml + +jobs: + deploy: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + with: + token: ${{ secrets.GITEA_TOKEN }} + + - name: Bump patch version + if: github.event_name != 'workflow_dispatch' + run: | + VERSION=$(cat VERSION) + MAJOR=$(echo $VERSION | cut -d. -f1) + MINOR=$(echo $VERSION | cut -d. -f2) + PATCH=$(echo $VERSION | cut -d. 
-f3) + NEW_VERSION="$MAJOR.$MINOR.$((PATCH + 1))" + echo $NEW_VERSION > VERSION + echo "APP_VERSION=$NEW_VERSION" >> $GITHUB_ENV + git config user.email "ci@domverse-berlin.eu" + git config user.name "CI" + git add VERSION + git commit -m "chore: bump version to $NEW_VERSION [skip ci]" + git push + + - name: Write .env + run: | + cat > .env << EOF + SECRET_KEY=${{ secrets.PNGX_SECRET_KEY }} + MASTER_URL=${{ secrets.PNGX_MASTER_URL }} + MASTER_TOKEN=${{ secrets.PNGX_MASTER_TOKEN }} + EOF + + - name: Deploy with docker compose + run: | + APP_VERSION=${APP_VERSION:-$(cat VERSION)} + echo "=== Deploying $APP_VERSION (commit ${{ gitea.sha }}) to ${{ gitea.ref_name }} ===" + docker compose -f "$COMPOSE_FILE" -p "$COMPOSE_PROJECT" build --build-arg APP_VERSION=$APP_VERSION + docker compose -f "$COMPOSE_FILE" -p "$COMPOSE_PROJECT" up -d --remove-orphans + + - name: Prune dangling images + run: docker image prune -f + + - name: Show running containers + run: docker compose -f "$COMPOSE_FILE" -p "$COMPOSE_PROJECT" ps diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b518007 --- /dev/null +++ b/.gitignore @@ -0,0 +1,10 @@ +__pycache__/ +*.pyc +*.pyo +.env +data/ +*.egg-info/ +dist/ +build/ +.mypy_cache/ +.pytest_cache/ diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..ac56f1e --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,72 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +**pngx-controller** is a Paperless-ngx Central Sync Controller: a single-container FastAPI service that reads from a master paperless-ngx instance and syncs documents/metadata to one or more replicas using only the public paperless REST API. Master always wins; replicas are read-only by convention. + +The full specification is in `pngx-controller-prd.md`. 
+ +## Tech Stack + +| Layer | Choice | +|---|---| +| Backend | Python / FastAPI | +| Scheduler | APScheduler (runs inside FastAPI event loop) | +| Frontend | Jinja2 templates + HTMX + Pico CSS (no JS build step) | +| Database | SQLite via SQLModel | +| Auth | Authentik forward auth via `X-authentik-*` headers (no app-level auth code) | +| Transport | Tailscale IPs (bypasses Traefik/public internet) | + +## Architecture + +Single process, single container. APScheduler runs the sync job inside the FastAPI event loop. An `asyncio.Lock` prevents concurrent sync runs. + +``` +FastAPI app +├── Web UI (Jinja2 + HTMX) — /, /replicas, /logs, /settings +├── REST API — /api/* +└── APScheduler — sync job every N minutes (default: 15) + +SQLite (bind-mounted at /data/db.sqlite3) +├── replicas — configured instances (URL + encrypted API token) +├── sync_map — master_doc_id ↔ replica_doc_id mapping per replica +├── sync_runs — audit log of sync cycles +├── logs — per-document sync events +└── settings — master_url, master_token, sync_interval_seconds, log_retention_days +``` + +API tokens are encrypted at rest using **Fernet** with `SECRET_KEY` from the environment. + +## Sync Engine Logic + +1. Acquire `asyncio.Lock` (skip if already running) +2. For each enabled replica: + - `ensure_schema_parity`: sync tags, correspondents, document types, custom fields **by name** (IDs differ per instance; name→id mapping is built and used locally) + - Fetch master docs modified since last sync (`?modified__gte=...`) + - For each doc: download file + metadata from master, then create (`POST /api/documents/post_document/`) or update (`PUT /api/documents/{replica_id}/`) on replica + - Skip file re-upload if SHA256 checksum matches `sync_map.file_checksum` +3. Advance `last_sync_ts`, close `sync_run` record, release lock + +Schema parity must be established **before** document sync so custom fields exist on the replica. 
+ +## Key Design Constraints + +- **No consume directory** — sync via REST API only; consume causes re-OCR, ID collisions, and breaks on live instances +- **No changes to paperless containers** — controller is fully external +- **SPOF accepted** — if controller is down, paperless instances run normally; sync resumes on recovery +- Live log tail uses **SSE** at `/api/logs/stream`; dashboard sync progress uses HTMX polling `/api/sync/running` + +## Environment Variables + +| Variable | Purpose | +|---|---| +| `SECRET_KEY` | Fernet key for encrypting API tokens at rest | +| `DATABASE_URL` | SQLite path, e.g. `sqlite:////data/db.sqlite3` | + +## Implementation Phases (from PRD) + +- **Phase 1 (MVP):** SQLModel schema, settings/replica CRUD, sync engine, APScheduler, basic dashboard, log table +- **Phase 2:** SSE log stream, sync progress indicator, manual trigger with live feedback +- **Phase 3:** Full resync per replica, deletion propagation (tombstone table), file checksum skip, alert webhooks diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..2b28033 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1 @@ +@AGENTS.MD \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..15985d0 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,28 @@ +FROM python:3.12-slim + +WORKDIR /app + +# Install uv +COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv + +# Install dependencies first (cached layer — only invalidates when pyproject.toml changes) +COPY pyproject.toml . +RUN uv pip install --system --no-cache fastapi uvicorn[standard] sqlmodel apscheduler \ + cryptography httpx jinja2 python-multipart prometheus-client aiofiles + +# Copy application code +COPY app/ ./app/ + +# Install the package itself (editable-free, no pip install of the whole project) +RUN uv pip install --system --no-cache --no-deps . 
+ +ARG APP_VERSION=dev +LABEL org.opencontainers.image.version="${APP_VERSION}" + +VOLUME ["/data"] +EXPOSE 8000 + +HEALTHCHECK --interval=30s --timeout=5s --start-period=15s --retries=3 \ + CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/healthz')" || exit 1 + +CMD ["python", "-m", "app.main"] diff --git a/VERSION b/VERSION new file mode 100644 index 0000000..6e8bf73 --- /dev/null +++ b/VERSION @@ -0,0 +1 @@ +0.1.0 diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/api/__init__.py b/app/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/api/logs.py b/app/api/logs.py new file mode 100644 index 0000000..2ac71e5 --- /dev/null +++ b/app/api/logs.py @@ -0,0 +1,109 @@ +import asyncio +import json +from typing import Optional + +from fastapi import APIRouter, Depends, Query +from fastapi.responses import StreamingResponse +from sqlalchemy import text +from sqlmodel import Session, select + +from ..database import get_session +from ..logger import subscribe_sse, unsubscribe_sse +from ..models import Log + +router = APIRouter(prefix="/api/logs", tags=["logs"]) + + +@router.get("") +def list_logs( + replica_id: Optional[int] = Query(None), + level: Optional[str] = Query(None), + from_dt: Optional[str] = Query(None, alias="from"), + to_dt: Optional[str] = Query(None, alias="to"), + q: Optional[str] = Query(None), + page: int = Query(1, ge=1), + page_size: int = Query(50, ge=1, le=200), + session: Session = Depends(get_session), +): + if q: + # FTS5 search — use SQLAlchemy execute() for raw SQL + fts_sql = text( + "SELECT l.id, l.run_id, l.replica_id, l.level, l.message, l.doc_id, l.created_at " + "FROM logs l JOIN logs_fts f ON l.id = f.rowid " + "WHERE logs_fts MATCH :q ORDER BY l.created_at DESC LIMIT :lim OFFSET :off" + ) + offset = (page - 1) * page_size + rows = session.execute(fts_sql, {"q": q, "lim": page_size, "off": offset}).all() + return 
[dict(r._mapping) for r in rows] + + stmt = select(Log) + if replica_id is not None: + stmt = stmt.where(Log.replica_id == replica_id) + if level: + stmt = stmt.where(Log.level == level) + if from_dt: + stmt = stmt.where(Log.created_at >= from_dt) + if to_dt: + stmt = stmt.where(Log.created_at <= to_dt) + stmt = stmt.order_by(Log.created_at.desc()) # type: ignore[attr-defined] + stmt = stmt.offset((page - 1) * page_size).limit(page_size) + + logs = session.exec(stmt).all() + return [ + { + "id": l.id, + "run_id": l.run_id, + "replica_id": l.replica_id, + "level": l.level, + "message": l.message, + "doc_id": l.doc_id, + "created_at": l.created_at.isoformat() if l.created_at else None, + } + for l in logs + ] + + +@router.delete("") +def clear_logs( + older_than_days: int = Query(90, ge=1), + session: Session = Depends(get_session), +): + from datetime import datetime, timedelta, timezone + + cutoff = datetime.now(timezone.utc) - timedelta(days=older_than_days) + old = session.exec(select(Log).where(Log.created_at < cutoff)).all() + count = len(old) + for log in old: + session.delete(log) + session.commit() + return {"deleted": count} + + +@router.get("/stream") +async def log_stream(): + """SSE endpoint for live log tail.""" + + async def generator(): + q = subscribe_sse() + try: + yield "retry: 3000\n\n" + while True: + try: + data = await asyncio.wait_for(q.get(), timeout=30.0) + yield f"data: {data}\n\n" + except asyncio.TimeoutError: + # Send keepalive comment + yield ": keepalive\n\n" + except asyncio.CancelledError: + pass + finally: + unsubscribe_sse(q) + + return StreamingResponse( + generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "X-Accel-Buffering": "no", + }, + ) diff --git a/app/api/replicas.py b/app/api/replicas.py new file mode 100644 index 0000000..ce8aa5f --- /dev/null +++ b/app/api/replicas.py @@ -0,0 +1,196 @@ +import asyncio +from typing import Optional + +from fastapi import APIRouter, Depends, 
HTTPException +from pydantic import BaseModel +from sqlmodel import Session, select + +from ..config import get_config +from ..crypto import decrypt, encrypt +from ..database import get_session +from ..models import Replica, SyncMap +from ..sync.paperless import PaperlessClient + +router = APIRouter(prefix="/api/replicas", tags=["replicas"]) + + +class ReplicaCreate(BaseModel): + name: str + url: str + api_token: str + enabled: bool = True + sync_interval_seconds: Optional[int] = None + + +class ReplicaUpdate(BaseModel): + name: Optional[str] = None + url: Optional[str] = None + api_token: Optional[str] = None + enabled: Optional[bool] = None + sync_interval_seconds: Optional[int] = None + + +def _serialize(r: Replica) -> dict: + return { + "id": r.id, + "name": r.name, + "url": r.url, + "enabled": r.enabled, + "sync_interval_seconds": r.sync_interval_seconds, + "last_sync_ts": r.last_sync_ts.isoformat() if r.last_sync_ts else None, + "consecutive_failures": r.consecutive_failures, + "suspended_at": r.suspended_at.isoformat() if r.suspended_at else None, + "created_at": r.created_at.isoformat() if r.created_at else None, + } + + +async def _test_conn(url: str, token: str) -> dict: + sem = asyncio.Semaphore(1) + async with PaperlessClient(url, token, sem) as client: + return await client.test_connection() + + +@router.get("") +def list_replicas(session: Session = Depends(get_session)): + return [_serialize(r) for r in session.exec(select(Replica)).all()] + + +@router.post("", status_code=201) +async def create_replica( + body: ReplicaCreate, session: Session = Depends(get_session) +): + result = await _test_conn(body.url, body.api_token) + if not result["ok"]: + raise HTTPException(422, detail=f"Connection test failed: {result['error']}") + + config = get_config() + encrypted_token = encrypt(body.api_token, config.secret_key) + replica = Replica( + name=body.name, + url=body.url, + api_token=encrypted_token, + enabled=body.enabled, + 
sync_interval_seconds=body.sync_interval_seconds, + ) + session.add(replica) + session.commit() + session.refresh(replica) + + from .. import envfile + url_key, token_key = envfile.replica_keys(replica.name) + envfile.write({url_key: replica.url, token_key: body.api_token}) + + response = _serialize(replica) + response["doc_count"] = result["doc_count"] + return response + + +@router.put("/{replica_id}") +async def update_replica( + replica_id: int, + body: ReplicaUpdate, + session: Session = Depends(get_session), +): + replica = session.get(Replica, replica_id) + if not replica: + raise HTTPException(404) + + config = get_config() + url_changed = body.url is not None and body.url != replica.url + token_changed = body.api_token is not None + + if url_changed or token_changed: + new_url = body.url or replica.url + new_token = body.api_token or decrypt(replica.api_token, config.secret_key) + result = await _test_conn(new_url, new_token) + if not result["ok"]: + raise HTTPException(422, detail=f"Connection test failed: {result['error']}") + + if body.name is not None: + replica.name = body.name + if body.url is not None: + replica.url = body.url + if body.api_token is not None: + replica.api_token = encrypt(body.api_token, config.secret_key) + if body.enabled is not None: + replica.enabled = body.enabled + if body.sync_interval_seconds is not None: + replica.sync_interval_seconds = body.sync_interval_seconds + + session.add(replica) + session.commit() + session.refresh(replica) + + from .. 
import envfile + url_key, token_key = envfile.replica_keys(replica.name) + env_write: dict[str, str] = {url_key: replica.url} + if body.api_token: + env_write[token_key] = body.api_token + envfile.write(env_write) + + return _serialize(replica) + + +@router.delete("/{replica_id}", status_code=204) +def delete_replica(replica_id: int, session: Session = Depends(get_session)): + replica = session.get(Replica, replica_id) + if not replica: + raise HTTPException(404) + # Explicitly delete sync_map rows before the replica (SQLite FK cascade) + for entry in session.exec(select(SyncMap).where(SyncMap.replica_id == replica_id)).all(): + session.delete(entry) + session.delete(replica) + session.commit() + + +@router.post("/{replica_id}/test") +async def test_replica(replica_id: int, session: Session = Depends(get_session)): + replica = session.get(Replica, replica_id) + if not replica: + raise HTTPException(404) + config = get_config() + token = decrypt(replica.api_token, config.secret_key) + return await _test_conn(replica.url, token) + + +@router.post("/{replica_id}/reconcile") +async def reconcile_replica(replica_id: int, session: Session = Depends(get_session)): + replica = session.get(Replica, replica_id) + if not replica: + raise HTTPException(404) + from ..sync.reconcile import run_reconcile + + result = await run_reconcile(replica_id) + return result + + +@router.post("/{replica_id}/unsuspend") +def unsuspend_replica(replica_id: int, session: Session = Depends(get_session)): + replica = session.get(Replica, replica_id) + if not replica: + raise HTTPException(404) + replica.suspended_at = None + replica.consecutive_failures = 0 + session.add(replica) + session.commit() + return _serialize(replica) + + +@router.post("/{replica_id}/resync") +async def resync_replica(replica_id: int, session: Session = Depends(get_session)): + """Phase 3: wipe sync_map and trigger full resync.""" + replica = session.get(Replica, replica_id) + if not replica: + raise HTTPException(404) + 
# Delete all sync_map entries for this replica + entries = session.exec( + select(SyncMap).where(SyncMap.replica_id == replica_id) + ).all() + for e in entries: + session.delete(e) + session.commit() + # Trigger sync + from ..sync.engine import run_sync_cycle + + started = await run_sync_cycle(triggered_by="manual", replica_id=replica_id) + return {"started": started} diff --git a/app/api/settings.py b/app/api/settings.py new file mode 100644 index 0000000..b5c2808 --- /dev/null +++ b/app/api/settings.py @@ -0,0 +1,180 @@ +import asyncio + +from fastapi import APIRouter, Depends, HTTPException +from pydantic import BaseModel +from sqlmodel import Session, select + +from ..config import get_config +from ..crypto import decrypt, encrypt +from ..database import get_session +from ..models import Setting +from ..scheduler import SETTINGS_DEFAULTS + +router = APIRouter(prefix="/api/settings", tags=["settings"]) + +ENCRYPTED_KEYS = {"master_token", "alert_target_token"} + + +def _get_all_settings(session: Session) -> dict: + rows = session.exec(select(Setting)).all() + result = dict(SETTINGS_DEFAULTS) + for row in rows: + if row.value is not None: + result[row.key] = row.value + return result + + +def _safe_settings(settings: dict) -> dict: + """Return settings with encrypted values masked.""" + out = dict(settings) + for k in ENCRYPTED_KEYS: + if out.get(k): + out[k] = "••••••••" + return out + + +@router.get("") +def get_settings(session: Session = Depends(get_session)): + return _safe_settings(_get_all_settings(session)) + + +class SettingsUpdate(BaseModel): + master_url: str | None = None + master_token: str | None = None + sync_interval_seconds: int | None = None + log_retention_days: int | None = None + sync_cycle_timeout_seconds: int | None = None + task_poll_timeout_seconds: int | None = None + replica_suspend_threshold: int | None = None + max_concurrent_requests: int | None = None + alert_target_type: str | None = None + alert_target_url: str | None = None + 
alert_target_token: str | None = None + alert_error_threshold: int | None = None + alert_cooldown_seconds: int | None = None + + +@router.put("") +async def update_settings( + body: SettingsUpdate, + session: Session = Depends(get_session), +): + config = get_config() + updates = body.model_dump(exclude_none=True) + + # Validate master connection if URL or token changed + current = _get_all_settings(session) + if "master_url" in updates or "master_token" in updates: + new_url = updates.get("master_url") or current.get("master_url", "") + new_token = updates.get("master_token") + if not new_token: + enc = current.get("master_token", "") + new_token = decrypt(enc, config.secret_key) if enc else "" + if new_url and new_token: + import httpx as _httpx + + try: + async with _httpx.AsyncClient( + headers={"Authorization": f"Token {new_token}"}, + timeout=10.0, + ) as _client: + _r = await _client.get( + new_url.rstrip("/") + "/api/documents/", + params={"page_size": 1}, + ) + _r.raise_for_status() + except Exception as _e: + raise HTTPException( + 422, + detail=f"Master connection test failed: {_e}", + ) + + # Capture plaintext values for envfile before encryption + env_updates: dict[str, str] = {} + if "master_url" in updates: + env_updates["MASTER_URL"] = str(updates["master_url"]) + if "master_token" in updates and updates["master_token"]: + env_updates["MASTER_TOKEN"] = str(updates["master_token"]) + + # Persist updates + for key, value in updates.items(): + if key in ENCRYPTED_KEYS and value: + value = encrypt(str(value), config.secret_key) + setting = session.get(Setting, key) + if setting: + setting.value = str(value) + else: + setting = Setting(key=key, value=str(value)) + session.add(setting) + session.commit() + + if env_updates: + from .. 
import envfile + envfile.write(env_updates) + + # Reschedule if interval changed + if "sync_interval_seconds" in updates: + from ..scheduler import reschedule + reschedule(int(updates["sync_interval_seconds"])) + + return _safe_settings(_get_all_settings(session)) + + +class ConnectionTestRequest(BaseModel): + url: str + token: str = "" # blank = use saved master token + + +@router.post("/test") +async def test_connection( + body: ConnectionTestRequest, + session: Session = Depends(get_session), +): + """Test a connection using the provided URL and token (does not save). + If token is blank, falls back to the saved master_token.""" + import httpx + import time + + config = get_config() + token = body.token.strip() + if not token: + settings = _get_all_settings(session) + enc = settings.get("master_token", "") + token = decrypt(enc, config.secret_key) if enc else "" + + if not token: + return {"ok": False, "error": "No token provided and no saved token found", "latency_ms": 0, "doc_count": 0} + + t0 = time.monotonic() + try: + async with httpx.AsyncClient( + headers={"Authorization": f"Token {token}"}, + timeout=10.0, + ) as client: + r = await client.get( + body.url.rstrip("/") + "/api/documents/", + params={"page_size": 1}, + ) + r.raise_for_status() + elapsed = int((time.monotonic() - t0) * 1000) + data = r.json() + return {"ok": True, "error": None, "latency_ms": elapsed, "doc_count": data.get("count", 0)} + except Exception as e: + return {"ok": False, "error": str(e), "latency_ms": 0, "doc_count": 0} + + +@router.get("/status") +async def master_status(session: Session = Depends(get_session)): + """Test the currently saved master connection.""" + config = get_config() + settings = _get_all_settings(session) + master_url = settings.get("master_url", "") + master_token_enc = settings.get("master_token", "") + if not master_url or not master_token_enc: + return {"ok": False, "error": "Not configured", "latency_ms": 0, "doc_count": 0} + master_token = 
decrypt(master_token_enc, config.secret_key) + from ..sync.paperless import PaperlessClient + + sem = asyncio.Semaphore(1) + async with PaperlessClient(master_url, master_token, sem) as client: + return await client.test_connection() diff --git a/app/api/status.py b/app/api/status.py new file mode 100644 index 0000000..bb3e16e --- /dev/null +++ b/app/api/status.py @@ -0,0 +1,88 @@ +"""Dashboard status endpoint.""" +from datetime import datetime, timezone + +from fastapi import APIRouter, Depends +from sqlmodel import Session, select + +from ..database import get_session +from ..models import Replica, SyncRun +from ..sync.engine import get_progress + +router = APIRouter(prefix="/api", tags=["status"]) + + +@router.get("/status") +def get_status(session: Session = Depends(get_session)): + replicas = session.exec(select(Replica)).all() + progress = get_progress() + now = datetime.now(timezone.utc) + + replica_data = [] + for r in replicas: + lag = None + if r.last_sync_ts: + ts = r.last_sync_ts + if ts.tzinfo is None: + ts = ts.replace(tzinfo=timezone.utc) + lag = int((now - ts).total_seconds()) + + if r.suspended_at: + status = "suspended" + elif progress.running and progress.phase and r.name in progress.phase: + status = "syncing" + elif r.consecutive_failures > 0: + status = "error" + elif r.last_sync_ts: + status = "synced" + else: + status = "pending" + + # Last run stats for this replica + last_run = session.exec( + select(SyncRun) + .where(SyncRun.replica_id == r.id) + .order_by(SyncRun.started_at.desc()) # type: ignore[attr-defined] + .limit(1) + ).first() + + replica_data.append( + { + "id": r.id, + "name": r.name, + "url": r.url, + "enabled": r.enabled, + "status": status, + "lag_seconds": lag, + "last_sync_ts": r.last_sync_ts.isoformat() if r.last_sync_ts else None, + "consecutive_failures": r.consecutive_failures, + "suspended": r.suspended_at is not None, + "docs_synced_last_run": last_run.docs_synced if last_run else 0, + "docs_failed_last_run": 
last_run.docs_failed if last_run else 0, + } + ) + + last_run = session.exec( + select(SyncRun) + .order_by(SyncRun.started_at.desc()) # type: ignore[attr-defined] + .limit(1) + ).first() + + return { + "replicas": replica_data, + "sync_progress": { + "running": progress.running, + "phase": progress.phase, + "docs_done": progress.docs_done, + "docs_total": progress.docs_total, + }, + "last_sync_run": { + "id": last_run.id, + "started_at": last_run.started_at.isoformat() if last_run and last_run.started_at else None, + "finished_at": last_run.finished_at.isoformat() if last_run and last_run.finished_at else None, + "docs_synced": last_run.docs_synced if last_run else 0, + "docs_failed": last_run.docs_failed if last_run else 0, + "timed_out": last_run.timed_out if last_run else False, + } + if last_run + else None, + } diff --git a/app/api/sync.py b/app/api/sync.py new file mode 100644 index 0000000..d0af372 --- /dev/null +++ b/app/api/sync.py @@ -0,0 +1,29 @@ +from fastapi import APIRouter +from fastapi.responses import JSONResponse + +from ..sync.engine import get_progress, run_sync_cycle + +router = APIRouter(prefix="/api/sync", tags=["sync"]) + + +@router.post("") +async def trigger_sync(replica_id: int | None = None): + started = await run_sync_cycle( + triggered_by="manual", + replica_id=replica_id, + ) + return JSONResponse( + status_code=202, + content={"started": started, "message": "Sync triggered" if started else "Already running"}, + ) + + +@router.get("/running") +def sync_running(): + p = get_progress() + return { + "running": p.running, + "phase": p.phase, + "docs_done": p.docs_done, + "docs_total": p.docs_total, + } diff --git a/app/config.py b/app/config.py new file mode 100644 index 0000000..4f837de --- /dev/null +++ b/app/config.py @@ -0,0 +1,24 @@ +import os +from functools import lru_cache + + +class Config: + def __init__(self) -> None: + self.secret_key: str = os.environ.get("SECRET_KEY", "") + self.database_url: str = 
os.environ.get("DATABASE_URL", "sqlite:////data/db.sqlite3") + self.master_url: str | None = os.environ.get("MASTER_URL") + self.master_token: str | None = os.environ.get("MASTER_TOKEN") + + @property + def db_path(self) -> str: + url = self.database_url + if url.startswith("sqlite:////"): + return "/" + url[len("sqlite:////"):] + if url.startswith("sqlite:///"): + return url[len("sqlite:///"):] + return url + + +@lru_cache +def get_config() -> Config: + return Config() diff --git a/app/crypto.py b/app/crypto.py new file mode 100644 index 0000000..1548183 --- /dev/null +++ b/app/crypto.py @@ -0,0 +1,21 @@ +from cryptography.fernet import Fernet + + +def make_fernet(key: str) -> Fernet: + return Fernet(key.encode() if isinstance(key, str) else key) + + +def encrypt(value: str, key: str) -> str: + return make_fernet(key).encrypt(value.encode()).decode() + + +def decrypt(encrypted: str, key: str) -> str: + return make_fernet(key).decrypt(encrypted.encode()).decode() + + +def is_valid_fernet_key(key: str) -> bool: + try: + Fernet(key.encode() if isinstance(key, str) else key) + return True + except Exception: + return False diff --git a/app/database.py b/app/database.py new file mode 100644 index 0000000..2485ac0 --- /dev/null +++ b/app/database.py @@ -0,0 +1,59 @@ +from sqlalchemy import event +from sqlmodel import Session, SQLModel, create_engine + +from .config import get_config + +_engine = None + + +def get_engine(): + global _engine + if _engine is None: + config = get_config() + _engine = create_engine( + config.database_url, + connect_args={"check_same_thread": False}, + ) + + # Set PRAGMAs on every new connection (foreign_keys must be per-connection) + @event.listens_for(_engine, "connect") + def _set_pragmas(dbapi_conn, _record): + cursor = dbapi_conn.cursor() + cursor.execute("PRAGMA journal_mode=WAL") + cursor.execute("PRAGMA foreign_keys=ON") + cursor.close() + + return _engine + + +def get_session(): + with Session(get_engine()) as session: + yield 
session + + +def create_db_and_tables() -> None: + from . import models # noqa: ensure model classes are registered + + SQLModel.metadata.create_all(get_engine()) + engine = get_engine() + with engine.connect() as conn: + conn.exec_driver_sql( + "CREATE INDEX IF NOT EXISTS idx_sync_map_replica ON sync_map(replica_id)" + ) + conn.exec_driver_sql( + "CREATE INDEX IF NOT EXISTS idx_sync_map_status ON sync_map(replica_id, status)" + ) + conn.exec_driver_sql( + "CREATE VIRTUAL TABLE IF NOT EXISTS logs_fts " + "USING fts5(message, content=logs, content_rowid=id)" + ) + conn.exec_driver_sql( + "CREATE TRIGGER IF NOT EXISTS logs_ai AFTER INSERT ON logs BEGIN " + "INSERT INTO logs_fts(rowid, message) VALUES (new.id, new.message); END" + ) + conn.exec_driver_sql( + "CREATE TRIGGER IF NOT EXISTS logs_ad AFTER DELETE ON logs BEGIN " + "INSERT INTO logs_fts(logs_fts, rowid, message) " + "VALUES('delete', old.id, old.message); END" + ) + conn.commit() diff --git a/app/envfile.py b/app/envfile.py new file mode 100644 index 0000000..98bf8c0 --- /dev/null +++ b/app/envfile.py @@ -0,0 +1,52 @@ +"""Read and write a plain-text pngx.env credential file alongside the database.""" +from pathlib import Path + + +def _path() -> Path: + from .config import get_config + return Path(get_config().db_path).parent / "pngx.env" + + +def read() -> dict[str, str]: + """Return all key=value pairs from pngx.env, ignoring comments/blanks.""" + p = _path() + if not p.exists(): + return {} + result: dict[str, str] = {} + for line in p.read_text().splitlines(): + line = line.strip() + if not line or line.startswith("#"): + continue + if "=" in line: + key, _, value = line.partition("=") + result[key.strip()] = value.strip() + return result + + +def write(updates: dict[str, str]) -> None: + """Merge updates into pngx.env, preserving unrelated existing lines.""" + p = _path() + lines: list[str] = [] + key_to_line: dict[str, int] = {} + + if p.exists(): + for i, line in 
enumerate(p.read_text().splitlines()): + stripped = line.strip() + if stripped and not stripped.startswith("#") and "=" in stripped: + k, _, _ = stripped.partition("=") + key_to_line[k.strip()] = i + lines.append(line) + + for key, value in updates.items(): + if key in key_to_line: + lines[key_to_line[key]] = f"{key}={value}" + else: + lines.append(f"{key}={value}") + + p.write_text("\n".join(lines) + "\n") + + +def replica_keys(name: str) -> tuple[str, str]: + """Return the env var names for a replica (URL key, token key).""" + safe = name.upper().replace(" ", "_").replace("-", "_") + return f"REPLICA_{safe}_URL", f"REPLICA_{safe}_TOKEN" diff --git a/app/logger.py b/app/logger.py new file mode 100644 index 0000000..15a9b8b --- /dev/null +++ b/app/logger.py @@ -0,0 +1,73 @@ +import asyncio +import json +from datetime import datetime, timezone +from typing import Optional + +_sse_queues: list[asyncio.Queue] = [] + + +def subscribe_sse() -> asyncio.Queue: + q: asyncio.Queue = asyncio.Queue(maxsize=200) + _sse_queues.append(q) + return q + + +def unsubscribe_sse(q: asyncio.Queue) -> None: + try: + _sse_queues.remove(q) + except ValueError: + pass + + +def emit_log( + level: str, + message: str, + *, + replica: str | None = None, + replica_id: int | None = None, + doc_id: int | None = None, + run_id: int | None = None, + session=None, +) -> Optional[int]: + """Write to stdout JSON, optionally persist to DB and broadcast over SSE.""" + ts = datetime.now(timezone.utc).isoformat() + stdout_payload: dict = {"ts": ts, "level": level, "msg": message} + if replica: + stdout_payload["replica"] = replica + if doc_id is not None: + stdout_payload["doc_id"] = doc_id + print(json.dumps(stdout_payload), flush=True) + + log_id: Optional[int] = None + if session is not None: + from .models import Log + + log = Log( + run_id=run_id, + replica_id=replica_id, + level=level, + message=message, + doc_id=doc_id, + ) + session.add(log) + session.commit() + session.refresh(log) + log_id = 
log.id + + sse_data = json.dumps( + { + "id": log_id, + "ts": ts, + "level": level, + "message": message, + "replica_id": replica_id, + "doc_id": doc_id, + } + ) + for q in list(_sse_queues): + try: + q.put_nowait(sse_data) + except asyncio.QueueFull: + pass + + return log_id diff --git a/app/main.py b/app/main.py new file mode 100644 index 0000000..ccfd9ba --- /dev/null +++ b/app/main.py @@ -0,0 +1,243 @@ +"""FastAPI application entry point with startup sequence and CLI.""" +import os +import sys +from contextlib import asynccontextmanager +from pathlib import Path + +from fastapi import FastAPI, Request +from fastapi.responses import JSONResponse, PlainTextResponse +from fastapi.templating import Jinja2Templates + +from .config import get_config +from .crypto import is_valid_fernet_key +from .database import create_db_and_tables, get_engine +from .logger import emit_log + + +# ── Startup sequence ────────────────────────────────────────────────────────── + + +def _startup_validate(): + config = get_config() + + # 1. Validate SECRET_KEY + if not config.secret_key: + sys.exit("FATAL: SECRET_KEY environment variable is required") + if not is_valid_fernet_key(config.secret_key): + sys.exit("FATAL: SECRET_KEY is not a valid Fernet key. " + "Generate one with: python -c \"from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())\"") + + # 2. 
Verify DB file path is writable
+    db_path = Path(config.db_path)
+    db_dir = db_path.parent
+    if not db_dir.exists():
+        try:
+            db_dir.mkdir(parents=True, exist_ok=True)
+        except Exception as e:
+            sys.exit(f"FATAL: Cannot create DB directory {db_dir}: {e}")
+    if db_path.exists() and not os.access(db_path, os.W_OK):
+        sys.exit(f"FATAL: DB file {db_path} is not writable")
+    if not os.access(db_dir, os.W_OK):
+        sys.exit(f"FATAL: DB directory {db_dir} is not writable")
+
+
+def _startup_cleanup():
+    """Close orphaned sync_runs left by an unclean shutdown."""
+    from datetime import datetime, timezone
+    from sqlmodel import Session, select
+    from .models import SyncRun
+
+    engine = get_engine()
+    with Session(engine) as session:
+        # finished_at IS NULL means the run never completed (process died mid-cycle)
+        orphans = session.exec(
+            select(SyncRun).where(SyncRun.finished_at == None)  # noqa: E711
+        ).all()
+        now = datetime.now(timezone.utc)
+        for run in orphans:
+            run.finished_at = now
+            run.timed_out = True
+            session.add(run)
+            emit_log(
+                "warning",
+                f"Closed orphaned sync_run #{run.id} (unclean shutdown)",
+            )
+        if orphans:
+            session.commit()
+
+
+def _startup_seed():
+    """Seed settings from env vars on first boot."""
+    config = get_config()
+    from sqlmodel import Session
+    from .crypto import encrypt
+    from .models import Setting
+
+    engine = get_engine()
+    with Session(engine) as session:
+        def _set_if_absent(key: str, value: str) -> None:
+            # Seed only when the key is missing or its stored value is NULL.
+            existing = session.get(Setting, key)
+            if existing is None or existing.value is None:
+                # merge(), not add(): when a row with this key already exists
+                # with a NULL value, add() would flush a second instance with
+                # the same primary key and raise an identity conflict; merge()
+                # updates the persistent row in place (or inserts if absent).
+                session.merge(Setting(key=key, value=value))
+
+        if config.master_url:
+            _set_if_absent("master_url", config.master_url)
+        if config.master_token:
+            # master_token is stored Fernet-encrypted at rest
+            encrypted = encrypt(config.master_token, config.secret_key)
+            _set_if_absent("master_token", encrypted)
+        session.commit()
+
+
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+    # Startup: validate env, create schema, repair orphans, seed settings
+    _startup_validate()
+    create_db_and_tables()
+    _startup_cleanup()
+    _startup_seed()
+    emit_log("info", "pngx-controller starting")
+
+    # Read sync interval from settings
+    from
sqlmodel import Session, select + from .models import Setting + from .scheduler import SETTINGS_DEFAULTS + + with Session(get_engine()) as session: + row = session.get(Setting, "sync_interval_seconds") + interval = int(row.value if row and row.value else SETTINGS_DEFAULTS["sync_interval_seconds"]) + + from .scheduler import start_scheduler, stop_scheduler + start_scheduler(interval_seconds=interval) + emit_log("info", f"Scheduler started, sync interval: {interval}s") + + yield + + # Shutdown + stop_scheduler() + emit_log("info", "pngx-controller stopped") + + +# ── Application ─────────────────────────────────────────────────────────────── + + +def create_app() -> FastAPI: + app = FastAPI( + title="pngx-controller", + description="Paperless-ngx Central Sync Controller", + version="0.1.0", + lifespan=lifespan, + ) + + # Templates + templates_dir = Path(__file__).parent / "templates" + templates = Jinja2Templates(directory=str(templates_dir)) + + # Register UI routes (must come before API to avoid catch-all conflicts) + from .ui.routes import router as ui_router, setup_templates + setup_templates(templates) + app.include_router(ui_router) + + # API routers + from .api.replicas import router as replicas_router + from .api.sync import router as sync_router + from .api.logs import router as logs_router + from .api.settings import router as settings_router + from .api.status import router as status_router + + app.include_router(replicas_router) + app.include_router(sync_router) + app.include_router(logs_router) + app.include_router(settings_router) + app.include_router(status_router) + + # ── Health & Metrics (no auth) ──────────────────────────────────────────── + + @app.get("/healthz", include_in_schema=False) + def healthz(): + try: + engine = get_engine() + with engine.connect() as conn: + conn.exec_driver_sql("SELECT 1") + return {"status": "ok", "db": "ok"} + except Exception as e: + return JSONResponse(status_code=503, content={"status": "error", "db": str(e)}) + 
+ @app.get("/metrics", include_in_schema=False) + def metrics(): + from prometheus_client import generate_latest, CONTENT_TYPE_LATEST + return PlainTextResponse(generate_latest(), media_type=CONTENT_TYPE_LATEST) + + return app + + +app = create_app() + + +# ── CLI entry point ─────────────────────────────────────────────────────────── + + +def rotate_key_cli() -> None: + """Re-encrypt all stored tokens: OLD_SECRET_KEY → NEW_SECRET_KEY.""" + old_key = os.environ.get("OLD_SECRET_KEY", "") + new_key = os.environ.get("NEW_SECRET_KEY", "") + if not old_key or not new_key: + sys.exit("FATAL: OLD_SECRET_KEY and NEW_SECRET_KEY must be set") + if not is_valid_fernet_key(old_key): + sys.exit("FATAL: OLD_SECRET_KEY is not a valid Fernet key") + if not is_valid_fernet_key(new_key): + sys.exit("FATAL: NEW_SECRET_KEY is not a valid Fernet key") + + # Temporarily set env so get_config() reads the old key for DB access + os.environ["SECRET_KEY"] = old_key + # Reset lru_cache so we pick up the new env + get_config.cache_clear() # type: ignore[attr-defined] + + create_db_and_tables() + + from sqlmodel import Session, select as sql_select + from .crypto import decrypt, encrypt + from .models import Replica, Setting + + engine = get_engine() + count = 0 + with Session(engine) as session: + for replica in session.exec(sql_select(Replica)).all(): + try: + plain = decrypt(replica.api_token, old_key) + replica.api_token = encrypt(plain, new_key) + session.add(replica) + count += 1 + except Exception as e: + print(f"WARNING: Could not re-encrypt token for replica {replica.name}: {e}", file=sys.stderr) + + for key in ("master_token", "alert_target_token"): + setting = session.get(Setting, key) + if setting and setting.value: + try: + plain = decrypt(setting.value, old_key) + setting.value = encrypt(plain, new_key) + session.add(setting) + count += 1 + except Exception as e: + print(f"WARNING: Could not re-encrypt {key}: {e}", file=sys.stderr) + + session.commit() + + print(f"rotate-key: 
re-encrypted {count} token(s) successfully") + print("Restart the container with the new SECRET_KEY.") + + +def cli_entry() -> None: + if len(sys.argv) > 1 and sys.argv[1] == "rotate-key": + rotate_key_cli() + else: + import uvicorn + uvicorn.run( + "app.main:app", + host=os.environ.get("BIND_HOST", "0.0.0.0"), + port=int(os.environ.get("PORT", "8000")), + log_level="warning", + ) + + +if __name__ == "__main__": + cli_entry() diff --git a/app/metrics.py b/app/metrics.py new file mode 100644 index 0000000..7c3bb7b --- /dev/null +++ b/app/metrics.py @@ -0,0 +1,36 @@ +from prometheus_client import Counter, Gauge, Histogram + +docs_total = Counter( + "pngx_sync_docs_total", + "Total documents synced", + ["replica", "status"], +) + +sync_duration = Histogram( + "pngx_sync_duration_seconds", + "Sync cycle duration in seconds", + ["triggered_by"], +) + +replica_lag = Gauge( + "pngx_replica_lag_seconds", + "Seconds since last successful sync", + ["replica"], +) + +replica_pending_tasks = Gauge( + "pngx_replica_pending_tasks", + "Pending tasks awaiting resolution", + ["replica"], +) + +replica_consecutive_failures = Gauge( + "pngx_replica_consecutive_failures", + "Consecutive sync cycle failures per replica", + ["replica"], +) + +sync_running = Gauge( + "pngx_sync_running", + "Whether a sync cycle is currently running", +) diff --git a/app/models.py b/app/models.py new file mode 100644 index 0000000..1d7982b --- /dev/null +++ b/app/models.py @@ -0,0 +1,69 @@ +from datetime import datetime +from typing import Optional + +from sqlalchemy import UniqueConstraint +from sqlmodel import Field, SQLModel + + +class Replica(SQLModel, table=True): + __tablename__ = "replicas" # type: ignore[assignment] + + id: Optional[int] = Field(default=None, primary_key=True) + name: str + url: str + api_token: str # Fernet-encrypted + enabled: bool = True + sync_interval_seconds: Optional[int] = None + last_sync_ts: Optional[datetime] = None + consecutive_failures: int = 0 + suspended_at: 
Optional[datetime] = None + last_alert_at: Optional[datetime] = None + created_at: datetime = Field(default_factory=datetime.utcnow) + + +class SyncMap(SQLModel, table=True): + __tablename__ = "sync_map" # type: ignore[assignment] + __table_args__ = (UniqueConstraint("replica_id", "master_doc_id"),) # type: ignore[assignment] + + id: Optional[int] = Field(default=None, primary_key=True) + replica_id: int = Field(foreign_key="replicas.id") + master_doc_id: int + replica_doc_id: Optional[int] = None + task_id: Optional[str] = None + last_synced: Optional[datetime] = None + file_checksum: Optional[str] = None + status: str = "pending" + error_msg: Optional[str] = None + retry_count: int = 0 + + +class SyncRun(SQLModel, table=True): + __tablename__ = "sync_runs" # type: ignore[assignment] + + id: Optional[int] = Field(default=None, primary_key=True) + replica_id: Optional[int] = Field(default=None, foreign_key="replicas.id") + started_at: Optional[datetime] = None + finished_at: Optional[datetime] = None + triggered_by: Optional[str] = None + docs_synced: int = 0 + docs_failed: int = 0 + timed_out: bool = False + + +class Log(SQLModel, table=True): + __tablename__ = "logs" # type: ignore[assignment] + + id: Optional[int] = Field(default=None, primary_key=True) + run_id: Optional[int] = Field(default=None, foreign_key="sync_runs.id") + replica_id: Optional[int] = Field(default=None, foreign_key="replicas.id") + level: Optional[str] = None + message: Optional[str] = None + doc_id: Optional[int] = None + created_at: datetime = Field(default_factory=datetime.utcnow) + + +class Setting(SQLModel, table=True): + __tablename__ = "settings" # type: ignore[assignment] + + key: str = Field(primary_key=True) + value: Optional[str] = None diff --git a/app/scheduler.py b/app/scheduler.py new file mode 100644 index 0000000..1fd5bb5 --- /dev/null +++ b/app/scheduler.py @@ -0,0 +1,63 @@ +import asyncio + +from apscheduler.schedulers.asyncio import AsyncIOScheduler + +_scheduler: 
AsyncIOScheduler | None = None + +SETTINGS_DEFAULTS = { + "sync_interval_seconds": "900", + "log_retention_days": "90", + "sync_cycle_timeout_seconds": "1800", + "task_poll_timeout_seconds": "600", + "replica_suspend_threshold": "5", + "max_concurrent_requests": "4", + "alert_target_type": "", + "alert_target_url": "", + "alert_target_token": "", + "alert_error_threshold": "5", + "alert_cooldown_seconds": "3600", +} + + +def get_setting(settings: dict, key: str) -> str: + return settings.get(key) or SETTINGS_DEFAULTS.get(key, "") + + +def get_setting_int(settings: dict, key: str) -> int: + return int(get_setting(settings, key) or SETTINGS_DEFAULTS[key]) + + +async def _sync_job() -> None: + from .sync.engine import run_sync_cycle + + await run_sync_cycle(triggered_by="scheduler") + + +def start_scheduler(interval_seconds: int = 900) -> AsyncIOScheduler: + global _scheduler + _scheduler = AsyncIOScheduler() + _scheduler.add_job( + _sync_job, + "interval", + seconds=interval_seconds, + id="sync_job", + replace_existing=True, + max_instances=1, + ) + _scheduler.start() + return _scheduler + + +def reschedule(interval_seconds: int) -> None: + if _scheduler is None: + return + _scheduler.reschedule_job( + "sync_job", + trigger="interval", + seconds=interval_seconds, + ) + + +def stop_scheduler() -> None: + if _scheduler: + _scheduler.shutdown(wait=False) diff --git a/app/sync/__init__.py b/app/sync/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/sync/engine.py b/app/sync/engine.py new file mode 100644 index 0000000..e80289c --- /dev/null +++ b/app/sync/engine.py @@ -0,0 +1,749 @@ +"""Core sync engine: runs the full sync cycle across all eligible replicas.""" +import asyncio +import hashlib +import sqlite3 +from dataclasses import dataclass +from datetime import datetime, timezone +from typing import Optional + +from sqlmodel import Session, select + +from ..config import get_config +from ..crypto import decrypt +from ..database import get_engine 
+from ..logger import emit_log +from ..models import Replica, SyncMap, SyncRun +from .. import metrics +from .paperless import PaperlessClient, PaperlessError + +_sync_lock = asyncio.Lock() + + +@dataclass +class SyncProgress: + running: bool = False + phase: str = "" + docs_done: int = 0 + docs_total: int = 0 + + +_progress = SyncProgress() + + +def get_progress() -> SyncProgress: + return _progress + + +async def run_sync_cycle( + triggered_by: str = "scheduler", + replica_id: Optional[int] = None, +) -> bool: + """Trigger a sync cycle in the background. Returns False if already running.""" + if _sync_lock.locked(): + return False + asyncio.create_task(_do_sync(triggered_by, replica_id)) + return True + + +async def _get_settings() -> dict: + from ..scheduler import SETTINGS_DEFAULTS + from ..models import Setting + + with Session(get_engine()) as s: + rows = s.exec(select(Setting)).all() + result = dict(SETTINGS_DEFAULTS) + for row in rows: + if row.value is not None: + result[row.key] = row.value + return result + + +async def _ensure_schema_parity( + master: PaperlessClient, + replica: PaperlessClient, +) -> dict: + """Create missing tags/correspondents/document_types/custom_fields on replica. 
+ Returns maps: master_id → replica_id for each entity type.""" + master_tags = {t["name"]: t for t in await master.get_tags()} + replica_tags = {t["name"]: t for t in await replica.get_tags()} + tag_map: dict[int, int] = {} + for name, mt in master_tags.items(): + rt = replica_tags.get(name) or await replica.create_tag( + name, + color=mt.get("color", ""), + is_inbox_tag=mt.get("is_inbox_tag", False), + ) + tag_map[mt["id"]] = rt["id"] + + master_corrs = {c["name"]: c for c in await master.get_correspondents()} + replica_corrs = {c["name"]: c for c in await replica.get_correspondents()} + corr_map: dict[int, int] = {} + for name, mc in master_corrs.items(): + rc = replica_corrs.get(name) or await replica.create_correspondent(name) + corr_map[mc["id"]] = rc["id"] + + master_dts = {d["name"]: d for d in await master.get_document_types()} + replica_dts = {d["name"]: d for d in await replica.get_document_types()} + dt_map: dict[int, int] = {} + for name, mdt in master_dts.items(): + rdt = replica_dts.get(name) or await replica.create_document_type(name) + dt_map[mdt["id"]] = rdt["id"] + + master_cfs = {cf["name"]: cf for cf in await master.get_custom_fields()} + replica_cfs = {cf["name"]: cf for cf in await replica.get_custom_fields()} + cf_map: dict[int, int] = {} + for name, mcf in master_cfs.items(): + rcf = replica_cfs.get(name) or await replica.create_custom_field( + name, mcf.get("data_type", "string") + ) + cf_map[mcf["id"]] = rcf["id"] + + return { + "tags": tag_map, + "correspondents": corr_map, + "document_types": dt_map, + "custom_fields": cf_map, + } + + +def _translate_metadata(meta: dict, maps: dict) -> dict: + """Translate master entity IDs to replica entity IDs.""" + result: dict = { + "title": meta.get("title", ""), + "created": meta.get("created") or meta.get("created_date"), + "archive_serial_number": meta.get("archive_serial_number"), + } + if meta.get("correspondent") is not None: + result["correspondent"] = 
maps["correspondents"].get(meta["correspondent"]) + if meta.get("document_type") is not None: + result["document_type"] = maps["document_types"].get(meta["document_type"]) + result["tags"] = [ + maps["tags"][t] for t in meta.get("tags", []) if t in maps["tags"] + ] + cf_list = [] + for cf_entry in meta.get("custom_fields", []): + master_cf_id = cf_entry.get("field") + if master_cf_id in maps["custom_fields"]: + cf_list.append( + { + "field": maps["custom_fields"][master_cf_id], + "value": cf_entry.get("value"), + } + ) + result["custom_fields"] = cf_list + return result + + +def _sha256(data: bytes) -> str: + return hashlib.sha256(data).hexdigest() + + +async def _resolve_pending_tasks( + replica: PaperlessClient, + replica_obj: Replica, + task_poll_timeout: int, + run_id: int, + session: Session, +) -> tuple[int, int]: + """Resolve pending sync_map entries. Returns (resolved, failed).""" + pending = session.exec( + select(SyncMap).where( + SyncMap.replica_id == replica_obj.id, + SyncMap.status == "pending", + SyncMap.task_id.is_not(None), # type: ignore[union-attr] + ) + ).all() + + resolved = failed = 0 + now = datetime.now(timezone.utc) + + for entry in pending: + try: + task = await replica.get_task(entry.task_id) # type: ignore[arg-type] + status = task.get("status", "") + age_seconds = 0 + if entry.last_synced: + last = entry.last_synced + if last.tzinfo is None: + last = last.replace(tzinfo=timezone.utc) + age_seconds = (now - last).total_seconds() + + if not task or age_seconds > task_poll_timeout: + entry.status = "error" + entry.error_msg = "task timed out" + entry.retry_count += 1 + session.add(entry) + emit_log( + "warning", + f"Task timed out for doc {entry.master_doc_id}", + replica=replica_obj.name, + replica_id=replica_obj.id, + doc_id=entry.master_doc_id, + run_id=run_id, + session=session, + ) + failed += 1 + elif status == "SUCCESS": + # Extract replica_doc_id from task result + related = task.get("related_document") + if related is not None: + 
entry.replica_doc_id = int(str(related))
+                # Task consumed: clear task_id so this entry leaves "pending"
+                entry.task_id = None
+                entry.status = "ok"
+                entry.last_synced = now
+                session.add(entry)
+                resolved += 1
+            elif status in ("FAILURE", "REVOKED"):
+                entry.status = "error"
+                # "result" may be present but None in the task payload —
+                # guard with `or` before slicing, otherwise None[:500] raises
+                # TypeError and the whole entry is mis-handled as a poll error.
+                entry.error_msg = (task.get("result") or "task failed")[:500]
+                entry.retry_count += 1
+                session.add(entry)
+                emit_log(
+                    "warning",
+                    f"Task failed for doc {entry.master_doc_id}: {entry.error_msg}",
+                    replica=replica_obj.name,
+                    replica_id=replica_obj.id,
+                    doc_id=entry.master_doc_id,
+                    run_id=run_id,
+                    session=session,
+                )
+                failed += 1
+            # else: still PENDING/STARTED — leave it
+        except Exception as e:
+            # Polling errors (network, API) are logged but never abort the cycle;
+            # the entry stays pending and is re-checked on the next run.
+            emit_log(
+                "warning",
+                f"Could not check task for doc {entry.master_doc_id}: {e}",
+                replica=replica_obj.name,
+                replica_id=replica_obj.id,
+                run_id=run_id,
+                session=session,
+            )
+
+    if pending:
+        session.commit()
+
+    return resolved, failed
+
+
+async def _sync_replica(
+    replica_obj: Replica,
+    master: PaperlessClient,
+    changed_docs: list[dict],
+    settings: dict,
+    run_id: int,
+    engine,
+) -> tuple[int, int]:
+    """Sync one replica.
Returns (docs_synced, docs_failed).""" + config = get_config() + max_concurrent = int(settings.get("max_concurrent_requests", "4")) + task_poll_timeout = int(settings.get("task_poll_timeout_seconds", "600")) + + replica_token = decrypt(replica_obj.api_token, config.secret_key) + replica_semaphore = asyncio.Semaphore(max_concurrent) + + docs_synced = docs_failed = 0 + + async with PaperlessClient( + replica_obj.url, replica_token, replica_semaphore + ) as replica: + with Session(engine) as session: + # Step 5a: ensure schema parity + _progress.phase = f"schema parity — {replica_obj.name}" + try: + maps = await _ensure_schema_parity(master, replica) + except Exception as e: + emit_log( + "error", + f"Schema parity failed: {e}", + replica=replica_obj.name, + replica_id=replica_obj.id, + run_id=run_id, + session=session, + ) + raise + + # Step 5b: resolve pending tasks + _progress.phase = f"resolving tasks — {replica_obj.name}" + await _resolve_pending_tasks( + replica, replica_obj, task_poll_timeout, run_id, session + ) + + # Step 5c: collect docs to process + last_ts = replica_obj.last_sync_ts + if last_ts and last_ts.tzinfo is None: + last_ts = last_ts.replace(tzinfo=timezone.utc) + + docs_for_replica = [ + d + for d in changed_docs + if last_ts is None + or _parse_dt(d.get("modified", "")) is None + or _parse_dt(d.get("modified", "")) >= last_ts + ] + + # Include error-status docs (capped at 50) + error_entries = session.exec( + select(SyncMap).where( + SyncMap.replica_id == replica_obj.id, + SyncMap.status == "error", + ) + ).all()[:50] + error_doc_ids = {e.master_doc_id for e in error_entries} + existing_ids = {d["id"] for d in docs_for_replica} + for e in error_entries: + if e.master_doc_id not in existing_ids: + docs_for_replica.append({"id": e.master_doc_id, "_retry": True}) + + _progress.docs_total = len(docs_for_replica) + _progress.docs_done = 0 + _progress.phase = f"syncing {replica_obj.name}" + + # Step 5d: process each document + for doc_stub in 
docs_for_replica: + doc_id = doc_stub["id"] + try: + # Fetch full metadata from master + meta = await master.get_document(doc_id) + file_bytes = await master.download_document(doc_id, original=True) + checksum = _sha256(file_bytes) + filename = meta.get("original_file_name") or f"document-{doc_id}.pdf" + translated = _translate_metadata(meta, maps) + + existing = session.exec( + select(SyncMap).where( + SyncMap.replica_id == replica_obj.id, + SyncMap.master_doc_id == doc_id, + ) + ).first() + + if existing and existing.replica_doc_id is not None and existing.status == "ok": + # Update metadata on replica + await replica.patch_document(existing.replica_doc_id, translated) + existing.last_synced = datetime.now(timezone.utc) + existing.file_checksum = checksum + session.add(existing) + session.commit() + docs_synced += 1 + emit_log( + "info", + f"Updated doc {doc_id} → replica {existing.replica_doc_id}", + replica=replica_obj.name, + replica_id=replica_obj.id, + doc_id=doc_id, + run_id=run_id, + session=session, + ) + else: + # Upload new document + task_id = await master_post_to_replica( + replica, file_bytes, filename, translated + ) + now = datetime.now(timezone.utc) + if existing: + existing.task_id = task_id + existing.status = "pending" + existing.replica_doc_id = None + existing.file_checksum = checksum + existing.last_synced = now + existing.retry_count = existing.retry_count + 1 + session.add(existing) + else: + entry = SyncMap( + replica_id=replica_obj.id, + master_doc_id=doc_id, + task_id=task_id, + status="pending", + file_checksum=checksum, + last_synced=now, + ) + session.add(entry) + session.commit() + emit_log( + "info", + f"Uploaded doc {doc_id}, task {task_id}", + replica=replica_obj.name, + replica_id=replica_obj.id, + doc_id=doc_id, + run_id=run_id, + session=session, + ) + + except Exception as e: + docs_failed += 1 + emit_log( + "error", + f"Failed to sync doc {doc_id}: {e}", + replica=replica_obj.name, + replica_id=replica_obj.id, + 
doc_id=doc_id, + run_id=run_id, + session=session, + ) + # Mark as error in sync_map + existing = session.exec( + select(SyncMap).where( + SyncMap.replica_id == replica_obj.id, + SyncMap.master_doc_id == doc_id, + ) + ).first() + if existing: + existing.status = "error" + existing.error_msg = str(e)[:500] + session.add(existing) + session.commit() + + _progress.docs_done += 1 + metrics.docs_total.labels( + replica=replica_obj.name, + status="ok" if docs_failed == 0 else "error", + ).inc() + + return docs_synced, docs_failed + + +async def master_post_to_replica( + replica: PaperlessClient, + file_bytes: bytes, + filename: str, + metadata: dict, +) -> str: + return await replica.post_document(file_bytes, filename, metadata) + + +def _parse_dt(s: str) -> datetime | None: + if not s: + return None + try: + dt = datetime.fromisoformat(s.replace("Z", "+00:00")) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt + except Exception: + return None + + +async def _do_sync(triggered_by: str, target_replica_id: Optional[int]) -> None: + global _progress + + async with _sync_lock: + _progress = SyncProgress(running=True, phase="starting") + metrics.sync_running.set(1) + + config = get_config() + engine = get_engine() + start_time = datetime.now(timezone.utc) + run_id: Optional[int] = None + + try: + settings = await _get_settings() + + master_url = settings.get("master_url", "") + master_token_enc = settings.get("master_token", "") + if not master_url or not master_token_enc: + emit_log("error", "Master URL or token not configured") + return + + master_token = decrypt(master_token_enc, config.secret_key) + max_concurrent = int(settings.get("max_concurrent_requests", "4")) + sync_cycle_timeout = int(settings.get("sync_cycle_timeout_seconds", "1800")) + suspend_threshold = int(settings.get("replica_suspend_threshold", "5")) + + # Create sync_run record + with Session(engine) as session: + sync_run = SyncRun( + replica_id=target_replica_id, + 
started_at=start_time, + triggered_by=triggered_by, + ) + session.add(sync_run) + session.commit() + session.refresh(sync_run) + run_id = sync_run.id + + # Determine eligible replicas + with Session(engine) as session: + stmt = select(Replica).where(Replica.enabled == True) # noqa: E712 + if target_replica_id: + stmt = stmt.where(Replica.id == target_replica_id) + all_replicas = session.exec(stmt).all() + + now = datetime.now(timezone.utc) + eligible: list[Replica] = [] + for r in all_replicas: + if r.suspended_at is not None: + continue + if r.sync_interval_seconds is not None and r.last_sync_ts: + last = r.last_sync_ts + if last.tzinfo is None: + last = last.replace(tzinfo=timezone.utc) + if (now - last).total_seconds() < r.sync_interval_seconds: + continue + eligible.append(r) + + if not eligible: + emit_log("info", "No eligible replicas for this cycle") + _close_run(engine, run_id, 0, 0, False) + return + + # Find min last_sync_ts for master query. + # If ANY eligible replica has never synced, fetch ALL master docs. 
+ any_never_synced = any(r.last_sync_ts is None for r in eligible) + if any_never_synced: + modified_gte = None + else: + last_sync_times = [r.last_sync_ts for r in eligible] # type: ignore[misc] + min_ts = min( + (t if t.tzinfo else t.replace(tzinfo=timezone.utc)) + for t in last_sync_times + ) + modified_gte = min_ts.isoformat() + + master_semaphore = asyncio.Semaphore(max_concurrent) + result_container = [0, 0] + + try: + await asyncio.wait_for( + _run_all_replicas( + eligible=eligible, + master_url=master_url, + master_token=master_token, + master_semaphore=master_semaphore, + modified_gte=modified_gte, + settings=settings, + run_id=run_id, + suspend_threshold=suspend_threshold, + engine=engine, + start_time=start_time, + result_container=result_container, + ), + timeout=sync_cycle_timeout, + ) + + except asyncio.TimeoutError: + emit_log( + "warning", + f"Sync cycle timed out after {sync_cycle_timeout}s", + ) + _close_run(engine, run_id, 0, 0, True) + return + + _close_run(engine, run_id, result_container[0], result_container[1], False) + _do_backup(config.db_path) + + except Exception as e: + emit_log("error", f"Sync cycle crashed: {e}") + if run_id: + _close_run(engine, run_id, 0, 0, False) + finally: + elapsed = (datetime.now(timezone.utc) - start_time).total_seconds() + metrics.sync_duration.labels(triggered_by=triggered_by).observe(elapsed) + metrics.sync_running.set(0) + _progress = SyncProgress(running=False) + + +async def _run_all_replicas( + *, + eligible: list[Replica], + master_url: str, + master_token: str, + master_semaphore: asyncio.Semaphore, + modified_gte: str | None, + settings: dict, + run_id: int, + suspend_threshold: int, + engine, + start_time: datetime, + result_container: list, +) -> None: + """Fetch changed docs once, then sync each replica.""" + _progress.phase = "fetching master documents" + + async with PaperlessClient(master_url, master_token, master_semaphore) as master: + changed_docs = await 
master.get_all_documents(modified_gte=modified_gte) + + total_synced = total_failed = 0 + + for replica_obj in eligible: + _progress.phase = f"syncing {replica_obj.name}" + try: + async with PaperlessClient( + master_url, master_token, master_semaphore + ) as master: + synced, failed = await _sync_replica( + replica_obj=replica_obj, + master=master, + changed_docs=changed_docs, + settings=settings, + run_id=run_id, + engine=engine, + ) + total_synced += synced + total_failed += failed + + # Update replica success state + with Session(engine) as session: + r = session.get(Replica, replica_obj.id) + if r: + r.last_sync_ts = start_time + r.consecutive_failures = 0 + session.add(r) + session.commit() + + metrics.replica_consecutive_failures.labels(replica=replica_obj.name).set(0) + + # Check alert threshold + alert_threshold = int(settings.get("alert_error_threshold", "5")) + if failed >= alert_threshold: + await _send_alert( + replica_obj, + "sync_failures_threshold", + {"docs_synced": synced, "docs_failed": failed}, + settings, + engine, + ) + + except Exception as e: + emit_log( + "error", + f"Replica sync failed: {e}", + replica=replica_obj.name, + replica_id=replica_obj.id, + run_id=run_id, + ) + total_failed += 1 + + with Session(engine) as session: + r = session.get(Replica, replica_obj.id) + if r: + r.consecutive_failures += 1 + if r.consecutive_failures >= suspend_threshold: + r.suspended_at = datetime.now(timezone.utc) + emit_log( + "error", + f"Replica {r.name} suspended after {r.consecutive_failures} consecutive failures", + replica=r.name, + replica_id=r.id, + ) + await _send_alert( + r, + "replica_suspended", + {"docs_synced": 0, "docs_failed": 1}, + settings, + engine, + ) + session.add(r) + session.commit() + + metrics.replica_consecutive_failures.labels( + replica=replica_obj.name + ).set(replica_obj.consecutive_failures + 1) + + # Update Prometheus lag + with Session(engine) as session: + r = session.get(Replica, replica_obj.id) + if r and 
r.last_sync_ts: + ts = r.last_sync_ts + if ts.tzinfo is None: + ts = ts.replace(tzinfo=timezone.utc) + lag = (datetime.now(timezone.utc) - ts).total_seconds() + metrics.replica_lag.labels(replica=replica_obj.name).set(lag) + + result_container[0] = total_synced + result_container[1] = total_failed + + +async def _send_alert( + replica: Replica, + event: str, + run_stats: dict, + settings: dict, + engine, +) -> None: + import httpx + + target_type = settings.get("alert_target_type", "") + target_url = settings.get("alert_target_url", "") + cooldown = int(settings.get("alert_cooldown_seconds", "3600")) + + if not target_type or not target_url: + return + + now = datetime.now(timezone.utc) + if replica.last_alert_at: + last = replica.last_alert_at + if last.tzinfo is None: + last = last.replace(tzinfo=timezone.utc) + if (now - last).total_seconds() < cooldown: + return + + payload = { + "event": event, + "replica": replica.name, + "replica_url": replica.url, + "consecutive_failures": replica.consecutive_failures, + "docs_failed": run_stats.get("docs_failed", 0), + "docs_synced": run_stats.get("docs_synced", 0), + "timestamp": now.isoformat(), + } + + config = get_config() + token_enc = settings.get("alert_target_token", "") + token = decrypt(token_enc, config.secret_key) if token_enc else "" + + try: + async with httpx.AsyncClient(timeout=10.0) as client: + if target_type == "gotify": + await client.post( + f"{target_url}/message", + json={ + "title": "pngx-controller alert", + "message": str(payload), + "priority": 7, + }, + headers={"X-Gotify-Key": token}, + ) + elif target_type == "webhook": + headers = {} + if token: + headers["Authorization"] = token + await client.post(target_url, json=payload, headers=headers) + + with Session(engine) as session: + r = session.get(Replica, replica.id) + if r: + r.last_alert_at = now + session.add(r) + session.commit() + except Exception as e: + emit_log("warning", f"Alert send failed: {e}") + + +def _close_run( + engine, 
run_id: int, synced: int, failed: int, timed_out: bool +) -> None: + with Session(engine) as session: + sr = session.get(SyncRun, run_id) + if sr: + sr.finished_at = datetime.now(timezone.utc) + sr.docs_synced = synced + sr.docs_failed = failed + sr.timed_out = timed_out + session.add(sr) + session.commit() + + +def _do_backup(db_path: str) -> None: + """Copy DB to .bak file after a successful sync run.""" + import os + + bak_path = db_path + ".bak" + try: + import sqlite3 as _sqlite3 + + src = _sqlite3.connect(db_path) + dst = _sqlite3.connect(bak_path) + src.backup(dst) + dst.close() + src.close() + except Exception as e: + emit_log("warning", f"DB backup failed: {e}") diff --git a/app/sync/paperless.py b/app/sync/paperless.py new file mode 100644 index 0000000..1de0fd5 --- /dev/null +++ b/app/sync/paperless.py @@ -0,0 +1,209 @@ +"""Paperless-ngx REST API client with retry/backoff and semaphore throttling.""" +import asyncio +import time +from typing import Any + +import httpx + + +class PaperlessError(Exception): + pass + + +class PaperlessClient: + def __init__(self, url: str, token: str, semaphore: asyncio.Semaphore) -> None: + self.base_url = url.rstrip("/") + self.token = token + self.semaphore = semaphore + self._client: httpx.AsyncClient | None = None + + async def __aenter__(self) -> "PaperlessClient": + self._client = httpx.AsyncClient( + headers={"Authorization": f"Token {self.token}"}, + timeout=120.0, + ) + return self + + async def __aexit__(self, *args) -> None: + if self._client: + await self._client.aclose() + self._client = None + + async def _request(self, method: str, path: str, **kwargs) -> httpx.Response: + assert self._client is not None, "Use as async context manager" + url = f"{self.base_url}{path}" + delays = [2, 4, 8] + last_exc: Exception | None = None + + for attempt in range(3): + try: + async with self.semaphore: + r = await self._client.request(method, url, **kwargs) + r.raise_for_status() + return r + except (httpx.NetworkError, 
httpx.TimeoutException, httpx.ConnectError) as e: + last_exc = e + if attempt < 2: + await asyncio.sleep(delays[attempt]) + except httpx.HTTPStatusError as e: + if e.response.status_code >= 500: + last_exc = e + if attempt < 2: + await asyncio.sleep(delays[attempt]) + else: + raise PaperlessError( + f"HTTP {e.response.status_code} {method} {path}: {e.response.text[:300]}" + ) from e + + raise PaperlessError(f"Request failed after 3 attempts: {last_exc}") from last_exc + + async def _get_all(self, path: str, params: dict | None = None) -> list[dict]: + """Paginate through all results.""" + results: list[dict] = [] + page = 1 + base_params = dict(params or {}) + base_params["page_size"] = 100 + while True: + r = await self._request("GET", path, params={**base_params, "page": page}) + data = r.json() + results.extend(data.get("results", [])) + if not data.get("next"): + break + page += 1 + return results + + # ── Documents ────────────────────────────────────────────────────────────── + + async def get_documents_page( + self, + page: int = 1, + modified_gte: str | None = None, + page_size: int = 100, + ) -> dict: + params: dict[str, Any] = { + "ordering": "modified", + "page_size": page_size, + "page": page, + } + if modified_gte: + params["modified__gte"] = modified_gte + r = await self._request("GET", "/api/documents/", params=params) + return r.json() + + async def get_all_documents(self, modified_gte: str | None = None) -> list[dict]: + docs: list[dict] = [] + page = 1 + while True: + data = await self.get_documents_page(page=page, modified_gte=modified_gte) + docs.extend(data.get("results", [])) + if not data.get("next"): + break + page += 1 + return docs + + async def get_document(self, doc_id: int) -> dict: + r = await self._request("GET", f"/api/documents/{doc_id}/") + return r.json() + + async def download_document(self, doc_id: int, original: bool = True) -> bytes: + params: dict[str, Any] = {} + if not original: + params["original"] = "false" + r = await 
self._request("GET", f"/api/documents/{doc_id}/download/", params=params) + return r.content + + async def post_document( + self, file_bytes: bytes, filename: str, metadata: dict + ) -> str: + """Upload a document; returns the Celery task_id UUID string.""" + form: list[tuple[str, str]] = [] + for key in ("title", "created", "archive_serial_number"): + val = metadata.get(key) + if val is not None: + form.append((key, str(val))) + if metadata.get("correspondent") is not None: + form.append(("correspondent", str(metadata["correspondent"]))) + if metadata.get("document_type") is not None: + form.append(("document_type", str(metadata["document_type"]))) + for tag_id in metadata.get("tags", []): + form.append(("tags", str(tag_id))) + + r = await self._request( + "POST", + "/api/documents/post_document/", + files={"document": (filename, file_bytes, "application/octet-stream")}, + data=form, + ) + result = r.json() + # API returns a plain task UUID string + if isinstance(result, str): + return result + # Some versions wrap it + if isinstance(result, dict): + return result.get("task_id", result.get("id", "")) + return str(result) + + async def patch_document(self, doc_id: int, metadata: dict) -> dict: + r = await self._request("PATCH", f"/api/documents/{doc_id}/", json=metadata) + return r.json() + + async def get_task(self, task_id: str) -> dict: + r = await self._request("GET", "/api/tasks/", params={"task_id": task_id}) + results = r.json() + if isinstance(results, list) and results: + return results[0] + return {} + + # ── Metadata entities ────────────────────────────────────────────────────── + + async def get_tags(self) -> list[dict]: + return await self._get_all("/api/tags/") + + async def get_correspondents(self) -> list[dict]: + return await self._get_all("/api/correspondents/") + + async def get_document_types(self) -> list[dict]: + return await self._get_all("/api/document_types/") + + async def get_custom_fields(self) -> list[dict]: + return await 
self._get_all("/api/custom_fields/") + + async def create_tag(self, name: str, **kwargs) -> dict: + r = await self._request("POST", "/api/tags/", json={"name": name, **kwargs}) + return r.json() + + async def create_correspondent(self, name: str, **kwargs) -> dict: + r = await self._request( + "POST", "/api/correspondents/", json={"name": name, **kwargs} + ) + return r.json() + + async def create_document_type(self, name: str, **kwargs) -> dict: + r = await self._request( + "POST", "/api/document_types/", json={"name": name, **kwargs} + ) + return r.json() + + async def create_custom_field(self, name: str, data_type: str, **kwargs) -> dict: + r = await self._request( + "POST", + "/api/custom_fields/", + json={"name": name, "data_type": data_type, **kwargs}, + ) + return r.json() + + async def test_connection(self) -> dict: + """Returns {ok, error, latency_ms, doc_count}.""" + t0 = time.monotonic() + try: + r = await self._request("GET", "/api/documents/", params={"page_size": 1}) + elapsed = int((time.monotonic() - t0) * 1000) + data = r.json() + return { + "ok": True, + "error": None, + "latency_ms": elapsed, + "doc_count": data.get("count", 0), + } + except Exception as e: + return {"ok": False, "error": str(e), "latency_ms": 0, "doc_count": 0} diff --git a/app/sync/reconcile.py b/app/sync/reconcile.py new file mode 100644 index 0000000..b61226f --- /dev/null +++ b/app/sync/reconcile.py @@ -0,0 +1,131 @@ +"""Reconcile mode: match existing replica documents to master without re-uploading.""" +import asyncio +from datetime import datetime, timezone + +from sqlmodel import Session, select + +from ..config import get_config +from ..crypto import decrypt +from ..database import get_engine +from ..logger import emit_log +from ..models import Replica, SyncMap +from .paperless import PaperlessClient + + +async def run_reconcile(replica_id: int) -> dict: + """ + Match replica documents to master by ASN / (title + created_date). 
+ Populates sync_map without uploading files. + Returns {matched, unmatched, errors}. + """ + config = get_config() + engine = get_engine() + + from ..models import Setting + from ..scheduler import SETTINGS_DEFAULTS + + with Session(engine) as session: + settings = {s.key: s.value for s in session.exec(select(Setting)).all()} + replica_obj = session.get(Replica, replica_id) + if not replica_obj: + raise ValueError(f"Replica {replica_id} not found") + + master_url = settings.get("master_url", "") + master_token_enc = settings.get("master_token", "") + if not master_url or not master_token_enc: + raise ValueError("Master URL or token not configured") + + master_token = decrypt(master_token_enc, config.secret_key) + replica_token = decrypt(replica_obj.api_token, config.secret_key) + max_concurrent = int(settings.get("max_concurrent_requests") or SETTINGS_DEFAULTS["max_concurrent_requests"]) + + master_sem = asyncio.Semaphore(max_concurrent) + replica_sem = asyncio.Semaphore(max_concurrent) + + matched = unmatched = errors = 0 + + async with PaperlessClient(master_url, master_token, master_sem) as master: + async with PaperlessClient(replica_obj.url, replica_token, replica_sem) as replica: + # Build replica index: asn → doc, (title, date) → doc + emit_log("info", "Reconcile: indexing replica documents", replica=replica_obj.name) + replica_docs = await replica.get_all_documents() + asn_index: dict[int, dict] = {} + title_date_index: dict[tuple, dict] = {} + for doc in replica_docs: + asn = doc.get("archive_serial_number") + if asn is not None: + asn_index[int(asn)] = doc + title = (doc.get("title", "") or "").strip().lower() + created = str(doc.get("created") or doc.get("created_date") or "")[:10] + if title: + title_date_index[(title, created)] = doc + + # Walk master documents + emit_log("info", "Reconcile: indexing master documents", replica=replica_obj.name) + master_docs = await master.get_all_documents() + now = datetime.now(timezone.utc) + + with Session(engine) 
as session: + for mdoc in master_docs: + master_id = mdoc["id"] + + # Skip if already in sync_map + existing = session.exec( + select(SyncMap).where( + SyncMap.replica_id == replica_id, + SyncMap.master_doc_id == master_id, + ) + ).first() + if existing: + continue + + # Try to match + replica_match: dict | None = None + masn = mdoc.get("archive_serial_number") + if masn is not None and int(masn) in asn_index: + replica_match = asn_index[int(masn)] + else: + mtitle = (mdoc.get("title", "") or "").strip().lower() + mcreated = str( + mdoc.get("created") or mdoc.get("created_date") or "" + )[:10] + if mtitle: + replica_match = title_date_index.get((mtitle, mcreated)) + + if replica_match: + try: + file_bytes = await master.download_document(master_id) + import hashlib + checksum = hashlib.sha256(file_bytes).hexdigest() + except Exception: + checksum = None + + entry = SyncMap( + replica_id=replica_id, + master_doc_id=master_id, + replica_doc_id=replica_match["id"], + status="ok", + file_checksum=checksum, + last_synced=now, + ) + session.add(entry) + matched += 1 + else: + unmatched += 1 + + try: + session.commit() + except Exception as e: + errors += 1 + emit_log( + "error", + f"Reconcile DB commit failed: {e}", + replica=replica_obj.name, + ) + + emit_log( + "info", + f"Reconcile complete: {matched} matched, {unmatched} unmatched, {errors} errors", + replica=replica_obj.name, + ) + return {"matched": matched, "unmatched": unmatched, "errors": errors} diff --git a/app/templates/base.html b/app/templates/base.html new file mode 100644 index 0000000..d98ba74 --- /dev/null +++ b/app/templates/base.html @@ -0,0 +1,55 @@ + + + + + + {% block title %}pngx-controller{% endblock %} + + + + + + +
+ +
+ +
+ {% block content %}{% endblock %} +
+ + + + diff --git a/app/templates/dashboard.html b/app/templates/dashboard.html new file mode 100644 index 0000000..9a38f2e --- /dev/null +++ b/app/templates/dashboard.html @@ -0,0 +1,136 @@ +{% extends "base.html" %} +{% block title %}Dashboard — pngx-controller{% endblock %} +{% block content %} + +
+

Dashboard

+
+ +
+
+ + +
+
+
+ {% if progress.running %} + {{ progress.phase }} + {% if progress.docs_total > 0 %} + — {{ progress.docs_done }} / {{ progress.docs_total }} documents + + {% else %} + + {% endif %} + {% endif %} +
+
+
+ + +{% if last_run %} +
+ + Last sync run: + {% if last_run.finished_at %} + finished {{ last_run.finished_at.strftime('%Y-%m-%d %H:%M:%S') }} UTC + — {{ last_run.docs_synced }} synced, {{ last_run.docs_failed }} failed + {% if last_run.timed_out %}timed out{% endif %} + {% else %} + running… + {% endif %} + +

Triggered by: {{ last_run.triggered_by }} — Run #{{ last_run.id }}

+
+{% endif %} + + +

Replicas

+{% if replica_rows %} +
+ + + + + + + + + + + + + {% for row in replica_rows %} + + + + + + + + + {% endfor %} + +
NameURLStatusLagLast runActions
{{ row.replica.name }}{{ row.replica.url }} + {{ row.status }} + {% if row.replica.suspended_at %} +
{{ row.replica.consecutive_failures }} failures + {% endif %} +
{{ row.lag }} + {% if row.last_run %} + + ✓ {{ row.last_run.docs_synced }} + {% if row.last_run.docs_failed %} · ✗ {{ row.last_run.docs_failed }}{% endif %} + + {% else %} + never + {% endif %} + + Details + {% if row.replica.suspended_at %} + + {% endif %} +
+
+{% else %} +

No replicas configured. Add one →

+{% endif %} + + + +{% endblock %} diff --git a/app/templates/logs.html b/app/templates/logs.html new file mode 100644 index 0000000..f5c8f6e --- /dev/null +++ b/app/templates/logs.html @@ -0,0 +1,115 @@ +{% extends "base.html" %} +{% block title %}Logs — pngx-controller{% endblock %} +{% block content %} + +

Logs

+ +
+ + + + + +
+ + +
+ Live log stream +
+
+ Connecting to log stream… +
+
+
+ +
+ + +
+ {% include "partials/log_table.html" %} +
+ + + +{% endblock %} diff --git a/app/templates/partials/log_table.html b/app/templates/partials/log_table.html new file mode 100644 index 0000000..1657f4c --- /dev/null +++ b/app/templates/partials/log_table.html @@ -0,0 +1,28 @@ +{% if logs %} +
+ + + + + + + + + + + + {% for log in logs %} + + + + + + + + {% endfor %} + +
TimeLevelReplicaDoc IDMessage
{{ log.created_at.strftime('%Y-%m-%d %H:%M:%S') if log.created_at else '' }}{{ log.level or 'info' }}{{ log.replica_id or '' }}{{ log.doc_id or '' }}{{ log.message or '' }}
+
+{% else %} +

No log entries.

+{% endif %} diff --git a/app/templates/replica_detail.html b/app/templates/replica_detail.html new file mode 100644 index 0000000..785be76 --- /dev/null +++ b/app/templates/replica_detail.html @@ -0,0 +1,152 @@ +{% extends "base.html" %} +{% block title %}{{ replica.name }} — pngx-controller{% endblock %} +{% block content %} + + + +
+

{{ replica.name }}

+
+ + + {% if replica.suspended_at %} + + {% endif %} +
+
+ + + +
+
+
URL
+ {{ replica.url }} +
+
+
Status
+ {% if replica.suspended_at %} + suspended +
{{ replica.consecutive_failures }} consecutive failures + {% elif replica.last_sync_ts %} + synced + {% else %} + pending + {% endif %} +
+
+
Last sync
+ {% if replica.last_sync_ts %} + {{ replica.last_sync_ts.strftime('%Y-%m-%d %H:%M:%S') }} UTC +
{{ lag }} + {% else %} + never + {% endif %} +
+
+
Interval
+ {% if replica.sync_interval_seconds %}{{ replica.sync_interval_seconds }}s{% else %}global{% endif %} +
+
+ +

Sync Run History (last 20)

+{% if recent_runs %} +
+ + + + + + {% for run in recent_runs %} + + + + + + + + + + {% endfor %} + +
#StartedDurationSyncedFailedTriggered by
{{ run.id }}{{ run.started_at.strftime('%Y-%m-%d %H:%M:%S') if run.started_at else '—' }} + {% if run.started_at and run.finished_at %} + {% set dur = (run.finished_at - run.started_at).total_seconds()|int %} + {% if dur < 60 %}{{ dur }}s + {% else %}{{ dur // 60 }}m {{ dur % 60 }}s{% endif %} + {% elif run.started_at %} + running… + {% else %}—{% endif %} + {{ run.docs_synced }} + {% if run.docs_failed %} + {{ run.docs_failed }} + {% else %}0{% endif %} + {{ run.triggered_by }}{% if run.timed_out %}timed out{% endif %}
+
+{% else %} +

No sync runs yet.

+{% endif %} + +

Sync Map (last 50)

+{% if sync_map_page %} +
+ + + + + + {% for entry in sync_map_page %} + + + + + + + + + {% endfor %} + +
Master IDReplica IDStatusLast syncedRetriesError
{{ entry.master_doc_id }}{{ entry.replica_doc_id or '—' }}{{ entry.status }}{{ entry.last_synced.strftime('%Y-%m-%d %H:%M') if entry.last_synced else '—' }}{{ entry.retry_count }}{{ entry.error_msg or '' }}
+
+{% else %} +

No sync map entries yet.

+{% endif %} + +
+ Danger Zone +
+

Full resync wipes the sync map for this replica and re-syncs everything from scratch.

+ + + +
+
+ +{% endblock %} diff --git a/app/templates/replicas.html b/app/templates/replicas.html new file mode 100644 index 0000000..da1b325 --- /dev/null +++ b/app/templates/replicas.html @@ -0,0 +1,193 @@ +{% extends "base.html" %} +{% block title %}Replicas — pngx-controller{% endblock %} +{% block content %} + +

Replicas

+ +{% if replicas %} +
+ + + + + + + + + + + + + {% for r in replicas %} + + + + + + + + + {% endfor %} + +
NameURLEnabledIntervalLast SyncActions
{{ r.name }}{{ r.url }}{% if r.enabled %}✅{% else %}⛔{% endif %} + {% if r.sync_interval_seconds %}{{ r.sync_interval_seconds }}s{% else %}global{% endif %} + + {% if r.last_sync_ts %}{{ r.last_sync_ts.strftime('%Y-%m-%d %H:%M') }}{% else %}never{% endif %} + {% if r.suspended_at %} +
suspended · {{ r.consecutive_failures }} failures + {% endif %} +
+ Detail + + {% if r.suspended_at %} + + {% endif %} + + + +
+
+{% else %} +

No replicas yet.

+{% endif %} + +
+ +

Add Replica

+ +{% if env_replicas %} +
+ From pngx.env — {{ env_replicas|length }} not yet added +
+ {% for er in env_replicas %} + {% set er_name = er.safe | lower | replace('_', '-') %} + + {% endfor %} +
+
+{% endif %} + +
+
+
+ + + + + +
+
+ +
+
+ + + +
+

Edit Replica

+
+ + + + + + +
+ +
+
+
+ + + +{% endblock %} diff --git a/app/templates/settings.html b/app/templates/settings.html new file mode 100644 index 0000000..609b1c6 --- /dev/null +++ b/app/templates/settings.html @@ -0,0 +1,169 @@ +{% extends "base.html" %} +{% block title %}Settings — pngx-controller{% endblock %} +{% block content %} + +

Settings

+ +
+

Master Instance

+
+ + +
+ +

Sync Engine

+
+ + + + + + +
+ +

Notifications

+
+ + + + + +
+ +
+ + + +
+
+ + + +{% endblock %} diff --git a/app/ui/__init__.py b/app/ui/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/ui/routes.py b/app/ui/routes.py new file mode 100644 index 0000000..bfbe438 --- /dev/null +++ b/app/ui/routes.py @@ -0,0 +1,204 @@ +"""Jinja2 HTML routes for the web UI.""" +from datetime import datetime, timezone +from typing import Optional + +from fastapi import APIRouter, Depends, Request +from fastapi.responses import HTMLResponse +from fastapi.templating import Jinja2Templates +from sqlmodel import Session, select + +from ..database import get_session +from ..models import Log, Replica, SyncRun +from ..sync.engine import get_progress + +router = APIRouter(tags=["ui"]) +templates: Optional[Jinja2Templates] = None + + +def setup_templates(t: Jinja2Templates) -> None: + global templates + templates = t + + +def _tmpl(name: str, request: Request, ctx: dict) -> HTMLResponse: + assert templates is not None + return templates.TemplateResponse(name, {"request": request, **ctx}) + + +def _lag_str(ts: datetime | None) -> str: + if not ts: + return "never" + if ts.tzinfo is None: + ts = ts.replace(tzinfo=timezone.utc) + seconds = int((datetime.now(timezone.utc) - ts).total_seconds()) + if seconds < 120: + return f"{seconds}s ago" + if seconds < 7200: + return f"{seconds // 60}m ago" + if seconds < 172800: + return f"{seconds // 3600}h ago" + return f"{seconds // 86400}d ago" + + +@router.get("/", response_class=HTMLResponse) +def dashboard(request: Request, session: Session = Depends(get_session)): + replicas = session.exec(select(Replica)).all() + now = datetime.now(timezone.utc) + progress = get_progress() + + replica_rows = [] + for r in replicas: + last_run = session.exec( + select(SyncRun) + .where(SyncRun.replica_id == r.id) + .order_by(SyncRun.started_at.desc()) # type: ignore[attr-defined] + .limit(1) + ).first() + + if r.suspended_at: + status = "suspended" + elif progress.running and r.name in (progress.phase or ""): + status 
= "syncing" + elif r.consecutive_failures > 0: + status = "error" + elif r.last_sync_ts: + status = "synced" + else: + status = "pending" + + replica_rows.append( + { + "replica": r, + "status": status, + "lag": _lag_str(r.last_sync_ts), + "last_run": last_run, + } + ) + + last_run = session.exec( + select(SyncRun) + .order_by(SyncRun.started_at.desc()) # type: ignore[attr-defined] + .limit(1) + ).first() + + return _tmpl( + "dashboard.html", + request, + { + "replica_rows": replica_rows, + "last_run": last_run, + "progress": progress, + }, + ) + + +@router.get("/replicas", response_class=HTMLResponse) +def replicas_page(request: Request, session: Session = Depends(get_session)): + from .. import envfile + + replicas = session.exec(select(Replica).order_by(Replica.created_at)).all() # type: ignore[attr-defined] + env = envfile.read() + + # Find env-defined replicas not yet in DB (keyed by name) + existing_names = {r.name.upper().replace(" ", "_").replace("-", "_") for r in replicas} + env_replicas: list[dict] = [] + seen: set[str] = set() + for key, value in env.items(): + if key.startswith("REPLICA_") and key.endswith("_URL"): + safe = key[len("REPLICA_"):-len("_URL")] + if safe not in existing_names and safe not in seen: + token_key = f"REPLICA_{safe}_TOKEN" + env_replicas.append({"safe": safe, "url": value, "token": env.get(token_key, "")}) + seen.add(safe) + + return _tmpl("replicas.html", request, {"replicas": replicas, "env_replicas": env_replicas}) + + +@router.get("/replicas/{replica_id}", response_class=HTMLResponse) +def replica_detail( + replica_id: int, request: Request, session: Session = Depends(get_session) +): + replica = session.get(Replica, replica_id) + if not replica: + return HTMLResponse("Not found", status_code=404) + + recent_runs = session.exec( + select(SyncRun) + .where(SyncRun.replica_id == replica_id) + .order_by(SyncRun.started_at.desc()) # type: ignore[attr-defined] + .limit(20) + ).all() + + from ..models import SyncMap + + 
sync_map_page = session.exec( + select(SyncMap) + .where(SyncMap.replica_id == replica_id) + .order_by(SyncMap.last_synced.desc()) # type: ignore[attr-defined] + .limit(50) + ).all() + + return _tmpl( + "replica_detail.html", + request, + { + "replica": replica, + "recent_runs": recent_runs, + "sync_map_page": sync_map_page, + "lag": _lag_str(replica.last_sync_ts), + }, + ) + + +@router.get("/logs", response_class=HTMLResponse) +def logs_page(request: Request, session: Session = Depends(get_session)): + replicas = session.exec(select(Replica)).all() + recent_logs = session.exec( + select(Log) + .order_by(Log.created_at.desc()) # type: ignore[attr-defined] + .limit(100) + ).all() + return _tmpl( + "logs.html", + request, + {"replicas": replicas, "logs": recent_logs}, + ) + + +@router.get("/settings", response_class=HTMLResponse) +def settings_page(request: Request, session: Session = Depends(get_session)): + from .. import envfile + from ..models import Setting + from ..scheduler import SETTINGS_DEFAULTS + + rows = session.exec(select(Setting)).all() + settings = dict(SETTINGS_DEFAULTS) + for row in rows: + if row.value is not None: + settings[row.key] = row.value + + env = envfile.read() + + # Fallback master_url from env if not saved in DB + if not settings.get("master_url") and env.get("MASTER_URL"): + settings["master_url"] = env["MASTER_URL"] + + # Mask DB secrets; pass env token for pre-fill only if DB has none + db_has_token = bool(settings.get("master_token")) + db_has_alert_token = bool(settings.get("alert_target_token")) + for k in ("master_token", "alert_target_token"): + if settings.get(k): + settings[k] = "••••••••" + + env_master_token = env.get("MASTER_TOKEN", "") if not db_has_token else "" + env_alert_token = env.get("ALERT_TARGET_TOKEN", "") if not db_has_alert_token else "" + + return _tmpl( + "settings.html", + request, + { + "settings": settings, + "env_master_token": env_master_token, + "env_alert_token": env_alert_token, + }, + ) diff --git 
a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..4526c1a --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,16 @@ +services: + pngx-controller: + build: . + restart: unless-stopped + network_mode: host # required for Tailscale IP access (Linux only) + env_file: .env + environment: + DATABASE_URL: sqlite:////data/db.sqlite3 + volumes: + - ./data:/data + healthcheck: + test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/healthz')"] + interval: 30s + timeout: 5s + retries: 3 + start_period: 15s diff --git a/pngx-controller-prd.md b/pngx-controller-prd.md new file mode 100644 index 0000000..c793178 --- /dev/null +++ b/pngx-controller-prd.md @@ -0,0 +1,562 @@ +# PRD — pngx-controller + +> Paperless-ngx Central Sync Controller with Web UI + +**Status:** Draft +**Author:** Alex +**Last updated:** 2026-03-20 + +--- + +## Table of Contents + +1. [Problem Statement](#1-problem-statement) +2. [Goals](#2-goals) +3. [Non-Goals](#3-non-goals) +4. [Design Principles](#4-design-principles) +5. [Architecture](#5-architecture) +6. [Web UI](#6-web-ui) +7. [Data Model](#7-data-model) +8. [API](#8-api) +9. [Sync Engine](#9-sync-engine) +10. [Deployment](#10-deployment) +11. [Implementation Phases](#11-implementation-phases) +12. [Open Questions](#12-open-questions) + +--- + +## 1. Problem Statement + +Paperless-ngx has no native multi-instance sync. The built-in export/import workflow works well for snapshots, but the import via the consume directory requires a blank instance without existing users — making it unusable for continuous sync against a live replica. The goal is a single central controller that reads from a designated master and writes to one or more replicas, using nothing but the public paperless REST API. + +--- + +## 2. 
Goals + +- **High availability / failover** — replicas stay current enough to serve as a fallback if the master goes down; they are fully user-facing and exposed via Traefik + Authentik +- **Backup** — at least one replica acts as a verified, importable backup at all times +- **No paperless fork** — all sync logic lives outside the paperless-ngx containers, using only its public API +- **Conflict resolution** — master always wins; replicas never push changes back +- **Web UI** — all configuration, monitoring, and operations happen through a browser interface +- **Minimal ops overhead** — one additional container in the existing Docker/Traefik/Authentik stack +- **Observable by default** — health, metrics, and structured logs available without requiring the web UI + +--- + +## 3. Non-Goals + +- Bidirectional sync or multi-master +- Real-time sync (eventual consistency is acceptable, target: ≤15 min lag) +- Replacing Authentik SSO or Traefik routing on paperless instances +- Syncing user accounts, passwords, or sessions across instances +- Automatic failover (manual failover procedure only in v1) +- **Deletion propagation in v1** — replicas are strictly additive; documents deleted on the master are not removed from replicas. This is safe and well-defined behaviour for a first version. + +--- + +## 4. Design Principles + +- **No sidecars, no changes to paperless containers** — the controller is a separate service that talks to all instances from outside via their existing REST APIs +- **Central single point of control** — one place to configure, one place to check logs, one place to restart when something breaks +- **SPOF accepted** — if the controller is down, all paperless instances keep running normally; they just don't sync until it recovers. 
This is acceptable because paperless does not depend on the controller in any way +- **Tailscale-native transport** — all API calls go over Tailscale IPs directly, bypassing public internet entirely +- **Fail fast and visibly** — misconfiguration (missing env vars, bad credentials, unreachable replicas) surfaces immediately as hard errors, not silent no-ops + +--- + +## 5. Architecture + +``` +┌─ domverse ──────────────────────────────────────────┐ +│ │ +│ ┌─ pngx-controller ──────────────────────────────┐ │ +│ │ │ │ +│ │ FastAPI app APScheduler │ │ +│ │ ├── Web UI (HTMX) └── sync job (every Nm, │ │ +│ │ ├── REST API (/api) max 30 min timeout) │ │ +│ │ ├── /healthz (no auth) │ │ +│ │ └── /metrics (no auth, Prometheus) │ │ +│ │ │ │ +│ │ SQLite (WAL mode) — replicas, sync_map, logs │ │ +│ └─────────────────────────────────────────────────┘ │ +│ │ Tailscale (direct, host network) │ +└───────┼───────────────────────────────────────────── ┘ + │ + ├── GET/POST → paperless #1 (master, 100.x.x.x:8000) + ├── GET/POST → paperless #2 (replica, 100.y.y.y:8000) + └── GET/POST → paperless #3 (replica, 100.z.z.z:8000) +``` + +Single process, single container. APScheduler runs a single global sync job at the configured base interval. Each replica has an optional `sync_interval_seconds` override; the job checks per replica whether enough time has elapsed since `last_sync_ts` before including it in the cycle. An `asyncio.Lock` prevents concurrent runs. + +**Startup sequence:** +1. Validate `SECRET_KEY` is present and a valid Fernet key — exit with error if not +2. Verify DB file path is writable — exit with error if not +3. Run SQLite migrations; set `PRAGMA journal_mode=WAL` +4. Seed `settings` table from env vars if not already populated (`MASTER_URL`, `MASTER_TOKEN`) +5. Close any orphaned `sync_runs` (records where `finished_at IS NULL`) left by a previous unclean shutdown — set `finished_at = now()`, `timed_out = true`, log a warning per record +6. 
Start APScheduler; register sync job +7. Start FastAPI + +All structured logs are emitted as JSON to stdout (`{"ts": ..., "level": ..., "replica": ..., "doc_id": ..., "msg": ...}`) in addition to being written to the `logs` DB table. DB logs drive the web UI; stdout logs are for external aggregators (Loki, etc.). + +### Tech Stack + +| Layer | Choice | Notes | +|---|---|---| +| Backend | Python / FastAPI | Sync engine as APScheduler background job | +| Frontend | Jinja2 + HTMX | No JS build step; partial page updates via HTMX | +| Styling | Pico CSS | Minimal, semantic, no build required | +| Database | SQLite via SQLModel | WAL mode; single file, bind-mounted volume | +| Auth | Authentik forward auth | `X-authentik-*` headers; no app-level auth code needed | + +--- + +## 6. Web UI + +### Pages + +#### Dashboard (`/`) + +- Master instance: URL, connection status (green/red indicator), last successful contact +- Per-replica row: name, URL, lag (time since last successful sync), status badge (`synced` / `syncing` / `error` / `unreachable` / `suspended`) — runtime-computed; and error rate from last run (e.g. 
`47 synced · 3 failed` linking to filtered log view) +- Global **Sync now** button — triggers an immediate full cycle across all enabled replicas; returns 202 immediately; client polls `/api/sync/running` which returns progress detail, not just a boolean +- Last sync: timestamp, duration, documents synced, documents failed + +#### Replicas (`/replicas`) + +- Table of configured replicas: name, Tailscale URL, enabled toggle, per-replica sync interval (blank = global), last sync timestamp, action buttons (edit, delete, **Test connection**) +- Suspended replicas show a `suspended — N consecutive failures` badge with a distinct **Re-enable** button (separate from the enabled toggle) +- **Add replica** form: name, Tailscale URL, API token (masked input), sync interval override (optional), enabled checkbox — form runs a connection test before saving and shows the result inline; if the replica already contains documents, a **Reconcile** button appears after successful save to populate `sync_map` without re-uploading files +- Replica detail view (`/replicas/{id}`): + - Cumulative sync stats + - Sync run history: table of the last 20 sync runs — timestamp, duration, docs synced, docs failed, triggered by + - Per-document sync map (paginated) + - Error history + +#### Logs (`/logs`) + +- Live log stream via SSE — HTMX connects to `/api/logs/stream` using the `hx-ext="sse"` extension +- Filter by: replica, level (`info` / `warning` / `error`), date range +- Full-text search on message (SQLite FTS5) +- Each log entry: timestamp, level, replica name, document ID (if applicable), message +- **Clear logs** action: remove entries older than N days + +#### Settings (`/settings`) + +- Master instance URL + API token (editable in place, with connection test on save) +- Global sync interval (minutes) +- Log retention (days) +- Sync cycle timeout (minutes; default 30) +- Task poll timeout (minutes; default 10) +- Replica suspend threshold (consecutive failed cycles; default 5) +- Max 
concurrent requests per target (default 4) +- **Notifications:** alert target (Gotify URL + token, or generic webhook URL + optional auth header), error threshold (docs failed per run to trigger alert), alert cooldown (minutes; default 60) +- **Danger zone:** Full resync button per replica — available in Phase 3; wipes the sync map for that replica and re-syncs everything from scratch + +--- + +## 7. Data Model + +```sql +-- Enable WAL mode on every connection open: +-- PRAGMA journal_mode=WAL; + +CREATE TABLE replicas ( + id INTEGER PRIMARY KEY, + name TEXT NOT NULL, + url TEXT NOT NULL, -- Tailscale URL, e.g. http://100.y.y.y:8000 + api_token TEXT NOT NULL, -- Fernet-encrypted (key from SECRET_KEY env) + enabled BOOLEAN DEFAULT TRUE, + sync_interval_seconds INTEGER, -- NULL = use global setting + last_sync_ts DATETIME, -- per-replica; advanced only on successful sync + consecutive_failures INTEGER DEFAULT 0, -- reset to 0 on any successful sync cycle + suspended_at DATETIME, -- NULL = active; set when consecutive_failures >= threshold + last_alert_at DATETIME, -- used to enforce alert cooldown per replica + created_at DATETIME DEFAULT CURRENT_TIMESTAMP +); + +CREATE TABLE sync_map ( + id INTEGER PRIMARY KEY, + replica_id INTEGER REFERENCES replicas(id) ON DELETE CASCADE, + master_doc_id INTEGER NOT NULL, + replica_doc_id INTEGER, -- NULL while post_document task is pending + task_id TEXT, -- Celery task UUID returned by post_document; cleared once resolved + last_synced DATETIME, + file_checksum TEXT, -- SHA256 of original file; populated but not used for skipping until Phase 3 + status TEXT DEFAULT 'pending', -- pending | ok | error + error_msg TEXT, + retry_count INTEGER DEFAULT 0, -- incremented each time this doc is retried from error state + UNIQUE(replica_id, master_doc_id) +); +-- Recommended indexes: +-- CREATE INDEX idx_sync_map_replica ON sync_map(replica_id); +-- CREATE INDEX idx_sync_map_status ON sync_map(replica_id, status); + +CREATE TABLE 
sync_runs ( + id INTEGER PRIMARY KEY, + replica_id INTEGER REFERENCES replicas(id) ON DELETE SET NULL, -- NULL = all-replica run + started_at DATETIME, + finished_at DATETIME, + triggered_by TEXT, -- 'scheduler' | 'manual' | 'reconcile' + docs_synced INTEGER DEFAULT 0, + docs_failed INTEGER DEFAULT 0, + timed_out BOOLEAN DEFAULT FALSE +); + +CREATE TABLE logs ( + id INTEGER PRIMARY KEY, + run_id INTEGER REFERENCES sync_runs(id) ON DELETE SET NULL, + replica_id INTEGER REFERENCES replicas(id) ON DELETE SET NULL, + level TEXT, -- info | warning | error + message TEXT, + doc_id INTEGER, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP +); +-- FTS5 index for full-text search on message: +-- CREATE VIRTUAL TABLE logs_fts USING fts5(message, content=logs, content_rowid=id); + +CREATE TABLE settings ( + key TEXT PRIMARY KEY, + value TEXT -- master_token value is Fernet-encrypted, same as replicas.api_token +); +-- Keys: +-- master_url (seeded from MASTER_URL env var on first boot) +-- master_token (encrypted; seeded from MASTER_TOKEN env var on first boot) +-- sync_interval_seconds (default 900) +-- log_retention_days (default 90) +-- sync_cycle_timeout_seconds (default 1800) +-- task_poll_timeout_seconds (default 600) +-- replica_suspend_threshold (default 5) +-- max_concurrent_requests (default 4; applies independently per target instance) +-- alert_target_type ('gotify' | 'webhook' | '') +-- alert_target_url (Gotify or webhook URL) +-- alert_target_token (encrypted; Gotify token or webhook auth header value) +-- alert_error_threshold (docs failed per run to trigger; default 5) +-- alert_cooldown_seconds (minimum seconds between alerts per replica; default 3600) +``` + +--- + +## 8. API + +All endpoints return JSON unless noted. HTMX partial updates use the `/`-prefixed HTML routes which return rendered template fragments. + +**Authentication:** All `/api/*` and UI routes go through Authentik forward auth. 
`/healthz` and `/metrics` are excluded — configured via a separate Traefik router without the Authentik middleware. + +| Method | Path | Auth | Description | +|---|---|---|---| +| `GET` | `/healthz` | None | `{"status":"ok","db":"ok"}` or 503. For Docker `HEALTHCHECK` and uptime monitors. | +| `GET` | `/metrics` | None | Prometheus text format — see metrics list below | +| `GET` | `/api/status` | Authentik | Dashboard data: master health, per-replica lag, last-run error rates (runtime-computed) | +| `POST` | `/api/sync` | Authentik | Trigger immediate sync — returns **202** immediately. Accepts `?replica_id=N`. | +| `GET` | `/api/sync/running` | Authentik | `{"running": bool, "phase": str, "docs_done": int, "docs_total": int}` — drives UI spinner | +| `GET` | `/api/replicas` | Authentik | List all replicas | +| `POST` | `/api/replicas` | Authentik | Add a replica — runs connection test before saving; returns 422 if test fails | +| `PUT` | `/api/replicas/{id}` | Authentik | Update a replica — re-runs connection test if URL or token changed | +| `DELETE` | `/api/replicas/{id}` | Authentik | Remove a replica and its sync_map entries | +| `POST` | `/api/replicas/{id}/test` | Authentik | Test connection; returns `{"ok": bool, "error": str\|null, "latency_ms": int, "doc_count": int}` | +| `POST` | `/api/replicas/{id}/reconcile` | Authentik | Match existing replica documents to master by ASN / (title + date); populate sync_map without re-uploading | +| `POST` | `/api/replicas/{id}/resync` | Authentik | Wipe sync_map for this replica, trigger full resync *(Phase 3)* | +| `POST` | `/api/replicas/{id}/unsuspend` | Authentik | Clear `suspended_at` and `consecutive_failures`, re-enable replica | +| `GET` | `/api/logs` | Authentik | Paginated log query (`?replica_id`, `?level`, `?from`, `?to`, `?q` for FTS) | +| `GET` | `/api/logs/stream` | Authentik | SSE endpoint for live log tail | +| `GET` | `/api/settings` | Authentik | Read all settings | +| `PUT` | `/api/settings` | 
Authentik | Update settings; validate master connection before saving master_url/master_token | + +### Prometheus metrics (`/metrics`) + +| Metric | Type | Labels | +|---|---|---| +| `pngx_sync_docs_total` | Counter | `replica`, `status` (`ok`/`error`) | +| `pngx_sync_duration_seconds` | Histogram | `triggered_by` | +| `pngx_replica_lag_seconds` | Gauge | `replica` | +| `pngx_replica_pending_tasks` | Gauge | `replica` | +| `pngx_replica_consecutive_failures` | Gauge | `replica` | +| `pngx_sync_running` | Gauge | — | + +--- + +## 9. Sync Engine + +### Why not the consume directory + +The consume directory triggers paperless's full ingestion pipeline: re-OCR, re-classification, ID reassignment. It also requires no prior documents to exist. Syncing via consume to a live instance with users causes ID collisions and duplicate processing. The controller uses the REST API's `POST /api/documents/post_document/` (create) and `PATCH /api/documents/{id}/` (update metadata) endpoints with explicit metadata. + +**Important:** `post_document` still goes through paperless's Celery consumption pipeline — OCR will run on replicas for newly uploaded documents. This adds processing overhead but the metadata supplied at upload time (title, tags, dates, etc.) takes precedence. This is an accepted cost of using the public API without modifying paperless containers. + +### What gets synced + +Replicas are HA and fully user-facing; both original and archived files are synced. 
+ +| Entity | Method | Notes | +|---|---|---| +| Documents (original file) | Binary download/upload | Always synced | +| Documents (archived/OCR'd file) | Binary download/upload | Always synced — replicas are HA | +| Document metadata | JSON via API | Title, dates, notes, custom fields, ASN | +| Tags | API + name-based dedup | IDs differ per instance; mapped by name | +| Correspondents | API + name-based dedup | Same | +| Document types | API + name-based dedup | Same | +| Custom field schemas | API, synced before docs | Schema must exist on replica before document data | +| Users / groups | Not synced | Managed independently per instance | + +Replicas are **strictly additive in v1**: documents deleted on the master are not removed from replicas. + +### Resilience primitives + +**Concurrency throttle:** An `asyncio.Semaphore` with `max_concurrent_requests` (default 4) is created per target instance (one for master, one per replica) at the start of each sync cycle. All HTTP calls acquire the relevant semaphore before executing. This prevents the controller from overwhelming any single paperless instance with concurrent requests, especially during a full initial sync. + +**Retry with exponential backoff:** All individual HTTP calls to master and replicas are wrapped in a retry decorator — 3 attempts with 2 s / 4 s / 8 s delays. Only network-level and 5xx errors are retried; 4xx errors (auth, not found) fail immediately. Each retry is logged at `warning` level. A document is only marked `error` in `sync_map` after all retries are exhausted. + +**Task poll timeout:** After `POST /api/documents/post_document/` returns a task UUID, the controller polls `/api/tasks/?task_id=` on the next sync cycle (step 4b below). If a task has been pending for longer than `task_poll_timeout_seconds` (default 600 s / 10 min), it is marked `error` with message `"task timed out"` and `replica_doc_id` remains NULL. The document will be retried from scratch on a full resync. 
+ +**Sync cycle timeout:** The entire sync cycle (all replicas combined) has a hard timeout of `sync_cycle_timeout_seconds` (default 1800 s / 30 min). If exceeded, the cycle is cancelled, the `asyncio.Lock` released, `sync_run.timed_out` set to `true`, and a `warning` log emitted. The next scheduled run starts fresh. + +**Auto-suspend:** After `replica_suspend_threshold` (default 5) consecutive sync cycles where a replica fails entirely (the replica itself is unreachable or auth fails — not individual document errors), the controller sets `suspended_at = now()` and stops including that replica in future sync cycles. A prominent `error` log is emitted. The UI shows a `suspended` badge and a **Re-enable** button (`POST /api/replicas/{id}/unsuspend`). `consecutive_failures` resets to 0 on any successful sync cycle for that replica. + +**SQLite backup:** On every successful sync run completion, `sqlite3.connect(db_path).backup(sqlite3.connect(backup_path))` is called to produce `/data/db.sqlite3.bak`. This is safe while the DB is open and provides one-cycle-lag recovery from DB corruption. + +**Alert / notification:** After each sync run, if `docs_failed >= alert_error_threshold` OR a replica was just suspended, and `now() - replica.last_alert_at > alert_cooldown_seconds`, the controller sends an alert and updates `replica.last_alert_at`. 
Two target types are supported: +- **Gotify:** `POST {alert_target_url}/message` with `{"title": "pngx-controller alert", "message": "...", "priority": 7}` +- **Generic webhook:** `POST {alert_target_url}` with JSON payload and optional `Authorization` header + +Alert payload: +```json +{ + "event": "sync_failures_threshold" | "replica_suspended", + "replica": "backup", + "replica_url": "http://100.y.y.y:8000", + "consecutive_failures": 5, + "docs_failed": 12, + "docs_synced": 3, + "timestamp": "2026-03-20T14:00:00Z", + "controller_url": "https://pngx.domverse.de" +} +``` + +### Reconcile mode + +Used when adding a replica that already contains documents to avoid creating duplicates. Triggered via `POST /api/replicas/{id}/reconcile`. The reconcile process: + +1. Paginate through all documents on the replica; build a map of `asn → replica_doc` and `(title, created_date) → replica_doc` +2. Paginate through all documents on the master; for each master doc: + - Match by ASN first (most reliable); fall back to (title + created_date) + - If matched: insert `sync_map` row with both IDs, `status='ok'`, compute `file_checksum` from master download + - If unmatched: leave for the normal sync cycle to handle (will be created on replica) +3. Replica documents with no master match are left untouched +4. Reconcile is **non-destructive and idempotent** — safe to run multiple times + +Reconcile is a one-time operation per replica. After it completes, normal sync cycles take over. + +### Sync cycle + +The name→id mapping for tags, correspondents, document types, and custom fields is built **in memory** at the start of each sync run by querying both master and replica. It is not persisted to the DB; it is rebuilt every cycle to avoid stale mappings. + +The APScheduler job fires at `sync_interval_seconds` (global setting). 
At the start of each run, each replica is checked: if `replica.sync_interval_seconds IS NOT NULL` and `now() - replica.last_sync_ts < replica.sync_interval_seconds`, that replica is skipped this cycle. This allows per-replica intervals without multiple scheduler jobs. + +``` +Every N minutes (global base interval), with sync_cycle_timeout_seconds hard limit: + +1. acquire asyncio.Lock — skip cycle if already running + +2. create sync_run record (triggered_by = 'scheduler' | 'manual') + +3. determine eligible replicas: + enabled AND NOT suspended AND (sync_interval_seconds IS NULL + OR now() - last_sync_ts >= sync_interval_seconds) + +4. fetch changed_docs from master with pagination (outside the replica loop): + page = 1 + all_changed_docs = [] + loop: + response = GET master /api/documents/ + ?modified__gte={min(last_sync_ts across eligible replicas)} + &ordering=modified&page_size=100&page={page} + (with retry/backoff, inside master semaphore) + all_changed_docs += response.results + if response.next is None: break + page += 1 + +5. for each eligible replica: + + a. ensure_schema_parity(master, replica) + → paginate and query all tags / correspondents / doc types / custom fields + from master and replica (inside respective semaphores, with retry/backoff) + → create missing entities on replica + → build in-memory name→id maps: + master_tag_id → replica_tag_id + master_cf_id → replica_cf_id + (same for correspondents, document types) + + b. resolve pending sync_map entries (status='pending', replica_doc_id IS NULL): + → for each: GET replica /api/tasks/?task_id={task_id} + (inside replica semaphore, with retry/backoff) + → if complete: update replica_doc_id, clear task_id, set status='ok' + → if failed: set status='error', increment retry_count, store error_msg + → if age > task_poll_timeout_seconds: set status='error', msg='task timed out' + + c. 
collect docs to process: + - changed_docs filtered to those modified since replica.last_sync_ts + - UNION sync_map entries for this replica where status='error' + (capped at 50 per cycle to avoid starving new documents) + + d. for each doc in docs_to_process: + (all HTTP calls inside respective semaphores, with retry/backoff) + + file_orig = GET master /api/documents/{id}/download/ + file_archived = GET master /api/documents/{id}/download/?original=false + meta = GET master /api/documents/{id}/ + + translate metadata using in-memory name→id maps: + tag_ids → [replica_tag_id for each master_tag_id] + correspondent → replica_correspondent_id + document_type → replica_document_type_id + custom_fields → {replica_cf_id: value for each master_cf_id} + + if master_doc_id in sync_map[replica] AND replica_doc_id IS NOT NULL: + PATCH metadata → replica /api/documents/{replica_doc_id}/ + if sha256(file_orig) != sync_map.file_checksum: + re-upload original file → replica + upload archived file → replica + update sync_map (last_synced, file_checksum, status='ok', retry_count reset) + else if master_doc_id NOT in sync_map[replica]: + POST file_orig + translated metadata → replica /api/documents/post_document/ + → response: {task_id: ""} + insert sync_map row (status='pending', task_id=, replica_doc_id=NULL) + → task resolution and archived file upload handled in step 5b of next cycle + + log result to logs table (DB + stdout JSON) + + e. on full success for this replica: + → set replica.last_sync_ts = start of this cycle + → reset replica.consecutive_failures = 0 + → emit metrics update + → send alert if docs_failed >= alert_error_threshold and cooldown elapsed + + f. on replica-level failure (unreachable, auth error): + → increment replica.consecutive_failures + → if consecutive_failures >= replica_suspend_threshold: + set replica.suspended_at = now() + log error "replica suspended after N consecutive failures" + send alert if cooldown elapsed + +6. 
if all eligible replicas completed without timeout: + → call sqlite3 backup: db.sqlite3 → db.sqlite3.bak + +7. close sync_run record with stats (docs_synced, docs_failed, timed_out) +8. release lock +``` + +### Conflict resolution + +Master always wins. If a document was modified on the replica directly, the master's version overwrites it on the next sync cycle. Replicas should be treated as read-only by convention; there is no enforcement mechanism in v1. + +--- + +## 10. Deployment + +```yaml +services: + pngx-controller: + image: ghcr.io/yourname/pngx-controller:latest + restart: unless-stopped + network_mode: host # required for Tailscale IP access + environment: + SECRET_KEY: ${PNGX_SECRET_KEY} # Fernet key for encrypting API tokens at rest (required) + DATABASE_URL: sqlite:////data/db.sqlite3 + MASTER_URL: ${PNGX_MASTER_URL} # optional: seeds settings.master_url on first boot + MASTER_TOKEN: ${PNGX_MASTER_TOKEN} # optional: seeds settings.master_token on first boot + volumes: + - /srv/docker/pngx-controller/data:/data + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8000/healthz"] + interval: 30s + timeout: 5s + retries: 3 + start_period: 10s +``` + +**Why `network_mode: host`:** The controller makes HTTP requests to Tailscale IPs (100.x.x.x). Inside a bridged Docker network, these are unreachable without additional routing. Host networking gives the container direct access to the host's Tailscale interface. Traefik can still proxy to `localhost:8000` on the host. + +**Unauthenticated routes (`/healthz`, `/metrics`):** Configure a second Traefik router for these paths without the `authentik@file` middleware. Both paths are read-only and expose no user data. 
+ +**SECRET_KEY rotation:** If `SECRET_KEY` must be replaced, run the bundled CLI command before restarting with the new key: +``` +docker run --rm -v /srv/docker/pngx-controller/data:/data \ + -e OLD_SECRET_KEY= -e NEW_SECRET_KEY= \ + ghcr.io/yourname/pngx-controller:latest rotate-key +``` +This decrypts all stored tokens with the old key and re-encrypts them with the new key in a single transaction. The container must be stopped before running this command. + +`SECRET_KEY` is the only required env var at startup. `MASTER_URL` / `MASTER_TOKEN` are optional conveniences — if omitted, they are entered through the Settings UI on first run. All credentials are stored Fernet-encrypted in SQLite. + +### Directory structure + +``` +/srv/docker/pngx-controller/ +└── data/ + ├── db.sqlite3 + └── db.sqlite3.bak # written after each successful sync run +``` + +--- + +## 11. Implementation Phases + +### Phase 1 — Working sync (MVP) + +- Startup validation: check `SECRET_KEY` validity and DB writability; exit with error if either fails +- Startup cleanup: close orphaned `sync_runs` left by unclean shutdown +- SQLite schema + SQLModel models; enable WAL mode on startup +- Env var seeding: populate `settings` from `MASTER_URL` / `MASTER_TOKEN` on first boot if not set +- Settings page: configure master URL + token (with connection test on save), sync interval, timeouts, suspend threshold, max concurrent requests +- Replica CRUD with per-replica sync interval override; connection test on add/edit (`POST /api/replicas/{id}/test`) +- Reconcile mode: `POST /api/replicas/{id}/reconcile`; UI button appears on replica add if replica has existing documents +- Sync engine: + - Paginated master document query + - In-memory name→id mapping; schema parity + - `asyncio.Semaphore` per target instance (`max_concurrent_requests`) + - Document push (original + archived files) with retry/backoff (3 attempts, 2/4/8 s) + - Error-status document retry (up to 50 per cycle per replica) + - Async task 
polling with `task_poll_timeout_seconds` + - Sync cycle timeout (`sync_cycle_timeout_seconds`) + - Auto-suspend after `replica_suspend_threshold` consecutive failures + - Per-replica interval check inside global scheduler job +- APScheduler integration with `asyncio.Lock` +- Structured JSON logs to stdout on every sync event +- Basic dashboard: last sync time, per-replica status badge, error rate (N synced · N failed) +- `/api/sync/running` returns progress detail (`phase`, `docs_done`, `docs_total`) +- Log table view (paginated, filterable, FTS search) +- `/healthz` endpoint (unauthenticated) +- `rotate-key` CLI command + +### Phase 2 — Live feedback and observability + +- SSE log stream on `/api/logs/stream` with HTMX `hx-ext="sse"` integration +- Sync progress indicator on dashboard (HTMX polls `/api/sync/running`, displays phase + count) +- Per-replica document count + lag calculation +- Live feedback on manual sync trigger +- Sync run history on replica detail page (last 20 runs: timestamp, duration, docs synced/failed) +- `/metrics` Prometheus endpoint (unauthenticated) +- SQLite backup to `db.sqlite3.bak` after each successful sync run +- `POST /api/replicas/{id}/unsuspend` + Re-enable UI button +- Alert / notification: Gotify and generic webhook support with configurable threshold and cooldown + +### Phase 3 — Resilience and operations + +- Full resync per replica (wipe sync_map, rebuild from scratch) — UI button enabled +- File checksum comparison to skip unchanged file re-uploads (`file_checksum` column already exists in Phase 1 schema) +- Deletion propagation via tombstone table (or remain strictly additive — decision deferred) +- Export sync_map as CSV for debugging + +--- + +## 12. Open Questions + +1. **Deletion propagation** — resolved for v1: replicas are strictly additive. Revisit in Phase 3: options are tombstone tracking (propagate deletes) or leave as-is (backup semantics, never delete). + +2. 
**File versions** — resolved: both original and archived files are synced. Replicas are HA and must serve users the same experience as the master (archived/OCR'd version is what users download by default). + +3. **Replica read access** — resolved: replicas are fully user-facing HA instances with Traefik + Authentik exposure. They are not backup-only. + +4. **Sync webhooks** — paperless-ngx supports outgoing webhooks on document events. Phase 3+ could use webhook-triggered sync for near-real-time replication. **Constraint:** the webhook receiver on the controller would need an unauthenticated route (Authentik forward auth blocks unauthenticated POSTs), requiring a separate `/webhook/paperless` route excluded from the Authentik middleware — evaluate security implications before implementing. diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..ccea993 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,27 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "pngx-controller" +version = "0.1.0" +description = "Paperless-ngx Central Sync Controller" +requires-python = ">=3.11" +dependencies = [ + "fastapi>=0.115.0", + "uvicorn[standard]>=0.32.0", + "sqlmodel>=0.0.22", + "apscheduler>=3.10.4", + "cryptography>=43.0.0", + "httpx>=0.27.0", + "jinja2>=3.1.4", + "python-multipart>=0.0.12", + "prometheus-client>=0.21.0", + "aiofiles>=24.1.0", +] + +[project.scripts] +pngx-controller = "app.main:cli_entry" + +[tool.hatch.build.targets.wheel] +packages = ["app"]