# PRD: Live Routes During Active Scan

**Status:** Draft
**Date:** 2026-02-27
**Scope:** Backend only — no frontend or API contract changes required

---

## 1. Problem

Routes and flights are only visible in the UI **after a scan fully completes**. For large scans (e.g., 30 destinations × 14 days = 420 queries), users stare at a progress bar with zero actionable data for potentially many minutes.

The root cause is a two-phase waterfall in `scan_processor.py`:

- **Phase 1 (line 170):** `await search_multiple_routes(...)` — `asyncio.gather()` waits for **all** queries to finish before returning anything.
- **Phase 2 (line 197):** Bulk `INSERT INTO routes` and `INSERT INTO flights` only after Phase 1 completes.

---

## 2. Goal

Write each destination's route row and its individual flight rows to the database **as soon as that destination's results arrive**, rather than after all queries finish.

The frontend already polls `/scans/:id/routes` on an interval while status is `running`. No frontend changes are needed — routes will simply appear progressively.

---

## 3. Desired Behavior

| Moment | Before fix | After fix |
|--------|-----------|-----------|
| Query 1 completes (BER → MUC) | Nothing visible | MUC route + flights appear in UI |
| 50% through scan | 0 routes in UI | ~50% of routes visible |
| Scan completes | All routes appear at once | No change (already visible) |

---

## 4. Architecture Analysis

### 4.1 Event Loop Threading

`progress_callback` is called from within `search_direct_flights` (`searcher_v3.py`):

- **Cache hit** (line 121): called directly in the async coroutine → **event loop thread**
- **API success** (line 143): called after `await asyncio.to_thread()` returns → **event loop thread**
- **Error** (line 159): same → **event loop thread**

All callback invocations happen on the **single asyncio event loop thread**. No locking is needed. The existing `progress_callback` in `scan_processor.py` already opens and closes a fresh `get_connection()` per call — we reuse the same pattern.
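
The single-thread claim can be verified with a self-contained sketch (the names below are illustrative, not the project's code):

```python
import asyncio
import threading

def demo():
    """Show that a callback fired after `await asyncio.to_thread(...)`
    runs on the event loop thread, not on the worker thread."""
    loop_thread_id = None
    callback_thread_ids = []

    def blocking_query(i):
        # Runs in a worker thread (simulates the blocking API call)
        return i * 2

    def progress_callback(result):
        # Fired from the coroutine after the await returns
        callback_thread_ids.append(threading.get_ident())

    async def search(i):
        result = await asyncio.to_thread(blocking_query, i)
        progress_callback(result)

    async def main():
        nonlocal loop_thread_id
        loop_thread_id = threading.get_ident()
        await asyncio.gather(*(search(i) for i in range(5)))

    asyncio.run(main())
    return loop_thread_id, callback_thread_ids
```

Every recorded callback thread id equals the loop's thread id: only `blocking_query` runs on a worker thread, and `asyncio.to_thread` resumes the awaiting coroutine back on the event loop.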

### 4.2 Why Multiple Callbacks Fire Per Destination

For a scan covering 14 dates, a single destination (e.g., MUC) gets 14 separate queries:
- `(BER, MUC, 2026-03-01)`, `(BER, MUC, 2026-03-02)`, ... `(BER, MUC, 2026-03-14)`

Each query completes independently and fires `progress_callback`. The incremental write logic must **merge** successive results for the same destination into a single route row.
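
The merge the helper in §5.3 performs reduces to a small pure function; this sketch (a hypothetical `merge_route_stats`, not part of the codebase) isolates the arithmetic:

```python
def merge_route_stats(old, new):
    """Merge one query's flight stats into a running route aggregate.

    `old` and `new` are dicts with keys: count, min, max, avg.
    Returns the merged aggregate using a running weighted average.
    """
    count = old["count"] + new["count"]
    return {
        "count": count,
        "min": min(old["min"], new["min"]),
        "max": max(old["max"], new["max"]),
        # Weight each average by how many flights contributed to it
        "avg": (old["avg"] * old["count"] + new["avg"] * new["count"]) / count,
    }
```

Because the weights are the contributing counts, the result is identical to averaging all prices seen so far in one pass, regardless of how the queries were batched.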

### 4.3 Required Schema Change

`routes` has no UNIQUE constraint on `(scan_id, destination)`. We need one to support upsert semantics (detect "route already started" vs "first result for this destination").

Current schema (no uniqueness):
```sql
CREATE INDEX IF NOT EXISTS idx_routes_scan_id ON routes(scan_id);
```

Required addition:
```sql
CREATE UNIQUE INDEX IF NOT EXISTS uq_routes_scan_dest ON routes(scan_id, destination);
```
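
As an aside: the plan in §5.3 uses read-then-write so the airlines JSON can be merged in Python, but the same index would also enable a pure-SQL upsert via `ON CONFLICT` (SQLite ≥ 3.24). A minimal in-memory sketch with a simplified, hypothetical table shape:

```python
import sqlite3

# Simplified table; column names mirror the PRD, not the real schema.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE routes (
        id INTEGER PRIMARY KEY,
        scan_id INTEGER,
        destination TEXT,
        flight_count INTEGER,
        min_price REAL
    );
    CREATE UNIQUE INDEX uq_routes_scan_dest ON routes(scan_id, destination);
""")

def upsert_route(scan_id, destination, count, min_price):
    # ON CONFLICT targets the unique index added by the migration
    conn.execute("""
        INSERT INTO routes (scan_id, destination, flight_count, min_price)
        VALUES (?, ?, ?, ?)
        ON CONFLICT(scan_id, destination) DO UPDATE SET
            flight_count = flight_count + excluded.flight_count,
            min_price = MIN(min_price, excluded.min_price)
    """, (scan_id, destination, count, min_price))

upsert_route(1, "MUC", 3, 120.0)  # first result: INSERT path
upsert_route(1, "MUC", 2, 95.0)   # second result: DO UPDATE path
row = conn.execute(
    "SELECT flight_count, min_price FROM routes "
    "WHERE scan_id = 1 AND destination = 'MUC'"
).fetchone()
# row == (5, 95.0)
```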

---

## 5. Changes Required

### 5.1 `database/schema.sql`

Add a unique index at the end of the routes indexes block:

```sql
-- Unique constraint: one route row per (scan, destination)
CREATE UNIQUE INDEX IF NOT EXISTS uq_routes_scan_dest
ON routes(scan_id, destination);
```

### 5.2 `database/init_db.py`

Add a migration function to create the index on existing databases where it doesn't yet exist. It is called before `executescript(schema_sql)`.

```python
def _migrate_add_routes_unique_index(conn, verbose=True):
    """
    Migration: Add UNIQUE index on routes(scan_id, destination).

    Required for incremental route writes during active scans.
    The index enables upsert (INSERT + UPDATE on conflict) semantics.

    Safe to run on existing data: if any (scan_id, destination) duplicates
    exist, we collapse them first (keeping the lowest-id row per pair).
    """
    cursor = conn.execute("""
        SELECT name FROM sqlite_master
        WHERE type='index' AND name='uq_routes_scan_dest'
    """)
    if cursor.fetchone():
        return  # Already migrated

    if verbose:
        print(" 🔄 Migrating routes table: adding unique index on (scan_id, destination)...")

    # Collapse any existing duplicates (completed scans may have none,
    # but guard against edge cases).
    conn.execute("""
        DELETE FROM routes
        WHERE id NOT IN (
            SELECT MIN(id)
            FROM routes
            GROUP BY scan_id, destination
        )
    """)

    # Create the unique index
    conn.execute("""
        CREATE UNIQUE INDEX IF NOT EXISTS uq_routes_scan_dest
        ON routes(scan_id, destination)
    """)
    conn.commit()

    if verbose:
        print(" ✅ Migration complete: uq_routes_scan_dest index created")
```

Wire it into `initialize_database()` before `executescript`:

```python
# Apply migrations before running schema
_migrate_relax_country_constraint(conn, verbose)
_migrate_add_routes_unique_index(conn, verbose)  # ← ADD THIS

# Load and execute schema
schema_sql = load_schema()
```

### 5.3 `scan_processor.py` — Extend `progress_callback`

#### Current signature (line 138):
```python
def progress_callback(origin: str, destination: str, date: str,
                      status: str, count: int, error: str = None):
```

#### New signature:
```python
def progress_callback(origin: str, destination: str, date: str,
                      status: str, count: int, error: str = None,
                      flights: list = None):
```

#### New helper function (add before `process_scan`):

```python
def _write_route_incremental(scan_id: int, destination: str,
                             dest_name: str, dest_city: str,
                             new_flights: list):
    """
    Write or update a route row and its flight rows incrementally.

    Called from progress_callback each time a query for (scan_id, destination)
    returns results. Merges into the existing route row if one already exists.

    Uses a read-then-write pattern so airlines JSON arrays can be merged in
    Python rather than in SQLite (SQLite has no native JSON array merge).

    Safe to call from the event loop thread — opens its own connection.
    """
    if not new_flights:
        return

    prices = [f.get('price') for f in new_flights if f.get('price')]
    if not prices:
        return  # No priced flights — nothing to aggregate

    new_airlines = list({f.get('airline') for f in new_flights if f.get('airline')})
    new_count = len(new_flights)
    new_min = min(prices)
    new_max = max(prices)
    new_avg = sum(prices) / len(prices)

    conn = None
    try:
        conn = get_connection()
        cursor = conn.cursor()

        # Check if a route row already exists for this destination
        cursor.execute("""
            SELECT id, flight_count, min_price, max_price, avg_price, airlines
            FROM routes
            WHERE scan_id = ? AND destination = ?
        """, (scan_id, destination))
        existing = cursor.fetchone()

        if existing is None:
            # First result for this destination — INSERT
            cursor.execute("""
                INSERT INTO routes (
                    scan_id, destination, destination_name, destination_city,
                    flight_count, airlines, min_price, max_price, avg_price
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                scan_id, destination, dest_name, dest_city,
                new_count, json.dumps(new_airlines),
                new_min, new_max, new_avg,
            ))
        else:
            # Subsequent result — merge stats
            old_count = existing['flight_count'] or 0
            old_min = existing['min_price']
            old_max = existing['max_price']
            old_avg = existing['avg_price'] or 0.0
            old_airlines = json.loads(existing['airlines']) if existing['airlines'] else []

            merged_count = old_count + new_count
            merged_min = min(old_min, new_min) if old_min is not None else new_min
            merged_max = max(old_max, new_max) if old_max is not None else new_max
            # Running weighted average
            merged_avg = (old_avg * old_count + new_avg * new_count) / merged_count
            merged_airlines = json.dumps(list(set(old_airlines) | set(new_airlines)))

            cursor.execute("""
                UPDATE routes
                SET flight_count = ?,
                    min_price = ?,
                    max_price = ?,
                    avg_price = ?,
                    airlines = ?
                WHERE scan_id = ? AND destination = ?
            """, (
                merged_count, merged_min, merged_max, merged_avg, merged_airlines,
                scan_id, destination,
            ))

        # INSERT individual flight rows (always new rows; duplicates not expected
        # because each (scan_id, destination, date) query fires exactly once)
        for flight in new_flights:
            if not flight.get('price'):
                continue
            cursor.execute("""
                INSERT INTO flights (
                    scan_id, destination, date, airline,
                    departure_time, arrival_time, price, stops
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                scan_id,
                destination,
                flight.get('date', ''),  # date attached by the caller
                flight.get('airline'),
                flight.get('departure_time'),
                flight.get('arrival_time'),
                flight.get('price'),
                flight.get('stops', 0),
            ))

        conn.commit()
    except Exception as e:
        logger.error(f"[Scan {scan_id}] Failed to write incremental route {destination}: {e}")
    finally:
        # Always release the connection, even if the write failed
        if conn is not None:
            conn.close()
```

**Note on flight date:** Individual flights must carry their `date` when passed to `_write_route_incremental`. The callback receives `date` as a parameter, so attach it to each flight dict before passing: `f['date'] = date`.

#### Updated `progress_callback` inside `process_scan`:

```python
def progress_callback(origin: str, destination: str, date: str,
                      status: str, count: int, error: str = None,
                      flights: list = None):
    nonlocal routes_scanned_count

    if status in ('cache_hit', 'api_success', 'error'):
        routes_scanned_count += 1

    # Write route + flights immediately if we have results
    if flights and status in ('cache_hit', 'api_success'):
        # Annotate each flight with its query date
        for f in flights:
            f['date'] = date
        dest_info = next((d for d in destinations if d['iata'] == destination), None)
        dest_name = dest_info.get('name', destination) if dest_info else destination
        dest_city = dest_info.get('city', '') if dest_info else ''
        _write_route_incremental(scan_id, destination, dest_name, dest_city, flights)

    # Update progress counter
    try:
        progress_conn = get_connection()
        progress_cursor = progress_conn.cursor()
        progress_cursor.execute("""
            UPDATE scans
            SET routes_scanned = routes_scanned + 1,
                updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
        """, (scan_id,))
        progress_conn.commit()
        progress_conn.close()

        if routes_scanned_count % 10 == 0:
            logger.info(f"[Scan {scan_id}] Progress: {routes_scanned_count}/{len(routes_to_scan)} routes ({status}: {origin}→{destination})")

    except Exception as e:
        logger.error(f"[Scan {scan_id}] Failed to update progress: {str(e)}")
```

#### Replace Phase 2 (lines 182–262 in `process_scan`):

Remove the existing bulk-write block and replace it with a lightweight totals-only block:

```python
# Wait for all queries to complete
results = await search_multiple_routes(
    routes=routes_to_scan,
    seat_class=seat_class or 'economy',
    adults=adults or 1,
    use_cache=True,
    cache_threshold_hours=24,
    max_workers=3,
    progress_callback=progress_callback
)

logger.info(f"[Scan {scan_id}] All queries complete. Finalizing scan...")

# Count total flights (routes already written by callback)
total_flights = sum(len(flights) for flights in results.values())
routes_saved = cursor.execute(
    "SELECT COUNT(*) FROM routes WHERE scan_id = ?", (scan_id,)
).fetchone()[0]

logger.info(f"[Scan {scan_id}] ✅ {routes_saved} routes, {total_flights} flights")
```

### 5.4 `searcher_v3.py` — Pass `flights` to callback

In `search_direct_flights`, update both callback calls to pass `flights=`:

**Cache hit (line 121):**
```python
if progress_callback:
    progress_callback(origin, destination, date, "cache_hit", len(cached), flights=cached)
return cached
```

**API success (line 143):**
```python
if progress_callback:
    progress_callback(origin, destination, date, "api_success", len(result), flights=result)
```

The `error` callback (line 159) does not need `flights=`, since there are no results to pass.

---

## 6. Concurrency & Safety

| Concern | Analysis | Verdict |
|---------|----------|---------|
| Simultaneous writes to same route row | Cannot happen: the event loop is single-threaded and callbacks are not concurrent. `asyncio.gather` runs tasks concurrently but only yields at `await` points, and callbacks fire synchronously after each `await to_thread` returns. | ✅ Safe |
| Two callbacks for same destination arriving "simultaneously" | Impossible in a single-threaded event loop. The second callback cannot start until the first returns (there is no `await` inside the callback). | ✅ Safe |
| Duplicate flight rows | Each `(scan_id, destination, date)` query fires exactly once; its flights are written exactly once. | ✅ No duplicates |
| `total_flights` trigger still fires correctly | SQLite triggers on `INSERT INTO routes` and `UPDATE OF flight_count ON routes` fire for each incremental write — counts stay accurate. | ✅ Works |
| Scan completion `total_flights` update | Still set explicitly at completion from the `results` dict count — redundant but harmless. | ✅ OK |
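
The trigger claim can be checked in isolation. The PRD references the existing `total_flights` triggers without showing them, so the sketch below uses stand-in trigger definitions (not the project's) to demonstrate that both the INSERT path and the `UPDATE OF flight_count` path keep the counter accurate:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE scans (id INTEGER PRIMARY KEY, total_flights INTEGER DEFAULT 0);
    CREATE TABLE routes (
        id INTEGER PRIMARY KEY, scan_id INTEGER, destination TEXT,
        flight_count INTEGER DEFAULT 0
    );
    -- Stand-in triggers that keep scans.total_flights in sync
    CREATE TRIGGER routes_count_ins AFTER INSERT ON routes BEGIN
        UPDATE scans SET total_flights = total_flights + NEW.flight_count
        WHERE id = NEW.scan_id;
    END;
    CREATE TRIGGER routes_count_upd AFTER UPDATE OF flight_count ON routes BEGIN
        UPDATE scans SET total_flights = total_flights - OLD.flight_count + NEW.flight_count
        WHERE id = NEW.scan_id;
    END;
    INSERT INTO scans (id) VALUES (1);
""")

# First result for MUC: INSERT fires the insert trigger (total becomes 3)
conn.execute("INSERT INTO routes (scan_id, destination, flight_count) VALUES (1, 'MUC', 3)")
# Second result for MUC: UPDATE fires the update trigger (total becomes 5)
conn.execute("UPDATE routes SET flight_count = 5 WHERE scan_id = 1 AND destination = 'MUC'")

total = conn.execute("SELECT total_flights FROM scans WHERE id = 1").fetchone()[0]
# total == 5
```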

---

## 7. Edge Cases

| Case | Handling |
|------|----------|
| Destination returns 0 flights | `_write_route_incremental` returns early — no row is created. A route only appears once at least one priced flight is found. |
| Scan is deleted mid-run | `DELETE CASCADE` on `scans` removes routes/flights automatically. The callback's write then fails with a FK error, which is caught by the `except` block and logged. |
| Scan fails mid-run | Routes written so far remain in the DB. Status is set to `failed`. The UI shows partial results with a `failed` badge — acceptable. |
| DB write error in callback | Logged; does not crash the scan. The query continues, but the flight data for that callback is lost. |
| Existing scans (pre-feature) | No impact. The migration adds the index but doesn't change old data (completed scans already have one row per destination). |

---

## 8. Migration Plan

1. **`database/schema.sql`**: Add `CREATE UNIQUE INDEX IF NOT EXISTS uq_routes_scan_dest`.
2. **`database/init_db.py`**: Add `_migrate_add_routes_unique_index()` and call it in `initialize_database()`.
3. **`scan_processor.py`**: Add the `_write_route_incremental()` helper; update the `progress_callback` closure; remove the bulk-write Phase 2.
4. **`searcher_v3.py`**: Pass the `flights=` kwarg to both successful callback invocations.

**Migration is backward-safe:** The UNIQUE index is added with `IF NOT EXISTS`, and existing `completed` scans already have at most one route row per destination, so index creation succeeds without errors.

**No API changes:** The `/scans/:id/routes` endpoint already returns live data from the `routes` table, and the frontend polling already works.

---

## 9. Rollback

To revert: remove the `flights=` kwarg from the `searcher_v3.py` callbacks, restore the bulk-write Phase 2 in `scan_processor.py`, and delete `_write_route_incremental`. The UNIQUE index can remain — it only enforces a constraint that the bulk-write approach satisfies naturally anyway.

---

## 10. Testing Plan

### Unit Tests (new)

1. `test_write_route_incremental_new` — first call creates the route row
2. `test_write_route_incremental_merge` — second call updates stats correctly
3. `test_write_route_incremental_no_prices` — flights without prices produce no row
4. `test_write_route_incremental_airlines_merge` — duplicate airlines are deduplicated
5. `test_weighted_average_formula` — verify the running-average formula with known numbers
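
Test 5 needs no database. A sketch of the test body, with the averaging loop inlined (the batch values are made up; the formula is the one used by `_write_route_incremental`):

```python
def test_weighted_average_formula():
    """The running weighted average must equal the flat average over
    all prices seen so far, regardless of batch boundaries."""
    batches = [[100.0, 200.0], [50.0], [80.0, 90.0, 130.0]]

    count, avg = 0, 0.0
    for batch in batches:
        batch_avg = sum(batch) / len(batch)
        # Formula from _write_route_incremental:
        # merged_avg = (old_avg * old_count + new_avg * new_count) / merged_count
        avg = (avg * count + batch_avg * len(batch)) / (count + len(batch))
        count += len(batch)

    all_prices = [p for batch in batches for p in batch]
    assert abs(avg - sum(all_prices) / len(all_prices)) < 1e-9
```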

### Integration Tests (extend existing)

6. Extend `test_scan_lifecycle` — poll routes every 0.1 s during a mocked scan; verify routes appear before completion
7. `test_incremental_writes_idempotent` — simulate the same callback being called twice for the same destination
8. `test_unique_index_exists` — verify the migration creates the index
9. `test_migration_collapses_duplicates` — seed duplicate route rows, run the migration, verify they collapse
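
Test 9 can run against an in-memory database with a simplified, hypothetical `routes` shape; the real test would invoke `_migrate_add_routes_unique_index` rather than inlining its SQL:

```python
import sqlite3

def test_migration_collapses_duplicates():
    """Seed duplicate (scan_id, destination) rows, apply the migration's
    dedup and index steps, and verify one row survives per pair."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE routes (id INTEGER PRIMARY KEY, scan_id INTEGER, destination TEXT)"
    )
    conn.executemany(
        "INSERT INTO routes (scan_id, destination) VALUES (?, ?)",
        [(1, 'MUC'), (1, 'MUC'), (1, 'CDG'), (2, 'MUC')],
    )

    # The migration's dedup step: keep the lowest-id row per (scan_id, destination)
    conn.execute("""
        DELETE FROM routes
        WHERE id NOT IN (SELECT MIN(id) FROM routes GROUP BY scan_id, destination)
    """)
    # Index creation now succeeds because no duplicates remain
    conn.execute("CREATE UNIQUE INDEX uq_routes_scan_dest ON routes(scan_id, destination)")

    remaining = conn.execute("SELECT COUNT(*) FROM routes").fetchone()[0]
    assert remaining == 3
```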

---

## 11. Out of Scope

- WebSocket or SSE for push-based updates (polling already works)
- Frontend changes (none needed)
- Real-time price charts
- Partial scan resume after crash
- `total_flights` trigger removal (keep for consistency)