From dad00d058bf6f19eb21c02edb1c80ccb6618ec89 Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Tue, 13 Jan 2026 08:43:06 +0530 Subject: [PATCH 01/11] feat(connectors): implement Redshift Sink Connector with S3 staging Implements Issue #2540 - Redshift Sink Connector with S3 staging support. Features: - S3 staging with automatic CSV file upload - Redshift COPY command execution via PostgreSQL wire protocol - IAM role authentication (recommended) or access key credentials - Configurable batch size and compression (gzip, lzop, bzip2, zstd) - Automatic table creation with customizable schema - Retry logic with exponential backoff for transient failures - Automatic cleanup of staged S3 files Configuration options: - connection_string: Redshift cluster connection URL - target_table: Destination table name - iam_role: IAM role ARN for S3 access (recommended) - s3_bucket/s3_region/s3_prefix: S3 staging location - batch_size: Messages per batch (default: 10000) - compression: COPY compression format - delete_staged_files: Auto-cleanup toggle (default: true) - auto_create_table: Create table if missing (default: true) Closes #2540 --- Cargo.lock | 169 +++++++ Cargo.toml | 1 + .../connectors/sinks/redshift_sink/Cargo.toml | 54 ++ core/connectors/sinks/redshift_sink/README.md | 234 +++++++++ .../sinks/redshift_sink/src/config.rs | 237 +++++++++ .../connectors/sinks/redshift_sink/src/lib.rs | 477 ++++++++++++++++++ core/connectors/sinks/redshift_sink/src/s3.rs | 185 +++++++ 7 files changed, 1357 insertions(+) create mode 100644 core/connectors/sinks/redshift_sink/Cargo.toml create mode 100644 core/connectors/sinks/redshift_sink/README.md create mode 100644 core/connectors/sinks/redshift_sink/src/config.rs create mode 100644 core/connectors/sinks/redshift_sink/src/lib.rs create mode 100644 core/connectors/sinks/redshift_sink/src/s3.rs diff --git a/Cargo.lock b/Cargo.lock index b5761f4f33..b345fe005e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -889,6 +889,22 @@ version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" +[[package]] +name = "attohttpc" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e2cdb6d5ed835199484bb92bb8b3edd526effe995c61732580439c1a67e2e9" +dependencies = [ + "base64 0.22.1", + "http 1.4.0", + "log", + "rustls", + "serde", + "serde_json", + "url", + "webpki-roots 1.0.4", +] + [[package]] name = "autocfg" version = "1.5.0" @@ -904,6 +920,23 @@ dependencies = [ "cc", ] +[[package]] +name = "aws-creds" +version = "0.39.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca3b85155d265df828f84e53886ed9e427aed979dd8a39f5b8b2162c77e142d7" +dependencies = [ + "attohttpc", + "home", + "log", + "quick-xml 0.38.4", + "rust-ini", + "serde", + "thiserror 2.0.17", + "time", + "url", +] + [[package]] name = "aws-lc-rs" version = "1.15.2" @@ -926,6 +959,15 @@ dependencies = [ "fs_extra", ] +[[package]] +name = "aws-region" +version = "0.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "838b36c8dc927b6db1b6c6b8f5d05865f2213550b9e83bf92fa99ed6525472c0" +dependencies = [ + "thiserror 2.0.17", +] + [[package]] name = "axum" version = "0.8.8" @@ -1531,6 +1573,15 @@ dependencies = [ "thiserror 2.0.17", ] +[[package]] +name = "castaway" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"dec551ab6e7578819132c713a93c022a05d60159dc86e7a7050223577484c55a" +dependencies = [ + "rustversion", +] + [[package]] name = "cc" version = "1.2.51" @@ -1724,6 +1775,19 @@ dependencies = [ "unicode-width 0.2.2", ] +[[package]] +name = "compact_str" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f86b9c4c00838774a6d902ef931eff7470720c51d90c2e32cfe15dc304737b3f" +dependencies = [ + "castaway", + "cfg-if", + "itoa", + "ryu", + "static_assertions", +] + [[package]] name = "compio" version = "0.17.0" @@ -4818,6 +4882,25 @@ dependencies = [ "uuid", ] +[[package]] +name = "iggy_connector_redshift_sink" +version = "0.1.0" +dependencies = [ + "async-trait", + "chrono", + "dashmap", + "futures", + "iggy_connector_sdk", + "once_cell", + "rust-s3", + "serde", + "simd-json", + "sqlx", + "tokio", + "tracing", + "uuid", +] + [[package]] name = "iggy_connector_sdk" version = "0.1.1-edge.1" @@ -5631,6 +5714,17 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" +[[package]] +name = "maybe-async" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5cf92c10c7e361d6b99666ec1c6f9805b0bea2c3bd8c78dc6fe98ac5bd78db11" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.111", +] + [[package]] name = "md-5" version = "0.10.6" @@ -5641,6 +5735,12 @@ dependencies = [ "digest", ] +[[package]] +name = "md5" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae960838283323069879657ca3de837e9f7bbb4c7bf6ea7f1b290d5e9476d2e0" + [[package]] name = "memchr" version = "2.7.6" @@ -5716,6 +5816,15 @@ dependencies = [ "unicase", ] +[[package]] +name = "minidom" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e394a0e3c7ccc2daea3dffabe82f09857b6b510cb25af87d54bf3e910ac1642d" +dependencies = [ + "rxml", +] + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -7722,6 +7831,41 @@ dependencies = [ "ordered-multimap", ] +[[package]] +name = "rust-s3" +version = "0.37.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4af74047374528b627109d579ce86b23ccf6ffba7ff363c807126c1aff69e1bb" +dependencies = [ + "async-trait", + "aws-creds", + "aws-region", + "base64 0.22.1", + "bytes", + "cfg-if", + "futures-util", + "hex", + "hmac", + "http 1.4.0", + "log", + "maybe-async", + "md5", + "minidom", + "percent-encoding", + "quick-xml 0.38.4", + "reqwest", + "serde", + "serde_derive", + "serde_json", + "sha2", + "sysinfo 0.37.2", + "thiserror 2.0.17", + "time", + "tokio", + "tokio-stream", + "url", +] + [[package]] name = "rust_decimal" version = "1.39.0" @@ -7867,6 +8011,25 @@ version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" +[[package]] +name = "rxml" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc94b580d0f5a6b7a2d604e597513d3c673154b52ddeccd1d5c32360d945ee" +dependencies = [ + "bytes", + "rxml_validation", +] + +[[package]] +name = "rxml_validation" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "826e80413b9a35e9d33217b3dcac04cf95f6559d15944b93887a08be5496c4a4" +dependencies = [ + "compact_str", +] + [[package]] name = "ryu" version = "1.0.22" @@ -8779,6 +8942,12 @@ dependencies = [ 
"toml 0.8.23", ] +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "stringprep" version = "0.1.5" diff --git a/Cargo.toml b/Cargo.toml index 3b68471ce5..641e9485b9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ members = [ "core/connectors/sinks/iceberg_sink", "core/connectors/sinks/postgres_sink", "core/connectors/sinks/quickwit_sink", + "core/connectors/sinks/redshift_sink", "core/connectors/sinks/stdout_sink", "core/connectors/sources/elasticsearch_source", "core/connectors/sources/postgres_source", diff --git a/core/connectors/sinks/redshift_sink/Cargo.toml b/core/connectors/sinks/redshift_sink/Cargo.toml new file mode 100644 index 0000000000..63332613da --- /dev/null +++ b/core/connectors/sinks/redshift_sink/Cargo.toml @@ -0,0 +1,54 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "iggy_connector_redshift_sink" +version = "0.1.0" +description = "Iggy Redshift sink connector for loading stream messages into Amazon Redshift via S3 staging" +edition = "2024" +license = "Apache-2.0" +keywords = ["iggy", "messaging", "streaming", "redshift", "sink", "aws"] +categories = ["command-line-utilities", "database", "network-programming"] +homepage = "https://iggy.apache.org" +documentation = "https://iggy.apache.org/docs" +repository = "https://github.com/apache/iggy" +readme = "README.md" + +[package.metadata.cargo-machete] +ignored = ["dashmap", "once_cell", "futures", "simd-json", "prost"] + +[lib] +crate-type = ["cdylib", "lib"] + +[dependencies] +async-trait = { workspace = true } +chrono = { workspace = true } +dashmap = { workspace = true } +futures = { workspace = true } +iggy_connector_sdk = { workspace = true } +once_cell = { workspace = true } +rust-s3 = { workspace = true } +serde = { workspace = true } +simd-json = { workspace = true } +sqlx = { version = "0.8", features = [ + "runtime-tokio-rustls", + "postgres", + "chrono", +] } +tokio = { workspace = true } +tracing = { workspace = true } +uuid = { workspace = true } \ No newline at end of file diff --git a/core/connectors/sinks/redshift_sink/README.md b/core/connectors/sinks/redshift_sink/README.md new file mode 100644 index 0000000000..7a22e25b5b --- /dev/null +++ b/core/connectors/sinks/redshift_sink/README.md @@ -0,0 +1,234 @@ +# Apache Iggy - Redshift Sink Connector + +A sink connector that loads data from Iggy streams into Amazon Redshift using the S3 staging method. This is the recommended approach for high-volume data loading into Redshift. + +## Overview + +The Redshift Sink Connector: + +1. **Buffers** incoming messages into batches +2. **Uploads** batches as CSV files to S3 +3. 
**Executes** Redshift COPY command to load data from S3 +4. **Cleans up** staged S3 files after successful load + +This approach leverages Redshift's massively parallel processing (MPP) architecture for efficient bulk loading. + +## Prerequisites + +- Amazon Redshift cluster with network access +- S3 bucket for staging files +- AWS credentials with appropriate permissions: + - S3: `s3:PutObject`, `s3:GetObject`, `s3:DeleteObject` on the staging bucket + - Redshift: `COPY` permission on the target table + +## Configuration + +Create a connector configuration file (e.g., `redshift.toml`): + +```toml +type = "sink" +key = "redshift" +enabled = true +version = 0 +name = "Redshift Sink" +path = "target/release/libiggy_connector_redshift_sink" +plugin_config_format = "toml" + +[[streams]] +stream = "events" +topics = ["user_actions"] +schema = "json" +batch_length = 10000 +poll_interval = "100ms" +consumer_group = "redshift_sink" + +[plugin_config] +# Redshift connection (PostgreSQL wire protocol) +connection_string = "postgres://admin:password@my-cluster.region.redshift.amazonaws.com:5439/mydb" +target_table = "public.events" + +# S3 staging configuration +s3_bucket = "my-staging-bucket" +s3_prefix = "redshift/staging/" +s3_region = "us-east-1" + +# AWS authentication - use either IAM role (preferred) or access keys +iam_role = "arn:aws:iam::123456789012:role/RedshiftS3Access" + +# Or use access keys instead of IAM role: +# aws_access_key_id = "AKIAIOSFODNN7EXAMPLE" +# aws_secret_access_key = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" + +# Batching settings +batch_size = 10000 +flush_interval_ms = 30000 + +# CSV format options +csv_delimiter = "," +csv_quote = "\"" + +# COPY command options +max_errors = 10 +# compression = "gzip" + +# Cleanup and reliability +delete_staged_files = true +max_retries = 3 +retry_delay_ms = 1000 + +# Database settings +max_connections = 5 +auto_create_table = false + +# Metadata columns (adds iggy_offset, iggy_timestamp, etc.) 
+include_metadata = false +``` + +## Configuration Reference + +| Property | Type | Required | Default | Description | +|----------|------|----------|---------|-------------| +| `connection_string` | String | Yes | - | Redshift connection string in PostgreSQL format | +| `target_table` | String | Yes | - | Target table name (can include schema) | +| `s3_bucket` | String | Yes | - | S3 bucket for staging CSV files | +| `s3_region` | String | Yes | - | AWS region for the S3 bucket | +| `s3_prefix` | String | No | `""` | S3 key prefix for staged files | +| `iam_role` | String | No* | - | IAM role ARN for Redshift to access S3 | +| `aws_access_key_id` | String | No* | - | AWS access key ID | +| `aws_secret_access_key` | String | No* | - | AWS secret access key | +| `batch_size` | Integer | No | `10000` | Messages per batch before S3 upload | +| `flush_interval_ms` | Integer | No | `30000` | Max wait time before flushing partial batch | +| `csv_delimiter` | Char | No | `,` | CSV field delimiter | +| `csv_quote` | Char | No | `"` | CSV quote character | +| `max_errors` | Integer | No | `0` | Max errors before COPY fails | +| `compression` | String | No | `none` | Compression: `gzip`, `lzop`, `bzip2`, `zstd` | +| `delete_staged_files` | Boolean | No | `true` | Delete S3 files after successful COPY | +| `max_connections` | Integer | No | `5` | Max Redshift connections | +| `max_retries` | Integer | No | `3` | Max retry attempts for failures | +| `retry_delay_ms` | Integer | No | `1000` | Initial retry delay (exponential backoff) | +| `include_metadata` | Boolean | No | `false` | Include Iggy metadata columns | +| `auto_create_table` | Boolean | No | `false` | Auto-create table if not exists | + +*Either `iam_role` or both `aws_access_key_id` and `aws_secret_access_key` must be provided. + +## Table Schema + +When `auto_create_table` is enabled, the connector creates a table with this schema: + +```sql +CREATE TABLE IF NOT EXISTS ( + id VARCHAR(40) PRIMARY KEY, + payload VARCHAR(MAX), + -- When include_metadata = true: + iggy_offset BIGINT, + iggy_timestamp TIMESTAMP, + iggy_stream VARCHAR(256), + iggy_topic VARCHAR(256), + iggy_partition_id INTEGER, + -- + created_at TIMESTAMP DEFAULT GETDATE() +); +``` + +For production use, pre-create your table with appropriate column types, sort keys, and distribution style. + +## IAM Role Setup + +For IAM role authentication (recommended), create a role with this trust policy: + +```json +{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": { + "Service": "redshift.amazonaws.com" + }, + "Action": "sts:AssumeRole" + } + ] +} +``` + +And attach a policy with S3 access: + +```json +{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "s3:GetObject", + "s3:GetObjectVersion", + "s3:GetBucketLocation", + "s3:ListBucket" + ], + "Resource": [ + "arn:aws:s3:::my-staging-bucket", + "arn:aws:s3:::my-staging-bucket/*" + ] + } + ] +} +``` + +Then associate the role with your Redshift cluster. 
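+
+Once the role is associated, the connector references it in the generated COPY statement. The example below is illustrative only — the staged object name is a placeholder (the actual key is a generated UUID under `s3_prefix`), and the values mirror the sample configuration above:
+
+```sql
+COPY public.events (id, payload)
+FROM 's3://my-staging-bucket/redshift/staging/<uuid>.csv'
+IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftS3Access'
+FORMAT AS CSV
+DELIMITER ','
+QUOTE '"'
+MAXERROR 10
+REGION 'us-east-1';
+```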
+ +## Performance Tuning + +### Batch Size + +- **Small batches** (1,000-5,000): Lower latency, more COPY operations +- **Large batches** (50,000-100,000): Higher throughput, more memory usage +- Recommended starting point: `10,000` + +### Compression + +Enable compression for large payloads to reduce S3 transfer time: + +```toml +compression = "gzip" +``` + +### Parallelism + +Increase `batch_length` in stream config to process more messages per poll: + +```toml +[[streams]] +batch_length = 50000 +``` + +## Error Handling + +The connector implements retry logic with exponential backoff for transient failures: + +- **S3 upload failures**: Retried up to `max_retries` times +- **COPY command failures**: Retried with backoff, failed rows logged +- **Cleanup failures**: Logged as warnings, do not block processing + +Use `max_errors` to control COPY behavior: +- `0`: Fail on first error (strict mode) +- `N`: Allow up to N errors per COPY operation + +## Monitoring + +The connector logs statistics on close: + +``` +Closing Redshift sink connector ID: 1. Stats: 150000 messages processed, 15 batches loaded, 0 errors +``` + +Monitor these metrics to track connector health. + +## Limitations + +- Payload must be convertible to string (JSON, text, or raw bytes) +- Table must exist unless `auto_create_table` is enabled +- Currently supports CSV format only (Parquet planned) + +## License + +Licensed under the Apache License, Version 2.0. diff --git a/core/connectors/sinks/redshift_sink/src/config.rs b/core/connectors/sinks/redshift_sink/src/config.rs new file mode 100644 index 0000000000..f13da6a434 --- /dev/null +++ b/core/connectors/sinks/redshift_sink/src/config.rs @@ -0,0 +1,237 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use iggy_connector_sdk::Error; +use serde::{Deserialize, Serialize}; + +/// Configuration for the Redshift Sink Connector. +/// +/// This connector loads data from Iggy streams into Amazon Redshift using S3 staging, +/// which is the recommended approach for high-volume data loading. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RedshiftSinkConfig { + /// Redshift connection string in PostgreSQL format. + /// Example: `postgres://user:password@cluster.region.redshift.amazonaws.com:5439/database` + pub connection_string: String, + + /// Target table name in Redshift. Can include schema prefix. + /// Example: `public.events` or `analytics.user_actions` + pub target_table: String, + + /// IAM role ARN for Redshift to access S3. Preferred over access keys. + /// Example: `arn:aws:iam::123456789012:role/RedshiftS3Access` + pub iam_role: Option, + + /// S3 bucket name for staging CSV files before COPY. + pub s3_bucket: String, + + /// S3 key prefix for staged files. Defaults to empty string. 
+ /// Example: `staging/redshift/` + pub s3_prefix: Option, + + /// AWS region for the S3 bucket. + /// Example: `us-east-1` + pub s3_region: String, + + /// AWS access key ID. Required if IAM role is not specified. + pub aws_access_key_id: Option, + + /// AWS secret access key. Required if IAM role is not specified. + pub aws_secret_access_key: Option, + + /// Number of messages to batch before uploading to S3 and executing COPY. + /// Defaults to 10000. + pub batch_size: Option, + + /// Maximum time in milliseconds to wait before flushing a partial batch. + /// Defaults to 30000 (30 seconds). + pub flush_interval_ms: Option, + + /// CSV field delimiter character. Defaults to `,`. + pub csv_delimiter: Option, + + /// CSV quote character for escaping. Defaults to `"`. + pub csv_quote: Option, + + /// Number of header rows to skip. Defaults to 0. + pub ignore_header: Option, + + /// Maximum number of errors allowed before COPY fails. Defaults to 0. + pub max_errors: Option, + + /// Compression format for staged files: `gzip`, `lzop`, `bzip2`, or `none`. + pub compression: Option, + + /// Whether to delete staged S3 files after successful COPY. Defaults to true. + pub delete_staged_files: Option, + + /// Maximum number of database connections. Defaults to 5. + pub max_connections: Option, + + /// Maximum number of retry attempts for transient failures. Defaults to 3. + pub max_retries: Option, + + /// Initial delay in milliseconds between retries. Uses exponential backoff. + /// Defaults to 1000. + pub retry_delay_ms: Option, + + /// Whether to include Iggy metadata columns (offset, timestamp, stream, topic, partition). + /// Defaults to false. + pub include_metadata: Option, + + /// Whether to auto-create the target table if it doesn't exist. Defaults to false. + pub auto_create_table: Option, +} + +impl RedshiftSinkConfig { + /// Validates the configuration and returns an error if invalid. 
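+    ///
+    /// Ensures that `connection_string`, `target_table`, `s3_bucket`, and `s3_region`
+    /// are non-empty, that either an IAM role or a complete access key pair is
+    /// configured, and that any `compression` value is a supported format.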
+ pub fn validate(&self) -> Result<(), Error> { + if self.connection_string.is_empty() { + return Err(Error::InvalidConfig); + } + + if self.target_table.is_empty() { + return Err(Error::InvalidConfig); + } + + if self.s3_bucket.is_empty() { + return Err(Error::InvalidConfig); + } + + if self.s3_region.is_empty() { + return Err(Error::InvalidConfig); + } + + // Validate AWS credentials: either IAM role or access keys must be provided + let has_iam_role = self.iam_role.as_ref().is_some_and(|r| !r.is_empty()); + let has_access_key = self + .aws_access_key_id + .as_ref() + .is_some_and(|k| !k.is_empty()); + let has_secret_key = self + .aws_secret_access_key + .as_ref() + .is_some_and(|s| !s.is_empty()); + + if !(has_iam_role || (has_access_key && has_secret_key)) { + return Err(Error::InvalidConfig); + } + + // If using access keys, both must be provided + if (has_access_key && !has_secret_key) || (!has_access_key && has_secret_key) { + return Err(Error::InvalidConfig); + } + + // Validate compression if specified + if let Some(compression) = &self.compression { + let valid = ["gzip", "lzop", "bzip2", "none", "zstd"]; + if !valid.contains(&compression.to_lowercase().as_str()) { + return Err(Error::InvalidConfig); + } + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn valid_config() -> RedshiftSinkConfig { + RedshiftSinkConfig { + connection_string: "postgres://user:pass@host:5439/db".to_string(), + target_table: "test_table".to_string(), + iam_role: Some("arn:aws:iam::123:role/Test".to_string()), + s3_bucket: "bucket".to_string(), + s3_prefix: None, + s3_region: "us-east-1".to_string(), + aws_access_key_id: None, + aws_secret_access_key: None, + batch_size: None, + flush_interval_ms: None, + csv_delimiter: None, + csv_quote: None, + ignore_header: None, + max_errors: None, + compression: None, + delete_staged_files: None, + max_connections: None, + max_retries: None, + retry_delay_ms: None, + include_metadata: None, + auto_create_table: None, + } + } + + #[test] + fn test_valid_config_with_iam_role() { + let config = valid_config(); + assert!(config.validate().is_ok()); + } + + #[test] + fn test_valid_config_with_access_keys() { + let mut config = valid_config(); + config.iam_role = None; + config.aws_access_key_id = Some("AKIAIOSFODNN7EXAMPLE".to_string()); + config.aws_secret_access_key = Some("wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY".to_string()); + assert!(config.validate().is_ok()); + } + + #[test] + fn test_invalid_empty_connection_string() { + let mut config = valid_config(); + config.connection_string = String::new(); + assert!(config.validate().is_err()); + } + + #[test] + fn test_invalid_empty_table() { + let mut config = valid_config(); + config.target_table = String::new(); + assert!(config.validate().is_err()); + } + + #[test] + fn test_invalid_empty_bucket() { + let mut config = valid_config(); + config.s3_bucket = String::new(); + assert!(config.validate().is_err()); + } + + #[test] + fn test_invalid_compression() { + let mut config = valid_config(); + config.compression = Some("invalid".to_string()); + assert!(config.validate().is_err()); + } + + #[test] + fn test_valid_compression_options() { + for comp in ["gzip", "GZIP", "lzop", "bzip2", "none", "zstd"] { + let mut config = valid_config(); + config.compression = Some(comp.to_string()); + assert!( + config.validate().is_ok(), + "compression '{}' should be valid", + comp + ); + } + } +} diff --git a/core/connectors/sinks/redshift_sink/src/lib.rs b/core/connectors/sinks/redshift_sink/src/lib.rs new file 
mode 100644 index 0000000000..8502d8a99f --- /dev/null +++ b/core/connectors/sinks/redshift_sink/src/lib.rs @@ -0,0 +1,477 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +mod config; +mod s3; + +use async_trait::async_trait; +use config::RedshiftSinkConfig; +use iggy_connector_sdk::{ + ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata, sink_connector, +}; +use s3::S3Uploader; +use sqlx::{Pool, Postgres, postgres::PgPoolOptions}; +use std::time::Duration; +use tokio::sync::Mutex; +use tracing::{error, info, warn}; + +sink_connector!(RedshiftSink); + +#[derive(Debug)] +pub struct RedshiftSink { + id: u32, + config: RedshiftSinkConfig, + pool: Option>, + s3_uploader: Option, + state: Mutex, +} + +#[derive(Debug, Default)] +struct SinkState { + messages_processed: u64, + batches_loaded: u64, + load_errors: u64, +} + +impl RedshiftSink { + pub fn new(id: u32, config: RedshiftSinkConfig) -> Self { + RedshiftSink { + id, + config, + pool: None, + s3_uploader: None, + state: Mutex::new(SinkState::default()), + } + } + + async fn connect_redshift(&mut self) -> Result<(), Error> { + let max_connections = self.config.max_connections.unwrap_or(5); + let redacted = self + .config + .connection_string + .chars() + .take(20) + .collect::(); + + info!( + "Connecting to Redshift with max {} connections, connection: {}...", + max_connections, redacted + ); + + let pool = PgPoolOptions::new() + .max_connections(max_connections) + .acquire_timeout(Duration::from_secs(30)) + .connect(&self.config.connection_string) + .await + .map_err(|e| Error::InitError(format!("Failed to connect to Redshift: {e}")))?; + + sqlx::query("SELECT 1") + .execute(&pool) + .await + .map_err(|e| Error::InitError(format!("Redshift connectivity test failed: {e}")))?; + + self.pool = Some(pool); + info!("Connected to Redshift cluster"); + Ok(()) + } + + fn init_s3_uploader(&mut self) -> Result<(), Error> { + let uploader = S3Uploader::new( + &self.config.s3_bucket, + self.config.s3_prefix.as_deref().unwrap_or(""), + &self.config.s3_region, + self.config.aws_access_key_id.as_deref(), + self.config.aws_secret_access_key.as_deref(), + )?; + self.s3_uploader = Some(uploader); + info!( + "Initialized S3 uploader for bucket: {}, region: {}", + self.config.s3_bucket, self.config.s3_region + ); + Ok(()) + } + + async fn ensure_table_exists(&self) -> Result<(), Error> { + if !self.config.auto_create_table.unwrap_or(false) { + return Ok(()); + } + + let pool = self + .pool + .as_ref() + .ok_or_else(|| Error::InitError("Database not connected".to_string()))?; + + let table_name = &self.config.target_table; + let include_metadata = self.config.include_metadata.unwrap_or(false); + + let mut sql = format!( + "CREATE TABLE IF NOT EXISTS {table_name} ( + id VARCHAR(40) 
PRIMARY KEY, + payload VARCHAR(MAX)" + ); + + if include_metadata { + sql.push_str( + ", + iggy_offset BIGINT, + iggy_timestamp TIMESTAMP, + iggy_stream VARCHAR(256), + iggy_topic VARCHAR(256), + iggy_partition_id INTEGER", + ); + } + + sql.push_str( + ", + created_at TIMESTAMP DEFAULT GETDATE() + )", + ); + + sqlx::query(&sql) + .execute(pool) + .await + .map_err(|e| Error::InitError(format!("Failed to create table '{table_name}': {e}")))?; + + info!("Ensured table '{}' exists in Redshift", table_name); + Ok(()) + } + + async fn process_batch( + &self, + topic_metadata: &TopicMetadata, + messages_metadata: &MessagesMetadata, + messages: &[ConsumedMessage], + ) -> Result<(), Error> { + if messages.is_empty() { + return Ok(()); + } + + let s3_uploader = self + .s3_uploader + .as_ref() + .ok_or_else(|| Error::InitError("S3 uploader not initialized".to_string()))?; + + let pool = self + .pool + .as_ref() + .ok_or_else(|| Error::InitError("Database not connected".to_string()))?; + + // Convert messages to CSV + let csv_data = self.messages_to_csv(topic_metadata, messages_metadata, messages)?; + + // Upload to S3 + let s3_key = s3_uploader.upload_csv(&csv_data).await?; + let s3_path = format!("s3://{}/{}", self.config.s3_bucket, s3_key); + + info!( + "Uploaded {} messages ({} bytes) to {}", + messages.len(), + csv_data.len(), + s3_path + ); + + // Execute COPY command + let copy_result = self.execute_copy(pool, &s3_path).await; + + // Cleanup S3 file if configured + if self.config.delete_staged_files.unwrap_or(true) + && let Err(e) = s3_uploader.delete_file(&s3_key).await + { + warn!("Failed to delete staged file {}: {}", s3_key, e); + } + + copy_result?; + + let mut state = self.state.lock().await; + state.messages_processed += messages.len() as u64; + state.batches_loaded += 1; + + info!( + "Redshift sink ID: {} loaded {} messages to table '{}' (total: {}, batches: {})", + self.id, + messages.len(), + self.config.target_table, + state.messages_processed, + state.batches_loaded + ); + + Ok(()) + } + + fn messages_to_csv( + &self, + topic_metadata: &TopicMetadata, + messages_metadata: &MessagesMetadata, + messages: &[ConsumedMessage], + ) -> Result, Error> { + let delimiter = self.config.csv_delimiter.unwrap_or(','); + let quote = self.config.csv_quote.unwrap_or('"'); + let include_metadata = self.config.include_metadata.unwrap_or(false); + + let mut csv_output = Vec::new(); + + for message in messages { + let payload_str = match &message.payload { + Payload::Json(value) => simd_json::to_string(value).unwrap_or_default(), + Payload::Text(text) => text.clone(), + Payload::Raw(bytes) => String::from_utf8_lossy(bytes).to_string(), + _ => { + let bytes = message.payload.clone().try_into_vec().map_err(|e| { + error!("Failed to convert payload: {}", e); + Error::InvalidRecord + })?; + String::from_utf8_lossy(&bytes).to_string() + } + }; + + // Escape quotes in payload + let escaped_payload = payload_str.replace(quote, &format!("{quote}{quote}")); + + let mut row = format!( + "{}{delim}{quote}{payload}{quote}", + message.id, + delim = delimiter, + payload = escaped_payload + ); + + if include_metadata { + let timestamp_secs = message.timestamp / 1_000_000; + let timestamp = chrono::DateTime::from_timestamp(timestamp_secs as i64, 0) + .map(|dt| dt.format("%Y-%m-%d %H:%M:%S").to_string()) + .unwrap_or_default(); + + row.push_str(&format!( + "{delim}{offset}{delim}{ts}{delim}{quote}{stream}{quote}{delim}{quote}{topic}{quote}{delim}{partition}", + delim = delimiter, + offset = message.offset, + ts = 
timestamp, + stream = topic_metadata.stream, + topic = topic_metadata.topic, + partition = messages_metadata.partition_id + )); + } + + row.push('\n'); + csv_output.extend_from_slice(row.as_bytes()); + } + + Ok(csv_output) + } + + async fn execute_copy(&self, pool: &Pool, s3_path: &str) -> Result<(), Error> { + let table = &self.config.target_table; + let delimiter = self.config.csv_delimiter.unwrap_or(','); + let quote = self.config.csv_quote.unwrap_or('"'); + let max_errors = self.config.max_errors.unwrap_or(0); + let include_metadata = self.config.include_metadata.unwrap_or(false); + + let columns = if include_metadata { + "(id, payload, iggy_offset, iggy_timestamp, iggy_stream, iggy_topic, iggy_partition_id)" + } else { + "(id, payload)" + }; + + let credentials = if let Some(iam_role) = &self.config.iam_role { + format!("IAM_ROLE '{}'", iam_role) + } else if let (Some(key_id), Some(secret)) = ( + &self.config.aws_access_key_id, + &self.config.aws_secret_access_key, + ) { + format!("ACCESS_KEY_ID '{}' SECRET_ACCESS_KEY '{}'", key_id, secret) + } else { + return Err(Error::InitError( + "Either IAM role or AWS credentials must be provided".to_string(), + )); + }; + + let compression = self + .config + .compression + .as_deref() + .map(|c| format!("{} ", c.to_uppercase())) + .unwrap_or_default(); + + let copy_sql = format!( + "COPY {table} {columns} + FROM '{s3_path}' + {credentials} + {compression}FORMAT AS CSV + DELIMITER '{delimiter}' + QUOTE '{quote}' + MAXERROR {max_errors} + REGION '{region}'", + region = self.config.s3_region + ); + + let max_retries = self.config.max_retries.unwrap_or(3); + let retry_delay = self.config.retry_delay_ms.unwrap_or(1000); + + for attempt in 0..=max_retries { + match sqlx::query(©_sql).execute(pool).await { + Ok(_) => return Ok(()), + Err(e) if attempt < max_retries => { + let delay = retry_delay * 2u64.pow(attempt); + warn!( + "COPY command failed (attempt {}/{}): {}. Retrying in {}ms...", + attempt + 1, + max_retries + 1, + e, + delay + ); + tokio::time::sleep(Duration::from_millis(delay)).await; + } + Err(e) => { + error!( + "COPY command failed after {} attempts: {}", + max_retries + 1, + e + ); + let mut state = self.state.lock().await; + state.load_errors += 1; + return Err(Error::Storage(format!("COPY command failed: {e}"))); + } + } + } + + Ok(()) + } +} + +#[async_trait] +impl Sink for RedshiftSink { + async fn open(&mut self) -> Result<(), Error> { + info!( + "Opening Redshift sink connector ID: {}. Target: {}, S3 bucket: {}", + self.id, self.config.target_table, self.config.s3_bucket + ); + + self.config.validate()?; + self.init_s3_uploader()?; + self.connect_redshift().await?; + self.ensure_table_exists().await?; + + info!( + "Redshift sink connector ID: {} initialized successfully", + self.id + ); + Ok(()) + } + + async fn consume( + &self, + topic_metadata: &TopicMetadata, + messages_metadata: MessagesMetadata, + messages: Vec, + ) -> Result<(), Error> { + let batch_size = self.config.batch_size.unwrap_or(10000) as usize; + + for chunk in messages.chunks(batch_size) { + if let Err(e) = self + .process_batch(topic_metadata, &messages_metadata, chunk) + .await + { + error!( + "Failed to process batch for table '{}': {}", + self.config.target_table, e + ); + return Err(e); + } + } + + Ok(()) + } + + async fn close(&mut self) -> Result<(), Error> { + let state = self.state.lock().await; + info!( + "Closing Redshift sink connector ID: {}. 
Stats: {} messages processed, {} batches loaded, {} errors", + self.id, state.messages_processed, state.batches_loaded, state.load_errors + ); + + if let Some(pool) = self.pool.take() { + pool.close().await; + } + + info!("Redshift sink connector ID: {} closed", self.id); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn test_config() -> RedshiftSinkConfig { + RedshiftSinkConfig { + connection_string: "postgres://user:pass@localhost:5439/dev".to_string(), + target_table: "test_table".to_string(), + iam_role: Some("arn:aws:iam::123456789:role/RedshiftS3Access".to_string()), + s3_bucket: "test-bucket".to_string(), + s3_prefix: Some("staging/".to_string()), + s3_region: "us-east-1".to_string(), + aws_access_key_id: None, + aws_secret_access_key: None, + batch_size: Some(1000), + flush_interval_ms: None, + csv_delimiter: Some(','), + csv_quote: Some('"'), + ignore_header: None, + max_errors: Some(10), + compression: None, + delete_staged_files: Some(true), + max_connections: Some(5), + max_retries: Some(3), + retry_delay_ms: Some(1000), + include_metadata: Some(false), + auto_create_table: Some(false), + } + } + + #[test] + fn test_config_validation_valid() { + let config = test_config(); + assert!(config.validate().is_ok()); + } + + #[test] + fn test_config_validation_missing_credentials() { + let mut config = test_config(); + config.iam_role = None; + config.aws_access_key_id = None; + config.aws_secret_access_key = None; + assert!(config.validate().is_err()); + } + + #[test] + fn test_config_validation_partial_credentials() { + let mut config = test_config(); + config.iam_role = None; + config.aws_access_key_id = Some("AKIAIOSFODNN7EXAMPLE".to_string()); + config.aws_secret_access_key = None; + assert!(config.validate().is_err()); + } + + #[test] + fn test_sink_creation() { + let config = test_config(); + let sink = RedshiftSink::new(1, config); + assert_eq!(sink.id, 1); + assert!(sink.pool.is_none()); + assert!(sink.s3_uploader.is_none()); + } +} diff --git a/core/connectors/sinks/redshift_sink/src/s3.rs b/core/connectors/sinks/redshift_sink/src/s3.rs new file mode 100644 index 0000000000..b49aaf0ddc --- /dev/null +++ b/core/connectors/sinks/redshift_sink/src/s3.rs @@ -0,0 +1,185 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use iggy_connector_sdk::Error; +use s3::bucket::Bucket; +use s3::creds::Credentials; +use s3::region::Region; +use tracing::{error, info}; +use uuid::Uuid; + +/// S3 uploader for staging CSV files before Redshift COPY. +#[derive(Debug)] +pub struct S3Uploader { + bucket: Box, + prefix: String, +} + +impl S3Uploader { + /// Creates a new S3 uploader with the specified configuration. 
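+    ///
+    /// When explicit access keys are not provided, credentials are resolved via the
+    /// default AWS credential chain (environment variables, profile, instance metadata).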
+ pub fn new( + bucket_name: &str, + prefix: &str, + region: &str, + access_key_id: Option<&str>, + secret_access_key: Option<&str>, + ) -> Result { + let region = Region::Custom { + region: region.to_string(), + endpoint: format!("https://s3.{}.amazonaws.com", region), + }; + + let credentials = match (access_key_id, secret_access_key) { + (Some(key_id), Some(secret)) => { + Credentials::new(Some(key_id), Some(secret), None, None, None).map_err(|e| { + error!("Failed to create S3 credentials: {}", e); + Error::InitError(format!("Invalid AWS credentials: {e}")) + })? + } + _ => { + // Use default credential chain (environment variables, instance profile, etc.) + Credentials::default().map_err(|e| { + error!("Failed to load default S3 credentials: {}", e); + Error::InitError(format!("Failed to load AWS credentials: {e}")) + })? + } + }; + + let bucket = Bucket::new(bucket_name, region, credentials).map_err(|e| { + error!("Failed to create S3 bucket client: {}", e); + Error::InitError(format!("Failed to initialize S3 bucket: {e}")) + })?; + + let prefix = prefix.trim_end_matches('/').to_string(); + + Ok(S3Uploader { bucket, prefix }) + } + + /// Uploads CSV data to S3 and returns the S3 key. + pub async fn upload_csv(&self, data: &[u8]) -> Result { + let file_id = Uuid::new_v4(); + let key = if self.prefix.is_empty() { + format!("{}.csv", file_id) + } else { + format!("{}/{}.csv", self.prefix, file_id) + }; + + let response = self.bucket.put_object(&key, data).await.map_err(|e| { + error!("Failed to upload to S3 key '{}': {}", key, e); + Error::Storage(format!("S3 upload failed: {e}")) + })?; + + if response.status_code() != 200 { + error!( + "S3 upload returned status {}: {}", + response.status_code(), + String::from_utf8_lossy(response.as_slice()) + ); + return Err(Error::Storage(format!( + "S3 upload failed with status {}", + response.status_code() + ))); + } + + info!( + "Uploaded {} bytes to s3://{}/{}", + data.len(), + self.bucket.name(), + key + ); + Ok(key) + } + + /// Deletes a file from S3 by key. + pub async fn delete_file(&self, key: &str) -> Result<(), Error> { + let response = self.bucket.delete_object(key).await.map_err(|e| { + error!("Failed to delete S3 object '{}': {}", key, e); + Error::Storage(format!("S3 delete failed: {e}")) + })?; + + if response.status_code() != 204 && response.status_code() != 200 { + error!( + "S3 delete returned unexpected status {}: {}", + response.status_code(), + String::from_utf8_lossy(response.as_slice()) + ); + return Err(Error::Storage(format!( + "S3 delete failed with status {}", + response.status_code() + ))); + } + + info!("Deleted s3://{}/{}", self.bucket.name(), key); + Ok(()) + } + + /// Checks if the bucket is accessible by performing a HEAD request. 
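+    ///
+    /// Not invoked by the sink at the moment (hence `#[allow(dead_code)]`); it can be
+    /// used to verify bucket access when testing a configuration manually.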
+ #[allow(dead_code)] + pub async fn check_connectivity(&self) -> Result<(), Error> { + self.bucket.head_object("/").await.map_err(|e| { + error!("S3 connectivity check failed: {}", e); + Error::Connection(format!("Cannot access S3 bucket: {e}")) + })?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_s3_uploader_creation_with_credentials() { + let result = S3Uploader::new( + "test-bucket", + "prefix/", + "us-east-1", + Some("AKIAIOSFODNN7EXAMPLE"), + Some("wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"), + ); + assert!(result.is_ok()); + } + + #[test] + fn test_prefix_normalization() { + let uploader = S3Uploader::new( + "test-bucket", + "staging/redshift/", + "us-east-1", + Some("AKIAIOSFODNN7EXAMPLE"), + Some("wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"), + ) + .unwrap(); + + assert_eq!(uploader.prefix, "staging/redshift"); + } + + #[test] + fn test_empty_prefix() { + let uploader = S3Uploader::new( + "test-bucket", + "", + "us-east-1", + Some("AKIAIOSFODNN7EXAMPLE"), + Some("wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"), + ) + .unwrap(); + + assert_eq!(uploader.prefix, ""); + } +} From c60cccdf451c544d6f8015bd5a341efaf5427f73 Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Tue, 13 Jan 2026 13:38:40 +0530 Subject: [PATCH 02/11] fix: resolve CI failures for redshift sink connector - Fix markdown lint issues in README.md (table formatting, blank lines, code fence language) - Fix trailing newline in Cargo.toml - Apply TOML formatting via taplo - Add missing dependencies to DEPENDENCIES.md (rust-s3, rxml, rxml_validation, static_assertions) --- DEPENDENCIES.md | 4 ++++ core/connectors/sinks/redshift_sink/Cargo.toml | 2 +- core/connectors/sinks/redshift_sink/README.md | 5 +++-- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md index 8423e83d3e..2933c86807 100644 --- a/DEPENDENCIES.md +++ b/DEPENDENCIES.md @@ -689,6 +689,9 @@ rustls-platform-verifier: 0.6.2, "Apache-2.0 OR MIT", rustls-platform-verifier-android: 0.1.1, "Apache-2.0 OR MIT", rustls-webpki: 0.103.8, "ISC", rustversion: 1.0.22, "Apache-2.0 OR MIT", +rust-s3: 0.37.1, "MIT", +rxml: 0.11.1, "MIT", +rxml_validation: 0.11.0, "MIT", ryu: 1.0.22, "Apache-2.0 OR BSL-1.0", same-file: 1.0.6, "MIT OR Unlicense", scc: 2.4.0, "Apache-2.0", @@ -761,6 +764,7 @@ sqlx-sqlite: 0.8.6, "Apache-2.0 OR MIT", sse-stream: 0.2.1, "Apache-2.0 OR MIT", stable_deref_trait: 1.2.1, "Apache-2.0 OR MIT", static-toml: 1.3.0, "MIT", +static_assertions: 1.1.0, "Apache-2.0 OR MIT", stringprep: 0.1.5, "Apache-2.0 OR MIT", strsim: 0.11.1, "MIT", structmeta: 0.3.0, "Apache-2.0 OR MIT", diff --git a/core/connectors/sinks/redshift_sink/Cargo.toml b/core/connectors/sinks/redshift_sink/Cargo.toml index 63332613da..47c5155cef 100644 --- a/core/connectors/sinks/redshift_sink/Cargo.toml +++ b/core/connectors/sinks/redshift_sink/Cargo.toml @@ -51,4 +51,4 @@ sqlx = { version = "0.8", features = [ ] } tokio = { workspace = true } tracing = { workspace = true } -uuid = { workspace = true } \ No newline at end of file +uuid = { workspace = true } diff --git a/core/connectors/sinks/redshift_sink/README.md b/core/connectors/sinks/redshift_sink/README.md index 7a22e25b5b..d7a57299bd 100644 --- a/core/connectors/sinks/redshift_sink/README.md +++ b/core/connectors/sinks/redshift_sink/README.md @@ -87,7 +87,7 @@ include_metadata = false ## Configuration Reference | Property | Type | Required | Default | Description | -|----------|------|----------|---------|-------------| +| 
-------- | ---- | -------- | ------- | ----------- | | `connection_string` | String | Yes | - | Redshift connection string in PostgreSQL format | | `target_table` | String | Yes | - | Target table name (can include schema) | | `s3_bucket` | String | Yes | - | S3 bucket for staging CSV files | @@ -210,6 +210,7 @@ The connector implements retry logic with exponential backoff for transient fail - **Cleanup failures**: Logged as warnings, do not block processing Use `max_errors` to control COPY behavior: + - `0`: Fail on first error (strict mode) - `N`: Allow up to N errors per COPY operation @@ -217,7 +218,7 @@ Use `max_errors` to control COPY behavior: The connector logs statistics on close: -``` +```text Closing Redshift sink connector ID: 1. Stats: 150000 messages processed, 15 batches loaded, 0 errors ``` From 8cc7748e3387ee9920467a87899e69d0003465ef Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Wed, 14 Jan 2026 18:49:25 +0530 Subject: [PATCH 03/11] fix: correct alphabetical ordering of rust-s3 in DEPENDENCIES.md --- DEPENDENCIES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md index 2933c86807..b05e545f9d 100644 --- a/DEPENDENCIES.md +++ b/DEPENDENCIES.md @@ -676,6 +676,7 @@ rust-embed: 8.9.0, "MIT", rust-embed-impl: 8.9.0, "MIT", rust-embed-utils: 8.9.0, "MIT", rust-ini: 0.21.3, "MIT", +rust-s3: 0.37.1, "MIT", rust_decimal: 1.39.0, "MIT", rustc-hash: 2.1.1, "Apache-2.0 OR MIT", rustc_version: 0.4.1, "Apache-2.0 OR MIT", @@ -689,7 +690,6 @@ rustls-platform-verifier: 0.6.2, "Apache-2.0 OR MIT", rustls-platform-verifier-android: 0.1.1, "Apache-2.0 OR MIT", rustls-webpki: 0.103.8, "ISC", rustversion: 1.0.22, "Apache-2.0 OR MIT", -rust-s3: 0.37.1, "MIT", rxml: 0.11.1, "MIT", rxml_validation: 0.11.0, "MIT", ryu: 1.0.22, "Apache-2.0 OR BSL-1.0", From 043470dfaf0f64e9fdee6f4c3b2719444e8a0fa5 Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Wed, 14 Jan 2026 19:23:24 +0530 Subject: [PATCH 04/11] feat(connectors): add integration e2e test for Redshift sink connector - Add Redshift sink integration test using PostgreSQL (Redshift-compatible) and LocalStack for S3 - Add s3_endpoint config option to support custom endpoints (LocalStack, MinIO) - Add path-style S3 access for custom endpoints - Add localstack feature to testcontainers-modules - Create test configuration files for Redshift connector --- .../sinks/redshift_sink/src/config.rs | 6 + .../connectors/sinks/redshift_sink/src/lib.rs | 7 +- core/connectors/sinks/redshift_sink/src/s3.rs | 20 ++- core/integration/Cargo.toml | 2 +- core/integration/tests/connectors/mod.rs | 1 + .../tests/connectors/redshift/config.toml | 20 +++ .../redshift/connectors_config/redshift.toml | 44 ++++++ .../tests/connectors/redshift/mod.rs | 130 ++++++++++++++++++ .../connectors/redshift/redshift_sink.rs | 24 ++++ 9 files changed, 247 insertions(+), 7 deletions(-) create mode 100644 core/integration/tests/connectors/redshift/config.toml create mode 100644 core/integration/tests/connectors/redshift/connectors_config/redshift.toml create mode 100644 core/integration/tests/connectors/redshift/mod.rs create mode 100644 core/integration/tests/connectors/redshift/redshift_sink.rs diff --git a/core/connectors/sinks/redshift_sink/src/config.rs b/core/connectors/sinks/redshift_sink/src/config.rs index f13da6a434..5354231755 100644 --- a/core/connectors/sinks/redshift_sink/src/config.rs +++ b/core/connectors/sinks/redshift_sink/src/config.rs @@ -48,6 
+48,11 @@ pub struct RedshiftSinkConfig { /// Example: `us-east-1` pub s3_region: String, + /// Custom S3 endpoint URL for testing with LocalStack or MinIO. + /// If not specified, uses the default AWS S3 endpoint. + /// Example: `http://localhost:4566` + pub s3_endpoint: Option, + /// AWS access key ID. Required if IAM role is not specified. pub aws_access_key_id: Option, @@ -161,6 +166,7 @@ mod tests { s3_bucket: "bucket".to_string(), s3_prefix: None, s3_region: "us-east-1".to_string(), + s3_endpoint: None, aws_access_key_id: None, aws_secret_access_key: None, batch_size: None, diff --git a/core/connectors/sinks/redshift_sink/src/lib.rs b/core/connectors/sinks/redshift_sink/src/lib.rs index 8502d8a99f..f29f9a298c 100644 --- a/core/connectors/sinks/redshift_sink/src/lib.rs +++ b/core/connectors/sinks/redshift_sink/src/lib.rs @@ -97,11 +97,14 @@ impl RedshiftSink { &self.config.s3_region, self.config.aws_access_key_id.as_deref(), self.config.aws_secret_access_key.as_deref(), + self.config.s3_endpoint.as_deref(), )?; self.s3_uploader = Some(uploader); info!( - "Initialized S3 uploader for bucket: {}, region: {}", - self.config.s3_bucket, self.config.s3_region + "Initialized S3 uploader for bucket: {}, region: {}{}", + self.config.s3_bucket, + self.config.s3_region, + self.config.s3_endpoint.as_ref().map_or(String::new(), |e| format!(", endpoint: {}", e)) ); Ok(()) } diff --git a/core/connectors/sinks/redshift_sink/src/s3.rs b/core/connectors/sinks/redshift_sink/src/s3.rs index b49aaf0ddc..c6f1d94760 100644 --- a/core/connectors/sinks/redshift_sink/src/s3.rs +++ b/core/connectors/sinks/redshift_sink/src/s3.rs @@ -38,10 +38,17 @@ impl S3Uploader { region: &str, access_key_id: Option<&str>, secret_access_key: Option<&str>, + endpoint: Option<&str>, ) -> Result { - let region = Region::Custom { - region: region.to_string(), - endpoint: format!("https://s3.{}.amazonaws.com", region), + let region = match endpoint { + Some(ep) => Region::Custom { + region: region.to_string(), + endpoint: ep.to_string(), + }, + None => Region::Custom { + region: region.to_string(), + endpoint: format!("https://s3.{}.amazonaws.com", region), + }, }; let credentials = match (access_key_id, secret_access_key) { @@ -60,11 +67,16 @@ impl S3Uploader { } }; - let bucket = Bucket::new(bucket_name, region, credentials).map_err(|e| { + let mut bucket = Bucket::new(bucket_name, region, credentials).map_err(|e| { error!("Failed to create S3 bucket client: {}", e); Error::InitError(format!("Failed to initialize S3 bucket: {e}")) })?; + // Use path-style access for custom endpoints (LocalStack, MinIO, etc.) 
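+        // Virtual-hosted-style addressing would put the bucket name in the hostname
+        // (e.g. my-bucket.localhost:4566), which typically fails against local
+        // endpoints, so keep the bucket in the request path instead.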
+ if endpoint.is_some() { + bucket = bucket.with_path_style(); + } + let prefix = prefix.trim_end_matches('/').to_string(); Ok(S3Uploader { bucket, prefix }) diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml index bc41607109..dc36490bcf 100644 --- a/core/integration/Cargo.toml +++ b/core/integration/Cargo.toml @@ -61,7 +61,7 @@ server = { workspace = true } strum_macros = { workspace = true } tempfile = { workspace = true } test-case = { workspace = true } -testcontainers-modules = { version = "0.14.0", features = ["postgres"] } +testcontainers-modules = { version = "0.14.0", features = ["postgres", "localstack"] } tokio = { workspace = true } twox-hash = { workspace = true } uuid = { workspace = true } diff --git a/core/integration/tests/connectors/mod.rs b/core/integration/tests/connectors/mod.rs index 146de4d4bf..38bfc58ebf 100644 --- a/core/integration/tests/connectors/mod.rs +++ b/core/integration/tests/connectors/mod.rs @@ -30,6 +30,7 @@ use std::collections::HashMap; mod http_config_provider; mod postgres; mod random; +mod redshift; const DEFAULT_TEST_STREAM: &str = "test_stream"; const DEFAULT_TEST_TOPIC: &str = "test_topic"; diff --git a/core/integration/tests/connectors/redshift/config.toml b/core/integration/tests/connectors/redshift/config.toml new file mode 100644 index 0000000000..b24c908294 --- /dev/null +++ b/core/integration/tests/connectors/redshift/config.toml @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[connectors] +config_type = "local" +config_dir = "tests/connectors/redshift/connectors_config" diff --git a/core/integration/tests/connectors/redshift/connectors_config/redshift.toml b/core/integration/tests/connectors/redshift/connectors_config/redshift.toml new file mode 100644 index 0000000000..57f3e4f98e --- /dev/null +++ b/core/integration/tests/connectors/redshift/connectors_config/redshift.toml @@ -0,0 +1,44 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +type = "sink" +key = "redshift" +enabled = true +version = 0 +name = "Redshift sink" +path = "../../target/debug/libiggy_connector_redshift_sink" + +[[streams]] +stream = "test_stream" +topics = ["test_topic"] +schema = "json" +batch_length = 100 +poll_interval = "5ms" +consumer_group = "test" + +[plugin_config] +connection_string = "" +target_table = "iggy_messages" +s3_bucket = "iggy-redshift-staging" +s3_region = "us-east-1" +s3_prefix = "staging/" +s3_endpoint = "" +batch_size = 100 +max_connections = 5 +auto_create_table = true +include_metadata = true +delete_staged_files = true diff --git a/core/integration/tests/connectors/redshift/mod.rs b/core/integration/tests/connectors/redshift/mod.rs new file mode 100644 index 0000000000..8292f7e18d --- /dev/null +++ b/core/integration/tests/connectors/redshift/mod.rs @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::connectors::{ConnectorsRuntime, IggySetup, setup_runtime}; +use std::collections::HashMap; +use testcontainers_modules::{ + localstack::LocalStack, + postgres, + testcontainers::{ContainerAsync, runners::AsyncRunner}, +}; + +mod redshift_sink; + +/// Holds the test containers to keep them alive during tests. +struct RedshiftTestContainers { + _postgres: ContainerAsync, + _localstack: ContainerAsync, +} + +/// Setup result containing both runtime and containers. 
+struct RedshiftTestSetup { + runtime: ConnectorsRuntime, + _containers: RedshiftTestContainers, +} + +async fn setup() -> RedshiftTestSetup { + // Start PostgreSQL container (simulating Redshift as they share the same wire protocol) + let postgres_container = postgres::Postgres::default() + .start() + .await + .expect("Failed to start Postgres (Redshift simulator)"); + let postgres_port = postgres_container + .get_host_port_ipv4(5432) + .await + .expect("Failed to get Postgres port"); + + // Start LocalStack for S3 + let localstack_container = LocalStack::default() + .start() + .await + .expect("Failed to start LocalStack"); + let localstack_port = localstack_container + .get_host_port_ipv4(4566) + .await + .expect("Failed to get LocalStack port"); + + // Create S3 bucket using LocalStack S3 API + let s3_endpoint = format!("http://localhost:{localstack_port}"); + let bucket_name = "iggy-redshift-staging"; + + // Create the bucket via LocalStack S3 API using path-style URL + let client = reqwest::Client::new(); + let create_bucket_url = format!("{s3_endpoint}/{bucket_name}"); + let _ = client.put(&create_bucket_url).send().await; + + let mut envs = HashMap::new(); + let iggy_setup = IggySetup::default(); + + // Redshift connection (using PostgreSQL as simulator) + let connection_string = format!("postgres://postgres:postgres@localhost:{postgres_port}"); + envs.insert( + "IGGY_CONNECTORS_SINK_REDSHIFT_CONFIG_CONNECTION_STRING".to_owned(), + connection_string, + ); + + // S3 configuration for staging + envs.insert( + "IGGY_CONNECTORS_SINK_REDSHIFT_CONFIG_S3_BUCKET".to_owned(), + bucket_name.to_owned(), + ); + envs.insert( + "IGGY_CONNECTORS_SINK_REDSHIFT_CONFIG_S3_REGION".to_owned(), + "us-east-1".to_owned(), + ); + envs.insert( + "IGGY_CONNECTORS_SINK_REDSHIFT_CONFIG_S3_ENDPOINT".to_owned(), + s3_endpoint, + ); + envs.insert( + "IGGY_CONNECTORS_SINK_REDSHIFT_CONFIG_AWS_ACCESS_KEY_ID".to_owned(), + "test".to_owned(), + ); + envs.insert( + "IGGY_CONNECTORS_SINK_REDSHIFT_CONFIG_AWS_SECRET_ACCESS_KEY".to_owned(), + "test".to_owned(), + ); + + // Stream configuration + envs.insert( + "IGGY_CONNECTORS_SINK_REDSHIFT_STREAMS_0_STREAM".to_owned(), + iggy_setup.stream.to_owned(), + ); + envs.insert( + "IGGY_CONNECTORS_SINK_REDSHIFT_STREAMS_0_TOPICS_0".to_owned(), + iggy_setup.topic.to_owned(), + ); + envs.insert( + "IGGY_CONNECTORS_SINK_REDSHIFT_STREAMS_0_SCHEMA".to_owned(), + "json".to_owned(), + ); + + let mut runtime = setup_runtime(); + runtime + .init("redshift/config.toml", Some(envs), iggy_setup) + .await; + + RedshiftTestSetup { + runtime, + _containers: RedshiftTestContainers { + _postgres: postgres_container, + _localstack: localstack_container, + }, + } +} diff --git a/core/integration/tests/connectors/redshift/redshift_sink.rs b/core/integration/tests/connectors/redshift/redshift_sink.rs new file mode 100644 index 0000000000..05174974a5 --- /dev/null +++ b/core/integration/tests/connectors/redshift/redshift_sink.rs @@ -0,0 +1,24 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::connectors::redshift::setup; + +#[tokio::test] +async fn given_valid_configuration_redshift_sink_connector_should_start() { + let _setup = setup().await; +} From db76d460f7f777d179a19a952d1d674e2bf62a73 Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Thu, 15 Jan 2026 12:28:46 +0530 Subject: [PATCH 05/11] fix: add missing s3_endpoint field and update DEPENDENCIES.md - Add s3_endpoint: None to test_config() in lib.rs (fixes E0063) - Add endpoint parameter to S3Uploader tests in s3.rs - Fix formatting for long line in init_s3_uploader() - Add iggy_connector_redshift_sink to DEPENDENCIES.md - Add maybe-async, md5, minidom to DEPENDENCIES.md --- DEPENDENCIES.md | 4 ++++ core/connectors/sinks/redshift_sink/src/lib.rs | 6 +++++- core/connectors/sinks/redshift_sink/src/s3.rs | 3 +++ 3 files changed, 12 insertions(+), 1 deletion(-) diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md index 9cb89abdd9..df9369d9a1 100644 --- a/DEPENDENCIES.md +++ b/DEPENDENCIES.md @@ -407,6 +407,7 @@ iggy_connector_postgres_sink: 0.1.0, "Apache-2.0", iggy_connector_postgres_source: 0.1.0, "Apache-2.0", iggy_connector_quickwit_sink: 0.1.0, "Apache-2.0", iggy_connector_random_source: 0.1.0, "Apache-2.0", +iggy_connector_redshift_sink: 0.1.0, "Apache-2.0", iggy_connector_sdk: 0.1.1-edge.1, "Apache-2.0", iggy_connector_stdout_sink: 0.1.0, "Apache-2.0", iggy_examples: 0.0.5, "Apache-2.0", @@ -489,7 +490,9 @@ macro_rules_attribute: 0.1.3, "MIT", macro_rules_attribute-proc_macro: 0.1.3, "MIT", matchers: 0.2.0, "MIT", matchit: 0.8.4, "BSD-3-Clause AND MIT", +maybe-async: 0.2.10, "MIT", md-5: 0.10.6, "Apache-2.0 OR MIT", +md5: 0.8.0, "Apache-2.0 OR MIT", memchr: 2.7.6, "MIT OR Unlicense", message_bus: 0.1.0, "Apache-2.0", metadata: 0.1.0, "Apache-2.0", @@ -499,6 +502,7 @@ mimalloc: 0.1.48, "MIT", mime: 0.3.17, "Apache-2.0 OR MIT", mime_guess: 2.0.5, "MIT", minimal-lexical: 0.2.1, "Apache-2.0 OR MIT", +minidom: 0.16.0, "MPL-2.0", miniz_oxide: 0.8.9, "Apache-2.0 OR MIT OR Zlib", mio: 1.1.1, "MIT", mockall: 0.14.0, "Apache-2.0 OR MIT", diff --git a/core/connectors/sinks/redshift_sink/src/lib.rs b/core/connectors/sinks/redshift_sink/src/lib.rs index f29f9a298c..99087cc822 100644 --- a/core/connectors/sinks/redshift_sink/src/lib.rs +++ b/core/connectors/sinks/redshift_sink/src/lib.rs @@ -104,7 +104,10 @@ impl RedshiftSink { "Initialized S3 uploader for bucket: {}, region: {}{}", self.config.s3_bucket, self.config.s3_region, - self.config.s3_endpoint.as_ref().map_or(String::new(), |e| format!(", endpoint: {}", e)) + self.config + .s3_endpoint + .as_ref() + .map_or(String::new(), |e| format!(", endpoint: {}", e)) ); Ok(()) } @@ -427,6 +430,7 @@ mod tests { s3_bucket: "test-bucket".to_string(), s3_prefix: Some("staging/".to_string()), s3_region: "us-east-1".to_string(), + s3_endpoint: None, aws_access_key_id: None, aws_secret_access_key: None, batch_size: Some(1000), diff --git a/core/connectors/sinks/redshift_sink/src/s3.rs b/core/connectors/sinks/redshift_sink/src/s3.rs index c6f1d94760..ed7aa82a6e 100644 --- a/core/connectors/sinks/redshift_sink/src/s3.rs +++ 
b/core/connectors/sinks/redshift_sink/src/s3.rs @@ -163,6 +163,7 @@ mod tests { "us-east-1", Some("AKIAIOSFODNN7EXAMPLE"), Some("wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"), + None, ); assert!(result.is_ok()); } @@ -175,6 +176,7 @@ mod tests { "us-east-1", Some("AKIAIOSFODNN7EXAMPLE"), Some("wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"), + None, ) .unwrap(); @@ -189,6 +191,7 @@ mod tests { "us-east-1", Some("AKIAIOSFODNN7EXAMPLE"), Some("wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"), + None, ) .unwrap(); From 5e17c196815ff87ab4227e5d906a3b6ac3c50a76 Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Thu, 15 Jan 2026 12:39:19 +0530 Subject: [PATCH 06/11] fix: address Copilot review feedback Critical fixes: - Change Rust edition from 2024 to 2021 in Cargo.toml - Fix S3 cleanup to happen regardless of COPY result (prevents orphaned files) Moderate fixes: - Remove zstd from valid compression options (not supported by Redshift) - Update README to remove zstd from compression list - Handle bucket creation error in integration tests with expect() - Log JSON serialization errors instead of silent unwrap_or_default() Performance: - Cache escaped quote string to avoid repeated format! allocations Windows compatibility (for local testing): - Add #[cfg(unix)] conditionals for Unix-specific code in sender/mod.rs --- core/common/src/sender/mod.rs | 12 ++++++++- .../connectors/sinks/redshift_sink/Cargo.toml | 2 +- core/connectors/sinks/redshift_sink/README.md | 2 +- .../sinks/redshift_sink/src/config.rs | 4 +-- .../connectors/sinks/redshift_sink/src/lib.rs | 27 ++++++++++++------- .../tests/connectors/redshift/mod.rs | 6 ++++- 6 files changed, 38 insertions(+), 15 deletions(-) diff --git a/core/common/src/sender/mod.rs b/core/common/src/sender/mod.rs index 27c98d8900..d1bb565e83 100644 --- a/core/common/src/sender/mod.rs +++ b/core/common/src/sender/mod.rs @@ -37,8 +37,11 @@ use compio::net::TcpStream; use compio_quic::{RecvStream, SendStream}; use compio_tls::TlsStream; use std::future::Future; +#[cfg(unix)] use std::os::fd::{AsFd, OwnedFd}; -use tracing::{debug, error}; +use tracing::debug; +#[cfg(unix)] +use tracing::error; macro_rules! forward_async_methods { ( @@ -117,6 +120,7 @@ impl SenderKind { Self::WebSocketTls(stream) } + #[cfg(unix)] pub fn take_and_migrate_tcp(&mut self) -> Option { match self { SenderKind::Tcp(tcp_sender) => { @@ -137,6 +141,12 @@ impl SenderKind { } } + #[cfg(not(unix))] + pub fn take_and_migrate_tcp(&mut self) -> Option<()> { + // Socket migration is not supported on non-Unix platforms + None + } + forward_async_methods! 
{ async fn read(&mut self, buffer: B) -> (Result<(), IggyError>, B); async fn send_empty_ok_response(&mut self) -> Result<(), IggyError>; diff --git a/core/connectors/sinks/redshift_sink/Cargo.toml b/core/connectors/sinks/redshift_sink/Cargo.toml index 47c5155cef..d4b21cbc87 100644 --- a/core/connectors/sinks/redshift_sink/Cargo.toml +++ b/core/connectors/sinks/redshift_sink/Cargo.toml @@ -19,7 +19,7 @@ name = "iggy_connector_redshift_sink" version = "0.1.0" description = "Iggy Redshift sink connector for loading stream messages into Amazon Redshift via S3 staging" -edition = "2024" +edition = "2021" license = "Apache-2.0" keywords = ["iggy", "messaging", "streaming", "redshift", "sink", "aws"] categories = ["command-line-utilities", "database", "network-programming"] diff --git a/core/connectors/sinks/redshift_sink/README.md b/core/connectors/sinks/redshift_sink/README.md index d7a57299bd..cc49d6442a 100644 --- a/core/connectors/sinks/redshift_sink/README.md +++ b/core/connectors/sinks/redshift_sink/README.md @@ -101,7 +101,7 @@ include_metadata = false | `csv_delimiter` | Char | No | `,` | CSV field delimiter | | `csv_quote` | Char | No | `"` | CSV quote character | | `max_errors` | Integer | No | `0` | Max errors before COPY fails | -| `compression` | String | No | `none` | Compression: `gzip`, `lzop`, `bzip2`, `zstd` | +| `compression` | String | No | `none` | Compression: `gzip`, `lzop`, `bzip2` | | `delete_staged_files` | Boolean | No | `true` | Delete S3 files after successful COPY | | `max_connections` | Integer | No | `5` | Max Redshift connections | | `max_retries` | Integer | No | `3` | Max retry attempts for failures | diff --git a/core/connectors/sinks/redshift_sink/src/config.rs b/core/connectors/sinks/redshift_sink/src/config.rs index 5354231755..5a6c8adfcc 100644 --- a/core/connectors/sinks/redshift_sink/src/config.rs +++ b/core/connectors/sinks/redshift_sink/src/config.rs @@ -144,7 +144,7 @@ impl RedshiftSinkConfig { // Validate compression if specified if let Some(compression) = &self.compression { - let valid = ["gzip", "lzop", "bzip2", "none", "zstd"]; + let valid = ["gzip", "lzop", "bzip2", "none"]; if !valid.contains(&compression.to_lowercase().as_str()) { return Err(Error::InvalidConfig); } @@ -230,7 +230,7 @@ mod tests { #[test] fn test_valid_compression_options() { - for comp in ["gzip", "GZIP", "lzop", "bzip2", "none", "zstd"] { + for comp in ["gzip", "GZIP", "lzop", "bzip2", "none"] { let mut config = valid_config(); config.compression = Some(comp.to_string()); assert!( diff --git a/core/connectors/sinks/redshift_sink/src/lib.rs b/core/connectors/sinks/redshift_sink/src/lib.rs index 99087cc822..663851742d 100644 --- a/core/connectors/sinks/redshift_sink/src/lib.rs +++ b/core/connectors/sinks/redshift_sink/src/lib.rs @@ -22,10 +22,10 @@ mod s3; use async_trait::async_trait; use config::RedshiftSinkConfig; use iggy_connector_sdk::{ - ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata, sink_connector, + sink_connector, ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata, }; use s3::S3Uploader; -use sqlx::{Pool, Postgres, postgres::PgPoolOptions}; +use sqlx::{postgres::PgPoolOptions, Pool, Postgres}; use std::time::Duration; use tokio::sync::Mutex; use tracing::{error, info, warn}; @@ -194,13 +194,14 @@ impl RedshiftSink { // Execute COPY command let copy_result = self.execute_copy(pool, &s3_path).await; - // Cleanup S3 file if configured - if self.config.delete_staged_files.unwrap_or(true) - && let Err(e) = 
s3_uploader.delete_file(&s3_key).await - { - warn!("Failed to delete staged file {}: {}", s3_key, e); + // Cleanup S3 file if configured - always attempt cleanup regardless of COPY result + if self.config.delete_staged_files.unwrap_or(true) { + if let Err(e) = s3_uploader.delete_file(&s3_key).await { + warn!("Failed to delete staged file {}: {}", s3_key, e); + } } + // Return COPY result after cleanup copy_result?; let mut state = self.state.lock().await; @@ -230,10 +231,18 @@ impl RedshiftSink { let include_metadata = self.config.include_metadata.unwrap_or(false); let mut csv_output = Vec::new(); + // Pre-allocate the escaped quote string for performance + let escaped_quote = format!("{quote}{quote}"); for message in messages { let payload_str = match &message.payload { - Payload::Json(value) => simd_json::to_string(value).unwrap_or_default(), + Payload::Json(value) => simd_json::to_string(value).unwrap_or_else(|e| { + warn!( + "Failed to serialize JSON payload for message {}: {}", + message.id, e + ); + String::new() + }), Payload::Text(text) => text.clone(), Payload::Raw(bytes) => String::from_utf8_lossy(bytes).to_string(), _ => { @@ -246,7 +255,7 @@ impl RedshiftSink { }; // Escape quotes in payload - let escaped_payload = payload_str.replace(quote, &format!("{quote}{quote}")); + let escaped_payload = payload_str.replace(quote, &escaped_quote); let mut row = format!( "{}{delim}{quote}{payload}{quote}", diff --git a/core/integration/tests/connectors/redshift/mod.rs b/core/integration/tests/connectors/redshift/mod.rs index 8292f7e18d..09ad54efeb 100644 --- a/core/integration/tests/connectors/redshift/mod.rs +++ b/core/integration/tests/connectors/redshift/mod.rs @@ -67,7 +67,11 @@ async fn setup() -> RedshiftTestSetup { // Create the bucket via LocalStack S3 API using path-style URL let client = reqwest::Client::new(); let create_bucket_url = format!("{s3_endpoint}/{bucket_name}"); - let _ = client.put(&create_bucket_url).send().await; + client + .put(&create_bucket_url) + .send() + .await + .expect("Failed to create S3 bucket in LocalStack"); let mut envs = HashMap::new(); let iggy_setup = IggySetup::default(); From 1e9e9c7cd44659d112eb6a880f17b81bb762cd34 Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Thu, 15 Jan 2026 17:29:50 +0530 Subject: [PATCH 07/11] fix(integration): add #[allow(dead_code)] to RedshiftTestSetup struct Fixes clippy warning about unused 'runtime' field in test setup struct. The runtime field is kept for future test expansion. --- core/integration/tests/connectors/redshift/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/core/integration/tests/connectors/redshift/mod.rs b/core/integration/tests/connectors/redshift/mod.rs index 09ad54efeb..c318e0a424 100644 --- a/core/integration/tests/connectors/redshift/mod.rs +++ b/core/integration/tests/connectors/redshift/mod.rs @@ -34,6 +34,7 @@ struct RedshiftTestContainers { } /// Setup result containing both runtime and containers. 
+#[allow(dead_code)] struct RedshiftTestSetup { runtime: ConnectorsRuntime, _containers: RedshiftTestContainers, From 0a49fbafc9b78b3575c486a3081b3131033c20b8 Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Fri, 16 Jan 2026 14:00:43 +0530 Subject: [PATCH 08/11] Update core/connectors/sinks/redshift_sink/src/lib.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- core/connectors/sinks/redshift_sink/src/lib.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/core/connectors/sinks/redshift_sink/src/lib.rs b/core/connectors/sinks/redshift_sink/src/lib.rs index 663851742d..9c87ae47a7 100644 --- a/core/connectors/sinks/redshift_sink/src/lib.rs +++ b/core/connectors/sinks/redshift_sink/src/lib.rs @@ -265,9 +265,13 @@ impl RedshiftSink { ); if include_metadata { - let timestamp_secs = message.timestamp / 1_000_000; - let timestamp = chrono::DateTime::from_timestamp(timestamp_secs as i64, 0) - .map(|dt| dt.format("%Y-%m-%d %H:%M:%S").to_string()) + // `message.timestamp` is in microseconds. Preserve microsecond precision + // by converting to seconds and nanoseconds for `from_timestamp`. + let timestamp_micros = message.timestamp; + let timestamp_secs = (timestamp_micros / 1_000_000) as i64; + let timestamp_nanos = ((timestamp_micros % 1_000_000) as u32) * 1_000; + let timestamp = chrono::DateTime::from_timestamp(timestamp_secs, timestamp_nanos) + .map(|dt| dt.format("%Y-%m-%d %H:%M:%S%.6f").to_string()) .unwrap_or_default(); row.push_str(&format!( From 898b58d11cbdd3a68ca04e2d08c45cc0fab59ac6 Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Sat, 31 Jan 2026 13:42:18 +0530 Subject: [PATCH 09/11] chore: update DEPENDENCIES.md and Cargo.lock after merge --- Cargo.lock | 10 +-- DEPENDENCIES.md | 159 +++++++++++++++++++++++++----------------------- 2 files changed, 87 insertions(+), 82 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a21f9b9c0a..21600b0e33 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -889,7 +889,7 @@ dependencies = [ "serde", "serde_json", "url", - "webpki-roots 1.0.4", + "webpki-roots 1.0.5", ] [[package]] @@ -919,7 +919,7 @@ dependencies = [ "quick-xml 0.38.4", "rust-ini", "serde", - "thiserror 2.0.17", + "thiserror 2.0.18", "time", "url", ] @@ -952,7 +952,7 @@ version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "838b36c8dc927b6db1b6c6b8f5d05865f2213550b9e83bf92fa99ed6525472c0" dependencies = [ - "thiserror 2.0.17", + "thiserror 2.0.18", ] [[package]] @@ -5717,7 +5717,7 @@ checksum = "5cf92c10c7e361d6b99666ec1c6f9805b0bea2c3bd8c78dc6fe98ac5bd78db11" dependencies = [ "proc-macro2", "quote", - "syn 2.0.111", + "syn 2.0.114", ] [[package]] @@ -7849,7 +7849,7 @@ dependencies = [ "serde_json", "sha2", "sysinfo 0.37.2", - "thiserror 2.0.17", + "thiserror 2.0.18", "time", "tokio", "tokio-stream", diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md index 9283c233c7..5c6444b12d 100644 --- a/DEPENDENCIES.md +++ b/DEPENDENCIES.md @@ -19,9 +19,9 @@ aes-gcm: 0.10.3, "Apache-2.0 OR MIT", ahash: 0.7.8, "Apache-2.0 OR MIT", ahash: 0.8.12, "Apache-2.0 OR MIT", aho-corasick: 1.1.4, "MIT OR Unlicense", +allocator-api2: 0.2.21, "Apache-2.0 OR MIT", alloc-no-stdlib: 2.0.4, "BSD-3-Clause", alloc-stdlib: 0.2.2, "BSD-3-Clause", -allocator-api2: 0.2.21, "Apache-2.0 OR MIT", android_system_properties: 0.1.5, "Apache-2.0 OR MIT", anstream: 0.6.21, "Apache-2.0 OR MIT", anstyle: 1.0.13, "Apache-2.0 OR MIT", @@ -52,6 +52,7 @@ asn1-rs-derive: 0.6.0, 
"Apache-2.0 OR MIT", asn1-rs-impl: 0.2.0, "Apache-2.0 OR MIT", assert_cmd: 2.1.2, "Apache-2.0 OR MIT", astral-tokio-tar: 0.5.6, "Apache-2.0 OR MIT", +async_zip: 0.0.18, "MIT", async-broadcast: 0.7.2, "Apache-2.0 OR MIT", async-channel: 2.5.0, "Apache-2.0 OR MIT", async-compression: 0.4.37, "Apache-2.0 OR MIT", @@ -64,15 +65,17 @@ async-stream: 0.3.6, "MIT", async-stream-impl: 0.3.6, "MIT", async-task: 4.7.1, "Apache-2.0 OR MIT", async-trait: 0.1.89, "Apache-2.0 OR MIT", -async_zip: 0.0.18, "MIT", atoi: 2.0.0, "MIT", atomic: 0.6.1, "Apache-2.0 OR MIT", atomic-polyfill: 1.0.3, "Apache-2.0 OR MIT", atomic-waker: 1.1.2, "Apache-2.0 OR MIT", +attohttpc: 0.30.1, "MPL-2.0", autocfg: 1.5.0, "Apache-2.0 OR MIT", autotools: 0.2.7, "MIT", +aws-creds: 0.39.1, "MIT", aws-lc-rs: 1.15.2, "(Apache-2.0 OR ISC) AND ISC", aws-lc-sys: 0.35.0, "(Apache-2.0 OR ISC) AND ISC AND OpenSSL", +aws-region: 0.28.1, "MIT", axum: 0.8.8, "MIT", axum-core: 0.5.6, "MIT", axum-macros: 0.5.0, "MIT", @@ -91,15 +94,15 @@ bench-runner: 0.1.0, "Apache-2.0", bigdecimal: 0.4.10, "Apache-2.0 OR MIT", bimap: 0.6.3, "Apache-2.0 OR MIT", bincode: 1.3.3, "MIT", -bit-set: 0.8.0, "Apache-2.0 OR MIT", -bit-vec: 0.8.0, "Apache-2.0 OR MIT", bitflags: 1.3.2, "Apache-2.0 OR MIT", bitflags: 2.10.0, "Apache-2.0 OR MIT", +bit-set: 0.8.0, "Apache-2.0 OR MIT", bitvec: 1.0.1, "MIT", +bit-vec: 0.8.0, "Apache-2.0 OR MIT", blake2: 0.10.6, "Apache-2.0 OR MIT", blake3: 1.8.3, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR CC0-1.0", -block-buffer: 0.10.4, "Apache-2.0 OR MIT", block2: 0.6.2, "MIT", +block-buffer: 0.10.4, "Apache-2.0 OR MIT", bollard: 0.19.4, "Apache-2.0", bollard-buildkit-proto: 0.7.0, "Apache-2.0", bollard-stubs: 1.49.1-rc.28.4.0, "Apache-2.0", @@ -111,7 +114,6 @@ brotli: 8.0.2, "BSD-3-Clause AND MIT", brotli-decompressor: 5.0.0, "BSD-3-Clause OR MIT", bstr: 1.12.1, "Apache-2.0 OR MIT", bumpalo: 3.19.1, "Apache-2.0 OR MIT", -byte-unit: 5.2.0, "MIT", bytecheck: 0.6.12, "MIT", bytecheck_derive: 0.6.12, "MIT", bytecount: 0.6.9, "Apache-2.0 OR MIT", @@ -119,14 +121,16 @@ bytemuck: 1.24.0, "Apache-2.0 OR MIT OR Zlib", byteorder: 1.5.0, "MIT OR Unlicense", bytes: 1.11.0, "MIT", bytestring: 1.5.0, "Apache-2.0 OR MIT", +byte-unit: 5.2.0, "MIT", bzip2: 0.6.1, "Apache-2.0 OR MIT", camino: 1.2.2, "Apache-2.0 OR MIT", -cargo-platform: 0.3.2, "Apache-2.0 OR MIT", cargo_metadata: 0.23.1, "MIT", +cargo-platform: 0.3.2, "Apache-2.0 OR MIT", +castaway: 0.2.4, "MIT", cc: 1.2.52, "Apache-2.0 OR MIT", cesu8: 1.1.0, "Apache-2.0 OR MIT", -cfg-if: 1.0.4, "Apache-2.0 OR MIT", cfg_aliases: 0.2.1, "MIT", +cfg-if: 1.0.4, "Apache-2.0 OR MIT", charming: 0.6.0, "Apache-2.0 OR MIT", charming_macros: 0.1.0, "Apache-2.0 OR MIT", chrono: 0.4.43, "Apache-2.0 OR MIT", @@ -136,13 +140,14 @@ clap_builder: 4.5.54, "Apache-2.0 OR MIT", clap_complete: 4.5.65, "Apache-2.0 OR MIT", clap_derive: 4.5.49, "Apache-2.0 OR MIT", clap_lex: 0.7.7, "Apache-2.0 OR MIT", -clock: 0.1.0, "N/A", +clock: 0.1.0, "", cmake: 0.1.57, "Apache-2.0 OR MIT", cobs: 0.3.0, "Apache-2.0 OR MIT", colorchoice: 1.0.4, "Apache-2.0 OR MIT", colored: 3.1.1, "MPL-2.0", combine: 4.6.7, "MIT", comfy-table: 7.2.2, "MIT", +compact_str: 0.7.1, "MIT", compio: 0.17.0, "MIT", compio-buf: 0.7.2, "MIT", compio-driver: 0.10.0, "MIT", @@ -163,21 +168,21 @@ configs_derive: 0.1.0, "Apache-2.0", consensus: 0.1.0, "Apache-2.0", console: 0.16.2, "MIT", console_error_panic_hook: 0.1.7, "Apache-2.0 OR MIT", +constant_time_eq: 0.4.2, "Apache-2.0 OR CC0-1.0 OR MIT-0", const-oid: 0.9.6, "Apache-2.0 OR MIT", const-random: 0.1.18, 
"Apache-2.0 OR MIT", const-random-macro: 0.1.16, "Apache-2.0 OR MIT", -constant_time_eq: 0.4.2, "Apache-2.0 OR CC0-1.0 OR MIT-0", -convert_case: 0.6.0, "MIT", convert_case: 0.10.0, "MIT", +convert_case: 0.6.0, "MIT", cookie: 0.16.2, "Apache-2.0 OR MIT", +core_affinity: 0.8.3, "Apache-2.0 OR MIT", core-foundation: 0.10.1, "Apache-2.0 OR MIT", core-foundation-sys: 0.8.7, "Apache-2.0 OR MIT", -core_affinity: 0.8.3, "Apache-2.0 OR MIT", cpufeatures: 0.2.17, "Apache-2.0 OR MIT", crc: 3.4.0, "Apache-2.0 OR MIT", -crc-catalog: 2.4.0, "Apache-2.0 OR MIT", crc32c: 0.6.8, "Apache-2.0 OR MIT", crc32fast: 1.5.0, "Apache-2.0 OR MIT", +crc-catalog: 2.4.0, "Apache-2.0 OR MIT", critical-section: 1.2.0, "Apache-2.0 OR MIT", crossbeam: 0.8.4, "Apache-2.0 OR MIT", crossbeam-channel: 0.5.15, "Apache-2.0 OR MIT", @@ -216,14 +221,14 @@ data-encoding: 2.10.0, "MIT", dbus: 0.9.10, "Apache-2.0 OR MIT", dbus-secret-service: 4.1.0, "Apache-2.0 OR MIT", der: 0.7.10, "Apache-2.0 OR MIT", -der-parser: 10.0.0, "Apache-2.0 OR MIT", deranged: 0.5.5, "Apache-2.0 OR MIT", -derive-new: 0.7.0, "MIT", derive_builder: 0.20.2, "Apache-2.0 OR MIT", derive_builder_core: 0.20.2, "Apache-2.0 OR MIT", derive_builder_macro: 0.20.2, "Apache-2.0 OR MIT", derive_more: 2.1.1, "MIT", derive_more-impl: 2.1.1, "MIT", +derive-new: 0.7.0, "MIT", +der-parser: 10.0.0, "Apache-2.0 OR MIT", difflib: 0.4.0, "MIT", digest: 0.10.7, "Apache-2.0 OR MIT", dircpy: 0.3.19, "MIT", @@ -262,14 +267,14 @@ err_trail: 0.11.0, "Apache-2.0", errno: 0.3.14, "Apache-2.0 OR MIT", error_set: 0.9.1, "Apache-2.0", error_set_impl: 0.9.1, "Apache-2.0", -etcetera: 0.8.0, "Apache-2.0 OR MIT", etcetera: 0.11.0, "Apache-2.0 OR MIT", +etcetera: 0.8.0, "Apache-2.0 OR MIT", event-listener: 5.4.1, "Apache-2.0 OR MIT", event-listener-strategy: 0.5.4, "Apache-2.0 OR MIT", expect-test: 1.5.1, "Apache-2.0 OR MIT", +extension-traits: 1.0.1, "Apache-2.0 OR MIT OR Zlib", ext-trait: 1.0.1, "Apache-2.0 OR MIT OR Zlib", ext-trait-proc_macros: 1.0.1, "Apache-2.0 OR MIT OR Zlib", -extension-traits: 1.0.1, "Apache-2.0 OR MIT OR Zlib", fastbloom: 0.14.1, "Apache-2.0 OR MIT", fastrand: 2.3.0, "Apache-2.0 OR MIT", ferroid: 0.8.9, "Apache-2.0 OR MIT", @@ -292,8 +297,8 @@ foreign-types: 0.3.2, "Apache-2.0 OR MIT", foreign-types-shared: 0.1.1, "Apache-2.0 OR MIT", form_urlencoded: 1.2.2, "Apache-2.0 OR MIT", fragile: 2.0.1, "Apache-2.0", -fs-err: 3.2.2, "Apache-2.0 OR MIT", fs_extra: 1.3.0, "MIT", +fs-err: 3.2.2, "Apache-2.0 OR MIT", fsevent-sys: 4.1.0, "MIT", funty: 2.0.0, "MIT", futures: 0.3.31, "Apache-2.0 OR MIT", @@ -355,21 +360,21 @@ home: 0.5.12, "Apache-2.0 OR MIT", hostname: 0.4.2, "MIT", http: 0.2.12, "Apache-2.0 OR MIT", http: 1.4.0, "Apache-2.0 OR MIT", +httparse: 1.10.1, "Apache-2.0 OR MIT", http-body: 1.0.1, "MIT", http-body-util: 0.1.3, "MIT", -http-range: 0.1.5, "MIT", -httparse: 1.10.1, "Apache-2.0 OR MIT", httpdate: 1.0.3, "Apache-2.0 OR MIT", +http-range: 0.1.5, "MIT", human-repr: 1.1.0, "MIT", humantime: 2.3.0, "Apache-2.0 OR MIT", hwlocality: 1.0.0-alpha.11, "MIT", hwlocality-sys: 0.6.4, "MIT", hyper: 1.8.1, "MIT", +hyperlocal: 0.9.1, "MIT", hyper-named-pipe: 0.1.0, "Apache-2.0", hyper-rustls: 0.27.7, "Apache-2.0 OR ISC OR MIT", hyper-timeout: 0.5.2, "Apache-2.0 OR MIT", hyper-util: 0.1.19, "MIT", -hyperlocal: 0.9.1, "MIT", iana-time-zone: 0.1.64, "Apache-2.0 OR MIT", iana-time-zone-haiku: 0.1.2, "Apache-2.0 OR MIT", iceberg: 0.8.0, "Apache-2.0", @@ -385,11 +390,6 @@ ident_case: 1.0.1, "Apache-2.0 OR MIT", idna: 1.1.0, "Apache-2.0 OR MIT", idna_adapter: 1.2.1, "Apache-2.0 OR 
MIT", iggy: 0.8.2-edge.2, "Apache-2.0", -iggy-bench: 0.3.2-edge.1, "Apache-2.0", -iggy-bench-dashboard-server: 0.5.1-edge.1, "Apache-2.0", -iggy-cli: 0.10.2-edge.1, "Apache-2.0", -iggy-connectors: 0.2.2-edge.1, "Apache-2.0", -iggy-mcp: 0.2.2-edge.1, "Apache-2.0", iggy_binary_protocol: 0.8.2-edge.2, "Apache-2.0", iggy_common: 0.8.2-edge.2, "Apache-2.0", iggy_connector_elasticsearch_sink: 0.2.1-edge.1, "Apache-2.0", @@ -403,10 +403,15 @@ iggy_connector_redshift_sink: 0.1.0, "Apache-2.0", iggy_connector_sdk: 0.1.2-edge.1, "Apache-2.0", iggy_connector_stdout_sink: 0.2.1-edge.1, "Apache-2.0", iggy_examples: 0.0.6, "Apache-2.0", +iggy-bench: 0.3.2-edge.1, "Apache-2.0", +iggy-bench-dashboard-server: 0.5.1-edge.1, "Apache-2.0", +iggy-cli: 0.10.2-edge.1, "Apache-2.0", +iggy-connectors: 0.2.2-edge.1, "Apache-2.0", +iggy-mcp: 0.2.2-edge.1, "Apache-2.0", ignore: 0.4.25, "MIT OR Unlicense", -impl-more: 0.1.9, "Apache-2.0 OR MIT", implicit-clone: 0.6.0, "Apache-2.0 OR MIT", implicit-clone-derive: 0.1.2, "Apache-2.0 OR MIT", +impl-more: 0.1.9, "Apache-2.0 OR MIT", indexmap: 1.9.3, "Apache-2.0 OR MIT", indexmap: 2.13.0, "Apache-2.0 OR MIT", inflections: 1.1.1, "MIT", @@ -417,8 +422,8 @@ inout: 0.1.4, "Apache-2.0 OR MIT", integer-encoding: 3.0.4, "MIT", integration: 0.0.1, "Apache-2.0", inventory: 0.3.21, "Apache-2.0 OR MIT", -io-uring: 0.7.11, "Apache-2.0 OR MIT", io_uring_buf_ring: 0.2.3, "MIT", +io-uring: 0.7.11, "Apache-2.0 OR MIT", ipnet: 2.11.0, "Apache-2.0 OR MIT", iri-string: 0.7.10, "Apache-2.0 OR MIT", is_terminal_polyfill: 1.70.2, "Apache-2.0 OR MIT", @@ -433,8 +438,8 @@ jni: 0.21.1, "Apache-2.0 OR MIT", jni-sys: 0.3.0, "Apache-2.0 OR MIT", jobserver: 0.1.34, "Apache-2.0 OR MIT", journal: 0.1.0, "Apache-2.0", -js-sys: 0.3.83, "Apache-2.0 OR MIT", jsonwebtoken: 10.2.0, "MIT", +js-sys: 0.3.83, "Apache-2.0 OR MIT", jwalk: 0.8.1, "MIT", keccak: 0.1.5, "Apache-2.0 OR MIT", keyring: 3.6.3, "Apache-2.0 OR MIT", @@ -481,8 +486,8 @@ macro_rules_attribute-proc_macro: 0.1.3, "MIT", matchers: 0.2.0, "MIT", matchit: 0.8.4, "BSD-3-Clause AND MIT", maybe-async: 0.2.10, "MIT", -md-5: 0.10.6, "Apache-2.0 OR MIT", md5: 0.8.0, "Apache-2.0 OR MIT", +md-5: 0.10.6, "Apache-2.0 OR MIT", memchr: 2.7.6, "MIT OR Unlicense", message_bus: 0.1.0, "Apache-2.0", metadata: 0.1.0, "Apache-2.0", @@ -491,8 +496,8 @@ miette-derive: 7.6.0, "Apache-2.0", mimalloc: 0.1.48, "MIT", mime: 0.3.17, "Apache-2.0 OR MIT", mime_guess: 2.0.5, "MIT", -minimal-lexical: 0.2.1, "Apache-2.0 OR MIT", minidom: 0.16.0, "MPL-2.0", +minimal-lexical: 0.2.1, "Apache-2.0 OR MIT", miniz_oxide: 0.8.9, "Apache-2.0 OR MIT OR Zlib", mio: 1.1.1, "MIT", mockall: 0.14.0, "Apache-2.0 OR MIT", @@ -515,6 +520,8 @@ nougat-proc_macros: 0.2.4, "Apache-2.0 OR MIT OR Zlib", ntapi: 0.4.2, "Apache-2.0 OR MIT", nu-ansi-term: 0.50.3, "MIT", num: 0.4.3, "Apache-2.0 OR MIT", +num_cpus: 1.17.0, "Apache-2.0 OR MIT", +num_threads: 0.1.7, "Apache-2.0 OR MIT", num-bigint: 0.4.6, "Apache-2.0 OR MIT", num-bigint-dig: 0.8.6, "Apache-2.0 OR MIT", num-complex: 0.4.6, "Apache-2.0 OR MIT", @@ -525,8 +532,6 @@ num-modular: 0.6.1, "Apache-2.0", num-order: 1.2.0, "Apache-2.0", num-rational: 0.4.2, "Apache-2.0 OR MIT", num-traits: 0.2.19, "Apache-2.0 OR MIT", -num_cpus: 1.17.0, "Apache-2.0 OR MIT", -num_threads: 0.1.7, "Apache-2.0 OR MIT", objc2: 0.6.3, "MIT", objc2-core-foundation: 0.3.2, "Apache-2.0 OR MIT OR Zlib", objc2-encode: 4.1.0, "MIT", @@ -543,12 +548,12 @@ openssl-probe: 0.2.0, "Apache-2.0 OR MIT", openssl-src: 300.5.4+3.5.4, "Apache-2.0 OR MIT", openssl-sys: 0.9.111, "MIT", 
opentelemetry: 0.31.0, "Apache-2.0", +opentelemetry_sdk: 0.31.0, "Apache-2.0", opentelemetry-appender-tracing: 0.31.1, "Apache-2.0", opentelemetry-http: 0.31.0, "Apache-2.0", opentelemetry-otlp: 0.31.0, "Apache-2.0", opentelemetry-proto: 0.31.0, "Apache-2.0", opentelemetry-semantic-conventions: 0.31.0, "Apache-2.0", -opentelemetry_sdk: 0.31.0, "Apache-2.0", option-ext: 0.2.0, "MPL-2.0", ordered-float: 2.10.1, "MIT", ordered-float: 4.6.0, "MIT", @@ -580,11 +585,11 @@ pest: 2.8.5, "Apache-2.0 OR MIT", pest_derive: 2.8.5, "Apache-2.0 OR MIT", pest_generator: 2.8.5, "Apache-2.0 OR MIT", pest_meta: 2.8.5, "Apache-2.0 OR MIT", +pinned: 0.1.0, "Apache-2.0 OR MIT", pin-project: 1.1.10, "Apache-2.0 OR MIT", pin-project-internal: 1.1.10, "Apache-2.0 OR MIT", pin-project-lite: 0.2.16, "Apache-2.0 OR MIT", pin-utils: 0.1.0, "Apache-2.0 OR MIT", -pinned: 0.1.0, "Apache-2.0 OR MIT", pkcs1: 0.7.5, "Apache-2.0 OR MIT", pkcs8: 0.10.2, "Apache-2.0 OR MIT", pkg-config: 0.3.32, "Apache-2.0 OR MIT", @@ -602,12 +607,12 @@ predicates-core: 1.0.9, "Apache-2.0 OR MIT", predicates-tree: 1.0.12, "Apache-2.0 OR MIT", prettyplease: 0.2.37, "Apache-2.0 OR MIT", primeorder: 0.13.6, "Apache-2.0 OR MIT", +proc-macro2: 1.0.105, "Apache-2.0 OR MIT", +proc-macro2-diagnostics: 0.10.1, "Apache-2.0 OR MIT", proc-macro-crate: 1.3.1, "Apache-2.0 OR MIT", proc-macro-crate: 3.4.0, "Apache-2.0 OR MIT", proc-macro-error: 1.0.4, "Apache-2.0 OR MIT", proc-macro-error-attr: 1.0.4, "Apache-2.0 OR MIT", -proc-macro2: 1.0.105, "Apache-2.0 OR MIT", -proc-macro2-diagnostics: 0.10.1, "Apache-2.0 OR MIT", prometheus-client: 0.24.0, "Apache-2.0 OR MIT", prometheus-client-derive-encode: 0.5.0, "Apache-2.0 OR MIT", prost: 0.14.3, "Apache-2.0", @@ -626,7 +631,6 @@ quinn: 0.11.9, "Apache-2.0 OR MIT", quinn-proto: 0.11.13, "Apache-2.0 OR MIT", quinn-udp: 0.5.14, "Apache-2.0 OR MIT", quote: 1.0.43, "Apache-2.0 OR MIT", -r-efi: 5.3.0, "Apache-2.0 OR LGPL-2.1-or-later OR MIT", radium: 0.7.0, "MIT", rand: 0.8.5, "Apache-2.0 OR MIT", rand: 0.9.2, "Apache-2.0 OR MIT", @@ -644,6 +648,7 @@ redox_syscall: 0.7.0, "MIT", redox_users: 0.5.2, "MIT", ref-cast: 1.0.25, "Apache-2.0 OR MIT", ref-cast-impl: 1.0.25, "Apache-2.0 OR MIT", +r-efi: 5.3.0, "Apache-2.0 OR LGPL-2.1-or-later OR MIT", regex: 1.12.2, "Apache-2.0 OR MIT", regex-automata: 0.4.13, "Apache-2.0 OR MIT", regex-lite: 0.1.8, "Apache-2.0 OR MIT", @@ -667,15 +672,14 @@ rmp-serde: 1.3.1, "MIT", roaring: 0.11.3, "Apache-2.0 OR MIT", route-recognizer: 0.3.1, "MIT", rsa: 0.9.10, "Apache-2.0 OR MIT", +rust_decimal: 1.40.0, "MIT", +rustc_version: 0.4.1, "Apache-2.0 OR MIT", +rustc-hash: 2.1.1, "Apache-2.0 OR MIT", rust-embed: 8.11.0, "MIT", rust-embed-impl: 8.11.0, "MIT", rust-embed-utils: 8.11.0, "MIT", -rust-ini: 0.21.3, "MIT", -rust-s3: 0.37.1, "MIT", -rust_decimal: 1.40.0, "MIT", -rustc-hash: 2.1.1, "Apache-2.0 OR MIT", -rustc_version: 0.4.1, "Apache-2.0 OR MIT", rusticata-macros: 4.1.0, "Apache-2.0 OR MIT", +rust-ini: 0.21.3, "MIT", rustix: 1.1.3, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT", rustls: 0.23.36, "Apache-2.0 OR ISC OR MIT", rustls-native-certs: 0.8.3, "Apache-2.0 OR ISC OR MIT", @@ -684,6 +688,7 @@ rustls-pki-types: 1.13.2, "Apache-2.0 OR MIT", rustls-platform-verifier: 0.6.2, "Apache-2.0 OR MIT", rustls-platform-verifier-android: 0.1.1, "Apache-2.0 OR MIT", rustls-webpki: 0.103.8, "ISC", +rust-s3: 0.37.1, "MIT", rustversion: 1.0.22, "Apache-2.0 OR MIT", rxml: 0.11.1, "MIT", rxml_validation: 0.11.0, "MIT", @@ -708,7 +713,6 @@ semver: 1.0.27, "Apache-2.0 OR MIT", send_wrapper: 0.6.0, 
"Apache-2.0 OR MIT", seq-macro: 0.3.6, "Apache-2.0 OR MIT", serde: 1.0.228, "Apache-2.0 OR MIT", -serde-wasm-bindgen: 0.6.5, "MIT", serde_bytes: 0.11.19, "Apache-2.0 OR MIT", serde_core: 1.0.228, "Apache-2.0 OR MIT", serde_derive: 1.0.228, "Apache-2.0 OR MIT", @@ -722,6 +726,7 @@ serde_urlencoded: 0.7.1, "Apache-2.0 OR MIT", serde_with: 3.16.1, "Apache-2.0 OR MIT", serde_with_macros: 3.16.1, "Apache-2.0 OR MIT", serde_yaml_ng: 0.10.0, "MIT", +serde-wasm-bindgen: 0.6.5, "MIT", serial_test: 3.3.1, "MIT", serial_test_derive: 3.3.1, "MIT", server: 0.6.2-edge.1, "Apache-2.0", @@ -758,8 +763,8 @@ sqlx-postgres: 0.8.6, "Apache-2.0 OR MIT", sqlx-sqlite: 0.8.6, "Apache-2.0 OR MIT", sse-stream: 0.2.1, "Apache-2.0 OR MIT", stable_deref_trait: 1.2.1, "Apache-2.0 OR MIT", -static-toml: 1.3.0, "MIT", static_assertions: 1.1.0, "Apache-2.0 OR MIT", +static-toml: 1.3.0, "MIT", stringprep: 0.1.5, "Apache-2.0 OR MIT", strsim: 0.11.1, "MIT", structmeta: 0.3.0, "Apache-2.0 OR MIT", @@ -863,9 +868,9 @@ ureq-proto: 0.5.3, "Apache-2.0 OR MIT", url: 2.5.8, "Apache-2.0 OR MIT", urlencoding: 2.1.3, "MIT", utf-8: 0.7.6, "Apache-2.0 OR MIT", -utf8-width: 0.1.8, "MIT", utf8_iter: 1.0.4, "Apache-2.0 OR MIT", utf8parse: 0.2.2, "Apache-2.0 OR MIT", +utf8-width: 0.1.8, "MIT", uuid: 1.20.0, "Apache-2.0 OR MIT", v_htmlescape: 0.15.8, "Apache-2.0 OR MIT", valuable: 0.1.1, "MIT", @@ -889,11 +894,11 @@ wasm-bindgen-macro-support: 0.2.106, "Apache-2.0 OR MIT", wasm-bindgen-shared: 0.2.106, "Apache-2.0 OR MIT", wasm-streams: 0.4.2, "Apache-2.0 OR MIT", wasmtimer: 0.4.3, "MIT", -web-sys: 0.3.83, "Apache-2.0 OR MIT", -web-time: 1.1.0, "Apache-2.0 OR MIT", webpki-root-certs: 1.0.5, "CDLA-Permissive-2.0", webpki-roots: 0.26.11, "CDLA-Permissive-2.0", webpki-roots: 1.0.5, "CDLA-Permissive-2.0", +web-sys: 0.3.83, "Apache-2.0 OR MIT", +web-time: 1.1.0, "Apache-2.0 OR MIT", whoami: 1.6.1, "Apache-2.0 OR BSL-1.0 OR MIT", widestring: 1.2.1, "Apache-2.0 OR MIT", winapi: 0.3.9, "Apache-2.0 OR MIT", @@ -902,34 +907,6 @@ winapi-util: 0.1.11, "MIT OR Unlicense", winapi-x86_64-pc-windows-gnu: 0.4.0, "Apache-2.0 OR MIT", windows: 0.61.3, "Apache-2.0 OR MIT", windows: 0.62.2, "Apache-2.0 OR MIT", -windows-collections: 0.2.0, "Apache-2.0 OR MIT", -windows-collections: 0.3.2, "Apache-2.0 OR MIT", -windows-core: 0.61.2, "Apache-2.0 OR MIT", -windows-core: 0.62.2, "Apache-2.0 OR MIT", -windows-future: 0.2.1, "Apache-2.0 OR MIT", -windows-future: 0.3.2, "Apache-2.0 OR MIT", -windows-implement: 0.60.2, "Apache-2.0 OR MIT", -windows-interface: 0.59.3, "Apache-2.0 OR MIT", -windows-link: 0.1.3, "Apache-2.0 OR MIT", -windows-link: 0.2.1, "Apache-2.0 OR MIT", -windows-numerics: 0.2.0, "Apache-2.0 OR MIT", -windows-numerics: 0.3.1, "Apache-2.0 OR MIT", -windows-result: 0.3.4, "Apache-2.0 OR MIT", -windows-result: 0.4.1, "Apache-2.0 OR MIT", -windows-strings: 0.4.2, "Apache-2.0 OR MIT", -windows-strings: 0.5.1, "Apache-2.0 OR MIT", -windows-sys: 0.45.0, "Apache-2.0 OR MIT", -windows-sys: 0.48.0, "Apache-2.0 OR MIT", -windows-sys: 0.52.0, "Apache-2.0 OR MIT", -windows-sys: 0.59.0, "Apache-2.0 OR MIT", -windows-sys: 0.60.2, "Apache-2.0 OR MIT", -windows-sys: 0.61.2, "Apache-2.0 OR MIT", -windows-targets: 0.42.2, "Apache-2.0 OR MIT", -windows-targets: 0.48.5, "Apache-2.0 OR MIT", -windows-targets: 0.52.6, "Apache-2.0 OR MIT", -windows-targets: 0.53.5, "Apache-2.0 OR MIT", -windows-threading: 0.1.0, "Apache-2.0 OR MIT", -windows-threading: 0.2.1, "Apache-2.0 OR MIT", windows_aarch64_gnullvm: 0.42.2, "Apache-2.0 OR MIT", windows_aarch64_gnullvm: 0.48.5, 
"Apache-2.0 OR MIT", windows_aarch64_gnullvm: 0.52.6, "Apache-2.0 OR MIT", @@ -960,6 +937,34 @@ windows_x86_64_msvc: 0.42.2, "Apache-2.0 OR MIT", windows_x86_64_msvc: 0.48.5, "Apache-2.0 OR MIT", windows_x86_64_msvc: 0.52.6, "Apache-2.0 OR MIT", windows_x86_64_msvc: 0.53.1, "Apache-2.0 OR MIT", +windows-collections: 0.2.0, "Apache-2.0 OR MIT", +windows-collections: 0.3.2, "Apache-2.0 OR MIT", +windows-core: 0.61.2, "Apache-2.0 OR MIT", +windows-core: 0.62.2, "Apache-2.0 OR MIT", +windows-future: 0.2.1, "Apache-2.0 OR MIT", +windows-future: 0.3.2, "Apache-2.0 OR MIT", +windows-implement: 0.60.2, "Apache-2.0 OR MIT", +windows-interface: 0.59.3, "Apache-2.0 OR MIT", +windows-link: 0.1.3, "Apache-2.0 OR MIT", +windows-link: 0.2.1, "Apache-2.0 OR MIT", +windows-numerics: 0.2.0, "Apache-2.0 OR MIT", +windows-numerics: 0.3.1, "Apache-2.0 OR MIT", +windows-result: 0.3.4, "Apache-2.0 OR MIT", +windows-result: 0.4.1, "Apache-2.0 OR MIT", +windows-strings: 0.4.2, "Apache-2.0 OR MIT", +windows-strings: 0.5.1, "Apache-2.0 OR MIT", +windows-sys: 0.45.0, "Apache-2.0 OR MIT", +windows-sys: 0.48.0, "Apache-2.0 OR MIT", +windows-sys: 0.52.0, "Apache-2.0 OR MIT", +windows-sys: 0.59.0, "Apache-2.0 OR MIT", +windows-sys: 0.60.2, "Apache-2.0 OR MIT", +windows-sys: 0.61.2, "Apache-2.0 OR MIT", +windows-targets: 0.42.2, "Apache-2.0 OR MIT", +windows-targets: 0.48.5, "Apache-2.0 OR MIT", +windows-targets: 0.52.6, "Apache-2.0 OR MIT", +windows-targets: 0.53.5, "Apache-2.0 OR MIT", +windows-threading: 0.1.0, "Apache-2.0 OR MIT", +windows-threading: 0.2.1, "Apache-2.0 OR MIT", winnow: 0.5.40, "MIT", winnow: 0.7.14, "MIT", wit-bindgen: 0.46.0, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT", @@ -990,4 +995,4 @@ zmij: 1.0.14, "MIT", zopfli: 0.8.3, "Apache-2.0", zstd: 0.13.3, "MIT", zstd-safe: 7.2.4, "Apache-2.0 OR MIT", -zstd-sys: 2.0.16+zstd.1.5.7, "Apache-2.0 OR MIT", +zstd-sys: 2.0.16+zstd.1.5.7, "Apache-2.0 OR MIT" \ No newline at end of file From 67bae8a3695dca0f870b23184739d30a7a595d09 Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Sat, 31 Jan 2026 14:51:29 +0530 Subject: [PATCH 10/11] fix: add trailing newline and comma to DEPENDENCIES.md --- DEPENDENCIES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md index 5c6444b12d..2749737102 100644 --- a/DEPENDENCIES.md +++ b/DEPENDENCIES.md @@ -995,4 +995,4 @@ zmij: 1.0.14, "MIT", zopfli: 0.8.3, "Apache-2.0", zstd: 0.13.3, "MIT", zstd-safe: 7.2.4, "Apache-2.0 OR MIT", -zstd-sys: 2.0.16+zstd.1.5.7, "Apache-2.0 OR MIT" \ No newline at end of file +zstd-sys: 2.0.16+zstd.1.5.7, "Apache-2.0 OR MIT", From 8faa335883c55d2ce31dc119118fea37ba4cb31e Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Sat, 31 Jan 2026 15:06:02 +0530 Subject: [PATCH 11/11] fix: correct environment variable naming in redshift integration tests - Changed CONFIG_ to PLUGIN_CONFIG_ for plugin configuration fields - Changed TOPICS_0 to TOPICS with proper JSON array format - Added CONSUMER_GROUP environment variable --- .../tests/connectors/redshift/mod.rs | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/core/integration/tests/connectors/redshift/mod.rs b/core/integration/tests/connectors/redshift/mod.rs index c318e0a424..2af8857025 100644 --- a/core/integration/tests/connectors/redshift/mod.rs +++ b/core/integration/tests/connectors/redshift/mod.rs @@ -80,29 +80,29 @@ async fn setup() -> RedshiftTestSetup { // Redshift connection (using 
PostgreSQL as simulator) let connection_string = format!("postgres://postgres:postgres@localhost:{postgres_port}"); envs.insert( - "IGGY_CONNECTORS_SINK_REDSHIFT_CONFIG_CONNECTION_STRING".to_owned(), + "IGGY_CONNECTORS_SINK_REDSHIFT_PLUGIN_CONFIG_CONNECTION_STRING".to_owned(), connection_string, ); // S3 configuration for staging envs.insert( - "IGGY_CONNECTORS_SINK_REDSHIFT_CONFIG_S3_BUCKET".to_owned(), + "IGGY_CONNECTORS_SINK_REDSHIFT_PLUGIN_CONFIG_S3_BUCKET".to_owned(), bucket_name.to_owned(), ); envs.insert( - "IGGY_CONNECTORS_SINK_REDSHIFT_CONFIG_S3_REGION".to_owned(), + "IGGY_CONNECTORS_SINK_REDSHIFT_PLUGIN_CONFIG_S3_REGION".to_owned(), "us-east-1".to_owned(), ); envs.insert( - "IGGY_CONNECTORS_SINK_REDSHIFT_CONFIG_S3_ENDPOINT".to_owned(), + "IGGY_CONNECTORS_SINK_REDSHIFT_PLUGIN_CONFIG_S3_ENDPOINT".to_owned(), s3_endpoint, ); envs.insert( - "IGGY_CONNECTORS_SINK_REDSHIFT_CONFIG_AWS_ACCESS_KEY_ID".to_owned(), + "IGGY_CONNECTORS_SINK_REDSHIFT_PLUGIN_CONFIG_AWS_ACCESS_KEY_ID".to_owned(), "test".to_owned(), ); envs.insert( - "IGGY_CONNECTORS_SINK_REDSHIFT_CONFIG_AWS_SECRET_ACCESS_KEY".to_owned(), + "IGGY_CONNECTORS_SINK_REDSHIFT_PLUGIN_CONFIG_AWS_SECRET_ACCESS_KEY".to_owned(), "test".to_owned(), ); @@ -112,13 +112,17 @@ async fn setup() -> RedshiftTestSetup { iggy_setup.stream.to_owned(), ); envs.insert( - "IGGY_CONNECTORS_SINK_REDSHIFT_STREAMS_0_TOPICS_0".to_owned(), - iggy_setup.topic.to_owned(), + "IGGY_CONNECTORS_SINK_REDSHIFT_STREAMS_0_TOPICS".to_owned(), + format!("[{}]", iggy_setup.topic), ); envs.insert( "IGGY_CONNECTORS_SINK_REDSHIFT_STREAMS_0_SCHEMA".to_owned(), "json".to_owned(), ); + envs.insert( + "IGGY_CONNECTORS_SINK_REDSHIFT_STREAMS_0_CONSUMER_GROUP".to_owned(), + "test".to_owned(), + ); let mut runtime = setup_runtime(); runtime