#!/usr/bin/env python3
"""
Outline API Import Script

Imports markdown files back into Outline wiki with hierarchy preservation.
Companion script to outline_export_fixed.py.

Usage:
    python3 outline_import.py [OPTIONS]

Options:
    -s, --single       Import all into single timestamped collection
    -n, --dry-run      Preview operations without making changes
    -d, --source DIR   Source directory (default: outline_export)
    -v, --verbose      Increase verbosity (-vv for debug)
    -f, --force        Overwrite existing collections
    --settings FILE    Path to settings file (default: settings.json)
    -h, --help         Show help message
"""

import os
import sys
import json
import logging
import time
import argparse
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from tqdm import tqdm


# Configure logging with tqdm-compatible handler
class TqdmLoggingHandler(logging.Handler):
    """Logging handler that uses tqdm.write() to avoid breaking progress bars."""

    def emit(self, record):
        try:
            msg = self.format(record)
            tqdm.write(msg, file=sys.stdout)
        except Exception:
            self.handleError(record)


logger = logging.getLogger('outline_import')
logger.setLevel(logging.INFO)
handler = TqdmLoggingHandler()
handler.setFormatter(logging.Formatter('%(asctime)s | %(levelname)-8s | %(message)s', datefmt='%H:%M:%S'))
logger.addHandler(handler)
logger.propagate = False


class TreePrinter:
    """Utility for printing tree-style output."""

    PIPE = "│   "
    ELBOW = "└── "
    TEE = "├── "
    BLANK = "    "

    @staticmethod
    def format_line(title: str, status: str, message: str = None, prefix: str = "") -> str:
        """Format a tree line with status indicator."""
        # Status symbols and labels
        if status == "created":
            symbol = "✓"
            label = "created"
        elif status == "skipped":
            symbol = "○"
            label = "skipped"
        elif status == "dry_run":
            symbol = "○"
            label = "(dry run)"
        else:
            symbol = "✗"
            label = message or "error"

        # Truncate title if too long
        max_title_len = 40
        if len(title) > max_title_len:
            display_title = title[:max_title_len - 3] + "..."
        else:
            display_title = title

        # Format: prefix + title + padding + symbol + label
        filename = f"{display_title}.md"
        return f"{prefix}{filename:<45} {symbol} {label}"


