From ad792aed5705bfc3be79a093dfb786f3100f07da Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Wed, 4 Feb 2026 11:35:57 -0600 Subject: [PATCH 1/2] opentelemetry: logs opentelemetry: encode in-place to avoid extra copy - Encode OTLP-JSON logs directly into the caller's encoder instead of a local encoder and copying the result, removing the final buffer copy in flb_opentelemetry_logs_json_to_msgpack. - In process_json_payload_resource_logs_entry, build each scope group in the main encoder and rollback by truncating the buffer when no log records are added or on error, instead of using a temporary encoder and appending its buffer. This removes per-scope allocations and the msgpack_sbuffer_write copy. - On failure, restore encoder->buffer.size and output pointers so the encoder is left unchanged. - Add return checks for flb_log_event_encoder_group_init, group_header_end, and group_end. - Require a non-NULL encoder (callers already pass a valid encoder). Signed-off-by: Eduardo Silva --- src/opentelemetry/flb_opentelemetry_logs.c | 149 ++++++++++++--------- 1 file changed, 83 insertions(+), 66 deletions(-) diff --git a/src/opentelemetry/flb_opentelemetry_logs.c b/src/opentelemetry/flb_opentelemetry_logs.c index 4d39324ddc9..f2693092cfe 100644 --- a/src/opentelemetry/flb_opentelemetry_logs.c +++ b/src/opentelemetry/flb_opentelemetry_logs.c @@ -52,6 +52,7 @@ static int process_json_payload_log_records_entry( msgpack_object *severity_text = NULL; msgpack_object *trace_id = NULL; msgpack_object *span_id = NULL; + const char *body_key; struct flb_time timestamp; if (error_status) { @@ -336,7 +337,7 @@ static int process_json_payload_log_records_entry( flb_log_event_encoder_dynamic_field_reset(&encoder->body); } else { - const char *body_key = logs_body_key ? logs_body_key : "log"; + body_key = logs_body_key ? logs_body_key : "log"; flb_log_event_encoder_append_cstring( encoder, FLB_LOG_EVENT_BODY, @@ -442,9 +443,9 @@ static int process_json_payload_resource_logs_entry (struct flb_log_event_encode int ret; int result = 0; size_t index; - size_t size_before; - size_t size_after; - struct flb_log_event_encoder *tmp_encoder; + size_t size_after_header; + size_t size_after_logs; + size_t size_before_group; msgpack_object *obj; msgpack_object_map *resource = NULL; msgpack_object *resource_attr = NULL; @@ -530,16 +531,17 @@ static int process_json_payload_resource_logs_entry (struct flb_log_event_encode } for (index = 0 ; index < scope_logs->size ; index++) { - - /* - * we use a temporary encoder to hold the group information, if no record entries are added - * we will discard it. - **/ - tmp_encoder = flb_log_event_encoder_create(encoder->format); - flb_log_event_encoder_group_init(tmp_encoder); + size_before_group = encoder->buffer.size; + ret = flb_log_event_encoder_group_init(encoder); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + if (error_status) { + *error_status = FLB_OTEL_LOGS_ERR_GROUP_METADATA; + } + return -FLB_OTEL_LOGS_ERR_GROUP_METADATA; + } /* pack internal schema */ - ret = flb_log_event_encoder_append_metadata_values(tmp_encoder, + ret = flb_log_event_encoder_append_metadata_values(encoder, FLB_LOG_EVENT_STRING_VALUE("schema", 6), FLB_LOG_EVENT_STRING_VALUE("otlp", 4), FLB_LOG_EVENT_STRING_VALUE("resource_id", 11), @@ -550,27 +552,33 @@ static int process_json_payload_resource_logs_entry (struct flb_log_event_encode if (error_status) { *error_status = FLB_OTEL_LOGS_ERR_GROUP_METADATA; } - flb_log_event_encoder_destroy(tmp_encoder); + flb_log_event_encoder_rollback_record(encoder); + encoder->buffer.size = size_before_group; + encoder->output_buffer = encoder->buffer.data; + encoder->output_length = encoder->buffer.size; return -FLB_OTEL_LOGS_ERR_GROUP_METADATA; } /* Resource key */ - flb_log_event_encoder_append_body_string(tmp_encoder, "resource", 8); + flb_log_event_encoder_append_body_string(encoder, "resource", 8); /* start resource value (map) */ - flb_log_event_encoder_body_begin_map(tmp_encoder); + flb_log_event_encoder_body_begin_map(encoder); /* Check if we have OTel resource attributes */ if (resource_attr) { - flb_log_event_encoder_append_body_string(tmp_encoder, "attributes", 10); - result = flb_otel_utils_json_payload_append_converted_kvlist(tmp_encoder, + flb_log_event_encoder_append_body_string(encoder, "attributes", 10); + result = flb_otel_utils_json_payload_append_converted_kvlist(encoder, FLB_LOG_EVENT_BODY, resource_attr); if (result < 0) { if (error_status) { *error_status = FLB_OTEL_RESOURCE_INVALID_ATTRIBUTE; } - flb_log_event_encoder_destroy(tmp_encoder); + flb_log_event_encoder_rollback_record(encoder); + encoder->buffer.size = size_before_group; + encoder->output_buffer = encoder->buffer.data; + encoder->output_length = encoder->buffer.size; return -FLB_OTEL_RESOURCE_INVALID_ATTRIBUTE; } } @@ -580,20 +588,20 @@ static int process_json_payload_resource_logs_entry (struct flb_log_event_encode result = flb_otel_utils_find_map_entry_by_key(resource, "droppedAttributesCount", 0, FLB_TRUE); if (result >= 0) { obj = &resource->ptr[result].val; - flb_log_event_encoder_append_body_values(tmp_encoder, + flb_log_event_encoder_append_body_values(encoder, FLB_LOG_EVENT_CSTRING_VALUE("dropped_attributes_count"), FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(obj)); } } if (resource_schema_url) { - flb_log_event_encoder_append_body_values(tmp_encoder, + flb_log_event_encoder_append_body_values(encoder, FLB_LOG_EVENT_CSTRING_VALUE("schema_url"), FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(resource_schema_url)); } /* close resource map */ - flb_log_event_encoder_body_commit_map(tmp_encoder); + flb_log_event_encoder_body_commit_map(encoder); /* scope schemaUrl */ result = flb_otel_utils_find_map_entry_by_key(&scope_logs->ptr[index].via.map, "schemaUrl", 0, FLB_TRUE); @@ -623,17 +631,17 @@ static int process_json_payload_resource_logs_entry (struct flb_log_event_encode */ /* append scope key */ - flb_log_event_encoder_append_body_string(tmp_encoder, "scope", 5); + flb_log_event_encoder_append_body_string(encoder, "scope", 5); /* scope map value */ - flb_log_event_encoder_body_begin_map(tmp_encoder); + flb_log_event_encoder_body_begin_map(encoder); /* scope name */ result = flb_otel_utils_find_map_entry_by_key(&scope->via.map, "name", 0, FLB_TRUE); if (result >= 0) { obj = &scope->via.map.ptr[result].val; if (obj->type == MSGPACK_OBJECT_STR) { - flb_log_event_encoder_append_body_values(tmp_encoder, + flb_log_event_encoder_append_body_values(encoder, FLB_LOG_EVENT_CSTRING_VALUE("name"), FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(obj)); } @@ -644,7 +652,7 @@ static int process_json_payload_resource_logs_entry (struct flb_log_event_encode if (result >= 0) { obj = &scope->via.map.ptr[result].val; if (obj->type == MSGPACK_OBJECT_STR) { - flb_log_event_encoder_append_body_values(tmp_encoder, + flb_log_event_encoder_append_body_values(encoder, FLB_LOG_EVENT_CSTRING_VALUE("version"), FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(obj)); } @@ -655,15 +663,18 @@ static int process_json_payload_resource_logs_entry (struct flb_log_event_encode if (result >= 0) { obj = &scope->via.map.ptr[result].val; if (obj->type == MSGPACK_OBJECT_ARRAY) { - flb_log_event_encoder_append_body_string(tmp_encoder, "attributes", 10); - result = flb_otel_utils_json_payload_append_converted_kvlist(tmp_encoder, + flb_log_event_encoder_append_body_string(encoder, "attributes", 10); + result = flb_otel_utils_json_payload_append_converted_kvlist(encoder, FLB_LOG_EVENT_BODY, obj); if (result != 0) { if (error_status) { *error_status = FLB_OTEL_LOGS_ERR_SCOPE_KVLIST; } - flb_log_event_encoder_destroy(tmp_encoder); + flb_log_event_encoder_rollback_record(encoder); + encoder->buffer.size = size_before_group; + encoder->output_buffer = encoder->buffer.data; + encoder->output_length = encoder->buffer.size; return -FLB_OTEL_LOGS_ERR_SCOPE_KVLIST; } } @@ -672,36 +683,50 @@ static int process_json_payload_resource_logs_entry (struct flb_log_event_encode if (error_status) { *error_status = FLB_OTEL_LOGS_ERR_SCOPE_KVLIST; } - flb_log_event_encoder_destroy(tmp_encoder); + flb_log_event_encoder_rollback_record(encoder); + encoder->buffer.size = size_before_group; + encoder->output_buffer = encoder->buffer.data; + encoder->output_length = encoder->buffer.size; return -FLB_OTEL_LOGS_ERR_SCOPE_KVLIST; } } /* scope schemaUrl */ if (scope_schema_url) { - flb_log_event_encoder_append_body_values(tmp_encoder, + flb_log_event_encoder_append_body_values(encoder, FLB_LOG_EVENT_CSTRING_VALUE("schema_url"), FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(scope_schema_url)); } /* close scope map */ - flb_log_event_encoder_commit_map(tmp_encoder, FLB_LOG_EVENT_BODY); + flb_log_event_encoder_commit_map(encoder, FLB_LOG_EVENT_BODY); + } + ret = flb_log_event_encoder_group_header_end(encoder); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + if (error_status) { + *error_status = FLB_OTEL_LOGS_ERR_GROUP_METADATA; + } + encoder->buffer.size = size_before_group; + encoder->output_buffer = encoder->buffer.data; + encoder->output_length = encoder->buffer.size; + return -FLB_OTEL_LOGS_ERR_GROUP_METADATA; } - flb_log_event_encoder_group_header_end(tmp_encoder); /* before processing the scope logs, grab the number of bytes written */ - size_before = tmp_encoder->buffer.size; + size_after_header = encoder->buffer.size; /* Process the scope logs entry */ result = process_json_payload_scope_logs_entry( - tmp_encoder, + encoder, &scope_logs->ptr[index], logs_body_key, error_status); - size_after = tmp_encoder->buffer.size; + size_after_logs = encoder->buffer.size; if (result < 0) { - flb_log_event_encoder_destroy(tmp_encoder); + encoder->buffer.size = size_before_group; + encoder->output_buffer = encoder->buffer.data; + encoder->output_length = encoder->buffer.size; return result; } @@ -709,19 +734,23 @@ static int process_json_payload_resource_logs_entry (struct flb_log_event_encode * If at least one log was valid and registered, finalize the group and copy the content to * the main encoder */ - if (size_after > size_before) { - flb_log_event_encoder_group_end(tmp_encoder); - - /* Append the temporary encoder output to the main encoder */ - msgpack_sbuffer_write(&encoder->buffer, - tmp_encoder->output_buffer, - tmp_encoder->output_length); - + if (size_after_logs > size_after_header) { + ret = flb_log_event_encoder_group_end(encoder); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + if (error_status) { + *error_status = FLB_OTEL_LOGS_ERR_GROUP_METADATA; + } + encoder->buffer.size = size_before_group; + encoder->output_buffer = encoder->buffer.data; + encoder->output_length = encoder->buffer.size; + return -FLB_OTEL_LOGS_ERR_GROUP_METADATA; + } + } + else { + encoder->buffer.size = size_before_group; encoder->output_buffer = encoder->buffer.data; encoder->output_length = encoder->buffer.size; } - - flb_log_event_encoder_destroy(tmp_encoder); } return result; @@ -820,7 +849,7 @@ int flb_opentelemetry_logs_json_to_msgpack(struct flb_log_event_encoder *encoder char *msgpack_body; msgpack_unpacked unpacked_root; size_t offset = 0; - struct flb_log_event_encoder local_log_encoder; + size_t size_before; if (encoder == NULL) { return -1; @@ -834,12 +863,7 @@ int flb_opentelemetry_logs_json_to_msgpack(struct flb_log_event_encoder *encoder return -1; } - /* Initialize the local encoder, we use this one in case an exception happens */ - result = flb_log_event_encoder_init(&local_log_encoder, FLB_LOG_EVENT_FORMAT_FLUENT_BIT_V2); - if (result != FLB_EVENT_ENCODER_SUCCESS) { - flb_free(msgpack_body); - return -1; - } + size_before = encoder->buffer.size; offset = 0; msgpack_unpacked_init(&unpacked_root); @@ -848,7 +872,6 @@ int flb_opentelemetry_logs_json_to_msgpack(struct flb_log_event_encoder *encoder msgpack_body_length, &offset); if (result != MSGPACK_UNPACK_SUCCESS) { - flb_log_event_encoder_destroy(&local_log_encoder); msgpack_unpacked_destroy(&unpacked_root); flb_free(msgpack_body); if (error_status) { @@ -858,15 +881,17 @@ int flb_opentelemetry_logs_json_to_msgpack(struct flb_log_event_encoder *encoder } /* decode OTLP/JSON as raw messagepack and do the proper encoding (groups, name-to-lowercase, etc) */ - result = process_json_payload_root(&local_log_encoder, + result = process_json_payload_root(encoder, &unpacked_root.data, logs_body_key, error_status); if (result < 0) { - flb_log_event_encoder_destroy(&local_log_encoder); msgpack_unpacked_destroy(&unpacked_root); flb_free(msgpack_body); + encoder->buffer.size = size_before; + encoder->output_buffer = encoder->buffer.data; + encoder->output_length = encoder->buffer.size; return result; } @@ -874,16 +899,8 @@ int flb_opentelemetry_logs_json_to_msgpack(struct flb_log_event_encoder *encoder msgpack_unpacked_destroy(&unpacked_root); flb_free(msgpack_body); - /* copy local buffer into caller encoder buffer */ - if (local_log_encoder.output_length > 0) { - msgpack_sbuffer_write(&encoder->buffer, - local_log_encoder.output_buffer, - local_log_encoder.output_length); - encoder->output_buffer = encoder->buffer.data; - encoder->output_length = encoder->buffer.size; - } - - flb_log_event_encoder_destroy(&local_log_encoder); + encoder->output_buffer = encoder->buffer.data; + encoder->output_length = encoder->buffer.size; return result; } From 9b47af3d7598bf1eb8a02238cd8705aa7f163c5c Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Wed, 4 Feb 2026 13:45:20 -0600 Subject: [PATCH 2/2] benchmarks: add opentelemetry perf test Signed-off-by: Eduardo Silva --- benchmarks/CMakeLists.txt | 6 ++ benchmarks/opentelemetry.c | 164 +++++++++++++++++++++++++++++++++++++ 2 files changed, 170 insertions(+) create mode 100644 benchmarks/opentelemetry.c diff --git a/benchmarks/CMakeLists.txt b/benchmarks/CMakeLists.txt index b52a8d9db1a..62a22ce06c1 100644 --- a/benchmarks/CMakeLists.txt +++ b/benchmarks/CMakeLists.txt @@ -4,3 +4,9 @@ target_link_libraries(flb-bench-pack_json ${CMAKE_THREAD_LIBS_INIT} ) +add_executable(flb-bench-opentelemetry opentelemetry.c) +target_link_libraries(flb-bench-opentelemetry + fluent-bit-static + ${CMAKE_THREAD_LIBS_INIT} +) + diff --git a/benchmarks/opentelemetry.c b/benchmarks/opentelemetry.c new file mode 100644 index 00000000000..88e736c4423 --- /dev/null +++ b/benchmarks/opentelemetry.c @@ -0,0 +1,164 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2026 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * OpenTelemetry encoding benchmarks (OTLP logs, metrics, traces). + * Usage: flb-bench-opentelemetry -m -f [-i iterations] + * + * Modes: + * otlp-json-logs OTLP-JSON logs -> Fluent Bit log events + * (flb_opentelemetry_logs_json_to_msgpack) + */ + +#include +#include +#include +#include +#include + +#include +#include +#include + +static long diff_ns(struct timespec *s, struct timespec *e) +{ + return (e->tv_sec - s->tv_sec) * 1000000000L + (e->tv_nsec - s->tv_nsec); +} + +static int run_otlp_json_logs(char *json, size_t len, int iterations) +{ + int i; + int ret; + struct timespec ts, te; + struct flb_log_event_encoder enc; + int error_status; + long d_ns; + uint64_t total_bytes; + double mibps; + + ret = flb_log_event_encoder_init(&enc, FLB_LOG_EVENT_FORMAT_FLUENT_BIT_V2); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + fprintf(stderr, "encoder init failed\n"); + return -1; + } + + ret = flb_opentelemetry_logs_json_to_msgpack(&enc, json, len, NULL, &error_status); + if (ret < 0) { + fprintf(stderr, "OTLP encode failed (error_status=%d), file must be OTLP-JSON (resourceLogs...)\n", error_status); + flb_log_event_encoder_destroy(&enc); + return -1; + } + flb_log_event_encoder_reset(&enc); + + printf("Benchmarking OTLP-JSON logs encoding (flb_opentelemetry_logs_json_to_msgpack)\n"); + printf("------------------------------------------------------------------------\n\n"); + printf("Iterations : %d\n", iterations); + printf("JSON size : %zu bytes\n", len); + printf("------------------------------------------------------------------------\n\n"); + + clock_gettime(CLOCK_MONOTONIC, &ts); + for (i = 0; i < iterations; i++) { + flb_log_event_encoder_reset(&enc); + ret = flb_opentelemetry_logs_json_to_msgpack(&enc, json, len, NULL, &error_status); + if (ret < 0) { + fprintf(stderr, "OTLP encode failed at iteration %d\n", i); + flb_log_event_encoder_destroy(&enc); + return -1; + } + } + clock_gettime(CLOCK_MONOTONIC, &te); + flb_log_event_encoder_destroy(&enc); + + d_ns = diff_ns(&ts, &te); + total_bytes = (uint64_t) len * (uint64_t) iterations; + mibps = (double) total_bytes / d_ns * 1e9 / (1024.0 * 1024.0); + + printf("------------------------------------------------------------------------\n"); + printf("Total time : %ld ns\n", d_ns); + printf("Per call : %ld ns\n", d_ns / (long) iterations); + printf("Throughput : %.2f MiB/s\n", mibps); + printf("------------------------------------------------------------------------\n"); + return 0; +} + +static void usage(const char *prog) +{ + fprintf(stderr, "Usage: %s -m -f [-i iterations]\n", prog); + fprintf(stderr, "\n"); + fprintf(stderr, "Modes:\n"); + fprintf(stderr, " otlp-json-logs OTLP-JSON logs encoding\n"); + fprintf(stderr, "\n"); +} + +int main(int argc, char **argv) +{ + int ret; + int iterations = 100; + char *input_file = NULL; + char *mode = NULL; + char *json; + size_t len; + int opt; + + while ((opt = getopt(argc, argv, "f:i:m:")) != -1) { + switch (opt) { + case 'f': + input_file = optarg; + break; + case 'i': + iterations = atoi(optarg); + if (iterations <= 0) { + fprintf(stderr, "Invalid -i (iterations): %s\n", optarg); + return 1; + } + break; + case 'm': + mode = optarg; + break; + default: + usage(argv[0]); + return 1; + } + } + + if (mode == NULL || input_file == NULL) { + fprintf(stderr, "Error: -m and -f are required.\n\n"); + usage(argv[0]); + return 1; + } + + ret = flb_utils_read_file(input_file, &json, &len); + if (ret != 0) { + fprintf(stderr, "error reading %s\n", input_file); + return 1; + } + + if (strcmp(mode, "otlp-json-logs") == 0) { + ret = run_otlp_json_logs(json, len, iterations); + } + else { + fprintf(stderr, "Unknown mode: %s\n", mode); + usage(argv[0]); + free(json); + return 1; + } + + free(json); + return ret != 0 ? 1 : 0; +}