Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ default AuthenticationDataProvider getAuthData(String brokerHostName) throws Pul
*/
void start() throws PulsarClientException;

/**
* Initialize the authentication provider with {@link AuthenticationInitContext}.
*/
default void start(AuthenticationInitContext context) throws PulsarClientException {
start();
}

/**
* An authentication Stage.
* when authentication complete, passed-in authFuture will contains authentication related http request headers.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;


import java.util.Optional;

/**
* Authentication initialization context that provides access to shared services and resources
* during the authentication provider initialization phase.
*
* <p>This context enables authentication implementations to use shared resources such as
* thread pools, DNS resolvers, and timers that are managed by the Pulsar client, rather than
* creating their own instances. This improves resource utilization and performance, especially
* when multiple client instances or authentication providers are used within the same application.
*
* <p>Authentication providers should prefer using shared resources when available to minimize
* system resource consumption and improve performance through better resource reuse.
*
* @see Authentication
*/
public interface AuthenticationInitContext {
/**
* Retrieves a shared service instance by its class type.
*
* <p>This method looks up a globally registered service which is shared among
* all authentication providers.
*
* <p><strong>Example:</strong>
* <pre>{@code
* Optional<EventLoopGroup> eventLoop = context.getService(EventLoopGroup.class);
* if (eventLoop.isPresent()) {
* // Use the shared event loop group
* this.eventLoopGroup = eventLoop.get();
* } else {
* // Fallback to creating a new instance
* this.eventLoopGroup = new NioEventLoopGroup();
* }
* }</pre>
*
* @param <T> The type of service to retrieve
* @param serviceClass The class of the service to retrieve
* @return An {@link Optional} containing the service instance if available,
* or {@link Optional#empty()} if no such service is registered
*/
<T> Optional<T> getService(Class<T> serviceClass);

/**
* Retrieves a named shared service instance by its class type and name.
*
* <p>This method allows lookup of services that are registered under a specific
* name, enabling multiple instances of the same service type to be distinguished.
* This is useful for:
* <ul>
* <li>Specialized DNS resolvers (e.g., "secure-dns", "internal-dns")
* <li>Different thread pools for various purposes
* <li>Multiple timer instances with different configurations
* <li>HTTP clients with different proxy configurations
* </ul>
*
* <p><strong>Example:</strong>
* <pre>{@code
* // Get a DNS resolver configured for internal network resolution
* Optional<NameResolver> internalResolver =
* context.getServiceByName(NameResolver.class, "internal-network");
*
* // Get a different DNS resolver for external resolution
* Optional<NameResolver> externalResolver =
* context.getServiceByName(NameResolver.class, "external-network");
* }</pre>
*
* @param <T> The type of service to retrieve
* @param serviceClass The class of the service to retrieve. Cannot be null.
* @param name The name under which the service is registered. Cannot be null or empty.
* @return An {@link Optional} containing the named service instance if available,
* or {@link Optional#empty()} if no such service is registered with the given name
*/
<T> Optional<T> getServiceByName(Class<T> serviceClass, String name);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import io.netty.channel.EventLoopGroup;
import io.netty.resolver.NameResolver;
import io.netty.util.Timer;
import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.pulsar.client.api.AuthenticationInitContext;

public class AuthenticationInitContextImpl implements AuthenticationInitContext {
private final Map<Class<?>, Object> sharedServices = new HashMap<>();
private final Map<String, Map<Class<?>, Object>> namedServices = new HashMap<>();

public AuthenticationInitContextImpl(EventLoopGroup eventLoopGroup,
Timer timer,
NameResolver<InetAddress> nameResolver) {
this.sharedServices.put(EventLoopGroup.class, eventLoopGroup);
this.sharedServices.put(Timer.class, timer);
this.sharedServices.put(NameResolver.class, nameResolver);
}

@Override
@SuppressWarnings("unchecked")
public <T> Optional<T> getService(Class<T> serviceClass) {
if (serviceClass == null) {
throw new IllegalArgumentException("Service class cannot be null");
}
return Optional.ofNullable((T) sharedServices.get(serviceClass));
}

@Override
@SuppressWarnings("unchecked")
public <T> Optional<T> getServiceByName(Class<T> serviceClass, String name) {
if (name == null || name.isEmpty()) {
throw new IllegalArgumentException("Service name cannot be null or empty");
}
Map<Class<?>, Object> services = namedServices.get(name);
if (services != null) {
return Optional.ofNullable((T) services.get(serviceClass));
}
return Optional.empty();
}

public <T> void addService(Class<T> serviceClass, T instance) {
sharedServices.put(serviceClass, instance);
}

public <T> void addService(Class<T> serviceClass, String name, T instance) {
namedServices.computeIfAbsent(name, k -> new HashMap<>())
.put(serviceClass, instance);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,6 @@ public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGr
this.eventLoopGroup = eventLoopGroupReference;
this.instrumentProvider = new InstrumentProvider(conf.getOpenTelemetry());
clientClock = conf.getClock();
conf.getAuthentication().start();
this.scheduledExecutorProvider = scheduledExecutorProvider != null ? scheduledExecutorProvider :
PulsarClientResourcesConfigurer.createScheduledExecutorProvider(conf);
if (connectionPool != null) {
Expand Down Expand Up @@ -270,6 +269,7 @@ public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGr
} else {
this.timer = timer;
}
conf.getAuthentication().start(buildAuthenticationInitContext());
lookup = createLookup(conf.getServiceUrl());

if (conf.getServiceUrlProvider() != null) {
Expand Down Expand Up @@ -306,6 +306,13 @@ public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGr
throw t;
}
}
private AuthenticationInitContextImpl buildAuthenticationInitContext() {
return new AuthenticationInitContextImpl(
createdEventLoopGroup ? null : eventLoopGroup,
needStopTimer ? null : timer,
addressResolver == null ? null : DnsResolverUtil.adaptToNameResolver(addressResolver)
);
}

private void reduceConsumerReceiverQueueSize() {
for (ConsumerBase<?> consumer : consumers) {
Expand Down Expand Up @@ -1086,7 +1093,7 @@ public void updateAuthentication(Authentication authentication) throws IOExcepti
conf.getAuthentication().close();
}
conf.setAuthentication(authentication);
conf.getAuthentication().start();
conf.getAuthentication().start(buildAuthenticationInitContext());
}

public void updateTlsTrustCertsFilePath(String tlsTrustCertsFilePath) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.auth.httpclient;


import lombok.Builder;
import lombok.Getter;

/**
* Configuration for HTTP clients used in authentication providers.
*
* <p>This class encapsulates the configuration parameters needed to create HTTP clients
* for authentication-related HTTP requests, such as OAuth2 token exchange requests.
*
* <h2>Configuration Parameters</h2>
* <ul>
* <li><strong>readTimeout</strong>: Maximum time to wait for a response from the server
* (default: 30000 ms)
* <li><strong>connectTimeout</strong>: Maximum time to establish a connection
* (default: 10000 ms)
* <li><strong>trustCertsFilePath</strong>: Path to a custom trust certificate file
* for TLS/SSL connections
* </ul>
*
* <h2>Usage</h2>
* <pre>{@code
* // Using the builder pattern
* AuthenticationHttpClientConfig config = AuthenticationHttpClientConfig.builder()
* .readTimeout(30000)
* .connectTimeout(10000)
* .trustCertsFilePath("/path/to/certs.pem")
* .build();
*
* // Using the factory
* AuthenticationHttpClientFactory factory = new AuthenticationHttpClientFactory(config, context);
* AsyncHttpClient httpClient = factory.createHttpClient();
* }</pre>
*
* <p>This configuration is primarily used by {@link AuthenticationHttpClientFactory} to
* create properly configured HTTP clients for authentication providers.
*
* @see AuthenticationHttpClientFactory
* @see org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
*/
@Getter
public class AuthenticationHttpClientConfig {
private int readTimeout = 30000;
private int connectTimeout = 10000;
private final String trustCertsFilePath;

@Builder(builderClassName = "ConfigBuilder")
public AuthenticationHttpClientConfig(int readTimeout, int connectTimeout, String trustCertsFilePath) {
this.readTimeout = readTimeout > 0 ? readTimeout : this.readTimeout;
this.connectTimeout = connectTimeout > 0 ? connectTimeout : this.connectTimeout;
this.trustCertsFilePath = trustCertsFilePath;
}
}
Loading