diff --git a/plugins/in_splunk/splunk.c b/plugins/in_splunk/splunk.c index 096c9078705..13a18ce4013 100644 --- a/plugins/in_splunk/splunk.c +++ b/plugins/in_splunk/splunk.c @@ -261,6 +261,16 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct flb_splunk, tag_key), "Set a record key to specify the tag of the record" }, + { + FLB_CONFIG_MAP_BOOL, "add_remote_addr", "false", + 0, FLB_TRUE, offsetof(struct flb_splunk, add_remote_addr), + "Inject a remote address using the X-Forwarded-For header or connection address" + }, + { + FLB_CONFIG_MAP_STR, "remote_addr_key", "remote_addr", + 0, FLB_TRUE, offsetof(struct flb_splunk, remote_addr_key), + "Set a record key for storing the remote address" + }, /* EOF */ diff --git a/plugins/in_splunk/splunk.h b/plugins/in_splunk/splunk.h index 56a45bf2d05..9140747a7fb 100644 --- a/plugins/in_splunk/splunk.h +++ b/plugins/in_splunk/splunk.h @@ -54,6 +54,8 @@ struct flb_splunk { size_t ingested_auth_header_len; int store_token_in_metadata; flb_sds_t store_token_key; + int add_remote_addr; + flb_sds_t remote_addr_key; struct flb_log_event_encoder log_encoder; @@ -71,6 +73,10 @@ struct flb_splunk { struct flb_downstream *downstream; /* Client manager */ struct mk_list connections; /* linked list of connections */ struct mk_server *server; + + /* Remote address */ + flb_sds_t current_remote_addr; + size_t current_remote_addr_len; }; diff --git a/plugins/in_splunk/splunk_config.c b/plugins/in_splunk/splunk_config.c index 7e3d058f208..014dc8aab69 100644 --- a/plugins/in_splunk/splunk_config.c +++ b/plugins/in_splunk/splunk_config.c @@ -146,6 +146,9 @@ struct flb_splunk *splunk_config_create(struct flb_input_instance *ins) ctx->ingested_auth_header = NULL; + ctx->current_remote_addr = NULL; + ctx->current_remote_addr_len = 0; + ret = setup_hec_tokens(ctx); if (ret != 0) { splunk_config_destroy(ctx); diff --git a/plugins/in_splunk/splunk_prot.c b/plugins/in_splunk/splunk_prot.c index 536a939f108..8541575e832 100644 --- a/plugins/in_splunk/splunk_prot.c +++ b/plugins/in_splunk/splunk_prot.c @@ -20,10 +20,14 @@ #include #include #include +#include #include #include #include #include +#include +#include +#include #include #include @@ -155,6 +159,152 @@ static int send_json_message_response(struct splunk_conn *conn, int http_status, return 0; } +/* + * We use two backends for HTTP parsing and it depends on the version of the + * protocol: + * + * http/1.x: we use Monkey HTTP parser: struct mk_http_session.parser + */ +static int http_header_lookup(int version, void *ptr, char *key, + char **val, size_t *val_len) +{ + int key_len; + + /* HTTP/1.1 */ + struct mk_list *head; + struct mk_http_session *session; + struct mk_http_request *request_11; + struct mk_http_header *header; + + /* HTTP/2.0 */ + char *value; + struct flb_http_request *request_20; + + if (!key) { + return -1; + } + + key_len = strlen(key); + if (key_len <= 0) { + return -1; + } + + if (version <= HTTP_PROTOCOL_VERSION_11) { + if (!ptr) { + return -1; + } + + request_11 = (struct mk_http_request *) ptr; + session = request_11->session; + mk_list_foreach(head, &session->parser.header_list) { + header = mk_list_entry(head, struct mk_http_header, _head); + if (header->key.len == key_len && + strncasecmp(header->key.data, key, key_len) == 0) { + *val = header->val.data; + *val_len = header->val.len; + return 0; + } + } + return -1; + } + else if (version == HTTP_PROTOCOL_VERSION_20) { + request_20 = ptr; + if (!request_20) { + return -1; + } + + value = flb_http_request_get_header(request_20, key); + if (!value) { + return -1; + } + + *val = value; + *val_len = strlen(value); + return 0; + } + + return -1; +} + +static void extract_xff_value(const char *value, size_t value_len, + const char **out_value, size_t *out_len) +{ + const char *start; + const char *end; + const char *comma; + + *out_value = NULL; + *out_len = 0; + + if (value == NULL || value_len == 0) { + return; + } + + start = value; + end = value + value_len; + + while (start < end && (*start == ' ' || *start == '\t')) { + start++; + } + + comma = memchr(start, ',', end - start); + if (comma != NULL) { + end = comma; + } + + while (end > start && (end[-1] == ' ' || end[-1] == '\t')) { + end--; + } + + if (end > start) { + *out_value = start; + *out_len = end - start; + } +} + +static int extract_remote_address(const char *xff_value, + size_t xff_value_len, + struct flb_connection *connection, + char **out, + size_t *out_len) +{ + const char *value = NULL; + size_t len = 0; + + extract_xff_value(xff_value, xff_value_len, &value, &len); + + if (value == NULL && connection != NULL) { + value = flb_connection_get_remote_address(connection); + if (value != NULL) { + len = strlen(value); + } + } + + if (value == NULL || len == 0) { + return -1; + } + + *out = value; + *out_len = len; + return 0; +} + +static int append_remote_addr(struct flb_splunk *ctx, + const char *addr, + size_t addr_len) +{ + if (ctx->add_remote_addr != FLB_TRUE || + ctx->remote_addr_key == NULL || + addr == NULL || addr_len == 0) { + return FLB_EVENT_ENCODER_SUCCESS; + } + + return flb_log_event_encoder_append_body_values( + &ctx->log_encoder, + FLB_LOG_EVENT_CSTRING_VALUE(ctx->remote_addr_key), + FLB_LOG_EVENT_STRING_VALUE(addr, addr_len)); +} + /* implements functionality to get tag from key in record */ static flb_sds_t tag_key(struct flb_splunk *ctx, msgpack_object *map) { @@ -236,6 +386,12 @@ static int process_raw_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, char } } + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = append_remote_addr(ctx, + ctx->current_remote_addr, + ctx->current_remote_addr_len); + } + if (ret == FLB_EVENT_ENCODER_SUCCESS) { ret = flb_log_event_encoder_commit_record(&ctx->log_encoder); } @@ -267,7 +423,10 @@ static int process_raw_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, char static void process_flb_log_append(struct flb_splunk *ctx, msgpack_object *record, flb_sds_t tag, flb_sds_t tag_from_record, - struct flb_time tm) { + struct flb_time tm, + const char *remote_addr, + size_t remote_addr_len) +{ int ret; int i; msgpack_object_kv *kv; @@ -280,13 +439,25 @@ static void process_flb_log_append(struct flb_splunk *ctx, msgpack_object *recor &tm); } - if (ctx->store_token_in_metadata == FLB_TRUE) { - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_set_body_from_msgpack_object( - &ctx->log_encoder, - record); + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + /* Always build body by appending map entries so we can extend it */ + if (record->type == MSGPACK_OBJECT_MAP) { + kv = record->via.map.ptr; + for (i = 0; i < record->via.map.size && + ret == FLB_EVENT_ENCODER_SUCCESS; i++) { + ret = flb_log_event_encoder_append_body_values( + &ctx->log_encoder, + FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&kv[i].key), + FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&kv[i].val)); + } + } + else { + ret = flb_log_event_encoder_set_body_from_msgpack_object(&ctx->log_encoder, + record); } + } + if (ctx->store_token_in_metadata == FLB_TRUE) { if (ctx->ingested_auth_header != NULL) { if (ret == FLB_EVENT_ENCODER_SUCCESS) { ret = flb_log_event_encoder_append_metadata_values( @@ -299,15 +470,6 @@ static void process_flb_log_append(struct flb_splunk *ctx, msgpack_object *recor } else { if (ctx->ingested_auth_header != NULL) { - /* iterate through the old record map to create the appendable new buffer */ - kv = record->via.map.ptr; - for(i = 0; i < record->via.map.size && ret == FLB_EVENT_ENCODER_SUCCESS; i++) { - ret = flb_log_event_encoder_append_body_values( - &ctx->log_encoder, - FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&kv[i].key), - FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&kv[i].val)); - } - if (ret == FLB_EVENT_ENCODER_SUCCESS) { ret = flb_log_event_encoder_append_body_values( &ctx->log_encoder, @@ -316,13 +478,12 @@ static void process_flb_log_append(struct flb_splunk *ctx, msgpack_object *recor ctx->ingested_auth_header_len)); } } - else { - if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_set_body_from_msgpack_object( - &ctx->log_encoder, - record); - } - } + } + + if (ret == FLB_EVENT_ENCODER_SUCCESS) { + ret = append_remote_addr(ctx, + ctx->current_remote_addr, + ctx->current_remote_addr_len); } if (ret == FLB_EVENT_ENCODER_SUCCESS) { @@ -378,7 +539,9 @@ static int process_json_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, char tag_from_record = tag_key(ctx, &result.data); } - process_flb_log_append(ctx, &result.data, tag, tag_from_record, tm); + process_flb_log_append(ctx, &result.data, tag, tag_from_record, tm, + ctx->current_remote_addr, + ctx->current_remote_addr_len); flb_log_event_encoder_reset(&ctx->log_encoder); } @@ -393,7 +556,9 @@ static int process_json_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, char tag_from_record = tag_key(ctx, &record); } - process_flb_log_append(ctx, &record, tag, tag_from_record, tm); + process_flb_log_append(ctx, &record, tag, tag_from_record, tm, + ctx->current_remote_addr, + ctx->current_remote_addr_len); /* TODO : Optimize this * @@ -616,7 +781,9 @@ static int process_hec_payload(struct flb_splunk *ctx, struct splunk_conn *conn, static int process_hec_raw_payload(struct flb_splunk *ctx, struct splunk_conn *conn, flb_sds_t tag, struct mk_http_session *session, - struct mk_http_request *request) + struct mk_http_request *request, + const char *remote_addr, + size_t remote_addr_len) { int ret = -1; struct mk_http_header *header; @@ -691,6 +858,9 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn, off_t diff; flb_sds_t tag; struct mk_http_header *header; + char *hval = NULL; + size_t hlen = 0; + const char *peer; if (request->uri.data[0] != '/') { send_response(conn, 400, "error: invalid request\n"); @@ -835,13 +1005,34 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn, request->data.len = out_chunked_size; } + /* Resolve per-request remote address */ + ctx->current_remote_addr = NULL; + ctx->current_remote_addr_len = 0; + + if (http_header_lookup(HTTP_PROTOCOL_VERSION_11, request, + SPLUNK_XFF_HEADER, &hval, &hlen) == 0) { + extract_remote_address(hval, hlen, conn->connection, + &ctx->current_remote_addr, + &ctx->current_remote_addr_len); + } + else { + /* fallback to peer addr */ + peer = flb_connection_get_remote_address(conn->connection); + if (peer) { + ctx->current_remote_addr = peer; + ctx->current_remote_addr_len = strlen(peer); + } + } + /* Handle every ingested payload cleanly */ flb_log_event_encoder_reset(&ctx->log_encoder); if (request->method == MK_METHOD_POST) { if (strcasecmp(uri, "/services/collector/raw/1.0") == 0 || strcasecmp(uri, "/services/collector/raw") == 0) { - ret = process_hec_raw_payload(ctx, conn, tag, session, request); + ret = process_hec_raw_payload(ctx, conn, tag, session, request, + ctx->current_remote_addr, + ctx->current_remote_addr_len); if (ret == -2) { /* Response already sent, skip further response */ @@ -927,6 +1118,10 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn, request->data.data = original_data; request->data.len = original_data_size; + /* Clear per-request remote address to avoid leakage across keep-alive/pipeline */ + ctx->current_remote_addr = NULL; + ctx->current_remote_addr_len = 0; + return ret; } @@ -1138,6 +1333,10 @@ int splunk_prot_handle_ng(struct flb_http_request *request, struct flb_splunk *context; int ret = -1; flb_sds_t tag; + struct flb_http_server_session *parent_session; + char *hval = NULL; + size_t hlen = 0; + const char *peer; context = (struct flb_splunk *) response->stream->user_data; @@ -1185,6 +1384,28 @@ int splunk_prot_handle_ng(struct flb_http_request *request, /* Handle every ingested payload cleanly */ flb_log_event_encoder_reset(&context->log_encoder); + /* Resolve per-request remote address */ + context->current_remote_addr = NULL; + context->current_remote_addr_len = 0; + + parent_session = (struct flb_http_server_session *) request->stream->parent; + if (parent_session != NULL) { + if (http_header_lookup(HTTP_PROTOCOL_VERSION_20, request, + SPLUNK_XFF_HEADER, &hval, &hlen) == 0) { + extract_remote_address(hval, hlen, parent_session->connection, + &context->current_remote_addr, + &context->current_remote_addr_len); + } + else { + /* fallback to peer addr */ + peer = flb_connection_get_remote_address(parent_session->connection); + if (peer) { + context->current_remote_addr = peer; + context->current_remote_addr_len = strlen(peer); + } + } + } + if (request->method != HTTP_METHOD_POST) { /* HEAD, PUT, PATCH, and DELETE methods are prohibited to use.*/ send_response_ng(response, 400, "error: invalid HTTP method\n"); @@ -1239,5 +1460,10 @@ int splunk_prot_handle_ng(struct flb_http_request *request, } flb_sds_destroy(tag); + + /* Clear per-request remote address to avoid leakage across keep-alive/pipeline */ + context->current_remote_addr = NULL; + context->current_remote_addr_len = 0; + return ret; } diff --git a/plugins/in_splunk/splunk_prot.h b/plugins/in_splunk/splunk_prot.h index f979d4755a8..5c965155686 100644 --- a/plugins/in_splunk/splunk_prot.h +++ b/plugins/in_splunk/splunk_prot.h @@ -25,6 +25,8 @@ #define SPLUNK_AUTH_MISSING_CRED -1 #define SPLUNK_AUTH_UNAUTHORIZED -2 +#define SPLUNK_XFF_HEADER "x-forwarded-for" + #include int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn, diff --git a/tests/runtime/in_splunk.c b/tests/runtime/in_splunk.c index 666866442c7..8221b9857e3 100644 --- a/tests/runtime/in_splunk.c +++ b/tests/runtime/in_splunk.c @@ -27,6 +27,7 @@ #include #include #include "flb_tests_runtime.h" +#include "../../plugins/in_splunk/splunk_prot.h" #define JSON_CONTENT_TYPE "application/json" @@ -922,6 +923,92 @@ void flb_test_splunk_collector_event_1_0() flb_test_splunk(8819, "/services/collector/event/1.0"); } +void flb_test_splunk_xff_extract() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + struct flb_http_client *c; + int ret; + int num; + size_t b_sent; + char *buf = "{\"event\": \"Pony 1 has left the barn\"}"; + char *expected = "\"xff\":\"203.0.113.1\""; + char *xff_value = " 203.0.113.1, 70.41.3.18, 150.172.238.178"; + char sport[16]; + int port = 8820; + + snprintf(sport, 16, "%d", port); + + clear_output_num(); + + cb_data.cb = cb_check_result_json; + cb_data.data = expected; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_input_set(ctx->flb, ctx->i_ffd, + "port", sport, + "add_remote_addr", "true", + "remote_addr_key", "xff", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + ctx->httpc = splunk_client_ctx_create(port); + TEST_CHECK(ctx->httpc != NULL); + + c = flb_http_client(ctx->httpc->u_conn, FLB_HTTP_POST, "/services/collector/event", + buf, strlen(buf), "127.0.0.1", port, NULL, 0); + ret = flb_http_add_header(c, FLB_HTTP_HEADER_CONTENT_TYPE, + strlen(FLB_HTTP_HEADER_CONTENT_TYPE), + JSON_CONTENT_TYPE, strlen(JSON_CONTENT_TYPE)); + TEST_CHECK(ret == 0); + ret = flb_http_add_header(c, SPLUNK_XFF_HEADER, + strlen(SPLUNK_XFF_HEADER), + xff_value, strlen(xff_value)); + TEST_CHECK(ret == 0); + if (!TEST_CHECK(c != NULL)) { + TEST_MSG("splunk_client failed"); + exit(EXIT_FAILURE); + } + + ret = flb_http_do(c, &b_sent); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("ret error. ret=%d\n", ret); + } + else if (!TEST_CHECK(b_sent > 0)){ + TEST_MSG("b_sent size error. b_sent = %lu\n", b_sent); + } + else if (!TEST_CHECK(c->resp.status == 200)) { + TEST_MSG("http response code error. expect: 200, got: %d\n", c->resp.status); + } + + /* waiting to flush */ + flb_time_msleep(1500); + + num = get_output_num(); + if (!TEST_CHECK(num > 0)) { + TEST_MSG("no outputs"); + } + + flb_http_client_destroy(c); + flb_upstream_conn_release(ctx->httpc->u_conn); + test_ctx_destroy(ctx); +} + TEST_LIST = { {"health", flb_test_splunk_health}, {"collector", flb_test_splunk_collector}, @@ -936,5 +1023,6 @@ TEST_LIST = { {"tag_key", flb_test_splunk_tag_key}, {"collector_event_with_auth_key", flb_test_splunk_collector_event_hec_token_key}, {"collector_raw_with_auth_key", flb_test_splunk_collector_raw_hec_token_key}, + {"collector_xff_extract", flb_test_splunk_xff_extract}, {NULL, NULL} };