NEWS.md (2 additions, 0 deletions)
@@ -21,6 +21,7 @@
* Remove identifiers from keyword alias for instance and authority mappings ([MSEARCH-1118](https://folio-org.atlassian.net/browse/MSEARCH-1118))
* Change `all` query builder to use full term for multimatch search ([MSEARCH-1112](https://folio-org.atlassian.net/browse/MSEARCH-1112))
* **Authority Search**
+* Implement two-stage Kafka processing with event aggregation for instance indexing ([MSEARCH-1157](https://folio-org.atlassian.net/browse/MSEARCH-1157))
* Separate LCCN and Canceled LCCN identifiers search to lccn and canceledLccn options ([MSEARCH-1066](https://folio-org.atlassian.net/browse/MSEARCH-1066))
* **Classification Browse**
* Add title and contributors to classification browse response ([MSEARCH-1045](https://folio-org.atlassian.net/browse/MSEARCH-1045))
@@ -33,6 +34,7 @@
* Add indexes for instance_(call_number/subject/classification/contributor) ([MSEARCH-1025](https://folio-org.atlassian.net/browse/MSEARCH-1025))
* Omit sub-resource if main value is blank ([MSEARCH-1084](https://folio-org.atlassian.net/browse/MSEARCH-1084))
* Remove excessive escaping of backslash character in sub-resources ([MSEARCH-1094](https://folio-org.atlassian.net/browse/MSEARCH-1094))
+* Implement two-stage Kafka processing with event aggregation for instance indexing ([MSEARCH-1157](https://folio-org.atlassian.net/browse/MSEARCH-1157))
* **Instance Search**
* Add support for searching by instance/holdings/item electronic access relationship ID ([MSEARCH-816](https://folio-org.atlassian.net/browse/MSEARCH-816))
* Normalize ISSN search ([MSEARCH-658](https://folio-org.atlassian.net/browse/MSEARCH-658))
@@ -6,11 +6,13 @@
import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.folio.search.domain.dto.ResourceEvent;
+import org.folio.search.model.event.IndexInstanceEvent;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.BatchInterceptor;
import org.springframework.kafka.listener.CompositeBatchInterceptor;
import org.springframework.kafka.support.serializer.JsonDeserializer;
@@ -41,4 +43,22 @@ public ConcurrentKafkaListenerContainerFactory<String, ResourceEvent> instanceRe
    factory.setBatchInterceptor(new CompositeBatchInterceptor<>(batchInterceptors));
    return factory;
  }
+
+  @Bean
+  public ConcurrentKafkaListenerContainerFactory<String, IndexInstanceEvent> indexInstanceListenerContainerFactory(
+    @Value("#{folioKafkaProperties.listener['index-instance'].maxPollRecords}") Integer maxPollRecords,
+    @Value("#{folioKafkaProperties.listener['index-instance'].maxPollIntervalMs}") Integer maxPollIntervalMs) {
+    var factory = new ConcurrentKafkaListenerContainerFactory<String, IndexInstanceEvent>();
+    factory.setBatchListener(true);
+    var deserializer = new JsonDeserializer<>(IndexInstanceEvent.class, false);
+    var overrideProperties = Map.<String, Object>of(MAX_POLL_RECORDS_CONFIG, maxPollRecords,
+      MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
+    factory.setConsumerFactory(getConsumerFactory(deserializer, kafkaProperties, overrideProperties));
+    return factory;
+  }
+
+  @Bean
+  public KafkaTemplate<String, IndexInstanceEvent> indexInstanceKafkaTemplate() {
+    return new KafkaTemplate<>(getProducerFactory(kafkaProperties));
+  }
}
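The two overridden consumer properties set the batch geometry for the new listener: `max.poll.records` caps how many `IndexInstanceEvent`s arrive per listener invocation (and thus per bulk index operation), while `max.poll.interval.ms` is how long the consumer may spend on a batch before the broker deems it dead and rebalances the group. A small tuning sketch of that trade-off; the values and helper are hypothetical, not the module's defaults:

```java
// Hypothetical tuning sketch (not part of this PR): larger batches amortize
// bulk-indexing overhead, but the whole batch must finish before the next poll.
import static org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_RECORDS_CONFIG;

import java.util.Map;

class PollTuningSketch {
  static Map<String, Object> overrides(int batchSize, long perRecordMs, long safetyFactor) {
    return Map.<String, Object>of(
      MAX_POLL_RECORDS_CONFIG, batchSize,
      // leave headroom: indexing batchSize records must fit in the poll interval
      MAX_POLL_INTERVAL_MS_CONFIG, (int) (batchSize * perRecordMs * safetyFactor));
  }
}
```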
@@ -51,7 +51,8 @@ protected static <T> DefaultKafkaConsumerFactory<String, T> getConsumerFactory(J

public enum SearchTopic implements FolioKafkaTopic {
  REINDEX_RANGE_INDEX("search.reindex.range-index"),
-  INDEX_SUB_RESOURCE("search.index.sub-resource");
+  INDEX_SUB_RESOURCE("search.index.sub-resource"),
+  INDEX_INSTANCE("search.index.instance");

  private final String topicName;

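For context, `fullTopicName(tenant)` comes from `FolioKafkaTopic` in folio-spring-tools, which by FOLIO convention composes the physical topic name as `{ENV}.{tenant}.{logical name}`. A minimal sketch of that composition; the `folio` default for `ENV` and the tenant id are assumptions for illustration:

```java
// Hypothetical illustration of FOLIO topic naming (not part of this PR).
class TopicNameSketch {
  public static void main(String[] args) {
    var env = System.getenv().getOrDefault("ENV", "folio"); // assumed default
    var tenant = "diku"; // hypothetical tenant id
    System.out.println(env + "." + tenant + "." + "search.index.instance");
    // -> folio.diku.search.index.instance
  }
}
```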
@@ -0,0 +1,76 @@
package org.folio.search.integration.message;

import static org.folio.search.configuration.kafka.KafkaConfiguration.SearchTopic.INDEX_INSTANCE;
import static org.folio.search.utils.SearchConverterUtils.getEventPayload;
import static org.folio.search.utils.SearchConverterUtils.getNewAsMap;
import static org.folio.search.utils.SearchConverterUtils.getOldAsMap;
import static org.folio.search.utils.SearchUtils.ID_FIELD;
import static org.folio.search.utils.SearchUtils.INSTANCE_ID_FIELD;

import java.util.List;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.apache.commons.collections4.MapUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.folio.search.domain.dto.ResourceEvent;
import org.folio.search.model.event.IndexInstanceEvent;
import org.folio.search.model.types.ResourceType;
import org.folio.search.service.consortium.ConsortiumTenantService;
import org.springframework.stereotype.Component;

/**
 * Maps instance resource events to index instance events with producer records.
 */
@Component
@RequiredArgsConstructor
public class InstanceEventMapper {

  private final ConsortiumTenantService consortiumTenantService;

  /**
   * Maps a consumer record to producer records for indexing.
   *
   * @param event the consumer record containing a resource event
   * @return producer records ready to be sent to Kafka
   */
  public List<ProducerRecord<String, IndexInstanceEvent>> mapToProducerRecords(
    ConsumerRecord<String, ResourceEvent> event) {
    var resourceEvent = event.value();
    var eventTenant = resourceEvent.getTenant();
    var targetTenant = consortiumTenantService.getCentralTenant(eventTenant).orElse(eventTenant);
    if (isInstanceResource(resourceEvent)) {
      var instanceId = MapUtils.getString(getEventPayload(resourceEvent), ID_FIELD);
      return List.of(toProducerRecord(instanceId, targetTenant, event.headers()));
    } else {
      var oldInstanceId = getInstanceId(getOldAsMap(resourceEvent));
      var newInstanceId = getInstanceId(getNewAsMap(resourceEvent));
      if (oldInstanceId != null && !oldInstanceId.equals(newInstanceId)) {
        return List.of(toProducerRecord(oldInstanceId, targetTenant, event.headers()),
          toProducerRecord(newInstanceId, targetTenant, event.headers()));
      }
      return List.of(toProducerRecord(newInstanceId, targetTenant, event.headers()));
    }
  }

  private String getInstanceId(Map<String, Object> eventBody) {
    return MapUtils.getString(eventBody, INSTANCE_ID_FIELD);
  }

  private boolean isInstanceResource(ResourceEvent resourceEvent) {
    return ResourceType.byName(resourceEvent.getResourceName()).equals(ResourceType.INSTANCE);
  }

  private ProducerRecord<String, IndexInstanceEvent> toProducerRecord(String instanceId, String targetTenant,
                                                                      Headers headers) {
    var topic = getFullTopicName(targetTenant);
    var value = new IndexInstanceEvent(targetTenant, instanceId);

    return new ProducerRecordBuilder<>(topic, instanceId, value, headers).withUpdatedTenantHeaders(targetTenant);
  }

  private String getFullTopicName(String instanceId, String targetTenant) {
    return INDEX_INSTANCE.fullTopicName(targetTenant);
  }
}
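The `else` branch above is the subtle case: an update that moves a holdings or item record between instances must reindex both the old and the new parent instance (the second record originally re-used `oldInstanceId`; it is corrected to `newInstanceId` above, matching the evident intent of the branch). A self-contained sketch of that fan-out rule, with plain maps standing in for the event payloads and hypothetical ids:

```java
// Minimal sketch of the old/new instance fan-out (not part of this PR).
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Stream;

class FanOutSketch {
  public static void main(String[] args) {
    // a holding moved from instance-A to instance-B (hypothetical payloads)
    Map<String, Object> oldState = Map.of("instanceId", "instance-A", "id", "holding-1");
    Map<String, Object> newState = Map.of("instanceId", "instance-B", "id", "holding-1");
    var oldId = (String) oldState.get("instanceId");
    var newId = (String) newState.get("instanceId");
    // the move touches two instances, so both get an IndexInstanceEvent
    var affected = oldId != null && !oldId.equals(newId)
      ? List.of(oldId, newId)
      : Stream.of(newId).filter(Objects::nonNull).toList();
    affected.forEach(id -> System.out.println("reindex instance " + id));
  }
}
```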
@@ -1,21 +1,13 @@
package org.folio.search.integration.message;

-import static org.apache.commons.collections4.MapUtils.getString;
-import static org.apache.commons.lang3.RegExUtils.replaceAll;
import static org.folio.search.configuration.RetryTemplateConfiguration.KAFKA_RETRY_TEMPLATE_NAME;
import static org.folio.search.configuration.SearchCacheNames.REFERENCE_DATA_CACHE;
-import static org.folio.search.domain.dto.ResourceEventType.CREATE;
import static org.folio.search.domain.dto.ResourceEventType.DELETE;
-import static org.folio.search.domain.dto.ResourceEventType.REINDEX;
-import static org.folio.search.utils.SearchConverterUtils.getEventPayload;
import static org.folio.search.utils.SearchConverterUtils.getResourceEventId;
import static org.folio.search.utils.SearchConverterUtils.getResourceSource;
-import static org.folio.search.utils.SearchUtils.ID_FIELD;
-import static org.folio.search.utils.SearchUtils.INSTANCE_ID_FIELD;
import static org.folio.search.utils.SearchUtils.SOURCE_CONSORTIUM_PREFIX;

import java.util.List;
-import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -25,6 +17,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.logging.log4j.message.FormattedMessage;
import org.folio.search.domain.dto.ResourceEvent;
+import org.folio.search.model.event.IndexInstanceEvent;
import org.folio.search.model.types.ResourceType;
import org.folio.search.service.ResourceService;
import org.folio.search.service.config.ConfigSynchronizationService;
@@ -33,6 +26,7 @@
import org.folio.spring.service.SystemUserScopedExecutionService;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
@@ -47,6 +41,8 @@ public class KafkaMessageListener {
  private final FolioMessageBatchProcessor folioMessageBatchProcessor;
  private final SystemUserScopedExecutionService executionService;
  private final ConfigSynchronizationService configSynchronizationService;
+  private final KafkaTemplate<String, IndexInstanceEvent> instanceEventProducer;
+  private final InstanceEventMapper instanceEventMapper;

/**
* Handles instance events and indexes them by id.
@@ -61,11 +57,34 @@ public class KafkaMessageListener {
concurrency = "#{folioKafkaProperties.listener['events'].concurrency}")
public void handleInstanceEvents(List<ConsumerRecord<String, ResourceEvent>> consumerRecords) {
log.info("Processing instance related events from kafka events [number of events: {}]", consumerRecords.size());
var batch = getInstanceResourceEvents(consumerRecords);
var batchByTenant = batch.stream().collect(Collectors.groupingBy(ResourceEvent::getTenant));
consumerRecords.stream().collect(Collectors.groupingBy(consumerRecord -> consumerRecord.value().getTenant()))
.forEach((tenant, records) -> executionService.executeSystemUserScoped(tenant, () -> {
records.stream()
.map(instanceEventMapper::mapToProducerRecords)
.flatMap(List::stream)
.forEach(instanceEventProducer::send);
return null;
}));
}

/**
* Handles instance events and indexes them by id.
*
* @param consumerRecords - list of consumer records from Apache Kafka to process.
*/
@KafkaListener(
id = KafkaConstants.INDEX_INSTANCE_LISTENER_ID,
containerFactory = "indexInstanceListenerContainerFactory",
topicPattern = "#{folioKafkaProperties.listener['index-instance'].topicPattern}",
groupId = "#{folioKafkaProperties.listener['index-instance'].groupId}",
concurrency = "#{folioKafkaProperties.listener['index-instance'].concurrency}")
public void handleIndexInstanceEvents(List<ConsumerRecord<String, IndexInstanceEvent>> consumerRecords) {
log.info("Processing index instance events from kafka [number of events: {}]", consumerRecords.size());
var batchByTenant = consumerRecords.stream().map(ConsumerRecord::value)
.collect(Collectors.groupingBy(IndexInstanceEvent::tenant));
batchByTenant.forEach((tenant, resourceEvents) -> executionService.executeSystemUserScoped(tenant, () -> {
folioMessageBatchProcessor.consumeBatchWithFallback(resourceEvents, KAFKA_RETRY_TEMPLATE_NAME,
resourceService::indexInstancesById, KafkaMessageListener::logFailedEvent);
resourceService::indexInstanceEvents, KafkaMessageListener::logFailedEvent);
return null;
}));
}
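Stage 2's payoff is aggregation: because `IndexInstanceEvent` is a record with value equality, a burst of instance, holdings, and item events for the same instance can collapse into a single index operation. A minimal sketch of that dedup-and-group step, under the assumption that the actual collapsing lives downstream in `resourceService.indexInstanceEvents`, which this diff does not show:

```java
// Hypothetical illustration (not part of this PR): dedup and group one batch.
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

record IndexInstanceEvent(String tenant, String instanceId) { } // mirrors the PR's record

class AggregationSketch {
  static Map<String, List<String>> aggregate(List<IndexInstanceEvent> batch) {
    return batch.stream()
      .distinct() // record equality collapses duplicate (tenant, instanceId) pairs
      .collect(Collectors.groupingBy(IndexInstanceEvent::tenant,
        Collectors.mapping(IndexInstanceEvent::instanceId, Collectors.toList())));
  }
}
```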
@@ -163,38 +182,6 @@ private void indexResources(List<ResourceEvent> batch, Consumer<List<ResourceEve
    }));
  }

-  private static List<ResourceEvent> getInstanceResourceEvents(List<ConsumerRecord<String, ResourceEvent>> events) {
-    return events.stream()
-      .map(KafkaMessageListener::getInstanceResourceEvent)
-      .filter(Objects::nonNull)
-      .distinct()
-      .toList();
-  }
-
-  private static ResourceEvent getInstanceResourceEvent(ConsumerRecord<String, ResourceEvent> consumerRecord) {
-    var instanceId = getInstanceId(consumerRecord);
-    var value = consumerRecord.value();
-    if (instanceId == null) {
-      log.warn("Failed to find instance id in record [record: {}]", replaceAll(value.toString(), "\\s+", " "));
-      return null;
-    }
-    var operation = isInstanceResource(consumerRecord) ? value.getType() : CREATE;
-    return value.id(instanceId).type(operation);
-  }
-
-  private static String getInstanceId(ConsumerRecord<String, ResourceEvent> event) {
-    var body = event.value();
-    if (body.getType() == REINDEX) {
-      return event.key();
-    }
-    var eventPayload = getEventPayload(body);
-    return isInstanceResource(event) ? getString(eventPayload, ID_FIELD) : getString(eventPayload, INSTANCE_ID_FIELD);
-  }
-
-  private static boolean isInstanceResource(ConsumerRecord<String, ResourceEvent> consumerRecord) {
-    return consumerRecord.topic().endsWith("inventory.instance");
-  }
-
  private static void logFailedEvent(ResourceEvent event, Exception e) {
    if (event == null) {
      log.warn("Failed to index resource event [event: null]", e);
@@ -208,4 +195,16 @@ private static void logFailedEvent(ResourceEvent event, Exception e) {
      resourceName, eventType, event.getTenant(), event.getId()
    ), e);
  }
+
+  private static void logFailedEvent(IndexInstanceEvent event, Exception e) {
+    if (event == null) {
+      log.warn("Failed to index resource event [event: null]", e);
+      return;
+    }
+
+    log.warn(new FormattedMessage(
+      "Failed to index instance event [tenantId: {}, id: {}]",
+      event.tenant(), event.instanceId()
+    ), e);
+  }
}
@@ -0,0 +1,56 @@
package org.folio.search.integration.message;

import static org.folio.spring.tools.kafka.FolioKafkaProperties.TENANT_ID;

import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.folio.spring.integration.XOkapiHeaders;

/**
 * Builder for creating Kafka producer records with proper header management.
 */
public class ProducerRecordBuilder<K, V> {

  private final String topic;
  private final K key;
  private final V value;
  private final Headers headers;

  public ProducerRecordBuilder(String topic, K key, V value, Headers headers) {
    this.topic = topic;
    this.key = key;
    this.value = value;
    this.headers = headers;
  }

  /**
   * Updates tenant-related headers and builds the producer record.
   *
   * @param targetTenant the target tenant ID
   * @return a new ProducerRecord with updated headers
   */
  public ProducerRecord<K, V> withUpdatedTenantHeaders(String targetTenant) {
    var targetTenantBytes = targetTenant.getBytes(StandardCharsets.UTF_8);

    var producerRecord = new ProducerRecord<>(topic, key, value);
    copyHeaders(headers, producerRecord.headers(), targetTenantBytes);

    return producerRecord;
  }

  private void copyHeaders(Headers source, Headers destination, byte[] targetTenantBytes) {
    source.forEach(header -> {
      var headerKey = header.key();
      if (isTenantHeader(headerKey)) {
        destination.add(headerKey, targetTenantBytes);
      } else {
        destination.add(headerKey, header.value());
      }
    });
  }

  private boolean isTenantHeader(String key) {
    return TENANT_ID.equals(key) || XOkapiHeaders.TENANT.equals(key);
  }
}
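A usage sketch with hypothetical values: headers from the consumed inventory record are copied onto the outgoing record, and tenant headers are rewritten so a member tenant's event lands under the central tenant. Only `x-okapi-tenant` (the value of `XOkapiHeaders.TENANT`) is shown; the topic name and ids are assumptions:

```java
// Hypothetical usage (not part of this PR).
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.header.internals.RecordHeaders;

class ProducerRecordBuilderUsage {
  public static void main(String[] args) {
    var headers = new RecordHeaders();
    headers.add("x-okapi-tenant", "member_a".getBytes(StandardCharsets.UTF_8));
    headers.add("x-okapi-url", "http://okapi:9130".getBytes(StandardCharsets.UTF_8));

    var producerRecord = new ProducerRecordBuilder<>(
        "folio.central.search.index.instance",  // assumed full topic name
        "inst-1", new IndexInstanceEvent("central", "inst-1"), headers)
        .withUpdatedTenantHeaders("central");
    // tenant headers now carry "central"; x-okapi-url is copied through unchanged
  }
}
```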
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package org.folio.search.model.event;

public record IndexInstanceEvent(String tenant, String instanceId) { }
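Since the event is a plain record carried through spring-kafka's JSON (de)serializers, its wire format between stage 1 and stage 2 is a two-field JSON object. A quick round-trip sketch, assuming Jackson 2.12+ (which handles records natively); the values are hypothetical:

```java
// Hypothetical wire-format check (not part of this PR).
import com.fasterxml.jackson.databind.ObjectMapper;

class WireFormatSketch {
  public static void main(String[] args) throws Exception {
    var mapper = new ObjectMapper();
    var json = mapper.writeValueAsString(new IndexInstanceEvent("central", "inst-1"));
    System.out.println(json); // {"tenant":"central","instanceId":"inst-1"}
    var event = mapper.readValue(json, IndexInstanceEvent.class);
    System.out.println(event); // IndexInstanceEvent[tenant=central, instanceId=inst-1]
  }
}
```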