Files
outline-sync/outline_sync.py
domverse 56b141e301
All checks were successful
Deploy / deploy (push) Successful in 13s
feat: add step-by-step connectivity diagnostics to health_check
Break connectivity check into three stages — DNS resolution, TCP connect,
and API auth — each reported separately so failures pinpoint the exact layer.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-16 22:42:35 +01:00

805 lines
29 KiB
Python

#!/usr/bin/env python3
"""
Outline Sync — mirror an Outline wiki as a local markdown vault.

Each document is written as a markdown file with YAML frontmatter
containing the Outline document ID and metadata for future syncs.
Git initialization is handled by sync.sh after this script exits.

Commands (see parse_args; the CLI accepts all three):
  init — create the vault from current Outline state
  pull — fetch remote changes into the vault (outline branch → main)
  push — push local changes (main vs outline) to Outline

Usage (called by sync.sh, not directly):
    python3 outline_sync.py init --vault /vault --settings /work/settings.json
"""
import os
import sys
import re
import json
import socket
import subprocess
import time
import logging
import argparse
from pathlib import Path
from urllib.parse import urlparse
from typing import Dict, List, Optional, Tuple
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# ── Logging ───────────────────────────────────────────────────────────────────
# Default level is WARNING; main() raises it to INFO/DEBUG via -v/-vv.
logging.basicConfig(
    level=logging.WARNING,
    format="%(asctime)s | %(levelname)-8s | %(message)s",
    datefmt="%H:%M:%S",
)
logger = logging.getLogger("outline_sync")
# ── Frontmatter helpers ───────────────────────────────────────────────────────
# Ordered fields written to every synced file
FRONTMATTER_FIELDS = [
    "outline_id",
    "outline_collection_id",
    "outline_parent_id",
    "outline_updated_at",
]

# Vault-level .gitignore body: keeps editor internals, the API-token-bearing
# settings file, and conflict sidecars out of version control.
GITIGNORE = """\
# Obsidian internals
.obsidian/
# Sync config (contains API token)
settings.json
# Conflict sidecars (resolved manually)
*.conflict.md
# OS noise
.DS_Store
Thumbs.db
"""

# Vault-level .gitattributes body: normalizes markdown line endings and lets
# the append-only sync log merge without conflicts (union driver).
GITATTRIBUTES = """\
# Normalize line endings for all markdown
*.md text eol=lf
# Sync log is append-only — never produce conflicts on it
_sync_log.md merge=union
"""
def build_frontmatter(fields: Dict[str, str]) -> str:
    """Render *fields* as a YAML frontmatter block in FRONTMATTER_FIELDS order.

    Empty values are omitted entirely (e.g. outline_parent_id for root docs).
    """
    block = ["---"]
    block.extend(
        f"{key}: {fields[key]}" for key in FRONTMATTER_FIELDS if fields.get(key)
    )
    block.append("---")
    return "\n".join(block) + "\n"
def parse_frontmatter(content: str) -> Tuple[Dict[str, str], str]:
    """Split a leading YAML frontmatter block off *content*.

    Returns (frontmatter_dict, body_text). When no well-formed block is
    present, the dict is empty and the body is the original content.
    """
    if not content.startswith("---\n"):
        return {}, content
    terminator = content.find("\n---\n", 4)
    if terminator == -1:
        return {}, content
    header = content[4:terminator]
    remainder = content[terminator + 5:]  # skip past the "\n---\n" delimiter
    parsed: Dict[str, str] = {}
    for raw_line in header.splitlines():
        key, sep, value = raw_line.partition(": ")
        if sep:  # only lines with a "key: value" shape are kept
            parsed[key.strip()] = value.strip()
    return parsed, remainder
