#!/usr/bin/env python3
"""
Outline API Import Script
Imports markdown files back into Outline wiki with hierarchy preservation.
Companion script to outline_export_fixed.py.

Usage:
    python3 outline_import.py [OPTIONS]

Options:
    -s, --single        Import all into single timestamped collection
    -n, --dry-run       Preview operations without making changes
    -d, --source DIR    Source directory (default: outline_export)
    -v, --verbose       Increase verbosity (-vv for debug)
    -f, --force         Overwrite existing collections
    --settings FILE     Path to settings file (default: settings.json)
    -h, --help          Show help message
"""

import os
import sys
import json
import logging
import time
import argparse
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s | %(levelname)-8s | %(message)s',
    datefmt='%H:%M:%S'
)
logger = logging.getLogger('outline_import')


class OutlineImporter:
    """Import documents into Outline with hierarchy preservation."""

    def __init__(
        self,
        base_url: str,
        api_token: str,
        source_dir: str = "outline_export",
        dry_run: bool = False,
        single_mode: bool = False,
        force: bool = False,
        on_collection_exists: str = "skip",
        on_document_exists: str = "skip",
        default_permission: str = "read_write",
        request_timeout: int = 30,
        retry_attempts: int = 3,
        retry_delay: float = 1.0,
        rate_limit_delay: float = 0.1
    ):
        """
        Args:
            base_url: Outline instance URL (trailing slash stripped)
            api_token: API bearer token
            source_dir: Directory holding exported collections
            dry_run: If True, log intended operations without calling the API
            single_mode: Import everything into one timestamped collection
            force: Delete and recreate collections that already exist
            on_collection_exists: Policy when a collection exists ('skip')
            on_document_exists: Policy when a document exists (currently unused;
                TODO: wire into import_collection once duplicate detection lands)
            default_permission: Permission for new collections
            request_timeout: Per-request timeout in seconds
            retry_attempts: Max attempts per API call (including the first)
            retry_delay: Base delay for exponential backoff between attempts
            rate_limit_delay: Sleep before each document creation call
        """
        self.base_url = base_url.rstrip('/')
        self.api_token = api_token
        self.source_dir = Path(source_dir)
        self.dry_run = dry_run
        self.single_mode = single_mode
        self.force = force
        self.on_collection_exists = on_collection_exists
        self.on_document_exists = on_document_exists
        self.default_permission = default_permission
        self.request_timeout = request_timeout
        self.retry_attempts = retry_attempts
        self.retry_delay = retry_delay
        self.rate_limit_delay = rate_limit_delay

        # Session-level retries cover connection-level failures only.
        # HTTP status retries (429/5xx) are handled by the manual loop in
        # _api_request; duplicating them here via status_forcelist would
        # multiply the attempt count (adapter retries x manual retries).
        self.session = requests.Session()
        retry_strategy = Retry(
            total=self.retry_attempts,
            backoff_factor=self.retry_delay,
            status_forcelist=[],
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)

        self.headers = {
            "Authorization": f"Bearer {self.api_token}",
            "Content-Type": "application/json"
        }

        # ID mapping: old_id -> new_id (used to re-link parent documents)
        self.id_map: Dict[str, str] = {}

        # Cache of existing collections on the server: name -> id
        self.existing_collections: Dict[str, str] = {}

        # Running statistics, printed in the final summary
        self.stats = {
            "collections_created": 0,
            "collections_skipped": 0,
            "collections_errors": 0,
            "documents_created": 0,
            "documents_skipped": 0,
            "documents_errors": 0,
        }

        # Structured error records for the end-of-run report
        self.errors: List[Dict] = []

    def _api_request(
        self,
        endpoint: str,
        data: Optional[Dict] = None,
        method: str = "POST"
    ) -> Optional[Dict]:
        """
        Make API request with error handling and retry logic.

        Retries transient HTTP statuses (429/5xx) and request exceptions with
        exponential backoff (retry_delay * 2**attempt).

        Args:
            endpoint: API endpoint path (e.g., '/api/collections.list')
            data: Request body data
            method: HTTP method (POST or GET)

        Returns:
            Response data dict or None on failure
        """
        url = f"{self.base_url}{endpoint}"

        for attempt in range(self.retry_attempts):
            try:
                if method == "POST":
                    response = self.session.post(
                        url,
                        headers=self.headers,
                        json=data or {},
                        timeout=self.request_timeout
                    )
                else:
                    response = self.session.get(
                        url,
                        headers=self.headers,
                        timeout=self.request_timeout
                    )

                if response.status_code == 200:
                    return response.json()
                elif response.status_code in [429, 500, 502, 503, 504]:
                    if attempt < self.retry_attempts - 1:
                        wait_time = self.retry_delay * (2 ** attempt)
                        logger.warning(
                            f"API error {response.status_code} on {endpoint}, "
                            f"retrying in {wait_time:.1f}s (attempt {attempt + 1}/{self.retry_attempts})"
                        )
                        time.sleep(wait_time)
                        continue

                # Non-retryable error or final attempt
                logger.error(f"API error on {endpoint}: HTTP {response.status_code}")
                logger.debug(f"Response: {response.text[:200]}")
                return None

            except requests.RequestException as e:
                if attempt < self.retry_attempts - 1:
                    wait_time = self.retry_delay * (2 ** attempt)
                    logger.warning(
                        f"Request failed on {endpoint}: {e}, "
                        f"retrying in {wait_time:.1f}s"
                    )
                    time.sleep(wait_time)
                else:
                    logger.error(f"All {self.retry_attempts} attempts failed on {endpoint}: {e}")
                    return None

        return None

    def health_check(self) -> bool:
        """
        Verify API connectivity and authentication.

        Returns:
            True if API is accessible and authenticated
        """
        logger.info("Checking API connectivity...")
        result = self._api_request("/api/auth.info")
        if result and "data" in result:
            user = result["data"].get("user", {})
            team = result["data"].get("team", {})
            logger.info(f"Authenticated as: {user.get('name', 'Unknown')} ({user.get('email', 'N/A')})")
            logger.info(f"Team: {team.get('name', 'Unknown')}")
            return True
        logger.error("Health check failed: Unable to verify authentication")
        return False

    def _get_collections(self) -> List[Dict]:
        """
        Fetch all existing collections from Outline.

        Pages through /api/collections.list: Outline returns at most 100
        items per call (25 by default), so a single request would miss
        collections on larger instances and lead to duplicate creation.

        Side effect: refreshes self.existing_collections (name -> id).
        """
        collections: List[Dict] = []
        offset = 0
        limit = 100  # Outline's maximum page size
        while True:
            result = self._api_request(
                "/api/collections.list",
                {"offset": offset, "limit": limit}
            )
            if not result or "data" not in result:
                break
            page = result["data"]
            collections.extend(page)
            if len(page) < limit:
                break
            offset += limit

        # Cache name -> id mapping
        self.existing_collections = {c["name"]: c["id"] for c in collections}
        return collections

    def _create_collection(self, name: str, permission: str = None) -> Optional[str]:
        """
        Create a new collection.

        Args:
            name: Collection name
            permission: Permission level ('read' or 'read_write');
                defaults to self.default_permission

        Returns:
            Collection ID if created, None on failure
            (a placeholder ID in dry-run mode)
        """
        if permission is None:
            permission = self.default_permission

        if self.dry_run:
            logger.info(f"  [DRY RUN] Would create collection \"{name}\"")
            return "dry-run-collection-id"

        result = self._api_request("/api/collections.create", {
            "name": name,
            "permission": permission
        })

        if result and "data" in result:
            collection_id = result["data"]["id"]
            logger.debug(f"Created collection: {name} (id: {collection_id})")
            self.existing_collections[name] = collection_id
            return collection_id

        logger.error(f"Failed to create collection: {name}")
        return None

    def _delete_collection(self, collection_id: str) -> bool:
        """
        Delete a collection.

        Args:
            collection_id: Collection ID to delete

        Returns:
            True if deleted successfully (always True in dry-run mode)
        """
        if self.dry_run:
            logger.info(f"  [DRY RUN] Would delete collection {collection_id}")
            return True

        result = self._api_request("/api/collections.delete", {"id": collection_id})
        return result is not None

    def _create_document(
        self,
        collection_id: str,
        title: str,
        text: str,
        parent_document_id: Optional[str] = None,
        publish: bool = True
    ) -> Optional[str]:
        """
        Create a new document in a collection.

        Args:
            collection_id: Parent collection ID
            title: Document title
            text: Markdown content
            parent_document_id: Optional parent document ID for nesting
            publish: Whether to publish immediately

        Returns:
            Document ID if created, None on failure
            (a placeholder ID in dry-run mode)
        """
        if self.dry_run:
            return "dry-run-document-id"

        data = {
            "collectionId": collection_id,
            "title": title,
            "text": text,
            "publish": publish
        }
        if parent_document_id:
            data["parentDocumentId"] = parent_document_id

        # Rate limiting: be gentle with the API when creating many documents
        if self.rate_limit_delay > 0:
            time.sleep(self.rate_limit_delay)

        result = self._api_request("/api/documents.create", data)

        if result and "data" in result:
            return result["data"]["id"]

        logger.error(f"Failed to create document: {title}")
        return None

    def _get_documents_in_collection(self, collection_id: str) -> List[Dict]:
        """Fetch all documents in a collection."""
        result = self._api_request("/api/documents.list", {"collectionId": collection_id})
        if result and "data" in result:
            return result["data"]
        return []

    def load_collection_metadata(self, collection_dir: Path) -> Optional[Dict]:
        """
        Load _collection_metadata.json from a collection directory.

        Args:
            collection_dir: Path to collection directory

        Returns:
            Metadata dict or None if not found/invalid
        """
        metadata_path = collection_dir / "_collection_metadata.json"
        if not metadata_path.exists():
            logger.warning(f"No metadata file found in {collection_dir}")
            return None

        try:
            with open(metadata_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except json.JSONDecodeError as e:
            logger.error(f"Invalid JSON in {metadata_path}: {e}")
            return None
        except Exception as e:
            logger.error(f"Error reading {metadata_path}: {e}")
            return None

    def get_source_collections(self) -> List[Path]:
        """
        Get list of collection directories from source.

        A directory qualifies only if it contains _collection_metadata.json;
        hidden directories are ignored.

        Returns:
            List of collection directory paths, sorted by name
        """
        if not self.source_dir.exists():
            logger.error(f"Source directory not found: {self.source_dir}")
            return []

        collections = []
        for item in sorted(self.source_dir.iterdir()):
            if item.is_dir() and not item.name.startswith('.'):
                # Check for metadata file
                if (item / "_collection_metadata.json").exists():
                    collections.append(item)
                else:
                    logger.warning(f"Skipping {item.name}: no metadata file")

        return collections

    def build_document_tree(self, documents: List[Dict]) -> List[Dict]:
        """
        Build ordered document tree from flat metadata list.
        Uses topological sort to ensure parents are created before children.

        Documents whose parent_id is missing or unknown become roots.

        Args:
            documents: List of document metadata dicts from _collection_metadata.json

        Returns:
            List of root documents with nested children (under '_children')
        """
        # Build lookup by ID (copies, so the input list is not mutated)
        doc_by_id: Dict[str, Dict] = {}
        for doc in documents:
            doc_by_id[doc["id"]] = doc.copy()
            doc_by_id[doc["id"]]["_children"] = []

        # Build parent-child relationships
        roots = []
        for doc in documents:
            parent_id = doc.get("parent_id")
            if parent_id and parent_id in doc_by_id:
                doc_by_id[parent_id]["_children"].append(doc_by_id[doc["id"]])
            else:
                roots.append(doc_by_id[doc["id"]])

        return roots

    def flatten_for_import(self, doc_tree: List[Dict], result: List[Dict] = None) -> List[Dict]:
        """
        Flatten document tree in topological order (parents before children).

        Accepts both '_children' (set by build_document_tree) and 'children'
        (pre-nested export metadata) as the child list.

        Args:
            doc_tree: Nested document tree
            result: Accumulator list (used internally)

        Returns:
            Flat list of documents in import order
        """
        if result is None:
            result = []

        for doc in doc_tree:
            # Add this document
            result.append({
                "id": doc["id"],
                "title": doc["title"],
                "filename": doc["filename"],
                "parent_id": doc.get("parent_id"),
            })
            # Then add children recursively
            children = doc.get("_children", []) or doc.get("children", [])
            if children:
                self.flatten_for_import(children, result)

        return result

    def read_document_content(self, collection_dir: Path, filename: str) -> Optional[str]:
        """
        Read markdown content from file.

        Args:
            collection_dir: Path to collection directory
            filename: Document filename

        Returns:
            Markdown content (export header stripped) or None if not found
        """
        filepath = collection_dir / filename
        if not filepath.exists():
            logger.warning(f"File not found: {filepath}")
            return None

        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                content = f.read()

            # Strip the header metadata added by export.
            # Export format: "# Title\n\n<meta>\n\n---\n\nActual content".
            # Only search for the '---' delimiter near the top of the file so
            # a horizontal rule inside the document body is never mistaken
            # for the export header delimiter.
            lines = content.split('\n')
            content_start = 0

            for i, line in enumerate(lines[:10]):
                if line.strip() == '---':
                    content_start = i + 1
                    break

            if 0 < content_start < len(lines):
                return '\n'.join(lines[content_start:]).strip()

            return content

        except Exception as e:
            logger.error(f"Error reading {filepath}: {e}")
            return None

    def import_collection(
        self,
        collection_dir: Path,
        target_collection_id: Optional[str] = None,
        parent_document_id: Optional[str] = None
    ) -> Tuple[int, int, int]:
        """
        Import a single collection.

        Args:
            collection_dir: Path to collection directory
            target_collection_id: Override target collection (for single mode)
            parent_document_id: Parent document ID (for single mode)

        Returns:
            Tuple of (created, skipped, errors)
        """
        metadata = self.load_collection_metadata(collection_dir)
        if not metadata:
            self.stats["collections_errors"] += 1
            self.errors.append({
                "type": "collection",
                "name": collection_dir.name,
                "error": "Invalid or missing metadata"
            })
            return (0, 0, 1)

        collection_name = metadata.get("name", collection_dir.name)
        documents = metadata.get("documents", [])

        # Count documents recursively (metadata may be nested via 'children')
        def count_docs(docs):
            count = 0
            for doc in docs:
                count += 1
                count += count_docs(doc.get("children", []))
            return count

        doc_count = count_docs(documents)

        # Determine collection ID
        collection_id = target_collection_id
        if not collection_id:
            # Check if collection exists
            if collection_name in self.existing_collections:
                if self.force:
                    logger.info(f"  Deleting existing collection \"{collection_name}\"...")
                    if not self.dry_run:
                        self._delete_collection(self.existing_collections[collection_name])
                    del self.existing_collections[collection_name]
                else:
                    logger.info(f"  Collection exists, skipping...")
                    self.stats["collections_skipped"] += 1
                    # Count the documents we are NOT importing so the final
                    # summary matches the per-collection return value.
                    self.stats["documents_skipped"] += doc_count
                    return (0, doc_count, 0)

            # Create collection
            logger.info(f"  Creating collection...")
            collection_id = self._create_collection(collection_name)
            if not collection_id:
                self.stats["collections_errors"] += 1
                self.errors.append({
                    "type": "collection",
                    "name": collection_name,
                    "error": "Failed to create collection"
                })
                return (0, 0, 1)

            if not self.dry_run:
                logger.info(f"  ✓ (id: {collection_id[:8]}...)")
            self.stats["collections_created"] += 1

        # Build document tree and flatten for import
        doc_tree = self.build_document_tree(documents)
        import_order = self.flatten_for_import(doc_tree)

        # Import documents
        created = 0
        skipped = 0
        errors = 0

        for doc_meta in import_order:
            old_id = doc_meta["id"]
            title = doc_meta["title"]
            filename = doc_meta["filename"]
            old_parent_id = doc_meta.get("parent_id")

            # Resolve parent ID: map the exported parent ID to the newly
            # created one; fall back to the single-mode wrapper document.
            new_parent_id = parent_document_id  # Default for single mode
            if old_parent_id:
                new_parent_id = self.id_map.get(old_parent_id)
                if not new_parent_id and not self.dry_run:
                    logger.warning(f"Parent not found for {title}, creating as root-level")

            # Read content
            content = self.read_document_content(collection_dir, filename)
            if content is None:
                self._print_doc_status(title, "error", "file not found")
                errors += 1
                self.stats["documents_errors"] += 1
                self.errors.append({
                    "type": "document",
                    "title": title,
                    "collection": collection_name,
                    "error": "File not found"
                })
                continue

            # Create document
            new_id = self._create_document(
                collection_id,
                title,
                content,
                parent_document_id=new_parent_id
            )

            if new_id:
                self.id_map[old_id] = new_id
                self._print_doc_status(title, "created")
                created += 1
                self.stats["documents_created"] += 1
            else:
                self._print_doc_status(title, "error", "API error")
                errors += 1
                self.stats["documents_errors"] += 1
                self.errors.append({
                    "type": "document",
                    "title": title,
                    "collection": collection_name,
                    "error": "API error during creation"
                })

        return (created, skipped, errors)

    def _print_doc_status(self, title: str, status: str, message: Optional[str] = None):
        """Print document import status line (symbol + truncated title)."""
        if status == "created":
            symbol = "✓"
            label = "created"
        elif status == "skipped":
            symbol = "○"
            label = "skipped"
        else:
            symbol = "✗"
            label = message or "error"

        # This will be enhanced in Phase 6 with tree formatting
        logger.info(f"  {symbol} {title[:50]:<50} {label}")

    def import_all(self) -> None:
        """Import all collections from source directory and print a summary."""
        start_time = time.time()

        # Print header
        mode_str = "Single collection" if self.single_mode else "Collection per folder"
        dry_run_str = " (DRY RUN)" if self.dry_run else ""

        print("=" * 60)
        print(f" OUTLINE IMPORT{dry_run_str}")
        print("=" * 60)
        print()
        print(f"Source: {self.source_dir}/")
        print(f"Target: {self.base_url}")
        print(f"Mode:   {mode_str}")
        print()

        if self.dry_run:
            print("[DRY RUN] No changes will be made")
            print()

        # Health check
        if not self.health_check():
            logger.error("Import aborted due to failed health check")
            return

        print()

        # Get existing collections
        self._get_collections()

        # Get source collections
        source_collections = self.get_source_collections()
        if not source_collections:
            logger.error("No collections found in source directory")
            return

        if self.single_mode:
            # Single collection mode: one timestamped collection, with a
            # wrapper document per source folder acting as its "collection".
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            single_collection_name = f"import_{timestamp}"

            logger.info(f"Creating single collection: {single_collection_name}")
            collection_id = self._create_collection(single_collection_name)
            if not collection_id and not self.dry_run:
                logger.error("Failed to create import collection")
                return

            self.stats["collections_created"] += 1

            for collection_dir in source_collections:
                metadata = self.load_collection_metadata(collection_dir)
                if not metadata:
                    continue

                collection_name = metadata.get("name", collection_dir.name)
                doc_count = metadata.get("expected_count", 0)

                print(f"\n{collection_name}/ ({doc_count} documents)")

                # Create parent document for this "collection"
                parent_doc_id = self._create_document(
                    collection_id,
                    collection_name,
                    f"# {collection_name}\n\nImported collection.",
                    parent_document_id=None
                )

                if parent_doc_id:
                    self.stats["documents_created"] += 1

                # Import documents under this parent
                self.import_collection(
                    collection_dir,
                    target_collection_id=collection_id,
                    parent_document_id=parent_doc_id
                )
        else:
            # Standard mode: one collection per folder
            for collection_dir in source_collections:
                metadata = self.load_collection_metadata(collection_dir)
                if not metadata:
                    continue

                collection_name = metadata.get("name", collection_dir.name)
                doc_count = metadata.get("expected_count", 0)

                print(f"\n{collection_name}/ ({doc_count} documents)")
                self.import_collection(collection_dir)

        # Print summary
        duration = time.time() - start_time
        print()
        print("=" * 60)
        print("SUMMARY")
        print("=" * 60)
        print(f"  Collections: {self.stats['collections_created']} created, "
              f"{self.stats['collections_skipped']} skipped, "
              f"{self.stats['collections_errors']} errors")
        print(f"  Documents:   {self.stats['documents_created']} created, "
              f"{self.stats['documents_skipped']} skipped, "
              f"{self.stats['documents_errors']} errors")
        print(f"  Duration:    {duration:.1f} seconds")
        print("=" * 60)

        if self.errors:
            print()
            logger.warning(f"Encountered {len(self.errors)} errors during import")


def load_settings(settings_file: str = "settings.json") -> Dict:
    """Load settings from JSON file; exit with an error message if unusable."""
    try:
        with open(settings_file, 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        logger.error(f"Settings file not found: {settings_file}")
        logger.error("Create a settings.json file with your configuration")
        sys.exit(1)
    except json.JSONDecodeError as e:
        logger.error(f"Invalid JSON in settings file: {e}")
        sys.exit(1)


def parse_args() -> argparse.Namespace:
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(
        description="Import markdown files into Outline wiki",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s                    Import all collections from outline_export/
  %(prog)s --dry-run          Preview what would be imported
  %(prog)s --single           Import all into a single timestamped collection
  %(prog)s -d backup/         Import from custom directory
  %(prog)s --force            Overwrite existing collections
        """
    )
    parser.add_argument(
        '-s', '--single',
        action='store_true',
        help='Import all into single timestamped collection'
    )
    parser.add_argument(
        '-n', '--dry-run',
        action='store_true',
        help='Preview operations without making changes'
    )
    parser.add_argument(
        '-d', '--source',
        default=None,
        help='Source directory (default: outline_export)'
    )
    parser.add_argument(
        '-v', '--verbose',
        action='count',
        default=0,
        help='Increase verbosity (use -vv for debug)'
    )
    parser.add_argument(
        '-f', '--force',
        action='store_true',
        help='Overwrite existing collections (instead of skip)'
    )
    parser.add_argument(
        '--settings',
        default='settings.json',
        help='Path to settings file (default: settings.json)'
    )
    return parser.parse_args()


def main() -> None:
    """Main entry point: load config, build the importer, run the import."""
    args = parse_args()

    # Set log level based on verbosity (-vv enables debug output)
    if args.verbose >= 2:
        logger.setLevel(logging.DEBUG)
    elif args.verbose == 1:
        logger.setLevel(logging.INFO)

    # Load settings
    settings = load_settings(args.settings)

    source = settings.get("source", {})
    import_config = settings.get("import", {})
    advanced = settings.get("advanced", {})

    # Validate required settings
    if not source.get("url") or not source.get("token"):
        logger.error("Missing required settings: source.url and source.token")
        sys.exit(1)

    # Determine source directory: CLI flag wins over settings file
    source_dir = args.source or import_config.get("source_directory", "outline_export")

    # Create importer
    importer = OutlineImporter(
        base_url=source["url"],
        api_token=source["token"],
        source_dir=source_dir,
        dry_run=args.dry_run,
        single_mode=args.single,
        force=args.force,
        on_collection_exists=import_config.get("on_collection_exists", "skip"),
        on_document_exists=import_config.get("on_document_exists", "skip"),
        default_permission=import_config.get("default_permission", "read_write"),
        request_timeout=advanced.get("request_timeout", 30),
        retry_attempts=advanced.get("retry_attempts", 3),
        retry_delay=advanced.get("retry_delay", 1.0),
        rate_limit_delay=advanced.get("rate_limit_delay", 0.1)
    )

    # Run import
    try:
        importer.import_all()
    except KeyboardInterrupt:
        logger.warning("Import cancelled by user")
        sys.exit(1)
    except Exception as e:
        logger.exception(f"Import failed: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()