""" Tests for scan control endpoints: pause, cancel, resume. Covers API behaviour, DB state, status transitions, rate limit headers, and schema-level acceptance of the new 'paused' and 'cancelled' values. """ import pytest import sqlite3 from fastapi.testclient import TestClient # ============================================================================= # TestScanControlEndpoints — API unit tests # ============================================================================= @pytest.mark.unit @pytest.mark.api class TestScanControlEndpoints: """Tests for pause, cancel, and resume endpoints in isolation.""" # ── Pause ────────────────────────────────────────────────────────────── def test_pause_running_scan(self, client: TestClient, create_test_scan): scan_id = create_test_scan(status='running') resp = client.post(f"/api/v1/scans/{scan_id}/pause") assert resp.status_code == 200 body = resp.json() assert body["status"] == "paused" assert body["id"] == scan_id def test_pause_pending_scan(self, client: TestClient, create_test_scan): scan_id = create_test_scan(status='pending') resp = client.post(f"/api/v1/scans/{scan_id}/pause") assert resp.status_code == 200 assert resp.json()["status"] == "paused" def test_pause_nonexistent_scan(self, client: TestClient): resp = client.post("/api/v1/scans/99999/pause") assert resp.status_code == 404 def test_pause_completed_scan(self, client: TestClient, create_test_scan): scan_id = create_test_scan(status='completed') resp = client.post(f"/api/v1/scans/{scan_id}/pause") assert resp.status_code == 409 def test_pause_already_paused_scan(self, client: TestClient, create_test_scan): scan_id = create_test_scan(status='paused') resp = client.post(f"/api/v1/scans/{scan_id}/pause") assert resp.status_code == 409 def test_pause_cancelled_scan(self, client: TestClient, create_test_scan): scan_id = create_test_scan(status='cancelled') resp = client.post(f"/api/v1/scans/{scan_id}/pause") assert resp.status_code == 409 # ── Cancel ───────────────────────────────────────────────────────────── def test_cancel_running_scan(self, client: TestClient, create_test_scan): scan_id = create_test_scan(status='running') resp = client.post(f"/api/v1/scans/{scan_id}/cancel") assert resp.status_code == 200 assert resp.json()["status"] == "cancelled" def test_cancel_pending_scan(self, client: TestClient, create_test_scan): scan_id = create_test_scan(status='pending') resp = client.post(f"/api/v1/scans/{scan_id}/cancel") assert resp.status_code == 200 assert resp.json()["status"] == "cancelled" def test_cancel_nonexistent_scan(self, client: TestClient): resp = client.post("/api/v1/scans/99999/cancel") assert resp.status_code == 404 def test_cancel_completed_scan(self, client: TestClient, create_test_scan): scan_id = create_test_scan(status='completed') resp = client.post(f"/api/v1/scans/{scan_id}/cancel") assert resp.status_code == 409 def test_cancel_already_cancelled_scan(self, client: TestClient, create_test_scan): scan_id = create_test_scan(status='cancelled') resp = client.post(f"/api/v1/scans/{scan_id}/cancel") assert resp.status_code == 409 # ── Resume ───────────────────────────────────────────────────────────── def test_resume_paused_scan(self, client: TestClient, create_test_scan): scan_id = create_test_scan(status='paused') resp = client.post(f"/api/v1/scans/{scan_id}/resume") assert resp.status_code == 200 body = resp.json() assert body["status"] == "pending" assert body["id"] == scan_id def test_resume_nonexistent_scan(self, client: TestClient): resp = client.post("/api/v1/scans/99999/resume") assert resp.status_code == 404 def test_resume_running_scan(self, client: TestClient, create_test_scan): scan_id = create_test_scan(status='running') resp = client.post(f"/api/v1/scans/{scan_id}/resume") assert resp.status_code == 409 def test_resume_cancelled_scan(self, client: TestClient, create_test_scan): scan_id = create_test_scan(status='cancelled') resp = client.post(f"/api/v1/scans/{scan_id}/resume") assert resp.status_code == 409 def test_resume_completed_scan(self, client: TestClient, create_test_scan): scan_id = create_test_scan(status='completed') resp = client.post(f"/api/v1/scans/{scan_id}/resume") assert resp.status_code == 409 # ── Response shape ────────────────────────────────────────────────────── def test_pause_response_shape(self, client: TestClient, create_test_scan): scan_id = create_test_scan(status='running') body = client.post(f"/api/v1/scans/{scan_id}/pause").json() assert "id" in body assert "status" in body def test_cancel_response_shape(self, client: TestClient, create_test_scan): scan_id = create_test_scan(status='running') body = client.post(f"/api/v1/scans/{scan_id}/cancel").json() assert "id" in body assert "status" in body def test_resume_response_shape(self, client: TestClient, create_test_scan): scan_id = create_test_scan(status='paused') body = client.post(f"/api/v1/scans/{scan_id}/resume").json() assert "id" in body assert "status" in body # ============================================================================= # TestScanControlDatabaseState — verify DB state after operations # ============================================================================= @pytest.mark.database class TestScanControlDatabaseState: """Tests that verify SQLite state after pause/cancel/resume operations.""" def test_pause_sets_completed_at(self, client: TestClient, create_test_scan, clean_database): scan_id = create_test_scan(status='running') client.post(f"/api/v1/scans/{scan_id}/pause") conn = sqlite3.connect(clean_database) row = conn.execute("SELECT completed_at FROM scans WHERE id = ?", (scan_id,)).fetchone() conn.close() assert row[0] is not None def test_cancel_sets_completed_at(self, client: TestClient, create_test_scan, clean_database): scan_id = create_test_scan(status='running') client.post(f"/api/v1/scans/{scan_id}/cancel") conn = sqlite3.connect(clean_database) row = conn.execute("SELECT completed_at FROM scans WHERE id = ?", (scan_id,)).fetchone() conn.close() assert row[0] is not None def test_resume_clears_completed_at(self, client: TestClient, create_test_scan, clean_database): scan_id = create_test_scan(status='paused') client.post(f"/api/v1/scans/{scan_id}/resume") conn = sqlite3.connect(clean_database) row = conn.execute("SELECT completed_at FROM scans WHERE id = ?", (scan_id,)).fetchone() conn.close() assert row[0] is None def test_resume_resets_started_at_from_old_value(self, client: TestClient, create_test_scan, clean_database): """After resume, started_at is no longer the old seeded timestamp. The endpoint clears started_at; the background processor may then set a new timestamp immediately. Either way, the old value is gone. """ old_timestamp = '2026-01-01 10:00:00' scan_id = create_test_scan(status='paused') conn = sqlite3.connect(clean_database) conn.execute("UPDATE scans SET started_at = ? WHERE id = ?", (old_timestamp, scan_id)) conn.commit() conn.close() client.post(f"/api/v1/scans/{scan_id}/resume") conn = sqlite3.connect(clean_database) row = conn.execute("SELECT started_at FROM scans WHERE id = ?", (scan_id,)).fetchone() conn.close() # The endpoint cleared the old timestamp; the processor may have set a new one assert row[0] != old_timestamp def test_resume_resets_routes_scanned(self, client: TestClient, create_test_scan, clean_database): scan_id = create_test_scan(status='paused') conn = sqlite3.connect(clean_database) conn.execute("UPDATE scans SET routes_scanned = 50, total_routes = 100 WHERE id = ?", (scan_id,)) conn.commit() conn.close() client.post(f"/api/v1/scans/{scan_id}/resume") conn = sqlite3.connect(clean_database) row = conn.execute("SELECT routes_scanned FROM scans WHERE id = ?", (scan_id,)).fetchone() conn.close() assert row[0] == 0 def test_pause_preserves_routes( self, client: TestClient, create_test_scan, create_test_route, clean_database ): scan_id = create_test_scan(status='running') create_test_route(scan_id=scan_id, destination='MUC') client.post(f"/api/v1/scans/{scan_id}/pause") conn = sqlite3.connect(clean_database) count = conn.execute( "SELECT COUNT(*) FROM routes WHERE scan_id = ?", (scan_id,) ).fetchone()[0] conn.close() assert count == 1 def test_cancel_preserves_routes( self, client: TestClient, create_test_scan, create_test_route, clean_database ): scan_id = create_test_scan(status='running') create_test_route(scan_id=scan_id, destination='MUC') client.post(f"/api/v1/scans/{scan_id}/cancel") conn = sqlite3.connect(clean_database) count = conn.execute( "SELECT COUNT(*) FROM routes WHERE scan_id = ?", (scan_id,) ).fetchone()[0] conn.close() assert count == 1 # ============================================================================= # TestScanControlStatusTransitions — full workflow integration tests # ============================================================================= @pytest.mark.integration @pytest.mark.database class TestScanControlStatusTransitions: """Full workflow tests across multiple API calls.""" def test_running_to_paused_to_pending(self, client: TestClient, create_test_scan): scan_id = create_test_scan(status='running') # Pause it resp = client.post(f"/api/v1/scans/{scan_id}/pause") assert resp.json()["status"] == "paused" # Verify persisted assert client.get(f"/api/v1/scans/{scan_id}").json()["status"] == "paused" # Resume → pending (background processor moves to running) resp = client.post(f"/api/v1/scans/{scan_id}/resume") assert resp.json()["status"] == "pending" def test_running_to_cancelled(self, client: TestClient, create_test_scan): scan_id = create_test_scan(status='running') resp = client.post(f"/api/v1/scans/{scan_id}/cancel") assert resp.json()["status"] == "cancelled" assert client.get(f"/api/v1/scans/{scan_id}").json()["status"] == "cancelled" def test_pause_then_delete(self, client: TestClient, create_test_scan): scan_id = create_test_scan(status='paused') resp = client.delete(f"/api/v1/scans/{scan_id}") assert resp.status_code == 204 def test_cancel_then_delete(self, client: TestClient, create_test_scan): scan_id = create_test_scan(status='cancelled') resp = client.delete(f"/api/v1/scans/{scan_id}") assert resp.status_code == 204 def test_cannot_delete_running_scan(self, client: TestClient, create_test_scan): scan_id = create_test_scan(status='running') resp = client.delete(f"/api/v1/scans/{scan_id}") assert resp.status_code == 409 def test_cannot_delete_pending_scan(self, client: TestClient, create_test_scan): scan_id = create_test_scan(status='pending') resp = client.delete(f"/api/v1/scans/{scan_id}") assert resp.status_code == 409 def test_list_scans_filter_paused(self, client: TestClient, create_test_scan): paused_id = create_test_scan(status='paused') create_test_scan(status='running') create_test_scan(status='completed') resp = client.get("/api/v1/scans?status=paused") assert resp.status_code == 200 scans = resp.json()["data"] assert len(scans) >= 1 assert all(s["status"] == "paused" for s in scans) assert any(s["id"] == paused_id for s in scans) def test_list_scans_filter_cancelled(self, client: TestClient, create_test_scan): cancelled_id = create_test_scan(status='cancelled') create_test_scan(status='running') resp = client.get("/api/v1/scans?status=cancelled") assert resp.status_code == 200 scans = resp.json()["data"] assert len(scans) >= 1 assert all(s["status"] == "cancelled" for s in scans) assert any(s["id"] == cancelled_id for s in scans) # ============================================================================= # TestScanControlRateLimits — rate limit headers on control endpoints # ============================================================================= @pytest.mark.api class TestScanControlRateLimits: """Verify that rate limit response headers are present on control endpoints.""" def test_pause_rate_limit_headers(self, client: TestClient, create_test_scan): scan_id = create_test_scan(status='running') resp = client.post(f"/api/v1/scans/{scan_id}/pause") assert "x-ratelimit-limit" in resp.headers assert "x-ratelimit-remaining" in resp.headers def test_cancel_rate_limit_headers(self, client: TestClient, create_test_scan): scan_id = create_test_scan(status='running') resp = client.post(f"/api/v1/scans/{scan_id}/cancel") assert "x-ratelimit-limit" in resp.headers assert "x-ratelimit-remaining" in resp.headers def test_resume_rate_limit_headers(self, client: TestClient, create_test_scan): scan_id = create_test_scan(status='paused') resp = client.post(f"/api/v1/scans/{scan_id}/resume") assert "x-ratelimit-limit" in resp.headers assert "x-ratelimit-remaining" in resp.headers # ============================================================================= # TestScanControlNewStatuses — schema-level acceptance of new status values # ============================================================================= @pytest.mark.database class TestScanControlNewStatuses: """Verify the new status values are accepted/rejected at the SQLite level.""" def test_paused_status_accepted_by_schema(self, clean_database, create_test_scan): scan_id = create_test_scan(status='pending') conn = sqlite3.connect(clean_database) conn.execute("UPDATE scans SET status='paused' WHERE id = ?", (scan_id,)) conn.commit() row = conn.execute("SELECT status FROM scans WHERE id = ?", (scan_id,)).fetchone() conn.close() assert row[0] == 'paused' def test_cancelled_status_accepted_by_schema(self, clean_database, create_test_scan): scan_id = create_test_scan(status='pending') conn = sqlite3.connect(clean_database) conn.execute("UPDATE scans SET status='cancelled' WHERE id = ?", (scan_id,)) conn.commit() row = conn.execute("SELECT status FROM scans WHERE id = ?", (scan_id,)).fetchone() conn.close() assert row[0] == 'cancelled' def test_invalid_status_rejected_by_schema(self, clean_database, create_test_scan): scan_id = create_test_scan(status='pending') conn = sqlite3.connect(clean_database) with pytest.raises(sqlite3.IntegrityError): conn.execute("UPDATE scans SET status='stopped' WHERE id = ?", (scan_id,)) conn.commit() conn.close() def test_filter_active_scans_excludes_paused(self, clean_database, create_test_scan): paused_id = create_test_scan(status='paused') conn = sqlite3.connect(clean_database) rows = conn.execute("SELECT id FROM active_scans").fetchall() conn.close() ids = [r[0] for r in rows] assert paused_id not in ids def test_filter_active_scans_excludes_cancelled(self, clean_database, create_test_scan): cancelled_id = create_test_scan(status='cancelled') conn = sqlite3.connect(clean_database) rows = conn.execute("SELECT id FROM active_scans").fetchall() conn.close() ids = [r[0] for r in rows] assert cancelled_id not in ids