diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java
index e2bf4dcc0156d..285b1a7b912e8 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java
@@ -244,4 +244,17 @@ public void close() throws IOException {
provider.close();
}
}
+
+ /**
+ * Creates a binary authentication session for Pulsar's binary protocol.
+ *
+ * This method initializes a {@link BinaryAuthSession} using the provided authentication context.
+ * Both the proxy and broker can use this method to perform binary protocol authentication.
+ *
+ * @param ctx the binary authentication context containing authentication state and credentials
+ * @return a new {@link BinaryAuthSession} instance
+ */
+ public BinaryAuthSession createBinaryAuthSession(BinaryAuthContext ctx) {
+ return new BinaryAuthSession(ctx);
+ }
}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthContext.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthContext.java
new file mode 100644
index 0000000000000..5739156764441
--- /dev/null
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthContext.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authentication;
+
+import java.net.SocketAddress;
+import java.util.concurrent.Executor;
+import java.util.function.Supplier;
+import javax.net.ssl.SSLSession;
+import lombok.Builder;
+import lombok.Getter;
+import org.apache.pulsar.common.api.proto.CommandConnect;
+
+/**
+ * Context object that encapsulates all information required to perform binary protocol
+ * authentication for a client connection.
+ *
+ * This context is used by {@link BinaryAuthSession} to manage authentication state,
+ * credentials, and related connection details during the authentication process.
+ */
+@Getter
+@Builder
+public class BinaryAuthContext {
+ /**
+ * The {@link CommandConnect} object representing the client's connection request.
+ */
+ private CommandConnect commandConnect;
+
+ /**
+ * The {@link SSLSession} associated with the connection, if TLS/SSL is used.
+ *
+ *
May be {@code null} for non-TLS connections. When present, authenticators
+ * can use session details (peer certificates, cipher suite, etc.) as part of
+ * the authentication decision.
+ */
+ private SSLSession sslSession;
+
+ /**
+ * The {@link AuthenticationService} used to perform authentication operations.
+ *
+ *
This is typically the broker-level service that coordinates available
+ * authentication providers and performs lifecycle operations such as
+ * verifying credentials or initiating challenges.
+ */
+ private AuthenticationService authenticationService;
+
+ /**
+ * The executor used to perform asynchronous authentication operations.
+ *
+ * This should be the Netty event loop executor associated with the current connection,
+ * ensuring that authentication tasks run on the same event loop thread.
+ */
+ private Executor executor;
+
+ /**
+ * The remote {@link SocketAddress} of the client initiating the connection.
+ *
+ *
This may be used for audit, logging, access control decisions, or for
+ * binding authentication state to the client's address.
+ */
+ private SocketAddress remoteAddress;
+
+ /**
+ * Indicates whether to authenticate the client's original authentication data
+ * instead of simply trusting the provided principal.
+ *
+ * When set to {@code true}, the session re-validates the original authentication data
+ * sent by the client. When set to {@code false}, it skips re-authentication
+ * and only authorizes the provided principal if necessary.
+ */
+ private boolean authenticateOriginalAuthData;
+
+ /**
+ * A supplier that indicates whether the current connection is still in the
+ * initial connect phase.
+ *
+ *
When this supplier returns {@code true}, the connection is being
+ * established and the broker should treat the incoming data as part of the
+ * initial connect handling. When it returns {@code false}, the
+ * client is already marked connected; subsequent authentication events
+ * represent refreshes or re-authentication.
+ *
+ *
Using a supplier allows deferred evaluation of the initial-connect state
+ * (for example, if connection state may change between when the context is
+ * created and when authentication is executed).
+ */
+ private Supplier isInitialConnectSupplier;
+}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java
new file mode 100644
index 0000000000000..a95976e9a0c7f
--- /dev/null
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authentication;
+
+import static org.apache.pulsar.common.naming.Constants.WEBSOCKET_DUMMY_ORIGINAL_PRINCIPLE;
+import java.util.concurrent.CompletableFuture;
+import javax.naming.AuthenticationException;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.api.AuthData;
+import org.jspecify.annotations.NonNull;
+
+/**
+ * Represents a per-connection authentication session for a client using the broker's binary protocol.
+ *
+ * This class manages the complete authentication lifecycle for a single client connection.
+ * It tracks the current {@code AuthenticationState}, authentication provider, method, and role,
+ * as well as the resolved {@code AuthenticationDataSource}. When a proxy is involved, it can
+ * also store the original credentials and authentication state forwarded by the proxy.
+ *
+ *
{@code BinaryAuthSession} handles both the initial authentication (CONNECT) and
+ * subsequent re-authentication or credential refresh flows. All asynchronous operations
+ * are executed using the {@link BinaryAuthContext#getExecutor() executor} provided by the
+ * associated {@link BinaryAuthContext}.
+ *
+ *
The session supports two main connection scenarios:
+ *
+ *
Direct client-to-broker connections:
+ *
+ * - The client and broker may exchange authentication data multiple times until
+ * authentication is complete.
+ * - If credentials expire, the broker requests the client to refresh them,
+ * ensuring that the role remains consistent across refreshes.
+ *
+ *
+ * Client-to-broker connections via a proxy:
+ *
+ * - The proxy may optionally forward the original client's authentication data.
+ * - The broker first authenticates the proxy, then optionally validates the
+ * original client's credentials if forwarded.
+ * - {@code originalAuthState} is non-null when the proxy has forwarded the original
+ * authentication data and the broker is configured to authenticate it.
+ * - Proxy authentication does not expire. The proxy acts as a transparent intermediary,
+ * and subsequent client credential refreshes occur directly between the client and broker.
+ *
+ */
+@Slf4j
+@Getter
+public class BinaryAuthSession {
+ private static final byte[] emptyArray = new byte[0];
+
+ /// Current authentication state of the connected client.
+ private AuthenticationState authState;
+ // Authentication method used by the connected client.
+ private String authMethod;
+ // Role of the connected client as determined by authentication.
+ private String authRole = null;
+ // Authentication data for the connected client (volatile for visibility across threads).
+ private volatile AuthenticationDataSource authenticationData;
+ // Authentication provider associated with this session.
+ private AuthenticationProvider authenticationProvider;
+
+ // Original authentication method forwarded by proxy, if any.
+ private String originalAuthMethod;
+ // Original principal forwarded by proxy, if any.
+ private String originalPrincipal = null;
+ // Original authentication state forwarded by proxy, if any.
+ private AuthenticationState originalAuthState;
+ // Original authentication data forwarded by proxy (volatile for thread visibility).
+ private volatile AuthenticationDataSource originalAuthData;
+ // Keep temporarily in order to verify after verifying proxy's authData
+ private AuthData originalAuthDataCopy;
+
+ // Context holding connection-specific data needed for authentication.
+ private final BinaryAuthContext context;
+
+ // Default authentication result returned after successful initial authentication
+ private AuthResult defaultAuthResult;
+ // Indicates whether the client supports authentication refresh.
+ private boolean supportsAuthRefresh;
+
+ public BinaryAuthSession(@NonNull BinaryAuthContext context) {
+ this.context = context;
+ }
+
+ /**
+ * Performs the initial authentication process for the client connection.
+ *
+ * This method handles both standard CONNECT authentication and optional original credentials
+ * forwarded by a proxy. Authentication may be asynchronous and results in a {@link AuthResult}.
+ *
+ * @return a {@link CompletableFuture} that completes with the authentication result
+ */
+ public CompletableFuture doAuthentication() {
+ var connect = context.getCommandConnect();
+ try {
+ supportsAuthRefresh = connect.getFeatureFlags().hasSupportsAuthRefresh() && connect.getFeatureFlags()
+ .isSupportsAuthRefresh();
+ var authData = connect.hasAuthData() ? connect.getAuthData() : emptyArray;
+ var clientData = AuthData.of(authData);
+ // init authentication
+ if (connect.hasAuthMethodName()) {
+ authMethod = connect.getAuthMethodName();
+ } else if (connect.hasAuthMethod()) {
+ // Legacy client is passing enum
+ authMethod = connect.getAuthMethod().name().substring(10).toLowerCase();
+ } else {
+ authMethod = "none";
+ }
+
+ defaultAuthResult = AuthResult.builder().clientProtocolVersion(connect.getProtocolVersion())
+ .clientVersion(connect.hasClientVersion() ? connect.getClientVersion() : "")
+ .build();
+
+ authenticationProvider = context.getAuthenticationService().getAuthenticationProvider(authMethod);
+ // Not find provider named authMethod. Most used for tests.
+ // In AuthenticationDisabled, it will set authMethod "none".
+ if (authenticationProvider == null) {
+ authRole = context.getAuthenticationService().getAnonymousUserRole()
+ .orElseThrow(() ->
+ new AuthenticationException(
+ "No anonymous role, and no authentication provider configured"));
+ return CompletableFuture.completedFuture(defaultAuthResult);
+ }
+
+ authState =
+ authenticationProvider.newAuthState(clientData, context.getRemoteAddress(),
+ context.getSslSession());
+
+ if (log.isDebugEnabled()) {
+ String role = "";
+ if (authState != null && authState.isComplete()) {
+ role = authState.getAuthRole();
+ } else {
+ role = "authentication incomplete or null";
+ }
+ log.debug("[{}] Authenticate role : {}", context.getRemoteAddress(), role);
+ }
+
+ if (connect.hasOriginalPrincipal() && context.isAuthenticateOriginalAuthData()
+ && !WEBSOCKET_DUMMY_ORIGINAL_PRINCIPLE.equals(connect.getOriginalPrincipal())) {
+ // Flow:
+ // 1. Initialize original authentication.
+ // 2. Authenticate the proxy's authentication data.
+ // 3. Authenticate the original authentication data.
+ if (connect.hasOriginalAuthMethod()) {
+ originalAuthMethod = connect.getOriginalAuthMethod();
+ } else {
+ originalAuthMethod = "none";
+ }
+
+ var originalAuthenticationProvider =
+ context.getAuthenticationService().getAuthenticationProvider(originalAuthMethod);
+
+ /**
+ * When both the broker and the proxy are configured with anonymousUserRole
+ * if the client does not configure an authentication method
+ * the proxy side will set the value of anonymousUserRole to clientAuthRole when it creates a connection
+ * and the value of clientAuthMethod will be none.
+ * Similarly, should also set the value of authRole to anonymousUserRole on the broker side.
+ */
+ if (originalAuthenticationProvider == null) {
+ authRole = context.getAuthenticationService().getAnonymousUserRole()
+ .orElseThrow(() ->
+ new AuthenticationException("No anonymous role, and can't find "
+ + "AuthenticationProvider for original role using auth method "
+ + "[" + originalAuthMethod + "] is not available"));
+ originalPrincipal = authRole;
+ return CompletableFuture.completedFuture(defaultAuthResult);
+ }
+
+ originalAuthDataCopy = AuthData.of(connect.getOriginalAuthData().getBytes());
+ originalAuthState = originalAuthenticationProvider.newAuthState(
+ originalAuthDataCopy,
+ context.getRemoteAddress(),
+ context.getSslSession());
+ } else if (connect.hasOriginalPrincipal()) {
+ originalPrincipal = connect.getOriginalPrincipal();
+
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Setting original role (forwarded from proxy): {}",
+ context.getRemoteAddress(), originalPrincipal);
+ }
+ }
+
+ return authChallenge(clientData, false, connect.getProtocolVersion(),
+ connect.hasClientVersion() ? connect.getClientVersion() : "");
+ } catch (Exception e) {
+ return CompletableFuture.failedFuture(e);
+ }
+ }
+
+ /**
+ * Processes the authentication step when the broker receives an authentication response from the client.
+ *
+ * If {@code useOriginalAuthState} is {@code true}, the authentication is performed
+ * against the original credentials forwarded by a proxy. Otherwise, the primary
+ * session {@code authState} is used.
+ */
+ public CompletableFuture authChallenge(AuthData clientData,
+ boolean useOriginalAuthState,
+ int clientProtocolVersion,
+ String clientVersion) {
+ // The original auth state can only be set on subsequent auth attempts (and only
+ // in presence of a proxy and if the proxy is forwarding the credentials).
+ // In this case, the re-validation needs to be done against the original client
+ // credentials.
+ AuthenticationState authState = useOriginalAuthState ? originalAuthState : this.authState;
+ String authRole = useOriginalAuthState ? originalPrincipal : this.authRole;
+ if (log.isDebugEnabled()) {
+ log.debug("Authenticate using original auth state : {}, role = {}", useOriginalAuthState, authRole);
+ }
+ return authState
+ .authenticateAsync(clientData)
+ .thenComposeAsync((authChallenge) -> authChallengeSuccessCallback(authChallenge,
+ useOriginalAuthState, authRole, clientProtocolVersion, clientVersion),
+ context.getExecutor());
+ }
+
+ /**
+ * Callback invoked when an authentication step completes on the {@link AuthenticationState}.
+ *
+ * If {@code authChallenge} is non-null, the authentication exchange is not yet complete.
+ * An {@link AuthResult} containing the challenge bytes is returned for the broker or proxy
+ * to send to the client. This method does not send data itself.
+ *
+ *
If {@code authChallenge} is null, the authentication step is complete. In that case, this
+ * method will:
+ *
+ * - For the initial connection: set the resolved authentication data and role, and
+ * optionally authenticate original proxy-forwarded credentials.
+ * - For a refresh: validate that the role remains the same and update stored authentication
+ * data accordingly.
+ *
+ */
+ public CompletableFuture authChallengeSuccessCallback(AuthData authChallenge,
+ boolean useOriginalAuthState,
+ String authRole,
+ int clientProtocolVersion,
+ String clientVersion) {
+ try {
+ if (authChallenge == null) {
+ // Authentication has completed. It was either:
+ // 1. the 1st time the authentication process was done, in which case we'll send
+ // a `CommandConnected` response
+ // 2. an authentication refresh, in which case we need to refresh authenticationData
+ AuthenticationState authState = useOriginalAuthState ? originalAuthState : this.authState;
+ String newAuthRole = authState.getAuthRole();
+ AuthenticationDataSource newAuthDataSource = authState.getAuthDataSource();
+
+ if (context.getIsInitialConnectSupplier().get()) {
+ // Set the auth data and auth role
+ if (!useOriginalAuthState) {
+ this.authRole = newAuthRole;
+ this.authenticationData = newAuthDataSource;
+ }
+ // First time authentication is done
+ if (originalAuthState != null) {
+ // We only set originalAuthState when we are going to use it.
+ return authenticateOriginalData().thenApply(
+ __ -> defaultAuthResult);
+ } else {
+ return CompletableFuture.completedFuture(defaultAuthResult);
+ }
+ } else {
+ // If the connection was already ready, it means we're doing a refresh
+ if (!StringUtils.isEmpty(authRole)) {
+ if (!authRole.equals(newAuthRole)) {
+ log.warn("[{}] Principal cannot change during an authentication refresh expected={} got={}",
+ context.getRemoteAddress(), authRole, newAuthRole);
+ return CompletableFuture.failedFuture(
+ new AuthenticationException("Auth role does not match previous role"));
+ }
+ }
+ // Refresh authentication data
+ if (!useOriginalAuthState) {
+ this.authenticationData = newAuthDataSource;
+ } else {
+ this.originalAuthData = newAuthDataSource;
+ }
+ log.info("[{}] Refreshed authentication credentials for role {}",
+ context.getRemoteAddress(), authRole);
+ }
+ } else {
+ // auth not complete, continue auth with client side.
+ return CompletableFuture.completedFuture(AuthResult.builder()
+ .clientProtocolVersion(clientProtocolVersion)
+ .clientVersion(clientVersion)
+ .authMethod(authMethod)
+ .authData(authChallenge)
+ .build());
+ }
+ } catch (Exception | AssertionError e) {
+ return CompletableFuture.failedFuture(e);
+ }
+
+ return CompletableFuture.completedFuture(defaultAuthResult);
+ }
+
+ /**
+ * Performs authentication of the original client credentials forwarded by a proxy.
+ */
+ private CompletableFuture authenticateOriginalData() {
+ return originalAuthState
+ .authenticateAsync(originalAuthDataCopy)
+ .thenComposeAsync((authChallenge) -> {
+ if (authChallenge != null) {
+ // The protocol does not yet handle an auth challenge here.
+ // See https://github.com/apache/pulsar/issues/19291.
+ return CompletableFuture.failedFuture(
+ new AuthenticationException("Failed to authenticate original auth data "
+ + "due to unsupported authChallenge."));
+ } else {
+ try {
+ // No need to retain these bytes anymore
+ originalAuthDataCopy = null;
+ originalAuthData = originalAuthState.getAuthDataSource();
+ originalPrincipal = originalAuthState.getAuthRole();
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Authenticated original role (forwarded from proxy): {}",
+ context.getRemoteAddress(), originalPrincipal);
+ }
+ return CompletableFuture.completedFuture(null);
+ } catch (Exception | AssertionError e) {
+ return CompletableFuture.failedFuture(e);
+ }
+ }
+ }, context.getExecutor());
+ }
+
+ /**
+ * Returns whether the current effective authentication state for this session has expired.
+ *
+ * If the session has an {@code originalAuthState} forwarded by a proxy, that state is
+ * checked first. Otherwise, the session's primary {@code authState} is used.
+ */
+ public boolean isExpired() {
+ if (originalAuthState != null) {
+ return originalAuthState.isExpired();
+ }
+ return authState.isExpired();
+ }
+
+ /**
+ * Determines whether the session supports authentication refresh.
+ *
+ *
Refresh is not supported when:
+ *
+ * - the client indicated it does not support auth refresh via feature flags
+ * - the session is a proxied connection with an original principal but the proxy did not forward original
+ * credentials (so re-validation of the original user is impossible)
+ *
+ */
+ public boolean supportsAuthenticationRefresh() {
+ if (originalPrincipal != null && originalAuthState == null) {
+ // This case is only checked when the authState is expired because we've reached a point where
+ // authentication needs to be refreshed, but the protocol does not support it unless the proxy forwards
+ // the originalAuthData.
+ log.info(
+ "[{}] Cannot revalidate user credential when using proxy and"
+ + " not forwarding the credentials.",
+ context.getRemoteAddress());
+ return false;
+ }
+
+ if (!supportsAuthRefresh) {
+ log.warn("[{}] Client doesn't support auth credentials refresh",
+ context.getRemoteAddress());
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * Refreshes the authentication credentials for this session.
+ *
+ * If the session has an {@code originalAuthState} (i.e., credentials forwarded by a proxy
+ * and broker is configured to authenticate them), the refresh is performed on that state.
+ * Otherwise, the primary {@code authState} is refreshed.
+ *
+ *
The returned {@link AuthResult} contains the updated authentication data and the
+ * corresponding authentication method.
+ */
+ public AuthResult refreshAuthentication() throws AuthenticationException {
+ if (originalAuthState != null) {
+ return AuthResult.builder()
+ .authData(originalAuthState.refreshAuthentication())
+ .authMethod(originalAuthMethod)
+ .build();
+ }
+ return AuthResult.builder()
+ .authData(authState.refreshAuthentication())
+ .authMethod(authMethod)
+ .build();
+ }
+
+ /**
+ * Result container for an authentication operation performed by {@link BinaryAuthSession}.
+ *
+ *
Holds the optional client protocol/version metadata and the authentication payload
+ * produced by the underlying authentication provider. This object is returned to the
+ * broker/proxy to indicate either a completed authentication (no authData) or a pending
+ * authentication exchange that requires sending {@code authData} back to the client.
+ */
+ @Builder
+ @Getter
+ public static class AuthResult {
+ /**
+ * Client protocol version used to format protocol-level responses.
+ *
+ *
This value is used by the broker or proxy when building response frames so the
+ * client can interpret any returned authentication bytes correctly.
+ */
+ int clientProtocolVersion;
+
+ /**
+ * Human-readable client version string, if provided by the client.
+ */
+ String clientVersion;
+
+ /**
+ * Authentication data produced by the authentication provider.
+ *
+ *
When non-null, these bytes represent a challenge or credentials that must be
+ * sent to the client to continue the authentication handshake. When null, no further
+ * client exchange is required and authentication is considered complete.
+ */
+ AuthData authData;
+
+ /**
+ * Identifier of the authentication method associated with {@code authData}.
+ */
+ String authMethod;
+ }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index d7010e3cf8c7c..804b0797c45d2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -21,7 +21,6 @@
import static com.google.common.base.Preconditions.checkArgument;
import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
import static javax.ws.rs.core.Response.Status.NOT_FOUND;
-import static org.apache.commons.lang3.StringUtils.EMPTY;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.unsafeGetPartitionedTopicMetadataAsync;
import static org.apache.pulsar.broker.lookup.TopicLookupBase.lookupTopicAsync;
@@ -29,7 +28,6 @@
import static org.apache.pulsar.broker.service.persistent.PersistentTopic.getMigratedClusterUrl;
import static org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.ignoreUnrecoverableBKException;
import static org.apache.pulsar.common.api.proto.ProtocolVersion.v5;
-import static org.apache.pulsar.common.naming.Constants.WEBSOCKET_DUMMY_ORIGINAL_PRINCIPLE;
import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import static org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse;
import com.google.common.annotations.VisibleForTesting;
@@ -84,6 +82,9 @@
import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationState;
+import org.apache.pulsar.broker.authentication.BinaryAuthContext;
+import org.apache.pulsar.broker.authentication.BinaryAuthSession;
+import org.apache.pulsar.broker.authentication.BinaryAuthSession.AuthResult;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.limiter.ConnectionController;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
@@ -185,6 +186,7 @@
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.apache.pulsar.utils.TimedSingleThreadRateLimiter;
+import org.jspecify.annotations.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -209,16 +211,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
private final BrokerInterceptor brokerInterceptor;
private State state;
private volatile boolean isActive = true;
- private String authRole = null;
- private volatile AuthenticationDataSource authenticationData;
- private AuthenticationProvider authenticationProvider;
- private AuthenticationState authState;
- // In case of proxy, if the authentication credentials are forwardable,
- // it will hold the credentials of the original client
- private AuthenticationState originalAuthState;
- private volatile AuthenticationDataSource originalAuthData;
// Keep temporarily in order to verify after verifying proxy's authData
- private AuthData originalAuthDataCopy;
private boolean pendingAuthChallengeResponse = false;
private ScheduledFuture> authRefreshTask;
private final DefaultAuthenticationRoleLoggingAnonymizer authenticationRoleLoggingAnonymizer;
@@ -234,9 +227,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
private String clientSourceAddressAndPort;
private int nonPersistentPendingMessages = 0;
private final int maxNonPersistentPendingMessages;
- private String originalPrincipal = null;
private final boolean schemaValidationEnforced;
- private String authMethod = "none";
private final int maxMessageSize;
private boolean preciseDispatcherFlowControl;
@@ -261,6 +252,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
private final int pauseReceivingCooldownMilliSeconds;
private boolean pausedDueToRateLimitation = false;
+ private BinaryAuthSession binaryAuthSession;
+
// Tracks and limits number of bytes pending to be published from a single specific IO thread.
static final class PendingBytesPerThreadTracker {
private static final FastThreadLocal pendingBytesPerThread =
@@ -551,12 +544,12 @@ private CompletableFuture isTopicOperationAllowed(TopicName topicName,
return CompletableFuture.completedFuture(true);
}
CompletableFuture result = service.getAuthorizationService().allowTopicOperationAsync(
- topicName, operation, originalPrincipal, authRole,
+ topicName, operation, getOriginalPrincipal(), getAuthRole(),
originalAuthDataSource != null ? originalAuthDataSource : authDataSource, authDataSource);
result.thenAccept(isAuthorized -> {
if (!isAuthorized) {
log.warn("Role {} or OriginalRole {} is not authorized to perform operation {} on topic {}",
- authRole, originalPrincipal, operation, topicName);
+ getAuthRole(), getOriginalPrincipal(), operation, topicName);
}
});
return result;
@@ -566,12 +559,13 @@ private CompletableFuture isTopicOperationAllowed(TopicName topicName,
TopicOperation operation) {
if (service.isAuthorizationEnabled()) {
AuthenticationDataSource authDataSource =
- new AuthenticationDataSubscription(authenticationData, subscriptionName);
- AuthenticationDataSource originalAuthDataSource = null;
+ new AuthenticationDataSubscription(getAuthenticationData(), subscriptionName);
+ AuthenticationDataSource originalAuthDataWrapper = null;
+ AuthenticationDataSource originalAuthData = getOriginalAuthData();
if (originalAuthData != null) {
- originalAuthDataSource = new AuthenticationDataSubscription(originalAuthData, subscriptionName);
+ originalAuthDataWrapper = new AuthenticationDataSubscription(originalAuthData, subscriptionName);
}
- return isTopicOperationAllowed(topicName, operation, authDataSource, originalAuthDataSource);
+ return isTopicOperationAllowed(topicName, operation, authDataSource, originalAuthDataWrapper);
} else {
return CompletableFuture.completedFuture(true);
}
@@ -613,6 +607,10 @@ protected void handleLookup(CommandLookupTopic lookupParam) {
final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
if (lookupSemaphore.tryAcquire()) {
+ String authRole = getAuthRole();
+ String originalPrincipal = getOriginalPrincipal();
+ AuthenticationDataSource authenticationData = getAuthenticationData();
+ AuthenticationDataSource originalAuthData = getOriginalAuthData();
isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, authenticationData, originalAuthData).thenApply(
isAuthorized -> {
if (isAuthorized) {
@@ -702,6 +700,8 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
if (lookupSemaphore.tryAcquire()) {
+ AuthenticationDataSource authenticationData = getAuthenticationData();
+ AuthenticationDataSource originalAuthData = getOriginalAuthData();
isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, authenticationData, originalAuthData).thenApply(
isAuthorized -> {
if (isAuthorized) {
@@ -868,6 +868,9 @@ ByteBuf createConsumerStatsResponse(Consumer consumer, long requestId) {
// complete the connect and sent newConnected command
private void completeConnect(int clientProtoVersion, String clientVersion) {
+ String authRole = getAuthRole();
+ String originalPrincipal = getOriginalPrincipal();
+ String authMethod = getAuthMethod();
if (service.isAuthenticationEnabled()) {
if (service.isAuthorizationEnabled()) {
if (!service.getAuthorizationService()
@@ -919,119 +922,6 @@ private void completeConnect(int clientProtoVersion, String clientVersion) {
}
}
- // According to auth result, send Connected, AuthChallenge, or Error command.
- private void doAuthentication(AuthData clientData,
- boolean useOriginalAuthState,
- int clientProtocolVersion,
- final String clientVersion) {
- // The original auth state can only be set on subsequent auth attempts (and only
- // in presence of a proxy and if the proxy is forwarding the credentials).
- // In this case, the re-validation needs to be done against the original client
- // credentials.
- AuthenticationState authState = useOriginalAuthState ? originalAuthState : this.authState;
- String authRole = useOriginalAuthState ? originalPrincipal : this.authRole;
- if (log.isDebugEnabled()) {
- log.debug("Authenticate using original auth state : {}, role = {}", useOriginalAuthState, authRole);
- }
- authState
- .authenticateAsync(clientData)
- .whenCompleteAsync((authChallenge, throwable) -> {
- if (throwable == null) {
- authChallengeSuccessCallback(authChallenge, useOriginalAuthState, authRole,
- clientProtocolVersion, clientVersion);
- } else {
- authenticationFailed(throwable);
- }
- }, ctx.executor());
- }
-
- public void authChallengeSuccessCallback(AuthData authChallenge,
- boolean useOriginalAuthState,
- String authRole,
- int clientProtocolVersion,
- String clientVersion) {
- try {
- if (authChallenge == null) {
- // Authentication has completed. It was either:
- // 1. the 1st time the authentication process was done, in which case we'll send
- // a `CommandConnected` response
- // 2. an authentication refresh, in which case we need to refresh authenticationData
- AuthenticationState authState = useOriginalAuthState ? originalAuthState : this.authState;
- String newAuthRole = authState.getAuthRole();
- AuthenticationDataSource newAuthDataSource = authState.getAuthDataSource();
-
- if (state != State.Connected) {
- // Set the auth data and auth role
- if (!useOriginalAuthState) {
- this.authRole = newAuthRole;
- this.authenticationData = newAuthDataSource;
- }
- // First time authentication is done
- if (originalAuthState != null) {
- // We only set originalAuthState when we are going to use it.
- authenticateOriginalData(clientProtocolVersion, clientVersion);
- } else {
- completeConnect(clientProtocolVersion, clientVersion);
- }
- } else {
- // Refresh the auth data
- if (!useOriginalAuthState) {
- this.authenticationData = newAuthDataSource;
- } else {
- this.originalAuthData = newAuthDataSource;
- }
- // If the connection was already ready, it means we're doing a refresh
- if (!StringUtils.isEmpty(authRole)) {
- if (!authRole.equals(newAuthRole)) {
- log.warn("[{}] Principal cannot change during an authentication refresh expected={} got={}",
- remoteAddress, authRole, newAuthRole);
- ctx.close();
- } else {
- log.info("[{}] Refreshed authentication credentials for role {}", remoteAddress, authRole);
- }
- }
- }
- } else {
- // auth not complete, continue auth with client side.
- ctx.writeAndFlush(Commands.newAuthChallenge(authMethod, authChallenge, clientProtocolVersion));
- if (log.isDebugEnabled()) {
- log.debug("[{}] Authentication in progress client by method {}.", remoteAddress, authMethod);
- }
- }
- } catch (Exception | AssertionError e) {
- authenticationFailed(e);
- }
- }
-
- private void authenticateOriginalData(int clientProtoVersion, String clientVersion) {
- originalAuthState
- .authenticateAsync(originalAuthDataCopy)
- .whenCompleteAsync((authChallenge, throwable) -> {
- if (throwable != null) {
- authenticationFailed(throwable);
- } else if (authChallenge != null) {
- // The protocol does not yet handle an auth challenge here.
- // See https://github.com/apache/pulsar/issues/19291.
- authenticationFailed(new AuthenticationException("Failed to authenticate original auth data "
- + "due to unsupported authChallenge."));
- } else {
- try {
- // No need to retain these bytes anymore
- originalAuthDataCopy = null;
- originalAuthData = originalAuthState.getAuthDataSource();
- originalPrincipal = originalAuthState.getAuthRole();
- if (log.isDebugEnabled()) {
- log.debug("[{}] Authenticated original role (forwarded from proxy): {}",
- remoteAddress, originalPrincipal);
- }
- completeConnect(clientProtoVersion, clientVersion);
- } catch (Exception | AssertionError e) {
- authenticationFailed(e);
- }
- }
- }, ctx.executor());
- }
-
// Handle authentication and authentication refresh failures. Must be called from event loop.
private void authenticationFailed(Throwable t) {
String operation;
@@ -1053,7 +943,7 @@ private void authenticationFailed(Throwable t) {
private void maybeScheduleAuthenticationCredentialsRefresh() {
assert ctx.executor().inEventLoop();
assert authRefreshTask == null;
- if (authState == null) {
+ if (getAuthState() == null) {
// Authentication is disabled or there's no local state to refresh
return;
}
@@ -1065,28 +955,15 @@ private void maybeScheduleAuthenticationCredentialsRefresh() {
private void refreshAuthenticationCredentials() {
assert ctx.executor().inEventLoop();
- AuthenticationState authState = this.originalAuthState != null ? originalAuthState : this.authState;
if (getState() == State.Failed) {
// Happens when an exception is thrown that causes this connection to close.
return;
- } else if (!authState.isExpired()) {
+ } else if (!binaryAuthSession.isExpired()) {
// Credentials are still valid. Nothing to do at this point
return;
- } else if (originalPrincipal != null && originalAuthState == null) {
- // This case is only checked when the authState is expired because we've reached a point where
- // authentication needs to be refreshed, but the protocol does not support it unless the proxy forwards
- // the originalAuthData.
- log.info(
- "[{}] Cannot revalidate user credential when using proxy and"
- + " not forwarding the credentials. Closing connection",
- remoteAddress);
- ctx.close();
- return;
}
- if (!supportsAuthenticationRefresh()) {
- log.warn("[{}] Closing connection because client doesn't support auth credentials refresh",
- remoteAddress);
+ if (!binaryAuthSession.supportsAuthenticationRefresh()) {
ctx.close();
return;
}
@@ -1099,11 +976,12 @@ private void refreshAuthenticationCredentials() {
}
log.info("[{}] Refreshing authentication credentials for originalPrincipal {} and authRole {}",
- remoteAddress, originalPrincipal, this.authRole);
+ remoteAddress, getOriginalPrincipal(), getAuthRole());
try {
- AuthData brokerData = authState.refreshAuthentication();
-
- writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData,
+ AuthResult refreshAuthentication = binaryAuthSession.refreshAuthentication();
+ String authMethod = refreshAuthentication.getAuthMethod();
+ writeAndFlush(Commands.newAuthChallenge(authMethod,
+ refreshAuthentication.getAuthData(),
getRemoteEndpointProtocolVersion()));
if (log.isDebugEnabled()) {
log.debug("[{}] Sent auth challenge to client to refresh credentials with method: {}.",
@@ -1118,8 +996,6 @@ private void refreshAuthenticationCredentials() {
}
}
- private static final byte[] emptyArray = new byte[0];
-
@Override
protected void handleConnect(CommandConnect connect) {
checkArgument(state == State.Start);
@@ -1168,31 +1044,6 @@ protected void handleConnect(CommandConnect connect) {
state = State.Connecting;
try {
- byte[] authData = connect.hasAuthData() ? connect.getAuthData() : emptyArray;
- AuthData clientData = AuthData.of(authData);
- // init authentication
- if (connect.hasAuthMethodName()) {
- authMethod = connect.getAuthMethodName();
- } else if (connect.hasAuthMethod()) {
- // Legacy client is passing enum
- authMethod = connect.getAuthMethod().name().substring(10).toLowerCase();
- } else {
- authMethod = "none";
- }
-
- authenticationProvider = getBrokerService()
- .getAuthenticationService()
- .getAuthenticationProvider(authMethod);
-
- // Not find provider named authMethod. Most used for tests.
- // In AuthenticationDisabled, it will set authMethod "none".
- if (authenticationProvider == null) {
- authRole = getBrokerService().getAuthenticationService().getAnonymousUserRole()
- .orElseThrow(() ->
- new AuthenticationException("No anonymous role, and no authentication provider configured"));
- completeConnect(clientProtocolVersion, clientVersion);
- return;
- }
// init authState and other var
ChannelHandler sslHandler = ctx.channel().pipeline().get(PulsarChannelInitializer.TLS_HANDLER);
SSLSession sslSession = null;
@@ -1200,68 +1051,23 @@ protected void handleConnect(CommandConnect connect) {
sslSession = ((SslHandler) sslHandler).engine().getSession();
}
- authState = authenticationProvider.newAuthState(clientData, remoteAddress, sslSession);
-
- if (log.isDebugEnabled()) {
- String role = "";
- if (authState != null && authState.isComplete()) {
- role = authState.getAuthRole();
- } else {
- role = "authentication incomplete or null";
- }
- log.debug("[{}] Authenticate role : {}", remoteAddress, role);
- }
-
- if (connect.hasOriginalPrincipal() && service.getPulsar().getConfig().isAuthenticateOriginalAuthData()
- && !WEBSOCKET_DUMMY_ORIGINAL_PRINCIPLE.equals(connect.getOriginalPrincipal())) {
- // Flow:
- // 1. Initialize original authentication.
- // 2. Authenticate the proxy's authentication data.
- // 3. Authenticate the original authentication data.
- String originalAuthMethod;
- if (connect.hasOriginalAuthMethod()) {
- originalAuthMethod = connect.getOriginalAuthMethod();
- } else {
- originalAuthMethod = "none";
- }
-
- AuthenticationProvider originalAuthenticationProvider = getBrokerService()
- .getAuthenticationService()
- .getAuthenticationProvider(originalAuthMethod);
-
- /**
- * When both the broker and the proxy are configured with anonymousUserRole
- * if the client does not configure an authentication method
- * the proxy side will set the value of anonymousUserRole to clientAuthRole when it creates a connection
- * and the value of clientAuthMethod will be none.
- * Similarly, should also set the value of authRole to anonymousUserRole on the broker side.
- */
- if (originalAuthenticationProvider == null) {
- authRole = getBrokerService().getAuthenticationService().getAnonymousUserRole()
- .orElseThrow(() ->
- new AuthenticationException("No anonymous role, and can't find "
- + "AuthenticationProvider for original role using auth method "
- + "[" + originalAuthMethod + "] is not available"));
- originalPrincipal = authRole;
- completeConnect(clientProtocolVersion, clientVersion);
- return;
- }
-
- originalAuthDataCopy = AuthData.of(connect.getOriginalAuthData().getBytes());
- originalAuthState = originalAuthenticationProvider.newAuthState(
- originalAuthDataCopy,
- remoteAddress,
- sslSession);
- } else if (connect.hasOriginalPrincipal()) {
- originalPrincipal = connect.getOriginalPrincipal();
-
- if (log.isDebugEnabled()) {
- log.debug("[{}] Setting original role (forwarded from proxy): {}",
- remoteAddress, originalPrincipal);
- }
- }
-
- doAuthentication(clientData, false, clientProtocolVersion, clientVersion);
+ binaryAuthSession = service.getAuthenticationService().createBinaryAuthSession(BinaryAuthContext.builder()
+ .executor(ctx.executor())
+ .remoteAddress(remoteAddress)
+ .sslSession(sslSession)
+ .authenticationService(service.getAuthenticationService())
+ .commandConnect(connect)
+ .isInitialConnectSupplier(() -> state != State.Connected)
+ .authenticateOriginalAuthData(service.getPulsar().getConfig().isAuthenticateOriginalAuthData())
+ .build());
+ binaryAuthSession.doAuthentication()
+ .whenCompleteAsync((authResult, ex) -> {
+ if (ex != null) {
+ authenticationFailed(ex);
+ } else {
+ handleAuthResult(authResult);
+ }
+ }, ctx.executor());
} catch (Exception e) {
authenticationFailed(e);
}
@@ -1280,14 +1086,44 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) {
}
try {
- AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData());
- doAuthentication(clientData, originalAuthState != null, authResponse.getProtocolVersion(),
- authResponse.hasClientVersion() ? authResponse.getClientVersion() : EMPTY);
+ if (binaryAuthSession != null) {
+ AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData());
+ binaryAuthSession.authChallenge(clientData, binaryAuthSession.getOriginalAuthState() != null,
+ authResponse.getProtocolVersion(),
+ authResponse.hasClientVersion() ? authResponse.getClientVersion() : "")
+ .whenCompleteAsync((authResult, ex) -> {
+ if (ex != null) {
+ authenticationFailed(ex);
+ } else {
+ handleAuthResult(authResult);
+ }
+ }, ctx.executor());
+ } else {
+ authenticationFailed(new AuthenticationException("Authentication session is null or not initialized"));
+ }
} catch (Exception e) {
authenticationFailed(e);
}
}
+ private void handleAuthResult(@NonNull AuthResult authResult) {
+ AuthData authData = authResult.getAuthData();
+ if (authData != null) {
+ writeAndFlush(Commands.newAuthChallenge(
+ authResult.getAuthMethod(),
+ authData,
+ authResult.getClientProtocolVersion()));
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Authentication in progress client by method {}.", remoteAddress,
+ authResult.getAuthMethod());
+ }
+ } else {
+ if (state == State.Connecting) {
+ completeConnect(authResult.getClientProtocolVersion(), authResult.getClientVersion());
+ }
+ }
+ }
+
@Override
protected void handleSubscribe(final CommandSubscribe subscribe) {
checkArgument(state == State.Connected);
@@ -1300,8 +1136,8 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
if (log.isDebugEnabled()) {
log.debug("[{}] Handle subscribe command: auth role = {}, original auth role = {}",
- remoteAddress, authenticationRoleLoggingAnonymizer.anonymize(authRole),
- authenticationRoleLoggingAnonymizer.anonymize(originalPrincipal));
+ remoteAddress, authenticationRoleLoggingAnonymizer.anonymize(getAuthRole()),
+ authenticationRoleLoggingAnonymizer.anonymize(getOriginalPrincipal()));
}
final String subscriptionName = subscribe.getSubscription();
@@ -1583,7 +1419,7 @@ private SchemaData getSchema(Schema protocolSchema) {
.data(protocolSchema.getSchemaData())
.isDeleted(false)
.timestamp(System.currentTimeMillis())
- .user(Strings.nullToEmpty(originalPrincipal))
+ .user(Strings.nullToEmpty(getOriginalPrincipal()))
.type(Commands.getSchemaType(protocolSchema.getType()))
.props(protocolSchema.getPropertiesList().stream().collect(
Collectors.toMap(
@@ -1621,7 +1457,7 @@ protected void handleProducer(final CommandProducer cmdProducer) {
}
CompletableFuture isAuthorizedFuture = isTopicOperationAllowed(
- topicName, TopicOperation.PRODUCE, authenticationData, originalAuthData
+ topicName, TopicOperation.PRODUCE, getAuthenticationData(), getOriginalAuthData()
);
if (!Strings.isNullOrEmpty(initialSubscriptionName)) {
@@ -2509,6 +2345,10 @@ private CompletableFuture isNamespaceOperationAllowed(NamespaceName nam
return CompletableFuture.completedFuture(true);
}
CompletableFuture isProxyAuthorizedFuture;
+ String originalPrincipal = getOriginalPrincipal();
+ AuthenticationDataSource originalAuthData = getOriginalAuthData();
+ String authRole = getAuthRole();
+ AuthenticationDataSource authenticationData = getAuthData();
if (originalPrincipal != null) {
isProxyAuthorizedFuture = service.getAuthorizationService().allowNamespaceOperationAsync(
namespaceName, operation, originalPrincipal, originalAuthData);
@@ -2905,11 +2745,13 @@ protected void handleEndTxn(CommandEndTxn command) {
private CompletableFuture isSuperUser() {
assert ctx.executor().inEventLoop();
if (service.isAuthenticationEnabled() && service.isAuthorizationEnabled()) {
+ AuthenticationDataSource originalAuthData = getOriginalAuthData();
+ AuthenticationDataSource authenticationData = getAuthData();
CompletableFuture isAuthRoleAuthorized = service.getAuthorizationService().isSuperUser(
- authRole, authenticationData);
- if (originalPrincipal != null) {
+ getAuthRole(), authenticationData);
+ if (getOriginalPrincipal() != null) {
CompletableFuture isOriginalPrincipalAuthorized = service.getAuthorizationService()
- .isSuperUser(originalPrincipal,
+ .isSuperUser(getOriginalPrincipal(),
originalAuthData != null ? originalAuthData : authenticationData);
return isOriginalPrincipalAuthorized.thenCombine(isAuthRoleAuthorized,
(originalPrincipal, authRole) -> originalPrincipal && authRole);
@@ -3561,7 +3403,7 @@ public BrokerService getBrokerService() {
}
public String getRole() {
- return authRole;
+ return binaryAuthSession != null ? binaryAuthSession.getAuthRole() : null;
}
@Override
@@ -3588,11 +3430,6 @@ public boolean isBatchMessageCompatibleVersion() {
return getRemoteEndpointProtocolVersion() >= ProtocolVersion.v4.getValue();
}
- boolean supportsAuthenticationRefresh() {
- return features != null && features.isSupportsAuthRefresh();
- }
-
-
boolean supportBrokerMetadata() {
return features != null && features.isSupportsBrokerEntryMetadata();
}
@@ -3617,29 +3454,44 @@ public boolean isPreciseDispatcherFlowControl() {
}
public AuthenticationState getAuthState() {
- return authState;
+ return binaryAuthSession != null ? binaryAuthSession.getAuthState() : null;
}
@Override
public AuthenticationDataSource getAuthenticationData() {
- return originalAuthData != null ? originalAuthData : authenticationData;
+ if (binaryAuthSession == null) {
+ return null;
+ }
+ return binaryAuthSession.getAuthenticationData();
}
public String getPrincipal() {
- return originalPrincipal != null ? originalPrincipal : authRole;
+ if (binaryAuthSession == null) {
+ return null;
+ }
+ String originalPrincipal = binaryAuthSession.getOriginalPrincipal();
+ if (originalPrincipal != null) {
+ return originalPrincipal;
+ }
+ return binaryAuthSession.getAuthRole();
}
public AuthenticationProvider getAuthenticationProvider() {
- return authenticationProvider;
+ return binaryAuthSession != null ? binaryAuthSession.getAuthenticationProvider() : null;
}
@Override
public String getAuthRole() {
- return authRole;
+ return binaryAuthSession != null ? binaryAuthSession.getAuthRole() : null;
}
public String getAuthMethod() {
- return authMethod;
+ return binaryAuthSession != null ? binaryAuthSession.getAuthMethod() : null;
+ }
+
+ @VisibleForTesting
+ public BinaryAuthSession getBinaryAuthSession() {
+ return binaryAuthSession;
}
public ConcurrentLongHashMap> getConsumers() {
@@ -3784,31 +3636,26 @@ public boolean hasProducers() {
@VisibleForTesting
protected String getOriginalPrincipal() {
- return originalPrincipal;
+ return binaryAuthSession != null ? binaryAuthSession.getOriginalPrincipal() : null;
}
@VisibleForTesting
protected AuthenticationDataSource getAuthData() {
- return authenticationData;
+ return binaryAuthSession != null ? binaryAuthSession.getAuthenticationData() : null;
}
@VisibleForTesting
protected AuthenticationDataSource getOriginalAuthData() {
- return originalAuthData;
+ return binaryAuthSession != null ? binaryAuthSession.getOriginalAuthData() : null;
}
@VisibleForTesting
protected AuthenticationState getOriginalAuthState() {
- return originalAuthState;
- }
-
- @VisibleForTesting
- protected void setAuthRole(String authRole) {
- this.authRole = authRole;
+ return binaryAuthSession != null ? binaryAuthSession.getOriginalAuthState() : null;
}
@VisibleForTesting
- void setAuthState(AuthenticationState authState) {
- this.authState = authState;
+ void clearBinaryAuthSession() {
+ this.binaryAuthSession = null;
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 2cfbac35bfcfe..5eb0f801a000e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -35,6 +35,7 @@
import static org.mockito.Mockito.matches;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -101,7 +102,8 @@
import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationService;
-import org.apache.pulsar.broker.authentication.AuthenticationState;
+import org.apache.pulsar.broker.authentication.BinaryAuthContext;
+import org.apache.pulsar.broker.authentication.BinaryAuthSession;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.broker.namespace.NamespaceService;
@@ -452,6 +454,9 @@ public void testConnectCommandWithAuthenticationPositive() throws Exception {
// test server response to CONNECT
ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.client", null);
+ BinaryAuthSession binaryAuthSession =
+ spyBinaryAuthSession(authenticationService, clientCommand.copy(), svcConfig);
+ when(authenticationService.createBinaryAuthSession(any())).thenReturn(binaryAuthSession);
channel.writeInbound(clientCommand);
assertTrue(getResponse() instanceof CommandConnected);
@@ -477,6 +482,9 @@ public void testConnectCommandWithoutOriginalAuthInfoWhenAuthenticateOriginalAut
assertEquals(serverCnx.getState(), State.Start);
ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.client", "");
+ BinaryAuthSession binaryAuthSession =
+ spyBinaryAuthSession(authenticationService, clientCommand.copy(), svcConfig);
+ when(authenticationService.createBinaryAuthSession(any())).thenReturn(binaryAuthSession);
channel.writeInbound(clientCommand);
Object response1 = getResponse();
@@ -507,6 +515,9 @@ public void testConnectCommandWithPassingOriginalAuthData() throws Exception {
ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.proxy", 1, null,
null, "client", "pass.client", authMethodName);
+ BinaryAuthSession binaryAuthSession =
+ spyBinaryAuthSession(authenticationService, clientCommand.copy(), svcConfig);
+ when(authenticationService.createBinaryAuthSession(any())).thenReturn(binaryAuthSession);
channel.writeInbound(clientCommand);
Object response1 = getResponse();
@@ -522,6 +533,24 @@ public void testConnectCommandWithPassingOriginalAuthData() throws Exception {
channel.finish();
}
+ private BinaryAuthSession spyBinaryAuthSession(AuthenticationService authenticationService, ByteBuf connectCommand,
+ ServiceConfiguration serviceConfiguration) {
+ BinaryAuthContext binaryAuthContext = mock(BinaryAuthContext.class);
+ when(binaryAuthContext.getAuthenticationService()).thenReturn(authenticationService);
+ when(binaryAuthContext.isAuthenticateOriginalAuthData()).thenReturn(
+ serviceConfiguration.isAuthenticateOriginalAuthData());
+ when(binaryAuthContext.getExecutor()).thenReturn(serverCnx.ctx().executor());
+ when(binaryAuthContext.getIsInitialConnectSupplier()).thenReturn(() -> serverCnx.getState() != State.Connected);
+ BinaryAuthSession binaryAuthSession = spy(new BinaryAuthSession(binaryAuthContext));
+ ByteBuf copy = connectCommand.copy();
+ BaseCommand cmd = new BaseCommand();
+ int cmdSize = (int) copy.readUnsignedInt();
+ cmd.parseFrom(copy, cmdSize);
+ when(binaryAuthContext.getCommandConnect()).thenReturn(cmd.getConnect());
+
+ return binaryAuthSession;
+ }
+
@Test(timeOut = 30000)
public void testConnectCommandWithPassingOriginalAuthDataAndSetAnonymousUserRole() throws Exception {
AuthenticationService authenticationService = mock(AuthenticationService.class);
@@ -545,6 +574,11 @@ public void testConnectCommandWithPassingOriginalAuthDataAndSetAnonymousUserRole
// the proxy will use anonymousUserRole to delegate the client's role when connecting.
ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.proxy", 1, null,
null, anonymousUserRole, null, null);
+
+ BinaryAuthSession binaryAuthSession =
+ spyBinaryAuthSession(authenticationService, clientCommand.copy(), svcConfig);
+ when(authenticationService.createBinaryAuthSession(any())).thenReturn(binaryAuthSession);
+
channel.writeInbound(clientCommand);
Object response1 = getResponse();
@@ -575,6 +609,9 @@ public void testConnectCommandWithPassingOriginalPrincipal() throws Exception {
ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.proxy", 1, null,
null, "client", "pass.client", authMethodName);
+ BinaryAuthSession binaryAuthSession =
+ spyBinaryAuthSession(authenticationService, clientCommand.copy(), svcConfig);
+ when(authenticationService.createBinaryAuthSession(any())).thenReturn(binaryAuthSession);
channel.writeInbound(clientCommand);
Object response1 = getResponse();
@@ -605,6 +642,9 @@ public void testConnectWithNonProxyRoleAndProxyVersion() throws Exception {
ByteBuf clientCommand = Commands.newConnect(authMethodName, AuthData.of("pass.pass".getBytes()),
1, null, null, null, null, null, "my-pulsar-proxy", null);
+ BinaryAuthSession binaryAuthSession =
+ spyBinaryAuthSession(authenticationService, clientCommand.copy(), svcConfig);
+ when(authenticationService.createBinaryAuthSession(any())).thenReturn(binaryAuthSession);
channel.writeInbound(clientCommand);
Object response = getResponse();
assertTrue(response instanceof CommandError);
@@ -631,11 +671,16 @@ public void testAuthChallengePrincipalChangeFails() throws Exception {
serverCnx.cancelKeepAliveTask();
ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.client", "");
+ BinaryAuthSession binaryAuthSession =
+ spyBinaryAuthSession(authenticationService, clientCommand.copy(), svcConfig);
+ when(authenticationService.createBinaryAuthSession(any())).thenReturn(binaryAuthSession);
+
channel.writeInbound(clientCommand);
Object responseConnected = getResponse();
assertTrue(responseConnected instanceof CommandConnected);
assertEquals(serverCnx.getState(), State.Connected);
+ assertEquals(serverCnx.getAuthRole(), "pass.client");
assertEquals(serverCnx.getPrincipal(), "pass.client");
assertTrue(serverCnx.isActive());
@@ -687,6 +732,9 @@ public void testAuthChallengeOriginalPrincipalChangeFails() throws Exception {
ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.proxy", 1, null,
null, "pass.client", "pass.client", authMethodName);
+ BinaryAuthSession binaryAuthSession =
+ spyBinaryAuthSession(authenticationService, clientCommand.copy(), svcConfig);
+ when(authenticationService.createBinaryAuthSession(any())).thenReturn(binaryAuthSession);
channel.writeInbound(clientCommand);
Object responseConnected = getResponse();
@@ -759,6 +807,9 @@ private void verifyAuthRoleAndOriginalPrincipalBehavior(String authMethodName, S
ByteBuf clientCommand = Commands.newConnect(authMethodName, authData, 1, null,
null, originalPrincipal, null, null);
+ BinaryAuthSession binaryAuthSession =
+ spyBinaryAuthSession(brokerService.getAuthenticationService(), clientCommand.copy(), svcConfig);
+ when(brokerService.getAuthenticationService().createBinaryAuthSession(any())).thenReturn(binaryAuthSession);
channel.writeInbound(clientCommand);
Object response = getResponse();
@@ -832,6 +883,9 @@ public void testAuthResponseWithFailingAuthData() throws Exception {
// Trigger connect command to result in AuthChallenge
ByteBuf clientCommand = Commands.newConnect(authMethodName, "challenge.client", "1");
+ BinaryAuthSession binaryAuthSession =
+ spyBinaryAuthSession(authenticationService, clientCommand.copy(), svcConfig);
+ when(authenticationService.createBinaryAuthSession(any())).thenReturn(binaryAuthSession);
channel.writeInbound(clientCommand);
Object challenge1 = getResponse();
@@ -926,6 +980,8 @@ public void testVerifyOriginalPrincipalWithAuthDataForwardedFromProxy() throws E
// Submit a failing originalPrincipal to show that it is not used at all.
ByteBuf connect = Commands.newConnect(authMethodName, proxyRole, "test", "localhost",
"fail.fail", clientRole, authMethodName);
+ BinaryAuthSession binaryAuthSession = spyBinaryAuthSession(authenticationService, connect.copy(), svcConfig);
+ when(authenticationService.createBinaryAuthSession(any())).thenReturn(binaryAuthSession);
channel.writeInbound(connect);
Object connectResponse = getResponse();
assertTrue(connectResponse instanceof CommandConnected);
@@ -1218,7 +1274,7 @@ private class ClientChannel implements Closeable {
4),
serverCnx);
public ClientChannel() {
- serverCnx.setAuthRole("");
+ serverCnx.clearBinaryAuthSession();
}
public void close(){
if (channel != null && channel.isActive()) {
@@ -1328,6 +1384,10 @@ public void testVerifyOriginalPrincipalWithoutAuthDataForwardedFromProxy() throw
String clientRole = "pass.fail";
ByteBuf connect = Commands.newConnect(authMethodName, proxyRole, "test", "localhost",
clientRole, null, null);
+
+ BinaryAuthSession binaryAuthSession = spyBinaryAuthSession(authenticationService, connect.copy(), svcConfig);
+ when(authenticationService.createBinaryAuthSession(any())).thenReturn(binaryAuthSession);
+
channel.writeInbound(connect);
Object connectResponse = getResponse();
assertTrue(connectResponse instanceof CommandConnected);
@@ -1442,6 +1502,8 @@ public void testVerifyAuthRoleAndAuthDataFromDirectConnectionBroker() throws Exc
// to pass authentication and fail authorization
String clientRole = "pass.fail";
ByteBuf connect = Commands.newConnect(authMethodName, clientRole, "test");
+ BinaryAuthSession binaryAuthSession = spyBinaryAuthSession(authenticationService, connect.copy(), svcConfig);
+ when(authenticationService.createBinaryAuthSession(any())).thenReturn(binaryAuthSession);
channel.writeInbound(connect);
Object connectResponse = getResponse();
@@ -1512,6 +1574,8 @@ public void testRefreshOriginalPrincipalWithAuthDataForwardedFromProxy() throws
String clientRole = "pass.client";
ByteBuf connect = Commands.newConnect(authMethodName, proxyRole, "test", "localhost",
clientRole, clientRole, authMethodName);
+ BinaryAuthSession binaryAuthSession = spyBinaryAuthSession(authenticationService, connect.copy(), svcConfig);
+ when(authenticationService.createBinaryAuthSession(any())).thenReturn(binaryAuthSession);
channel.writeInbound(connect);
Object connectResponse = getResponse();
assertTrue(connectResponse instanceof CommandConnected);
@@ -1531,7 +1595,7 @@ public void testRefreshOriginalPrincipalWithAuthDataForwardedFromProxy() throws
AuthData.of(newClientRole.getBytes(StandardCharsets.UTF_8)), 0, "test");
channel.writeInbound(refreshAuth);
- assertEquals(serverCnx.getOriginalAuthData().getCommandData(), newClientRole);
+ assertEquals(serverCnx.getOriginalAuthData().getCommandData(), clientRole);
assertEquals(serverCnx.getOriginalAuthState().getAuthRole(), newClientRole);
assertEquals(serverCnx.getAuthData().getCommandData(), proxyRole);
assertEquals(serverCnx.getAuthRole(), proxyRole);
@@ -1618,6 +1682,51 @@ public void testProducerOnNotOwnedTopic() throws Exception {
channel.finish();
}
+ private PulsarAuthorizationProvider injectAuth() throws Exception {
+ svcConfig.setAuthorizationEnabled(true);
+ AuthorizationService authorizationService =
+ spyWithClassAndConstructorArgs(AuthorizationService.class, svcConfig, pulsar.getPulsarResources());
+ Field providerField = AuthorizationService.class.getDeclaredField("provider");
+ providerField.setAccessible(true);
+ PulsarAuthorizationProvider authorizationProvider =
+ spyWithClassAndConstructorArgs(PulsarAuthorizationProvider.class, svcConfig,
+ pulsar.getPulsarResources());
+ providerField.set(authorizationService, authorizationProvider);
+ doReturn(authorizationService).when(brokerService).getAuthorizationService();
+ svcConfig.setAuthorizationEnabled(true);
+
+ AuthenticationService authenticationService = mock(AuthenticationService.class);
+ // use a dummy authentication provider
+ AuthenticationProvider authenticationProvider = new AuthenticationProvider() {
+ @Override
+ public void initialize(ServiceConfiguration config) throws IOException {
+
+ }
+
+ @Override
+ public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
+ return "role";
+ }
+
+ @Override
+ public String getAuthMethodName() {
+ return "dummy";
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+ };
+
+ String authMethodName = authenticationProvider.getAuthMethodName();
+ when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
+ when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
+ svcConfig.setAuthenticationEnabled(true);
+
+ return authorizationProvider;
+ }
+
@Test(timeOut = 30000)
public void testProducerCommandWithAuthorizationPositive() throws Exception {
AuthorizationService authorizationService = mock(AuthorizationService.class);
@@ -1646,32 +1755,27 @@ public void testProducerCommandWithAuthorizationPositive() throws Exception {
@Test(timeOut = 30000)
public void testNonExistentTopic() throws Exception {
- AuthorizationService authorizationService =
- spyWithClassAndConstructorArgs(AuthorizationService.class, svcConfig, pulsar.getPulsarResources());
- doReturn(authorizationService).when(brokerService).getAuthorizationService();
- svcConfig.setAuthorizationEnabled(true);
- svcConfig.setAuthorizationEnabled(true);
- Field providerField = AuthorizationService.class.getDeclaredField("provider");
- providerField.setAccessible(true);
- PulsarAuthorizationProvider authorizationProvider =
- spyWithClassAndConstructorArgs(PulsarAuthorizationProvider.class, svcConfig,
- pulsar.getPulsarResources());
- providerField.set(authorizationService, authorizationProvider);
+ resetChannel();
+ PulsarAuthorizationProvider authorizationProvider = injectAuth();
doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider)
.isSuperUser(Mockito.anyString(), Mockito.any(), Mockito.any());
+ // Connect
+ ByteBuf connectCommand = Commands.newConnect("dummy", "", null);
+ BinaryAuthSession binaryAuthSession =
+ spyBinaryAuthSession(brokerService.getAuthenticationService(), connectCommand.copy(), svcConfig);
+ when(brokerService.getAuthenticationService().createBinaryAuthSession(any())).thenReturn(binaryAuthSession);
+ channel.writeInbound(connectCommand);
+ Object response = getResponse();
+ assertTrue(response instanceof CommandConnected);
+
// Test producer creation
- resetChannel();
- setChannelConnected();
ByteBuf newProducerCmd = Commands.newProducer(nonExistentTopicName, 1 /* producer id */, 1 /* request id */,
"prod-name", Collections.emptyMap(), false);
channel.writeInbound(newProducerCmd);
assertTrue(getResponse() instanceof CommandError);
- channel.finish();
// Test consumer creation
- resetChannel();
- setChannelConnected();
ByteBuf newSubscribeCmd = Commands.newSubscribe(nonExistentTopicName, //
successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0,
"test" /* consumer name */, 0);
@@ -1682,17 +1786,9 @@ public void testNonExistentTopic() throws Exception {
@Test(timeOut = 30000)
public void testClusterAccess() throws Exception {
- svcConfig.setAuthorizationEnabled(true);
- AuthorizationService authorizationService =
- spyWithClassAndConstructorArgs(AuthorizationService.class, svcConfig, pulsar.getPulsarResources());
- Field providerField = AuthorizationService.class.getDeclaredField("provider");
- providerField.setAccessible(true);
- PulsarAuthorizationProvider authorizationProvider =
- spyWithClassAndConstructorArgs(PulsarAuthorizationProvider.class, svcConfig,
- pulsar.getPulsarResources());
- providerField.set(authorizationService, authorizationProvider);
- doReturn(authorizationService).when(brokerService).getAuthorizationService();
- svcConfig.setAuthorizationEnabled(true);
+ resetChannel();
+
+ PulsarAuthorizationProvider authorizationProvider = injectAuth();
doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider)
.isSuperUser(Mockito.anyString(), Mockito.any(), Mockito.any());
doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider)
@@ -1701,15 +1797,20 @@ public void testClusterAccess() throws Exception {
.checkPermission(any(TopicName.class), Mockito.anyString(),
any(AuthAction.class));
- resetChannel();
- setChannelConnected();
+ // Connect
+ ByteBuf connectCommand = Commands.newConnect("dummy", "", null);
+ BinaryAuthSession binaryAuthSession =
+ spyBinaryAuthSession(brokerService.getAuthenticationService(), connectCommand.copy(), svcConfig);
+ when(brokerService.getAuthenticationService().createBinaryAuthSession(any())).thenReturn(binaryAuthSession);
+ channel.writeInbound(connectCommand);
+ Object response = getResponse();
+ assertTrue(response instanceof CommandConnected);
+
ByteBuf clientCommand = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */,
"prod-name", Collections.emptyMap(), false);
channel.writeInbound(clientCommand);
assertTrue(getResponse() instanceof CommandProducerSuccess);
- resetChannel();
- setChannelConnected();
clientCommand = Commands.newProducer(topicWithNonLocalCluster, 1 /* producer id */, 1 /* request id */,
"prod-name", Collections.emptyMap(), false);
channel.writeInbound(clientCommand);
@@ -1719,22 +1820,21 @@ public void testClusterAccess() throws Exception {
@Test(timeOut = 30000)
public void testNonExistentTopicSuperUserAccess() throws Exception {
- AuthorizationService authorizationService =
- spyWithClassAndConstructorArgs(AuthorizationService.class, svcConfig, pulsar.getPulsarResources());
- doReturn(authorizationService).when(brokerService).getAuthorizationService();
- svcConfig.setAuthorizationEnabled(true);
- Field providerField = AuthorizationService.class.getDeclaredField("provider");
- providerField.setAccessible(true);
- PulsarAuthorizationProvider authorizationProvider =
- spyWithClassAndConstructorArgs(PulsarAuthorizationProvider.class, svcConfig,
- pulsar.getPulsarResources());
- providerField.set(authorizationService, authorizationProvider);
+ resetChannel();
+ PulsarAuthorizationProvider authorizationProvider = injectAuth();
doReturn(CompletableFuture.completedFuture(true)).when(authorizationProvider)
.isSuperUser(Mockito.anyString(), Mockito.any(), Mockito.any());
+ // Connect
+ ByteBuf connectCommand = Commands.newConnect("dummy", "", null);
+ BinaryAuthSession binaryAuthSession =
+ spyBinaryAuthSession(brokerService.getAuthenticationService(), connectCommand.copy(), svcConfig);
+ when(brokerService.getAuthenticationService().createBinaryAuthSession(any())).thenReturn(binaryAuthSession);
+ channel.writeInbound(connectCommand);
+ Object response = getResponse();
+ assertTrue(response instanceof CommandConnected);
+
// Test producer creation
- resetChannel();
- setChannelConnected();
ByteBuf newProducerCmd = Commands.newProducer(nonExistentTopicName, 1 /* producer id */, 1 /* request id */,
"prod-name", Collections.emptyMap(), false);
channel.writeInbound(newProducerCmd);
@@ -1743,11 +1843,8 @@ public void testNonExistentTopicSuperUserAccess() throws Exception {
PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(nonExistentTopicName).get();
assertNotNull(topicRef);
assertEquals(topicRef.getProducers().size(), 1);
- channel.finish();
// Test consumer creation
- resetChannel();
- setChannelConnected();
ByteBuf newSubscribeCmd = Commands.newSubscribe(nonExistentTopicName, //
successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0,
"test" /* consumer name */, 0 /* avoid reseting cursor */);
@@ -2853,7 +2950,6 @@ protected void resetChannel() throws Exception {
channel.close().get();
}
serverCnx = new ServerCnx(pulsar);
- serverCnx.setAuthRole("");
channel = new EmbeddedChannel(new LengthFieldBasedFrameDecoder(
maxMessageSize,
0,
@@ -3411,7 +3507,7 @@ public boolean isCompletedExceptionally() {
@Test
public void testHandleAuthResponseWithoutClientVersion() throws Exception {
- resetChannel();
+ AuthenticationService authenticationService = mock(AuthenticationService.class);
// use a dummy authentication provider
AuthenticationProvider authenticationProvider = new AuthenticationProvider() {
@Override
@@ -3434,18 +3530,36 @@ public void close() throws IOException {
}
};
+
+ String authMethodName = authenticationProvider.getAuthMethodName();
+ when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
+ when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
+ svcConfig.setAuthenticationEnabled(true);
+
+ resetChannel();
+
+ ByteBuf clientCommand = Commands.newConnect(authenticationProvider.getAuthMethodName(), "", null);
+ BinaryAuthSession binaryAuthSession =
+ spyBinaryAuthSession(authenticationService, clientCommand.copy(), svcConfig);
+ when(authenticationService.createBinaryAuthSession(any())).thenReturn(binaryAuthSession);
+ channel.writeInbound(clientCommand);
+
+ // verify that authChallenge is called
+ Awaitility.await().untilAsserted(() -> verify(binaryAuthSession, times(1))
+ .authChallenge(any(), anyBoolean(), anyInt(), any()));
+ Object response = getResponse();
+ assertTrue(response instanceof CommandConnected);
+
AuthData clientData = AuthData.of(new byte[0]);
- AuthenticationState authenticationState =
- authenticationProvider.newAuthState(clientData, null, null);
- // inject the AuthenticationState instance so that auth response can be processed
- serverCnx.setAuthState(authenticationState);
// send the auth response with no client version
String clientVersion = null;
ByteBuf authResponse =
- Commands.newAuthResponse("token", clientData, Commands.getCurrentProtocolVersion(), clientVersion);
+ Commands.newAuthResponse(authenticationProvider.getAuthMethodName(), clientData,
+ Commands.getCurrentProtocolVersion(), clientVersion);
channel.writeInbound(authResponse);
- CommandConnected response = (CommandConnected) getResponse();
- assertNotNull(response);
+ // verify that authChallenge is called again
+ Awaitility.await().untilAsserted(() -> verify(binaryAuthSession, times(2))
+ .authChallenge(any(), anyBoolean(), anyInt(), any()));
}
@Test(expectedExceptions = IllegalArgumentException.class)
@@ -3692,6 +3806,10 @@ public void sendAddPartitionToTxnResponseFailedAuth() throws Exception {
ByteBuf connect = Commands.newConnect(authMethodName, "pass.fail", "test", "localhost",
"pass.pass", "pass.pass", authMethodName);
+
+ BinaryAuthSession binaryAuthSession = spyBinaryAuthSession(authenticationService, connect.copy(), svcConfig);
+ when(authenticationService.createBinaryAuthSession(any())).thenReturn(binaryAuthSession);
+
channel.writeInbound(connect);
Object connectResponse = getResponse();
assertTrue(connectResponse instanceof CommandConnected);
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 5f4456d356e58..c4c7ab821d908 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -43,6 +43,7 @@
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.CharsetUtil;
import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -50,6 +51,9 @@
import lombok.Getter;
import lombok.SneakyThrows;
import org.apache.pulsar.PulsarVersion;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authentication.AuthenticationState;
+import org.apache.pulsar.broker.authentication.BinaryAuthSession;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -100,9 +104,28 @@ public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection)
this.inboundChannel = proxyConnection.ctx().channel();
this.proxyConnection = proxyConnection;
this.inboundChannelRequestsRate = new Rate();
- this.originalPrincipal = proxyConnection.clientAuthRole;
- this.clientAuthData = proxyConnection.clientAuthData;
- this.clientAuthMethod = proxyConnection.clientAuthMethod;
+ BinaryAuthSession binaryAuthSession = proxyConnection.getBinaryAuthSession();
+ if (binaryAuthSession != null) {
+ AuthenticationState originalAuthState = binaryAuthSession.getOriginalAuthState();
+ boolean forwardOriginal =
+ originalAuthState != null && service.getConfiguration().isForwardAuthorizationCredentials();
+ AuthenticationDataSource authDataSource = forwardOriginal ? binaryAuthSession.getOriginalAuthData() :
+ binaryAuthSession.getAuthenticationData();
+ String commandData = authDataSource.getCommandData();
+ if (commandData != null) {
+ clientAuthData = AuthData.of(commandData.getBytes(StandardCharsets.UTF_8));
+ } else {
+ clientAuthData = null;
+ }
+ clientAuthMethod =
+ forwardOriginal ? binaryAuthSession.getOriginalAuthMethod() : binaryAuthSession.getAuthMethod();
+ originalPrincipal =
+ forwardOriginal ? binaryAuthSession.getOriginalPrincipal() : binaryAuthSession.getAuthRole();
+ } else {
+ originalPrincipal = null;
+ clientAuthData = null;
+ clientAuthMethod = null;
+ }
this.tlsEnabledWithBroker = service.getConfiguration().isTlsEnabledWithBroker();
this.tlsHostnameVerificationEnabled = service.getConfiguration().isTlsHostnameVerificationEnabled();
this.onHandshakeCompleteAction = proxyConnection::cancelKeepAliveTask;
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
index 1e8e2fb55e4a4..e2acfa5e2075c 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
@@ -21,9 +21,12 @@
import static com.google.common.base.Preconditions.checkArgument;
import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoopGroup;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.PulsarVersion;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authentication.BinaryAuthSession;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
@@ -41,33 +44,56 @@
*/
public class ProxyClientCnx extends ClientCnx {
private final boolean forwardClientAuthData;
- private final String clientAuthMethod;
- private final String clientAuthRole;
+ private String clientAuthMethod;
+ private String clientAuthRole;
+
private final ProxyConnection proxyConnection;
+ private final BinaryAuthSession binaryAuthSession;
- public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, String clientAuthRole,
- String clientAuthMethod, int protocolVersion,
+ public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, int protocolVersion,
boolean forwardClientAuthData, ProxyConnection proxyConnection) {
super(InstrumentProvider.NOOP, conf, eventLoopGroup, protocolVersion);
- this.clientAuthRole = clientAuthRole;
- this.clientAuthMethod = clientAuthMethod;
this.forwardClientAuthData = forwardClientAuthData;
this.proxyConnection = proxyConnection;
+ this.binaryAuthSession = proxyConnection.getBinaryAuthSession();
}
@Override
protected ByteBuf newConnectCommand() throws Exception {
+ AuthData clientAuthData = null;
+ if (binaryAuthSession != null) {
+ clientAuthRole = binaryAuthSession.getAuthRole();
+ clientAuthMethod = binaryAuthSession.getAuthMethod();
+ if (forwardClientAuthData) {
+ // There is a chance this auth data is expired because the ProxyConnection does not do early token
+ // refresh.
+ // Based on the current design, the best option is to configure the broker to accept slightly stale
+ // authentication data.
+ String commandData = binaryAuthSession.getAuthenticationData().getCommandData();
+ if (commandData != null) {
+ clientAuthData = AuthData.of(commandData.getBytes(StandardCharsets.UTF_8));
+ }
+ }
+
+ // If original principal is null, it means the client connects the broker via proxy, else the client
+ // connects the proxy via proxy.
+ if (binaryAuthSession.getOriginalPrincipal() != null) {
+ clientAuthRole = binaryAuthSession.getOriginalPrincipal();
+ clientAuthMethod = binaryAuthSession.getOriginalAuthMethod();
+ AuthenticationDataSource originalAuthData = binaryAuthSession.getOriginalAuthData();
+ if (forwardClientAuthData && originalAuthData != null) {
+ String commandData = originalAuthData.getCommandData();
+ if (commandData != null) {
+ clientAuthData = AuthData.of(commandData.getBytes(StandardCharsets.UTF_8));
+ }
+ }
+ }
+ }
+
if (log.isDebugEnabled()) {
log.debug("New Connection opened via ProxyClientCnx with params clientAuthRole = {},"
+ " clientAuthData = {}, clientAuthMethod = {}",
- clientAuthRole, proxyConnection.getClientAuthData(), clientAuthMethod);
- }
- AuthData clientAuthData = null;
- if (forwardClientAuthData) {
- // There is a chance this auth data is expired because the ProxyConnection does not do early token refresh.
- // Based on the current design, the best option is to configure the broker to accept slightly stale
- // authentication data.
- clientAuthData = proxyConnection.getClientAuthData();
+ clientAuthRole, clientAuthData, clientAuthMethod);
}
authenticationDataProvider = authentication.getAuthData(remoteHostName);
AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index 6db1b302c6613..dde95edb32ddd 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -923,6 +923,12 @@ public class ProxyConfiguration implements PulsarConfiguration {
)
private String clusterName;
+ @FieldContext(
+ category = CATEGORY_AUTHENTICATION,
+ doc = "If this flag is set then the broker authenticates the original Auth data"
+ + " else it just accepts the originalPrincipal and authorizes it (if required)")
+ private boolean authenticateOriginalAuthData = false;
+
public String getMetadataStoreUrl() {
if (StringUtils.isNotBlank(metadataStoreUrl)) {
return metadataStoreUrl;
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index e479b8ee62268..9dde8edd18047 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -33,6 +33,7 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
+import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -49,8 +50,10 @@
import javax.net.ssl.SSLSession;
import lombok.Getter;
import org.apache.pulsar.broker.PulsarServerException;
-import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationState;
+import org.apache.pulsar.broker.authentication.BinaryAuthContext;
+import org.apache.pulsar.broker.authentication.BinaryAuthSession;
+import org.apache.pulsar.broker.authentication.BinaryAuthSession.AuthResult;
import org.apache.pulsar.broker.limiter.ConnectionController;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -108,14 +111,8 @@ public class ProxyConnection extends PulsarHandler {
private Set> pendingBrokerAuthChallenges = null;
private final BrokerProxyValidator brokerProxyValidator;
private final ConnectionController connectionController;
- String clientAuthRole;
- volatile AuthData clientAuthData;
- String clientAuthMethod;
String clientVersion;
- private String authMethod = "none";
- AuthenticationProvider authenticationProvider;
- AuthenticationState authState;
private ClientConfigurationData clientConf;
private boolean hasProxyToBrokerUrl;
private int protocolVersionToAdvertise;
@@ -124,9 +121,10 @@ public class ProxyConnection extends PulsarHandler {
private final DefaultAuthenticationRoleLoggingAnonymizer authenticationRoleLoggingAnonymizer;
protected static final Integer SPLICE_BYTES = 1024 * 1024 * 1024;
- private static final byte[] EMPTY_CREDENTIALS = new byte[0];
boolean isTlsInboundChannel = false;
+ @Getter
+ private BinaryAuthSession binaryAuthSession;
enum State {
Init,
@@ -347,17 +345,41 @@ protected static boolean isTlsChannel(Channel channel) {
private synchronized void completeConnect() throws PulsarClientException {
checkArgument(state == State.Connecting);
+ String clientAuthRole;
+ String clientAuthMethod;
+ String originalAuthRole;
+ String originalAuthMethod;
+ if (binaryAuthSession != null) {
+ clientAuthRole = binaryAuthSession.getAuthRole();
+ clientAuthMethod = binaryAuthSession.getAuthMethod();
+ originalAuthRole = binaryAuthSession.getOriginalPrincipal();
+ originalAuthMethod = binaryAuthSession.getOriginalAuthMethod();
+ } else {
+ clientAuthRole = null;
+ clientAuthMethod = null;
+ originalAuthRole = null;
+ originalAuthMethod = null;
+ }
String maybeAnonymizedClientAuthRole = authenticationRoleLoggingAnonymizer.anonymize(clientAuthRole);
- LOG.info("[{}] complete connection, init proxy handler. authenticated with {} role {}, hasProxyToBrokerUrl: {}",
- remoteAddress, authMethod, maybeAnonymizedClientAuthRole, hasProxyToBrokerUrl);
+ String maybeAnonymizedOriginalAuthRole = authenticationRoleLoggingAnonymizer.anonymize(originalAuthRole);
+ LOG.info("[{}] complete connection, init proxy handler. client authenticated with {} role {}, original "
+ + "authenticated with {} role {}, hasProxyToBrokerUrl: {}",
+ remoteAddress,
+ clientAuthMethod,
+ maybeAnonymizedClientAuthRole,
+ originalAuthMethod,
+ maybeAnonymizedOriginalAuthRole,
+ hasProxyToBrokerUrl);
if (hasProxyToBrokerUrl) {
// Optimize proxy connection to fail-fast if the target broker isn't active
// Pulsar client will retry connecting after a back off timeout
if (service.getConfiguration().isCheckActiveBrokers()
&& !isBrokerActive(proxyToBrokerUrl)) {
state = State.Closing;
- LOG.warn("[{}] Target broker '{}' isn't available. authenticated with {} role {}.",
- remoteAddress, proxyToBrokerUrl, authMethod, maybeAnonymizedClientAuthRole);
+ LOG.warn("[{}] Target broker '{}' isn't available. client authenticated with {} role {}, "
+ + "original authenticated with {} role {}.",
+ remoteAddress, proxyToBrokerUrl, clientAuthMethod, maybeAnonymizedClientAuthRole,
+ originalAuthMethod, maybeAnonymizedOriginalAuthRole);
final ByteBuf msg = Commands.newError(-1,
ServerError.ServiceNotReady, "Target broker isn't available.");
writeAndFlushAndClose(msg);
@@ -373,14 +395,18 @@ private synchronized void completeConnect() throws PulsarClientException {
TargetAddressDeniedException targetAddressDeniedException =
(TargetAddressDeniedException) (throwable instanceof TargetAddressDeniedException
? throwable : throwable.getCause());
-
- LOG.warn("[{}] Target broker '{}' cannot be validated. {}. authenticated with {} role {}.",
+ LOG.warn(
+ "[{}] Target broker '{}' cannot be validated. {}. client authenticated with {} "
+ + "role {}, original authenticated with {} role {}.",
remoteAddress, proxyToBrokerUrl, targetAddressDeniedException.getMessage(),
- authMethod, maybeAnonymizedClientAuthRole);
+ clientAuthMethod, maybeAnonymizedClientAuthRole, originalAuthMethod,
+ maybeAnonymizedOriginalAuthRole);
} else {
- LOG.error("[{}] Error validating target broker '{}'. authenticated with {} role {}.",
- remoteAddress, proxyToBrokerUrl, authMethod, maybeAnonymizedClientAuthRole,
- throwable);
+ LOG.error("[{}] Error validating target broker '{}'. client authenticated with {} role {}, "
+ + "original authenticated with {} role {}.",
+ remoteAddress, proxyToBrokerUrl, clientAuthMethod, maybeAnonymizedClientAuthRole,
+ originalAuthMethod,
+ maybeAnonymizedOriginalAuthRole, throwable);
}
final ByteBuf msg = Commands.newError(-1, ServerError.ServiceNotReady,
"Target broker cannot be validated.");
@@ -392,9 +418,9 @@ private synchronized void completeConnect() throws PulsarClientException {
// and we'll take care of just topics and partitions metadata lookups
Supplier clientCnxSupplier;
if (service.getConfiguration().isAuthenticationEnabled()) {
- clientCnxSupplier = () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole,
- clientAuthMethod, protocolVersionToAdvertise,
- service.getConfiguration().isForwardAuthorizationCredentials(), this);
+ clientCnxSupplier =
+ () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise,
+ service.getConfiguration().isForwardAuthorizationCredentials(), this);
} else {
clientCnxSupplier =
() -> new ClientCnx(InstrumentProvider.NOOP, clientConf, service.getWorkerGroup(),
@@ -466,63 +492,18 @@ public void brokerConnected(DirectProxyHandler directProxyHandler, CommandConnec
}
}
- // According to auth result, send newConnected or newAuthChallenge command.
- private void doAuthentication(AuthData clientData)
- throws Exception {
- authState
- .authenticateAsync(clientData)
- .whenCompleteAsync((authChallenge, throwable) -> {
- if (throwable == null) {
- authChallengeSuccessCallback(authChallenge);
- } else {
- authenticationFailedCallback(throwable);
- }
- }, ctx.executor());
- }
-
protected void authenticationFailedCallback(Throwable t) {
LOG.warn("[{}] Unable to authenticate: ", remoteAddress, t);
final ByteBuf msg = Commands.newError(-1, ServerError.AuthenticationError, "Failed to authenticate");
writeAndFlushAndClose(msg);
}
- // Always run in this class's event loop.
- protected void authChallengeSuccessCallback(AuthData authChallenge) {
- try {
- // authentication has completed, will send newConnected command.
- if (authChallenge == null) {
- clientAuthRole = authState.getAuthRole();
- if (LOG.isDebugEnabled()) {
- LOG.debug("[{}] Client successfully authenticated with {} role {}",
- remoteAddress, authMethod, authenticationRoleLoggingAnonymizer.anonymize(clientAuthRole));
- }
-
- // First connection
- if (state == State.Connecting) {
- // authentication has completed, will send newConnected command.
- completeConnect();
- }
- return;
- }
-
- // auth not complete, continue auth with client side.
- final ByteBuf msg = Commands.newAuthChallenge(authMethod, authChallenge, protocolVersionToAdvertise);
- writeAndFlush(msg);
- if (LOG.isDebugEnabled()) {
- LOG.debug("[{}] Authentication in progress client by method {}.",
- remoteAddress, authMethod);
- }
- } catch (Exception e) {
- authenticationFailedCallback(e);
- }
- }
-
private void refreshAuthenticationCredentialsAndCloseIfTooExpired() {
assert ctx.executor().inEventLoop();
if (state != State.ProxyLookupRequests) {
// Happens when an exception is thrown that causes this connection to close.
return;
- } else if (!authState.isExpired()) {
+ } else if (!binaryAuthSession.isExpired()) {
// Credentials are still valid. Nothing to do at this point
return;
}
@@ -539,8 +520,7 @@ private void refreshAuthenticationCredentialsAndCloseIfTooExpired() {
private void maybeSendAuthChallenge() {
assert ctx.executor().inEventLoop();
- if (!supportsAuthenticationRefresh()) {
- LOG.warn("[{}] Closing connection because client doesn't support auth credentials refresh", remoteAddress);
+ if (!binaryAuthSession.supportsAuthenticationRefresh()) {
ctx.close();
return;
} else if (authChallengeSentTime != Long.MAX_VALUE) {
@@ -559,11 +539,12 @@ private void maybeSendAuthChallenge() {
LOG.debug("[{}] Refreshing authentication credentials", remoteAddress);
}
try {
- AuthData challenge = authState.refreshAuthentication();
- writeAndFlush(Commands.newAuthChallenge(authMethod, challenge, protocolVersionToAdvertise));
+ AuthResult authResult = binaryAuthSession.refreshAuthentication();
+ writeAndFlush(Commands.newAuthChallenge(authResult.getAuthMethod(), authResult.getAuthData(),
+ protocolVersionToAdvertise));
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Sent auth challenge to client to refresh credentials with method: {}.",
- remoteAddress, authMethod);
+ remoteAddress, authResult.getAuthMethod());
}
authChallengeSentTime = System.nanoTime();
} catch (AuthenticationException e) {
@@ -601,15 +582,6 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),
return;
}
- if (connect.hasProxyVersion()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("[{}] Client illegally provided proxyVersion.", remoteAddress);
- }
- state = State.Closing;
- writeAndFlushAndClose(Commands.newError(-1, ServerError.NotAllowedError, "Must not provide proxyVersion"));
- return;
- }
-
try {
// init authn
this.clientConf = createClientConfiguration();
@@ -620,39 +592,6 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),
return;
}
- AuthData clientData = AuthData.of(connect.hasAuthData() ? connect.getAuthData() : EMPTY_CREDENTIALS);
- if (connect.hasAuthMethodName()) {
- authMethod = connect.getAuthMethodName();
- } else if (connect.hasAuthMethod()) {
- // Legacy client is passing enum
- authMethod = connect.getAuthMethod().name().substring(10).toLowerCase();
- } else {
- authMethod = "none";
- }
-
- if (service.getConfiguration().isForwardAuthorizationCredentials()) {
- // We store the first clientData here. Before this commit, we stored the last clientData.
- // Since this only works when forwarding single staged authentication, first == last is true.
- // Here is an issue to fix the protocol: https://github.com/apache/pulsar/issues/19291.
- this.clientAuthData = clientData;
- this.clientAuthMethod = authMethod;
- }
-
- authenticationProvider = service
- .getAuthenticationService()
- .getAuthenticationProvider(authMethod);
-
- // Not find provider named authMethod. Most used for tests.
- // In AuthenticationDisabled, it will set authMethod "none".
- if (authenticationProvider == null) {
- clientAuthRole = service.getAuthenticationService().getAnonymousUserRole()
- .orElseThrow(() ->
- new AuthenticationException("No anonymous role, and no authentication provider configured"));
-
- completeConnect();
- return;
- }
-
// init authState and other var
ChannelHandler sslHandler = ctx.channel().pipeline().get(PulsarChannelInitializer.TLS_HANDLER);
SSLSession sslSession = null;
@@ -660,8 +599,59 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),
sslSession = ((SslHandler) sslHandler).engine().getSession();
}
- authState = authenticationProvider.newAuthState(clientData, remoteAddress, sslSession);
- doAuthentication(clientData);
+ binaryAuthSession = service.getAuthenticationService().createBinaryAuthSession(BinaryAuthContext.builder()
+ .executor(ctx.executor())
+ .remoteAddress(remoteAddress)
+ .sslSession(sslSession)
+ .authenticationService(service.getAuthenticationService())
+ .commandConnect(connect)
+ .isInitialConnectSupplier(() -> state == State.Connecting)
+ .authenticateOriginalAuthData(service.getConfiguration().isAuthenticateOriginalAuthData())
+ .build());
+ binaryAuthSession.doAuthentication()
+ .whenCompleteAsync((authResult, ex) -> {
+ if (ex != null) {
+ authenticationFailedCallback(ex);
+ } else {
+ handleAuthResult(authResult);
+ }
+ }, ctx.executor());
+ } catch (Exception e) {
+ authenticationFailedCallback(e);
+ }
+ }
+
+ private void handleAuthResult(BinaryAuthSession.AuthResult authResult) {
+ try {
+ AuthData authData = authResult.getAuthData();
+ if (authData != null) {
+ writeAndFlush(Commands.newAuthChallenge(
+ authResult.getAuthMethod(),
+ authData,
+ protocolVersionToAdvertise));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("[{}] Authentication in progress client by method {}.", remoteAddress,
+ authResult.getAuthMethod());
+ }
+ } else {
+ // First connection
+ if (state == State.Connecting) {
+ // authentication has completed, will send newConnected command.
+ completeConnect();
+ } else {
+ if (service.getConfiguration().isForwardAuthorizationCredentials()) {
+ // We only have pendingBrokerAuthChallenges when forwardAuthorizationCredentials is enabled.
+ if (pendingBrokerAuthChallenges != null && !pendingBrokerAuthChallenges.isEmpty()) {
+ // Send auth data to pending challenges from the broker
+ for (CompletableFuture challenge : pendingBrokerAuthChallenges) {
+ challenge.complete(AuthData.of(getFinalAuthState().getAuthDataSource().getCommandData()
+ .getBytes(StandardCharsets.UTF_8)));
+ }
+ pendingBrokerAuthChallenges.clear();
+ }
+ }
+ }
+ }
} catch (Exception e) {
authenticationFailedCallback(e);
}
@@ -677,6 +667,12 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) {
remoteAddress, authResponse.getResponse().getAuthMethodName());
}
+ if (binaryAuthSession == null) {
+ authenticationFailedCallback(
+ new AuthenticationException("Authentication session is null or not initialized"));
+ return;
+ }
+
try {
// Reset the auth challenge sent time to indicate we are not waiting on a client response.
authChallengeSentTime = Long.MAX_VALUE;
@@ -685,24 +681,16 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) {
// Note: this implementation relies on the current weakness that prevents multi-stage authentication
// from working when forwardAuthorizationCredentials is enabled. Here is an issue to fix the protocol:
// https://github.com/apache/pulsar/issues/19291.
- doAuthentication(clientData);
- if (service.getConfiguration().isForwardAuthorizationCredentials()) {
- // Update the clientAuthData to be able to initialize future ProxyClientCnx.
- this.clientAuthData = clientData;
- // We only have pendingBrokerAuthChallenges when forwardAuthorizationCredentials is enabled.
- if (pendingBrokerAuthChallenges != null && !pendingBrokerAuthChallenges.isEmpty()) {
- // Send auth data to pending challenges from the broker
- for (CompletableFuture challenge : pendingBrokerAuthChallenges) {
- challenge.complete(clientData);
- }
- pendingBrokerAuthChallenges.clear();
- }
- }
+ binaryAuthSession.authChallenge(clientData, false, protocolVersionToAdvertise, clientVersion)
+ .whenCompleteAsync((authResult, ex) -> {
+ if (ex != null) {
+ authenticationFailedCallback(ex);
+ } else {
+ handleAuthResult(authResult);
+ }
+ }, ctx.executor());
} catch (Exception e) {
- String errorMsg = "Unable to handleAuthResponse";
- LOG.warn("[{}] {} ", remoteAddress, errorMsg, e);
- final ByteBuf msg = Commands.newError(-1, ServerError.AuthenticationError, errorMsg);
- writeAndFlushAndClose(msg);
+ authenticationFailedCallback(e);
}
}
@@ -846,12 +834,10 @@ private void writeAndFlushAndClose(ByteBuf cmd) {
NettyChannelUtil.writeAndFlushWithClosePromise(ctx, cmd);
}
- boolean supportsAuthenticationRefresh() {
- return features != null && features.isSupportsAuthRefresh();
- }
-
- AuthData getClientAuthData() {
- return clientAuthData;
+ private AuthenticationState getFinalAuthState() {
+ AuthenticationState originalAuthState = binaryAuthSession.getOriginalAuthState();
+ AuthenticationState authState = binaryAuthSession.getAuthState();
+ return originalAuthState != null ? originalAuthState : authState;
}
/**
@@ -861,9 +847,11 @@ AuthData getClientAuthData() {
CompletableFuture getValidClientAuthData() {
final CompletableFuture clientAuthDataFuture = new CompletableFuture<>();
ctx().executor().execute(Runnables.catchingAndLoggingThrowables(() -> {
+ AuthenticationState finalAuthState = getFinalAuthState();
// authState is not thread safe, so this must run on the ProxyConnection's event loop.
- if (!authState.isExpired()) {
- clientAuthDataFuture.complete(clientAuthData);
+ if (!finalAuthState.isExpired()) {
+ clientAuthDataFuture.complete(AuthData.of(
+ finalAuthState.getAuthDataSource().getCommandData().getBytes(StandardCharsets.UTF_8)));
} else if (state == State.ProxyLookupRequests) {
maybeSendAuthChallenge();
if (pendingBrokerAuthChallenges == null) {
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
index 6887e9ea234c1..938da0514a3c1 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
@@ -252,7 +252,7 @@ void testAuthentication() throws Exception {
// Step 4: Ensure that all client contexts share the same auth provider
Assert.assertTrue(proxyService.getClientCnxs().size() >= 3, "expect at least 3 clients");
proxyService.getClientCnxs().stream().forEach((cnx) -> {
- Assert.assertSame(cnx.authenticationProvider,
+ Assert.assertSame(cnx.getBinaryAuthSession().getAuthenticationProvider(),
proxyService.getAuthenticationService().getAuthenticationProvider("BasicAuthentication"));
});
}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyToProxyAuthenticationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyToProxyAuthenticationTest.java
new file mode 100644
index 0000000000000..5fc0b502311bd
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyToProxyAuthenticationTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import javax.crypto.SecretKey;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.naming.TopicName;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class ProxyToProxyAuthenticationTest extends ProducerConsumerBase {
+ private static final String CLUSTER_NAME = "test";
+
+ private static final String ADMIN_ROLE = "admin";
+ private static final String PROXY_ROLE = "proxy";
+ private static final String BROKER_ROLE = "broker";
+ private static final String CLIENT_ROLE = "client";
+ private static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+
+ private static final String ADMIN_TOKEN = Jwts.builder().setSubject(ADMIN_ROLE).signWith(SECRET_KEY).compact();
+ private static final String PROXY_TOKEN = Jwts.builder().setSubject(PROXY_ROLE).signWith(SECRET_KEY).compact();
+ private static final String BROKER_TOKEN = Jwts.builder().setSubject(BROKER_ROLE).signWith(SECRET_KEY).compact();
+ private static final String CLIENT_TOKEN = Jwts.builder().setSubject(CLIENT_ROLE).signWith(SECRET_KEY).compact();
+
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ conf.setAuthenticateOriginalAuthData(false);
+ conf.setAuthenticationEnabled(true);
+ conf.getProperties().setProperty("tokenSecretKey", "data:;base64,"
+ + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
+
+ Set superUserRoles = new HashSet<>();
+ superUserRoles.add(ADMIN_ROLE);
+ superUserRoles.add(PROXY_ROLE);
+ superUserRoles.add(BROKER_ROLE);
+ conf.setSuperUserRoles(superUserRoles);
+ conf.setProxyRoles(Collections.singleton(PROXY_ROLE));
+
+ conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+ conf.setBrokerClientAuthenticationParameters(BROKER_TOKEN);
+ Set providers = new HashSet<>();
+ providers.add(AuthenticationProviderToken.class.getName());
+ conf.setAuthenticationProviders(providers);
+
+ conf.setClusterName(CLUSTER_NAME);
+ super.init();
+
+ admin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getWebServiceAddress()).authentication(
+ AuthenticationFactory.token(ADMIN_TOKEN)).build();
+ producerBaseSetup();
+ }
+
+ @AfterMethod(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ private ProxyService createProxyService(String serviceUrl) throws Exception {
+ ProxyConfiguration proxyConfig = new ProxyConfiguration();
+ proxyConfig.setForwardAuthorizationCredentials(true);
+ proxyConfig.setAuthenticateOriginalAuthData(true);
+ proxyConfig.setAuthenticationEnabled(true);
+ proxyConfig.setServicePort(Optional.of(0));
+ proxyConfig.setBrokerProxyAllowedTargetPorts("*");
+ proxyConfig.setWebServicePort(Optional.of(0));
+ proxyConfig.setBrokerServiceURL(serviceUrl);
+ proxyConfig.setClusterName(CLUSTER_NAME);
+
+ proxyConfig.getProperties().setProperty("tokenSecretKey", "data:;base64,"
+ + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
+
+ proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+ proxyConfig.setBrokerClientAuthenticationParameters(PROXY_TOKEN);
+
+ Set providers = new HashSet<>();
+ providers.add(AuthenticationProviderToken.class.getName());
+ proxyConfig.setAuthenticationProviders(providers);
+ AuthenticationService authenticationService = new AuthenticationService(
+ PulsarConfigurationLoader.convertFrom(proxyConfig));
+ Authentication proxyClientAuthentication =
+ AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
+ ProxyService proxyService = new ProxyService(proxyConfig, authenticationService, proxyClientAuthentication);
+ proxyService.start();
+ return proxyService;
+ }
+
+ @Test
+ public void testClientConnectsThroughTwoAuthenticatedProxiesToBroker() throws Exception {
+ @Cleanup
+ ProxyService az1Proxy = createProxyService(pulsar.getBrokerServiceUrl());
+ String az1ProxyServiceUrl = az1Proxy.getServiceUrl();
+ @Cleanup
+ ProxyService az2Proxy = createProxyService(az1ProxyServiceUrl);
+ @Cleanup
+ PulsarClient pulsarClient =
+ PulsarClient.builder().serviceUrl(az2Proxy.getServiceUrl())
+ .authentication(AuthenticationFactory.token(CLIENT_TOKEN))
+ .build();
+ String topic = TopicName.get("test-topic").toString();
+ String subscription1 = "test-subscription";
+ @Cleanup
+ Consumer consumer1 =
+ pulsarClient.newConsumer().topic(topic).subscriptionName(subscription1).subscribe();
+ String subscription2 = "test2-subscription";
+ @Cleanup
+ Consumer consumer2 =
+ pulsarClient.newConsumer().topic(topic).subscriptionName(subscription2).subscribe();
+
+ CompletableFuture> topicIfExists = pulsar.getBrokerService().getTopicIfExists(topic);
+ assertThat(topicIfExists).succeedsWithin(3, TimeUnit.SECONDS);
+ Topic topicRef = topicIfExists.get().orElseThrow();
+ topicRef.getSubscriptions().forEach((key, value) -> {
+ ServerCnx cnx = (ServerCnx) value.getConsumers().get(0).cnx();
+ assertThat(cnx.getAuthRole()).isEqualTo(PROXY_ROLE);
+ assertThat(cnx.getBinaryAuthSession().getOriginalPrincipal()).isEqualTo(CLIENT_ROLE);
+ });
+
+ consumer1.close();
+ consumer2.close();
+ }
+}