Files
pngx-sync/app/main.py
domverse 15599b9f01
All checks were successful
Deploy / deploy (push) Successful in 17s
fix: add tojson Jinja2 filter to fix 500 on replica detail page
FastAPI's Jinja2Templates doesn't include Flask's tojson filter.
Register json.dumps as the tojson filter so replica_detail.html
can safely embed replica.name in a JS string literal.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-25 22:04:31 +01:00

248 lines
8.6 KiB
Python

"""FastAPI application entry point with startup sequence and CLI."""
import os
import sys
from contextlib import asynccontextmanager
from pathlib import Path
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse, PlainTextResponse
from fastapi.templating import Jinja2Templates
from .config import get_config
from .crypto import is_valid_fernet_key
from .database import create_db_and_tables, get_engine
from .logger import emit_log
# ── Startup sequence ──────────────────────────────────────────────────────────
def _startup_validate():
config = get_config()
# 1. Validate SECRET_KEY
if not config.secret_key:
sys.exit("FATAL: SECRET_KEY environment variable is required")
if not is_valid_fernet_key(config.secret_key):
sys.exit("FATAL: SECRET_KEY is not a valid Fernet key. "
"Generate one with: python -c \"from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())\"")
# 2. Verify DB file path is writable
db_path = Path(config.db_path)
db_dir = db_path.parent
if not db_dir.exists():
try:
db_dir.mkdir(parents=True, exist_ok=True)
except Exception as e:
sys.exit(f"FATAL: Cannot create DB directory {db_dir}: {e}")
if db_path.exists() and not os.access(db_path, os.W_OK):
sys.exit(f"FATAL: DB file {db_path} is not writable")
if not os.access(db_dir, os.W_OK):
sys.exit(f"FATAL: DB directory {db_dir} is not writable")
def _startup_cleanup():
"""Close orphaned sync_runs left by an unclean shutdown."""
from datetime import datetime, timezone
from sqlmodel import Session, select
from .models import SyncRun
engine = get_engine()
with Session(engine) as session:
orphans = session.exec(
select(SyncRun).where(SyncRun.finished_at == None) # noqa: E711
).all()
now = datetime.now(timezone.utc)
for run in orphans:
run.finished_at = now
run.timed_out = True
session.add(run)
emit_log(
"warning",
f"Closed orphaned sync_run #{run.id} (unclean shutdown)",
)
if orphans:
session.commit()
def _startup_seed():
"""Seed settings from env vars on first boot."""
config = get_config()
from sqlmodel import Session
from .crypto import encrypt
from .models import Setting
engine = get_engine()
with Session(engine) as session:
def _set_if_absent(key: str, value: str) -> None:
existing = session.get(Setting, key)
if existing is None or existing.value is None:
session.add(Setting(key=key, value=value))
if config.master_url:
_set_if_absent("master_url", config.master_url)
if config.master_token:
encrypted = encrypt(config.master_token, config.secret_key)
_set_if_absent("master_token", encrypted)
session.commit()
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
_startup_validate()
create_db_and_tables()
_startup_cleanup()
_startup_seed()
emit_log("info", "pngx-controller starting")
# Read sync interval from settings
from sqlmodel import Session, select
from .models import Setting
from .scheduler import SETTINGS_DEFAULTS
with Session(get_engine()) as session:
row = session.get(Setting, "sync_interval_seconds")
interval = int(row.value if row and row.value else SETTINGS_DEFAULTS["sync_interval_seconds"])
from .scheduler import start_scheduler, stop_scheduler
start_scheduler(interval_seconds=interval)
emit_log("info", f"Scheduler started, sync interval: {interval}s")
yield
# Shutdown
stop_scheduler()
emit_log("info", "pngx-controller stopped")
# ── Application ───────────────────────────────────────────────────────────────
def create_app() -> FastAPI:
app = FastAPI(
title="pngx-controller",
description="Paperless-ngx Central Sync Controller",
version="0.1.0",
lifespan=lifespan,
)
# Templates
import json as _json
templates_dir = Path(__file__).parent / "templates"
templates = Jinja2Templates(directory=str(templates_dir))
templates.env.filters["tojson"] = _json.dumps
# Register UI routes (must come before API to avoid catch-all conflicts)
from .ui.routes import router as ui_router, setup_templates
setup_templates(templates)
app.include_router(ui_router)
# API routers
from .api.replicas import router as replicas_router
from .api.sync import router as sync_router
from .api.logs import router as logs_router
from .api.settings import router as settings_router
from .api.status import router as status_router
from .api.master import router as master_router
app.include_router(replicas_router)
app.include_router(sync_router)
app.include_router(logs_router)
app.include_router(settings_router)
app.include_router(status_router)
app.include_router(master_router)
# ── Health & Metrics (no auth) ────────────────────────────────────────────
@app.get("/healthz", include_in_schema=False)
def healthz():
try:
engine = get_engine()
with engine.connect() as conn:
conn.exec_driver_sql("SELECT 1")
return {"status": "ok", "db": "ok"}
except Exception as e:
return JSONResponse(status_code=503, content={"status": "error", "db": str(e)})
@app.get("/metrics", include_in_schema=False)
def metrics():
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
return PlainTextResponse(generate_latest(), media_type=CONTENT_TYPE_LATEST)
return app
app = create_app()
# ── CLI entry point ───────────────────────────────────────────────────────────
def rotate_key_cli() -> None:
"""Re-encrypt all stored tokens: OLD_SECRET_KEY → NEW_SECRET_KEY."""
old_key = os.environ.get("OLD_SECRET_KEY", "")
new_key = os.environ.get("NEW_SECRET_KEY", "")
if not old_key or not new_key:
sys.exit("FATAL: OLD_SECRET_KEY and NEW_SECRET_KEY must be set")
if not is_valid_fernet_key(old_key):
sys.exit("FATAL: OLD_SECRET_KEY is not a valid Fernet key")
if not is_valid_fernet_key(new_key):
sys.exit("FATAL: NEW_SECRET_KEY is not a valid Fernet key")
# Temporarily set env so get_config() reads the old key for DB access
os.environ["SECRET_KEY"] = old_key
# Reset lru_cache so we pick up the new env
get_config.cache_clear() # type: ignore[attr-defined]
create_db_and_tables()
from sqlmodel import Session, select as sql_select
from .crypto import decrypt, encrypt
from .models import Replica, Setting
engine = get_engine()
count = 0
with Session(engine) as session:
for replica in session.exec(sql_select(Replica)).all():
try:
plain = decrypt(replica.api_token, old_key)
replica.api_token = encrypt(plain, new_key)
session.add(replica)
count += 1
except Exception as e:
print(f"WARNING: Could not re-encrypt token for replica {replica.name}: {e}", file=sys.stderr)
for key in ("master_token", "alert_target_token"):
setting = session.get(Setting, key)
if setting and setting.value:
try:
plain = decrypt(setting.value, old_key)
setting.value = encrypt(plain, new_key)
session.add(setting)
count += 1
except Exception as e:
print(f"WARNING: Could not re-encrypt {key}: {e}", file=sys.stderr)
session.commit()
print(f"rotate-key: re-encrypted {count} token(s) successfully")
print("Restart the container with the new SECRET_KEY.")
def cli_entry() -> None:
if len(sys.argv) > 1 and sys.argv[1] == "rotate-key":
rotate_key_cli()
else:
import uvicorn
uvicorn.run(
"app.main:app",
host=os.environ.get("BIND_HOST", "0.0.0.0"),
port=int(os.environ.get("PORT", "8000")),
log_level="warning",
)
if __name__ == "__main__":
cli_entry()