Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ tokio-stream = { version = "0.1.18", default-features = false }
tokio-test = "0.4.5"
tokio-tungstenite = { version = "0.20.1", default-features = false }
toml = { version = "0.9.8", default-features = false, features = ["serde", "display", "parse"] }
tonic = { version = "0.11", default-features = false, features = ["transport", "codegen", "prost", "tls", "tls-roots", "gzip"] }
tonic = { version = "0.11", default-features = false, features = ["transport", "codegen", "prost", "tls", "tls-roots", "gzip", "zstd"] }
tonic-build = { version = "0.11", default-features = false, features = ["transport", "prost"] }
tonic-health = { version = "0.11", default-features = false }
tracing = { version = "0.1.44", default-features = false }
Expand Down
33 changes: 33 additions & 0 deletions changelog.d/23030_vector_sink_zstd_compression.enhancement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
The `vector` sink now supports `zstd` compression in addition to `gzip`. `zstd` typically provides
better compression ratios and performance than `gzip` for Vector-to-Vector communication.

The compression configuration has been enhanced to support multiple algorithms while maintaining
full backward compatibility:

## Legacy boolean syntax (deprecated, still supported)

```yaml
sinks:
  my_vector:
    type: vector
    address: "localhost:6000"
    compression: true # Equivalent to "gzip"
    # or
    compression: false # Equivalent to "none" (no compression)
```

## New string syntax

```yaml
sinks:
  my_vector:
    type: vector
    address: "localhost:6000"
    compression: "zstd" # Use zstd compression
    # Supported values: "none", "gzip", "zstd"
```

The Vector source automatically accepts both gzip and zstd compressed data, enabling seamless
communication between Vector instances using different compression algorithms.

authors: jpds
1 change: 1 addition & 0 deletions docs/DEPRECATIONS.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ For example:
- `v0.50.0` | `http-server-encoding` | The `encoding` field will be removed. Use `decoding` and `framing` instead.
- `v0.53.0` | `buffer-bytes-events-metrics` | The `buffer_byte_size` and `buffer_events` gauges are deprecated in favor of the `buffer_size_bytes`/`buffer_size_events` metrics described in `docs/specs/buffer.md`.
- `v0.58.0` | `azure-monitor-logs-sink` | The `azure_monitor_logs` sink is deprecated in favor of the new `azure_logs_ingestion` sink, which uses the Azure Monitor Logs Ingestion API. Users should migrate before Microsoft ends support for the old Data Collector API (scheduled for September 2026).
- `v0.57.0` | `bool-or-vector-compression` | The boolean syntax for the `compression` field in the `vector` sink (`compression: true`/`compression: false`) is deprecated. Use the string syntax instead: `compression: "gzip"` (replaces `true`), `compression: "none"` (replaces `false`), or `compression: "zstd"`. The `bool_or_vector_compression` deserializer should be removed once the boolean syntax is no longer supported.

## To be migrated

Expand Down
1 change: 1 addition & 0 deletions src/serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,4 @@ impl<T> From<Vec<T>> for OneOrMany<T> {
Self::Many(value)
}
}

144 changes: 144 additions & 0 deletions src/sinks/vector/compression.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
use serde::{Deserialize, Deserializer, de};
use vector_lib::configurable::configurable_component;

/// Compression configuration for the Vector sink.
///
/// Only `gzip` and `zstd` are supported as compression algorithms for the
/// Vector sink's gRPC transport. Compression levels are not configurable
/// as the underlying tonic library does not support them.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
// With `snake_case` renaming the variants are spelled `none`, `gzip`, and `zstd`.
#[serde(rename_all = "snake_case")]
#[configurable(metadata(
docs::enum_tag_description = "The compression algorithm to use for sending."
))]
pub enum VectorCompression {
/// No compression.
// This is the value returned by `VectorCompression::default()`.
#[default]
None,

/// [Gzip][gzip] compression.
///
/// [gzip]: https://www.gzip.org/
Gzip,

/// [Zstandard][zstd] compression.
///
/// [zstd]: https://facebook.github.io/zstd/
Zstd,
}

impl VectorCompression {
    /// Translates this setting into the matching `tonic` compression encoding.
    ///
    /// Yields `Option::None` for `VectorCompression::None`; every other variant
    /// maps to the `tonic::codec::CompressionEncoding` variant of the same name.
    pub const fn as_tonic_encoding(self) -> Option<tonic::codec::CompressionEncoding> {
        use tonic::codec::CompressionEncoding;

        match self {
            // Spelled `Option::None` so it is not mistaken for the enum's own `None` variant.
            Self::None => Option::None,
            Self::Gzip => Some(CompressionEncoding::Gzip),
            Self::Zstd => Some(CompressionEncoding::Zstd),
        }
    }
}

