# PRD: Live Routes During Active Scan

**Status:** Draft
**Date:** 2026-02-27
**Scope:** Backend only — no frontend or API contract changes required

---

## 1. Problem

Routes and flights are only visible in the UI **after a scan fully completes**. For large scans (e.g., 30 destinations × 14 days = 420 queries), users stare at a progress bar with zero actionable data for potentially many minutes.

The root cause is a two-phase waterfall in `scan_processor.py`:

- **Phase 1 (line 170):** `await search_multiple_routes(...)` — `asyncio.gather()` waits for **all** queries to finish before returning anything.
- **Phase 2 (line 197):** Bulk `INSERT INTO routes` and `INSERT INTO flights` runs only after Phase 1 completes.

---

## 2. Goal

Write each destination's route row and its individual flight rows to the database **as soon as that destination's results arrive**, rather than after all queries finish.

The frontend already polls `/scans/:id/routes` on an interval while status is `running`. No frontend changes are needed — routes will simply appear progressively.

---

## 3. Desired Behavior

| Moment | Before fix | After fix |
|--------|-----------|-----------|
| Query 1 completes (BER → MUC) | Nothing visible | MUC route + flights appear in UI |
| 50% through scan | 0 routes in UI | ~50% of routes visible |
| Scan completes | All routes appear at once | No change (already visible) |

---

## 4. Architecture Analysis

### 4.1 Event Loop Threading

`progress_callback` is called from within `search_direct_flights` (`searcher_v3.py`):

- **Cache hit** (line 121): called directly in the async coroutine → **event loop thread**
- **API success** (line 143): called after `await asyncio.to_thread()` returns → **event loop thread**
- **Error** (line 159): same → **event loop thread**

All callback invocations happen on the **single asyncio event loop thread**. No locking is needed.
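The serialization claim can be sanity-checked with a minimal, self-contained sketch. Nothing here is project code: `fake_query` and the recorded `events` list are hypothetical stand-ins for the real search coroutine and its side effects.

```python
import asyncio

events = []

def progress_callback(route, status):
    # Runs on the event loop thread; nothing can preempt it mid-call
    # because the callback itself never awaits.
    events.append(("start", route))
    events.append(("end", route))

async def fake_query(route):
    await asyncio.sleep(0.01)                # stands in for the awaited API call
    progress_callback(route, "api_success")  # fires after the await returns

async def main():
    # Tasks run concurrently, but their post-await continuations are
    # executed one at a time by the single event loop thread.
    await asyncio.gather(*(fake_query(r) for r in ["MUC", "CDG", "LHR"]))

asyncio.run(main())

# Every "start" is immediately followed by its own "end": no interleaving.
for i in range(0, len(events), 2):
    assert events[i] == ("start", events[i + 1][1])
    assert events[i + 1][0] == "end"
```

Whatever order the three tasks resume in, each callback's two appends stay adjacent, which is exactly why the incremental write needs no lock.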
The existing `progress_callback` in `scan_processor.py` already opens and closes a fresh `get_connection()` per call — we reuse the same pattern.

### 4.2 Why Multiple Callbacks Fire Per Destination

For a scan covering 14 dates, a single destination (e.g., MUC) gets 14 separate queries:

- `(BER, MUC, 2026-03-01)`, `(BER, MUC, 2026-03-02)`, ... `(BER, MUC, 2026-03-14)`

Each query completes independently and fires `progress_callback`. The incremental write logic must **merge** successive results for the same destination into a single route row.

### 4.3 Required Schema Change

`routes` has no UNIQUE constraint on `(scan_id, destination)`. We need one to support upsert semantics (to distinguish "first result for this destination" from "route already started").

Current schema (no uniqueness):

```sql
CREATE INDEX IF NOT EXISTS idx_routes_scan_id ON routes(scan_id);
```

Required addition:

```sql
CREATE UNIQUE INDEX IF NOT EXISTS uq_routes_scan_dest ON routes(scan_id, destination);
```

---

## 5. Changes Required

### 5.1 `database/schema.sql`

Add a unique index at the end of the routes indexes block:

```sql
-- Unique constraint: one route row per (scan, destination)
CREATE UNIQUE INDEX IF NOT EXISTS uq_routes_scan_dest ON routes(scan_id, destination);
```

### 5.2 `database/init_db.py`

Add a migration function to create the index on existing databases where it doesn't yet exist. It is called before `executescript(schema_sql)`.

```python
def _migrate_add_routes_unique_index(conn, verbose=True):
    """
    Migration: Add UNIQUE index on routes(scan_id, destination).

    Required for incremental route writes during active scans.
    The index enables upsert (INSERT + UPDATE on conflict) semantics.

    Safe to run on existing data: if any (scan_id, destination)
    duplicates exist, we collapse them first, keeping the
    earliest-inserted row (lowest id).
    """
    cursor = conn.execute("""
        SELECT name FROM sqlite_master
        WHERE type='index' AND name='uq_routes_scan_dest'
    """)
    if cursor.fetchone():
        return  # Already migrated

    if verbose:
        print("  🔄 Migrating routes table: adding unique index on (scan_id, destination)...")

    # Collapse any existing duplicates (completed scans may have none,
    # but guard against edge cases).
    conn.execute("""
        DELETE FROM routes WHERE id NOT IN (
            SELECT MIN(id) FROM routes GROUP BY scan_id, destination
        )
    """)

    # Create the unique index
    conn.execute("""
        CREATE UNIQUE INDEX IF NOT EXISTS uq_routes_scan_dest
        ON routes(scan_id, destination)
    """)
    conn.commit()

    if verbose:
        print("  ✅ Migration complete: uq_routes_scan_dest index created")
```

Wire it into `initialize_database()` before `executescript`:

```python
    # Apply migrations before running schema
    _migrate_relax_country_constraint(conn, verbose)
    _migrate_add_routes_unique_index(conn, verbose)  # ← ADD THIS

    # Load and execute schema
    schema_sql = load_schema()
```

### 5.3 `scan_processor.py` — Extend `progress_callback`

#### Current signature (line 138):

```python
def progress_callback(origin: str, destination: str, date: str,
                      status: str, count: int, error: str = None):
```

#### New signature:

```python
def progress_callback(origin: str, destination: str, date: str,
                      status: str, count: int, error: str = None,
                      flights: list = None):
```

#### New helper function (add before `process_scan`):

```python
def _write_route_incremental(scan_id: int, destination: str, dest_name: str,
                             dest_city: str, new_flights: list):
    """
    Write or update a route row and its flight rows incrementally.

    Called from progress_callback each time a query for
    (scan_id, destination) returns results. Merges into the existing
    route row if one already exists.

    Uses a read-then-write pattern so airlines JSON arrays can be merged
    in Python rather than in SQLite (SQLite has no native JSON array merge).

    Safe to call from the event loop thread — opens its own connection.
    """
    if not new_flights:
        return

    prices = [f.get('price') for f in new_flights if f.get('price')]
    if not prices:
        return  # No priced flights — nothing to aggregate

    new_airlines = list({f.get('airline') for f in new_flights if f.get('airline')})
    # Count only priced flights: unpriced ones are skipped below, and the
    # weighted average must use the same count that new_avg is based on.
    new_count = len(prices)
    new_min = min(prices)
    new_max = max(prices)
    new_avg = sum(prices) / len(prices)

    try:
        conn = get_connection()
        cursor = conn.cursor()

        # Check if a route row already exists for this destination
        cursor.execute("""
            SELECT id, flight_count, min_price, max_price, avg_price, airlines
            FROM routes WHERE scan_id = ? AND destination = ?
        """, (scan_id, destination))
        existing = cursor.fetchone()

        if existing is None:
            # First result for this destination — INSERT
            cursor.execute("""
                INSERT INTO routes (
                    scan_id, destination, destination_name, destination_city,
                    flight_count, airlines, min_price, max_price, avg_price
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                scan_id, destination, dest_name, dest_city,
                new_count, json.dumps(new_airlines),
                new_min, new_max, new_avg,
            ))
        else:
            # Subsequent result — merge stats
            old_count = existing['flight_count'] or 0
            old_min = existing['min_price']
            old_max = existing['max_price']
            old_avg = existing['avg_price'] or 0.0
            old_airlines = json.loads(existing['airlines']) if existing['airlines'] else []

            merged_count = old_count + new_count
            merged_min = min(old_min, new_min) if old_min is not None else new_min
            merged_max = max(old_max, new_max) if old_max is not None else new_max
            # Running weighted average
            merged_avg = (old_avg * old_count + new_avg * new_count) / merged_count
            merged_airlines = json.dumps(list(set(old_airlines) | set(new_airlines)))

            cursor.execute("""
                UPDATE routes
                SET flight_count = ?, min_price = ?, max_price = ?,
                    avg_price = ?, airlines = ?
                WHERE scan_id = ? AND destination = ?
            """, (
                merged_count, merged_min, merged_max,
                merged_avg, merged_airlines,
                scan_id, destination,
            ))

        # INSERT individual flight rows (always new rows; duplicates not expected
        # because each (scan_id, destination, date) query fires exactly once)
        for flight in new_flights:
            if not flight.get('price'):
                continue
            cursor.execute("""
                INSERT INTO flights (
                    scan_id, destination, date, airline,
                    departure_time, arrival_time, price, stops
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                scan_id, destination,
                flight.get('date', ''),  # date passed from caller
                flight.get('airline'),
                flight.get('departure_time'),
                flight.get('arrival_time'),
                flight.get('price'),
                flight.get('stops', 0),
            ))

        conn.commit()
        conn.close()
    except Exception as e:
        logger.error(f"[Scan {scan_id}] Failed to write incremental route {destination}: {e}")
```

**Note on flight date:** Individual flights must carry their `date` when passed to `_write_route_incremental`. The callback receives `date` as a parameter, so attach it to each flight dict before passing: `f['date'] = date`.
#### Updated `progress_callback` inside `process_scan`:

```python
def progress_callback(origin: str, destination: str, date: str,
                      status: str, count: int, error: str = None,
                      flights: list = None):
    nonlocal routes_scanned_count
    if status in ('cache_hit', 'api_success', 'error'):
        routes_scanned_count += 1

    # Write route + flights immediately if we have results
    if flights and status in ('cache_hit', 'api_success'):
        # Annotate each flight with its query date
        for f in flights:
            f['date'] = date
        dest_info = next((d for d in destinations if d['iata'] == destination), None)
        dest_name = dest_info.get('name', destination) if dest_info else destination
        dest_city = dest_info.get('city', '') if dest_info else ''
        _write_route_incremental(scan_id, destination, dest_name, dest_city, flights)

    # Update progress counter
    try:
        progress_conn = get_connection()
        progress_cursor = progress_conn.cursor()
        progress_cursor.execute("""
            UPDATE scans
            SET routes_scanned = routes_scanned + 1,
                updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
        """, (scan_id,))
        progress_conn.commit()
        progress_conn.close()
        if routes_scanned_count % 10 == 0:
            logger.info(f"[Scan {scan_id}] Progress: {routes_scanned_count}/{len(routes_to_scan)} routes ({status}: {origin}→{destination})")
    except Exception as e:
        logger.error(f"[Scan {scan_id}] Failed to update progress: {str(e)}")
```

#### Replace Phase 2 (lines 182–262 in `process_scan`):

Remove the existing bulk-write block and replace it with a lightweight totals-only block:

```python
    # Wait for all queries to complete
    results = await search_multiple_routes(
        routes=routes_to_scan,
        seat_class=seat_class or 'economy',
        adults=adults or 1,
        use_cache=True,
        cache_threshold_hours=24,
        max_workers=3,
        progress_callback=progress_callback
    )

    logger.info(f"[Scan {scan_id}] All queries complete. Finalizing scan...")

    # Count total flights (routes already written by callback)
    total_flights = sum(len(flights) for flights in results.values())
    routes_saved = cursor.execute(
        "SELECT COUNT(*) FROM routes WHERE scan_id = ?", (scan_id,)
    ).fetchone()[0]

    logger.info(f"[Scan {scan_id}] ✅ {routes_saved} routes, {total_flights} flights")
```

### 5.4 `searcher_v3.py` — Pass `flights` to callback

In `search_direct_flights`, update both callback calls to pass `flights=`:

**Cache hit (line 121):**

```python
if progress_callback:
    progress_callback(origin, destination, date, "cache_hit", len(cached), flights=cached)
return cached
```

**API success (line 143):**

```python
if progress_callback:
    progress_callback(origin, destination, date, "api_success", len(result), flights=result)
```

The `error` callback (line 159) does not need `flights=` since there are no results.

---

## 6. Concurrency & Safety

| Concern | Analysis | Verdict |
|---------|----------|---------|
| Simultaneous writes to same route row | Cannot happen: the event loop is single-threaded, so callbacks are never concurrent. `asyncio.gather` runs tasks concurrently but they yield only at `await` points, and callbacks fire synchronously after each `await to_thread` returns. | ✅ Safe |
| Two callbacks for same destination arriving "simultaneously" | Impossible in a single-threaded event loop. The second callback cannot start until the first completes (there is no `await` inside the callback). | ✅ Safe |
| Duplicate flight rows | Each `(scan_id, destination, date)` query fires exactly once; its flights are written exactly once. | ✅ No duplicates |
| `total_flights` trigger still fires correctly | SQLite triggers on `INSERT INTO routes` and `UPDATE OF flight_count ON routes` fire for each incremental write — counts stay accurate. | ✅ Works |
| Scan completion `total_flights` update | Still set explicitly at completion from the `results` dict count — redundant but harmless. | ✅ OK |

---

## 7. Edge Cases

| Case | Handling |
|------|----------|
| Destination returns 0 flights | `_write_route_incremental` returns early — no row created. A route only appears once at least one priced flight is found. |
| Scan is deleted mid-run | `DELETE CASCADE` on `scans` removes routes/flights automatically. The progress callback's write will fail with an FK error, caught by the `except` block and logged. |
| Scan fails mid-run | Routes written so far remain in the DB. Status is set to `failed`. The UI will show partial results with a `failed` badge — acceptable. |
| DB write error in callback | Logged; does not crash the scan. The scan continues, but the flight data for that callback is lost. |
| Existing scans (pre-feature) | No impact. The migration adds an index but doesn't change old data (all completed scans already have one row per destination). |

---

## 8. Migration Plan

1. **`database/schema.sql`**: Add `CREATE UNIQUE INDEX IF NOT EXISTS uq_routes_scan_dest`.
2. **`database/init_db.py`**: Add `_migrate_add_routes_unique_index()` and call it in `initialize_database()`.
3. **`scan_processor.py`**: Add the `_write_route_incremental()` helper; update the `progress_callback` closure; remove the bulk-write Phase 2.
4. **`searcher_v3.py`**: Pass the `flights=` kwarg to both successful callback invocations.

**Migration is backward-safe:** The UNIQUE index is added with `IF NOT EXISTS`. Existing `completed` scans already have at most one route row per destination, so index creation will succeed without errors.

**No API changes:** The `/scans/:id/routes` endpoint already returns live data from the `routes` table, and the frontend polling already works.

---

## 9. Rollback

To revert: remove the `flights=` kwarg from the `searcher_v3.py` callbacks, restore the bulk-write Phase 2 in `scan_processor.py`, and remove `_write_route_incremental`. The UNIQUE index can remain — it only adds a constraint that the bulk-write approach naturally satisfies anyway.

---

## 10. Testing Plan

### Unit Tests (new)

1. `test_write_route_incremental_new` — first call creates the route row
2. `test_write_route_incremental_merge` — second call updates stats correctly
3. `test_write_route_incremental_no_prices` — empty-price flights produce no row
4. `test_write_route_incremental_airlines_merge` — duplicate airlines are deduplicated
5. `test_weighted_average_formula` — verify the avg formula with known numbers

### Integration Tests (extend existing)

6. Extend `test_scan_lifecycle` — poll routes every 0.1s during a mock scan; verify routes appear before completion
7. `test_incremental_writes_idempotent` — simulate the same callback called twice for the same destination
8. `test_unique_index_exists` — verify the migration creates the index
9. `test_migration_collapses_duplicates` — seed duplicate route rows, run the migration, verify they collapse

---

## 11. Out of Scope

- WebSocket or SSE for push-based updates (polling already works)
- Frontend changes (none needed)
- Real-time price charts
- Partial scan resume after crash
- `total_flights` trigger removal (keep for consistency)
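Test 9 from the testing plan can be sketched against an in-memory SQLite database. This is a standalone sketch: the `routes` table is reduced to only the columns the migration touches, and the seeded values are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE routes (
        id INTEGER PRIMARY KEY,
        scan_id INTEGER,
        destination TEXT,
        flight_count INTEGER
    )
""")
# Seed a duplicate: two rows for (scan 1, MUC), plus one clean row for CDG
conn.executemany(
    "INSERT INTO routes (scan_id, destination, flight_count) VALUES (?, ?, ?)",
    [(1, "MUC", 5), (1, "MUC", 9), (1, "CDG", 3)],
)

# Same collapse + index statements as _migrate_add_routes_unique_index
conn.execute("""
    DELETE FROM routes WHERE id NOT IN (
        SELECT MIN(id) FROM routes GROUP BY scan_id, destination
    )
""")
conn.execute("""
    CREATE UNIQUE INDEX IF NOT EXISTS uq_routes_scan_dest
    ON routes(scan_id, destination)
""")

rows = conn.execute(
    "SELECT scan_id, destination FROM routes ORDER BY destination"
).fetchall()
assert rows == [(1, "CDG"), (1, "MUC")]  # duplicates collapsed to one row each

# The index now rejects a second row for the same (scan, destination)
try:
    conn.execute(
        "INSERT INTO routes (scan_id, destination, flight_count) VALUES (1, 'MUC', 1)"
    )
    raise AssertionError("expected IntegrityError")
except sqlite3.IntegrityError:
    pass
```

The same fixture can back tests 1, 2, and 8 by pointing the helper under test at this connection instead of the real database file.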