Files
ciaovolo/flight-comparator/scan_processor.py
domverse 77d2a46264
All checks were successful
Deploy / deploy (push) Successful in 30s
feat: implement reverse scan (country → specific airports)
- DB schema: relaxed origin CHECK to >=2 chars, added scan_mode column to
  scans and scheduled_scans, added origin_airport to routes and flights,
  updated unique index to (scan_id, COALESCE(origin_airport,''), destination)
- Migrations: init_db.py recreates tables and adds columns via guarded ALTERs
- API: scan_mode field on ScanRequest/Scan; Route/Flight expose origin_airport;
  GET /scans/{id}/flights accepts origin_airport filter; CreateScheduleRequest
  and Schedule carry scan_mode; scheduler and run-now pass scan_mode through
- scan_processor: _write_route_incremental accepts origin_airport; process_scan
  branches on scan_mode=reverse (country → airports × destinations × dates)
- Frontend: new CountrySelect component (populated from GET /api/v1/countries);
  Scans page adds Direction toggle + CountrySelect for both modes; ScanDetails
  shows Origin column for reverse scans and uses composite route keys; Re-run
  preserves scan_mode

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-01 17:58:55 +01:00

465 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Scan Processor - Background worker for flight scans
This module processes pending flight scans by:
1. Querying flights using searcher_v3.py (with SOCS cookie integration)
2. Updating scan status and progress in real-time
3. Saving discovered routes to the database
Runs as async background tasks within the FastAPI application.
"""
import asyncio
import logging
from datetime import datetime, date, timedelta
import json
from database import get_connection
from airports import get_airports_for_country, lookup_airport
from searcher_v3 import search_multiple_routes
logger = logging.getLogger(__name__)

# ─────────────────────────────────────────────────────────────────────────────
# Task registry — tracks running asyncio tasks so they can be cancelled.
# ─────────────────────────────────────────────────────────────────────────────

# scan_id -> asyncio.Task currently running process_scan() for that scan.
_running_tasks: dict[int, asyncio.Task] = {}

# scan_id -> final status ('paused' or 'cancelled') that process_scan() should
# record when it observes the cancellation; process_scan() pops the entry.
_cancel_reasons: dict[int, str] = {}


def cancel_scan_task(scan_id: int) -> bool:
    """Cancel the background task for a scan.

    Returns:
        True if a registered, not-yet-finished task was found and cancel() was
        requested on it; False otherwise.
    """
    task = _running_tasks.get(scan_id)
    if task is None or task.done():
        return False
    task.cancel()
    return True


def _request_stop(scan_id: int, reason: str) -> bool:
    """Record *reason* for the scan's task, then cancel it.

    The reason must be stored before cancelling so process_scan() can read it
    when the CancelledError is delivered. If there was no live task to cancel,
    the reason is discarded again so a stale entry cannot be picked up by a
    later, unrelated cancellation of the same scan_id.

    Returns:
        True if a live task was found and cancelled, False otherwise.
    """
    _cancel_reasons[scan_id] = reason
    if cancel_scan_task(scan_id):
        return True
    _cancel_reasons.pop(scan_id, None)
    return False


def pause_scan_task(scan_id: int) -> bool:
    """Signal the running task to stop with status='paused'. Returns True if task was found."""
    return _request_stop(scan_id, 'paused')


def stop_scan_task(scan_id: int) -> bool:
    """Signal the running task to stop with status='cancelled'. Returns True if task was found."""
    return _request_stop(scan_id, 'cancelled')
def _write_route_incremental(scan_id: int, destination: str,
                             dest_name: str, dest_city: str,
                             new_flights: list, origin_airport: str = None):
    """
    Write or update a route row and its individual flight rows immediately.

    Called from progress_callback each time a (scan_id, destination, date)
    query returns results. Merges into the existing route row if one already
    exists, using a running weighted average for avg_price.

    For reverse scans, origin_airport is the variable origin IATA code.
    For forward scans, origin_airport is None.

    Flights without a truthy 'price' are neither counted nor stored. The route
    change and the flight inserts are committed together; on any error the
    failure is logged, nothing is committed, and the exception is not
    propagated to the caller.

    Opens (and always closes) its own DB connection — safe to call from the
    event loop thread.
    """
    prices = [f.get('price') for f in new_flights if f.get('price')]
    if not prices:
        return

    new_airlines = list({f.get('airline') for f in new_flights if f.get('airline')})
    new_count = len(prices)
    new_min = min(prices)
    new_max = max(prices)
    new_avg = sum(prices) / new_count

    # Route identity is (scan_id, origin_airport, destination). Forward scans
    # store origin_airport as NULL, and "= NULL" never matches in SQL, so that
    # case needs an IS NULL predicate. Both fragments are constant strings,
    # so interpolating them into the query below is not an injection risk.
    if origin_airport is None:
        key_sql = "scan_id = ? AND origin_airport IS NULL AND destination = ?"
        key_params = (scan_id, destination)
    else:
        key_sql = "scan_id = ? AND origin_airport = ? AND destination = ?"
        key_params = (scan_id, origin_airport, destination)

    conn = None
    try:
        conn = get_connection()
        cursor = conn.cursor()

        cursor.execute(f"""
            SELECT id, flight_count, min_price, max_price, avg_price, airlines
            FROM routes
            WHERE {key_sql}
        """, key_params)
        existing = cursor.fetchone()

        if existing is None:
            cursor.execute("""
                INSERT INTO routes (
                    scan_id, origin_airport, destination, destination_name, destination_city,
                    flight_count, airlines, min_price, max_price, avg_price
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                scan_id, origin_airport, destination, dest_name, dest_city,
                new_count, json.dumps(new_airlines),
                new_min, new_max, new_avg,
            ))
        else:
            old_count = existing['flight_count'] or 0
            old_min = existing['min_price']
            old_max = existing['max_price']
            old_avg = existing['avg_price'] or 0.0
            old_airlines = json.loads(existing['airlines']) if existing['airlines'] else []

            merged_count = old_count + new_count  # >= new_count > 0, safe divisor
            merged_min = min(old_min, new_min) if old_min is not None else new_min
            merged_max = max(old_max, new_max) if old_max is not None else new_max
            merged_avg = (old_avg * old_count + new_avg * new_count) / merged_count
            merged_airlines = json.dumps(list(set(old_airlines) | set(new_airlines)))

            # Update the exact row fetched above by its primary key.
            cursor.execute("""
                UPDATE routes
                SET flight_count = ?,
                    min_price = ?,
                    max_price = ?,
                    avg_price = ?,
                    airlines = ?
                WHERE id = ?
            """, (
                merged_count, merged_min, merged_max, merged_avg, merged_airlines,
                existing['id'],
            ))

        cursor.executemany("""
            INSERT INTO flights (
                scan_id, origin_airport, destination, date, airline,
                departure_time, arrival_time, price, stops
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, [
            (
                scan_id,
                origin_airport,
                destination,
                flight.get('date', ''),
                flight.get('airline'),
                flight.get('departure_time'),
                flight.get('arrival_time'),
                flight.get('price'),
                flight.get('stops', 0),
            )
            for flight in new_flights
            if flight.get('price')
        ])

        conn.commit()
    except Exception as e:
        logger.error(f"[Scan {scan_id}] Failed to write incremental route {destination}: {e}")
    finally:
        # Always release the connection; closing without commit() discards
        # any partial writes from a failed attempt.
        if conn is not None:
            conn.close()
def _parse_airport_list(raw: str) -> list:
    """Split a comma-separated IATA list into codes, dropping blanks and repeats.

    Order of first appearance is preserved. Blank entries (e.g. from a trailing
    comma) and duplicates would otherwise become wasted or double-counted queries.
    """
    codes = (code.strip() for code in raw.split(','))
    return list(dict.fromkeys(code for code in codes if code))


def _dest_infos_for_codes(codes: list) -> dict:
    """Map each IATA code to lookup_airport()'s info, or a bare placeholder if unknown."""
    return {
        code: lookup_airport(code) or {'iata': code, 'name': code, 'city': ''}
        for code in codes
    }


def _resolve_airports(scan_id: int, scan_mode: str, origin: str, country_or_airports: str):
    """Resolve the origin and destination airports a scan should query.

    Reverse scans: *origin* is an ISO country code whose airports become the
    origins; *country_or_airports* is a comma-separated list of destination IATAs.
    Forward scans: *origin* is a single IATA code; *country_or_airports* is
    either a 2-letter country code (all its airports become destinations) or a
    comma-separated list of destination IATAs.

    Returns:
        (origin_iatas, destination_codes, dest_infos) — dest_infos maps each
        destination code to its airport info dict.

    Raises:
        ValueError: if a country resolves to no airports, or an explicit
            destination list contains no codes.
    """
    if scan_mode == 'reverse':
        logger.info(f"[Scan {scan_id}] Mode: Reverse scan ({origin} country → {country_or_airports})")
        origin_airports = get_airports_for_country(origin)
        if not origin_airports:
            raise ValueError(f"No airports found for origin country: {origin}")
        origin_iatas = [a['iata'] for a in origin_airports]
        destination_codes = _parse_airport_list(country_or_airports)
        if not destination_codes:
            raise ValueError("No destination airports specified")
        dest_infos = _dest_infos_for_codes(destination_codes)
        logger.info(f"[Scan {scan_id}] {len(origin_iatas)} origins × {len(destination_codes)} destinations")
        return origin_iatas, destination_codes, dest_infos

    # Forward scan: a single fixed origin.
    origin_iatas = [origin]
    if len(country_or_airports) == 2 and country_or_airports.isalpha():
        logger.info(f"[Scan {scan_id}] Mode: Forward country search ({country_or_airports})")
        dest_list = get_airports_for_country(country_or_airports)
        if not dest_list:
            raise ValueError(f"No airports found for country: {country_or_airports}")
        destination_codes = [d['iata'] for d in dest_list]
        dest_infos = {d['iata']: d for d in dest_list}
    else:
        destination_codes = _parse_airport_list(country_or_airports)
        if not destination_codes:
            raise ValueError("No destination airports specified")
        dest_infos = _dest_infos_for_codes(destination_codes)
        logger.info(f"[Scan {scan_id}] Mode: Forward specific airports ({destination_codes})")
    return origin_iatas, destination_codes, dest_infos


def _scan_dates(start_date_str: str, end_date_str: str) -> list:
    """Return every day from start to end (inclusive) as 'YYYY-MM-DD' strings.

    Returns an empty list when end precedes start.
    """
    start = datetime.strptime(start_date_str, '%Y-%m-%d').date()
    end = datetime.strptime(end_date_str, '%Y-%m-%d').date()
    return [
        (start + timedelta(days=offset)).strftime('%Y-%m-%d')
        for offset in range((end - start).days + 1)
    ]


def _mark_scan_failed(conn, scan_id: int, message: str) -> None:
    """Set the scan's status to 'failed' with *message*, stamp completion, and commit."""
    conn.cursor().execute("""
        UPDATE scans
        SET status = 'failed',
            error_message = ?,
            completed_at = CURRENT_TIMESTAMP,
            updated_at = CURRENT_TIMESTAMP
        WHERE id = ?
    """, (message, scan_id))
    conn.commit()


async def process_scan(scan_id: int):
    """
    Process a pending scan by querying flights and saving routes.

    Args:
        scan_id: The ID of the scan to process

    This function:
    1. Updates scan status to 'running'
    2. Resolves origin/destination airports according to scan_mode
       ('forward': fixed origin → country or airport list;
        'reverse': every airport of the origin country → airport list)
    3. Queries every (origin, destination, date) combination in the window
    4. Writes routes and flights incrementally as results arrive
    5. Updates progress counters in real-time
    6. Sets final status to 'completed' or 'failed' — or to 'paused' /
       'cancelled' when the task is cancelled via pause_scan_task /
       stop_scan_task
    """
    conn = None
    try:
        logger.info(f"[Scan {scan_id}] Starting scan processing")

        # Get scan details
        conn = get_connection()
        cursor = conn.cursor()
        cursor.execute("""
            SELECT origin, country, scan_mode, start_date, end_date, seat_class, adults
            FROM scans
            WHERE id = ?
        """, (scan_id,))
        row = cursor.fetchone()
        if not row:
            logger.error(f"[Scan {scan_id}] Scan not found in database")
            return

        origin, country_or_airports, scan_mode, start_date_str, end_date_str, seat_class, adults = row
        scan_mode = scan_mode or 'forward'
        logger.info(f"[Scan {scan_id}] Scan details: mode={scan_mode}, {origin} -> {country_or_airports}, {start_date_str} to {end_date_str}")

        # Update status to 'running' and record when processing started
        cursor.execute("""
            UPDATE scans
            SET status = 'running', started_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
        """, (scan_id,))
        conn.commit()

        # Resolve airports; a resolution problem fails the scan with a
        # dedicated message instead of the generic handler below.
        try:
            origin_iatas, destination_codes, dest_infos = _resolve_airports(
                scan_id, scan_mode, origin, country_or_airports
            )
        except Exception as e:
            logger.error(f"[Scan {scan_id}] Failed to resolve airports: {str(e)}")
            _mark_scan_failed(conn, scan_id, f"Failed to resolve airports: {str(e)}")
            return

        # Every day in the window
        dates = _scan_dates(start_date_str, end_date_str)
        logger.info(f"[Scan {scan_id}] Will scan {len(dates)} dates: {dates}")

        # (origin_iata, destination_iata, date) triples. Forward scans have a
        # single origin, so this yields destination-major, date-minor order.
        routes_to_scan = [
            (orig_iata, dest_code, scan_date)
            for orig_iata in origin_iatas
            for dest_code in destination_codes
            for scan_date in dates
        ]
        total_queries = len(routes_to_scan)
        logger.info(f"[Scan {scan_id}] Total route queries: {total_queries}")

        # Update total_routes with actual number of queries
        cursor.execute("""
            UPDATE scans
            SET total_routes = ?,
                updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
        """, (total_queries, scan_id))
        conn.commit()

        # Progress callback — updates DB progress counter and writes routes live
        # Signature: callback(origin, destination, date, status, count, error=None, flights=None)
        routes_scanned_count = 0

        def progress_callback(cb_origin: str, destination: str, date: str,
                              status: str, count: int, error: str = None,
                              flights: list = None):
            nonlocal routes_scanned_count
            # Only terminal per-query statuses count as a scanned route.
            if status not in ('cache_hit', 'api_success', 'error'):
                return
            routes_scanned_count += 1

            # Write route + flights to DB immediately if results available
            if flights and status in ('cache_hit', 'api_success'):
                # Tag copies with the query date rather than mutating the
                # searcher's own flight dicts.
                dated_flights = [{**f, 'date': date} for f in flights]
                dest_info = dest_infos.get(destination) or {'iata': destination, 'name': destination, 'city': ''}
                dest_name = dest_info.get('name', destination)
                dest_city = dest_info.get('city', '')
                # For reverse scans, cb_origin is the variable origin airport IATA
                route_origin = cb_origin if scan_mode == 'reverse' else None
                _write_route_incremental(
                    scan_id, destination, dest_name, dest_city, dated_flights,
                    origin_airport=route_origin
                )

            # Update progress counter on a short-lived connection that is
            # always closed, even if the UPDATE fails.
            progress_conn = None
            try:
                progress_conn = get_connection()
                progress_conn.cursor().execute("""
                    UPDATE scans
                    SET routes_scanned = routes_scanned + 1,
                        updated_at = CURRENT_TIMESTAMP
                    WHERE id = ?
                """, (scan_id,))
                progress_conn.commit()
                if routes_scanned_count % 10 == 0:
                    logger.info(f"[Scan {scan_id}] Progress: {routes_scanned_count}/{total_queries} routes ({status}: {cb_origin}→{destination})")
            except Exception as e:
                logger.error(f"[Scan {scan_id}] Failed to update progress: {str(e)}")
            finally:
                if progress_conn is not None:
                    progress_conn.close()

        # Query flights using searcher_v3. Results are persisted by
        # progress_callback, so the return value is not needed here.
        logger.info(f"[Scan {scan_id}] Starting flight queries...")
        await search_multiple_routes(
            routes=routes_to_scan,
            seat_class=seat_class or 'economy',
            adults=adults or 1,
            use_cache=True,
            cache_threshold_hours=24,
            max_workers=3,  # Limit concurrency to avoid rate limiting
            progress_callback=progress_callback
        )
        logger.info(f"[Scan {scan_id}] Flight queries complete.")

        # Routes and flights were written incrementally by progress_callback.
        routes_saved = cursor.execute(
            "SELECT COUNT(*) FROM routes WHERE scan_id = ?", (scan_id,)
        ).fetchone()[0]
        total_flights_saved = cursor.execute(
            "SELECT COALESCE(SUM(flight_count), 0) FROM routes WHERE scan_id = ?", (scan_id,)
        ).fetchone()[0]

        # Update scan to completed and record finish time
        cursor.execute("""
            UPDATE scans
            SET status = 'completed',
                total_flights = ?,
                completed_at = CURRENT_TIMESTAMP,
                updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
        """, (total_flights_saved, scan_id))
        conn.commit()

        logger.info(f"[Scan {scan_id}] ✅ Scan completed successfully! {routes_saved} routes saved with {total_flights_saved} flights")

    except asyncio.CancelledError:
        # pause_scan_task / stop_scan_task record the desired final status.
        reason = _cancel_reasons.pop(scan_id, 'cancelled')
        logger.info(f"[Scan {scan_id}] Scan {reason} by user request")
        try:
            if conn:
                cursor = conn.cursor()
                cursor.execute("""
                    UPDATE scans
                    SET status = ?,
                        completed_at = CURRENT_TIMESTAMP,
                        updated_at = CURRENT_TIMESTAMP
                    WHERE id = ?
                """, (reason, scan_id))
                conn.commit()
        except Exception as update_error:
            logger.error(f"[Scan {scan_id}] Failed to update {reason} status: {str(update_error)}")
        raise  # must re-raise so asyncio marks the task as cancelled

    except Exception as e:
        logger.error(f"[Scan {scan_id}] ❌ Scan failed with error: {str(e)}", exc_info=True)
        # Update scan to failed
        try:
            if conn:
                _mark_scan_failed(conn, scan_id, str(e))
        except Exception as update_error:
            logger.error(f"[Scan {scan_id}] Failed to update error status: {str(update_error)}")

    finally:
        if conn:
            conn.close()
def _spawn_scan_task(scan_id: int) -> asyncio.Task:
    """Create the process_scan() task for *scan_id* and register it in _running_tasks."""
    task = asyncio.create_task(process_scan(scan_id))
    _running_tasks[scan_id] = task

    def _unregister(finished: asyncio.Task) -> None:
        # Only drop the registry entry if it still points at *this* task: a
        # paused scan can be resumed before its old task has fully finished,
        # and the old task's callback must not evict the new task.
        if _running_tasks.get(scan_id) is finished:
            del _running_tasks[scan_id]
            # Discard any stop reason process_scan() never consumed (e.g. the
            # task was cancelled before its body started running).
            _cancel_reasons.pop(scan_id, None)

    task.add_done_callback(_unregister)
    return task


def start_scan_processor(scan_id: int):
    """
    Start processing a scan as a background task.

    Args:
        scan_id: The ID of the scan to process

    Returns:
        asyncio.Task: The background task
    """
    task = _spawn_scan_task(scan_id)
    logger.info(f"[Scan {scan_id}] Background task created")
    return task


def start_resume_processor(scan_id: int):
    """
    Resume processing a paused scan as a background task.

    The API endpoint has already reset status to 'pending' and cleared counters.
    process_scan() will transition the status to 'running' and re-run all routes,
    getting instant cache hits for already-queried routes.

    Args:
        scan_id: The ID of the paused scan to resume

    Returns:
        asyncio.Task: The background task
    """
    task = _spawn_scan_task(scan_id)
    logger.info(f"[Scan {scan_id}] Resume task created")
    return task