57 changes: 57 additions & 0 deletions connectors/redshift_multithreading/connector.py
@@ -0,0 +1,57 @@
"""
Redshift Multithreading connector for Fivetran Connector SDK.
Demonstrates threaded extraction from Amazon Redshift.
"""

import threading
import psycopg2
from fivetran_connector_sdk import connector, config, state, records, log, schema

Copilot AI Nov 20, 2025

Import of 'log' is not used.

Suggested change
from fivetran_connector_sdk import connector, config, state, records, log, schema
from fivetran_connector_sdk import connector, config, state, records, schema

CONFIG = config.Config(
    host=config.StringField(),
    port=config.IntegerField(default=5439),
    database=config.StringField(),
    user=config.StringField(),
    password=config.SecretField(),
    threads=config.IntegerField(default=4)
)

SCHEMA = schema.Schema(
    name="redshift_table",
    columns={
        "id": schema.StringColumn(),
        "data": schema.JSONColumn(),
    }
)

@connector(
    name="RedshiftMultithreadingConnector",
    version="0.1.0",
    config=CONFIG,
    schema=SCHEMA,
)
def run_connector(ctx: state.Context):
    def worker(offset):
        conn = psycopg2.connect(
            host=ctx.config.host,
            dbname=ctx.config.database,
            user=ctx.config.user,
            password=ctx.config.password,
            port=ctx.config.port,
        )
        cur = conn.cursor()
        cur.execute(f"SELECT * FROM some_table LIMIT 100 OFFSET {offset}")

Copilot AI Nov 20, 2025

SQL injection vulnerability: The offset parameter is directly interpolated into the SQL query using an f-string without sanitization. While offset comes from controlled code in this example, this pattern is unsafe and violates security best practices.

Use parameterized queries instead:

cur.execute("SELECT * FROM some_table LIMIT %s OFFSET %s", (100, offset))
Suggested change
        cur.execute(f"SELECT * FROM some_table LIMIT 100 OFFSET {offset}")
        cur.execute("SELECT * FROM some_table LIMIT %s OFFSET %s", (100, offset))

Copilot AI Nov 20, 2025

The hardcoded table name some_table should be configurable. Users of this connector would need to modify the source code to work with their actual table names. This should be a configuration parameter instead:

# In configuration
table_name=config.StringField()

# In query
cur.execute(f"SELECT * FROM {configuration['table_name']} LIMIT 100 OFFSET {offset}")

Or better yet, since a table name cannot be bound as a query parameter, compose the identifier safely (for example with psycopg2's sql.Identifier) and validate it against the expected table names to prevent SQL injection.
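
A minimal sketch of that safer composition, assuming a hypothetical table_name configuration key (not part of the config defined in this file) and using psycopg2's sql module so the identifier is quoted rather than interpolated:

from psycopg2 import sql  # psycopg2's safe SQL composition helpers

def build_batch_query(table_name: str) -> sql.Composed:
    # sql.Identifier quotes the table name as a SQL identifier, so it cannot
    # inject arbitrary SQL; LIMIT and OFFSET remain bound parameters.
    return sql.SQL("SELECT * FROM {} LIMIT %s OFFSET %s").format(sql.Identifier(table_name))

# Hypothetical usage with the assumed table_name key:
# cur.execute(build_batch_query(configuration["table_name"]), (100, offset))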

Copilot AI Nov 20, 2025

The pagination logic with fixed LIMIT/OFFSET is inefficient and can lead to data inconsistency issues:

  1. Performance: OFFSET becomes slower as the offset increases because the database must scan and skip all previous rows
  2. Data consistency: If rows are inserted/updated between thread executions, some data may be missed or duplicated
  3. Not incremental: This approach always reads all data, not just new/updated records

For proper incremental syncs, use:

  • A replication key (e.g., updated_at timestamp or auto-incrementing ID)
  • WHERE clause filtering: WHERE updated_at > last_synced_timestamp ORDER BY updated_at
  • Track progress in state: state["last_updated_at"]

See connectors/redshift/simple_redshift_connector/connector.py for a proper incremental sync pattern.
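
A minimal sketch of that incremental pattern, assuming the source table exposes an updated_at column (not part of the example schema above):

def fetch_incremental(cur, state: dict):
    # Resume from the last checkpointed timestamp; fall back to the epoch on the first sync.
    last_synced = state.get("last_updated_at", "1970-01-01 00:00:00")
    cur.execute(
        "SELECT id, data, updated_at FROM some_table WHERE updated_at > %s ORDER BY updated_at",
        (last_synced,),
    )
    for row in cur.fetchall():
        # Remember the newest timestamp seen so it can be checkpointed after the batch.
        state["last_updated_at"] = str(row[2])
        yield row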

        for row in cur.fetchall():

Copilot AI Nov 20, 2025

Memory safety violation: fetchall() loads the entire result set into memory, which can exhaust available memory with large datasets.

Use server-side cursors with fetchmany(batch_size) instead:

__BATCH_SIZE = 1000
cursor = connection.cursor(name='server_side_cursor')
cursor.execute(query)
while True:
    rows = cursor.fetchmany(__BATCH_SIZE)
    if not rows:
        break
    for row in rows:
        op.upsert(table, row_to_dict(row))
    op.checkpoint(state)

            records.write("redshift_table", {"id": row[0], "data": row})
        conn.close()
Comment on lines +35 to +46

Copilot AI Nov 20, 2025

Missing error handling and retry logic. Database queries can fail due to transient network issues, connection timeouts, or temporary database unavailability. There is no try-except block or retry mechanism with exponential backoff.

Example with retry logic:

import time

__MAX_RETRIES = 5

for attempt in range(__MAX_RETRIES):
    try:
        conn = psycopg2.connect(...)
        cursor = conn.cursor()
        cursor.execute(query)
        break
    except (psycopg2.OperationalError, psycopg2.InterfaceError) as e:
        if attempt == __MAX_RETRIES - 1:
            raise
        sleep_time = min(60, 2 ** attempt)
        log.warning(f"Retry {attempt + 1}/{__MAX_RETRIES} after {sleep_time}s: {e}")
        time.sleep(sleep_time)

Comment on lines +34 to +46

Copilot AI Nov 20, 2025

Missing logging. The SDK requires using from fivetran_connector_sdk import Logging as log and logging important events such as:

  • Connection establishment: log.info(f"Connected to Redshift at {host}:{port}")
  • Progress updates: log.info(f"Processing batch {batch_num}")
  • Retry attempts: log.warning(f"Retry {attempt}/{max_retries} due to: {error}")
  • Errors: log.severe(f"Failed to fetch data: {error}")

Never use Python's built-in logging module or print() statements.
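
For illustration, the worker in this example could log along these lines (a sketch only; the surrounding fetch logic is elided and the configuration argument is assumed):

from fivetran_connector_sdk import Logging as log  # SDK logging, as required above

def worker(offset, configuration: dict):
    host, port = configuration["host"], configuration["port"]
    log.info(f"Connecting to Redshift at {host}:{port}")
    try:
        # ... connect, fetch the batch, and hand rows back to the main thread ...
        log.info(f"Finished batch starting at offset {offset}")
    except Exception as e:
        log.severe(f"Failed to fetch data at offset {offset}: {e}")
        raise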

Comment on lines +35 to +46

Copilot AI Nov 20, 2025

Resource leak: Database connections are not properly closed in case of exceptions. If an error occurs during query execution or data processing, the connection remains open.

Use context managers or try-finally blocks:

conn = None
try:
    conn = psycopg2.connect(...)
    cur = conn.cursor()
    # ... process data
finally:
    if conn:
        conn.close()

Or better, use psycopg2's context manager:

with psycopg2.connect(...) as conn:
    with conn.cursor() as cur:
        # ... process data
Suggested change
        conn = psycopg2.connect(
            host=ctx.config.host,
            dbname=ctx.config.database,
            user=ctx.config.user,
            password=ctx.config.password,
            port=ctx.config.port,
        )
        cur = conn.cursor()
        cur.execute(f"SELECT * FROM some_table LIMIT 100 OFFSET {offset}")
        for row in cur.fetchall():
            records.write("redshift_table", {"id": row[0], "data": row})
        conn.close()
        # Use context managers to ensure the connection and cursor are always closed, even if an exception occurs.
        with psycopg2.connect(
            host=ctx.config.host,
            dbname=ctx.config.database,
            user=ctx.config.user,
            password=ctx.config.password,
            port=ctx.config.port,
        ) as conn:
            with conn.cursor() as cur:
                cur.execute(f"SELECT * FROM some_table LIMIT 100 OFFSET {offset}")
                for row in cur.fetchall():
                    records.write("redshift_table", {"id": row[0], "data": row})

    threads = []
    for i in range(ctx.config.threads):
        t = threading.Thread(target=worker, args=(i * 100,))
        t.start()
        threads.append(t)

    for t in threads:
        t.join()
Comment on lines +34 to +55

Copilot AI Nov 20, 2025

Incorrect multithreading implementation. According to SDK multithreading guidelines (see connectors/oauth2_and_accelo_api_connector_multithreading_enabled/api_threading_utils.py):

Critical rules violated:

  1. Never call SDK operations inside threads: records.write() (or, in the correct SDK API, op.upsert()) should NOT be called from worker threads
  2. Only API/data fetching should be threaded: Threads should only fetch data and return it; all SDK operations must happen in the main thread
  3. Thread-safe state management: State must use threading.local() or be synchronized

Correct pattern:

from concurrent.futures import ThreadPoolExecutor

def fetch_batch(offset):
    # Only fetch data in thread
    conn = psycopg2.connect(...)
    cursor = conn.cursor()
    cursor.execute(query, (offset,))
    rows = cursor.fetchmany(batch_size)
    conn.close()
    return rows

# In main thread
with ThreadPoolExecutor(max_workers=threads) as executor:
    futures = [executor.submit(fetch_batch, i * 100) for i in range(num_batches)]
    for future in futures:
        rows = future.result()
        for row in rows:
            op.upsert(table, row)  # SDK operations in main thread only
        op.checkpoint(state)

    return ctx.update_state({"last_sync": "now"})
Comment on lines +6 to +57

Copilot AI Nov 20, 2025

This connector uses an incorrect API pattern for the Fivetran Connector SDK. The SDK does not support decorator-based connectors with @connector, config.Config(), or schema.Schema() patterns.

The correct pattern requires:

  1. Import: from fivetran_connector_sdk import Connector, Operations as op, Logging as log
  2. Define update(configuration: dict, state: dict) function
  3. Define schema(configuration: dict) function
  4. Initialize: connector = Connector(update=update, schema=schema)

Please refer to the template at template_example_connector/connector.py or existing examples like connectors/redshift/simple_redshift_connector/connector.py for the correct structure.

Suggested change
import threading
import psycopg2
from fivetran_connector_sdk import connector, config, state, records, log, schema
CONFIG = config.Config(
    host=config.StringField(),
    port=config.IntegerField(default=5439),
    database=config.StringField(),
    user=config.StringField(),
    password=config.SecretField(),
    threads=config.IntegerField(default=4)
)
SCHEMA = schema.Schema(
    name="redshift_table",
    columns={
        "id": schema.StringColumn(),
        "data": schema.JSONColumn(),
    }
)
@connector(
    name="RedshiftMultithreadingConnector",
    version="0.1.0",
    config=CONFIG,
    schema=SCHEMA,
)
def run_connector(ctx: state.Context):
    def worker(offset):
        conn = psycopg2.connect(
            host=ctx.config.host,
            dbname=ctx.config.database,
            user=ctx.config.user,
            password=ctx.config.password,
            port=ctx.config.port,
        )
        cur = conn.cursor()
        cur.execute(f"SELECT * FROM some_table LIMIT 100 OFFSET {offset}")
        for row in cur.fetchall():
            records.write("redshift_table", {"id": row[0], "data": row})
        conn.close()
    threads = []
    for i in range(ctx.config.threads):
        t = threading.Thread(target=worker, args=(i * 100,))
        t.start()
        threads.append(t)
    for t in threads:
        t.join()
    return ctx.update_state({"last_sync": "now"})
import threading  # For multithreaded data extraction
import psycopg2  # For connecting to Amazon Redshift
import json  # For reading configuration from JSON file
from fivetran_connector_sdk import Connector  # For connector initialization
from fivetran_connector_sdk import Logging as log  # For logging
from fivetran_connector_sdk import Operations as op  # For upsert and checkpoint operations

# Constants for configuration keys and batch size
__BATCH_SIZE = 100  # Number of records per thread
__TABLE_NAME = "redshift_table"
__MAX_THREADS = 16  # Maximum allowed threads


def validate_configuration(configuration: dict):
    """
    Validate the configuration dictionary to ensure it contains all required parameters.
    This function is called at the start of the update method to ensure that the connector has all necessary configuration values.
    Args:
        configuration: a dictionary that holds the configuration settings for the connector.
    Raises:
        ValueError: if any required configuration parameter is missing or invalid.
    """
    required_configs = ["host", "port", "database", "user", "password", "threads"]
    for key in required_configs:
        if key not in configuration:
            raise ValueError(f"Missing required configuration value: {key}")
    if not isinstance(configuration["port"], int) or not (0 < configuration["port"] < 65536):
        raise ValueError("Port must be a valid integer between 1 and 65535.")
    if not isinstance(configuration["threads"], int) or not (1 <= configuration["threads"] <= __MAX_THREADS):
        raise ValueError(f"Threads must be an integer between 1 and {__MAX_THREADS}.")
    # Additional validation can be added here as needed


def schema(configuration: dict):
    """
    Define the schema function which lets you configure the schema your connector delivers.
    See the technical reference documentation for more details on the schema function:
    https://fivetran.com/docs/connectors/connector-sdk/technical-reference#schema
    Args:
        configuration: a dictionary that holds the configuration settings for the connector.
    """
    return [
        {
            "table": __TABLE_NAME,
            "primary_key": ["id"],
            "columns": {
                "id": "STRING",
                "data": "JSON"
            }
        }
    ]


def update(configuration: dict, state: dict):
    """
    Define the update function which lets you configure how your connector fetches data.
    See the technical reference documentation for more details on the update function:
    https://fivetran.com/docs/connectors/connector-sdk/technical-reference#update
    Args:
        configuration: a dictionary that holds the configuration settings for the connector.
        state: a dictionary that holds the state of the connector.
    """
    log.warning("Example: DATABASE : Redshift Multithreading")
    validate_configuration(configuration)

    # Use threading to fetch data in parallel from Redshift
    threads = []
    thread_errors = []
    results_lock = threading.Lock()

    def worker(offset):
        """
        Worker function to fetch a batch of records from Redshift.
        Args:
            offset: The offset for the SQL query LIMIT/OFFSET.
        """
        try:
            conn = psycopg2.connect(
                host=configuration["host"],
                dbname=configuration["database"],
                user=configuration["user"],
                password=configuration["password"],
                port=configuration["port"],
            )
            cur = conn.cursor()
            # Fetch a batch of records using LIMIT and OFFSET
            cur.execute(f"SELECT * FROM some_table LIMIT {__BATCH_SIZE} OFFSET {offset}")
            rows = cur.fetchall()
            for row in rows:
                # The 'upsert' operation is used to insert or update data in the destination table.
                # The first argument is the name of the destination table.
                # The second argument is a dictionary containing the record to be upserted.
                op.upsert(
                    table=__TABLE_NAME,
                    data={"id": str(row[0]), "data": row}
                )
            conn.close()
        except Exception as e:
            with results_lock:
                thread_errors.append(str(e))
            log.severe(f"Thread failed with error: {e}")

    num_threads = configuration["threads"]
    for i in range(num_threads):
        t = threading.Thread(target=worker, args=(i * __BATCH_SIZE,))
        t.start()
        threads.append(t)

    for t in threads:
        t.join()

    if thread_errors:
        raise RuntimeError(f"One or more threads failed: {thread_errors}")

    # Save the progress by checkpointing the state. This is important for ensuring that the sync process can resume
    # from the correct position in case of next sync or interruptions.
    # Learn more about how and where to checkpoint by reading our best practices documentation
    # (https://fivetran.com/docs/connectors/connector-sdk/best-practices#largedatasetrecommendation).
    state["last_sync"] = "now"
    op.checkpoint(state)


# Create the connector object using the schema and update functions
connector = Connector(update=update, schema=schema)

# Check if the script is being run as the main module.
# This is Python's standard entry method allowing your script to be run directly from the command line or IDE 'run' button.
# This is useful for debugging while you write your code. Note this method is not called by Fivetran when executing your connector in production.
# Please test using the Fivetran debug command prior to finalizing and deploying your connector.
if __name__ == "__main__":
    # Open the configuration.json file and load its contents
    with open("configuration.json", "r") as f:
        configuration = json.load(f)

    # Test the connector locally
    connector.debug(configuration=configuration)

Copilot AI Nov 20, 2025

Missing checkpointing logic. State is only updated once at the end via ctx.update_state({"last_sync": "now"}), which means:

  1. If the connector fails mid-sync, all progress is lost
  2. The state value "now" is not meaningful for incremental syncs
  3. No progress tracking during the sync

The SDK requires checkpointing at appropriate intervals (e.g., after processing each batch) using op.checkpoint(state) with meaningful state values like last processed record ID or timestamp.
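
A sketch of per-batch checkpointing in the main thread, assuming batches of rows are produced by the fetch logic and using a hypothetical last_offset key as the progress marker:

from fivetran_connector_sdk import Operations as op  # upsert and checkpoint operations

def sync_batches(batches, state: dict, batch_size: int = 100):
    for batch_num, rows in enumerate(batches):
        for row in rows:
            op.upsert(table="redshift_table", data={"id": str(row[0]), "data": row})
        # Checkpoint after every batch so a failed sync can resume from here.
        state["last_offset"] = (batch_num + 1) * batch_size
        op.checkpoint(state)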

Comment on lines +1 to +57

Copilot AI Nov 20, 2025

Missing requirements.txt file. This connector uses psycopg2 which is not provided by the Fivetran runtime, so it must be declared in requirements.txt with an explicit version:

psycopg2-binary==2.9.9

Note: Use psycopg2-binary instead of psycopg2 for easier installation. Never include fivetran_connector_sdk or requests in requirements.txt as they are provided by the runtime.

Comment on lines +1 to +57

Copilot AI Nov 20, 2025

Missing all required docstrings and comments. The SDK requires:

  1. Function docstrings for update() and schema() with exact format specified in coding standards
  2. Import comments explaining the purpose of each import
  3. Comments before op.upsert() explaining the upsert operation
  4. Comments before op.checkpoint() explaining checkpointing
  5. Comments at if __name__ == "__main__" block explaining the debug entry point
  6. First log statement in update(): log.warning("Example: <CATEGORY> : <EXAMPLE_NAME>")

Refer to template_example_connector/connector.py for the exact required docstring and comment formats.

Comment on lines +1 to +57

Copilot AI Nov 20, 2025

Missing required configuration.json file. Every connector must include a configuration.json file with placeholder values in the format "key": "<YOUR_KEY>".

Example for this connector:

{
    "host": "<YOUR_REDSHIFT_HOST>",
    "port": 5439,
    "database": "<YOUR_DATABASE_NAME>",
    "user": "<YOUR_USERNAME>",
    "password": "<YOUR_PASSWORD>",
    "threads": 4
}

This file is required for local testing with fivetran debug command.

Comment on lines +1 to +57

Copilot AI Nov 20, 2025

Missing required README.md file. Every connector must include a README.md following the structure in template_example_connector/README_template.md.

Required sections:

  1. Single H1 heading: # Redshift Multithreading Connector Example
  2. Connector overview
  3. Requirements
  4. Getting started
  5. Features
  6. Data handling
  7. Error handling
  8. Tables created
  9. Additional considerations

This documentation is essential for users to understand how to use and configure the connector.

Comment on lines +1 to +57

Copilot AI Nov 20, 2025

Missing the main execution block required for local testing. The connector must include:

# Create the connector object using the schema and update functions
connector = Connector(update=update, schema=schema)

# Check if the script is being run as the main module.
# This is Python's standard entry method allowing your script to be run directly from the command line or IDE 'run' button.
# This is useful for debugging while you write your code. Note this method is not called by Fivetran when executing your connector in production.
# Please test using the Fivetran debug command prior to finalizing and deploying your connector.
if __name__ == "__main__":
    # Open the configuration.json file and load its contents
    with open("configuration.json", "r") as f:
        configuration = json.load(f)
    
    # Test the connector locally
    connector.debug(configuration=configuration)

This is required for testing with the fivetran debug command.

Comment on lines +1 to +57

Copilot AI Nov 20, 2025

Missing the required validate_configuration() function. This function must validate configuration values for correctness (e.g., port ranges, valid formats, required fields) and raise ValueError with descriptive error messages for invalid configurations.

Example:

def validate_configuration(configuration: dict):
    """
    Validate the configuration dictionary to ensure it contains all required parameters.
    Args:
        configuration: a dictionary that holds the configuration settings for the connector.
    Raises:
        ValueError: if any required configuration parameter is missing or invalid.
    """
    required_configs = ["host", "database", "user", "password"]
    for key in required_configs:
        if key not in configuration:
            raise ValueError(f"Missing required configuration value: {key}")
    
    port = configuration.get("port", 5439)
    if not isinstance(port, int) or port < 1 or port > 65535:
        raise ValueError(f"Invalid port number: {port}. Must be between 1 and 65535.")

This function should be called at the start of the update() function.
