#!/usr/bin/env python3
"""
Outline API Export Script - Enhanced Version

Exports all collections, documents, and their hierarchy from Outline wiki.
Reads configuration from settings.json in the current directory.

Improvements:
- Failed document tracking with detailed error reports
- Document caching to eliminate double API fetching
- Proper timeout configuration
- Depth limit protection for deep hierarchies
- Enhanced verification comparing with API counts
- Tree view visualization (before and after export)
- Recursive document counting for accurate verification
- Proper logging system with configurable levels
"""

import os
import sys
import json
import hashlib
import logging
import time
from datetime import datetime
from functools import wraps
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple, Callable, TypeVar

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

T = TypeVar('T')

# Configure logging before anything that references `logger`.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s | %(levelname)-8s | %(message)s',
    datefmt='%H:%M:%S'
)
logger = logging.getLogger('outline_export')


def retry_on_failure(max_attempts: int = 3, backoff_factor: float = 1.0,
                     exceptions: tuple = (requests.RequestException,)) -> Callable:
    """
    Decorator for retrying failed operations with exponential backoff.

    Args:
        max_attempts: Maximum number of retry attempts
        backoff_factor: Multiplier for exponential backoff (wait = backoff_factor * 2^attempt)
        exceptions: Tuple of exception types to catch and retry
    """
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @wraps(func)
        def wrapper(*args, **kwargs) -> T:
            last_exception = None
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    last_exception = e
                    if attempt < max_attempts - 1:
                        wait_time = backoff_factor * (2 ** attempt)
                        logger.warning(f"Attempt {attempt + 1}/{max_attempts} failed: {e}. "
                                       f"Retrying in {wait_time:.1f}s...")
                        time.sleep(wait_time)
                    else:
                        logger.error(f"All {max_attempts} attempts failed for {func.__name__}")
                        raise last_exception
        return wrapper
    return decorator


# Try to import tqdm for progress bars (optional dependency).
try:
    from tqdm import tqdm
    HAS_TQDM = True
except ImportError:
    HAS_TQDM = False
    logger.info("Install tqdm for progress bars: pip install tqdm")


class TreeViewGenerator:
    """Generate ASCII tree views of document hierarchies"""

    @staticmethod
    def generate_from_api(nav_nodes: List[Dict], prefix: str = "",
                          is_last: bool = True) -> List[str]:
        """Generate tree view from API navigation structure"""
        lines = []
        for i, node in enumerate(nav_nodes):
            is_last_node = (i == len(nav_nodes) - 1)

            # Tree characters: root level gets no connector.
            if prefix == "":
                connector = ""
            else:
                connector = "└── " if is_last_node else "├── "

            title = node.get("title", "Untitled")
            doc_id = node.get("id", "")[:8]  # Short ID for display
            lines.append(f"{prefix}{connector}{title} ({doc_id}...)")

            # Process children
            children = node.get("children", [])
            if children:
                if prefix == "":
                    child_prefix = ""
                else:
                    child_prefix = prefix + ("    " if is_last_node else "│   ")
                child_lines = TreeViewGenerator.generate_from_api(
                    children, child_prefix, is_last_node
                )
                lines.extend(child_lines)
        return lines

    @staticmethod
    def generate_from_files(collection_path: Path, metadata: Dict) -> List[str]:
        """Generate tree view from exported files"""

        def build_tree_recursive(docs: List[Dict], prefix: str = "",
                                 is_last: bool = True) -> List[str]:
            tree_lines = []
            for i, doc in enumerate(docs):
                is_last_node = (i == len(docs) - 1)

                # Tree characters: root level gets no connector.
                if prefix == "":
                    connector = ""
                else:
                    connector = "└── " if is_last_node else "├── "

                # FIXED: print the actual exported filename instead of a
                # hard-coded placeholder (the variable was assigned but unused).
                filename = doc.get("filename", "Unknown")
                tree_lines.append(f"{prefix}{connector}{filename}")

                # Process children
                children = doc.get("children", [])
                if children:
                    if prefix == "":
                        child_prefix = ""
                    else:
                        child_prefix = prefix + ("    " if is_last_node else "│   ")
                    child_lines = build_tree_recursive(children, child_prefix, is_last_node)
                    tree_lines.extend(child_lines)
            return tree_lines

        documents = metadata.get("documents", [])
        return build_tree_recursive(documents)

    @staticmethod
    def print_comparison(online_tree: List[str], exported_tree: List[str],
                         collection_name: str):
        """Print comparison between online and exported structures"""
        logger.info(f"--- Comparison for '{collection_name}' ---")
        logger.info(f"Online documents: {len(online_tree)}")
        logger.info(f"Exported files: {len(exported_tree)}")
        if len(online_tree) == len(exported_tree):
            logger.info("Counts match!")
        else:
            diff = abs(len(online_tree) - len(exported_tree))
            logger.warning(f"Difference: {diff}")