# ── Filename helpers ──────────────────────────────────────────────────────────
_INVALID = re.compile(r'[<>:"/\\|?*\x00-\x1f]')
_SPACES = re.compile(r"\s+")
def sanitize_name(name: str, max_len: int = 200) -> str:
"""Convert a document title to a safe filesystem name (no extension)."""
name = _INVALID.sub("_", name)
name = _SPACES.sub(" ", name).strip()
return name[:max_len] if name else "Untitled"
# ── OutlineSync ───────────────────────────────────────────────────────────────
class OutlineSync:
    """Mirrors an Outline wiki into a local markdown vault over its HTTP API."""

    def __init__(self, base_url: str, api_token: str, vault_dir: Path):
        """Set up an authenticated, retrying HTTP session and run counters."""
        self.base_url = base_url.rstrip("/")
        self.api_token = api_token
        self.vault_dir = Path(vault_dir)
        # One adapter, shared by both schemes: retry transient failures
        # (rate limit + 5xx) with exponential backoff.
        retry_policy = Retry(
            total=3,
            backoff_factor=1.0,
            status_forcelist=[429, 500, 502, 503, 504],
        )
        adapter = HTTPAdapter(max_retries=retry_policy)
        self.session = requests.Session()
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)
        self.headers = {
            "Authorization": f"Bearer {self.api_token}",
            "Content-Type": "application/json",
        }
        self._doc_cache: Dict[str, Dict] = {}  # doc_id → documents.info payload
        self.stats = {"collections": 0, "documents": 0, "errors": 0}
