From 1495803e5fbc5fa3ee6576434e3987417d98596b Mon Sep 17 00:00:00 2001 From: nishchay21 Date: Sun, 29 Jun 2025 10:59:38 +0530 Subject: [PATCH 1/7] Added shard operations collector and optimised node stats collector Signed-off-by: nishchay21 --- .../PerformanceAnalyzerPlugin.java | 4 + .../NodeStatsAllShardsMetricsCollector.java | 23 +-- ...RTFNodeStatsAllShardsMetricsCollector.java | 71 ++++--- .../telemetry/RTFShardOperationCollector.java | 176 ++++++++++++++++++ .../PerformanceAnalyzerClusterSettings.java | 2 +- .../RTFPerformanceAnalyzerSearchListener.java | 70 ++++--- ...TFPerformanceAnalyzerTransportChannel.java | 64 +++++-- ...rmanceAnalyzerTransportRequestHandler.java | 40 +++- .../performanceanalyzer/util/Utils.java | 8 + .../RTFShardOperationCollectorTests.java | 122 ++++++++++++ ...erformanceAnalyzerSearchListenerTests.java | 21 ++- ...formanceAnalyzerTransportChannelTests.java | 59 +++++- 12 files changed, 556 insertions(+), 104 deletions(-) create mode 100644 src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFShardOperationCollector.java create mode 100644 src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFShardOperationCollectorTests.java diff --git a/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java b/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java index 705c6d1d..d877184f 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java +++ b/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java @@ -58,6 +58,7 @@ import org.opensearch.performanceanalyzer.collectors.telemetry.RTFDisksCollector; import org.opensearch.performanceanalyzer.collectors.telemetry.RTFHeapMetricsCollector; import org.opensearch.performanceanalyzer.collectors.telemetry.RTFNodeStatsAllShardsMetricsCollector; +import org.opensearch.performanceanalyzer.collectors.telemetry.RTFShardOperationCollector; import 
org.opensearch.performanceanalyzer.collectors.telemetry.RTFThreadPoolMetricsCollector; import org.opensearch.performanceanalyzer.commons.OSMetricsGeneratorFactory; import org.opensearch.performanceanalyzer.commons.collectors.DisksCollector; @@ -239,6 +240,9 @@ private void scheduleTelemetryCollectors() { scheduledMetricCollectorsExecutor.addScheduledMetricCollector( new RTFCacheConfigMetricsCollector( performanceAnalyzerController, configOverridesWrapper)); + scheduledMetricCollectorsExecutor.addScheduledMetricCollector( + new RTFShardOperationCollector( + performanceAnalyzerController, configOverridesWrapper)); } private void scheduleRcaCollectors() { diff --git a/src/main/java/org/opensearch/performanceanalyzer/collectors/NodeStatsAllShardsMetricsCollector.java b/src/main/java/org/opensearch/performanceanalyzer/collectors/NodeStatsAllShardsMetricsCollector.java index 72c10220..c0153b56 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/collectors/NodeStatsAllShardsMetricsCollector.java +++ b/src/main/java/org/opensearch/performanceanalyzer/collectors/NodeStatsAllShardsMetricsCollector.java @@ -13,6 +13,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableMap; import java.lang.reflect.Field; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import org.apache.logging.log4j.LogManager; @@ -57,8 +58,6 @@ public class NodeStatsAllShardsMetricsCollector extends PerformanceAnalyzerMetri private static final int KEYS_PATH_LENGTH = 2; private static final Logger LOG = LogManager.getLogger(NodeStatsAllShardsMetricsCollector.class); - private HashMap currentShards; - private HashMap currentPerShardStats; private HashMap prevPerShardStats; private final PerformanceAnalyzerController controller; @@ -68,21 +67,10 @@ public NodeStatsAllShardsMetricsCollector(final PerformanceAnalyzerController co "NodeStatsMetrics", NODE_STATS_ALL_SHARDS_METRICS_COLLECTOR_EXECUTION_TIME, 
NODESTATS_COLLECTION_ERROR); - currentShards = new HashMap<>(); prevPerShardStats = new HashMap<>(); - currentPerShardStats = new HashMap<>(); this.controller = controller; } - private void populateCurrentShards() { - if (!currentShards.isEmpty()) { - prevPerShardStats.putAll(currentPerShardStats); - currentPerShardStats.clear(); - } - currentShards.clear(); - currentShards = Utils.getShards(); - } - private static final Map maps = new HashMap() { { @@ -152,8 +140,8 @@ public void collectMetrics(long startTime) { if (indicesService == null) { return; } - populateCurrentShards(); - populatePerShardStats(indicesService); + + Map currentPerShardStats = populatePerShardStats(indicesService); for (HashMap.Entry currentShard : currentPerShardStats.entrySet()) { ShardId shardId = (ShardId) currentShard.getKey(); @@ -188,8 +176,10 @@ Field getNodeIndicesStatsByShardField() throws Exception { return field; } - public void populatePerShardStats(IndicesService indicesService) { + public Map populatePerShardStats(IndicesService indicesService) { // Populate the shard stats per shard. 
+ HashMap currentShards = Utils.getShards(); + Map currentPerShardStats = new HashMap<>(Collections.emptyMap()); for (HashMap.Entry currentShard : currentShards.entrySet()) { IndexShard currentIndexShard = (IndexShard) currentShard.getValue(); IndexShardStats currentIndexShardStats = @@ -204,6 +194,7 @@ public void populatePerShardStats(IndicesService indicesService) { currentPerShardStats.put(currentIndexShardStats.getShardId(), shardStats); } } + return currentPerShardStats; } public void populateMetricValue( diff --git a/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFNodeStatsAllShardsMetricsCollector.java b/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFNodeStatsAllShardsMetricsCollector.java index 9bd21757..54fddf49 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFNodeStatsAllShardsMetricsCollector.java +++ b/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFNodeStatsAllShardsMetricsCollector.java @@ -11,6 +11,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableMap; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import org.apache.logging.log4j.LogManager; @@ -42,8 +43,6 @@ public class RTFNodeStatsAllShardsMetricsCollector extends PerformanceAnalyzerMe .samplingInterval; private static final Logger LOG = LogManager.getLogger(RTFNodeStatsAllShardsMetricsCollector.class); - private Map currentShards; - private Map currentPerShardStats; private Map prevPerShardStats; private MetricsRegistry metricsRegistry; private Counter cacheQueryHitMetrics; @@ -67,23 +66,12 @@ public RTFNodeStatsAllShardsMetricsCollector( "RTFNodeStatsMetricsCollector", RTF_NODE_STATS_ALL_SHARDS_METRICS_COLLECTOR_EXECUTION_TIME, RTF_NODESTATS_COLLECTION_ERROR); - currentShards = new HashMap<>(); prevPerShardStats = new HashMap<>(); - 
currentPerShardStats = new HashMap<>(); this.metricsInitialised = false; this.performanceAnalyzerController = performanceAnalyzerController; this.configOverridesWrapper = configOverridesWrapper; } - private void populateCurrentShards() { - if (!currentShards.isEmpty()) { - prevPerShardStats.putAll(currentPerShardStats); - currentPerShardStats.clear(); - } - currentShards.clear(); - currentShards = Utils.getShards(); - } - private static final ImmutableMap valueCalculators = ImmutableMap.of( RTFMetrics.ShardStatsValue.INDEXING_THROTTLE_TIME.toString(), @@ -133,38 +121,40 @@ configOverridesWrapper, getCollectorName())) { LOG.debug("Executing collect metrics for RTFNodeStatsAllShardsMetricsCollector"); initialiseMetricsIfNeeded(); - populateCurrentShards(); - populatePerShardStats(indicesService); + Map currentPerShardStats = populatePerShardStats(indicesService); - for (Map.Entry currentShard : currentPerShardStats.entrySet()) { - ShardId shardId = (ShardId) currentShard.getKey(); - ShardStats currentShardStats = (ShardStats) currentShard.getValue(); - if (prevPerShardStats.size() == 0) { + for (Map.Entry currentShard : currentPerShardStats.entrySet()) { + ShardId shardId = currentShard.getKey(); + ShardStats currentShardStats = currentShard.getValue(); + if (prevPerShardStats.isEmpty()) { // Populating value for the first run. recordMetrics( new NodeStatsMetricsAllShardsPerCollectionStatus(currentShardStats), shardId); continue; } - ShardStats prevShardStats = prevPerShardStats.get(shardId); - if (prevShardStats == null) { - // Populate value for shards which are new and were not present in the previous - // run. - recordMetrics( - new NodeStatsMetricsAllShardsPerCollectionStatus(currentShardStats), - shardId); - continue; + if (prevPerShardStats.containsKey(shardId)) { + ShardStats prevShardStats = prevPerShardStats.get(shardId); + if (prevShardStats == null) { + // Populate value for shards which are new and were not present in the previous + // run. 
+ recordMetrics( + new NodeStatsMetricsAllShardsPerCollectionStatus(currentShardStats), + shardId); + continue; + } + NodeStatsMetricsAllShardsPerCollectionStatus prevValue = + new NodeStatsMetricsAllShardsPerCollectionStatus(prevShardStats); + NodeStatsMetricsAllShardsPerCollectionStatus currValue = + new NodeStatsMetricsAllShardsPerCollectionStatus(currentShardStats); + populateDiffMetricValue(prevValue, currValue, shardId); } - NodeStatsMetricsAllShardsPerCollectionStatus prevValue = - new NodeStatsMetricsAllShardsPerCollectionStatus(prevShardStats); - NodeStatsMetricsAllShardsPerCollectionStatus currValue = - new NodeStatsMetricsAllShardsPerCollectionStatus(currentShardStats); - populateDiffMetricValue(prevValue, currValue, shardId); } + prevPerShardStats = currentPerShardStats; } private void initialiseMetricsIfNeeded() { - if (metricsInitialised == false) { + if (!metricsInitialised) { cacheQueryHitMetrics = metricsRegistry.createCounter( RTFMetrics.ShardStatsValue.Constants.QUEY_CACHE_HIT_COUNT_VALUE, @@ -222,10 +212,12 @@ private void initialiseMetricsIfNeeded() { } } - public void populatePerShardStats(IndicesService indicesService) { + public Map populatePerShardStats(IndicesService indicesService) { // Populate the shard stats per shard. 
- for (Map.Entry currentShard : currentShards.entrySet()) { - IndexShard currentIndexShard = (IndexShard) currentShard.getValue(); + Map currentShards = Utils.getShards(); + Map currentPerShardStats = new HashMap<>(Collections.emptyMap()); + for (Map.Entry currentShard : currentShards.entrySet()) { + IndexShard currentIndexShard = currentShard.getValue(); IndexShardStats currentIndexShardStats = Utils.indexShardStats( indicesService, @@ -234,10 +226,13 @@ public void populatePerShardStats(IndicesService indicesService) { CommonStatsFlags.Flag.QueryCache, CommonStatsFlags.Flag.FieldData, CommonStatsFlags.Flag.RequestCache)); - for (ShardStats shardStats : currentIndexShardStats.getShards()) { - currentPerShardStats.put(currentIndexShardStats.getShardId(), shardStats); + if (currentIndexShardStats != null) { + for (ShardStats shardStats : currentIndexShardStats.getShards()) { + currentPerShardStats.put(currentIndexShardStats.getShardId(), shardStats); + } } } + return currentPerShardStats; } private void recordMetrics( diff --git a/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFShardOperationCollector.java b/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFShardOperationCollector.java new file mode 100644 index 00000000..b02a53dc --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFShardOperationCollector.java @@ -0,0 +1,176 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.collectors.telemetry; + +import java.util.HashMap; +import java.util.Map; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.performanceanalyzer.OpenSearchResources; +import org.opensearch.performanceanalyzer.commons.collectors.PerformanceAnalyzerMetricsCollector; +import 
org.opensearch.performanceanalyzer.commons.collectors.TelemetryCollector; +import org.opensearch.performanceanalyzer.commons.config.overrides.ConfigOverridesWrapper; +import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration; +import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics; +import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics.MetricUnits; +import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode; +import org.opensearch.performanceanalyzer.commons.stats.metrics.StatMetrics; +import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; +import org.opensearch.performanceanalyzer.util.Utils; +import org.opensearch.telemetry.metrics.Counter; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.metrics.tags.Tags; + +/** + * This collector measures indexing and search rate per shard. The metric measurement is difference + * between current and last window's operation. For example - if the last window had operation count + * as 10, and now it changed to 12, then collector will publish 2 ops/interval. 
+ */ +public class RTFShardOperationCollector extends PerformanceAnalyzerMetricsCollector + implements TelemetryCollector { + + private static final Logger LOG = LogManager.getLogger(RTFShardOperationCollector.class); + public static final int SAMPLING_TIME_INTERVAL = + MetricsConfiguration.CONFIG_MAP.get(RTFShardOperationCollector.class).samplingInterval; + + private Counter indexingRateCounter; + private Counter searchRateCounter; + + private Map previousIndexOps; + private final long lastCollectionTimeInMillis; + + private MetricsRegistry metricsRegistry; + private boolean metricsInitialized; + private final PerformanceAnalyzerController controller; + private final ConfigOverridesWrapper configOverridesWrapper; + + public RTFShardOperationCollector( + PerformanceAnalyzerController controller, + ConfigOverridesWrapper configOverridesWrapper) { + super( + SAMPLING_TIME_INTERVAL, + "RTFShardOperationCollector", + StatMetrics.RTF_SHARD_OPERATION_COLLECTOR_EXECUTION_TIME, + StatExceptionCode.RTF_SHARD_OPERATION_COLLECTOR_ERROR); + + this.controller = controller; + this.configOverridesWrapper = configOverridesWrapper; + this.metricsInitialized = false; + this.previousIndexOps = new HashMap<>(); + this.lastCollectionTimeInMillis = System.currentTimeMillis(); + } + + @Override + public void collectMetrics(long startTime) { + if (controller.isCollectorDisabled(configOverridesWrapper, getCollectorName())) { + LOG.info("RTFShardOperationCollector is disabled. 
Skipping collection."); + return; + } + + metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry(); + if (metricsRegistry == null) { + LOG.error("Could not get the instance of MetricsRegistry class"); + return; + } + + initializeMetricsIfNeeded(); + LOG.debug("Executing collect metrics for RTFShardOperationCollector"); + + // Get all shards + Map currentShards = Utils.getShards(); + Map currentIndexOpsMap = new HashMap<>(); + + for (Map.Entry entry : currentShards.entrySet()) { + ShardId shardId = entry.getKey(); + IndexShard shard = entry.getValue(); + + try { + long currentIndexingOps = shard.indexingStats().getTotal().getIndexCount(); + long currentSearchOps = shard.searchStats().getTotal().getQueryCount(); + + if (previousIndexOps.containsKey(shardId)) { + long prevIndexingOps = previousIndexOps.get(shardId).indexOps(); + long prevSearchOps = previousIndexOps.get(shardId).searchOps(); + processOperations( + prevIndexingOps, + prevSearchOps, + currentIndexingOps, + currentSearchOps, + shardId); + } else { + processOperations(0, 0, currentIndexingOps, currentSearchOps, shardId); + } + currentIndexOpsMap.put( + shardId, new ShardOperation(currentIndexingOps, currentSearchOps)); + } catch (Exception e) { + LOG.error( + "Error collecting indexing/search rate metrics for shard {}", + shardId, + e); + } + } + + // Update previous values for next collection + this.previousIndexOps = currentIndexOpsMap; + } + + private void processOperations( + long prevIndexingOps, + long prevSearchOps, + long currentIndexingOps, + long currentSearchOps, + ShardId shardId) { + long indexingOpsDiff = Math.max(0, currentIndexingOps - prevIndexingOps); + long searchOpsDiff = Math.max(0, currentSearchOps - prevSearchOps); + + if (indexingOpsDiff > 0) { + Tags tags = createTags(shardId); + indexingRateCounter.add(indexingOpsDiff, tags); + } + + if (searchOpsDiff > 0) { + Tags tags = createTags(shardId); + searchRateCounter.add(searchOpsDiff, tags); + } + } + + //
attributes= {index_name="test", shard_id="0"} + private Tags createTags(ShardId shardId) { + return Tags.create() + .addTag(RTFMetrics.CommonDimension.INDEX_NAME.toString(), shardId.getIndexName()) + .addTag( + RTFMetrics.CommonDimension.SHARD_ID.toString(), + String.valueOf(shardId.getId())); + } + + private void initializeMetricsIfNeeded() { + if (!metricsInitialized) { + indexingRateCounter = + metricsRegistry.createCounter( + RTFMetrics.ShardOperationsValue.Constants.SHARD_INDEXING_RATE, + "Indexing operations per shard", + MetricUnits.RATE.toString()); + + searchRateCounter = + metricsRegistry.createCounter( + RTFMetrics.ShardOperationsValue.Constants.SHARD_SEARCH_RATE, + "Search operations per shard", + MetricUnits.RATE.toString()); + + metricsInitialized = true; + } + } + + /** + * Stores the index and search operations for a shard. + * + * @param indexOps count of index operations. + * @param searchOps count of search operations + */ + public record ShardOperation(long indexOps, long searchOps) {} +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/config/setting/PerformanceAnalyzerClusterSettings.java b/src/main/java/org/opensearch/performanceanalyzer/config/setting/PerformanceAnalyzerClusterSettings.java index e1775128..c2dabd5e 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/config/setting/PerformanceAnalyzerClusterSettings.java +++ b/src/main/java/org/opensearch/performanceanalyzer/config/setting/PerformanceAnalyzerClusterSettings.java @@ -46,7 +46,7 @@ public enum PerformanceAnalyzerFeatureBits { public static final Setting PA_COLLECTORS_SETTING = Setting.intSetting( "cluster.metadata.perf_analyzer.collectors.mode", - 0, + 1, Setting.Property.NodeScope, Setting.Property.Dynamic); diff --git a/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java b/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java index 6b7921cc..b5fd2776 100644 --- 
a/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java +++ b/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java @@ -6,6 +6,7 @@ package org.opensearch.performanceanalyzer.listener; import static org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR; +import static org.opensearch.performanceanalyzer.util.Utils.computeShareFactor; import com.google.common.annotations.VisibleForTesting; import java.util.HashMap; @@ -17,6 +18,7 @@ import org.opensearch.performanceanalyzer.OpenSearchResources; import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector; import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics; +import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics.ShardOperationsValue; import org.opensearch.performanceanalyzer.commons.util.Util; import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; import org.opensearch.performanceanalyzer.util.Utils; @@ -47,6 +49,7 @@ public class RTFPerformanceAnalyzerSearchListener private final PerformanceAnalyzerController controller; private final Histogram cpuUtilizationHistogram; private final Histogram heapUsedHistogram; + private final Histogram searchLatencyHistogram; private final int numProcessors; public RTFPerformanceAnalyzerSearchListener(final PerformanceAnalyzerController controller) { @@ -55,7 +58,9 @@ public RTFPerformanceAnalyzerSearchListener(final PerformanceAnalyzerController createCPUUtilizationHistogram(OpenSearchResources.INSTANCE.getMetricsRegistry()); this.heapUsedHistogram = createHeapUsedHistogram(OpenSearchResources.INSTANCE.getMetricsRegistry()); - this.threadLocal = ThreadLocal.withInitial(() -> new HashMap()); + this.searchLatencyHistogram = + createSearchLatencyHistogram(OpenSearchResources.INSTANCE.getMetricsRegistry()); + this.threadLocal = ThreadLocal.withInitial(HashMap::new); 
this.numProcessors = Runtime.getRuntime().availableProcessors(); } @@ -83,6 +88,20 @@ private Histogram createHeapUsedHistogram(MetricsRegistry metricsRegistry) { } } + // This histogram will help to get the total latency for search request using getMax over an + // interval. + private Histogram createSearchLatencyHistogram(MetricsRegistry metricsRegistry) { + if (metricsRegistry != null) { + return metricsRegistry.createHistogram( + ShardOperationsValue.SHARD_SEARCH_LATENCY.toString(), + "Search latency per shard per phase", + RTFMetrics.MetricUnits.MILLISECOND.toString()); + } else { + LOG.debug("MetricsRegistry is null"); + return null; + } + } + @Override public String toString() { return RTFPerformanceAnalyzerSearchListener.class.getSimpleName(); @@ -171,6 +190,11 @@ public void preQueryPhase(SearchContext searchContext) { public void queryPhase(SearchContext searchContext, long tookInNanos) { long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, System.nanoTime()); long queryTime = (System.nanoTime() - queryStartTime); + double queryTimeInMills = queryTime / 1_000_000.0; + + searchLatencyHistogram.record( + queryTimeInMills, createTags(searchContext, SHARD_QUERY_PHASE, false)); + addResourceTrackingCompletionListener( searchContext, queryStartTime, queryTime, SHARD_QUERY_PHASE, false); } @@ -192,6 +216,11 @@ public void preFetchPhase(SearchContext searchContext) { public void fetchPhase(SearchContext searchContext, long tookInNanos) { long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, System.nanoTime()); long fetchTime = (System.nanoTime() - fetchStartTime); + double fetchTimeInMills = fetchTime / 1_000_000.0; + + searchLatencyHistogram.record( + fetchTimeInMills, createTags(searchContext, SHARD_FETCH_PHASE, false)); + addResourceTrackingCompletionListenerForFetchPhase( searchContext, fetchStartTime, fetchTime, SHARD_FETCH_PHASE, false); } @@ -262,32 +291,21 @@ protected void innerOnResponse(Task task) { * overall start 
time. */ long totalTime = System.nanoTime() - startTime; + double totalTimeInMills = totalTime / 1_000_000.0; double shareFactor = computeShareFactor(phaseTookTime, totalTime); + + searchLatencyHistogram.record( + totalTimeInMills, createTags(searchContext, phase, isFailed)); cpuUtilizationHistogram.record( Utils.calculateCPUUtilization( numProcessors, totalTime, task.getTotalResourceStats().getCpuTimeInNanos(), shareFactor), - createTags()); + createTags(searchContext, phase, isFailed)); heapUsedHistogram.record( Math.max(0, task.getTotalResourceStats().getMemoryInBytes() * shareFactor), - createTags()); - } - - private Tags createTags() { - return Tags.create() - .addTag( - RTFMetrics.CommonDimension.INDEX_NAME.toString(), - searchContext.request().shardId().getIndex().getName()) - .addTag( - RTFMetrics.CommonDimension.INDEX_UUID.toString(), - searchContext.request().shardId().getIndex().getUUID()) - .addTag( - RTFMetrics.CommonDimension.SHARD_ID.toString(), - searchContext.request().shardId().getId()) - .addTag(RTFMetrics.CommonDimension.OPERATION.toString(), phase) - .addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed); + createTags(searchContext, phase, isFailed)); } @Override @@ -297,8 +315,18 @@ protected void innerOnFailure(Exception e) { }; } - @VisibleForTesting - static double computeShareFactor(long phaseTookTime, long totalTime) { - return Math.min(1, ((double) phaseTookTime) / Math.max(1.0, totalTime)); + private Tags createTags(SearchContext searchContext, String phase, boolean isFailed) { + return Tags.create() + .addTag( + RTFMetrics.CommonDimension.INDEX_NAME.toString(), + searchContext.request().shardId().getIndex().getName()) + .addTag( + RTFMetrics.CommonDimension.INDEX_UUID.toString(), + searchContext.request().shardId().getIndex().getUUID()) + .addTag( + RTFMetrics.CommonDimension.SHARD_ID.toString(), + searchContext.request().shardId().getId()) + .addTag(RTFMetrics.CommonDimension.OPERATION.toString(), phase) + 
.addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed); } } diff --git a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java index 6eb64185..6e7897e8 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java +++ b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java @@ -37,6 +37,8 @@ public final class RTFPerformanceAnalyzerTransportChannel implements TransportCh private long operationStartTime; private Histogram cpuUtilizationHistogram; + private Histogram indexingLatencyHistogram; + private Histogram heapUsedHistogram; private TransportChannel original; private String indexName; @@ -45,15 +47,20 @@ public final class RTFPerformanceAnalyzerTransportChannel implements TransportCh private long threadID; private int numProcessors; + private long initialHeapUsedBytes; void set( TransportChannel original, Histogram cpuUtilizationHistogram, + Histogram indexingLatencyHistogram, + Histogram heapUsedHistogram, String indexName, ShardId shardId, boolean bPrimary) { this.original = original; this.cpuUtilizationHistogram = cpuUtilizationHistogram; + this.indexingLatencyHistogram = indexingLatencyHistogram; + this.heapUsedHistogram = heapUsedHistogram; this.indexName = indexName; this.shardId = shardId; this.primary = bPrimary; @@ -61,6 +68,7 @@ void set( this.operationStartTime = System.nanoTime(); threadID = Thread.currentThread().getId(); this.cpuStartTime = threadMXBean.getThreadCpuTime(threadID); + this.initialHeapUsedBytes = threadMXBean.getThreadAllocatedBytes(threadID); this.numProcessors = Runtime.getRuntime().availableProcessors(); LOG.debug("Thread Name {}", Thread.currentThread().getName()); } @@ -90,6 +98,18 @@ public void sendResponse(Exception exception) throws IOException { private void 
emitMetrics(boolean isFailed) { double cpuUtilization = calculateCPUUtilization(operationStartTime, cpuStartTime); recordCPUUtilizationMetric(shardId, cpuUtilization, OPERATION_SHARD_BULK, isFailed); + + double heapUsedBytes = calculateHeapUsed(); + recordHeapUsedMetric(shardId, heapUsedBytes, OPERATION_SHARD_BULK, isFailed); + + long latencyInNanos = System.nanoTime() - operationStartTime; + double latencyInMillis = latencyInNanos / 1_000_000.0; + recordIndexingLatencyMetric(shardId, latencyInMillis, OPERATION_SHARD_BULK, isFailed); + } + + private double calculateHeapUsed() { + // Allocation delta since set(); no share factor applies, and the result is clamped at 0. + return Math.max(0, threadMXBean.getThreadAllocatedBytes(threadID) - initialHeapUsedBytes); } private double calculateCPUUtilization(long phaseStartTime, long phaseCPUStartTime) { @@ -100,24 +120,38 @@ private double calculateCPUUtilization(long phaseStartTime, long phaseCPUStartTi numProcessors, (System.nanoTime() - phaseStartTime), totalCpuTime, 1.0); } + @VisibleForTesting + void recordIndexingLatencyMetric( + ShardId shardId, double indexingLatency, String operation, boolean isFailed) { + indexingLatencyHistogram.record(indexingLatency, createTags(shardId, operation, isFailed)); + } + @VisibleForTesting void recordCPUUtilizationMetric( ShardId shardId, double cpuUtilization, String operation, boolean isFailed) { - cpuUtilizationHistogram.record( - cpuUtilization, - Tags.create() - .addTag( - RTFMetrics.CommonDimension.INDEX_NAME.toString(), - shardId.getIndex().getName()) - .addTag( - RTFMetrics.CommonDimension.INDEX_UUID.toString(), - shardId.getIndex().getUUID()) - .addTag(RTFMetrics.CommonDimension.SHARD_ID.toString(), shardId.getId()) - .addTag(RTFMetrics.CommonDimension.OPERATION.toString(), operation) - .addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed) - .addTag( - RTFMetrics.CommonDimension.SHARD_ROLE.toString(), - primary ?
SHARD_ROLE_PRIMARY : SHARD_ROLE_REPLICA)); + cpuUtilizationHistogram.record(cpuUtilization, createTags(shardId, operation, isFailed)); + } + + @VisibleForTesting + void recordHeapUsedMetric( + ShardId shardId, double heapUsedBytes, String operation, boolean isFailed) { + heapUsedHistogram.record(heapUsedBytes, createTags(shardId, operation, isFailed)); + } + + private Tags createTags(ShardId shardId, String operation, boolean isFailed) { + return Tags.create() + .addTag( + RTFMetrics.CommonDimension.INDEX_NAME.toString(), + shardId.getIndex().getName()) + .addTag( + RTFMetrics.CommonDimension.INDEX_UUID.toString(), + shardId.getIndex().getUUID()) + .addTag(RTFMetrics.CommonDimension.SHARD_ID.toString(), shardId.getId()) + .addTag(RTFMetrics.CommonDimension.OPERATION.toString(), operation) + .addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed) + .addTag( + RTFMetrics.CommonDimension.SHARD_ROLE.toString(), + primary ? SHARD_ROLE_PRIMARY : SHARD_ROLE_REPLICA); } // This function is called from the security plugin using reflection. 
Do not diff --git a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandler.java b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandler.java index 82a0abe6..fe10f6e7 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandler.java +++ b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandler.java @@ -15,6 +15,7 @@ import org.opensearch.performanceanalyzer.OpenSearchResources; import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector; import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics; +import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics.MetricUnits; import org.opensearch.performanceanalyzer.commons.util.Util; import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; import org.opensearch.tasks.Task; @@ -38,12 +39,16 @@ public final class RTFPerformanceAnalyzerTransportRequestHandler actualHandler; private boolean logOnce = false; private final Histogram cpuUtilizationHistogram; + private final Histogram indexingLatencyHistogram; + private final Histogram heapUsedHistogram; RTFPerformanceAnalyzerTransportRequestHandler( TransportRequestHandler actualHandler, PerformanceAnalyzerController controller) { this.actualHandler = actualHandler; this.controller = controller; this.cpuUtilizationHistogram = createCPUUtilizationHistogram(); + this.indexingLatencyHistogram = createIndexingLatencyHistogram(); + this.heapUsedHistogram = createHeapUsedHistogram(); } private Histogram createCPUUtilizationHistogram() { @@ -58,6 +63,30 @@ private Histogram createCPUUtilizationHistogram() { } } + private Histogram createHeapUsedHistogram() { + MetricsRegistry metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry(); + if (metricsRegistry != null) { + return metricsRegistry.createHistogram( + 
RTFMetrics.OSMetrics.HEAP_ALLOCATED.toString(), + "Heap Utilization per shard for an operation", + RTFMetrics.MetricUnits.BYTE.toString()); + } else { + return null; + } + } + + private Histogram createIndexingLatencyHistogram() { + MetricsRegistry metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry(); + if (metricsRegistry != null) { + return metricsRegistry.createHistogram( + RTFMetrics.ShardOperationsValue.SHARD_INDEXING_LATENCY.toString(), + "Indexing Latency per shard for an operation", + MetricUnits.MILLISECOND.toString()); + } else { + return null; + } + } + @Override public void messageReceived(T request, TransportChannel channel, Task task) throws Exception { actualHandler.messageReceived(request, getChannel(request, channel, task), task); @@ -100,17 +129,22 @@ private TransportChannel getShardBulkChannel(T request, TransportChannel channel TransportRequest transportRequest = ((ConcreteShardRequest) request).getRequest(); - if (!(transportRequest instanceof BulkShardRequest)) { + if (!(transportRequest instanceof BulkShardRequest bsr)) { return channel; } - BulkShardRequest bsr = (BulkShardRequest) transportRequest; RTFPerformanceAnalyzerTransportChannel rtfPerformanceAnalyzerTransportChannel = new RTFPerformanceAnalyzerTransportChannel(); try { rtfPerformanceAnalyzerTransportChannel.set( - channel, cpuUtilizationHistogram, bsr.index(), bsr.shardId(), bPrimary); + channel, + cpuUtilizationHistogram, + indexingLatencyHistogram, + heapUsedHistogram, + bsr.index(), + bsr.shardId(), + bPrimary); } catch (Exception ex) { if (!logOnce) { LOG.error(ex); diff --git a/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java b/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java index 26620455..4d1158c1 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java +++ b/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java @@ -5,6 +5,7 @@ package org.opensearch.performanceanalyzer.util; +import 
com.google.common.annotations.VisibleForTesting; import java.util.EnumSet; import java.util.HashMap; import java.util.Iterator; @@ -56,6 +57,8 @@ public static void configureMetrics() { MetricsConfiguration.CONFIG_MAP.put( RTFCacheConfigMetricsCollector.class, new MetricsConfiguration.MetricConfig(60000, 0)); + MetricsConfiguration.CONFIG_MAP.put( + RTFShardOperationCollector.class, new MetricsConfiguration.MetricConfig(5000, 0)); } // These methods are utility functions for the Node Stat Metrics Collectors. These methods are @@ -148,4 +151,9 @@ public static double calculateCPUUtilization( LOG.debug("Performance Analyzer CPUUtilization calculation with cpuUtil {}", cpuUtil); return cpuUtil; } + + @VisibleForTesting + public static double computeShareFactor(long phaseTookTime, long totalTime) { + return Math.min(1, ((double) phaseTookTime) / Math.max(1.0, totalTime)); + } } diff --git a/src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFShardOperationCollectorTests.java b/src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFShardOperationCollectorTests.java new file mode 100644 index 00000000..f21a67ed --- /dev/null +++ b/src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFShardOperationCollectorTests.java @@ -0,0 +1,122 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.collectors.telemetry; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyDouble; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import 
org.mockito.MockitoAnnotations; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.indices.IndicesService; +import org.opensearch.performanceanalyzer.OpenSearchResources; +import org.opensearch.performanceanalyzer.commons.config.overrides.ConfigOverridesWrapper; +import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration; +import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics; +import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; +import org.opensearch.telemetry.metrics.Counter; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.test.OpenSearchSingleNodeTestCase; + +public class RTFShardOperationCollectorTests extends OpenSearchSingleNodeTestCase { + + private long startTimeInMills = 1153721339; + private static final String TEST_INDEX = "test"; + private RTFShardOperationCollector rtfShardOperationRateCollector; + + @Mock private MetricsRegistry metricsRegistry; + @Mock private Counter indexingRateCounter; + @Mock private Counter searchRateCounter; + @Mock private ConfigOverridesWrapper configOverridesWrapper; + @Mock private PerformanceAnalyzerController performanceAnalyzerController; + + @Before + public void init() { + MockitoAnnotations.initMocks(this); + + MetricsConfiguration.CONFIG_MAP.put( + RTFShardOperationCollector.class, MetricsConfiguration.cdefault); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + OpenSearchResources.INSTANCE.setIndicesService(indicesService); + OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry); + + when(metricsRegistry.createCounter(anyString(), anyString(), anyString())) + .thenReturn(indexingRateCounter) + .thenReturn(searchRateCounter); + + when(metricsRegistry.createCounter(anyString(), anyString(), anyString())) + .thenAnswer( + invocationOnMock -> { + String counterName = (String) invocationOnMock.getArguments()[0]; + 
if (counterName.contains( + RTFMetrics.ShardOperationsValue.Constants + .SHARD_INDEXING_RATE)) { + return indexingRateCounter; + } + return searchRateCounter; + }); + + when(performanceAnalyzerController.isCollectorDisabled(any(), anyString())) + .thenReturn(false); + + rtfShardOperationRateCollector = + spy( + new RTFShardOperationCollector( + performanceAnalyzerController, configOverridesWrapper)); + } + + @Test + public void testCollectMetrics() throws IOException { + createIndex(TEST_INDEX); + rtfShardOperationRateCollector.collectMetrics(startTimeInMills); + + // first time collection does not publish metrics + verify(indexingRateCounter, never()).add(anyDouble(), any()); + verify(searchRateCounter, never()).add(anyDouble(), any()); + + startTimeInMills += 5000; + rtfShardOperationRateCollector.collectMetrics(startTimeInMills); + + // 0 operation count does not publish metrics + verify(indexingRateCounter, never()).add(anyDouble(), any()); + verify(searchRateCounter, never()).add(anyDouble(), any()); + + // creating indexing and search operation + client().prepareIndex(TEST_INDEX) + .setId("1") + .setSource("{\"field\":\"value1\"}", XContentType.JSON) + .get(); + client().prepareIndex(TEST_INDEX) + .setId("2") + .setSource("{\"field\":\"value2\"}", XContentType.JSON) + .get(); + + client().admin().indices().prepareRefresh(TEST_INDEX).get(); + client().prepareSearch(TEST_INDEX).setQuery(QueryBuilders.matchAllQuery()).get(); + + startTimeInMills += 5000; + rtfShardOperationRateCollector.collectMetrics(startTimeInMills); + + verify(indexingRateCounter, atLeastOnce()).add(anyDouble(), any()); + verify(searchRateCounter, atLeastOnce()).add(anyDouble(), any()); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + } +} diff --git a/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java b/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java 
index 16aba4bc..f15aefb8 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java @@ -23,6 +23,7 @@ import org.opensearch.performanceanalyzer.OpenSearchResources; import org.opensearch.performanceanalyzer.commons.util.Util; import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; +import org.opensearch.performanceanalyzer.util.Utils; import org.opensearch.search.internal.SearchContext; import org.opensearch.search.internal.ShardSearchRequest; import org.opensearch.tasks.Task; @@ -42,6 +43,7 @@ public class RTFPerformanceAnalyzerSearchListenerTests { @Mock private MetricsRegistry metricsRegistry; @Mock private Histogram cpuUtilizationHistogram; @Mock private Histogram heapUsedHistogram; + @Mock private Histogram searchLatencyHistogram; @Mock private Index index; @Mock private TaskResourceUsage taskResourceUsage; @@ -68,6 +70,12 @@ public void init() { metricsRegistry.createHistogram( Mockito.eq("heap_allocated"), Mockito.anyString(), Mockito.eq("B"))) .thenReturn(heapUsedHistogram); + Mockito.when( + metricsRegistry.createHistogram( + Mockito.eq("search_latency"), + Mockito.anyString(), + Mockito.eq("ms"))) + .thenReturn(searchLatencyHistogram); searchListener = new RTFPerformanceAnalyzerSearchListener(controller); assertEquals( RTFPerformanceAnalyzerSearchListener.class.getSimpleName(), @@ -99,6 +107,7 @@ public void testQueryPhase() { searchListener.preQueryPhase(searchContext); searchListener.queryPhase(searchContext, 0l); Mockito.verify(task).addResourceTrackingCompletionListener(Mockito.any()); + Mockito.verify(searchLatencyHistogram).record(Mockito.anyDouble(), Mockito.any(Tags.class)); } @Test @@ -119,6 +128,7 @@ public void testFetchPhase() { searchListener.preFetchPhase(searchContext); searchListener.fetchPhase(searchContext, 0l); 
Mockito.verify(task).addResourceTrackingCompletionListener(Mockito.any()); + Mockito.verify(searchLatencyHistogram).record(Mockito.anyDouble(), Mockito.any(Tags.class)); } @Test @@ -133,14 +143,8 @@ public void testFetchPhaseFailed() { @Test public void testOperationShareFactor() { - assertEquals( - Double.valueOf(10.0 / 15), - RTFPerformanceAnalyzerSearchListener.computeShareFactor(10, 15), - 0); - assertEquals( - Double.valueOf(1), - RTFPerformanceAnalyzerSearchListener.computeShareFactor(15, 10), - 0); + assertEquals(Double.valueOf(10.0 / 15), Utils.computeShareFactor(10, 15), 0); + assertEquals(Double.valueOf(1), Utils.computeShareFactor(15, 10), 0); } @Test @@ -158,6 +162,7 @@ public void testTaskCompletionListener() { NotifyOnceListener taskCompletionListener = rtfSearchListener.createListener(searchContext, 0l, 0l, "test", false); taskCompletionListener.onResponse(task); + Mockito.verify(searchLatencyHistogram).record(Mockito.anyDouble(), Mockito.any(Tags.class)); Mockito.verify(cpuUtilizationHistogram) .record(Mockito.anyDouble(), Mockito.any(Tags.class)); Mockito.verify(heapUsedHistogram).record(Mockito.anyDouble(), Mockito.any(Tags.class)); diff --git a/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannelTests.java b/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannelTests.java index aa4e425b..62add0e7 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannelTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannelTests.java @@ -32,6 +32,8 @@ public class RTFPerformanceAnalyzerTransportChannelTests { @Mock private TransportChannel originalChannel; @Mock private TransportResponse response; @Mock private Histogram cpuUtilizationHistogram; + @Mock private Histogram indexingLatencyHistogram; + @Mock private Histogram heapUsedHistogram; private ShardId shardId; 
@Mock private ShardId mockedShardId; @Mock private Index index; @@ -46,7 +48,14 @@ public void init() { String indexName = "testIndex"; shardId = new ShardId(new Index(indexName, "uuid"), 1); channel = new RTFPerformanceAnalyzerTransportChannel(); - channel.set(originalChannel, cpuUtilizationHistogram, indexName, shardId, false); + channel.set( + originalChannel, + cpuUtilizationHistogram, + indexingLatencyHistogram, + heapUsedHistogram, + indexName, + shardId, + false); assertEquals("RTFPerformanceAnalyzerTransportChannelProfile", channel.getProfileName()); assertEquals("RTFPerformanceAnalyzerTransportChannelType", channel.getChannelType()); assertEquals(originalChannel, channel.getInnerChannel()); @@ -71,7 +80,14 @@ public void testResponseWithException() throws IOException { public void testRecordCPUUtilizationMetric() { RTFPerformanceAnalyzerTransportChannel channel = new RTFPerformanceAnalyzerTransportChannel(); - channel.set(originalChannel, cpuUtilizationHistogram, "testIndex", mockedShardId, false); + channel.set( + originalChannel, + cpuUtilizationHistogram, + indexingLatencyHistogram, + heapUsedHistogram, + "testIndex", + mockedShardId, + false); Mockito.when(mockedShardId.getIndex()).thenReturn(index); Mockito.when(index.getName()).thenReturn("myTestIndex"); Mockito.when(index.getUUID()).thenReturn("abc-def"); @@ -79,4 +95,43 @@ public void testRecordCPUUtilizationMetric() { Mockito.verify(cpuUtilizationHistogram) .record(Mockito.anyDouble(), Mockito.any(Tags.class)); } + + @Test + public void testRecordIndexingLatencyMetric() { + RTFPerformanceAnalyzerTransportChannel channel = + new RTFPerformanceAnalyzerTransportChannel(); + channel.set( + originalChannel, + cpuUtilizationHistogram, + indexingLatencyHistogram, + heapUsedHistogram, + "testIndex", + mockedShardId, + false); + Mockito.when(mockedShardId.getIndex()).thenReturn(index); + Mockito.when(index.getName()).thenReturn("myTestIndex"); + Mockito.when(index.getUUID()).thenReturn("abc-def"); + 
channel.recordIndexingLatencyMetric(mockedShardId, 123.456, "bulkShard", false); + Mockito.verify(indexingLatencyHistogram) + .record(Mockito.anyDouble(), Mockito.any(Tags.class)); + } + + @Test + public void testRecordHeapUsedMetric() { + RTFPerformanceAnalyzerTransportChannel channel = + new RTFPerformanceAnalyzerTransportChannel(); + channel.set( + originalChannel, + cpuUtilizationHistogram, + indexingLatencyHistogram, + heapUsedHistogram, + "testIndex", + mockedShardId, + false); + Mockito.when(mockedShardId.getIndex()).thenReturn(index); + Mockito.when(index.getName()).thenReturn("myTestIndex"); + Mockito.when(index.getUUID()).thenReturn("abc-def"); + channel.recordHeapUsedMetric(mockedShardId, 10l, "bulkShard", false); + Mockito.verify(heapUsedHistogram).record(Mockito.anyDouble(), Mockito.any(Tags.class)); + } } From e09421b8384924f5b48495aa49c8636623b68ae6 Mon Sep 17 00:00:00 2001 From: nishchay21 Date: Sun, 29 Jun 2025 13:17:57 +0530 Subject: [PATCH 2/7] Added shard operations collector and optimised node stats collector Signed-off-by: nishchay21 --- .../NodeStatsAllShardsMetricsCollector.java | 34 +++++++++++-------- ...RTFNodeStatsAllShardsMetricsCollector.java | 34 +++++++++---------- 2 files changed, 35 insertions(+), 33 deletions(-) diff --git a/src/main/java/org/opensearch/performanceanalyzer/collectors/NodeStatsAllShardsMetricsCollector.java b/src/main/java/org/opensearch/performanceanalyzer/collectors/NodeStatsAllShardsMetricsCollector.java index c0153b56..b2d1235b 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/collectors/NodeStatsAllShardsMetricsCollector.java +++ b/src/main/java/org/opensearch/performanceanalyzer/collectors/NodeStatsAllShardsMetricsCollector.java @@ -58,7 +58,7 @@ public class NodeStatsAllShardsMetricsCollector extends PerformanceAnalyzerMetri private static final int KEYS_PATH_LENGTH = 2; private static final Logger LOG = LogManager.getLogger(NodeStatsAllShardsMetricsCollector.class); - private HashMap 
prevPerShardStats; + private Map prevPerShardStats; private final PerformanceAnalyzerController controller; public NodeStatsAllShardsMetricsCollector(final PerformanceAnalyzerController controller) { @@ -143,10 +143,10 @@ public void collectMetrics(long startTime) { Map currentPerShardStats = populatePerShardStats(indicesService); - for (HashMap.Entry currentShard : currentPerShardStats.entrySet()) { - ShardId shardId = (ShardId) currentShard.getKey(); - ShardStats currentShardStats = (ShardStats) currentShard.getValue(); - if (prevPerShardStats.size() == 0) { + for (HashMap.Entry currentShard : currentPerShardStats.entrySet()) { + ShardId shardId = currentShard.getKey(); + ShardStats currentShardStats = currentShard.getValue(); + if (prevPerShardStats.isEmpty() || !prevPerShardStats.containsKey(shardId)) { // Populating value for the first run. populateMetricValue( currentShardStats, startTime, shardId.getIndexName(), shardId.id()); @@ -167,6 +167,7 @@ public void collectMetrics(long startTime) { populateDiffMetricValue( prevValue, currValue, startTime, shardId.getIndexName(), shardId.id()); } + prevPerShardStats = currentPerShardStats; } // - Separated to have a unit test; and catch any code changes around this field @@ -180,8 +181,8 @@ public Map populatePerShardStats(IndicesService indicesServ // Populate the shard stats per shard. 
HashMap currentShards = Utils.getShards(); Map currentPerShardStats = new HashMap<>(Collections.emptyMap()); - for (HashMap.Entry currentShard : currentShards.entrySet()) { - IndexShard currentIndexShard = (IndexShard) currentShard.getValue(); + for (HashMap.Entry currentShard : currentShards.entrySet()) { + IndexShard currentIndexShard = currentShard.getValue(); IndexShardStats currentIndexShardStats = Utils.indexShardStats( indicesService, @@ -190,8 +191,10 @@ public Map populatePerShardStats(IndicesService indicesServ CommonStatsFlags.Flag.QueryCache, CommonStatsFlags.Flag.FieldData, CommonStatsFlags.Flag.RequestCache)); - for (ShardStats shardStats : currentIndexShardStats.getShards()) { - currentPerShardStats.put(currentIndexShardStats.getShardId(), shardStats); + if (currentIndexShardStats != null) { + for (ShardStats shardStats : currentIndexShardStats.getShards()) { + currentPerShardStats.put(currentIndexShardStats.getShardId(), shardStats); + } } } return currentPerShardStats; @@ -199,12 +202,13 @@ public Map populatePerShardStats(IndicesService indicesServ public void populateMetricValue( ShardStats shardStats, long startTime, String IndexName, int ShardId) { - StringBuilder value = new StringBuilder(); - value.append(PerformanceAnalyzerMetrics.getJsonCurrentMilliSeconds()); - // Populate the result with cache specific metrics only. - value.append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor) - .append(new NodeStatsMetricsAllShardsPerCollectionStatus(shardStats).serialize()); - saveMetricValues(value.toString(), startTime, IndexName, String.valueOf(ShardId)); + String value = + PerformanceAnalyzerMetrics.getJsonCurrentMilliSeconds() + + + // Populate the result with cache specific metrics only. 
+ PerformanceAnalyzerMetrics.sMetricNewLineDelimitor + + new NodeStatsMetricsAllShardsPerCollectionStatus(shardStats).serialize(); + saveMetricValues(value, startTime, IndexName, String.valueOf(ShardId)); } public void populateDiffMetricValue( diff --git a/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFNodeStatsAllShardsMetricsCollector.java b/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFNodeStatsAllShardsMetricsCollector.java index 54fddf49..19720a35 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFNodeStatsAllShardsMetricsCollector.java +++ b/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFNodeStatsAllShardsMetricsCollector.java @@ -105,7 +105,7 @@ public RTFNodeStatsAllShardsMetricsCollector( public void collectMetrics(long startTime) { if (performanceAnalyzerController.isCollectorDisabled( configOverridesWrapper, getCollectorName())) { - LOG.info("RTFDisksCollector is disabled. Skipping collection."); + LOG.info("RTFNodeStatsMetricsCollector is disabled. Skipping collection."); return; } IndicesService indicesService = OpenSearchResources.INSTANCE.getIndicesService(); @@ -126,29 +126,27 @@ configOverridesWrapper, getCollectorName())) { for (Map.Entry currentShard : currentPerShardStats.entrySet()) { ShardId shardId = currentShard.getKey(); ShardStats currentShardStats = currentShard.getValue(); - if (prevPerShardStats.isEmpty()) { - // Populating value for the first run. + if (prevPerShardStats.isEmpty() || !prevPerShardStats.containsKey(shardId)) { + // Populating value for the first run of shard. recordMetrics( new NodeStatsMetricsAllShardsPerCollectionStatus(currentShardStats), shardId); continue; } - if (prevPerShardStats.containsKey(shardId)) { - ShardStats prevShardStats = prevPerShardStats.get(shardId); - if (prevShardStats == null) { - // Populate value for shards which are new and were not present in the previous - // run. 
- recordMetrics( - new NodeStatsMetricsAllShardsPerCollectionStatus(currentShardStats), - shardId); - continue; - } - NodeStatsMetricsAllShardsPerCollectionStatus prevValue = - new NodeStatsMetricsAllShardsPerCollectionStatus(prevShardStats); - NodeStatsMetricsAllShardsPerCollectionStatus currValue = - new NodeStatsMetricsAllShardsPerCollectionStatus(currentShardStats); - populateDiffMetricValue(prevValue, currValue, shardId); + ShardStats prevShardStats = prevPerShardStats.get(shardId); + if (prevShardStats == null) { + // Populate value for shards which are new and were not present in the previous + // run. + recordMetrics( + new NodeStatsMetricsAllShardsPerCollectionStatus(currentShardStats), + shardId); + continue; } + NodeStatsMetricsAllShardsPerCollectionStatus prevValue = + new NodeStatsMetricsAllShardsPerCollectionStatus(prevShardStats); + NodeStatsMetricsAllShardsPerCollectionStatus currValue = + new NodeStatsMetricsAllShardsPerCollectionStatus(currentShardStats); + populateDiffMetricValue(prevValue, currValue, shardId); } prevPerShardStats = currentPerShardStats; } From f8b20193e6188efe6c87cee909f759b4bb32c9ef Mon Sep 17 00:00:00 2001 From: nishchay21 Date: Sun, 29 Jun 2025 13:40:54 +0530 Subject: [PATCH 3/7] Added shard operations collector and optimised node stats collector Signed-off-by: nishchay21 --- .../config/setting/PerformanceAnalyzerClusterSettings.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/opensearch/performanceanalyzer/config/setting/PerformanceAnalyzerClusterSettings.java b/src/main/java/org/opensearch/performanceanalyzer/config/setting/PerformanceAnalyzerClusterSettings.java index c2dabd5e..e1775128 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/config/setting/PerformanceAnalyzerClusterSettings.java +++ b/src/main/java/org/opensearch/performanceanalyzer/config/setting/PerformanceAnalyzerClusterSettings.java @@ -46,7 +46,7 @@ public enum PerformanceAnalyzerFeatureBits { public 
static final Setting PA_COLLECTORS_SETTING = Setting.intSetting( "cluster.metadata.perf_analyzer.collectors.mode", - 1, + 0, Setting.Property.NodeScope, Setting.Property.Dynamic); From e916125f7f94947fc8425abb92e2a5753a13c385 Mon Sep 17 00:00:00 2001 From: nishchay21 Date: Tue, 1 Jul 2025 19:28:43 +0530 Subject: [PATCH 4/7] Added shard operations collector and optimised node stats collector Signed-off-by: nishchay21 --- ...RTFNodeStatsAllShardsMetricsCollector.java | 10 +-------- .../telemetry/RTFShardOperationCollector.java | 21 +++++++++++++------ 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFNodeStatsAllShardsMetricsCollector.java b/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFNodeStatsAllShardsMetricsCollector.java index 19720a35..2f6c01df 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFNodeStatsAllShardsMetricsCollector.java +++ b/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFNodeStatsAllShardsMetricsCollector.java @@ -126,7 +126,7 @@ configOverridesWrapper, getCollectorName())) { for (Map.Entry currentShard : currentPerShardStats.entrySet()) { ShardId shardId = currentShard.getKey(); ShardStats currentShardStats = currentShard.getValue(); - if (prevPerShardStats.isEmpty() || !prevPerShardStats.containsKey(shardId)) { + if (prevPerShardStats.isEmpty() || prevPerShardStats.get(shardId) == null) { // Populating value for the first run of shard. recordMetrics( new NodeStatsMetricsAllShardsPerCollectionStatus(currentShardStats), @@ -134,14 +134,6 @@ configOverridesWrapper, getCollectorName())) { continue; } ShardStats prevShardStats = prevPerShardStats.get(shardId); - if (prevShardStats == null) { - // Populate value for shards which are new and were not present in the previous - // run. 
- recordMetrics( - new NodeStatsMetricsAllShardsPerCollectionStatus(currentShardStats), - shardId); - continue; - } NodeStatsMetricsAllShardsPerCollectionStatus prevValue = new NodeStatsMetricsAllShardsPerCollectionStatus(prevShardStats); NodeStatsMetricsAllShardsPerCollectionStatus currValue = diff --git a/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFShardOperationCollector.java b/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFShardOperationCollector.java index b02a53dc..016b8efe 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFShardOperationCollector.java +++ b/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFShardOperationCollector.java @@ -141,11 +141,20 @@ private void processOperations( // attributes= {index_name="test", shard_id="0"} private Tags createTags(ShardId shardId) { - return Tags.create() - .addTag(RTFMetrics.CommonDimension.INDEX_NAME.toString(), shardId.getIndexName()) - .addTag( - RTFMetrics.CommonDimension.SHARD_ID.toString(), - String.valueOf(shardId.getId())); + Tags shardOperationsMetricsTag = + Tags.create() + .addTag( + RTFMetrics.CommonDimension.INDEX_NAME.toString(), + shardId.getIndexName()) + .addTag( + RTFMetrics.CommonDimension.SHARD_ID.toString(), + String.valueOf(shardId.getId())); + + if (shardId.getIndex() != null) { + shardOperationsMetricsTag.addTag( + RTFMetrics.CommonDimension.INDEX_UUID.toString(), shardId.getIndex().getUUID()); + } + return shardOperationsMetricsTag; } private void initializeMetricsIfNeeded() { @@ -172,5 +181,5 @@ private void initializeMetricsIfNeeded() { * @param indexOps count of index operations. 
* @param searchOps count of search operations */ - public record ShardOperation(long indexOps, long searchOps) {} + private record ShardOperation(long indexOps, long searchOps) {} } From 19df885c2537a0e865d784ff8442da4bd19c610a Mon Sep 17 00:00:00 2001 From: nishchay21 Date: Wed, 2 Jul 2025 16:00:51 +0530 Subject: [PATCH 5/7] Added shard operations collector and optimized node stats collector Signed-off-by: nishchay21 --- .../RTFPerformanceAnalyzerSearchListenerTests.java | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java b/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java index f15aefb8..d1aa84dd 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java @@ -72,7 +72,7 @@ public void init() { .thenReturn(heapUsedHistogram); Mockito.when( metricsRegistry.createHistogram( - Mockito.eq("search_latency"), + Mockito.eq("shard_search_latency"), Mockito.anyString(), Mockito.eq("ms"))) .thenReturn(searchLatencyHistogram); @@ -104,6 +104,9 @@ public void testQueryPhase() { initializeValidSearchContext(true); Mockito.when(controller.getCollectorsRunModeValue()) .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + Mockito.when(shardId.getIndex()).thenReturn(index); + Mockito.when(index.getName()).thenReturn("myTestIndex"); + Mockito.when(index.getUUID()).thenReturn("abc-def"); searchListener.preQueryPhase(searchContext); searchListener.queryPhase(searchContext, 0l); Mockito.verify(task).addResourceTrackingCompletionListener(Mockito.any()); @@ -115,6 +118,9 @@ public void testQueryPhaseFailed() { initializeValidSearchContext(true); Mockito.when(controller.getCollectorsRunModeValue()) 
.thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + Mockito.when(shardId.getIndex()).thenReturn(index); + Mockito.when(index.getName()).thenReturn("myTestIndex"); + Mockito.when(index.getUUID()).thenReturn("abc-def"); searchListener.preQueryPhase(searchContext); searchListener.failedQueryPhase(searchContext); Mockito.verify(task).addResourceTrackingCompletionListener(Mockito.any()); @@ -125,6 +131,9 @@ public void testFetchPhase() { initializeValidSearchContext(true); Mockito.when(controller.getCollectorsRunModeValue()) .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + Mockito.when(shardId.getIndex()).thenReturn(index); + Mockito.when(index.getName()).thenReturn("myTestIndex"); + Mockito.when(index.getUUID()).thenReturn("abc-def"); searchListener.preFetchPhase(searchContext); searchListener.fetchPhase(searchContext, 0l); Mockito.verify(task).addResourceTrackingCompletionListener(Mockito.any()); @@ -136,6 +145,9 @@ public void testFetchPhaseFailed() { initializeValidSearchContext(true); Mockito.when(controller.getCollectorsRunModeValue()) .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + Mockito.when(shardId.getIndex()).thenReturn(index); + Mockito.when(index.getName()).thenReturn("myTestIndex"); + Mockito.when(index.getUUID()).thenReturn("abc-def"); searchListener.preFetchPhase(searchContext); searchListener.failedFetchPhase(searchContext); Mockito.verify(task).addResourceTrackingCompletionListener(Mockito.any()); From cfa5ae4fe73e068cf2da6d7cc25ec8dfa1db94b9 Mon Sep 17 00:00:00 2001 From: nishchay21 Date: Thu, 3 Jul 2025 13:37:56 +0530 Subject: [PATCH 6/7] Adding shard operation changes Signed-off-by: nishchay21 --- .../RTFPerformanceAnalyzerSearchListener.java | 15 ++++----------- ...RTFPerformanceAnalyzerSearchListenerTests.java | 2 +- 2 files changed, 5 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java 
b/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java index b5fd2776..97141231 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java +++ b/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java @@ -189,14 +189,13 @@ public void preQueryPhase(SearchContext searchContext) { @Override public void queryPhase(SearchContext searchContext, long tookInNanos) { long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, System.nanoTime()); - long queryTime = (System.nanoTime() - queryStartTime); - double queryTimeInMills = queryTime / 1_000_000.0; + double queryTimeInMills = tookInNanos / 1_000_000.0; searchLatencyHistogram.record( queryTimeInMills, createTags(searchContext, SHARD_QUERY_PHASE, false)); addResourceTrackingCompletionListener( - searchContext, queryStartTime, queryTime, SHARD_QUERY_PHASE, false); + searchContext, queryStartTime, tookInNanos, SHARD_QUERY_PHASE, false); } @Override @@ -215,14 +214,12 @@ public void preFetchPhase(SearchContext searchContext) { @Override public void fetchPhase(SearchContext searchContext, long tookInNanos) { long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, System.nanoTime()); - long fetchTime = (System.nanoTime() - fetchStartTime); - double fetchTimeInMills = fetchTime / 1_000_000.0; - + double fetchTimeInMills = tookInNanos / 1_000_000.0; searchLatencyHistogram.record( fetchTimeInMills, createTags(searchContext, SHARD_FETCH_PHASE, false)); addResourceTrackingCompletionListenerForFetchPhase( - searchContext, fetchStartTime, fetchTime, SHARD_FETCH_PHASE, false); + searchContext, fetchStartTime, tookInNanos, SHARD_FETCH_PHASE, false); } @Override @@ -291,11 +288,7 @@ protected void innerOnResponse(Task task) { * overall start time. 
*/ long totalTime = System.nanoTime() - startTime; - double totalTimeInMills = totalTime / 1_000_000.0; double shareFactor = computeShareFactor(phaseTookTime, totalTime); - - searchLatencyHistogram.record( - totalTimeInMills, createTags(searchContext, phase, isFailed)); cpuUtilizationHistogram.record( Utils.calculateCPUUtilization( numProcessors, diff --git a/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java b/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java index d1aa84dd..76b420ed 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java @@ -174,7 +174,7 @@ public void testTaskCompletionListener() { NotifyOnceListener taskCompletionListener = rtfSearchListener.createListener(searchContext, 0l, 0l, "test", false); taskCompletionListener.onResponse(task); - Mockito.verify(searchLatencyHistogram).record(Mockito.anyDouble(), Mockito.any(Tags.class)); + Mockito.verify(cpuUtilizationHistogram) .record(Mockito.anyDouble(), Mockito.any(Tags.class)); Mockito.verify(heapUsedHistogram).record(Mockito.anyDouble(), Mockito.any(Tags.class)); From 21f3f7c1c1d01e5655271a274340dcf0cc8d0bc9 Mon Sep 17 00:00:00 2001 From: nishchay21 Date: Tue, 28 Oct 2025 10:21:27 +0530 Subject: [PATCH 7/7] Added shard operations collector and optimized node stats collector Signed-off-by: nishchay21 --- .../PerformanceAnalyzerPlugin.java | 2 + .../ShardMetricsCollector.java | 119 ++++++++++++++++++ .../RTFPerformanceAnalyzerSearchListener.java | 60 ++++++--- ...TFPerformanceAnalyzerTransportChannel.java | 41 ++++-- .../ShardMetricsCollectorTests.java | 84 +++++++++++++ ...erformanceAnalyzerSearchListenerTests.java | 51 +++++--- ...formanceAnalyzerTransportChannelTests.java | 61 ++++++++- 7 files changed, 369 
insertions(+), 49 deletions(-) create mode 100644 src/main/java/org/opensearch/performanceanalyzer/ShardMetricsCollector.java create mode 100644 src/test/java/org/opensearch/performanceanalyzer/ShardMetricsCollectorTests.java diff --git a/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java b/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java index d877184f..6e9f85a1 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java +++ b/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java @@ -407,6 +407,8 @@ public Collection createComponents( // initialize it. This is the earliest point at which we know ClusterService is created. // So, call the initialize method here. clusterSettingsManager.initialize(); + // Initialize ShardMetricsCollector histograms + ShardMetricsCollector.INSTANCE.initialize(); return Collections.singletonList(performanceAnalyzerController); } diff --git a/src/main/java/org/opensearch/performanceanalyzer/ShardMetricsCollector.java b/src/main/java/org/opensearch/performanceanalyzer/ShardMetricsCollector.java new file mode 100644 index 00000000..d18f9406 --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/ShardMetricsCollector.java @@ -0,0 +1,119 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer; + +import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics; +import org.opensearch.telemetry.metrics.Histogram; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.metrics.tags.Tags; + +/** + * A singleton collector for recording per-shard CPU and heap metrics in OpenSearch. This class + * maintains two histograms: + * + *
<ul> + *
<li>CPU utilization histogram - tracks CPU usage per shard + *
<li>Heap usage histogram - tracks heap memory allocation per shard + * </ul>
+ * + * The metrics are recorded with tags for better categorization and analysis. + */ +public final class ShardMetricsCollector { + /** Singleton instance of the ShardMetricsCollector */ + public static final ShardMetricsCollector INSTANCE = new ShardMetricsCollector(); + + public static final String SHARD_CPU_UTILIZATION = "shard_cpu_utilization"; + public static final String SHARD_HEAP_ALLOCATED = "shard_heap_allocated"; + + /** Histogram for tracking CPU utilization -- GETTER -- Gets the CPU utilization histogram. */ + private Histogram cpuUtilizationHistogram; + + /** Histogram for tracking heap usage -- GETTER -- Gets the heap usage histogram. */ + private Histogram heapUsedHistogram; + + /** + * Private constructor that initializes the CPU and heap histograms. This is private to ensure + * singleton pattern. + */ + private ShardMetricsCollector() { + this.cpuUtilizationHistogram = null; + this.heapUsedHistogram = null; + } + + /** Initialise metric histograms */ + public void initialize() { + if (this.cpuUtilizationHistogram == null) { + this.cpuUtilizationHistogram = createCpuUtilizationHistogram(); + } + if (this.heapUsedHistogram == null) { + this.heapUsedHistogram = createHeapUsedHistogram(); + } + } + + /** + * Creates a histogram for tracking CPU utilization. + * + * @return A histogram instance for CPU metrics, or null if metrics registry is unavailable + */ + private Histogram createCpuUtilizationHistogram() { + MetricsRegistry metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry(); + if (metricsRegistry != null) { + return metricsRegistry.createHistogram( + SHARD_CPU_UTILIZATION, + "CPU Utilization per shard for an operation", + RTFMetrics.MetricUnits.RATE.toString()); + } + return null; + } + + /** + * Creates a histogram for tracking heap usage. 
+ * + * @return A histogram instance for heap metrics, or null if metrics registry is unavailable + */ + private Histogram createHeapUsedHistogram() { + MetricsRegistry metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry(); + if (metricsRegistry != null) { + return metricsRegistry.createHistogram( + SHARD_HEAP_ALLOCATED, + "Heap Utilization per shard for an operation", + RTFMetrics.MetricUnits.BYTE.toString()); + } + return null; + } + + /** + * Records a CPU utilization measurement with associated tags. + * + * @param cpuUtilization The CPU utilization value to record (as a percentage) + * @param tags The tags to associate with this measurement (e.g., shard ID, operation type) + */ + public void recordCpuUtilization(double cpuUtilization, Tags tags) { + if (cpuUtilizationHistogram != null) { + cpuUtilizationHistogram.record(cpuUtilization, tags); + } + } + + /** + * Records a heap usage measurement with associated tags. + * + * @param heapBytes The heap usage value to record (in bytes) + * @param tags The tags to associate with this measurement (e.g., shard ID, operation type) + */ + public void recordHeapUsed(double heapBytes, Tags tags) { + if (heapUsedHistogram != null) { + heapUsedHistogram.record(heapBytes, tags); + } + } + + public Histogram getCpuUtilizationHistogram() { + return cpuUtilizationHistogram; + } + + public Histogram getHeapUsedHistogram() { + return heapUsedHistogram; + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java b/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java index 97141231..6d7b0532 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java +++ b/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java @@ -16,6 +16,7 @@ import org.opensearch.core.action.NotifyOnceListener; import 
org.opensearch.index.shard.SearchOperationListener; import org.opensearch.performanceanalyzer.OpenSearchResources; +import org.opensearch.performanceanalyzer.ShardMetricsCollector; import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector; import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics; import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics.ShardOperationsValue; @@ -287,18 +288,29 @@ protected void innerOnResponse(Task task) { * particular phaseTime and the total time till this calculation happen from the * overall start time. */ - long totalTime = System.nanoTime() - startTime; + long totalTime = System.nanoTime() - task.getStartTimeNanos(); double shareFactor = computeShareFactor(phaseTookTime, totalTime); - cpuUtilizationHistogram.record( + LOG.debug( + "Total task time {} ns. Total Operation Listener time {} ns. " + + "Phase took time {} ns. Share factor {} ", + totalTime, + System.nanoTime() - startTime, + phaseTookTime, + shareFactor); + double cpuUtilization = Utils.calculateCPUUtilization( numProcessors, totalTime, task.getTotalResourceStats().getCpuTimeInNanos(), - shareFactor), - createTags(searchContext, phase, isFailed)); - heapUsedHistogram.record( - Math.max(0, task.getTotalResourceStats().getMemoryInBytes() * shareFactor), - createTags(searchContext, phase, isFailed)); + shareFactor); + cpuUtilizationHistogram.record( + cpuUtilization, createTags(searchContext, phase, isFailed)); + ShardMetricsCollector.INSTANCE.recordCpuUtilization( + cpuUtilization, createTags(searchContext)); + double heapUsed = + Math.max(0, task.getTotalResourceStats().getMemoryInBytes() * shareFactor); + heapUsedHistogram.record(heapUsed, createTags(searchContext, phase, isFailed)); + ShardMetricsCollector.INSTANCE.recordHeapUsed(heapUsed, createTags(searchContext)); } @Override @@ -309,17 +321,27 @@ protected void innerOnFailure(Exception e) { } private Tags createTags(SearchContext searchContext, String phase, boolean 
isFailed) { - return Tags.create() - .addTag( - RTFMetrics.CommonDimension.INDEX_NAME.toString(), - searchContext.request().shardId().getIndex().getName()) - .addTag( - RTFMetrics.CommonDimension.INDEX_UUID.toString(), - searchContext.request().shardId().getIndex().getUUID()) - .addTag( - RTFMetrics.CommonDimension.SHARD_ID.toString(), - searchContext.request().shardId().getId()) - .addTag(RTFMetrics.CommonDimension.OPERATION.toString(), phase) - .addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed); + Tags tags = + Tags.create() + .addTag( + RTFMetrics.CommonDimension.INDEX_NAME.toString(), + searchContext.request().shardId().getIndex().getName()) + .addTag( + RTFMetrics.CommonDimension.INDEX_UUID.toString(), + searchContext.request().shardId().getIndex().getUUID()) + .addTag( + RTFMetrics.CommonDimension.SHARD_ID.toString(), + searchContext.request().shardId().getId()); + + // Only add phase tag if phase is not null + if (phase != null && !phase.isEmpty()) { + tags.addTag(RTFMetrics.CommonDimension.OPERATION.toString(), phase) + .addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed); + } + return tags; + } + + private Tags createTags(SearchContext searchContext) { + return createTags(searchContext, null, false); } } diff --git a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java index ed459cf9..92d49409 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java +++ b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java @@ -15,6 +15,7 @@ import org.opensearch.Version; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.transport.TransportResponse; +import org.opensearch.performanceanalyzer.ShardMetricsCollector; import 
org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics; import org.opensearch.performanceanalyzer.util.Utils; import org.opensearch.telemetry.metrics.Histogram; @@ -142,27 +143,41 @@ void recordIndexingLatencyMetric( void recordCPUUtilizationMetric( ShardId shardId, double cpuUtilization, String operation, boolean isFailed) { cpuUtilizationHistogram.record(cpuUtilization, createTags(shardId, operation, isFailed)); + ShardMetricsCollector.INSTANCE.recordCpuUtilization(cpuUtilization, createTags(shardId)); } @VisibleForTesting void recordHeapUsedMetric( ShardId shardId, double heapUsedBytes, String operation, boolean isFailed) { heapUsedHistogram.record(heapUsedBytes, createTags(shardId, operation, isFailed)); + ShardMetricsCollector.INSTANCE.recordHeapUsed(heapUsedBytes, createTags(shardId)); } private Tags createTags(ShardId shardId, String operation, boolean isFailed) { - return Tags.create() - .addTag( - RTFMetrics.CommonDimension.INDEX_NAME.toString(), - shardId.getIndex().getName()) - .addTag( - RTFMetrics.CommonDimension.INDEX_UUID.toString(), - shardId.getIndex().getUUID()) - .addTag(RTFMetrics.CommonDimension.SHARD_ID.toString(), shardId.getId()) - .addTag(RTFMetrics.CommonDimension.OPERATION.toString(), operation) - .addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed) - .addTag( - RTFMetrics.CommonDimension.SHARD_ROLE.toString(), - primary ? 
SHARD_ROLE_PRIMARY : SHARD_ROLE_REPLICA); + Tags tags = + Tags.create() + .addTag( + RTFMetrics.CommonDimension.INDEX_NAME.toString(), + shardId.getIndex().getName()) + .addTag( + RTFMetrics.CommonDimension.INDEX_UUID.toString(), + shardId.getIndex().getUUID()) + .addTag(RTFMetrics.CommonDimension.SHARD_ID.toString(), shardId.getId()); + + // Only add operation tag if operation is not null + if (operation != null && !operation.isEmpty()) { + tags.addTag(RTFMetrics.CommonDimension.OPERATION.toString(), operation) + .addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed) + .addTag( + RTFMetrics.CommonDimension.SHARD_ROLE.toString(), + primary ? SHARD_ROLE_PRIMARY : SHARD_ROLE_REPLICA); + ; + } + + return tags; + } + + private Tags createTags(ShardId shardId) { + return createTags(shardId, null, false); } } diff --git a/src/test/java/org/opensearch/performanceanalyzer/ShardMetricsCollectorTests.java b/src/test/java/org/opensearch/performanceanalyzer/ShardMetricsCollectorTests.java new file mode 100644 index 00000000..872aba52 --- /dev/null +++ b/src/test/java/org/opensearch/performanceanalyzer/ShardMetricsCollectorTests.java @@ -0,0 +1,84 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer; + +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +import org.junit.Before; +import org.junit.Test; +import org.opensearch.telemetry.metrics.Histogram; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.metrics.tags.Tags; + +public class ShardMetricsCollectorTests { + private ShardMetricsCollector shardMetricsCollector; + private static MetricsRegistry metricsRegistry; 
+ private static Histogram cpuUtilizationHistogram; + private static Histogram heapUsedHistogram; + + @Before + public void init() { + if (cpuUtilizationHistogram != null && heapUsedHistogram != null) { + // Clear any previous mock interactions + clearInvocations(cpuUtilizationHistogram, heapUsedHistogram); + } + + metricsRegistry = mock(MetricsRegistry.class); + cpuUtilizationHistogram = mock(Histogram.class); + heapUsedHistogram = mock(Histogram.class); + + OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry); + + when(metricsRegistry.createHistogram(anyString(), anyString(), anyString())) + .thenAnswer( + invocationOnMock -> { + String histogramName = (String) invocationOnMock.getArguments()[0]; + if (histogramName.equals(ShardMetricsCollector.SHARD_CPU_UTILIZATION)) { + return cpuUtilizationHistogram; + } else if (histogramName.equals( + ShardMetricsCollector.SHARD_HEAP_ALLOCATED)) { + return heapUsedHistogram; + } + return null; + }); + } + + @Test + public void testRecordMetrics() { + shardMetricsCollector = ShardMetricsCollector.INSTANCE; + shardMetricsCollector.initialize(); + Tags testTags = Tags.create().addTag("shard_id", "1").addTag("operation", "search"); + + shardMetricsCollector.recordCpuUtilization(75.0, testTags); + verify(cpuUtilizationHistogram, times(1)).record(75.0, testTags); + + shardMetricsCollector.recordHeapUsed(1024.0, testTags); + verify(heapUsedHistogram, times(1)).record(1024.0, testTags); + } + + @Test + public void testNullHistogram() { + // Reset collector and set null registry + shardMetricsCollector = ShardMetricsCollector.INSTANCE; + OpenSearchResources.INSTANCE.setMetricsRegistry(null); + shardMetricsCollector.initialize(); + + Tags testTags = Tags.create().addTag("shard_id", "1").addTag("operation", "search"); + + // Verify no exceptions when recording with null histograms + shardMetricsCollector.recordCpuUtilization(75.0, testTags); + shardMetricsCollector.recordHeapUsed(1024.0, testTags); + + // Verify no 
interactions + verifyZeroInteractions(cpuUtilizationHistogram, heapUsedHistogram); + } +} diff --git a/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java b/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java index 76b420ed..7d5c726c 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java @@ -21,6 +21,8 @@ import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.tasks.resourcetracker.TaskResourceUsage; import org.opensearch.performanceanalyzer.OpenSearchResources; +import org.opensearch.performanceanalyzer.ShardMetricsCollector; +import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics; import org.opensearch.performanceanalyzer.commons.util.Util; import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; import org.opensearch.performanceanalyzer.util.Utils; @@ -44,6 +46,8 @@ public class RTFPerformanceAnalyzerSearchListenerTests { @Mock private Histogram cpuUtilizationHistogram; @Mock private Histogram heapUsedHistogram; @Mock private Histogram searchLatencyHistogram; + @Mock private Histogram shardMetricsCpuHistogram; + @Mock private Histogram shardMetricsHeapHistogram; @Mock private Index index; @Mock private TaskResourceUsage taskResourceUsage; @@ -60,22 +64,31 @@ public void init() { initMocks(this); OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry); Mockito.when(controller.isPerformanceAnalyzerEnabled()).thenReturn(true); + + // First set up metrics registry with most lenient matching Mockito.when( metricsRegistry.createHistogram( - Mockito.eq("cpu_utilization"), - Mockito.anyString(), - Mockito.eq("rate"))) - .thenReturn(cpuUtilizationHistogram); - Mockito.when( - metricsRegistry.createHistogram( - 
Mockito.eq("heap_allocated"), Mockito.anyString(), Mockito.eq("B"))) - .thenReturn(heapUsedHistogram); - Mockito.when( - metricsRegistry.createHistogram( - Mockito.eq("shard_search_latency"), - Mockito.anyString(), - Mockito.eq("ms"))) - .thenReturn(searchLatencyHistogram); + Mockito.anyString(), Mockito.anyString(), Mockito.anyString())) + .thenAnswer( + invocation -> { + String name = invocation.getArgument(0); + if (name.equals(ShardMetricsCollector.SHARD_CPU_UTILIZATION)) { + return shardMetricsCpuHistogram; + } else if (name.equals(ShardMetricsCollector.SHARD_HEAP_ALLOCATED)) { + return shardMetricsHeapHistogram; + } else if (name.equals( + RTFMetrics.OSMetrics.CPU_UTILIZATION.toString())) { + return cpuUtilizationHistogram; + } else if (name.equals( + RTFMetrics.OSMetrics.HEAP_ALLOCATED.toString())) { + return heapUsedHistogram; + } else if (name.equals( + RTFMetrics.ShardOperationsValue.SHARD_SEARCH_LATENCY + .toString())) { + return searchLatencyHistogram; + } + return null; + }); searchListener = new RTFPerformanceAnalyzerSearchListener(controller); assertEquals( RTFPerformanceAnalyzerSearchListener.class.getSimpleName(), @@ -161,6 +174,14 @@ public void testOperationShareFactor() { @Test public void testTaskCompletionListener() { + Histogram shardCpu = ShardMetricsCollector.INSTANCE.getCpuUtilizationHistogram(); + Histogram shardHeap = ShardMetricsCollector.INSTANCE.getHeapUsedHistogram(); + + if (shardCpu != null && shardHeap != null) { + // Clear any previous mock interactions + Mockito.clearInvocations(shardCpu, shardHeap); + } + ShardMetricsCollector.INSTANCE.initialize(); initializeValidSearchContext(true); RTFPerformanceAnalyzerSearchListener rtfSearchListener = new RTFPerformanceAnalyzerSearchListener(controller); @@ -178,6 +199,8 @@ public void testTaskCompletionListener() { Mockito.verify(cpuUtilizationHistogram) .record(Mockito.anyDouble(), Mockito.any(Tags.class)); Mockito.verify(heapUsedHistogram).record(Mockito.anyDouble(), 
Mockito.any(Tags.class)); + Mockito.verify(shardCpu).record(Mockito.anyDouble(), Mockito.any(Tags.class)); + Mockito.verify(shardHeap).record(Mockito.anyDouble(), Mockito.any(Tags.class)); } private void initializeValidSearchContext(boolean isValid) { diff --git a/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannelTests.java b/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannelTests.java index 79eb0e88..4c0c800d 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannelTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannelTests.java @@ -31,8 +31,11 @@ import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.transport.TransportResponse; +import org.opensearch.performanceanalyzer.OpenSearchResources; +import org.opensearch.performanceanalyzer.ShardMetricsCollector; import org.opensearch.performanceanalyzer.util.Utils; import org.opensearch.telemetry.metrics.Histogram; +import org.opensearch.telemetry.metrics.MetricsRegistry; import org.opensearch.telemetry.metrics.tags.Tags; import org.opensearch.transport.TransportChannel; @@ -44,6 +47,9 @@ public class RTFPerformanceAnalyzerTransportChannelTests { @Mock private Histogram cpuUtilizationHistogram; @Mock private Histogram indexingLatencyHistogram; @Mock private Histogram heapUsedHistogram; + @Mock private Histogram shardMetricsCpuHistogram; + @Mock private Histogram shardMetricsHeapHistogram; + private ShardId shardId; @Mock private ShardId mockedShardId; @Mock private Index index; @@ -58,7 +64,35 @@ public void init() { String indexName = "testIndex"; shardId = new ShardId(new Index(indexName, "uuid"), 1); channel = new RTFPerformanceAnalyzerTransportChannel(); - channel.set(originalChannel, cpuUtilizationHistogram, indexName, shardId, false); + + // Setup 
metrics registry to return our mock histograms + MetricsRegistry metricsRegistry = Mockito.mock(MetricsRegistry.class); + Mockito.when( + metricsRegistry.createHistogram( + Mockito.eq(ShardMetricsCollector.SHARD_CPU_UTILIZATION), + Mockito.anyString(), + Mockito.anyString())) + .thenReturn(shardMetricsCpuHistogram); + Mockito.when( + metricsRegistry.createHistogram( + Mockito.eq(ShardMetricsCollector.SHARD_HEAP_ALLOCATED), + Mockito.anyString(), + Mockito.anyString())) + .thenReturn(shardMetricsHeapHistogram); + + // Set the metrics registry + OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry); + + // Initialize ShardMetricsCollector + ShardMetricsCollector.INSTANCE.initialize(); + channel.set( + originalChannel, + cpuUtilizationHistogram, + indexingLatencyHistogram, + heapUsedHistogram, + indexName, + shardId, + false); } @Test @@ -98,6 +132,11 @@ public void testResponseWithException() throws IOException { @Test public void testRecordCPUUtilizationMetric() { + Histogram shardCpu = ShardMetricsCollector.INSTANCE.getCpuUtilizationHistogram(); + if (shardCpu != null) { + // Clear any previous mock interactions + Mockito.clearInvocations(shardCpu); + } RTFPerformanceAnalyzerTransportChannel channel = new RTFPerformanceAnalyzerTransportChannel(); channel.set( @@ -114,6 +153,7 @@ public void testRecordCPUUtilizationMetric() { channel.recordCPUUtilizationMetric(mockedShardId, 10l, "bulkShard", false); Mockito.verify(cpuUtilizationHistogram) .record(Mockito.anyDouble(), Mockito.any(Tags.class)); + Mockito.verify(shardCpu).record(anyDouble(), any(Tags.class)); } @Test @@ -138,6 +178,11 @@ public void testRecordIndexingLatencyMetric() { @Test public void testRecordHeapUsedMetric() { + Histogram shardHeap = ShardMetricsCollector.INSTANCE.getHeapUsedHistogram(); + if (shardHeap != null) { + // Clear any previous mock interactions + Mockito.clearInvocations(shardHeap); + } RTFPerformanceAnalyzerTransportChannel channel = new 
RTFPerformanceAnalyzerTransportChannel(); channel.set( @@ -153,13 +198,23 @@ public void testRecordHeapUsedMetric() { Mockito.when(index.getUUID()).thenReturn("abc-def"); channel.recordHeapUsedMetric(mockedShardId, 10l, "bulkShard", false); Mockito.verify(heapUsedHistogram).record(Mockito.anyDouble(), Mockito.any(Tags.class)); + // Verify the shard metrics histogram + Mockito.verify(shardHeap).record(anyDouble(), any(Tags.class)); + } public void testRTFPAChannelDelegatesToOriginal() throws InvocationTargetException, IllegalAccessException { TransportChannel handlerSpy = spy(originalChannel); RTFPerformanceAnalyzerTransportChannel rtfChannel = new RTFPerformanceAnalyzerTransportChannel(); - rtfChannel.set(handlerSpy, cpuUtilizationHistogram, index.getName(), shardId, false); + rtfChannel.set( + handlerSpy, + cpuUtilizationHistogram, + indexingLatencyHistogram, + heapUsedHistogram, + index.getName(), + shardId, + false); List overridableMethods = Arrays.stream(TransportChannel.class.getMethods()) @@ -171,7 +226,7 @@ public void testRTFPAChannelDelegatesToOriginal() .collect(Collectors.toList()); for (Method method : overridableMethods) { - // completeStream and sendresponsebatch Methods are experimental and not + // completeStream and sendresponsebatch Methods are experimental and not // implemented in PAChannel if (Set.of("sendresponsebatch", "completestream") .contains(method.getName().toLowerCase())) {