Not sure which LLM to use? Want to compare clustering algorithms? This guide shows how to benchmark different configurations to find what works best for your use case.
Overview
Each comparison below follows the same pattern: run one pipeline step, time it, and record a few simple metrics (a minimal sketch of this pattern follows the list). You can compare:
- LLM Models: GPT-4 vs GPT-4o-mini vs Claude vs Gemini
- Clustering Methods: K-means vs MiniBatch K-means vs HDBSCAN
- Checkpoint Formats: JSONL vs Parquet vs HuggingFace Datasets
- Concurrency Levels: Impact of parallel requests
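All four benchmarks share the same timing skeleton. Here is a minimal sketch of that pattern (benchmark_step is a hypothetical helper, and run_step stands in for any awaitable Kura pipeline call such as summarise_conversations):

```python
import time


async def benchmark_step(name: str, run_step) -> dict:
    """Time one awaitable pipeline step and report simple throughput metrics."""
    start = time.time()
    results = await run_step()  # e.g. lambda: summarise_conversations(...)
    elapsed = time.time() - start
    return {
        "name": name,
        "time": elapsed,
        "items": len(results),
        "throughput": len(results) / elapsed,
    }
```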
Comparing LLM Models
Different models have different strengths:

```python compare_llms.py
import asyncio
import time

from rich.console import Console
from rich.table import Table

from kura.types import Conversation
from kura.summarisation import SummaryModel, summarise_conversations
from kura.checkpoints import JSONLCheckpointManager


async def benchmark_model(model_name: str, conversations: list[Conversation]) -> dict:
    """Benchmark a single model."""
    summary_model = SummaryModel(
        model=model_name,
        console=None,  # Disable progress for cleaner output
    )

    # Use separate checkpoint dir per model
    checkpoint_manager = JSONLCheckpointManager(
        f"./checkpoints/{model_name.replace('/', '_')}",
        enabled=True,
    )

    start_time = time.time()
    summaries = await summarise_conversations(
        conversations,
        model=summary_model,
        checkpoint_manager=checkpoint_manager,
    )
    elapsed = time.time() - start_time

    # Analyze quality metrics
    avg_frustration = sum(s.user_frustration or 0 for s in summaries) / len(summaries)
    avg_concerning = sum(s.concerning_score or 0 for s in summaries) / len(summaries)

    # Count conversations with errors
    convs_with_errors = sum(
        1 for s in summaries if s.assistant_errors and len(s.assistant_errors) > 0
    )

    return {
        "model": model_name,
        "time": elapsed,
        "throughput": len(conversations) / elapsed,
        "avg_frustration": avg_frustration,
        "avg_concerning": avg_concerning,
        "convs_with_errors": convs_with_errors,
        "summaries": summaries,
    }


async def main():
    console = Console()

    # Load test dataset
    console.print("[bold]Loading conversations...[/bold]")
    conversations = Conversation.from_hf_dataset(
        "ivanleomk/synthetic-gemini-conversations",
        split="train",
    )
    console.print(f"✓ Loaded {len(conversations)} conversations\n")

    # Models to compare
    models = [
        "openai/gpt-4o-mini",  # Fast, cheap
        "openai/gpt-4o",  # Balanced
        "anthropic/claude-3-haiku",  # Fast Claude
        "gemini/gemini-2.0-flash",  # Google's model
    ]

    console.print("[bold blue]Benchmarking models...[/bold blue]\n")
    results = []
    for model_name in models:
        console.print(f"Testing {model_name}...")
        try:
            result = await benchmark_model(model_name, conversations)
            results.append(result)
            console.print(f"✓ {model_name}: {result['time']:.2f}s\n")
        except Exception as e:
            console.print(f"✗ {model_name}: {e}\n")

    # Create comparison table
    table = Table(title="LLM Model Comparison")
    table.add_column("Model", style="bold")
    table.add_column("Time", justify="right")
    table.add_column("Throughput", justify="right")
    table.add_column("Avg Frustration", justify="right")
    table.add_column("Avg Concerning", justify="right")
    table.add_column("Errors Detected", justify="right")

    for result in results:
        table.add_row(
            result["model"],
            f"{result['time']:.2f}s",
            f"{result['throughput']:.1f} conv/s",
            f"{result['avg_frustration']:.1f}/5",
            f"{result['avg_concerning']:.1f}/5",
            f"{result['convs_with_errors']}/{len(conversations)}",
        )

    console.print("\n")
    console.print(table)

    # Find best model for each metric
    fastest = min(results, key=lambda x: x["time"])
    most_detailed = max(results, key=lambda x: x["convs_with_errors"])

    console.print("\n[bold green]Recommendations:[/bold green]")
    console.print(f"  Fastest: {fastest['model']} ({fastest['time']:.2f}s)")
    console.print(
        f"  Most detailed: {most_detailed['model']} ({most_detailed['convs_with_errors']} errors detected)"
    )


if __name__ == "__main__":
    asyncio.run(main())
```
Expected Output
```
                                         LLM Model Comparison
┏━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━┓
┃ Model                    ┃   Time ┃  Throughput ┃ Avg Frustration ┃ Avg Concerning ┃ Errors Detected ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━┩
│ openai/gpt-4o-mini       │ 18.34s │ 10.4 conv/s │           2.1/5 │          1.8/5 │          45/190 │
│ openai/gpt-4o            │ 25.67s │  7.4 conv/s │           2.3/5 │          1.9/5 │          58/190 │
│ anthropic/claude-3-haiku │ 21.45s │  8.9 conv/s │           2.2/5 │          1.9/5 │          52/190 │
│ gemini/gemini-2.0-flash  │ 16.89s │ 11.3 conv/s │           2.0/5 │          1.7/5 │          43/190 │
└──────────────────────────┴────────┴─────────────┴─────────────────┴────────────────┴─────────────────┘

Recommendations:
  Fastest: gemini/gemini-2.0-flash (16.89s)
  Most detailed: openai/gpt-4o (58 errors detected)
```
The results break down along three axes: speed, quality, and cost.
Fastest Models:
- Gemini 2.0 Flash: 11.3 conv/s
- GPT-4o-mini: 10.4 conv/s
- Claude 3 Haiku: 8.9 conv/s
Most Detailed Models:
- GPT-4o: Detects roughly 30% more errors than GPT-4o-mini on this dataset
- Claude 3 Haiku: Balanced quality
- GPT-4o-mini: Good but less nuanced
Most Cost-Effective (approximate input price per 1M tokens; see the sketch below for turning these rates into a per-run estimate):
- Gemini 2.0 Flash: $0.075
- GPT-4o-mini: $0.15
- Claude 3 Haiku: $0.25
- GPT-4o: $5.00
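To turn these per-token rates into a rough per-run figure, fold them into your benchmark results. A small sketch (the price table and token counts below are illustrative assumptions; check your provider's current pricing and your actual logged usage):

```python
# Rough input-side cost estimate per benchmark run.
# Prices are illustrative per-1M-input-token rates; verify against current pricing.
PRICE_PER_1M_INPUT = {
    "gemini/gemini-2.0-flash": 0.075,
    "openai/gpt-4o-mini": 0.15,
    "anthropic/claude-3-haiku": 0.25,
    "openai/gpt-4o": 5.00,
}


def estimate_cost(model: str, input_tokens: int) -> float:
    """Approximate input-side cost in USD for one run."""
    return input_tokens / 1_000_000 * PRICE_PER_1M_INPUT[model]


# e.g. 190 conversations at roughly 1,500 input tokens each
print(f"${estimate_cost('openai/gpt-4o-mini', 190 * 1_500):.4f}")
```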
Comparing Clustering Methods
Clustering methods trade speed against how clusters are formed and sized. The script below benchmarks them on the same set of summaries:

```python compare_clustering.py
import asyncio
import time

from rich.console import Console
from rich.table import Table

from kura.types import Conversation
from kura.summarisation import SummaryModel, summarise_conversations
from kura.k_means import MiniBatchKmeansClusteringMethod
from kura.hdbscan import HDBSCANClusteringMethod
from kura.cluster import (
    ClusterDescriptionModel,
    generate_base_clusters_from_conversation_summaries,
)
from kura.checkpoints import JSONLCheckpointManager


async def benchmark_clustering(method_name: str, clustering_method, summaries: list) -> dict:
    """Benchmark a clustering method."""
    cluster_model = ClusterDescriptionModel(
        clustering_method=clustering_method,
        console=None,
    )

    checkpoint_manager = JSONLCheckpointManager(
        f"./checkpoints/clustering_{method_name}",
        enabled=True,
    )

    start_time = time.time()
    clusters = await generate_base_clusters_from_conversation_summaries(
        summaries,
        model=cluster_model,
        checkpoint_manager=checkpoint_manager,
    )
    elapsed = time.time() - start_time

    # Calculate cluster quality metrics
    avg_cluster_size = len(summaries) / len(clusters) if clusters else 0
    largest_cluster = max(len(c.conversation_ids) for c in clusters) if clusters else 0
    smallest_cluster = min(len(c.conversation_ids) for c in clusters) if clusters else 0

    return {
        "method": method_name,
        "time": elapsed,
        "num_clusters": len(clusters),
        "avg_cluster_size": avg_cluster_size,
        "largest_cluster": largest_cluster,
        "smallest_cluster": smallest_cluster,
    }


async def main():
    console = Console()

    # Load and summarize conversations (shared across all clustering methods)
    console.print("[bold]Loading conversations...[/bold]")
    conversations = Conversation.from_hf_dataset(
        "ivanleomk/synthetic-gemini-conversations",
        split="train",
    )

    console.print("[bold]Generating summaries...[/bold]")
    summary_model = SummaryModel(console=None)
    checkpoint_manager = JSONLCheckpointManager("./checkpoints/shared", enabled=True)
    summaries = await summarise_conversations(
        conversations,
        model=summary_model,
        checkpoint_manager=checkpoint_manager,
    )
    console.print(f"✓ Generated {len(summaries)} summaries\n")

    # Clustering methods to compare
    methods = [
        (
            "MiniBatch K-means (small batches)",
            MiniBatchKmeansClusteringMethod(
                clusters_per_group=10,
                batch_size=100,
                max_iter=50,
            ),
        ),
        (
            "MiniBatch K-means (large batches)",
            MiniBatchKmeansClusteringMethod(
                clusters_per_group=10,
                batch_size=1000,
                max_iter=100,
            ),
        ),
        (
            "HDBSCAN",
            HDBSCANClusteringMethod(
                min_cluster_size=5,
                min_samples=3,
            ),
        ),
    ]

    console.print("[bold blue]Benchmarking clustering methods...[/bold blue]\n")
    results = []
    for method_name, clustering_method in methods:
        console.print(f"Testing {method_name}...")
        try:
            result = await benchmark_clustering(method_name, clustering_method, summaries)
            results.append(result)
            console.print(f"✓ {method_name}: {result['num_clusters']} clusters in {result['time']:.2f}s\n")
        except Exception as e:
            console.print(f"✗ {method_name}: {e}\n")

    # Create comparison table
    table = Table(title="Clustering Method Comparison")
    table.add_column("Method", style="bold")
    table.add_column("Time", justify="right")
    table.add_column("Clusters", justify="right")
    table.add_column("Avg Size", justify="right")
    table.add_column("Min Size", justify="right")
    table.add_column("Max Size", justify="right")

    for result in results:
        table.add_row(
            result["method"],
            f"{result['time']:.2f}s",
            str(result["num_clusters"]),
            f"{result['avg_cluster_size']:.1f}",
            str(result["smallest_cluster"]),
            str(result["largest_cluster"]),
        )

    console.print("\n")
    console.print(table)


if __name__ == "__main__":
    asyncio.run(main())
```
Expected Output
```
                    Clustering Method Comparison
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━┓
┃ Method                    ┃  Time ┃ Clusters ┃ Avg Size ┃ Min Size ┃ Max Size ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━┩
│ MiniBatch K-means (small) │ 2.34s │       19 │     10.0 │        8 │       12 │
│ MiniBatch K-means (large) │ 1.87s │       19 │     10.0 │        8 │       12 │
│ HDBSCAN                   │ 3.45s │       24 │      7.9 │        5 │       15 │
└───────────────────────────┴───────┴──────────┴──────────┴──────────┴──────────┘
```
| Method | Speed | Cluster Balance | Outlier Handling | Best For |
|---|---|---|---|---|
| MiniBatch K-means | ⚡⚡⚡ | ✅ Even sizes | ❌ Forces assignment | Large datasets, even distribution |
| K-means | ⚡⚡ | ✅ Even sizes | ❌ Forces assignment | Smaller datasets, predictable |
| HDBSCAN | ⚡ | ⚠️ Variable | ✅ Creates noise cluster | Exploratory, natural groupings |
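Based on the trade-offs above, one way to choose a method programmatically is a simple heuristic over dataset size and intent. A sketch using the same constructors as compare_clustering.py (the pick_clustering_method helper and its thresholds are assumptions to tune for your data):

```python
from kura.k_means import MiniBatchKmeansClusteringMethod
from kura.hdbscan import HDBSCANClusteringMethod


def pick_clustering_method(num_summaries: int, exploratory: bool = False):
    """Heuristic chooser: HDBSCAN for exploration, MiniBatch K-means otherwise."""
    if exploratory:
        # Let density decide the cluster count and allow a noise cluster
        return HDBSCANClusteringMethod(min_cluster_size=5, min_samples=3)
    # Predictable, evenly sized clusters; larger batches pay off on larger datasets
    batch_size = 1000 if num_summaries > 5_000 else 100
    return MiniBatchKmeansClusteringMethod(clusters_per_group=10, batch_size=batch_size)
```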
Comparing Checkpoint Formats
Checkpoint formats trade save/load speed against on-disk size:

```python compare_checkpoints.py
import asyncio
import time
from pathlib import Path

from rich.console import Console
from rich.table import Table

from kura.types import Conversation
from kura.summarisation import SummaryModel, summarise_conversations
from kura.checkpoints import (
    JSONLCheckpointManager,
    ParquetCheckpointManager,
    HFDatasetCheckpointManager,
)


def get_directory_size(directory: Path) -> int:
    """Get total size of directory in bytes."""
    return sum(f.stat().st_size for f in directory.rglob("*") if f.is_file())


def format_size(size_bytes: int) -> str:
    """Format bytes as human-readable string."""
    if size_bytes == 0:
        return "0 B"
    units = ["B", "KB", "MB", "GB"]
    unit_index = 0
    size = float(size_bytes)
    while size >= 1024 and unit_index < len(units) - 1:
        size /= 1024
        unit_index += 1
    return f"{size:.2f} {units[unit_index]}"


async def benchmark_checkpoint_format(manager_type: str, manager, conversations: list) -> dict:
    """Benchmark a checkpoint format."""
    summary_model = SummaryModel(console=None)

    # Save time
    start_time = time.time()
    summaries = await summarise_conversations(
        conversations,
        model=summary_model,
        checkpoint_manager=manager,
    )
    save_time = time.time() - start_time

    # Get file size
    checkpoint_dir = Path(manager.checkpoint_dir)
    size_bytes = get_directory_size(checkpoint_dir)

    # Load time
    from kura.types import ConversationSummary

    start_time = time.time()
    loaded = manager.load_checkpoint("summaries", ConversationSummary)
    load_time = time.time() - start_time

    return {
        "format": manager_type,
        "save_time": save_time,
        "load_time": load_time,
        "size_bytes": size_bytes,
        "size_formatted": format_size(size_bytes),
    }


async def main():
    console = Console()

    # Load conversations
    console.print("[bold]Loading conversations...[/bold]")
    conversations = Conversation.from_hf_dataset(
        "ivanleomk/synthetic-gemini-conversations",
        split="train",
    )
    console.print(f"✓ Loaded {len(conversations)} conversations\n")

    # Checkpoint formats to compare
    managers = [
        ("JSONL", JSONLCheckpointManager("./checkpoints/jsonl", enabled=True)),
        ("Parquet", ParquetCheckpointManager("./checkpoints/parquet", enabled=True)),
        ("HF Datasets", HFDatasetCheckpointManager("./checkpoints/hf", enabled=True)),
    ]

    console.print("[bold blue]Benchmarking checkpoint formats...[/bold blue]\n")
    results = []
    for manager_type, manager in managers:
        console.print(f"Testing {manager_type}...")
        try:
            result = await benchmark_checkpoint_format(manager_type, manager, conversations)
            results.append(result)
            console.print(
                f"✓ {manager_type}: {result['size_formatted']} in {result['save_time']:.2f}s\n"
            )
        except Exception as e:
            console.print(f"✗ {manager_type}: {e}\n")

    # Create comparison table
    table = Table(title="Checkpoint Format Comparison")
    table.add_column("Format", style="bold")
    table.add_column("Save Time", justify="right")
    table.add_column("Load Time", justify="right")
    table.add_column("Size", justify="right")
    table.add_column("Savings", justify="right")

    baseline_size = results[0]["size_bytes"]  # JSONL baseline
    for result in results:
        savings = (baseline_size - result["size_bytes"]) / baseline_size * 100
        table.add_row(
            result["format"],
            f"{result['save_time']:.2f}s",
            f"{result['load_time']:.2f}s",
            result["size_formatted"],
            f"{savings:.1f}%" if savings > 0 else "-",
        )

    console.print("\n")
    console.print(table)

    # Recommendations
    fastest_save = min(results, key=lambda x: x["save_time"])
    fastest_load = min(results, key=lambda x: x["load_time"])
    smallest = min(results, key=lambda x: x["size_bytes"])

    console.print("\n[bold green]Recommendations:[/bold green]")
    console.print(f"  Fastest save: {fastest_save['format']}")
    console.print(f"  Fastest load: {fastest_load['format']}")
    console.print(f"  Smallest size: {smallest['format']} ({smallest['size_formatted']})")


if __name__ == "__main__":
    asyncio.run(main())
```
Expected Output
```
             Checkpoint Format Comparison
┏━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━┓
┃ Format      ┃ Save Time ┃ Load Time ┃    Size ┃ Savings ┃
┡━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━┩
│ JSONL       │    18.45s │     0.12s │ 4.20 MB │       - │
│ Parquet     │    18.67s │     0.08s │ 2.10 MB │   50.0% │
│ HF Datasets │    19.23s │     0.15s │ 2.35 MB │   44.0% │
└─────────────┴───────────┴───────────┴─────────┴─────────┘

Recommendations:
  Fastest save: JSONL
  Fastest load: Parquet
  Smallest size: Parquet (2.10 MB)
```
Comparing Concurrency Levels
Throughput generally improves with higher concurrency until you approach provider rate limits, so it is worth measuring where the gains level off:

```python compare_concurrency.py
import asyncio
import time

from rich.console import Console
from rich.table import Table

from kura.types import Conversation
from kura.summarisation import SummaryModel, summarise_conversations


async def benchmark_concurrency(max_concurrent: int, conversations: list) -> dict:
    """Benchmark a concurrency level."""
    summary_model = SummaryModel(
        max_concurrent_requests=max_concurrent,
        console=None,
    )

    start_time = time.time()
    summaries = await summarise_conversations(
        conversations,
        model=summary_model,
    )
    elapsed = time.time() - start_time
    throughput = len(conversations) / elapsed

    return {
        "concurrency": max_concurrent,
        "time": elapsed,
        "throughput": throughput,
    }


async def main():
    console = Console()

    # Load conversations
    conversations = Conversation.from_hf_dataset(
        "ivanleomk/synthetic-gemini-conversations",
        split="train",
    )

    # Concurrency levels to test
    concurrency_levels = [10, 25, 50, 100, 200]

    console.print("[bold blue]Benchmarking concurrency levels...[/bold blue]\n")
    results = []
    for level in concurrency_levels:
        console.print(f"Testing concurrency={level}...")
        result = await benchmark_concurrency(level, conversations)
        results.append(result)
        console.print(
            f"✓ Concurrency {level}: {result['throughput']:.1f} conv/s\n"
        )

    # Create table
    table = Table(title="Concurrency Comparison")
    table.add_column("Concurrency", justify="right")
    table.add_column("Time", justify="right")
    table.add_column("Throughput", justify="right")
    table.add_column("Speedup", justify="right")

    baseline_time = results[0]["time"]
    for result in results:
        speedup = baseline_time / result["time"]
        table.add_row(
            str(result["concurrency"]),
            f"{result['time']:.2f}s",
            f"{result['throughput']:.1f} conv/s",
            f"{speedup:.2f}x",
        )

    console.print("\n")
    console.print(table)


if __name__ == "__main__":
    asyncio.run(main())
```
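The sweet spot depends heavily on your provider's rate limits, so rather than reading a number off a table, pick the smallest concurrency level that gets close to peak throughput in your own run. A small sketch over the results list produced by the script above (the pick_concurrency helper and the 5% tolerance are illustrative assumptions):

```python
def pick_concurrency(results: list[dict], tolerance: float = 0.05) -> int:
    """Pick the smallest concurrency level within `tolerance` of peak throughput."""
    peak = max(r["throughput"] for r in results)
    for r in sorted(results, key=lambda r: r["concurrency"]):
        if r["throughput"] >= peak * (1 - tolerance):
            return r["concurrency"]
    return results[-1]["concurrency"]


# Example with made-up numbers:
print(pick_concurrency([
    {"concurrency": 10, "throughput": 4.1},
    {"concurrency": 50, "throughput": 9.2},
    {"concurrency": 100, "throughput": 10.2},
    {"concurrency": 200, "throughput": 10.3},
]))  # -> 100
```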
Summary: Choosing the Right Configuration
Speed Priority
- Model: Gemini 2.0 Flash
- Clustering: MiniBatch K-means (large batches)
- Checkpoints: Parquet
- Concurrency: 100-200
Quality Priority
- Model: GPT-4o or Claude 3 Opus
- Clustering: HDBSCAN
- Checkpoints: HF Datasets (for versioning)
- Concurrency: 25-50 (to avoid rate limits)
Cost Priority
- Model: Gemini 2.0 Flash or GPT-4o-mini
- Clustering: MiniBatch K-means
- Checkpoints: Parquet (smaller storage)
- Concurrency: 50-100
Balanced
- Model: GPT-4o-mini or Claude 3 Haiku
- Clustering: MiniBatch K-means
- Checkpoints: Multi (all formats)
- Concurrency: 50-100
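As a concrete starting point, here is a minimal sketch of the Balanced configuration wired together with the same APIs used in the benchmarks above (the model name, concurrency, batch size, and checkpoint path are assumptions to adapt):

```python
import asyncio

from kura.types import Conversation
from kura.summarisation import SummaryModel, summarise_conversations
from kura.k_means import MiniBatchKmeansClusteringMethod
from kura.cluster import (
    ClusterDescriptionModel,
    generate_base_clusters_from_conversation_summaries,
)
from kura.checkpoints import JSONLCheckpointManager


async def main():
    conversations = Conversation.from_hf_dataset(
        "ivanleomk/synthetic-gemini-conversations", split="train"
    )
    checkpoints = JSONLCheckpointManager("./checkpoints/balanced", enabled=True)

    # Balanced model choice with moderate concurrency
    summary_model = SummaryModel(model="openai/gpt-4o-mini", max_concurrent_requests=50)
    summaries = await summarise_conversations(
        conversations, model=summary_model, checkpoint_manager=checkpoints
    )

    # MiniBatch K-means keeps cluster sizes predictable
    cluster_model = ClusterDescriptionModel(
        clustering_method=MiniBatchKmeansClusteringMethod(clusters_per_group=10, batch_size=100)
    )
    clusters = await generate_base_clusters_from_conversation_summaries(
        summaries, model=cluster_model, checkpoint_manager=checkpoints
    )
    print(f"{len(summaries)} summaries -> {len(clusters)} clusters")


if __name__ == "__main__":
    asyncio.run(main())
```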
Next Steps
- Production Setup: Deploy Kura in production with monitoring
- Custom Models: Integrate your own LLM or clustering algorithm
- Optimization: Advanced performance tuning strategies
- Web Interface: Visualize comparisons in the web UI