Files
hermes-mcp/scripts/kimi-session-to-obsidian.py
2026-04-29 09:52:53 -04:00

251 lines
8.1 KiB
Python
Executable File

#!/usr/bin/env python3
"""
kimi-session-to-obsidian.py
Parse a Kimi Code session JSONL and convert it to a markdown note
in the Obsidian vault under Kimi Conversations/.
Usage:
python3 kimi-session-to-obsidian.py <session-id> [topic]
Example:
python3 kimi-session-to-obsidian.py 4b234c03673220f26266132c420581d3 hermes-mcp-oauth-fix
The script reads the most recently modified context.jsonl found (searched
recursively) under ~/.kimi/sessions/<session-id>/
and writes to ~/obsidian/vaults/Kimi Conversations/YYYY-MM-DD-<topic>.md
"""
import json
import os
import sys
from datetime import datetime
from pathlib import Path
# Root of Kimi Code's session storage; a session's files live in the
# sub-directory named after its session id.
KIMI_SESSIONS_DIR = Path.home().joinpath(".kimi", "sessions")
# Parent directory of the Obsidian vault folders.
VAULT_DIR = Path.home().joinpath("obsidian", "vaults")
# Folder that receives the generated session notes.
OUTPUT_DIR = VAULT_DIR.joinpath("Kimi Conversations")
def extract_text_content(content) -> str:
    """Flatten Kimi Code message content into a plain string.

    A list is treated as a sequence of parts: dicts whose ``"type"`` is
    ``"text"`` contribute their ``"text"`` value (via ``str()``), bare strings
    are kept verbatim, and every other part -- including ``"think"`` parts --
    is dropped.  The kept parts are joined with single spaces.

    Any non-list value is converted with ``str()``; falsy values become the
    empty string.
    """
    if not isinstance(content, list):
        return str(content) if content else ""

    pieces = []
    for part in content:
        if isinstance(part, str):
            pieces.append(part)
        elif isinstance(part, dict) and part.get("type") == "text":
            pieces.append(str(part.get("text", "")))
        # "think" parts and unrecognised parts carry no text to keep.
    return " ".join(pieces)
