Files
ciaovolo/flight-comparator/searcher.py
domverse 6421f83ca7 Add flight comparator web app with full scan pipeline
Full-stack flight price scanner built on fast-flights v3 (SOCS cookie bypass):

Backend (FastAPI + SQLite):
- REST API with rate limiting, Pydantic v2 validation, paginated responses
- Scan pipeline: resolves airports, queries every day in the window, saves
  individual flights + aggregate route stats to SQLite
- Background async scan processor with real-time progress tracking
- Airport search endpoint backed by OpenFlights dataset
- Daily scan window (all dates, not monthly samples)

Frontend (React 19 + TypeScript + Tailwind CSS v4):
- Dashboard with live scan status and recent scans
- Create scan form: country mode or specific airports (searchable dropdown)
- Scan detail page with expandable route rows showing individual flights
  (date, airline, departure, arrival, price) loaded on demand
- AirportSearch component with debounced live search and multi-select

Database:
- scans → routes → flights schema with FK cascade and auto-update triggers
- Migrations for schema evolution (relaxed country constraint)

Tests:
- 74 tests: unit + integration, isolated per-test SQLite DB
- Confirmed flight fixtures in tests/confirmed_flights.json (50 real flights,
  BDS→FMM Ryanair + BDS→DUS Eurowings, scraped Feb 2026)
- Integration tests parametrized from confirmed routes

Docker:
- Multi-stage builds, Compose orchestration, Nginx reverse proxy

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-26 17:11:51 +01:00

241 lines
6.8 KiB
Python

"""
Flight search logic with concurrent queries.
Wraps fast-flights library with async concurrency and error handling.
"""
import asyncio
import random
import time
from typing import Optional
from datetime import datetime
try:
from fast_flights import FlightData, Passengers, get_flights
HAS_FAST_FLIGHTS = True
except ImportError:
HAS_FAST_FLIGHTS = False
print("⚠️ fast-flights not installed. Install with: pip install fast-flights")
async def search_direct_flights(
origin: str,
destination: str,
date: str,
seat_class: str = "economy",
adults: int = 1,
) -> list[dict]:
"""
Search for direct flights between two airports on a specific date.
Args:
origin: Origin airport IATA code
destination: Destination airport IATA code
date: Departure date in YYYY-MM-DD format
seat_class: Cabin class (economy, business, first)
adults: Number of passengers
Returns:
List of flight dicts with keys: origin, destination, airline, departure_time,
arrival_time, duration_minutes, price, currency, stops
"""
if not HAS_FAST_FLIGHTS:
return []
try:
# Add random delay to avoid rate limiting
await asyncio.sleep(random.uniform(0.5, 1.5))
# Run the synchronous get_flights in a thread pool
result = await asyncio.to_thread(
_search_flights_sync,
origin,
destination,
date,
seat_class,
adults,
)
return result
except Exception as e:
# Log but don't crash - return empty results
print(f"⚠️ {origin}->{destination} on {date}: {type(e).__name__}: {e}")
return []
def _search_flights_sync(
origin: str,
destination: str,
date: str,
seat_class: str,
adults: int,
) -> list[dict]:
"""
Synchronous flight search wrapper.
Called via asyncio.to_thread to avoid blocking the event loop.
"""
# Map seat class to fast-flights format
seat_map = {
"economy": "economy",
"premium": "premium-economy",
"business": "business",
"first": "first",
}
seat_string = seat_map.get(seat_class.lower(), "economy")
# Create flight data object
flight = FlightData(
date=date,
from_airport=origin,
to_airport=destination,
max_stops=0, # Direct flights only
)
passengers = Passengers(adults=adults)
# Query flights with common mode (tries common first, fallback if needed)
try:
result = get_flights(
flight_data=[flight], # Must be a list
trip='one-way',
passengers=passengers,
seat=seat_string,
fetch_mode='common', # Use common mode instead of fallback
)
except Exception as e:
# Retry once after a delay
time.sleep(2)
try:
result = get_flights(
flight_data=[flight],
trip='one-way',
passengers=passengers,
seat=seat_string,
fetch_mode='common',
)
except Exception as retry_error:
raise retry_error from e
# Filter to direct flights only and convert to our format
flights = []
if not result or not hasattr(result, 'flights'):
return flights
for flight_option in result.flights:
# Check if direct (0 stops)
# fast-flights may structure this differently, so we check multiple attributes
is_direct = False
# Method 1: Check stops attribute
if hasattr(flight_option, 'stops') and flight_option.stops == 0:
is_direct = True
# Method 2: Check if there's only one flight segment
if hasattr(flight_option, 'flight') and len(flight_option.flight) == 1:
is_direct = True
# Method 3: Check if departure and arrival airports match our query
# (no layovers in between)
if hasattr(flight_option, 'departure_airport') and hasattr(flight_option, 'arrival_airport'):
if (flight_option.departure_airport == origin and
flight_option.arrival_airport == destination):
is_direct = True
if not is_direct:
continue
# Extract flight details
flight_dict = {
"origin": origin,
"destination": destination,
"airline": getattr(flight_option, 'airline', 'Unknown'),
"departure_time": getattr(flight_option, 'departure_time', ''),
"arrival_time": getattr(flight_option, 'arrival_time', ''),
"duration_minutes": _parse_duration(getattr(flight_option, 'duration', '')),
"price": getattr(flight_option, 'price', 0),
"currency": getattr(flight_option, 'currency', 'USD'),
"stops": 0,
}
flights.append(flight_dict)
return flights
def _parse_duration(duration_str: str) -> int:
"""
Parse duration string to minutes.
Handles formats like "9h 30m", "9h", "90m"
Args:
duration_str: Duration string
Returns:
Total duration in minutes
"""
if not duration_str:
return 0
total_minutes = 0
# Extract hours
if 'h' in duration_str:
try:
hours_part = duration_str.split('h')[0].strip()
total_minutes += int(hours_part) * 60
except (ValueError, IndexError):
pass
# Extract minutes
if 'm' in duration_str:
try:
minutes_part = duration_str.split('h')[-1].split('m')[0].strip()
total_minutes += int(minutes_part)
except (ValueError, IndexError):
pass
return total_minutes
async def search_multiple_routes(
routes: list[tuple[str, str, str]],
seat_class: str = "economy",
adults: int = 1,
max_workers: int = 5,
) -> dict[tuple[str, str], list[dict]]:
"""
Search multiple routes concurrently.
Args:
routes: List of (origin, destination, date) tuples
seat_class: Cabin class
adults: Number of passengers
max_workers: Maximum concurrent requests
Returns:
Dict mapping (origin, date) tuples to lists of flight dicts
"""
# Create semaphore to limit concurrency
semaphore = asyncio.Semaphore(max_workers)
async def search_with_semaphore(origin: str, destination: str, date: str):
async with semaphore:
return (origin, date), await search_direct_flights(
origin, destination, date, seat_class, adults
)
# Execute all searches concurrently (but limited by semaphore)
tasks = [
search_with_semaphore(origin, destination, date)
for origin, destination, date in routes
]
results = await asyncio.gather(*tasks)
# Convert to dict
return dict(results)