feat(indexing): implemented a two-stage Kafka processing architecture with event aggregation #890

Merged
psmagin merged 11 commits into master from MSEARCH-1157
Feb 3, 2026

@psmagin psmagin commented Jan 31, 2026

Purpose

Address ordering and deduplication issues in instance indexing by implementing a two-stage Kafka processing architecture. This PR introduces a re-emission pattern where instance-level events are published to Kafka with instanceId as the key, ensuring all changes for the same instance are ordered within a single partition. The consumer then aggregates these events before querying the database, reducing database load and preventing race conditions.

Problem Statement

Previously, when multiple updates affected the same instance (e.g., holding changes, item updates), each update triggered an immediate database query and OpenSearch indexing operation. This caused:

  • Ordering issues: Updates could be processed out of order
  • Database load: Multiple redundant queries for the same instance
  • Race conditions: Concurrent updates could overwrite each other
  • Inefficient indexing: Multiple bulk requests for related changes

Solution Approach

Implemented a two-stage Kafka processing architecture with event aggregation:

Stage 1: Event Re-emission

  • handleInstanceEvents receives resource events (instance, holding, item, bound-with)
  • Extracts instanceId from each event
  • Publishes new IndexInstanceEvent to Kafka with instanceId as key
  • Result: All events for the same instance go to the same Kafka partition, maintaining order
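The ordering guarantee in Stage 1 rests on key-based partitioning: records with the same non-null key are routed to the same partition. The sketch below illustrates that property only. Kafka's default partitioner hashes the serialized key with murmur2; `Arrays.hashCode` is substituted here as a simple stand-in, and `KeyPartitioningSketch`, `partitionFor`, the partition count of 12, and the sample id are all hypothetical names and values, not part of this PR.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Illustrative stand-in for key-based partition selection; not Kafka's actual partitioner. */
final class KeyPartitioningSketch {

  /** Maps a record key to a partition: the same key always yields the same partition. */
  static int partitionFor(String key, int numPartitions) {
    byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
    // Kafka's default partitioner uses murmur2 here; Arrays.hashCode is a simple substitute.
    int hash = Arrays.hashCode(keyBytes);
    // Math.floorMod keeps the result in [0, numPartitions) even for negative hashes.
    return Math.floorMod(hash, numPartitions);
  }

  public static void main(String[] args) {
    String instanceId = "5b1eb450-ff9f-412d-a9e7-887f6eaeb5b4"; // hypothetical id
    // A holding update and an item update for the same instance are re-emitted with the same key.
    int fromHoldingEvent = partitionFor(instanceId, 12);
    int fromItemEvent = partitionFor(instanceId, 12);
    System.out.println(fromHoldingEvent == fromItemEvent); // prints true
  }
}
```

Because the key is the instanceId rather than the holding or item id, updates that originate from different record types still land in one partition and are consumed in order.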

Stage 2: Event Aggregation & Indexing

  • handleIndexInstanceEvents consumes batched events
  • Groups consecutive events by instanceId within each batch
  • Collapses multiple updates for the same instance into a single work item
  • Performs one SQL aggregation per distinct instanceId (instance + holding + item join)
  • Sends the aggregated documents to OpenSearch in a single bulk indexing request
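The collapsing step in Stage 2 can be sketched as follows. This is a minimal illustration, not the code from this PR: `ConsecutiveEventCollapser` and `collapse` are hypothetical names, and the nested record only mirrors the `IndexInstanceEvent(tenant, instanceId)` shape described in this PR.

```java
import java.util.ArrayList;
import java.util.List;

final class ConsecutiveEventCollapser {

  /** Hypothetical stand-in mirroring IndexInstanceEvent(tenant, instanceId). */
  record IndexInstanceEvent(String tenant, String instanceId) {}

  /** Collapses each run of consecutive, equal events into a single work item. */
  static List<IndexInstanceEvent> collapse(List<IndexInstanceEvent> batch) {
    List<IndexInstanceEvent> workItems = new ArrayList<>();
    for (IndexInstanceEvent event : batch) {
      // Record equality compares tenant and instanceId, so a run ends when either changes.
      boolean startsNewRun = workItems.isEmpty()
          || !workItems.get(workItems.size() - 1).equals(event);
      if (startsNewRun) {
        workItems.add(event);
      }
    }
    return workItems;
  }
}
```

For a batch `A, A, B, B, B, A` this yields `A, B, A`. Non-adjacent repeats stay separate in this sketch; a later step could still deduplicate by instanceId before querying the database.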

Architecture Changes

New Classes

  1. ProducerRecordBuilder
    • Responsibility: Build Kafka producer records with tenant header management
    • Benefit: Centralized header manipulation logic

  2. InstanceEventMapper
    • Responsibility: Map consumer records to producer records with consortium tenant resolution
    • Benefit: Encapsulates event mapping and instanceId extraction logic

Refactored Classes

  1. KafkaMessageListener
    • Simplified handleInstanceEvents from 30+ lines to 3 lines
    • Clear separation: re-emission vs. indexing logic

  2. ResourceService
    • Consolidated duplicate methods (indexInstancesById, indexInstancesByIdNew)
    • Single indexInstanceEvents method for cleaner API

  3. InstanceFetchService
    • Removed duplicate fetching methods
    • Shared fetchInstancesFromRepository

Changes Checklist

  • API Changes: Internal API changed from ResourceEvent to IndexInstanceEvent for instance indexing flow
  • Database Schema Changes: N/A
  • Interface Version Changes: N/A
  • Interface Dependencies: N/A
  • Permissions: N/A
  • Logging: Appropriate logging maintained in all refactored methods
  • Unit Testing: All new classes covered with comprehensive unit tests (18 new tests)
  • Integration Testing: Updated integration tests to verify end-to-end flow
  • Manual Testing: Build successful, all 52+ tests passing
  • NEWS: Updated with feature description

Related Issues

MSEARCH-1157

Technical Details

Event Flow

Inventory Update (holding/item/instance)
    ↓
handleInstanceEvents (Stage 1)
    ↓
Extract instanceId + Determine target tenant
    ↓
Publish IndexInstanceEvent to Kafka (key: instanceId)
    ↓
Kafka Partition (all events for same instanceId in order)
    ↓
handleIndexInstanceEvents (Stage 2)
    ↓
Group by instanceId + Aggregate consecutive events
    ↓
Fetch instance data (one SQL query per instanceId)
    ↓
Bulk index to OpenSearch (single request per batch)
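
The last three steps of the flow can be sketched as below, assuming the batch is reduced to distinct instanceIds before fetching, as the "one SQL query per instanceId" step suggests. `BatchIndexingSketch`, `processBatch`, `fetchDocument`, and `bulkIndex` are hypothetical names; the two function parameters stand in for the SQL aggregation and the OpenSearch bulk request, neither of which is shown here.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;

final class BatchIndexingSketch {

  /**
   * Processes one consumed batch: one fetch per distinct instanceId, then one bulk call.
   * fetchDocument and bulkIndex are hypothetical stand-ins for the SQL aggregation
   * and the OpenSearch bulk request.
   */
  static void processBatch(List<String> instanceIds,
                           Function<String, String> fetchDocument,
                           Consumer<List<String>> bulkIndex) {
    // LinkedHashSet drops duplicates while keeping first-seen order.
    Set<String> distinctIds = new LinkedHashSet<>(instanceIds);
    List<String> documents = new ArrayList<>();
    for (String id : distinctIds) {
      documents.add(fetchDocument.apply(id));
    }
    // Skip the bulk call entirely when the batch produced nothing to index.
    if (!documents.isEmpty()) {
      bulkIndex.accept(documents);
    }
  }

  public static void main(String[] args) {
    List<String> queries = new ArrayList<>();
    List<List<String>> bulkRequests = new ArrayList<>();
    processBatch(List.of("A", "A", "B", "A"),
        id -> { queries.add(id); return "doc-" + id; },
        bulkRequests::add);
    System.out.println(queries.size());      // prints 2 (four events, two fetches)
    System.out.println(bulkRequests.size()); // prints 1 (one bulk request for the batch)
  }
}
```

Counting the calls makes the saving visible: four events for two instances cost two fetches and one bulk request, where the previous per-event flow would have issued one query and one indexing operation per event.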

Kafka Message Format

  • Topic: {tenant}.search.index.instance
  • Key: instanceId (ensures partitioning)
  • Value: IndexInstanceEvent(tenant, instanceId)
  • Headers: Tenant information, Okapi headers
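
As a rough illustration of the format above, the sketch below assembles the topic, key, value, and a tenant header for one re-emitted message using plain Java types. It does not use the Kafka client or the `ProducerRecordBuilder` from this PR. `IndexInstanceMessageSketch`, `Message`, and `toMessage` are hypothetical names, and carrying the tenant in an `x-okapi-tenant` header is an assumption about which Okapi header is used.

```java
import java.util.Map;

final class IndexInstanceMessageSketch {

  /** Hypothetical stand-in mirroring IndexInstanceEvent(tenant, instanceId). */
  record IndexInstanceEvent(String tenant, String instanceId) {}

  /** Topic, key, value, and headers of one re-emitted message. */
  record Message(String topic, String key, IndexInstanceEvent value, Map<String, String> headers) {}

  static Message toMessage(String tenant, String instanceId) {
    // Topic pattern from the PR description: {tenant}.search.index.instance
    String topic = tenant + ".search.index.instance";
    // Assumption: the tenant travels in the x-okapi-tenant header.
    Map<String, String> headers = Map.of("x-okapi-tenant", tenant);
    // The key equals the instanceId, so all events for one instance share a partition.
    return new Message(topic, instanceId, new IndexInstanceEvent(tenant, instanceId), headers);
  }
}
```

The value carries only the tenant and the instanceId, which fits the Stage 2 design: the consumer fetches the current instance, holding, and item data from the database instead of trusting a payload that may already be stale.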

@psmagin psmagin requested a review from a team as a code owner January 31, 2026 15:53
@psmagin psmagin marked this pull request as draft January 31, 2026 16:57
@psmagin psmagin changed the title from "MSEARCH-1157: Refactor instance indexing to follow SOLID and DRY principles" to "feat(indexing): implemented a two-stage Kafka processing architecture with event aggregation" Feb 2, 2026
@psmagin psmagin marked this pull request as ready for review February 2, 2026 18:15
@psmagin psmagin requested a review from vgema February 2, 2026 18:16
@psmagin psmagin self-assigned this Feb 3, 2026
sonarqubecloud bot commented Feb 3, 2026

@psmagin psmagin merged commit b3322f3 into master Feb 3, 2026
16 checks passed
@psmagin psmagin deleted the MSEARCH-1157 branch February 3, 2026 11:04
3 participants