Files
ciaovolo/flight-comparator/database/init_db.py
domverse 000391f7fc
All checks were successful
Deploy / deploy (push) Successful in 36s
fix: guard migrations against fresh-database installs
On a fresh DB, migrations ran before the schema was applied and tried to
operate on tables that didn't exist yet (routes, scans), causing:
  "no such table: routes" on _migrate_add_routes_unique_index
  "no such table: scans" on _migrate_add_scheduled_scan_id_to_scans

Added table-existence checks so both migrations bail out when the table
isn't there yet. The schema's CREATE TABLE IF NOT EXISTS handles fresh
installs correctly.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-28 12:35:45 +01:00

368 lines
12 KiB
Python

#!/usr/bin/env python3
"""
Database Initialization Script for Flight Radar Web App
This script initializes or migrates the SQLite database for the web app.
It extends the existing cache.db with new tables while preserving existing data.
Usage:
# As script
python database/init_db.py
# As module
from database import initialize_database
initialize_database()
"""
import os
import sqlite3
import sys
from pathlib import Path
def get_db_path():
    """Return the default location of cache.db.

    The path is derived from this module's own location: the file is
    expected to live in flight-comparator/database/, and cache.db sits one
    directory above that.
    """
    database_dir = Path(__file__).parent
    # cache.db lives in the parent of the directory containing this script.
    return database_dir.parent / "cache.db"
def get_schema_path():
    """Return the path of schema.sql, located next to this module."""
    module_dir = Path(__file__).parent
    return module_dir / "schema.sql"
def load_schema():
    """Read schema.sql and return its full text.

    Returns:
        str: Contents of the schema file, decoded as UTF-8.

    Raises:
        FileNotFoundError: If the schema file does not exist.
    """
    schema_file = get_schema_path()
    if schema_file.exists():
        return schema_file.read_text(encoding='utf-8')
    raise FileNotFoundError(f"Schema file not found: {schema_file}")
def check_existing_tables(conn):
    """List the names of all tables present in the database.

    Args:
        conn: Open sqlite3 connection.

    Returns:
        list: Table names as reported by sqlite_master, ordered by name.
    """
    query = """
        SELECT name FROM sqlite_master
        WHERE type='table'
        ORDER BY name
    """
    table_names = []
    # Index access works for both plain tuples and sqlite3.Row rows.
    for record in conn.execute(query):
        table_names.append(record[0])
    return table_names
def get_schema_version(conn):
    """Return the highest version recorded in the schema_version table.

    Args:
        conn: Open sqlite3 connection.

    Returns:
        The maximum value of schema_version.version, or 0 when the table
        does not exist or holds no version value.
    """
    if 'schema_version' in check_existing_tables(conn):
        latest = conn.execute("SELECT MAX(version) FROM schema_version").fetchone()[0]
        # MAX() yields NULL for an empty table; fall through to 0 in that case.
        if latest is not None:
            return latest
    return 0
