#!/usr/bin/env python3
"""
kimi-session-to-obsidian.py

Parse a Kimi Code session JSONL and convert it to a markdown note
in the Obsidian vault under Kimi Conversations/.

Usage:
    python3 kimi-session-to-obsidian.py <session-id> [topic]

Example:
    python3 kimi-session-to-obsidian.py 4b234c03673220f26266132c420581d3 hermes-mcp-oauth-fix

The script reads context.jsonl from ~/.kimi/sessions/<session-id>/
(searched recursively) and writes to
~/obsidian/vaults/Kimi Conversations/YYYY-MM-DD-<topic>.md
"""

import json
import os
import sys
from datetime import datetime
from pathlib import Path

KIMI_SESSIONS_DIR = Path.home() / ".kimi" / "sessions"
VAULT_DIR = Path.home() / "obsidian" / "vaults"
OUTPUT_DIR = VAULT_DIR / "Kimi Conversations"

# Transcript limits used by build_markdown().
_MAX_TRANSCRIPT_ENTRIES = 20
_MAX_ENTRY_CHARS = 300


def extract_text_content(content) -> str:
    """Extract human-readable text from Kimi Code's structured content format.

    Args:
        content: Either a list of parts (plain strings or dicts with a
            ``type`` key) or any other value.

    Returns:
        For a list: the ``text`` of every ``{"type": "text"}`` part and every
        plain-string part, joined with single spaces. Parts of any other type
        (including ``think`` blocks) are skipped. For a non-list: ``str(content)``
        when truthy, otherwise the empty string.
    """
    if not isinstance(content, list):
        return str(content) if content else ""

    texts = []
    for item in content:
        if isinstance(item, str):
            texts.append(item)
        elif isinstance(item, dict) and item.get("type") == "text":
            texts.append(str(item.get("text", "")))
        # Everything else (e.g. "think" blocks) is intentionally dropped.
    return " ".join(texts)


def parse_context_jsonl(path: Path) -> list[dict]:
    """Read a context.jsonl file into a list of message dicts.

    Blank lines, lines that are not valid JSON, and JSON values that are not
    objects are skipped. Every message that has a ``content`` key additionally
    gets a ``content_plain`` key holding its plain-text rendering.

    Args:
        path: Path of the JSONL file to read (decoded as UTF-8).

    Returns:
        The parsed messages in file order.
    """
    messages = []
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                continue
            # A bare string/number/list is valid JSON but not a message; the
            # rest of the pipeline relies on dict access.
            if not isinstance(obj, dict):
                continue
            # Normalize content to plain text immediately
            if "content" in obj:
                obj["content_plain"] = extract_text_content(obj["content"])
            messages.append(obj)
    return messages


def _parse_tool_arguments(args) -> dict:
    """Return tool-call arguments as a dict, or ``{}`` if they cannot be decoded."""
    if isinstance(args, dict):
        return args
    if not isinstance(args, str):
        return {}
    try:
        parsed = json.loads(args)
    except json.JSONDecodeError:
        return {}
    return parsed if isinstance(parsed, dict) else {}


def _guess_project_hint(text: str) -> str:
    """Return a project-name guess from the first path-like or dotted word in *text*."""
    for word in text.split():
        word = word.strip(".,;:!?\"/")
        # Skip JSON-ish fragments and URLs; neither names a project.
        if word.startswith("{") or "://" in word:
            continue
        if "/" in word:
            return word.split("/")[-1]
        if "." in word and not word.startswith("http"):
            return word.split(".")[0]
    return ""


def summarize_session(messages: list[dict]) -> dict:
    """Extract summary metadata from a Kimi Code session.

    Args:
        messages: Message dicts as produced by ``parse_context_jsonl``; the
            text of each message is read from its ``content_plain`` key.

    Returns:
        A dict with the keys ``user_message_count``, ``assistant_message_count``,
        ``tool_call_count``, ``first_user_message``, ``last_assistant_message``
        (the last assistant message that has text), ``files_modified`` (sorted
        paths passed to ``WriteFile``/``StrReplaceFile``), ``tool_names_used``
        (sorted, de-duplicated) and ``project_hint``.
    """
    user_msgs = []
    assistant_msgs = []
    tool_calls = []
    files_modified = set()
    project_hint = ""

    for m in messages:
        if not isinstance(m, dict):
            continue
        role = m.get("role", "")
        content = m.get("content_plain", "")

        if role == "user" and content:
            user_msgs.append(content)
            # Only short messages are scanned for a project hint.
            if not project_hint and len(content) < 200:
                project_hint = _guess_project_hint(content)

        elif role == "assistant":
            assistant_msgs.append(content)
            # "tool_calls" may be present but null, hence `or []`.
            for tc in m.get("tool_calls") or []:
                if not isinstance(tc, dict):
                    continue
                fn = tc.get("function") or {}
                if not isinstance(fn, dict):
                    continue
                name = fn.get("name") or ""
                args = fn.get("arguments", "")
                call = {"name": name, "arguments": args}
                tool_calls.append(call)

                parsed = _parse_tool_arguments(args)
                # Detect filesystem writes
                if name in ("WriteFile", "StrReplaceFile"):
                    target = parsed.get("path", "")
                    if isinstance(target, str) and target:
                        files_modified.add(target)
                elif name == "Shell":
                    cmd = parsed.get("command", "")
                    # git commits, docker builds, etc.
                    if isinstance(cmd, str) and cmd:
                        call["command_preview"] = cmd[:200]

    # A session often ends on a tool-call-only turn with no text; report the
    # last assistant message that actually says something.
    last_assistant = next((c for c in reversed(assistant_msgs) if c), "")

    return {
        "user_message_count": len(user_msgs),
        "assistant_message_count": len(assistant_msgs),
        "tool_call_count": len(tool_calls),
        "first_user_message": user_msgs[0] if user_msgs else "",
        "last_assistant_message": last_assistant,
        "files_modified": sorted(files_modified),
        "tool_names_used": sorted({t["name"] for t in tool_calls}),
        "project_hint": project_hint,
    }


