diff --git a/flight-comparator/PRD_LIVE_ROUTES.md b/flight-comparator/PRD_LIVE_ROUTES.md new file mode 100644 index 0000000..7cece81 --- /dev/null +++ b/flight-comparator/PRD_LIVE_ROUTES.md @@ -0,0 +1,427 @@ +# PRD: Live Routes During Active Scan + +**Status:** Draft +**Date:** 2026-02-27 +**Scope:** Backend only — no frontend or API contract changes required + +--- + +## 1. Problem + +Routes and flights are only visible in the UI **after a scan fully completes**. For large scans (e.g., 30 destinations × 14 days = 420 queries), users stare at a progress bar with zero actionable data for potentially many minutes. + +The root cause is a two-phase waterfall in `scan_processor.py`: + +- **Phase 1 (line 170):** `await search_multiple_routes(...)` — `asyncio.gather()` waits for **all** queries to finish before returning anything. +- **Phase 2 (line 197):** Bulk `INSERT INTO routes` and `INSERT INTO flights` only after Phase 1 completes. + +--- + +## 2. Goal + +Write each destination's route row and its individual flight rows to the database **as soon as that destination's results arrive**, rather than after all queries finish. + +The frontend already polls `/scans/:id/routes` on an interval while status is `running`. No frontend changes are needed — routes will simply appear progressively. + +--- + +## 3. Desired Behavior + +| Moment | Before fix | After fix | +|--------|-----------|-----------| +| Query 1 completes (BER → MUC) | Nothing visible | MUC route + flights appear in UI | +| 50% through scan | 0 routes in UI | ~50% of routes visible | +| Scan completes | All routes appear at once | No change (already visible) | + +--- + +## 4. 
Architecture Analysis + +### 4.1 Event Loop Threading + +`progress_callback` is called from within `search_direct_flights` (`searcher_v3.py`): + +- **Cache hit** (line 121): called directly in the async coroutine → **event loop thread** +- **API success** (line 143): called after `await asyncio.to_thread()` returns → **event loop thread** +- **Error** (line 159): same → **event loop thread** + +All callback invocations happen on the **single asyncio event loop thread**. No locking is needed. The existing `progress_callback` in `scan_processor.py` already opens and closes a fresh `get_connection()` per call — we reuse the same pattern. + +### 4.2 Why Multiple Callbacks Fire Per Destination + +For a scan covering 14 dates, a single destination (e.g., MUC) gets 14 separate queries: +- `(BER, MUC, 2026-03-01)`, `(BER, MUC, 2026-03-02)`, ... `(BER, MUC, 2026-03-14)` + +Each query completes independently and fires `progress_callback`. The incremental write logic must **merge** successive results for the same destination into a single route row. + +### 4.3 Required Schema Change + +`routes` has no UNIQUE constraint on `(scan_id, destination)`. We need one to support upsert semantics (detect "route already started" vs "first result for this destination"). + +Current schema (no uniqueness): +```sql +CREATE INDEX IF NOT EXISTS idx_routes_scan_id ON routes(scan_id); +``` + +Required addition: +```sql +CREATE UNIQUE INDEX IF NOT EXISTS uq_routes_scan_dest ON routes(scan_id, destination); +``` + +--- + +## 5. Changes Required + +### 5.1 `database/schema.sql` + +Add a unique index at the end of the routes indexes block: + +```sql +-- Unique constraint: one route row per (scan, destination) +CREATE UNIQUE INDEX IF NOT EXISTS uq_routes_scan_dest + ON routes(scan_id, destination); +``` + +### 5.2 `database/init_db.py` + +Add a migration function to create the index on existing databases where it doesn't yet exist. Called before `executescript(schema_sql)`. 
+
+```python
+def _migrate_add_routes_unique_index(conn, verbose=True):
+    """
+    Migration: Add UNIQUE index on routes(scan_id, destination).
+
+    Required for incremental route writes during active scans.
+    The index enables upsert (INSERT + UPDATE on conflict) semantics.
+
+    Safe to run on existing data: if any (scan_id, destination) duplicates
+    exist, we collapse them first (keep the row with the lowest id).
+    """
+    cursor = conn.execute("""
+        SELECT name FROM sqlite_master
+        WHERE type='index' AND name='uq_routes_scan_dest'
+    """)
+    if cursor.fetchone():
+        return  # Already migrated
+
+    if verbose:
+        print("  🔄 Migrating routes table: adding unique index on (scan_id, destination)...")
+
+    # Collapse any existing duplicates (completed scans may have none,
+    # but guard against edge cases).
+    conn.execute("""
+        DELETE FROM routes
+        WHERE id NOT IN (
+            SELECT MIN(id)
+            FROM routes
+            GROUP BY scan_id, destination
+        )
+    """)
+
+    # Create the unique index
+    conn.execute("""
+        CREATE UNIQUE INDEX IF NOT EXISTS uq_routes_scan_dest
+        ON routes(scan_id, destination)
+    """)
+    conn.commit()
+
+    if verbose:
+        print("  ✅ Migration complete: uq_routes_scan_dest index created")
+```
+
+Wire it into `initialize_database()` before `executescript`:
+
+```python
+# Apply migrations before running schema
+_migrate_relax_country_constraint(conn, verbose)
+_migrate_add_routes_unique_index(conn, verbose)  # ← ADD THIS
+
+# Load and execute schema
+schema_sql = load_schema()
+```
+
+### 5.3 `scan_processor.py` — Extend `progress_callback`
+
+#### Current signature (line 138):
+```python
+def progress_callback(origin: str, destination: str, date: str,
+                      status: str, count: int, error: str = None):
+```
+
+#### New signature:
+```python
+def progress_callback(origin: str, destination: str, date: str,
+                      status: str, count: int, error: str = None,
+                      flights: list = None):
+```
+
+#### New helper function (add before `process_scan`):
+
+```python
+def _write_route_incremental(scan_id: int, 
destination: str,
+                             dest_name: str, dest_city: str,
+                             new_flights: list):
+    """
+    Write or update a route row and its flight rows incrementally.
+
+    Called from progress_callback each time a query for (scan_id, destination)
+    returns results. Merges into the existing route row if one already exists.
+
+    Uses a read-then-write pattern so airlines JSON arrays can be merged in
+    Python rather than in SQLite (SQLite has no native JSON array merge).
+
+    Safe to call from the event loop thread — opens its own connection.
+    """
+    if not new_flights:
+        return
+
+    prices = [f.get('price') for f in new_flights if f.get('price')]
+    if not prices:
+        return  # No priced flights — nothing to aggregate
+
+    new_airlines = list({f.get('airline') for f in new_flights if f.get('airline')})
+    new_count = len(prices)  # count only priced flights — matches the flight rows inserted below
+    new_min = min(prices)
+    new_max = max(prices)
+    new_avg = sum(prices) / len(prices)
+
+    try:
+        conn = get_connection()
+        cursor = conn.cursor()
+
+        # Check if a route row already exists for this destination
+        cursor.execute("""
+            SELECT id, flight_count, min_price, max_price, avg_price, airlines
+            FROM routes
+            WHERE scan_id = ? AND destination = ?
+        """, (scan_id, destination))
+        existing = cursor.fetchone()
+
+        if existing is None:
+            # First result for this destination — INSERT
+            cursor.execute("""
+                INSERT INTO routes (
+                    scan_id, destination, destination_name, destination_city,
+                    flight_count, airlines, min_price, max_price, avg_price
+                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 
+ """, ( + scan_id, destination, dest_name, dest_city, + new_count, json.dumps(new_airlines), + new_min, new_max, new_avg, + )) + else: + # Subsequent result — merge stats + old_count = existing['flight_count'] or 0 + old_min = existing['min_price'] + old_max = existing['max_price'] + old_avg = existing['avg_price'] or 0.0 + old_airlines = json.loads(existing['airlines']) if existing['airlines'] else [] + + merged_count = old_count + new_count + merged_min = min(old_min, new_min) if old_min is not None else new_min + merged_max = max(old_max, new_max) if old_max is not None else new_max + # Running weighted average + merged_avg = (old_avg * old_count + new_avg * new_count) / merged_count + merged_airlines = json.dumps(list(set(old_airlines) | set(new_airlines))) + + cursor.execute(""" + UPDATE routes + SET flight_count = ?, + min_price = ?, + max_price = ?, + avg_price = ?, + airlines = ? + WHERE scan_id = ? AND destination = ? + """, ( + merged_count, merged_min, merged_max, merged_avg, merged_airlines, + scan_id, destination, + )) + + # INSERT individual flight rows (always new rows; duplicates not expected + # because each (scan_id, destination, date) query fires exactly once) + for flight in new_flights: + if not flight.get('price'): + continue + cursor.execute(""" + INSERT INTO flights ( + scan_id, destination, date, airline, + departure_time, arrival_time, price, stops + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) + """, ( + scan_id, + destination, + flight.get('date', ''), # date passed from caller + flight.get('airline'), + flight.get('departure_time'), + flight.get('arrival_time'), + flight.get('price'), + flight.get('stops', 0), + )) + + conn.commit() + conn.close() + + except Exception as e: + logger.error(f"[Scan {scan_id}] Failed to write incremental route {destination}: {e}") +``` + +**Note on flight date:** Individual flights must carry their `date` when passed to `_write_route_incremental`. 
The callback receives `date` as a parameter, so attach it to each flight dict before passing: `f['date'] = date`. + +#### Updated `progress_callback` inside `process_scan`: + +```python +def progress_callback(origin: str, destination: str, date: str, + status: str, count: int, error: str = None, + flights: list = None): + nonlocal routes_scanned_count + + if status in ('cache_hit', 'api_success', 'error'): + routes_scanned_count += 1 + + # Write route + flights immediately if we have results + if flights and status in ('cache_hit', 'api_success'): + # Annotate each flight with its query date + for f in flights: + f['date'] = date + dest_info = next((d for d in destinations if d['iata'] == destination), None) + dest_name = dest_info.get('name', destination) if dest_info else destination + dest_city = dest_info.get('city', '') if dest_info else '' + _write_route_incremental(scan_id, destination, dest_name, dest_city, flights) + + # Update progress counter + try: + progress_conn = get_connection() + progress_cursor = progress_conn.cursor() + progress_cursor.execute(""" + UPDATE scans + SET routes_scanned = routes_scanned + 1, + updated_at = CURRENT_TIMESTAMP + WHERE id = ? + """, (scan_id,)) + progress_conn.commit() + progress_conn.close() + + if routes_scanned_count % 10 == 0: + logger.info(f"[Scan {scan_id}] Progress: {routes_scanned_count}/{len(routes_to_scan)} routes ({status}: {origin}→{destination})") + + except Exception as e: + logger.error(f"[Scan {scan_id}] Failed to update progress: {str(e)}") +``` + +#### Replace Phase 2 (lines 182–262 in `process_scan`): + +Remove the existing bulk-write block. 
Replace with a lightweight totals-only block: + +```python +# Wait for all queries to complete +results = await search_multiple_routes( + routes=routes_to_scan, + seat_class=seat_class or 'economy', + adults=adults or 1, + use_cache=True, + cache_threshold_hours=24, + max_workers=3, + progress_callback=progress_callback +) + +logger.info(f"[Scan {scan_id}] All queries complete. Finalizing scan...") + +# Count total flights (routes already written by callback) +total_flights = sum(len(flights) for flights in results.values()) +routes_saved = cursor.execute( + "SELECT COUNT(*) FROM routes WHERE scan_id = ?", (scan_id,) +).fetchone()[0] + +logger.info(f"[Scan {scan_id}] ✅ {routes_saved} routes, {total_flights} flights") +``` + +### 5.4 `searcher_v3.py` — Pass `flights` to callback + +In `search_direct_flights`, update both callback calls to pass `flights=`: + +**Cache hit (line 121):** +```python +if progress_callback: + progress_callback(origin, destination, date, "cache_hit", len(cached), flights=cached) +return cached +``` + +**API success (line 143):** +```python +if progress_callback: + progress_callback(origin, destination, date, "api_success", len(result), flights=result) +``` + +The `error` callback (line 159) does not need `flights=` since there are no results. + +--- + +## 6. Concurrency & Safety + +| Concern | Analysis | Verdict | +|---------|----------|---------| +| Simultaneous writes to same route row | Cannot happen: the event loop is single-threaded; callbacks are not concurrent. `asyncio.gather` runs tasks concurrently but yields at `await` points, and callbacks fire synchronously after each `await to_thread` returns. | ✅ Safe | +| Two callbacks for same destination arriving "simultaneously" | Impossible in single-threaded event loop. Second callback blocks until first completes (no `await` in callback). | ✅ Safe | +| Duplicate flight rows | Each `(scan_id, destination, date)` query fires exactly once; its flights are written exactly once. 
| ✅ No duplicates |
+| `total_flights` trigger still fires correctly | SQLite triggers on `INSERT INTO routes` and `UPDATE OF flight_count ON routes` fire for each incremental write — counts stay accurate. | ✅ Works |
+| Scan completion `total_flights` update | Set explicitly at completion from `SUM(flight_count)` over the scan's routes — matches what was actually written. | ✅ OK |
+
+---
+
+## 7. Edge Cases
+
+| Case | Handling |
+|------|----------|
+| Destination returns 0 flights | `_write_route_incremental` returns early — no row created. Route only appears if at least one priced flight found. |
+| Scan is deleted mid-run | `DELETE CASCADE` on `scans` removes routes/flights automatically. Progress callback write will fail with FK error, caught by `except` block and logged. |
+| Scan fails mid-run | Routes written so far remain in DB. Status set to `failed`. UI will show partial results with `failed` badge — acceptable. |
+| DB write error in callback | Logged, does not crash the scan. Query continues, flight data lost for that callback. |
+| Existing scans (pre-feature) | No impact. Migration adds index but doesn't change old data (all complete scans already have 1 row per destination). |
+
+---
+
+## 8. Migration Plan
+
+1. **`database/schema.sql`**: Add `CREATE UNIQUE INDEX IF NOT EXISTS uq_routes_scan_dest`.
+2. **`database/init_db.py`**: Add `_migrate_add_routes_unique_index()` + call it in `initialize_database()`.
+3. **`scan_processor.py`**: Add `_write_route_incremental()` helper; update `progress_callback` closure; remove bulk-write Phase 2.
+4. **`searcher_v3.py`**: Pass `flights=` kwarg to both successful callback invocations.
+
+**Migration is backward-safe:** The UNIQUE index is added with `IF NOT EXISTS`. Existing `completed` scans already have at most 1 route row per destination — the index creation will succeed without errors.
+
+**No API changes:** `/scans/:id/routes` endpoint already returns live data from the `routes` table. The frontend polling already works. 
+ +--- + +## 9. Rollback + +To revert: remove the `flights=` kwarg from `searcher_v3.py` callbacks, restore the bulk-write Phase 2 in `scan_processor.py`, and remove `_write_route_incremental`. The UNIQUE index can remain — it only adds a constraint that is naturally satisfied by the bulk-write approach anyway. + +--- + +## 10. Testing Plan + +### Unit Tests (new) + +1. `test_write_route_incremental_new` — first call creates route row +2. `test_write_route_incremental_merge` — second call updates stats correctly +3. `test_write_route_incremental_no_prices` — empty-price flights produce no row +4. `test_write_route_incremental_airlines_merge` — duplicate airlines deduplicated +5. `test_weighted_average_formula` — verify avg formula with known numbers + +### Integration Tests (extend existing) + +6. Extend `test_scan_lifecycle` — poll routes every 0.1s during mock scan, verify routes appear before completion +7. `test_incremental_writes_idempotent` — simulate same callback called twice for same destination +8. `test_unique_index_exists` — verify migration creates index +9. `test_migration_collapses_duplicates` — seed duplicate route rows, run migration, verify collapsed + +--- + +## 11. Out of Scope + +- WebSocket or SSE for push-based updates (polling already works) +- Frontend changes (none needed) +- Real-time price charts +- Partial scan resume after crash +- `total_flights` trigger removal (keep for consistency) diff --git a/flight-comparator/database/init_db.py b/flight-comparator/database/init_db.py index 0307bc0..77a621c 100644 --- a/flight-comparator/database/init_db.py +++ b/flight-comparator/database/init_db.py @@ -130,6 +130,43 @@ def _migrate_relax_country_constraint(conn, verbose=True): print(" ✅ Migration complete: country column now accepts >= 2 chars") +def _migrate_add_routes_unique_index(conn, verbose=True): + """ + Migration: Add UNIQUE index on routes(scan_id, destination). + + Required for incremental route writes during active scans. 
+ Collapses any pre-existing duplicate (scan_id, destination) rows first + (keeps the row with the lowest id) before creating the index. + """ + cursor = conn.execute( + "SELECT name FROM sqlite_master WHERE type='index' AND name='uq_routes_scan_dest'" + ) + if cursor.fetchone(): + return # Already migrated + + if verbose: + print(" 🔄 Migrating routes table: adding UNIQUE index on (scan_id, destination)...") + + # Collapse any existing duplicates (guard against edge cases) + conn.execute(""" + DELETE FROM routes + WHERE id NOT IN ( + SELECT MIN(id) + FROM routes + GROUP BY scan_id, destination + ) + """) + + conn.execute(""" + CREATE UNIQUE INDEX IF NOT EXISTS uq_routes_scan_dest + ON routes(scan_id, destination) + """) + conn.commit() + + if verbose: + print(" ✅ Migration complete: uq_routes_scan_dest index created") + + def initialize_database(db_path=None, verbose=True): """ Initialize or migrate the database. @@ -174,6 +211,7 @@ def initialize_database(db_path=None, verbose=True): # Apply migrations before running schema _migrate_relax_country_constraint(conn, verbose) + _migrate_add_routes_unique_index(conn, verbose) # Load and execute schema schema_sql = load_schema() diff --git a/flight-comparator/database/schema.sql b/flight-comparator/database/schema.sql index 183597d..a2d9433 100644 --- a/flight-comparator/database/schema.sql +++ b/flight-comparator/database/schema.sql @@ -111,6 +111,10 @@ CREATE INDEX IF NOT EXISTS idx_routes_min_price ON routes(min_price) WHERE min_price IS NOT NULL; -- Partial index for routes with prices +-- One route row per (scan, destination) — enables incremental upsert writes +CREATE UNIQUE INDEX IF NOT EXISTS uq_routes_scan_dest + ON routes(scan_id, destination); + -- ============================================================================ -- Triggers: Auto-update timestamps and aggregates -- ============================================================================ diff --git a/flight-comparator/scan_processor.py 
b/flight-comparator/scan_processor.py index 08d3ebc..4789586 100644 --- a/flight-comparator/scan_processor.py +++ b/flight-comparator/scan_processor.py @@ -12,7 +12,6 @@ Runs as async background tasks within the FastAPI application. import asyncio import logging from datetime import datetime, date, timedelta -from typing import Dict, List, Optional import json from database import get_connection @@ -23,6 +22,102 @@ from searcher_v3 import search_multiple_routes logger = logging.getLogger(__name__) +def _write_route_incremental(scan_id: int, destination: str, + dest_name: str, dest_city: str, + new_flights: list): + """ + Write or update a route row and its individual flight rows immediately. + + Called from progress_callback each time a (scan_id, destination, date) + query returns results. Merges into the existing route row if one already + exists, using a running weighted average for avg_price. + + Opens its own DB connection — safe to call from the event loop thread. + """ + prices = [f.get('price') for f in new_flights if f.get('price')] + if not prices: + return + + new_airlines = list({f.get('airline') for f in new_flights if f.get('airline')}) + new_count = len(prices) + new_min = min(prices) + new_max = max(prices) + new_avg = sum(prices) / new_count + + try: + conn = get_connection() + cursor = conn.cursor() + + cursor.execute(""" + SELECT id, flight_count, min_price, max_price, avg_price, airlines + FROM routes + WHERE scan_id = ? AND destination = ? + """, (scan_id, destination)) + existing = cursor.fetchone() + + if existing is None: + cursor.execute(""" + INSERT INTO routes ( + scan_id, destination, destination_name, destination_city, + flight_count, airlines, min_price, max_price, avg_price + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 
+ """, ( + scan_id, destination, dest_name, dest_city, + new_count, json.dumps(new_airlines), + new_min, new_max, new_avg, + )) + else: + old_count = existing['flight_count'] or 0 + old_min = existing['min_price'] + old_max = existing['max_price'] + old_avg = existing['avg_price'] or 0.0 + old_airlines = json.loads(existing['airlines']) if existing['airlines'] else [] + + merged_count = old_count + new_count + merged_min = min(old_min, new_min) if old_min is not None else new_min + merged_max = max(old_max, new_max) if old_max is not None else new_max + merged_avg = (old_avg * old_count + new_avg * new_count) / merged_count + merged_airlines = json.dumps(list(set(old_airlines) | set(new_airlines))) + + cursor.execute(""" + UPDATE routes + SET flight_count = ?, + min_price = ?, + max_price = ?, + avg_price = ?, + airlines = ? + WHERE scan_id = ? AND destination = ? + """, ( + merged_count, merged_min, merged_max, merged_avg, merged_airlines, + scan_id, destination, + )) + + for flight in new_flights: + if not flight.get('price'): + continue + cursor.execute(""" + INSERT INTO flights ( + scan_id, destination, date, airline, + departure_time, arrival_time, price, stops + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) + """, ( + scan_id, + destination, + flight.get('date', ''), + flight.get('airline'), + flight.get('departure_time'), + flight.get('arrival_time'), + flight.get('price'), + flight.get('stops', 0), + )) + + conn.commit() + conn.close() + + except Exception as e: + logger.error(f"[Scan {scan_id}] Failed to write incremental route {destination}: {e}") + + async def process_scan(scan_id: int): """ Process a pending scan by querying flights and saving routes. 
@@ -131,19 +226,28 @@ async def process_scan(scan_id: int): """, (len(routes_to_scan), scan_id)) conn.commit() - # Progress callback to update database - # Signature: callback(origin, destination, date, status, count, error=None) + # Progress callback — updates DB progress counter and writes routes live + # Signature: callback(origin, destination, date, status, count, error=None, flights=None) routes_scanned_count = 0 def progress_callback(origin: str, destination: str, date: str, - status: str, count: int, error: str = None): + status: str, count: int, error: str = None, + flights: list = None): nonlocal routes_scanned_count - # Increment counter for each route query (cache hit or API call) if status in ('cache_hit', 'api_success', 'error'): routes_scanned_count += 1 - # Update progress in database + # Write route + flights to DB immediately if results available + if flights and status in ('cache_hit', 'api_success'): + for f in flights: + f['date'] = date + dest_info = next((d for d in destinations if d['iata'] == destination), None) + dest_name = dest_info.get('name', destination) if dest_info else destination + dest_city = dest_info.get('city', '') if dest_info else '' + _write_route_incremental(scan_id, destination, dest_name, dest_city, flights) + + # Update progress counter try: progress_conn = get_connection() progress_cursor = progress_conn.cursor() @@ -158,7 +262,7 @@ async def process_scan(scan_id: int): progress_conn.commit() progress_conn.close() - if routes_scanned_count % 10 == 0: # Log every 10 routes + if routes_scanned_count % 10 == 0: logger.info(f"[Scan {scan_id}] Progress: {routes_scanned_count}/{len(routes_to_scan)} routes ({status}: {origin}→{destination})") except Exception as e: @@ -177,89 +281,15 @@ async def process_scan(scan_id: int): progress_callback=progress_callback ) - logger.info(f"[Scan {scan_id}] Flight queries complete. 
Processing results...") + logger.info(f"[Scan {scan_id}] Flight queries complete.") - # Group results by destination, preserving date per flight - # Structure: {dest: [(flight_dict, date), ...]} - routes_by_destination: Dict[str, List] = {} - total_flights = 0 - - for (orig, dest, scan_date), flights in results.items(): - if dest not in routes_by_destination: - routes_by_destination[dest] = [] - - for flight in flights: - routes_by_destination[dest].append((flight, scan_date)) - total_flights += len(flights) - - logger.info(f"[Scan {scan_id}] Found {total_flights} total flights across {len(routes_by_destination)} destinations") - - # Save routes and individual flights to database - routes_saved = 0 - for destination, flight_date_pairs in routes_by_destination.items(): - if not flight_date_pairs: - continue # Skip destinations with no flights - - flights = [f for f, _ in flight_date_pairs] - - # Get destination details (fall back to IATA code if not in DB) - dest_info = next((d for d in destinations if d['iata'] == destination), None) - dest_name = dest_info.get('name', destination) if dest_info else destination - dest_city = dest_info.get('city', '') if dest_info else '' - - # Calculate statistics - prices = [f.get('price') for f in flights if f.get('price')] - airlines = list(set(f.get('airline') for f in flights if f.get('airline'))) - - if not prices: - logger.info(f"[Scan {scan_id}] Skipping {destination} - no prices available") - continue - - min_price = min(prices) - max_price = max(prices) - avg_price = sum(prices) / len(prices) - - # Insert route summary - cursor.execute(""" - INSERT INTO routes ( - scan_id, destination, destination_name, destination_city, - min_price, max_price, avg_price, flight_count, airlines - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 
- """, ( - scan_id, - destination, - dest_name, - dest_city, - min_price, - max_price, - avg_price, - len(flights), - json.dumps(airlines) - )) - - # Insert individual flights - for flight, flight_date in flight_date_pairs: - if not flight.get('price'): - continue - cursor.execute(""" - INSERT INTO flights ( - scan_id, destination, date, airline, - departure_time, arrival_time, price, stops - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) - """, ( - scan_id, - destination, - flight_date, - flight.get('airline'), - flight.get('departure_time'), - flight.get('arrival_time'), - flight.get('price'), - flight.get('stops', 0), - )) - - routes_saved += 1 - - conn.commit() + # Routes and flights were written incrementally by progress_callback. + routes_saved = cursor.execute( + "SELECT COUNT(*) FROM routes WHERE scan_id = ?", (scan_id,) + ).fetchone()[0] + total_flights_saved = cursor.execute( + "SELECT COALESCE(SUM(flight_count), 0) FROM routes WHERE scan_id = ?", (scan_id,) + ).fetchone()[0] # Update scan to completed cursor.execute(""" @@ -268,10 +298,10 @@ async def process_scan(scan_id: int): total_flights = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ? - """, (total_flights, scan_id)) + """, (total_flights_saved, scan_id)) conn.commit() - logger.info(f"[Scan {scan_id}] ✅ Scan completed successfully! {routes_saved} routes saved with {total_flights} flights") + logger.info(f"[Scan {scan_id}] ✅ Scan completed successfully! 
{routes_saved} routes saved with {total_flights_saved} flights") except Exception as e: logger.error(f"[Scan {scan_id}] ❌ Scan failed with error: {str(e)}", exc_info=True) diff --git a/flight-comparator/searcher_v3.py b/flight-comparator/searcher_v3.py index ffcb1be..c505336 100644 --- a/flight-comparator/searcher_v3.py +++ b/flight-comparator/searcher_v3.py @@ -118,7 +118,7 @@ async def search_direct_flights( ) if cached is not None: if progress_callback: - progress_callback(origin, destination, date, "cache_hit", len(cached)) + progress_callback(origin, destination, date, "cache_hit", len(cached), flights=cached) return cached # Add random delay to avoid rate limiting @@ -140,7 +140,7 @@ async def search_direct_flights( # Report progress if progress_callback: - progress_callback(origin, destination, date, "api_success", len(result)) + progress_callback(origin, destination, date, "api_success", len(result), flights=result) return result