import asyncio import json from datetime import datetime, timezone from typing import Optional _sse_queues: list[asyncio.Queue] = [] def subscribe_sse() -> asyncio.Queue: q: asyncio.Queue = asyncio.Queue(maxsize=200) _sse_queues.append(q) return q def unsubscribe_sse(q: asyncio.Queue) -> None: try: _sse_queues.remove(q) except ValueError: pass def emit_log( level: str, message: str, *, replica: str | None = None, replica_id: int | None = None, doc_id: int | None = None, run_id: int | None = None, session=None, ) -> Optional[int]: """Write to stdout JSON, optionally persist to DB and broadcast over SSE.""" ts = datetime.now(timezone.utc).isoformat() stdout_payload: dict = {"ts": ts, "level": level, "msg": message} if replica: stdout_payload["replica"] = replica if doc_id is not None: stdout_payload["doc_id"] = doc_id print(json.dumps(stdout_payload), flush=True) log_id: Optional[int] = None if session is not None: from .models import Log log = Log( run_id=run_id, replica_id=replica_id, level=level, message=message, doc_id=doc_id, ) session.add(log) session.commit() session.refresh(log) log_id = log.id sse_data = json.dumps( { "id": log_id, "ts": ts, "level": level, "message": message, "replica_id": replica_id, "doc_id": doc_id, } ) for q in list(_sse_queues): try: q.put_nowait(sse_data) except asyncio.QueueFull: pass return log_id