3 changes: 3 additions & 0 deletions .cargo/config.toml
@@ -1,3 +1,6 @@
[build]
rustflags = ["-D", "warnings"]

# Provides a default value for FLOW_VERSION during development, so that you can compile
# without setting it explicitly. If FLOW_VERSION is already defined, this has no effect.
[env]
41 changes: 41 additions & 0 deletions .github/workflows/platform-test.yaml
@@ -65,3 +65,44 @@ jobs:
- run: mise run ci:gotest
- run: mise run ci:catalog-test
- run: mise run build:flow-schema
dekaf-test:
name: Dekaf Test
runs-on: ubuntu-2404-large
steps:
- uses: actions/checkout@v5
with:
submodules: true # Official JSON test data.
lfs: true # Test fixtures.

- name: Install build tools
uses: jdx/mise-action@9dc7d5dd454262207dea3ab5a06a3df6afc8ff26 # v3.4.1
- run: rustup upgrade

- name: Cache Rust workspace
uses: Swatinem/rust-cache@f13886b937689c021905a6b90929199931d60db1 # v2.8.1
with:
workspaces: ". -> ../../../cargo-target"

- name: Cache RocksDB
uses: actions/cache@v4
with:
key: rocksdb-${{ runner.os }}-${{ runner.arch }}-${{ env.ROCKSDB_VERSION }}
path: |
~/rocksdb-${{ env.ROCKSDB_VERSION }}/include/
~/rocksdb-${{ env.ROCKSDB_VERSION }}/lib/

- name: Cache/Restore Go workspace.
uses: actions/cache@v4
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}

- uses: mozilla-actions/sccache-action@v0.0.9
- run: echo 'SCCACHE_GHA_ENABLED=true' >> $GITHUB_ENV
- run: mise run build:rocksdb
- run: mise run ci:musl-dev
- run: mise run ci:gnu-dev
- run: mise run build:gazette
- run: mise run build:flowctl-go
- run: mise run local:stack
- run: mise run ci:dekaf-e2e
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

6 changes: 3 additions & 3 deletions crates/activate/src/lib.rs
@@ -857,15 +857,15 @@ fn map_shard_to_split(

// Pick a split point of the parent range, which will divide the future
// LHS & RHS children.
let (mut lhs_range, mut rhs_range) = (parent_range.clone(), parent_range.clone());
let (mut _lhs_range, mut rhs_range) = (parent_range.clone(), parent_range.clone());

if split_on_key {
let pivot = ((parent_range.key_begin as u64 + parent_range.key_end as u64 + 1) / 2) as u32;
(lhs_range.key_end, rhs_range.key_begin) = (pivot - 1, pivot);
(_lhs_range.key_end, rhs_range.key_begin) = (pivot - 1, pivot);
} else {
let pivot =
((parent_range.r_clock_begin as u64 + parent_range.r_clock_end as u64 + 1) / 2) as u32;
(lhs_range.r_clock_end, rhs_range.r_clock_begin) = (pivot - 1, pivot);
(_lhs_range.r_clock_end, rhs_range.r_clock_begin) = (pivot - 1, pivot);
}

// Deep-copy parent labels for the desired LHS / RHS updates.
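The pivot above is the midpoint of the inclusive parent range, so for a full key range the split works out as follows (a worked example, not code from the diff):

let (key_begin, key_end) = (0x0000_0000u32, 0xffff_ffffu32);
let pivot = ((key_begin as u64 + key_end as u64 + 1) / 2) as u32; // 0x8000_0000
// LHS child covers [0x0000_0000, 0x7fff_ffff]; RHS child covers [0x8000_0000, 0xffff_ffff].
assert_eq!((pivot - 1, pivot), (0x7fff_ffff, 0x8000_0000));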
2 changes: 2 additions & 0 deletions crates/dekaf/Cargo.toml
@@ -94,5 +94,7 @@ apache-avro = { workspace = true }
insta = { workspace = true }
rdkafka = { workspace = true }
schema_registry_converter = { workspace = true }
serde_yaml = { workspace = true }
tempfile = { workspace = true }
tracing-test = { workspace = true }
uuid = { workspace = true, features = ["v4"] }
171 changes: 115 additions & 56 deletions crates/dekaf/src/api_client.rs
@@ -1,4 +1,4 @@
use anyhow::{Context, anyhow, bail};
use anyhow::{Context, bail};
use bytes::{Bytes, BytesMut};
use futures::{SinkExt, TryStreamExt};
use kafka_protocol::{
@@ -19,50 +19,74 @@ use std::{io::BufWriter, pin::Pin, sync::Arc};
use tokio::sync::OnceCell;
use tokio_rustls::rustls;
use tokio_util::codec;
use tokio_util::either::Either;
use tracing::instrument;
use url::Url;

type BoxedKafkaConnection = Pin<
Box<
tokio_util::codec::Framed<
tokio_rustls::client::TlsStream<tokio::net::TcpStream>,
codec::LengthDelimitedCodec,
>,
>,
>;
/// A stream that may or may not be TLS-encrypted.
/// Uses `tokio_util::either::Either` which implements AsyncRead/AsyncWrite
/// when both variants do.
type MaybeTlsStream =
Either<tokio::net::TcpStream, tokio_rustls::client::TlsStream<tokio::net::TcpStream>>;

static ROOT_CERT_STORE: OnceCell<Arc<RootCertStore>> = OnceCell::const_new();
type BoxedKafkaConnection =
Pin<Box<tokio_util::codec::Framed<MaybeTlsStream, codec::LengthDelimitedCodec>>>;
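As the doc comment notes, `tokio_util::either::Either` forwards AsyncRead/AsyncWrite when both variants implement them, which is what lets one code path serve both transports. A minimal sketch (the `write_hello` helper is hypothetical):

use tokio::io::{AsyncWrite, AsyncWriteExt};

// Accepts TcpStream, TlsStream<TcpStream>, or an Either of the two alike.
async fn write_hello<S: AsyncWrite + Unpin>(mut stream: S) -> std::io::Result<()> {
    stream.write_all(b"hello").await
}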

#[tracing::instrument(skip_all)]
async fn async_connect(broker_url: &str) -> anyhow::Result<BoxedKafkaConnection> {
// Establish a TCP connection to the Kafka broker
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ConnectionScheme {
Plaintext,
Tls,
}

let parsed_url = Url::parse(broker_url)?;
/// Parse a broker URL to extract the connection scheme, host, and port.
///
/// Supported schemes:
/// - `tcp://` - plaintext connection (no TLS)
/// - `tls://` - TLS-encrypted connection (default if no scheme provided)
fn parse_broker_url(broker_url: &str) -> anyhow::Result<(ConnectionScheme, String, u16)> {
// Default to tls:// if no scheme is provided
let url_with_scheme = if broker_url.contains("://") {
broker_url.to_string()
} else {
format!("tls://{broker_url}")
};

let root_certs = ROOT_CERT_STORE
.get_or_try_init(|| async {
let mut certs = rustls::RootCertStore::empty();
certs.add_parsable_certificates(
rustls_native_certs::load_native_certs().expect("failed to load native certs"),
);
Ok::<Arc<RootCertStore>, anyhow::Error>(Arc::new(certs))
})
.await?;
let parsed = Url::parse(&url_with_scheme)
.with_context(|| format!("invalid broker URL: {broker_url}"))?;

let tls_config = rustls::ClientConfig::builder()
.with_root_certificates(root_certs.to_owned())
.with_no_client_auth();
let scheme = match parsed.scheme() {
"tcp" => ConnectionScheme::Plaintext,
"tls" => ConnectionScheme::Tls,
other => anyhow::bail!("unknown broker scheme: {other} (expected 'tcp' or 'tls')"),
};
let host = parsed
.host()
.context("missing host in broker URL")?
.to_string();
let port = parsed.port().unwrap_or(9092);
Ok((scheme, host, port))
}
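A brief usage sketch of `parse_broker_url` (host names are illustrative, and the `?` assumes a surrounding function returning `anyhow::Result`):

let (scheme, host, port) = parse_broker_url("tcp://localhost:9092")?;
assert_eq!((scheme, host.as_str(), port), (ConnectionScheme::Plaintext, "localhost", 9092));

// With no scheme, TLS and the default Kafka port are assumed:
let (scheme, host, port) = parse_broker_url("broker.example.com")?;
assert_eq!((scheme, host.as_str(), port), (ConnectionScheme::Tls, "broker.example.com", 9092));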

let tls_connector = tokio_rustls::TlsConnector::from(Arc::new(tls_config));
static ROOT_CERT_STORE: OnceCell<Arc<RootCertStore>> = OnceCell::const_new();

let hostname = parsed_url
.host()
.ok_or(anyhow!("Broker URL must contain a hostname"))?;
let port = parsed_url.port().unwrap_or(9092);
let dnsname = rustls::pki_types::ServerName::try_from(hostname.to_string())?;
fn kafka_codec() -> codec::LengthDelimitedCodec {
// https://kafka.apache.org/protocol.html#protocol_common
// All requests and responses originate from the following:
// > RequestOrResponse => Size (RequestMessage | ResponseMessage)
// > Size => int32
tokio_util::codec::LengthDelimitedCodec::builder()
.big_endian()
.length_field_length(4)
.max_frame_length(1 << 27) // 128 MiB
.new_codec()
}
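For illustration, a sketch of the wire format this codec frames: each message is preceded by its length as a big-endian int32 (the byte values below are a worked example, not from the source):

use bytes::{BufMut, BytesMut};

let payload = b"abc";
let mut frame = BytesMut::new();
frame.put_i32(payload.len() as i32); // Size: big-endian int32 length prefix
frame.put_slice(payload);            // the request or response message bytes
assert_eq!(&frame[..], &[0, 0, 0, 3, b'a', b'b', b'c']);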

#[tracing::instrument(skip_all)]
async fn async_connect(broker_url: &str) -> anyhow::Result<BoxedKafkaConnection> {
let (scheme, host, port) = parse_broker_url(broker_url)?;

tracing::debug!(port = port,host = ?hostname, "Attempting to connect");
let tcp_stream = tokio::net::TcpStream::connect(format!("{hostname}:{port}")).await?;
tracing::debug!(port, host = ?host, ?scheme, "Attempting to connect");
let tcp_stream = tokio::net::TcpStream::connect(format!("{host}:{port}")).await?;

// Let's keep this stream alive
let sock_ref = socket2::SockRef::from(&tcp_stream);
@@ -71,22 +95,33 @@ async fn async_connect(broker_url: &str) -> anyhow::Result<BoxedKafkaConnection>
.with_interval(Duration::from_secs(20));
sock_ref.set_tcp_keepalive(&ka)?;

let stream = tls_connector.connect(dnsname, tcp_stream).await?;
tracing::debug!(port = port,host = ?hostname, "Connection established");
let stream: MaybeTlsStream = match scheme {
ConnectionScheme::Plaintext => Either::Left(tcp_stream),
ConnectionScheme::Tls => {
let root_certs = ROOT_CERT_STORE
.get_or_try_init(|| async {
let mut certs = rustls::RootCertStore::empty();
certs.add_parsable_certificates(
rustls_native_certs::load_native_certs()
.expect("failed to load native certs"),
);
Ok::<Arc<RootCertStore>, anyhow::Error>(Arc::new(certs))
})
.await?;

// https://kafka.apache.org/protocol.html#protocol_common
// All requests and responses originate from the following:
// > RequestOrResponse => Size (RequestMessage | ResponseMessage)
// > Size => int32
let framed = tokio_util::codec::Framed::new(
stream,
tokio_util::codec::LengthDelimitedCodec::builder()
.big_endian()
.length_field_length(4)
.max_frame_length(1 << 27) // 128 MiB
.new_codec(),
);
let tls_config = rustls::ClientConfig::builder()
.with_root_certificates(root_certs.to_owned())
.with_no_client_auth();

let tls_connector = tokio_rustls::TlsConnector::from(Arc::new(tls_config));
let dnsname = rustls::pki_types::ServerName::try_from(host.clone())?;

let tls_stream = tls_connector.connect(dnsname, tcp_stream).await?;
Either::Right(tls_stream)
}
};

let framed = tokio_util::codec::Framed::new(stream, kafka_codec());
Ok(Box::pin(framed))
}

@@ -318,10 +353,14 @@ impl KafkaApiClient {
.await
.context("Failed to establish TCP connection")?;

tracing::debug!("Authenticating connection");
sasl_auth(&mut conn, url, auth.sasl_config().await?)
.await
.context("SASL authentication failed")?;
if let Some(sasl_config) = auth.sasl_config().await? {
tracing::debug!("Authenticating connection via SASL");
sasl_auth(&mut conn, url, sasl_config)
.await
.context("SASL authentication failed")?;
} else {
tracing::debug!("Skipping SASL authentication (no auth configured)");
}

let versions = get_versions(&mut conn)
.await
@@ -711,7 +750,11 @@ impl KafkaApiClient {

#[derive(Clone)]
pub enum KafkaClientAuth {
/// No authentication - for local testing with plaintext Kafka brokers.
None,
/// Static SASL configuration that doesn't refresh.
NonRefreshing(Arc<SASLConfig>),
/// AWS MSK IAM authentication with automatic token refresh.
MSK {
aws_region: String,
provider: aws_credential_types::provider::SharedCredentialsProvider,
Expand All @@ -720,9 +763,25 @@ pub enum KafkaClientAuth {
}

impl KafkaClientAuth {
async fn sasl_config(&mut self) -> anyhow::Result<Arc<SASLConfig>> {
pub async fn from_msk_region(region: &str) -> Self {
let provider = aws_config::from_env()
.region(aws_types::region::Region::new(region.to_owned()))
.load()
.await
.credentials_provider()
.expect("AWS credentials provider should be available");

KafkaClientAuth::MSK {
aws_region: region.to_owned(),
provider,
cached: None,
}
}

async fn sasl_config(&mut self) -> anyhow::Result<Option<Arc<SASLConfig>>> {
match self {
KafkaClientAuth::NonRefreshing(cfg) => Ok(cfg.clone()),
KafkaClientAuth::None => Ok(None),
KafkaClientAuth::NonRefreshing(cfg) => Ok(Some(cfg.clone())),
KafkaClientAuth::MSK {
aws_region,
provider,
@@ -732,7 +791,7 @@ impl KafkaClientAuth {
let now_seconds = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
// Use a 30-second buffer before expiration to refresh the token.
if *exp as u64 > now_seconds + 30 {
return Ok(cfg.clone());
return Ok(Some(cfg.clone()));
}
}

@@ -751,7 +810,7 @@

cached.replace((cfg.clone(), exp));

Ok(cfg)
Ok(Some(cfg))
}
}
}
15 changes: 15 additions & 0 deletions crates/dekaf/src/lib.rs
@@ -31,6 +31,21 @@ pub mod registry;
mod api_client;
pub use api_client::{KafkaApiClient, KafkaClientAuth};

#[derive(Clone, Debug)]
pub enum UpstreamKafkaAuth {
Msk { region: String },
None,
}

impl UpstreamKafkaAuth {
pub async fn build_kafka_client_auth(&self) -> KafkaClientAuth {
match self {
UpstreamKafkaAuth::Msk { region } => KafkaClientAuth::from_msk_region(region).await,
UpstreamKafkaAuth::None => KafkaClientAuth::None,
}
}
}
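A hypothetical call site for the new enum, assuming a tokio runtime and an illustrative region:

let auth = UpstreamKafkaAuth::Msk { region: "us-east-1".to_string() }
    .build_kafka_client_auth()
    .await;
// Local plaintext testing would instead use:
// UpstreamKafkaAuth::None.build_kafka_client_auth().await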

/// Re-export the dekaf-connector crate so it can be used as `crate::connector`.
pub use dekaf_connector as connector;
