Files
outline-sync/outline_export_fixed.py
Claude d9161f64f5 Initial commit: Export tools and import script requirements
- export_with_trees.sh: Bash wrapper for Outline export
- outline_export_fixed.py: Python export implementation
- IMPORT_SCRIPT.MD: PRD for import script (to be built)
- RALPH_PROMPT.md: Ralph Loop prompt for building import script
- CLAUDE.md: Project documentation

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-19 22:33:55 +01:00

1032 lines
38 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Outline API Export Script - Enhanced Version
Exports all collections, documents, and their hierarchy from Outline wiki.
Reads configuration from settings.json in the current directory.
Improvements:
- Failed document tracking with detailed error reports
- Document caching to eliminate double API fetching
- Proper timeout configuration
- Depth limit protection for deep hierarchies
- Enhanced verification comparing with API counts
- Tree view visualization (before and after export)
- Recursive document counting for accurate verification
- Proper logging system with configurable levels
"""
import os
import sys
import json
import hashlib
import logging
import time
from datetime import datetime
from functools import wraps
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple, Callable, TypeVar
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
T = TypeVar('T')  # Generic return type preserved by the retry decorator below.
def retry_on_failure(max_attempts: int = 3, backoff_factor: float = 1.0,
                     exceptions: tuple = (requests.RequestException,)) -> Callable:
    """
    Build a decorator that retries the wrapped callable with exponential backoff.

    Args:
        max_attempts: Total number of attempts before giving up.
        backoff_factor: Base multiplier for the wait (wait = backoff_factor * 2^attempt).
        exceptions: Exception classes that trigger a retry.
    """
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @wraps(func)
        def wrapper(*args, **kwargs) -> T:
            failure = None
            attempt = 0
            while attempt < max_attempts:
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    failure = e
                    if attempt < max_attempts - 1:
                        # Exponential backoff before the next attempt.
                        wait_time = backoff_factor * (2 ** attempt)
                        logger.warning(f"Attempt {attempt + 1}/{max_attempts} failed: {e}. "
                                       f"Retrying in {wait_time:.1f}s...")
                        time.sleep(wait_time)
                    else:
                        # Out of attempts: surface the last exception to the caller.
                        logger.error(f"All {max_attempts} attempts failed for {func.__name__}")
                        raise failure
                attempt += 1
        return wrapper
    return decorator
# Configure logging
# Root logging config: timestamped, padded level names (e.g. "22:33:55 | INFO     | msg").
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s | %(levelname)-8s | %(message)s',
    datefmt='%H:%M:%S'
)
# Module-wide logger shared by the retry decorator and both classes below.
logger = logging.getLogger('outline_export')
# Try to import tqdm for progress bars
# tqdm is optional: when missing, progress bars are simply disabled (HAS_TQDM=False).
try:
    from tqdm import tqdm
    HAS_TQDM = True
except ImportError:
    HAS_TQDM = False
    logger.info("Install tqdm for progress bars: pip install tqdm")
class TreeViewGenerator:
    """Generate ASCII tree views of document hierarchies.

    FIXED (review):
    - generate_from_files appended the literal string "(unknown)" while the
      computed `filename` variable went unused; the actual filename is now shown.
    - Children inherited an empty prefix from root nodes, so the
      `prefix == ""` test suppressed connectors at every depth and no tree
      characters were ever drawn; both walkers now track recursion depth and
      render proper connectors below the root level.
    """

    @staticmethod
    def generate_from_api(nav_nodes: List[Dict], prefix: str = "", is_last: bool = True,
                          _depth: int = 0) -> List[str]:
        """Generate tree view lines from the API navigation structure.

        Args:
            nav_nodes: Navigation nodes ({"id", "title", "children"} dicts).
            prefix: Leading characters inherited from ancestor levels.
            is_last: Kept for backward compatibility with existing callers.
            _depth: Internal recursion depth; 0 = root level (no connector).
        """
        lines = []
        for i, node in enumerate(nav_nodes):
            is_last_node = (i == len(nav_nodes) - 1)
            # Root-level entries carry no connector; deeper levels get tree characters.
            if _depth == 0:
                connector = ""
            else:
                connector = "└── " if is_last_node else "├── "
            title = node.get("title", "Untitled")
            doc_id = node.get("id", "")[:8]  # Short ID for display
            lines.append(f"{prefix}{connector}{title} ({doc_id}...)")
            # Recurse into children, extending the prefix below root level.
            children = node.get("children", [])
            if children:
                if _depth == 0:
                    child_prefix = prefix
                else:
                    child_prefix = prefix + ("    " if is_last_node else "│   ")
                lines.extend(TreeViewGenerator.generate_from_api(
                    children, child_prefix, is_last_node, _depth + 1))
        return lines

    @staticmethod
    def generate_from_files(collection_path: Path, metadata: Dict) -> List[str]:
        """Generate tree view lines from exported collection metadata.

        Args:
            collection_path: Exported collection directory (unused; kept for
                interface compatibility with existing callers).
            metadata: Collection metadata containing the "documents" hierarchy.
        """
        def build_tree_recursive(docs: List[Dict], prefix: str = "", _depth: int = 0) -> List[str]:
            tree_lines = []
            for i, doc in enumerate(docs):
                is_last_node = (i == len(docs) - 1)
                if _depth == 0:
                    connector = ""
                else:
                    connector = "└── " if is_last_node else "├── "
                filename = doc.get("filename", "Unknown")
                # FIXED: show the actual filename (was the literal "(unknown)").
                tree_lines.append(f"{prefix}{connector}{filename}")
                children = doc.get("children", [])
                if children:
                    if _depth == 0:
                        child_prefix = prefix
                    else:
                        child_prefix = prefix + ("    " if is_last_node else "│   ")
                    tree_lines.extend(build_tree_recursive(children, child_prefix, _depth + 1))
            return tree_lines

        return build_tree_recursive(metadata.get("documents", []))

    @staticmethod
    def print_comparison(online_tree: List[str], exported_tree: List[str], collection_name: str):
        """Log a line-count comparison between online and exported structures."""
        logger.info(f"--- Comparison for '{collection_name}' ---")
        logger.info(f"Online documents: {len(online_tree)}")
        logger.info(f"Exported files: {len(exported_tree)}")
        if len(online_tree) == len(exported_tree):
            logger.info("Counts match!")
        else:
            diff = abs(len(online_tree) - len(exported_tree))
            logger.warning(f"Difference: {diff}")
class OutlineExporter:
    """Export Outline documents with enhanced error tracking and verification"""

    def __init__(self, base_url: str, api_token: str, output_dir: str = "exports",
                 verify_after_export: bool = True, max_hierarchy_depth: int = 100,
                 show_progress: bool = True, generate_manifests: bool = True,
                 max_retries: int = 3, retry_backoff: float = 1.0):
        """Configure the exporter.

        Args:
            base_url: Outline instance URL (trailing slash is stripped).
            api_token: API bearer token.
            output_dir: Directory that receives the exported files.
            verify_after_export: Run integrity verification after exporting.
            max_hierarchy_depth: Safety cap for nested document recursion.
            show_progress: Show tqdm progress bars (effective only if tqdm is installed).
            generate_manifests: Write manifest.json and per-collection metadata.
            max_retries: Attempts per API request in make_request().
            retry_backoff: Base for exponential backoff between retries.
        """
        self.base_url = base_url.rstrip('/')
        self.api_token = api_token
        self.output_dir = Path(output_dir)
        self.verify_after_export = verify_after_export
        self.max_hierarchy_depth = max_hierarchy_depth
        self.show_progress = show_progress and HAS_TQDM
        self.generate_manifests = generate_manifests
        self.max_retries = max_retries
        self.retry_backoff = retry_backoff

        # HTTP session with transport-level retries for flaky status codes.
        self.session = requests.Session()
        adapter = HTTPAdapter(max_retries=Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
        ))
        for scheme in ("http://", "https://"):
            self.session.mount(scheme, adapter)
        self.headers = {
            "Authorization": f"Bearer {self.api_token}",
            "Content-Type": "application/json"
        }

        # Cache of fetched documents keyed by id (avoids double API fetches).
        self.document_cache: Dict[str, Dict] = {}
        # Documents that failed to fetch or write, with failure details.
        self.failed_documents: List[Dict] = []
        # Raw API-level errors (status codes, exceptions).
        self.api_errors: List[Dict] = []
        # Per-collection expected/exported counts for verification.
        self.collection_stats: Dict[str, Dict] = {}

        # Manifest describing everything exported in this run.
        self.manifest = {
            "export_date": datetime.now().isoformat(),
            "source_url": self.base_url,
            "collections": [],
            "documents": [],
            "failed_documents": [],
            "statistics": {}
        }
        # Running counters for the final summary.
        self.stats = {
            "collections": 0,
            "documents": 0,
            "bytes_written": 0,
            "failed": 0,
            "api_errors": 0
        }
    def make_request(self, endpoint: str, data: Dict = None, method: str = "POST",
                     retry: bool = True) -> Optional[Dict]:
        """Make API request with error handling and optional retry.

        Args:
            endpoint: API endpoint path
            data: Request body data
            method: HTTP method (POST or GET)
            retry: Whether to retry on failure (default True)

        Returns:
            Parsed JSON body on HTTP 200; otherwise None. Every terminal
            failure is appended to self.api_errors and counted in
            self.stats["api_errors"].
        """
        url = f"{self.base_url}{endpoint}"
        last_error = None  # NOTE(review): assigned below but never read after the loop
        # Single attempt when retry is disabled; otherwise up to max_retries.
        attempts = self.max_retries if retry else 1
        for attempt in range(attempts):
            try:
                if method == "POST":
                    response = self.session.post(url, headers=self.headers, json=data or {}, timeout=30)
                else:
                    response = self.session.get(url, headers=self.headers, timeout=30)
                if response.status_code == 200:
                    return response.json()
                elif response.status_code in [429, 500, 502, 503, 504] and attempt < attempts - 1:
                    # Retryable error: exponential backoff (retry_backoff * 2^attempt seconds).
                    wait_time = self.retry_backoff * (2 ** attempt)
                    logger.warning(f"API error {response.status_code} on {endpoint}, "
                                   f"retrying in {wait_time:.1f}s (attempt {attempt + 1}/{attempts})")
                    time.sleep(wait_time)
                    continue
                else:
                    # Non-retryable error or final attempt
                    error_info = {
                        "endpoint": endpoint,
                        "status_code": response.status_code,
                        "error": response.text[:200],
                        "timestamp": datetime.now().isoformat()
                    }
                    self.api_errors.append(error_info)
                    self.stats["api_errors"] += 1
                    logger.error(f"API error on {endpoint}: HTTP {response.status_code}")
                    return None
            except requests.RequestException as e:
                # Network-level failure (connection/timeout): retry with backoff.
                last_error = e
                if attempt < attempts - 1:
                    wait_time = self.retry_backoff * (2 ** attempt)
                    logger.warning(f"Request failed on {endpoint}: {e}, "
                                   f"retrying in {wait_time:.1f}s (attempt {attempt + 1}/{attempts})")
                    time.sleep(wait_time)
                else:
                    error_info = {
                        "endpoint": endpoint,
                        "error": str(e),
                        "timestamp": datetime.now().isoformat()
                    }
                    self.api_errors.append(error_info)
                    self.stats["api_errors"] += 1
                    logger.error(f"All {attempts} attempts failed on {endpoint}: {e}")
                    return None
            except Exception as e:
                # Unexpected (non-requests) exception: record and bail out without retrying.
                error_info = {
                    "endpoint": endpoint,
                    "error": str(e),
                    "timestamp": datetime.now().isoformat()
                }
                self.api_errors.append(error_info)
                self.stats["api_errors"] += 1
                logger.exception(f"Unexpected exception on {endpoint}: {e}")
                return None
        # Defensive fallthrough (reached only if the loop exits without returning).
        return None
def health_check(self) -> bool:
"""
Verify API connectivity and authentication before export.
Returns:
True if API is accessible and authenticated, False otherwise
"""
logger.info("Performing health check...")
# Test API connectivity with auth.info endpoint
try:
result = self.make_request("/api/auth.info", retry=False)
if result and "data" in result:
user = result["data"].get("user", {})
team = result["data"].get("team", {})
logger.info(f"Authenticated as: {user.get('name', 'Unknown')} ({user.get('email', 'N/A')})")
logger.info(f"Team: {team.get('name', 'Unknown')}")
logger.info("Health check passed")
return True
else:
logger.error("Health check failed: Unable to verify authentication")
return False
except Exception as e:
logger.error(f"Health check failed: {e}")
return False
def get_collections(self) -> List[Dict]:
"""Fetch all collections"""
logger.info("Fetching collections...")
result = self.make_request("/api/collections.list")
if result and "data" in result:
collections = result["data"]
logger.info(f"Found {len(collections)} collections")
return collections
return []
def get_documents_in_collection(self, collection_id: str) -> Tuple[List[Dict], List[Dict]]:
"""
Fetch all documents in a collection
Returns: (list of documents, navigation tree)
"""
result = self.make_request("/api/documents.list", {"collectionId": collection_id})
documents = []
if result and "data" in result:
documents = result["data"]
# Also get navigation tree for hierarchy
nav_result = self.make_request("/api/collections.documents", {"id": collection_id})
nav_tree = []
if nav_result and "data" in nav_result:
nav_tree = nav_result["data"]
return documents, nav_tree
def get_document_info(self, doc_id: str) -> Optional[Dict]:
"""
Fetch full document content
FIXED: Uses cache to avoid double fetching
"""
# Check cache first
if doc_id in self.document_cache:
return self.document_cache[doc_id]
result = self.make_request("/api/documents.info", {"id": doc_id})
if result and "data" in result:
doc = result["data"]
# Cache the document
self.document_cache[doc_id] = doc
return doc
return None
def sanitize_filename(self, name: str) -> str:
"""Convert document title to safe filename"""
# Replace invalid characters
invalid_chars = '<>:"/\\|?*'
for char in invalid_chars:
name = name.replace(char, '_')
# Limit length
if len(name) > 200:
name = name[:200]
return name.strip()
def calculate_checksum(self, content: str) -> str:
"""Calculate SHA256 checksum of content"""
return hashlib.sha256(content.encode('utf-8')).hexdigest()
def build_hierarchy(self, documents: List[Dict], nav_tree: List[Dict]) -> Dict:
"""
Build hierarchy mapping from navigation tree
FIXED: Export directly from nav_tree, don't rely on documents list
"""
hierarchy = {
"root": [],
"children": {},
"all_ids": set() # Track all document IDs we've seen
}
def process_nav_node(node: Dict, parent_id: Optional[str] = None):
doc_id = node["id"]
doc_title = node.get("title", "Untitled")
# Track this ID
hierarchy["all_ids"].add(doc_id)
# Create a minimal document dict from nav node
# We'll fetch full content during export
doc_data = {
"id": doc_id,
"title": doc_title,
"parentDocumentId": parent_id
}
# Add to hierarchy
if parent_id is None:
hierarchy["root"].append(doc_data)
else:
if parent_id not in hierarchy["children"]:
hierarchy["children"][parent_id] = []
hierarchy["children"][parent_id].append(doc_data)
# Process children recursively
children = node.get("children", [])
for child in children:
process_nav_node(child, doc_id)
for root_node in nav_tree:
process_nav_node(root_node)
return hierarchy
def export_document(self, document: Dict, collection_name: str, collection_path: Path,
hierarchy: Dict, level: int = 0) -> Optional[Dict]:
"""
Export a single document and its children recursively
FIXED: Enhanced error tracking and failed children tracking
"""
doc_id = document["id"]
doc_title = document.get("title", "Untitled")
if level == 0:
logger.debug(f"Exporting: {doc_title}")
# Fetch full document content (uses cache, so no double fetching)
full_doc = self.get_document_info(doc_id)
if not full_doc:
# FIXED: Track failed documents with details
self.failed_documents.append({
"id": doc_id,
"title": doc_title,
"collection": collection_name,
"reason": "Failed to fetch document info from API",
"level": level
})
logger.warning(f"Failed to fetch document: {doc_title} (ID: {doc_id})")
return None
# Generate filename
safe_title = self.sanitize_filename(doc_title)
filename = f"{safe_title}.md"
filepath = collection_path / filename
# Handle duplicates
counter = 1
while filepath.exists():
filename = f"{safe_title}_{counter}.md"
filepath = collection_path / filename
counter += 1
# Build markdown content
content = f"# {doc_title}\n\n"
content += f"<!-- Document ID: {doc_id} -->\n"
content += f"<!-- Created: {full_doc.get('createdAt')} -->\n"
content += f"<!-- Updated: {full_doc.get('updatedAt')} -->\n"
content += f"<!-- URL: {full_doc.get('url')} -->\n\n"
content += "---\n\n"
content += full_doc.get("text", "")
# Write file
try:
with open(filepath, 'w', encoding='utf-8') as f:
f.write(content)
except Exception as e:
# Track file write failures
self.failed_documents.append({
"id": doc_id,
"title": doc_title,
"collection": collection_name,
"reason": f"Failed to write file: {e}",
"level": level
})
logger.error(f"Failed to write file for: {doc_title}")
return None
file_size = filepath.stat().st_size
self.stats["bytes_written"] += file_size
self.stats["documents"] += 1
# Calculate checksum
checksum = self.calculate_checksum(content)
# Build metadata
doc_metadata = {
"id": doc_id,
"title": doc_title,
"filename": filename,
"collection_name": collection_name,
"parent_id": document.get("parentDocumentId"),
"checksum": checksum,
"size_bytes": file_size,
"created_at": full_doc.get('createdAt'),
"updated_at": full_doc.get('updatedAt'),
"children": [],
"failed_children": [] # NEW: Track children that failed to export
}
# Add to manifest
if self.generate_manifests:
self.manifest["documents"].append(doc_metadata)
# Export children recursively
child_docs = hierarchy["children"].get(doc_id, [])
for child in child_docs:
child_metadata = self.export_document(
child, collection_name, collection_path, hierarchy, level + 1
)
if child_metadata:
doc_metadata["children"].append(child_metadata)
else:
# FIXED: Track failed children
doc_metadata["failed_children"].append({
"id": child["id"],
"title": child.get("title", "Untitled")
})
return doc_metadata
    def export_collection(self, collection: Dict) -> None:
        """Export a single collection with all its documents.

        Walks the navigation tree (the source of truth for hierarchy and
        counts), writes every document under a directory named after the
        collection, saves per-collection metadata, logs before/after tree
        views, and records expected-vs-exported counts for verify_export().
        """
        collection_id = collection["id"]
        collection_name = collection["name"]
        logger.info("=" * 60)
        logger.info(f"Exporting collection: {collection_name}")
        logger.info("=" * 60)
        # Fetch documents and navigation tree
        documents, nav_tree = self.get_documents_in_collection(collection_id)
        # Build hierarchy from navigation tree
        hierarchy = self.build_hierarchy(documents, nav_tree)
        # Count documents from nav_tree (source of truth), not documents.list:
        # the nav_tree includes ALL documents including nested ones.
        expected_count = len(hierarchy["all_ids"])
        logger.info(f"Documents in navigation tree: {expected_count}")
        if expected_count == 0:
            logger.info("No documents to export")
            # Still track this for statistics
            self.collection_stats[collection_id] = {
                "name": collection_name,
                "expected": expected_count,
                "fetched": 0,
                "exported": 0
            }
            return
        # Create collection directory (sanitized name, nested dirs allowed).
        safe_name = self.sanitize_filename(collection_name)
        collection_path = self.output_dir / safe_name
        collection_path.mkdir(parents=True, exist_ok=True)
        # Log a tree view of the ONLINE structure before exporting.
        logger.info("--- Online Structure (from Outline API) ---")
        online_tree = TreeViewGenerator.generate_from_api(nav_tree)
        for line in online_tree[:20]:  # Show first 20 lines
            logger.info(line)
        if len(online_tree) > 20:
            logger.info(f"... and {len(online_tree) - 20} more lines")
        # Prepare collection metadata
        collection_metadata = {
            "id": collection_id,
            "name": collection_name,
            "directory": safe_name,
            "expected_count": expected_count,  # From navigation tree (all nested docs)
            "documents_list_count": len(documents),  # From documents.list API
            "document_count": 0,  # Will be updated after export
            "navigation_tree": nav_tree,  # Preserve original navigation structure
            "documents": []
        }
        # Export root documents (children are exported recursively) with
        # an optional tqdm progress bar.
        root_docs = hierarchy["root"]
        if self.show_progress:
            iterator = tqdm(root_docs, desc=f" Exporting {collection_name}", leave=False)
        else:
            iterator = root_docs
        exported_count = 0
        for doc in iterator:
            doc_metadata = self.export_document(
                doc, collection_name, collection_path, hierarchy
            )
            if doc_metadata:
                collection_metadata["documents"].append(doc_metadata)
                exported_count += 1
        # Count ALL documents recursively (root docs plus nested children).
        def count_recursive(docs):
            count = 0
            for doc in docs:
                count += 1  # Count this document
                count += count_recursive(doc.get("children", []))  # Count children recursively
            return count
        actual_exported_count = count_recursive(collection_metadata["documents"])
        # Update with actual exported count
        collection_metadata["document_count"] = actual_exported_count
        # Save collection metadata
        metadata_path = collection_path / "_collection_metadata.json"
        with open(metadata_path, 'w', encoding='utf-8') as f:
            json.dump(collection_metadata, f, indent=2, ensure_ascii=False)
        # Log a tree view of the files actually exported to disk.
        logger.info("--- Exported Files (on disk) ---")
        exported_tree = TreeViewGenerator.generate_from_files(collection_path, collection_metadata)
        for line in exported_tree[:20]:  # Show first 20 lines
            logger.info(line)
        if len(exported_tree) > 20:
            logger.info(f"... and {len(exported_tree) - 20} more lines")
        # Log online-vs-exported line-count comparison.
        TreeViewGenerator.print_comparison(online_tree, exported_tree, collection_name)
        # Add to manifest
        if self.generate_manifests:
            self.manifest["collections"].append({
                "id": collection_id,
                "name": collection_name,
                "directory": safe_name,
                "expected_count": expected_count,  # From nav_tree
                "documents_list_count": len(documents),  # From API documents.list
                "exported_count": actual_exported_count  # Recursive count incl. children
            })
        # Store per-collection stats consumed later by verify_export().
        self.collection_stats[collection_id] = {
            "name": collection_name,
            "expected": expected_count,  # From nav_tree (source of truth)
            "documents_list_count": len(documents),  # From API
            "exported": actual_exported_count  # Recursive count incl. children
        }
        self.stats["collections"] += 1
        # Enhanced summary
        if actual_exported_count == expected_count:
            logger.info(f"Exported {actual_exported_count}/{expected_count} documents from '{collection_name}' - COMPLETE")
        else:
            missing = expected_count - actual_exported_count
            logger.warning(f"Exported {actual_exported_count}/{expected_count} documents from '{collection_name}' - {missing} MISSING")
def save_manifest(self) -> None:
"""Save export manifest"""
if not self.generate_manifests:
return
manifest_path = self.output_dir / "manifest.json"
with open(manifest_path, 'w', encoding='utf-8') as f:
json.dump(self.manifest, f, indent=2, ensure_ascii=False)
def save_export_metadata(self) -> None:
"""Save export metadata with statistics"""
metadata = {
"export_date": datetime.now().isoformat(),
"source_url": self.base_url,
"statistics": self.stats,
"collections": self.manifest["collections"],
"failed_documents_count": len(self.failed_documents),
"api_errors_count": len(self.api_errors)
}
metadata_path = self.output_dir / "export_metadata.json"
with open(metadata_path, 'w', encoding='utf-8') as f:
json.dump(metadata, f, indent=2, ensure_ascii=False)
def save_error_report(self) -> None:
"""Save detailed error report"""
if not self.failed_documents and not self.api_errors:
return
error_report = {
"export_date": datetime.now().isoformat(),
"failed_documents": self.failed_documents,
"api_errors": self.api_errors,
"statistics": {
"total_failed_documents": len(self.failed_documents),
"total_api_errors": len(self.api_errors)
}
}
error_path = self.output_dir / "export_errors.json"
with open(error_path, 'w', encoding='utf-8') as f:
json.dump(error_report, f, indent=2, ensure_ascii=False)
logger.warning(f"Error report saved to: {error_path}")
logger.warning(f" - {len(self.failed_documents)} failed documents")
logger.warning(f" - {len(self.api_errors)} API errors")
    def verify_export(self) -> bool:
        """Verify export integrity and completeness.

        Two passes: (1) compare per-collection exported counts against the
        counts expected from the navigation tree; (2) confirm every manifest
        file exists on disk and its SHA-256 checksum matches.

        Returns:
            True when verification passes (possibly with warnings), False on
            any critical error. Always True when manifests are disabled.
        """
        logger.info("=" * 60)
        logger.info("Verifying Export Integrity and Completeness")
        logger.info("=" * 60)
        if not self.generate_manifests:
            logger.warning("Skipping verification (manifests disabled)")
            return True
        errors = []
        warnings = []
        # Step 1 - Verify document count completeness against API expectations.
        logger.info("Step 1: Verifying document count completeness...")
        total_expected = 0
        total_exported = 0
        for coll_id, stats in self.collection_stats.items():
            expected = stats["expected"]
            exported = stats["exported"]
            total_expected += expected
            total_exported += exported
            if exported < expected:
                missing = expected - exported
                warnings.append(f"Collection '{stats['name']}': {missing} documents missing")
        if total_exported < total_expected:
            errors.append(
                f"Document count mismatch: Expected {total_expected} from API, "
                f"exported {total_exported} (missing {total_expected - total_exported})"
            )
        else:
            logger.info(f"All {total_expected} documents accounted for")
        # Step 2 - Verify files exist and checksums match
        logger.info("Step 2: Verifying file integrity...")
        file_errors = 0
        checksum_errors = 0
        if self.show_progress:
            iterator = tqdm(self.manifest["documents"], desc=" Verifying", leave=False)
        else:
            iterator = self.manifest["documents"]
        for doc in iterator:
            # NOTE(review): this joins the RAW collection name, but export
            # writes into the SANITIZED directory name — the two diverge when
            # the name contains characters sanitize_filename replaces; verify
            # against export_collection's safe_name handling.
            collection_dir = self.output_dir / doc["collection_name"]
            filepath = collection_dir / doc["filename"]
            # Check file exists
            if not filepath.exists():
                file_errors += 1
                errors.append(f"Missing file: {doc['filename']}")
                continue
            # Verify checksum
            try:
                with open(filepath, 'r', encoding='utf-8') as f:
                    content = f.read()
                actual_checksum = self.calculate_checksum(content)
                if actual_checksum != doc["checksum"]:
                    checksum_errors += 1
                    errors.append(f"Checksum mismatch: {doc['filename']}")
            except Exception as e:
                errors.append(f"Error reading file {doc['filename']}: {e}")
        if file_errors == 0 and checksum_errors == 0:
            logger.info(f"All {len(self.manifest['documents'])} files exist and checksums match")
        # Summary
        logger.info("=" * 60)
        if errors:
            logger.error(f"Verification FAILED: {len(errors)} critical errors")
            for err in errors[:10]:
                logger.error(f" - {err}")
            if len(errors) > 10:
                logger.error(f" ... and {len(errors) - 10} more errors")
            return False
        elif warnings:
            logger.warning(f"Verification PASSED with warnings: {len(warnings)} issues")
            for warn in warnings:
                logger.warning(f" - {warn}")
            return True
        else:
            logger.info("Verification PASSED - Export is complete and verified")
            logger.info(f" - All {total_expected} documents from API accounted for")
            logger.info(f" - All {len(self.manifest['documents'])} files exist")
            logger.info(f" - All checksums match")
            return True
def dry_run(self) -> Dict:
"""
Preview what would be exported without writing files.
Returns:
Dictionary with collection/document counts and estimated size
"""
logger.info("=" * 60)
logger.info("DRY RUN - Preview Export")
logger.info("=" * 60)
if not self.health_check():
logger.error("Dry run aborted due to failed health check")
return {}
collections = self.get_collections()
if not collections:
logger.warning("No collections found")
return {"collections": 0, "documents": 0}
total_docs = 0
results = {
"collections": [],
"total_collections": len(collections),
"total_documents": 0
}
for collection in collections:
_, nav_tree = self.get_documents_in_collection(collection["id"])
hierarchy = self.build_hierarchy([], nav_tree)
doc_count = len(hierarchy["all_ids"])
total_docs += doc_count
results["collections"].append({
"name": collection["name"],
"documents": doc_count
})
logger.info(f" {collection['name']}: {doc_count} documents")
results["total_documents"] = total_docs
logger.info("=" * 60)
logger.info(f"Total: {len(collections)} collections, {total_docs} documents")
logger.info("=" * 60)
logger.info("Dry run complete - no files written")
return results
    def export_all(self, skip_health_check: bool = False) -> None:
        """Export all collections and documents.

        Full pipeline: banner, optional health check, fetch collections,
        export each one (per-collection failures are recorded, not fatal),
        write manifests/metadata/error reports, then optionally verify.

        Args:
            skip_health_check: Skip the pre-export health check (default False)
        """
        logger.info("=" * 60)
        logger.info("OUTLINE EXPORT - ENHANCED VERSION")
        logger.info("=" * 60)
        logger.info(f"Source: {self.base_url}")
        logger.info(f"Output: {self.output_dir}")
        logger.info(f"Max depth: {self.max_hierarchy_depth}")
        logger.info(f"Max retries: {self.max_retries}")
        logger.info(f"Progress bars: {'Enabled' if self.show_progress else 'Disabled'}")
        logger.info(f"Verification: {'Enabled' if self.verify_after_export else 'Disabled'}")
        logger.info("=" * 60)
        # Health check
        if not skip_health_check:
            if not self.health_check():
                logger.error("Export aborted due to failed health check")
                return
        # Create output directory
        self.output_dir.mkdir(parents=True, exist_ok=True)
        # Fetch collections
        collections = self.get_collections()
        if not collections:
            logger.error("No collections found or API error")
            self.save_error_report()
            return
        # Export each collection; one failing collection does not stop the run.
        for collection in collections:
            try:
                self.export_collection(collection)
            except Exception as e:
                logger.error(f"Failed to export collection {collection['name']}: {e}")
                self.api_errors.append({
                    "collection": collection['name'],
                    "error": str(e),
                    "timestamp": datetime.now().isoformat()
                })
        # Save manifests and metadata
        self.save_manifest()
        self.save_export_metadata()
        if self.failed_documents or self.api_errors:
            self.save_error_report()
        # Print summary
        logger.info("=" * 60)
        logger.info("EXPORT SUMMARY")
        logger.info("=" * 60)
        logger.info(f"Collections exported: {self.stats['collections']}")
        logger.info(f"Documents exported: {self.stats['documents']}")
        logger.info(f"Total size: {self.stats['bytes_written'] / (1024*1024):.2f} MB")
        logger.info(f"Failed documents: {len(self.failed_documents)}")
        logger.info(f"API errors: {len(self.api_errors)}")
        logger.info("=" * 60)
        # Verify export
        if self.verify_after_export:
            verification_passed = self.verify_export()
            if not verification_passed:
                logger.warning("Export completed with verification errors")
                logger.warning("Check export_errors.json for details")
        else:
            logger.info("Export completed (verification skipped)")
def load_settings(settings_file: str = "settings.json") -> Dict:
    """Load configuration from a JSON file; exit(1) with a logged error if unusable."""
    try:
        return json.loads(Path(settings_file).read_text())
    except FileNotFoundError:
        logger.error(f"Settings file not found: {settings_file}")
        logger.error("Create a settings.json file with your configuration")
        sys.exit(1)
    except json.JSONDecodeError as e:
        logger.error(f"Invalid JSON in settings file: {e}")
        sys.exit(1)
def parse_args() -> 'argparse.Namespace':
    """Define and parse the command line arguments for the exporter CLI."""
    import argparse
    parser = argparse.ArgumentParser(
        description="Export Outline wiki documents",
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    # (flag strings, keyword options) for every supported argument.
    spec = [
        (('--dry-run', '-n'),
         dict(action='store_true',
              help='Preview what would be exported without writing files')),
        (('--output', '-o'),
         dict(help='Output directory (overrides settings.json)')),
        (('--verbose', '-v'),
         dict(action='count', default=0,
              help='Increase verbosity (use -vv for debug)')),
        (('--skip-verify',),
         dict(action='store_true',
              help='Skip post-export verification')),
        (('--skip-health-check',),
         dict(action='store_true',
              help='Skip pre-export health check')),
        (('--settings',),
         dict(default='settings.json',
              help='Path to settings file (default: settings.json)')),
    ]
    for flags, options in spec:
        parser.add_argument(*flags, **options)
    return parser.parse_args()
def main() -> None:
    """CLI entry point: parse args, load settings, run the export or a dry run."""
    args = parse_args()
    # Map -v / -vv onto log levels.
    if args.verbose >= 2:
        logger.setLevel(logging.DEBUG)
    elif args.verbose == 1:
        logger.setLevel(logging.INFO)
    # Load settings
    settings = load_settings(args.settings)
    source = settings.get("source", {})
    export_config = settings.get("export", {})
    advanced = settings.get("advanced", {})
    # URL and token are mandatory.
    if not source.get("url") or not source.get("token"):
        logger.error("Missing required settings: source.url and source.token")
        sys.exit(1)
    # Command line flags win over settings.json values.
    output_dir = args.output or export_config.get("output_directory", "exports")
    verify_after = not args.skip_verify and export_config.get("verify_after_export", True)
    exporter = OutlineExporter(
        base_url=source["url"],
        api_token=source["token"],
        output_dir=output_dir,
        verify_after_export=verify_after,
        max_hierarchy_depth=advanced.get("max_hierarchy_depth", 100),
        show_progress=advanced.get("progress_bar", True),
        generate_manifests=advanced.get("generate_manifests", True),
        max_retries=advanced.get("max_retries", 3),
        retry_backoff=advanced.get("retry_backoff", 1.0)
    )
    # Run export or dry run
    try:
        if args.dry_run:
            exporter.dry_run()
        else:
            exporter.export_all(skip_health_check=args.skip_health_check)
    except KeyboardInterrupt:
        logger.warning("Export cancelled by user")
        sys.exit(1)
    except Exception as e:
        logger.exception(f"Export failed: {e}")
        sys.exit(1)
# Script entry point: run the exporter CLI when executed directly.
if __name__ == "__main__":
    main()