Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/fix_change_event_handling.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
When file change event for component changes and vector config change event comes very closely, config change event get discarded by component chnage. Fixing this issue by tracking and giving preference to reload from disk if there was event for that.

Check failure on line 1 in changelog.d/fix_change_event_handling.md

View workflow job for this annotation

GitHub Actions / Check Spelling

`chnage` is not a recognized word. (unrecognized-spelling)

authors: anil-db
145 changes: 131 additions & 14 deletions src/config/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ pub fn spawn_thread<'a>(
delay = ?delay
);

let has_unmatched = changed_paths.iter().any(|changed_path| {
!component_configs
.iter()
.any(|p| p.config_paths.contains(changed_path))
});

let changed_components: HashMap<_, _> = component_configs
.clone()
.into_iter()
Expand All @@ -137,7 +143,7 @@ pub fn spawn_thread<'a>(
debug!(message = "Reloaded paths.");

info!("Configuration file changed.");
if !changed_components.is_empty() {
if !has_unmatched && !changed_components.is_empty() {
info!(
"Component {:?} configuration changed.",
changed_components.keys()
Expand Down Expand Up @@ -225,13 +231,22 @@ mod tests {
};

async fn test_signal(
file: &mut File,
files: &mut [std::fs::File],
expected_signal: crate::signal::SignalTo,
timeout: Duration,
mut receiver: SignalRx,
) -> bool {
file.write_all(&[0]).unwrap();
file.sync_all().unwrap();
// Write and sync each file
for file in files.iter_mut() {
if let Err(e) = file.write_all(&[0]) {
error!("Failed to write to file: {}", e);
return false;
}
if let Err(e) = file.sync_all() {
error!("Failed to sync file: {}", e);
return false;
}
}

match tokio::time::timeout(timeout, receiver.recv()).await {
Ok(Ok(signal)) => signal == expected_signal,
Expand Down Expand Up @@ -275,7 +290,7 @@ mod tests {
let signal_rx2 = signal_rx.resubscribe();

if !test_signal(
&mut component_files[0],
&mut component_files[0..1],
crate::signal::SignalTo::ReloadComponents(HashSet::from_iter(vec![
http_component.clone(),
])),
Expand All @@ -288,7 +303,7 @@ mod tests {
}

if !test_signal(
&mut component_files[1],
&mut component_files[1..2],
crate::signal::SignalTo::ReloadComponents(HashSet::from_iter(vec![
http_component.clone(),
])),
Expand All @@ -300,6 +315,108 @@ mod tests {
panic!("Test timed out");
}
}

#[tokio::test]
async fn multi_component_update() {
trace_init();

let delay = Duration::from_secs(3);
let dir = temp_dir().to_path_buf();
let watcher_conf = WatcherConfig::RecommendedWatcher;
let component_file_path = vec![dir.join("tls.cert"), dir.join("tls.key")];
let http_component = ComponentKey::from("http");
let http_component_2 = ComponentKey::from("http2");

std::fs::create_dir(&dir).unwrap();

let mut component_files: Vec<std::fs::File> = component_file_path
.iter()
.map(|file| File::create(file).unwrap())
.collect();
let component_config = ComponentConfig::new(
component_file_path[0..1].to_vec(),
http_component.clone(),
ComponentType::Sink,
);
let component_config_2 = ComponentConfig::new(
component_file_path[1..2].to_vec(),
http_component_2.clone(),
ComponentType::Sink,
);

let (signal_tx, signal_rx) = broadcast::channel(128);
spawn_thread(
watcher_conf,
signal_tx,
&[dir],
vec![component_config, component_config_2],
delay,
)
.unwrap();

let signal_rx = signal_rx.resubscribe();

if !test_signal(
&mut component_files,
crate::signal::SignalTo::ReloadComponents(HashSet::from_iter(vec![
http_component.clone(),
http_component_2.clone(),
])),
delay * 5,
signal_rx,
)
.await
{
panic!("Test timed out");
}
}

#[tokio::test]
async fn component_and_config_update() {
trace_init();

let delay = Duration::from_secs(3);
let dir = temp_dir().to_path_buf();
let watcher_conf = WatcherConfig::RecommendedWatcher;
let component_file_path = vec![dir.join("tls.cert"), dir.join("vector.toml")];
let http_component = ComponentKey::from("http");

std::fs::create_dir(&dir).unwrap();

let mut component_files: Vec<std::fs::File> = component_file_path
.iter()
.map(|file| File::create(file).unwrap())
.collect();
let component_config = ComponentConfig::new(
component_file_path[0..1].to_vec(),
http_component.clone(),
ComponentType::Sink,
);

let (signal_tx, signal_rx) = broadcast::channel(128);
spawn_thread(
watcher_conf,
signal_tx,
&[dir],
vec![component_config],
delay,
)
.unwrap();

let signal_rx = signal_rx.resubscribe();

if !test_signal(
&mut component_files,
crate::signal::SignalTo::ReloadFromDisk,
delay * 5,
signal_rx,
)
.await
{
panic!("Test timed out");
}
}

#[tokio::test]
async fn file_directory_update() {
trace_init();
Expand All @@ -310,13 +427,13 @@ mod tests {
let watcher_conf = WatcherConfig::RecommendedWatcher;

std::fs::create_dir(&dir).unwrap();
let mut file = File::create(&file_path).unwrap();
let file = File::create(&file_path).unwrap();

let (signal_tx, signal_rx) = broadcast::channel(128);
spawn_thread(watcher_conf, signal_tx, &[dir], vec![], delay).unwrap();

if !test_signal(
&mut file,
&mut vec![file],
crate::signal::SignalTo::ReloadFromDisk,
delay * 5,
signal_rx,
Expand All @@ -333,14 +450,14 @@ mod tests {

let delay = Duration::from_secs(3);
let file_path = temp_file();
let mut file = File::create(&file_path).unwrap();
let file = File::create(&file_path).unwrap();
let watcher_conf = WatcherConfig::RecommendedWatcher;

let (signal_tx, signal_rx) = broadcast::channel(128);
spawn_thread(watcher_conf, signal_tx, &[file_path], vec![], delay).unwrap();

if !test_signal(
&mut file,
&mut vec![file],
crate::signal::SignalTo::ReloadFromDisk,
delay * 5,
signal_rx,
Expand All @@ -359,7 +476,7 @@ mod tests {
let delay = Duration::from_secs(3);
let file_path = temp_file();
let sym_file = temp_file();
let mut file = File::create(&file_path).unwrap();
let file = File::create(&file_path).unwrap();
std::os::unix::fs::symlink(&file_path, &sym_file).unwrap();

let watcher_conf = WatcherConfig::RecommendedWatcher;
Expand All @@ -368,7 +485,7 @@ mod tests {
spawn_thread(watcher_conf, signal_tx, &[sym_file], vec![], delay).unwrap();

if !test_signal(
&mut file,
&mut vec![file],
crate::signal::SignalTo::ReloadFromDisk,
delay * 5,
signal_rx,
Expand All @@ -390,13 +507,13 @@ mod tests {
let watcher_conf = WatcherConfig::RecommendedWatcher;

std::fs::create_dir_all(&sub_dir).unwrap();
let mut file = File::create(&file_path).unwrap();
let file = File::create(&file_path).unwrap();

let (signal_tx, signal_rx) = broadcast::channel(128);
spawn_thread(watcher_conf, signal_tx, &[sub_dir], vec![], delay).unwrap();

if !test_signal(
&mut file,
&mut vec![file],
crate::signal::SignalTo::ReloadFromDisk,
delay * 5,
signal_rx,
Expand Down
Loading