From 59fcf914f8c60c39394dcaaf360edfc4bfcb0765 Mon Sep 17 00:00:00 2001 From: Gareth Ellis Date: Mon, 15 Dec 2025 11:39:27 +0100 Subject: [PATCH 1/4] Use wait_for_completion:False instead of relying on timeouts --- esrally/driver/runner.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 1897b4d5f..efa45cdb8 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -700,9 +700,6 @@ class ForceMerge(Runner): """ async def __call__(self, es, params): - # pylint: disable=import-outside-toplevel - import elasticsearch - max_num_segments = params.get("max-num-segments") mode = params.get("mode") merge_params = self._default_kw_params(params) @@ -710,11 +707,8 @@ async def __call__(self, es, params): merge_params["max_num_segments"] = max_num_segments if mode == "polling": complete = False - try: - await es.indices.forcemerge(**merge_params) - complete = True - except elasticsearch.ConnectionTimeout: - pass + merge_params["wait_for_completion"] = False + await es.indices.forcemerge(**merge_params) while not complete: await asyncio.sleep(params.get("poll-period")) tasks = await es.tasks.list(params={"actions": "indices:admin/forcemerge"}) From 505d494f55ffae813fdd9f4fd84e134d754399db Mon Sep 17 00:00:00 2001 From: Gareth Ellis Date: Mon, 15 Dec 2025 11:51:23 +0100 Subject: [PATCH 2/4] Retain old behaviour if < 8.1 --- esrally/driver/runner.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index efa45cdb8..33c7cc15c 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -700,15 +700,27 @@ class ForceMerge(Runner): """ async def __call__(self, es, params): + # pylint: disable=import-outside-toplevel + import elasticsearch + max_num_segments = params.get("max-num-segments") mode = params.get("mode") merge_params = self._default_kw_params(params) if max_num_segments: merge_params["max_num_segments"] = max_num_segments if mode == "polling": - complete = False - merge_params["wait_for_completion"] = False - await es.indices.forcemerge(**merge_params) + es_info = await es.info() + es_version = Version.from_string(es_info["version"]["number"]) + if es_version < Version(8, 1, 0): + try: + await es.indices.forcemerge(**merge_params) + complete = True + except elasticsearch.ConnectionTimeout: + pass + else: + complete = False + merge_params["wait_for_completion"] = False + await es.indices.forcemerge(**merge_params) while not complete: await asyncio.sleep(params.get("poll-period")) tasks = await es.tasks.list(params={"actions": "indices:admin/forcemerge"}) From ca4e7466566fa6266be2a02657d08f0246fd18a5 Mon Sep 17 00:00:00 2001 From: Gareth Ellis Date: Mon, 15 Dec 2025 14:01:58 +0100 Subject: [PATCH 3/4] Update tests to cover new behaviour --- esrally/driver/runner.py | 3 +- tests/driver/runner_test.py | 270 +++++++++++++++++++++++++++++++++++- 2 files changed, 270 insertions(+), 3 deletions(-) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 33c7cc15c..7ac83f892 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -709,9 +709,10 @@ async def __call__(self, es, params): if max_num_segments: merge_params["max_num_segments"] = max_num_segments if mode == "polling": + complete = False es_info = await es.info() es_version = Version.from_string(es_info["version"]["number"]) - if es_version < Version(8, 1, 0): + if es_version < Version(8, 1, 0): # before 8.1.0 wait_for_completion is not supported try: await es.indices.forcemerge(**merge_params) complete = True diff --git a/tests/driver/runner_test.py b/tests/driver/runner_test.py index adccabaac..72bcf5b9a 100644 --- a/tests/driver/runner_test.py +++ b/tests/driver/runner_test.py @@ -1563,15 +1563,262 @@ async def test_force_merge_with_params(self, es): @pytest.mark.asyncio async def test_force_merge_with_polling_no_timeout(self, es): es.indices.forcemerge = mock.AsyncMock() + es.info = mock.AsyncMock( + return_value={ + "name": "es01", + "cluster_name": "docker-cluster", + "cluster_uuid": "7KTGbgcOTgSC0_X8B57-Gg", + "version": { + "number": "8.1.0", + "build_flavor": "default", + "build_type": "docker", + "build_hash": "3700f7679f7d95e36da0b43762189bab189bc53a", + "build_date": "2022-03-03T14:20:00.690422633Z", + "build_snapshot": False, + "lucene_version": "9.0.0", + "minimum_wire_compatibility_version": "7.17.0", + "minimum_index_compatibility_version": "7.0.0", + }, + "tagline": "You Know, for Search", + }, + ) + es.tasks.list = mock.AsyncMock( + side_effect=[ + { + "nodes": { + "Ap3OfntPT7qL4CBeKvamxg": { + "name": "instance-0000000001", + "transport_address": "10.46.79.231:19693", + "host": "10.46.79.231", + "ip": "10.46.79.231:19693", + "roles": ["data", "ingest", "master", "remote_cluster_client", "transform"], + "attributes": { + "logical_availability_zone": "zone-1", + "server_name": "instance-0000000001.64cb4c66f4f24d85b41f120ef2df5526", + "availability_zone": "us-east4-a", + "xpack.installed": "true", + "instance_configuration": "gcp.data.highio.1", + "transform.node": "true", + "region": "unknown-region", + }, + "tasks": { + "Ap3OfntPT7qL4CBeKvamxg:417009036": { + "node": "Ap3OfntPT7qL4CBeKvamxg", + "id": 417009036, + "type": "transport", + "action": "indices:admin/forcemerge", + "start_time_in_millis": 1598018980850, + "running_time_in_nanos": 3659821411, + "cancellable": False, + "headers": {}, + } + }, + } + } + }, + { + "nodes": {}, + }, + ] + ) force_merge = runner.ForceMerge() await force_merge(es, params={"index": "_all", "mode": "polling", "poll-period": 0}) - es.indices.forcemerge.assert_awaited_once_with(index="_all") + es.indices.forcemerge.assert_awaited_once_with(index="_all", wait_for_completion=False) @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_force_merge_with_polling(self, es): + es.indices.forcemerge = mock.AsyncMock() + es.info = mock.AsyncMock( + return_value={ + "name": "es01", + "cluster_name": "docker-cluster", + "cluster_uuid": "7KTGbgcOTgSC0_X8B57-Gg", + "version": { + "number": "8.1.0", + "build_flavor": "default", + "build_type": "docker", + "build_hash": "3700f7679f7d95e36da0b43762189bab189bc53a", + "build_date": "2022-03-03T14:20:00.690422633Z", + "build_snapshot": False, + "lucene_version": "9.0.0", + "minimum_wire_compatibility_version": "7.17.0", + "minimum_index_compatibility_version": "7.0.0", + }, + "tagline": "You Know, for Search", + }, + ) + es.tasks.list = mock.AsyncMock( + side_effect=[ + { + "nodes": { + "Ap3OfntPT7qL4CBeKvamxg": { + "name": "instance-0000000001", + "transport_address": "10.46.79.231:19693", + "host": "10.46.79.231", + "ip": "10.46.79.231:19693", + "roles": ["data", "ingest", "master", "remote_cluster_client", "transform"], + "attributes": { + "logical_availability_zone": "zone-1", + "server_name": "instance-0000000001.64cb4c66f4f24d85b41f120ef2df5526", + "availability_zone": "us-east4-a", + "xpack.installed": "true", + "instance_configuration": "gcp.data.highio.1", + "transform.node": "true", + "region": "unknown-region", + }, + "tasks": { + "Ap3OfntPT7qL4CBeKvamxg:417009036": { + "node": "Ap3OfntPT7qL4CBeKvamxg", + "id": 417009036, + "type": "transport", + "action": "indices:admin/forcemerge", + "start_time_in_millis": 1598018980850, + "running_time_in_nanos": 3659821411, + "cancellable": False, + "headers": {}, + } + }, + } + } + }, + { + "nodes": {}, + }, + ] + ) + force_merge = runner.ForceMerge() + await force_merge(es, params={"index": "_all", "mode": "polling", "poll-period": 0}) + es.indices.forcemerge.assert_awaited_once_with(index="_all", wait_for_completion=False) + + @mock.patch("elasticsearch.Elasticsearch") + @pytest.mark.asyncio + async def test_force_merge_with_polling_and_params(self, es): + es.indices.forcemerge = mock.AsyncMock() + es.info = mock.AsyncMock( + return_value={ + "name": "es01", + "cluster_name": "docker-cluster", + "cluster_uuid": "7KTGbgcOTgSC0_X8B57-Gg", + "version": { + "number": "8.1.0", + "build_flavor": "default", + "build_type": "docker", + "build_hash": "3700f7679f7d95e36da0b43762189bab189bc53a", + "build_date": "2022-03-03T14:20:00.690422633Z", + "build_snapshot": False, + "lucene_version": "9.0.0", + "minimum_wire_compatibility_version": "7.17.0", + "minimum_index_compatibility_version": "7.0.0", + }, + "tagline": "You Know, for Search", + }, + ) + es.tasks.list = mock.AsyncMock( + side_effect=[ + { + "nodes": { + "Ap3OfntPT7qL4CBeKvamxg": { + "name": "instance-0000000001", + "transport_address": "10.46.79.231:19693", + "host": "10.46.79.231", + "ip": "10.46.79.231:19693", + "roles": ["data", "ingest", "master", "remote_cluster_client", "transform"], + "attributes": { + "logical_availability_zone": "zone-1", + "server_name": "instance-0000000001.64cb4c66f4f24d85b41f120ef2df5526", + "availability_zone": "us-east4-a", + "xpack.installed": "true", + "instance_configuration": "gcp.data.highio.1", + "transform.node": "true", + "region": "unknown-region", + }, + "tasks": { + "Ap3OfntPT7qL4CBeKvamxg:417009036": { + "node": "Ap3OfntPT7qL4CBeKvamxg", + "id": 417009036, + "type": "transport", + "action": "indices:admin/forcemerge", + "start_time_in_millis": 1598018980850, + "running_time_in_nanos": 3659821411, + "cancellable": False, + "headers": {}, + } + }, + } + } + }, + { + "nodes": {}, + }, + ] + ) + force_merge = runner.ForceMerge() + # request-timeout should be ignored as mode:polling + await force_merge( + es, + params={ + "index": "_all", + "mode": "polling", + "max-num-segments": 1, + "request-timeout": 50000, + "poll-period": 0, + }, + ) + es.indices.forcemerge.assert_awaited_once_with(index="_all", max_num_segments=1, request_timeout=50000, wait_for_completion=False) + + @mock.patch("elasticsearch.Elasticsearch") + @pytest.mark.asyncio + async def test_force_merge_with_polling_no_timeout_pre_8_1(self, es): + es.indices.forcemerge = mock.AsyncMock() + es.info = mock.AsyncMock( + return_value={ + "name": "es01", + "cluster_name": "escluster", + "cluster_uuid": "4BgOtWNiQ6-zap9zDW2Q1A", + "version": { + "number": "7.17.3", + "build_flavor": "default", + "build_type": "tar", + "build_hash": "5ad023604c8d7416c9eb6c0eadb62b14e766caff", + "build_date": "2022-04-19T08:11:19.070913226Z", + "build_snapshot": False, + "lucene_version": "8.11.1", + "minimum_wire_compatibility_version": "6.8.0", + "minimum_index_compatibility_version": "6.0.0-beta1", + }, + "tagline": "You Know, for Search", + }, + ) + + force_merge = runner.ForceMerge() + await force_merge(es, params={"index": "_all", "mode": "polling", "poll-period": 0}) + es.indices.forcemerge.assert_awaited_once_with(index="_all") + + @mock.patch("elasticsearch.Elasticsearch") + @pytest.mark.asyncio + async def test_force_merge_with_polling_pre_8_1(self, es): es.indices.forcemerge = mock.AsyncMock(side_effect=elasticsearch.ConnectionTimeout(message="connection timeout")) + es.info = mock.AsyncMock( + return_value={ + "name": "es01", + "cluster_name": "escluster", + "cluster_uuid": "4BgOtWNiQ6-zap9zDW2Q1A", + "version": { + "number": "7.17.3", + "build_flavor": "default", + "build_type": "tar", + "build_hash": "5ad023604c8d7416c9eb6c0eadb62b14e766caff", + "build_date": "2022-04-19T08:11:19.070913226Z", + "build_snapshot": False, + "lucene_version": "8.11.1", + "minimum_wire_compatibility_version": "6.8.0", + "minimum_index_compatibility_version": "6.0.0-beta1", + }, + "tagline": "You Know, for Search", + }, + ) es.tasks.list = mock.AsyncMock( side_effect=[ { @@ -1617,8 +1864,27 @@ async def test_force_merge_with_polling(self, es): @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio - async def test_force_merge_with_polling_and_params(self, es): + async def test_force_merge_with_polling_and_params_pre_8_1(self, es): es.indices.forcemerge = mock.AsyncMock(return_value=elasticsearch.ConnectionTimeout("connection timeout")) + es.info = mock.AsyncMock( + return_value={ + "name": "es01", + "cluster_name": "escluster", + "cluster_uuid": "4BgOtWNiQ6-zap9zDW2Q1A", + "version": { + "number": "7.17.3", + "build_flavor": "default", + "build_type": "tar", + "build_hash": "5ad023604c8d7416c9eb6c0eadb62b14e766caff", + "build_date": "2022-04-19T08:11:19.070913226Z", + "build_snapshot": False, + "lucene_version": "8.11.1", + "minimum_wire_compatibility_version": "6.8.0", + "minimum_index_compatibility_version": "6.0.0-beta1", + }, + "tagline": "You Know, for Search", + }, + ) es.tasks.list = mock.AsyncMock( side_effect=[ { From 3d036462dd8830bcf7751200e55b60a330233ad5 Mon Sep 17 00:00:00 2001 From: Gareth Ellis Date: Thu, 18 Dec 2025 10:34:51 +0100 Subject: [PATCH 4/4] Add ability to trace via task_id if present. Update tests --- esrally/driver/runner.py | 16 +- tests/driver/runner_test.py | 389 ++++++++++++------------------------ 2 files changed, 134 insertions(+), 271 deletions(-) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 7ac83f892..f53bcd113 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -709,6 +709,7 @@ async def __call__(self, es, params): if max_num_segments: merge_params["max_num_segments"] = max_num_segments if mode == "polling": + task_id = None complete = False es_info = await es.info() es_version = Version.from_string(es_info["version"]["number"]) @@ -721,13 +722,18 @@ async def __call__(self, es, params): else: complete = False merge_params["wait_for_completion"] = False - await es.indices.forcemerge(**merge_params) + response = await es.indices.forcemerge(**merge_params) + task_id = response.get("task") while not complete: await asyncio.sleep(params.get("poll-period")) - tasks = await es.tasks.list(params={"actions": "indices:admin/forcemerge"}) - if len(tasks["nodes"]) == 0: - # empty nodes response indicates no tasks - complete = True + if task_id: + tasks = await es.tasks.get(task_id=task_id) + complete = tasks.get("completed", False) + else: + tasks = await es.tasks.list(params={"actions": "indices:admin/forcemerge"}) + if len(tasks["nodes"]) == 0: + # empty nodes response indicates no tasks + complete = True else: await es.indices.forcemerge(**merge_params) diff --git a/tests/driver/runner_test.py b/tests/driver/runner_test.py index 72bcf5b9a..ae2875dd9 100644 --- a/tests/driver/runner_test.py +++ b/tests/driver/runner_test.py @@ -1519,6 +1519,79 @@ async def test_bulk_index_success_with_refresh_invalid(self, es): class TestForceMergeRunner: + + def _eight_cluster_info_output(self): + return { + "name": "es01", + "cluster_name": "docker-cluster", + "cluster_uuid": "7KTGbgcOTgSC0_X8B57-Gg", + "version": { + "number": "8.1.0", + "build_flavor": "default", + "build_type": "docker", + "build_hash": "3700f7679f7d95e36da0b43762189bab189bc53a", + "build_date": "2022-03-03T14:20:00.690422633Z", + "build_snapshot": False, + "lucene_version": "9.0.0", + "minimum_wire_compatibility_version": "7.17.0", + "minimum_index_compatibility_version": "7.0.0", + }, + "tagline": "You Know, for Search", + } + + def _seven_cluster_info_output(self): + return { + "name": "es01", + "cluster_name": "escluster", + "cluster_uuid": "4BgOtWNiQ6-zap9zDW2Q1A", + "version": { + "number": "7.17.3", + "build_flavor": "default", + "build_type": "tar", + "build_hash": "5ad023604c8d7416c9eb6c0eadb62b14e766caff", + "build_date": "2022-04-19T08:11:19.070913226Z", + "build_snapshot": False, + "lucene_version": "8.11.1", + "minimum_wire_compatibility_version": "6.8.0", + "minimum_index_compatibility_version": "6.0.0-beta1", + }, + "tagline": "You Know, for Search", + } + + def _task_list_output(self): + return { + "nodes": { + "Ap3OfntPT7qL4CBeKvamxg": { + "name": "instance-0000000001", + "transport_address": "10.46.79.231:19693", + "host": "10.46.79.231", + "ip": "10.46.79.231:19693", + "roles": ["data", "ingest", "master", "remote_cluster_client", "transform"], + "attributes": { + "logical_availability_zone": "zone-1", + "server_name": "instance-0000000001.64cb4c66f4f24d85b41f120ef2df5526", + "availability_zone": "us-east4-a", + "xpack.installed": "true", + "instance_configuration": "gcp.data.highio.1", + "transform.node": "true", + "region": "unknown-region", + }, + "tasks": { + "Ap3OfntPT7qL4CBeKvamxg:417009036": { + "node": "Ap3OfntPT7qL4CBeKvamxg", + "id": 417009036, + "type": "transport", + "action": "indices:admin/forcemerge", + "start_time_in_millis": 1598018980850, + "running_time_in_nanos": 3659821411, + "cancellable": False, + "headers": {}, + } + }, + } + } + } + @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_force_merge_with_defaults(self, es): @@ -1562,60 +1635,14 @@ async def test_force_merge_with_params(self, es): @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_force_merge_with_polling_no_timeout(self, es): - es.indices.forcemerge = mock.AsyncMock() + es.indices.forcemerge = mock.AsyncMock(return_value={"task": "Ap3OfntPT7qL4CBeKvamxg"}) es.info = mock.AsyncMock( - return_value={ - "name": "es01", - "cluster_name": "docker-cluster", - "cluster_uuid": "7KTGbgcOTgSC0_X8B57-Gg", - "version": { - "number": "8.1.0", - "build_flavor": "default", - "build_type": "docker", - "build_hash": "3700f7679f7d95e36da0b43762189bab189bc53a", - "build_date": "2022-03-03T14:20:00.690422633Z", - "build_snapshot": False, - "lucene_version": "9.0.0", - "minimum_wire_compatibility_version": "7.17.0", - "minimum_index_compatibility_version": "7.0.0", - }, - "tagline": "You Know, for Search", - }, + return_value=self._eight_cluster_info_output(), ) + es.tasks.get = mock.AsyncMock(return_value={"completed": True}) es.tasks.list = mock.AsyncMock( side_effect=[ - { - "nodes": { - "Ap3OfntPT7qL4CBeKvamxg": { - "name": "instance-0000000001", - "transport_address": "10.46.79.231:19693", - "host": "10.46.79.231", - "ip": "10.46.79.231:19693", - "roles": ["data", "ingest", "master", "remote_cluster_client", "transform"], - "attributes": { - "logical_availability_zone": "zone-1", - "server_name": "instance-0000000001.64cb4c66f4f24d85b41f120ef2df5526", - "availability_zone": "us-east4-a", - "xpack.installed": "true", - "instance_configuration": "gcp.data.highio.1", - "transform.node": "true", - "region": "unknown-region", - }, - "tasks": { - "Ap3OfntPT7qL4CBeKvamxg:417009036": { - "node": "Ap3OfntPT7qL4CBeKvamxg", - "id": 417009036, - "type": "transport", - "action": "indices:admin/forcemerge", - "start_time_in_millis": 1598018980850, - "running_time_in_nanos": 3659821411, - "cancellable": False, - "headers": {}, - } - }, - } - } - }, + self._task_list_output(), { "nodes": {}, }, @@ -1625,64 +1652,20 @@ async def test_force_merge_with_polling_no_timeout(self, es): force_merge = runner.ForceMerge() await force_merge(es, params={"index": "_all", "mode": "polling", "poll-period": 0}) es.indices.forcemerge.assert_awaited_once_with(index="_all", wait_for_completion=False) + es.tasks.get.assert_awaited_once_with(task_id="Ap3OfntPT7qL4CBeKvamxg") + es.tasks.list.assert_not_awaited() @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_force_merge_with_polling(self, es): - es.indices.forcemerge = mock.AsyncMock() + es.indices.forcemerge = mock.AsyncMock(return_value={"task": "Ap3OfntPT7qL4CBeKvamxg"}) es.info = mock.AsyncMock( - return_value={ - "name": "es01", - "cluster_name": "docker-cluster", - "cluster_uuid": "7KTGbgcOTgSC0_X8B57-Gg", - "version": { - "number": "8.1.0", - "build_flavor": "default", - "build_type": "docker", - "build_hash": "3700f7679f7d95e36da0b43762189bab189bc53a", - "build_date": "2022-03-03T14:20:00.690422633Z", - "build_snapshot": False, - "lucene_version": "9.0.0", - "minimum_wire_compatibility_version": "7.17.0", - "minimum_index_compatibility_version": "7.0.0", - }, - "tagline": "You Know, for Search", - }, + return_value=self._eight_cluster_info_output(), ) + es.tasks.get = mock.AsyncMock(return_value={"completed": True}) es.tasks.list = mock.AsyncMock( side_effect=[ - { - "nodes": { - "Ap3OfntPT7qL4CBeKvamxg": { - "name": "instance-0000000001", - "transport_address": "10.46.79.231:19693", - "host": "10.46.79.231", - "ip": "10.46.79.231:19693", - "roles": ["data", "ingest", "master", "remote_cluster_client", "transform"], - "attributes": { - "logical_availability_zone": "zone-1", - "server_name": "instance-0000000001.64cb4c66f4f24d85b41f120ef2df5526", - "availability_zone": "us-east4-a", - "xpack.installed": "true", - "instance_configuration": "gcp.data.highio.1", - "transform.node": "true", - "region": "unknown-region", - }, - "tasks": { - "Ap3OfntPT7qL4CBeKvamxg:417009036": { - "node": "Ap3OfntPT7qL4CBeKvamxg", - "id": 417009036, - "type": "transport", - "action": "indices:admin/forcemerge", - "start_time_in_millis": 1598018980850, - "running_time_in_nanos": 3659821411, - "cancellable": False, - "headers": {}, - } - }, - } - } - }, + self._task_list_output, { "nodes": {}, }, @@ -1691,64 +1674,48 @@ async def test_force_merge_with_polling(self, es): force_merge = runner.ForceMerge() await force_merge(es, params={"index": "_all", "mode": "polling", "poll-period": 0}) es.indices.forcemerge.assert_awaited_once_with(index="_all", wait_for_completion=False) + es.tasks.get.assert_awaited_once_with(task_id="Ap3OfntPT7qL4CBeKvamxg") + es.tasks.list.assert_not_awaited() @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_force_merge_with_polling_and_params(self, es): - es.indices.forcemerge = mock.AsyncMock() - es.info = mock.AsyncMock( - return_value={ - "name": "es01", - "cluster_name": "docker-cluster", - "cluster_uuid": "7KTGbgcOTgSC0_X8B57-Gg", - "version": { - "number": "8.1.0", - "build_flavor": "default", - "build_type": "docker", - "build_hash": "3700f7679f7d95e36da0b43762189bab189bc53a", - "build_date": "2022-03-03T14:20:00.690422633Z", - "build_snapshot": False, - "lucene_version": "9.0.0", - "minimum_wire_compatibility_version": "7.17.0", - "minimum_index_compatibility_version": "7.0.0", + es.indices.forcemerge = mock.AsyncMock(return_value={"task": "Ap3OfntPT7qL4CBeKvamxg"}) + es.info = mock.AsyncMock(return_value=self._eight_cluster_info_output()) + es.tasks.get = mock.AsyncMock(return_value={"completed": True}) + es.tasks.list = mock.AsyncMock( + side_effect=[ + self._task_list_output(), + { + "nodes": {}, }, - "tagline": "You Know, for Search", + ] + ) + force_merge = runner.ForceMerge() + # request-timeout should be ignored as mode:polling + await force_merge( + es, + params={ + "index": "_all", + "mode": "polling", + "max-num-segments": 1, + "request-timeout": 50000, + "poll-period": 0, }, ) + es.indices.forcemerge.assert_awaited_once_with(index="_all", max_num_segments=1, request_timeout=50000, wait_for_completion=False) + es.tasks.get.assert_awaited_once_with(task_id="Ap3OfntPT7qL4CBeKvamxg") + es.tasks.list.assert_not_awaited() + + @mock.patch("elasticsearch.Elasticsearch") + @pytest.mark.asyncio + async def test_force_merge_with_polling_and_params_missing_task_id(self, es): + es.indices.forcemerge = mock.AsyncMock(return_value={}) + es.info = mock.AsyncMock(return_value=self._eight_cluster_info_output()) + es.tasks.get = mock.AsyncMock(return_value={"completed": True}) es.tasks.list = mock.AsyncMock( side_effect=[ - { - "nodes": { - "Ap3OfntPT7qL4CBeKvamxg": { - "name": "instance-0000000001", - "transport_address": "10.46.79.231:19693", - "host": "10.46.79.231", - "ip": "10.46.79.231:19693", - "roles": ["data", "ingest", "master", "remote_cluster_client", "transform"], - "attributes": { - "logical_availability_zone": "zone-1", - "server_name": "instance-0000000001.64cb4c66f4f24d85b41f120ef2df5526", - "availability_zone": "us-east4-a", - "xpack.installed": "true", - "instance_configuration": "gcp.data.highio.1", - "transform.node": "true", - "region": "unknown-region", - }, - "tasks": { - "Ap3OfntPT7qL4CBeKvamxg:417009036": { - "node": "Ap3OfntPT7qL4CBeKvamxg", - "id": 417009036, - "type": "transport", - "action": "indices:admin/forcemerge", - "start_time_in_millis": 1598018980850, - "running_time_in_nanos": 3659821411, - "cancellable": False, - "headers": {}, - } - }, - } - } - }, + self._task_list_output(), { "nodes": {}, }, @@ -1767,29 +1734,15 @@ async def test_force_merge_with_polling_and_params(self, es): }, ) es.indices.forcemerge.assert_awaited_once_with(index="_all", max_num_segments=1, request_timeout=50000, wait_for_completion=False) + es.tasks.get.assert_not_awaited() + es.tasks.list.assert_awaited() @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_force_merge_with_polling_no_timeout_pre_8_1(self, es): es.indices.forcemerge = mock.AsyncMock() es.info = mock.AsyncMock( - return_value={ - "name": "es01", - "cluster_name": "escluster", - "cluster_uuid": "4BgOtWNiQ6-zap9zDW2Q1A", - "version": { - "number": "7.17.3", - "build_flavor": "default", - "build_type": "tar", - "build_hash": "5ad023604c8d7416c9eb6c0eadb62b14e766caff", - "build_date": "2022-04-19T08:11:19.070913226Z", - "build_snapshot": False, - "lucene_version": "8.11.1", - "minimum_wire_compatibility_version": "6.8.0", - "minimum_index_compatibility_version": "6.0.0-beta1", - }, - "tagline": "You Know, for Search", - }, + return_value=self._seven_cluster_info_output(), ) force_merge = runner.ForceMerge() @@ -1800,59 +1753,10 @@ async def test_force_merge_with_polling_no_timeout_pre_8_1(self, es): @pytest.mark.asyncio async def test_force_merge_with_polling_pre_8_1(self, es): es.indices.forcemerge = mock.AsyncMock(side_effect=elasticsearch.ConnectionTimeout(message="connection timeout")) - es.info = mock.AsyncMock( - return_value={ - "name": "es01", - "cluster_name": "escluster", - "cluster_uuid": "4BgOtWNiQ6-zap9zDW2Q1A", - "version": { - "number": "7.17.3", - "build_flavor": "default", - "build_type": "tar", - "build_hash": "5ad023604c8d7416c9eb6c0eadb62b14e766caff", - "build_date": "2022-04-19T08:11:19.070913226Z", - "build_snapshot": False, - "lucene_version": "8.11.1", - "minimum_wire_compatibility_version": "6.8.0", - "minimum_index_compatibility_version": "6.0.0-beta1", - }, - "tagline": "You Know, for Search", - }, - ) + es.info = mock.AsyncMock(return_value=self._seven_cluster_info_output()) es.tasks.list = mock.AsyncMock( side_effect=[ - { - "nodes": { - "Ap3OfntPT7qL4CBeKvamxg": { - "name": "instance-0000000001", - "transport_address": "10.46.79.231:19693", - "host": "10.46.79.231", - "ip": "10.46.79.231:19693", - "roles": ["data", "ingest", "master", "remote_cluster_client", "transform"], - "attributes": { - "logical_availability_zone": "zone-1", - "server_name": "instance-0000000001.64cb4c66f4f24d85b41f120ef2df5526", - "availability_zone": "us-east4-a", - "xpack.installed": "true", - "instance_configuration": "gcp.data.highio.1", - "transform.node": "true", - "region": "unknown-region", - }, - "tasks": { - "Ap3OfntPT7qL4CBeKvamxg:417009036": { - "node": "Ap3OfntPT7qL4CBeKvamxg", - "id": 417009036, - "type": "transport", - "action": "indices:admin/forcemerge", - "start_time_in_millis": 1598018980850, - "running_time_in_nanos": 3659821411, - "cancellable": False, - "headers": {}, - } - }, - } - } - }, + self._task_list_output(), { "nodes": {}, }, @@ -1867,58 +1771,11 @@ async def test_force_merge_with_polling_pre_8_1(self, es): async def test_force_merge_with_polling_and_params_pre_8_1(self, es): es.indices.forcemerge = mock.AsyncMock(return_value=elasticsearch.ConnectionTimeout("connection timeout")) es.info = mock.AsyncMock( - return_value={ - "name": "es01", - "cluster_name": "escluster", - "cluster_uuid": "4BgOtWNiQ6-zap9zDW2Q1A", - "version": { - "number": "7.17.3", - "build_flavor": "default", - "build_type": "tar", - "build_hash": "5ad023604c8d7416c9eb6c0eadb62b14e766caff", - "build_date": "2022-04-19T08:11:19.070913226Z", - "build_snapshot": False, - "lucene_version": "8.11.1", - "minimum_wire_compatibility_version": "6.8.0", - "minimum_index_compatibility_version": "6.0.0-beta1", - }, - "tagline": "You Know, for Search", - }, + return_value=self._seven_cluster_info_output(), ) es.tasks.list = mock.AsyncMock( side_effect=[ - { - "nodes": { - "Ap3OfntPT7qL4CBeKvamxg": { - "name": "instance-0000000001", - "transport_address": "10.46.79.231:19693", - "host": "10.46.79.231", - "ip": "10.46.79.231:19693", - "roles": ["data", "ingest", "master", "remote_cluster_client", "transform"], - "attributes": { - "logical_availability_zone": "zone-1", - "server_name": "instance-0000000001.64cb4c66f4f24d85b41f120ef2df5526", - "availability_zone": "us-east4-a", - "xpack.installed": "true", - "instance_configuration": "gcp.data.highio.1", - "transform.node": "true", - "region": "unknown-region", - }, - "tasks": { - "Ap3OfntPT7qL4CBeKvamxg:417009036": { - "node": "Ap3OfntPT7qL4CBeKvamxg", - "id": 417009036, - "type": "transport", - "action": "indices:admin/forcemerge", - "start_time_in_millis": 1598018980850, - "running_time_in_nanos": 3659821411, - "cancellable": False, - "headers": {}, - } - }, - } - } - }, + self._task_list_output(), { "nodes": {}, },