ciaovolo/flight-comparator/PRD_LIVE_ROUTES.md
domverse ce1cf667d2 feat: write routes live during scan instead of bulk-insert at completion
Routes and individual flights are now written to the database as each
query result arrives, rather than after all queries finish. The frontend
already polls /scans/:id/routes while status=running, so routes appear
progressively with no frontend changes needed.

Changes:
- database/schema.sql: UNIQUE INDEX uq_routes_scan_dest(scan_id, destination)
- database/init_db.py: _migrate_add_routes_unique_index() migration
- scan_processor.py: _write_route_incremental() helper; progress_callback
  now writes routes/flights immediately; Phase 2 bulk-write replaced with
  a lightweight totals query
- searcher_v3.py: pass flights= kwarg to progress_callback on cache_hit
  and api_success paths

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-27 20:53:04 +01:00


PRD: Live Routes During Active Scan

Status: Draft
Date: 2026-02-27
Scope: Backend only — no frontend or API contract changes required


1. Problem

Routes and flights are only visible in the UI after a scan fully completes. For large scans (e.g., 30 destinations × 14 days = 420 queries), users stare at a progress bar with zero actionable data for potentially many minutes.

The root cause is a two-phase waterfall in scan_processor.py:

  • Phase 1 (line 170): await search_multiple_routes(...) uses asyncio.gather(), which waits for all queries to finish before returning anything.
  • Phase 2 (line 197): Bulk INSERT INTO routes and INSERT INTO flights only after Phase 1 completes.
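The contrast between the two approaches can be reproduced with a minimal standalone asyncio sketch (illustrative only — fake_query and the destination names are hypothetical, not from the codebase): with plain gather(), nothing is observable until every task finishes, while a per-task callback makes each result observable the moment it arrives.

```python
import asyncio

async def fake_query(dest, delay, on_result):
    await asyncio.sleep(delay)        # simulated flight-API latency
    result = f"{dest}-flights"
    on_result(dest, result)           # incremental path: visible immediately
    return result

async def main():
    arrived = []                      # what a DB poller could already see mid-scan
    tasks = [fake_query(d, i * 0.01, lambda d, r: arrived.append(d))
             for i, d in enumerate(["MUC", "FCO", "LHR"])]
    results = await asyncio.gather(*tasks)
    # By the time gather() returns, every callback has already fired:
    assert sorted(arrived) == ["FCO", "LHR", "MUC"]
    return results

print(asyncio.run(main()))
```

gather() preserves task order in its return value, so the bulk-write path sees the same data — just all at once, at the end.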

2. Goal

Write each destination's route row and its individual flight rows to the database as soon as that destination's results arrive, rather than after all queries finish.

The frontend already polls /scans/:id/routes on an interval while status is running. No frontend changes are needed — routes will simply appear progressively.


3. Desired Behavior

| Moment | Before fix | After fix |
|---|---|---|
| Query 1 completes (BER → MUC) | Nothing visible | MUC route + flights appear in UI |
| 50% through scan | 0 routes in UI | ~50% of routes visible |
| Scan completes | All routes appear at once | No change (already visible) |

4. Architecture Analysis

4.1 Event Loop Threading

progress_callback is called from within search_direct_flights (searcher_v3.py):

  • Cache hit (line 121): called directly in the async coroutine → event loop thread
  • API success (line 143): called after await asyncio.to_thread() returns → event loop thread
  • Error (line 159): same → event loop thread

All callback invocations happen on the single asyncio event loop thread. No locking is needed. The existing progress_callback in scan_processor.py already opens and closes a fresh get_connection() per call — we reuse the same pattern.
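This single-thread property can be verified with a small standalone sketch (hypothetical names, not from the codebase): even though the blocking work runs on worker threads via asyncio.to_thread, the coroutine resumes — and therefore the callback fires — on the event loop thread.

```python
import asyncio
import threading

callback_threads = []

def progress_callback():
    # Record which thread each callback invocation runs on
    callback_threads.append(threading.get_ident())

async def query():
    # Blocking work happens on a worker thread...
    await asyncio.to_thread(lambda: None)
    # ...but execution resumes here, back on the event loop thread
    progress_callback()

async def main():
    await asyncio.gather(*(query() for _ in range(8)))

loop_thread = threading.get_ident()   # asyncio.run() drives the loop on this thread
asyncio.run(main())
assert set(callback_threads) == {loop_thread}
```

Because every invocation lands on the same thread, the callbacks are serialized and no locking is needed around the DB writes.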

4.2 Why Multiple Callbacks Fire Per Destination

For a scan covering 14 dates, a single destination (e.g., MUC) gets 14 separate queries:

  • (BER, MUC, 2026-03-01), (BER, MUC, 2026-03-02), ... (BER, MUC, 2026-03-14)

Each query completes independently and fires progress_callback. The incremental write logic must merge successive results for the same destination into a single route row.

4.3 Required Schema Change

routes has no UNIQUE constraint on (scan_id, destination). We need one to support upsert semantics (detect "route already started" vs "first result for this destination").

Current schema (no uniqueness):

CREATE INDEX IF NOT EXISTS idx_routes_scan_id ON routes(scan_id);

Required addition:

CREATE UNIQUE INDEX IF NOT EXISTS uq_routes_scan_dest ON routes(scan_id, destination);
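A quick in-memory check of what the constraint enforces (minimal table shape — only the id and the two indexed columns, not the full routes schema):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE routes (id INTEGER PRIMARY KEY, scan_id INTEGER, destination TEXT)")
conn.execute("CREATE UNIQUE INDEX IF NOT EXISTS uq_routes_scan_dest ON routes(scan_id, destination)")

conn.execute("INSERT INTO routes (scan_id, destination) VALUES (1, 'MUC')")
conn.execute("INSERT INTO routes (scan_id, destination) VALUES (2, 'MUC')")  # other scan: allowed
try:
    conn.execute("INSERT INTO routes (scan_id, destination) VALUES (1, 'MUC')")
    duplicate_rejected = False
except sqlite3.IntegrityError:
    duplicate_rejected = True  # second (1, 'MUC') row violates the unique index
assert duplicate_rejected
```

The IntegrityError on the duplicate pair is exactly the signal the upsert logic relies on to distinguish "first result" from "route already started".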

5. Changes Required

5.1 database/schema.sql

Add a unique index at the end of the routes indexes block:

-- Unique constraint: one route row per (scan, destination)
CREATE UNIQUE INDEX IF NOT EXISTS uq_routes_scan_dest
    ON routes(scan_id, destination);

5.2 database/init_db.py

Add a migration function to create the index on existing databases where it doesn't yet exist. Called before executescript(schema_sql).

def _migrate_add_routes_unique_index(conn, verbose=True):
    """
    Migration: Add UNIQUE index on routes(scan_id, destination).

    Required for incremental route writes during active scans.
    The index enables upsert (INSERT + UPDATE on conflict) semantics.

    Safe to run on existing data: if any (scan_id, destination) duplicates
    exist, we collapse them first, keeping the earliest row (lowest id).
    """
    cursor = conn.execute("""
        SELECT name FROM sqlite_master
        WHERE type='index' AND name='uq_routes_scan_dest'
    """)
    if cursor.fetchone():
        return  # Already migrated

    if verbose:
        print("   🔄 Migrating routes table: adding unique index on (scan_id, destination)...")

    # Collapse any existing duplicates (completed scans may have none,
    # but guard against edge cases).
    conn.execute("""
        DELETE FROM routes
        WHERE id NOT IN (
            SELECT MIN(id)
            FROM routes
            GROUP BY scan_id, destination
        )
    """)

    # Create the unique index
    conn.execute("""
        CREATE UNIQUE INDEX IF NOT EXISTS uq_routes_scan_dest
        ON routes(scan_id, destination)
    """)
    conn.commit()

    if verbose:
        print("   ✅ Migration complete: uq_routes_scan_dest index created")

Wire it into initialize_database() before executescript:

# Apply migrations before running schema
_migrate_relax_country_constraint(conn, verbose)
_migrate_add_routes_unique_index(conn, verbose)   # ← ADD THIS

# Load and execute schema
schema_sql = load_schema()
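The duplicate-collapse step of the migration can be exercised in isolation against a throwaway table (same MIN(id) subquery as above; ids and destinations are made up for the test):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE routes (id INTEGER PRIMARY KEY, scan_id INTEGER, destination TEXT)")
conn.executemany(
    "INSERT INTO routes (id, scan_id, destination) VALUES (?, ?, ?)",
    [(1, 1, "MUC"), (2, 1, "MUC"), (3, 1, "FCO"), (4, 2, "MUC")],  # (1, MUC) duplicated
)
conn.execute("""
    DELETE FROM routes
    WHERE id NOT IN (SELECT MIN(id) FROM routes GROUP BY scan_id, destination)
""")
survivors = [row[0] for row in conn.execute("SELECT id FROM routes ORDER BY id")]
assert survivors == [1, 3, 4]   # id=2, the later (1, MUC) row, was collapsed away
```

After the collapse, each (scan_id, destination) group holds exactly one row, so the subsequent CREATE UNIQUE INDEX cannot fail.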

5.3 scan_processor.py — Extend progress_callback

Current signature (line 138):

def progress_callback(origin: str, destination: str, date: str,
                      status: str, count: int, error: str = None):

New signature:

def progress_callback(origin: str, destination: str, date: str,
                      status: str, count: int, error: str = None,
                      flights: list = None):

New helper function (add before process_scan):

def _write_route_incremental(scan_id: int, destination: str,
                              dest_name: str, dest_city: str,
                              new_flights: list):
    """
    Write or update a route row and its flight rows incrementally.

    Called from progress_callback each time a query for (scan_id, destination)
    returns results. Merges into the existing route row if one already exists.

    Uses a read-then-write pattern so airlines JSON arrays can be merged in
    Python rather than in SQLite (SQLite has no native JSON array merge).

    Safe to call from the event loop thread — opens its own connection.
    """
    if not new_flights:
        return

    prices = [f.get('price') for f in new_flights if f.get('price')]
    if not prices:
        return  # No priced flights — nothing to aggregate

    new_airlines = list({f.get('airline') for f in new_flights if f.get('airline')})
    new_count = len(new_flights)
    new_min = min(prices)
    new_max = max(prices)
    new_avg = sum(prices) / len(prices)

    try:
        conn = get_connection()
        cursor = conn.cursor()

        # Check if a route row already exists for this destination
        cursor.execute("""
            SELECT id, flight_count, min_price, max_price, avg_price, airlines
            FROM routes
            WHERE scan_id = ? AND destination = ?
        """, (scan_id, destination))
        existing = cursor.fetchone()

        if existing is None:
            # First result for this destination — INSERT
            cursor.execute("""
                INSERT INTO routes (
                    scan_id, destination, destination_name, destination_city,
                    flight_count, airlines, min_price, max_price, avg_price
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                scan_id, destination, dest_name, dest_city,
                new_count, json.dumps(new_airlines),
                new_min, new_max, new_avg,
            ))
        else:
            # Subsequent result — merge stats
            old_count = existing['flight_count'] or 0
            old_min = existing['min_price']
            old_max = existing['max_price']
            old_avg = existing['avg_price'] or 0.0
            old_airlines = json.loads(existing['airlines']) if existing['airlines'] else []

            merged_count = old_count + new_count
            merged_min = min(old_min, new_min) if old_min is not None else new_min
            merged_max = max(old_max, new_max) if old_max is not None else new_max
            # Running weighted average
            merged_avg = (old_avg * old_count + new_avg * new_count) / merged_count
            merged_airlines = json.dumps(list(set(old_airlines) | set(new_airlines)))

            cursor.execute("""
                UPDATE routes
                SET flight_count = ?,
                    min_price    = ?,
                    max_price    = ?,
                    avg_price    = ?,
                    airlines     = ?
                WHERE scan_id = ? AND destination = ?
            """, (
                merged_count, merged_min, merged_max, merged_avg, merged_airlines,
                scan_id, destination,
            ))

        # INSERT individual flight rows (always new rows; duplicates not expected
        # because each (scan_id, destination, date) query fires exactly once)
        for flight in new_flights:
            if not flight.get('price'):
                continue
            cursor.execute("""
                INSERT INTO flights (
                    scan_id, destination, date, airline,
                    departure_time, arrival_time, price, stops
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                scan_id,
                destination,
                flight.get('date', ''),    # date passed from caller
                flight.get('airline'),
                flight.get('departure_time'),
                flight.get('arrival_time'),
                flight.get('price'),
                flight.get('stops', 0),
            ))

        conn.commit()
        conn.close()

    except Exception as e:
        logger.error(f"[Scan {scan_id}] Failed to write incremental route {destination}: {e}")
        try:
            conn.close()   # Avoid leaking the connection when the write fails
        except Exception:
            pass           # Connection may never have been opened

Note on flight date: Individual flights must carry their date when passed to _write_route_incremental. The callback receives date as a parameter, so attach it to each flight dict before passing: f['date'] = date.
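The running weighted average in the UPDATE branch is equivalent to averaging over all flights seen so far for that destination; a worked check with hypothetical prices:

```python
# First batch for a destination: prices [100, 120] → count 2, avg 110
# Second batch:                  prices [90, 150, 150] → count 3, avg 130
old_avg, old_count = 110.0, 2
new_avg, new_count = 130.0, 3

# Same formula as in _write_route_incremental's UPDATE branch
merged_avg = (old_avg * old_count + new_avg * new_count) / (old_count + new_count)

# Identical to the plain average over all five prices:
all_prices = [100, 120, 90, 150, 150]
assert merged_avg == sum(all_prices) / len(all_prices)   # 122.0
```

Because each batch's contribution is weighted by its count, the order in which per-date batches arrive does not affect the final average.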

Updated progress_callback inside process_scan:

def progress_callback(origin: str, destination: str, date: str,
                      status: str, count: int, error: str = None,
                      flights: list = None):
    nonlocal routes_scanned_count

    if status in ('cache_hit', 'api_success', 'error'):
        routes_scanned_count += 1

        # Write route + flights immediately if we have results
        if flights and status in ('cache_hit', 'api_success'):
            # Annotate each flight with its query date
            for f in flights:
                f['date'] = date
            dest_info = next((d for d in destinations if d['iata'] == destination), None)
            dest_name = dest_info.get('name', destination) if dest_info else destination
            dest_city = dest_info.get('city', '') if dest_info else ''
            _write_route_incremental(scan_id, destination, dest_name, dest_city, flights)

        # Update progress counter
        try:
            progress_conn = get_connection()
            progress_cursor = progress_conn.cursor()
            progress_cursor.execute("""
                UPDATE scans
                SET routes_scanned = routes_scanned + 1,
                    updated_at = CURRENT_TIMESTAMP
                WHERE id = ?
            """, (scan_id,))
            progress_conn.commit()
            progress_conn.close()

            if routes_scanned_count % 10 == 0:
                logger.info(f"[Scan {scan_id}] Progress: {routes_scanned_count}/{len(routes_to_scan)} routes ({status}: {origin}→{destination})")

        except Exception as e:
            logger.error(f"[Scan {scan_id}] Failed to update progress: {str(e)}")

Replace Phase 2 (lines 182–262 in process_scan):

Remove the existing bulk-write block. Replace with a lightweight totals-only block:

# Wait for all queries to complete
results = await search_multiple_routes(
    routes=routes_to_scan,
    seat_class=seat_class or 'economy',
    adults=adults or 1,
    use_cache=True,
    cache_threshold_hours=24,
    max_workers=3,
    progress_callback=progress_callback
)

logger.info(f"[Scan {scan_id}] All queries complete. Finalizing scan...")

# Count total flights (routes already written by callback)
total_flights = sum(len(flights) for flights in results.values())
routes_saved = cursor.execute(
    "SELECT COUNT(*) FROM routes WHERE scan_id = ?", (scan_id,)
).fetchone()[0]

logger.info(f"[Scan {scan_id}] ✅ {routes_saved} routes, {total_flights} flights")

5.4 searcher_v3.py — Pass flights to callback

In search_direct_flights, update both callback calls to pass flights=:

Cache hit (line 121):

if progress_callback:
    progress_callback(origin, destination, date, "cache_hit", len(cached), flights=cached)
return cached

API success (line 143):

if progress_callback:
    progress_callback(origin, destination, date, "api_success", len(result), flights=result)

The error callback (line 159) does not need flights= since there are no results.


6. Concurrency & Safety

| Concern | Analysis | Verdict |
|---|---|---|
| Simultaneous writes to same route row | Cannot happen: the event loop is single-threaded; callbacks are not concurrent. asyncio.gather runs tasks concurrently but yields at await points, and callbacks fire synchronously after each await to_thread returns. | Safe |
| Two callbacks for same destination arriving "simultaneously" | Impossible in a single-threaded event loop. The second callback cannot run until the first completes (no await inside the callback). | Safe |
| Duplicate flight rows | Each (scan_id, destination, date) query fires exactly once; its flights are written exactly once. | No duplicates |
| total_flights trigger still fires correctly | SQLite triggers on INSERT INTO routes and UPDATE OF flight_count ON routes fire for each incremental write — counts stay accurate. | Works |
| Scan completion total_flights update | Still set explicitly at completion from the results dict count — redundant but harmless. | OK |

7. Edge Cases

| Case | Handling |
|---|---|
| Destination returns 0 flights | _write_route_incremental returns early — no row created. A route only appears if at least one priced flight is found. |
| Scan is deleted mid-run | DELETE CASCADE on scans removes routes/flights automatically. The progress-callback write fails with an FK error, which is caught by the except block and logged. |
| Scan fails mid-run | Routes written so far remain in the DB. Status is set to failed. The UI shows partial results with a failed badge — acceptable. |
| DB write error in callback | Logged; does not crash the scan. The query continues, but flight data for that callback is lost. |
| Existing scans (pre-feature) | No impact. The migration adds the index but doesn't change old data (all completed scans already have 1 row per destination). |

8. Migration Plan

  1. database/schema.sql: Add CREATE UNIQUE INDEX IF NOT EXISTS uq_routes_scan_dest.
  2. database/init_db.py: Add _migrate_add_routes_unique_index() + call it in initialize_database().
  3. scan_processor.py: Add _write_route_incremental() helper; update progress_callback closure; remove bulk-write Phase 2.
  4. searcher_v3.py: Pass flights= kwarg to both successful callback invocations.

Migration is backward-safe: The UNIQUE index is added with IF NOT EXISTS. Existing completed scans already have at most 1 route row per destination — the index creation will succeed without errors.

No API changes: /scans/:id/routes endpoint already returns live data from the routes table. The frontend polling already works.


9. Rollback

To revert: remove the flights= kwarg from searcher_v3.py callbacks, restore the bulk-write Phase 2 in scan_processor.py, and remove _write_route_incremental. The UNIQUE index can remain — it only adds a constraint that is naturally satisfied by the bulk-write approach anyway.


10. Testing Plan

Unit Tests (new)

  1. test_write_route_incremental_new — first call creates route row
  2. test_write_route_incremental_merge — second call updates stats correctly
  3. test_write_route_incremental_no_prices — empty-price flights produce no row
  4. test_write_route_incremental_airlines_merge — duplicate airlines deduplicated
  5. test_weighted_average_formula — verify avg formula with known numbers

Integration Tests (extend existing)

  1. Extend test_scan_lifecycle — poll routes every 0.1s during mock scan, verify routes appear before completion
  2. test_incremental_writes_idempotent — simulate same callback called twice for same destination
  3. test_unique_index_exists — verify migration creates index
  4. test_migration_collapses_duplicates — seed duplicate route rows, run migration, verify collapsed

11. Out of Scope

  • WebSocket or SSE for push-based updates (polling already works)
  • Frontend changes (none needed)
  • Real-time price charts
  • Partial scan resume after crash
  • total_flights trigger removal (keep for consistency)