From 6021ab885417ecc7af4e8b19f6f48d3d4787a194 Mon Sep 17 00:00:00 2001
From: "Mikael Knutsson (mikn)"
Date: Thu, 6 Mar 2025 17:16:28 +0100
Subject: [PATCH 1/5] fix: make twoliter work in parallel invocation scenarios

---
 Cargo.lock                                |   1 +
 twoliter/Cargo.toml                       |   1 +
 twoliter/src/common.rs                    |  85 ++++++++
 twoliter/src/project/lock/verification.rs | 227 ++++++++++++++++++----
 twoliter/src/project/mod.rs               |  11 +-
 twoliter/src/tools.rs                     | 101 +++++++---
 6 files changed, 360 insertions(+), 66 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 210223423..81095d706 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4689,6 +4689,7 @@ dependencies = [
  "bytes",
  "clap",
  "env_logger",
+ "fastrand",
  "filetime",
  "flate2",
  "futures",
diff --git a/twoliter/Cargo.toml b/twoliter/Cargo.toml
index 972de88b8..4a2e04534 100644
--- a/twoliter/Cargo.toml
+++ b/twoliter/Cargo.toml
@@ -18,6 +18,7 @@ base64.workspace = true
 buildsys-config.workspace = true
 clap = { workspace = true, features = ["derive", "env", "std"] }
 env_logger.workspace = true
+fastrand.workspace = true
 filetime.workspace = true
 flate2.workspace = true
 futures.workspace = true
diff --git a/twoliter/src/common.rs b/twoliter/src/common.rs
index 2a1190900..b621e5f37 100644
--- a/twoliter/src/common.rs
+++ b/twoliter/src/common.rs
@@ -1,6 +1,13 @@
 use anyhow::{ensure, Context, Result};
+use fastrand;
+use filetime::FileTime;
 use log::{self, LevelFilter};
+use std::io::ErrorKind;
+use std::path::{Path, PathBuf};
+use std::time::Duration;
+use tokio::fs::OpenOptions;
 use tokio::process::Command;
+use tokio::time::sleep;
 use tracing::{debug, instrument};
 
 /// This is passed as an environment variable to Buildsys. Buildsys tells Cargo to watch this
@@ -69,6 +76,84 @@ pub(crate) async fn exec(cmd: &mut Command, quiet: bool) -> Result<Option<Strin
 
+/// A utility for safely acquiring and releasing file locks in a concurrent environment.
+/// This provides atomic file-based locking with exponential backoff and stale lock detection.
+pub(crate) struct FileLocker {
+    lock_path: PathBuf,
+    stale_timeout_secs: u64,
+    max_attempts: u32,
+    base_delay_ms: u64,
+}
+
+impl FileLocker {
+    /// Create a new FileLocker for the specified lock path
+    pub(crate) fn new(lock_path: impl AsRef<Path>) -> Self {
+        Self {
+            lock_path: lock_path.as_ref().to_path_buf(),
+            stale_timeout_secs: 30,
+            max_attempts: 5,
+            base_delay_ms: 50,
+        }
+    }
+
+    /// Try to acquire the lock with exponential backoff
+    pub(crate) async fn try_acquire(&self) -> Result<Option<FileLock>> {
+        for attempt in 0..self.max_attempts {
+            match OpenOptions::new()
+                .create_new(true)
+                .write(true)
+                .open(&self.lock_path)
+                .await
+            {
+                Ok(file) => {
+                    debug!("Acquired lock: {}", self.lock_path.display());
+                    return Ok(Some(FileLock {
+                        lock_path: self.lock_path.clone(),
+                        _file: file,
+                    }));
+                }
+                Err(e) if e.kind() == ErrorKind::AlreadyExists => {
+                    // Check if lock is stale
+                    if let Ok(lock_meta) = fs::metadata(&self.lock_path).await {
+                        let now = FileTime::now();
+                        let lock_time = FileTime::from_last_modification_time(&lock_meta);
+                        if now.seconds() - lock_time.seconds() > self.stale_timeout_secs as i64 {
+                            debug!("Removing stale lock: {}", self.lock_path.display());
+                            let _ = fs::remove_file(&self.lock_path).await;
+                            continue;
+                        }
+                    }
+                }
+                Err(_) => {}
+            }
+
+            // Exponential backoff with jitter
+            let max_pow = attempt.min(3); // Cap at 2^3 to avoid excessive delays
+            let delay = (2_u64.pow(max_pow) * self.base_delay_ms) + (fastrand::u64(1..=50));
+            sleep(Duration::from_millis(delay)).await;
+        }
+
+        Ok(None) // Failed to acquire lock after max attempts
+    }
+}
+
+/// Represents an acquired file lock that is automatically released when dropped
+pub(crate) struct FileLock {
+    lock_path: PathBuf,
+    _file: tokio::fs::File, // Keep the file handle to maintain the lock
+}
+
+impl Drop for FileLock {
+    fn drop(&mut self) {
+        debug!("Releasing lock: {}", self.lock_path.display());
+
+        // Use a synchronous file removal on drop since we can't use async in Drop
+        // This is acceptable since the file is small and drop should be fast
+        let _ = std::fs::remove_file(&self.lock_path);
+    }
+}
+
 #[allow(dead_code)]
 pub(crate) mod fs {
     use anyhow::{Context, Result};
diff --git a/twoliter/src/project/lock/verification.rs b/twoliter/src/project/lock/verification.rs
index 2d682ea5f..785c523a7 100644
--- a/twoliter/src/project/lock/verification.rs
+++ b/twoliter/src/project/lock/verification.rs
@@ -11,11 +11,14 @@
 //! [`LockfileVerifier`]s.
 use super::image::LockedImage;
 use super::{Lock, LockedSDK};
-use anyhow::{Context, Result};
+use crate::common;
+use anyhow::{anyhow, Context, Result};
 use olpc_cjson::CanonicalFormatter as CanonicalJsonFormatter;
 use serde::{Deserialize, Serialize};
+use std::collections::hash_map::DefaultHasher;
 use std::collections::BTreeSet;
 use std::fmt::Debug;
+use std::hash::{Hash, Hasher};
 use std::path::Path;
 use strum::{EnumIter, IntoEnumIterator};
 use tracing::{debug, instrument};
@@ -48,7 +51,7 @@ impl VerifyTag {
 }
 
 /// A manifest containing the list of elements that were verified by a `LockfileVerifier`
-#[derive(Debug, Clone, Default, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
+#[derive(Debug, Default, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
 #[serde(transparent)]
 pub(crate) struct VerificationManifest {
     verified_images: BTreeSet<String>,
@@ -125,57 +128,109 @@ impl<V: LockfileVerifier> From<&V> for VerificationTagger {
 
 impl VerificationTagger {
     /// Creates marker files for artifacts that have been verified against the lockfile
+    /// with proper handling of concurrent processes using file-based locking
     #[instrument(level = "trace", skip(external_kits_dir))]
     pub(crate) async fn write_tags<P: AsRef<Path>>(&self, external_kits_dir: P) -> Result<()> {
         let external_kits_dir = external_kits_dir.as_ref();
-        Self::cleanup_existing_tags(&external_kits_dir).await?;
-        debug!("Writing tag files for verified artifacts");
-        tokio::fs::create_dir_all(&external_kits_dir)
+        // Ensure the directory exists
+        tokio::fs::create_dir_all(external_kits_dir)
             .await
             .context(format!(
-                "failed to create external-kits directory at '{}'",
+                "failed to create directory '{}'",
                 external_kits_dir.display()
             ))?;
-        for tag in self.tags.iter() {
-            let flag_file = external_kits_dir.join(tag.marker_file_name());
-            debug!(
-                "Writing tag file for verified artifacts: '{}'",
-                flag_file.display()
-            );
-            tokio::fs::write(&flag_file, tag.manifest().as_canonical_json()?)
-                .await
-                .context(format!(
-                    "failed to write verification tag file: '{}'",
-                    flag_file.display()
-                ))?;
-        }
-        Ok(())
-    }
+        // Create a file locker for this operation
+        let lock_path = external_kits_dir.join(".verification.lock");
+        let file_locker = common::FileLocker::new(&lock_path);
 
-    /// Deletes any existing verifier marker files in the kits directory
-    #[instrument(level = "trace", skip(external_kits_dir))]
-    pub(crate) async fn cleanup_existing_tags<P: AsRef<Path>>(external_kits_dir: P) -> Result<()> {
-        let external_kits_dir = external_kits_dir.as_ref();
+        // Acquire the lock directly
+        let lock = file_locker
+            .try_acquire()
+            .await?
+ .ok_or_else(|| anyhow!("Failed to acquire lock for verification tags"))?; - debug!("Cleaning up any existing tag files for resolved artifacts",); - for resolve_tag in VerifyTag::iter() { - let flag_file = external_kits_dir.join(resolve_tag.marker_file_name()); - if flag_file.exists() { - debug!( - "Removing existing verification tag file '{}'", - flag_file.display() - ); + // Process tags while holding the lock + let result = self.process_tags(external_kits_dir).await; + + drop(lock); + result + } + + /// Process all tags once the lock is acquired + async fn process_tags(&self, external_kits_dir: &Path) -> Result<()> { + // First delete any tag files that we don't have + for tag_type in VerifyTag::iter() { + let flag_file = external_kits_dir.join(tag_type.marker_file_name()); + let has_tag = self + .tags + .iter() + .any(|t| t.marker_file_name() == tag_type.marker_file_name()); + + if !has_tag && flag_file.exists() { + debug!("Removing unused tag file '{}'", flag_file.display()); tokio::fs::remove_file(&flag_file).await.context(format!( - "failed to remove existing verification tag file: {}", + "Failed to remove tag file '{}'", flag_file.display() ))?; } } + // Now process our tags + for tag in &self.tags { + let flag_file = external_kits_dir.join(tag.marker_file_name()); + let new_content = tag.manifest().as_canonical_json()?; + + // Check if we need to update the file + let need_update = if flag_file.exists() { + match tokio::fs::read(&flag_file).await { + Ok(existing) => { + Self::calculate_hash(&existing) != Self::calculate_hash(&new_content) + } + Err(_) => true, // If we can't read it, we'll rewrite it + } + } else { + true // File doesn't exist, need to create it + }; + + if need_update { + // If the file exists but content is different, remove it first + if flag_file.exists() { + let _ = tokio::fs::remove_file(&flag_file).await; + } + + debug!("Writing tag file '{}'", flag_file.display()); + tokio::fs::write(&flag_file, &new_content) + .await + .context(format!( + "Failed to write tag file '{}'", + flag_file.display() + ))?; + } else { + debug!("Tag file '{}' unchanged, skipping", flag_file.display()); + } + } + Ok(()) } + + /// Calculate a hash for content + fn calculate_hash(content: &[u8]) -> u64 { + let mut hasher = DefaultHasher::new(); + content.hash(&mut hasher); + hasher.finish() + } + + /// Safely removes all verification tags using file-based locking + /// + /// This implementation uses proper locking to prevent race conditions between concurrent processes. 
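+    ///
+    /// A sketch of the intended call pattern (mirroring the call that
+    /// `Project::load_lock` previously made, where `project` is a `Project`):
+    ///
+    /// ```ignore
+    /// VerificationTagger::cleanup_existing_tags(project.external_kits_dir()).await?;
+    /// ```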
+    #[instrument(level = "trace", skip(external_kits_dir))]
+    pub(crate) async fn cleanup_existing_tags<P: AsRef<Path>>(external_kits_dir: P) -> Result<()> {
+        // Create a VerificationTagger with no tags, which will remove all existing tags
+        let empty_tagger = Self::no_verifications();
+        empty_tagger.write_tags(external_kits_dir).await
+    }
 }
 
 #[cfg(test)]
@@ -265,4 +320,108 @@ mod test {
         let sdk_contents = tokio::fs::read_to_string(&sdk_flag_file).await.unwrap();
         assert_eq!(sdk_contents, r#"["image1","image2"]"#);
     }
+
+    #[tokio::test]
+    async fn test_content_based_skipping() {
+        // Test that we don't rewrite identical content (optimization)
+        let kits_dir = tempfile::tempdir().unwrap();
+        let tagger = VerificationTagger::from(&SDKResolver);
+
+        // First write
+        tagger.write_tags(&kits_dir.path()).await.unwrap();
+        let sdk_flag_file = kits_dir.path().join(SDK_VERIFIED_MARKER_FILE);
+        let original_metadata = sdk_flag_file.metadata().unwrap();
+        let original_modified = original_metadata.modified().unwrap();
+
+        // Sleep to ensure potential timestamp difference would be detectable
+        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
+
+        // Write again with same content
+        tagger.write_tags(&kits_dir.path()).await.unwrap();
+        let new_metadata = sdk_flag_file.metadata().unwrap();
+        let new_modified = new_metadata.modified().unwrap();
+
+        // File should not have been rewritten (timestamps should match)
+        assert_eq!(
+            original_modified, new_modified,
+            "File was rewritten despite identical content"
+        );
+    }
+
+    #[tokio::test]
+    async fn test_cleanup_functionality() {
+        // Test that cleanup_existing_tags properly removes all verification tags
+        let kits_dir = tempfile::tempdir().unwrap();
+
+        // First create all types of tag files
+        let kit_tagger = VerificationTagger::from(&KitResolver);
+        kit_tagger.write_tags(kits_dir.path()).await.unwrap();
+
+        // Verify SDK and Kit tags exist
+        let sdk_file = kits_dir.path().join(SDK_VERIFIED_MARKER_FILE);
+        let kit_file = kits_dir.path().join(KITS_VERIFIED_MARKER_FILE);
+        assert!(sdk_file.exists(), "SDK tag file missing after write");
+        assert!(kit_file.exists(), "Kit tag file missing after write");
+
+        // Now remove all tags
+        VerificationTagger::cleanup_existing_tags(kits_dir.path())
+            .await
+            .unwrap();
+
+        // Verify all tags were removed
+        assert!(
+            !sdk_file.exists(),
+            "SDK tag file still exists after cleanup_existing_tags"
+        );
+        assert!(
+            !kit_file.exists(),
+            "Kit tag file still exists after cleanup_existing_tags"
+        );
+    }
+
+    #[tokio::test]
+    async fn test_concurrent_tag_writing() {
+        // Test that concurrent writes are handled safely
+        let kits_dir = tempfile::tempdir().unwrap();
+
+        // Create two different taggers
+        let sdk_tagger = VerificationTagger::from(&SDKResolver);
+        let kit_tagger = VerificationTagger::from(&KitResolver);
+
+        // Launch concurrent writes
+        let sdk_future = sdk_tagger.write_tags(kits_dir.path());
+        let kit_future = kit_tagger.write_tags(kits_dir.path());
+
+        // Both should complete without errors
+        let (sdk_result, kit_result) = tokio::join!(sdk_future, kit_future);
+        assert!(
+            sdk_result.is_ok(),
+            "SDK tagger failed during concurrent write"
+        );
+        assert!(
+            kit_result.is_ok(),
+            "Kit tagger failed during concurrent write"
+        );
+
+        // One of the taggers should have "won" - ultimately we should have consistent state
+        let sdk_file = kits_dir.path().join(SDK_VERIFIED_MARKER_FILE);
+        let kit_file = kits_dir.path().join(KITS_VERIFIED_MARKER_FILE);
+
+        assert!(
+            sdk_file.exists(),
+            "SDK tag file missing after concurrent 
writes" + ); + + // Check if the winning write was from the KitResolver (which includes kit tags) + // or the SDKResolver (which only includes SDK tags) + if kit_file.exists() { + // KitResolver won the race + let kit_contents = tokio::fs::read_to_string(&kit_file).await.unwrap(); + assert_eq!(kit_contents, r#"["kit1","kit2"]"#); + } + + // Either way, the SDK file should exist with proper content + let sdk_contents = tokio::fs::read_to_string(&sdk_file).await.unwrap(); + assert_eq!(sdk_contents, r#"["image1","image2"]"#); + } } diff --git a/twoliter/src/project/mod.rs b/twoliter/src/project/mod.rs index d3f517dea..bde42a6e7 100644 --- a/twoliter/src/project/mod.rs +++ b/twoliter/src/project/mod.rs @@ -86,12 +86,8 @@ impl Project { ))?; let project = unvalidated.validate(path).await?; - // When projects are resolved, tags are written indicating which artifacts have been checked - // against the lockfile. - // We clean these up as early as possible to avoid situations in which artifacts are - // incorrectly flagged as having been resolved. - VerificationTagger::cleanup_existing_tags(project.external_kits_dir()).await?; - + // No cleanup of verification tags needed here - will be handled properly in load_lock + // when write_tags is called Ok(project) } @@ -130,8 +126,7 @@ impl Project { } pub(crate) async fn load_lock(&self) -> Result> { - VerificationTagger::cleanup_existing_tags(self.external_kits_dir()).await?; - + // No need to remove tags first as write_tags will handle existing tags properly let resolved_lock = NL::load_lock(self, private::SealToken).await?; resolved_lock diff --git a/twoliter/src/tools.rs b/twoliter/src/tools.rs index 0d89091b7..0143ba11c 100644 --- a/twoliter/src/tools.rs +++ b/twoliter/src/tools.rs @@ -1,4 +1,4 @@ -use crate::common::fs; +use crate::common::{self, fs}; use anyhow::{Context, Result}; use filetime::{set_file_handle_times, set_file_mtime, FileTime}; use flate2::read::ZlibDecoder; @@ -20,44 +20,97 @@ const TESTSYS: &[u8] = include_bytes!(env!("CARGO_BIN_FILE_TESTSYS")); const TUFTOOL: &[u8] = include_bytes!(env!("CARGO_BIN_FILE_TUFTOOL")); const UNPLUG: &[u8] = include_bytes!(env!("CARGO_BIN_FILE_UNPLUG")); -/// Install tools into the given `tools_dir`. If you use a `TempDir` object, make sure to pass it by -/// reference and hold on to it until you no longer need the tools to still be installed (it will -/// auto delete when it goes out of scope). +/// Install tools into the given `tools_dir`. Uses a temporary directory and file locking +/// to prevent race conditions when multiple processes install tools concurrently. 
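+///
+/// Locking follows the same `FileLocker` pattern used by
+/// `VerificationTagger::write_tags`; a minimal sketch of the idiom (the
+/// `lock_path` below is illustrative):
+///
+/// ```ignore
+/// let lock = common::FileLocker::new(&lock_path)
+///     .try_acquire()
+///     .await?
+///     .ok_or_else(|| anyhow::anyhow!("Failed to acquire lock"))?;
+/// // ...critical section: swap the staged temp directory into place...
+/// drop(lock); // dropping the guard removes the lock file
+/// ```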
 pub(crate) async fn install_tools(tools_dir: impl AsRef<Path>) -> Result<()> {
-    let dir = tools_dir.as_ref();
+    let dir = tools_dir.as_ref().to_path_buf();
     debug!("Installing tools to '{}'", dir.display());
-    fs::remove_dir_all(dir)
+
+    // Create parent directory and prepare temporary directory
+    let parent_dir = dir.parent().unwrap_or_else(|| Path::new("."));
+    fs::create_dir_all(parent_dir)
         .await
-        .context("Unable to remove tools directory before installing")?;
-    fs::create_dir_all(dir)
+        .context("Unable to create parent directory")?;
+
+    let temp_dir = parent_dir.join(format!(
+        "{}.tmp.{}",
+        dir.file_name().unwrap_or_default().to_string_lossy(),
+        uuid::Uuid::new_v4()
+    ));
+    let lock_path = parent_dir.join(format!(
+        ".{}.lock",
+        dir.file_name().unwrap_or_default().to_string_lossy()
+    ));
+
+    // Clean up and recreate temporary directory
+    let _ = fs::remove_dir_all(&temp_dir).await;
+    fs::create_dir_all(&temp_dir)
         .await
-        .context("Unable to create directory for tools")?;
+        .context("Unable to create temporary directory for tools")?;
 
     // Write out the embedded tools and scripts.
-    unpack_tarball(dir)
+    unpack_tarball(&temp_dir)
         .await
         .context("Unable to install tools")?;
 
     // Pick one of the embedded files for use as the canonical mtime.
-    let metadata = fs::metadata(dir.join("build.Dockerfile"))
+    let metadata = fs::metadata(temp_dir.join("build.Dockerfile"))
         .await
         .context("Unable to get Dockerfile metadata")?;
     let mtime = FileTime::from_last_modification_time(&metadata);
 
-    write_bin("buildsys", BUILDSYS, &dir, mtime).await?;
-    write_bin("pipesys", PIPESYS, &dir, mtime).await?;
+    // Write all binaries to the temporary directory
+    write_bin("buildsys", BUILDSYS, &temp_dir, mtime).await?;
+    write_bin("pipesys", PIPESYS, &temp_dir, mtime).await?;
     #[cfg(feature = "pubsys")]
-    write_bin("pubsys", PUBSYS, &dir, mtime).await?;
-    write_bin("pubsys-setup", PUBSYS_SETUP, &dir, mtime).await?;
-    write_bin("testsys", TESTSYS, &dir, mtime).await?;
-    write_bin("tuftool", TUFTOOL, &dir, mtime).await?;
-    write_bin("unplug", UNPLUG, &dir, mtime).await?;
-    fs::copy(KRANE.path(), dir.join("krane")).await?;
-
-    // Apply the mtime to the directory now that the writes are done.
-    set_file_mtime(dir, mtime).context(format!("Unable to set mtime for '{}'", dir.display()))?;
-
-    Ok(())
+    write_bin("pubsys", PUBSYS, &temp_dir, mtime).await?;
+    write_bin("pubsys-setup", PUBSYS_SETUP, &temp_dir, mtime).await?;
+    write_bin("testsys", TESTSYS, &temp_dir, mtime).await?;
+    write_bin("tuftool", TUFTOOL, &temp_dir, mtime).await?;
+    write_bin("unplug", UNPLUG, &temp_dir, mtime).await?;
+    fs::copy(KRANE.path(), temp_dir.join("krane")).await?;
+    set_file_mtime(&temp_dir, mtime)
+        .context(format!("Unable to set mtime for '{}'", temp_dir.display()))?;
+
+    // Use the common FileLocker utility for file-based locking
+    let file_locker = common::FileLocker::new(&lock_path);
+
+    // Acquire the lock directly
+    let lock = match file_locker.try_acquire().await? {
+        Some(lock) => lock,
+        None => {
+            // Clean up temporary directory if we can't get the lock
+            let _ = fs::remove_dir_all(&temp_dir).await;
+            return Err(anyhow::anyhow!(
+                "Failed to acquire lock for installing tools"
+            ));
+        }
+    };
+
+    // Perform the critical section operations while holding the lock
+    let result = async {
+        if dir.exists() {
+            fs::remove_dir_all(&dir)
+                .await
+                .context("Unable to remove existing tools directory")?;
+        }
+        fs::rename(&temp_dir, &dir).await.context(format!(
+            "Unable to move temp directory to '{}'",
+            dir.display()
+        ))?;
+        debug!("Successfully installed tools to '{}'", dir.display());
+        Ok(())
+    }
+    .await;
+
+    // If there was an error and the temp directory still exists, clean it up
+    if result.is_err() {
+        let _ = fs::remove_dir_all(&temp_dir).await;
+    }
+
+    drop(lock);
+
+    result
 }
 
 async fn write_bin(name: &str, data: &[u8], dir: impl AsRef<Path>, mtime: FileTime) -> Result<()> {

From e51c821103612cf55fb6a5380c66f4d293da9381 Mon Sep 17 00:00:00 2001
From: "Mikael Knutsson (mikn)"
Date: Thu, 6 Mar 2025 20:36:31 +0100
Subject: [PATCH 2/5] fix: lint errors

---
 twoliter/src/common.rs | 155 ++++++++++++++++++++---------------------
 1 file changed, 77 insertions(+), 78 deletions(-)

diff --git a/twoliter/src/common.rs b/twoliter/src/common.rs
index b621e5f37..8c1834f88 100644
--- a/twoliter/src/common.rs
+++ b/twoliter/src/common.rs
@@ -1,5 +1,4 @@
 use anyhow::{ensure, Context, Result};
-use fastrand;
 use filetime::FileTime;
 use log::{self, LevelFilter};
 use std::io::ErrorKind;
 use std::path::{Path, PathBuf};
 use std::time::Duration;
 use tokio::fs::OpenOptions;
 use tokio::process::Command;
 use tokio::time::sleep;
 use tracing::{debug, instrument};
@@ -77,83 +76,6 @@ pub(crate) async fn exec(cmd: &mut Command, quiet: bool) -> Result<Option<Strin
 
-/// A utility for safely acquiring and releasing file locks in a concurrent environment.
-/// This provides atomic file-based locking with exponential backoff and stale lock detection.
-pub(crate) struct FileLocker {
-    lock_path: PathBuf,
-    stale_timeout_secs: u64,
-    max_attempts: u32,
-    base_delay_ms: u64,
-}
-
-impl FileLocker {
-    /// Create a new FileLocker for the specified lock path
-    pub(crate) fn new(lock_path: impl AsRef<Path>) -> Self {
-        Self {
-            lock_path: lock_path.as_ref().to_path_buf(),
-            stale_timeout_secs: 30,
-            max_attempts: 5,
-            base_delay_ms: 50,
-        }
-    }
-
-    /// Try to acquire the lock with exponential backoff
-    pub(crate) async fn try_acquire(&self) -> Result<Option<FileLock>> {
-        for attempt in 0..self.max_attempts {
-            match OpenOptions::new()
-                .create_new(true)
-                .write(true)
-                .open(&self.lock_path)
-                .await
-            {
-                Ok(file) => {
-                    debug!("Acquired lock: {}", self.lock_path.display());
-                    return Ok(Some(FileLock {
-                        lock_path: self.lock_path.clone(),
-                        _file: file,
-                    }));
-                }
-                Err(e) if e.kind() == ErrorKind::AlreadyExists => {
-                    // Check if lock is stale
-                    if let Ok(lock_meta) = fs::metadata(&self.lock_path).await {
-                        let now = FileTime::now();
-                        let lock_time = FileTime::from_last_modification_time(&lock_meta);
-                        if now.seconds() - lock_time.seconds() > self.stale_timeout_secs as i64 {
-                            debug!("Removing stale lock: {}", self.lock_path.display());
-                            let _ = fs::remove_file(&self.lock_path).await;
-                            continue;
-                        }
-                    }
-                }
-                Err(_) => {}
-            }
-
-            // Exponential backoff with jitter
-            let max_pow = attempt.min(3); // Cap at 2^3 to avoid excessive delays
-            let delay = (2_u64.pow(max_pow) * self.base_delay_ms) + (fastrand::u64(1..=50));
-            sleep(Duration::from_millis(delay)).await;
-        }
-
-        Ok(None) // Failed to acquire lock after max attempts
-    }
-}
-
-/// Represents an acquired file lock that is automatically released when dropped
-pub(crate) struct FileLock {
-    lock_path: PathBuf,
-    _file: tokio::fs::File, // Keep the file handle to maintain the lock
-}
-
-impl Drop for FileLock {
-    fn drop(&mut self) {
-        debug!("Releasing lock: {}", self.lock_path.display());
-
-        // Use a synchronous file removal on drop since we can't use async in Drop
-        // This is acceptable since the file is small and drop should be fast
-        let _ = std::fs::remove_file(&self.lock_path);
-    }
-}
-
 #[allow(dead_code)]
 pub(crate) mod fs {
     use anyhow::{Context, Result};
@@ -295,6 +217,83 @@ pub(crate) mod fs {
     }
 }
 
+/// A utility for safely acquiring and releasing file locks in a concurrent environment.
+/// This provides atomic file-based locking with exponential backoff and stale lock detection.
+pub(crate) struct FileLocker {
+    lock_path: PathBuf,
+    stale_timeout_secs: u64,
+    max_attempts: u32,
+    base_delay_ms: u64,
+}
+
+impl FileLocker {
+    /// Create a new FileLocker for the specified lock path
+    pub(crate) fn new(lock_path: impl AsRef<Path>) -> Self {
+        Self {
+            lock_path: lock_path.as_ref().to_path_buf(),
+            stale_timeout_secs: 30,
+            max_attempts: 5,
+            base_delay_ms: 50,
+        }
+    }
+
+    /// Try to acquire the lock with exponential backoff
+    pub(crate) async fn try_acquire(&self) -> Result<Option<FileLock>> {
+        for attempt in 0..self.max_attempts {
+            match OpenOptions::new()
+                .create_new(true)
+                .write(true)
+                .open(&self.lock_path)
+                .await
+            {
+                Ok(file) => {
+                    debug!("Acquired lock: {}", self.lock_path.display());
+                    return Ok(Some(FileLock {
+                        lock_path: self.lock_path.clone(),
+                        _file: file,
+                    }));
+                }
+                Err(e) if e.kind() == ErrorKind::AlreadyExists => {
+                    // Check if lock is stale
+                    if let Ok(lock_meta) = fs::metadata(&self.lock_path).await {
+                        let now = FileTime::now();
+                        let lock_time = FileTime::from_last_modification_time(&lock_meta);
+                        if now.seconds() - lock_time.seconds() > self.stale_timeout_secs as i64 {
+                            debug!("Removing stale lock: {}", self.lock_path.display());
+                            let _ = fs::remove_file(&self.lock_path).await;
+                            continue;
+                        }
+                    }
+                }
+                Err(_) => {}
+            }
+
+            // Exponential backoff with jitter
+            let max_pow = attempt.min(3); // Cap at 2^3 to avoid excessive delays
+            let delay = (2_u64.pow(max_pow) * self.base_delay_ms) + (fastrand::u64(1..=50));
+            sleep(Duration::from_millis(delay)).await;
+        }
+
+        Ok(None) // Failed to acquire lock after max attempts
+    }
+}
+
+/// Represents an acquired file lock that is automatically released when dropped
+pub(crate) struct FileLock {
+    lock_path: PathBuf,
+    _file: tokio::fs::File, // Keep the file handle to maintain the lock
+}
+
+impl Drop for FileLock {
+    fn drop(&mut self) {
+        debug!("Releasing lock: {}", self.lock_path.display());
+
+        // Use a synchronous file removal on drop since we can't use async in Drop
+        // This is acceptable since the file is small and drop should be fast
+        let _ = std::fs::remove_file(&self.lock_path);
+    }
+}
+
 #[tokio::test]
 async fn test_remove_dir_all_no_dir() {
     use crate::common::fs;

From 51718112925dcf3dcc0794aa802fef76e4ce5de2 Mon Sep 17 00:00:00 2001
From: "Mikael Knutsson (mikn)"
Date: Fri, 7 Mar 2025 12:59:50 +0100
Subject: [PATCH 3/5] fix: do hash-based comparisons for the tools/ folder as
 well

---
 twoliter/src/common.rs                    | 167 ++++++++++++++++++++++
 twoliter/src/project/lock/verification.rs |  39 ++---
 twoliter/src/tools.rs                     | 165 +++++++++++++++++++--
 3 files changed, 332 insertions(+), 39 deletions(-)

diff --git a/twoliter/src/common.rs b/twoliter/src/common.rs
index 8c1834f88..3e0aec8c4 100644
--- a/twoliter/src/common.rs
+++ b/twoliter/src/common.rs
@@ -294,6 +294,173 @@ impl Drop for FileLock {
     }
 }
 
+/// Utilities for content-based file comparison and tracking
+pub(crate) mod content {
+    use super::*;
+    use anyhow::Context;
+    use std::collections::hash_map::DefaultHasher;
+    use std::collections::HashMap;
+    use std::hash::{Hash, Hasher};
+    use std::path::Path;
+    use tokio::fs;
+    use tracing::debug;
+
+    /// Calculate a hash for content
+    pub(crate) fn calculate_hash(content: &[u8]) -> u64 {
+        let mut hasher = DefaultHasher::new();
+        content.hash(&mut hasher);
+        hasher.finish()
+    }
+
+    /// Check if a file's content needs to be updated based on hash comparison
+    pub(crate) async fn needs_content_update(
+        path: impl AsRef<Path>,
+        new_content: &[u8],
+    ) -> Result<bool> {
+        let path = path.as_ref();
+        if path.exists() {
+            match fs::read(path).await {
+                Ok(existing) => {
+                    let existing_hash = calculate_hash(&existing);
+                    let new_hash = calculate_hash(new_content);
+                    let needs_update = existing_hash != new_hash;
+
+                    if needs_update {
+                        debug!(
+                            "Content hash mismatch for '{}': existing={:x}, new={:x}",
+                            path.display(),
+                            existing_hash,
+                            new_hash
+                        );
+                    } else {
+                        debug!(
+                            "Content hash match for '{}': hash={:x}",
+                            path.display(),
+                            existing_hash
+                        );
+                    }
+
+                    Ok(needs_update)
+                }
+                Err(e) => {
+                    debug!("Error reading existing file '{}': {}", path.display(), e);
+                    Ok(true) // If we can't read it, we'll rewrite it
+                }
+            }
+        } else {
+            debug!("File '{}' doesn't exist, needs creation", path.display());
+            Ok(true) // File doesn't exist, need to create it
+        }
+    }
+
+    /// Compare two directories to determine if their contents are identical
+    /// Returns Ok(true) if directories differ, Ok(false) if they're identical, or an Error
+    pub(crate) async fn compare_directories(
+        dir1: impl AsRef<Path>,
+        dir2: impl AsRef<Path>,
+    ) -> Result<bool> {
+        let dir1 = dir1.as_ref();
+        let dir2 = dir2.as_ref();
+
+        debug!(
+            "Comparing directories: '{}' and '{}'",
+            dir1.display(),
+            dir2.display()
+        );
+
+        // Get the list of files in both directories
+        let dir1_entries = get_file_list(dir1).await?;
+        let dir2_entries = get_file_list(dir2).await?;
+
+        // First, check if the file lists are different
+        if dir1_entries.len() != dir2_entries.len() {
+            debug!(
+                "Directory sizes differ: {} vs {} files",
+                dir1_entries.len(),
+                dir2_entries.len()
+            );
+            return Ok(true); // Different number of files means directories differ
+        }
+
+        // Next check file by file to see if anything differs
+        for (file_path, hash1) in &dir1_entries {
+            match dir2_entries.get(file_path) {
+                Some(hash2) if hash1 == hash2 => {
+                    // File content is the same, continue checking
+                    continue;
+                }
+                Some(hash2) => {
+                    debug!(
+                        "Content hash mismatch for '{}': {:x} vs {:x}",
+                        file_path, hash1, hash2
+                    );
+                    return Ok(true); // File content is different
+                }
+                None => {
+                    debug!("File '{}' missing in second directory", file_path);
+                    return Ok(true); // File doesn't exist in dir2
+                }
+            }
+        }
+
+        // If we got here, directories are identical
+        debug!(
+            "Directories are identical: {} files with matching content",
+            dir1_entries.len()
+        );
+        Ok(false)
+    }
+
+    /// Get list of files in a directory with their content hashes
+    pub(crate) async fn get_file_list(dir: &Path) -> Result<HashMap<String, u64>> {
+        let mut result = HashMap::new();
+        let entries = fs::read_dir(dir)
+            .await
+            .context(format!("Unable to read directory '{}'", dir.display()))?;
+
+        let mut entries_vec = Vec::new();
+        let mut dir_entries = entries;
+        while let Some(entry) = dir_entries.next_entry().await? {
+            entries_vec.push(entry);
+        }
+
+        for entry in entries_vec {
+            let path = entry.path();
+            let metadata = entry.metadata().await?;
+
+            if metadata.is_file() {
+                // Get relative path
+                let rel_path = path
+                    .strip_prefix(dir)
+                    .context(format!("Unable to strip prefix from '{}'", path.display()))?
+ .to_string_lossy() + .into_owned(); + + // Calculate hash for file content + let content = fs::read(&path) + .await + .context(format!("Unable to read file '{}'", path.display()))?; + let hash = calculate_hash(&content); + + result.insert(rel_path, hash); + } else if metadata.is_dir() { + // Handle subdirectories recursively using Box::pin for recursive async calls + let subdirectory = Box::pin(get_file_list(&path)).await?; + for (sub_path, hash) in subdirectory { + let full_path = format!( + "{}/{}", + path.file_name().unwrap_or_default().to_string_lossy(), + sub_path + ); + result.insert(full_path, hash); + } + } + } + + Ok(result) + } +} + #[tokio::test] async fn test_remove_dir_all_no_dir() { use crate::common::fs; diff --git a/twoliter/src/project/lock/verification.rs b/twoliter/src/project/lock/verification.rs index 785c523a7..a67d08ec9 100644 --- a/twoliter/src/project/lock/verification.rs +++ b/twoliter/src/project/lock/verification.rs @@ -11,16 +11,15 @@ //! [`LockfileVerifier`]s. use super::image::LockedImage; use super::{Lock, LockedSDK}; -use crate::common; +use crate::common::{self, content}; use anyhow::{anyhow, Context, Result}; use olpc_cjson::CanonicalFormatter as CanonicalJsonFormatter; use serde::{Deserialize, Serialize}; -use std::collections::hash_map::DefaultHasher; use std::collections::BTreeSet; use std::fmt::Debug; -use std::hash::{Hash, Hasher}; use std::path::Path; use strum::{EnumIter, IntoEnumIterator}; +use tokio::fs; use tracing::{debug, instrument}; const SDK_VERIFIED_MARKER_FILE: &str = ".sdk-verified"; @@ -170,7 +169,7 @@ impl VerificationTagger { if !has_tag && flag_file.exists() { debug!("Removing unused tag file '{}'", flag_file.display()); - tokio::fs::remove_file(&flag_file).await.context(format!( + fs::remove_file(&flag_file).await.context(format!( "Failed to remove tag file '{}'", flag_file.display() ))?; @@ -182,31 +181,20 @@ impl VerificationTagger { let flag_file = external_kits_dir.join(tag.marker_file_name()); let new_content = tag.manifest().as_canonical_json()?; - // Check if we need to update the file - let need_update = if flag_file.exists() { - match tokio::fs::read(&flag_file).await { - Ok(existing) => { - Self::calculate_hash(&existing) != Self::calculate_hash(&new_content) - } - Err(_) => true, // If we can't read it, we'll rewrite it - } - } else { - true // File doesn't exist, need to create it - }; + // Check if we need to update the file using the shared utility + let need_update = content::needs_content_update(&flag_file, &new_content).await?; if need_update { // If the file exists but content is different, remove it first if flag_file.exists() { - let _ = tokio::fs::remove_file(&flag_file).await; + let _ = fs::remove_file(&flag_file).await; } debug!("Writing tag file '{}'", flag_file.display()); - tokio::fs::write(&flag_file, &new_content) - .await - .context(format!( - "Failed to write tag file '{}'", - flag_file.display() - ))?; + fs::write(&flag_file, &new_content).await.context(format!( + "Failed to write tag file '{}'", + flag_file.display() + ))?; } else { debug!("Tag file '{}' unchanged, skipping", flag_file.display()); } @@ -215,12 +203,7 @@ impl VerificationTagger { Ok(()) } - /// Calculate a hash for content - fn calculate_hash(content: &[u8]) -> u64 { - let mut hasher = DefaultHasher::new(); - content.hash(&mut hasher); - hasher.finish() - } + // Note: Using content::calculate_hash from common.rs instead of a local implementation /// Safely removes all verification tags using file-based locking /// diff --git 
a/twoliter/src/tools.rs b/twoliter/src/tools.rs
index 0143ba11c..8531f18b3 100644
--- a/twoliter/src/tools.rs
+++ b/twoliter/src/tools.rs
@@ -1,4 +1,4 @@
-use crate::common::{self, fs};
+use crate::common::{self, content, fs};
 use anyhow::{Context, Result};
 use filetime::{set_file_handle_times, set_file_mtime, FileTime};
 use flate2::read::ZlibDecoder;
@@ -8,7 +8,7 @@ use tar::Archive;
 use tokio::fs::OpenOptions;
 use tokio::io::AsyncWriteExt;
 use tokio::runtime::Handle;
-use tracing::debug;
+use tracing::{debug, info};
 
 const TAR_GZ_DATA: &[u8] = include_bytes!(concat!(env!("OUT_DIR"), "/tools.tar.gz"));
 const BUILDSYS: &[u8] = include_bytes!(env!("CARGO_BIN_FILE_BUILDSYS"));
@@ -90,15 +90,49 @@ pub(crate) async fn install_tools(tools_dir: impl AsRef<Path>) -> Result<()> {
     // Perform the critical section operations while holding the lock
     let result = async {
         if dir.exists() {
-            fs::remove_dir_all(&dir)
-                .await
-                .context("Unable to remove existing tools directory")?;
+            // Check if we need to update based on content hashes using common utilities
+            let need_update = match content::compare_directories(&temp_dir, &dir).await {
+                Ok(different) => {
+                    if different {
+                        debug!("Content differences detected, updating tools");
+                        true
+                    } else {
+                        info!("Tools directory content matches, skipping update");
+                        false
+                    }
+                }
+                Err(e) => {
+                    debug!("Error comparing directories, will reinstall: {}", e);
+                    true
+                }
+            };
+
+            if need_update {
+                fs::remove_dir_all(&dir)
+                    .await
+                    .context("Unable to remove existing tools directory")?;
+
+                fs::rename(&temp_dir, &dir).await.context(format!(
+                    "Unable to move temp directory to '{}'",
+                    dir.display()
+                ))?;
+                debug!("Successfully updated tools in '{}'", dir.display());
+            } else {
+                // If no update needed, clean up the temp directory
+                let _ = fs::remove_dir_all(&temp_dir).await;
+                debug!(
+                    "Tools in '{}' are up to date, no changes made",
+                    dir.display()
+                );
+            }
+        } else {
+            // Directory doesn't exist, just move the temp directory
+            fs::rename(&temp_dir, &dir).await.context(format!(
+                "Unable to move temp directory to '{}'",
+                dir.display()
+            ))?;
+            debug!("Successfully installed tools to '{}'", dir.display());
         }
-        fs::rename(&temp_dir, &dir).await.context(format!(
-            "Unable to move temp directory to '{}'",
-            dir.display()
-        ))?;
-        debug!("Successfully installed tools to '{}'", dir.display());
         Ok(())
     }
     .await;
@@ -149,7 +183,7 @@ async fn unpack_tarball(tools_dir: impl AsRef<Path>) -> Result<()> {
         "Unable to unpack tarball into directory '{}'",
         tools_dir.display()
     ))?;
-    debug!("Installed tools to '{}'", tools_dir.display());
+    debug!("Unpacked tarball to '{}'", tools_dir.display());
     Ok(())
 }
 
@@ -194,3 +228,112 @@ async fn test_install_tools() {
 
     assert_eq!(dockerfile_mtime, buildsys_mtime);
 }
+
+#[tokio::test]
+async fn test_content_based_installation() {
+    let tempdir = tempfile::TempDir::new().unwrap();
+    let toolsdir = tempdir.path().join("tools");
+
+    // First installation
+    install_tools(&toolsdir).await.unwrap();
+
+    // Get modification time of a file to check later
+    let test_file = toolsdir.join("Makefile.toml");
+    let original_metadata = test_file.metadata().unwrap();
+    let original_modified = original_metadata.modified().unwrap();
+
+    // Sleep to ensure potential timestamp difference would be detectable
+    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+
+    // Second installation - should skip updates due to identical content
+    install_tools(&toolsdir).await.unwrap();
+
+    // Check that file wasn't modified (timestamp should be the same)
+    let
new_metadata = test_file.metadata().unwrap(); + let new_modified = new_metadata.modified().unwrap(); + + assert_eq!( + original_modified, new_modified, + "File was replaced despite identical content" + ); +} + +#[tokio::test] +async fn test_content_update_when_different() { + let tempdir = tempfile::TempDir::new().unwrap(); + let toolsdir = tempdir.path().join("tools"); + + // First installation + install_tools(&toolsdir).await.unwrap(); + + // Get modification time of a file + let test_file = toolsdir.join("Makefile.toml"); + let _original_content = tokio::fs::read_to_string(&test_file).await.unwrap(); // Keep for test clarity + let original_metadata = test_file.metadata().unwrap(); + let original_modified = original_metadata.modified().unwrap(); + + // Sleep to ensure potential timestamp difference would be detectable + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + // Modify a file to force update on next installation + let modified_content = format!( + "modified content {}", + std::time::SystemTime::now().elapsed().unwrap().as_millis() + ); + tokio::fs::write(&test_file, &modified_content) + .await + .unwrap(); + + // Verify the file was actually modified + let intermediate_content = tokio::fs::read_to_string(&test_file).await.unwrap(); + assert_eq!( + intermediate_content, modified_content, + "Failed to modify file content for test" + ); + + // Sleep again to ensure timestamps would differ + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + // Second installation - should update due to content difference + install_tools(&toolsdir).await.unwrap(); + + // Check that file content was restored + let final_content = tokio::fs::read_to_string(&test_file).await.unwrap(); + assert_ne!( + final_content, modified_content, + "File content wasn't updated by reinstallation" + ); + + // Get the new metadata to check modification time + let new_metadata = test_file.metadata().unwrap(); + let new_modified = new_metadata.modified().unwrap(); + + // The file should have been replaced, so timestamps should differ + let timestamp_changed = original_modified != new_modified; + let content_changed = final_content != modified_content; + + // At least one of these conditions should be true + assert!( + timestamp_changed || content_changed, + "Neither file timestamp nor content was updated despite modified content" + ); +} + +#[tokio::test] +async fn test_concurrent_installation() { + let tempdir = tempfile::TempDir::new().unwrap(); + let toolsdir = tempdir.path().join("tools"); + + // Launch two concurrent installations + let install1 = install_tools(&toolsdir); + let install2 = install_tools(&toolsdir); + + // Both should complete without errors + let (result1, result2) = tokio::join!(install1, install2); + assert!(result1.is_ok(), "First installation failed"); + assert!(result2.is_ok(), "Second installation failed"); + + // Verify tools directory has expected files + assert!(toolsdir.join("Makefile.toml").is_file()); + assert!(toolsdir.join("buildsys").is_file()); +} From ec1d39b9899f59994a60f10f84c7a5c7dcd4ae75 Mon Sep 17 00:00:00 2001 From: "Mikael Knutsson (mikn)" Date: Mon, 10 Mar 2025 14:21:34 +0100 Subject: [PATCH 4/5] fix: another race in writing out cargo_metadata.json in Makefile.toml --- twoliter/embedded/Makefile.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/twoliter/embedded/Makefile.toml b/twoliter/embedded/Makefile.toml index 3962a457f..0e290c288 100644 --- a/twoliter/embedded/Makefile.toml +++ 
b/twoliter/embedded/Makefile.toml
@@ -837,13 +837,13 @@ else
   exit 1
 fi
 
-rm -f "${BUILDSYS_CARGO_METADATA_PATH}"
+temp_file=$(mktemp -p "$(dirname "${BUILDSYS_CARGO_METADATA_PATH}")" metadata.tmp.XXXXXX)
 cargo metadata \
   --format-version 1 \
   --manifest-path "${PROJECT_MANIFEST}" \
   --offline \
   --all-features \
-  > "${BUILDSYS_CARGO_METADATA_PATH}"
+  > "${temp_file}" && mv -f "${temp_file}" "${BUILDSYS_CARGO_METADATA_PATH}"
 '''
 ]

From 8b2ccbf145459d21248549bb8706382cff438d54 Mon Sep 17 00:00:00 2001
From: "Mikael Knutsson (mikn)"
Date: Tue, 13 May 2025 10:34:04 +0200
Subject: [PATCH 5/5] fix: add flock to the check-licenses task as well, to
 "solve" another race

---
 twoliter/embedded/Makefile.toml | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/twoliter/embedded/Makefile.toml b/twoliter/embedded/Makefile.toml
index 0e290c288..0e91001a4 100644
--- a/twoliter/embedded/Makefile.toml
+++ b/twoliter/embedded/Makefile.toml
@@ -998,6 +998,11 @@ run_cargo_deny="
 (cd /tmp/sources && cargo deny --all-features check --disable-fetch licenses bans sources)
 "
 set +e
+exec 9<>.cargo/vendor.lock
+if ! flock -w 90 9; then
+  echo "failed to obtain lock" >&2
+  exit 1
+fi
 docker run --rm \
   --network=none \
   --user "$(id -u):$(id -g)" \