- export_with_trees.sh: Bash wrapper for Outline export - outline_export_fixed.py: Python export implementation - IMPORT_SCRIPT.MD: PRD for import script (to be built) - RALPH_PROMPT.md: Ralph Loop prompt for building import script - CLAUDE.md: Project documentation Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1032 lines
38 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Outline API Export Script - Enhanced Version
|
|
Exports all collections, documents, and their hierarchy from Outline wiki.
|
|
Reads configuration from settings.json in the current directory.
|
|
|
|
Improvements:
|
|
- Failed document tracking with detailed error reports
|
|
- Document caching to eliminate double API fetching
|
|
- Proper timeout configuration
|
|
- Depth limit protection for deep hierarchies
|
|
- Enhanced verification comparing with API counts
|
|
- Tree view visualization (before and after export)
|
|
- Recursive document counting for accurate verification
|
|
- Proper logging system with configurable levels
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import hashlib
|
|
import logging
|
|
import time
|
|
from datetime import datetime
|
|
from functools import wraps
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Set, Tuple, Callable, TypeVar
|
|
import requests
|
|
from requests.adapters import HTTPAdapter
|
|
from urllib3.util.retry import Retry
|
|
|
|
T = TypeVar('T')
|
|
|
|
|
|
def retry_on_failure(max_attempts: int = 3, backoff_factor: float = 1.0,
                     exceptions: tuple = (requests.RequestException,)) -> Callable:
    """
    Decorator that retries a failing callable with exponential backoff.

    Args:
        max_attempts: Total number of attempts before giving up.
        backoff_factor: Base multiplier; the wait before retry N is
            backoff_factor * 2**N seconds.
        exceptions: Exception types that trigger a retry; any other
            exception propagates immediately.
    """
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @wraps(func)
        def wrapper(*args, **kwargs) -> T:
            failure = None
            for tries_done in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except exceptions as exc:
                    failure = exc
                    remaining = max_attempts - tries_done - 1
                    if not remaining:
                        # Final attempt exhausted; fall through and re-raise below
                        logger.error(f"All {max_attempts} attempts failed for {func.__name__}")
                        continue
                    delay = backoff_factor * (2 ** tries_done)
                    logger.warning(f"Attempt {tries_done + 1}/{max_attempts} failed: {exc}. "
                                   f"Retrying in {delay:.1f}s...")
                    time.sleep(delay)
            raise failure
        return wrapper
    return decorator
|
|
# Configure logging: timestamped, level-aligned console output for the whole run
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s | %(levelname)-8s | %(message)s',
    datefmt='%H:%M:%S'
)
# Module-wide logger used by every function and class below
logger = logging.getLogger('outline_export')

# Try to import tqdm for progress bars; the script degrades gracefully without it
try:
    from tqdm import tqdm
    HAS_TQDM = True
except ImportError:
    HAS_TQDM = False
    logger.info("Install tqdm for progress bars: pip install tqdm")
|
|
class TreeViewGenerator:
    """Generate ASCII tree views of document hierarchies.

    FIXED two rendering defects:
    - generate_from_files computed each node's filename but appended the
      literal string "(unknown)" instead of the filename.
    - Both generators recursed with an empty prefix for children of the
      top level, so the prefix never grew and every tree rendered flat
      with no connectors; rendering now tracks depth explicitly.
    Line counts (one line per node) are unchanged, so print_comparison
    behaves exactly as before.
    """

    @staticmethod
    def generate_from_api(nav_nodes: List[Dict], prefix: str = "", is_last: bool = True) -> List[str]:
        """Generate tree view from API navigation structure.

        One line per document node: "Title (12345678...)" using the first
        8 characters of the document id. ``prefix``/``is_last`` are kept
        for backward compatibility with existing callers.
        """
        return TreeViewGenerator._render(
            nav_nodes, prefix, 0,
            lambda node: f"{node.get('title', 'Untitled')} ({node.get('id', '')[:8]}...)")

    @staticmethod
    def generate_from_files(collection_path: Path, metadata: Dict) -> List[str]:
        """Generate tree view from exported files.

        Walks metadata["documents"] (as written by export_collection) and
        renders each node's exported filename. ``collection_path`` is kept
        in the signature for backward compatibility; the listing is driven
        entirely by the metadata.
        """
        return TreeViewGenerator._render(
            metadata.get("documents", []), "", 0,
            lambda doc: doc.get("filename", "Unknown"))

    @staticmethod
    def _render(nodes: List[Dict], prefix: str, depth: int, label) -> List[str]:
        """Shared recursive renderer.

        depth 0 nodes get no connector (they are the visual roots); deeper
        nodes get box-drawing connectors and an indent that follows the
        last-sibling rule.
        """
        lines = []
        for i, node in enumerate(nodes):
            is_last_node = (i == len(nodes) - 1)

            # Tree characters: roots are bare, children get branch glyphs
            if depth == 0:
                connector = ""
            else:
                connector = "└── " if is_last_node else "├── "

            lines.append(f"{prefix}{connector}{label(node)}")

            # Process children with a prefix that reflects this node's position
            children = node.get("children", [])
            if children:
                if depth == 0:
                    child_prefix = prefix
                else:
                    child_prefix = prefix + ("    " if is_last_node else "│   ")
                lines.extend(TreeViewGenerator._render(children, child_prefix, depth + 1, label))

        return lines

    @staticmethod
    def print_comparison(online_tree: List[str], exported_tree: List[str], collection_name: str):
        """Print comparison between online and exported structures"""
        logger.info(f"--- Comparison for '{collection_name}' ---")
        logger.info(f"Online documents: {len(online_tree)}")
        logger.info(f"Exported files: {len(exported_tree)}")
        if len(online_tree) == len(exported_tree):
            logger.info("Counts match!")
        else:
            diff = abs(len(online_tree) - len(exported_tree))
            logger.warning(f"Difference: {diff}")
