"""Paperless-ngx REST API client with retry/backoff and semaphore throttling.""" import asyncio import time from typing import Any import httpx class PaperlessError(Exception): pass class PaperlessClient: def __init__(self, url: str, token: str, semaphore: asyncio.Semaphore) -> None: self.base_url = url.rstrip("/") self.token = token self.semaphore = semaphore self._client: httpx.AsyncClient | None = None async def __aenter__(self) -> "PaperlessClient": self._client = httpx.AsyncClient( headers={"Authorization": f"Token {self.token}"}, timeout=120.0, ) return self async def __aexit__(self, *args) -> None: if self._client: await self._client.aclose() self._client = None async def _request(self, method: str, path: str, **kwargs) -> httpx.Response: assert self._client is not None, "Use as async context manager" url = f"{self.base_url}{path}" delays = [2, 4, 8] last_exc: Exception | None = None for attempt in range(3): try: async with self.semaphore: r = await self._client.request(method, url, **kwargs) r.raise_for_status() return r except (httpx.NetworkError, httpx.TimeoutException, httpx.ConnectError) as e: last_exc = e if attempt < 2: await asyncio.sleep(delays[attempt]) except httpx.HTTPStatusError as e: if e.response.status_code >= 500: last_exc = e if attempt < 2: await asyncio.sleep(delays[attempt]) else: raise PaperlessError( f"HTTP {e.response.status_code} {method} {path}: {e.response.text[:300]}" ) from e raise PaperlessError(f"Request failed after 3 attempts: {last_exc}") from last_exc async def _get_all(self, path: str, params: dict | None = None) -> list[dict]: """Paginate through all results.""" results: list[dict] = [] page = 1 base_params = dict(params or {}) base_params["page_size"] = 100 while True: r = await self._request("GET", path, params={**base_params, "page": page}) data = r.json() results.extend(data.get("results", [])) if not data.get("next"): break page += 1 return results # ── Documents ────────────────────────────────────────────────────────────── async def get_documents_page( self, page: int = 1, modified_gte: str | None = None, page_size: int = 100, ) -> dict: params: dict[str, Any] = { "ordering": "modified", "page_size": page_size, "page": page, } if modified_gte: params["modified__gte"] = modified_gte r = await self._request("GET", "/api/documents/", params=params) return r.json() async def get_all_documents(self, modified_gte: str | None = None) -> list[dict]: docs: list[dict] = [] page = 1 while True: data = await self.get_documents_page(page=page, modified_gte=modified_gte) docs.extend(data.get("results", [])) if not data.get("next"): break page += 1 return docs async def get_document(self, doc_id: int) -> dict: r = await self._request("GET", f"/api/documents/{doc_id}/") return r.json() async def download_document(self, doc_id: int, original: bool = True) -> bytes: params: dict[str, Any] = {} if not original: params["original"] = "false" r = await self._request("GET", f"/api/documents/{doc_id}/download/", params=params) return r.content async def post_document( self, file_bytes: bytes, filename: str, metadata: dict ) -> str: """Upload a document; returns the Celery task_id UUID string.""" # Build multipart as a list of tuples so all parts go through a single # AsyncByteStream — required for httpx >= 0.28 with AsyncClient. parts: list[tuple[str, tuple]] = [ ("document", (filename, file_bytes, "application/octet-stream")), ] for key in ("title", "created", "archive_serial_number"): val = metadata.get(key) if val is not None: parts.append((key, (None, str(val), "text/plain"))) if metadata.get("correspondent") is not None: parts.append(("correspondent", (None, str(metadata["correspondent"]), "text/plain"))) if metadata.get("document_type") is not None: parts.append(("document_type", (None, str(metadata["document_type"]), "text/plain"))) for tag_id in metadata.get("tags", []): parts.append(("tags", (None, str(tag_id), "text/plain"))) r = await self._request( "POST", "/api/documents/post_document/", files=parts, ) result = r.json() # API returns a plain task UUID string if isinstance(result, str): return result # Some versions wrap it if isinstance(result, dict): return result.get("task_id", result.get("id", "")) return str(result) async def patch_document(self, doc_id: int, metadata: dict) -> dict: r = await self._request("PATCH", f"/api/documents/{doc_id}/", json=metadata) return r.json() async def get_task(self, task_id: str) -> dict: r = await self._request("GET", "/api/tasks/", params={"task_id": task_id}) results = r.json() if isinstance(results, list) and results: return results[0] return {} # ── Metadata entities ────────────────────────────────────────────────────── async def get_tags(self) -> list[dict]: return await self._get_all("/api/tags/") async def get_correspondents(self) -> list[dict]: return await self._get_all("/api/correspondents/") async def get_document_types(self) -> list[dict]: return await self._get_all("/api/document_types/") async def get_custom_fields(self) -> list[dict]: return await self._get_all("/api/custom_fields/") async def create_tag(self, name: str, **kwargs) -> dict: r = await self._request("POST", "/api/tags/", json={"name": name, **kwargs}) return r.json() async def create_correspondent(self, name: str, **kwargs) -> dict: r = await self._request( "POST", "/api/correspondents/", json={"name": name, **kwargs} ) return r.json() async def create_document_type(self, name: str, **kwargs) -> dict: r = await self._request( "POST", "/api/document_types/", json={"name": name, **kwargs} ) return r.json() async def create_custom_field(self, name: str, data_type: str, **kwargs) -> dict: r = await self._request( "POST", "/api/custom_fields/", json={"name": name, "data_type": data_type, **kwargs}, ) return r.json() async def test_connection(self) -> dict: """Returns {ok, error, latency_ms, doc_count}.""" t0 = time.monotonic() try: r = await self._request("GET", "/api/documents/", params={"page_size": 1}) elapsed = int((time.monotonic() - t0) * 1000) data = r.json() return { "ok": True, "error": None, "latency_ms": elapsed, "doc_count": data.get("count", 0), } except Exception as e: return {"ok": False, "error": str(e), "latency_ms": 0, "doc_count": 0}