
Caching

Kura supports caching to avoid recomputing expensive operations like summarization. This guide shows you how to use caching effectively to improve performance and reduce API costs.

Why Use Caching?

Summarizing conversations with LLMs is often the most expensive part of the pipeline in terms of:
  • API costs: Each conversation summary costs money
  • Time: LLM calls can take seconds per conversation
  • Rate limits: APIs have request limits
Caching allows you to:
  • Reuse summaries when running the pipeline multiple times
  • Resume from failures without reprocessing
  • Experiment with different clustering parameters without re-summarizing
  • Reduce costs by 90%+ on subsequent runs

Cache Strategy Base Class

Kura defines a simple interface for cache strategies (from kura/base_classes/cache.py:5-16):
from abc import ABC, abstractmethod
from typing import Any, Optional

class CacheStrategy(ABC):
    """Abstract base class for caching strategies."""
    
    @abstractmethod
    def get(self, key: str) -> Optional[Any]:
        """Retrieve a value from the cache by key."""
        raise NotImplementedError("Subclasses must implement get method")
    
    @abstractmethod
    def set(self, key: str, value: Any) -> None:
        """Store a value in the cache with the given key."""
        raise NotImplementedError("Subclasses must implement set method")

Using Disk Cache

Kura provides a disk-based cache implementation using diskcache.

Basic Setup

from kura.cache import DiskCacheStrategy
from kura.summarisation import SummaryModel

# Create cache directory
cache_strategy = DiskCacheStrategy(cache_dir="./cache")

# Initialize summary model with cache
summary_model = SummaryModel(
    model="openai/gpt-4o-mini",
    cache=cache_strategy
)

How It Works

The cache key is generated from:
  • Conversation messages (role and content)
  • Response schema
  • Prompt (hashed)
  • Temperature
  • Model name
From kura/summarisation.py:191-212:
def _get_cache_key(
    self,
    conversation: Conversation,
    response_schema: Type[T],
    prompt: str,
    temperature: float,
    **kwargs,
) -> str:
    """Generate a cache key from conversation messages and parameters."""
    # Create role-content pairs for each message
    message_data = [(msg.role, msg.content) for msg in conversation.messages]

    # Include all parameters that affect the output
    cache_components = (
        tuple(message_data),
        response_schema.__name__,
        hashlib.md5(prompt.encode()).hexdigest(),
        temperature,
        self.model,
    )

    return hashlib.md5(str(cache_components).encode()).hexdigest()
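
The upshot is that the key is a pure function of these inputs. The following standalone sketch (a hypothetical re-derivation using plain (role, content) tuples instead of Conversation objects) shows that identical inputs always produce the same key, while changing any parameter produces a different one:

import hashlib

def sketch_cache_key(messages, schema_name, prompt, temperature, model):
    # Mirrors the derivation above: hash the prompt, then hash all components
    components = (
        tuple(messages),
        schema_name,
        hashlib.md5(prompt.encode()).hexdigest(),
        temperature,
        model,
    )
    return hashlib.md5(str(components).encode()).hexdigest()

key = sketch_cache_key([("user", "hi")], "Summary", "Summarise this.", 0.2, "openai/gpt-4o-mini")

# Same inputs -> same key (cache hit on reruns)
assert key == sketch_cache_key([("user", "hi")], "Summary", "Summarise this.", 0.2, "openai/gpt-4o-mini")

# Any changed parameter (here, temperature) -> different key (cache miss)
assert key != sketch_cache_key([("user", "hi")], "Summary", "Summarise this.", 0.5, "openai/gpt-4o-mini")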

DiskCacheStrategy Implementation

Here’s the complete implementation (from kura/cache.py:8-27):
import os
import diskcache
from kura.base_classes.cache import CacheStrategy
from typing import Any, Optional

class DiskCacheStrategy(CacheStrategy):
    """Disk-based caching strategy using diskcache."""
    
    def __init__(self, cache_dir: str):
        """
        Initialize disk cache strategy.
        
        Args:
            cache_dir: Directory path for cache storage
        """
        os.makedirs(cache_dir, exist_ok=True)
        self.cache = diskcache.Cache(cache_dir)
    
    def get(self, key: str) -> Optional[Any]:
        """Retrieve a value from the disk cache."""
        return self.cache.get(key)
    
    def set(self, key: str, value: Any) -> None:
        """Store a value in the disk cache."""
        self.cache.set(key, value)

Using Cache in Your Pipeline

1. Create Cache Strategy

from kura.cache import DiskCacheStrategy

cache_strategy = DiskCacheStrategy("./cache")

2. Initialize Model with Cache

from kura.summarisation import SummaryModel

summary_model = SummaryModel(
    model="openai/gpt-4o-mini",
    max_concurrent_requests=50,
    cache=cache_strategy  # Enable caching
)

3. Run Pipeline

from kura.v1 import summarise_conversations, CheckpointManager
from kura.types import Conversation

# Load conversations
conversations = Conversation.from_hf_dataset(
    "ivanleomk/synthetic-gemini-conversations",
    max_conversations=1000
)

# Summarize with caching
checkpoint_mgr = CheckpointManager("./checkpoints")
summaries = await summarise_conversations(
    conversations,
    model=summary_model,
    checkpoint_manager=checkpoint_mgr
)

4. Verify Cache Usage

On the first run, you’ll see:

INFO: Starting summarization of 1000 conversations
INFO: Generated 1000 raw summaries

On subsequent runs with the same data:

INFO: Starting summarization of 1000 conversations
DEBUG: Found cached summary for conversation abc123
DEBUG: Found cached summary for conversation def456
...
INFO: Generated 1000 raw summaries  # Much faster!

Cache Invalidation

Because every input below is part of the cache key, changing any of them produces a new key, so old entries are never reused. In effect, the cache is invalidated when:
  • Conversation content changes
  • Prompt changes
  • Temperature changes
  • Response schema changes
  • Model changes
This means you can safely experiment with different parameters without worrying about stale cache entries.

Custom Cache Strategies

You can implement custom cache strategies for different backends.

Redis Cache Example

from kura.base_classes.cache import CacheStrategy
import redis
import pickle
from typing import Any, Optional

class RedisCacheStrategy(CacheStrategy):
    """Redis-based caching strategy for distributed systems."""
    
    def __init__(self, host: str = "localhost", port: int = 6379, db: int = 0):
        self.redis_client = redis.Redis(host=host, port=port, db=db)
    
    def get(self, key: str) -> Optional[Any]:
        """Retrieve a value from Redis cache."""
        value = self.redis_client.get(key)
        if value is None:
            return None
        return pickle.loads(value)
    
    def set(self, key: str, value: Any) -> None:
        """Store a value in Redis cache."""
        serialized = pickle.dumps(value)
        # Store with 7-day expiration
        self.redis_client.setex(key, 604800, serialized)
Usage:
redis_cache = RedisCacheStrategy(host="localhost", port=6379)
summary_model = SummaryModel(cache=redis_cache)

Memory Cache Example

For testing or small datasets:
from kura.base_classes.cache import CacheStrategy
from typing import Any, Optional

class MemoryCacheStrategy(CacheStrategy):
    """In-memory caching strategy (not persistent)."""
    
    def __init__(self):
        self._cache = {}
    
    def get(self, key: str) -> Optional[Any]:
        return self._cache.get(key)
    
    def set(self, key: str, value: Any) -> None:
        self._cache[key] = value
    
    def clear(self):
        """Clear all cached items."""
        self._cache.clear()
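
A quick roundtrip check (a minimal sketch; keys and values here are placeholders) confirms the expected behaviour, including that misses return None:

cache = MemoryCacheStrategy()
cache.set("example-key", {"summary": "placeholder"})

assert cache.get("example-key") == {"summary": "placeholder"}  # hit
assert cache.get("missing-key") is None                        # miss returns None

cache.clear()
assert cache.get("example-key") is None  # cleared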

S3 Cache Example

For cloud storage:
from kura.base_classes.cache import CacheStrategy
import boto3
import pickle
import hashlib
from typing import Any, Optional

class S3CacheStrategy(CacheStrategy):
    """S3-based caching strategy for cloud deployments."""
    
    def __init__(self, bucket_name: str, prefix: str = "kura-cache/"):
        self.s3_client = boto3.client('s3')
        self.bucket_name = bucket_name
        self.prefix = prefix
    
    def _get_s3_key(self, key: str) -> str:
        """Convert cache key to S3 object key."""
        return f"{self.prefix}{key}"
    
    def get(self, key: str) -> Optional[Any]:
        """Retrieve a value from S3 cache."""
        try:
            s3_key = self._get_s3_key(key)
            response = self.s3_client.get_object(
                Bucket=self.bucket_name,
                Key=s3_key
            )
            data = response['Body'].read()
            return pickle.loads(data)
        except self.s3_client.exceptions.NoSuchKey:
            return None
    
    def set(self, key: str, value: Any) -> None:
        """Store a value in S3 cache."""
        s3_key = self._get_s3_key(key)
        serialized = pickle.dumps(value)
        self.s3_client.put_object(
            Bucket=self.bucket_name,
            Key=s3_key,
            Body=serialized
        )
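
Usage mirrors the other strategies (a sketch; the bucket name is a placeholder for a bucket you own, and your environment needs AWS credentials configured):

# "my-kura-cache" is a hypothetical bucket name
s3_cache = S3CacheStrategy(bucket_name="my-kura-cache")
summary_model = SummaryModel(model="openai/gpt-4o-mini", cache=s3_cache)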

Cache vs Checkpoints

Understand the difference between caching and checkpointing:
Feature      | Cache                                  | Checkpoints
Purpose      | Avoid recomputing individual summaries | Save pipeline stage outputs
Granularity  | Per conversation                       | Per stage (all summaries, all clusters)
Invalidation | Automatic on parameter change          | Manual (delete file)
Location     | ./cache/                               | ./checkpoints/
Use case     | Resuming failed runs, experimenting    | Persisting results, sharing outputs
Best practice: Use both together:
from kura.cache import DiskCacheStrategy
from kura.v1 import CheckpointManager

# Cache individual summaries
cache = DiskCacheStrategy("./cache")
summary_model = SummaryModel(cache=cache)

# Checkpoint stage outputs
checkpoint_mgr = CheckpointManager("./checkpoints")

# Run pipeline with both
summaries = await summarise_conversations(
    conversations,
    model=summary_model,
    checkpoint_manager=checkpoint_mgr
)

Performance Impact

Caching can dramatically improve performance:

First Run (No Cache)

1000 conversations @ 2s each = 33 minutes
Cost: $10 in API calls

Second Run (With Cache)

1000 conversations @ 0.001s each = 1 second
Cost: $0

Partial Cache Hit

1000 conversations:
- 800 cached: 0.8 seconds
- 200 new: 400 seconds
Total: ~7 minutes (79% faster)
Cost: $2 (80% savings)
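
These figures are illustrative estimates, not benchmarks. A back-of-the-envelope check (assuming roughly 2 s per uncached summary and about 1 ms per cache hit) reproduces them:

uncached_s, cached_s = 2.0, 0.001  # assumed per-conversation costs

print(f"First run:  {1000 * uncached_s / 60:.0f} min")  # ~33 min
print(f"Second run: {1000 * cached_s:.1f} s")           # ~1 s

partial = 800 * cached_s + 200 * uncached_s
print(f"Partial:    {partial / 60:.1f} min")            # ~6.7 min, roughly 80% faster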

Managing Cache Size

The disk cache can grow large over time. Monitor and manage it:

Check Cache Size

from kura.cache import DiskCacheStrategy

cache = DiskCacheStrategy("./cache")
print(f"Cache size: {cache.cache.volume()} bytes")
print(f"Cache items: {len(cache.cache)}")

Clear Cache

# Clear all cache entries
cache.cache.clear()

# Or delete specific entries
for key in cache.cache.iterkeys():
    if some_condition(key):
        del cache.cache[key]

Set Cache Limits

import diskcache

# Limit cache to 1GB
cache = diskcache.Cache(
    "./cache",
    size_limit=1024**3  # 1GB in bytes
)
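
DiskCacheStrategy creates its own diskcache.Cache internally without a size limit, so to combine a limit with the CacheStrategy interface you can write a small variant (a sketch; BoundedDiskCacheStrategy is not part of Kura, and the default limit here is an assumption):

import os
import diskcache
from kura.base_classes.cache import CacheStrategy
from typing import Any, Optional

class BoundedDiskCacheStrategy(CacheStrategy):
    """Disk cache capped at size_limit bytes; diskcache evicts older entries once the limit is reached."""

    def __init__(self, cache_dir: str, size_limit: int = 1024**3):  # 1GB default (assumption)
        os.makedirs(cache_dir, exist_ok=True)
        self.cache = diskcache.Cache(cache_dir, size_limit=size_limit)

    def get(self, key: str) -> Optional[Any]:
        return self.cache.get(key)

    def set(self, key: str, value: Any) -> None:
        self.cache.set(key, value)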

Best Practices

1. Use Separate Cache Directories

For different experiments or models:

cache_v1 = DiskCacheStrategy("./cache/experiment-1")
cache_v2 = DiskCacheStrategy("./cache/experiment-2")

2. Cache Only Expensive Operations

Don’t cache everything; focus on expensive operations:
  • ✅ LLM summarization calls
  • ✅ Embedding generation (if using paid APIs)
  • ❌ Clustering (fast and cheap)
  • ❌ Dimensionality reduction (fast)

3. Monitor Cache Hit Rate

Track how often the cache is used (a hit-counting wrapper is sketched after this list):

import logging

logging.basicConfig(level=logging.DEBUG)
# Look for "Found cached summary" messages

4. Clear Cache When Prompts Change

If you modify your summarization prompt, old entries are never hit again (the prompt hash is part of the cache key), so clearing reclaims the disk space they occupy:

cache.cache.clear()
print("Cache cleared after prompt change")
    

Troubleshooting

Cache Not Working

If summaries aren’t being cached:

1. Verify the cache is passed to the model:

print(summary_model.cache)  # Should not be None

2. Check logging:

import logging
logging.basicConfig(level=logging.DEBUG)
# Look for cache-related messages

3. Ensure the cache directory is writable:

ls -la ./cache

Cache Permission Errors

# Fix cache directory permissions
chmod -R 755 ./cache

Cache Corruption

If you encounter cache errors:

# Delete and recreate the cache
import shutil
shutil.rmtree("./cache")
cache = DiskCacheStrategy("./cache")

Next Steps