Files
outline-sync/tests/test_phase_c_pull.py
2026-03-07 20:54:59 +01:00

267 lines
11 KiB
Python

"""
Phase C — Pull with Live Output Tests
Tests for POST /pull (job start) and GET /stream/{job_id} (SSE streaming).
The sync subprocess is mocked so tests do not require a live Outline instance.
"""
import asyncio
import json
import sys
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
sys.path.insert(0, str(Path(__file__).parent))
from helpers import make_mock_process # noqa: E402
pytestmark = pytest.mark.asyncio
async def consume_sse(
    client,
    job_id: str,
    max_events: int = 50,
    timeout: "float | None" = 10.0,
) -> list[dict]:
    """Collect SSE events from ``GET /stream/{job_id}``.

    Reads ``data:`` lines until a ``{"type": "done"}`` event arrives,
    ``max_events`` events have been collected, or the server closes the
    stream. Each payload is JSON-decoded; payloads that are not valid JSON,
    or that decode to something other than a JSON object, are recorded as
    ``{"raw": <payload text>}`` so every returned event is a dict.

    Args:
        client: HTTP client exposing an async ``stream(method, url)`` context
            manager (the ``client`` fixture).
        job_id: Job identifier returned by ``POST /pull``.
        max_events: Upper bound on the number of events collected.
        timeout: Seconds to wait for the stream to finish; ``None`` waits
            indefinitely.

    Returns:
        The collected events, in arrival order.

    Raises:
        AssertionError: If the response is not a 200 ``text/event-stream``,
            or the stream does not finish within ``timeout`` seconds.
    """

    async def _collect() -> list[dict]:
        events: list[dict] = []
        async with client.stream("GET", f"/stream/{job_id}") as r:
            assert r.status_code == 200
            assert "text/event-stream" in r.headers.get("content-type", "")
            async for line in r.aiter_lines():
                if not line.startswith("data:"):
                    continue  # blank separators, comments, event:/id: fields
                payload = line[5:].strip()
                try:
                    event = json.loads(payload)
                except json.JSONDecodeError:
                    event = None
                # Non-object JSON (e.g. `data: 5`) has no .get(); keep it as raw
                # text so the "done" check below cannot crash.
                if not isinstance(event, dict):
                    event = {"raw": payload}
                events.append(event)
                if event.get("type") == "done" or len(events) >= max_events:
                    break
        return events

    try:
        return await asyncio.wait_for(_collect(), timeout)
    except asyncio.TimeoutError:
        # Fail the test with a clear message instead of hanging the suite.
        raise AssertionError(
            f"SSE stream for job {job_id!r} did not finish within {timeout}s"
        ) from None
# ---------------------------------------------------------------------------
# US-C1 — POST /pull starts a job and returns a job_id
# ---------------------------------------------------------------------------
class TestPullJobCreation:
    """US-C1: ``POST /pull`` accepts the request and hands back a job id."""

    @staticmethod
    async def _start_pull(client):
        """POST /pull while ``webui.run_sync_job`` is replaced by an AsyncMock."""
        with patch("webui.run_sync_job", new_callable=AsyncMock):
            response = await client.post("/pull")
        return response

    async def test_post_pull_returns_202(self, client):
        response = await self._start_pull(client)
        # Either 200 OK or 202 Accepted counts as "job started".
        assert response.status_code in {200, 202}

    async def test_post_pull_returns_job_id(self, client):
        payload = (await self._start_pull(client)).json()
        assert "job_id" in payload, "Response must include a job_id"
        job_id = payload["job_id"]
        assert isinstance(job_id, str)
        assert job_id != ""

    async def test_post_pull_returns_stream_url(self, client):
        payload = (await self._start_pull(client)).json()
        # Either stream_url or job_id is sufficient to construct the SSE URL
        assert any(key in payload for key in ("job_id", "stream_url"))
