Full-stack flight price scanner built on fast-flights v3 (SOCS cookie bypass): Backend (FastAPI + SQLite): - REST API with rate limiting, Pydantic v2 validation, paginated responses - Scan pipeline: resolves airports, queries every day in the window, saves individual flights + aggregate route stats to SQLite - Background async scan processor with real-time progress tracking - Airport search endpoint backed by OpenFlights dataset - Daily scan window (all dates, not monthly samples) Frontend (React 19 + TypeScript + Tailwind CSS v4): - Dashboard with live scan status and recent scans - Create scan form: country mode or specific airports (searchable dropdown) - Scan detail page with expandable route rows showing individual flights (date, airline, departure, arrival, price) loaded on demand - AirportSearch component with debounced live search and multi-select Database: - scans → routes → flights schema with FK cascade and auto-update triggers - Migrations for schema evolution (relaxed country constraint) Tests: - 74 tests: unit + integration, isolated per-test SQLite DB - Confirmed flight fixtures in tests/confirmed_flights.json (50 real flights, BDS→FMM Ryanair + BDS→DUS Eurowings, scraped Feb 2026) - Integration tests parametrized from confirmed routes Docker: - Multi-stage builds, Compose orchestration, Nginx reverse proxy Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
310 lines
11 KiB
Python
310 lines
11 KiB
Python
"""
|
|
Integration tests for Flight Radar Web App.
|
|
|
|
Tests that verify multiple components working together, including
|
|
database operations, full workflows, and system behavior.
|
|
"""
|
|
|
|
import pytest
|
|
import sqlite3
|
|
import time
|
|
from fastapi.testclient import TestClient
|
|
|
|
|
|
@pytest.mark.integration
@pytest.mark.database
class TestScanWorkflow:
    """End-to-end tests covering the scan create/retrieve/routes workflow."""

    def test_create_and_retrieve_scan(self, client: TestClient):
        """A freshly created scan can be fetched back with matching fields."""
        payload = {
            "origin": "BDS",
            "country": "DE",
            "start_date": "2026-04-01",
            "end_date": "2026-06-30",
            "adults": 2,
        }

        created = client.post("/api/v1/scans", json=payload)
        assert created.status_code == 200
        scan_id = created.json()["id"]

        fetched = client.get(f"/api/v1/scans/{scan_id}")
        assert fetched.status_code == 200

        body = fetched.json()
        assert body["id"] == scan_id
        assert body["origin"] == payload["origin"]
        assert body["country"] == payload["country"]
        # New scans start out pending until the background worker picks them up.
        assert body["status"] == "pending"

    def test_scan_appears_in_list(self, client: TestClient):
        """A created scan shows up in the scan listing endpoint."""
        created = client.post(
            "/api/v1/scans", json={"origin": "MUC", "country": "IT"}
        )
        scan_id = created.json()["id"]

        listed = client.get("/api/v1/scans").json()["data"]
        assert any(item["id"] == scan_id for item in listed)

    def test_scan_with_routes_workflow(self, client: TestClient, create_test_route):
        """Routes attached to a scan are returned ordered by price."""
        created = client.post(
            "/api/v1/scans", json={"origin": "BDS", "country": "DE"}
        )
        scan_id = created.json()["id"]

        # Insert routes deliberately out of price order.
        for dest, price in (("MUC", 100), ("FRA", 50), ("BER", 75)):
            create_test_route(scan_id=scan_id, destination=dest, min_price=price)

        routes_response = client.get(f"/api/v1/scans/{scan_id}/routes")
        assert routes_response.status_code == 200

        routes = routes_response.json()["data"]
        assert len(routes) == 3

        # The API contract is cheapest-first ordering.
        prices = [route["min_price"] for route in routes]
        assert prices == sorted(prices)
