diff --git a/Cargo.lock b/Cargo.lock index f2806dd02..41ec7b29d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3874,6 +3874,15 @@ dependencies = [ "thiserror 2.0.18", ] +[[package]] +name = "harness_derive" +version = "0.1.0" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.114", +] + [[package]] name = "hash32" version = "0.2.1" @@ -4612,6 +4621,7 @@ dependencies = [ "rmcp", "serde", "serde_json", + "socket2 0.6.2", "strum", "tempfile", "thiserror 2.0.18", @@ -5004,6 +5014,7 @@ dependencies = [ "env_logger", "figment", "futures", + "harness_derive", "humantime", "iggy", "iggy_binary_protocol", diff --git a/Cargo.toml b/Cargo.toml index 0be7d8a9c..708f03d7b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,6 +42,7 @@ members = [ "core/connectors/sources/postgres_source", "core/connectors/sources/random_source", "core/consensus", + "core/harness_derive", "core/integration", "core/journal", "core/message_bus", @@ -153,6 +154,7 @@ getrandom = { version = "0.3", features = ["wasm_js"] } git2 = { version = "0.20.3", default-features = false, features = ["vendored-libgit2"] } gloo = "0.11" governor = "0.10.4" +harness_derive = { path = "core/harness_derive" } hash32 = "1.0.0" hostname = "0.4.2" human-repr = "1.1.0" diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md index 924cbd10d..2508b3fb6 100644 --- a/DEPENDENCIES.md +++ b/DEPENDENCIES.md @@ -339,6 +339,7 @@ h2: 0.4.13, "MIT", half: 2.7.1, "Apache-2.0 OR MIT", halfbrown: 0.4.0, "Apache-2.0 OR MIT", handlebars: 6.4.0, "MIT", +harness_derive: 0.1.0, "Apache-2.0", hash32: 0.2.1, "Apache-2.0 OR MIT", hash32: 1.0.0, "Apache-2.0 OR MIT", hashbrown: 0.12.3, "Apache-2.0 OR MIT", diff --git a/core/ai/mcp/Cargo.toml b/core/ai/mcp/Cargo.toml index ac05978a5..f5cf019e1 100644 --- a/core/ai/mcp/Cargo.toml +++ b/core/ai/mcp/Cargo.toml @@ -49,6 +49,7 @@ rmcp = { workspace = true, features = [ ] } serde = { workspace = true } serde_json = { workspace = true } +socket2 = "0.6" strum = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } diff --git a/core/ai/mcp/src/api.rs b/core/ai/mcp/src/api.rs index 5b5f264e9..e0285baa0 100644 --- a/core/ai/mcp/src/api.rs +++ b/core/ai/mcp/src/api.rs @@ -32,10 +32,58 @@ use rmcp::{ StreamableHttpService, streamable_http_server::session::local::LocalSessionManager, }, }; +use socket2::{Domain, Protocol, Socket, Type}; use std::{net::SocketAddr, path::PathBuf, sync::Arc}; use tokio::spawn; use tracing::{error, info}; +fn create_reusable_listener(address: &str) -> Result { + let addr: SocketAddr = address.parse().map_err(|_| { + error!("Invalid address: {address}"); + McpRuntimeError::FailedToStartHttpServer + })?; + + let domain = if addr.is_ipv6() { + Domain::IPV6 + } else { + Domain::IPV4 + }; + + let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP)).map_err(|e| { + error!("Failed to create socket: {e}"); + McpRuntimeError::FailedToStartHttpServer + })?; + + socket.set_reuse_address(true).map_err(|e| { + error!("Failed to set SO_REUSEADDR: {e}"); + McpRuntimeError::FailedToStartHttpServer + })?; + + #[cfg(unix)] + socket.set_reuse_port(true).map_err(|e| { + error!("Failed to set SO_REUSEPORT: {e}"); + McpRuntimeError::FailedToStartHttpServer + })?; + + socket.bind(&addr.into()).map_err(|e| { + error!("Failed to bind to {address}: {e}"); + McpRuntimeError::FailedToStartHttpServer + })?; + + socket.listen(128).map_err(|e| { + error!("Failed to listen on {address}: {e}"); + McpRuntimeError::FailedToStartHttpServer + })?; + + let listener: std::net::TcpListener = socket.into(); + listener.set_nonblocking(true).map_err(|e| { + error!("Failed to set non-blocking: {e}"); + McpRuntimeError::FailedToStartHttpServer + })?; + + Ok(listener) +} + pub async fn init( config: HttpConfig, iggy_client: Arc, @@ -77,15 +125,14 @@ pub async fn init( } if !config.tls.enabled { - let listener = tokio::net::TcpListener::bind(&config.address) - .await - .map_err(|error| { - error!("Failed to bind TCP listener: {:?}", error); - McpRuntimeError::FailedToStartHttpServer - })?; - let address = listener + let std_listener = create_reusable_listener(&config.address)?; + let address = std_listener .local_addr() .expect("Failed to get local address for HTTP server"); + let listener = tokio::net::TcpListener::from_std(std_listener).map_err(|e| { + error!("Failed to convert to tokio listener: {e}"); + McpRuntimeError::FailedToStartHttpServer + })?; info!( "HTTP API listening on: {address}, MCP path: {}", config.path @@ -110,8 +157,7 @@ pub async fn init( .await .expect("Failed to load TLS certificate or key file"); - let listener = - std::net::TcpListener::bind(&config.address).expect("Failed to bind TCP listener"); + let listener = create_reusable_listener(&config.address)?; let address = listener .local_addr() .expect("Failed to get local address for HTTPS / TLS server"); diff --git a/core/configs/src/configs_impl/typed_env_provider.rs b/core/configs/src/configs_impl/typed_env_provider.rs index 154884f90..41b0c19a9 100644 --- a/core/configs/src/configs_impl/typed_env_provider.rs +++ b/core/configs/src/configs_impl/typed_env_provider.rs @@ -54,6 +54,7 @@ const IGNORED_ENV_VARS: &[&str] = &[ "IGGY_MCP_CONFIG_PATH", "IGGY_ROOT_PASSWORD", "IGGY_ROOT_USERNAME", + "IGGY_TEST_CLEANUP_DISABLED", "IGGY_TEST_VERBOSE", ]; diff --git a/core/harness_derive/Cargo.toml b/core/harness_derive/Cargo.toml new file mode 100644 index 000000000..c9ed19f02 --- /dev/null +++ b/core/harness_derive/Cargo.toml @@ -0,0 +1,30 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "harness_derive" +version = "0.1.0" +edition = "2024" +license = "Apache-2.0" + +[lib] +proc-macro = true + +[dependencies] +proc-macro2 = { workspace = true } +quote = { workspace = true } +syn = { workspace = true } diff --git a/core/harness_derive/src/attrs.rs b/core/harness_derive/src/attrs.rs new file mode 100644 index 000000000..2e4389faa --- /dev/null +++ b/core/harness_derive/src/attrs.rs @@ -0,0 +1,750 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +//! Attribute parsing for `#[iggy_harness(...)]`. +//! +//! Supports: +//! - `test_client_transport = Tcp` (single) +//! - `test_client_transport = [Tcp, Http]` (matrix) +//! - `server(path.to.field = "value")` (static) +//! - `server(path.to.field = ["v1", "v2"])` (matrix) + +use proc_macro2::Span; +use syn::parse::{Parse, ParseStream}; +use syn::punctuated::Punctuated; +use syn::{Ident, LitStr, Token, bracketed, parenthesized}; + +/// Cluster node configuration. +#[derive(Debug, Clone, Default)] +pub enum ClusterNodesValue { + #[default] + None, + Single(usize), + Matrix(Vec), +} + +impl ClusterNodesValue { + pub fn variants(&self) -> Vec> { + match self { + ClusterNodesValue::None => vec![None], + ClusterNodesValue::Single(n) => vec![Some(*n)], + ClusterNodesValue::Matrix(v) => v.iter().map(|n| Some(*n)).collect(), + } + } +} + +/// Parsed `#[iggy_harness(...)]` attributes. +#[derive(Debug, Default)] +pub struct IggyTestAttrs { + pub transports: Vec, + /// True if `test_client_transport` was explicitly provided in attributes. + pub transport_explicit: bool, + pub server: ServerAttrs, + pub seed_fn: Option, + pub cluster_nodes: ClusterNodesValue, +} + +/// MCP configuration attributes. +#[derive(Debug, Default, Clone)] +pub struct McpAttrs { + pub consumer_name: Option, + pub http_path: Option, +} + +/// Connectors runtime configuration attributes. +#[derive(Debug, Default, Clone)] +pub struct ConnectorsRuntimeAttrs { + pub config_path: Option, +} + +#[cfg(test)] +impl IggyTestAttrs { + /// Create attrs with specified transports and default server config. + pub fn with_transports(transports: Vec) -> Self { + Self { + transports, + transport_explicit: true, + server: ServerAttrs::default(), + seed_fn: None, + cluster_nodes: ClusterNodesValue::None, + } + } +} + +/// Transport protocol variant. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Transport { + Tcp, + TcpTlsSelfSigned, + TcpTlsGenerated, + Http, + Quic, + WebSocket, + WebSocketTlsSelfSigned, + WebSocketTlsGenerated, +} + +impl Transport { + pub fn as_str(&self) -> &'static str { + match self { + Transport::Tcp => "tcp", + Transport::TcpTlsSelfSigned => "tcp_tls_self_signed", + Transport::TcpTlsGenerated => "tcp_tls_generated", + Transport::Http => "http", + Transport::Quic => "quic", + Transport::WebSocket => "websocket", + Transport::WebSocketTlsSelfSigned => "websocket_tls_self_signed", + Transport::WebSocketTlsGenerated => "websocket_tls_generated", + } + } + + pub fn variant_ident(&self) -> Ident { + let name = match self { + Transport::Tcp | Transport::TcpTlsSelfSigned | Transport::TcpTlsGenerated => "Tcp", + Transport::Http => "Http", + Transport::Quic => "Quic", + Transport::WebSocket + | Transport::WebSocketTlsSelfSigned + | Transport::WebSocketTlsGenerated => "WebSocket", + }; + Ident::new(name, Span::call_site()) + } + + pub fn client_config_method(&self) -> &'static str { + match self { + Transport::Tcp | Transport::TcpTlsSelfSigned | Transport::TcpTlsGenerated => "root_tcp", + Transport::Http => "root_http", + Transport::Quic => "root_quic", + Transport::WebSocket + | Transport::WebSocketTlsSelfSigned + | Transport::WebSocketTlsGenerated => "root_websocket", + } + } + + /// Returns the TLS mode if this transport uses TLS. + pub fn tls_mode(&self) -> Option { + match self { + Transport::TcpTlsSelfSigned | Transport::WebSocketTlsSelfSigned => { + Some(TlsMode::SelfSigned) + } + Transport::TcpTlsGenerated | Transport::WebSocketTlsGenerated => { + Some(TlsMode::Generated) + } + _ => None, + } + } + + /// Returns true if this transport uses WebSocket protocol. + pub fn is_websocket(&self) -> bool { + matches!( + self, + Transport::WebSocket + | Transport::WebSocketTlsSelfSigned + | Transport::WebSocketTlsGenerated + ) + } +} + +/// A single config override with dot-notation path. +#[derive(Debug, Clone)] +pub struct ConfigOverride { + pub path: String, + pub value: ConfigValue, +} + +/// Server configuration attributes. +#[derive(Debug, Default)] +pub struct ServerAttrs { + /// Dynamic config overrides using dot-notation paths. + pub config_overrides: Vec, + + /// Special cases requiring custom codegen. + pub mcp: Option, + pub connectors_runtime: Option, + pub tls: Option, + pub websocket_tls: Option, +} + +impl ServerAttrs { + /// Find a config override by its path. Used in tests. + #[cfg(test)] + pub fn find_override(&self, path: &str) -> Option<&ConfigOverride> { + self.config_overrides.iter().find(|o| o.path == path) + } +} + +/// TLS configuration mode for server. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum TlsMode { + /// Server generates its own self-signed certs. Client cannot validate. + SelfSigned, + /// Harness generates test certs. Client can validate using CA cert. + Generated, +} + +/// Parsed TLS configuration from attributes. +#[derive(Debug, Clone)] +pub struct TlsConfig { + pub mode: TlsMode, +} + +/// A configuration value that can be static or a matrix. +#[derive(Debug, Default, Clone)] +pub enum ConfigValue { + #[default] + Unset, + Static(String), + Matrix(Vec), +} + +impl ConfigValue { + pub fn variants(&self) -> Vec> { + match self { + ConfigValue::Unset => vec![None], + ConfigValue::Static(s) => vec![Some(s.as_str())], + ConfigValue::Matrix(v) => v.iter().map(|s| Some(s.as_str())).collect(), + } + } +} + +impl Parse for IggyTestAttrs { + fn parse(input: ParseStream) -> syn::Result { + let mut attrs = IggyTestAttrs::default(); + + if input.is_empty() { + attrs.transports.push(Transport::Tcp); + return Ok(attrs); + } + + let items: Punctuated = Punctuated::parse_terminated(input)?; + + for item in items { + match item { + AttrItem::Transport(transports) => { + attrs.transports = transports; + attrs.transport_explicit = true; + } + AttrItem::Server(server) => { + attrs.server = *server; + } + AttrItem::Seed(path) => { + attrs.seed_fn = Some(path); + } + AttrItem::ClusterNodes(cluster) => { + attrs.cluster_nodes = cluster; + } + } + } + + if attrs.transports.is_empty() { + attrs.transports.push(Transport::Tcp); + } + + Ok(attrs) + } +} + +enum AttrItem { + Transport(Vec), + Server(Box), + Seed(syn::Path), + ClusterNodes(ClusterNodesValue), +} + +impl Parse for AttrItem { + fn parse(input: ParseStream) -> syn::Result { + let ident: Ident = input.parse()?; + let ident_str = ident.to_string(); + + match ident_str.as_str() { + "test_client_transport" => { + input.parse::()?; + let transports = parse_transport_value(input)?; + Ok(AttrItem::Transport(transports)) + } + "cluster_nodes" => { + input.parse::()?; + let cluster = parse_cluster_nodes_value(input)?; + Ok(AttrItem::ClusterNodes(cluster)) + } + "server" => { + let content; + parenthesized!(content in input); + let server = parse_server_attrs(&content)?; + Ok(AttrItem::Server(Box::new(server))) + } + "seed" => { + input.parse::()?; + let path: syn::Path = input.parse()?; + Ok(AttrItem::Seed(path)) + } + _ => Err(syn::Error::new( + ident.span(), + format!("unknown attribute: {ident_str}"), + )), + } + } +} + +fn parse_transport_value(input: ParseStream) -> syn::Result> { + if input.peek(syn::token::Bracket) { + let content; + bracketed!(content in input); + let idents: Punctuated = Punctuated::parse_terminated(&content)?; + idents.into_iter().map(parse_transport_ident).collect() + } else { + let ident: Ident = input.parse()?; + Ok(vec![parse_transport_ident(ident)?]) + } +} + +fn parse_cluster_nodes_value(input: ParseStream) -> syn::Result { + if input.peek(syn::token::Bracket) { + let content; + bracketed!(content in input); + let values: Punctuated = Punctuated::parse_terminated(&content)?; + let nodes: Result, _> = values + .into_iter() + .map(|lit| lit.base10_parse::()) + .collect(); + Ok(ClusterNodesValue::Matrix(nodes?)) + } else { + let lit: syn::LitInt = input.parse()?; + let n = lit.base10_parse::()?; + Ok(ClusterNodesValue::Single(n)) + } +} + +fn parse_transport_ident(ident: Ident) -> syn::Result { + match ident.to_string().as_str() { + "Tcp" => Ok(Transport::Tcp), + "TcpTlsSelfSigned" => Ok(Transport::TcpTlsSelfSigned), + "TcpTlsGenerated" => Ok(Transport::TcpTlsGenerated), + "Http" => Ok(Transport::Http), + "Quic" => Ok(Transport::Quic), + "WebSocket" => Ok(Transport::WebSocket), + "WebSocketTlsSelfSigned" => Ok(Transport::WebSocketTlsSelfSigned), + "WebSocketTlsGenerated" => Ok(Transport::WebSocketTlsGenerated), + other => Err(syn::Error::new( + ident.span(), + format!("unknown transport: {other}"), + )), + } +} + +/// Parses a dot-notation config key like `segment.size` or `partition.messages_required_to_save`. +fn parse_config_key(input: ParseStream) -> syn::Result<(String, Span)> { + let first: Ident = input.parse()?; + let span = first.span(); + let mut path = first.to_string(); + + while input.peek(Token![.]) { + input.parse::()?; + let next: Ident = input.parse()?; + path.push('.'); + path.push_str(&next.to_string()); + } + + Ok((path, span)) +} + +fn parse_tls_value(input: ParseStream, span: Span) -> syn::Result { + let lit: Ident = input.parse()?; + let mode = match lit.to_string().as_str() { + "self_signed" | "SelfSigned" => TlsMode::SelfSigned, + "generated" | "Generated" => TlsMode::Generated, + other => { + return Err(syn::Error::new( + span, + format!("unknown tls mode: {other}, expected 'self_signed' or 'generated'"), + )); + } + }; + Ok(TlsConfig { mode }) +} + +fn parse_server_attrs(input: ParseStream) -> syn::Result { + let mut server = ServerAttrs::default(); + + while !input.is_empty() { + let (key, span) = parse_config_key(input)?; + + match key.as_str() { + "mcp" => { + let mcp = if input.peek(syn::token::Paren) { + let content; + parenthesized!(content in input); + parse_mcp_attrs(&content)? + } else { + McpAttrs::default() + }; + server.mcp = Some(mcp); + } + "connectors_runtime" => { + let attrs = if input.peek(syn::token::Paren) { + let content; + parenthesized!(content in input); + parse_connectors_runtime_attrs(&content)? + } else { + ConnectorsRuntimeAttrs::default() + }; + server.connectors_runtime = Some(attrs); + } + "tls" => { + input.parse::()?; + server.tls = Some(parse_tls_value(input, span)?); + } + "websocket_tls" => { + input.parse::()?; + server.websocket_tls = Some(parse_tls_value(input, span)?); + } + _ => { + input.parse::()?; + let value = parse_config_value(input)?; + server + .config_overrides + .push(ConfigOverride { path: key, value }); + } + } + + if !input.is_empty() { + input.parse::()?; + } + } + + Ok(server) +} + +fn parse_config_value(input: ParseStream) -> syn::Result { + if input.peek(syn::token::Bracket) { + let content; + bracketed!(content in input); + let values: Punctuated = Punctuated::parse_terminated(&content)?; + Ok(ConfigValue::Matrix( + values.into_iter().map(|v| v.to_string_value()).collect(), + )) + } else if input.peek(LitStr) { + let lit: LitStr = input.parse()?; + Ok(ConfigValue::Static(lit.value())) + } else if input.peek(syn::LitBool) { + let lit: syn::LitBool = input.parse()?; + Ok(ConfigValue::Static(lit.value.to_string())) + } else if input.peek(syn::LitInt) { + let lit: syn::LitInt = input.parse()?; + Ok(ConfigValue::Static(lit.base10_digits().to_string())) + } else { + Err(input.error("expected string literal, bool, int, or array")) + } +} + +fn parse_mcp_attrs(input: ParseStream) -> syn::Result { + let mut mcp = McpAttrs::default(); + + let items: Punctuated = Punctuated::parse_terminated(input)?; + + for item in items { + match item.key.as_str() { + "consumer_name" => mcp.consumer_name = Some(item.value), + "http_path" => mcp.http_path = Some(item.value), + other => { + return Err(syn::Error::new( + Span::call_site(), + format!("unknown mcp attribute: {other}"), + )); + } + } + } + + Ok(mcp) +} + +fn parse_connectors_runtime_attrs(input: ParseStream) -> syn::Result { + let mut attrs = ConnectorsRuntimeAttrs::default(); + + let items: Punctuated = Punctuated::parse_terminated(input)?; + + for item in items { + match item.key.as_str() { + "config_path" => attrs.config_path = Some(item.value), + other => { + return Err(syn::Error::new( + Span::call_site(), + format!("unknown connectors_runtime attribute: {other}"), + )); + } + } + } + + Ok(attrs) +} + +struct KeyValueAttrItem { + key: String, + value: String, +} + +impl Parse for KeyValueAttrItem { + fn parse(input: ParseStream) -> syn::Result { + let ident: Ident = input.parse()?; + input.parse::()?; + let lit: LitStr = input.parse()?; + + Ok(KeyValueAttrItem { + key: ident.to_string(), + value: lit.value(), + }) + } +} + +/// A literal that can be a string or an integer. +enum ArrayLiteral { + Str(LitStr), + Int(syn::LitInt), +} + +impl Parse for ArrayLiteral { + fn parse(input: ParseStream) -> syn::Result { + if input.peek(LitStr) { + Ok(ArrayLiteral::Str(input.parse()?)) + } else if input.peek(syn::LitInt) { + Ok(ArrayLiteral::Int(input.parse()?)) + } else { + Err(input.error("expected string or integer literal")) + } + } +} + +impl ArrayLiteral { + fn to_string_value(&self) -> String { + match self { + ArrayLiteral::Str(s) => s.value(), + ArrayLiteral::Int(i) => i.base10_digits().to_string(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_empty() { + let attrs: IggyTestAttrs = syn::parse_quote!(); + assert_eq!(attrs.transports.len(), 1); + assert_eq!(attrs.transports[0], Transport::Tcp); + } + + #[test] + fn parse_single_transport() { + let attrs: IggyTestAttrs = syn::parse_quote!(test_client_transport = Http); + assert_eq!(attrs.transports.len(), 1); + assert_eq!(attrs.transports[0], Transport::Http); + } + + #[test] + fn parse_transport_array() { + let attrs: IggyTestAttrs = syn::parse_quote!(test_client_transport = [Tcp, Http, Quic]); + assert_eq!(attrs.transports.len(), 3); + assert_eq!(attrs.transports[0], Transport::Tcp); + assert_eq!(attrs.transports[1], Transport::Http); + assert_eq!(attrs.transports[2], Transport::Quic); + } + + #[test] + fn parse_server_static() { + let attrs: IggyTestAttrs = syn::parse_quote!(server(segment.size = "1MiB")); + let segment_size = attrs.server.find_override("segment.size").unwrap(); + assert!(matches!(&segment_size.value, ConfigValue::Static(s) if s == "1MiB")); + } + + #[test] + fn parse_server_matrix() { + let attrs: IggyTestAttrs = syn::parse_quote!(server(segment.size = ["512B", "1MiB"])); + let segment_size = attrs.server.find_override("segment.size").unwrap(); + assert!(matches!(&segment_size.value, ConfigValue::Matrix(v) if v.len() == 2)); + } + + #[test] + fn parse_full() { + let attrs: IggyTestAttrs = syn::parse_quote!( + test_client_transport = [Tcp, Http], + server( + segment.size = ["512B", "1MiB"], + segment.cache_indexes = "none", + tcp.socket.nodelay = true + ) + ); + assert_eq!(attrs.transports.len(), 2); + let segment_size = attrs.server.find_override("segment.size").unwrap(); + let cache_indexes = attrs.server.find_override("segment.cache_indexes").unwrap(); + let tcp_nodelay = attrs.server.find_override("tcp.socket.nodelay").unwrap(); + assert!(matches!(&segment_size.value, ConfigValue::Matrix(v) if v.len() == 2)); + assert!(matches!(&cache_indexes.value, ConfigValue::Static(s) if s == "none")); + assert!(matches!(&tcp_nodelay.value, ConfigValue::Static(s) if s == "true")); + } + + #[test] + fn parse_tls_transports() { + let attrs: IggyTestAttrs = + syn::parse_quote!(test_client_transport = [TcpTlsSelfSigned, TcpTlsGenerated]); + assert_eq!(attrs.transports.len(), 2); + assert_eq!(attrs.transports[0], Transport::TcpTlsSelfSigned); + assert_eq!(attrs.transports[1], Transport::TcpTlsGenerated); + } + + #[test] + fn parse_mcp_empty() { + let attrs: IggyTestAttrs = syn::parse_quote!(server(mcp)); + assert!(attrs.server.mcp.is_some()); + let mcp = attrs.server.mcp.unwrap(); + assert!(mcp.consumer_name.is_none()); + assert!(mcp.http_path.is_none()); + } + + #[test] + fn parse_mcp_with_consumer() { + let attrs: IggyTestAttrs = syn::parse_quote!(server(mcp(consumer_name = "test-consumer"))); + assert!(attrs.server.mcp.is_some()); + let mcp = attrs.server.mcp.unwrap(); + assert_eq!(mcp.consumer_name, Some("test-consumer".to_string())); + } + + #[test] + fn parse_mcp_with_http_path() { + let attrs: IggyTestAttrs = + syn::parse_quote!(server(mcp(consumer_name = "test", http_path = "/custom"))); + assert!(attrs.server.mcp.is_some()); + let mcp = attrs.server.mcp.unwrap(); + assert_eq!(mcp.consumer_name, Some("test".to_string())); + assert_eq!(mcp.http_path, Some("/custom".to_string())); + } + + #[test] + fn parse_seed() { + let attrs: IggyTestAttrs = syn::parse_quote!(seed = my_seed_fn); + assert!(attrs.seed_fn.is_some()); + } + + #[test] + fn parse_mcp_with_seed() { + let attrs: IggyTestAttrs = syn::parse_quote!(server(mcp), seed = crate::seeds::standard); + assert!(attrs.server.mcp.is_some()); + assert!(attrs.seed_fn.is_some()); + } + + #[test] + fn parse_mcp_combined() { + let attrs: IggyTestAttrs = + syn::parse_quote!(seed = my_seed, server(mcp, segment.size = "1MiB")); + assert!(attrs.server.mcp.is_some()); + assert!(attrs.seed_fn.is_some()); + let segment_size = attrs.server.find_override("segment.size").unwrap(); + assert!(matches!(&segment_size.value, ConfigValue::Static(s) if s == "1MiB")); + } + + #[test] + fn parse_cluster_enabled() { + let attrs: IggyTestAttrs = syn::parse_quote!(server(cluster.enabled = true)); + let cluster_enabled = attrs.server.find_override("cluster.enabled").unwrap(); + assert!(matches!(&cluster_enabled.value, ConfigValue::Static(s) if s == "true")); + } + + #[test] + fn parse_cluster_enabled_false() { + let attrs: IggyTestAttrs = syn::parse_quote!(server(cluster.enabled = false)); + let cluster_enabled = attrs.server.find_override("cluster.enabled").unwrap(); + assert!(matches!(&cluster_enabled.value, ConfigValue::Static(s) if s == "false")); + } + + #[test] + fn parse_cluster_enabled_with_mcp() { + let attrs: IggyTestAttrs = syn::parse_quote!(server(cluster.enabled = true, mcp)); + let cluster_enabled = attrs.server.find_override("cluster.enabled").unwrap(); + assert!(matches!(&cluster_enabled.value, ConfigValue::Static(s) if s == "true")); + assert!(attrs.server.mcp.is_some()); + } + + #[test] + fn parse_dot_notation_deep() { + let attrs: IggyTestAttrs = syn::parse_quote!(server( + partition.messages_required_to_save = [32, 64], + system.encryption.enabled = true + )); + assert_eq!(attrs.server.config_overrides.len(), 2); + let msgs = attrs + .server + .find_override("partition.messages_required_to_save") + .unwrap(); + assert!(matches!(&msgs.value, ConfigValue::Matrix(v) if v.len() == 2)); + } + + #[test] + fn parse_tls_self_signed() { + let attrs: IggyTestAttrs = syn::parse_quote!(server(tls = self_signed)); + assert!(attrs.server.tls.is_some()); + assert_eq!(attrs.server.tls.unwrap().mode, TlsMode::SelfSigned); + } + + #[test] + fn parse_tls_generated() { + let attrs: IggyTestAttrs = syn::parse_quote!(server(tls = generated)); + assert!(attrs.server.tls.is_some()); + assert_eq!(attrs.server.tls.unwrap().mode, TlsMode::Generated); + } + + #[test] + fn parse_websocket_tls() { + let attrs: IggyTestAttrs = syn::parse_quote!(server(websocket_tls = generated)); + assert!(attrs.server.websocket_tls.is_some()); + assert_eq!(attrs.server.websocket_tls.unwrap().mode, TlsMode::Generated); + } + + #[test] + fn parse_cluster_nodes_single() { + let attrs: IggyTestAttrs = syn::parse_quote!(cluster_nodes = 3); + assert!(matches!(attrs.cluster_nodes, ClusterNodesValue::Single(3))); + } + + #[test] + fn parse_cluster_nodes_matrix() { + let attrs: IggyTestAttrs = syn::parse_quote!(cluster_nodes = [3, 5]); + assert!(matches!(&attrs.cluster_nodes, ClusterNodesValue::Matrix(v) if v == &[3, 5])); + } + + #[test] + fn parse_cluster_nodes_with_transport() { + let attrs: IggyTestAttrs = + syn::parse_quote!(cluster_nodes = 3, test_client_transport = Tcp); + assert!(matches!(attrs.cluster_nodes, ClusterNodesValue::Single(3))); + assert_eq!(attrs.transports.len(), 1); + assert_eq!(attrs.transports[0], Transport::Tcp); + } + + #[test] + fn parse_cluster_nodes_full_matrix() { + let attrs: IggyTestAttrs = syn::parse_quote!( + cluster_nodes = [3, 5], + test_client_transport = [Tcp, Http], + server(segment.size = ["512B", "1MiB"]) + ); + assert!(matches!(&attrs.cluster_nodes, ClusterNodesValue::Matrix(v) if v == &[3, 5])); + assert_eq!(attrs.transports.len(), 2); + let segment_size = attrs.server.find_override("segment.size").unwrap(); + assert!(matches!(&segment_size.value, ConfigValue::Matrix(v) if v.len() == 2)); + } +} diff --git a/core/harness_derive/src/codegen.rs b/core/harness_derive/src/codegen.rs new file mode 100644 index 000000000..2d445c942 --- /dev/null +++ b/core/harness_derive/src/codegen.rs @@ -0,0 +1,820 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::attrs::{ConfigOverride, IggyTestAttrs, TlsMode, Transport}; +use crate::params::{ + DetectedParam, analyze_signature, fixture_params, matrix_params, needs_fixtures, +}; +use proc_macro2::{Span, TokenStream}; +use quote::{format_ident, quote}; +use syn::{Ident, ItemFn}; + +/// Represents a single test variant configuration. +#[derive(Debug)] +struct TestVariant { + transport: Transport, + /// Whether transport was explicitly specified in attributes. + transport_explicit: bool, + /// Generic config overrides as (path, value) pairs. + config_values: Vec<(String, String)>, + /// TLS config from transport (not from server attrs). + tls: Option, + /// WebSocket TLS from transport (not from server attrs). + websocket_tls: Option, + /// Cluster node count (None = single server). + cluster_nodes: Option, +} + +impl TestVariant { + fn suffix(&self) -> String { + let mut parts = Vec::new(); + + if self.transport_explicit { + parts.push(self.transport.as_str().to_string()); + } + + if let Some(n) = self.cluster_nodes { + parts.push(format!("cluster_{n}")); + } + + for (path, value) in &self.config_values { + let key = path.replace('.', "_"); + parts.push(format!("{}_{}", key, sanitize_value(value))); + } + + parts.join("_") + } +} + +fn sanitize_value(s: &str) -> String { + s.to_lowercase() + .chars() + .filter(|c| c.is_alphanumeric() || *c == '_') + .collect() +} + +/// Compute cartesian product of config override variants. +fn cartesian_product(overrides: &[ConfigOverride]) -> Vec> { + if overrides.is_empty() { + return vec![vec![]]; + } + + let mut result = vec![vec![]]; + + for config_override in overrides { + let variants = config_override.value.variants(); + let mut new_result = Vec::new(); + + for existing in &result { + for variant_value in &variants { + let mut new_combo = existing.clone(); + if let Some(val) = variant_value { + new_combo.push((config_override.path.clone(), val.to_string())); + } + new_result.push(new_combo); + } + } + + result = new_result; + } + + result +} + +/// Generate all infrastructure variants from attributes. +fn generate_variants(attrs: &IggyTestAttrs) -> Vec { + let config_combos = cartesian_product(&attrs.server.config_overrides); + let cluster_variants = attrs.cluster_nodes.variants(); + + let mut variants = Vec::new(); + + for transport in &attrs.transports { + let tls = transport.tls_mode(); + let websocket_tls = if transport.is_websocket() { tls } else { None }; + let tcp_tls = if transport.is_websocket() { None } else { tls }; + + for cluster_nodes in &cluster_variants { + for combo in &config_combos { + variants.push(TestVariant { + transport: *transport, + transport_explicit: attrs.transport_explicit, + config_values: combo.clone(), + tls: tcp_tls, + websocket_tls, + cluster_nodes: *cluster_nodes, + }); + } + } + } + + variants +} + +/// Generate test code from attributes and input function. +pub fn generate_tests(attrs: &IggyTestAttrs, input: &ItemFn) -> syn::Result { + let fn_name = &input.sig.ident; + let fn_vis = &input.vis; + let fn_body = &input.block; + + let other_attrs: Vec<_> = input + .attrs + .iter() + .filter(|attr| !attr.path().is_ident("iggy_harness")) + .collect(); + + let params = analyze_signature(&input.sig)?; + let matrix_params_list = matrix_params(¶ms); + let has_matrix_params = !matrix_params_list.is_empty(); + + let variants = generate_variants(attrs); + + if variants.len() == 1 && !has_matrix_params { + return generate_single_test( + fn_name, + fn_vis, + fn_body, + &other_attrs, + ¶ms, + &variants[0], + attrs, + ); + } + + if has_matrix_params { + return generate_impl_functions_for_test_matrix( + fn_name, fn_vis, fn_body, input, ¶ms, &variants, attrs, + ); + } + + generate_test_module( + fn_name, + fn_vis, + fn_body, + &other_attrs, + ¶ms, + &variants, + attrs, + ) +} + +fn generate_single_test( + fn_name: &Ident, + fn_vis: &syn::Visibility, + fn_body: &syn::Block, + other_attrs: &[&syn::Attribute], + params: &[DetectedParam], + variant: &TestVariant, + attrs: &IggyTestAttrs, +) -> syn::Result { + let has_fixtures = needs_fixtures(params); + let fixture_setup = generate_fixture_setup(params); + let fixture_envs = generate_fixture_envs_collection(params); + let harness_setup = generate_harness_setup(variant, has_fixtures, attrs); + let fixture_seed = generate_fixture_seed(params); + let start_and_seed = generate_start_and_seed(attrs, fixture_seed); + let param_bindings = generate_param_bindings(params); + + Ok(quote! { + #(#other_attrs)* + #[::tokio::test] + #[::serial_test::parallel] + #fn_vis async fn #fn_name() { + #fixture_setup + #fixture_envs + #harness_setup + #start_and_seed + #param_bindings + #fn_body + } + }) +} + +fn generate_test_module( + fn_name: &Ident, + fn_vis: &syn::Visibility, + fn_body: &syn::Block, + other_attrs: &[&syn::Attribute], + params: &[DetectedParam], + variants: &[TestVariant], + attrs: &IggyTestAttrs, +) -> syn::Result { + let has_fixtures = needs_fixtures(params); + let fixture_setup = generate_fixture_setup(params); + let fixture_envs = generate_fixture_envs_collection(params); + let fixture_seed = generate_fixture_seed(params); + let param_bindings = generate_param_bindings(params); + + let mut test_fns = Vec::new(); + + for variant in variants { + let test_name = format_ident!("{}", variant.suffix()); + let harness_setup = generate_harness_setup(variant, has_fixtures, attrs); + let start_and_seed = generate_start_and_seed(attrs, fixture_seed.clone()); + + test_fns.push(quote! { + #(#other_attrs)* + #[::tokio::test] + #[::serial_test::parallel] + async fn #test_name() { + #fixture_setup + #fixture_envs + #harness_setup + #start_and_seed + #param_bindings + #fn_body + } + }); + } + + Ok(quote! { + #fn_vis mod #fn_name { + use super::*; + #(#test_fns)* + } + }) +} + +fn generate_impl_functions_for_test_matrix( + fn_name: &Ident, + fn_vis: &syn::Visibility, + fn_body: &syn::Block, + input: &ItemFn, + params: &[DetectedParam], + variants: &[TestVariant], + attrs: &IggyTestAttrs, +) -> syn::Result { + let matrix_params_list: Vec<_> = params + .iter() + .filter_map(|p| { + if let DetectedParam::MatrixParam { name, ty } = p { + Some((name, ty)) + } else { + None + } + }) + .collect(); + + let param_names: Vec<_> = matrix_params_list.iter().map(|(name, _)| *name).collect(); + let param_types: Vec<_> = matrix_params_list.iter().map(|(_, ty)| *ty).collect(); + + let other_attrs: Vec<_> = input + .attrs + .iter() + .filter(|attr| { + let path = attr.path(); + !path.is_ident("iggy_harness") + }) + .collect(); + + let has_fixtures = needs_fixtures(params); + let fixture_setup = generate_fixture_setup(params); + let fixture_envs = generate_fixture_envs_collection(params); + let fixture_seed = generate_fixture_seed(params); + let param_bindings = generate_param_bindings(params); + let start_and_seed = generate_start_and_seed(attrs, fixture_seed.clone()); + + if variants.len() == 1 { + let variant = &variants[0]; + let harness_setup = generate_harness_setup(variant, has_fixtures, attrs); + + return Ok(quote! { + #(#other_attrs)* + #[::tokio::test] + #[::serial_test::parallel] + #fn_vis async fn #fn_name(#(#param_names: #param_types),*) { + #fixture_setup + #fixture_envs + #harness_setup + #start_and_seed + #param_bindings + #fn_body + } + }); + } + + let mut impl_fns = Vec::new(); + let mut test_fn_calls = Vec::new(); + + for variant in variants { + let impl_name = format_ident!("__impl_{}", variant.suffix()); + let harness_setup = generate_harness_setup(variant, has_fixtures, attrs); + + impl_fns.push(quote! { + async fn #impl_name(#(#param_names: #param_types),*) { + #fixture_setup + #fixture_envs + #harness_setup + #start_and_seed + #param_bindings + #fn_body + } + }); + + let test_name = format_ident!("{}", variant.suffix()); + test_fn_calls.push(quote! { + #(#other_attrs)* + #[::tokio::test] + #[::serial_test::parallel] + async fn #test_name(#(#param_names: #param_types),*) { + #impl_name(#(#param_names),*).await; + } + }); + } + + Ok(quote! { + #fn_vis mod #fn_name { + use super::*; + #(#impl_fns)* + #(#test_fn_calls)* + } + }) +} + +fn generate_tls_config_token(mode: TlsMode) -> TokenStream { + match mode { + TlsMode::SelfSigned => quote!(::integration::harness::TlsConfig::self_signed()), + TlsMode::Generated => quote!(::integration::harness::TlsConfig::generated()), + } +} + +fn generate_harness_setup( + variant: &TestVariant, + has_fixtures: bool, + attrs: &IggyTestAttrs, +) -> TokenStream { + let transport = variant.transport.variant_ident(); + + // Build config entries for runtime validation + let config_entries: Vec<_> = variant + .config_values + .iter() + .map(|(path, value)| quote!((#path.to_string(), #value.to_string()))) + .collect(); + + let has_config_overrides = !config_entries.is_empty(); + + // Generate the config override resolution + let config_resolution = if has_config_overrides { + quote! { + let __config_overrides: ::std::collections::HashMap = + [#(#config_entries),*].into_iter().collect(); + let __extra_envs = ::integration::harness::resolve_config_paths(&__config_overrides) + .unwrap_or_else(|e| panic!("invalid config path in #[iggy_harness]:\n{}", e)); + } + } else { + quote! { + let __extra_envs = ::std::collections::HashMap::::new(); + } + }; + + // Build server config with TLS from transport and explicit attrs + let mut server_builder_calls = Vec::new(); + + // TLS config: transport takes precedence over explicit server attrs + if let Some(mode) = variant.tls { + let tls_config = generate_tls_config_token(mode); + server_builder_calls.push(quote!(.tls(#tls_config))); + } else if let Some(ref tls) = attrs.server.tls { + let tls_config = generate_tls_config_token(tls.mode); + server_builder_calls.push(quote!(.tls(#tls_config))); + } + + if let Some(mode) = variant.websocket_tls { + let tls_config = generate_tls_config_token(mode); + server_builder_calls.push(quote!(.websocket_tls(#tls_config))); + } else if let Some(ref tls) = attrs.server.websocket_tls { + let tls_config = generate_tls_config_token(tls.mode); + server_builder_calls.push(quote!(.websocket_tls(#tls_config))); + } + + // Always add extra_envs (may be empty) + server_builder_calls.push(quote!(.extra_envs(__extra_envs))); + + let server_config = quote! { + ::integration::harness::TestServerConfig::builder() + #(#server_builder_calls)* + .build() + }; + + // Configure primary client with TLS based on transport + let tls_mode = variant.tls.or(variant.websocket_tls); + let client_config_method = + Ident::new(variant.transport.client_config_method(), Span::call_site()); + let client_config = match tls_mode { + Some(TlsMode::Generated) => { + quote! { + ::integration::harness::ClientConfig::#client_config_method() + .with_tls("localhost".to_string(), None, true) + } + } + Some(TlsMode::SelfSigned) => { + quote! { + ::integration::harness::ClientConfig::#client_config_method() + .with_tls("localhost".to_string(), None, false) + } + } + None => { + quote!(::integration::harness::ClientConfig::#client_config_method()) + } + }; + + let mcp_builder_call = if let Some(ref mcp_attrs) = attrs.server.mcp { + if let Some(ref consumer) = mcp_attrs.consumer_name { + quote!(.mcp(::integration::harness::McpConfig::builder() + .consumer_name(#consumer) + .build())) + } else { + quote!(.default_mcp()) + } + } else { + quote!() + }; + + let connectors_runtime_builder_call = if let Some(ref runtime_attrs) = + attrs.server.connectors_runtime + { + let config_path = runtime_attrs + .config_path + .as_deref() + .unwrap_or("connectors/config.toml"); + if has_fixtures { + quote!(.connectors_runtime(::integration::harness::ConnectorsRuntimeConfig::builder() + .config_path(::std::path::PathBuf::from(#config_path)) + .extra_envs(__fixture_envs.clone()) + .build())) + } else { + quote!(.connectors_runtime(::integration::harness::ConnectorsRuntimeConfig::builder() + .config_path(::std::path::PathBuf::from(#config_path)) + .build())) + } + } else { + quote!() + }; + + let cluster_builder_call = if let Some(n) = variant.cluster_nodes { + quote!(.cluster_nodes(#n)) + } else { + quote!() + }; + + quote! { + #config_resolution + let mut __harness = ::integration::harness::TestHarness::builder() + .server(#server_config) + .primary_client(#client_config) + #mcp_builder_call + #connectors_runtime_builder_call + #cluster_builder_call + .build() + .unwrap_or_else(|e| panic!("failed to build test harness: {e}")); + let _ = ::integration::__macro_support::TransportProtocol::#transport; + } +} + +/// Generate the harness start call and seed handling. +/// +/// When a seed function is present, uses `start_with_seed` to run seed +/// after server but before MCP and connectors runtime (which may depend on seed data). +/// Fixture seeds are combined with the global seed. +fn generate_start_and_seed(attrs: &IggyTestAttrs, fixture_seed: TokenStream) -> TokenStream { + let has_fixture_seed = !fixture_seed.is_empty(); + match (&attrs.seed_fn, has_fixture_seed) { + (Some(seed_fn), true) => { + quote! { + __harness.start_with_seed(|__seed_client| async move { + #seed_fn(&__seed_client).await?; + #fixture_seed + Ok(()) + }).await.unwrap_or_else(|e| panic!("failed to start test harness: {e}")); + } + } + (Some(seed_fn), false) => { + quote! { + __harness.start_with_seed(|__seed_client| async move { + #seed_fn(&__seed_client).await + }).await.unwrap_or_else(|e| panic!("failed to start test harness: {e}")); + } + } + (None, true) => { + quote! { + __harness.start_with_seed(|__seed_client| async move { + #fixture_seed + Ok(()) + }).await.unwrap_or_else(|e| panic!("failed to start test harness: {e}")); + } + } + (None, false) => { + quote! { + __harness.start().await.unwrap_or_else(|e| panic!("failed to start test harness: {e}")); + } + } + } +} + +/// Generate fixture setup calls (before harness setup). +fn generate_fixture_setup(params: &[DetectedParam]) -> TokenStream { + let fixtures = fixture_params(params); + if fixtures.is_empty() { + return quote!(); + } + + let setup_calls: Vec<_> = fixtures + .iter() + .filter_map(|p| { + if let DetectedParam::Fixture { name, ty } = p { + let var_name = format_ident!("__fixture_{}", name); + Some(quote! { + let #var_name = <#ty as ::integration::harness::TestFixture>::setup() + .await + .expect("failed to setup fixture"); + }) + } else { + None + } + }) + .collect(); + + quote!(#(#setup_calls)*) +} + +/// Generate fixture envs collection (after fixture setup, before harness). +fn generate_fixture_envs_collection(params: &[DetectedParam]) -> TokenStream { + let fixtures = fixture_params(params); + if fixtures.is_empty() { + return quote!(); + } + + let env_calls: Vec<_> = fixtures + .iter() + .filter_map(|p| { + if let DetectedParam::Fixture { name, .. } = p { + let var_name = format_ident!("__fixture_{}", name); + Some(quote! { + __fixture_envs.extend( + ::integration::harness::TestFixture::connectors_runtime_envs(&#var_name) + ); + }) + } else { + None + } + }) + .collect(); + + quote! { + let mut __fixture_envs = ::std::collections::HashMap::::new(); + #(#env_calls)* + } +} + +/// Generate fixture seed calls (inside start_with_seed closure). +/// +/// Note: Currently disabled to avoid move semantics issues with async closures. +/// Fixtures that need to seed data should do so in the test body after harness start. +fn generate_fixture_seed(_params: &[DetectedParam]) -> TokenStream { + // Fixture seeding is disabled for now because: + // 1. The async move closure in start_with_seed captures the fixture by value + // 2. This prevents using the fixture in the test body after seeding + // 3. Most fixtures don't need to seed data - they just provide env vars + // + // If a fixture needs to seed data, it can be done manually in the test body: + // fixture.seed(&client).await.unwrap(); + quote!() +} + +fn generate_param_bindings(params: &[DetectedParam]) -> TokenStream { + let mut bindings = Vec::new(); + + for param in params { + match param { + DetectedParam::HarnessRef { name } => { + bindings.push(quote! { + let #name = &__harness; + }); + } + DetectedParam::HarnessMut { name } => { + bindings.push(quote! { + let #name = &mut __harness; + }); + } + DetectedParam::HarnessOwned { name } => { + bindings.push(quote! { + let #name = __harness; + }); + } + DetectedParam::HarnessOwnedMut { name } => { + bindings.push(quote! { + let mut #name = __harness; + }); + } + DetectedParam::Fixture { name, .. } => { + let fixture_var = format_ident!("__fixture_{}", name); + bindings.push(quote! { + let #name = #fixture_var; + }); + } + DetectedParam::MatrixParam { .. } => {} + } + } + + quote!(#(#bindings)*) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::attrs::{ConfigOverride, ConfigValue}; + + #[test] + fn variant_suffix_basic_explicit() { + let v = TestVariant { + transport: Transport::Tcp, + transport_explicit: true, + config_values: vec![], + tls: None, + websocket_tls: None, + cluster_nodes: None, + }; + assert_eq!(v.suffix(), "tcp"); + } + + #[test] + fn variant_suffix_implicit_transport() { + let v = TestVariant { + transport: Transport::Tcp, + transport_explicit: false, + config_values: vec![], + tls: None, + websocket_tls: None, + cluster_nodes: None, + }; + assert_eq!(v.suffix(), ""); + } + + #[test] + fn variant_suffix_implicit_transport_with_cluster() { + let v = TestVariant { + transport: Transport::Tcp, + transport_explicit: false, + config_values: vec![], + tls: None, + websocket_tls: None, + cluster_nodes: Some(3), + }; + assert_eq!(v.suffix(), "cluster_3"); + } + + #[test] + fn variant_suffix_full() { + let v = TestVariant { + transport: Transport::Http, + transport_explicit: true, + config_values: vec![ + ("segment.size".to_string(), "1MiB".to_string()), + ("segment.cache_indexes".to_string(), "all".to_string()), + ( + "partition.messages_required_to_save".to_string(), + "64".to_string(), + ), + ], + tls: None, + websocket_tls: None, + cluster_nodes: None, + }; + assert_eq!( + v.suffix(), + "http_segment_size_1mib_segment_cache_indexes_all_partition_messages_required_to_save_64" + ); + } + + #[test] + fn variant_suffix_with_cluster() { + let v = TestVariant { + transport: Transport::Tcp, + transport_explicit: true, + config_values: vec![], + tls: None, + websocket_tls: None, + cluster_nodes: Some(3), + }; + assert_eq!(v.suffix(), "tcp_cluster_3"); + } + + #[test] + fn variant_suffix_with_tls() { + let v = TestVariant { + transport: Transport::TcpTlsSelfSigned, + transport_explicit: true, + config_values: vec![], + tls: Some(TlsMode::SelfSigned), + websocket_tls: None, + cluster_nodes: None, + }; + assert_eq!(v.suffix(), "tcp_tls_self_signed"); + + let v = TestVariant { + transport: Transport::TcpTlsGenerated, + transport_explicit: true, + config_values: vec![], + tls: Some(TlsMode::Generated), + websocket_tls: None, + cluster_nodes: None, + }; + assert_eq!(v.suffix(), "tcp_tls_generated"); + } + + #[test] + fn generate_variants_simple() { + let attrs = IggyTestAttrs::with_transports(vec![Transport::Tcp]); + let variants = generate_variants(&attrs); + assert_eq!(variants.len(), 1); + assert_eq!(variants[0].transport, Transport::Tcp); + } + + #[test] + fn generate_variants_transport_matrix() { + let attrs = IggyTestAttrs::with_transports(vec![Transport::Tcp, Transport::Http]); + let variants = generate_variants(&attrs); + assert_eq!(variants.len(), 2); + } + + #[test] + fn generate_variants_full_matrix() { + let attrs = IggyTestAttrs { + transports: vec![Transport::Tcp, Transport::Http], + transport_explicit: true, + server: crate::attrs::ServerAttrs { + config_overrides: vec![ + ConfigOverride { + path: "segment.size".to_string(), + value: ConfigValue::Matrix(vec!["512B".to_string(), "1MiB".to_string()]), + }, + ConfigOverride { + path: "segment.cache_indexes".to_string(), + value: ConfigValue::Matrix(vec!["none".to_string(), "all".to_string()]), + }, + ], + ..Default::default() + }, + seed_fn: None, + cluster_nodes: crate::attrs::ClusterNodesValue::None, + }; + let variants = generate_variants(&attrs); + // 2 transports * 2 segment sizes * 2 cache modes = 8 variants + assert_eq!(variants.len(), 8); + } + + #[test] + fn cartesian_product_empty() { + let result = cartesian_product(&[]); + assert_eq!(result, vec![vec![]]); + } + + #[test] + fn cartesian_product_single() { + let overrides = vec![ConfigOverride { + path: "segment.size".to_string(), + value: ConfigValue::Matrix(vec!["512B".to_string(), "1MiB".to_string()]), + }]; + let result = cartesian_product(&overrides); + assert_eq!(result.len(), 2); + assert_eq!( + result[0], + vec![("segment.size".to_string(), "512B".to_string())] + ); + assert_eq!( + result[1], + vec![("segment.size".to_string(), "1MiB".to_string())] + ); + } + + #[test] + fn cartesian_product_multiple() { + let overrides = vec![ + ConfigOverride { + path: "a".to_string(), + value: ConfigValue::Matrix(vec!["1".to_string(), "2".to_string()]), + }, + ConfigOverride { + path: "b".to_string(), + value: ConfigValue::Matrix(vec!["x".to_string(), "y".to_string()]), + }, + ]; + let result = cartesian_product(&overrides); + assert_eq!(result.len(), 4); + } +} diff --git a/core/harness_derive/src/lib.rs b/core/harness_derive/src/lib.rs new file mode 100644 index 000000000..4f5517eba --- /dev/null +++ b/core/harness_derive/src/lib.rs @@ -0,0 +1,94 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +//! Procedural macro for ergonomic integration test definition. +//! +//! The `#[iggy_harness]` attribute replaces boilerplate harness setup with a declarative DSL. +//! +//! # Examples +//! +//! Simple test with default TCP transport: +//! ```ignore +//! #[iggy_harness] +//! async fn test_ping(client: &IggyClient) { +//! client.ping().await.unwrap(); +//! } +//! ``` +//! +//! Test with transport matrix: +//! ```ignore +//! #[iggy_harness(transport = [Tcp, Http, Quic, WebSocket])] +//! async fn test_all_transports(client: &IggyClient) { +//! client.ping().await.unwrap(); +//! } +//! ``` +//! +//! Test with server config matrix: +//! ```ignore +//! #[iggy_harness(server( +//! segment_size = ["512B", "1MiB"], +//! cache_indexes = ["none", "all"], +//! ))] +//! async fn test_caching(client: &IggyClient) { +//! // 2 segment sizes × 2 cache modes = 4 tests +//! } +//! ``` + +mod attrs; +mod codegen; +mod params; + +use proc_macro::TokenStream; +use syn::{ItemFn, parse_macro_input}; + +use attrs::IggyTestAttrs; +use codegen::generate_tests; + +/// Attribute macro for declaring Iggy integration tests. +/// +/// This macro handles test harness setup, client creation, and generates test variants +/// for infrastructure dimensions (transport, server config). +/// +/// # Attributes +/// +/// - `transport = [Tcp, Http, Quic, WebSocket]` - Generate variants for each transport +/// - `server(key = value)` - Static server config +/// - `server(key = [v1, v2])` - Generate variants for each config value +/// +/// # Special Parameters +/// +/// The test function can request special parameters: +/// - `client: &IggyClient` - Injected client connected to the test server +/// - `harness: &TestHarness` - Reference to the test harness (for data_path, etc.) +/// - `harness: &mut TestHarness` - Mutable reference (for restart_server, etc.) +/// +/// Any other parameters are passed through from `#[test_matrix]`. +#[proc_macro_attribute] +pub fn iggy_harness(attr: TokenStream, item: TokenStream) -> TokenStream { + let attrs = match syn::parse::(attr) { + Ok(a) => a, + Err(e) => return e.to_compile_error().into(), + }; + + let input = parse_macro_input!(item as ItemFn); + + match generate_tests(&attrs, &input) { + Ok(tokens) => tokens.into(), + Err(e) => e.to_compile_error().into(), + } +} diff --git a/core/harness_derive/src/params.rs b/core/harness_derive/src/params.rs new file mode 100644 index 000000000..b51c196a6 --- /dev/null +++ b/core/harness_derive/src/params.rs @@ -0,0 +1,257 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +//! Parameter detection for test function signatures. +//! +//! Detects special parameters like `harness: &TestHarness` and test fixtures, +//! and distinguishes them from user-defined test_matrix parameters. + +use proc_macro2::Span; +use syn::{FnArg, Ident, Pat, PatIdent, PatType, Signature, Type}; + +/// A detected parameter from the function signature. +#[derive(Debug)] +pub enum DetectedParam { + /// A harness reference: `harness: &TestHarness` + HarnessRef { name: Ident }, + /// A mutable harness reference: `harness: &mut TestHarness` + HarnessMut { name: Ident }, + /// An owned harness: `harness: TestHarness` + HarnessOwned { name: Ident }, + /// A mutable owned harness: `mut harness: TestHarness` + HarnessOwnedMut { name: Ident }, + /// A test fixture parameter: `fixture: PostgresSinkFixture` + Fixture { name: Ident, ty: Box }, + /// A parameter from test_matrix (passed through) + MatrixParam { name: Ident, ty: Box }, +} + +impl DetectedParam { + #[allow(dead_code)] + pub fn name(&self) -> &Ident { + match self { + DetectedParam::HarnessRef { name } + | DetectedParam::HarnessMut { name } + | DetectedParam::HarnessOwned { name } + | DetectedParam::HarnessOwnedMut { name } + | DetectedParam::Fixture { name, .. } + | DetectedParam::MatrixParam { name, .. } => name, + } + } +} + +/// Analyze a function signature and detect parameter types. +pub fn analyze_signature(sig: &Signature) -> syn::Result> { + let mut params = Vec::new(); + + for arg in &sig.inputs { + let FnArg::Typed(PatType { pat, ty, .. }) = arg else { + return Err(syn::Error::new( + Span::call_site(), + "self parameters not supported in test functions", + )); + }; + + let Pat::Ident(PatIdent { + ident, mutability, .. + }) = pat.as_ref() + else { + return Err(syn::Error::new( + Span::call_site(), + "pattern parameters not supported, use simple identifiers", + )); + }; + + let is_mut = mutability.is_some(); + let detected = detect_param_type(ident.clone(), ty, is_mut)?; + params.push(detected); + } + + Ok(params) +} + +fn detect_param_type(name: Ident, ty: &Type, is_mut: bool) -> syn::Result { + let type_str = quote::quote!(#ty).to_string(); + let normalized = type_str.replace(" ", ""); + + if is_harness_mut_ref_type(&normalized) { + return Ok(DetectedParam::HarnessMut { name }); + } + + if is_harness_ref_type(&normalized) { + return Ok(DetectedParam::HarnessRef { name }); + } + + if is_harness_owned_type(&normalized) { + return if is_mut { + Ok(DetectedParam::HarnessOwnedMut { name }) + } else { + Ok(DetectedParam::HarnessOwned { name }) + }; + } + + if is_fixture_type(&normalized) { + return Ok(DetectedParam::Fixture { + name, + ty: Box::new(ty.clone()), + }); + } + + Ok(DetectedParam::MatrixParam { + name, + ty: Box::new(ty.clone()), + }) +} + +fn is_harness_ref_type(normalized: &str) -> bool { + normalized.contains("TestHarness") + && normalized.starts_with("&") + && !normalized.contains("&mut") +} + +fn is_harness_mut_ref_type(normalized: &str) -> bool { + normalized.contains("TestHarness") && normalized.contains("&mut") +} + +fn is_harness_owned_type(normalized: &str) -> bool { + !normalized.starts_with('&') + && (normalized == "TestHarness" || normalized.ends_with("::TestHarness")) +} + +fn is_fixture_type(normalized: &str) -> bool { + normalized.ends_with("Fixture") || normalized.ends_with("Fixture>") +} + +/// Get the matrix parameters (those from test_matrix). +pub fn matrix_params(params: &[DetectedParam]) -> Vec<&DetectedParam> { + params + .iter() + .filter(|p| matches!(p, DetectedParam::MatrixParam { .. })) + .collect() +} + +/// Check if any parameter is a fixture. +pub fn needs_fixtures(params: &[DetectedParam]) -> bool { + params + .iter() + .any(|p| matches!(p, DetectedParam::Fixture { .. })) +} + +/// Get the fixture parameters. +pub fn fixture_params(params: &[DetectedParam]) -> Vec<&DetectedParam> { + params + .iter() + .filter(|p| matches!(p, DetectedParam::Fixture { .. })) + .collect() +} + +#[cfg(test)] +mod tests { + use super::*; + + fn parse_sig(s: &str) -> Signature { + let item: syn::ItemFn = syn::parse_str(&format!("{s} {{}}")).unwrap(); + item.sig + } + + #[test] + fn detect_harness_ref() { + let sig = parse_sig("async fn test(harness: &TestHarness)"); + let params = analyze_signature(&sig).unwrap(); + assert_eq!(params.len(), 1); + assert!(matches!(¶ms[0], DetectedParam::HarnessRef { name } if name == "harness")); + } + + #[test] + fn detect_harness_mut() { + let sig = parse_sig("async fn test(harness: &mut TestHarness)"); + let params = analyze_signature(&sig).unwrap(); + assert_eq!(params.len(), 1); + assert!(matches!(¶ms[0], DetectedParam::HarnessMut { name } if name == "harness")); + } + + #[test] + fn detect_harness_owned() { + let sig = parse_sig("async fn test(harness: TestHarness)"); + let params = analyze_signature(&sig).unwrap(); + assert_eq!(params.len(), 1); + assert!(matches!(¶ms[0], DetectedParam::HarnessOwned { name } if name == "harness")); + } + + #[test] + fn detect_harness_owned_mut() { + let sig = parse_sig("async fn test(mut harness: TestHarness)"); + let params = analyze_signature(&sig).unwrap(); + assert_eq!(params.len(), 1); + assert!(matches!(¶ms[0], DetectedParam::HarnessOwnedMut { name } if name == "harness")); + } + + #[test] + fn detect_matrix_param() { + let sig = parse_sig("async fn test(count: u32)"); + let params = analyze_signature(&sig).unwrap(); + assert_eq!(params.len(), 1); + assert!(matches!(¶ms[0], DetectedParam::MatrixParam { name, .. } if name == "count")); + } + + #[test] + fn detect_mixed_params() { + let sig = parse_sig("async fn test(count: u32, harness: &TestHarness)"); + let params = analyze_signature(&sig).unwrap(); + assert_eq!(params.len(), 2); + assert!(matches!(¶ms[0], DetectedParam::MatrixParam { .. })); + assert!(matches!(¶ms[1], DetectedParam::HarnessRef { .. })); + } + + #[test] + fn detect_fixture_param() { + let sig = parse_sig("async fn test(fixture: PostgresSinkFixture)"); + let params = analyze_signature(&sig).unwrap(); + assert_eq!(params.len(), 1); + assert!(matches!(¶ms[0], DetectedParam::Fixture { name, .. } if name == "fixture")); + } + + #[test] + fn detect_fixture_with_generic() { + let sig = parse_sig("async fn test(fixture: Box)"); + let params = analyze_signature(&sig).unwrap(); + assert_eq!(params.len(), 1); + assert!(matches!(¶ms[0], DetectedParam::Fixture { name, .. } if name == "fixture")); + } + + #[test] + fn needs_fixtures_works() { + let sig = parse_sig("async fn test(fixture: RandomSourceFixture)"); + let params = analyze_signature(&sig).unwrap(); + assert!(super::needs_fixtures(¶ms)); + + let sig = parse_sig("async fn test(harness: &TestHarness)"); + let params = analyze_signature(&sig).unwrap(); + assert!(!super::needs_fixtures(¶ms)); + } + + #[test] + fn fixture_params_works() { + let sig = parse_sig( + "async fn test(f1: PostgresFixture, harness: &TestHarness, f2: RandomSourceFixture)", + ); + let params = analyze_signature(&sig).unwrap(); + let fixtures = super::fixture_params(¶ms); + assert_eq!(fixtures.len(), 2); + } +} diff --git a/core/integration/.gitignore b/core/integration/.gitignore deleted file mode 100644 index fcf6a3240..000000000 --- a/core/integration/.gitignore +++ /dev/null @@ -1 +0,0 @@ -test_logs/ diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml index f0b743f5a..4805192ce 100644 --- a/core/integration/Cargo.toml +++ b/core/integration/Cargo.toml @@ -42,6 +42,7 @@ derive_more = { workspace = true } env_logger = { workspace = true } figment = { workspace = true } futures = { workspace = true } +harness_derive = { workspace = true } humantime = { workspace = true } iggy = { workspace = true } iggy_binary_protocol = { workspace = true } diff --git a/core/integration/src/harness/handle/client_builder.rs b/core/integration/src/harness/handle/client_builder.rs new file mode 100644 index 000000000..ef35b99e4 --- /dev/null +++ b/core/integration/src/harness/handle/client_builder.rs @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +//! Chainable client builder for test harness. +//! +//! Provides a fluent API for creating and configuring clients connected to test servers. +//! +//! # Examples +//! +//! ```ignore +//! // Simple unauthenticated client +//! let client = harness.server().tcp_client().connect().await?; +//! +//! // With root login +//! let client = harness.server().tcp_client().with_root_login().connect().await?; +//! +//! // Custom login +//! let client = harness.server().http_client() +//! .with_login("user", "pass") +//! .connect().await?; +//! ``` + +use crate::harness::config::{AutoLoginConfig, TlsConfig}; +use crate::harness::error::TestBinaryError; +use iggy::http::http_client::HttpClient; +use iggy::prelude::{ + Client, HttpClientConfig, IggyClient, QuicClientConfig, TcpClient, TcpClientConfig, UserClient, + WebSocketClientConfig, +}; +use iggy::quic::quic_client::QuicClient; +use iggy::websocket::websocket_client::WebSocketClient; +use iggy_common::TransportProtocol; +use std::net::SocketAddr; +use std::path::PathBuf; +use std::sync::Arc; + +/// Server connection details needed by `ClientBuilder`. +#[derive(Debug, Clone)] +pub struct ServerConnection { + pub tcp_addr: Option, + pub http_addr: Option, + pub quic_addr: Option, + pub websocket_addr: Option, + pub tls: Option, + pub websocket_tls: Option, + pub tls_ca_cert_path: Option, +} + +/// Chainable client builder. +/// +/// Created via `ServerHandle::tcp_client()`, `http_client()`, etc. +/// Configuration is chainable, connection happens on `connect()`. +#[must_use = "ClientBuilder does nothing until .connect() is called"] +pub struct ClientBuilder { + transport: TransportProtocol, + connection: ServerConnection, + auto_login: Option, + tcp_nodelay: bool, +} + +impl ClientBuilder { + pub(crate) fn new(transport: TransportProtocol, connection: ServerConnection) -> Self { + Self { + transport, + connection, + auto_login: None, + tcp_nodelay: false, + } + } + + /// Enable automatic login as root user after connection. + pub fn with_root_login(mut self) -> Self { + self.auto_login = Some(AutoLoginConfig::root()); + self + } + + /// Enable automatic login with custom credentials after connection. + pub fn with_login(mut self, username: impl Into, password: impl Into) -> Self { + self.auto_login = Some(AutoLoginConfig::new(username, password)); + self + } + + /// Enable TCP_NODELAY (only affects TCP transport). + pub fn with_nodelay(mut self) -> Self { + self.tcp_nodelay = true; + self + } + + /// Connect to the server and optionally perform auto-login. + pub async fn connect(self) -> Result { + let client = match self.transport { + TransportProtocol::Tcp => self.create_tcp_client().await?, + TransportProtocol::Http => self.create_http_client().await?, + TransportProtocol::Quic => self.create_quic_client().await?, + TransportProtocol::WebSocket => self.create_websocket_client().await?, + }; + + if let Some(ref login) = self.auto_login { + client + .login_user(&login.username, &login.password) + .await + .map_err(|e| TestBinaryError::ClientConnection { + transport: self.transport.to_string(), + address: self.get_address_string(), + source: format!("login failed: {e}"), + })?; + } + + Ok(client) + } + + async fn create_tcp_client(&self) -> Result { + let addr = self + .connection + .tcp_addr + .ok_or_else(|| TestBinaryError::InvalidState { + message: "TCP transport not available".to_string(), + })?; + + let tls_enabled = self.connection.tls.is_some(); + let tls_validate = self.connection.tls.as_ref().is_some_and(|t| !t.self_signed); + + let config = TcpClientConfig { + server_address: addr.to_string(), + nodelay: self.tcp_nodelay, + tls_enabled, + tls_domain: "localhost".to_string(), + tls_ca_file: self + .connection + .tls_ca_cert_path + .as_ref() + .map(|p| p.to_string_lossy().to_string()), + tls_validate_certificate: tls_validate, + ..TcpClientConfig::default() + }; + + let client = + TcpClient::create(Arc::new(config)).map_err(|e| TestBinaryError::ClientCreation { + transport: "TCP".to_string(), + address: addr.to_string(), + source: e.to_string(), + })?; + + Client::connect(&client) + .await + .map_err(|e| TestBinaryError::ClientConnection { + transport: "TCP".to_string(), + address: addr.to_string(), + source: e.to_string(), + })?; + + Ok(IggyClient::create( + iggy::prelude::ClientWrapper::Tcp(client), + None, + None, + )) + } + + async fn create_http_client(&self) -> Result { + let addr = self + .connection + .http_addr + .ok_or_else(|| TestBinaryError::InvalidState { + message: "HTTP transport not available".to_string(), + })?; + + let config = HttpClientConfig { + api_url: format!("http://{}", addr), + ..HttpClientConfig::default() + }; + + let client = + HttpClient::create(Arc::new(config)).map_err(|e| TestBinaryError::ClientCreation { + transport: "HTTP".to_string(), + address: addr.to_string(), + source: e.to_string(), + })?; + + Ok(IggyClient::create( + iggy::prelude::ClientWrapper::Http(client), + None, + None, + )) + } + + async fn create_quic_client(&self) -> Result { + let addr = self + .connection + .quic_addr + .ok_or_else(|| TestBinaryError::InvalidState { + message: "QUIC transport not available".to_string(), + })?; + + let config = QuicClientConfig { + server_address: addr.to_string(), + max_idle_timeout: 2_000_000, + ..QuicClientConfig::default() + }; + + let client = + QuicClient::create(Arc::new(config)).map_err(|e| TestBinaryError::ClientCreation { + transport: "QUIC".to_string(), + address: addr.to_string(), + source: e.to_string(), + })?; + + Client::connect(&client) + .await + .map_err(|e| TestBinaryError::ClientConnection { + transport: "QUIC".to_string(), + address: addr.to_string(), + source: e.to_string(), + })?; + + Ok(IggyClient::create( + iggy::prelude::ClientWrapper::Quic(client), + None, + None, + )) + } + + async fn create_websocket_client(&self) -> Result { + let addr = self + .connection + .websocket_addr + .ok_or_else(|| TestBinaryError::InvalidState { + message: "WebSocket transport not available".to_string(), + })?; + + let tls_enabled = self.connection.websocket_tls.is_some(); + let tls_validate = self + .connection + .websocket_tls + .as_ref() + .is_some_and(|t| !t.self_signed); + + let config = WebSocketClientConfig { + server_address: addr.to_string(), + tls_enabled, + tls_domain: "localhost".to_string(), + tls_ca_file: self + .connection + .tls_ca_cert_path + .as_ref() + .map(|p| p.to_string_lossy().to_string()), + tls_validate_certificate: tls_validate, + ..WebSocketClientConfig::default() + }; + + let client = WebSocketClient::create(Arc::new(config)).map_err(|e| { + TestBinaryError::ClientCreation { + transport: "WebSocket".to_string(), + address: addr.to_string(), + source: e.to_string(), + } + })?; + + Client::connect(&client) + .await + .map_err(|e| TestBinaryError::ClientConnection { + transport: "WebSocket".to_string(), + address: addr.to_string(), + source: e.to_string(), + })?; + + Ok(IggyClient::create( + iggy::prelude::ClientWrapper::WebSocket(client), + None, + None, + )) + } + + fn get_address_string(&self) -> String { + match self.transport { + TransportProtocol::Tcp => self + .connection + .tcp_addr + .map(|a| a.to_string()) + .unwrap_or_default(), + TransportProtocol::Http => self + .connection + .http_addr + .map(|a| a.to_string()) + .unwrap_or_default(), + TransportProtocol::Quic => self + .connection + .quic_addr + .map(|a| a.to_string()) + .unwrap_or_default(), + TransportProtocol::WebSocket => self + .connection + .websocket_addr + .map(|a| a.to_string()) + .unwrap_or_default(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn dummy_connection() -> ServerConnection { + ServerConnection { + tcp_addr: Some("127.0.0.1:8080".parse().unwrap()), + http_addr: Some("127.0.0.1:8081".parse().unwrap()), + quic_addr: Some("127.0.0.1:8082".parse().unwrap()), + websocket_addr: Some("127.0.0.1:8083".parse().unwrap()), + tls: None, + websocket_tls: None, + tls_ca_cert_path: None, + } + } + + #[test] + fn test_builder_chainable() { + let builder = ClientBuilder::new(TransportProtocol::Tcp, dummy_connection()) + .with_root_login() + .with_nodelay(); + + assert!(builder.auto_login.is_some()); + assert!(builder.tcp_nodelay); + } + + #[test] + fn test_with_login() { + let builder = ClientBuilder::new(TransportProtocol::Http, dummy_connection()) + .with_login("user", "pass"); + + let login = builder.auto_login.unwrap(); + assert_eq!(login.username, "user"); + assert_eq!(login.password, "pass"); + } +} diff --git a/core/integration/src/harness/handle/connectors_runtime.rs b/core/integration/src/harness/handle/connectors_runtime.rs index c414c78f0..941353886 100644 --- a/core/integration/src/harness/handle/connectors_runtime.rs +++ b/core/integration/src/harness/handle/connectors_runtime.rs @@ -178,6 +178,12 @@ impl TestBinary for ConnectorsRuntimeHandle { })?; self.child_handle = Some(child); + // Release port reservation immediately after spawn to avoid SO_REUSEPORT + // load-balancing conflicts during health checks. + if let Some(reserver) = self.port_reserver.take() { + reserver.release(); + } + Ok(()) } @@ -239,9 +245,6 @@ impl IggyServerDependent for ConnectorsRuntimeHandle { for retry in 0..common::DEFAULT_HEALTH_CHECK_RETRIES { match client.get(&http_address).send().await { Ok(_) => { - if let Some(reserver) = self.port_reserver.take() { - reserver.release(); - } return Ok(()); } Err(_) => { diff --git a/core/integration/src/harness/handle/mcp.rs b/core/integration/src/harness/handle/mcp.rs index 3f587e997..145d39ecd 100644 --- a/core/integration/src/harness/handle/mcp.rs +++ b/core/integration/src/harness/handle/mcp.rs @@ -204,6 +204,12 @@ impl TestBinary for McpHandle { })?; self.child_handle = Some(child); + // Release port reservation immediately after spawn to avoid SO_REUSEPORT + // load-balancing conflicts during health checks. + if let Some(reserver) = self.port_reserver.take() { + reserver.release(); + } + Ok(()) } @@ -252,9 +258,6 @@ impl IggyServerDependent for McpHandle { for retry in 0..MCP_HEALTH_CHECK_RETRIES { match client.get(&http_address).send().await { Ok(_) => { - if let Some(reserver) = self.port_reserver.take() { - reserver.release(); - } return Ok(()); } Err(_) => { diff --git a/core/integration/src/harness/handle/mod.rs b/core/integration/src/harness/handle/mod.rs index effa35b6c..d98a4dfc6 100644 --- a/core/integration/src/harness/handle/mod.rs +++ b/core/integration/src/harness/handle/mod.rs @@ -18,12 +18,14 @@ */ mod client; +mod client_builder; pub mod common; mod connectors_runtime; mod mcp; mod server; pub use client::ClientHandle; +pub use client_builder::ClientBuilder; pub use connectors_runtime::ConnectorsRuntimeHandle; pub use mcp::{McpClient, McpHandle}; pub use server::ServerHandle; diff --git a/core/integration/src/harness/handle/server.rs b/core/integration/src/harness/handle/server.rs index 02f6713f7..10a651496 100644 --- a/core/integration/src/harness/handle/server.rs +++ b/core/integration/src/harness/handle/server.rs @@ -17,14 +17,15 @@ * under the License. */ +use super::client_builder::{ClientBuilder, ServerConnection}; use crate::harness::config::{IpAddrKind, TestServerConfig}; use crate::harness::context::TestContext; use crate::harness::error::TestBinaryError; use crate::harness::port_reserver::PortReserver; use crate::harness::traits::{Restartable, TestBinary}; use assert_cmd::prelude::CommandCargoExt; -use iggy::prelude::DEFAULT_ROOT_PASSWORD; -use iggy::prelude::DEFAULT_ROOT_USERNAME; +use iggy::prelude::{DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME}; +use iggy_common::TransportProtocol; use rand::Rng as _; use std::collections::HashMap; use std::fs::{self, File, OpenOptions}; @@ -73,6 +74,7 @@ pub struct ServerHandle { watchdog_stop: Arc, generated_cert_dir: Option, port_reserver: Option, + test_transport: Option, } impl std::fmt::Debug for ServerHandle { @@ -146,6 +148,51 @@ impl ServerHandle { super::common::collect_logs(&self.stdout_path, &self.stderr_path) } + /// Returns a `ClientBuilder` using the test transport. + /// + /// Returns an error if no test transport is configured. + pub fn test_client(&self) -> Result { + let transport = self + .test_transport + .ok_or_else(|| TestBinaryError::InvalidState { + message: "No test transport configured".to_string(), + })?; + Ok(self.client_builder(transport)) + } + + /// Returns a TCP `ClientBuilder`. Call `.connect()` to create the client. + pub fn tcp_client(&self) -> Result { + Ok(self.client_builder(TransportProtocol::Tcp)) + } + + /// Returns an HTTP `ClientBuilder`. Call `.connect()` to create the client. + pub fn http_client(&self) -> Result { + Ok(self.client_builder(TransportProtocol::Http)) + } + + /// Returns a QUIC `ClientBuilder`. Call `.connect()` to create the client. + pub fn quic_client(&self) -> Result { + Ok(self.client_builder(TransportProtocol::Quic)) + } + + /// Returns a WebSocket `ClientBuilder`. Call `.connect()` to create the client. + pub fn websocket_client(&self) -> Result { + Ok(self.client_builder(TransportProtocol::WebSocket)) + } + + fn client_builder(&self, transport: TransportProtocol) -> ClientBuilder { + let connection = ServerConnection { + tcp_addr: self.addrs.tcp, + http_addr: self.addrs.http, + quic_addr: self.addrs.quic, + websocket_addr: self.addrs.websocket, + tls: self.config.tls.clone(), + websocket_tls: self.config.websocket_tls.clone(), + tls_ca_cert_path: self.tls_ca_cert_path(), + }; + ClientBuilder::new(transport, connection) + } + fn build_envs(&mut self) -> Result<(), TestBinaryError> { // Pass through IGGY_* env vars from parent process, except those critical for test isolation. const PROTECTED_PREFIXES: &[&str] = &[ @@ -263,38 +310,67 @@ impl ServerHandle { } fn set_protocol_addresses(&mut self) -> Result<(), TestBinaryError> { - let reserver = PortReserver::reserve(self.config.ip_kind, &self.config)?; - let addrs = reserver.addresses(); + // Cluster mode: port reserver and addresses are pre-set by builder + if self.port_reserver.is_some() { + debug_assert!( + self.addrs.tcp.is_some() + || self.addrs.http.is_some() + || self.addrs.quic.is_some() + || self.addrs.websocket.is_some(), + "port_reserver set but no addresses configured" + ); + return Ok(()); + } - if let Some(tcp) = addrs.tcp { - if !self.envs.contains_key("IGGY_TCP_ADDRESS") { + // Restart case: reuse existing addresses to maintain consistency + if self.addrs.tcp.is_some() + || self.addrs.http.is_some() + || self.addrs.quic.is_some() + || self.addrs.websocket.is_some() + { + if let Some(tcp) = self.addrs.tcp { self.envs .insert("IGGY_TCP_ADDRESS".to_string(), tcp.to_string()); } + if let Some(http) = self.addrs.http { + self.envs + .insert("IGGY_HTTP_ADDRESS".to_string(), http.to_string()); + } + if let Some(quic) = self.addrs.quic { + self.envs + .insert("IGGY_QUIC_ADDRESS".to_string(), quic.to_string()); + } + if let Some(websocket) = self.addrs.websocket { + self.envs + .insert("IGGY_WEBSOCKET_ADDRESS".to_string(), websocket.to_string()); + } + return Ok(()); + } + + let reserver = PortReserver::reserve(self.config.ip_kind, &self.config)?; + let addrs = reserver.addresses(); + + if let Some(tcp) = addrs.tcp { + self.envs + .insert("IGGY_TCP_ADDRESS".to_string(), tcp.to_string()); self.addrs.tcp = Some(tcp); } if let Some(http) = addrs.http { - if !self.envs.contains_key("IGGY_HTTP_ADDRESS") { - self.envs - .insert("IGGY_HTTP_ADDRESS".to_string(), http.to_string()); - } + self.envs + .insert("IGGY_HTTP_ADDRESS".to_string(), http.to_string()); self.addrs.http = Some(http); } if let Some(quic) = addrs.quic { - if !self.envs.contains_key("IGGY_QUIC_ADDRESS") { - self.envs - .insert("IGGY_QUIC_ADDRESS".to_string(), quic.to_string()); - } + self.envs + .insert("IGGY_QUIC_ADDRESS".to_string(), quic.to_string()); self.addrs.quic = Some(quic); } if let Some(websocket) = addrs.websocket { - if !self.envs.contains_key("IGGY_WEBSOCKET_ADDRESS") { - self.envs - .insert("IGGY_WEBSOCKET_ADDRESS".to_string(), websocket.to_string()); - } + self.envs + .insert("IGGY_WEBSOCKET_ADDRESS".to_string(), websocket.to_string()); self.addrs.websocket = Some(websocket); } @@ -323,11 +399,6 @@ impl ServerHandle { } if config_path.exists() { - // Server has written config file - it has bound to ports successfully. - // Release pre-reserved ports so server has exclusive access. - if let Some(reserver) = self.port_reserver.take() { - reserver.release(); - } return Ok(()); } @@ -407,6 +478,47 @@ impl ServerHandle { } } +impl ServerHandle { + /// Create a server handle with custom ID and cluster configuration. + pub fn with_cluster_config( + config: TestServerConfig, + context: Arc, + server_id: u32, + cluster_envs: HashMap, + ) -> Self { + Self { + server_id, + config, + context, + envs: cluster_envs, + child_handle: None, + addrs: ServerProtocolAddr::empty(), + stdout_path: None, + stderr_path: None, + watchdog_handle: None, + watchdog_stop: Arc::new(AtomicBool::new(false)), + generated_cert_dir: None, + port_reserver: None, + test_transport: None, + } + } + + /// Set a pre-reserved port reserver (used by cluster builder). + pub fn set_port_reserver(&mut self, reserver: crate::harness::port_reserver::PortReserver) { + let addrs = reserver.addresses(); + self.addrs.tcp = addrs.tcp; + self.addrs.http = addrs.http; + self.addrs.quic = addrs.quic; + self.addrs.websocket = addrs.websocket; + self.port_reserver = Some(reserver); + } + + /// Set the test transport (used by harness builder). + pub fn set_test_transport(&mut self, transport: iggy_common::TransportProtocol) { + self.test_transport = Some(transport); + } +} + impl TestBinary for ServerHandle { type Config = TestServerConfig; @@ -424,6 +536,7 @@ impl TestBinary for ServerHandle { watchdog_stop: Arc::new(AtomicBool::new(false)), generated_cert_dir: None, port_reserver: None, + test_transport: None, } } @@ -475,6 +588,11 @@ impl TestBinary for ServerHandle { command.env("IGGY_SYSTEM_PATH", data_path.display().to_string()); command.envs(&self.envs); + // TODO(hubcio): Remove --follower flag when proper clustering is implemented + if self.server_id > 0 { + command.arg("--follower"); + } + let verbose = std::env::var(TEST_VERBOSITY_ENV_VAR).is_ok() || self.envs.contains_key(TEST_VERBOSITY_ENV_VAR); @@ -511,6 +629,13 @@ impl TestBinary for ServerHandle { self.watchdog_stop = Arc::new(AtomicBool::new(false)); self.wait_for_server_ready()?; + + // Release port reservation after server has written config file (bound to ports). + // This avoids SO_REUSEPORT load-balancing conflicts during startup. + if let Some(reserver) = self.port_reserver.take() { + reserver.release(); + } + self.start_watchdog(); Ok(()) diff --git a/core/integration/src/harness/helpers.rs b/core/integration/src/harness/helpers.rs index 45704549a..0b712644b 100644 --- a/core/integration/src/harness/helpers.rs +++ b/core/integration/src/harness/helpers.rs @@ -27,11 +27,10 @@ use iggy::prelude::{ pub const USER_PASSWORD: &str = "secret"; /// Login as root user. -pub async fn login_root(client: &impl UserClient) -> IdentityInfo { +pub async fn login_root(client: &impl UserClient) -> Result { client .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) .await - .expect("Failed to login as root") } /// Login as a specific user with the default test password. diff --git a/core/integration/src/harness/mod.rs b/core/integration/src/harness/mod.rs index c3b4a6a6a..2d5f4e5d1 100644 --- a/core/integration/src/harness/mod.rs +++ b/core/integration/src/harness/mod.rs @@ -60,7 +60,9 @@ pub use config::{ pub use context::{TestContext, get_test_directory}; pub use error::TestBinaryError; -pub use handle::{ClientHandle, ConnectorsRuntimeHandle, McpClient, McpHandle, ServerHandle}; +pub use handle::{ + ClientBuilder, ClientHandle, ConnectorsRuntimeHandle, McpClient, McpHandle, ServerHandle, +}; pub use orchestrator::{TestHarness, TestHarnessBuilder, TestLogs}; pub use traits::{IggyServerDependent, Restartable, TestBinary}; diff --git a/core/integration/src/harness/orchestrator/builder.rs b/core/integration/src/harness/orchestrator/builder.rs index a8991563b..ab9dfe156 100644 --- a/core/integration/src/harness/orchestrator/builder.rs +++ b/core/integration/src/harness/orchestrator/builder.rs @@ -18,12 +18,17 @@ */ use super::harness::TestHarness; -use crate::harness::config::{ClientConfig, ConnectorsRuntimeConfig, McpConfig, TestServerConfig}; +use crate::harness::config::{ + ClientConfig, ConnectorsRuntimeConfig, IpAddrKind, McpConfig, TestServerConfig, +}; use crate::harness::context::TestContext; use crate::harness::error::TestBinaryError; use crate::harness::handle::{ConnectorsRuntimeHandle, McpHandle, ServerHandle}; +use crate::harness::port_reserver::ClusterPortReserver; use crate::harness::traits::TestBinary; +use std::collections::HashMap; use std::sync::Arc; +use uuid::Uuid; /// Builder for TestHarness with fluent configuration API. pub struct TestHarnessBuilder { @@ -35,6 +40,7 @@ pub struct TestHarnessBuilder { primary_client_config: Option, clients: Vec, cleanup: bool, + cluster_node_count: Option, } impl Default for TestHarnessBuilder { @@ -48,6 +54,7 @@ impl Default for TestHarnessBuilder { primary_client_config: None, clients: Vec::new(), cleanup: true, + cluster_node_count: None, } } } @@ -176,15 +183,22 @@ impl TestHarnessBuilder { self } + /// Configure a multi-node cluster. + /// + /// When set, creates N server nodes configured as a cluster. + /// The first node starts as leader candidate, others as followers. + pub fn cluster_nodes(mut self, count: usize) -> Self { + self.cluster_node_count = Some(count); + self + } + /// Build the TestHarness. Does NOT start any binaries. pub fn build(self) -> Result { let mut context = TestContext::new(self.test_name, self.cleanup)?; context.ensure_created()?; let context = Arc::new(context); - let server = self - .server_config - .map(|config| ServerHandle::with_config(config, context.clone())); + let servers = build_servers(self.server_config, self.cluster_node_count, &context)?; let mcp = self .mcp_config @@ -196,7 +210,7 @@ impl TestHarnessBuilder { Ok(TestHarness { context, - server, + servers, mcp, connectors_runtime, clients: Vec::new(), @@ -208,6 +222,132 @@ impl TestHarnessBuilder { } } +fn build_servers( + server_config: Option, + cluster_node_count: Option, + context: &Arc, +) -> Result, TestBinaryError> { + let Some(config) = server_config else { + return Ok(Vec::new()); + }; + + let node_count = cluster_node_count.unwrap_or(1); + + if node_count == 1 { + return Ok(vec![ServerHandle::with_config(config, context.clone())]); + } + + // Multi-node cluster: pre-reserve all ports + let cluster_ports = ClusterPortReserver::reserve(node_count, config.ip_kind, &config)?; + let all_addrs = cluster_ports.all_addresses(); + let cluster_name = format!("test-cluster-{}", Uuid::new_v4()); + + let mut servers = Vec::with_capacity(node_count); + for (i, (addrs, reserver)) in all_addrs + .iter() + .zip(cluster_ports.into_reservers()) + .enumerate() + { + let mut cluster_envs = build_cluster_envs(i, &cluster_name, &all_addrs, config.ip_kind); + + // Inject the pre-reserved port addresses + if let Some(tcp) = addrs.tcp { + cluster_envs.insert("IGGY_TCP_ADDRESS".to_string(), tcp.to_string()); + } + if let Some(http) = addrs.http { + cluster_envs.insert("IGGY_HTTP_ADDRESS".to_string(), http.to_string()); + } + if let Some(quic) = addrs.quic { + cluster_envs.insert("IGGY_QUIC_ADDRESS".to_string(), quic.to_string()); + } + if let Some(websocket) = addrs.websocket { + cluster_envs.insert("IGGY_WEBSOCKET_ADDRESS".to_string(), websocket.to_string()); + } + + let mut server = ServerHandle::with_cluster_config( + config.clone(), + context.clone(), + i as u32, + cluster_envs, + ); + server.set_port_reserver(reserver); + servers.push(server); + } + + Ok(servers) +} + +fn build_cluster_envs( + node_index: usize, + cluster_name: &str, + all_addrs: &[crate::harness::port_reserver::ProtocolAddresses], + ip_kind: IpAddrKind, +) -> HashMap { + let mut envs = HashMap::new(); + + let loopback = match ip_kind { + IpAddrKind::V4 => "127.0.0.1", + IpAddrKind::V6 => "::1", + }; + + envs.insert("IGGY_CLUSTER_ENABLED".to_string(), "true".to_string()); + envs.insert("IGGY_CLUSTER_NAME".to_string(), cluster_name.to_string()); + envs.insert( + "IGGY_CLUSTER_NODE_CURRENT_NAME".to_string(), + format!("node-{}", node_index), + ); + envs.insert( + "IGGY_CLUSTER_NODE_CURRENT_IP".to_string(), + loopback.to_string(), + ); + + // Add other nodes' addresses + let mut other_index = 0; + for (i, addrs) in all_addrs.iter().enumerate() { + if i == node_index { + continue; + } + + envs.insert( + format!("IGGY_CLUSTER_NODE_OTHERS_{other_index}_NAME"), + format!("node-{i}"), + ); + envs.insert( + format!("IGGY_CLUSTER_NODE_OTHERS_{other_index}_IP"), + loopback.to_string(), + ); + + if let Some(tcp) = addrs.tcp { + envs.insert( + format!("IGGY_CLUSTER_NODE_OTHERS_{other_index}_PORTS_TCP"), + tcp.port().to_string(), + ); + } + if let Some(http) = addrs.http { + envs.insert( + format!("IGGY_CLUSTER_NODE_OTHERS_{other_index}_PORTS_HTTP"), + http.port().to_string(), + ); + } + if let Some(quic) = addrs.quic { + envs.insert( + format!("IGGY_CLUSTER_NODE_OTHERS_{other_index}_PORTS_QUIC"), + quic.port().to_string(), + ); + } + if let Some(websocket) = addrs.websocket { + envs.insert( + format!("IGGY_CLUSTER_NODE_OTHERS_{other_index}_PORTS_WEBSOCKET"), + websocket.port().to_string(), + ); + } + + other_index += 1; + } + + envs +} + #[cfg(test)] mod tests { use super::*; @@ -228,7 +368,7 @@ mod tests { .build() .unwrap(); - assert!(harness.server.is_some()); + assert_eq!(harness.servers.len(), 1); assert!(!harness.started); assert_eq!(harness.client_configs.len(), 1); } @@ -242,7 +382,7 @@ mod tests { .build() .unwrap(); - assert!(harness.server.is_some()); + assert_eq!(harness.servers.len(), 1); assert!(harness.mcp.is_some()); } @@ -287,6 +427,6 @@ mod tests { .build() .unwrap(); - assert!(harness.server.is_some()); + assert_eq!(harness.servers.len(), 1); } } diff --git a/core/integration/src/harness/orchestrator/harness.rs b/core/integration/src/harness/orchestrator/harness.rs index d187eb74d..2c32ef0f2 100644 --- a/core/integration/src/harness/orchestrator/harness.rs +++ b/core/integration/src/harness/orchestrator/harness.rs @@ -57,7 +57,7 @@ pub(super) struct TlsSettings { /// Orchestrates test binaries and clients for integration tests. pub struct TestHarness { pub(super) context: Arc, - pub(super) server: Option, + pub(super) servers: Vec, pub(super) mcp: Option, pub(super) connectors_runtime: Option, pub(super) clients: Vec, @@ -72,7 +72,7 @@ impl std::fmt::Debug for TestHarness { f.debug_struct("TestHarness") .field("test_name", &self.context.test_name()) .field("started", &self.started) - .field("has_server", &self.server.is_some()) + .field("server_count", &self.servers.len()) .field("has_mcp", &self.mcp.is_some()) .field("has_connectors_runtime", &self.connectors_runtime.is_some()) .field("client_count", &self.clients.len()) @@ -119,7 +119,7 @@ impl TestHarness { return Err(TestBinaryError::AlreadyStarted); } - if let Some(ref mut server) = self.server { + for server in &mut self.servers { server.start()?; } @@ -138,7 +138,7 @@ impl TestHarness { } async fn start_dependents(&mut self) -> Result<(), TestBinaryError> { - let tcp_addr = self.server.as_ref().and_then(|s| s.tcp_addr()); + let tcp_addr = self.servers.first().and_then(|s| s.tcp_addr()); if let Some(ref mut mcp) = self.mcp { if let Some(addr) = tcp_addr { @@ -174,7 +174,7 @@ impl TestHarness { mcp.stop()?; } - if let Some(ref mut server) = self.server { + for server in self.servers.iter_mut().rev() { server.stop()?; } @@ -182,17 +182,17 @@ impl TestHarness { Ok(()) } - /// Restart the server and reconnect all clients. + /// Restart the primary server and reconnect all clients. pub async fn restart_server(&mut self) -> Result<(), TestBinaryError> { - let Some(ref mut server) = self.server else { + if self.servers.is_empty() { return Err(TestBinaryError::MissingServer); - }; + } for client in &mut self.clients { client.disconnect().await; } - server.restart()?; + self.servers[0].restart()?; self.update_client_addresses(); for client in &mut self.clients { @@ -202,14 +202,43 @@ impl TestHarness { Ok(()) } - /// Get reference to the server handle. + /// Get reference to the first (primary) server handle. pub fn server(&self) -> &ServerHandle { - self.server.as_ref().expect("Server not configured") + self.servers.first().expect("No servers configured") } - /// Get mutable reference to the server handle. + /// Get mutable reference to the first (primary) server handle. pub fn server_mut(&mut self) -> &mut ServerHandle { - self.server.as_mut().expect("Server not configured") + self.servers.first_mut().expect("No servers configured") + } + + /// Get reference to a specific server node by index (for clusters). + pub fn node(&self, index: usize) -> &ServerHandle { + self.servers.get(index).unwrap_or_else(|| { + panic!( + "Node {} not configured (cluster has {} nodes)", + index, + self.servers.len() + ) + }) + } + + /// Get mutable reference to a specific server node by index (for clusters). + pub fn node_mut(&mut self, index: usize) -> &mut ServerHandle { + let len = self.servers.len(); + self.servers + .get_mut(index) + .unwrap_or_else(|| panic!("Node {} not configured (cluster has {} nodes)", index, len)) + } + + /// Get reference to all servers. + pub fn all_servers(&self) -> &[ServerHandle] { + &self.servers + } + + /// Get the number of server nodes (1 for single server, N for cluster). + pub fn cluster_size(&self) -> usize { + self.servers.len() } /// Get the first client (panics if no clients configured). @@ -258,7 +287,7 @@ impl TestHarness { /// Collect logs from all binaries. pub fn collect_logs(&self) -> TestLogs { TestLogs { - server: self.server.as_ref().map(|s| s.collect_logs()), + server: self.servers.first().map(|s| s.collect_logs()), mcp: self.mcp.as_ref().map(|m| m.collect_logs()), connectors_runtime: self.connectors_runtime.as_ref().map(|c| c.collect_logs()), } @@ -266,7 +295,7 @@ impl TestHarness { /// Get a TCP client factory for creating additional clients. pub fn tcp_client_factory(&self) -> Option { - let server = self.server.as_ref()?; + let server = self.servers.first()?; let addr = server.tcp_addr()?; let config = self.find_client_config(TransportProtocol::Tcp); let tls = self.extract_tls_settings(config, server); @@ -283,8 +312,8 @@ impl TestHarness { /// Get an HTTP client factory for creating additional clients. pub fn http_client_factory(&self) -> Option { - self.server - .as_ref() + self.servers + .first() .and_then(|s| s.http_addr()) .map(|addr| HttpClientFactory { server_addr: addr.to_string(), @@ -293,8 +322,8 @@ impl TestHarness { /// Get a QUIC client factory for creating additional clients. pub fn quic_client_factory(&self) -> Option { - self.server - .as_ref() + self.servers + .first() .and_then(|s| s.quic_addr()) .map(|addr| QuicClientFactory { server_addr: addr.to_string(), @@ -303,7 +332,7 @@ impl TestHarness { /// Get a WebSocket client factory for creating additional clients. pub fn websocket_client_factory(&self) -> Option { - let server = self.server.as_ref()?; + let server = self.servers.first()?; let addr = server.websocket_addr()?; let config = self.find_client_config(TransportProtocol::WebSocket); let tls = self.extract_tls_settings(config, server); @@ -554,7 +583,7 @@ impl TestHarness { } pub(super) async fn create_clients(&mut self) -> Result<(), TestBinaryError> { - let Some(ref server) = self.server else { + let Some(server) = self.servers.first() else { return Ok(()); }; @@ -588,7 +617,7 @@ impl TestHarness { } fn update_client_addresses(&mut self) { - let Some(ref server) = self.server else { + let Some(server) = self.servers.first() else { return; }; diff --git a/core/integration/src/harness/port_reserver.rs b/core/integration/src/harness/port_reserver.rs index 5e0f93a12..cc59a6d02 100644 --- a/core/integration/src/harness/port_reserver.rs +++ b/core/integration/src/harness/port_reserver.rs @@ -189,6 +189,36 @@ pub struct ProtocolAddresses { pub websocket: Option, } +/// Pre-allocated ports for a cluster of servers. +pub struct ClusterPortReserver { + nodes: Vec, +} + +impl ClusterPortReserver { + /// Reserve ports for all nodes in a cluster. + pub fn reserve( + node_count: usize, + ip_kind: IpAddrKind, + config: &TestServerConfig, + ) -> Result { + let mut nodes = Vec::with_capacity(node_count); + for _ in 0..node_count { + nodes.push(PortReserver::reserve(ip_kind, config)?); + } + Ok(Self { nodes }) + } + + /// Get the addresses for all nodes. + pub fn all_addresses(&self) -> Vec { + self.nodes.iter().map(|n| n.addresses()).collect() + } + + /// Take ownership of individual port reservers for each node. + pub fn into_reservers(self) -> Vec { + self.nodes + } +} + impl PortReserver { /// Reserve ports for all protocols enabled in the config. pub fn reserve( diff --git a/core/integration/src/lib.rs b/core/integration/src/lib.rs index 0359462aa..1a4b5c4bb 100644 --- a/core/integration/src/lib.rs +++ b/core/integration/src/lib.rs @@ -35,3 +35,14 @@ pub mod test_mcp_server; #[allow(deprecated)] pub mod test_server; pub mod test_tls_utils; + +pub use harness_derive::iggy_harness; + +#[doc(hidden)] +pub mod __macro_support { + pub use crate::harness::{ + ClientConfig, McpClient, McpConfig, TestHarness, TestServerConfig, TlsConfig, + }; + pub use iggy::prelude::ClientWrapper; + pub use iggy_common::TransportProtocol; +} diff --git a/core/integration/tests/cluster/mod.rs b/core/integration/tests/cluster/mod.rs new file mode 100644 index 000000000..8387a2b57 --- /dev/null +++ b/core/integration/tests/cluster/mod.rs @@ -0,0 +1,39 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use iggy::prelude::*; +use integration::iggy_harness; + +#[iggy_harness(cluster_nodes = [3, 5, 7], + test_client_transport = [ Tcp, Http, WebSocket, Quic, TcpTlsSelfSigned, TcpTlsGenerated, WebSocketTlsSelfSigned, WebSocketTlsGenerated], + server(segment.size = ["1MiB", "2MiB"], + segment.cache_indexes = ["open_segment", "all"]))] +#[ignore] +async fn should_ping_all_cluster_nodes(harness: TestHarness) { + for i in 0..harness.cluster_size() { + let client = harness + .node(i) + .test_client() + .unwrap() + .with_root_login() + .connect() + .await + .unwrap(); + client.ping().await.unwrap(); + } +} diff --git a/core/integration/tests/mcp/mod.rs b/core/integration/tests/mcp/mod.rs index 0fadd7ea3..9b2d1c4ae 100644 --- a/core/integration/tests/mcp/mod.rs +++ b/core/integration/tests/mcp/mod.rs @@ -16,493 +16,580 @@ * under the License. */ -use iggy::prelude::{Client, DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME, IggyClient}; -use iggy_binary_protocol::{ - ConsumerGroupClient, ConsumerOffsetClient, MessageClient, PersonalAccessTokenClient, - StreamClient, TopicClient, UserClient, -}; use iggy_common::{ - ClientInfo, ClientInfoDetails, ClusterMetadata, Consumer, ConsumerGroup, ConsumerGroupDetails, - ConsumerOffsetInfo, Identifier, IggyExpiry, IggyMessage, MaxTopicSize, Partitioning, - PersonalAccessTokenExpiry, PersonalAccessTokenInfo, PolledMessages, RawPersonalAccessToken, - Snapshot, Stats, Stream, StreamDetails, Topic, TopicDetails, UserInfo, UserInfoDetails, - UserStatus, + ClientInfo, ClientInfoDetails, ClusterMetadata, ConsumerGroup, ConsumerGroupDetails, + ConsumerOffsetInfo, PersonalAccessTokenExpiry, PersonalAccessTokenInfo, PolledMessages, + RawPersonalAccessToken, Snapshot, Stats, Stream, StreamDetails, Topic, TopicDetails, UserInfo, + UserInfoDetails, }; use integration::{ - test_mcp_server::{CONSUMER_NAME, McpClient, TestMcpServer}, - test_server::{IpAddrKind, TestServer}, + harness::{McpClient, seeds}, + iggy_harness, }; -use lazy_static::lazy_static; use rmcp::{ - ServiceError, - model::{CallToolRequestParams, CallToolResult, ListToolsResult}, + model::CallToolRequestParams, serde::de::DeserializeOwned, serde_json::{self, json}, }; -use serial_test::parallel; -use std::collections::HashMap; - -const STREAM_NAME: &str = "test_stream"; -const TOPIC_NAME: &str = "test_topic"; -const MESSAGE_PAYLOAD: &str = "test_message"; -const CONSUMER_GROUP_NAME: &str = "test_consumer_group"; -const PERSONAL_ACCESS_TOKEN_NAME: &str = "test_personal_access_token"; -const USER_NAME: &str = "test_user"; -const USER_PASSWORD: &str = "secret"; - -lazy_static! { - static ref STREAM_ID: Identifier = - Identifier::from_str_value(STREAM_NAME).expect("Failed to create stream ID"); - static ref TOPIC_ID: Identifier = - Identifier::from_str_value(TOPIC_NAME).expect("Failed to create topic ID"); - static ref CONSUMER_GROUP_ID: Identifier = - Identifier::from_str_value(CONSUMER_GROUP_NAME).expect("Failed to create group ID"); -} - -#[tokio::test] -#[parallel] -async fn mcp_server_should_list_tools() { - let infra = setup().await; - let client = infra.mcp_client; - let tools = client.list_tools().await.expect("Failed to list tools"); + +async fn invoke( + client: &McpClient, + method: &str, + data: Option, +) -> T { + let mut result = client + .call_tool(CallToolRequestParams { + name: method.to_owned().into(), + arguments: data.and_then(|v| v.as_object().cloned()), + task: None, + meta: None, + }) + .await + .unwrap_or_else(|e| panic!("Failed to invoke {method}: {e}")); + + let content = result.content.remove(0); + let text = content + .as_text() + .unwrap_or_else(|| panic!("Expected text response for {method}")); + serde_json::from_str(&text.text) + .unwrap_or_else(|e| panic!("Failed to parse {method} response: {e}")) +} + +async fn invoke_empty(client: &McpClient, method: &str, data: Option) { + let result = client + .call_tool(CallToolRequestParams { + name: method.to_owned().into(), + arguments: data.and_then(|v| v.as_object().cloned()), + task: None, + meta: None, + }) + .await + .unwrap_or_else(|e| panic!("Failed to invoke {method}: {e}")); + + assert!(!result.is_error.unwrap_or(false), "{method} returned error"); +} + +#[iggy_harness(server(mcp))] +async fn should_list_tools(harness: &TestHarness) { + let mcp_client = harness.mcp_client().await.expect("MCP client required"); + let tools = mcp_client + .list_tools(Default::default()) + .await + .expect("Failed to list tools"); assert!(!tools.tools.is_empty()); - let tools_count = tools.tools.len(); - assert_eq!(tools_count, 41); + assert_eq!(tools.tools.len(), 41); } -#[tokio::test] -#[parallel] -async fn mcp_server_should_handle_ping() { - assert_empty_response("ping", None).await; +#[iggy_harness(server(mcp))] +async fn should_handle_ping(harness: &TestHarness) { + let mcp_client = harness.mcp_client().await.expect("MCP client required"); + invoke_empty(&mcp_client, "ping", None).await; } -#[tokio::test] -#[parallel] -async fn mcp_server_should_return_cluster_metadata() { - assert_response::("get_cluster_metadata", None, |cluster| { - assert!(!cluster.name.is_empty()); - assert_eq!(cluster.nodes.len(), 2); - }) - .await; +#[iggy_harness(server(cluster.enabled = true, mcp))] +async fn should_return_cluster_metadata(harness: &TestHarness) { + let mcp_client = harness.mcp_client().await.expect("MCP client required"); + let cluster: ClusterMetadata = invoke(&mcp_client, "get_cluster_metadata", None).await; + + assert!(!cluster.name.is_empty()); + assert_eq!(cluster.nodes.len(), 2); } -#[tokio::test] -#[parallel] -async fn mcp_server_should_return_list_of_streams() { - assert_response::>("get_streams", None, |streams| { - assert_eq!(streams.len(), 1); - let stream = &streams[0]; - assert_eq!(&stream.name, STREAM_NAME); - assert_eq!(&stream.topics_count, &1); - }) - .await; +#[iggy_harness(server(mcp), seed = seeds::mcp_standard)] +async fn should_return_list_of_streams(harness: &TestHarness) { + let mcp_client = harness.mcp_client().await.expect("MCP client required"); + let streams: Vec = invoke(&mcp_client, "get_streams", None).await; + + assert_eq!(streams.len(), 1); + assert_eq!(streams[0].name, seeds::names::STREAM); + assert_eq!(streams[0].topics_count, 1); } -#[tokio::test] -#[parallel] -async fn mcp_server_should_return_stream_details() { - assert_response::( +#[iggy_harness(server(mcp), seed = seeds::mcp_standard)] +async fn should_return_stream_details(harness: &TestHarness) { + let mcp_client = harness.mcp_client().await.expect("MCP client required"); + let stream: StreamDetails = invoke( + &mcp_client, "get_stream", - Some(json!({"stream_id": STREAM_NAME})), - |stream| { - assert_eq!(stream.name, STREAM_NAME); - assert_eq!(stream.topics_count, 1); - assert_eq!(stream.messages_count, 1); - }, + Some(json!({"stream_id": seeds::names::STREAM})), ) .await; + + assert_eq!(stream.name, seeds::names::STREAM); + assert_eq!(stream.topics_count, 1); + assert_eq!(stream.messages_count, 1); } -#[tokio::test] -#[parallel] -async fn mcp_server_should_create_stream() { +#[iggy_harness(server(mcp))] +async fn should_create_stream(harness: &TestHarness) { + let mcp_client = harness.mcp_client().await.expect("MCP client required"); let name = "new_stream"; - assert_response::("create_stream", Some(json!({ "name": name})), |stream| { - assert_eq!(stream.name, name); - assert_eq!(stream.topics_count, 0); - assert_eq!(stream.messages_count, 0); - }) - .await; + let stream: StreamDetails = + invoke(&mcp_client, "create_stream", Some(json!({"name": name}))).await; + + assert_eq!(stream.name, name); + assert_eq!(stream.topics_count, 0); + assert_eq!(stream.messages_count, 0); } -#[tokio::test] -#[parallel] -async fn mcp_server_should_update_stream() { - let name = "updated_stream"; - assert_empty_response( +#[iggy_harness(server(mcp), seed = seeds::mcp_standard)] +async fn should_update_stream(harness: &TestHarness) { + let mcp_client = harness.mcp_client().await.expect("MCP client required"); + invoke_empty( + &mcp_client, "update_stream", - Some(json!({"stream_id": STREAM_NAME, "name": name})), + Some(json!({"stream_id": seeds::names::STREAM, "name": "updated_stream"})), ) .await; } -#[tokio::test] -#[parallel] -async fn mcp_server_should_delete_stream() { - assert_empty_response("delete_stream", Some(json!({"stream_id": STREAM_NAME}))).await; +#[iggy_harness(server(mcp), seed = seeds::mcp_standard)] +async fn should_delete_stream(harness: &TestHarness) { + let mcp_client = harness.mcp_client().await.expect("MCP client required"); + invoke_empty( + &mcp_client, + "delete_stream", + Some(json!({"stream_id": seeds::names::STREAM})), + ) + .await; } -#[tokio::test] -#[parallel] -async fn mcp_server_should_purge_stream() { - assert_empty_response("purge_stream", Some(json!({"stream_id": STREAM_NAME}))).await; +#[iggy_harness(server(mcp), seed = seeds::mcp_standard)] +async fn should_purge_stream(harness: &TestHarness) { + let mcp_client = harness.mcp_client().await.expect("MCP client required"); + invoke_empty( + &mcp_client, + "purge_stream", + Some(json!({"stream_id": seeds::names::STREAM})), + ) + .await; } -#[tokio::test] -#[parallel] -async fn mcp_server_should_return_list_of_topics() { - assert_response::>( +#[iggy_harness(server(mcp), seed = seeds::mcp_standard)] +async fn should_return_list_of_topics(harness: &TestHarness) { + let mcp_client = harness.mcp_client().await.expect("MCP client required"); + let topics: Vec = invoke( + &mcp_client, "get_topics", - Some(json!({"stream_id": STREAM_NAME})), - |topics| { - assert_eq!(topics.len(), 1); - let topic = &topics[0]; - assert_eq!(topic.name, TOPIC_NAME); - assert_eq!(topic.partitions_count, 1); - assert_eq!(topic.messages_count, 1); - }, + Some(json!({"stream_id": seeds::names::STREAM})), ) .await; + + assert_eq!(topics.len(), 1); + assert_eq!(topics[0].name, seeds::names::TOPIC); + assert_eq!(topics[0].partitions_count, 1); + assert_eq!(topics[0].messages_count, 1); } -#[tokio::test] -#[parallel] -async fn mcp_server_should_return_topic_details() { - assert_response::( +#[iggy_harness(server(mcp), seed = seeds::mcp_standard)] +async fn should_return_topic_details(harness: &TestHarness) { + let mcp_client = harness.mcp_client().await.expect("MCP client required"); + let topic: TopicDetails = invoke( + &mcp_client, "get_topic", - Some(json!({"stream_id": STREAM_NAME, "topic_id": TOPIC_NAME})), - |topic| { - assert_eq!(topic.id, 0); - assert_eq!(topic.name, TOPIC_NAME); - assert_eq!(topic.messages_count, 1); - }, + Some(json!({"stream_id": seeds::names::STREAM, "topic_id": seeds::names::TOPIC})), ) .await; + + assert_eq!(topic.id, 0); + assert_eq!(topic.name, seeds::names::TOPIC); + assert_eq!(topic.messages_count, 1); } -#[tokio::test] -#[parallel] -async fn mcp_server_should_create_topic() { +#[iggy_harness(server(mcp), seed = seeds::mcp_standard)] +async fn should_create_topic(harness: &TestHarness) { + let mcp_client = harness.mcp_client().await.expect("MCP client required"); let name = "new_topic"; - assert_response::( + let topic: TopicDetails = invoke( + &mcp_client, "create_topic", - Some(json!({ "stream_id": STREAM_NAME, "name": name, "partitions_count": 1})), - |topic| { - assert_eq!(topic.id, 1); - assert_eq!(topic.name, name); - assert_eq!(topic.partitions_count, 1); - assert_eq!(topic.messages_count, 0); - }, + Some(json!({"stream_id": seeds::names::STREAM, "name": name, "partitions_count": 1})), ) .await; + + assert_eq!(topic.id, 1); + assert_eq!(topic.name, name); + assert_eq!(topic.partitions_count, 1); + assert_eq!(topic.messages_count, 0); } -#[tokio::test] -#[parallel] -async fn mcp_server_should_update_topic() { - let name = "updated_topic"; - assert_empty_response( +#[iggy_harness(server(mcp), seed = seeds::mcp_standard)] +async fn should_update_topic(harness: &TestHarness) { + let mcp_client = harness.mcp_client().await.expect("MCP client required"); + invoke_empty( + &mcp_client, "update_topic", - Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "name": name})), + Some(json!({ + "stream_id": seeds::names::STREAM, + "topic_id": seeds::names::TOPIC, + "name": "updated_topic" + })), ) .await; } -#[tokio::test] -#[parallel] -async fn mcp_server_should_delete_topic() { - assert_empty_response( +#[iggy_harness(server(mcp), seed = seeds::mcp_standard)] +async fn should_delete_topic(harness: &TestHarness) { + let mcp_client = harness.mcp_client().await.expect("MCP client required"); + invoke_empty( + &mcp_client, "delete_topic", - Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME })), + Some(json!({"stream_id": seeds::names::STREAM, "topic_id": seeds::names::TOPIC})), ) .await; } -#[tokio::test] -#[parallel] -async fn mcp_server_should_purge_topic() { - assert_empty_response( +#[iggy_harness(server(mcp), seed = seeds::mcp_standard)] +async fn should_purge_topic(harness: &TestHarness) { + let mcp_client = harness.mcp_client().await.expect("MCP client required"); + invoke_empty( + &mcp_client, "purge_topic", - Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME })), + Some(json!({"stream_id": seeds::names::STREAM, "topic_id": seeds::names::TOPIC})), ) .await; } -#[tokio::test] -#[parallel] -async fn mcp_server_should_create_partitions() { - assert_empty_response( +#[iggy_harness(server(mcp), seed = seeds::mcp_standard)] +async fn should_create_partitions(harness: &TestHarness) { + let mcp_client = harness.mcp_client().await.expect("MCP client required"); + invoke_empty( + &mcp_client, "create_partitions", - Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partitions_count": 3 })), + Some(json!({ + "stream_id": seeds::names::STREAM, + "topic_id": seeds::names::TOPIC, + "partitions_count": 3 + })), ) .await; } -#[tokio::test] -#[parallel] -async fn mcp_server_should_delete_partitions() { - assert_empty_response( +#[iggy_harness(server(mcp), seed = seeds::mcp_standard)] +async fn should_delete_partitions(harness: &TestHarness) { + let mcp_client = harness.mcp_client().await.expect("MCP client required"); + invoke_empty( + &mcp_client, "delete_partitions", - Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partitions_count": 1 })), + Some(json!({ + "stream_id": seeds::names::STREAM, + "topic_id": seeds::names::TOPIC, + "partitions_count": 1 + })), ) .await; } -#[tokio::test] -#[parallel] -async fn mcp_server_should_delete_segments() { - assert_empty_response( +#[iggy_harness(server(mcp), seed = seeds::mcp_standard)] +async fn should_delete_segments(harness: &TestHarness) { + let mcp_client = harness.mcp_client().await.expect("MCP client required"); + invoke_empty( + &mcp_client, "delete_segments", - Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partition_id": 0, "segments_count": 1 })), + Some(json!({ + "stream_id": seeds::names::STREAM, + "topic_id": seeds::names::TOPIC, + "partition_id": 0, + "segments_count": 1 + })), ) .await; } -#[tokio::test] -#[parallel] -async fn mcp_server_should_poll_messages() { - assert_response::( +#[iggy_harness(server(mcp), seed = seeds::mcp_standard)] +async fn should_poll_messages(harness: &TestHarness) { + let mcp_client = harness.mcp_client().await.expect("MCP client required"); + let messages: PolledMessages = invoke( + &mcp_client, "poll_messages", - Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partition_id": 0, "offset": 0 })), - |messages| { - assert_eq!(messages.messages.len(), 1); - let message = &messages.messages[0]; - assert_eq!(message.header.offset, 0); - let payload = message.payload_as_string().expect("Failed to parse message payload"); - assert_eq!(payload, MESSAGE_PAYLOAD); - }, + Some(json!({ + "stream_id": seeds::names::STREAM, + "topic_id": seeds::names::TOPIC, + "partition_id": 0, + "offset": 0 + })), ) .await; + + assert_eq!(messages.messages.len(), 1); + assert_eq!(messages.messages[0].header.offset, 0); + let payload = messages.messages[0] + .payload_as_string() + .expect("Failed to parse payload"); + assert_eq!(payload, seeds::names::MESSAGE_PAYLOAD); } -#[tokio::test] -#[parallel] -async fn mcp_server_should_send_messages() { - assert_empty_response( +#[iggy_harness(server(mcp), seed = seeds::mcp_standard)] +async fn should_send_messages(harness: &TestHarness) { + let mcp_client = harness.mcp_client().await.expect("MCP client required"); + invoke_empty( + &mcp_client, "send_messages", - Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partition_id": 0, "messages": [ - { - "payload": "test" - } - ] })), + Some(json!({ + "stream_id": seeds::names::STREAM, + "topic_id": seeds::names::TOPIC, + "partition_id": 0, + "messages": [{"payload": "test"}] + })), ) .await; } -#[tokio::test] -#[parallel] -async fn mcp_server_should_return_stats() { - assert_response::("get_stats", None, |stats| { - assert!(!stats.hostname.is_empty()); - assert_eq!(stats.messages_count, 1); - }) - .await; +#[iggy_harness(server(mcp), seed = seeds::mcp_standard)] +async fn should_return_stats(harness: &TestHarness) { + let mcp_client = harness.mcp_client().await.expect("MCP client required"); + let stats: Stats = invoke(&mcp_client, "get_stats", None).await; + + assert!(!stats.hostname.is_empty()); + assert_eq!(stats.messages_count, 1); } -#[tokio::test] -#[parallel] -async fn mcp_server_should_return_me() { - assert_response::("get_me", None, |client| { - assert!(client.client_id > 0); - }) - .await; +#[iggy_harness(server(mcp))] +async fn should_return_me(harness: &TestHarness) { + let mcp_client = harness.mcp_client().await.expect("MCP client required"); + let client: ClientInfoDetails = invoke(&mcp_client, "get_me", None).await; + + assert!(client.client_id > 0); } -#[tokio::test] -#[parallel] -async fn mcp_server_should_return_clients() { - assert_response::>("get_clients", None, |clients| { - assert!(!clients.is_empty()); - }) - .await; +#[iggy_harness(server(mcp))] +async fn should_return_clients(harness: &TestHarness) { + let mcp_client = harness.mcp_client().await.expect("MCP client required"); + let clients: Vec = invoke(&mcp_client, "get_clients", None).await; + + assert!(!clients.is_empty()); } -#[tokio::test] -#[parallel] -async fn mcp_server_should_handle_snapshot() { - assert_response::("snapshot", None, |snapshot| { - assert!(!snapshot.0.is_empty()); - }) - .await; +#[iggy_harness(server(mcp), seed = seeds::mcp_standard)] +async fn should_handle_snapshot(harness: &TestHarness) { + let mcp_client = harness.mcp_client().await.expect("MCP client required"); + let snapshot: Snapshot = invoke(&mcp_client, "snapshot", None).await; + + assert!(!snapshot.0.is_empty()); } -#[tokio::test] -#[parallel] -async fn mcp_server_should_return_consumer_groups() { - assert_response::>( +#[iggy_harness(server(mcp), seed = seeds::mcp_standard)] +async fn should_return_consumer_groups(harness: &TestHarness) { + let mcp_client = harness.mcp_client().await.expect("MCP client required"); + let groups: Vec = invoke( + &mcp_client, "get_consumer_groups", - Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME})), - |groups| { - assert!(!groups.is_empty()); - }, + Some(json!({"stream_id": seeds::names::STREAM, "topic_id": seeds::names::TOPIC})), ) .await; -} -#[tokio::test] -#[parallel] -async fn mcp_server_should_return_consumer_group_details() { - assert_response::("get_consumer_group", Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "group_id": CONSUMER_GROUP_NAME })), |group| { - assert_eq!(group.name, CONSUMER_GROUP_NAME); - assert_eq!(group.partitions_count, 1); - assert_eq!(group.members_count, 0); - assert!(group.members.is_empty()); + assert!(!groups.is_empty()); +} - }) +#[iggy_harness(server(mcp), seed = seeds::mcp_standard)] +async fn should_return_consumer_group_details(harness: &TestHarness) { + let mcp_client = harness.mcp_client().await.expect("MCP client required"); + let group: ConsumerGroupDetails = invoke( + &mcp_client, + "get_consumer_group", + Some(json!({ + "stream_id": seeds::names::STREAM, + "topic_id": seeds::names::TOPIC, + "group_id": seeds::names::CONSUMER_GROUP + })), + ) .await; + + assert_eq!(group.name, seeds::names::CONSUMER_GROUP); + assert_eq!(group.partitions_count, 1); + assert_eq!(group.members_count, 0); + assert!(group.members.is_empty()); } -#[tokio::test] -#[parallel] -async fn mcp_server_should_create_consumer_group() { - let name = "test"; - assert_response::( +#[iggy_harness(server(mcp), seed = seeds::mcp_standard)] +async fn should_create_consumer_group(harness: &TestHarness) { + let mcp_client = harness.mcp_client().await.expect("MCP client required"); + let name = "new_group"; + let group: ConsumerGroupDetails = invoke( + &mcp_client, "create_consumer_group", - Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "name": name })), - |group| { - assert_eq!(group.name, name); - assert_eq!(group.partitions_count, 1); - assert_eq!(group.members_count, 0); - assert!(group.members.is_empty()); - }, + Some(json!({ + "stream_id": seeds::names::STREAM, + "topic_id": seeds::names::TOPIC, + "name": name + })), ) .await; + + assert_eq!(group.name, name); + assert_eq!(group.partitions_count, 1); + assert_eq!(group.members_count, 0); + assert!(group.members.is_empty()); } -#[tokio::test] -#[parallel] -async fn mcp_server_should_delete_consumer_group() { - assert_empty_response( +#[iggy_harness(server(mcp), seed = seeds::mcp_standard)] +async fn should_delete_consumer_group(harness: &TestHarness) { + let mcp_client = harness.mcp_client().await.expect("MCP client required"); + invoke_empty( + &mcp_client, "delete_consumer_group", - Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "group_id": CONSUMER_GROUP_NAME })), + Some(json!({ + "stream_id": seeds::names::STREAM, + "topic_id": seeds::names::TOPIC, + "group_id": seeds::names::CONSUMER_GROUP + })), ) .await; } -#[tokio::test] -#[parallel] -async fn mcp_server_should_return_consumer_offset() { - assert_response::>( +#[iggy_harness(server(mcp), seed = seeds::mcp_standard)] +async fn should_return_consumer_offset(harness: &TestHarness) { + let mcp_client = harness.mcp_client().await.expect("MCP client required"); + let offset: Option = invoke( + &mcp_client, "get_consumer_offset", - Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partition_id": 0 })), - |offset| { - assert!(offset.is_some()); - let offset = offset.unwrap(); - assert_eq!(offset.partition_id, 0); - assert_eq!(offset.stored_offset, 0); - assert_eq!(offset.current_offset, 0); - }, + Some(json!({ + "stream_id": seeds::names::STREAM, + "topic_id": seeds::names::TOPIC, + "partition_id": 0 + })), ) .await; + + let offset = offset.expect("Expected consumer offset"); + assert_eq!(offset.partition_id, 0); + assert_eq!(offset.stored_offset, 0); + assert_eq!(offset.current_offset, 0); } -#[tokio::test] -#[parallel] -async fn mcp_server_should_store_consumer_offset() { - assert_empty_response( +#[iggy_harness(server(mcp), seed = seeds::mcp_standard)] +async fn should_store_consumer_offset(harness: &TestHarness) { + let mcp_client = harness.mcp_client().await.expect("MCP client required"); + invoke_empty( + &mcp_client, "store_consumer_offset", - Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partition_id": 0, "offset": 0 })), + Some(json!({ + "stream_id": seeds::names::STREAM, + "topic_id": seeds::names::TOPIC, + "partition_id": 0, + "offset": 0 + })), ) .await; } -#[tokio::test] -#[parallel] -async fn mcp_server_should_delete_consumer_offset() { - assert_empty_response( +#[iggy_harness(server(mcp), seed = seeds::mcp_standard)] +async fn should_delete_consumer_offset(harness: &TestHarness) { + let mcp_client = harness.mcp_client().await.expect("MCP client required"); + invoke_empty( + &mcp_client, "delete_consumer_offset", - Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partition_id": 0, "offset": 0 })), + Some(json!({ + "stream_id": seeds::names::STREAM, + "topic_id": seeds::names::TOPIC, + "partition_id": 0, + "offset": 0 + })), ) .await; } -#[tokio::test] -#[parallel] -async fn mcp_server_should_return_personal_access_tokens() { - assert_response::>("get_personal_access_tokens", None, |tokens| { - assert_eq!(tokens.len(), 1); - assert_eq!(tokens[0].name, PERSONAL_ACCESS_TOKEN_NAME); - }) - .await; +#[iggy_harness(server(mcp), seed = seeds::mcp_standard)] +async fn should_return_personal_access_tokens(harness: &TestHarness) { + let mcp_client = harness.mcp_client().await.expect("MCP client required"); + let tokens: Vec = + invoke(&mcp_client, "get_personal_access_tokens", None).await; + + assert_eq!(tokens.len(), 1); + assert_eq!(tokens[0].name, seeds::names::PERSONAL_ACCESS_TOKEN); } -#[tokio::test] -#[parallel] -async fn mcp_server_should_create_personal_access_token() { +#[iggy_harness(server(mcp))] +async fn should_create_personal_access_token(harness: &TestHarness) { + let mcp_client = harness.mcp_client().await.expect("MCP client required"); let name = "test_token"; let expiry = PersonalAccessTokenExpiry::NeverExpire.to_string(); - assert_response::( + let token: RawPersonalAccessToken = invoke( + &mcp_client, "create_personal_access_token", - Some(json!({ "name": name, "expiry": expiry })), - |token| { - assert!(!token.token.is_empty()); - }, + Some(json!({"name": name, "expiry": expiry})), ) .await; + + assert!(!token.token.is_empty()); } -#[tokio::test] -#[parallel] -async fn mcp_server_should_delete_personal_access_token() { - assert_empty_response( +#[iggy_harness(server(mcp), seed = seeds::mcp_standard)] +async fn should_delete_personal_access_token(harness: &TestHarness) { + let mcp_client = harness.mcp_client().await.expect("MCP client required"); + invoke_empty( + &mcp_client, "delete_personal_access_token", - Some(json!({ "name": PERSONAL_ACCESS_TOKEN_NAME})), + Some(json!({"name": seeds::names::PERSONAL_ACCESS_TOKEN})), ) .await; } -#[tokio::test] -#[parallel] -async fn mcp_server_should_return_users() { - assert_response::>("get_users", None, |users| { - assert_eq!(users.len(), 2); - }) - .await; +#[iggy_harness(server(mcp), seed = seeds::mcp_standard)] +async fn should_return_users(harness: &TestHarness) { + let mcp_client = harness.mcp_client().await.expect("MCP client required"); + let users: Vec = invoke(&mcp_client, "get_users", None).await; + + assert_eq!(users.len(), 2); } -#[tokio::test] -#[parallel] -async fn mcp_server_should_return_user_details() { - assert_response::("get_user", Some(json!({ "user_id": USER_NAME})), |user| { - assert_eq!(user.username, USER_NAME); - }) +#[iggy_harness(server(mcp), seed = seeds::mcp_standard)] +async fn should_return_user_details(harness: &TestHarness) { + let mcp_client = harness.mcp_client().await.expect("MCP client required"); + let user: UserInfoDetails = invoke( + &mcp_client, + "get_user", + Some(json!({"user_id": seeds::names::USER})), + ) .await; + + assert_eq!(user.username, seeds::names::USER); } -#[tokio::test] -#[parallel] -async fn mcp_server_should_create_user() { +#[iggy_harness(server(mcp))] +async fn should_create_user(harness: &TestHarness) { + let mcp_client = harness.mcp_client().await.expect("MCP client required"); let username = "test-mcp-user"; - assert_response::( + let user: UserInfoDetails = invoke( + &mcp_client, "create_user", - Some(json!({ "username": username, "password": "secret"})), - |user| { - assert_eq!(user.username, username); - }, + Some(json!({"username": username, "password": "secret"})), ) .await; + + assert_eq!(user.username, username); } -#[tokio::test] -#[parallel] -async fn mcp_server_should_update_user() { - assert_empty_response( +#[iggy_harness(server(mcp), seed = seeds::mcp_standard)] +async fn should_update_user(harness: &TestHarness) { + let mcp_client = harness.mcp_client().await.expect("MCP client required"); + invoke_empty( + &mcp_client, "update_user", - Some(json!({ "user_id": USER_NAME, "username": "test-mcp-user", "active": false})), + Some(json!({ + "user_id": seeds::names::USER, + "username": "updated-user", + "active": false + })), ) .await; } -#[tokio::test] -#[parallel] -async fn mcp_server_should_delete_user() { - assert_empty_response("delete_user", Some(json!({ "user_id": USER_NAME}))).await; +#[iggy_harness(server(mcp), seed = seeds::mcp_standard)] +async fn should_delete_user(harness: &TestHarness) { + let mcp_client = harness.mcp_client().await.expect("MCP client required"); + invoke_empty( + &mcp_client, + "delete_user", + Some(json!({"user_id": seeds::names::USER})), + ) + .await; } -#[tokio::test] -#[parallel] -async fn mcp_server_should_update_permissions() { +#[iggy_harness(server(mcp), seed = seeds::mcp_standard)] +async fn should_update_permissions(harness: &TestHarness) { + let mcp_client = harness.mcp_client().await.expect("MCP client required"); let permissions = json!({ "global": { "manage_servers": true, @@ -524,186 +611,25 @@ async fn mcp_server_should_update_permissions() { } }); - assert_empty_response( + invoke_empty( + &mcp_client, "update_permissions", - Some(json!({ "user_id": USER_NAME, "permissions": permissions })), + Some(json!({"user_id": seeds::names::USER, "permissions": permissions})), ) .await; } -#[tokio::test] -#[parallel] -async fn mcp_server_should_change_password() { - assert_empty_response( +#[iggy_harness(server(mcp), seed = seeds::mcp_standard)] +async fn should_change_password(harness: &TestHarness) { + let mcp_client = harness.mcp_client().await.expect("MCP client required"); + invoke_empty( + &mcp_client, "change_password", - Some(json!({ "user_id": USER_NAME, "current_password": USER_PASSWORD, "new_password": "secret2"})), + Some(json!({ + "user_id": seeds::names::USER, + "current_password": seeds::names::USER_PASSWORD, + "new_password": "new_secret" + })), ) .await; } - -async fn assert_empty_response(method: &str, data: Option) { - assert_response::<()>(method, data, |()| {}).await -} - -async fn assert_response( - method: &str, - data: Option, - assert_response: impl FnOnce(T), -) { - let infra = setup().await; - let client = infra.mcp_client; - let result = invoke_request(&client, method, data).await; - assert_response(result) -} - -async fn invoke_request( - client: &TestMcpClient, - method: &str, - data: Option, -) -> T { - let error_message = format!("Failed to invoke MCP method: {method}",); - let mut result = client.invoke(method, data).await.expect(&error_message); - let result = result.content.remove(0); - let Some(text) = result.as_text() else { - panic!("Expected text response for MCP method: {method}"); - }; - - serde_json::from_str::(&text.text).expect("Failed to parse JSON") -} - -async fn setup() -> McpInfra { - let mut iggy_envs = HashMap::new(); - iggy_envs.insert("IGGY_CLUSTER_ENABLED".to_owned(), "true".to_owned()); - iggy_envs.insert("IGGY_QUIC_ENABLED".to_owned(), "false".to_owned()); - iggy_envs.insert("IGGY_WEBSOCKET_ENABLED".to_owned(), "false".to_owned()); - let mut test_server = TestServer::new(Some(iggy_envs), true, None, IpAddrKind::V4); - test_server.start(); - let iggy_server_address = test_server - .get_raw_tcp_addr() - .expect("Failed to get Iggy TCP address"); - seed_data(&iggy_server_address).await; - - let mut test_mcp_server = TestMcpServer::with_iggy_address(&iggy_server_address); - test_mcp_server.start(); - test_mcp_server.ensure_started().await; - let mcp_client = test_mcp_server.get_client().await; - - McpInfra { - _iggy_server: test_server, - _mcp_server: test_mcp_server, - mcp_client: TestMcpClient { mcp_client }, - } -} - -async fn seed_data(iggy_server_address: &str) { - let iggy_port = iggy_server_address - .split(':') - .next_back() - .unwrap() - .parse::() - .unwrap(); - - let iggy_client = IggyClient::from_connection_string(&format!( - "iggy://{DEFAULT_ROOT_USERNAME}:{DEFAULT_ROOT_PASSWORD}@localhost:{iggy_port}" - )) - .expect("Failed to create Iggy client"); - - iggy_client - .connect() - .await - .expect("Failed to initialize Iggy client"); - - iggy_client - .create_stream(STREAM_NAME) - .await - .expect("Failed to create stream"); - - iggy_client - .create_topic( - &STREAM_ID, - TOPIC_NAME, - 1, - iggy_common::CompressionAlgorithm::None, - None, - IggyExpiry::ServerDefault, - MaxTopicSize::ServerDefault, - ) - .await - .expect("Failed to create topic"); - - let mut messages = vec![ - IggyMessage::builder() - .payload(MESSAGE_PAYLOAD.into()) - .build() - .expect("Failed to build message"), - ]; - - iggy_client - .send_messages( - &STREAM_ID, - &TOPIC_ID, - &Partitioning::partition_id(0), - &mut messages, - ) - .await - .expect("Failed to send messages"); - - let consumer = - Consumer::new(Identifier::named(CONSUMER_NAME).expect("Failed to create consumer")); - - iggy_client - .store_consumer_offset(&consumer, &STREAM_ID, &TOPIC_ID, Some(0), 0) - .await - .expect("Failed to store consumer offset"); - - iggy_client - .create_consumer_group(&STREAM_ID, &TOPIC_ID, CONSUMER_GROUP_NAME) - .await - .expect("Failed to create consumer group"); - - iggy_client - .create_user(USER_NAME, USER_PASSWORD, UserStatus::Active, None) - .await - .expect("Failed to create user"); - - iggy_client - .create_personal_access_token( - PERSONAL_ACCESS_TOKEN_NAME, - PersonalAccessTokenExpiry::NeverExpire, - ) - .await - .expect("Failed to create personal access token"); -} - -#[derive(Debug)] -struct McpInfra { - _iggy_server: TestServer, - _mcp_server: TestMcpServer, - mcp_client: TestMcpClient, -} - -#[derive(Debug)] -struct TestMcpClient { - mcp_client: McpClient, -} - -impl TestMcpClient { - pub async fn list_tools(&self) -> Result { - self.mcp_client.list_tools(Default::default()).await - } - - pub async fn invoke( - &self, - method: &str, - data: Option, - ) -> Result { - self.mcp_client - .call_tool(CallToolRequestParams { - name: method.to_owned().into(), - arguments: data.and_then(|value| value.as_object().cloned()), - task: None, - meta: None, - }) - .await - } -} diff --git a/core/integration/tests/mod.rs b/core/integration/tests/mod.rs index b451f4499..22e917fab 100644 --- a/core/integration/tests/mod.rs +++ b/core/integration/tests/mod.rs @@ -26,6 +26,7 @@ use std::sync::{Arc, Once}; use std::{panic, thread}; mod cli; +mod cluster; mod config_provider; mod connectors; mod data_integrity;