Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1833,6 +1833,9 @@ protected PulsarAdminBuilder getCreateAdminClientBuilder()
+ ", webServiceAddress: " + webServiceAddress);
}
PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl);
// most of the admin request requires to make zk-call so, keep the max read-timeout based on
// zk-operation timeout. Put it before loading brokerClient_ prefix config, so user can override it
builder.readTimeout(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);

// Apply all arbitrary configuration. This must be called before setting any fields annotated as
// @Secret on the ClientConfigurationData object because of the way they are serialized.
Expand Down Expand Up @@ -1864,9 +1867,6 @@ protected PulsarAdminBuilder getCreateAdminClientBuilder()
.enableTlsHostnameVerification(conf.isTlsHostnameVerificationEnabled());
}

// most of the admin request requires to make zk-call so, keep the max read-timeout based on
// zk-operation timeout
builder.readTimeout(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
return builder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1653,6 +1653,10 @@ public PulsarAdmin getClusterPulsarAdmin(String cluster, Optional<ClusterData> c
PulsarAdminBuilder builder = PulsarAdmin.builder();

ServiceConfiguration conf = pulsar.getConfig();
// most of the admin request requires to make zk-call so, keep the max read-timeout based on
// zk-operation timeout. Put it before loading brokerClient_ prefix config, so user can override it
builder.readTimeout(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);

// Apply all arbitrary configuration. This must be called before setting any fields annotated as
// @Secret on the ClientConfigurationData object because of the way they are serialized.
// See https://github.com/apache/pulsar/issues/8509 for more information.
Expand Down Expand Up @@ -1708,10 +1712,6 @@ public PulsarAdmin getClusterPulsarAdmin(String cluster, Optional<ClusterData> c
);
}

// most of the admin request requires to make zk-call so, keep the max read-timeout based on
// zk-operation timeout
builder.readTimeout(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);

PulsarAdmin adminClient = builder.build();
log.info("created admin with url {} ", adminApiUrl);
return adminClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.io.IOException;
import java.net.URL;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
Expand Down Expand Up @@ -145,8 +144,6 @@ public PulsarAdminImpl(String serviceUrl, ClientConfigurationData clientConfigDa

ClientBuilder clientBuilder = ClientBuilder.newBuilder()
.withConfig(httpConfig)
.connectTimeout(this.clientConfigData.getConnectionTimeoutMs(), TimeUnit.MILLISECONDS)
.readTimeout(this.clientConfigData.getReadTimeoutMs(), TimeUnit.MILLISECONDS)
.register(JacksonConfigurator.class).register(JacksonFeature.class);

boolean useTls = clientConfigData.getServiceUrl().startsWith("https://");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import javax.ws.rs.ProcessingException;
import javax.ws.rs.client.Client;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response.Status;
import lombok.Data;
Expand All @@ -62,7 +61,6 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.Validate;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.client.admin.internal.PulsarAdminImpl;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.PulsarClientSharedResourcesImpl;
import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
Expand All @@ -88,7 +86,6 @@
import org.asynchttpclient.SslEngineFactory;
import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
import org.asynchttpclient.uri.Uri;
import org.glassfish.jersey.client.ClientProperties;
import org.glassfish.jersey.client.ClientRequest;
import org.glassfish.jersey.client.ClientResponse;
import org.glassfish.jersey.client.spi.AsyncConnectorCallback;
Expand Down Expand Up @@ -118,13 +115,10 @@ public class AsyncHttpConnector implements Connector, AsyncHttpRequestExecutor {
private final Map<String, ConcurrencyReducer<Response>> concurrencyReducers = new ConcurrentHashMap<>();
private PulsarSslFactory sslFactory;

public AsyncHttpConnector(Client client, ClientConfigurationData conf, int autoCertRefreshTimeSeconds,
public AsyncHttpConnector(ClientConfigurationData conf, int autoCertRefreshTimeSeconds,
boolean acceptGzipCompression) {
this((int) client.getConfiguration().getProperty(ClientProperties.CONNECT_TIMEOUT),
(int) client.getConfiguration().getProperty(ClientProperties.READ_TIMEOUT),
PulsarAdminImpl.DEFAULT_REQUEST_TIMEOUT_SECONDS * 1000,
autoCertRefreshTimeSeconds,
conf, acceptGzipCompression, null);
this(conf.getConnectionTimeoutMs(), conf.getReadTimeoutMs(), conf.getRequestTimeoutMs(),
autoCertRefreshTimeSeconds, conf, acceptGzipCompression, null);
}

@SneakyThrows
Expand Down Expand Up @@ -216,7 +210,6 @@ private void configureAsyncHttpClientConfig(ClientConfigurationData conf, int co
confBuilder.setCookieStore(null);
confBuilder.setUseProxyProperties(true);
confBuilder.setFollowRedirect(false);
confBuilder.setRequestTimeout(conf.getRequestTimeoutMs());
confBuilder.setConnectTimeout(connectTimeoutMs);
confBuilder.setReadTimeout(readTimeoutMs);
confBuilder.setUserAgent(String.format("Pulsar-Java-v%s%s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public AsyncHttpConnectorProvider(ClientConfigurationData conf, int autoCertRefr
@Override
public Connector getConnector(Client client, Configuration runtimeConfig) {
if (connector == null) {
connector = new AsyncHttpConnector(client, conf, autoCertRefreshTimeSeconds, acceptGzipCompression);
connector = new AsyncHttpConnector(conf, autoCertRefreshTimeSeconds, acceptGzipCompression);
}
return connector;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,55 @@
*/
package org.apache.pulsar.client.admin.internal;

import static com.github.tomakehurst.wiremock.client.WireMock.get;
import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor;
import static com.github.tomakehurst.wiremock.client.WireMock.ok;
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import com.github.tomakehurst.wiremock.WireMockServer;
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
import com.github.tomakehurst.wiremock.stubbing.Scenario;
import java.util.List;
import lombok.Cleanup;
import lombok.SneakyThrows;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
* Unit tests for {@link PulsarAdminImpl}.
*/
public class PulsarAdminImplTest {

WireMockServer server;

@BeforeClass(alwaysRun = true)
void beforeClass() {
server = new WireMockServer(WireMockConfiguration.wireMockConfig()
.port(0));
server.start();
}

@AfterClass(alwaysRun = true)
void afterClass() {
if (server != null) {
server.stop();
}
}

@BeforeMethod
public void beforeEach() {
server.resetAll();
}

@Test
public void testAuthDisabledWhenAuthNotSpecifiedAnywhere() {
assertThat(createAdminAndGetAuth(new ClientConfigurationData()))
Expand All @@ -51,4 +87,166 @@ private Authentication createAdminAndGetAuth(ClientConfigurationData conf) {
return admin.auth;
}
}

@Test
@SneakyThrows
public void testPulsarAdminAsyncHttpConnectorSuccessWithoutRetry() {
int readTimeoutMs = 5000;
int requestTimeoutMs = 5000;
int serverDelayMs = 3000;
server.stubFor(get(urlEqualTo("/admin/v2/clusters"))
.inScenario("read-success-without-retry")
.whenScenarioStateIs(Scenario.STARTED)
.willSetStateTo("success")
.willReturn(ok()
.withHeader("Content-Type", "application/json")
.withBody("[\"test-cluster\"]")
.withFixedDelay(serverDelayMs)));

ClientConfigurationData conf = new ClientConfigurationData();
conf.setServiceUrl("http://localhost:" + server.port());
conf.setConnectionTimeoutMs(2000);
conf.setReadTimeoutMs(readTimeoutMs);
conf.setRequestTimeoutMs(requestTimeoutMs);

@Cleanup
PulsarAdminImpl admin = new PulsarAdminImpl(conf.getServiceUrl(), conf, null);
List<String> clusters = admin.clusters().getClusters();
assertThat(clusters).containsOnly("test-cluster");

server.verify(1, getRequestedFor(urlEqualTo("/admin/v2/clusters")));
String scenarioState = server.getAllScenarios().getScenarios().stream()
.filter(scenario -> "read-success-without-retry".equals(scenario.getName())).findFirst().get()
.getState();
assertEquals(scenarioState, "success");
}

@Test
@SneakyThrows
public void testPulsarAdminAsyncHttpConnectorTimeoutWithoutRetry() {
int readTimeoutMs = 5000;
int requestTimeoutMs = 5000;
int serverDelayMs = 8000;

server.stubFor(get(urlEqualTo("/admin/v2/clusters"))
.inScenario("read-timeout-without-retry")
.whenScenarioStateIs(Scenario.STARTED)
.willSetStateTo("end")
.willReturn(ok()
.withHeader("Content-Type", "application/json")
.withBody("[\"test-cluster\"]")
.withFixedDelay(serverDelayMs)));

ClientConfigurationData conf = new ClientConfigurationData();
conf.setServiceUrl("http://localhost:" + server.port());
conf.setConnectionTimeoutMs(2000);
conf.setReadTimeoutMs(readTimeoutMs);
conf.setRequestTimeoutMs(requestTimeoutMs);

@Cleanup
PulsarAdminImpl admin = new PulsarAdminImpl(conf.getServiceUrl(), conf, null);
Assert.expectThrows(PulsarAdminException.TimeoutException.class, () -> {
admin.clusters().getClusters();
});

server.verify(1, getRequestedFor(urlEqualTo("/admin/v2/clusters")));
String scenarioState = server.getAllScenarios().getScenarios().stream()
.filter(scenario -> "read-timeout-without-retry".equals(scenario.getName())).findFirst().get()
.getState();
assertEquals(scenarioState, "end");
}

@Test
@SneakyThrows
public void testPulsarAdminAsyncHttpConnectorSuccessWithRetry() {
int readTimeoutMs = 5000;
int requestTimeoutMs = 10000;
int serverDelayMs = 7000;
server.stubFor(get(urlEqualTo("/admin/v2/clusters"))
.inScenario("read-success-with-retry")
.whenScenarioStateIs(Scenario.STARTED)
.willSetStateTo("first-call")
.willReturn(ok()
.withHeader("Content-Type", "application/json")
.withBody("[\"test-cluster\"]")
.withFixedDelay(serverDelayMs)));

server.stubFor(get(urlEqualTo("/admin/v2/clusters"))
.inScenario("read-success-with-retry")
.whenScenarioStateIs("first-call")
.willSetStateTo("success")
.willReturn(ok()
.withHeader("Content-Type", "application/json")
.withBody("[\"test-cluster\"]")));

ClientConfigurationData conf = new ClientConfigurationData();
conf.setServiceUrl("http://localhost:" + server.port());
conf.setConnectionTimeoutMs(2000);
conf.setReadTimeoutMs(readTimeoutMs);
conf.setRequestTimeoutMs(requestTimeoutMs);

@Cleanup
PulsarAdminImpl admin = new PulsarAdminImpl(conf.getServiceUrl(), conf, null);
List<String> clusters = admin.clusters().getClusters();
assertThat(clusters).containsOnly("test-cluster");

server.verify(2, getRequestedFor(urlEqualTo("/admin/v2/clusters")));
String scenarioState = server.getAllScenarios().getScenarios().stream()
.filter(scenario -> "read-success-with-retry".equals(scenario.getName())).findFirst().get()
.getState();
assertEquals(scenarioState, "success");
}

@Test
@SneakyThrows
public void testPulsarAdminAsyncHttpConnectorTimeoutWithRetry() {
int readTimeoutMs = 5000;
int requestTimeoutMs = 14000;
int serverDelayMs = 6000;
server.stubFor(get(urlEqualTo("/admin/v2/clusters"))
.inScenario("read-timeout-with-retry")
.whenScenarioStateIs(Scenario.STARTED)
.willSetStateTo("first-call")
.willReturn(ok()
.withHeader("Content-Type", "application/json")
.withBody("[\"test-cluster\"]")
.withFixedDelay(serverDelayMs)));

server.stubFor(get(urlEqualTo("/admin/v2/clusters"))
.inScenario("read-timeout-with-retry")
.whenScenarioStateIs("first-call")
.willSetStateTo("second-call")
.willReturn(ok()
.withHeader("Content-Type", "application/json")
.withBody("[\"test-cluster\"]")
.withFixedDelay(serverDelayMs)));

server.stubFor(get(urlEqualTo("/admin/v2/clusters"))
.inScenario("read-timeout-with-retry")
.whenScenarioStateIs("second-call")
.willSetStateTo("end")
.willReturn(ok()
.withHeader("Content-Type", "application/json")
.withBody("[\"test-cluster\"]")
.withFixedDelay(serverDelayMs)));

ClientConfigurationData conf = new ClientConfigurationData();
conf.setServiceUrl("http://localhost:" + server.port());
conf.setConnectionTimeoutMs(2000);
conf.setReadTimeoutMs(readTimeoutMs);
conf.setRequestTimeoutMs(requestTimeoutMs);

@Cleanup
PulsarAdminImpl admin = new PulsarAdminImpl(conf.getServiceUrl(), conf, null);
Assert.expectThrows(PulsarAdminException.TimeoutException.class, () -> {
admin.clusters().getClusters();
});

server.verify(3, getRequestedFor(urlEqualTo("/admin/v2/clusters")));
String scenarioState = server.getAllScenarios().getScenarios().stream()
.filter(scenario -> "read-timeout-with-retry".equals(scenario.getName())).findFirst().get()
.getState();
assertEquals(scenarioState, "end");
}

}