- Full FastAPI sync engine: master→replica document sync via paperless REST API
- Web UI: dashboard, replicas, logs, settings (Jinja2 + HTMX + Pico CSS)
- APScheduler background sync, SSE live log stream, Prometheus metrics
- Fernet encryption for API tokens at rest
- pngx.env credential file: written on save, pre-fills forms on load
- Dockerfile with layer-cached uv build, Python healthcheck
- docker-compose with host networking for Tailscale access
- Gitea Actions workflow: version bump, secret injection, docker compose deploy

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
110 lines
3.3 KiB
Python
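The router below imports subscribe_sse and unsubscribe_sse from a ..logger module that is not shown on this page. As a rough sketch of what such a fan-out helper could look like, assuming each SSE client gets its own asyncio.Queue and log records are serialized to JSON (the names _subscribers and QueueLogHandler are illustrative, not taken from the repository):

import asyncio
import json
import logging

# Hypothetical module-level registry of connected SSE clients.
_subscribers: set[asyncio.Queue] = set()


def subscribe_sse() -> asyncio.Queue:
    """Register a new SSE client and return its private queue."""
    q: asyncio.Queue = asyncio.Queue(maxsize=1000)
    _subscribers.add(q)
    return q


def unsubscribe_sse(q: asyncio.Queue) -> None:
    """Forget a disconnected client's queue."""
    _subscribers.discard(q)


class QueueLogHandler(logging.Handler):
    """Fan each log record out to every subscribed queue as a JSON string."""

    def emit(self, record: logging.LogRecord) -> None:
        payload = json.dumps({"level": record.levelname, "message": record.getMessage()})
        for q in list(_subscribers):
            try:
                q.put_nowait(payload)
            except asyncio.QueueFull:
                pass  # drop messages for slow consumers instead of blocking logging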
import asyncio
import json
from typing import Optional

from fastapi import APIRouter, Depends, Query
from fastapi.responses import StreamingResponse
from sqlalchemy import text
from sqlmodel import Session, select

from ..database import get_session
from ..logger import subscribe_sse, unsubscribe_sse
from ..models import Log

router = APIRouter(prefix="/api/logs", tags=["logs"])


# Paginated log listing with optional filters; ?q= switches to FTS5 full-text search.
@router.get("")
def list_logs(
    replica_id: Optional[int] = Query(None),
    level: Optional[str] = Query(None),
    from_dt: Optional[str] = Query(None, alias="from"),
    to_dt: Optional[str] = Query(None, alias="to"),
    q: Optional[str] = Query(None),
    page: int = Query(1, ge=1),
    page_size: int = Query(50, ge=1, le=200),
    session: Session = Depends(get_session),
):
    if q:
        # FTS5 search — use SQLAlchemy execute() for raw SQL
        fts_sql = text(
            "SELECT l.id, l.run_id, l.replica_id, l.level, l.message, l.doc_id, l.created_at "
            "FROM logs l JOIN logs_fts f ON l.id = f.rowid "
            "WHERE logs_fts MATCH :q ORDER BY l.created_at DESC LIMIT :lim OFFSET :off"
        )
        offset = (page - 1) * page_size
        rows = session.execute(fts_sql, {"q": q, "lim": page_size, "off": offset}).all()
        return [dict(r._mapping) for r in rows]

    stmt = select(Log)
    if replica_id is not None:
        stmt = stmt.where(Log.replica_id == replica_id)
    if level:
        stmt = stmt.where(Log.level == level)
    if from_dt:
        stmt = stmt.where(Log.created_at >= from_dt)
    if to_dt:
        stmt = stmt.where(Log.created_at <= to_dt)
    stmt = stmt.order_by(Log.created_at.desc())  # type: ignore[attr-defined]
    stmt = stmt.offset((page - 1) * page_size).limit(page_size)

    logs = session.exec(stmt).all()
    return [
        {
            "id": l.id,
            "run_id": l.run_id,
            "replica_id": l.replica_id,
            "level": l.level,
            "message": l.message,
            "doc_id": l.doc_id,
            "created_at": l.created_at.isoformat() if l.created_at else None,
        }
        for l in logs
    ]
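
# Note: the FTS5 branch above assumes a `logs_fts` virtual table whose rowid
# matches `logs.id`, kept in sync with the `logs` table (e.g. via triggers or
# application code). A plausible definition (an assumption; the actual
# migration is not part of this file) would be:
#
#   CREATE VIRTUAL TABLE logs_fts USING fts5(
#       message,
#       content='logs',
#       content_rowid='id'
#   );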


# Delete log entries older than the given retention window (default 90 days).
@router.delete("")
def clear_logs(
    older_than_days: int = Query(90, ge=1),
    session: Session = Depends(get_session),
):
    from datetime import datetime, timedelta, timezone

    cutoff = datetime.now(timezone.utc) - timedelta(days=older_than_days)
    old = session.exec(select(Log).where(Log.created_at < cutoff)).all()
    count = len(old)
    for log in old:
        session.delete(log)
    session.commit()
    return {"deleted": count}


@router.get("/stream")
async def log_stream():
    """SSE endpoint for live log tail."""

    async def generator():
        q = subscribe_sse()
        try:
            yield "retry: 3000\n\n"
            while True:
                try:
                    data = await asyncio.wait_for(q.get(), timeout=30.0)
                    yield f"data: {data}\n\n"
                except asyncio.TimeoutError:
                    # Send keepalive comment
                    yield ": keepalive\n\n"
        except asyncio.CancelledError:
            pass
        finally:
            unsubscribe_sse(q)

    return StreamingResponse(
        generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
        },
    )
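
For reference, a small client sketch that exercises the endpoints defined above, assuming the app is served at http://localhost:8000 and that httpx is installed (both are assumptions; neither appears in this file):

import httpx

BASE = "http://localhost:8000/api/logs"  # assumed base URL

# Paginated listing with filters (replica, level, page size).
print(httpx.get(BASE, params={"replica_id": 2, "level": "ERROR", "page_size": 20}).json())

# A ?q= parameter takes the FTS5 full-text search branch.
print(httpx.get(BASE, params={"q": "timeout"}).json())

# Purge entries older than 30 days.
print(httpx.delete(BASE, params={"older_than_days": 30}).json())

# Tail the live SSE stream; events arrive as "data: ..." lines.
with httpx.stream("GET", f"{BASE}/stream", timeout=None) as resp:
    for line in resp.iter_lines():
        if line.startswith("data: "):
            print(line.removeprefix("data: "))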