From f2feecf8baac21b5a0f45e7e066922a7dd4feb29 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Mon, 20 Oct 2025 19:03:34 +0800 Subject: [PATCH 01/17] [improve][broker] Extract authentication logic from ServerCnx to dedicated class --- .../authentication/AuthenticationService.java | 4 + .../authentication/BinaryAuthContext.java | 39 +++ .../authentication/BinaryAuthSession.java | 278 ++++++++++++++++++ .../pulsar/broker/service/ServerCnx.java | 272 ++++------------- 4 files changed, 386 insertions(+), 207 deletions(-) create mode 100644 pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthContext.java create mode 100644 pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java index e2bf4dcc0156d..2e7d3876045d7 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java @@ -244,4 +244,8 @@ public void close() throws IOException { provider.close(); } } + + public static BinaryAuthSession createBinaryAuthSession(BinaryAuthContext ctx) { + return new BinaryAuthSession(ctx); + } } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthContext.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthContext.java new file mode 100644 index 0000000000000..4204dccedf82f --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthContext.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.authentication; + +import java.net.SocketAddress; +import java.util.concurrent.Executor; +import java.util.function.Supplier; +import javax.net.ssl.SSLSession; +import lombok.Builder; +import lombok.Getter; +import org.apache.pulsar.common.api.proto.CommandConnect; + +@Getter +@Builder +public class BinaryAuthContext { + private CommandConnect commandConnect; + private SSLSession sslSession; + private AuthenticationService authenticationService; + private Executor executor; + private SocketAddress remoteAddress; + private boolean authenticateOriginalAuthData; + private Supplier isConnectingSupplier; +} diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java new file mode 100644 index 0000000000000..394e4f28f63b2 --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.authentication; + +import static org.apache.pulsar.common.naming.Constants.WEBSOCKET_DUMMY_ORIGINAL_PRINCIPLE; +import java.util.concurrent.CompletableFuture; +import javax.naming.AuthenticationException; +import lombok.Builder; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.common.api.AuthData; +import org.jspecify.annotations.NonNull; + +@Slf4j +@Getter +public class BinaryAuthSession { + private static final byte[] emptyArray = new byte[0]; + + private AuthenticationState authState; + private String authMethod; + private String authRole = null; + private volatile AuthenticationDataSource authenticationData; + private AuthenticationProvider authenticationProvider; + + // In case of proxy, if the authentication credentials are forwardable, + // it will hold the credentials of the original client + private String originalAuthMethod; + private String originalPrincipal = null; + private AuthenticationState originalAuthState; + private volatile AuthenticationDataSource originalAuthData; + // Keep temporarily in order to verify after verifying proxy's authData + private AuthData originalAuthDataCopy; + + private final BinaryAuthContext context; + + private AuthResult defaultAuthResult; + + public BinaryAuthSession(@NonNull BinaryAuthContext context) { + this.context = context; + } + + public CompletableFuture doAuthentication() { + var connect = context.getCommandConnect(); + try { + var authData = connect.hasAuthData() ? connect.getAuthData() : emptyArray; + var clientData = AuthData.of(authData); + // init authentication + if (connect.hasAuthMethodName()) { + authMethod = connect.getAuthMethodName(); + } else if (connect.hasAuthMethod()) { + // Legacy client is passing enum + authMethod = connect.getAuthMethod().name().substring(10).toLowerCase(); + } else { + authMethod = "none"; + } + + defaultAuthResult = AuthResult.builder().clientProtocolVersion(connect.getProtocolVersion()) + .clientVersion(connect.hasClientVersion()? connect.getClientVersion() : "") + .build(); + + authenticationProvider = context.getAuthenticationService().getAuthenticationProvider(authMethod); + // Not find provider named authMethod. Most used for tests. + // In AuthenticationDisabled, it will set authMethod "none". + if (authenticationProvider == null) { + authRole = context.getAuthenticationService().getAnonymousUserRole() + .orElseThrow(() -> + new AuthenticationException( + "No anonymous role, and no authentication provider configured")); + return CompletableFuture.completedFuture(defaultAuthResult); + } + + authState = + authenticationProvider.newAuthState(AuthData.of(connect.getAuthData()), context.getRemoteAddress(), + context.getSslSession()); + + if (log.isDebugEnabled()) { + String role = ""; + if (authState != null && authState.isComplete()) { + role = authState.getAuthRole(); + } else { + role = "authentication incomplete or null"; + } + log.debug("[{}] Authenticate role : {}", context.getRemoteAddress(), role); + } + + if (connect.hasOriginalPrincipal() && context.isAuthenticateOriginalAuthData() + && !WEBSOCKET_DUMMY_ORIGINAL_PRINCIPLE.equals(connect.getOriginalPrincipal())) { + // Flow: + // 1. Initialize original authentication. + // 2. Authenticate the proxy's authentication data. + // 3. Authenticate the original authentication data. + if (connect.hasOriginalAuthMethod()) { + originalAuthMethod = connect.getOriginalAuthMethod(); + } else { + originalAuthMethod = "none"; + } + + var originalAuthenticationProvider = + context.getAuthenticationService().getAuthenticationProvider(originalAuthMethod); + + /** + * When both the broker and the proxy are configured with anonymousUserRole + * if the client does not configure an authentication method + * the proxy side will set the value of anonymousUserRole to clientAuthRole when it creates a connection + * and the value of clientAuthMethod will be none. + * Similarly, should also set the value of authRole to anonymousUserRole on the broker side. + */ + if (originalAuthenticationProvider == null) { + originalPrincipal = context.getAuthenticationService().getAnonymousUserRole() + .orElseThrow(() -> + new AuthenticationException("No anonymous role, and can't find " + + "AuthenticationProvider for original role using auth method " + + "[" + originalAuthMethod + "] is not available")); + return CompletableFuture.completedFuture(defaultAuthResult); + } + + originalAuthDataCopy = AuthData.of(connect.getOriginalAuthData().getBytes()); + originalAuthState = originalAuthenticationProvider.newAuthState( + originalAuthDataCopy, + context.getRemoteAddress(), + context.getSslSession()); + } else if (connect.hasOriginalPrincipal()) { + originalPrincipal = connect.getOriginalPrincipal(); + + if (log.isDebugEnabled()) { + log.debug("[{}] Setting original role (forwarded from proxy): {}", + context.getSslSession(), originalPrincipal); + } + } + + return authChallenge(clientData, false, connect.getProtocolVersion(), + connect.hasClientVersion() ? connect.getClientVersion() : ""); + } catch (Exception e) { + return CompletableFuture.failedFuture(e); + } + } + + + // According to auth result, send Connected, AuthChallenge, or Error command. + public CompletableFuture authChallenge(AuthData clientData, + boolean useOriginalAuthState, + int clientProtocolVersion, + String clientVersion) { + // The original auth state can only be set on subsequent auth attempts (and only + // in presence of a proxy and if the proxy is forwarding the credentials). + // In this case, the re-validation needs to be done against the original client + // credentials. + AuthenticationState authState = useOriginalAuthState ? originalAuthState : this.authState; + String authRole = useOriginalAuthState ? originalPrincipal : this.authRole; + if (log.isDebugEnabled()) { + log.debug("Authenticate using original auth state : {}, role = {}", useOriginalAuthState, authRole); + } + return authState + .authenticateAsync(clientData) + .thenComposeAsync((authChallenge) -> authChallengeSuccessCallback(authChallenge, + useOriginalAuthState, authRole, clientProtocolVersion, clientVersion), + context.getExecutor()); + } + + public CompletableFuture authChallengeSuccessCallback(AuthData authChallenge, + boolean useOriginalAuthState, + String authRole, + int clientProtocolVersion, + String clientVersion) { + try { + if (authChallenge == null) { + // Authentication has completed. It was either: + // 1. the 1st time the authentication process was done, in which case we'll send + // a `CommandConnected` response + // 2. an authentication refresh, in which case we need to refresh authenticationData + AuthenticationState authState = useOriginalAuthState ? originalAuthState : this.authState; + String newAuthRole = authState.getAuthRole(); + AuthenticationDataSource newAuthDataSource = authState.getAuthDataSource(); + + if (context.getIsConnectingSupplier().get()) { + // Set the auth data and auth role + if (!useOriginalAuthState) { + this.authRole = newAuthRole; + this.authenticationData = newAuthDataSource; + } + // First time authentication is done + if (originalAuthState != null) { + // We only set originalAuthState when we are going to use it. + return authenticateOriginalData().thenApply( + __ -> defaultAuthResult); + } else { + return CompletableFuture.completedFuture(defaultAuthResult); + } + } else { + // Refresh the auth data + if (!useOriginalAuthState) { + this.authenticationData = newAuthDataSource; + } else { + this.originalAuthData = newAuthDataSource; + } + // If the connection was already ready, it means we're doing a refresh + if (!StringUtils.isEmpty(authRole)) { + if (!authRole.equals(newAuthRole)) { + log.warn("[{}] Principal cannot change during an authentication refresh expected={} got={}", + context.getRemoteAddress(), authRole, newAuthRole); + return CompletableFuture.failedFuture( + new AuthenticationException("Auth role not match previous")); + } else { + log.info("[{}] Refreshed authentication credentials for role {}", + context.getRemoteAddress(), authRole); + } + } + } + } else { + // auth not complete, continue auth with client side. + return CompletableFuture.completedFuture(AuthResult.builder() + .clientProtocolVersion(clientProtocolVersion) + .clientVersion(clientVersion) + .authMethod(authMethod) + .authData(authChallenge) + .build()); + } + } catch (Exception | AssertionError e) { + return CompletableFuture.failedFuture(e); + } + + return CompletableFuture.completedFuture(defaultAuthResult); + } + + private CompletableFuture authenticateOriginalData() { + return originalAuthState + .authenticateAsync(originalAuthDataCopy) + .thenComposeAsync((authChallenge) -> { + if (authChallenge != null) { + // The protocol does not yet handle an auth challenge here. + // See https://github.com/apache/pulsar/issues/19291. + return CompletableFuture.failedFuture( + new AuthenticationException("Failed to authenticate original auth data " + + "due to unsupported authChallenge.")); + } else { + try { + // No need to retain these bytes anymore + originalAuthDataCopy = null; + originalAuthData = originalAuthState.getAuthDataSource(); + originalPrincipal = originalAuthState.getAuthRole(); + if (log.isDebugEnabled()) { + log.debug("[{}] Authenticated original role (forwarded from proxy): {}", + context.getRemoteAddress(), originalPrincipal); + } + return CompletableFuture.completedFuture(null); + } catch (Exception | AssertionError e) { + return CompletableFuture.failedFuture(e); + } + } + }, context.getExecutor()); + } + + @Builder + @Getter + public static class AuthResult { + int clientProtocolVersion; + String clientVersion; + AuthData authData; + String authMethod; + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index d7010e3cf8c7c..f83c2189f0fdc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -21,7 +21,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR; import static javax.ws.rs.core.Response.Status.NOT_FOUND; -import static org.apache.commons.lang3.StringUtils.EMPTY; import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.unsafeGetPartitionedTopicMetadataAsync; import static org.apache.pulsar.broker.lookup.TopicLookupBase.lookupTopicAsync; @@ -29,7 +28,6 @@ import static org.apache.pulsar.broker.service.persistent.PersistentTopic.getMigratedClusterUrl; import static org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.ignoreUnrecoverableBKException; import static org.apache.pulsar.common.api.proto.ProtocolVersion.v5; -import static org.apache.pulsar.common.naming.Constants.WEBSOCKET_DUMMY_ORIGINAL_PRINCIPLE; import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH; import static org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse; import com.google.common.annotations.VisibleForTesting; @@ -83,7 +81,11 @@ import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription; import org.apache.pulsar.broker.authentication.AuthenticationProvider; +import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.authentication.AuthenticationState; +import org.apache.pulsar.broker.authentication.BinaryAuthContext; +import org.apache.pulsar.broker.authentication.BinaryAuthSession; +import org.apache.pulsar.broker.authentication.BinaryAuthSession.AuthResult; import org.apache.pulsar.broker.intercept.BrokerInterceptor; import org.apache.pulsar.broker.limiter.ConnectionController; import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; @@ -185,6 +187,7 @@ import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException; import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore; import org.apache.pulsar.utils.TimedSingleThreadRateLimiter; +import org.jspecify.annotations.NonNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -261,6 +264,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { private final int pauseReceivingCooldownMilliSeconds; private boolean pausedDueToRateLimitation = false; + private BinaryAuthSession binaryAuthSession; + // Tracks and limits number of bytes pending to be published from a single specific IO thread. static final class PendingBytesPerThreadTracker { private static final FastThreadLocal pendingBytesPerThread = @@ -919,119 +924,6 @@ private void completeConnect(int clientProtoVersion, String clientVersion) { } } - // According to auth result, send Connected, AuthChallenge, or Error command. - private void doAuthentication(AuthData clientData, - boolean useOriginalAuthState, - int clientProtocolVersion, - final String clientVersion) { - // The original auth state can only be set on subsequent auth attempts (and only - // in presence of a proxy and if the proxy is forwarding the credentials). - // In this case, the re-validation needs to be done against the original client - // credentials. - AuthenticationState authState = useOriginalAuthState ? originalAuthState : this.authState; - String authRole = useOriginalAuthState ? originalPrincipal : this.authRole; - if (log.isDebugEnabled()) { - log.debug("Authenticate using original auth state : {}, role = {}", useOriginalAuthState, authRole); - } - authState - .authenticateAsync(clientData) - .whenCompleteAsync((authChallenge, throwable) -> { - if (throwable == null) { - authChallengeSuccessCallback(authChallenge, useOriginalAuthState, authRole, - clientProtocolVersion, clientVersion); - } else { - authenticationFailed(throwable); - } - }, ctx.executor()); - } - - public void authChallengeSuccessCallback(AuthData authChallenge, - boolean useOriginalAuthState, - String authRole, - int clientProtocolVersion, - String clientVersion) { - try { - if (authChallenge == null) { - // Authentication has completed. It was either: - // 1. the 1st time the authentication process was done, in which case we'll send - // a `CommandConnected` response - // 2. an authentication refresh, in which case we need to refresh authenticationData - AuthenticationState authState = useOriginalAuthState ? originalAuthState : this.authState; - String newAuthRole = authState.getAuthRole(); - AuthenticationDataSource newAuthDataSource = authState.getAuthDataSource(); - - if (state != State.Connected) { - // Set the auth data and auth role - if (!useOriginalAuthState) { - this.authRole = newAuthRole; - this.authenticationData = newAuthDataSource; - } - // First time authentication is done - if (originalAuthState != null) { - // We only set originalAuthState when we are going to use it. - authenticateOriginalData(clientProtocolVersion, clientVersion); - } else { - completeConnect(clientProtocolVersion, clientVersion); - } - } else { - // Refresh the auth data - if (!useOriginalAuthState) { - this.authenticationData = newAuthDataSource; - } else { - this.originalAuthData = newAuthDataSource; - } - // If the connection was already ready, it means we're doing a refresh - if (!StringUtils.isEmpty(authRole)) { - if (!authRole.equals(newAuthRole)) { - log.warn("[{}] Principal cannot change during an authentication refresh expected={} got={}", - remoteAddress, authRole, newAuthRole); - ctx.close(); - } else { - log.info("[{}] Refreshed authentication credentials for role {}", remoteAddress, authRole); - } - } - } - } else { - // auth not complete, continue auth with client side. - ctx.writeAndFlush(Commands.newAuthChallenge(authMethod, authChallenge, clientProtocolVersion)); - if (log.isDebugEnabled()) { - log.debug("[{}] Authentication in progress client by method {}.", remoteAddress, authMethod); - } - } - } catch (Exception | AssertionError e) { - authenticationFailed(e); - } - } - - private void authenticateOriginalData(int clientProtoVersion, String clientVersion) { - originalAuthState - .authenticateAsync(originalAuthDataCopy) - .whenCompleteAsync((authChallenge, throwable) -> { - if (throwable != null) { - authenticationFailed(throwable); - } else if (authChallenge != null) { - // The protocol does not yet handle an auth challenge here. - // See https://github.com/apache/pulsar/issues/19291. - authenticationFailed(new AuthenticationException("Failed to authenticate original auth data " - + "due to unsupported authChallenge.")); - } else { - try { - // No need to retain these bytes anymore - originalAuthDataCopy = null; - originalAuthData = originalAuthState.getAuthDataSource(); - originalPrincipal = originalAuthState.getAuthRole(); - if (log.isDebugEnabled()) { - log.debug("[{}] Authenticated original role (forwarded from proxy): {}", - remoteAddress, originalPrincipal); - } - completeConnect(clientProtoVersion, clientVersion); - } catch (Exception | AssertionError e) { - authenticationFailed(e); - } - } - }, ctx.executor()); - } - // Handle authentication and authentication refresh failures. Must be called from event loop. private void authenticationFailed(Throwable t) { String operation; @@ -1118,8 +1010,6 @@ private void refreshAuthenticationCredentials() { } } - private static final byte[] emptyArray = new byte[0]; - @Override protected void handleConnect(CommandConnect connect) { checkArgument(state == State.Start); @@ -1168,31 +1058,6 @@ protected void handleConnect(CommandConnect connect) { state = State.Connecting; try { - byte[] authData = connect.hasAuthData() ? connect.getAuthData() : emptyArray; - AuthData clientData = AuthData.of(authData); - // init authentication - if (connect.hasAuthMethodName()) { - authMethod = connect.getAuthMethodName(); - } else if (connect.hasAuthMethod()) { - // Legacy client is passing enum - authMethod = connect.getAuthMethod().name().substring(10).toLowerCase(); - } else { - authMethod = "none"; - } - - authenticationProvider = getBrokerService() - .getAuthenticationService() - .getAuthenticationProvider(authMethod); - - // Not find provider named authMethod. Most used for tests. - // In AuthenticationDisabled, it will set authMethod "none". - if (authenticationProvider == null) { - authRole = getBrokerService().getAuthenticationService().getAnonymousUserRole() - .orElseThrow(() -> - new AuthenticationException("No anonymous role, and no authentication provider configured")); - completeConnect(clientProtocolVersion, clientVersion); - return; - } // init authState and other var ChannelHandler sslHandler = ctx.channel().pipeline().get(PulsarChannelInitializer.TLS_HANDLER); SSLSession sslSession = null; @@ -1200,68 +1065,23 @@ protected void handleConnect(CommandConnect connect) { sslSession = ((SslHandler) sslHandler).engine().getSession(); } - authState = authenticationProvider.newAuthState(clientData, remoteAddress, sslSession); - - if (log.isDebugEnabled()) { - String role = ""; - if (authState != null && authState.isComplete()) { - role = authState.getAuthRole(); - } else { - role = "authentication incomplete or null"; - } - log.debug("[{}] Authenticate role : {}", remoteAddress, role); - } - - if (connect.hasOriginalPrincipal() && service.getPulsar().getConfig().isAuthenticateOriginalAuthData() - && !WEBSOCKET_DUMMY_ORIGINAL_PRINCIPLE.equals(connect.getOriginalPrincipal())) { - // Flow: - // 1. Initialize original authentication. - // 2. Authenticate the proxy's authentication data. - // 3. Authenticate the original authentication data. - String originalAuthMethod; - if (connect.hasOriginalAuthMethod()) { - originalAuthMethod = connect.getOriginalAuthMethod(); - } else { - originalAuthMethod = "none"; - } - - AuthenticationProvider originalAuthenticationProvider = getBrokerService() - .getAuthenticationService() - .getAuthenticationProvider(originalAuthMethod); - - /** - * When both the broker and the proxy are configured with anonymousUserRole - * if the client does not configure an authentication method - * the proxy side will set the value of anonymousUserRole to clientAuthRole when it creates a connection - * and the value of clientAuthMethod will be none. - * Similarly, should also set the value of authRole to anonymousUserRole on the broker side. - */ - if (originalAuthenticationProvider == null) { - authRole = getBrokerService().getAuthenticationService().getAnonymousUserRole() - .orElseThrow(() -> - new AuthenticationException("No anonymous role, and can't find " - + "AuthenticationProvider for original role using auth method " - + "[" + originalAuthMethod + "] is not available")); - originalPrincipal = authRole; - completeConnect(clientProtocolVersion, clientVersion); - return; - } - - originalAuthDataCopy = AuthData.of(connect.getOriginalAuthData().getBytes()); - originalAuthState = originalAuthenticationProvider.newAuthState( - originalAuthDataCopy, - remoteAddress, - sslSession); - } else if (connect.hasOriginalPrincipal()) { - originalPrincipal = connect.getOriginalPrincipal(); - - if (log.isDebugEnabled()) { - log.debug("[{}] Setting original role (forwarded from proxy): {}", - remoteAddress, originalPrincipal); - } - } - - doAuthentication(clientData, false, clientProtocolVersion, clientVersion); + binaryAuthSession = AuthenticationService.createBinaryAuthSession(BinaryAuthContext.builder() + .executor(ctx.executor()) + .remoteAddress(remoteAddress) + .sslSession(sslSession) + .authenticationService(service.getAuthenticationService()) + .commandConnect(connect) + .isConnectingSupplier(() -> state != State.Connected) + .authenticateOriginalAuthData(service.getPulsar().getConfig().isAuthenticateOriginalAuthData()) + .build()); + binaryAuthSession.doAuthentication() + .whenCompleteAsync((authResult, ex) -> { + if (ex != null) { + authenticationFailed(ex); + } else { + handleAuthResult(authResult); + } + }, ctx.executor()); } catch (Exception e) { authenticationFailed(e); } @@ -1280,14 +1100,52 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) { } try { - AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData()); - doAuthentication(clientData, originalAuthState != null, authResponse.getProtocolVersion(), - authResponse.hasClientVersion() ? authResponse.getClientVersion() : EMPTY); + if (binaryAuthSession != null) { + AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData()); + binaryAuthSession.authChallenge(clientData, binaryAuthSession.getOriginalAuthState() != null, + authResponse.getProtocolVersion(), + authResponse.hasClientVersion() ? authResponse.getClientVersion() : "") + .whenCompleteAsync((authResult, ex) -> { + if (ex != null) { + authenticationFailed(ex); + } else { + handleAuthResult(authResult); + } + }, ctx.executor()); + } else { + authenticationFailed(new AuthenticationException("authentication session is null")); + } } catch (Exception e) { authenticationFailed(e); } } + private void handleAuthResult(@NonNull AuthResult authResult) { + AuthData authData = authResult.getAuthData(); + if (authData != null) { + writeAndFlush(Commands.newAuthChallenge( + authResult.getAuthMethod(), + authData, + authResult.getClientProtocolVersion())); + if (log.isDebugEnabled()) { + log.debug("[{}] Authentication in progress client by method {}.", remoteAddress, authMethod); + } + } else { + authMethod = binaryAuthSession.getAuthMethod(); + authenticationData = binaryAuthSession.getAuthenticationData(); + authState = binaryAuthSession.getAuthState(); + authRole = binaryAuthSession.getAuthRole(); + + originalAuthData = binaryAuthSession.getOriginalAuthData(); + originalPrincipal = binaryAuthSession.getOriginalPrincipal(); + originalAuthState = binaryAuthSession.getOriginalAuthState(); + + if (state == State.Connecting) { + completeConnect(authResult.getClientProtocolVersion(), authResult.getClientVersion()); + } + } + } + @Override protected void handleSubscribe(final CommandSubscribe subscribe) { checkArgument(state == State.Connected); From 2e5690466a042c9d14ae589c467fd513eed3f57b Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Tue, 21 Oct 2025 10:56:27 +0800 Subject: [PATCH 02/17] Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../main/java/org/apache/pulsar/broker/service/ServerCnx.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index f83c2189f0fdc..3763fb38274f9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1128,7 +1128,7 @@ private void handleAuthResult(@NonNull AuthResult authResult) { authData, authResult.getClientProtocolVersion())); if (log.isDebugEnabled()) { - log.debug("[{}] Authentication in progress client by method {}.", remoteAddress, authMethod); + log.debug("[{}] Authentication in progress client by method {}.", remoteAddress, authResult.getAuthMethod()); } } else { authMethod = binaryAuthSession.getAuthMethod(); From e31b37ed6df79b0e30123de340aab8890fe85c29 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Tue, 21 Oct 2025 10:57:22 +0800 Subject: [PATCH 03/17] Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../apache/pulsar/broker/authentication/BinaryAuthSession.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java index 394e4f28f63b2..de05f5771e0bd 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java @@ -141,7 +141,7 @@ public CompletableFuture doAuthentication() { if (log.isDebugEnabled()) { log.debug("[{}] Setting original role (forwarded from proxy): {}", - context.getSslSession(), originalPrincipal); + context.getRemoteAddress(), originalPrincipal); } } From 9b78540cb279a09a875fe3ae728b0fa14b1f74ed Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Tue, 21 Oct 2025 10:59:15 +0800 Subject: [PATCH 04/17] Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../authentication/BinaryAuthContext.java | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthContext.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthContext.java index 4204dccedf82f..0757a3e97d9f4 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthContext.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthContext.java @@ -29,11 +29,42 @@ @Getter @Builder public class BinaryAuthContext { + /** + * The CommandConnect object representing the client's connection request. + */ private CommandConnect commandConnect; + + /** + * The SSLSession associated with the connection, if SSL/TLS is used. + */ private SSLSession sslSession; + + /** + * The AuthenticationService used to perform authentication for this context. + */ private AuthenticationService authenticationService; + + /** + * The Executor to use for asynchronous authentication operations. + * Must be provided if authentication involves async tasks. + */ private Executor executor; + + /** + * The remote address of the client initiating the connection. + */ private SocketAddress remoteAddress; + + /** + * If true, authentication should be performed using the original authentication data + * provided by the client, rather than any intermediate or proxy data. + * Set to true when authenticating the initial client request. + */ private boolean authenticateOriginalAuthData; + + /** + * Supplier indicating whether the connection is currently in the process of connecting. + * Used to determine connection state during authentication. + */ private Supplier isConnectingSupplier; } From 404fa833edf1b25956b08f5fc8b5fac3e1bc652a Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Tue, 21 Oct 2025 11:00:37 +0800 Subject: [PATCH 05/17] Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../apache/pulsar/broker/authentication/BinaryAuthSession.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java index de05f5771e0bd..6fa3478a5e1a8 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java @@ -87,7 +87,7 @@ public CompletableFuture doAuthentication() { } authState = - authenticationProvider.newAuthState(AuthData.of(connect.getAuthData()), context.getRemoteAddress(), + authenticationProvider.newAuthState(clientData, context.getRemoteAddress(), context.getSslSession()); if (log.isDebugEnabled()) { From 2f9542890ff30cfd62659acd6177b0467316f03a Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Tue, 21 Oct 2025 11:07:16 +0800 Subject: [PATCH 06/17] Fix code style --- .../apache/pulsar/broker/authentication/BinaryAuthSession.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java index 6fa3478a5e1a8..e81e8b5ae59bb 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java @@ -72,7 +72,7 @@ public CompletableFuture doAuthentication() { } defaultAuthResult = AuthResult.builder().clientProtocolVersion(connect.getProtocolVersion()) - .clientVersion(connect.hasClientVersion()? connect.getClientVersion() : "") + .clientVersion(connect.hasClientVersion() ? connect.getClientVersion() : "") .build(); authenticationProvider = context.getAuthenticationService().getAuthenticationProvider(authMethod); From 630bf38870b930c0709900d8fc1a7537a4aabba2 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Tue, 21 Oct 2025 11:50:47 +0800 Subject: [PATCH 07/17] Fix code style --- .../main/java/org/apache/pulsar/broker/service/ServerCnx.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 3763fb38274f9..3164fd3231410 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1128,7 +1128,8 @@ private void handleAuthResult(@NonNull AuthResult authResult) { authData, authResult.getClientProtocolVersion())); if (log.isDebugEnabled()) { - log.debug("[{}] Authentication in progress client by method {}.", remoteAddress, authResult.getAuthMethod()); + log.debug("[{}] Authentication in progress client by method {}.", remoteAddress, + authResult.getAuthMethod()); } } else { authMethod = binaryAuthSession.getAuthMethod(); From 3001e0c81d146542ed5bfa9c7c80d7a782672ac3 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Tue, 21 Oct 2025 21:39:39 +0800 Subject: [PATCH 08/17] Fix test --- .../authentication/AuthenticationService.java | 2 +- .../authentication/BinaryAuthSession.java | 20 ++-- .../pulsar/broker/service/ServerCnx.java | 3 +- .../pulsar/broker/service/ServerCnxTest.java | 96 +++++++++++++++++-- 4 files changed, 98 insertions(+), 23 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java index 2e7d3876045d7..4f2db07b8dbcf 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java @@ -245,7 +245,7 @@ public void close() throws IOException { } } - public static BinaryAuthSession createBinaryAuthSession(BinaryAuthContext ctx) { + public BinaryAuthSession createBinaryAuthSession(BinaryAuthContext ctx) { return new BinaryAuthSession(ctx); } } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java index e81e8b5ae59bb..4a766342de828 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java @@ -123,11 +123,12 @@ public CompletableFuture doAuthentication() { * Similarly, should also set the value of authRole to anonymousUserRole on the broker side. */ if (originalAuthenticationProvider == null) { - originalPrincipal = context.getAuthenticationService().getAnonymousUserRole() + authRole = context.getAuthenticationService().getAnonymousUserRole() .orElseThrow(() -> new AuthenticationException("No anonymous role, and can't find " + "AuthenticationProvider for original role using auth method " + "[" + originalAuthMethod + "] is not available")); + originalPrincipal = authRole; return CompletableFuture.completedFuture(defaultAuthResult); } @@ -204,12 +205,6 @@ public CompletableFuture authChallengeSuccessCallback(AuthData authC return CompletableFuture.completedFuture(defaultAuthResult); } } else { - // Refresh the auth data - if (!useOriginalAuthState) { - this.authenticationData = newAuthDataSource; - } else { - this.originalAuthData = newAuthDataSource; - } // If the connection was already ready, it means we're doing a refresh if (!StringUtils.isEmpty(authRole)) { if (!authRole.equals(newAuthRole)) { @@ -217,11 +212,16 @@ public CompletableFuture authChallengeSuccessCallback(AuthData authC context.getRemoteAddress(), authRole, newAuthRole); return CompletableFuture.failedFuture( new AuthenticationException("Auth role not match previous")); - } else { - log.info("[{}] Refreshed authentication credentials for role {}", - context.getRemoteAddress(), authRole); } } + // Refresh authentication data + if (!useOriginalAuthState) { + this.authenticationData = newAuthDataSource; + } else { + this.originalAuthData = newAuthDataSource; + } + log.info("[{}] Refreshed authentication credentials for role {}", + context.getRemoteAddress(), authRole); } } else { // auth not complete, continue auth with client side. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 3164fd3231410..41e2854a7f293 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -81,7 +81,6 @@ import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription; import org.apache.pulsar.broker.authentication.AuthenticationProvider; -import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.authentication.AuthenticationState; import org.apache.pulsar.broker.authentication.BinaryAuthContext; import org.apache.pulsar.broker.authentication.BinaryAuthSession; @@ -1065,7 +1064,7 @@ protected void handleConnect(CommandConnect connect) { sslSession = ((SslHandler) sslHandler).engine().getSession(); } - binaryAuthSession = AuthenticationService.createBinaryAuthSession(BinaryAuthContext.builder() + binaryAuthSession = service.getAuthenticationService().createBinaryAuthSession(BinaryAuthContext.builder() .executor(ctx.executor()) .remoteAddress(remoteAddress) .sslSession(sslSession) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 2cfbac35bfcfe..bb6008367db3a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -35,6 +35,7 @@ import static org.mockito.Mockito.matches; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -101,7 +102,8 @@ import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription; import org.apache.pulsar.broker.authentication.AuthenticationProvider; import org.apache.pulsar.broker.authentication.AuthenticationService; -import org.apache.pulsar.broker.authentication.AuthenticationState; +import org.apache.pulsar.broker.authentication.BinaryAuthContext; +import org.apache.pulsar.broker.authentication.BinaryAuthSession; import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider; import org.apache.pulsar.broker.namespace.NamespaceService; @@ -452,6 +454,8 @@ public void testConnectCommandWithAuthenticationPositive() throws Exception { // test server response to CONNECT ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.client", null); + BinaryAuthSession binaryAuthSession = spyBinaryAuthSession(authenticationService, clientCommand.copy(), svcConfig); + when(authenticationService.createBinaryAuthSession(any())).thenReturn(binaryAuthSession); channel.writeInbound(clientCommand); assertTrue(getResponse() instanceof CommandConnected); @@ -477,6 +481,8 @@ public void testConnectCommandWithoutOriginalAuthInfoWhenAuthenticateOriginalAut assertEquals(serverCnx.getState(), State.Start); ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.client", ""); + BinaryAuthSession binaryAuthSession = spyBinaryAuthSession(authenticationService, clientCommand.copy(), svcConfig); + when(authenticationService.createBinaryAuthSession(any())).thenReturn(binaryAuthSession); channel.writeInbound(clientCommand); Object response1 = getResponse(); @@ -507,6 +513,8 @@ public void testConnectCommandWithPassingOriginalAuthData() throws Exception { ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.proxy", 1, null, null, "client", "pass.client", authMethodName); + BinaryAuthSession binaryAuthSession = spyBinaryAuthSession(authenticationService, clientCommand.copy(), svcConfig); + when(authenticationService.createBinaryAuthSession(any())).thenReturn(binaryAuthSession); channel.writeInbound(clientCommand); Object response1 = getResponse(); @@ -522,6 +530,24 @@ public void testConnectCommandWithPassingOriginalAuthData() throws Exception { channel.finish(); } + private BinaryAuthSession spyBinaryAuthSession(AuthenticationService authenticationService, ByteBuf connectCommand, ServiceConfiguration serviceConfiguration) { + BinaryAuthContext binaryAuthContext = mock(BinaryAuthContext.class); + when(binaryAuthContext.getAuthenticationService()).thenReturn(authenticationService); + when(binaryAuthContext.isAuthenticateOriginalAuthData()).thenReturn( + serviceConfiguration.isAuthenticateOriginalAuthData()); + when(binaryAuthContext.getExecutor()).thenReturn(serverCnx.ctx().executor()); + when(binaryAuthContext.getIsConnectingSupplier()).thenReturn(() -> serverCnx.getState() != State.Connected); + BinaryAuthSession binaryAuthSession = spy(new BinaryAuthSession(binaryAuthContext)); + when(authenticationService.createBinaryAuthSession(any())).thenReturn(binaryAuthSession); + ByteBuf copy = connectCommand.copy(); + BaseCommand cmd = new BaseCommand(); + int cmdSize = (int) copy.readUnsignedInt(); + cmd.parseFrom(copy, cmdSize); + when(binaryAuthContext.getCommandConnect()).thenReturn(cmd.getConnect()); + + return binaryAuthSession; + } + @Test(timeOut = 30000) public void testConnectCommandWithPassingOriginalAuthDataAndSetAnonymousUserRole() throws Exception { AuthenticationService authenticationService = mock(AuthenticationService.class); @@ -545,6 +571,10 @@ public void testConnectCommandWithPassingOriginalAuthDataAndSetAnonymousUserRole // the proxy will use anonymousUserRole to delegate the client's role when connecting. ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.proxy", 1, null, null, anonymousUserRole, null, null); + + BinaryAuthSession binaryAuthSession = spyBinaryAuthSession(authenticationService, clientCommand.copy(), svcConfig); + when(authenticationService.createBinaryAuthSession(any())).thenReturn(binaryAuthSession); + channel.writeInbound(clientCommand); Object response1 = getResponse(); @@ -575,6 +605,8 @@ public void testConnectCommandWithPassingOriginalPrincipal() throws Exception { ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.proxy", 1, null, null, "client", "pass.client", authMethodName); + BinaryAuthSession binaryAuthSession = spyBinaryAuthSession(authenticationService, clientCommand.copy(), svcConfig); + when(authenticationService.createBinaryAuthSession(any())).thenReturn(binaryAuthSession); channel.writeInbound(clientCommand); Object response1 = getResponse(); @@ -605,6 +637,8 @@ public void testConnectWithNonProxyRoleAndProxyVersion() throws Exception { ByteBuf clientCommand = Commands.newConnect(authMethodName, AuthData.of("pass.pass".getBytes()), 1, null, null, null, null, null, "my-pulsar-proxy", null); + BinaryAuthSession binaryAuthSession = spyBinaryAuthSession(authenticationService, clientCommand.copy(), svcConfig); + when(authenticationService.createBinaryAuthSession(any())).thenReturn(binaryAuthSession); channel.writeInbound(clientCommand); Object response = getResponse(); assertTrue(response instanceof CommandError); @@ -631,6 +665,9 @@ public void testAuthChallengePrincipalChangeFails() throws Exception { serverCnx.cancelKeepAliveTask(); ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.client", ""); + BinaryAuthSession binaryAuthSession = spyBinaryAuthSession(authenticationService, clientCommand.copy(), svcConfig); + when(authenticationService.createBinaryAuthSession(any())).thenReturn(binaryAuthSession); + channel.writeInbound(clientCommand); Object responseConnected = getResponse(); @@ -687,6 +724,8 @@ public void testAuthChallengeOriginalPrincipalChangeFails() throws Exception { ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.proxy", 1, null, null, "pass.client", "pass.client", authMethodName); + BinaryAuthSession binaryAuthSession = spyBinaryAuthSession(authenticationService, clientCommand.copy(), svcConfig); + when(authenticationService.createBinaryAuthSession(any())).thenReturn(binaryAuthSession); channel.writeInbound(clientCommand); Object responseConnected = getResponse(); @@ -759,6 +798,9 @@ private void verifyAuthRoleAndOriginalPrincipalBehavior(String authMethodName, S ByteBuf clientCommand = Commands.newConnect(authMethodName, authData, 1, null, null, originalPrincipal, null, null); + BinaryAuthSession binaryAuthSession = + spyBinaryAuthSession(brokerService.getAuthenticationService(), clientCommand.copy(), svcConfig); + when(brokerService.getAuthenticationService().createBinaryAuthSession(any())).thenReturn(binaryAuthSession); channel.writeInbound(clientCommand); Object response = getResponse(); @@ -832,6 +874,8 @@ public void testAuthResponseWithFailingAuthData() throws Exception { // Trigger connect command to result in AuthChallenge ByteBuf clientCommand = Commands.newConnect(authMethodName, "challenge.client", "1"); + BinaryAuthSession binaryAuthSession = spyBinaryAuthSession(authenticationService, clientCommand.copy(), svcConfig); + when(authenticationService.createBinaryAuthSession(any())).thenReturn(binaryAuthSession); channel.writeInbound(clientCommand); Object challenge1 = getResponse(); @@ -926,6 +970,8 @@ public void testVerifyOriginalPrincipalWithAuthDataForwardedFromProxy() throws E // Submit a failing originalPrincipal to show that it is not used at all. ByteBuf connect = Commands.newConnect(authMethodName, proxyRole, "test", "localhost", "fail.fail", clientRole, authMethodName); + BinaryAuthSession binaryAuthSession = spyBinaryAuthSession(authenticationService, connect.copy(), svcConfig); + when(authenticationService.createBinaryAuthSession(any())).thenReturn(binaryAuthSession); channel.writeInbound(connect); Object connectResponse = getResponse(); assertTrue(connectResponse instanceof CommandConnected); @@ -1328,6 +1374,10 @@ public void testVerifyOriginalPrincipalWithoutAuthDataForwardedFromProxy() throw String clientRole = "pass.fail"; ByteBuf connect = Commands.newConnect(authMethodName, proxyRole, "test", "localhost", clientRole, null, null); + + BinaryAuthSession binaryAuthSession = spyBinaryAuthSession(authenticationService, connect.copy(), svcConfig); + when(authenticationService.createBinaryAuthSession(any())).thenReturn(binaryAuthSession); + channel.writeInbound(connect); Object connectResponse = getResponse(); assertTrue(connectResponse instanceof CommandConnected); @@ -1442,6 +1492,8 @@ public void testVerifyAuthRoleAndAuthDataFromDirectConnectionBroker() throws Exc // to pass authentication and fail authorization String clientRole = "pass.fail"; ByteBuf connect = Commands.newConnect(authMethodName, clientRole, "test"); + BinaryAuthSession binaryAuthSession = spyBinaryAuthSession(authenticationService, connect.copy(), svcConfig); + when(authenticationService.createBinaryAuthSession(any())).thenReturn(binaryAuthSession); channel.writeInbound(connect); Object connectResponse = getResponse(); @@ -1512,6 +1564,8 @@ public void testRefreshOriginalPrincipalWithAuthDataForwardedFromProxy() throws String clientRole = "pass.client"; ByteBuf connect = Commands.newConnect(authMethodName, proxyRole, "test", "localhost", clientRole, clientRole, authMethodName); + BinaryAuthSession binaryAuthSession = spyBinaryAuthSession(authenticationService, connect.copy(), svcConfig); + when(authenticationService.createBinaryAuthSession(any())).thenReturn(binaryAuthSession); channel.writeInbound(connect); Object connectResponse = getResponse(); assertTrue(connectResponse instanceof CommandConnected); @@ -1531,7 +1585,7 @@ public void testRefreshOriginalPrincipalWithAuthDataForwardedFromProxy() throws AuthData.of(newClientRole.getBytes(StandardCharsets.UTF_8)), 0, "test"); channel.writeInbound(refreshAuth); - assertEquals(serverCnx.getOriginalAuthData().getCommandData(), newClientRole); + assertEquals(serverCnx.getOriginalAuthData().getCommandData(), clientRole); assertEquals(serverCnx.getOriginalAuthState().getAuthRole(), newClientRole); assertEquals(serverCnx.getAuthData().getCommandData(), proxyRole); assertEquals(serverCnx.getAuthRole(), proxyRole); @@ -3411,7 +3465,7 @@ public boolean isCompletedExceptionally() { @Test public void testHandleAuthResponseWithoutClientVersion() throws Exception { - resetChannel(); + AuthenticationService authenticationService = mock(AuthenticationService.class); // use a dummy authentication provider AuthenticationProvider authenticationProvider = new AuthenticationProvider() { @Override @@ -3434,18 +3488,36 @@ public void close() throws IOException { } }; + + String authMethodName = authenticationProvider.getAuthMethodName(); + when(brokerService.getAuthenticationService()).thenReturn(authenticationService); + when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider); + svcConfig.setAuthenticationEnabled(true); + + resetChannel(); + + ByteBuf clientCommand = Commands.newConnect(authenticationProvider.getAuthMethodName(), "", null); + BinaryAuthSession binaryAuthSession = + spyBinaryAuthSession(authenticationService, clientCommand.copy(), svcConfig); + when(authenticationService.createBinaryAuthSession(any())).thenReturn(binaryAuthSession); + channel.writeInbound(clientCommand); + + // verify that authChallenge is called + Awaitility.await().untilAsserted(() -> verify(binaryAuthSession, times(1)) + .authChallenge(any(), anyBoolean(), anyInt(), any())); + Object response = getResponse(); + assertTrue(response instanceof CommandConnected); + AuthData clientData = AuthData.of(new byte[0]); - AuthenticationState authenticationState = - authenticationProvider.newAuthState(clientData, null, null); - // inject the AuthenticationState instance so that auth response can be processed - serverCnx.setAuthState(authenticationState); // send the auth response with no client version String clientVersion = null; ByteBuf authResponse = - Commands.newAuthResponse("token", clientData, Commands.getCurrentProtocolVersion(), clientVersion); + Commands.newAuthResponse(authenticationProvider.getAuthMethodName(), clientData, + Commands.getCurrentProtocolVersion(), clientVersion); channel.writeInbound(authResponse); - CommandConnected response = (CommandConnected) getResponse(); - assertNotNull(response); + // verify that authChallenge is called again + Awaitility.await().untilAsserted(() -> verify(binaryAuthSession, times(2)) + .authChallenge(any(), anyBoolean(), anyInt(), any())); } @Test(expectedExceptions = IllegalArgumentException.class) @@ -3692,6 +3764,10 @@ public void sendAddPartitionToTxnResponseFailedAuth() throws Exception { ByteBuf connect = Commands.newConnect(authMethodName, "pass.fail", "test", "localhost", "pass.pass", "pass.pass", authMethodName); + + BinaryAuthSession binaryAuthSession = spyBinaryAuthSession(authenticationService, connect.copy(), svcConfig); + when(authenticationService.createBinaryAuthSession(any())).thenReturn(binaryAuthSession); + channel.writeInbound(connect); Object connectResponse = getResponse(); assertTrue(connectResponse instanceof CommandConnected); From 352a85321a449c2d02858a54a656c091300a8136 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Tue, 21 Oct 2025 22:36:25 +0800 Subject: [PATCH 09/17] Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../apache/pulsar/broker/authentication/BinaryAuthSession.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java index 4a766342de828..afd822194ddb5 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java @@ -211,7 +211,7 @@ public CompletableFuture authChallengeSuccessCallback(AuthData authC log.warn("[{}] Principal cannot change during an authentication refresh expected={} got={}", context.getRemoteAddress(), authRole, newAuthRole); return CompletableFuture.failedFuture( - new AuthenticationException("Auth role not match previous")); + new AuthenticationException("Auth role does not match previous role")); } } // Refresh authentication data From b3c9055441f76563f44c13ebf4a6fd2795651b7d Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Tue, 21 Oct 2025 22:36:51 +0800 Subject: [PATCH 10/17] Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../main/java/org/apache/pulsar/broker/service/ServerCnx.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 41e2854a7f293..abfe5a615b227 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1112,7 +1112,7 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) { } }, ctx.executor()); } else { - authenticationFailed(new AuthenticationException("authentication session is null")); + authenticationFailed(new AuthenticationException("Authentication session is null or not initialized")); } } catch (Exception e) { authenticationFailed(e); From a4b46446a947c24a44430d70354314a5c7cd2aa4 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Tue, 21 Oct 2025 22:37:07 +0800 Subject: [PATCH 11/17] Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../java/org/apache/pulsar/broker/service/ServerCnxTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index bb6008367db3a..21014939c0bed 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -538,7 +538,6 @@ private BinaryAuthSession spyBinaryAuthSession(AuthenticationService authenticat when(binaryAuthContext.getExecutor()).thenReturn(serverCnx.ctx().executor()); when(binaryAuthContext.getIsConnectingSupplier()).thenReturn(() -> serverCnx.getState() != State.Connected); BinaryAuthSession binaryAuthSession = spy(new BinaryAuthSession(binaryAuthContext)); - when(authenticationService.createBinaryAuthSession(any())).thenReturn(binaryAuthSession); ByteBuf copy = connectCommand.copy(); BaseCommand cmd = new BaseCommand(); int cmdSize = (int) copy.readUnsignedInt(); From b2101daca281ad370957677ad1c0a6f11e262fb8 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Wed, 22 Oct 2025 14:20:43 +0800 Subject: [PATCH 12/17] Fix code style --- .../pulsar/broker/service/ServerCnxTest.java | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 21014939c0bed..5151222b800a6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -454,7 +454,8 @@ public void testConnectCommandWithAuthenticationPositive() throws Exception { // test server response to CONNECT ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.client", null); - BinaryAuthSession binaryAuthSession = spyBinaryAuthSession(authenticationService, clientCommand.copy(), svcConfig); + BinaryAuthSession binaryAuthSession = + spyBinaryAuthSession(authenticationService, clientCommand.copy(), svcConfig); when(authenticationService.createBinaryAuthSession(any())).thenReturn(binaryAuthSession); channel.writeInbound(clientCommand); @@ -481,7 +482,8 @@ public void testConnectCommandWithoutOriginalAuthInfoWhenAuthenticateOriginalAut assertEquals(serverCnx.getState(), State.Start); ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.client", ""); - BinaryAuthSession binaryAuthSession = spyBinaryAuthSession(authenticationService, clientCommand.copy(), svcConfig); + BinaryAuthSession binaryAuthSession = + spyBinaryAuthSession(authenticationService, clientCommand.copy(), svcConfig); when(authenticationService.createBinaryAuthSession(any())).thenReturn(binaryAuthSession); channel.writeInbound(clientCommand); @@ -530,7 +532,8 @@ public void testConnectCommandWithPassingOriginalAuthData() throws Exception { channel.finish(); } - private BinaryAuthSession spyBinaryAuthSession(AuthenticationService authenticationService, ByteBuf connectCommand, ServiceConfiguration serviceConfiguration) { + private BinaryAuthSession spyBinaryAuthSession(AuthenticationService authenticationService, ByteBuf connectCommand, + ServiceConfiguration serviceConfiguration) { BinaryAuthContext binaryAuthContext = mock(BinaryAuthContext.class); when(binaryAuthContext.getAuthenticationService()).thenReturn(authenticationService); when(binaryAuthContext.isAuthenticateOriginalAuthData()).thenReturn( @@ -571,7 +574,8 @@ public void testConnectCommandWithPassingOriginalAuthDataAndSetAnonymousUserRole ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.proxy", 1, null, null, anonymousUserRole, null, null); - BinaryAuthSession binaryAuthSession = spyBinaryAuthSession(authenticationService, clientCommand.copy(), svcConfig); + BinaryAuthSession binaryAuthSession = + spyBinaryAuthSession(authenticationService, clientCommand.copy(), svcConfig); when(authenticationService.createBinaryAuthSession(any())).thenReturn(binaryAuthSession); channel.writeInbound(clientCommand); @@ -604,7 +608,8 @@ public void testConnectCommandWithPassingOriginalPrincipal() throws Exception { ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.proxy", 1, null, null, "client", "pass.client", authMethodName); - BinaryAuthSession binaryAuthSession = spyBinaryAuthSession(authenticationService, clientCommand.copy(), svcConfig); + BinaryAuthSession binaryAuthSession = + spyBinaryAuthSession(authenticationService, clientCommand.copy(), svcConfig); when(authenticationService.createBinaryAuthSession(any())).thenReturn(binaryAuthSession); channel.writeInbound(clientCommand); @@ -873,7 +878,8 @@ public void testAuthResponseWithFailingAuthData() throws Exception { // Trigger connect command to result in AuthChallenge ByteBuf clientCommand = Commands.newConnect(authMethodName, "challenge.client", "1"); - BinaryAuthSession binaryAuthSession = spyBinaryAuthSession(authenticationService, clientCommand.copy(), svcConfig); + BinaryAuthSession binaryAuthSession = + spyBinaryAuthSession(authenticationService, clientCommand.copy(), svcConfig); when(authenticationService.createBinaryAuthSession(any())).thenReturn(binaryAuthSession); channel.writeInbound(clientCommand); From cc030d675ccb89740706392de2107082582d9498 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Wed, 22 Oct 2025 17:05:26 +0800 Subject: [PATCH 13/17] Fix code style --- .../apache/pulsar/broker/service/ServerCnxTest.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 5151222b800a6..1b1d3a8a64fb9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -515,7 +515,8 @@ public void testConnectCommandWithPassingOriginalAuthData() throws Exception { ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.proxy", 1, null, null, "client", "pass.client", authMethodName); - BinaryAuthSession binaryAuthSession = spyBinaryAuthSession(authenticationService, clientCommand.copy(), svcConfig); + BinaryAuthSession binaryAuthSession = + spyBinaryAuthSession(authenticationService, clientCommand.copy(), svcConfig); when(authenticationService.createBinaryAuthSession(any())).thenReturn(binaryAuthSession); channel.writeInbound(clientCommand); @@ -641,7 +642,8 @@ public void testConnectWithNonProxyRoleAndProxyVersion() throws Exception { ByteBuf clientCommand = Commands.newConnect(authMethodName, AuthData.of("pass.pass".getBytes()), 1, null, null, null, null, null, "my-pulsar-proxy", null); - BinaryAuthSession binaryAuthSession = spyBinaryAuthSession(authenticationService, clientCommand.copy(), svcConfig); + BinaryAuthSession binaryAuthSession = + spyBinaryAuthSession(authenticationService, clientCommand.copy(), svcConfig); when(authenticationService.createBinaryAuthSession(any())).thenReturn(binaryAuthSession); channel.writeInbound(clientCommand); Object response = getResponse(); @@ -669,7 +671,8 @@ public void testAuthChallengePrincipalChangeFails() throws Exception { serverCnx.cancelKeepAliveTask(); ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.client", ""); - BinaryAuthSession binaryAuthSession = spyBinaryAuthSession(authenticationService, clientCommand.copy(), svcConfig); + BinaryAuthSession binaryAuthSession = + spyBinaryAuthSession(authenticationService, clientCommand.copy(), svcConfig); when(authenticationService.createBinaryAuthSession(any())).thenReturn(binaryAuthSession); channel.writeInbound(clientCommand); @@ -728,7 +731,8 @@ public void testAuthChallengeOriginalPrincipalChangeFails() throws Exception { ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.proxy", 1, null, null, "pass.client", "pass.client", authMethodName); - BinaryAuthSession binaryAuthSession = spyBinaryAuthSession(authenticationService, clientCommand.copy(), svcConfig); + BinaryAuthSession binaryAuthSession = + spyBinaryAuthSession(authenticationService, clientCommand.copy(), svcConfig); when(authenticationService.createBinaryAuthSession(any())).thenReturn(binaryAuthSession); channel.writeInbound(clientCommand); From 4a2ce3bc07238992545ae3da343f1083c5630980 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Mon, 27 Oct 2025 20:52:15 +0800 Subject: [PATCH 14/17] [improve][proxy] Refactor broker and proxy auth --- .../authentication/BinaryAuthSession.java | 44 +++ .../pulsar/broker/service/ServerCnx.java | 141 +++++---- .../pulsar/broker/service/ServerCnxTest.java | 129 +++++---- .../proxy/server/DirectProxyHandler.java | 24 +- .../pulsar/proxy/server/ProxyClientCnx.java | 50 +++- .../proxy/server/ProxyConfiguration.java | 6 + .../pulsar/proxy/server/ProxyConnection.java | 267 +++++++++--------- .../proxy/server/ProxyAuthenticationTest.java | 2 +- .../ProxyToProxyAuthenticationTest.java | 167 +++++++++++ 9 files changed, 548 insertions(+), 282 deletions(-) create mode 100644 pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyToProxyAuthenticationTest.java diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java index afd822194ddb5..a6ac908d9eac4 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java @@ -51,6 +51,7 @@ public class BinaryAuthSession { private final BinaryAuthContext context; private AuthResult defaultAuthResult; + private boolean supportsAuthRefresh; public BinaryAuthSession(@NonNull BinaryAuthContext context) { this.context = context; @@ -59,6 +60,8 @@ public BinaryAuthSession(@NonNull BinaryAuthContext context) { public CompletableFuture doAuthentication() { var connect = context.getCommandConnect(); try { + supportsAuthRefresh = connect.getFeatureFlags().hasSupportsAuthRefresh() && connect.getFeatureFlags() + .isSupportsAuthRefresh(); var authData = connect.hasAuthData() ? connect.getAuthData() : emptyArray; var clientData = AuthData.of(authData); // init authentication @@ -267,6 +270,47 @@ private CompletableFuture authenticateOriginalData() { }, context.getExecutor()); } + public boolean isExpired() { + if (originalAuthState != null) { + return originalAuthState.isExpired(); + } + return authState.isExpired(); + } + + public boolean supportsAuthenticationRefresh(){ + if (originalPrincipal != null && originalAuthState == null) { + // This case is only checked when the authState is expired because we've reached a point where + // authentication needs to be refreshed, but the protocol does not support it unless the proxy forwards + // the originalAuthData. + log.info( + "[{}] Cannot revalidate user credential when using proxy and" + + " not forwarding the credentials.", + context.getRemoteAddress()); + return false; + } + + if (!supportsAuthRefresh) { + log.warn("[{}] Client doesn't support auth credentials refresh", + context.getRemoteAddress()); + return false; + } + + return true; + } + + public AuthResult refreshAuthentication() throws AuthenticationException { + if (originalAuthState != null) { + return AuthResult.builder() + .authData(originalAuthState.refreshAuthentication()) + .authMethod(originalAuthMethod) + .build(); + } + return AuthResult.builder() + .authData(authState.refreshAuthentication()) + .authMethod(authMethod) + .build(); + } + @Builder @Getter public static class AuthResult { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index abfe5a615b227..224324167b592 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -211,16 +211,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { private final BrokerInterceptor brokerInterceptor; private State state; private volatile boolean isActive = true; - private String authRole = null; - private volatile AuthenticationDataSource authenticationData; - private AuthenticationProvider authenticationProvider; - private AuthenticationState authState; - // In case of proxy, if the authentication credentials are forwardable, - // it will hold the credentials of the original client - private AuthenticationState originalAuthState; - private volatile AuthenticationDataSource originalAuthData; // Keep temporarily in order to verify after verifying proxy's authData - private AuthData originalAuthDataCopy; private boolean pendingAuthChallengeResponse = false; private ScheduledFuture authRefreshTask; private final DefaultAuthenticationRoleLoggingAnonymizer authenticationRoleLoggingAnonymizer; @@ -236,9 +227,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { private String clientSourceAddressAndPort; private int nonPersistentPendingMessages = 0; private final int maxNonPersistentPendingMessages; - private String originalPrincipal = null; private final boolean schemaValidationEnforced; - private String authMethod = "none"; private final int maxMessageSize; private boolean preciseDispatcherFlowControl; @@ -555,12 +544,12 @@ private CompletableFuture isTopicOperationAllowed(TopicName topicName, return CompletableFuture.completedFuture(true); } CompletableFuture result = service.getAuthorizationService().allowTopicOperationAsync( - topicName, operation, originalPrincipal, authRole, + topicName, operation, getOriginalPrincipal(), getAuthRole(), originalAuthDataSource != null ? originalAuthDataSource : authDataSource, authDataSource); result.thenAccept(isAuthorized -> { if (!isAuthorized) { log.warn("Role {} or OriginalRole {} is not authorized to perform operation {} on topic {}", - authRole, originalPrincipal, operation, topicName); + getAuthRole(), getOriginalPrincipal(), operation, topicName); } }); return result; @@ -570,12 +559,13 @@ private CompletableFuture isTopicOperationAllowed(TopicName topicName, TopicOperation operation) { if (service.isAuthorizationEnabled()) { AuthenticationDataSource authDataSource = - new AuthenticationDataSubscription(authenticationData, subscriptionName); - AuthenticationDataSource originalAuthDataSource = null; + new AuthenticationDataSubscription(getAuthenticationData(), subscriptionName); + AuthenticationDataSource originalAuthDataWrapper = null; + AuthenticationDataSource originalAuthData = getOriginalAuthData(); if (originalAuthData != null) { - originalAuthDataSource = new AuthenticationDataSubscription(originalAuthData, subscriptionName); + originalAuthDataWrapper = new AuthenticationDataSubscription(originalAuthData, subscriptionName); } - return isTopicOperationAllowed(topicName, operation, authDataSource, originalAuthDataSource); + return isTopicOperationAllowed(topicName, operation, authDataSource, originalAuthDataWrapper); } else { return CompletableFuture.completedFuture(true); } @@ -617,6 +607,10 @@ protected void handleLookup(CommandLookupTopic lookupParam) { final Semaphore lookupSemaphore = service.getLookupRequestSemaphore(); if (lookupSemaphore.tryAcquire()) { + String authRole = getAuthRole(); + String originalPrincipal = getOriginalPrincipal(); + AuthenticationDataSource authenticationData = getAuthenticationData(); + AuthenticationDataSource originalAuthData = getOriginalAuthData(); isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, authenticationData, originalAuthData).thenApply( isAuthorized -> { if (isAuthorized) { @@ -706,6 +700,8 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa final Semaphore lookupSemaphore = service.getLookupRequestSemaphore(); if (lookupSemaphore.tryAcquire()) { + AuthenticationDataSource authenticationData = getAuthenticationData(); + AuthenticationDataSource originalAuthData = getOriginalAuthData(); isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, authenticationData, originalAuthData).thenApply( isAuthorized -> { if (isAuthorized) { @@ -872,6 +868,9 @@ ByteBuf createConsumerStatsResponse(Consumer consumer, long requestId) { // complete the connect and sent newConnected command private void completeConnect(int clientProtoVersion, String clientVersion) { + String authRole = getAuthRole(); + String originalPrincipal = getOriginalPrincipal(); + String authMethod = getAuthMethod(); if (service.isAuthenticationEnabled()) { if (service.isAuthorizationEnabled()) { if (!service.getAuthorizationService() @@ -944,7 +943,7 @@ private void authenticationFailed(Throwable t) { private void maybeScheduleAuthenticationCredentialsRefresh() { assert ctx.executor().inEventLoop(); assert authRefreshTask == null; - if (authState == null) { + if (getAuthState() == null) { // Authentication is disabled or there's no local state to refresh return; } @@ -956,28 +955,15 @@ private void maybeScheduleAuthenticationCredentialsRefresh() { private void refreshAuthenticationCredentials() { assert ctx.executor().inEventLoop(); - AuthenticationState authState = this.originalAuthState != null ? originalAuthState : this.authState; if (getState() == State.Failed) { // Happens when an exception is thrown that causes this connection to close. return; - } else if (!authState.isExpired()) { + } else if (!binaryAuthSession.isExpired()) { // Credentials are still valid. Nothing to do at this point return; - } else if (originalPrincipal != null && originalAuthState == null) { - // This case is only checked when the authState is expired because we've reached a point where - // authentication needs to be refreshed, but the protocol does not support it unless the proxy forwards - // the originalAuthData. - log.info( - "[{}] Cannot revalidate user credential when using proxy and" - + " not forwarding the credentials. Closing connection", - remoteAddress); - ctx.close(); - return; } - if (!supportsAuthenticationRefresh()) { - log.warn("[{}] Closing connection because client doesn't support auth credentials refresh", - remoteAddress); + if (!binaryAuthSession.supportsAuthenticationRefresh()) { ctx.close(); return; } @@ -990,11 +976,12 @@ private void refreshAuthenticationCredentials() { } log.info("[{}] Refreshing authentication credentials for originalPrincipal {} and authRole {}", - remoteAddress, originalPrincipal, this.authRole); + remoteAddress, getOriginalPrincipal(), getAuthRole()); try { - AuthData brokerData = authState.refreshAuthentication(); - - writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData, + AuthResult refreshAuthentication = binaryAuthSession.refreshAuthentication(); + String authMethod = refreshAuthentication.getAuthMethod(); + writeAndFlush(Commands.newAuthChallenge(authMethod, + refreshAuthentication.getAuthData(), getRemoteEndpointProtocolVersion())); if (log.isDebugEnabled()) { log.debug("[{}] Sent auth challenge to client to refresh credentials with method: {}.", @@ -1131,15 +1118,6 @@ private void handleAuthResult(@NonNull AuthResult authResult) { authResult.getAuthMethod()); } } else { - authMethod = binaryAuthSession.getAuthMethod(); - authenticationData = binaryAuthSession.getAuthenticationData(); - authState = binaryAuthSession.getAuthState(); - authRole = binaryAuthSession.getAuthRole(); - - originalAuthData = binaryAuthSession.getOriginalAuthData(); - originalPrincipal = binaryAuthSession.getOriginalPrincipal(); - originalAuthState = binaryAuthSession.getOriginalAuthState(); - if (state == State.Connecting) { completeConnect(authResult.getClientProtocolVersion(), authResult.getClientVersion()); } @@ -1158,8 +1136,8 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { if (log.isDebugEnabled()) { log.debug("[{}] Handle subscribe command: auth role = {}, original auth role = {}", - remoteAddress, authenticationRoleLoggingAnonymizer.anonymize(authRole), - authenticationRoleLoggingAnonymizer.anonymize(originalPrincipal)); + remoteAddress, authenticationRoleLoggingAnonymizer.anonymize(getAuthRole()), + authenticationRoleLoggingAnonymizer.anonymize(getOriginalPrincipal())); } final String subscriptionName = subscribe.getSubscription(); @@ -1441,7 +1419,7 @@ private SchemaData getSchema(Schema protocolSchema) { .data(protocolSchema.getSchemaData()) .isDeleted(false) .timestamp(System.currentTimeMillis()) - .user(Strings.nullToEmpty(originalPrincipal)) + .user(Strings.nullToEmpty(getOriginalPrincipal())) .type(Commands.getSchemaType(protocolSchema.getType())) .props(protocolSchema.getPropertiesList().stream().collect( Collectors.toMap( @@ -1479,7 +1457,7 @@ protected void handleProducer(final CommandProducer cmdProducer) { } CompletableFuture isAuthorizedFuture = isTopicOperationAllowed( - topicName, TopicOperation.PRODUCE, authenticationData, originalAuthData + topicName, TopicOperation.PRODUCE, getAuthenticationData(), getOriginalAuthData() ); if (!Strings.isNullOrEmpty(initialSubscriptionName)) { @@ -2367,6 +2345,10 @@ private CompletableFuture isNamespaceOperationAllowed(NamespaceName nam return CompletableFuture.completedFuture(true); } CompletableFuture isProxyAuthorizedFuture; + String originalPrincipal = getOriginalPrincipal(); + AuthenticationDataSource originalAuthData = getOriginalAuthData(); + String authRole = getAuthRole(); + AuthenticationDataSource authenticationData = getAuthData(); if (originalPrincipal != null) { isProxyAuthorizedFuture = service.getAuthorizationService().allowNamespaceOperationAsync( namespaceName, operation, originalPrincipal, originalAuthData); @@ -2763,11 +2745,13 @@ protected void handleEndTxn(CommandEndTxn command) { private CompletableFuture isSuperUser() { assert ctx.executor().inEventLoop(); if (service.isAuthenticationEnabled() && service.isAuthorizationEnabled()) { + AuthenticationDataSource originalAuthData = getOriginalAuthData(); + AuthenticationDataSource authenticationData = getAuthData(); CompletableFuture isAuthRoleAuthorized = service.getAuthorizationService().isSuperUser( - authRole, authenticationData); - if (originalPrincipal != null) { + getAuthRole(), authenticationData); + if (getOriginalPrincipal() != null) { CompletableFuture isOriginalPrincipalAuthorized = service.getAuthorizationService() - .isSuperUser(originalPrincipal, + .isSuperUser(getOriginalPrincipal(), originalAuthData != null ? originalAuthData : authenticationData); return isOriginalPrincipalAuthorized.thenCombine(isAuthRoleAuthorized, (originalPrincipal, authRole) -> originalPrincipal && authRole); @@ -3419,7 +3403,7 @@ public BrokerService getBrokerService() { } public String getRole() { - return authRole; + return binaryAuthSession != null ? binaryAuthSession.getAuthRole() : null; } @Override @@ -3446,11 +3430,6 @@ public boolean isBatchMessageCompatibleVersion() { return getRemoteEndpointProtocolVersion() >= ProtocolVersion.v4.getValue(); } - boolean supportsAuthenticationRefresh() { - return features != null && features.isSupportsAuthRefresh(); - } - - boolean supportBrokerMetadata() { return features != null && features.isSupportsBrokerEntryMetadata(); } @@ -3475,29 +3454,44 @@ public boolean isPreciseDispatcherFlowControl() { } public AuthenticationState getAuthState() { - return authState; + return binaryAuthSession != null ? binaryAuthSession.getAuthState() : null; } @Override public AuthenticationDataSource getAuthenticationData() { - return originalAuthData != null ? originalAuthData : authenticationData; + if (binaryAuthSession == null) { + return null; + } + return binaryAuthSession.getAuthenticationData(); } public String getPrincipal() { - return originalPrincipal != null ? originalPrincipal : authRole; + if (binaryAuthSession == null) { + return null; + } + String originalPrincipal = binaryAuthSession.getOriginalPrincipal(); + if (originalPrincipal != null) { + return originalPrincipal; + } + return binaryAuthSession.getAuthRole(); } public AuthenticationProvider getAuthenticationProvider() { - return authenticationProvider; + return binaryAuthSession != null ? binaryAuthSession.getAuthenticationProvider() : null; } @Override public String getAuthRole() { - return authRole; + return binaryAuthSession != null ? binaryAuthSession.getAuthRole() : null; } public String getAuthMethod() { - return authMethod; + return binaryAuthSession != null ? binaryAuthSession.getAuthMethod() : null; + } + + @VisibleForTesting + public BinaryAuthSession getBinaryAuthSession() { + return binaryAuthSession; } public ConcurrentLongHashMap> getConsumers() { @@ -3642,31 +3636,26 @@ public boolean hasProducers() { @VisibleForTesting protected String getOriginalPrincipal() { - return originalPrincipal; + return binaryAuthSession != null ? binaryAuthSession.getOriginalPrincipal() : null; } @VisibleForTesting protected AuthenticationDataSource getAuthData() { - return authenticationData; + return binaryAuthSession != null ? binaryAuthSession.getAuthenticationData() : null; } @VisibleForTesting protected AuthenticationDataSource getOriginalAuthData() { - return originalAuthData; + return binaryAuthSession != null ? binaryAuthSession.getOriginalAuthData() : null; } @VisibleForTesting protected AuthenticationState getOriginalAuthState() { - return originalAuthState; - } - - @VisibleForTesting - protected void setAuthRole(String authRole) { - this.authRole = authRole; + return binaryAuthSession != null ? binaryAuthSession.getOriginalAuthState() : null; } @VisibleForTesting - void setAuthState(AuthenticationState authState) { - this.authState = authState; + void clearBinaryAuthSession() { + this.binaryAuthSession = null; } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 1b1d3a8a64fb9..6420496612eb4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -680,6 +680,7 @@ public void testAuthChallengePrincipalChangeFails() throws Exception { Object responseConnected = getResponse(); assertTrue(responseConnected instanceof CommandConnected); assertEquals(serverCnx.getState(), State.Connected); + assertEquals(serverCnx.getAuthRole(), "pass.client"); assertEquals(serverCnx.getPrincipal(), "pass.client"); assertTrue(serverCnx.isActive()); @@ -1273,7 +1274,7 @@ private class ClientChannel implements Closeable { 4), serverCnx); public ClientChannel() { - serverCnx.setAuthRole(""); + serverCnx.clearBinaryAuthSession(); } public void close(){ if (channel != null && channel.isActive()) { @@ -1681,6 +1682,51 @@ public void testProducerOnNotOwnedTopic() throws Exception { channel.finish(); } + private PulsarAuthorizationProvider injectAuth() throws Exception { + svcConfig.setAuthorizationEnabled(true); + AuthorizationService authorizationService = + spyWithClassAndConstructorArgs(AuthorizationService.class, svcConfig, pulsar.getPulsarResources()); + Field providerField = AuthorizationService.class.getDeclaredField("provider"); + providerField.setAccessible(true); + PulsarAuthorizationProvider authorizationProvider = + spyWithClassAndConstructorArgs(PulsarAuthorizationProvider.class, svcConfig, + pulsar.getPulsarResources()); + providerField.set(authorizationService, authorizationProvider); + doReturn(authorizationService).when(brokerService).getAuthorizationService(); + svcConfig.setAuthorizationEnabled(true); + + AuthenticationService authenticationService = mock(AuthenticationService.class); + // use a dummy authentication provider + AuthenticationProvider authenticationProvider = new AuthenticationProvider() { + @Override + public void initialize(ServiceConfiguration config) throws IOException { + + } + + @Override + public String authenticate(AuthenticationDataSource authData) throws AuthenticationException { + return "role"; + } + + @Override + public String getAuthMethodName() { + return "dummy"; + } + + @Override + public void close() throws IOException { + + } + }; + + String authMethodName = authenticationProvider.getAuthMethodName(); + when(brokerService.getAuthenticationService()).thenReturn(authenticationService); + when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider); + svcConfig.setAuthenticationEnabled(true); + + return authorizationProvider; + } + @Test(timeOut = 30000) public void testProducerCommandWithAuthorizationPositive() throws Exception { AuthorizationService authorizationService = mock(AuthorizationService.class); @@ -1709,32 +1755,27 @@ public void testProducerCommandWithAuthorizationPositive() throws Exception { @Test(timeOut = 30000) public void testNonExistentTopic() throws Exception { - AuthorizationService authorizationService = - spyWithClassAndConstructorArgs(AuthorizationService.class, svcConfig, pulsar.getPulsarResources()); - doReturn(authorizationService).when(brokerService).getAuthorizationService(); - svcConfig.setAuthorizationEnabled(true); - svcConfig.setAuthorizationEnabled(true); - Field providerField = AuthorizationService.class.getDeclaredField("provider"); - providerField.setAccessible(true); - PulsarAuthorizationProvider authorizationProvider = - spyWithClassAndConstructorArgs(PulsarAuthorizationProvider.class, svcConfig, - pulsar.getPulsarResources()); - providerField.set(authorizationService, authorizationProvider); + resetChannel(); + PulsarAuthorizationProvider authorizationProvider = injectAuth(); doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider) .isSuperUser(Mockito.anyString(), Mockito.any(), Mockito.any()); + // Connect + ByteBuf connectCommand = Commands.newConnect("dummy", "", null); + BinaryAuthSession binaryAuthSession = + spyBinaryAuthSession(brokerService.getAuthenticationService(), connectCommand.copy(), svcConfig); + when(brokerService.getAuthenticationService().createBinaryAuthSession(any())).thenReturn(binaryAuthSession); + channel.writeInbound(connectCommand); + Object response = getResponse(); + assertTrue(response instanceof CommandConnected); + // Test producer creation - resetChannel(); - setChannelConnected(); ByteBuf newProducerCmd = Commands.newProducer(nonExistentTopicName, 1 /* producer id */, 1 /* request id */, "prod-name", Collections.emptyMap(), false); channel.writeInbound(newProducerCmd); assertTrue(getResponse() instanceof CommandError); - channel.finish(); // Test consumer creation - resetChannel(); - setChannelConnected(); ByteBuf newSubscribeCmd = Commands.newSubscribe(nonExistentTopicName, // successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0, "test" /* consumer name */, 0); @@ -1745,17 +1786,9 @@ public void testNonExistentTopic() throws Exception { @Test(timeOut = 30000) public void testClusterAccess() throws Exception { - svcConfig.setAuthorizationEnabled(true); - AuthorizationService authorizationService = - spyWithClassAndConstructorArgs(AuthorizationService.class, svcConfig, pulsar.getPulsarResources()); - Field providerField = AuthorizationService.class.getDeclaredField("provider"); - providerField.setAccessible(true); - PulsarAuthorizationProvider authorizationProvider = - spyWithClassAndConstructorArgs(PulsarAuthorizationProvider.class, svcConfig, - pulsar.getPulsarResources()); - providerField.set(authorizationService, authorizationProvider); - doReturn(authorizationService).when(brokerService).getAuthorizationService(); - svcConfig.setAuthorizationEnabled(true); + resetChannel(); + + PulsarAuthorizationProvider authorizationProvider = injectAuth(); doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider) .isSuperUser(Mockito.anyString(), Mockito.any(), Mockito.any()); doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider) @@ -1764,15 +1797,20 @@ public void testClusterAccess() throws Exception { .checkPermission(any(TopicName.class), Mockito.anyString(), any(AuthAction.class)); - resetChannel(); - setChannelConnected(); + // Connect + ByteBuf connectCommand = Commands.newConnect("dummy", "", null); + BinaryAuthSession binaryAuthSession = + spyBinaryAuthSession(brokerService.getAuthenticationService(), connectCommand.copy(), svcConfig); + when(brokerService.getAuthenticationService().createBinaryAuthSession(any())).thenReturn(binaryAuthSession); + channel.writeInbound(connectCommand); + Object response = getResponse(); + assertTrue(response instanceof CommandConnected); + ByteBuf clientCommand = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */, "prod-name", Collections.emptyMap(), false); channel.writeInbound(clientCommand); assertTrue(getResponse() instanceof CommandProducerSuccess); - resetChannel(); - setChannelConnected(); clientCommand = Commands.newProducer(topicWithNonLocalCluster, 1 /* producer id */, 1 /* request id */, "prod-name", Collections.emptyMap(), false); channel.writeInbound(clientCommand); @@ -1782,22 +1820,21 @@ public void testClusterAccess() throws Exception { @Test(timeOut = 30000) public void testNonExistentTopicSuperUserAccess() throws Exception { - AuthorizationService authorizationService = - spyWithClassAndConstructorArgs(AuthorizationService.class, svcConfig, pulsar.getPulsarResources()); - doReturn(authorizationService).when(brokerService).getAuthorizationService(); - svcConfig.setAuthorizationEnabled(true); - Field providerField = AuthorizationService.class.getDeclaredField("provider"); - providerField.setAccessible(true); - PulsarAuthorizationProvider authorizationProvider = - spyWithClassAndConstructorArgs(PulsarAuthorizationProvider.class, svcConfig, - pulsar.getPulsarResources()); - providerField.set(authorizationService, authorizationProvider); + resetChannel(); + PulsarAuthorizationProvider authorizationProvider = injectAuth(); doReturn(CompletableFuture.completedFuture(true)).when(authorizationProvider) .isSuperUser(Mockito.anyString(), Mockito.any(), Mockito.any()); + // Connect + ByteBuf connectCommand = Commands.newConnect("dummy", "", null); + BinaryAuthSession binaryAuthSession = + spyBinaryAuthSession(brokerService.getAuthenticationService(), connectCommand.copy(), svcConfig); + when(brokerService.getAuthenticationService().createBinaryAuthSession(any())).thenReturn(binaryAuthSession); + channel.writeInbound(connectCommand); + Object response = getResponse(); + assertTrue(response instanceof CommandConnected); + // Test producer creation - resetChannel(); - setChannelConnected(); ByteBuf newProducerCmd = Commands.newProducer(nonExistentTopicName, 1 /* producer id */, 1 /* request id */, "prod-name", Collections.emptyMap(), false); channel.writeInbound(newProducerCmd); @@ -1806,11 +1843,8 @@ public void testNonExistentTopicSuperUserAccess() throws Exception { PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(nonExistentTopicName).get(); assertNotNull(topicRef); assertEquals(topicRef.getProducers().size(), 1); - channel.finish(); // Test consumer creation - resetChannel(); - setChannelConnected(); ByteBuf newSubscribeCmd = Commands.newSubscribe(nonExistentTopicName, // successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0, "test" /* consumer name */, 0 /* avoid reseting cursor */); @@ -2916,7 +2950,6 @@ protected void resetChannel() throws Exception { channel.close().get(); } serverCnx = new ServerCnx(pulsar); - serverCnx.setAuthRole(""); channel = new EmbeddedChannel(new LengthFieldBasedFrameDecoder( maxMessageSize, 0, diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java index 5f4456d356e58..7c3b45c2c9562 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java @@ -43,6 +43,7 @@ import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.util.CharsetUtil; import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -50,6 +51,9 @@ import lombok.Getter; import lombok.SneakyThrows; import org.apache.pulsar.PulsarVersion; +import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.authentication.AuthenticationState; +import org.apache.pulsar.broker.authentication.BinaryAuthSession; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationDataProvider; import org.apache.pulsar.client.api.PulsarClientException; @@ -100,9 +104,23 @@ public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection) this.inboundChannel = proxyConnection.ctx().channel(); this.proxyConnection = proxyConnection; this.inboundChannelRequestsRate = new Rate(); - this.originalPrincipal = proxyConnection.clientAuthRole; - this.clientAuthData = proxyConnection.clientAuthData; - this.clientAuthMethod = proxyConnection.clientAuthMethod; + BinaryAuthSession binaryAuthSession = proxyConnection.getBinaryAuthSession(); + if (binaryAuthSession != null) { + AuthenticationState originalAuthState = binaryAuthSession.getOriginalAuthState(); + boolean forwardOriginal = + originalAuthState != null && service.getConfiguration().isForwardAuthorizationCredentials(); + AuthenticationDataSource authDataSource = forwardOriginal ? binaryAuthSession.getOriginalAuthData() : + binaryAuthSession.getAuthenticationData(); + clientAuthData = AuthData.of(authDataSource.getCommandData().getBytes(StandardCharsets.UTF_8)); + clientAuthMethod = + forwardOriginal ? binaryAuthSession.getOriginalAuthMethod() : binaryAuthSession.getAuthMethod(); + originalPrincipal = + forwardOriginal ? binaryAuthSession.getOriginalPrincipal() : binaryAuthSession.getAuthRole(); + } else { + originalPrincipal = null; + clientAuthData = null; + clientAuthMethod = null; + } this.tlsEnabledWithBroker = service.getConfiguration().isTlsEnabledWithBroker(); this.tlsHostnameVerificationEnabled = service.getConfiguration().isTlsHostnameVerificationEnabled(); this.onHandshakeCompleteAction = proxyConnection::cancelKeepAliveTask; diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java index 1e8e2fb55e4a4..d4ed3e936ddc7 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java @@ -21,9 +21,12 @@ import static com.google.common.base.Preconditions.checkArgument; import io.netty.buffer.ByteBuf; import io.netty.channel.EventLoopGroup; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.PulsarVersion; +import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.authentication.BinaryAuthSession; import org.apache.pulsar.client.impl.ClientCnx; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.impl.metrics.InstrumentProvider; @@ -41,33 +44,52 @@ */ public class ProxyClientCnx extends ClientCnx { private final boolean forwardClientAuthData; - private final String clientAuthMethod; - private final String clientAuthRole; + private String clientAuthMethod; + private String clientAuthRole; + private final ProxyConnection proxyConnection; + private final BinaryAuthSession binaryAuthSession; - public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, String clientAuthRole, - String clientAuthMethod, int protocolVersion, + public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, int protocolVersion, boolean forwardClientAuthData, ProxyConnection proxyConnection) { super(InstrumentProvider.NOOP, conf, eventLoopGroup, protocolVersion); - this.clientAuthRole = clientAuthRole; - this.clientAuthMethod = clientAuthMethod; this.forwardClientAuthData = forwardClientAuthData; this.proxyConnection = proxyConnection; + this.binaryAuthSession = proxyConnection.getBinaryAuthSession(); } @Override protected ByteBuf newConnectCommand() throws Exception { + AuthData clientAuthData = null; + if (binaryAuthSession != null) { + clientAuthRole = binaryAuthSession.getAuthRole(); + clientAuthMethod = binaryAuthSession.getAuthMethod(); + if (forwardClientAuthData) { + // There is a chance this auth data is expired because the ProxyConnection does not do early token + // refresh. + // Based on the current design, the best option is to configure the broker to accept slightly stale + // authentication data. + clientAuthData = AuthData.of( + binaryAuthSession.getAuthenticationData().getCommandData().getBytes(StandardCharsets.UTF_8)); + } + + // If original principal is null, it means the client connects the broker via proxy, else the client + // connects the proxy via proxy. + if (binaryAuthSession.getOriginalPrincipal() != null) { + clientAuthRole = binaryAuthSession.getOriginalPrincipal(); + clientAuthMethod = binaryAuthSession.getOriginalAuthMethod(); + AuthenticationDataSource originalAuthData = binaryAuthSession.getOriginalAuthData(); + if (forwardClientAuthData && originalAuthData != null) { + clientAuthData = AuthData.of( + originalAuthData.getCommandData().getBytes(StandardCharsets.UTF_8)); + } + } + } + if (log.isDebugEnabled()) { log.debug("New Connection opened via ProxyClientCnx with params clientAuthRole = {}," + " clientAuthData = {}, clientAuthMethod = {}", - clientAuthRole, proxyConnection.getClientAuthData(), clientAuthMethod); - } - AuthData clientAuthData = null; - if (forwardClientAuthData) { - // There is a chance this auth data is expired because the ProxyConnection does not do early token refresh. - // Based on the current design, the best option is to configure the broker to accept slightly stale - // authentication data. - clientAuthData = proxyConnection.getClientAuthData(); + clientAuthRole, clientAuthData, clientAuthMethod); } authenticationDataProvider = authentication.getAuthData(remoteHostName); AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA); diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java index 6db1b302c6613..dde95edb32ddd 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java @@ -923,6 +923,12 @@ public class ProxyConfiguration implements PulsarConfiguration { ) private String clusterName; + @FieldContext( + category = CATEGORY_AUTHENTICATION, + doc = "If this flag is set then the broker authenticates the original Auth data" + + " else it just accepts the originalPrincipal and authorizes it (if required)") + private boolean authenticateOriginalAuthData = false; + public String getMetadataStoreUrl() { if (StringUtils.isNotBlank(metadataStoreUrl)) { return metadataStoreUrl; diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java index e479b8ee62268..aebf165928f7a 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java @@ -33,6 +33,7 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; +import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -49,8 +50,10 @@ import javax.net.ssl.SSLSession; import lombok.Getter; import org.apache.pulsar.broker.PulsarServerException; -import org.apache.pulsar.broker.authentication.AuthenticationProvider; import org.apache.pulsar.broker.authentication.AuthenticationState; +import org.apache.pulsar.broker.authentication.BinaryAuthContext; +import org.apache.pulsar.broker.authentication.BinaryAuthSession; +import org.apache.pulsar.broker.authentication.BinaryAuthSession.AuthResult; import org.apache.pulsar.broker.limiter.ConnectionController; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.PulsarClientException; @@ -108,14 +111,8 @@ public class ProxyConnection extends PulsarHandler { private Set> pendingBrokerAuthChallenges = null; private final BrokerProxyValidator brokerProxyValidator; private final ConnectionController connectionController; - String clientAuthRole; - volatile AuthData clientAuthData; - String clientAuthMethod; String clientVersion; - private String authMethod = "none"; - AuthenticationProvider authenticationProvider; - AuthenticationState authState; private ClientConfigurationData clientConf; private boolean hasProxyToBrokerUrl; private int protocolVersionToAdvertise; @@ -124,9 +121,10 @@ public class ProxyConnection extends PulsarHandler { private final DefaultAuthenticationRoleLoggingAnonymizer authenticationRoleLoggingAnonymizer; protected static final Integer SPLICE_BYTES = 1024 * 1024 * 1024; - private static final byte[] EMPTY_CREDENTIALS = new byte[0]; boolean isTlsInboundChannel = false; + @Getter + private BinaryAuthSession binaryAuthSession; enum State { Init, @@ -347,17 +345,41 @@ protected static boolean isTlsChannel(Channel channel) { private synchronized void completeConnect() throws PulsarClientException { checkArgument(state == State.Connecting); + String clientAuthRole; + String clientAuthMethod; + String originalAuthRole; + String originalAuthMethod; + if (binaryAuthSession != null) { + clientAuthRole = binaryAuthSession.getAuthRole(); + clientAuthMethod = binaryAuthSession.getAuthMethod(); + originalAuthRole = binaryAuthSession.getOriginalPrincipal(); + originalAuthMethod = binaryAuthSession.getOriginalAuthMethod(); + } else { + clientAuthRole = null; + clientAuthMethod = null; + originalAuthRole = null; + originalAuthMethod = null; + } String maybeAnonymizedClientAuthRole = authenticationRoleLoggingAnonymizer.anonymize(clientAuthRole); - LOG.info("[{}] complete connection, init proxy handler. authenticated with {} role {}, hasProxyToBrokerUrl: {}", - remoteAddress, authMethod, maybeAnonymizedClientAuthRole, hasProxyToBrokerUrl); + String maybeAnonymizedOriginalAuthRole = authenticationRoleLoggingAnonymizer.anonymize(originalAuthRole); + LOG.info("[{}] complete connection, init proxy handler. client authenticated with {} role {}, original " + + "authenticated with {} role {}, hasProxyToBrokerUrl: {}", + remoteAddress, + clientAuthMethod, + maybeAnonymizedClientAuthRole, + originalAuthMethod, + maybeAnonymizedOriginalAuthRole, + hasProxyToBrokerUrl); if (hasProxyToBrokerUrl) { // Optimize proxy connection to fail-fast if the target broker isn't active // Pulsar client will retry connecting after a back off timeout if (service.getConfiguration().isCheckActiveBrokers() && !isBrokerActive(proxyToBrokerUrl)) { state = State.Closing; - LOG.warn("[{}] Target broker '{}' isn't available. authenticated with {} role {}.", - remoteAddress, proxyToBrokerUrl, authMethod, maybeAnonymizedClientAuthRole); + LOG.warn("[{}] Target broker '{}' isn't available. client authenticated with {} role {}, " + + "original authenticated with {} role {}.", + remoteAddress, proxyToBrokerUrl, clientAuthMethod, maybeAnonymizedClientAuthRole, + originalAuthMethod, maybeAnonymizedOriginalAuthRole); final ByteBuf msg = Commands.newError(-1, ServerError.ServiceNotReady, "Target broker isn't available."); writeAndFlushAndClose(msg); @@ -373,14 +395,18 @@ private synchronized void completeConnect() throws PulsarClientException { TargetAddressDeniedException targetAddressDeniedException = (TargetAddressDeniedException) (throwable instanceof TargetAddressDeniedException ? throwable : throwable.getCause()); - - LOG.warn("[{}] Target broker '{}' cannot be validated. {}. authenticated with {} role {}.", + LOG.warn( + "[{}] Target broker '{}' cannot be validated. {}. client authenticated with {} " + + "role {}, original authenticated with {} role {}.", remoteAddress, proxyToBrokerUrl, targetAddressDeniedException.getMessage(), - authMethod, maybeAnonymizedClientAuthRole); + clientAuthMethod, maybeAnonymizedClientAuthRole, originalAuthMethod, + maybeAnonymizedOriginalAuthRole); } else { - LOG.error("[{}] Error validating target broker '{}'. authenticated with {} role {}.", - remoteAddress, proxyToBrokerUrl, authMethod, maybeAnonymizedClientAuthRole, - throwable); + LOG.error("[{}] Error validating target broker '{}'. client authenticated with {} role {}, " + + "original authenticated with {} role {}.", + remoteAddress, proxyToBrokerUrl, clientAuthMethod, maybeAnonymizedClientAuthRole, + originalAuthMethod, + maybeAnonymizedOriginalAuthRole, throwable); } final ByteBuf msg = Commands.newError(-1, ServerError.ServiceNotReady, "Target broker cannot be validated."); @@ -392,8 +418,7 @@ private synchronized void completeConnect() throws PulsarClientException { // and we'll take care of just topics and partitions metadata lookups Supplier clientCnxSupplier; if (service.getConfiguration().isAuthenticationEnabled()) { - clientCnxSupplier = () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, - clientAuthMethod, protocolVersionToAdvertise, + clientCnxSupplier = () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise, service.getConfiguration().isForwardAuthorizationCredentials(), this); } else { clientCnxSupplier = @@ -466,63 +491,18 @@ public void brokerConnected(DirectProxyHandler directProxyHandler, CommandConnec } } - // According to auth result, send newConnected or newAuthChallenge command. - private void doAuthentication(AuthData clientData) - throws Exception { - authState - .authenticateAsync(clientData) - .whenCompleteAsync((authChallenge, throwable) -> { - if (throwable == null) { - authChallengeSuccessCallback(authChallenge); - } else { - authenticationFailedCallback(throwable); - } - }, ctx.executor()); - } - protected void authenticationFailedCallback(Throwable t) { LOG.warn("[{}] Unable to authenticate: ", remoteAddress, t); final ByteBuf msg = Commands.newError(-1, ServerError.AuthenticationError, "Failed to authenticate"); writeAndFlushAndClose(msg); } - // Always run in this class's event loop. - protected void authChallengeSuccessCallback(AuthData authChallenge) { - try { - // authentication has completed, will send newConnected command. - if (authChallenge == null) { - clientAuthRole = authState.getAuthRole(); - if (LOG.isDebugEnabled()) { - LOG.debug("[{}] Client successfully authenticated with {} role {}", - remoteAddress, authMethod, authenticationRoleLoggingAnonymizer.anonymize(clientAuthRole)); - } - - // First connection - if (state == State.Connecting) { - // authentication has completed, will send newConnected command. - completeConnect(); - } - return; - } - - // auth not complete, continue auth with client side. - final ByteBuf msg = Commands.newAuthChallenge(authMethod, authChallenge, protocolVersionToAdvertise); - writeAndFlush(msg); - if (LOG.isDebugEnabled()) { - LOG.debug("[{}] Authentication in progress client by method {}.", - remoteAddress, authMethod); - } - } catch (Exception e) { - authenticationFailedCallback(e); - } - } - private void refreshAuthenticationCredentialsAndCloseIfTooExpired() { assert ctx.executor().inEventLoop(); if (state != State.ProxyLookupRequests) { // Happens when an exception is thrown that causes this connection to close. return; - } else if (!authState.isExpired()) { + } else if (!binaryAuthSession.isExpired()) { // Credentials are still valid. Nothing to do at this point return; } @@ -539,8 +519,7 @@ private void refreshAuthenticationCredentialsAndCloseIfTooExpired() { private void maybeSendAuthChallenge() { assert ctx.executor().inEventLoop(); - if (!supportsAuthenticationRefresh()) { - LOG.warn("[{}] Closing connection because client doesn't support auth credentials refresh", remoteAddress); + if (!binaryAuthSession.supportsAuthenticationRefresh()) { ctx.close(); return; } else if (authChallengeSentTime != Long.MAX_VALUE) { @@ -559,11 +538,12 @@ private void maybeSendAuthChallenge() { LOG.debug("[{}] Refreshing authentication credentials", remoteAddress); } try { - AuthData challenge = authState.refreshAuthentication(); - writeAndFlush(Commands.newAuthChallenge(authMethod, challenge, protocolVersionToAdvertise)); + AuthResult authResult = binaryAuthSession.refreshAuthentication(); + writeAndFlush(Commands.newAuthChallenge(authResult.getAuthMethod(), authResult.getAuthData(), + protocolVersionToAdvertise)); if (LOG.isDebugEnabled()) { LOG.debug("[{}] Sent auth challenge to client to refresh credentials with method: {}.", - remoteAddress, authMethod); + remoteAddress, authResult.getAuthMethod()); } authChallengeSentTime = System.nanoTime(); } catch (AuthenticationException e) { @@ -601,15 +581,6 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(), return; } - if (connect.hasProxyVersion()) { - if (LOG.isDebugEnabled()) { - LOG.debug("[{}] Client illegally provided proxyVersion.", remoteAddress); - } - state = State.Closing; - writeAndFlushAndClose(Commands.newError(-1, ServerError.NotAllowedError, "Must not provide proxyVersion")); - return; - } - try { // init authn this.clientConf = createClientConfiguration(); @@ -620,39 +591,6 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(), return; } - AuthData clientData = AuthData.of(connect.hasAuthData() ? connect.getAuthData() : EMPTY_CREDENTIALS); - if (connect.hasAuthMethodName()) { - authMethod = connect.getAuthMethodName(); - } else if (connect.hasAuthMethod()) { - // Legacy client is passing enum - authMethod = connect.getAuthMethod().name().substring(10).toLowerCase(); - } else { - authMethod = "none"; - } - - if (service.getConfiguration().isForwardAuthorizationCredentials()) { - // We store the first clientData here. Before this commit, we stored the last clientData. - // Since this only works when forwarding single staged authentication, first == last is true. - // Here is an issue to fix the protocol: https://github.com/apache/pulsar/issues/19291. - this.clientAuthData = clientData; - this.clientAuthMethod = authMethod; - } - - authenticationProvider = service - .getAuthenticationService() - .getAuthenticationProvider(authMethod); - - // Not find provider named authMethod. Most used for tests. - // In AuthenticationDisabled, it will set authMethod "none". - if (authenticationProvider == null) { - clientAuthRole = service.getAuthenticationService().getAnonymousUserRole() - .orElseThrow(() -> - new AuthenticationException("No anonymous role, and no authentication provider configured")); - - completeConnect(); - return; - } - // init authState and other var ChannelHandler sslHandler = ctx.channel().pipeline().get(PulsarChannelInitializer.TLS_HANDLER); SSLSession sslSession = null; @@ -660,8 +598,59 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(), sslSession = ((SslHandler) sslHandler).engine().getSession(); } - authState = authenticationProvider.newAuthState(clientData, remoteAddress, sslSession); - doAuthentication(clientData); + binaryAuthSession = service.getAuthenticationService().createBinaryAuthSession(BinaryAuthContext.builder() + .executor(ctx.executor()) + .remoteAddress(remoteAddress) + .sslSession(sslSession) + .authenticationService(service.getAuthenticationService()) + .commandConnect(connect) + .isConnectingSupplier(() -> state == State.Connecting) + .authenticateOriginalAuthData(service.getConfiguration().isAuthenticateOriginalAuthData()) + .build()); + binaryAuthSession.doAuthentication() + .whenCompleteAsync((authResult, ex) -> { + if (ex != null) { + authenticationFailedCallback(ex); + } else { + handleAuthResult(authResult); + } + }, ctx.executor()); + } catch (Exception e) { + authenticationFailedCallback(e); + } + } + + private void handleAuthResult(BinaryAuthSession.AuthResult authResult) { + try { + AuthData authData = authResult.getAuthData(); + if (authData != null) { + writeAndFlush(Commands.newAuthChallenge( + authResult.getAuthMethod(), + authData, + protocolVersionToAdvertise)); + if (LOG.isDebugEnabled()) { + LOG.debug("[{}] Authentication in progress client by method {}.", remoteAddress, + authResult.getAuthMethod()); + } + } else { + // First connection + if (state == State.Connecting) { + // authentication has completed, will send newConnected command. + completeConnect(); + } else { + if (service.getConfiguration().isForwardAuthorizationCredentials()) { + // We only have pendingBrokerAuthChallenges when forwardAuthorizationCredentials is enabled. + if (pendingBrokerAuthChallenges != null && !pendingBrokerAuthChallenges.isEmpty()) { + // Send auth data to pending challenges from the broker + for (CompletableFuture challenge : pendingBrokerAuthChallenges) { + challenge.complete(AuthData.of(getFinalAuthState().getAuthDataSource().getCommandData() + .getBytes(StandardCharsets.UTF_8))); + } + pendingBrokerAuthChallenges.clear(); + } + } + } + } } catch (Exception e) { authenticationFailedCallback(e); } @@ -677,6 +666,12 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) { remoteAddress, authResponse.getResponse().getAuthMethodName()); } + if (binaryAuthSession == null) { + authenticationFailedCallback( + new AuthenticationException("Authentication session is null or not initialized")); + return; + } + try { // Reset the auth challenge sent time to indicate we are not waiting on a client response. authChallengeSentTime = Long.MAX_VALUE; @@ -685,24 +680,16 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) { // Note: this implementation relies on the current weakness that prevents multi-stage authentication // from working when forwardAuthorizationCredentials is enabled. Here is an issue to fix the protocol: // https://github.com/apache/pulsar/issues/19291. - doAuthentication(clientData); - if (service.getConfiguration().isForwardAuthorizationCredentials()) { - // Update the clientAuthData to be able to initialize future ProxyClientCnx. - this.clientAuthData = clientData; - // We only have pendingBrokerAuthChallenges when forwardAuthorizationCredentials is enabled. - if (pendingBrokerAuthChallenges != null && !pendingBrokerAuthChallenges.isEmpty()) { - // Send auth data to pending challenges from the broker - for (CompletableFuture challenge : pendingBrokerAuthChallenges) { - challenge.complete(clientData); - } - pendingBrokerAuthChallenges.clear(); - } - } + binaryAuthSession.authChallenge(clientData, false, protocolVersionToAdvertise, clientVersion) + .whenCompleteAsync((authResult, ex) -> { + if (ex != null) { + authenticationFailedCallback(ex); + } else { + handleAuthResult(authResult); + } + }, ctx.executor()); } catch (Exception e) { - String errorMsg = "Unable to handleAuthResponse"; - LOG.warn("[{}] {} ", remoteAddress, errorMsg, e); - final ByteBuf msg = Commands.newError(-1, ServerError.AuthenticationError, errorMsg); - writeAndFlushAndClose(msg); + authenticationFailedCallback(e); } } @@ -846,12 +833,10 @@ private void writeAndFlushAndClose(ByteBuf cmd) { NettyChannelUtil.writeAndFlushWithClosePromise(ctx, cmd); } - boolean supportsAuthenticationRefresh() { - return features != null && features.isSupportsAuthRefresh(); - } - - AuthData getClientAuthData() { - return clientAuthData; + private AuthenticationState getFinalAuthState() { + AuthenticationState originalAuthState = binaryAuthSession.getOriginalAuthState(); + AuthenticationState authState = binaryAuthSession.getAuthState(); + return originalAuthState != null ? originalAuthState : authState; } /** @@ -861,9 +846,11 @@ AuthData getClientAuthData() { CompletableFuture getValidClientAuthData() { final CompletableFuture clientAuthDataFuture = new CompletableFuture<>(); ctx().executor().execute(Runnables.catchingAndLoggingThrowables(() -> { + AuthenticationState finalAuthState = getFinalAuthState(); // authState is not thread safe, so this must run on the ProxyConnection's event loop. - if (!authState.isExpired()) { - clientAuthDataFuture.complete(clientAuthData); + if (!finalAuthState.isExpired()) { + clientAuthDataFuture.complete(AuthData.of( + finalAuthState.getAuthDataSource().getCommandData().getBytes(StandardCharsets.UTF_8))); } else if (state == State.ProxyLookupRequests) { maybeSendAuthChallenge(); if (pendingBrokerAuthChallenges == null) { diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java index 6887e9ea234c1..938da0514a3c1 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java @@ -252,7 +252,7 @@ void testAuthentication() throws Exception { // Step 4: Ensure that all client contexts share the same auth provider Assert.assertTrue(proxyService.getClientCnxs().size() >= 3, "expect at least 3 clients"); proxyService.getClientCnxs().stream().forEach((cnx) -> { - Assert.assertSame(cnx.authenticationProvider, + Assert.assertSame(cnx.getBinaryAuthSession().getAuthenticationProvider(), proxyService.getAuthenticationService().getAuthenticationProvider("BasicAuthentication")); }); } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyToProxyAuthenticationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyToProxyAuthenticationTest.java new file mode 100644 index 0000000000000..5fc0b502311bd --- /dev/null +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyToProxyAuthenticationTest.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.proxy.server; + +import static org.assertj.core.api.Assertions.assertThat; +import io.jsonwebtoken.Jwts; +import io.jsonwebtoken.SignatureAlgorithm; +import java.util.Base64; +import java.util.Collections; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import javax.crypto.SecretKey; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; +import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; +import org.apache.pulsar.broker.service.ServerCnx; +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.impl.auth.AuthenticationToken; +import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; +import org.apache.pulsar.common.naming.TopicName; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Slf4j +public class ProxyToProxyAuthenticationTest extends ProducerConsumerBase { + private static final String CLUSTER_NAME = "test"; + + private static final String ADMIN_ROLE = "admin"; + private static final String PROXY_ROLE = "proxy"; + private static final String BROKER_ROLE = "broker"; + private static final String CLIENT_ROLE = "client"; + private static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256); + + private static final String ADMIN_TOKEN = Jwts.builder().setSubject(ADMIN_ROLE).signWith(SECRET_KEY).compact(); + private static final String PROXY_TOKEN = Jwts.builder().setSubject(PROXY_ROLE).signWith(SECRET_KEY).compact(); + private static final String BROKER_TOKEN = Jwts.builder().setSubject(BROKER_ROLE).signWith(SECRET_KEY).compact(); + private static final String CLIENT_TOKEN = Jwts.builder().setSubject(CLIENT_ROLE).signWith(SECRET_KEY).compact(); + + @BeforeMethod + @Override + protected void setup() throws Exception { + conf.setAuthenticateOriginalAuthData(false); + conf.setAuthenticationEnabled(true); + conf.getProperties().setProperty("tokenSecretKey", "data:;base64," + + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded())); + + Set superUserRoles = new HashSet<>(); + superUserRoles.add(ADMIN_ROLE); + superUserRoles.add(PROXY_ROLE); + superUserRoles.add(BROKER_ROLE); + conf.setSuperUserRoles(superUserRoles); + conf.setProxyRoles(Collections.singleton(PROXY_ROLE)); + + conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName()); + conf.setBrokerClientAuthenticationParameters(BROKER_TOKEN); + Set providers = new HashSet<>(); + providers.add(AuthenticationProviderToken.class.getName()); + conf.setAuthenticationProviders(providers); + + conf.setClusterName(CLUSTER_NAME); + super.init(); + + admin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getWebServiceAddress()).authentication( + AuthenticationFactory.token(ADMIN_TOKEN)).build(); + producerBaseSetup(); + } + + @AfterMethod(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + private ProxyService createProxyService(String serviceUrl) throws Exception { + ProxyConfiguration proxyConfig = new ProxyConfiguration(); + proxyConfig.setForwardAuthorizationCredentials(true); + proxyConfig.setAuthenticateOriginalAuthData(true); + proxyConfig.setAuthenticationEnabled(true); + proxyConfig.setServicePort(Optional.of(0)); + proxyConfig.setBrokerProxyAllowedTargetPorts("*"); + proxyConfig.setWebServicePort(Optional.of(0)); + proxyConfig.setBrokerServiceURL(serviceUrl); + proxyConfig.setClusterName(CLUSTER_NAME); + + proxyConfig.getProperties().setProperty("tokenSecretKey", "data:;base64," + + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded())); + + proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName()); + proxyConfig.setBrokerClientAuthenticationParameters(PROXY_TOKEN); + + Set providers = new HashSet<>(); + providers.add(AuthenticationProviderToken.class.getName()); + proxyConfig.setAuthenticationProviders(providers); + AuthenticationService authenticationService = new AuthenticationService( + PulsarConfigurationLoader.convertFrom(proxyConfig)); + Authentication proxyClientAuthentication = + AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + ProxyService proxyService = new ProxyService(proxyConfig, authenticationService, proxyClientAuthentication); + proxyService.start(); + return proxyService; + } + + @Test + public void testClientConnectsThroughTwoAuthenticatedProxiesToBroker() throws Exception { + @Cleanup + ProxyService az1Proxy = createProxyService(pulsar.getBrokerServiceUrl()); + String az1ProxyServiceUrl = az1Proxy.getServiceUrl(); + @Cleanup + ProxyService az2Proxy = createProxyService(az1ProxyServiceUrl); + @Cleanup + PulsarClient pulsarClient = + PulsarClient.builder().serviceUrl(az2Proxy.getServiceUrl()) + .authentication(AuthenticationFactory.token(CLIENT_TOKEN)) + .build(); + String topic = TopicName.get("test-topic").toString(); + String subscription1 = "test-subscription"; + @Cleanup + Consumer consumer1 = + pulsarClient.newConsumer().topic(topic).subscriptionName(subscription1).subscribe(); + String subscription2 = "test2-subscription"; + @Cleanup + Consumer consumer2 = + pulsarClient.newConsumer().topic(topic).subscriptionName(subscription2).subscribe(); + + CompletableFuture> topicIfExists = pulsar.getBrokerService().getTopicIfExists(topic); + assertThat(topicIfExists).succeedsWithin(3, TimeUnit.SECONDS); + Topic topicRef = topicIfExists.get().orElseThrow(); + topicRef.getSubscriptions().forEach((key, value) -> { + ServerCnx cnx = (ServerCnx) value.getConsumers().get(0).cnx(); + assertThat(cnx.getAuthRole()).isEqualTo(PROXY_ROLE); + assertThat(cnx.getBinaryAuthSession().getOriginalPrincipal()).isEqualTo(CLIENT_ROLE); + }); + + consumer1.close(); + consumer2.close(); + } +} From 759e8ca3a58a5208e04fa15daa18d386da48c238 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Mon, 27 Oct 2025 23:41:34 +0800 Subject: [PATCH 15/17] Fix code style --- .../java/org/apache/pulsar/proxy/server/ProxyConnection.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java index aebf165928f7a..89c2babc8e46d 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java @@ -418,8 +418,9 @@ private synchronized void completeConnect() throws PulsarClientException { // and we'll take care of just topics and partitions metadata lookups Supplier clientCnxSupplier; if (service.getConfiguration().isAuthenticationEnabled()) { - clientCnxSupplier = () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise, - service.getConfiguration().isForwardAuthorizationCredentials(), this); + clientCnxSupplier = + () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise, + service.getConfiguration().isForwardAuthorizationCredentials(), this); } else { clientCnxSupplier = () -> new ClientCnx(InstrumentProvider.NOOP, clientConf, service.getWorkerGroup(), From b97b6e2632cc9e59815ca684d05145734ddd6577 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Tue, 28 Oct 2025 10:19:13 +0800 Subject: [PATCH 16/17] [fix][proxy] Handle null command data --- .../pulsar/proxy/server/DirectProxyHandler.java | 7 ++++++- .../apache/pulsar/proxy/server/ProxyClientCnx.java | 12 ++++++++---- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java index 7c3b45c2c9562..c4c7ab821d908 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java @@ -111,7 +111,12 @@ public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection) originalAuthState != null && service.getConfiguration().isForwardAuthorizationCredentials(); AuthenticationDataSource authDataSource = forwardOriginal ? binaryAuthSession.getOriginalAuthData() : binaryAuthSession.getAuthenticationData(); - clientAuthData = AuthData.of(authDataSource.getCommandData().getBytes(StandardCharsets.UTF_8)); + String commandData = authDataSource.getCommandData(); + if (commandData != null) { + clientAuthData = AuthData.of(commandData.getBytes(StandardCharsets.UTF_8)); + } else { + clientAuthData = null; + } clientAuthMethod = forwardOriginal ? binaryAuthSession.getOriginalAuthMethod() : binaryAuthSession.getAuthMethod(); originalPrincipal = diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java index d4ed3e936ddc7..e2acfa5e2075c 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java @@ -69,8 +69,10 @@ protected ByteBuf newConnectCommand() throws Exception { // refresh. // Based on the current design, the best option is to configure the broker to accept slightly stale // authentication data. - clientAuthData = AuthData.of( - binaryAuthSession.getAuthenticationData().getCommandData().getBytes(StandardCharsets.UTF_8)); + String commandData = binaryAuthSession.getAuthenticationData().getCommandData(); + if (commandData != null) { + clientAuthData = AuthData.of(commandData.getBytes(StandardCharsets.UTF_8)); + } } // If original principal is null, it means the client connects the broker via proxy, else the client @@ -80,8 +82,10 @@ protected ByteBuf newConnectCommand() throws Exception { clientAuthMethod = binaryAuthSession.getOriginalAuthMethod(); AuthenticationDataSource originalAuthData = binaryAuthSession.getOriginalAuthData(); if (forwardClientAuthData && originalAuthData != null) { - clientAuthData = AuthData.of( - originalAuthData.getCommandData().getBytes(StandardCharsets.UTF_8)); + String commandData = originalAuthData.getCommandData(); + if (commandData != null) { + clientAuthData = AuthData.of(commandData.getBytes(StandardCharsets.UTF_8)); + } } } } From 6f168edf3a78d7344ef79983cf8a3f327cd4a814 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Tue, 28 Oct 2025 11:57:57 +0800 Subject: [PATCH 17/17] Add javadoc and fix test --- .../authentication/AuthenticationService.java | 9 ++ .../authentication/BinaryAuthContext.java | 57 +++++-- .../authentication/BinaryAuthSession.java | 146 +++++++++++++++++- .../pulsar/broker/service/ServerCnx.java | 2 +- .../pulsar/broker/service/ServerCnxTest.java | 2 +- .../pulsar/proxy/server/ProxyConnection.java | 2 +- 6 files changed, 196 insertions(+), 22 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java index 4f2db07b8dbcf..285b1a7b912e8 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java @@ -245,6 +245,15 @@ public void close() throws IOException { } } + /** + * Creates a binary authentication session for Pulsar's binary protocol. + *

+ * This method initializes a {@link BinaryAuthSession} using the provided authentication context. + * Both the proxy and broker can use this method to perform binary protocol authentication. + * + * @param ctx the binary authentication context containing authentication state and credentials + * @return a new {@link BinaryAuthSession} instance + */ public BinaryAuthSession createBinaryAuthSession(BinaryAuthContext ctx) { return new BinaryAuthSession(ctx); } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthContext.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthContext.java index 0757a3e97d9f4..5739156764441 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthContext.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthContext.java @@ -26,45 +26,78 @@ import lombok.Getter; import org.apache.pulsar.common.api.proto.CommandConnect; +/** + * Context object that encapsulates all information required to perform binary protocol + * authentication for a client connection. + *

+ * This context is used by {@link BinaryAuthSession} to manage authentication state, + * credentials, and related connection details during the authentication process. + */ @Getter @Builder public class BinaryAuthContext { /** - * The CommandConnect object representing the client's connection request. + * The {@link CommandConnect} object representing the client's connection request. */ private CommandConnect commandConnect; /** - * The SSLSession associated with the connection, if SSL/TLS is used. + * The {@link SSLSession} associated with the connection, if TLS/SSL is used. + * + *

May be {@code null} for non-TLS connections. When present, authenticators + * can use session details (peer certificates, cipher suite, etc.) as part of + * the authentication decision. */ private SSLSession sslSession; /** - * The AuthenticationService used to perform authentication for this context. + * The {@link AuthenticationService} used to perform authentication operations. + * + *

This is typically the broker-level service that coordinates available + * authentication providers and performs lifecycle operations such as + * verifying credentials or initiating challenges. */ private AuthenticationService authenticationService; /** - * The Executor to use for asynchronous authentication operations. - * Must be provided if authentication involves async tasks. + * The executor used to perform asynchronous authentication operations. + *

+ * This should be the Netty event loop executor associated with the current connection, + * ensuring that authentication tasks run on the same event loop thread. */ private Executor executor; /** - * The remote address of the client initiating the connection. + * The remote {@link SocketAddress} of the client initiating the connection. + * + *

This may be used for audit, logging, access control decisions, or for + * binding authentication state to the client's address. */ private SocketAddress remoteAddress; /** - * If true, authentication should be performed using the original authentication data - * provided by the client, rather than any intermediate or proxy data. - * Set to true when authenticating the initial client request. + * Indicates whether to authenticate the client's original authentication data + * instead of simply trusting the provided principal. + *

+ * When set to {@code true}, the session re-validates the original authentication data + * sent by the client. When set to {@code false}, it skips re-authentication + * and only authorizes the provided principal if necessary. */ private boolean authenticateOriginalAuthData; /** - * Supplier indicating whether the connection is currently in the process of connecting. - * Used to determine connection state during authentication. + * A supplier that indicates whether the current connection is still in the + * initial connect phase. + * + *

When this supplier returns {@code true}, the connection is being + * established and the broker should treat the incoming data as part of the + * initial connect handling. When it returns {@code false}, the + * client is already marked connected; subsequent authentication events + * represent refreshes or re-authentication. + * + *

Using a supplier allows deferred evaluation of the initial-connect state + * (for example, if connection state may change between when the context is + * created and when authentication is executed). */ - private Supplier isConnectingSupplier; + private Supplier isInitialConnectSupplier; } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java index a6ac908d9eac4..a95976e9a0c7f 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java @@ -28,35 +28,87 @@ import org.apache.pulsar.common.api.AuthData; import org.jspecify.annotations.NonNull; +/** + * Represents a per-connection authentication session for a client using the broker's binary protocol. + * + *

This class manages the complete authentication lifecycle for a single client connection. + * It tracks the current {@code AuthenticationState}, authentication provider, method, and role, + * as well as the resolved {@code AuthenticationDataSource}. When a proxy is involved, it can + * also store the original credentials and authentication state forwarded by the proxy. + * + *

{@code BinaryAuthSession} handles both the initial authentication (CONNECT) and + * subsequent re-authentication or credential refresh flows. All asynchronous operations + * are executed using the {@link BinaryAuthContext#getExecutor() executor} provided by the + * associated {@link BinaryAuthContext}. + * + *

The session supports two main connection scenarios: + * + *

Direct client-to-broker connections:

+ *
    + *
  • The client and broker may exchange authentication data multiple times until + * authentication is complete.
  • + *
  • If credentials expire, the broker requests the client to refresh them, + * ensuring that the role remains consistent across refreshes.
  • + *
+ * + *

Client-to-broker connections via a proxy:

+ *
    + *
  • The proxy may optionally forward the original client's authentication data.
  • + *
  • The broker first authenticates the proxy, then optionally validates the + * original client's credentials if forwarded.
  • + *
  • {@code originalAuthState} is non-null when the proxy has forwarded the original + * authentication data and the broker is configured to authenticate it.
  • + *
  • Proxy authentication does not expire. The proxy acts as a transparent intermediary, + * and subsequent client credential refreshes occur directly between the client and broker.
  • + *
+ */ @Slf4j @Getter public class BinaryAuthSession { private static final byte[] emptyArray = new byte[0]; + /// Current authentication state of the connected client. private AuthenticationState authState; + // Authentication method used by the connected client. private String authMethod; + // Role of the connected client as determined by authentication. private String authRole = null; + // Authentication data for the connected client (volatile for visibility across threads). private volatile AuthenticationDataSource authenticationData; + // Authentication provider associated with this session. private AuthenticationProvider authenticationProvider; - // In case of proxy, if the authentication credentials are forwardable, - // it will hold the credentials of the original client + // Original authentication method forwarded by proxy, if any. private String originalAuthMethod; + // Original principal forwarded by proxy, if any. private String originalPrincipal = null; + // Original authentication state forwarded by proxy, if any. private AuthenticationState originalAuthState; + // Original authentication data forwarded by proxy (volatile for thread visibility). private volatile AuthenticationDataSource originalAuthData; // Keep temporarily in order to verify after verifying proxy's authData private AuthData originalAuthDataCopy; + // Context holding connection-specific data needed for authentication. private final BinaryAuthContext context; + // Default authentication result returned after successful initial authentication private AuthResult defaultAuthResult; + // Indicates whether the client supports authentication refresh. private boolean supportsAuthRefresh; public BinaryAuthSession(@NonNull BinaryAuthContext context) { this.context = context; } + /** + * Performs the initial authentication process for the client connection. + *

+ * This method handles both standard CONNECT authentication and optional original credentials + * forwarded by a proxy. Authentication may be asynchronous and results in a {@link AuthResult}. + * + * @return a {@link CompletableFuture} that completes with the authentication result + */ public CompletableFuture doAuthentication() { var connect = context.getCommandConnect(); try { @@ -156,12 +208,17 @@ public CompletableFuture doAuthentication() { } } - - // According to auth result, send Connected, AuthChallenge, or Error command. + /** + * Processes the authentication step when the broker receives an authentication response from the client. + * + *

If {@code useOriginalAuthState} is {@code true}, the authentication is performed + * against the original credentials forwarded by a proxy. Otherwise, the primary + * session {@code authState} is used. + */ public CompletableFuture authChallenge(AuthData clientData, boolean useOriginalAuthState, int clientProtocolVersion, - String clientVersion) { + String clientVersion) { // The original auth state can only be set on subsequent auth attempts (and only // in presence of a proxy and if the proxy is forwarding the credentials). // In this case, the re-validation needs to be done against the original client @@ -178,6 +235,22 @@ public CompletableFuture authChallenge(AuthData clientData, context.getExecutor()); } + /** + * Callback invoked when an authentication step completes on the {@link AuthenticationState}. + * + *

If {@code authChallenge} is non-null, the authentication exchange is not yet complete. + * An {@link AuthResult} containing the challenge bytes is returned for the broker or proxy + * to send to the client. This method does not send data itself. + * + *

If {@code authChallenge} is null, the authentication step is complete. In that case, this + * method will: + *

    + *
  • For the initial connection: set the resolved authentication data and role, and + * optionally authenticate original proxy-forwarded credentials.
  • + *
  • For a refresh: validate that the role remains the same and update stored authentication + * data accordingly.
  • + *
+ */ public CompletableFuture authChallengeSuccessCallback(AuthData authChallenge, boolean useOriginalAuthState, String authRole, @@ -193,7 +266,7 @@ public CompletableFuture authChallengeSuccessCallback(AuthData authC String newAuthRole = authState.getAuthRole(); AuthenticationDataSource newAuthDataSource = authState.getAuthDataSource(); - if (context.getIsConnectingSupplier().get()) { + if (context.getIsInitialConnectSupplier().get()) { // Set the auth data and auth role if (!useOriginalAuthState) { this.authRole = newAuthRole; @@ -242,6 +315,9 @@ public CompletableFuture authChallengeSuccessCallback(AuthData authC return CompletableFuture.completedFuture(defaultAuthResult); } + /** + * Performs authentication of the original client credentials forwarded by a proxy. + */ private CompletableFuture authenticateOriginalData() { return originalAuthState .authenticateAsync(originalAuthDataCopy) @@ -270,6 +346,12 @@ private CompletableFuture authenticateOriginalData() { }, context.getExecutor()); } + /** + * Returns whether the current effective authentication state for this session has expired. + * + *

If the session has an {@code originalAuthState} forwarded by a proxy, that state is + * checked first. Otherwise, the session's primary {@code authState} is used. + */ public boolean isExpired() { if (originalAuthState != null) { return originalAuthState.isExpired(); @@ -277,7 +359,17 @@ public boolean isExpired() { return authState.isExpired(); } - public boolean supportsAuthenticationRefresh(){ + /** + * Determines whether the session supports authentication refresh. + * + *

Refresh is not supported when: + *

    + *
  • the client indicated it does not support auth refresh via feature flags
  • + *
  • the session is a proxied connection with an original principal but the proxy did not forward original + * credentials (so re-validation of the original user is impossible)
  • + *
+ */ + public boolean supportsAuthenticationRefresh() { if (originalPrincipal != null && originalAuthState == null) { // This case is only checked when the authState is expired because we've reached a point where // authentication needs to be refreshed, but the protocol does not support it unless the proxy forwards @@ -298,6 +390,16 @@ public boolean supportsAuthenticationRefresh(){ return true; } + /** + * Refreshes the authentication credentials for this session. + * + *

If the session has an {@code originalAuthState} (i.e., credentials forwarded by a proxy + * and broker is configured to authenticate them), the refresh is performed on that state. + * Otherwise, the primary {@code authState} is refreshed. + * + *

The returned {@link AuthResult} contains the updated authentication data and the + * corresponding authentication method. + */ public AuthResult refreshAuthentication() throws AuthenticationException { if (originalAuthState != null) { return AuthResult.builder() @@ -311,12 +413,42 @@ public AuthResult refreshAuthentication() throws AuthenticationException { .build(); } + /** + * Result container for an authentication operation performed by {@link BinaryAuthSession}. + * + *

Holds the optional client protocol/version metadata and the authentication payload + * produced by the underlying authentication provider. This object is returned to the + * broker/proxy to indicate either a completed authentication (no authData) or a pending + * authentication exchange that requires sending {@code authData} back to the client. + */ @Builder @Getter public static class AuthResult { + /** + * Client protocol version used to format protocol-level responses. + * + *

This value is used by the broker or proxy when building response frames so the + * client can interpret any returned authentication bytes correctly. + */ int clientProtocolVersion; + + /** + * Human-readable client version string, if provided by the client. + */ String clientVersion; + + /** + * Authentication data produced by the authentication provider. + * + *

When non-null, these bytes represent a challenge or credentials that must be + * sent to the client to continue the authentication handshake. When null, no further + * client exchange is required and authentication is considered complete. + */ AuthData authData; + + /** + * Identifier of the authentication method associated with {@code authData}. + */ String authMethod; } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 224324167b592..804b0797c45d2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1057,7 +1057,7 @@ protected void handleConnect(CommandConnect connect) { .sslSession(sslSession) .authenticationService(service.getAuthenticationService()) .commandConnect(connect) - .isConnectingSupplier(() -> state != State.Connected) + .isInitialConnectSupplier(() -> state != State.Connected) .authenticateOriginalAuthData(service.getPulsar().getConfig().isAuthenticateOriginalAuthData()) .build()); binaryAuthSession.doAuthentication() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 6420496612eb4..5eb0f801a000e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -540,7 +540,7 @@ private BinaryAuthSession spyBinaryAuthSession(AuthenticationService authenticat when(binaryAuthContext.isAuthenticateOriginalAuthData()).thenReturn( serviceConfiguration.isAuthenticateOriginalAuthData()); when(binaryAuthContext.getExecutor()).thenReturn(serverCnx.ctx().executor()); - when(binaryAuthContext.getIsConnectingSupplier()).thenReturn(() -> serverCnx.getState() != State.Connected); + when(binaryAuthContext.getIsInitialConnectSupplier()).thenReturn(() -> serverCnx.getState() != State.Connected); BinaryAuthSession binaryAuthSession = spy(new BinaryAuthSession(binaryAuthContext)); ByteBuf copy = connectCommand.copy(); BaseCommand cmd = new BaseCommand(); diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java index 89c2babc8e46d..9dde8edd18047 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java @@ -605,7 +605,7 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(), .sslSession(sslSession) .authenticationService(service.getAuthenticationService()) .commandConnect(connect) - .isConnectingSupplier(() -> state == State.Connecting) + .isInitialConnectSupplier(() -> state == State.Connecting) .authenticateOriginalAuthData(service.getConfiguration().isAuthenticateOriginalAuthData()) .build()); binaryAuthSession.doAuthentication()