"""FastAPI application entry point with startup sequence and CLI."""

import os
import sys
from contextlib import asynccontextmanager
from pathlib import Path

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse, PlainTextResponse
from fastapi.templating import Jinja2Templates

from .config import get_config
from .crypto import is_valid_fernet_key
from .database import create_db_and_tables, get_engine
from .logger import emit_log


# ── Startup sequence ──────────────────────────────────────────────────────────

def _startup_validate():
    """Check the configuration the app cannot run without.

    Verifies that SECRET_KEY is set and is a valid Fernet key, and that the
    database file (or, if it does not exist yet, its directory) is writable.
    The DB directory is created when missing.

    Any failure terminates the process through ``sys.exit`` with a
    ``FATAL:`` message.
    """
    config = get_config()

    # 1. Validate SECRET_KEY
    if not config.secret_key:
        sys.exit("FATAL: SECRET_KEY environment variable is required")
    if not is_valid_fernet_key(config.secret_key):
        sys.exit("FATAL: SECRET_KEY is not a valid Fernet key. "
                 "Generate one with: python -c \"from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())\"")

    # 2. Verify DB file path is writable
    db_path = Path(config.db_path)
    db_dir = db_path.parent
    if not db_dir.exists():
        try:
            db_dir.mkdir(parents=True, exist_ok=True)
        except Exception as e:
            sys.exit(f"FATAL: Cannot create DB directory {db_dir}: {e}")
    if db_path.exists() and not os.access(db_path, os.W_OK):
        sys.exit(f"FATAL: DB file {db_path} is not writable")
    if not os.access(db_dir, os.W_OK):
        sys.exit(f"FATAL: DB directory {db_dir} is not writable")


def _startup_cleanup():
    """Close orphaned sync_runs left by an unclean shutdown.

    Every ``SyncRun`` row whose ``finished_at`` is still NULL is stamped with
    the current UTC time and flagged ``timed_out``; a warning is logged for
    each one. Nothing is committed when there are no such rows.
    """
    from datetime import datetime, timezone

    from sqlmodel import Session, select

    from .models import SyncRun

    engine = get_engine()
    with Session(engine) as session:
        orphans = session.exec(
            select(SyncRun).where(SyncRun.finished_at == None)  # noqa: E711
        ).all()
        now = datetime.now(timezone.utc)
        for run in orphans:
            run.finished_at = now
            run.timed_out = True
            session.add(run)
            emit_log(
                "warning",
                f"Closed orphaned sync_run #{run.id} (unclean shutdown)",
            )
        if orphans:
            session.commit()


