#!/usr/bin/env python3
# Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
"""
Outline API Import Script

Imports markdown files back into Outline wiki with hierarchy preservation.
Companion script to outline_export_fixed.py.

Recent changes:
- Dry-run continues even without API access
- Shows planned operations from metadata alone
- Better health check handling for offline testing

Usage:
    python3 outline_import.py [OPTIONS]

Options:
    -s, --single       Import all into single timestamped collection
    -n, --dry-run      Preview operations without making changes
    -d, --source DIR   Source directory (default: outline_export)
    -v, --verbose      Increase verbosity (-vv for debug)
    -f, --force        Overwrite existing collections
    --settings FILE    Path to settings file (default: settings.json)
    -h, --help         Show help message
"""
# Standard library
import argparse
import json
import logging
import os
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple

# Third-party
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Module-wide logging configuration.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s | %(levelname)-8s | %(message)s',
    datefmt='%H:%M:%S',
)
logger = logging.getLogger('outline_import')
class TreePrinter:
    """Utility for printing tree-style output."""

    # Box-drawing fragments used to build the tree prefix.
    PIPE = "│ "
    ELBOW = "└── "
    TEE = "├── "
    BLANK = " "

    # status -> (symbol, label); None label means "use the caller's message".
    _STATUS_MARKS = {
        "created": ("✓", "created"),
        "skipped": ("○", "skipped"),
        "dry_run": ("○", "(dry run)"),
    }

    @staticmethod
    def format_line(title: str, status: str, message: str = None, prefix: str = "") -> str:
        """Format a single tree line with a trailing status indicator."""
        symbol, label = TreePrinter._STATUS_MARKS.get(status, ("✗", None))
        if label is None:
            # Unknown statuses render as errors with the supplied message.
            label = message or "error"

        # Truncate long titles with an ellipsis so columns stay aligned.
        limit = 40
        if len(title) > limit:
            display_title = title[:limit - 3] + "..."
        else:
            display_title = title

        filename = f"{display_title}.md"
        return f"{prefix}{filename:<45} {symbol} {label}"
class OutlineImporter:
    """Import documents into Outline with hierarchy preservation."""

    def __init__(
        self,
        base_url: str,
        api_token: str,
        source_dir: str = "outline_export",
        dry_run: bool = False,
        single_mode: bool = False,
        force: bool = False,
        on_collection_exists: str = "skip",
        on_document_exists: str = "skip",
        default_permission: str = "read_write",
        request_timeout: int = 30,
        retry_attempts: int = 3,
        retry_delay: float = 1.0,
        rate_limit_delay: float = 0.1
    ):
        """Configure the importer and prepare an HTTP session with retries."""
        # Normalise the base URL so endpoint paths can be appended directly.
        self.base_url = base_url.rstrip('/')
        self.api_token = api_token
        self.source_dir = Path(source_dir)
        self.dry_run = dry_run
        self.single_mode = single_mode
        self.force = force
        self.on_collection_exists = on_collection_exists
        self.on_document_exists = on_document_exists
        self.default_permission = default_permission
        self.request_timeout = request_timeout
        self.retry_attempts = retry_attempts
        self.retry_delay = retry_delay
        self.rate_limit_delay = rate_limit_delay

        # HTTP session with transport-level retries for transient failures.
        self.session = requests.Session()
        adapter = HTTPAdapter(max_retries=Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
        ))
        for scheme in ("http://", "https://"):
            self.session.mount(scheme, adapter)

        self.headers = {
            "Authorization": f"Bearer {self.api_token}",
            "Content-Type": "application/json"
        }

        # Maps exported (old) document IDs to freshly created ones.
        self.id_map: Dict[str, str] = {}

        # Cache of collections already present on the target: name -> id.
        self.existing_collections: Dict[str, str] = {}

        # Running counters for the final summary.
        self.stats = {
            "collections_created": 0,
            "collections_skipped": 0,
            "collections_errors": 0,
            "documents_created": 0,
            "documents_skipped": 0,
            "documents_errors": 0,
        }

        # Detailed error records collected during the run.
        self.errors: List[Dict] = []
