Files
outline-sync/outline_import.py
Claude 290030f5e8 Phase 1-5: Core import script with full functionality
- OutlineImporter class with settings loading
- API helpers with retry logic
- CLI argument parsing
- Metadata loading and document tree building
- Collection import with existence checking
- Document import with ID mapping for hierarchy
- Single collection mode
- Dry-run support

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-19 22:42:49 +01:00

824 lines
28 KiB
Python

#!/usr/bin/env python3
"""
Outline API Import Script
Imports markdown files back into Outline wiki with hierarchy preservation.
Companion script to outline_export_fixed.py.
Usage:
python3 outline_import.py [OPTIONS]
Options:
-s, --single Import all into single timestamped collection
-n, --dry-run Preview operations without making changes
-d, --source DIR Source directory (default: outline_export)
-v, --verbose Increase verbosity (-vv for debug)
-f, --force Overwrite existing collections
--settings FILE Path to settings file (default: settings.json)
-h, --help Show help message
"""
import os
import sys
import json
import logging
import time
import argparse
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# Configure logging.
# basicConfig sets up the root logger; the module logger below inherits its
# level/format. Verbosity flags in main() later adjust this logger's level.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s | %(levelname)-8s | %(message)s',
    datefmt='%H:%M:%S'
)
logger = logging.getLogger('outline_import')
class OutlineImporter:
    """Import markdown files into an Outline wiki with hierarchy preservation.

    Reads collection directories produced by the companion export script
    (each directory holds a ``_collection_metadata.json`` plus one markdown
    file per document), recreates collections through the Outline REST API,
    and rebuilds parent/child document relationships by mapping exported
    document IDs to the newly created ones.
    """

    def __init__(
        self,
        base_url: str,
        api_token: str,
        source_dir: str = "outline_export",
        dry_run: bool = False,
        single_mode: bool = False,
        force: bool = False,
        on_collection_exists: str = "skip",
        on_document_exists: str = "skip",
        default_permission: str = "read_write",
        request_timeout: int = 30,
        retry_attempts: int = 3,
        retry_delay: float = 1.0,
        rate_limit_delay: float = 0.1
    ):
        """Initialize the importer.

        Args:
            base_url: Outline instance URL (trailing slash is stripped).
            api_token: API bearer token.
            source_dir: Directory containing exported collections.
            dry_run: If True, log intended operations without changing anything.
            single_mode: Import everything into one timestamped collection.
            force: Delete and recreate collections that already exist.
            on_collection_exists: Reserved policy knob (currently unused;
                actual behavior is governed by ``force``).
            on_document_exists: Reserved policy knob (currently unused).
            default_permission: Permission for newly created collections.
            request_timeout: Per-request timeout in seconds.
            retry_attempts: Attempts for the manual retry loop in _api_request.
            retry_delay: Base delay (seconds) for exponential backoff.
            rate_limit_delay: Sleep between document creations.
        """
        self.base_url = base_url.rstrip('/')
        self.api_token = api_token
        self.source_dir = Path(source_dir)
        self.dry_run = dry_run
        self.single_mode = single_mode
        self.force = force
        self.on_collection_exists = on_collection_exists
        self.on_document_exists = on_document_exists
        self.default_permission = default_permission
        self.request_timeout = request_timeout
        self.retry_attempts = retry_attempts
        self.retry_delay = retry_delay
        self.rate_limit_delay = rate_limit_delay

        # NOTE(review): the urllib3 Retry below retries 429/5xx at the
        # transport level *and* _api_request retries the same statuses in a
        # manual loop, so the worst case is retry_attempts * (total + 1)
        # HTTP requests. Kept as-is to preserve behavior; consider
        # consolidating into a single retry layer.
        self.session = requests.Session()
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)

        self.headers = {
            "Authorization": f"Bearer {self.api_token}",
            "Content-Type": "application/json"
        }

        # Maps exported (old) document IDs to the IDs of their re-created
        # counterparts so children can reference their new parents.
        self.id_map: Dict[str, str] = {}
        # Cache of collections already on the server: name -> id.
        self.existing_collections: Dict[str, str] = {}
        # Running counters reported in the final summary.
        self.stats = {
            "collections_created": 0,
            "collections_skipped": 0,
            "collections_errors": 0,
            "documents_created": 0,
            "documents_skipped": 0,
            "documents_errors": 0,
        }
        # Structured error records for post-run reporting.
        self.errors: List[Dict] = []

    def _api_request(
        self,
        endpoint: str,
        data: Optional[Dict] = None,
        method: str = "POST"
    ) -> Optional[Dict]:
        """
        Make an API request with error handling and retry logic.

        Args:
            endpoint: API endpoint path (e.g., '/api/collections.list')
            data: Request body data (only sent for POST; GET ignores it)
            method: HTTP method (POST or GET)

        Returns:
            Response data dict or None on failure
        """
        url = f"{self.base_url}{endpoint}"
        for attempt in range(self.retry_attempts):
            try:
                if method == "POST":
                    response = self.session.post(
                        url,
                        headers=self.headers,
                        json=data or {},
                        timeout=self.request_timeout
                    )
                else:
                    response = self.session.get(
                        url,
                        headers=self.headers,
                        timeout=self.request_timeout
                    )
                if response.status_code == 200:
                    return response.json()
                elif response.status_code in [429, 500, 502, 503, 504]:
                    if attempt < self.retry_attempts - 1:
                        # Exponential backoff: delay * 2^attempt.
                        wait_time = self.retry_delay * (2 ** attempt)
                        logger.warning(
                            f"API error {response.status_code} on {endpoint}, "
                            f"retrying in {wait_time:.1f}s (attempt {attempt + 1}/{self.retry_attempts})"
                        )
                        time.sleep(wait_time)
                        continue
                # Non-retryable error or final attempt
                logger.error(f"API error on {endpoint}: HTTP {response.status_code}")
                logger.debug(f"Response: {response.text[:200]}")
                return None
            except requests.RequestException as e:
                if attempt < self.retry_attempts - 1:
                    wait_time = self.retry_delay * (2 ** attempt)
                    logger.warning(
                        f"Request failed on {endpoint}: {e}, "
                        f"retrying in {wait_time:.1f}s"
                    )
                    time.sleep(wait_time)
                else:
                    logger.error(f"All {self.retry_attempts} attempts failed on {endpoint}: {e}")
                    return None
        return None

    def health_check(self) -> bool:
        """
        Verify API connectivity and authentication.

        Returns:
            True if API is accessible and authenticated
        """
        logger.info("Checking API connectivity...")
        result = self._api_request("/api/auth.info")
        if result and "data" in result:
            user = result["data"].get("user", {})
            team = result["data"].get("team", {})
            logger.info(f"Authenticated as: {user.get('name', 'Unknown')} ({user.get('email', 'N/A')})")
            logger.info(f"Team: {team.get('name', 'Unknown')}")
            return True
        logger.error("Health check failed: Unable to verify authentication")
        return False

    def _get_collections(self) -> List[Dict]:
        """Fetch all existing collections from Outline and cache name -> id."""
        result = self._api_request("/api/collections.list")
        if result and "data" in result:
            collections = result["data"]
            # Cache name -> id mapping for existence checks during import.
            self.existing_collections = {c["name"]: c["id"] for c in collections}
            return collections
        return []

    def _create_collection(self, name: str, permission: str = None) -> Optional[str]:
        """
        Create a new collection.

        Args:
            name: Collection name
            permission: Permission level ('read' or 'read_write');
                defaults to self.default_permission

        Returns:
            Collection ID if created (a placeholder ID in dry-run mode),
            None on failure
        """
        if permission is None:
            permission = self.default_permission
        if self.dry_run:
            logger.info(f"  [DRY RUN] Would create collection \"{name}\"")
            return "dry-run-collection-id"
        result = self._api_request("/api/collections.create", {
            "name": name,
            "permission": permission
        })
        if result and "data" in result:
            collection_id = result["data"]["id"]
            logger.debug(f"Created collection: {name} (id: {collection_id})")
            self.existing_collections[name] = collection_id
            return collection_id
        logger.error(f"Failed to create collection: {name}")
        return None

    def _delete_collection(self, collection_id: str) -> bool:
        """
        Delete a collection. Dry-run aware: only logs in dry-run mode.

        Args:
            collection_id: Collection ID to delete

        Returns:
            True if deleted successfully (or would be, in dry-run mode)
        """
        if self.dry_run:
            logger.info(f"  [DRY RUN] Would delete collection {collection_id}")
            return True
        result = self._api_request("/api/collections.delete", {"id": collection_id})
        return result is not None

    def _create_document(
        self,
        collection_id: str,
        title: str,
        text: str,
        parent_document_id: Optional[str] = None,
        publish: bool = True
    ) -> Optional[str]:
        """
        Create a new document in a collection.

        Args:
            collection_id: Parent collection ID
            title: Document title
            text: Markdown content
            parent_document_id: Optional parent document ID for nesting
            publish: Whether to publish immediately

        Returns:
            Document ID if created (placeholder in dry-run mode), None on failure
        """
        if self.dry_run:
            return "dry-run-document-id"
        data = {
            "collectionId": collection_id,
            "title": title,
            "text": text,
            "publish": publish
        }
        if parent_document_id:
            data["parentDocumentId"] = parent_document_id
        # Throttle creations to avoid tripping server-side rate limits.
        if self.rate_limit_delay > 0:
            time.sleep(self.rate_limit_delay)
        result = self._api_request("/api/documents.create", data)
        if result and "data" in result:
            return result["data"]["id"]
        logger.error(f"Failed to create document: {title}")
        return None

    def _get_documents_in_collection(self, collection_id: str) -> List[Dict]:
        """Fetch all documents in a collection."""
        result = self._api_request("/api/documents.list", {"collectionId": collection_id})
        if result and "data" in result:
            return result["data"]
        return []

    def load_collection_metadata(self, collection_dir: Path) -> Optional[Dict]:
        """
        Load _collection_metadata.json from a collection directory.

        Args:
            collection_dir: Path to collection directory

        Returns:
            Metadata dict or None if not found/invalid
        """
        metadata_path = collection_dir / "_collection_metadata.json"
        if not metadata_path.exists():
            logger.warning(f"No metadata file found in {collection_dir}")
            return None
        try:
            with open(metadata_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except json.JSONDecodeError as e:
            logger.error(f"Invalid JSON in {metadata_path}: {e}")
            return None
        except Exception as e:
            logger.error(f"Error reading {metadata_path}: {e}")
            return None

    def get_source_collections(self) -> List[Path]:
        """
        Get list of collection directories from the source directory.

        Directories without a _collection_metadata.json are skipped with a
        warning; hidden directories (leading '.') are ignored.

        Returns:
            Sorted list of collection directory paths
        """
        if not self.source_dir.exists():
            logger.error(f"Source directory not found: {self.source_dir}")
            return []
        collections = []
        for item in sorted(self.source_dir.iterdir()):
            if item.is_dir() and not item.name.startswith('.'):
                if (item / "_collection_metadata.json").exists():
                    collections.append(item)
                else:
                    logger.warning(f"Skipping {item.name}: no metadata file")
        return collections

    def build_document_tree(self, documents: List[Dict]) -> List[Dict]:
        """
        Build an ordered document tree from a flat metadata list.

        Documents reference their parent via "parent_id"; a document whose
        parent is missing from the list becomes a root. Input dicts are
        copied, so the caller's metadata is not mutated.

        Args:
            documents: List of document metadata dicts from
                _collection_metadata.json

        Returns:
            List of root documents with children attached under "_children"
        """
        # Build lookup by ID, each entry carrying a fresh child list.
        doc_by_id: Dict[str, Dict] = {}
        for doc in documents:
            doc_by_id[doc["id"]] = doc.copy()
            doc_by_id[doc["id"]]["_children"] = []
        # Attach each document to its parent, or promote it to a root.
        roots = []
        for doc in documents:
            parent_id = doc.get("parent_id")
            if parent_id and parent_id in doc_by_id:
                doc_by_id[parent_id]["_children"].append(doc_by_id[doc["id"]])
            else:
                roots.append(doc_by_id[doc["id"]])
        return roots

    def flatten_for_import(self, doc_tree: List[Dict], result: List[Dict] = None) -> List[Dict]:
        """
        Flatten a document tree in topological order (parents before children),
        which guarantees every parent ID is already in id_map when its
        children are imported.

        Args:
            doc_tree: Nested document tree
            result: Accumulator list (used internally for recursion)

        Returns:
            Flat list of documents in import order
        """
        if result is None:
            result = []
        for doc in doc_tree:
            result.append({
                "id": doc["id"],
                "title": doc["title"],
                "filename": doc["filename"],
                "parent_id": doc.get("parent_id"),
            })
            # Recurse into children; supports both trees built by
            # build_document_tree ("_children") and pre-nested metadata
            # ("children").
            children = doc.get("_children", []) or doc.get("children", [])
            if children:
                self.flatten_for_import(children, result)
        return result

    def read_document_content(self, collection_dir: Path, filename: str) -> Optional[str]:
        """
        Read markdown content from a file, stripping the export header.

        The export writes: "# Title\\n\\n<!-- metadata -->\\n\\n---\\n\\ncontent";
        everything up to and including the first '---' line is dropped.
        NOTE(review): a document whose own first horizontal rule appears
        before any header would lose its leading section — assumed not to
        occur with exported files.

        Args:
            collection_dir: Path to collection directory
            filename: Document filename

        Returns:
            Markdown content or None if not found/unreadable
        """
        filepath = collection_dir / filename
        if not filepath.exists():
            logger.warning(f"File not found: {filepath}")
            return None
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                content = f.read()
            lines = content.split('\n')
            content_start = 0
            for i, line in enumerate(lines):
                if line.strip() == '---':
                    content_start = i + 1
                    break
            if content_start > 0 and content_start < len(lines):
                return '\n'.join(lines[content_start:]).strip()
            # No separator found: return the file verbatim.
            return content
        except Exception as e:
            logger.error(f"Error reading {filepath}: {e}")
            return None

    def import_collection(
        self,
        collection_dir: Path,
        target_collection_id: Optional[str] = None,
        parent_document_id: Optional[str] = None
    ) -> Tuple[int, int, int]:
        """
        Import a single collection directory.

        Args:
            collection_dir: Path to collection directory
            target_collection_id: Import into this existing collection
                instead of creating one (used by single mode)
            parent_document_id: Nest all documents under this parent
                (used by single mode)

        Returns:
            Tuple of (created, skipped, errors) document counts
        """
        metadata = self.load_collection_metadata(collection_dir)
        if not metadata:
            self.stats["collections_errors"] += 1
            self.errors.append({
                "type": "collection",
                "name": collection_dir.name,
                "error": "Invalid or missing metadata"
            })
            return (0, 0, 1)

        collection_name = metadata.get("name", collection_dir.name)
        documents = metadata.get("documents", [])

        # Count documents recursively (handles pre-nested "children" metadata;
        # flat parent_id lists simply count their top level, which is all of
        # them).
        def count_docs(docs):
            count = 0
            for doc in docs:
                count += 1
                count += count_docs(doc.get("children", []))
            return count

        doc_count = count_docs(documents)

        # Determine the target collection, creating it if necessary.
        collection_id = target_collection_id
        if not collection_id:
            if collection_name in self.existing_collections:
                if self.force:
                    logger.info(f"  Deleting existing collection \"{collection_name}\"...")
                    # _delete_collection is dry-run aware, so calling it
                    # unconditionally also surfaces the would-delete log in
                    # dry runs (previously suppressed).
                    self._delete_collection(self.existing_collections[collection_name])
                    del self.existing_collections[collection_name]
                else:
                    logger.info("  Collection exists, skipping...")
                    self.stats["collections_skipped"] += 1
                    # BUGFIX: documents_skipped was never updated here, so the
                    # summary under-reported skipped documents.
                    self.stats["documents_skipped"] += doc_count
                    return (0, doc_count, 0)
            logger.info("  Creating collection...")
            collection_id = self._create_collection(collection_name)
            if not collection_id:
                self.stats["collections_errors"] += 1
                self.errors.append({
                    "type": "collection",
                    "name": collection_name,
                    "error": "Failed to create collection"
                })
                return (0, 0, 1)
            if not self.dry_run:
                logger.info(f"  ✓ (id: {collection_id[:8]}...)")
            self.stats["collections_created"] += 1

        # Build the document tree and flatten it so parents precede children.
        doc_tree = self.build_document_tree(documents)
        import_order = self.flatten_for_import(doc_tree)

        created = 0
        skipped = 0
        errors = 0
        for doc_meta in import_order:
            old_id = doc_meta["id"]
            title = doc_meta["title"]
            filename = doc_meta["filename"]
            old_parent_id = doc_meta.get("parent_id")

            # Resolve the new parent: default comes from single mode; a flat
            # parent_id is translated through id_map.
            new_parent_id = parent_document_id
            if old_parent_id:
                new_parent_id = self.id_map.get(old_parent_id)
                if not new_parent_id and not self.dry_run:
                    logger.warning(f"Parent not found for {title}, creating as root-level")

            content = self.read_document_content(collection_dir, filename)
            if content is None:
                self._print_doc_status(title, "error", "file not found")
                errors += 1
                self.stats["documents_errors"] += 1
                self.errors.append({
                    "type": "document",
                    "title": title,
                    "collection": collection_name,
                    "error": "File not found"
                })
                continue

            new_id = self._create_document(
                collection_id,
                title,
                content,
                parent_document_id=new_parent_id
            )
            if new_id:
                self.id_map[old_id] = new_id
                self._print_doc_status(title, "created")
                created += 1
                self.stats["documents_created"] += 1
            else:
                self._print_doc_status(title, "error", "API error")
                errors += 1
                self.stats["documents_errors"] += 1
                self.errors.append({
                    "type": "document",
                    "title": title,
                    "collection": collection_name,
                    "error": "API error during creation"
                })
        return (created, skipped, errors)

    def _print_doc_status(self, title: str, status: str, message: Optional[str] = None):
        """Log a one-line per-document import status.

        BUGFIX: all three status symbols were empty strings (apparently
        stripped unicode — the collection-created log still used "✓"),
        making the branch dead; the symbols are restored here.
        """
        if status == "created":
            symbol = "✓"
            label = "created"
        elif status == "skipped":
            symbol = "○"
            label = "skipped"
        else:
            symbol = "✗"
            label = message or "error"
        # This will be enhanced in Phase 6 with tree formatting
        logger.info(f"  {symbol} {title[:50]:<50} {label}")

    def import_all(self) -> None:
        """Import all collections from the source directory and print a summary."""
        start_time = time.time()

        # Print header
        mode_str = "Single collection" if self.single_mode else "Collection per folder"
        dry_run_str = " (DRY RUN)" if self.dry_run else ""
        print("=" * 60)
        print(f" OUTLINE IMPORT{dry_run_str}")
        print("=" * 60)
        print()
        print(f"Source: {self.source_dir}/")
        print(f"Target: {self.base_url}")
        print(f"Mode: {mode_str}")
        print()
        if self.dry_run:
            print("[DRY RUN] No changes will be made")
            print()

        # Abort early if the API is unreachable or the token is invalid.
        if not self.health_check():
            logger.error("Import aborted due to failed health check")
            return
        print()

        # Populate the existing-collection cache for skip/force decisions.
        self._get_collections()

        source_collections = self.get_source_collections()
        if not source_collections:
            logger.error("No collections found in source directory")
            return

        if self.single_mode:
            # Single collection mode: one timestamped collection, one parent
            # document per source folder, documents nested beneath it.
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            single_collection_name = f"import_{timestamp}"
            logger.info(f"Creating single collection: {single_collection_name}")
            collection_id = self._create_collection(single_collection_name)
            # Dry-run returns a placeholder ID, so a falsy value always means
            # a real creation failure (the extra dry-run check was redundant).
            if not collection_id:
                logger.error("Failed to create import collection")
                return
            self.stats["collections_created"] += 1
            for collection_dir in source_collections:
                metadata = self.load_collection_metadata(collection_dir)
                if not metadata:
                    continue
                collection_name = metadata.get("name", collection_dir.name)
                doc_count = metadata.get("expected_count", 0)
                print(f"\n{collection_name}/ ({doc_count} documents)")
                # Create a parent document standing in for the collection.
                parent_doc_id = self._create_document(
                    collection_id,
                    collection_name,
                    f"# {collection_name}\n\nImported collection.",
                    parent_document_id=None
                )
                if parent_doc_id:
                    self.stats["documents_created"] += 1
                self.import_collection(
                    collection_dir,
                    target_collection_id=collection_id,
                    parent_document_id=parent_doc_id
                )
        else:
            # Standard mode: one collection per folder
            for collection_dir in source_collections:
                metadata = self.load_collection_metadata(collection_dir)
                if not metadata:
                    continue
                collection_name = metadata.get("name", collection_dir.name)
                doc_count = metadata.get("expected_count", 0)
                print(f"\n{collection_name}/ ({doc_count} documents)")
                self.import_collection(collection_dir)

        # Print summary
        duration = time.time() - start_time
        print()
        print("=" * 60)
        print("SUMMARY")
        print("=" * 60)
        print(f"  Collections: {self.stats['collections_created']} created, "
              f"{self.stats['collections_skipped']} skipped, "
              f"{self.stats['collections_errors']} errors")
        print(f"  Documents: {self.stats['documents_created']} created, "
              f"{self.stats['documents_skipped']} skipped, "
              f"{self.stats['documents_errors']} errors")
        print(f"  Duration: {duration:.1f} seconds")
        print("=" * 60)
        if self.errors:
            print()
            logger.warning(f"Encountered {len(self.errors)} errors during import")
def load_settings(settings_file: str = "settings.json") -> Dict:
    """Load settings from a JSON file.

    Args:
        settings_file: Path to the settings JSON file.

    Returns:
        Parsed settings dict.

    Exits the process (status 1) if the file is missing or not valid JSON.
    """
    try:
        # BUGFIX: read explicitly as UTF-8 instead of the platform default,
        # so settings with non-ASCII values parse the same everywhere.
        with open(settings_file, 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        logger.error(f"Settings file not found: {settings_file}")
        logger.error("Create a settings.json file with your configuration")
        sys.exit(1)
    except json.JSONDecodeError as e:
        logger.error(f"Invalid JSON in settings file: {e}")
        sys.exit(1)
def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse command line arguments.

    Args:
        argv: Optional argument list; defaults to sys.argv[1:] when None.
            (Generalization: makes the parser testable and embeddable
            without touching sys.argv; existing callers are unaffected.)

    Returns:
        Parsed argparse.Namespace.
    """
    parser = argparse.ArgumentParser(
        description="Import markdown files into Outline wiki",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s                 Import all collections from outline_export/
  %(prog)s --dry-run       Preview what would be imported
  %(prog)s --single        Import all into a single timestamped collection
  %(prog)s -d backup/      Import from custom directory
  %(prog)s --force         Overwrite existing collections
"""
    )
    parser.add_argument(
        '-s', '--single',
        action='store_true',
        help='Import all into single timestamped collection'
    )
    parser.add_argument(
        '-n', '--dry-run',
        action='store_true',
        help='Preview operations without making changes'
    )
    parser.add_argument(
        '-d', '--source',
        default=None,
        help='Source directory (default: outline_export)'
    )
    parser.add_argument(
        '-v', '--verbose',
        action='count',
        default=0,
        help='Increase verbosity (use -vv for debug)'
    )
    parser.add_argument(
        '-f', '--force',
        action='store_true',
        help='Overwrite existing collections (instead of skip)'
    )
    parser.add_argument(
        '--settings',
        default='settings.json',
        help='Path to settings file (default: settings.json)'
    )
    return parser.parse_args(argv)
def main() -> None:
    """CLI entry point: parse flags, load settings, run the import."""
    cli = parse_args()

    # Raise the module logger's level with -v / -vv.
    if cli.verbose >= 2:
        logger.setLevel(logging.DEBUG)
    elif cli.verbose == 1:
        logger.setLevel(logging.INFO)

    # Pull the three configuration sections out of the settings file.
    cfg = load_settings(cli.settings)
    src_cfg = cfg.get("source", {})
    imp_cfg = cfg.get("import", {})
    adv_cfg = cfg.get("advanced", {})

    # Both the instance URL and the API token are mandatory.
    if not (src_cfg.get("url") and src_cfg.get("token")):
        logger.error("Missing required settings: source.url and source.token")
        sys.exit(1)

    # CLI flag wins over the settings file; fall back to the default.
    src_dir = cli.source or imp_cfg.get("source_directory", "outline_export")

    runner = OutlineImporter(
        base_url=src_cfg["url"],
        api_token=src_cfg["token"],
        source_dir=src_dir,
        dry_run=cli.dry_run,
        single_mode=cli.single,
        force=cli.force,
        on_collection_exists=imp_cfg.get("on_collection_exists", "skip"),
        on_document_exists=imp_cfg.get("on_document_exists", "skip"),
        default_permission=imp_cfg.get("default_permission", "read_write"),
        request_timeout=adv_cfg.get("request_timeout", 30),
        retry_attempts=adv_cfg.get("retry_attempts", 3),
        retry_delay=adv_cfg.get("retry_delay", 1.0),
        rate_limit_delay=adv_cfg.get("rate_limit_delay", 0.1)
    )

    try:
        runner.import_all()
    except KeyboardInterrupt:
        logger.warning("Import cancelled by user")
        sys.exit(1)
    except Exception as e:
        logger.exception(f"Import failed: {e}")
        sys.exit(1)
# Script entry point: run the import only when executed directly,
# not when the module is imported.
if __name__ == "__main__":
    main()