
Document Processing Service

Overview

The Document Processing Service is the central orchestration service for the Luris document processing pipeline. It provides a single /process endpoint that manages the complete flow from document upload through entity extraction, chunking, embedding generation, GraphRAG processing, and database storage.

Port: 8000
API Prefix: /api/v1

Architecture

This service acts as an orchestrator, calling other microservices via HTTP APIs:

  1. Document Upload Service (8008) - Document upload and markdown conversion
  2. Entity Extraction Service (8007) - Multi-mode entity extraction
  3. Chunking Service (8009) - Contextual document chunking
  4. Prompt Service (8003) - Embedding generation
  5. GraphRAG Service (8010) - Knowledge graph creation
  6. Supabase (Direct) - Database storage via shared client
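
The orchestration itself is plain HTTP fan-out to the services above. The sketch below illustrates the idea with httpx; the downstream endpoint paths and payloads are assumptions for illustration, not the services' actual contracts.

# Illustrative only: the stage paths and payloads are assumptions.
import httpx

SERVICE_URLS = {
    "upload": "http://localhost:8008",
    "entity_extraction": "http://localhost:8007",
    "chunking": "http://localhost:8009",
    "embeddings": "http://localhost:8003",
    "graphrag": "http://localhost:8010",
}

async def call_stage(client: httpx.AsyncClient, base_url: str, path: str, payload: dict) -> dict:
    # Each pipeline stage is a POST to a downstream microservice.
    response = await client.post(f"{base_url}{path}", json=payload, timeout=120)
    response.raise_for_status()
    return response.json()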

Features

  • Single Entry Point: One /process endpoint for complete pipeline
  • Multi-Mode Processing: Support for regex, AI-enhanced, and hybrid extraction
  • Async/Sync Modes: Both synchronous and asynchronous processing
  • Job Management: Track, query, and cancel processing jobs
  • Quality Metrics: Comprehensive quality metrics and reporting
  • Error Recovery: Graceful handling of service failures
  • Progress Tracking: Real-time job progress monitoring

Installation

Prerequisites

  • Python 3.11+
  • Virtual environment
  • All dependent services running

Setup

# Create virtual environment
python3 -m venv venv
source venv/bin/activate

# Install dependencies
pip install -r requirements.txt

# Set environment variables
export SUPABASE_URL="your-supabase-url"
export SUPABASE_API_KEY="your-api-key"
export SUPABASE_SERVICE_KEY="your-service-key"

Running the Service

Development Mode

source venv/bin/activate
python run.py

Production Mode

source venv/bin/activate
export ENVIRONMENT=production
export DEBUG=false
python run.py

Using manage_services.py

source venv/bin/activate
python3 manage_services.py start document-processing

API Endpoints

Complete Pipeline Processing (NEW)

POST /api/v1/process/complete

Process a document through the complete integrated pipeline with a 95% accuracy target and quality gates.

Features:

  • Full pipeline orchestration with all microservices
  • Quality gates with 95% accuracy targeting
  • Enhanced entity extraction with hybrid mode
  • Anthropic Contextual Retrieval chunking
  • GraphRAG knowledge graph construction
  • Comprehensive quality metrics and validation

Request:

  • Form data with file upload (same parameters as /process)
  • Additional parameters:
    • target_accuracy: Accuracy target (default: 0.95)
    • enable_quality_gates: Enable quality validation (default: true)
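
Example (a sketch; the form fields follow the parameters above, plus the standard /process fields):

curl -X POST http://localhost:8000/api/v1/process/complete \
  -F "file=@document.pdf" \
  -F "client_id=client_123" \
  -F "target_accuracy=0.95" \
  -F "enable_quality_gates=true"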

Standard Processing

POST /api/v1/process

Process a document through the complete pipeline.

Request:

  • Form data with file upload
  • Parameters:
    • file: Document file (required)
    • client_id: Client identifier (required)
    • case_id: Case identifier (optional)
    • extraction_modes: Comma-separated modes (default: "regex,ai_enhanced,hybrid")
    • processing_mode: "synchronous" or "asynchronous" (default: "asynchronous")
    • enable_contextual_enhancement: Enable chunk enhancement (default: true)
    • chunk_size: Size of chunks (default: 800)
    • chunk_overlap: Overlap between chunks (default: 200)
    • enable_graphrag: Enable GraphRAG (default: true)
    • enable_embeddings: Generate embeddings (default: true)
    • min_confidence: Minimum entity confidence (default: 0.7)

Response:

{
  "job_id": "job_abc123",
  "status": "completed",
  "document_id": "doc_xyz789",
  "document_title": "Contract Agreement",
  "document_type": "contract",
  "entity_extraction_results": {
    "regex": {
      "entity_count": 45,
      "confidence_avg": 0.92,
      "processing_time_ms": 250
    },
    "ai_enhanced": {
      "entity_count": 52,
      "confidence_avg": 0.88,
      "processing_time_ms": 1500
    }
  },
  "quality_metrics": {
    "overall_accuracy": 0.95,
    "entity_confidence_avg": 0.87,
    "entity_count_total": 52,
    "chunk_count": 12,
    "embedding_coverage": 1.0,
    "graphrag_nodes": 48,
    "graphrag_edges": 156
  },
  "processing_time_ms": 8750
}

Check Job Status

GET /api/v1/status/{job_id}

Get the status of a processing job.

Response:

{
  "job_id": "job_abc123",
  "status": "processing",
  "progress": 0.45,
  "current_stage": "chunking",
  "message": "Processing document chunks..."
}

Cancel Job

POST /api/v1/cancel/{job_id}

Cancel a processing job.
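
Example (using the job_id returned by /process):

curl -X POST http://localhost:8000/api/v1/cancel/job_abc123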

List Jobs

GET /api/v1/jobs

List processing jobs with optional filtering.

Query Parameters:

  • client_id: Filter by client
  • status: Filter by status
  • limit: Maximum results (default: 100)
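
Example (filtering by client and status):

curl "http://localhost:8000/api/v1/jobs?client_id=client_123&status=processing&limit=10"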

Health Monitoring

The service provides standardized health check endpoints:

  • GET /api/v1/health - Basic health status
  • GET /api/v1/health/ping - Simple ping check for load balancers
  • GET /api/v1/health/ready - Readiness check with dependency verification
  • GET /api/v1/health/detailed - Comprehensive health information including metrics

Example:

# Check basic health
curl http://localhost:8000/api/v1/health

# Check readiness
curl http://localhost:8000/api/v1/health/ready

# Get detailed health info
curl http://localhost:8000/api/v1/health/detailed

Advanced Health Endpoints

GET /api/v1/health/dependencies

Detailed dependency status for all microservices in the processing pipeline.
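
Example:

curl http://localhost:8000/api/v1/health/dependencies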

Pipeline Stages

1. Upload Stage

  • Accepts document file
  • Converts to markdown format
  • Extracts basic metadata
  • Generates document ID

2. Entity Extraction Stage

  • Runs multiple extraction modes in parallel
  • Regex pattern matching
  • AI-enhanced extraction
  • Hybrid approach combining both
  • Calculates confidence scores

3. Chunking Stage

  • Splits document into manageable chunks
  • Applies contextual enhancement
  • Maintains chunk overlap between adjacent chunks (see the sketch after this list)
  • Preserves document structure
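
The Chunking Service owns the actual splitting and contextual enhancement; the sketch below only illustrates how the chunk_size and chunk_overlap parameters interact in a simple sliding window.

# Simplified illustration of chunk_size / chunk_overlap; not the Chunking Service's algorithm.
def sliding_window_chunks(text: str, chunk_size: int = 800, chunk_overlap: int = 200) -> list[str]:
    step = chunk_size - chunk_overlap  # each chunk starts 600 characters after the previous one
    return [text[start:start + chunk_size] for start in range(0, len(text), step)]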

4. Embedding Stage

  • Generates vector embeddings
  • Uses Prompt Service
  • 768-dimensional vectors
  • Batch processing for efficiency

5. GraphRAG Stage

  • Creates knowledge graph
  • Links entities and relationships
  • Builds document registry
  • Generates graph metrics

6. Database Storage Stage

  • Stores in graph schema tables
  • Document registry
  • Enhanced contextual chunks
  • Entity records
  • Relationship mappings

Configuration

Key environment variables:

# Service Configuration
SERVICE_PORT=8000
ENVIRONMENT=development
DEBUG=false

# Service URLs
DOCUMENT_UPLOAD_SERVICE_URL=http://localhost:8008
ENTITY_EXTRACTION_SERVICE_URL=http://localhost:8007
CHUNKING_SERVICE_URL=http://localhost:8009
GRAPHRAG_SERVICE_URL=http://localhost:8010
PROMPT_SERVICE_URL=http://localhost:8003

# Processing Configuration
MAX_FILE_SIZE_MB=100
OVERALL_TIMEOUT=600  # 10 minutes
MAX_CONCURRENT_JOBS=5

# Database Configuration
SUPABASE_URL=your-supabase-url
SUPABASE_API_KEY=your-api-key
SUPABASE_SERVICE_KEY=your-service-key
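
How the service reads these values internally is an implementation detail; the sketch below shows one conventional way to load them, assuming pydantic-settings, with field names that simply mirror the variables above.

# A minimal sketch, assuming pydantic-settings; field names mirror the env vars above.
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    service_port: int = 8000
    environment: str = "development"
    debug: bool = False
    document_upload_service_url: str = "http://localhost:8008"
    entity_extraction_service_url: str = "http://localhost:8007"
    chunking_service_url: str = "http://localhost:8009"
    graphrag_service_url: str = "http://localhost:8010"
    prompt_service_url: str = "http://localhost:8003"
    max_file_size_mb: int = 100
    overall_timeout: int = 600
    max_concurrent_jobs: int = 5
    supabase_url: str = ""
    supabase_api_key: str = ""
    supabase_service_key: str = ""

settings = Settings()  # matching environment variables override these defaults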

Error Handling

The service implements comprehensive error handling:

  • Service Failures: Gracefully handles unavailable services
  • Timeouts: Configurable timeouts for each stage
  • Retries: Automatic retry with exponential backoff (sketched below)
  • Partial Success: Continues pipeline even if optional stages fail
  • Error Reporting: Detailed error information in responses
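
The retry behavior can be pictured with a minimal sketch; the attempt count and delays below are assumptions, not the service's actual retry policy.

# Illustrative retry with exponential backoff; attempts and delays are assumptions.
import asyncio
import httpx

async def post_with_retries(client: httpx.AsyncClient, url: str, payload: dict,
                            attempts: int = 3, base_delay: float = 1.0) -> httpx.Response:
    for attempt in range(attempts):
        try:
            response = await client.post(url, json=payload)
            response.raise_for_status()
            return response
        except httpx.HTTPError:
            if attempt == attempts - 1:
                raise  # out of retries; surface the error to the pipeline
            await asyncio.sleep(base_delay * 2 ** attempt)  # 1s, 2s, 4s, ...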

Quality Metrics

The service calculates and reports quality metrics:

  • Overall Accuracy: Pipeline completion rate
  • Entity Confidence: Average confidence scores
  • Extraction Coverage: Entities found vs expected
  • Processing Completeness: Stages completed successfully
  • Performance Metrics: Processing time per stage

Monitoring

Metrics Endpoint

When enabled, Prometheus metrics are available at /metrics:

  • Request counts and latencies
  • Job processing metrics
  • Service dependency health
  • Error rates and types
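
Example (assuming metrics are enabled):

curl http://localhost:8000/metrics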

Logging

Comprehensive logging with:

  • Request/response tracking
  • Stage execution details
  • Error stack traces
  • Performance measurements

Development

Running Tests

source venv/bin/activate
pytest tests/ -v

Code Quality

# Linting
flake8 src/

# Type checking
mypy src/

# Format code
black src/
isort src/

Troubleshooting

Common Issues

  1. Service Dependencies Not Available

    • Ensure all required services are running (see the check script after this list)
    • Check service URLs in configuration
    • Verify network connectivity
  2. Timeout Errors

    • Increase timeout values for large documents
    • Check service performance
    • Consider using asynchronous mode
  3. Database Connection Issues

    • Verify Supabase credentials
    • Check network access to Supabase
    • Ensure proper schema permissions
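
A quick way to verify item 1 is to probe each dependency directly. The loop below assumes every dependency exposes the same /api/v1/health convention as this service, which may not hold for all of them; adjust the paths if needed.

# Assumes each dependency follows the /api/v1/health convention; adjust if not.
for port in 8008 8007 8009 8010 8003; do
  echo "Checking service on port $port"
  curl -s -o /dev/null -w "  HTTP %{http_code}\n" "http://localhost:$port/api/v1/health"
done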

Debug Mode

Enable debug mode for detailed logging:

export DEBUG=true
export LOG_LEVEL=DEBUG
python run.py

API Examples

Process Document (Synchronous)

curl -X POST http://localhost:8000/api/v1/process \
  -F "file=@document.pdf" \
  -F "client_id=client_123" \
  -F "processing_mode=synchronous" \
  -F "extraction_modes=regex,ai_enhanced,hybrid"

Process Document (Asynchronous)

# Start processing
response=$(curl -s -X POST http://localhost:8000/api/v1/process \
  -F "file=@document.pdf" \
  -F "client_id=client_123" \
  -F "processing_mode=asynchronous")

job_id=$(echo "$response" | jq -r '.job_id')

# Check status
curl http://localhost:8000/api/v1/status/$job_id

Python Client Example

import httpx
import asyncio

async def process_document():
    async with httpx.AsyncClient(timeout=60.0) as client:  # allow time for larger uploads
        # Upload and process
        with open("document.pdf", "rb") as f:
            response = await client.post(
                "http://localhost:8000/api/v1/process",
                files={"file": ("document.pdf", f)},
                data={
                    "client_id": "client_123",
                    "extraction_modes": "regex,ai_enhanced",
                    "processing_mode": "asynchronous"
                }
            )
        
        job = response.json()
        job_id = job["job_id"]
        
        # Poll for completion
        while True:
            status_response = await client.get(
                f"http://localhost:8000/api/v1/status/{job_id}"
            )
            status = status_response.json()
            
            if status["status"] in ["completed", "failed"]:
                return status
                
            await asyncio.sleep(2)

# Run
result = asyncio.run(process_document())
print(result)

License

Copyright (c) 2024 Luris. All rights reserved.
