Full-stack flight price scanner built on fast-flights v3 (SOCS cookie bypass)

Backend (FastAPI + SQLite):
- REST API with rate limiting, Pydantic v2 validation, paginated responses
- Scan pipeline: resolves airports, queries every day in the window, saves individual flights + aggregate route stats to SQLite
- Background async scan processor with real-time progress tracking
- Airport search endpoint backed by OpenFlights dataset
- Daily scan window (all dates, not monthly samples)

Frontend (React 19 + TypeScript + Tailwind CSS v4):
- Dashboard with live scan status and recent scans
- Create scan form: country mode or specific airports (searchable dropdown)
- Scan detail page with expandable route rows showing individual flights (date, airline, departure, arrival, price) loaded on demand
- AirportSearch component with debounced live search and multi-select

Database:
- scans → routes → flights schema with FK cascade and auto-update triggers
- Migrations for schema evolution (relaxed country constraint)

Tests:
- 74 tests: unit + integration, isolated per-test SQLite DB
- Confirmed flight fixtures in tests/confirmed_flights.json (50 real flights, BDS→FMM Ryanair + BDS→DUS Eurowings, scraped Feb 2026)
- Integration tests parametrized from confirmed routes

Docker:
- Multi-stage builds, Compose orchestration, Nginx reverse proxy

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
312 lines
11 KiB
Python
"""
|
|
Scan Processor - Background worker for flight scans
|
|
|
|
This module processes pending flight scans by:
|
|
1. Querying flights using searcher_v3.py (with SOCS cookie integration)
|
|
2. Updating scan status and progress in real-time
|
|
3. Saving discovered routes to the database
|
|
|
|
Runs as async background tasks within the FastAPI application.
|
|
"""
|
|
|
|
import asyncio
import json
import logging
from datetime import datetime, date, timedelta
from typing import Callable, Dict, List, Optional, Tuple

from airports import get_airports_for_country
from database import get_connection
from searcher_v3 import search_multiple_routes

# Module-scoped logger: every record emitted here is tagged with this module's import name.
logger = logging.getLogger(name=__name__)


# Progress-callback statuses that mark one (origin, destination, date) query as
# finished. Only these advance the scan's routes_scanned counter, so the value
# stored in the DB matches the in-memory count used for progress logging.
# NOTE(review): assumes searcher_v3 reports every finished query with one of
# these statuses — confirm against search_multiple_routes.
_FINISHED_QUERY_STATUSES = frozenset({'cache_hit', 'api_success', 'error'})


def _mark_scan_failed(conn, scan_id: int, message: str) -> None:
    """Roll back pending writes on ``conn`` and record the scan as failed.

    The rollback discards any route/flight rows that were inserted but not yet
    committed, so a scan reported as 'failed' never keeps a partial result set.
    """
    conn.rollback()
    conn.cursor().execute("""
        UPDATE scans
        SET status = 'failed',
            error_message = ?,
            updated_at = CURRENT_TIMESTAMP
        WHERE id = ?
    """, (message, scan_id))
    conn.commit()


def _resolve_destinations(scan_id: int, country_or_airports: str) -> Tuple[List[str], List[Dict]]:
    """Turn the scan's ``country`` column into destination IATA codes.

    A two-letter alphabetic value is treated as a country code and resolved
    through ``get_airports_for_country``; anything else is parsed as a
    comma-separated list of airport codes.

    Returns:
        ``(destination_codes, destinations)``. ``destinations`` holds the airport
        records from the country lookup and is empty in airport-list mode, where
        display names fall back to the IATA code.

    Raises:
        ValueError: if no destination could be determined.
    """
    if len(country_or_airports) == 2 and country_or_airports.isalpha():
        # Country mode: resolve airports from country code
        logger.info(f"[Scan {scan_id}] Mode: Country search ({country_or_airports})")
        destinations = get_airports_for_country(country_or_airports)
        if not destinations:
            raise ValueError(f"No airports found for country: {country_or_airports}")

        destination_codes = [d['iata'] for d in destinations]
        logger.info(f"[Scan {scan_id}] Found {len(destination_codes)} destination airports: {destination_codes}")
        return destination_codes, destinations

    # Specific airports mode: normalise case, drop empty entries left by stray or
    # trailing commas, and de-duplicate (order preserved) so no route is queried
    # or saved twice.
    stripped = (code.strip().upper() for code in country_or_airports.split(','))
    destination_codes = list(dict.fromkeys(code for code in stripped if code))
    if not destination_codes:
        raise ValueError(f"No airport codes given: {country_or_airports!r}")

    logger.info(f"[Scan {scan_id}] Mode: Specific airports ({len(destination_codes)} destinations: {destination_codes})")
    return destination_codes, []


