out_es: sds: Implement es upstream with sds view #11647
Conversation
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the CodeRabbit settings. Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 Walkthrough

A non-owning SDS string view type is introduced, and the es output plugin gains high-availability upstream support built on it.

Changes
Sequence Diagram

```mermaid
sequenceDiagram
    autonumber
    participant Client
    participant "ES Output" as ES
    participant "Upstream HA" as HA
    participant "Upstream Conn" as Conn
    participant "Elasticsearch Node" as Node
    Client->>ES: flush trigger
    ES->>HA: select active node (flb_upstream_ha_node_get)
    HA-->>ES: node info (properties)
    ES->>ES: resolve node props (path, pipeline, creds)
    ES->>ES: es_compose_bulk_uri()
    ES->>HA: get upstream connection for node
    HA-->>Conn: connection
    ES->>Conn: HTTP POST composed /_bulk[?pipeline] with headers
    Conn->>Node: send request
    Node-->>Conn: response
    Conn-->>ES: response
    ES->>ES: destroy composed uri/signature as needed
    ES-->>Client: flush result
```
Estimated Code Review Effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Hi @mabrarov,
Force-pushed from 7408ae1 to ce5b402 (Compare)
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 7408ae174b
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
Actionable comments posted: 4
🧹 Nitpick comments (1)
tests/runtime/out_elasticsearch.c (1)
1092-1118: Add one upstream test that goes through `cb_es_flush()`.

All of the new cases use the formatter hook, so they stop at `out_es_plugin.test_formatter.callback` and never exercise the new HA branch in `cb_es_flush()` (`flb_upstream_ha_node_get()`, per-node URI composition, or node-scoped auth). A single response/integration case with two upstream nodes would cover the behavior this PR adds.

Based on learnings: Add or update tests for behavior changes, especially protocol parsing and encoder/decoder paths.
Also applies to: 1121-1326
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/runtime/out_elasticsearch.c` around lines 1092 - 1118, Add an integration test that exercises cb_es_flush() end-to-end (including flb_upstream_ha_node_get, per-node URI composition and node-scoped auth) by creating a context with create_upstream_test_ctx but without installing the formatter hook (avoid out_es_plugin.test_formatter.callback) so the output plugin performs a real flush; specifically, create an upstream file with two nodes, configure the "es" output to use that upstream, send records and assert the request goes through the HA branch (e.g., by mocking responses for both nodes or asserting composed URIs and auth for the selected node), ensuring the new HA behavior in cb_es_flush() is exercised.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@plugins/out_es/es_conf.c`:
- Around line 249-260: flb_upstream_ha_from_file() may return an HA object with
zero nodes which later causes cb_es_flush() to loop on FLB_RETRY when
flb_upstream_ha_node_get() returns NULL; after creating ctx->ha in the upstream
branch check that the HA contains at least one node (e.g., via the HA API or
node-count accessor) and if empty log an error (use flb_plg_error with context
like "upstream file contains no [NODE] entries"), call flb_es_conf_destroy(ctx)
and return NULL; ensure this validation occurs immediately after
flb_upstream_ha_from_file() and before flb_output_upstream_ha_set(ctx->ha, ins)
so empty upstream files are rejected during init.
In `@plugins/out_es/es.c`:
- Around line 885-895: The code currently assigns the result of
flb_sds_printf(...) directly back into header_line and then calls
flb_sds_len(header_line), which dereferences header_line if flb_sds_printf
returns NULL and the original SDS is lost; change this by printing into a
temporary flb_sds_t (e.g., tmp_header) and only assign it to header_line after
verifying tmp_header != NULL, and always check tmp_header for NULL before
calling flb_sds_len or using header_line (update call sites around
flb_sds_printf and flb_sds_len that reference header_line accordingly).
- Around line 969-975: The current check uses
flb_sds_view_is_empty(http_user/http_passwd) which treats "" the same as a
missing property and drops Basic Auth when the password is intentionally empty;
change the condition to check for presence by testing the .buf pointer returned
by es_get_property_view instead (e.g. if (http_user.buf != NULL &&
http_passwd.buf != NULL) call flb_http_basic_auth), so an empty-string password
still results in an Authorization header; update the code around
es_get_property_view/http_user/http_passwd and the flb_http_basic_auth call
accordingly.
In `@src/flb_sds.c`:
- Around line 92-95: flb_sds_create_from_view currently passes view.len (size_t)
directly into flb_sds_create_len(int) causing truncation for lengths > INT_MAX;
update flb_sds_create_from_view to check that view.len <= INT_MAX before calling
flb_sds_create_len and handle the overflow case (e.g., return NULL or propagate
an error) instead of narrowing silently, ensuring any error path is
documented/consistent with other SDS constructors.
---
Nitpick comments:
In `@tests/runtime/out_elasticsearch.c`:
- Around line 1092-1118: Add an integration test that exercises cb_es_flush()
end-to-end (including flb_upstream_ha_node_get, per-node URI composition and
node-scoped auth) by creating a context with create_upstream_test_ctx but
without installing the formatter hook (avoid
out_es_plugin.test_formatter.callback) so the output plugin performs a real
flush; specifically, create an upstream file with two nodes, configure the "es"
output to use that upstream, send records and assert the request goes through
the HA branch (e.g., by mocking responses for both nodes or asserting composed
URIs and auth for the selected node), ensuring the new HA behavior in
cb_es_flush() is exercised.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 619235e1-d33e-4dba-b62c-1648782b2631
📒 Files selected for processing (7)
- include/fluent-bit/flb_sds.h
- plugins/out_es/es.c
- plugins/out_es/es.h
- plugins/out_es/es_conf.c
- src/flb_sds.c
- tests/internal/sds.c
- tests/runtime/out_elasticsearch.c
Actionable comments posted: 1
🧹 Nitpick comments (1)
plugins/out_es/es.c (1)
91-104: Apply the same `flb_sds_printf` null-check pattern used for `header_line`.

If `flb_sds_printf` fails and returns `NULL`, the buffer allocated by `flb_sds_create_size` may be leaked (depending on internal SDS behavior), and `uri` is left in an inconsistent state. The same defensive pattern applied at lines 987-995 should be used here.

♻️ Suggested fix
```diff
 if (!flb_sds_view_is_empty(pipeline)) {
     uri = flb_sds_create_size(path.len + pipeline.len + 19);
     if (uri == NULL) {
         flb_errno();
         return NULL;
     }
-    uri = flb_sds_printf(&uri, "%.*s/_bulk/?pipeline=%.*s",
-                         (int) path.len, path.buf,
-                         (int) pipeline.len, pipeline.buf);
+    if (flb_sds_printf(&uri, "%.*s/_bulk/?pipeline=%.*s",
+                       (int) path.len, path.buf,
+                       (int) pipeline.len, pipeline.buf) == NULL) {
+        flb_sds_destroy(uri);
+        return NULL;
+    }
 }
 else {
     uri = flb_sds_create_size(path.len + 8);
     if (uri == NULL) {
         flb_errno();
         return NULL;
     }
-    uri = flb_sds_printf(&uri, "%.*s/_bulk",
-                         (int) path.len, path.buf);
+    if (flb_sds_printf(&uri, "%.*s/_bulk",
+                       (int) path.len, path.buf) == NULL) {
+        flb_sds_destroy(uri);
+        return NULL;
+    }
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@plugins/out_es/es.c` around lines 91 - 104, The flb_sds_printf call that populates uri can return NULL and leave the previously allocated SDS buffer leaked or uri in an inconsistent state; replicate the defensive pattern used for header_line: after each flb_sds_printf(...) assignment to uri check if uri == NULL, call flb_errno(), free/destroy the allocated uri SDS (using the same cleanup used in the header_line fix), and return NULL so no leak or invalid state remains; apply this to both branches (the pipeline and non-pipeline uri constructions).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@plugins/out_es/es.c`:
- Around line 954-960: es_compose_bulk_uri() can return NULL and code jumps to
the retry label while the local http client pointer c may still be NULL; update
the retry handling to guard the flb_http_client_destroy(c) call by checking c !=
NULL before calling flb_http_client_destroy to avoid a NULL dereference (look
for the retry: label, the variable c, the es_compose_bulk_uri() call and the
flb_http_client_destroy() invocation).
---
Nitpick comments:
In `@plugins/out_es/es.c`:
- Around line 91-104: The flb_sds_printf call that populates uri can return NULL
and leave the previously allocated SDS buffer leaked or uri in an inconsistent
state; replicate the defensive pattern used for header_line: after each
flb_sds_printf(...) assignment to uri check if uri == NULL, call flb_errno(),
free/destroy the allocated uri SDS (using the same cleanup used in the
header_line fix), and return NULL so no leak or invalid state remains; apply
this to both branches (the pipeline and non-pipeline uri constructions).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 0c169969-2f0b-43c5-9b8a-6e9d6c48eb0e
📒 Files selected for processing (2)
- plugins/out_es/es.c
- plugins/out_es/es_conf.c
🚧 Files skipped from review as they are similar to previous changes (1)
- plugins/out_es/es_conf.c
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@plugins/out_es/es.c`:
- Around line 970-980: The current auth selection checks instance/cloud basic
auth before node-local API key, causing nodes that only set http_api_key to
incorrectly inherit http_user/http_passwd; change the order in the auth decision
(the block using es_get_property_view for http_user, http_passwd, http_api_key)
to prefer node-local credentials first: if http_user and http_passwd (from
es_get_property_view) then call flb_http_basic_auth with those; else if
http_api_key is non-empty (flb_sds_view_is_empty) then use the node API key
handling; only if neither node-local basic nor node API key exist fall back to
ctx->cloud_user/ctx->cloud_passwd and call flb_http_basic_auth with cloud
credentials.
- Around line 91-93: The formatted URI currently inserts an extra '/' before the
query string when a pipeline is present; change the flb_sds_printf call that
builds uri (the line using "%.*s/_bulk/?pipeline=%.*s") to remove that slash so
it uses "%.*s/_bulk?pipeline=%.*s" and thus preserves the intended _bulk path
regardless of pipeline value (refer to variables uri, path, pipeline and the
flb_sds_printf invocation to locate the change).
- Around line 1103-1114: The retry path currently leaks the SigV4 `signature`
allocated by add_aws_auth(); ensure you free it before jumping to `retry` and on
the error/cleanup path: check `signature` for non-NULL and call the appropriate
destroy function (e.g. flb_sds_destroy(signature)) and then set `signature =
NULL`; update the cleanup block that runs on retries (the code around the
`retry:` label and where add_aws_auth() is called) to always destroy `signature`
to prevent unbounded leaks.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io> Co-authored-by: Marat Abrarov <abrarov@gmail.com>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io> Co-authored-by: Marat Abrarov <abrarov@gmail.com>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
2c44bcf to
25de407
Compare
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
```c
value = flb_upstream_node_get_property(property, node);
if (value != NULL) {
    ret = flb_utils_bool(value);
    if (ret != -1) {
```
When a boolean property is set in the plugin configuration rather than in the upstream node configuration, it is handled differently; refer to src/flb_config_map.c, line 791 (commit 3e414ac), which is used in flb_output_config_map_set, which in turn is used in flb_es_conf_create. There, if a boolean property is given an unsupported value, plugin configuration fails, which, if I understand Fluent Bit correctly, fails the start of Fluent Bit. This question (about validation of configuration) is described in #11647 (comment) too.
To implement the Elasticsearch upstream connection feature, we first need to implement sds_view, which provides borrowed (non-owning) SDS type operations. Then, we can implement the upstream HA feature in the out_es plugin. This could be a simpler approach than #7608.
Enter `[N/A]` in the box if an item is not applicable to your change.

Testing
Before we can approve your change; please submit the following in a comment:
Example configs are here:
Normal es ingestions:
fluent-bit-es/fluent-bit.conf and (same in YAML format) fluent-bit-es/fluent-bit.yaml
upstream ingestions:
fluent-bit-es-cluster/fluent-bit.yaml in mabrarov/elastic-stack repository.
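For readers unfamiliar with the upstream file used by the cluster example above, a minimal sketch in the classic format might look like the following. Node names, hosts, and ports are made up for illustration; per-node properties such as credentials would follow the same key/value style under each `[NODE]` section:

```ini
[UPSTREAM]
    name es-cluster

[NODE]
    name node-1
    host 127.0.0.1
    port 9200

[NODE]
    name node-2
    host 127.0.0.1
    port 9201
```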
Normal:
Upstream:
Normal:
Upstream:
If this is a change to packaging of containers or native binaries then please confirm it works for all targets.
`ok-package-test` label to test for all targets (requires a maintainer to do).

Documentation
Backporting
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.
Summary by CodeRabbit
New Features
Behavior Changes
Tests