class OutlineExporter:
    """Export Outline documents with enhanced error tracking and verification"""

    def __init__(self, base_url: str, api_token: str, output_dir: str = "exports",
                 verify_after_export: bool = True, max_hierarchy_depth: int = 100,
                 show_progress: bool = True, generate_manifests: bool = True,
                 max_retries: int = 3, retry_backoff: float = 1.0):
        self.base_url = base_url.rstrip('/')
        self.api_token = api_token
        self.output_dir = Path(output_dir)
        self.verify_after_export = verify_after_export
        self.max_hierarchy_depth = max_hierarchy_depth
        self.show_progress = show_progress and HAS_TQDM
        self.generate_manifests = generate_manifests
        self.max_retries = max_retries
        self.retry_backoff = retry_backoff

        # Setup session with retry logic at the transport level.
        self.session = requests.Session()
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)

        self.headers = {
            "Authorization": f"Bearer {self.api_token}",
            "Content-Type": "application/json"
        }

        # NEW: Document cache to avoid double fetching
        self.document_cache: Dict[str, Dict] = {}
        # NEW: Track failed documents with detailed info
        self.failed_documents: List[Dict] = []
        # NEW: Track API errors
        self.api_errors: List[Dict] = []
        # NEW: Track expected vs actual counts per collection
        self.collection_stats: Dict[str, Dict] = {}

        # Manifest data
        self.manifest = {
            "export_date": datetime.now().isoformat(),
            "source_url": self.base_url,
            "collections": [],
            "documents": [],
            "failed_documents": [],
            "statistics": {}
        }

        # Statistics
        self.stats = {
            "collections": 0,
            "documents": 0,
            "bytes_written": 0,
            "failed": 0,
            "api_errors": 0
        }

    def make_request(self, endpoint: str, data: Optional[Dict] = None,
                     method: str = "POST", retry: bool = True) -> Optional[Dict]:
        """Make API request with error handling and optional retry.

        Args:
            endpoint: API endpoint path
            data: Request body data
            method: HTTP method (POST or GET)
            retry: Whether to retry on failure (default True)
        """
        url = f"{self.base_url}{endpoint}"
        attempts = self.max_retries if retry else 1

        for attempt in range(attempts):
            try:
                if method == "POST":
                    response = self.session.post(url, headers=self.headers,
                                                 json=data or {}, timeout=30)
                else:
                    response = self.session.get(url, headers=self.headers, timeout=30)

                if response.status_code == 200:
                    return response.json()
                elif response.status_code in [429, 500, 502, 503, 504] and attempt < attempts - 1:
                    # Retryable error
                    wait_time = self.retry_backoff * (2 ** attempt)
                    logger.warning(f"API error {response.status_code} on {endpoint}, "
                                   f"retrying in {wait_time:.1f}s (attempt {attempt + 1}/{attempts})")
                    time.sleep(wait_time)
                    continue
                else:
                    # Non-retryable error or final attempt
                    error_info = {
                        "endpoint": endpoint,
                        "status_code": response.status_code,
                        "error": response.text[:200],
                        "timestamp": datetime.now().isoformat()
                    }
                    self.api_errors.append(error_info)
                    self.stats["api_errors"] += 1
                    logger.error(f"API error on {endpoint}: HTTP {response.status_code}")
                    return None
            except requests.RequestException as e:
                if attempt < attempts - 1:
                    wait_time = self.retry_backoff * (2 ** attempt)
                    logger.warning(f"Request failed on {endpoint}: {e}, "
                                   f"retrying in {wait_time:.1f}s (attempt {attempt + 1}/{attempts})")
                    time.sleep(wait_time)
                else:
                    error_info = {
                        "endpoint": endpoint,
                        "error": str(e),
                        "timestamp": datetime.now().isoformat()
                    }
                    self.api_errors.append(error_info)
                    self.stats["api_errors"] += 1
                    logger.error(f"All {attempts} attempts failed on {endpoint}: {e}")
                    return None
            except Exception as e:
                # Unexpected (non-requests) failure: record and bail out.
                error_info = {
                    "endpoint": endpoint,
                    "error": str(e),
                    "timestamp": datetime.now().isoformat()
                }
                self.api_errors.append(error_info)
                self.stats["api_errors"] += 1
                logger.exception(f"Unexpected exception on {endpoint}: {e}")
                return None
        return None

    def health_check(self) -> bool:
        """
        Verify API connectivity and authentication before export.

        Returns:
            True if API is accessible and authenticated, False otherwise
        """
        logger.info("Performing health check...")
        # Test API connectivity with auth.info endpoint
        try:
            result = self.make_request("/api/auth.info", retry=False)
            if result and "data" in result:
                user = result["data"].get("user", {})
                team = result["data"].get("team", {})
                logger.info(f"Authenticated as: {user.get('name', 'Unknown')} ({user.get('email', 'N/A')})")
                logger.info(f"Team: {team.get('name', 'Unknown')}")
                logger.info("Health check passed")
                return True
            else:
                logger.error("Health check failed: Unable to verify authentication")
                return False
        except Exception as e:
            logger.error(f"Health check failed: {e}")
            return False

    def get_collections(self) -> List[Dict]:
        """Fetch all collections"""
        logger.info("Fetching collections...")
        result = self.make_request("/api/collections.list")
        if result and "data" in result:
            collections = result["data"]
            logger.info(f"Found {len(collections)} collections")
            return collections
        return []

    def get_documents_in_collection(self, collection_id: str) -> Tuple[List[Dict], List[Dict]]:
        """
        Fetch all documents in a collection

        Returns: (list of documents, navigation tree)
        """
        result = self.make_request("/api/documents.list", {"collectionId": collection_id})
        documents = []
        if result and "data" in result:
            documents = result["data"]

        # Also get navigation tree for hierarchy
        nav_result = self.make_request("/api/collections.documents", {"id": collection_id})
        nav_tree = []
        if nav_result and "data" in nav_result:
            nav_tree = nav_result["data"]

        return documents, nav_tree

    def get_document_info(self, doc_id: str) -> Optional[Dict]:
        """
        Fetch full document content

        FIXED: Uses cache to avoid double fetching
        """
        # Check cache first
        if doc_id in self.document_cache:
            return self.document_cache[doc_id]

        result = self.make_request("/api/documents.info", {"id": doc_id})
        if result and "data" in result:
            doc = result["data"]
            # Cache the document
            self.document_cache[doc_id] = doc
            return doc
        return None

    def sanitize_filename(self, name: str) -> str:
        """Convert document title to safe filename"""
        # Replace invalid characters
        invalid_chars = '<>:"/\\|?*'
        for char in invalid_chars:
            name = name.replace(char, '_')
        # Limit length
        if len(name) > 200:
            name = name[:200]
        return name.strip()

    def calculate_checksum(self, content: str) -> str:
        """Calculate SHA256 checksum of content"""
        return hashlib.sha256(content.encode('utf-8')).hexdigest()

    def build_hierarchy(self, documents: List[Dict], nav_tree: List[Dict]) -> Dict:
        """
        Build hierarchy mapping from navigation tree

        FIXED: Export directly from nav_tree, don't rely on documents list
        """
        hierarchy = {
            "root": [],
            "children": {},
            "all_ids": set()  # Track all document IDs we've seen
        }

        def process_nav_node(node: Dict, parent_id: Optional[str] = None):
            doc_id = node["id"]
            doc_title = node.get("title", "Untitled")

            # Track this ID
            hierarchy["all_ids"].add(doc_id)

            # Create a minimal document dict from nav node
            # We'll fetch full content during export
            doc_data = {
                "id": doc_id,
                "title": doc_title,
                "parentDocumentId": parent_id
            }

            # Add to hierarchy
            if parent_id is None:
                hierarchy["root"].append(doc_data)
            else:
                if parent_id not in hierarchy["children"]:
                    hierarchy["children"][parent_id] = []
                hierarchy["children"][parent_id].append(doc_data)

            # Process children recursively
            children = node.get("children", [])
            for child in children:
                process_nav_node(child, doc_id)

        for root_node in nav_tree:
            process_nav_node(root_node)

        return hierarchy

    def export_document(self, document: Dict, collection_name: str,
                        collection_path: Path, hierarchy: Dict,
                        level: int = 0) -> Optional[Dict]:
        """
        Export a single document and its children recursively

        FIXED: Enhanced error tracking and failed children tracking
        """
        doc_id = document["id"]
        doc_title = document.get("title", "Untitled")

        if level == 0:
            logger.debug(f"Exporting: {doc_title}")

        # Fetch full document content (uses cache, so no double fetching)
        full_doc = self.get_document_info(doc_id)
        if not full_doc:
            # FIXED: Track failed documents with details
            self.failed_documents.append({
                "id": doc_id,
                "title": doc_title,
                "collection": collection_name,
                "reason": "Failed to fetch document info from API",
                "level": level
            })
            logger.warning(f"Failed to fetch document: {doc_title} (ID: {doc_id})")
            return None

        # Generate filename
        safe_title = self.sanitize_filename(doc_title)
        filename = f"{safe_title}.md"
        filepath = collection_path / filename

        # Handle duplicates
        counter = 1
        while filepath.exists():
            filename = f"{safe_title}_{counter}.md"
            filepath = collection_path / filename
            counter += 1

        # Build markdown content.
        # NOTE(review): the blank f-strings below look like stripped metadata
        # lines (original text unrecoverable) — behavior kept as-is: they emit
        # blank lines between the title and the "---" separator.
        content = f"# {doc_title}\n\n"
        content += f"\n"
        content += f"\n"
        content += f"\n"
        content += f"\n\n"
        content += "---\n\n"
        content += full_doc.get("text", "")

        # Write file
        try:
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(content)
        except Exception as e:
            # Track file write failures
            self.failed_documents.append({
                "id": doc_id,
                "title": doc_title,
                "collection": collection_name,
                "reason": f"Failed to write file: {e}",
                "level": level
            })
            logger.error(f"Failed to write file for: {doc_title}")
            return None

        file_size = filepath.stat().st_size
        self.stats["bytes_written"] += file_size
        self.stats["documents"] += 1

        # Calculate checksum
        checksum = self.calculate_checksum(content)

        # Build metadata
        doc_metadata = {
            "id": doc_id,
            "title": doc_title,
            "filename": filename,
            "collection_name": collection_name,
            "parent_id": document.get("parentDocumentId"),
            "checksum": checksum,
            "size_bytes": file_size,
            "created_at": full_doc.get('createdAt'),
            "updated_at": full_doc.get('updatedAt'),
            "children": [],
            "failed_children": []  # NEW: Track children that failed to export
        }

        # Add to manifest
        if self.generate_manifests:
            self.manifest["documents"].append(doc_metadata)

        # Export children recursively
        child_docs = hierarchy["children"].get(doc_id, [])
        for child in child_docs:
            child_metadata = self.export_document(
                child, collection_name, collection_path, hierarchy, level + 1
            )
            if child_metadata:
                doc_metadata["children"].append(child_metadata)
            else:
                # FIXED: Track failed children
                doc_metadata["failed_children"].append({
                    "id": child["id"],
                    "title": child.get("title", "Untitled")
                })

        return doc_metadata

    def export_collection(self, collection: Dict) -> None:
        """Export a single collection with all its documents"""
        collection_id = collection["id"]
        collection_name = collection["name"]

        logger.info("=" * 60)
        logger.info(f"Exporting collection: {collection_name}")
        logger.info("=" * 60)

        # Fetch documents and navigation tree
        documents, nav_tree = self.get_documents_in_collection(collection_id)

        # Build hierarchy from navigation tree
        hierarchy = self.build_hierarchy(documents, nav_tree)

        # FIXED: Count documents from nav_tree (source of truth), not documents.list
        # The nav_tree includes ALL documents including nested ones
        expected_count = len(hierarchy["all_ids"])
        logger.info(f"Documents in navigation tree: {expected_count}")

        if expected_count == 0:
            logger.info("No documents to export")
            # Still track this for statistics
            self.collection_stats[collection_id] = {
                "name": collection_name,
                "expected": expected_count,
                "fetched": 0,
                "exported": 0
            }
            return

        # Create collection directory
        safe_name = self.sanitize_filename(collection_name)
        collection_path = self.output_dir / safe_name
        collection_path.mkdir(parents=True, exist_ok=True)

        # NEW: Generate tree view of ONLINE structure
        logger.info("--- Online Structure (from Outline API) ---")
        online_tree = TreeViewGenerator.generate_from_api(nav_tree)
        for line in online_tree[:20]:  # Show first 20 lines
            logger.info(line)
        if len(online_tree) > 20:
            logger.info(f"... and {len(online_tree) - 20} more lines")

        # Prepare collection metadata
        collection_metadata = {
            "id": collection_id,
            "name": collection_name,
            "directory": safe_name,
            "expected_count": expected_count,  # From navigation tree (all nested docs)
            "documents_list_count": len(documents),  # From documents.list API
            "document_count": 0,  # Will be updated after export
            "navigation_tree": nav_tree,  # Preserve original navigation structure
            "documents": []
        }

        # Export documents with optional progress bar
        root_docs = hierarchy["root"]
        if self.show_progress:
            iterator = tqdm(root_docs, desc=f"  Exporting {collection_name}", leave=False)
        else:
            iterator = root_docs

        exported_count = 0
        for doc in iterator:
            doc_metadata = self.export_document(
                doc, collection_name, collection_path, hierarchy
            )
            if doc_metadata:
                collection_metadata["documents"].append(doc_metadata)
                exported_count += 1

        # FIXED: Count ALL documents recursively (including children)
        def count_recursive(docs):
            count = 0
            for doc in docs:
                count += 1  # Count this document
                count += count_recursive(doc.get("children", []))  # Count children recursively
            return count

        actual_exported_count = count_recursive(collection_metadata["documents"])

        # Update with actual exported count
        collection_metadata["document_count"] = actual_exported_count

        # Save collection metadata
        metadata_path = collection_path / "_collection_metadata.json"
        with open(metadata_path, 'w', encoding='utf-8') as f:
            json.dump(collection_metadata, f, indent=2, ensure_ascii=False)

        # NEW: Generate tree view of EXPORTED files
        logger.info("--- Exported Files (on disk) ---")
        exported_tree = TreeViewGenerator.generate_from_files(collection_path, collection_metadata)
        for line in exported_tree[:20]:  # Show first 20 lines
            logger.info(line)
        if len(exported_tree) > 20:
            logger.info(f"... and {len(exported_tree) - 20} more lines")

        # NEW: Print comparison
        TreeViewGenerator.print_comparison(online_tree, exported_tree, collection_name)

        # Add to manifest
        if self.generate_manifests:
            self.manifest["collections"].append({
                "id": collection_id,
                "name": collection_name,
                "directory": safe_name,
                "expected_count": expected_count,  # From nav_tree
                "documents_list_count": len(documents),  # From API documents.list
                "exported_count": actual_exported_count  # FIXED: Use recursive count
            })

        # NEW: Store collection stats
        self.collection_stats[collection_id] = {
            "name": collection_name,
            "expected": expected_count,  # From nav_tree (source of truth)
            "documents_list_count": len(documents),  # From API
            "exported": actual_exported_count  # FIXED: Use recursive count
        }

        self.stats["collections"] += 1

        # Enhanced summary
        if actual_exported_count == expected_count:
            logger.info(f"Exported {actual_exported_count}/{expected_count} documents from '{collection_name}' - COMPLETE")
        else:
            missing = expected_count - actual_exported_count
            logger.warning(f"Exported {actual_exported_count}/{expected_count} documents from '{collection_name}' - {missing} MISSING")

    def save_manifest(self) -> None:
        """Save export manifest"""
        if not self.generate_manifests:
            return
        # FIXED: populate the manifest fields that were initialized but never
        # filled in before writing to disk.
        self.manifest["failed_documents"] = self.failed_documents
        self.manifest["statistics"] = self.stats
        manifest_path = self.output_dir / "manifest.json"
        with open(manifest_path, 'w', encoding='utf-8') as f:
            json.dump(self.manifest, f, indent=2, ensure_ascii=False)

    def save_export_metadata(self) -> None:
        """Save export metadata with statistics"""
        metadata = {
            "export_date": datetime.now().isoformat(),
            "source_url": self.base_url,
            "statistics": self.stats,
            "collections": self.manifest["collections"],
            "failed_documents_count": len(self.failed_documents),
            "api_errors_count": len(self.api_errors)
        }
        metadata_path = self.output_dir / "export_metadata.json"
        with open(metadata_path, 'w', encoding='utf-8') as f:
            json.dump(metadata, f, indent=2, ensure_ascii=False)

    def save_error_report(self) -> None:
        """Save detailed error report"""
        if not self.failed_documents and not self.api_errors:
            return

        error_report = {
            "export_date": datetime.now().isoformat(),
            "failed_documents": self.failed_documents,
            "api_errors": self.api_errors,
            "statistics": {
                "total_failed_documents": len(self.failed_documents),
                "total_api_errors": len(self.api_errors)
            }
        }

        error_path = self.output_dir / "export_errors.json"
        with open(error_path, 'w', encoding='utf-8') as f:
            json.dump(error_report, f, indent=2, ensure_ascii=False)

        logger.warning(f"Error report saved to: {error_path}")
        logger.warning(f"  - {len(self.failed_documents)} failed documents")
        logger.warning(f"  - {len(self.api_errors)} API errors")

    def verify_export(self) -> bool:
        """
        Verify export integrity and completeness

        FIXED: Enhanced verification comparing with API counts
        """
        logger.info("=" * 60)
        logger.info("Verifying Export Integrity and Completeness")
        logger.info("=" * 60)

        if not self.generate_manifests:
            logger.warning("Skipping verification (manifests disabled)")
            return True

        errors = []
        warnings = []

        # NEW: Step 1 - Verify document count completeness
        logger.info("Step 1: Verifying document count completeness...")
        total_expected = 0
        total_exported = 0
        for coll_id, stats in self.collection_stats.items():
            expected = stats["expected"]
            exported = stats["exported"]
            total_expected += expected
            total_exported += exported
            if exported < expected:
                missing = expected - exported
                warnings.append(f"Collection '{stats['name']}': {missing} documents missing")

        if total_exported < total_expected:
            errors.append(
                f"Document count mismatch: Expected {total_expected} from API, "
                f"exported {total_exported} (missing {total_expected - total_exported})"
            )
        else:
            logger.info(f"All {total_expected} documents accounted for")

        # Step 2 - Verify files exist and checksums match
        logger.info("Step 2: Verifying file integrity...")
        file_errors = 0
        checksum_errors = 0

        if self.show_progress:
            iterator = tqdm(self.manifest["documents"], desc="  Verifying", leave=False)
        else:
            iterator = self.manifest["documents"]

        for doc in iterator:
            # FIXED: directories are created from the SANITIZED collection
            # name, so verification must sanitize too — otherwise any
            # collection whose name contains <>:"/\|?* falsely fails.
            collection_dir = self.output_dir / self.sanitize_filename(doc["collection_name"])
            filepath = collection_dir / doc["filename"]

            # Check file exists
            if not filepath.exists():
                file_errors += 1
                errors.append(f"Missing file: {doc['filename']}")
                continue

            # Verify checksum
            try:
                with open(filepath, 'r', encoding='utf-8') as f:
                    content = f.read()
                actual_checksum = self.calculate_checksum(content)
                if actual_checksum != doc["checksum"]:
                    checksum_errors += 1
                    errors.append(f"Checksum mismatch: {doc['filename']}")
            except Exception as e:
                errors.append(f"Error reading file {doc['filename']}: {e}")

        if file_errors == 0 and checksum_errors == 0:
            logger.info(f"All {len(self.manifest['documents'])} files exist and checksums match")

        # Summary
        logger.info("=" * 60)
        if errors:
            logger.error(f"Verification FAILED: {len(errors)} critical errors")
            for err in errors[:10]:
                logger.error(f"  - {err}")
            if len(errors) > 10:
                logger.error(f"  ... and {len(errors) - 10} more errors")
            return False
        elif warnings:
            logger.warning(f"Verification PASSED with warnings: {len(warnings)} issues")
            for warn in warnings:
                logger.warning(f"  - {warn}")
            return True
        else:
            logger.info("Verification PASSED - Export is complete and verified")
            logger.info(f"  - All {total_expected} documents from API accounted for")
            logger.info(f"  - All {len(self.manifest['documents'])} files exist")
            logger.info(f"  - All checksums match")
            return True

    def dry_run(self) -> Dict:
        """
        Preview what would be exported without writing files.

        Returns:
            Dictionary with collection/document counts and estimated size
        """
        logger.info("=" * 60)
        logger.info("DRY RUN - Preview Export")
        logger.info("=" * 60)

        if not self.health_check():
            logger.error("Dry run aborted due to failed health check")
            return {}

        collections = self.get_collections()
        if not collections:
            logger.warning("No collections found")
            return {"collections": 0, "documents": 0}

        total_docs = 0
        results = {
            "collections": [],
            "total_collections": len(collections),
            "total_documents": 0
        }

        for collection in collections:
            _, nav_tree = self.get_documents_in_collection(collection["id"])
            hierarchy = self.build_hierarchy([], nav_tree)
            doc_count = len(hierarchy["all_ids"])
            total_docs += doc_count
            results["collections"].append({
                "name": collection["name"],
                "documents": doc_count
            })
            logger.info(f"  {collection['name']}: {doc_count} documents")

        results["total_documents"] = total_docs
        logger.info("=" * 60)
        logger.info(f"Total: {len(collections)} collections, {total_docs} documents")
        logger.info("=" * 60)
        logger.info("Dry run complete - no files written")
        return results

    def export_all(self, skip_health_check: bool = False) -> None:
        """Export all collections and documents.

        Args:
            skip_health_check: Skip the pre-export health check (default False)
        """
        logger.info("=" * 60)
        logger.info("OUTLINE EXPORT - ENHANCED VERSION")
        logger.info("=" * 60)
        logger.info(f"Source: {self.base_url}")
        logger.info(f"Output: {self.output_dir}")
        logger.info(f"Max depth: {self.max_hierarchy_depth}")
        logger.info(f"Max retries: {self.max_retries}")
        logger.info(f"Progress bars: {'Enabled' if self.show_progress else 'Disabled'}")
        logger.info(f"Verification: {'Enabled' if self.verify_after_export else 'Disabled'}")
        logger.info("=" * 60)

        # Health check
        if not skip_health_check:
            if not self.health_check():
                logger.error("Export aborted due to failed health check")
                return

        # Create output directory
        self.output_dir.mkdir(parents=True, exist_ok=True)

        # Fetch collections
        collections = self.get_collections()
        if not collections:
            logger.error("No collections found or API error")
            self.save_error_report()
            return

        # Export each collection
        for collection in collections:
            try:
                self.export_collection(collection)
            except Exception as e:
                logger.error(f"Failed to export collection {collection['name']}: {e}")
                self.api_errors.append({
                    "collection": collection['name'],
                    "error": str(e),
                    "timestamp": datetime.now().isoformat()
                })

        # Save manifests and metadata
        self.save_manifest()
        self.save_export_metadata()
        if self.failed_documents or self.api_errors:
            self.save_error_report()

        # Print summary
        logger.info("=" * 60)
        logger.info("EXPORT SUMMARY")
        logger.info("=" * 60)
        logger.info(f"Collections exported: {self.stats['collections']}")
        logger.info(f"Documents exported: {self.stats['documents']}")
        logger.info(f"Total size: {self.stats['bytes_written'] / (1024*1024):.2f} MB")
        logger.info(f"Failed documents: {len(self.failed_documents)}")
        logger.info(f"API errors: {len(self.api_errors)}")
        logger.info("=" * 60)

        # Verify export
        if self.verify_after_export:
            verification_passed = self.verify_export()
            if not verification_passed:
                logger.warning("Export completed with verification errors")
                logger.warning("Check export_errors.json for details")
        else:
            logger.info("Export completed (verification skipped)")


