All checks were successful
Deploy / deploy (push) Successful in 30s
- Full FastAPI sync engine: master→replica document sync via paperless REST API
- Web UI: dashboard, replicas, logs, settings (Jinja2 + HTMX + Pico CSS)
- APScheduler background sync, SSE live log stream, Prometheus metrics
- Fernet encryption for API tokens at rest
- pngx.env credential file: written on save, pre-fills forms on load
- Dockerfile with layer-cached uv build, Python healthcheck
- docker-compose with host networking for Tailscale access
- Gitea Actions workflow: version bump, secret injection, docker compose deploy

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
74 lines
1.8 KiB
Python
74 lines
1.8 KiB
Python
import asyncio
|
|
import json
|
|
from datetime import datetime, timezone
|
|
from typing import Optional
|
|
|
|
# Registry of live SSE subscriber queues: subscribe_sse() appends to it,
# unsubscribe_sse() removes from it, and emit_log() offers every event to
# each queue it contains.
_sse_queues: list[asyncio.Queue] = []
|
|
|
|
|
|
def subscribe_sse() -> asyncio.Queue:
    """Register a new SSE subscriber and return its queue.

    A fresh queue bounded to 200 items is created, appended to the
    module-level ``_sse_queues`` registry, and handed back to the caller.
    """
    subscriber: asyncio.Queue = asyncio.Queue(maxsize=200)
    _sse_queues.append(subscriber)
    return subscriber
|
|
|
|
|
|
def unsubscribe_sse(q: asyncio.Queue) -> None:
    """Remove *q* from the ``_sse_queues`` registry.

    Removing a queue that is not registered is a silent no-op.
    """
    try:
        _sse_queues.remove(q)
    except ValueError:
        # Not in the registry; nothing to do.
        return
|
|
|
|
|
|
def emit_log(
    level: str,
    message: str,
    *,
    replica: str | None = None,
    replica_id: int | None = None,
    doc_id: int | None = None,
    run_id: int | None = None,
    session=None,
) -> Optional[int]:
    """Emit one log event to stdout, optionally to the DB, and to SSE subscribers.

    The steps run in this order:

    1. A JSON line with ``ts``, ``level`` and ``msg`` is printed to stdout and
       flushed. ``replica`` is included only when truthy, ``doc_id`` only when
       it is not ``None``.
    2. If ``session`` is given, a ``Log`` row is added, committed and
       refreshed so its id can be read.
    3. A JSON event is offered to every queue in ``_sse_queues``; a queue that
       is full is skipped, so that subscriber misses the event.

    Args:
        level: Log level label, stored and emitted verbatim.
        message: Log text.
        replica: Replica name; appears only in the stdout line.
        replica_id: Replica identifier; stored on the row and sent over SSE.
        doc_id: Document identifier; stdout (when set), row and SSE.
        run_id: Run identifier; stored on the row only.
        session: Object providing ``add``/``commit``/``refresh``
            (presumably an ORM session -- confirm against callers).
            ``None`` disables persistence.

    Returns:
        The id of the persisted ``Log`` row, or ``None`` when no session
        was supplied.
    """
    timestamp = datetime.now(timezone.utc).isoformat()

    # --- stdout -----------------------------------------------------------
    record: dict = {"ts": timestamp, "level": level, "msg": message}
    if replica:
        record["replica"] = replica
    if doc_id is not None:
        record["doc_id"] = doc_id
    print(json.dumps(record), flush=True)

    # --- database (optional) ----------------------------------------------
    row_id: Optional[int] = None
    if session is not None:
        # Imported at call time rather than module load
        # (presumably to avoid an import cycle -- confirm).
        from .models import Log

        entry = Log(
            run_id=run_id,
            replica_id=replica_id,
            level=level,
            message=message,
            doc_id=doc_id,
        )
        session.add(entry)
        session.commit()
        session.refresh(entry)
        row_id = entry.id

    # --- SSE broadcast ----------------------------------------------------
    event = json.dumps(
        {
            "id": row_id,
            "ts": timestamp,
            "level": level,
            "message": message,
            "replica_id": replica_id,
            "doc_id": doc_id,
        }
    )
    # Iterate over a snapshot so registry changes during the loop are harmless.
    for subscriber in tuple(_sse_queues):
        try:
            subscriber.put_nowait(event)
        except asyncio.QueueFull:
            # Subscriber is not draining fast enough; drop this event for it.
            continue

    return row_id
|