class OutlineExporter:
    """Export Outline documents with enhanced error tracking and verification"""

    def __init__(self, base_url: str, api_token: str, output_dir: str = "exports",
                 verify_after_export: bool = True, max_hierarchy_depth: int = 100,
                 show_progress: bool = True, generate_manifests: bool = True,
                 max_retries: int = 3, retry_backoff: float = 1.0):
        """
        Args:
            base_url: Base URL of the Outline instance (trailing slash stripped).
            api_token: API token sent as a Bearer Authorization header.
            output_dir: Directory where exported files are written.
            verify_after_export: Run verify_export() after export_all() finishes.
            max_hierarchy_depth: Stored limit for deep hierarchies — not read
                anywhere in this class's visible code; TODO confirm intended use.
            show_progress: Show tqdm progress bars (only if tqdm is installed).
            generate_manifests: Build manifest.json and per-collection metadata.
            max_retries: Attempts per API request in make_request().
            retry_backoff: Base seconds for exponential backoff between retries.
        """
        self.base_url = base_url.rstrip('/')
        self.api_token = api_token
        self.output_dir = Path(output_dir)
        self.verify_after_export = verify_after_export
        self.max_hierarchy_depth = max_hierarchy_depth
        # Progress bars require tqdm to be importable
        self.show_progress = show_progress and HAS_TQDM
        self.generate_manifests = generate_manifests
        self.max_retries = max_retries
        self.retry_backoff = retry_backoff

        # Setup session with retry logic at the HTTP-adapter level
        # (in addition to the application-level retries in make_request)
        self.session = requests.Session()
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)

        self.headers = {
            "Authorization": f"Bearer {self.api_token}",
            "Content-Type": "application/json"
        }

        # NEW: Document cache to avoid double fetching
        self.document_cache: Dict[str, Dict] = {}

        # NEW: Track failed documents with detailed info
        self.failed_documents: List[Dict] = []

        # NEW: Track API errors
        self.api_errors: List[Dict] = []

        # NEW: Track expected vs actual counts per collection
        self.collection_stats: Dict[str, Dict] = {}

        # Manifest data (written by save_manifest)
        self.manifest = {
            "export_date": datetime.now().isoformat(),
            "source_url": self.base_url,
            "collections": [],
            "documents": [],
            "failed_documents": [],
            "statistics": {}
        }

        # Statistics (written by save_export_metadata and the final summary)
        self.stats = {
            "collections": 0,
            "documents": 0,
            "bytes_written": 0,
            "failed": 0,
            "api_errors": 0
        }
    def make_request(self, endpoint: str, data: Dict = None, method: str = "POST",
                     retry: bool = True) -> Optional[Dict]:
        """Make API request with error handling and optional retry.

        Args:
            endpoint: API endpoint path
            data: Request body data
            method: HTTP method (POST or GET)
            retry: Whether to retry on failure (default True)

        Returns:
            Parsed JSON body on HTTP 200, otherwise None. Every terminal
            failure is appended to self.api_errors and counted in
            self.stats["api_errors"].
        """
        url = f"{self.base_url}{endpoint}"
        # NOTE: last_error is assigned in the RequestException path but never
        # read afterwards; kept for parity with the original code.
        last_error = None

        attempts = self.max_retries if retry else 1
        for attempt in range(attempts):
            try:
                if method == "POST":
                    response = self.session.post(url, headers=self.headers, json=data or {}, timeout=30)
                else:
                    response = self.session.get(url, headers=self.headers, timeout=30)

                if response.status_code == 200:
                    return response.json()
                elif response.status_code in [429, 500, 502, 503, 504] and attempt < attempts - 1:
                    # Retryable error: exponential backoff then next attempt
                    wait_time = self.retry_backoff * (2 ** attempt)
                    logger.warning(f"API error {response.status_code} on {endpoint}, "
                                   f"retrying in {wait_time:.1f}s (attempt {attempt + 1}/{attempts})")
                    time.sleep(wait_time)
                    continue
                else:
                    # Non-retryable error or final attempt: record and bail out
                    error_info = {
                        "endpoint": endpoint,
                        "status_code": response.status_code,
                        "error": response.text[:200],
                        "timestamp": datetime.now().isoformat()
                    }
                    self.api_errors.append(error_info)
                    self.stats["api_errors"] += 1
                    logger.error(f"API error on {endpoint}: HTTP {response.status_code}")
                    return None
            except requests.RequestException as e:
                # Transport-level failure (connection, timeout, ...): retry if allowed
                last_error = e
                if attempt < attempts - 1:
                    wait_time = self.retry_backoff * (2 ** attempt)
                    logger.warning(f"Request failed on {endpoint}: {e}, "
                                   f"retrying in {wait_time:.1f}s (attempt {attempt + 1}/{attempts})")
                    time.sleep(wait_time)
                else:
                    error_info = {
                        "endpoint": endpoint,
                        "error": str(e),
                        "timestamp": datetime.now().isoformat()
                    }
                    self.api_errors.append(error_info)
                    self.stats["api_errors"] += 1
                    logger.error(f"All {attempts} attempts failed on {endpoint}: {e}")
                    return None
            except Exception as e:
                # Anything unexpected (e.g. JSON decode errors) is recorded
                # and treated as a terminal failure — no retry.
                error_info = {
                    "endpoint": endpoint,
                    "error": str(e),
                    "timestamp": datetime.now().isoformat()
                }
                self.api_errors.append(error_info)
                self.stats["api_errors"] += 1
                logger.exception(f"Unexpected exception on {endpoint}: {e}")
                return None

        # Reached only if every attempt hit the retryable-status branch
        return None
