Skip to content

Commit 26f764e

Browse files
committed
check if there was event to reload from disk
1 parent 22cb8c5 commit 26f764e

File tree

1 file changed

+131
-14
lines changed

1 file changed

+131
-14
lines changed

src/config/watcher.rs

Lines changed: 131 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,12 @@ pub fn spawn_thread<'a>(
121121
delay = ?delay
122122
);
123123

124+
let has_unmatched = changed_paths.iter().any(|changed_path| {
125+
!component_configs
126+
.iter()
127+
.any(|p| p.config_paths.contains(changed_path))
128+
});
129+
124130
let changed_components: HashMap<_, _> = component_configs
125131
.clone()
126132
.into_iter()
@@ -137,7 +143,7 @@ pub fn spawn_thread<'a>(
137143
debug!(message = "Reloaded paths.");
138144

139145
info!("Configuration file changed.");
140-
if !changed_components.is_empty() {
146+
if !has_unmatched && !changed_components.is_empty() {
141147
info!(
142148
"Component {:?} configuration changed.",
143149
changed_components.keys()
@@ -225,13 +231,22 @@ mod tests {
225231
};
226232

227233
async fn test_signal(
228-
file: &mut File,
234+
files: &mut [std::fs::File],
229235
expected_signal: crate::signal::SignalTo,
230236
timeout: Duration,
231237
mut receiver: SignalRx,
232238
) -> bool {
233-
file.write_all(&[0]).unwrap();
234-
file.sync_all().unwrap();
239+
// Write and sync each file
240+
for file in files.iter_mut() {
241+
if let Err(e) = file.write_all(&[0]) {
242+
error!("Failed to write to file: {}", e);
243+
return false;
244+
}
245+
if let Err(e) = file.sync_all() {
246+
error!("Failed to sync file: {}", e);
247+
return false;
248+
}
249+
}
235250

236251
match tokio::time::timeout(timeout, receiver.recv()).await {
237252
Ok(Ok(signal)) => signal == expected_signal,
@@ -275,7 +290,7 @@ mod tests {
275290
let signal_rx2 = signal_rx.resubscribe();
276291

277292
if !test_signal(
278-
&mut component_files[0],
293+
&mut component_files[0..1],
279294
crate::signal::SignalTo::ReloadComponents(HashSet::from_iter(vec![
280295
http_component.clone(),
281296
])),
@@ -288,7 +303,7 @@ mod tests {
288303
}
289304

290305
if !test_signal(
291-
&mut component_files[1],
306+
&mut component_files[1..2],
292307
crate::signal::SignalTo::ReloadComponents(HashSet::from_iter(vec![
293308
http_component.clone(),
294309
])),
@@ -300,6 +315,108 @@ mod tests {
300315
panic!("Test timed out");
301316
}
302317
}
318+
319+
#[tokio::test]
320+
async fn multi_component_update() {
321+
trace_init();
322+
323+
let delay = Duration::from_secs(3);
324+
let dir = temp_dir().to_path_buf();
325+
let watcher_conf = WatcherConfig::RecommendedWatcher;
326+
let component_file_path = vec![dir.join("tls.cert"), dir.join("tls.key")];
327+
let http_component = ComponentKey::from("http");
328+
let http_component_2 = ComponentKey::from("http2");
329+
330+
std::fs::create_dir(&dir).unwrap();
331+
332+
let mut component_files: Vec<std::fs::File> = component_file_path
333+
.iter()
334+
.map(|file| File::create(file).unwrap())
335+
.collect();
336+
let component_config = ComponentConfig::new(
337+
component_file_path[0..1].to_vec(),
338+
http_component.clone(),
339+
ComponentType::Sink,
340+
);
341+
let component_config_2 = ComponentConfig::new(
342+
component_file_path[1..2].to_vec(),
343+
http_component_2.clone(),
344+
ComponentType::Sink,
345+
);
346+
347+
let (signal_tx, signal_rx) = broadcast::channel(128);
348+
spawn_thread(
349+
watcher_conf,
350+
signal_tx,
351+
&[dir],
352+
vec![component_config, component_config_2],
353+
delay,
354+
)
355+
.unwrap();
356+
357+
let signal_rx = signal_rx.resubscribe();
358+
359+
if !test_signal(
360+
&mut component_files,
361+
crate::signal::SignalTo::ReloadComponents(HashSet::from_iter(vec![
362+
http_component.clone(),
363+
http_component_2.clone(),
364+
])),
365+
delay * 5,
366+
signal_rx,
367+
)
368+
.await
369+
{
370+
panic!("Test timed out");
371+
}
372+
}
373+
374+
#[tokio::test]
375+
async fn component_and_config_update() {
376+
trace_init();
377+
378+
let delay = Duration::from_secs(3);
379+
let dir = temp_dir().to_path_buf();
380+
let watcher_conf = WatcherConfig::RecommendedWatcher;
381+
let component_file_path = vec![dir.join("tls.cert"), dir.join("vector.toml")];
382+
let http_component = ComponentKey::from("http");
383+
384+
std::fs::create_dir(&dir).unwrap();
385+
386+
let mut component_files: Vec<std::fs::File> = component_file_path
387+
.iter()
388+
.map(|file| File::create(file).unwrap())
389+
.collect();
390+
let component_config = ComponentConfig::new(
391+
component_file_path[0..1].to_vec(),
392+
http_component.clone(),
393+
ComponentType::Sink,
394+
);
395+
396+
let (signal_tx, signal_rx) = broadcast::channel(128);
397+
spawn_thread(
398+
watcher_conf,
399+
signal_tx,
400+
&[dir],
401+
vec![component_config],
402+
delay,
403+
)
404+
.unwrap();
405+
406+
let signal_rx = signal_rx.resubscribe();
407+
408+
if !test_signal(
409+
&mut component_files,
410+
crate::signal::SignalTo::ReloadFromDisk,
411+
delay * 5,
412+
signal_rx,
413+
)
414+
.await
415+
{
416+
panic!("Test timed out");
417+
}
418+
}
419+
303420
#[tokio::test]
304421
async fn file_directory_update() {
305422
trace_init();
@@ -310,13 +427,13 @@ mod tests {
310427
let watcher_conf = WatcherConfig::RecommendedWatcher;
311428

312429
std::fs::create_dir(&dir).unwrap();
313-
let mut file = File::create(&file_path).unwrap();
430+
let file = File::create(&file_path).unwrap();
314431

315432
let (signal_tx, signal_rx) = broadcast::channel(128);
316433
spawn_thread(watcher_conf, signal_tx, &[dir], vec![], delay).unwrap();
317434

318435
if !test_signal(
319-
&mut file,
436+
&mut vec![file],
320437
crate::signal::SignalTo::ReloadFromDisk,
321438
delay * 5,
322439
signal_rx,
@@ -333,14 +450,14 @@ mod tests {
333450

334451
let delay = Duration::from_secs(3);
335452
let file_path = temp_file();
336-
let mut file = File::create(&file_path).unwrap();
453+
let file = File::create(&file_path).unwrap();
337454
let watcher_conf = WatcherConfig::RecommendedWatcher;
338455

339456
let (signal_tx, signal_rx) = broadcast::channel(128);
340457
spawn_thread(watcher_conf, signal_tx, &[file_path], vec![], delay).unwrap();
341458

342459
if !test_signal(
343-
&mut file,
460+
&mut vec![file],
344461
crate::signal::SignalTo::ReloadFromDisk,
345462
delay * 5,
346463
signal_rx,
@@ -359,7 +476,7 @@ mod tests {
359476
let delay = Duration::from_secs(3);
360477
let file_path = temp_file();
361478
let sym_file = temp_file();
362-
let mut file = File::create(&file_path).unwrap();
479+
let file = File::create(&file_path).unwrap();
363480
std::os::unix::fs::symlink(&file_path, &sym_file).unwrap();
364481

365482
let watcher_conf = WatcherConfig::RecommendedWatcher;
@@ -368,7 +485,7 @@ mod tests {
368485
spawn_thread(watcher_conf, signal_tx, &[sym_file], vec![], delay).unwrap();
369486

370487
if !test_signal(
371-
&mut file,
488+
&mut vec![file],
372489
crate::signal::SignalTo::ReloadFromDisk,
373490
delay * 5,
374491
signal_rx,
@@ -390,13 +507,13 @@ mod tests {
390507
let watcher_conf = WatcherConfig::RecommendedWatcher;
391508

392509
std::fs::create_dir_all(&sub_dir).unwrap();
393-
let mut file = File::create(&file_path).unwrap();
510+
let file = File::create(&file_path).unwrap();
394511

395512
let (signal_tx, signal_rx) = broadcast::channel(128);
396513
spawn_thread(watcher_conf, signal_tx, &[sub_dir], vec![], delay).unwrap();
397514

398515
if !test_signal(
399-
&mut file,
516+
&mut vec![file],
400517
crate::signal::SignalTo::ReloadFromDisk,
401518
delay * 5,
402519
signal_rx,

0 commit comments

Comments
 (0)