Files
pngx-sync/app/logger.py
domverse b99dbf694d
All checks were successful
Deploy / deploy (push) Successful in 30s
feat: implement pngx-controller with Gitea CI/CD deployment
- Full FastAPI sync engine: master→replica document sync via paperless REST API
- Web UI: dashboard, replicas, logs, settings (Jinja2 + HTMX + Pico CSS)
- APScheduler background sync, SSE live log stream, Prometheus metrics
- Fernet encryption for API tokens at rest
- pngx.env credential file: written on save, pre-fills forms on load
- Dockerfile with layer-cached uv build, Python healthcheck
- docker-compose with host networking for Tailscale access
- Gitea Actions workflow: version bump, secret injection, docker compose deploy

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-22 17:59:25 +01:00

74 lines
1.8 KiB
Python

import asyncio
import json
from datetime import datetime, timezone
from typing import Optional
_sse_queues: list[asyncio.Queue] = []
def subscribe_sse() -> asyncio.Queue:
q: asyncio.Queue = asyncio.Queue(maxsize=200)
_sse_queues.append(q)
return q
def unsubscribe_sse(q: asyncio.Queue) -> None:
try:
_sse_queues.remove(q)
except ValueError:
pass
def emit_log(
    level: str,
    message: str,
    *,
    replica: str | None = None,
    replica_id: int | None = None,
    doc_id: int | None = None,
    run_id: int | None = None,
    session=None,
) -> int | None:
    """Emit one log event to stdout, optionally the DB, and SSE subscribers.

    Always prints a JSON line to stdout. When *session* is given, a Log
    row is persisted and its primary key is returned. The event is then
    broadcast to every subscribed SSE queue; subscribers whose queues are
    full simply miss the event (best-effort delivery).

    Args:
        level: Severity label (e.g. "info", "error").
        message: Human-readable log text.
        replica: Replica display name, included in the stdout line only.
        replica_id: Replica row id, persisted and sent over SSE.
        doc_id: Document id the event refers to, if any.
        run_id: Sync-run row id the event belongs to, if any.
        session: Optional DB session; when present the event is persisted.

    Returns:
        The persisted Log row id, or None when no session was supplied.
    """
    ts = datetime.now(timezone.utc).isoformat()

    # 1) Structured stdout line (picked up by container log collectors).
    stdout_payload: dict = {"ts": ts, "level": level, "msg": message}
    if replica:
        stdout_payload["replica"] = replica
    if doc_id is not None:
        stdout_payload["doc_id"] = doc_id
    print(json.dumps(stdout_payload), flush=True)

    # 2) Optional DB persistence.
    log_id: int | None = None
    if session is not None:
        # Imported lazily to avoid a circular import at module load time.
        from .models import Log

        log = Log(
            run_id=run_id,
            replica_id=replica_id,
            level=level,
            message=message,
            doc_id=doc_id,
        )
        session.add(log)
        session.commit()
        session.refresh(log)  # populate the autogenerated primary key
        log_id = log.id

    # 3) Fan out to SSE subscribers. Iterate a snapshot of the registry so
    # a concurrent (un)subscribe cannot mutate the list mid-loop.
    sse_data = json.dumps(
        {
            "id": log_id,
            "ts": ts,
            "level": level,
            "message": message,
            "replica_id": replica_id,
            "doc_id": doc_id,
        }
    )
    for q in list(_sse_queues):
        try:
            q.put_nowait(sse_data)
        except asyncio.QueueFull:
            # Subscriber is too slow — drop the event rather than block.
            pass
    return log_id