Skip to content

Commit 8fd71ca

Browse files
committed
check if there was event to reload from disk
1 parent 22cb8c5 commit 8fd71ca

File tree

2 files changed

+134
-14
lines changed

2 files changed

+134
-14
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
When file change event for component changes and vector config change event comes very closely, config change event get discarded by component chnage. Fixing this issue by tracking and giving preference to reload from disk if there was event for that.
2+
3+
authors: anil-db

src/config/watcher.rs

Lines changed: 131 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,12 @@ pub fn spawn_thread<'a>(
121121
delay = ?delay
122122
);
123123

124+
let has_unmatched = changed_paths.iter().any(|changed_path| {
125+
!component_configs
126+
.iter()
127+
.any(|p| p.config_paths.contains(changed_path))
128+
});
129+
124130
let changed_components: HashMap<_, _> = component_configs
125131
.clone()
126132
.into_iter()
@@ -137,7 +143,7 @@ pub fn spawn_thread<'a>(
137143
debug!(message = "Reloaded paths.");
138144

139145
info!("Configuration file changed.");
140-
if !changed_components.is_empty() {
146+
if !has_unmatched && !changed_components.is_empty() {
141147
info!(
142148
"Component {:?} configuration changed.",
143149
changed_components.keys()
@@ -225,13 +231,22 @@ mod tests {
225231
};
226232

227233
async fn test_signal(
228-
file: &mut File,
234+
files: &mut [std::fs::File],
229235
expected_signal: crate::signal::SignalTo,
230236
timeout: Duration,
231237
mut receiver: SignalRx,
232238
) -> bool {
233-
file.write_all(&[0]).unwrap();
234-
file.sync_all().unwrap();
239+
// Write and sync each file
240+
for file in files.iter_mut() {
241+
if let Err(e) = file.write_all(&[0]) {
242+
error!("Failed to write to file: {}", e);
243+
return false;
244+
}
245+
if let Err(e) = file.sync_all() {
246+
error!("Failed to sync file: {}", e);
247+
return false;
248+
}
249+
}
235250

236251
match tokio::time::timeout(timeout, receiver.recv()).await {
237252
Ok(Ok(signal)) => signal == expected_signal,
@@ -275,7 +290,7 @@ mod tests {
275290
let signal_rx2 = signal_rx.resubscribe();
276291

277292
if !test_signal(
278-
&mut component_files[0],
293+
&mut component_files[0..1],
279294
crate::signal::SignalTo::ReloadComponents(HashSet::from_iter(vec![
280295
http_component.clone(),
281296
])),
@@ -288,7 +303,7 @@ mod tests {
288303
}
289304

290305
if !test_signal(
291-
&mut component_files[1],
306+
&mut component_files[1..2],
292307
crate::signal::SignalTo::ReloadComponents(HashSet::from_iter(vec![
293308
http_component.clone(),
294309
])),
@@ -300,6 +315,108 @@ mod tests {
300315
panic!("Test timed out");
301316
}
302317
}
318+
319+
#[tokio::test]
320+
async fn multi_component_update() {
321+
trace_init();
322+
323+
let delay = Duration::from_secs(3);
324+
let dir = temp_dir().to_path_buf();
325+
let watcher_conf = WatcherConfig::RecommendedWatcher;
326+
let component_file_path = vec![dir.join("tls.cert"), dir.join("tls.key")];
327+
let http_component = ComponentKey::from("http");
328+
let http_component_2 = ComponentKey::from("http2");
329+
330+
std::fs::create_dir(&dir).unwrap();
331+
332+
let mut component_files: Vec<std::fs::File> = component_file_path
333+
.iter()
334+
.map(|file| File::create(file).unwrap())
335+
.collect();
336+
let component_config = ComponentConfig::new(
337+
component_file_path[0..1].to_vec(),
338+
http_component.clone(),
339+
ComponentType::Sink,
340+
);
341+
let component_config_2 = ComponentConfig::new(
342+
component_file_path[1..2].to_vec(),
343+
http_component_2.clone(),
344+
ComponentType::Sink,
345+
);
346+
347+
let (signal_tx, signal_rx) = broadcast::channel(128);
348+
spawn_thread(
349+
watcher_conf,
350+
signal_tx,
351+
&[dir],
352+
vec![component_config, component_config_2],
353+
delay,
354+
)
355+
.unwrap();
356+
357+
let signal_rx = signal_rx.resubscribe();
358+
359+
if !test_signal(
360+
&mut component_files,
361+
crate::signal::SignalTo::ReloadComponents(HashSet::from_iter(vec![
362+
http_component.clone(),
363+
http_component_2.clone(),
364+
])),
365+
delay * 5,
366+
signal_rx,
367+
)
368+
.await
369+
{
370+
panic!("Test timed out");
371+
}
372+
}
373+
374+
#[tokio::test]
375+
async fn component_and_config_update() {
376+
trace_init();
377+
378+
let delay = Duration::from_secs(3);
379+
let dir = temp_dir().to_path_buf();
380+
let watcher_conf = WatcherConfig::RecommendedWatcher;
381+
let component_file_path = vec![dir.join("tls.cert"), dir.join("vector.toml")];
382+
let http_component = ComponentKey::from("http");
383+
384+
std::fs::create_dir(&dir).unwrap();
385+
386+
let mut component_files: Vec<std::fs::File> = component_file_path
387+
.iter()
388+
.map(|file| File::create(file).unwrap())
389+
.collect();
390+
let component_config = ComponentConfig::new(
391+
component_file_path[0..1].to_vec(),
392+
http_component.clone(),
393+
ComponentType::Sink,
394+
);
395+
396+
let (signal_tx, signal_rx) = broadcast::channel(128);
397+
spawn_thread(
398+
watcher_conf,
399+
signal_tx,
400+
&[dir],
401+
vec![component_config],
402+
delay,
403+
)
404+
.unwrap();
405+
406+
let signal_rx = signal_rx.resubscribe();
407+
408+
if !test_signal(
409+
&mut component_files,
410+
crate::signal::SignalTo::ReloadFromDisk,
411+
delay * 5,
412+
signal_rx,
413+
)
414+
.await
415+
{
416+
panic!("Test timed out");
417+
}
418+
}
419+
303420
#[tokio::test]
304421
async fn file_directory_update() {
305422
trace_init();
@@ -310,13 +427,13 @@ mod tests {
310427
let watcher_conf = WatcherConfig::RecommendedWatcher;
311428

312429
std::fs::create_dir(&dir).unwrap();
313-
let mut file = File::create(&file_path).unwrap();
430+
let file = File::create(&file_path).unwrap();
314431

315432
let (signal_tx, signal_rx) = broadcast::channel(128);
316433
spawn_thread(watcher_conf, signal_tx, &[dir], vec![], delay).unwrap();
317434

318435
if !test_signal(
319-
&mut file,
436+
&mut vec![file],
320437
crate::signal::SignalTo::ReloadFromDisk,
321438
delay * 5,
322439
signal_rx,
@@ -333,14 +450,14 @@ mod tests {
333450

334451
let delay = Duration::from_secs(3);
335452
let file_path = temp_file();
336-
let mut file = File::create(&file_path).unwrap();
453+
let file = File::create(&file_path).unwrap();
337454
let watcher_conf = WatcherConfig::RecommendedWatcher;
338455

339456
let (signal_tx, signal_rx) = broadcast::channel(128);
340457
spawn_thread(watcher_conf, signal_tx, &[file_path], vec![], delay).unwrap();
341458

342459
if !test_signal(
343-
&mut file,
460+
&mut vec![file],
344461
crate::signal::SignalTo::ReloadFromDisk,
345462
delay * 5,
346463
signal_rx,
@@ -359,7 +476,7 @@ mod tests {
359476
let delay = Duration::from_secs(3);
360477
let file_path = temp_file();
361478
let sym_file = temp_file();
362-
let mut file = File::create(&file_path).unwrap();
479+
let file = File::create(&file_path).unwrap();
363480
std::os::unix::fs::symlink(&file_path, &sym_file).unwrap();
364481

365482
let watcher_conf = WatcherConfig::RecommendedWatcher;
@@ -368,7 +485,7 @@ mod tests {
368485
spawn_thread(watcher_conf, signal_tx, &[sym_file], vec![], delay).unwrap();
369486

370487
if !test_signal(
371-
&mut file,
488+
&mut vec![file],
372489
crate::signal::SignalTo::ReloadFromDisk,
373490
delay * 5,
374491
signal_rx,
@@ -390,13 +507,13 @@ mod tests {
390507
let watcher_conf = WatcherConfig::RecommendedWatcher;
391508

392509
std::fs::create_dir_all(&sub_dir).unwrap();
393-
let mut file = File::create(&file_path).unwrap();
510+
let file = File::create(&file_path).unwrap();
394511

395512
let (signal_tx, signal_rx) = broadcast::channel(128);
396513
spawn_thread(watcher_conf, signal_tx, &[sub_dir], vec![], delay).unwrap();
397514

398515
if !test_signal(
399-
&mut file,
516+
&mut vec![file],
400517
crate::signal::SignalTo::ReloadFromDisk,
401518
delay * 5,
402519
signal_rx,

0 commit comments

Comments
 (0)