"""
Flight search logic with concurrent queries.

Wraps the fast-flights library with async concurrency and error handling.
"""

import asyncio
import random
import re
import time
from typing import Optional
from datetime import datetime

try:
    from fast_flights import FlightData, Passengers, get_flights
    HAS_FAST_FLIGHTS = True
except ImportError:
    HAS_FAST_FLIGHTS = False
    print("⚠️ fast-flights not installed. Install with: pip install fast-flights")


# Maps the cabin names accepted by this module to the seat strings passed to
# fast-flights. Unknown names fall back to "economy".
_SEAT_CLASS_MAP = {
    "economy": "economy",
    "premium": "premium-economy",
    "business": "business",
    "first": "first",
}

# Pause before the single retry of a failed query.
_RETRY_DELAY_SECONDS = 2

# A whole number followed by a unit starting with "h" / "m" ("9h", "9 hr",
# "30m", "30 min", ...). The lookbehind rejects the fractional digits of
# values such as "1.5h", which are treated as unparseable rather than
# misread as "5h".
_HOURS_RE = re.compile(r"(?<![\d.])(\d+)\s*h", re.IGNORECASE)
_MINUTES_RE = re.compile(r"(?<![\d.])(\d+)\s*m", re.IGNORECASE)


async def search_direct_flights(
    origin: str,
    destination: str,
    date: str,
    seat_class: str = "economy",
    adults: int = 1,
) -> list[dict]:
    """
    Search for direct flights between two airports on a specific date.

    This is a best-effort call: any exception raised by the underlying
    search is printed and an empty list is returned instead.

    Args:
        origin: Origin airport IATA code
        destination: Destination airport IATA code
        date: Departure date in YYYY-MM-DD format
        seat_class: Cabin class (economy, premium, business, first)
        adults: Number of passengers

    Returns:
        List of flight dicts with keys: origin, destination, airline,
        departure_time, arrival_time, duration_minutes, price, currency,
        stops. Empty if fast-flights is not installed or the search failed.
    """
    if not HAS_FAST_FLIGHTS:
        return []

    try:
        # Add random delay to avoid rate limiting
        await asyncio.sleep(random.uniform(0.5, 1.5))

        # get_flights is synchronous; run it in a worker thread so the
        # event loop stays responsive.
        return await asyncio.to_thread(
            _search_flights_sync,
            origin,
            destination,
            date,
            seat_class,
            adults,
        )
    except Exception as e:
        # Log but don't crash - return empty results
        print(f"⚠️ {origin}->{destination} on {date}: {type(e).__name__}: {e}")
        return []


def _search_flights_sync(
    origin: str,
    destination: str,
    date: str,
    seat_class: str,
    adults: int,
) -> list[dict]:
    """
    Synchronous flight search wrapper.

    Called via asyncio.to_thread to avoid blocking the event loop. The
    query is retried once after a short delay if the first attempt raises.

    Args:
        origin: Origin airport IATA code
        destination: Destination airport IATA code
        date: Departure date in YYYY-MM-DD format
        seat_class: Cabin class; unknown values fall back to "economy"
        adults: Number of passengers

    Returns:
        List of flight dicts (see search_direct_flights) for direct flights.

    Raises:
        Exception: Whatever the second get_flights attempt raised, chained
            to the error from the first attempt.
    """
    seat_string = _SEAT_CLASS_MAP.get(seat_class.lower(), "economy")

    flight = FlightData(
        date=date,
        from_airport=origin,
        to_airport=destination,
        max_stops=0,  # Direct flights only
    )
    passengers = Passengers(adults=adults)

    def _query():
        return get_flights(
            flight_data=[flight],  # Must be a list
            trip='one-way',
            passengers=passengers,
            seat=seat_string,
            fetch_mode='common',
        )

    try:
        result = _query()
    except Exception as first_error:
        # Retry once after a delay
        time.sleep(_RETRY_DELAY_SECONDS)
        try:
            result = _query()
        except Exception as retry_error:
            raise retry_error from first_error

    # A missing result, or a missing/None `flights` attribute, is "no flights".
    options = getattr(result, 'flights', None) if result else None
    if not options:
        return []

    return [
        _flight_to_dict(option, origin, destination)
        for option in options
        if _is_direct(option, origin, destination)
    ]