|
|
|
|
|
|
@pytest.mark.integration
@pytest.mark.database
class TestDatabaseOperations:
    """Integration tests for database operations.

    Raw ``sqlite3`` connections are closed in ``finally`` blocks so a failing
    assertion or SQL error cannot leak an open handle — an open connection can
    keep the per-test database file locked on some platforms.
    """

    def test_foreign_key_constraints(self, client: TestClient, clean_database):
        """Test that foreign key constraints are enforced."""
        # Try to create route for non-existent scan
        conn = sqlite3.connect(clean_database)
        try:
            # SQLite disables FK enforcement per-connection by default.
            conn.execute("PRAGMA foreign_keys = ON")
            cursor = conn.cursor()

            with pytest.raises(sqlite3.IntegrityError):
                cursor.execute("""
                    INSERT INTO routes (scan_id, destination, destination_name,
                                        destination_city, flight_count, airlines)
                    VALUES (999, 'MUC', 'Munich', 'Munich', 10, '[]')
                """)
                conn.commit()
        finally:
            conn.close()

    def test_cascade_delete(self, client: TestClient, create_test_scan, create_test_route, clean_database):
        """Test that deleting scan cascades to routes."""
        # Create scan and routes
        scan_id = create_test_scan()
        create_test_route(scan_id=scan_id, destination="MUC")
        create_test_route(scan_id=scan_id, destination="FRA")

        # Delete scan
        conn = sqlite3.connect(clean_database)
        try:
            conn.execute("PRAGMA foreign_keys = ON")  # Required for ON DELETE CASCADE
            cursor = conn.cursor()

            cursor.execute("DELETE FROM scans WHERE id = ?", (scan_id,))
            conn.commit()

            # Check routes are deleted
            cursor.execute("SELECT COUNT(*) FROM routes WHERE scan_id = ?", (scan_id,))
            count = cursor.fetchone()[0]
        finally:
            conn.close()

        assert count == 0

    def test_timestamp_triggers(self, client: TestClient, create_test_scan, clean_database):
        """Test that the updated_at trigger fires on UPDATE."""
        scan_id = create_test_scan()

        conn = sqlite3.connect(clean_database)
        try:
            conn.execute("PRAGMA foreign_keys = ON")
            cursor = conn.cursor()

            # Get original timestamp
            cursor.execute("SELECT updated_at FROM scans WHERE id = ?", (scan_id,))
            original_time = cursor.fetchone()[0]

            # Wait a moment (SQLite CURRENT_TIMESTAMP has 1-second precision)
            time.sleep(1.1)

            # Update scan
            cursor.execute("UPDATE scans SET status = 'running' WHERE id = ?", (scan_id,))
            conn.commit()

            # Get new timestamp
            cursor.execute("SELECT updated_at FROM scans WHERE id = ?", (scan_id,))
            new_time = cursor.fetchone()[0]
        finally:
            conn.close()

        assert new_time != original_time
|
|
|
|
|
|
@pytest.mark.integration
class TestPaginationAcrossEndpoints:
    """Integration tests for pagination consistency."""

    def test_pagination_metadata_consistency(self, client: TestClient, create_test_scan):
        """First page of 10 scans at limit=3 reports correct metadata."""
        for _ in range(10):
            create_test_scan()

        data = client.get("/api/v1/scans?page=1&limit=3").json()
        meta = data["pagination"]

        assert meta["page"] == 1
        assert meta["limit"] == 3
        assert meta["total"] == 10
        # ceil(10 / 3) == 4 pages
        assert meta["pages"] == 4
        assert meta["has_next"] is True
        assert meta["has_prev"] is False

    def test_pagination_last_page(self, client: TestClient, create_test_scan):
        """Last page carries the remainder and correct prev/next flags."""
        for _ in range(7):
            create_test_scan()

        data = client.get("/api/v1/scans?page=2&limit=5").json()
        meta = data["pagination"]

        assert meta["page"] == 2
        assert meta["has_next"] is False
        assert meta["has_prev"] is True
        # 7 scans at limit 5 leaves 2 items on page 2.
        assert len(data["data"]) == 2
