Skip to content
Open
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
`opentelemetry` source: Implemented header enrichment for OTLP metrics and traces. Unlike logs, which support enriching
the event itself or its metadata, depending on `log_namespace` settings, for metrics and traces this setting is ignored
and header values are added to the event metadata.

Issue: https://github.com/vectordotdev/vector/issues/24619

authors: ozanichkovsky
5 changes: 3 additions & 2 deletions src/sources/opentelemetry/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ pub struct OpentelemetryConfig {
pub acknowledgements: SourceAcknowledgementsConfig,

/// The namespace to use for logs. This overrides the global setting.
/// Is ignored for header enrichment of metrics and traces.
#[configurable(metadata(docs::hidden))]
#[serde(default)]
pub log_namespace: Option<bool>,
Expand Down Expand Up @@ -202,11 +203,11 @@ pub struct HttpConfig {
#[serde(default)]
pub keepalive: KeepaliveConfig,

/// A list of HTTP headers to include in the log event.
/// A list of HTTP headers to include in the event.
///
/// Accepts the wildcard (`*`) character for headers matching a specified pattern.
///
/// Specifying "*" results in all headers included in the log event.
/// Specifying "*" results in all headers included in the event.
///
/// These headers are not included in the JSON payload if a field with a conflicting name exists.
#[serde(default)]
Expand Down
16 changes: 14 additions & 2 deletions src/sources/opentelemetry/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,15 @@ pub(crate) fn build_warp_filter(
out.clone(),
bytes_received.clone(),
events_received.clone(),
headers.clone(),
metrics_deserializer,
);
let trace_filters = build_warp_trace_filter(
acknowledgements,
out.clone(),
bytes_received,
events_received,
headers.clone(),
traces_deserializer,
);
log_filters
Expand Down Expand Up @@ -260,9 +262,10 @@ fn build_warp_metrics_filter(
source_sender: SourceSender,
bytes_received: Registered<BytesReceived>,
events_received: Registered<EventsReceived>,
headers_cfg: Vec<HttpConfigParamKind>,
deserializer: Option<OtlpDeserializer>,
) -> BoxedFilter<(Response,)> {
let make_events = move |encoding_header: Option<String>, _headers: HeaderMap, body: Bytes| {
let make_events = move |encoding_header: Option<String>, headers: HeaderMap, body: Bytes| {
decompress_body(encoding_header.as_deref(), body)
.inspect_err(|err| {
// Other status codes are already handled by `sources::util::decompress_body` (tech debt).
Expand All @@ -285,6 +288,10 @@ fn build_warp_metrics_filter(
} else {
decode_metrics_body(decoded_body, &events_received)
}
.map(|mut events| {
enrich_events(&mut events, &headers_cfg, &headers, LogNamespace::default());
events
})
})
};

Expand All @@ -301,9 +308,10 @@ fn build_warp_trace_filter(
source_sender: SourceSender,
bytes_received: Registered<BytesReceived>,
events_received: Registered<EventsReceived>,
headers_cfg: Vec<HttpConfigParamKind>,
deserializer: Option<OtlpDeserializer>,
) -> BoxedFilter<(Response,)> {
let make_events = move |encoding_header: Option<String>, _headers: HeaderMap, body: Bytes| {
let make_events = move |encoding_header: Option<String>, headers: HeaderMap, body: Bytes| {
decompress_body(encoding_header.as_deref(), body)
.inspect_err(|err| {
// Other status codes are already handled by `sources::util::decompress_body` (tech debt).
Expand All @@ -326,6 +334,10 @@ fn build_warp_trace_filter(
} else {
decode_trace_body(decoded_body, &events_received)
}
.map(|mut events| {
enrich_events(&mut events, &headers_cfg, &headers, LogNamespace::default());
events
})
})
};

Expand Down
Loading
Loading