Skip to content
Open
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Adds support for including attributes from the `X-Amz-Firehose-Common-Attributes` header in the log events for the `aws_kinesis_firehose` source.

authors: tchanturia
32 changes: 30 additions & 2 deletions src/sources/aws_kinesis_firehose/filters.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{convert::Infallible, io};
use std::{collections::HashMap, convert::Infallible, io};

use bytes::{Buf, Bytes};
use chrono::Utc;
Expand All @@ -14,14 +14,16 @@ use super::{
Compression,
errors::{ParseSnafu, RequestError},
handlers,
models::{FirehoseRequest, FirehoseResponse},
models::{FirehoseCommonAttributesHeader, FirehoseRequest, FirehoseResponse},
};
use crate::{
SourceSender, codecs,
internal_events::{AwsKinesisFirehoseRequestError, AwsKinesisFirehoseRequestReceived},
sources::http_server::HttpConfigParamKind,
};

/// Handles routing of incoming HTTP requests from AWS Kinesis Firehose
#[allow(clippy::too_many_arguments)]
pub fn firehose(
access_keys: Vec<String>,
store_access_key: bool,
Expand All @@ -30,6 +32,7 @@ pub fn firehose(
acknowledgements: bool,
out: SourceSender,
log_namespace: LogNamespace,
common_attributes: Vec<HttpConfigParamKind>,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Infallible> + Clone {
let bytes_received = register!(BytesReceived::from(Protocol::HTTP));
let context = handlers::Context {
Expand All @@ -40,6 +43,7 @@ pub fn firehose(
bytes_received,
out,
log_namespace,
common_attributes,
};
warp::post()
.and(emit_received())
Expand All @@ -58,6 +62,7 @@ pub fn firehose(
})
.untuple_one(),
)
.and(parse_common_attributes_header())
.and(parse_body())
.and(warp::any().map(move || context.clone()))
.and_then(handlers::firehose)
Expand Down Expand Up @@ -106,6 +111,29 @@ fn parse_body() -> impl Filter<Extract = (FirehoseRequest,), Error = warp::rejec
)
}

/// Parses the optional `X-Amz-Firehose-Common-Attributes` request header.
///
/// The header value is deserialized as JSON into a [`FirehoseCommonAttributesHeader`] and its
/// `common_attributes` map is extracted. When the header is absent an empty map is extracted
/// instead, so downstream handlers always receive a map.
///
/// The `X-Amz-Firehose-Request-Id` header is read as well, because its value is attached to the
/// parse error for diagnostics.
///
/// # Errors
///
/// Rejects the request with a custom rejection built from `ParseSnafu` (carrying the request ID)
/// when the header is present but cannot be deserialized.
fn parse_common_attributes_header()
-> impl Filter<Extract = (HashMap<String, String>,), Error = warp::reject::Rejection> + Clone {
    warp::any()
        .and(warp::header("X-Amz-Firehose-Request-Id"))
        .and(warp::header::optional("X-Amz-Firehose-Common-Attributes"))
        .and_then(
            |request_id: String, raw_header: Option<String>| async move {
                match raw_header {
                    // No header was sent: there is nothing to parse.
                    None => Ok(HashMap::new()),
                    Some(raw_header) => {
                        serde_json::from_str::<FirehoseCommonAttributesHeader>(&raw_header)
                            // `request_id` is owned by this future and is not used again, so it
                            // is moved into the error context instead of being cloned.
                            .context(ParseSnafu { request_id })
                            .map(|parsed| parsed.common_attributes)
                            .map_err(warp::reject::custom)
                    }
                }
            },
        )
}

fn emit_received() -> impl Filter<Extract = (), Error = warp::reject::Rejection> + Clone {
warp::any()
.and(warp::header::optional("X-Amz-Firehose-Request-Id"))
Expand Down
57 changes: 56 additions & 1 deletion src/sources/aws_kinesis_firehose/handlers.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::io::Read;

use base64::prelude::{BASE64_STANDARD, Engine as _};
Expand All @@ -19,7 +20,10 @@ use vector_lib::{
lookup::{PathPrefix, metadata_path, path},
source_sender::SendError,
};
use vrl::compiler::SecretTarget;
use vrl::{
compiler::SecretTarget,
value::{KeyString, ObjectMap, Value},
};
use warp::reject;

use super::{
Expand All @@ -36,6 +40,7 @@ use crate::{
AwsKinesisFirehoseAutomaticRecordDecodeError, EventsReceived, StreamClosedError,
},
sources::aws_kinesis_firehose::AwsKinesisFirehoseConfig,
sources::http_server::HttpConfigParamKind,
};

#[derive(Clone)]
Expand All @@ -47,17 +52,21 @@ pub(super) struct Context {
pub(super) bytes_received: Registered<BytesReceived>,
pub(super) out: SourceSender,
pub(super) log_namespace: LogNamespace,
pub(super) common_attributes: Vec<HttpConfigParamKind>,
}

/// Publishes decoded events from the FirehoseRequest to the pipeline
pub(super) async fn firehose(
request_id: String,
source_arn: String,
common_attributes: HashMap<String, String>,
request: FirehoseRequest,
mut context: Context,
) -> Result<impl warp::Reply, reject::Rejection> {
let log_namespace = context.log_namespace;
let events_received = register!(EventsReceived);
let common_attributes_map =
build_common_attributes_map(&context.common_attributes, &common_attributes);

for record in request.records {
let bytes = decode_record(&record, context.compression)
Expand Down Expand Up @@ -132,6 +141,16 @@ pub(super) async fn firehose(
source_arn.to_owned(),
);

if !common_attributes_map.is_empty() {
log_namespace.insert_source_metadata(
AwsKinesisFirehoseConfig::NAME,
log,
Some(LegacyKey::InsertIfEmpty(path!("common_attributes"))),
path!("common_attributes"),
common_attributes_map.clone(),
);
}

if context.store_access_key
&& let Some(access_key) = &request.access_key
{
Expand Down Expand Up @@ -256,6 +275,42 @@ fn decode_gzip(data: &[u8]) -> std::io::Result<Bytes> {
Ok(Bytes::from(decoded))
}

fn build_common_attributes_map(
common_attributes_config: &[HttpConfigParamKind],
common_attributes: &HashMap<String, String>,
) -> ObjectMap {
let mut common_attributes_map = ObjectMap::new();

for common_attribute_config in common_attributes_config {
match common_attribute_config {
HttpConfigParamKind::Exact(common_attribute_name) => {
let value = common_attributes
.get(common_attribute_name)
.map(String::as_bytes);
common_attributes_map.insert(
KeyString::from(common_attribute_name.to_owned()),
Value::from(value.map(Bytes::copy_from_slice)),
);
}
HttpConfigParamKind::Glob(common_attribute_pattern) => {
for common_attribute_name in common_attributes.keys() {
if common_attribute_pattern.matches(common_attribute_name) {
let value = common_attributes
.get(common_attribute_name)
.map(String::as_bytes);
common_attributes_map.insert(
KeyString::from(common_attribute_name.to_owned()),
Value::from(value.map(Bytes::copy_from_slice)),
);
}
}
}
}
}

common_attributes_map
}

#[cfg(test)]
mod tests {
use std::io::Write as _;
Expand Down
Loading
Loading