Documentation Index

Fetch the complete documentation index at: https://mintlify.com/jxnl/kura/llms.txt

Use this file to discover all available pages before exploring further.

Overview

Kura provides a flexible checkpoint management system for persisting intermediate pipeline results. The system supports multiple backends with different performance and feature characteristics.

Available Checkpoint Managers

JSONLCheckpointManager

The traditional JSONL file-based checkpoint system. This is the default and most compatible option.
from kura.checkpoints import JSONLCheckpointManager

manager = JSONLCheckpointManager(
    checkpoint_dir="./checkpoints",
    enabled=True
)
Parameters:
  • checkpoint_dir (str, required): Directory path for saving checkpoints
  • enabled (bool, default: True): Whether checkpointing is enabled

Methods

load_checkpoint → Optional[List[T]]
Load data from a checkpoint file if it exists.
Parameters:
  • filename (str): Name of the checkpoint file
  • model_class (type[T]): Pydantic model class for deserializing the data
Returns: List of model instances if the checkpoint exists, None otherwise

save_checkpoint → None
Save data to a checkpoint file.
Parameters:
  • filename (str): Name of the checkpoint file
  • data (List[T]): List of model instances to save

list_checkpoints → List[str]
List all available checkpoint files.
Returns: List of checkpoint filenames

delete_checkpoint → bool
Delete a checkpoint file.
Parameters:
  • filename (str): Name of the checkpoint file to delete
Returns: True if the file was deleted, False if it didn’t exist

Example Usage

from kura.checkpoints import JSONLCheckpointManager
from kura.types import ConversationSummary

# Initialize manager
manager = JSONLCheckpointManager("./my-checkpoints")

# Save checkpoint
summaries = [...]  # List of ConversationSummary objects
manager.save_checkpoint("summaries", summaries)

# Load checkpoint
loaded = manager.load_checkpoint("summaries.jsonl", ConversationSummary)

# List available checkpoints
checkpoints = manager.list_checkpoints()
print(checkpoints)  # ['summaries.jsonl', 'clusters.jsonl']

# Delete checkpoint
manager.delete_checkpoint("old_summaries.jsonl")

HFDatasetCheckpointManager

HuggingFace datasets-based checkpoint system with advanced features like streaming, versioning, and cloud storage integration.
from kura.checkpoints import HFDatasetCheckpointManager

manager = HFDatasetCheckpointManager(
    checkpoint_dir="./hf-checkpoints",
    enabled=True,
    hub_repo="my-username/kura-analysis",
    hub_token="hf_...",
    streaming=False,
    compression="gzip"
)
Parameters:
  • checkpoint_dir (str, required): Directory path for saving checkpoints locally
  • enabled (bool, default: True): Whether checkpointing is enabled
  • hub_repo (Optional[str], default: None): HuggingFace Hub repository name for cloud storage (e.g., "username/repo-name")
  • hub_token (Optional[str], default: None): HuggingFace Hub authentication token for private repositories
  • streaming (bool, default: False): Whether to use streaming mode by default for memory-efficient processing
  • compression (Optional[str], default: "gzip"): Compression algorithm to use. Options: "gzip", "lz4", "zstd", or None

Features

  • Memory-mapped files: Efficient access without loading everything into memory
  • Streaming support: Process datasets larger than available RAM
  • Built-in compression: Automatic compression with multiple algorithms
  • Version control: Track changes via HuggingFace Hub
  • Rich querying: Filter and search capabilities
  • Schema validation: Type safety with predefined schemas

Methods

load_checkpoint → Optional[List[T]]
Load data from a checkpoint using HuggingFace datasets.
Parameters:
  • filename (str): Name of the checkpoint
  • model_class (type[T]): Pydantic model class for deserializing
  • streaming (Optional[bool]): Override the default streaming mode
  • checkpoint_type (Optional[str]): Type hint for deserialization
Returns: List of model instances if the checkpoint exists, None otherwise

save_checkpoint → None
Save data to a checkpoint using HuggingFace datasets.
Parameters:
  • filename (str): Name of the checkpoint (without extension)
  • data (List[T]): List of model instances to save
  • checkpoint_type (Optional[str]): Type hint for schema selection

get_checkpoint_info → Optional[Dict[str, Any]]
Get information about a checkpoint dataset.
Parameters:
  • filename (str): Name of the checkpoint
Returns: Dictionary with checkpoint metadata and statistics, including:
  • num_rows: Number of records
  • num_columns: Number of columns
  • column_names: List of column names
  • features: Dataset schema information
  • size_bytes: Total size on disk

filter_checkpoint → Optional[List[T]]
Filter a checkpoint dataset without loading everything into memory.
Parameters:
  • filename (str): Name of the checkpoint
  • filter_fn (Callable[[Dict[str, Any]], bool]): Function to filter rows
  • model_class (type[T]): Pydantic model class for results
  • checkpoint_type (Optional[str]): Type hint for deserialization
Returns: List of filtered model instances

Example Usage

from kura.checkpoints import HFDatasetCheckpointManager
from kura.types import Cluster

# Initialize with HuggingFace Hub integration
manager = HFDatasetCheckpointManager(
    checkpoint_dir="./hf-checkpoints",
    hub_repo="my-username/kura-analysis",
    hub_token="hf_xxx",
    compression="gzip"
)

# Save checkpoint (auto-uploads to Hub if configured)
clusters = [...]  # List of Cluster objects
manager.save_checkpoint(
    "meta_clusters",
    clusters,
    checkpoint_type="clusters"
)

# Get checkpoint information
info = manager.get_checkpoint_info("meta_clusters")
print(f"Rows: {info['num_rows']}, Size: {info['size_bytes']} bytes")

# Filter checkpoint without loading all data
large_clusters = manager.filter_checkpoint(
    "meta_clusters",
    filter_fn=lambda x: len(x['chat_ids']) > 100,
    model_class=Cluster,
    checkpoint_type="clusters"
)

# Load with streaming for large datasets
manager_stream = HFDatasetCheckpointManager(
    checkpoint_dir="./hf-checkpoints",
    streaming=True
)
data = manager_stream.load_checkpoint(
    "large_dataset",
    Cluster,
    checkpoint_type="clusters"
)

Supported Data Types

The HFDatasetCheckpointManager has predefined schemas for:
  • conversations: Conversation objects with messages and metadata
  • summaries: ConversationSummary objects with embeddings
  • clusters: Cluster objects with hierarchical relationships
  • projected_clusters: ProjectedCluster objects with 2D coordinates

ParquetCheckpointManager

Requires the optional parquet dependency group: pip install kura[parquet]
Parquet-based checkpoint system that provides better compression (~50% space savings) and faster loading compared to JSONL.
from kura.checkpoints import ParquetCheckpointManager

manager = ParquetCheckpointManager(
    checkpoint_dir="./parquet-checkpoints",
    enabled=True,
    compression="snappy"
)
Parameters:
  • checkpoint_dir (str, required): Directory path for saving Parquet files
  • enabled (bool, default: True): Whether checkpointing is enabled
  • compression (str, default: "snappy"): Compression algorithm: "snappy", "gzip", "brotli", "lz4", or "zstd"

Features

  • 50% space savings: Better compression than JSONL
  • Faster loading: Columnar format optimized for analytics
  • Type-safe schemas: PyArrow schema validation
  • Multiple compression options: Choose speed vs size tradeoff

Example Usage

from kura.checkpoints import ParquetCheckpointManager
from kura.types import ConversationSummary

# Initialize with gzip compression (better compression, slower)
manager = ParquetCheckpointManager(
    checkpoint_dir="./parquet-checkpoints",
    compression="gzip"
)

# Save checkpoint
summaries = [...]  # List of ConversationSummary objects
manager.save_checkpoint("summaries", summaries)

# Load checkpoint
loaded = manager.load_checkpoint("summaries", ConversationSummary)

Compression Options

Algorithm   Speed       Compression   Best For
snappy      Fastest     Good          Default choice
lz4         Very fast   Good          Speed-critical
gzip        Medium      Better        Balanced
zstd        Medium      Best          Max compression
brotli      Slow        Best          Archival
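The speed-versus-size tradeoff in the table above can be felt with Python's standard-library codecs. This illustrative sketch is not Kura or Parquet API — snappy, lz4, zstd, and brotli live in third-party packages — but gzip and lzma occupy analogous points on the same spectrum (fast/moderate vs. slow/dense):

```python
import gzip
import lzma

# Repetitive JSON-like payload, similar in spirit to checkpoint records
raw = b'{"summary": "user asked about billing", "cluster_id": 42}\n' * 1000

gz = gzip.compress(raw)   # fast, moderate ratio (gzip end of the spectrum)
xz = lzma.compress(raw)   # slow, high ratio (brotli/zstd end of the spectrum)

# On repetitive data, both shrink the payload dramatically
print(len(raw), len(gz), len(xz))
```

On real checkpoint data the ratios differ by codec and content, which is why the manager exposes the choice as a parameter.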

SQLCheckpointManager

Requires the optional sqlmodel dependency: pip install sqlmodel
SQL-based checkpoint system with run tracking and rich querying capabilities. Supports SQLite, PostgreSQL, MySQL, and other SQLAlchemy-compatible databases.
from kura.checkpoints import SQLCheckpointManager

# SQLite (file-based, no server needed)
manager = SQLCheckpointManager(
    database_url="sqlite:///./kura-checkpoints.db",
    enabled=True
)

# PostgreSQL (for production)
manager = SQLCheckpointManager(
    database_url="postgresql://user:password@localhost/kura_db"
)
Parameters:
  • database_url (str, required): SQLAlchemy database URL:
      SQLite: sqlite:///path/to/database.db
      PostgreSQL: postgresql://user:pass@localhost/dbname
      MySQL: mysql://user:pass@localhost/dbname
  • enabled (bool, default: True): Whether checkpointing is enabled

Features

  • Run tracking: Track multiple analysis runs with metadata
  • Rich querying: SQL queries for filtering and analysis
  • Relational data: Proper foreign keys and relationships
  • Production-ready: PostgreSQL support for multi-user deployments
  • ACID compliance: Reliable transactions and data integrity

Methods

create_run → RunTable
Create a new analysis run for organizing checkpoints.
Parameters:
  • name (str): Human-readable name for the run
  • description (Optional[str]): Detailed description
  • config (Optional[dict]): Configuration dictionary
Returns: RunTable object with a generated ID

get_run → Optional[RunTable]
Retrieve a specific run by ID.
Parameters:
  • run_id (str): The run’s unique identifier
Returns: RunTable object or None

list_runs → List[RunTable]
List all analysis runs.
Returns: List of RunTable objects ordered by creation time

Example Usage

from kura.checkpoints import SQLCheckpointManager
from kura.types import ConversationSummary

# Initialize with SQLite
manager = SQLCheckpointManager("sqlite:///./kura.db")

# Create a run for this analysis
run = manager.create_run(
    name="Production Analysis 2024-03",
    description="Monthly conversation analysis",
    config={"model": "gpt-4o", "clusters": 100}
)

print(f"Created run: {run.id}")

# Save checkpoint (automatically associated with current run)
summaries = [...]
manager.save_checkpoint("summaries", summaries)

# Query previous runs
runs = manager.list_runs()
for run in runs:
    print(f"{run.name}: {run.status} - {run.created_at}")

# Load from specific run
manager.current_run_id = run.id
loaded = manager.load_checkpoint("summaries", ConversationSummary)

PostgreSQL Production Setup

# Production configuration with PostgreSQL
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

database_url = "postgresql://kura_user:secure_pass@db.example.com:5432/kura_prod"
manager = SQLCheckpointManager(database_url=database_url)

# Swap in an engine with connection pooling for better performance
manager.engine = create_engine(
    database_url,
    poolclass=QueuePool,
    pool_size=10,
    max_overflow=20
)

MultiCheckpointManager

Coordinates multiple checkpoint backends simultaneously for redundancy and performance optimization.
from kura.checkpoints import (
    MultiCheckpointManager,
    JSONLCheckpointManager,
    HFDatasetCheckpointManager
)

# Create multiple backends
jsonl_backend = JSONLCheckpointManager("./jsonl-checkpoints")
hf_backend = HFDatasetCheckpointManager("./hf-checkpoints")

# Coordinate them
manager = MultiCheckpointManager([jsonl_backend, hf_backend])

# Operations apply to all backends
items = [...]  # List of model instances to save
manager.save_checkpoint("data", items)
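The coordination pattern can be sketched without Kura installed. The class and strategy below are illustrative assumptions, not the MultiCheckpointManager implementation: saves fan out to every backend, and loads return the first hit (consult the Kura source for its actual load semantics):

```python
from typing import Any, Dict, List, Optional


class InMemoryBackend:
    """Stand-in for a real checkpoint manager backend."""

    def __init__(self) -> None:
        self._store: Dict[str, List[Any]] = {}

    def save_checkpoint(self, name: str, data: List[Any]) -> None:
        self._store[name] = list(data)

    def load_checkpoint(self, name: str) -> Optional[List[Any]]:
        return self._store.get(name)


class MultiBackendSketch:
    """Fan saves out to every backend; load from the first backend that has the data."""

    def __init__(self, backends: List[InMemoryBackend]) -> None:
        self.backends = backends

    def save_checkpoint(self, name: str, data: List[Any]) -> None:
        for backend in self.backends:
            backend.save_checkpoint(name, data)

    def load_checkpoint(self, name: str) -> Optional[List[Any]]:
        for backend in self.backends:
            result = backend.load_checkpoint(name)
            if result is not None:
                return result
        return None


primary, mirror = InMemoryBackend(), InMemoryBackend()
multi = MultiBackendSketch([primary, mirror])
multi.save_checkpoint("data", [1, 2, 3])
print(multi.load_checkpoint("data"))  # [1, 2, 3], served by the first backend
```

Writing to every backend buys redundancy at the cost of duplicated I/O; reading from the first available backend keeps loads as cheap as the fastest backend.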

Base Class

BaseCheckpointManager

Abstract base class that all checkpoint managers inherit from.
from kura.base_classes import BaseCheckpointManager
from typing import Optional, List, TypeVar
from pydantic import BaseModel

T = TypeVar("T", bound=BaseModel)

class CustomCheckpointManager(BaseCheckpointManager):
    def setup_checkpoint_dir(self) -> None:
        # Setup logic
        pass
    
    def load_checkpoint(
        self, filename: str, model_class: type[T], **kwargs
    ) -> Optional[List[T]]:
        # Load logic
        pass
    
    def save_checkpoint(self, filename: str, data: List[T], **kwargs) -> None:
        # Save logic
        pass
    
    def list_checkpoints(self) -> List[str]:
        # List logic
        pass
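A concrete implementation of that interface can be quite small. The sketch below mirrors the method signatures above but stores plain dicts instead of Pydantic models, so it runs without kura installed; it is an illustration of the contract, not Kura's own JSONL implementation:

```python
import json
from pathlib import Path
from typing import Dict, List, Optional


class DictJSONLManager:
    """Minimal checkpoint manager following the interface above, with plain
    dicts standing in for Pydantic models."""

    def __init__(self, checkpoint_dir: str) -> None:
        self.checkpoint_dir = Path(checkpoint_dir)
        self.setup_checkpoint_dir()

    def setup_checkpoint_dir(self) -> None:
        # Create the directory tree if it does not exist yet
        self.checkpoint_dir.mkdir(parents=True, exist_ok=True)

    def save_checkpoint(self, filename: str, data: List[Dict]) -> None:
        # One JSON object per line, the usual JSONL layout
        path = self.checkpoint_dir / filename
        with path.open("w") as f:
            for row in data:
                f.write(json.dumps(row) + "\n")

    def load_checkpoint(self, filename: str) -> Optional[List[Dict]]:
        # Return None for a missing checkpoint, matching the interface
        path = self.checkpoint_dir / filename
        if not path.exists():
            return None
        with path.open() as f:
            return [json.loads(line) for line in f if line.strip()]

    def list_checkpoints(self) -> List[str]:
        return sorted(p.name for p in self.checkpoint_dir.iterdir())
```

A real subclass would serialize with `model_class.model_validate` / `model_dump` against the Pydantic models Kura passes in, but the file handling stays the same.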

Configuration

Environment Variables

Checkpoint managers can be configured via environment variables:
# Set checkpoint directory
export KURA_CHECKPOINT_DIR="./my-checkpoints"

# Set checkpoint format
export KURA_CHECKPOINT_FORMAT="hf-dataset"  # or "jsonl"
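If you prefer to resolve these settings explicitly in application code rather than rely on Kura reading them, the standard pattern is a sketch like this (the variable names follow the shell example above; the defaults here are assumptions):

```python
import os

# Resolve checkpoint settings from the environment, with fallbacks
checkpoint_dir = os.environ.get("KURA_CHECKPOINT_DIR", "./checkpoints")
fmt = os.environ.get("KURA_CHECKPOINT_FORMAT", "jsonl")

# Map the format string to the manager class names documented above
manager_for_format = {
    "jsonl": "JSONLCheckpointManager",
    "hf-dataset": "HFDatasetCheckpointManager",
}
print(f"Using {manager_for_format[fmt]} in {checkpoint_dir}")
```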

In Pipeline

Checkpoint managers integrate with the Kura pipeline:
from kura.v1 import Pipeline
from kura.checkpoints import HFDatasetCheckpointManager

pipeline = Pipeline(
    checkpoint_manager=HFDatasetCheckpointManager(
        checkpoint_dir="./checkpoints",
        compression="zstd"  # Better compression
    )
)

Performance Comparison

Feature          JSONL                 HuggingFace Datasets
Compatibility    High                  Requires datasets package
Compression      None                  50%+ space savings
Loading speed    Fast for small data   Faster for large data
Memory usage     Loads all into RAM    Memory-mapped
Streaming        No                    Yes
Cloud storage    Manual                Built-in (Hub)
Querying         Manual filtering      Rich query support

Best Practices

Use JSONLCheckpointManager for:
  • Small to medium datasets (< 100K conversations)
  • Maximum compatibility
  • Simple local storage needs
Use HFDatasetCheckpointManager for:
  • Large datasets (> 100K conversations)
  • Memory-constrained environments
  • Cloud storage and versioning needs
  • Advanced filtering and querying
Enable compression when using HFDatasetCheckpointManager to reduce disk usage by 50% or more.
Use streaming mode for datasets that don’t fit in available RAM.

Migration

See the CLI documentation for migrating between checkpoint formats.