ciaovolo/flight-comparator/PRD_LIVE_ROUTES.md
domverse ce1cf667d2 feat: write routes live during scan instead of bulk-insert at completion
Routes and individual flights are now written to the database as each
query result arrives, rather than after all queries finish. The frontend
already polls /scans/:id/routes while status=running, so routes appear
progressively with no frontend changes needed.

Changes:
- database/schema.sql: UNIQUE INDEX uq_routes_scan_dest(scan_id, destination)
- database/init_db.py: _migrate_add_routes_unique_index() migration
- scan_processor.py: _write_route_incremental() helper; progress_callback
  now writes routes/flights immediately; Phase 2 bulk-write replaced with
  a lightweight totals query
- searcher_v3.py: pass flights= kwarg to progress_callback on cache_hit
  and api_success paths

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-27 20:53:04 +01:00

# PRD: Live Routes During Active Scan
**Status:** Draft
**Date:** 2026-02-27
**Scope:** Backend only — no frontend or API contract changes required
---
## 1. Problem
Routes and flights are only visible in the UI **after a scan fully completes**. For large scans (e.g., 30 destinations × 14 days = 420 queries), users stare at a progress bar with zero actionable data for potentially many minutes.
The root cause is a two-phase waterfall in `scan_processor.py`:
- **Phase 1 (line 170):** `await search_multiple_routes(...)` uses `asyncio.gather()`, which waits for **all** queries to finish before returning anything.
- **Phase 2 (line 197):** Bulk `INSERT INTO routes` and `INSERT INTO flights` only after Phase 1 completes.
---
## 2. Goal
Write each destination's route row and its individual flight rows to the database **as soon as that destination's results arrive**, rather than after all queries finish.
The frontend already polls `/scans/:id/routes` on an interval while status is `running`. No frontend changes are needed — routes will simply appear progressively.
---
## 3. Desired Behavior
| Moment | Before fix | After fix |
|--------|-----------|-----------|
| Query 1 completes (BER → MUC) | Nothing visible | MUC route + flights appear in UI |
| 50% through scan | 0 routes in UI | ~50% of routes visible |
| Scan completes | All routes appear at once | No change (already visible) |
---
## 4. Architecture Analysis
### 4.1 Event Loop Threading
`progress_callback` is called from within `search_direct_flights` (`searcher_v3.py`):
- **Cache hit** (line 121): called directly in the async coroutine → **event loop thread**
- **API success** (line 143): called after `await asyncio.to_thread()` returns → **event loop thread**
- **Error** (line 159): same → **event loop thread**
All callback invocations happen on the **single asyncio event loop thread**. No locking is needed. The existing `progress_callback` in `scan_processor.py` already opens and closes a fresh `get_connection()` per call — we reuse the same pattern.
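This threading property can be checked with a minimal stdlib-only sketch (no project code involved; `blocking_query` is a stand-in for the real API call): code placed after `await asyncio.to_thread(...)` resumes on the event loop thread, not the worker thread, so callbacks never run concurrently.

```python
import asyncio
import threading

def blocking_query(route):
    # Simulates the blocking API call that runs in a worker thread
    return f"results for {route}"

async def search_one(route, loop_thread_id, seen):
    result = await asyncio.to_thread(blocking_query, route)
    # The "callback" fires here, after the await returns: we are back
    # on the event loop thread, not the to_thread worker thread.
    seen.append(threading.get_ident() == loop_thread_id)
    return result

async def main():
    loop_thread_id = threading.get_ident()
    seen = []
    await asyncio.gather(*(search_one(r, loop_thread_id, seen)
                           for r in ["MUC", "FCO", "LIS"]))
    return seen

print(asyncio.run(main()))  # → [True, True, True]
```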
### 4.2 Why Multiple Callbacks Fire Per Destination
For a scan covering 14 dates, a single destination (e.g., MUC) gets 14 separate queries:
- `(BER, MUC, 2026-03-01)`, `(BER, MUC, 2026-03-02)`, ... `(BER, MUC, 2026-03-14)`
Each query completes independently and fires `progress_callback`. The incremental write logic must **merge** successive results for the same destination into a single route row.
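The merge can be sketched in isolation, assuming plain dicts instead of SQLite rows (`merge_route_stats` is a hypothetical helper for illustration, not a function in the codebase — the real merge happens inside the incremental-write helper described in §5.3):

```python
def merge_route_stats(old, new_prices, new_airlines):
    """Fold one query's results into the accumulated route stats.

    `old` is None on the first result for a destination; afterwards it
    is a dict mirroring the routes row being updated.
    """
    count = len(new_prices)
    stats = {
        'flight_count': count,
        'min_price': min(new_prices),
        'max_price': max(new_prices),
        'avg_price': sum(new_prices) / count,
        'airlines': sorted(set(new_airlines)),
    }
    if old is None:
        return stats
    merged_count = old['flight_count'] + count
    return {
        'flight_count': merged_count,
        'min_price': min(old['min_price'], stats['min_price']),
        'max_price': max(old['max_price'], stats['max_price']),
        # Running weighted average: exact, without re-reading flight rows
        'avg_price': (old['avg_price'] * old['flight_count']
                      + stats['avg_price'] * count) / merged_count,
        'airlines': sorted(set(old['airlines']) | set(stats['airlines'])),
    }

r = merge_route_stats(None, [100, 200], ['LH'])          # first query for MUC
r = merge_route_stats(r, [50, 250, 300], ['LH', 'U2'])   # second query for MUC
print(r['min_price'], r['avg_price'])  # → 50 180.0
```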
### 4.3 Required Schema Change
`routes` has no UNIQUE constraint on `(scan_id, destination)`. We need one to support upsert semantics (detect "route already started" vs "first result for this destination").
Current schema (no uniqueness):
```sql
CREATE INDEX IF NOT EXISTS idx_routes_scan_id ON routes(scan_id);
```
Required addition:
```sql
CREATE UNIQUE INDEX IF NOT EXISTS uq_routes_scan_dest ON routes(scan_id, destination);
```
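The index's effect can be verified against an in-memory SQLite database (a trimmed-down `routes` table, for illustration only):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE routes (id INTEGER PRIMARY KEY, scan_id INTEGER, destination TEXT)")
conn.execute("CREATE UNIQUE INDEX uq_routes_scan_dest ON routes(scan_id, destination)")

conn.execute("INSERT INTO routes (scan_id, destination) VALUES (1, 'MUC')")
try:
    # A second row for the same (scan, destination) is rejected...
    conn.execute("INSERT INTO routes (scan_id, destination) VALUES (1, 'MUC')")
except sqlite3.IntegrityError:
    print("duplicate rejected")
# ...while the same destination under a different scan is still allowed
conn.execute("INSERT INTO routes (scan_id, destination) VALUES (2, 'MUC')")
print(conn.execute("SELECT COUNT(*) FROM routes").fetchone()[0])  # → 2
```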
---
## 5. Changes Required
### 5.1 `database/schema.sql`
Add a unique index at the end of the routes indexes block:
```sql
-- Unique constraint: one route row per (scan, destination)
CREATE UNIQUE INDEX IF NOT EXISTS uq_routes_scan_dest
ON routes(scan_id, destination);
```
### 5.2 `database/init_db.py`
Add a migration function to create the index on existing databases where it doesn't yet exist. Called before `executescript(schema_sql)`.
```python
def _migrate_add_routes_unique_index(conn, verbose=True):
    """
    Migration: Add UNIQUE index on routes(scan_id, destination).

    Required for incremental route writes during active scans.
    The index enables upsert (INSERT + UPDATE on conflict) semantics.

    Safe to run on existing data: if any (scan_id, destination) duplicates
    exist, we collapse them first (keeping the lowest-id row).
    """
    cursor = conn.execute("""
        SELECT name FROM sqlite_master
        WHERE type='index' AND name='uq_routes_scan_dest'
    """)
    if cursor.fetchone():
        return  # Already migrated

    if verbose:
        print("  🔄 Migrating routes table: adding unique index on (scan_id, destination)...")

    # Collapse any existing duplicates (completed scans should have none,
    # but guard against edge cases). Keeps the lowest-id row per group.
    conn.execute("""
        DELETE FROM routes
        WHERE id NOT IN (
            SELECT MIN(id)
            FROM routes
            GROUP BY scan_id, destination
        )
    """)

    # Create the unique index
    conn.execute("""
        CREATE UNIQUE INDEX IF NOT EXISTS uq_routes_scan_dest
        ON routes(scan_id, destination)
    """)
    conn.commit()

    if verbose:
        print("  ✅ Migration complete: uq_routes_scan_dest index created")
```
Wire it into `initialize_database()` before `executescript`:
```python
    # Apply migrations before running schema
    _migrate_relax_country_constraint(conn, verbose)
    _migrate_add_routes_unique_index(conn, verbose)  # ← ADD THIS

    # Load and execute schema
    schema_sql = load_schema()
```
### 5.3 `scan_processor.py` — Extend `progress_callback`
#### Current signature (line 138):
```python
def progress_callback(origin: str, destination: str, date: str,
                      status: str, count: int, error: str = None):
```
#### New signature:
```python
def progress_callback(origin: str, destination: str, date: str,
                      status: str, count: int, error: str = None,
                      flights: list = None):
```
#### New helper function (add before `process_scan`):
```python
def _write_route_incremental(scan_id: int, destination: str,
                             dest_name: str, dest_city: str,
                             new_flights: list):
    """
    Write or update a route row and its flight rows incrementally.

    Called from progress_callback each time a query for (scan_id, destination)
    returns results. Merges into the existing route row if one already exists.

    Uses a read-then-write pattern so airlines JSON arrays can be merged in
    Python rather than in SQLite (SQLite has no native JSON array merge).

    Safe to call from the event loop thread — opens its own connection.
    """
    if not new_flights:
        return

    prices = [f.get('price') for f in new_flights if f.get('price')]
    if not prices:
        return  # No priced flights — nothing to aggregate

    new_airlines = list({f.get('airline') for f in new_flights if f.get('airline')})
    new_count = len(new_flights)
    new_min = min(prices)
    new_max = max(prices)
    new_avg = sum(prices) / len(prices)

    try:
        conn = get_connection()
        cursor = conn.cursor()

        # Check if a route row already exists for this destination
        cursor.execute("""
            SELECT id, flight_count, min_price, max_price, avg_price, airlines
            FROM routes
            WHERE scan_id = ? AND destination = ?
        """, (scan_id, destination))
        existing = cursor.fetchone()

        if existing is None:
            # First result for this destination — INSERT
            cursor.execute("""
                INSERT INTO routes (
                    scan_id, destination, destination_name, destination_city,
                    flight_count, airlines, min_price, max_price, avg_price
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                scan_id, destination, dest_name, dest_city,
                new_count, json.dumps(new_airlines),
                new_min, new_max, new_avg,
            ))
        else:
            # Subsequent result — merge stats
            old_count = existing['flight_count'] or 0
            old_min = existing['min_price']
            old_max = existing['max_price']
            old_avg = existing['avg_price'] or 0.0
            old_airlines = json.loads(existing['airlines']) if existing['airlines'] else []

            merged_count = old_count + new_count
            merged_min = min(old_min, new_min) if old_min is not None else new_min
            merged_max = max(old_max, new_max) if old_max is not None else new_max
            # Running weighted average
            merged_avg = (old_avg * old_count + new_avg * new_count) / merged_count
            merged_airlines = json.dumps(list(set(old_airlines) | set(new_airlines)))

            cursor.execute("""
                UPDATE routes
                SET flight_count = ?,
                    min_price = ?,
                    max_price = ?,
                    avg_price = ?,
                    airlines = ?
                WHERE scan_id = ? AND destination = ?
            """, (
                merged_count, merged_min, merged_max, merged_avg, merged_airlines,
                scan_id, destination,
            ))

        # INSERT individual flight rows (always new rows; duplicates not expected
        # because each (scan_id, destination, date) query fires exactly once)
        for flight in new_flights:
            if not flight.get('price'):
                continue
            cursor.execute("""
                INSERT INTO flights (
                    scan_id, destination, date, airline,
                    departure_time, arrival_time, price, stops
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                scan_id,
                destination,
                flight.get('date', ''),  # date passed from caller
                flight.get('airline'),
                flight.get('departure_time'),
                flight.get('arrival_time'),
                flight.get('price'),
                flight.get('stops', 0),
            ))

        conn.commit()
        conn.close()
    except Exception as e:
        logger.error(f"[Scan {scan_id}] Failed to write incremental route {destination}: {e}")
```
**Note on flight date:** Individual flights must carry their `date` when passed to `_write_route_incremental`. The callback receives `date` as a parameter, so attach it to each flight dict before passing: `f['date'] = date`.
#### Updated `progress_callback` inside `process_scan`:
```python
def progress_callback(origin: str, destination: str, date: str,
                      status: str, count: int, error: str = None,
                      flights: list = None):
    nonlocal routes_scanned_count

    if status in ('cache_hit', 'api_success', 'error'):
        routes_scanned_count += 1

    # Write route + flights immediately if we have results
    if flights and status in ('cache_hit', 'api_success'):
        # Annotate each flight with its query date
        for f in flights:
            f['date'] = date

        dest_info = next((d for d in destinations if d['iata'] == destination), None)
        dest_name = dest_info.get('name', destination) if dest_info else destination
        dest_city = dest_info.get('city', '') if dest_info else ''

        _write_route_incremental(scan_id, destination, dest_name, dest_city, flights)

    # Update progress counter
    try:
        progress_conn = get_connection()
        progress_cursor = progress_conn.cursor()
        progress_cursor.execute("""
            UPDATE scans
            SET routes_scanned = routes_scanned + 1,
                updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
        """, (scan_id,))
        progress_conn.commit()
        progress_conn.close()

        if routes_scanned_count % 10 == 0:
            logger.info(f"[Scan {scan_id}] Progress: {routes_scanned_count}/{len(routes_to_scan)} routes ({status}: {origin}→{destination})")
    except Exception as e:
        logger.error(f"[Scan {scan_id}] Failed to update progress: {str(e)}")
```
#### Replace Phase 2 (lines 182–262 in `process_scan`):
Remove the existing bulk-write block. Replace with a lightweight totals-only block:
```python
# Wait for all queries to complete
results = await search_multiple_routes(
    routes=routes_to_scan,
    seat_class=seat_class or 'economy',
    adults=adults or 1,
    use_cache=True,
    cache_threshold_hours=24,
    max_workers=3,
    progress_callback=progress_callback
)

logger.info(f"[Scan {scan_id}] All queries complete. Finalizing scan...")

# Count total flights (routes already written by callback)
total_flights = sum(len(flights) for flights in results.values())
routes_saved = cursor.execute(
    "SELECT COUNT(*) FROM routes WHERE scan_id = ?", (scan_id,)
).fetchone()[0]

logger.info(f"[Scan {scan_id}] ✅ {routes_saved} routes, {total_flights} flights")
```
### 5.4 `searcher_v3.py` — Pass `flights` to callback
In `search_direct_flights`, update both callback calls to pass `flights=`:
**Cache hit (line 121):**
```python
if progress_callback:
    progress_callback(origin, destination, date, "cache_hit", len(cached), flights=cached)
return cached
```
**API success (line 143):**
```python
if progress_callback:
    progress_callback(origin, destination, date, "api_success", len(result), flights=result)
```
The `error` callback (line 159) does not need `flights=` since there are no results.
---
## 6. Concurrency & Safety
| Concern | Analysis | Verdict |
|---------|----------|---------|
| Simultaneous writes to same route row | Cannot happen: the event loop is single-threaded; callbacks are not concurrent. `asyncio.gather` runs tasks concurrently but yields at `await` points, and callbacks fire synchronously after each `await to_thread` returns. | ✅ Safe |
| Two callbacks for same destination arriving "simultaneously" | Impossible in single-threaded event loop. Second callback blocks until first completes (no `await` in callback). | ✅ Safe |
| Duplicate flight rows | Each `(scan_id, destination, date)` query fires exactly once; its flights are written exactly once. | ✅ No duplicates |
| `total_flights` trigger still fires correctly | SQLite triggers on `INSERT INTO routes` and `UPDATE OF flight_count ON routes` fire for each incremental write — counts stay accurate. | ✅ Works |
| Scan completion `total_flights` update | Still set explicitly at completion from `results` dict count — redundant but harmless. | ✅ OK |
---
## 7. Edge Cases
| Case | Handling |
|------|----------|
| Destination returns 0 flights | `_write_route_incremental` returns early — no row created. Route only appears if at least one priced flight found. |
| Scan is deleted mid-run | `DELETE CASCADE` on `scans` removes routes/flights automatically. Progress callback write will fail with FK error, caught by `except` block and logged. |
| Scan fails mid-run | Routes written so far remain in DB. Status set to `failed`. UI will show partial results with `failed` badge — acceptable. |
| DB write error in callback | Logged, does not crash the scan. Query continues, flight data lost for that callback. |
| Existing scans (pre-feature) | No impact. Migration adds index but doesn't change old data (all complete scans already have 1 row per destination). |
---
## 8. Migration Plan
1. **`database/schema.sql`**: Add `CREATE UNIQUE INDEX IF NOT EXISTS uq_routes_scan_dest`.
2. **`database/init_db.py`**: Add `_migrate_add_routes_unique_index()` + call it in `initialize_database()`.
3. **`scan_processor.py`**: Add `_write_route_incremental()` helper; update `progress_callback` closure; remove bulk-write Phase 2.
4. **`searcher_v3.py`**: Pass `flights=` kwarg to both successful callback invocations.
**Migration is backward-safe:** The UNIQUE index is added with `IF NOT EXISTS`. Existing `completed` scans already have at most 1 route row per destination — the index creation will succeed without errors.
**No API changes:** `/scans/:id/routes` endpoint already returns live data from the `routes` table. The frontend polling already works.
---
## 9. Rollback
To revert: remove the `flights=` kwarg from `searcher_v3.py` callbacks, restore the bulk-write Phase 2 in `scan_processor.py`, and remove `_write_route_incremental`. The UNIQUE index can remain — it only adds a constraint that is naturally satisfied by the bulk-write approach anyway.
---
## 10. Testing Plan
### Unit Tests (new)
1. `test_write_route_incremental_new` — first call creates route row
2. `test_write_route_incremental_merge` — second call updates stats correctly
3. `test_write_route_incremental_no_prices` — empty-price flights produce no row
4. `test_write_route_incremental_airlines_merge` — duplicate airlines deduplicated
5. `test_weighted_average_formula` — verify avg formula with known numbers
### Integration Tests (extend existing)
6. Extend `test_scan_lifecycle` — poll routes every 0.1s during mock scan, verify routes appear before completion
7. `test_incremental_writes_idempotent` — simulate same callback called twice for same destination
8. `test_unique_index_exists` — verify migration creates index
9. `test_migration_collapses_duplicates` — seed duplicate route rows, run migration, verify collapsed
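Test 9 can be prototyped without the project harness: the sketch below reproduces the migration's duplicate-collapse DELETE against an in-memory table (schema trimmed to the columns the query touches).

```python
import sqlite3

def collapse_duplicates(conn):
    # The same DELETE the migration runs before creating the unique index:
    # keep the lowest-id row per (scan_id, destination) group
    conn.execute("""
        DELETE FROM routes
        WHERE id NOT IN (
            SELECT MIN(id) FROM routes GROUP BY scan_id, destination
        )
    """)

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE routes (id INTEGER PRIMARY KEY, scan_id INTEGER, destination TEXT)")
conn.executemany("INSERT INTO routes (scan_id, destination) VALUES (?, ?)",
                 [(1, 'MUC'), (1, 'MUC'), (1, 'FCO'), (2, 'MUC')])
collapse_duplicates(conn)
rows = conn.execute("SELECT scan_id, destination FROM routes ORDER BY id").fetchall()
print(rows)  # → [(1, 'MUC'), (1, 'FCO'), (2, 'MUC')]
```

After the collapse, `CREATE UNIQUE INDEX` succeeds because every (scan_id, destination) pair is unique.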
---
## 11. Out of Scope
- WebSocket or SSE for push-based updates (polling already works)
- Frontend changes (none needed)
- Real-time price charts
- Partial scan resume after crash
- `total_flights` trigger removal (keep for consistency)