# ── API layer ─────────────────────────────────────────────────────────────
def _api(
self,
endpoint: str,
data: Optional[Dict] = None,
method: str = "POST",
) -> Optional[Dict]:
url = f"{self.base_url}{endpoint}"
try:
if method == "POST":
r = self.session.post(
url, headers=self.headers, json=data or {}, timeout=30
)
else:
r = self.session.get(url, headers=self.headers, timeout=30)
if r.status_code == 200:
return r.json()
logger.error("API %s on %s", r.status_code, endpoint)
logger.debug("Response body: %s", r.text[:400])
return None
except requests.RequestException as exc:
logger.error("Request failed on %s: %s", endpoint, exc)
return None
def health_check(self) -> bool:
parsed = urlparse(self.base_url)
host = parsed.hostname or "outline"
port = parsed.port or (443 if parsed.scheme == "https" else 80)
print(f"Checking API connectivity to {self.base_url} ...")
# 1. DNS resolution
print(f" DNS resolve {host!r} ... ", end="", flush=True)
try:
ip = socket.gethostbyname(host)
print(f"✓ ({ip})")
except socket.gaierror as exc:
print(f"✗ DNS failed: {exc}")
return False
# 2. TCP reachability
print(f" TCP connect {ip}:{port} ... ", end="", flush=True)
try:
with socket.create_connection((host, port), timeout=5):
print("")
except (socket.timeout, ConnectionRefusedError, OSError) as exc:
print(f"{exc}")
return False
# 3. API authentication
print(f" API auth ... ", end="", flush=True)
result = self._api("/api/auth.info")
if result and "data" in result:
user = result["data"].get("user", {})
print(f"✓ (user: {user.get('name', 'unknown')})")
return True
print("✗ bad response")
return False
def get_collections(self) -> List[Dict]:
result = self._api("/api/collections.list")
if result and "data" in result:
return result["data"]
return []
def get_nav_tree(self, collection_id: str) -> List[Dict]:
"""Return the nested navigation tree for a collection."""
result = self._api("/api/collections.documents", {"id": collection_id})
if result and "data" in result:
return result["data"]
return []
def get_document_info(self, doc_id: str) -> Optional[Dict]:
"""Fetch full document content, using cache to avoid duplicate calls."""
if doc_id in self._doc_cache:
return self._doc_cache[doc_id]
result = self._api("/api/documents.info", {"id": doc_id})
if result and "data" in result:
self._doc_cache[doc_id] = result["data"]
return result["data"]
return None
# ── File writing ──────────────────────────────────────────────────────────
def _write_doc_file(
self,
path: Path,
doc_id: str,
collection_id: str,
parent_id: Optional[str],
) -> bool:
"""
Fetch document content and write it to path with YAML frontmatter.
File format:
---
outline_id: <id>
outline_collection_id: <id>
outline_parent_id: <id> ← omitted for root documents
outline_updated_at: <iso>
---
<document body from Outline API 'text' field>
"""
full = self.get_document_info(doc_id)
if not full:
logger.warning("Could not fetch document %s — skipping", doc_id)
self.stats["errors"] += 1
return False
fm = {
"outline_id": doc_id,
"outline_collection_id": collection_id,
"outline_parent_id": parent_id or "",
"outline_updated_at": full.get("updatedAt", ""),
}
body = full.get("text", "")
content = build_frontmatter(fm) + "\n" + body
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(content, encoding="utf-8")
self.stats["documents"] += 1
return True
def _unique_path(self, directory: Path, name: str) -> Path:
"""Return a non-colliding .md path, appending _N suffix if needed."""
candidate = directory / f"{name}.md"
counter = 1
while candidate.exists():
candidate = directory / f"{name}_{counter}.md"
counter += 1
return candidate
def _export_node(
self,
node: Dict,
parent_dir: Path,
collection_id: str,
parent_doc_id: Optional[str],
) -> None:
"""
Recursively export one nav-tree node and all its children.
Folder structure rule (from PRD §4.3):
- Leaf document (no children) → parent_dir/Title.md
- Document with children → parent_dir/Title/Title.md
parent_dir/Title/Child.md ...
This means the parent document and its children share the same folder.
"""
doc_id = node["id"]
title = node.get("title", "Untitled")
children = node.get("children", [])
safe = sanitize_name(title)
if children:
# Create a named subdirectory; the document itself lives inside it
doc_dir = parent_dir / safe
doc_dir.mkdir(parents=True, exist_ok=True)
doc_path = self._unique_path(doc_dir, safe)
child_dir = doc_dir
else:
doc_path = self._unique_path(parent_dir, safe)
child_dir = parent_dir # unused for leaf, but needed for recursion
logger.info(" Writing %s", doc_path.relative_to(self.vault_dir))
ok = self._write_doc_file(doc_path, doc_id, collection_id, parent_doc_id)
if ok:
for child in children:
self._export_node(child, child_dir, collection_id, doc_id)
def export_collection(self, collection: Dict) -> int:
"""Export all documents for one collection. Returns count written."""
coll_id = collection["id"]
coll_name = collection["name"]
safe_name = sanitize_name(coll_name)
coll_dir = self.vault_dir / safe_name
coll_dir.mkdir(parents=True, exist_ok=True)
print(f" {coll_name}/", end=" ", flush=True)
nav_tree = self.get_nav_tree(coll_id)
if not nav_tree:
print("(empty)")
self.stats["collections"] += 1
return 0
before = self.stats["documents"]
for node in nav_tree:
self._export_node(node, coll_dir, coll_id, None)
count = self.stats["documents"] - before
errors = self.stats["errors"]
status = f"{count} documents"
if errors:
status += f"{errors} errors"
print(status)
self.stats["collections"] += 1
return count
# ── Config files ──────────────────────────────────────────────────────────
def write_gitignore(self) -> None:
(self.vault_dir / ".gitignore").write_text(GITIGNORE, encoding="utf-8")
def write_gitattributes(self) -> None:
(self.vault_dir / ".gitattributes").write_text(GITATTRIBUTES, encoding="utf-8")
# ── Pull ──────────────────────────────────────────────────────────────────
def _git(self, *args: str) -> subprocess.CompletedProcess:
return subprocess.run(
["git", "-C", str(self.vault_dir), *args],
capture_output=True, text=True,
)
def _collect_vault_ids(self) -> Dict[str, Path]:
"""Return {outline_id: path} for every tracked .md file in the vault."""
result: Dict[str, Path] = {}
for md in self.vault_dir.rglob("*.md"):
if ".git" in md.parts:
continue
try:
fm, _ = parse_frontmatter(md.read_text(encoding="utf-8"))
oid = fm.get("outline_id")
if oid:
result[oid] = md
except OSError:
pass
return result
def cmd_pull(self) -> bool:
"""
Fetch latest document content from Outline and update the vault.
Runs entirely inside the outline-sync Docker container which has
git + requests. /vault is mounted from the host.
"""
print("Fetching collections from Outline...")
if not self.health_check():
print("✗ Cannot reach Outline API — aborting.")
return False
collections = self.get_collections()
if not collections:
print("No collections found.")
return True
# Collect all Outline documents
all_docs: List[Dict] = []
for coll in collections:
tree = self.get_nav_tree(coll["id"])
self._collect_tree_docs(tree, coll["id"], all_docs)
# Map current vault files by outline_id
vault_ids = self._collect_vault_ids()
updated = 0
created = 0
errors = 0
# Switch to outline branch for writes
self._git("stash", "--include-untracked", "-m", "webui: pre-pull stash")
self._git("checkout", "outline")
for doc_meta in all_docs:
doc_id = doc_meta["id"]
title = doc_meta.get("title", "Untitled")
coll_id = doc_meta["collection_id"]
parent_id = doc_meta.get("parent_id")
outline_ts = doc_meta.get("updatedAt", "")
full = self.get_document_info(doc_id)
if not full:
print(f"error: could not fetch {title}")
errors += 1
continue
outline_ts = full.get("updatedAt", "")
if doc_id in vault_ids:
path = vault_ids[doc_id]
try:
existing_fm, _ = parse_frontmatter(path.read_text(encoding="utf-8"))
local_ts = existing_fm.get("outline_updated_at", "")
except OSError:
local_ts = ""
if local_ts == outline_ts:
continue # already up to date
# Update existing file
fm = {
"outline_id": doc_id,
"outline_collection_id": coll_id,
"outline_parent_id": parent_id or "",
"outline_updated_at": outline_ts,
}
content = build_frontmatter(fm) + "\n" + full.get("text", "")
path.write_text(content, encoding="utf-8")
rel = str(path.relative_to(self.vault_dir))
print(f"ok: {rel} updated")
updated += 1
else:
# New document — determine path from collection + parent hierarchy
safe_coll = sanitize_name(
next((c["name"] for c in collections if c["id"] == coll_id), coll_id)
)
coll_dir = self.vault_dir / safe_coll
if parent_id and parent_id in vault_ids:
parent_path = vault_ids[parent_id]
target_dir = parent_path.parent / parent_path.stem
else:
target_dir = coll_dir
target_dir.mkdir(parents=True, exist_ok=True)
path = self._unique_path(target_dir, sanitize_name(title))
fm = {
"outline_id": doc_id,
"outline_collection_id": coll_id,
"outline_parent_id": parent_id or "",
"outline_updated_at": outline_ts,
}
content = build_frontmatter(fm) + "\n" + full.get("text", "")
path.write_text(content, encoding="utf-8")
vault_ids[doc_id] = path # register so child docs resolve parent correctly
rel = str(path.relative_to(self.vault_dir))
print(f"ok: {rel} created")
created += 1
# Commit on outline branch if anything changed
if updated + created > 0:
self._git("add", "-A")
ts = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
self._git("commit", "-m", f"sync: pull from Outline @ {ts}")
# Back to main + merge
self._git("checkout", "main")
if updated + created > 0:
self._git("merge", "outline", "--no-ff", "-m", f"merge: outline → main @ {ts}")
self._git("stash", "pop")
parts = []
if updated: parts.append(f"{updated} updated")
if created: parts.append(f"{created} created")
if errors: parts.append(f"{errors} errors")
summary = ", ".join(parts) if parts else "0 changes"
print(f"Done. {summary}.")
return errors == 0
def _collect_tree_docs(
self,
nodes: List[Dict],
collection_id: str,
out: List[Dict],
parent_id: Optional[str] = None,
) -> None:
for node in nodes:
doc = {
"id": node["id"],
"title": node.get("title", "Untitled"),
"collection_id": collection_id,
"parent_id": parent_id,
"updatedAt": node.get("updatedAt", ""),
}
out.append(doc)
for child in node.get("children", []):
self._collect_tree_docs([child], collection_id, out, node["id"])
    # ── Push ──────────────────────────────────────────────────────────────────
    def cmd_push(self) -> bool:
        """
        Push local changes (main vs outline) to Outline.
        For each file changed on main relative to outline:
        - Has outline_id → call documents.update
        - No outline_id → call documents.create, write back frontmatter
        Runs entirely inside the outline-sync Docker container.
        Returns True when every changed file was pushed without error.
        """
        print("Checking local changes...")
        if not self.health_check():
            print("✗ Cannot reach Outline API — aborting.")
            return False
        # Diff main vs outline; name-status output is "<STATUS>\t<path>" per line.
        r = self._git("diff", "--name-status", "outline", "main", "--", "*.md")
        if r.returncode != 0:
            print(f"error: git diff failed: {r.stderr.strip()}")
            return False
        changed_files: List[Tuple[str, str]] = []  # (status, path)
        for line in r.stdout.splitlines():
            parts = line.split("\t", 1)
            if len(parts) == 2:
                status, path = parts
                changed_files.append((status.strip(), path.strip()))
        if not changed_files:
            print("Done. 0 changes.")
            return True
        collections = self.get_collections()
        # Collections are matched by sanitized name (= top-level vault folder).
        coll_by_name = {sanitize_name(c["name"]): c["id"] for c in collections}
        updated = 0
        created = 0
        errors = 0
        for status, rel_path in changed_files:
            # NOTE: git status letter (A/M/D) is currently unused; deletions
            # are skipped by the exists() check below instead.
            if rel_path.startswith("_"):
                continue  # skip _sync_log.md etc.
            full_path = self.vault_dir / rel_path
            if not full_path.exists():
                continue
            print(f"processing: {rel_path}")
            try:
                content = full_path.read_text(encoding="utf-8")
            except OSError as exc:
                print(f"error: {rel_path}: {exc}")
                errors += 1
                continue
            fm, body = parse_frontmatter(content)
            doc_id = fm.get("outline_id")
            title = full_path.stem
            if doc_id:
                # Update existing document
                result = self._api("/api/documents.update", {
                    "id": doc_id,
                    "text": body,
                })
                if result and "data" in result:
                    # Write the server's new timestamp back so the next pull
                    # sees this file as already up to date.
                    new_ts = result["data"].get("updatedAt", "")
                    fm["outline_updated_at"] = new_ts
                    full_path.write_text(build_frontmatter(fm) + "\n" + body, encoding="utf-8")
                    print(f"ok: {rel_path} updated")
                    updated += 1
                else:
                    print(f"error: {rel_path} update failed")
                    errors += 1
            else:
                # Create new document
                path_parts = Path(rel_path).parts
                # First path component is the collection folder (when nested).
                coll_name = sanitize_name(path_parts[0]) if len(path_parts) > 1 else ""
                coll_id = coll_by_name.get(coll_name)
                if not coll_id:
                    # Create the collection (root-level files land in "Imported")
                    r_coll = self._api("/api/collections.create", {
                        "name": path_parts[0] if len(path_parts) > 1 else "Imported",
                        "private": False,
                    })
                    if r_coll and "data" in r_coll:
                        coll_id = r_coll["data"]["id"]
                        coll_by_name[coll_name] = coll_id
                        print(f"ok: collection '{path_parts[0]}' created (id: {coll_id})")
                    else:
                        print(f"error: could not create collection for {rel_path}")
                        errors += 1
                        continue
                result = self._api("/api/documents.create", {
                    "title": title,
                    "text": body,
                    "collectionId": coll_id,
                    "publish": True,
                })
                if result and "data" in result:
                    # Adopt the server-assigned id/timestamp into frontmatter.
                    new_id = result["data"]["id"]
                    new_ts = result["data"].get("updatedAt", "")
                    new_coll_id = result["data"].get("collectionId", coll_id)
                    fm = {
                        "outline_id": new_id,
                        "outline_collection_id": new_coll_id,
                        "outline_parent_id": "",  # parent linkage is not reconstructed on push
                        "outline_updated_at": new_ts,
                    }
                    full_path.write_text(build_frontmatter(fm) + "\n" + body, encoding="utf-8")
                    print(f"ok: {rel_path} created (id: {new_id})")
                    created += 1
                else:
                    print(f"error: {rel_path} create failed")
                    errors += 1
        # Commit frontmatter writebacks + advance outline branch
        r_diff = self._git("diff", "--quiet")
        if r_diff.returncode != 0:  # non-zero exit → working tree has changes
            self._git("add", "-A")
            ts = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
            self._git("commit", "-m", f"sync: push to Outline @ {ts}")
        # outline must now contain everything pushed: fast-forward it to main.
        self._git("checkout", "outline")
        self._git("merge", "main", "--ff-only")
        self._git("checkout", "main")
        parts = []
        if updated: parts.append(f"{updated} updated")
        if created: parts.append(f"{created} created")
        if errors: parts.append(f"{errors} errors")
        summary = ", ".join(parts) if parts else "0 changes"
        print(f"Done. {summary}.")
        return errors == 0
