Files
ciaovolo/flight-comparator/searcher_v3.py
domverse 6421f83ca7 Add flight comparator web app with full scan pipeline
Full-stack flight price scanner built on fast-flights v3 (SOCS cookie bypass):

Backend (FastAPI + SQLite):
- REST API with rate limiting, Pydantic v2 validation, paginated responses
- Scan pipeline: resolves airports, queries every day in the window, saves
  individual flights + aggregate route stats to SQLite
- Background async scan processor with real-time progress tracking
- Airport search endpoint backed by OpenFlights dataset
- Daily scan window (all dates, not monthly samples)

Frontend (React 19 + TypeScript + Tailwind CSS v4):
- Dashboard with live scan status and recent scans
- Create scan form: country mode or specific airports (searchable dropdown)
- Scan detail page with expandable route rows showing individual flights
  (date, airline, departure, arrival, price) loaded on demand
- AirportSearch component with debounced live search and multi-select

Database:
- scans → routes → flights schema with FK cascade and auto-update triggers
- Migrations for schema evolution (relaxed country constraint)

Tests:
- 74 tests: unit + integration, isolated per-test SQLite DB
- Confirmed flight fixtures in tests/confirmed_flights.json (50 real flights,
  BDS→FMM Ryanair + BDS→DUS Eurowings, scraped Feb 2026)
- Integration tests parametrized from confirmed routes

Docker:
- Multi-stage builds, Compose orchestration, Nginx reverse proxy

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-26 17:11:51 +01:00

348 lines
12 KiB
Python

