Some checks failed
Deploy / deploy (push) Failing after 18s
Users running large scans can now pause (keep partial results, resume
later), cancel (stop permanently, partial results preserved), or resume
a paused scan which races through cache hits before continuing.
Backend:
- Extend scans.status CHECK to include 'paused' and 'cancelled'
- Add _migrate_add_pause_cancel_status() table-recreation migration
- scan_processor: _running_tasks/_cancel_reasons registries,
cancel_scan_task/pause_scan_task/stop_scan_task helpers,
CancelledError handler in process_scan(), start_resume_processor()
- api_server: POST /scans/{id}/pause|cancel|resume endpoints with
rate limits (30/min pause+cancel, 10/min resume); list_scans now
accepts paused/cancelled as status filter values
Frontend:
- Scan.status type extended with 'paused' | 'cancelled'
- scanApi.pause/cancel/resume added
- StatusChip: amber PauseCircle chip for paused, grey Ban for cancelled
- ScanDetails: context-aware action row with inline-confirm for
Pause and Cancel; Resume button for paused scans
Tests: 129 total (58 new) across test_scan_control.py,
test_scan_processor_control.py, and additions to existing suites
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
417 lines
16 KiB
Python
"""
|
|
Scan Processor - Background worker for flight scans
|
|
|
|
This module processes pending flight scans by:
|
|
1. Querying flights using searcher_v3.py (with SOCS cookie integration)
|
|
2. Updating scan status and progress in real-time
|
|
3. Saving discovered routes to the database
|
|
|
|
Runs as async background tasks within the FastAPI application.
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
from datetime import datetime, date, timedelta
|
|
import json
|
|
|
|
from database import get_connection
|
|
from airports import get_airports_for_country, lookup_airport
|
|
from searcher_v3 import search_multiple_routes
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
# Task registry — tracks running asyncio tasks so they can be cancelled.
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
|
|
|
_running_tasks: dict[int, asyncio.Task] = {}
|
|
_cancel_reasons: dict[int, str] = {}
|
|
|
|
|
|
def cancel_scan_task(scan_id: int) -> bool:
|
|
"""Cancel the background task for a scan. Returns True if a task was found and cancelled."""
|
|
task = _running_tasks.get(scan_id)
|
|
if task and not task.done():
|
|
task.cancel()
|
|
return True
|
|
return False
|
|
|
|
|
|
def pause_scan_task(scan_id: int) -> bool:
|
|
"""Signal the running task to stop with status='paused'. Returns True if task was found."""
|
|
_cancel_reasons[scan_id] = 'paused'
|
|
return cancel_scan_task(scan_id)
|
|
|
|
|
|
def stop_scan_task(scan_id: int) -> bool:
|
|
"""Signal the running task to stop with status='cancelled'. Returns True if task was found."""
|
|
_cancel_reasons[scan_id] = 'cancelled'
|
|
return cancel_scan_task(scan_id)
|
|
|
|
|
|
def _write_route_incremental(scan_id: int, destination: str,
|
|
dest_name: str, dest_city: str,
|
|
new_flights: list):
|
|
"""
|
|
Write or update a route row and its individual flight rows immediately.
|
|
|
|
Called from progress_callback each time a (scan_id, destination, date)
|
|
query returns results. Merges into the existing route row if one already
|
|
exists, using a running weighted average for avg_price.
|
|
|
|
Opens its own DB connection — safe to call from the event loop thread.
|
|
"""
|
|
prices = [f.get('price') for f in new_flights if f.get('price')]
|
|
if not prices:
|
|
return
|
|
|
|
new_airlines = list({f.get('airline') for f in new_flights if f.get('airline')})
|
|
new_count = len(prices)
|
|
new_min = min(prices)
|
|
new_max = max(prices)
|
|
new_avg = sum(prices) / new_count
|
|
|
|
try:
|
|
conn = get_connection()
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("""
|
|
SELECT id, flight_count, min_price, max_price, avg_price, airlines
|
|
FROM routes
|
|
WHERE scan_id = ? AND destination = ?
|
|
""", (scan_id, destination))
|
|
existing = cursor.fetchone()
|
|
|
|
if existing is None:
|
|
cursor.execute("""
|
|
INSERT INTO routes (
|
|
scan_id, destination, destination_name, destination_city,
|
|
flight_count, airlines, min_price, max_price, avg_price
|
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
""", (
|
|
scan_id, destination, dest_name, dest_city,
|
|
new_count, json.dumps(new_airlines),
|
|
new_min, new_max, new_avg,
|
|
))
|
|
else:
|
|
old_count = existing['flight_count'] or 0
|
|
old_min = existing['min_price']
|
|
old_max = existing['max_price']
|
|
old_avg = existing['avg_price'] or 0.0
|
|
old_airlines = json.loads(existing['airlines']) if existing['airlines'] else []
|
|
|
|
merged_count = old_count + new_count
|
|
merged_min = min(old_min, new_min) if old_min is not None else new_min
|
|
merged_max = max(old_max, new_max) if old_max is not None else new_max
|
|
merged_avg = (old_avg * old_count + new_avg * new_count) / merged_count
|
|
merged_airlines = json.dumps(list(set(old_airlines) | set(new_airlines)))
|
|
|
|
cursor.execute("""
|
|
UPDATE routes
|
|
SET flight_count = ?,
|
|
min_price = ?,
|
|
max_price = ?,
|
|
avg_price = ?,
|
|
airlines = ?
|
|
WHERE scan_id = ? AND destination = ?
|
|
""", (
|
|
merged_count, merged_min, merged_max, merged_avg, merged_airlines,
|
|
scan_id, destination,
|
|
))
|
|
|
|
for flight in new_flights:
|
|
if not flight.get('price'):
|
|
continue
|
|
cursor.execute("""
|
|
INSERT INTO flights (
|
|
scan_id, destination, date, airline,
|
|
departure_time, arrival_time, price, stops
|
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
|
""", (
|
|
scan_id,
|
|
destination,
|
|
flight.get('date', ''),
|
|
flight.get('airline'),
|
|
flight.get('departure_time'),
|
|
flight.get('arrival_time'),
|
|
flight.get('price'),
|
|
flight.get('stops', 0),
|
|
))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
except Exception as e:
|
|
logger.error(f"[Scan {scan_id}] Failed to write incremental route {destination}: {e}")
|
|
|
|
|
|
async def process_scan(scan_id: int) -> None:
    """
    Process a pending scan by querying flights and saving routes.

    Args:
        scan_id: The ID of the scan to process

    This function:
    1. Updates scan status to 'running'
    2. Resolves destination airports from country
    3. Queries flights for each destination
    4. Saves routes to database
    5. Updates progress counters in real-time
    6. Sets final status to 'completed' or 'failed'

    Cancellation: pause_scan_task()/stop_scan_task() record a reason in
    _cancel_reasons and cancel this task; the CancelledError handler below
    persists that reason ('paused' or 'cancelled') as the scan's final
    status, then re-raises so asyncio marks the task cancelled.
    """
    # Held open for the whole run; closed unconditionally in `finally`.
    conn = None
    try:
        logger.info(f"[Scan {scan_id}] Starting scan processing")

        # Get scan details
        conn = get_connection()
        cursor = conn.cursor()

        cursor.execute("""
            SELECT origin, country, start_date, end_date, seat_class, adults
            FROM scans
            WHERE id = ?
        """, (scan_id,))

        row = cursor.fetchone()
        if not row:
            logger.error(f"[Scan {scan_id}] Scan not found in database")
            return

        # The `country` column holds either a 2-letter country code or a
        # comma-separated IATA list — disambiguated by the mode check below.
        # NOTE(review): tuple-unpacking the row assumes the connection's row
        # type supports positional access — confirm against get_connection().
        origin, country_or_airports, start_date_str, end_date_str, seat_class, adults = row

        logger.info(f"[Scan {scan_id}] Scan details: {origin} -> {country_or_airports}, {start_date_str} to {end_date_str}")

        # Update status to 'running' and record when processing started
        cursor.execute("""
            UPDATE scans
            SET status = 'running', started_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
        """, (scan_id,))
        conn.commit()

        # Determine mode: country (2 letters) or specific airports (comma-separated)
        try:
            if len(country_or_airports) == 2 and country_or_airports.isalpha():
                # Country mode: resolve airports from country code
                logger.info(f"[Scan {scan_id}] Mode: Country search ({country_or_airports})")
                destinations = get_airports_for_country(country_or_airports)
                if not destinations:
                    raise ValueError(f"No airports found for country: {country_or_airports}")

                destination_codes = [d['iata'] for d in destinations]

                logger.info(f"[Scan {scan_id}] Found {len(destination_codes)} destination airports: {destination_codes}")

            else:
                # Specific airports mode: parse comma-separated list
                destination_codes = [code.strip() for code in country_or_airports.split(',')]
                # Fall back to a minimal stub dict for codes the lookup
                # doesn't know, so progress_callback can still resolve names.
                destinations = [
                    lookup_airport(code) or {'iata': code, 'name': code, 'city': ''}
                    for code in destination_codes
                ]
                logger.info(f"[Scan {scan_id}] Mode: Specific airports ({len(destination_codes)} destinations: {destination_codes})")

        except Exception as e:
            # Airport resolution failed — mark the scan failed and stop early.
            logger.error(f"[Scan {scan_id}] Failed to resolve airports: {str(e)}")
            cursor.execute("""
                UPDATE scans
                SET status = 'failed',
                    error_message = ?,
                    completed_at = CURRENT_TIMESTAMP,
                    updated_at = CURRENT_TIMESTAMP
                WHERE id = ?
            """, (f"Failed to resolve airports: {str(e)}", scan_id))
            conn.commit()
            return

        # Note: Don't update total_routes yet - we'll set it after we know the actual number of route queries

        # Generate dates to scan — every day in the window
        start_date = datetime.strptime(start_date_str, '%Y-%m-%d').date()
        end_date = datetime.strptime(end_date_str, '%Y-%m-%d').date()

        dates = []
        current = start_date
        while current <= end_date:
            dates.append(current.strftime('%Y-%m-%d'))
            current += timedelta(days=1)

        logger.info(f"[Scan {scan_id}] Will scan {len(dates)} dates: {dates}")

        # Build routes list: [(origin, destination, date), ...]
        # One query per (destination, date) pair — the full cartesian product.
        routes_to_scan = []
        for dest in destination_codes:
            for scan_date in dates:
                routes_to_scan.append((origin, dest, scan_date))

        logger.info(f"[Scan {scan_id}] Total route queries: {len(routes_to_scan)}")

        # Update total_routes with actual number of queries
        cursor.execute("""
            UPDATE scans
            SET total_routes = ?,
                updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
        """, (len(routes_to_scan), scan_id))
        conn.commit()

        # Progress callback — updates DB progress counter and writes routes live
        # Signature: callback(origin, destination, date, status, count, error=None, flights=None)
        routes_scanned_count = 0

        def progress_callback(origin: str, destination: str, date: str,
                              status: str, count: int, error: str = None,
                              flights: list = None) -> None:
            # Runs once per route query; invoked by search_multiple_routes.
            nonlocal routes_scanned_count

            # 'cache_hit'/'api_success'/'error' are terminal per-route states;
            # only those advance the local progress counter.
            if status in ('cache_hit', 'api_success', 'error'):
                routes_scanned_count += 1

            # Write route + flights to DB immediately if results available
            if flights and status in ('cache_hit', 'api_success'):
                for f in flights:
                    f['date'] = date
                dest_info = next((d for d in destinations if d['iata'] == destination), None)
                dest_name = dest_info.get('name', destination) if dest_info else destination
                dest_city = dest_info.get('city', '') if dest_info else ''
                _write_route_incremental(scan_id, destination, dest_name, dest_city, flights)

            # Update progress counter
            try:
                # Separate short-lived connection per callback invocation.
                progress_conn = get_connection()
                progress_cursor = progress_conn.cursor()

                progress_cursor.execute("""
                    UPDATE scans
                    SET routes_scanned = routes_scanned + 1,
                        updated_at = CURRENT_TIMESTAMP
                    WHERE id = ?
                """, (scan_id,))

                progress_conn.commit()
                progress_conn.close()

                # Log every 10th route to keep the log readable on big scans.
                if routes_scanned_count % 10 == 0:
                    logger.info(f"[Scan {scan_id}] Progress: {routes_scanned_count}/{len(routes_to_scan)} routes ({status}: {origin}→{destination})")

            except Exception as e:
                # Progress updates are best-effort; never fail the scan here.
                logger.error(f"[Scan {scan_id}] Failed to update progress: {str(e)}")

        # Query flights using searcher_v3
        logger.info(f"[Scan {scan_id}] Starting flight queries...")

        # `results` is unused here — all persistence happens incrementally in
        # progress_callback; the await is what drives the whole search.
        results = await search_multiple_routes(
            routes=routes_to_scan,
            seat_class=seat_class or 'economy',
            adults=adults or 1,
            use_cache=True,
            cache_threshold_hours=24,
            max_workers=3,  # Limit concurrency to avoid rate limiting
            progress_callback=progress_callback
        )

        logger.info(f"[Scan {scan_id}] Flight queries complete.")

        # Routes and flights were written incrementally by progress_callback.
        routes_saved = cursor.execute(
            "SELECT COUNT(*) FROM routes WHERE scan_id = ?", (scan_id,)
        ).fetchone()[0]
        total_flights_saved = cursor.execute(
            "SELECT COALESCE(SUM(flight_count), 0) FROM routes WHERE scan_id = ?", (scan_id,)
        ).fetchone()[0]

        # Update scan to completed and record finish time
        cursor.execute("""
            UPDATE scans
            SET status = 'completed',
                total_flights = ?,
                completed_at = CURRENT_TIMESTAMP,
                updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
        """, (total_flights_saved, scan_id))
        conn.commit()

        logger.info(f"[Scan {scan_id}] ✅ Scan completed successfully! {routes_saved} routes saved with {total_flights_saved} flights")

    except asyncio.CancelledError:
        # Pause/cancel both arrive as task cancellation; the intended final
        # status was stashed in _cancel_reasons by the pause/stop helpers.
        # Default to 'cancelled' for an unattributed cancellation.
        reason = _cancel_reasons.pop(scan_id, 'cancelled')
        logger.info(f"[Scan {scan_id}] Scan {reason} by user request")
        try:
            if conn:
                cursor = conn.cursor()
                cursor.execute("""
                    UPDATE scans
                    SET status = ?,
                        completed_at = CURRENT_TIMESTAMP,
                        updated_at = CURRENT_TIMESTAMP
                    WHERE id = ?
                """, (reason, scan_id))
                conn.commit()
        except Exception as update_error:
            logger.error(f"[Scan {scan_id}] Failed to update {reason} status: {str(update_error)}")
        raise  # must re-raise so asyncio marks the task as cancelled

    except Exception as e:
        logger.error(f"[Scan {scan_id}] ❌ Scan failed with error: {str(e)}", exc_info=True)

        # Update scan to failed
        try:
            if conn:
                cursor = conn.cursor()
                cursor.execute("""
                    UPDATE scans
                    SET status = 'failed',
                        error_message = ?,
                        completed_at = CURRENT_TIMESTAMP,
                        updated_at = CURRENT_TIMESTAMP
                    WHERE id = ?
                """, (str(e), scan_id))
                conn.commit()
        except Exception as update_error:
            logger.error(f"[Scan {scan_id}] Failed to update error status: {str(update_error)}")

    finally:
        if conn:
            conn.close()
|
|
|
|
|
|
def start_scan_processor(scan_id: int):
    """
    Start processing a scan as a background task.

    Registers the task in _running_tasks so pause/cancel helpers can reach
    it, and arranges for the registry entry to be dropped when the task ends.

    Args:
        scan_id: The ID of the scan to process

    Returns:
        asyncio.Task: The background task
    """
    def _forget(_finished_task) -> None:
        # Drop the registry entry regardless of how the task finished.
        _running_tasks.pop(scan_id, None)

    task = asyncio.create_task(process_scan(scan_id))
    _running_tasks[scan_id] = task
    task.add_done_callback(_forget)
    logger.info(f"[Scan {scan_id}] Background task created")
    return task
|
|
|
|
|
|
def start_resume_processor(scan_id: int):
    """
    Resume processing a paused scan as a background task.

    The API endpoint has already reset status to 'pending' and cleared counters.
    process_scan() will transition the status to 'running' and re-run all routes,
    getting instant cache hits for already-queried routes.

    Args:
        scan_id: The ID of the paused scan to resume

    Returns:
        asyncio.Task: The background task
    """
    def _forget(_finished_task) -> None:
        # Drop the registry entry regardless of how the task finished.
        _running_tasks.pop(scan_id, None)

    task = asyncio.create_task(process_scan(scan_id))
    _running_tasks[scan_id] = task
    task.add_done_callback(_forget)
    logger.info(f"[Scan {scan_id}] Resume task created")
    return task
|