def _date_range(start_date_str: str, end_date_str: str) -> List[str]:
    """Return every day from start to end (inclusive) as 'YYYY-MM-DD' strings.

    An end date before the start date yields an empty list.

    Raises:
        ValueError: if either string is not in 'YYYY-MM-DD' format.
    """
    start = datetime.strptime(start_date_str, '%Y-%m-%d').date()
    end = datetime.strptime(end_date_str, '%Y-%m-%d').date()
    return [
        (start + timedelta(days=offset)).strftime('%Y-%m-%d')
        for offset in range((end - start).days + 1)
    ]


def _make_progress_callback(scan_id: int, total_routes: int) -> Callable[..., None]:
    """Build the progress callback passed to ``search_multiple_routes``.

    Signature: ``callback(origin, destination, date, status, count, error=None)``.
    Each finished query (see ``_FINISHED_QUERY_STATUSES``) increments the scan's
    ``routes_scanned`` column by one; other statuses are ignored.
    """
    routes_scanned_count = 0

    def progress_callback(origin: str, destination: str, date: str,
                          status: str, count: int, error: Optional[str] = None):
        nonlocal routes_scanned_count

        # Count each finished route query (cache hit, API call, or error) once.
        if status not in _FINISHED_QUERY_STATUSES:
            return
        routes_scanned_count += 1

        # Each update uses its own short-lived connection, which is closed even
        # if the UPDATE or commit raises.
        progress_conn = None
        try:
            progress_conn = get_connection()
            progress_conn.cursor().execute("""
                UPDATE scans
                SET routes_scanned = routes_scanned + 1,
                    updated_at = CURRENT_TIMESTAMP
                WHERE id = ?
            """, (scan_id,))
            progress_conn.commit()

            if routes_scanned_count % 10 == 0:  # Log every 10 routes
                logger.info(f"[Scan {scan_id}] Progress: {routes_scanned_count}/{total_routes} routes ({status}: {origin}→{destination})")

        except Exception as e:
            # Best effort: a failed progress update must not abort the scan.
            logger.error(f"[Scan {scan_id}] Failed to update progress: {str(e)}")
        finally:
            if progress_conn is not None:
                progress_conn.close()

    return progress_callback


def _save_results(cursor, scan_id: int, results: Dict, destinations: List[Dict]) -> Tuple[int, int]:
    """Insert a routes row plus its flights for every destination with priced flights.

    Args:
        cursor: Cursor on the scan's connection. The caller is responsible for
            committing.
        scan_id: The scan the rows belong to.
        results: Mapping of ``(origin, destination, date)`` to a list of flight
            dicts, as returned by ``search_multiple_routes``.
        destinations: Airport records from the country lookup, used for display
            names (may be empty).

    Returns:
        ``(routes_saved, total_flights)``. ``total_flights`` counts every flight
        the search returned, priced or not.
    """
    # Group results by destination, keeping the query date with each flight.
    routes_by_destination: Dict[str, List[Tuple[Dict, str]]] = {}
    total_flights = 0
    for (_origin, dest, scan_date), flights in results.items():
        routes_by_destination.setdefault(dest, []).extend(
            (flight, scan_date) for flight in flights
        )
        total_flights += len(flights)

    logger.info(f"[Scan {scan_id}] Found {total_flights} total flights across {len(routes_by_destination)} destinations")

    # IATA -> airport record; the first record wins if a code appears twice.
    airport_by_iata: Dict[str, Dict] = {}
    for airport in destinations:
        airport_by_iata.setdefault(airport['iata'], airport)

    routes_saved = 0
    for destination, flight_date_pairs in routes_by_destination.items():
        if not flight_date_pairs:
            continue  # Skip destinations with no flights

        flights = [f for f, _ in flight_date_pairs]

        # Get destination details (fall back to IATA code if not in DB)
        dest_info = airport_by_iata.get(destination)
        dest_name = dest_info.get('name', destination) if dest_info else destination
        dest_city = dest_info.get('city', '') if dest_info else ''

        prices = [f.get('price') for f in flights if f.get('price')]
        if not prices:
            logger.info(f"[Scan {scan_id}] Skipping {destination} - no prices available")
            continue

        # Sorted so the stored JSON is the same for the same set of airlines.
        airlines = sorted({f.get('airline') for f in flights if f.get('airline')})

        # Insert route summary
        cursor.execute("""
            INSERT INTO routes (
                scan_id, destination, destination_name, destination_city,
                min_price, max_price, avg_price, flight_count, airlines
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            scan_id,
            destination,
            dest_name,
            dest_city,
            min(prices),
            max(prices),
            sum(prices) / len(prices),
            len(flights),
            json.dumps(airlines),
        ))

        # Insert individual flights (only those with a price)
        for flight, flight_date in flight_date_pairs:
            if not flight.get('price'):
                continue
            cursor.execute("""
                INSERT INTO flights (
                    scan_id, destination, date, airline,
                    departure_time, arrival_time, price, stops
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                scan_id,
                destination,
                flight_date,
                flight.get('airline'),
                flight.get('departure_time'),
                flight.get('arrival_time'),
                flight.get('price'),
                flight.get('stops', 0),
            ))

        routes_saved += 1

    return routes_saved, total_flights


