"""
Scan Processor - Background worker for flight scans

This module processes pending flight scans by:
1. Querying flights using searcher_v3.py (with SOCS cookie integration)
2. Updating scan status and progress in real-time
3. Saving discovered routes to the database

Runs as async background tasks within the FastAPI application.
"""

import asyncio
import logging
from datetime import datetime, date, timedelta
from typing import Dict, List, Optional, Set, Tuple
import json

from database import get_connection
from airports import get_airports_for_country
from searcher_v3 import search_multiple_routes

logger = logging.getLogger(__name__)

# Strong references to in-flight scan tasks. The event loop only keeps weak
# references to tasks, so a task nobody else holds could be garbage-collected
# before it finishes.
_background_tasks: Set["asyncio.Task"] = set()

# Callback statuses that mean one route query has finished.
_FINISHED_STATUSES = ('cache_hit', 'api_success', 'error')


def _mark_scan_failed(conn, scan_id: int, message: str) -> None:
    """Set the scan's status to 'failed' with *message* and commit."""
    conn.cursor().execute("""
        UPDATE scans
        SET status = 'failed',
            error_message = ?,
            updated_at = CURRENT_TIMESTAMP
        WHERE id = ?
    """, (message, scan_id))
    conn.commit()


def _resolve_destinations(scan_id: int, country_or_airports: str) -> Tuple[List[str], List[dict]]:
    """
    Turn the scan's destination field into a list of IATA codes.

    A two-letter alphabetic value is treated as a country code and resolved
    through get_airports_for_country(); anything else is parsed as a
    comma-separated list of airport codes.

    Returns:
        (destination_codes, destinations) where ``destinations`` holds the
        airport detail dicts in country mode and is empty otherwise.

    Raises:
        ValueError: if the country has no airports, or the list contains no
            non-blank airport code.
    """
    if len(country_or_airports) == 2 and country_or_airports.isalpha():
        # Country mode: resolve airports from country code
        logger.info(f"[Scan {scan_id}] Mode: Country search ({country_or_airports})")
        destinations = get_airports_for_country(country_or_airports)
        if not destinations:
            raise ValueError(f"No airports found for country: {country_or_airports}")
        destination_codes = [d['iata'] for d in destinations]
        logger.info(f"[Scan {scan_id}] Found {len(destination_codes)} destination airports: {destination_codes}")
        return destination_codes, destinations

    # Specific airports mode: parse comma-separated list. Blank entries
    # (trailing comma, "A,,B", empty field) are dropped so they are never
    # queried as if they were airport codes.
    stripped = (code.strip() for code in country_or_airports.split(','))
    destination_codes = [code for code in stripped if code]
    if not destination_codes:
        raise ValueError(f"No destination airports specified: {country_or_airports!r}")
    logger.info(f"[Scan {scan_id}] Mode: Specific airports ({len(destination_codes)} destinations: {destination_codes})")
    # No pre-fetched airport details; callers fall back to IATA code as name
    return destination_codes, []


def _dates_in_window(start_date_str: str, end_date_str: str) -> List[str]:
    """
    Return every day from start to end (inclusive) as 'YYYY-MM-DD' strings.

    An end date before the start date yields an empty list.

    Raises:
        ValueError: if either string is not in 'YYYY-MM-DD' format.
    """
    first_day = datetime.strptime(start_date_str, '%Y-%m-%d').date()
    last_day = datetime.strptime(end_date_str, '%Y-%m-%d').date()
    day_count = (last_day - first_day).days + 1
    return [
        (first_day + timedelta(days=offset)).strftime('%Y-%m-%d')
        for offset in range(day_count)
    ]


