From 893f1450b614379c59f58e03cd2db7d756fe42bf Mon Sep 17 00:00:00 2001 From: 3pacccccc Date: Sat, 13 Dec 2025 00:59:41 +0800 Subject: [PATCH 01/10] add sharedResource --- .../pulsar/client/api/Authentication.java | 7 ++ .../client/api/AuthenticationInitContext.java | 67 ++++++++++++++ .../impl/AuthenticationInitContextImpl.java | 64 ++++++++++++++ .../pulsar/client/impl/PulsarClientImpl.java | 11 ++- .../oauth2/AuthenticationFactoryOAuth2.java | 71 +++++++++++++-- .../auth/oauth2/AuthenticationOAuth2.java | 76 +++++++++++++--- .../auth/oauth2/ClientCredentialsFlow.java | 24 ++--- .../client/impl/auth/oauth2/FlowBase.java | 74 ++-------------- .../protocol/DefaultMetadataResolver.java | 25 ++++-- .../auth/oauth2/protocol/TokenClient.java | 18 ++-- .../http/AuthenticationHttpClientConfig.java | 46 ++++++++++ .../http/AuthenticationHttpClientFactory.java | 88 +++++++++++++++++++ .../pulsar/client/impl/http/package-info.java | 23 +++++ .../auth/oauth2/protocol/TokenClientTest.java | 4 +- 14 files changed, 486 insertions(+), 112 deletions(-) create mode 100644 pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationInitContext.java create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/AuthenticationInitContextImpl.java create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/http/AuthenticationHttpClientConfig.java create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/http/AuthenticationHttpClientFactory.java create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/http/package-info.java diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java index 48d9e3e230701..b60e7e7761fc5 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java @@ -82,6 +82,13 @@ default AuthenticationDataProvider getAuthData(String brokerHostName) throws Pul */ void start() throws PulsarClientException; + /** + * Initialize the authentication provider with {@link AuthenticationInitContext}. + */ + default void start(AuthenticationInitContext context) throws PulsarClientException { + start(); + } + /** * An authentication Stage. * when authentication complete, passed-in authFuture will contains authentication related http request headers. diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationInitContext.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationInitContext.java new file mode 100644 index 0000000000000..3cdc24184ebe4 --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationInitContext.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + + +import java.util.Optional; + +/** + * Authentication initialization context that provides access to shared services and resources + * during the authentication provider initialization phase. + * + *

This context enables authentication implementations to utilize shared resources such as + * thread pools, DNS resolvers, and timers that are managed by the Pulsar client, rather than + * creating their own instances. This improves resource utilization and performance, especially + * when multiple client instances or authentication providers are used within the same application. + * + *

Usage Examples

+ * + *

Getting a shared EventLoopGroup: + *

{@code
+ * public void start(AuthenticationInitContext context) throws PulsarClientException {
+ *     Optional eventLoop = context.getService(EventLoopGroup.class);
+ *     if (eventLoop.isPresent()) {
+ *         this.sharedEventLoopGroup = eventLoop.get();
+ *     } else {
+ *         // Fallback to creating own instance
+ *         this.ownEventLoopGroup = new NioEventLoopGroup();
+ *     }
+ * }
+ * }
+ * + *

Getting a named DNS resolver: + *

{@code
+ * public void start(AuthenticationInitContext context) throws PulsarClientException {
+ *     Optional dnsResolver = context.getServiceByName(DnsResolver.class, "secure-dns");
+ *     if (dnsResolver.isPresent()) {
+ *         this.dnsResolver = dnsResolver.get();
+ *     }
+ * }
+ * }
+ * + *

Authentication providers should prefer using shared resources when available to minimize + * system resource consumption and improve performance through better resource reuse. + * + * @see Authentication + * @since 2.11.0 + */ +public interface AuthenticationInitContext { + Optional getService(Class serviceClass); + Optional getServiceByName(Class serviceClass, String name); +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AuthenticationInitContextImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AuthenticationInitContextImpl.java new file mode 100644 index 0000000000000..8038c75673092 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AuthenticationInitContextImpl.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import io.netty.channel.EventLoopGroup; +import io.netty.resolver.NameResolver; +import io.netty.util.Timer; +import java.net.InetAddress; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import org.apache.pulsar.client.api.AuthenticationInitContext; + +public class AuthenticationInitContextImpl implements AuthenticationInitContext { + private final Map, Object> sharedServices = new HashMap<>(); + private final Map, Object>> namedServices = new HashMap<>(); + public AuthenticationInitContextImpl(EventLoopGroup eventLoopGroup, + Timer timer, + NameResolver nameResolver) { + this.sharedServices.put(EventLoopGroup.class, eventLoopGroup); + this.sharedServices.put(Timer.class, timer); + this.sharedServices.put(NameResolver.class, nameResolver); + } + + @Override + @SuppressWarnings("unchecked") + public Optional getService(Class serviceClass) { + return Optional.ofNullable((T) sharedServices.get(serviceClass)); + } + + @Override + @SuppressWarnings("unchecked") + public Optional getServiceByName(Class serviceClass, String name) { + Map, Object> services = namedServices.get(name); + if (services != null) { + return Optional.ofNullable((T) services.get(serviceClass)); + } + return Optional.empty(); } + + public void addService(Class serviceClass, T instance) { + sharedServices.put(serviceClass, instance); + } + + public void addService(Class serviceClass, String name, T instance) { + namedServices.computeIfAbsent(name, k -> new HashMap<>()) + .put(serviceClass, instance); + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 3a2ff97f51ecc..a9e710a9b8c51 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -228,7 +228,6 @@ public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGr this.eventLoopGroup = eventLoopGroupReference; this.instrumentProvider = new InstrumentProvider(conf.getOpenTelemetry()); clientClock = conf.getClock(); - conf.getAuthentication().start(); this.scheduledExecutorProvider = scheduledExecutorProvider != null ? scheduledExecutorProvider : PulsarClientResourcesConfigurer.createScheduledExecutorProvider(conf); if (connectionPool != null) { @@ -270,6 +269,7 @@ public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGr } else { this.timer = timer; } + conf.getAuthentication().start(buildAuthenticationInitContext()); lookup = createLookup(conf.getServiceUrl()); if (conf.getServiceUrlProvider() != null) { @@ -306,6 +306,13 @@ public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGr throw t; } } + private AuthenticationInitContextImpl buildAuthenticationInitContext() { + return new AuthenticationInitContextImpl( + createdEventLoopGroup ? null : eventLoopGroup, + needStopTimer ? null : timer, + addressResolver == null ? null : DnsResolverUtil.adaptToNameResolver(addressResolver) + ); + } private void reduceConsumerReceiverQueueSize() { for (ConsumerBase consumer : consumers) { @@ -1086,7 +1093,7 @@ public void updateAuthentication(Authentication authentication) throws IOExcepti conf.getAuthentication().close(); } conf.setAuthentication(authentication); - conf.getAuthentication().start(); + conf.getAuthentication().start(buildAuthenticationInitContext()); } public void updateTlsTrustCertsFilePath(String tlsTrustCertsFilePath) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java index 033d5308a2a96..d5bb6682674ad 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java @@ -18,10 +18,15 @@ */ package org.apache.pulsar.client.impl.auth.oauth2; +import io.netty.resolver.NameResolver; +import java.net.InetAddress; import java.net.URL; import java.time.Clock; import java.time.Duration; import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.impl.http.AuthenticationHttpClientConfig; +import org.apache.pulsar.client.impl.http.AuthenticationHttpClientFactory; +import org.asynchttpclient.AsyncHttpClient; /** * Factory class that allows to create {@link Authentication} instances @@ -78,6 +83,8 @@ public static class ClientCredentialsBuilder { private Duration connectTimeout; private Duration readTimeout; private String trustCertsFilePath; + private AsyncHttpClient httpClient; + private NameResolver nameResolver; private ClientCredentialsBuilder() { } @@ -163,25 +170,77 @@ public ClientCredentialsBuilder trustCertsFilePath(String trustCertsFilePath) { return this; } + /** + * Optional custom HTTP client. + * + * @param httpClient the HTTP client + * @return the builder + */ + public ClientCredentialsBuilder httpClient(AsyncHttpClient httpClient) { + this.httpClient = httpClient; + return this; + } + + /** + * Optional custom name resolver. + * + * @param nameResolver the name resolver + * @return the builder + */ + public ClientCredentialsBuilder nameResolver(NameResolver nameResolver) { + this.nameResolver = nameResolver; + return this; + } + /** * Authenticate with client credentials. * * @return an Authentication object */ public Authentication build() { + AsyncHttpClient finalHttpClient = this.httpClient; + NameResolver finalNameResolver = this.nameResolver; + + if (finalHttpClient == null || finalNameResolver == null) { + // 构建配置,处理可能的空值 + AuthenticationHttpClientConfig.builder configBuilder = + AuthenticationHttpClientConfig.builder(); + + if (connectTimeout != null) { + configBuilder.connectTimeout((int) connectTimeout.toMillis()); + } + + if (readTimeout != null) { + configBuilder.readTimeout((int) readTimeout.toMillis()); + } + + if (trustCertsFilePath != null) { + configBuilder.trustCertsFilePath(trustCertsFilePath); + } + + AuthenticationHttpClientFactory clientFactory = new AuthenticationHttpClientFactory( + configBuilder.build(), + null + ); + + if (finalHttpClient == null) { + finalHttpClient = clientFactory.createHttpClient(); + } + + if (finalNameResolver == null) { + finalNameResolver = clientFactory.getNameResolver(); + } + } ClientCredentialsFlow flow = ClientCredentialsFlow.builder() .issuerUrl(issuerUrl) .privateKey(credentialsUrl == null ? null : credentialsUrl.toExternalForm()) .audience(audience) .scope(scope) - .connectTimeout(connectTimeout) - .readTimeout(readTimeout) - .trustCertsFilePath(trustCertsFilePath) + .httpClient(finalHttpClient) + .nameResolver(finalNameResolver) .build(); return new AuthenticationOAuth2(flow, Clock.systemDefaultZone()); } } - - -} +} \ No newline at end of file diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java index 694e4681d97f2..f9c912cd8804e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java @@ -20,7 +20,9 @@ import java.io.IOException; import java.time.Clock; +import java.time.Duration; import java.time.Instant; +import java.time.format.DateTimeParseException; import java.util.Map; import lombok.Data; import lombok.extern.slf4j.Slf4j; @@ -28,26 +30,36 @@ import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationDataProvider; +import org.apache.pulsar.client.api.AuthenticationInitContext; import org.apache.pulsar.client.api.EncodedAuthenticationParameterSupport; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.AuthenticationUtil; import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult; +import org.apache.pulsar.client.impl.http.AuthenticationHttpClientConfig; +import org.apache.pulsar.client.impl.http.AuthenticationHttpClientFactory; /** * Pulsar client authentication provider based on OAuth 2.0. */ @Slf4j -public class AuthenticationOAuth2 implements Authentication, EncodedAuthenticationParameterSupport { +public class AuthenticationOAuth2 implements Authentication, + EncodedAuthenticationParameterSupport { public static final String CONFIG_PARAM_TYPE = "type"; public static final String TYPE_CLIENT_CREDENTIALS = "client_credentials"; public static final String AUTH_METHOD_NAME = "token"; + public static final String CONFIG_PARAM_CONNECT_TIMEOUT = "connectTimeout"; + public static final String CONFIG_PARAM_READ_TIMEOUT = "readTimeout"; + public static final String CONFIG_PARAM_TRUST_CERTS_FILE_PATH = "trustCertsFilePath"; + protected static final Duration DEFAULT_CONNECT_TIMEOUT = Duration.ofSeconds(10); + protected static final Duration DEFAULT_READ_TIMEOUT = Duration.ofSeconds(30); public static final double EXPIRY_ADJUSTMENT = 0.9; private static final long serialVersionUID = 1L; final Clock clock; Flow flow; transient CachedToken cachedToken; + private Map params; public AuthenticationOAuth2() { this.clock = Clock.systemDefaultZone(); @@ -68,21 +80,59 @@ public void configure(String encodedAuthParamString) { if (StringUtils.isBlank(encodedAuthParamString)) { throw new IllegalArgumentException("No authentication parameters were provided"); } - Map params; try { - params = AuthenticationUtil.configureFromJsonString(encodedAuthParamString); + this.params = AuthenticationUtil.configureFromJsonString(encodedAuthParamString); } catch (IOException e) { throw new IllegalArgumentException("Malformed authentication parameters", e); } + String type = this.params.getOrDefault(CONFIG_PARAM_TYPE, TYPE_CLIENT_CREDENTIALS); + if (!type.equals(TYPE_CLIENT_CREDENTIALS)) { + throw new IllegalArgumentException("Unsupported authentication type: " + type); + } + } + + private void initializeFlow(AuthenticationInitContext context) { + AuthenticationHttpClientConfig config = buildHttpConfig(params, context); + AuthenticationHttpClientFactory httpClientFactory = + new AuthenticationHttpClientFactory(config, context); + this.flow = ClientCredentialsFlow.fromParameters( + params, httpClientFactory.getNameResolver(), httpClientFactory.createHttpClient()); + } - String type = params.getOrDefault(CONFIG_PARAM_TYPE, TYPE_CLIENT_CREDENTIALS); - switch(type) { - case TYPE_CLIENT_CREDENTIALS: - this.flow = ClientCredentialsFlow.fromParameters(params); - break; - default: - throw new IllegalArgumentException("Unsupported authentication type: " + type); + private static int getParameterDurationToMillis(String name, Duration value, Duration defaultValue) { + Duration duration; + if (value == null) { + log.info("Configuration for [{}] is using the default value: [{}]", name, defaultValue); + duration = defaultValue; + } else { + log.info("Configuration for [{}] is: [{}]", name, value); + duration = value; } + + return (int) duration.toMillis(); + } + + + static int parseParameterDuration(Map params, String name, Duration defaultValue) { + String value = params.get(name); + if (StringUtils.isNotBlank(value)) { + try { + return getParameterDurationToMillis(name, Duration.parse(value), defaultValue); + } catch (DateTimeParseException e) { + throw new IllegalArgumentException("Malformed configuration parameter: " + name, e); + } + } + return 0; + } + + + private AuthenticationHttpClientConfig buildHttpConfig( + Map params, + AuthenticationInitContext context) { + int connectTimeout = parseParameterDuration(params, CONFIG_PARAM_CONNECT_TIMEOUT, DEFAULT_CONNECT_TIMEOUT); + int readTimeout = parseParameterDuration(params, CONFIG_PARAM_READ_TIMEOUT, DEFAULT_READ_TIMEOUT); + String trustCertsFilePath = params.get(CONFIG_PARAM_TRUST_CERTS_FILE_PATH); + return new AuthenticationHttpClientConfig(readTimeout, connectTimeout, trustCertsFilePath, context); } @Override @@ -93,6 +143,12 @@ public void configure(Map authParams) { @Override public void start() throws PulsarClientException { + start(null); + } + + @Override + public void start(AuthenticationInitContext context) throws PulsarClientException { + initializeFlow(context); flow.initialize(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java index 7f64c0b18ac73..7aee8a61d7832 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java @@ -18,15 +18,16 @@ */ package org.apache.pulsar.client.impl.auth.oauth2; +import io.netty.resolver.NameResolver; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.Reader; +import java.net.InetAddress; import java.net.URISyntaxException; import java.net.URL; import java.net.URLConnection; import java.nio.charset.StandardCharsets; -import java.time.Duration; import java.util.Map; import lombok.Builder; import lombok.extern.slf4j.Slf4j; @@ -37,6 +38,7 @@ import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenClient; import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenExchangeException; import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult; +import org.asynchttpclient.AsyncHttpClient; /** * Implementation of OAuth 2.0 Client Credentials flow. @@ -62,8 +64,9 @@ class ClientCredentialsFlow extends FlowBase { @Builder public ClientCredentialsFlow(URL issuerUrl, String audience, String privateKey, String scope, - Duration connectTimeout, Duration readTimeout, String trustCertsFilePath) { - super(issuerUrl, connectTimeout, readTimeout, trustCertsFilePath); + AsyncHttpClient httpClient, NameResolver nameResolver + ) { + super(issuerUrl, httpClient, nameResolver); this.audience = audience; this.privateKey = privateKey; this.scope = scope; @@ -75,24 +78,21 @@ public ClientCredentialsFlow(URL issuerUrl, String audience, String privateKey, * @param params * @return */ - public static ClientCredentialsFlow fromParameters(Map params) { + public static ClientCredentialsFlow fromParameters(Map params, + NameResolver nameResolver, + AsyncHttpClient httpClient) { URL issuerUrl = parseParameterUrl(params, CONFIG_PARAM_ISSUER_URL); String privateKeyUrl = parseParameterString(params, CONFIG_PARAM_KEY_FILE); // These are optional parameters, so we only perform a get String scope = params.get(CONFIG_PARAM_SCOPE); String audience = params.get(CONFIG_PARAM_AUDIENCE); - Duration connectTimeout = parseParameterDuration(params, CONFIG_PARAM_CONNECT_TIMEOUT); - Duration readTimeout = parseParameterDuration(params, CONFIG_PARAM_READ_TIMEOUT); - String trustCertsFilePath = params.get(CONFIG_PARAM_TRUST_CERTS_FILE_PATH); - return ClientCredentialsFlow.builder() .issuerUrl(issuerUrl) .audience(audience) .privateKey(privateKeyUrl) + .httpClient(httpClient) + .nameResolver(nameResolver) .scope(scope) - .connectTimeout(connectTimeout) - .readTimeout(readTimeout) - .trustCertsFilePath(trustCertsFilePath) .build(); } @@ -133,7 +133,7 @@ public void initialize() throws PulsarClientException { assert this.metadata != null; URL tokenUrl = this.metadata.getTokenEndpoint(); - this.exchanger = new TokenClient(tokenUrl, httpClient); + this.exchanger = new TokenClient(tokenUrl, httpClient, this.nameResolver); initialized = true; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java index 6cc9f8e41b5e4..3273a897a28a4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java @@ -18,25 +18,19 @@ */ package org.apache.pulsar.client.impl.auth.oauth2; -import io.netty.handler.ssl.SslContextBuilder; -import java.io.File; +import io.netty.resolver.NameResolver; import java.io.IOException; +import java.net.InetAddress; import java.net.MalformedURLException; import java.net.URL; -import java.time.Duration; -import java.time.format.DateTimeParseException; import java.util.Map; -import javax.net.ssl.SSLException; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; -import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.auth.oauth2.protocol.DefaultMetadataResolver; import org.apache.pulsar.client.impl.auth.oauth2.protocol.Metadata; import org.apache.pulsar.client.impl.auth.oauth2.protocol.MetadataResolver; import org.asynchttpclient.AsyncHttpClient; -import org.asynchttpclient.DefaultAsyncHttpClient; -import org.asynchttpclient.DefaultAsyncHttpClientConfig; /** * An abstract OAuth 2.0 authorization flow. @@ -44,13 +38,6 @@ @Slf4j abstract class FlowBase implements Flow { - public static final String CONFIG_PARAM_CONNECT_TIMEOUT = "connectTimeout"; - public static final String CONFIG_PARAM_READ_TIMEOUT = "readTimeout"; - public static final String CONFIG_PARAM_TRUST_CERTS_FILE_PATH = "trustCertsFilePath"; - - protected static final Duration DEFAULT_CONNECT_TIMEOUT = Duration.ofSeconds(10); - protected static final Duration DEFAULT_READ_TIMEOUT = Duration.ofSeconds(30); - private static final long serialVersionUID = 1L; protected final URL issuerUrl; @@ -58,47 +45,14 @@ abstract class FlowBase implements Flow { protected transient Metadata metadata; - protected FlowBase(URL issuerUrl, Duration connectTimeout, Duration readTimeout, String trustCertsFilePath) { - this.issuerUrl = issuerUrl; - this.httpClient = defaultHttpClient(readTimeout, connectTimeout, trustCertsFilePath); - } + protected NameResolver nameResolver; - private AsyncHttpClient defaultHttpClient(Duration readTimeout, Duration connectTimeout, - String trustCertsFilePath) { - DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder(); - confBuilder.setCookieStore(null); - confBuilder.setUseProxyProperties(true); - confBuilder.setFollowRedirect(true); - confBuilder.setConnectTimeout( - getParameterDurationToMillis(CONFIG_PARAM_CONNECT_TIMEOUT, connectTimeout, - DEFAULT_CONNECT_TIMEOUT)); - confBuilder.setReadTimeout( - getParameterDurationToMillis(CONFIG_PARAM_READ_TIMEOUT, readTimeout, DEFAULT_READ_TIMEOUT)); - confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", PulsarVersion.getVersion())); - if (StringUtils.isNotBlank(trustCertsFilePath)) { - try { - confBuilder.setSslContext(SslContextBuilder.forClient() - .trustManager(new File(trustCertsFilePath)) - .build()); - } catch (SSLException e) { - log.error("Could not set " + CONFIG_PARAM_TRUST_CERTS_FILE_PATH, e); - } - } - return new DefaultAsyncHttpClient(confBuilder.build()); + protected FlowBase(URL issuerUrl, AsyncHttpClient httpClient, NameResolver nameResolver) { + this.issuerUrl = issuerUrl; + this.httpClient = httpClient; + this.nameResolver = nameResolver; } - private int getParameterDurationToMillis(String name, Duration value, Duration defaultValue) { - Duration duration; - if (value == null) { - log.info("Configuration for [{}] is using the default value: [{}]", name, defaultValue); - duration = defaultValue; - } else { - log.info("Configuration for [{}] is: [{}]", name, value); - duration = value; - } - - return (int) duration.toMillis(); - } public void initialize() throws PulsarClientException { try { @@ -110,7 +64,7 @@ public void initialize() throws PulsarClientException { } protected MetadataResolver createMetadataResolver() { - return DefaultMetadataResolver.fromIssuerUrl(issuerUrl, httpClient); + return DefaultMetadataResolver.fromIssuerUrl(issuerUrl, httpClient, nameResolver); } static String parseParameterString(Map params, String name) { @@ -133,18 +87,6 @@ static URL parseParameterUrl(Map params, String name) { } } - static Duration parseParameterDuration(Map params, String name) { - String value = params.get(name); - if (StringUtils.isNotBlank(value)) { - try { - return Duration.parse(value); - } catch (DateTimeParseException e) { - throw new IllegalArgumentException("Malformed configuration parameter: " + name, e); - } - } - return null; - } - @Override public void close() throws Exception { httpClient.close(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java index 19d0c1acadd15..a96656e037903 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java @@ -21,14 +21,17 @@ import com.fasterxml.jackson.databind.ObjectReader; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpHeaderValues; +import io.netty.resolver.NameResolver; import java.io.IOException; import java.io.InputStream; +import java.net.InetAddress; import java.net.MalformedURLException; import java.net.URI; import java.net.URL; import java.util.concurrent.ExecutionException; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.BoundRequestBuilder; import org.asynchttpclient.Response; /** @@ -39,11 +42,14 @@ public class DefaultMetadataResolver implements MetadataResolver { private final URL metadataUrl; private final ObjectReader objectReader; private final AsyncHttpClient httpClient; + private final NameResolver nameResolver; - public DefaultMetadataResolver(URL metadataUrl, AsyncHttpClient httpClient) { + public DefaultMetadataResolver(URL metadataUrl, AsyncHttpClient httpClient, + NameResolver nameResolver) { this.metadataUrl = metadataUrl; this.objectReader = ObjectMapperFactory.getMapper().reader().forType(Metadata.class); this.httpClient = httpClient; + this.nameResolver = nameResolver; } /** @@ -52,8 +58,10 @@ public DefaultMetadataResolver(URL metadataUrl, AsyncHttpClient httpClient) { * @param issuerUrl The authorization server's issuer identifier * @return a resolver */ - public static DefaultMetadataResolver fromIssuerUrl(URL issuerUrl, AsyncHttpClient httpClient) { - return new DefaultMetadataResolver(getWellKnownMetadataUrl(issuerUrl), httpClient); + public static DefaultMetadataResolver fromIssuerUrl(URL issuerUrl, + AsyncHttpClient httpClient, + NameResolver nameResolver) { + return new DefaultMetadataResolver(getWellKnownMetadataUrl(issuerUrl), httpClient, nameResolver); } /** @@ -81,11 +89,12 @@ public static URL getWellKnownMetadataUrl(URL issuerUrl) { public Metadata resolve() throws IOException { try { - Response response = httpClient.prepareGet(metadataUrl.toString()) - .addHeader(HttpHeaderNames.ACCEPT, HttpHeaderValues.APPLICATION_JSON) - .execute() - .toCompletableFuture() - .get(); + BoundRequestBuilder requestBuilder = httpClient.prepareGet(metadataUrl.toString()) + .addHeader(HttpHeaderNames.ACCEPT, HttpHeaderValues.APPLICATION_JSON); + if (nameResolver != null) { + requestBuilder.setNameResolver(nameResolver); + } + Response response = requestBuilder.execute().toCompletableFuture().get(); Metadata metadata; try (InputStream inputStream = response.getResponseBodyAsStream()) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java index cb4c2a551d01e..be94b9c990ae1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java @@ -18,7 +18,9 @@ */ package org.apache.pulsar.client.impl.auth.oauth2.protocol; +import io.netty.resolver.NameResolver; import java.io.IOException; +import java.net.InetAddress; import java.net.URL; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; @@ -29,6 +31,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.BoundRequestBuilder; import org.asynchttpclient.Response; /** @@ -38,10 +41,12 @@ public class TokenClient implements ClientCredentialsExchanger { private final URL tokenUrl; private final AsyncHttpClient httpClient; + private final NameResolver nameResolver; - public TokenClient(URL tokenUrl, AsyncHttpClient httpClient) { + public TokenClient(URL tokenUrl, AsyncHttpClient httpClient, NameResolver nameResolver) { this.httpClient = httpClient; this.tokenUrl = tokenUrl; + this.nameResolver = nameResolver; } @Override @@ -85,13 +90,14 @@ public TokenResult exchangeClientCredentials(ClientCredentialsExchangeRequest re String body = buildClientCredentialsBody(req); try { - - Response res = httpClient.preparePost(tokenUrl.toString()) + BoundRequestBuilder requestBuilder = httpClient.preparePost(tokenUrl.toString()) .setHeader("Accept", "application/json") .setHeader("Content-Type", "application/x-www-form-urlencoded") - .setBody(body) - .execute() - .get(); + .setBody(body); + if (nameResolver != null) { + requestBuilder.setNameResolver(nameResolver); + } + Response res = requestBuilder.execute().get(); switch (res.getStatusCode()) { case 200: diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/http/AuthenticationHttpClientConfig.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/http/AuthenticationHttpClientConfig.java new file mode 100644 index 0000000000000..08b631b2d5113 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/http/AuthenticationHttpClientConfig.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.http; + +import lombok.Builder; +import lombok.Data; +import org.apache.pulsar.client.api.AuthenticationInitContext; + +@Data +public class AuthenticationHttpClientConfig { + private int readTimeout = 30000; + private int connectTimeout = 10000; + private String trustCertsFilePath; + private AuthenticationInitContext context; + + @Builder(builderClassName = "builder") + public AuthenticationHttpClientConfig(int readTimeout, int connectTimeout, String trustCertsFilePath, + AuthenticationInitContext context) { + this.readTimeout = readTimeout; + this.connectTimeout = connectTimeout; + this.trustCertsFilePath = trustCertsFilePath; + this.context = context; + } + + @Builder(builderClassName = "builder") + public AuthenticationHttpClientConfig(String trustCertsFilePath, AuthenticationInitContext context) { + this.trustCertsFilePath = trustCertsFilePath; + this.context = context; + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/http/AuthenticationHttpClientFactory.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/http/AuthenticationHttpClientFactory.java new file mode 100644 index 0000000000000..33c1a833ab6f4 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/http/AuthenticationHttpClientFactory.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.http; + +import io.netty.channel.EventLoopGroup; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.resolver.NameResolver; +import io.netty.util.Timer; +import java.io.File; +import java.net.InetAddress; +import java.util.Optional; +import javax.net.ssl.SSLException; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.PulsarVersion; +import org.apache.pulsar.client.api.AuthenticationInitContext; +import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.DefaultAsyncHttpClient; +import org.asynchttpclient.DefaultAsyncHttpClientConfig; + +@Slf4j +public class AuthenticationHttpClientFactory { + + private final AuthenticationHttpClientConfig config; + private final AuthenticationInitContext context; + + public AuthenticationHttpClientFactory(AuthenticationHttpClientConfig config, + AuthenticationInitContext context) { + this.config = config; + this.context = context; + } + + public AsyncHttpClient createHttpClient() { + DefaultAsyncHttpClientConfig.Builder confBuilder = buildBaseConfig(); + return new DefaultAsyncHttpClient(confBuilder.build()); + } + + @SuppressWarnings("unchecked") + public NameResolver getNameResolver() { + return Optional.ofNullable(context) + .flatMap(ctx -> ctx.getService(NameResolver.class)) + .orElse(null); + } + + private DefaultAsyncHttpClientConfig.Builder buildBaseConfig() { + DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder(); + + if (context != null) { + context.getService(Timer.class).ifPresent(confBuilder::setNettyTimer); + context.getService(EventLoopGroup.class).ifPresent(confBuilder::setEventLoopGroup); + } + + confBuilder.setCookieStore(null); + confBuilder.setUseProxyProperties(true); + confBuilder.setFollowRedirect(true); + confBuilder.setConnectTimeout(config.getConnectTimeout()); + confBuilder.setReadTimeout(config.getReadTimeout()); + confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", PulsarVersion.getVersion())); + + if (StringUtils.isNotBlank(config.getTrustCertsFilePath())) { + try { + confBuilder.setSslContext(SslContextBuilder.forClient() + .trustManager(new File(config.getTrustCertsFilePath())) + .build()); + } catch (SSLException e) { + log.error("Could not set trustCertsFilePath", e); + } + } + + return confBuilder; + } +} \ No newline at end of file diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/http/package-info.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/http/package-info.java new file mode 100644 index 0000000000000..cf2f6e6f3a83e --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/http/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Pulsar Authentication http client. + */ +package org.apache.pulsar.client.impl.http; diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClientTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClientTest.java index 131427682076e..9dd82e77abe90 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClientTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClientTest.java @@ -42,7 +42,7 @@ public void exchangeClientCredentialsSuccessByScopeTest() throws IOException, TokenExchangeException, ExecutionException, InterruptedException { DefaultAsyncHttpClient defaultAsyncHttpClient = mock(DefaultAsyncHttpClient.class); URL url = new URL("http://localhost"); - TokenClient tokenClient = new TokenClient(url, defaultAsyncHttpClient); + TokenClient tokenClient = new TokenClient(url, defaultAsyncHttpClient, null); ClientCredentialsExchangeRequest request = ClientCredentialsExchangeRequest.builder() .audience("test-audience") .clientId("test-client-id") @@ -75,7 +75,7 @@ public void exchangeClientCredentialsSuccessWithoutOptionalClientCredentialsTest IOException, TokenExchangeException, ExecutionException, InterruptedException { DefaultAsyncHttpClient defaultAsyncHttpClient = mock(DefaultAsyncHttpClient.class); URL url = new URL("http://localhost"); - TokenClient tokenClient = new TokenClient(url, defaultAsyncHttpClient); + TokenClient tokenClient = new TokenClient(url, defaultAsyncHttpClient, null); ClientCredentialsExchangeRequest request = ClientCredentialsExchangeRequest.builder() .clientId("test-client-id") .clientSecret("test-client-secret") From eb53bef6182a2a40bead3a877c22ea095c3dd55a Mon Sep 17 00:00:00 2001 From: 3pacccccc Date: Sat, 13 Dec 2025 01:30:01 +0800 Subject: [PATCH 02/10] update --- .../oauth2/AuthenticationFactoryOAuth2.java | 12 +++--- .../auth/oauth2/AuthenticationOAuth2.java | 38 ++++++++++++++++++- .../auth/oauth2/ClientCredentialsFlow.java | 6 +-- .../client/impl/auth/oauth2/FlowBase.java | 20 ---------- .../http/AuthenticationHttpClientConfig.java | 7 +--- 5 files changed, 46 insertions(+), 37 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java index d5bb6682674ad..9821f97c7fb27 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java @@ -203,23 +203,23 @@ public Authentication build() { if (finalHttpClient == null || finalNameResolver == null) { // 构建配置,处理可能的空值 - AuthenticationHttpClientConfig.builder configBuilder = - AuthenticationHttpClientConfig.builder(); + AuthenticationHttpClientConfig httpClientConfig = new + AuthenticationHttpClientConfig(); if (connectTimeout != null) { - configBuilder.connectTimeout((int) connectTimeout.toMillis()); + httpClientConfig.setConnectTimeout((int) connectTimeout.toMillis()); } if (readTimeout != null) { - configBuilder.readTimeout((int) readTimeout.toMillis()); + httpClientConfig.setReadTimeout((int) readTimeout.toMillis()); } if (trustCertsFilePath != null) { - configBuilder.trustCertsFilePath(trustCertsFilePath); + httpClientConfig.setTrustCertsFilePath(trustCertsFilePath); } AuthenticationHttpClientFactory clientFactory = new AuthenticationHttpClientFactory( - configBuilder.build(), + httpClientConfig, null ); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java index f9c912cd8804e..33b252a9d6924 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java @@ -19,6 +19,8 @@ package org.apache.pulsar.client.impl.auth.oauth2; import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; import java.time.Clock; import java.time.Duration; import java.time.Instant; @@ -51,6 +53,9 @@ public class AuthenticationOAuth2 implements Authentication, public static final String CONFIG_PARAM_CONNECT_TIMEOUT = "connectTimeout"; public static final String CONFIG_PARAM_READ_TIMEOUT = "readTimeout"; public static final String CONFIG_PARAM_TRUST_CERTS_FILE_PATH = "trustCertsFilePath"; + public static final String CONFIG_PARAM_ISSUER_URL = "issuerUrl"; + public static final String CONFIG_PARAM_KEY_FILE = "privateKey"; + protected static final Duration DEFAULT_CONNECT_TIMEOUT = Duration.ofSeconds(10); protected static final Duration DEFAULT_READ_TIMEOUT = Duration.ofSeconds(30); public static final double EXPIRY_ADJUSTMENT = 0.9; @@ -60,6 +65,8 @@ public class AuthenticationOAuth2 implements Authentication, Flow flow; transient CachedToken cachedToken; private Map params; + private URL issuerUrl; + private String privateKeyUrl; public AuthenticationOAuth2() { this.clock = Clock.systemDefaultZone(); @@ -89,6 +96,8 @@ public void configure(String encodedAuthParamString) { if (!type.equals(TYPE_CLIENT_CREDENTIALS)) { throw new IllegalArgumentException("Unsupported authentication type: " + type); } + this.issuerUrl = parseParameterUrl(params, CONFIG_PARAM_ISSUER_URL); + this.privateKeyUrl = parseParameterString(params, CONFIG_PARAM_KEY_FILE); } private void initializeFlow(AuthenticationInitContext context) { @@ -96,9 +105,32 @@ private void initializeFlow(AuthenticationInitContext context) { AuthenticationHttpClientFactory httpClientFactory = new AuthenticationHttpClientFactory(config, context); this.flow = ClientCredentialsFlow.fromParameters( - params, httpClientFactory.getNameResolver(), httpClientFactory.createHttpClient()); + params, httpClientFactory.getNameResolver(), + httpClientFactory.createHttpClient(), issuerUrl, privateKeyUrl); + } + + static String parseParameterString(Map params, String name) { + String s = params.get(name); + if (StringUtils.isEmpty(s)) { + throw new IllegalArgumentException("Required configuration parameter: " + name); + } + return s; + } + + + static URL parseParameterUrl(Map params, String name) { + String s = params.get(name); + if (StringUtils.isEmpty(s)) { + throw new IllegalArgumentException("Required configuration parameter: " + name); + } + try { + return new URL(s); + } catch (MalformedURLException e) { + throw new IllegalArgumentException("Malformed configuration parameter: " + name); + } } + private static int getParameterDurationToMillis(String name, Duration value, Duration defaultValue) { Duration duration; if (value == null) { @@ -148,7 +180,9 @@ public void start() throws PulsarClientException { @Override public void start(AuthenticationInitContext context) throws PulsarClientException { - initializeFlow(context); + if (flow == null) { + initializeFlow(context); + } flow.initialize(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java index 7aee8a61d7832..df15881b0c3cf 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java @@ -80,9 +80,9 @@ public ClientCredentialsFlow(URL issuerUrl, String audience, String privateKey, */ public static ClientCredentialsFlow fromParameters(Map params, NameResolver nameResolver, - AsyncHttpClient httpClient) { - URL issuerUrl = parseParameterUrl(params, CONFIG_PARAM_ISSUER_URL); - String privateKeyUrl = parseParameterString(params, CONFIG_PARAM_KEY_FILE); + AsyncHttpClient httpClient, + URL issuerUrl, + String privateKeyUrl) { // These are optional parameters, so we only perform a get String scope = params.get(CONFIG_PARAM_SCOPE); String audience = params.get(CONFIG_PARAM_AUDIENCE); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java index 3273a897a28a4..de0325d981806 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java @@ -67,26 +67,6 @@ protected MetadataResolver createMetadataResolver() { return DefaultMetadataResolver.fromIssuerUrl(issuerUrl, httpClient, nameResolver); } - static String parseParameterString(Map params, String name) { - String s = params.get(name); - if (StringUtils.isEmpty(s)) { - throw new IllegalArgumentException("Required configuration parameter: " + name); - } - return s; - } - - static URL parseParameterUrl(Map params, String name) { - String s = params.get(name); - if (StringUtils.isEmpty(s)) { - throw new IllegalArgumentException("Required configuration parameter: " + name); - } - try { - return new URL(s); - } catch (MalformedURLException e) { - throw new IllegalArgumentException("Malformed configuration parameter: " + name); - } - } - @Override public void close() throws Exception { httpClient.close(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/http/AuthenticationHttpClientConfig.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/http/AuthenticationHttpClientConfig.java index 08b631b2d5113..703ebc88cf584 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/http/AuthenticationHttpClientConfig.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/http/AuthenticationHttpClientConfig.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.client.impl.http; -import lombok.Builder; import lombok.Data; import org.apache.pulsar.client.api.AuthenticationInitContext; @@ -29,7 +28,6 @@ public class AuthenticationHttpClientConfig { private String trustCertsFilePath; private AuthenticationInitContext context; - @Builder(builderClassName = "builder") public AuthenticationHttpClientConfig(int readTimeout, int connectTimeout, String trustCertsFilePath, AuthenticationInitContext context) { this.readTimeout = readTimeout; @@ -38,9 +36,6 @@ public AuthenticationHttpClientConfig(int readTimeout, int connectTimeout, Strin this.context = context; } - @Builder(builderClassName = "builder") - public AuthenticationHttpClientConfig(String trustCertsFilePath, AuthenticationInitContext context) { - this.trustCertsFilePath = trustCertsFilePath; - this.context = context; + public AuthenticationHttpClientConfig() { } } From 482b5cea07b5355db9bfa9a70378e88a3a96e81d Mon Sep 17 00:00:00 2001 From: 3pacccccc Date: Sat, 13 Dec 2025 01:34:56 +0800 Subject: [PATCH 03/10] remove unused import --- .../org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java index de0325d981806..49e31f1d63f46 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java @@ -21,11 +21,8 @@ import io.netty.resolver.NameResolver; import java.io.IOException; import java.net.InetAddress; -import java.net.MalformedURLException; import java.net.URL; -import java.util.Map; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.auth.oauth2.protocol.DefaultMetadataResolver; import org.apache.pulsar.client.impl.auth.oauth2.protocol.Metadata; From 45c7152fda860f0f1b7eefe83ada3e2fef8d49ff Mon Sep 17 00:00:00 2001 From: 3pacccccc Date: Sat, 13 Dec 2025 16:17:22 +0800 Subject: [PATCH 04/10] update --- .../auth/oauth2/AuthenticationFactoryOAuth2.java | 13 ++++++------- .../impl/auth/oauth2/AuthenticationOAuth2.java | 2 +- .../impl/http/AuthenticationHttpClientConfig.java | 13 ++++++------- 3 files changed, 13 insertions(+), 15 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java index 9821f97c7fb27..af7953a03f48a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java @@ -202,24 +202,23 @@ public Authentication build() { NameResolver finalNameResolver = this.nameResolver; if (finalHttpClient == null || finalNameResolver == null) { - // 构建配置,处理可能的空值 - AuthenticationHttpClientConfig httpClientConfig = new - AuthenticationHttpClientConfig(); + AuthenticationHttpClientConfig.ConfigBuilder builder = + AuthenticationHttpClientConfig.builder(); if (connectTimeout != null) { - httpClientConfig.setConnectTimeout((int) connectTimeout.toMillis()); + builder.connectTimeout((int) connectTimeout.toMillis()); } if (readTimeout != null) { - httpClientConfig.setReadTimeout((int) readTimeout.toMillis()); + builder.readTimeout((int) readTimeout.toMillis()); } if (trustCertsFilePath != null) { - httpClientConfig.setTrustCertsFilePath(trustCertsFilePath); + builder.trustCertsFilePath(trustCertsFilePath); } AuthenticationHttpClientFactory clientFactory = new AuthenticationHttpClientFactory( - httpClientConfig, + builder.build(), null ); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java index 33b252a9d6924..bc5be3518a3e5 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java @@ -164,7 +164,7 @@ private AuthenticationHttpClientConfig buildHttpConfig( int connectTimeout = parseParameterDuration(params, CONFIG_PARAM_CONNECT_TIMEOUT, DEFAULT_CONNECT_TIMEOUT); int readTimeout = parseParameterDuration(params, CONFIG_PARAM_READ_TIMEOUT, DEFAULT_READ_TIMEOUT); String trustCertsFilePath = params.get(CONFIG_PARAM_TRUST_CERTS_FILE_PATH); - return new AuthenticationHttpClientConfig(readTimeout, connectTimeout, trustCertsFilePath, context); + return new AuthenticationHttpClientConfig(readTimeout, connectTimeout, trustCertsFilePath); } @Override diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/http/AuthenticationHttpClientConfig.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/http/AuthenticationHttpClientConfig.java index 703ebc88cf584..fd07da5e5794a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/http/AuthenticationHttpClientConfig.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/http/AuthenticationHttpClientConfig.java @@ -18,22 +18,21 @@ */ package org.apache.pulsar.client.impl.http; -import lombok.Data; -import org.apache.pulsar.client.api.AuthenticationInitContext; -@Data +import lombok.Builder; +import lombok.Getter; + +@Getter public class AuthenticationHttpClientConfig { private int readTimeout = 30000; private int connectTimeout = 10000; private String trustCertsFilePath; - private AuthenticationInitContext context; - public AuthenticationHttpClientConfig(int readTimeout, int connectTimeout, String trustCertsFilePath, - AuthenticationInitContext context) { + @Builder(builderClassName = "ConfigBuilder") + public AuthenticationHttpClientConfig(int readTimeout, int connectTimeout, String trustCertsFilePath) { this.readTimeout = readTimeout; this.connectTimeout = connectTimeout; this.trustCertsFilePath = trustCertsFilePath; - this.context = context; } public AuthenticationHttpClientConfig() { From db8886ec45cf0734f3598d430b94e0c9869397cb Mon Sep 17 00:00:00 2001 From: 3pacccccc Date: Sat, 13 Dec 2025 16:39:04 +0800 Subject: [PATCH 05/10] update --- .../apache/pulsar/client/impl/ClientInitializationTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientInitializationTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientInitializationTest.java index f7ff30c286c76..344c7c209ed34 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientInitializationTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientInitializationTest.java @@ -25,6 +25,7 @@ import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.mockito.Mockito; import org.testng.annotations.Test; public class ClientInitializationTest { @@ -40,7 +41,7 @@ public void testInitializeAuthWithTls() throws PulsarClientException { .authentication(auth) .build(); - verify(auth).start(); + verify(auth).start(Mockito.any()); verify(auth, times(0)).getAuthData(); } } From fb05bbc2f4ca9576d479d185732c2128a162ea72 Mon Sep 17 00:00:00 2001 From: 3pacccccc Date: Sat, 13 Dec 2025 23:27:19 +0800 Subject: [PATCH 06/10] update default value of timeout --- .../pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java | 2 +- .../client/impl/http/AuthenticationHttpClientConfig.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java index bc5be3518a3e5..6b99c23db48cb 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java @@ -154,7 +154,7 @@ static int parseParameterDuration(Map params, String name, Durat throw new IllegalArgumentException("Malformed configuration parameter: " + name, e); } } - return 0; + return (int) defaultValue.toMillis(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/http/AuthenticationHttpClientConfig.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/http/AuthenticationHttpClientConfig.java index fd07da5e5794a..e2f83872064dc 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/http/AuthenticationHttpClientConfig.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/http/AuthenticationHttpClientConfig.java @@ -30,8 +30,8 @@ public class AuthenticationHttpClientConfig { @Builder(builderClassName = "ConfigBuilder") public AuthenticationHttpClientConfig(int readTimeout, int connectTimeout, String trustCertsFilePath) { - this.readTimeout = readTimeout; - this.connectTimeout = connectTimeout; + this.readTimeout = readTimeout > 0 ? readTimeout : this.readTimeout; + this.connectTimeout = connectTimeout > 0 ? connectTimeout : this.connectTimeout; this.trustCertsFilePath = trustCertsFilePath; } From 046aa19e19b9229a6e065bf07a9d446cd6d8d990 Mon Sep 17 00:00:00 2001 From: 3pacccccc Date: Sun, 14 Dec 2025 02:09:43 +0800 Subject: [PATCH 07/10] update --- .../client/api/AuthenticationInitContext.java | 80 ++++++---- .../impl/AuthenticationInitContextImpl.java | 10 +- .../AuthenticationHttpClientConfig.java | 73 +++++++++ .../AuthenticationHttpClientFactory.java | 40 ++++- .../httpclient}/package-info.java | 2 +- .../oauth2/AuthenticationFactoryOAuth2.java | 14 +- .../auth/oauth2/AuthenticationOAuth2.java | 9 +- .../http/AuthenticationHttpClientConfig.java | 40 ----- .../AuthenticationInitContextImplTest.java | 141 ++++++++++++++++++ .../AuthenticationHttpClientFactoryTest.java | 110 ++++++++++++++ .../auth/oauth2/AuthenticationOAuth2Test.java | 3 + 11 files changed, 441 insertions(+), 81 deletions(-) create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/httpclient/AuthenticationHttpClientConfig.java rename pulsar-client/src/main/java/org/apache/pulsar/client/impl/{http => auth/httpclient}/AuthenticationHttpClientFactory.java (71%) rename pulsar-client/src/main/java/org/apache/pulsar/client/impl/{http => auth/httpclient}/package-info.java (93%) delete mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/http/AuthenticationHttpClientConfig.java create mode 100644 pulsar-client/src/test/java/org/apache/pulsar/client/impl/AuthenticationInitContextImplTest.java create mode 100644 pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/http/AuthenticationHttpClientFactoryTest.java diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationInitContext.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationInitContext.java index 3cdc24184ebe4..83ab1aa24445c 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationInitContext.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationInitContext.java @@ -30,38 +30,66 @@ * creating their own instances. This improves resource utilization and performance, especially * when multiple client instances or authentication providers are used within the same application. * - *

Usage Examples

- * - *

Getting a shared EventLoopGroup: - *

{@code
- * public void start(AuthenticationInitContext context) throws PulsarClientException {
- *     Optional eventLoop = context.getService(EventLoopGroup.class);
- *     if (eventLoop.isPresent()) {
- *         this.sharedEventLoopGroup = eventLoop.get();
- *     } else {
- *         // Fallback to creating own instance
- *         this.ownEventLoopGroup = new NioEventLoopGroup();
- *     }
- * }
- * }
- * - *

Getting a named DNS resolver: - *

{@code
- * public void start(AuthenticationInitContext context) throws PulsarClientException {
- *     Optional dnsResolver = context.getServiceByName(DnsResolver.class, "secure-dns");
- *     if (dnsResolver.isPresent()) {
- *         this.dnsResolver = dnsResolver.get();
- *     }
- * }
- * }
- * *

Authentication providers should prefer using shared resources when available to minimize * system resource consumption and improve performance through better resource reuse. * * @see Authentication - * @since 2.11.0 */ public interface AuthenticationInitContext { + /** + * Retrieves a shared service instance by its class type. + * + *

This method looks up a globally registered service which is shared among + * all authentication providers. + * + *

Example: + *

{@code
+     * Optional eventLoop = context.getService(EventLoopGroup.class);
+     * if (eventLoop.isPresent()) {
+     *     // Use the shared event loop group
+     *     this.eventLoopGroup = eventLoop.get();
+     * } else {
+     *     // Fallback to creating a new instance
+     *     this.eventLoopGroup = new NioEventLoopGroup();
+     * }
+     * }
+ * + * @param The type of service to retrieve + * @param serviceClass The class of the service to retrieve + * @return An {@link Optional} containing the service instance if available, + * or {@link Optional#empty()} if no such service is registered + */ Optional getService(Class serviceClass); + + /** + * Retrieves a named shared service instance by its class type and name. + * + *

This method allows lookup of services that are registered under a specific + * name, enabling multiple instances of the same service type to be distinguished. + * This is useful for: + *

    + *
  • Specialized DNS resolvers (e.g., "secure-dns", "internal-dns") + *
  • Different thread pools for various purposes + *
  • Multiple timer instances with different configurations + *
  • HTTP clients with different proxy configurations + *
+ * + *

Example: + *

{@code
+     * // Get a DNS resolver configured for internal network resolution
+     * Optional internalResolver =
+     *     context.getServiceByName(NameResolver.class, "internal-network");
+     *
+     * // Get a different DNS resolver for external resolution
+     * Optional externalResolver =
+     *     context.getServiceByName(NameResolver.class, "external-network");
+     * }
+ * + * @param The type of service to retrieve + * @param serviceClass The class of the service to retrieve. Cannot be null. + * @param name The name under which the service is registered. Cannot be null or empty. + * @return An {@link Optional} containing the named service instance if available, + * or {@link Optional#empty()} if no such service is registered with the given name + */ Optional getServiceByName(Class serviceClass, String name); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AuthenticationInitContextImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AuthenticationInitContextImpl.java index 8038c75673092..3f6d782f0035f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AuthenticationInitContextImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AuthenticationInitContextImpl.java @@ -30,6 +30,7 @@ public class AuthenticationInitContextImpl implements AuthenticationInitContext { private final Map, Object> sharedServices = new HashMap<>(); private final Map, Object>> namedServices = new HashMap<>(); + public AuthenticationInitContextImpl(EventLoopGroup eventLoopGroup, Timer timer, NameResolver nameResolver) { @@ -41,17 +42,24 @@ public AuthenticationInitContextImpl(EventLoopGroup eventLoopGroup, @Override @SuppressWarnings("unchecked") public Optional getService(Class serviceClass) { + if (serviceClass == null) { + throw new IllegalArgumentException("Service class cannot be null"); + } return Optional.ofNullable((T) sharedServices.get(serviceClass)); } @Override @SuppressWarnings("unchecked") public Optional getServiceByName(Class serviceClass, String name) { + if (name == null || name.isEmpty()) { + throw new IllegalArgumentException("Service name cannot be null or empty"); + } Map, Object> services = namedServices.get(name); if (services != null) { return Optional.ofNullable((T) services.get(serviceClass)); } - return Optional.empty(); } + return Optional.empty(); + } public void addService(Class serviceClass, T instance) { sharedServices.put(serviceClass, instance); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/httpclient/AuthenticationHttpClientConfig.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/httpclient/AuthenticationHttpClientConfig.java new file mode 100644 index 0000000000000..4e7981fc29348 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/httpclient/AuthenticationHttpClientConfig.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.auth.httpclient; + + +import lombok.Builder; +import lombok.Getter; + +/** + * Configuration for HTTP clients used in authentication providers. + * + *

This class encapsulates the configuration parameters needed to create HTTP clients + * for authentication-related HTTP requests, such as OAuth2 token exchange requests. + * + *

Configuration Parameters

+ *
    + *
  • readTimeout: Maximum time to wait for a response from the server + * (default: 30000 ms) + *
  • connectTimeout: Maximum time to establish a connection + * (default: 10000 ms) + *
  • trustCertsFilePath: Path to a custom trust certificate file + * for TLS/SSL connections + *
+ * + *

Usage

+ *
{@code
+ * // Using the builder pattern
+ * AuthenticationHttpClientConfig config = AuthenticationHttpClientConfig.builder()
+ *     .readTimeout(30000)
+ *     .connectTimeout(10000)
+ *     .trustCertsFilePath("/path/to/certs.pem")
+ *     .build();
+ *
+ * // Using the factory
+ * AuthenticationHttpClientFactory factory = new AuthenticationHttpClientFactory(config, context);
+ * AsyncHttpClient httpClient = factory.createHttpClient();
+ * }
+ * + *

This configuration is primarily used by {@link AuthenticationHttpClientFactory} to + * create properly configured HTTP clients for authentication providers. + * + * @see AuthenticationHttpClientFactory + * @see org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2 + */ +@Getter +public class AuthenticationHttpClientConfig { + private int readTimeout = 30000; + private int connectTimeout = 10000; + private final String trustCertsFilePath; + + @Builder(builderClassName = "ConfigBuilder") + public AuthenticationHttpClientConfig(int readTimeout, int connectTimeout, String trustCertsFilePath) { + this.readTimeout = readTimeout > 0 ? readTimeout : this.readTimeout; + this.connectTimeout = connectTimeout > 0 ? connectTimeout : this.connectTimeout; + this.trustCertsFilePath = trustCertsFilePath; + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/http/AuthenticationHttpClientFactory.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/httpclient/AuthenticationHttpClientFactory.java similarity index 71% rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/http/AuthenticationHttpClientFactory.java rename to pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/httpclient/AuthenticationHttpClientFactory.java index 33c1a833ab6f4..b3d02c9e6ef6c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/http/AuthenticationHttpClientFactory.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/httpclient/AuthenticationHttpClientFactory.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.client.impl.http; +package org.apache.pulsar.client.impl.auth.httpclient; import io.netty.channel.EventLoopGroup; import io.netty.handler.ssl.SslContextBuilder; @@ -34,6 +34,44 @@ import org.asynchttpclient.DefaultAsyncHttpClient; import org.asynchttpclient.DefaultAsyncHttpClientConfig; +/** + * Factory for creating HTTP clients used by authentication providers. + * + *

This factory creates {@link AsyncHttpClient} instances that are optimized for + * authentication-related HTTP requests. It supports: + *

    + *
  • Reusing shared resources from {@link AuthenticationInitContext} + *
  • Configurable timeouts for connections and reads + *
  • Custom SSL/TLS trust certificates + *
  • DNS resolver configuration + *
+ * + *

Resource Sharing

+ *

When a {@link AuthenticationInitContext} is provided, the factory will attempt to + * reuse shared resources: + *

    + *
  • {@link EventLoopGroup}: For I/O operations + *
  • {@link Timer}: For scheduling timeouts + *
  • {@link NameResolver}: For DNS resolution + *
+ * + *

Usage Example

+ *
{@code
+ * AuthenticationHttpClientConfig config = AuthenticationHttpClientConfig.builder()
+ *     .readTimeout(30000)
+ *     .connectTimeout(10000)
+ *     .build();
+ *
+ * AuthenticationHttpClientFactory factory =
+ *     new AuthenticationHttpClientFactory(config, authenticationContext);
+ *
+ * AsyncHttpClient httpClient = factory.createHttpClient();
+ * NameResolver nameResolver = factory.getNameResolver();
+ * }
+ * + * @see AuthenticationHttpClientConfig + * @see AuthenticationInitContext + */ @Slf4j public class AuthenticationHttpClientFactory { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/http/package-info.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/httpclient/package-info.java similarity index 93% rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/http/package-info.java rename to pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/httpclient/package-info.java index cf2f6e6f3a83e..d5a1142bc5e82 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/http/package-info.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/httpclient/package-info.java @@ -20,4 +20,4 @@ /** * Pulsar Authentication http client. */ -package org.apache.pulsar.client.impl.http; +package org.apache.pulsar.client.impl.auth.httpclient; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java index af7953a03f48a..b779c4a7cfce8 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java @@ -24,8 +24,8 @@ import java.time.Clock; import java.time.Duration; import org.apache.pulsar.client.api.Authentication; -import org.apache.pulsar.client.impl.http.AuthenticationHttpClientConfig; -import org.apache.pulsar.client.impl.http.AuthenticationHttpClientFactory; +import org.apache.pulsar.client.impl.auth.httpclient.AuthenticationHttpClientConfig; +import org.apache.pulsar.client.impl.auth.httpclient.AuthenticationHttpClientFactory; import org.asynchttpclient.AsyncHttpClient; /** @@ -202,23 +202,23 @@ public Authentication build() { NameResolver finalNameResolver = this.nameResolver; if (finalHttpClient == null || finalNameResolver == null) { - AuthenticationHttpClientConfig.ConfigBuilder builder = + AuthenticationHttpClientConfig.ConfigBuilder configBuilder = AuthenticationHttpClientConfig.builder(); if (connectTimeout != null) { - builder.connectTimeout((int) connectTimeout.toMillis()); + configBuilder.connectTimeout((int) connectTimeout.toMillis()); } if (readTimeout != null) { - builder.readTimeout((int) readTimeout.toMillis()); + configBuilder.readTimeout((int) readTimeout.toMillis()); } if (trustCertsFilePath != null) { - builder.trustCertsFilePath(trustCertsFilePath); + configBuilder.trustCertsFilePath(trustCertsFilePath); } AuthenticationHttpClientFactory clientFactory = new AuthenticationHttpClientFactory( - builder.build(), + configBuilder.build(), null ); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java index 6b99c23db48cb..98e563b377ba6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java @@ -36,9 +36,9 @@ import org.apache.pulsar.client.api.EncodedAuthenticationParameterSupport; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.AuthenticationUtil; +import org.apache.pulsar.client.impl.auth.httpclient.AuthenticationHttpClientConfig; +import org.apache.pulsar.client.impl.auth.httpclient.AuthenticationHttpClientFactory; import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult; -import org.apache.pulsar.client.impl.http.AuthenticationHttpClientConfig; -import org.apache.pulsar.client.impl.http.AuthenticationHttpClientFactory; /** * Pulsar client authentication provider based on OAuth 2.0. @@ -101,7 +101,7 @@ public void configure(String encodedAuthParamString) { } private void initializeFlow(AuthenticationInitContext context) { - AuthenticationHttpClientConfig config = buildHttpConfig(params, context); + AuthenticationHttpClientConfig config = buildHttpConfig(params); AuthenticationHttpClientFactory httpClientFactory = new AuthenticationHttpClientFactory(config, context); this.flow = ClientCredentialsFlow.fromParameters( @@ -159,8 +159,7 @@ static int parseParameterDuration(Map params, String name, Durat private AuthenticationHttpClientConfig buildHttpConfig( - Map params, - AuthenticationInitContext context) { + Map params) { int connectTimeout = parseParameterDuration(params, CONFIG_PARAM_CONNECT_TIMEOUT, DEFAULT_CONNECT_TIMEOUT); int readTimeout = parseParameterDuration(params, CONFIG_PARAM_READ_TIMEOUT, DEFAULT_READ_TIMEOUT); String trustCertsFilePath = params.get(CONFIG_PARAM_TRUST_CERTS_FILE_PATH); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/http/AuthenticationHttpClientConfig.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/http/AuthenticationHttpClientConfig.java deleted file mode 100644 index e2f83872064dc..0000000000000 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/http/AuthenticationHttpClientConfig.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.client.impl.http; - - -import lombok.Builder; -import lombok.Getter; - -@Getter -public class AuthenticationHttpClientConfig { - private int readTimeout = 30000; - private int connectTimeout = 10000; - private String trustCertsFilePath; - - @Builder(builderClassName = "ConfigBuilder") - public AuthenticationHttpClientConfig(int readTimeout, int connectTimeout, String trustCertsFilePath) { - this.readTimeout = readTimeout > 0 ? readTimeout : this.readTimeout; - this.connectTimeout = connectTimeout > 0 ? connectTimeout : this.connectTimeout; - this.trustCertsFilePath = trustCertsFilePath; - } - - public AuthenticationHttpClientConfig() { - } -} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AuthenticationInitContextImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AuthenticationInitContextImplTest.java new file mode 100644 index 0000000000000..8b1f2289d4987 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AuthenticationInitContextImplTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotSame; +import static org.testng.Assert.assertTrue; +import io.netty.channel.EventLoopGroup; +import io.netty.resolver.NameResolver; +import io.netty.util.Timer; +import java.net.InetAddress; +import java.util.Optional; +import lombok.extern.slf4j.Slf4j; +import org.mockito.Mockito; +import org.testng.annotations.Test; + +@Slf4j +public class AuthenticationInitContextImplTest { + @Test + public void testConstructorWithSharedResources() { + EventLoopGroup eventLoopGroup = Mockito.mock(EventLoopGroup.class); + Timer timer = Mockito.mock(Timer.class); + NameResolver nameResolver = Mockito.mock(NameResolver.class); + + AuthenticationInitContextImpl context = new AuthenticationInitContextImpl( + eventLoopGroup, timer, nameResolver); + + // Then + Optional retrievedEventLoop = context.getService(EventLoopGroup.class); + Optional retrievedTimer = context.getService(Timer.class); + Optional retrievedNameResolver = context.getService(NameResolver.class); + + assertTrue(retrievedEventLoop.isPresent()); + assertTrue(retrievedTimer.isPresent()); + assertTrue(retrievedNameResolver.isPresent()); + assertEquals(retrievedEventLoop.get(), eventLoopGroup); + assertEquals(retrievedTimer.get(), timer); + assertEquals(retrievedNameResolver.get(), nameResolver); + } + + @Test + public void testConstructorWithNullResources() { + AuthenticationInitContextImpl context = new AuthenticationInitContextImpl( + null, null, null); + + Optional eventLoop = context.getService(EventLoopGroup.class); + Optional timer = context.getService(Timer.class); + Optional nameResolver = context.getService(NameResolver.class); + + assertFalse(eventLoop.isPresent()); + assertFalse(timer.isPresent()); + assertFalse(nameResolver.isPresent()); + } + + @Test + public void testAddAndGetService() { + AuthenticationInitContextImpl context = new AuthenticationInitContextImpl(null, null, null); + String testService = "Test Service"; + + context.addService(String.class, testService); + + Optional retrieved = context.getService(String.class); + assertTrue(retrieved.isPresent()); + assertEquals(retrieved.get(), testService); + } + + @Test + public void testAddAndGetNamedService() { + AuthenticationInitContextImpl context = new AuthenticationInitContextImpl(null, null, null); + String namedService1 = "Named Service 1"; + String namedService2 = "Named Service 2"; + + context.addService(String.class, "name1", namedService1); + context.addService(String.class, "name2", namedService2); + + Optional retrieved1 = context.getServiceByName(String.class, "name1"); + Optional retrieved2 = context.getServiceByName(String.class, "name2"); + Optional notFound = context.getServiceByName(String.class, "nonexistent"); + + assertTrue(retrieved1.isPresent()); + assertEquals(retrieved1.get(), namedService1); + assertTrue(retrieved2.isPresent()); + assertEquals(retrieved2.get(), namedService2); + assertFalse(notFound.isPresent()); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testGetServiceWithNullClass() { + AuthenticationInitContextImpl context = new AuthenticationInitContextImpl(null, null, null); + + context.getService(null); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testGetServiceByNameWithNullName() { + AuthenticationInitContextImpl context = new AuthenticationInitContextImpl(null, null, null); + + context.getServiceByName(String.class, null); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testGetServiceByNameWithEmptyName() { + AuthenticationInitContextImpl context = new AuthenticationInitContextImpl(null, null, null); + + context.getServiceByName(String.class, ""); + } + + @Test + public void testNamedServicesAreIndependent() { + AuthenticationInitContextImpl context = new AuthenticationInitContextImpl(null, null, null); + Object service1 = new Object(); + Object service2 = new Object(); + + context.addService(Object.class, "group1", service1); + context.addService(Object.class, "group2", service2); + + Optional fromGroup1 = context.getServiceByName(Object.class, "group1"); + Optional fromGroup2 = context.getServiceByName(Object.class, "group2"); + + assertTrue(fromGroup1.isPresent()); + assertTrue(fromGroup2.isPresent()); + assertNotSame(fromGroup1.get(), fromGroup2.get()); + } +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/http/AuthenticationHttpClientFactoryTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/http/AuthenticationHttpClientFactoryTest.java new file mode 100644 index 0000000000000..1da961dddbfbf --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/http/AuthenticationHttpClientFactoryTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.auth.http; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import io.netty.resolver.NameResolver; +import io.netty.util.Timer; +import java.io.IOException; +import java.net.InetAddress; +import java.util.Optional; +import org.apache.pulsar.client.api.AuthenticationInitContext; +import org.apache.pulsar.client.impl.auth.httpclient.AuthenticationHttpClientConfig; +import org.apache.pulsar.client.impl.auth.httpclient.AuthenticationHttpClientFactory; +import org.asynchttpclient.AsyncHttpClient; +import org.mockito.Mockito; +import org.testng.annotations.Test; + + +public class AuthenticationHttpClientFactoryTest { + + @Test + public void testCreateHttpClientWithDefaultConfig() throws IOException { + AuthenticationHttpClientConfig config = AuthenticationHttpClientConfig.builder().build(); + AuthenticationHttpClientFactory factory = new AuthenticationHttpClientFactory(config, null); + + AsyncHttpClient httpClient = factory.createHttpClient(); + + assertNotNull(httpClient); + httpClient.close(); + } + + @Test + public void testGetNameResolverWithContext() { + AuthenticationHttpClientConfig config = AuthenticationHttpClientConfig.builder().build(); + AuthenticationInitContext context = Mockito.mock(AuthenticationInitContext.class); + NameResolver mockResolver = Mockito.mock(NameResolver.class); + + Mockito.when(context.getService(NameResolver.class)) + .thenReturn(Optional.of(mockResolver)); + + AuthenticationHttpClientFactory factory = new AuthenticationHttpClientFactory(config, context); + + NameResolver resolver = factory.getNameResolver(); + + assertNotNull(resolver); + assertEquals(resolver, mockResolver); + } + + @Test + public void testGetNameResolverWithoutContext() { + AuthenticationHttpClientConfig config = AuthenticationHttpClientConfig.builder().build(); + AuthenticationHttpClientFactory factory = new AuthenticationHttpClientFactory(config, null); + + NameResolver resolver = factory.getNameResolver(); + + assertNull(resolver); + } + + @Test + public void testGetNameResolverWithContextButNoResolver() { + AuthenticationHttpClientConfig config = AuthenticationHttpClientConfig.builder().build(); + AuthenticationInitContext context = Mockito.mock(AuthenticationInitContext.class); + + Mockito.when(context.getService(NameResolver.class)) + .thenReturn(Optional.empty()); + + AuthenticationHttpClientFactory factory = new AuthenticationHttpClientFactory(config, context); + + NameResolver resolver = factory.getNameResolver(); + + assertNull(resolver); + } + + @Test + public void testHttpClientUsesSharedResources() throws Exception { + AuthenticationHttpClientConfig config = AuthenticationHttpClientConfig.builder().build(); + AuthenticationInitContext context = Mockito.mock(AuthenticationInitContext.class); + Timer mockTimer = Mockito.mock(Timer.class); + + Mockito.when(context.getService(Timer.class)) + .thenReturn(Optional.of(mockTimer)); + + AuthenticationHttpClientFactory factory = new AuthenticationHttpClientFactory(config, context); + + AsyncHttpClient httpClient = factory.createHttpClient(); + assertNotNull(httpClient); + assertEquals(httpClient.getConfig().getNettyTimer(), mockTimer); + httpClient.close(); + mockTimer.stop(); + } + +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java index aef69be74e120..0d0d7057e9136 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java @@ -23,6 +23,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; import com.fasterxml.jackson.databind.ObjectMapper; import java.net.MalformedURLException; @@ -57,6 +58,8 @@ public void before() { this.auth = new AuthenticationOAuth2(flow, this.clock); } + + @Test public void testGetAuthMethodName() { assertEquals(this.auth.getAuthMethodName(), "token"); From 278609d1d3be19061c2661524cdb084e9686a3bf Mon Sep 17 00:00:00 2001 From: 3pacccccc Date: Sun, 14 Dec 2025 02:20:02 +0800 Subject: [PATCH 08/10] update --- .../pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java index 0d0d7057e9136..db0d1c71a8b61 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java @@ -23,7 +23,6 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; import com.fasterxml.jackson.databind.ObjectMapper; import java.net.MalformedURLException; From fd1e8da51831ea9143cb315337d7e82860ae4870 Mon Sep 17 00:00:00 2001 From: 3pacccccc Date: Sun, 14 Dec 2025 15:57:26 +0800 Subject: [PATCH 09/10] add tests --- .../auth/oauth2/AuthenticationOAuth2Test.java | 40 ++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java index db0d1c71a8b61..8ad842566aa24 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java @@ -34,6 +34,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.pulsar.client.api.AuthenticationDataProvider; +import org.apache.pulsar.client.api.AuthenticationInitContext; import org.apache.pulsar.client.impl.auth.oauth2.protocol.DefaultMetadataResolver; import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult; import org.testng.annotations.BeforeMethod; @@ -58,7 +59,6 @@ public void before() { } - @Test public void testGetAuthMethodName() { assertEquals(this.auth.getAuthMethodName(), "token"); @@ -103,6 +103,7 @@ public void testConfigureWithoutOptionalParams() throws Exception { String authParams = mapper.writeValueAsString(params); this.auth.configure(authParams); assertNotNull(this.auth.flow); + this.auth.flow.initialize(); } @Test @@ -143,4 +144,41 @@ public void testClose() throws Exception { this.auth.close(); verify(this.flow).close(); } + + @Test + public void testStartWithAuthenticationInitContext() throws Exception { + // Given: Mock AuthenticationInitContext + AuthenticationInitContext initContext = mock(AuthenticationInitContext.class); + + // Configure with required parameters + Map params = new HashMap<>(); + params.put("type", "client_credentials"); + params.put("privateKey", "data:base64,e30="); + params.put("issuerUrl", "http://localhost"); + ObjectMapper mapper = new ObjectMapper(); + String authParams = mapper.writeValueAsString(params); + auth.configure(authParams); + + // When: Call start with context + auth.start(initContext); + + // Then: Verify flow is initialized + assertNotNull(auth.flow); + } + + @Test + public void testStartWithContextOnlyInitializesOnce() throws Exception { + // Given: Mock AuthenticationInitContext + AuthenticationInitContext initContext = mock(AuthenticationInitContext.class); + + // Create a new auth instance with pre-existing flow + Flow existingFlow = mock(Flow.class); + AuthenticationOAuth2 auth = new AuthenticationOAuth2(existingFlow, this.clock); + + // When: Call start with context (flow already exists) + auth.start(initContext); + + // Then: Initialize should still be called + verify(existingFlow).initialize(); + } } From 21366dd58cdd6050706319569cbc62e0cc51e606 Mon Sep 17 00:00:00 2001 From: 3pacccccc Date: Sun, 14 Dec 2025 16:47:00 +0800 Subject: [PATCH 10/10] update --- .../apache/pulsar/client/api/AuthenticationInitContext.java | 2 +- .../pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java | 3 +-- .../pulsar/client/impl/AuthenticationInitContextImplTest.java | 1 - .../client/impl/auth/oauth2/AuthenticationOAuth2Test.java | 1 - 4 files changed, 2 insertions(+), 5 deletions(-) diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationInitContext.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationInitContext.java index 83ab1aa24445c..edd8dd95b4bd1 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationInitContext.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationInitContext.java @@ -25,7 +25,7 @@ * Authentication initialization context that provides access to shared services and resources * during the authentication provider initialization phase. * - *

This context enables authentication implementations to utilize shared resources such as + *

This context enables authentication implementations to use shared resources such as * thread pools, DNS resolvers, and timers that are managed by the Pulsar client, rather than * creating their own instances. This improves resource utilization and performance, especially * when multiple client instances or authentication providers are used within the same application. diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java index 98e563b377ba6..528d14b3c3860 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java @@ -44,8 +44,7 @@ * Pulsar client authentication provider based on OAuth 2.0. */ @Slf4j -public class AuthenticationOAuth2 implements Authentication, - EncodedAuthenticationParameterSupport { +public class AuthenticationOAuth2 implements Authentication, EncodedAuthenticationParameterSupport { public static final String CONFIG_PARAM_TYPE = "type"; public static final String TYPE_CLIENT_CREDENTIALS = "client_credentials"; diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AuthenticationInitContextImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AuthenticationInitContextImplTest.java index 8b1f2289d4987..64acf78a99bfa 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AuthenticationInitContextImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AuthenticationInitContextImplTest.java @@ -42,7 +42,6 @@ public void testConstructorWithSharedResources() { AuthenticationInitContextImpl context = new AuthenticationInitContextImpl( eventLoopGroup, timer, nameResolver); - // Then Optional retrievedEventLoop = context.getService(EventLoopGroup.class); Optional retrievedTimer = context.getService(Timer.class); Optional retrievedNameResolver = context.getService(NameResolver.class); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java index 8ad842566aa24..026d40766b4cb 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java @@ -103,7 +103,6 @@ public void testConfigureWithoutOptionalParams() throws Exception { String authParams = mapper.writeValueAsString(params); this.auth.configure(authParams); assertNotNull(this.auth.flow); - this.auth.flow.initialize(); } @Test