def health_check(self) -> bool:
|
|
"""
|
|
Verify API connectivity and authentication before export.
|
|
|
|
Returns:
|
|
True if API is accessible and authenticated, False otherwise
|
|
"""
|
|
logger.info("Performing health check...")
|
|
|
|
# Test API connectivity with auth.info endpoint
|
|
try:
|
|
result = self.make_request("/api/auth.info", retry=False)
|
|
if result and "data" in result:
|
|
user = result["data"].get("user", {})
|
|
team = result["data"].get("team", {})
|
|
logger.info(f"Authenticated as: {user.get('name', 'Unknown')} ({user.get('email', 'N/A')})")
|
|
logger.info(f"Team: {team.get('name', 'Unknown')}")
|
|
logger.info("Health check passed")
|
|
return True
|
|
else:
|
|
logger.error("Health check failed: Unable to verify authentication")
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Health check failed: {e}")
|
|
return False
|
|
|
|
def get_collections(self) -> List[Dict]:
|
|
"""Fetch all collections"""
|
|
logger.info("Fetching collections...")
|
|
result = self.make_request("/api/collections.list")
|
|
if result and "data" in result:
|
|
collections = result["data"]
|
|
logger.info(f"Found {len(collections)} collections")
|
|
return collections
|
|
return []
|
|
|
|
def get_documents_in_collection(self, collection_id: str) -> Tuple[List[Dict], List[Dict]]:
|
|
"""
|
|
Fetch all documents in a collection
|
|
Returns: (list of documents, navigation tree)
|
|
"""
|
|
result = self.make_request("/api/documents.list", {"collectionId": collection_id})
|
|
documents = []
|
|
if result and "data" in result:
|
|
documents = result["data"]
|
|
|
|
# Also get navigation tree for hierarchy
|
|
nav_result = self.make_request("/api/collections.documents", {"id": collection_id})
|
|
nav_tree = []
|
|
if nav_result and "data" in nav_result:
|
|
nav_tree = nav_result["data"]
|
|
|
|
return documents, nav_tree
|
|
|
|
def get_document_info(self, doc_id: str) -> Optional[Dict]:
|
|
"""
|
|
Fetch full document content
|
|
FIXED: Uses cache to avoid double fetching
|
|
"""
|
|
# Check cache first
|
|
if doc_id in self.document_cache:
|
|
return self.document_cache[doc_id]
|
|
|
|
result = self.make_request("/api/documents.info", {"id": doc_id})
|
|
if result and "data" in result:
|
|
doc = result["data"]
|
|
# Cache the document
|
|
self.document_cache[doc_id] = doc
|
|
return doc
|
|
return None
|
|
|
|
def sanitize_filename(self, name: str) -> str:
|
|
"""Convert document title to safe filename"""
|
|
# Replace invalid characters
|
|
invalid_chars = '<>:"/\\|?*'
|
|
for char in invalid_chars:
|
|
name = name.replace(char, '_')
|
|
# Limit length
|
|
if len(name) > 200:
|
|
name = name[:200]
|
|
return name.strip()
|
|
|
|
def calculate_checksum(self, content: str) -> str:
|
|
"""Calculate SHA256 checksum of content"""
|
|
return hashlib.sha256(content.encode('utf-8')).hexdigest()
|
|
|
|
def build_hierarchy(self, documents: List[Dict], nav_tree: List[Dict]) -> Dict:
|
|
"""
|
|
Build hierarchy mapping from navigation tree
|
|
FIXED: Export directly from nav_tree, don't rely on documents list
|
|
"""
|
|
hierarchy = {
|
|
"root": [],
|
|
"children": {},
|
|
"all_ids": set() # Track all document IDs we've seen
|
|
}
|
|
|
|
def process_nav_node(node: Dict, parent_id: Optional[str] = None):
|
|
doc_id = node["id"]
|
|
doc_title = node.get("title", "Untitled")
|
|
|
|
# Track this ID
|
|
hierarchy["all_ids"].add(doc_id)
|
|
|
|
# Create a minimal document dict from nav node
|
|
# We'll fetch full content during export
|
|
doc_data = {
|
|
"id": doc_id,
|
|
"title": doc_title,
|
|
"parentDocumentId": parent_id
|
|
}
|
|
|
|
# Add to hierarchy
|
|
if parent_id is None:
|
|
hierarchy["root"].append(doc_data)
|
|
else:
|
|
if parent_id not in hierarchy["children"]:
|
|
hierarchy["children"][parent_id] = []
|
|
hierarchy["children"][parent_id].append(doc_data)
|
|
|
|
# Process children recursively
|
|
children = node.get("children", [])
|
|
for child in children:
|
|
process_nav_node(child, doc_id)
|
|
|
|
for root_node in nav_tree:
|
|
process_nav_node(root_node)
|
|
|
|
return hierarchy
|
|
|
|
    def export_document(self, document: Dict, collection_name: str, collection_path: Path,
                        hierarchy: Dict, level: int = 0) -> Optional[Dict]:
        """
        Export a single document and its children recursively
        FIXED: Enhanced error tracking and failed children tracking

        Args:
            document: Minimal document dict (id/title/parentDocumentId) as
                produced by build_hierarchy(); full content is fetched here.
            collection_name: Display name of the owning collection.
            collection_path: Directory where the markdown file is written.
            hierarchy: Mapping from build_hierarchy(); children are looked
                up under hierarchy["children"][doc_id].
            level: Recursion depth (0 for root documents).

        Returns:
            Metadata dict for the exported document (with nested "children"
            metadata), or None on failure; failures are recorded in
            self.failed_documents.
        """
        doc_id = document["id"]
        doc_title = document.get("title", "Untitled")

        if level == 0:
            logger.debug(f"Exporting: {doc_title}")

        # Fetch full document content (uses cache, so no double fetching)
        full_doc = self.get_document_info(doc_id)
        if not full_doc:
            # FIXED: Track failed documents with details
            self.failed_documents.append({
                "id": doc_id,
                "title": doc_title,
                "collection": collection_name,
                "reason": "Failed to fetch document info from API",
                "level": level
            })
            logger.warning(f"Failed to fetch document: {doc_title} (ID: {doc_id})")
            return None

        # Generate filename from the sanitized title
        safe_title = self.sanitize_filename(doc_title)
        filename = f"{safe_title}.md"
        filepath = collection_path / filename

        # Handle duplicates by appending an incrementing numeric suffix
        counter = 1
        while filepath.exists():
            filename = f"{safe_title}_{counter}.md"
            filepath = collection_path / filename
            counter += 1

        # Build markdown content: title heading, HTML-comment metadata
        # block, separator, then the raw document text
        content = f"# {doc_title}\n\n"
        content += f"<!-- Document ID: {doc_id} -->\n"
        content += f"<!-- Created: {full_doc.get('createdAt')} -->\n"
        content += f"<!-- Updated: {full_doc.get('updatedAt')} -->\n"
        content += f"<!-- URL: {full_doc.get('url')} -->\n\n"
        content += "---\n\n"
        content += full_doc.get("text", "")

        # Write file
        try:
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(content)
        except Exception as e:
            # Track file write failures
            self.failed_documents.append({
                "id": doc_id,
                "title": doc_title,
                "collection": collection_name,
                "reason": f"Failed to write file: {e}",
                "level": level
            })
            logger.error(f"Failed to write file for: {doc_title}")
            return None

        file_size = filepath.stat().st_size
        self.stats["bytes_written"] += file_size
        self.stats["documents"] += 1

        # Calculate checksum (verified later by verify_export)
        checksum = self.calculate_checksum(content)

        # Build metadata
        doc_metadata = {
            "id": doc_id,
            "title": doc_title,
            "filename": filename,
            "collection_name": collection_name,
            "parent_id": document.get("parentDocumentId"),
            "checksum": checksum,
            "size_bytes": file_size,
            "created_at": full_doc.get('createdAt'),
            "updated_at": full_doc.get('updatedAt'),
            "children": [],
            "failed_children": []  # NEW: Track children that failed to export
        }

        # Add to manifest
        if self.generate_manifests:
            self.manifest["documents"].append(doc_metadata)

        # Export children recursively
        child_docs = hierarchy["children"].get(doc_id, [])
        for child in child_docs:
            child_metadata = self.export_document(
                child, collection_name, collection_path, hierarchy, level + 1
            )
            if child_metadata:
                doc_metadata["children"].append(child_metadata)
            else:
                # FIXED: Track failed children
                doc_metadata["failed_children"].append({
                    "id": child["id"],
                    "title": child.get("title", "Untitled")
                })

        return doc_metadata
    def export_collection(self, collection: Dict) -> None:
        """Export a single collection with all its documents.

        Writes one markdown file per document into a sanitized directory
        under self.output_dir, saves _collection_metadata.json, logs tree
        views of the online and exported structures, and records
        per-collection counts in self.collection_stats.
        """
        collection_id = collection["id"]
        collection_name = collection["name"]

        logger.info("=" * 60)
        logger.info(f"Exporting collection: {collection_name}")
        logger.info("=" * 60)

        # Fetch documents and navigation tree
        documents, nav_tree = self.get_documents_in_collection(collection_id)

        # Build hierarchy from navigation tree
        hierarchy = self.build_hierarchy(documents, nav_tree)

        # FIXED: Count documents from nav_tree (source of truth), not documents.list
        # The nav_tree includes ALL documents including nested ones
        expected_count = len(hierarchy["all_ids"])
        logger.info(f"Documents in navigation tree: {expected_count}")

        if expected_count == 0:
            logger.info("No documents to export")
            # Still track this for statistics
            self.collection_stats[collection_id] = {
                "name": collection_name,
                "expected": expected_count,
                "fetched": 0,
                "exported": 0
            }
            return

        # Create collection directory (sanitized name)
        safe_name = self.sanitize_filename(collection_name)
        collection_path = self.output_dir / safe_name
        collection_path.mkdir(parents=True, exist_ok=True)

        # NEW: Generate tree view of ONLINE structure
        logger.info("--- Online Structure (from Outline API) ---")
        online_tree = TreeViewGenerator.generate_from_api(nav_tree)
        for line in online_tree[:20]:  # Show first 20 lines
            logger.info(line)
        if len(online_tree) > 20:
            logger.info(f"... and {len(online_tree) - 20} more lines")

        # Prepare collection metadata
        collection_metadata = {
            "id": collection_id,
            "name": collection_name,
            "directory": safe_name,
            "expected_count": expected_count,  # From navigation tree (all nested docs)
            "documents_list_count": len(documents),  # From documents.list API
            "document_count": 0,  # Will be updated after export
            "navigation_tree": nav_tree,  # Preserve original navigation structure
            "documents": []
        }

        # Export documents with optional progress bar
        root_docs = hierarchy["root"]
        if self.show_progress:
            iterator = tqdm(root_docs, desc=f" Exporting {collection_name}", leave=False)
        else:
            iterator = root_docs

        exported_count = 0
        for doc in iterator:
            doc_metadata = self.export_document(
                doc, collection_name, collection_path, hierarchy
            )
            if doc_metadata:
                collection_metadata["documents"].append(doc_metadata)
                exported_count += 1

        # FIXED: Count ALL documents recursively (including children)
        def count_recursive(docs):
            count = 0
            for doc in docs:
                count += 1  # Count this document
                count += count_recursive(doc.get("children", []))  # Count children recursively
            return count

        actual_exported_count = count_recursive(collection_metadata["documents"])

        # Update with actual exported count
        collection_metadata["document_count"] = actual_exported_count

        # Save collection metadata
        metadata_path = collection_path / "_collection_metadata.json"
        with open(metadata_path, 'w', encoding='utf-8') as f:
            json.dump(collection_metadata, f, indent=2, ensure_ascii=False)

        # NEW: Generate tree view of EXPORTED files
        logger.info("--- Exported Files (on disk) ---")
        exported_tree = TreeViewGenerator.generate_from_files(collection_path, collection_metadata)
        for line in exported_tree[:20]:  # Show first 20 lines
            logger.info(line)
        if len(exported_tree) > 20:
            logger.info(f"... and {len(exported_tree) - 20} more lines")

        # NEW: Print comparison
        TreeViewGenerator.print_comparison(online_tree, exported_tree, collection_name)

        # Add to manifest
        if self.generate_manifests:
            self.manifest["collections"].append({
                "id": collection_id,
                "name": collection_name,
                "directory": safe_name,
                "expected_count": expected_count,  # From nav_tree
                "documents_list_count": len(documents),  # From API documents.list
                "exported_count": actual_exported_count  # FIXED: Use recursive count
            })

        # NEW: Store collection stats
        self.collection_stats[collection_id] = {
            "name": collection_name,
            "expected": expected_count,  # From nav_tree (source of truth)
            "documents_list_count": len(documents),  # From API
            "exported": actual_exported_count  # FIXED: Use recursive count
        }

        self.stats["collections"] += 1

        # Enhanced summary
        if actual_exported_count == expected_count:
            logger.info(f"Exported {actual_exported_count}/{expected_count} documents from '{collection_name}' - COMPLETE")
        else:
            missing = expected_count - actual_exported_count
            logger.warning(f"Exported {actual_exported_count}/{expected_count} documents from '{collection_name}' - {missing} MISSING")
