Java Spring Boot service that processes files via gRPC, featuring an adaptive thread-pool with backpressure, Micrometer/Prometheus metrics, and example Grafana dashboards to visualize pool behavior (base state, load spike, resize, cooldown). Includes Docker Compose for local telemetry and configuration-driven resizing.

docflex/gRPC-File-Processor

File Processing Microservice

A scalable, concurrent, gRPC-based microservice for processing files with multiple operations such as validation, metadata extraction, OCR, compression, format conversion, image resizing, and storage.

Table of Contents

  1. Overview
  2. Architecture
  3. Key Components
  4. gRPC Services & Methods
  5. Grafana Dashboards
  6. Usage
  7. Metrics
  8. Limitations
  9. Future Scope
  10. License

Overview

This microservice is designed to:

  • Handle large volumes of files concurrently.
  • Allow multiple file operations in a scalable, thread-safe manner.
  • Provide unary, streaming, and bidirectional gRPC endpoints.
  • Enable runtime introspection using gRPC Server Reflection.

Architecture

+--------------------------+
|  gRPC Client / grpcurl   |
+-----------+--------------+
            |
            v
+--------------------------+
|  FileProcessingService   |  (GrpcService)
|  - processFile           |
|  - streamFileOperations  |
|  - uploadFiles           |
|  - liveFileProcessing    |
+-----------+--------------+
            |
            v
+--------------------------+
| ProcessFileService       |  (Business logic)
| - Converts proto request |
|   -> Internal model      |
| - Executes operations    |
| - Updates metrics        |
+-----------+--------------+
            |
            v
+--------------------------+
| WorkflowExecutorService  |  (Concurrency)
| - Manages FileWorkflow   |
| - Submits FileTasks to   |
|   ThreadPoolManager      |
+-----------+--------------+
            |
            v
+--------------------------+
| ThreadPoolManager        |  (Thread Pool)
| - Adaptive resizing      |
| - Backpressure handling  |
| - Task metrics tracking  |
+--------------------------+

Key Components

1. Models

| Model | Purpose |
|-------|---------|
| FileModel | Immutable file representation (name, content, type, size) |
| FileOperation | Encapsulates a single operation on a file with optional parameters |
| FileOperationResultModel | Captures status, timestamps, and result location of an operation |
| FileProcessingRequestModel | Internal request model supporting default and file-specific operations |
| FileProcessingSummaryModel | Summarizes results of a batch of file operations |
| FileTask | Encapsulates a file + operation + CompletableFuture result |
| FileWorkflow | Groups multiple FileTasks into a workflow with a unique ID |
| FileProcessingMetrics | Tracks active tasks and average duration |

2. Threading / Concurrency

  • ThreadPoolManager

    • Dynamic resizing based on queue size
    • Bounded queue for backpressure
    • Monitors active tasks and adjusts core/max threads
    • Uses CallerRunsPolicy for overload protection
  • WorkflowExecutorService

    • Converts a workflow of tasks into concurrent submissions
    • Waits for all CompletableFutures to complete
    • Aggregates results into a summary
    • Tracks failed tasks based on FileOperationResultModel.status(), not only exceptions
    • Works in streaming mode, pushing results to a consumer as soon as they complete
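
The ThreadPoolManager behavior listed above can be sketched roughly as follows. This is a minimal illustration, not the repository's implementation: the class name AdaptivePoolSketch, the targetCoreSize helper, and the reading of resize-threshold as a queue fill percentage are all assumptions made for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; the repository's ThreadPoolManager may be structured differently.
final class AdaptivePoolSketch {
    private final int coreSize;
    private final int maxSize;
    private final int queueCapacity;
    private final int resizeThresholdPercent; // assumed: queue fill percentage that triggers growth
    private final ThreadPoolExecutor executor;

    AdaptivePoolSketch(int coreSize, int maxSize, int queueCapacity, int resizeThresholdPercent) {
        this.coreSize = coreSize;
        this.maxSize = maxSize;
        this.queueCapacity = queueCapacity;
        this.resizeThresholdPercent = resizeThresholdPercent;
        // Bounded queue plus CallerRunsPolicy: once the queue and the pool are both full,
        // the submitting thread runs the task itself, which slows producers down.
        this.executor = new ThreadPoolExecutor(
                coreSize, maxSize, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    // Pure decision function: grow to maxSize when the queue is filled past the threshold,
    // otherwise fall back to the configured core size.
    static int targetCoreSize(int queued, int queueCapacity, int thresholdPercent,
                              int coreSize, int maxSize) {
        int fillPercent = queued * 100 / queueCapacity;
        return fillPercent >= thresholdPercent ? maxSize : coreSize;
    }

    // Intended to be called periodically by a monitor thread.
    int resizeIfNeeded() {
        int target = targetCoreSize(executor.getQueue().size(), queueCapacity,
                resizeThresholdPercent, coreSize, maxSize);
        if (target != executor.getCorePoolSize()) {
            executor.setCorePoolSize(target);
        }
        return target;
    }

    void submit(Runnable task) {
        executor.execute(task);
    }

    void shutdown() {
        executor.shutdown();
    }

    public static void main(String[] args) {
        AdaptivePoolSketch pool = new AdaptivePoolSketch(4, 16, 200, 65);
        System.out.println(pool.resizeIfNeeded()); // empty queue, so this prints 4
        pool.shutdown();
    }
}
```

The sketch adjusts the core size rather than relying on the maximum size alone because a ThreadPoolExecutor with a bounded queue only starts threads beyond the core size once the queue is completely full.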

3. Operations

Operations are now implemented in a utility class FileOperationsUtil:

  • validateFile(FileModel file) – validates file size
  • extractMetadata(FileModel file) – mock metadata extraction
  • compressFile(FileModel file) – mock compression
  • (Other operations like OCR, resize, format conversion, store can be implemented similarly)
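
As an illustration of what a size check like validateFile might look like, here is a minimal sketch. The FileModel record shape is inferred from the fields listed in the Models table, and the 10 MiB limit is a made-up value for the example; neither is taken from the repository's code.

```java
// Hypothetical sketch of a size-based validation, not the repository's FileOperationsUtil.
final class FileValidationSketch {
    // Assumed shape, based on the fields named in the Models table.
    record FileModel(String name, byte[] content, String type, long size) {}

    // Hypothetical limit; the real limit is not stated in this README.
    static final long MAX_SIZE_BYTES = 10L * 1024 * 1024;

    // Accepts files that have content and a declared size within (0, MAX_SIZE_BYTES].
    static boolean validateFile(FileModel file) {
        return file != null
                && file.content() != null
                && file.size() > 0
                && file.size() <= MAX_SIZE_BYTES;
    }

    public static void main(String[] args) {
        FileModel small = new FileModel("sample.txt", new byte[11], "txt", 11);
        System.out.println(validateFile(small)); // prints true
    }
}
```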

gRPC Services & Methods

| RPC Method | Type | Description |
|------------|------|-------------|
| ProcessFile | Unary | Process multiple files and return a summary. |
| StreamFileOperations | Server Streaming | Stream each FileOperationResult as it completes. |
| UploadFiles | Client Streaming | Upload multiple files as a stream and return a summary. |
| LiveFileProcessing | Bidirectional Streaming | Real-time streaming of file operations and results. |

Server Reflection is enabled, allowing grpcurl to inspect services without .proto files.


Grafana Dashboards

Below are suggested Grafana dashboard panels and example Prometheus queries for monitoring the thread pool and workflow metrics. The repository includes five dashboard screenshots in the public/ folder. They show the pool in the following states and can serve as a reference when building your own dashboard:

  1. Base State
  2. Load Spike Taking Place
  3. Prolonged Load
  4. Resize Mechanism Triggered for Pool Size
  5. After the Cooldown, pool resizes to core size limit

Notes / Assumptions

  • Metrics logged in the code use dot notation (for example fileprocessing.threadpool.active). When exported to Prometheus via Micrometer the metric names are expected to be converted to snake_case (e.g. fileprocessing_threadpool_active). The PromQL examples below use the underscore form — adjust if your exporter uses a different naming convention.
  • If you expose configuration values as metrics (recommended) you can reference them in alerts; otherwise use the literal values shown below.
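
The name mapping described in the first note can be approximated with a small helper. This is a simplified sketch, not Micrometer's actual naming convention: the class and method names are hypothetical, and the real exporter also appends type-specific suffixes (for example _total on counters), which is why a counter appears below as fileprocessing_threadpool_submitted_total.

```java
// Simplified approximation of the dot-to-underscore mapping; not Micrometer's implementation.
final class PrometheusNameSketch {
    // Replaces every character that is not a letter, digit, or underscore with an underscore.
    static String toPrometheusName(String meterName) {
        return meterName.replaceAll("[^a-zA-Z0-9_]", "_");
    }

    public static void main(String[] args) {
        // prints fileprocessing_threadpool_active
        System.out.println(toPrometheusName("fileprocessing.threadpool.active"));
    }
}
```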

Thread pool config (used for the dashboard behavior)

fileprocessing.threadpool.core-size=4
fileprocessing.threadpool.max-size=16
fileprocessing.threadpool.queue-capacity=200
fileprocessing.threadpool.resize-threshold=65
fileprocessing.threadpool.keep-alive-seconds=60
fileprocessing.threadpool.monitor-interval-seconds=1

Recommended Grafana panels and PromQL queries

  • Active Threads (Gauge / Time series)

    • Metric: fileprocessing_threadpool_active
    • Panel: Time series (line)
    • Color: Orange / Yellow
    • PromQL: fileprocessing_threadpool_active
    • Alert rule (example): fileprocessing_threadpool_active > fileprocessing_threadpool_size * 0.9 (fires when > 90% of current pool size)
  • Queue Size (Time series)

    • Metric: fileprocessing_threadpool_queue
    • Panel: Time series (line)
    • Color: Red when queue > threshold
    • PromQL: fileprocessing_threadpool_queue
    • Alert rule (example): fileprocessing_threadpool_queue > 0.8 * 200 (assumes queue capacity = 200; if you export capacity as a metric use fileprocessing_threadpool_queue_capacity instead)
  • Pool Size (Time series)

    • Metric: fileprocessing_threadpool_size
    • Panel: Time series (line)
    • PromQL: fileprocessing_threadpool_size
    • Notes: Overlay current pool size with fileprocessing_threadpool_largest (peak) to see growth
  • Largest Pool Size (Single Stat / Gauge)

    • Metric: fileprocessing_threadpool_largest
    • Panel: Stat / Gauge
    • PromQL: fileprocessing_threadpool_largest
    • Notes: Shows peak threads used since process start
  • Completed Tasks & Total Tasks Submitted (Time series)

    • Metrics: fileprocessing_threadpool_completed, fileprocessing_threadpool_submitted_total
    • Panel: Time series (multi-line)
    • PromQL: rate(fileprocessing_threadpool_submitted_total[1m]) and rate(fileprocessing_threadpool_completed[1m]) for throughput; use raw counters for totals
    • Throughput / Pending calculation: fileprocessing_threadpool_submitted_total - fileprocessing_threadpool_completed = pending_tasks

Example Prometheus alerts (suggested)

  1. High active threads relative to pool size

Expression:

fileprocessing_threadpool_active > fileprocessing_threadpool_size * 0.9

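To avoid alerting on momentary spikes, require the condition to hold for a duration (for example, a for: 5m clause in the alerting rule), or check that it held at every evaluation step in the window, e.g.:

min_over_time((fileprocessing_threadpool_active > bool fileprocessing_threadpool_size * 0.9)[5m:]) == 1

  2. Queue saturation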

Expression (literal capacity example):

fileprocessing_threadpool_queue > 0.8 * 200

If you export fileprocessing_threadpool_queue_capacity as a metric, prefer:

fileprocessing_threadpool_queue > 0.8 * fileprocessing_threadpool_queue_capacity

  3. Pending tasks steadily increasing (possible backpressure)

Expression (rate-based):

increase(fileprocessing_threadpool_submitted_total[5m]) - increase(fileprocessing_threadpool_completed[5m]) > 50

(Alerts if more than 50 tasks are accumulating in 5 minutes — tune per workload)

  4. Sudden spike in task duration (example if you export avg duration)

Expression (if metric exported as fileprocessing_threadpool_avg_task_duration_seconds):

fileprocessing_threadpool_avg_task_duration_seconds > 10

Dashboard design tips

  • Use a short (1s–5s) panel refresh rate for the thread pool dashboards when debugging, but increase to 10s–30s for normal operation to reduce load.
  • Add thresholds and colored regions to highlight warning and critical conditions.
  • Add a row showing current config values (core, max, queue capacity) as text or single stat panels — this helps correlate resizing events.
  • If possible, export config values as metrics (e.g., fileprocessing_threadpool_core_size) at service startup — this makes alerting and dashboards robust to config changes.
  • Correlate with application logs: include a log panel (Loki) or link to recent logs around resize events to debug why the pool resized.

How to include the images in the repo README

The images are already in public/. The markdown above references them relatively (e.g. ./public/1. Base State.png). If your markdown renderer cannot resolve spaces in filenames you can either:

  • Rename files to remove spaces (recommended), or
  • URL encode spaces (1.%20Base%20State.png) in the image path.

Usage

1. Run the server

mvn spring-boot:run
# or
java -jar target/fileprocessing-1.0-SNAPSHOT.jar

Server runs on localhost:9090 (default).

2. List services via grpcurl

grpcurl -plaintext localhost:9090 list
# Output:
# com.fileprocessing.FileProcessingService
# grpc.health.v1.Health
# grpc.reflection.v1alpha.ServerReflection

3. Describe methods

grpcurl -plaintext localhost:9090 describe com.fileprocessing.FileProcessingService

4. Invoke ProcessFile (unary)

grpcurl -plaintext -d '{
  "files": [
    {
      "fileId": "1",
      "fileName": "sample.txt",
      "content": "SGVsbG8gd29ybGQ=",
      "fileType": "txt",
      "sizeBytes": 11
    }
  ],
  "operations": ["VALIDATE","METADATA_EXTRACTION"]
}' localhost:9090 com.fileprocessing.FileProcessingService/ProcessFile

Streaming RPC Usage Example

The streaming RPC StreamFileOperations allows the client to receive results in real-time as each file operation completes.

grpcurl Command

grpcurl -plaintext \
  -d '{
        "files": [
          {
            "fileId": "file-001",
            "fileName": "example.pdf",
            "content": "VGhpcyBpcyBhIHRlc3QgZmlsZSBjb250ZW50Lg==",
            "fileType": "pdf",
            "sizeBytes": 1024
          },
          {
            "fileId": "file-002",
            "fileName": "test.pdf",
            "content": "VGhpcyBpcyBhbiBvdGhlciBmaWxlLg==",
            "fileType": "pdf",
            "sizeBytes": 2048
          }
        ],
        "operations": ["VALIDATE","METADATA_EXTRACTION","FILE_COMPRESSION"]
      }' \
  localhost:9090 com.fileprocessing.FileProcessingService/StreamFileOperations

Metrics

  • Metrics tracked in FileProcessingMetrics
  • Active tasks and average task duration are updated in real-time

Limitations

  • Operations like OCR, format conversion, image resizing are currently mock implementations.
  • No persistence: file storage is only simulated.
  • No authentication, authorization, or multi-tenant support.
  • Single node only; clustering not yet implemented.
  • Streaming endpoints are placeholders and need completion.

Future Scope

  • Implement actual OCR, compression, format conversion, resizing, and storage.
  • Persist results and metadata to database or object storage (S3, MinIO).
  • Add security (TLS, JWT, or mTLS) for gRPC endpoints.
  • Distributed workflows using Kafka/RabbitMQ for massive file batches.
  • Expose REST gateway for non-gRPC clients.
  • Add advanced metrics and monitoring dashboards (Prometheus + Grafana).
  • Support for hot-swappable operations and dynamic configuration.

Diagrams

Workflow Execution (Unary)

Client ---> gRPC Server ---> ProcessFileService ---> WorkflowExecutorService ---> ThreadPoolManager
   |                                                          |
   |<------------------- FileProcessingSummary --------------|

Thread Pool / Concurrency

1. Workflow Execution (Unary RPC)

Client ---> gRPC Server ---> ProcessFileService ---> WorkflowExecutorService ---> ThreadPoolManager
   |                       (Batch Mode)                     | 
   |                                                        v
   |                                               +----------------+
   |                                               | FileTask 1     |
   |                                               | FileTask 2     |
   |                                               | ...            |
   |                                               +----------------+
   |                                                       |
   |<------------------ FileProcessingSummaryModel --------|
   |   - totalFiles                                     
   |   - successfulFiles                                
   |   - failedFiles                                     
   |   - results (per task)

Notes:

  • Each FileTask is submitted to ThreadPoolManager.
  • WorkflowExecutorService waits for all tasks to complete.
  • Task-level failures are recorded in FileProcessingMetrics.
  • Summary includes both successful and failed tasks.
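
The batch flow in the notes above can be sketched with CompletableFuture. This is an illustrative outline under assumptions: the Task, Result, and Summary records and the runAll method are simplified stand-ins invented for the example, not the repository's FileTask, FileOperationResultModel, or FileProcessingSummaryModel.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical sketch of batch mode; types are simplified stand-ins.
final class BatchWorkflowSketch {
    enum Status { SUCCESS, FAILED }
    record Task(String fileId, Supplier<Status> operation) {}
    record Result(String fileId, Status status) {}
    record Summary(int totalTasks, int successfulTasks, int failedTasks, List<Result> results) {}

    static Summary runAll(List<Task> tasks, ExecutorService pool) {
        // Submit every task; an exception is converted into a FAILED result
        // so that one bad task cannot fail the whole workflow.
        List<CompletableFuture<Result>> futures = tasks.stream()
                .map(task -> CompletableFuture
                        .supplyAsync(() -> new Result(task.fileId(), task.operation().get()), pool)
                        .exceptionally(ex -> new Result(task.fileId(), Status.FAILED)))
                .toList();
        // Wait for all tasks, then count failures by result status rather than by exception.
        CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
        List<Result> results = futures.stream().map(CompletableFuture::join).toList();
        int failed = (int) results.stream().filter(r -> r.status() == Status.FAILED).count();
        return new Summary(results.size(), results.size() - failed, failed, results);
    }

    static Summary demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<Task> tasks = List.of(
                new Task("a", () -> Status.SUCCESS),
                new Task("b", () -> Status.FAILED),
                new Task("c", () -> { throw new IllegalStateException("simulated failure"); }));
        try {
            return runAll(tasks, pool);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Summary summary = demo();
        System.out.println(summary.successfulTasks() + " ok, " + summary.failedTasks() + " failed");
    }
}
```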

2. Workflow Execution (Streaming RPC)

Client ---> gRPC Server ---> ProcessFileService ---> WorkflowExecutorService ---> ThreadPoolManager
   |                      (Streaming Mode)                  |
   |                                                        v
   |                                               +----------------+
   |                                               | FileTask 1     |
   |                                               | FileTask 2     |
   |                                               | ...            |
   |                                               +----------------+
   |                                                        |
   |                                                        v
   |<--- FileOperationResult (task 1) ----------------------|
   |                                                        |
   |<--- FileOperationResult (task 2) ----------------------|
   |                                                        |
   |<--- FileOperationResult (failed task) -----------------|
   |                                                        |
   |<--- ... (as tasks complete) ---------------------------|
   |                                                        |
   |<--- Completion signal (observer.onCompleted) ----------|

Notes:

  • Each task is executed concurrently, and its result is pushed to the client via StreamObserver as soon as it completes.
  • Failed tasks are delivered as FileOperationResult with status = FAILED.
  • Metrics (activeTasks, completedTasks, failedTasks, taskSuccessRatePercent) are updated per task.
  • Stream continues even if some tasks fail (failure isolation).
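
A rough sketch of the streaming mode follows. To keep it runnable without gRPC on the classpath, a Consumer and a Runnable stand in for StreamObserver.onNext and onCompleted; the class, records, and streamAll method are hypothetical and not taken from the repository. Calls to the consumer are serialized with a lock because gRPC's StreamObserver is not thread-safe.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical sketch of streaming mode; onNext/onCompleted stand in for a StreamObserver.
final class StreamingWorkflowSketch {
    enum Status { SUCCESS, FAILED }
    record Task(String fileId, Supplier<Status> operation) {}
    record Result(String fileId, Status status) {}

    static void streamAll(List<Task> tasks, ExecutorService pool,
                          Consumer<Result> onNext, Runnable onCompleted) {
        Object lock = new Object();
        List<CompletableFuture<Void>> futures = tasks.stream()
                .map(task -> CompletableFuture
                        .supplyAsync(() -> new Result(task.fileId(), task.operation().get()), pool)
                        // Failure isolation: an exception becomes a FAILED result.
                        .exceptionally(ex -> new Result(task.fileId(), Status.FAILED))
                        // Push each result as soon as its task finishes, one at a time.
                        .thenAccept(result -> {
                            synchronized (lock) {
                                onNext.accept(result);
                            }
                        }))
                .toList();
        CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
        onCompleted.run(); // signalled only after every result has been delivered
    }

    static List<String> demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        List<Task> tasks = List.of(
                new Task("a", () -> Status.SUCCESS),
                new Task("b", () -> { throw new IllegalStateException("simulated failure"); }),
                new Task("c", () -> Status.SUCCESS));
        try {
            streamAll(tasks, pool,
                    r -> events.add(r.fileId() + ":" + r.status()),
                    () -> events.add("completed"));
        } finally {
            pool.shutdown();
        }
        return events;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The order of the per-task lines depends on which task finishes first; only the final completion event has a fixed position.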

3. Thread Pool / Concurrency with Metrics Tracking

ThreadPoolManager
+------------------------+
| Task Queue (bounded)   |<-- Tasks submitted by WorkflowExecutorService
| Active Threads         |
| Dynamic Resizing       |
+------------------------+
            |
            v
+------------------------+
| FileTask.run()         |
| - Execute operation    |
| - Update result        |
| - Update metrics:      |
|   - completedTasks     |
|   - failedTasks        |
|   - taskDuration       |
+------------------------+
            |
            v
WorkflowExecutorService collects:
  - Success / Failure per task
  - Aggregates metrics per workflow
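
The per-task metric updates in the diagram could be implemented along these lines. This is a sketch under assumptions: the class name and the track method are invented for the example, and completedTasks is taken to count successful runs only, which may not match how FileProcessingMetrics defines it.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical sketch; counter semantics are assumptions, not the repository's definitions.
final class TaskMetricsSketch {
    private final AtomicInteger activeTasks = new AtomicInteger();
    private final LongAdder completedTasks = new LongAdder(); // assumed: successful runs only
    private final LongAdder failedTasks = new LongAdder();
    private final LongAdder totalDurationNanos = new LongAdder();

    // Wraps one operation and updates the counters around it; safe to call from many threads.
    <T> T track(Callable<T> operation) throws Exception {
        activeTasks.incrementAndGet();
        long start = System.nanoTime();
        try {
            T result = operation.call();
            completedTasks.increment();
            return result;
        } catch (Exception e) {
            failedTasks.increment();
            throw e;
        } finally {
            totalDurationNanos.add(System.nanoTime() - start);
            activeTasks.decrementAndGet();
        }
    }

    int activeTasks() { return activeTasks.get(); }
    long completedTasks() { return completedTasks.sum(); }
    long failedTasks() { return failedTasks.sum(); }

    double taskSuccessRatePercent() {
        long finished = completedTasks.sum() + failedTasks.sum();
        return finished == 0 ? 100.0 : completedTasks.sum() * 100.0 / finished;
    }

    double averageDurationMillis() {
        long finished = completedTasks.sum() + failedTasks.sum();
        return finished == 0 ? 0.0 : totalDurationNanos.sum() / 1_000_000.0 / finished;
    }

    // Runs three successful tasks and one failing task against a fresh metrics object.
    static TaskMetricsSketch demo() {
        TaskMetricsSketch metrics = new TaskMetricsSketch();
        for (int i = 0; i < 4; i++) {
            final boolean fail = (i == 3);
            try {
                metrics.track(() -> {
                    if (fail) {
                        throw new IllegalStateException("simulated failure");
                    }
                    return "ok";
                });
            } catch (Exception expected) {
                // The failure is already counted in failedTasks.
            }
        }
        return metrics;
    }

    public static void main(String[] args) {
        System.out.println(demo().taskSuccessRatePercent()); // prints 75.0
    }
}
```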

Development Notes

  • Use Java 17, Spring Boot, gRPC, and Maven.
  • Proto files are under src/main/proto/.
  • Generate gRPC stubs using mvn protobuf:compile.
  • All models are immutable (record) for thread-safety.
  • ThreadPoolManager dynamically scales cores and max threads based on queue load.

Unary RPC Usage Example

The unary RPC ProcessFile allows sending a batch of files in a single request and receiving a summarized result.

Prerequisites

  • Make sure the gRPC server is running (default port: 9090).
  • grpcurl is installed. You can install it via:
brew install grpcurl   # macOS
sudo apt install grpcurl # Linux (Debian/Ubuntu)

Example Proto Request

Assume the proto message:

message FileProcessingRequest {
  repeated File files = 1;
  repeated OperationType operations = 2;
}

Example JSON payload for grpcurl:

{
  "files": [
    {
      "fileId": "file-001",
      "fileName": "example.pdf",
      "content": "VGhpcyBpcyBhIHRlc3QgZmlsZSBjb250ZW50Lg==",
      "fileType": "pdf",
      "sizeBytes": 1024
    }
  ],
  "operations": ["VALIDATE", "METADATA_EXTRACTION"]
}

grpcurl Command

grpcurl -plaintext \
  -d '{
        "files": [
          {
            "fileId": "file-001",
            "fileName": "example.pdf",
            "content": "VGhpcyBpcyBhIHRlc3QgZmlsZSBjb250ZW50Lg==",
            "fileType": "pdf",
            "sizeBytes": 1024
          }
        ],
        "operations": ["VALIDATE","METADATA_EXTRACTION"]
      }' \
  localhost:9090 com.fileprocessing.FileProcessingService/ProcessFile

Sample Response

{
  "totalFiles": 1,
  "successfulFiles": 1,
  "failedFiles": 0,
  "results": [
    {
      "fileId": "file-001",
      "operation": "VALIDATE",
      "status": "SUCCESS",
      "details": "File validated successfully",
      "startTime": "2025-09-28T20:00:00.000Z",
      "endTime": "2025-09-28T20:00:00.150Z",
      "resultLocation": ""
    },
    {
      "fileId": "file-001",
      "operation": "METADATA_EXTRACTION",
      "status": "SUCCESS",
      "details": "Metadata extracted",
      "startTime": "2025-09-28T20:00:00.150Z",
      "endTime": "2025-09-28T20:00:00.250Z",
      "resultLocation": ""
    }
  ]
}

Notes

  • All file contents must be Base64 encoded when sending JSON requests.
  • The unary RPC returns once all requested operations are completed.
  • Errors in processing will return an INTERNAL gRPC status with a descriptive message.
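
For the Base64 requirement, a payload's content value can be produced with the JDK's java.util.Base64. The sketch below assumes the proto content field is of type bytes, which the JSON mapping represents as a Base64 string; the class name is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper for preparing the "content" value of a grpcurl JSON payload.
final class ContentEncodingSketch {
    static String encode(byte[] raw) {
        return Base64.getEncoder().encodeToString(raw);
    }

    static byte[] decode(String encoded) {
        return Base64.getDecoder().decode(encoded);
    }

    public static void main(String[] args) {
        byte[] raw = "Hello world".getBytes(StandardCharsets.UTF_8);
        System.out.println(encode(raw)); // prints SGVsbG8gd29ybGQ=
        System.out.println(raw.length);  // prints 11
    }
}
```

The output matches the content and sizeBytes values used in the ProcessFile example in the Usage section.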

License

MIT License — free to use and extend.


To Do List

1. gRPC Services

  • Implement StreamFileOperations (server streaming)
  • Implement UploadFiles (client streaming)
  • Implement LiveFileProcessing (bidirectional streaming)
  • Add request validation for all RPCs (check file size, type, operation support)
  • Support gRPC error handling with meaningful status codes

2. File Operations / Business Logic

  • Implement actual OCR functionality for PDF and images
  • Implement image resizing logic for JPG, PNG, GIF
  • Implement compression logic (e.g., zip or image compression)
  • Implement format conversion (e.g., PNG → JPG, PDF → TXT)
  • Implement file storage logic (local FS or cloud like S3/MinIO)
  • Add metadata extraction logic (size, type, creation date, EXIF for images)
  • Add validation rules for file type, allowed operations, etc.

3. Concurrency / Workflow

  • Finalize WorkflowExecutorService integration for all gRPC endpoints
  • Add error handling for individual FileTasks without failing the whole workflow
  • Add timeout support for long-running tasks
  • Improve backpressure handling (queue thresholds, client notifications)

4. Metrics / Monitoring

  • Add per-operation metrics (e.g., processing duration per operation type)
  • Expose metrics via Prometheus or Spring Actuator (note: actuator endpoint conflict with gRPC)
  • Track success/failure rates per workflow

5. Spring / Configuration

  • Make all service properties configurable via application.properties (port, thread pool sizes, thresholds)
  • Enable hot reload for config changes without restarting server
  • Ensure beans are properly registered (ProcessFileService, StreamFileOperationsService, etc.)

6. Testing

  • Unit tests for all FileOperations
  • Integration tests for WorkflowExecutorService
  • gRPC end-to-end tests using grpc-java or grpcurl
  • Load testing for concurrent workflows
  • Failure scenario testing (invalid file, large file, network failure)

7. Documentation

  • Convert ASCII diagrams to PlantUML/Mermaid visuals for README
  • Document gRPC request/response examples
  • Document supported file types and operations
  • Add contributing guide if open-source

8. Future Enhancements / Optional

  • Add authentication/authorization for gRPC endpoints (JWT/mTLS)
  • Add persistent storage for workflow state
  • Add distributed workflow support (Kafka, RabbitMQ, or other queue)
  • Expose REST gateway via grpc-spring-boot-starter or Envoy proxy
  • Add real-time notifications for live file processing (WebSockets, gRPC streaming)

Milestones

[DELIVERED] Phase 1 — Core Unary File Processing (MVP)

Goal: Get the basic unary RPC (ProcessFile) fully functional with core operations.

Tasks:

  1. Implement all placeholder operations in ProcessFileService:
    • Validation
    • Metadata extraction
    • Compression
    • Format conversion
    • File storage
  2. Ensure ProcessFileService is properly registered as a Spring Bean.
  3. Complete the unary gRPC endpoint (processFile) in FileProcessingServiceImpl.
  4. Add basic unit tests for operations.
  5. Expose basic metrics (FileProcessingMetrics) for the unary flow.
  6. Update README with unary RPC usage examples (grpcurl commands).

Milestone Deliverable: ✅ Fully functional unary RPC processing files end-to-end with logs, metrics, and error handling.


[DELIVERED] Phase 2 — Concurrency & Workflow Management

Goal: Introduce WorkflowExecutorService for orchestrating tasks with thread pool management.

Tasks:

  1. Integrate WorkflowExecutorService with ProcessFileService for concurrent execution of tasks.
  2. Add ThreadPoolManager adaptive thread pool management.
  3. Track per-task metrics in FileProcessingMetrics.
  4. Implement error isolation so a failed task does not fail the entire workflow.
  5. Unit tests and integration tests for concurrency handling.

Milestone Deliverable: ✅ Concurrent file processing with metrics, adaptive threads, and task-level isolation.


[DELIVERED] Phase 3 — Streaming gRPC Endpoints

Goal: Implement non-unary RPCs for batch and real-time processing.

Tasks:

  1. Implement StreamFileOperations (server streaming).
  2. Implement UploadFiles (client streaming).
  3. Implement LiveFileProcessing (bidirectional streaming).
  4. Add proper backpressure handling for streaming (queue limits, slow consumers).
  5. Add tests for streaming scenarios.

Milestone Deliverable: ✅ Streaming endpoints functional with concurrency, backpressure, and error handling.


[DELIVERED] Phase 4 — Monitoring, Metrics, and Observability

Goal: Make the service observable and production-ready.

Tasks:

  1. Expose metrics via Spring Actuator or Prometheus.
  2. Track workflow success/failure rates, average task duration.
  3. Add logging with SLF4J/Logback and structured logging.
  4. Optional: gRPC reflection for debugging.

Milestone Deliverable: ✅ Production-grade observability with metrics and logs.


Phase 5 — Storage, Format, and OCR Enhancements

Goal: Implement full file-processing logic beyond placeholders.

Tasks:

  1. Implement OCR for PDFs and images.
  2. Implement image resizing and format conversion.
  3. Implement compression (zip, image optimization).
  4. Implement file storage (local, S3, MinIO, or pluggable storage).
  5. Metadata extraction (EXIF, file info, text content).

Milestone Deliverable: ✅ Fully-featured file-processing pipeline for supported file types.


Phase 6 — Configuration, Security, and Scaling

Goal: Make the service configurable, secure, and horizontally scalable.

Tasks:

  1. Make thread pool sizes, queue thresholds, and ports configurable via application.properties (hot-reloadable).
  2. Add authentication/authorization for gRPC (JWT/mTLS).
  3. Add persistent workflow storage (optional: Redis, DB, or queue for distributed processing).
  4. Add load testing and performance tuning.

Milestone Deliverable: ✅ Configurable, secure, and horizontally scalable file processing service.


Phase 7 — Optional Enhancements

Tasks:

  1. Add REST gateway for external clients.
  2. Add real-time notifications (WebSocket or gRPC streaming).
  3. Add distributed workflow orchestration (Kafka, RabbitMQ, etc.).
  4. Add more file types or operation plugins (extensible architecture).

Milestone Deliverable: ✅ Advanced production-ready features, extensible for future growth.

