From e9a020ea43b642f1b1276f1d84002e0ddc9b96ee Mon Sep 17 00:00:00 2001 From: Jules de Smit Date: Tue, 24 Aug 2021 15:44:33 +0200 Subject: [PATCH 1/8] Remove Storage abstraction, and remove files iteratively on round reset Fixes #360 --- .../src/commands/aggregation.rs | 8 +- .../src/commands/initialization.rs | 6 +- phase1-coordinator/src/coordinator.rs | 53 +++------ phase1-coordinator/src/coordinator_state.rs | 4 +- phase1-coordinator/src/environment.rs | 5 +- phase1-coordinator/src/main.rs | 5 +- phase1-coordinator/src/objects/round.rs | 107 ++++-------------- phase1-coordinator/src/storage/disk.rs | 75 ++++++++---- phase1-coordinator/src/storage/storage.rs | 48 +------- 9 files changed, 103 insertions(+), 208 deletions(-) diff --git a/phase1-coordinator/src/commands/aggregation.rs b/phase1-coordinator/src/commands/aggregation.rs index 912ecff2..760f0906 100644 --- a/phase1-coordinator/src/commands/aggregation.rs +++ b/phase1-coordinator/src/commands/aggregation.rs @@ -1,7 +1,7 @@ use crate::{ environment::Environment, objects::Round, - storage::{ContributionLocator, Locator, Object, ObjectReader, Storage}, + storage::{ContributionLocator, Disk, Locator, Object, ObjectReader, StorageLocator, StorageObject}, CoordinatorError, }; use phase1::{helpers::CurveKind, Phase1}; @@ -15,7 +15,7 @@ pub(crate) struct Aggregation; impl Aggregation { /// Runs aggregation for a given environment, storage, and round. #[inline] - pub(crate) fn run(environment: &Environment, storage: &mut impl Storage, round: &Round) -> anyhow::Result<()> { + pub(crate) fn run(environment: &Environment, storage: &mut Disk, round: &Round) -> anyhow::Result<()> { let start = Instant::now(); // Fetch the round height. @@ -95,7 +95,7 @@ impl Aggregation { #[inline] fn readers<'a>( environment: &Environment, - storage: &'a impl Storage, + storage: &'a Disk, round: &Round, ) -> anyhow::Result>> { let mut readers = vec![]; @@ -149,7 +149,7 @@ mod tests { authentication::Dummy, commands::{Aggregation, Seed, SigningKey, SEED_LENGTH}, objects::Task, - storage::{Locator, Storage}, + storage::Locator, testing::prelude::*, Coordinator, }; diff --git a/phase1-coordinator/src/commands/initialization.rs b/phase1-coordinator/src/commands/initialization.rs index e331ac31..02355f92 100644 --- a/phase1-coordinator/src/commands/initialization.rs +++ b/phase1-coordinator/src/commands/initialization.rs @@ -1,6 +1,6 @@ use crate::{ environment::Environment, - storage::{ContributionLocator, Locator, Object, Storage}, + storage::{ContributionLocator, Disk, Locator, Object, StorageObject}, CoordinatorError, }; use phase1::{helpers::CurveKind, Phase1, Phase1Parameters}; @@ -23,7 +23,7 @@ impl Initialization { #[inline] pub(crate) fn run( environment: &Environment, - storage: &mut impl Storage, + storage: &mut Disk, round_height: u64, chunk_id: u64, ) -> anyhow::Result> { @@ -100,7 +100,7 @@ impl Initialization { /// Compute both contribution hashes and check for equivalence. #[inline] fn check_hash( - storage: &impl Storage, + storage: &Disk, contribution_locator: &Locator, next_contribution_locator: &Locator, ) -> anyhow::Result> { diff --git a/phase1-coordinator/src/coordinator.rs b/phase1-coordinator/src/coordinator.rs index c6d9d26f..5cd49723 100644 --- a/phase1-coordinator/src/coordinator.rs +++ b/phase1-coordinator/src/coordinator.rs @@ -22,8 +22,9 @@ use crate::{ Locator, LocatorPath, Object, - Storage, StorageAction, + StorageLocator, + StorageObject, }, }; use setup_utils::calculate_hash; @@ -333,13 +334,13 @@ impl TimeSource for MockTimeSource { /// A core structure for operating the Phase 1 ceremony. This struct /// is designed to be [Send] + [Sync]. The state of the ceremony is /// stored in a [CoordinatorState] object. -pub struct Coordinator { +pub struct Coordinator { /// The parameters and settings of this coordinator. environment: Environment, /// The signature scheme for contributors & verifiers with this coordinator. signature: Arc, /// The storage of contributions and rounds for this coordinator. - storage: S, + storage: Disk, /// The current round and participant self. state: CoordinatorState, /// The source of time, allows mocking system time for testing. @@ -348,7 +349,7 @@ pub struct Coordinator { aggregation_callback: Arc) -> () + Send + Sync>, } -impl Coordinator { +impl Coordinator { /// /// Creates a new instance of the `Coordinator`, for a given environment. /// @@ -396,10 +397,7 @@ impl Coordinator { } } -impl Coordinator -where - S: Storage + 'static, -{ +impl Coordinator { /// /// Runs a set of operations to initialize state and start the coordinator. /// @@ -2200,7 +2198,7 @@ where } #[inline] - fn load_current_round_height(storage: &S) -> Result { + fn load_current_round_height(storage: &Disk) -> Result { // Fetch the current round height from storage. match storage.get(&Locator::RoundHeight)? { // Case 1 - This is a typical round of the ceremony. @@ -2211,7 +2209,7 @@ where } #[inline] - fn load_current_round(storage: &S) -> Result { + fn load_current_round(storage: &Disk) -> Result { // Fetch the current round height from storage. let current_round_height = Self::load_current_round_height(storage)?; @@ -2220,7 +2218,7 @@ where } #[inline] - fn load_round(storage: &S, round_height: u64) -> Result { + fn load_round(storage: &Disk, round_height: u64) -> Result { // Fetch the current round height from storage. let current_round_height = Self::load_current_round_height(storage)?; @@ -2244,7 +2242,7 @@ where /// #[cfg(test)] #[inline] - pub(super) fn storage(&self) -> &S { + pub(super) fn storage(&self) -> &Disk { &self.storage } @@ -2254,7 +2252,7 @@ where /// #[cfg(test)] #[inline] - pub(super) fn storage_mut(&mut self) -> &mut S { + pub(super) fn storage_mut(&mut self) -> &mut Disk { &mut self.storage } @@ -2300,26 +2298,10 @@ where let mut round = Self::load_round(&mut self.storage, current_round_height)?; tracing::debug!("Resetting round and applying storage changes"); - if let Some(error) = round - .reset(&reset_action.remove_participants) - .into_iter() - .map(|action| match &action { - StorageAction::Remove(a) => match a.clone().try_into_locator(&self.storage) { - Ok(locator) => { - if self.storage.exists(&locator) { - return self.storage.process(action); - } else { - Ok(()) - } - } - Err(e) => Err(e), - }, - _ => self.storage.process(action), - }) - .find_map(Result::err) - { - return Err(error); - } + self.storage.process(round.reset(&reset_action.remove_participants))?; + + // Clear all files + self.storage.clear_round_files(current_round_height)?; if reset_action.rollback { if current_round_height == 0 { @@ -2346,10 +2328,7 @@ where use crate::commands::{Computation, Seed, SigningKey, Verification}; #[cfg(any(test, feature = "operator"))] -impl Coordinator -where - S: Storage + 'static, -{ +impl Coordinator { #[tracing::instrument( skip(self, contributor, contributor_signing_key, contributor_seed), fields(contributor = %contributor), diff --git a/phase1-coordinator/src/coordinator_state.rs b/phase1-coordinator/src/coordinator_state.rs index 2d2e104a..8fff877e 100644 --- a/phase1-coordinator/src/coordinator_state.rs +++ b/phase1-coordinator/src/coordinator_state.rs @@ -4,7 +4,7 @@ use crate::{ participant::*, task::{initialize_tasks, Task}, }, - storage::{Locator, Object, Storage}, + storage::{Disk, Locator, Object}, CoordinatorError, TimeSource, }; @@ -3066,7 +3066,7 @@ impl CoordinatorState { /// Save the coordinator state in storage. #[inline] - pub(crate) fn save(&self, storage: &mut impl Storage) -> Result<(), CoordinatorError> { + pub(crate) fn save(&self, storage: &mut Disk) -> Result<(), CoordinatorError> { storage.update(&Locator::CoordinatorState, Object::CoordinatorState(self.clone())) } } diff --git a/phase1-coordinator/src/environment.rs b/phase1-coordinator/src/environment.rs index 0abc1409..0c418cc9 100644 --- a/phase1-coordinator/src/environment.rs +++ b/phase1-coordinator/src/environment.rs @@ -1,7 +1,4 @@ -use crate::{ - objects::Participant, - storage::{Disk, Storage}, -}; +use crate::{objects::Participant, storage::Disk}; use phase1::{helpers::CurveKind, ContributionMode, ProvingSystem}; use setup_utils::{CheckForCorrectness, UseCompression}; diff --git a/phase1-coordinator/src/main.rs b/phase1-coordinator/src/main.rs index 1dfa2fff..099127ff 100644 --- a/phase1-coordinator/src/main.rs +++ b/phase1-coordinator/src/main.rs @@ -10,7 +10,7 @@ use std::{sync::Arc, time::Duration}; use tokio::{sync::RwLock, task, time::sleep}; use tracing::*; -fn coordinator(environment: &Environment, signature: Arc) -> anyhow::Result> { +fn coordinator(environment: &Environment, signature: Arc) -> anyhow::Result { Ok(Coordinator::new(environment.clone(), signature)?) } @@ -29,8 +29,7 @@ pub async fn main() -> anyhow::Result<()> { // let environment: Environment = Production::from(Parameters::AleoInner).into(); // Instantiate the coordinator. - let coordinator: Arc>> = - Arc::new(RwLock::new(coordinator(&environment, Arc::new(Dummy))?)); + let coordinator: Arc> = Arc::new(RwLock::new(coordinator(&environment, Arc::new(Dummy))?)); let ceremony_coordinator = coordinator.clone(); // Initialize the coordinator. diff --git a/phase1-coordinator/src/objects/round.rs b/phase1-coordinator/src/objects/round.rs index 740c56dc..e763d278 100644 --- a/phase1-coordinator/src/objects/round.rs +++ b/phase1-coordinator/src/objects/round.rs @@ -4,12 +4,13 @@ use crate::{ storage::{ ContributionLocator, ContributionSignatureLocator, + Disk, Locator, LocatorPath, Object, RemoveAction, - Storage, StorageAction, + StorageLocator, UpdateAction, }, CoordinatorError, @@ -87,7 +88,7 @@ impl Round { #[inline] pub(crate) fn new( environment: &Environment, - storage: &mut impl Storage, + storage: &mut Disk, round_height: u64, started_at: DateTime, contributor_ids: Vec, @@ -343,7 +344,7 @@ impl Round { )] pub(crate) fn current_contribution_locator( &self, - storage: &impl Storage, + storage: &Disk, chunk_id: u64, verified: bool, ) -> Result { @@ -404,7 +405,7 @@ impl Round { )] pub(crate) fn next_contribution_locator( &self, - storage: &impl Storage, + storage: &Disk, chunk_id: u64, ) -> Result { // Fetch the current round height. @@ -460,7 +461,7 @@ impl Round { #[inline] pub(crate) fn next_contribution_file_signature_locator( &self, - storage: &impl Storage, + storage: &Disk, chunk_id: u64, ) -> Result { // Fetch the current round height. @@ -511,7 +512,7 @@ impl Round { pub(crate) fn try_lock_chunk( &mut self, environment: &Environment, - storage: &mut impl Storage, + storage: &mut Disk, chunk_id: u64, participant: &Participant, ) -> Result { @@ -785,7 +786,7 @@ impl Round { /// Remove a contributor from the round. pub(crate) fn remove_contributor_unsafe( &mut self, - storage: &mut impl Storage, + storage: &mut Disk, contributor: &Participant, locked_chunks: &[u64], tasks: &[Task], @@ -822,7 +823,7 @@ impl Round { #[inline] pub(crate) fn remove_locks_unsafe( &mut self, - storage: &mut impl Storage, + storage: &mut Disk, participant: &Participant, locked_chunks: &[u64], ) -> Result<(), CoordinatorError> { @@ -996,7 +997,7 @@ impl Round { )] pub(crate) fn remove_chunk_contributions_unsafe( &mut self, - storage: &mut impl Storage, + storage: &mut Disk, participant: &Participant, tasks: &[Task], ) -> Result<(), CoordinatorError> { @@ -1120,82 +1121,14 @@ impl Round { /// the [crate::storage::Storage] to reflect the changes to the /// round state. `remove_participants` is a list of participants /// to remove from the round. - pub(crate) fn reset(&mut self, remove_participants: &[Participant]) -> Vec { - let expected_number_of_contributions = self.expected_number_of_contributions(); - let round_height = self.round_height(); - - let mut actions: Vec = - self.chunks - .iter_mut() - .flat_map(|chunk| { - let chunk_id = chunk.chunk_id(); - - let contributions_remove: Vec<(u64, Vec)> = chunk.get_contributions() - .iter() - .filter(|(id, _)| **id != 0) // don't remove initial challenge - .map(|(id, contribution)| { - let actions: Vec = contribution.get_locators() - .into_iter() - .map(|path| RemoveAction::new(path)) - .collect(); - (*id, actions) - }) - .collect(); - - // Remove files that were initialized when the lock was taken, - // but have not yet had the contirbution/verification uploaded. - let remove_initialized_files: Vec = match chunk.lock_holder() { - Some(participant) => { - let (adjusted_round_height, contribution_id, is_verified) = match participant { - Participant::Contributor(_) => { - (round_height, chunk.current_contribution_id() + 1, false) - } - Participant::Verifier(_) => { - let (adjusted_round_height, contribution_id) = - if chunk.current_contribution_id() == expected_number_of_contributions - 1 { - // handle the case where the final verification becomes - // the first verification for the next round. - (round_height + 1, 0) - } else { - (round_height, chunk.current_contribution_id()) - }; - (adjusted_round_height, contribution_id, true) - } - }; - - let contribution_locator = Locator::ContributionFile(ContributionLocator::new( - adjusted_round_height, - chunk_id, - contribution_id, - is_verified, - )); - let signature_locator = Locator::ContributionFileSignature( - ContributionSignatureLocator::new(round_height, chunk_id, contribution_id, is_verified), - ); - - vec![ - RemoveAction::new(contribution_locator), - RemoveAction::new(signature_locator), - ] - } - None => Vec::new(), - }; - - chunk.set_lock_holder_unsafe(None); - - let actions: Vec = contributions_remove - .into_iter() - .flat_map(|(contribution_id, actions)| { - chunk.remove_contribution_unsafe(contribution_id); - actions.into_iter() - }) - .map(StorageAction::Remove) - .chain(remove_initialized_files.into_iter().map(StorageAction::Remove)) - .collect(); + pub(crate) fn reset(&mut self, remove_participants: &[Participant]) -> StorageAction { + self.chunks.iter_mut().for_each(|chunk| { + chunk.set_lock_holder_unsafe(None); - actions.into_iter() - }) - .collect(); + for (id, _) in chunk.clone().get_contributions() { + chunk.remove_contribution_unsafe(*id); + } + }); // Remove the requested participants from the set of contributor IDs. self.contributor_ids = self @@ -1213,14 +1146,12 @@ impl Round { .filter(|v| remove_participants.iter().find(|p| p == &v).is_none()) .collect(); - actions.push(StorageAction::Update(UpdateAction { + StorageAction::Update(UpdateAction { locator: Locator::RoundState { round_height: self.height, }, object: Object::RoundState(self.clone()), // PERFORMANCE: clone here is not great for performance - })); - - actions + }) } } diff --git a/phase1-coordinator/src/storage/disk.rs b/phase1-coordinator/src/storage/disk.rs index aa9eb96c..2aa647d9 100644 --- a/phase1-coordinator/src/storage/disk.rs +++ b/phase1-coordinator/src/storage/disk.rs @@ -8,7 +8,6 @@ use crate::{ Object, ObjectReader, ObjectWriter, - Storage, StorageLocator, StorageObject, }, @@ -16,6 +15,7 @@ use crate::{ CoordinatorState, }; +use anyhow::Result; use itertools::Itertools; use memmap::{MmapMut, MmapOptions}; use rayon::prelude::*; @@ -24,8 +24,8 @@ use std::{ collections::{BTreeSet, HashMap, HashSet}, convert::TryFrom, fs::{self, File, OpenOptions}, - io::Write, - path::Path, + io::{self, Error, ErrorKind, Write}, + path::{Path, PathBuf}, str::FromStr, sync::{Arc, RwLock}, }; @@ -41,10 +41,44 @@ pub struct Disk { resolver: DiskResolver, } -impl Storage for Disk { +impl Disk { + pub fn clear_round_files(&self, current_round_height: u64) -> Result<()> { + // Let's first fully clear any files in the next round - these will be + // verifications and represent the initial challenges. + let next_round_dir = self.resolver.round_directory(current_round_height + 1); + clear_dir_files(next_round_dir.into(), &fs::remove_file)?; + + // Now, let's clear all the contributions made on this round. + let round_dir = self.resolver.round_directory(current_round_height); + clear_dir_files(round_dir.into(), &remove_round_contribution) + } +} + +fn remove_round_contribution(path: PathBuf) -> io::Result<()> { + let file_name = path + .to_str() + .ok_or(Error::new(ErrorKind::Other, "filepath is not UTF-8 encoded"))? + .to_owned(); + + match file_name.contains("contribution_0") { + true => Ok(()), + false => fs::remove_file(path), + } +} + +fn clear_dir_files(path: PathBuf, remove_file: &dyn Fn(PathBuf) -> io::Result<()>) -> Result<()> { + Ok(for entry in fs::read_dir(path.as_path())? { + let entry = entry?; + match entry.path().is_dir() { + true => clear_dir_files(entry.path(), remove_file)?, + false => remove_file(entry.path())?, + } + }) +} + +impl Disk { /// Loads a new instance of `Disk`. - #[inline] - fn load(environment: &Environment) -> Result + pub fn load(environment: &Environment) -> Result where Self: Sized, { @@ -94,8 +128,7 @@ impl Storage for Disk { } /// Initializes the location corresponding to the given locator. - #[inline] - fn initialize(&mut self, locator: Locator, size: u64) -> Result<(), CoordinatorError> { + pub fn initialize(&mut self, locator: Locator, size: u64) -> Result<(), CoordinatorError> { let locator_path = self.to_path(&locator)?; trace!("Initializing {:?}", locator_path); @@ -128,8 +161,7 @@ impl Storage for Disk { } /// Returns `true` if a given locator exists in storage. Otherwise, returns `false`. - #[inline] - fn exists(&self, locator: &Locator) -> bool { + pub fn exists(&self, locator: &Locator) -> bool { let is_in_manifest = self.manifest.read().unwrap().contains(locator); #[cfg(test)] trace!("Checking if locator exists in storage (manifest = {})", is_in_manifest,); @@ -137,8 +169,7 @@ impl Storage for Disk { } /// Returns `true` if a given locator is opened in storage. Otherwise, returns `false`. - #[inline] - fn is_open(&self, locator: &Locator) -> bool { + pub fn is_open(&self, locator: &Locator) -> bool { let is_in_manifest = self.manifest.read().unwrap().contains(locator); let is_in_locators = self.open.contains_key(locator); #[cfg(test)] @@ -151,8 +182,7 @@ impl Storage for Disk { } /// Returns a copy of an object at the given locator in storage, if it exists. - #[inline] - fn get(&self, locator: &Locator) -> Result { + pub fn get(&self, locator: &Locator) -> Result { trace!("Fetching {}", self.to_path(locator)?); // Check that the given locator exists in storage. @@ -254,8 +284,7 @@ impl Storage for Disk { } /// Inserts a new object at the given locator into storage, if it does not exist. - #[inline] - fn insert(&mut self, locator: Locator, object: Object) -> Result<(), CoordinatorError> { + pub fn insert(&mut self, locator: Locator, object: Object) -> Result<(), CoordinatorError> { trace!("Inserting {}", self.to_path(&locator)?); // Check that the given locator does not exist in storage. @@ -281,8 +310,7 @@ impl Storage for Disk { } /// Updates an existing object for the given locator in storage, if it exists. - #[inline] - fn update(&mut self, locator: &Locator, object: Object) -> Result<(), CoordinatorError> { + pub fn update(&mut self, locator: &Locator, object: Object) -> Result<(), CoordinatorError> { trace!("Updating {}", self.to_path(locator)?); // Check that the given locator exists in storage. @@ -325,8 +353,7 @@ impl Storage for Disk { } /// Copies an object from the given source locator to the given destination locator. - #[inline] - fn copy(&mut self, source_locator: &Locator, destination_locator: &Locator) -> Result<(), CoordinatorError> { + pub fn copy(&mut self, source_locator: &Locator, destination_locator: &Locator) -> Result<(), CoordinatorError> { trace!( "Copying from A to B\n\n\tA: {}\n\tB: {}\n", self.to_path(source_locator)?, @@ -359,8 +386,7 @@ impl Storage for Disk { } /// Removes the object corresponding to the given locator from storage. - #[inline] - fn remove(&mut self, locator: &Locator) -> Result<(), CoordinatorError> { + pub fn remove(&mut self, locator: &Locator) -> Result<(), CoordinatorError> { trace!("Removing {}", self.to_path(locator)?); // Check that the locator exists in storage. @@ -394,8 +420,7 @@ impl Storage for Disk { } /// Returns the size of the object stored at the given locator. - #[inline] - fn size(&self, locator: &Locator) -> Result { + pub fn size(&self, locator: &Locator) -> Result { trace!("Fetching size of {}", self.to_path(locator)?); // Check that the given locator exists in storage. @@ -414,7 +439,7 @@ impl Storage for Disk { Ok(size) } - fn process(&mut self, action: StorageAction) -> Result<(), CoordinatorError> { + pub fn process(&mut self, action: StorageAction) -> Result<(), CoordinatorError> { match action { StorageAction::Remove(remove_action) => { let locator = remove_action.try_into_locator(self)?; diff --git a/phase1-coordinator/src/storage/storage.rs b/phase1-coordinator/src/storage/storage.rs index 472e311a..ef180b3c 100644 --- a/phase1-coordinator/src/storage/storage.rs +++ b/phase1-coordinator/src/storage/storage.rs @@ -15,6 +15,8 @@ use std::{ sync::{RwLockReadGuard, RwLockWriteGuard}, }; +use super::Disk; + #[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, Serialize, Deserialize)] pub struct ContributionLocator { round_height: u64, @@ -193,44 +195,6 @@ impl Object { pub type ObjectReader<'a> = RwLockReadGuard<'a, MmapMut>; pub type ObjectWriter<'a> = RwLockWriteGuard<'a, MmapMut>; -/// A standard model for storage. -pub trait Storage: Send + Sync + StorageLocator + StorageObject { - /// Loads a new instance of `Storage`. - fn load(environment: &Environment) -> Result - where - Self: Sized; - - /// Initializes the location corresponding to the given locator with the given size. - fn initialize(&mut self, locator: Locator, size: u64) -> Result<(), CoordinatorError>; - - /// Returns `true` if a given locator exists in storage. Otherwise, returns `false`. - fn exists(&self, locator: &Locator) -> bool; - - /// Returns `true` if a given locator is opened in storage. Otherwise, returns `false`. - fn is_open(&self, locator: &Locator) -> bool; - - /// Returns a copy of an object at the given locator in storage, if it exists. - fn get(&self, locator: &Locator) -> Result; - - /// Inserts a new object at the given locator into storage, if it does not exist. - fn insert(&mut self, locator: Locator, object: Object) -> Result<(), CoordinatorError>; - - /// Updates an existing object for the given locator in storage, if it exists. - fn update(&mut self, locator: &Locator, object: Object) -> Result<(), CoordinatorError>; - - /// Copies the object in the given source locator to the given destination locator. - fn copy(&mut self, source_locator: &Locator, destination_locator: &Locator) -> Result<(), CoordinatorError>; - - /// Removes a object from storage for a given locator. - fn remove(&mut self, locator: &Locator) -> Result<(), CoordinatorError>; - - /// Returns the size of the object stored at the given locator. - fn size(&self, locator: &Locator) -> Result; - - /// Process a [StorageAction] which mutates the storage. - fn process(&mut self, action: StorageAction) -> Result<(), CoordinatorError>; -} - /// The path to a resource defined by a [Locator]. #[derive(Clone, Serialize, Deserialize, Eq, PartialEq, Hash, PartialOrd, Ord)] pub struct LocatorPath(String); @@ -296,14 +260,14 @@ pub enum LocatorOrPath { } impl LocatorOrPath { - pub fn try_into_locator(self, storage: &impl Storage) -> Result { + pub fn try_into_locator(self, storage: &Disk) -> Result { match self { LocatorOrPath::Path(path) => storage.to_locator(&path), LocatorOrPath::Locator(locator) => Ok(locator), } } - pub fn try_into_path(self, storage: &impl Storage) -> Result { + pub fn try_into_path(self, storage: &Disk) -> Result { match self { LocatorOrPath::Path(path) => Ok(path), LocatorOrPath::Locator(locator) => storage.to_path(&locator), @@ -345,11 +309,11 @@ impl RemoveAction { /// Obtain the location of the item to be removed from [Storage] /// as a [Locator]. - pub fn try_into_locator(self, storage: &impl Storage) -> Result { + pub fn try_into_locator(self, storage: &Disk) -> Result { self.locator_or_path.try_into_locator(storage) } - pub fn try_into_path(self, storage: &impl Storage) -> Result { + pub fn try_into_path(self, storage: &Disk) -> Result { self.locator_or_path.try_into_path(storage) } } From b23e156c9cb9f4eb784c3d670baba2851d6d375d Mon Sep 17 00:00:00 2001 From: Jules de Smit Date: Tue, 24 Aug 2021 16:16:32 +0200 Subject: [PATCH 2/8] Use proper deletion method, simplify deletion logic --- .../src/commands/computation.rs | 6 +-- phase1-coordinator/src/commands/mod.rs | 4 +- .../src/commands/verification.rs | 16 +++++-- phase1-coordinator/src/main.rs | 1 - phase1-coordinator/src/storage/disk.rs | 45 ++++++++++--------- phase1-coordinator/src/testing/coordinator.rs | 6 +-- 6 files changed, 43 insertions(+), 35 deletions(-) diff --git a/phase1-coordinator/src/commands/computation.rs b/phase1-coordinator/src/commands/computation.rs index f9af608f..f152909a 100644 --- a/phase1-coordinator/src/commands/computation.rs +++ b/phase1-coordinator/src/commands/computation.rs @@ -2,7 +2,7 @@ use crate::{ authentication::Signature, commands::SigningKey, environment::Environment, - storage::{Locator, Storage}, + storage::{Disk, Locator, StorageLocator, StorageObject}, CoordinatorError, }; use phase1::{helpers::CurveKind, Phase1, Phase1Parameters}; @@ -30,7 +30,7 @@ impl Computation { /// pub(crate) fn run( environment: &Environment, - storage: &mut impl Storage, + storage: &mut Disk, signature: Arc, contributor_signing_key: &SigningKey, challenge_locator: &Locator, @@ -175,7 +175,7 @@ mod tests { use crate::{ authentication::{Dummy, Signature}, commands::{Computation, Initialization, Seed, SEED_LENGTH}, - storage::{ContributionLocator, ContributionSignatureLocator, Locator, Object, Storage, StorageObject}, + storage::{ContributionLocator, ContributionSignatureLocator, Locator, Object, StorageObject}, testing::prelude::*, }; use setup_utils::calculate_hash; diff --git a/phase1-coordinator/src/commands/mod.rs b/phase1-coordinator/src/commands/mod.rs index f0aebb7e..99e0f20e 100644 --- a/phase1-coordinator/src/commands/mod.rs +++ b/phase1-coordinator/src/commands/mod.rs @@ -18,7 +18,7 @@ pub(crate) use verification::*; use crate::{ authentication::Signature, objects::{ContributionFileSignature, ContributionState}, - storage::{Locator, Storage}, + storage::{Disk, Locator, StorageLocator, StorageObject}, CoordinatorError, }; @@ -47,7 +47,7 @@ pub type SigningKey = String; #[cfg(any(test, feature = "operator"))] #[inline] pub(crate) fn write_contribution_file_signature( - storage: &mut impl Storage, + storage: &mut Disk, signature: Arc, signing_key: &SigningKey, challenge_locator: &Locator, diff --git a/phase1-coordinator/src/commands/verification.rs b/phase1-coordinator/src/commands/verification.rs index bca69193..06240a7b 100644 --- a/phase1-coordinator/src/commands/verification.rs +++ b/phase1-coordinator/src/commands/verification.rs @@ -2,7 +2,15 @@ use crate::{ authentication::Signature, commands::SigningKey, environment::Environment, - storage::{ContributionLocator, ContributionSignatureLocator, Locator, Object, Storage}, + storage::{ + ContributionLocator, + ContributionSignatureLocator, + Disk, + Locator, + Object, + StorageLocator, + StorageObject, + }, CoordinatorError, }; use phase1::{helpers::CurveKind, Phase1, Phase1Parameters, PublicKey}; @@ -23,7 +31,7 @@ impl Verification { #[inline] pub(crate) fn run( environment: &Environment, - storage: &mut impl Storage, + storage: &mut Disk, signature: Arc, signing_key: &SigningKey, round_height: u64, @@ -148,7 +156,7 @@ impl Verification { #[inline] fn verification( environment: &Environment, - storage: &mut impl Storage, + storage: &mut Disk, chunk_id: u64, challenge_locator: Locator, response_locator: Locator, @@ -341,7 +349,7 @@ mod tests { use crate::{ authentication::Dummy, commands::{Computation, Seed, Verification, SEED_LENGTH}, - storage::{ContributionLocator, ContributionSignatureLocator, Locator, Object, Storage}, + storage::{ContributionLocator, ContributionSignatureLocator, Locator, Object}, testing::prelude::*, Coordinator, }; diff --git a/phase1-coordinator/src/main.rs b/phase1-coordinator/src/main.rs index 099127ff..3df2150e 100644 --- a/phase1-coordinator/src/main.rs +++ b/phase1-coordinator/src/main.rs @@ -1,7 +1,6 @@ use phase1_coordinator::{ authentication::{Dummy, Signature}, environment::{Development, Environment, Parameters}, - storage::Disk, Coordinator, }; use tracing_subscriber; diff --git a/phase1-coordinator/src/storage/disk.rs b/phase1-coordinator/src/storage/disk.rs index 2aa647d9..c890c94b 100644 --- a/phase1-coordinator/src/storage/disk.rs +++ b/phase1-coordinator/src/storage/disk.rs @@ -42,40 +42,41 @@ pub struct Disk { } impl Disk { - pub fn clear_round_files(&self, current_round_height: u64) -> Result<()> { + pub fn clear_round_files(&mut self, current_round_height: u64) -> Result<()> { // Let's first fully clear any files in the next round - these will be // verifications and represent the initial challenges. let next_round_dir = self.resolver.round_directory(current_round_height + 1); - clear_dir_files(next_round_dir.into(), &fs::remove_file)?; + self.clear_dir_files(next_round_dir.into(), true)?; // Now, let's clear all the contributions made on this round. let round_dir = self.resolver.round_directory(current_round_height); - clear_dir_files(round_dir.into(), &remove_round_contribution) + self.clear_dir_files(round_dir.into(), false) } -} -fn remove_round_contribution(path: PathBuf) -> io::Result<()> { - let file_name = path - .to_str() - .ok_or(Error::new(ErrorKind::Other, "filepath is not UTF-8 encoded"))? - .to_owned(); + fn clear_dir_files(&mut self, path: PathBuf, delete_initial_contribution: bool) -> Result<()> { + Ok(for entry in fs::read_dir(path.as_path())? { + let entry = entry?; + match entry.path().is_dir() { + true => self.clear_dir_files(entry.path(), delete_initial_contribution)?, + false => { + let file_path = entry + .path() + .to_str() + .ok_or(Error::new(ErrorKind::Other, "filepath is not UTF-8 encoded"))? + .to_owned(); + + if !delete_initial_contribution && file_path.contains("contribution_0") { + continue; + } - match file_name.contains("contribution_0") { - true => Ok(()), - false => fs::remove_file(path), + let locator = self.resolver.to_locator(&LocatorPath::new(file_path))?; + self.remove(&locator)?; + } + }; + }) } } -fn clear_dir_files(path: PathBuf, remove_file: &dyn Fn(PathBuf) -> io::Result<()>) -> Result<()> { - Ok(for entry in fs::read_dir(path.as_path())? { - let entry = entry?; - match entry.path().is_dir() { - true => clear_dir_files(entry.path(), remove_file)?, - false => remove_file(entry.path())?, - } - }) -} - impl Disk { /// Loads a new instance of `Disk`. pub fn load(environment: &Environment) -> Result diff --git a/phase1-coordinator/src/testing/coordinator.rs b/phase1-coordinator/src/testing/coordinator.rs index 7e72a475..3570a7d6 100644 --- a/phase1-coordinator/src/testing/coordinator.rs +++ b/phase1-coordinator/src/testing/coordinator.rs @@ -2,7 +2,7 @@ use crate::{ authentication::Dummy, environment::{Environment, Parameters, Testing}, objects::{Participant, Round}, - storage::{Disk, Storage}, + storage::Disk, Coordinator, CoordinatorError, }; @@ -57,7 +57,7 @@ pub static TEST_CONTRIBUTOR_IDS: Lazy> = Lazy::new(|| vec![Lazy /// Verifier IDs for testing purposes only. pub static TEST_VERIFIER_IDS: Lazy> = Lazy::new(|| vec![Lazy::force(&TEST_VERIFIER_ID).clone()]); -pub fn test_coordinator(environment: &Environment) -> anyhow::Result> { +pub fn test_coordinator(environment: &Environment) -> anyhow::Result { info!("Starting coordinator"); let coordinator = Coordinator::new(environment.clone(), Arc::new(Dummy))?; info!("Coordinator is ready"); @@ -110,7 +110,7 @@ fn clear_test_storage(environment: &Environment) { } /// Initializes a test storage object. -pub fn test_storage(environment: &Environment) -> impl Storage { +pub fn test_storage(environment: &Environment) -> Disk { environment.storage().unwrap() } From 0497bad328e8df70c0e44c68dbf84519f538720c Mon Sep 17 00:00:00 2001 From: Jules de Smit Date: Tue, 24 Aug 2021 16:34:25 +0200 Subject: [PATCH 3/8] Fix tests --- phase1-coordinator/src/coordinator.rs | 77 ++++++++++--------------- phase1-coordinator/src/objects/round.rs | 8 +-- phase1-coordinator/src/tests.rs | 14 ++--- 3 files changed, 39 insertions(+), 60 deletions(-) diff --git a/phase1-coordinator/src/coordinator.rs b/phase1-coordinator/src/coordinator.rs index 5cd49723..cfc55954 100644 --- a/phase1-coordinator/src/coordinator.rs +++ b/phase1-coordinator/src/coordinator.rs @@ -6,25 +6,14 @@ use crate::{ authentication::Signature, commands::{Aggregation, Initialization}, coordinator_state::{ - CeremonyStorageAction, - CoordinatorState, - DropParticipant, - ParticipantInfo, - ResetCurrentRoundStorageAction, + CeremonyStorageAction, CoordinatorState, DropParticipant, ParticipantInfo, ResetCurrentRoundStorageAction, RoundMetrics, }, environment::{Deployment, Environment}, objects::{participant::*, task::TaskInitializationError, ContributionFileSignature, LockedLocators, Round, Task}, storage::{ - ContributionLocator, - ContributionSignatureLocator, - Disk, - Locator, - LocatorPath, - Object, - StorageAction, - StorageLocator, - StorageObject, + ContributionLocator, ContributionSignatureLocator, Disk, Locator, LocatorPath, Object, StorageAction, + StorageLocator, StorageObject, }, }; use setup_utils::calculate_hash; @@ -2682,7 +2671,7 @@ mod tests { Ok(()) } - fn initialize_coordinator(coordinator: &mut Coordinator) -> anyhow::Result<()> { + fn initialize_coordinator(coordinator: &mut Coordinator) -> anyhow::Result<()> { // Load the contributors and verifiers. let contributors = vec![ Lazy::force(&TEST_CONTRIBUTOR_ID).clone(), @@ -2692,7 +2681,7 @@ mod tests { initialize_to_round_1(coordinator, &contributors) } - fn initialize_coordinator_single_contributor(coordinator: &mut Coordinator) -> anyhow::Result<()> { + fn initialize_coordinator_single_contributor(coordinator: &mut Coordinator) -> anyhow::Result<()> { // Load the contributors and verifiers. let contributors = vec![Lazy::force(&TEST_CONTRIBUTOR_ID).clone()]; @@ -2884,18 +2873,16 @@ mod tests { // Run the computation let mut seed: Seed = [0; SEED_LENGTH]; rand::thread_rng().fill_bytes(&mut seed[..]); - assert!( - coordinator - .run_computation( - round_height, - chunk_id, - contribution_id, - &contributor, - &contributor_signing_key, - &seed - ) - .is_ok() - ); + assert!(coordinator + .run_computation( + round_height, + chunk_id, + contribution_id, + &contributor, + &contributor_signing_key, + &seed + ) + .is_ok()); } // Add contribution for round 1 chunk 0 contribution 1. @@ -2940,18 +2927,16 @@ mod tests { // Run computation on round 1 chunk 0 contribution 1. let mut seed: Seed = [0; SEED_LENGTH]; rand::thread_rng().fill_bytes(&mut seed[..]); - assert!( - coordinator - .run_computation( - round_height, - chunk_id, - contribution_id, - contributor, - &contributor_signing_key, - &seed - ) - .is_ok() - ); + assert!(coordinator + .run_computation( + round_height, + chunk_id, + contribution_id, + contributor, + &contributor_signing_key, + &seed + ) + .is_ok()); // Add round 1 chunk 0 contribution 1. assert!(coordinator.add_contribution(chunk_id, &contributor).is_ok()); @@ -3249,13 +3234,11 @@ mod tests { let mut seeds = HashMap::new(); for chunk_id in 0..TEST_ENVIRONMENT_3.number_of_chunks() { // Ensure contribution ID 0 is already verified by the coordinator. - assert!( - coordinator - .current_round()? - .chunk(chunk_id)? - .get_contribution(0)? - .is_verified() - ); + assert!(coordinator + .current_round()? + .chunk(chunk_id)? + .get_contribution(0)? + .is_verified()); // As contribution ID 0 is initialized by the coordinator, iterate from // contribution ID 1 up to the expected number of contributions. diff --git a/phase1-coordinator/src/objects/round.rs b/phase1-coordinator/src/objects/round.rs index e763d278..8d05e50c 100644 --- a/phase1-coordinator/src/objects/round.rs +++ b/phase1-coordinator/src/objects/round.rs @@ -1126,7 +1126,9 @@ impl Round { chunk.set_lock_holder_unsafe(None); for (id, _) in chunk.clone().get_contributions() { - chunk.remove_contribution_unsafe(*id); + if *id != 0 { + chunk.remove_contribution_unsafe(*id); + } } }); @@ -1213,10 +1215,8 @@ mod tests { let n_verifications = 30; let n_locked_chunks = 1; let n_files = 2 * n_contributions + 2 * n_verifications + 2 * n_locked_chunks; - let n_actions = n_files + 1; // include action to update round - let actions = round_1.reset(&[TEST_CONTRIBUTOR_ID_2.clone()]); - assert_eq!(n_actions, actions.len()); + let action = round_1.reset(&[TEST_CONTRIBUTOR_ID_2.clone()]); assert_eq!(64, round_1.chunks().len()); diff --git a/phase1-coordinator/src/tests.rs b/phase1-coordinator/src/tests.rs index 69c7f01e..f0f1508a 100644 --- a/phase1-coordinator/src/tests.rs +++ b/phase1-coordinator/src/tests.rs @@ -3,13 +3,9 @@ use crate::{ commands::{Seed, SigningKey, SEED_LENGTH}, environment::{Environment, Parameters, Settings, Testing}, objects::Task, - storage::{Disk, Storage}, + storage::{Disk, StorageLocator, StorageObject}, testing::prelude::*, - Coordinator, - CoordinatorError, - MockTimeSource, - Participant, - Round, + Coordinator, CoordinatorError, MockTimeSource, Participant, Round, }; use chrono::Utc; use phase1::{helpers::CurveKind, ContributionMode, ProvingSystem}; @@ -53,7 +49,7 @@ struct ContributorTestDetails { } impl ContributorTestDetails { - fn contribute_to(&self, coordinator: &mut Coordinator) -> Result<(), CoordinatorError> { + fn contribute_to(&self, coordinator: &mut Coordinator) -> Result<(), CoordinatorError> { coordinator.contribute(&self.participant, &self.signing_key, &self.seed) } } @@ -1275,7 +1271,7 @@ fn coordinator_drop_several_contributors() { fn contribute_verify_until_no_tasks( contributor: &ContributorTestDetails, verifier: &VerifierTestDetails, - coordinator: &mut Coordinator, + coordinator: &mut Coordinator, ) -> anyhow::Result { match contributor.contribute_to(coordinator) { Err(CoordinatorError::ParticipantHasNoRemainingTasks) => Ok(true), @@ -1323,7 +1319,7 @@ fn coordinator_drop_several_contributors() { assert_eq!(0, coordinator.number_of_queue_contributors()); } -fn check_round_matches_storage_files(storage: &impl Storage, round: &Round) { +fn check_round_matches_storage_files(storage: &Disk, round: &Round) { debug!("Checking round {}", round.round_height()); for chunk in round.chunks() { debug!("Checking chunk {}", chunk.chunk_id()); From dd3933ce44f38041d2bf66390076e28dca589c68 Mon Sep 17 00:00:00 2001 From: Jules de Smit Date: Wed, 8 Sep 2021 18:09:19 +0200 Subject: [PATCH 4/8] Fix fmt --- phase1-coordinator/src/coordinator.rs | 73 +++++++++++++++++---------- phase1-coordinator/src/tests.rs | 6 ++- 2 files changed, 50 insertions(+), 29 deletions(-) diff --git a/phase1-coordinator/src/coordinator.rs b/phase1-coordinator/src/coordinator.rs index cfc55954..e6fe21ae 100644 --- a/phase1-coordinator/src/coordinator.rs +++ b/phase1-coordinator/src/coordinator.rs @@ -6,14 +6,25 @@ use crate::{ authentication::Signature, commands::{Aggregation, Initialization}, coordinator_state::{ - CeremonyStorageAction, CoordinatorState, DropParticipant, ParticipantInfo, ResetCurrentRoundStorageAction, + CeremonyStorageAction, + CoordinatorState, + DropParticipant, + ParticipantInfo, + ResetCurrentRoundStorageAction, RoundMetrics, }, environment::{Deployment, Environment}, objects::{participant::*, task::TaskInitializationError, ContributionFileSignature, LockedLocators, Round, Task}, storage::{ - ContributionLocator, ContributionSignatureLocator, Disk, Locator, LocatorPath, Object, StorageAction, - StorageLocator, StorageObject, + ContributionLocator, + ContributionSignatureLocator, + Disk, + Locator, + LocatorPath, + Object, + StorageAction, + StorageLocator, + StorageObject, }, }; use setup_utils::calculate_hash; @@ -2873,16 +2884,18 @@ mod tests { // Run the computation let mut seed: Seed = [0; SEED_LENGTH]; rand::thread_rng().fill_bytes(&mut seed[..]); - assert!(coordinator - .run_computation( - round_height, - chunk_id, - contribution_id, - &contributor, - &contributor_signing_key, - &seed - ) - .is_ok()); + assert!( + coordinator + .run_computation( + round_height, + chunk_id, + contribution_id, + &contributor, + &contributor_signing_key, + &seed + ) + .is_ok() + ); } // Add contribution for round 1 chunk 0 contribution 1. @@ -2927,16 +2940,18 @@ mod tests { // Run computation on round 1 chunk 0 contribution 1. let mut seed: Seed = [0; SEED_LENGTH]; rand::thread_rng().fill_bytes(&mut seed[..]); - assert!(coordinator - .run_computation( - round_height, - chunk_id, - contribution_id, - contributor, - &contributor_signing_key, - &seed - ) - .is_ok()); + assert!( + coordinator + .run_computation( + round_height, + chunk_id, + contribution_id, + contributor, + &contributor_signing_key, + &seed + ) + .is_ok() + ); // Add round 1 chunk 0 contribution 1. assert!(coordinator.add_contribution(chunk_id, &contributor).is_ok()); @@ -3234,11 +3249,13 @@ mod tests { let mut seeds = HashMap::new(); for chunk_id in 0..TEST_ENVIRONMENT_3.number_of_chunks() { // Ensure contribution ID 0 is already verified by the coordinator. - assert!(coordinator - .current_round()? - .chunk(chunk_id)? - .get_contribution(0)? - .is_verified()); + assert!( + coordinator + .current_round()? + .chunk(chunk_id)? + .get_contribution(0)? + .is_verified() + ); // As contribution ID 0 is initialized by the coordinator, iterate from // contribution ID 1 up to the expected number of contributions. diff --git a/phase1-coordinator/src/tests.rs b/phase1-coordinator/src/tests.rs index f0f1508a..03107c5b 100644 --- a/phase1-coordinator/src/tests.rs +++ b/phase1-coordinator/src/tests.rs @@ -5,7 +5,11 @@ use crate::{ objects::Task, storage::{Disk, StorageLocator, StorageObject}, testing::prelude::*, - Coordinator, CoordinatorError, MockTimeSource, Participant, Round, + Coordinator, + CoordinatorError, + MockTimeSource, + Participant, + Round, }; use chrono::Utc; use phase1::{helpers::CurveKind, ContributionMode, ProvingSystem}; From 33ca464d3f9e176695ee9d01c95c6397961ac3ff Mon Sep 17 00:00:00 2001 From: Jules de Smit Date: Wed, 8 Sep 2021 18:11:53 +0200 Subject: [PATCH 5/8] Fix errors --- phase1-coordinator/src/coordinator.rs | 1 - phase1-coordinator/src/objects/round.rs | 5 ++--- phase1-coordinator/src/storage/disk.rs | 2 +- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/phase1-coordinator/src/coordinator.rs b/phase1-coordinator/src/coordinator.rs index e6fe21ae..608a279d 100644 --- a/phase1-coordinator/src/coordinator.rs +++ b/phase1-coordinator/src/coordinator.rs @@ -22,7 +22,6 @@ use crate::{ Locator, LocatorPath, Object, - StorageAction, StorageLocator, StorageObject, }, diff --git a/phase1-coordinator/src/objects/round.rs b/phase1-coordinator/src/objects/round.rs index 8d05e50c..43917c4f 100644 --- a/phase1-coordinator/src/objects/round.rs +++ b/phase1-coordinator/src/objects/round.rs @@ -8,7 +8,6 @@ use crate::{ Locator, LocatorPath, Object, - RemoveAction, StorageAction, StorageLocator, UpdateAction, @@ -642,7 +641,7 @@ impl Round { pub fn initialize_verifier_response_files( &self, environment: &Environment, - storage: &mut impl Storage, + storage: &mut Disk, participant: &Participant, chunk_id: u64, locators: &LockedLocators, @@ -668,7 +667,7 @@ impl Round { /// Returns previous contribution, current contribution and next contribution paths pub(crate) fn get_chunk_locators_for_verifier( &self, - storage: &impl Storage, + storage: &Disk, participant: &Participant, chunk_id: u64, contribution_id: u64, diff --git a/phase1-coordinator/src/storage/disk.rs b/phase1-coordinator/src/storage/disk.rs index c890c94b..072aa553 100644 --- a/phase1-coordinator/src/storage/disk.rs +++ b/phase1-coordinator/src/storage/disk.rs @@ -24,7 +24,7 @@ use std::{ collections::{BTreeSet, HashMap, HashSet}, convert::TryFrom, fs::{self, File, OpenOptions}, - io::{self, Error, ErrorKind, Write}, + io::{Error, ErrorKind, Write}, path::{Path, PathBuf}, str::FromStr, sync::{Arc, RwLock}, From 54ed2d893d22188c880004d70d936d83168bfa8a Mon Sep 17 00:00:00 2001 From: Jules de Smit Date: Wed, 8 Sep 2021 18:16:08 +0200 Subject: [PATCH 6/8] Fix minor review points --- phase1-coordinator/src/storage/disk.rs | 72 +++++++++++++------------- 1 file changed, 36 insertions(+), 36 deletions(-) diff --git a/phase1-coordinator/src/storage/disk.rs b/phase1-coordinator/src/storage/disk.rs index 072aa553..0440a394 100644 --- a/phase1-coordinator/src/storage/disk.rs +++ b/phase1-coordinator/src/storage/disk.rs @@ -41,42 +41,6 @@ pub struct Disk { resolver: DiskResolver, } -impl Disk { - pub fn clear_round_files(&mut self, current_round_height: u64) -> Result<()> { - // Let's first fully clear any files in the next round - these will be - // verifications and represent the initial challenges. - let next_round_dir = self.resolver.round_directory(current_round_height + 1); - self.clear_dir_files(next_round_dir.into(), true)?; - - // Now, let's clear all the contributions made on this round. - let round_dir = self.resolver.round_directory(current_round_height); - self.clear_dir_files(round_dir.into(), false) - } - - fn clear_dir_files(&mut self, path: PathBuf, delete_initial_contribution: bool) -> Result<()> { - Ok(for entry in fs::read_dir(path.as_path())? { - let entry = entry?; - match entry.path().is_dir() { - true => self.clear_dir_files(entry.path(), delete_initial_contribution)?, - false => { - let file_path = entry - .path() - .to_str() - .ok_or(Error::new(ErrorKind::Other, "filepath is not UTF-8 encoded"))? - .to_owned(); - - if !delete_initial_contribution && file_path.contains("contribution_0") { - continue; - } - - let locator = self.resolver.to_locator(&LocatorPath::new(file_path))?; - self.remove(&locator)?; - } - }; - }) - } -} - impl Disk { /// Loads a new instance of `Disk`. pub fn load(environment: &Environment) -> Result @@ -440,6 +404,7 @@ impl Disk { Ok(size) } + /// Process a [StorageAction] which mutates the storage. pub fn process(&mut self, action: StorageAction) -> Result<(), CoordinatorError> { match action { StorageAction::Remove(remove_action) => { @@ -449,6 +414,41 @@ impl Disk { StorageAction::Update(update_action) => self.update(&update_action.locator, update_action.object), } } + + /// Clears all files related to a round - used for round reset purposes. + pub fn clear_round_files(&mut self, round_height: u64) -> Result<()> { + // Let's first fully clear any files in the next round - these will be + // verifications and represent the initial challenges. + let next_round_dir = self.resolver.round_directory(round_height + 1); + self.clear_dir_files(next_round_dir.into(), true)?; + + // Now, let's clear all the contributions made on this round. + let round_dir = self.resolver.round_directory(round_height); + self.clear_dir_files(round_dir.into(), false) + } + + fn clear_dir_files(&mut self, path: PathBuf, delete_initial_contribution: bool) -> Result<()> { + Ok(for entry in fs::read_dir(path.as_path())? { + let entry = entry?; + match entry.path().is_dir() { + true => self.clear_dir_files(entry.path(), delete_initial_contribution)?, + false => { + let file_path = entry + .path() + .to_str() + .ok_or(Error::new(ErrorKind::Other, "filepath is not UTF-8 encoded"))? + .to_owned(); + + if !delete_initial_contribution && file_path.contains("contribution_0") { + continue; + } + + let locator = self.resolver.to_locator(&LocatorPath::new(file_path))?; + self.remove(&locator)?; + } + }; + }) + } } impl StorageLocator for Disk { From 1f1499e43ab717716320d879425feef00fad12a4 Mon Sep 17 00:00:00 2001 From: Jules de Smit Date: Wed, 8 Sep 2021 18:24:33 +0200 Subject: [PATCH 7/8] Turn clear_round_files into a StorageAction --- phase1-coordinator/src/coordinator.rs | 4 +++- phase1-coordinator/src/storage/disk.rs | 9 +++++---- phase1-coordinator/src/storage/storage.rs | 1 + 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/phase1-coordinator/src/coordinator.rs b/phase1-coordinator/src/coordinator.rs index 608a279d..e7512a74 100644 --- a/phase1-coordinator/src/coordinator.rs +++ b/phase1-coordinator/src/coordinator.rs @@ -22,6 +22,7 @@ use crate::{ Locator, LocatorPath, Object, + StorageAction, StorageLocator, StorageObject, }, @@ -2300,7 +2301,8 @@ impl Coordinator { self.storage.process(round.reset(&reset_action.remove_participants))?; // Clear all files - self.storage.clear_round_files(current_round_height)?; + self.storage + .process(StorageAction::ClearRoundFiles(current_round_height))?; if reset_action.rollback { if current_round_height == 0 { diff --git a/phase1-coordinator/src/storage/disk.rs b/phase1-coordinator/src/storage/disk.rs index 0440a394..3d771cb5 100644 --- a/phase1-coordinator/src/storage/disk.rs +++ b/phase1-coordinator/src/storage/disk.rs @@ -405,18 +405,19 @@ impl Disk { } /// Process a [StorageAction] which mutates the storage. - pub fn process(&mut self, action: StorageAction) -> Result<(), CoordinatorError> { + pub fn process(&mut self, action: StorageAction) -> Result<()> { match action { StorageAction::Remove(remove_action) => { let locator = remove_action.try_into_locator(self)?; - self.remove(&locator) + Ok(self.remove(&locator)?) } - StorageAction::Update(update_action) => self.update(&update_action.locator, update_action.object), + StorageAction::Update(update_action) => Ok(self.update(&update_action.locator, update_action.object)?), + StorageAction::ClearRoundFiles(round_height) => self.clear_round_files(round_height), } } /// Clears all files related to a round - used for round reset purposes. - pub fn clear_round_files(&mut self, round_height: u64) -> Result<()> { + fn clear_round_files(&mut self, round_height: u64) -> Result<()> { // Let's first fully clear any files in the next round - these will be // verifications and represent the initial challenges. let next_round_dir = self.resolver.round_directory(round_height + 1); diff --git a/phase1-coordinator/src/storage/storage.rs b/phase1-coordinator/src/storage/storage.rs index ef180b3c..f22464ea 100644 --- a/phase1-coordinator/src/storage/storage.rs +++ b/phase1-coordinator/src/storage/storage.rs @@ -330,6 +330,7 @@ pub struct UpdateAction { pub enum StorageAction { Remove(RemoveAction), Update(UpdateAction), + ClearRoundFiles(u64), } pub trait StorageLocator { From c4c7cdc5e489a1b2577cbc4d3119ce554be0363d Mon Sep 17 00:00:00 2001 From: Jules de Smit Date: Wed, 8 Sep 2021 18:26:46 +0200 Subject: [PATCH 8/8] Fix tests --- phase1-coordinator/src/coordinator.rs | 2 +- phase1-coordinator/src/tests.rs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/phase1-coordinator/src/coordinator.rs b/phase1-coordinator/src/coordinator.rs index e7512a74..708cad2b 100644 --- a/phase1-coordinator/src/coordinator.rs +++ b/phase1-coordinator/src/coordinator.rs @@ -2637,7 +2637,7 @@ mod tests { use rand::RngCore; use std::{collections::HashMap, sync::Arc}; - fn initialize_to_round_1(coordinator: &mut Coordinator, contributors: &[Participant]) -> anyhow::Result<()> { + fn initialize_to_round_1(coordinator: &mut Coordinator, contributors: &[Participant]) -> anyhow::Result<()> { // Initialize the ceremony and add the contributors and verifiers to the queue. { // Run initialization. diff --git a/phase1-coordinator/src/tests.rs b/phase1-coordinator/src/tests.rs index 03107c5b..b296e87b 100644 --- a/phase1-coordinator/src/tests.rs +++ b/phase1-coordinator/src/tests.rs @@ -75,7 +75,7 @@ struct VerifierTestDetails { impl VerifierTestDetails { /// If there are pending verifications, grab one and verify it. /// Otherwise do nothing - fn verify_if_available(&self, coordinator: &mut Coordinator) -> anyhow::Result<()> { + fn verify_if_available(&self, coordinator: &mut Coordinator) -> anyhow::Result<()> { verify_task_if_available(coordinator, &self.participant, &self.signing_key) } } @@ -189,7 +189,7 @@ fn execute_round(proving_system: ProvingSystem, curve: CurveKind) -> anyhow::Res /// If there are pending verifications, grab one and verify it. /// Otherwise do nothing fn verify_task_if_available( - coordinator: &mut Coordinator, + coordinator: &mut Coordinator, verifier: &Participant, signing_key: &SigningKey, ) -> anyhow::Result<()> { @@ -200,7 +200,7 @@ fn verify_task_if_available( Ok(()) } -fn fetch_task_for_verifier(coordinator: &Coordinator) -> Option { +fn fetch_task_for_verifier(coordinator: &Coordinator) -> Option { coordinator.get_pending_verifications().keys().next().cloned() }