/// Deserializes a `VectorCompression` from either of its two accepted forms.
///
/// Legacy boolean form, kept for backward compatibility:
/// - `true` becomes `VectorCompression::Gzip`
/// - `false` becomes `VectorCompression::None`
///
/// String form:
/// - `"none"`, `"gzip"`, or `"zstd"`
///
/// Any other input is reported as a deserialization error.
pub fn bool_or_vector_compression<'de, D>(deserializer: D) -> Result<VectorCompression, D::Error>
where
    D: Deserializer<'de>,
{
    // Visitor accepting the legacy boolean or the name of a variant.
    struct CompressionVisitor;

    impl<'de> de::Visitor<'de> for CompressionVisitor {
        type Value = VectorCompression;

        fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
            f.write_str("boolean (deprecated) or string (\"none\", \"gzip\", or \"zstd\")")
        }

        fn visit_bool<E>(self, enabled: bool) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            // The legacy flag could only ever toggle gzip on or off.
            Ok(if enabled {
                VectorCompression::Gzip
            } else {
                VectorCompression::None
            })
        }

        fn visit_str<E>(self, name: &str) -> Result<Self::Value, E>
        where
            E: de::Error,
        {
            // Hand the string to the enum's own `Deserialize` impl so the set of
            // accepted names is defined in exactly one place.
            let variant_name = de::value::StrDeserializer::<E>::new(name);
            VectorCompression::deserialize(variant_name)
        }
    }

    // `deserialize_any` lets the input format decide whether a bool or a string is visited.
    deserializer.deserialize_any(CompressionVisitor)
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal wrapper that routes `compression` through the custom deserializer.
    #[derive(Deserialize)]
    struct TestConfig {
        #[serde(deserialize_with = "bool_or_vector_compression")]
        compression: VectorCompression,
    }

    /// Parses `json` as a `TestConfig` and returns the decoded compression setting.
    fn parse(json: &str) -> Result<VectorCompression, serde_json::Error> {
        serde_json::from_str::<TestConfig>(json).map(|config| config.compression)
    }

    #[test]
    fn test_legacy_true() {
        let parsed = parse(r#"{"compression": true}"#).unwrap();
        assert_eq!(parsed, VectorCompression::Gzip);
    }

    #[test]
    fn test_legacy_false() {
        let parsed = parse(r#"{"compression": false}"#).unwrap();
        assert_eq!(parsed, VectorCompression::None);
    }

    #[test]
    fn test_string_gzip() {
        let parsed = parse(r#"{"compression": "gzip"}"#).unwrap();
        assert_eq!(parsed, VectorCompression::Gzip);
    }

    #[test]
    fn test_string_zstd() {
        let parsed = parse(r#"{"compression": "zstd"}"#).unwrap();
        assert_eq!(parsed, VectorCompression::Zstd);
    }

    #[test]
    fn test_string_none() {
        let parsed = parse(r#"{"compression": "none"}"#).unwrap();
        assert_eq!(parsed, VectorCompression::None);
    }

    #[test]
    fn test_unsupported_algorithm_rejected() {
        // Only the enum's own variant names are valid strings.
        assert!(parse(r#"{"compression": "snappy"}"#).is_err());
    }

    #[test]
    fn test_object_syntax_rejected() {
        // Maps are not an accepted shape for this option.
        assert!(parse(r#"{"compression": {"algorithm": "zstd", "level": 3}}"#).is_err());
    }
}
23 changes: 15 additions & 8 deletions src/sinks/vector/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use vector_lib::configurable::configurable_component;

use super::{
VectorSinkError,
compression::VectorCompression,
service::{VectorRequest, VectorResponse, VectorService},
sink::VectorSink,
};
Expand Down Expand Up @@ -49,14 +50,19 @@ pub struct VectorConfig {
#[configurable(metadata(docs::examples = "https://somehost:6000"))]
address: String,

/// Whether or not to compress requests.
/// Compression algorithm for requests.
///
/// If set to `true`, requests are compressed with [`gzip`][gzip_docs].
/// Supports `"none"`, `"gzip"`, or `"zstd"`.
///
/// [gzip_docs]: https://www.gzip.org/
#[configurable(metadata(docs::advanced))]
#[serde(default)]
compression: bool,
/// For backward compatibility, boolean values are still accepted (deprecated syntax):
/// - `true` enables gzip compression
/// - `false` disables compression
#[configurable(derived)]
#[serde(
default,
deserialize_with = "super::compression::bool_or_vector_compression"
)]
compression: VectorCompression,

#[configurable(derived)]
#[serde(default)]
Expand Down Expand Up @@ -97,7 +103,7 @@ fn default_config(address: &str) -> VectorConfig {
VectorConfig {
version: None,
address: address.to_owned(),
compression: false,
compression: VectorCompression::None,
batch: BatchConfig::default(),
request: TowerRequestConfig::default(),
tls: None,
Expand All @@ -120,7 +126,8 @@ impl SinkConfig for VectorConfig {
.clone()
.map(|uri| uri.uri)
.unwrap_or_else(|| uri.clone());
let healthcheck_client = VectorService::new(client.clone(), healthcheck_uri, false);
let healthcheck_client =
VectorService::new(client.clone(), healthcheck_uri, VectorCompression::None);
let healthcheck = healthcheck(healthcheck_client, cx.healthcheck);
let service = VectorService::new(client, uri, self.compression);
let request_settings = self.request.into_settings();
Expand Down
1 change: 1 addition & 0 deletions src/sinks/vector/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use snafu::Snafu;
use vector_lib::configurable::configurable_component;

mod compression;
mod config;
mod service;
mod sink;
Expand Down
9 changes: 5 additions & 4 deletions src/sinks/vector/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use vector_lib::{
stream::DriverResponse,
};

use super::VectorSinkError;
use super::{VectorSinkError, compression::VectorCompression};
use crate::{
Error,
event::{EventFinalizers, EventStatus, Finalizable},
Expand Down Expand Up @@ -70,17 +70,18 @@ impl VectorService {
pub fn new(
hyper_client: hyper::Client<ProxyConnector<HttpsConnector<HttpConnector>>, BoxBody>,
uri: Uri,
compression: bool,
compression: VectorCompression,
) -> Self {
let (protocol, endpoint) = uri::protocol_endpoint(uri.clone());
let mut proto_client = proto_vector::Client::new(HyperSvc {
uri,
client: hyper_client,
});

if compression {
proto_client = proto_client.send_compressed(tonic::codec::CompressionEncoding::Gzip);
if let Some(encoding) = compression.as_tonic_encoding() {
proto_client = proto_client.send_compressed(encoding);
}

Self {
client: proto_client,
protocol,
Expand Down
Loading
Loading