# ---------------------------------------------------------------------------
# US-C1 — SSE stream emits progress events
# ---------------------------------------------------------------------------
class TestPullStreaming:
    """US-C1: ``GET /stream/{job_id}`` relays sync progress as SSE events.

    Every test keeps its patch active until the stream has been read, so the
    background job runs against the mocked subprocess.
    """

    async def test_stream_content_type_is_sse(self, client):
        """GET /stream/{job_id} must return text/event-stream."""
        with patch("webui.run_sync_job", new_callable=AsyncMock):
            r = await client.post("/pull")
            job_id = r.json()["job_id"]
            async with client.stream("GET", f"/stream/{job_id}") as stream:
                assert "text/event-stream" in stream.headers.get("content-type", "")

    async def test_stream_emits_data_events(self, client):
        """Stream must yield at least one data event."""
        pull_lines = [
            "Fetching collections...",
            "Processing Bewerbungen/CV.md",
            "Processing Infra/HomeLab.md",
            "Done. 2 updated, 0 created.",
        ]
        with patch("webui.spawn_sync_subprocess", new_callable=AsyncMock) as mock_spawn:
            mock_spawn.return_value = make_mock_process(pull_lines)
            r = await client.post("/pull")
            job_id = r.json()["job_id"]
            events = await consume_sse(client, job_id)
        assert len(events) >= 1, "Stream must emit at least one event"

    async def test_stream_ends_with_done_event(self, client):
        """Last event in the stream must be type=done."""
        pull_lines = [
            "Fetching collections...",
            "Done. 1 updated.",
        ]
        with patch("webui.spawn_sync_subprocess", new_callable=AsyncMock) as mock_spawn:
            mock_spawn.return_value = make_mock_process(pull_lines)
            r = await client.post("/pull")
            job_id = r.json()["job_id"]
            events = await consume_sse(client, job_id)
        assert events, "Stream emitted no events"
        assert events[-1].get("type") == "done", "Stream must end with a 'done' event"

    async def test_stream_done_event_contains_summary(self, client):
        """The done event must include summary statistics."""
        pull_lines = [
            "Done. 2 updated, 1 created, 0 errors.",
        ]
        with patch("webui.spawn_sync_subprocess", new_callable=AsyncMock) as mock_spawn:
            mock_spawn.return_value = make_mock_process(pull_lines)
            r = await client.post("/pull")
            job_id = r.json()["job_id"]
            events = await consume_sse(client, job_id)
        done = next((e for e in events if e.get("type") == "done"), None)
        assert done is not None, "Stream must emit a 'done' event"
        # Summary can be in 'message', 'summary', or top-level 'data' text.
        # The "type" key is excluded: its value is always "done", which made
        # the previous substring check pass whether or not a summary existed.
        details = json.dumps({k: v for k, v in done.items() if k != "type"}).lower()
        assert any(word in details for word in ("updated", "created", "summary")), (
            f"Done event must contain a summary, got {done!r}"
        )

    async def test_stream_includes_per_file_events(self, client):
        """Each processed file should generate its own progress event."""
        pull_lines = [
            "processing: Bewerbungen/CV.md",
            "ok: Bewerbungen/CV.md updated",
            "processing: Infra/HomeLab.md",
            "ok: Infra/HomeLab.md updated",
            "Done. 2 updated.",
        ]
        with patch("webui.spawn_sync_subprocess", new_callable=AsyncMock) as mock_spawn:
            mock_spawn.return_value = make_mock_process(pull_lines)
            r = await client.post("/pull")
            job_id = r.json()["job_id"]
            events = await consume_sse(client, job_id)
        all_text = json.dumps(events)
        # Every processed file must show up, not just the first one.
        for file_name in ("CV.md", "HomeLab.md"):
            assert file_name in all_text, (
                f"Stream events should reference processed file {file_name}"
            )

    async def test_stream_for_unknown_job_returns_404(self, client):
        r = await client.get("/stream/nonexistent-job-id-xyz")
        assert r.status_code == 404

    async def test_failed_sync_emits_error_event(self, client):
        """If the sync process exits with non-zero, stream must emit an error event."""
        with patch("webui.spawn_sync_subprocess", new_callable=AsyncMock) as mock_spawn:
            mock_spawn.return_value = make_mock_process(
                ["Error: API connection failed"], returncode=1
            )
            r = await client.post("/pull")
            job_id = r.json()["job_id"]
            events = await consume_sse(client, job_id)
        error_events = [e for e in events if e.get("type") in ("error", "done")]
        assert any(
            e.get("success") is False or e.get("type") == "error"
            for e in error_events
        ), "Failed sync must emit an error event"
# ---------------------------------------------------------------------------
# US-C2 — Pull content actually updates vault files
# ---------------------------------------------------------------------------
class TestPullContent:
    """US-C2: pulled content lands on the vault's ``outline`` branch."""

    async def test_pull_advances_outline_branch(self, client, populated_vault):
        """
        After a pull that introduces a new document, the outline branch
        must have a new commit compared to before.

        NOTE(review): this test does not call POST /pull. It makes the commit a
        pull would produce by hand, so it only exercises the git side.
        TODO: drive the real endpoint with a faked sync subprocess.
        """
        import subprocess

        vault = str(populated_vault)

        def git(*args: str) -> str:
            # check=True makes a missing "outline" branch or a failed commit
            # fail at the offending command instead of as a hash mismatch.
            result = subprocess.run(
                ["git", "-C", vault, *args],
                check=True,
                capture_output=True,
                text=True,
            )
            return result.stdout.strip()

        before = git("rev-parse", "outline")
        # Switch first so the new document is written into the outline tree.
        git("checkout", "outline")
        # Simulate pull writing a new file to outline branch
        new_file = populated_vault / "Projekte" / "FreshDoc.md"
        new_file.parent.mkdir(parents=True, exist_ok=True)
        new_file.write_text(
            "---\noutline_id: doc-new-001\n---\n# Fresh Doc\n", encoding="utf-8"
        )
        git("add", "-A")
        git("commit", "-m", "outline: new doc")
        after = git("rev-parse", "outline")
        assert before != after, "outline branch must advance after pull"
