Add sync engine, web UI, Docker setup, and tests

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Claude
2026-03-07 20:54:59 +01:00
parent e4c69efd12
commit b3137a8af3
27 changed files with 7133 additions and 278 deletions

View File

@@ -30,14 +30,24 @@ from typing import Dict, List, Optional, Tuple
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from tqdm import tqdm
# Logging handler that routes every record through tqdm so that active
# progress bars are redrawn instead of being visually corrupted.
class TqdmLoggingHandler(logging.Handler):
    """Emit log records via tqdm.write() to avoid breaking progress bars."""

    def emit(self, record):
        """Format *record* and print it through tqdm's bar-aware writer."""
        try:
            tqdm.write(self.format(record), file=sys.stdout)
        except Exception:
            # Never raise from emit(); hand off to logging's error machinery.
            self.handleError(record)
# Root logging setup — fallback format for any logger that still propagates.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s | %(levelname)-8s | %(message)s',
    datefmt='%H:%M:%S'
)

# Dedicated importer logger: emits only through the tqdm-aware handler so
# messages never tear an active progress bar.
logger = logging.getLogger('outline_import')
logger.setLevel(logging.INFO)
handler = TqdmLoggingHandler()
handler.setFormatter(
    logging.Formatter(
        '%(asctime)s | %(levelname)-8s | %(message)s',
        datefmt='%H:%M:%S',
    )
)
logger.addHandler(handler)
logger.propagate = False  # suppress duplicate lines via the root handler
class TreePrinter:
@@ -132,6 +142,17 @@ class OutlineImporter:
# Track existing collections
self.existing_collections: Dict[str, str] = {} # name -> id
# Track imported collection for verification
self.imported_collection_id: Optional[str] = None
self.imported_collection_name: Optional[str] = None
# Adaptive rate limiting
self.current_rate_delay = rate_limit_delay
self.min_rate_delay = 0.2
self.max_rate_delay = 5.0
self.rate_increase_factor = 1.5 # Increase delay by 50% on 429
self.rate_decrease_factor = 0.9 # Decrease delay by 10% on success
# Statistics
self.stats = {
"collections_created": 0,
@@ -145,23 +166,103 @@ class OutlineImporter:
# Error tracking
self.errors: List[Dict] = []
# Source metadata for verification
self.source_docs: List[Dict] = []
# Progress bar (initialized in import_all)
self.pbar: Optional[tqdm] = None
def _output(self, message: str, end: str = "\n") -> None:
    """Print *message* without corrupting an active tqdm progress bar."""
    if self.pbar is None:
        # No bar active: plain stdout write.
        print(message, end=end, flush=True)
    elif end == "":
        # tqdm.write() always appends a newline, so inline (end="") text is
        # surfaced as the bar's description instead of a separate line.
        self.pbar.set_description(message.strip())
    else:
        tqdm.write(message, file=sys.stdout)
def _update_progress(self) -> None:
    """Advance the progress bar by one document, if a bar is active."""
    if self.pbar is None:
        return
    self.pbar.update(1)
def _count_all_documents(self, source_collections: List[Path]) -> int:
    """Return the total expected document count across all collections.

    Collections without readable metadata contribute nothing. In single
    mode each collection also gets one synthetic parent document, so it
    contributes an extra +1.
    """
    extra_per_collection = 1 if self.single_mode else 0
    total = 0
    for directory in source_collections:
        meta = self.load_collection_metadata(directory)
        if not meta:
            continue
        total += meta.get("expected_count", 0) + extra_per_collection
    return total
def _parse_retry_after(self, response: requests.Response) -> Optional[float]:
    """
    Parse the Retry-After header from *response*.

    The header can be in two formats:
    1. Seconds (number): "Retry-After: 120"
    2. HTTP date: "Retry-After: Wed, 21 Oct 2015 07:28:00 GMT"

    Args:
        response: HTTP response object

    Returns:
        Non-negative number of seconds to wait, or None if the header is
        absent or unparseable.
    """
    import math

    retry_after = response.headers.get("Retry-After")
    if not retry_after:
        return None

    # Try parsing as a number (seconds) - handles both int and float.
    try:
        seconds = float(retry_after)
    except ValueError:
        pass
    else:
        # Guard against nonsensical values: a negative delay would make the
        # caller's time.sleep() raise ValueError, and inf/nan would poison
        # the adaptive delay math or stall the importer forever.
        if not math.isfinite(seconds):
            logger.warning(f"Ignoring non-finite Retry-After header '{retry_after}'")
            return None
        seconds = max(0.0, seconds)
        logger.debug(f"Retry-After header: {seconds:.1f} seconds")
        return seconds

    # Try parsing as an HTTP date.
    try:
        from email.utils import parsedate_to_datetime
        retry_date = parsedate_to_datetime(retry_after)
        now = datetime.now(retry_date.tzinfo)
        delta = (retry_date - now).total_seconds()
        # A date in the past still means "retry"; wait at least 1 second.
        wait_seconds = max(1.0, delta)
        logger.debug(f"Retry-After header (HTTP date): wait {wait_seconds:.1f}s until {retry_after}")
        return wait_seconds
    except (ValueError, TypeError) as e:
        logger.warning(f"Could not parse Retry-After header '{retry_after}': {e}")
        return None
def _api_request(
self,
endpoint: str,
data: Optional[Dict] = None,
method: str = "POST"
method: str = "POST",
is_write: bool = False,
context: str = None
) -> Optional[Dict]:
"""
Make API request with error handling and retry logic.
Make API request with error handling, retry logic, and adaptive rate limiting.
Args:
endpoint: API endpoint path (e.g., '/api/collections.list')
data: Request body data
method: HTTP method (POST or GET)
is_write: Whether this is a write operation (applies rate limiting)
context: Optional context string for logging (e.g., document title)
Returns:
Response data dict or None on failure
"""
log_context = f" [{context}]" if context else ""
url = f"{self.base_url}{endpoint}"
for attempt in range(self.retry_attempts):
@@ -181,19 +282,78 @@ class OutlineImporter:
)
if response.status_code == 200:
return response.json()
elif response.status_code in [429, 500, 502, 503, 504]:
if attempt < self.retry_attempts - 1:
wait_time = self.retry_delay * (2 ** attempt)
logger.warning(
f"API error {response.status_code} on {endpoint}, "
f"retrying in {wait_time:.1f}s (attempt {attempt + 1}/{self.retry_attempts})"
# Success - gradually decrease rate delay for write operations
if is_write:
old_delay = self.current_rate_delay
self.current_rate_delay = max(
self.min_rate_delay,
self.current_rate_delay * self.rate_decrease_factor
)
if old_delay != self.current_rate_delay:
logger.debug(f"Rate delay decreased: {old_delay:.2f}s → {self.current_rate_delay:.2f}s")
return response.json()
elif response.status_code == 429:
# Rate limited - check Retry-After header
retry_after = self._parse_retry_after(response)
# Increase adaptive delay based on Retry-After or multiplicative factor
old_delay = self.current_rate_delay
if retry_after is not None:
# Use Retry-After to inform spacing: divide window by ~10 requests
# e.g., 20s Retry-After → 2s delay between requests
informed_delay = retry_after / 10.0
self.current_rate_delay = min(
self.max_rate_delay,
max(self.current_rate_delay, informed_delay)
)
else:
self.current_rate_delay = min(
self.max_rate_delay,
self.current_rate_delay * self.rate_increase_factor
)
if old_delay != self.current_rate_delay:
logger.info(f"Rate delay adjusted: {old_delay:.2f}s → {self.current_rate_delay:.2f}s")
if attempt < self.retry_attempts - 1:
# Use Retry-After if provided, otherwise fall back to exponential backoff
if retry_after is not None:
wait_time = retry_after
logger.warning(
f"Rate limited (429){log_context}, server requested {wait_time:.1f}s wait via Retry-After "
f"(attempt {attempt + 1}/{self.retry_attempts})"
)
else:
wait_time = self.current_rate_delay * (2 ** attempt)
logger.warning(
f"Rate limited (429){log_context}, waiting {wait_time:.1f}s "
f"(attempt {attempt + 1}/{self.retry_attempts}, delay now {self.current_rate_delay:.1f}s)"
)
time.sleep(wait_time)
continue
elif response.status_code in [500, 502, 503, 504]:
if attempt < self.retry_attempts - 1:
# Check for Retry-After header (common with 503)
retry_after = self._parse_retry_after(response)
if retry_after is not None:
wait_time = retry_after
logger.warning(
f"API error {response.status_code}{log_context}, "
f"server requested {wait_time:.1f}s wait via Retry-After "
f"(attempt {attempt + 1}/{self.retry_attempts})"
)
else:
wait_time = self.retry_delay * (2 ** attempt)
logger.warning(
f"API error {response.status_code}{log_context}, "
f"retrying in {wait_time:.1f}s (attempt {attempt + 1}/{self.retry_attempts})"
)
time.sleep(wait_time)
continue
# Non-retryable error or final attempt
logger.error(f"API error on {endpoint}: HTTP {response.status_code}")
logger.error(f"API error{log_context}: HTTP {response.status_code}")
logger.debug(f"Response: {response.text[:200]}")
return None
@@ -262,7 +422,7 @@ class OutlineImporter:
result = self._api_request("/api/collections.create", {
"name": name,
"permission": permission
})
}, is_write=True, context=f"collection:{name}")
if result and "data" in result:
collection_id = result["data"]["id"]
@@ -323,11 +483,11 @@ class OutlineImporter:
if parent_document_id:
data["parentDocumentId"] = parent_document_id
# Rate limiting
if self.rate_limit_delay > 0:
time.sleep(self.rate_limit_delay)
# Adaptive rate limiting - uses current_rate_delay which increases after 429s
if self.current_rate_delay > 0:
time.sleep(self.current_rate_delay)
result = self._api_request("/api/documents.create", data)
result = self._api_request("/api/documents.create", data, is_write=True, context=title)
if result and "data" in result:
return result["data"]["id"]
@@ -531,24 +691,24 @@ class OutlineImporter:
# Check if collection exists
if collection_name in self.existing_collections:
if self.force:
print(f" Deleting existing collection \"{collection_name}\"...")
self._output(f" Deleting existing collection \"{collection_name}\"...")
if not self.dry_run:
self._delete_collection(self.existing_collections[collection_name])
del self.existing_collections[collection_name]
else:
print(f" Collection exists, skipping...")
self._output(f" Collection exists, skipping...")
self.stats["collections_skipped"] += 1
return (0, doc_count, 0)
# Create collection
if self.dry_run:
print(f" [DRY RUN] Would create collection \"{collection_name}\"")
self._output(f" [DRY RUN] Would create collection \"{collection_name}\"")
collection_id = "dry-run-collection-id"
else:
print(f" Creating collection...", end=" ")
self._output(f" Creating collection... ", end="")
collection_id = self._create_collection(collection_name)
if not collection_id:
print("✗ failed")
self._output("✗ failed")
self.stats["collections_errors"] += 1
self.errors.append({
"type": "collection",
@@ -556,7 +716,7 @@ class OutlineImporter:
"error": "Failed to create collection"
})
return (0, 0, 1)
print(f"✓ (id: {collection_id[:8]}...)")
self._output(f"✓ (id: {collection_id[:8]}...)")
self.stats["collections_created"] += 1
@@ -596,7 +756,8 @@ class OutlineImporter:
content = self.read_document_content(collection_dir, filename)
if content is None:
line = TreePrinter.format_line(title, "error", "file not found", prefix + connector)
print(line)
self._output(line)
self._update_progress()
errors += 1
self.stats["documents_errors"] += 1
self.errors.append({
@@ -608,13 +769,14 @@ class OutlineImporter:
# Skip children if parent failed
if children:
child_prefix = prefix + (TreePrinter.BLANK if is_last else TreePrinter.PIPE)
print(f"{child_prefix}└── (children skipped due to parent failure)")
self._output(f"{child_prefix}└── (children skipped due to parent failure)")
continue
# Create document
if self.dry_run:
line = TreePrinter.format_line(title, "dry_run", prefix=prefix + connector)
print(line)
self._output(line)
self._update_progress()
self.id_map[old_id] = f"dry-run-{old_id}"
created += 1
self.stats["documents_created"] += 1
@@ -629,12 +791,14 @@ class OutlineImporter:
if new_id:
self.id_map[old_id] = new_id
line = TreePrinter.format_line(title, "created", prefix=prefix + connector)
print(line)
self._output(line)
self._update_progress()
created += 1
self.stats["documents_created"] += 1
else:
line = TreePrinter.format_line(title, "error", "API error", prefix + connector)
print(line)
self._output(line)
self._update_progress()
errors += 1
self.stats["documents_errors"] += 1
self.errors.append({
@@ -646,7 +810,7 @@ class OutlineImporter:
# Skip children if parent failed
if children:
child_prefix = prefix + (TreePrinter.BLANK if is_last else TreePrinter.PIPE)
print(f"{child_prefix}└── (children skipped due to parent failure)")
self._output(f"{child_prefix}└── (children skipped due to parent failure)")
continue
# Process children recursively
@@ -712,58 +876,82 @@ class OutlineImporter:
logger.error("No collections found in source directory")
return
if self.single_mode:
# Single collection mode
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
single_collection_name = f"import_{timestamp}"
# Count total documents and initialize progress bar
total_docs = self._count_all_documents(source_collections)
print(f"Total documents to import: {total_docs}")
print()
logger.info(f"Creating single collection: {single_collection_name}")
collection_id = self._create_collection(single_collection_name)
if not collection_id and not self.dry_run:
logger.error("Failed to create import collection")
return
self.pbar = tqdm(
total=total_docs,
desc="Importing",
unit="doc",
dynamic_ncols=True,
file=sys.stdout,
leave=True,
mininterval=1.0, # Update at most once per second
bar_format="{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]"
)
self.stats["collections_created"] += 1
try:
if self.single_mode:
# Single collection mode
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
single_collection_name = f"import_{timestamp}"
for collection_dir in source_collections:
metadata = self.load_collection_metadata(collection_dir)
if not metadata:
continue
logger.info(f"Creating single collection: {single_collection_name}")
collection_id = self._create_collection(single_collection_name)
if not collection_id and not self.dry_run:
logger.error("Failed to create import collection")
return
collection_name = metadata.get("name", collection_dir.name)
doc_count = metadata.get("expected_count", 0)
self.stats["collections_created"] += 1
print(f"\n{collection_name}/ ({doc_count} documents)")
for collection_dir in source_collections:
metadata = self.load_collection_metadata(collection_dir)
if not metadata:
continue
# Create parent document for this "collection"
parent_doc_id = self._create_document(
collection_id,
collection_name,
f"# {collection_name}\n\nImported collection.",
parent_document_id=None
)
collection_name = metadata.get("name", collection_dir.name)
doc_count = metadata.get("expected_count", 0)
if parent_doc_id:
self.stats["documents_created"] += 1
self._output(f"\n{collection_name}/ ({doc_count} documents)")
# Import documents under this parent
self.import_collection(
collection_dir,
target_collection_id=collection_id,
parent_document_id=parent_doc_id
)
else:
# Standard mode: one collection per folder
for collection_dir in source_collections:
metadata = self.load_collection_metadata(collection_dir)
if not metadata:
continue
# Create parent document for this "collection"
parent_doc_id = self._create_document(
collection_id,
collection_name,
f"# {collection_name}\n\nImported collection.",
parent_document_id=None
)
collection_name = metadata.get("name", collection_dir.name)
doc_count = metadata.get("expected_count", 0)
if parent_doc_id:
self.stats["documents_created"] += 1
self._update_progress()
print(f"\n{collection_name}/ ({doc_count} documents)")
self.import_collection(collection_dir)
# Import documents under this parent
self.import_collection(
collection_dir,
target_collection_id=collection_id,
parent_document_id=parent_doc_id
)
else:
# Standard mode: one collection per folder
for collection_dir in source_collections:
metadata = self.load_collection_metadata(collection_dir)
if not metadata:
continue
collection_name = metadata.get("name", collection_dir.name)
doc_count = metadata.get("expected_count", 0)
self._output(f"\n{collection_name}/ ({doc_count} documents)")
self.import_collection(collection_dir)
finally:
# Close progress bar
if self.pbar:
self.pbar.close()
self.pbar = None
# Print summary
duration = time.time() - start_time