Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 23 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ trait-variant = "0.1.2"
tungstenite = "0.28.0"
twox-hash = { version = "2.1.2", features = ["xxhash32"] }
ulid = "1.2.1"
ureq = "2.10"
uuid = { version = "1.20.0", features = [
"v4",
"v7",
Expand Down
1 change: 1 addition & 0 deletions DEPENDENCIES.md
Original file line number Diff line number Diff line change
Expand Up @@ -852,6 +852,7 @@ unicode-xid: 0.2.6, "Apache-2.0 OR MIT",
universal-hash: 0.5.1, "Apache-2.0 OR MIT",
unsafe-libyaml: 0.2.11, "MIT",
untrusted: 0.9.0, "ISC",
ureq: 2.12.1, "Apache-2.0 OR MIT",
ureq: 3.1.4, "Apache-2.0 OR MIT",
ureq-proto: 0.5.3, "Apache-2.0 OR MIT",
url: 2.5.8, "Apache-2.0 OR MIT",
Expand Down
16 changes: 9 additions & 7 deletions core/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ disable-mimalloc = []
mimalloc = ["dep:mimalloc"]
iggy-web = ["dep:rust-embed", "dep:mime_guess"]

[target.'cfg(not(target_env = "musl"))'.dependencies]
hwlocality = { workspace = true }

[target.'cfg(target_env = "musl")'.dependencies]
hwlocality = { workspace = true, features = ["vendored"] }

[dependencies]
ahash = { workspace = true }
anyhow = { workspace = true }
Expand Down Expand Up @@ -96,6 +102,7 @@ rustls = { workspace = true }
rustls-pemfile = { workspace = true }
send_wrapper = { workspace = true }
serde = { workspace = true }
serde_json.workspace = true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stick to serde_json = { workspace = true } etc. everywhere

serde_with = { workspace = true }
slab = { workspace = true }
socket2 = { workspace = true }
Expand All @@ -112,15 +119,10 @@ tracing-appender = { workspace = true }
tracing-opentelemetry = { workspace = true }
tracing-subscriber = { workspace = true }
tungstenite = { workspace = true }
ulid = { workspace = true }
ulid = "1.2.1"
ureq = { workspace = true }
uuid = { workspace = true }

[target.'cfg(not(target_env = "musl"))'.dependencies]
hwlocality = { workspace = true }

[target.'cfg(target_env = "musl")'.dependencies]
hwlocality = { workspace = true, features = ["vendored"] }

[build-dependencies]
figment = { workspace = true, features = ["json", "toml", "env"] }
vergen-git2 = { workspace = true }
6 changes: 6 additions & 0 deletions core/server/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,12 @@ decoding_secret = "top_secret$iggy123$_jwt_HS256_key#!"
# `false` means the secret is in plain text.
use_base64_secret = false

# Trusted issuers for A2A (Application-to-Application) authentication
[[http.jwt.trusted_issuers]]
issuer = "test-issuer"
jwks_url = "http://127.0.0.1:8081/.well-known/jwks.json"
audience = "iggy.apache.org"

# Metrics configuration for HTTP.
[http.metrics]
# Enable or disable the metrics endpoint.
Expand Down
1 change: 1 addition & 0 deletions core/server/src/configs/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ impl Default for HttpJwtConfig {
encoding_secret: SERVER_CONFIG.http.jwt.encoding_secret.parse().unwrap(),
decoding_secret: SERVER_CONFIG.http.jwt.decoding_secret.parse().unwrap(),
use_base64_secret: SERVER_CONFIG.http.jwt.use_base_64_secret,
trusted_issuers: None,
}
}
}
Expand Down
9 changes: 9 additions & 0 deletions core/server/src/configs/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ use serde::{Deserialize, Serialize};
use serde_with::DisplayFromStr;
use serde_with::serde_as;

#[derive(Debug, Deserialize, Serialize, Clone, ConfigEnv)]
pub struct TrustedIssuerConfig {
pub issuer: String,
pub audience: String,
pub jwks_url: String,
}

