LangCache Spark Connector


A PySpark library for bulk-loading semantic cache data into LangCache. Pre-warm your LangCache instance with question-answer pairs using Spark's distributed processing power.

Overview

LangCache Spark Connector enables you to efficiently populate LangCache semantic caches at scale using PySpark. Whether you're running on Databricks, AWS EMR, or a local Spark cluster, this library provides a clean, Spark-native interface for bulk cache operations.

Key Benefits:

  • 🚀 Distributed Processing: Leverage Spark's parallel processing for large-scale cache population
  • 🔌 Simple Integration: Clean df.write.format("langcache-api") interface
  • 🏢 Production Ready: Built-in retry logic, rate limiting, and error handling
  • 🌐 Platform Agnostic: Works on Databricks, EMR, local Spark, or any Spark environment
  • 🔄 Zero-Downtime Deployments: Support for blue-green deployment strategies via langcache-router

Quick Start

Installation

Install from GitHub (PyPI release coming soon):

pip install git+https://github.com/redis-applied-ai/langcache-spark-connector.git

For Databricks, add to your cluster libraries or notebook:

%pip install git+https://github.com/redis-applied-ai/langcache-spark-connector.git

Basic Usage Example

Here's a complete example showing how to use the library in a PySpark job or Databricks notebook:

import os
from pyspark.sql import SparkSession
from langcache_spark.formats.langcache_api_format import add_langcache_api_format
from langcache_router import LangCacheRouter

# 1. Initialize Spark session
spark = SparkSession.builder \
    .appName("LangCache Bulk Load") \
    .getOrCreate()

# 2. Register the langcache-api format (one-time setup)
add_langcache_api_format()

# 3. Create or load your DataFrame with question-answer pairs
qa_df = spark.createDataFrame([
    ("What is Redis?", "Redis is an in-memory data store used as a database, cache, and message broker."),
    ("What is LangCache?", "LangCache is a semantic caching solution for LLM applications."),
    ("How does semantic caching work?", "Semantic caching stores responses based on meaning, not exact text matches."),
], ["question", "answer"])

# 4. Write to LangCache using the API format
qa_df.write \
    .format("langcache-api") \
    .option("api_url", os.getenv("LANGCACHE_API_URL")) \
    .option("cache_id", os.getenv("LANGCACHE_CACHE_ID")) \
    .option("api_key", os.getenv("LANGCACHE_API_KEY")) \
    .option("batch_size", "10") \
    .option("max_workers", "4") \
    .option("redis_uri", os.getenv("REDIS_URI")) \
    .option("rate_limit_per_sec", "500") \
    .save()

print("βœ… Successfully loaded data to LangCache!")

# 5. (Optional) Use the router to query the cache
router = LangCacheRouter(redis_uri=os.getenv("REDIS_URI"))
client = router.get_client()

# Query the cache
result = client.search(prompt="What is Redis?")
if result and result.data:
    print(f"Cache hit! Response: {result.data[0].response}")

Configuration

Set these environment variables before running:

export LANGCACHE_API_URL="https://your-langcache-instance.com"
export LANGCACHE_CACHE_ID="your-cache-id"
export LANGCACHE_API_KEY="your-api-key"
export REDIS_URI="redis://username:password@host:port"

For Databricks, use Databricks Secrets:

import os
os.environ["LANGCACHE_API_URL"] = dbutils.secrets.get(scope="langcache", key="api-url")
os.environ["LANGCACHE_CACHE_ID"] = dbutils.secrets.get(scope="langcache", key="cache-id")
os.environ["LANGCACHE_API_KEY"] = dbutils.secrets.get(scope="langcache", key="api-key")
os.environ["REDIS_URI"] = dbutils.secrets.get(scope="langcache", key="redis-uri")

Blue-Green Deployment Example

For production environments requiring zero-downtime deployments, use the blue-green deployment pattern with two LangCache instances:

import os
from pyspark.sql import SparkSession
from langcache import LangCache
from langcache_router import LangCacheRouter
from langcache_spark.formats.langcache_api_format import add_langcache_api_format

# 1. Initialize Spark session
spark = SparkSession.builder \
    .appName("LangCache Blue-Green Deployment") \
    .getOrCreate()

# 2. Register the langcache-api format
add_langcache_api_format()

# 3. Determine which instance is currently active
router = LangCacheRouter(redis_uri=os.getenv("REDIS_URI"))
current_config = router.get_active_config()
current_cache_id = current_config.cache_id

# Determine inactive instance (the one we'll load new data to)
if current_cache_id == os.getenv("LANGCACHE_BLUE_CACHE_ID"):
    # Blue is active, load to green
    inactive_url = os.getenv("LANGCACHE_GREEN_API_URL")
    inactive_cache_id = os.getenv("LANGCACHE_GREEN_CACHE_ID")
    inactive_api_key_var = "LANGCACHE_GREEN_API_KEY"
else:
    # Green is active, load to blue
    inactive_url = os.getenv("LANGCACHE_BLUE_API_URL")
    inactive_cache_id = os.getenv("LANGCACHE_BLUE_CACHE_ID")
    inactive_api_key_var = "LANGCACHE_BLUE_API_KEY"

print(f"βœ… Current active instance: {current_cache_id}")
print(f"πŸ“ Will load new data to: {inactive_cache_id}")

# 4. (Optional) Clear the inactive instance before loading
# Note: Delete by specific attributes, not empty dict
try:
    print(f"🧹 Clearing old data from inactive instance...")
    cleanup_client = LangCache(
        server_url=inactive_url,
        cache_id=inactive_cache_id,
        api_key=os.getenv(inactive_api_key_var)
    )
    # Example: Delete entries with old version attribute
    # cleanup_client.delete_query(attributes={"version": "old_version"})
    print("βœ… Inactive instance ready")
except Exception as e:
    print(f"⚠️  Warning: {e}")

# 5. Create or load your DataFrame with question-answer pairs
qa_df = spark.createDataFrame([
    ("What is Redis?", "Redis is an in-memory data store used as a database, cache, and message broker."),
    ("What is LangCache?", "LangCache is a semantic caching solution for LLM applications."),
    ("How does semantic caching work?", "Semantic caching stores responses based on meaning, not exact text matches."),
    ("What is blue-green deployment?", "Blue-green deployment is a strategy that reduces downtime by running two identical production environments."),
], ["question", "answer"])

# 6. Load data to the inactive instance using PySpark
print(f"πŸ“¦ Loading {qa_df.count()} Q&A pairs to inactive instance...")

qa_df.write \
    .format("langcache-api") \
    .option("api_url", inactive_url) \
    .option("cache_id", inactive_cache_id) \
    .option("api_key", os.getenv(inactive_api_key_var)) \
    .option("batch_size", "10") \
    .option("max_workers", "4") \
    .option("redis_uri", os.getenv("REDIS_URI")) \
    .option("rate_limit_per_sec", "500") \
    .save()

print("βœ… Data loaded to inactive instance successfully!")

# 7. Perform the blue-green switch
print(f"πŸ”„ Switching to newly loaded instance...")

router.set_active_instance(
    cache_url=inactive_url,
    cache_id=inactive_cache_id,
    cache_api_key_env_var=inactive_api_key_var
)

print("βœ… Blue-green switch completed!")
print(f"   New active instance: {inactive_cache_id}")
print(f"   Previous instance: {current_cache_id}")

# 8. Verify the switch by querying the now-active instance
print("\nπŸ” Verifying deployment...")

# Get a client automatically routed to the active instance
client = router.get_client()
print(f"   Client connected to: {client.cache_id}")

# Test query
test_question = "What is Redis?"
result = client.search(prompt=test_question)

if result and result.data:
    print(f"βœ… Cache hit! Response: {result.data[0].response[:100]}...")
    print("\nπŸŽ‰ Blue-green deployment completed successfully!")
else:
    print(f"⚠️  No cache hit yet (data may still be indexing)")

print("\n" + "=" * 80)
print(f"Active Instance: {inactive_cache_id}")
print(f"Previous Instance: {current_cache_id}")
print("=" * 80)

Environment Variables for Blue-Green Deployment:

# Redis for router state management
export REDIS_URI="redis://username:password@host:port"

# Blue instance configuration
export LANGCACHE_BLUE_API_URL="https://blue-instance.langcache.com"
export LANGCACHE_BLUE_CACHE_ID="blue-cache-id"
export LANGCACHE_BLUE_API_KEY="blue-api-key"

# Green instance configuration
export LANGCACHE_GREEN_API_URL="https://green-instance.langcache.com"
export LANGCACHE_GREEN_CACHE_ID="green-cache-id"
export LANGCACHE_GREEN_API_KEY="green-api-key"

Key Benefits of Blue-Green Deployment:

  • ✅ Zero Downtime: Users continue querying the active instance while new data loads to the inactive one
  • ✅ Instant Rollback: If issues arise, switch back to the previous instance immediately (see the rollback sketch after this list)
  • ✅ Safe Testing: Verify data in the inactive instance before making it active
  • ✅ Atomic Switch: The router ensures all clients switch to the new instance simultaneously
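
Rollback uses the same router call as the switch. A minimal sketch, reusing current_cache_id from the example above (previous_url and previous_api_key_var are stand-ins for values you would capture before switching):

# Re-activate the previously active instance
router.set_active_instance(
    cache_url=previous_url,
    cache_id=current_cache_id,
    cache_api_key_env_var=previous_api_key_var
)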

See the Advanced Usage Guide for more deployment strategies and options.

Features

  • 🔌 Spark-Native Interface: Use familiar df.write.format() syntax
  • ⚡ High Performance: Batched API calls with configurable parallelism
  • 🔄 Built-in Reliability: Automatic retries with exponential backoff (sketched below)
  • 🚦 Rate Limiting: Distributed rate limiting across Spark executors
  • 📦 Flexible Data Sources: Load from DataFrames, JSON files, or generate from PDFs
  • 🎯 Router Integration: Seamless integration with langcache-router for deployment management
  • 🌐 Platform Support: Works on Databricks, AWS EMR, local Spark, or any Spark cluster
  • 📊 Version Management: Automatic versioning and cleanup of old cache entries
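
A conceptual sketch of retries with exponential backoff and jitter, the pattern applied to each batch (an illustration of the idea, not the connector's internal code):

import random
import time

def send_with_retries(send_batch, batch, max_retries=3):
    # Attempt the batch; back off 1s, 2s, 4s, ... plus jitter between retries
    for attempt in range(max_retries + 1):
        try:
            return send_batch(batch)
        except Exception:
            if attempt == max_retries:
                raise
            time.sleep(2 ** attempt + random.random())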

Usage Patterns

Pattern 1: Load from Existing DataFrame

If you already have question-answer data in a DataFrame:

from langcache_spark.formats.langcache_api_format import add_langcache_api_format

add_langcache_api_format()

# Your existing DataFrame with any column names
your_df.write \
    .format("langcache-api") \
    .option("api_url", "https://your-langcache.com") \
    .option("cache_id", "your-cache-id") \
    .option("api_key", "your-api-key") \
    .save()

Pattern 2: Load from JSON Cache File

If you have pre-generated Q&A pairs in a JSON file:

from langcache_spark.utils.qa_cache import load_qa_pairs_from_cache, create_qa_dataframe_from_cache
from langcache_spark.formats.langcache_api_format import add_langcache_api_format

# Load Q&A pairs from JSON file
qa_pairs = load_qa_pairs_from_cache("cache/my_qa_pairs.json")
qa_df = create_qa_dataframe_from_cache(spark, qa_pairs)

# Write to LangCache
add_langcache_api_format()
qa_df.write \
    .format("langcache-api") \
    .option("api_url", api_url) \
    .option("cache_id", cache_id) \
    .option("api_key", api_key) \
    .save()

Pattern 3: Generate from Documents (Advanced)

For generating Q&A pairs from PDF documents, see the Advanced Usage Guide.
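
As a rough sketch of the flow (assuming the pypdf package; generate_question below is a hypothetical stand-in for your LLM call, not part of this library):

from pypdf import PdfReader

def pdf_to_chunks(path, chunk_chars=1000):
    # Concatenate page text and split it into fixed-size chunks
    text = " ".join(page.extract_text() or "" for page in PdfReader(path).pages)
    return [text[i:i + chunk_chars] for i in range(0, len(text), chunk_chars)]

def generate_question(chunk):
    # Hypothetical placeholder; swap in an LLM call that writes a real question
    return "What does this passage explain? " + chunk[:60]

chunks = pdf_to_chunks("docs/my_document.pdf")
qa_df = spark.createDataFrame(
    [(generate_question(c), c) for c in chunks], ["question", "answer"]
)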

Configuration Options

The langcache-api format supports the following options:

Option              Description                         Default                     Required
api_url             LangCache API endpoint URL          LANGCACHE_API_URL env var   Yes
cache_id            LangCache cache identifier          LANGCACHE_CACHE_ID env var  Yes
api_key             LangCache API key                   LANGCACHE_API_KEY env var   No
batch_size          Number of entries per batch         10                          No
max_workers         Concurrent API calls per executor   4                           No
max_retries         Maximum retry attempts              3                           No
redis_uri           Redis URI for rate limiting         REDIS_URI env var           No
rate_limit_per_sec  Global rate limit (requests/sec)    None                        No
attribute_version   Version tag for entries             Auto-generated timestamp    No

Example with All Options

qa_df.write \
    .format("langcache-api") \
    .option("api_url", "https://your-langcache.com") \
    .option("cache_id", "prod-cache") \
    .option("api_key", "your-api-key") \
    .option("batch_size", "20") \
    .option("max_workers", "8") \
    .option("max_retries", "5") \
    .option("redis_uri", "redis://localhost:6379") \
    .option("rate_limit_per_sec", "500") \
    .option("attribute_version", "20250109120000") \
    .save()
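
Pairing attribute_version with the client's delete_query call (used for cleanup in the blue-green example above) makes it easy to drop superseded entries after a reload. A sketch, assuming "20241201000000" was the previous load's version tag:

import os
from langcache import LangCache

client = LangCache(
    server_url=os.getenv("LANGCACHE_API_URL"),
    cache_id=os.getenv("LANGCACHE_CACHE_ID"),
    api_key=os.getenv("LANGCACHE_API_KEY")
)

# Remove entries tagged with the superseded version
client.delete_query(attributes={"version": "20241201000000"})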

Advanced Usage & Tools

This library includes additional tools and utilities for advanced use cases:

πŸ› οΈ Additional Tools

  • Deployment Job: Unified deployment tool supporting multiple strategies (single-instance, blue-green)
  • Benchmarking Tools: Query performance testing with live metrics and deployment switch detection
  • Question Rephrasing: Generate realistic test scenarios for semantic cache validation
  • PDF Processing: Extract and generate Q&A pairs from PDF documents
  • Direct Redis Format: High-performance direct Redis writes for bulk operations

See the Advanced Usage Guide for detailed documentation on these tools.

📓 Interactive Notebooks

Explore the library interactively with the Jupyter notebooks included in the repository.

💡 Examples

Check out the examples/ directory for complete code examples:

  • blue_green_deployment_example.py - Complete blue-green deployment workflow (recommended starting point)
  • langcache_api_format_example.py - Using the langcache-api format
  • router_example.py - Working with langcache-router
  • rate_limiting_example.py - Configuring distributed rate limiting
  • load_hf_datasets_to_redis.py - Loading HuggingFace datasets

Platform Support

Databricks

The library works seamlessly on Databricks. See the Databricks Deployment Guide for:

  • Installation instructions
  • Secrets management
  • DBFS integration
  • Complete example notebooks

AWS EMR

Works on AWS EMR with standard PySpark configuration. Install the package on your cluster and use as shown in the examples above.

Local Spark

Perfect for development and testing. Install locally and run with:

pip install git+https://github.com/redis-applied-ai/langcache-spark-connector.git
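
A minimal local session for end-to-end testing (a sketch; the optional rate-limiting options are omitted):

from pyspark.sql import SparkSession
from langcache_spark.formats.langcache_api_format import add_langcache_api_format

# Run on local cores; no cluster required
spark = SparkSession.builder \
    .appName("LangCache Local Test") \
    .master("local[*]") \
    .getOrCreate()

add_langcache_api_format()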

API Reference

LangCache API Format

The primary interface for writing data to LangCache:

from langcache_spark.formats.langcache_api_format import add_langcache_api_format

add_langcache_api_format()

df.write \
    .format("langcache-api") \
    .option("api_url", "...") \
    .option("cache_id", "...") \
    .save()

See API Format Documentation for complete details.

Router Integration

For deployment management and client routing:

from langcache_router import LangCacheRouter

router = LangCacheRouter(redis_uri="redis://localhost:6379")
client = router.get_client()
result = client.search(prompt="Your question")
if result and result.data:
    print(f"Response: {result.data[0].response}")

See Router Integration Guide for complete details.

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

This project is licensed under the MIT License - see the LICENSE file for details.