def _migrate_relax_country_constraint(conn, verbose=True):
    """
    Migration: Relax the country column CHECK constraint from = 2 to >= 2.

    The country column stores either a 2-letter ISO country code (e.g., 'DE')
    or a comma-separated list of destination IATA codes (e.g., 'MUC,FRA,BER').
    The original CHECK(length(country) = 2) only allowed country codes.

    The function is a no-op when the scans table does not exist or its
    definition no longer contains the old constraint.

    The table rebuild runs inside a single transaction: on any error it is
    rolled back (so scans, its data and the dropped triggers are restored)
    and the exception is re-raised. Foreign key enforcement is switched ON
    for the connection when the function finishes, whether it succeeded or
    failed.

    Args:
        conn: Open sqlite3 connection. Any pending transaction is committed
            before the migration starts.
        verbose: Print progress messages.
    """
    cursor = conn.execute(
        "SELECT sql FROM sqlite_master WHERE type='table' AND name='scans'"
    )
    row = cursor.fetchone()
    if not row or 'length(country) = 2' not in row[0]:
        return  # Table doesn't exist yet or already migrated

    if verbose:
        print(" 🔄 Migrating scans table: relaxing country CHECK constraint...")

    # SQLite doesn't support ALTER TABLE MODIFY COLUMN, so we must recreate.
    # Steps:
    #   1. Disable FK checks (routes has FK to scans)
    #   2. Drop triggers that reference scans from routes table (they'll be
    #      recreated by the schema script that the caller runs afterwards)
    #   3. Create scans_new with relaxed constraint
    #   4. Copy data, drop scans, rename scans_new -> scans
    #   5. Commit, then re-enable FK checks
    #
    # PRAGMA foreign_keys is a no-op while a transaction is open, so it must
    # be issued strictly outside of one: flush pending work before turning
    # enforcement off, and only turn it back on after commit/rollback.
    if conn.in_transaction:
        conn.commit()
    conn.execute("PRAGMA foreign_keys = OFF")
    try:
        # Explicit transaction so the DDL below is atomic; without it the
        # sqlite3 module would autocommit each DDL statement individually.
        if not conn.in_transaction:
            conn.execute("BEGIN")

        # Drop triggers on routes that reference scans in their bodies.
        conn.execute("DROP TRIGGER IF EXISTS update_scan_flight_count_insert")
        conn.execute("DROP TRIGGER IF EXISTS update_scan_flight_count_update")
        conn.execute("DROP TRIGGER IF EXISTS update_scan_flight_count_delete")

        # Clear a leftover from a previously interrupted migration run, which
        # would otherwise make CREATE TABLE fail forever.
        conn.execute("DROP TABLE IF EXISTS scans_new")
        conn.execute("""
            CREATE TABLE scans_new (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            origin TEXT NOT NULL CHECK(length(origin) = 3),
            country TEXT NOT NULL CHECK(length(country) >= 2),
            start_date TEXT NOT NULL,
            end_date TEXT NOT NULL,
            created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
            status TEXT NOT NULL DEFAULT 'pending'
            CHECK(status IN ('pending', 'running', 'completed', 'failed')),
            total_routes INTEGER NOT NULL DEFAULT 0 CHECK(total_routes >= 0),
            routes_scanned INTEGER NOT NULL DEFAULT 0 CHECK(routes_scanned >= 0),
            total_flights INTEGER NOT NULL DEFAULT 0 CHECK(total_flights >= 0),
            error_message TEXT,
            seat_class TEXT DEFAULT 'economy',
            adults INTEGER DEFAULT 1 CHECK(adults > 0 AND adults <= 9),
            CHECK(end_date >= start_date),
            CHECK(routes_scanned <= total_routes OR total_routes = 0)
            )
        """)
        # Positional copy: relies on the old table having the same column
        # order as scans_new; a mismatch raises and rolls everything back.
        conn.execute("INSERT INTO scans_new SELECT * FROM scans")
        conn.execute("DROP TABLE scans")
        conn.execute("ALTER TABLE scans_new RENAME TO scans")
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        # The transaction has ended (commit or rollback), so this takes effect.
        conn.execute("PRAGMA foreign_keys = ON")

    if verbose:
        print(" ✅ Migration complete: country column now accepts >= 2 chars")