def _startup_seed():
    """Seed settings from env vars on first boot.

    ``master_url`` and ``master_token`` from the config are written to the
    ``Setting`` table only when no value is stored yet, so values already in
    the database are never overwritten. The token is encrypted with the
    configured secret key before it is stored.
    """
    config = get_config()

    from sqlmodel import Session

    from .crypto import encrypt
    from .models import Setting

    engine = get_engine()
    with Session(engine) as session:
        def _set_if_absent(key: str, value: str) -> None:
            existing = session.get(Setting, key)
            if existing is None:
                session.add(Setting(key=key, value=value))
            elif existing.value is None:
                # The row exists but holds no value: update it in place.
                # Adding a second Setting with the same primary key would
                # conflict with the row already loaded in this session.
                existing.value = value
                session.add(existing)

        if config.master_url:
            _set_if_absent("master_url", config.master_url)
        if config.master_token:
            encrypted = encrypt(config.master_token, config.secret_key)
            _set_if_absent("master_token", encrypted)
        session.commit()


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the startup sequence, start the scheduler, and stop it on shutdown.

    Startup order: validate config, create tables, close orphaned sync runs,
    seed settings, then start the scheduler with the interval stored in the
    ``sync_interval_seconds`` setting (or the scheduler default when the
    setting is missing or empty).
    """
    # Startup
    _startup_validate()
    create_db_and_tables()
    _startup_cleanup()
    _startup_seed()
    emit_log("info", "pngx-controller starting")

    # Read sync interval from settings
    from sqlmodel import Session

    from .models import Setting
    from .scheduler import SETTINGS_DEFAULTS, start_scheduler, stop_scheduler

    with Session(get_engine()) as session:
        row = session.get(Setting, "sync_interval_seconds")
        interval = int(row.value if row and row.value else SETTINGS_DEFAULTS["sync_interval_seconds"])

    start_scheduler(interval_seconds=interval)
    emit_log("info", f"Scheduler started, sync interval: {interval}s")

    yield

    # Shutdown
    stop_scheduler()
    emit_log("info", "pngx-controller stopped")


# ── Application ───────────────────────────────────────────────────────────────

def create_app() -> FastAPI:
    """Build the FastAPI application.

    Sets up the Jinja2 templates, registers the UI router followed by the API
    routers, and adds the unauthenticated ``/healthz`` and ``/metrics``
    endpoints.

    Returns:
        The configured ``FastAPI`` instance.
    """
    app = FastAPI(
        title="pngx-controller",
        description="Paperless-ngx Central Sync Controller",
        version="0.1.0",
        lifespan=lifespan,
    )

    # Templates
    import json as _json

    from markupsafe import Markup as _Markup

    def _tojson(value):
        """Serialize *value* to JSON that is safe to embed in HTML.

        The result is marked as safe markup, so HTML-significant characters
        must be neutralized here. They are replaced with ``\\uXXXX`` escapes,
        which decode to the same characters when the JSON is parsed. With the
        default ``ensure_ascii`` these characters can only occur inside JSON
        string values, so the replacement cannot damage the JSON structure.
        """
        return _Markup(
            _json.dumps(value)
            .replace("<", "\\u003c")
            .replace(">", "\\u003e")
            .replace("&", "\\u0026")
            .replace("'", "\\u0027")
        )

    templates_dir = Path(__file__).parent / "templates"
    templates = Jinja2Templates(directory=str(templates_dir))
    templates.env.filters["tojson"] = _tojson

    # Register UI routes (must come before API to avoid catch-all conflicts)
    from .ui.routes import router as ui_router, setup_templates
    setup_templates(templates)
    app.include_router(ui_router)

    # API routers
    from .api.replicas import router as replicas_router
    from .api.sync import router as sync_router
    from .api.logs import router as logs_router
    from .api.settings import router as settings_router
    from .api.status import router as status_router
    from .api.master import router as master_router

    app.include_router(replicas_router)
    app.include_router(sync_router)
    app.include_router(logs_router)
    app.include_router(settings_router)
    app.include_router(status_router)
    app.include_router(master_router)

    # ── Health & Metrics (no auth) ────────────────────────────────────────────

    @app.get("/healthz", include_in_schema=False)
    def healthz():
        """Report health; respond 503 when a trivial DB query fails."""
        try:
            engine = get_engine()
            with engine.connect() as conn:
                conn.exec_driver_sql("SELECT 1")
            return {"status": "ok", "db": "ok"}
        except Exception as e:
            return JSONResponse(status_code=503, content={"status": "error", "db": str(e)})

    @app.get("/metrics", include_in_schema=False)
    def metrics():
        """Expose the Prometheus metrics in text exposition format."""
        from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
        return PlainTextResponse(generate_latest(), media_type=CONTENT_TYPE_LATEST)

    return app


app = create_app()


# ── CLI entry point ───────────────────────────────────────────────────────────

def rotate_key_cli() -> None:
    """Re-encrypt all stored tokens: OLD_SECRET_KEY → NEW_SECRET_KEY.

    Both keys are read from the environment and must be valid Fernet keys,
    otherwise the process exits with a ``FATAL:`` message. Every replica
    ``api_token`` and the ``master_token`` / ``alert_target_token`` settings
    are decrypted with the old key and re-encrypted with the new one.

    Re-encryption is best-effort per token: a token that cannot be processed
    is reported on stderr and left unchanged, and the remaining tokens are
    still committed.
    """
    old_key = os.environ.get("OLD_SECRET_KEY", "")
    new_key = os.environ.get("NEW_SECRET_KEY", "")

    if not old_key or not new_key:
        sys.exit("FATAL: OLD_SECRET_KEY and NEW_SECRET_KEY must be set")
    if not is_valid_fernet_key(old_key):
        sys.exit("FATAL: OLD_SECRET_KEY is not a valid Fernet key")
    if not is_valid_fernet_key(new_key):
        sys.exit("FATAL: NEW_SECRET_KEY is not a valid Fernet key")

    # Temporarily set env so get_config() reads the old key for DB access
    os.environ["SECRET_KEY"] = old_key
    # Reset lru_cache so we pick up the new env
    get_config.cache_clear()  # type: ignore[attr-defined]

    create_db_and_tables()

    from sqlmodel import Session, select as sql_select

    from .crypto import decrypt, encrypt
    from .models import Replica, Setting

    engine = get_engine()
    count = 0
    with Session(engine) as session:
        for replica in session.exec(sql_select(Replica)).all():
            try:
                plain = decrypt(replica.api_token, old_key)
                replica.api_token = encrypt(plain, new_key)
                session.add(replica)
                count += 1
            except Exception as e:
                print(f"WARNING: Could not re-encrypt token for replica {replica.name}: {e}", file=sys.stderr)

        for key in ("master_token", "alert_target_token"):
            setting = session.get(Setting, key)
            if setting and setting.value:
                try:
                    plain = decrypt(setting.value, old_key)
                    setting.value = encrypt(plain, new_key)
                    session.add(setting)
                    count += 1
                except Exception as e:
                    print(f"WARNING: Could not re-encrypt {key}: {e}", file=sys.stderr)

        session.commit()

    print(f"rotate-key: re-encrypted {count} token(s) successfully")
    print("Restart the container with the new SECRET_KEY.")


def cli_entry() -> None:
    """Dispatch the command line.

    ``rotate-key`` as the first argument runs the key rotation; anything else
    starts the uvicorn server on BIND_HOST (default ``0.0.0.0``) and PORT
    (default ``8000``).
    """
    if len(sys.argv) > 1 and sys.argv[1] == "rotate-key":
        rotate_key_cli()
    else:
        import uvicorn
        uvicorn.run(
            "app.main:app",
            host=os.environ.get("BIND_HOST", "0.0.0.0"),
            port=int(os.environ.get("PORT", "8000")),
            log_level="warning",
        )


if __name__ == "__main__":
    cli_entry()