Overview

Kura’s checkpoint system saves intermediate pipeline results to disk, enabling:
  • Resume capability: Re-run analysis without repeating expensive steps
  • Iterative refinement: Adjust later stages without re-processing earlier ones
  • Cost savings: Avoid redundant LLM API calls
  • Scale: Process millions of conversations incrementally
Checkpoints are saved after each major stage (see the directory sketch after this list):
  1. Summaries → summaries.jsonl (or other format)
  2. Base clusters → clusters.jsonl
  3. Meta-clusters → meta_clusters.jsonl
  4. Dimensionality → dimensionality.jsonl
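
After a full run with the default JSONL backend, you can confirm which stages were checkpointed by listing the directory. A minimal sketch (filenames follow the stage list above):

from pathlib import Path

# List the checkpoint files written by a completed pipeline run
for path in sorted(Path("./checkpoints").glob("*.jsonl")):
    print(path.name)
# clusters.jsonl
# dimensionality.jsonl
# meta_clusters.jsonl
# summaries.jsonl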

Available Backends

Kura supports multiple checkpoint formats through the BaseCheckpointManager interface:
| Backend | Format | Best For | Pros | Cons |
| --- | --- | --- | --- | --- |
| JSONL | Line-delimited JSON | Default, simple | Human-readable, no dependencies | Larger files, slower |
| Parquet | Apache Parquet | Large datasets | 50% smaller, faster load | Requires pyarrow |
| HuggingFace Datasets | HF Arrow | Cloud storage, versioning | Streaming, Hub integration | Requires datasets |
| SQL | SQLite/PostgreSQL | Queryable data | SQL queries, indexes | Requires sqlmodel |
| Multi | Multiple backends | Redundancy | Fallback, flexibility | More disk space |

JSONL Checkpoints (Default)

The default format; it requires no external dependencies.

Usage

From kura/checkpoints/jsonl.py:19-114:
from kura.checkpoints import JSONLCheckpointManager

checkpoint_mgr = JSONLCheckpointManager(
    checkpoint_dir="./checkpoints",
    enabled=True
)

# Save checkpoint
checkpoint_mgr.save_checkpoint(
    filename="summaries",  # Creates summaries.jsonl
    data=summaries
)

# Load checkpoint
loaded = checkpoint_mgr.load_checkpoint(
    filename="summaries",
    model_class=ConversationSummary
)

File Format

Each line is a JSON-serialized Pydantic model:
{"chat_id": "conv_001", "summary": "User asks...", "concerning_score": 1, ...}
{"chat_id": "conv_002", "summary": "User requests...", "concerning_score": 2, ...}
{"chat_id": "conv_003", "summary": "User needs...", "concerning_score": 1, ...}
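
Because each line is standalone JSON, checkpoints can be inspected with the standard library alone. A minimal sketch (field names follow the records above):

import json
from itertools import islice

# Peek at the first three records without loading the whole file
with open("./checkpoints/summaries.jsonl") as f:
    for line in islice(f, 3):
        record = json.loads(line)
        print(record["chat_id"], record["concerning_score"])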

Advantages

  • Human-readable
  • No external dependencies
  • Simple to debug
  • Works everywhere

Disadvantages

  • Larger file sizes (no compression)
  • Slower loading for large datasets
  • No built-in querying

Parquet Checkpoints

A columnar format with built-in compression; files are typically about 50% smaller than the JSONL equivalent.

Setup

pip install pyarrow

Usage

From kura/checkpoints/parquet.py:
from kura.checkpoints import ParquetCheckpointManager

checkpoint_mgr = ParquetCheckpointManager(
    checkpoint_dir="./checkpoints",
    compression="gzip"  # or "snappy", "brotli", "zstd"
)

# Same API as JSONL
checkpoint_mgr.save_checkpoint("summaries", summaries)
loaded = checkpoint_mgr.load_checkpoint("summaries", ConversationSummary)

File Format

Binary columnar format:
summaries.parquet
├── chat_id: ["conv_001", "conv_002", ...]
├── summary: ["User asks...", "User requests...", ...]
├── concerning_score: [1, 2, 1, ...]
└── ...

Compression Options

# Fast compression (default)
ParquetCheckpointManager(checkpoint_dir="./checkpoints", compression="snappy")

# Balanced compression
ParquetCheckpointManager(checkpoint_dir="./checkpoints", compression="gzip")

# Maximum compression
ParquetCheckpointManager(checkpoint_dir="./checkpoints", compression="zstd")

# No compression
ParquetCheckpointManager(checkpoint_dir="./checkpoints", compression=None)

Advantages

  • 50%+ smaller files
  • Faster loading (columnar format)
  • Better for analytics with tools like pandas or DuckDB (see the sketch after this list)
  • Built-in compression
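
For example, a Parquet checkpoint can be queried directly as a DataFrame. A sketch, assuming the file layout shown above:

import pandas as pd

# Load the checkpoint into a DataFrame and run an ad-hoc aggregation;
# pandas reads Parquet through the same pyarrow dependency
df = pd.read_parquet("./checkpoints/summaries.parquet")
print(df.groupby("concerning_score").size())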

Disadvantages

  • Not human-readable
  • Requires pyarrow dependency
  • More complex debugging

HuggingFace Datasets Checkpoints

Advanced format with streaming, versioning, and cloud storage.

Setup

pip install datasets

Usage

From kura/checkpoints/hf_dataset.py:41-670:
from kura.checkpoints import HFDatasetCheckpointManager

checkpoint_mgr = HFDatasetCheckpointManager(
    checkpoint_dir="./checkpoints",
    hub_repo="my-username/kura-analysis",  # Optional: push to Hub
    hub_token="hf_...",  # Optional: for private repos
    streaming=False,  # Set True for datasets larger than RAM
    compression="gzip"
)

# Same API
checkpoint_mgr.save_checkpoint("summaries", summaries)
loaded = checkpoint_mgr.load_checkpoint("summaries", ConversationSummary)

HuggingFace Hub Integration

Automatically sync checkpoints to the cloud:
checkpoint_mgr = HFDatasetCheckpointManager(
    checkpoint_dir="./checkpoints",
    hub_repo="my-org/conversation-analysis",
    hub_token="hf_your_token"
)

# Saves locally AND pushes to Hub
checkpoint_mgr.save_checkpoint("summaries", summaries)

# Loads from Hub if available, otherwise local
loaded = checkpoint_mgr.load_checkpoint("summaries", ConversationSummary)

Streaming for Large Datasets

Process datasets larger than RAM:
checkpoint_mgr = HFDatasetCheckpointManager(
    checkpoint_dir="./checkpoints",
    streaming=True
)

# Data is not loaded into memory all at once
loaded = checkpoint_mgr.load_checkpoint(
    "summaries",
    ConversationSummary,
    streaming=True
)

from itertools import batched  # Python 3.12+

# Process in chunks
for batch in batched(loaded, 1000):
    process_batch(batch)

Filtering Without Loading

Query checkpoints without loading everything:
# Get only high-concern conversations
high_concern = checkpoint_mgr.filter_checkpoint(
    filename="summaries",
    filter_fn=lambda x: x["concerning_score"] >= 4,
    model_class=ConversationSummary
)

print(f"Found {len(high_concern)} high-concern conversations")

Advantages

  • Streaming support (datasets > RAM)
  • Cloud storage (HuggingFace Hub)
  • Version control
  • Dataset cards and metadata
  • Memory-mapped files
  • Advanced filtering

Disadvantages

  • Requires datasets dependency
  • More complex setup
  • Larger on-disk format (but memory-efficient)

Multi-Backend Checkpoints

Use multiple backends simultaneously for redundancy:
from kura.checkpoints import (
    MultiCheckpointManager,
    JSONLCheckpointManager,
    ParquetCheckpointManager
)

checkpoint_mgr = MultiCheckpointManager(
    managers=[
        JSONLCheckpointManager("./checkpoints/jsonl"),
        ParquetCheckpointManager("./checkpoints/parquet")
    ]
)

# Saves to BOTH backends
checkpoint_mgr.save_checkpoint("summaries", summaries)

# Loads from the FIRST available backend (JSONL in this case)
loaded = checkpoint_mgr.load_checkpoint("summaries", ConversationSummary)

Use Cases

  • Backup: Save to both local and cloud (see the sketch after this list)
  • Migration: Transition from JSONL to Parquet
  • Performance: Load from fast backend, save to durable backend
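
A sketch of the backup use case, pairing a fast local backend with a Hub-synced copy (the repo name my-org/kura-backup is a placeholder):

checkpoint_mgr = MultiCheckpointManager(
    managers=[
        # Fast local backend; consulted first on load
        ParquetCheckpointManager("./checkpoints/parquet"),
        # Durable copy pushed to the HuggingFace Hub
        HFDatasetCheckpointManager(
            checkpoint_dir="./checkpoints/hf",
            hub_repo="my-org/kura-backup",  # placeholder repo
        ),
    ]
)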

Using Checkpoints in the Pipeline

Checkpoints are automatically used by pipeline functions:
from kura.summarisation import summarise_conversations
from kura.cluster import generate_base_clusters_from_conversation_summaries
from kura.meta_cluster import reduce_clusters_from_base_clusters
from kura.dimensionality import reduce_dimensionality_from_clusters
from kura.checkpoints import JSONLCheckpointManager

checkpoint_mgr = JSONLCheckpointManager("./checkpoints")

# First run: computes summaries and saves checkpoint
summaries = await summarise_conversations(
    conversations=conversations,
    model=summary_model,
    checkpoint_manager=checkpoint_mgr  # Add to every stage
)

# Second run: loads summaries from checkpoint (instant)
summaries = await summarise_conversations(
    conversations=conversations,
    model=summary_model,
    checkpoint_manager=checkpoint_mgr
)

# Continue pipeline with checkpointing
base_clusters = await generate_base_clusters_from_conversation_summaries(
    summaries=summaries,
    checkpoint_manager=checkpoint_mgr
)

meta_clusters = await reduce_clusters_from_base_clusters(
    clusters=base_clusters,
    model=meta_model,
    checkpoint_manager=checkpoint_mgr
)

projected = await reduce_dimensionality_from_clusters(
    clusters=meta_clusters,
    model=dim_model,
    checkpoint_manager=checkpoint_mgr
)

Checkpoint Lifecycle

Checking Availability

# List all checkpoints
checkpoints = checkpoint_mgr.list_checkpoints()
print(f"Available: {checkpoints}")
# Output: ['summaries.jsonl', 'clusters.jsonl', 'meta_clusters.jsonl']

# Check if specific checkpoint exists
if "summaries" in [c.replace(".jsonl", "") for c in checkpoints]:
    print("Summaries checkpoint exists")

Deleting Checkpoints

# Delete a single checkpoint
checkpoint_mgr.delete_checkpoint("summaries")

# Delete all checkpoints
for checkpoint in checkpoint_mgr.list_checkpoints():
    checkpoint_mgr.delete_checkpoint(checkpoint)

Checkpoint Info (HF Datasets only)

info = checkpoint_mgr.get_checkpoint_info("summaries")
print(info)
# {
#   "num_rows": 1000,
#   "num_columns": 10,
#   "column_names": ["chat_id", "summary", ...],
#   "size_bytes": 5242880
# }

Disabling Checkpoints

For testing or when checkpoints aren’t needed:
checkpoint_mgr = JSONLCheckpointManager(
    checkpoint_dir="./checkpoints",
    enabled=False  # Disable checkpointing
)

# All checkpoint operations become no-ops
checkpoint_mgr.save_checkpoint("summaries", summaries)  # Does nothing
loaded = checkpoint_mgr.load_checkpoint("summaries", ConversationSummary)  # Returns None

Custom Checkpoint Managers

Implement BaseCheckpointManager for custom backends:
import redis

from kura.base_classes import BaseCheckpointManager
from typing import Optional, TypeVar, List
from pydantic import BaseModel

T = TypeVar("T", bound=BaseModel)

class RedisCheckpointManager(BaseCheckpointManager):
    def __init__(self, redis_url: str, **kwargs):
        super().__init__(checkpoint_dir="", **kwargs)
        self.redis = redis.from_url(redis_url)
    
    def setup_checkpoint_dir(self) -> None:
        # Connect to Redis
        pass
    
    def load_checkpoint(
        self, filename: str, model_class: type[T], **kwargs
    ) -> Optional[List[T]]:
        # Load from Redis (GET returns bytes, so decode before parsing)
        data = self.redis.get(filename)
        if data:
            lines = data.decode().split("\n")
            return [model_class.model_validate_json(line) for line in lines]
        return None
    
    def save_checkpoint(self, filename: str, data: List[T], **kwargs) -> None:
        # Save to Redis
        serialized = "\n".join(item.model_dump_json() for item in data)
        self.redis.set(filename, serialized)
    
    def list_checkpoints(self) -> List[str]:
        # List keys in Redis
        return [k.decode() for k in self.redis.keys("*")]
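
A sketch of using the custom manager, assuming a Redis server reachable at localhost:6379:

checkpoint_mgr = RedisCheckpointManager("redis://localhost:6379")
checkpoint_mgr.save_checkpoint("summaries", summaries)
loaded = checkpoint_mgr.load_checkpoint("summaries", ConversationSummary)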

Migration Between Formats

Convert checkpoints from one format to another:
# Load from JSONL
jsonl_mgr = JSONLCheckpointManager("./checkpoints/jsonl")
summaries = jsonl_mgr.load_checkpoint("summaries", ConversationSummary)

# Save to Parquet
parquet_mgr = ParquetCheckpointManager("./checkpoints/parquet")
parquet_mgr.save_checkpoint("summaries", summaries)

# Save to HuggingFace Datasets
hf_mgr = HFDatasetCheckpointManager(
    checkpoint_dir="./checkpoints/hf",
    hub_repo="my-org/kura-analysis"
)
hf_mgr.save_checkpoint("summaries", summaries)

CLI Migration Tool

Kura includes a CLI for checkpoint migration:
# Analyze existing checkpoints
kura analyze-checkpoints ./checkpoints

# Migrate JSONL to HuggingFace Datasets
kura migrate-checkpoints \
    ./old_checkpoints \
    ./new_hf_checkpoints \
    --hub-repo my-username/kura-analysis \
    --hub-token $HF_TOKEN \
    --compression gzip

Best Practices

1. Choose the Right Backend

# Small datasets (< 10k conversations): JSONL
checkpoint_mgr = JSONLCheckpointManager("./checkpoints")

# Medium datasets (10k-100k): Parquet
checkpoint_mgr = ParquetCheckpointManager("./checkpoints", compression="gzip")

# Large datasets (> 100k): HuggingFace Datasets with streaming
checkpoint_mgr = HFDatasetCheckpointManager(
    checkpoint_dir="./checkpoints",
    streaming=True,
    hub_repo="my-org/kura-analysis"
)

2. Use Cloud Storage for Collaboration

checkpoint_mgr = HFDatasetCheckpointManager(
    checkpoint_dir="./checkpoints",
    hub_repo="my-org/shared-analysis",
    hub_token=os.environ["HF_TOKEN"]
)

# Team members can load from the same Hub repo
loaded = checkpoint_mgr.load_checkpoint("summaries", ConversationSummary)

3. Version Your Checkpoints

import datetime

timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
checkpoint_mgr.save_checkpoint(f"summaries_{timestamp}", summaries)

4. Clean Up Old Checkpoints

from pathlib import Path

def cleanup_old_checkpoints(checkpoint_dir: Path, keep_latest: int = 3):
    """Keep only the N most recent checkpoints."""
    files = sorted(
        checkpoint_dir.glob("*.jsonl"),
        key=lambda f: f.stat().st_mtime,
        reverse=True
    )
    for file in files[keep_latest:]:
        file.unlink()
        print(f"Deleted old checkpoint: {file.name}")

cleanup_old_checkpoints(Path("./checkpoints"), keep_latest=3)

5. Monitor Checkpoint Sizes

def get_checkpoint_size(checkpoint_mgr, filename: str) -> int:
    """Get checkpoint size in bytes."""
    path = checkpoint_mgr.get_checkpoint_path(filename)
    return path.stat().st_size if path.exists() else 0

size_mb = get_checkpoint_size(checkpoint_mgr, "summaries") / 1024 / 1024
print(f"Summaries checkpoint: {size_mb:.2f} MB")

Next Steps

  • Pipeline Overview: Review the complete pipeline architecture
  • Conversations: Start at the beginning by loading conversations