def _is_direct(flight_option, origin: str, destination: str) -> bool:
    """
    Decide whether a search result is a non-stop flight.

    Evidence is consulted from strongest to weakest, and the first usable
    piece decides:

    1. An integer ``stops`` attribute: direct only when it is 0.
    2. A sized ``flight`` attribute (assumed to be the list of segments):
       direct only when it has exactly one entry.
    3. ``departure_airport`` / ``arrival_airport`` equal to the queried
       endpoints. This does not prove there is no layover; it is only a
       last resort for results with no stop information, acceptable
       because the query itself asks for ``max_stops=0``.
    """
    stops = getattr(flight_option, 'stops', None)
    if isinstance(stops, int):
        return stops == 0

    segments = getattr(flight_option, 'flight', None)
    if segments is not None:
        try:
            return len(segments) == 1
        except TypeError:
            pass  # Not a sized container; fall through to the endpoint check

    if hasattr(flight_option, 'departure_airport') and hasattr(flight_option, 'arrival_airport'):
        return bool(
            flight_option.departure_airport == origin
            and flight_option.arrival_airport == destination
        )
    return False


def _first_attr(obj, names, default):
    """Return the first attribute in *names* that is set to a non-empty value, else *default*."""
    for name in names:
        value = getattr(obj, name, None)
        if value is not None and value != "":
            return value
    return default


def _flight_to_dict(flight_option, origin: str, destination: str) -> dict:
    """
    Convert one search result into this module's flight dict.

    The airline and time fields are looked up under two names each: the
    names this module originally used (``airline``, ``departure_time``,
    ``arrival_time``) and ``name`` / ``departure`` / ``arrival``.
    NOTE(review): the second set is what fast-flights' Flight objects are
    believed to expose - confirm against the installed library version.
    """
    return {
        "origin": origin,
        "destination": destination,
        "airline": _first_attr(flight_option, ('airline', 'name'), 'Unknown'),
        "departure_time": _first_attr(flight_option, ('departure_time', 'departure'), ''),
        "arrival_time": _first_attr(flight_option, ('arrival_time', 'arrival'), ''),
        "duration_minutes": _parse_duration(getattr(flight_option, 'duration', '')),
        "price": getattr(flight_option, 'price', 0),
        "currency": getattr(flight_option, 'currency', 'USD'),
        "stops": 0,
    }


def _parse_duration(duration_str: str) -> int:
    """
    Parse duration string to minutes.

    Handles compact and spelled-out units, e.g. "9h 30m", "9h", "90m",
    "1 hr 30 min", "2 hours 5 minutes".

    Args:
        duration_str: Duration string

    Returns:
        Total duration in minutes; 0 for empty, non-string or unparseable
        input.
    """
    if not duration_str or not isinstance(duration_str, str):
        return 0

    total_minutes = 0

    hours_match = _HOURS_RE.search(duration_str)
    if hours_match:
        total_minutes += int(hours_match.group(1)) * 60

    minutes_match = _MINUTES_RE.search(duration_str)
    if minutes_match:
        total_minutes += int(minutes_match.group(1))

    return total_minutes


async def search_multiple_routes(
    routes: list[tuple[str, str, str]],
    seat_class: str = "economy",
    adults: int = 1,
    max_workers: int = 5,
) -> dict[tuple[str, str], list[dict]]:
    """
    Search multiple routes concurrently.

    Args:
        routes: List of (origin, destination, date) tuples
        seat_class: Cabin class
        adults: Number of passengers
        max_workers: Maximum concurrent requests (must be at least 1)

    Returns:
        Dict mapping (origin, date) tuples to lists of flight dicts.
        The destination is not part of the key, so if several routes share
        the same origin and date, only the result of the last such route
        in ``routes`` is kept.

    Raises:
        ValueError: If max_workers is less than 1 (a zero-sized semaphore
            would make every search wait forever).
    """
    if max_workers < 1:
        raise ValueError("max_workers must be at least 1")

    # Create semaphore to limit concurrency
    semaphore = asyncio.Semaphore(max_workers)

    async def search_with_semaphore(origin: str, destination: str, date: str):
        async with semaphore:
            flights = await search_direct_flights(
                origin, destination, date, seat_class, adults
            )
            return (origin, date), flights

    # Execute all searches concurrently (but limited by semaphore)
    results = await asyncio.gather(
        *(
            search_with_semaphore(origin, destination, date)
            for origin, destination, date in routes
        )
    )

    return dict(results)