From 5fad514a087608f728be50f44493eec62b9b2875 Mon Sep 17 00:00:00 2001
From: Karun Veluru
Date: Fri, 31 Oct 2025 13:33:40 -0500
Subject: [PATCH 1/4] feature(examples): Add Xurrent connector

---
 README.md                             |   1 +
 connectors/xurrent/README.md          | 105 +++++
 connectors/xurrent/configuration.json |  11 +
 connectors/xurrent/connector.py       | 639 ++++++++++++++++++++++++++
 4 files changed, 756 insertions(+)
 create mode 100644 connectors/xurrent/README.md
 create mode 100644 connectors/xurrent/configuration.json
 create mode 100644 connectors/xurrent/connector.py

diff --git a/README.md b/README.md
index 86bade28c..cc8fcbc2f 100644
--- a/README.md
+++ b/README.md
@@ -107,6 +107,7 @@ These connectors are ready to use out of the box, requiring minimal modification
 - [basic_auth](https://github.com/fivetran/fivetran_connector_sdk/tree/main/connectors/veeva_vault/basic_auth) - This example shows how to authenticate to Veeva Vault using basic authentication and sync records from Veeva Vault. You need to provide your Veeva Vault credentials for this example to work.
 - [session_id_auth](https://github.com/fivetran/fivetran_connector_sdk/tree/main/connectors/veeva_vault/session_id_auth) - This example shows how to authenticate to Veeva Vault using session id authentication and sync records from Veeva Vault. You need to provide your Veeva Vault credentials for this example to work.
 - [vercel](https://github.com/fivetran/fivetran_connector_sdk/tree/main/connectors/vercel) - This example shows how to sync deployment data from Vercel's REST API by using the Connector SDK, focusing on a single endpoint (/v6/deployments). You need to provide your Vercel API token for this example to work. Optionally, you can also provide a Team ID to access team resources instead of personal account deployments.
+- [xurrent](https://github.com/fivetran/fivetran_connector_sdk/tree/main/connectors/xurrent) - This example shows how to sync organizations, products, and projects data from the Xurrent API using the Connector SDK. You need to provide your Xurrent OAuth token and account ID for this example to work.
 - [zigpoll](https://github.com/fivetran/fivetran_connector_sdk/tree/main/connectors/zigpoll) - This is an example of how to sync Zigpoll data using Connector SDK. You need to provide your Zigpoll API key for this example to work.
 
 ## Examples

diff --git a/connectors/xurrent/README.md b/connectors/xurrent/README.md
new file mode 100644
index 000000000..a02a076c6
--- /dev/null
+++ b/connectors/xurrent/README.md
@@ -0,0 +1,105 @@
+# Xurrent API Connector Example
+
+## Connector overview
+This connector synchronizes organizations, products, and projects data from the Xurrent API into your data warehouse. It fetches organizational structures, product catalogs, and project management data using OAuth2 authentication with automatic token refresh. The connector implements memory-efficient streaming patterns to handle large datasets and supports incremental synchronization using timestamp-based cursors for optimal performance.
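The request pattern behind this is compact. The sketch below mirrors the header construction in `execute_api_request` in `connector.py`; the `xurrent_get` helper name is illustrative, and the retries and rate-limit handling used by the connector are omitted here:

```python
import requests

# Minimal sketch of an authenticated Xurrent API call, assuming a valid OAuth token
# and account ID. The headers mirror those built in execute_api_request.
def xurrent_get(endpoint, oauth_token, account_id, params=None):
    headers = {
        "Authorization": f"Bearer {oauth_token}",  # OAuth2 bearer token
        "X-Xurrent-Account": account_id,           # selects the Xurrent account to read
        "Accept": "application/json",
    }
    response = requests.get(
        f"https://api.xurrent.com/v1{endpoint}", headers=headers, params=params, timeout=30
    )
    response.raise_for_status()
    return response.json()
```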
+ +## Requirements +- [Supported Python versions](https://github.com/fivetran/fivetran_connector_sdk/blob/main/README.md#requirements): **3.9-3.13** +- Operating system: + - Windows: 10 or later (64-bit only) + - macOS: 13 (Ventura) or later (Apple Silicon [arm64] or Intel [x86_64]) + - Linux: Distributions such as Ubuntu 20.04 or later, Debian 10 or later, or Amazon Linux 2 or later (arm64 or x86_64) + +## Getting started +Refer to the [Connector SDK Setup Guide](https://fivetran.com/docs/connectors/connector-sdk/setup-guide) to get started. + +## Features +- Syncs organizations, products, and projects data from Xurrent API +- OAuth2 authentication with Bearer token and account ID authentication (refer to `execute_api_request` function) +- Page-based pagination with automatic page traversal (refer to `get_organizations`, `get_products`, and `get_projects` functions) +- Memory-efficient streaming prevents data accumulation for large datasets +- Incremental synchronization using timestamp-based cursors (refer to `get_time_range` function) +- Comprehensive error handling with exponential backoff retry logic +- Rate limiting support with automatic retry after delays +- Configurable pagination limits and timeout settings + +## Configuration file +```json +{ + "oauth_token": "", + "account_id": "", + "sync_frequency_hours": "", + "initial_sync_days": "", + "max_records_per_page": "", + "request_timeout_seconds": "", + "retry_attempts": "", + "enable_incremental_sync": "", + "enable_debug_logging": "" +} +``` + +**Configuration parameters:** +- `oauth_token`: OAuth Bearer token for API authentication (required) +- `account_id`: Xurrent account identifier for X-Xurrent-Account header (required) +- `base_url`: API base URL, defaults to https://api.xurrent.com/v1 +- `sync_frequency_hours`: How often to run sync in hours +- `initial_sync_days`: Days of historical data to fetch on first sync (1-365) +- `max_records_per_page`: Records per API request page (1-100, default: 25) +- `request_timeout_seconds`: HTTP request timeout in seconds (10-300, default: 30) +- `retry_attempts`: Number of retry attempts for failed requests (1-10, default: 3) +- `enable_incremental_sync`: Enable timestamp-based incremental sync (true/false) +- `enable_debug_logging`: Enable detailed debug logging (true/false) + +## Requirements file +This connector does not require any additional packages beyond those provided by the Fivetran environment. + +Note: The `fivetran_connector_sdk:latest` and `requests:latest` packages are pre-installed in the Fivetran environment. To avoid dependency conflicts, do not declare them in your `requirements.txt`. + +## Authentication +1. Log in to the [Xurrent Developer Portal](https://developer.xurrent.com/). +2. Register a new application to obtain OAuth credentials. +3. Generate a Personal Access Token or create an OAuth Application. +4. Make a note of the `oauth_token` from your application settings. +5. Retrieve your `account_id` from Xurrent administrators or account settings. +6. Use sandbox credentials for testing, production credentials for live syncing. + +Note: The connector automatically handles OAuth2 authentication using Bearer tokens. Credentials are never logged or exposed in plain text. + +## Pagination +Page-based pagination with automatic page traversal (refer to `get_organizations`, `get_products`, and `get_projects` functions). Generator-based processing prevents memory accumulation for large datasets. 
Processes pages sequentially while yielding individual records for immediate processing. Supports configurable page sizes from 1 to 100 records per request. + +## Data handling +Organization, product, and project data is mapped from Xurrent API format to normalized database columns (refer to the `__map_organization_data`, `__map_product_data`, and `__map_project_data` functions). Nested objects are flattened, and all timestamps are converted to UTC format for consistency. + +Supports timestamp-based incremental synchronization using the `last_sync_time` state parameter (refer to the `get_time_range` function). Initial sync can be configured to fetch historical data up to 365 days. + +## Error handling +- 429 Rate Limited: Automatic retry with Retry-After header support (refer to the `__handle_rate_limit` function) +- Timeout handling with configurable retry attempts (refer to the `__handle_request_error` function) +- Exponential backoff with jitter prevents multiple clients from making requests at the same time +- Parameter validation with descriptive error messages provides clear guidance for fixing setup issues +- Network connectivity errors with automatic retry logic +- Authentication failures with clear error reporting + +## Tables created +| Table | Primary Key | Description | +|-------|-------------|-------------| +| ORGANIZATIONS | `id` | Organization hierarchy and management structure | +| PRODUCTS | `id` | Product catalog with categories and support information | +| PROJECTS | `id` | Project management data with status and timeline information | + +Column types are automatically inferred by Fivetran. Sample columns include `name`, `status`, `category`, `created_at`, `updated_at`, `manager_id`, `service_id`, `customer_id`. + +Organizations table includes parent-child relationships, manager assignments, and business unit associations. Products table contains brand information, service relationships, and support team assignments. Projects table tracks status, completion targets, and customer relationships. + +## Additional files + +The connector includes several additional files to support functionality, testing, and deployment: + +- `requirements.txt` – Python dependency specification for Xurrent API integration and connector requirements including faker for mock testing. + +- `configuration.json` – Configuration template for API credentials and connector parameters (should be excluded from version control). + + +## Additional considerations +The examples provided are intended to help you effectively use Fivetran's Connector SDK. While we've tested the code, Fivetran cannot be held responsible for any unexpected or negative consequences that may arise from using these examples. For inquiries, please reach out to our Support team. 
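The Pagination and Data handling sections above describe a generator-based flow; a condensed sketch of that pattern follows. The `fetch_pages` helper is illustrative; the real `get_organizations`, `get_products`, and `get_projects` functions add retries, rate-limit handling, incremental filters, and field mapping on top of it.

```python
import requests

def fetch_pages(endpoint, headers, per_page=25, base_url="https://api.xurrent.com/v1"):
    """Yield records one page at a time instead of accumulating them in memory."""
    page = 1
    while True:
        response = requests.get(
            f"{base_url}{endpoint}",
            headers=headers,
            params={"per_page": per_page, "page": page},
            timeout=30,
        )
        response.raise_for_status()
        payload = response.json()
        # Mirror the connector's handling: either a bare list or an object with a "data" key.
        records = payload if isinstance(payload, list) else payload.get("data", [])
        if not records:
            break
        for record in records:
            yield record
        if len(records) < per_page:
            break  # a short page means the last page has been reached
        page += 1
```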
\ No newline at end of file diff --git a/connectors/xurrent/configuration.json b/connectors/xurrent/configuration.json new file mode 100644 index 000000000..3fbda028e --- /dev/null +++ b/connectors/xurrent/configuration.json @@ -0,0 +1,11 @@ +{ + "oauth_token": "", + "account_id": "", + "sync_frequency_hours": "", + "initial_sync_days": "", + "max_records_per_page": "", + "request_timeout_seconds": "", + "retry_attempts": "", + "enable_incremental_sync": "", + "enable_debug_logging": "" +} \ No newline at end of file diff --git a/connectors/xurrent/connector.py b/connectors/xurrent/connector.py new file mode 100644 index 000000000..6f1fb3bd8 --- /dev/null +++ b/connectors/xurrent/connector.py @@ -0,0 +1,639 @@ +"""Xurrent API connector for syncing organizations, products, and projects data. +This connector demonstrates how to fetch data from Xurrent API and upsert it into destination using memory-efficient streaming patterns. +See the Technical Reference documentation (https://fivetran.com/docs/connectors/connector-sdk/technical-reference#update) +and the Best Practices documentation (https://fivetran.com/docs/connectors/connector-sdk/best-practices) for details +""" + +# For reading configuration from a JSON file +import json + +# For adding jitter to retry delays +import random + +# For making HTTP requests to Xurrent API +import requests + +# For implementing delays in retry logic and rate limiting +import time + +# For handling dates and timestamps +from datetime import datetime, timedelta, timezone + +# Import required classes from fivetran_connector_sdk +from fivetran_connector_sdk import Connector + +# For enabling Logs in your connector code +from fivetran_connector_sdk import Logging as log + +# For supporting Data operations like Upsert(), Update(), Delete() and checkpoint() +from fivetran_connector_sdk import Operations as op + +# Private constants (use __ prefix) +__API_BASE_URL = "https://api.xurrent.com/v1" +__DEFAULT_PAGE_SIZE = 25 +__MAX_PAGE_SIZE = 100 +__MAX_RETRY_ATTEMPTS = 3 +__DEFAULT_REQUEST_TIMEOUT = 30 + + +def __get_config_int(configuration, key, default, min_val=None, max_val=None): + """ + Extract and validate integer configuration parameters with range checking. + This function safely extracts integer values from configuration and applies validation. + + Args: + configuration: Configuration dictionary containing connector settings. + key: The configuration key to extract. + default: Default value to return if key is missing or invalid. + min_val: Minimum allowed value (optional). + max_val: Maximum allowed value (optional). + + Returns: + int: The validated integer value or default if validation fails. + """ + try: + value = int(configuration.get(key, default)) + if min_val is not None and value < min_val: + return default + if max_val is not None and value > max_val: + return default + return value + except (ValueError, TypeError): + return default + + +def __get_config_str(configuration, key, default=""): + """ + Extract string configuration parameters with type safety. + This function safely extracts string values from configuration dictionary. + + Args: + configuration: Configuration dictionary containing connector settings. + key: The configuration key to extract. + default: Default value to return if key is missing. + + Returns: + str: The string value or default if key is missing. 
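    Example:
        __get_config_str({"account_id": "acme"}, "account_id") returns "acme", while a
        missing key falls back to the default (an empty string unless one is supplied).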
+ """ + return str(configuration.get(key, default)) + + +def __get_config_bool(configuration, key, default=False): + """ + Extract and parse boolean configuration parameters from strings or boolean values. + This function handles string representations of boolean values commonly used in JSON configuration. + + Args: + configuration: Configuration dictionary containing connector settings. + key: The configuration key to extract. + default: Default boolean value to return if key is missing. + + Returns: + bool: The parsed boolean value or default if key is missing. + """ + value = configuration.get(key, default) + if isinstance(value, str): + return value.lower() in ("true", "1", "yes", "on") + return bool(value) + + +def __calculate_wait_time(attempt, response_headers, base_delay=1, max_delay=60): + """ + Calculate exponential backoff wait time with jitter for retry attempts. + This function implements exponential backoff with random jitter to prevent thundering herd problems. + + Args: + attempt: Current attempt number (0-based). + response_headers: HTTP response headers dictionary that may contain Retry-After. + base_delay: Base delay in seconds for exponential backoff. + max_delay: Maximum delay cap in seconds. + + Returns: + float: Wait time in seconds before next retry attempt. + """ + if "Retry-After" in response_headers: + return min(int(response_headers["Retry-After"]), max_delay) + + # Exponential backoff with jitter + wait_time = min(base_delay * (2**attempt), max_delay) + jitter = random.uniform(0.1, 0.3) * wait_time + return wait_time + jitter + + +def __handle_rate_limit(attempt, response): + """ + Handle HTTP 429 rate limiting responses with appropriate delays. + This function logs the rate limit and waits before allowing retry attempts. + + Args: + attempt: Current attempt number for logging purposes. + response: HTTP response object containing rate limit headers. + """ + wait_time = __calculate_wait_time(attempt, response.headers) + log.warning(f"Rate limit hit, waiting {wait_time:.1f} seconds before retry {attempt + 1}") + time.sleep(wait_time) + + +def __handle_request_error(attempt, retry_attempts, error, endpoint): + """ + Handle request errors with exponential backoff retry logic. + This function manages retry attempts for failed API requests with appropriate delays. + + Args: + attempt: Current attempt number (0-based). + retry_attempts: Total number of retry attempts allowed. + error: The exception that occurred during the request. + endpoint: API endpoint that failed for logging purposes. + + Raises: + Exception: Re-raises the original error after all retry attempts are exhausted. + """ + if attempt < retry_attempts - 1: + wait_time = __calculate_wait_time(attempt, {}) + log.warning( + f"Request failed for {endpoint}: {str(error)}. Retrying in {wait_time:.1f} seconds..." + ) + time.sleep(wait_time) + else: + log.severe(f"All retry attempts failed for {endpoint}: {str(error)}") + raise error + + +def execute_api_request(endpoint, oauth_token, account_id, params=None, configuration=None): + """ + Execute HTTP API requests with comprehensive error handling and retry logic. + This function handles authentication, rate limiting, timeouts, and network errors. + + Args: + endpoint: API endpoint path to request. + oauth_token: OAuth token for API authentication. + account_id: Xurrent account ID for API access. + params: Query parameters for the request (optional). + configuration: Configuration dictionary for timeout and retry settings. 
+ + Returns: + dict: Parsed JSON response from the API. + + Raises: + RuntimeError: If all retry attempts fail or unexpected errors occur. + requests.exceptions.RequestException: For unrecoverable HTTP errors. + """ + url = f"{__API_BASE_URL}{endpoint}" + headers = { + "Authorization": f"Bearer {oauth_token}", + "X-Xurrent-Account": account_id, + "Accept": "application/json", + } + + timeout = __get_config_int(configuration, "request_timeout_seconds", __DEFAULT_REQUEST_TIMEOUT) + retry_attempts = __get_config_int(configuration, "retry_attempts", __MAX_RETRY_ATTEMPTS) + + for attempt in range(retry_attempts): + try: + response = requests.get(url, headers=headers, params=params, timeout=timeout) + + if response.status_code == 429: + __handle_rate_limit(attempt, response) + continue + + response.raise_for_status() + return response.json() + + except requests.exceptions.RequestException as e: + __handle_request_error(attempt, retry_attempts, e, endpoint) + continue + + raise RuntimeError("Unexpected error in API request execution") + + +def __map_organization_data(record): + """ + Transform API response record to organizations table schema format. + This function maps raw API fields to normalized database column names and types. + + Args: + record: Raw API response record dictionary for organization. + + Returns: + dict: Transformed organization record ready for database insertion. + """ + return { + "id": record.get("id"), + "name": record.get("name", ""), + "source_id": record.get("sourceID", ""), + "disabled": record.get("disabled", False), + "parent_id": record.get("parent", {}).get("id") if record.get("parent") else None, + "parent_name": record.get("parent", {}).get("name") if record.get("parent") else None, + "manager_id": record.get("manager", {}).get("id") if record.get("manager") else None, + "manager_name": record.get("manager", {}).get("name") if record.get("manager") else None, + "business_unit_id": ( + record.get("business_unit_organization", {}).get("id") + if record.get("business_unit_organization") + else None + ), + "region": record.get("region", ""), + "remarks": record.get("remarks", ""), + "created_at": record.get("created_at"), + "updated_at": record.get("updated_at"), + "synced_at": datetime.now(timezone.utc).isoformat(), + } + + +def __map_product_data(record): + """ + Transform API response record to products table schema format. + This function maps raw API fields to normalized database column names and types. + + Args: + record: Raw API response record dictionary for product. + + Returns: + dict: Transformed product record ready for database insertion. 
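    Example:
        A nested object such as record["service"] is flattened into the service_id and
        service_name columns; a missing nested object maps both columns to None.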
+ """ + return { + "id": record.get("id"), + "name": record.get("name", ""), + "source_id": record.get("sourceID", ""), + "brand": record.get("brand", ""), + "category": record.get("category", ""), + "model": record.get("model", ""), + "disabled": record.get("disabled", False), + "service_id": record.get("service", {}).get("id") if record.get("service") else None, + "service_name": record.get("service", {}).get("name") if record.get("service") else None, + "support_team_id": ( + record.get("support_team", {}).get("id") if record.get("support_team") else None + ), + "support_team_name": ( + record.get("support_team", {}).get("name") if record.get("support_team") else None + ), + "financial_owner_id": ( + record.get("financial_owner", {}).get("id") if record.get("financial_owner") else None + ), + "supplier_id": record.get("supplier", {}).get("id") if record.get("supplier") else None, + "useful_life": record.get("useful_life"), + "depreciation_method": record.get("depreciation_method", ""), + "remarks": record.get("remarks", ""), + "created_at": record.get("created_at"), + "updated_at": record.get("updated_at"), + "synced_at": datetime.now(timezone.utc).isoformat(), + } + + +def __map_project_data(record): + """ + Transform API response record to projects table schema format. + This function maps raw API fields to normalized database column names and types. + + Args: + record: Raw API response record dictionary for project. + + Returns: + dict: Transformed project record ready for database insertion. + """ + return { + "id": record.get("id"), + "subject": record.get("subject", ""), + "source_id": record.get("sourceID", ""), + "category": record.get("category", ""), + "status": record.get("status", ""), + "program": record.get("program", ""), + "justification": record.get("justification", ""), + "time_zone": record.get("time_zone", ""), + "completion_target_at": record.get("completion_target_at"), + "service_id": record.get("service", {}).get("id") if record.get("service") else None, + "service_name": record.get("service", {}).get("name") if record.get("service") else None, + "customer_id": record.get("customer", {}).get("id") if record.get("customer") else None, + "customer_name": ( + record.get("customer", {}).get("name") if record.get("customer") else None + ), + "manager_id": record.get("manager", {}).get("id") if record.get("manager") else None, + "manager_name": record.get("manager", {}).get("name") if record.get("manager") else None, + "work_hours_id": ( + record.get("work_hours", {}).get("id") if record.get("work_hours") else None + ), + "created_at": record.get("created_at"), + "updated_at": record.get("updated_at"), + "synced_at": datetime.now(timezone.utc).isoformat(), + } + + +def get_time_range(last_sync_time=None, configuration=None): + """ + Generate time range for incremental or initial data synchronization. + This function creates start and end timestamps for API queries based on sync state. + + Args: + last_sync_time: Timestamp of last successful sync (optional). + configuration: Configuration dictionary containing sync settings. + + Returns: + dict: Dictionary containing 'start' and 'end' timestamps in ISO format. 
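    Example:
        With no last_sync_time and the default initial_sync_days of 90, the window covers
        the last 90 days up to the current UTC time; when last_sync_time is provided, the
        window starts at that stored cursor value instead.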
+ """ + end_time = datetime.now(timezone.utc).isoformat() + + if last_sync_time: + start_time = last_sync_time + else: + initial_sync_days = __get_config_int(configuration, "initial_sync_days", 90) + start_time = (datetime.now(timezone.utc) - timedelta(days=initial_sync_days)).isoformat() + + return {"start": start_time, "end": end_time} + + +def get_organizations(oauth_token, account_id, last_sync_time=None, configuration=None): + """ + Fetch organizations using memory-efficient streaming approach with pagination. + This generator function prevents memory accumulation by yielding individual records. + + Args: + oauth_token: OAuth token for API authentication. + account_id: Xurrent account ID for API access. + last_sync_time: Timestamp for incremental sync (optional). + configuration: Configuration dictionary containing connector settings. + + Yields: + dict: Individual organization records mapped to destination schema. + + Raises: + RuntimeError: If API requests fail after all retry attempts. + """ + endpoint = "/organizations" + max_records = __get_config_int( + configuration, "max_records_per_page", __DEFAULT_PAGE_SIZE, 1, __MAX_PAGE_SIZE + ) + + params = {"per_page": max_records, "page": 1} + + # Add incremental sync filter if available + if last_sync_time: + params["updated_at"] = f">{last_sync_time}" + + page = 1 + while True: + params["page"] = page + response = execute_api_request(endpoint, oauth_token, account_id, params, configuration) + + data = response if isinstance(response, list) else response.get("data", []) + if not data: + break + + # Yield individual records instead of accumulating + for record in data: + yield __map_organization_data(record) + + if len(data) < max_records: + break + page += 1 + + +def get_products(oauth_token, account_id, last_sync_time=None, configuration=None): + """ + Fetch products using memory-efficient streaming approach with pagination. + This generator function prevents memory accumulation by yielding individual records. + + Args: + oauth_token: OAuth token for API authentication. + account_id: Xurrent account ID for API access. + last_sync_time: Timestamp for incremental sync (optional). + configuration: Configuration dictionary containing connector settings. + + Yields: + dict: Individual product records mapped to destination schema. + + Raises: + RuntimeError: If API requests fail after all retry attempts. + """ + endpoint = "/products" + max_records = __get_config_int( + configuration, "max_records_per_page", __DEFAULT_PAGE_SIZE, 1, __MAX_PAGE_SIZE + ) + + params = {"per_page": max_records, "page": 1} + + # Add incremental sync filter if available + if last_sync_time: + params["updated_at"] = f">{last_sync_time}" + + page = 1 + while True: + params["page"] = page + response = execute_api_request(endpoint, oauth_token, account_id, params, configuration) + + data = response if isinstance(response, list) else response.get("data", []) + if not data: + break + + # Yield individual records instead of accumulating + for record in data: + yield __map_product_data(record) + + if len(data) < max_records: + break + page += 1 + + +def get_projects(oauth_token, account_id, last_sync_time=None, configuration=None): + """ + Fetch projects using memory-efficient streaming approach with pagination. + This generator function prevents memory accumulation by yielding individual records. + + Args: + oauth_token: OAuth token for API authentication. + account_id: Xurrent account ID for API access. + last_sync_time: Timestamp for incremental sync (optional). 
+ configuration: Configuration dictionary containing connector settings. + + Yields: + dict: Individual project records mapped to destination schema. + + Raises: + RuntimeError: If API requests fail after all retry attempts. + """ + endpoint = "/projects" + max_records = __get_config_int( + configuration, "max_records_per_page", __DEFAULT_PAGE_SIZE, 1, __MAX_PAGE_SIZE + ) + + params = {"per_page": max_records, "page": 1} + + # Add incremental sync filter if available + if last_sync_time: + params["updated_at"] = f">{last_sync_time}" + + page = 1 + while True: + params["page"] = page + response = execute_api_request(endpoint, oauth_token, account_id, params, configuration) + + data = response if isinstance(response, list) else response.get("data", []) + if not data: + break + + # Yield individual records instead of accumulating + for record in data: + yield __map_project_data(record) + + if len(data) < max_records: + break + page += 1 + + +def schema(configuration: dict): + """ + Define database schema with table names and primary keys for the connector. + This function specifies the destination tables and their primary keys for Fivetran to create. + + Args: + configuration: Configuration dictionary (not used but required by SDK). + + Returns: + list: List of table schema dictionaries with table names and primary keys. + """ + return [ + {"table": "organizations", "primary_key": ["id"]}, + {"table": "products", "primary_key": ["id"]}, + {"table": "projects", "primary_key": ["id"]}, + ] + + +def update(configuration: dict, state: dict): + """ + Main synchronization function that fetches and processes data from the Xurrent API. + This function orchestrates the entire sync process using memory-efficient streaming patterns. + + Args: + configuration: Configuration dictionary containing API credentials and settings. + state: State dictionary containing sync cursors and checkpoints from previous runs. + + Raises: + RuntimeError: If sync fails due to API errors or configuration issues. + """ + log.info("Starting Xurrent API connector sync") + + # Extract configuration parameters (SDK auto-validates required fields) + oauth_token = __get_config_str(configuration, "oauth_token") + account_id = __get_config_str(configuration, "account_id") + max_records_per_page = __get_config_int( + configuration, "max_records_per_page", __DEFAULT_PAGE_SIZE, 1, __MAX_PAGE_SIZE + ) + enable_incremental = __get_config_bool(configuration, "enable_incremental_sync", True) + + # Get state for incremental sync + last_sync_time = state.get("last_sync_time") if enable_incremental else None + + try: + # Fetch organizations data using generator with incremental checkpointing + log.info("Fetching organizations data...") + org_count = 0 + org_page = 1 + + for record in get_organizations(oauth_token, account_id, last_sync_time, configuration): + # The 'upsert' operation is used to insert or update data in the destination table. + # The op.upsert method is called with two arguments: + # - The first argument is the name of the table to upsert the data into. + # - The second argument is a dictionary containing the data to be upserted, + op.upsert(table="organizations", data=record) + org_count += 1 + + # Checkpoint every page/batch to save progress incrementally + if org_count % max_records_per_page == 0: + checkpoint_state = { + "last_sync_time": record.get( + "updated_at", datetime.now(timezone.utc).isoformat() + ), + "last_processed_organizations_page": org_page, + } + # Save the progress by checkpointing the state. 
This is important for ensuring that the sync process can resume + # from the correct position in case of next sync or interruptions. + # Learn more about how and where to checkpoint by reading our best practices documentation + # (https://fivetran.com/docs/connectors/connector-sdk/best-practices#largedatasetrecommendation). + op.checkpoint(checkpoint_state) + org_page += 1 + + # Fetch products data using generator with incremental checkpointing + log.info("Fetching products data...") + product_count = 0 + product_page = 1 + + for record in get_products(oauth_token, account_id, last_sync_time, configuration): + # The 'upsert' operation is used to insert or update data in the destination table. + # The op.upsert method is called with two arguments: + # - The first argument is the name of the table to upsert the data into. + # - The second argument is a dictionary containing the data to be upserted, + op.upsert(table="products", data=record) + product_count += 1 + + # Checkpoint every page/batch to save progress incrementally + if product_count % max_records_per_page == 0: + checkpoint_state = { + "last_sync_time": record.get( + "updated_at", datetime.now(timezone.utc).isoformat() + ), + "last_processed_products_page": product_page, + } + # Save the progress by checkpointing the state. This is important for ensuring that the sync process can resume + # from the correct position in case of next sync or interruptions. + # Learn more about how and where to checkpoint by reading our best practices documentation + # (https://fivetran.com/docs/connectors/connector-sdk/best-practices#largedatasetrecommendation). + op.checkpoint(checkpoint_state) + product_page += 1 + + # Fetch projects data using generator with incremental checkpointing + log.info("Fetching projects data...") + project_count = 0 + project_page = 1 + + for record in get_projects(oauth_token, account_id, last_sync_time, configuration): + # The 'upsert' operation is used to insert or update data in the destination table. + # The op.upsert method is called with two arguments: + # - The first argument is the name of the table to upsert the data into. + # - The second argument is a dictionary containing the data to be upserted, + op.upsert(table="projects", data=record) + project_count += 1 + + # Checkpoint every page/batch to save progress incrementally + if project_count % max_records_per_page == 0: + checkpoint_state = { + "last_sync_time": record.get( + "updated_at", datetime.now(timezone.utc).isoformat() + ), + "last_processed_projects_page": project_page, + } + # Save the progress by checkpointing the state. This is important for ensuring that the sync process can resume + # from the correct position in case of next sync or interruptions. + # Learn more about how and where to checkpoint by reading our best practices documentation + # (https://fivetran.com/docs/connectors/connector-sdk/best-practices#largedatasetrecommendation). + op.checkpoint(checkpoint_state) + project_page += 1 + + # Final checkpoint with completion status + final_state = {"last_sync_time": datetime.now(timezone.utc).isoformat()} + # Save the progress by checkpointing the state. This is important for ensuring that the sync process can resume + # from the correct position in case of next sync or interruptions. + # Learn more about how and where to checkpoint by reading our best practices documentation + # (https://fivetran.com/docs/connectors/connector-sdk/best-practices#largedatasetrecommendation). 
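        # The final state keeps only last_sync_time; the per-table page counters written by
        # the mid-run checkpoints are interim progress markers and are not included here.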
+ op.checkpoint(final_state) + + log.info( + f"Sync completed successfully. Processed {org_count} organizations, {product_count} products, {project_count} projects." + ) + + except Exception as e: + log.severe(f"Sync failed: {str(e)}") + raise RuntimeError(f"Failed to sync data: {str(e)}") + + +# Entry point for the connector +connector = Connector(update=update, schema=schema) + +# Check if the script is being run as the main module. +# This is Python's standard entry method allowing your script to be run directly from the command line or IDE 'run' button. +# This is useful for debugging while you write your code. Note this method is not called by Fivetran when executing your connector in production. +# Please test using the Fivetran debug command prior to finalizing and deploying your connector. +if __name__ == "__main__": + # Open the configuration.json file and load its contents + with open("configuration.json", "r") as f: + configuration = json.load(f) + + # Test the connector locally + connector.debug(configuration=configuration) From bc50edcfdd856e166945116badec2586c0f66e2a Mon Sep 17 00:00:00 2001 From: Karun Veluru Date: Mon, 3 Nov 2025 18:22:33 -0600 Subject: [PATCH 2/4] feature(examples): Address PR comments --- connectors/xurrent/README.md | 42 ++++++++++----------------- connectors/xurrent/configuration.json | 12 ++++---- connectors/xurrent/connector.py | 25 +++++++--------- 3 files changed, 30 insertions(+), 49 deletions(-) diff --git a/connectors/xurrent/README.md b/connectors/xurrent/README.md index a02a076c6..b19b03bf3 100644 --- a/connectors/xurrent/README.md +++ b/connectors/xurrent/README.md @@ -4,7 +4,7 @@ This connector synchronizes organizations, products, and projects data from the Xurrent API into your data warehouse. It fetches organizational structures, product catalogs, and project management data using OAuth2 authentication with automatic token refresh. The connector implements memory-efficient streaming patterns to handle large datasets and supports incremental synchronization using timestamp-based cursors for optimal performance. 
## Requirements -- [Supported Python versions](https://github.com/fivetran/fivetran_connector_sdk/blob/main/README.md#requirements): **3.9-3.13** +- [Supported Python versions](https://github.com/fivetran/fivetran_connector_sdk/blob/main/README.md#requirements): - Operating system: - Windows: 10 or later (64-bit only) - macOS: 13 (Ventura) or later (Apple Silicon [arm64] or Intel [x86_64]) @@ -28,27 +28,24 @@ Refer to the [Connector SDK Setup Guide](https://fivetran.com/docs/connectors/co { "oauth_token": "", "account_id": "", - "sync_frequency_hours": "", - "initial_sync_days": "", - "max_records_per_page": "", - "request_timeout_seconds": "", - "retry_attempts": "", - "enable_incremental_sync": "", - "enable_debug_logging": "" + "sync_frequency_hours": "", + "initial_sync_days": "", + "max_records_per_page": "", + "request_timeout_seconds": "", + "retry_attempts": "", + "enable_incremental_sync": "" } ``` **Configuration parameters:** -- `oauth_token`: OAuth Bearer token for API authentication (required) -- `account_id`: Xurrent account identifier for X-Xurrent-Account header (required) -- `base_url`: API base URL, defaults to https://api.xurrent.com/v1 -- `sync_frequency_hours`: How often to run sync in hours -- `initial_sync_days`: Days of historical data to fetch on first sync (1-365) -- `max_records_per_page`: Records per API request page (1-100, default: 25) -- `request_timeout_seconds`: HTTP request timeout in seconds (10-300, default: 30) -- `retry_attempts`: Number of retry attempts for failed requests (1-10, default: 3) -- `enable_incremental_sync`: Enable timestamp-based incremental sync (true/false) -- `enable_debug_logging`: Enable detailed debug logging (true/false) +- `oauth_token` (required): OAuth Bearer token for API authentication +- `account_id` (required): Xurrent account identifier for X-Xurrent-Account header +- `sync_frequency_hours` (optional): How often to run sync in hours +- `initial_sync_days` (optional): Days of historical data to fetch on first sync (1-365) +- `max_records_per_page` (optional): Records per API request page (1-100, default: 25) +- `request_timeout_seconds` (optional): HTTP request timeout in seconds (10-300, default: 30) +- `retry_attempts` (optional): Number of retry attempts for failed requests (1-10, default: 3) +- `enable_incremental_sync` (optional): Enable timestamp-based incremental sync (true/false) ## Requirements file This connector does not require any additional packages beyond those provided by the Fivetran environment. @@ -92,14 +89,5 @@ Column types are automatically inferred by Fivetran. Sample columns include `nam Organizations table includes parent-child relationships, manager assignments, and business unit associations. Products table contains brand information, service relationships, and support team assignments. Projects table tracks status, completion targets, and customer relationships. -## Additional files - -The connector includes several additional files to support functionality, testing, and deployment: - -- `requirements.txt` – Python dependency specification for Xurrent API integration and connector requirements including faker for mock testing. - -- `configuration.json` – Configuration template for API credentials and connector parameters (should be excluded from version control). - - ## Additional considerations The examples provided are intended to help you effectively use Fivetran's Connector SDK. 
While we've tested the code, Fivetran cannot be held responsible for any unexpected or negative consequences that may arise from using these examples. For inquiries, please reach out to our Support team. \ No newline at end of file diff --git a/connectors/xurrent/configuration.json b/connectors/xurrent/configuration.json index 3fbda028e..32a3650cf 100644 --- a/connectors/xurrent/configuration.json +++ b/connectors/xurrent/configuration.json @@ -1,11 +1,9 @@ { "oauth_token": "", "account_id": "", - "sync_frequency_hours": "", - "initial_sync_days": "", - "max_records_per_page": "", - "request_timeout_seconds": "", - "retry_attempts": "", - "enable_incremental_sync": "", - "enable_debug_logging": "" + "initial_sync_days": "", + "max_records_per_page": "", + "request_timeout_seconds": "", + "retry_attempts": "", + "enable_incremental_sync": "" } \ No newline at end of file diff --git a/connectors/xurrent/connector.py b/connectors/xurrent/connector.py index 6f1fb3bd8..d37207ace 100644 --- a/connectors/xurrent/connector.py +++ b/connectors/xurrent/connector.py @@ -480,14 +480,11 @@ def get_projects(oauth_token, account_id, last_sync_time=None, configuration=Non def schema(configuration: dict): """ - Define database schema with table names and primary keys for the connector. - This function specifies the destination tables and their primary keys for Fivetran to create. - + Define the schema function which lets you configure the schema your connector delivers. + See the technical reference documentation for more details on the schema function: + https://fivetran.com/docs/connectors/connector-sdk/technical-reference#schema Args: - configuration: Configuration dictionary (not used but required by SDK). - - Returns: - list: List of table schema dictionaries with table names and primary keys. + configuration: a dictionary that holds the configuration settings for the connector. """ return [ {"table": "organizations", "primary_key": ["id"]}, @@ -498,15 +495,13 @@ def schema(configuration: dict): def update(configuration: dict, state: dict): """ - Main synchronization function that fetches and processes data from the Xurrent API. - This function orchestrates the entire sync process using memory-efficient streaming patterns. - + Define the update function, which is a required function, and is called by Fivetran during each sync. + See the technical reference documentation for more details on the update function + https://fivetran.com/docs/connectors/connector-sdk/technical-reference#update Args: - configuration: Configuration dictionary containing API credentials and settings. - state: State dictionary containing sync cursors and checkpoints from previous runs. - - Raises: - RuntimeError: If sync fails due to API errors or configuration issues. 
+ configuration: A dictionary containing connection details + state: A dictionary containing state information from previous runs + The state dictionary is empty for the first sync or for any full re-sync """ log.info("Starting Xurrent API connector sync") From 28ad10785a3f4905e17ad62d120d9e3aa0893d7d Mon Sep 17 00:00:00 2001 From: Dejan Tucakov Date: Wed, 5 Nov 2025 15:22:31 +0100 Subject: [PATCH 3/4] Apply suggestions from code review --- connectors/xurrent/README.md | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/connectors/xurrent/README.md b/connectors/xurrent/README.md index b19b03bf3..bc8dff284 100644 --- a/connectors/xurrent/README.md +++ b/connectors/xurrent/README.md @@ -4,7 +4,7 @@ This connector synchronizes organizations, products, and projects data from the Xurrent API into your data warehouse. It fetches organizational structures, product catalogs, and project management data using OAuth2 authentication with automatic token refresh. The connector implements memory-efficient streaming patterns to handle large datasets and supports incremental synchronization using timestamp-based cursors for optimal performance. ## Requirements -- [Supported Python versions](https://github.com/fivetran/fivetran_connector_sdk/blob/main/README.md#requirements): +- [Supported Python versions](https://github.com/fivetran/fivetran_connector_sdk/blob/main/README.md#requirements) - Operating system: - Windows: 10 or later (64-bit only) - macOS: 13 (Ventura) or later (Apple Silicon [arm64] or Intel [x86_64]) @@ -15,10 +15,10 @@ Refer to the [Connector SDK Setup Guide](https://fivetran.com/docs/connectors/co ## Features - Syncs organizations, products, and projects data from Xurrent API -- OAuth2 authentication with Bearer token and account ID authentication (refer to `execute_api_request` function) -- Page-based pagination with automatic page traversal (refer to `get_organizations`, `get_products`, and `get_projects` functions) +- OAuth2 authentication with Bearer token and account ID authentication (refer to the `execute_api_request` function) +- Page-based pagination with automatic page traversal (refer to the `get_organizations`, `get_products`, and `get_projects` functions) - Memory-efficient streaming prevents data accumulation for large datasets -- Incremental synchronization using timestamp-based cursors (refer to `get_time_range` function) +- Incremental synchronization using timestamp-based cursors (refer to the `get_time_range` function) - Comprehensive error handling with exponential backoff retry logic - Rate limiting support with automatic retry after delays - Configurable pagination limits and timeout settings @@ -37,14 +37,14 @@ Refer to the [Connector SDK Setup Guide](https://fivetran.com/docs/connectors/co } ``` -**Configuration parameters:** -- `oauth_token` (required): OAuth Bearer token for API authentication -- `account_id` (required): Xurrent account identifier for X-Xurrent-Account header +Configuration parameters: +- `oauth_token` (required): OAuth bearer token for API authentication +- `account_id` (required): Xurrent account identifier for `X-Xurrent-Account` header - `sync_frequency_hours` (optional): How often to run sync in hours - `initial_sync_days` (optional): Days of historical data to fetch on first sync (1-365) -- `max_records_per_page` (optional): Records per API request page (1-100, default: 25) -- `request_timeout_seconds` (optional): HTTP request timeout in seconds (10-300, default: 30) -- `retry_attempts` 
(optional): Number of retry attempts for failed requests (1-10, default: 3) +- `max_records_per_page` (optional): Records per API request page (1-100, default: `25`) +- `request_timeout_seconds` (optional): HTTP request timeout in seconds (10-300, default: `30`) +- `retry_attempts` (optional): Number of retry attempts for failed requests (1-10, default: `3`) - `enable_incremental_sync` (optional): Enable timestamp-based incremental sync (true/false) ## Requirements file @@ -55,7 +55,7 @@ Note: The `fivetran_connector_sdk:latest` and `requests:latest` packages are pre ## Authentication 1. Log in to the [Xurrent Developer Portal](https://developer.xurrent.com/). 2. Register a new application to obtain OAuth credentials. -3. Generate a Personal Access Token or create an OAuth Application. +3. Generate a personal access token or create an OAuth application. 4. Make a note of the `oauth_token` from your application settings. 5. Retrieve your `account_id` from Xurrent administrators or account settings. 6. Use sandbox credentials for testing, production credentials for live syncing. @@ -63,7 +63,7 @@ Note: The `fivetran_connector_sdk:latest` and `requests:latest` packages are pre Note: The connector automatically handles OAuth2 authentication using Bearer tokens. Credentials are never logged or exposed in plain text. ## Pagination -Page-based pagination with automatic page traversal (refer to `get_organizations`, `get_products`, and `get_projects` functions). Generator-based processing prevents memory accumulation for large datasets. Processes pages sequentially while yielding individual records for immediate processing. Supports configurable page sizes from 1 to 100 records per request. +Page-based pagination with automatic page traversal (refer to the `get_organizations`, `get_products`, and `get_projects` functions). Generator-based processing prevents memory accumulation for large datasets. Processes pages sequentially while yielding individual records for immediate processing. Supports configurable page sizes from 1 to 100 records per request. ## Data handling Organization, product, and project data is mapped from Xurrent API format to normalized database columns (refer to the `__map_organization_data`, `__map_product_data`, and `__map_project_data` functions). Nested objects are flattened, and all timestamps are converted to UTC format for consistency. @@ -85,7 +85,7 @@ Supports timestamp-based incremental synchronization using the `last_sync_time` | PRODUCTS | `id` | Product catalog with categories and support information | | PROJECTS | `id` | Project management data with status and timeline information | -Column types are automatically inferred by Fivetran. Sample columns include `name`, `status`, `category`, `created_at`, `updated_at`, `manager_id`, `service_id`, `customer_id`. +Column types are automatically inferred by Fivetran. Sample columns include `name`, `status`, `category`, `created_at`, `updated_at`, `manager_id`, `service_id`, and `customer_id`. Organizations table includes parent-child relationships, manager assignments, and business unit associations. Products table contains brand information, service relationships, and support team assignments. Projects table tracks status, completion targets, and customer relationships. From f3efa449f2abc05c7aad8a563644b9adb569dd92 Mon Sep 17 00:00:00 2001 From: Dejan Tucakov Date: Wed, 5 Nov 2025 15:24:11 +0100 Subject: [PATCH 4/4] Enhance table descriptions in README.md Expanded descriptions for tables in the README. 
--- connectors/xurrent/README.md | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/connectors/xurrent/README.md b/connectors/xurrent/README.md index bc8dff284..b6a76c6d7 100644 --- a/connectors/xurrent/README.md +++ b/connectors/xurrent/README.md @@ -81,13 +81,11 @@ Supports timestamp-based incremental synchronization using the `last_sync_time` ## Tables created | Table | Primary Key | Description | |-------|-------------|-------------| -| ORGANIZATIONS | `id` | Organization hierarchy and management structure | -| PRODUCTS | `id` | Product catalog with categories and support information | -| PROJECTS | `id` | Project management data with status and timeline information | +| ORGANIZATIONS | `id` | Organization hierarchy and management structure. The table includes parent-child relationships, manager assignments, and business unit associations. | +| PRODUCTS | `id` | Product catalog with categories and support information. The table contains brand information, service relationships, and support team assignments. | +| PROJECTS | `id` | Project management data with status and timeline information. The table tracks status, completion targets, and customer relationships. | Column types are automatically inferred by Fivetran. Sample columns include `name`, `status`, `category`, `created_at`, `updated_at`, `manager_id`, `service_id`, and `customer_id`. -Organizations table includes parent-child relationships, manager assignments, and business unit associations. Products table contains brand information, service relationships, and support team assignments. Projects table tracks status, completion targets, and customer relationships. - ## Additional considerations -The examples provided are intended to help you effectively use Fivetran's Connector SDK. While we've tested the code, Fivetran cannot be held responsible for any unexpected or negative consequences that may arise from using these examples. For inquiries, please reach out to our Support team. \ No newline at end of file +The examples provided are intended to help you effectively use Fivetran's Connector SDK. While we've tested the code, Fivetran cannot be held responsible for any unexpected or negative consequences that may arise from using these examples. For inquiries, please reach out to our Support team.
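
As a closing note on local testing: the `__main__` block in `connector.py` can also be driven from a short script. The sketch below assumes `connector.py` and a filled-in `configuration.json` sit in the working directory; it follows broadly the same flow the Connector SDK's `fivetran debug` command exercises.

```python
import json

# connector.py defines `connector = Connector(update=update, schema=schema)`.
from connector import connector

# Load the configuration template shipped with this example (fill in real values first).
with open("configuration.json", "r") as f:
    configuration = json.load(f)

# Runs schema() and update() locally so emitted records and checkpoints can be inspected
# before deploying the connector to Fivetran.
connector.debug(configuration=configuration)
```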