267 lines
11 KiB
Python
267 lines
11 KiB
Python
"""
|
|
Phase C — Pull with Live Output Tests
|
|
|
|
Tests for POST /pull (job start) and GET /stream/{job_id} (SSE streaming).
|
|
The sync subprocess is mocked so tests do not require a live Outline instance.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import sys
|
|
from pathlib import Path
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
sys.path.insert(0, str(Path(__file__).parent))
|
|
from helpers import make_mock_process # noqa: E402
|
|
|
|
pytestmark = pytest.mark.asyncio
|
|
|
|
|
|
async def consume_sse(client, job_id: str, max_events: int = 50) -> list[dict]:
|
|
"""Stream SSE events until 'done' or max_events reached."""
|
|
events = []
|
|
async with client.stream("GET", f"/stream/{job_id}") as r:
|
|
assert r.status_code == 200
|
|
assert "text/event-stream" in r.headers.get("content-type", "")
|
|
async for line in r.aiter_lines():
|
|
if line.startswith("data:"):
|
|
try:
|
|
events.append(json.loads(line[5:].strip()))
|
|
except json.JSONDecodeError:
|
|
events.append({"raw": line[5:].strip()})
|
|
if events and events[-1].get("type") == "done":
|
|
break
|
|
if len(events) >= max_events:
|
|
break
|
|
return events
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# US-C1 — POST /pull starts a job and returns a job_id
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestPullJobCreation:
|
|
|
|
async def test_post_pull_returns_202(self, client):
|
|
with patch("webui.run_sync_job", new_callable=AsyncMock) as _mock_job:
|
|
r = await client.post("/pull")
|
|
assert r.status_code in (200, 202)
|
|
|
|
async def test_post_pull_returns_job_id(self, client):
|
|
with patch("webui.run_sync_job", new_callable=AsyncMock) as _mock_job:
|
|
r = await client.post("/pull")
|
|
data = r.json()
|
|
assert "job_id" in data, "Response must include a job_id"
|
|
assert isinstance(data["job_id"], str)
|
|
assert len(data["job_id"]) > 0
|
|
|
|
async def test_post_pull_returns_stream_url(self, client):
|
|
with patch("webui.run_sync_job", new_callable=AsyncMock) as _mock_job:
|
|
r = await client.post("/pull")
|
|
data = r.json()
|
|
# Either stream_url or job_id is sufficient to construct the SSE URL
|
|
assert "job_id" in data or "stream_url" in data
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# US-C1 — SSE stream emits progress events
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestPullStreaming:
|
|
|
|
async def test_stream_content_type_is_sse(self, client):
|
|
"""GET /stream/{job_id} must return text/event-stream."""
|
|
with patch("webui.run_sync_job", new_callable=AsyncMock) as _mock_job:
|
|
r = await client.post("/pull")
|
|
job_id = r.json()["job_id"]
|
|
|
|
async with client.stream("GET", f"/stream/{job_id}") as stream:
|
|
assert "text/event-stream" in stream.headers.get("content-type", "")
|
|
|
|
async def test_stream_emits_data_events(self, client):
|
|
"""Stream must yield at least one data event."""
|
|
pull_lines = [
|
|
"Fetching collections...",
|
|
"Processing Bewerbungen/CV.md",
|
|
"Processing Infra/HomeLab.md",
|
|
"Done. 2 updated, 0 created.",
|
|
]
|
|
|
|
with patch("webui.spawn_sync_subprocess", new_callable=AsyncMock) as mock_spawn:
|
|
mock_spawn.return_value = make_mock_process(pull_lines)
|
|
r = await client.post("/pull")
|
|
job_id = r.json()["job_id"]
|
|
events = await consume_sse(client, job_id)
|
|
|
|
assert len(events) >= 1, "Stream must emit at least one event"
|
|
|
|
async def test_stream_ends_with_done_event(self, client):
|
|
"""Last event in the stream must be type=done."""
|
|
pull_lines = [
|
|
"Fetching collections...",
|
|
"Done. 1 updated.",
|
|
]
|
|
|
|
with patch("webui.spawn_sync_subprocess", new_callable=AsyncMock) as mock_spawn:
|
|
mock_spawn.return_value = make_mock_process(pull_lines)
|
|
r = await client.post("/pull")
|
|
job_id = r.json()["job_id"]
|
|
events = await consume_sse(client, job_id)
|
|
|
|
done_events = [e for e in events if e.get("type") == "done"]
|
|
assert len(done_events) >= 1, "Stream must end with a 'done' event"
|
|
|
|
async def test_stream_done_event_contains_summary(self, client):
|
|
"""The done event must include summary statistics."""
|
|
pull_lines = [
|
|
"Done. 2 updated, 1 created, 0 errors.",
|
|
]
|
|
|
|
with patch("webui.spawn_sync_subprocess", new_callable=AsyncMock) as mock_spawn:
|
|
mock_spawn.return_value = make_mock_process(pull_lines)
|
|
r = await client.post("/pull")
|
|
job_id = r.json()["job_id"]
|
|
events = await consume_sse(client, job_id)
|
|
|
|
done = next((e for e in events if e.get("type") == "done"), None)
|
|
assert done is not None
|
|
# Summary can be in 'message', 'summary', or top-level 'data' text
|
|
summary_text = json.dumps(done)
|
|
assert any(word in summary_text for word in ("updated", "created", "done", "0")), (
|
|
"Done event must contain a summary"
|
|
)
|
|
|
|
async def test_stream_includes_per_file_events(self, client):
|
|
"""Each processed file should generate its own progress event."""
|
|
pull_lines = [
|
|
"processing: Bewerbungen/CV.md",
|
|
"ok: Bewerbungen/CV.md updated",
|
|
"processing: Infra/HomeLab.md",
|
|
"ok: Infra/HomeLab.md updated",
|
|
"Done. 2 updated.",
|
|
]
|
|
|
|
with patch("webui.spawn_sync_subprocess", new_callable=AsyncMock) as mock_spawn:
|
|
mock_spawn.return_value = make_mock_process(pull_lines)
|
|
r = await client.post("/pull")
|
|
job_id = r.json()["job_id"]
|
|
events = await consume_sse(client, job_id)
|
|
|
|
all_text = json.dumps(events)
|
|
assert "CV.md" in all_text or "Bewerbungen" in all_text, (
|
|
"Stream events should reference processed files"
|
|
)
|
|
|
|
async def test_stream_for_unknown_job_returns_404(self, client):
|
|
r = await client.get("/stream/nonexistent-job-id-xyz")
|
|
assert r.status_code == 404
|
|
|
|
async def test_failed_sync_emits_error_event(self, client):
|
|
"""If the sync process exits with non-zero, stream must emit an error event."""
|
|
with patch("webui.spawn_sync_subprocess", new_callable=AsyncMock) as mock_spawn:
|
|
mock_spawn.return_value = make_mock_process(
|
|
["Error: API connection failed"], returncode=1
|
|
)
|
|
r = await client.post("/pull")
|
|
job_id = r.json()["job_id"]
|
|
events = await consume_sse(client, job_id)
|
|
|
|
error_events = [e for e in events if e.get("type") in ("error", "done")]
|
|
assert any(
|
|
e.get("success") is False or e.get("type") == "error"
|
|
for e in error_events
|
|
), "Failed sync must emit an error event"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# US-C2 — Pull content actually updates vault files
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestPullContent:
|
|
|
|
async def test_pull_advances_outline_branch(self, client, populated_vault):
|
|
"""
|
|
After a pull that introduces a new document, the outline branch
|
|
must have a new commit compared to before.
|
|
"""
|
|
import subprocess as sp
|
|
before = sp.run(
|
|
["git", "-C", str(populated_vault), "rev-parse", "outline"],
|
|
capture_output=True, text=True,
|
|
).stdout.strip()
|
|
|
|
# Simulate pull writing a new file to outline branch
|
|
new_file = populated_vault / "Projekte" / "FreshDoc.md"
|
|
new_file.parent.mkdir(exist_ok=True)
|
|
new_file.write_text("---\noutline_id: doc-new-001\n---\n# Fresh Doc\n")
|
|
|
|
import subprocess as sp2
|
|
sp2.run(["git", "-C", str(populated_vault), "checkout", "outline"], check=True, capture_output=True)
|
|
sp2.run(["git", "-C", str(populated_vault), "add", "-A"], check=True, capture_output=True)
|
|
sp2.run(["git", "-C", str(populated_vault), "commit", "-m", "outline: new doc"], check=True, capture_output=True)
|
|
after = sp2.run(
|
|
["git", "-C", str(populated_vault), "rev-parse", "outline"],
|
|
capture_output=True, text=True,
|
|
).stdout.strip()
|
|
|
|
assert before != after, "outline branch must advance after pull"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# US-C3 — Idempotent pull
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestPullIdempotent:
|
|
|
|
async def test_pull_with_no_changes_returns_success(self, client):
|
|
"""A pull against an empty diff must succeed (not error)."""
|
|
with patch("webui.spawn_sync_subprocess", new_callable=AsyncMock) as mock_spawn:
|
|
mock_spawn.return_value = make_mock_process(
|
|
["No changes from Outline.", "Done. 0 updated."], returncode=0
|
|
)
|
|
r = await client.post("/pull")
|
|
assert r.status_code in (200, 202)
|
|
|
|
async def test_pull_twice_is_safe(self, client):
|
|
"""Two sequential pulls must both succeed."""
|
|
# Keep patch active until SSE stream finishes so the task can run
|
|
with patch("webui.spawn_sync_subprocess", new_callable=AsyncMock) as mock_spawn:
|
|
mock_spawn.return_value = make_mock_process(["Done. 0 updated."])
|
|
r1 = await client.post("/pull")
|
|
assert r1.status_code in (200, 202)
|
|
await consume_sse(client, r1.json()["job_id"]) # drain → job completes
|
|
|
|
with patch("webui.spawn_sync_subprocess", new_callable=AsyncMock) as mock_spawn:
|
|
mock_spawn.return_value = make_mock_process(["Done. 0 updated."])
|
|
r2 = await client.post("/pull")
|
|
assert r2.status_code in (200, 202)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# US-C4 — Job lock prevents concurrent syncs
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestJobLock:
|
|
|
|
async def test_concurrent_pull_returns_409(self, client):
|
|
"""
|
|
Starting a second pull while the first is pending/running returns 409.
|
|
_active_job is set immediately when POST /pull is called.
|
|
"""
|
|
r1 = await client.post("/pull")
|
|
assert r1.status_code in (200, 202), "First pull must be accepted"
|
|
|
|
# _active_job is now set — second pull must be rejected
|
|
r2 = await client.post("/pull")
|
|
assert r2.status_code == 409, (
|
|
"Second pull while first is pending must return 409 Conflict"
|
|
)
|
|
|
|
async def test_push_while_pull_running_returns_409(self, client):
|
|
"""Push is also blocked while a pull is pending/running."""
|
|
await client.post("/pull")
|
|
r = await client.post("/push")
|
|
assert r.status_code == 409
|