# ---------------------------------------------------------------------------
# US-C3 — Idempotent pull
# ---------------------------------------------------------------------------
class TestPullIdempotent:
    """US-C3: repeating a pull with nothing new is harmless."""

    @staticmethod
    def _assert_succeeded(events: list[dict]) -> None:
        """Fail unless the stream ended with 'done' and reported no failure.

        Uses the same failure markers as the failed-sync streaming test:
        an event of type "error", or any event with ``success`` set to False.
        """
        assert events and events[-1].get("type") == "done", (
            f"Stream must end with a 'done' event, got {events!r}"
        )
        failures = [
            e for e in events
            if e.get("type") == "error" or e.get("success") is False
        ]
        assert not failures, f"Pull reported a failure: {failures!r}"

    async def test_pull_with_no_changes_returns_success(self, client):
        """A pull against an empty diff must succeed (not error)."""
        with patch("webui.spawn_sync_subprocess", new_callable=AsyncMock) as mock_spawn:
            mock_spawn.return_value = make_mock_process(
                ["No changes from Outline.", "Done. 0 updated."], returncode=0
            )
            r = await client.post("/pull")
            assert r.status_code in (200, 202)
            # Drain while the patch is still active: the job task may call
            # spawn_sync_subprocess only after POST returns, and leaving the
            # block early could let it reach the unpatched function.
            self._assert_succeeded(await consume_sse(client, r.json()["job_id"]))

    async def test_pull_twice_is_safe(self, client):
        """Two sequential pulls must both succeed."""
        for attempt in (1, 2):
            # Keep patch active until SSE stream finishes so the task can run
            with patch("webui.spawn_sync_subprocess", new_callable=AsyncMock) as mock_spawn:
                mock_spawn.return_value = make_mock_process(["Done. 0 updated."])
                r = await client.post("/pull")
                assert r.status_code in (200, 202), f"Pull #{attempt} was rejected"
                # Draining also lets the job complete before the next pull starts.
                self._assert_succeeded(await consume_sse(client, r.json()["job_id"]))
# ---------------------------------------------------------------------------
# US-C4 — Job lock prevents concurrent syncs
# ---------------------------------------------------------------------------
class TestJobLock:
    """US-C4: only one sync job may be pending/running at a time."""

    @staticmethod
    def _hold_sync_job():
        """Patch ``webui.run_sync_job`` with a mock that blocks until released.

        Returns ``(patcher, release)``. While the patcher is active, a started
        job stays pending until ``release.set()`` is called. Presumably this
        keeps the job lock held for the whole test, and avoids launching a
        real sync subprocess (assuming run_sync_job is what spawns it).
        """
        release = asyncio.Event()

        async def _wait_for_release(*_args, **_kwargs):
            await release.wait()

        patcher = patch(
            "webui.run_sync_job",
            new_callable=AsyncMock,
            side_effect=_wait_for_release,
        )
        return patcher, release

    async def test_concurrent_pull_returns_409(self, client):
        """
        Starting a second pull while the first is pending/running returns 409.
        _active_job is set immediately when POST /pull is called.
        """
        patcher, release = self._hold_sync_job()
        with patcher:
            try:
                r1 = await client.post("/pull")
                assert r1.status_code in (200, 202), "First pull must be accepted"
                # _active_job is now set — second pull must be rejected
                r2 = await client.post("/pull")
                assert r2.status_code == 409, (
                    "Second pull while first is pending must return 409 Conflict"
                )
            finally:
                release.set()
                await asyncio.sleep(0)  # give the released job task a chance to finish

    async def test_push_while_pull_running_returns_409(self, client):
        """Push is also blocked while a pull is pending/running."""
        patcher, release = self._hold_sync_job()
        with patcher:
            try:
                r1 = await client.post("/pull")
                assert r1.status_code in (200, 202), "Pull must be accepted"
                r = await client.post("/push")
                assert r.status_code == 409
            finally:
                release.set()
                await asyncio.sleep(0)  # give the released job task a chance to finish