|
|
|
|
|
|
@pytest.mark.integration
class TestErrorHandlingIntegration:
    """Integration tests for error handling across the system."""

    def test_error_logging(self, client: TestClient):
        """Errors triggered by requests are queryable via the logs endpoint."""
        # Provoke a 404 so something error-like lands in the logs.
        client.get("/api/v1/scans/999")

        # Just verify we can query the logs; exact content may vary.
        logs_response = client.get("/api/v1/logs?search=not+found")
        assert logs_response.status_code == 200

    def test_request_id_consistency(self, client: TestClient):
        """The request ID in the error body matches the response header."""
        response = client.get("/api/v1/scans/999")

        header_id = response.headers.get("x-request-id")
        body_id = response.json().get("request_id")

        assert header_id == body_id
|
|
|
|
|
|
@pytest.mark.integration
@pytest.mark.slow
class TestRateLimitingIntegration:
    """Integration tests for rate limiting system."""

    def test_rate_limit_per_endpoint(self, client: TestClient):
        """Airports (100/min) and scans (10/min) advertise different limits."""
        airports = client.get("/api/v1/airports?q=MUC")
        scans = client.post("/api/v1/scans", json={"origin": "BDS", "country": "DE"})

        airports_limit = int(airports.headers["x-ratelimit-limit"])
        scans_limit = int(scans.headers["x-ratelimit-limit"])

        assert airports_limit > scans_limit
        assert airports_limit == 100
        assert scans_limit == 10

    def test_rate_limit_recovery(self, client: TestClient):
        """Each endpoint keeps its own independent rate-limit counter."""
        # Burn a few requests against the airports endpoint.
        for _ in range(3):
            client.get("/api/v1/airports?q=MUC")

        # The scans counter must be unaffected by airport traffic.
        response = client.post("/api/v1/scans", json={"origin": "BDS", "country": "DE"})
        remaining = int(response.headers["x-ratelimit-remaining"])

        # Of the 10/min scan budget, only this one request was spent.
        assert remaining >= 8
|
|
|
|
|
|
@pytest.mark.integration
class TestStartupCleanup:
    """Integration tests for startup cleanup behavior."""

    def test_stuck_scans_detection(self, client: TestClient, create_test_scan, clean_database):
        """A scan left in 'running' state is visible in the database.

        The connection is closed in a ``finally`` block so a query failure
        cannot leak an open SQLite handle.
        """
        # Create a scan that simulates being stuck mid-run.
        scan_id = create_test_scan(status="running")

        # Verify it is persisted in the running state.
        conn = sqlite3.connect(clean_database)
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT status FROM scans WHERE id = ?", (scan_id,))
            status = cursor.fetchone()[0]
        finally:
            conn.close()

        assert status == "running"

        # Note: Actual cleanup happens on server restart, tested manually
|
|
|
|
|
|
@pytest.mark.integration
class TestValidationIntegration:
    """Integration tests for validation across the system."""

    def test_validation_consistency(self, client: TestClient):
        """Invalid IATA codes and malformed dates are both rejected with 422."""
        # IATA codes are exactly 3 letters; this one is too long.
        too_long = client.post(
            "/api/v1/scans", json={"origin": "TOOLONG", "country": "DE"}
        )
        assert too_long.status_code == 422

        # Dates must be ISO (YYYY-MM-DD); DD-MM-YYYY is rejected.
        bad_date = client.post(
            "/api/v1/scans",
            json={
                "origin": "BDS",
                "country": "DE",
                "start_date": "01-04-2026",
            },
        )
        assert bad_date.status_code == 422

    def test_auto_normalization(self, client: TestClient):
        """Lowercase IATA/country codes are normalized to uppercase."""
        response = client.post(
            "/api/v1/scans",
            json={
                "origin": "bds",
                "country": "de",
            },
        )
        assert response.status_code == 200

        scan = response.json()["scan"]
        assert scan["origin"] == "BDS"
        assert scan["country"] == "DE"
|