def load_settings(settings_file: str = "settings.json") -> Dict:
    """Load settings from JSON file"""
    try:
        with open(settings_file, 'r') as f:
            return json.load(f)
    except FileNotFoundError:
        logger.error(f"Settings file not found: {settings_file}")
        logger.error("Create a settings.json file with your configuration")
        sys.exit(1)
    except json.JSONDecodeError as e:
        logger.error(f"Invalid JSON in settings file: {e}")
        sys.exit(1)


def parse_args() -> 'argparse.Namespace':
    """Parse command line arguments."""
    import argparse
    parser = argparse.ArgumentParser(
        description="Export Outline wiki documents",
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        '--dry-run', '-n', action='store_true',
        help='Preview what would be exported without writing files'
    )
    parser.add_argument(
        '--output', '-o',
        help='Output directory (overrides settings.json)'
    )
    parser.add_argument(
        '--verbose', '-v', action='count', default=0,
        help='Increase verbosity (use -vv for debug)'
    )
    parser.add_argument(
        '--skip-verify', action='store_true',
        help='Skip post-export verification'
    )
    parser.add_argument(
        '--skip-health-check', action='store_true',
        help='Skip pre-export health check'
    )
    parser.add_argument(
        '--settings', default='settings.json',
        help='Path to settings file (default: settings.json)'
    )
    return parser.parse_args()


