Files
pngx-sync/app/scheduler.py
domverse b99dbf694d
All checks were successful
Deploy / deploy (push) Successful in 30s
feat: implement pngx-controller with Gitea CI/CD deployment
- Full FastAPI sync engine: master→replica document sync via paperless REST API
- Web UI: dashboard, replicas, logs, settings (Jinja2 + HTMX + Pico CSS)
- APScheduler background sync, SSE live log stream, Prometheus metrics
- Fernet encryption for API tokens at rest
- pngx.env credential file: written on save, pre-fills forms on load
- Dockerfile with layer-cached uv build, Python healthcheck
- docker-compose with host networking for Tailscale access
- Gitea Actions workflow: version bump, secret injection, docker compose deploy

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-22 17:59:25 +01:00

64 lines
1.5 KiB
Python

import asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler
# Module-wide scheduler instance shared by the functions below; stays None
# until start_scheduler() assigns one.
_scheduler: AsyncIOScheduler | None = None
# Fallback value for each known setting, keyed by setting name.  Every value
# is stored as a string; get_setting_int() converts numeric ones on read.
SETTINGS_DEFAULTS: dict[str, str] = {
    # Intervals, retention and timeouts (numeric strings).
    "sync_interval_seconds": "900",
    "log_retention_days": "90",
    "sync_cycle_timeout_seconds": "1800",
    "task_poll_timeout_seconds": "600",
    # Numeric limits.
    "replica_suspend_threshold": "5",
    "max_concurrent_requests": "4",
    # alert_* settings; the three target fields default to the empty string.
    "alert_target_type": "",
    "alert_target_url": "",
    "alert_target_token": "",
    "alert_error_threshold": "5",
    "alert_cooldown_seconds": "3600",
}
def get_setting(settings: dict, key: str) -> str:
    """Return the stored value for *key*, falling back to its default.

    A missing key and a falsy stored value (``None``, ``""``) are treated the
    same way: both yield the entry from ``SETTINGS_DEFAULTS``, or ``""`` when
    *key* has no default either.
    """
    stored = settings.get(key)
    if stored:
        return stored
    return SETTINGS_DEFAULTS.get(key, "")
def get_setting_int(settings: dict, key: str) -> int:
    """Return the setting *key* converted to an integer.

    The value is resolved through ``get_setting``; if that yields an empty
    result the entry in ``SETTINGS_DEFAULTS`` is used directly.

    Raises:
        KeyError: the resolved value is empty and *key* is not present in
            ``SETTINGS_DEFAULTS``.
        ValueError: the resolved value cannot be parsed by ``int()``.
    """
    raw = get_setting(settings, key)
    if not raw:
        # Direct indexing (not .get) so an unknown key fails loudly.
        raw = SETTINGS_DEFAULTS[key]
    return int(raw)
async def _sync_job() -> None:
    """Run one sync cycle, tagged as triggered by the scheduler."""
    # Imported at call time rather than at module load.
    # NOTE(review): presumably this avoids a circular import with the sync
    # engine — confirm before moving it to the top of the file.
    from .sync.engine import run_sync_cycle
    await run_sync_cycle(triggered_by="scheduler")
def start_scheduler(interval_seconds: int = 900) -> AsyncIOScheduler:
    """Create, start and return the scheduler that runs the periodic sync.

    The new scheduler is stored in the module-level ``_scheduler`` so that
    ``reschedule()`` and ``stop_scheduler()`` can reach it.  If an earlier
    call left a scheduler running, it is shut down first; otherwise the old
    instance would be orphaned and keep firing its own copy of the job.

    Args:
        interval_seconds: Seconds between two runs of the sync job.

    Returns:
        The newly started scheduler.
    """
    global _scheduler

    # ``replace_existing`` only de-duplicates jobs within one scheduler, and a
    # fresh scheduler is built on every call, so the previous instance has to
    # be stopped explicitly.
    previous = _scheduler
    if previous is not None and previous.running:
        previous.shutdown(wait=False)

    _scheduler = AsyncIOScheduler()
    _scheduler.add_job(
        _sync_job,
        "interval",
        seconds=interval_seconds,
        id="sync_job",
        replace_existing=True,
        # At most one instance of the job may run at a time.
        max_instances=1,
    )
    _scheduler.start()
    return _scheduler
def reschedule(interval_seconds: int) -> None:
    """Switch the ``sync_job`` job to a new interval.

    Does nothing when no scheduler has been assigned to ``_scheduler``.

    Args:
        interval_seconds: New number of seconds between two runs.
    """
    active = _scheduler
    if active is not None:
        active.reschedule_job(
            "sync_job",
            trigger="interval",
            seconds=interval_seconds,
        )
def stop_scheduler() -> None:
    """Shut down the module-level scheduler, if there is one.

    Safe to call more than once, or before ``start_scheduler()``.  The
    module-level reference is cleared so that a later ``reschedule()`` is a
    no-op instead of operating on a scheduler that has been shut down.
    """
    global _scheduler

    scheduler, _scheduler = _scheduler, None
    # shutdown() raises SchedulerNotRunningError on a stopped scheduler, so
    # only call it while the scheduler reports itself as running.
    if scheduler is not None and scheduler.running:
        # wait=False: do not block on jobs that are currently executing.
        scheduler.shutdown(wait=False)