def _migrate_add_routes_unique_index(conn, verbose=True):
    """
    Migration: ensure a UNIQUE index exists on routes(scan_id, destination).

    The index is required for incremental route writes during active scans.
    Before it is created, rows sharing the same (scan_id, destination) pair
    are collapsed so that only the row with the smallest id survives.

    Nothing happens when the routes table is absent (fresh install; the
    schema creates the index) or when the index already exists.

    Args:
        conn: Open sqlite3 connection.
        verbose: Print progress messages.
    """
    routes_table = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name='routes'"
    ).fetchone()
    if routes_table is None:
        # Fresh install: nothing to migrate yet.
        return

    existing_index = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='index' AND name='uq_routes_scan_dest'"
    ).fetchone()
    if existing_index is not None:
        # Index is already in place.
        return

    if verbose:
        print(" 🔄 Migrating routes table: adding UNIQUE index on (scan_id, destination)...")

    # Remove duplicates first; otherwise the unique index could not be built.
    dedupe_sql = """
        DELETE FROM routes
        WHERE id NOT IN (
        SELECT MIN(id)
        FROM routes
        GROUP BY scan_id, destination
        )
    """
    create_index_sql = """
        CREATE UNIQUE INDEX IF NOT EXISTS uq_routes_scan_dest
        ON routes(scan_id, destination)
    """
    conn.execute(dedupe_sql)
    conn.execute(create_index_sql)
    conn.commit()

    if verbose:
        print(" ✅ Migration complete: uq_routes_scan_dest index created")
def _migrate_add_scheduled_scan_id_to_scans(conn, verbose=True):
    """
    Migration: add the scheduled_scan_id column to the scans table.

    Existing rows receive NULL in the new column (manual scans). The column
    is added as a plain INTEGER without an inline foreign key declaration;
    the relationship is enforced at the application level.

    Nothing happens when the scans table is absent (fresh install; the
    schema creates the column) or when the column already exists.

    Args:
        conn: Open sqlite3 connection.
        verbose: Print progress messages.
    """
    # Field 1 of each PRAGMA table_info row is the column name; the pragma
    # returns no rows at all when the table does not exist.
    column_names = [info[1] for info in conn.execute("PRAGMA table_info(scans)")]
    table_missing = len(column_names) == 0
    if table_missing or 'scheduled_scan_id' in column_names:
        return

    if verbose:
        print(" 🔄 Migrating scans table: adding scheduled_scan_id column...")

    conn.execute("ALTER TABLE scans ADD COLUMN scheduled_scan_id INTEGER")
    conn.commit()

    if verbose:
        print(" ✅ Migration complete: scheduled_scan_id column added to scans")