def main() -> None:
    """Main entry point"""
    args = parse_args()

    # Set log level based on verbosity
    if args.verbose >= 2:
        logger.setLevel(logging.DEBUG)
    elif args.verbose == 1:
        logger.setLevel(logging.INFO)

    # Load settings
    settings = load_settings(args.settings)
    source = settings.get("source", {})
    export_config = settings.get("export", {})
    advanced = settings.get("advanced", {})

    # Validate required settings
    if not source.get("url") or not source.get("token"):
        logger.error("Missing required settings: source.url and source.token")
        sys.exit(1)

    # CLI overrides for settings
    output_dir = args.output or export_config.get("output_directory", "exports")
    verify_after = not args.skip_verify and export_config.get("verify_after_export", True)

    # Create exporter
    exporter = OutlineExporter(
        base_url=source["url"],
        api_token=source["token"],
        output_dir=output_dir,
        verify_after_export=verify_after,
        max_hierarchy_depth=advanced.get("max_hierarchy_depth", 100),
        show_progress=advanced.get("progress_bar", True),
        generate_manifests=advanced.get("generate_manifests", True),
        max_retries=advanced.get("max_retries", 3),
        retry_backoff=advanced.get("retry_backoff", 1.0)
    )

    # Run export or dry run
    try:
        if args.dry_run:
            exporter.dry_run()
        else:
            exporter.export_all(skip_health_check=args.skip_health_check)
    except KeyboardInterrupt:
        logger.warning("Export cancelled by user")
        sys.exit(1)
    except Exception as e:
        logger.exception(f"Export failed: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()