Skip to content

Commit c5e9c36

Browse files
authored
Add Federated Identity Credentials support (#203)
Adds support for Federated Identity Credentials. | CLIENT_ID | CLIENT_SECRET | MANAGED_IDENTITY_CLIENT_ID | Output | |-|-|-|-| | not_set | | | No-Auth | | set | set | | SecretsAuth | | set | not_set | | User Managed Identity Auth | | set | not_set | set (same as CLIENT_ID) | User Managed Identity Auth | | set | not_set | set (diff from CLIENT_ID) | FIC (user managed identity) | | set | not_set | "system" | FIC (system identity) | Federated Identity Credentials has a two step process. It first uses managed identity (UMI or SI) to get an token-assertion. This then is used to build a ConfidentialClient to get the actual token. #### PR Dependency Tree * **PR #203** 👈 This tree was auto-generated by [Charcoal](https://github.com/danerwilliams/charcoal)
1 parent 88e2d50 commit c5e9c36

File tree

8 files changed

+354
-121
lines changed

8 files changed

+354
-121
lines changed

packages/api/src/microsoft/teams/api/auth/__init__.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,13 @@
44
"""
55

66
from .caller import CallerIds, CallerType
7-
from .credentials import ClientCredentials, Credentials, ManagedIdentityCredentials, TokenCredentials
7+
from .credentials import (
8+
ClientCredentials,
9+
Credentials,
10+
FederatedIdentityCredentials,
11+
ManagedIdentityCredentials,
12+
TokenCredentials,
13+
)
814
from .json_web_token import JsonWebToken, JsonWebTokenPayload
915
from .token import TokenProtocol
1016

@@ -13,6 +19,7 @@
1319
"CallerType",
1420
"ClientCredentials",
1521
"Credentials",
22+
"FederatedIdentityCredentials",
1623
"ManagedIdentityCredentials",
1724
"TokenCredentials",
1825
"TokenProtocol",

packages/api/src/microsoft/teams/api/auth/credentials.py

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
Licensed under the MIT License.
44
"""
55

6-
from typing import Awaitable, Callable, Optional, Union
6+
from typing import Awaitable, Callable, Literal, Optional, Union
77

88
from ..models import CustomBaseModel
99

@@ -56,5 +56,27 @@ class ManagedIdentityCredentials(CustomBaseModel):
5656
"""
5757

5858

59+
class FederatedIdentityCredentials(CustomBaseModel):
60+
"""Credentials for authentication using Federated Identity Credentials with Managed Identity."""
61+
62+
client_id: str
63+
"""
64+
The client ID of the app registration.
65+
"""
66+
managed_identity_type: Literal["system", "user"]
67+
"""
68+
The type of managed identity: 'system' for system-assigned or 'user' for user-assigned.
69+
"""
70+
managed_identity_client_id: Optional[str] = None
71+
"""
72+
The client ID of the user-assigned managed identity.
73+
Required when managed_identity_type is 'user'.
74+
"""
75+
tenant_id: Optional[str] = None
76+
"""
77+
The tenant ID.
78+
"""
79+
80+
5981
# Union type for credentials
60-
Credentials = Union[ClientCredentials, TokenCredentials, ManagedIdentityCredentials]
82+
Credentials = Union[ClientCredentials, TokenCredentials, ManagedIdentityCredentials, FederatedIdentityCredentials]

packages/apps/src/microsoft/teams/apps/app.py

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
ConversationAccount,
2222
ConversationReference,
2323
Credentials,
24+
FederatedIdentityCredentials,
2425
ManagedIdentityCredentials,
2526
MessageActivityInput,
2627
TokenCredentials,
@@ -298,26 +299,33 @@ def _init_credentials(self) -> Optional[Credentials]:
298299
else:
299300
self.log.debug(f"Using TENANT_ID: {tenant_id} (assuming single-tenant app)")
300301

301-
# - If client_id + client_secret : use ClientCredentials (standard client auth)
302302
if client_id and client_secret:
303303
self.log.debug("Using client secret for auth")
304304
return ClientCredentials(client_id=client_id, client_secret=client_secret, tenant_id=tenant_id)
305305

306-
# - If client_id + token callable : use TokenCredentials (where token is a custom token provider)
307306
if client_id and token:
308307
return TokenCredentials(client_id=client_id, tenant_id=tenant_id, token=token)
309308

310-
# - If client_id but no client_secret : use ManagedIdentityCredentials (inferred)
311309
if client_id:
312-
# Validate that if managed_identity_client_id is provided, it must equal client_id
310+
if managed_identity_client_id == "system":
311+
self.log.debug("Using Federated Identity Credentials with system-assigned managed identity")
312+
return FederatedIdentityCredentials(
313+
client_id=client_id,
314+
managed_identity_type="system",
315+
managed_identity_client_id=None,
316+
tenant_id=tenant_id,
317+
)
318+
313319
if managed_identity_client_id and managed_identity_client_id != client_id:
314-
raise ValueError(
315-
"Federated Identity Credentials is not yet supported. "
316-
"managed_identity_client_id must equal client_id."
320+
self.log.debug("Using Federated Identity Credentials with user-assigned managed identity")
321+
return FederatedIdentityCredentials(
322+
client_id=client_id,
323+
managed_identity_type="user",
324+
managed_identity_client_id=managed_identity_client_id,
325+
tenant_id=tenant_id,
317326
)
318327

319-
self.log.debug("Using user-assigned managed identity for auth")
320-
# Use managed_identity_client_id if provided, otherwise fall back to client_id
328+
self.log.debug("Using user-assigned managed identity (direct)")
321329
mi_client_id = managed_identity_client_id or client_id
322330
return ManagedIdentityCredentials(
323331
client_id=mi_client_id,

packages/apps/src/microsoft/teams/apps/options.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@ class AppOptions(TypedDict, total=False):
3030
managed_identity_client_id: Optional[str]
3131
"""
3232
The managed identity client ID for user-assigned managed identity.
33-
Defaults to client_id if not provided.
33+
Set to "system" for system-assigned managed identity (triggers Federated Identity Credentials).
34+
If set to a different client ID than client_id, triggers Federated Identity Credentials with user-assigned MI.
35+
If not set or equals client_id, uses direct managed identity (no federation).
3436
"""
3537

3638
# Infrastructure
@@ -62,7 +64,12 @@ class InternalAppOptions:
6264
token: Optional[Callable[[Union[str, list[str]], Optional[str]], Union[str, Awaitable[str]]]] = None
6365
"""Custom token provider function. If provided with client_id (no client_secret), uses TokenCredentials."""
6466
managed_identity_client_id: Optional[str] = None
65-
"""The managed identity client ID for user-assigned managed identity. Defaults to client_id if not provided."""
67+
"""
68+
The managed identity client ID for user-assigned managed identity.
69+
Set to "system" for system-assigned managed identity (triggers Federated Identity Credentials).
70+
If set to a different client ID than client_id, triggers Federated Identity Credentials with user-assigned MI.
71+
If not set or equals client_id, uses direct managed identity (no federation).
72+
"""
6673
logger: Optional[Logger] = None
6774
storage: Optional[Storage[str, Any]] = None
6875

packages/apps/src/microsoft/teams/apps/token_manager.py

Lines changed: 158 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,16 @@
1515
JsonWebToken,
1616
TokenProtocol,
1717
)
18-
from microsoft.teams.api.auth.credentials import ManagedIdentityCredentials, TokenCredentials
18+
from microsoft.teams.api.auth.credentials import (
19+
FederatedIdentityCredentials,
20+
ManagedIdentityCredentials,
21+
TokenCredentials,
22+
)
1923
from microsoft.teams.common import ConsoleLogger
2024
from msal import (
2125
ConfidentialClientApplication,
2226
ManagedIdentityClient,
27+
SystemAssignedManagedIdentity,
2328
UserAssignedManagedIdentity,
2429
)
2530

@@ -77,74 +82,166 @@ async def _get_token(
7782
if caller_name:
7883
self._logger.debug(f"No credentials provided for {caller_name}")
7984
return None
80-
if isinstance(credentials, (ClientCredentials, ManagedIdentityCredentials)):
81-
msal_client = self._get_msal_client(tenant_id)
82-
83-
# Handle different acquire_token_for_client signatures
84-
if isinstance(msal_client, ManagedIdentityClient):
85-
# ManagedIdentityClient expects resource as a keyword-only string parameter
86-
scope = scope.removesuffix("/.default")
87-
token_res: dict[str, Any] | None = await asyncio.to_thread(
88-
lambda: msal_client.acquire_token_for_client(resource=scope)
89-
)
90-
else:
91-
# ConfidentialClientApplication expects scopes as a list
92-
token_res: dict[str, Any] | None = await asyncio.to_thread(
93-
lambda: msal_client.acquire_token_for_client([scope])
94-
)
95-
96-
if token_res.get("access_token", None):
97-
access_token = token_res["access_token"]
98-
return JsonWebToken(access_token)
99-
else:
100-
self._logger.debug(f"TokenRes: {token_res}")
101-
error = token_res.get("error", "Error retrieving token")
102-
if not isinstance(error, BaseException):
103-
error = ValueError(error)
104-
error_description = token_res.get("error_description", "Error retrieving token from MSAL")
105-
self._logger.error(error_description)
106-
raise error
85+
if isinstance(credentials, ClientCredentials):
86+
return await self._get_token_with_client_credentials(credentials, scope, tenant_id)
87+
elif isinstance(credentials, ManagedIdentityCredentials):
88+
return await self._get_token_with_managed_identity(credentials, scope)
89+
elif isinstance(credentials, FederatedIdentityCredentials):
90+
return await self._get_token_with_federated_identity(credentials, scope, tenant_id)
10791
elif isinstance(credentials, TokenCredentials):
108-
token = credentials.token(scope, tenant_id)
109-
if isawaitable(token):
110-
access_token = await token
111-
else:
112-
access_token = token
92+
return await self._get_token_with_token_provider(credentials, scope, tenant_id)
93+
94+
return None
95+
96+
async def _get_token_with_client_credentials(
97+
self,
98+
credentials: ClientCredentials,
99+
scope: str,
100+
tenant_id: str,
101+
) -> TokenProtocol:
102+
"""Get token using ClientCredentials (client secret)."""
103+
confidential_client = self._get_confidential_client(credentials, tenant_id)
104+
105+
# ConfidentialClientApplication expects scopes as a list
106+
token_res: dict[str, Any] = await asyncio.to_thread(
107+
lambda: confidential_client.acquire_token_for_client([scope])
108+
)
109+
110+
return self._handle_token_response(token_res)
111+
112+
async def _get_token_with_managed_identity(
113+
self,
114+
credentials: ManagedIdentityCredentials,
115+
scope: str,
116+
) -> TokenProtocol:
117+
"""Get token using ManagedIdentityCredentials (direct, no federation)."""
118+
mi_client = self._get_managed_identity_client(credentials)
119+
120+
# ManagedIdentityClient expects resource as a keyword-only string parameter
121+
resource = scope.removesuffix("/.default")
122+
token_res: dict[str, Any] = await asyncio.to_thread(
123+
lambda: mi_client.acquire_token_for_client(resource=resource)
124+
)
125+
126+
return self._handle_token_response(token_res)
127+
128+
async def _get_token_with_federated_identity(
129+
self,
130+
credentials: FederatedIdentityCredentials,
131+
scope: str,
132+
tenant_id: str,
133+
) -> TokenProtocol:
134+
"""Get token using Federated Identity Credentials (two-step flow)."""
135+
136+
# Step 1: Get MI token from api://AzureADTokenExchange
137+
mi_token = await self._acquire_managed_identity_token(credentials)
138+
139+
# Step 2: Use MI token as client_assertion to get final access token
140+
confidential_client = ConfidentialClientApplication(
141+
credentials.client_id,
142+
client_credential={"client_assertion": mi_token},
143+
authority=DEFAULT_TOKEN_AUTHORITY.format(tenant_id=tenant_id),
144+
)
145+
146+
token_res: dict[str, Any] = await asyncio.to_thread(
147+
lambda: confidential_client.acquire_token_for_client([scope])
148+
)
149+
150+
return self._handle_token_response(token_res, error_prefix="FIC Step 2 failed")
113151

152+
async def _acquire_managed_identity_token(self, credentials: FederatedIdentityCredentials) -> str:
153+
"""Acquire managed identity token for federated identity credentials."""
154+
# Use shared method to get or create the managed identity client
155+
mi_client = self._get_managed_identity_client(credentials)
156+
157+
mi_token_res: dict[str, Any] = await asyncio.to_thread(
158+
lambda: mi_client.acquire_token_for_client(resource="api://AzureADTokenExchange")
159+
)
160+
161+
if not mi_token_res.get("access_token"):
162+
self._logger.error("FIC Step 1 failed: Could not acquire MI token")
163+
error = mi_token_res.get("error", ValueError("Error retrieving MI token"))
164+
if not isinstance(error, BaseException):
165+
error = ValueError(error)
166+
raise error
167+
168+
return mi_token_res["access_token"]
169+
170+
async def _get_token_with_token_provider(
171+
self,
172+
credentials: TokenCredentials,
173+
scope: str,
174+
tenant_id: str,
175+
) -> TokenProtocol:
176+
"""Get token using custom token provider function."""
177+
token = credentials.token(scope, tenant_id)
178+
179+
if isawaitable(token):
180+
access_token = await token
181+
else:
182+
access_token = token
183+
184+
return JsonWebToken(access_token)
185+
186+
def _handle_token_response(self, token_res: dict[str, Any], error_prefix: str = "") -> TokenProtocol:
187+
"""Handle token response from MSAL client."""
188+
if token_res.get("access_token", None):
189+
access_token = token_res["access_token"]
114190
return JsonWebToken(access_token)
191+
else:
192+
error_msg = f"{error_prefix}: " if error_prefix else ""
193+
self._logger.error(f"{error_msg}Could not acquire access token")
194+
self._logger.debug(f"TokenRes: {token_res}")
195+
196+
error = token_res.get("error", "Error retrieving token")
197+
if not isinstance(error, BaseException):
198+
error = ValueError(error)
199+
200+
error_description = token_res.get("error_description", "Error retrieving token from MSAL")
201+
self._logger.error(error_description)
202+
raise error
203+
204+
def _get_confidential_client(self, credentials: ClientCredentials, tenant_id: str) -> ConfidentialClientApplication:
205+
"""Get or create ConfidentialClientApplication for ClientCredentials."""
206+
# Check if client already exists in cache
207+
cached_client = self._confidential_clients_by_tenant.get(tenant_id)
208+
if cached_client:
209+
return cached_client
210+
211+
client: ConfidentialClientApplication = ConfidentialClientApplication(
212+
credentials.client_id,
213+
client_credential=credentials.client_secret,
214+
authority=f"https://login.microsoftonline.com/{tenant_id}",
215+
)
216+
self._confidential_clients_by_tenant[tenant_id] = client
217+
return client
115218

116-
def _get_msal_client(self, tenant_id: str) -> ConfidentialClientApplication | ManagedIdentityClient:
117-
credentials = self._credentials
219+
def _get_managed_identity_client(
220+
self, credentials: ManagedIdentityCredentials | FederatedIdentityCredentials
221+
) -> ManagedIdentityClient:
222+
"""Get or create ManagedIdentityClient for ManagedIdentityCredentials or FederatedIdentityCredentials."""
223+
# Check if client already exists in cache
118224

119-
# Create the appropriate client based on credential type
120-
if isinstance(credentials, ClientCredentials):
121-
# Check if client already exists in cache for this tenant
122-
cached_client = self._confidential_clients_by_tenant.get(tenant_id)
123-
if cached_client:
124-
return cached_client
125-
126-
client: ConfidentialClientApplication = ConfidentialClientApplication(
127-
credentials.client_id,
128-
client_credential=credentials.client_secret,
129-
authority=f"https://login.microsoftonline.com/{tenant_id}",
130-
)
131-
self._confidential_clients_by_tenant[tenant_id] = client
132-
return client
133-
elif isinstance(credentials, ManagedIdentityCredentials):
134-
# ManagedIdentityClient is tenant-agnostic, cache single instance
135-
if self._managed_identity_client:
136-
return self._managed_identity_client
225+
# ManagedIdentityClient is tenant-agnostic, cache single instance
226+
if self._managed_identity_client:
227+
return self._managed_identity_client
137228

138-
# Create user-assigned managed identity
229+
# Determine managed identity type
230+
if isinstance(credentials, FederatedIdentityCredentials):
231+
if credentials.managed_identity_type == "system":
232+
managed_identity = SystemAssignedManagedIdentity()
233+
else: # "user"
234+
mi_client_id = credentials.managed_identity_client_id or credentials.client_id
235+
managed_identity = UserAssignedManagedIdentity(client_id=mi_client_id)
236+
else: # ManagedIdentityCredentials
237+
# ManagedIdentityCredentials only supports user-assigned
139238
managed_identity = UserAssignedManagedIdentity(client_id=credentials.client_id)
140239

141-
self._managed_identity_client = ManagedIdentityClient(
142-
managed_identity,
143-
http_client=requests.Session(),
144-
)
145-
return self._managed_identity_client
146-
else:
147-
raise ValueError(f"Unsupported credential type: {type(credentials)}")
240+
self._managed_identity_client = ManagedIdentityClient(
241+
managed_identity,
242+
http_client=requests.Session(),
243+
)
244+
return self._managed_identity_client
148245

149246
def _resolve_tenant_id(self, tenant_id: str | None, default_tenant_id: str):
150247
return tenant_id or (self._credentials.tenant_id if self._credentials else False) or default_tenant_id

0 commit comments

Comments
 (0)