diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java index 48d9e3e230701..b60e7e7761fc5 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java @@ -82,6 +82,13 @@ default AuthenticationDataProvider getAuthData(String brokerHostName) throws Pul */ void start() throws PulsarClientException; + /** + * Initialize the authentication provider with {@link AuthenticationInitContext}. + */ + default void start(AuthenticationInitContext context) throws PulsarClientException { + start(); + } + /** * An authentication Stage. * when authentication complete, passed-in authFuture will contains authentication related http request headers. diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationInitContext.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationInitContext.java new file mode 100644 index 0000000000000..edd8dd95b4bd1 --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationInitContext.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + + +import java.util.Optional; + +/** + * Authentication initialization context that provides access to shared services and resources + * during the authentication provider initialization phase. + * + *

This context enables authentication implementations to use shared resources such as + * thread pools, DNS resolvers, and timers that are managed by the Pulsar client, rather than + * creating their own instances. This improves resource utilization and performance, especially + * when multiple client instances or authentication providers are used within the same application. + * + *

Authentication providers should prefer using shared resources when available to minimize + * system resource consumption and improve performance through better resource reuse. + * + * @see Authentication + */ +public interface AuthenticationInitContext { + /** + * Retrieves a shared service instance by its class type. + * + *

This method looks up a globally registered service which is shared among + * all authentication providers. + * + *

Example: + *

{@code
+     * Optional eventLoop = context.getService(EventLoopGroup.class);
+     * if (eventLoop.isPresent()) {
+     *     // Use the shared event loop group
+     *     this.eventLoopGroup = eventLoop.get();
+     * } else {
+     *     // Fallback to creating a new instance
+     *     this.eventLoopGroup = new NioEventLoopGroup();
+     * }
+     * }
+ * + * @param The type of service to retrieve + * @param serviceClass The class of the service to retrieve + * @return An {@link Optional} containing the service instance if available, + * or {@link Optional#empty()} if no such service is registered + */ + Optional getService(Class serviceClass); + + /** + * Retrieves a named shared service instance by its class type and name. + * + *

This method allows lookup of services that are registered under a specific + * name, enabling multiple instances of the same service type to be distinguished. + * This is useful for: + *

    + *
  • Specialized DNS resolvers (e.g., "secure-dns", "internal-dns") + *
  • Different thread pools for various purposes + *
  • Multiple timer instances with different configurations + *
  • HTTP clients with different proxy configurations + *
+ * + *

Example: + *

{@code
+     * // Get a DNS resolver configured for internal network resolution
+     * Optional internalResolver =
+     *     context.getServiceByName(NameResolver.class, "internal-network");
+     *
+     * // Get a different DNS resolver for external resolution
+     * Optional externalResolver =
+     *     context.getServiceByName(NameResolver.class, "external-network");
+     * }
+ * + * @param The type of service to retrieve + * @param serviceClass The class of the service to retrieve. Cannot be null. + * @param name The name under which the service is registered. Cannot be null or empty. + * @return An {@link Optional} containing the named service instance if available, + * or {@link Optional#empty()} if no such service is registered with the given name + */ + Optional getServiceByName(Class serviceClass, String name); +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AuthenticationInitContextImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AuthenticationInitContextImpl.java new file mode 100644 index 0000000000000..3f6d782f0035f --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AuthenticationInitContextImpl.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import io.netty.channel.EventLoopGroup; +import io.netty.resolver.NameResolver; +import io.netty.util.Timer; +import java.net.InetAddress; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import org.apache.pulsar.client.api.AuthenticationInitContext; + +public class AuthenticationInitContextImpl implements AuthenticationInitContext { + private final Map, Object> sharedServices = new HashMap<>(); + private final Map, Object>> namedServices = new HashMap<>(); + + public AuthenticationInitContextImpl(EventLoopGroup eventLoopGroup, + Timer timer, + NameResolver nameResolver) { + this.sharedServices.put(EventLoopGroup.class, eventLoopGroup); + this.sharedServices.put(Timer.class, timer); + this.sharedServices.put(NameResolver.class, nameResolver); + } + + @Override + @SuppressWarnings("unchecked") + public Optional getService(Class serviceClass) { + if (serviceClass == null) { + throw new IllegalArgumentException("Service class cannot be null"); + } + return Optional.ofNullable((T) sharedServices.get(serviceClass)); + } + + @Override + @SuppressWarnings("unchecked") + public Optional getServiceByName(Class serviceClass, String name) { + if (name == null || name.isEmpty()) { + throw new IllegalArgumentException("Service name cannot be null or empty"); + } + Map, Object> services = namedServices.get(name); + if (services != null) { + return Optional.ofNullable((T) services.get(serviceClass)); + } + return Optional.empty(); + } + + public void addService(Class serviceClass, T instance) { + sharedServices.put(serviceClass, instance); + } + + public void addService(Class serviceClass, String name, T instance) { + namedServices.computeIfAbsent(name, k -> new HashMap<>()) + .put(serviceClass, instance); + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 3a2ff97f51ecc..a9e710a9b8c51 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -228,7 +228,6 @@ public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGr this.eventLoopGroup = eventLoopGroupReference; this.instrumentProvider = new InstrumentProvider(conf.getOpenTelemetry()); clientClock = conf.getClock(); - conf.getAuthentication().start(); this.scheduledExecutorProvider = scheduledExecutorProvider != null ? scheduledExecutorProvider : PulsarClientResourcesConfigurer.createScheduledExecutorProvider(conf); if (connectionPool != null) { @@ -270,6 +269,7 @@ public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGr } else { this.timer = timer; } + conf.getAuthentication().start(buildAuthenticationInitContext()); lookup = createLookup(conf.getServiceUrl()); if (conf.getServiceUrlProvider() != null) { @@ -306,6 +306,13 @@ public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGr throw t; } } + private AuthenticationInitContextImpl buildAuthenticationInitContext() { + return new AuthenticationInitContextImpl( + createdEventLoopGroup ? null : eventLoopGroup, + needStopTimer ? null : timer, + addressResolver == null ? null : DnsResolverUtil.adaptToNameResolver(addressResolver) + ); + } private void reduceConsumerReceiverQueueSize() { for (ConsumerBase consumer : consumers) { @@ -1086,7 +1093,7 @@ public void updateAuthentication(Authentication authentication) throws IOExcepti conf.getAuthentication().close(); } conf.setAuthentication(authentication); - conf.getAuthentication().start(); + conf.getAuthentication().start(buildAuthenticationInitContext()); } public void updateTlsTrustCertsFilePath(String tlsTrustCertsFilePath) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/httpclient/AuthenticationHttpClientConfig.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/httpclient/AuthenticationHttpClientConfig.java new file mode 100644 index 0000000000000..4e7981fc29348 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/httpclient/AuthenticationHttpClientConfig.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.auth.httpclient; + + +import lombok.Builder; +import lombok.Getter; + +/** + * Configuration for HTTP clients used in authentication providers. + * + *

This class encapsulates the configuration parameters needed to create HTTP clients + * for authentication-related HTTP requests, such as OAuth2 token exchange requests. + * + *

Configuration Parameters

+ *
    + *
  • readTimeout: Maximum time to wait for a response from the server + * (default: 30000 ms) + *
  • connectTimeout: Maximum time to establish a connection + * (default: 10000 ms) + *
  • trustCertsFilePath: Path to a custom trust certificate file + * for TLS/SSL connections + *
+ * + *

Usage

+ *
{@code
+ * // Using the builder pattern
+ * AuthenticationHttpClientConfig config = AuthenticationHttpClientConfig.builder()
+ *     .readTimeout(30000)
+ *     .connectTimeout(10000)
+ *     .trustCertsFilePath("/path/to/certs.pem")
+ *     .build();
+ *
+ * // Using the factory
+ * AuthenticationHttpClientFactory factory = new AuthenticationHttpClientFactory(config, context);
+ * AsyncHttpClient httpClient = factory.createHttpClient();
+ * }
+ * + *

This configuration is primarily used by {@link AuthenticationHttpClientFactory} to + * create properly configured HTTP clients for authentication providers. + * + * @see AuthenticationHttpClientFactory + * @see org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2 + */ +@Getter +public class AuthenticationHttpClientConfig { + private int readTimeout = 30000; + private int connectTimeout = 10000; + private final String trustCertsFilePath; + + @Builder(builderClassName = "ConfigBuilder") + public AuthenticationHttpClientConfig(int readTimeout, int connectTimeout, String trustCertsFilePath) { + this.readTimeout = readTimeout > 0 ? readTimeout : this.readTimeout; + this.connectTimeout = connectTimeout > 0 ? connectTimeout : this.connectTimeout; + this.trustCertsFilePath = trustCertsFilePath; + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/httpclient/AuthenticationHttpClientFactory.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/httpclient/AuthenticationHttpClientFactory.java new file mode 100644 index 0000000000000..b3d02c9e6ef6c --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/httpclient/AuthenticationHttpClientFactory.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.auth.httpclient; + +import io.netty.channel.EventLoopGroup; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.resolver.NameResolver; +import io.netty.util.Timer; +import java.io.File; +import java.net.InetAddress; +import java.util.Optional; +import javax.net.ssl.SSLException; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.PulsarVersion; +import org.apache.pulsar.client.api.AuthenticationInitContext; +import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.DefaultAsyncHttpClient; +import org.asynchttpclient.DefaultAsyncHttpClientConfig; + +/** + * Factory for creating HTTP clients used by authentication providers. + * + *

This factory creates {@link AsyncHttpClient} instances that are optimized for + * authentication-related HTTP requests. It supports: + *

    + *
  • Reusing shared resources from {@link AuthenticationInitContext} + *
  • Configurable timeouts for connections and reads + *
  • Custom SSL/TLS trust certificates + *
  • DNS resolver configuration + *
+ * + *

Resource Sharing

+ *

When a {@link AuthenticationInitContext} is provided, the factory will attempt to + * reuse shared resources: + *

    + *
  • {@link EventLoopGroup}: For I/O operations + *
  • {@link Timer}: For scheduling timeouts + *
  • {@link NameResolver}: For DNS resolution + *
+ * + *

Usage Example

+ *
{@code
+ * AuthenticationHttpClientConfig config = AuthenticationHttpClientConfig.builder()
+ *     .readTimeout(30000)
+ *     .connectTimeout(10000)
+ *     .build();
+ *
+ * AuthenticationHttpClientFactory factory =
+ *     new AuthenticationHttpClientFactory(config, authenticationContext);
+ *
+ * AsyncHttpClient httpClient = factory.createHttpClient();
+ * NameResolver nameResolver = factory.getNameResolver();
+ * }
+ * + * @see AuthenticationHttpClientConfig + * @see AuthenticationInitContext + */ +@Slf4j +public class AuthenticationHttpClientFactory { + + private final AuthenticationHttpClientConfig config; + private final AuthenticationInitContext context; + + public AuthenticationHttpClientFactory(AuthenticationHttpClientConfig config, + AuthenticationInitContext context) { + this.config = config; + this.context = context; + } + + public AsyncHttpClient createHttpClient() { + DefaultAsyncHttpClientConfig.Builder confBuilder = buildBaseConfig(); + return new DefaultAsyncHttpClient(confBuilder.build()); + } + + @SuppressWarnings("unchecked") + public NameResolver getNameResolver() { + return Optional.ofNullable(context) + .flatMap(ctx -> ctx.getService(NameResolver.class)) + .orElse(null); + } + + private DefaultAsyncHttpClientConfig.Builder buildBaseConfig() { + DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder(); + + if (context != null) { + context.getService(Timer.class).ifPresent(confBuilder::setNettyTimer); + context.getService(EventLoopGroup.class).ifPresent(confBuilder::setEventLoopGroup); + } + + confBuilder.setCookieStore(null); + confBuilder.setUseProxyProperties(true); + confBuilder.setFollowRedirect(true); + confBuilder.setConnectTimeout(config.getConnectTimeout()); + confBuilder.setReadTimeout(config.getReadTimeout()); + confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", PulsarVersion.getVersion())); + + if (StringUtils.isNotBlank(config.getTrustCertsFilePath())) { + try { + confBuilder.setSslContext(SslContextBuilder.forClient() + .trustManager(new File(config.getTrustCertsFilePath())) + .build()); + } catch (SSLException e) { + log.error("Could not set trustCertsFilePath", e); + } + } + + return confBuilder; + } +} \ No newline at end of file diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/httpclient/package-info.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/httpclient/package-info.java new file mode 100644 index 0000000000000..d5a1142bc5e82 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/httpclient/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Pulsar Authentication http client. + */ +package org.apache.pulsar.client.impl.auth.httpclient; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java index 033d5308a2a96..b779c4a7cfce8 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java @@ -18,10 +18,15 @@ */ package org.apache.pulsar.client.impl.auth.oauth2; +import io.netty.resolver.NameResolver; +import java.net.InetAddress; import java.net.URL; import java.time.Clock; import java.time.Duration; import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.impl.auth.httpclient.AuthenticationHttpClientConfig; +import org.apache.pulsar.client.impl.auth.httpclient.AuthenticationHttpClientFactory; +import org.asynchttpclient.AsyncHttpClient; /** * Factory class that allows to create {@link Authentication} instances @@ -78,6 +83,8 @@ public static class ClientCredentialsBuilder { private Duration connectTimeout; private Duration readTimeout; private String trustCertsFilePath; + private AsyncHttpClient httpClient; + private NameResolver nameResolver; private ClientCredentialsBuilder() { } @@ -163,25 +170,76 @@ public ClientCredentialsBuilder trustCertsFilePath(String trustCertsFilePath) { return this; } + /** + * Optional custom HTTP client. + * + * @param httpClient the HTTP client + * @return the builder + */ + public ClientCredentialsBuilder httpClient(AsyncHttpClient httpClient) { + this.httpClient = httpClient; + return this; + } + + /** + * Optional custom name resolver. + * + * @param nameResolver the name resolver + * @return the builder + */ + public ClientCredentialsBuilder nameResolver(NameResolver nameResolver) { + this.nameResolver = nameResolver; + return this; + } + /** * Authenticate with client credentials. * * @return an Authentication object */ public Authentication build() { + AsyncHttpClient finalHttpClient = this.httpClient; + NameResolver finalNameResolver = this.nameResolver; + + if (finalHttpClient == null || finalNameResolver == null) { + AuthenticationHttpClientConfig.ConfigBuilder configBuilder = + AuthenticationHttpClientConfig.builder(); + + if (connectTimeout != null) { + configBuilder.connectTimeout((int) connectTimeout.toMillis()); + } + + if (readTimeout != null) { + configBuilder.readTimeout((int) readTimeout.toMillis()); + } + + if (trustCertsFilePath != null) { + configBuilder.trustCertsFilePath(trustCertsFilePath); + } + + AuthenticationHttpClientFactory clientFactory = new AuthenticationHttpClientFactory( + configBuilder.build(), + null + ); + + if (finalHttpClient == null) { + finalHttpClient = clientFactory.createHttpClient(); + } + + if (finalNameResolver == null) { + finalNameResolver = clientFactory.getNameResolver(); + } + } ClientCredentialsFlow flow = ClientCredentialsFlow.builder() .issuerUrl(issuerUrl) .privateKey(credentialsUrl == null ? null : credentialsUrl.toExternalForm()) .audience(audience) .scope(scope) - .connectTimeout(connectTimeout) - .readTimeout(readTimeout) - .trustCertsFilePath(trustCertsFilePath) + .httpClient(finalHttpClient) + .nameResolver(finalNameResolver) .build(); return new AuthenticationOAuth2(flow, Clock.systemDefaultZone()); } } - - -} +} \ No newline at end of file diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java index 694e4681d97f2..528d14b3c3860 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java @@ -19,8 +19,12 @@ package org.apache.pulsar.client.impl.auth.oauth2; import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; import java.time.Clock; +import java.time.Duration; import java.time.Instant; +import java.time.format.DateTimeParseException; import java.util.Map; import lombok.Data; import lombok.extern.slf4j.Slf4j; @@ -28,9 +32,12 @@ import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationDataProvider; +import org.apache.pulsar.client.api.AuthenticationInitContext; import org.apache.pulsar.client.api.EncodedAuthenticationParameterSupport; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.AuthenticationUtil; +import org.apache.pulsar.client.impl.auth.httpclient.AuthenticationHttpClientConfig; +import org.apache.pulsar.client.impl.auth.httpclient.AuthenticationHttpClientFactory; import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult; /** @@ -42,12 +49,23 @@ public class AuthenticationOAuth2 implements Authentication, EncodedAuthenticati public static final String CONFIG_PARAM_TYPE = "type"; public static final String TYPE_CLIENT_CREDENTIALS = "client_credentials"; public static final String AUTH_METHOD_NAME = "token"; + public static final String CONFIG_PARAM_CONNECT_TIMEOUT = "connectTimeout"; + public static final String CONFIG_PARAM_READ_TIMEOUT = "readTimeout"; + public static final String CONFIG_PARAM_TRUST_CERTS_FILE_PATH = "trustCertsFilePath"; + public static final String CONFIG_PARAM_ISSUER_URL = "issuerUrl"; + public static final String CONFIG_PARAM_KEY_FILE = "privateKey"; + + protected static final Duration DEFAULT_CONNECT_TIMEOUT = Duration.ofSeconds(10); + protected static final Duration DEFAULT_READ_TIMEOUT = Duration.ofSeconds(30); public static final double EXPIRY_ADJUSTMENT = 0.9; private static final long serialVersionUID = 1L; final Clock clock; Flow flow; transient CachedToken cachedToken; + private Map params; + private URL issuerUrl; + private String privateKeyUrl; public AuthenticationOAuth2() { this.clock = Clock.systemDefaultZone(); @@ -68,21 +86,83 @@ public void configure(String encodedAuthParamString) { if (StringUtils.isBlank(encodedAuthParamString)) { throw new IllegalArgumentException("No authentication parameters were provided"); } - Map params; try { - params = AuthenticationUtil.configureFromJsonString(encodedAuthParamString); + this.params = AuthenticationUtil.configureFromJsonString(encodedAuthParamString); } catch (IOException e) { throw new IllegalArgumentException("Malformed authentication parameters", e); } + String type = this.params.getOrDefault(CONFIG_PARAM_TYPE, TYPE_CLIENT_CREDENTIALS); + if (!type.equals(TYPE_CLIENT_CREDENTIALS)) { + throw new IllegalArgumentException("Unsupported authentication type: " + type); + } + this.issuerUrl = parseParameterUrl(params, CONFIG_PARAM_ISSUER_URL); + this.privateKeyUrl = parseParameterString(params, CONFIG_PARAM_KEY_FILE); + } + + private void initializeFlow(AuthenticationInitContext context) { + AuthenticationHttpClientConfig config = buildHttpConfig(params); + AuthenticationHttpClientFactory httpClientFactory = + new AuthenticationHttpClientFactory(config, context); + this.flow = ClientCredentialsFlow.fromParameters( + params, httpClientFactory.getNameResolver(), + httpClientFactory.createHttpClient(), issuerUrl, privateKeyUrl); + } + + static String parseParameterString(Map params, String name) { + String s = params.get(name); + if (StringUtils.isEmpty(s)) { + throw new IllegalArgumentException("Required configuration parameter: " + name); + } + return s; + } + + + static URL parseParameterUrl(Map params, String name) { + String s = params.get(name); + if (StringUtils.isEmpty(s)) { + throw new IllegalArgumentException("Required configuration parameter: " + name); + } + try { + return new URL(s); + } catch (MalformedURLException e) { + throw new IllegalArgumentException("Malformed configuration parameter: " + name); + } + } - String type = params.getOrDefault(CONFIG_PARAM_TYPE, TYPE_CLIENT_CREDENTIALS); - switch(type) { - case TYPE_CLIENT_CREDENTIALS: - this.flow = ClientCredentialsFlow.fromParameters(params); - break; - default: - throw new IllegalArgumentException("Unsupported authentication type: " + type); + + private static int getParameterDurationToMillis(String name, Duration value, Duration defaultValue) { + Duration duration; + if (value == null) { + log.info("Configuration for [{}] is using the default value: [{}]", name, defaultValue); + duration = defaultValue; + } else { + log.info("Configuration for [{}] is: [{}]", name, value); + duration = value; } + + return (int) duration.toMillis(); + } + + + static int parseParameterDuration(Map params, String name, Duration defaultValue) { + String value = params.get(name); + if (StringUtils.isNotBlank(value)) { + try { + return getParameterDurationToMillis(name, Duration.parse(value), defaultValue); + } catch (DateTimeParseException e) { + throw new IllegalArgumentException("Malformed configuration parameter: " + name, e); + } + } + return (int) defaultValue.toMillis(); + } + + + private AuthenticationHttpClientConfig buildHttpConfig( + Map params) { + int connectTimeout = parseParameterDuration(params, CONFIG_PARAM_CONNECT_TIMEOUT, DEFAULT_CONNECT_TIMEOUT); + int readTimeout = parseParameterDuration(params, CONFIG_PARAM_READ_TIMEOUT, DEFAULT_READ_TIMEOUT); + String trustCertsFilePath = params.get(CONFIG_PARAM_TRUST_CERTS_FILE_PATH); + return new AuthenticationHttpClientConfig(readTimeout, connectTimeout, trustCertsFilePath); } @Override @@ -93,6 +173,14 @@ public void configure(Map authParams) { @Override public void start() throws PulsarClientException { + start(null); + } + + @Override + public void start(AuthenticationInitContext context) throws PulsarClientException { + if (flow == null) { + initializeFlow(context); + } flow.initialize(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java index 7f64c0b18ac73..df15881b0c3cf 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java @@ -18,15 +18,16 @@ */ package org.apache.pulsar.client.impl.auth.oauth2; +import io.netty.resolver.NameResolver; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.Reader; +import java.net.InetAddress; import java.net.URISyntaxException; import java.net.URL; import java.net.URLConnection; import java.nio.charset.StandardCharsets; -import java.time.Duration; import java.util.Map; import lombok.Builder; import lombok.extern.slf4j.Slf4j; @@ -37,6 +38,7 @@ import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenClient; import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenExchangeException; import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult; +import org.asynchttpclient.AsyncHttpClient; /** * Implementation of OAuth 2.0 Client Credentials flow. @@ -62,8 +64,9 @@ class ClientCredentialsFlow extends FlowBase { @Builder public ClientCredentialsFlow(URL issuerUrl, String audience, String privateKey, String scope, - Duration connectTimeout, Duration readTimeout, String trustCertsFilePath) { - super(issuerUrl, connectTimeout, readTimeout, trustCertsFilePath); + AsyncHttpClient httpClient, NameResolver nameResolver + ) { + super(issuerUrl, httpClient, nameResolver); this.audience = audience; this.privateKey = privateKey; this.scope = scope; @@ -75,24 +78,21 @@ public ClientCredentialsFlow(URL issuerUrl, String audience, String privateKey, * @param params * @return */ - public static ClientCredentialsFlow fromParameters(Map params) { - URL issuerUrl = parseParameterUrl(params, CONFIG_PARAM_ISSUER_URL); - String privateKeyUrl = parseParameterString(params, CONFIG_PARAM_KEY_FILE); + public static ClientCredentialsFlow fromParameters(Map params, + NameResolver nameResolver, + AsyncHttpClient httpClient, + URL issuerUrl, + String privateKeyUrl) { // These are optional parameters, so we only perform a get String scope = params.get(CONFIG_PARAM_SCOPE); String audience = params.get(CONFIG_PARAM_AUDIENCE); - Duration connectTimeout = parseParameterDuration(params, CONFIG_PARAM_CONNECT_TIMEOUT); - Duration readTimeout = parseParameterDuration(params, CONFIG_PARAM_READ_TIMEOUT); - String trustCertsFilePath = params.get(CONFIG_PARAM_TRUST_CERTS_FILE_PATH); - return ClientCredentialsFlow.builder() .issuerUrl(issuerUrl) .audience(audience) .privateKey(privateKeyUrl) + .httpClient(httpClient) + .nameResolver(nameResolver) .scope(scope) - .connectTimeout(connectTimeout) - .readTimeout(readTimeout) - .trustCertsFilePath(trustCertsFilePath) .build(); } @@ -133,7 +133,7 @@ public void initialize() throws PulsarClientException { assert this.metadata != null; URL tokenUrl = this.metadata.getTokenEndpoint(); - this.exchanger = new TokenClient(tokenUrl, httpClient); + this.exchanger = new TokenClient(tokenUrl, httpClient, this.nameResolver); initialized = true; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java index 6cc9f8e41b5e4..49e31f1d63f46 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java @@ -18,25 +18,16 @@ */ package org.apache.pulsar.client.impl.auth.oauth2; -import io.netty.handler.ssl.SslContextBuilder; -import java.io.File; +import io.netty.resolver.NameResolver; import java.io.IOException; -import java.net.MalformedURLException; +import java.net.InetAddress; import java.net.URL; -import java.time.Duration; -import java.time.format.DateTimeParseException; -import java.util.Map; -import javax.net.ssl.SSLException; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; -import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.auth.oauth2.protocol.DefaultMetadataResolver; import org.apache.pulsar.client.impl.auth.oauth2.protocol.Metadata; import org.apache.pulsar.client.impl.auth.oauth2.protocol.MetadataResolver; import org.asynchttpclient.AsyncHttpClient; -import org.asynchttpclient.DefaultAsyncHttpClient; -import org.asynchttpclient.DefaultAsyncHttpClientConfig; /** * An abstract OAuth 2.0 authorization flow. @@ -44,13 +35,6 @@ @Slf4j abstract class FlowBase implements Flow { - public static final String CONFIG_PARAM_CONNECT_TIMEOUT = "connectTimeout"; - public static final String CONFIG_PARAM_READ_TIMEOUT = "readTimeout"; - public static final String CONFIG_PARAM_TRUST_CERTS_FILE_PATH = "trustCertsFilePath"; - - protected static final Duration DEFAULT_CONNECT_TIMEOUT = Duration.ofSeconds(10); - protected static final Duration DEFAULT_READ_TIMEOUT = Duration.ofSeconds(30); - private static final long serialVersionUID = 1L; protected final URL issuerUrl; @@ -58,47 +42,14 @@ abstract class FlowBase implements Flow { protected transient Metadata metadata; - protected FlowBase(URL issuerUrl, Duration connectTimeout, Duration readTimeout, String trustCertsFilePath) { - this.issuerUrl = issuerUrl; - this.httpClient = defaultHttpClient(readTimeout, connectTimeout, trustCertsFilePath); - } + protected NameResolver nameResolver; - private AsyncHttpClient defaultHttpClient(Duration readTimeout, Duration connectTimeout, - String trustCertsFilePath) { - DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder(); - confBuilder.setCookieStore(null); - confBuilder.setUseProxyProperties(true); - confBuilder.setFollowRedirect(true); - confBuilder.setConnectTimeout( - getParameterDurationToMillis(CONFIG_PARAM_CONNECT_TIMEOUT, connectTimeout, - DEFAULT_CONNECT_TIMEOUT)); - confBuilder.setReadTimeout( - getParameterDurationToMillis(CONFIG_PARAM_READ_TIMEOUT, readTimeout, DEFAULT_READ_TIMEOUT)); - confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", PulsarVersion.getVersion())); - if (StringUtils.isNotBlank(trustCertsFilePath)) { - try { - confBuilder.setSslContext(SslContextBuilder.forClient() - .trustManager(new File(trustCertsFilePath)) - .build()); - } catch (SSLException e) { - log.error("Could not set " + CONFIG_PARAM_TRUST_CERTS_FILE_PATH, e); - } - } - return new DefaultAsyncHttpClient(confBuilder.build()); + protected FlowBase(URL issuerUrl, AsyncHttpClient httpClient, NameResolver nameResolver) { + this.issuerUrl = issuerUrl; + this.httpClient = httpClient; + this.nameResolver = nameResolver; } - private int getParameterDurationToMillis(String name, Duration value, Duration defaultValue) { - Duration duration; - if (value == null) { - log.info("Configuration for [{}] is using the default value: [{}]", name, defaultValue); - duration = defaultValue; - } else { - log.info("Configuration for [{}] is: [{}]", name, value); - duration = value; - } - - return (int) duration.toMillis(); - } public void initialize() throws PulsarClientException { try { @@ -110,39 +61,7 @@ public void initialize() throws PulsarClientException { } protected MetadataResolver createMetadataResolver() { - return DefaultMetadataResolver.fromIssuerUrl(issuerUrl, httpClient); - } - - static String parseParameterString(Map params, String name) { - String s = params.get(name); - if (StringUtils.isEmpty(s)) { - throw new IllegalArgumentException("Required configuration parameter: " + name); - } - return s; - } - - static URL parseParameterUrl(Map params, String name) { - String s = params.get(name); - if (StringUtils.isEmpty(s)) { - throw new IllegalArgumentException("Required configuration parameter: " + name); - } - try { - return new URL(s); - } catch (MalformedURLException e) { - throw new IllegalArgumentException("Malformed configuration parameter: " + name); - } - } - - static Duration parseParameterDuration(Map params, String name) { - String value = params.get(name); - if (StringUtils.isNotBlank(value)) { - try { - return Duration.parse(value); - } catch (DateTimeParseException e) { - throw new IllegalArgumentException("Malformed configuration parameter: " + name, e); - } - } - return null; + return DefaultMetadataResolver.fromIssuerUrl(issuerUrl, httpClient, nameResolver); } @Override diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java index 19d0c1acadd15..a96656e037903 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java @@ -21,14 +21,17 @@ import com.fasterxml.jackson.databind.ObjectReader; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpHeaderValues; +import io.netty.resolver.NameResolver; import java.io.IOException; import java.io.InputStream; +import java.net.InetAddress; import java.net.MalformedURLException; import java.net.URI; import java.net.URL; import java.util.concurrent.ExecutionException; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.BoundRequestBuilder; import org.asynchttpclient.Response; /** @@ -39,11 +42,14 @@ public class DefaultMetadataResolver implements MetadataResolver { private final URL metadataUrl; private final ObjectReader objectReader; private final AsyncHttpClient httpClient; + private final NameResolver nameResolver; - public DefaultMetadataResolver(URL metadataUrl, AsyncHttpClient httpClient) { + public DefaultMetadataResolver(URL metadataUrl, AsyncHttpClient httpClient, + NameResolver nameResolver) { this.metadataUrl = metadataUrl; this.objectReader = ObjectMapperFactory.getMapper().reader().forType(Metadata.class); this.httpClient = httpClient; + this.nameResolver = nameResolver; } /** @@ -52,8 +58,10 @@ public DefaultMetadataResolver(URL metadataUrl, AsyncHttpClient httpClient) { * @param issuerUrl The authorization server's issuer identifier * @return a resolver */ - public static DefaultMetadataResolver fromIssuerUrl(URL issuerUrl, AsyncHttpClient httpClient) { - return new DefaultMetadataResolver(getWellKnownMetadataUrl(issuerUrl), httpClient); + public static DefaultMetadataResolver fromIssuerUrl(URL issuerUrl, + AsyncHttpClient httpClient, + NameResolver nameResolver) { + return new DefaultMetadataResolver(getWellKnownMetadataUrl(issuerUrl), httpClient, nameResolver); } /** @@ -81,11 +89,12 @@ public static URL getWellKnownMetadataUrl(URL issuerUrl) { public Metadata resolve() throws IOException { try { - Response response = httpClient.prepareGet(metadataUrl.toString()) - .addHeader(HttpHeaderNames.ACCEPT, HttpHeaderValues.APPLICATION_JSON) - .execute() - .toCompletableFuture() - .get(); + BoundRequestBuilder requestBuilder = httpClient.prepareGet(metadataUrl.toString()) + .addHeader(HttpHeaderNames.ACCEPT, HttpHeaderValues.APPLICATION_JSON); + if (nameResolver != null) { + requestBuilder.setNameResolver(nameResolver); + } + Response response = requestBuilder.execute().toCompletableFuture().get(); Metadata metadata; try (InputStream inputStream = response.getResponseBodyAsStream()) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java index cb4c2a551d01e..be94b9c990ae1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java @@ -18,7 +18,9 @@ */ package org.apache.pulsar.client.impl.auth.oauth2.protocol; +import io.netty.resolver.NameResolver; import java.io.IOException; +import java.net.InetAddress; import java.net.URL; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; @@ -29,6 +31,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.BoundRequestBuilder; import org.asynchttpclient.Response; /** @@ -38,10 +41,12 @@ public class TokenClient implements ClientCredentialsExchanger { private final URL tokenUrl; private final AsyncHttpClient httpClient; + private final NameResolver nameResolver; - public TokenClient(URL tokenUrl, AsyncHttpClient httpClient) { + public TokenClient(URL tokenUrl, AsyncHttpClient httpClient, NameResolver nameResolver) { this.httpClient = httpClient; this.tokenUrl = tokenUrl; + this.nameResolver = nameResolver; } @Override @@ -85,13 +90,14 @@ public TokenResult exchangeClientCredentials(ClientCredentialsExchangeRequest re String body = buildClientCredentialsBody(req); try { - - Response res = httpClient.preparePost(tokenUrl.toString()) + BoundRequestBuilder requestBuilder = httpClient.preparePost(tokenUrl.toString()) .setHeader("Accept", "application/json") .setHeader("Content-Type", "application/x-www-form-urlencoded") - .setBody(body) - .execute() - .get(); + .setBody(body); + if (nameResolver != null) { + requestBuilder.setNameResolver(nameResolver); + } + Response res = requestBuilder.execute().get(); switch (res.getStatusCode()) { case 200: diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AuthenticationInitContextImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AuthenticationInitContextImplTest.java new file mode 100644 index 0000000000000..64acf78a99bfa --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AuthenticationInitContextImplTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotSame; +import static org.testng.Assert.assertTrue; +import io.netty.channel.EventLoopGroup; +import io.netty.resolver.NameResolver; +import io.netty.util.Timer; +import java.net.InetAddress; +import java.util.Optional; +import lombok.extern.slf4j.Slf4j; +import org.mockito.Mockito; +import org.testng.annotations.Test; + +@Slf4j +public class AuthenticationInitContextImplTest { + @Test + public void testConstructorWithSharedResources() { + EventLoopGroup eventLoopGroup = Mockito.mock(EventLoopGroup.class); + Timer timer = Mockito.mock(Timer.class); + NameResolver nameResolver = Mockito.mock(NameResolver.class); + + AuthenticationInitContextImpl context = new AuthenticationInitContextImpl( + eventLoopGroup, timer, nameResolver); + + Optional retrievedEventLoop = context.getService(EventLoopGroup.class); + Optional retrievedTimer = context.getService(Timer.class); + Optional retrievedNameResolver = context.getService(NameResolver.class); + + assertTrue(retrievedEventLoop.isPresent()); + assertTrue(retrievedTimer.isPresent()); + assertTrue(retrievedNameResolver.isPresent()); + assertEquals(retrievedEventLoop.get(), eventLoopGroup); + assertEquals(retrievedTimer.get(), timer); + assertEquals(retrievedNameResolver.get(), nameResolver); + } + + @Test + public void testConstructorWithNullResources() { + AuthenticationInitContextImpl context = new AuthenticationInitContextImpl( + null, null, null); + + Optional eventLoop = context.getService(EventLoopGroup.class); + Optional timer = context.getService(Timer.class); + Optional nameResolver = context.getService(NameResolver.class); + + assertFalse(eventLoop.isPresent()); + assertFalse(timer.isPresent()); + assertFalse(nameResolver.isPresent()); + } + + @Test + public void testAddAndGetService() { + AuthenticationInitContextImpl context = new AuthenticationInitContextImpl(null, null, null); + String testService = "Test Service"; + + context.addService(String.class, testService); + + Optional retrieved = context.getService(String.class); + assertTrue(retrieved.isPresent()); + assertEquals(retrieved.get(), testService); + } + + @Test + public void testAddAndGetNamedService() { + AuthenticationInitContextImpl context = new AuthenticationInitContextImpl(null, null, null); + String namedService1 = "Named Service 1"; + String namedService2 = "Named Service 2"; + + context.addService(String.class, "name1", namedService1); + context.addService(String.class, "name2", namedService2); + + Optional retrieved1 = context.getServiceByName(String.class, "name1"); + Optional retrieved2 = context.getServiceByName(String.class, "name2"); + Optional notFound = context.getServiceByName(String.class, "nonexistent"); + + assertTrue(retrieved1.isPresent()); + assertEquals(retrieved1.get(), namedService1); + assertTrue(retrieved2.isPresent()); + assertEquals(retrieved2.get(), namedService2); + assertFalse(notFound.isPresent()); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testGetServiceWithNullClass() { + AuthenticationInitContextImpl context = new AuthenticationInitContextImpl(null, null, null); + + context.getService(null); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testGetServiceByNameWithNullName() { + AuthenticationInitContextImpl context = new AuthenticationInitContextImpl(null, null, null); + + context.getServiceByName(String.class, null); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testGetServiceByNameWithEmptyName() { + AuthenticationInitContextImpl context = new AuthenticationInitContextImpl(null, null, null); + + context.getServiceByName(String.class, ""); + } + + @Test + public void testNamedServicesAreIndependent() { + AuthenticationInitContextImpl context = new AuthenticationInitContextImpl(null, null, null); + Object service1 = new Object(); + Object service2 = new Object(); + + context.addService(Object.class, "group1", service1); + context.addService(Object.class, "group2", service2); + + Optional fromGroup1 = context.getServiceByName(Object.class, "group1"); + Optional fromGroup2 = context.getServiceByName(Object.class, "group2"); + + assertTrue(fromGroup1.isPresent()); + assertTrue(fromGroup2.isPresent()); + assertNotSame(fromGroup1.get(), fromGroup2.get()); + } +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientInitializationTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientInitializationTest.java index f7ff30c286c76..344c7c209ed34 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientInitializationTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientInitializationTest.java @@ -25,6 +25,7 @@ import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.mockito.Mockito; import org.testng.annotations.Test; public class ClientInitializationTest { @@ -40,7 +41,7 @@ public void testInitializeAuthWithTls() throws PulsarClientException { .authentication(auth) .build(); - verify(auth).start(); + verify(auth).start(Mockito.any()); verify(auth, times(0)).getAuthData(); } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/http/AuthenticationHttpClientFactoryTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/http/AuthenticationHttpClientFactoryTest.java new file mode 100644 index 0000000000000..1da961dddbfbf --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/http/AuthenticationHttpClientFactoryTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.auth.http; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import io.netty.resolver.NameResolver; +import io.netty.util.Timer; +import java.io.IOException; +import java.net.InetAddress; +import java.util.Optional; +import org.apache.pulsar.client.api.AuthenticationInitContext; +import org.apache.pulsar.client.impl.auth.httpclient.AuthenticationHttpClientConfig; +import org.apache.pulsar.client.impl.auth.httpclient.AuthenticationHttpClientFactory; +import org.asynchttpclient.AsyncHttpClient; +import org.mockito.Mockito; +import org.testng.annotations.Test; + + +public class AuthenticationHttpClientFactoryTest { + + @Test + public void testCreateHttpClientWithDefaultConfig() throws IOException { + AuthenticationHttpClientConfig config = AuthenticationHttpClientConfig.builder().build(); + AuthenticationHttpClientFactory factory = new AuthenticationHttpClientFactory(config, null); + + AsyncHttpClient httpClient = factory.createHttpClient(); + + assertNotNull(httpClient); + httpClient.close(); + } + + @Test + public void testGetNameResolverWithContext() { + AuthenticationHttpClientConfig config = AuthenticationHttpClientConfig.builder().build(); + AuthenticationInitContext context = Mockito.mock(AuthenticationInitContext.class); + NameResolver mockResolver = Mockito.mock(NameResolver.class); + + Mockito.when(context.getService(NameResolver.class)) + .thenReturn(Optional.of(mockResolver)); + + AuthenticationHttpClientFactory factory = new AuthenticationHttpClientFactory(config, context); + + NameResolver resolver = factory.getNameResolver(); + + assertNotNull(resolver); + assertEquals(resolver, mockResolver); + } + + @Test + public void testGetNameResolverWithoutContext() { + AuthenticationHttpClientConfig config = AuthenticationHttpClientConfig.builder().build(); + AuthenticationHttpClientFactory factory = new AuthenticationHttpClientFactory(config, null); + + NameResolver resolver = factory.getNameResolver(); + + assertNull(resolver); + } + + @Test + public void testGetNameResolverWithContextButNoResolver() { + AuthenticationHttpClientConfig config = AuthenticationHttpClientConfig.builder().build(); + AuthenticationInitContext context = Mockito.mock(AuthenticationInitContext.class); + + Mockito.when(context.getService(NameResolver.class)) + .thenReturn(Optional.empty()); + + AuthenticationHttpClientFactory factory = new AuthenticationHttpClientFactory(config, context); + + NameResolver resolver = factory.getNameResolver(); + + assertNull(resolver); + } + + @Test + public void testHttpClientUsesSharedResources() throws Exception { + AuthenticationHttpClientConfig config = AuthenticationHttpClientConfig.builder().build(); + AuthenticationInitContext context = Mockito.mock(AuthenticationInitContext.class); + Timer mockTimer = Mockito.mock(Timer.class); + + Mockito.when(context.getService(Timer.class)) + .thenReturn(Optional.of(mockTimer)); + + AuthenticationHttpClientFactory factory = new AuthenticationHttpClientFactory(config, context); + + AsyncHttpClient httpClient = factory.createHttpClient(); + assertNotNull(httpClient); + assertEquals(httpClient.getConfig().getNettyTimer(), mockTimer); + httpClient.close(); + mockTimer.stop(); + } + +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java index aef69be74e120..026d40766b4cb 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java @@ -34,6 +34,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.pulsar.client.api.AuthenticationDataProvider; +import org.apache.pulsar.client.api.AuthenticationInitContext; import org.apache.pulsar.client.impl.auth.oauth2.protocol.DefaultMetadataResolver; import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult; import org.testng.annotations.BeforeMethod; @@ -57,6 +58,7 @@ public void before() { this.auth = new AuthenticationOAuth2(flow, this.clock); } + @Test public void testGetAuthMethodName() { assertEquals(this.auth.getAuthMethodName(), "token"); @@ -141,4 +143,41 @@ public void testClose() throws Exception { this.auth.close(); verify(this.flow).close(); } + + @Test + public void testStartWithAuthenticationInitContext() throws Exception { + // Given: Mock AuthenticationInitContext + AuthenticationInitContext initContext = mock(AuthenticationInitContext.class); + + // Configure with required parameters + Map params = new HashMap<>(); + params.put("type", "client_credentials"); + params.put("privateKey", "data:base64,e30="); + params.put("issuerUrl", "http://localhost"); + ObjectMapper mapper = new ObjectMapper(); + String authParams = mapper.writeValueAsString(params); + auth.configure(authParams); + + // When: Call start with context + auth.start(initContext); + + // Then: Verify flow is initialized + assertNotNull(auth.flow); + } + + @Test + public void testStartWithContextOnlyInitializesOnce() throws Exception { + // Given: Mock AuthenticationInitContext + AuthenticationInitContext initContext = mock(AuthenticationInitContext.class); + + // Create a new auth instance with pre-existing flow + Flow existingFlow = mock(Flow.class); + AuthenticationOAuth2 auth = new AuthenticationOAuth2(existingFlow, this.clock); + + // When: Call start with context (flow already exists) + auth.start(initContext); + + // Then: Initialize should still be called + verify(existingFlow).initialize(); + } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClientTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClientTest.java index 131427682076e..9dd82e77abe90 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClientTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClientTest.java @@ -42,7 +42,7 @@ public void exchangeClientCredentialsSuccessByScopeTest() throws IOException, TokenExchangeException, ExecutionException, InterruptedException { DefaultAsyncHttpClient defaultAsyncHttpClient = mock(DefaultAsyncHttpClient.class); URL url = new URL("http://localhost"); - TokenClient tokenClient = new TokenClient(url, defaultAsyncHttpClient); + TokenClient tokenClient = new TokenClient(url, defaultAsyncHttpClient, null); ClientCredentialsExchangeRequest request = ClientCredentialsExchangeRequest.builder() .audience("test-audience") .clientId("test-client-id") @@ -75,7 +75,7 @@ public void exchangeClientCredentialsSuccessWithoutOptionalClientCredentialsTest IOException, TokenExchangeException, ExecutionException, InterruptedException { DefaultAsyncHttpClient defaultAsyncHttpClient = mock(DefaultAsyncHttpClient.class); URL url = new URL("http://localhost"); - TokenClient tokenClient = new TokenClient(url, defaultAsyncHttpClient); + TokenClient tokenClient = new TokenClient(url, defaultAsyncHttpClient, null); ClientCredentialsExchangeRequest request = ClientCredentialsExchangeRequest.builder() .clientId("test-client-id") .clientSecret("test-client-secret")