def _save_routes(cursor, scan_id: int, routes_by_destination: Dict[str, List], destinations: List[dict]) -> int:
    """
    Insert one route summary per destination plus its individual flights.

    Destinations with no flights, or with no priced flights, are skipped.
    Nothing is committed here; the caller owns the transaction.

    Args:
        cursor: Cursor on the scan's database connection.
        scan_id: The scan the rows belong to.
        routes_by_destination: {destination: [(flight_dict, date), ...]}
        destinations: Airport detail dicts (may be empty).

    Returns:
        Number of route summaries inserted.
    """
    # First entry wins for a repeated IATA code.
    airport_by_iata: Dict[str, dict] = {}
    for airport in destinations:
        airport_by_iata.setdefault(airport['iata'], airport)

    routes_saved = 0
    for destination, flight_date_pairs in routes_by_destination.items():
        if not flight_date_pairs:
            continue  # Skip destinations with no flights

        flights = [flight for flight, _ in flight_date_pairs]

        # Get destination details (fall back to IATA code if not known)
        dest_info = airport_by_iata.get(destination)
        dest_name = dest_info.get('name', destination) if dest_info else destination
        dest_city = dest_info.get('city', '') if dest_info else ''

        # Calculate statistics over flights that carry a (truthy) price
        prices = [f.get('price') for f in flights if f.get('price')]
        if not prices:
            logger.info(f"[Scan {scan_id}] Skipping {destination} - no prices available")
            continue

        # De-duplicate while keeping first-seen order so the stored JSON is
        # deterministic for a given result set.
        airlines = list(dict.fromkeys(
            f.get('airline') for f in flights if f.get('airline')
        ))

        # Insert route summary
        cursor.execute("""
            INSERT INTO routes (
                scan_id, destination, destination_name, destination_city,
                min_price, max_price, avg_price, flight_count, airlines
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            scan_id,
            destination,
            dest_name,
            dest_city,
            min(prices),
            max(prices),
            sum(prices) / len(prices),
            len(flights),
            json.dumps(airlines),
        ))

        # Insert individual flights (priced ones only)
        for flight, flight_date in flight_date_pairs:
            if not flight.get('price'):
                continue
            cursor.execute("""
                INSERT INTO flights (
                    scan_id, destination, date, airline,
                    departure_time, arrival_time, price, stops
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                scan_id,
                destination,
                flight_date,
                flight.get('airline'),
                flight.get('departure_time'),
                flight.get('arrival_time'),
                flight.get('price'),
                flight.get('stops', 0),
            ))

        routes_saved += 1

    return routes_saved