def parse_context_jsonl(path: Path) -> list[dict]:
    """Load the message records from a Kimi Code context JSONL file.

    Each non-blank line is parsed as JSON.  Lines that are not valid JSON, or
    that decode to something other than a JSON object, are skipped so that a
    single bad line does not abort the whole archive.  Records that carry a
    ``"content"`` key additionally get a ``"content_plain"`` key holding the
    plain-text rendering produced by ``extract_text_content``.

    Args:
        path: Location of the JSONL file; read as UTF-8.

    Returns:
        The decoded records, in file order.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    messages = []
    with open(path, "r", encoding="utf-8") as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                continue
            # A line such as `42` or `[1, 2]` is valid JSON but not a message
            # record; keeping it would break every later `.get()` lookup.
            if not isinstance(obj, dict):
                continue
            # Normalize content to plain text immediately
            if "content" in obj:
                obj["content_plain"] = extract_text_content(obj["content"])
            messages.append(obj)
    return messages
def _guess_project_hint(text: str) -> str:
    """Return a project-name guess from the first path- or file-like word of *text*."""
    for word in text.split():
        word = word.strip(".,;:!?\"/")
        if word.startswith("{"):
            # Looks like inline JSON rather than a path or file name.
            continue
        if "/" in word:
            return word.split("/")[-1]
        if "." in word and not word.startswith("http"):
            return word.split(".")[0]
    return ""


def _decode_tool_arguments(args) -> dict:
    """Return tool-call arguments as a dict, or ``{}`` when they cannot be decoded."""
    if isinstance(args, str):
        try:
            args = json.loads(args)
        except json.JSONDecodeError:
            return {}
    return args if isinstance(args, dict) else {}


def summarize_session(messages: list[dict]) -> dict:
    """Extract summary metadata from a Kimi Code session.

    User messages are counted only when their ``"content_plain"`` is
    non-empty; assistant messages are always counted.  Tool calls are read
    from the ``"tool_calls"`` list of assistant messages.  ``WriteFile`` and
    ``StrReplaceFile`` calls contribute their ``"path"`` argument to the set
    of modified files, and ``Shell`` calls get a ``"command_preview"`` (first
    200 characters of the command) attached to their internal record.

    Args:
        messages: Message records; each is expected to be a dict with
            ``"role"`` and, optionally, ``"content_plain"`` and
            ``"tool_calls"`` keys.  Non-dict entries are ignored.

    Returns:
        A dict with the keys ``user_message_count``,
        ``assistant_message_count``, ``tool_call_count``,
        ``first_user_message``, ``last_assistant_message``,
        ``files_modified`` (sorted list), ``tool_names_used`` (sorted list)
        and ``project_hint`` (empty string when nothing was detected).
    """
    user_msgs = []
    assistant_msgs = []
    tool_calls = []
    files_modified = set()
    project_hint = ""

    for m in messages:
        if not isinstance(m, dict):
            continue
        role = m.get("role", "")
        content = m.get("content_plain", "")

        if role == "user" and content:
            user_msgs.append(content)
            # Only short messages are scanned; the first one that yields a
            # path- or file-like word decides the hint.
            if not project_hint and len(content) < 200:
                project_hint = _guess_project_hint(content)
        elif role == "assistant":
            assistant_msgs.append(content)
            # `"tool_calls": null` is a legal record, so fall back to an
            # empty list instead of iterating over None.
            for tc in m.get("tool_calls") or []:
                if not isinstance(tc, dict):
                    continue
                fn = tc.get("function") or {}
                # Coerce a null name to "" so the name set stays sortable.
                name = fn.get("name") or ""
                args = fn.get("arguments", "")
                entry = {"name": name, "arguments": args}
                tool_calls.append(entry)

                # Arguments may arrive as a JSON string or already decoded.
                decoded = _decode_tool_arguments(args)
                # Detect filesystem writes
                if name in ("WriteFile", "StrReplaceFile"):
                    target = decoded.get("path", "")
                    if target and isinstance(target, str):
                        files_modified.add(target)
                elif name == "Shell":
                    cmd = decoded.get("command", "")
                    # git commits, docker builds, etc.
                    if cmd and isinstance(cmd, str):
                        entry["command_preview"] = cmd[:200]

    return {
        "user_message_count": len(user_msgs),
        "assistant_message_count": len(assistant_msgs),
        "tool_call_count": len(tool_calls),
        "first_user_message": user_msgs[0] if user_msgs else "",
        "last_assistant_message": assistant_msgs[-1] if assistant_msgs else "",
        "files_modified": sorted(files_modified),
        "tool_names_used": sorted({t["name"] for t in tool_calls}),
        "project_hint": project_hint,
    }
def build_markdown(session_id: str, topic: str, summary: dict, messages: list[dict]) -> str:
    """Render the Obsidian note for one archived session.

    The note consists of YAML front matter, a title derived from *topic*
    (dashes become spaces, then title-cased), the summary counters, the lists
    of modified files and used tools, a capped transcript and the final
    assistant message.  The date is the current local date.

    Args:
        session_id: Session identifier, echoed into the note.
        topic: Dash-separated topic slug used for the title.
        summary: Mapping with the keys produced by ``summarize_session``.
        messages: Message records; ``"role"`` and ``"content_plain"`` are
            read from each.  Non-dict entries are ignored.

    Returns:
        The complete Markdown document as a single string.
    """
    max_entries = 20  # Cap to keep file size reasonable
    date_str = datetime.now().strftime("%Y-%m-%d")
    title = topic.replace("-", " ").title()

    # Build files section
    if summary["files_modified"]:
        files_md = "\n".join(f"- `{f}`" for f in summary["files_modified"])
    else:
        files_md = "- *(none detected)*"

    # Build tools section
    if summary["tool_names_used"]:
        tools_md = "\n".join(f"- `{t}`" for t in summary["tool_names_used"])
    else:
        tools_md = "- *(none detected)*"

    # Build a lightweight transcript of key exchanges
    transcript = []
    truncated = False
    for m in messages:
        if not isinstance(m, dict):
            continue
        role = m.get("role", "")
        content = m.get("content_plain", "")
        if role == "user" and content:
            speaker = "User"
        elif role == "assistant" and content and not content.startswith("[Tool:"):
            speaker = "Kimi"
        else:
            continue
        # Flag truncation only when an entry is actually dropped, so a
        # transcript of exactly `max_entries` entries carries no false notice.
        if len(transcript) >= max_entries:
            truncated = True
            break
        ellipsis = "..." if len(content) > 300 else ""
        transcript.append(f"> **{speaker}:** {content[:300]}{ellipsis}\n")
    if truncated:
        transcript.append("> *(transcript truncated — full session in ~/.kimi/sessions/)*\n")
    transcript_md = "\n".join(transcript) if transcript else "- *(no transcript extracted)*"

    project = summary["project_hint"] or "unknown"
    goal = summary["first_user_message"] or "*(no goal extracted)*"
    outcome = summary["last_assistant_message"] or "*(no final message)*"

    # The blank line before the closing `---` is required: a text line
    # directly followed by `---` is parsed as a setext heading, which would
    # turn the last line of the outcome into an H2.
    md = f"""---
