Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions plugins/out_s3/s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,20 @@ static struct flb_aws_header storage_class_header = {
.val_len = 0,
};

static struct flb_aws_header sse_header = {
.key = "x-amz-server-side-encryption",
.key_len = 28,
.val = "",
.val_len = 0,
};

static struct flb_aws_header sse_kms_key_id_header = {
.key = "x-amz-server-side-encryption-aws-kms-key-id",
.key_len = 43,
.val = "",
.val_len = 0,
};

static char *mock_error_response(char *error_env_var)
{
char *err_val = NULL;
Expand Down Expand Up @@ -194,6 +208,12 @@ int create_headers(struct flb_s3 *ctx, char *body_md5,
if (ctx->storage_class != NULL) {
headers_len++;
}
if (ctx->sse != NULL) {
headers_len++;
}
if (ctx->sse_kms_key_id != NULL) {
headers_len++;
}
if (headers_len == 0) {
*num_headers = headers_len;
*headers = s3_headers;
Expand Down Expand Up @@ -239,6 +259,19 @@ int create_headers(struct flb_s3 *ctx, char *body_md5,
s3_headers[n] = storage_class_header;
s3_headers[n].val = ctx->storage_class;
s3_headers[n].val_len = strlen(ctx->storage_class);
n++;
}
if (ctx->sse != NULL) {
s3_headers[n] = sse_header;
s3_headers[n].val = ctx->sse;
s3_headers[n].val_len = strlen(ctx->sse);
n++;
}
if (ctx->sse_kms_key_id != NULL && ctx->sse != NULL && (strncmp(ctx->sse, "aws:kms", sizeof("aws:kms")) == 0 || strncmp(ctx->sse, "aws:kms:dsse", sizeof("aws:kms:dsse")) == 0)) {
s3_headers[n] = sse_kms_key_id_header;
s3_headers[n].val = ctx->sse_kms_key_id;
s3_headers[n].val_len = strlen(ctx->sse_kms_key_id);
n++;
}

*num_headers = headers_len;
Expand Down Expand Up @@ -860,6 +893,26 @@ static int cb_s3_init(struct flb_output_instance *ins,
ctx->storage_class = (char *) tmp;
}

tmp = flb_output_get_property("sse", ins);
if (tmp) {
if (strncmp(tmp, "AES256", sizeof("AES256")) != 0 &&
strncmp(tmp, "aws:kms", sizeof("aws:kms")) != 0 &&
strncmp(tmp, "aws:kms:dsse", sizeof("aws:kms:dsse")) != 0) {
flb_plg_error(ctx->ins, "Invalid 'sse' value '%s'. Must be 'AES256', 'aws:kms', or 'aws:kms:dsse'", tmp);
return -1;
}
ctx->sse = (char *) tmp;
}

tmp = flb_output_get_property("sse_kms_key_id", ins);
if (tmp) {
if (ctx->sse == NULL || (strncmp(ctx->sse, "aws:kms", sizeof("aws:kms")) != 0 && strncmp(ctx->sse, "aws:kms:dsse", sizeof("aws:kms:dsse")) != 0)) {
flb_plg_error(ctx->ins, "Invalid 'sse_kms_key_id' value '%s'. 'sse_kms_key_id' is only applicable when 'sse' is set to 'aws:kms' or 'aws:kms:dsse'", tmp);
return -1;
}
ctx->sse_kms_key_id = (char *) tmp;
}

if (ctx->insecure == FLB_FALSE) {
ctx->client_tls = flb_tls_create(FLB_TLS_CLIENT_MODE,
ins->tls_verify,
Expand Down Expand Up @@ -4154,6 +4207,23 @@ static struct flb_config_map config_map[] = {
"will be stored with the default 'STANDARD' storage class."
},

{
FLB_CONFIG_MAP_STR, "sse", NULL,
0, FLB_FALSE, 0,
"Server-side encryption for S3 objects. Set to 'AES256' for S3-managed keys "
"(SSE-S3), 'aws:kms' for AWS KMS-managed keys (SSE-KMS), or 'aws:kms:dsse' for "
"dual-layer server-side encryption with KMS (DSSE-KMS). When using 'aws:kms' or "
"'aws:kms:dsse'"
},

{
FLB_CONFIG_MAP_STR, "sse_kms_key_id", NULL,
0, FLB_FALSE, 0,
"AWS key ARN for server-side encryption. Only applicable when "
"'sse' is set to 'aws:kms' or 'aws:kms:dsse'. If not specified, the default AWS-managed KMS key "
"for S3 will be used."
},

{
FLB_CONFIG_MAP_STR, "profile", NULL,
0, FLB_TRUE, offsetof(struct flb_s3, profile),
Expand Down
2 changes: 2 additions & 0 deletions plugins/out_s3/s3.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ struct flb_s3 {
char *canned_acl;
char *content_type;
char *storage_class;
char *sse;
char *sse_kms_key_id;
char *log_key;
char *external_id;
char *profile;
Expand Down
97 changes: 97 additions & 0 deletions tests/runtime/out_s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,99 @@ void flb_test_s3_complete_upload_error(void)
unsetenv("TEST_COMPLETE_MULTIPART_UPLOAD_ERROR");
}

void flb_test_s3_sse_invalid_value(void)
{
int ret;
flb_ctx_t *ctx;
int in_ffd;
int out_ffd;

/* mocks calls- signals that we are in test mode */
setenv("FLB_S3_PLUGIN_UNDER_TEST", "true", 1);

ctx = flb_create();

in_ffd = flb_input(ctx, (char *) "lib", NULL);
TEST_CHECK(in_ffd >= 0);
flb_input_set(ctx, in_ffd, "tag", "test", NULL);

out_ffd = flb_output(ctx, (char *) "s3", NULL);
TEST_CHECK(out_ffd >= 0);
flb_output_set(ctx, out_ffd, "match", "*", NULL);
flb_output_set(ctx, out_ffd, "region", "us-west-2", NULL);
flb_output_set(ctx, out_ffd, "bucket", "fluent", NULL);
flb_output_set(ctx, out_ffd, "sse", "invalid_encryption", NULL);

ret = flb_start(ctx);
TEST_CHECK(ret != 0); /* Expect failure due to invalid sse value */

flb_destroy(ctx);
}

void flb_test_s3_sse_kms_valid(void)
{
int ret;
flb_ctx_t *ctx;
int in_ffd;
int out_ffd;

/* mocks calls- signals that we are in test mode */
setenv("FLB_S3_PLUGIN_UNDER_TEST", "true", 1);

ctx = flb_create();

in_ffd = flb_input(ctx, (char *) "lib", NULL);
TEST_CHECK(in_ffd >= 0);
flb_input_set(ctx, in_ffd, "tag", "test", NULL);

out_ffd = flb_output(ctx, (char *) "s3", NULL);
TEST_CHECK(out_ffd >= 0);
flb_output_set(ctx, out_ffd, "match", "*", NULL);
flb_output_set(ctx, out_ffd, "region", "us-west-2", NULL);
flb_output_set(ctx, out_ffd, "bucket", "fluent", NULL);
flb_output_set(ctx, out_ffd, "sse", "aws:kms", NULL);
flb_output_set(ctx, out_ffd, "Retry_Limit", "1", NULL);

ret = flb_start(ctx);
TEST_CHECK(ret == 0); /* Expect success with valid sse value */

sleep(1);
flb_stop(ctx);
flb_destroy(ctx);
}

void flb_test_s3_sse_aes256_valid(void)
{
int ret;
flb_ctx_t *ctx;
int in_ffd;
int out_ffd;

/* mocks calls- signals that we are in test mode */
setenv("FLB_S3_PLUGIN_UNDER_TEST", "true", 1);

ctx = flb_create();

in_ffd = flb_input(ctx, (char *) "lib", NULL);
TEST_CHECK(in_ffd >= 0);
flb_input_set(ctx, in_ffd, "tag", "test", NULL);

out_ffd = flb_output(ctx, (char *) "s3", NULL);
TEST_CHECK(out_ffd >= 0);
flb_output_set(ctx, out_ffd, "match", "*", NULL);
flb_output_set(ctx, out_ffd, "region", "us-west-2", NULL);
flb_output_set(ctx, out_ffd, "bucket", "fluent", NULL);
flb_output_set(ctx, out_ffd, "sse", "AES256", NULL);
flb_output_set(ctx, out_ffd, "Retry_Limit", "1", NULL);

ret = flb_start(ctx);
TEST_CHECK(ret == 0); /* Expect success with valid sse value */

sleep(1);
flb_stop(ctx);
flb_destroy(ctx);
}


/* Test list */
TEST_LIST = {
Expand All @@ -237,5 +330,9 @@ TEST_LIST = {
{"create_upload_error", flb_test_s3_create_upload_error },
{"upload_part_error", flb_test_s3_upload_part_error },
{"complete_upload_error", flb_test_s3_complete_upload_error },
{"sse_invalid_value", flb_test_s3_sse_invalid_value },
{"sse_kms_valid", flb_test_s3_sse_kms_valid },
{"sse_aes256_valid", flb_test_s3_sse_aes256_valid },
{NULL, NULL}
};