33 changes: 0 additions & 33 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion objectstore-service/Cargo.toml
@@ -13,7 +13,6 @@ publish = false
 anyhow = { workspace = true }
 async-trait = { workspace = true }
 bigtable_rs = "0.2.18"
-bincode = { version = "2.0.1", features = ["serde"] }
 bytes = { workspace = true }
 data-encoding = "2.9.0"
 futures-util = { workspace = true }
10 changes: 4 additions & 6 deletions objectstore-service/src/backend/bigtable.rs
@@ -13,8 +13,6 @@ use crate::backend::common::{Backend, BackendStream};
 
 /// Connection timeout used for the initial connection to Bigtable.
 const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
-/// Config for bincode encoding and decoding.
-const BC_CONFIG: bincode::config::Configuration = bincode::config::standard();
 /// Time to debounce bumping an object with configured TTI.
 const TTI_DEBOUNCE: Duration = Duration::from_secs(24 * 3600); // 1 day
 
@@ -23,7 +21,7 @@ const REQUEST_RETRY_COUNT: usize = 2;
 
 /// Column that stores the raw payload (compressed).
 const COLUMN_PAYLOAD: &[u8] = b"p";
-/// Column that stores metadata in bincode.
+/// Column that stores metadata in JSON.
 const COLUMN_METADATA: &[u8] = b"m";
 /// Column family that uses timestamp-based garbage collection.
 ///
@@ -170,8 +168,7 @@ impl BigTableBackend {
                 family_name: family.to_owned(),
                 column_qualifier: COLUMN_METADATA.to_owned(),
                 timestamp_micros,
-                // TODO: Do we really want bincode here?
-                value: bincode::serde::encode_to_vec(metadata, BC_CONFIG)?,
+                value: serde_json::to_vec(metadata).with_context(|| "failed to encode metadata")?,
             }),
         ];
         self.mutate(path, mutations, action).await
@@ -254,7 +251,8 @@ impl Backend for BigTableBackend {
                     // TODO: Log if the timestamp is invalid.
                 }
                 self::COLUMN_METADATA => {
-                    metadata = bincode::serde::decode_from_slice(&cell.value, BC_CONFIG)?.0;
+                    metadata = serde_json::from_slice(&cell.value)
+                        .with_context(|| "failed to decode metadata")?;
                 }
                 _ => {
                     // TODO: Log unknown column
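
The change above swaps bincode for serde_json in both the write and read paths of the Bigtable backend, wrapping errors with anyhow's `Context`. A minimal round-trip sketch of that pattern; `CellMetadata` is a hypothetical stand-in for the crate's `Metadata` type, not its real definition:

```rust
use anyhow::Context;
use serde::{Deserialize, Serialize};

/// Hypothetical stand-in for the crate's `Metadata` type.
#[derive(Debug, Serialize, Deserialize)]
struct CellMetadata {
    content_type: String,
    size: Option<usize>,
}

/// Encode metadata to the JSON bytes stored in the metadata column.
fn encode(metadata: &CellMetadata) -> anyhow::Result<Vec<u8>> {
    serde_json::to_vec(metadata).context("failed to encode metadata")
}

/// Decode metadata from a cell value read back out of the table.
fn decode(cell_value: &[u8]) -> anyhow::Result<CellMetadata> {
    serde_json::from_slice(cell_value).context("failed to decode metadata")
}

fn main() -> anyhow::Result<()> {
    let meta = CellMetadata { content_type: "text/plain".into(), size: Some(42) };
    let bytes = encode(&meta)?;
    println!("stored: {}", String::from_utf8_lossy(&bytes));
    let roundtripped = decode(&bytes)?;
    println!("loaded: {roundtripped:?}");
    Ok(())
}
```

Unlike the bincode version, the stored cell value is now human-readable, at the cost of a somewhat larger encoding.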
41 changes: 6 additions & 35 deletions objectstore-service/src/backend/local_fs.rs
@@ -2,7 +2,6 @@ use std::io::ErrorKind;
 use std::path::{Path, PathBuf};
 use std::pin::pin;
 
-use bincode::error::DecodeError;
 use futures_util::StreamExt;
 use objectstore_types::Metadata;
 use tokio::fs::OpenOptions;
@@ -12,8 +11,6 @@ use tokio_util::io::{ReaderStream, StreamReader};
 use crate::ObjectPath;
 use crate::backend::common::{Backend, BackendStream};
 
-const BC_CONFIG: bincode::config::Configuration = bincode::config::standard();
-
 #[derive(Debug)]
 pub struct LocalFsBackend {
     path: PathBuf,
@@ -51,8 +48,9 @@ impl Backend for LocalFsBackend {
         let mut reader = pin!(StreamReader::new(stream));
         let mut writer = BufWriter::new(file);
 
-        let metadata = bincode::serde::encode_to_vec(metadata, BC_CONFIG)?;
-        writer.write_all(&metadata).await?;
+        let metadata_json = serde_json::to_string(metadata)?;
+        writer.write_all(metadata_json.as_bytes()).await?;
+        writer.write_all(b"\n").await?;
 
         tokio::io::copy(&mut reader, &mut writer).await?;
         writer.flush().await?;
@@ -81,36 +79,9 @@ impl Backend for LocalFsBackend {
         };
 
         let mut reader = BufReader::new(file);
-        let mut metadata_buf = vec![];
-        // TODO populate size in our metadata
-        let metadata = loop {
-            let reader_buf = reader.fill_buf().await?;
-            let buf = if metadata_buf.is_empty() {
-                reader_buf
-            } else {
-                metadata_buf.extend_from_slice(reader_buf);
-                &metadata_buf
-            };
-
-            match bincode::serde::decode_from_slice(buf, BC_CONFIG) {
-                Ok((metadata, read)) => {
-                    let read = if metadata_buf.is_empty() {
-                        read
-                    } else {
-                        let prev_consumed = metadata_buf.len() - reader_buf.len();
-                        read - prev_consumed
-                    };
-                    reader.consume(read);
-                    break metadata;
-                }
-                Err(DecodeError::UnexpectedEnd { .. }) => {
-                    metadata_buf.extend_from_slice(reader_buf);
-                    let len = reader_buf.len();
-                    reader.consume(len);
-                }
-                Err(err) => Err(err)?,
-            }
-        };
+        let mut metadata_line = String::new();
+        reader.read_line(&mut metadata_line).await?;
+        let metadata: Metadata = serde_json::from_str(metadata_line.trim_end())?;
 
         let stream = ReaderStream::new(reader);
         Ok(Some((metadata, stream.boxed())))
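
The rewritten local-filesystem path replaces the incremental bincode decode loop with a much simpler framing: one line of compact JSON metadata, a `\n` terminator, then the raw payload. Because serde_json escapes control characters inside strings, compact output never contains a literal newline, so `read_line` recovers exactly the header. A synchronous sketch of that framing, assuming `serde_json::Value` as a stand-in for `Metadata` (the real backend uses tokio's async I/O):

```rust
use std::io::{BufRead, BufReader, Read, Write};

/// Write one line of compact JSON metadata, a newline, then the payload.
fn write_object<W: Write>(mut w: W, metadata: &serde_json::Value, payload: &[u8]) -> std::io::Result<()> {
    w.write_all(metadata.to_string().as_bytes())?;
    w.write_all(b"\n")?;
    w.write_all(payload)
}

/// Read the JSON header line back, then the remaining bytes as the payload.
fn read_object<R: Read>(r: R) -> anyhow::Result<(serde_json::Value, Vec<u8>)> {
    let mut reader = BufReader::new(r);
    let mut line = String::new();
    reader.read_line(&mut line)?; // consumes the header including the '\n'
    let metadata = serde_json::from_str(line.trim_end())?;
    let mut payload = Vec::new();
    reader.read_to_end(&mut payload)?; // everything after the newline
    Ok((metadata, payload))
}

fn main() -> anyhow::Result<()> {
    let mut buf = Vec::new();
    write_object(&mut buf, &serde_json::json!({"content_type": "text/plain"}), b"hello")?;
    let (metadata, payload) = read_object(buf.as_slice())?;
    assert_eq!(metadata["content_type"], "text/plain");
    assert_eq!(payload, b"hello".to_vec());
    Ok(())
}
```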
8 changes: 4 additions & 4 deletions objectstore-types/src/lib.rs
@@ -148,22 +148,22 @@ impl FromStr for Compression {
 #[serde(default)]
 pub struct Metadata {
     /// The expiration policy of the object.
-    // #[serde(skip_serializing_if = "ExpirationPolicy::is_manual")]
+    #[serde(skip_serializing_if = "ExpirationPolicy::is_manual")]
     pub expiration_policy: ExpirationPolicy,
 
     /// The content type of the object, if known.
     pub content_type: Cow<'static, str>,
 
     /// The compression algorithm used for this object, if any.
-    // #[serde(skip_serializing_if = "Option::is_none")]
+    #[serde(skip_serializing_if = "Option::is_none")]
     pub compression: Option<Compression>,
 
     /// Size of the data in bytes, if known.
-    // #[serde(skip_serializing_if = "Option::is_none")]
+    #[serde(skip_serializing_if = "Option::is_none")]
     pub size: Option<usize>,
 
     /// Some arbitrary user-provided metadata.
-    // #[serde(skip_serializing_if = "BTreeMap::is_empty")]
+    #[serde(skip_serializing_if = "BTreeMap::is_empty")]
     pub custom: BTreeMap<String, String>,
 }
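
Un-commenting the `skip_serializing_if` attributes matters now that metadata is stored as JSON: fields holding their default value are omitted from the serialized form, and the container-level `#[serde(default)]` fills them back in on decode, so the round trip stays lossless. A minimal sketch of the mechanism; the `Meta` struct here is illustrative, not the crate's `Metadata`:

```rust
use std::collections::BTreeMap;

use serde::{Deserialize, Serialize};

/// Illustrative struct: `skip_serializing_if` omits default-valued fields,
/// and the container-level `#[serde(default)]` restores them on decode.
#[derive(Debug, Default, PartialEq, Serialize, Deserialize)]
#[serde(default)]
struct Meta {
    #[serde(skip_serializing_if = "Option::is_none")]
    size: Option<usize>,
    #[serde(skip_serializing_if = "BTreeMap::is_empty")]
    custom: BTreeMap<String, String>,
}

fn main() -> serde_json::Result<()> {
    let meta = Meta::default();
    // All fields are at their defaults, so the serialized form is empty.
    assert_eq!(serde_json::to_string(&meta)?, "{}");
    // Decoding the compact form restores the defaults losslessly.
    assert_eq!(serde_json::from_str::<Meta>("{}")?, meta);
    Ok(())
}
```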