Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -391,8 +391,7 @@ The Apache Software License, Version 2.0
* AirCompressor
- io.airlift-aircompressor-0.27.jar
* AsyncHttpClient
- org.asynchttpclient-async-http-client-2.12.4.jar
- org.asynchttpclient-async-http-client-netty-utils-2.12.4.jar
- org.asynchttpclient-async-http-client-3.0.4.jar
* Jetty
- org.eclipse.jetty-jetty-client-9.4.58.v20250814.jar
- org.eclipse.jetty-jetty-continuation-9.4.58.v20250814.jar
Expand Down Expand Up @@ -428,7 +427,7 @@ The Apache Software License, Version 2.0
* Kotlin Standard Lib
- org.jetbrains.kotlin-kotlin-stdlib-1.8.20.jar
- org.jetbrains.kotlin-kotlin-stdlib-common-1.8.20.jar
- org.jetbrains-annotations-13.0.jar
- org.jetbrains-annotations-26.0.2.jar
* gRPC
- io.grpc-grpc-all-1.75.0.jar
- io.grpc-grpc-auth-1.75.0.jar
Expand Down Expand Up @@ -578,7 +577,7 @@ Protocol Buffers License

CDDL-1.1 -- ../licenses/LICENSE-CDDL-1.1.txt
* Java Annotations API
- com.sun.activation-jakarta.activation-1.2.2.jar
- com.sun.activation-jakarta.activation-2.0.1.jar
* Java Servlet API -- javax.servlet-javax.servlet-api-3.1.0.jar
* WebSocket Server API -- javax.websocket-javax.websocket-client-api-1.0.jar
* HK2 - Dependency Injection Kernel
Expand All @@ -601,7 +600,7 @@ CDDL-1.1 -- ../licenses/LICENSE-CDDL-1.1.txt

Eclipse Distribution License 1.0 -- ../licenses/LICENSE-EDL-1.0.txt
* Jakarta Activation
- jakarta.activation-jakarta.activation-api-1.2.2.jar
- jakarta.activation-jakarta.activation-api-2.0.1.jar
* Jakarta XML Binding -- jakarta.xml.bind-jakarta.xml.bind-api-2.3.3.jar

Eclipse Public License - v2.0 -- ../licenses/LICENSE-EPL-2.0.txt
Expand Down
8 changes: 4 additions & 4 deletions distribution/shell/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -401,8 +401,7 @@ The Apache Software License, Version 2.0
* AirCompressor
- aircompressor-0.27.jar
* AsyncHttpClient
- async-http-client-2.12.4.jar
- async-http-client-netty-utils-2.12.4.jar
- async-http-client-3.0.4.jar
* Jetty
- jetty-client-9.4.58.v20250814.jar
- jetty-http-9.4.58.v20250814.jar
Expand All @@ -423,6 +422,7 @@ The Apache Software License, Version 2.0
* RoaringBitmap -- RoaringBitmap-1.2.0.jar
* Fastutil -- fastutil-8.5.16.jar
* JSpecify -- jspecify-1.0.0.jar
* Kotlin Standard Lib -- annotations-26.0.2.jar

BSD 3-clause "New" or "Revised" License
* JLine3 -- jline-3.21.0.jar -- ../licenses/LICENSE-JLine.txt
Expand All @@ -433,7 +433,7 @@ MIT License

CDDL-1.1 -- ../licenses/LICENSE-CDDL-1.1.txt
* Java Annotations API
- jakarta.activation-1.2.2.jar
- jakarta.activation-2.0.1.jar
* WebSocket Server API -- javax.websocket-client-api-1.0.jar
* HK2 - Dependency Injection Kernel
- hk2-api-2.6.1.jar
Expand All @@ -452,7 +452,7 @@ CDDL-1.1 -- ../licenses/LICENSE-CDDL-1.1.txt

