diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index e61fbfac56622..cfe22a4ae547d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -1833,6 +1833,9 @@ protected PulsarAdminBuilder getCreateAdminClientBuilder() + ", webServiceAddress: " + webServiceAddress); } PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl); + // most of the admin request requires to make zk-call so, keep the max read-timeout based on + // zk-operation timeout. Put it before loading brokerClient_ prefix config, so user can override it + builder.readTimeout(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS); // Apply all arbitrary configuration. This must be called before setting any fields annotated as // @Secret on the ClientConfigurationData object because of the way they are serialized. @@ -1864,9 +1867,6 @@ protected PulsarAdminBuilder getCreateAdminClientBuilder() .enableTlsHostnameVerification(conf.isTlsHostnameVerificationEnabled()); } - // most of the admin request requires to make zk-call so, keep the max read-timeout based on - // zk-operation timeout - builder.readTimeout(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS); return builder; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index e4b2f33502378..0d964b02a5fb1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1653,6 +1653,10 @@ public PulsarAdmin getClusterPulsarAdmin(String cluster, Optional c PulsarAdminBuilder builder = PulsarAdmin.builder(); ServiceConfiguration conf = pulsar.getConfig(); + // most of the admin request requires to make zk-call so, keep the max read-timeout based on + // zk-operation timeout. Put it before loading brokerClient_ prefix config, so user can override it + builder.readTimeout(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS); + // Apply all arbitrary configuration. This must be called before setting any fields annotated as // @Secret on the ClientConfigurationData object because of the way they are serialized. // See https://github.com/apache/pulsar/issues/8509 for more information. @@ -1708,10 +1712,6 @@ public PulsarAdmin getClusterPulsarAdmin(String cluster, Optional c ); } - // most of the admin request requires to make zk-call so, keep the max read-timeout based on - // zk-operation timeout - builder.readTimeout(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS); - PulsarAdmin adminClient = builder.build(); log.info("created admin with url {} ", adminApiUrl); return adminClient; diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java index e4ca7724ca1d7..f19d2356eba18 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.net.URL; import java.util.Map; -import java.util.concurrent.TimeUnit; import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.client.WebTarget; @@ -145,8 +144,6 @@ public PulsarAdminImpl(String serviceUrl, ClientConfigurationData clientConfigDa ClientBuilder clientBuilder = ClientBuilder.newBuilder() .withConfig(httpConfig) - .connectTimeout(this.clientConfigData.getConnectionTimeoutMs(), TimeUnit.MILLISECONDS) - .readTimeout(this.clientConfigData.getReadTimeoutMs(), TimeUnit.MILLISECONDS) .register(JacksonConfigurator.class).register(JacksonFeature.class); boolean useTls = clientConfigData.getServiceUrl().startsWith("https://"); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java index ed17df8bd73ec..ddb6f757429be 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java @@ -53,7 +53,6 @@ import java.util.concurrent.TimeoutException; import java.util.function.Supplier; import javax.ws.rs.ProcessingException; -import javax.ws.rs.client.Client; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response.Status; import lombok.Data; @@ -62,7 +61,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.Validate; import org.apache.pulsar.PulsarVersion; -import org.apache.pulsar.client.admin.internal.PulsarAdminImpl; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.PulsarClientSharedResourcesImpl; import org.apache.pulsar.client.impl.PulsarServiceNameResolver; @@ -88,7 +86,6 @@ import org.asynchttpclient.SslEngineFactory; import org.asynchttpclient.channel.DefaultKeepAliveStrategy; import org.asynchttpclient.uri.Uri; -import org.glassfish.jersey.client.ClientProperties; import org.glassfish.jersey.client.ClientRequest; import org.glassfish.jersey.client.ClientResponse; import org.glassfish.jersey.client.spi.AsyncConnectorCallback; @@ -118,13 +115,10 @@ public class AsyncHttpConnector implements Connector, AsyncHttpRequestExecutor { private final Map> concurrencyReducers = new ConcurrentHashMap<>(); private PulsarSslFactory sslFactory; - public AsyncHttpConnector(Client client, ClientConfigurationData conf, int autoCertRefreshTimeSeconds, + public AsyncHttpConnector(ClientConfigurationData conf, int autoCertRefreshTimeSeconds, boolean acceptGzipCompression) { - this((int) client.getConfiguration().getProperty(ClientProperties.CONNECT_TIMEOUT), - (int) client.getConfiguration().getProperty(ClientProperties.READ_TIMEOUT), - PulsarAdminImpl.DEFAULT_REQUEST_TIMEOUT_SECONDS * 1000, - autoCertRefreshTimeSeconds, - conf, acceptGzipCompression, null); + this(conf.getConnectionTimeoutMs(), conf.getReadTimeoutMs(), conf.getRequestTimeoutMs(), + autoCertRefreshTimeSeconds, conf, acceptGzipCompression, null); } @SneakyThrows @@ -216,7 +210,6 @@ private void configureAsyncHttpClientConfig(ClientConfigurationData conf, int co confBuilder.setCookieStore(null); confBuilder.setUseProxyProperties(true); confBuilder.setFollowRedirect(false); - confBuilder.setRequestTimeout(conf.getRequestTimeoutMs()); confBuilder.setConnectTimeout(connectTimeoutMs); confBuilder.setReadTimeout(readTimeoutMs); confBuilder.setUserAgent(String.format("Pulsar-Java-v%s%s", diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java index caaa356f7c709..c6085d4e4369c 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java @@ -45,7 +45,7 @@ public AsyncHttpConnectorProvider(ClientConfigurationData conf, int autoCertRefr @Override public Connector getConnector(Client client, Configuration runtimeConfig) { if (connector == null) { - connector = new AsyncHttpConnector(client, conf, autoCertRefreshTimeSeconds, acceptGzipCompression); + connector = new AsyncHttpConnector(conf, autoCertRefreshTimeSeconds, acceptGzipCompression); } return connector; } diff --git a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminImplTest.java b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminImplTest.java index 27c6fd96079f5..8b87435499417 100644 --- a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminImplTest.java +++ b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminImplTest.java @@ -18,12 +18,27 @@ */ package org.apache.pulsar.client.admin.internal; +import static com.github.tomakehurst.wiremock.client.WireMock.get; +import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor; +import static com.github.tomakehurst.wiremock.client.WireMock.ok; +import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertEquals; +import com.github.tomakehurst.wiremock.WireMockServer; +import com.github.tomakehurst.wiremock.core.WireMockConfiguration; +import com.github.tomakehurst.wiremock.stubbing.Scenario; +import java.util.List; +import lombok.Cleanup; import lombok.SneakyThrows; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; /** @@ -31,6 +46,27 @@ */ public class PulsarAdminImplTest { + WireMockServer server; + + @BeforeClass(alwaysRun = true) + void beforeClass() { + server = new WireMockServer(WireMockConfiguration.wireMockConfig() + .port(0)); + server.start(); + } + + @AfterClass(alwaysRun = true) + void afterClass() { + if (server != null) { + server.stop(); + } + } + + @BeforeMethod + public void beforeEach() { + server.resetAll(); + } + @Test public void testAuthDisabledWhenAuthNotSpecifiedAnywhere() { assertThat(createAdminAndGetAuth(new ClientConfigurationData())) @@ -51,4 +87,166 @@ private Authentication createAdminAndGetAuth(ClientConfigurationData conf) { return admin.auth; } } + + @Test + @SneakyThrows + public void testPulsarAdminAsyncHttpConnectorSuccessWithoutRetry() { + int readTimeoutMs = 5000; + int requestTimeoutMs = 5000; + int serverDelayMs = 3000; + server.stubFor(get(urlEqualTo("/admin/v2/clusters")) + .inScenario("read-success-without-retry") + .whenScenarioStateIs(Scenario.STARTED) + .willSetStateTo("success") + .willReturn(ok() + .withHeader("Content-Type", "application/json") + .withBody("[\"test-cluster\"]") + .withFixedDelay(serverDelayMs))); + + ClientConfigurationData conf = new ClientConfigurationData(); + conf.setServiceUrl("http://localhost:" + server.port()); + conf.setConnectionTimeoutMs(2000); + conf.setReadTimeoutMs(readTimeoutMs); + conf.setRequestTimeoutMs(requestTimeoutMs); + + @Cleanup + PulsarAdminImpl admin = new PulsarAdminImpl(conf.getServiceUrl(), conf, null); + List clusters = admin.clusters().getClusters(); + assertThat(clusters).containsOnly("test-cluster"); + + server.verify(1, getRequestedFor(urlEqualTo("/admin/v2/clusters"))); + String scenarioState = server.getAllScenarios().getScenarios().stream() + .filter(scenario -> "read-success-without-retry".equals(scenario.getName())).findFirst().get() + .getState(); + assertEquals(scenarioState, "success"); + } + + @Test + @SneakyThrows + public void testPulsarAdminAsyncHttpConnectorTimeoutWithoutRetry() { + int readTimeoutMs = 5000; + int requestTimeoutMs = 5000; + int serverDelayMs = 8000; + + server.stubFor(get(urlEqualTo("/admin/v2/clusters")) + .inScenario("read-timeout-without-retry") + .whenScenarioStateIs(Scenario.STARTED) + .willSetStateTo("end") + .willReturn(ok() + .withHeader("Content-Type", "application/json") + .withBody("[\"test-cluster\"]") + .withFixedDelay(serverDelayMs))); + + ClientConfigurationData conf = new ClientConfigurationData(); + conf.setServiceUrl("http://localhost:" + server.port()); + conf.setConnectionTimeoutMs(2000); + conf.setReadTimeoutMs(readTimeoutMs); + conf.setRequestTimeoutMs(requestTimeoutMs); + + @Cleanup + PulsarAdminImpl admin = new PulsarAdminImpl(conf.getServiceUrl(), conf, null); + Assert.expectThrows(PulsarAdminException.TimeoutException.class, () -> { + admin.clusters().getClusters(); + }); + + server.verify(1, getRequestedFor(urlEqualTo("/admin/v2/clusters"))); + String scenarioState = server.getAllScenarios().getScenarios().stream() + .filter(scenario -> "read-timeout-without-retry".equals(scenario.getName())).findFirst().get() + .getState(); + assertEquals(scenarioState, "end"); + } + + @Test + @SneakyThrows + public void testPulsarAdminAsyncHttpConnectorSuccessWithRetry() { + int readTimeoutMs = 5000; + int requestTimeoutMs = 10000; + int serverDelayMs = 7000; + server.stubFor(get(urlEqualTo("/admin/v2/clusters")) + .inScenario("read-success-with-retry") + .whenScenarioStateIs(Scenario.STARTED) + .willSetStateTo("first-call") + .willReturn(ok() + .withHeader("Content-Type", "application/json") + .withBody("[\"test-cluster\"]") + .withFixedDelay(serverDelayMs))); + + server.stubFor(get(urlEqualTo("/admin/v2/clusters")) + .inScenario("read-success-with-retry") + .whenScenarioStateIs("first-call") + .willSetStateTo("success") + .willReturn(ok() + .withHeader("Content-Type", "application/json") + .withBody("[\"test-cluster\"]"))); + + ClientConfigurationData conf = new ClientConfigurationData(); + conf.setServiceUrl("http://localhost:" + server.port()); + conf.setConnectionTimeoutMs(2000); + conf.setReadTimeoutMs(readTimeoutMs); + conf.setRequestTimeoutMs(requestTimeoutMs); + + @Cleanup + PulsarAdminImpl admin = new PulsarAdminImpl(conf.getServiceUrl(), conf, null); + List clusters = admin.clusters().getClusters(); + assertThat(clusters).containsOnly("test-cluster"); + + server.verify(2, getRequestedFor(urlEqualTo("/admin/v2/clusters"))); + String scenarioState = server.getAllScenarios().getScenarios().stream() + .filter(scenario -> "read-success-with-retry".equals(scenario.getName())).findFirst().get() + .getState(); + assertEquals(scenarioState, "success"); + } + + @Test + @SneakyThrows + public void testPulsarAdminAsyncHttpConnectorTimeoutWithRetry() { + int readTimeoutMs = 5000; + int requestTimeoutMs = 14000; + int serverDelayMs = 6000; + server.stubFor(get(urlEqualTo("/admin/v2/clusters")) + .inScenario("read-timeout-with-retry") + .whenScenarioStateIs(Scenario.STARTED) + .willSetStateTo("first-call") + .willReturn(ok() + .withHeader("Content-Type", "application/json") + .withBody("[\"test-cluster\"]") + .withFixedDelay(serverDelayMs))); + + server.stubFor(get(urlEqualTo("/admin/v2/clusters")) + .inScenario("read-timeout-with-retry") + .whenScenarioStateIs("first-call") + .willSetStateTo("second-call") + .willReturn(ok() + .withHeader("Content-Type", "application/json") + .withBody("[\"test-cluster\"]") + .withFixedDelay(serverDelayMs))); + + server.stubFor(get(urlEqualTo("/admin/v2/clusters")) + .inScenario("read-timeout-with-retry") + .whenScenarioStateIs("second-call") + .willSetStateTo("end") + .willReturn(ok() + .withHeader("Content-Type", "application/json") + .withBody("[\"test-cluster\"]") + .withFixedDelay(serverDelayMs))); + + ClientConfigurationData conf = new ClientConfigurationData(); + conf.setServiceUrl("http://localhost:" + server.port()); + conf.setConnectionTimeoutMs(2000); + conf.setReadTimeoutMs(readTimeoutMs); + conf.setRequestTimeoutMs(requestTimeoutMs); + + @Cleanup + PulsarAdminImpl admin = new PulsarAdminImpl(conf.getServiceUrl(), conf, null); + Assert.expectThrows(PulsarAdminException.TimeoutException.class, () -> { + admin.clusters().getClusters(); + }); + + server.verify(3, getRequestedFor(urlEqualTo("/admin/v2/clusters"))); + String scenarioState = server.getAllScenarios().getScenarios().stream() + .filter(scenario -> "read-timeout-with-retry".equals(scenario.getName())).findFirst().get() + .getState(); + assertEquals(scenarioState, "end"); + } + }