From be8e1082a3058ff65753cf7d1099b611932b03e6 Mon Sep 17 00:00:00 2001 From: Arpit Patawat Date: Tue, 8 Apr 2025 11:14:57 +0530 Subject: [PATCH 1/4] Add Indexing Latency, Indexing and Search Rate metric per shard Signed-off-by: Arpit Patawat --- .../PerformanceAnalyzerPlugin.java | 4 + .../RTFShardOperationRateCollector.java | 171 ++++++++++++++++++ ...TFPerformanceAnalyzerTransportChannel.java | 18 ++ ...rmanceAnalyzerTransportRequestHandler.java | 22 ++- .../performanceanalyzer/util/Utils.java | 3 + .../RTFShardOperationRateCollectorTests.java | 98 ++++++++++ ...formanceAnalyzerTransportChannelTests.java | 36 +++- 7 files changed, 349 insertions(+), 3 deletions(-) create mode 100644 src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFShardOperationRateCollector.java create mode 100644 src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFShardOperationRateCollectorTests.java diff --git a/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java b/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java index 705c6d1d..5e78b4c0 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java +++ b/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java @@ -58,6 +58,7 @@ import org.opensearch.performanceanalyzer.collectors.telemetry.RTFDisksCollector; import org.opensearch.performanceanalyzer.collectors.telemetry.RTFHeapMetricsCollector; import org.opensearch.performanceanalyzer.collectors.telemetry.RTFNodeStatsAllShardsMetricsCollector; +import org.opensearch.performanceanalyzer.collectors.telemetry.RTFShardOperationRateCollector; import org.opensearch.performanceanalyzer.collectors.telemetry.RTFThreadPoolMetricsCollector; import org.opensearch.performanceanalyzer.commons.OSMetricsGeneratorFactory; import org.opensearch.performanceanalyzer.commons.collectors.DisksCollector; @@ -230,6 +231,9 @@ private void scheduleTelemetryCollectors() { new RTFDisksCollector(performanceAnalyzerController, configOverridesWrapper)); scheduledMetricCollectorsExecutor.addScheduledMetricCollector( new RTFHeapMetricsCollector(performanceAnalyzerController, configOverridesWrapper)); + scheduledMetricCollectorsExecutor.addScheduledMetricCollector( + new RTFShardOperationRateCollector( + performanceAnalyzerController, configOverridesWrapper)); scheduledMetricCollectorsExecutor.addScheduledMetricCollector( new RTFThreadPoolMetricsCollector( performanceAnalyzerController, configOverridesWrapper)); diff --git a/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFShardOperationRateCollector.java b/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFShardOperationRateCollector.java new file mode 100644 index 00000000..155f6c53 --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFShardOperationRateCollector.java @@ -0,0 +1,171 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.collectors.telemetry; + +import java.util.HashMap; +import java.util.Map; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.performanceanalyzer.OpenSearchResources; +import org.opensearch.performanceanalyzer.commons.collectors.PerformanceAnalyzerMetricsCollector; +import org.opensearch.performanceanalyzer.commons.collectors.TelemetryCollector; +import org.opensearch.performanceanalyzer.commons.config.overrides.ConfigOverridesWrapper; +import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration; +import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics; +import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics.MetricUnits; +import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode; +import org.opensearch.performanceanalyzer.commons.stats.metrics.StatMetrics; +import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; +import org.opensearch.performanceanalyzer.util.Utils; +import org.opensearch.telemetry.metrics.Counter; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.metrics.tags.Tags; + +/** This collector measures indexing and search rate per shard per minute. */ +public class RTFShardOperationRateCollector extends PerformanceAnalyzerMetricsCollector + implements TelemetryCollector { + + private static final Logger LOG = LogManager.getLogger(RTFShardOperationRateCollector.class); + public static final int SAMPLING_TIME_INTERVAL = + MetricsConfiguration.CONFIG_MAP.get(RTFShardOperationRateCollector.class) + .samplingInterval; + + private Counter indexingRateHistogram; + private Counter searchRateHistogram; + + private final Map prevIndexingOps; + private final Map prevSearchOps; + private long lastCollectionTimeInMillis; + + private MetricsRegistry metricsRegistry; + private boolean metricsInitialized; + private final PerformanceAnalyzerController controller; + private final ConfigOverridesWrapper configOverridesWrapper; + + public RTFShardOperationRateCollector( + PerformanceAnalyzerController controller, + ConfigOverridesWrapper configOverridesWrapper) { + super( + SAMPLING_TIME_INTERVAL, + "RTFShardOperationRateCollector", + StatMetrics.RTF_SHARD_OPERATION_RATE_COLLECTOR_EXECUTION_TIME, + StatExceptionCode.RTF_SHARD_OPERATION_RATE_COLLECTOR_ERROR); + + this.controller = controller; + this.configOverridesWrapper = configOverridesWrapper; + this.metricsInitialized = false; + + this.prevIndexingOps = new HashMap<>(); + this.prevSearchOps = new HashMap<>(); + this.lastCollectionTimeInMillis = System.currentTimeMillis(); + } + + @Override + public void collectMetrics(long startTime) { + if (controller.isCollectorDisabled(configOverridesWrapper, getCollectorName())) { + LOG.info("RTFShardOperationRateCollector is disabled. Skipping collection."); + return; + } + + metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry(); + if (metricsRegistry == null) { + LOG.error("Could not get the instance of MetricsRegistry class"); + return; + } + + initializeMetricsIfNeeded(); + LOG.debug("Executing collect metrics for RTFShardOperationRateCollector"); + + long currentTimeInMillis = System.currentTimeMillis(); + float minutesSinceLastCollection = + (currentTimeInMillis - lastCollectionTimeInMillis) / (1000.0f * 60.0f); + + // Get all shards + Map currentShards = Utils.getShards(); + + for (Map.Entry entry : currentShards.entrySet()) { + ShardId shardId = entry.getKey(); + IndexShard shard = entry.getValue(); + + try { + long currentIndexingOps = shard.indexingStats().getTotal().getIndexCount(); + long currentSearchOps = shard.searchStats().getTotal().getQueryCount(); + + if (prevIndexingOps.containsKey(shardId)) { + processIndexingOperations( + shardId, currentIndexingOps, minutesSinceLastCollection); + } + + if (prevSearchOps.containsKey(shardId)) { + processSearchOperations(shardId, currentSearchOps, minutesSinceLastCollection); + } + + // Update previous values for next collection + prevIndexingOps.put(shardId, currentIndexingOps); + prevSearchOps.put(shardId, currentSearchOps); + } catch (Exception e) { + LOG.error( + "Error collecting indexing/search rate metrics for shard {}: {}", + shardId, + e.getMessage()); + } + } + + lastCollectionTimeInMillis = currentTimeInMillis; + } + + private void processIndexingOperations( + ShardId shardId, long currentIndexingOps, float minutesSinceLastCollection) { + long indexingOpsDiff = Math.max(0, currentIndexingOps - prevIndexingOps.get(shardId)); + float indexingRatePerMinute = indexingOpsDiff / minutesSinceLastCollection; + + // Round to 2 decimal places + indexingRatePerMinute = Math.round(indexingRatePerMinute * 100.0f) / 100.0f; + + Tags tags = createTags(shardId); + indexingRateHistogram.add(indexingRatePerMinute, tags); + } + + private void processSearchOperations( + ShardId shardId, long currentSearchOps, float minutesSinceLastCollection) { + long searchOpsDiff = Math.max(0, currentSearchOps - prevSearchOps.get(shardId)); + float searchRatePerMinute = searchOpsDiff / minutesSinceLastCollection; + + // Round to 2 decimal places + searchRatePerMinute = Math.round(searchRatePerMinute * 100.0f) / 100.0f; + + Tags tags = createTags(shardId); + searchRateHistogram.add(searchRatePerMinute, tags); + } + + private Tags createTags(ShardId shardId) { + return Tags.create() + .addTag(RTFMetrics.CommonDimension.INDEX_NAME.toString(), shardId.getIndexName()) + .addTag( + RTFMetrics.CommonDimension.SHARD_ID.toString(), + String.valueOf(shardId.getId())); + } + + private void initializeMetricsIfNeeded() { + if (!metricsInitialized) { + indexingRateHistogram = + metricsRegistry.createCounter( + RTFMetrics.OperationsValue.Constants.INDEXING_RATE, + "Indexing operations per minute per shard", + MetricUnits.RATE.toString()); + + searchRateHistogram = + metricsRegistry.createCounter( + RTFMetrics.OperationsValue.Constants.SEARCH_RATE, + "Search operations per minute per shard", + MetricUnits.RATE.toString()); + + metricsInitialized = true; + } + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java index 6eb64185..4b56ed87 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java +++ b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java @@ -37,6 +37,7 @@ public final class RTFPerformanceAnalyzerTransportChannel implements TransportCh private long operationStartTime; private Histogram cpuUtilizationHistogram; + private Histogram indexingLatencyHistogram; private TransportChannel original; private String indexName; @@ -49,11 +50,13 @@ public final class RTFPerformanceAnalyzerTransportChannel implements TransportCh void set( TransportChannel original, Histogram cpuUtilizationHistogram, + Histogram indexingLatencyHistogram, String indexName, ShardId shardId, boolean bPrimary) { this.original = original; this.cpuUtilizationHistogram = cpuUtilizationHistogram; + this.indexingLatencyHistogram = indexingLatencyHistogram; this.indexName = indexName; this.shardId = shardId; this.primary = bPrimary; @@ -90,6 +93,10 @@ public void sendResponse(Exception exception) throws IOException { private void emitMetrics(boolean isFailed) { double cpuUtilization = calculateCPUUtilization(operationStartTime, cpuStartTime); recordCPUUtilizationMetric(shardId, cpuUtilization, OPERATION_SHARD_BULK, isFailed); + + long latencyInNanos = System.nanoTime() - operationStartTime; + double latencyInMillis = latencyInNanos / 1_000_000.0; + recordIndexingLatencyMetric(latencyInMillis); } private double calculateCPUUtilization(long phaseStartTime, long phaseCPUStartTime) { @@ -100,6 +107,17 @@ private double calculateCPUUtilization(long phaseStartTime, long phaseCPUStartTi numProcessors, (System.nanoTime() - phaseStartTime), totalCpuTime, 1.0); } + @VisibleForTesting + void recordIndexingLatencyMetric(double indexingLatency) { + indexingLatencyHistogram.record( + indexingLatency, + Tags.create() + .addTag(RTFMetrics.CommonDimension.INDEX_NAME.toString(), indexName) + .addTag( + RTFMetrics.CommonDimension.SHARD_ID.toString(), + shardId.toString())); + } + @VisibleForTesting void recordCPUUtilizationMetric( ShardId shardId, double cpuUtilization, String operation, boolean isFailed) { diff --git a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandler.java b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandler.java index 82a0abe6..61ee6996 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandler.java +++ b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandler.java @@ -15,6 +15,7 @@ import org.opensearch.performanceanalyzer.OpenSearchResources; import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector; import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics; +import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics.MetricUnits; import org.opensearch.performanceanalyzer.commons.util.Util; import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; import org.opensearch.tasks.Task; @@ -38,12 +39,14 @@ public final class RTFPerformanceAnalyzerTransportRequestHandler actualHandler; private boolean logOnce = false; private final Histogram cpuUtilizationHistogram; + private final Histogram indexingLatencyHistogram; RTFPerformanceAnalyzerTransportRequestHandler( TransportRequestHandler actualHandler, PerformanceAnalyzerController controller) { this.actualHandler = actualHandler; this.controller = controller; this.cpuUtilizationHistogram = createCPUUtilizationHistogram(); + this.indexingLatencyHistogram = createIndexingLatencyHistogram(); } private Histogram createCPUUtilizationHistogram() { @@ -58,6 +61,18 @@ private Histogram createCPUUtilizationHistogram() { } } + private Histogram createIndexingLatencyHistogram() { + MetricsRegistry metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry(); + if (metricsRegistry != null) { + return metricsRegistry.createHistogram( + RTFMetrics.OperationsValue.INDEXING_LATENCY.toString(), + "Indexing Latency per shard for an operation", + MetricUnits.MILLISECOND.toString()); + } else { + return null; + } + } + @Override public void messageReceived(T request, TransportChannel channel, Task task) throws Exception { actualHandler.messageReceived(request, getChannel(request, channel, task), task); @@ -110,7 +125,12 @@ private TransportChannel getShardBulkChannel(T request, TransportChannel channel try { rtfPerformanceAnalyzerTransportChannel.set( - channel, cpuUtilizationHistogram, bsr.index(), bsr.shardId(), bPrimary); + channel, + cpuUtilizationHistogram, + indexingLatencyHistogram, + bsr.index(), + bsr.shardId(), + bPrimary); } catch (Exception ex) { if (!logOnce) { LOG.error(ex); diff --git a/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java b/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java index 26620455..9fa9fc62 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java +++ b/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java @@ -49,6 +49,9 @@ public static void configureMetrics() { MetricsConfiguration.CONFIG_MAP.put(ShardIndexingPressureMetricsCollector.class, cdefault); MetricsConfiguration.CONFIG_MAP.put(RTFDisksCollector.class, cdefault); MetricsConfiguration.CONFIG_MAP.put(RTFHeapMetricsCollector.class, cdefault); + MetricsConfiguration.CONFIG_MAP.put( + RTFShardOperationRateCollector.class, + new MetricsConfiguration.MetricConfig(60000, 0)); MetricsConfiguration.CONFIG_MAP.put( RTFNodeStatsAllShardsMetricsCollector.class, new MetricsConfiguration.MetricConfig(60000, 0)); diff --git a/src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFShardOperationRateCollectorTests.java b/src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFShardOperationRateCollectorTests.java new file mode 100644 index 00000000..96d5e2c5 --- /dev/null +++ b/src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFShardOperationRateCollectorTests.java @@ -0,0 +1,98 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.collectors.telemetry; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyDouble; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.opensearch.indices.IndicesService; +import org.opensearch.performanceanalyzer.OpenSearchResources; +import org.opensearch.performanceanalyzer.commons.config.overrides.ConfigOverridesWrapper; +import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration; +import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics; +import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; +import org.opensearch.telemetry.metrics.Counter; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.test.OpenSearchSingleNodeTestCase; + +public class RTFShardOperationRateCollectorTests extends OpenSearchSingleNodeTestCase { + + private long startTimeInMills = 1153721339; + private static final String TEST_INDEX = "test"; + private RTFShardOperationRateCollector rtfShardOperationRateCollector; + + @Mock private MetricsRegistry metricsRegistry; + @Mock private Counter indexingRateCounter; + @Mock private Counter searchRateCounter; + @Mock private ConfigOverridesWrapper configOverridesWrapper; + @Mock private PerformanceAnalyzerController performanceAnalyzerController; + + @Before + public void init() { + MockitoAnnotations.initMocks(this); + + MetricsConfiguration.CONFIG_MAP.put( + RTFShardOperationRateCollector.class, MetricsConfiguration.cdefault); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + OpenSearchResources.INSTANCE.setIndicesService(indicesService); + OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry); + + when(metricsRegistry.createCounter(anyString(), anyString(), anyString())) + .thenReturn(indexingRateCounter) + .thenReturn(searchRateCounter); + + when(metricsRegistry.createCounter(anyString(), anyString(), anyString())) + .thenAnswer( + invocationOnMock -> { + String counterName = (String) invocationOnMock.getArguments()[0]; + if (counterName.contains( + RTFMetrics.OperationsValue.Constants.INDEXING_RATE)) { + return indexingRateCounter; + } + return searchRateCounter; + }); + + when(performanceAnalyzerController.isCollectorDisabled(any(), anyString())) + .thenReturn(false); + + rtfShardOperationRateCollector = + spy( + new RTFShardOperationRateCollector( + performanceAnalyzerController, configOverridesWrapper)); + } + + @Test + public void testCollectMetrics() throws IOException { + createIndex(TEST_INDEX); + rtfShardOperationRateCollector.collectMetrics(startTimeInMills); + + verify(indexingRateCounter, never()).add(anyDouble(), any()); + verify(searchRateCounter, never()).add(anyDouble(), any()); + + startTimeInMills += 60000; + rtfShardOperationRateCollector.collectMetrics(startTimeInMills); + + verify(indexingRateCounter, atLeastOnce()).add(anyDouble(), any()); + verify(searchRateCounter, atLeastOnce()).add(anyDouble(), any()); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + } +} diff --git a/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannelTests.java b/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannelTests.java index aa4e425b..8a19de59 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannelTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannelTests.java @@ -32,6 +32,7 @@ public class RTFPerformanceAnalyzerTransportChannelTests { @Mock private TransportChannel originalChannel; @Mock private TransportResponse response; @Mock private Histogram cpuUtilizationHistogram; + @Mock private Histogram indexingLatencyHistogram; private ShardId shardId; @Mock private ShardId mockedShardId; @Mock private Index index; @@ -46,7 +47,13 @@ public void init() { String indexName = "testIndex"; shardId = new ShardId(new Index(indexName, "uuid"), 1); channel = new RTFPerformanceAnalyzerTransportChannel(); - channel.set(originalChannel, cpuUtilizationHistogram, indexName, shardId, false); + channel.set( + originalChannel, + cpuUtilizationHistogram, + indexingLatencyHistogram, + indexName, + shardId, + false); assertEquals("RTFPerformanceAnalyzerTransportChannelProfile", channel.getProfileName()); assertEquals("RTFPerformanceAnalyzerTransportChannelType", channel.getChannelType()); assertEquals(originalChannel, channel.getInnerChannel()); @@ -71,7 +78,13 @@ public void testResponseWithException() throws IOException { public void testRecordCPUUtilizationMetric() { RTFPerformanceAnalyzerTransportChannel channel = new RTFPerformanceAnalyzerTransportChannel(); - channel.set(originalChannel, cpuUtilizationHistogram, "testIndex", mockedShardId, false); + channel.set( + originalChannel, + cpuUtilizationHistogram, + indexingLatencyHistogram, + "testIndex", + mockedShardId, + false); Mockito.when(mockedShardId.getIndex()).thenReturn(index); Mockito.when(index.getName()).thenReturn("myTestIndex"); Mockito.when(index.getUUID()).thenReturn("abc-def"); @@ -79,4 +92,23 @@ public void testRecordCPUUtilizationMetric() { Mockito.verify(cpuUtilizationHistogram) .record(Mockito.anyDouble(), Mockito.any(Tags.class)); } + + @Test + public void testRecordIndexingLatencyMetric() { + RTFPerformanceAnalyzerTransportChannel channel = + new RTFPerformanceAnalyzerTransportChannel(); + channel.set( + originalChannel, + cpuUtilizationHistogram, + indexingLatencyHistogram, + "testIndex", + mockedShardId, + false); + Mockito.when(mockedShardId.getIndex()).thenReturn(index); + Mockito.when(index.getName()).thenReturn("myTestIndex"); + Mockito.when(index.getUUID()).thenReturn("abc-def"); + channel.recordIndexingLatencyMetric(123.456); + Mockito.verify(indexingLatencyHistogram) + .record(Mockito.anyDouble(), Mockito.any(Tags.class)); + } } From 97bfbe7437f6bc63b0a387f52785542bd51ab478 Mon Sep 17 00:00:00 2001 From: Arpit Patawat Date: Sun, 20 Apr 2025 23:43:27 +0530 Subject: [PATCH 2/4] Remove collection time check from rate calculation Signed-off-by: Arpit Patawat --- .../RTFShardOperationRateCollector.java | 56 ++++++++----------- ...TFPerformanceAnalyzerTransportChannel.java | 45 +++++++-------- .../performanceanalyzer/util/Utils.java | 2 +- .../RTFShardOperationRateCollectorTests.java | 25 ++++++++- ...formanceAnalyzerTransportChannelTests.java | 2 +- 5 files changed, 71 insertions(+), 59 deletions(-) diff --git a/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFShardOperationRateCollector.java b/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFShardOperationRateCollector.java index 155f6c53..c398897a 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFShardOperationRateCollector.java +++ b/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFShardOperationRateCollector.java @@ -26,7 +26,11 @@ import org.opensearch.telemetry.metrics.MetricsRegistry; import org.opensearch.telemetry.metrics.tags.Tags; -/** This collector measures indexing and search rate per shard per minute. */ +/** + * This collector measures indexing and search rate per shard. The metric measurement is difference + * between current and last window's operation. For example - if the last window had operation count + * as 10, and now it changed to 12, then collector will publish 2 ops/interval. + */ public class RTFShardOperationRateCollector extends PerformanceAnalyzerMetricsCollector implements TelemetryCollector { @@ -35,8 +39,8 @@ public class RTFShardOperationRateCollector extends PerformanceAnalyzerMetricsCo MetricsConfiguration.CONFIG_MAP.get(RTFShardOperationRateCollector.class) .samplingInterval; - private Counter indexingRateHistogram; - private Counter searchRateHistogram; + private Counter indexingRateCounter; + private Counter searchRateCounter; private final Map prevIndexingOps; private final Map prevSearchOps; @@ -81,10 +85,6 @@ public void collectMetrics(long startTime) { initializeMetricsIfNeeded(); LOG.debug("Executing collect metrics for RTFShardOperationRateCollector"); - long currentTimeInMillis = System.currentTimeMillis(); - float minutesSinceLastCollection = - (currentTimeInMillis - lastCollectionTimeInMillis) / (1000.0f * 60.0f); - // Get all shards Map currentShards = Utils.getShards(); @@ -97,12 +97,11 @@ public void collectMetrics(long startTime) { long currentSearchOps = shard.searchStats().getTotal().getQueryCount(); if (prevIndexingOps.containsKey(shardId)) { - processIndexingOperations( - shardId, currentIndexingOps, minutesSinceLastCollection); + processIndexingOperations(shardId, currentIndexingOps); } if (prevSearchOps.containsKey(shardId)) { - processSearchOperations(shardId, currentSearchOps, minutesSinceLastCollection); + processSearchOperations(shardId, currentSearchOps); } // Update previous values for next collection @@ -115,34 +114,27 @@ public void collectMetrics(long startTime) { e.getMessage()); } } - - lastCollectionTimeInMillis = currentTimeInMillis; } - private void processIndexingOperations( - ShardId shardId, long currentIndexingOps, float minutesSinceLastCollection) { + private void processIndexingOperations(ShardId shardId, long currentIndexingOps) { long indexingOpsDiff = Math.max(0, currentIndexingOps - prevIndexingOps.get(shardId)); - float indexingRatePerMinute = indexingOpsDiff / minutesSinceLastCollection; - - // Round to 2 decimal places - indexingRatePerMinute = Math.round(indexingRatePerMinute * 100.0f) / 100.0f; - Tags tags = createTags(shardId); - indexingRateHistogram.add(indexingRatePerMinute, tags); + if (indexingOpsDiff > 0) { + Tags tags = createTags(shardId); + indexingRateCounter.add(indexingOpsDiff, tags); + } } - private void processSearchOperations( - ShardId shardId, long currentSearchOps, float minutesSinceLastCollection) { + private void processSearchOperations(ShardId shardId, long currentSearchOps) { long searchOpsDiff = Math.max(0, currentSearchOps - prevSearchOps.get(shardId)); - float searchRatePerMinute = searchOpsDiff / minutesSinceLastCollection; - - // Round to 2 decimal places - searchRatePerMinute = Math.round(searchRatePerMinute * 100.0f) / 100.0f; - Tags tags = createTags(shardId); - searchRateHistogram.add(searchRatePerMinute, tags); + if (searchOpsDiff > 0) { + Tags tags = createTags(shardId); + searchRateCounter.add(searchOpsDiff, tags); + } } + // attributes= {index_name="test", shard_id="0"} private Tags createTags(ShardId shardId) { return Tags.create() .addTag(RTFMetrics.CommonDimension.INDEX_NAME.toString(), shardId.getIndexName()) @@ -153,16 +145,16 @@ private Tags createTags(ShardId shardId) { private void initializeMetricsIfNeeded() { if (!metricsInitialized) { - indexingRateHistogram = + indexingRateCounter = metricsRegistry.createCounter( RTFMetrics.OperationsValue.Constants.INDEXING_RATE, - "Indexing operations per minute per shard", + "Indexing operations per shard", MetricUnits.RATE.toString()); - searchRateHistogram = + searchRateCounter = metricsRegistry.createCounter( RTFMetrics.OperationsValue.Constants.SEARCH_RATE, - "Search operations per minute per shard", + "Search operations per shard", MetricUnits.RATE.toString()); metricsInitialized = true; diff --git a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java index 4b56ed87..71463608 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java +++ b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java @@ -96,7 +96,7 @@ private void emitMetrics(boolean isFailed) { long latencyInNanos = System.nanoTime() - operationStartTime; double latencyInMillis = latencyInNanos / 1_000_000.0; - recordIndexingLatencyMetric(latencyInMillis); + recordIndexingLatencyMetric(shardId, latencyInMillis, OPERATION_SHARD_BULK, isFailed); } private double calculateCPUUtilization(long phaseStartTime, long phaseCPUStartTime) { @@ -108,34 +108,31 @@ private double calculateCPUUtilization(long phaseStartTime, long phaseCPUStartTi } @VisibleForTesting - void recordIndexingLatencyMetric(double indexingLatency) { - indexingLatencyHistogram.record( - indexingLatency, - Tags.create() - .addTag(RTFMetrics.CommonDimension.INDEX_NAME.toString(), indexName) - .addTag( - RTFMetrics.CommonDimension.SHARD_ID.toString(), - shardId.toString())); + void recordIndexingLatencyMetric( + ShardId shardId, double indexingLatency, String operation, boolean isFailed) { + indexingLatencyHistogram.record(indexingLatency, createTags(shardId, operation, isFailed)); } @VisibleForTesting void recordCPUUtilizationMetric( ShardId shardId, double cpuUtilization, String operation, boolean isFailed) { - cpuUtilizationHistogram.record( - cpuUtilization, - Tags.create() - .addTag( - RTFMetrics.CommonDimension.INDEX_NAME.toString(), - shardId.getIndex().getName()) - .addTag( - RTFMetrics.CommonDimension.INDEX_UUID.toString(), - shardId.getIndex().getUUID()) - .addTag(RTFMetrics.CommonDimension.SHARD_ID.toString(), shardId.getId()) - .addTag(RTFMetrics.CommonDimension.OPERATION.toString(), operation) - .addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed) - .addTag( - RTFMetrics.CommonDimension.SHARD_ROLE.toString(), - primary ? SHARD_ROLE_PRIMARY : SHARD_ROLE_REPLICA)); + cpuUtilizationHistogram.record(cpuUtilization, createTags(shardId, operation, isFailed)); + } + + private Tags createTags(ShardId shardId, String operation, boolean isFailed) { + return Tags.create() + .addTag( + RTFMetrics.CommonDimension.INDEX_NAME.toString(), + shardId.getIndex().getName()) + .addTag( + RTFMetrics.CommonDimension.INDEX_UUID.toString(), + shardId.getIndex().getUUID()) + .addTag(RTFMetrics.CommonDimension.SHARD_ID.toString(), shardId.getId()) + .addTag(RTFMetrics.CommonDimension.OPERATION.toString(), operation) + .addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed) + .addTag( + RTFMetrics.CommonDimension.SHARD_ROLE.toString(), + primary ? SHARD_ROLE_PRIMARY : SHARD_ROLE_REPLICA); } // This function is called from the security plugin using reflection. Do not diff --git a/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java b/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java index 9fa9fc62..fb04acc3 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java +++ b/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java @@ -51,7 +51,7 @@ public static void configureMetrics() { MetricsConfiguration.CONFIG_MAP.put(RTFHeapMetricsCollector.class, cdefault); MetricsConfiguration.CONFIG_MAP.put( RTFShardOperationRateCollector.class, - new MetricsConfiguration.MetricConfig(60000, 0)); + new MetricsConfiguration.MetricConfig(5000, 0)); MetricsConfiguration.CONFIG_MAP.put( RTFNodeStatsAllShardsMetricsCollector.class, new MetricsConfiguration.MetricConfig(60000, 0)); diff --git a/src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFShardOperationRateCollectorTests.java b/src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFShardOperationRateCollectorTests.java index 96d5e2c5..3803739b 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFShardOperationRateCollectorTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFShardOperationRateCollectorTests.java @@ -20,6 +20,8 @@ import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.index.query.QueryBuilders; import org.opensearch.indices.IndicesService; import org.opensearch.performanceanalyzer.OpenSearchResources; import org.opensearch.performanceanalyzer.commons.config.overrides.ConfigOverridesWrapper; @@ -81,10 +83,31 @@ public void testCollectMetrics() throws IOException { createIndex(TEST_INDEX); rtfShardOperationRateCollector.collectMetrics(startTimeInMills); + // first time collection does not publish metrics verify(indexingRateCounter, never()).add(anyDouble(), any()); verify(searchRateCounter, never()).add(anyDouble(), any()); - startTimeInMills += 60000; + startTimeInMills += 5000; + rtfShardOperationRateCollector.collectMetrics(startTimeInMills); + + // 0 operation count does not publish metrics + verify(indexingRateCounter, never()).add(anyDouble(), any()); + verify(searchRateCounter, never()).add(anyDouble(), any()); + + // creating indexing and search operation + client().prepareIndex(TEST_INDEX) + .setId("1") + .setSource("{\"field\":\"value1\"}", XContentType.JSON) + .get(); + client().prepareIndex(TEST_INDEX) + .setId("2") + .setSource("{\"field\":\"value2\"}", XContentType.JSON) + .get(); + + client().admin().indices().prepareRefresh(TEST_INDEX).get(); + client().prepareSearch(TEST_INDEX).setQuery(QueryBuilders.matchAllQuery()).get(); + + startTimeInMills += 5000; rtfShardOperationRateCollector.collectMetrics(startTimeInMills); verify(indexingRateCounter, atLeastOnce()).add(anyDouble(), any()); diff --git a/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannelTests.java b/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannelTests.java index 8a19de59..571d1d91 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannelTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannelTests.java @@ -107,7 +107,7 @@ public void testRecordIndexingLatencyMetric() { Mockito.when(mockedShardId.getIndex()).thenReturn(index); Mockito.when(index.getName()).thenReturn("myTestIndex"); Mockito.when(index.getUUID()).thenReturn("abc-def"); - channel.recordIndexingLatencyMetric(123.456); + channel.recordIndexingLatencyMetric(mockedShardId, 123.456, "bulkShard", false); Mockito.verify(indexingLatencyHistogram) .record(Mockito.anyDouble(), Mockito.any(Tags.class)); } From ffc3e3fb00fe0ad4c04d19112779d7e763dd515a Mon Sep 17 00:00:00 2001 From: Arpit Patawat Date: Tue, 22 Apr 2025 14:40:10 +0530 Subject: [PATCH 3/4] Add search latency metric Signed-off-by: Arpit Patawat --- .../RTFPerformanceAnalyzerSearchListener.java | 118 +++++++++++++++--- ...erformanceAnalyzerSearchListenerTests.java | 35 ++++++ 2 files changed, 136 insertions(+), 17 deletions(-) diff --git a/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java b/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java index 6b7921cc..7936e647 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java +++ b/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java @@ -17,6 +17,7 @@ import org.opensearch.performanceanalyzer.OpenSearchResources; import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector; import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics; +import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics.OperationsValue; import org.opensearch.performanceanalyzer.commons.util.Util; import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; import org.opensearch.performanceanalyzer.util.Utils; @@ -47,6 +48,10 @@ public class RTFPerformanceAnalyzerSearchListener private final PerformanceAnalyzerController controller; private final Histogram cpuUtilizationHistogram; private final Histogram heapUsedHistogram; + private final Histogram queryPhaseHistogram; + private final Histogram fetchPhaseHistogram; + private final Histogram queryPlusFetchPhaseHistogram; + private final Histogram searchLatencyHistogram; private final int numProcessors; public RTFPerformanceAnalyzerSearchListener(final PerformanceAnalyzerController controller) { @@ -55,6 +60,15 @@ public RTFPerformanceAnalyzerSearchListener(final PerformanceAnalyzerController createCPUUtilizationHistogram(OpenSearchResources.INSTANCE.getMetricsRegistry()); this.heapUsedHistogram = createHeapUsedHistogram(OpenSearchResources.INSTANCE.getMetricsRegistry()); + this.queryPhaseHistogram = + createQueryPhaseHistogram(OpenSearchResources.INSTANCE.getMetricsRegistry()); + this.fetchPhaseHistogram = + createFetchPhaseHistogram(OpenSearchResources.INSTANCE.getMetricsRegistry()); + this.queryPlusFetchPhaseHistogram = + createQueryPlusFetchPhaseHistogram( + OpenSearchResources.INSTANCE.getMetricsRegistry()); + this.searchLatencyHistogram = + createSearchLatencyHistogram(OpenSearchResources.INSTANCE.getMetricsRegistry()); this.threadLocal = ThreadLocal.withInitial(() -> new HashMap()); this.numProcessors = Runtime.getRuntime().availableProcessors(); } @@ -83,6 +97,58 @@ private Histogram createHeapUsedHistogram(MetricsRegistry metricsRegistry) { } } + private Histogram createQueryPhaseHistogram(MetricsRegistry metricsRegistry) { + if (metricsRegistry != null) { + return metricsRegistry.createHistogram( + RTFMetrics.OperationsValue.QUERY_PHASE_LATENCY.toString(), + "Query phase latency per shard", + RTFMetrics.MetricUnits.MILLISECOND.toString()); + } else { + LOG.debug("MetricsRegistry is null"); + return null; + } + } + + private Histogram createFetchPhaseHistogram(MetricsRegistry metricsRegistry) { + if (metricsRegistry != null) { + return metricsRegistry.createHistogram( + RTFMetrics.OperationsValue.FETCH_PHASE_LATENCY.toString(), + "Fetch phase latency per shard", + RTFMetrics.MetricUnits.MILLISECOND.toString()); + } else { + LOG.debug("MetricsRegistry is null"); + return null; + } + } + + // This histogram will help to get the sum of latencies for query and fetch phase using getSum + // over an interval. + private Histogram createQueryPlusFetchPhaseHistogram(MetricsRegistry metricsRegistry) { + if (metricsRegistry != null) { + return metricsRegistry.createHistogram( + RTFMetrics.OperationsValue.QUERY_PLUS_FETCH_PHASE_LATENCY.toString(), + "Query plus fetch phase latency per shard", + RTFMetrics.MetricUnits.MILLISECOND.toString()); + } else { + LOG.debug("MetricsRegistry is null"); + return null; + } + } + + // This histogram will help to get the total latency for search request using getMax over an + // interval. + private Histogram createSearchLatencyHistogram(MetricsRegistry metricsRegistry) { + if (metricsRegistry != null) { + return metricsRegistry.createHistogram( + OperationsValue.SEARCH_LATENCY.toString(), + "Total Search latency per shard", + RTFMetrics.MetricUnits.MILLISECOND.toString()); + } else { + LOG.debug("MetricsRegistry is null"); + return null; + } + } + @Override public String toString() { return RTFPerformanceAnalyzerSearchListener.class.getSimpleName(); @@ -171,6 +237,13 @@ public void preQueryPhase(SearchContext searchContext) { public void queryPhase(SearchContext searchContext, long tookInNanos) { long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, System.nanoTime()); long queryTime = (System.nanoTime() - queryStartTime); + double queryTimeInMills = queryTime / 1_000_000.0; + + queryPhaseHistogram.record( + queryTimeInMills, createTags(searchContext, SHARD_QUERY_PHASE, false)); + queryPlusFetchPhaseHistogram.record( + queryTimeInMills, createTags(searchContext, SHARD_QUERY_PHASE, false)); + addResourceTrackingCompletionListener( searchContext, queryStartTime, queryTime, SHARD_QUERY_PHASE, false); } @@ -192,6 +265,13 @@ public void preFetchPhase(SearchContext searchContext) { public void fetchPhase(SearchContext searchContext, long tookInNanos) { long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, System.nanoTime()); long fetchTime = (System.nanoTime() - fetchStartTime); + double fetchTimeInMills = fetchTime / 1_000_000.0; + + fetchPhaseHistogram.record( + fetchTimeInMills, createTags(searchContext, SHARD_FETCH_PHASE, false)); + queryPlusFetchPhaseHistogram.record( + fetchTimeInMills, createTags(searchContext, SHARD_FETCH_PHASE, false)); + addResourceTrackingCompletionListenerForFetchPhase( searchContext, fetchStartTime, fetchTime, SHARD_FETCH_PHASE, false); } @@ -262,32 +342,21 @@ protected void innerOnResponse(Task task) { * overall start time. */ long totalTime = System.nanoTime() - startTime; + double totalTimeInMills = totalTime / 1_000_000.0; double shareFactor = computeShareFactor(phaseTookTime, totalTime); + + searchLatencyHistogram.record( + totalTimeInMills, createTags(searchContext, phase, isFailed)); cpuUtilizationHistogram.record( Utils.calculateCPUUtilization( numProcessors, totalTime, task.getTotalResourceStats().getCpuTimeInNanos(), shareFactor), - createTags()); + createTags(searchContext, phase, isFailed)); heapUsedHistogram.record( Math.max(0, task.getTotalResourceStats().getMemoryInBytes() * shareFactor), - createTags()); - } - - private Tags createTags() { - return Tags.create() - .addTag( - RTFMetrics.CommonDimension.INDEX_NAME.toString(), - searchContext.request().shardId().getIndex().getName()) - .addTag( - RTFMetrics.CommonDimension.INDEX_UUID.toString(), - searchContext.request().shardId().getIndex().getUUID()) - .addTag( - RTFMetrics.CommonDimension.SHARD_ID.toString(), - searchContext.request().shardId().getId()) - .addTag(RTFMetrics.CommonDimension.OPERATION.toString(), phase) - .addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed); + createTags(searchContext, phase, isFailed)); } @Override @@ -297,6 +366,21 @@ protected void innerOnFailure(Exception e) { }; } + private Tags createTags(SearchContext searchContext, String phase, boolean isFailed) { + return Tags.create() + .addTag( + RTFMetrics.CommonDimension.INDEX_NAME.toString(), + searchContext.request().shardId().getIndex().getName()) + .addTag( + RTFMetrics.CommonDimension.INDEX_UUID.toString(), + searchContext.request().shardId().getIndex().getUUID()) + .addTag( + RTFMetrics.CommonDimension.SHARD_ID.toString(), + searchContext.request().shardId().getId()) + .addTag(RTFMetrics.CommonDimension.OPERATION.toString(), phase) + .addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed); + } + @VisibleForTesting static double computeShareFactor(long phaseTookTime, long totalTime) { return Math.min(1, ((double) phaseTookTime) / Math.max(1.0, totalTime)); diff --git a/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java b/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java index 16aba4bc..5c2f34e9 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java @@ -42,6 +42,10 @@ public class RTFPerformanceAnalyzerSearchListenerTests { @Mock private MetricsRegistry metricsRegistry; @Mock private Histogram cpuUtilizationHistogram; @Mock private Histogram heapUsedHistogram; + @Mock private Histogram queryPhaseHistogram; + @Mock private Histogram fetchPhaseHistogram; + @Mock private Histogram queryPlusFetchPhaseHistogram; + @Mock private Histogram searchLatencyHistogram; @Mock private Index index; @Mock private TaskResourceUsage taskResourceUsage; @@ -68,6 +72,30 @@ public void init() { metricsRegistry.createHistogram( Mockito.eq("heap_allocated"), Mockito.anyString(), Mockito.eq("B"))) .thenReturn(heapUsedHistogram); + Mockito.when( + metricsRegistry.createHistogram( + Mockito.eq("query_phase_latency"), + Mockito.anyString(), + Mockito.eq("ms"))) + .thenReturn(queryPhaseHistogram); + Mockito.when( + metricsRegistry.createHistogram( + Mockito.eq("fetch_phase_latency"), + Mockito.anyString(), + Mockito.eq("ms"))) + .thenReturn(fetchPhaseHistogram); + Mockito.when( + metricsRegistry.createHistogram( + Mockito.eq("query_plus_fetch_phase_histogram"), + Mockito.anyString(), + Mockito.eq("ms"))) + .thenReturn(queryPlusFetchPhaseHistogram); + Mockito.when( + metricsRegistry.createHistogram( + Mockito.eq("search_latency"), + Mockito.anyString(), + Mockito.eq("ms"))) + .thenReturn(searchLatencyHistogram); searchListener = new RTFPerformanceAnalyzerSearchListener(controller); assertEquals( RTFPerformanceAnalyzerSearchListener.class.getSimpleName(), @@ -99,6 +127,9 @@ public void testQueryPhase() { searchListener.preQueryPhase(searchContext); searchListener.queryPhase(searchContext, 0l); Mockito.verify(task).addResourceTrackingCompletionListener(Mockito.any()); + Mockito.verify(queryPhaseHistogram).record(Mockito.anyDouble(), Mockito.any(Tags.class)); + Mockito.verify(queryPlusFetchPhaseHistogram) + .record(Mockito.anyDouble(), Mockito.any(Tags.class)); } @Test @@ -119,6 +150,9 @@ public void testFetchPhase() { searchListener.preFetchPhase(searchContext); searchListener.fetchPhase(searchContext, 0l); Mockito.verify(task).addResourceTrackingCompletionListener(Mockito.any()); + Mockito.verify(fetchPhaseHistogram).record(Mockito.anyDouble(), Mockito.any(Tags.class)); + Mockito.verify(queryPlusFetchPhaseHistogram) + .record(Mockito.anyDouble(), Mockito.any(Tags.class)); } @Test @@ -158,6 +192,7 @@ public void testTaskCompletionListener() { NotifyOnceListener taskCompletionListener = rtfSearchListener.createListener(searchContext, 0l, 0l, "test", false); taskCompletionListener.onResponse(task); + Mockito.verify(searchLatencyHistogram).record(Mockito.anyDouble(), Mockito.any(Tags.class)); Mockito.verify(cpuUtilizationHistogram) .record(Mockito.anyDouble(), Mockito.any(Tags.class)); Mockito.verify(heapUsedHistogram).record(Mockito.anyDouble(), Mockito.any(Tags.class)); From f704418a2eb77440e4a14a4d953afb07eb709892 Mon Sep 17 00:00:00 2001 From: Arpit Patawat Date: Tue, 29 Apr 2025 05:53:38 +0530 Subject: [PATCH 4/4] Change phase to be same for query and fetch phase Signed-off-by: Arpit Patawat --- .../listener/RTFPerformanceAnalyzerSearchListener.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java b/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java index 7936e647..86abd15a 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java +++ b/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java @@ -39,6 +39,7 @@ public class RTFPerformanceAnalyzerSearchListener LogManager.getLogger(RTFPerformanceAnalyzerSearchListener.class); private static final String SHARD_FETCH_PHASE = "shard_fetch"; private static final String SHARD_QUERY_PHASE = "shard_query"; + private static final String SHARD_QUERY_PLUS_FETCH_PHASE = "shard_query_plus_fetch"; public static final String QUERY_START_TIME = "query_start_time"; public static final String FETCH_START_TIME = "fetch_start_time"; public static final String QUERY_TASK_ID = "query_task_id"; @@ -242,7 +243,7 @@ public void queryPhase(SearchContext searchContext, long tookInNanos) { queryPhaseHistogram.record( queryTimeInMills, createTags(searchContext, SHARD_QUERY_PHASE, false)); queryPlusFetchPhaseHistogram.record( - queryTimeInMills, createTags(searchContext, SHARD_QUERY_PHASE, false)); + queryTimeInMills, createTags(searchContext, SHARD_QUERY_PLUS_FETCH_PHASE, false)); addResourceTrackingCompletionListener( searchContext, queryStartTime, queryTime, SHARD_QUERY_PHASE, false); @@ -270,7 +271,7 @@ public void fetchPhase(SearchContext searchContext, long tookInNanos) { fetchPhaseHistogram.record( fetchTimeInMills, createTags(searchContext, SHARD_FETCH_PHASE, false)); queryPlusFetchPhaseHistogram.record( - fetchTimeInMills, createTags(searchContext, SHARD_FETCH_PHASE, false)); + fetchTimeInMills, createTags(searchContext, SHARD_QUERY_PLUS_FETCH_PHASE, false)); addResourceTrackingCompletionListenerForFetchPhase( searchContext, fetchStartTime, fetchTime, SHARD_FETCH_PHASE, false); @@ -344,7 +345,7 @@ protected void innerOnResponse(Task task) { long totalTime = System.nanoTime() - startTime; double totalTimeInMills = totalTime / 1_000_000.0; double shareFactor = computeShareFactor(phaseTookTime, totalTime); - + searchLatencyHistogram.record( totalTimeInMills, createTags(searchContext, phase, isFailed)); cpuUtilizationHistogram.record(