def _bullet_list(items) -> str:
    """Render *items* as a markdown bullet list of inline-code entries."""
    if not items:
        return "- *(none detected)*"
    return "\n".join(f"- `{item}`" for item in items)


def _truncate(text: str, limit: int = _MAX_ENTRY_CHARS) -> str:
    """Return *text* cut to *limit* characters, with ``...`` appended if it was cut."""
    return f"{text[:limit]}{'...' if len(text) > limit else ''}"


def build_markdown(session_id: str, topic: str, summary: dict, messages: list[dict]) -> str:
    """Render the archive note for a session as markdown with YAML frontmatter.

    Args:
        session_id: Identifier of the session, echoed into the note.
        topic: Dash-separated topic slug; title-cased for the note title.
        summary: Result of ``summarize_session`` for the same messages.
        messages: Message dicts; ``content_plain`` is used for the transcript.

    Returns:
        The complete note text. The transcript holds at most 20 user/assistant
        entries, each cut to 300 characters; a truncation notice is added only
        when further entries were actually omitted.
    """
    date_str = datetime.now().strftime("%Y-%m-%d")
    title = topic.replace("-", " ").title()

    files_md = _bullet_list(summary["files_modified"])
    tools_md = _bullet_list(summary["tool_names_used"])

    # Build a lightweight transcript of key exchanges
    transcript = []
    for m in messages:
        role = m.get("role", "")
        content = m.get("content_plain", "")
        if role == "user" and content:
            entry = f"> **User:** {_truncate(content)}\n"
        elif role == "assistant" and content and not content.startswith("[Tool:"):
            entry = f"> **Kimi:** {_truncate(content)}\n"
        else:
            continue
        if len(transcript) >= _MAX_TRANSCRIPT_ENTRIES:
            # Cap to keep file size reasonable
            transcript.append("> *(transcript truncated — full session in ~/.kimi/sessions/)*\n")
            break
        transcript.append(entry)

    transcript_md = "\n".join(transcript) if transcript else "- *(no transcript extracted)*"

    project = summary["project_hint"] or "unknown"
    goal = summary["first_user_message"] or "*(no goal extracted)*"
    outcome = summary["last_assistant_message"] or "*(no final message)*"

    md = f"""---
title: {title}
date: {date_str}
agent: Kimi Code
session_id: {session_id}
tags:
  - kimi-code
  - session-archive
project: {project}
---

# {title}

**Date:** {date_str}
**Agent:** Kimi Code
**Session ID:** `{session_id}`

## Session Goal

{goal}

## Summary

- **User messages:** {summary["user_message_count"]}
- **Assistant messages:** {summary["assistant_message_count"]}
- **Tool calls:** {summary["tool_call_count"]}

## Files Modified

{files_md}

## Tools Used

{tools_md}

## Key Transcript

{transcript_md}

## Final Outcome

{outcome}

---
*Auto-archived from Kimi Code session `{session_id}`*
"""
    return md


def _unique_output_path(path: Path) -> Path:
    """Return *path*, or a ``<stem>-N<suffix>`` sibling if *path* already exists."""
    candidate = path
    counter = 1
    while candidate.exists():
        # with_suffix() rejects suffixes that do not start with ".", so the
        # numbered name is built with with_name() instead.
        candidate = path.with_name(f"{path.stem}-{counter}{path.suffix}")
        counter += 1
    return candidate


def main():
    """Command-line entry point: archive one Kimi Code session as a markdown note.

    Reads the session id (required) and topic (optional, default
    ``session-archive``) from ``sys.argv``, converts the most recently modified
    ``context.jsonl`` under the session directory and writes the note into
    ``OUTPUT_DIR`` without overwriting an existing file. Exits with status 1
    on a usage error or when no session data is found.
    """
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <session-id> [topic]", file=sys.stderr)
        print(
            f"Example: {sys.argv[0]} 4b234c03673220f26266132c420581d3 hermes-mcp-fix",
            file=sys.stderr,
        )
        sys.exit(1)

    session_id = sys.argv[1]
    topic = sys.argv[2] if len(sys.argv) > 2 else "session-archive"

    session_dir = KIMI_SESSIONS_DIR / session_id
    if not session_dir.is_dir():
        print(f"Error: session directory not found: {session_dir}", file=sys.stderr)
        sys.exit(1)

    # Find context.jsonl files (there may be multiple sub-sessions)
    context_files = list(session_dir.rglob("context.jsonl"))
    if not context_files:
        print(f"Error: no context.jsonl found under {session_dir}", file=sys.stderr)
        sys.exit(1)

    # Use the most recently modified context.jsonl
    context_path = max(context_files, key=lambda p: p.stat().st_mtime)

    messages = parse_context_jsonl(context_path)
    summary = summarize_session(messages)
    markdown = build_markdown(session_id, topic, summary, messages)

    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    date_str = datetime.now().strftime("%Y-%m-%d")
    # Avoid overwriting an earlier archive with the same date and topic
    output_path = _unique_output_path(OUTPUT_DIR / f"{date_str}-{topic}.md")

    with open(output_path, "w", encoding="utf-8") as f:
        f.write(markdown)

    print(f"Archived to: {output_path}")
    print(f" - User messages: {summary['user_message_count']}")
    print(f" - Tool calls: {summary['tool_call_count']}")
    print(f" - Files modified: {len(summary['files_modified'])}")


if __name__ == "__main__":
    main()