""" Scan Processor - Background worker for flight scans This module processes pending flight scans by: 1. Querying flights using searcher_v3.py (with SOCS cookie integration) 2. Updating scan status and progress in real-time 3. Saving discovered routes to the database Runs as async background tasks within the FastAPI application. """ import asyncio import logging from datetime import datetime, date, timedelta import json from database import get_connection from airports import get_airports_for_country, lookup_airport from searcher_v3 import search_multiple_routes logger = logging.getLogger(__name__) # ───────────────────────────────────────────────────────────────────────────── # Task registry — tracks running asyncio tasks so they can be cancelled. # ───────────────────────────────────────────────────────────────────────────── _running_tasks: dict[int, asyncio.Task] = {} _cancel_reasons: dict[int, str] = {} def cancel_scan_task(scan_id: int) -> bool: """Cancel the background task for a scan. Returns True if a task was found and cancelled.""" task = _running_tasks.get(scan_id) if task and not task.done(): task.cancel() return True return False def pause_scan_task(scan_id: int) -> bool: """Signal the running task to stop with status='paused'. Returns True if task was found.""" _cancel_reasons[scan_id] = 'paused' return cancel_scan_task(scan_id) def stop_scan_task(scan_id: int) -> bool: """Signal the running task to stop with status='cancelled'. Returns True if task was found.""" _cancel_reasons[scan_id] = 'cancelled' return cancel_scan_task(scan_id) def _write_route_incremental(scan_id: int, destination: str, dest_name: str, dest_city: str, new_flights: list, origin_airport: str = None): """ Write or update a route row and its individual flight rows immediately. Called from progress_callback each time a (scan_id, destination, date) query returns results. Merges into the existing route row if one already exists, using a running weighted average for avg_price. For reverse scans, origin_airport is the variable origin IATA code. For forward scans, origin_airport is None. Opens its own DB connection — safe to call from the event loop thread. """ prices = [f.get('price') for f in new_flights if f.get('price')] if not prices: return new_airlines = list({f.get('airline') for f in new_flights if f.get('airline')}) new_count = len(prices) new_min = min(prices) new_max = max(prices) new_avg = sum(prices) / new_count try: conn = get_connection() cursor = conn.cursor() # Fetch existing route row (key: scan_id + origin_airport + destination) if origin_airport is None: cursor.execute(""" SELECT id, flight_count, min_price, max_price, avg_price, airlines FROM routes WHERE scan_id = ? AND origin_airport IS NULL AND destination = ? """, (scan_id, destination)) else: cursor.execute(""" SELECT id, flight_count, min_price, max_price, avg_price, airlines FROM routes WHERE scan_id = ? AND origin_airport = ? AND destination = ? """, (scan_id, origin_airport, destination)) existing = cursor.fetchone() if existing is None: cursor.execute(""" INSERT INTO routes ( scan_id, origin_airport, destination, destination_name, destination_city, flight_count, airlines, min_price, max_price, avg_price ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( scan_id, origin_airport, destination, dest_name, dest_city, new_count, json.dumps(new_airlines), new_min, new_max, new_avg, )) else: old_count = existing['flight_count'] or 0 old_min = existing['min_price'] old_max = existing['max_price'] old_avg = existing['avg_price'] or 0.0 old_airlines = json.loads(existing['airlines']) if existing['airlines'] else [] merged_count = old_count + new_count merged_min = min(old_min, new_min) if old_min is not None else new_min merged_max = max(old_max, new_max) if old_max is not None else new_max merged_avg = (old_avg * old_count + new_avg * new_count) / merged_count merged_airlines = json.dumps(list(set(old_airlines) | set(new_airlines))) if origin_airport is None: cursor.execute(""" UPDATE routes SET flight_count = ?, min_price = ?, max_price = ?, avg_price = ?, airlines = ? WHERE scan_id = ? AND origin_airport IS NULL AND destination = ? """, ( merged_count, merged_min, merged_max, merged_avg, merged_airlines, scan_id, destination, )) else: cursor.execute(""" UPDATE routes SET flight_count = ?, min_price = ?, max_price = ?, avg_price = ?, airlines = ? WHERE scan_id = ? AND origin_airport = ? AND destination = ? """, ( merged_count, merged_min, merged_max, merged_avg, merged_airlines, scan_id, origin_airport, destination, )) for flight in new_flights: if not flight.get('price'): continue cursor.execute(""" INSERT INTO flights ( scan_id, origin_airport, destination, date, airline, departure_time, arrival_time, price, stops ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( scan_id, origin_airport, destination, flight.get('date', ''), flight.get('airline'), flight.get('departure_time'), flight.get('arrival_time'), flight.get('price'), flight.get('stops', 0), )) conn.commit() conn.close() except Exception as e: logger.error(f"[Scan {scan_id}] Failed to write incremental route {destination}: {e}") async def process_scan(scan_id: int): """ Process a pending scan by querying flights and saving routes. Args: scan_id: The ID of the scan to process This function: 1. Updates scan status to 'running' 2. Resolves destination airports from country 3. Queries flights for each destination 4. Saves routes to database 5. Updates progress counters in real-time 6. Sets final status to 'completed' or 'failed' """ conn = None try: logger.info(f"[Scan {scan_id}] Starting scan processing") # Get scan details conn = get_connection() cursor = conn.cursor() cursor.execute(""" SELECT origin, country, scan_mode, start_date, end_date, seat_class, adults FROM scans WHERE id = ? """, (scan_id,)) row = cursor.fetchone() if not row: logger.error(f"[Scan {scan_id}] Scan not found in database") return origin, country_or_airports, scan_mode, start_date_str, end_date_str, seat_class, adults = row scan_mode = scan_mode or 'forward' logger.info(f"[Scan {scan_id}] Scan details: mode={scan_mode}, {origin} -> {country_or_airports}, {start_date_str} to {end_date_str}") # Update status to 'running' and record when processing started cursor.execute(""" UPDATE scans SET status = 'running', started_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP WHERE id = ? """, (scan_id,)) conn.commit() # Resolve airports based on scan_mode try: if scan_mode == 'reverse': # Reverse scan: origin = ISO country, country_or_airports = comma-separated dest IATAs logger.info(f"[Scan {scan_id}] Mode: Reverse scan ({origin} country → {country_or_airports})") origin_airports = get_airports_for_country(origin) if not origin_airports: raise ValueError(f"No airports found for origin country: {origin}") origin_iatas = [a['iata'] for a in origin_airports] destination_codes = [code.strip() for code in country_or_airports.split(',')] dest_infos = { code: lookup_airport(code) or {'iata': code, 'name': code, 'city': ''} for code in destination_codes } logger.info(f"[Scan {scan_id}] {len(origin_iatas)} origins × {len(destination_codes)} destinations") else: # Forward scan: origin = fixed IATA, country_or_airports = country code or dest IATAs if len(country_or_airports) == 2 and country_or_airports.isalpha(): logger.info(f"[Scan {scan_id}] Mode: Forward country search ({country_or_airports})") dest_list = get_airports_for_country(country_or_airports) if not dest_list: raise ValueError(f"No airports found for country: {country_or_airports}") destination_codes = [d['iata'] for d in dest_list] dest_infos = {d['iata']: d for d in dest_list} else: destination_codes = [code.strip() for code in country_or_airports.split(',')] dest_infos = { code: lookup_airport(code) or {'iata': code, 'name': code, 'city': ''} for code in destination_codes } logger.info(f"[Scan {scan_id}] Mode: Forward specific airports ({destination_codes})") except Exception as e: logger.error(f"[Scan {scan_id}] Failed to resolve airports: {str(e)}") cursor.execute(""" UPDATE scans SET status = 'failed', error_message = ?, completed_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP WHERE id = ? """, (f"Failed to resolve airports: {str(e)}", scan_id)) conn.commit() return # Generate dates to scan — every day in the window start_date = datetime.strptime(start_date_str, '%Y-%m-%d').date() end_date = datetime.strptime(end_date_str, '%Y-%m-%d').date() dates = [] current = start_date while current <= end_date: dates.append(current.strftime('%Y-%m-%d')) current += timedelta(days=1) logger.info(f"[Scan {scan_id}] Will scan {len(dates)} dates: {dates}") # Build routes list: [(origin_iata, destination_iata, date), ...] routes_to_scan = [] if scan_mode == 'reverse': for orig_iata in origin_iatas: for dest_code in destination_codes: for scan_date in dates: routes_to_scan.append((orig_iata, dest_code, scan_date)) else: for dest_code in destination_codes: for scan_date in dates: routes_to_scan.append((origin, dest_code, scan_date)) logger.info(f"[Scan {scan_id}] Total route queries: {len(routes_to_scan)}") # Update total_routes with actual number of queries cursor.execute(""" UPDATE scans SET total_routes = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ? """, (len(routes_to_scan), scan_id)) conn.commit() # Progress callback — updates DB progress counter and writes routes live # Signature: callback(origin, destination, date, status, count, error=None, flights=None) routes_scanned_count = 0 def progress_callback(cb_origin: str, destination: str, date: str, status: str, count: int, error: str = None, flights: list = None): nonlocal routes_scanned_count if status in ('cache_hit', 'api_success', 'error'): routes_scanned_count += 1 # Write route + flights to DB immediately if results available if flights and status in ('cache_hit', 'api_success'): for f in flights: f['date'] = date dest_info = dest_infos.get(destination) or {'iata': destination, 'name': destination, 'city': ''} dest_name = dest_info.get('name', destination) dest_city = dest_info.get('city', '') # For reverse scans, cb_origin is the variable origin airport IATA route_origin = cb_origin if scan_mode == 'reverse' else None _write_route_incremental( scan_id, destination, dest_name, dest_city, flights, origin_airport=route_origin ) # Update progress counter try: progress_conn = get_connection() progress_cursor = progress_conn.cursor() progress_cursor.execute(""" UPDATE scans SET routes_scanned = routes_scanned + 1, updated_at = CURRENT_TIMESTAMP WHERE id = ? """, (scan_id,)) progress_conn.commit() progress_conn.close() if routes_scanned_count % 10 == 0: logger.info(f"[Scan {scan_id}] Progress: {routes_scanned_count}/{len(routes_to_scan)} routes ({status}: {cb_origin}→{destination})") except Exception as e: logger.error(f"[Scan {scan_id}] Failed to update progress: {str(e)}") # Query flights using searcher_v3 logger.info(f"[Scan {scan_id}] Starting flight queries...") results = await search_multiple_routes( routes=routes_to_scan, seat_class=seat_class or 'economy', adults=adults or 1, use_cache=True, cache_threshold_hours=24, max_workers=3, # Limit concurrency to avoid rate limiting progress_callback=progress_callback ) logger.info(f"[Scan {scan_id}] Flight queries complete.") # Routes and flights were written incrementally by progress_callback. routes_saved = cursor.execute( "SELECT COUNT(*) FROM routes WHERE scan_id = ?", (scan_id,) ).fetchone()[0] total_flights_saved = cursor.execute( "SELECT COALESCE(SUM(flight_count), 0) FROM routes WHERE scan_id = ?", (scan_id,) ).fetchone()[0] # Update scan to completed and record finish time cursor.execute(""" UPDATE scans SET status = 'completed', total_flights = ?, completed_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP WHERE id = ? """, (total_flights_saved, scan_id)) conn.commit() logger.info(f"[Scan {scan_id}] ✅ Scan completed successfully! {routes_saved} routes saved with {total_flights_saved} flights") except asyncio.CancelledError: reason = _cancel_reasons.pop(scan_id, 'cancelled') logger.info(f"[Scan {scan_id}] Scan {reason} by user request") try: if conn: cursor = conn.cursor() cursor.execute(""" UPDATE scans SET status = ?, completed_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP WHERE id = ? """, (reason, scan_id)) conn.commit() except Exception as update_error: logger.error(f"[Scan {scan_id}] Failed to update {reason} status: {str(update_error)}") raise # must re-raise so asyncio marks the task as cancelled except Exception as e: logger.error(f"[Scan {scan_id}] ❌ Scan failed with error: {str(e)}", exc_info=True) # Update scan to failed try: if conn: cursor = conn.cursor() cursor.execute(""" UPDATE scans SET status = 'failed', error_message = ?, completed_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP WHERE id = ? """, (str(e), scan_id)) conn.commit() except Exception as update_error: logger.error(f"[Scan {scan_id}] Failed to update error status: {str(update_error)}") finally: if conn: conn.close() def start_scan_processor(scan_id: int): """ Start processing a scan as a background task. Args: scan_id: The ID of the scan to process Returns: asyncio.Task: The background task """ task = asyncio.create_task(process_scan(scan_id)) _running_tasks[scan_id] = task task.add_done_callback(lambda _: _running_tasks.pop(scan_id, None)) logger.info(f"[Scan {scan_id}] Background task created") return task def start_resume_processor(scan_id: int): """ Resume processing a paused scan as a background task. The API endpoint has already reset status to 'pending' and cleared counters. process_scan() will transition the status to 'running' and re-run all routes, getting instant cache hits for already-queried routes. Args: scan_id: The ID of the paused scan to resume Returns: asyncio.Task: The background task """ task = asyncio.create_task(process_scan(scan_id)) _running_tasks[scan_id] = task task.add_done_callback(lambda _: _running_tasks.pop(scan_id, None)) logger.info(f"[Scan {scan_id}] Resume task created") return task