6 changes: 6 additions & 0 deletions plugins/out_azure_kusto/azure_kusto.h
@@ -84,6 +84,12 @@ struct flb_azure_kusto_resources {

/* used to reload resources after some time */
uint64_t load_time;

/* Old resources pending cleanup - deferred destruction to avoid use-after-free
* when other threads may still be using them during high-volume operations */
struct flb_upstream_ha *old_blob_ha;
struct flb_upstream_ha *old_queue_ha;
flb_sds_t old_identity_token;
};

struct flb_azure_kusto {
49 changes: 49 additions & 0 deletions plugins/out_azure_kusto/azure_kusto_conf.c
@@ -145,6 +145,22 @@ static int flb_azure_kusto_resources_clear(struct flb_azure_kusto_resources *res
resources->identity_token = NULL;
}

/* Also clean up any old resources pending destruction */
if (resources->old_blob_ha) {
flb_upstream_ha_destroy(resources->old_blob_ha);
resources->old_blob_ha = NULL;
}

if (resources->old_queue_ha) {
flb_upstream_ha_destroy(resources->old_queue_ha);
resources->old_queue_ha = NULL;
}

if (resources->old_identity_token) {
flb_sds_destroy(resources->old_identity_token);
resources->old_identity_token = NULL;
}

resources->load_time = 0;

return 0;
@@ -598,11 +614,44 @@ int azure_kusto_load_ingestion_resources(struct flb_azure_kusto *ctx,
parse_ingestion_identity_token(ctx, response);

if (identity_token) {
/*
Deferred cleanup: destroy resources from two refresh cycles ago,
then move current resources to 'old' before assigning new ones.
This avoids use-after-free when other threads may still be using
the current resources during high-volume operations.

With a 1-hour refresh interval, the race condition can only occur if an
ingest operation takes longer than 1 hour (the deferred cleanup grace
period). This is extremely unlikely under normal conditions, so a
lock-based mechanism is avoided for performance.
*/
if (ctx->resources->old_blob_ha) {
flb_upstream_ha_destroy(ctx->resources->old_blob_ha);
flb_plg_debug(ctx->ins, "clearing up old blob HA");
}
if (ctx->resources->old_queue_ha) {
flb_upstream_ha_destroy(ctx->resources->old_queue_ha);
flb_plg_debug(ctx->ins, "clearing up old queue HA");
}
if (ctx->resources->old_identity_token) {
flb_sds_destroy(ctx->resources->old_identity_token);
flb_plg_debug(ctx->ins, "clearing up old identity token");
}

/* Move current to old */
ctx->resources->old_blob_ha = ctx->resources->blob_ha;
ctx->resources->old_queue_ha = ctx->resources->queue_ha;
ctx->resources->old_identity_token = ctx->resources->identity_token;

Comment on lines 616 to 645
⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Find all uses of the resource pointers
cd /tmp
rg -nP -C3 --type=c 'ctx->resources->(blob_ha|queue_ha|identity_token|old_blob_ha|old_queue_ha|old_identity_token)'

Repository: fluent/fluent-bit

Length of output: 209


🏁 Script executed:

#!/bin/bash
# Find mutex usage in the file
cd /tmp
rg -nP -C3 --type=c 'resources_mutex' plugins/out_azure_kusto

Repository: fluent/fluent-bit

Length of output: 124


🏁 Script executed:

#!/bin/bash
# Get broader context of the file structure
cd /tmp
wc -l plugins/out_azure_kusto/azure_kusto_conf.c

Repository: fluent/fluent-bit

Length of output: 134


🏁 Script executed:

#!/bin/bash
# Check the structure definition and initialization
cd /tmp
rg -nP -A5 'struct.*resources' plugins/out_azure_kusto/azure_kusto_conf.c

Repository: fluent/fluent-bit

Length of output: 143


🏁 Script executed:

#!/bin/bash
# Find the actual file location
fd -type f -name "azure_kusto_conf.c"

Repository: fluent/fluent-bit

Length of output: 231


🏁 Script executed:

#!/bin/bash
# List top-level plugins directory
find . -maxdepth 3 -type d -name "*azure*" 2>/dev/null | head -20

Repository: fluent/fluent-bit

Length of output: 166


🏁 Script executed:

#!/bin/bash
# Check structure
git ls-files | grep -i azure | head -20

Repository: fluent/fluent-bit

Length of output: 829


🏁 Script executed:

#!/bin/bash
# List contents of the kusto plugin directory
git ls-files | grep out_azure_kusto

Repository: fluent/fluent-bit

Length of output: 551


🏁 Script executed:

#!/bin/bash
# Now search for resource accesses in the correct location
fd -name "azure_kusto_conf.c" -exec cat -n {} \; | sed -n '610,650p'

Repository: fluent/fluent-bit

Length of output: 291


🏁 Script executed:

#!/bin/bash
# Search for resources_mutex in kusto plugin
rg 'resources_mutex' plugins/out_azure_kusto

Repository: fluent/fluent-bit

Length of output: 901


🏁 Script executed:

#!/bin/bash
# Read the code around lines 616-638
cat plugins/out_azure_kusto/azure_kusto_conf.c | head -650 | tail -100

Repository: fluent/fluent-bit

Length of output: 5289


🏁 Script executed:

#!/bin/bash
# Get line count to understand file size
wc -l plugins/out_azure_kusto/azure_kusto_conf.c

Repository: fluent/fluent-bit

Length of output: 107


🏁 Script executed:

#!/bin/bash
# Get more precise range
sed -n '580,680p' plugins/out_azure_kusto/azure_kusto_conf.c

Repository: fluent/fluent-bit

Length of output: 4998


🏁 Script executed:

#!/bin/bash
# Search for all accesses to blob_ha, queue_ha, identity_token
rg 'resources->(blob_ha|queue_ha|identity_token|old_blob_ha|old_queue_ha|old_identity_token)' plugins/out_azure_kusto --context 5

Repository: fluent/fluent-bit

Length of output: 11345


🏁 Script executed:

#!/bin/bash
# Check the ingest file for resource usage
rg 'blob_ha|queue_ha|identity_token' plugins/out_azure_kusto/azure_kusto_ingest.c --context 3

Repository: fluent/fluent-bit

Length of output: 1092


🏁 Script executed:

#!/bin/bash
# Check the store file for resource usage
rg 'blob_ha|queue_ha|identity_token' plugins/out_azure_kusto/azure_kusto_store.c --context 3

Repository: fluent/fluent-bit

Length of output: 43


🏁 Script executed:

#!/bin/bash
# Find where ingest functions are called
rg 'azure_kusto_ingest_record|azure_kusto_ingest_packet' plugins/out_azure_kusto --context 5

Repository: fluent/fluent-bit

Length of output: 43


🏁 Script executed:

#!/bin/bash
# Look for the callback that processes records
rg 'cb_flush|flush_cb' plugins/out_azure_kusto/azure_kusto.c -A 10

Repository: fluent/fluent-bit

Length of output: 232


🏁 Script executed:

#!/bin/bash
# Find where ctx->resources accesses happen outside mutex
grep -n 'ctx->resources->blob_ha\|ctx->resources->queue_ha\|ctx->resources->identity_token' plugins/out_azure_kusto/azure_kusto_ingest.c | head -20

Repository: fluent/fluent-bit

Length of output: 270


🏁 Script executed:

#!/bin/bash
# Find the flush callback implementation
rg 'cb_azure_kusto_flush' plugins/out_azure_kusto/azure_kusto.c -A 30

Repository: fluent/fluent-bit

Length of output: 1335


🏁 Script executed:

#!/bin/bash
# Check function signatures of ingest functions
rg 'azure_kusto_ingest_record|azure_kusto_ingest_packet' plugins/out_azure_kusto/azure_kusto_ingest.h

Repository: fluent/fluent-bit

Length of output: 43


🏁 Script executed:

#!/bin/bash
# Find all calls to ingest functions with context
rg -B5 -A5 'azure_kusto_ingest_record|azure_kusto_ingest_packet' plugins/out_azure_kusto/azure_kusto.c

Repository: fluent/fluent-bit

Length of output: 43


🏁 Script executed:

#!/bin/bash
# Look for where ensure_ingestion_resources is called
rg 'ensure_ingestion_resources' plugins/out_azure_kusto/azure_kusto.c -B 5 -A 5

Repository: fluent/fluent-bit

Length of output: 43


🏁 Script executed:

#!/bin/bash
# Get full flush callback implementation
sed -n '/^static void cb_azure_kusto_flush/,/^}/p' plugins/out_azure_kusto/azure_kusto.c | head -150

Repository: fluent/fluent-bit

Length of output: 5997


🏁 Script executed:

#!/bin/bash
# Search for ingest function definitions
rg -n 'flb_sds_t.*ingest_' plugins/out_azure_kusto/azure_kusto_ingest.c | head -20

Repository: fluent/fluent-bit

Length of output: 43


🏁 Script executed:

#!/bin/bash
# Search for ingest_to_kusto definition and its calls
rg -n 'ingest_to_kusto' plugins/out_azure_kusto/azure_kusto.c -B 2 -A 20 | head -60

Repository: fluent/fluent-bit

Length of output: 2181


🏁 Script executed:

#!/bin/bash
# Check if there's a wrapper that locks before calling ingest
rg -n 'int ingest_to_kusto' plugins/out_azure_kusto/azure_kusto.c -A 30

Repository: fluent/fluent-bit

Length of output: 1259


🏁 Script executed:

#!/bin/bash
# Look for where ingest functions are actually defined
rg -n 'azure_kusto_ingest' plugins/out_azure_kusto/azure_kusto_ingest.c | head -10

Repository: fluent/fluent-bit

Length of output: 95


🏁 Script executed:

#!/bin/bash
# Get the full ingest_to_kusto function to see where it calls ingest functions
sed -n '771,900p' plugins/out_azure_kusto/azure_kusto.c

Repository: fluent/fluent-bit

Length of output: 4832


🏁 Script executed:

#!/bin/bash
# Search for azure_kusto_ingest_packet and azure_kusto_ingest_record calls
rg 'azure_kusto_ingest_packet|azure_kusto_ingest_record' plugins/out_azure_kusto/azure_kusto.c -B 3 -A 3

Repository: fluent/fluent-bit

Length of output: 43


🏁 Script executed:

#!/bin/bash
# Get the full definitions of ingest functions in ingest.c
sed -n '100,250p' plugins/out_azure_kusto/azure_kusto_ingest.c

Repository: fluent/fluent-bit

Length of output: 5853


🏁 Script executed:

#!/bin/bash
# Find azure_kusto_queued_ingestion to see what it calls
rg -n 'azure_kusto_queued_ingestion' plugins/out_azure_kusto/azure_kusto_ingest.c -A 50 | head -80

Repository: fluent/fluent-bit

Length of output: 2030


🏁 Script executed:

#!/bin/bash
# Check if any function acquires resources_mutex before ingest operations
rg -n 'pthread_mutex_lock.*resources_mutex' plugins/out_azure_kusto/azure_kusto.c

Repository: fluent/fluent-bit

Length of output: 43


🏁 Script executed:

#!/bin/bash
# Look for the pattern of resource access in the critical ingest paths
sed -n '140,180p' plugins/out_azure_kusto/azure_kusto_ingest.c

Repository: fluent/fluent-bit

Length of output: 1441


🏁 Script executed:

#!/bin/bash
# Get the full context of azure_kusto_load_ingestion_resources to see lock/unlock
rg -n 'azure_kusto_load_ingestion_resources' plugins/out_azure_kusto/azure_kusto_conf.c -B 5 -A 80 | head -100

Repository: fluent/fluent-bit

Length of output: 4467


🏁 Script executed:

#!/bin/bash
# Verify the sequence in flush: load_ingestion_resources then ingest_to_kusto
sed -n '1295,1320p' plugins/out_azure_kusto/azure_kusto.c

Repository: fluent/fluent-bit

Length of output: 1367


Mutex protection is insufficient for safe resource rotation during ingest operations.

The resources_mutex is released after rotation (line 668 in azure_kusto_load_ingestion_resources), but ingest operations access the resources without reacquiring the lock. In azure_kusto_create_blob() (line 157) and azure_kusto_enqueue_ingestion() (line 425), the calls to flb_upstream_ha_node_get(ctx->resources->blob_ha) and flb_upstream_ha_node_get(ctx->resources->queue_ha) happen without holding resources_mutex. Worker threads can hold pointers to these resources across multiple function calls. One rotation moves the resources a worker is using to old_*, and the following rotation destroys them even if that worker is still using them.
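To make the timing concrete, here is a minimal single-threaded model of the two-slot rotation. It is only a sketch: `struct gen`, `struct slots`, `model_rotate` and `held_pointer_alive_after` are hypothetical names that do not exist in the plugin, and "destroying" a generation only clears an `alive` flag so the model never touches freed memory.

```c
#include <assert.h>
#include <stddef.h>

#define MAX_GEN 8

/* Hypothetical stand-in for one generation of blob_ha/queue_ha/token. */
struct gen {
    int id;
    int alive;   /* 1 while usable, 0 once "destroyed" */
};

/* Mirrors the current/old pair kept in the resources struct. */
struct slots {
    struct gen *current;
    struct gen *old;
};

static struct gen gen_table[MAX_GEN];

/* Same order as the patch: destroy old, move current to old, assign new. */
static void model_rotate(struct slots *s, struct gen *fresh)
{
    if (s->old) {
        s->old->alive = 0;          /* deferred destruction happens here */
    }
    s->old = s->current;
    s->current = fresh;
    fresh->alive = 1;
}

/*
 * A worker captures the current pointer, then `rotations` refreshes run.
 * Returns 1 if the captured generation is still alive, 0 if it was destroyed.
 */
int held_pointer_alive_after(int rotations)
{
    struct slots s = { NULL, NULL };
    struct gen *held;
    int i;

    for (i = 0; i < MAX_GEN; i++) {
        gen_table[i].id = i;
        gen_table[i].alive = 0;
    }

    model_rotate(&s, &gen_table[0]);
    held = s.current;               /* worker grabs a pointer, no lock held */

    for (i = 1; i <= rotations && i < MAX_GEN; i++) {
        model_rotate(&s, &gen_table[i]);
    }
    return held->alive;
}
```

In this model the captured pointer survives one rotation and is destroyed by the second, so the grace period is bounded by the refresh interval rather than by whether any worker still holds the pointer.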

🤖 Prompt for AI Agents
In `@plugins/out_azure_kusto/azure_kusto_conf.c` around lines 616 - 638, The
rotation logic in azure_kusto_load_ingestion_resources moves and destroys HA and
token objects while worker functions azure_kusto_create_blob and
azure_kusto_enqueue_ingestion may access them without holding resources_mutex,
leading to use-after-free; fix by ensuring callers grab stable references under
the mutex (or an equivalent refcount) before using them: in
azure_kusto_create_blob and azure_kusto_enqueue_ingestion acquire
resources_mutex, copy ctx->resources->blob_ha/queue_ha and
ctx->resources->identity_token into local variables, call
flb_upstream_ha_node_get (or increment the object's refcount) while still
holding the lock, then release the lock and use the obtained node/token, and
update azure_kusto_load_ingestion_resources to only destroy old_* objects after
they are no longer referenced (i.e., after refcount reaches zero) or ensure
destruction happens under the same mutex that prevents new gets.
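The refcount option mentioned above could look roughly like the following sketch. Everything here is an assumption for illustration: `struct res_set`, `struct res_slot`, `res_acquire`, `res_release` and `res_rotate` are hypothetical names, not existing Fluent Bit APIs, and the real change would wrap the HA objects and identity token rather than a bare counter.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

/* Hypothetical bundle standing in for blob_ha/queue_ha/identity_token. */
struct res_set {
    int refs;          /* protected by res_slot.lock */
    int generation;
};

struct res_slot {
    pthread_mutex_t lock;
    struct res_set *current;
};

static int destroyed_count;   /* demo-only counter of freed sets */

static struct res_set *res_set_create(int generation)
{
    struct res_set *rs = calloc(1, sizeof(*rs));
    if (rs) {
        rs->refs = 1;             /* reference owned by the slot */
        rs->generation = generation;
    }
    return rs;
}

/* Caller must hold slot->lock. Frees the set when the last ref drops. */
static void res_set_unref_locked(struct res_set *rs)
{
    if (--rs->refs == 0) {
        free(rs);
        destroyed_count++;
    }
}

/* Worker side: take a stable reference while holding the lock. */
static struct res_set *res_acquire(struct res_slot *slot)
{
    struct res_set *rs;
    pthread_mutex_lock(&slot->lock);
    rs = slot->current;
    if (rs) {
        rs->refs++;
    }
    pthread_mutex_unlock(&slot->lock);
    return rs;
}

static void res_release(struct res_slot *slot, struct res_set *rs)
{
    pthread_mutex_lock(&slot->lock);
    res_set_unref_locked(rs);
    pthread_mutex_unlock(&slot->lock);
}

/* Refresh side: swap in the new set and drop only the slot's reference. */
static void res_rotate(struct res_slot *slot, struct res_set *fresh)
{
    struct res_set *old;
    pthread_mutex_lock(&slot->lock);
    old = slot->current;
    slot->current = fresh;
    if (old) {
        res_set_unref_locked(old);
    }
    pthread_mutex_unlock(&slot->lock);
}

/* Returns 0 if a held set survives two rotations and is freed on release. */
int demo_refcounted_rotation(void)
{
    struct res_slot slot = { PTHREAD_MUTEX_INITIALIZER, NULL };
    struct res_set *g1 = res_set_create(1);
    struct res_set *g2 = res_set_create(2);
    struct res_set *g3 = res_set_create(3);
    struct res_set *held;
    int ok;

    if (!g1 || !g2 || !g3) {
        free(g1); free(g2); free(g3);
        return 1;
    }
    destroyed_count = 0;

    res_rotate(&slot, g1);
    held = res_acquire(&slot);      /* worker holds generation 1 */
    res_rotate(&slot, g2);          /* generation 1 kept alive by the worker */
    res_rotate(&slot, g3);          /* generation 2 freed, generation 1 not */

    ok = (held->generation == 1) && (destroyed_count == 1);
    res_release(&slot, held);       /* last ref gone: generation 1 freed */
    ok = ok && (destroyed_count == 2);
    res_rotate(&slot, NULL);        /* shutdown: drop generation 3 */
    return (ok && destroyed_count == 3) ? 0 : 1;
}
```

With this shape the lock is held only for the pointer copy and counter update, not for the network I/O, and the old_* fields become unnecessary because a set is freed only when the last holder releases it.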

/* Assign new resources */
ctx->resources->blob_ha = blob_ha;
ctx->resources->queue_ha = queue_ha;
ctx->resources->identity_token = identity_token;
ctx->resources->load_time = now;

flb_plg_info(ctx->ins, "ingestion resources rotated successfully, "
"previous resources moved to deferred cleanup");

ret = 0;
}
else {