title: {title}
date: {date_str}
agent: Kimi Code
session_id: {session_id}
tags:
- kimi-code
- session-archive
project: {project}
---
# {title}
**Date:** {date_str}
**Agent:** Kimi Code
**Session ID:** `{session_id}`
## Session Goal
{goal}
## Summary
- **User messages:** {summary["user_message_count"]}
- **Assistant messages:** {summary["assistant_message_count"]}
- **Tool calls:** {summary["tool_call_count"]}
## Files Modified
{files_md}
## Tools Used
{tools_md}
## Key Transcript
{transcript_md}
## Final Outcome
{outcome}

---
*Auto-archived from Kimi Code session `{session_id}`*
"""
    return md
def main():
    """Archive one Kimi Code session as a Markdown note.

    Reads the session id (required) and topic (optional, default
    ``"session-archive"``) from ``sys.argv``, picks the most recently
    modified ``context.jsonl`` under the session directory, renders it and
    writes the note into ``OUTPUT_DIR``.  An existing note is never
    overwritten: ``-1``, ``-2``, ... is appended to the file name until an
    unused name is found.

    Exits with status 1 when the arguments are missing, the session directory
    does not exist, or no ``context.jsonl`` is found.
    """
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <session-id> [topic]")
        print(f"Example: {sys.argv[0]} 4b234c03673220f26266132c420581d3 hermes-mcp-fix")
        sys.exit(1)

    session_id = sys.argv[1]
    topic = sys.argv[2] if len(sys.argv) > 2 else "session-archive"

    session_dir = KIMI_SESSIONS_DIR / session_id
    if not session_dir.exists():
        print(f"Error: session directory not found: {session_dir}")
        sys.exit(1)

    # Find context.jsonl files (there may be multiple sub-sessions)
    context_files = list(session_dir.rglob("context.jsonl"))
    if not context_files:
        print(f"Error: no context.jsonl found under {session_dir}")
        sys.exit(1)

    # Use the most recently modified context.jsonl
    context_path = max(context_files, key=lambda p: p.stat().st_mtime)

    messages = parse_context_jsonl(context_path)
    summary = summarize_session(messages)
    markdown = build_markdown(session_id, topic, summary, messages)

    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    date_str = datetime.now().strftime("%Y-%m-%d")
    base_name = f"{date_str}-{topic}"
    output_path = OUTPUT_DIR / f"{base_name}.md"

    # Avoid overwriting.  Mode "x" fails if the file already exists, so the
    # existence check and the creation are a single step.  The numbered name
    # is built by hand because Path.with_suffix() rejects a suffix that does
    # not start with a dot (e.g. "-1.md").
    counter = 1
    while True:
        try:
            with open(output_path, "x", encoding="utf-8") as f:
                f.write(markdown)
            break
        except FileExistsError:
            output_path = OUTPUT_DIR / f"{base_name}-{counter}.md"
            counter += 1

    print(f"Archived to: {output_path}")
    print(f" - User messages: {summary['user_message_count']}")
    print(f" - Tool calls: {summary['tool_call_count']}")
    print(f" - Files modified: {len(summary['files_modified'])}")
# Run the archiver only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()