diff --git a/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java b/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java index 705c6d1d..5e78b4c0 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java +++ b/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java @@ -58,6 +58,7 @@ import org.opensearch.performanceanalyzer.collectors.telemetry.RTFDisksCollector; import org.opensearch.performanceanalyzer.collectors.telemetry.RTFHeapMetricsCollector; import org.opensearch.performanceanalyzer.collectors.telemetry.RTFNodeStatsAllShardsMetricsCollector; +import org.opensearch.performanceanalyzer.collectors.telemetry.RTFShardOperationRateCollector; import org.opensearch.performanceanalyzer.collectors.telemetry.RTFThreadPoolMetricsCollector; import org.opensearch.performanceanalyzer.commons.OSMetricsGeneratorFactory; import org.opensearch.performanceanalyzer.commons.collectors.DisksCollector; @@ -230,6 +231,9 @@ private void scheduleTelemetryCollectors() { new RTFDisksCollector(performanceAnalyzerController, configOverridesWrapper)); scheduledMetricCollectorsExecutor.addScheduledMetricCollector( new RTFHeapMetricsCollector(performanceAnalyzerController, configOverridesWrapper)); + scheduledMetricCollectorsExecutor.addScheduledMetricCollector( + new RTFShardOperationRateCollector( + performanceAnalyzerController, configOverridesWrapper)); scheduledMetricCollectorsExecutor.addScheduledMetricCollector( new RTFThreadPoolMetricsCollector( performanceAnalyzerController, configOverridesWrapper)); diff --git a/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFShardOperationRateCollector.java b/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFShardOperationRateCollector.java new file mode 100644 index 00000000..c398897a --- /dev/null +++ 
b/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFShardOperationRateCollector.java
@@ -0,0 +1,182 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.performanceanalyzer.collectors.telemetry;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.opensearch.core.index.shard.ShardId;
+import org.opensearch.index.shard.IndexShard;
+import org.opensearch.performanceanalyzer.OpenSearchResources;
+import org.opensearch.performanceanalyzer.commons.collectors.PerformanceAnalyzerMetricsCollector;
+import org.opensearch.performanceanalyzer.commons.collectors.TelemetryCollector;
+import org.opensearch.performanceanalyzer.commons.config.overrides.ConfigOverridesWrapper;
+import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration;
+import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics;
+import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics.MetricUnits;
+import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode;
+import org.opensearch.performanceanalyzer.commons.stats.metrics.StatMetrics;
+import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController;
+import org.opensearch.performanceanalyzer.util.Utils;
+import org.opensearch.telemetry.metrics.Counter;
+import org.opensearch.telemetry.metrics.MetricsRegistry;
+import org.opensearch.telemetry.metrics.tags.Tags;
+
+/**
+ * This collector measures the indexing and search rate per shard. The published value is the
+ * difference between the current and the previous collection's cumulative operation count. For
+ * example, if the previous collection saw 10 operations and the current one sees 12, the
+ * collector publishes 2 ops/interval.
+ *
+ * <p>The first collection that sees a shard only records its baseline and publishes nothing.
+ */
+public class RTFShardOperationRateCollector extends PerformanceAnalyzerMetricsCollector
+        implements TelemetryCollector {
+
+    private static final Logger LOG = LogManager.getLogger(RTFShardOperationRateCollector.class);
+    public static final int SAMPLING_TIME_INTERVAL =
+            MetricsConfiguration.CONFIG_MAP.get(RTFShardOperationRateCollector.class)
+                    .samplingInterval;
+
+    private Counter indexingRateCounter;
+    private Counter searchRateCounter;
+
+    // Cumulative operation counts seen per shard in the previous collection.
+    private final Map<ShardId, Long> prevIndexingOps;
+    private final Map<ShardId, Long> prevSearchOps;
+
+    private MetricsRegistry metricsRegistry;
+    private boolean metricsInitialized;
+    private final PerformanceAnalyzerController controller;
+    private final ConfigOverridesWrapper configOverridesWrapper;
+
+    public RTFShardOperationRateCollector(
+            PerformanceAnalyzerController controller,
+            ConfigOverridesWrapper configOverridesWrapper) {
+        super(
+                SAMPLING_TIME_INTERVAL,
+                "RTFShardOperationRateCollector",
+                StatMetrics.RTF_SHARD_OPERATION_RATE_COLLECTOR_EXECUTION_TIME,
+                StatExceptionCode.RTF_SHARD_OPERATION_RATE_COLLECTOR_ERROR);
+
+        this.controller = controller;
+        this.configOverridesWrapper = configOverridesWrapper;
+        this.metricsInitialized = false;
+
+        this.prevIndexingOps = new HashMap<>();
+        this.prevSearchOps = new HashMap<>();
+    }
+
+    /**
+     * Publishes, for every shard returned by {@link Utils#getShards()}, the positive change in
+     * its index and query counts since the previous collection.
+     *
+     * @param startTime collection start time; not used by this collector
+     */
+    @Override
+    public void collectMetrics(long startTime) {
+        if (controller.isCollectorDisabled(configOverridesWrapper, getCollectorName())) {
+            LOG.info("RTFShardOperationRateCollector is disabled. Skipping collection.");
+            return;
+        }
+
+        metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry();
+        if (metricsRegistry == null) {
+            LOG.error("Could not get the instance of MetricsRegistry class");
+            return;
+        }
+
+        initializeMetricsIfNeeded();
+        LOG.debug("Executing collect metrics for RTFShardOperationRateCollector");
+
+        // Get all shards
+        Map<ShardId, IndexShard> currentShards = Utils.getShards();
+
+        for (Map.Entry<ShardId, IndexShard> entry : currentShards.entrySet()) {
+            ShardId shardId = entry.getKey();
+            IndexShard shard = entry.getValue();
+
+            try {
+                long currentIndexingOps = shard.indexingStats().getTotal().getIndexCount();
+                long currentSearchOps = shard.searchStats().getTotal().getQueryCount();
+
+                // A shard without a recorded baseline publishes nothing on this pass.
+                if (prevIndexingOps.containsKey(shardId)) {
+                    processIndexingOperations(shardId, currentIndexingOps);
+                }
+
+                if (prevSearchOps.containsKey(shardId)) {
+                    processSearchOperations(shardId, currentSearchOps);
+                }
+
+                // Update previous values for next collection
+                prevIndexingOps.put(shardId, currentIndexingOps);
+                prevSearchOps.put(shardId, currentSearchOps);
+            } catch (Exception e) {
+                // A trailing Throwable argument makes Log4j log the stack trace as well.
+                LOG.error(
+                        "Error collecting indexing/search rate metrics for shard {}: {}",
+                        shardId,
+                        e.getMessage(),
+                        e);
+            }
+        }
+
+        // Forget shards that are no longer present: this bounds the maps and prevents a
+        // shard that comes back later from being compared against a stale count.
+        prevIndexingOps.keySet().retainAll(currentShards.keySet());
+        prevSearchOps.keySet().retainAll(currentShards.keySet());
+    }
+
+    /** Adds the positive change in the shard's index count to the indexing counter. */
+    private void processIndexingOperations(ShardId shardId, long currentIndexingOps) {
+        long indexingOpsDiff = Math.max(0, currentIndexingOps - prevIndexingOps.get(shardId));
+
+        if (indexingOpsDiff > 0) {
+            Tags tags = createTags(shardId);
+            indexingRateCounter.add(indexingOpsDiff, tags);
+        }
+    }
+
+    /** Adds the positive change in the shard's query count to the search counter. */
+    private void processSearchOperations(ShardId shardId, long currentSearchOps) {
+        long searchOpsDiff = Math.max(0, currentSearchOps - prevSearchOps.get(shardId));
+
+        if (searchOpsDiff > 0) {
+            Tags tags = createTags(shardId);
+            searchRateCounter.add(searchOpsDiff, tags);
+        }
+    }
+
+    // attributes= {index_name="test", shard_id="0"}
+    private Tags createTags(ShardId shardId) {
+        return Tags.create()
+                .addTag(RTFMetrics.CommonDimension.INDEX_NAME.toString(), shardId.getIndexName())
+                .addTag(
+                        RTFMetrics.CommonDimension.SHARD_ID.toString(),
+                        String.valueOf(shardId.getId()));
+    }
+
+    /** Creates both counters on the first call; later calls do nothing. */
+    private void initializeMetricsIfNeeded() {
+        if (!metricsInitialized) {
+            indexingRateCounter =
+                    metricsRegistry.createCounter(
+                            RTFMetrics.OperationsValue.Constants.INDEXING_RATE,
+                            "Indexing operations per shard",
+                            MetricUnits.RATE.toString());
+
+            searchRateCounter =
+                    metricsRegistry.createCounter(
+                            RTFMetrics.OperationsValue.Constants.SEARCH_RATE,
+                            "Search operations per shard",
+                            MetricUnits.RATE.toString());
+
+            metricsInitialized = true;
+        }
+    }
+}
diff --git a/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java b/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java index 6b7921cc..86abd15a 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java +++ b/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java @@ -17,6 +17,7 @@ import org.opensearch.performanceanalyzer.OpenSearchResources; import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector; import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics; +import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics.OperationsValue; import org.opensearch.performanceanalyzer.commons.util.Util; import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; import org.opensearch.performanceanalyzer.util.Utils; @@ -38,6 +39,7 @@ public class RTFPerformanceAnalyzerSearchListener LogManager.getLogger(RTFPerformanceAnalyzerSearchListener.class); private static final String SHARD_FETCH_PHASE = "shard_fetch"; private static final String SHARD_QUERY_PHASE = "shard_query"; + private static final String SHARD_QUERY_PLUS_FETCH_PHASE = "shard_query_plus_fetch"; public static final String QUERY_START_TIME = "query_start_time"; public static final String FETCH_START_TIME = 
"fetch_start_time"; public static final String QUERY_TASK_ID = "query_task_id"; @@ -47,6 +49,10 @@ public class RTFPerformanceAnalyzerSearchListener private final PerformanceAnalyzerController controller; private final Histogram cpuUtilizationHistogram; private final Histogram heapUsedHistogram; + private final Histogram queryPhaseHistogram; + private final Histogram fetchPhaseHistogram; + private final Histogram queryPlusFetchPhaseHistogram; + private final Histogram searchLatencyHistogram; private final int numProcessors; public RTFPerformanceAnalyzerSearchListener(final PerformanceAnalyzerController controller) { @@ -55,6 +61,15 @@ public RTFPerformanceAnalyzerSearchListener(final PerformanceAnalyzerController createCPUUtilizationHistogram(OpenSearchResources.INSTANCE.getMetricsRegistry()); this.heapUsedHistogram = createHeapUsedHistogram(OpenSearchResources.INSTANCE.getMetricsRegistry()); + this.queryPhaseHistogram = + createQueryPhaseHistogram(OpenSearchResources.INSTANCE.getMetricsRegistry()); + this.fetchPhaseHistogram = + createFetchPhaseHistogram(OpenSearchResources.INSTANCE.getMetricsRegistry()); + this.queryPlusFetchPhaseHistogram = + createQueryPlusFetchPhaseHistogram( + OpenSearchResources.INSTANCE.getMetricsRegistry()); + this.searchLatencyHistogram = + createSearchLatencyHistogram(OpenSearchResources.INSTANCE.getMetricsRegistry()); this.threadLocal = ThreadLocal.withInitial(() -> new HashMap()); this.numProcessors = Runtime.getRuntime().availableProcessors(); } @@ -83,6 +98,58 @@ private Histogram createHeapUsedHistogram(MetricsRegistry metricsRegistry) { } } + private Histogram createQueryPhaseHistogram(MetricsRegistry metricsRegistry) { + if (metricsRegistry != null) { + return metricsRegistry.createHistogram( + RTFMetrics.OperationsValue.QUERY_PHASE_LATENCY.toString(), + "Query phase latency per shard", + RTFMetrics.MetricUnits.MILLISECOND.toString()); + } else { + LOG.debug("MetricsRegistry is null"); + return null; + } + } + + private Histogram 
createFetchPhaseHistogram(MetricsRegistry metricsRegistry) { + if (metricsRegistry != null) { + return metricsRegistry.createHistogram( + RTFMetrics.OperationsValue.FETCH_PHASE_LATENCY.toString(), + "Fetch phase latency per shard", + RTFMetrics.MetricUnits.MILLISECOND.toString()); + } else { + LOG.debug("MetricsRegistry is null"); + return null; + } + } + + // This histogram will help to get the sum of latencies for query and fetch phase using getSum + // over an interval. + private Histogram createQueryPlusFetchPhaseHistogram(MetricsRegistry metricsRegistry) { + if (metricsRegistry != null) { + return metricsRegistry.createHistogram( + RTFMetrics.OperationsValue.QUERY_PLUS_FETCH_PHASE_LATENCY.toString(), + "Query plus fetch phase latency per shard", + RTFMetrics.MetricUnits.MILLISECOND.toString()); + } else { + LOG.debug("MetricsRegistry is null"); + return null; + } + } + + // This histogram will help to get the total latency for search request using getMax over an + // interval. + private Histogram createSearchLatencyHistogram(MetricsRegistry metricsRegistry) { + if (metricsRegistry != null) { + return metricsRegistry.createHistogram( + OperationsValue.SEARCH_LATENCY.toString(), + "Total Search latency per shard", + RTFMetrics.MetricUnits.MILLISECOND.toString()); + } else { + LOG.debug("MetricsRegistry is null"); + return null; + } + } + @Override public String toString() { return RTFPerformanceAnalyzerSearchListener.class.getSimpleName(); @@ -171,6 +238,13 @@ public void preQueryPhase(SearchContext searchContext) { public void queryPhase(SearchContext searchContext, long tookInNanos) { long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, System.nanoTime()); long queryTime = (System.nanoTime() - queryStartTime); + double queryTimeInMills = queryTime / 1_000_000.0; + + queryPhaseHistogram.record( + queryTimeInMills, createTags(searchContext, SHARD_QUERY_PHASE, false)); + queryPlusFetchPhaseHistogram.record( + queryTimeInMills, 
createTags(searchContext, SHARD_QUERY_PLUS_FETCH_PHASE, false)); + addResourceTrackingCompletionListener( searchContext, queryStartTime, queryTime, SHARD_QUERY_PHASE, false); } @@ -192,6 +266,13 @@ public void preFetchPhase(SearchContext searchContext) { public void fetchPhase(SearchContext searchContext, long tookInNanos) { long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, System.nanoTime()); long fetchTime = (System.nanoTime() - fetchStartTime); + double fetchTimeInMills = fetchTime / 1_000_000.0; + + fetchPhaseHistogram.record( + fetchTimeInMills, createTags(searchContext, SHARD_FETCH_PHASE, false)); + queryPlusFetchPhaseHistogram.record( + fetchTimeInMills, createTags(searchContext, SHARD_QUERY_PLUS_FETCH_PHASE, false)); + addResourceTrackingCompletionListenerForFetchPhase( searchContext, fetchStartTime, fetchTime, SHARD_FETCH_PHASE, false); } @@ -262,32 +343,21 @@ protected void innerOnResponse(Task task) { * overall start time. */ long totalTime = System.nanoTime() - startTime; + double totalTimeInMills = totalTime / 1_000_000.0; double shareFactor = computeShareFactor(phaseTookTime, totalTime); + + searchLatencyHistogram.record( + totalTimeInMills, createTags(searchContext, phase, isFailed)); cpuUtilizationHistogram.record( Utils.calculateCPUUtilization( numProcessors, totalTime, task.getTotalResourceStats().getCpuTimeInNanos(), shareFactor), - createTags()); + createTags(searchContext, phase, isFailed)); heapUsedHistogram.record( Math.max(0, task.getTotalResourceStats().getMemoryInBytes() * shareFactor), - createTags()); - } - - private Tags createTags() { - return Tags.create() - .addTag( - RTFMetrics.CommonDimension.INDEX_NAME.toString(), - searchContext.request().shardId().getIndex().getName()) - .addTag( - RTFMetrics.CommonDimension.INDEX_UUID.toString(), - searchContext.request().shardId().getIndex().getUUID()) - .addTag( - RTFMetrics.CommonDimension.SHARD_ID.toString(), - searchContext.request().shardId().getId()) - 
.addTag(RTFMetrics.CommonDimension.OPERATION.toString(), phase) - .addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed); + createTags(searchContext, phase, isFailed)); } @Override @@ -297,6 +367,21 @@ protected void innerOnFailure(Exception e) { }; } + private Tags createTags(SearchContext searchContext, String phase, boolean isFailed) { + return Tags.create() + .addTag( + RTFMetrics.CommonDimension.INDEX_NAME.toString(), + searchContext.request().shardId().getIndex().getName()) + .addTag( + RTFMetrics.CommonDimension.INDEX_UUID.toString(), + searchContext.request().shardId().getIndex().getUUID()) + .addTag( + RTFMetrics.CommonDimension.SHARD_ID.toString(), + searchContext.request().shardId().getId()) + .addTag(RTFMetrics.CommonDimension.OPERATION.toString(), phase) + .addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed); + } + @VisibleForTesting static double computeShareFactor(long phaseTookTime, long totalTime) { return Math.min(1, ((double) phaseTookTime) / Math.max(1.0, totalTime)); diff --git a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java index 6eb64185..71463608 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java +++ b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java @@ -37,6 +37,7 @@ public final class RTFPerformanceAnalyzerTransportChannel implements TransportCh private long operationStartTime; private Histogram cpuUtilizationHistogram; + private Histogram indexingLatencyHistogram; private TransportChannel original; private String indexName; @@ -49,11 +50,13 @@ public final class RTFPerformanceAnalyzerTransportChannel implements TransportCh void set( TransportChannel original, Histogram cpuUtilizationHistogram, + Histogram indexingLatencyHistogram, String 
indexName, ShardId shardId, boolean bPrimary) { this.original = original; this.cpuUtilizationHistogram = cpuUtilizationHistogram; + this.indexingLatencyHistogram = indexingLatencyHistogram; this.indexName = indexName; this.shardId = shardId; this.primary = bPrimary; @@ -90,6 +93,10 @@ public void sendResponse(Exception exception) throws IOException { private void emitMetrics(boolean isFailed) { double cpuUtilization = calculateCPUUtilization(operationStartTime, cpuStartTime); recordCPUUtilizationMetric(shardId, cpuUtilization, OPERATION_SHARD_BULK, isFailed); + + long latencyInNanos = System.nanoTime() - operationStartTime; + double latencyInMillis = latencyInNanos / 1_000_000.0; + recordIndexingLatencyMetric(shardId, latencyInMillis, OPERATION_SHARD_BULK, isFailed); } private double calculateCPUUtilization(long phaseStartTime, long phaseCPUStartTime) { @@ -100,24 +107,32 @@ private double calculateCPUUtilization(long phaseStartTime, long phaseCPUStartTi numProcessors, (System.nanoTime() - phaseStartTime), totalCpuTime, 1.0); } + @VisibleForTesting + void recordIndexingLatencyMetric( + ShardId shardId, double indexingLatency, String operation, boolean isFailed) { + indexingLatencyHistogram.record(indexingLatency, createTags(shardId, operation, isFailed)); + } + @VisibleForTesting void recordCPUUtilizationMetric( ShardId shardId, double cpuUtilization, String operation, boolean isFailed) { - cpuUtilizationHistogram.record( - cpuUtilization, - Tags.create() - .addTag( - RTFMetrics.CommonDimension.INDEX_NAME.toString(), - shardId.getIndex().getName()) - .addTag( - RTFMetrics.CommonDimension.INDEX_UUID.toString(), - shardId.getIndex().getUUID()) - .addTag(RTFMetrics.CommonDimension.SHARD_ID.toString(), shardId.getId()) - .addTag(RTFMetrics.CommonDimension.OPERATION.toString(), operation) - .addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed) - .addTag( - RTFMetrics.CommonDimension.SHARD_ROLE.toString(), - primary ? 
SHARD_ROLE_PRIMARY : SHARD_ROLE_REPLICA)); + cpuUtilizationHistogram.record(cpuUtilization, createTags(shardId, operation, isFailed)); + } + + private Tags createTags(ShardId shardId, String operation, boolean isFailed) { + return Tags.create() + .addTag( + RTFMetrics.CommonDimension.INDEX_NAME.toString(), + shardId.getIndex().getName()) + .addTag( + RTFMetrics.CommonDimension.INDEX_UUID.toString(), + shardId.getIndex().getUUID()) + .addTag(RTFMetrics.CommonDimension.SHARD_ID.toString(), shardId.getId()) + .addTag(RTFMetrics.CommonDimension.OPERATION.toString(), operation) + .addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed) + .addTag( + RTFMetrics.CommonDimension.SHARD_ROLE.toString(), + primary ? SHARD_ROLE_PRIMARY : SHARD_ROLE_REPLICA); } // This function is called from the security plugin using reflection. Do not diff --git a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandler.java b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandler.java index 82a0abe6..61ee6996 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandler.java +++ b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandler.java @@ -15,6 +15,7 @@ import org.opensearch.performanceanalyzer.OpenSearchResources; import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector; import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics; +import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics.MetricUnits; import org.opensearch.performanceanalyzer.commons.util.Util; import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; import org.opensearch.tasks.Task; @@ -38,12 +39,14 @@ public final class RTFPerformanceAnalyzerTransportRequestHandler actualHandler; private boolean logOnce = false; private final Histogram 
cpuUtilizationHistogram; + private final Histogram indexingLatencyHistogram; RTFPerformanceAnalyzerTransportRequestHandler( TransportRequestHandler actualHandler, PerformanceAnalyzerController controller) { this.actualHandler = actualHandler; this.controller = controller; this.cpuUtilizationHistogram = createCPUUtilizationHistogram(); + this.indexingLatencyHistogram = createIndexingLatencyHistogram(); } private Histogram createCPUUtilizationHistogram() { @@ -58,6 +61,18 @@ private Histogram createCPUUtilizationHistogram() { } } + private Histogram createIndexingLatencyHistogram() { + MetricsRegistry metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry(); + if (metricsRegistry != null) { + return metricsRegistry.createHistogram( + RTFMetrics.OperationsValue.INDEXING_LATENCY.toString(), + "Indexing Latency per shard for an operation", + MetricUnits.MILLISECOND.toString()); + } else { + return null; + } + } + @Override public void messageReceived(T request, TransportChannel channel, Task task) throws Exception { actualHandler.messageReceived(request, getChannel(request, channel, task), task); @@ -110,7 +125,12 @@ private TransportChannel getShardBulkChannel(T request, TransportChannel channel try { rtfPerformanceAnalyzerTransportChannel.set( - channel, cpuUtilizationHistogram, bsr.index(), bsr.shardId(), bPrimary); + channel, + cpuUtilizationHistogram, + indexingLatencyHistogram, + bsr.index(), + bsr.shardId(), + bPrimary); } catch (Exception ex) { if (!logOnce) { LOG.error(ex); diff --git a/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java b/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java index 26620455..fb04acc3 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java +++ b/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java @@ -49,6 +49,9 @@ public static void configureMetrics() { MetricsConfiguration.CONFIG_MAP.put(ShardIndexingPressureMetricsCollector.class, cdefault); 
MetricsConfiguration.CONFIG_MAP.put(RTFDisksCollector.class, cdefault); MetricsConfiguration.CONFIG_MAP.put(RTFHeapMetricsCollector.class, cdefault); + MetricsConfiguration.CONFIG_MAP.put( + RTFShardOperationRateCollector.class, + new MetricsConfiguration.MetricConfig(5000, 0)); MetricsConfiguration.CONFIG_MAP.put( RTFNodeStatsAllShardsMetricsCollector.class, new MetricsConfiguration.MetricConfig(60000, 0));
diff --git a/src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFShardOperationRateCollectorTests.java b/src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFShardOperationRateCollectorTests.java
new file mode 100644
index 00000000..3803739b
--- /dev/null
+++ b/src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFShardOperationRateCollectorTests.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.performanceanalyzer.collectors.telemetry;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyDouble;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opensearch.common.xcontent.XContentType;
+import org.opensearch.index.query.QueryBuilders;
+import org.opensearch.indices.IndicesService;
+import org.opensearch.performanceanalyzer.OpenSearchResources;
+import org.opensearch.performanceanalyzer.commons.config.overrides.ConfigOverridesWrapper;
+import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration;
+import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics;
+import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController;
+import org.opensearch.telemetry.metrics.Counter;
+import org.opensearch.telemetry.metrics.MetricsRegistry;
+import org.opensearch.test.OpenSearchSingleNodeTestCase;
+
+/** Exercises {@link RTFShardOperationRateCollector} against a single-node test cluster. */
+public class RTFShardOperationRateCollectorTests extends OpenSearchSingleNodeTestCase {
+
+    private long startTimeInMills = 1153721339;
+    private static final String TEST_INDEX = "test";
+    private RTFShardOperationRateCollector rtfShardOperationRateCollector;
+
+    @Mock private MetricsRegistry metricsRegistry;
+    @Mock private Counter indexingRateCounter;
+    @Mock private Counter searchRateCounter;
+    @Mock private ConfigOverridesWrapper configOverridesWrapper;
+    @Mock private PerformanceAnalyzerController performanceAnalyzerController;
+
+    @Before
+    public void init() {
+        MockitoAnnotations.initMocks(this);
+
+        MetricsConfiguration.CONFIG_MAP.put(
+                RTFShardOperationRateCollector.class, MetricsConfiguration.cdefault);
+        IndicesService indicesService = getInstanceFromNode(IndicesService.class);
+        OpenSearchResources.INSTANCE.setIndicesService(indicesService);
+        OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry);
+
+        // Hand out the counter that matches the requested metric name.
+        when(metricsRegistry.createCounter(anyString(), anyString(), anyString()))
+                .thenAnswer(
+                        invocationOnMock -> {
+                            String counterName = (String) invocationOnMock.getArguments()[0];
+                            if (counterName.contains(
+                                    RTFMetrics.OperationsValue.Constants.INDEXING_RATE)) {
+                                return indexingRateCounter;
+                            }
+                            return searchRateCounter;
+                        });
+
+        when(performanceAnalyzerController.isCollectorDisabled(any(), anyString()))
+                .thenReturn(false);
+
+        rtfShardOperationRateCollector =
+                spy(
+                        new RTFShardOperationRateCollector(
+                                performanceAnalyzerController, configOverridesWrapper));
+    }
+
+    @Test
+    public void testCollectMetrics() throws IOException {
+        createIndex(TEST_INDEX);
+        rtfShardOperationRateCollector.collectMetrics(startTimeInMills);
+
+        // first time collection does not publish metrics
+        verify(indexingRateCounter, never()).add(anyDouble(), any());
+        verify(searchRateCounter, never()).add(anyDouble(), any());
+
+        startTimeInMills += 5000;
+        rtfShardOperationRateCollector.collectMetrics(startTimeInMills);
+
+        // 0 operation count does not publish metrics
+        verify(indexingRateCounter, never()).add(anyDouble(), any());
+        verify(searchRateCounter, never()).add(anyDouble(), any());
+
+        // creating indexing and search operation
+        client().prepareIndex(TEST_INDEX)
+                .setId("1")
+                .setSource("{\"field\":\"value1\"}", XContentType.JSON)
+                .get();
+        client().prepareIndex(TEST_INDEX)
+                .setId("2")
+                .setSource("{\"field\":\"value2\"}", XContentType.JSON)
+                .get();
+
+        client().admin().indices().prepareRefresh(TEST_INDEX).get();
+        client().prepareSearch(TEST_INDEX).setQuery(QueryBuilders.matchAllQuery()).get();
+
+        startTimeInMills += 5000;
+        rtfShardOperationRateCollector.collectMetrics(startTimeInMills);
+
+        verify(indexingRateCounter, atLeastOnce()).add(anyDouble(), any());
+        verify(searchRateCounter, atLeastOnce()).add(anyDouble(), any());
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        super.tearDown();
+    }
+}
diff --git a/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java b/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java index 16aba4bc..5c2f34e9 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java @@ -42,6 +42,10 @@ public class RTFPerformanceAnalyzerSearchListenerTests { @Mock private MetricsRegistry metricsRegistry; @Mock private Histogram 
cpuUtilizationHistogram; @Mock private Histogram heapUsedHistogram; + @Mock private Histogram queryPhaseHistogram; + @Mock private Histogram fetchPhaseHistogram; + @Mock private Histogram queryPlusFetchPhaseHistogram; + @Mock private Histogram searchLatencyHistogram; @Mock private Index index; @Mock private TaskResourceUsage taskResourceUsage; @@ -68,6 +72,30 @@ public void init() { metricsRegistry.createHistogram( Mockito.eq("heap_allocated"), Mockito.anyString(), Mockito.eq("B"))) .thenReturn(heapUsedHistogram); + Mockito.when( + metricsRegistry.createHistogram( + Mockito.eq("query_phase_latency"), + Mockito.anyString(), + Mockito.eq("ms"))) + .thenReturn(queryPhaseHistogram); + Mockito.when( + metricsRegistry.createHistogram( + Mockito.eq("fetch_phase_latency"), + Mockito.anyString(), + Mockito.eq("ms"))) + .thenReturn(fetchPhaseHistogram); + Mockito.when( + metricsRegistry.createHistogram( + Mockito.eq("query_plus_fetch_phase_histogram"), + Mockito.anyString(), + Mockito.eq("ms"))) + .thenReturn(queryPlusFetchPhaseHistogram); + Mockito.when( + metricsRegistry.createHistogram( + Mockito.eq("search_latency"), + Mockito.anyString(), + Mockito.eq("ms"))) + .thenReturn(searchLatencyHistogram); searchListener = new RTFPerformanceAnalyzerSearchListener(controller); assertEquals( RTFPerformanceAnalyzerSearchListener.class.getSimpleName(), @@ -99,6 +127,9 @@ public void testQueryPhase() { searchListener.preQueryPhase(searchContext); searchListener.queryPhase(searchContext, 0l); Mockito.verify(task).addResourceTrackingCompletionListener(Mockito.any()); + Mockito.verify(queryPhaseHistogram).record(Mockito.anyDouble(), Mockito.any(Tags.class)); + Mockito.verify(queryPlusFetchPhaseHistogram) + .record(Mockito.anyDouble(), Mockito.any(Tags.class)); } @Test @@ -119,6 +150,9 @@ public void testFetchPhase() { searchListener.preFetchPhase(searchContext); searchListener.fetchPhase(searchContext, 0l); Mockito.verify(task).addResourceTrackingCompletionListener(Mockito.any()); + 
Mockito.verify(fetchPhaseHistogram).record(Mockito.anyDouble(), Mockito.any(Tags.class)); + Mockito.verify(queryPlusFetchPhaseHistogram) + .record(Mockito.anyDouble(), Mockito.any(Tags.class)); } @Test @@ -158,6 +192,7 @@ public void testTaskCompletionListener() { NotifyOnceListener taskCompletionListener = rtfSearchListener.createListener(searchContext, 0l, 0l, "test", false); taskCompletionListener.onResponse(task); + Mockito.verify(searchLatencyHistogram).record(Mockito.anyDouble(), Mockito.any(Tags.class)); Mockito.verify(cpuUtilizationHistogram) .record(Mockito.anyDouble(), Mockito.any(Tags.class)); Mockito.verify(heapUsedHistogram).record(Mockito.anyDouble(), Mockito.any(Tags.class)); diff --git a/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannelTests.java b/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannelTests.java index aa4e425b..571d1d91 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannelTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannelTests.java @@ -32,6 +32,7 @@ public class RTFPerformanceAnalyzerTransportChannelTests { @Mock private TransportChannel originalChannel; @Mock private TransportResponse response; @Mock private Histogram cpuUtilizationHistogram; + @Mock private Histogram indexingLatencyHistogram; private ShardId shardId; @Mock private ShardId mockedShardId; @Mock private Index index; @@ -46,7 +47,13 @@ public void init() { String indexName = "testIndex"; shardId = new ShardId(new Index(indexName, "uuid"), 1); channel = new RTFPerformanceAnalyzerTransportChannel(); - channel.set(originalChannel, cpuUtilizationHistogram, indexName, shardId, false); + channel.set( + originalChannel, + cpuUtilizationHistogram, + indexingLatencyHistogram, + indexName, + shardId, + false); 
assertEquals("RTFPerformanceAnalyzerTransportChannelProfile", channel.getProfileName()); assertEquals("RTFPerformanceAnalyzerTransportChannelType", channel.getChannelType()); assertEquals(originalChannel, channel.getInnerChannel()); @@ -71,7 +78,13 @@ public void testResponseWithException() throws IOException { public void testRecordCPUUtilizationMetric() { RTFPerformanceAnalyzerTransportChannel channel = new RTFPerformanceAnalyzerTransportChannel(); - channel.set(originalChannel, cpuUtilizationHistogram, "testIndex", mockedShardId, false); + channel.set( + originalChannel, + cpuUtilizationHistogram, + indexingLatencyHistogram, + "testIndex", + mockedShardId, + false); Mockito.when(mockedShardId.getIndex()).thenReturn(index); Mockito.when(index.getName()).thenReturn("myTestIndex"); Mockito.when(index.getUUID()).thenReturn("abc-def"); @@ -79,4 +92,23 @@ public void testRecordCPUUtilizationMetric() { Mockito.verify(cpuUtilizationHistogram) .record(Mockito.anyDouble(), Mockito.any(Tags.class)); } + + @Test + public void testRecordIndexingLatencyMetric() { + RTFPerformanceAnalyzerTransportChannel channel = + new RTFPerformanceAnalyzerTransportChannel(); + channel.set( + originalChannel, + cpuUtilizationHistogram, + indexingLatencyHistogram, + "testIndex", + mockedShardId, + false); + Mockito.when(mockedShardId.getIndex()).thenReturn(index); + Mockito.when(index.getName()).thenReturn("myTestIndex"); + Mockito.when(index.getUUID()).thenReturn("abc-def"); + channel.recordIndexingLatencyMetric(mockedShardId, 123.456, "bulkShard", false); + Mockito.verify(indexingLatencyHistogram) + .record(Mockito.anyDouble(), Mockito.any(Tags.class)); + } }