"""
Flight search logic with concurrent queries using fast-flights v3.0rc1.
Includes SOCS cookie integration to bypass Google consent page.
Includes SQLite caching to reduce API calls and avoid rate limiting.
"""
import asyncio
import random
import time
from typing import Optional
from datetime import datetime
try:
from cache import get_cached_results, save_results
HAS_CACHE = True
except ImportError:
HAS_CACHE = False
print("⚠️ Cache module not available - all queries will hit API")
try:
from fast_flights import FlightQuery, Passengers, get_flights, create_query
from fast_flights.integrations.base import Integration
from fast_flights.querying import Query
import primp
HAS_FAST_FLIGHTS = True
except ImportError:
HAS_FAST_FLIGHTS = False
print("⚠️ fast-flights v3.0rc1 not installed.")
print(" Install with: pip install --upgrade git+https://github.com/AWeirdDev/flights.git")
class SOCSCookieIntegration(Integration):
"""
Custom integration that adds SOCS cookie to bypass Google consent page.
SOCS (Secure-1P_SameSite-Cookies) is Google's consent state cookie.
Cookie value from: https://github.com/AWeirdDev/flights/issues/46
This cookie tells Google that the user has accepted cookies/consent,
allowing us to bypass the consent page and get flight data directly.
"""
# SOCS cookie value - stores consent state for 13 months
SOCS_COOKIE = 'CAESHwgBEhJnd3NfMjAyNTAyMjctMF9SQzIaBXpoLUNOIAEaBgiAy6O-Bg'
def fetch_html(self, q: Query | str, /) -> str:
"""
Fetch flights HTML with SOCS cookie attached.
Args:
q: Query object or query string
Returns:
HTML response from Google Flights
"""
# Create client with browser impersonation
client = primp.Client(
impersonate="chrome_145",
impersonate_os="macos",
cookie_store=True, # Enable cookie persistence
)
# Prepare query parameters
if isinstance(q, Query):
params = q.params()
else:
params = {"q": q}
# Make request with SOCS cookie
response = client.get(
"https://www.google.com/travel/flights",
params=params,
cookies={'SOCS': self.SOCS_COOKIE},
headers={
'Accept-Language': 'en-US,en;q=0.9',
}
)
return response.text
async def search_direct_flights(
origin: str,
destination: str,
date: str,
seat_class: str = "economy",
adults: int = 1,
cache_threshold_hours: int = 24,
use_cache: bool = True,
progress_callback=None,
) -> list[dict]:
"""
Search for direct flights between two airports on a specific date.
Checks cache first; only queries API if cache miss or expired.
Args:
origin: Origin airport IATA code
destination: Destination airport IATA code
date: Departure date in YYYY-MM-DD format
seat_class: Cabin class (economy, premium, business, first)
adults: Number of passengers
cache_threshold_hours: Maximum age of cached results in hours
use_cache: Whether to use cache (set False to force fresh query)
Returns:
List of flight dicts with keys: origin, destination, airline, departure_time,
arrival_time, duration_minutes, price, currency, stops
"""
if not HAS_FAST_FLIGHTS:
return []
try:
# Check cache first (if enabled)
if use_cache and HAS_CACHE:
cached = get_cached_results(
origin, destination, date, seat_class, adults, cache_threshold_hours
)
if cached is not None:
if progress_callback:
progress_callback(origin, destination, date, "cache_hit", len(cached))
return cached
# Add random delay to avoid rate limiting
await asyncio.sleep(random.uniform(0.5, 1.5))
# Run the search in a thread pool (fast-flights is synchronous)
result = await asyncio.to_thread(
_search_flights_sync,
origin,
destination,
date,
seat_class,
adults,
)
# Save to cache
if use_cache and HAS_CACHE and result:
save_results(origin, destination, date, seat_class, adults, result)
# Report progress
if progress_callback:
progress_callback(origin, destination, date, "api_success", len(result))
return result
except Exception as e:
# Log but don't crash - return empty results
import traceback
print(f"\n=== SEARCH ERROR ===")
print(f"Query: {origin}{destination} on {date}")
print(f"Error type: {type(e).__name__}")
print(f"Error message: {str(e)}")
print(f"Traceback:")
traceback.print_exc()
print("=" * 50)
if progress_callback:
progress_callback(origin, destination, date, "error", 0, str(e))
return []
def _search_flights_sync(
origin: str,
destination: str,
date: str,
seat_class: str,
adults: int,
) -> list[dict]:
"""
Synchronous flight search wrapper for v3 API.
Called via asyncio.to_thread to avoid blocking the event loop.
"""
# Create flight query
flights = [
FlightQuery(
date=date,
from_airport=origin,
to_airport=destination,
max_stops=0, # Direct flights only
)
]
# Create query with passengers and preferences
query = create_query(
flights=flights,
seat=seat_class,
trip="one-way",
passengers=Passengers(adults=adults),
)
# Create SOCS cookie integration
cookie_integration = SOCSCookieIntegration()
# Execute search with retry
try:
result = get_flights(query, integration=cookie_integration)
except Exception as e:
# Retry once after delay
time.sleep(2)
try:
result = get_flights(query, integration=cookie_integration)
except Exception as retry_error:
# Print detailed error for debugging
import traceback
print(f"\n=== FAST-FLIGHTS ERROR ===")
print(f"Query: {origin}{destination} on {date}")
print(f"Error: {retry_error}")
print(f"Traceback:")
traceback.print_exc()
print("=" * 50)
raise retry_error from e
# Convert v3 API result to our standard format
flights_list = []
try:
if isinstance(result, list):
for flight_option in result:
# Each flight_option has: type, price, airlines, flights, etc.
price = getattr(flight_option, 'price', None)
airlines = getattr(flight_option, 'airlines', [])
flight_segments = getattr(flight_option, 'flights', [])
# Validate flight_segments is a non-empty list
if not flight_segments or price is None:
continue
# Handle case where flights attribute exists but is None
if not isinstance(flight_segments, list):
continue
if len(flight_segments) == 0:
continue
# Get first segment (should be only one for direct flights)
segment = flight_segments[0]
# Validate segment is not None
if segment is None:
continue
# Extract flight details
from_airport = getattr(segment, 'from_airport', None)
to_airport = getattr(segment, 'to_airport', None)
departure = getattr(segment, 'departure', None)
arrival = getattr(segment, 'arrival', None)
duration = getattr(segment, 'duration', 0)
plane_type = getattr(segment, 'plane_type', '')
# Parse departure and arrival times (handle both [H] and [H, M] formats)
dep_time = ""
arr_time = ""
if departure and hasattr(departure, 'time') and isinstance(departure.time, (list, tuple)) and len(departure.time) >= 1:
try:
hours = departure.time[0]
minutes = departure.time[1] if len(departure.time) > 1 else 0
dep_time = f"{hours:02d}:{minutes:02d}"
except (IndexError, TypeError, ValueError):
dep_time = ""
if arrival and hasattr(arrival, 'time') and isinstance(arrival.time, (list, tuple)) and len(arrival.time) >= 1:
try:
hours = arrival.time[0]
minutes = arrival.time[1] if len(arrival.time) > 1 else 0
arr_time = f"{hours:02d}:{minutes:02d}"
except (IndexError, TypeError, ValueError):
arr_time = ""
# Only add flight if we have essential data (price and times)
if price and price > 0 and dep_time and arr_time:
flight_dict = {
"origin": origin,
"destination": destination,
"airline": airlines[0] if airlines else "Unknown",
"departure_time": dep_time,
"arrival_time": arr_time,
"duration_minutes": duration,
"price": price,
"currency": "", # fast-flights typically returns EUR for EU routes
"stops": 0,
"plane_type": plane_type,
}
flights_list.append(flight_dict)
except Exception as parse_error:
# Print detailed parsing error for debugging
import traceback
print(f"\n=== PARSING ERROR ===")
print(f"Query: {origin}{destination} on {date}")
print(f"Error: {parse_error}")
print(f"Result type: {type(result)}")
print(f"Result: {result}")
print(f"Traceback:")
traceback.print_exc()
print("=" * 50)
# Return empty list instead of crashing
return []
return flights_list
async def search_multiple_routes(
routes: list[tuple[str, str, str]],
seat_class: str = "economy",
adults: int = 1,
max_workers: int = 5,
cache_threshold_hours: int = 24,
use_cache: bool = True,
progress_callback=None,
) -> dict[tuple[str, str, str], list[dict]]:
"""
Search multiple routes concurrently.
Checks cache for each route before querying API.
Args:
routes: List of (origin, destination, date) tuples
seat_class: Cabin class
adults: Number of passengers
max_workers: Maximum concurrent requests
cache_threshold_hours: Maximum age of cached results in hours
use_cache: Whether to use cache (set False to force fresh queries)
Returns:
Dict mapping (origin, destination, date) tuples to lists of flight dicts
"""
# Create semaphore to limit concurrency
semaphore = asyncio.Semaphore(max_workers)
async def search_with_semaphore(origin: str, destination: str, date: str):
async with semaphore:
return (origin, destination, date), await search_direct_flights(
origin, destination, date, seat_class, adults,
cache_threshold_hours, use_cache, progress_callback
)
# Execute all searches concurrently (but limited by semaphore)
tasks = [
search_with_semaphore(origin, destination, date)
for origin, destination, date in routes
]
results = await asyncio.gather(*tasks)
# Convert to dict
return dict(results)