""" Flight search logic with concurrent queries using fast-flights v3.0rc1. Includes SOCS cookie integration to bypass Google consent page. Includes SQLite caching to reduce API calls and avoid rate limiting. """ import asyncio import random import time from typing import Optional from datetime import datetime try: from cache import get_cached_results, save_results HAS_CACHE = True except ImportError: HAS_CACHE = False print("⚠️ Cache module not available - all queries will hit API") try: from fast_flights import FlightQuery, Passengers, get_flights, create_query from fast_flights.integrations.base import Integration from fast_flights.querying import Query import primp HAS_FAST_FLIGHTS = True except ImportError: HAS_FAST_FLIGHTS = False print("⚠️ fast-flights v3.0rc1 not installed.") print(" Install with: pip install --upgrade git+https://github.com/AWeirdDev/flights.git") class SOCSCookieIntegration(Integration): """ Custom integration that adds SOCS cookie to bypass Google consent page. SOCS (Secure-1P_SameSite-Cookies) is Google's consent state cookie. Cookie value from: https://github.com/AWeirdDev/flights/issues/46 This cookie tells Google that the user has accepted cookies/consent, allowing us to bypass the consent page and get flight data directly. """ # SOCS cookie value - stores consent state for 13 months SOCS_COOKIE = 'CAESHwgBEhJnd3NfMjAyNTAyMjctMF9SQzIaBXpoLUNOIAEaBgiAy6O-Bg' def fetch_html(self, q: Query | str, /) -> str: """ Fetch flights HTML with SOCS cookie attached. Args: q: Query object or query string Returns: HTML response from Google Flights """ # Create client with browser impersonation client = primp.Client( impersonate="chrome_145", impersonate_os="macos", cookie_store=True, # Enable cookie persistence ) # Prepare query parameters if isinstance(q, Query): params = q.params() else: params = {"q": q} # Make request with SOCS cookie response = client.get( "https://www.google.com/travel/flights", params=params, cookies={'SOCS': self.SOCS_COOKIE}, headers={ 'Accept-Language': 'en-US,en;q=0.9', } ) return response.text async def search_direct_flights( origin: str, destination: str, date: str, seat_class: str = "economy", adults: int = 1, cache_threshold_hours: int = 24, use_cache: bool = True, progress_callback=None, ) -> list[dict]: """ Search for direct flights between two airports on a specific date. Checks cache first; only queries API if cache miss or expired. Args: origin: Origin airport IATA code destination: Destination airport IATA code date: Departure date in YYYY-MM-DD format seat_class: Cabin class (economy, premium, business, first) adults: Number of passengers cache_threshold_hours: Maximum age of cached results in hours use_cache: Whether to use cache (set False to force fresh query) Returns: List of flight dicts with keys: origin, destination, airline, departure_time, arrival_time, duration_minutes, price, currency, stops """ if not HAS_FAST_FLIGHTS: return [] try: # Check cache first (if enabled) if use_cache and HAS_CACHE: cached = get_cached_results( origin, destination, date, seat_class, adults, cache_threshold_hours ) if cached is not None: if progress_callback: progress_callback(origin, destination, date, "cache_hit", len(cached), flights=cached) return cached # Add random delay to avoid rate limiting await asyncio.sleep(random.uniform(0.5, 1.5)) # Run the search in a thread pool (fast-flights is synchronous) result = await asyncio.to_thread( _search_flights_sync, origin, destination, date, seat_class, adults, ) # Save to cache if use_cache and HAS_CACHE and result: save_results(origin, destination, date, seat_class, adults, result) # Report progress if progress_callback: progress_callback(origin, destination, date, "api_success", len(result), flights=result) return result except Exception as e: # Log but don't crash - return empty results import traceback print(f"\n=== SEARCH ERROR ===") print(f"Query: {origin}→{destination} on {date}") print(f"Error type: {type(e).__name__}") print(f"Error message: {str(e)}") print(f"Traceback:") traceback.print_exc() print("=" * 50) if progress_callback: progress_callback(origin, destination, date, "error", 0, str(e)) return [] def _search_flights_sync( origin: str, destination: str, date: str, seat_class: str, adults: int, ) -> list[dict]: """ Synchronous flight search wrapper for v3 API. Called via asyncio.to_thread to avoid blocking the event loop. """ # Create flight query flights = [ FlightQuery( date=date, from_airport=origin, to_airport=destination, max_stops=0, # Direct flights only ) ] # Create query with passengers and preferences query = create_query( flights=flights, seat=seat_class, trip="one-way", passengers=Passengers(adults=adults), ) # Create SOCS cookie integration cookie_integration = SOCSCookieIntegration() # Execute search with retry try: result = get_flights(query, integration=cookie_integration) except Exception as e: # Retry once after delay time.sleep(2) try: result = get_flights(query, integration=cookie_integration) except Exception as retry_error: # Print detailed error for debugging import traceback print(f"\n=== FAST-FLIGHTS ERROR ===") print(f"Query: {origin}→{destination} on {date}") print(f"Error: {retry_error}") print(f"Traceback:") traceback.print_exc() print("=" * 50) raise retry_error from e # Convert v3 API result to our standard format flights_list = [] try: if isinstance(result, list): for flight_option in result: # Each flight_option has: type, price, airlines, flights, etc. price = getattr(flight_option, 'price', None) airlines = getattr(flight_option, 'airlines', []) flight_segments = getattr(flight_option, 'flights', []) # Validate flight_segments is a non-empty list if not flight_segments or price is None: continue # Handle case where flights attribute exists but is None if not isinstance(flight_segments, list): continue if len(flight_segments) == 0: continue # Get first segment (should be only one for direct flights) segment = flight_segments[0] # Validate segment is not None if segment is None: continue # Extract flight details from_airport = getattr(segment, 'from_airport', None) to_airport = getattr(segment, 'to_airport', None) departure = getattr(segment, 'departure', None) arrival = getattr(segment, 'arrival', None) duration = getattr(segment, 'duration', 0) plane_type = getattr(segment, 'plane_type', '') # Parse departure and arrival times (handle both [H] and [H, M] formats) dep_time = "" arr_time = "" if departure and hasattr(departure, 'time') and isinstance(departure.time, (list, tuple)) and len(departure.time) >= 1: try: hours = departure.time[0] minutes = departure.time[1] if len(departure.time) > 1 else 0 dep_time = f"{hours:02d}:{minutes:02d}" except (IndexError, TypeError, ValueError): dep_time = "" if arrival and hasattr(arrival, 'time') and isinstance(arrival.time, (list, tuple)) and len(arrival.time) >= 1: try: hours = arrival.time[0] minutes = arrival.time[1] if len(arrival.time) > 1 else 0 arr_time = f"{hours:02d}:{minutes:02d}" except (IndexError, TypeError, ValueError): arr_time = "" # Only add flight if we have essential data (price and times) if price and price > 0 and dep_time and arr_time: flight_dict = { "origin": origin, "destination": destination, "airline": airlines[0] if airlines else "Unknown", "departure_time": dep_time, "arrival_time": arr_time, "duration_minutes": duration, "price": price, "currency": "€", # fast-flights typically returns EUR for EU routes "stops": 0, "plane_type": plane_type, } flights_list.append(flight_dict) except Exception as parse_error: # Print detailed parsing error for debugging import traceback print(f"\n=== PARSING ERROR ===") print(f"Query: {origin}→{destination} on {date}") print(f"Error: {parse_error}") print(f"Result type: {type(result)}") print(f"Result: {result}") print(f"Traceback:") traceback.print_exc() print("=" * 50) # Return empty list instead of crashing return [] return flights_list async def search_multiple_routes( routes: list[tuple[str, str, str]], seat_class: str = "economy", adults: int = 1, max_workers: int = 5, cache_threshold_hours: int = 24, use_cache: bool = True, progress_callback=None, ) -> dict[tuple[str, str, str], list[dict]]: """ Search multiple routes concurrently. Checks cache for each route before querying API. Args: routes: List of (origin, destination, date) tuples seat_class: Cabin class adults: Number of passengers max_workers: Maximum concurrent requests cache_threshold_hours: Maximum age of cached results in hours use_cache: Whether to use cache (set False to force fresh queries) Returns: Dict mapping (origin, destination, date) tuples to lists of flight dicts """ # Create semaphore to limit concurrency semaphore = asyncio.Semaphore(max_workers) async def search_with_semaphore(origin: str, destination: str, date: str): async with semaphore: return (origin, destination, date), await search_direct_flights( origin, destination, date, seat_class, adults, cache_threshold_hours, use_cache, progress_callback ) # Execute all searches concurrently (but limited by semaphore) tasks = [ search_with_semaphore(origin, destination, date) for origin, destination, date in routes ] results = await asyncio.gather(*tasks) # Convert to dict return dict(results)