Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions crates/spfs-cli/cmd-render/src/cmd_render.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,10 @@ impl CmdRender {
env_spec = self
.sync
.get_syncer(&origin, &handle)
.sync_env(env_spec)
.sync_ref_spec(env_spec.try_into()?)
.await?
.env;
.ref_spec
.into();
}

// Use PayloadFallback to repair any missing payloads found in the
Expand Down
7 changes: 4 additions & 3 deletions crates/spfs-cli/main/src/cmd_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use clap::Args;
use miette::Result;
use spfs::sync::reporter::Summary;
use spfs::tracking::RefSpec;
use spfs_cli_common as cli;

/// Pull one or more objects to the local repository
Expand All @@ -27,7 +28,7 @@ pub struct CmdPull {
/// These can be individual tags or digests, or they may also
/// be a collection of items joined by a '+'
#[clap(value_name = "REF", required = true)]
refs: Vec<spfs::tracking::EnvSpec>,
refs: Vec<spfs::tracking::RefSpec>,
}

impl CmdPull {
Expand All @@ -37,11 +38,11 @@ impl CmdPull {
spfs::config::open_repository_from_string(config, self.repos.remote.as_ref())
)?;

let env_spec = self.refs.iter().cloned().collect();
let ref_spec = RefSpec::combine(&self.refs)?;
let summary = self
.sync
.get_syncer(&remote, &repo)
.sync_env(env_spec)
.sync_ref_spec(ref_spec)
.await?
.summary();

Expand Down
7 changes: 4 additions & 3 deletions crates/spfs-cli/main/src/cmd_push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use clap::Args;
use miette::Result;
use spfs::sync::reporter::Summary;
use spfs::tracking::RefSpec;
use spfs_cli_common as cli;

/// Push one or more objects to a remote repository
Expand All @@ -24,7 +25,7 @@ pub struct CmdPush {
/// These can be individual tags or digests, or they may also
/// be a collection of items joined by a '+'
#[clap(value_name = "REF", required = true)]
refs: Vec<spfs::tracking::EnvSpec>,
refs: Vec<spfs::tracking::RefSpec>,
}

impl CmdPush {
Expand All @@ -40,13 +41,13 @@ impl CmdPush {
spfs::config::open_repository_from_string(config, self.repos.remote.as_ref()),
)?;

let env_spec = self.refs.iter().cloned().collect();
let ref_spec = RefSpec::combine(&self.refs)?;
// the latest tag is always synced when pushing
self.sync.sync = true;
let summary = self
.sync
.get_syncer(&repo, &remote)
.sync_env(env_spec)
.sync_ref_spec(ref_spec)
.await?
.summary();
tracing::info!("{}", spfs::io::format_sync_summary(&summary));
Expand Down
5 changes: 3 additions & 2 deletions crates/spfs-cli/main/src/cmd_reset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ impl CmdReset {
env_spec = self
.sync
.get_syncer(&origin, &repo)
.sync_env(env_spec)
.sync_ref_spec(env_spec.try_into()?)
.await?
.env;
.ref_spec
.into();
}
for item in env_spec.iter() {
let digest = item.resolve_digest(&repo).await?;
Expand Down
6 changes: 3 additions & 3 deletions crates/spfs-cli/main/src/cmd_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ impl CmdRun {
let _synced = self
.sync
.get_syncer(&origin, &repo)
.sync_env(references_to_sync)
.sync_ref_spec(references_to_sync.try_into()?)
.await?;
}
tracing::debug!("synced and about to launch process with durable runtime");
Expand Down Expand Up @@ -300,9 +300,9 @@ impl CmdRun {
let synced = self
.sync
.get_syncer(&origin, &repo)
.sync_env(references_to_sync)
.sync_ref_spec(references_to_sync.try_into()?)
.await?;
for item in synced.env.iter() {
for item in synced.ref_spec.iter() {
let digest = item.resolve_digest(&repo).await?;
runtime.push_digest(digest);
}
Expand Down
51 changes: 28 additions & 23 deletions crates/spfs/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ use reporter::{
SyncAnnotationResult,
SyncBlobResult,
SyncEntryResult,
SyncEnvItemResult,
SyncEnvResult,
SyncLayerResult,
SyncManifestResult,
SyncObjectResult,
SyncPayloadResult,
SyncPlatformResult,
SyncRefItemResult,
SyncRefResult,
SyncReporter,
SyncReporters,
SyncTagResult,
Expand Down Expand Up @@ -177,52 +177,57 @@ impl<'src, 'dst> Syncer<'src, 'dst> {
/// Sync the object(s) referenced by the given string.
///
/// Any valid [`crate::tracking::EnvSpec`] is accepted as a reference.
pub async fn sync_ref<R: AsRef<str>>(&self, reference: R) -> Result<SyncEnvResult> {
pub async fn sync_ref<R: AsRef<str>>(&self, reference: R) -> Result<SyncRefResult> {
let env_spec = reference.as_ref().parse()?;
self.sync_env(env_spec).await
self.sync_ref_spec(env_spec).await
}

/// Sync all of the objects identified by the given env.
pub async fn sync_env(&self, env: tracking::EnvSpec) -> Result<SyncEnvResult> {
self.reporter.visit_env(&env);
/// Sync all of the objects identified by the given ref spec.
pub async fn sync_ref_spec(&self, ref_spec: tracking::RefSpec) -> Result<SyncRefResult> {
self.reporter.visit_ref_spec(&ref_spec);
let mut futures = FuturesUnordered::new();
for item in env.iter().cloned() {
futures.push(self.sync_env_item(item));
for item in ref_spec.iter().cloned() {
futures.push(self.sync_ref_item(item));
}
let mut results = Vec::with_capacity(env.len());
let mut results = Vec::with_capacity(ref_spec.len());
while let Some(result) = futures.try_next().await? {
results.push(result);
}
let res = SyncEnvResult { env, results };
self.reporter.synced_env(&res);
let res = SyncRefResult { ref_spec, results };
self.reporter.synced_ref_spec(&res);
Ok(res)
}

/// Sync one environment item and any associated data.
pub async fn sync_env_item(&self, item: tracking::EnvSpecItem) -> Result<SyncEnvItemResult> {
pub async fn sync_ref_item(&self, item: tracking::RefSpecItem) -> Result<SyncRefItemResult> {
tracing::debug!(?item, "Syncing item");
self.reporter.visit_env_item(&item);
self.reporter.visit_ref_item(&item);
let res = match item {
tracking::EnvSpecItem::Digest(digest) => match self.sync_object_digest(digest).await {
Ok(r) => SyncEnvItemResult::Object(r),
tracking::RefSpecItem::Digest(digest) => match self.sync_object_digest(digest).await {
Ok(r) => SyncRefItemResult::Object(r),
Err(Error::UnknownObject(digest)) => self
.sync_payload(digest)
.await
.map(SyncEnvItemResult::Payload)?,
.map(SyncRefItemResult::Payload)?,
Err(e) => return Err(e),
},
tracking::EnvSpecItem::PartialDigest(digest) => {
tracking::RefSpecItem::PartialDigest(digest) => {
self.sync_partial_digest(digest).await.map(Into::into)?
}
tracking::EnvSpecItem::TagSpec(tag_spec) => {
self.sync_tag(tag_spec).await.map(SyncEnvItemResult::Tag)?
tracking::RefSpecItem::TagSpec(tag_spec) => {
self.sync_tag(tag_spec).await.map(SyncRefItemResult::Tag)?
}
// These are not objects in spfs, so they are not syncable
tracking::EnvSpecItem::SpecFile(_) => {
return Ok(SyncEnvItemResult::Object(SyncObjectResult::Ignorable));
// XXX but it can be a spec file the contains syncable things? Would
// those things become garbage instantly in the destination repo?
// XXX shouldn't this be an error to inform the user that it was
// not synced? Or should a RefSpecItem even be allowed to contain
// SpecFiles?
tracking::RefSpecItem::SpecFile(_) => {
return Ok(SyncRefItemResult::Object(SyncObjectResult::Ignorable));
}
};
self.reporter.synced_env_item(&res);
self.reporter.synced_ref_item(&res);
Ok(res)
}

Expand Down
78 changes: 39 additions & 39 deletions crates/spfs/src/sync/reporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,17 @@ impl SyncReporters {
/// followed up by a call to the corresponding synced_*.
#[enum_dispatch::enum_dispatch]
pub trait SyncReporter: Send + Sync {
/// Called when an environment has been identified to sync
fn visit_env(&self, _env: &tracking::EnvSpec) {}
/// Called when an ref spec has been identified to sync
fn visit_ref_spec(&self, _ref_spec: &tracking::RefSpec) {}

/// Called when a environment has finished syncing
fn synced_env(&self, _result: &SyncEnvResult) {}
/// Called when a ref spec has finished syncing
fn synced_ref_spec(&self, _result: &SyncRefResult) {}

/// Called when an environment item has been identified to sync
fn visit_env_item(&self, _item: &tracking::EnvSpecItem) {}
/// Called when an ref item has been identified to sync
fn visit_ref_item(&self, _item: &tracking::RefSpecItem) {}

/// Called when a environment item has finished syncing
fn synced_env_item(&self, _result: &SyncEnvItemResult) {}
/// Called when a ref item has finished syncing
fn synced_ref_item(&self, _result: &SyncRefItemResult) {}

/// Called when a tag has been identified to sync
fn visit_tag(&self, _tag: &tracking::TagSpec) {}
Expand Down Expand Up @@ -114,17 +114,17 @@ impl<T> SyncReporter for Arc<T>
where
T: SyncReporter,
{
fn visit_env(&self, env: &tracking::EnvSpec) {
(**self).visit_env(env)
fn visit_ref_spec(&self, ref_spec: &tracking::RefSpec) {
(**self).visit_ref_spec(ref_spec)
}
fn synced_env(&self, result: &SyncEnvResult) {
(**self).synced_env(result)
fn synced_ref_spec(&self, result: &SyncRefResult) {
(**self).synced_ref_spec(result)
}
fn visit_env_item(&self, item: &tracking::EnvSpecItem) {
(**self).visit_env_item(item)
fn visit_ref_item(&self, item: &tracking::RefSpecItem) {
(**self).visit_ref_item(item)
}
fn synced_env_item(&self, result: &SyncEnvItemResult) {
(**self).synced_env_item(result)
fn synced_ref_item(&self, result: &SyncRefItemResult) {
(**self).synced_ref_item(result)
}
fn visit_tag(&self, tag: &tracking::TagSpec) {
(**self).visit_tag(tag)
Expand Down Expand Up @@ -177,17 +177,17 @@ where
}

impl SyncReporter for Box<dyn SyncReporter> {
fn visit_env(&self, env: &tracking::EnvSpec) {
(**self).visit_env(env)
fn visit_ref_spec(&self, ref_spec: &tracking::RefSpec) {
(**self).visit_ref_spec(ref_spec)
}
fn synced_env(&self, result: &SyncEnvResult) {
(**self).synced_env(result)
fn synced_ref_spec(&self, result: &SyncRefResult) {
(**self).synced_ref_spec(result)
}
fn visit_env_item(&self, item: &tracking::EnvSpecItem) {
(**self).visit_env_item(item)
fn visit_ref_item(&self, item: &tracking::RefSpecItem) {
(**self).visit_ref_item(item)
}
fn synced_env_item(&self, result: &SyncEnvItemResult) {
(**self).synced_env_item(result)
fn synced_ref_item(&self, result: &SyncRefItemResult) {
(**self).synced_ref_item(result)
}
fn visit_tag(&self, tag: &tracking::TagSpec) {
(**self).visit_tag(tag)
Expand Down Expand Up @@ -243,17 +243,17 @@ impl<T> SyncReporter for Box<Arc<T>>
where
T: SyncReporter,
{
fn visit_env(&self, env: &tracking::EnvSpec) {
(***self).visit_env(env)
fn visit_ref_spec(&self, ref_spec: &tracking::RefSpec) {
(***self).visit_ref_spec(ref_spec)
}
fn synced_env(&self, result: &SyncEnvResult) {
(***self).synced_env(result)
fn synced_ref_spec(&self, result: &SyncRefResult) {
(***self).synced_ref_spec(result)
}
fn visit_env_item(&self, item: &tracking::EnvSpecItem) {
(***self).visit_env_item(item)
fn visit_ref_item(&self, item: &tracking::RefSpecItem) {
(***self).visit_ref_item(item)
}
fn synced_env_item(&self, result: &SyncEnvItemResult) {
(***self).synced_env_item(result)
fn synced_ref_item(&self, result: &SyncRefItemResult) {
(***self).synced_ref_item(result)
}
fn visit_tag(&self, tag: &tracking::TagSpec) {
(***self).visit_tag(tag)
Expand Down Expand Up @@ -342,7 +342,7 @@ impl SyncReporter for ConsoleSyncReporter {
bars.bytes.inc(result.summary().synced_payload_bytes);
}

fn synced_env(&self, _result: &SyncEnvResult) {
fn synced_ref_spec(&self, _result: &SyncRefResult) {
// Don't cause the bars to be initialized here if they haven't already
// been, calling abandon will briefly display some zero-progress bars.
if let Some(bars) = self.bars.get() {
Expand Down Expand Up @@ -453,26 +453,26 @@ where
}

#[derive(Debug)]
pub struct SyncEnvResult {
pub env: tracking::EnvSpec,
pub results: Vec<SyncEnvItemResult>,
pub struct SyncRefResult {
pub ref_spec: tracking::RefSpec,
pub results: Vec<SyncRefItemResult>,
}

impl Summary for SyncEnvResult {
impl Summary for SyncRefResult {
fn summary(&self) -> SyncSummary {
self.results.iter().map(|r| r.summary()).sum()
}
}

#[derive(Debug)]
#[enum_dispatch::enum_dispatch(Summary)]
pub enum SyncEnvItemResult {
pub enum SyncRefItemResult {
Tag(SyncTagResult),
Object(SyncObjectResult),
Payload(SyncPayloadResult),
}

impl From<SyncItemResult> for SyncEnvItemResult {
impl From<SyncItemResult> for SyncRefItemResult {
fn from(value: SyncItemResult) -> Self {
match value {
SyncItemResult::Object(obj) => Self::Object(obj),
Expand Down
4 changes: 2 additions & 2 deletions crates/spfs/src/sync_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,11 +283,11 @@ async fn test_sync_missing_from_source(
.await
.expect("Should not fail when object is already in destination");
syncer
.sync_env(tag.into())
.sync_ref_spec(tag.into())
.await
.expect("Should not fail when object is already in destination");
syncer
.sync_env(platform_digest.into())
.sync_ref_spec(platform_digest.into())
.await
.expect("Should not fail when object is already in destination");
}
Expand Down
Loading
Loading