PRD: Live Routes During Active Scan
Status: Draft
Date: 2026-02-27
Scope: Backend only — no frontend or API contract changes required
1. Problem
Routes and flights are only visible in the UI after a scan fully completes. For large scans (e.g., 30 destinations × 14 days = 420 queries), users stare at a progress bar with zero actionable data for potentially many minutes.
The root cause is a two-phase waterfall in scan_processor.py:
- Phase 1 (line 170): `await search_multiple_routes(...)`; `asyncio.gather()` waits for all queries to finish before returning anything.
- Phase 2 (line 197): bulk `INSERT INTO routes` and `INSERT INTO flights` runs only after Phase 1 completes.
2. Goal
Write each destination's route row and its individual flight rows to the database as soon as that destination's results arrive, rather than after all queries finish.
The frontend already polls /scans/:id/routes on an interval while status is running. No frontend changes are needed — routes will simply appear progressively.
3. Desired Behavior
| Moment | Before fix | After fix |
|---|---|---|
| Query 1 completes (BER → MUC) | Nothing visible | MUC route + flights appear in UI |
| 50% through scan | 0 routes in UI | ~50% of routes visible |
| Scan completes | All routes appear at once | No change (already visible) |
4. Architecture Analysis
4.1 Event Loop Threading
progress_callback is called from within search_direct_flights (searcher_v3.py):
- Cache hit (line 121): called directly in the async coroutine → event loop thread
- API success (line 143): called after `await asyncio.to_thread()` returns → event loop thread
- Error (line 159): same → event loop thread
All callback invocations happen on the single asyncio event loop thread. No locking is needed. The existing progress_callback in scan_processor.py already opens and closes a fresh get_connection() per call — we reuse the same pattern.
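This serialization can be demonstrated with a self-contained sketch (all names below are illustrative stand-ins, not the real project code): the blocking work runs in a worker thread via `asyncio.to_thread`, but each coroutine resumes on the event loop thread, so the callback always fires there and never concurrently.

```python
import asyncio
import threading

# Hypothetical stand-in for one flight query: the blocking part runs in a
# worker thread, but the callback fires afterwards on the event loop thread.
def blocking_search(dest: str) -> list:
    return [{"dest": dest, "price": 100}]

callback_threads = []

def progress_callback(dest: str, flights: list) -> None:
    # Runs synchronously inside the coroutine -> event loop thread.
    callback_threads.append(threading.get_ident())

async def query(dest: str) -> None:
    flights = await asyncio.to_thread(blocking_search, dest)
    progress_callback(dest, flights)

async def main() -> int:
    await asyncio.gather(*(query(d) for d in ["MUC", "CDG", "LHR"]))
    return threading.get_ident()  # the event loop thread's id

loop_thread = asyncio.run(main())
# Every callback ran on the event loop thread, so no locking is needed.
assert all(t == loop_thread for t in callback_threads)
```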
4.2 Why Multiple Callbacks Fire Per Destination
For a scan covering 14 dates, a single destination (e.g., MUC) gets 14 separate queries:
`(BER, MUC, 2026-03-01)`, `(BER, MUC, 2026-03-02)`, ..., `(BER, MUC, 2026-03-14)`
Each query completes independently and fires progress_callback. The incremental write logic must merge successive results for the same destination into a single route row.
4.3 Required Schema Change
routes has no UNIQUE constraint on (scan_id, destination). We need one to support upsert semantics (detect "route already started" vs "first result for this destination").
Current schema (no uniqueness):
CREATE INDEX IF NOT EXISTS idx_routes_scan_id ON routes(scan_id);
Required addition:
CREATE UNIQUE INDEX IF NOT EXISTS uq_routes_scan_dest ON routes(scan_id, destination);
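For illustration, here is a minimal sketch of what the unique index makes possible: a native `INSERT ... ON CONFLICT DO UPDATE` upsert (SQLite 3.24+) is only legal when a unique index covers the conflict target. The table here is a simplified stand-in, not the real schema, and the helper in section 5.3 deliberately uses read-then-write instead so the airlines JSON array can be merged in Python; this sketch covers scalar columns only.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Simplified stand-in for the routes table (not the real schema)
conn.execute("""
    CREATE TABLE routes (
        id INTEGER PRIMARY KEY,
        scan_id INTEGER,
        destination TEXT,
        flight_count INTEGER,
        min_price REAL
    )
""")
# Without this unique index, ON CONFLICT(scan_id, destination) is an error.
conn.execute("""
    CREATE UNIQUE INDEX uq_routes_scan_dest
        ON routes(scan_id, destination)
""")

upsert = """
    INSERT INTO routes (scan_id, destination, flight_count, min_price)
    VALUES (?, ?, ?, ?)
    ON CONFLICT(scan_id, destination) DO UPDATE SET
        flight_count = flight_count + excluded.flight_count,
        min_price = MIN(min_price, excluded.min_price)
"""
conn.execute(upsert, (1, "MUC", 5, 120.0))  # first result: plain INSERT
conn.execute(upsert, (1, "MUC", 3, 95.0))   # second result: merged UPDATE
row = conn.execute(
    "SELECT flight_count, min_price FROM routes"
    " WHERE scan_id = 1 AND destination = 'MUC'"
).fetchone()
assert row == (8, 95.0)
```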
5. Changes Required
5.1 database/schema.sql
Add a unique index at the end of the routes indexes block:
-- Unique constraint: one route row per (scan, destination)
CREATE UNIQUE INDEX IF NOT EXISTS uq_routes_scan_dest
ON routes(scan_id, destination);
5.2 database/init_db.py
Add a migration function to create the index on existing databases where it doesn't yet exist. Called before executescript(schema_sql).
def _migrate_add_routes_unique_index(conn, verbose=True):
    """
    Migration: Add UNIQUE index on routes(scan_id, destination).

    Required for incremental route writes during active scans.
    The index enables upsert (INSERT + UPDATE on conflict) semantics.

    Safe to run on existing data: if any (scan_id, destination) duplicates
    exist, we collapse them first, keeping the first-inserted row (MIN(id)).
    """
    cursor = conn.execute("""
        SELECT name FROM sqlite_master
        WHERE type='index' AND name='uq_routes_scan_dest'
    """)
    if cursor.fetchone():
        return  # Already migrated

    if verbose:
        print("  🔄 Migrating routes table: adding unique index on (scan_id, destination)...")

    # Collapse any existing duplicates (completed scans may have none,
    # but guard against edge cases).
    conn.execute("""
        DELETE FROM routes
        WHERE id NOT IN (
            SELECT MIN(id)
            FROM routes
            GROUP BY scan_id, destination
        )
    """)

    # Create the unique index
    conn.execute("""
        CREATE UNIQUE INDEX IF NOT EXISTS uq_routes_scan_dest
        ON routes(scan_id, destination)
    """)
    conn.commit()

    if verbose:
        print("  ✅ Migration complete: uq_routes_scan_dest index created")
Wire it into initialize_database() before executescript:
# Apply migrations before running schema
_migrate_relax_country_constraint(conn, verbose)
_migrate_add_routes_unique_index(conn, verbose) # ← ADD THIS
# Load and execute schema
schema_sql = load_schema()
5.3 scan_processor.py — Extend progress_callback
Current signature (line 138):
def progress_callback(origin: str, destination: str, date: str,
status: str, count: int, error: str = None):
New signature:
def progress_callback(origin: str, destination: str, date: str,
status: str, count: int, error: str = None,
flights: list = None):
New helper function (add before process_scan):
def _write_route_incremental(scan_id: int, destination: str,
                             dest_name: str, dest_city: str,
                             new_flights: list):
    """
    Write or update a route row and its flight rows incrementally.

    Called from progress_callback each time a query for (scan_id, destination)
    returns results. Merges into the existing route row if one already exists.

    Uses a read-then-write pattern so airlines JSON arrays can be merged in
    Python rather than in SQLite (SQLite has no native JSON array merge).

    Safe to call from the event loop thread — opens its own connection.
    """
    if not new_flights:
        return

    prices = [f.get('price') for f in new_flights if f.get('price')]
    if not prices:
        return  # No priced flights — nothing to aggregate

    new_airlines = list({f.get('airline') for f in new_flights if f.get('airline')})
    # Count only priced flights: unpriced ones are skipped in the flights
    # INSERT below, and the weighted average must use the same count.
    new_count = len(prices)
    new_min = min(prices)
    new_max = max(prices)
    new_avg = sum(prices) / len(prices)

    try:
        conn = get_connection()
        cursor = conn.cursor()

        # Check if a route row already exists for this destination
        cursor.execute("""
            SELECT id, flight_count, min_price, max_price, avg_price, airlines
            FROM routes
            WHERE scan_id = ? AND destination = ?
        """, (scan_id, destination))
        existing = cursor.fetchone()

        if existing is None:
            # First result for this destination — INSERT
            cursor.execute("""
                INSERT INTO routes (
                    scan_id, destination, destination_name, destination_city,
                    flight_count, airlines, min_price, max_price, avg_price
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                scan_id, destination, dest_name, dest_city,
                new_count, json.dumps(new_airlines),
                new_min, new_max, new_avg,
            ))
        else:
            # Subsequent result — merge stats
            old_count = existing['flight_count'] or 0
            old_min = existing['min_price']
            old_max = existing['max_price']
            old_avg = existing['avg_price'] or 0.0
            old_airlines = json.loads(existing['airlines']) if existing['airlines'] else []

            merged_count = old_count + new_count
            merged_min = min(old_min, new_min) if old_min is not None else new_min
            merged_max = max(old_max, new_max) if old_max is not None else new_max
            # Running weighted average
            merged_avg = (old_avg * old_count + new_avg * new_count) / merged_count
            merged_airlines = json.dumps(list(set(old_airlines) | set(new_airlines)))

            cursor.execute("""
                UPDATE routes
                SET flight_count = ?,
                    min_price = ?,
                    max_price = ?,
                    avg_price = ?,
                    airlines = ?
                WHERE scan_id = ? AND destination = ?
            """, (
                merged_count, merged_min, merged_max, merged_avg, merged_airlines,
                scan_id, destination,
            ))

        # INSERT individual flight rows (always new rows; duplicates not expected
        # because each (scan_id, destination, date) query fires exactly once)
        for flight in new_flights:
            if not flight.get('price'):
                continue
            cursor.execute("""
                INSERT INTO flights (
                    scan_id, destination, date, airline,
                    departure_time, arrival_time, price, stops
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                scan_id,
                destination,
                flight.get('date', ''),  # date passed from caller
                flight.get('airline'),
                flight.get('departure_time'),
                flight.get('arrival_time'),
                flight.get('price'),
                flight.get('stops', 0),
            ))

        conn.commit()
        conn.close()
    except Exception as e:
        logger.error(f"[Scan {scan_id}] Failed to write incremental route {destination}: {e}")
Note on flight date: Individual flights must carry their date when passed to _write_route_incremental. The callback receives date as a parameter, so attach it to each flight dict before passing: f['date'] = date.
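As a quick sanity check, the running weighted-average formula in the helper gives exactly the same result as averaging all prices in one pass. A pure-arithmetic sketch (no database involved):

```python
# Running weighted-average merge, as used in _write_route_incremental.
def merge_avg(old_avg: float, old_count: int,
              new_avg: float, new_count: int) -> float:
    return (old_avg * old_count + new_avg * new_count) / (old_count + new_count)

# First query: 2 flights at 100 and 200  -> avg 150
# Second query: 3 flights at 50, 60, 70  -> avg 60
merged = merge_avg(150.0, 2, 60.0, 3)

# Averaging all five prices directly: (100+200+50+60+70) / 5 = 96
assert merged == 96.0
```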
Updated progress_callback inside process_scan:
def progress_callback(origin: str, destination: str, date: str,
                      status: str, count: int, error: str = None,
                      flights: list = None):
    nonlocal routes_scanned_count
    if status in ('cache_hit', 'api_success', 'error'):
        routes_scanned_count += 1

    # Write route + flights immediately if we have results
    if flights and status in ('cache_hit', 'api_success'):
        # Annotate each flight with its query date
        for f in flights:
            f['date'] = date
        dest_info = next((d for d in destinations if d['iata'] == destination), None)
        dest_name = dest_info.get('name', destination) if dest_info else destination
        dest_city = dest_info.get('city', '') if dest_info else ''
        _write_route_incremental(scan_id, destination, dest_name, dest_city, flights)

    # Update progress counter
    try:
        progress_conn = get_connection()
        progress_cursor = progress_conn.cursor()
        progress_cursor.execute("""
            UPDATE scans
            SET routes_scanned = routes_scanned + 1,
                updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
        """, (scan_id,))
        progress_conn.commit()
        progress_conn.close()

        if routes_scanned_count % 10 == 0:
            logger.info(f"[Scan {scan_id}] Progress: {routes_scanned_count}/{len(routes_to_scan)} routes ({status}: {origin}→{destination})")
    except Exception as e:
        logger.error(f"[Scan {scan_id}] Failed to update progress: {str(e)}")
Replace Phase 2 (lines 182–262 in process_scan):
Remove the existing bulk-write block. Replace with a lightweight totals-only block:
# Wait for all queries to complete
results = await search_multiple_routes(
    routes=routes_to_scan,
    seat_class=seat_class or 'economy',
    adults=adults or 1,
    use_cache=True,
    cache_threshold_hours=24,
    max_workers=3,
    progress_callback=progress_callback
)

logger.info(f"[Scan {scan_id}] All queries complete. Finalizing scan...")

# Count total flights (routes already written by callback)
total_flights = sum(len(flights) for flights in results.values())
routes_saved = cursor.execute(
    "SELECT COUNT(*) FROM routes WHERE scan_id = ?", (scan_id,)
).fetchone()[0]

logger.info(f"[Scan {scan_id}] ✅ {routes_saved} routes, {total_flights} flights")
5.4 searcher_v3.py — Pass flights to callback
In search_direct_flights, update both callback calls to pass flights=:
Cache hit (line 121):
if progress_callback:
    progress_callback(origin, destination, date, "cache_hit", len(cached), flights=cached)
return cached
API success (line 143):
if progress_callback:
    progress_callback(origin, destination, date, "api_success", len(result), flights=result)
The error callback (line 159) does not need flights= since there are no results.
6. Concurrency & Safety
| Concern | Analysis | Verdict |
|---|---|---|
| Simultaneous writes to same route row | Cannot happen: the event loop is single-threaded; callbacks are not concurrent. asyncio.gather runs tasks concurrently but yields at await points, and callbacks fire synchronously after each await to_thread returns. | ✅ Safe |
| Two callbacks for same destination arriving "simultaneously" | Impossible in a single-threaded event loop. The second callback runs only after the first completes (no await inside the callback). | ✅ Safe |
| Duplicate flight rows | Each (scan_id, destination, date) query fires exactly once; its flights are written exactly once. | ✅ No duplicates |
| total_flights trigger still fires correctly | SQLite triggers on INSERT INTO routes and UPDATE OF flight_count ON routes fire for each incremental write — counts stay accurate. | ✅ Works |
| Scan-completion total_flights update | Still set explicitly at completion from the results dict count — redundant but harmless. | ✅ OK |
7. Edge Cases
| Case | Handling |
|---|---|
| Destination returns 0 flights | _write_route_incremental returns early — no row created. Route only appears if at least one priced flight found. |
| Scan is deleted mid-run | DELETE CASCADE on scans removes routes/flights automatically. Progress callback write will fail with FK error, caught by except block and logged. |
| Scan fails mid-run | Routes written so far remain in DB. Status set to failed. UI will show partial results with failed badge — acceptable. |
| DB write error in callback | Logged, does not crash the scan. Query continues, flight data lost for that callback. |
| Existing scans (pre-feature) | No impact. Migration adds index but doesn't change old data (all complete scans already have 1 row per destination). |
8. Migration Plan
- database/schema.sql: add CREATE UNIQUE INDEX IF NOT EXISTS uq_routes_scan_dest.
- database/init_db.py: add _migrate_add_routes_unique_index() and call it in initialize_database().
- scan_processor.py: add the _write_route_incremental() helper; update the progress_callback closure; remove the bulk-write Phase 2.
- searcher_v3.py: pass the flights= kwarg to both successful callback invocations.
Migration is backward-safe: The UNIQUE index is added with IF NOT EXISTS. Existing completed scans already have at most 1 route row per destination — the index creation will succeed without errors.
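Both migration properties (duplicate collapse, then idempotent index creation) can be checked with a self-contained sketch against a simplified stand-in table, not the real schema:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Simplified stand-in for the routes table (real schema has more columns)
conn.execute(
    "CREATE TABLE routes (id INTEGER PRIMARY KEY, scan_id INTEGER, destination TEXT)"
)

# Seed a duplicate pair, as a hypothetical pre-feature edge case might leave behind
conn.executemany(
    "INSERT INTO routes (scan_id, destination) VALUES (?, ?)",
    [(1, "MUC"), (1, "MUC"), (1, "CDG")],
)

# Step 1: collapse duplicates, keeping the lowest id per (scan_id, destination)
conn.execute("""
    DELETE FROM routes
    WHERE id NOT IN (
        SELECT MIN(id) FROM routes GROUP BY scan_id, destination
    )
""")

# Step 2: create the index; IF NOT EXISTS makes re-running it a no-op
for _ in range(2):  # executed twice to show idempotency
    conn.execute("""
        CREATE UNIQUE INDEX IF NOT EXISTS uq_routes_scan_dest
            ON routes(scan_id, destination)
    """)

assert conn.execute("SELECT COUNT(*) FROM routes").fetchone()[0] == 2
```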
No API changes: /scans/:id/routes endpoint already returns live data from the routes table. The frontend polling already works.
9. Rollback
To revert: remove the flights= kwarg from searcher_v3.py callbacks, restore the bulk-write Phase 2 in scan_processor.py, and remove _write_route_incremental. The UNIQUE index can remain — it only adds a constraint that is naturally satisfied by the bulk-write approach anyway.
10. Testing Plan
Unit Tests (new)
- test_write_route_incremental_new: first call creates the route row
- test_write_route_incremental_merge: second call updates stats correctly
- test_write_route_incremental_no_prices: empty-price flights produce no row
- test_write_route_incremental_airlines_merge: duplicate airlines deduplicated
- test_weighted_average_formula: verify the avg formula with known numbers
Integration Tests (extend existing)
- Extend test_scan_lifecycle: poll routes every 0.1s during a mock scan, verify routes appear before completion
- test_incremental_writes_idempotent: simulate the same callback called twice for the same destination
- test_unique_index_exists: verify the migration creates the index
- test_migration_collapses_duplicates: seed duplicate route rows, run the migration, verify they collapse
11. Out of Scope
- WebSocket or SSE for push-based updates (polling already works)
- Frontend changes (none needed)
- Real-time price charts
- Partial scan resume after crash
- total_flights trigger removal (keep for consistency)