# ── Commands ──────────────────────────────────────────────────────────────
def cmd_init(self) -> bool:
"""
Initialize the vault from current Outline state.
Writes all documents as markdown files with YAML frontmatter.
Also writes .gitignore and .gitattributes.
Git initialization (branches, first commit) is done by sync.sh.
"""
print("════════════════════════════════════════════════════════════")
print(" OUTLINE SYNC — init (file export)")
print("════════════════════════════════════════════════════════════")
print()
print(f"Vault: {self.vault_dir}")
print(f"Source: {self.base_url}")
print()
# Guard: refuse if .git already exists
if (self.vault_dir / ".git").exists():
print(f"✗ Vault is already a git repo: {self.vault_dir}")
print(" Remove the directory first or choose a different path.")
return False
self.vault_dir.mkdir(parents=True, exist_ok=True)
if not self.health_check():
print("✗ Cannot reach Outline API — aborting.")
return False
print()
collections = self.get_collections()
if not collections:
print("✗ No collections found in Outline.")
return False
print(f"Exporting {len(collections)} collection(s)...")
for coll in collections:
self.export_collection(coll)
self.write_gitignore()
self.write_gitattributes()
print()
print("════════════════════════════════════════════════════════════")
c = self.stats["collections"]
d = self.stats["documents"]
e = self.stats["errors"]
print(f" {c} collection(s), {d} document(s) exported")
if e:
print(f" {e} error(s) — see warnings above")
print()
print(" Git setup will be completed by sync.sh.")
print("════════════════════════════════════════════════════════════")
return e == 0
# ── Settings + CLI ────────────────────────────────────────────────────────────
def load_settings(path: str) -> Dict:
    """Load the JSON settings file; exit(1) with a logged error on failure."""
    try:
        with open(path) as fh:
            return json.load(fh)
    except FileNotFoundError:
        logger.error("Settings file not found: %s", path)
    except json.JSONDecodeError as exc:
        logger.error("Invalid JSON in %s: %s", path, exc)
    sys.exit(1)