Eclipse Distribution License 1.0 -- ../licenses/LICENSE-EDL-1.0.txt
* Jakarta Activation
- jakarta.activation-api-1.2.2.jar
- jakarta.activation-api-2.0.1.jar
- validation-api-1.1.0.Final.jar
* Jakarta XML Binding -- jakarta.xml.bind-api-2.3.3.jar

Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ flexible messaging model and an intuitive client API.</description>
<prometheus-jmx.version>0.16.1</prometheus-jmx.version>
<confluent.version>7.9.2</confluent.version>
<aircompressor.version>0.27</aircompressor.version>
<asynchttpclient.version>2.12.4</asynchttpclient.version>
<asynchttpclient.version>3.0.4</asynchttpclient.version>
<commons-lang3.version>3.19.0</commons-lang3.version>
<commons-io.version>2.21.0</commons-io.version>
<commons-codec.version>1.20.0</commons-codec.version>
Expand All @@ -279,7 +279,7 @@ flexible messaging model and an intuitive client API.</description>
<lombok.version>1.18.42</lombok.version>
<jakarta.annotation-api.version>1.3.5</jakarta.annotation-api.version>
<jaxb-api>2.3.1</jaxb-api>
<jakarta.activation.version>1.2.2</jakarta.activation.version>
<jakarta.activation.version>2.0.1</jakarta.activation.version>
<jakarta.xml.bind.version>2.3.3</jakarta.xml.bind.version>
<jakarta.validation.version>2.0.2</jakarta.validation.version>
<jna.version>5.12.1</jna.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.security.PublicKey;
import java.security.interfaces.ECPublicKey;
import java.security.interfaces.RSAPublicKey;
import java.time.Duration;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -169,9 +170,9 @@ public void initialize(Context context) throws IOException {
this.issuers = validateIssuers(getConfigValueAsSet(config, ALLOWED_TOKEN_ISSUERS), requireHttps,
fallbackDiscoveryMode != FallbackDiscoveryMode.DISABLED);

int connectionTimeout = getConfigValueAsInt(config, HTTP_CONNECTION_TIMEOUT_MILLIS,
int connectionTimeoutMs = getConfigValueAsInt(config, HTTP_CONNECTION_TIMEOUT_MILLIS,
HTTP_CONNECTION_TIMEOUT_MILLIS_DEFAULT);
int readTimeout = getConfigValueAsInt(config, HTTP_READ_TIMEOUT_MILLIS, HTTP_READ_TIMEOUT_MILLIS_DEFAULT);
int readTimeoutMs = getConfigValueAsInt(config, HTTP_READ_TIMEOUT_MILLIS, HTTP_READ_TIMEOUT_MILLIS_DEFAULT);
String trustCertsFilePath = getConfigValueAsString(config, ISSUER_TRUST_CERTS_FILE_PATH, null);
SslContext sslContext = null;
// When config is in the conf file but is empty, it defaults to the empty string, which is not meaningful and
Expand All @@ -184,8 +185,8 @@ public void initialize(Context context) throws IOException {
}
AsyncHttpClientConfig clientConfig = new DefaultAsyncHttpClientConfig.Builder()
.setCookieStore(null)
.setConnectTimeout(connectionTimeout)
.setReadTimeout(readTimeout)
.setConnectTimeout(Duration.ofMillis(connectionTimeoutMs))
.setReadTimeout(Duration.ofMillis(readTimeoutMs))
.setSslContext(sslContext)
.build();
httpClient = new DefaultAsyncHttpClient(clientConfig);
Expand Down
5 changes: 5 additions & 0 deletions pulsar-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,11 @@
<artifactId>jakarta.activation-api</artifactId>
</dependency>

<dependency>
<groupId>com.sun.activation</groupId>
<artifactId>jakarta.activation</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.jaxrs</groupId>
<artifactId>jackson-jaxrs-json-provider</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ private void configureAsyncHttpClientConfig(ClientConfigurationData conf, int co
confBuilder.setAcquireFreeChannelTimeout(conf.getRequestTimeoutMs());
}
if (conf.getConnectionMaxIdleSeconds() > 0) {
confBuilder.setPooledConnectionIdleTimeout(conf.getConnectionMaxIdleSeconds() * 1000);
confBuilder.setPooledConnectionIdleTimeout(Duration.ofSeconds(conf.getConnectionMaxIdleSeconds()));
}
if (sharedResources != null) {
if (this.eventLoopGroup != null) {
Expand All @@ -216,14 +216,14 @@ private void configureAsyncHttpClientConfig(ClientConfigurationData conf, int co
confBuilder.setCookieStore(null);
confBuilder.setUseProxyProperties(true);
confBuilder.setFollowRedirect(false);
confBuilder.setRequestTimeout(conf.getRequestTimeoutMs());
confBuilder.setConnectTimeout(connectTimeoutMs);
confBuilder.setReadTimeout(readTimeoutMs);
confBuilder.setRequestTimeout(Duration.ofMillis(conf.getRequestTimeoutMs()));
confBuilder.setConnectTimeout(Duration.ofMillis(connectTimeoutMs));
confBuilder.setReadTimeout(Duration.ofMillis(readTimeoutMs));
confBuilder.setUserAgent(String.format("Pulsar-Java-v%s%s",
PulsarVersion.getVersion(),
(conf.getDescription() == null ? "" : ("-" + conf.getDescription()))
));
confBuilder.setRequestTimeout(requestTimeoutMs);
confBuilder.setRequestTimeout(Duration.ofMillis(requestTimeoutMs));
confBuilder.setIoThreadsCount(conf.getNumIoThreads());
confBuilder.setKeepAliveStrategy(new DefaultKeepAliveStrategy() {
@Override
Expand All @@ -234,7 +234,8 @@ public boolean keepAlive(InetSocketAddress remoteAddress, Request ahcRequest,
&& super.keepAlive(remoteAddress, ahcRequest, request, response);
}
});
confBuilder.setDisableHttpsEndpointIdentificationAlgorithm(!conf.isTlsHostnameVerificationEnable());
confBuilder.setSslEngineFactory(
new PulsarHttpAsyncSslEngineFactory(sslFactory, null, conf.isTlsHostnameVerificationEnable()));
}

protected AsyncHttpClient createAsyncHttpClient(AsyncHttpClientConfig asyncHttpClientConfig) {
Expand All @@ -260,7 +261,8 @@ private void configureAsyncHttpClientSslEngineFactory(ClientConfigurationData co
}
String hostname = conf.isTlsHostnameVerificationEnable() ? null : serviceNameResolver
.resolveHostUri().getHost();
SslEngineFactory sslEngineFactory = new PulsarHttpAsyncSslEngineFactory(sslFactory, hostname);
SslEngineFactory sslEngineFactory =
new PulsarHttpAsyncSslEngineFactory(sslFactory, hostname, conf.isTlsHostnameVerificationEnable());
confBuilder.setSslEngineFactory(sslEngineFactory);
confBuilder.setUseInsecureTrustManager(conf.isTlsAllowInsecureConnection());
confBuilder.setDisableHttpsEndpointIdentificationAlgorithm(!conf.isTlsHostnameVerificationEnable());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
Expand Down Expand Up @@ -91,8 +92,8 @@ private AsyncHttpClient buildHttpClient() {
confBuilder.setUseProxyProperties(true);
confBuilder.setFollowRedirect(true);
confBuilder.setMaxRedirects(DEFAULT_MAX_REDIRECTS);
confBuilder.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_IN_SECONDS * 1000);
confBuilder.setReadTimeout(DEFAULT_READ_TIMEOUT_IN_SECONDS * 1000);
confBuilder.setConnectTimeout(Duration.ofSeconds(DEFAULT_CONNECT_TIMEOUT_IN_SECONDS));
confBuilder.setReadTimeout(Duration.ofSeconds(DEFAULT_READ_TIMEOUT_IN_SECONDS));
confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", PulsarVersion.getVersion()));
confBuilder.setKeepAliveStrategy(new DefaultKeepAliveStrategy() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.time.Duration;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
Expand Down Expand Up @@ -89,8 +90,8 @@ protected HttpClient(ClientConfigurationData conf, EventLoopGroup eventLoopGroup
confBuilder.setUseProxyProperties(true);
confBuilder.setFollowRedirect(true);
confBuilder.setMaxRedirects(conf.getMaxLookupRedirects());
confBuilder.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_IN_SECONDS * 1000);
confBuilder.setReadTimeout(DEFAULT_READ_TIMEOUT_IN_SECONDS * 1000);
confBuilder.setConnectTimeout(Duration.ofSeconds(DEFAULT_CONNECT_TIMEOUT_IN_SECONDS));
confBuilder.setReadTimeout(Duration.ofSeconds(DEFAULT_READ_TIMEOUT_IN_SECONDS));
confBuilder.setUserAgent(String.format("Pulsar-Java-v%s%s",
PulsarVersion.getVersion(),
(conf.getDescription() == null ? "" : ("-" + conf.getDescription()))
Expand Down Expand Up @@ -124,7 +125,9 @@ public boolean keepAlive(InetSocketAddress remoteAddress, Request ahcRequest,
}
String hostname = conf.isTlsHostnameVerificationEnable() ? null : serviceNameResolver
.resolveHostUri().getHost();
SslEngineFactory sslEngineFactory = new PulsarHttpAsyncSslEngineFactory(this.sslFactory, hostname);
SslEngineFactory sslEngineFactory =
new PulsarHttpAsyncSslEngineFactory(this.sslFactory, hostname,
conf.isTlsHostnameVerificationEnable());
confBuilder.setSslEngineFactory(sslEngineFactory);


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,9 @@ private AsyncHttpClient defaultHttpClient(Duration readTimeout, Duration connect
confBuilder.setCookieStore(null);
confBuilder.setUseProxyProperties(true);
confBuilder.setFollowRedirect(true);
confBuilder.setConnectTimeout(
getParameterDurationToMillis(CONFIG_PARAM_CONNECT_TIMEOUT, connectTimeout,
confBuilder.setConnectTimeout(getParameterDuration(CONFIG_PARAM_CONNECT_TIMEOUT, connectTimeout,
DEFAULT_CONNECT_TIMEOUT));
confBuilder.setReadTimeout(
getParameterDurationToMillis(CONFIG_PARAM_READ_TIMEOUT, readTimeout, DEFAULT_READ_TIMEOUT));
confBuilder.setReadTimeout(getParameterDuration(CONFIG_PARAM_READ_TIMEOUT, readTimeout, DEFAULT_READ_TIMEOUT));
confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", PulsarVersion.getVersion()));
if (StringUtils.isNotBlank(trustCertsFilePath)) {
try {
Expand All @@ -87,17 +85,14 @@ private AsyncHttpClient defaultHttpClient(Duration readTimeout, Duration connect
return new DefaultAsyncHttpClient(confBuilder.build());
}

private int getParameterDurationToMillis(String name, Duration value, Duration defaultValue) {
Duration duration;
private Duration getParameterDuration(String name, Duration value, Duration defaultValue) {
if (value == null) {
log.info("Configuration for [{}] is using the default value: [{}]", name, defaultValue);
duration = defaultValue;
return defaultValue;
} else {
log.info("Configuration for [{}] is: [{}]", name, value);
duration = value;
return value;
}

return (int) duration.toMillis();
}

public void initialize() throws PulsarClientException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,28 @@ public class PulsarHttpAsyncSslEngineFactory extends DefaultSslEngineFactory {

private final PulsarSslFactory pulsarSslFactory;
private final String host;
private final boolean enableHostnameVerification;

public PulsarHttpAsyncSslEngineFactory(PulsarSslFactory pulsarSslFactory, String host) {
public PulsarHttpAsyncSslEngineFactory(PulsarSslFactory pulsarSslFactory, String host,
boolean enableHostnameVerification) {
this.pulsarSslFactory = pulsarSslFactory;
this.host = host;
this.enableHostnameVerification = enableHostnameVerification;
}

@Override
protected void configureSslEngine(SSLEngine sslEngine, AsyncHttpClientConfig config) {
super.configureSslEngine(sslEngine, config);
SSLParameters parameters = sslEngine.getSSLParameters();
if (StringUtils.isNotBlank(host)) {
SSLParameters parameters = sslEngine.getSSLParameters();
parameters.setServerNames(Collections.singletonList(new SNIHostName(host)));
sslEngine.setSSLParameters(parameters);
}

if (enableHostnameVerification) {
parameters.setEndpointIdentificationAlgorithm("HTTPS");
}

sslEngine.setSSLParameters(parameters);
}

@Override
Expand Down
Loading