#[derive(Debug, Deserialize, Serialize, Clone, ConfigEnv)]
pub struct HttpConfig {
pub enabled: bool,
Expand Down Expand Up @@ -72,6 +79,8 @@ pub struct HttpJwtConfig {
#[config_env(secret)]
pub decoding_secret: String,
pub use_base64_secret: bool,
#[serde(default)]
pub trusted_issuers: Option<Vec<TrustedIssuerConfig>>,
}

#[derive(Debug, Deserialize, Serialize, Clone, ConfigEnv)]
Expand Down
2 changes: 1 addition & 1 deletion core/server/src/http/jwt/json_web_token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub struct JwtClaims {
pub jti: String,
pub iss: String,
pub aud: String,
pub sub: u32,
pub sub: String,
pub iat: u64,
pub exp: u64,
pub nbf: u64,
Expand Down
160 changes: 160 additions & 0 deletions core/server/src/http/jwt/jwks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

use iggy_common::locking::{IggyRwLock, IggyRwLockFn};
use jsonwebtoken::DecodingKey;
use serde::Deserialize;
use serde_json;
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::Arc;

#[derive(Debug, Deserialize)]
struct Jwk {
kty: String,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we make it an enum, you could use strum with all the helpers like here:

#[derive(Clone, Copy, Debug, Default, Display, PartialEq, Eq, Serialize, Deserialize)]
#[strum(serialize_all = "snake_case")]
#[serde(rename_all = "snake_case")]
pub enum McpTransport {
    #[default]
    #[strum(to_string = "http")]
    Http,
    #[strum(to_string = "stdio")]
    Stdio,
}

Then the match you do later on becomes easier.

kid: Option<String>,
n: Option<String>,
e: Option<String>,
x: Option<String>,
y: Option<String>,
crv: Option<String>,
}

#[derive(Debug, Deserialize)]
struct JwkSet {
keys: Vec<Jwk>,
}

#[derive(Debug, Clone, Hash, Eq, PartialEq)]
struct CacheKey {
issuer: String,
kid: String,
}

#[derive(Debug, Clone)]
pub struct JwksClient {
cache: Arc<IggyRwLock<HashMap<CacheKey, DecodingKey>>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe just use DashMap instead?

}

impl Default for JwksClient {
fn default() -> Self {
Self {
cache: Arc::new(IggyRwLock::new(HashMap::new())),
}
}
}

impl JwksClient {
pub async fn get_key(&self, issuer: &str, jwks_url: &str, kid: &str) -> Option<DecodingKey> {
let cache_key = CacheKey {
issuer: issuer.to_string(),
kid: kid.to_string(),
};

{
let cache = self.cache.read().await;
if let Some(key) = cache.get(&cache_key) {
return Some(key.clone());
}
}

if let Ok(key) = self.fetch_and_cache_key(issuer, jwks_url, kid).await {
return Some(key);
}

None
}

async fn fetch_and_cache_key(
&self,
issuer: &str,
jwks_url: &str,
kid: &str,
) -> Result<DecodingKey, anyhow::Error> {
if let Err(e) = self.refresh_keys(issuer, jwks_url).await {
return Err(anyhow::anyhow!("Failed to refresh keys: {}", e));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use the existing custom error enums we already have - feel free to add new custom variants if needed or reuse existing ones.

}

let cache_key = CacheKey {
issuer: issuer.to_string(),
kid: kid.to_string(),
};

let cache = self.cache.read().await;
cache
.get(&cache_key)
.cloned()
.ok_or_else(|| anyhow::anyhow!("Key not found in cache after refresh"))
}

async fn refresh_keys(&self, issuer: &str, jwks_url: &str) -> Result<(), anyhow::Error> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here regarding all the errors, don't use anyhow, please fix in all places returning errors.

let response = ureq::get(jwks_url)
.call()
.map_err(|e| anyhow::anyhow!("Failed to fetch JWKS: {}", e))?;

let body = response
.into_string()
.map_err(|e| anyhow::anyhow!("Failed to read response body: {}", e))?;

let jwks: JwkSet = serde_json::from_str(&body)
.map_err(|e| anyhow::anyhow!("Failed to parse JWKS: {}", e))?;

let mut cache = self.cache.write().await;

for key in jwks.keys {
if let Some(kid) = key.kid {
let decoding_key: DecodingKey = match key.kty.as_str() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here you could match using the previously added enum, also makes sense to make the string lowercase first.

"RSA" => {
if let (Some(n), Some(e)) = (key.n.as_deref(), key.e.as_deref()) {
DecodingKey::from_rsa_components(n, e)
.map_err(|e| anyhow::anyhow!("Invalid RSA key: {}", e))?
} else {
continue;
}
}
"EC" => {
if let (Some(x), Some(y), Some(crv)) =
(key.x.as_deref(), key.y.as_deref(), key.crv.as_deref())
{
match crv {
"P-256" => DecodingKey::from_ec_components(x, y)
.map_err(|e| anyhow::anyhow!("Invalid EC key: {}", e))?,
"P-384" => DecodingKey::from_ec_components(x, y)
.map_err(|e| anyhow::anyhow!("Invalid EC key: {}", e))?,
"P-521" => DecodingKey::from_ec_components(x, y)
.map_err(|e| anyhow::anyhow!("Invalid EC key: {}", e))?,
_ => continue,
}
} else {
continue;
}
}
_ => continue,
};

let cache_key = CacheKey {
issuer: issuer.to_string(),
kid,
};
cache.insert(cache_key, decoding_key);
}
}

Ok(())
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please extend this file with unit tests matching the existing conventions to ensure that at least the logic JwksClientlikerefresh_keys` or so is correct.

Loading
Loading