def parse_args() -> argparse.Namespace:
    """Parse CLI arguments for the sync commands (init/pull/push)."""
    p = argparse.ArgumentParser(
        # fix: description/epilog said "Phase 1: init" and listed only the
        # init command, while choices already accept pull and push.
        description="Outline ↔ Obsidian sync",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=(
            "Commands:\n"
            " init Export Outline to vault and write git config files\n"
            " pull Fetch remote changes from Outline into the vault\n"
            " push Push local vault changes (main vs outline) to Outline\n"
        ),
    )
    p.add_argument("command", choices=["init", "pull", "push"], help="Sync command")
    p.add_argument("--vault", required=True, help="Path to vault directory")
    p.add_argument("--settings", default="settings.json", help="Path to settings file")
    p.add_argument("--url", help="Outline API URL (overrides settings.source.url)")
    p.add_argument("--token", help="API token (overrides settings.source.token)")
    p.add_argument(
        "-v", "--verbose",
        action="count",
        default=0,
        help="Increase verbosity (-v for INFO, -vv for DEBUG)",
    )
    return p.parse_args()
def main() -> None:
    """CLI entry point: resolve configuration, then dispatch the command."""
    args = parse_args()
    # -v → INFO, -vv (or more) → DEBUG; default stays at WARNING.
    if args.verbose >= 2:
        logger.setLevel(logging.DEBUG)
    elif args.verbose == 1:
        logger.setLevel(logging.INFO)
    settings = load_settings(args.settings)
    source = settings.get("source", {})
    # CLI flags win over settings.json values.
    url = args.url or source.get("url")
    token = args.token or source.get("token")
    if not url or not token:
        logger.error(
            "Missing API URL or token — set source.url and source.token "
            "in settings.json, or pass --url / --token."
        )
        sys.exit(1)
    sync = OutlineSync(base_url=url, api_token=token, vault_dir=Path(args.vault))
    dispatch = {
        "init": sync.cmd_init,
        "pull": sync.cmd_pull,
        "push": sync.cmd_push,
    }
    handler = dispatch.get(args.command)
    ok = handler() if handler else False
    sys.exit(0 if ok else 1)


if __name__ == "__main__":
    main()