def _api_request(
|
|
self,
|
|
endpoint: str,
|
|
data: Optional[Dict] = None,
|
|
method: str = "POST"
|
|
) -> Optional[Dict]:
|
|
"""
|
|
Make API request with error handling and retry logic.
|
|
|
|
Args:
|
|
endpoint: API endpoint path (e.g., '/api/collections.list')
|
|
data: Request body data
|
|
method: HTTP method (POST or GET)
|
|
|
|
Returns:
|
|
Response data dict or None on failure
|
|
"""
|
|
url = f"{self.base_url}{endpoint}"
|
|
|
|
for attempt in range(self.retry_attempts):
|
|
try:
|
|
if method == "POST":
|
|
response = self.session.post(
|
|
url,
|
|
headers=self.headers,
|
|
json=data or {},
|
|
timeout=self.request_timeout
|
|
)
|
|
else:
|
|
response = self.session.get(
|
|
url,
|
|
headers=self.headers,
|
|
timeout=self.request_timeout
|
|
)
|
|
|
|
if response.status_code == 200:
|
|
return response.json()
|
|
elif response.status_code in [429, 500, 502, 503, 504]:
|
|
if attempt < self.retry_attempts - 1:
|
|
wait_time = self.retry_delay * (2 ** attempt)
|
|
logger.warning(
|
|
f"API error {response.status_code} on {endpoint}, "
|
|
f"retrying in {wait_time:.1f}s (attempt {attempt + 1}/{self.retry_attempts})"
|
|
)
|
|
time.sleep(wait_time)
|
|
continue
|
|
|
|
# Non-retryable error or final attempt
|
|
logger.error(f"API error on {endpoint}: HTTP {response.status_code}")
|
|
logger.debug(f"Response: {response.text[:200]}")
|
|
return None
|
|
|
|
except requests.RequestException as e:
|
|
if attempt < self.retry_attempts - 1:
|
|
wait_time = self.retry_delay * (2 ** attempt)
|
|
logger.warning(
|
|
f"Request failed on {endpoint}: {e}, "
|
|
f"retrying in {wait_time:.1f}s"
|
|
)
|
|
time.sleep(wait_time)
|
|
else:
|
|
logger.error(f"All {self.retry_attempts} attempts failed on {endpoint}: {e}")
|
|
return None
|
|
|
|
return None
|
|
|
|
def health_check(self) -> bool:
|
|
"""
|
|
Verify API connectivity and authentication.
|
|
|
|
Returns:
|
|
True if API is accessible and authenticated
|
|
"""
|
|
print("Checking API connectivity...", end=" ")
|
|
result = self._api_request("/api/auth.info")
|
|
if result and "data" in result:
|
|
user = result["data"].get("user", {})
|
|
team = result["data"].get("team", {})
|
|
print("✓")
|
|
logger.debug(f"Authenticated as: {user.get('name', 'Unknown')} ({user.get('email', 'N/A')})")
|
|
logger.debug(f"Team: {team.get('name', 'Unknown')}")
|
|
return True
|
|
print("✗")
|
|
logger.error("Health check failed: Unable to verify authentication")
|
|
return False
|
|
|
|
def _get_collections(self) -> List[Dict]:
|
|
"""Fetch all existing collections from Outline."""
|
|
result = self._api_request("/api/collections.list")
|
|
if result and "data" in result:
|
|
collections = result["data"]
|
|
# Cache name -> id mapping
|
|
self.existing_collections = {c["name"]: c["id"] for c in collections}
|
|
return collections
|
|
return []
|
|
|
|
def _create_collection(self, name: str, permission: str = None) -> Optional[str]:
|
|
"""
|
|
Create a new collection.
|
|
|
|
Args:
|
|
name: Collection name
|
|
permission: Permission level ('read' or 'read_write')
|
|
|
|
Returns:
|
|
Collection ID if created, None on failure
|
|
"""
|
|
if permission is None:
|
|
permission = self.default_permission
|
|
|
|
if self.dry_run:
|
|
logger.info(f" [DRY RUN] Would create collection \"{name}\"")
|
|
return "dry-run-collection-id"
|
|
|
|
result = self._api_request("/api/collections.create", {
|
|
"name": name,
|
|
"permission": permission
|
|
})
|
|
|
|
if result and "data" in result:
|
|
collection_id = result["data"]["id"]
|
|
logger.debug(f"Created collection: {name} (id: {collection_id})")
|
|
self.existing_collections[name] = collection_id
|
|
return collection_id
|
|
|
|
logger.error(f"Failed to create collection: {name}")
|
|
return None
|
|
|
|
def _delete_collection(self, collection_id: str) -> bool:
|
|
"""
|
|
Delete a collection.
|
|
|
|
Args:
|
|
collection_id: Collection ID to delete
|
|
|
|
Returns:
|
|
True if deleted successfully
|
|
"""
|
|
if self.dry_run:
|
|
logger.info(f" [DRY RUN] Would delete collection {collection_id}")
|
|
return True
|
|
|
|
result = self._api_request("/api/collections.delete", {"id": collection_id})
|
|
return result is not None
|
|
|
|
def _create_document(
|
|
self,
|
|
collection_id: str,
|
|
title: str,
|
|
text: str,
|
|
parent_document_id: Optional[str] = None,
|
|
publish: bool = True
|
|
) -> Optional[str]:
|
|
"""
|
|
Create a new document in a collection.
|
|
|
|
Args:
|
|
collection_id: Parent collection ID
|
|
title: Document title
|
|
text: Markdown content
|
|
parent_document_id: Optional parent document ID for nesting
|
|
publish: Whether to publish immediately
|
|
|
|
Returns:
|
|
Document ID if created, None on failure
|
|
"""
|
|
if self.dry_run:
|
|
return "dry-run-document-id"
|
|
|
|
data = {
|
|
"collectionId": collection_id,
|
|
"title": title,
|
|
"text": text,
|
|
"publish": publish
|
|
}
|
|
if parent_document_id:
|
|
data["parentDocumentId"] = parent_document_id
|
|
|
|
# Rate limiting
|
|
if self.rate_limit_delay > 0:
|
|
time.sleep(self.rate_limit_delay)
|
|
|
|
result = self._api_request("/api/documents.create", data)
|
|
|
|
if result and "data" in result:
|
|
return result["data"]["id"]
|
|
|
|
logger.error(f"Failed to create document: {title}")
|
|
return None
|
|
|
|
def _get_documents_in_collection(self, collection_id: str) -> List[Dict]:
|
|
"""Fetch all documents in a collection."""
|
|
result = self._api_request("/api/documents.list", {"collectionId": collection_id})
|
|
if result and "data" in result:
|
|
return result["data"]
|
|
return []
|
|
|
|
def load_collection_metadata(self, collection_dir: Path) -> Optional[Dict]:
|
|
"""
|
|
Load _collection_metadata.json from a collection directory.
|
|
|
|
Args:
|
|
collection_dir: Path to collection directory
|
|
|
|
Returns:
|
|
Metadata dict or None if not found/invalid
|
|
"""
|
|
metadata_path = collection_dir / "_collection_metadata.json"
|
|
if not metadata_path.exists():
|
|
logger.warning(f"No metadata file found in {collection_dir}")
|
|
return None
|
|
|
|
try:
|
|
with open(metadata_path, 'r', encoding='utf-8') as f:
|
|
return json.load(f)
|
|
except json.JSONDecodeError as e:
|
|
logger.error(f"Invalid JSON in {metadata_path}: {e}")
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"Error reading {metadata_path}: {e}")
|
|
return None
|
|
|
|
def get_source_collections(self) -> List[Path]:
|
|
"""
|
|
Get list of collection directories from source.
|
|
|
|
Returns:
|
|
List of collection directory paths
|
|
"""
|
|
if not self.source_dir.exists():
|
|
logger.error(f"Source directory not found: {self.source_dir}")
|
|
return []
|
|
|
|
collections = []
|
|
for item in sorted(self.source_dir.iterdir()):
|
|
if item.is_dir() and not item.name.startswith('.'):
|
|
# Check for metadata file
|
|
if (item / "_collection_metadata.json").exists():
|
|
collections.append(item)
|
|
else:
|
|
logger.warning(f"Skipping {item.name}: no metadata file")
|
|
|
|
return collections
|
|
|
|
def build_document_tree(self, documents: List[Dict]) -> List[Dict]:
|
|
"""
|
|
Build ordered document tree from flat metadata list.
|
|
Uses topological sort to ensure parents are created before children.
|
|
|
|
Args:
|
|
documents: List of document metadata dicts from _collection_metadata.json
|
|
|
|
Returns:
|
|
List of root documents with nested children
|
|
"""
|
|
# Build lookup by ID
|
|
doc_by_id: Dict[str, Dict] = {}
|
|
for doc in documents:
|
|
doc_by_id[doc["id"]] = doc.copy()
|
|
doc_by_id[doc["id"]]["_children"] = []
|
|
|
|
# Build parent-child relationships
|
|
roots = []
|
|
for doc in documents:
|
|
parent_id = doc.get("parent_id")
|
|
if parent_id and parent_id in doc_by_id:
|
|
doc_by_id[parent_id]["_children"].append(doc_by_id[doc["id"]])
|
|
else:
|
|
roots.append(doc_by_id[doc["id"]])
|
|
|
|
return roots
|
|
|
|
def flatten_for_import(self, doc_tree: List[Dict], result: List[Dict] = None) -> List[Dict]:
|
|
"""
|
|
Flatten document tree in topological order (parents before children).
|
|
|
|
Args:
|
|
doc_tree: Nested document tree
|
|
result: Accumulator list (used internally)
|
|
|
|
Returns:
|
|
Flat list of documents in import order
|
|
"""
|
|
if result is None:
|
|
result = []
|
|
|
|
for doc in doc_tree:
|
|
# Add this document
|
|
result.append({
|
|
"id": doc["id"],
|
|
"title": doc["title"],
|
|
"filename": doc["filename"],
|
|
"parent_id": doc.get("parent_id"),
|
|
})
|
|
# Then add children recursively
|
|
children = doc.get("_children", []) or doc.get("children", [])
|
|
if children:
|
|
self.flatten_for_import(children, result)
|
|
|
|
return result
|
|
|
|
def read_document_content(self, collection_dir: Path, filename: str) -> Optional[str]:
|
|
"""
|
|
Read markdown content from file.
|
|
|
|
Args:
|
|
collection_dir: Path to collection directory
|
|
filename: Document filename
|
|
|
|
Returns:
|
|
Markdown content or None if not found
|
|
"""
|
|
filepath = collection_dir / filename
|
|
if not filepath.exists():
|
|
logger.warning(f"File not found: {filepath}")
|
|
return None
|
|
|
|
try:
|
|
with open(filepath, 'r', encoding='utf-8') as f:
|
|
content = f.read()
|
|
|
|
# Strip the header metadata added by export
|
|
# Format: # Title\n\n<!-- metadata -->\n\n---\n\nActual content
|
|
lines = content.split('\n')
|
|
content_start = 0
|
|
|
|
for i, line in enumerate(lines):
|
|
if line.strip() == '---':
|
|
content_start = i + 1
|
|
break
|
|
|
|
if content_start > 0 and content_start < len(lines):
|
|
return '\n'.join(lines[content_start:]).strip()
|
|
|
|
return content
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error reading {filepath}: {e}")
|
|
return None
|
|
|
|
def import_collection(
|
|
self,
|
|
collection_dir: Path,
|
|
target_collection_id: Optional[str] = None,
|
|
parent_document_id: Optional[str] = None
|
|
) -> Tuple[int, int, int]:
|
|
"""
|
|
Import a single collection with tree-style output.
|
|
|
|
Args:
|
|
collection_dir: Path to collection directory
|
|
target_collection_id: Override target collection (for single mode)
|
|
parent_document_id: Parent document ID (for single mode)
|
|
|
|
Returns:
|
|
Tuple of (created, skipped, errors)
|
|
"""
|
|
metadata = self.load_collection_metadata(collection_dir)
|
|
if not metadata:
|
|
self.stats["collections_errors"] += 1
|
|
self.errors.append({
|
|
"type": "collection",
|
|
"name": collection_dir.name,
|
|
"error": "Invalid or missing metadata"
|
|
})
|
|
return (0, 0, 1)
|
|
|
|
collection_name = metadata.get("name", collection_dir.name)
|
|
documents = metadata.get("documents", [])
|
|
|
|
# Count documents recursively
|
|
def count_docs(docs):
|
|
count = 0
|
|
for doc in docs:
|
|
count += 1
|
|
count += count_docs(doc.get("children", []))
|
|
return count
|
|
|
|
doc_count = count_docs(documents)
|
|
|
|
# Determine collection ID
|
|
collection_id = target_collection_id
|
|
if not collection_id:
|
|
# Check if collection exists
|
|
if collection_name in self.existing_collections:
|
|
if self.force:
|
|
print(f" Deleting existing collection \"{collection_name}\"...")
|
|
if not self.dry_run:
|
|
self._delete_collection(self.existing_collections[collection_name])
|
|
del self.existing_collections[collection_name]
|
|
else:
|
|
print(f" Collection exists, skipping...")
|
|
self.stats["collections_skipped"] += 1
|
|
return (0, doc_count, 0)
|
|
|
|
# Create collection
|
|
if self.dry_run:
|
|
print(f" [DRY RUN] Would create collection \"{collection_name}\"")
|
|
collection_id = "dry-run-collection-id"
|
|
else:
|
|
print(f" Creating collection...", end=" ")
|
|
collection_id = self._create_collection(collection_name)
|
|
if not collection_id:
|
|
print("✗ failed")
|
|
self.stats["collections_errors"] += 1
|
|
self.errors.append({
|
|
"type": "collection",
|
|
"name": collection_name,
|
|
"error": "Failed to create collection"
|
|
})
|
|
return (0, 0, 1)
|
|
print(f"✓ (id: {collection_id[:8]}...)")
|
|
|
|
self.stats["collections_created"] += 1
|
|
|
|
# Build document tree for tree-style import
|
|
doc_tree = self.build_document_tree(documents)
|
|
|
|
# Import documents with tree visualization
|
|
created = 0
|
|
skipped = 0
|
|
errors = 0
|
|
|
|
def import_tree_recursive(
|
|
docs: List[Dict],
|
|
prefix: str = " ",
|
|
coll_id: str = None,
|
|
default_parent_id: str = None
|
|
) -> Tuple[int, int, int]:
|
|
"""Recursively import documents with tree-style output."""
|
|
nonlocal created, skipped, errors
|
|
|
|
for i, doc in enumerate(docs):
|
|
is_last = (i == len(docs) - 1)
|
|
connector = TreePrinter.ELBOW if is_last else TreePrinter.TEE
|
|
|
|
old_id = doc["id"]
|
|
title = doc["title"]
|
|
filename = doc["filename"]
|
|
old_parent_id = doc.get("parent_id")
|
|
children = doc.get("_children", []) or doc.get("children", [])
|
|
|
|
# Resolve parent ID
|
|
new_parent_id = default_parent_id
|
|
if old_parent_id:
|
|
new_parent_id = self.id_map.get(old_parent_id, default_parent_id)
|
|
|
|
# Read content
|
|
content = self.read_document_content(collection_dir, filename)
|
|
if content is None:
|
|
line = TreePrinter.format_line(title, "error", "file not found", prefix + connector)
|
|
print(line)
|
|
errors += 1
|
|
self.stats["documents_errors"] += 1
|
|
self.errors.append({
|
|
"type": "document",
|
|
"title": title,
|
|
"collection": collection_name,
|
|
"error": "File not found"
|
|
})
|
|
# Skip children if parent failed
|
|
if children:
|
|
child_prefix = prefix + (TreePrinter.BLANK if is_last else TreePrinter.PIPE)
|
|
print(f"{child_prefix}└── (children skipped due to parent failure)")
|
|
continue
|
|
|
|
# Create document
|
|
if self.dry_run:
|
|
line = TreePrinter.format_line(title, "dry_run", prefix=prefix + connector)
|
|
print(line)
|
|
self.id_map[old_id] = f"dry-run-{old_id}"
|
|
created += 1
|
|
self.stats["documents_created"] += 1
|
|
else:
|
|
new_id = self._create_document(
|
|
coll_id,
|
|
title,
|
|
content,
|
|
parent_document_id=new_parent_id
|
|
)
|
|
|
|
if new_id:
|
|
self.id_map[old_id] = new_id
|
|
line = TreePrinter.format_line(title, "created", prefix=prefix + connector)
|
|
print(line)
|
|
created += 1
|
|
self.stats["documents_created"] += 1
|
|
else:
|
|
line = TreePrinter.format_line(title, "error", "API error", prefix + connector)
|
|
print(line)
|
|
errors += 1
|
|
self.stats["documents_errors"] += 1
|
|
self.errors.append({
|
|
"type": "document",
|
|
"title": title,
|
|
"collection": collection_name,
|
|
"error": "API error during creation"
|
|
})
|
|
# Skip children if parent failed
|
|
if children:
|
|
child_prefix = prefix + (TreePrinter.BLANK if is_last else TreePrinter.PIPE)
|
|
print(f"{child_prefix}└── (children skipped due to parent failure)")
|
|
continue
|
|
|
|
# Process children recursively
|
|
if children:
|
|
child_prefix = prefix + (TreePrinter.BLANK if is_last else TreePrinter.PIPE)
|
|
import_tree_recursive(
|
|
children,
|
|
prefix=child_prefix,
|
|
coll_id=coll_id,
|
|
default_parent_id=self.id_map.get(old_id, default_parent_id)
|
|
)
|
|
|
|
# Start recursive import
|
|
import_tree_recursive(
|
|
doc_tree,
|
|
prefix=" ",
|
|
coll_id=collection_id,
|
|
default_parent_id=parent_document_id
|
|
)
|
|
|
|
return (created, skipped, errors)
|
|
|
|
def import_all(self) -> None:
|
|
"""Import all collections from source directory."""
|
|
start_time = time.time()
|
|
|
|
# Print header
|
|
mode_str = "Single collection" if self.single_mode else "Collection per folder"
|
|
dry_run_str = " (DRY RUN)" if self.dry_run else ""
|
|
|
|
print("════════════════════════════════════════════════════════════")
|
|
print(f" OUTLINE IMPORT{dry_run_str}")
|
|
print("════════════════════════════════════════════════════════════")
|
|
print()
|
|
print(f"Source: {self.source_dir}/")
|
|
print(f"Target: {self.base_url}")
|
|
print(f"Mode: {mode_str}")
|
|
print()
|
|
|
|
if self.dry_run:
|
|
print("[DRY RUN] No changes will be made")
|
|
print()
|
|
|
|
# Health check (skip in dry-run mode if it fails)
|
|
health_ok = self.health_check()
|
|
if not health_ok:
|
|
if self.dry_run:
|
|
print(" (continuing in dry-run mode without API)")
|
|
print()
|
|
else:
|
|
logger.error("Import aborted due to failed health check")
|
|
return
|
|
else:
|
|
print()
|
|
|
|
# Get existing collections (skip in dry-run if health check failed)
|
|
if health_ok:
|
|
self._get_collections()
|
|
|
|
# Get source collections
|
|
source_collections = self.get_source_collections()
|
|
if not source_collections:
|
|
logger.error("No collections found in source directory")
|
|
return
|
|
|
|
if self.single_mode:
|
|
# Single collection mode
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
single_collection_name = f"import_{timestamp}"
|
|
|
|
logger.info(f"Creating single collection: {single_collection_name}")
|
|
collection_id = self._create_collection(single_collection_name)
|
|
if not collection_id and not self.dry_run:
|
|
logger.error("Failed to create import collection")
|
|
return
|
|
|
|
self.stats["collections_created"] += 1
|
|
|
|
for collection_dir in source_collections:
|
|
metadata = self.load_collection_metadata(collection_dir)
|
|
if not metadata:
|
|
continue
|
|
|
|
collection_name = metadata.get("name", collection_dir.name)
|
|
doc_count = metadata.get("expected_count", 0)
|
|
|
|
print(f"\n{collection_name}/ ({doc_count} documents)")
|
|
|
|
# Create parent document for this "collection"
|
|
parent_doc_id = self._create_document(
|
|
collection_id,
|
|
collection_name,
|
|
f"# {collection_name}\n\nImported collection.",
|
|
parent_document_id=None
|
|
)
|
|
|
|
if parent_doc_id:
|
|
self.stats["documents_created"] += 1
|
|
|
|
# Import documents under this parent
|
|
self.import_collection(
|
|
collection_dir,
|
|
target_collection_id=collection_id,
|
|
parent_document_id=parent_doc_id
|
|
)
|
|
else:
|
|
# Standard mode: one collection per folder
|
|
for collection_dir in source_collections:
|
|
metadata = self.load_collection_metadata(collection_dir)
|
|
if not metadata:
|
|
continue
|
|
|
|
collection_name = metadata.get("name", collection_dir.name)
|
|
doc_count = metadata.get("expected_count", 0)
|
|
|
|
print(f"\n{collection_name}/ ({doc_count} documents)")
|
|
self.import_collection(collection_dir)
|
|
|
|
# Print summary
|
|
duration = time.time() - start_time
|
|
print()
|
|
print("════════════════════════════════════════════════════════════")
|
|
if self.dry_run:
|
|
print("DRY RUN SUMMARY")
|
|
else:
|
|
print("SUMMARY")
|
|
print("════════════════════════════════════════════════════════════")
|
|
print(f" Collections: {self.stats['collections_created']} created, "
|
|
f"{self.stats['collections_skipped']} skipped, "
|
|
f"{self.stats['collections_errors']} errors")
|
|
print(f" Documents: {self.stats['documents_created']} created, "
|
|
f"{self.stats['documents_skipped']} skipped, "
|
|
f"{self.stats['documents_errors']} errors")
|
|
print(f" Duration: {duration:.1f} seconds")
|
|
print("════════════════════════════════════════════════════════════")
|
|
|
|
if self.errors:
|
|
print()
|
|
print(f"Encountered {len(self.errors)} errors during import:")
|
|
|
|
|
|
def load_settings(settings_file: str = "settings.json") -> Dict:
    """
    Load settings from a JSON file.

    Args:
        settings_file: Path to the settings JSON file.

    Returns:
        Parsed settings dict.

    Exits:
        With status 1 when the file is missing or contains invalid JSON.
    """
    try:
        # Consistency fix: read with explicit UTF-8 like every other file
        # operation in this script, instead of the platform default encoding.
        with open(settings_file, 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        logger.error(f"Settings file not found: {settings_file}")
        logger.error("Create a settings.json file with your configuration")
        sys.exit(1)
    except json.JSONDecodeError as e:
        logger.error(f"Invalid JSON in settings file: {e}")
        sys.exit(1)
def parse_args() -> argparse.Namespace:
    """Build the CLI parser and parse command line arguments."""
    parser = argparse.ArgumentParser(
        description="Import markdown files into Outline wiki",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
%(prog)s Import all collections from outline_export/
%(prog)s --dry-run Preview what would be imported
%(prog)s --single Import all into a single timestamped collection
%(prog)s -d backup/ Import from custom directory
%(prog)s --force Overwrite existing collections
"""
    )
    parser.add_argument('-s', '--single', action='store_true',
                        help='Import all into single timestamped collection')
    parser.add_argument('-n', '--dry-run', action='store_true',
                        help='Preview operations without making changes')
    parser.add_argument('-d', '--source', default=None,
                        help='Source directory (default: outline_export)')
    parser.add_argument('-v', '--verbose', action='count', default=0,
                        help='Increase verbosity (use -vv for debug)')
    parser.add_argument('-f', '--force', action='store_true',
                        help='Overwrite existing collections (instead of skip)')
    parser.add_argument('--settings', default='settings.json',
                        help='Path to settings file (default: settings.json)')
    return parser.parse_args()
def main() -> None:
    """CLI entry point: parse args, load settings, run the importer."""
    args = parse_args()

    # Map -v / -vv onto log levels.
    if args.verbose >= 2:
        logger.setLevel(logging.DEBUG)
    elif args.verbose == 1:
        logger.setLevel(logging.INFO)

    settings = load_settings(args.settings)

    source = settings.get("source", {})
    import_config = settings.get("import", {})
    advanced = settings.get("advanced", {})

    # The API endpoint and token are the only hard requirements.
    if not source.get("url") or not source.get("token"):
        logger.error("Missing required settings: source.url and source.token")
        sys.exit(1)

    # CLI flag beats the settings file, which beats the built-in default.
    source_dir = args.source or import_config.get("source_directory", "outline_export")

    importer = OutlineImporter(
        base_url=source["url"],
        api_token=source["token"],
        source_dir=source_dir,
        dry_run=args.dry_run,
        single_mode=args.single,
        force=args.force,
        on_collection_exists=import_config.get("on_collection_exists", "skip"),
        on_document_exists=import_config.get("on_document_exists", "skip"),
        default_permission=import_config.get("default_permission", "read_write"),
        request_timeout=advanced.get("request_timeout", 30),
        retry_attempts=advanced.get("retry_attempts", 3),
        retry_delay=advanced.get("retry_delay", 1.0),
        rate_limit_delay=advanced.get("rate_limit_delay", 0.1)
    )

    try:
        importer.import_all()
    except KeyboardInterrupt:
        logger.warning("Import cancelled by user")
        sys.exit(1)
    except Exception as e:
        logger.exception(f"Import failed: {e}")
        sys.exit(1)
if __name__ == "__main__":
    # Run only when executed as a script, not when imported as a module.
    main()