From 5bddf9ec3e6ab430881da22e8a6ed884e9c6c438 Mon Sep 17 00:00:00 2001 From: Martin Ruderer Date: Thu, 12 Mar 2026 23:33:51 +0100 Subject: [PATCH 1/3] fix(mqtt source): pass client certificates to rumqttc for mutual TLS --- changelog.d/mqtt_source_mutual_TLS.fix.md | 3 +++ src/sources/mqtt/config.rs | 2 +- src/sources/mqtt/source.rs | 12 ++++++++++-- 3 files changed, 14 insertions(+), 3 deletions(-) create mode 100644 changelog.d/mqtt_source_mutual_TLS.fix.md diff --git a/changelog.d/mqtt_source_mutual_TLS.fix.md b/changelog.d/mqtt_source_mutual_TLS.fix.md new file mode 100644 index 0000000000000..229f79dc2dad5 --- /dev/null +++ b/changelog.d/mqtt_source_mutual_TLS.fix.md @@ -0,0 +1,3 @@ +Fixed the mqtt sink's certificate handling. + +authors: mr- diff --git a/src/sources/mqtt/config.rs b/src/sources/mqtt/config.rs index 635751a3a1b0a..92b8cd78f6a5b 100644 --- a/src/sources/mqtt/config.rs +++ b/src/sources/mqtt/config.rs @@ -148,7 +148,7 @@ impl MqttSourceConfig { if let Some(tls) = tls.tls() { let ca = tls.authorities_pem().flatten().collect(); - let client_auth = None; + let client_auth = tls.identity_pem(); let alpn = Some(vec!["mqtt".into()]); options.set_transport(Transport::Tls(TlsConfiguration::Simple { ca, diff --git a/src/sources/mqtt/source.rs b/src/sources/mqtt/source.rs index d0f3efea5b0f7..3ce6fd4548f8a 100644 --- a/src/sources/mqtt/source.rs +++ b/src/sources/mqtt/source.rs @@ -47,7 +47,9 @@ impl MqttSource { client .subscribe(topic, QoS::AtLeastOnce) .await - .map_err(|_| ())?; + .map_err(|e| { + tracing::error!(error = ?e, "Failed to send MQTT subscribe command"); + })?; } OneOrMany::Many(topics) => { client @@ -58,7 +60,9 @@ impl MqttSource { .map(|topic| SubscribeFilter::new(topic, QoS::AtLeastOnce)), ) .await - .map_err(|_| ())?; + .map_err(|e| { + tracing::error!(error = ?e, "Failed to send MQTT subscribe command"); + })?; } } @@ -80,6 +84,10 @@ impl MqttSource { )) => { // TODO Handle acknowledgement - https://github.com/vectordotdev/vector/issues/21967 } + Err(e) => { + tracing::error!("Error = {e:?}"); + return Ok(()); + } _ => {} } } From 9ae50b7e029702a21454938683692346dad9f16c Mon Sep 17 00:00:00 2001 From: Martin Ruderer Date: Tue, 17 Mar 2026 22:11:14 +0100 Subject: [PATCH 2/3] Update changelog.d/mqtt_source_mutual_TLS.fix.md Co-authored-by: Thomas --- changelog.d/mqtt_source_mutual_TLS.fix.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/mqtt_source_mutual_TLS.fix.md b/changelog.d/mqtt_source_mutual_TLS.fix.md index 229f79dc2dad5..9d1c52d65c370 100644 --- a/changelog.d/mqtt_source_mutual_TLS.fix.md +++ b/changelog.d/mqtt_source_mutual_TLS.fix.md @@ -1,3 +1,3 @@ -Fixed the mqtt sink's certificate handling. +Fixed a bug in the `mqtt` source where user-provided TLS client certificates (`crt_file` / `key_file`) were being silently ignored, breaking mTLS connections to strict brokers like AWS IoT Core. authors: mr- From 5ae633a29ad956a134c90d2eb751817b792b4a72 Mon Sep 17 00:00:00 2001 From: Martin Ruderer Date: Tue, 17 Mar 2026 22:18:46 +0100 Subject: [PATCH 3/3] Revert (broken) error handling --- src/sources/mqtt/source.rs | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/src/sources/mqtt/source.rs b/src/sources/mqtt/source.rs index 3ce6fd4548f8a..d0f3efea5b0f7 100644 --- a/src/sources/mqtt/source.rs +++ b/src/sources/mqtt/source.rs @@ -47,9 +47,7 @@ impl MqttSource { client .subscribe(topic, QoS::AtLeastOnce) .await - .map_err(|e| { - tracing::error!(error = ?e, "Failed to send MQTT subscribe command"); - })?; + .map_err(|_| ())?; } OneOrMany::Many(topics) => { client @@ -60,9 +58,7 @@ impl MqttSource { .map(|topic| SubscribeFilter::new(topic, QoS::AtLeastOnce)), ) .await - .map_err(|e| { - tracing::error!(error = ?e, "Failed to send MQTT subscribe command"); - })?; + .map_err(|_| ())?; } } @@ -84,10 +80,6 @@ impl MqttSource { )) => { // TODO Handle acknowledgement - https://github.com/vectordotdev/vector/issues/21967 } - Err(e) => { - tracing::error!("Error = {e:?}"); - return Ok(()); - } _ => {} } }