def save_manifest(self) -> None:
|
|
"""Save export manifest"""
|
|
if not self.generate_manifests:
|
|
return
|
|
|
|
manifest_path = self.output_dir / "manifest.json"
|
|
with open(manifest_path, 'w', encoding='utf-8') as f:
|
|
json.dump(self.manifest, f, indent=2, ensure_ascii=False)
|
|
|
|
def save_export_metadata(self) -> None:
|
|
"""Save export metadata with statistics"""
|
|
metadata = {
|
|
"export_date": datetime.now().isoformat(),
|
|
"source_url": self.base_url,
|
|
"statistics": self.stats,
|
|
"collections": self.manifest["collections"],
|
|
"failed_documents_count": len(self.failed_documents),
|
|
"api_errors_count": len(self.api_errors)
|
|
}
|
|
|
|
metadata_path = self.output_dir / "export_metadata.json"
|
|
with open(metadata_path, 'w', encoding='utf-8') as f:
|
|
json.dump(metadata, f, indent=2, ensure_ascii=False)
|
|
|
|
def save_error_report(self) -> None:
|
|
"""Save detailed error report"""
|
|
if not self.failed_documents and not self.api_errors:
|
|
return
|
|
|
|
error_report = {
|
|
"export_date": datetime.now().isoformat(),
|
|
"failed_documents": self.failed_documents,
|
|
"api_errors": self.api_errors,
|
|
"statistics": {
|
|
"total_failed_documents": len(self.failed_documents),
|
|
"total_api_errors": len(self.api_errors)
|
|
}
|
|
}
|
|
|
|
error_path = self.output_dir / "export_errors.json"
|
|
with open(error_path, 'w', encoding='utf-8') as f:
|
|
json.dump(error_report, f, indent=2, ensure_ascii=False)
|
|
|
|
logger.warning(f"Error report saved to: {error_path}")
|
|
logger.warning(f" - {len(self.failed_documents)} failed documents")
|
|
logger.warning(f" - {len(self.api_errors)} API errors")
|
|
|
|
    def verify_export(self) -> bool:
        """
        Verify export integrity and completeness
        FIXED: Enhanced verification comparing with API counts

        Returns:
            True when no critical errors were found (count warnings are
            allowed); False when counts mismatch, files are missing, or
            checksums differ.
        """
        logger.info("=" * 60)
        logger.info("Verifying Export Integrity and Completeness")
        logger.info("=" * 60)

        # Verification relies on the manifest built during export
        if not self.generate_manifests:
            logger.warning("Skipping verification (manifests disabled)")
            return True

        errors = []
        warnings = []

        # NEW: Step 1 - Verify document count completeness
        logger.info("Step 1: Verifying document count completeness...")
        total_expected = 0
        total_exported = 0

        for coll_id, stats in self.collection_stats.items():
            expected = stats["expected"]
            exported = stats["exported"]
            total_expected += expected
            total_exported += exported

            # Per-collection shortfalls are warnings; the global shortfall
            # below is what makes verification fail
            if exported < expected:
                missing = expected - exported
                warnings.append(f"Collection '{stats['name']}': {missing} documents missing")

        if total_exported < total_expected:
            errors.append(
                f"Document count mismatch: Expected {total_expected} from API, "
                f"exported {total_exported} (missing {total_expected - total_exported})"
            )
        else:
            logger.info(f"All {total_expected} documents accounted for")

        # Step 2 - Verify files exist and checksums match
        logger.info("Step 2: Verifying file integrity...")
        file_errors = 0
        checksum_errors = 0

        if self.show_progress:
            iterator = tqdm(self.manifest["documents"], desc=" Verifying", leave=False)
        else:
            iterator = self.manifest["documents"]

        for doc in iterator:
            # NOTE(review): export_collection writes files under the
            # sanitized directory name, but this lookup uses the raw
            # collection_name — collections whose names contain characters
            # like '/' or ':' would be reported missing here; confirm.
            collection_dir = self.output_dir / doc["collection_name"]
            filepath = collection_dir / doc["filename"]

            # Check file exists
            if not filepath.exists():
                file_errors += 1
                errors.append(f"Missing file: {doc['filename']}")
                continue

            # Verify checksum against the value recorded at export time
            try:
                with open(filepath, 'r', encoding='utf-8') as f:
                    content = f.read()
                actual_checksum = self.calculate_checksum(content)
                if actual_checksum != doc["checksum"]:
                    checksum_errors += 1
                    errors.append(f"Checksum mismatch: {doc['filename']}")
            except Exception as e:
                errors.append(f"Error reading file {doc['filename']}: {e}")

        if file_errors == 0 and checksum_errors == 0:
            logger.info(f"All {len(self.manifest['documents'])} files exist and checksums match")

        # Summary: errors fail verification, warnings alone do not
        logger.info("=" * 60)
        if errors:
            logger.error(f"Verification FAILED: {len(errors)} critical errors")
            for err in errors[:10]:
                logger.error(f" - {err}")
            if len(errors) > 10:
                logger.error(f" ... and {len(errors) - 10} more errors")
            return False
        elif warnings:
            logger.warning(f"Verification PASSED with warnings: {len(warnings)} issues")
            for warn in warnings:
                logger.warning(f" - {warn}")
            return True
        else:
            logger.info("Verification PASSED - Export is complete and verified")
            logger.info(f" - All {total_expected} documents from API accounted for")
            logger.info(f" - All {len(self.manifest['documents'])} files exist")
            logger.info(f" - All checksums match")
            return True
    def dry_run(self) -> Dict:
        """
        Preview what would be exported without writing files.

        Returns:
            Dictionary with collection/document counts and estimated size
            (empty dict if the health check fails).
        """
        logger.info("=" * 60)
        logger.info("DRY RUN - Preview Export")
        logger.info("=" * 60)

        if not self.health_check():
            logger.error("Dry run aborted due to failed health check")
            return {}

        collections = self.get_collections()
        if not collections:
            logger.warning("No collections found")
            return {"collections": 0, "documents": 0}

        total_docs = 0
        results = {
            "collections": [],
            "total_collections": len(collections),
            "total_documents": 0
        }

        for collection in collections:
            # Only the navigation tree is needed to count documents
            _, nav_tree = self.get_documents_in_collection(collection["id"])
            hierarchy = self.build_hierarchy([], nav_tree)
            doc_count = len(hierarchy["all_ids"])
            total_docs += doc_count

            results["collections"].append({
                "name": collection["name"],
                "documents": doc_count
            })
            logger.info(f" {collection['name']}: {doc_count} documents")

        results["total_documents"] = total_docs

        logger.info("=" * 60)
        logger.info(f"Total: {len(collections)} collections, {total_docs} documents")
        logger.info("=" * 60)
        logger.info("Dry run complete - no files written")

        return results
    def export_all(self, skip_health_check: bool = False) -> None:
        """Export all collections and documents.

        Orchestrates the full run: health check, per-collection export,
        manifest/metadata/error-report writing, summary logging, and
        optional post-export verification.

        Args:
            skip_health_check: Skip the pre-export health check (default False)
        """
        logger.info("=" * 60)
        logger.info("OUTLINE EXPORT - ENHANCED VERSION")
        logger.info("=" * 60)
        logger.info(f"Source: {self.base_url}")
        logger.info(f"Output: {self.output_dir}")
        logger.info(f"Max depth: {self.max_hierarchy_depth}")
        logger.info(f"Max retries: {self.max_retries}")
        logger.info(f"Progress bars: {'Enabled' if self.show_progress else 'Disabled'}")
        logger.info(f"Verification: {'Enabled' if self.verify_after_export else 'Disabled'}")
        logger.info("=" * 60)

        # Health check
        if not skip_health_check:
            if not self.health_check():
                logger.error("Export aborted due to failed health check")
                return

        # Create output directory
        self.output_dir.mkdir(parents=True, exist_ok=True)

        # Fetch collections
        collections = self.get_collections()
        if not collections:
            logger.error("No collections found or API error")
            self.save_error_report()
            return

        # Export each collection; one failing collection must not stop the run
        for collection in collections:
            try:
                self.export_collection(collection)
            except Exception as e:
                logger.error(f"Failed to export collection {collection['name']}: {e}")
                self.api_errors.append({
                    "collection": collection['name'],
                    "error": str(e),
                    "timestamp": datetime.now().isoformat()
                })

        # Save manifests and metadata
        self.save_manifest()
        self.save_export_metadata()
        if self.failed_documents or self.api_errors:
            self.save_error_report()

        # Print summary
        logger.info("=" * 60)
        logger.info("EXPORT SUMMARY")
        logger.info("=" * 60)
        logger.info(f"Collections exported: {self.stats['collections']}")
        logger.info(f"Documents exported: {self.stats['documents']}")
        logger.info(f"Total size: {self.stats['bytes_written'] / (1024*1024):.2f} MB")
        logger.info(f"Failed documents: {len(self.failed_documents)}")
        logger.info(f"API errors: {len(self.api_errors)}")
        logger.info("=" * 60)

        # Verify export
        if self.verify_after_export:
            verification_passed = self.verify_export()
            if not verification_passed:
                logger.warning("Export completed with verification errors")
                logger.warning("Check export_errors.json for details")
        else:
            logger.info("Export completed (verification skipped)")
