out_azure_kusto: Add changes to defer close old resource handles in subsequent cycles #11418
ag-ramachandran wants to merge 4 commits into fluent:master
📝 Walkthrough
Adds three deferred-cleanup fields and implements a two-stage resource rotation in the Azure Kusto plugin to delay destruction of previous upstreams and identity tokens while new resources are installed, preventing use-after-free during high-volume concurrent operations.
Sequence Diagram(s)
sequenceDiagram
participant Loader as azure_kusto_load_ingestion_resources
participant Parser as storage_parser
participant Identity as IdentityService
participant Active as ActiveResources
participant Old as OldResources
Loader->>Parser: parse storage config
Parser-->>Loader: storage parameters
Loader->>Identity: request new identity token
Identity-->>Loader: return new token
Loader->>Old: destroy existing old_blob_ha/old_queue_ha/old_identity_token (if any)
Loader->>Old: move Active.blob_ha -> Old.old_blob_ha\nActive.queue_ha -> Old.old_queue_ha\nActive.identity_token -> Old.old_identity_token
Loader->>Active: assign new blob_ha, queue_ha, identity_token
Loader->>Loader: update load_time
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
The patch looks good but we need to add a |
33d6e3b to 9f8e03e
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@plugins/out_azure_kusto/azure_kusto_conf.c`:
- Around line 616-638: The rotation logic in
azure_kusto_load_ingestion_resources moves and destroys HA and token objects
while worker functions azure_kusto_create_blob and azure_kusto_enqueue_ingestion
may access them without holding resources_mutex, leading to use-after-free; fix
by ensuring callers grab stable references under the mutex (or an equivalent
refcount) before using them: in azure_kusto_create_blob and
azure_kusto_enqueue_ingestion acquire resources_mutex, copy
ctx->resources->blob_ha/queue_ha and ctx->resources->identity_token into local
variables, call flb_upstream_ha_node_get (or increment the object's refcount)
while still holding the lock, then release the lock and use the obtained
node/token, and update azure_kusto_load_ingestion_resources to only destroy
old_* objects after they are no longer referenced (i.e., after refcount reaches
zero) or ensure destruction happens under the same mutex that prevents new gets.
    if (identity_token) {
        /* Deferred cleanup: destroy resources from two refresh cycles ago,
         * then move current resources to 'old' before assigning new ones.
         * This avoids use-after-free when other threads may still be using
         * the current resources during high-volume operations. */
        if (ctx->resources->old_blob_ha) {
            flb_upstream_ha_destroy(ctx->resources->old_blob_ha);
            flb_plg_debug(ctx->ins, "clearing up old blob HA");
        }
        if (ctx->resources->old_queue_ha) {
            flb_upstream_ha_destroy(ctx->resources->old_queue_ha);
            flb_plg_debug(ctx->ins, "clearing up old queue HA");
        }
        if (ctx->resources->old_identity_token) {
            flb_sds_destroy(ctx->resources->old_identity_token);
            flb_plg_debug(ctx->ins, "clearing up old identity token");
        }

        /* Move current to old */
        ctx->resources->old_blob_ha = ctx->resources->blob_ha;
        ctx->resources->old_queue_ha = ctx->resources->queue_ha;
        ctx->resources->old_identity_token = ctx->resources->identity_token;
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Find all uses of the resource pointers
cd /tmp
rg -nP -C3 --type=c 'ctx->resources->(blob_ha|queue_ha|identity_token|old_blob_ha|old_queue_ha|old_identity_token)'Repository: fluent/fluent-bit
Length of output: 209
🏁 Script executed:
#!/bin/bash
# Find mutex usage in the file
cd /tmp
rg -nP -C3 --type=c 'resources_mutex' plugins/out_azure_kustoRepository: fluent/fluent-bit
Length of output: 124
🏁 Script executed:
#!/bin/bash
# Get broader context of the file structure
cd /tmp
wc -l plugins/out_azure_kusto/azure_kusto_conf.cRepository: fluent/fluent-bit
Length of output: 134
🏁 Script executed:
#!/bin/bash
# Check the structure definition and initialization
cd /tmp
rg -nP -A5 'struct.*resources' plugins/out_azure_kusto/azure_kusto_conf.cRepository: fluent/fluent-bit
Length of output: 143
🏁 Script executed:
#!/bin/bash
# Find the actual file location
fd -type f -name "azure_kusto_conf.c"Repository: fluent/fluent-bit
Length of output: 231
🏁 Script executed:
#!/bin/bash
# List top-level plugins directory
find . -maxdepth 3 -type d -name "*azure*" 2>/dev/null | head -20Repository: fluent/fluent-bit
Length of output: 166
🏁 Script executed:
#!/bin/bash
# Check structure
git ls-files | grep -i azure | head -20Repository: fluent/fluent-bit
Length of output: 829
🏁 Script executed:
#!/bin/bash
# List contents of the kusto plugin directory
git ls-files | grep out_azure_kustoRepository: fluent/fluent-bit
Length of output: 551
🏁 Script executed:
#!/bin/bash
# Now search for resource accesses in the correct location
fd -name "azure_kusto_conf.c" -exec cat -n {} \; | sed -n '610,650p'Repository: fluent/fluent-bit
Length of output: 291
🏁 Script executed:
#!/bin/bash
# Search for resources_mutex in kusto plugin
rg 'resources_mutex' plugins/out_azure_kustoRepository: fluent/fluent-bit
Length of output: 901
🏁 Script executed:
#!/bin/bash
# Read the code around lines 616-638
cat plugins/out_azure_kusto/azure_kusto_conf.c | head -650 | tail -100Repository: fluent/fluent-bit
Length of output: 5289
🏁 Script executed:
#!/bin/bash
# Get line count to understand file size
wc -l plugins/out_azure_kusto/azure_kusto_conf.cRepository: fluent/fluent-bit
Length of output: 107
🏁 Script executed:
#!/bin/bash
# Get more precise range
sed -n '580,680p' plugins/out_azure_kusto/azure_kusto_conf.cRepository: fluent/fluent-bit
Length of output: 4998
🏁 Script executed:
#!/bin/bash
# Search for all accesses to blob_ha, queue_ha, identity_token
rg 'resources->(blob_ha|queue_ha|identity_token|old_blob_ha|old_queue_ha|old_identity_token)' plugins/out_azure_kusto --context 5Repository: fluent/fluent-bit
Length of output: 11345
🏁 Script executed:
#!/bin/bash
# Check the ingest file for resource usage
rg 'blob_ha|queue_ha|identity_token' plugins/out_azure_kusto/azure_kusto_ingest.c --context 3Repository: fluent/fluent-bit
Length of output: 1092
🏁 Script executed:
#!/bin/bash
# Check the store file for resource usage
rg 'blob_ha|queue_ha|identity_token' plugins/out_azure_kusto/azure_kusto_store.c --context 3Repository: fluent/fluent-bit
Length of output: 43
🏁 Script executed:
#!/bin/bash
# Find where ingest functions are called
rg 'azure_kusto_ingest_record|azure_kusto_ingest_packet' plugins/out_azure_kusto --context 5Repository: fluent/fluent-bit
Length of output: 43
🏁 Script executed:
#!/bin/bash
# Look for the callback that processes records
rg 'cb_flush|flush_cb' plugins/out_azure_kusto/azure_kusto.c -A 10Repository: fluent/fluent-bit
Length of output: 232
🏁 Script executed:
#!/bin/bash
# Find where ctx->resources accesses happen outside mutex
grep -n 'ctx->resources->blob_ha\|ctx->resources->queue_ha\|ctx->resources->identity_token' plugins/out_azure_kusto/azure_kusto_ingest.c | head -20Repository: fluent/fluent-bit
Length of output: 270
🏁 Script executed:
#!/bin/bash
# Find the flush callback implementation
rg 'cb_azure_kusto_flush' plugins/out_azure_kusto/azure_kusto.c -A 30Repository: fluent/fluent-bit
Length of output: 1335
🏁 Script executed:
#!/bin/bash
# Check function signatures of ingest functions
rg 'azure_kusto_ingest_record|azure_kusto_ingest_packet' plugins/out_azure_kusto/azure_kusto_ingest.hRepository: fluent/fluent-bit
Length of output: 43
🏁 Script executed:
#!/bin/bash
# Find all calls to ingest functions with context
rg -B5 -A5 'azure_kusto_ingest_record|azure_kusto_ingest_packet' plugins/out_azure_kusto/azure_kusto.cRepository: fluent/fluent-bit
Length of output: 43
🏁 Script executed:
#!/bin/bash
# Look for where ensure_ingestion_resources is called
rg 'ensure_ingestion_resources' plugins/out_azure_kusto/azure_kusto.c -B 5 -A 5Repository: fluent/fluent-bit
Length of output: 43
🏁 Script executed:
#!/bin/bash
# Get full flush callback implementation
sed -n '/^static void cb_azure_kusto_flush/,/^}/p' plugins/out_azure_kusto/azure_kusto.c | head -150Repository: fluent/fluent-bit
Length of output: 5997
🏁 Script executed:
#!/bin/bash
# Search for ingest function definitions
rg -n 'flb_sds_t.*ingest_' plugins/out_azure_kusto/azure_kusto_ingest.c | head -20Repository: fluent/fluent-bit
Length of output: 43
🏁 Script executed:
#!/bin/bash
# Search for ingest_to_kusto definition and its calls
rg -n 'ingest_to_kusto' plugins/out_azure_kusto/azure_kusto.c -B 2 -A 20 | head -60Repository: fluent/fluent-bit
Length of output: 2181
🏁 Script executed:
#!/bin/bash
# Check if there's a wrapper that locks before calling ingest
rg -n 'int ingest_to_kusto' plugins/out_azure_kusto/azure_kusto.c -A 30Repository: fluent/fluent-bit
Length of output: 1259
🏁 Script executed:
#!/bin/bash
# Look for where ingest functions are actually defined
rg -n 'azure_kusto_ingest' plugins/out_azure_kusto/azure_kusto_ingest.c | head -10Repository: fluent/fluent-bit
Length of output: 95
🏁 Script executed:
#!/bin/bash
# Get the full ingest_to_kusto function to see where it calls ingest functions
sed -n '771,900p' plugins/out_azure_kusto/azure_kusto.cRepository: fluent/fluent-bit
Length of output: 4832
🏁 Script executed:
#!/bin/bash
# Search for azure_kusto_ingest_packet and azure_kusto_ingest_record calls
rg 'azure_kusto_ingest_packet|azure_kusto_ingest_record' plugins/out_azure_kusto/azure_kusto.c -B 3 -A 3Repository: fluent/fluent-bit
Length of output: 43
🏁 Script executed:
#!/bin/bash
# Get the full definitions of ingest functions in ingest.c
sed -n '100,250p' plugins/out_azure_kusto/azure_kusto_ingest.cRepository: fluent/fluent-bit
Length of output: 5853
🏁 Script executed:
#!/bin/bash
# Find azure_kusto_queued_ingestion to see what it calls
rg -n 'azure_kusto_queued_ingestion' plugins/out_azure_kusto/azure_kusto_ingest.c -A 50 | head -80Repository: fluent/fluent-bit
Length of output: 2030
🏁 Script executed:
#!/bin/bash
# Check if any function acquires resources_mutex before ingest operations
rg -n 'pthread_mutex_lock.*resources_mutex' plugins/out_azure_kusto/azure_kusto.cRepository: fluent/fluent-bit
Length of output: 43
🏁 Script executed:
#!/bin/bash
# Look for the pattern of resource access in the critical ingest paths
sed -n '140,180p' plugins/out_azure_kusto/azure_kusto_ingest.cRepository: fluent/fluent-bit
Length of output: 1441
🏁 Script executed:
#!/bin/bash
# Get the full context of azure_kusto_load_ingestion_resources to see lock/unlock
rg -n 'azure_kusto_load_ingestion_resources' plugins/out_azure_kusto/azure_kusto_conf.c -B 5 -A 80 | head -100Repository: fluent/fluent-bit
Length of output: 4467
🏁 Script executed:
#!/bin/bash
# Verify the sequence in flush: load_ingestion_resources then ingest_to_kusto
sed -n '1295,1320p' plugins/out_azure_kusto/azure_kusto.cRepository: fluent/fluent-bit
Length of output: 1367
Mutex protection is insufficient for safe resource rotation during ingest operations.
The `resources_mutex` is released after rotation (line 668 in `azure_kusto_load_ingestion_resources`), but the ingest path reads the resources without reacquiring it. `azure_kusto_create_blob()` (line 157) and `azure_kusto_enqueue_ingestion()` (line 425) call `flb_upstream_ha_node_get(ctx->resources->blob_ha)` and `flb_upstream_ha_node_get(ctx->resources->queue_ha)` without holding `resources_mutex`. A worker thread can keep using pointers to these resources across several function calls after its initial refresh, so concurrent rotations can move the current resources to the `old_*` fields and then, on the following refresh cycle, destroy them while the worker is still using them.
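One way to read the "stable reference under the mutex" suggestion is a small reference-counted holder. The sketch below is not Fluent Bit code: `struct handle`, `struct holder`, `holder_acquire`, `holder_release`, and `holder_swap` are hypothetical names used only for illustration, and the handle stands in for an upstream HA object or token. It assumes every reader takes a counted reference while holding the lock and that the refresher drops only its own reference, so the object is freed by whoever releases it last.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

/* Hypothetical stand-in for an HA upstream or identity token. */
struct handle {
    int refs;   /* guarded by holder.lock */
    int id;
};

/* Hypothetical owner of the currently active handle. */
struct holder {
    pthread_mutex_t lock;
    struct handle *current;
};

static int freed_count = 0;   /* instrumentation for the example only */

/* A new handle starts with one reference, owned by the holder. */
static struct handle *handle_new(int id)
{
    struct handle *x = malloc(sizeof(*x));
    if (x) {
        x->refs = 1;
        x->id = id;
    }
    return x;
}

/* Reader side: take a counted reference while the lock is held. */
static struct handle *holder_acquire(struct holder *h)
{
    struct handle *cur;

    pthread_mutex_lock(&h->lock);
    cur = h->current;
    if (cur) {
        cur->refs++;
    }
    pthread_mutex_unlock(&h->lock);
    return cur;
}

/* Drop one reference; the last one out frees the handle. */
static void holder_release(struct holder *h, struct handle *x)
{
    int last;

    if (!x) {
        return;
    }
    pthread_mutex_lock(&h->lock);
    last = (--x->refs == 0);
    pthread_mutex_unlock(&h->lock);
    if (last) {
        freed_count++;
        free(x);
    }
}

/* Refresher side: install a new handle, then drop the holder's old reference. */
static void holder_swap(struct holder *h, struct handle *fresh)
{
    struct handle *prev;

    pthread_mutex_lock(&h->lock);
    prev = h->current;
    h->current = fresh;
    pthread_mutex_unlock(&h->lock);
    holder_release(h, prev);
}
```

With this shape, a worker that acquired handle 1 before a swap keeps a valid pointer until it calls `holder_release`, regardless of how many refresh cycles pass in between. The trade-off against the deferred-cleanup approach in this PR is extra bookkeeping on every ingest call.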
Signed-off-by: ag-ramachandran <ramacg@microsoft.com>
9f8e03e to f270724
Signed-off-by: ag-ramachandran <ramacg@microsoft.com>
This pull request improves the safety and stability of resource management in the Azure Kusto output plugin by introducing deferred destruction of old resources. This change helps prevent use-after-free errors during high-volume operations where resources might still be in use by other threads when a refresh occurs.
Resource lifecycle management improvements:
- Added new fields (`old_blob_ha`, `old_queue_ha`, `old_identity_token`) to `struct flb_azure_kusto_resources` to track old resources that are pending cleanup, enabling deferred destruction.
- Updated `azure_kusto_load_ingestion_resources` to first destroy resources from two refresh cycles ago, then move current resources to the "old" fields before assigning new ones, ensuring safe resource transitions.
- Updated `flb_azure_kusto_resources_clear` to clean up any old resources pending destruction, ensuring proper resource deallocation and preventing memory leaks.
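The lifetime effect of the changes listed above can be illustrated with a reduced, self-contained sketch. It does not use the plugin's real types: `struct res`, `struct resources`, `resources_rotate`, and `resources_clear` are hypothetical stand-ins, with a single `current`/`old` pair in place of the blob HA, queue HA, and identity token. The point it shows is that a handle survives one extra refresh cycle after being replaced, and that the clear routine has to release both slots.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-in for an upstream HA handle or token. */
struct res {
    int id;
};

static int destroyed_count = 0;   /* instrumentation for the example only */

static struct res *res_create(int id)
{
    struct res *r = malloc(sizeof(*r));
    if (r) {
        r->id = id;
    }
    return r;
}

static void res_destroy(struct res *r)
{
    if (r) {
        destroyed_count++;
        free(r);
    }
}

/* Hypothetical reduced version of the resources struct: one active slot
 * and one slot for the handle that is pending deferred destruction. */
struct resources {
    struct res *current;
    struct res *old;
};

/* Two-stage rotation: destroy the handle from two cycles ago, park the
 * current handle in 'old', then install the fresh one. */
static void resources_rotate(struct resources *rs, struct res *fresh)
{
    res_destroy(rs->old);
    rs->old = rs->current;
    rs->current = fresh;
}

/* Teardown must release both slots, otherwise the parked handle leaks. */
static void resources_clear(struct resources *rs)
{
    res_destroy(rs->old);
    res_destroy(rs->current);
    rs->old = NULL;
    rs->current = NULL;
}
```

Rotating three times with handles 1, 2, and 3 destroys handle 1 only on the third rotation. Note that this only widens the window in which a stale pointer stays valid to one refresh interval; it does not by itself guarantee that no thread holds the pointer longer than that, which is the concern raised in the review comment.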