async def process_scan(scan_id: int):
    """
    Process a pending scan by querying flights and saving routes.

    Args:
        scan_id: The ID of the scan to process

    This function:
    1. Updates scan status to 'running'
    2. Resolves destination airports from the country code or the
       comma-separated airport list
    3. Queries flights for every destination on every day in the window
    4. Saves routes and their individual flights to the database
    5. Updates progress counters in real-time
    6. Sets final status to 'completed' or 'failed'

    Errors raised while processing (``Exception`` subclasses) are logged and
    recorded on the scan row instead of being propagated. Uncommitted
    route/flight rows are rolled back first, so a failed scan keeps no partial
    results.
    """
    conn = None
    try:
        logger.info(f"[Scan {scan_id}] Starting scan processing")

        # Get scan details
        conn = get_connection()
        cursor = conn.cursor()
        cursor.execute("""
            SELECT origin, country, start_date, end_date, seat_class, adults
            FROM scans
            WHERE id = ?
        """, (scan_id,))

        row = cursor.fetchone()
        if not row:
            logger.error(f"[Scan {scan_id}] Scan not found in database")
            return

        origin, country_or_airports, start_date_str, end_date_str, seat_class, adults = row
        logger.info(f"[Scan {scan_id}] Scan details: {origin} -> {country_or_airports}, {start_date_str} to {end_date_str}")

        # Update status to 'running'
        cursor.execute("""
            UPDATE scans
            SET status = 'running', updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
        """, (scan_id,))
        conn.commit()

        # Determine mode: country (2 letters) or specific airports (comma-separated)
        try:
            destination_codes, destinations = _resolve_destinations(scan_id, country_or_airports)
        except Exception as e:
            logger.error(f"[Scan {scan_id}] Failed to resolve airports: {str(e)}")
            _mark_scan_failed(conn, scan_id, f"Failed to resolve airports: {str(e)}")
            return

        # Generate dates to scan — every day in the window
        dates = _date_range(start_date_str, end_date_str)
        logger.info(f"[Scan {scan_id}] Will scan {len(dates)} dates: {dates}")

        # Build routes list: [(origin, destination, date), ...]
        routes_to_scan = [
            (origin, dest, scan_date)
            for dest in destination_codes
            for scan_date in dates
        ]
        logger.info(f"[Scan {scan_id}] Total route queries: {len(routes_to_scan)}")

        # Update total_routes with actual number of queries
        cursor.execute("""
            UPDATE scans
            SET total_routes = ?,
                updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
        """, (len(routes_to_scan), scan_id))
        conn.commit()

        # Query flights using searcher_v3
        logger.info(f"[Scan {scan_id}] Starting flight queries...")
        results = await search_multiple_routes(
            routes=routes_to_scan,
            seat_class=seat_class or 'economy',
            adults=adults or 1,
            use_cache=True,
            cache_threshold_hours=24,
            max_workers=3,  # Limit concurrency to avoid rate limiting
            progress_callback=_make_progress_callback(scan_id, len(routes_to_scan)),
        )
        logger.info(f"[Scan {scan_id}] Flight queries complete. Processing results...")

        # Save routes and flights, then mark the scan completed in the same
        # transaction so results and final status are committed together.
        routes_saved, total_flights = _save_results(cursor, scan_id, results, destinations)

        cursor.execute("""
            UPDATE scans
            SET status = 'completed',
                total_flights = ?,
                updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
        """, (total_flights, scan_id))
        conn.commit()

        logger.info(f"[Scan {scan_id}] ✅ Scan completed successfully! {routes_saved} routes saved with {total_flights} flights")

    except Exception as e:
        logger.error(f"[Scan {scan_id}] ❌ Scan failed with error: {str(e)}", exc_info=True)

        # Update scan to failed (rolling back any partially saved results first)
        try:
            if conn is not None:
                _mark_scan_failed(conn, scan_id, str(e))
        except Exception as update_error:
            logger.error(f"[Scan {scan_id}] Failed to update error status: {str(update_error)}")

    finally:
        if conn is not None:
            conn.close()


# Strong references to in-flight scan tasks. The event loop keeps only weak
# references to tasks, so a task whose caller drops the returned handle could
# be garbage-collected before the scan finishes. Each task removes itself from
# this set when it completes.
_background_tasks = set()


def start_scan_processor(scan_id: int) -> asyncio.Task:
    """
    Start processing a scan as a background task.

    Must be called from code running inside an event loop (``asyncio.create_task``
    requires a running loop). The task is kept referenced internally until it
    finishes, so callers may safely ignore the return value.

    Args:
        scan_id: The ID of the scan to process

    Returns:
        asyncio.Task: The background task
    """
    task = asyncio.create_task(process_scan(scan_id))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    logger.info(f"[Scan {scan_id}] Background task created")
    return task