def initialize_database(db_path=None, verbose=True):
    """
    Initialize or migrate the database.

    Runs the in-place migrations first (each one is a no-op when it does not
    apply), then executes schema.sql, and finally makes sure foreign key
    enforcement is enabled on the returned connection.

    Args:
        db_path: Path to database file, as a str or path-like object
            (default: DATABASE_PATH env var if set, otherwise cache.db)
        verbose: Print status messages

    Returns:
        sqlite3.Connection: Database connection with foreign keys enabled

    Raises:
        FileNotFoundError: If schema.sql cannot be found.
        sqlite3.Error: If a migration or the schema script fails. The
            connection is closed before the exception propagates.
    """
    if db_path is None:
        # Check for DATABASE_PATH environment variable first (used by tests)
        env_db_path = os.environ.get('DATABASE_PATH')
        db_path = Path(env_db_path) if env_db_path else get_db_path()
    else:
        # Accept plain strings as well as Path objects.
        db_path = Path(db_path)

    db_exists = db_path.exists()
    if verbose:
        if db_exists:
            print(f"📊 Extending existing database: {db_path}")
        else:
            print(f"📊 Creating new database: {db_path}")

    # Connect to database
    conn = sqlite3.connect(db_path)
    try:
        conn.row_factory = sqlite3.Row  # Access columns by name

        # Get existing state
        existing_tables = check_existing_tables(conn)
        current_version = get_schema_version(conn)
        if verbose:
            if existing_tables:
                print(f" Existing tables: {', '.join(existing_tables)}")
                print(f" Current schema version: {current_version}")
            else:
                print(" No existing tables found")

        # Apply migrations before running schema
        _migrate_relax_country_constraint(conn, verbose)
        _migrate_add_routes_unique_index(conn, verbose)
        _migrate_add_scheduled_scan_id_to_scans(conn, verbose)

        # Load and execute schema
        schema_sql = load_schema()
        if verbose:
            print(" Executing schema...")
        try:
            # Execute schema (uses CREATE TABLE IF NOT EXISTS, so safe)
            conn.executescript(schema_sql)
            conn.commit()
            if verbose:
                print(" ✅ Schema executed successfully")
        except sqlite3.Error as e:
            conn.rollback()
            if verbose:
                print(f" ❌ Schema execution failed: {e}")
            raise

        # Verify foreign keys are enabled
        fk_enabled = conn.execute("PRAGMA foreign_keys").fetchone()[0]
        if fk_enabled != 1:
            if verbose:
                print(" ⚠️ Warning: Foreign keys not enabled!")
                print(" Enabling foreign keys for this connection...")
            conn.execute("PRAGMA foreign_keys = ON")
            # Re-read so the summary below reports the actual current state
            # instead of the value from before enforcement was switched on.
            fk_enabled = conn.execute("PRAGMA foreign_keys").fetchone()[0]

        # Check what was created
        new_tables = check_existing_tables(conn)
        new_version = get_schema_version(conn)
        if verbose:
            print(f"\n✅ Database initialized successfully!")
            print(f" Total tables: {len(new_tables)}")
            print(f" Schema version: {new_version}")
            print(f" Foreign keys: {'✅ Enabled' if fk_enabled else '❌ Disabled'}")

            # Count indexes, triggers, views
            cursor = conn.execute("""
                SELECT type, COUNT(*) as count
                FROM sqlite_master
                WHERE type IN ('index', 'trigger', 'view')
                GROUP BY type
            """)
            for row in cursor:
                print(f" {row[0].capitalize()}s: {row[1]}")

            # Check for web app tables specifically
            web_tables = [t for t in new_tables if t in ('scans', 'routes', 'schema_version')]
            if web_tables:
                print(f"\n📦 Web app tables: {', '.join(web_tables)}")

            # Check for existing cache tables
            cache_tables = [t for t in new_tables if 'flight' in t.lower()]
            if cache_tables:
                print(f"💾 Existing cache tables: {', '.join(cache_tables)}")
    except BaseException:
        # The caller never receives the connection on failure, so close it
        # here rather than leaking the handle (and its file lock).
        conn.close()
        raise

    return conn
def get_connection(db_path=None):
    """
    Get a database connection with foreign keys enabled.

    Args:
        db_path: Path to database file, as a str or path-like object
            (default: DATABASE_PATH env var if set, otherwise cache.db)

    Returns:
        sqlite3.Connection: Database connection using sqlite3.Row rows

    Raises:
        FileNotFoundError: If the database file does not exist.
    """
    if db_path is None:
        # Check for DATABASE_PATH environment variable first (used by tests)
        env_db_path = os.environ.get('DATABASE_PATH')
        db_path = Path(env_db_path) if env_db_path else get_db_path()
    else:
        # Accept plain strings as well as Path objects; a str has no .exists().
        db_path = Path(db_path)

    if not db_path.exists():
        raise FileNotFoundError(
            f"Database not found: {db_path}\n"
            "Run 'python database/init_db.py' to create it."
        )

    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    # Foreign key enforcement is per-connection in SQLite and must be
    # switched on explicitly every time.
    conn.execute("PRAGMA foreign_keys = ON")
    return conn
def main():
    """CLI entry point.

    Initializes the database with verbose output and closes the connection.

    Returns:
        int: 0 on success, 1 if any exception was raised (its message is
        printed instead of a traceback).
    """
    rule = "=" * 60

    def framed(message):
        # Print a message between two horizontal rules.
        print(rule)
        print(message)
        print(rule)

    framed("Flight Radar Web App - Database Initialization")
    print()
    try:
        connection = initialize_database(verbose=True)
        connection.close()
        print()
        framed("✅ Done! Database is ready.")
        return 0
    except Exception as e:
        print()
        framed(f"❌ Error: {e}")
        return 1
# When executed as a script, run the CLI and use main()'s return value
# (0 on success, 1 on error) as the process exit status.
if __name__ == "__main__":
    sys.exit(main())