feat: add scheduled scans (cron-like recurring scans)

- New `scheduled_scans` table with daily/weekly/monthly frequencies
- asyncio background scheduler loop checks for due schedules every 60s
- 6 REST endpoints: CRUD + toggle enabled + run-now
- `scheduled_scan_id` FK added to scans table; migrated automatically
- Frontend: Schedules page (list + create form), Schedules nav link,
  "Scheduled" badge on ScanDetails when scan was triggered by a schedule

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-28 10:48:43 +01:00
parent ef5a27097d
commit 836c8474eb
9 changed files with 1666 additions and 10 deletions

View File

@@ -22,6 +22,7 @@ from pydantic import BaseModel, Field, validator, ValidationError
from contextlib import asynccontextmanager
from functools import lru_cache
from datetime import datetime, date, timedelta
import asyncio
import json
import os
import re
@@ -224,6 +225,7 @@ RATE_LIMITS = {
'scans': (50, 60), # 50 scan creations per minute
'logs': (100, 60), # 100 log requests per minute
'airports': (500, 60), # 500 airport searches per minute
'schedules': (30, 60), # 30 schedule requests per minute
}
@@ -240,10 +242,127 @@ def get_rate_limit_for_path(path: str) -> tuple[str, int, int]:
return 'logs', *RATE_LIMITS['logs']
elif '/airports' in path:
return 'airports', *RATE_LIMITS['airports']
elif '/schedules' in path:
return 'schedules', *RATE_LIMITS['schedules']
else:
return 'default', *RATE_LIMITS['default']
# =============================================================================
# Scheduler
# =============================================================================
def compute_next_run(frequency: str, hour: int, minute: int,
day_of_week: int = None, day_of_month: int = None,
after: datetime = None) -> datetime:
"""Compute the next UTC run time for a scheduled scan."""
now = after or datetime.utcnow()
base = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
if frequency == 'daily':
return base if base > now else base + timedelta(days=1)
elif frequency == 'weekly':
days_ahead = (day_of_week - now.weekday()) % 7
if days_ahead == 0 and base <= now:
days_ahead = 7
return (now + timedelta(days=days_ahead)).replace(
hour=hour, minute=minute, second=0, microsecond=0)
elif frequency == 'monthly':
candidate = now.replace(day=day_of_month, hour=hour, minute=minute,
second=0, microsecond=0)
if candidate <= now:
m, y = (now.month % 12) + 1, now.year + (1 if now.month == 12 else 0)
candidate = candidate.replace(year=y, month=m)
return candidate
raise ValueError(f"Unknown frequency: {frequency}")
def _check_and_run_due_schedules():
"""Query DB for due schedules and fire a scan for each."""
try:
conn = get_connection()
cursor = conn.cursor()
now_str = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
cursor.execute("""
SELECT id, origin, country, window_months, seat_class, adults,
frequency, hour, minute, day_of_week, day_of_month
FROM scheduled_scans
WHERE enabled = 1 AND next_run_at <= ?
""", (now_str,))
due = cursor.fetchall()
for row in due:
(sched_id, origin, country, window_months, seat_class, adults,
frequency, hour, minute, day_of_week, day_of_month) = row
# Concurrency guard: skip if a scan for this schedule is still active
running = conn.execute("""
SELECT id FROM scans
WHERE scheduled_scan_id = ? AND status IN ('pending', 'running')
""", (sched_id,)).fetchone()
if running:
logging.info(
f"Schedule {sched_id}: previous scan {running[0]} still active, skipping"
)
else:
# Compute date window
start_date = (date.today() + timedelta(days=1)).isoformat()
end_dt = date.today() + timedelta(days=1) + timedelta(days=30 * window_months)
end_date = end_dt.isoformat()
conn.execute("""
INSERT INTO scans (
origin, country, start_date, end_date,
status, seat_class, adults, scheduled_scan_id
) VALUES (?, ?, ?, ?, 'pending', ?, ?, ?)
""", (origin, country, start_date, end_date,
seat_class, adults, sched_id))
conn.commit()
scan_id = conn.execute("SELECT last_insert_rowid()").fetchone()[0]
try:
start_scan_processor(scan_id)
logging.info(
f"Schedule {sched_id}: fired scan {scan_id} "
f"({origin}{country})"
)
except Exception as e:
logging.error(
f"Schedule {sched_id}: failed to start scan {scan_id}: {e}"
)
# Advance next_run_at regardless of whether we fired
next_run = compute_next_run(
frequency, hour, minute, day_of_week, day_of_month
)
conn.execute("""
UPDATE scheduled_scans
SET last_run_at = ?, next_run_at = ?
WHERE id = ?
""", (now_str, next_run.strftime('%Y-%m-%d %H:%M:%S'), sched_id))
conn.commit()
conn.close()
except Exception as e:
logging.error(f"Scheduler error: {e}", exc_info=True)
async def _scheduler_loop():
"""Background task: check for due schedules every 60 seconds."""
logging.info("Scheduler loop started")
# Run immediately on startup to catch any missed schedules
_check_and_run_due_schedules()
while True:
await asyncio.sleep(60)
_check_and_run_due_schedules()
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Initialize airport data and database on server start."""
@@ -308,7 +427,18 @@ async def lifespan(app: FastAPI):
print(f"⚠️ Scan cleanup warning: {e}")
logging.info("Flight Radar API v2.0 startup complete")
# Start scheduled scan background task
scheduler_task = asyncio.create_task(_scheduler_loop())
logging.info("Scheduled scan background task started")
yield
scheduler_task.cancel()
try:
await scheduler_task
except asyncio.CancelledError:
pass
logging.info("Flight Radar API v2.0 shutting down")
@@ -799,6 +929,7 @@ class Scan(BaseModel):
error_message: Optional[str] = Field(None, description="Error message if scan failed")
seat_class: str = Field(..., description="Seat class")
adults: int = Field(..., ge=1, le=9, description="Number of adults")
scheduled_scan_id: Optional[int] = Field(None, description="ID of the schedule that created this scan")
class ScanCreateResponse(BaseModel):
@@ -1123,7 +1254,7 @@ async def create_scan(request: ScanRequest):
SELECT id, origin, country, start_date, end_date,
created_at, updated_at, status, total_routes,
routes_scanned, total_flights, error_message,
seat_class, adults
seat_class, adults, scheduled_scan_id
FROM scans
WHERE id = ?
""", (scan_id,))
@@ -1148,7 +1279,8 @@ async def create_scan(request: ScanRequest):
total_flights=row[10],
error_message=row[11],
seat_class=row[12],
adults=row[13]
adults=row[13],
scheduled_scan_id=row[14] if len(row) > 14 else None
)
logging.info(f"Scan created: ID={scan_id}, origin={scan.origin}, country={scan.country}, dates={scan.start_date} to {scan.end_date}")
@@ -1227,7 +1359,7 @@ async def list_scans(
SELECT id, origin, country, start_date, end_date,
created_at, updated_at, status, total_routes,
routes_scanned, total_flights, error_message,
seat_class, adults
seat_class, adults, scheduled_scan_id
FROM scans
{where_clause}
ORDER BY created_at DESC
@@ -1254,7 +1386,8 @@ async def list_scans(
total_flights=row[10],
error_message=row[11],
seat_class=row[12],
adults=row[13]
adults=row[13],
scheduled_scan_id=row[14] if len(row) > 14 else None
))
# Build pagination metadata
@@ -1295,7 +1428,7 @@ async def get_scan_status(scan_id: int):
SELECT id, origin, country, start_date, end_date,
created_at, updated_at, status, total_routes,
routes_scanned, total_flights, error_message,
seat_class, adults
seat_class, adults, scheduled_scan_id
FROM scans
WHERE id = ?
""", (scan_id,))
@@ -1323,7 +1456,8 @@ async def get_scan_status(scan_id: int):
total_flights=row[10],
error_message=row[11],
seat_class=row[12],
adults=row[13]
adults=row[13],
scheduled_scan_id=row[14] if len(row) > 14 else None
)
except HTTPException:
@@ -1649,7 +1783,7 @@ async def get_logs(
@router_v1.get("/flights/{route_id}")
async def get_flights(route_id: str):
async def get_flights_stub(route_id: str):
"""
Get all flights for a specific route.
@@ -1659,6 +1793,348 @@ async def get_flights(route_id: str):
raise HTTPException(status_code=501, detail="Flights endpoint not yet implemented")
# =============================================================================
# Schedules
# =============================================================================
class CreateScheduleRequest(BaseModel):
"""Request body for creating or updating a scheduled scan."""
origin: str = Field(..., description="Origin airport IATA code (3 letters)")
country: str = Field(..., description="Destination country ISO code (2 letters) or comma-separated IATA codes")
window_months: int = Field(1, ge=1, le=12, description="Months of data per scan run")
seat_class: str = Field('economy', description="Seat class")
adults: int = Field(1, ge=1, le=9, description="Number of adults")
label: Optional[str] = Field(None, description="Human-readable name for this schedule")
frequency: str = Field(..., description="Recurrence: daily | weekly | monthly")
hour: int = Field(6, ge=0, le=23, description="UTC hour (023)")
minute: int = Field(0, ge=0, le=59, description="UTC minute (059)")
day_of_week: Optional[int] = Field(None, ge=0, le=6, description="Required for weekly (0=Mon)")
day_of_month: Optional[int] = Field(None, ge=1, le=28, description="Required for monthly (128)")
@validator('origin', pre=True)
def uppercase_origin(cls, v):
return v.strip().upper() if v else v
@validator('country', pre=True)
def uppercase_country(cls, v):
return v.strip().upper() if v else v
@validator('frequency')
def validate_frequency(cls, v):
if v not in ('daily', 'weekly', 'monthly'):
raise ValueError("frequency must be daily, weekly, or monthly")
return v
@validator('day_of_week', always=True)
def validate_day_of_week(cls, v, values):
if values.get('frequency') == 'weekly' and v is None:
raise ValueError("day_of_week is required when frequency is weekly")
return v
@validator('day_of_month', always=True)
def validate_day_of_month(cls, v, values):
if values.get('frequency') == 'monthly' and v is None:
raise ValueError("day_of_month is required when frequency is monthly")
return v
class UpdateScheduleRequest(BaseModel):
"""Request body for PATCH /schedules/{id}."""
enabled: Optional[bool] = None
label: Optional[str] = None
frequency: Optional[str] = None
hour: Optional[int] = Field(None, ge=0, le=23)
minute: Optional[int] = Field(None, ge=0, le=59)
day_of_week: Optional[int] = Field(None, ge=0, le=6)
day_of_month: Optional[int] = Field(None, ge=1, le=28)
window_months: Optional[int] = Field(None, ge=1, le=12)
seat_class: Optional[str] = None
adults: Optional[int] = Field(None, ge=1, le=9)
@validator('frequency')
def validate_frequency(cls, v):
if v is not None and v not in ('daily', 'weekly', 'monthly'):
raise ValueError("frequency must be daily, weekly, or monthly")
return v
class Schedule(BaseModel):
"""A recurring scheduled scan."""
id: int
origin: str
country: str
window_months: int
seat_class: str
adults: int
label: Optional[str]
frequency: str
hour: int
minute: int
day_of_week: Optional[int]
day_of_month: Optional[int]
enabled: bool
last_run_at: Optional[str]
next_run_at: str
created_at: str
recent_scan_ids: List[int]
def _row_to_schedule(row, recent_scan_ids: list) -> Schedule:
"""Convert a DB row (sqlite3.Row or tuple) to a Schedule model."""
return Schedule(
id=row['id'],
origin=row['origin'],
country=row['country'],
window_months=row['window_months'],
seat_class=row['seat_class'],
adults=row['adults'],
label=row['label'],
frequency=row['frequency'],
hour=row['hour'],
minute=row['minute'],
day_of_week=row['day_of_week'],
day_of_month=row['day_of_month'],
enabled=bool(row['enabled']),
last_run_at=row['last_run_at'],
next_run_at=row['next_run_at'],
created_at=row['created_at'],
recent_scan_ids=recent_scan_ids,
)
def _get_recent_scan_ids(conn, schedule_id: int, limit: int = 5) -> list:
rows = conn.execute("""
SELECT id FROM scans
WHERE scheduled_scan_id = ?
ORDER BY created_at DESC
LIMIT ?
""", (schedule_id, limit)).fetchall()
return [r[0] for r in rows]
@router_v1.get("/schedules", response_model=PaginatedResponse[Schedule])
async def list_schedules(
page: int = Query(1, ge=1),
limit: int = Query(20, ge=1, le=100),
):
"""List all scheduled scans with pagination."""
try:
conn = get_connection()
total = conn.execute("SELECT COUNT(*) FROM scheduled_scans").fetchone()[0]
total_pages = math.ceil(total / limit) if total > 0 else 0
offset = (page - 1) * limit
rows = conn.execute("""
SELECT * FROM scheduled_scans
ORDER BY created_at DESC
LIMIT ? OFFSET ?
""", (limit, offset)).fetchall()
items = [
_row_to_schedule(r, _get_recent_scan_ids(conn, r['id']))
for r in rows
]
conn.close()
pagination = PaginationMetadata(
page=page, limit=limit, total=total, pages=total_pages,
has_next=page < total_pages, has_prev=page > 1,
)
return PaginatedResponse(data=items, pagination=pagination)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to list schedules: {e}")
@router_v1.post("/schedules", response_model=Schedule, status_code=201)
async def create_schedule(request: CreateScheduleRequest):
"""Create a new scheduled scan."""
try:
next_run = compute_next_run(
request.frequency, request.hour, request.minute,
request.day_of_week, request.day_of_month,
)
next_run_str = next_run.strftime('%Y-%m-%d %H:%M:%S')
conn = get_connection()
conn.execute("""
INSERT INTO scheduled_scans (
origin, country, window_months, seat_class, adults,
label, frequency, hour, minute, day_of_week, day_of_month,
enabled, next_run_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 1, ?)
""", (
request.origin, request.country, request.window_months,
request.seat_class, request.adults, request.label,
request.frequency, request.hour, request.minute,
request.day_of_week, request.day_of_month, next_run_str,
))
conn.commit()
sched_id = conn.execute("SELECT last_insert_rowid()").fetchone()[0]
row = conn.execute(
"SELECT * FROM scheduled_scans WHERE id = ?", (sched_id,)
).fetchone()
result = _row_to_schedule(row, [])
conn.close()
return result
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to create schedule: {e}")
@router_v1.get("/schedules/{schedule_id}", response_model=Schedule)
async def get_schedule(schedule_id: int):
"""Get a single schedule by ID, including its last 5 scan IDs."""
try:
conn = get_connection()
row = conn.execute(
"SELECT * FROM scheduled_scans WHERE id = ?", (schedule_id,)
).fetchone()
if not row:
conn.close()
raise HTTPException(status_code=404, detail=f"Schedule not found: {schedule_id}")
recent = _get_recent_scan_ids(conn, schedule_id)
result = _row_to_schedule(row, recent)
conn.close()
return result
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to get schedule: {e}")
@router_v1.patch("/schedules/{schedule_id}", response_model=Schedule)
async def update_schedule(schedule_id: int, request: UpdateScheduleRequest):
"""Update schedule fields. Recomputes next_run_at if schedule params change."""
try:
conn = get_connection()
row = conn.execute(
"SELECT * FROM scheduled_scans WHERE id = ?", (schedule_id,)
).fetchone()
if not row:
conn.close()
raise HTTPException(status_code=404, detail=f"Schedule not found: {schedule_id}")
# Merge updates on top of existing values
frequency = request.frequency if request.frequency is not None else row['frequency']
hour = request.hour if request.hour is not None else row['hour']
minute = request.minute if request.minute is not None else row['minute']
day_of_week = request.day_of_week if request.day_of_week is not None else row['day_of_week']
day_of_month = request.day_of_month if request.day_of_month is not None else row['day_of_month']
next_run = compute_next_run(frequency, hour, minute, day_of_week, day_of_month)
next_run_str = next_run.strftime('%Y-%m-%d %H:%M:%S')
enabled_val = int(request.enabled) if request.enabled is not None else row['enabled']
label_val = request.label if request.label is not None else row['label']
wm_val = request.window_months if request.window_months is not None else row['window_months']
sc_val = request.seat_class if request.seat_class is not None else row['seat_class']
adults_val = request.adults if request.adults is not None else row['adults']
conn.execute("""
UPDATE scheduled_scans
SET enabled = ?, label = ?, frequency = ?, hour = ?, minute = ?,
day_of_week = ?, day_of_month = ?, window_months = ?,
seat_class = ?, adults = ?, next_run_at = ?
WHERE id = ?
""", (
enabled_val, label_val, frequency, hour, minute,
day_of_week, day_of_month, wm_val, sc_val, adults_val,
next_run_str, schedule_id,
))
conn.commit()
updated_row = conn.execute(
"SELECT * FROM scheduled_scans WHERE id = ?", (schedule_id,)
).fetchone()
recent = _get_recent_scan_ids(conn, schedule_id)
result = _row_to_schedule(updated_row, recent)
conn.close()
return result
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to update schedule: {e}")
@router_v1.delete("/schedules/{schedule_id}", status_code=204)
async def delete_schedule(schedule_id: int):
"""Delete a schedule. Associated scans are kept with scheduled_scan_id set to NULL."""
try:
conn = get_connection()
row = conn.execute(
"SELECT id FROM scheduled_scans WHERE id = ?", (schedule_id,)
).fetchone()
if not row:
conn.close()
raise HTTPException(status_code=404, detail=f"Schedule not found: {schedule_id}")
# Nullify FK in scans before deleting (SQLite FK cascade may not be set)
conn.execute(
"UPDATE scans SET scheduled_scan_id = NULL WHERE scheduled_scan_id = ?",
(schedule_id,)
)
conn.execute("DELETE FROM scheduled_scans WHERE id = ?", (schedule_id,))
conn.commit()
conn.close()
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to delete schedule: {e}")
@router_v1.post("/schedules/{schedule_id}/run-now")
async def run_schedule_now(schedule_id: int):
"""Trigger a scheduled scan immediately, ignoring next_run_at."""
try:
conn = get_connection()
row = conn.execute(
"SELECT * FROM scheduled_scans WHERE id = ?", (schedule_id,)
).fetchone()
if not row:
conn.close()
raise HTTPException(status_code=404, detail=f"Schedule not found: {schedule_id}")
start_date = (date.today() + timedelta(days=1)).isoformat()
end_dt = date.today() + timedelta(days=1) + timedelta(days=30 * row['window_months'])
end_date = end_dt.isoformat()
conn.execute("""
INSERT INTO scans (
origin, country, start_date, end_date,
status, seat_class, adults, scheduled_scan_id
) VALUES (?, ?, ?, ?, 'pending', ?, ?, ?)
""", (
row['origin'], row['country'], start_date, end_date,
row['seat_class'], row['adults'], schedule_id,
))
conn.commit()
scan_id = conn.execute("SELECT last_insert_rowid()").fetchone()[0]
conn.close()
start_scan_processor(scan_id)
logging.info(f"Schedule {schedule_id}: manual run-now fired scan {scan_id}")
return {"scan_id": scan_id}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to run schedule: {e}")
# =============================================================================
# Include Router (IMPORTANT!)
# =============================================================================