diff --git a/README.md b/README.md index fd2b1b5..0a98208 100644 --- a/README.md +++ b/README.md @@ -2,9 +2,9 @@ Apache Kafka client library providing additional integrations relating to OAuth/OIDC integrations with Confluent Cloud and Apache Kafka. -## Authenticating to Confluent Cloud via OAuth, using Azure Managed Identites +## Authenticating to Confluent Cloud via OAuth, using Azure Managed Identities -Example Kafka client config and JAAS config: +Example Kafka client config and JAAS config for authenticating to Confluent Cloud using Azure Managed Identities / Pod Identity: ``` bootstrap.servers=pkc-xxxxx.ap-southeast-2.aws.confluent.cloud:9092 @@ -19,3 +19,39 @@ sasl.jaas.config= \ extension_logicalCluster='lkc-xxxxxx' \ extension_identityPoolId='pool-xxxx'; ``` + +Use Azure K8s Workload Identities: + +``` +bootstrap.servers=pkc-xxxxx.ap-southeast-2.aws.confluent.cloud:9092 +security.protocol=SASL_SSL +sasl.oauthbearer.token.endpoint.url=${AZURE_AUTHORITY_HOST}${AZURE_TENANT_ID}/oauth2/v2.0/token +sasl.login.callback.handler.class=io.confluent.oauth.azure.managedidentity.OAuthBearerLoginCallbackHandler +sasl.mechanism=OAUTHBEARER +sasl.jaas.config= \ + org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ + clientId='ignored' \ + clientSecret='ignored' \ + useWorkloadIdentity='true' \ + scope='${CONFLUENT_CLOUD_APP_ID}/.default' \ + extension_logicalCluster='lkc-xxxxxx' \ + extension_identityPoolId='pool-xxxx'; +``` + + + +Use with Schema Registry + +example: +``` +echo '{"make": "Ford", "model": "Mustang", "price": 10000}' | kafka-avro-console-producer \ + --bootstrap-server .confluent.cloud:9092 \ + --property schema.registry.url=https://.confluent.cloud \ + --property bearer.auth.credentials.source='CUSTOM' \ + --property bearer.auth.custom.provider.class=io.confluent.oauth.azure.managedidentity.RegistryBearerAuthCredentialProvider \ + --property bearer.auth.logical.cluster='lsrc-xxxxxx' \ + --producer.config client.properties \ + --reader-config client.properties \ + --topic cars \ + --property value.schema='{"type": "record", "name": "Car", "namespace": "io.spoud.training", "fields": [{"name": "make", "type": "string"}, {"name": "model", "type": "string"}, {"name": "price", "type": "int", "default": 0}]}' +``` diff --git a/build.gradle b/build.gradle index 9d42fd0..207b8c9 100644 --- a/build.gradle +++ b/build.gradle @@ -7,13 +7,18 @@ version '1.1-SNAPSHOT' repositories { mavenCentral() + maven { + url = uri("https://packages.confluent.io/maven") + } } dependencies { - implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.6.1' + implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.7.0' + implementation group: 'io.confluent', name: 'kafka-schema-registry-client', version: '7.6.0' implementation 'com.fasterxml.jackson.core:jackson-databind:2.14.2' - implementation 'org.bitbucket.b_c:jose4j:0.9.3' + implementation 'org.bitbucket.b_c:jose4j:0.9.6' implementation 'org.slf4j:slf4j-api:1.7.36' + implementation 'com.azure:azure-identity:1.12.0' testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1' testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1' } diff --git a/src/main/java/io/confluent/oauth/HttpAccessTokenRetriever.java b/src/main/java/io/confluent/oauth/HttpAccessTokenRetriever.java index 26e97ad..4267590 100644 --- a/src/main/java/io/confluent/oauth/HttpAccessTokenRetriever.java +++ b/src/main/java/io/confluent/oauth/HttpAccessTokenRetriever.java @@ -21,6 +21,10 @@ package io.confluent.oauth; +import com.azure.core.credential.AccessToken; +import com.azure.core.credential.TokenCredential; +import com.azure.core.credential.TokenRequestContext; +import com.azure.identity.CredentialUnavailableException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.ByteArrayInputStream; @@ -42,6 +46,8 @@ import java.util.concurrent.ExecutionException; import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.SSLSocketFactory; + +import io.confluent.oauth.azure.managedidentity.utils.WorkloadIdentityUtils; import org.apache.kafka.common.KafkaException; @@ -122,6 +128,8 @@ public class HttpAccessTokenRetriever implements AccessTokenRetriever { private final Map headers = new HashMap<>(); + private final boolean useWorkloadIdentity; + public HttpAccessTokenRetriever(String clientId, String clientSecret, String scope, @@ -131,9 +139,11 @@ public HttpAccessTokenRetriever(String clientId, long loginRetryBackoffMaxMs, Integer loginConnectTimeoutMs, Integer loginReadTimeoutMs, - String requestMethod) { + String requestMethod, + boolean useWorkloadIdentity) { this.clientId = Objects.requireNonNull(clientId); this.clientSecret = Objects.requireNonNull(clientSecret); + this.useWorkloadIdentity = useWorkloadIdentity; this.scope = scope; this.sslSocketFactory = sslSocketFactory; this.tokenEndpointUrl = Objects.requireNonNull(tokenEndpointUrl); @@ -161,17 +171,29 @@ public HttpAccessTokenRetriever(String clientId, @Override public String retrieve() throws IOException { - String authorizationHeader = formatAuthorizationHeader(clientId, clientSecret); - String requestBody = requestMethod == "GET" ? null : formatRequestBody(scope); - Retry retry = new Retry<>(loginRetryBackoffMs, loginRetryBackoffMaxMs); + String responseBody; - final Map requestHeaders = new HashMap<>(headers); - requestHeaders.put(AUTHORIZATION_HEADER, authorizationHeader); + try { + // TODO: should we use the AZURE_FEDERATED_TOKEN_FILE variable to enable workload identity? And AZURE_AUTHORITY_HOST to use for the endpoint url? + if (this.useWorkloadIdentity){ + log.debug("using workload identity to get token"); + // AccessToken https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/core/azure-core/src/main/java/com/azure/core/credential/AccessToken.java + TokenCredential workloadIdentityCredential = WorkloadIdentityUtils.createWorkloadIdentityCredentialFromEnvironment(); + TokenRequestContext tokenRequestContext = WorkloadIdentityUtils.createTokenRequestContextFromEnvironment(scope); + AccessToken azureIdentityAccessToken = workloadIdentityCredential.getTokenSync(tokenRequestContext); + log.trace("useWorkloadIdentity token, got token from AzureAD: '{}'", azureIdentityAccessToken.getToken()); + return azureIdentityAccessToken.getToken(); + } - String responseBody; + String authorizationHeader = formatAuthorizationHeader(clientId, clientSecret); + String requestBody = requestMethod == "GET" ? null : formatRequestBody(scope); + Retry retry = new Retry<>(loginRetryBackoffMs, loginRetryBackoffMaxMs); + + final Map requestHeaders = new HashMap<>(headers); + requestHeaders.put(AUTHORIZATION_HEADER, authorizationHeader); + - try { responseBody = retry.execute(() -> { HttpURLConnection con = null; @@ -195,6 +217,9 @@ public String retrieve() throws IOException { throw (IOException) e.getCause(); else throw new KafkaException(e.getCause()); + } catch (CredentialUnavailableException ex){ + log.error("Error getting token from AzureAD: {}", ex.getMessage()); + throw new KafkaException(ex); } return parseAccessToken(responseBody); diff --git a/src/main/java/io/confluent/oauth/azure/managedidentity/OAuthBearerLoginCallbackHandler.java b/src/main/java/io/confluent/oauth/azure/managedidentity/OAuthBearerLoginCallbackHandler.java index f3c2a34..1b97fec 100644 --- a/src/main/java/io/confluent/oauth/azure/managedidentity/OAuthBearerLoginCallbackHandler.java +++ b/src/main/java/io/confluent/oauth/azure/managedidentity/OAuthBearerLoginCallbackHandler.java @@ -28,6 +28,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import javax.net.ssl.SSLSocketFactory; import javax.security.auth.callback.Callback; import javax.security.auth.callback.UnsupportedCallbackException; @@ -167,6 +168,7 @@ public class OAuthBearerLoginCallbackHandler implements AuthenticateCallbackHand public static final String CLIENT_ID_CONFIG = "clientId"; public static final String CLIENT_SECRET_CONFIG = "clientSecret"; public static final String SCOPE_CONFIG = "scope"; + public static final String USE_WORKLOAD_IDENTITY_CONFIG = "useWorkloadIdentity"; public static final String CLIENT_ID_DOC = "The OAuth/OIDC identity provider-issued " + "client ID to uniquely identify the service account to use for authentication for " + @@ -182,9 +184,15 @@ public class OAuthBearerLoginCallbackHandler implements AuthenticateCallbackHand "clientcredentials grant type."; public static final String SCOPE_DOC = "The (optional) HTTP/HTTPS login request to the " + - "token endpoint (" + SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL + ") may need to specify an " + - "OAuth \"scope\". If so, the " + SCOPE_CONFIG + " is used to provide the value to " + - "include with the login request."; + "token endpoint (" + SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL + ") may need to specify an " + + "OAuth \"scope\". If so, the " + SCOPE_CONFIG + " is used to provide the value to " + + "include with the login request."; + + + public static final String USE_WORKLOAD_IDENTITY_DOC = "The (optional) HTTP/HTTPS login request to the " + + "token endpoint (" + SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL + ") may need to specify an " + + "'Authorization' header of the Workload Identity. If so, the " + USE_WORKLOAD_IDENTITY_CONFIG + + " must be set to true"; private static final String EXTENSION_PREFIX = "extension_"; @@ -221,7 +229,7 @@ void init(AccessTokenRetriever accessTokenRetriever, AccessTokenValidator access isInitialized = true; } - protected AccessTokenRetriever createAccessTokenRetriever(Map configs, + public static AccessTokenRetriever createAccessTokenRetriever(Map configs, String saslMechanism, Map jaasConfig) { final ConfigurationUtils cu = new ConfigurationUtils(configs, saslMechanism); @@ -231,22 +239,29 @@ protected AccessTokenRetriever createAccessTokenRetriever(Map configs final String clientId = jou.validateString(CLIENT_ID_CONFIG); final String clientSecret = jou.validateString(CLIENT_SECRET_CONFIG); final String scope = jou.validateString(SCOPE_CONFIG, false); + final boolean useWorkloadIdentity = jou.validateString(USE_WORKLOAD_IDENTITY_CONFIG).equalsIgnoreCase("true"); SSLSocketFactory sslSocketFactory = null; if (jou.shouldCreateSSLSocketFactory(tokenEndpointUrl)) sslSocketFactory = jou.createSSLSocketFactory(); + // make sure we have defaults since we don't get the properties for the schema registry case + long loginRetryBackoffMs = Optional.ofNullable(cu.validateLong(SASL_LOGIN_RETRY_BACKOFF_MS, false)).orElse(DEFAULT_SASL_LOGIN_RETRY_BACKOFF_MAX_MS); + long loginRetryBackoffMaxMs = Optional.ofNullable(cu.validateLong(SASL_LOGIN_RETRY_BACKOFF_MAX_MS, false)).orElse(DEFAULT_SASL_LOGIN_RETRY_BACKOFF_MAX_MS); + + io.confluent.oauth.HttpAccessTokenRetriever httpAccessTokenRetriever = new io.confluent.oauth.HttpAccessTokenRetriever(clientId, clientSecret, scope, sslSocketFactory, tokenEndpointUrl.toString(), - cu.validateLong(SASL_LOGIN_RETRY_BACKOFF_MS), - cu.validateLong(SASL_LOGIN_RETRY_BACKOFF_MAX_MS), + loginRetryBackoffMs, + loginRetryBackoffMaxMs, cu.validateInteger(SASL_LOGIN_CONNECT_TIMEOUT_MS, false), cu.validateInteger(SASL_LOGIN_READ_TIMEOUT_MS, false), - "GET"); + "GET", + useWorkloadIdentity); httpAccessTokenRetriever.getHeaders().put("Metadata", "true"); return httpAccessTokenRetriever; diff --git a/src/main/java/io/confluent/oauth/azure/managedidentity/RegistryBearerAuthCredentialProvider.java b/src/main/java/io/confluent/oauth/azure/managedidentity/RegistryBearerAuthCredentialProvider.java new file mode 100644 index 0000000..1885425 --- /dev/null +++ b/src/main/java/io/confluent/oauth/azure/managedidentity/RegistryBearerAuthCredentialProvider.java @@ -0,0 +1,130 @@ +package io.confluent.oauth.azure.managedidentity; + +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig; +import io.confluent.kafka.schemaregistry.client.security.bearerauth.BearerAuthCredentialProvider; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.types.Password; +import org.apache.kafka.common.security.JaasContext; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken; +import org.apache.kafka.common.security.oauthbearer.internals.secured.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.security.auth.login.AppConfigurationEntry; +import java.io.IOException; +import java.net.URL; +import java.util.*; + +import static io.confluent.oauth.azure.managedidentity.OAuthBearerLoginCallbackHandler.createAccessTokenRetriever; + +public class RegistryBearerAuthCredentialProvider implements BearerAuthCredentialProvider { + + private static final Logger log = LoggerFactory.getLogger(RegistryBearerAuthCredentialProvider.class); + public static final String SASL_IDENTITY_POOL_CONFIG = "extension_identityPoolId"; + + private String targetSchemaRegistry; + private String targetIdentityPoolId; + private Map moduleOptions; + + private AccessTokenRetriever accessTokenRetriever; + private AccessTokenValidator accessTokenValidator; + private boolean isInitialized; + + + @Override + public void configure(Map configs) { + // from SaslOauthCredentialProvider + Map updatedConfigs = getConfigsForJaasUtil(configs); + JaasContext jaasContext = JaasContext.loadClientContext(updatedConfigs); + List appConfigurationEntries = jaasContext.configurationEntries(); + Map jaasconfig; + if (Objects.requireNonNull(appConfigurationEntries).size() == 1 + && appConfigurationEntries.get(0) != null) { + jaasconfig = Collections.unmodifiableMap( + ((AppConfigurationEntry) appConfigurationEntries.get(0)).getOptions()); + } else { + throw new ConfigException( + String.format("Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)", + appConfigurationEntries.size())); + } + + // make sure we have scope and sub set + Map myConfigs = new HashMap<>(configs); + myConfigs.put(SaslConfigs.SASL_OAUTHBEARER_SCOPE_CLAIM_NAME, "scope"); + myConfigs.put(SaslConfigs.SASL_OAUTHBEARER_SUB_CLAIM_NAME, "sub"); + + + ConfigurationUtils cu = new ConfigurationUtils(myConfigs); + JaasOptionsUtils jou = new JaasOptionsUtils((Map) jaasconfig); + + targetSchemaRegistry = cu.validateString( + SchemaRegistryClientConfig.BEARER_AUTH_LOGICAL_CLUSTER, false); + + // if the schema registry oauth configs are set it is given higher preference + targetIdentityPoolId = cu.get(SchemaRegistryClientConfig.BEARER_AUTH_IDENTITY_POOL_ID) != null + ? cu.validateString(SchemaRegistryClientConfig.BEARER_AUTH_IDENTITY_POOL_ID) + : jou.validateString(SASL_IDENTITY_POOL_CONFIG, false); + + String saslMechanism = cu.validateString(SaslConfigs.SASL_MECHANISM); + moduleOptions = JaasOptionsUtils.getOptions(saslMechanism, appConfigurationEntries); + AccessTokenRetriever accessTokenRetriever = createAccessTokenRetriever(myConfigs, saslMechanism, moduleOptions); + + AccessTokenValidator accessTokenValidator = AccessTokenValidatorFactory.create(myConfigs, saslMechanism); + init(accessTokenRetriever, accessTokenValidator); + } + + /* + * Package-visible for testing. + */ + + void init(AccessTokenRetriever accessTokenRetriever, AccessTokenValidator accessTokenValidator) { + this.accessTokenRetriever = accessTokenRetriever; + this.accessTokenValidator = accessTokenValidator; + + try { + this.accessTokenRetriever.init(); + } catch (IOException e) { + throw new KafkaException("The OAuth login configuration encountered an error when initializing the AccessTokenRetriever", e); + } + + isInitialized = true; + } + + + @Override + public String getBearerToken(URL url) { + try { + String accessToken = accessTokenRetriever.retrieve(); + OAuthBearerToken token = accessTokenValidator.validate(accessToken); + return accessToken; + } catch (ValidateException | IOException e) { + log.warn(e.getMessage(), e); + return ""; + } + } + + @Override + public String getTargetIdentityPoolId() { + return targetIdentityPoolId; + } + + @Override + public String getTargetSchemaRegistry() { + return targetSchemaRegistry; + } + + // from SaslOauthCredentialProvider + Map getConfigsForJaasUtil(Map configs) { + Map updatedConfigs = new HashMap<>(configs); + if (updatedConfigs.containsKey(SaslConfigs.SASL_JAAS_CONFIG)) { + Object saslJaasConfig = updatedConfigs.get(SaslConfigs.SASL_JAAS_CONFIG); + if (saslJaasConfig instanceof String) { + updatedConfigs.put(SaslConfigs.SASL_JAAS_CONFIG, new Password((String) saslJaasConfig)); + } + } + return updatedConfigs; + } + +} diff --git a/src/main/java/io/confluent/oauth/azure/managedidentity/utils/WorkloadIdentityKafkaClientOAuthBearerAuthenticationException.java b/src/main/java/io/confluent/oauth/azure/managedidentity/utils/WorkloadIdentityKafkaClientOAuthBearerAuthenticationException.java new file mode 100644 index 0000000..483f3d7 --- /dev/null +++ b/src/main/java/io/confluent/oauth/azure/managedidentity/utils/WorkloadIdentityKafkaClientOAuthBearerAuthenticationException.java @@ -0,0 +1,28 @@ +/* +MIT License + +Copyright (c) 2023 Nikolai Seip + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +This file was copied from https://github.com/nniikkoollaaii/workload-identity-kafka-sasl-oauthbearer/blob/main/src/main/java/io/github/nniikkoollaaii/kafka/workload_identity/WorkloadIdentityKafkaClientOAuthBearerAuthenticationException.java + */ +package io.confluent.oauth.azure.managedidentity.utils; + +/** + * Custom runtime exception to signal errors when using Workload Identity to fetch a token from AzureAD. + */ +public class WorkloadIdentityKafkaClientOAuthBearerAuthenticationException extends RuntimeException { + + public WorkloadIdentityKafkaClientOAuthBearerAuthenticationException(String message) { + super(message); + } +} \ No newline at end of file diff --git a/src/main/java/io/confluent/oauth/azure/managedidentity/utils/WorkloadIdentityUtils.java b/src/main/java/io/confluent/oauth/azure/managedidentity/utils/WorkloadIdentityUtils.java new file mode 100644 index 0000000..7d4c31e --- /dev/null +++ b/src/main/java/io/confluent/oauth/azure/managedidentity/utils/WorkloadIdentityUtils.java @@ -0,0 +1,97 @@ +/* +MIT License + +Copyright (c) 2023 Nikolai Seip + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +This file was copied from https://github.com/nniikkoollaaii/workload-identity-kafka-sasl-oauthbearer/blob/main/src/main/java/io/github/nniikkoollaaii/kafka/workload_identity/WorkloadIdentityUtils.java + */ +package io.confluent.oauth.azure.managedidentity.utils; + +import com.azure.core.credential.TokenRequestContext; +import com.azure.identity.WorkloadIdentityCredential; +import com.azure.identity.WorkloadIdentityCredentialBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class WorkloadIdentityUtils { + + private static final Logger log = LoggerFactory.getLogger(WorkloadIdentityUtils.class); + + + // ENV vars set by AzureAD Workload Identity Mutating Admission Webhook: https://azure.github.io/azure-workload-identity/docs/installation/mutating-admission-webhook.html + public static final String AZURE_AD_WORKLOAD_IDENTITY_MUTATING_ADMISSION_WEBHOOK_ENV_FEDERATED_TOKEN_FILE = "AZURE_FEDERATED_TOKEN_FILE"; + public static final String AZURE_AD_WORKLOAD_IDENTITY_MUTATING_ADMISSION_WEBHOOK_ENV_AUTHORITY_HOST = "AZURE_AUTHORITY_HOST"; + public static final String AZURE_AD_WORKLOAD_IDENTITY_MUTATING_ADMISSION_WEBHOOK_ENV_TENANT_ID = "AZURE_TENANT_ID"; + public static final String AZURE_AD_WORKLOAD_IDENTITY_MUTATING_ADMISSION_WEBHOOK_ENV_CLIENT_ID = "AZURE_CLIENT_ID"; + + + public static String getTenantId() { + String tenantId = System.getenv(WorkloadIdentityUtils.AZURE_AD_WORKLOAD_IDENTITY_MUTATING_ADMISSION_WEBHOOK_ENV_TENANT_ID); + if (tenantId == null || tenantId.equals("")) + throw new WorkloadIdentityKafkaClientOAuthBearerAuthenticationException(String.format("Missing environment variable %s", WorkloadIdentityUtils.AZURE_AD_WORKLOAD_IDENTITY_MUTATING_ADMISSION_WEBHOOK_ENV_TENANT_ID)); + log.debug("Config: Tenant Id " + tenantId); + return tenantId; + } + + public static String getClientId() { + String clientId = System.getenv(WorkloadIdentityUtils.AZURE_AD_WORKLOAD_IDENTITY_MUTATING_ADMISSION_WEBHOOK_ENV_CLIENT_ID); + if (clientId == null || clientId.equals("")) + throw new WorkloadIdentityKafkaClientOAuthBearerAuthenticationException(String.format("Missing environment variable %s", WorkloadIdentityUtils.AZURE_AD_WORKLOAD_IDENTITY_MUTATING_ADMISSION_WEBHOOK_ENV_CLIENT_ID)); + log.debug("Config: Client Id " + clientId); + return clientId; + } + + public static WorkloadIdentityCredential createWorkloadIdentityCredentialFromEnvironment() { + String federatedTokeFilePath = System.getenv(WorkloadIdentityUtils.AZURE_AD_WORKLOAD_IDENTITY_MUTATING_ADMISSION_WEBHOOK_ENV_FEDERATED_TOKEN_FILE); + if (federatedTokeFilePath == null || federatedTokeFilePath.equals("")) + throw new WorkloadIdentityKafkaClientOAuthBearerAuthenticationException(String.format("Missing environment variable %s", WorkloadIdentityUtils.AZURE_AD_WORKLOAD_IDENTITY_MUTATING_ADMISSION_WEBHOOK_ENV_FEDERATED_TOKEN_FILE)); + log.debug("Config: Federated Token File Path " + federatedTokeFilePath); + + String authorityHost = System.getenv(WorkloadIdentityUtils.AZURE_AD_WORKLOAD_IDENTITY_MUTATING_ADMISSION_WEBHOOK_ENV_AUTHORITY_HOST); + if (authorityHost == null || authorityHost.equals("")) + throw new WorkloadIdentityKafkaClientOAuthBearerAuthenticationException(String.format("Missing environment variable %s", WorkloadIdentityUtils.AZURE_AD_WORKLOAD_IDENTITY_MUTATING_ADMISSION_WEBHOOK_ENV_AUTHORITY_HOST)); + log.debug("Config: Authority host " + authorityHost); + + String tenantId = getTenantId(); + String clientId = getClientId(); + + + WorkloadIdentityCredential workloadIdentityCredential = new WorkloadIdentityCredentialBuilder() + .tokenFilePath(federatedTokeFilePath) + .authorityHost(authorityHost) + .clientId(clientId) + .tenantId(tenantId) + .build(); + + return workloadIdentityCredential; + } + + + public static TokenRequestContext createTokenRequestContextFromEnvironment(String scope) { + + String tenantId = getTenantId(); + String clientId = getClientId(); + + //Construct a TokenRequestContext to be used be requsting a token at runtime. + String usedScope = clientId + "/.default"; + if(scope != null && !scope.isEmpty()) { + usedScope = scope; + } + log.debug("Config: Scope " + usedScope); + TokenRequestContext tokenRequestContext = new TokenRequestContext() // TokenRequestContext: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/core/azure-core/src/main/java/com/azure/core/credential/TokenRequestContext.java + .addScopes(usedScope) + .setTenantId(tenantId); + + return tokenRequestContext; + } +} \ No newline at end of file