Skip to content
4 changes: 4 additions & 0 deletions clients/python/src/objectstore_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ def put(
contents: bytes | IO[bytes],
id: str | None = None,
compression: Compression | Literal["none"] | None = None,
content_type: str | None = None,
metadata: dict[str, str] | None = None,
expiration_policy: ExpirationPolicy | None = None,
) -> str:
Expand All @@ -143,6 +144,9 @@ def put(
body = cctx.stream_reader(original_body)
headers["Content-Encoding"] = "zstd"

if content_type:
headers["Content-Type"] = content_type

if expiration_policy:
headers[HEADER_EXPIRATION] = format_expiration(expiration_policy)
elif self._default_expiration_policy:
Expand Down
15 changes: 13 additions & 2 deletions clients/python/src/objectstore_client/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,34 @@ class TimeToLive:

@dataclass
class Metadata:
content_type: str | None
compression: Compression | None
expiration_policy: ExpirationPolicy | None
custom: dict[str, str]

@classmethod
def from_headers(cls, headers: Mapping[str, str]) -> Metadata:
content_type = "application/octet-stream"
compression = None
expiration_policy = None
custom_metadata = {}

for k, v in headers.items():
if k == "content-encoding":
if k == "content-type":
content_type = v
elif k == "content-encoding":
compression = cast(Compression | None, v)
elif k == HEADER_EXPIRATION:
expiration_policy = parse_expiration(v)
elif k.startswith(HEADER_META_PREFIX):
custom_metadata[k[len(HEADER_META_PREFIX) :]] = v
return Metadata(compression, expiration_policy, custom_metadata)

return Metadata(
content_type=content_type,
compression=compression,
expiration_policy=expiration_policy,
custom=custom_metadata,
)


def format_expiration(expiration_policy: ExpirationPolicy) -> str:
Expand Down
29 changes: 8 additions & 21 deletions objectstore-service/src/backend/bigtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,10 +372,9 @@ mod tests {

let path = make_key();
let metadata = Metadata {
expiration_policy: ExpirationPolicy::Manual,
compression: None,
content_type: "text/plain".into(),
custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
size: None,
..Default::default()
};

backend
Expand All @@ -387,6 +386,7 @@ mod tests {
let payload = read_to_vec(stream).await?;
let str_payload = str::from_utf8(&payload).unwrap();
assert_eq!(str_payload, "hello, world");
assert_eq!(meta.content_type, metadata.content_type);
assert_eq!(meta.custom, metadata.custom);

Ok(())
Expand Down Expand Up @@ -419,21 +419,17 @@ mod tests {

let path = make_key();
let metadata = Metadata {
expiration_policy: ExpirationPolicy::Manual,
compression: None,
custom: BTreeMap::from_iter([("invalid".into(), "invalid".into())]),
size: None,
..Default::default()
};

backend
.put_object(&path, &metadata, make_stream(b"hello"))
.await?;

let metadata = Metadata {
expiration_policy: ExpirationPolicy::Manual,
compression: None,
custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
size: None,
..Default::default()
};

backend
Expand All @@ -455,12 +451,7 @@ mod tests {
let backend = create_test_backend().await?;

let path = make_key();
let metadata = Metadata {
expiration_policy: ExpirationPolicy::Manual,
compression: None,
custom: Default::default(),
size: None,
};
let metadata = Metadata::default();

backend
.put_object(&path, &metadata, make_stream(b"hello, world"))
Expand All @@ -484,9 +475,7 @@ mod tests {
let path = make_key();
let metadata = Metadata {
expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(0)),
compression: None,
custom: Default::default(),
size: None,
..Default::default()
};

backend
Expand All @@ -509,9 +498,7 @@ mod tests {
let path = make_key();
let metadata = Metadata {
expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(0)),
compression: None,
custom: Default::default(),
size: None,
..Default::default()
};

backend
Expand Down
49 changes: 24 additions & 25 deletions objectstore-service/src/backend/gcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,37 +30,45 @@ const CUSTOM_META_PREFIX: &str = "x-snme-";
/// This is the representation of the object resource in GCS JSON API without its payload. Where no
/// dedicated fields are available, we encode both built-in and custom metadata in the `metadata`
/// field.
#[derive(Debug, Default, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct GcsObject {
/// Content-Type of the object data. If an object is stored without a Content-Type, it is served
/// as application/octet-stream.
pub content_type: Cow<'static, str>,

/// Content encoding, used to store [`Metadata::compression`].
#[serde(default, skip_serializing_if = "Option::is_none")]
pub content_encoding: Option<String>,

/// Custom time stamp used for time-based expiration.
#[serde(
default,
skip_serializing_if = "Option::is_none",
with = "humantime_serde"
)]
pub custom_time: Option<SystemTime>,
/// User-provided metadata, including our built-in metadata.
#[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
pub metadata: BTreeMap<GcsMetaKey, String>,

/// The `Content-Length` of the data in bytes. GCS returns this as a string.
///
/// GCS sets this in metadata responses. We can use it to know the size of an object
/// without having to stream it.
#[serde(default)]
pub size: Option<String>,

/// User-provided metadata, including our built-in metadata.
#[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
pub metadata: BTreeMap<GcsMetaKey, String>,
}

impl GcsObject {
/// Converts our Metadata type to GCS JSON object metadata.
pub fn from_metadata(metadata: &Metadata) -> Self {
let mut gcs_object = GcsObject {
content_type: metadata.content_type.clone(),
size: metadata.size.map(|size| size.to_string()),
..Default::default()
content_encoding: None,
custom_time: None,
metadata: BTreeMap::new(),
};

// For time-based expiration, set the `customTime` field. The bucket must have a
Expand Down Expand Up @@ -102,6 +110,7 @@ impl GcsObject {
.transpose()?
.unwrap_or_default();

let content_type = self.content_type;
let compression = self.content_encoding.map(|s| s.parse()).transpose()?;
let size = self.size.map(|size| size.parse()).transpose()?;

Expand All @@ -116,6 +125,7 @@ impl GcsObject {
}

Ok(Metadata {
content_type,
expiration_policy,
compression,
custom,
Expand Down Expand Up @@ -299,7 +309,7 @@ impl Backend for GcsBackend {
.part(
"media",
multipart::Part::stream(Body::wrap_stream(stream))
.mime_str("application/octet-stream")?,
.mime_str(&metadata.content_type)?,
);

// GCS requires a multipart/related request. Its body looks identical to
Expand Down Expand Up @@ -448,6 +458,7 @@ mod tests {

let path = make_key();
let metadata = Metadata {
content_type: "text/plain".into(),
expiration_policy: ExpirationPolicy::Manual,
compression: None,
custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
Expand All @@ -463,6 +474,7 @@ mod tests {
let payload = read_to_vec(stream).await?;
let str_payload = str::from_utf8(&payload).unwrap();
assert_eq!(str_payload, "hello, world");
assert_eq!(meta.content_type, metadata.content_type);
assert_eq!(meta.custom, metadata.custom);

Ok(())
Expand Down Expand Up @@ -495,21 +507,17 @@ mod tests {

let path = make_key();
let metadata = Metadata {
expiration_policy: ExpirationPolicy::Manual,
compression: None,
custom: BTreeMap::from_iter([("invalid".into(), "invalid".into())]),
size: None,
..Default::default()
};

backend
.put_object(&path, &metadata, make_stream(b"hello"))
.await?;

let metadata = Metadata {
expiration_policy: ExpirationPolicy::Manual,
compression: None,
custom: BTreeMap::from_iter([("hello".into(), "world".into())]),
size: None,
..Default::default()
};

backend
Expand All @@ -531,12 +539,7 @@ mod tests {
let backend = create_test_backend().await?;

let path = make_key();
let metadata = Metadata {
expiration_policy: ExpirationPolicy::Manual,
compression: None,
custom: Default::default(),
size: None,
};
let metadata = Metadata::default();

backend
.put_object(&path, &metadata, make_stream(b"hello, world"))
Expand All @@ -560,9 +563,7 @@ mod tests {
let path = make_key();
let metadata = Metadata {
expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(0)),
compression: None,
custom: Default::default(),
size: None,
..Default::default()
};

backend
Expand All @@ -585,9 +586,7 @@ mod tests {
let path = make_key();
let metadata = Metadata {
expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(0)),
compression: None,
custom: Default::default(),
size: None,
..Default::default()
};

backend
Expand Down
1 change: 1 addition & 0 deletions objectstore-service/src/backend/local_fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ mod tests {
key: "testing".into(),
};
let metadata = Metadata {
content_type: "text/plain".into(),
expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(3600)),
compression: Some(Compression::Zstd),
custom: [("foo".into(), "bar".into())].into(),
Expand Down
Loading