class OutlineImporter:
    """Import documents into Outline with hierarchy preservation."""

    def __init__(
        self,
        base_url: str,
        api_token: str,
        source_dir: str = "outline_export",
        dry_run: bool = False,
        single_mode: bool = False,
        force: bool = False,
        on_collection_exists: str = "skip",
        on_document_exists: str = "skip",
        default_permission: str = "read_write",
        request_timeout: int = 30,
        retry_attempts: int = 3,
        retry_delay: float = 1.0,
        rate_limit_delay: float = 0.1
    ):
        self.base_url = base_url.rstrip('/')
        self.api_token = api_token
        self.source_dir = Path(source_dir)
        self.dry_run = dry_run
        self.single_mode = single_mode
        self.force = force
        self.on_collection_exists = on_collection_exists
        self.on_document_exists = on_document_exists
        self.default_permission = default_permission
        self.request_timeout = request_timeout
        self.retry_attempts = retry_attempts
        self.retry_delay = retry_delay
        self.rate_limit_delay = rate_limit_delay

        # Setup session with retry logic.
        # NOTE: these transport-level retries are in addition to the manual
        # retry loop in _api_request, so worst-case attempts multiply.
        self.session = requests.Session()
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)

        self.headers = {
            "Authorization": f"Bearer {self.api_token}",
            "Content-Type": "application/json"
        }

        # ID mapping: old_id -> new_id
        self.id_map: Dict[str, str] = {}

        # Track existing collections: name -> id
        self.existing_collections: Dict[str, str] = {}

        # Track imported collection for verification
        self.imported_collection_id: Optional[str] = None
        self.imported_collection_name: Optional[str] = None

        # Adaptive rate limiting
        self.current_rate_delay = rate_limit_delay
        self.min_rate_delay = 0.2
        self.max_rate_delay = 5.0
        self.rate_increase_factor = 1.5  # Increase delay by 50% on 429
        self.rate_decrease_factor = 0.9  # Decrease delay by 10% on success

        # Statistics
        self.stats = {
            "collections_created": 0,
            "collections_skipped": 0,
            "collections_errors": 0,
            "documents_created": 0,
            "documents_skipped": 0,
            "documents_errors": 0,
        }

        # Error tracking
        self.errors: List[Dict] = []

        # Source metadata for verification
        self.source_docs: List[Dict] = []

        # Progress bar (initialized in import_all)
        self.pbar: Optional[tqdm] = None

    def _output(self, message: str, end: str = "\n") -> None:
        """Output message, using tqdm.write() if progress bar is active."""
        if self.pbar is not None:
            # tqdm.write() always adds newline, so we handle end="" specially
            if end == "":
                # For inline messages, update the progress bar description instead
                self.pbar.set_description(message.strip())
            else:
                tqdm.write(message, file=sys.stdout)
        else:
            print(message, end=end, flush=True)

    def _update_progress(self, n: int = 1) -> None:
        """Advance the progress bar by n (default 1)."""
        if self.pbar is not None:
            self.pbar.update(n)

    def _count_all_documents(self, source_collections: List[Path]) -> int:
        """Count total documents across all collections."""
        total = 0
        for collection_dir in source_collections:
            metadata = self.load_collection_metadata(collection_dir)
            if metadata:
                total += metadata.get("expected_count", 0)
            # In single mode, we also create a parent doc for each collection
            if self.single_mode:
                total += 1
        return total

    def _parse_retry_after(self, response: requests.Response) -> Optional[float]:
        """
        Parse Retry-After header from response.

        The header can be in two formats:
        1. Seconds (integer): "Retry-After: 120"
        2. HTTP date: "Retry-After: Wed, 21 Oct 2015 07:28:00 GMT"

        Args:
            response: HTTP response object

        Returns:
            Number of seconds to wait, or None if header not present/parseable
        """
        retry_after = response.headers.get("Retry-After")
        if not retry_after:
            return None

        # Try parsing as number (seconds) - handles both int and float
        try:
            seconds = float(retry_after)
            logger.debug(f"Retry-After header: {seconds:.1f} seconds")
            return seconds
        except ValueError:
            pass

        # Try parsing as HTTP date
        try:
            from email.utils import parsedate_to_datetime
            retry_date = parsedate_to_datetime(retry_after)
            now = datetime.now(retry_date.tzinfo)
            delta = (retry_date - now).total_seconds()
            # Ensure positive wait time, minimum 1 second
            wait_seconds = max(1.0, delta)
            logger.debug(f"Retry-After header (HTTP date): wait {wait_seconds:.1f}s until {retry_after}")
            return wait_seconds
        except (ValueError, TypeError) as e:
            logger.warning(f"Could not parse Retry-After header '{retry_after}': {e}")

        return None

    def _api_request(
        self,
        endpoint: str,
        data: Optional[Dict] = None,
        method: str = "POST",
        is_write: bool = False,
        context: str = None
    ) -> Optional[Dict]:
        """
        Make API request with error handling, retry logic, and adaptive rate limiting.

        Args:
            endpoint: API endpoint path (e.g., '/api/collections.list')
            data: Request body data
            method: HTTP method (POST or GET)
            is_write: Whether this is a write operation (applies rate limiting)
            context: Optional context string for logging (e.g., document title)

        Returns:
            Response data dict or None on failure
        """
        log_context = f" [{context}]" if context else ""
        url = f"{self.base_url}{endpoint}"

        for attempt in range(self.retry_attempts):
            try:
                if method == "POST":
                    response = self.session.post(
                        url,
                        headers=self.headers,
                        json=data or {},
                        timeout=self.request_timeout
                    )
                else:
                    response = self.session.get(
                        url,
                        headers=self.headers,
                        timeout=self.request_timeout
                    )

                if response.status_code == 200:
                    # Success - gradually decrease rate delay for write operations
                    if is_write:
                        old_delay = self.current_rate_delay
                        self.current_rate_delay = max(
                            self.min_rate_delay,
                            self.current_rate_delay * self.rate_decrease_factor
                        )
                        if old_delay != self.current_rate_delay:
                            logger.debug(f"Rate delay decreased: {old_delay:.2f}s → {self.current_rate_delay:.2f}s")
                    return response.json()

                elif response.status_code == 429:
                    # Rate limited - check Retry-After header
                    retry_after = self._parse_retry_after(response)

                    # Increase adaptive delay based on Retry-After or multiplicative factor
                    old_delay = self.current_rate_delay
                    if retry_after is not None:
                        # Use Retry-After to inform spacing: divide window by ~10 requests
                        # e.g., 20s Retry-After → 2s delay between requests
                        informed_delay = retry_after / 10.0
                        self.current_rate_delay = min(
                            self.max_rate_delay,
                            max(self.current_rate_delay, informed_delay)
                        )
                    else:
                        self.current_rate_delay = min(
                            self.max_rate_delay,
                            self.current_rate_delay * self.rate_increase_factor
                        )
                    if old_delay != self.current_rate_delay:
                        logger.info(f"Rate delay adjusted: {old_delay:.2f}s → {self.current_rate_delay:.2f}s")

                    if attempt < self.retry_attempts - 1:
                        # Use Retry-After if provided, otherwise fall back to exponential backoff
                        if retry_after is not None:
                            wait_time = retry_after
                            logger.warning(
                                f"Rate limited (429){log_context}, server requested {wait_time:.1f}s wait via Retry-After "
                                f"(attempt {attempt + 1}/{self.retry_attempts})"
                            )
                        else:
                            wait_time = self.current_rate_delay * (2 ** attempt)
                            logger.warning(
                                f"Rate limited (429){log_context}, waiting {wait_time:.1f}s "
                                f"(attempt {attempt + 1}/{self.retry_attempts}, delay now {self.current_rate_delay:.1f}s)"
                            )
                        time.sleep(wait_time)
                        continue

                elif response.status_code in [500, 502, 503, 504]:
                    if attempt < self.retry_attempts - 1:
                        # Check for Retry-After header (common with 503)
                        retry_after = self._parse_retry_after(response)
                        if retry_after is not None:
                            wait_time = retry_after
                            logger.warning(
                                f"API error {response.status_code}{log_context}, "
                                f"server requested {wait_time:.1f}s wait via Retry-After "
                                f"(attempt {attempt + 1}/{self.retry_attempts})"
                            )
                        else:
                            wait_time = self.retry_delay * (2 ** attempt)
                            logger.warning(
                                f"API error {response.status_code}{log_context}, "
                                f"retrying in {wait_time:.1f}s (attempt {attempt + 1}/{self.retry_attempts})"
                            )
                        time.sleep(wait_time)
                        continue

                # Non-retryable error or final attempt
                logger.error(f"API error{log_context}: HTTP {response.status_code}")
                logger.debug(f"Response: {response.text[:200]}")
                return None

            except requests.RequestException as e:
                if attempt < self.retry_attempts - 1:
                    wait_time = self.retry_delay * (2 ** attempt)
                    logger.warning(
                        f"Request failed on {endpoint}: {e}, "
                        f"retrying in {wait_time:.1f}s"
                    )
                    time.sleep(wait_time)
                else:
                    logger.error(f"All {self.retry_attempts} attempts failed on {endpoint}: {e}")
                    return None

        return None

    def health_check(self) -> bool:
        """
        Verify API connectivity and authentication.

        Returns:
            True if API is accessible and authenticated
        """
        print("Checking API connectivity...", end=" ")
        result = self._api_request("/api/auth.info")
        if result and "data" in result:
            user = result["data"].get("user", {})
            team = result["data"].get("team", {})
            print("✓")
            logger.debug(f"Authenticated as: {user.get('name', 'Unknown')} ({user.get('email', 'N/A')})")
            logger.debug(f"Team: {team.get('name', 'Unknown')}")
            return True
        print("✗")
        logger.error("Health check failed: Unable to verify authentication")
        return False

    def _get_collections(self) -> List[Dict]:
        """Fetch all existing collections from Outline."""
        result = self._api_request("/api/collections.list")
        if result and "data" in result:
            collections = result["data"]
            # Cache name -> id mapping
            self.existing_collections = {c["name"]: c["id"] for c in collections}
            return collections
        return []

    def _create_collection(self, name: str, permission: str = None) -> Optional[str]:
        """
        Create a new collection.

        Args:
            name: Collection name
            permission: Permission level ('read' or 'read_write')

        Returns:
            Collection ID if created, None on failure
        """
        if permission is None:
            permission = self.default_permission

        if self.dry_run:
            logger.info(f"  [DRY RUN] Would create collection \"{name}\"")
            return "dry-run-collection-id"

        result = self._api_request("/api/collections.create", {
            "name": name,
            "permission": permission
        }, is_write=True, context=f"collection:{name}")

        if result and "data" in result:
            collection_id = result["data"]["id"]
            logger.debug(f"Created collection: {name} (id: {collection_id})")
            self.existing_collections[name] = collection_id
            return collection_id

        logger.error(f"Failed to create collection: {name}")
        return None

    def _delete_collection(self, collection_id: str) -> bool:
        """
        Delete a collection.

        Args:
            collection_id: Collection ID to delete

        Returns:
            True if deleted successfully
        """
        if self.dry_run:
            logger.info(f"  [DRY RUN] Would delete collection {collection_id}")
            return True

        result = self._api_request("/api/collections.delete", {"id": collection_id})
        return result is not None

    def _create_document(
        self,
        collection_id: str,
        title: str,
        text: str,
        parent_document_id: Optional[str] = None,
        publish: bool = True
    ) -> Optional[str]:
        """
        Create a new document in a collection.

        Args:
            collection_id: Parent collection ID
            title: Document title
            text: Markdown content
            parent_document_id: Optional parent document ID for nesting
            publish: Whether to publish immediately

        Returns:
            Document ID if created, None on failure
        """
        if self.dry_run:
            return "dry-run-document-id"

        data = {
            "collectionId": collection_id,
            "title": title,
            "text": text,
            "publish": publish
        }
        if parent_document_id:
            data["parentDocumentId"] = parent_document_id

        # Adaptive rate limiting - uses current_rate_delay which increases after 429s
        if self.current_rate_delay > 0:
            time.sleep(self.current_rate_delay)

        result = self._api_request("/api/documents.create", data, is_write=True, context=title)
        if result and "data" in result:
            return result["data"]["id"]

        logger.error(f"Failed to create document: {title}")
        return None

    def _get_documents_in_collection(self, collection_id: str) -> List[Dict]:
        """Fetch all documents in a collection."""
        result = self._api_request("/api/documents.list", {"collectionId": collection_id})
        if result and "data" in result:
            return result["data"]
        return []

    def load_collection_metadata(self, collection_dir: Path) -> Optional[Dict]:
        """
        Load _collection_metadata.json from a collection directory.

        Args:
            collection_dir: Path to collection directory

        Returns:
            Metadata dict or None if not found/invalid
        """
        metadata_path = collection_dir / "_collection_metadata.json"
        if not metadata_path.exists():
            logger.warning(f"No metadata file found in {collection_dir}")
            return None

        try:
            with open(metadata_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except json.JSONDecodeError as e:
            logger.error(f"Invalid JSON in {metadata_path}: {e}")
            return None
        except Exception as e:
            logger.error(f"Error reading {metadata_path}: {e}")
            return None

    def get_source_collections(self) -> List[Path]:
        """
        Get list of collection directories from source.

        Returns:
            List of collection directory paths
        """
        if not self.source_dir.exists():
            logger.error(f"Source directory not found: {self.source_dir}")
            return []

        collections = []
        for item in sorted(self.source_dir.iterdir()):
            if item.is_dir() and not item.name.startswith('.'):
                # Check for metadata file
                if (item / "_collection_metadata.json").exists():
                    collections.append(item)
                else:
                    logger.warning(f"Skipping {item.name}: no metadata file")
        return collections

    def build_document_tree(self, documents: List[Dict]) -> List[Dict]:
        """
        Build ordered document tree from flat metadata list.

        Uses topological sort to ensure parents are created before children.

        Args:
            documents: List of document metadata dicts from _collection_metadata.json

        Returns:
            List of root documents with nested children
        """
        # Build lookup by ID
        doc_by_id: Dict[str, Dict] = {}
        for doc in documents:
            doc_by_id[doc["id"]] = doc.copy()
            doc_by_id[doc["id"]]["_children"] = []

        # Build parent-child relationships
        roots = []
        for doc in documents:
            parent_id = doc.get("parent_id")
            if parent_id and parent_id in doc_by_id:
                doc_by_id[parent_id]["_children"].append(doc_by_id[doc["id"]])
            else:
                roots.append(doc_by_id[doc["id"]])
        return roots

    def flatten_for_import(self, doc_tree: List[Dict], result: List[Dict] = None) -> List[Dict]:
        """
        Flatten document tree in topological order (parents before children).

        Args:
            doc_tree: Nested document tree
            result: Accumulator list (used internally)

        Returns:
            Flat list of documents in import order
        """
        if result is None:
            result = []

        for doc in doc_tree:
            # Add this document
            result.append({
                "id": doc["id"],
                "title": doc["title"],
                "filename": doc["filename"],
                "parent_id": doc.get("parent_id"),
            })
            # Then add children recursively
            children = doc.get("_children", []) or doc.get("children", [])
            if children:
                self.flatten_for_import(children, result)

        return result

    def read_document_content(self, collection_dir: Path, filename: str) -> Optional[str]:
        """
        Read markdown content from file.

        Args:
            collection_dir: Path to collection directory
            filename: Document filename

        Returns:
            Markdown content or None if not found
        """
        filepath = collection_dir / filename
        if not filepath.exists():
            logger.warning(f"File not found: {filepath}")
            return None

        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                content = f.read()

            # Strip the header metadata added by export
            # Format: # Title\n\n\n\n---\n\nActual content
            lines = content.split('\n')
            content_start = 0
            for i, line in enumerate(lines):
                if line.strip() == '---':
                    content_start = i + 1
                    break
            if content_start > 0 and content_start < len(lines):
                return '\n'.join(lines[content_start:]).strip()
            return content
        except Exception as e:
            logger.error(f"Error reading {filepath}: {e}")
            return None

    def import_collection(
        self,
        collection_dir: Path,
        target_collection_id: Optional[str] = None,
        parent_document_id: Optional[str] = None
    ) -> Tuple[int, int, int]:
        """
        Import a single collection with tree-style output.

        Args:
            collection_dir: Path to collection directory
            target_collection_id: Override target collection (for single mode)
            parent_document_id: Parent document ID (for single mode)

        Returns:
            Tuple of (created, skipped, errors)
        """
        metadata = self.load_collection_metadata(collection_dir)
        if not metadata:
            self.stats["collections_errors"] += 1
            self.errors.append({
                "type": "collection",
                "name": collection_dir.name,
                "error": "Invalid or missing metadata"
            })
            return (0, 0, 1)

        collection_name = metadata.get("name", collection_dir.name)
        documents = metadata.get("documents", [])

        # Count documents recursively
        def count_docs(docs):
            count = 0
            for doc in docs:
                count += 1
                count += count_docs(doc.get("children", []))
            return count

        doc_count = count_docs(documents)

        # Determine collection ID
        collection_id = target_collection_id
        if not collection_id:
            # Check if collection exists
            if collection_name in self.existing_collections:
                if self.force:
                    self._output(f"  Deleting existing collection \"{collection_name}\"...")
                    if not self.dry_run:
                        self._delete_collection(self.existing_collections[collection_name])
                    del self.existing_collections[collection_name]
                else:
                    self._output(f"  Collection exists, skipping...")
                    self.stats["collections_skipped"] += 1
                    # FIX: account for skipped documents in stats and advance the
                    # progress bar, otherwise the bar can never reach 100%.
                    self.stats["documents_skipped"] += doc_count
                    self._update_progress(doc_count)
                    return (0, doc_count, 0)

            # Create collection
            if self.dry_run:
                self._output(f"  [DRY RUN] Would create collection \"{collection_name}\"")
                collection_id = "dry-run-collection-id"
            else:
                self._output(f"  Creating collection... ", end="")
                collection_id = self._create_collection(collection_name)
                if not collection_id:
                    self._output("✗ failed")
                    self.stats["collections_errors"] += 1
                    self.errors.append({
                        "type": "collection",
                        "name": collection_name,
                        "error": "Failed to create collection"
                    })
                    return (0, 0, 1)
                self._output(f"✓ (id: {collection_id[:8]}...)")
            self.stats["collections_created"] += 1

        # Build document tree for tree-style import
        doc_tree = self.build_document_tree(documents)

        # Import documents with tree visualization
        created = 0
        skipped = 0
        errors = 0

        def import_tree_recursive(
            docs: List[Dict],
            prefix: str = "  ",
            coll_id: str = None,
            default_parent_id: str = None
        ) -> None:
            """Recursively import documents with tree-style output."""
            nonlocal created, skipped, errors

            for i, doc in enumerate(docs):
                is_last = (i == len(docs) - 1)
                connector = TreePrinter.ELBOW if is_last else TreePrinter.TEE

                old_id = doc["id"]
                title = doc["title"]
                filename = doc["filename"]
                old_parent_id = doc.get("parent_id")
                children = doc.get("_children", []) or doc.get("children", [])

                # Resolve parent ID
                new_parent_id = default_parent_id
                if old_parent_id:
                    new_parent_id = self.id_map.get(old_parent_id, default_parent_id)

                # Read content
                content = self.read_document_content(collection_dir, filename)
                if content is None:
                    line = TreePrinter.format_line(title, "error", "file not found", prefix + connector)
                    self._output(line)
                    self._update_progress()
                    errors += 1
                    self.stats["documents_errors"] += 1
                    self.errors.append({
                        "type": "document",
                        "title": title,
                        "collection": collection_name,
                        "error": "File not found"
                    })
                    # Skip children if parent failed
                    if children:
                        child_prefix = prefix + (TreePrinter.BLANK if is_last else TreePrinter.PIPE)
                        self._output(f"{child_prefix}└── (children skipped due to parent failure)")
                    continue

                # Create document
                if self.dry_run:
                    line = TreePrinter.format_line(title, "dry_run", prefix=prefix + connector)
                    self._output(line)
                    self._update_progress()
                    self.id_map[old_id] = f"dry-run-{old_id}"
                    created += 1
                    self.stats["documents_created"] += 1
                else:
                    new_id = self._create_document(
                        coll_id,
                        title,
                        content,
                        parent_document_id=new_parent_id
                    )
                    if new_id:
                        self.id_map[old_id] = new_id
                        line = TreePrinter.format_line(title, "created", prefix=prefix + connector)
                        self._output(line)
                        self._update_progress()
                        created += 1
                        self.stats["documents_created"] += 1
                    else:
                        line = TreePrinter.format_line(title, "error", "API error", prefix + connector)
                        self._output(line)
                        self._update_progress()
                        errors += 1
                        self.stats["documents_errors"] += 1
                        self.errors.append({
                            "type": "document",
                            "title": title,
                            "collection": collection_name,
                            "error": "API error during creation"
                        })
                        # Skip children if parent failed
                        if children:
                            child_prefix = prefix + (TreePrinter.BLANK if is_last else TreePrinter.PIPE)
                            self._output(f"{child_prefix}└── (children skipped due to parent failure)")
                        continue

                # Process children recursively
                if children:
                    child_prefix = prefix + (TreePrinter.BLANK if is_last else TreePrinter.PIPE)
                    import_tree_recursive(
                        children,
                        prefix=child_prefix,
                        coll_id=coll_id,
                        default_parent_id=self.id_map.get(old_id, default_parent_id)
                    )

        # Start recursive import
        import_tree_recursive(
            doc_tree,
            prefix="  ",
            coll_id=collection_id,
            default_parent_id=parent_document_id
        )

        return (created, skipped, errors)

    def import_all(self) -> None:
        """Import all collections from source directory."""
        start_time = time.time()

        # Print header
        mode_str = "Single collection" if self.single_mode else "Collection per folder"
        dry_run_str = " (DRY RUN)" if self.dry_run else ""
        print("════════════════════════════════════════════════════════════")
        print(f" OUTLINE IMPORT{dry_run_str}")
        print("════════════════════════════════════════════════════════════")
        print()
        print(f"Source: {self.source_dir}/")
        print(f"Target: {self.base_url}")
        print(f"Mode:   {mode_str}")
        print()
        if self.dry_run:
            print("[DRY RUN] No changes will be made")
            print()

        # Health check (skip in dry-run mode if it fails)
        health_ok = self.health_check()
        if not health_ok:
            if self.dry_run:
                print("  (continuing in dry-run mode without API)")
                print()
            else:
                logger.error("Import aborted due to failed health check")
                return
        else:
            print()

        # Get existing collections (skip in dry-run if health check failed)
        if health_ok:
            self._get_collections()

        # Get source collections
        source_collections = self.get_source_collections()
        if not source_collections:
            logger.error("No collections found in source directory")
            return

        # Count total documents and initialize progress bar
        total_docs = self._count_all_documents(source_collections)
        print(f"Total documents to import: {total_docs}")
        print()

        self.pbar = tqdm(
            total=total_docs,
            desc="Importing",
            unit="doc",
            dynamic_ncols=True,
            file=sys.stdout,
            leave=True,
            mininterval=1.0,  # Update at most once per second
            bar_format="{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]"
        )

        try:
            if self.single_mode:
                # Single collection mode
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                single_collection_name = f"import_{timestamp}"
                logger.info(f"Creating single collection: {single_collection_name}")
                collection_id = self._create_collection(single_collection_name)
                if not collection_id and not self.dry_run:
                    logger.error("Failed to create import collection")
                    return
                self.stats["collections_created"] += 1

                for collection_dir in source_collections:
                    metadata = self.load_collection_metadata(collection_dir)
                    if not metadata:
                        continue
                    collection_name = metadata.get("name", collection_dir.name)
                    doc_count = metadata.get("expected_count", 0)
                    self._output(f"\n{collection_name}/ ({doc_count} documents)")

                    # Create parent document for this "collection"
                    parent_doc_id = self._create_document(
                        collection_id,
                        collection_name,
                        f"# {collection_name}\n\nImported collection.",
                        parent_document_id=None
                    )
                    if parent_doc_id:
                        self.stats["documents_created"] += 1
                        self._update_progress()

                    # Import documents under this parent
                    self.import_collection(
                        collection_dir,
                        target_collection_id=collection_id,
                        parent_document_id=parent_doc_id
                    )
            else:
                # Standard mode: one collection per folder
                for collection_dir in source_collections:
                    metadata = self.load_collection_metadata(collection_dir)
                    if not metadata:
                        continue
                    collection_name = metadata.get("name", collection_dir.name)
                    doc_count = metadata.get("expected_count", 0)
                    self._output(f"\n{collection_name}/ ({doc_count} documents)")
                    self.import_collection(collection_dir)
        finally:
            # Close progress bar
            if self.pbar:
                self.pbar.close()
                self.pbar = None

        # Print summary
        duration = time.time() - start_time
        print()
        print("════════════════════════════════════════════════════════════")
        if self.dry_run:
            print("DRY RUN SUMMARY")
        else:
            print("SUMMARY")
        print("════════════════════════════════════════════════════════════")
        print(f"  Collections: {self.stats['collections_created']} created, "
              f"{self.stats['collections_skipped']} skipped, "
              f"{self.stats['collections_errors']} errors")
        print(f"  Documents:   {self.stats['documents_created']} created, "
              f"{self.stats['documents_skipped']} skipped, "
              f"{self.stats['documents_errors']} errors")
        print(f"  Duration:    {duration:.1f} seconds")
        print("════════════════════════════════════════════════════════════")

        if self.errors:
            print()
            print(f"Encountered {len(self.errors)} errors during import:")
            # FIX: actually list the collected errors (previously only the
            # count was printed). Cap the listing to keep output readable.
            max_listed = 20
            for err in self.errors[:max_listed]:
                name = err.get("title") or err.get("name", "?")
                print(f"  - [{err['type']}] {name}: {err['error']}")
            if len(self.errors) > max_listed:
                print(f"  ... and {len(self.errors) - max_listed} more")


def load_settings(settings_file: str = "settings.json") -> Dict:
    """Load settings from JSON file."""
    try:
        with open(settings_file, 'r') as f:
            return json.load(f)
    except FileNotFoundError:
        logger.error(f"Settings file not found: {settings_file}")
        logger.error("Create a settings.json file with your configuration")
        sys.exit(1)
    except json.JSONDecodeError as e:
        logger.error(f"Invalid JSON in settings file: {e}")
        sys.exit(1)


def parse_args() -> argparse.Namespace:
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(
        description="Import markdown files into Outline wiki",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s              Import all collections from outline_export/
  %(prog)s --dry-run    Preview what would be imported
  %(prog)s --single     Import all into a single timestamped collection
  %(prog)s -d backup/   Import from custom directory
  %(prog)s --force      Overwrite existing collections
"""
    )
    parser.add_argument(
        '-s', '--single',
        action='store_true',
        help='Import all into single timestamped collection'
    )
    parser.add_argument(
        '-n', '--dry-run',
        action='store_true',
        help='Preview operations without making changes'
    )
    parser.add_argument(
        '-d', '--source',
        default=None,
        help='Source directory (default: outline_export)'
    )
    parser.add_argument(
        '-v', '--verbose',
        action='count',
        default=0,
        help='Increase verbosity (use -vv for debug)'
    )
    parser.add_argument(
        '-f', '--force',
        action='store_true',
        help='Overwrite existing collections (instead of skip)'
    )
    parser.add_argument(
        '--settings',
        default='settings.json',
        help='Path to settings file (default: settings.json)'
    )
    return parser.parse_args()


def main() -> None:
    """Main entry point."""
    args = parse_args()

    # Set log level based on verbosity
    if args.verbose >= 2:
        logger.setLevel(logging.DEBUG)
    elif args.verbose == 1:
        logger.setLevel(logging.INFO)

    # Load settings
    settings = load_settings(args.settings)
    source = settings.get("source", {})
    import_config = settings.get("import", {})
    advanced = settings.get("advanced", {})

    # Validate required settings
    if not source.get("url") or not source.get("token"):
        logger.error("Missing required settings: source.url and source.token")
        sys.exit(1)

    # Determine source directory
    source_dir = args.source or import_config.get("source_directory", "outline_export")

    # Create importer
    importer = OutlineImporter(
        base_url=source["url"],
        api_token=source["token"],
        source_dir=source_dir,
        dry_run=args.dry_run,
        single_mode=args.single,
        force=args.force,
        on_collection_exists=import_config.get("on_collection_exists", "skip"),
        on_document_exists=import_config.get("on_document_exists", "skip"),
        default_permission=import_config.get("default_permission", "read_write"),
        request_timeout=advanced.get("request_timeout", 30),
        retry_attempts=advanced.get("retry_attempts", 3),
        retry_delay=advanced.get("retry_delay", 1.0),
        rate_limit_delay=advanced.get("rate_limit_delay", 0.1)
    )

    # Run import
    try:
        importer.import_all()
    except KeyboardInterrupt:
        logger.warning("Import cancelled by user")
        sys.exit(1)
    except Exception as e:
        logger.exception(f"Import failed: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()