def load_settings(settings_file: str = "settings.json") -> Dict:
    """Load settings from JSON file"""
    try:
        with open(settings_file, 'r') as handle:
            parsed = json.load(handle)
    except FileNotFoundError:
        logger.error(f"Settings file not found: {settings_file}")
        logger.error("Create a settings.json file with your configuration")
        sys.exit(1)
    except json.JSONDecodeError as e:
        logger.error(f"Invalid JSON in settings file: {e}")
        sys.exit(1)
    return parsed
def parse_args() -> 'argparse.Namespace':
    """Parse command line arguments."""
    import argparse

    cli = argparse.ArgumentParser(
        description="Export Outline wiki documents",
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    # Flags are declared in the same order as before; only the builder
    # style differs.
    cli.add_argument('--dry-run', '-n', action='store_true',
                     help='Preview what would be exported without writing files')
    cli.add_argument('--output', '-o',
                     help='Output directory (overrides settings.json)')
    cli.add_argument('--verbose', '-v', action='count', default=0,
                     help='Increase verbosity (use -vv for debug)')
    cli.add_argument('--skip-verify', action='store_true',
                     help='Skip post-export verification')
    cli.add_argument('--skip-health-check', action='store_true',
                     help='Skip pre-export health check')
    cli.add_argument('--settings', default='settings.json',
                     help='Path to settings file (default: settings.json)')
    return cli.parse_args()
def main() -> None:
    """Main entry point"""
    args = parse_args()

    # Set log level based on verbosity (-v keeps INFO, -vv enables DEBUG)
    if args.verbose >= 2:
        logger.setLevel(logging.DEBUG)
    elif args.verbose == 1:
        logger.setLevel(logging.INFO)

    # Load settings (exits with status 1 if the file is missing or invalid)
    settings = load_settings(args.settings)

    source = settings.get("source", {})
    export_config = settings.get("export", {})
    advanced = settings.get("advanced", {})

    # Validate required settings
    if not source.get("url") or not source.get("token"):
        logger.error("Missing required settings: source.url and source.token")
        sys.exit(1)

    # CLI overrides for settings
    output_dir = args.output or export_config.get("output_directory", "exports")
    verify_after = not args.skip_verify and export_config.get("verify_after_export", True)

    # Create exporter
    exporter = OutlineExporter(
        base_url=source["url"],
        api_token=source["token"],
        output_dir=output_dir,
        verify_after_export=verify_after,
        max_hierarchy_depth=advanced.get("max_hierarchy_depth", 100),
        show_progress=advanced.get("progress_bar", True),
        generate_manifests=advanced.get("generate_manifests", True),
        max_retries=advanced.get("max_retries", 3),
        retry_backoff=advanced.get("retry_backoff", 1.0)
    )

    # Run export or dry run; exit non-zero on interrupt or unexpected failure
    try:
        if args.dry_run:
            exporter.dry_run()
        else:
            exporter.export_all(skip_health_check=args.skip_health_check)
    except KeyboardInterrupt:
        logger.warning("Export cancelled by user")
        sys.exit(1)
    except Exception as e:
        logger.exception(f"Export failed: {e}")
        sys.exit(1)
# Script entry point
if __name__ == "__main__":
    main()