async def process_scan(scan_id: int):
    """
    Process a pending scan by querying flights and saving routes.

    Args:
        scan_id: The ID of the scan to process

    This function:
    1. Updates scan status to 'running'
    2. Resolves destination airports from country (or an explicit list)
    3. Queries flights for each destination and date
    4. Saves routes to database
    5. Updates progress counters in real-time
    6. Sets final status to 'completed' or 'failed'

    Exceptions are not propagated: they are logged and recorded on the scan
    row as status 'failed'. If the scan ID does not exist, the function logs
    an error and returns without touching the database.
    """
    conn = None
    try:
        logger.info(f"[Scan {scan_id}] Starting scan processing")

        # Get scan details
        conn = get_connection()
        cursor = conn.cursor()

        cursor.execute("""
            SELECT origin, country, start_date, end_date, seat_class, adults
            FROM scans
            WHERE id = ?
        """, (scan_id,))

        row = cursor.fetchone()
        if not row:
            logger.error(f"[Scan {scan_id}] Scan not found in database")
            return

        origin, country_or_airports, start_date_str, end_date_str, seat_class, adults = row

        logger.info(f"[Scan {scan_id}] Scan details: {origin} -> {country_or_airports}, {start_date_str} to {end_date_str}")

        # Update status to 'running'
        cursor.execute("""
            UPDATE scans
            SET status = 'running', updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
        """, (scan_id,))
        conn.commit()

        # Determine mode: country (2 letters) or specific airports (comma-separated)
        try:
            destination_codes, destinations = _resolve_destinations(scan_id, country_or_airports)
        except Exception as e:
            logger.error(f"[Scan {scan_id}] Failed to resolve airports: {str(e)}")
            _mark_scan_failed(conn, scan_id, f"Failed to resolve airports: {str(e)}")
            return

        # Note: total_routes is set below, once the actual number of route
        # queries is known.

        # Generate dates to scan — every day in the window
        dates = _dates_in_window(start_date_str, end_date_str)
        logger.info(f"[Scan {scan_id}] Will scan {len(dates)} dates: {dates}")

        # Build routes list: [(origin, destination, date), ...]
        routes_to_scan = [
            (origin, dest, scan_date)
            for dest in destination_codes
            for scan_date in dates
        ]
        total_queries = len(routes_to_scan)

        logger.info(f"[Scan {scan_id}] Total route queries: {total_queries}")

        # Update total_routes with actual number of queries
        cursor.execute("""
            UPDATE scans
            SET total_routes = ?,
                updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
        """, (total_queries, scan_id))
        conn.commit()

        # Progress callback to update database
        # Signature: callback(origin, destination, date, status, count, error=None)
        routes_scanned_count = 0

        def progress_callback(origin: str, destination: str, date: str,
                              status: str, count: int, error: Optional[str] = None):
            nonlocal routes_scanned_count

            # Only finished route queries (cache hit, API call, or error)
            # advance the progress counter.
            if status not in _FINISHED_STATUSES:
                return
            routes_scanned_count += 1

            # Progress is written through a separate short-lived connection
            # rather than the scan's main one.
            # NOTE(review): presumably because the searcher may invoke this
            # callback outside this coroutine's context — confirm.
            progress_conn = None
            try:
                progress_conn = get_connection()
                progress_conn.cursor().execute("""
                    UPDATE scans
                    SET routes_scanned = routes_scanned + 1,
                        updated_at = CURRENT_TIMESTAMP
                    WHERE id = ?
                """, (scan_id,))
                progress_conn.commit()

                if routes_scanned_count % 10 == 0:  # Log every 10 routes
                    logger.info(f"[Scan {scan_id}] Progress: {routes_scanned_count}/{total_queries} routes ({status}: {origin}→{destination})")
            except Exception as e:
                # Best effort: a failed progress update must not abort the scan.
                logger.error(f"[Scan {scan_id}] Failed to update progress: {str(e)}")
            finally:
                # Always release the connection, including when the update fails.
                if progress_conn is not None:
                    try:
                        progress_conn.close()
                    except Exception as close_error:
                        logger.error(f"[Scan {scan_id}] Failed to update progress: {str(close_error)}")

        # Query flights using searcher_v3
        logger.info(f"[Scan {scan_id}] Starting flight queries...")

        results = await search_multiple_routes(
            routes=routes_to_scan,
            seat_class=seat_class or 'economy',
            adults=adults or 1,
            use_cache=True,
            cache_threshold_hours=24,
            max_workers=3,  # Limit concurrency to avoid rate limiting
            progress_callback=progress_callback
        )

        logger.info(f"[Scan {scan_id}] Flight queries complete. Processing results...")

        # Group results by destination, preserving date per flight
        # Structure: {dest: [(flight_dict, date), ...]}
        routes_by_destination: Dict[str, List] = {}
        total_flights = 0

        for (_orig, dest, scan_date), flights in results.items():
            bucket = routes_by_destination.setdefault(dest, [])
            bucket.extend((flight, scan_date) for flight in flights)
            total_flights += len(flights)

        logger.info(f"[Scan {scan_id}] Found {total_flights} total flights across {len(routes_by_destination)} destinations")

        # Save routes and individual flights to database in one transaction
        routes_saved = _save_routes(cursor, scan_id, routes_by_destination, destinations)
        conn.commit()

        # Update scan to completed
        cursor.execute("""
            UPDATE scans
            SET status = 'completed',
                total_flights = ?,
                updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
        """, (total_flights, scan_id))
        conn.commit()

        logger.info(f"[Scan {scan_id}] ✅ Scan completed successfully! {routes_saved} routes saved with {total_flights} flights")

    except Exception as e:
        logger.error(f"[Scan {scan_id}] ❌ Scan failed with error: {str(e)}", exc_info=True)

        # Update scan to failed
        try:
            if conn:
                # Discard any uncommitted route/flight inserts first so that
                # committing the 'failed' status does not also persist a
                # half-written result set.
                conn.rollback()
                _mark_scan_failed(conn, scan_id, str(e))
        except Exception as update_error:
            logger.error(f"[Scan {scan_id}] Failed to update error status: {str(update_error)}")

    finally:
        if conn:
            conn.close()


def start_scan_processor(scan_id: int):
    """
    Start processing a scan as a background task.

    Must be called while an event loop is running, since it uses
    asyncio.create_task().

    Args:
        scan_id: The ID of the scan to process

    Returns:
        asyncio.Task: The background task
    """
    task = asyncio.create_task(process_scan(scan_id))
    # Keep a strong reference until the task finishes; the event loop itself
    # only holds a weak one.
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    logger.info(f"[Scan {scan_id}] Background task created")
    return task