feat: write routes live during scan instead of bulk-insert at completion

Routes and individual flights are now written to the database as each
query result arrives, rather than after all queries finish. The frontend
already polls /scans/:id/routes while status=running, so routes appear
progressively with no frontend changes needed.

Changes:
- database/schema.sql: UNIQUE INDEX uq_routes_scan_dest(scan_id, destination)
- database/init_db.py: _migrate_add_routes_unique_index() migration
- scan_processor.py: _write_route_incremental() helper; progress_callback
  now writes routes/flights immediately; Phase 2 bulk-write replaced with
  a lightweight totals query
- searcher_v3.py: pass flights= kwarg to progress_callback on cache_hit
  and api_success paths

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-27 20:53:04 +01:00
parent 4926e89e46
commit ce1cf667d2
5 changed files with 592 additions and 93 deletions

View File

@@ -0,0 +1,427 @@
# PRD: Live Routes During Active Scan
**Status:** Draft
**Date:** 2026-02-27
**Scope:** Backend only — no frontend or API contract changes required
---
## 1. Problem
Routes and flights are only visible in the UI **after a scan fully completes**. For large scans (e.g., 30 destinations × 14 days = 420 queries), users stare at a progress bar with zero actionable data for potentially many minutes.
The root cause is a two-phase waterfall in `scan_processor.py`:
- **Phase 1 (line 170):** `await search_multiple_routes(...)``asyncio.gather()` waits for **all** queries to finish before returning anything.
- **Phase 2 (line 197):** Bulk `INSERT INTO routes` and `INSERT INTO flights` only after Phase 1 completes.
---
## 2. Goal
Write each destination's route row and its individual flight rows to the database **as soon as that destination's results arrive**, rather than after all queries finish.
The frontend already polls `/scans/:id/routes` on an interval while status is `running`. No frontend changes are needed — routes will simply appear progressively.
---
## 3. Desired Behavior
| Moment | Before fix | After fix |
|--------|-----------|-----------|
| Query 1 completes (BER → MUC) | Nothing visible | MUC route + flights appear in UI |
| 50% through scan | 0 routes in UI | ~50% of routes visible |
| Scan completes | All routes appear at once | No change (already visible) |
---
## 4. Architecture Analysis
### 4.1 Event Loop Threading
`progress_callback` is called from within `search_direct_flights` (`searcher_v3.py`):
- **Cache hit** (line 121): called directly in the async coroutine → **event loop thread**
- **API success** (line 143): called after `await asyncio.to_thread()` returns → **event loop thread**
- **Error** (line 159): same → **event loop thread**
All callback invocations happen on the **single asyncio event loop thread**. No locking is needed. The existing `progress_callback` in `scan_processor.py` already opens and closes a fresh `get_connection()` per call — we reuse the same pattern.
### 4.2 Why Multiple Callbacks Fire Per Destination
For a scan covering 14 dates, a single destination (e.g., MUC) gets 14 separate queries:
- `(BER, MUC, 2026-03-01)`, `(BER, MUC, 2026-03-02)`, ... `(BER, MUC, 2026-03-14)`
Each query completes independently and fires `progress_callback`. The incremental write logic must **merge** successive results for the same destination into a single route row.
### 4.3 Required Schema Change
`routes` has no UNIQUE constraint on `(scan_id, destination)`. We need one to support upsert semantics (detect "route already started" vs "first result for this destination").
Current schema (no uniqueness):
```sql
CREATE INDEX IF NOT EXISTS idx_routes_scan_id ON routes(scan_id);
```
Required addition:
```sql
CREATE UNIQUE INDEX IF NOT EXISTS uq_routes_scan_dest ON routes(scan_id, destination);
```
---
## 5. Changes Required
### 5.1 `database/schema.sql`
Add a unique index at the end of the routes indexes block:
```sql
-- Unique constraint: one route row per (scan, destination)
CREATE UNIQUE INDEX IF NOT EXISTS uq_routes_scan_dest
ON routes(scan_id, destination);
```
### 5.2 `database/init_db.py`
Add a migration function to create the index on existing databases where it doesn't yet exist. Called before `executescript(schema_sql)`.
```python
def _migrate_add_routes_unique_index(conn, verbose=True):
"""
Migration: Add UNIQUE index on routes(scan_id, destination).
Required for incremental route writes during active scans.
The index enables upsert (INSERT + UPDATE on conflict) semantics.
Safe to run on existing data: if any (scan_id, destination) duplicates
exist, we collapse them first (keep the row with more flights).
"""
cursor = conn.execute("""
SELECT name FROM sqlite_master
WHERE type='index' AND name='uq_routes_scan_dest'
""")
if cursor.fetchone():
return # Already migrated
if verbose:
print(" 🔄 Migrating routes table: adding unique index on (scan_id, destination)...")
# Collapse any existing duplicates (completed scans may have none,
# but guard against edge cases).
conn.execute("""
DELETE FROM routes
WHERE id NOT IN (
SELECT MIN(id)
FROM routes
GROUP BY scan_id, destination
)
""")
# Create the unique index
conn.execute("""
CREATE UNIQUE INDEX IF NOT EXISTS uq_routes_scan_dest
ON routes(scan_id, destination)
""")
conn.commit()
if verbose:
print(" ✅ Migration complete: uq_routes_scan_dest index created")
```
Wire it into `initialize_database()` before `executescript`:
```python
# Apply migrations before running schema
_migrate_relax_country_constraint(conn, verbose)
_migrate_add_routes_unique_index(conn, verbose) # ← ADD THIS
# Load and execute schema
schema_sql = load_schema()
```
### 5.3 `scan_processor.py` — Extend `progress_callback`
#### Current signature (line 138):
```python
def progress_callback(origin: str, destination: str, date: str,
status: str, count: int, error: str = None):
```
#### New signature:
```python
def progress_callback(origin: str, destination: str, date: str,
status: str, count: int, error: str = None,
flights: list = None):
```
#### New helper function (add before `process_scan`):
```python
def _write_route_incremental(scan_id: int, destination: str,
dest_name: str, dest_city: str,
new_flights: list):
"""
Write or update a route row and its flight rows incrementally.
Called from progress_callback each time a query for (scan_id, destination)
returns results. Merges into the existing route row if one already exists.
Uses a read-then-write pattern so airlines JSON arrays can be merged in
Python rather than in SQLite (SQLite has no native JSON array merge).
Safe to call from the event loop thread — opens its own connection.
"""
if not new_flights:
return
prices = [f.get('price') for f in new_flights if f.get('price')]
if not prices:
return # No priced flights — nothing to aggregate
new_airlines = list({f.get('airline') for f in new_flights if f.get('airline')})
new_count = len(new_flights)
new_min = min(prices)
new_max = max(prices)
new_avg = sum(prices) / len(prices)
try:
conn = get_connection()
cursor = conn.cursor()
# Check if a route row already exists for this destination
cursor.execute("""
SELECT id, flight_count, min_price, max_price, avg_price, airlines
FROM routes
WHERE scan_id = ? AND destination = ?
""", (scan_id, destination))
existing = cursor.fetchone()
if existing is None:
# First result for this destination — INSERT
cursor.execute("""
INSERT INTO routes (
scan_id, destination, destination_name, destination_city,
flight_count, airlines, min_price, max_price, avg_price
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
scan_id, destination, dest_name, dest_city,
new_count, json.dumps(new_airlines),
new_min, new_max, new_avg,
))
else:
# Subsequent result — merge stats
old_count = existing['flight_count'] or 0
old_min = existing['min_price']
old_max = existing['max_price']
old_avg = existing['avg_price'] or 0.0
old_airlines = json.loads(existing['airlines']) if existing['airlines'] else []
merged_count = old_count + new_count
merged_min = min(old_min, new_min) if old_min is not None else new_min
merged_max = max(old_max, new_max) if old_max is not None else new_max
# Running weighted average
merged_avg = (old_avg * old_count + new_avg * new_count) / merged_count
merged_airlines = json.dumps(list(set(old_airlines) | set(new_airlines)))
cursor.execute("""
UPDATE routes
SET flight_count = ?,
min_price = ?,
max_price = ?,
avg_price = ?,
airlines = ?
WHERE scan_id = ? AND destination = ?
""", (
merged_count, merged_min, merged_max, merged_avg, merged_airlines,
scan_id, destination,
))
# INSERT individual flight rows (always new rows; duplicates not expected
# because each (scan_id, destination, date) query fires exactly once)
for flight in new_flights:
if not flight.get('price'):
continue
cursor.execute("""
INSERT INTO flights (
scan_id, destination, date, airline,
departure_time, arrival_time, price, stops
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
scan_id,
destination,
flight.get('date', ''), # date passed from caller
flight.get('airline'),
flight.get('departure_time'),
flight.get('arrival_time'),
flight.get('price'),
flight.get('stops', 0),
))
conn.commit()
conn.close()
except Exception as e:
logger.error(f"[Scan {scan_id}] Failed to write incremental route {destination}: {e}")
```
**Note on flight date:** Individual flights must carry their `date` when passed to `_write_route_incremental`. The callback receives `date` as a parameter, so attach it to each flight dict before passing: `f['date'] = date`.
#### Updated `progress_callback` inside `process_scan`:
```python
def progress_callback(origin: str, destination: str, date: str,
status: str, count: int, error: str = None,
flights: list = None):
nonlocal routes_scanned_count
if status in ('cache_hit', 'api_success', 'error'):
routes_scanned_count += 1
# Write route + flights immediately if we have results
if flights and status in ('cache_hit', 'api_success'):
# Annotate each flight with its query date
for f in flights:
f['date'] = date
dest_info = next((d for d in destinations if d['iata'] == destination), None)
dest_name = dest_info.get('name', destination) if dest_info else destination
dest_city = dest_info.get('city', '') if dest_info else ''
_write_route_incremental(scan_id, destination, dest_name, dest_city, flights)
# Update progress counter
try:
progress_conn = get_connection()
progress_cursor = progress_conn.cursor()
progress_cursor.execute("""
UPDATE scans
SET routes_scanned = routes_scanned + 1,
updated_at = CURRENT_TIMESTAMP
WHERE id = ?
""", (scan_id,))
progress_conn.commit()
progress_conn.close()
if routes_scanned_count % 10 == 0:
logger.info(f"[Scan {scan_id}] Progress: {routes_scanned_count}/{len(routes_to_scan)} routes ({status}: {origin}{destination})")
except Exception as e:
logger.error(f"[Scan {scan_id}] Failed to update progress: {str(e)}")
```
#### Replace Phase 2 (lines 182262 in `process_scan`):
Remove the existing bulk-write block. Replace with a lightweight totals-only block:
```python
# Wait for all queries to complete
results = await search_multiple_routes(
routes=routes_to_scan,
seat_class=seat_class or 'economy',
adults=adults or 1,
use_cache=True,
cache_threshold_hours=24,
max_workers=3,
progress_callback=progress_callback
)
logger.info(f"[Scan {scan_id}] All queries complete. Finalizing scan...")
# Count total flights (routes already written by callback)
total_flights = sum(len(flights) for flights in results.values())
routes_saved = cursor.execute(
"SELECT COUNT(*) FROM routes WHERE scan_id = ?", (scan_id,)
).fetchone()[0]
logger.info(f"[Scan {scan_id}] ✅ {routes_saved} routes, {total_flights} flights")
```
### 5.4 `searcher_v3.py` — Pass `flights` to callback
In `search_direct_flights`, update both callback calls to pass `flights=`:
**Cache hit (line 121):**
```python
if progress_callback:
progress_callback(origin, destination, date, "cache_hit", len(cached), flights=cached)
return cached
```
**API success (line 143):**
```python
if progress_callback:
progress_callback(origin, destination, date, "api_success", len(result), flights=result)
```
The `error` callback (line 159) does not need `flights=` since there are no results.
---
## 6. Concurrency & Safety
| Concern | Analysis | Verdict |
|---------|----------|---------|
| Simultaneous writes to same route row | Cannot happen: the event loop is single-threaded; callbacks are not concurrent. `asyncio.gather` runs tasks concurrently but yields at `await` points, and callbacks fire synchronously after each `await to_thread` returns. | ✅ Safe |
| Two callbacks for same destination arriving "simultaneously" | Impossible in single-threaded event loop. Second callback blocks until first completes (no `await` in callback). | ✅ Safe |
| Duplicate flight rows | Each `(scan_id, destination, date)` query fires exactly once; its flights are written exactly once. | ✅ No duplicates |
| `total_flights` trigger still fires correctly | SQLite triggers on `INSERT INTO routes` and `UPDATE OF flight_count ON routes` fire for each incremental write — counts stay accurate. | ✅ Works |
| Scan completion `total_flights` update | Still set explicitly at completion from `results` dict count — redundant but harmless. | ✅ OK |
---
## 7. Edge Cases
| Case | Handling |
|------|----------|
| Destination returns 0 flights | `_write_route_incremental` returns early — no row created. Route only appears if at least one priced flight found. |
| Scan is deleted mid-run | `DELETE CASCADE` on `scans` removes routes/flights automatically. Progress callback write will fail with FK error, caught by `except` block and logged. |
| Scan fails mid-run | Routes written so far remain in DB. Status set to `failed`. UI will show partial results with `failed` badge — acceptable. |
| DB write error in callback | Logged, does not crash the scan. Query continues, flight data lost for that callback. |
| Existing scans (pre-feature) | No impact. Migration adds index but doesn't change old data (all complete scans already have 1 row per destination). |
---
## 8. Migration Plan
1. **`database/schema.sql`**: Add `CREATE UNIQUE INDEX IF NOT EXISTS uq_routes_scan_dest`.
2. **`database/init_db.py`**: Add `_migrate_add_routes_unique_index()` + call it in `initialize_database()`.
3. **`scan_processor.py`**: Add `_write_route_incremental()` helper; update `progress_callback` closure; remove bulk-write Phase 2.
4. **`searcher_v3.py`**: Pass `flights=` kwarg to both successful callback invocations.
**Migration is backward-safe:** The UNIQUE index is added with `IF NOT EXISTS`. Existing `completed` scans already have at most 1 route row per destination — the index creation will succeed without errors.
**No API changes:** `/scans/:id/routes` endpoint already returns live data from the `routes` table. The frontend polling already works.
---
## 9. Rollback
To revert: remove the `flights=` kwarg from `searcher_v3.py` callbacks, restore the bulk-write Phase 2 in `scan_processor.py`, and remove `_write_route_incremental`. The UNIQUE index can remain — it only adds a constraint that is naturally satisfied by the bulk-write approach anyway.
---
## 10. Testing Plan
### Unit Tests (new)
1. `test_write_route_incremental_new` — first call creates route row
2. `test_write_route_incremental_merge` — second call updates stats correctly
3. `test_write_route_incremental_no_prices` — empty-price flights produce no row
4. `test_write_route_incremental_airlines_merge` — duplicate airlines deduplicated
5. `test_weighted_average_formula` — verify avg formula with known numbers
### Integration Tests (extend existing)
6. Extend `test_scan_lifecycle` — poll routes every 0.1s during mock scan, verify routes appear before completion
7. `test_incremental_writes_idempotent` — simulate same callback called twice for same destination
8. `test_unique_index_exists` — verify migration creates index
9. `test_migration_collapses_duplicates` — seed duplicate route rows, run migration, verify collapsed
---
## 11. Out of Scope
- WebSocket or SSE for push-based updates (polling already works)
- Frontend changes (none needed)
- Real-time price charts
- Partial scan resume after crash
- `total_flights` trigger removal (keep for consistency)