251 lines
8.1 KiB
Python
Executable File
251 lines
8.1 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
kimi-session-to-obsidian.py
|
|
|
|
Parse a Kimi Code session JSONL and convert it to a markdown note
|
|
in the Obsidian vault under Kimi Conversations/.
|
|
|
|
Usage:
|
|
python3 kimi-session-to-obsidian.py <session-id> [topic]
|
|
|
|
Example:
|
|
python3 kimi-session-to-obsidian.py 4b234c03673220f26266132c420581d3 hermes-mcp-oauth-fix
|
|
|
|
The script reads from ~/.kimi/sessions/<session-id>/*context.jsonl
|
|
and writes to ~/obsidian/vaults/Kimi Conversations/YYYY-MM-DD-<topic>.md
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
KIMI_SESSIONS_DIR = Path.home() / ".kimi" / "sessions"
|
|
VAULT_DIR = Path.home() / "obsidian" / "vaults"
|
|
OUTPUT_DIR = VAULT_DIR / "Kimi Conversations"
|
|
|
|
|
|
def extract_text_content(content) -> str:
|
|
"""Extract human-readable text from Kimi Code's structured content format."""
|
|
if isinstance(content, list):
|
|
texts = []
|
|
for item in content:
|
|
if isinstance(item, dict) and item.get("type") == "text":
|
|
texts.append(str(item.get("text", "")))
|
|
elif isinstance(item, dict) and item.get("type") == "think":
|
|
# Skip think blocks
|
|
continue
|
|
elif isinstance(item, str):
|
|
texts.append(item)
|
|
return " ".join(texts)
|
|
return str(content) if content else ""
|
|
|
|
|
|
def parse_context_jsonl(path: Path) -> list[dict]:
|
|
messages = []
|
|
with open(path, "r", encoding="utf-8") as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
try:
|
|
obj = json.loads(line)
|
|
except json.JSONDecodeError:
|
|
continue
|
|
# Normalize content to plain text immediately
|
|
if "content" in obj:
|
|
obj["content_plain"] = extract_text_content(obj["content"])
|
|
messages.append(obj)
|
|
return messages
|
|
|
|
|
|
def summarize_session(messages: list[dict]) -> dict:
|
|
"""Extract summary metadata from a Kimi Code session."""
|
|
user_msgs = []
|
|
assistant_msgs = []
|
|
tool_calls = []
|
|
files_modified = set()
|
|
project_hint = ""
|
|
|
|
for m in messages:
|
|
role = m.get("role", "")
|
|
content = m.get("content_plain", "")
|
|
|
|
if role == "user" and content:
|
|
user_msgs.append(content)
|
|
# Try to detect project from first user message
|
|
if not project_hint and len(content) < 200:
|
|
words = content.split()
|
|
for w in words:
|
|
w = w.strip(".,;:!?\"/")
|
|
if "/" in w and not w.startswith("{"):
|
|
project_hint = w.split("/")[-1] if "/" in w else w
|
|
break
|
|
elif "." in w and not w.startswith("{") and not w.startswith("http"):
|
|
project_hint = w.split(".")[0]
|
|
break
|
|
|
|
elif role == "assistant":
|
|
assistant_msgs.append(content)
|
|
for tc in m.get("tool_calls", []):
|
|
fn = tc.get("function", {})
|
|
name = fn.get("name", "")
|
|
args = fn.get("arguments", "")
|
|
tool_calls.append({"name": name, "arguments": args})
|
|
# Detect filesystem writes
|
|
if name in ("WriteFile", "StrReplaceFile") and isinstance(args, str):
|
|
try:
|
|
a = json.loads(args)
|
|
p = a.get("path", "")
|
|
if p:
|
|
files_modified.add(p)
|
|
except Exception:
|
|
pass
|
|
elif name == "Shell":
|
|
try:
|
|
a = json.loads(args)
|
|
cmd = a.get("command", "")
|
|
# git commits, docker builds, etc.
|
|
if cmd:
|
|
tool_calls[-1]["command_preview"] = cmd[:200]
|
|
except Exception:
|
|
pass
|
|
|
|
return {
|
|
"user_message_count": len(user_msgs),
|
|
"assistant_message_count": len(assistant_msgs),
|
|
"tool_call_count": len(tool_calls),
|
|
"first_user_message": user_msgs[0] if user_msgs else "",
|
|
"last_assistant_message": assistant_msgs[-1] if assistant_msgs else "",
|
|
"files_modified": sorted(files_modified),
|
|
"tool_names_used": sorted({t["name"] for t in tool_calls}),
|
|
"project_hint": project_hint,
|
|
}
|
|
|
|
|
|
def build_markdown(session_id: str, topic: str, summary: dict, messages: list[dict]) -> str:
|
|
date_str = datetime.now().strftime("%Y-%m-%d")
|
|
|
|
# Build files section
|
|
files_md = "\n".join(f"- `{f}`" for f in summary["files_modified"]) if summary["files_modified"] else "- *(none detected)*"
|
|
|
|
# Build tools section
|
|
tools_md = "\n".join(f"- `{t}`" for t in summary["tool_names_used"]) if summary["tool_names_used"] else "- *(none detected)*"
|
|
|
|
# Build a lightweight transcript of key exchanges
|
|
transcript = []
|
|
for m in messages:
|
|
role = m.get("role", "")
|
|
content = m.get("content_plain", "")
|
|
if role == "user" and content:
|
|
transcript.append(f"> **User:** {content[:300]}{'...' if len(content) > 300 else ''}\n")
|
|
elif role == "assistant" and content and not content.startswith("[Tool:"):
|
|
transcript.append(f"> **Kimi:** {content[:300]}{'...' if len(content) > 300 else ''}\n")
|
|
if len(transcript) >= 20: # Cap to keep file size reasonable
|
|
transcript.append("> *(transcript truncated — full session in ~/.kimi/sessions/)*\n")
|
|
break
|
|
|
|
transcript_md = "\n".join(transcript) if transcript else "- *(no transcript extracted)*"
|
|
|
|
md = f"""---
|
|
title: {topic.replace("-", " ").title()}
|
|
date: {date_str}
|
|
agent: Kimi Code
|
|
session_id: {session_id}
|
|
tags:
|
|
- kimi-code
|
|
- session-archive
|
|
project: {summary["project_hint"] or "unknown"}
|
|
---
|
|
|
|
# {topic.replace("-", " ").title()}
|
|
|
|
**Date:** {date_str}
|
|
**Agent:** Kimi Code
|
|
**Session ID:** `{session_id}`
|
|
|
|
## Session Goal
|
|
|
|
{summary["first_user_message"] or "*(no goal extracted)*"}
|
|
|
|
## Summary
|
|
|
|
- **User messages:** {summary["user_message_count"]}
|
|
- **Assistant messages:** {summary["assistant_message_count"]}
|
|
- **Tool calls:** {summary["tool_call_count"]}
|
|
|
|
## Files Modified
|
|
|
|
{files_md}
|
|
|
|
## Tools Used
|
|
|
|
{tools_md}
|
|
|
|
## Key Transcript
|
|
|
|
{transcript_md}
|
|
|
|
## Final Outcome
|
|
|
|
{summary["last_assistant_message"] or "*(no final message)*"}
|
|
|
|
---
|
|
|
|
*Auto-archived from Kimi Code session `{session_id}`*
|
|
"""
|
|
return md
|
|
|
|
|
|
def main():
|
|
if len(sys.argv) < 2:
|
|
print(f"Usage: {sys.argv[0]} <session-id> [topic]")
|
|
print(f"Example: {sys.argv[0]} 4b234c03673220f26266132c420581d3 hermes-mcp-fix")
|
|
sys.exit(1)
|
|
|
|
session_id = sys.argv[1]
|
|
topic = sys.argv[2] if len(sys.argv) > 2 else "session-archive"
|
|
|
|
session_dir = KIMI_SESSIONS_DIR / session_id
|
|
if not session_dir.exists():
|
|
print(f"Error: session directory not found: {session_dir}")
|
|
sys.exit(1)
|
|
|
|
# Find context.jsonl files (there may be multiple sub-sessions)
|
|
context_files = list(session_dir.rglob("context.jsonl"))
|
|
if not context_files:
|
|
print(f"Error: no context.jsonl found under {session_dir}")
|
|
sys.exit(1)
|
|
|
|
# Use the largest/most recent context.jsonl
|
|
context_files.sort(key=lambda p: p.stat().st_mtime, reverse=True)
|
|
context_path = context_files[0]
|
|
|
|
messages = parse_context_jsonl(context_path)
|
|
summary = summarize_session(messages)
|
|
markdown = build_markdown(session_id, topic, summary, messages)
|
|
|
|
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
|
|
date_str = datetime.now().strftime("%Y-%m-%d")
|
|
output_path = OUTPUT_DIR / f"{date_str}-{topic}.md"
|
|
|
|
# Avoid overwriting
|
|
counter = 1
|
|
original_output_path = output_path
|
|
while output_path.exists():
|
|
output_path = original_output_path.with_suffix(f"-{counter}.md")
|
|
counter += 1
|
|
|
|
with open(output_path, "w", encoding="utf-8") as f:
|
|
f.write(markdown)
|
|
|
|
print(f"Archived to: {output_path}")
|
|
print(f" - User messages: {summary['user_message_count']}")
|
|
print(f" - Tool calls: {summary['tool_call_count']}")
|
|
print(f" - Files modified: {len(summary['files_modified'])}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|